- optimize tasksch channel.

This commit is contained in:
gazebo
2018-10-21 17:09:08 +08:00
parent 0fb6fe27fe
commit c818c45a4d
3 changed files with 12 additions and 17 deletions

View File

@@ -86,7 +86,7 @@ func NewParallelTask(taskName string, config *ParallelConfig, userName string, w
} }
task := &ParallelTask{ task := &ParallelTask{
subFinishChan: make(chan interface{}, config.ParallelCount), subFinishChan: make(chan interface{}, config.ParallelCount),
taskChan: make(chan []interface{}, len(realItemList)+config.ParallelCount), // 确保能装下所有taskitem加结束标记 taskChan: make(chan []interface{}, len(realItemList)),
resultHandler: config.ResultHandler, resultHandler: config.ResultHandler,
worker: worker, worker: worker,
jobList: jobList, jobList: jobList,
@@ -114,8 +114,8 @@ func (task *ParallelTask) Run() {
goto end goto end
default: default:
select { select {
case job := <-task.taskChan: case job, ok := <-task.taskChan:
if job == nil { // 任务完成 if !ok { // 任务完成
chanRetVal = retVal chanRetVal = retVal
goto end goto end
} else { } else {
@@ -149,9 +149,7 @@ func (task *ParallelTask) Run() {
for _, job := range task.jobList { for _, job := range task.jobList {
task.taskChan <- job task.taskChan <- job
} }
for i := 0; i < task.ParallelCount; i++ { close(task.taskChan)
task.taskChan <- nil
}
taskResult := make([]interface{}, 0) taskResult := make([]interface{}, 0)
var taskErr error var taskErr error
@@ -170,9 +168,6 @@ func (task *ParallelTask) Run() {
} }
task.locker.Lock() task.locker.Lock()
if task.Status != TaskStatusCanceling {
close(task.quitChan)
}
if taskErr != nil { // 如果有错误,肯定就是失败了 if taskErr != nil { // 如果有错误,肯定就是失败了
task.Status = TaskStatusFailed task.Status = TaskStatusFailed
} else { } else {
@@ -190,7 +185,6 @@ func (task *ParallelTask) Run() {
globals.SugarLogger.Debugf("ParallelTask.Run %s, result:%v, err:%v", task.Name, taskResult, taskErr) globals.SugarLogger.Debugf("ParallelTask.Run %s, result:%v, err:%v", task.Name, taskResult, taskErr)
close(task.finishChan)
close(task.subFinishChan) close(task.subFinishChan)
if task.resultHandler != nil { if task.resultHandler != nil {

View File

@@ -57,9 +57,6 @@ func (task *SeqTask) Run() {
} }
EndFor: EndFor:
task.locker.Lock() task.locker.Lock()
if task.Status != TaskStatusCanceling {
close(task.quitChan)
}
if taskErr != nil { // 如果有错误,肯定就是失败了 if taskErr != nil { // 如果有错误,肯定就是失败了
task.Status = TaskStatusFailed task.Status = TaskStatusFailed
} else { } else {
@@ -76,8 +73,6 @@ func (task *SeqTask) Run() {
task.locker.Unlock() task.locker.Unlock()
globals.SugarLogger.Debugf("SeqTask.Run %s, result:%v, err:%v", task.Name, taskResult, taskErr) globals.SugarLogger.Debugf("SeqTask.Run %s, result:%v, err:%v", task.Name, taskResult, taskErr)
close(task.finishChan)
}) })
} }

View File

@@ -101,8 +101,8 @@ func (t *BaseTask) Init(parallelCount, batchSize int, isContinueWhenError bool,
t.TerminatedAt = utils.DefaultTimeValue t.TerminatedAt = utils.DefaultTimeValue
t.TotalItemCount = totalItemCount t.TotalItemCount = totalItemCount
t.TotalJobCount = totalJobCount t.TotalJobCount = totalJobCount
t.quitChan = make(chan int, 1) t.quitChan = make(chan int)
t.finishChan = make(chan int, 1) t.finishChan = make(chan int)
t.Status = TaskStatusWorking t.Status = TaskStatusWorking
t.C = t.finishChan t.C = t.finishChan
@@ -228,11 +228,17 @@ func (t *BaseTask) run(taskHandler func()) {
}() }()
taskHandler() taskHandler()
select {
case <-t.quitChan:
default:
close(t.quitChan)
}
for _, subTask := range t.Children { for _, subTask := range t.Children {
if _, err := subTask.GetResult(0); err != nil { if _, err := subTask.GetResult(0); err != nil {
globals.SugarLogger.Warnf("BaseTask run, failed with error:%v", err) globals.SugarLogger.Warnf("BaseTask run, failed with error:%v", err)
} }
} }
close(t.finishChan)
}) })
} }