package tasksch import ( "encoding/json" "fmt" "strings" "sync" "time" "git.rosy.net.cn/baseapi/platformapi/dingdingapi" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxutils/ddmsg" "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" "git.rosy.net.cn/jx-callback/globals" ) const ( TaskStatusBegin = 0 TaskStatusWorking = 0 TaskStatusCanceling = 1 TaskStatusEndBegin = 2 TaskStatusFinished = 2 TaskStatusCanceled = 3 TaskStatusFailed = 4 TaskStatusEnd = 4 ) const ( MaxTaskNameLen = 50 ) var ( TaskStatusName = map[int]string{ TaskStatusWorking: "运行中", TaskStatusCanceling: "取消中", TaskStatusFinished: "结束", TaskStatusCanceled: "已取消", TaskStatusFailed: "失败", } ) type TaskList []ITask type ITask interface { Run() GetID() string GetResult(duration time.Duration) (retVal []interface{}, err error) Cancel() GetTotalItemCount() int GetFinishedItemCount() int GetTotalJobCount() int GetFinishedJobCount() int GetStatus() int GetCreatedAt() time.Time GetCreatedBy() string AddChild(task ITask) ITask GetChildren() TaskList SetParent(parentTask ITask) // GetOriginalErr() error GetErr() error // GetDetailErrList() []error GetLeafResult() (finishedItemCount, failedItemCount int) AddBatchErr(err error) AddFailedList(failedList ...interface{}) GetFailedList() (failedList []interface{}) SetFinishHook(func(task ITask)) json.Marshaler } // type TaskError struct { // name string // errStr string // } // func (t *TaskError) MarshalJSON() ([]byte, error) { // return json.Marshal(t.Error()) // } // func (t *TaskError) Error() string { // return fmt.Sprintf("[%s], 错误:%s", t.name, t.errStr) // } // func (t *TaskError) String() string { // return t.Error() // } // func NewTaskError(name string, err error) *TaskError { // return &TaskError{ // name: name, // errStr: err.Error(), // } // } type BaseTask struct { Name string `json:"name"` ID string `json:"id"` ParallelCount int `json:"parallelCount"` BatchSize int `json:"batchSize"` IsContinueWhenError bool `json:"isContinueWhenError"` CreatedBy string `json:"createdBy"` CreatedAt time.Time `json:"createdAt"` UpdatedAt time.Time `json:"updatedAt"` TerminatedAt time.Time `json:"terminatedAt"` TotalItemCount int `json:"totalItemCount"` TotalJobCount int `json:"totalJobCount"` FinishedItemCount int `json:"finishedItemCount"` FinishedJobCount int `json:"finishedJobCount"` FailedItemCount int `json:"failedItemCount"` FailedJobCount int `json:"failedJobCount"` Status int `json:"status"` NoticeMsg string `json:"noticeMsg"` Result []interface{} `json:"-"` Children TaskList `json:"children"` Err string `json:"err"` mainErr error batchErrList []error finishChan chan struct{} C <-chan struct{} `json:"-"` params []interface{} quitChan chan int locker sync.RWMutex parent ITask ctx *jxcontext.Context isGetResultCalled bool FailedList []interface{} finishHook func(task ITask) } func (s TaskList) Len() int { return len(s) } func (s TaskList) Less(i, j int) bool { return s[i].GetCreatedAt().Sub(s[j].GetCreatedAt()) > 0 } func (s TaskList) Swap(i, j int) { tmp := s[i] s[i] = s[j] s[j] = tmp } func (t *BaseTask) Init(parallelCount, batchSize int, isContinueWhenError bool, params []interface{}, name string, ctx *jxcontext.Context, totalItemCount, totalJobCount int) { t.ID = utils.GetUUID() t.ParallelCount = parallelCount t.BatchSize = batchSize t.IsContinueWhenError = isContinueWhenError t.params = params t.Name = utils.LimitUTF8StringLen(name, MaxTaskNameLen) t.CreatedAt = time.Now() t.ctx = ctx t.CreatedBy = ctx.GetUserName() t.UpdatedAt = t.CreatedAt t.TerminatedAt = utils.DefaultTimeValue t.TotalItemCount = totalItemCount t.TotalJobCount = totalJobCount t.quitChan = make(chan int) t.finishChan = make(chan struct{}) t.Status = TaskStatusWorking t.C = t.finishChan } func (t *BaseTask) GetID() string { return t.ID } func (t *BaseTask) Run() { } // 此函数成功返回结果后,结果在任务中会被删除(以免被管理的任务不必要的HOLD住对象) func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err error) { if t.GetStatus() >= TaskStatusEndBegin { return t.getResult(), t.GetErr() } if duration == 0 { duration = time.Hour * 10000 // duration为0表示无限等待 } timer := time.NewTimer(duration) select { case <-t.finishChan: t.isGetResultCalled = true timer.Stop() return t.getResult(), t.GetErr() case <-timer.C: } return nil, ErrTaskNotFinished } func (t *BaseTask) GetCreatedAt() time.Time { return t.CreatedAt } func (t *BaseTask) GetCreatedBy() string { return t.CreatedBy } func (t *BaseTask) Cancel() { t.locker.Lock() if t.Status < TaskStatusEndBegin && t.Status != TaskStatusCanceling { t.Status = TaskStatusCanceling close(t.quitChan) } t.locker.Unlock() children := t.GetChildren() for _, subTask := range children { subTask.Cancel() } } func (t *BaseTask) GetTotalItemCount() int { return t.TotalItemCount } func (t *BaseTask) GetFinishedItemCount() int { t.locker.RLock() defer t.locker.RUnlock() return t.FinishedItemCount } func (t *BaseTask) GetTotalJobCount() int { return t.TotalJobCount } func (t *BaseTask) GetFinishedJobCount() int { t.locker.RLock() defer t.locker.RUnlock() return t.FinishedJobCount } func (t *BaseTask) GetStatus() int { t.locker.RLock() defer t.locker.RUnlock() return t.Status } func (t *BaseTask) AddChild(task ITask) ITask { t.locker.Lock() defer t.locker.Unlock() t.Children = append(t.Children, task) return task } func (t *BaseTask) GetChildren() (children TaskList) { t.locker.RLock() defer t.locker.RUnlock() if len(t.Children) > 0 { children = make(TaskList, len(t.Children)) copy(children, t.Children) } return children } func (t *BaseTask) SetParent(parentTask ITask) { t.locker.Lock() defer t.locker.Unlock() if t.parent != nil { panic(fmt.Sprintf("task:%s already have parent!", utils.Format4Output(t, false))) } t.parent = parentTask } func (t *BaseTask) SetNoticeMsg(noticeMsg string) { t.locker.Lock() defer t.locker.Unlock() t.NoticeMsg = noticeMsg } func (t *BaseTask) GetNoticeMsg() string { t.locker.RLock() defer t.locker.RUnlock() return t.NoticeMsg } func (t *BaseTask) getResult() (result []interface{}) { t.locker.Lock() defer t.locker.Unlock() result = t.Result t.Result = nil return result } func (t *BaseTask) AddBatchErr(err error) { if err != nil { t.locker.Lock() defer t.locker.Unlock() t.batchErrList = append(t.batchErrList, err) } } // func (t *BaseTask) GetOriginalErr() error { // t.locker.RLock() // defer t.locker.RUnlock() // return nil // } func (t *BaseTask) GetErr() error { t.locker.RLock() defer t.locker.RUnlock() if t.mainErr == nil && len(t.batchErrList) == 0 { return nil } return t } func (t *BaseTask) GetLeafResult() (finishedItemCount, failedItemCount int) { if len(t.Children) == 0 { return t.FinishedItemCount, t.FailedItemCount } for _, v := range t.Children { subFinishedItemCount, subFailedItemCount := v.GetLeafResult() finishedItemCount += subFinishedItemCount failedItemCount += subFailedItemCount } return finishedItemCount, failedItemCount } func (t *BaseTask) Error() (errMsg string) { if t.GetErr() == nil { return "" } t.locker.RLock() errMsg = t.Err t.locker.RUnlock() if errMsg != "" { return errMsg } errMsg = "任务:" + t.Name if t.parent == nil { finishedItemCount, failedItemCount := t.GetLeafResult() errMsg += fmt.Sprintf(", 全部总共:%d, 成功:%d, 失败:%d,\n", (finishedItemCount + failedItemCount), finishedItemCount, failedItemCount) } if t.mainErr != nil { errMsg += "失败" errMsg += "," + t.mainErr.Error() } else { errMsg += fmt.Sprintf("部分失败, 总共:%d, 成功:%d, 失败:%d, 详情如下:\n", t.TotalItemCount, t.FinishedItemCount, t.FailedItemCount) strList := make([]string, len(t.batchErrList)) for k, v := range t.batchErrList { strList[k] = v.Error() } errMsg += strings.Join(strList, ",\n") } t.locker.Lock() t.Err = errMsg t.locker.Unlock() return errMsg } func (t *BaseTask) SetFinishHook(hook func(task ITask)) { t.locker.RLock() defer t.locker.RUnlock() t.finishHook = hook } func (t *BaseTask) GetFailedList() (failedList []interface{}) { t.locker.RLock() failedList = append(failedList, t.FailedList...) t.locker.RUnlock() for _, v := range t.Children { failedList = append(failedList, v.GetFailedList()...) } return failedList } func (t *BaseTask) AddFailedList(failedList ...interface{}) { if len(failedList) > 0 { t.locker.Lock() defer t.locker.Unlock() t.FailedList = append(t.FailedList, failedList...) } } // func (t *BaseTask) GetDetailErrList() []error { // t.locker.RLock() // defer t.locker.RUnlock() // return t.batchErrList // } func AddChild(parentTask ITask, task ITask) ITask { if parentTask != nil { return parentTask.AddChild(task) } return task } func HandleTask(task, parentTask ITask, isMangeIt bool) ITask { AddChild(parentTask, task) if parentTask == nil && isMangeIt { ManageTask(task) } return task } ///////// func (t *BaseTask) MarshalJSON() ([]byte, error) { type tBaseTask BaseTask t.locker.RLock() defer t.locker.RUnlock() return utils.MustMarshal((*tBaseTask)(t)), nil } func (t *BaseTask) run(taskHandler func()) { if t.GetStatus() == TaskStatusBegin { utils.CallFuncAsync(func() { taskHandler() task := t task.locker.Lock() if task.mainErr != nil { // 如果有错误,肯定就是失败了 task.Status = TaskStatusFailed } else { if task.FinishedJobCount+task.FailedJobCount < task.TotalJobCount { task.mainErr = ErrTaskIsCanceled task.Status = TaskStatusCanceled } else { task.Status = TaskStatusFinished } } task.TerminatedAt = time.Now() task.locker.Unlock() task.Error() globals.SugarLogger.Debugf("Task:%s, mainErr:%v, batchErrList:%v", task.Name, task.mainErr, task.batchErrList) select { case <-t.quitChan: default: close(t.quitChan) } // todo 如下代码可能有对t.Children操作的并发问题 for _, subTask := range t.Children { if _, err := subTask.GetResult(0); err != nil { globals.SugarLogger.Infof("BaseTask run, failed with error:%v", err) } } close(t.finishChan) if t.finishHook != nil { t.finishHook(t) } else { SendMessage(t) } }) } } func SendMessage(t *BaseTask) { time.Sleep(10 * time.Millisecond) // 等待GetResult中的isGetResultCalled赋值 globals.SugarLogger.Debugf("BaseTask task ID:%s, name:%s finished, isGetResultCalled:%t", t.ID, t.Name, t.isGetResultCalled) if !t.isGetResultCalled && t.parent == nil && len(GetTasks(t.ID, TaskStatusBegin, TaskStatusEnd, 24, "")) > 0 { if authInfo, err := t.ctx.GetV2AuthInfo(); err == nil { // 这里应该是不管登录类型,直接以可能的方式发消息 var content string taskDesc := fmt.Sprintf("你的异步任务[%s],ID[%s],开始于:%s,结束于:%s,", t.Name, t.ID, utils.Time2Str(t.CreatedAt), utils.Time2Str(t.TerminatedAt)) content = fmt.Sprintf("%s执行%s", taskDesc, TaskStatusName[t.Status]) if t.Error() == "" { noticeMsg := t.GetNoticeMsg() if noticeMsg != "" { content += ",通知消息:" + noticeMsg } } else { content += ",\n" + t.Error() } ddmsg.SendUserMessage(dingdingapi.MsgTyeText, authInfo.UserID, "异步任务完成", content) } } } // successCount表示在返回错误的情况下,(部分)成功的个数,如果没有返回错误,则successCount无意义 func (t *BaseTask) finishedOneJob(itemCount, successCount int, err error) { t.locker.Lock() defer t.locker.Unlock() t.UpdatedAt = time.Now() if err == nil { t.FinishedItemCount += itemCount t.FinishedJobCount++ } else { t.FinishedItemCount += successCount t.FailedItemCount += itemCount - successCount t.FailedJobCount++ } } func (t *BaseTask) setStatus(status int) { t.locker.Lock() defer t.locker.Unlock() t.Status = status } // func (t *BaseTask) buildTaskErrFromBatchErrList() (err error) { // if len(t.batchErrList) > 0 { // strList := make([]string, len(t.batchErrList)) // for k, v := range t.batchErrList { // strList[k] = v.Error() // } // return NewTaskError(t.Name, fmt.Errorf("总共:%d, 成功:%d, 失败:%d, 详情:\n%s", t.TotalItemCount, t.FinishedItemCount, t.FailedItemCount, strings.Join(strList, "\n"))) // } // return nil // } func (task *BaseTask) callWorker(worker func() (retVal interface{}, err error)) (retVal interface{}, err error) { defer func() { if r := recover(); r != nil { globals.SugarLogger.Errorf("callWorker panic:%v", r) err = fmt.Errorf("panic, r:%v", r) } }() return worker() } func (task *BaseTask) callWorker2(worker func() (retVal interface{}, successCount int, err error)) (retVal interface{}, successCount int, err error) { defer func() { if r := recover(); r != nil { globals.SugarLogger.Errorf("callWorker panic:%v", r) err = fmt.Errorf("panic, r:%v", r) } }() return worker() }