- 新版同步逻辑基本OK

This commit is contained in:
gazebo
2019-07-17 17:02:31 +08:00
parent 6bb543f544
commit f6cf334567
7 changed files with 398 additions and 22 deletions

View File

@@ -7,6 +7,7 @@ import (
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
"git.rosy.net.cn/jx-callback/business/model/dao"
"git.rosy.net.cn/jx-callback/business/partner"
)
@@ -29,7 +30,7 @@ func (p *DefSingleStorePlatform) DeleteStoreAllSkus(ctx *jxcontext.Context, pare
_, err = FreeBatchStoreSkuInfo(func(batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) {
err = p.DeleteStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList)
return nil, err
}, ctx, parentTask, storeStoreList, p.GetStoreSkusBatchSize(partner.FuncDeleteStoreSkus))
}, ctx, parentTask, storeStoreList, p.GetStoreSkusBatchSize(partner.FuncDeleteStoreSkus), isContinueWhenError)
return err
}
@@ -56,7 +57,7 @@ func (p *DefSingleStorePlatform) DeleteStoreAllCategories(ctx *jxcontext.Context
}
err = FreeBatchCategoryIDOp(func(vendorCatID string) (err error) {
return p.DeleteStoreCategory(ctx, storeID, vendorStoreID, vendorCatID)
}, ctx, task, vendorCatIDs)
}, ctx, task, vendorCatIDs, isContinueWhenError)
return nil, err
}, len(levelList))
tasksch.HandleTask(task1, parentTask, true).Run()
@@ -75,7 +76,7 @@ func flatCatList(catList []*partner.BareCategoryInfo) (flattedCatList []*partner
func (p *DefSingleStorePlatform) GetStoreSkusBareInfo(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, vendorStoreID string, inStoreSkuList []*partner.StoreSkuInfo) (outStoreSkuList []*partner.StoreSkuInfo, err error) {
resultList, err := FreeBatchStoreSkuInfo(func(batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) {
return p.GetStoreSkusFullInfo(ctx, parentTask, storeID, vendorStoreID, batchedStoreSkuList)
}, ctx, parentTask, inStoreSkuList, p.GetStoreSkusBatchSize(partner.FuncGetStoreSkusFullInfo))
}, ctx, parentTask, inStoreSkuList, p.GetStoreSkusBatchSize(partner.FuncGetStoreSkusFullInfo), true)
if err != nil || len(resultList) == 0 {
return nil, err
}
@@ -119,15 +120,18 @@ func (p *DefSingleStorePlatform) GetStoreCategory(ctx *jxcontext.Context, storeI
return cat, err
}
func FreeBatchStoreSkuInfo(handler func([]*partner.StoreSkuInfo) (interface{}, error), ctx *jxcontext.Context, parentTask tasksch.ITask, storeSkuList []*partner.StoreSkuInfo, batchSize int) (resultList []interface{}, err error) {
func FreeBatchStoreSkuInfo(handler func([]*partner.StoreSkuInfo) (interface{}, error), ctx *jxcontext.Context, parentTask tasksch.ITask, storeSkuList []*partner.StoreSkuInfo, batchSize int, isContinueWhenError bool) (resultList []interface{}, err error) {
if len(storeSkuList) > batchSize {
task := tasksch.NewParallelTask("FreeBatchStoreSkuInfo", tasksch.NewParallelConfig().SetBatchSize(batchSize), ctx,
task := tasksch.NewParallelTask("FreeBatchStoreSkuInfo", tasksch.NewParallelConfig().SetBatchSize(batchSize).SetIsContinueWhenError(isContinueWhenError), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
batchStoreSkuList := make([]*partner.StoreSkuInfo, len(batchItemList))
for k, v := range batchItemList {
batchStoreSkuList[k] = v.(*partner.StoreSkuInfo)
}
retVal, err = handler(batchStoreSkuList)
if err != nil {
retVal = nil
}
return retVal, err
}, storeSkuList)
tasksch.HandleTask(task, parentTask, false).Run()
@@ -141,9 +145,34 @@ func FreeBatchStoreSkuInfo(handler func([]*partner.StoreSkuInfo) (interface{}, e
return resultList, err
}
func FreeBatchCategoryIDOp(handler func(vendorCatID string) (err error), ctx *jxcontext.Context, parentTask tasksch.ITask, vendorCatIDs []string) (err error) {
func FreeBatchStoreSkuSyncInfo(handler func([]*dao.StoreSkuSyncInfo) (interface{}, error), ctx *jxcontext.Context, parentTask tasksch.ITask, storeSkuList []*dao.StoreSkuSyncInfo, batchSize int, isContinueWhenError bool) (resultList []interface{}, err error) {
if len(storeSkuList) > batchSize {
task := tasksch.NewParallelTask("FreeBatchStoreSkuSyncInfo", tasksch.NewParallelConfig().SetBatchSize(batchSize).SetIsContinueWhenError(isContinueWhenError), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
batchStoreSkuList := make([]*dao.StoreSkuSyncInfo, len(batchItemList))
for k, v := range batchItemList {
batchStoreSkuList[k] = v.(*dao.StoreSkuSyncInfo)
}
retVal, err = handler(batchStoreSkuList)
if err != nil {
retVal = nil
}
return retVal, err
}, storeSkuList)
tasksch.HandleTask(task, parentTask, false).Run()
resultList, err = task.GetResult(0)
} else {
result, err2 := handler(storeSkuList)
if err = err2; err == nil {
resultList = utils.Interface2Slice(result)
}
}
return resultList, err
}
func FreeBatchCategoryIDOp(handler func(vendorCatID string) (err error), ctx *jxcontext.Context, parentTask tasksch.ITask, vendorCatIDs []string, isContinueWhenError bool) (err error) {
if len(vendorCatIDs) > 1 {
task := tasksch.NewParallelTask("FreeBatchCategoryIDOp", nil, ctx,
task := tasksch.NewParallelTask("FreeBatchCategoryIDOp", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
err = handler(batchItemList[0].(string))
return nil, err