From ceb66bc48a876d0013ccea51ca8ef9a835bbb1b2 Mon Sep 17 00:00:00 2001 From: gazebo Date: Mon, 3 Dec 2018 16:19:51 +0800 Subject: [PATCH] - tasksch error show taks name - distinguish platform error --- business/jxstore/cms/sync.go | 8 ++++---- business/jxutils/tasksch/parallel_task.go | 2 +- business/jxutils/tasksch/sequence_task.go | 2 +- business/jxutils/tasksch/task.go | 14 ++++++++++---- business/jxutils/tasksch/task_test.go | 3 ++- business/partner/purchase/ebai/store_sku.go | 4 ++-- business/partner/purchase/jd/store_sku.go | 2 +- 7 files changed, 21 insertions(+), 14 deletions(-) diff --git a/business/jxstore/cms/sync.go b/business/jxstore/cms/sync.go index 0083782ae..f47956376 100644 --- a/business/jxstore/cms/sync.go +++ b/business/jxstore/cms/sync.go @@ -297,11 +297,11 @@ func (v *VendorSync) SyncStoresCategory(ctx *jxcontext.Context, db *dao.DaoDB, v // func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, skuIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) { globals.SugarLogger.Debug("SyncStoresSkus") - return v.LoopStoresMap(ctx, db, "SyncStoresSkus", isAsync, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { + return v.LoopStoresMap(ctx, db, "SyncStoresSkus顶层", isAsync, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { loopMapInfo := batchItemList[0].(*LoopStoreMapInfo) if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil { if len(loopMapInfo.StoreMapList) > 1 { - loopStoreTask := tasksch.NewSeqTask("SyncStoresSkus loop stores", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { + loopStoreTask := tasksch.NewSeqTask("SyncStoresSkus相同平台循环门店", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { storeID := loopMapInfo.StoreMapList[step].StoreID _, err = handler.SyncStoreSkus(ctx, task, storeID, skuIDs, false, isContinueWhenError) return nil, err @@ -318,11 +318,11 @@ func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendo func (v *VendorSync) FullSyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) { globals.SugarLogger.Debug("FullSyncStoresSkus") - return v.LoopStoresMap(ctx, db, "FullSyncStoresSkus", isAsync, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { + return v.LoopStoresMap(ctx, db, "FullSyncStoresSkus顶层", isAsync, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { loopMapInfo := batchItemList[0].(*LoopStoreMapInfo) if handler := v.GetSingleStoreHandler(loopMapInfo.VendorID); handler != nil { if len(loopMapInfo.StoreMapList) > 1 { - loopStoreTask := tasksch.NewSeqTask("FullSyncStoresSkus loop stores", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { + loopStoreTask := tasksch.NewSeqTask("FullSyncStoresSkus相同平台循环门店", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { storeID := loopMapInfo.StoreMapList[step].StoreID _, err = handler.FullSyncStoreSkus(ctx, task, storeID, false, isContinueWhenError) return nil, err diff --git a/business/jxutils/tasksch/parallel_task.go b/business/jxutils/tasksch/parallel_task.go index e263d88fd..71eff0da0 100644 --- a/business/jxutils/tasksch/parallel_task.go +++ b/business/jxutils/tasksch/parallel_task.go @@ -176,7 +176,7 @@ func (task *ParallelTask) Run() { } } if taskErr != nil { - task.Err = NewTaskError(taskErr) + task.Err = NewTaskError(task.Name, taskErr) } else { task.Err = task.buildTaskErrFromDetail() } diff --git a/business/jxutils/tasksch/sequence_task.go b/business/jxutils/tasksch/sequence_task.go index 45aefe921..ae8903394 100644 --- a/business/jxutils/tasksch/sequence_task.go +++ b/business/jxutils/tasksch/sequence_task.go @@ -65,7 +65,7 @@ func (task *SeqTask) Run() { } } if taskErr != nil { - task.Err = NewTaskError(taskErr) + task.Err = NewTaskError(task.Name, taskErr) } else { task.Err = task.buildTaskErrFromDetail() } diff --git a/business/jxutils/tasksch/task.go b/business/jxutils/tasksch/task.go index b6cc70afd..b60ab847f 100644 --- a/business/jxutils/tasksch/task.go +++ b/business/jxutils/tasksch/task.go @@ -56,16 +56,22 @@ type ITask interface { } type TaskError struct { - error + name string + errStr string } func (t *TaskError) MarshalJSON() ([]byte, error) { return json.Marshal(t.Error()) } -func NewTaskError(err error) *TaskError { +func (t *TaskError) Error() string { + return fmt.Sprintf("任务[%s]执行失败,错误详情:\n%s", t.name, t.errStr) +} + +func NewTaskError(name string, err error) *TaskError { return &TaskError{ - err, + name: name, + errStr: err.Error(), } } @@ -301,7 +307,7 @@ func (t *BaseTask) setStatus(status int) { func (t *BaseTask) buildTaskErrFromDetail() (err error) { if len(t.detailErrMsgList) > 0 { - return NewTaskError(fmt.Errorf("设置了错误继续标志,部分操作失败,总任务数:%d,失败数:%d,以下为详情:\n%s", t.TotalItemCount, t.FailedItemCount, strings.Join(t.detailErrMsgList, "\n"))) + return NewTaskError(t.Name, fmt.Errorf("设置了错误继续标志,部分操作失败,总任务数:%d,失败数:%d,以下为详情:\n%s", t.TotalItemCount, t.FailedItemCount, strings.Join(t.detailErrMsgList, "\n"))) } return nil } diff --git a/business/jxutils/tasksch/task_test.go b/business/jxutils/tasksch/task_test.go index 745c3ee9d..788716526 100644 --- a/business/jxutils/tasksch/task_test.go +++ b/business/jxutils/tasksch/task_test.go @@ -9,6 +9,7 @@ import ( ) func TestTaskError(t *testing.T) { - err := NewTaskError(errors.New("hello")) + err := NewTaskError("test", errors.New("hello")) fmt.Println(utils.Format4Output(err, false)) + fmt.Println(err.Error()) } diff --git a/business/partner/purchase/ebai/store_sku.go b/business/partner/purchase/ebai/store_sku.go index 6652396e9..351ed49ec 100644 --- a/business/partner/purchase/ebai/store_sku.go +++ b/business/partner/purchase/ebai/store_sku.go @@ -175,7 +175,7 @@ func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasks var storeSkuInfoList []*tStoreSkuFullInfo var num int64 strStoreID := utils.Int2Str(storeID) - rootTask := tasksch.NewSeqTask("SyncStoreSkus", userName, func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { + rootTask := tasksch.NewSeqTask("SyncStoreSkus饿百1", userName, func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { if step == 0 { db := dao.GetDB() for i := 0; i < 3; i++ { @@ -194,7 +194,7 @@ func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasks return nil, errors.New("不能创建商品所需的类别") } } else if step == 1 { - task := tasksch.NewParallelTask("SyncStoreSkus skus", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := tasksch.NewParallelTask("SyncStoreSkus饿百2", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { storeSku := batchItemList[0].(*tStoreSkuFullInfo) updateFields := []string{model.FieldEbaiSyncStatus} isCreate := false diff --git a/business/partner/purchase/jd/store_sku.go b/business/partner/purchase/jd/store_sku.go index 1f15a2941..83b3e3c6e 100644 --- a/business/partner/purchase/jd/store_sku.go +++ b/business/partner/purchase/jd/store_sku.go @@ -51,7 +51,7 @@ func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasks if err = dao.GetRows(db, &storeSkus, sql, append(sqlParams, sqlWhereParams...)...); err != nil { return "", err } - task := tasksch.NewParallelTask("SyncStoresSkus inner", tasksch.NewParallelConfig().SetBatchSize(jdapi.MaxStoreSkuBatchSize).SetIsContinueWhenError(isContinueWhenError), ctx.GetUserName(), func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { + task := tasksch.NewParallelTask("SyncStoresSkus京东", tasksch.NewParallelConfig().SetBatchSize(jdapi.MaxStoreSkuBatchSize).SetIsContinueWhenError(isContinueWhenError), ctx.GetUserName(), func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { var skuPriceInfoList []*jdapi.SkuPriceInfo var skuVendibilityList []*jdapi.StockVendibility var skuStockList []*jdapi.SkuStock