From 402e60ab50eb5e8da1efdc72271ecf79c119a3e2 Mon Sep 17 00:00:00 2001 From: gazebo Date: Thu, 1 Aug 2019 20:07:39 +0800 Subject: [PATCH] =?UTF-8?q?-=20=E6=95=B4=E7=90=86tasksch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- business/jxutils/tasksch/parallel_task.go | 21 ++++++++++----------- business/jxutils/tasksch/sequence_task.go | 15 +++++++++------ business/jxutils/tasksch/task.go | 4 ++-- 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/business/jxutils/tasksch/parallel_task.go b/business/jxutils/tasksch/parallel_task.go index bc59b1a5e..067176391 100644 --- a/business/jxutils/tasksch/parallel_task.go +++ b/business/jxutils/tasksch/parallel_task.go @@ -118,19 +118,18 @@ func (task *ParallelTask) Run() { }) // globals.SugarLogger.Debugf("ParallelTask.Run %s, after call worker result:%v, err:%v", task.Name, result, err) task.finishedOneJob(len(job), err) - if err == nil { - if result != nil { - retVal = append(retVal, utils.Interface2Slice(result)...) - } - } else { - globals.SugarLogger.Infof("ParallelTask.Run %s, subtask(job:%s, params:%s) result:%v, failed with error:%v", task.Name, utils.Format4Output(job, true), utils.Format4Output(task.params, true), result, err) - task.locker.Lock() - task.batchErrList = append(task.batchErrList, err) - task.locker.Unlock() - if !task.IsContinueWhenError { // 出错 + if err != nil { // 出错 + // globals.SugarLogger.Infof("ParallelTask.Run %s, subtask(job:%s, params:%s) result:%v, failed with error:%v", task.Name, utils.Format4Output(job, true), utils.Format4Output(task.params, true), result, err) + if task.IsContinueWhenError { + task.locker.Lock() + task.batchErrList = append(task.batchErrList, err) + task.locker.Unlock() + } else { chanRetVal = err goto end } + } else if result != nil { + retVal = append(retVal, utils.Interface2Slice(result)...) } } } @@ -165,6 +164,7 @@ func (task *ParallelTask) Run() { taskResult = append(taskResult, resultList...) } } + task.locker.Lock() task.Result = taskResult task.mainErr = taskErr @@ -172,7 +172,6 @@ func (task *ParallelTask) Run() { close(task.subFinishChan) task.jobList = nil // 如果不释放,任务被管理的话,会导致内存不能释放 - }) } diff --git a/business/jxutils/tasksch/sequence_task.go b/business/jxutils/tasksch/sequence_task.go index 6c25b26d5..f606d8ffe 100644 --- a/business/jxutils/tasksch/sequence_task.go +++ b/business/jxutils/tasksch/sequence_task.go @@ -41,12 +41,14 @@ func (task *SeqTask) Run() { return task.worker(task, i, task.params...) }) task.finishedOneJob(1, err) - if taskErr = err; taskErr != nil { - task.locker.Lock() - task.batchErrList = append(task.batchErrList, err) - task.locker.Unlock() - globals.SugarLogger.Infof("SeqTask.Run %s step:%d failed with error:%v", task.Name, i, err) - if !task.IsContinueWhenError { + if err != nil { + // globals.SugarLogger.Infof("SeqTask.Run %s step:%d failed with error:%v", task.Name, i, err) + if task.IsContinueWhenError { + task.locker.Lock() + task.batchErrList = append(task.batchErrList, err) + task.locker.Unlock() + } else { + taskErr = err break } } else if result != nil { @@ -54,6 +56,7 @@ func (task *SeqTask) Run() { } } EndFor: + task.locker.Lock() task.Result = taskResult task.mainErr = taskErr diff --git a/business/jxutils/tasksch/task.go b/business/jxutils/tasksch/task.go index 8363ab9d5..80070a1fc 100644 --- a/business/jxutils/tasksch/task.go +++ b/business/jxutils/tasksch/task.go @@ -311,7 +311,7 @@ func (t *BaseTask) GetLeafResult() (finishedItemCount, failedItemCount int) { } func (t *BaseTask) Error() (errMsg string) { - if t.mainErr == nil && len(t.batchErrList) == 0 { + if t.GetErr() == nil { return "" } t.locker.RLock() @@ -397,7 +397,7 @@ func (t *BaseTask) run(taskHandler func()) { task.locker.Unlock() task.Error() - globals.SugarLogger.Debugf("Task:%s, result:%v, err:%v", task.Name, task.Result, task.mainErr) + globals.SugarLogger.Debugf("Task:%s, mainErr:%v, batchErrList:%v", task.Name, task.mainErr, task.batchErrList) select { case <-t.quitChan: