Files
jx-callback/business/jxutils/tasksch/task.go
2019-08-01 20:07:39 +08:00

479 lines
12 KiB
Go

package tasksch
import (
"encoding/json"
"fmt"
"sync"
"time"
"git.rosy.net.cn/baseapi/platformapi/dingdingapi"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
"git.rosy.net.cn/jx-callback/business/jxutils/msg"
"git.rosy.net.cn/jx-callback/globals"
)
// Task status codes stored in BaseTask.Status.
// Several names share a value: the *Begin/*End names are range markers rather
// than distinct states.
const (
// TaskStatusBegin is the lowest status value. run only starts a task whose
// status equals it, and it is passed to GetTasks as the lower bound.
TaskStatusBegin = 0
// TaskStatusWorking is the status assigned by Init.
TaskStatusWorking = 0
// TaskStatusCanceling is set by Cancel while the task is still winding down.
TaskStatusCanceling = 1
// TaskStatusEndBegin is the first terminal status: GetResult and Cancel treat
// any status >= TaskStatusEndBegin as "already ended".
TaskStatusEndBegin = 2
// TaskStatusFinished is set by run when no main error occurred and all jobs were accounted for.
TaskStatusFinished = 2
// TaskStatusCanceled is set by run when fewer jobs completed than TotalJobCount.
TaskStatusCanceled = 3
// TaskStatusFailed is set by run when the task has a main error.
TaskStatusFailed = 4
// TaskStatusEnd is the highest status value; it is passed to GetTasks as the upper bound.
TaskStatusEnd = 4
)
const (
// MaxTaskNameLen is the limit Init passes to utils.LimitUTF8StringLen when
// storing the task name.
MaxTaskNameLen = 50
)
var (
// TaskStatusName maps a task status code to its user-facing (Chinese) label.
// run uses it when composing the "task completed" notification text.
TaskStatusName = map[int]string{
// "running"
TaskStatusWorking: "运行中",
// "canceling"
TaskStatusCanceling: "取消中",
// "finished"
TaskStatusFinished: "结束",
// "canceled"
TaskStatusCanceled: "已取消",
// "failed"
TaskStatusFailed: "失败",
}
)
// TaskList is a slice of tasks. Its Len/Less/Swap methods provide the method set
// of sort.Interface, ordering tasks from most recently created to oldest.
type TaskList []ITask
// ITask is the behavior every task exposes to schedulers and callers.
// BaseTask in this file supplies the shared implementation of everything except Run.
type ITask interface {
// Run starts the task.
Run()
// GetID returns the task's unique identifier.
GetID() string
// GetResult waits up to duration for the task to end and returns its result
// and error. In BaseTask a duration of 0 means "wait (practically) forever".
GetResult(duration time.Duration) (retVal []interface{}, err error)
// Cancel requests cancellation of the task and of its children.
Cancel()
// GetTotalItemCount returns the total number of items the task was created with.
GetTotalItemCount() int
// GetFinishedItemCount returns the number of items finished successfully so far.
GetFinishedItemCount() int
// GetTotalJobCount returns the total number of jobs the task was created with.
GetTotalJobCount() int
// GetFinishedJobCount returns the number of jobs finished successfully so far.
GetFinishedJobCount() int
// GetStatus returns the current TaskStatus* code.
GetStatus() int
// GetCreatedAt returns the creation time.
GetCreatedAt() time.Time
// GetCreatedBy returns the name of the user that created the task.
GetCreatedBy() string
// AddChild registers task as a child and returns it.
AddChild(task ITask) ITask
// GetChildren returns the child tasks.
GetChildren() TaskList
// SetParent records the parent task.
SetParent(parentTask ITask)
// GetOriginalErr() error
// GetErr returns nil when the task has no error.
GetErr() error
// GetDetailErrList() []error
// GetLeafResult returns the finished/failed item counts aggregated over leaf tasks.
GetLeafResult() (finishedItemCount, failedItemCount int)
json.Marshaler
}
// type TaskError struct {
// name string
// errStr string
// }
// func (t *TaskError) MarshalJSON() ([]byte, error) {
// return json.Marshal(t.Error())
// }
// func (t *TaskError) Error() string {
// return fmt.Sprintf("[%s], 错误:%s", t.name, t.errStr)
// }
// func (t *TaskError) String() string {
// return t.Error()
// }
// func NewTaskError(name string, err error) *TaskError {
// return &TaskError{
// name: name,
// errStr: err.Error(),
// }
// }
// BaseTask holds the state shared by task implementations: identity, progress
// counters, status, error bookkeeping, child tasks and the channels used to
// signal cancellation and completion. It is also used as the error value
// returned by GetErr (see the Error method).
type BaseTask struct {
// Name is the task name, truncated by Init to MaxTaskNameLen.
Name string `json:"name"`
// ID is generated by Init via utils.GetUUID.
ID string `json:"id"`
ParallelCount int `json:"parallelCount"`
BatchSize int `json:"batchSize"`
IsContinueWhenError bool `json:"isContinueWhenError"`
// CreatedBy is the user name taken from the context passed to Init.
CreatedBy string `json:"createdBy"`
CreatedAt time.Time `json:"createdAt"`
// UpdatedAt is refreshed every time a job is accounted for (finishedOneJob).
UpdatedAt time.Time `json:"updatedAt"`
// TerminatedAt starts as utils.DefaultTimeValue and is set by run when the task ends.
TerminatedAt time.Time `json:"terminatedAt"`
TotalItemCount int `json:"totalItemCount"`
TotalJobCount int `json:"totalJobCount"`
FinishedItemCount int `json:"finishedItemCount"`
FinishedJobCount int `json:"finishedJobCount"`
FailedItemCount int `json:"failedItemCount"`
FailedJobCount int `json:"failedJobCount"`
// Status is one of the TaskStatus* codes.
Status int `json:"status"`
// NoticeMsg is appended to the completion notification when non-empty.
NoticeMsg string `json:"noticeMsg"`
// Result is excluded from JSON output.
Result []interface{} `json:"-"`
Children TaskList `json:"children"`
// Err caches the message built by Error so that it is computed only once.
Err string `json:"err"`
// mainErr is the task-level error; when set, run marks the task as failed.
mainErr error
// batchErrList holds the per-batch errors that Error lists when there is no mainErr.
batchErrList []error
// finishChan is closed by run once the task and its children have ended.
finishChan chan struct{}
// C is the receive-only view of finishChan exposed to callers.
C <-chan struct{} `json:"-"`
params []interface{}
// quitChan is closed by Cancel, or by run at the end if it was never canceled.
quitChan chan int
// locker guards the mutable fields of the task.
locker sync.RWMutex
// parent may be set only once (SetParent panics otherwise).
parent ITask
ctx *jxcontext.Context
// isGetResultCalled is set by GetResult when a waiter was woken by completion;
// run uses it to decide whether a completion notification is still needed.
isGetResultCalled bool
}
// Len reports the number of tasks in the list.
func (s TaskList) Len() int {
	count := len(s)
	return count
}
// Less reports whether task i was created after task j, so that sorting the
// list puts the most recently created tasks first.
func (s TaskList) Less(i, j int) bool {
	createdI := s[i].GetCreatedAt()
	createdJ := s[j].GetCreatedAt()
	return createdI.After(createdJ)
}
// Swap exchanges the tasks at positions i and j.
func (s TaskList) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}
// Init prepares a task for running: it assigns a fresh ID, stores the
// scheduling parameters and creator context, truncates the name to
// MaxTaskNameLen, stamps the creation time, creates the quit/finish channels
// and puts the task into TaskStatusWorking.
func (t *BaseTask) Init(parallelCount, batchSize int, isContinueWhenError bool, params []interface{}, name string, ctx *jxcontext.Context, totalItemCount, totalJobCount int) {
	// Identity and scheduling parameters.
	t.ID = utils.GetUUID()
	t.ParallelCount, t.BatchSize = parallelCount, batchSize
	t.IsContinueWhenError = isContinueWhenError
	t.params = params
	t.Name = utils.LimitUTF8StringLen(name, MaxTaskNameLen)

	// Creation metadata; UpdatedAt starts equal to CreatedAt.
	now := time.Now()
	t.CreatedAt = now
	t.ctx = ctx
	t.CreatedBy = ctx.GetUserName()
	t.UpdatedAt = now
	t.TerminatedAt = utils.DefaultTimeValue

	// Workload size.
	t.TotalItemCount, t.TotalJobCount = totalItemCount, totalJobCount

	// Signalling channels; C is the read-only view of the finish channel.
	t.quitChan = make(chan int)
	done := make(chan struct{})
	t.finishChan = done
	t.Status = TaskStatusWorking
	t.C = done
}
// GetID returns the task's unique identifier assigned by Init.
func (t *BaseTask) GetID() string {
	id := t.ID
	return id
}
// GetResult returns the task's result and error.
//
// If the task has already reached a terminal status it returns immediately.
// Otherwise it waits for completion for at most duration; a duration of 0
// means "wait without a practical limit". When the wait times out it returns
// (nil, ErrTaskNotFinished).
func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err error) {
	if t.GetStatus() >= TaskStatusEndBegin {
		return t.getResult(), t.GetErr()
	}

	wait := duration
	if wait == 0 {
		wait = time.Hour * 10000 // a zero duration means wait indefinitely
	}

	timeout := time.NewTimer(wait)
	select {
	case <-timeout.C:
		return nil, ErrTaskNotFinished
	case <-t.finishChan:
	}

	// A waiter was woken by completion; run checks this flag before deciding
	// whether to send a completion notification.
	t.isGetResultCalled = true
	timeout.Stop()
	return t.getResult(), t.GetErr()
}
// GetCreatedAt returns the time at which Init created the task.
func (t *BaseTask) GetCreatedAt() time.Time {
	createdAt := t.CreatedAt
	return createdAt
}
// GetCreatedBy returns the user name recorded by Init as the task's creator.
func (t *BaseTask) GetCreatedBy() string {
	creator := t.CreatedBy
	return creator
}
// Cancel asks the task to stop. If the task has not ended and is not already
// being canceled, its status becomes TaskStatusCanceling and quitChan is closed.
// The request is then forwarded to every child task.
func (t *BaseTask) Cancel() {
	t.locker.Lock()
	cancellable := t.Status < TaskStatusEndBegin && t.Status != TaskStatusCanceling
	if cancellable {
		t.Status = TaskStatusCanceling
		close(t.quitChan)
	}
	t.locker.Unlock()

	// GetChildren returns a copy, so children are canceled without holding the lock.
	for _, child := range t.GetChildren() {
		child.Cancel()
	}
}
// GetTotalItemCount returns the total item count the task was initialized with.
func (t *BaseTask) GetTotalItemCount() int {
	total := t.TotalItemCount
	return total
}
// GetFinishedItemCount returns, under the read lock, how many items have
// finished successfully so far.
func (t *BaseTask) GetFinishedItemCount() int {
	t.locker.RLock()
	finished := t.FinishedItemCount
	t.locker.RUnlock()
	return finished
}
// GetTotalJobCount returns the total job count the task was initialized with.
func (t *BaseTask) GetTotalJobCount() int {
	total := t.TotalJobCount
	return total
}
// GetFinishedJobCount returns, under the read lock, how many jobs have
// finished successfully so far.
func (t *BaseTask) GetFinishedJobCount() int {
	t.locker.RLock()
	finished := t.FinishedJobCount
	t.locker.RUnlock()
	return finished
}
// GetStatus returns the current TaskStatus* code, read under the read lock.
func (t *BaseTask) GetStatus() int {
	t.locker.RLock()
	status := t.Status
	t.locker.RUnlock()
	return status
}
// AddChild appends task to this task's children under the write lock and
// returns the same task so calls can be chained.
func (t *BaseTask) AddChild(task ITask) ITask {
	t.locker.Lock()
	t.Children = append(t.Children, task)
	t.locker.Unlock()
	return task
}
// GetChildren returns a copy of the child list taken under the read lock, so
// callers can iterate it without holding the lock. It returns nil when the
// task has no children.
func (t *BaseTask) GetChildren() (children TaskList) {
	t.locker.RLock()
	defer t.locker.RUnlock()

	if len(t.Children) == 0 {
		return nil
	}
	snapshot := make(TaskList, len(t.Children))
	copy(snapshot, t.Children)
	return snapshot
}
// SetParent records parentTask as this task's parent.
//
// A task may only ever get one parent; calling SetParent on a task that
// already has one panics, because it indicates a programming error.
func (t *BaseTask) SetParent(parentTask ITask) {
	t.locker.Lock()
	if t.parent != nil {
		// Release the lock before building the panic message. MarshalJSON on
		// BaseTask takes the read lock on this same mutex, so formatting the
		// task while still holding the write lock could block forever instead
		// of panicking (assumes utils.Format4Output serializes via JSON —
		// TODO confirm).
		t.locker.Unlock()
		panic(fmt.Sprintf("task:%s already have parent!", utils.Format4Output(t, false)))
	}
	t.parent = parentTask
	t.locker.Unlock()
}
// SetNoticeMsg stores, under the write lock, the message that run appends to
// the completion notification.
func (t *BaseTask) SetNoticeMsg(noticeMsg string) {
	t.locker.Lock()
	t.NoticeMsg = noticeMsg
	t.locker.Unlock()
}
// GetNoticeMsg returns the notice message, read under the read lock.
func (t *BaseTask) GetNoticeMsg() string {
	t.locker.RLock()
	notice := t.NoticeMsg
	t.locker.RUnlock()
	return notice
}
// getResult returns the Result slice, read under the read lock. The slice is
// returned as-is, not copied.
func (t *BaseTask) getResult() []interface{} {
	t.locker.RLock()
	result := t.Result
	t.locker.RUnlock()
	return result
}
// func (t *BaseTask) GetOriginalErr() error {
// t.locker.RLock()
// defer t.locker.RUnlock()
// return nil
// }
// GetErr returns nil when the task has neither a main error nor any batch
// errors. Otherwise it returns the task itself, which implements error through
// its Error method.
func (t *BaseTask) GetErr() error {
	t.locker.RLock()
	hasErr := t.mainErr != nil || len(t.batchErrList) != 0
	t.locker.RUnlock()

	if hasErr {
		return t
	}
	// Return a literal nil so callers never see a non-nil interface wrapping
	// an error-free task.
	return nil
}
// GetLeafResult returns the number of finished and failed items aggregated
// over the leaves of the task tree rooted at t.
//
// A task without children is a leaf and reports its own counters; a task with
// children reports the sum of its children's leaf results (its own counters
// are not included).
//
// The counters and the child list are snapshotted under the read lock, in line
// with the other accessors, because finishedOneJob and AddChild modify them
// concurrently under the write lock. The lock is released before recursing
// into the children.
func (t *BaseTask) GetLeafResult() (finishedItemCount, failedItemCount int) {
	t.locker.RLock()
	ownFinished, ownFailed := t.FinishedItemCount, t.FailedItemCount
	children := make(TaskList, len(t.Children))
	copy(children, t.Children)
	t.locker.RUnlock()

	if len(children) == 0 {
		return ownFinished, ownFailed
	}
	for _, child := range children {
		childFinished, childFailed := child.GetLeafResult()
		finishedItemCount += childFinished
		failedItemCount += childFailed
	}
	return finishedItemCount, failedItemCount
}
// Error implements the error interface for a task.
//
// It returns "" when the task has no error. Otherwise it returns the cached
// message in t.Err if there is one, or builds the message, caches it in t.Err
// and returns it. The message contains the task name; for a root task (no
// parent) the overall leaf totals; and then either the main error or, when
// there are only batch errors, the task's own counters followed by each batch
// error.
//
// Everything the message depends on is read in a single snapshot under the
// read lock, because run and finishedOneJob update these fields concurrently
// under the write lock. The lock is released before calling GetLeafResult,
// which takes locks itself.
func (t *BaseTask) Error() (errMsg string) {
	t.locker.RLock()
	mainErr := t.mainErr
	batchErrs := t.batchErrList
	cached := t.Err
	isRoot := t.parent == nil
	total, finished, failed := t.TotalItemCount, t.FinishedItemCount, t.FailedItemCount
	t.locker.RUnlock()

	if mainErr == nil && len(batchErrs) == 0 {
		return ""
	}
	if cached != "" {
		return cached
	}

	errMsg = "任务:" + t.Name
	if isRoot {
		finishedItemCount, failedItemCount := t.GetLeafResult()
		errMsg += fmt.Sprintf(", 全部总共:%d, 成功:%d, 失败:%d,\n", (finishedItemCount + failedItemCount), finishedItemCount, failedItemCount)
	}
	if mainErr != nil {
		errMsg += "失败"
		errMsg += "," + mainErr.Error()
	} else {
		errMsg += fmt.Sprintf("部分失败, 总共:%d, 成功:%d, 失败:%d, 详情如下:\n", total, finished, failed)
		for _, batchErr := range batchErrs {
			errMsg += fmt.Sprintf("%s,\n", batchErr.Error())
		}
	}

	// Cache the message so later calls (and JSON output, via the Err field)
	// reuse it.
	t.locker.Lock()
	t.Err = errMsg
	t.locker.Unlock()
	return errMsg
}
// func (t *BaseTask) GetDetailErrList() []error {
// t.locker.RLock()
// defer t.locker.RUnlock()
// return t.batchErrList
// }
// AddChild attaches task to parentTask when a parent is given and returns the
// value produced by the parent's AddChild; with a nil parent it simply returns
// task unchanged.
func AddChild(parentTask ITask, task ITask) ITask {
	if parentTask == nil {
		return task
	}
	return parentTask.AddChild(task)
}
// HandleTask wires a newly created task into the task tree: it is added as a
// child of parentTask (if any), and a top-level task is additionally handed to
// ManageTask when isMangeIt is true. The task itself is returned.
func HandleTask(task, parentTask ITask, isMangeIt bool) ITask {
	AddChild(parentTask, task)

	isTopLevel := parentTask == nil
	if isTopLevel && isMangeIt {
		ManageTask(task)
	}
	return task
}
/////////
// MarshalJSON implements json.Marshaler. The task is serialized while holding
// the read lock so the output is a consistent snapshot.
func (t *BaseTask) MarshalJSON() ([]byte, error) {
	// plainTask has BaseTask's fields but not its methods, so marshalling it
	// does not call MarshalJSON again.
	type plainTask BaseTask

	t.locker.RLock()
	defer t.locker.RUnlock()

	encoded := utils.MustMarshal((*plainTask)(t))
	return encoded, nil
}
// run executes taskHandler asynchronously and then finalizes the task.
//
// It does nothing unless the task's status equals TaskStatusBegin. After the
// handler returns, the goroutine:
//  1. derives the terminal status: failed if there is a main error, canceled
//     if not all jobs were accounted for, finished otherwise;
//  2. caches the error message, and closes quitChan if Cancel has not already;
//  3. waits for every child task to end;
//  4. closes finishChan to wake GetResult waiters;
//  5. for a managed root task nobody was waiting on, sends the creator a
//     completion message.
//
// A panic inside taskHandler is recorded as the task's main error (unless one
// is already set) so that the task still reaches a terminal status and
// finishChan is closed; otherwise GetResult callers and parent tasks would
// wait forever.
func (t *BaseTask) run(taskHandler func()) {
	if t.GetStatus() != TaskStatusBegin {
		return
	}
	utils.CallFuncAsync(func() {
		// Last-resort guard for panics in the finalization code below.
		defer func() {
			if r := recover(); r != nil {
				globals.SugarLogger.Errorf("panic in BaseTask.run task:%s, task detail:%s, r:%v", t.Name, utils.Format4Output(t, false), r)
			}
		}()

		// Run the handler in its own frame so a panic in it does not skip
		// the finalization.
		func() {
			defer func() {
				if r := recover(); r != nil {
					globals.SugarLogger.Errorf("panic in BaseTask.run task:%s, task detail:%s, r:%v", t.Name, utils.Format4Output(t, false), r)
					t.locker.Lock()
					if t.mainErr == nil {
						t.mainErr = fmt.Errorf("panic, r:%v", r)
					}
					t.locker.Unlock()
				}
			}()
			taskHandler()
		}()

		t.locker.Lock()
		if t.mainErr != nil { // any main error means the task failed
			t.Status = TaskStatusFailed
		} else if t.FinishedJobCount+t.FailedJobCount < t.TotalJobCount {
			// Not every job was accounted for, so the task was canceled midway.
			t.mainErr = ErrTaskIsCanceled
			t.Status = TaskStatusCanceled
		} else {
			t.Status = TaskStatusFinished
		}
		t.TerminatedAt = time.Now()
		t.locker.Unlock()

		// Build and cache the error message (stored in t.Err) now that the
		// terminal state is known.
		t.Error()
		globals.SugarLogger.Debugf("Task:%s, mainErr:%v, batchErrList:%v", t.Name, t.mainErr, t.batchErrList)

		// Close quitChan unless Cancel already did.
		select {
		case <-t.quitChan:
		default:
			close(t.quitChan)
		}

		// Wait for all children. Iterate over a snapshot taken under the lock,
		// since AddChild may modify t.Children concurrently.
		for _, subTask := range t.GetChildren() {
			if _, err := subTask.GetResult(0); err != nil {
				globals.SugarLogger.Infof("BaseTask run, failed with error:%v", err)
			}
		}

		close(t.finishChan)
		time.Sleep(10 * time.Millisecond) // give GetResult waiters time to set isGetResultCalled
		globals.SugarLogger.Debugf("BaseTask task ID:%s, name:%s finished, isGetResultCalled:%t", t.ID, t.Name, t.isGetResultCalled)

		// Notify the creator only for a root task that nobody was waiting on
		// and that GetTasks still returns.
		if !t.isGetResultCalled && t.parent == nil && len(GetTasks(t.ID, TaskStatusBegin, TaskStatusEnd, 24, "")) > 0 {
			// TODO: ideally the message would be sent by whatever means is
			// available, regardless of the login type.
			if authInfo, err := t.ctx.GetV2AuthInfo(); err == nil {
				var content string
				taskDesc := fmt.Sprintf("你的异步任务[%s],ID[%s],开始于:%s,结束于:%s,", t.Name, t.ID, utils.Time2Str(t.CreatedAt), utils.Time2Str(t.TerminatedAt))
				if t.mainErr == nil {
					content = fmt.Sprintf("%s执行%s", taskDesc, TaskStatusName[t.Status])
					noticeMsg := t.GetNoticeMsg()
					if noticeMsg != "" {
						content += ",通知消息:" + noticeMsg
					}
				} else {
					content = t.Error()
				}
				msg.SendUserMessage(dingdingapi.MsgTyeText, authInfo.UserID, "异步任务完成", content)
			}
		}
	})
}
// finishedOneJob records the outcome of one job under the write lock: it
// refreshes UpdatedAt and adds itemCount and one job to the failed counters
// when err is non-nil, or to the finished counters otherwise.
func (t *BaseTask) finishedOneJob(itemCount int, err error) {
	t.locker.Lock()
	defer t.locker.Unlock()

	t.UpdatedAt = time.Now()
	if err != nil {
		t.FailedItemCount += itemCount
		t.FailedJobCount++
		return
	}
	t.FinishedItemCount += itemCount
	t.FinishedJobCount++
}
// setStatus overwrites the task status under the write lock.
func (t *BaseTask) setStatus(status int) {
	t.locker.Lock()
	t.Status = status
	t.locker.Unlock()
}
// func (t *BaseTask) buildTaskErrFromBatchErrList() (err error) {
// if len(t.batchErrList) > 0 {
// strList := make([]string, len(t.batchErrList))
// for k, v := range t.batchErrList {
// strList[k] = v.Error()
// }
// return NewTaskError(t.Name, fmt.Errorf("总共:%d, 成功:%d, 失败:%d, 详情:\n%s", t.TotalItemCount, t.FinishedItemCount, t.FailedItemCount, strings.Join(strList, "\n")))
// }
// return nil
// }
// callWorker invokes worker and returns its result. A panic raised by worker
// is recovered, logged, and reported to the caller as an error instead of
// propagating.
func (task *BaseTask) callWorker(worker func() (retVal interface{}, err error)) (retVal interface{}, err error) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		globals.SugarLogger.Errorf("callWorker panic:%v", r)
		// Overwrite the named result so the caller sees the panic as an error.
		err = fmt.Errorf("panic, r:%v", r)
	}()
	return worker()
}