diff --git a/business/partner/purchase/jd/store_sku.go b/business/partner/purchase/jd/store_sku.go index 10a6e9e4b..6c4b50f16 100644 --- a/business/partner/purchase/jd/store_sku.go +++ b/business/partner/purchase/jd/store_sku.go @@ -33,112 +33,121 @@ func (p *PurchaseHandler) syncStoreSkus(ctx *jxcontext.Context, parentTask tasks return "", err } batchSize := jdapi.MaxStoreSkuBatchSize - storeSkusLen := len(storeSkus) - if storeSkusLen < jdapi.MaxStoreSkuBatchSize*2 { - batchSize = (storeSkusLen + 2) / 3 - } else if storeSkusLen < jdapi.MaxStoreSkuBatchSize { - batchSize = (storeSkusLen + 1) / 2 - } else if storeSkusLen < jdapi.MaxStoreSkuBatchSize/2 { - batchSize = 1 - } + // storeSkusLen := len(storeSkus) + // if storeSkusLen < jdapi.MaxStoreSkuBatchSize/2 { + // batchSize = 1 + // } else if storeSkusLen < jdapi.MaxStoreSkuBatchSize { + // batchSize = (storeSkusLen + 1) / 2 + // } else if storeSkusLen < jdapi.MaxStoreSkuBatchSize*2 { + // batchSize = (storeSkusLen + 2) / 3 + // } task := tasksch.NewParallelTask("syncStoreSkus京东", tasksch.NewParallelConfig().SetBatchSize(batchSize).SetIsContinueWhenError(isContinueWhenError), ctx, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { - var skuPriceInfoList []*jdapi.SkuPriceInfo - var skuVendibilityList []*jdapi.StockVendibility - var skuStockList []*jdapi.SkuStock - stationNo := storeDetail.VendorStoreID - var batchBindIDs []int - for _, v := range batchItemList { - storeSku := v.(*dao.StoreSkuSyncInfo) - alreadyAddStock := false - if storeSku.StoreSkuSyncStatus&model.SyncFlagChangedMask != 0 || storeSku.BindID == 0 || storeSku.NameID == 0 { - if storeSku.BindID > 0 { - batchBindIDs = append(batchBindIDs, storeSku.BindID) - } - if storeSku.StoreSkuSyncStatus&(model.SyncFlagDeletedMask|model.SyncFlagNewMask) != 0 || storeSku.BindID == 0 || storeSku.NameID == 0 { // 关注或取消关注 - stock := &jdapi.SkuStock{ - OutSkuId: utils.Int2Str(storeSku.SkuID), - StockQty: model.MaxStoreSkuStockQty, + doWork := func(batchItemList []interface{}) (err error) { + var skuPriceInfoList []*jdapi.SkuPriceInfo + var skuVendibilityList []*jdapi.StockVendibility + var skuStockList []*jdapi.SkuStock + stationNo := storeDetail.VendorStoreID + var batchBindIDs []int + for _, v := range batchItemList { + storeSku := v.(*dao.StoreSkuSyncInfo) + alreadyAddStock := false + if storeSku.StoreSkuSyncStatus&model.SyncFlagChangedMask != 0 || storeSku.BindID == 0 || storeSku.NameID == 0 { + if storeSku.BindID > 0 { + batchBindIDs = append(batchBindIDs, storeSku.BindID) } - if storeSku.StoreSkuSyncStatus&model.SyncFlagDeletedMask != 0 || storeSku.DeletedAt != utils.DefaultTimeValue || storeSku.BindID == 0 || storeSku.NameID == 0 { - stock.StockQty = 0 - } else { - alreadyAddStock = true - } - if stock.StockQty != 0 || !storeskulock.IsJdStoreSkuLocked(stationNo, storeSku.JdID) { - skuStockList = append(skuStockList, stock) - } - } - if storeSku.StoreSkuSyncStatus&(model.SyncFlagPriceMask|model.SyncFlagNewMask) != 0 { - skuPriceInfoList = append(skuPriceInfoList, &jdapi.SkuPriceInfo{ - OutSkuId: utils.Int2Str(storeSku.SkuID), - Price: constrainPrice(jxutils.CaculateSkuVendorPrice(int(storeSku.Price), int(storeDetail.PricePercentage), storeSku.CatPricePercentage)), - }) - } - if storeSku.StoreSkuSyncStatus&(model.SyncFlagSaleMask|model.SyncFlagNewMask) != 0 { - vendibility := &jdapi.StockVendibility{ - OutSkuId: utils.Int2Str(storeSku.SkuID), - DoSale: true, - } - if storeSku.StoreSkuStatus != model.StoreSkuBindStatusNormal { - vendibility.DoSale = false - } else if !alreadyAddStock { // 如果是设置可售则自动将库存加满 + if storeSku.StoreSkuSyncStatus&(model.SyncFlagDeletedMask|model.SyncFlagNewMask) != 0 || storeSku.BindID == 0 || storeSku.NameID == 0 { // 关注或取消关注 stock := &jdapi.SkuStock{ OutSkuId: utils.Int2Str(storeSku.SkuID), StockQty: model.MaxStoreSkuStockQty, } - skuStockList = append(skuStockList, stock) + if storeSku.StoreSkuSyncStatus&model.SyncFlagDeletedMask != 0 || storeSku.DeletedAt != utils.DefaultTimeValue || storeSku.BindID == 0 || storeSku.NameID == 0 { + stock.StockQty = 0 + } else { + alreadyAddStock = true + } + if stock.StockQty != 0 || !storeskulock.IsJdStoreSkuLocked(stationNo, storeSku.JdID) { + skuStockList = append(skuStockList, stock) + } } - if vendibility.DoSale || !storeskulock.IsJdStoreSkuLocked(stationNo, storeSku.JdID) { - skuVendibilityList = append(skuVendibilityList, vendibility) + if storeSku.StoreSkuSyncStatus&(model.SyncFlagPriceMask|model.SyncFlagNewMask) != 0 { + skuPriceInfoList = append(skuPriceInfoList, &jdapi.SkuPriceInfo{ + OutSkuId: utils.Int2Str(storeSku.SkuID), + Price: constrainPrice(jxutils.CaculateSkuVendorPrice(int(storeSku.Price), int(storeDetail.PricePercentage), storeSku.CatPricePercentage)), + }) + } + if storeSku.StoreSkuSyncStatus&(model.SyncFlagSaleMask|model.SyncFlagNewMask) != 0 { + vendibility := &jdapi.StockVendibility{ + OutSkuId: utils.Int2Str(storeSku.SkuID), + DoSale: true, + } + if storeSku.StoreSkuStatus != model.StoreSkuBindStatusNormal { + vendibility.DoSale = false + } else if !alreadyAddStock { // 如果是设置可售则自动将库存加满 + stock := &jdapi.SkuStock{ + OutSkuId: utils.Int2Str(storeSku.SkuID), + StockQty: model.MaxStoreSkuStockQty, + } + skuStockList = append(skuStockList, stock) + } + if vendibility.DoSale || !storeskulock.IsJdStoreSkuLocked(stationNo, storeSku.JdID) { + skuVendibilityList = append(skuVendibilityList, vendibility) + } } } } - } - syncMask := 0 - errList := []error{} - if globals.EnableJdStoreWrite { - // todo 以下可以优化为并行操作 - globals.SugarLogger.Debug(utils.Format4Output(skuVendibilityList, false), utils.Format4Output(skuPriceInfoList, false), utils.Format4Output(skuStockList, false)) - if len(skuVendibilityList) > 0 { - if _, err = api.JdAPI.BatchUpdateVendibility("", stationNo, skuVendibilityList, ctx.GetUserName()); err == nil { - syncMask |= model.SyncFlagSaleMask - } else { - errList = append(errList, err) + syncMask := 0 + errList := []error{} + if globals.EnableJdStoreWrite { + // todo 以下可以优化为并行操作 + globals.SugarLogger.Debug(utils.Format4Output(skuVendibilityList, false), utils.Format4Output(skuPriceInfoList, false), utils.Format4Output(skuStockList, false)) + if len(skuVendibilityList) > 0 { + if _, err = api.JdAPI.BatchUpdateVendibility("", stationNo, skuVendibilityList, ctx.GetUserName()); err == nil { + syncMask |= model.SyncFlagSaleMask + } else { + errList = append(errList, err) + } + } + if (err == nil || isContinueWhenError) && len(skuStockList) > 0 { + if _, err = api.JdAPI.BatchUpdateCurrentQtys("", stationNo, skuStockList, ctx.GetUserName()); err == nil { + syncMask |= model.SyncFlagNewMask | model.SyncFlagDeletedMask + } else { + errList = append(errList, err) + } + } + if (err == nil || isContinueWhenError) && len(skuPriceInfoList) > 0 { + if _, err = api.JdAPI.UpdateVendorStationPrice("", stationNo, skuPriceInfoList); err == nil { + syncMask |= model.SyncFlagPriceMask + } else { + errList = append(errList, partner.NewErrorCode(err.Error(), partner.ErrCodeChangePriceFailed, model.VendorIDJD)) + } } } - if (err == nil || isContinueWhenError) && len(skuStockList) > 0 { - if _, err = api.JdAPI.BatchUpdateCurrentQtys("", stationNo, skuStockList, ctx.GetUserName()); err == nil { - syncMask |= model.SyncFlagNewMask | model.SyncFlagDeletedMask - } else { - errList = append(errList, err) - } + if len(errList) == 0 { + syncMask = -1 } - if (err == nil || isContinueWhenError) && len(skuPriceInfoList) > 0 { - if _, err = api.JdAPI.UpdateVendorStationPrice("", stationNo, skuPriceInfoList); err == nil { - syncMask |= model.SyncFlagPriceMask - } else { - errList = append(errList, partner.NewErrorCode(err.Error(), partner.ErrCodeChangePriceFailed, model.VendorIDJD)) - } - } - } - if len(errList) == 0 { - syncMask = -1 - } - if syncMask != 0 && len(batchBindIDs) > 0 { - db := dao.GetDB() // 多线程问题 - sql := ` + if syncMask != 0 && len(batchBindIDs) > 0 { + // db := dao.GetDB() // 多线程问题 + sql := ` UPDATE store_sku_bind t1 SET t1.jd_sync_status = t1.jd_sync_status & ? WHERE t1.id IN (` + dao.GenQuestionMarks(len(batchBindIDs)) + ")" - if _, err = dao.ExecuteSQL(db, sql, ^syncMask, batchBindIDs); err != nil { - errList = append(errList, err) + if _, err = dao.ExecuteSQL(db, sql, ^syncMask, batchBindIDs); err != nil { + errList = append(errList, err) + } } + if len(errList) == 1 { + err = errList[0] + } else if len(errList) > 1 { + err = fmt.Errorf("%v", errList) + } + return err } - if len(errList) == 1 { - err = errList[0] - } else if len(errList) > 1 { - err = fmt.Errorf("%v", errList) + err = doWork(batchItemList) + if isErrPartialFailed(err) && len(batchItemList) > 1 { + for _, v := range batchItemList { + doWork([]interface{}{v}) + } } return nil, err }, storeSkus) @@ -149,6 +158,13 @@ func (p *PurchaseHandler) syncStoreSkus(ctx *jxcontext.Context, parentTask tasks return task.ID, err } +func isErrPartialFailed(err error) bool { + if errExt, ok := err.(*utils.ErrorWithCode); ok && errExt.Code() == jdapi.ResponseInnerCodePartialFailed { + return true + } + return false +} + func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, skuIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) { globals.SugarLogger.Debugf("jd SyncStoresSkus, storeID:%d, skuIDs:%v", storeID, skuIDs) db := dao.GetDB()