- tasksch.NewParallelTask2,支持部分失败时得到准备的成功,失败个数信息

This commit is contained in:
gazebo
2019-08-06 14:20:32 +08:00
parent baa9157949
commit c020130819
9 changed files with 68 additions and 48 deletions

View File

@@ -203,7 +203,7 @@ func Login(authType, authID, authIDType, authSecret string) (authInfo *AuthInfo,
realAuthID = user.GetID() realAuthID = user.GetID()
} }
if authBindEx, err = handler.VerifySecret(realAuthID, authSecret); err == nil { if authBindEx, err = handler.VerifySecret(realAuthID, authSecret); err == nil {
globals.SugarLogger.Debugf("auth2 Login authBindEx:%s", utils.Format4Output(authBindEx, false)) // globals.SugarLogger.Debugf("auth2 Login authBindEx:%s", utils.Format4Output(authBindEx, false))
if authBindEx == nil { // mobile, email会返回nil表示不会新建AuthBind实体 if authBindEx == nil { // mobile, email会返回nil表示不会新建AuthBind实体
user = userProvider.GetUser(authID, authIDType) user = userProvider.GetUser(authID, authIDType)
authBindEx = &AuthBindEx{ authBindEx = &AuthBindEx{

View File

@@ -349,7 +349,7 @@ func syncStoreSkuNew(ctx *jxcontext.Context, parentTask tasksch.ITask, isFull bo
switch step { switch step {
case 0: case 0:
if len(deleteList) > 0 { if len(deleteList) > 0 {
_, err = putils.FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) { _, err = putils.FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
var successList []*partner.StoreSkuInfo var successList []*partner.StoreSkuInfo
if successList, err = singleStoreHandler.DeleteStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList); singleStoreHandler.IsErrSkuNotExist(err) { if successList, err = singleStoreHandler.DeleteStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList); singleStoreHandler.IsErrSkuNotExist(err) {
err = nil err = nil
@@ -360,12 +360,12 @@ func syncStoreSkuNew(ctx *jxcontext.Context, parentTask tasksch.ITask, isFull bo
if len(successList) > 0 { if len(successList) > 0 {
updateStoreSku(dao.GetDB(), vendorID, bareSku2Sync(successList), model.SyncFlagDeletedMask) updateStoreSku(dao.GetDB(), vendorID, bareSku2Sync(successList), model.SyncFlagDeletedMask)
} }
return nil, err return nil, len(successList), err
}, ctx, task, deleteList, 1 /*singleStoreHandler.GetStoreSkusBatchSize(partner.FuncDeleteStoreSkus)*/, isContinueWhenError) }, ctx, task, deleteList, 1 /*singleStoreHandler.GetStoreSkusBatchSize(partner.FuncDeleteStoreSkus)*/, isContinueWhenError)
} }
case 1: case 1:
if len(createList) > 0 { if len(createList) > 0 {
_, err = putils.FreeBatchStoreSkuSyncInfo(func(task tasksch.ITask, batchedStoreSkuList []*dao.StoreSkuSyncInfo) (result interface{}, err error) { _, err = putils.FreeBatchStoreSkuSyncInfo(func(task tasksch.ITask, batchedStoreSkuList []*dao.StoreSkuSyncInfo) (result interface{}, successCount int, err error) {
var successList []*dao.StoreSkuSyncInfo var successList []*dao.StoreSkuSyncInfo
if successList, err = singleStoreHandler.CreateStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList); singleStoreHandler.IsErrSkuExist(err) { if successList, err = singleStoreHandler.CreateStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList); singleStoreHandler.IsErrSkuExist(err) {
if skuNameList, err2 := singleStoreHandler.GetStoreSkusFullInfo(ctx, task, storeID, vendorStoreID, []*partner.StoreSkuInfo{ if skuNameList, err2 := singleStoreHandler.GetStoreSkusFullInfo(ctx, task, storeID, vendorStoreID, []*partner.StoreSkuInfo{
@@ -387,12 +387,12 @@ func syncStoreSkuNew(ctx *jxcontext.Context, parentTask tasksch.ITask, isFull bo
if len(successList) > 0 { if len(successList) > 0 {
updateStoreSku(dao.GetDB(), vendorID, successList, model.SyncFlagNewMask) updateStoreSku(dao.GetDB(), vendorID, successList, model.SyncFlagNewMask)
} }
return nil, err return nil, len(successList), err
}, ctx, task, createList, 1 /*singleStoreHandler.GetStoreSkusBatchSize(partner.FuncCreateStoreSkus)*/, isContinueWhenError) }, ctx, task, createList, 1 /*singleStoreHandler.GetStoreSkusBatchSize(partner.FuncCreateStoreSkus)*/, isContinueWhenError)
} }
case 2: case 2:
if len(updateList) > 0 { if len(updateList) > 0 {
_, err = putils.FreeBatchStoreSkuSyncInfo(func(task tasksch.ITask, batchedStoreSkuList []*dao.StoreSkuSyncInfo) (result interface{}, err error) { _, err = putils.FreeBatchStoreSkuSyncInfo(func(task tasksch.ITask, batchedStoreSkuList []*dao.StoreSkuSyncInfo) (result interface{}, successCount int, err error) {
var successList []*dao.StoreSkuSyncInfo var successList []*dao.StoreSkuSyncInfo
if successList, err = singleStoreHandler.UpdateStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil { if successList, err = singleStoreHandler.UpdateStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil {
successList = batchedStoreSkuList successList = batchedStoreSkuList
@@ -400,13 +400,13 @@ func syncStoreSkuNew(ctx *jxcontext.Context, parentTask tasksch.ITask, isFull bo
if len(successList) > 0 { if len(successList) > 0 {
updateStoreSku(dao.GetDB(), vendorID, successList, model.SyncFlagModifiedMask) updateStoreSku(dao.GetDB(), vendorID, successList, model.SyncFlagModifiedMask)
} }
return nil, err return nil, len(successList), err
}, ctx, task, updateList, singleStoreHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkus), isContinueWhenError) }, ctx, task, updateList, singleStoreHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkus), isContinueWhenError)
} }
case 3: case 3:
for k, list := range [][]*partner.StoreSkuInfo{stockList /*, onlineList*/} { for k, list := range [][]*partner.StoreSkuInfo{stockList /*, onlineList*/} {
if len(list) > 0 { if len(list) > 0 {
_, err = putils.FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) { _, err = putils.FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
var successList []*partner.StoreSkuInfo var successList []*partner.StoreSkuInfo
if successList, err = storeSkuHandler.UpdateStoreSkusStock(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil { if successList, err = storeSkuHandler.UpdateStoreSkusStock(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil {
successList = batchedStoreSkuList successList = batchedStoreSkuList
@@ -414,7 +414,7 @@ func syncStoreSkuNew(ctx *jxcontext.Context, parentTask tasksch.ITask, isFull bo
if k == 0 && len(successList) > 0 { if k == 0 && len(successList) > 0 {
updateStoreSku(dao.GetDB(), vendorID, bareSku2Sync(successList), model.SyncFlagModifiedMask) // ? updateStoreSku(dao.GetDB(), vendorID, bareSku2Sync(successList), model.SyncFlagModifiedMask) // ?
} }
return nil, err return nil, len(successList), err
}, ctx, task, list, storeSkuHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusStock), isContinueWhenError) }, ctx, task, list, storeSkuHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusStock), isContinueWhenError)
} }
} }
@@ -426,7 +426,7 @@ func syncStoreSkuNew(ctx *jxcontext.Context, parentTask tasksch.ITask, isFull bo
status = model.SkuStatusDontSale status = model.SkuStatusDontSale
} }
if len(statusList) > 0 { if len(statusList) > 0 {
_, err = putils.FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) { _, err = putils.FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
var successList []*partner.StoreSkuInfo var successList []*partner.StoreSkuInfo
if successList, err = storeSkuHandler.UpdateStoreSkusStatus(ctx, storeID, vendorStoreID, batchedStoreSkuList, status); err == nil { if successList, err = storeSkuHandler.UpdateStoreSkusStatus(ctx, storeID, vendorStoreID, batchedStoreSkuList, status); err == nil {
successList = batchedStoreSkuList successList = batchedStoreSkuList
@@ -434,12 +434,12 @@ func syncStoreSkuNew(ctx *jxcontext.Context, parentTask tasksch.ITask, isFull bo
if len(successList) > 0 { if len(successList) > 0 {
updateStoreSku(dao.GetDB(), vendorID, bareSku2Sync(successList), model.SyncFlagSaleMask) updateStoreSku(dao.GetDB(), vendorID, bareSku2Sync(successList), model.SyncFlagSaleMask)
} }
return nil, err return nil, len(successList), err
}, ctx, task, statusList, storeSkuHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusStatus), isContinueWhenError) }, ctx, task, statusList, storeSkuHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusStatus), isContinueWhenError)
} }
case 6: case 6:
if len(priceList) > 0 { if len(priceList) > 0 {
_, err = putils.FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) { _, err = putils.FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
var successList []*partner.StoreSkuInfo var successList []*partner.StoreSkuInfo
if successList, err = storeSkuHandler.UpdateStoreSkusPrice(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil { if successList, err = storeSkuHandler.UpdateStoreSkusPrice(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil {
successList = batchedStoreSkuList successList = batchedStoreSkuList
@@ -447,7 +447,7 @@ func syncStoreSkuNew(ctx *jxcontext.Context, parentTask tasksch.ITask, isFull bo
if len(successList) > 0 { if len(successList) > 0 {
updateStoreSku(dao.GetDB(), vendorID, bareSku2Sync(successList), model.SyncFlagPriceMask) updateStoreSku(dao.GetDB(), vendorID, bareSku2Sync(successList), model.SyncFlagPriceMask)
} }
return nil, err return nil, len(successList), err
}, ctx, task, priceList, storeSkuHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusPrice), isContinueWhenError) }, ctx, task, priceList, storeSkuHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusPrice), isContinueWhenError)
} }
} }
@@ -490,9 +490,9 @@ func PruneMissingStoreSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, ven
} }
case 1: case 1:
if len(sku2Delete) > 0 { if len(sku2Delete) > 0 {
_, err = putils.FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) { _, err = putils.FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
_, err = handler.DeleteStoreSkus(ctx, storeID, vendorStoreID, sku2Delete) _, err = handler.DeleteStoreSkus(ctx, storeID, vendorStoreID, sku2Delete)
return nil, err return nil, 0, err
}, ctx, parentTask, sku2Delete, handler.GetStoreSkusBatchSize(partner.FuncDeleteStoreSkus), isContinueWhenError) }, ctx, parentTask, sku2Delete, handler.GetStoreSkusBatchSize(partner.FuncDeleteStoreSkus), isContinueWhenError)
} }
} }

View File

@@ -116,7 +116,7 @@ func StartOrEndOpStore(isStart bool, startTime, endTime int16, isAsync, isContin
AddOrDelExtraStoreOptime(vendorID, storeID, vendorStoreID, &storeListValue.Store, startOpStoreTime, endOpStoreTime, false) AddOrDelExtraStoreOptime(vendorID, storeID, vendorStoreID, &storeListValue.Store, startOpStoreTime, endOpStoreTime, false)
} }
_, err = putils.FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) { _, err = putils.FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
for _, skuValue := range batchedStoreSkuList { for _, skuValue := range batchedStoreSkuList {
skuValue.Stock = GetStockValue(isStart) skuValue.Stock = GetStockValue(isStart)
} }
@@ -125,7 +125,7 @@ func StartOrEndOpStore(isStart bool, startTime, endTime int16, isAsync, isContin
//successList = batchedStoreSkuList //successList = batchedStoreSkuList
//baseapi.SugarLogger.Debugf("StartOrEndOpStore successList:%v error:%v", successList, err) //baseapi.SugarLogger.Debugf("StartOrEndOpStore successList:%v error:%v", successList, err)
} }
return nil, err return nil, 0, err
}, ctx, task, storeSkuList, singleStoreHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusStock), true) }, ctx, task, storeSkuList, singleStoreHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusStock), true)
if isStart { if isStart {

View File

@@ -15,6 +15,7 @@ const (
) )
type WorkFunc func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) type WorkFunc func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error)
type WorkFunc2 func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, successCount int, err error)
type ResultHandlerFunc func(taskName string, result []interface{}, err error) type ResultHandlerFunc func(taskName string, result []interface{}, err error)
type ParallelConfig struct { type ParallelConfig struct {
@@ -28,7 +29,7 @@ type ParallelConfig struct {
type ParallelTask struct { type ParallelTask struct {
BaseTask BaseTask
worker WorkFunc worker WorkFunc2
jobList [][]interface{} jobList [][]interface{}
taskChan chan []interface{} taskChan chan []interface{}
subFinishChan chan interface{} subFinishChan chan interface{}
@@ -69,7 +70,7 @@ func (c *ParallelConfig) SetIsContinueWhenError(isContinueWhenError bool) *Paral
// return c // return c
// } // }
func NewParallelTask(taskName string, config *ParallelConfig, ctx *jxcontext.Context, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask { func NewParallelTask2(taskName string, config *ParallelConfig, ctx *jxcontext.Context, worker WorkFunc2, itemList interface{}, params ...interface{}) *ParallelTask {
if config == nil { if config == nil {
config = NewParallelConfig() config = NewParallelConfig()
} }
@@ -95,6 +96,14 @@ func NewParallelTask(taskName string, config *ParallelConfig, ctx *jxcontext.Con
return task return task
} }
func NewParallelTask(taskName string, config *ParallelConfig, ctx *jxcontext.Context, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask {
return NewParallelTask2(taskName, config, ctx,
func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, successCount int, err error) {
retVal, err = worker(task, batchItemList, params...)
return retVal, 0, err
}, itemList, params...)
}
func (task *ParallelTask) Run() { func (task *ParallelTask) Run() {
task.run(func() { task.run(func() {
globals.SugarLogger.Debugf("ParallelTask.Run %s", task.Name) globals.SugarLogger.Debugf("ParallelTask.Run %s", task.Name)
@@ -113,11 +122,11 @@ func (task *ParallelTask) Run() {
chanRetVal = retVal chanRetVal = retVal
goto end goto end
} else { } else {
result, err := task.callWorker(func() (retVal interface{}, err error) { result, successCount, err := task.callWorker2(func() (retVal interface{}, successCount int, err error) {
return task.worker(task, job, task.params...) return task.worker(task, job, task.params...)
}) })
// globals.SugarLogger.Debugf("ParallelTask.Run %s, after call worker result:%v, err:%v", task.Name, result, err) // globals.SugarLogger.Debugf("ParallelTask.Run %s, after call worker result:%v, err:%v", task.Name, result, err)
task.finishedOneJob(len(job), err) task.finishedOneJob(len(job), successCount, err)
if err != nil { // 出错 if err != nil { // 出错
// globals.SugarLogger.Infof("ParallelTask.Run %s, subtask(job:%s, params:%s) result:%v, failed with error:%v", task.Name, utils.Format4Output(job, true), utils.Format4Output(task.params, true), result, err) // globals.SugarLogger.Infof("ParallelTask.Run %s, subtask(job:%s, params:%s) result:%v, failed with error:%v", task.Name, utils.Format4Output(job, true), utils.Format4Output(task.params, true), result, err)
if task.IsContinueWhenError { if task.IsContinueWhenError {

View File

@@ -40,7 +40,7 @@ func (task *SeqTask) Run() {
result, err := task.callWorker(func() (retVal interface{}, err error) { result, err := task.callWorker(func() (retVal interface{}, err error) {
return task.worker(task, i, task.params...) return task.worker(task, i, task.params...)
}) })
task.finishedOneJob(1, err) task.finishedOneJob(1, 0, err)
if err != nil { if err != nil {
// globals.SugarLogger.Infof("SeqTask.Run %s step:%d failed with error:%v", task.Name, i, err) // globals.SugarLogger.Infof("SeqTask.Run %s step:%d failed with error:%v", task.Name, i, err)
if task.IsContinueWhenError { if task.IsContinueWhenError {

View File

@@ -434,7 +434,8 @@ func (t *BaseTask) run(taskHandler func()) {
} }
} }
func (t *BaseTask) finishedOneJob(itemCount int, err error) { // successCount表示在返回错误的情况下部分成功的个数如果没有返回错误则successCount无意义
func (t *BaseTask) finishedOneJob(itemCount, successCount int, err error) {
t.locker.Lock() t.locker.Lock()
defer t.locker.Unlock() defer t.locker.Unlock()
@@ -443,7 +444,8 @@ func (t *BaseTask) finishedOneJob(itemCount int, err error) {
t.FinishedItemCount += itemCount t.FinishedItemCount += itemCount
t.FinishedJobCount++ t.FinishedJobCount++
} else { } else {
t.FailedItemCount += itemCount t.FinishedItemCount += successCount
t.FailedItemCount += itemCount - successCount
t.FailedJobCount++ t.FailedJobCount++
} }
} }
@@ -473,6 +475,15 @@ func (task *BaseTask) callWorker(worker func() (retVal interface{}, err error))
err = fmt.Errorf("panic, r:%v", r) err = fmt.Errorf("panic, r:%v", r)
} }
}() }()
retVal, err = worker() return worker()
return retVal, err }
func (task *BaseTask) callWorker2(worker func() (retVal interface{}, successCount int, err error)) (retVal interface{}, successCount int, err error) {
defer func() {
if r := recover(); r != nil {
globals.SugarLogger.Errorf("callWorker panic:%v", r)
err = fmt.Errorf("panic, r:%v", r)
}
}()
return worker()
} }

View File

@@ -5,7 +5,6 @@ import (
"git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/globals"
) )
func GetAuthBind(db *DaoDB, userID, authType, authID string) (authBind *model.AuthBind, err error) { func GetAuthBind(db *DaoDB, userID, authType, authID string) (authBind *model.AuthBind, err error) {
@@ -33,7 +32,7 @@ func GetAuthBind(db *DaoDB, userID, authType, authID string) (authBind *model.Au
sql += " AND t1.auth_id = ?" sql += " AND t1.auth_id = ?"
sqlParams = append(sqlParams, authID) sqlParams = append(sqlParams, authID)
} }
globals.SugarLogger.Debugf("GetAuthBind sql:%s, sqlParams:%s", sql, utils.Format4Output(sqlParams, false)) // globals.SugarLogger.Debugf("GetAuthBind sql:%s, sqlParams:%s", sql, utils.Format4Output(sqlParams, false))
err = GetRow(db, &authBind, sql, sqlParams...) err = GetRow(db, &authBind, sql, sqlParams...)
return authBind, err return authBind, err
} }

View File

@@ -65,12 +65,12 @@ func (p *PurchaseHandler) getStoreSkusBareInfoLimitSize(ctx *jxcontext.Context,
} }
func (p *PurchaseHandler) GetStoreSkusBareInfo(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, vendorStoreID string, inStoreSkuList []*partner.StoreSkuInfo) (outStoreSkuList []*partner.StoreSkuInfo, err error) { func (p *PurchaseHandler) GetStoreSkusBareInfo(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, vendorStoreID string, inStoreSkuList []*partner.StoreSkuInfo) (outStoreSkuList []*partner.StoreSkuInfo, err error) {
result, err := putils.FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) { result, err := putils.FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
list, err := p.getStoreSkusBareInfoLimitSize(ctx, task, storeID, vendorStoreID, batchedStoreSkuList) list, err := p.getStoreSkusBareInfoLimitSize(ctx, task, storeID, vendorStoreID, batchedStoreSkuList)
if err == nil { if err == nil {
result = list result = list
} }
return result, err return result, len(list), err
}, ctx, parentTask, inStoreSkuList, jdapi.MaxStoreSkuBatchSize, true) }, ctx, parentTask, inStoreSkuList, jdapi.MaxStoreSkuBatchSize, true)
for _, v := range result { for _, v := range result {
outStoreSkuList = append(outStoreSkuList, v.(*partner.StoreSkuInfo)) outStoreSkuList = append(outStoreSkuList, v.(*partner.StoreSkuInfo))

View File

@@ -29,9 +29,9 @@ func (p *DefSingleStorePlatform) DeleteStoreAllSkus(ctx *jxcontext.Context, pare
VendorSkuID: v.SkuList[0].VendorSkuID, VendorSkuID: v.SkuList[0].VendorSkuID,
} }
} }
_, err = FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) { _, err = FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
_, err = p.DeleteStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList) _, err = p.DeleteStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList)
return nil, err return nil, 0, err
}, ctx, parentTask, storeStoreList, p.GetStoreSkusBatchSize(partner.FuncDeleteStoreSkus), isContinueWhenError) }, ctx, parentTask, storeStoreList, p.GetStoreSkusBatchSize(partner.FuncDeleteStoreSkus), isContinueWhenError)
return err return err
} }
@@ -76,8 +76,9 @@ func flatCatList(catList []*partner.BareCategoryInfo) (flattedCatList []*partner
} }
func (p *DefSingleStorePlatform) GetStoreSkusBareInfo(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, vendorStoreID string, inStoreSkuList []*partner.StoreSkuInfo) (outStoreSkuList []*partner.StoreSkuInfo, err error) { func (p *DefSingleStorePlatform) GetStoreSkusBareInfo(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, vendorStoreID string, inStoreSkuList []*partner.StoreSkuInfo) (outStoreSkuList []*partner.StoreSkuInfo, err error) {
resultList, err := FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) { resultList, err := FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
return p.GetStoreSkusFullInfo(ctx, parentTask, storeID, vendorStoreID, batchedStoreSkuList) result, err = p.GetStoreSkusFullInfo(ctx, parentTask, storeID, vendorStoreID, batchedStoreSkuList)
return result, successCount, err
}, ctx, parentTask, inStoreSkuList, p.GetStoreSkusBatchSize(partner.FuncGetStoreSkusFullInfo), true) }, ctx, parentTask, inStoreSkuList, p.GetStoreSkusBatchSize(partner.FuncGetStoreSkusFullInfo), true)
if err != nil || len(resultList) == 0 { if err != nil || len(resultList) == 0 {
return nil, err return nil, err
@@ -133,24 +134,24 @@ func (p *DefSingleStorePlatform) GetStoreCategory(ctx *jxcontext.Context, storeI
return cat, err return cat, err
} }
func FreeBatchStoreSkuInfo(handler func(tasksch.ITask, []*partner.StoreSkuInfo) (interface{}, error), ctx *jxcontext.Context, parentTask tasksch.ITask, storeSkuList []*partner.StoreSkuInfo, batchSize int, isContinueWhenError bool) (resultList []interface{}, err error) { func FreeBatchStoreSkuInfo(handler func(tasksch.ITask, []*partner.StoreSkuInfo) (interface{}, int, error), ctx *jxcontext.Context, parentTask tasksch.ITask, storeSkuList []*partner.StoreSkuInfo, batchSize int, isContinueWhenError bool) (resultList []interface{}, err error) {
if len(storeSkuList) > batchSize { if true { //len(storeSkuList) > batchSize {
task := tasksch.NewParallelTask("FreeBatchStoreSkuInfo", tasksch.NewParallelConfig().SetParallelCount(1).SetBatchSize(batchSize).SetIsContinueWhenError(isContinueWhenError), ctx, task := tasksch.NewParallelTask2("FreeBatchStoreSkuInfo", tasksch.NewParallelConfig().SetParallelCount(1).SetBatchSize(batchSize).SetIsContinueWhenError(isContinueWhenError), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, successCount int, err error) {
batchStoreSkuList := make([]*partner.StoreSkuInfo, len(batchItemList)) batchStoreSkuList := make([]*partner.StoreSkuInfo, len(batchItemList))
for k, v := range batchItemList { for k, v := range batchItemList {
batchStoreSkuList[k] = v.(*partner.StoreSkuInfo) batchStoreSkuList[k] = v.(*partner.StoreSkuInfo)
} }
retVal, err = handler(task, batchStoreSkuList) retVal, successCount, err = handler(task, batchStoreSkuList)
if err != nil { if err != nil {
retVal = nil retVal = nil
} }
return retVal, err return retVal, successCount, err
}, storeSkuList) }, storeSkuList)
tasksch.HandleTask(task, parentTask, false).Run() tasksch.HandleTask(task, parentTask, false).Run()
resultList, err = task.GetResult(0) resultList, err = task.GetResult(0)
} else { } else {
result, err2 := handler(parentTask, storeSkuList) result, _, err2 := handler(parentTask, storeSkuList)
if err = err2; err == nil { if err = err2; err == nil {
resultList = utils.Interface2Slice(result) resultList = utils.Interface2Slice(result)
} }
@@ -158,24 +159,24 @@ func FreeBatchStoreSkuInfo(handler func(tasksch.ITask, []*partner.StoreSkuInfo)
return resultList, err return resultList, err
} }
func FreeBatchStoreSkuSyncInfo(handler func(tasksch.ITask, []*dao.StoreSkuSyncInfo) (interface{}, error), ctx *jxcontext.Context, parentTask tasksch.ITask, storeSkuList []*dao.StoreSkuSyncInfo, batchSize int, isContinueWhenError bool) (resultList []interface{}, err error) { func FreeBatchStoreSkuSyncInfo(handler func(tasksch.ITask, []*dao.StoreSkuSyncInfo) (interface{}, int, error), ctx *jxcontext.Context, parentTask tasksch.ITask, storeSkuList []*dao.StoreSkuSyncInfo, batchSize int, isContinueWhenError bool) (resultList []interface{}, err error) {
if len(storeSkuList) > batchSize { if true { //len(storeSkuList) > batchSize {
task := tasksch.NewParallelTask("FreeBatchStoreSkuSyncInfo", tasksch.NewParallelConfig().SetParallelCount(1).SetBatchSize(batchSize).SetIsContinueWhenError(isContinueWhenError), ctx, task := tasksch.NewParallelTask2("FreeBatchStoreSkuSyncInfo", tasksch.NewParallelConfig().SetParallelCount(1).SetBatchSize(batchSize).SetIsContinueWhenError(isContinueWhenError), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, successCount int, err error) {
batchStoreSkuList := make([]*dao.StoreSkuSyncInfo, len(batchItemList)) batchStoreSkuList := make([]*dao.StoreSkuSyncInfo, len(batchItemList))
for k, v := range batchItemList { for k, v := range batchItemList {
batchStoreSkuList[k] = v.(*dao.StoreSkuSyncInfo) batchStoreSkuList[k] = v.(*dao.StoreSkuSyncInfo)
} }
retVal, err = handler(task, batchStoreSkuList) retVal, successCount, err = handler(task, batchStoreSkuList)
if err != nil { if err != nil {
retVal = nil retVal = nil
} }
return retVal, err return retVal, successCount, err
}, storeSkuList) }, storeSkuList)
tasksch.HandleTask(task, parentTask, false).Run() tasksch.HandleTask(task, parentTask, false).Run()
resultList, err = task.GetResult(0) resultList, err = task.GetResult(0)
} else { } else {
result, err2 := handler(parentTask, storeSkuList) result, _, err2 := handler(parentTask, storeSkuList)
if err = err2; err == nil { if err = err2; err == nil {
resultList = utils.Interface2Slice(result) resultList = utils.Interface2Slice(result)
} }