Files
jx-callback/business/jxutils/tasksch/task.go
gazebo 9781e85e4d - up
2019-03-22 15:26:03 +08:00

342 lines
8.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package tasksch
import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
"git.rosy.net.cn/jx-callback/business/jxutils/msg"
"git.rosy.net.cn/jx-callback/globals"
)
// Task status values. The concrete states come first; the Begin/EndBegin/End
// names are range markers expressed in terms of those states.
const (
	TaskStatusWorking   = 0 // task is running
	TaskStatusCanceling = 1 // cancellation has been requested
	TaskStatusFinished  = 2 // task ended successfully
	TaskStatusCanceled  = 3 // task ended because it was canceled
	TaskStatusFailed    = 4 // task ended with an error

	// TaskStatusBegin is the lowest status value.
	TaskStatusBegin = TaskStatusWorking
	// TaskStatusEndBegin is the lowest terminal status: any status greater
	// than or equal to it means the task has ended.
	TaskStatusEndBegin = TaskStatusFinished
	// TaskStatusEnd is the highest status value.
	TaskStatusEnd = TaskStatusFailed
)
// TaskStatusName maps each concrete task status to its display label.
var TaskStatusName = map[int]string{
	TaskStatusWorking:   "运行中", // running
	TaskStatusCanceling: "取消中", // canceling
	TaskStatusFinished:  "结束",  // finished
	TaskStatusCanceled:  "已取消", // canceled
	TaskStatusFailed:    "失败",  // failed
}
// TaskList is a slice of ITask values. Its Len, Less and Swap methods order
// tasks by creation time, most recently created first.
type TaskList []ITask
// ITask is the set of operations every task exposes. It embeds
// json.Marshaler, so an implementation must also provide MarshalJSON.
type ITask interface {
	json.Marshaler

	// Lifecycle and outcome.
	Run()
	Cancel()
	GetStatus() int
	GetResult(duration time.Duration) (retVal []interface{}, err error)

	// Identity.
	GetID() string
	GetCreatedAt() time.Time
	GetCreatedBy() string

	// Progress counters.
	GetTotalItemCount() int
	GetFinishedItemCount() int
	GetTotalJobCount() int
	GetFinishedJobCount() int

	// Task hierarchy.
	AddChild(task ITask) ITask
	GetChildren() TaskList
	SetParent(parentTask ITask)
}
// TaskError describes a task failure: the name of the task plus the text of
// the underlying error.
type TaskError struct {
	name   string
	errStr string
}

// MarshalJSON encodes the error as a JSON string holding the same text that
// Error returns.
func (t *TaskError) MarshalJSON() ([]byte, error) {
	return json.Marshal(t.Error())
}

// Error implements the error interface. The message contains the task name
// followed, on a new line, by the underlying error text.
func (t *TaskError) Error() string {
	return fmt.Sprintf("任务[%s]执行失败,错误详情:\n%s", t.name, t.errStr)
}

// NewTaskError builds a TaskError for the task called name from err.
// A nil err is tolerated and yields an empty detail text instead of a
// nil-pointer panic.
func NewTaskError(name string, err error) *TaskError {
	taskErr := &TaskError{name: name}
	if err != nil {
		taskErr.errStr = err.Error()
	}
	return taskErr
}
// BaseTask holds the state shared by task implementations: identity,
// progress counters, status, result, child tasks and the channels used to
// signal cancellation and completion. Exported fields are serialised by
// MarshalJSON in declaration order; unexported fields are internal.
type BaseTask struct {
Name string `json:"name"`
// ID is assigned by Init from utils.GetUUID().
ID string `json:"id"`
ParallelCount int `json:"parallelCount"`
BatchSize int `json:"batchSize"`
IsContinueWhenError bool `json:"isContinueWhenError"`
// CreatedBy is the user name obtained from the context passed to Init.
CreatedBy string `json:"createdBy"`
CreatedAt time.Time `json:"createdAt"`
// UpdatedAt starts equal to CreatedAt and is refreshed by finishedOneJob.
UpdatedAt time.Time `json:"updatedAt"`
// TerminatedAt is initialised to utils.DefaultTimeValue by Init.
TerminatedAt time.Time `json:"terminatedAt"`
TotalItemCount int `json:"totalItemCount"`
TotalJobCount int `json:"totalJobCount"`
// The Finished*/Failed* counters are maintained by finishedOneJob.
FinishedItemCount int `json:"finishedItemCount"`
FinishedJobCount int `json:"finishedJobCount"`
FailedItemCount int `json:"failedItemCount"`
FailedJobCount int `json:"failedJobCount"`
// Status is one of the TaskStatus* constants; Init sets TaskStatusWorking.
Status int `json:"status"`
// Result and Err are what GetResult returns.
Result []interface{} `json:"result"`
// Children is appended to by AddChild.
Children TaskList `json:"children"`
Err error `json:"err"`
// detailErrMsgList is joined into a single error by buildTaskErrFromDetail.
detailErrMsgList []string
// finishChan is closed by run once the handler has returned and all
// children have reported a result.
finishChan chan struct{}
// C is the receive-only view of finishChan, set by Init.
C <-chan struct{} `json:"-"`
params []interface{}
// quitChan is closed by Cancel, or by run after the handler returns.
quitChan chan int
// locker is taken by the accessors for Status, the counters, Children
// and parent.
locker sync.RWMutex
// parent may be set once through SetParent.
parent ITask
ctx *jxcontext.Context
// isGetResultCalled is set by GetResult when a waiter observes completion;
// run reads it to decide whether to send a completion message.
isGetResultCalled bool
}
// Len returns the number of tasks in the list.
func (s TaskList) Len() int {
return len(s)
}
// Less reports whether the task at index i was created after the task at
// index j, so that sorting puts the most recently created task first.
func (s TaskList) Less(i, j int) bool {
	createdI := s[i].GetCreatedAt()
	createdJ := s[j].GetCreatedAt()
	return createdI.After(createdJ)
}
// Swap exchanges the tasks at indexes i and j.
func (s TaskList) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}
// Init prepares the task for running: it assigns a fresh ID, stores the
// configuration and totals, stamps the creation time, records the creating
// user taken from ctx, creates the quit and finish channels and sets the
// status to TaskStatusWorking.
func (t *BaseTask) Init(parallelCount, batchSize int, isContinueWhenError bool, params []interface{}, name string, ctx *jxcontext.Context, totalItemCount, totalJobCount int) {
	// Identity and configuration.
	t.ID = utils.GetUUID()
	t.Name = name
	t.params = params
	t.ParallelCount, t.BatchSize = parallelCount, batchSize
	t.IsContinueWhenError = isContinueWhenError
	t.TotalItemCount, t.TotalJobCount = totalItemCount, totalJobCount

	// Timestamps: UpdatedAt starts out equal to CreatedAt.
	now := time.Now()
	t.CreatedAt, t.UpdatedAt = now, now
	t.TerminatedAt = utils.DefaultTimeValue

	// Creator.
	t.ctx = ctx
	t.CreatedBy = ctx.GetUserName()

	// Signalling channels; C exposes the finish channel as receive-only.
	done := make(chan struct{})
	t.finishChan, t.C = done, done
	t.quitChan = make(chan int)

	t.Status = TaskStatusWorking
}
// GetID returns the task's ID.
func (t *BaseTask) GetID() string {
return t.ID
}
// GetResult returns the task's result and error.
//
// If the task has already reached a terminal status, the stored Result and
// Err are returned immediately. Otherwise the call waits for the finish
// signal: a duration of 0 waits indefinitely, any other duration waits at
// most that long and returns ErrTaskNotFinished on timeout.
//
// When the finish signal is observed while waiting, isGetResultCalled is set
// so that run knows a caller has already collected the result.
func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err error) {
	if t.GetStatus() >= TaskStatusEndBegin {
		return t.Result, t.Err
	}

	if duration == 0 {
		// Zero means "no deadline": block on the finish signal directly
		// rather than arming a very long timer that would eventually fire.
		<-t.finishChan
		t.isGetResultCalled = true
		return t.Result, t.Err
	}

	timer := time.NewTimer(duration)
	defer timer.Stop()
	select {
	case <-t.finishChan:
		t.isGetResultCalled = true
		return t.Result, t.Err
	case <-timer.C:
		return nil, ErrTaskNotFinished
	}
}
// GetCreatedAt returns the time stored in CreatedAt.
func (t *BaseTask) GetCreatedAt() time.Time {
return t.CreatedAt
}
// GetCreatedBy returns the user name stored in CreatedBy.
func (t *BaseTask) GetCreatedBy() string {
return t.CreatedBy
}
// Cancel requests cancellation of the task and then of each of its children.
//
// If the task has not ended and is not already canceling, its status becomes
// TaskStatusCanceling and quitChan is closed. Calling Cancel more than once
// is harmless.
func (t *BaseTask) Cancel() {
	func() {
		t.locker.Lock()
		// Deferred so the lock is released even if closing the channel panics.
		defer t.locker.Unlock()

		if t.Status >= TaskStatusEndBegin || t.Status == TaskStatusCanceling {
			return
		}
		t.Status = TaskStatusCanceling
		// run also closes quitChan once the handler has returned, so only
		// close it here when it is still open; closing twice would panic.
		select {
		case <-t.quitChan:
		default:
			close(t.quitChan)
		}
	}()

	// GetChildren takes the lock itself and returns a copy, so the children
	// are canceled without holding this task's lock.
	for _, subTask := range t.GetChildren() {
		subTask.Cancel()
	}
}
// GetTotalItemCount returns TotalItemCount, which is set by Init.
func (t *BaseTask) GetTotalItemCount() int {
return t.TotalItemCount
}
// GetFinishedItemCount returns the number of items finished so far, read
// under the task's read lock.
func (t *BaseTask) GetFinishedItemCount() int {
	t.locker.RLock()
	finished := t.FinishedItemCount
	t.locker.RUnlock()
	return finished
}
// GetTotalJobCount returns TotalJobCount, which is set by Init.
func (t *BaseTask) GetTotalJobCount() int {
return t.TotalJobCount
}
// GetFinishedJobCount returns the number of jobs finished so far, read under
// the task's read lock.
func (t *BaseTask) GetFinishedJobCount() int {
	t.locker.RLock()
	finished := t.FinishedJobCount
	t.locker.RUnlock()
	return finished
}
// GetStatus returns the task's current status, read under the task's read
// lock.
func (t *BaseTask) GetStatus() int {
	t.locker.RLock()
	status := t.Status
	t.locker.RUnlock()
	return status
}
// AddChild appends task to this task's children under the write lock and
// returns task so calls can be chained.
func (t *BaseTask) AddChild(task ITask) ITask {
	t.locker.Lock()
	t.Children = append(t.Children, task)
	t.locker.Unlock()
	return task
}
// GetChildren returns a copy of the child list taken under the read lock, or
// nil when the task has no children. The caller may modify the returned
// slice without affecting the task.
func (t *BaseTask) GetChildren() (children TaskList) {
	t.locker.RLock()
	defer t.locker.RUnlock()

	count := len(t.Children)
	if count == 0 {
		return nil
	}
	snapshot := make(TaskList, count)
	copy(snapshot, t.Children)
	return snapshot
}
// SetParent records parentTask as this task's parent. A task may be given a
// parent only once; a second call panics.
func (t *BaseTask) SetParent(parentTask ITask) {
	t.locker.Lock()
	alreadySet := t.parent != nil
	if !alreadySet {
		t.parent = parentTask
	}
	t.locker.Unlock()

	if alreadySet {
		// The message is built after the lock has been released.
		// NOTE(review): utils.Format4Output presumably serialises t through
		// MarshalJSON, which takes the read lock; doing that while holding the
		// write lock would block forever instead of panicking - confirm.
		panic(fmt.Sprintf("task:%s already have parent!", utils.Format4Output(t, false)))
	}
}
// AddChild attaches task to parentTask when a parent is given and returns
// whatever the parent's AddChild returns. With a nil parentTask it returns
// task unchanged.
func AddChild(parentTask ITask, task ITask) ITask {
	if parentTask == nil {
		return task
	}
	return parentTask.AddChild(task)
}
// HandleTask registers a newly created task and returns it. A task with a
// parent is added to that parent's children. A task without a parent is
// passed to ManageTask when isMangeIt is true.
func HandleTask(task, parentTask ITask, isMangeIt bool) ITask {
	switch {
	case parentTask != nil:
		parentTask.AddChild(task)
	case isMangeIt:
		ManageTask(task)
	}
	return task
}
/////////
// MarshalJSON serialises the task's exported fields while holding the read
// lock. The returned error is always nil.
func (t *BaseTask) MarshalJSON() ([]byte, error) {
	// plainTask has BaseTask's fields but none of its methods, so marshalling
	// it does not call back into this MarshalJSON.
	type plainTask BaseTask

	t.locker.RLock()
	defer t.locker.RUnlock()

	encoded := utils.MustMarshal((*plainTask)(t))
	return encoded, nil
}
// run starts taskHandler asynchronously if the task is still in its initial
// status; otherwise it does nothing.
//
// In the started goroutine it:
//  1. calls taskHandler;
//  2. closes quitChan if Cancel has not already done so;
//  3. waits for every child task to report a result;
//  4. closes finishChan to release GetResult waiters;
//  5. if nobody collected the result, the task has no parent and GetTasks
//     still lists it, sends the creating user a completion message.
//
// A panic anywhere in the goroutine is logged. If it happens before
// finishChan was closed, the task is marked failed and both channels are
// closed, so that waiters (including a parent's run) are not blocked forever.
func (t *BaseTask) run(taskHandler func()) {
	if t.GetStatus() != TaskStatusBegin {
		return
	}

	// closeQuitChan closes quitChan unless it is already closed. Cancel closes
	// the channel while holding t.locker, so the check and the close are done
	// under the same lock to rule out a double close.
	closeQuitChan := func() {
		t.locker.Lock()
		defer t.locker.Unlock()
		select {
		case <-t.quitChan:
		default:
			close(t.quitChan)
		}
	}

	utils.CallFuncAsync(func() {
		defer func() {
			r := recover()
			if r == nil {
				return
			}
			globals.SugarLogger.Errorf("panic in BaseTask.run task:%s, task detail:%s, r:%v", t.Name, utils.Format4Output(t, false), r)

			select {
			case <-t.finishChan:
				// Completion was already signalled; the panic came from the
				// notification code below and needs no further cleanup.
				return
			default:
			}

			// The handler (or the wait for children) panicked: record the
			// failure and release everyone waiting on the channels.
			t.locker.Lock()
			if t.Status < TaskStatusEndBegin {
				t.Status = TaskStatusFailed
				if t.Err == nil {
					t.Err = NewTaskError(t.Name, fmt.Errorf("panic: %v", r))
				}
			}
			t.locker.Unlock()
			closeQuitChan()
			close(t.finishChan)
		}()

		taskHandler()
		closeQuitChan()

		// Iterate over a locked snapshot; AddChild may run concurrently.
		for _, subTask := range t.GetChildren() {
			if _, err := subTask.GetResult(0); err != nil {
				globals.SugarLogger.Infof("BaseTask run, failed with error:%v", err)
			}
		}

		close(t.finishChan)
		time.Sleep(10 * time.Millisecond) // give a waiting GetResult time to set isGetResultCalled
		globals.SugarLogger.Debugf("BaseTask task ID:%s, name:%s finished, isGetResultCalled:%t", t.ID, t.Name, t.isGetResultCalled)

		t.locker.RLock()
		hasParent := t.parent != nil
		t.locker.RUnlock()

		if !t.isGetResultCalled && !hasParent && len(GetTasks(t.ID, TaskStatusBegin, TaskStatusEnd, 24, "")) > 0 {
			// Original note: the message should be sent by whatever means is
			// possible, regardless of the login type.
			if authInfo, err := t.ctx.GetV2AuthInfo(); err == nil {
				var content string
				status := t.GetStatus()
				switch {
				case status == TaskStatusFinished:
					content = fmt.Sprintf("你的异步任务[%s]执行成功完成", t.Name)
				case t.Err != nil:
					content = fmt.Sprintf("你的异步任务[%s]执行失败,%s", t.Name, t.Err.Error())
				default:
					// No error was recorded (for example a canceled task);
					// report the status label instead of dereferencing a nil
					// error.
					content = fmt.Sprintf("你的异步任务[%s]执行失败,%s", t.Name, TaskStatusName[status])
				}
				msg.SendUserMessage(authInfo.UserID, "异步任务完成", content)
			}
		}
	})
}
// finishedOneJob records the outcome of one job covering itemCount items.
// It refreshes UpdatedAt and bumps the finished counters when err is nil,
// or the failed counters otherwise, all under the write lock.
func (t *BaseTask) finishedOneJob(itemCount int, err error) {
	t.locker.Lock()
	defer t.locker.Unlock()

	t.UpdatedAt = time.Now()
	if err != nil {
		t.FailedItemCount += itemCount
		t.FailedJobCount++
		return
	}
	t.FinishedItemCount += itemCount
	t.FinishedJobCount++
}
// setStatus stores status as the task's status under the write lock.
func (t *BaseTask) setStatus(status int) {
	t.locker.Lock()
	t.Status = status
	t.locker.Unlock()
}
// buildTaskErrFromDetail turns the collected detail error messages into one
// TaskError that reports the total and failed item counts followed by the
// messages, one per line. It returns nil when no messages were collected.
func (t *BaseTask) buildTaskErrFromDetail() (err error) {
	if len(t.detailErrMsgList) == 0 {
		return nil
	}
	details := strings.Join(t.detailErrMsgList, "\n")
	cause := fmt.Errorf("设置了错误继续标志,部分操作失败,总任务数:%d失败数%d以下为详情\n%s", t.TotalItemCount, t.FailedItemCount, details)
	return NewTaskError(t.Name, cause)
}