From 852fff6eff0e1a0f69e0ed2fb3ba7b8b49bdcbc9 Mon Sep 17 00:00:00 2001 From: gazebo Date: Thu, 1 Aug 2019 16:20:52 +0800 Subject: [PATCH] =?UTF-8?q?-=20=E9=87=8D=E6=9E=84tasksch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- business/jxstore/cms/sync.go | 27 ++-- business/jxutils/tasksch/parallel_task.go | 23 +-- business/jxutils/tasksch/sequence_task.go | 24 +--- business/jxutils/tasksch/task.go | 165 +++++++++++++++------- business/jxutils/tasksch/task_test.go | 10 +- 5 files changed, 131 insertions(+), 118 deletions(-) diff --git a/business/jxstore/cms/sync.go b/business/jxstore/cms/sync.go index 809a56fc2..ee11c0c48 100644 --- a/business/jxstore/cms/sync.go +++ b/business/jxstore/cms/sync.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "reflect" - "strings" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxutils" @@ -421,19 +420,19 @@ func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendo return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID) }, isContinueWhenError) if task != nil { - if vendorErr := partner.IsErrChangePriceFailed(task.GetOriginalErr()); vendorErr != nil { - platformList := make([]string, len(task.GetDetailErrList())) - for k, v := range task.GetDetailErrList() { - if vendorErr := partner.IsErrVendorError(v); vendorErr != nil { - platformList[k] = model.VendorChineseNames[vendorErr.VendorID()] - } else { - platformList[k] = "未知" - } - } - err = fmt.Errorf("同步价格失败\n失败平台:%s", strings.Join(platformList, ",")) - } else { - err = makeSyncError(err) - } + // if vendorErr := partner.IsErrChangePriceFailed(task.GetOriginalErr()); vendorErr != nil { + // platformList := make([]string, len(task.GetDetailErrList())) + // for k, v := range task.GetDetailErrList() { + // if vendorErr := partner.IsErrVendorError(v); vendorErr != nil { + // platformList[k] = model.VendorChineseNames[vendorErr.VendorID()] + // } else { + // platformList[k] = "未知" + // } + // } + // err = fmt.Errorf("同步价格失败\n失败平台:%s", strings.Join(platformList, ",")) + // } else { + // } + err = makeSyncError(err) } return hint, err } diff --git a/business/jxutils/tasksch/parallel_task.go b/business/jxutils/tasksch/parallel_task.go index a0f2b2b6a..bc59b1a5e 100644 --- a/business/jxutils/tasksch/parallel_task.go +++ b/business/jxutils/tasksch/parallel_task.go @@ -2,7 +2,6 @@ package tasksch import ( "errors" - "time" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxutils" @@ -167,29 +166,9 @@ func (task *ParallelTask) Run() { } } task.locker.Lock() - if taskErr != nil { // 如果有错误,肯定就是失败了 - task.Status = TaskStatusFailed - } else { - if task.FinishedJobCount+task.FailedJobCount < task.TotalJobCount { - taskErr = ErrTaskIsCanceled - task.Status = TaskStatusCanceled - } else { - task.Status = TaskStatusFinished - } - } - if taskErr != nil { - task.OriginalErr = taskErr - task.Err = NewTaskError(task.Name, taskErr) - } else { - if len(task.batchErrList) > 0 { - task.OriginalErr = task.batchErrList[0] - } - task.Err = task.buildTaskErrFromBatchErrList() - } task.Result = taskResult - task.TerminatedAt = time.Now() + task.mainErr = taskErr task.locker.Unlock() - globals.SugarLogger.Debugf("ParallelTask.Run %s, err:%v", task.Name, task.Err) close(task.subFinishChan) task.jobList = nil // 如果不释放,任务被管理的话,会导致内存不能释放 diff --git a/business/jxutils/tasksch/sequence_task.go b/business/jxutils/tasksch/sequence_task.go index 764594422..6c25b26d5 100644 --- a/business/jxutils/tasksch/sequence_task.go +++ b/business/jxutils/tasksch/sequence_task.go @@ -1,8 +1,6 @@ package tasksch import ( - "time" - "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" "git.rosy.net.cn/jx-callback/globals" @@ -57,29 +55,9 @@ func (task *SeqTask) Run() { } EndFor: task.locker.Lock() - if taskErr != nil { // 如果有错误,肯定就是失败了 - task.Status = TaskStatusFailed - } else { - if task.FinishedJobCount+task.FailedJobCount < task.TotalJobCount { - taskErr = ErrTaskIsCanceled - task.Status = TaskStatusCanceled - } else { - task.Status = TaskStatusFinished - } - } - if taskErr != nil { - task.OriginalErr = taskErr - task.Err = NewTaskError(task.Name, taskErr) - } else { - if len(task.batchErrList) > 0 { - task.OriginalErr = task.batchErrList[0] - } - task.Err = task.buildTaskErrFromBatchErrList() - } task.Result = taskResult - task.TerminatedAt = time.Now() + task.mainErr = taskErr task.locker.Unlock() - globals.SugarLogger.Debugf("SeqTask.Run %s, result:%v, err:%v", task.Name, taskResult, task.Err) }) } diff --git a/business/jxutils/tasksch/task.go b/business/jxutils/tasksch/task.go index 67cb58fc9..8363ab9d5 100644 --- a/business/jxutils/tasksch/task.go +++ b/business/jxutils/tasksch/task.go @@ -3,7 +3,6 @@ package tasksch import ( "encoding/json" "fmt" - "strings" "sync" "time" @@ -58,35 +57,37 @@ type ITask interface { AddChild(task ITask) ITask GetChildren() TaskList SetParent(parentTask ITask) - GetOriginalErr() error - GetDetailErrList() []error + // GetOriginalErr() error + GetErr() error + // GetDetailErrList() []error + GetLeafResult() (finishedItemCount, failedItemCount int) json.Marshaler } -type TaskError struct { - name string - errStr string -} +// type TaskError struct { +// name string +// errStr string +// } -func (t *TaskError) MarshalJSON() ([]byte, error) { - return json.Marshal(t.Error()) -} +// func (t *TaskError) MarshalJSON() ([]byte, error) { +// return json.Marshal(t.Error()) +// } -func (t *TaskError) Error() string { - return fmt.Sprintf("[%s], 错误:%s", t.name, t.errStr) -} +// func (t *TaskError) Error() string { +// return fmt.Sprintf("[%s], 错误:%s", t.name, t.errStr) +// } -func (t *TaskError) String() string { - return t.Error() -} +// func (t *TaskError) String() string { +// return t.Error() +// } -func NewTaskError(name string, err error) *TaskError { - return &TaskError{ - name: name, - errStr: err.Error(), - } -} +// func NewTaskError(name string, err error) *TaskError { +// return &TaskError{ +// name: name, +// errStr: err.Error(), +// } +// } type BaseTask struct { Name string `json:"name"` @@ -108,11 +109,11 @@ type BaseTask struct { NoticeMsg string `json:"noticeMsg"` - Result []interface{} `json:"-"` - Children TaskList `json:"children"` - Err error `json:"err"` - OriginalErr error `json:"-"` + Result []interface{} `json:"-"` + Children TaskList `json:"children"` + Err string `json:"err"` + mainErr error batchErrList []error finishChan chan struct{} @@ -168,7 +169,7 @@ func (t *BaseTask) GetID() string { func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err error) { if t.GetStatus() >= TaskStatusEndBegin { - return t.getResult(), t.GetOriginalErr() + return t.getResult(), t.GetErr() } if duration == 0 { duration = time.Hour * 10000 // duration为0表示无限等待 @@ -178,7 +179,7 @@ func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err case <-t.finishChan: t.isGetResultCalled = true timer.Stop() - return t.getResult(), t.GetOriginalErr() + return t.getResult(), t.GetErr() case <-timer.C: } return nil, ErrTaskNotFinished @@ -282,24 +283,69 @@ func (t *BaseTask) getResult() []interface{} { return t.Result } +// func (t *BaseTask) GetOriginalErr() error { +// t.locker.RLock() +// defer t.locker.RUnlock() +// return nil +// } + func (t *BaseTask) GetErr() error { t.locker.RLock() defer t.locker.RUnlock() - return t.Err + if t.mainErr == nil && len(t.batchErrList) == 0 { + return nil + } + return t } -func (t *BaseTask) GetOriginalErr() error { - t.locker.RLock() - defer t.locker.RUnlock() - return t.OriginalErr +func (t *BaseTask) GetLeafResult() (finishedItemCount, failedItemCount int) { + if len(t.Children) == 0 { + return t.FinishedItemCount, t.FailedItemCount + } + for _, v := range t.Children { + subFinishedItemCount, subFailedItemCount := v.GetLeafResult() + finishedItemCount += subFinishedItemCount + failedItemCount += subFailedItemCount + } + return finishedItemCount, failedItemCount } -func (t *BaseTask) GetDetailErrList() []error { +func (t *BaseTask) Error() (errMsg string) { + if t.mainErr == nil && len(t.batchErrList) == 0 { + return "" + } t.locker.RLock() - defer t.locker.RUnlock() - return t.batchErrList + errMsg = t.Err + t.locker.RUnlock() + if errMsg != "" { + return errMsg + } + errMsg = "任务:" + t.Name + if t.parent == nil { + finishedItemCount, failedItemCount := t.GetLeafResult() + errMsg += fmt.Sprintf(", 全部总共:%d, 成功:%d, 失败:%d,\n", (finishedItemCount + failedItemCount), finishedItemCount, failedItemCount) + } + if t.mainErr != nil { + errMsg += "失败" + errMsg += "," + t.mainErr.Error() + } else { + errMsg += fmt.Sprintf("部分失败, 总共:%d, 成功:%d, 失败:%d, 详情如下:\n", t.TotalItemCount, t.FinishedItemCount, t.FailedItemCount) + for _, v := range t.batchErrList { + errMsg += fmt.Sprintf("%s,\n", v.Error()) + } + } + t.locker.Lock() + t.Err = errMsg + t.locker.Unlock() + return errMsg } +// func (t *BaseTask) GetDetailErrList() []error { +// t.locker.RLock() +// defer t.locker.RUnlock() +// return t.batchErrList +// } + func AddChild(parentTask ITask, task ITask) ITask { if parentTask != nil { return parentTask.AddChild(task) @@ -334,6 +380,25 @@ func (t *BaseTask) run(taskHandler func()) { }() taskHandler() + + task := t + task.locker.Lock() + if task.mainErr != nil { // 如果有错误,肯定就是失败了 + task.Status = TaskStatusFailed + } else { + if task.FinishedJobCount+task.FailedJobCount < task.TotalJobCount { + task.mainErr = ErrTaskIsCanceled + task.Status = TaskStatusCanceled + } else { + task.Status = TaskStatusFinished + } + } + task.TerminatedAt = time.Now() + task.locker.Unlock() + task.Error() + + globals.SugarLogger.Debugf("Task:%s, result:%v, err:%v", task.Name, task.Result, task.mainErr) + select { case <-t.quitChan: default: @@ -353,18 +418,14 @@ func (t *BaseTask) run(taskHandler func()) { if authInfo, err := t.ctx.GetV2AuthInfo(); err == nil { // 这里应该是不管登录类型,直接以可能的方式发消息 var content string taskDesc := fmt.Sprintf("你的异步任务[%s],ID[%s],开始于:%s,结束于:%s,", t.Name, t.ID, utils.Time2Str(t.CreatedAt), utils.Time2Str(t.TerminatedAt)) - if t.Err == nil { + if t.mainErr == nil { content = fmt.Sprintf("%s执行%s", taskDesc, TaskStatusName[t.Status]) noticeMsg := t.GetNoticeMsg() if noticeMsg != "" { content += ",通知消息:" + noticeMsg } } else { - if t.Status == TaskStatusFinished { - content = fmt.Sprintf("%s执行部分失败,%s", taskDesc, t.Err.Error()) - } else { - content = fmt.Sprintf("%s执行失败,%s", taskDesc, t.Err.Error()) - } + content = t.Error() } msg.SendUserMessage(dingdingapi.MsgTyeText, authInfo.UserID, "异步任务完成", content) } @@ -394,16 +455,16 @@ func (t *BaseTask) setStatus(status int) { t.Status = status } -func (t *BaseTask) buildTaskErrFromBatchErrList() (err error) { - if len(t.batchErrList) > 0 { - strList := make([]string, len(t.batchErrList)) - for k, v := range t.batchErrList { - strList[k] = v.Error() - } - return NewTaskError(t.Name, fmt.Errorf("总共:%d, 成功:%d, 失败:%d, 详情:\n%s", t.TotalItemCount, t.FinishedItemCount, t.FailedItemCount, strings.Join(strList, "\n"))) - } - return nil -} +// func (t *BaseTask) buildTaskErrFromBatchErrList() (err error) { +// if len(t.batchErrList) > 0 { +// strList := make([]string, len(t.batchErrList)) +// for k, v := range t.batchErrList { +// strList[k] = v.Error() +// } +// return NewTaskError(t.Name, fmt.Errorf("总共:%d, 成功:%d, 失败:%d, 详情:\n%s", t.TotalItemCount, t.FinishedItemCount, t.FailedItemCount, strings.Join(strList, "\n"))) +// } +// return nil +// } func (task *BaseTask) callWorker(worker func() (retVal interface{}, err error)) (retVal interface{}, err error) { defer func() { diff --git a/business/jxutils/tasksch/task_test.go b/business/jxutils/tasksch/task_test.go index 788716526..8b77e5b4b 100644 --- a/business/jxutils/tasksch/task_test.go +++ b/business/jxutils/tasksch/task_test.go @@ -1,15 +1,11 @@ package tasksch import ( - "errors" - "fmt" "testing" - - "git.rosy.net.cn/baseapi/utils" ) func TestTaskError(t *testing.T) { - err := NewTaskError("test", errors.New("hello")) - fmt.Println(utils.Format4Output(err, false)) - fmt.Println(err.Error()) + // err := NewTaskError("test", errors.New("hello")) + // fmt.Println(utils.Format4Output(err, false)) + // fmt.Println(err.Error()) }