From 93a720242395184c280fb2be18e29edd097ef8cc Mon Sep 17 00:00:00 2001 From: gazebo Date: Tue, 23 Oct 2018 16:34:42 +0800 Subject: [PATCH] - RefreshAllStoresID - RefreshAllSkusID - use new tasksch when possible(not use run directly). --- business/jxstore/cms/sync.go | 76 +++++++++-------- business/jxstore/financial/financial.go | 5 +- business/jxstore/promotion/jd_promotion.go | 18 ++-- business/jxstore/promotion/promotion.go | 9 +- business/jxutils/tasksch/parallel_task.go | 4 +- business/jxutils/tasksch/sequence_task.go | 4 +- business/jxutils/tasksch/task.go | 5 +- business/partner/partner.go | 5 +- business/partner/purchase/ebai/store.go | 5 ++ business/partner/purchase/ebai/store_sku.go | 92 ++++++++++++--------- business/partner/purchase/elm/store.go | 5 ++ business/partner/purchase/elm/store_sku.go | 5 ++ business/partner/purchase/jd/sku.go | 92 ++++++++++++--------- business/partner/purchase/jd/store.go | 54 ++++++++++++ controllers/cms_sku.go | 15 ---- controllers/cms_sync.go | 55 ++++++++++++ routers/commentsRouter_controllers.go | 24 ++++-- routers/router.go | 5 ++ 18 files changed, 323 insertions(+), 155 deletions(-) create mode 100644 controllers/cms_sync.go diff --git a/business/jxstore/cms/sync.go b/business/jxstore/cms/sync.go index 5af143ae7..5dff79bfd 100644 --- a/business/jxstore/cms/sync.go +++ b/business/jxstore/cms/sync.go @@ -122,7 +122,7 @@ func (v *VendorSync) GetSingleStoreHandler(vendorID int) partner.ISingleStoreHan func (v *VendorSync) syncCategories(ctx *jxcontext.Context, multiStoresHandler partner.IMultipleStoresHandler, db *dao.DaoDB, cats []*model.SkuCategory, userName string) (err error) { syncStatusFieldName := multiStoresHandler.GetFieldSyncStatusName() - task := tasksch.RunParallelTask("syncCategories", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { + task := tasksch.NewParallelTask("syncCategories", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { cat := batchItemList[0].(*model.SkuCategory) updateFields := []string{syncStatusFieldName} syncStatus := jxutils.GetObjFieldByName(cat, syncStatusFieldName).(int8) @@ -141,6 +141,7 @@ func (v *VendorSync) syncCategories(ctx *jxcontext.Context, multiStoresHandler p return nil, err }, cats) ctx.SetTaskOrAddChild(task) + task.Run() _, err = task.GetResult(0) return err } @@ -220,7 +221,7 @@ func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuI err := dao.GetEntitiesByKV(db, &skuList, cond, true) if err == nil { // globals.SugarLogger.Debug(utils.Format4Output(skuList, false)) - task := tasksch.RunParallelTask("SyncSku", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { + task := tasksch.NewParallelTask("SyncSku", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { sku := batchItemList[0].(*model.Sku) syncStatus := jxutils.GetObjFieldByName(sku, syncStatusFieldName).(int8) if (skuID == -1 || skuID == sku.ID) && (syncStatus != 0) { @@ -240,7 +241,7 @@ func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuI } return nil, err }, skuList) - t.AddChild(task) + t.AddChild(task).Run() _, err = task.GetResult(0) } return nil, err @@ -262,8 +263,9 @@ func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendo func (v *VendorSync) LoopStoreMap(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, userName string, storeID int, handler tasksch.WorkFunc) (hint string, err error) { storeMaps, err := GetStoreVendorMaps(ctx, db, storeID, -1) if err == nil { - task := tasksch.RunManagedParallelTask(taskName, nil, userName, handler, storeMaps) + task := tasksch.NewParallelTask(taskName, nil, userName, handler, storeMaps) ctx.SetTaskOrAddChild(task) + tasksch.ManageTask(task).Run() hint = task.ID if !isAsync { _, err = task.GetResult(0) @@ -276,8 +278,9 @@ func (v *VendorSync) LoopMultiStoresVendors(ctx *jxcontext.Context, db *dao.DaoD if taskName == "" { taskName = "LoopMultiStoresVendors" } - task := tasksch.RunManagedParallelTask(taskName, nil, userName, handler, v.MultiStoreVendorIDs) + task := tasksch.NewParallelTask(taskName, nil, userName, handler, v.MultiStoreVendorIDs) ctx.SetTaskOrAddChild(task) + tasksch.ManageTask(task).Run() if !isAsync { _, err = task.GetResult(0) } @@ -308,8 +311,9 @@ func (v *VendorSync) LoopStoreVendors(ctx *jxcontext.Context, db *dao.DaoDB, ven } allHandlers = allHandlers[:count] } - task := tasksch.RunManagedParallelTask(taskName, nil, userName, handler, allHandlers) + task := tasksch.NewParallelTask(taskName, nil, userName, handler, allHandlers) ctx.SetTaskOrAddChild(task) + tasksch.ManageTask(task).Run() if !isAsync { _, err = task.GetResult(0) } @@ -325,8 +329,9 @@ func (v *VendorSync) LoopSingleStoreVendors(ctx *jxcontext.Context, db *dao.DaoD SELECT * FROM store_map WHERE vendor_id IN (`+dao.GenQuestionMarks(len(v.SingleStoreVendorIDs))+")", v.SingleStoreVendorIDs); err == nil { - task := tasksch.RunManagedParallelTask(taskName, nil, userName, handler, storeMaps) + task := tasksch.NewParallelTask(taskName, nil, userName, handler, storeMaps) ctx.SetTaskOrAddChild(task) + tasksch.ManageTask(task).Run() hint = task.ID if !isAsync { _, err = task.GetResult(0) @@ -335,35 +340,40 @@ func (v *VendorSync) LoopSingleStoreVendors(ctx *jxcontext.Context, db *dao.DaoD return hint, makeSyncError(err) } -func (v *VendorSync) RefreshSkuIDs(ctx *jxcontext.Context, nameID, skuID int, userName string) (err error) { - sql := ` - SELECT t1.id - FROM sku t1 - JOIN sku_name t2 ON t1.name_id = t2.id - WHERE 1 = 1 - ` - sqlParams := []interface{}{} - if nameID != -1 { - sql += " AND t1.name_id = ?" - sqlParams = append(sqlParams, nameID) - } - if skuID != -1 { - sql += " AND t1.id = ?" - sqlParams = append(sqlParams, skuID) +func (v *VendorSync) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int, storeIDs []int) (hint string, err error) { + task := tasksch.NewParallelTask("RefreshAllSkusID", nil, ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + vendorID := batchItemList[0].(int) + if handler := v.GetStoreHandler(vendorID); handler != nil { + if multiHandler, ok := handler.(partner.IMultipleStoresHandler); ok { + _, err = multiHandler.RefreshAllSkusID(ctx, false) + } else if singleHandler, ok := handler.(partner.ISingleStoreHandler); ok { + _, err = singleHandler.RefreshStoresAllSkusID(ctx, false, storeIDs) + } + } + return nil, err + }, vendorIDs) + ctx.SetTaskOrAddChild(task) + tasksch.ManageTask(task).Run() + if !isAsync { + _, err = task.GetResult(0) } + return task.ID, err +} - var ids []int - db := dao.GetDB() - if err = dao.GetRows(db, &ids, sql, sqlParams); err == nil { - // globals.SugarLogger.Debug(utils.Format4Output(ids, false)) - _, err = v.LoopMultiStoresVendors(ctx, db, "RefreshSkuIDs", false, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { - multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int)) - err := multiStoresHandler.SyncSkusIDMap(db, ids, userName) - globals.SugarLogger.Debug(err) - return nil, err - }) +func (v *VendorSync) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int) (hint string, err error) { + task := tasksch.NewParallelTask("RefreshAllStoresID", nil, ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + vendorID := batchItemList[0].(int) + if handler := v.GetStoreHandler(vendorID); handler != nil { + _, err = handler.RefreshAllStoresID(ctx, false) + } + return nil, err + }, vendorIDs) + ctx.SetTaskOrAddChild(task) + tasksch.ManageTask(task).Run() + if !isAsync { + _, err = task.GetResult(0) } - return err + return task.ID, err } func makeSyncError(err error) (newErr error) { diff --git a/business/jxstore/financial/financial.go b/business/jxstore/financial/financial.go index 0f6336994..524ddabf1 100644 --- a/business/jxstore/financial/financial.go +++ b/business/jxstore/financial/financial.go @@ -52,7 +52,7 @@ func SendFilesToStores(ctx *jxcontext.Context, files []*multipart.FileHeader, is } upToken := putPolicy.UploadToken(api.QiniuAPI) cfg := &storage.Config{} - task := tasksch.RunManagedParallelTask("SendFilesToStores", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := tasksch.NewParallelTask("SendFilesToStores", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { fileInfo := batchItemList[0].(*tUploadFileInfo) fileHeader := fileInfo.FileHeader storeID := fileInfo.StoreID @@ -83,7 +83,8 @@ func SendFilesToStores(ctx *jxcontext.Context, files []*multipart.FileHeader, is } return retVal, err }, fileList) - + ctx.SetTaskOrAddChild(task) + tasksch.ManageTask(task).Run() hint = task.ID if !isAsync { _, err = task.GetResult(0) diff --git a/business/jxstore/promotion/jd_promotion.go b/business/jxstore/promotion/jd_promotion.go index ba347b970..c4c14b078 100644 --- a/business/jxstore/promotion/jd_promotion.go +++ b/business/jxstore/promotion/jd_promotion.go @@ -259,9 +259,9 @@ func CreateJdPromotion(ctx *jxcontext.Context, isIDJd bool, isAsync bool, params } dao.Commit(db) - rootTask := tasksch.RunManagedSeqTask("CreateJdPromotion", userName, func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { + rootTask := tasksch.NewSeqTask("CreateJdPromotion", userName, func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { if step == 0 { - task1 := tasksch.RunParallelTask("CreateJdPromotion update sku price", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task1 := tasksch.NewParallelTask("CreateJdPromotion update sku price", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { storeID := batchItemList[0].(int) modifyPricesList := jxutils.SplitSlice(modifyPricesList[storeID], jdapi.MaxStoreSkuBatchSize) for _, modifyPrices := range modifyPricesList { @@ -275,7 +275,7 @@ func CreateJdPromotion(ctx *jxcontext.Context, isIDJd bool, isAsync bool, params } return nil, nil }, jxStoreIDs) - task.AddChild(task1) + task.AddChild(task1).Run() if _, err = task1.GetResult(0); err != nil { return "", err } @@ -284,7 +284,7 @@ func CreateJdPromotion(ctx *jxcontext.Context, isIDJd bool, isAsync bool, params return "", err } } else if step == 2 { - task2 := tasksch.RunParallelTask("CreateJdPromotion CreatePromotionSku", tasksch.NewParallelConfig().SetBatchSize(jdapi.MaxPromotionSkuCount), userName, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task2 := tasksch.NewParallelTask("CreateJdPromotion CreatePromotionSku", tasksch.NewParallelConfig().SetBatchSize(jdapi.MaxPromotionSkuCount), userName, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { skus := make([]map[string]interface{}, len(batchItemList)) for k, v := range batchItemList { skus[k] = v.(map[string]interface{}) @@ -292,7 +292,7 @@ func CreateJdPromotion(ctx *jxcontext.Context, isIDJd bool, isAsync bool, params _, err = promotionHandler.CreatePromotionSku(infoId, "", skus) return nil, err }, promotionPrices) - task.AddChild(task2) + task.AddChild(task2).Run() if _, err = task2.GetResult(0); err != nil { return "", err } @@ -301,6 +301,8 @@ func CreateJdPromotion(ctx *jxcontext.Context, isIDJd bool, isAsync bool, params } return nil, err }, 4) + ctx.SetTaskOrAddChild(rootTask) + tasksch.ManageTask(rootTask).Run() if !isAsync { _, err = rootTask.GetResult(0) } @@ -310,7 +312,7 @@ func CreateJdPromotion(ctx *jxcontext.Context, isIDJd bool, isAsync bool, params func CreatePromotionByExcel(ctx *jxcontext.Context, isAsync bool, promotionType int, fileHeader *multipart.FileHeader, userName string) (hint string, err error) { file, err := fileHeader.Open() if err != nil { - return hint, err + return "", err } contents := excel.Excel2Slice(file) file.Close() @@ -321,10 +323,10 @@ func CreatePromotionByExcel(ctx *jxcontext.Context, isAsync bool, promotionType Type: promotionType, } if promotionParams.BeginAt, err = excelStr2Time(v[1][colBeginAtIndex]); err != nil { - return hint, err + return "", err } if promotionParams.EndAt, err = excelStr2Time(v[1][colEndAtIndex]); err != nil { - return hint, err + return "", err } for rowIndex, row := range v { diff --git a/business/jxstore/promotion/promotion.go b/business/jxstore/promotion/promotion.go index f8bb96924..8df86eacf 100644 --- a/business/jxstore/promotion/promotion.go +++ b/business/jxstore/promotion/promotion.go @@ -51,7 +51,7 @@ func SendAdvertingByGoodsOrder(ctx *jxcontext.Context, advertising string, days var mobiles []*tMobileInfo db := dao.GetDB() if err = dao.GetRows(db, &mobiles, sql1+" UNION DISTINCT "+sql2, sqlParams...); err != nil { - return hint, err + return "", err } index := 0 mobileNumbers := make([]string, len(mobiles)) @@ -65,7 +65,7 @@ func SendAdvertingByGoodsOrder(ctx *jxcontext.Context, advertising string, days mobileNumbers = append(mobileNumbers, "18180948107") smsClient := aliyunsmsclient.New("http://dysmsapi.aliyuncs.com/") - task := tasksch.RunManagedParallelTask("SendAdvertingByGoodsOrder", tasksch.NewParallelConfig().SetBatchSize(MaxBatchSize), userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := tasksch.NewParallelTask("SendAdvertingByGoodsOrder", tasksch.NewParallelConfig().SetBatchSize(MaxBatchSize), userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { numbers := make([]string, len(batchItemList)) for k, v := range batchItemList { numbers[k] = v.(string) @@ -79,10 +79,11 @@ func SendAdvertingByGoodsOrder(ctx *jxcontext.Context, advertising string, days } return nil, err }, mobileNumbers) + ctx.SetTaskOrAddChild(task) + tasksch.ManageTask(task).Run() - hint = task.ID if !isAsync { _, err = task.GetResult(0) } - return hint, err + return task.ID, err } diff --git a/business/jxutils/tasksch/parallel_task.go b/business/jxutils/tasksch/parallel_task.go index 18d012c62..c2d34a11a 100644 --- a/business/jxutils/tasksch/parallel_task.go +++ b/business/jxutils/tasksch/parallel_task.go @@ -193,7 +193,7 @@ func (task *ParallelTask) Run() { }) } -func (t *ParallelTask) AddChild(task ITask) { - t.BaseTask.AddChild(task) +func (t *ParallelTask) AddChild(task ITask) ITask { task.SetParent(t) + return t.BaseTask.AddChild(task) } diff --git a/business/jxutils/tasksch/sequence_task.go b/business/jxutils/tasksch/sequence_task.go index 5e13fea24..70c51a937 100644 --- a/business/jxutils/tasksch/sequence_task.go +++ b/business/jxutils/tasksch/sequence_task.go @@ -76,7 +76,7 @@ func (task *SeqTask) Run() { }) } -func (t *SeqTask) AddChild(task ITask) { - t.BaseTask.AddChild(task) +func (t *SeqTask) AddChild(task ITask) ITask { task.SetParent(t) + return t.BaseTask.AddChild(task) } diff --git a/business/jxutils/tasksch/task.go b/business/jxutils/tasksch/task.go index 29d0f4cff..9ca5f9fc2 100644 --- a/business/jxutils/tasksch/task.go +++ b/business/jxutils/tasksch/task.go @@ -36,7 +36,7 @@ type ITask interface { GetStatus() int GetCreatedAt() time.Time - AddChild(task ITask) + AddChild(task ITask) ITask GetChildren() TaskList SetParent(parentTask ITask) @@ -182,11 +182,12 @@ func (t *BaseTask) GetStatus() int { return t.Status } -func (t *BaseTask) AddChild(task ITask) { +func (t *BaseTask) AddChild(task ITask) ITask { t.locker.Lock() defer t.locker.Unlock() t.Children = append(t.Children, task) + return task } func (t *BaseTask) GetChildren() (children TaskList) { diff --git a/business/partner/partner.go b/business/partner/partner.go index be25f508b..b1f6ccd7f 100644 --- a/business/partner/partner.go +++ b/business/partner/partner.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/dao" ) @@ -80,6 +81,7 @@ type IPurchasePlatformHandler interface { // CloseStore(vendorStoreID, closeNotice, userName string) error SyncStoresSkus(db *dao.DaoDB, storeIDs []int, skuIDs []int, isAsync bool, userName string) (hint string, err error) + RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool) (hint string, err error) GetVendorID() int GetFieldIDName() string @@ -102,7 +104,7 @@ type IMultipleStoresHandler interface { UpdateSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) DeleteSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) - SyncSkusIDMap(db *dao.DaoDB, skuIDs []int, userName string) (err error) + RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool) (hint string, err error) } type ISingleStoreHandler interface { @@ -111,6 +113,7 @@ type ISingleStoreHandler interface { ReadStoreCategories(storeID int) (cats []*model.SkuCategory, err error) ReadStoreSku(storeID, skuID int) (skuNameExt *model.SkuNameExt, err error) + RefreshStoresAllSkusID(ctx *jxcontext.Context, isAsync bool, storeIDs []int) (hint string, err error) } type IDeliveryPlatformHandler interface { diff --git a/business/partner/purchase/ebai/store.go b/business/partner/purchase/ebai/store.go index 8c78b1a41..6c1f712ac 100644 --- a/business/partner/purchase/ebai/store.go +++ b/business/partner/purchase/ebai/store.go @@ -9,6 +9,7 @@ import ( "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxcallback/scheduler" "git.rosy.net.cn/jx-callback/business/jxutils" + "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/dao" "git.rosy.net.cn/jx-callback/business/partner" @@ -179,6 +180,10 @@ func (p *PurchaseHandler) UpdateStore(db *dao.DaoDB, storeID int, userName strin return err } +func (p *PurchaseHandler) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool) (hint string, err error) { + return hint, err +} + func EbaiDeliveryType2Jx(deliveryType string) int8 { spIndex := strings.Index(deliveryType, "|") baiduDeliveryType := utils.Str2Int64(deliveryType[spIndex+1:]) diff --git a/business/partner/purchase/ebai/store_sku.go b/business/partner/purchase/ebai/store_sku.go index 08d24fbb6..8776a52f4 100644 --- a/business/partner/purchase/ebai/store_sku.go +++ b/business/partner/purchase/ebai/store_sku.go @@ -8,6 +8,7 @@ import ( "git.rosy.net.cn/baseapi/platformapi/ebaiapi" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxutils" + "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" "git.rosy.net.cn/jx-callback/business/jxutils/tasksch" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/dao" @@ -72,18 +73,16 @@ var ( ) func (p *PurchaseHandler) SyncStoresSkus(db *dao.DaoDB, storeIDs []int, skuIDs []int, isAsync bool, userName string) (hint string, err error) { - if globals.EnableStoreWrite { - task := tasksch.RunSeqTask("ebai.SyncStoresSkus", userName, func(t *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { - storeID := storeIDs[step] - err = p.syncOneStoreSkus(t, db, storeID, skuIDs, false, userName) - return nil, err - }, len(storeIDs)) - hint = task.ID - if !isAsync { - _, err = task.GetResult(0) - } + task := tasksch.RunSeqTask("ebai.SyncStoresSkus", userName, func(t *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { + storeID := storeIDs[step] + err = p.syncOneStoreSkus(t, db, storeID, skuIDs, false, userName) + return nil, err + }, len(storeIDs)) + + if !isAsync { + _, err = task.GetResult(0) } - return hint, err + return task.ID, err } func (p *PurchaseHandler) syncOneStoreSkus(parentTask tasksch.ITask, db *dao.DaoDB, storeID int, skuIDs []int, isAsync bool, userName string) (err error) { @@ -145,33 +144,34 @@ func (p *PurchaseHandler) syncOneStoreSkus(parentTask tasksch.ITask, db *dao.Dao return err } if err = dao.GetRows(db, &storeSkuInfoList, sql, sqlParams...); err == nil { - task := tasksch.RunParallelTask("syncOneStoreSkus skus", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := tasksch.NewParallelTask("syncOneStoreSkus skus", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { storeSku := batchItemList[0].(*tStoreSkuFullInfo) updateFields := []string{model.FieldEbaiSyncStatus} - if storeSku.EbaiSyncStatus&model.SyncFlagDeletedMask != 0 { - err = api.EbaiAPI.SkuDelete(strStoreID, utils.Int64ToStr(storeSku.EbaiID)) - } else if storeSku.EbaiSyncStatus&model.SyncFlagNewMask != 0 { - // globals.SugarLogger.Debug(utils.Format4Output(genSkuParamsFromStoreSkuInfo(storeSku), false)) - if storeSku.EbaiID, err = api.EbaiAPI.SkuCreate(strStoreID, storeSku.SkuID, genSkuParamsFromStoreSkuInfo(storeSku)); err == nil { - // todo 创建SKU后马上绑定分类,会失败,待解决 - updateFields = append(updateFields, model.FieldEbaiID) - time.AfterFunc(3*time.Second, func() { - api.EbaiAPI.SkuShopCategoryMap(strStoreID, storeSku.EbaiID, utils.Int64ToStr(storeSku.CatEbaiID)) - }) - } - } else if storeSku.EbaiSyncStatus&model.SyncFlagModifiedMask != 0 { - if _, err = api.EbaiAPI.SkuUpdate(strStoreID, storeSku.EbaiID, genSkuParamsFromStoreSkuInfo(storeSku)); err == nil { - err = api.EbaiAPI.SkuShopCategoryMap(strStoreID, storeSku.EbaiID, utils.Int64ToStr(storeSku.CatEbaiID)) + if globals.EnableStoreWrite { + if storeSku.EbaiSyncStatus&model.SyncFlagDeletedMask != 0 { + err = api.EbaiAPI.SkuDelete(strStoreID, utils.Int64ToStr(storeSku.EbaiID)) + } else if storeSku.EbaiSyncStatus&model.SyncFlagNewMask != 0 { + // globals.SugarLogger.Debug(utils.Format4Output(genSkuParamsFromStoreSkuInfo(storeSku), false)) + if storeSku.EbaiID, err = api.EbaiAPI.SkuCreate(strStoreID, storeSku.SkuID, genSkuParamsFromStoreSkuInfo(storeSku)); err == nil { + // todo 创建SKU后马上绑定分类,会失败,待解决 + updateFields = append(updateFields, model.FieldEbaiID) + time.AfterFunc(3*time.Second, func() { + api.EbaiAPI.SkuShopCategoryMap(strStoreID, storeSku.EbaiID, utils.Int64ToStr(storeSku.CatEbaiID)) + }) + } + } else if storeSku.EbaiSyncStatus&model.SyncFlagModifiedMask != 0 { + if _, err = api.EbaiAPI.SkuUpdate(strStoreID, storeSku.EbaiID, genSkuParamsFromStoreSkuInfo(storeSku)); err == nil { + err = api.EbaiAPI.SkuShopCategoryMap(strStoreID, storeSku.EbaiID, utils.Int64ToStr(storeSku.CatEbaiID)) + } } } - if err == nil { storeSku.EbaiSyncStatus = 0 _, err = dao.UpdateEntity(nil, &storeSku.StoreSkuBind, updateFields...) } return nil, err }, storeSkuInfoList) - parentTask.AddChild(task) + parentTask.AddChild(task).Run() _, err = task.GetResult(0) } } @@ -246,7 +246,10 @@ func (p *PurchaseHandler) DeleteRemoteSkus(storeID int, vendorSkuIDs []string) ( for k, v := range batchItemList { strList[k] = v.(string) } - return nil, api.EbaiAPI.SkuDelete(utils.Int2Str(storeID), strings.Join(strList, ",")) + if globals.EnableStoreWrite { + err = api.EbaiAPI.SkuDelete(utils.Int2Str(storeID), strings.Join(strList, ",")) + } + return nil, err }, vendorSkuIDs) _, err = task.GetResult(0) return err @@ -264,12 +267,19 @@ func (p *PurchaseHandler) DeleteRemoteCategories(storeID int, vendorCatIDs []int } } task := tasksch.RunParallelTask("DeleteRemoteCategories", nil, "", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { - return nil, api.EbaiAPI.ShopCategoryDelete(strStoreID, batchItemList[0].(int64)) + if globals.EnableStoreWrite { + err = api.EbaiAPI.ShopCategoryDelete(strStoreID, batchItemList[0].(int64)) + } + return nil, err }, vendorCatIDs) _, err = task.GetResult(0) return err } +func (p *PurchaseHandler) RefreshStoresAllSkusID(ctx *jxcontext.Context, isAsync bool, storeIDs []int) (hint string, err error) { + return hint, err +} + /////////// func genSkuParamsFromStoreSkuInfo(storeSku *tStoreSkuFullInfo) map[string]interface{} { return map[string]interface{}{ @@ -384,19 +394,21 @@ func (p *PurchaseHandler) SyncOneStoreCategories(db *dao.DaoDB, storeID int, use updateFields := []string{model.FieldEbaiSyncStatus} catInfo := batchItemList[0].(*tStoreCatInfo) // globals.SugarLogger.Debug(utils.Format4Output(catInfo, false)) - db2 := dao.GetDB() - if catInfo.EbaiSyncStatus&model.SyncFlagDeletedMask != 0 { // 删除 - err = api.EbaiAPI.ShopCategoryDelete(strStoreID, catInfo.EbaiID) - } else if catInfo.EbaiSyncStatus&model.SyncFlagNewMask != 0 { // 新增 - ebaiID, err2 := api.EbaiAPI.ShopCategoryCreate(strStoreID, catInfo.ParentEbaiID, formatName(catInfo.Name), int(catInfo.Seq+1), utils.Int2Str(catInfo.CategoryID)) - if err = err2; err == nil { - catInfo.EbaiID = ebaiID - updateFields = append(updateFields, model.FieldEbaiID) + if globals.EnableStoreWrite { + if catInfo.EbaiSyncStatus&model.SyncFlagDeletedMask != 0 { // 删除 + err = api.EbaiAPI.ShopCategoryDelete(strStoreID, catInfo.EbaiID) + } else if catInfo.EbaiSyncStatus&model.SyncFlagNewMask != 0 { // 新增 + ebaiID, err2 := api.EbaiAPI.ShopCategoryCreate(strStoreID, catInfo.ParentEbaiID, formatName(catInfo.Name), int(catInfo.Seq+1), utils.Int2Str(catInfo.CategoryID)) + if err = err2; err == nil { + catInfo.EbaiID = ebaiID + updateFields = append(updateFields, model.FieldEbaiID) + } + } else if catInfo.EbaiSyncStatus&model.SyncFlagModifiedMask != 0 { // 修改 + err = api.EbaiAPI.ShopCategoryUpdate(strStoreID, catInfo.EbaiID, formatName(catInfo.Name), int(catInfo.Seq+1), utils.Int2Str(catInfo.CategoryID)) } - } else if catInfo.EbaiSyncStatus&model.SyncFlagModifiedMask != 0 { // 修改 - err = api.EbaiAPI.ShopCategoryUpdate(strStoreID, catInfo.EbaiID, formatName(catInfo.Name), int(catInfo.Seq+1), utils.Int2Str(catInfo.CategoryID)) } if err == nil { + db2 := dao.GetDB() catInfo.EbaiSyncStatus = 0 _, err = dao.UpdateEntity(db2, &catInfo.StoreSkuCategoryMap, updateFields...) } diff --git a/business/partner/purchase/elm/store.go b/business/partner/purchase/elm/store.go index 4cdd18307..f2a751d24 100644 --- a/business/partner/purchase/elm/store.go +++ b/business/partner/purchase/elm/store.go @@ -1,6 +1,7 @@ package elm import ( + "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/dao" ) @@ -21,6 +22,10 @@ func (p *PurchaseHandler) UpdateStore(db *dao.DaoDB, storeID int, userName strin return nil } +func (p *PurchaseHandler) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool) (hint string, err error) { + return hint, err +} + // func (p *PurchaseHandler) EnableAutoAcceptOrder(vendorStoreID string, isEnabled bool) error { // return nil // } diff --git a/business/partner/purchase/elm/store_sku.go b/business/partner/purchase/elm/store_sku.go index cd848ccc4..7f37e428c 100644 --- a/business/partner/purchase/elm/store_sku.go +++ b/business/partner/purchase/elm/store_sku.go @@ -1,6 +1,7 @@ package elm import ( + "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/dao" ) @@ -19,3 +20,7 @@ func (p *PurchaseHandler) ReadStoreSku(storeID, skuID int) (skuNameExt *model.Sk func (p *PurchaseHandler) SyncStoresSkus(db *dao.DaoDB, storeIDs []int, skuIDs []int, isAsync bool, userName string) (hint string, err error) { return hint, err } + +func (p *PurchaseHandler) RefreshStoresAllSkusID(ctx *jxcontext.Context, isAsync bool, storeIDs []int) (hint string, err error) { + return hint, err +} diff --git a/business/partner/purchase/jd/sku.go b/business/partner/purchase/jd/sku.go index 547416e63..ca943cb48 100644 --- a/business/partner/purchase/jd/sku.go +++ b/business/partner/purchase/jd/sku.go @@ -6,6 +6,7 @@ import ( "git.rosy.net.cn/baseapi/platformapi/jdapi" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxutils" + "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" "git.rosy.net.cn/jx-callback/business/jxutils/tasksch" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/dao" @@ -253,48 +254,63 @@ func (p *PurchaseHandler) DeleteSku(db *dao.DaoDB, sku *model.Sku, userName stri return err } -func (p *PurchaseHandler) SyncSkusIDMap(db *dao.DaoDB, skuIDs []int, userName string) (err error) { - sql := ` - SELECT t1.id out_sku_id, t1.jd_id sku_id - FROM sku t1 - WHERE t1.jd_sync_status <> 0 - ` - // AND t1.jd_sync_status <> 0 - sqlParams := []interface{}{} - if skuIDs != nil && len(skuIDs) > 0 { - sql += " AND t1.id IN (" + dao.GenQuestionMarks(len(skuIDs)) + ")" - sqlParams = append(sqlParams, skuIDs) - } +func (p *PurchaseHandler) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool) (hint string, err error) { + globals.SugarLogger.Debugf("jd RefreshAllSkusID") + + db := dao.GetDB() var skuPairs []*jdapi.SkuIDPair - if err = dao.GetRows(db, &skuPairs, sql, sqlParams); err == nil { - // globals.SugarLogger.Debug(utils.Format4Output(skuPairs, false)) - globals.SugarLogger.Debug(len(skuPairs)) - task := tasksch.RunParallelTask("SyncSkusIDMap", tasksch.NewParallelConfig().SetIsContinueWhenError(true), userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { - skuPairs := make([]*jdapi.SkuIDPair, len(batchItemList)) - for k, v := range batchItemList { - pair := v.(*jdapi.SkuIDPair) - skuPairs[k] = &jdapi.SkuIDPair{ - SkuId: pair.SkuId, - OutSkuId: pair.OutSkuId, //utils.GetUUID(), + + rootTask := tasksch.NewSeqTask("jd RefreshAllSkusID", ctx.GetUserName(), func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { + switch step { + case 0: + err = dao.GetRows(db, &skuPairs, ` + SELECT t1.id out_sku_id, t1.jd_id sku_id + FROM sku t1 + WHERE t1.deleted_at = ? + `, utils.DefaultTimeValue) + case 1: + task1 := tasksch.NewParallelTask("RefreshAllSkusID update uuid", tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx.GetUserName(), func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + skuPairs := make([]*jdapi.SkuIDPair, len(batchItemList)) + for k, v := range batchItemList { + pair := v.(*jdapi.SkuIDPair) + skuPairs[k] = &jdapi.SkuIDPair{ + SkuId: pair.SkuId, + OutSkuId: utils.GetUUID(), + } } - } - if globals.EnableStoreWrite { - if _, err = api.JdAPI.BatchUpdateOutSkuId(skuPairs); err == nil { - // for k, v := range batchItemList { - // pair := v.(*jdapi.SkuIDPair) - // skuPairs[k].OutSkuId = pair.OutSkuId - // } - // _, err = api.JdAPI.BatchUpdateOutSkuId(skuPairs) + if globals.EnableStoreWrite { + _, err = api.JdAPI.BatchUpdateOutSkuId(skuPairs) } - } - if err != nil { - globals.SugarLogger.Debug(utils.Format4Output(skuPairs, false)) - } - return nil, err - }, skuPairs) - _, err = task.GetResult(0) + return nil, err + }, skuPairs) + rootTask.AddChild(task1).Run() + _, err = task1.GetResult(0) + case 2: + task2 := tasksch.NewParallelTask("RefreshAllSkusID update id", tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx.GetUserName(), func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + skuPairs := make([]*jdapi.SkuIDPair, len(batchItemList)) + for k, v := range batchItemList { + pair := v.(*jdapi.SkuIDPair) + skuPairs[k] = &jdapi.SkuIDPair{ + SkuId: pair.SkuId, + OutSkuId: pair.OutSkuId, + } + } + if globals.EnableStoreWrite { + _, err = api.JdAPI.BatchUpdateOutSkuId(skuPairs) + } + return nil, err + }, skuPairs) + rootTask.AddChild(task2).Run() + _, err = task2.GetResult(0) + } + return nil, err + }, 3) + ctx.SetTaskOrAddChild(rootTask) + rootTask.Run() + if !isAsync { + _, err = rootTask.GetResult(0) } - return err + return rootTask.ID, err } func jdStatus2jxStatus(jdStatus int) (jxStatus int) { diff --git a/business/partner/purchase/jd/store.go b/business/partner/purchase/jd/store.go index 8d7670ee4..48029aef1 100644 --- a/business/partner/purchase/jd/store.go +++ b/business/partner/purchase/jd/store.go @@ -3,6 +3,8 @@ package jd import ( "git.rosy.net.cn/jx-callback/business/jxcallback/scheduler" "git.rosy.net.cn/jx-callback/business/jxutils" + "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" + "git.rosy.net.cn/jx-callback/business/jxutils/tasksch" "git.rosy.net.cn/jx-callback/business/model/dao" "git.rosy.net.cn/jx-callback/business/partner" "git.rosy.net.cn/jx-callback/globals" @@ -188,6 +190,58 @@ func (p *PurchaseHandler) GetAllStoresFromRemote() ([]*model.Store, error) { return nil, err } +func (p *PurchaseHandler) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool) (hint string, err error) { + globals.SugarLogger.Debugf("jd RefreshAllStoresID") + + var stores []*tJdStoreInfo + db := dao.GetDB() + rootTask := tasksch.NewSeqTask("jd RefreshAllStoresID", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { + switch step { + case 0: + err = dao.GetRows(db, &stores, ` + SELECT t1.*, t2.vendor_store_id + FROM store t1 + JOIN store_map t2 ON t1.id = t2.store_id AND t2.deleted_at = ? + WHERE t1.deleted_at = ? + `, utils.DefaultTimeValue, utils.DefaultTimeValue) + case 1: + task1 := tasksch.NewParallelTask("jd RefreshAllStoresID update to uuid", nil, ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + store := batchItemList[0].(*tJdStoreInfo) + storeParams := map[string]interface{}{ + "outSystemId": utils.GetUUID(), + } + if globals.EnableStoreWrite { + err = api.JdAPI.UpdateStoreInfo4Open(store.VendorStoreID, ctx.GetUserName(), storeParams) + } + return nil, err + }, stores) + task.AddChild(task1).Run() + _, err = task1.GetResult(0) + case 2: + task2 := tasksch.NewParallelTask("jd RefreshAllStoresID update outSystemId", nil, ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + store := batchItemList[0].(*tJdStoreInfo) + storeParams := map[string]interface{}{ + "outSystemId": store.ID, + } + if globals.EnableStoreWrite { + err = api.JdAPI.UpdateStoreInfo4Open(store.VendorStoreID, ctx.GetUserName(), storeParams) + } + return nil, err + }, stores) + task.AddChild(task2).Run() + _, err = task2.GetResult(0) + } + return nil, err + }, 3) + + ctx.SetTaskOrAddChild(rootTask) + rootTask.Run() + if !isAsync { + _, err = rootTask.GetResult(0) + } + return rootTask.ID, err +} + // func JdRange2JxRange(jdRanges string) (jxRanges string) { // coords := strings.Split(jdRanges, ";") // intCoords := []string{} diff --git a/controllers/cms_sku.go b/controllers/cms_sku.go index 7eea1c905..36b0d9d60 100644 --- a/controllers/cms_sku.go +++ b/controllers/cms_sku.go @@ -317,18 +317,3 @@ func (c *SkuController) SyncSku() { return retVal, "", err }) } - -// @Title 重新刷新商家ID -// @Description 重新刷新商家ID -// @Param token header string true "认证token" -// @Param nameID query int true "name ID, -1表示所有" -// @Param skuID query int true "sku ID, -1表示所有" -// @Success 200 {object} controllers.CallResult -// @Failure 200 {object} controllers.CallResult -// @router /RefreshSkuIDs [put] -func (c *SkuController) RefreshSkuIDs() { - c.callRefreshSkuIDs(func(params *tSkuRefreshSkuIDsParams) (retVal interface{}, errCode string, err error) { - err = cms.CurVendorSync.RefreshSkuIDs(params.Ctx, params.NameID, params.SkuID, params.Ctx.GetUserName()) - return retVal, "", err - }) -} diff --git a/controllers/cms_sync.go b/controllers/cms_sync.go new file mode 100644 index 000000000..e8dfa4839 --- /dev/null +++ b/controllers/cms_sync.go @@ -0,0 +1,55 @@ +package controllers + +import ( + "git.rosy.net.cn/baseapi/utils" + "git.rosy.net.cn/jx-callback/business/jxstore/cms" + "github.com/astaxie/beego" +) + +type SyncController struct { + beego.Controller +} + +// @Title 查询长时间运行任务 +// @Description 查询长时间运行任务 +// @Param token header string true "认证token" +// @Param vendorIDs query string true "需要刷新的厂商ID列表" +// @Param isAsync query bool false "起始状态" +// @Success 200 {object} controllers.CallResult +// @Failure 200 {object} controllers.CallResult +// @router /RefreshAllStoresID [put] +func (c *SyncController) RefreshAllStoresID() { + c.callRefreshAllStoresID(func(params *tSyncRefreshAllStoresIDParams) (retVal interface{}, errCode string, err error) { + var vendorIDs []int + if err = utils.UnmarshalUseNumber([]byte(params.VendorIDs), &vendorIDs); err != nil { + return retVal, "", err + } + retVal, err = cms.CurVendorSync.RefreshAllStoresID(params.Ctx, params.IsAsync, vendorIDs) + return retVal, "", err + }) +} + +// @Title 重新刷新商家ID +// @Description 重新刷新商家ID,单门店厂商必须指定storeIDs +// @Param token header string true "认证token" +// @Param vendorIDs query string true "需要刷新的厂商ID列表" +// @Param storeIDs query string false "需要刷新的门店ID列表(对于单门店必须指定)" +// @Param isAsync query bool false "起始状态" +// @Success 200 {object} controllers.CallResult +// @Failure 200 {object} controllers.CallResult +// @router /RefreshAllSkusID [put] +func (c *SyncController) RefreshAllSkusID() { + c.callRefreshAllSkusID(func(params *tSyncRefreshAllSkusIDParams) (retVal interface{}, errCode string, err error) { + var vendorIDs, storeIDs []int + if err = utils.UnmarshalUseNumber([]byte(params.VendorIDs), &vendorIDs); err != nil { + return retVal, "", err + } + if params.StoreIDs != "" { + if err = utils.UnmarshalUseNumber([]byte(params.StoreIDs), &storeIDs); err != nil { + return retVal, "", err + } + } + retVal, err = cms.CurVendorSync.RefreshAllSkusID(params.Ctx, params.IsAsync, vendorIDs, storeIDs) + return retVal, "", err + }) +} diff --git a/routers/commentsRouter_controllers.go b/routers/commentsRouter_controllers.go index b670f61a4..419ae2572 100644 --- a/routers/commentsRouter_controllers.go +++ b/routers/commentsRouter_controllers.go @@ -319,14 +319,6 @@ func init() { MethodParams: param.Make(), Params: nil}) - beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:SkuController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:SkuController"], - beego.ControllerComments{ - Method: "RefreshSkuIDs", - Router: `/RefreshSkuIDs`, - AllowHTTPMethods: []string{"put"}, - MethodParams: param.Make(), - Params: nil}) - beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:SkuController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:SkuController"], beego.ControllerComments{ Method: "ReorderCategories", @@ -551,6 +543,22 @@ func init() { MethodParams: param.Make(), Params: nil}) + beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:SyncController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:SyncController"], + beego.ControllerComments{ + Method: "RefreshAllSkusID", + Router: `/RefreshAllSkusID`, + AllowHTTPMethods: []string{"put"}, + MethodParams: param.Make(), + Params: nil}) + + beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:SyncController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:SyncController"], + beego.ControllerComments{ + Method: "RefreshAllStoresID", + Router: `/RefreshAllStoresID`, + AllowHTTPMethods: []string{"put"}, + MethodParams: param.Make(), + Params: nil}) + beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:TaskController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:TaskController"], beego.ControllerComments{ Method: "CancelTask", diff --git a/routers/router.go b/routers/router.go index e1940ad3a..f7e944a4b 100644 --- a/routers/router.go +++ b/routers/router.go @@ -66,6 +66,11 @@ func init() { &controllers.PromotionController{}, ), ), + beego.NSNamespace("/sync", + beego.NSInclude( + &controllers.SyncController{}, + ), + ), ) beego.AddNamespace(ns)