From 68db9281874e864385370dde933cb04b490b1753 Mon Sep 17 00:00:00 2001 From: gazebo Date: Fri, 19 Oct 2018 18:34:31 +0800 Subject: [PATCH] - rough sequence task added. --- business/jxutils/tasksch/parellel_task.go | 201 ++++++++++++++++++++++ business/jxutils/tasksch/sequence_task.go | 79 +++++++++ business/jxutils/tasksch/task.go | 189 ++++---------------- 3 files changed, 313 insertions(+), 156 deletions(-) create mode 100644 business/jxutils/tasksch/parellel_task.go create mode 100644 business/jxutils/tasksch/sequence_task.go diff --git a/business/jxutils/tasksch/parellel_task.go b/business/jxutils/tasksch/parellel_task.go new file mode 100644 index 000000000..18b96981e --- /dev/null +++ b/business/jxutils/tasksch/parellel_task.go @@ -0,0 +1,201 @@ +package tasksch + +import ( + "errors" + "time" + + "git.rosy.net.cn/baseapi/utils" + "git.rosy.net.cn/jx-callback/business/jxutils" + "git.rosy.net.cn/jx-callback/globals" +) + +const ( + DefParallelCount = 10 + MaxParallelCount = 10 +) + +type WorkFunc func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) +type ResultHandlerFunc func(taskName string, result []interface{}, err error) + +type ParallelConfig struct { + ParallelCount int + BatchSize int + IsContinueWhenError bool + ResultHandler ResultHandlerFunc +} + +type Task struct { + BaseTask + + resultHandler ResultHandlerFunc + worker WorkFunc + jobList [][]interface{} + taskChan chan []interface{} + subFinishChan chan interface{} +} + +var ( + ErrTaskNotFinished = errors.New("任务还未完成") + ErrTaskIsCanceled = errors.New("任务被取消了") +) + +func NewParallelConfig() *ParallelConfig { + return &ParallelConfig{ + ParallelCount: DefParallelCount, + BatchSize: 1, + IsContinueWhenError: false, + ResultHandler: nil, + } +} + +func NewParallelTask(taskName string, userName string, config *ParallelConfig, worker WorkFunc, itemList interface{}, params ...interface{}) *Task { + if config == nil { + config = NewParallelConfig() + } + if config.ParallelCount == 0 { + config.ParallelCount = DefParallelCount + } + if config.ParallelCount > MaxParallelCount { + config.ParallelCount = MaxParallelCount + } + realItemList := utils.Interface2Slice(itemList) + jobList := jxutils.SplitSlice(realItemList, config.BatchSize) + jobListLen := jxutils.GetSliceLen(jobList) + if config.ParallelCount > jobListLen { + config.ParallelCount = jobListLen + } + + task := &Task{ + BaseTask: BaseTask{ + ParallelCount: config.ParallelCount, + BatchSize: config.BatchSize, + IsContinueWhenError: config.IsContinueWhenError, + params: params, + + ID: utils.GetUUID(), + Name: taskName, + CreatedAt: time.Now(), + CreatedBy: userName, + UpdatedAt: time.Now(), + TotalJobCount: len(jobList), + TotalItemCount: len(realItemList), + quitChan: make(chan int, config.ParallelCount), + finishChan: make(chan int, 2), + Status: TaskStatusWorking, + }, + subFinishChan: make(chan interface{}, config.ParallelCount), + taskChan: make(chan []interface{}, len(realItemList)+config.ParallelCount), // 确保能装下所有taskitem,加结束标记 + resultHandler: config.ResultHandler, + worker: worker, + jobList: jobList, + } + task.C = task.finishChan + return task +} + +func RunParallelTask(taskName string, userName string, config *ParallelConfig, worker WorkFunc, itemList interface{}, params ...interface{}) *Task { + task := NewParallelTask(taskName, userName, config, worker, itemList, params...) + return task.Run() +} + +func RunTask(taskName string, isContinueWhenError bool, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *Task { + config := NewParallelConfig() + config.BatchSize = batchSize + config.IsContinueWhenError = isContinueWhenError + config.ParallelCount = parallelCount + config.ResultHandler = resultHandler + task := NewParallelTask(taskName, userName, config, worker, itemList, params...) + task.Run() + return task +} + +func (task *Task) Run() *Task { + go func() { + globals.SugarLogger.Debugf("Run ParallelTask %s", task.Name) + for i := 0; i < task.ParallelCount; i++ { + go func() { + var chanRetVal interface{} + retVal := make([]interface{}, 0) + for { + select { + case <-task.quitChan: // 取消 + goto end + case job := <-task.taskChan: + if job == nil { // 任务完成 + chanRetVal = retVal + goto end + } else { + result, err := task.worker(job, task.params...) + globals.SugarLogger.Debugf("RunTask %s, after call worker result:%v, err:%v", task.Name, result, err) + task.finishedOneJob(len(job), err) + if err == nil { + if result != nil { + retVal = append(retVal, utils.Interface2Slice(result)...) + } + } else if !task.IsContinueWhenError { // 出错 + chanRetVal = err + goto end + } + } + } + } + end: + globals.SugarLogger.Debugf("RunTask %s, put to chann chanRetVal:%v", task.Name, chanRetVal) + task.locker.RLock() + if task.Status < TaskStatusEndBegin { + task.subFinishChan <- chanRetVal + } + task.locker.RUnlock() + }() + } + for _, job := range task.jobList { + task.taskChan <- job + } + for i := 0; i < task.ParallelCount; i++ { + task.taskChan <- nil + } + + taskResult := make([]interface{}, 0) + var taskErr error + for i := 0; i < task.ParallelCount; i++ { + result := <-task.subFinishChan + // globals.SugarLogger.Debugf("RunTask %s, received from chann result:%v", taskName, result) + if err2, ok := result.(error); ok { + task.Cancel() + taskResult = nil + taskErr = err2 + break // 出错情况下是否需要直接跳出? + } else if result != nil { + resultList := result.([]interface{}) + taskResult = append(taskResult, resultList...) + } + } + + task.locker.Lock() + if taskErr != nil { // 如果有错误,肯定就是失败了 + task.Status = TaskStatusFailed + } else { + if len(task.taskChan) > 0 { + taskErr = ErrTaskIsCanceled + task.Status = TaskStatusCanceled + } else { + task.Status = TaskStatusFinished + } + } + task.err = taskErr + task.result = taskResult + task.TerminatedAt = time.Now() + task.locker.Unlock() + + globals.SugarLogger.Debugf("RunTask %s, result:%v, err:%v", task.Name, taskResult, taskErr) + + close(task.finishChan) + close(task.subFinishChan) + close(task.quitChan) + + if task.resultHandler != nil { + task.resultHandler(task.Name, taskResult, task.err) + } + }() + return task +} diff --git a/business/jxutils/tasksch/sequence_task.go b/business/jxutils/tasksch/sequence_task.go new file mode 100644 index 000000000..979a3bebd --- /dev/null +++ b/business/jxutils/tasksch/sequence_task.go @@ -0,0 +1,79 @@ +package tasksch + +import ( + "time" + + "git.rosy.net.cn/baseapi/utils" + "git.rosy.net.cn/jx-callback/globals" +) + +const ( + StepBegin = "begin" + StepEnd = "End" +) + +type SeqWorkFunc func(step int, params ...interface{}) (result interface{}, err error) // 只有最后一次返回结果保留 + +type SeqTask struct { + BaseTask + worker SeqWorkFunc +} + +func NewSeqTask(taskName string, userName string, worker SeqWorkFunc, stepCount int, params ...interface{}) *SeqTask { + task := &SeqTask{ + BaseTask: BaseTask{ + ParallelCount: 1, + params: params, + ID: utils.GetUUID(), + Name: taskName, + CreatedAt: time.Now(), + CreatedBy: userName, + UpdatedAt: time.Now(), + TotalJobCount: stepCount, + TotalItemCount: stepCount, + quitChan: make(chan int, 1), + finishChan: make(chan int, 2), + Status: TaskStatusWorking, + }, + worker: worker, + } + task.C = task.finishChan + return task +} + +func (task *SeqTask) Run() *SeqTask { + go func() { + globals.SugarLogger.Debugf("Run SeqTask %s", task.Name) + var taskErr error + var taskResult interface{} + for i := 0; i < task.TotalItemCount; i++ { + taskResult, taskErr = task.worker(i, task.params...) + task.finishedOneJob(1, taskErr) + if taskErr != nil { + break + } + } + + task.locker.Lock() + if taskErr != nil { // 如果有错误,肯定就是失败了 + task.Status = TaskStatusFailed + } else { + if task.FinishedJobCount < task.TotalJobCount { + taskErr = ErrTaskIsCanceled + task.Status = TaskStatusCanceled + } else { + task.Status = TaskStatusFinished + } + } + task.err = taskErr + task.result = taskResult + task.TerminatedAt = time.Now() + task.locker.Unlock() + + globals.SugarLogger.Debugf("Run SeqTask %s, result:%v, err:%v", task.Name, taskResult, taskErr) + + close(task.finishChan) + close(task.quitChan) + }() + return task +} diff --git a/business/jxutils/tasksch/task.go b/business/jxutils/tasksch/task.go index c991262d1..95cde054e 100644 --- a/business/jxutils/tasksch/task.go +++ b/business/jxutils/tasksch/task.go @@ -1,13 +1,8 @@ package tasksch import ( - "errors" "sync" "time" - - "git.rosy.net.cn/baseapi/utils" - "git.rosy.net.cn/jx-callback/business/jxutils" - "git.rosy.net.cn/jx-callback/globals" ) const ( @@ -22,39 +17,42 @@ const ( TaskStatusEnd = 4 ) -const ( - MaxParallelCount = 10 -) +type ITask interface { + Run() *ITask + GetResult(duration time.Duration) (retVal []interface{}, err error) + Cancel() + GetTotalItemCount() int + GetFinishedItemCount() int + GetTotalJobCount() int + GetFinishedJobCount() int + GetStatus() int +} -type WorkFunc func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) -type ResultHandlerFunc func(taskName string, result []interface{}, err error) - -type Task struct { - ID string `json:"id"` +type BaseTask struct { Name string `json:"name"` + ID string `json:"id"` + ParallelCount int `json:"parallelCount"` + BatchSize int `json:"batchSize"` + IsContinueWhenError bool `json:"isContinueWhenError"` CreatedBy string `json:"createdBy"` CreatedAt time.Time `json:"createdAt"` UpdatedAt time.Time `json:"updatedAt"` TerminatedAt time.Time `json:"terminatedAt"` - ParallelCount int `json:"parallelCount"` TotalItemCount int `json:"totalItemCount"` TotalJobCount int `json:"totalJobCount"` FinishedItemCount int `json:"finishedItemCount"` FinishedJobCount int `json:"finishedJobCount"` FailedItemCount int `json:"failedItemCount"` FailedJobCount int `json:"failedJobCount"` - IsContinueWhenError bool `json:"isContinueWhenError"` Status int `json:"status"` - C <-chan int `json:"-"` - - taskChan chan []interface{} - quitChan chan int - subFinishChan chan interface{} - finishChan chan int + finishChan chan int + C <-chan int `json:"-"` + params []interface{} + quitChan chan int locker sync.RWMutex - result []interface{} + result interface{} err error } @@ -74,131 +72,10 @@ func (s TaskList) Swap(i, j int) { s[j] = tmp } -var ( - ErrTaskNotFinished = errors.New("任务还未完成") - ErrTaskIsCanceled = errors.New("任务被取消了") -) - -func RunTask(taskName string, isContinueWhenError bool, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *Task { - if parallelCount > MaxParallelCount || parallelCount == 0 { - parallelCount = MaxParallelCount - } - realItemList := utils.Interface2Slice(itemList) - jobList := jxutils.SplitSlice(realItemList, batchSize) - jobListLen := jxutils.GetSliceLen(jobList) - if parallelCount > jobListLen { - parallelCount = jobListLen - } - task := &Task{ - ID: utils.GetUUID(), - Name: taskName, - CreatedAt: time.Now(), - CreatedBy: userName, - UpdatedAt: time.Now(), - TotalJobCount: len(jobList), - TotalItemCount: len(realItemList), - ParallelCount: parallelCount, - taskChan: make(chan []interface{}, len(realItemList)+parallelCount), // 确保能装下所有taskitem,加结束标记 - quitChan: make(chan int, parallelCount), - subFinishChan: make(chan interface{}, parallelCount), - finishChan: make(chan int, 2), - Status: TaskStatusWorking, - IsContinueWhenError: isContinueWhenError, - } - task.C = task.finishChan - go func() { - globals.SugarLogger.Debugf("RunTask %s", taskName) - for i := 0; i < parallelCount; i++ { - go func() { - var chanRetVal interface{} - retVal := make([]interface{}, 0) - for { - select { - case <-task.quitChan: // 取消 - goto end - case job := <-task.taskChan: - if job == nil { // 任务完成 - chanRetVal = retVal - goto end - } else { - result, err := worker(job, params...) - globals.SugarLogger.Debugf("RunTask %s, after call worker result:%v, err:%v", taskName, result, err) - task.finishedOneJob(len(job), err) - if err == nil { - if result != nil { - retVal = append(retVal, utils.Interface2Slice(result)...) - } - } else if !isContinueWhenError { // 出错 - chanRetVal = err - goto end - } - } - } - } - end: - globals.SugarLogger.Debugf("RunTask %s, put to chann chanRetVal:%v", taskName, chanRetVal) - task.locker.RLock() - if task.Status < TaskStatusEndBegin { - task.subFinishChan <- chanRetVal - } - task.locker.RUnlock() - }() - } - for _, job := range jobList { - task.taskChan <- job - } - for i := 0; i < parallelCount; i++ { - task.taskChan <- nil - } - - taskResult := make([]interface{}, 0) - var taskErr error - for i := 0; i < parallelCount; i++ { - result := <-task.subFinishChan - // globals.SugarLogger.Debugf("RunTask %s, received from chann result:%v", taskName, result) - if err2, ok := result.(error); ok { - task.Cancel() - taskResult = nil - taskErr = err2 - break // 出错情况下是否需要直接跳出? - } else if result != nil { - resultList := result.([]interface{}) - taskResult = append(taskResult, resultList...) - } - } - - task.locker.Lock() - if taskErr != nil { // 如果有错误,肯定就是失败了 - task.Status = TaskStatusFailed - } else { - if len(task.taskChan) > 0 { - taskErr = ErrTaskIsCanceled - task.Status = TaskStatusCanceled - } else { - task.Status = TaskStatusFinished - } - } - task.err = taskErr - task.result = taskResult - task.TerminatedAt = time.Now() - task.locker.Unlock() - - globals.SugarLogger.Debugf("RunTask %s, result:%v, err:%v", taskName, taskResult, taskErr) - - close(task.finishChan) - close(task.subFinishChan) - close(task.quitChan) - - if resultHandler != nil { - resultHandler(taskName, task.result, task.err) - } - }() - return task -} - -func (t *Task) GetResult(duration time.Duration) (retVal []interface{}, err error) { +func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err error) { if t.GetStatus() >= TaskStatusEndBegin { - return t.result, t.err + retVal, _ = t.result.([]interface{}) + return retVal, t.err } if duration == 0 { duration = time.Hour * 10000 // duration为0表示无限等待 @@ -210,13 +87,14 @@ func (t *Task) GetResult(duration time.Duration) (retVal []interface{}, err erro t.locker.RLock() defer t.locker.RUnlock() - return t.result, t.err + retVal, _ = t.result.([]interface{}) + return retVal, t.err case <-timer.C: } return nil, ErrTaskNotFinished } -func (t *Task) Cancel() { +func (t *BaseTask) Cancel() { t.locker.Lock() defer t.locker.Unlock() if t.Status < TaskStatusEndBegin && t.Status != TaskStatusCanceling { @@ -227,29 +105,29 @@ func (t *Task) Cancel() { } } -func (t *Task) GetTotalItemCount() int { +func (t *BaseTask) GetTotalItemCount() int { return t.TotalItemCount } -func (t *Task) GetFinishedItemCount() int { +func (t *BaseTask) GetFinishedItemCount() int { t.locker.RLock() defer t.locker.RUnlock() return t.FinishedItemCount } -func (t *Task) GetTotalJobCount() int { +func (t *BaseTask) GetTotalJobCount() int { return t.TotalJobCount } -func (t *Task) GetFinishedJobCount() int { +func (t *BaseTask) GetFinishedJobCount() int { t.locker.RLock() defer t.locker.RUnlock() return t.FinishedJobCount } -func (t *Task) GetStatus() int { +func (t *BaseTask) GetStatus() int { t.locker.RLock() defer t.locker.RUnlock() @@ -257,8 +135,7 @@ func (t *Task) GetStatus() int { } ///////// - -func (t *Task) finishedOneJob(itemCount int, err error) { +func (t *BaseTask) finishedOneJob(itemCount int, err error) { t.locker.Lock() defer t.locker.Unlock() @@ -272,7 +149,7 @@ func (t *Task) finishedOneJob(itemCount int, err error) { } } -func (t *Task) setStatus(status int) { +func (t *BaseTask) setStatus(status int) { t.locker.Lock() defer t.locker.Unlock()