Files
jx-callback/business/jxutils/tasksch/task.go
2018-12-04 17:21:15 +08:00

314 lines
6.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package tasksch
import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/globals"
)
// Task status values.
//
// TaskStatusBegin, TaskStatusEndBegin and TaskStatusEnd are range markers
// rather than separate states: GetResult and Cancel treat every status that
// is >= TaskStatusEndBegin as "the task has ended".
const (
	TaskStatusWorking   = iota // 0: running
	TaskStatusCanceling        // 1: cancellation requested, not yet ended
	TaskStatusFinished         // 2: ended normally
	TaskStatusCanceled         // 3: ended because it was canceled
	TaskStatusFailed           // 4: ended with a failure

	TaskStatusBegin    = TaskStatusWorking  // lowest status value
	TaskStatusEndBegin = TaskStatusFinished // lowest "ended" status value
	TaskStatusEnd      = TaskStatusFailed   // highest status value
)
// TaskStatusName maps every task status value to its display name.
// The names are Chinese strings; the trailing comments give their meaning.
var TaskStatusName = map[int]string{
	TaskStatusWorking:   "运行中", // running
	TaskStatusCanceling: "取消中", // canceling
	TaskStatusFinished:  "结束",  // finished
	TaskStatusCanceled:  "已取消", // canceled
	TaskStatusFailed:    "失败",  // failed
}
// TaskList is a slice of tasks. Its Len, Less and Swap methods order the
// tasks by creation time, most recently created first.
type TaskList []ITask
// ITask is the interface through which a task is started, observed, canceled
// and linked into a parent/child tree. BaseTask in this file supplies the
// shared implementation of most of these methods.
type ITask interface {
// Run is supplied by the concrete task type (presumably it starts the task;
// no implementation is visible in this part of the file).
Run()
// GetID returns the task's identifier.
GetID() string
// GetResult returns the task's result values and error. In BaseTask,
// duration bounds how long the call waits for the task to finish.
GetResult(duration time.Duration) (retVal []interface{}, err error)
// Cancel requests cancellation; BaseTask also cancels every child task.
Cancel()
// GetTotalItemCount and GetFinishedItemCount report item-level progress.
GetTotalItemCount() int
GetFinishedItemCount() int
// GetTotalJobCount and GetFinishedJobCount report job-level progress.
GetTotalJobCount() int
GetFinishedJobCount() int
// GetStatus returns one of the TaskStatus* values.
GetStatus() int
// GetCreatedAt and GetCreatedBy return when and by whom the task was created.
GetCreatedAt() time.Time
GetCreatedBy() string
// AddChild registers task as a child and returns it.
AddChild(task ITask) ITask
// GetChildren returns the registered child tasks.
GetChildren() TaskList
// SetParent records parentTask as this task's parent.
SetParent(parentTask ITask)
// Tasks serialize themselves to JSON.
json.Marshaler
}
// TaskError is the error reported for a failed task. It stores the task name
// and the text of the underlying error rather than the error value itself.
type TaskError struct {
	name   string
	errStr string
}

// MarshalJSON encodes the error as a JSON string containing the same text
// that Error returns.
func (t *TaskError) MarshalJSON() ([]byte, error) {
	return json.Marshal(t.Error())
}

// Error implements the error interface. The message contains the task name
// followed, on a new line, by the underlying error text.
func (t *TaskError) Error() string {
	return fmt.Sprintf("任务[%s]执行失败,错误详情:\n%s", t.name, t.errStr)
}

// NewTaskError builds a TaskError for the task called name from err.
//
// A nil err is tolerated: the resulting TaskError simply has an empty detail
// text. (Calling err.Error() on a nil interface would panic.)
func NewTaskError(name string, err error) *TaskError {
	taskErr := &TaskError{name: name}
	if err != nil {
		taskErr.errStr = err.Error()
	}
	return taskErr
}
// BaseTask holds the state shared by tasks: identity, progress counters,
// status, result, child tasks, and the channels used to signal cancellation
// and completion. The exported fields carry JSON tags and are what
// MarshalJSON serializes.
type BaseTask struct {
// Identity and configuration, all assigned by Init.
Name string `json:"name"`
ID string `json:"id"`
ParallelCount int `json:"parallelCount"`
BatchSize int `json:"batchSize"`
IsContinueWhenError bool `json:"isContinueWhenError"`
CreatedBy string `json:"createdBy"`
// Timestamps. Init sets CreatedAt and UpdatedAt to the current time and
// TerminatedAt to utils.DefaultTimeValue; finishedOneJob refreshes UpdatedAt.
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
TerminatedAt time.Time `json:"terminatedAt"`
// Totals are fixed by Init; the finished/failed counters are advanced by
// finishedOneJob under locker.
TotalItemCount int `json:"totalItemCount"`
TotalJobCount int `json:"totalJobCount"`
FinishedItemCount int `json:"finishedItemCount"`
FinishedJobCount int `json:"finishedJobCount"`
FailedItemCount int `json:"failedItemCount"`
FailedJobCount int `json:"failedJobCount"`
// Status is one of the TaskStatus* values.
Status int `json:"status"`
// Result and Err are what GetResult returns once the task has ended.
Result []interface{} `json:"result"`
Children TaskList `json:"children"`
Err error `json:"err"`
// detailErrMsgList collects per-failure messages that
// buildTaskErrFromDetail joins into a single error.
detailErrMsgList []string `json:"-"`
// finishChan is closed by run once the handler has returned and all
// children have been waited for; C is its receive-only view.
finishChan chan int
C <-chan int `json:"-"`
// params is stored by Init; its use is not visible in this part of the file.
params []interface{}
// quitChan is closed by Cancel, or by run after the handler returns.
quitChan chan int
// locker guards Status, the progress counters, Children and parent.
locker sync.RWMutex
parent ITask
}
// Len returns the number of tasks in the list.
func (s TaskList) Len() int {
return len(s)
}
// Less reports whether task i was created after task j, so sorting with it
// puts the most recently created task first.
func (s TaskList) Less(i, j int) bool {
	createdI, createdJ := s[i].GetCreatedAt(), s[j].GetCreatedAt()
	return createdI.After(createdJ)
}
// Swap exchanges the tasks at positions i and j.
func (s TaskList) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}
// Init prepares the task for use: it assigns a fresh ID, copies the supplied
// configuration and totals into the task, stamps the creation time, creates
// the quit and finish channels, and puts the task into TaskStatusWorking.
//
// userName is stored as CreatedBy. TerminatedAt starts out as
// utils.DefaultTimeValue. C is set to the receive side of the finish channel.
func (t *BaseTask) Init(parallelCount, batchSize int, isContinueWhenError bool, params []interface{}, name, userName string, totalItemCount, totalJobCount int) {
	// Identity.
	t.ID = utils.GetUUID()
	t.Name = name
	t.CreatedBy = userName

	// Execution configuration.
	t.ParallelCount, t.BatchSize = parallelCount, batchSize
	t.IsContinueWhenError = isContinueWhenError
	t.params = params

	// Timestamps: created and updated start out identical.
	now := time.Now()
	t.CreatedAt, t.UpdatedAt = now, now
	t.TerminatedAt = utils.DefaultTimeValue

	// Workload size.
	t.TotalItemCount, t.TotalJobCount = totalItemCount, totalJobCount

	// Signalling channels and initial state.
	t.quitChan = make(chan int)
	done := make(chan int)
	t.finishChan = done
	t.C = done
	t.Status = TaskStatusWorking
}
// GetID returns the task's ID (assigned by Init).
func (t *BaseTask) GetID() string {
return t.ID
}
// GetResult returns the task's Result and Err, waiting for the task to finish
// if it has not ended yet.
//
// If the status is already >= TaskStatusEndBegin the stored values are
// returned immediately. Otherwise the call blocks until the finish channel is
// closed or until duration has elapsed, whichever comes first; on timeout it
// returns (nil, ErrTaskNotFinished).
//
// A duration of 0 means "wait without a time limit". This blocks on the
// finish channel directly instead of arming a very long timer, so the wait is
// truly unbounded and no timer is allocated.
func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err error) {
	if t.GetStatus() >= TaskStatusEndBegin {
		return t.Result, t.Err
	}
	if duration == 0 {
		<-t.finishChan
		return t.Result, t.Err
	}

	timer := time.NewTimer(duration)
	defer timer.Stop() // release the timer on whichever path returns
	select {
	case <-t.finishChan:
		return t.Result, t.Err
	case <-timer.C:
		return nil, ErrTaskNotFinished
	}
}
// GetCreatedAt returns the creation time recorded by Init.
func (t *BaseTask) GetCreatedAt() time.Time {
return t.CreatedAt
}
// GetCreatedBy returns the user name that was passed to Init.
func (t *BaseTask) GetCreatedBy() string {
return t.CreatedBy
}
// Cancel requests cancellation of this task and of all of its children.
//
// If the task has not ended and is not already being canceled, its status
// becomes TaskStatusCanceling and the quit channel is closed. Calling Cancel
// again, or on a task that has already ended, leaves this task untouched but
// still forwards the request to the children.
func (t *BaseTask) Cancel() {
	t.locker.Lock()
	if t.Status < TaskStatusEndBegin && t.Status != TaskStatusCanceling {
		t.Status = TaskStatusCanceling
		// run closes quitChan itself once the handler has returned, possibly
		// while the status is still below TaskStatusEndBegin. Closing a
		// closed channel panics, so only close it if it is still open.
		// (No visible code sends on quitChan, so a successful receive means
		// "already closed".)
		select {
		case <-t.quitChan:
		default:
			close(t.quitChan)
		}
	}
	// Release the lock before recursing: GetChildren takes it again.
	t.locker.Unlock()

	for _, subTask := range t.GetChildren() {
		subTask.Cancel()
	}
}
// GetTotalItemCount returns the total item count given to Init. It is read
// without taking the lock.
func (t *BaseTask) GetTotalItemCount() int {
return t.TotalItemCount
}
// GetFinishedItemCount returns the number of items finished so far, read
// under the read lock.
func (t *BaseTask) GetFinishedItemCount() int {
	t.locker.RLock()
	finished := t.FinishedItemCount
	t.locker.RUnlock()
	return finished
}
// GetTotalJobCount returns the total job count given to Init. It is read
// without taking the lock.
func (t *BaseTask) GetTotalJobCount() int {
return t.TotalJobCount
}
// GetFinishedJobCount returns the number of jobs finished so far, read under
// the read lock.
func (t *BaseTask) GetFinishedJobCount() int {
	t.locker.RLock()
	finished := t.FinishedJobCount
	t.locker.RUnlock()
	return finished
}
// GetStatus returns the current TaskStatus* value, read under the read lock.
func (t *BaseTask) GetStatus() int {
	t.locker.RLock()
	current := t.Status
	t.locker.RUnlock()
	return current
}
// AddChild appends task to this task's children under the write lock and
// returns the same task so calls can be chained. It does not call SetParent
// on the child.
func (t *BaseTask) AddChild(task ITask) ITask {
	t.locker.Lock()
	t.Children = append(t.Children, task)
	t.locker.Unlock()
	return task
}
// GetChildren returns a copy of the child list taken under the read lock, so
// the caller can iterate it without holding the lock. It returns nil when
// the task has no children.
func (t *BaseTask) GetChildren() (children TaskList) {
	t.locker.RLock()
	defer t.locker.RUnlock()

	count := len(t.Children)
	if count == 0 {
		return nil
	}
	snapshot := make(TaskList, count)
	copy(snapshot, t.Children)
	return snapshot
}
// SetParent records parentTask as this task's parent.
//
// It panics if a parent has already been set; a task may only be attached to
// one parent.
func (t *BaseTask) SetParent(parentTask ITask) {
	t.locker.Lock()
	alreadyHasParent := t.parent != nil
	if !alreadyHasParent {
		t.parent = parentTask
	}
	t.locker.Unlock()

	if alreadyHasParent {
		// The message is built only after the write lock has been released.
		// BaseTask.MarshalJSON takes the read lock, so if Format4Output
		// serializes t (NOTE(review): presumably it does - confirm) calling
		// it with the write lock held would block forever instead of
		// panicking.
		panic(fmt.Sprintf("task:%s already have parent!", utils.Format4Output(t, false)))
	}
}
// AddChild registers task as a child of parentTask and returns task.
// When parentTask is nil nothing is registered and task is returned as is.
func AddChild(parentTask ITask, task ITask) ITask {
	if parentTask == nil {
		return task
	}
	return parentTask.AddChild(task)
}
/////////
// MarshalJSON serializes the task while holding the read lock.
//
// The task is converted to a locally declared type with the same fields but
// no methods, so the encoder does not call back into this MarshalJSON.
func (t *BaseTask) MarshalJSON() ([]byte, error) {
	t.locker.RLock()
	defer t.locker.RUnlock()

	type plainTask BaseTask
	encoded := utils.MustMarshal((*plainTask)(t))
	return encoded, nil
}
// run executes taskHandler asynchronously through utils.CallFuncAsync, but
// only while the task is still in TaskStatusBegin; otherwise it does nothing.
//
// After taskHandler returns, the quit channel is closed (unless Cancel has
// already closed it), every child task is waited for, and finally the finish
// channel is closed, which releases callers blocked in GetResult and readers
// of C.
//
// If taskHandler (or waiting for a child) panics, the panic is recovered and
// logged together with the panic value. The task is then marked
// TaskStatusFailed (unless it already reached an end status), Err is set if
// it was still nil, and both channels are closed so that waiters are released
// instead of blocking forever.
func (t *BaseTask) run(taskHandler func()) {
	if t.GetStatus() != TaskStatusBegin {
		return
	}

	// closeIfOpen closes ch unless it is nil or already closed. It is always
	// called with t.locker held so that the check and the close cannot
	// interleave with Cancel, which closes quitChan under the same lock.
	// (No visible code sends on these channels, so a successful receive
	// means "already closed".)
	closeIfOpen := func(ch chan int) {
		if ch == nil {
			return
		}
		select {
		case <-ch:
		default:
			close(ch)
		}
	}

	utils.CallFuncAsync(func() {
		defer func() {
			if r := recover(); r != nil {
				// Logged before taking the lock: formatting t may go through
				// MarshalJSON, which acquires the read lock itself.
				globals.SugarLogger.Errorf("panic in BaseTask.run task:%s, panic:%v, task detail:%s", t.Name, r, utils.Format4Output(t, false))

				t.locker.Lock()
				if t.Status < TaskStatusEndBegin {
					t.Status = TaskStatusFailed
				}
				if t.Err == nil {
					t.Err = NewTaskError(t.Name, fmt.Errorf("panic: %v", r))
				}
				t.locker.Unlock()
			}

			// Runs on both the normal and the panic path; on the normal path
			// quitChan is already closed and only finishChan is affected.
			t.locker.Lock()
			closeIfOpen(t.quitChan)
			closeIfOpen(t.finishChan)
			t.locker.Unlock()
		}()

		taskHandler()

		t.locker.Lock()
		closeIfOpen(t.quitChan)
		t.locker.Unlock()

		// Iterate over a snapshot taken under the lock; AddChild may run
		// concurrently and modify t.Children.
		for _, subTask := range t.GetChildren() {
			if _, err := subTask.GetResult(0); err != nil {
				globals.SugarLogger.Infof("BaseTask run, failed with error:%v", err)
			}
		}
	})
}
// finishedOneJob records the outcome of one job covering itemCount items.
// Under the write lock it refreshes UpdatedAt and advances either the
// finished counters (err == nil) or the failed counters (err != nil).
func (t *BaseTask) finishedOneJob(itemCount int, err error) {
	t.locker.Lock()
	defer t.locker.Unlock()

	t.UpdatedAt = time.Now()
	if err != nil {
		t.FailedJobCount++
		t.FailedItemCount += itemCount
		return
	}
	t.FinishedJobCount++
	t.FinishedItemCount += itemCount
}
// setStatus stores status as the task's current status under the write lock.
// The value is not validated.
func (t *BaseTask) setStatus(status int) {
	t.locker.Lock()
	t.Status = status
	t.locker.Unlock()
}
// buildTaskErrFromDetail turns the collected detail error messages into a
// single TaskError. It returns nil when no detail messages were collected.
//
// The error text reports TotalItemCount and FailedItemCount followed by the
// detail messages, one per line. The fields are read without taking the lock.
func (t *BaseTask) buildTaskErrFromDetail() (err error) {
	if len(t.detailErrMsgList) == 0 {
		return nil
	}
	details := strings.Join(t.detailErrMsgList, "\n")
	cause := fmt.Errorf("设置了错误继续标志,部分操作失败,总任务数:%d失败数%d以下为详情\n%s", t.TotalItemCount, t.FailedItemCount, details)
	return NewTaskError(t.Name, cause)
}