- task schedule added.

This commit is contained in:
gazebo
2018-09-15 18:53:39 +08:00
parent b2d365e36f
commit af5038e8a7
7 changed files with 444 additions and 1 deletions

View File

@@ -7,3 +7,24 @@ func MergeStoreStatus(status int, vendorStatus int) int {
}
return vendorStatus
}
func SplitSlice(list []interface{}, batchCount int) (listInList [][]interface{}) {
len := len(list)
if len > 0 {
listInListLen := (len-1)/batchCount + 1
listInList = make([][]interface{}, listInListLen)
index := 0
for i := 0; i < len; i++ {
if i%batchCount == 0 {
index = i / batchCount
arrLen := len - i
if arrLen > batchCount {
arrLen = batchCount
}
listInList[index] = make([]interface{}, arrLen)
}
listInList[index][i%batchCount] = list[i]
}
}
return listInList
}

View File

@@ -0,0 +1,24 @@
package jxutils
import (
"testing"
"git.rosy.net.cn/baseapi/utils"
)
func TestSplitSlice(t *testing.T) {
testValue1 := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
list := utils.Interface2Slice(testValue1)
result := SplitSlice(list, 3)
if !(len(result) == 4 && len(result[3]) == 1) {
t.Log("result is not ok")
}
testValue2 := []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"}
list = utils.Interface2Slice(testValue2)
result = SplitSlice(list, 1)
if !(len(result) == 10 && len(result[3]) == 1) {
t.Log("result is not ok")
}
t.Log(result)
}

View File

@@ -0,0 +1,236 @@
package tasksch
import (
"errors"
"sync"
"time"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxutils"
)
const (
TaskStatusBegin = 0
TaskStatusWorking = 0
TaskStatusCanceling = 1
TaskStatusEndBegin = 2
TaskStatusFinished = 2
TaskStatusCanceled = 3
TaskStatusFailed = 4
TaskStatusEnd = 4
)
type WorkFunc func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error)
type ResultHandlerFunc func(taskName string, result []interface{}, err error)
type Task struct {
ID string `json:"id"`
Name string `json:"name"`
CreatedBy string `json:"createdBy"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
TerminatedAt time.Time `json:"terminatedAt"`
ParallelCount int `json:"parallelCount"`
TotalItemCount int `json:"totalItemCount"`
TotalJobCount int `json:"totalJobCount"`
FinishedItemCount int `json:"finishedItemCount"`
FinishedJobCount int `json:"finishedJobCount"`
Status int `json:"status"`
C <-chan int `json:"-"`
taskChan chan []interface{}
quitChan chan int
subFinishChan chan interface{}
finishChan chan int
locker sync.RWMutex
result []interface{}
err error
}
type TaskList []*Task
func (s TaskList) Len() int {
return len(s)
}
func (s TaskList) Less(i, j int) bool {
return s[i].CreatedAt.Sub(s[j].CreatedAt) < 0
}
func (s TaskList) Swap(i, j int) {
tmp := s[i]
s[i] = s[j]
s[j] = tmp
}
var (
ErrTaskNotFinished = errors.New("任务还未完成")
ErrTaskIsCanceled = errors.New("任务被取消了")
)
func RunTask(taskName string, worker WorkFunc, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, itemList interface{}, params ...interface{}) *Task {
realItemList := utils.Interface2Slice(itemList)
jobList := jxutils.SplitSlice(realItemList, batchSize)
task := &Task{
ID: utils.GetUUID(),
Name: taskName,
CreatedAt: time.Now(),
CreatedBy: userName,
UpdatedAt: time.Now(),
TotalJobCount: len(jobList),
TotalItemCount: len(realItemList),
ParallelCount: parallelCount,
taskChan: make(chan []interface{}, parallelCount*100),
quitChan: make(chan int, parallelCount),
subFinishChan: make(chan interface{}, parallelCount),
finishChan: make(chan int, 2),
Status: TaskStatusWorking,
}
task.C = task.finishChan
go func() {
for i := 0; i < parallelCount; i++ {
go func() {
var chanRetVal interface{}
retVal := make([]interface{}, 0)
for {
select {
case <-task.quitChan:
chanRetVal = retVal
goto end
case job := <-task.taskChan:
if job == nil {
chanRetVal = retVal
goto end
} else {
if result, err := worker(job, params...); err == nil {
task.finishedOneJob(len(job))
if result != nil {
retVal = append(retVal, utils.Interface2Slice(result)...)
}
} else {
chanRetVal = err
task.Cancel()
task.setStatus(TaskStatusFailed)
}
}
}
}
end:
task.subFinishChan <- chanRetVal
}()
}
for _, job := range jobList {
task.taskChan <- job
}
for i := 0; i < parallelCount; i++ {
task.taskChan <- nil
}
task.result = make([]interface{}, 0)
for i := 0; i < parallelCount; i++ {
result := <-task.subFinishChan
if err2, ok := result.(error); ok {
task.result = nil
task.err = err2
break
} else {
resultList := result.([]interface{})
task.result = append(task.result, resultList...)
}
}
if task.GetStatus() != TaskStatusFailed {
if len(task.taskChan) > 0 {
task.err = ErrTaskIsCanceled
task.setStatus(TaskStatusCanceled)
} else {
task.setStatus(TaskStatusFinished)
}
}
if resultHandler != nil {
resultHandler(taskName, task.result, task.err)
}
}()
return task
}
func (t *Task) GetResult(duration time.Duration) (retVal []interface{}, err error) {
if t.GetStatus() >= TaskStatusEndBegin {
return t.result, t.err
}
if duration == 0 {
duration = time.Hour * 10000 // duration为0表示无限等待
}
timer := time.NewTimer(duration)
select {
case <-t.finishChan:
timer.Stop()
return t.result, t.err
case <-timer.C:
}
return nil, ErrTaskNotFinished
}
func (t *Task) Cancel() {
if t.GetStatus() < TaskStatusEndBegin {
for i := 0; i < t.ParallelCount; i++ {
t.quitChan <- 0
}
t.setStatus(TaskStatusCanceling)
}
}
func (t *Task) GetTotalItemCount() int {
return t.TotalItemCount
}
func (t *Task) GetFinishedItemCount() int {
t.locker.RLock()
defer t.locker.RUnlock()
return t.FinishedItemCount
}
func (t *Task) GetTotalJobCount() int {
return t.TotalJobCount
}
func (t *Task) GetFinishedJobCount() int {
t.locker.RLock()
defer t.locker.RUnlock()
return t.FinishedJobCount
}
func (t *Task) GetStatus() int {
t.locker.RLock()
defer t.locker.RUnlock()
return t.Status
}
/////////
func (t *Task) finishedOneJob(itemCount int) {
t.locker.Lock()
defer t.locker.Unlock()
t.UpdatedAt = time.Now()
t.FinishedItemCount += itemCount
t.FinishedJobCount++
}
func (t *Task) setStatus(status int) {
t.locker.Lock()
defer t.locker.Unlock()
t.Status = status
if status >= TaskStatusEndBegin {
t.TerminatedAt = time.Now()
close(t.finishChan)
close(t.subFinishChan)
close(t.quitChan)
}
}

View File

@@ -0,0 +1,43 @@
package tasksch
import (
"sort"
"time"
)
var (
defTaskMan TaskMan
)
type TaskMan struct {
taskList map[string]*Task
}
func init() {
defTaskMan.taskList = make(map[string]*Task)
}
func (m *TaskMan) RunTask(taskName string, worker WorkFunc, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, itemList interface{}, params ...interface{}) *Task {
task := RunTask(taskName, worker, resultHandler, parallelCount, batchSize, userName, itemList, params...)
m.taskList[task.ID] = task
return task
}
func (m *TaskMan) GetTasks(taskID string, fromStatus, toStatus int, lastHours int) (taskList []*Task) {
lastTime := time.Now().Add(time.Duration(-lastHours) * time.Hour).Unix()
for k, v := range m.taskList {
if !((taskID != "" && taskID != k) || v.Status < fromStatus || v.Status > toStatus || v.CreatedAt.Unix() < lastTime) {
taskList = append(taskList, v)
}
}
sort.Sort(TaskList(taskList))
return taskList
}
func RunManagedTask(taskName string, worker WorkFunc, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, itemList interface{}, params ...interface{}) *Task {
return defTaskMan.RunTask(taskName, worker, resultHandler, parallelCount, batchSize, userName, itemList, params...)
}
func GetTasks(taskID string, fromStatus, toStatus int, lastHours int) (taskList []*Task) {
return defTaskMan.GetTasks(taskID, fromStatus, toStatus, lastHours)
}

View File

@@ -0,0 +1,49 @@
package tasksch
import (
"math/rand"
"testing"
"time"
"git.rosy.net.cn/baseapi/utils"
)
func TestTaskMan(t *testing.T) {
itemList := make([]int, 100)
for k := range itemList {
itemList[k] = k
}
task1 := RunManagedTask("test", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
sleepSecond := rand.Intn(5)
t.Logf("sleep %d seconds", sleepSecond)
time.Sleep(time.Duration(sleepSecond) * time.Second)
retSlice := make([]string, len(batchItemList))
for k := range retSlice {
retSlice[k] = "hello:" + utils.Int2Str(batchItemList[k].(int)*2)
}
return retSlice, nil
}, func(taskName string, result []interface{}, err error) {
// t.Log("finished here")
// t.Log(utils.Format4Output(result, false))
}, 100, 7, "autotest", itemList, "a", "b", 1, 2)
task2 := RunManagedTask("test", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
sleepSecond := rand.Intn(5)
t.Logf("sleep %d seconds", sleepSecond)
time.Sleep(time.Duration(sleepSecond) * time.Second)
retSlice := make([]string, len(batchItemList))
for k := range retSlice {
retSlice[k] = "hello:" + utils.Int2Str(batchItemList[k].(int)*2)
}
return retSlice, nil
}, func(taskName string, result []interface{}, err error) {
// t.Log("finished here")
// t.Log(utils.Format4Output(result, false))
}, 100, 7, "autotest", itemList, "a", "b", 1, 2)
time.Sleep(2 * time.Second)
task2.Cancel()
if task1.GetStatus() == task2.GetStatus() {
t.Log(utils.Format4Output(GetTasks("", TaskStatusBegin, TaskStatusEnd, 5), false))
t.Fatal("status should not be same")
}
}

View File

@@ -0,0 +1,70 @@
package tasksch
import (
"math/rand"
"testing"
"time"
"git.rosy.net.cn/baseapi/utils"
)
func TestRunTask(t *testing.T) {
itemList := make([]int, 100)
for k := range itemList {
itemList[k] = k
}
task := RunTask("test", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
sleepSecond := rand.Intn(5)
t.Logf("sleep %d seconds", sleepSecond)
time.Sleep(time.Duration(sleepSecond) * time.Second)
retSlice := make([]string, len(batchItemList))
for k := range retSlice {
retSlice[k] = "hello:" + utils.Int2Str(batchItemList[k].(int)*2)
}
return retSlice, nil
}, func(taskName string, result []interface{}, err error) {
// t.Log("finished here")
// t.Log(utils.Format4Output(result, false))
}, 100, 7, "autotest", itemList, "a", "b", 1, 2)
result, err := task.GetResult(1 * time.Microsecond)
if err == nil || task.GetStatus() != TaskStatusWorking {
t.Fatal("task can not be done in 1 microsecond")
}
result, err = task.GetResult(0)
if err != nil {
t.Fatal(err)
}
if len(result) != len(itemList) {
t.Log(utils.Format4Output(result, false))
t.Fatal("result size doesn't match with itemList")
}
t.Log(task.GetStatus())
}
func TestCancelTask(t *testing.T) {
itemList := make([]int, 100)
for k := range itemList {
itemList[k] = k
}
task := RunTask("test", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
sleepSecond := rand.Intn(5)
t.Logf("sleep %d seconds", sleepSecond)
time.Sleep(time.Duration(sleepSecond) * time.Second)
retSlice := make([]string, len(batchItemList))
for k := range retSlice {
retSlice[k] = "hello:" + utils.Int2Str(batchItemList[k].(int)*2)
}
return retSlice, nil
}, func(taskName string, result []interface{}, err error) {
// t.Log("finished here")
// t.Log(utils.Format4Output(result, false))
}, 100, 7, "autotest", itemList, "a", "b", 1, 2)
// time.Sleep(time.Second * 6)
t.Logf("finishedItemCount:%d, finishedJobCount:%d", task.GetFinishedItemCount(), task.GetFinishedJobCount())
task.Cancel()
_, err := task.GetResult(0)
if err != ErrTaskIsCanceled {
t.Fatal("task should in canceled status")
}
// t.Log(utils.Format4Output(result, false))
}

View File

@@ -32,7 +32,7 @@ func Init() {
// db.Set("gorm:table_options", "CHARSET=utf8mb4").AutoMigrate(&model.SkuCategory{})
orm.RegisterModel(&model.SkuCategory{})
orm.RegisterModel(&model.WeiXins{}, &model.JxBackendUser{})
orm.RegisterModel(&model.DurableTask{}, &model.DurableTaskItem{})
// orm.RegisterModel(&model.DurableTask{}, &model.DurableTaskItem{})
}
// create table
orm.RunSyncdb("default", false, true)