- rough sequence task added.

This commit is contained in:
gazebo
2018-10-19 18:34:31 +08:00
parent 94196d72e5
commit 68db928187
3 changed files with 313 additions and 156 deletions

View File

@@ -0,0 +1,201 @@
package tasksch
import (
"errors"
"time"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/globals"
)
const (
DefParallelCount = 10
MaxParallelCount = 10
)
type WorkFunc func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error)
type ResultHandlerFunc func(taskName string, result []interface{}, err error)
type ParallelConfig struct {
ParallelCount int
BatchSize int
IsContinueWhenError bool
ResultHandler ResultHandlerFunc
}
type Task struct {
BaseTask
resultHandler ResultHandlerFunc
worker WorkFunc
jobList [][]interface{}
taskChan chan []interface{}
subFinishChan chan interface{}
}
var (
ErrTaskNotFinished = errors.New("任务还未完成")
ErrTaskIsCanceled = errors.New("任务被取消了")
)
func NewParallelConfig() *ParallelConfig {
return &ParallelConfig{
ParallelCount: DefParallelCount,
BatchSize: 1,
IsContinueWhenError: false,
ResultHandler: nil,
}
}
func NewParallelTask(taskName string, userName string, config *ParallelConfig, worker WorkFunc, itemList interface{}, params ...interface{}) *Task {
if config == nil {
config = NewParallelConfig()
}
if config.ParallelCount == 0 {
config.ParallelCount = DefParallelCount
}
if config.ParallelCount > MaxParallelCount {
config.ParallelCount = MaxParallelCount
}
realItemList := utils.Interface2Slice(itemList)
jobList := jxutils.SplitSlice(realItemList, config.BatchSize)
jobListLen := jxutils.GetSliceLen(jobList)
if config.ParallelCount > jobListLen {
config.ParallelCount = jobListLen
}
task := &Task{
BaseTask: BaseTask{
ParallelCount: config.ParallelCount,
BatchSize: config.BatchSize,
IsContinueWhenError: config.IsContinueWhenError,
params: params,
ID: utils.GetUUID(),
Name: taskName,
CreatedAt: time.Now(),
CreatedBy: userName,
UpdatedAt: time.Now(),
TotalJobCount: len(jobList),
TotalItemCount: len(realItemList),
quitChan: make(chan int, config.ParallelCount),
finishChan: make(chan int, 2),
Status: TaskStatusWorking,
},
subFinishChan: make(chan interface{}, config.ParallelCount),
taskChan: make(chan []interface{}, len(realItemList)+config.ParallelCount), // 确保能装下所有taskitem加结束标记
resultHandler: config.ResultHandler,
worker: worker,
jobList: jobList,
}
task.C = task.finishChan
return task
}
func RunParallelTask(taskName string, userName string, config *ParallelConfig, worker WorkFunc, itemList interface{}, params ...interface{}) *Task {
task := NewParallelTask(taskName, userName, config, worker, itemList, params...)
return task.Run()
}
func RunTask(taskName string, isContinueWhenError bool, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *Task {
config := NewParallelConfig()
config.BatchSize = batchSize
config.IsContinueWhenError = isContinueWhenError
config.ParallelCount = parallelCount
config.ResultHandler = resultHandler
task := NewParallelTask(taskName, userName, config, worker, itemList, params...)
task.Run()
return task
}
func (task *Task) Run() *Task {
go func() {
globals.SugarLogger.Debugf("Run ParallelTask %s", task.Name)
for i := 0; i < task.ParallelCount; i++ {
go func() {
var chanRetVal interface{}
retVal := make([]interface{}, 0)
for {
select {
case <-task.quitChan: // 取消
goto end
case job := <-task.taskChan:
if job == nil { // 任务完成
chanRetVal = retVal
goto end
} else {
result, err := task.worker(job, task.params...)
globals.SugarLogger.Debugf("RunTask %s, after call worker result:%v, err:%v", task.Name, result, err)
task.finishedOneJob(len(job), err)
if err == nil {
if result != nil {
retVal = append(retVal, utils.Interface2Slice(result)...)
}
} else if !task.IsContinueWhenError { // 出错
chanRetVal = err
goto end
}
}
}
}
end:
globals.SugarLogger.Debugf("RunTask %s, put to chann chanRetVal:%v", task.Name, chanRetVal)
task.locker.RLock()
if task.Status < TaskStatusEndBegin {
task.subFinishChan <- chanRetVal
}
task.locker.RUnlock()
}()
}
for _, job := range task.jobList {
task.taskChan <- job
}
for i := 0; i < task.ParallelCount; i++ {
task.taskChan <- nil
}
taskResult := make([]interface{}, 0)
var taskErr error
for i := 0; i < task.ParallelCount; i++ {
result := <-task.subFinishChan
// globals.SugarLogger.Debugf("RunTask %s, received from chann result:%v", taskName, result)
if err2, ok := result.(error); ok {
task.Cancel()
taskResult = nil
taskErr = err2
break // 出错情况下是否需要直接跳出?
} else if result != nil {
resultList := result.([]interface{})
taskResult = append(taskResult, resultList...)
}
}
task.locker.Lock()
if taskErr != nil { // 如果有错误,肯定就是失败了
task.Status = TaskStatusFailed
} else {
if len(task.taskChan) > 0 {
taskErr = ErrTaskIsCanceled
task.Status = TaskStatusCanceled
} else {
task.Status = TaskStatusFinished
}
}
task.err = taskErr
task.result = taskResult
task.TerminatedAt = time.Now()
task.locker.Unlock()
globals.SugarLogger.Debugf("RunTask %s, result:%v, err:%v", task.Name, taskResult, taskErr)
close(task.finishChan)
close(task.subFinishChan)
close(task.quitChan)
if task.resultHandler != nil {
task.resultHandler(task.Name, taskResult, task.err)
}
}()
return task
}

View File

@@ -0,0 +1,79 @@
package tasksch
import (
"time"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/globals"
)
const (
StepBegin = "begin"
StepEnd = "End"
)
type SeqWorkFunc func(step int, params ...interface{}) (result interface{}, err error) // 只有最后一次返回结果保留
type SeqTask struct {
BaseTask
worker SeqWorkFunc
}
func NewSeqTask(taskName string, userName string, worker SeqWorkFunc, stepCount int, params ...interface{}) *SeqTask {
task := &SeqTask{
BaseTask: BaseTask{
ParallelCount: 1,
params: params,
ID: utils.GetUUID(),
Name: taskName,
CreatedAt: time.Now(),
CreatedBy: userName,
UpdatedAt: time.Now(),
TotalJobCount: stepCount,
TotalItemCount: stepCount,
quitChan: make(chan int, 1),
finishChan: make(chan int, 2),
Status: TaskStatusWorking,
},
worker: worker,
}
task.C = task.finishChan
return task
}
func (task *SeqTask) Run() *SeqTask {
go func() {
globals.SugarLogger.Debugf("Run SeqTask %s", task.Name)
var taskErr error
var taskResult interface{}
for i := 0; i < task.TotalItemCount; i++ {
taskResult, taskErr = task.worker(i, task.params...)
task.finishedOneJob(1, taskErr)
if taskErr != nil {
break
}
}
task.locker.Lock()
if taskErr != nil { // 如果有错误,肯定就是失败了
task.Status = TaskStatusFailed
} else {
if task.FinishedJobCount < task.TotalJobCount {
taskErr = ErrTaskIsCanceled
task.Status = TaskStatusCanceled
} else {
task.Status = TaskStatusFinished
}
}
task.err = taskErr
task.result = taskResult
task.TerminatedAt = time.Now()
task.locker.Unlock()
globals.SugarLogger.Debugf("Run SeqTask %s, result:%v, err:%v", task.Name, taskResult, taskErr)
close(task.finishChan)
close(task.quitChan)
}()
return task
}

View File

@@ -1,13 +1,8 @@
package tasksch
import (
"errors"
"sync"
"time"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/globals"
)
const (
@@ -22,39 +17,42 @@ const (
TaskStatusEnd = 4
)
const (
MaxParallelCount = 10
)
type ITask interface {
Run() *ITask
GetResult(duration time.Duration) (retVal []interface{}, err error)
Cancel()
GetTotalItemCount() int
GetFinishedItemCount() int
GetTotalJobCount() int
GetFinishedJobCount() int
GetStatus() int
}
type WorkFunc func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error)
type ResultHandlerFunc func(taskName string, result []interface{}, err error)
type Task struct {
ID string `json:"id"`
type BaseTask struct {
Name string `json:"name"`
ID string `json:"id"`
ParallelCount int `json:"parallelCount"`
BatchSize int `json:"batchSize"`
IsContinueWhenError bool `json:"isContinueWhenError"`
CreatedBy string `json:"createdBy"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
TerminatedAt time.Time `json:"terminatedAt"`
ParallelCount int `json:"parallelCount"`
TotalItemCount int `json:"totalItemCount"`
TotalJobCount int `json:"totalJobCount"`
FinishedItemCount int `json:"finishedItemCount"`
FinishedJobCount int `json:"finishedJobCount"`
FailedItemCount int `json:"failedItemCount"`
FailedJobCount int `json:"failedJobCount"`
IsContinueWhenError bool `json:"isContinueWhenError"`
Status int `json:"status"`
C <-chan int `json:"-"`
taskChan chan []interface{}
quitChan chan int
subFinishChan chan interface{}
finishChan chan int
finishChan chan int
C <-chan int `json:"-"`
params []interface{}
quitChan chan int
locker sync.RWMutex
result []interface{}
result interface{}
err error
}
@@ -74,131 +72,10 @@ func (s TaskList) Swap(i, j int) {
s[j] = tmp
}
var (
ErrTaskNotFinished = errors.New("任务还未完成")
ErrTaskIsCanceled = errors.New("任务被取消了")
)
func RunTask(taskName string, isContinueWhenError bool, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *Task {
if parallelCount > MaxParallelCount || parallelCount == 0 {
parallelCount = MaxParallelCount
}
realItemList := utils.Interface2Slice(itemList)
jobList := jxutils.SplitSlice(realItemList, batchSize)
jobListLen := jxutils.GetSliceLen(jobList)
if parallelCount > jobListLen {
parallelCount = jobListLen
}
task := &Task{
ID: utils.GetUUID(),
Name: taskName,
CreatedAt: time.Now(),
CreatedBy: userName,
UpdatedAt: time.Now(),
TotalJobCount: len(jobList),
TotalItemCount: len(realItemList),
ParallelCount: parallelCount,
taskChan: make(chan []interface{}, len(realItemList)+parallelCount), // 确保能装下所有taskitem加结束标记
quitChan: make(chan int, parallelCount),
subFinishChan: make(chan interface{}, parallelCount),
finishChan: make(chan int, 2),
Status: TaskStatusWorking,
IsContinueWhenError: isContinueWhenError,
}
task.C = task.finishChan
go func() {
globals.SugarLogger.Debugf("RunTask %s", taskName)
for i := 0; i < parallelCount; i++ {
go func() {
var chanRetVal interface{}
retVal := make([]interface{}, 0)
for {
select {
case <-task.quitChan: // 取消
goto end
case job := <-task.taskChan:
if job == nil { // 任务完成
chanRetVal = retVal
goto end
} else {
result, err := worker(job, params...)
globals.SugarLogger.Debugf("RunTask %s, after call worker result:%v, err:%v", taskName, result, err)
task.finishedOneJob(len(job), err)
if err == nil {
if result != nil {
retVal = append(retVal, utils.Interface2Slice(result)...)
}
} else if !isContinueWhenError { // 出错
chanRetVal = err
goto end
}
}
}
}
end:
globals.SugarLogger.Debugf("RunTask %s, put to chann chanRetVal:%v", taskName, chanRetVal)
task.locker.RLock()
if task.Status < TaskStatusEndBegin {
task.subFinishChan <- chanRetVal
}
task.locker.RUnlock()
}()
}
for _, job := range jobList {
task.taskChan <- job
}
for i := 0; i < parallelCount; i++ {
task.taskChan <- nil
}
taskResult := make([]interface{}, 0)
var taskErr error
for i := 0; i < parallelCount; i++ {
result := <-task.subFinishChan
// globals.SugarLogger.Debugf("RunTask %s, received from chann result:%v", taskName, result)
if err2, ok := result.(error); ok {
task.Cancel()
taskResult = nil
taskErr = err2
break // 出错情况下是否需要直接跳出?
} else if result != nil {
resultList := result.([]interface{})
taskResult = append(taskResult, resultList...)
}
}
task.locker.Lock()
if taskErr != nil { // 如果有错误,肯定就是失败了
task.Status = TaskStatusFailed
} else {
if len(task.taskChan) > 0 {
taskErr = ErrTaskIsCanceled
task.Status = TaskStatusCanceled
} else {
task.Status = TaskStatusFinished
}
}
task.err = taskErr
task.result = taskResult
task.TerminatedAt = time.Now()
task.locker.Unlock()
globals.SugarLogger.Debugf("RunTask %s, result:%v, err:%v", taskName, taskResult, taskErr)
close(task.finishChan)
close(task.subFinishChan)
close(task.quitChan)
if resultHandler != nil {
resultHandler(taskName, task.result, task.err)
}
}()
return task
}
func (t *Task) GetResult(duration time.Duration) (retVal []interface{}, err error) {
func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err error) {
if t.GetStatus() >= TaskStatusEndBegin {
return t.result, t.err
retVal, _ = t.result.([]interface{})
return retVal, t.err
}
if duration == 0 {
duration = time.Hour * 10000 // duration为0表示无限等待
@@ -210,13 +87,14 @@ func (t *Task) GetResult(duration time.Duration) (retVal []interface{}, err erro
t.locker.RLock()
defer t.locker.RUnlock()
return t.result, t.err
retVal, _ = t.result.([]interface{})
return retVal, t.err
case <-timer.C:
}
return nil, ErrTaskNotFinished
}
func (t *Task) Cancel() {
func (t *BaseTask) Cancel() {
t.locker.Lock()
defer t.locker.Unlock()
if t.Status < TaskStatusEndBegin && t.Status != TaskStatusCanceling {
@@ -227,29 +105,29 @@ func (t *Task) Cancel() {
}
}
func (t *Task) GetTotalItemCount() int {
func (t *BaseTask) GetTotalItemCount() int {
return t.TotalItemCount
}
func (t *Task) GetFinishedItemCount() int {
func (t *BaseTask) GetFinishedItemCount() int {
t.locker.RLock()
defer t.locker.RUnlock()
return t.FinishedItemCount
}
func (t *Task) GetTotalJobCount() int {
func (t *BaseTask) GetTotalJobCount() int {
return t.TotalJobCount
}
func (t *Task) GetFinishedJobCount() int {
func (t *BaseTask) GetFinishedJobCount() int {
t.locker.RLock()
defer t.locker.RUnlock()
return t.FinishedJobCount
}
func (t *Task) GetStatus() int {
func (t *BaseTask) GetStatus() int {
t.locker.RLock()
defer t.locker.RUnlock()
@@ -257,8 +135,7 @@ func (t *Task) GetStatus() int {
}
/////////
func (t *Task) finishedOneJob(itemCount int, err error) {
func (t *BaseTask) finishedOneJob(itemCount int, err error) {
t.locker.Lock()
defer t.locker.Unlock()
@@ -272,7 +149,7 @@ func (t *Task) finishedOneJob(itemCount int, err error) {
}
}
func (t *Task) setStatus(status int) {
func (t *BaseTask) setStatus(status int) {
t.locker.Lock()
defer t.locker.Unlock()