// Package tasksch runs a list of items in batches on a bounded pool of worker
// goroutines and tracks the progress, status and result of each run as a Task.
package tasksch

import (
	"errors"
	"sync"
	"time"

	"git.rosy.net.cn/baseapi/utils"
	"git.rosy.net.cn/jx-callback/business/jxutils"
)

// Task status values. TaskStatusBegin/TaskStatusEnd bracket the full range of
// valid values, and every status >= TaskStatusEndBegin is terminal.
const (
	TaskStatusBegin = 0

	TaskStatusWorking   = 0
	TaskStatusCanceling = 1

	TaskStatusEndBegin = 2

	TaskStatusFinished = 2
	TaskStatusCanceled = 3
	TaskStatusFailed   = 4

	TaskStatusEnd = 4
)

// WorkFunc processes one batch of items. params are the extra arguments that
// were handed to RunTask. A non-nil retVal is flattened with
// utils.Interface2Slice and appended to the task result; a non-nil err fails
// the whole task.
type WorkFunc func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error)

// ResultHandlerFunc is called once, after the task has terminated, with the
// task name, the collected result and the terminal error (nil on success).
type ResultHandlerFunc func(taskName string, result []interface{}, err error)

// Task describes one RunTask invocation. The exported counters and Status are
// updated concurrently by the scheduler; read them through the Get* accessors,
// which take the lock.
type Task struct {
	ID                string    `json:"id"`
	Name              string    `json:"name"`
	CreatedBy         string    `json:"createdBy"`
	CreatedAt         time.Time `json:"createdAt"`
	UpdatedAt         time.Time `json:"updatedAt"`
	TerminatedAt      time.Time `json:"terminatedAt"`
	ParallelCount     int       `json:"parallelCount"`
	TotalItemCount    int       `json:"totalItemCount"`
	TotalJobCount     int       `json:"totalJobCount"`
	FinishedItemCount int       `json:"finishedItemCount"`
	FinishedJobCount  int       `json:"finishedJobCount"`
	Status            int       `json:"status"`

	// C is closed when the task has terminated.
	C <-chan int `json:"-"`

	taskChan      chan []interface{} // batches waiting for a worker; closed when feeding stops
	quitChan      chan int           // closed (exactly once) to tell workers and the feeder to stop
	subFinishChan chan interface{}   // one workerOutcome per worker
	finishChan    chan int           // closed on termination; exposed read-only as C
	cancelOnce    sync.Once          // guards close(quitChan)

	locker sync.RWMutex
	result []interface{}
	err    error
}

// workerOutcome is what a single worker goroutine reports when it stops.
type workerOutcome struct {
	results  []interface{} // results accumulated by this worker
	err      error         // error returned by the work function, if any
	canceled bool          // the worker stopped because the task was canceled
}

// TaskList is a slice of tasks that sorts by creation time, oldest first.
type TaskList []*Task

// Len implements sort.Interface.
func (s TaskList) Len() int {
	return len(s)
}

// Less implements sort.Interface; earlier CreatedAt sorts first.
func (s TaskList) Less(i, j int) bool {
	return s[i].CreatedAt.Before(s[j].CreatedAt)
}

// Swap implements sort.Interface.
func (s TaskList) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

var (
	// ErrTaskNotFinished is returned by GetResult when the wait timed out.
	ErrTaskNotFinished = errors.New("任务还未完成")
	// ErrTaskIsCanceled is the terminal error of a task that was canceled.
	ErrTaskIsCanceled = errors.New("任务被取消了")
)

// RunTask starts an asynchronous task and returns immediately.
//
// itemList is converted to a slice with utils.Interface2Slice and split into
// batches of batchSize with jxutils.SplitSlice. parallelCount worker
// goroutines (at least one; smaller values are raised to 1) call worker once
// per batch, passing params through. When every worker has stopped, the task
// is marked finished, canceled or failed, task.C is closed, and resultHandler
// (if non-nil) is invoked with the outcome.
//
// The first error returned by worker cancels the remaining batches and fails
// the task; in that case the result is nil.
func RunTask(taskName string, worker WorkFunc, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, itemList interface{}, params ...interface{}) *Task {
	// With no worker the feeder would block forever (and a negative count
	// would panic in make), so always run at least one.
	if parallelCount < 1 {
		parallelCount = 1
	}
	realItemList := utils.Interface2Slice(itemList)
	jobList := jxutils.SplitSlice(realItemList, batchSize)
	now := time.Now()
	task := &Task{
		ID:             utils.GetUUID(),
		Name:           taskName,
		CreatedAt:      now,
		CreatedBy:      userName,
		UpdatedAt:      now,
		TotalJobCount:  len(jobList),
		TotalItemCount: len(realItemList),
		ParallelCount:  parallelCount,
		taskChan:       make(chan []interface{}, parallelCount*100),
		quitChan:       make(chan int, parallelCount),
		subFinishChan:  make(chan interface{}, parallelCount),
		finishChan:     make(chan int, 2),
		Status:         TaskStatusWorking,
	}
	task.C = task.finishChan

	go func() {
		for i := 0; i < parallelCount; i++ {
			go func() {
				task.subFinishChan <- task.runWorker(worker, params)
			}()
		}

		// If feeding stopped early some batches never ran, so the task cannot
		// count as finished even if every worker saw the closed channel.
		canceled := !task.feedJobs(jobList)

		// Wait for every worker: nobody may still be running (or sending on
		// subFinishChan) once the task is declared terminated.
		taskResult := make([]interface{}, 0)
		var taskErr error
		for i := 0; i < parallelCount; i++ {
			outcome := (<-task.subFinishChan).(workerOutcome)
			switch {
			case outcome.err != nil:
				if taskErr == nil {
					taskErr = outcome.err
				}
			case outcome.canceled:
				canceled = true
			default:
				taskResult = append(taskResult, outcome.results...)
			}
		}

		task.locker.Lock()
		switch {
		case taskErr != nil:
			// Any worker error means the task failed.
			taskResult = nil
			task.Status = TaskStatusFailed
		case canceled:
			taskErr = ErrTaskIsCanceled
			task.Status = TaskStatusCanceled
		default:
			task.Status = TaskStatusFinished
		}
		task.err = taskErr
		task.result = taskResult
		task.TerminatedAt = time.Now()
		task.locker.Unlock()

		// Cancel may already have closed quitChan; the Once prevents a double close.
		task.cancelOnce.Do(func() { close(task.quitChan) })
		close(task.finishChan)
		close(task.subFinishChan)

		if resultHandler != nil {
			resultHandler(taskName, taskResult, taskErr)
		}
	}()
	return task
}

// feedJobs queues every batch for the workers and then closes taskChan so idle
// workers stop. It gives up as soon as the task is canceled, so it can never
// stay blocked on a full queue that nobody reads anymore. It reports whether
// all batches were queued.
func (t *Task) feedJobs(jobList [][]interface{}) (fedAll bool) {
	defer close(t.taskChan)
	for _, job := range jobList {
		select {
		case <-t.quitChan:
			return false
		case t.taskChan <- job:
		}
	}
	return true
}

// runWorker processes batches until the queue is exhausted, the task is
// canceled, or worker returns an error. On error it cancels the task and
// returns right away so the error cannot be overwritten by later iterations.
func (t *Task) runWorker(worker WorkFunc, params []interface{}) workerOutcome {
	results := make([]interface{}, 0)
	for {
		select {
		case <-t.quitChan:
			return workerOutcome{canceled: true}
		case job := <-t.taskChan:
			if job == nil {
				// A closed taskChan yields nil: no more work.
				return workerOutcome{results: results}
			}
			result, err := worker(job, params...)
			if err != nil {
				t.Cancel()
				return workerOutcome{err: err}
			}
			t.finishedOneJob(len(job))
			if result != nil {
				results = append(results, utils.Interface2Slice(result)...)
			}
		}
	}
}

// GetResult returns the task result and terminal error. If the task has not
// terminated yet it waits up to duration; a duration of 0 means wait
// (practically) forever. On timeout it returns ErrTaskNotFinished.
func (t *Task) GetResult(duration time.Duration) (retVal []interface{}, err error) {
	if t.GetStatus() >= TaskStatusEndBegin {
		return t.lockedResult()
	}
	if duration == 0 {
		duration = time.Hour * 10000 // 0 means wait indefinitely
	}
	timer := time.NewTimer(duration)
	defer timer.Stop()
	select {
	case <-t.finishChan:
		return t.lockedResult()
	case <-timer.C:
		return nil, ErrTaskNotFinished
	}
}

// lockedResult reads the stored result and error under the read lock.
func (t *Task) lockedResult() ([]interface{}, error) {
	t.locker.RLock()
	defer t.locker.RUnlock()
	return t.result, t.err
}

// Cancel asks a running task to stop. It never blocks, may be called any
// number of times from any goroutine, and is a no-op once the task has
// reached a terminal status.
func (t *Task) Cancel() {
	// Check and update the status under one lock so a terminal status can
	// never be overwritten with TaskStatusCanceling.
	t.locker.Lock()
	defer t.locker.Unlock()
	if t.Status >= TaskStatusEndBegin {
		return
	}
	t.Status = TaskStatusCanceling
	if t.quitChan != nil {
		// Closing broadcasts to every worker and to the feeder at once.
		t.cancelOnce.Do(func() { close(t.quitChan) })
	}
}

// GetTotalItemCount returns the number of items the task was created with.
func (t *Task) GetTotalItemCount() int {
	return t.TotalItemCount
}

// GetFinishedItemCount returns the number of items in successfully processed batches.
func (t *Task) GetFinishedItemCount() int {
	t.locker.RLock()
	defer t.locker.RUnlock()
	return t.FinishedItemCount
}

// GetTotalJobCount returns the number of batches the task was split into.
func (t *Task) GetTotalJobCount() int {
	return t.TotalJobCount
}

// GetFinishedJobCount returns the number of successfully processed batches.
func (t *Task) GetFinishedJobCount() int {
	t.locker.RLock()
	defer t.locker.RUnlock()
	return t.FinishedJobCount
}

// GetStatus returns the current TaskStatus* value.
func (t *Task) GetStatus() int {
	t.locker.RLock()
	defer t.locker.RUnlock()
	return t.Status
}

/////////

// finishedOneJob records one successfully processed batch of itemCount items.
func (t *Task) finishedOneJob(itemCount int) {
	t.locker.Lock()
	defer t.locker.Unlock()
	t.UpdatedAt = time.Now()
	t.FinishedItemCount += itemCount
	t.FinishedJobCount++
}

// setStatus stores status under the write lock.
func (t *Task) setStatus(status int) {
	t.locker.Lock()
	defer t.locker.Unlock()
	t.Status = status
}