- tasksch error show taks name
- distinguish platform error
This commit is contained in:
@@ -297,11 +297,11 @@ func (v *VendorSync) SyncStoresCategory(ctx *jxcontext.Context, db *dao.DaoDB, v
|
||||
//
|
||||
func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, skuIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
|
||||
globals.SugarLogger.Debug("SyncStoresSkus")
|
||||
return v.LoopStoresMap(ctx, db, "SyncStoresSkus", isAsync, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||||
return v.LoopStoresMap(ctx, db, "SyncStoresSkus顶层", isAsync, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||||
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
|
||||
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
|
||||
if len(loopMapInfo.StoreMapList) > 1 {
|
||||
loopStoreTask := tasksch.NewSeqTask("SyncStoresSkus loop stores", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||||
loopStoreTask := tasksch.NewSeqTask("SyncStoresSkus相同平台循环门店", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||||
storeID := loopMapInfo.StoreMapList[step].StoreID
|
||||
_, err = handler.SyncStoreSkus(ctx, task, storeID, skuIDs, false, isContinueWhenError)
|
||||
return nil, err
|
||||
@@ -318,11 +318,11 @@ func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendo
|
||||
|
||||
func (v *VendorSync) FullSyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
|
||||
globals.SugarLogger.Debug("FullSyncStoresSkus")
|
||||
return v.LoopStoresMap(ctx, db, "FullSyncStoresSkus", isAsync, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||||
return v.LoopStoresMap(ctx, db, "FullSyncStoresSkus顶层", isAsync, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||||
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
|
||||
if handler := v.GetSingleStoreHandler(loopMapInfo.VendorID); handler != nil {
|
||||
if len(loopMapInfo.StoreMapList) > 1 {
|
||||
loopStoreTask := tasksch.NewSeqTask("FullSyncStoresSkus loop stores", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||||
loopStoreTask := tasksch.NewSeqTask("FullSyncStoresSkus相同平台循环门店", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||||
storeID := loopMapInfo.StoreMapList[step].StoreID
|
||||
_, err = handler.FullSyncStoreSkus(ctx, task, storeID, false, isContinueWhenError)
|
||||
return nil, err
|
||||
|
||||
@@ -176,7 +176,7 @@ func (task *ParallelTask) Run() {
|
||||
}
|
||||
}
|
||||
if taskErr != nil {
|
||||
task.Err = NewTaskError(taskErr)
|
||||
task.Err = NewTaskError(task.Name, taskErr)
|
||||
} else {
|
||||
task.Err = task.buildTaskErrFromDetail()
|
||||
}
|
||||
|
||||
@@ -65,7 +65,7 @@ func (task *SeqTask) Run() {
|
||||
}
|
||||
}
|
||||
if taskErr != nil {
|
||||
task.Err = NewTaskError(taskErr)
|
||||
task.Err = NewTaskError(task.Name, taskErr)
|
||||
} else {
|
||||
task.Err = task.buildTaskErrFromDetail()
|
||||
}
|
||||
|
||||
@@ -56,16 +56,22 @@ type ITask interface {
|
||||
}
|
||||
|
||||
type TaskError struct {
|
||||
error
|
||||
name string
|
||||
errStr string
|
||||
}
|
||||
|
||||
func (t *TaskError) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal(t.Error())
|
||||
}
|
||||
|
||||
func NewTaskError(err error) *TaskError {
|
||||
func (t *TaskError) Error() string {
|
||||
return fmt.Sprintf("任务[%s]执行失败,错误详情:\n%s", t.name, t.errStr)
|
||||
}
|
||||
|
||||
func NewTaskError(name string, err error) *TaskError {
|
||||
return &TaskError{
|
||||
err,
|
||||
name: name,
|
||||
errStr: err.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -301,7 +307,7 @@ func (t *BaseTask) setStatus(status int) {
|
||||
|
||||
func (t *BaseTask) buildTaskErrFromDetail() (err error) {
|
||||
if len(t.detailErrMsgList) > 0 {
|
||||
return NewTaskError(fmt.Errorf("设置了错误继续标志,部分操作失败,总任务数:%d,失败数:%d,以下为详情:\n%s", t.TotalItemCount, t.FailedItemCount, strings.Join(t.detailErrMsgList, "\n")))
|
||||
return NewTaskError(t.Name, fmt.Errorf("设置了错误继续标志,部分操作失败,总任务数:%d,失败数:%d,以下为详情:\n%s", t.TotalItemCount, t.FailedItemCount, strings.Join(t.detailErrMsgList, "\n")))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
)
|
||||
|
||||
func TestTaskError(t *testing.T) {
|
||||
err := NewTaskError(errors.New("hello"))
|
||||
err := NewTaskError("test", errors.New("hello"))
|
||||
fmt.Println(utils.Format4Output(err, false))
|
||||
fmt.Println(err.Error())
|
||||
}
|
||||
|
||||
@@ -175,7 +175,7 @@ func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasks
|
||||
var storeSkuInfoList []*tStoreSkuFullInfo
|
||||
var num int64
|
||||
strStoreID := utils.Int2Str(storeID)
|
||||
rootTask := tasksch.NewSeqTask("SyncStoreSkus", userName, func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||||
rootTask := tasksch.NewSeqTask("SyncStoreSkus饿百1", userName, func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||||
if step == 0 {
|
||||
db := dao.GetDB()
|
||||
for i := 0; i < 3; i++ {
|
||||
@@ -194,7 +194,7 @@ func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasks
|
||||
return nil, errors.New("不能创建商品所需的类别")
|
||||
}
|
||||
} else if step == 1 {
|
||||
task := tasksch.NewParallelTask("SyncStoreSkus skus", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||||
task := tasksch.NewParallelTask("SyncStoreSkus饿百2", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||||
storeSku := batchItemList[0].(*tStoreSkuFullInfo)
|
||||
updateFields := []string{model.FieldEbaiSyncStatus}
|
||||
isCreate := false
|
||||
|
||||
@@ -51,7 +51,7 @@ func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasks
|
||||
if err = dao.GetRows(db, &storeSkus, sql, append(sqlParams, sqlWhereParams...)...); err != nil {
|
||||
return "", err
|
||||
}
|
||||
task := tasksch.NewParallelTask("SyncStoresSkus inner", tasksch.NewParallelConfig().SetBatchSize(jdapi.MaxStoreSkuBatchSize).SetIsContinueWhenError(isContinueWhenError), ctx.GetUserName(), func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||||
task := tasksch.NewParallelTask("SyncStoresSkus京东", tasksch.NewParallelConfig().SetBatchSize(jdapi.MaxStoreSkuBatchSize).SetIsContinueWhenError(isContinueWhenError), ctx.GetUserName(), func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||||
var skuPriceInfoList []*jdapi.SkuPriceInfo
|
||||
var skuVendibilityList []*jdapi.StockVendibility
|
||||
var skuStockList []*jdapi.SkuStock
|
||||
|
||||
Reference in New Issue
Block a user