package tasksch import ( "encoding/json" "fmt" "strings" "sync" "time" "git.rosy.net.cn/baseapi/platformapi/dingdingapi" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" "git.rosy.net.cn/jx-callback/business/jxutils/msg" "git.rosy.net.cn/jx-callback/globals" ) const ( TaskStatusBegin = 0 TaskStatusWorking = 0 TaskStatusCanceling = 1 TaskStatusEndBegin = 2 TaskStatusFinished = 2 TaskStatusCanceled = 3 TaskStatusFailed = 4 TaskStatusEnd = 4 ) const ( MaxTaskNameLen = 50 ) var ( TaskStatusName = map[int]string{ TaskStatusWorking: "运行中", TaskStatusCanceling: "取消中", TaskStatusFinished: "结束", TaskStatusCanceled: "已取消", TaskStatusFailed: "失败", } ) type TaskList []ITask type ITask interface { Run() GetID() string GetResult(duration time.Duration) (retVal []interface{}, err error) Cancel() GetTotalItemCount() int GetFinishedItemCount() int GetTotalJobCount() int GetFinishedJobCount() int GetStatus() int GetCreatedAt() time.Time GetCreatedBy() string AddChild(task ITask) ITask GetChildren() TaskList SetParent(parentTask ITask) GetOriginalErr() error GetDetailErrList() []error json.Marshaler } type TaskError struct { name string errStr string } func (t *TaskError) MarshalJSON() ([]byte, error) { return json.Marshal(t.Error()) } func (t *TaskError) Error() string { return fmt.Sprintf("[%s], 错误:%s", t.name, t.errStr) } func (t *TaskError) String() string { return t.Error() } func NewTaskError(name string, err error) *TaskError { return &TaskError{ name: name, errStr: err.Error(), } } type BaseTask struct { Name string `json:"name"` ID string `json:"id"` ParallelCount int `json:"parallelCount"` BatchSize int `json:"batchSize"` IsContinueWhenError bool `json:"isContinueWhenError"` CreatedBy string `json:"createdBy"` CreatedAt time.Time `json:"createdAt"` UpdatedAt time.Time `json:"updatedAt"` TerminatedAt time.Time `json:"terminatedAt"` TotalItemCount int `json:"totalItemCount"` TotalJobCount int `json:"totalJobCount"` FinishedItemCount int `json:"finishedItemCount"` FinishedJobCount int `json:"finishedJobCount"` FailedItemCount int `json:"failedItemCount"` FailedJobCount int `json:"failedJobCount"` Status int `json:"status"` NoticeMsg string `json:"noticeMsg"` Result []interface{} `json:"-"` Children TaskList `json:"children"` Err error `json:"err"` OriginalErr error `json:"-"` detailErrList []error finishChan chan struct{} C <-chan struct{} `json:"-"` params []interface{} quitChan chan int locker sync.RWMutex parent ITask ctx *jxcontext.Context isGetResultCalled bool } func (s TaskList) Len() int { return len(s) } func (s TaskList) Less(i, j int) bool { return s[i].GetCreatedAt().Sub(s[j].GetCreatedAt()) > 0 } func (s TaskList) Swap(i, j int) { tmp := s[i] s[i] = s[j] s[j] = tmp } func (t *BaseTask) Init(parallelCount, batchSize int, isContinueWhenError bool, params []interface{}, name string, ctx *jxcontext.Context, totalItemCount, totalJobCount int) { t.ID = utils.GetUUID() t.ParallelCount = parallelCount t.BatchSize = batchSize t.IsContinueWhenError = isContinueWhenError t.params = params t.Name = utils.LimitUTF8StringLen(name, MaxTaskNameLen) t.CreatedAt = time.Now() t.ctx = ctx t.CreatedBy = ctx.GetUserName() t.UpdatedAt = t.CreatedAt t.TerminatedAt = utils.DefaultTimeValue t.TotalItemCount = totalItemCount t.TotalJobCount = totalJobCount t.quitChan = make(chan int) t.finishChan = make(chan struct{}) t.Status = TaskStatusWorking t.C = t.finishChan } func (t *BaseTask) GetID() string { return t.ID } func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err error) { if t.GetStatus() >= TaskStatusEndBegin { return t.Result, t.OriginalErr } if duration == 0 { duration = time.Hour * 10000 // duration为0表示无限等待 } timer := time.NewTimer(duration) select { case <-t.finishChan: t.isGetResultCalled = true timer.Stop() return t.Result, t.OriginalErr case <-timer.C: } return nil, ErrTaskNotFinished } func (t *BaseTask) GetCreatedAt() time.Time { return t.CreatedAt } func (t *BaseTask) GetCreatedBy() string { return t.CreatedBy } func (t *BaseTask) Cancel() { t.locker.Lock() if t.Status < TaskStatusEndBegin && t.Status != TaskStatusCanceling { t.Status = TaskStatusCanceling close(t.quitChan) } t.locker.Unlock() children := t.GetChildren() for _, subTask := range children { subTask.Cancel() } } func (t *BaseTask) GetTotalItemCount() int { return t.TotalItemCount } func (t *BaseTask) GetFinishedItemCount() int { t.locker.RLock() defer t.locker.RUnlock() return t.FinishedItemCount } func (t *BaseTask) GetTotalJobCount() int { return t.TotalJobCount } func (t *BaseTask) GetFinishedJobCount() int { t.locker.RLock() defer t.locker.RUnlock() return t.FinishedJobCount } func (t *BaseTask) GetStatus() int { t.locker.RLock() defer t.locker.RUnlock() return t.Status } func (t *BaseTask) AddChild(task ITask) ITask { t.locker.Lock() defer t.locker.Unlock() t.Children = append(t.Children, task) return task } func (t *BaseTask) GetChildren() (children TaskList) { t.locker.RLock() defer t.locker.RUnlock() if len(t.Children) > 0 { children = make(TaskList, len(t.Children)) copy(children, t.Children) } return children } func (t *BaseTask) SetParent(parentTask ITask) { t.locker.Lock() defer t.locker.Unlock() if t.parent != nil { panic(fmt.Sprintf("task:%s already have parent!", utils.Format4Output(t, false))) } t.parent = parentTask } func (t *BaseTask) SetNoticeMsg(noticeMsg string) { t.locker.Lock() defer t.locker.Unlock() t.NoticeMsg = noticeMsg } func (t *BaseTask) GetNoticeMsg() string { t.locker.RLock() defer t.locker.RUnlock() return t.NoticeMsg } func (t *BaseTask) GetOriginalErr() error { t.locker.RLock() defer t.locker.RUnlock() return t.OriginalErr } func (t *BaseTask) GetDetailErrList() []error { t.locker.RLock() defer t.locker.RUnlock() return t.detailErrList } func AddChild(parentTask ITask, task ITask) ITask { if parentTask != nil { return parentTask.AddChild(task) } return task } func HandleTask(task, parentTask ITask, isMangeIt bool) ITask { AddChild(parentTask, task) if parentTask == nil && isMangeIt { ManageTask(task) } return task } ///////// func (t *BaseTask) MarshalJSON() ([]byte, error) { type tBaseTask BaseTask t.locker.RLock() defer t.locker.RUnlock() return utils.MustMarshal((*tBaseTask)(t)), nil } func (t *BaseTask) run(taskHandler func()) { if t.GetStatus() == TaskStatusBegin { utils.CallFuncAsync(func() { defer func() { if r := recover(); r != nil { globals.SugarLogger.Errorf("panic in BaseTask.run task:%s, task detail:%s, r:%v", t.Name, utils.Format4Output(t, false), r) } }() taskHandler() select { case <-t.quitChan: default: close(t.quitChan) } for _, subTask := range t.Children { if _, err := subTask.GetResult(0); err != nil { globals.SugarLogger.Infof("BaseTask run, failed with error:%v", err) } } close(t.finishChan) time.Sleep(10 * time.Millisecond) // 等待GetResult中的isGetResultCalled赋值 globals.SugarLogger.Debugf("BaseTask task ID:%s, name:%s finished, isGetResultCalled:%t", t.ID, t.Name, t.isGetResultCalled) if !t.isGetResultCalled && t.parent == nil && len(GetTasks(t.ID, TaskStatusBegin, TaskStatusEnd, 24, "")) > 0 { if authInfo, err := t.ctx.GetV2AuthInfo(); err == nil { // 这里应该是不管登录类型,直接以可能的方式发消息 var content string taskDesc := fmt.Sprintf("你的异步任务[%s],ID[%s],开始于:%s,结束于:%s,", t.Name, t.ID, utils.Time2Str(t.CreatedAt), utils.Time2Str(t.TerminatedAt)) if t.Err == nil { content = fmt.Sprintf("%s执行%s", taskDesc, TaskStatusName[t.Status]) noticeMsg := t.GetNoticeMsg() if noticeMsg != "" { content += ",通知消息:" + noticeMsg } } else { if t.Status == TaskStatusFinished { content = fmt.Sprintf("%s执行部分失败,%s", taskDesc, t.Err.Error()) } else { content = fmt.Sprintf("%s执行失败,%s", taskDesc, t.Err.Error()) } } msg.SendUserMessage(dingdingapi.MsgTyeText, authInfo.UserID, "异步任务完成", content) } } }) } } func (t *BaseTask) finishedOneJob(itemCount int, err error) { t.locker.Lock() defer t.locker.Unlock() t.UpdatedAt = time.Now() if err == nil { t.FinishedItemCount += itemCount t.FinishedJobCount++ } else { t.FailedItemCount += itemCount t.FailedJobCount++ } } func (t *BaseTask) setStatus(status int) { t.locker.Lock() defer t.locker.Unlock() t.Status = status } func (t *BaseTask) buildTaskErrFromDetail() (err error) { if len(t.detailErrList) > 0 { strList := make([]string, len(t.detailErrList)) for k, v := range t.detailErrList { strList[k] = v.Error() } return NewTaskError(t.Name, fmt.Errorf("总共:%d, 失败:%d, 详情:\n%s", t.TotalItemCount, t.FailedItemCount, strings.Join(strList, "\n"))) } return nil } func (task *BaseTask) callWorker(worker func() (retVal interface{}, err error)) (retVal interface{}, err error) { defer func() { if r := recover(); r != nil { globals.SugarLogger.Errorf("callWorker panic:%v", r) err = fmt.Errorf("panic, r:%v", r) } }() retVal, err = worker() return retVal, err }