- optimize SyncStoreSkus, isContinueWhenError will really continue

This commit is contained in:
gazebo
2019-03-13 10:54:47 +08:00
parent 6a9067f643
commit 786239a897

View File

@@ -1,6 +1,8 @@
package jd package jd
import ( import (
"fmt"
"git.rosy.net.cn/baseapi/platformapi/jdapi" "git.rosy.net.cn/baseapi/platformapi/jdapi"
"git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxutils" "git.rosy.net.cn/jx-callback/business/jxutils"
@@ -52,7 +54,7 @@ func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasks
if err = dao.GetRows(db, &storeSkus, sql, append(sqlParams, sqlWhereParams...)...); err != nil { if err = dao.GetRows(db, &storeSkus, sql, append(sqlParams, sqlWhereParams...)...); err != nil {
return "", err return "", err
} }
task := tasksch.NewParallelTask("SyncStoresSkus京东", tasksch.NewParallelConfig().SetBatchSize(jdapi.MaxStoreSkuBatchSize).SetIsContinueWhenError(isContinueWhenError), ctx.GetUserName(), func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { task := tasksch.NewParallelTask("SyncStoresSkus京东", tasksch.NewParallelConfig().SetBatchSize(jdapi.MaxStoreSkuBatchSize).SetIsContinueWhenError(isContinueWhenError), ctx.GetUserName(), func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
var skuPriceInfoList []*jdapi.SkuPriceInfo var skuPriceInfoList []*jdapi.SkuPriceInfo
var skuVendibilityList []*jdapi.StockVendibility var skuVendibilityList []*jdapi.StockVendibility
var skuStockList []*jdapi.SkuStock var skuStockList []*jdapi.SkuStock
@@ -103,26 +105,45 @@ func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasks
} }
} }
} }
syncMask := 0
errList := []error{}
if globals.EnableStoreWrite { if globals.EnableStoreWrite {
// todo 以下可以优化为并行操作 // todo 以下可以优化为并行操作
globals.SugarLogger.Debug(utils.Format4Output(skuVendibilityList, false), utils.Format4Output(skuPriceInfoList, false), utils.Format4Output(skuStockList, false)) globals.SugarLogger.Debug(utils.Format4Output(skuVendibilityList, false), utils.Format4Output(skuPriceInfoList, false), utils.Format4Output(skuStockList, false))
if len(skuVendibilityList) > 0 { if len(skuVendibilityList) > 0 {
_, err = api.JdAPI.BatchUpdateVendibility("", stationNo, skuVendibilityList, ctx.GetUserName()) if _, err = api.JdAPI.BatchUpdateVendibility("", stationNo, skuVendibilityList, ctx.GetUserName()); err == nil {
syncMask |= model.SyncFlagSaleMask
} else {
errList = append(errList, err)
}
} }
if err == nil && len(skuStockList) > 0 { if (err == nil || isContinueWhenError) && len(skuStockList) > 0 {
_, err = api.JdAPI.BatchUpdateCurrentQtys("", stationNo, skuStockList, ctx.GetUserName()) if _, err = api.JdAPI.BatchUpdateCurrentQtys("", stationNo, skuStockList, ctx.GetUserName()); err == nil {
syncMask |= model.SyncFlagNewMask | model.SyncFlagDeletedMask
} else {
errList = append(errList, err)
}
} }
if err == nil && len(skuPriceInfoList) > 0 { if (err == nil || isContinueWhenError) && len(skuPriceInfoList) > 0 {
_, err = api.JdAPI.UpdateVendorStationPrice("", stationNo, skuPriceInfoList) if _, err = api.JdAPI.UpdateVendorStationPrice("", stationNo, skuPriceInfoList); err == nil {
syncMask |= model.SyncFlagPriceMask
} else {
errList = append(errList, err)
}
} }
} }
if err == nil && len(batchSkuIDs) > 0 { if len(errList) == 0 {
syncMask = -1
} else if len(errList) > 1 {
err = fmt.Errorf("%v", errList)
}
if syncMask != 0 && len(batchSkuIDs) > 0 {
db := dao.GetDB() // 多线程问题 db := dao.GetDB() // 多线程问题
sql := ` sql := `
UPDATE store_sku_bind t1 UPDATE store_sku_bind t1
SET t1.jd_sync_status = 0 SET t1.jd_sync_status = t1.jd_sync_status & ?
` + sqlWhere0 + " AND t1.sku_id IN (" + dao.GenQuestionMarks(len(batchSkuIDs)) + ")" ` + sqlWhere0 + " AND t1.sku_id IN (" + dao.GenQuestionMarks(len(batchSkuIDs)) + ")"
_, err = dao.ExecuteSQL(db, sql, storeID, batchSkuIDs) _, err = dao.ExecuteSQL(db, sql, ^syncMask, storeID, batchSkuIDs)
} }
return nil, err return nil, err
}, storeSkus) }, storeSkus)