diff --git a/business/jxstore/cms/sync_store_sku.go b/business/jxstore/cms/sync_store_sku.go index f987a76eb..225407115 100644 --- a/business/jxstore/cms/sync_store_sku.go +++ b/business/jxstore/cms/sync_store_sku.go @@ -3,8 +3,11 @@ package cms import ( "fmt" + "git.rosy.net.cn/jx-callback/business/jxutils" + "git.rosy.net.cn/jx-callback/business/partner/putils" "git.rosy.net.cn/jx-callback/globals" + "git.rosy.net.cn/jx-callback/globals/refutil" "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" "git.rosy.net.cn/jx-callback/business/jxutils/tasksch" @@ -13,10 +16,337 @@ import ( "git.rosy.net.cn/jx-callback/business/partner" ) -func SyncStoreSkuNew(ctx *jxcontext.Context, vendorID, storeID int, vendorStoreID string, nameIDs, skuIDs []int) (hint string, err error) { +func CreateStoreCategoryByStoreSku(ctx *jxcontext.Context, vendorID, storeID int, vendorStoreID string, nameIDs, skuIDs []int) (err error) { + db := dao.GetDB() + dao.Begin(db) + defer func() { + if r := recover(); r != nil || err != nil { + dao.Rollback(db) + if r != nil { + panic(r) + } + } + }() + for i := 0; i < 2; i++ { + localCats, err2 := dao.GetSkusCategories(db, model.VendorIDMTWM, storeID, skuIDs, i+1) + if err = err2; err != nil { + return err + } + for _, v := range localCats { + if v.MapID == 0 { + if err = dao.AddStoreCategoryMap(db, storeID, v.ID, vendorID, "", model.SyncFlagNewMask, ctx.GetUserName()); err != nil { + return err + } + } + } + } + dao.Commit(db) + return err +} + +func SyncStorCategories(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID, storeID int, vendorStoreID string, nameIDs, skuIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) { + globals.SugarLogger.Debugf("SyncStorCategories %s storeID:%d, userName:%s", storeID, model.VendorChineseNames[vendorID], ctx.GetUserName()) + handler := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler) + num := 0 + db := dao.GetDB() + rootTask := tasksch.NewSeqTask(fmt.Sprintf("%s SyncStoreCategory step1", model.VendorChineseNames[vendorID]), ctx, + func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { + level := step + 1 + catList, err := dao.GetStoreCategories(db, vendorID, storeID, level) + if len(catList) > 0 { + num += len(catList) + task := tasksch.NewParallelTask(fmt.Sprintf("%s SyncStoreCategory step2, level=%d", model.VendorChineseNames[vendorID], level), + tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), ctx, + func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + updateFields := []string{dao.GetSyncStatusStructField(model.VendorNames[vendorID])} + syncStatusFieldName := dao.GetVendorThingIDStructField(model.VendorNames[vendorID]) + catInfo := batchItemList[0].(*dao.SkuStoreCatInfo) + storeCatMap := &model.StoreSkuCategoryMap{} + storeCatMap.ID = catInfo.MapID + if model.IsSyncStatusDelete(catInfo.StoreCatSyncStatus) { // 删除 + if model.IsSyncStatusDelete(catInfo.StoreCatSyncStatus) && !dao.IsVendorThingIDEmpty(catInfo.VendorCatID) { + err = handler.DeleteStoreCategory(ctx, storeID, vendorStoreID, catInfo.VendorCatID) + } + } else if model.IsSyncStatusNew(catInfo.StoreCatSyncStatus) { // 新增 + if err = handler.CreateStoreCategory(ctx, storeID, vendorStoreID, catInfo); err == nil { + refutil.SetObjFieldByName(storeCatMap, syncStatusFieldName, catInfo.VendorCatID) + updateFields = append(updateFields, syncStatusFieldName) + } + } else if model.IsSyncStatusUpdate(catInfo.StoreCatSyncStatus) { // 修改 + if err = handler.UpdateStoreCategory(ctx, storeID, vendorStoreID, catInfo); err == nil { + refutil.SetObjFieldByName(storeCatMap, syncStatusFieldName, catInfo.VendorCatID) + updateFields = append(updateFields, syncStatusFieldName) + } + } + if err == nil { + _, err = dao.UpdateEntity(db, storeCatMap, updateFields...) + } + return nil, err + }, catList) + rootTask.AddChild(task).Run() + _, err = task.GetResult(0) + } + return nil, err + }, 2) + tasksch.AddChild(parentTask, rootTask).Run() + if !isAsync { + _, err = rootTask.GetResult(0) + } else { + hint = rootTask.GetID() + } return hint, err } +func SyncStoreSkuNew(ctx *jxcontext.Context, vendorID, storeID int, vendorStoreID string, nameIDs, skuIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) { + singleStoreHandler, _ := partner.GetPrinterPlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler) + if singleStoreHandler != nil { + if err = CreateStoreCategoryByStoreSku(ctx, vendorID, storeID, vendorStoreID, nameIDs, skuIDs); err != nil { + return "", err + } + } + task := tasksch.NewSeqTask("SyncStoreSkuNew", ctx, + func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { + switch step { + case 0: + if singleStoreHandler != nil { + _, err = SyncStorCategories(ctx, task, vendorID, storeID, vendorStoreID, nameIDs, skuIDs, false, isContinueWhenError) + } + case 1: + err = syncStoreSkuNew(ctx, task, vendorID, storeID, nameIDs, skuIDs, isContinueWhenError) + } + return result, err + }, 2) + tasksch.HandleTask(task, nil, true).Run() + if !isAsync { + _, err = task.GetResult(0) + } else { + hint = task.GetID() + } + return hint, err +} + +func FullSyncStoreSkuNew(ctx *jxcontext.Context, vendorID, storeID int, vendorStoreID string, isAsync, isContinueWhenError bool) (hint string, err error) { + task := tasksch.NewParallelTask("FullSyncStoreSkuNew", tasksch.NewParallelConfig().SetParallelCount(1).SetIsContinueWhenError(isContinueWhenError), ctx, + func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + step := batchItemList[0].(int) + switch step { + case 0: + _, err = ClearRemoteStoreStuffAndSetNew(ctx, task, vendorID, storeID, vendorStoreID, false, isContinueWhenError) + case 1: + _, err = SyncStoreSkuNew(ctx, vendorID, storeID, vendorStoreID, nil, nil, false, isContinueWhenError) + } + return retVal, err + }, []int{0, 1}) + tasksch.HandleTask(task, nil, true).Run() + if !isAsync { + _, err = task.GetResult(0) + } else { + hint = task.GetID() + } + return hint, err +} + +func storeSkuSyncInfo2Bare(inSku *dao.StoreSkuSyncInfo) (outSku *partner.StoreSkuInfo) { + outSku = &partner.StoreSkuInfo{ + SkuID: inSku.SkuID, + VendorSkuID: inSku.VendorSkuID, + NameID: inSku.NameID, + VendorNameID: inSku.VendorNameID, + + Status: jxutils.MergeSkuStatus(inSku.Status, inSku.StoreSkuStatus), + Price: inSku.VendorPrice, + } + if !model.IsSyncStatusDelete(inSku.StoreSkuSyncStatus) { + outSku.Stock = model.MaxStoreSkuStockQty + } + return outSku +} + +func calVendorPrice4StoreSku(inSku *dao.StoreSkuSyncInfo, pricePercentagePack model.PricePercentagePack, pricePercentage int) (outSku *dao.StoreSkuSyncInfo) { + pricePercentage = jxutils.GetPricePercentage(pricePercentagePack, int(inSku.Price), pricePercentage) + inSku.VendorPrice = int64(jxutils.CaculateSkuVendorPrice(int(inSku.Price), pricePercentage, 0)) + return inSku +} + +func sku2Update(vendorID int, sku *dao.StoreSkuSyncInfo, syncStatus int8) (item *dao.KVUpdateItem) { + newSyncStatus := int8(0) + if syncStatus&(model.SyncFlagDeletedMask|model.SyncFlagNewMask|model.SyncFlagModifiedMask) != 0 { + newSyncStatus = 0 + } else { + newSyncStatus = sku.StoreSkuSyncStatus & ^syncStatus + } + kvs := map[string]interface{}{ + dao.GetSyncStatusStructField(model.VendorNames[vendorID]): newSyncStatus, + } + if syncStatus == model.SyncFlagNewMask { + kvs[dao.GetVendorThingIDStructField(model.VendorNames[vendorID])] = sku.VendorSkuID + } + storeSku := &model.StoreSkuBind{} + storeSku.ID = sku.BindID + item = &dao.KVUpdateItem{ + Item: storeSku, + KVs: kvs, + } + return item +} + +func updateStoreSku(db *dao.DaoDB, vendorID int, storeSkuList []*dao.StoreSkuSyncInfo, syncStatus int8) (num int64, err error) { + if len(storeSkuList) > 0 { + updateItemList := make([]*dao.KVUpdateItem, len(storeSkuList)) + for k, v := range storeSkuList { + updateItemList[k] = sku2Update(vendorID, v, syncStatus) + } + num, err = dao.BatchUpdateEntityByKV(db, updateItemList) + } + return num, err +} + +func syncStoreSkuNew(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID, storeID int, nameIDs, skuIDs []int, isContinueWhenError bool) (err error) { + db := dao.GetDB() + storeDetail, err := dao.GetStoreDetail(db, storeID, vendorID) + if err != nil { + return err + } + vendorStoreID := storeDetail.VendorStoreID + + skus, err := dao.GetStoreSkus(db, vendorID, storeID, skuIDs) + if err != nil { + return err + } + + singleStoreHandler, _ := partner.GetPrinterPlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler) + storeSkuHandler := partner.GetPrinterPlatformFromVendorID(vendorID).(partner.IPurchasePlatformStoreSkuHandler) + + var ( + createList, updateList []*dao.StoreSkuSyncInfo + deleteList, stockList, statusList, priceList []*partner.StoreSkuInfo + updateItems []*dao.KVUpdateItem + ) + skuMap := make(map[*partner.StoreSkuInfo]*dao.StoreSkuSyncInfo) + + for _, sku := range skus { + var bareSku *partner.StoreSkuInfo + if model.IsSyncStatusDelete(sku.StoreSkuSyncStatus) { + if !dao.IsVendorThingIDEmpty(sku.VendorSkuID) { + bareSku = storeSkuSyncInfo2Bare(sku) + if singleStoreHandler == nil { + stockList = append(stockList, bareSku) + } else { + deleteList = append(deleteList, bareSku) + } + } else { + updateItems = append(updateItems, sku2Update(vendorID, sku, model.SyncFlagDeletedMask)) + } + } else if model.IsSyncStatusNew(sku.StoreSkuSyncStatus) { + calVendorPrice4StoreSku(sku, storeDetail.PricePercentagePackObj, int(storeDetail.PricePercentage)) + if singleStoreHandler != nil { + bareSku = storeSkuSyncInfo2Bare(sku) + stockList = append(stockList, bareSku) + } else { + createList = append(createList, sku) + } + } else { + if model.IsSyncStatusUpdate(sku.StoreSkuSyncStatus) { + if singleStoreHandler != nil { + updateList = append(updateList, calVendorPrice4StoreSku(sku, storeDetail.PricePercentagePackObj, int(storeDetail.PricePercentage))) + } + } else { + if model.IsSyncStatusPrice(sku.StoreSkuSyncStatus) { + bareSku = storeSkuSyncInfo2Bare(calVendorPrice4StoreSku(sku, storeDetail.PricePercentagePackObj, int(storeDetail.PricePercentage))) + priceList = append(priceList, bareSku) + } + if model.IsSyncStatusSale(sku.StoreSkuSyncStatus) { + if bareSku == nil { + bareSku = storeSkuSyncInfo2Bare(sku) + } + statusList = append(statusList, bareSku) + } + } + } + if bareSku != nil { + skuMap[bareSku] = sku + } + } + if _, err = dao.BatchUpdateEntityByKV(db, updateItems); err != nil { + return err + } + + bareSku2Sync := func(bareSkuList []*partner.StoreSkuInfo) (skuList []*dao.StoreSkuSyncInfo) { + if len(bareSkuList) > 0 { + skuList = make([]*dao.StoreSkuSyncInfo, len(bareSkuList)) + for k, v := range bareSkuList { + skuList[k] = skuMap[v] + } + } + return skuList + } + + task := tasksch.NewParallelTask("syncStoreSkuNew", tasksch.NewParallelConfig().SetParallelCount(1).SetIsContinueWhenError(isContinueWhenError), ctx, + func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + step := batchItemList[0].(int) + switch step { + case 0: + if len(deleteList) > 0 { + _, err = putils.FreeBatchStoreSkuInfo(func(batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) { + if err = singleStoreHandler.DeleteStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil { + _, err = updateStoreSku(db, vendorID, bareSku2Sync(batchedStoreSkuList), model.SyncFlagDeletedMask) + } + return nil, err + }, ctx, task, deleteList, singleStoreHandler.GetStoreSkusBatchSize(partner.FuncDeleteStoreSkus), isContinueWhenError) + } + case 1: + if len(createList) > 0 { + _, err = putils.FreeBatchStoreSkuSyncInfo(func(batchedStoreSkuList []*dao.StoreSkuSyncInfo) (result interface{}, err error) { + if err = singleStoreHandler.CreateStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil { + _, err = updateStoreSku(db, vendorID, batchedStoreSkuList, model.SyncFlagNewMask) + } + return nil, err + }, ctx, task, createList, singleStoreHandler.GetStoreSkusBatchSize(partner.FuncCreateStoreSkus), isContinueWhenError) + } + case 2: + if len(updateList) > 0 { + _, err = putils.FreeBatchStoreSkuSyncInfo(func(batchedStoreSkuList []*dao.StoreSkuSyncInfo) (result interface{}, err error) { + if err = singleStoreHandler.CreateStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil { + _, err = updateStoreSku(db, vendorID, batchedStoreSkuList, model.SyncFlagModifiedMask) + } + return nil, err + }, ctx, task, updateList, singleStoreHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkus), isContinueWhenError) + } + case 3: + if len(stockList) > 0 { + _, err = putils.FreeBatchStoreSkuInfo(func(batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) { + if err = storeSkuHandler.UpdateStoreSkusStock(ctx, storeID, vendorStoreID, stockList); err == nil { + _, err = updateStoreSku(db, vendorID, bareSku2Sync(batchedStoreSkuList), model.SyncFlagModifiedMask) // ? + } + return nil, err + }, ctx, task, stockList, storeSkuHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusStock), isContinueWhenError) + } + case 4: + if len(statusList) > 0 { + _, err = putils.FreeBatchStoreSkuInfo(func(batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) { + if err = storeSkuHandler.UpdateStoreSkusStock(ctx, storeID, vendorStoreID, statusList); err == nil { + _, err = updateStoreSku(db, vendorID, bareSku2Sync(batchedStoreSkuList), model.SyncFlagSaleMask) + } + return nil, err + }, ctx, task, statusList, storeSkuHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusStatus), isContinueWhenError) + } + case 5: + if len(priceList) > 0 { + _, err = putils.FreeBatchStoreSkuInfo(func(batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) { + if err = storeSkuHandler.UpdateStoreSkusStock(ctx, storeID, vendorStoreID, priceList); err == nil { + _, err = updateStoreSku(db, vendorID, bareSku2Sync(batchedStoreSkuList), model.SyncFlagPriceMask) + } + return nil, err + }, ctx, task, priceList, storeSkuHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusPrice), isContinueWhenError) + } + } + return retVal, err + }, []int{0, 1, 2, 3, 4, 5}) + tasksch.HandleTask(task, parentTask, true).Run() + _, err = task.GetResult(0) + return err +} + func PruneMissingStoreSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID, storeID int, vendorStoreID string, isAsync, isContinueWhenError bool) (hint string, err error) { handler, _ := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler) if handler == nil { @@ -52,7 +382,7 @@ func PruneMissingStoreSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, ven _, err = putils.FreeBatchStoreSkuInfo(func(batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) { err = handler.DeleteStoreSkus(ctx, storeID, vendorStoreID, sku2Delete) return nil, err - }, ctx, parentTask, sku2Delete, handler.GetStoreSkusBatchSize(partner.FuncDeleteStoreSkus)) + }, ctx, parentTask, sku2Delete, handler.GetStoreSkusBatchSize(partner.FuncDeleteStoreSkus), isContinueWhenError) } } return nil, err diff --git a/business/model/dao/dao_bz.go b/business/model/dao/dao_bz.go index da1cb74e3..3b3e3a79b 100644 --- a/business/model/dao/dao_bz.go +++ b/business/model/dao/dao_bz.go @@ -150,11 +150,18 @@ func AddStoreCategoryMap(db *DaoDB, storeID, categoryID int, vendorID int, vendo if err = GetEntity(db, storeCat, model.FieldStoreID, model.FieldCategoryID, model.FieldDeletedAt); err != nil && err != orm.ErrNoRows { return err } - if vendorID == model.VendorIDEBAI { - if vendorCategoryID != "" { - storeCat.EbaiID = utils.Str2Int64(vendorCategoryID) + if vendorID == model.VendorIDMTWM { + storeCat.MtwmID = vendorCategoryID + storeCat.MtwmSyncStatus = status + } else if vendorID == model.VendorIDEBAI || vendorID == model.VendorIDWSC { + intVendorCategoryID := utils.Str2Int64WithDefault(vendorCategoryID, 0) + if vendorID == model.VendorIDEBAI { + storeCat.EbaiID = intVendorCategoryID + storeCat.EbaiSyncStatus = status + } else { + storeCat.WscID = intVendorCategoryID + storeCat.WscSyncStatus = status } - storeCat.EbaiSyncStatus = status } else { panic("unsupported vendor") } diff --git a/business/model/dao/store_sku.go b/business/model/dao/store_sku.go index aaa29cf84..dc3502fe1 100644 --- a/business/model/dao/store_sku.go +++ b/business/model/dao/store_sku.go @@ -33,11 +33,13 @@ type SkuStoreCatInfo struct { type StoreSkuSyncInfo struct { // 平台无关的store sku信息 - BindID int `orm:"column(bind_id)"` // 换名的原因是与Sku.ID同名区别 - StoreID int `orm:"column(store_id)"` - SkuID int `orm:"column(sku_id)"` // 这个与Sku.ID的区别是SkuID是必然存在的 - Price int64 - UnitPrice int64 + BindID int `orm:"column(bind_id)"` // 换名的原因是与Sku.ID同名区别 + StoreID int `orm:"column(store_id)"` + SkuID int `orm:"column(sku_id)"` // 这个与Sku.ID的区别是SkuID是必然存在的 + + Price int64 + UnitPrice int64 + VendorPrice int64 // 平台相关的store sku信息 StoreSkuStatus int diff --git a/business/model/model.go b/business/model/model.go index a89b3df0e..d46d57e25 100644 --- a/business/model/model.go +++ b/business/model/model.go @@ -95,6 +95,14 @@ func IsSyncStatusUpdate(syncStatus int8) bool { return (syncStatus & SyncFlagModifiedMask) != 0 } +func IsSyncStatusSale(syncStatus int8) bool { + return (syncStatus & SyncFlagSaleMask) != 0 +} + +func IsSyncStatusPrice(syncStatus int8) bool { + return (syncStatus & SyncFlagPriceMask) != 0 +} + func IsSyncStatusNeedCreate(syncStatus int8) bool { return IsSyncStatusNew(syncStatus) && !IsSyncStatusDelete(syncStatus) } diff --git a/business/partner/purchase/ebai/store_sku2.go b/business/partner/purchase/ebai/store_sku2.go index fdfad1485..381804996 100644 --- a/business/partner/purchase/ebai/store_sku2.go +++ b/business/partner/purchase/ebai/store_sku2.go @@ -186,8 +186,8 @@ func genSkuParamsFromStoreSkuInfo2(storeSku *dao.StoreSkuSyncInfo) (params map[s params["rtf"] = storeSku.DescImg } if storeSku.StoreSkuSyncStatus&(model.SyncFlagPriceMask|model.SyncFlagNewMask) != 0 { - params["sale_price"] = storeSku.Price - params["market_price"] = storeSku.Price + params["sale_price"] = storeSku.VendorPrice + params["market_price"] = storeSku.VendorPrice } if storeSku.StoreSkuSyncStatus&(model.SyncFlagSaleMask|model.SyncFlagNewMask) != 0 { params["status"] = jxSkuStatus2Ebai(storeSku.StoreSkuStatus) diff --git a/business/partner/purchase/mtwm/store_sku2.go b/business/partner/purchase/mtwm/store_sku2.go index 94f45071a..93e29ddbe 100644 --- a/business/partner/purchase/mtwm/store_sku2.go +++ b/business/partner/purchase/mtwm/store_sku2.go @@ -119,7 +119,7 @@ func (p *PurchaseHandler) CreateStoreSkus(ctx *jxcontext.Context, storeID int, v foodData["skus"] = skus foodData["name"] = utils.LimitUTF8StringLen(storeSku.Name, 30) foodData["description"] = storeSku.Comment - foodData["price"] = storeSku.Price + foodData["price"] = storeSku.VendorPrice foodData["min_order_count"] = 1 foodData["unit"] = storeSku.Unit foodData["box_num"] = 0 @@ -130,7 +130,7 @@ func (p *PurchaseHandler) CreateStoreSkus(ctx *jxcontext.Context, storeID int, v if storeSku.DescImg != "" { foodData["picture_contents"] = storeSku.DescImg } - foodData["sequence"] = storeSku.Price + foodData["sequence"] = storeSku.VendorPrice if storeSku.VendorVendorCatID != 0 { foodData["tag_id"] = utils.Int64ToStr(storeSku.VendorVendorCatID) } else { diff --git a/business/partner/putils/store_sku.go b/business/partner/putils/store_sku.go index 00abc2e98..9a5df9ef6 100644 --- a/business/partner/putils/store_sku.go +++ b/business/partner/putils/store_sku.go @@ -7,6 +7,7 @@ import ( "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" "git.rosy.net.cn/jx-callback/business/jxutils/tasksch" + "git.rosy.net.cn/jx-callback/business/model/dao" "git.rosy.net.cn/jx-callback/business/partner" ) @@ -29,7 +30,7 @@ func (p *DefSingleStorePlatform) DeleteStoreAllSkus(ctx *jxcontext.Context, pare _, err = FreeBatchStoreSkuInfo(func(batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) { err = p.DeleteStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList) return nil, err - }, ctx, parentTask, storeStoreList, p.GetStoreSkusBatchSize(partner.FuncDeleteStoreSkus)) + }, ctx, parentTask, storeStoreList, p.GetStoreSkusBatchSize(partner.FuncDeleteStoreSkus), isContinueWhenError) return err } @@ -56,7 +57,7 @@ func (p *DefSingleStorePlatform) DeleteStoreAllCategories(ctx *jxcontext.Context } err = FreeBatchCategoryIDOp(func(vendorCatID string) (err error) { return p.DeleteStoreCategory(ctx, storeID, vendorStoreID, vendorCatID) - }, ctx, task, vendorCatIDs) + }, ctx, task, vendorCatIDs, isContinueWhenError) return nil, err }, len(levelList)) tasksch.HandleTask(task1, parentTask, true).Run() @@ -75,7 +76,7 @@ func flatCatList(catList []*partner.BareCategoryInfo) (flattedCatList []*partner func (p *DefSingleStorePlatform) GetStoreSkusBareInfo(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, vendorStoreID string, inStoreSkuList []*partner.StoreSkuInfo) (outStoreSkuList []*partner.StoreSkuInfo, err error) { resultList, err := FreeBatchStoreSkuInfo(func(batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) { return p.GetStoreSkusFullInfo(ctx, parentTask, storeID, vendorStoreID, batchedStoreSkuList) - }, ctx, parentTask, inStoreSkuList, p.GetStoreSkusBatchSize(partner.FuncGetStoreSkusFullInfo)) + }, ctx, parentTask, inStoreSkuList, p.GetStoreSkusBatchSize(partner.FuncGetStoreSkusFullInfo), true) if err != nil || len(resultList) == 0 { return nil, err } @@ -119,15 +120,18 @@ func (p *DefSingleStorePlatform) GetStoreCategory(ctx *jxcontext.Context, storeI return cat, err } -func FreeBatchStoreSkuInfo(handler func([]*partner.StoreSkuInfo) (interface{}, error), ctx *jxcontext.Context, parentTask tasksch.ITask, storeSkuList []*partner.StoreSkuInfo, batchSize int) (resultList []interface{}, err error) { +func FreeBatchStoreSkuInfo(handler func([]*partner.StoreSkuInfo) (interface{}, error), ctx *jxcontext.Context, parentTask tasksch.ITask, storeSkuList []*partner.StoreSkuInfo, batchSize int, isContinueWhenError bool) (resultList []interface{}, err error) { if len(storeSkuList) > batchSize { - task := tasksch.NewParallelTask("FreeBatchStoreSkuInfo", tasksch.NewParallelConfig().SetBatchSize(batchSize), ctx, + task := tasksch.NewParallelTask("FreeBatchStoreSkuInfo", tasksch.NewParallelConfig().SetBatchSize(batchSize).SetIsContinueWhenError(isContinueWhenError), ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { batchStoreSkuList := make([]*partner.StoreSkuInfo, len(batchItemList)) for k, v := range batchItemList { batchStoreSkuList[k] = v.(*partner.StoreSkuInfo) } retVal, err = handler(batchStoreSkuList) + if err != nil { + retVal = nil + } return retVal, err }, storeSkuList) tasksch.HandleTask(task, parentTask, false).Run() @@ -141,9 +145,34 @@ func FreeBatchStoreSkuInfo(handler func([]*partner.StoreSkuInfo) (interface{}, e return resultList, err } -func FreeBatchCategoryIDOp(handler func(vendorCatID string) (err error), ctx *jxcontext.Context, parentTask tasksch.ITask, vendorCatIDs []string) (err error) { +func FreeBatchStoreSkuSyncInfo(handler func([]*dao.StoreSkuSyncInfo) (interface{}, error), ctx *jxcontext.Context, parentTask tasksch.ITask, storeSkuList []*dao.StoreSkuSyncInfo, batchSize int, isContinueWhenError bool) (resultList []interface{}, err error) { + if len(storeSkuList) > batchSize { + task := tasksch.NewParallelTask("FreeBatchStoreSkuSyncInfo", tasksch.NewParallelConfig().SetBatchSize(batchSize).SetIsContinueWhenError(isContinueWhenError), ctx, + func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + batchStoreSkuList := make([]*dao.StoreSkuSyncInfo, len(batchItemList)) + for k, v := range batchItemList { + batchStoreSkuList[k] = v.(*dao.StoreSkuSyncInfo) + } + retVal, err = handler(batchStoreSkuList) + if err != nil { + retVal = nil + } + return retVal, err + }, storeSkuList) + tasksch.HandleTask(task, parentTask, false).Run() + resultList, err = task.GetResult(0) + } else { + result, err2 := handler(storeSkuList) + if err = err2; err == nil { + resultList = utils.Interface2Slice(result) + } + } + return resultList, err +} + +func FreeBatchCategoryIDOp(handler func(vendorCatID string) (err error), ctx *jxcontext.Context, parentTask tasksch.ITask, vendorCatIDs []string, isContinueWhenError bool) (err error) { if len(vendorCatIDs) > 1 { - task := tasksch.NewParallelTask("FreeBatchCategoryIDOp", nil, ctx, + task := tasksch.NewParallelTask("FreeBatchCategoryIDOp", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { err = handler(batchItemList[0].(string)) return nil, err