diff --git a/business/jxstore/cms/sync.go b/business/jxstore/cms/sync.go index d1bfe40e7..22999df22 100644 --- a/business/jxstore/cms/sync.go +++ b/business/jxstore/cms/sync.go @@ -16,6 +16,11 @@ import ( "git.rosy.net.cn/jx-callback/globals/api" ) +type LoopStoreMapInfo struct { + VendorID int + StoreMapList []*model.StoreMap +} + type VendorSync struct { MultiStoreVendorIDs []int SingleStoreVendorIDs []int @@ -191,19 +196,37 @@ func (v *VendorSync) SyncReorderCategories(ctx *jxcontext.Context, db *dao.DaoDB func (v *VendorSync) SyncStore(ctx *jxcontext.Context, db *dao.DaoDB, vendorID, storeID int, isAsync bool, userName string) (hint string, err error) { globals.SugarLogger.Debugf("SyncStore, storeID:%d", storeID) - hint, err = v.LoopStoreMap(ctx, db, "SyncStore", isAsync, userName, storeID, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { - storeMap := batchItemList[0].(*model.StoreMap) - if (vendorID == -1 || vendorID == storeMap.VendorID) && (storeMap.SyncStatus != 0) { - if handler := v.GetStoreHandler(storeMap.VendorID); handler != nil { - if err = handler.UpdateStore(db, storeID, userName); err == nil { + + var vendorIDs []int + if vendorID != -1 { + vendorIDs = []int{ + vendorID, + } + } + hint, err = v.LoopStoresMap(ctx, db, "SyncStore", isAsync, vendorIDs, []int{storeID}, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { + loopMapInfo := batchItemList[0].(*LoopStoreMapInfo) + handler := v.GetStoreHandler(loopMapInfo.VendorID) + if len(loopMapInfo.StoreMapList) > 1 { + loopStoreTask := tasksch.NewParallelTask("SyncStore loop stores", nil, ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + storeMap := batchItemList[0].(*model.StoreMap) + if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil { storeMap.SyncStatus = 0 _, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus) } - } + return nil, err + }, loopMapInfo.StoreMapList) + t.AddChild(loopStoreTask).Run() + _, err = loopStoreTask.GetResult(0) + return nil, err + } + storeMap := loopMapInfo.StoreMapList[0] + if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil { + storeMap.SyncStatus = 0 + _, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus) } return nil, err }) - return "", err + return hint, err } func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuID int, isAsync bool, userName string) (hint string, err error) { @@ -253,32 +276,75 @@ func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuI // func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, skuIDs []int, isAsync bool, userName string) (hint string, err error) { globals.SugarLogger.Debug("SyncStoresSkus") - hint, err = v.LoopStoreVendors(ctx, db, vendorIDs, "SyncStoresSkus", isAsync, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { - handler := v.GetStoreHandler(batchItemList[0].(int)) - _, err = handler.SyncStoresSkus(ctx, t, db, storeIDs, skuIDs, false) + hint, err = v.LoopStoresMap(ctx, db, "SyncStoresSkus", isAsync, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { + loopMapInfo := batchItemList[0].(*LoopStoreMapInfo) + handler := v.GetStoreHandler(loopMapInfo.VendorID) + if len(loopMapInfo.StoreMapList) > 1 { + loopStoreTask := tasksch.NewSeqTask("SyncStoresSkus loop stores", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { + storeID := loopMapInfo.StoreMapList[step].StoreID + _, err = handler.SyncStoreSkus(ctx, task, storeID, skuIDs, false) + return nil, err + }, len(loopMapInfo.StoreMapList)) + t.AddChild(loopStoreTask).Run() + _, err = loopStoreTask.GetResult(0) + return nil, err + } + _, err = handler.SyncStoreSkus(ctx, t, loopMapInfo.StoreMapList[0].StoreID, skuIDs, false) return nil, err }) return hint, err } -func (v *VendorSync) LoopStoreMap(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, userName string, storeID int, handler tasksch.WorkFunc) (hint string, err error) { - storeMaps, err := GetStoreVendorMaps(ctx, db, storeID, -1) - if err == nil { - task := tasksch.NewParallelTask(taskName, nil, userName, handler, storeMaps) - ctx.SetTaskOrAddChild(task, nil) - tasksch.ManageTask(task).Run() - hint = task.ID - if !isAsync { - _, err = task.GetResult(0) +func (v *VendorSync) LoopStoresMap(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, vendorIDs []int, storeIDs []int, handler tasksch.WorkFunc) (hint string, err error) { + sql := ` + SELECT t1.* + FROM store_map t1 + WHERE t1.deleted_at = ? + ` + sqlParams := []interface{}{ + utils.DefaultTimeValue, + } + if len(vendorIDs) > 0 { + sql += " AND t1.vendor_id IN (" + dao.GenQuestionMarks(len(vendorIDs)) + ")" + sqlParams = append(sqlParams, vendorIDs) + } + if len(storeIDs) > 0 { + sql += " AND t1.store_id IN (" + dao.GenQuestionMarks(len(storeIDs)) + ")" + sqlParams = append(sqlParams, storeIDs) + } + + var storeMapList []*model.StoreMap + if err = dao.GetRows(db, &storeMapList, sql, sqlParams...); err != nil { + return "", err + } + + if len(storeMapList) == 0 { + return "", nil + } + vendorStoreMap := make(map[int][]*model.StoreMap) + for _, v := range storeMapList { + vendorStoreMap[v.VendorID] = append(vendorStoreMap[v.VendorID], v) + } + loopInfoList := make([]*LoopStoreMapInfo, len(vendorStoreMap)) + index := 0 + for k, v := range vendorStoreMap { + loopInfoList[index] = &LoopStoreMapInfo{ + VendorID: k, + StoreMapList: v, } + index++ + } + task := tasksch.NewParallelTask(taskName, nil, ctx.GetUserName(), handler, loopInfoList) + ctx.SetTaskOrAddChild(task, nil) + tasksch.ManageTask(task).Run() + hint = task.ID + if !isAsync { + _, err = task.GetResult(0) } return hint, makeSyncError(err) } func (v *VendorSync) LoopMultiStoresVendors(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, userName string, handler tasksch.WorkFunc) (hint string, err error) { - if taskName == "" { - taskName = "LoopMultiStoresVendors" - } task := tasksch.NewParallelTask(taskName, nil, userName, handler, v.MultiStoreVendorIDs) ctx.SetTaskOrAddChild(task, nil) tasksch.ManageTask(task).Run() @@ -288,58 +354,38 @@ func (v *VendorSync) LoopMultiStoresVendors(ctx *jxcontext.Context, db *dao.DaoD return task.ID, makeSyncError(err) } -func (v *VendorSync) LoopStoreVendors(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, taskName string, isAsync bool, userName string, handler tasksch.WorkFunc) (hint string, err error) { - if taskName == "" { - taskName = "LoopStoreVendors" - } - var vendorIDMap map[int]int - if len(vendorIDs) != 0 { - vendorIDMap = make(map[int]int) - for _, v := range vendorIDs { - vendorIDMap[v] = 1 - } - } - allHandlers := make([]int, len(v.MultiStoreVendorIDs)+len(v.SingleStoreVendorIDs)) - copy(allHandlers, v.MultiStoreVendorIDs) - copy(allHandlers[len(v.MultiStoreVendorIDs):], v.SingleStoreVendorIDs) - if vendorIDMap != nil { - count := 0 - for _, v := range allHandlers { - if vendorIDMap[v] == 1 { - allHandlers[count] = v - count++ - } - } - allHandlers = allHandlers[:count] - } - task := tasksch.NewParallelTask(taskName, nil, userName, handler, allHandlers) - ctx.SetTaskOrAddChild(task, nil) - tasksch.ManageTask(task).Run() - if !isAsync { - _, err = task.GetResult(0) - } - return task.ID, err -} - -func (v *VendorSync) LoopSingleStoreVendors(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, userName string, handler tasksch.WorkFunc) (hint string, err error) { - if taskName == "" { - taskName = "LoopSingleStoreVendors" - } - var storeMaps []*model.StoreMap - if err = dao.GetRows(db, &storeMaps, ` - SELECT * - FROM store_map - WHERE deleted_at = ? AND vendor_id IN (`+dao.GenQuestionMarks(len(v.SingleStoreVendorIDs))+")", utils.DefaultTimeValue, v.SingleStoreVendorIDs); err == nil { - task := tasksch.NewParallelTask(taskName, nil, userName, handler, storeMaps) - ctx.SetTaskOrAddChild(task, nil) - tasksch.ManageTask(task).Run() - hint = task.ID - if !isAsync { - _, err = task.GetResult(0) - } - } - return hint, makeSyncError(err) -} +// func (v *VendorSync) LoopStoreVendors(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, taskName string, isAsync bool, userName string, handler tasksch.WorkFunc) (hint string, err error) { +// if taskName == "" { +// taskName = "LoopStoreVendors" +// } +// var vendorIDMap map[int]int +// if len(vendorIDs) != 0 { +// vendorIDMap = make(map[int]int) +// for _, v := range vendorIDs { +// vendorIDMap[v] = 1 +// } +// } +// allHandlers := make([]int, len(v.MultiStoreVendorIDs)+len(v.SingleStoreVendorIDs)) +// copy(allHandlers, v.MultiStoreVendorIDs) +// copy(allHandlers[len(v.MultiStoreVendorIDs):], v.SingleStoreVendorIDs) +// if vendorIDMap != nil { +// count := 0 +// for _, v := range allHandlers { +// if vendorIDMap[v] == 1 { +// allHandlers[count] = v +// count++ +// } +// } +// allHandlers = allHandlers[:count] +// } +// task := tasksch.NewParallelTask(taskName, nil, userName, handler, allHandlers) +// ctx.SetTaskOrAddChild(task, nil) +// tasksch.ManageTask(task).Run() +// if !isAsync { +// _, err = task.GetResult(0) +// } +// return task.ID, err +// } func (v *VendorSync) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int, storeIDs []int) (hint string, err error) { task := tasksch.NewParallelTask("RefreshAllSkusID", nil, ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { diff --git a/business/partner/partner.go b/business/partner/partner.go index 9298340ec..7b4792ce2 100644 --- a/business/partner/partner.go +++ b/business/partner/partner.go @@ -85,7 +85,7 @@ type IPurchasePlatformHandler interface { // OpenStore(vendorStoreID string, userName string) error // CloseStore(vendorStoreID, closeNotice, userName string) error - SyncStoresSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, db *dao.DaoDB, storeIDs []int, skuIDs []int, isAsync bool) (hint string, err error) + SyncStoreSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, skuIDs []int, isAsync bool) (hint string, err error) RefreshAllStoresID(ctx *jxcontext.Context, parentTask tasksch.ITask, isAsync bool) (hint string, err error) GetVendorID() int diff --git a/business/partner/purchase/ebai/store_sku.go b/business/partner/purchase/ebai/store_sku.go index e0c0ce182..58e3afd4f 100644 --- a/business/partner/purchase/ebai/store_sku.go +++ b/business/partner/purchase/ebai/store_sku.go @@ -14,7 +14,6 @@ import ( "git.rosy.net.cn/jx-callback/business/model/dao" "git.rosy.net.cn/jx-callback/globals" "git.rosy.net.cn/jx-callback/globals/api" - "github.com/astaxie/beego/orm" ) const ( @@ -73,33 +72,13 @@ var ( } ) -func (p *PurchaseHandler) SyncStoresSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, db *dao.DaoDB, storeIDs []int, skuIDs []int, isAsync bool) (hint string, err error) { - task := tasksch.NewSeqTask("ebai.SyncStoresSkus", ctx.GetUserName(), func(t *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { - storeID := storeIDs[step] - err = p.syncOneStoreSkus(t, db, storeID, skuIDs, false, ctx.GetUserName()) - return nil, err - }, len(storeIDs)) - ctx.SetTaskOrAddChild(task, parentTask) - task.Run() - if !isAsync { - _, err = task.GetResult(0) - } - return task.ID, err -} - -func (p *PurchaseHandler) syncOneStoreSkus(parentTask tasksch.ITask, db *dao.DaoDB, storeID int, skuIDs []int, isAsync bool, userName string) (err error) { - globals.SugarLogger.Debugf("syncOneStoreSkus storeID:%d, skuIDs:%v, userName:%s", storeID, skuIDs, userName) - - _, err2 := dao.GetStoreMapByStoreID(db, storeID, model.VendorIDEBAI) - if err = err2; err != nil { - if err == orm.ErrNoRows { - err = nil - } - return err - } +func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, skuIDs []int, isAsync bool) (hint string, err error) { + userName := ctx.GetUserName() + globals.SugarLogger.Debugf("SyncStoreSkus storeID:%d, skuIDs:%v, userName:%s", storeID, skuIDs, userName) + db := dao.GetDB() if err = p.syncOneStoreCategoriesFromRemote2Local(db, storeID, userName); err != nil { - return err + return "", err } sql := ` SELECT t1.*, t2.spec_quality, t2.spec_unit, t2.weight, t2.status sku_status, @@ -109,24 +88,28 @@ func (p *PurchaseHandler) syncOneStoreSkus(parentTask tasksch.ITask, db *dao.Dao t4p.id parent_cat_id, t5p.ebai_id parent_cat_ebai_id, t5p.ebai_sync_status parent_cat_ebai_sync_status, cat1.vendor_category_id ebai_cat3_id, cat2.vendor_category_id ebai_cat2_id, cat2.parent_id ebai_cat1_id FROM store_sku_bind t1 - JOIN sku t2 ON t1.sku_id = t2.id - JOIN sku_name t3 ON t2.name_id = t3.id - JOIN sku_category t4 ON t3.category_id = t4.id - JOIN sku_category t4p ON t4p.id = t4.parent_id - LEFT JOIN store_sku_category_map t5 ON t5.store_id = t1.store_id AND t5.category_id = t4.id - LEFT JOIN store_sku_category_map t5p ON t5p.store_id = t1.store_id AND t5p.category_id = t4p.id - + JOIN sku t2 ON t1.sku_id = t2.id AND t2.deleted_at = ? + JOIN sku_name t3 ON t2.name_id = t3.id AND t3.deleted_at = ? + JOIN sku_category t4 ON t3.category_id = t4.id AND t4.deleted_at = ? + JOIN sku_category t4p ON t4p.id = t4.parent_id AND t4p.deleted_at = ? + LEFT JOIN store_sku_category_map t5 ON t5.store_id = t1.store_id AND t5.category_id = t4.id AND t5.deleted_at = ? + LEFT JOIN store_sku_category_map t5p ON t5p.store_id = t1.store_id AND t5p.category_id = t4p.id AND t5p.deleted_at = ? LEFT JOIN sku_vendor_category cat1 ON t4.ebai_category_id = cat1.vendor_category_id AND cat1.vendor_id = ? LEFT JOIN sku_vendor_category cat2 ON cat1.parent_id = cat2.vendor_category_id AND cat1.vendor_id = ? - WHERE t1.store_id = ? AND (t1.ebai_sync_status <> 0) ` sqlParams := []interface{}{ + utils.DefaultTimeValue, + utils.DefaultTimeValue, + utils.DefaultTimeValue, + utils.DefaultTimeValue, + utils.DefaultTimeValue, + utils.DefaultTimeValue, model.VendorIDEBAI, model.VendorIDEBAI, storeID, } - if skuIDs != nil && len(skuIDs) > 0 { + if len(skuIDs) > 0 { sql += " AND t1.sku_id IN (" + dao.GenQuestionMarks(len(skuIDs)) + ")" sqlParams = append(sqlParams, skuIDs) } @@ -147,11 +130,11 @@ func (p *PurchaseHandler) syncOneStoreSkus(parentTask tasksch.ITask, db *dao.Dao } for k := range catList2Add { if err = dao.AddStoreCategoryMap(db, storeID, k, model.VendorIDEBAI, "", model.SyncFlagNewMask, userName); err != nil { - return err + return "", err } } if err = p.SyncOneStoreCategories(db, storeID, userName); err != nil { - return err + return "", err } if err = dao.GetRows(db, &storeSkuInfoList, sql, sqlParams...); err == nil { task := tasksch.NewParallelTask("syncOneStoreSkus skus", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { @@ -185,7 +168,7 @@ func (p *PurchaseHandler) syncOneStoreSkus(parentTask tasksch.ITask, db *dao.Dao _, err = task.GetResult(0) } } - return err + return "", err } func (p *PurchaseHandler) SyncStoresCategories(db *dao.DaoDB, storeIDs []int, userName string) (err error) { @@ -381,6 +364,7 @@ func (p *PurchaseHandler) syncOneStoreCategoriesFromRemote2Local(db *dao.DaoDB, // 从本地同步分类信息到饿百 // 测试过程中出现过,父分类创建成功后马上创建子分类会报没有父分类错 +// todo 对于deleted_at的处理有问题 func (p *PurchaseHandler) SyncOneStoreCategories(db *dao.DaoDB, storeID int, userName string) (err error) { globals.SugarLogger.Debugf("SyncOneStoreCategories storeID:%d, userName:%s", storeID, userName) diff --git a/business/partner/purchase/elm/store_sku.go b/business/partner/purchase/elm/store_sku.go index 21eedacdd..aa67df6e1 100644 --- a/business/partner/purchase/elm/store_sku.go +++ b/business/partner/purchase/elm/store_sku.go @@ -18,7 +18,7 @@ func (p *PurchaseHandler) ReadStoreSku(storeID, skuID int) (skuNameExt *model.Sk return nil, nil } -func (p *PurchaseHandler) SyncStoresSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, db *dao.DaoDB, storeIDs []int, skuIDs []int, isAsync bool) (hint string, err error) { +func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, skuIDs []int, isAsync bool) (hint string, err error) { return hint, err } diff --git a/business/partner/purchase/jd/store_sku.go b/business/partner/purchase/jd/store_sku.go index 7a50bcd76..67e3ff098 100644 --- a/business/partner/purchase/jd/store_sku.go +++ b/business/partner/purchase/jd/store_sku.go @@ -23,116 +23,102 @@ type tStoreSkuBindExt struct { } // 京东到家,以有库存表示关注(认领) -func (p *PurchaseHandler) SyncStoresSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, db *dao.DaoDB, storeIDs []int, skuIDs []int, isAsync bool) (hint string, err error) { - globals.SugarLogger.Debugf("jd SyncStoresSkus, storeIDs:%v, skuIDs:%v", storeIDs, skuIDs) - - parallelCount := 1 - if len(skuIDs) < MaxSkuBatchSize { - parallelCount = 10 +func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, skuIDs []int, isAsync bool) (hint string, err error) { + globals.SugarLogger.Debugf("jd SyncStoresSkus, storeID:%d, skuIDs:%v", storeID, skuIDs) + sqlWhere := ` + WHERE (t1.jd_sync_status <> 0) AND t1.store_id = ? + ` + sqlWhereParams := []interface{}{ + storeID, } - task := tasksch.NewParallelTask("SyncStoresSkus", tasksch.NewParallelConfig().SetParallelCount(parallelCount), ctx.GetUserName(), func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { - storeID := batchItemList[0].(int) - sqlWhere := ` - WHERE (t1.jd_sync_status <> 0) AND t1.store_id = ? - ` - sqlWhereParams := []interface{}{ - storeID, - } - if len(skuIDs) > 0 { - sqlWhere += " AND t1.sku_id IN (" + dao.GenQuestionMarks(len(skuIDs)) + ")" - sqlWhereParams = append(sqlWhereParams, skuIDs) - } + if len(skuIDs) > 0 { + sqlWhere += " AND t1.sku_id IN (" + dao.GenQuestionMarks(len(skuIDs)) + ")" + sqlWhereParams = append(sqlWhereParams, skuIDs) + } - sql := ` - SELECT t1.*, t2.price_percentage - FROM store_sku_bind t1 - JOIN store_map t2 ON t1.store_id = t2.store_id AND t2.vendor_id = ? AND t2.deleted_at = ? - ` + sqlWhere - var storeSkus []*tStoreSkuBindExt - sqlParams := []interface{}{ - model.VendorIDJD, - utils.DefaultTimeValue, - } - if err = dao.GetRows(db, &storeSkus, sql, append(sqlParams, sqlWhereParams...)...); err == nil { - outStationNo := utils.Int2Str(storeID) - task := tasksch.NewParallelTask("SyncStoresSkus inner", tasksch.NewParallelConfig().SetBatchSize(MaxSkuBatchSize), ctx.GetUserName(), func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { - var skuPriceInfoList []*jdapi.SkuPriceInfo - var skuVendibilityList []*jdapi.StockVendibility - var skuStockList []*jdapi.SkuStock - - for _, v := range batchItemList { - storeSku := v.(*tStoreSkuBindExt) - alreadyAddStock := false - if storeSku.JdSyncStatus&model.SyncFlagChangedMask != 0 { - if storeSku.JdSyncStatus&(model.SyncFlagDeletedMask|model.SyncFlagNewMask) != 0 { // 关注或取消关注 - stock := &jdapi.SkuStock{ - OutSkuId: utils.Int2Str(storeSku.SkuID), - StockQty: MaxStockQty, - } - if storeSku.DeletedAt != utils.DefaultTimeValue { - stock.StockQty = 0 - } else { - alreadyAddStock = true - } - skuStockList = append(skuStockList, stock) - } - if storeSku.JdSyncStatus&(model.SyncFlagPriceMask|model.SyncFlagNewMask) != 0 { - skuPriceInfoList = append(skuPriceInfoList, &jdapi.SkuPriceInfo{ - OutSkuId: utils.Int2Str(storeSku.SkuID), - Price: jxutils.CaculateSkuVendorPrice(storeSku.Price, storeSku.PricePercentage), - }) - } - if storeSku.JdSyncStatus&(model.SyncFlagSaleMask|model.SyncFlagNewMask) != 0 { - vendibility := &jdapi.StockVendibility{ - OutSkuId: utils.Int2Str(storeSku.SkuID), - DoSale: true, - } - if storeSku.Status != model.StoreSkuBindStatusNormal { - vendibility.DoSale = false - } else if !alreadyAddStock { // 如果是设置可售则自动将库存加满 - stock := &jdapi.SkuStock{ - OutSkuId: utils.Int2Str(storeSku.SkuID), - StockQty: MaxStockQty, - } - skuStockList = append(skuStockList, stock) - } - skuVendibilityList = append(skuVendibilityList, vendibility) - } + sql := ` + SELECT t1.*, t2.price_percentage + FROM store_sku_bind t1 + JOIN store_map t2 ON t1.store_id = t2.store_id AND t2.vendor_id = ? AND t2.deleted_at = ? + ` + sqlWhere + var storeSkus []*tStoreSkuBindExt + sqlParams := []interface{}{ + model.VendorIDJD, + utils.DefaultTimeValue, + } + db := dao.GetDB() + if err = dao.GetRows(db, &storeSkus, sql, append(sqlParams, sqlWhereParams...)...); err != nil { + return "", err + } + outStationNo := utils.Int2Str(storeID) + task := tasksch.NewParallelTask("SyncStoresSkus inner", tasksch.NewParallelConfig().SetBatchSize(MaxSkuBatchSize), ctx.GetUserName(), func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { + var skuPriceInfoList []*jdapi.SkuPriceInfo + var skuVendibilityList []*jdapi.StockVendibility + var skuStockList []*jdapi.SkuStock + for _, v := range batchItemList { + storeSku := v.(*tStoreSkuBindExt) + alreadyAddStock := false + if storeSku.JdSyncStatus&model.SyncFlagChangedMask != 0 { + if storeSku.JdSyncStatus&(model.SyncFlagDeletedMask|model.SyncFlagNewMask) != 0 { // 关注或取消关注 + stock := &jdapi.SkuStock{ + OutSkuId: utils.Int2Str(storeSku.SkuID), + StockQty: MaxStockQty, } + if storeSku.DeletedAt != utils.DefaultTimeValue { + stock.StockQty = 0 + } else { + alreadyAddStock = true + } + skuStockList = append(skuStockList, stock) } - if globals.EnableStoreWrite { - // todo 以下可以优化为并行操作 - globals.SugarLogger.Debug(utils.Format4Output(skuVendibilityList, false), utils.Format4Output(skuPriceInfoList, false), utils.Format4Output(skuStockList, false)) - if len(skuVendibilityList) > 0 { - _, err = api.JdAPI.BatchUpdateVendibility(outStationNo, "", skuVendibilityList, ctx.GetUserName()) - } - if err == nil && len(skuPriceInfoList) > 0 { - _, err = api.JdAPI.UpdateVendorStationPrice(outStationNo, "", skuPriceInfoList) - } - if err == nil && len(skuStockList) > 0 { - _, err = api.JdAPI.BatchUpdateCurrentQtys(outStationNo, "", skuStockList, ctx.GetUserName()) - } + if storeSku.JdSyncStatus&(model.SyncFlagPriceMask|model.SyncFlagNewMask) != 0 { + skuPriceInfoList = append(skuPriceInfoList, &jdapi.SkuPriceInfo{ + OutSkuId: utils.Int2Str(storeSku.SkuID), + Price: jxutils.CaculateSkuVendorPrice(storeSku.Price, storeSku.PricePercentage), + }) } - return nil, err - }, storeSkus) - t.AddChild(task).Run() - if _, err = task.GetResult(0); err == nil { - sql := ` - UPDATE store_sku_bind t1 - SET t1.jd_sync_status = 0 - ` + sqlWhere - _, err = dao.ExecuteSQL(db, sql, sqlWhereParams...) + if storeSku.JdSyncStatus&(model.SyncFlagSaleMask|model.SyncFlagNewMask) != 0 { + vendibility := &jdapi.StockVendibility{ + OutSkuId: utils.Int2Str(storeSku.SkuID), + DoSale: true, + } + if storeSku.Status != model.StoreSkuBindStatusNormal { + vendibility.DoSale = false + } else if !alreadyAddStock { // 如果是设置可售则自动将库存加满 + stock := &jdapi.SkuStock{ + OutSkuId: utils.Int2Str(storeSku.SkuID), + StockQty: MaxStockQty, + } + skuStockList = append(skuStockList, stock) + } + skuVendibilityList = append(skuVendibilityList, vendibility) + } + } + } + if globals.EnableStoreWrite { + // todo 以下可以优化为并行操作 + globals.SugarLogger.Debug(utils.Format4Output(skuVendibilityList, false), utils.Format4Output(skuPriceInfoList, false), utils.Format4Output(skuStockList, false)) + if len(skuVendibilityList) > 0 { + _, err = api.JdAPI.BatchUpdateVendibility(outStationNo, "", skuVendibilityList, ctx.GetUserName()) + } + if err == nil && len(skuPriceInfoList) > 0 { + _, err = api.JdAPI.UpdateVendorStationPrice(outStationNo, "", skuPriceInfoList) + } + if err == nil && len(skuStockList) > 0 { + _, err = api.JdAPI.BatchUpdateCurrentQtys(outStationNo, "", skuStockList, ctx.GetUserName()) } } return nil, err - }, storeIDs) - + }, storeSkus) ctx.SetTaskOrAddChild(task, parentTask) task.Run() - if isAsync { - return task.ID, nil + if _, err = task.GetResult(0); err == nil { + sql := ` + UPDATE store_sku_bind t1 + SET t1.jd_sync_status = 0 + ` + sqlWhere + _, err = dao.ExecuteSQL(db, sql, sqlWhereParams...) } - _, err = task.GetResult(0) - return "", err + return task.ID, err }