Files
jx-callback/business/jxutils/tasksch/task.go
2018-09-21 15:42:39 +08:00

255 lines
5.9 KiB
Go

package tasksch
import (
"errors"
"sync"
"time"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/globals"
)
const (
TaskStatusBegin = 0
TaskStatusWorking = 0
TaskStatusCanceling = 1
TaskStatusEndBegin = 2
TaskStatusFinished = 2
TaskStatusCanceled = 3
TaskStatusFailed = 4
TaskStatusEnd = 4
)
type WorkFunc func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error)
type ResultHandlerFunc func(taskName string, result []interface{}, err error)
type Task struct {
ID string `json:"id"`
Name string `json:"name"`
CreatedBy string `json:"createdBy"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
TerminatedAt time.Time `json:"terminatedAt"`
ParallelCount int `json:"parallelCount"`
TotalItemCount int `json:"totalItemCount"`
TotalJobCount int `json:"totalJobCount"`
FinishedItemCount int `json:"finishedItemCount"`
FinishedJobCount int `json:"finishedJobCount"`
Status int `json:"status"`
C <-chan int `json:"-"`
taskChan chan []interface{}
quitChan chan int
subFinishChan chan interface{}
finishChan chan int
locker sync.RWMutex
result []interface{}
err error
}
type TaskList []*Task
func (s TaskList) Len() int {
return len(s)
}
func (s TaskList) Less(i, j int) bool {
return s[i].CreatedAt.Sub(s[j].CreatedAt) < 0
}
func (s TaskList) Swap(i, j int) {
tmp := s[i]
s[i] = s[j]
s[j] = tmp
}
var (
ErrTaskNotFinished = errors.New("任务还未完成")
ErrTaskIsCanceled = errors.New("任务被取消了")
)
func RunTask(taskName string, worker WorkFunc, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, itemList interface{}, params ...interface{}) *Task {
realItemList := utils.Interface2Slice(itemList)
jobList := jxutils.SplitSlice(realItemList, batchSize)
task := &Task{
ID: utils.GetUUID(),
Name: taskName,
CreatedAt: time.Now(),
CreatedBy: userName,
UpdatedAt: time.Now(),
TotalJobCount: len(jobList),
TotalItemCount: len(realItemList),
ParallelCount: parallelCount,
taskChan: make(chan []interface{}, parallelCount*100),
quitChan: make(chan int, parallelCount),
subFinishChan: make(chan interface{}, parallelCount),
finishChan: make(chan int, 2),
Status: TaskStatusWorking,
}
task.C = task.finishChan
go func() {
globals.SugarLogger.Debugf("RunTask %s", taskName)
for i := 0; i < parallelCount; i++ {
go func() {
var chanRetVal interface{}
retVal := make([]interface{}, 0)
for {
select {
case <-task.quitChan:
goto end
case job := <-task.taskChan:
if job == nil {
chanRetVal = retVal
goto end
} else {
result, err := worker(job, params...)
globals.SugarLogger.Debugf("RunTask %s, after call worker result:%v, err:%v", taskName, result, err)
if err == nil {
task.finishedOneJob(len(job))
if result != nil {
retVal = append(retVal, utils.Interface2Slice(result)...)
}
} else {
chanRetVal = err
go func() {
task.Cancel()
}()
goto end
}
}
}
}
end:
// globals.SugarLogger.Debugf("RunTask %s, put to chann chanRetVal:%v", taskName, chanRetVal)
task.subFinishChan <- chanRetVal
}()
}
for _, job := range jobList {
task.taskChan <- job
}
for i := 0; i < parallelCount; i++ {
task.taskChan <- nil
}
taskResult := make([]interface{}, 0)
var taskErr error
for i := 0; i < parallelCount; i++ {
result := <-task.subFinishChan
if err2, ok := result.(error); ok {
taskResult = nil
taskErr = err2
} else if result != nil {
resultList := result.([]interface{})
taskResult = append(taskResult, resultList...)
}
}
task.locker.Lock()
if taskErr != nil { // 如果有错误,肯定就是失败了
task.Status = TaskStatusFailed
} else {
if len(task.taskChan) > 0 {
taskErr = ErrTaskIsCanceled
task.Status = TaskStatusCanceled
} else {
task.Status = TaskStatusFinished
}
}
task.err = taskErr
task.result = taskResult
task.TerminatedAt = time.Now()
task.locker.Unlock()
globals.SugarLogger.Debugf("RunTask %s, result:%v, err:%v", taskName, taskResult, taskErr)
close(task.finishChan)
close(task.subFinishChan)
close(task.quitChan)
if resultHandler != nil {
resultHandler(taskName, task.result, task.err)
}
}()
return task
}
func (t *Task) GetResult(duration time.Duration) (retVal []interface{}, err error) {
if t.GetStatus() >= TaskStatusEndBegin {
return t.result, t.err
}
if duration == 0 {
duration = time.Hour * 10000 // duration为0表示无限等待
}
timer := time.NewTimer(duration)
select {
case <-t.finishChan:
timer.Stop()
t.locker.RLock()
defer t.locker.RUnlock()
return t.result, t.err
case <-timer.C:
}
return nil, ErrTaskNotFinished
}
func (t *Task) Cancel() {
if t.GetStatus() < TaskStatusEndBegin {
for i := 0; i < t.ParallelCount; i++ {
t.quitChan <- 0
}
t.setStatus(TaskStatusCanceling)
}
}
func (t *Task) GetTotalItemCount() int {
return t.TotalItemCount
}
func (t *Task) GetFinishedItemCount() int {
t.locker.RLock()
defer t.locker.RUnlock()
return t.FinishedItemCount
}
func (t *Task) GetTotalJobCount() int {
return t.TotalJobCount
}
func (t *Task) GetFinishedJobCount() int {
t.locker.RLock()
defer t.locker.RUnlock()
return t.FinishedJobCount
}
func (t *Task) GetStatus() int {
t.locker.RLock()
defer t.locker.RUnlock()
return t.Status
}
/////////
func (t *Task) finishedOneJob(itemCount int) {
t.locker.Lock()
defer t.locker.Unlock()
t.UpdatedAt = time.Now()
t.FinishedItemCount += itemCount
t.FinishedJobCount++
}
func (t *Task) setStatus(status int) {
t.locker.Lock()
defer t.locker.Unlock()
t.Status = status
}