- category sync almost ok.

- refactor tasksch (isContinueWhenError)
This commit is contained in:
gazebo
2018-09-22 10:26:31 +08:00
parent 0623d31c71
commit 81089fb85c
13 changed files with 283 additions and 114 deletions

View File

@@ -22,22 +22,29 @@ const (
TaskStatusEnd = 4
)
const (
MaxParallelCount = 10
)
type WorkFunc func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error)
type ResultHandlerFunc func(taskName string, result []interface{}, err error)
type Task struct {
ID string `json:"id"`
Name string `json:"name"`
CreatedBy string `json:"createdBy"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
TerminatedAt time.Time `json:"terminatedAt"`
ParallelCount int `json:"parallelCount"`
TotalItemCount int `json:"totalItemCount"`
TotalJobCount int `json:"totalJobCount"`
FinishedItemCount int `json:"finishedItemCount"`
FinishedJobCount int `json:"finishedJobCount"`
Status int `json:"status"`
ID string `json:"id"`
Name string `json:"name"`
CreatedBy string `json:"createdBy"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
TerminatedAt time.Time `json:"terminatedAt"`
ParallelCount int `json:"parallelCount"`
TotalItemCount int `json:"totalItemCount"`
TotalJobCount int `json:"totalJobCount"`
FinishedItemCount int `json:"finishedItemCount"`
FinishedJobCount int `json:"finishedJobCount"`
FailedItemCount int `json:"failedItemCount"`
FailedJobCount int `json:"failedJobCount"`
IsContinueWhenError bool `json:"isContinueWhenError"`
Status int `json:"status"`
C <-chan int `json:"-"`
@@ -72,23 +79,31 @@ var (
ErrTaskIsCanceled = errors.New("任务被取消了")
)
func RunTask(taskName string, worker WorkFunc, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, itemList interface{}, params ...interface{}) *Task {
func RunTask(taskName string, isContinueWhenError bool, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *Task {
if parallelCount > MaxParallelCount {
parallelCount = MaxParallelCount
}
listLen := jxutils.GetSliceLen(itemList)
if parallelCount > listLen {
parallelCount = listLen
}
realItemList := utils.Interface2Slice(itemList)
jobList := jxutils.SplitSlice(realItemList, batchSize)
task := &Task{
ID: utils.GetUUID(),
Name: taskName,
CreatedAt: time.Now(),
CreatedBy: userName,
UpdatedAt: time.Now(),
TotalJobCount: len(jobList),
TotalItemCount: len(realItemList),
ParallelCount: parallelCount,
taskChan: make(chan []interface{}, parallelCount*100),
quitChan: make(chan int, parallelCount),
subFinishChan: make(chan interface{}, parallelCount),
finishChan: make(chan int, 2),
Status: TaskStatusWorking,
ID: utils.GetUUID(),
Name: taskName,
CreatedAt: time.Now(),
CreatedBy: userName,
UpdatedAt: time.Now(),
TotalJobCount: len(jobList),
TotalItemCount: len(realItemList),
ParallelCount: parallelCount,
taskChan: make(chan []interface{}, len(realItemList)+parallelCount), // 确保能装下所有taskitem加结束标记
quitChan: make(chan int, parallelCount),
subFinishChan: make(chan interface{}, parallelCount),
finishChan: make(chan int, 2),
Status: TaskStatusWorking,
IsContinueWhenError: isContinueWhenError,
}
task.C = task.finishChan
go func() {
@@ -99,35 +114,34 @@ func RunTask(taskName string, worker WorkFunc, resultHandler ResultHandlerFunc,
retVal := make([]interface{}, 0)
for {
select {
case <-task.quitChan:
case <-task.quitChan: // 取消
goto end
case job := <-task.taskChan:
if job == nil {
if job == nil { // 任务完成
chanRetVal = retVal
goto end
} else {
result, err := worker(job, params...)
globals.SugarLogger.Debugf("RunTask %s, after call worker result:%v, err:%v", taskName, result, err)
task.finishedOneJob(len(job), err)
if err == nil {
task.finishedOneJob(len(job))
if result != nil {
retVal = append(retVal, utils.Interface2Slice(result)...)
}
} else {
} else if !isContinueWhenError { // 出错
chanRetVal = err
go func() {
task.Cancel()
}()
goto end
}
}
}
}
end:
// globals.SugarLogger.Debugf("RunTask %s, put to chann chanRetVal:%v", taskName, chanRetVal)
if task.GetStatus() < TaskStatusEndBegin {
globals.SugarLogger.Debugf("RunTask %s, put to chann chanRetVal:%v", taskName, chanRetVal)
task.locker.RLock()
if task.Status < TaskStatusEndBegin {
task.subFinishChan <- chanRetVal
}
task.locker.RUnlock()
}()
}
for _, job := range jobList {
@@ -141,10 +155,12 @@ func RunTask(taskName string, worker WorkFunc, resultHandler ResultHandlerFunc,
var taskErr error
for i := 0; i < parallelCount; i++ {
result := <-task.subFinishChan
// globals.SugarLogger.Debugf("RunTask %s, received from chann result:%v", taskName, result)
if err2, ok := result.(error); ok {
task.Cancel()
taskResult = nil
taskErr = err2
break
break // 出错情况下是否需要直接跳出?
} else if result != nil {
resultList := result.([]interface{})
taskResult = append(taskResult, resultList...)
@@ -201,11 +217,13 @@ func (t *Task) GetResult(duration time.Duration) (retVal []interface{}, err erro
}
func (t *Task) Cancel() {
if t.GetStatus() < TaskStatusEndBegin {
t.locker.Lock()
defer t.locker.Unlock()
if t.Status < TaskStatusEndBegin && t.Status != TaskStatusCanceling {
t.Status = TaskStatusCanceling
for i := 0; i < t.ParallelCount; i++ {
t.quitChan <- 0
}
t.setStatus(TaskStatusCanceling)
}
}
@@ -240,13 +258,18 @@ func (t *Task) GetStatus() int {
/////////
func (t *Task) finishedOneJob(itemCount int) {
func (t *Task) finishedOneJob(itemCount int, err error) {
t.locker.Lock()
defer t.locker.Unlock()
t.UpdatedAt = time.Now()
t.FinishedItemCount += itemCount
t.FinishedJobCount++
if err == nil {
t.FinishedItemCount += itemCount
t.FinishedJobCount++
} else {
t.FailedItemCount += itemCount
t.FailedJobCount++
}
}
func (t *Task) setStatus(status int) {