同步错误返回

This commit is contained in:
苏尹岚
2019-12-02 15:42:01 +08:00
parent 2836da8c94
commit 7ec9a99cfb
2 changed files with 33 additions and 28 deletions

View File

@@ -540,11 +540,9 @@ func (v *VendorSync) LoopStoresMap2(ctx *jxcontext.Context, db *dao.DaoDB, taskN
taskName = fmt.Sprintf("%s,处理平台%s", taskName, model.VendorChineseNames[loopInfoList[0].VendorID]) taskName = fmt.Sprintf("%s,处理平台%s", taskName, model.VendorChineseNames[loopInfoList[0].VendorID])
} }
task = tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, loopInfoList) task = tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, loopInfoList)
task.SetFinishHook(func(task tasksch.ITask) { task.SetFinishHook(task)
fmt.Println("test2") failedList := task.GetErrMsg()
failedList := task.GetErrMsg()
fmt.Println(failedList)
})
tasksch.HandleTask(task, nil, isManageIt).Run() tasksch.HandleTask(task, nil, isManageIt).Run()
if !isAsync { if !isAsync {
resultList, err2 := task.GetResult(0) resultList, err2 := task.GetResult(0)

View File

@@ -65,8 +65,8 @@ type ITask interface {
AddBatchErr(err error) AddBatchErr(err error)
AddErrMsg(failedList ...interface{}) AddErrMsg(failedList ...interface{})
GetErrMsg() (failedList []interface{}) GetErrMsg() (failedList []interface{})
SetFinishHook(func(task ITask)) SetFinishHook(task *ParallelTask)
GetFinishHook() func(task ITask) GetFinishHook() *ParallelTask
json.Marshaler json.Marshaler
} }
@@ -132,7 +132,7 @@ type BaseTask struct {
ctx *jxcontext.Context ctx *jxcontext.Context
isGetResultCalled bool isGetResultCalled bool
FailedList []interface{} FailedList []interface{}
FinishHook func(task ITask) finishHook *ParallelTask
} }
func (s TaskList) Len() int { func (s TaskList) Len() int {
@@ -230,10 +230,6 @@ func (t *BaseTask) GetTotalJobCount() int {
return t.TotalJobCount return t.TotalJobCount
} }
func (t *BaseTask) GetFinishHook() func(ITask) {
return t.FinishHook
}
func (t *BaseTask) GetFinishedJobCount() int { func (t *BaseTask) GetFinishedJobCount() int {
t.locker.RLock() t.locker.RLock()
defer t.locker.RUnlock() defer t.locker.RUnlock()
@@ -364,8 +360,14 @@ func (t *BaseTask) Error() (errMsg string) {
return errMsg return errMsg
} }
func (t *BaseTask) SetFinishHook(hook func(task ITask)) { func (t *BaseTask) SetFinishHook(task *ParallelTask) {
t.FinishHook = hook t.locker.RLock()
defer t.locker.RUnlock()
t.finishHook = task
}
func (t *BaseTask) GetFinishHook() *ParallelTask {
return t.finishHook
} }
func (t *BaseTask) GetErrMsg() (failedList []interface{}) { func (t *BaseTask) GetErrMsg() (failedList []interface{}) {
@@ -425,7 +427,8 @@ func (t *BaseTask) run(taskHandler func()) {
utils.CallFuncAsync(func() { utils.CallFuncAsync(func() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
globals.SugarLogger.Errorf("panic in BaseTask.run task:%s, task detail:%s, r:%v", t.Name, utils.Format4Output(t, false), r) // globals.SugarLogger.Errorf("panic in BaseTask.run task:%s, task detail:%s, r:%v", t.Name, utils.Format4Output(t, false), r)
globals.SugarLogger.Errorf("panic in BaseTask.run task:%s, task detail:%s, r:%v", t.Name, "", r)
} }
}() }()
@@ -463,23 +466,27 @@ func (t *BaseTask) run(taskHandler func()) {
close(t.finishChan) close(t.finishChan)
time.Sleep(10 * time.Millisecond) // 等待GetResult中的isGetResultCalled赋值 time.Sleep(10 * time.Millisecond) // 等待GetResult中的isGetResultCalled赋值
globals.SugarLogger.Debugf("BaseTask task ID:%s, name:%s finished, isGetResultCalled:%t", t.ID, t.Name, t.isGetResultCalled) globals.SugarLogger.Debugf("BaseTask task ID:%s, name:%s finished, isGetResultCalled:%t", t.ID, t.Name, t.isGetResultCalled)
if !t.isGetResultCalled && t.parent == nil && len(GetTasks(t.ID, TaskStatusBegin, TaskStatusEnd, 24, "")) > 0 { p := t.GetFinishHook()
if authInfo, err := t.ctx.GetV2AuthInfo(); err == nil { // 这里应该是不管登录类型,直接以可能的方式发消息 if p != nil {
var content string if !t.isGetResultCalled && t.parent == nil && len(GetTasks(t.ID, TaskStatusBegin, TaskStatusEnd, 24, "")) > 0 {
taskDesc := fmt.Sprintf("你的异步任务[%s],ID[%s],开始于:%s,结束于:%s,", t.Name, t.ID, utils.Time2Str(t.CreatedAt), utils.Time2Str(t.TerminatedAt)) if authInfo, err := t.ctx.GetV2AuthInfo(); err == nil { // 这里应该是不管登录类型,直接以可能的方式发消息
content = fmt.Sprintf("%s执行%s", taskDesc, TaskStatusName[t.Status]) var content string
if t.Error() == "" { taskDesc := fmt.Sprintf("你的异步任务[%s],ID[%s],开始于:%s,结束于:%s,", t.Name, t.ID, utils.Time2Str(t.CreatedAt), utils.Time2Str(t.TerminatedAt))
noticeMsg := t.GetNoticeMsg() content = fmt.Sprintf("%s执行%s", taskDesc, TaskStatusName[t.Status])
if noticeMsg != "" { if t.Error() == "" {
content += ",通知消息:" + noticeMsg noticeMsg := t.GetNoticeMsg()
if noticeMsg != "" {
content += ",通知消息:" + noticeMsg
}
} else {
content += ",\n" + t.Error()
} }
} else { ddmsg.SendUserMessage(dingdingapi.MsgTyeText, authInfo.UserID, "异步任务完成", content)
content += ",\n" + t.Error()
} }
ddmsg.SendUserMessage(dingdingapi.MsgTyeText, authInfo.UserID, "异步任务完成", content)
} }
} else {
} }
t.GetFinishHook()
}) })
} }
} }