- 重构tasksch
This commit is contained in:
@@ -4,7 +4,6 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
|
||||||
|
|
||||||
"git.rosy.net.cn/baseapi/utils"
|
"git.rosy.net.cn/baseapi/utils"
|
||||||
"git.rosy.net.cn/jx-callback/business/jxutils"
|
"git.rosy.net.cn/jx-callback/business/jxutils"
|
||||||
@@ -421,19 +420,19 @@ func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendo
|
|||||||
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
|
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
|
||||||
}, isContinueWhenError)
|
}, isContinueWhenError)
|
||||||
if task != nil {
|
if task != nil {
|
||||||
if vendorErr := partner.IsErrChangePriceFailed(task.GetOriginalErr()); vendorErr != nil {
|
// if vendorErr := partner.IsErrChangePriceFailed(task.GetOriginalErr()); vendorErr != nil {
|
||||||
platformList := make([]string, len(task.GetDetailErrList()))
|
// platformList := make([]string, len(task.GetDetailErrList()))
|
||||||
for k, v := range task.GetDetailErrList() {
|
// for k, v := range task.GetDetailErrList() {
|
||||||
if vendorErr := partner.IsErrVendorError(v); vendorErr != nil {
|
// if vendorErr := partner.IsErrVendorError(v); vendorErr != nil {
|
||||||
platformList[k] = model.VendorChineseNames[vendorErr.VendorID()]
|
// platformList[k] = model.VendorChineseNames[vendorErr.VendorID()]
|
||||||
} else {
|
// } else {
|
||||||
platformList[k] = "未知"
|
// platformList[k] = "未知"
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
err = fmt.Errorf("同步价格失败\n失败平台:%s", strings.Join(platformList, ","))
|
// err = fmt.Errorf("同步价格失败\n失败平台:%s", strings.Join(platformList, ","))
|
||||||
} else {
|
// } else {
|
||||||
err = makeSyncError(err)
|
// }
|
||||||
}
|
err = makeSyncError(err)
|
||||||
}
|
}
|
||||||
return hint, err
|
return hint, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package tasksch
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.rosy.net.cn/baseapi/utils"
|
"git.rosy.net.cn/baseapi/utils"
|
||||||
"git.rosy.net.cn/jx-callback/business/jxutils"
|
"git.rosy.net.cn/jx-callback/business/jxutils"
|
||||||
@@ -167,29 +166,9 @@ func (task *ParallelTask) Run() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
task.locker.Lock()
|
task.locker.Lock()
|
||||||
if taskErr != nil { // 如果有错误,肯定就是失败了
|
|
||||||
task.Status = TaskStatusFailed
|
|
||||||
} else {
|
|
||||||
if task.FinishedJobCount+task.FailedJobCount < task.TotalJobCount {
|
|
||||||
taskErr = ErrTaskIsCanceled
|
|
||||||
task.Status = TaskStatusCanceled
|
|
||||||
} else {
|
|
||||||
task.Status = TaskStatusFinished
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if taskErr != nil {
|
|
||||||
task.OriginalErr = taskErr
|
|
||||||
task.Err = NewTaskError(task.Name, taskErr)
|
|
||||||
} else {
|
|
||||||
if len(task.batchErrList) > 0 {
|
|
||||||
task.OriginalErr = task.batchErrList[0]
|
|
||||||
}
|
|
||||||
task.Err = task.buildTaskErrFromBatchErrList()
|
|
||||||
}
|
|
||||||
task.Result = taskResult
|
task.Result = taskResult
|
||||||
task.TerminatedAt = time.Now()
|
task.mainErr = taskErr
|
||||||
task.locker.Unlock()
|
task.locker.Unlock()
|
||||||
globals.SugarLogger.Debugf("ParallelTask.Run %s, err:%v", task.Name, task.Err)
|
|
||||||
|
|
||||||
close(task.subFinishChan)
|
close(task.subFinishChan)
|
||||||
task.jobList = nil // 如果不释放,任务被管理的话,会导致内存不能释放
|
task.jobList = nil // 如果不释放,任务被管理的话,会导致内存不能释放
|
||||||
|
|||||||
@@ -1,8 +1,6 @@
|
|||||||
package tasksch
|
package tasksch
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.rosy.net.cn/baseapi/utils"
|
"git.rosy.net.cn/baseapi/utils"
|
||||||
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
|
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
|
||||||
"git.rosy.net.cn/jx-callback/globals"
|
"git.rosy.net.cn/jx-callback/globals"
|
||||||
@@ -57,29 +55,9 @@ func (task *SeqTask) Run() {
|
|||||||
}
|
}
|
||||||
EndFor:
|
EndFor:
|
||||||
task.locker.Lock()
|
task.locker.Lock()
|
||||||
if taskErr != nil { // 如果有错误,肯定就是失败了
|
|
||||||
task.Status = TaskStatusFailed
|
|
||||||
} else {
|
|
||||||
if task.FinishedJobCount+task.FailedJobCount < task.TotalJobCount {
|
|
||||||
taskErr = ErrTaskIsCanceled
|
|
||||||
task.Status = TaskStatusCanceled
|
|
||||||
} else {
|
|
||||||
task.Status = TaskStatusFinished
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if taskErr != nil {
|
|
||||||
task.OriginalErr = taskErr
|
|
||||||
task.Err = NewTaskError(task.Name, taskErr)
|
|
||||||
} else {
|
|
||||||
if len(task.batchErrList) > 0 {
|
|
||||||
task.OriginalErr = task.batchErrList[0]
|
|
||||||
}
|
|
||||||
task.Err = task.buildTaskErrFromBatchErrList()
|
|
||||||
}
|
|
||||||
task.Result = taskResult
|
task.Result = taskResult
|
||||||
task.TerminatedAt = time.Now()
|
task.mainErr = taskErr
|
||||||
task.locker.Unlock()
|
task.locker.Unlock()
|
||||||
globals.SugarLogger.Debugf("SeqTask.Run %s, result:%v, err:%v", task.Name, taskResult, task.Err)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -3,7 +3,6 @@ package tasksch
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -58,35 +57,37 @@ type ITask interface {
|
|||||||
AddChild(task ITask) ITask
|
AddChild(task ITask) ITask
|
||||||
GetChildren() TaskList
|
GetChildren() TaskList
|
||||||
SetParent(parentTask ITask)
|
SetParent(parentTask ITask)
|
||||||
GetOriginalErr() error
|
// GetOriginalErr() error
|
||||||
GetDetailErrList() []error
|
GetErr() error
|
||||||
|
// GetDetailErrList() []error
|
||||||
|
GetLeafResult() (finishedItemCount, failedItemCount int)
|
||||||
|
|
||||||
json.Marshaler
|
json.Marshaler
|
||||||
}
|
}
|
||||||
|
|
||||||
type TaskError struct {
|
// type TaskError struct {
|
||||||
name string
|
// name string
|
||||||
errStr string
|
// errStr string
|
||||||
}
|
// }
|
||||||
|
|
||||||
func (t *TaskError) MarshalJSON() ([]byte, error) {
|
// func (t *TaskError) MarshalJSON() ([]byte, error) {
|
||||||
return json.Marshal(t.Error())
|
// return json.Marshal(t.Error())
|
||||||
}
|
// }
|
||||||
|
|
||||||
func (t *TaskError) Error() string {
|
// func (t *TaskError) Error() string {
|
||||||
return fmt.Sprintf("[%s], 错误:%s", t.name, t.errStr)
|
// return fmt.Sprintf("[%s], 错误:%s", t.name, t.errStr)
|
||||||
}
|
// }
|
||||||
|
|
||||||
func (t *TaskError) String() string {
|
// func (t *TaskError) String() string {
|
||||||
return t.Error()
|
// return t.Error()
|
||||||
}
|
// }
|
||||||
|
|
||||||
func NewTaskError(name string, err error) *TaskError {
|
// func NewTaskError(name string, err error) *TaskError {
|
||||||
return &TaskError{
|
// return &TaskError{
|
||||||
name: name,
|
// name: name,
|
||||||
errStr: err.Error(),
|
// errStr: err.Error(),
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
type BaseTask struct {
|
type BaseTask struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
@@ -108,11 +109,11 @@ type BaseTask struct {
|
|||||||
|
|
||||||
NoticeMsg string `json:"noticeMsg"`
|
NoticeMsg string `json:"noticeMsg"`
|
||||||
|
|
||||||
Result []interface{} `json:"-"`
|
Result []interface{} `json:"-"`
|
||||||
Children TaskList `json:"children"`
|
Children TaskList `json:"children"`
|
||||||
Err error `json:"err"`
|
Err string `json:"err"`
|
||||||
OriginalErr error `json:"-"`
|
|
||||||
|
|
||||||
|
mainErr error
|
||||||
batchErrList []error
|
batchErrList []error
|
||||||
|
|
||||||
finishChan chan struct{}
|
finishChan chan struct{}
|
||||||
@@ -168,7 +169,7 @@ func (t *BaseTask) GetID() string {
|
|||||||
|
|
||||||
func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err error) {
|
func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err error) {
|
||||||
if t.GetStatus() >= TaskStatusEndBegin {
|
if t.GetStatus() >= TaskStatusEndBegin {
|
||||||
return t.getResult(), t.GetOriginalErr()
|
return t.getResult(), t.GetErr()
|
||||||
}
|
}
|
||||||
if duration == 0 {
|
if duration == 0 {
|
||||||
duration = time.Hour * 10000 // duration为0表示无限等待
|
duration = time.Hour * 10000 // duration为0表示无限等待
|
||||||
@@ -178,7 +179,7 @@ func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err
|
|||||||
case <-t.finishChan:
|
case <-t.finishChan:
|
||||||
t.isGetResultCalled = true
|
t.isGetResultCalled = true
|
||||||
timer.Stop()
|
timer.Stop()
|
||||||
return t.getResult(), t.GetOriginalErr()
|
return t.getResult(), t.GetErr()
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
}
|
}
|
||||||
return nil, ErrTaskNotFinished
|
return nil, ErrTaskNotFinished
|
||||||
@@ -282,24 +283,69 @@ func (t *BaseTask) getResult() []interface{} {
|
|||||||
return t.Result
|
return t.Result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// func (t *BaseTask) GetOriginalErr() error {
|
||||||
|
// t.locker.RLock()
|
||||||
|
// defer t.locker.RUnlock()
|
||||||
|
// return nil
|
||||||
|
// }
|
||||||
|
|
||||||
func (t *BaseTask) GetErr() error {
|
func (t *BaseTask) GetErr() error {
|
||||||
t.locker.RLock()
|
t.locker.RLock()
|
||||||
defer t.locker.RUnlock()
|
defer t.locker.RUnlock()
|
||||||
return t.Err
|
if t.mainErr == nil && len(t.batchErrList) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return t
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *BaseTask) GetOriginalErr() error {
|
func (t *BaseTask) GetLeafResult() (finishedItemCount, failedItemCount int) {
|
||||||
t.locker.RLock()
|
if len(t.Children) == 0 {
|
||||||
defer t.locker.RUnlock()
|
return t.FinishedItemCount, t.FailedItemCount
|
||||||
return t.OriginalErr
|
}
|
||||||
|
for _, v := range t.Children {
|
||||||
|
subFinishedItemCount, subFailedItemCount := v.GetLeafResult()
|
||||||
|
finishedItemCount += subFinishedItemCount
|
||||||
|
failedItemCount += subFailedItemCount
|
||||||
|
}
|
||||||
|
return finishedItemCount, failedItemCount
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *BaseTask) GetDetailErrList() []error {
|
func (t *BaseTask) Error() (errMsg string) {
|
||||||
|
if t.mainErr == nil && len(t.batchErrList) == 0 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
t.locker.RLock()
|
t.locker.RLock()
|
||||||
defer t.locker.RUnlock()
|
errMsg = t.Err
|
||||||
return t.batchErrList
|
t.locker.RUnlock()
|
||||||
|
if errMsg != "" {
|
||||||
|
return errMsg
|
||||||
|
}
|
||||||
|
errMsg = "任务:" + t.Name
|
||||||
|
if t.parent == nil {
|
||||||
|
finishedItemCount, failedItemCount := t.GetLeafResult()
|
||||||
|
errMsg += fmt.Sprintf(", 全部总共:%d, 成功:%d, 失败:%d,\n", (finishedItemCount + failedItemCount), finishedItemCount, failedItemCount)
|
||||||
|
}
|
||||||
|
if t.mainErr != nil {
|
||||||
|
errMsg += "失败"
|
||||||
|
errMsg += "," + t.mainErr.Error()
|
||||||
|
} else {
|
||||||
|
errMsg += fmt.Sprintf("部分失败, 总共:%d, 成功:%d, 失败:%d, 详情如下:\n", t.TotalItemCount, t.FinishedItemCount, t.FailedItemCount)
|
||||||
|
for _, v := range t.batchErrList {
|
||||||
|
errMsg += fmt.Sprintf("%s,\n", v.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
t.locker.Lock()
|
||||||
|
t.Err = errMsg
|
||||||
|
t.locker.Unlock()
|
||||||
|
return errMsg
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// func (t *BaseTask) GetDetailErrList() []error {
|
||||||
|
// t.locker.RLock()
|
||||||
|
// defer t.locker.RUnlock()
|
||||||
|
// return t.batchErrList
|
||||||
|
// }
|
||||||
|
|
||||||
func AddChild(parentTask ITask, task ITask) ITask {
|
func AddChild(parentTask ITask, task ITask) ITask {
|
||||||
if parentTask != nil {
|
if parentTask != nil {
|
||||||
return parentTask.AddChild(task)
|
return parentTask.AddChild(task)
|
||||||
@@ -334,6 +380,25 @@ func (t *BaseTask) run(taskHandler func()) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
taskHandler()
|
taskHandler()
|
||||||
|
|
||||||
|
task := t
|
||||||
|
task.locker.Lock()
|
||||||
|
if task.mainErr != nil { // 如果有错误,肯定就是失败了
|
||||||
|
task.Status = TaskStatusFailed
|
||||||
|
} else {
|
||||||
|
if task.FinishedJobCount+task.FailedJobCount < task.TotalJobCount {
|
||||||
|
task.mainErr = ErrTaskIsCanceled
|
||||||
|
task.Status = TaskStatusCanceled
|
||||||
|
} else {
|
||||||
|
task.Status = TaskStatusFinished
|
||||||
|
}
|
||||||
|
}
|
||||||
|
task.TerminatedAt = time.Now()
|
||||||
|
task.locker.Unlock()
|
||||||
|
task.Error()
|
||||||
|
|
||||||
|
globals.SugarLogger.Debugf("Task:%s, result:%v, err:%v", task.Name, task.Result, task.mainErr)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-t.quitChan:
|
case <-t.quitChan:
|
||||||
default:
|
default:
|
||||||
@@ -353,18 +418,14 @@ func (t *BaseTask) run(taskHandler func()) {
|
|||||||
if authInfo, err := t.ctx.GetV2AuthInfo(); err == nil { // 这里应该是不管登录类型,直接以可能的方式发消息
|
if authInfo, err := t.ctx.GetV2AuthInfo(); err == nil { // 这里应该是不管登录类型,直接以可能的方式发消息
|
||||||
var content string
|
var content string
|
||||||
taskDesc := fmt.Sprintf("你的异步任务[%s],ID[%s],开始于:%s,结束于:%s,", t.Name, t.ID, utils.Time2Str(t.CreatedAt), utils.Time2Str(t.TerminatedAt))
|
taskDesc := fmt.Sprintf("你的异步任务[%s],ID[%s],开始于:%s,结束于:%s,", t.Name, t.ID, utils.Time2Str(t.CreatedAt), utils.Time2Str(t.TerminatedAt))
|
||||||
if t.Err == nil {
|
if t.mainErr == nil {
|
||||||
content = fmt.Sprintf("%s执行%s", taskDesc, TaskStatusName[t.Status])
|
content = fmt.Sprintf("%s执行%s", taskDesc, TaskStatusName[t.Status])
|
||||||
noticeMsg := t.GetNoticeMsg()
|
noticeMsg := t.GetNoticeMsg()
|
||||||
if noticeMsg != "" {
|
if noticeMsg != "" {
|
||||||
content += ",通知消息:" + noticeMsg
|
content += ",通知消息:" + noticeMsg
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if t.Status == TaskStatusFinished {
|
content = t.Error()
|
||||||
content = fmt.Sprintf("%s执行部分失败,%s", taskDesc, t.Err.Error())
|
|
||||||
} else {
|
|
||||||
content = fmt.Sprintf("%s执行失败,%s", taskDesc, t.Err.Error())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
msg.SendUserMessage(dingdingapi.MsgTyeText, authInfo.UserID, "异步任务完成", content)
|
msg.SendUserMessage(dingdingapi.MsgTyeText, authInfo.UserID, "异步任务完成", content)
|
||||||
}
|
}
|
||||||
@@ -394,16 +455,16 @@ func (t *BaseTask) setStatus(status int) {
|
|||||||
t.Status = status
|
t.Status = status
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *BaseTask) buildTaskErrFromBatchErrList() (err error) {
|
// func (t *BaseTask) buildTaskErrFromBatchErrList() (err error) {
|
||||||
if len(t.batchErrList) > 0 {
|
// if len(t.batchErrList) > 0 {
|
||||||
strList := make([]string, len(t.batchErrList))
|
// strList := make([]string, len(t.batchErrList))
|
||||||
for k, v := range t.batchErrList {
|
// for k, v := range t.batchErrList {
|
||||||
strList[k] = v.Error()
|
// strList[k] = v.Error()
|
||||||
}
|
// }
|
||||||
return NewTaskError(t.Name, fmt.Errorf("总共:%d, 成功:%d, 失败:%d, 详情:\n%s", t.TotalItemCount, t.FinishedItemCount, t.FailedItemCount, strings.Join(strList, "\n")))
|
// return NewTaskError(t.Name, fmt.Errorf("总共:%d, 成功:%d, 失败:%d, 详情:\n%s", t.TotalItemCount, t.FinishedItemCount, t.FailedItemCount, strings.Join(strList, "\n")))
|
||||||
}
|
// }
|
||||||
return nil
|
// return nil
|
||||||
}
|
// }
|
||||||
|
|
||||||
func (task *BaseTask) callWorker(worker func() (retVal interface{}, err error)) (retVal interface{}, err error) {
|
func (task *BaseTask) callWorker(worker func() (retVal interface{}, err error)) (retVal interface{}, err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
|
|||||||
@@ -1,15 +1,11 @@
|
|||||||
package tasksch
|
package tasksch
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.rosy.net.cn/baseapi/utils"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestTaskError(t *testing.T) {
|
func TestTaskError(t *testing.T) {
|
||||||
err := NewTaskError("test", errors.New("hello"))
|
// err := NewTaskError("test", errors.New("hello"))
|
||||||
fmt.Println(utils.Format4Output(err, false))
|
// fmt.Println(utils.Format4Output(err, false))
|
||||||
fmt.Println(err.Error())
|
// fmt.Println(err.Error())
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user