- 尽量让异步任务的hint返回满足约定("", "0", "number", "taskID")

This commit is contained in:
gazebo
2019-05-07 14:19:51 +08:00
parent 45e17d4935
commit 98332eb6a1

View File

@@ -213,22 +213,23 @@ func (v *VendorSync) SyncStore(ctx *jxcontext.Context, db *dao.DaoDB, vendorID,
vendorID,
}
}
hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("同步门店信息:%d", storeID), isAsync, false, vendorIDs, []int{storeID}, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
return v.LoopStoresMap(ctx, db, fmt.Sprintf("同步门店信息:%d", storeID), isAsync, false, vendorIDs, []int{storeID}, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
handler := v.GetStoreHandler(loopMapInfo.VendorID)
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), nil, ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
var resultList []interface{}
storeMap := batchItemList[0].(*model.StoreMap)
if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil {
storeMap.SyncStatus = 0
_, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus)
resultList = append(resultList, 1)
}
return nil, err
return resultList, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
return nil, err
return loopStoreTask.GetResult(0)
}
storeMap := loopMapInfo.StoreMapList[0]
if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil {
@@ -236,9 +237,8 @@ func (v *VendorSync) SyncStore(ctx *jxcontext.Context, db *dao.DaoDB, vendorID,
_, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus)
}
err = jxutils.AddVendorInfo2Err(err, loopMapInfo.VendorID)
return nil, err
return []interface{}{1}, err
})
return hint, err
}
func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuID int, isAsync, isContinueWhenError bool, userName string) (hint string, err error) {
@@ -259,6 +259,7 @@ func (v *VendorSync) SyncSkus(ctx *jxcontext.Context, db *dao.DaoDB, nameIDs []i
globals.SugarLogger.Debugf("SyncSku trackInfo:%s, nameIDs:%v, skuIDs:%v, userName:%s", ctx.GetTrackInfo(), nameIDs, skuIDs, userName)
return v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("同步商品信息, nameIDs:%v, skuIDs:%v", nameIDs, skuIDs), isAsync, userName,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
var resultList []interface{}
vendorID := batchItemList[0].(int)
multiStoresHandler := v.GetMultiStoreHandler(vendorID)
syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()])
@@ -297,6 +298,7 @@ func (v *VendorSync) SyncSkus(ctx *jxcontext.Context, db *dao.DaoDB, nameIDs []i
// todo 同一skuName下的sku顺序处理的原因是京东SPU特殊类型必须要序列化同步才能正常处理, db可能会有多线程问题
task := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[vendorID]), tasksch.NewParallelConfig().SetParallelCount(10).SetIsContinueWhenError(isContinueWhenError), ctx,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
var resultList []interface{}
skuName := batchItemList[0].(*model.SkuName)
var skuList []*model.Sku
if err = dao.GetRows(db, &skuList, fmt.Sprintf(`
@@ -333,6 +335,8 @@ func (v *VendorSync) SyncSkus(ctx *jxcontext.Context, db *dao.DaoDB, nameIDs []i
refutil.SetObjFieldByName(sku, syncStatusFieldName, int8(0))
if _, err = dao.UpdateEntity(db, sku, updateFields...); err != nil {
break
} else {
resultList = append(resultList, 1)
}
}
}
@@ -342,12 +346,15 @@ func (v *VendorSync) SyncSkus(ctx *jxcontext.Context, db *dao.DaoDB, nameIDs []i
refutil.SetObjFieldByName(skuName, syncStatusFieldName, int8(0))
_, err = dao.UpdateEntity(db, skuName, syncStatusFieldName)
}
return nil, err
return resultList, err
}, skuNameList)
t.AddChild(task).Run()
_, err = task.GetResult(0)
result, err2 := task.GetResult(0)
if err = err2; err == nil {
resultList = append(resultList, result...)
}
}
return nil, err
return resultList, err
})
}
@@ -506,18 +513,27 @@ func (v *VendorSync) LoopStoresMap(ctx *jxcontext.Context, db *dao.DaoDB, taskNa
task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, loopInfoList)
tasksch.HandleTask(task, nil, isManageIt).Run()
if !isAsync {
_, err = task.GetResult(0)
if _, err = task.GetResult(0); err == nil {
hint = "1" // todo 暂时这样
}
} else {
hint = task.ID
}
return task.ID, makeSyncError(err)
return hint, makeSyncError(err)
}
func (v *VendorSync) LoopMultiStoresVendors(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, userName string, handler tasksch.WorkFunc) (hint string, err error) {
task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, v.MultiStoreVendorIDs)
tasksch.HandleTask(task, nil, true).Run()
if !isAsync {
_, err = task.GetResult(0)
result, err2 := task.GetResult(0)
if err = err2; err == nil {
hint = utils.Int2Str(len(result))
}
} else {
hint = task.ID
}
return task.ID, makeSyncError(err)
return hint, makeSyncError(err)
}
func (v *VendorSync) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int, storeIDs []int) (hint string, err error) {