- ren tasksch.Task to ParallelTask.

This commit is contained in:
gazebo
2018-10-20 09:35:17 +08:00
parent 66ef068fc3
commit c00336905a
2 changed files with 8 additions and 8 deletions

View File

@@ -24,7 +24,7 @@ type ParallelConfig struct {
ResultHandler ResultHandlerFunc ResultHandler ResultHandlerFunc
} }
type Task struct { type ParallelTask struct {
BaseTask BaseTask
resultHandler ResultHandlerFunc resultHandler ResultHandlerFunc
@@ -68,7 +68,7 @@ func (c *ParallelConfig) SetResultHandler(resultHandler ResultHandlerFunc) *Para
return c return c
} }
func NewParallelTask(taskName string, userName string, config *ParallelConfig, worker WorkFunc, itemList interface{}, params ...interface{}) *Task { func NewParallelTask(taskName string, userName string, config *ParallelConfig, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask {
if config == nil { if config == nil {
config = NewParallelConfig() config = NewParallelConfig()
} }
@@ -84,7 +84,7 @@ func NewParallelTask(taskName string, userName string, config *ParallelConfig, w
if config.ParallelCount > jobListLen { if config.ParallelCount > jobListLen {
config.ParallelCount = jobListLen config.ParallelCount = jobListLen
} }
task := &Task{ task := &ParallelTask{
subFinishChan: make(chan interface{}, config.ParallelCount), subFinishChan: make(chan interface{}, config.ParallelCount),
taskChan: make(chan []interface{}, len(realItemList)+config.ParallelCount), // 确保能装下所有taskitem加结束标记 taskChan: make(chan []interface{}, len(realItemList)+config.ParallelCount), // 确保能装下所有taskitem加结束标记
resultHandler: config.ResultHandler, resultHandler: config.ResultHandler,
@@ -95,13 +95,13 @@ func NewParallelTask(taskName string, userName string, config *ParallelConfig, w
return task return task
} }
func RunParallelTask(taskName string, userName string, config *ParallelConfig, worker WorkFunc, itemList interface{}, params ...interface{}) *Task { func RunParallelTask(taskName string, userName string, config *ParallelConfig, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask {
task := NewParallelTask(taskName, userName, config, worker, itemList, params...) task := NewParallelTask(taskName, userName, config, worker, itemList, params...)
task.Run() task.Run()
return task return task
} }
func RunTask(taskName string, isContinueWhenError bool, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *Task { func RunTask(taskName string, isContinueWhenError bool, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask {
config := NewParallelConfig() config := NewParallelConfig()
config.BatchSize = batchSize config.BatchSize = batchSize
config.IsContinueWhenError = isContinueWhenError config.IsContinueWhenError = isContinueWhenError
@@ -112,7 +112,7 @@ func RunTask(taskName string, isContinueWhenError bool, resultHandler ResultHand
return task return task
} }
func (task *Task) Run() { func (task *ParallelTask) Run() {
task.run(func() { task.run(func() {
globals.SugarLogger.Debugf("ParallelTask.Run %s", task.Name) globals.SugarLogger.Debugf("ParallelTask.Run %s", task.Name)
for i := 0; i < task.ParallelCount; i++ { for i := 0; i < task.ParallelCount; i++ {

View File

@@ -18,7 +18,7 @@ func init() {
defTaskMan.taskList = make(map[string]ITask) defTaskMan.taskList = make(map[string]ITask)
} }
func (m *TaskMan) RunTask(taskName string, isContinueWhenError bool, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *Task { func (m *TaskMan) RunTask(taskName string, isContinueWhenError bool, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask {
task := RunTask(taskName, isContinueWhenError, resultHandler, parallelCount, batchSize, userName, worker, itemList, params...) task := RunTask(taskName, isContinueWhenError, resultHandler, parallelCount, batchSize, userName, worker, itemList, params...)
m.taskList[task.ID] = task m.taskList[task.ID] = task
return task return task
@@ -39,7 +39,7 @@ func (m *TaskMan) GetTasks(taskID string, fromStatus, toStatus int, lastHours in
return taskList return taskList
} }
func RunManagedTask(taskName string, isContinueWhenError bool, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *Task { func RunManagedTask(taskName string, isContinueWhenError bool, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask {
return defTaskMan.RunTask(taskName, isContinueWhenError, resultHandler, parallelCount, batchSize, userName, worker, itemList, params...) return defTaskMan.RunTask(taskName, isContinueWhenError, resultHandler, parallelCount, batchSize, userName, worker, itemList, params...)
} }