diff --git a/business/jxutils/dtask/dtask.go b/business/jxutils/dtask/dtask.go index 91d1e1122..d7e6e63f1 100644 --- a/business/jxutils/dtask/dtask.go +++ b/business/jxutils/dtask/dtask.go @@ -107,7 +107,7 @@ func (m *DurableTaskMan) StartTask(taskID string) error { d := m.tasks[taskID] if d.cmdChan == nil { d.cmdChan = make(chan string) - go func() { + utils.CallFuncAsync(func() { failedItemCount := 0 for _, taskItem := range d.items { if taskItem.Status == 0 { @@ -137,7 +137,7 @@ func (m *DurableTaskMan) StartTask(taskID string) error { d.data.UpdatedAt = d.data.FinishedAt dao.UpdateEntity(nil, d.data, "Status", "FinishedAt", "UpdatedAt") } - }() + }) return nil } return errors.New("任务已经启动") diff --git a/business/jxutils/tasksch/parallel_task.go b/business/jxutils/tasksch/parallel_task.go index c59055fe3..ed6e6d3c5 100644 --- a/business/jxutils/tasksch/parallel_task.go +++ b/business/jxutils/tasksch/parallel_task.go @@ -116,7 +116,7 @@ func (task *ParallelTask) Run() { task.run(func() { globals.SugarLogger.Debugf("ParallelTask.Run %s", task.Name) for i := 0; i < task.ParallelCount; i++ { - go func() { + utils.CallFuncAsync(func() { var chanRetVal interface{} retVal := make([]interface{}, 0) for { @@ -152,7 +152,7 @@ func (task *ParallelTask) Run() { task.subFinishChan <- chanRetVal } task.locker.RUnlock() - }() + }) } for _, job := range task.jobList { task.taskChan <- job diff --git a/business/jxutils/tasksch/task.go b/business/jxutils/tasksch/task.go index 8a65b59b4..177401d32 100644 --- a/business/jxutils/tasksch/task.go +++ b/business/jxutils/tasksch/task.go @@ -209,14 +209,20 @@ func (t *BaseTask) MarshalJSON() ([]byte, error) { } func (t *BaseTask) run(taskHandler func()) { - go func() { + utils.CallFuncAsync(func() { + defer func() { + if r := recover(); r != nil { + globals.SugarLogger.Errorf("panic in BaseTask.run task:%s, task detail:%s", t.Name, utils.Format4Output(t, false)) + } + }() + taskHandler() for _, subTask := range t.Children { if _, err := subTask.GetResult(0); err != nil { globals.SugarLogger.Warnf("BaseTask run, failed with error:%v", err) } } - }() + }) } func (t *BaseTask) finishedOneJob(itemCount int, err error) { diff --git a/legacy/tasks/configrefresh.go b/legacy/tasks/configrefresh.go index e335f929d..f555aaa8b 100644 --- a/legacy/tasks/configrefresh.go +++ b/legacy/tasks/configrefresh.go @@ -79,12 +79,12 @@ func RefreshConfig(configKey string, expiresTime time.Duration, configGetter fun token, err := refreshFunc() // 这样写的目的是强制第一次调用时要刷新一次 if err == nil { configSetter(token) - go func() { + utils.CallFuncAsync(func() { for { time.Sleep(sleepGap) refreshFunc() } - }() + }) } return err }