- 尽量在老的同步实现中使用新的store_sku2基础函数

This commit is contained in:
gazebo
2019-07-15 23:45:26 +08:00
parent 9ebfcabcd1
commit 5328788e70
2 changed files with 48 additions and 213 deletions

View File

@@ -150,30 +150,35 @@ func (p *PurchaseHandler) FullSyncStoreSkus(ctx *jxcontext.Context, parentTask t
globals.SugarLogger.Debugf("ebai FullSyncStoreSkus storeID:%d, isContinueWhenError:%t, userName:%s", storeID, isContinueWhenError, userName)
db := dao.GetDB()
storeDetail, err := dao.GetStoreDetail(db, storeID, model.VendorIDEBAI)
if err != nil {
return "", err
}
rootTask := tasksch.NewSeqTask("FullSyncStoreSkus", ctx,
func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
switch step {
case 0:
err = p.DeleteRemoteSkus(ctx, rootTask, storeID, nil)
err = p.DeleteStoreAllSkus(ctx, rootTask, storeID, storeDetail.VendorStoreID, isContinueWhenError)
// 强制忽略删除SKU错误
if isContinueWhenError {
err = nil
}
if err == nil {
_, err = dao.SetStoreSkuSyncStatus(db, model.VendorIDEBAI, []int{storeID}, nil, model.SyncFlagNewMask)
}
case 1:
_, err = p.setStoreSkuSyncStatus(ctx, db, storeID, nil, model.SyncFlagNewMask)
case 2:
if err = p.DeleteRemoteCategories(ctx, rootTask, storeID, nil); err == nil {
if err = p.DeleteStoreAllCategories(ctx, rootTask, storeID, storeDetail.VendorStoreID, isContinueWhenError); err == nil {
_, err = dao.SetStoreCategorySyncStatus(db, model.VendorIDEBAI, []int{storeID}, nil, model.SyncFlagNewMask)
}
case 3:
case 2:
err = p.SyncLocalStoreCategory(db, storeID, userName)
case 4:
case 3:
_, err = p.SyncStoreCategory(ctx, rootTask, storeID, false)
case 5:
case 4:
_, err = p.SyncStoreSkus(ctx, rootTask, storeID, nil, false, isContinueWhenError)
}
return nil, err
}, 6)
}, 5)
tasksch.AddChild(parentTask, rootTask).Run()
if !isAsync {
_, err = rootTask.GetResult(0)
@@ -182,6 +187,16 @@ func (p *PurchaseHandler) FullSyncStoreSkus(ctx *jxcontext.Context, parentTask t
}
func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, skuIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
db := dao.GetDB()
storeDetail, err := dao.GetStoreDetail(db, storeID, model.VendorIDEBAI)
if err != nil {
return "", err
}
return p.syncStoreSkus(ctx, parentTask, storeDetail, skuIDs, isAsync, isContinueWhenError)
}
func (p *PurchaseHandler) syncStoreSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, storeDetail *dao.StoreDetail, skuIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
storeID := storeDetail.Store.ID
userName := ctx.GetUserName()
globals.SugarLogger.Debugf("ebai SyncStoreSkus storeID:%d, skuIDs:%v, isContinueWhenError:%t, userName:%s", storeID, skuIDs, isContinueWhenError, userName)
@@ -189,10 +204,6 @@ func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasks
var num int64
strStoreID := utils.Int2Str(storeID)
storeDetail, err := dao.GetStoreDetail(dao.GetDB(), storeID, model.VendorIDEBAI)
if err != nil {
return "", err
}
rootTask := tasksch.NewSeqTask("SyncStoreSkus饿百1", ctx,
func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
if step == 0 {
@@ -325,96 +336,6 @@ func isErrModifyPrice(err error) bool {
return false
}
func (p *PurchaseHandler) GetAllRemoteSkus(ctx *jxcontext.Context, storeID int, parentTask tasksch.ITask) (skus []*ebaiapi.SkuInfo, err error) {
globals.SugarLogger.Debugf("ebai GetAllRemoteSkus storeID:%d, userName:%s", storeID, ctx.GetUserName())
page1, err := api.EbaiAPI.SkuList(utils.Int2Str(storeID), &ebaiapi.SkuListParams{
PageSize: MaxPageSize,
})
if err == nil {
skus = append(skus, page1.List...)
if page1.Pages > 1 {
pages := make([]int, page1.Pages-1)
for i := 2; i <= page1.Pages; i++ {
pages[i-2] = i
}
task := tasksch.NewParallelTask("GetAllRemoteSkus", nil, ctx,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
callParams := &ebaiapi.SkuListParams{
PageSize: MaxPageSize,
Page: batchItemList[0].(int),
}
pageSku, err2 := api.EbaiAPI.SkuList(utils.Int2Str(storeID), callParams)
if err2 == nil {
return pageSku.List, err2
}
// globals.SugarLogger.Debug(utils.Format4Output(callParams, false))
return nil, err2
}, pages)
tasksch.HandleTask(task, parentTask, false).Run()
result, err2 := task.GetResult(0)
if err = err2; err == nil {
for _, v := range result {
skus = append(skus, v.(*ebaiapi.SkuInfo))
}
}
}
}
return skus, err
}
func (p *PurchaseHandler) DeleteRemoteSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, vendorSkuIDs []string) (err error) {
globals.SugarLogger.Debugf("ebai DeleteRemoteSkus storeID:%d, userName:%s", storeID, ctx.GetUserName())
if vendorSkuIDs == nil {
result, err2 := p.GetAllRemoteSkus(ctx, storeID, parentTask)
if err = err2; err == nil {
vendorSkuIDs = make([]string, len(result))
for k, v := range result {
vendorSkuIDs[k] = utils.Int64ToStr(v.SkuID)
}
}
}
task := tasksch.NewParallelTask("DeleteRemoteSkus", tasksch.NewParallelConfig().SetBatchSize(100).SetIsContinueWhenError(true), ctx,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
strList := make([]string, len(batchItemList))
for k, v := range batchItemList {
strList[k] = v.(string)
}
if globals.EnableEbaiStoreWrite {
err = api.EbaiAPI.SkuDelete(utils.Int2Str(storeID), strings.Join(strList, ","))
}
return nil, err
}, vendorSkuIDs)
tasksch.AddChild(parentTask, task).Run()
_, err = task.GetResult(0)
return err
}
func (p *PurchaseHandler) DeleteRemoteCategories(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, vendorCatIDs []int64) (err error) {
globals.SugarLogger.Debugf("ebai DeleteRemoteCategories storeID:%d, userName:%s", storeID, ctx.GetUserName())
strStoreID := utils.Int2Str(storeID)
if vendorCatIDs == nil {
result, err2 := api.EbaiAPI.ShopCategoryGet(strStoreID)
if err = err2; err == nil {
vendorCatIDs = make([]int64, len(result))
for k, v := range result {
vendorCatIDs[k] = v.CategoryID
}
}
}
task := tasksch.NewParallelTask("DeleteRemoteCategories", tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
if globals.EnableEbaiStoreWrite {
err = api.EbaiAPI.ShopCategoryDelete(strStoreID, batchItemList[0].(int64))
}
return nil, err
}, vendorCatIDs)
tasksch.AddChild(parentTask, task).Run()
_, err = task.GetResult(0)
return err
}
func (p *PurchaseHandler) RefreshStoresAllSkusID(ctx *jxcontext.Context, parentTask tasksch.ITask, isAsync bool, storeIDs []int) (hint string, err error) {
return hint, err
}
@@ -654,11 +575,6 @@ func (p *PurchaseHandler) updateLocalCatAsNew(db *dao.DaoDB, localCatMap map[str
return nil
}
func (p *PurchaseHandler) setStoreSkuSyncStatus(ctx *jxcontext.Context, db *dao.DaoDB, storeID int, skuIDs []int, syncStatus int) (num int64, err error) {
globals.SugarLogger.Debugf("ebai setStoreSkuSyncStatus storeID:%d, userName:%s", storeID, ctx.GetUserName())
return dao.SetStoreSkuSyncStatus(db, model.VendorIDEBAI, []int{storeID}, skuIDs, syncStatus)
}
func formatName(name string) string {
return utils.TrimBlankChar(utils.FilterMb4(name))
}

View File

@@ -227,8 +227,18 @@ func TranverseRemoteCatList(parentCatName string, remoteCats []*mtwmapi.RetailCa
return nil
}
// hint如果是异步返回的是任务ID如果是同步返回是本次需要同步的目录数
func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, skuIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
db := dao.GetDB()
storeDetail, err := dao.GetStoreDetail(db, storeID, model.VendorIDMTWM)
if err != nil {
return "", err
}
return p.syncStoreSkus(ctx, parentTask, storeDetail, skuIDs, isAsync, isContinueWhenError)
}
// hint如果是异步返回的是任务ID如果是同步返回是本次需要同步的目录数
func (p *PurchaseHandler) syncStoreSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, storeDetail *dao.StoreDetail, skuIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
storeID := storeDetail.Store.ID
globals.SugarLogger.Debugf("mtwm SyncStoreSkus storeID:%d, skuIDs:%v, isContinueWhenError:%t, userName:%s", storeID, skuIDs, isContinueWhenError, ctx.GetUserName())
db := dao.GetDB()
for i := 0; i < 3; i++ { // 最多重试三次
@@ -247,10 +257,6 @@ func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasks
if hint != "0" {
return "", errors.New("同步门店商品所需目录失败")
}
storeDetail, err := dao.GetStoreDetail(db, storeID, model.VendorIDMTWM)
if err != nil {
return "", err
}
skus, err := dao.GetStoreSkus(db, model.VendorIDMTWM, storeID, skuIDs)
if err != nil {
return "", err
@@ -403,29 +409,35 @@ func (p *PurchaseHandler) FullSyncStoreSkus(ctx *jxcontext.Context, parentTask t
globals.SugarLogger.Debugf("mtwm FullSyncStoreSkus storeID:%d, isContinueWhenError:%t, userName:%s", storeID, isContinueWhenError, userName)
db := dao.GetDB()
storeDetail, err := dao.GetStoreDetail(db, storeID, model.VendorIDMTWM)
if err != nil {
return "", err
}
rootTask := tasksch.NewSeqTask("美团外卖FullSyncStoreSkus", ctx,
func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
switch step {
case 0:
err = p.DeleteRemoteSkus(ctx, rootTask, storeID, nil)
err = p.DeleteStoreAllSkus(ctx, rootTask, storeID, storeDetail.VendorStoreID, isContinueWhenError)
if isContinueWhenError {
err = nil
}
if err == nil {
_, err = dao.SetStoreSkuSyncStatus(db, model.VendorIDMTWM, []int{storeID}, nil, model.SyncFlagNewMask)
}
case 1:
_, err = dao.SetStoreSkuSyncStatus(db, model.VendorIDMTWM, []int{storeID}, nil, model.SyncFlagNewMask)
case 2:
if err = p.DeleteRemoteCategories(ctx, rootTask, storeID, nil); err == nil {
if err = p.DeleteStoreAllCategories(ctx, rootTask, storeID, storeDetail.VendorStoreID, isContinueWhenError); err == nil {
_, err = dao.SetStoreCategorySyncStatus(db, model.VendorIDMTWM, []int{storeID}, nil, model.SyncFlagNewMask)
}
case 3:
case 2:
_, err = p.SyncLocalStoreCategory(ctx, db, storeID, true, nil)
case 4:
case 3:
_, err = p.SyncStoreCategory(ctx, rootTask, storeID, false)
case 5:
case 4:
_, err = p.SyncStoreSkus(ctx, rootTask, storeID, nil, false, isContinueWhenError)
}
return nil, err
}, 6)
}, 5)
tasksch.AddChild(parentTask, rootTask).Run()
if !isAsync {
_, err = rootTask.GetResult(0)
@@ -433,99 +445,6 @@ func (p *PurchaseHandler) FullSyncStoreSkus(ctx *jxcontext.Context, parentTask t
return rootTask.ID, err
}
func (p *PurchaseHandler) DeleteRemoteSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, vendorSkuIDs []string) (err error) {
if vendorSkuIDs == nil {
result, err2 := p.GetAllRemoteSkus(storeID)
if err = err2; err == nil {
vendorSkuIDs = make([]string, len(result))
for k, v := range result {
vendorSkuIDs[k] = v.AppFoodCode
}
}
}
storeDetail, err := dao.GetStoreDetail(dao.GetDB(), storeID, model.VendorIDMTWM)
if err != nil {
return err
}
vendorStoreID := storeDetail.VendorStoreID
task := tasksch.NewParallelTask("mtwm DeleteRemoteSkus", tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
if globals.EnableMtwmStoreWrite {
// globals.SugarLogger.Debugf("mtwm RetailDelete vendorStoreID:%s, sku:%s", vendorStoreID, batchItemList[0].(string))
err = api.MtwmAPI.RetailDelete(vendorStoreID, batchItemList[0].(string))
}
return nil, err
}, vendorSkuIDs)
tasksch.AddChild(parentTask, task).Run()
_, err = task.GetResult(0)
return err
}
func (p *PurchaseHandler) GetAllRemoteSkus(storeID int) (skus []*mtwmapi.AppFood, err error) {
storeDetail, err := dao.GetStoreDetail(dao.GetDB(), storeID, model.VendorIDMTWM)
if err != nil {
return nil, err
}
vendorStoreID := storeDetail.VendorStoreID
for {
result, err := api.MtwmAPI.RetailList(vendorStoreID, len(skus), mtwmapi.GeneralMaxLimit)
if err != nil {
return nil, err
}
skus = append(skus, result...)
if len(result) < mtwmapi.GeneralMaxLimit {
break
}
}
return skus, err
}
func (p *PurchaseHandler) DeleteRemoteCategories(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, vendorCatIDs []string) (err error) {
storeDetail, err := dao.GetStoreDetail(dao.GetDB(), storeID, model.VendorIDMTWM)
if err != nil {
return err
}
vendorStoreID := storeDetail.VendorStoreID
vendorCatIDs2 := make([]string, 0)
if vendorCatIDs == nil {
result, err := api.MtwmAPI.RetailCatList(vendorStoreID)
if err != nil {
return err
}
vendorCatIDs = make([]string, len(result))
for k, v := range result {
vendorCatIDs[k] = v.Name
for _, v2 := range v.Children {
vendorCatIDs2 = append(vendorCatIDs2, v2.Name)
}
}
}
rootTask := tasksch.NewSeqTask("mtwm DeleteRemoteCategories", ctx,
func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
var catIDs []string
if step == 0 {
catIDs = vendorCatIDs2
} else {
catIDs = vendorCatIDs
}
if len(catIDs) > 0 {
task := tasksch.NewParallelTask("mtwm DeleteRemoteCategories paralle", tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
if globals.EnableMtwmStoreWrite {
err = api.MtwmAPI.RetailCatDelete(vendorStoreID, batchItemList[0].(string))
}
return nil, err
}, catIDs)
rootTask.AddChild(task).Run()
_, err = task.GetResult(0)
}
return nil, err
}, 2)
tasksch.AddChild(parentTask, rootTask).Run()
_, err = rootTask.GetResult(0)
return err
}
func (p *PurchaseHandler) GetStoresSku(ctx *jxcontext.Context, parentTask tasksch.ITask, storeIDs []int) (storeSkuList []*model.StoreSkuBind, err error) {
return storeSkuList, err
}