From a31ba8f7309a41ca0b84a6363a1ffdf586ebb705 Mon Sep 17 00:00:00 2001 From: gazebo Date: Thu, 1 Aug 2019 14:29:22 +0800 Subject: [PATCH] =?UTF-8?q?-=20=E6=95=B4=E7=90=86tasksch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- business/jxutils/tasksch/parallel_task.go | 28 ++++++-------------- business/jxutils/tasksch/sequence_task.go | 10 ++++---- business/jxutils/tasksch/task.go | 31 ++++++++++++++++------- 3 files changed, 35 insertions(+), 34 deletions(-) diff --git a/business/jxutils/tasksch/parallel_task.go b/business/jxutils/tasksch/parallel_task.go index 626df2e3e..a0f2b2b6a 100644 --- a/business/jxutils/tasksch/parallel_task.go +++ b/business/jxutils/tasksch/parallel_task.go @@ -24,13 +24,11 @@ type ParallelConfig struct { ParallelCount int BatchSize int IsContinueWhenError bool - ResultHandler ResultHandlerFunc } type ParallelTask struct { BaseTask - resultHandler ResultHandlerFunc worker WorkFunc jobList [][]interface{} taskChan chan []interface{} @@ -49,7 +47,6 @@ func NewParallelConfig() *ParallelConfig { IsContinueWhenError: false, ParallelCount: DefParallelCount, BatchSize: 1, - ResultHandler: nil, } } @@ -73,11 +70,6 @@ func (c *ParallelConfig) SetIsContinueWhenError(isContinueWhenError bool) *Paral // return c // } -func (c *ParallelConfig) SetResultHandler(resultHandler ResultHandlerFunc) *ParallelConfig { - c.ResultHandler = resultHandler - return c -} - func NewParallelTask(taskName string, config *ParallelConfig, ctx *jxcontext.Context, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask { if config == nil { config = NewParallelConfig() @@ -97,7 +89,6 @@ func NewParallelTask(taskName string, config *ParallelConfig, ctx *jxcontext.Con task := &ParallelTask{ subFinishChan: make(chan interface{}, config.ParallelCount), taskChan: make(chan []interface{}, len(realItemList)), - resultHandler: config.ResultHandler, worker: worker, jobList: jobList, } @@ -135,7 +126,7 @@ func (task *ParallelTask) Run() { } else { globals.SugarLogger.Infof("ParallelTask.Run %s, subtask(job:%s, params:%s) result:%v, failed with error:%v", task.Name, utils.Format4Output(job, true), utils.Format4Output(task.params, true), result, err) task.locker.Lock() - task.detailErrList = append(task.detailErrList, err) + task.batchErrList = append(task.batchErrList, err) task.locker.Unlock() if !task.IsContinueWhenError { // 出错 chanRetVal = err @@ -175,12 +166,11 @@ func (task *ParallelTask) Run() { taskResult = append(taskResult, resultList...) } } - task.locker.Lock() if taskErr != nil { // 如果有错误,肯定就是失败了 task.Status = TaskStatusFailed } else { - if len(task.taskChan) > 0 { + if task.FinishedJobCount+task.FailedJobCount < task.TotalJobCount { taskErr = ErrTaskIsCanceled task.Status = TaskStatusCanceled } else { @@ -191,21 +181,19 @@ func (task *ParallelTask) Run() { task.OriginalErr = taskErr task.Err = NewTaskError(task.Name, taskErr) } else { - if len(task.detailErrList) > 0 { - task.OriginalErr = task.detailErrList[0] + if len(task.batchErrList) > 0 { + task.OriginalErr = task.batchErrList[0] } - task.Err = task.buildTaskErrFromDetail() + task.Err = task.buildTaskErrFromBatchErrList() } task.Result = taskResult task.TerminatedAt = time.Now() - task.jobList = nil // 如果不释放,任务被管理的话,会导致内存不能释放 task.locker.Unlock() globals.SugarLogger.Debugf("ParallelTask.Run %s, err:%v", task.Name, task.Err) - close(task.subFinishChan) - if task.resultHandler != nil { - task.resultHandler(task.Name, taskResult, task.Err) - } + close(task.subFinishChan) + task.jobList = nil // 如果不释放,任务被管理的话,会导致内存不能释放 + }) } diff --git a/business/jxutils/tasksch/sequence_task.go b/business/jxutils/tasksch/sequence_task.go index 5ec16d942..764594422 100644 --- a/business/jxutils/tasksch/sequence_task.go +++ b/business/jxutils/tasksch/sequence_task.go @@ -45,7 +45,7 @@ func (task *SeqTask) Run() { task.finishedOneJob(1, err) if taskErr = err; taskErr != nil { task.locker.Lock() - task.detailErrList = append(task.detailErrList, err) + task.batchErrList = append(task.batchErrList, err) task.locker.Unlock() globals.SugarLogger.Infof("SeqTask.Run %s step:%d failed with error:%v", task.Name, i, err) if !task.IsContinueWhenError { @@ -60,7 +60,7 @@ func (task *SeqTask) Run() { if taskErr != nil { // 如果有错误,肯定就是失败了 task.Status = TaskStatusFailed } else { - if task.FinishedJobCount < task.TotalJobCount { + if task.FinishedJobCount+task.FailedJobCount < task.TotalJobCount { taskErr = ErrTaskIsCanceled task.Status = TaskStatusCanceled } else { @@ -71,10 +71,10 @@ func (task *SeqTask) Run() { task.OriginalErr = taskErr task.Err = NewTaskError(task.Name, taskErr) } else { - if len(task.detailErrList) > 0 { - task.OriginalErr = task.detailErrList[0] + if len(task.batchErrList) > 0 { + task.OriginalErr = task.batchErrList[0] } - task.Err = task.buildTaskErrFromDetail() + task.Err = task.buildTaskErrFromBatchErrList() } task.Result = taskResult task.TerminatedAt = time.Now() diff --git a/business/jxutils/tasksch/task.go b/business/jxutils/tasksch/task.go index d418d2df9..67cb58fc9 100644 --- a/business/jxutils/tasksch/task.go +++ b/business/jxutils/tasksch/task.go @@ -113,7 +113,7 @@ type BaseTask struct { Err error `json:"err"` OriginalErr error `json:"-"` - detailErrList []error + batchErrList []error finishChan chan struct{} C <-chan struct{} `json:"-"` @@ -168,7 +168,7 @@ func (t *BaseTask) GetID() string { func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err error) { if t.GetStatus() >= TaskStatusEndBegin { - return t.Result, t.OriginalErr + return t.getResult(), t.GetOriginalErr() } if duration == 0 { duration = time.Hour * 10000 // duration为0表示无限等待 @@ -178,7 +178,7 @@ func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err case <-t.finishChan: t.isGetResultCalled = true timer.Stop() - return t.Result, t.OriginalErr + return t.getResult(), t.GetOriginalErr() case <-timer.C: } return nil, ErrTaskNotFinished @@ -276,6 +276,18 @@ func (t *BaseTask) GetNoticeMsg() string { return t.NoticeMsg } +func (t *BaseTask) getResult() []interface{} { + t.locker.RLock() + defer t.locker.RUnlock() + return t.Result +} + +func (t *BaseTask) GetErr() error { + t.locker.RLock() + defer t.locker.RUnlock() + return t.Err +} + func (t *BaseTask) GetOriginalErr() error { t.locker.RLock() defer t.locker.RUnlock() @@ -285,7 +297,7 @@ func (t *BaseTask) GetOriginalErr() error { func (t *BaseTask) GetDetailErrList() []error { t.locker.RLock() defer t.locker.RUnlock() - return t.detailErrList + return t.batchErrList } func AddChild(parentTask ITask, task ITask) ITask { @@ -327,6 +339,7 @@ func (t *BaseTask) run(taskHandler func()) { default: close(t.quitChan) } + // todo 如下代码可能有对t.Children操作的并发问题 for _, subTask := range t.Children { if _, err := subTask.GetResult(0); err != nil { globals.SugarLogger.Infof("BaseTask run, failed with error:%v", err) @@ -381,13 +394,13 @@ func (t *BaseTask) setStatus(status int) { t.Status = status } -func (t *BaseTask) buildTaskErrFromDetail() (err error) { - if len(t.detailErrList) > 0 { - strList := make([]string, len(t.detailErrList)) - for k, v := range t.detailErrList { +func (t *BaseTask) buildTaskErrFromBatchErrList() (err error) { + if len(t.batchErrList) > 0 { + strList := make([]string, len(t.batchErrList)) + for k, v := range t.batchErrList { strList[k] = v.Error() } - return NewTaskError(t.Name, fmt.Errorf("总共:%d, 失败:%d, 详情:\n%s", t.TotalItemCount, t.FailedItemCount, strings.Join(strList, "\n"))) + return NewTaskError(t.Name, fmt.Errorf("总共:%d, 成功:%d, 失败:%d, 详情:\n%s", t.TotalItemCount, t.FinishedItemCount, t.FailedItemCount, strings.Join(strList, "\n"))) } return nil }