Merge remote-tracking branch 'origin/mark' into don

This commit is contained in:
Rosy-zhudan
2019-08-07 09:06:50 +08:00
39 changed files with 650 additions and 165 deletions

View File

@@ -57,11 +57,7 @@ func PrintOrderByOrder(ctx *jxcontext.Context, order *model.GoodsOrder) (printRe
PrintResult: partner.PrintResultNoPrinter,
}, nil
}
if globals.EnableStoreWrite {
printResult, err = handler.PrintOrder(ctx, store, order)
} else {
err = fmt.Errorf("当前环境不支持打印")
}
printResult, err = handler.PrintOrder(ctx, store, order)
if err == nil {
dao.SetOrderPrintFlag(db, ctx.GetUserName(), order.VendorOrderID, order.VendorID, true)
}

View File

@@ -15,6 +15,7 @@ const (
)
type WorkFunc func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error)
type WorkFunc2 func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, successCount int, err error)
type ResultHandlerFunc func(taskName string, result []interface{}, err error)
type ParallelConfig struct {
@@ -28,7 +29,7 @@ type ParallelConfig struct {
type ParallelTask struct {
BaseTask
worker WorkFunc
worker WorkFunc2
jobList [][]interface{}
taskChan chan []interface{}
subFinishChan chan interface{}
@@ -69,7 +70,7 @@ func (c *ParallelConfig) SetIsContinueWhenError(isContinueWhenError bool) *Paral
// return c
// }
func NewParallelTask(taskName string, config *ParallelConfig, ctx *jxcontext.Context, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask {
func NewParallelTask2(taskName string, config *ParallelConfig, ctx *jxcontext.Context, worker WorkFunc2, itemList interface{}, params ...interface{}) *ParallelTask {
if config == nil {
config = NewParallelConfig()
}
@@ -95,6 +96,14 @@ func NewParallelTask(taskName string, config *ParallelConfig, ctx *jxcontext.Con
return task
}
func NewParallelTask(taskName string, config *ParallelConfig, ctx *jxcontext.Context, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask {
return NewParallelTask2(taskName, config, ctx,
func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, successCount int, err error) {
retVal, err = worker(task, batchItemList, params...)
return retVal, 0, err
}, itemList, params...)
}
func (task *ParallelTask) Run() {
task.run(func() {
globals.SugarLogger.Debugf("ParallelTask.Run %s", task.Name)
@@ -113,11 +122,11 @@ func (task *ParallelTask) Run() {
chanRetVal = retVal
goto end
} else {
result, err := task.callWorker(func() (retVal interface{}, err error) {
result, successCount, err := task.callWorker2(func() (retVal interface{}, successCount int, err error) {
return task.worker(task, job, task.params...)
})
// globals.SugarLogger.Debugf("ParallelTask.Run %s, after call worker result:%v, err:%v", task.Name, result, err)
task.finishedOneJob(len(job), err)
task.finishedOneJob(len(job), successCount, err)
if err != nil { // 出错
// globals.SugarLogger.Infof("ParallelTask.Run %s, subtask(job:%s, params:%s) result:%v, failed with error:%v", task.Name, utils.Format4Output(job, true), utils.Format4Output(task.params, true), result, err)
if task.IsContinueWhenError {

View File

@@ -40,7 +40,7 @@ func (task *SeqTask) Run() {
result, err := task.callWorker(func() (retVal interface{}, err error) {
return task.worker(task, i, task.params...)
})
task.finishedOneJob(1, err)
task.finishedOneJob(1, 0, err)
if err != nil {
// globals.SugarLogger.Infof("SeqTask.Run %s step:%d failed with error:%v", task.Name, i, err)
if task.IsContinueWhenError {

View File

@@ -434,7 +434,8 @@ func (t *BaseTask) run(taskHandler func()) {
}
}
func (t *BaseTask) finishedOneJob(itemCount int, err error) {
// successCount表示在返回错误的情况下部分成功的个数如果没有返回错误则successCount无意义
func (t *BaseTask) finishedOneJob(itemCount, successCount int, err error) {
t.locker.Lock()
defer t.locker.Unlock()
@@ -443,7 +444,8 @@ func (t *BaseTask) finishedOneJob(itemCount int, err error) {
t.FinishedItemCount += itemCount
t.FinishedJobCount++
} else {
t.FailedItemCount += itemCount
t.FinishedItemCount += successCount
t.FailedItemCount += itemCount - successCount
t.FailedJobCount++
}
}
@@ -473,6 +475,15 @@ func (task *BaseTask) callWorker(worker func() (retVal interface{}, err error))
err = fmt.Errorf("panic, r:%v", r)
}
}()
retVal, err = worker()
return retVal, err
return worker()
}
func (task *BaseTask) callWorker2(worker func() (retVal interface{}, successCount int, err error)) (retVal interface{}, successCount int, err error) {
defer func() {
if r := recover(); r != nil {
globals.SugarLogger.Errorf("callWorker panic:%v", r)
err = fmt.Errorf("panic, r:%v", r)
}
}()
return worker()
}