// Package tasksch runs a list of items through a worker function on a bounded
// pool of goroutines, tracking progress, cancellation and the final result.
package tasksch

import (
	"errors"
	"sync"
	"time"

	"git.rosy.net.cn/baseapi/utils"
	"git.rosy.net.cn/jx-callback/business/jxutils"
	"git.rosy.net.cn/jx-callback/globals"
)

// Task status values. Any status >= TaskStatusEndBegin is terminal.
const (
	TaskStatusBegin     = 0 // lowest status value
	TaskStatusWorking   = 0 // workers are running
	TaskStatusCanceling = 1 // Cancel was called, workers have not all stopped yet
	TaskStatusEndBegin  = 2 // first terminal status value
	TaskStatusFinished  = 2 // every job was handed out and no fatal error occurred
	TaskStatusCanceled  = 3 // the task stopped with jobs still queued
	TaskStatusFailed    = 4 // a worker returned an error and the task was aborted
	TaskStatusEnd       = 4 // highest status value
)

const (
	// MaxParallelCount is the upper bound on worker goroutines per task.
	MaxParallelCount = 10
)

// WorkFunc processes one batch of items. params are the extra arguments that
// were passed to RunTask. A non-nil retVal is converted with
// utils.Interface2Slice and appended to the task result.
type WorkFunc func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error)

// ResultHandlerFunc is called once, after the task has reached a terminal
// status, with the task name and its final result and error.
type ResultHandlerFunc func(taskName string, result []interface{}, err error)

// Task describes one RunTask invocation and its progress. The counters and
// Status are updated by the task's goroutines under locker; read them through
// the Get* accessors.
type Task struct {
	ID                  string     `json:"id"`
	Name                string     `json:"name"`
	CreatedBy           string     `json:"createdBy"`
	CreatedAt           time.Time  `json:"createdAt"`
	UpdatedAt           time.Time  `json:"updatedAt"`
	TerminatedAt        time.Time  `json:"terminatedAt"`
	ParallelCount       int        `json:"parallelCount"`
	TotalItemCount      int        `json:"totalItemCount"`
	TotalJobCount       int        `json:"totalJobCount"`
	FinishedItemCount   int        `json:"finishedItemCount"`
	FinishedJobCount    int        `json:"finishedJobCount"`
	FailedItemCount     int        `json:"failedItemCount"`
	FailedJobCount      int        `json:"failedJobCount"`
	IsContinueWhenError bool       `json:"isContinueWhenError"`
	Status              int        `json:"status"`
	C                   <-chan int `json:"-"` // closed when the task reaches a terminal status

	// taskChan carries the job batches followed by one nil end marker per worker.
	taskChan chan []interface{}
	// quitChan carries one cancellation token per worker.
	quitChan chan int
	// subFinishChan carries each worker's outcome: nil, a []interface{} or an error.
	subFinishChan chan interface{}
	// finishChan is the writable side of C.
	finishChan chan int

	// locker guards Status, the counters, the timestamps, result and err.
	locker sync.RWMutex
	result []interface{}
	err    error
}

// TaskList is a slice of tasks ordered by creation time; its Len, Less and
// Swap methods match the signatures sort.Interface requires.
type TaskList []*Task

// Len returns the number of tasks in the list.
func (s TaskList) Len() int {
	return len(s)
}

// Less reports whether task i was created before task j.
func (s TaskList) Less(i, j int) bool {
	return s[i].CreatedAt.Before(s[j].CreatedAt)
}

// Swap exchanges the tasks at positions i and j.
func (s TaskList) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

var (
	// ErrTaskNotFinished is returned by GetResult when the wait timed out
	// (message: "the task has not finished yet").
	ErrTaskNotFinished = errors.New("任务还未完成")
	// ErrTaskIsCanceled is the task error when it ended with jobs still queued
	// (message: "the task was canceled").
	ErrTaskIsCanceled = errors.New("任务被取消了")
)

// RunTask splits itemList into jobs via jxutils.SplitSlice(items, batchSize)
// and processes them with worker on up to parallelCount goroutines. It returns
// immediately; use Task.GetResult or Task.C to wait for completion.
//
//   - isContinueWhenError: when false, the first worker error aborts the whole
//     task (status TaskStatusFailed); when true, failed jobs are only counted.
//   - resultHandler: optional; called once after the task ended.
//   - parallelCount: values <= 0 or above MaxParallelCount mean
//     MaxParallelCount; it is further capped by the number of items.
//   - params: forwarded unchanged to every worker call.
func RunTask(taskName string, isContinueWhenError bool, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *Task {
	// A negative count would otherwise reach make(chan, n) below and panic,
	// so every non-positive request selects the maximum.
	if parallelCount > MaxParallelCount || parallelCount <= 0 {
		parallelCount = MaxParallelCount
	}
	listLen := jxutils.GetSliceLen(itemList)
	if parallelCount > listLen {
		parallelCount = listLen
	}
	realItemList := utils.Interface2Slice(itemList)
	jobList := jxutils.SplitSlice(realItemList, batchSize)
	now := time.Now()
	task := &Task{
		ID:                  utils.GetUUID(),
		Name:                taskName,
		CreatedAt:           now,
		CreatedBy:           userName,
		UpdatedAt:           now,
		TotalJobCount:       len(jobList),
		TotalItemCount:      len(realItemList),
		ParallelCount:       parallelCount,
		taskChan:            make(chan []interface{}, len(realItemList)+parallelCount), // large enough for every task item plus the end markers
		quitChan:            make(chan int, parallelCount),
		subFinishChan:       make(chan interface{}, parallelCount),
		finishChan:          make(chan int, 2),
		Status:              TaskStatusWorking,
		IsContinueWhenError: isContinueWhenError,
	}
	task.C = task.finishChan

	// runWorker pulls jobs until it is cancelled, receives an end marker or hits
	// a fatal error. It returns the outcome to report on subFinishChan: nil when
	// cancelled, the accumulated results on normal completion, or the error.
	runWorker := func() interface{} {
		retVal := make([]interface{}, 0)
		for {
			// select chooses at random among ready cases, so check for
			// cancellation first; otherwise a cancelled worker could keep
			// picking up queued jobs.
			select {
			case <-task.quitChan:
				return nil
			default:
			}
			select {
			case <-task.quitChan: // cancelled
				return nil
			case job := <-task.taskChan:
				if job == nil { // end marker: no more jobs
					return retVal
				}
				result, err := worker(job, params...)
				globals.SugarLogger.Debugf("RunTask %s, after call worker result:%v, err:%v", taskName, result, err)
				task.finishedOneJob(len(job), err)
				if err != nil {
					if !isContinueWhenError { // fatal error
						return err
					}
					continue
				}
				if result != nil {
					retVal = append(retVal, utils.Interface2Slice(result)...)
				}
			}
		}
	}

	go func() {
		globals.SugarLogger.Debugf("RunTask %s", taskName)
		for i := 0; i < parallelCount; i++ {
			go func() {
				chanRetVal := runWorker()
				globals.SugarLogger.Debugf("RunTask %s, put to chann chanRetVal:%v", taskName, chanRetVal)
				// Holding the read lock while sending guarantees the channel is
				// not closed underneath us: the coordinator only closes it after
				// it has set a terminal status under the write lock.
				task.locker.RLock()
				defer task.locker.RUnlock()
				if task.Status < TaskStatusEndBegin {
					task.subFinishChan <- chanRetVal
				}
			}()
		}
		for _, job := range jobList {
			task.taskChan <- job
		}
		// One end marker per worker.
		for i := 0; i < parallelCount; i++ {
			task.taskChan <- nil
		}

		taskResult := make([]interface{}, 0)
		var taskErr error
		for i := 0; i < parallelCount; i++ {
			result := <-task.subFinishChan
			if err2, ok := result.(error); ok {
				task.Cancel()
				taskResult = nil
				taskErr = err2
				break // NOTE(review): original author questioned whether to bail out immediately on error
			} else if result != nil {
				taskResult = append(taskResult, result.([]interface{})...)
			}
		}

		task.locker.Lock()
		switch {
		case taskErr != nil: // any error means the task failed
			task.Status = TaskStatusFailed
		case len(task.taskChan) > 0: // leftovers in the queue mean the workers were cancelled
			taskErr = ErrTaskIsCanceled
			task.Status = TaskStatusCanceled
		default:
			task.Status = TaskStatusFinished
		}
		task.err = taskErr
		task.result = taskResult
		task.TerminatedAt = time.Now()
		task.locker.Unlock()

		globals.SugarLogger.Debugf("RunTask %s, result:%v, err:%v", taskName, taskResult, taskErr)
		close(task.finishChan)
		close(task.subFinishChan)
		close(task.quitChan)
		if resultHandler != nil {
			resultHandler(taskName, task.result, task.err)
		}
	}()
	return task
}

// GetResult returns the task's result and error once it has ended. If the task
// is still running it waits up to duration (0 means wait practically forever)
// and returns ErrTaskNotFinished when the wait times out.
func (t *Task) GetResult(duration time.Duration) (retVal []interface{}, err error) {
	// Read status and outcome in one critical section so they are consistent.
	t.locker.RLock()
	ended, result, taskErr := t.Status >= TaskStatusEndBegin, t.result, t.err
	t.locker.RUnlock()
	if ended {
		return result, taskErr
	}

	if duration == 0 {
		duration = time.Hour * 10000 // a zero duration means wait indefinitely
	}
	timer := time.NewTimer(duration)
	defer timer.Stop()
	select {
	case <-t.finishChan:
		t.locker.RLock()
		defer t.locker.RUnlock()
		return t.result, t.err
	case <-timer.C:
		return nil, ErrTaskNotFinished
	}
}

// Cancel asks every worker to stop. It has no effect when the task has already
// ended or a cancellation is already in progress.
func (t *Task) Cancel() {
	t.locker.Lock()
	defer t.locker.Unlock()
	if t.Status < TaskStatusEndBegin && t.Status != TaskStatusCanceling {
		t.Status = TaskStatusCanceling
		// One token per worker. The count comes from the channel's capacity
		// rather than the exported ParallelCount field, so these sends can never
		// block while the write lock is held.
		for i := 0; i < cap(t.quitChan); i++ {
			t.quitChan <- 0
		}
	}
}

// GetTotalItemCount returns the number of items the task was created with.
func (t *Task) GetTotalItemCount() int {
	return t.TotalItemCount
}

// GetFinishedItemCount returns the number of items in successfully finished jobs.
func (t *Task) GetFinishedItemCount() int {
	t.locker.RLock()
	defer t.locker.RUnlock()
	return t.FinishedItemCount
}

// GetTotalJobCount returns the number of jobs (batches) the items were split into.
func (t *Task) GetTotalJobCount() int {
	return t.TotalJobCount
}

// GetFinishedJobCount returns the number of jobs that finished without error.
func (t *Task) GetFinishedJobCount() int {
	t.locker.RLock()
	defer t.locker.RUnlock()
	return t.FinishedJobCount
}

// GetStatus returns the current TaskStatus* value.
func (t *Task) GetStatus() int {
	t.locker.RLock()
	defer t.locker.RUnlock()
	return t.Status
}

// finishedOneJob records the outcome of one job: it bumps the finished or
// failed counters by one job / itemCount items and refreshes UpdatedAt.
func (t *Task) finishedOneJob(itemCount int, err error) {
	t.locker.Lock()
	defer t.locker.Unlock()
	t.UpdatedAt = time.Now()
	if err == nil {
		t.FinishedItemCount += itemCount
		t.FinishedJobCount++
	} else {
		t.FailedItemCount += itemCount
		t.FailedJobCount++
	}
}

// setStatus stores status under the write lock.
func (t *Task) setStatus(status int) {
	t.locker.Lock()
	defer t.locker.Unlock()
	t.Status = status
}