- 整理tasksch

This commit is contained in:
gazebo
2019-08-01 14:29:22 +08:00
parent 43f33799f7
commit a31ba8f730
3 changed files with 35 additions and 34 deletions

View File

@@ -24,13 +24,11 @@ type ParallelConfig struct {
ParallelCount int ParallelCount int
BatchSize int BatchSize int
IsContinueWhenError bool IsContinueWhenError bool
ResultHandler ResultHandlerFunc
} }
type ParallelTask struct { type ParallelTask struct {
BaseTask BaseTask
resultHandler ResultHandlerFunc
worker WorkFunc worker WorkFunc
jobList [][]interface{} jobList [][]interface{}
taskChan chan []interface{} taskChan chan []interface{}
@@ -49,7 +47,6 @@ func NewParallelConfig() *ParallelConfig {
IsContinueWhenError: false, IsContinueWhenError: false,
ParallelCount: DefParallelCount, ParallelCount: DefParallelCount,
BatchSize: 1, BatchSize: 1,
ResultHandler: nil,
} }
} }
@@ -73,11 +70,6 @@ func (c *ParallelConfig) SetIsContinueWhenError(isContinueWhenError bool) *Paral
// return c // return c
// } // }
func (c *ParallelConfig) SetResultHandler(resultHandler ResultHandlerFunc) *ParallelConfig {
c.ResultHandler = resultHandler
return c
}
func NewParallelTask(taskName string, config *ParallelConfig, ctx *jxcontext.Context, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask { func NewParallelTask(taskName string, config *ParallelConfig, ctx *jxcontext.Context, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask {
if config == nil { if config == nil {
config = NewParallelConfig() config = NewParallelConfig()
@@ -97,7 +89,6 @@ func NewParallelTask(taskName string, config *ParallelConfig, ctx *jxcontext.Con
task := &ParallelTask{ task := &ParallelTask{
subFinishChan: make(chan interface{}, config.ParallelCount), subFinishChan: make(chan interface{}, config.ParallelCount),
taskChan: make(chan []interface{}, len(realItemList)), taskChan: make(chan []interface{}, len(realItemList)),
resultHandler: config.ResultHandler,
worker: worker, worker: worker,
jobList: jobList, jobList: jobList,
} }
@@ -135,7 +126,7 @@ func (task *ParallelTask) Run() {
} else { } else {
globals.SugarLogger.Infof("ParallelTask.Run %s, subtask(job:%s, params:%s) result:%v, failed with error:%v", task.Name, utils.Format4Output(job, true), utils.Format4Output(task.params, true), result, err) globals.SugarLogger.Infof("ParallelTask.Run %s, subtask(job:%s, params:%s) result:%v, failed with error:%v", task.Name, utils.Format4Output(job, true), utils.Format4Output(task.params, true), result, err)
task.locker.Lock() task.locker.Lock()
task.detailErrList = append(task.detailErrList, err) task.batchErrList = append(task.batchErrList, err)
task.locker.Unlock() task.locker.Unlock()
if !task.IsContinueWhenError { // 出错 if !task.IsContinueWhenError { // 出错
chanRetVal = err chanRetVal = err
@@ -175,12 +166,11 @@ func (task *ParallelTask) Run() {
taskResult = append(taskResult, resultList...) taskResult = append(taskResult, resultList...)
} }
} }
task.locker.Lock() task.locker.Lock()
if taskErr != nil { // 如果有错误,肯定就是失败了 if taskErr != nil { // 如果有错误,肯定就是失败了
task.Status = TaskStatusFailed task.Status = TaskStatusFailed
} else { } else {
if len(task.taskChan) > 0 { if task.FinishedJobCount+task.FailedJobCount < task.TotalJobCount {
taskErr = ErrTaskIsCanceled taskErr = ErrTaskIsCanceled
task.Status = TaskStatusCanceled task.Status = TaskStatusCanceled
} else { } else {
@@ -191,21 +181,19 @@ func (task *ParallelTask) Run() {
task.OriginalErr = taskErr task.OriginalErr = taskErr
task.Err = NewTaskError(task.Name, taskErr) task.Err = NewTaskError(task.Name, taskErr)
} else { } else {
if len(task.detailErrList) > 0 { if len(task.batchErrList) > 0 {
task.OriginalErr = task.detailErrList[0] task.OriginalErr = task.batchErrList[0]
} }
task.Err = task.buildTaskErrFromDetail() task.Err = task.buildTaskErrFromBatchErrList()
} }
task.Result = taskResult task.Result = taskResult
task.TerminatedAt = time.Now() task.TerminatedAt = time.Now()
task.jobList = nil // 如果不释放,任务被管理的话,会导致内存不能释放
task.locker.Unlock() task.locker.Unlock()
globals.SugarLogger.Debugf("ParallelTask.Run %s, err:%v", task.Name, task.Err) globals.SugarLogger.Debugf("ParallelTask.Run %s, err:%v", task.Name, task.Err)
close(task.subFinishChan)
if task.resultHandler != nil { close(task.subFinishChan)
task.resultHandler(task.Name, taskResult, task.Err) task.jobList = nil // 如果不释放,任务被管理的话,会导致内存不能释放
}
}) })
} }

View File

@@ -45,7 +45,7 @@ func (task *SeqTask) Run() {
task.finishedOneJob(1, err) task.finishedOneJob(1, err)
if taskErr = err; taskErr != nil { if taskErr = err; taskErr != nil {
task.locker.Lock() task.locker.Lock()
task.detailErrList = append(task.detailErrList, err) task.batchErrList = append(task.batchErrList, err)
task.locker.Unlock() task.locker.Unlock()
globals.SugarLogger.Infof("SeqTask.Run %s step:%d failed with error:%v", task.Name, i, err) globals.SugarLogger.Infof("SeqTask.Run %s step:%d failed with error:%v", task.Name, i, err)
if !task.IsContinueWhenError { if !task.IsContinueWhenError {
@@ -60,7 +60,7 @@ func (task *SeqTask) Run() {
if taskErr != nil { // 如果有错误,肯定就是失败了 if taskErr != nil { // 如果有错误,肯定就是失败了
task.Status = TaskStatusFailed task.Status = TaskStatusFailed
} else { } else {
if task.FinishedJobCount < task.TotalJobCount { if task.FinishedJobCount+task.FailedJobCount < task.TotalJobCount {
taskErr = ErrTaskIsCanceled taskErr = ErrTaskIsCanceled
task.Status = TaskStatusCanceled task.Status = TaskStatusCanceled
} else { } else {
@@ -71,10 +71,10 @@ func (task *SeqTask) Run() {
task.OriginalErr = taskErr task.OriginalErr = taskErr
task.Err = NewTaskError(task.Name, taskErr) task.Err = NewTaskError(task.Name, taskErr)
} else { } else {
if len(task.detailErrList) > 0 { if len(task.batchErrList) > 0 {
task.OriginalErr = task.detailErrList[0] task.OriginalErr = task.batchErrList[0]
} }
task.Err = task.buildTaskErrFromDetail() task.Err = task.buildTaskErrFromBatchErrList()
} }
task.Result = taskResult task.Result = taskResult
task.TerminatedAt = time.Now() task.TerminatedAt = time.Now()

View File

@@ -113,7 +113,7 @@ type BaseTask struct {
Err error `json:"err"` Err error `json:"err"`
OriginalErr error `json:"-"` OriginalErr error `json:"-"`
detailErrList []error batchErrList []error
finishChan chan struct{} finishChan chan struct{}
C <-chan struct{} `json:"-"` C <-chan struct{} `json:"-"`
@@ -168,7 +168,7 @@ func (t *BaseTask) GetID() string {
func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err error) { func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err error) {
if t.GetStatus() >= TaskStatusEndBegin { if t.GetStatus() >= TaskStatusEndBegin {
return t.Result, t.OriginalErr return t.getResult(), t.GetOriginalErr()
} }
if duration == 0 { if duration == 0 {
duration = time.Hour * 10000 // duration为0表示无限等待 duration = time.Hour * 10000 // duration为0表示无限等待
@@ -178,7 +178,7 @@ func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err
case <-t.finishChan: case <-t.finishChan:
t.isGetResultCalled = true t.isGetResultCalled = true
timer.Stop() timer.Stop()
return t.Result, t.OriginalErr return t.getResult(), t.GetOriginalErr()
case <-timer.C: case <-timer.C:
} }
return nil, ErrTaskNotFinished return nil, ErrTaskNotFinished
@@ -276,6 +276,18 @@ func (t *BaseTask) GetNoticeMsg() string {
return t.NoticeMsg return t.NoticeMsg
} }
func (t *BaseTask) getResult() []interface{} {
t.locker.RLock()
defer t.locker.RUnlock()
return t.Result
}
func (t *BaseTask) GetErr() error {
t.locker.RLock()
defer t.locker.RUnlock()
return t.Err
}
func (t *BaseTask) GetOriginalErr() error { func (t *BaseTask) GetOriginalErr() error {
t.locker.RLock() t.locker.RLock()
defer t.locker.RUnlock() defer t.locker.RUnlock()
@@ -285,7 +297,7 @@ func (t *BaseTask) GetOriginalErr() error {
func (t *BaseTask) GetDetailErrList() []error { func (t *BaseTask) GetDetailErrList() []error {
t.locker.RLock() t.locker.RLock()
defer t.locker.RUnlock() defer t.locker.RUnlock()
return t.detailErrList return t.batchErrList
} }
func AddChild(parentTask ITask, task ITask) ITask { func AddChild(parentTask ITask, task ITask) ITask {
@@ -327,6 +339,7 @@ func (t *BaseTask) run(taskHandler func()) {
default: default:
close(t.quitChan) close(t.quitChan)
} }
// todo 如下代码可能有对t.Children操作的并发问题
for _, subTask := range t.Children { for _, subTask := range t.Children {
if _, err := subTask.GetResult(0); err != nil { if _, err := subTask.GetResult(0); err != nil {
globals.SugarLogger.Infof("BaseTask run, failed with error:%v", err) globals.SugarLogger.Infof("BaseTask run, failed with error:%v", err)
@@ -381,13 +394,13 @@ func (t *BaseTask) setStatus(status int) {
t.Status = status t.Status = status
} }
func (t *BaseTask) buildTaskErrFromDetail() (err error) { func (t *BaseTask) buildTaskErrFromBatchErrList() (err error) {
if len(t.detailErrList) > 0 { if len(t.batchErrList) > 0 {
strList := make([]string, len(t.detailErrList)) strList := make([]string, len(t.batchErrList))
for k, v := range t.detailErrList { for k, v := range t.batchErrList {
strList[k] = v.Error() strList[k] = v.Error()
} }
return NewTaskError(t.Name, fmt.Errorf("总共:%d, 失败:%d, 详情:\n%s", t.TotalItemCount, t.FailedItemCount, strings.Join(strList, "\n"))) return NewTaskError(t.Name, fmt.Errorf("总共:%d, 成功:%d, 失败:%d, 详情:\n%s", t.TotalItemCount, t.FinishedItemCount, t.FailedItemCount, strings.Join(strList, "\n")))
} }
return nil return nil
} }