diff --git a/business/jxutils/tasksch/parallel_task.go b/business/jxutils/tasksch/parallel_task.go index 91eb6a197..3eecc6cae 100644 --- a/business/jxutils/tasksch/parallel_task.go +++ b/business/jxutils/tasksch/parallel_task.go @@ -112,23 +112,26 @@ func (task *ParallelTask) Run() { select { case <-task.quitChan: // 取消 goto end - case job := <-task.taskChan: - if job == nil { // 任务完成 - chanRetVal = retVal - goto end - } else { - result, err := task.worker(task, job, task.params...) - // globals.SugarLogger.Debugf("ParallelTask.Run %s, after call worker result:%v, err:%v", task.Name, result, err) - task.finishedOneJob(len(job), err) - if err == nil { - if result != nil { - retVal = append(retVal, utils.Interface2Slice(result)...) - } + default: + select { + case job := <-task.taskChan: + if job == nil { // 任务完成 + chanRetVal = retVal + goto end } else { - globals.SugarLogger.Infof("ParallelTask.Run %s, subtask(job:%s, params:%s) result:%v, failed with error:%v", task.Name, utils.Format4Output(job, true), utils.Format4Output(task.params, true), result, err) - if !task.IsContinueWhenError { // 出错 - chanRetVal = err - goto end + result, err := task.worker(task, job, task.params...) + // globals.SugarLogger.Debugf("ParallelTask.Run %s, after call worker result:%v, err:%v", task.Name, result, err) + task.finishedOneJob(len(job), err) + if err == nil { + if result != nil { + retVal = append(retVal, utils.Interface2Slice(result)...) + } + } else { + globals.SugarLogger.Infof("ParallelTask.Run %s, subtask(job:%s, params:%s) result:%v, failed with error:%v", task.Name, utils.Format4Output(job, true), utils.Format4Output(task.params, true), result, err) + if !task.IsContinueWhenError { // 出错 + chanRetVal = err + goto end + } } } } @@ -167,6 +170,9 @@ func (task *ParallelTask) Run() { } task.locker.Lock() + if task.Status != TaskStatusCanceling { + close(task.quitChan) + } if taskErr != nil { // 如果有错误,肯定就是失败了 task.Status = TaskStatusFailed } else { @@ -186,7 +192,6 @@ func (task *ParallelTask) Run() { close(task.finishChan) close(task.subFinishChan) - close(task.quitChan) if task.resultHandler != nil { task.resultHandler(task.Name, taskResult, task.Err) diff --git a/business/jxutils/tasksch/parallel_task_test.go b/business/jxutils/tasksch/parallel_task_test.go index 61e0fd828..08ca36274 100644 --- a/business/jxutils/tasksch/parallel_task_test.go +++ b/business/jxutils/tasksch/parallel_task_test.go @@ -1,6 +1,7 @@ package tasksch import ( + "fmt" "math/rand" "testing" "time" @@ -45,7 +46,7 @@ func TestCancelParallelTask(t *testing.T) { } task := RunParallelTask("test", NewParallelConfig().SetParallelCount(100).SetBatchSize(7), "autotest", func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { sleepSecond := rand.Intn(5) - t.Logf("sleep %d seconds", sleepSecond) + fmt.Printf("sleep %d seconds\n", sleepSecond) time.Sleep(time.Duration(sleepSecond) * time.Second) retSlice := make([]string, len(batchItemList)) for k := range retSlice { @@ -54,7 +55,7 @@ func TestCancelParallelTask(t *testing.T) { return retSlice, nil }, itemList, "a", "b", 1, 2) // time.Sleep(time.Second * 6) - t.Logf("finishedItemCount:%d, finishedJobCount:%d", task.GetFinishedItemCount(), task.GetFinishedJobCount()) + fmt.Printf("finishedItemCount:%d, finishedJobCount:%d\n", task.GetFinishedItemCount(), task.GetFinishedJobCount()) task.Cancel() _, err := task.GetResult(0) if err != ErrTaskIsCanceled { diff --git a/business/jxutils/tasksch/sequence_task.go b/business/jxutils/tasksch/sequence_task.go index 65f42364e..15912d9ea 100644 --- a/business/jxutils/tasksch/sequence_task.go +++ b/business/jxutils/tasksch/sequence_task.go @@ -57,6 +57,9 @@ func (task *SeqTask) Run() { } EndFor: task.locker.Lock() + if task.Status != TaskStatusCanceling { + close(task.quitChan) + } if taskErr != nil { // 如果有错误,肯定就是失败了 task.Status = TaskStatusFailed } else { @@ -75,7 +78,6 @@ func (task *SeqTask) Run() { globals.SugarLogger.Debugf("SeqTask.Run %s, result:%v, err:%v", task.Name, taskResult, taskErr) close(task.finishChan) - close(task.quitChan) }) } diff --git a/business/jxutils/tasksch/task.go b/business/jxutils/tasksch/task.go index afedf23fc..3ad2317b2 100644 --- a/business/jxutils/tasksch/task.go +++ b/business/jxutils/tasksch/task.go @@ -101,8 +101,8 @@ func (t *BaseTask) Init(parallelCount, batchSize int, isContinueWhenError bool, t.TerminatedAt = utils.DefaultTimeValue t.TotalItemCount = totalItemCount t.TotalJobCount = totalJobCount - t.quitChan = make(chan int, parallelCount) - t.finishChan = make(chan int, 2) + t.quitChan = make(chan int, 1) + t.finishChan = make(chan int, 1) t.Status = TaskStatusWorking t.C = t.finishChan @@ -143,9 +143,7 @@ func (t *BaseTask) Cancel() { t.locker.Lock() if t.Status < TaskStatusEndBegin && t.Status != TaskStatusCanceling { t.Status = TaskStatusCanceling - for i := 0; i < t.ParallelCount; i++ { - t.quitChan <- 0 - } + close(t.quitChan) } t.locker.Unlock()