diff --git a/business/jxutils/tasksch/parallel_task.go b/business/jxutils/tasksch/parallel_task.go index be1e48a5d..e263d88fd 100644 --- a/business/jxutils/tasksch/parallel_task.go +++ b/business/jxutils/tasksch/parallel_task.go @@ -127,7 +127,7 @@ func (task *ParallelTask) Run() { goto end } task.locker.Lock() - task.DetailErrList = append(task.DetailErrList, NewTaskError(err)) + task.detailErrMsgList = append(task.detailErrMsgList, err.Error()) task.locker.Unlock() } } @@ -177,6 +177,8 @@ func (task *ParallelTask) Run() { } if taskErr != nil { task.Err = NewTaskError(taskErr) + } else { + task.Err = task.buildTaskErrFromDetail() } task.Result = taskResult task.TerminatedAt = time.Now() diff --git a/business/jxutils/tasksch/parallel_task_test.go b/business/jxutils/tasksch/parallel_task_test.go index 45223a48e..01b332c18 100644 --- a/business/jxutils/tasksch/parallel_task_test.go +++ b/business/jxutils/tasksch/parallel_task_test.go @@ -65,3 +65,38 @@ func TestCancelParallelTask(t *testing.T) { } // t.Log(utils.Format4Output(result, false)) } + +func TestRunParallelTaskPartialSuccess(t *testing.T) { + itemList := make([]int, 100) + for k := range itemList { + itemList[k] = k + } + task := NewParallelTask("test", NewParallelConfig().SetParallelCount(100).SetBatchSize(7).SetIsContinueWhenError(true), "autotest", func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + sleepSecond := rand.Intn(5) + t.Logf("sleep %d seconds", sleepSecond) + time.Sleep(time.Duration(sleepSecond) * time.Second) + retSlice := make([]string, len(batchItemList)) + for k := range retSlice { + retSlice[k] = "hello:" + utils.Int2Str(batchItemList[k].(int)*2) + } + if rand.Intn(2) == 1 { + return nil, fmt.Errorf("test:%d", batchItemList[0].(int)) + } + return retSlice, nil + }, itemList, "a", "b", 1, 2) + task.Run() + result, err := task.GetResult(1 * time.Microsecond) + if err == nil || task.GetStatus() != TaskStatusWorking { + t.Fatal("task can not be done in 1 microsecond") + } + result, err = task.GetResult(0) + if err == nil { + t.Fatal("不应该全部成功") + } + fmt.Println(err) + if len(result) == len(itemList) { + t.Log(utils.Format4Output(result, false)) + t.Fatal("不应该全部成功") + } + t.Log(task.GetStatus()) +} diff --git a/business/jxutils/tasksch/sequence_task.go b/business/jxutils/tasksch/sequence_task.go index 46ab279c6..45aefe921 100644 --- a/business/jxutils/tasksch/sequence_task.go +++ b/business/jxutils/tasksch/sequence_task.go @@ -46,7 +46,7 @@ func (task *SeqTask) Run() { break } task.locker.Lock() - task.DetailErrList = append(task.DetailErrList, NewTaskError(err)) + task.detailErrMsgList = append(task.detailErrMsgList, err.Error()) task.locker.Unlock() } else if result != nil { taskResult = append(taskResult, utils.Interface2Slice(result)...) @@ -66,6 +66,8 @@ func (task *SeqTask) Run() { } if taskErr != nil { task.Err = NewTaskError(taskErr) + } else { + task.Err = task.buildTaskErrFromDetail() } task.Result = taskResult task.TerminatedAt = time.Now() diff --git a/business/jxutils/tasksch/task.go b/business/jxutils/tasksch/task.go index 6faaf98db..7ae09d9c2 100644 --- a/business/jxutils/tasksch/task.go +++ b/business/jxutils/tasksch/task.go @@ -3,6 +3,7 @@ package tasksch import ( "encoding/json" "fmt" + "strings" "sync" "time" @@ -77,14 +78,13 @@ type BaseTask struct { Result []interface{} `json:"result"` Children TaskList `json:"children"` + Err error `json:"err"` - Err error `json:"err"` - DetailErrList []error `json:"detailErrList"` - - finishChan chan int - C <-chan int `json:"-"` - params []interface{} - quitChan chan int + detailErrMsgList []string `json:"-"` + finishChan chan int + C <-chan int `json:"-"` + params []interface{} + quitChan chan int locker sync.RWMutex parent ITask @@ -139,9 +139,6 @@ func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err select { case <-t.finishChan: timer.Stop() - t.locker.RLock() - defer t.locker.RUnlock() - return t.Result, t.Err case <-timer.C: } @@ -263,6 +260,7 @@ func (t *BaseTask) run(taskHandler func()) { globals.SugarLogger.Infof("BaseTask run, failed with error:%v", err) } } + close(t.finishChan) }) } @@ -288,3 +286,10 @@ func (t *BaseTask) setStatus(status int) { t.Status = status } + +func (t *BaseTask) buildTaskErrFromDetail() (err error) { + if len(t.detailErrMsgList) > 0 { + return NewTaskError(fmt.Errorf("设置了错误继续标志,部分操作失败,总任务数:%d,失败数:%d,以下为详情:\n%s", t.TotalItemCount, t.FailedItemCount, strings.Join(t.detailErrMsgList, "\n"))) + } + return nil +}