From af5038e8a75712a4d68d0b911c4230c13d1e341e Mon Sep 17 00:00:00 2001 From: gazebo Date: Sat, 15 Sep 2018 18:53:39 +0800 Subject: [PATCH] - task schedule added. --- business/jxutils/jxutils_cms.go | 21 ++ business/jxutils/jxutils_cms_test.go | 24 +++ business/jxutils/tasksch/task.go | 236 ++++++++++++++++++++++ business/jxutils/tasksch/task_man.go | 43 ++++ business/jxutils/tasksch/task_man_test.go | 49 +++++ business/jxutils/tasksch/task_test.go | 70 +++++++ globals/beegodb/beegodb.go | 2 +- 7 files changed, 444 insertions(+), 1 deletion(-) create mode 100644 business/jxutils/jxutils_cms_test.go create mode 100644 business/jxutils/tasksch/task.go create mode 100644 business/jxutils/tasksch/task_man.go create mode 100644 business/jxutils/tasksch/task_man_test.go create mode 100644 business/jxutils/tasksch/task_test.go diff --git a/business/jxutils/jxutils_cms.go b/business/jxutils/jxutils_cms.go index 33f2a4a42..cb28b7c98 100644 --- a/business/jxutils/jxutils_cms.go +++ b/business/jxutils/jxutils_cms.go @@ -7,3 +7,24 @@ func MergeStoreStatus(status int, vendorStatus int) int { } return vendorStatus } + +func SplitSlice(list []interface{}, batchCount int) (listInList [][]interface{}) { + len := len(list) + if len > 0 { + listInListLen := (len-1)/batchCount + 1 + listInList = make([][]interface{}, listInListLen) + index := 0 + for i := 0; i < len; i++ { + if i%batchCount == 0 { + index = i / batchCount + arrLen := len - i + if arrLen > batchCount { + arrLen = batchCount + } + listInList[index] = make([]interface{}, arrLen) + } + listInList[index][i%batchCount] = list[i] + } + } + return listInList +} diff --git a/business/jxutils/jxutils_cms_test.go b/business/jxutils/jxutils_cms_test.go new file mode 100644 index 000000000..255270254 --- /dev/null +++ b/business/jxutils/jxutils_cms_test.go @@ -0,0 +1,24 @@ +package jxutils + +import ( + "testing" + + "git.rosy.net.cn/baseapi/utils" +) + +func TestSplitSlice(t *testing.T) { + testValue1 := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + list := utils.Interface2Slice(testValue1) + result := SplitSlice(list, 3) + if !(len(result) == 4 && len(result[3]) == 1) { + t.Log("result is not ok") + } + + testValue2 := []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"} + list = utils.Interface2Slice(testValue2) + result = SplitSlice(list, 1) + if !(len(result) == 10 && len(result[3]) == 1) { + t.Log("result is not ok") + } + t.Log(result) +} diff --git a/business/jxutils/tasksch/task.go b/business/jxutils/tasksch/task.go new file mode 100644 index 000000000..a8b5cf21b --- /dev/null +++ b/business/jxutils/tasksch/task.go @@ -0,0 +1,236 @@ +package tasksch + +import ( + "errors" + "sync" + "time" + + "git.rosy.net.cn/baseapi/utils" + "git.rosy.net.cn/jx-callback/business/jxutils" +) + +const ( + TaskStatusBegin = 0 + TaskStatusWorking = 0 + TaskStatusCanceling = 1 + + TaskStatusEndBegin = 2 + TaskStatusFinished = 2 + TaskStatusCanceled = 3 + TaskStatusFailed = 4 + TaskStatusEnd = 4 +) + +type WorkFunc func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) +type ResultHandlerFunc func(taskName string, result []interface{}, err error) + +type Task struct { + ID string `json:"id"` + Name string `json:"name"` + CreatedBy string `json:"createdBy"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` + TerminatedAt time.Time `json:"terminatedAt"` + ParallelCount int `json:"parallelCount"` + TotalItemCount int `json:"totalItemCount"` + TotalJobCount int `json:"totalJobCount"` + FinishedItemCount int `json:"finishedItemCount"` + FinishedJobCount int `json:"finishedJobCount"` + Status int `json:"status"` + + C <-chan int `json:"-"` + + taskChan chan []interface{} + quitChan chan int + subFinishChan chan interface{} + finishChan chan int + + locker sync.RWMutex + result []interface{} + err error +} + +type TaskList []*Task + +func (s TaskList) Len() int { + return len(s) +} + +func (s TaskList) Less(i, j int) bool { + return s[i].CreatedAt.Sub(s[j].CreatedAt) < 0 +} + +func (s TaskList) Swap(i, j int) { + tmp := s[i] + s[i] = s[j] + s[j] = tmp +} + +var ( + ErrTaskNotFinished = errors.New("任务还未完成") + ErrTaskIsCanceled = errors.New("任务被取消了") +) + +func RunTask(taskName string, worker WorkFunc, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, itemList interface{}, params ...interface{}) *Task { + realItemList := utils.Interface2Slice(itemList) + jobList := jxutils.SplitSlice(realItemList, batchSize) + task := &Task{ + ID: utils.GetUUID(), + Name: taskName, + CreatedAt: time.Now(), + CreatedBy: userName, + UpdatedAt: time.Now(), + TotalJobCount: len(jobList), + TotalItemCount: len(realItemList), + ParallelCount: parallelCount, + taskChan: make(chan []interface{}, parallelCount*100), + quitChan: make(chan int, parallelCount), + subFinishChan: make(chan interface{}, parallelCount), + finishChan: make(chan int, 2), + Status: TaskStatusWorking, + } + task.C = task.finishChan + go func() { + for i := 0; i < parallelCount; i++ { + go func() { + var chanRetVal interface{} + retVal := make([]interface{}, 0) + for { + select { + case <-task.quitChan: + chanRetVal = retVal + goto end + case job := <-task.taskChan: + if job == nil { + chanRetVal = retVal + goto end + } else { + if result, err := worker(job, params...); err == nil { + task.finishedOneJob(len(job)) + if result != nil { + retVal = append(retVal, utils.Interface2Slice(result)...) + } + } else { + chanRetVal = err + task.Cancel() + task.setStatus(TaskStatusFailed) + } + } + } + } + end: + task.subFinishChan <- chanRetVal + }() + } + for _, job := range jobList { + task.taskChan <- job + } + for i := 0; i < parallelCount; i++ { + task.taskChan <- nil + } + + task.result = make([]interface{}, 0) + for i := 0; i < parallelCount; i++ { + result := <-task.subFinishChan + if err2, ok := result.(error); ok { + task.result = nil + task.err = err2 + break + } else { + resultList := result.([]interface{}) + task.result = append(task.result, resultList...) + } + } + if task.GetStatus() != TaskStatusFailed { + if len(task.taskChan) > 0 { + task.err = ErrTaskIsCanceled + task.setStatus(TaskStatusCanceled) + } else { + task.setStatus(TaskStatusFinished) + } + } + if resultHandler != nil { + resultHandler(taskName, task.result, task.err) + } + }() + return task +} + +func (t *Task) GetResult(duration time.Duration) (retVal []interface{}, err error) { + if t.GetStatus() >= TaskStatusEndBegin { + return t.result, t.err + } + if duration == 0 { + duration = time.Hour * 10000 // duration为0表示无限等待 + } + timer := time.NewTimer(duration) + select { + case <-t.finishChan: + timer.Stop() + return t.result, t.err + case <-timer.C: + } + return nil, ErrTaskNotFinished +} + +func (t *Task) Cancel() { + if t.GetStatus() < TaskStatusEndBegin { + for i := 0; i < t.ParallelCount; i++ { + t.quitChan <- 0 + } + t.setStatus(TaskStatusCanceling) + } +} + +func (t *Task) GetTotalItemCount() int { + return t.TotalItemCount +} + +func (t *Task) GetFinishedItemCount() int { + t.locker.RLock() + defer t.locker.RUnlock() + + return t.FinishedItemCount +} + +func (t *Task) GetTotalJobCount() int { + return t.TotalJobCount +} + +func (t *Task) GetFinishedJobCount() int { + t.locker.RLock() + defer t.locker.RUnlock() + + return t.FinishedJobCount +} + +func (t *Task) GetStatus() int { + t.locker.RLock() + defer t.locker.RUnlock() + + return t.Status +} + +///////// + +func (t *Task) finishedOneJob(itemCount int) { + t.locker.Lock() + defer t.locker.Unlock() + + t.UpdatedAt = time.Now() + t.FinishedItemCount += itemCount + t.FinishedJobCount++ +} + +func (t *Task) setStatus(status int) { + t.locker.Lock() + defer t.locker.Unlock() + + t.Status = status + if status >= TaskStatusEndBegin { + t.TerminatedAt = time.Now() + close(t.finishChan) + close(t.subFinishChan) + close(t.quitChan) + } +} diff --git a/business/jxutils/tasksch/task_man.go b/business/jxutils/tasksch/task_man.go new file mode 100644 index 000000000..41cefbf0e --- /dev/null +++ b/business/jxutils/tasksch/task_man.go @@ -0,0 +1,43 @@ +package tasksch + +import ( + "sort" + "time" +) + +var ( + defTaskMan TaskMan +) + +type TaskMan struct { + taskList map[string]*Task +} + +func init() { + defTaskMan.taskList = make(map[string]*Task) +} + +func (m *TaskMan) RunTask(taskName string, worker WorkFunc, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, itemList interface{}, params ...interface{}) *Task { + task := RunTask(taskName, worker, resultHandler, parallelCount, batchSize, userName, itemList, params...) + m.taskList[task.ID] = task + return task +} + +func (m *TaskMan) GetTasks(taskID string, fromStatus, toStatus int, lastHours int) (taskList []*Task) { + lastTime := time.Now().Add(time.Duration(-lastHours) * time.Hour).Unix() + for k, v := range m.taskList { + if !((taskID != "" && taskID != k) || v.Status < fromStatus || v.Status > toStatus || v.CreatedAt.Unix() < lastTime) { + taskList = append(taskList, v) + } + } + sort.Sort(TaskList(taskList)) + return taskList +} + +func RunManagedTask(taskName string, worker WorkFunc, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, itemList interface{}, params ...interface{}) *Task { + return defTaskMan.RunTask(taskName, worker, resultHandler, parallelCount, batchSize, userName, itemList, params...) +} + +func GetTasks(taskID string, fromStatus, toStatus int, lastHours int) (taskList []*Task) { + return defTaskMan.GetTasks(taskID, fromStatus, toStatus, lastHours) +} diff --git a/business/jxutils/tasksch/task_man_test.go b/business/jxutils/tasksch/task_man_test.go new file mode 100644 index 000000000..7bc639371 --- /dev/null +++ b/business/jxutils/tasksch/task_man_test.go @@ -0,0 +1,49 @@ +package tasksch + +import ( + "math/rand" + "testing" + "time" + + "git.rosy.net.cn/baseapi/utils" +) + +func TestTaskMan(t *testing.T) { + itemList := make([]int, 100) + for k := range itemList { + itemList[k] = k + } + task1 := RunManagedTask("test", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + sleepSecond := rand.Intn(5) + t.Logf("sleep %d seconds", sleepSecond) + time.Sleep(time.Duration(sleepSecond) * time.Second) + retSlice := make([]string, len(batchItemList)) + for k := range retSlice { + retSlice[k] = "hello:" + utils.Int2Str(batchItemList[k].(int)*2) + } + return retSlice, nil + }, func(taskName string, result []interface{}, err error) { + // t.Log("finished here") + // t.Log(utils.Format4Output(result, false)) + }, 100, 7, "autotest", itemList, "a", "b", 1, 2) + + task2 := RunManagedTask("test", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + sleepSecond := rand.Intn(5) + t.Logf("sleep %d seconds", sleepSecond) + time.Sleep(time.Duration(sleepSecond) * time.Second) + retSlice := make([]string, len(batchItemList)) + for k := range retSlice { + retSlice[k] = "hello:" + utils.Int2Str(batchItemList[k].(int)*2) + } + return retSlice, nil + }, func(taskName string, result []interface{}, err error) { + // t.Log("finished here") + // t.Log(utils.Format4Output(result, false)) + }, 100, 7, "autotest", itemList, "a", "b", 1, 2) + time.Sleep(2 * time.Second) + task2.Cancel() + if task1.GetStatus() == task2.GetStatus() { + t.Log(utils.Format4Output(GetTasks("", TaskStatusBegin, TaskStatusEnd, 5), false)) + t.Fatal("status should not be same") + } +} diff --git a/business/jxutils/tasksch/task_test.go b/business/jxutils/tasksch/task_test.go new file mode 100644 index 000000000..42077f624 --- /dev/null +++ b/business/jxutils/tasksch/task_test.go @@ -0,0 +1,70 @@ +package tasksch + +import ( + "math/rand" + "testing" + "time" + + "git.rosy.net.cn/baseapi/utils" +) + +func TestRunTask(t *testing.T) { + itemList := make([]int, 100) + for k := range itemList { + itemList[k] = k + } + task := RunTask("test", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + sleepSecond := rand.Intn(5) + t.Logf("sleep %d seconds", sleepSecond) + time.Sleep(time.Duration(sleepSecond) * time.Second) + retSlice := make([]string, len(batchItemList)) + for k := range retSlice { + retSlice[k] = "hello:" + utils.Int2Str(batchItemList[k].(int)*2) + } + return retSlice, nil + }, func(taskName string, result []interface{}, err error) { + // t.Log("finished here") + // t.Log(utils.Format4Output(result, false)) + }, 100, 7, "autotest", itemList, "a", "b", 1, 2) + result, err := task.GetResult(1 * time.Microsecond) + if err == nil || task.GetStatus() != TaskStatusWorking { + t.Fatal("task can not be done in 1 microsecond") + } + result, err = task.GetResult(0) + if err != nil { + t.Fatal(err) + } + if len(result) != len(itemList) { + t.Log(utils.Format4Output(result, false)) + t.Fatal("result size doesn't match with itemList") + } + t.Log(task.GetStatus()) +} + +func TestCancelTask(t *testing.T) { + itemList := make([]int, 100) + for k := range itemList { + itemList[k] = k + } + task := RunTask("test", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + sleepSecond := rand.Intn(5) + t.Logf("sleep %d seconds", sleepSecond) + time.Sleep(time.Duration(sleepSecond) * time.Second) + retSlice := make([]string, len(batchItemList)) + for k := range retSlice { + retSlice[k] = "hello:" + utils.Int2Str(batchItemList[k].(int)*2) + } + return retSlice, nil + }, func(taskName string, result []interface{}, err error) { + // t.Log("finished here") + // t.Log(utils.Format4Output(result, false)) + }, 100, 7, "autotest", itemList, "a", "b", 1, 2) + // time.Sleep(time.Second * 6) + t.Logf("finishedItemCount:%d, finishedJobCount:%d", task.GetFinishedItemCount(), task.GetFinishedJobCount()) + task.Cancel() + _, err := task.GetResult(0) + if err != ErrTaskIsCanceled { + t.Fatal("task should in canceled status") + } + // t.Log(utils.Format4Output(result, false)) +} diff --git a/globals/beegodb/beegodb.go b/globals/beegodb/beegodb.go index b4fa0ccef..b25e949e3 100644 --- a/globals/beegodb/beegodb.go +++ b/globals/beegodb/beegodb.go @@ -32,7 +32,7 @@ func Init() { // db.Set("gorm:table_options", "CHARSET=utf8mb4").AutoMigrate(&model.SkuCategory{}) orm.RegisterModel(&model.SkuCategory{}) orm.RegisterModel(&model.WeiXins{}, &model.JxBackendUser{}) - orm.RegisterModel(&model.DurableTask{}, &model.DurableTaskItem{}) + // orm.RegisterModel(&model.DurableTask{}, &model.DurableTaskItem{}) } // create table orm.RunSyncdb("default", false, true)