- fix bug send msg to closed channel.

This commit is contained in:
gazebo
2018-09-17 17:49:13 +08:00
parent 35da901319
commit 764bc0b7b3

View File

@@ -98,7 +98,6 @@ func RunTask(taskName string, worker WorkFunc, resultHandler ResultHandlerFunc,
for { for {
select { select {
case <-task.quitChan: case <-task.quitChan:
chanRetVal = retVal
goto end goto end
case job := <-task.taskChan: case job := <-task.taskChan:
if job == nil { if job == nil {
@@ -113,7 +112,6 @@ func RunTask(taskName string, worker WorkFunc, resultHandler ResultHandlerFunc,
} else { } else {
chanRetVal = err chanRetVal = err
task.Cancel() task.Cancel()
task.setStatus(TaskStatusFailed)
} }
} }
} }
@@ -129,26 +127,41 @@ func RunTask(taskName string, worker WorkFunc, resultHandler ResultHandlerFunc,
task.taskChan <- nil task.taskChan <- nil
} }
task.result = make([]interface{}, 0) taskResult := make([]interface{}, 0)
var taskErr error
for i := 0; i < parallelCount; i++ { for i := 0; i < parallelCount; i++ {
result := <-task.subFinishChan result := <-task.subFinishChan
if err2, ok := result.(error); ok { if err2, ok := result.(error); ok {
task.result = nil taskResult = nil
task.err = err2 taskErr = err2
break break
} else { } else if result != nil {
resultList := result.([]interface{}) resultList := result.([]interface{})
task.result = append(task.result, resultList...) taskResult = append(taskResult, resultList...)
} }
} }
if task.GetStatus() != TaskStatusFailed {
task.locker.Lock()
if taskErr != nil { // 如果有错误,肯定就是失败了
task.Status = TaskStatusFailed
} else {
if len(task.taskChan) > 0 { if len(task.taskChan) > 0 {
task.err = ErrTaskIsCanceled taskErr = ErrTaskIsCanceled
task.setStatus(TaskStatusCanceled) task.Status = TaskStatusCanceled
} else { } else {
task.setStatus(TaskStatusFinished) task.Status = TaskStatusFinished
} }
} }
task.err = taskErr
task.result = taskResult
task.TerminatedAt = time.Now()
task.locker.Unlock()
close(task.finishChan)
close(task.subFinishChan)
close(task.quitChan)
if resultHandler != nil { if resultHandler != nil {
resultHandler(taskName, task.result, task.err) resultHandler(taskName, task.result, task.err)
} }
@@ -167,6 +180,9 @@ func (t *Task) GetResult(duration time.Duration) (retVal []interface{}, err erro
select { select {
case <-t.finishChan: case <-t.finishChan:
timer.Stop() timer.Stop()
t.locker.RLock()
defer t.locker.RUnlock()
return t.result, t.err return t.result, t.err
case <-timer.C: case <-timer.C:
} }
@@ -227,10 +243,4 @@ func (t *Task) setStatus(status int) {
defer t.locker.Unlock() defer t.locker.Unlock()
t.Status = status t.Status = status
if status >= TaskStatusEndBegin {
t.TerminatedAt = time.Now()
close(t.finishChan)
close(t.subFinishChan)
close(t.quitChan)
}
} }