- 将同步任务的名字改为中文
This commit is contained in:
@@ -129,9 +129,10 @@ func (v *VendorSync) GetSingleStoreHandler(vendorID int) partner.ISingleStoreHan
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *VendorSync) syncCategories(ctx *jxcontext.Context, parentTask tasksch.ITask, multiStoresHandler partner.IMultipleStoresHandler, db *dao.DaoDB, cats []*model.SkuCategory, userName string) (err error) {
|
||||
syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()])
|
||||
task := tasksch.NewParallelTask("syncCategories", nil, ctx,
|
||||
func (v *VendorSync) syncCategories(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID int, db *dao.DaoDB, cats []*model.SkuCategory, userName string) (err error) {
|
||||
multiStoresHandler := v.GetMultiStoreHandler(vendorID)
|
||||
syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[vendorID])
|
||||
task := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[vendorID]), nil, ctx,
|
||||
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||||
cat := batchItemList[0].(*model.SkuCategory)
|
||||
updateFields := []string{syncStatusFieldName}
|
||||
@@ -162,8 +163,9 @@ func (v *VendorSync) syncCategories(ctx *jxcontext.Context, parentTask tasksch.I
|
||||
|
||||
func (v *VendorSync) SyncCategory(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
|
||||
globals.SugarLogger.Debug(v.MultiStoreVendorIDs)
|
||||
hint, err = v.LoopMultiStoresVendors(ctx, db, "SyncCategory", isAsync, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||||
multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int))
|
||||
hint, err = v.LoopMultiStoresVendors(ctx, db, "同步分类信息", isAsync, userName,
|
||||
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||||
vendorID := batchItemList[0].(int)
|
||||
var cats []*model.SkuCategory
|
||||
cond := make(map[string]interface{})
|
||||
if categoryID > 0 {
|
||||
@@ -173,7 +175,7 @@ func (v *VendorSync) SyncCategory(ctx *jxcontext.Context, db *dao.DaoDB, categor
|
||||
}
|
||||
err := dao.GetEntitiesByKV(db, &cats, cond, true)
|
||||
if err == nil {
|
||||
err = v.syncCategories(ctx, t, multiStoresHandler, db, cats, userName)
|
||||
err = v.syncCategories(ctx, t, vendorID, db, cats, userName)
|
||||
}
|
||||
if err != nil || categoryID > 0 {
|
||||
return nil, err
|
||||
@@ -181,7 +183,7 @@ func (v *VendorSync) SyncCategory(ctx *jxcontext.Context, db *dao.DaoDB, categor
|
||||
cond[model.FieldLevel] = 2
|
||||
err = dao.GetEntitiesByKV(db, &cats, cond, true)
|
||||
if err == nil {
|
||||
err = v.syncCategories(ctx, t, multiStoresHandler, db, cats, userName)
|
||||
err = v.syncCategories(ctx, t, vendorID, db, cats, userName)
|
||||
}
|
||||
return nil, err
|
||||
})
|
||||
@@ -211,11 +213,11 @@ func (v *VendorSync) SyncStore(ctx *jxcontext.Context, db *dao.DaoDB, vendorID,
|
||||
vendorID,
|
||||
}
|
||||
}
|
||||
hint, err = v.LoopStoresMap(ctx, db, "SyncStore", isAsync, false, vendorIDs, []int{storeID}, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||||
hint, err = v.LoopStoresMap(ctx, db, "同步门店信息", isAsync, false, vendorIDs, []int{storeID}, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||||
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
|
||||
handler := v.GetStoreHandler(loopMapInfo.VendorID)
|
||||
if len(loopMapInfo.StoreMapList) > 1 {
|
||||
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("SyncStore loop store %s", model.VendorChineseNames[loopMapInfo.VendorID]), nil, ctx,
|
||||
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), nil, ctx,
|
||||
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||||
storeMap := batchItemList[0].(*model.StoreMap)
|
||||
if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil {
|
||||
@@ -240,8 +242,10 @@ func (v *VendorSync) SyncStore(ctx *jxcontext.Context, db *dao.DaoDB, vendorID,
|
||||
|
||||
func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuID int, isAsync, isContinueWhenError bool, userName string) (hint string, err error) {
|
||||
globals.SugarLogger.Debugf("SyncSku trackInfo:%s, nameID:%d, skuID:%d, userName:%s", ctx.GetTrackInfo(), nameID, skuID, userName)
|
||||
return v.LoopMultiStoresVendors(ctx, db, "SyncSku", isAsync, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||||
multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int))
|
||||
return v.LoopMultiStoresVendors(ctx, db, "同步商品信息", isAsync, userName,
|
||||
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||||
vendorID := batchItemList[0].(int)
|
||||
multiStoresHandler := v.GetMultiStoreHandler(vendorID)
|
||||
syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()])
|
||||
dbField := dao.ConvertDBFieldPrefix(model.VendorNames[multiStoresHandler.GetVendorID()])
|
||||
sql := fmt.Sprintf(`
|
||||
@@ -266,7 +270,7 @@ func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuI
|
||||
err := dao.GetRows(db, &skuNameList, sql, sqlParams...)
|
||||
if err == nil && len(skuNameList) > 0 {
|
||||
// todo 同一skuName下的sku顺序处理的原因是京东SPU特殊类型必须要序列化同步才能正常处理, db可能会有多线程问题
|
||||
task := tasksch.NewParallelTask("SyncSku loop sku", tasksch.NewParallelConfig().SetParallelCount(10).SetIsContinueWhenError(isContinueWhenError), ctx,
|
||||
task := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[vendorID]), tasksch.NewParallelConfig().SetParallelCount(10).SetIsContinueWhenError(isContinueWhenError), ctx,
|
||||
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||||
skuName := batchItemList[0].(*model.SkuName)
|
||||
var skuList []*model.Sku
|
||||
@@ -325,11 +329,12 @@ func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuI
|
||||
func (v *VendorSync) SyncStoresCategory(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync bool) (hint string, err error) {
|
||||
globals.SugarLogger.Debug("SyncStoresCategory")
|
||||
isManageIt := len(storeIDs) != 1
|
||||
return v.LoopStoresMap(ctx, db, "SyncStoresCategory", isAsync, isManageIt, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||||
return v.LoopStoresMap(ctx, db, "同步门店分类信息", isAsync, isManageIt, vendorIDs, storeIDs,
|
||||
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||||
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
|
||||
if handler := v.GetSingleStoreHandler(loopMapInfo.VendorID); handler != nil {
|
||||
if len(loopMapInfo.StoreMapList) > 1 {
|
||||
loopStoreTask := tasksch.NewSeqTask("SyncStoresCategory loop stores", ctx,
|
||||
loopStoreTask := tasksch.NewSeqTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), ctx,
|
||||
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||||
storeID := loopMapInfo.StoreMapList[step].StoreID
|
||||
_, err = handler.SyncStoreCategory(ctx, task, storeID, false)
|
||||
@@ -349,11 +354,12 @@ func (v *VendorSync) SyncStoresCategory(ctx *jxcontext.Context, db *dao.DaoDB, v
|
||||
func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, skuIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
|
||||
globals.SugarLogger.Debug("SyncStoresSkus")
|
||||
isManageIt := len(storeIDs) != 1 || len(skuIDs) == 0 || len(skuIDs) > 8
|
||||
return v.LoopStoresMap(ctx, db, "SyncStoresSkus顶层", isAsync, isManageIt, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||||
return v.LoopStoresMap(ctx, db, "同步门店商品信息", isAsync, isManageIt, vendorIDs, storeIDs,
|
||||
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||||
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
|
||||
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
|
||||
if len(loopMapInfo.StoreMapList) > 1 {
|
||||
loopStoreTask := tasksch.NewSeqTask("SyncStoresSkus相同平台循环门店", ctx,
|
||||
loopStoreTask := tasksch.NewSeqTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), ctx,
|
||||
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||||
storeID := loopMapInfo.StoreMapList[step].StoreID
|
||||
if _, err = handler.SyncStoreSkus(ctx, task, storeID, skuIDs, false, isContinueWhenError); err != nil {
|
||||
@@ -376,11 +382,12 @@ func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendo
|
||||
|
||||
func (v *VendorSync) FullSyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
|
||||
globals.SugarLogger.Debug("FullSyncStoresSkus")
|
||||
return v.LoopStoresMap(ctx, db, "FullSyncStoresSkus顶层", isAsync, true, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||||
return v.LoopStoresMap(ctx, db, "初始化门店商品信息", isAsync, true, vendorIDs, storeIDs,
|
||||
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||||
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
|
||||
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
|
||||
if len(loopMapInfo.StoreMapList) > 1 {
|
||||
loopStoreTask := tasksch.NewSeqTask("FullSyncStoresSkus相同平台循环门店", ctx,
|
||||
loopStoreTask := tasksch.NewSeqTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), ctx,
|
||||
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||||
storeID := loopMapInfo.StoreMapList[step].StoreID
|
||||
_, err = handler.FullSyncStoreSkus(ctx, task, storeID, false, isContinueWhenError)
|
||||
@@ -398,11 +405,12 @@ func (v *VendorSync) FullSyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, v
|
||||
|
||||
func (v *VendorSync) DeleteRemoteStoreSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
|
||||
globals.SugarLogger.Debug("DeleteRemoteStoreSkus")
|
||||
return v.LoopStoresMap(ctx, db, "DeleteRemoteStoreSkus顶层", isAsync, true, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||||
return v.LoopStoresMap(ctx, db, "删除远程门店商品信息", isAsync, true, vendorIDs, storeIDs,
|
||||
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||||
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
|
||||
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
|
||||
if len(loopMapInfo.StoreMapList) > 1 {
|
||||
loopStoreTask := tasksch.NewSeqTask("DeleteRemoteStoreSkus相同平台循环门店", ctx,
|
||||
loopStoreTask := tasksch.NewSeqTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), ctx,
|
||||
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||||
storeID := loopMapInfo.StoreMapList[step].StoreID
|
||||
_, err = handler.DeleteRemoteStoreSkus(ctx, task, storeID, false, isContinueWhenError)
|
||||
@@ -457,6 +465,9 @@ func (v *VendorSync) LoopStoresMap(ctx *jxcontext.Context, db *dao.DaoDB, taskNa
|
||||
}
|
||||
index++
|
||||
}
|
||||
if len(loopInfoList) == 1 {
|
||||
taskName = fmt.Sprintf("%s,处理平台%s", taskName, model.VendorChineseNames[loopInfoList[0].VendorID])
|
||||
}
|
||||
task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, loopInfoList)
|
||||
tasksch.HandleTask(task, nil, isManageIt).Run()
|
||||
if !isAsync {
|
||||
|
||||
Reference in New Issue
Block a user