From 764bc0b7b3fab9d55bcc7eaa6e5fdd3c17b85620 Mon Sep 17 00:00:00 2001 From: gazebo Date: Mon, 17 Sep 2018 17:49:13 +0800 Subject: [PATCH] - fix bug send msg to closed channel. --- business/jxutils/tasksch/task.go | 44 ++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/business/jxutils/tasksch/task.go b/business/jxutils/tasksch/task.go index a8b5cf21b..1a23f5813 100644 --- a/business/jxutils/tasksch/task.go +++ b/business/jxutils/tasksch/task.go @@ -98,7 +98,6 @@ func RunTask(taskName string, worker WorkFunc, resultHandler ResultHandlerFunc, for { select { case <-task.quitChan: - chanRetVal = retVal goto end case job := <-task.taskChan: if job == nil { @@ -113,7 +112,6 @@ func RunTask(taskName string, worker WorkFunc, resultHandler ResultHandlerFunc, } else { chanRetVal = err task.Cancel() - task.setStatus(TaskStatusFailed) } } } @@ -129,26 +127,41 @@ func RunTask(taskName string, worker WorkFunc, resultHandler ResultHandlerFunc, task.taskChan <- nil } - task.result = make([]interface{}, 0) + taskResult := make([]interface{}, 0) + var taskErr error for i := 0; i < parallelCount; i++ { result := <-task.subFinishChan if err2, ok := result.(error); ok { - task.result = nil - task.err = err2 + taskResult = nil + taskErr = err2 break - } else { + } else if result != nil { resultList := result.([]interface{}) - task.result = append(task.result, resultList...) + taskResult = append(taskResult, resultList...) } } - if task.GetStatus() != TaskStatusFailed { + + task.locker.Lock() + if taskErr != nil { // 如果有错误,肯定就是失败了 + task.Status = TaskStatusFailed + } else { if len(task.taskChan) > 0 { - task.err = ErrTaskIsCanceled - task.setStatus(TaskStatusCanceled) + taskErr = ErrTaskIsCanceled + task.Status = TaskStatusCanceled } else { - task.setStatus(TaskStatusFinished) + task.Status = TaskStatusFinished } } + task.err = taskErr + task.result = taskResult + task.TerminatedAt = time.Now() + + task.locker.Unlock() + + close(task.finishChan) + close(task.subFinishChan) + close(task.quitChan) + if resultHandler != nil { resultHandler(taskName, task.result, task.err) } @@ -167,6 +180,9 @@ func (t *Task) GetResult(duration time.Duration) (retVal []interface{}, err erro select { case <-t.finishChan: timer.Stop() + t.locker.RLock() + defer t.locker.RUnlock() + return t.result, t.err case <-timer.C: } @@ -227,10 +243,4 @@ func (t *Task) setStatus(status int) { defer t.locker.Unlock() t.Status = status - if status >= TaskStatusEndBegin { - t.TerminatedAt = time.Now() - close(t.finishChan) - close(t.subFinishChan) - close(t.quitChan) - } }