- close quitChan when Cancel for tasksch.
This commit is contained in:
@@ -112,23 +112,26 @@ func (task *ParallelTask) Run() {
|
|||||||
select {
|
select {
|
||||||
case <-task.quitChan: // 取消
|
case <-task.quitChan: // 取消
|
||||||
goto end
|
goto end
|
||||||
case job := <-task.taskChan:
|
default:
|
||||||
if job == nil { // 任务完成
|
select {
|
||||||
chanRetVal = retVal
|
case job := <-task.taskChan:
|
||||||
goto end
|
if job == nil { // 任务完成
|
||||||
} else {
|
chanRetVal = retVal
|
||||||
result, err := task.worker(task, job, task.params...)
|
goto end
|
||||||
// globals.SugarLogger.Debugf("ParallelTask.Run %s, after call worker result:%v, err:%v", task.Name, result, err)
|
|
||||||
task.finishedOneJob(len(job), err)
|
|
||||||
if err == nil {
|
|
||||||
if result != nil {
|
|
||||||
retVal = append(retVal, utils.Interface2Slice(result)...)
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
globals.SugarLogger.Infof("ParallelTask.Run %s, subtask(job:%s, params:%s) result:%v, failed with error:%v", task.Name, utils.Format4Output(job, true), utils.Format4Output(task.params, true), result, err)
|
result, err := task.worker(task, job, task.params...)
|
||||||
if !task.IsContinueWhenError { // 出错
|
// globals.SugarLogger.Debugf("ParallelTask.Run %s, after call worker result:%v, err:%v", task.Name, result, err)
|
||||||
chanRetVal = err
|
task.finishedOneJob(len(job), err)
|
||||||
goto end
|
if err == nil {
|
||||||
|
if result != nil {
|
||||||
|
retVal = append(retVal, utils.Interface2Slice(result)...)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
globals.SugarLogger.Infof("ParallelTask.Run %s, subtask(job:%s, params:%s) result:%v, failed with error:%v", task.Name, utils.Format4Output(job, true), utils.Format4Output(task.params, true), result, err)
|
||||||
|
if !task.IsContinueWhenError { // 出错
|
||||||
|
chanRetVal = err
|
||||||
|
goto end
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -167,6 +170,9 @@ func (task *ParallelTask) Run() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
task.locker.Lock()
|
task.locker.Lock()
|
||||||
|
if task.Status != TaskStatusCanceling {
|
||||||
|
close(task.quitChan)
|
||||||
|
}
|
||||||
if taskErr != nil { // 如果有错误,肯定就是失败了
|
if taskErr != nil { // 如果有错误,肯定就是失败了
|
||||||
task.Status = TaskStatusFailed
|
task.Status = TaskStatusFailed
|
||||||
} else {
|
} else {
|
||||||
@@ -186,7 +192,6 @@ func (task *ParallelTask) Run() {
|
|||||||
|
|
||||||
close(task.finishChan)
|
close(task.finishChan)
|
||||||
close(task.subFinishChan)
|
close(task.subFinishChan)
|
||||||
close(task.quitChan)
|
|
||||||
|
|
||||||
if task.resultHandler != nil {
|
if task.resultHandler != nil {
|
||||||
task.resultHandler(task.Name, taskResult, task.Err)
|
task.resultHandler(task.Name, taskResult, task.Err)
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package tasksch
|
package tasksch
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@@ -45,7 +46,7 @@ func TestCancelParallelTask(t *testing.T) {
|
|||||||
}
|
}
|
||||||
task := RunParallelTask("test", NewParallelConfig().SetParallelCount(100).SetBatchSize(7), "autotest", func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
task := RunParallelTask("test", NewParallelConfig().SetParallelCount(100).SetBatchSize(7), "autotest", func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||||||
sleepSecond := rand.Intn(5)
|
sleepSecond := rand.Intn(5)
|
||||||
t.Logf("sleep %d seconds", sleepSecond)
|
fmt.Printf("sleep %d seconds\n", sleepSecond)
|
||||||
time.Sleep(time.Duration(sleepSecond) * time.Second)
|
time.Sleep(time.Duration(sleepSecond) * time.Second)
|
||||||
retSlice := make([]string, len(batchItemList))
|
retSlice := make([]string, len(batchItemList))
|
||||||
for k := range retSlice {
|
for k := range retSlice {
|
||||||
@@ -54,7 +55,7 @@ func TestCancelParallelTask(t *testing.T) {
|
|||||||
return retSlice, nil
|
return retSlice, nil
|
||||||
}, itemList, "a", "b", 1, 2)
|
}, itemList, "a", "b", 1, 2)
|
||||||
// time.Sleep(time.Second * 6)
|
// time.Sleep(time.Second * 6)
|
||||||
t.Logf("finishedItemCount:%d, finishedJobCount:%d", task.GetFinishedItemCount(), task.GetFinishedJobCount())
|
fmt.Printf("finishedItemCount:%d, finishedJobCount:%d\n", task.GetFinishedItemCount(), task.GetFinishedJobCount())
|
||||||
task.Cancel()
|
task.Cancel()
|
||||||
_, err := task.GetResult(0)
|
_, err := task.GetResult(0)
|
||||||
if err != ErrTaskIsCanceled {
|
if err != ErrTaskIsCanceled {
|
||||||
|
|||||||
@@ -57,6 +57,9 @@ func (task *SeqTask) Run() {
|
|||||||
}
|
}
|
||||||
EndFor:
|
EndFor:
|
||||||
task.locker.Lock()
|
task.locker.Lock()
|
||||||
|
if task.Status != TaskStatusCanceling {
|
||||||
|
close(task.quitChan)
|
||||||
|
}
|
||||||
if taskErr != nil { // 如果有错误,肯定就是失败了
|
if taskErr != nil { // 如果有错误,肯定就是失败了
|
||||||
task.Status = TaskStatusFailed
|
task.Status = TaskStatusFailed
|
||||||
} else {
|
} else {
|
||||||
@@ -75,7 +78,6 @@ func (task *SeqTask) Run() {
|
|||||||
globals.SugarLogger.Debugf("SeqTask.Run %s, result:%v, err:%v", task.Name, taskResult, taskErr)
|
globals.SugarLogger.Debugf("SeqTask.Run %s, result:%v, err:%v", task.Name, taskResult, taskErr)
|
||||||
|
|
||||||
close(task.finishChan)
|
close(task.finishChan)
|
||||||
close(task.quitChan)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -101,8 +101,8 @@ func (t *BaseTask) Init(parallelCount, batchSize int, isContinueWhenError bool,
|
|||||||
t.TerminatedAt = utils.DefaultTimeValue
|
t.TerminatedAt = utils.DefaultTimeValue
|
||||||
t.TotalItemCount = totalItemCount
|
t.TotalItemCount = totalItemCount
|
||||||
t.TotalJobCount = totalJobCount
|
t.TotalJobCount = totalJobCount
|
||||||
t.quitChan = make(chan int, parallelCount)
|
t.quitChan = make(chan int, 1)
|
||||||
t.finishChan = make(chan int, 2)
|
t.finishChan = make(chan int, 1)
|
||||||
t.Status = TaskStatusWorking
|
t.Status = TaskStatusWorking
|
||||||
|
|
||||||
t.C = t.finishChan
|
t.C = t.finishChan
|
||||||
@@ -143,9 +143,7 @@ func (t *BaseTask) Cancel() {
|
|||||||
t.locker.Lock()
|
t.locker.Lock()
|
||||||
if t.Status < TaskStatusEndBegin && t.Status != TaskStatusCanceling {
|
if t.Status < TaskStatusEndBegin && t.Status != TaskStatusCanceling {
|
||||||
t.Status = TaskStatusCanceling
|
t.Status = TaskStatusCanceling
|
||||||
for i := 0; i < t.ParallelCount; i++ {
|
close(t.quitChan)
|
||||||
t.quitChan <- 0
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
t.locker.Unlock()
|
t.locker.Unlock()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user