package tasksch import ( "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" "git.rosy.net.cn/jx-callback/globals" ) const ( StepBegin = "begin" StepEnd = "End" ) type SeqWorkFunc func(task *SeqTask, step int, params ...interface{}) (result interface{}, err error) type SeqTask struct { BaseTask worker SeqWorkFunc } func NewSeqTask(taskName string, ctx *jxcontext.Context, worker SeqWorkFunc, stepCount int, params ...interface{}) *SeqTask { return NewSeqTask2(taskName, ctx, false, worker, stepCount, params...) } func NewSeqTask2(taskName string, ctx *jxcontext.Context, isContinueWhenError bool, worker SeqWorkFunc, stepCount int, params ...interface{}) *SeqTask { task := &SeqTask{ worker: worker, } task.Init(1, 1, isContinueWhenError, params, taskName, ctx, stepCount, stepCount) return task } func (task *SeqTask) Run() { task.run(func() { globals.SugarLogger.Debugf("SeqTask.Run %s", task.Name) var taskErr error var taskResult []interface{} for i := 0; i < task.TotalItemCount; i++ { select { case <-task.quitChan: goto EndFor default: } result, err := task.callWorker(func() (retVal interface{}, err error) { return task.worker(task, i, task.params...) }) task.finishedOneJob(1, 0, err) if err != nil { // globals.SugarLogger.Infof("SeqTask.Run %s step:%d failed with error:%v", task.Name, i, err) if task.IsContinueWhenError { task.AddBatchErr(err) } else { taskErr = err break } } else if result != nil { taskResult = append(taskResult, utils.Interface2Slice(result)...) } } EndFor: task.locker.Lock() task.Result = taskResult task.mainErr = taskErr task.locker.Unlock() task.worker = nil }) } func (t *SeqTask) AddChild(task ITask) ITask { return t.BaseTask.AddChild(task) }