Files
jx-callback/business/jxutils/tasksch/task.go
gazebo b936d3354b - up.
2018-09-22 23:28:52 +08:00

281 lines
7.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package tasksch
import (
"errors"
"sync"
"time"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/globals"
)
// Task status codes. The values are ordered so that range comparisons work:
// any status below TaskStatusEndBegin means the task is still running, and any
// status at or above it means the task has terminated.
const (
// TaskStatusBegin is the lowest status value (same value as TaskStatusWorking).
TaskStatusBegin = 0
// TaskStatusWorking is the status a task is created with by RunTask.
TaskStatusWorking = 0
// TaskStatusCanceling is set by Cancel while the task is being stopped.
TaskStatusCanceling = 1
// TaskStatusEndBegin is the first terminal status value (same value as TaskStatusFinished).
TaskStatusEndBegin = 2
// TaskStatusFinished means the task ran to completion without a reported error.
TaskStatusFinished = 2
// TaskStatusCanceled means the task stopped with unconsumed entries left in its queue.
TaskStatusCanceled = 3
// TaskStatusFailed means a worker reported an error that ended the task.
TaskStatusFailed = 4
// TaskStatusEnd is the highest status value (same value as TaskStatusFailed).
TaskStatusEnd = 4
)
const (
// MaxParallelCount is the upper bound on the number of worker goroutines per
// task; RunTask also uses it when the caller passes 0.
MaxParallelCount = 10
)
// WorkFunc processes one batch of items. RunTask calls it once per batch with
// the batch and the extra params it was given. On success a non-nil retVal is
// converted to a slice and appended to the task result; a non-nil err marks
// the batch as failed.
type WorkFunc func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error)
// ResultHandlerFunc is an optional callback that RunTask invokes once, after
// the task has terminated, with the task name, the collected result and the
// task error (nil on success).
type ResultHandlerFunc func(taskName string, result []interface{}, err error)
// Task is the handle returned by RunTask. The exported fields describe the
// task and its progress and are serialized to JSON; the unexported fields are
// the channels and lock used to coordinate the worker goroutines.
type Task struct {
// Identity and bookkeeping, filled in by RunTask.
ID string `json:"id"`
Name string `json:"name"`
CreatedBy string `json:"createdBy"`
CreatedAt time.Time `json:"createdAt"`
// UpdatedAt is refreshed every time a batch finishes.
UpdatedAt time.Time `json:"updatedAt"`
// TerminatedAt is set when the task reaches a terminal status.
TerminatedAt time.Time `json:"terminatedAt"`
// ParallelCount is the number of worker goroutines actually started.
ParallelCount int `json:"parallelCount"`
// Item counters count individual items; job counters count batches.
TotalItemCount int `json:"totalItemCount"`
TotalJobCount int `json:"totalJobCount"`
FinishedItemCount int `json:"finishedItemCount"`
FinishedJobCount int `json:"finishedJobCount"`
FailedItemCount int `json:"failedItemCount"`
FailedJobCount int `json:"failedJobCount"`
// IsContinueWhenError keeps workers running after a batch fails.
IsContinueWhenError bool `json:"isContinueWhenError"`
// Status holds one of the TaskStatus* values.
Status int `json:"status"`
// C is the receive-only view of finishChan; it is closed when the task terminates.
C <-chan int `json:"-"`
// taskChan queues the batches, followed by one nil end marker per worker.
taskChan chan []interface{}
// quitChan receives one token per worker when Cancel is called.
quitChan chan int
// subFinishChan carries each worker's final value: a result slice, an error, or nil.
subFinishChan chan interface{}
// finishChan is closed once the final status, result and err have been stored.
finishChan chan int
// locker guards Status, the progress counters, result and err.
locker sync.RWMutex
result []interface{}
err error
}
// TaskList is a slice of tasks that can be sorted by creation time through its
// Len, Less and Swap methods.
type TaskList []*Task
// Len returns the number of tasks in the list.
func (s TaskList) Len() int {
	count := len(s)
	return count
}
// Less reports whether the task at index i was created strictly before the
// task at index j.
func (s TaskList) Less(i, j int) bool {
	earlier, later := s[i].CreatedAt, s[j].CreatedAt
	return earlier.Before(later)
}
// Swap exchanges the tasks at indexes i and j.
func (s TaskList) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}
// Sentinel errors reported by tasks.
var (
// ErrTaskNotFinished is returned by GetResult when the wait times out before
// the task terminates (message: "the task has not finished yet").
ErrTaskNotFinished = errors.New("任务还未完成")
// ErrTaskIsCanceled is stored as the task error when the task ends in the
// canceled state (message: "the task was canceled").
ErrTaskIsCanceled = errors.New("任务被取消了")
)
// RunTask splits itemList into batches of batchSize items and processes the
// batches concurrently with worker, returning immediately with a *Task handle
// that can be used to watch progress, cancel the task, or wait for its result.
//
// Parameters:
//   - taskName: name stored on the task and used in log messages.
//   - isContinueWhenError: when true, a failed batch is only counted as failed
//     and the workers keep going; when false, the first error stops the task
//     and becomes the task error.
//   - resultHandler: optional callback invoked once after the task terminates.
//   - parallelCount: requested number of worker goroutines. Values <= 0 or
//     above MaxParallelCount are replaced by MaxParallelCount, and the count
//     is then capped at the number of items.
//   - batchSize: number of items per batch, passed to jxutils.SplitSlice.
//   - userName: stored as the task creator.
//   - worker: function called once per batch.
//   - itemList: slice of items to process (converted with utils.Interface2Slice).
//   - params: extra arguments forwarded unchanged to every worker call.
//
// The task ends in one of three states: TaskStatusFailed when a worker error
// stopped it, TaskStatusCanceled (with ErrTaskIsCanceled) when the workers quit
// while entries were still queued, or TaskStatusFinished otherwise.
func RunTask(taskName string, isContinueWhenError bool, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *Task {
	// Non-positive values fall back to the maximum. A negative count would
	// otherwise reach make(chan ..., parallelCount) below and panic.
	if parallelCount > MaxParallelCount || parallelCount <= 0 {
		parallelCount = MaxParallelCount
	}
	listLen := jxutils.GetSliceLen(itemList)
	if parallelCount > listLen {
		parallelCount = listLen
	}
	realItemList := utils.Interface2Slice(itemList)
	jobList := jxutils.SplitSlice(realItemList, batchSize)
	task := &Task{
		ID:             utils.GetUUID(),
		Name:           taskName,
		CreatedAt:      time.Now(),
		CreatedBy:      userName,
		UpdatedAt:      time.Now(),
		TotalJobCount:  len(jobList),
		TotalItemCount: len(realItemList),
		ParallelCount:  parallelCount,
		// Sized to hold every batch plus one end marker per worker, so the
		// producer below never blocks.
		taskChan: make(chan []interface{}, len(realItemList)+parallelCount),
		// Buffered per worker so Cancel and the workers' final reports never block.
		quitChan:            make(chan int, parallelCount),
		subFinishChan:       make(chan interface{}, parallelCount),
		finishChan:          make(chan int, 2),
		Status:              TaskStatusWorking,
		IsContinueWhenError: isContinueWhenError,
	}
	task.C = task.finishChan
	go func() {
		globals.SugarLogger.Debugf("RunTask %s", taskName)
		for i := 0; i < parallelCount; i++ {
			go func() {
				// chanRetVal is what this worker reports back: the collected
				// results on normal completion, the error on failure, or nil
				// when it was told to quit.
				var chanRetVal interface{}
				retVal := make([]interface{}, 0)
				for {
					select {
					case <-task.quitChan: // canceled
						goto end
					case job := <-task.taskChan:
						if job == nil { // end marker: no more batches
							chanRetVal = retVal
							goto end
						}
						result, err := worker(job, params...)
						globals.SugarLogger.Debugf("RunTask %s, after call worker result:%v, err:%v", taskName, result, err)
						task.finishedOneJob(len(job), err)
						if err == nil {
							if result != nil {
								retVal = append(retVal, utils.Interface2Slice(result)...)
							}
						} else if !isContinueWhenError { // fatal error for the whole task
							chanRetVal = err
							goto end
						}
					}
				}
			end:
				globals.SugarLogger.Debugf("RunTask %s, put to chann chanRetVal:%v", taskName, chanRetVal)
				// Report only while the task is still live. The coordinator
				// sets the terminal status under the write lock before closing
				// subFinishChan, so this check prevents a send on a closed channel.
				task.locker.RLock()
				if task.Status < TaskStatusEndBegin {
					task.subFinishChan <- chanRetVal
				}
				task.locker.RUnlock()
			}()
		}
		// Queue every batch, then one nil end marker per worker.
		for _, job := range jobList {
			task.taskChan <- job
		}
		for i := 0; i < parallelCount; i++ {
			task.taskChan <- nil
		}
		// Collect one report per worker.
		taskResult := make([]interface{}, 0)
		var taskErr error
		for i := 0; i < parallelCount; i++ {
			result := <-task.subFinishChan
			if err2, ok := result.(error); ok {
				task.Cancel()
				taskResult = nil
				taskErr = err2
				break // open question from the original author: should an error break out immediately?
			} else if result != nil {
				resultList := result.([]interface{})
				taskResult = append(taskResult, resultList...)
			}
		}
		task.locker.Lock()
		if taskErr != nil { // an error always means the task failed
			task.Status = TaskStatusFailed
		} else {
			// Entries left in the queue mean the workers quit before draining
			// it, which is treated as a cancellation.
			if len(task.taskChan) > 0 {
				taskErr = ErrTaskIsCanceled
				task.Status = TaskStatusCanceled
			} else {
				task.Status = TaskStatusFinished
			}
		}
		task.err = taskErr
		task.result = taskResult
		task.TerminatedAt = time.Now()
		task.locker.Unlock()
		globals.SugarLogger.Debugf("RunTask %s, result:%v, err:%v", taskName, taskResult, taskErr)
		// Closing finishChan wakes everyone waiting in GetResult or on task.C.
		close(task.finishChan)
		close(task.subFinishChan)
		close(task.quitChan)
		if resultHandler != nil {
			resultHandler(taskName, task.result, task.err)
		}
	}()
	return task
}
// GetResult returns the task's result and error, waiting up to duration for
// the task to terminate. A duration of 0 means wait (practically) forever.
// If the wait times out first, it returns nil and ErrTaskNotFinished.
func (t *Task) GetResult(duration time.Duration) (retVal []interface{}, err error) {
	// Fast path: the task has already reached a terminal status.
	if status := t.GetStatus(); status >= TaskStatusEndBegin {
		return t.result, t.err
	}
	wait := duration
	if wait == 0 {
		wait = time.Hour * 10000 // 0 stands for an unbounded wait
	}
	timeout := time.NewTimer(wait)
	select {
	case <-timeout.C:
		return nil, ErrTaskNotFinished
	case <-t.finishChan:
	}
	// The task finished before the deadline.
	timeout.Stop()
	t.locker.RLock()
	defer t.locker.RUnlock()
	return t.result, t.err
}
// Cancel asks a running task to stop by marking it as canceling and sending
// one quit token per worker. It does nothing if the task has already
// terminated or a cancellation is already in progress.
func (t *Task) Cancel() {
	t.locker.Lock()
	defer t.locker.Unlock()
	if t.Status >= TaskStatusEndBegin || t.Status == TaskStatusCanceling {
		return
	}
	t.Status = TaskStatusCanceling
	for sent := 0; sent < t.ParallelCount; sent++ {
		t.quitChan <- 0
	}
}
// GetTotalItemCount returns the total number of items in the task. The value
// is read without taking the lock.
func (t *Task) GetTotalItemCount() int {
	total := t.TotalItemCount
	return total
}
// GetFinishedItemCount returns, under the read lock, the number of items
// whose batch completed without error.
func (t *Task) GetFinishedItemCount() int {
	t.locker.RLock()
	finished := t.FinishedItemCount
	t.locker.RUnlock()
	return finished
}
// GetTotalJobCount returns the total number of batches in the task. The value
// is read without taking the lock.
func (t *Task) GetTotalJobCount() int {
	total := t.TotalJobCount
	return total
}
// GetFinishedJobCount returns, under the read lock, the number of batches
// that completed without error.
func (t *Task) GetFinishedJobCount() int {
	t.locker.RLock()
	finished := t.FinishedJobCount
	t.locker.RUnlock()
	return finished
}
// GetStatus returns the task's current TaskStatus* value, read under the
// read lock.
func (t *Task) GetStatus() int {
	t.locker.RLock()
	current := t.Status
	t.locker.RUnlock()
	return current
}
/////////
// finishedOneJob records the outcome of one batch of itemCount items: it
// refreshes UpdatedAt and bumps the failed counters when err is non-nil, or
// the finished counters otherwise. All updates happen under the write lock.
func (t *Task) finishedOneJob(itemCount int, err error) {
	t.locker.Lock()
	defer t.locker.Unlock()
	t.UpdatedAt = time.Now()
	if err != nil {
		t.FailedItemCount += itemCount
		t.FailedJobCount++
		return
	}
	t.FinishedItemCount += itemCount
	t.FinishedJobCount++
}
// setStatus stores status as the task's current status while holding the
// write lock.
func (t *Task) setStatus(status int) {
	t.locker.Lock()
	t.Status = status
	t.locker.Unlock()
}