package tasksch import ( "encoding/json" "fmt" "strings" "sync" "time" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/globals" ) const ( TaskStatusBegin = 0 TaskStatusWorking = 0 TaskStatusCanceling = 1 TaskStatusEndBegin = 2 TaskStatusFinished = 2 TaskStatusCanceled = 3 TaskStatusFailed = 4 TaskStatusEnd = 4 ) var ( TaskStatusName = map[int]string{ TaskStatusWorking: "运行中", TaskStatusCanceling: "取消中", TaskStatusFinished: "结束", TaskStatusCanceled: "已取消", TaskStatusFailed: "失败", } ) type TaskList []ITask type ITask interface { Run() GetID() string GetResult(duration time.Duration) (retVal []interface{}, err error) Cancel() GetTotalItemCount() int GetFinishedItemCount() int GetTotalJobCount() int GetFinishedJobCount() int GetStatus() int GetCreatedAt() time.Time GetCreatedBy() string AddChild(task ITask) ITask GetChildren() TaskList SetParent(parentTask ITask) json.Marshaler } type TaskError struct { name string errStr string } func (t *TaskError) MarshalJSON() ([]byte, error) { return json.Marshal(t.Error()) } func (t *TaskError) Error() string { return fmt.Sprintf("任务[%s]执行失败,错误详情:\n%s", t.name, t.errStr) } func NewTaskError(name string, err error) *TaskError { return &TaskError{ name: name, errStr: err.Error(), } } type BaseTask struct { Name string `json:"name"` ID string `json:"id"` ParallelCount int `json:"parallelCount"` BatchSize int `json:"batchSize"` IsContinueWhenError bool `json:"isContinueWhenError"` CreatedBy string `json:"createdBy"` CreatedAt time.Time `json:"createdAt"` UpdatedAt time.Time `json:"updatedAt"` TerminatedAt time.Time `json:"terminatedAt"` TotalItemCount int `json:"totalItemCount"` TotalJobCount int `json:"totalJobCount"` FinishedItemCount int `json:"finishedItemCount"` FinishedJobCount int `json:"finishedJobCount"` FailedItemCount int `json:"failedItemCount"` FailedJobCount int `json:"failedJobCount"` Status int `json:"status"` Result []interface{} `json:"result"` Children TaskList `json:"children"` Err error `json:"err"` detailErrMsgList []string `json:"-"` finishChan chan int C <-chan int `json:"-"` params []interface{} quitChan chan int locker sync.RWMutex parent ITask } func (s TaskList) Len() int { return len(s) } func (s TaskList) Less(i, j int) bool { return s[i].GetCreatedAt().Sub(s[j].GetCreatedAt()) > 0 } func (s TaskList) Swap(i, j int) { tmp := s[i] s[i] = s[j] s[j] = tmp } func (t *BaseTask) Init(parallelCount, batchSize int, isContinueWhenError bool, params []interface{}, name, userName string, totalItemCount, totalJobCount int) { t.ID = utils.GetUUID() t.ParallelCount = parallelCount t.BatchSize = batchSize t.IsContinueWhenError = isContinueWhenError t.params = params t.Name = name t.CreatedAt = time.Now() t.CreatedBy = userName t.UpdatedAt = t.CreatedAt t.TerminatedAt = utils.DefaultTimeValue t.TotalItemCount = totalItemCount t.TotalJobCount = totalJobCount t.quitChan = make(chan int) t.finishChan = make(chan int) t.Status = TaskStatusWorking t.C = t.finishChan } func (t *BaseTask) GetID() string { return t.ID } func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err error) { if t.GetStatus() >= TaskStatusEndBegin { return t.Result, t.Err } if duration == 0 { duration = time.Hour * 10000 // duration为0表示无限等待 } timer := time.NewTimer(duration) select { case <-t.finishChan: timer.Stop() return t.Result, t.Err case <-timer.C: } return nil, ErrTaskNotFinished } func (t *BaseTask) GetCreatedAt() time.Time { return t.CreatedAt } func (t *BaseTask) GetCreatedBy() string { return t.CreatedBy } func (t *BaseTask) Cancel() { t.locker.Lock() if t.Status < TaskStatusEndBegin && t.Status != TaskStatusCanceling { t.Status = TaskStatusCanceling close(t.quitChan) } t.locker.Unlock() children := t.GetChildren() for _, subTask := range children { subTask.Cancel() } } func (t *BaseTask) GetTotalItemCount() int { return t.TotalItemCount } func (t *BaseTask) GetFinishedItemCount() int { t.locker.RLock() defer t.locker.RUnlock() return t.FinishedItemCount } func (t *BaseTask) GetTotalJobCount() int { return t.TotalJobCount } func (t *BaseTask) GetFinishedJobCount() int { t.locker.RLock() defer t.locker.RUnlock() return t.FinishedJobCount } func (t *BaseTask) GetStatus() int { t.locker.RLock() defer t.locker.RUnlock() return t.Status } func (t *BaseTask) AddChild(task ITask) ITask { t.locker.Lock() defer t.locker.Unlock() t.Children = append(t.Children, task) return task } func (t *BaseTask) GetChildren() (children TaskList) { t.locker.RLock() defer t.locker.RUnlock() if len(t.Children) > 0 { children = make(TaskList, len(t.Children)) copy(children, t.Children) } return children } func (t *BaseTask) SetParent(parentTask ITask) { t.locker.Lock() defer t.locker.Unlock() if t.parent != nil { panic(fmt.Sprintf("task:%s already have parent!", utils.Format4Output(t, false))) } t.parent = parentTask } func AddChild(parentTask ITask, task ITask) ITask { if parentTask != nil { return parentTask.AddChild(task) } return task } ///////// func (t *BaseTask) MarshalJSON() ([]byte, error) { type tBaseTask BaseTask t.locker.RLock() defer t.locker.RUnlock() return utils.MustMarshal((*tBaseTask)(t)), nil } func (t *BaseTask) run(taskHandler func()) { if t.GetStatus() == TaskStatusBegin { utils.CallFuncAsync(func() { defer func() { if r := recover(); r != nil { globals.SugarLogger.Errorf("panic in BaseTask.run task:%s, task detail:%s", t.Name, utils.Format4Output(t, false)) } }() taskHandler() select { case <-t.quitChan: default: close(t.quitChan) } for _, subTask := range t.Children { if _, err := subTask.GetResult(0); err != nil { globals.SugarLogger.Infof("BaseTask run, failed with error:%v", err) } } close(t.finishChan) }) } } func (t *BaseTask) finishedOneJob(itemCount int, err error) { t.locker.Lock() defer t.locker.Unlock() t.UpdatedAt = time.Now() if err == nil { t.FinishedItemCount += itemCount t.FinishedJobCount++ } else { t.FailedItemCount += itemCount t.FailedJobCount++ } } func (t *BaseTask) setStatus(status int) { t.locker.Lock() defer t.locker.Unlock() t.Status = status } func (t *BaseTask) buildTaskErrFromDetail() (err error) { if len(t.detailErrMsgList) > 0 { return NewTaskError(t.Name, fmt.Errorf("设置了错误继续标志,部分操作失败,总任务数:%d,失败数:%d,以下为详情:\n%s", t.TotalItemCount, t.FailedItemCount, strings.Join(t.detailErrMsgList, "\n"))) } return nil }