107 lines
3.4 KiB
Go
107 lines
3.4 KiB
Go
package tasksch
|
|
|
|
import (
|
|
"fmt"
|
|
"math/rand"
|
|
"testing"
|
|
"time"
|
|
|
|
"git.rosy.net.cn/baseapi/utils"
|
|
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
|
|
)
|
|
|
|
func TestRunParallelTask(t *testing.T) {
|
|
itemList := make([]int, 100)
|
|
for k := range itemList {
|
|
itemList[k] = k
|
|
}
|
|
task := NewParallelTask("test", NewParallelConfig().SetParallelCount(100).SetBatchSize(7), jxcontext.AdminCtx,
|
|
func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
|
sleepSecond := rand.Intn(5)
|
|
t.Logf("sleep %d seconds", sleepSecond)
|
|
time.Sleep(time.Duration(sleepSecond) * time.Second)
|
|
retSlice := make([]string, len(batchItemList))
|
|
for k := range retSlice {
|
|
retSlice[k] = "hello:" + utils.Int2Str(batchItemList[k].(int)*2)
|
|
}
|
|
return retSlice, nil
|
|
}, itemList, "a", "b", 1, 2)
|
|
task.Run()
|
|
result, err := task.GetResult(1 * time.Microsecond)
|
|
if err == nil || task.GetStatus() != TaskStatusWorking {
|
|
t.Fatal("task can not be done in 1 microsecond")
|
|
}
|
|
result, err = task.GetResult(0)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if len(result) != len(itemList) {
|
|
t.Log(utils.Format4Output(result, false))
|
|
t.Fatal("result size doesn't match with itemList")
|
|
}
|
|
t.Log(task.GetStatus())
|
|
}
|
|
|
|
func TestCancelParallelTask(t *testing.T) {
|
|
itemList := make([]int, 100)
|
|
for k := range itemList {
|
|
itemList[k] = k
|
|
}
|
|
task := NewParallelTask("test", NewParallelConfig().SetParallelCount(100).SetBatchSize(7), jxcontext.AdminCtx,
|
|
func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
|
sleepSecond := rand.Intn(5)
|
|
fmt.Printf("sleep %d seconds\n", sleepSecond)
|
|
time.Sleep(time.Duration(sleepSecond) * time.Second)
|
|
retSlice := make([]string, len(batchItemList))
|
|
for k := range retSlice {
|
|
retSlice[k] = "hello:" + utils.Int2Str(batchItemList[k].(int)*2)
|
|
}
|
|
return retSlice, nil
|
|
}, itemList, "a", "b", 1, 2)
|
|
task.Run()
|
|
// time.Sleep(time.Second * 6)
|
|
fmt.Printf("finishedItemCount:%d, finishedJobCount:%d\n", task.GetFinishedItemCount(), task.GetFinishedJobCount())
|
|
task.Cancel()
|
|
_, err := task.GetResult(0)
|
|
if err != ErrTaskIsCanceled {
|
|
t.Fatal("task should in canceled status")
|
|
}
|
|
// t.Log(utils.Format4Output(result, false))
|
|
}
|
|
|
|
func TestRunParallelTaskPartialSuccess(t *testing.T) {
|
|
itemList := make([]int, 100)
|
|
for k := range itemList {
|
|
itemList[k] = k
|
|
}
|
|
task := NewParallelTask("test", NewParallelConfig().SetParallelCount(100).SetBatchSize(7).SetIsContinueWhenError(true), jxcontext.AdminCtx,
|
|
func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
|
sleepSecond := rand.Intn(5)
|
|
t.Logf("sleep %d seconds", sleepSecond)
|
|
time.Sleep(time.Duration(sleepSecond) * time.Second)
|
|
retSlice := make([]string, len(batchItemList))
|
|
for k := range retSlice {
|
|
retSlice[k] = "hello:" + utils.Int2Str(batchItemList[k].(int)*2)
|
|
}
|
|
if rand.Intn(2) == 1 {
|
|
return nil, fmt.Errorf("test:%d", batchItemList[0].(int))
|
|
}
|
|
return retSlice, nil
|
|
}, itemList, "a", "b", 1, 2)
|
|
task.Run()
|
|
result, err := task.GetResult(1 * time.Microsecond)
|
|
if err == nil || task.GetStatus() != TaskStatusWorking {
|
|
t.Fatal("task can not be done in 1 microsecond")
|
|
}
|
|
result, err = task.GetResult(0)
|
|
if err == nil {
|
|
t.Fatal("不应该全部成功")
|
|
}
|
|
fmt.Println(err)
|
|
if len(result) == len(itemList) {
|
|
t.Log(utils.Format4Output(result, false))
|
|
t.Fatal("不应该全部成功")
|
|
}
|
|
t.Log(task.GetStatus())
|
|
}
|