From eac24b9a5abd95ed6ad1d4c0bf59e37b6a21b35e Mon Sep 17 00:00:00 2001 From: gazebo Date: Sat, 27 Oct 2018 16:12:00 +0800 Subject: [PATCH] - SyncStoresCategory - TaskError for json output. --- business/jxstore/cms/sync.go | 45 +++++++++++----- business/jxutils/tasksch/parallel_task.go | 6 +-- business/jxutils/tasksch/sequence_task.go | 4 +- business/jxutils/tasksch/task.go | 23 ++++++-- business/jxutils/tasksch/task_test.go | 14 +++++ business/partner/purchase/ebai/store_sku.go | 21 ++++++-- controllers/cms_sync.go | 60 +++++++++++++++++++++ routers/commentsRouter_controllers.go | 16 ++++++ 8 files changed, 163 insertions(+), 26 deletions(-) create mode 100644 business/jxutils/tasksch/task_test.go diff --git a/business/jxstore/cms/sync.go b/business/jxstore/cms/sync.go index 3ca28962a..b7990958d 100644 --- a/business/jxstore/cms/sync.go +++ b/business/jxstore/cms/sync.go @@ -273,26 +273,47 @@ func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuI return "", err } +func (v *VendorSync) SyncStoresCategory(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync bool) (hint string, err error) { + globals.SugarLogger.Debug("SyncStoresCategory") + return v.LoopStoresMap(ctx, db, "SyncStoresCategory", isAsync, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { + loopMapInfo := batchItemList[0].(*LoopStoreMapInfo) + if handler := v.GetSingleStoreHandler(loopMapInfo.VendorID); handler != nil { + if len(loopMapInfo.StoreMapList) > 1 { + loopStoreTask := tasksch.NewSeqTask("SyncStoresCategory loop stores", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { + storeID := loopMapInfo.StoreMapList[step].StoreID + _, err = handler.SyncStoreCategory(ctx, task, storeID, false) + return nil, err + }, len(loopMapInfo.StoreMapList)) + t.AddChild(loopStoreTask).Run() + _, err = loopStoreTask.GetResult(0) + return nil, err + } + _, err = handler.SyncStoreCategory(ctx, t, loopMapInfo.StoreMapList[0].StoreID, false) + } + return nil, err + }) +} + // func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, skuIDs []int, isAsync bool, userName string) (hint string, err error) { globals.SugarLogger.Debug("SyncStoresSkus") - hint, err = v.LoopStoresMap(ctx, db, "SyncStoresSkus", isAsync, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { + return v.LoopStoresMap(ctx, db, "SyncStoresSkus", isAsync, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { loopMapInfo := batchItemList[0].(*LoopStoreMapInfo) - handler := v.GetStoreHandler(loopMapInfo.VendorID) - if len(loopMapInfo.StoreMapList) > 1 { - loopStoreTask := tasksch.NewSeqTask("SyncStoresSkus loop stores", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { - storeID := loopMapInfo.StoreMapList[step].StoreID - _, err = handler.SyncStoreSkus(ctx, task, storeID, skuIDs, false) + if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil { + if len(loopMapInfo.StoreMapList) > 1 { + loopStoreTask := tasksch.NewSeqTask("SyncStoresSkus loop stores", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { + storeID := loopMapInfo.StoreMapList[step].StoreID + _, err = handler.SyncStoreSkus(ctx, task, storeID, skuIDs, false) + return nil, err + }, len(loopMapInfo.StoreMapList)) + t.AddChild(loopStoreTask).Run() + _, err = loopStoreTask.GetResult(0) return nil, err - }, len(loopMapInfo.StoreMapList)) - t.AddChild(loopStoreTask).Run() - _, err = loopStoreTask.GetResult(0) - return nil, err + } + _, err = handler.SyncStoreSkus(ctx, t, loopMapInfo.StoreMapList[0].StoreID, skuIDs, false) } - _, err = handler.SyncStoreSkus(ctx, t, loopMapInfo.StoreMapList[0].StoreID, skuIDs, false) return nil, err }) - return hint, err } func (v *VendorSync) LoopStoresMap(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, vendorIDs []int, storeIDs []int, handler tasksch.WorkFunc) (hint string, err error) { diff --git a/business/jxutils/tasksch/parallel_task.go b/business/jxutils/tasksch/parallel_task.go index cc18202e4..a436bad3f 100644 --- a/business/jxutils/tasksch/parallel_task.go +++ b/business/jxutils/tasksch/parallel_task.go @@ -43,7 +43,7 @@ func NewParallelConfig() *ParallelConfig { return &ParallelConfig{ ParallelCount: DefParallelCount, BatchSize: 1, - IsContinueWhenError: false, + IsContinueWhenError: true, ResultHandler: nil, } } @@ -127,7 +127,7 @@ func (task *ParallelTask) Run() { goto end } task.locker.Lock() - task.DetailErrList = append(task.DetailErrList, err) + task.DetailErrList = append(task.DetailErrList, NewTaskError(err)) task.locker.Unlock() } } @@ -175,7 +175,7 @@ func (task *ParallelTask) Run() { task.Status = TaskStatusFinished } } - task.Err = taskErr + task.Err = NewTaskError(taskErr) task.Result = taskResult task.TerminatedAt = time.Now() task.locker.Unlock() diff --git a/business/jxutils/tasksch/sequence_task.go b/business/jxutils/tasksch/sequence_task.go index 7cd6e7480..36ebf69d0 100644 --- a/business/jxutils/tasksch/sequence_task.go +++ b/business/jxutils/tasksch/sequence_task.go @@ -46,7 +46,7 @@ func (task *SeqTask) Run() { break } task.locker.Lock() - task.DetailErrList = append(task.DetailErrList, err) + task.DetailErrList = append(task.DetailErrList, NewTaskError(err)) task.locker.Unlock() } else if result != nil { taskResult = append(taskResult, utils.Interface2Slice(result)...) @@ -64,7 +64,7 @@ func (task *SeqTask) Run() { task.Status = TaskStatusFinished } } - task.Err = taskErr + task.Err = NewTaskError(taskErr) task.Result = taskResult task.TerminatedAt = time.Now() task.locker.Unlock() diff --git a/business/jxutils/tasksch/task.go b/business/jxutils/tasksch/task.go index 207899be7..6faaf98db 100644 --- a/business/jxutils/tasksch/task.go +++ b/business/jxutils/tasksch/task.go @@ -43,6 +43,20 @@ type ITask interface { json.Marshaler } +type TaskError struct { + error +} + +func (t *TaskError) MarshalJSON() ([]byte, error) { + return json.Marshal(t.Error()) +} + +func NewTaskError(err error) *TaskError { + return &TaskError{ + err, + } +} + type BaseTask struct { Name string `json:"name"` ID string `json:"id"` @@ -61,10 +75,11 @@ type BaseTask struct { FailedJobCount int `json:"failedJobCount"` Status int `json:"status"` - Result []interface{} `json:"result"` - Err error `json:"err"` - Children TaskList `json:"children"` - DetailErrList []error `json:"detailErrList"` + Result []interface{} `json:"result"` + Children TaskList `json:"children"` + + Err error `json:"err"` + DetailErrList []error `json:"detailErrList"` finishChan chan int C <-chan int `json:"-"` diff --git a/business/jxutils/tasksch/task_test.go b/business/jxutils/tasksch/task_test.go new file mode 100644 index 000000000..745c3ee9d --- /dev/null +++ b/business/jxutils/tasksch/task_test.go @@ -0,0 +1,14 @@ +package tasksch + +import ( + "errors" + "fmt" + "testing" + + "git.rosy.net.cn/baseapi/utils" +) + +func TestTaskError(t *testing.T) { + err := NewTaskError(errors.New("hello")) + fmt.Println(utils.Format4Output(err, false)) +} diff --git a/business/partner/purchase/ebai/store_sku.go b/business/partner/purchase/ebai/store_sku.go index 678d3f7b1..69d11db84 100644 --- a/business/partner/purchase/ebai/store_sku.go +++ b/business/partner/purchase/ebai/store_sku.go @@ -49,6 +49,8 @@ type tStoreSkuFullInfo struct { EbaiCat1ID int64 `orm:"column(ebai_cat1_id)"` EbaiCat2ID int64 `orm:"column(ebai_cat2_id)"` EbaiCat3ID int64 `orm:"column(ebai_cat3_id)"` + + PricePercentage int } type tStoreCatInfo struct { @@ -81,7 +83,7 @@ func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasks return "", err } sql := ` - SELECT t1.*, t2.spec_quality, t2.spec_unit, t2.weight, t2.status sku_status, + SELECT t8.price_percentage, t1.*, t2.spec_quality, t2.spec_unit, t2.weight, t2.status sku_status, t3.prefix, t3.name, t2.comment, t3.is_global, t3.unit, t3.img, t4.name cat_name, t4.id cat_id, t4.level cat_level, t5.ebai_id cat_ebai_id, @@ -96,6 +98,7 @@ func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasks LEFT JOIN store_sku_category_map t5p ON t5p.store_id = t1.store_id AND t5p.category_id = t4p.id AND t5p.deleted_at = ? LEFT JOIN sku_vendor_category cat1 ON t4.ebai_category_id = cat1.vendor_category_id AND cat1.vendor_id = ? LEFT JOIN sku_vendor_category cat2 ON cat1.parent_id = cat2.vendor_category_id AND cat1.vendor_id = ? + JOIN store_map t8 ON t8.store_id = t1.store_id AND t8.vendor_id = ? AND t8.deleted_at = ? WHERE t1.store_id = ? AND (t1.ebai_sync_status <> 0) ` sqlParams := []interface{}{ @@ -103,6 +106,8 @@ func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasks utils.DefaultTimeValue, model.VendorIDEBAI, model.VendorIDEBAI, + model.VendorIDEBAI, + utils.DefaultTimeValue, storeID, } if len(skuIDs) > 0 { @@ -270,12 +275,13 @@ func (p *PurchaseHandler) RefreshStoresAllSkusID(ctx *jxcontext.Context, parentT /////////// func genSkuParamsFromStoreSkuInfo(storeSku *tStoreSkuFullInfo) map[string]interface{} { + price := jxutils.CaculateSkuVendorPrice(storeSku.Price, storeSku.PricePercentage) return map[string]interface{}{ "name": jxutils.ComposeSkuName(storeSku.Prefix, storeSku.Name, storeSku.Comment, storeSku.Unit, storeSku.SpecQuality, storeSku.SpecUnit, 0), "status": jxSkuStatus2Ebai(jxutils.MergeSkuStatus(storeSku.SkuStatus, storeSku.Status)), "left_num": ebaiapi.MaxLeftNum, - "sale_price": storeSku.Price, - "market_price": storeSku.Price, + "sale_price": price, + "market_price": price, "cat1_id": getEbaiCat(storeSku.EbaiCat1ID, 1), "cat2_id": getEbaiCat(storeSku.EbaiCat2ID, 2), "cat3_id": getEbaiCat(storeSku.EbaiCat3ID, 3), @@ -391,13 +397,13 @@ func (p *PurchaseHandler) SyncStoreCategory(ctx *jxcontext.Context, parentTask t if catInfo.EbaiSyncStatus&model.SyncFlagDeletedMask != 0 { // 删除 err = api.EbaiAPI.ShopCategoryDelete(strStoreID, catInfo.EbaiID) } else if catInfo.EbaiSyncStatus&model.SyncFlagNewMask != 0 { // 新增 - ebaiID, err2 := api.EbaiAPI.ShopCategoryCreate(strStoreID, catInfo.ParentEbaiID, formatName(catInfo.Name), int(catInfo.Seq+1), utils.Int2Str(catInfo.CategoryID)) + ebaiID, err2 := api.EbaiAPI.ShopCategoryCreate(strStoreID, catInfo.ParentEbaiID, formatName(catInfo.Name), jxCatSeq2Ebai(catInfo.Seq), utils.Int2Str(catInfo.CategoryID)) if err = err2; err == nil { catInfo.EbaiID = ebaiID updateFields = append(updateFields, model.FieldEbaiID) } } else if catInfo.EbaiSyncStatus&model.SyncFlagModifiedMask != 0 { // 修改 - err = api.EbaiAPI.ShopCategoryUpdate(strStoreID, catInfo.EbaiID, formatName(catInfo.Name), int(catInfo.Seq+1), utils.Int2Str(catInfo.CategoryID)) + err = api.EbaiAPI.ShopCategoryUpdate(strStoreID, catInfo.EbaiID, formatName(catInfo.Name), jxCatSeq2Ebai(catInfo.Seq), utils.Int2Str(catInfo.CategoryID)) } } if err == nil { @@ -473,3 +479,8 @@ func (p *PurchaseHandler) updateLocalCatAsNew(db *dao.DaoDB, localCatMap map[str func formatName(name string) string { return strings.Trim(utils.FilterMb4(name), "\n\r\t ") } + +// 饿百的排序是从大到小 +func jxCatSeq2Ebai(seq int) int { + return 10000 - seq +} diff --git a/controllers/cms_sync.go b/controllers/cms_sync.go index 6402e34f8..81494761b 100644 --- a/controllers/cms_sync.go +++ b/controllers/cms_sync.go @@ -3,6 +3,7 @@ package controllers import ( "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxstore/cms" + "git.rosy.net.cn/jx-callback/business/model/dao" "github.com/astaxie/beego" ) @@ -10,6 +11,65 @@ type SyncController struct { beego.Controller } +// @Title 同步商家商品信息 +// @Description 同步商家商品信息 +// @Param token header string true "认证token" +// @Param storeIDs formData string true "门店ID列表" +// @Param vendorIDs formData string true "厂商ID列表" +// @Param isAsync formData bool true "是否异步操作" +// @Param skuIDs formData string false "SKU ID列表,缺省为全部" +// @Param isContinueWhenErr formData bool false "单个同步失败是否继续,缺省false" +// @Success 200 {object} controllers.CallResult +// @Failure 200 {object} controllers.CallResult +// @router /SyncStoresSkus [put] +func (c *SyncController) SyncStoresSkus() { + c.callSyncStoresSkus(func(params *tSyncSyncStoresSkusParams) (retVal interface{}, errCode string, err error) { + db := dao.GetDB() + var storeIDs []int + var skuIDs []int + var vendorIDs []int + if err = utils.UnmarshalUseNumber([]byte(params.StoreIDs), &storeIDs); err != nil { + return retVal, "", err + } + if err = utils.UnmarshalUseNumber([]byte(params.VendorIDs), &vendorIDs); err != nil { + return retVal, "", err + } + if params.SkuIDs != "" { + if err = utils.UnmarshalUseNumber([]byte(params.SkuIDs), &skuIDs); err != nil { + return retVal, "", err + } + } + retVal, err = cms.CurVendorSync.SyncStoresSkus(params.Ctx, db, vendorIDs, storeIDs, skuIDs, params.IsAsync, params.Ctx.GetUserName()) + return retVal, "", err + }) +} + +// @Title 同步商家商品信息 +// @Description 同步商家商品信息 +// @Param token header string true "认证token" +// @Param storeIDs formData string true "门店ID列表" +// @Param vendorIDs formData string true "厂商ID列表" +// @Param isAsync formData bool true "是否异步操作" +// @Param isContinueWhenErr formData bool false "单个同步失败是否继续,缺省false" +// @Success 200 {object} controllers.CallResult +// @Failure 200 {object} controllers.CallResult +// @router /SyncStoresCategory [put] +func (c *SyncController) SyncStoresCategory() { + c.callSyncStoresCategory(func(params *tSyncSyncStoresCategoryParams) (retVal interface{}, errCode string, err error) { + db := dao.GetDB() + var storeIDs []int + var vendorIDs []int + if err = utils.UnmarshalUseNumber([]byte(params.StoreIDs), &storeIDs); err != nil { + return retVal, "", err + } + if err = utils.UnmarshalUseNumber([]byte(params.VendorIDs), &vendorIDs); err != nil { + return retVal, "", err + } + retVal, err = cms.CurVendorSync.SyncStoresCategory(params.Ctx, db, vendorIDs, storeIDs, params.IsAsync) + return retVal, "", err + }) +} + // @Title 全部刷新所有门店商家ID // @Description 全部刷新所有门店商家ID // @Param token header string true "认证token" diff --git a/routers/commentsRouter_controllers.go b/routers/commentsRouter_controllers.go index 6457acca2..13ef7cb32 100644 --- a/routers/commentsRouter_controllers.go +++ b/routers/commentsRouter_controllers.go @@ -575,6 +575,22 @@ func init() { MethodParams: param.Make(), Params: nil}) + beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:SyncController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:SyncController"], + beego.ControllerComments{ + Method: "SyncStoresCategory", + Router: `/SyncStoresCategory`, + AllowHTTPMethods: []string{"put"}, + MethodParams: param.Make(), + Params: nil}) + + beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:SyncController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:SyncController"], + beego.ControllerComments{ + Method: "SyncStoresSkus", + Router: `/SyncStoresSkus`, + AllowHTTPMethods: []string{"put"}, + MethodParams: param.Make(), + Params: nil}) + beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:TaskController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:TaskController"], beego.ControllerComments{ Method: "CancelTask",