Files
jx-callback/business/jxutils/dtask/dtask.go

169 lines
4.2 KiB
Go

package dtask
import (
"bytes"
"encoding/base64"
"encoding/gob"
"errors"
"reflect"
"time"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxcallback/scheduler/basesch"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
)
// CurMan is the package-level durable task manager. It is nil until Init
// has been called.
var CurMan *DurableTaskMan
// DurableTaskMan keeps the tasks known to this process and runs their items.
type DurableTaskMan struct {
// objCreator maps an item's ObjHint to the object whose method StartTask
// invokes by name.
objCreator func(objHint string) interface{}
// tasks holds the in-memory tasks, keyed by their TaskID.
tasks map[string]*DurableTask
}
// DurableTask is the in-memory state of one task: its persisted record plus
// the items that belong to it.
type DurableTask struct {
// cmdChan is created by StartTask; a non-nil channel marks the task as
// already started. Nothing in the code visible here sends or receives on it.
cmdChan chan string
// data is the persisted task record.
data *model.DurableTask
// itemCount is not referenced by the code visible in this section.
itemCount int
// items are the task's items, executed by StartTask in slice order.
items []*model.DurableTaskItem
}
// Init creates the package-level manager CurMan on the first call; when CurMan
// is already set the call does nothing.
//
// A nil objCreator is replaced by defObjCreator. Every value in interfaceTypes
// is handed to gob.Register so that it can be carried inside serialized task
// parameters.
func Init(objCreator func(objHint string) interface{}, interfaceTypes ...interface{}) {
	if CurMan != nil {
		return
	}

	creator := objCreator
	if creator == nil {
		creator = defObjCreator
	}
	CurMan = &DurableTaskMan{
		objCreator: creator,
		tasks:      make(map[string]*DurableTask),
	}

	for i := range interfaceTypes {
		gob.Register(interfaceTypes[i])
	}
}
// LoadPendingTask reads every task and every task item whose status is 0 from
// the database and registers them in the manager.
//
// All pending tasks are registered (not only the first one), and each pending
// item is attached to the task named by its TaskID. Items whose task is not
// among the pending tasks are skipped. Items are kept in the order the query
// returns them. Nothing is registered if either query fails.
//
// Returns the error of the first failing query, or nil.
func (m *DurableTaskMan) LoadPendingTask() (err error) {
	db := dao.GetDB()

	var tasks []*model.DurableTask
	if err = dao.GetRows(db, &tasks, "SELECT * FROM durable_task WHERE status = 0"); err != nil {
		return err
	}
	var items []*model.DurableTaskItem
	if err = dao.GetRows(db, &items, "SELECT * FROM durable_task_item WHERE status = 0"); err != nil {
		return err
	}

	// Build the new entries separately so that items are only attached to
	// tasks loaded by this call, never to tasks already held in m.tasks.
	loaded := make(map[string]*DurableTask, len(tasks))
	for _, task := range tasks {
		loaded[task.TaskID] = &DurableTask{data: task}
	}
	for _, item := range items {
		if dTask, ok := loaded[item.TaskID]; ok {
			dTask.items = append(dTask.items, item)
		}
	}
	for taskID, dTask := range loaded {
		m.tasks[taskID] = dTask
	}
	return nil
}
// Start launches every task currently held by the manager. It always returns
// nil: the error StartTask reports for an individual task is discarded.
func (m *DurableTaskMan) Start() error {
	for _, pending := range m.tasks {
		// Best effort: a task that cannot be started does not stop the others.
		_ = m.StartTask(pending.data.TaskID)
	}
	return nil
}
// AddTask persists a new task with a freshly generated UUID, status 0 and the
// given description and creator, then registers it in the manager.
//
// It returns the new task's ID, or an empty ID together with the error from
// dao.CreateEntity; in that case the task is not registered.
func (m *DurableTaskMan) AddTask(desc, createdBy string) (taskID string, err error) {
	record := &model.DurableTask{
		TaskID:      utils.GetUUID(),
		Description: desc,
		CreatedBy:   createdBy,
		Status:      0,
		FinishedAt:  utils.DefaultTimeValue,
	}
	if err = dao.CreateEntity(nil, record); err != nil {
		return "", err
	}
	m.tasks[record.TaskID] = &DurableTask{data: record}
	return record.TaskID, nil
}
// AddItem appends one work item to the task identified by taskID and persists
// it.
//
// objHint is later passed to the manager's object creator, funcName is the
// method invoked on the created object, and params are gob-serialized into the
// item so they can be replayed when the task runs.
//
// The item's TaskIndex is the task's item count after the addition (1-based).
// The in-memory counter advances only once the item row has been created, so
// a failed call leaves the task unchanged and consecutive items get increasing
// indexes.
//
// Returns an error when the task is unknown, the parameters cannot be
// serialized, or a database call fails. If only the final update of the
// task's TotalItem column fails, the item stays attached because its row
// already exists.
func (m *DurableTaskMan) AddItem(taskID, objHint string, funcName string, params ...interface{}) (err error) {
	d, ok := m.tasks[taskID]
	if !ok {
		return errors.New("dtask: unknown task " + taskID)
	}

	item := &model.DurableTaskItem{
		ObjHint:    objHint,
		FuncName:   funcName,
		TaskID:     d.data.TaskID,
		TaskIndex:  d.data.TotalItem + 1,
		FinishedAt: utils.DefaultTimeValue,
	}
	if item.Params, err = SerializeData(params); err != nil {
		return err
	}

	db := dao.GetDB()
	if err = dao.CreateEntity(db, item); err != nil {
		return err
	}

	// The item row exists now, so commit it to the in-memory state before
	// touching the task row; otherwise the next item would reuse this index.
	d.items = append(d.items, item)
	d.data.TotalItem++
	err = dao.UpdateEntity(db, d.data, "TotalItem")
	return err
}
// StartTask runs the pending items (Status == 0) of the task identified by
// taskID, in slice order, on a new goroutine.
//
// For each item the object creator is called with the item's ObjHint, and the
// method named by FuncName is invoked with the deserialized parameters. An
// item whose call succeeds is marked finished (Status 1) and persisted. When
// no item failed, the task itself is marked finished and persisted.
//
// Invalid items (undecodable parameters, unknown method, mismatching
// arguments) are counted as failed instead of panicking inside the goroutine.
//
// Returns an error when the task is unknown or has already been started. The
// started marker (cmdChan) is never cleared, so a task can be started only
// once per process.
func (m *DurableTaskMan) StartTask(taskID string) error {
	d, ok := m.tasks[taskID]
	if !ok {
		return errors.New("dtask: unknown task " + taskID)
	}
	if d.cmdChan != nil {
		// Message text: "the task has already been started".
		return errors.New("任务已经启动")
	}

	d.cmdChan = make(chan string)
	go func() {
		failedItemCount := 0
		for _, taskItem := range d.items {
			if taskItem.Status != 0 {
				continue
			}
			if err := m.runTaskItem(taskItem); err != nil {
				failedItemCount++
			}
		}
		if failedItemCount == 0 {
			d.data.Status = 1
			d.data.FinishedAt = time.Now()
			d.data.UpdatedAt = d.data.FinishedAt
			// There is no caller left to report to; a failed write leaves the
			// task row at its previous status.
			_ = dao.UpdateEntity(nil, d.data, "Status", "FinishedAt", "UpdatedAt")
		}
	}()
	return nil
}

// runTaskItem executes one item and, on success, marks it finished and
// persists that state. A persistence failure is returned as an error so the
// owning task is not marked finished while the item row still says pending.
func (m *DurableTaskMan) runTaskItem(taskItem *model.DurableTaskItem) error {
	var params []interface{}
	if err := DeSerializeData(taskItem.Params, &params); err != nil {
		return err
	}
	if err := invokeTaskMethod(m.objCreator(taskItem.ObjHint), taskItem.FuncName, params); err != nil {
		return err
	}
	taskItem.Status = 1
	taskItem.FinishedAt = time.Now()
	taskItem.UpdatedAt = taskItem.FinishedAt
	return dao.UpdateEntity(nil, taskItem, "Status", "FinishedAt", "UpdatedAt")
}

// invokeTaskMethod calls the method funcName on obj with params via reflection
// and reports the outcome as an error.
//
// Every condition that would make reflect.Value.Call panic is checked first:
// nil object, unknown method, wrong argument count, and argument types that
// are not assignable to the parameter types. A nil parameter is passed as the
// zero value of the parameter type.
//
// The call succeeds when the method returns nothing or its first result is
// nil. A non-nil first result is returned as-is when it implements error.
func invokeTaskMethod(obj interface{}, funcName string, params []interface{}) error {
	objValue := reflect.ValueOf(obj)
	if !objValue.IsValid() {
		return errors.New("dtask: object creator returned nil")
	}
	method := objValue.MethodByName(funcName)
	if !method.IsValid() {
		return errors.New("dtask: method " + funcName + " not found on " + objValue.Type().String())
	}

	methodType := method.Type()
	variadic := methodType.IsVariadic()
	fixed := methodType.NumIn()
	if variadic {
		fixed-- // the trailing ...T parameter accepts zero or more arguments
	}
	if len(params) < fixed || (!variadic && len(params) > fixed) {
		return errors.New("dtask: wrong number of parameters for " + funcName)
	}

	args := make([]reflect.Value, len(params))
	for i, p := range params {
		var in reflect.Type
		if i < fixed {
			in = methodType.In(i)
		} else {
			in = methodType.In(fixed).Elem()
		}
		if p == nil {
			// reflect.ValueOf(nil) is the invalid Value, which Call rejects.
			args[i] = reflect.Zero(in)
			continue
		}
		arg := reflect.ValueOf(p)
		if !arg.Type().AssignableTo(in) {
			return errors.New("dtask: parameter type " + arg.Type().String() +
				" is not assignable to " + in.String() + " in " + funcName)
		}
		args[i] = arg
	}

	results := method.Call(args)
	if len(results) == 0 {
		return nil
	}
	first := results[0]
	switch first.Kind() {
	case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map,
		reflect.Ptr, reflect.Slice, reflect.UnsafePointer:
		if first.IsNil() {
			return nil
		}
	default:
		// IsNil panics on these kinds, so success cannot be determined.
		return errors.New("dtask: first result of " + funcName + " cannot be nil-checked")
	}
	if callErr, ok := first.Interface().(error); ok {
		return callErr
	}
	return errors.New("dtask: " + funcName + " returned a non-nil result")
}
// defObjCreator is the object creator Init falls back to when none is given.
// It ignores objHint and always returns the result of
// basesch.FixedBaseScheduler.GetPurchasePlatformFromVendorID(0).
func defObjCreator(objHint string) interface{} {
	handler := basesch.FixedBaseScheduler.GetPurchasePlatformFromVendorID(0)
	return handler
}
// SerializeData gob-encodes data and returns the bytes as a standard base64
// string. When gob cannot encode data it returns an empty string and the
// encoder's error.
func SerializeData(data interface{}) (strValue string, err error) {
	encoded := new(bytes.Buffer)
	if encodeErr := gob.NewEncoder(encoded).Encode(data); encodeErr != nil {
		return "", encodeErr
	}
	return base64.StdEncoding.EncodeToString(encoded.Bytes()), nil
}
// DeSerializeData reverses SerializeData: it base64-decodes strValue and
// gob-decodes the result into dataPtr, which must be a pointer to the
// destination value. It returns the base64 error or the gob decoding error.
func DeSerializeData(strValue string, dataPtr interface{}) (err error) {
	raw, decodeErr := base64.StdEncoding.DecodeString(strValue)
	if decodeErr != nil {
		return decodeErr
	}
	return gob.NewDecoder(bytes.NewReader(raw)).Decode(dataPtr)
}