Merge remote-tracking branch 'origin/mark' into don

This commit is contained in:
Rosy-zhudan
2019-08-29 14:49:09 +08:00
10 changed files with 53 additions and 23 deletions

View File

@@ -24,6 +24,6 @@ func (m *SyncMapWithTimeout) Delete(key interface{}) {
if value, ok := m.timers.Load(key); ok {
timer := value.(*time.Timer)
timer.Stop()
m.timers.Delete(key)
}
m.timers.Delete(key)
}

View File

@@ -179,6 +179,7 @@ func (task *ParallelTask) Run() {
close(task.subFinishChan)
task.jobList = nil // 如果不释放,任务被管理的话,会导致内存不能释放
task.worker = nil
})
}

View File

@@ -59,6 +59,8 @@ func (task *SeqTask) Run() {
task.Result = taskResult
task.mainErr = taskErr
task.locker.Unlock()
task.worker = nil
})
}

View File

@@ -169,6 +169,7 @@ func (t *BaseTask) GetID() string {
return t.ID
}
// 此函数成功返回结果后结果在任务中会被删除以免被管理的任务不必要的HOLD住对象
func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err error) {
if t.GetStatus() >= TaskStatusEndBegin {
return t.getResult(), t.GetErr()
@@ -279,10 +280,12 @@ func (t *BaseTask) GetNoticeMsg() string {
return t.NoticeMsg
}
func (t *BaseTask) getResult() []interface{} {
t.locker.RLock()
defer t.locker.RUnlock()
return t.Result
func (t *BaseTask) getResult() (result []interface{}) {
t.locker.Lock()
defer t.locker.Unlock()
result = t.Result
t.Result = nil
return result
}
func (t *BaseTask) AddBatchErr(err error) {