664 lines
27 KiB
Go
664 lines
27 KiB
Go
package cms
|
||
|
||
import (
|
||
"fmt"
|
||
"regexp"
|
||
"strings"
|
||
"time"
|
||
|
||
"git.rosy.net.cn/baseapi/utils"
|
||
"git.rosy.net.cn/jx-callback/business/jxutils"
|
||
|
||
"git.rosy.net.cn/jx-callback/business/partner/putils"
|
||
"git.rosy.net.cn/jx-callback/globals"
|
||
"git.rosy.net.cn/jx-callback/globals/refutil"
|
||
|
||
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
|
||
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
|
||
"git.rosy.net.cn/jx-callback/business/model"
|
||
"git.rosy.net.cn/jx-callback/business/model/dao"
|
||
"git.rosy.net.cn/jx-callback/business/partner"
|
||
)
|
||
|
||
// subSensitiveWordRegexp matches any single character that is not one of
// `[`, `]`, `"` or `}`. GetSensitiveWord joins all of its matches to strip
// that punctuation from a captured fragment.
var subSensitiveWordRegexp = regexp.MustCompile(`[^\[\]\"\}]`)
|
||
|
||
func CreateStoreCategoryByStoreSku(ctx *jxcontext.Context, vendorID, storeID int, vendorStoreID string, nameIDs, skuIDs []int) (err error) {
|
||
db := dao.GetDB()
|
||
dao.Begin(db)
|
||
defer func() {
|
||
if r := recover(); r != nil || err != nil {
|
||
dao.Rollback(db)
|
||
if r != nil {
|
||
panic(r)
|
||
}
|
||
}
|
||
}()
|
||
for i := 0; i < 2; i++ {
|
||
localCats, err2 := dao.GetSkusCategories(db, model.VendorIDMTWM, storeID, skuIDs, i+1)
|
||
if err = err2; err != nil {
|
||
return err
|
||
}
|
||
for _, v := range localCats {
|
||
if v.MapID == 0 {
|
||
if err = dao.AddStoreCategoryMap(db, storeID, v.ID, vendorID, "", model.SyncFlagNewMask, ctx.GetUserName()); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
}
|
||
}
|
||
dao.Commit(db)
|
||
return err
|
||
}
|
||
|
||
func SyncStorCategories(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID, storeID int, vendorStoreID string, nameIDs, skuIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
|
||
globals.SugarLogger.Debugf("SyncStorCategories %s storeID:%d, %s, userName:%s", model.VendorChineseNames[vendorID], storeID, vendorStoreID, ctx.GetUserName())
|
||
handler := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler)
|
||
num := 0
|
||
db := dao.GetDB()
|
||
rootTask := tasksch.NewSeqTask(fmt.Sprintf("%s SyncStoreCategory step1", model.VendorChineseNames[vendorID]), ctx,
|
||
func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||
level := step + 1
|
||
catList, err := dao.GetStoreCategories(db, vendorID, storeID, level)
|
||
if len(catList) > 0 {
|
||
num += len(catList)
|
||
task := tasksch.NewParallelTask(fmt.Sprintf("%s SyncStoreCategory step2, level=%d", model.VendorChineseNames[vendorID], level),
|
||
tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), ctx,
|
||
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||
updateFields := []string{dao.GetSyncStatusStructField(model.VendorNames[vendorID])}
|
||
syncStatusFieldName := dao.GetVendorThingIDStructField(model.VendorNames[vendorID])
|
||
catInfo := batchItemList[0].(*dao.SkuStoreCatInfo)
|
||
storeCatMap := &model.StoreSkuCategoryMap{}
|
||
storeCatMap.ID = catInfo.MapID
|
||
if model.IsSyncStatusDelete(catInfo.StoreCatSyncStatus) { // 删除
|
||
if model.IsSyncStatusDelete(catInfo.StoreCatSyncStatus) && !dao.IsVendorThingIDEmpty(catInfo.VendorCatID) {
|
||
err = handler.DeleteStoreCategory(ctx, storeID, vendorStoreID, catInfo.VendorCatID)
|
||
if err != nil && handler.IsErrCategoryNotExist(err) {
|
||
err = nil
|
||
}
|
||
}
|
||
} else if model.IsSyncStatusNew(catInfo.StoreCatSyncStatus) { // 新增
|
||
err = handler.CreateStoreCategory(ctx, storeID, vendorStoreID, catInfo)
|
||
if err != nil && handler.IsErrCategoryExist(err) {
|
||
if cat, err2 := handler.GetStoreCategory(ctx, storeID, vendorStoreID, catInfo.Name); err2 == nil {
|
||
catInfo.VendorCatID = cat.VendorCatID
|
||
err = nil
|
||
}
|
||
}
|
||
if err == nil {
|
||
updateFields = append(updateFields, syncStatusFieldName)
|
||
}
|
||
} else if model.IsSyncStatusUpdate(catInfo.StoreCatSyncStatus) { // 修改
|
||
if err = handler.UpdateStoreCategory(ctx, storeID, vendorStoreID, catInfo); err == nil {
|
||
updateFields = append(updateFields, syncStatusFieldName)
|
||
}
|
||
}
|
||
if err == nil {
|
||
if vendorID == model.VendorIDMTWM {
|
||
refutil.SetObjFieldByName(storeCatMap, syncStatusFieldName, catInfo.VendorCatID)
|
||
} else {
|
||
refutil.SetObjFieldByName(storeCatMap, syncStatusFieldName, utils.Str2Int64WithDefault(catInfo.VendorCatID, 0))
|
||
}
|
||
_, err = dao.UpdateEntity(db, storeCatMap, updateFields...)
|
||
}
|
||
return nil, err
|
||
}, catList)
|
||
rootTask.AddChild(task).Run()
|
||
_, err = task.GetResult(0)
|
||
}
|
||
return nil, err
|
||
}, 2)
|
||
tasksch.AddChild(parentTask, rootTask).Run()
|
||
if !isAsync {
|
||
_, err = rootTask.GetResult(0)
|
||
} else {
|
||
hint = rootTask.GetID()
|
||
}
|
||
return hint, err
|
||
}
|
||
|
||
func SyncStoreSkuNew(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID, storeID int, vendorStoreID string, nameIDs, skuIDs, excludeSkuIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
|
||
singleStoreHandler, _ := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler)
|
||
if singleStoreHandler != nil {
|
||
if err = CreateStoreCategoryByStoreSku(ctx, vendorID, storeID, vendorStoreID, nameIDs, skuIDs); err != nil {
|
||
return "", err
|
||
}
|
||
}
|
||
task := tasksch.NewSeqTask("SyncStoreSkuNew", ctx,
|
||
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||
switch step {
|
||
case 0:
|
||
if singleStoreHandler != nil {
|
||
_, err = SyncStorCategories(ctx, task, vendorID, storeID, vendorStoreID, nameIDs, skuIDs, false, isContinueWhenError)
|
||
}
|
||
case 1:
|
||
err = syncStoreSkuNew(ctx, task, false, vendorID, storeID, nameIDs, skuIDs, excludeSkuIDs, isContinueWhenError)
|
||
}
|
||
return result, err
|
||
}, 2)
|
||
tasksch.HandleTask(task, parentTask, true).Run()
|
||
if !isAsync {
|
||
_, err = task.GetResult(0)
|
||
} else {
|
||
hint = task.GetID()
|
||
}
|
||
return hint, err
|
||
}
|
||
|
||
func FullSyncStoreSkuNew(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID, storeID int, vendorStoreID string, isAsync, isContinueWhenError bool) (hint string, err error) {
|
||
singleStoreHandler, _ := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler)
|
||
task := tasksch.NewParallelTask("FullSyncStoreSkuNew", tasksch.NewParallelConfig().SetParallelCount(1).SetIsContinueWhenError(isContinueWhenError), ctx,
|
||
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||
step := batchItemList[0].(int)
|
||
switch step {
|
||
case 0:
|
||
if singleStoreHandler != nil {
|
||
_, err = ClearRemoteStoreStuffAndSetNew(ctx, task, vendorID, storeID, vendorStoreID, false, isContinueWhenError)
|
||
} else {
|
||
_, err = dao.SetStoreSkuSyncStatus(dao.GetDB(), vendorID, []int{storeID}, nil, model.SyncFlagStoreSkuOnlyMask)
|
||
}
|
||
case 1:
|
||
if singleStoreHandler != nil {
|
||
_, err = SyncStoreSkuNew(ctx, task, vendorID, storeID, vendorStoreID, nil, nil, nil, false, isContinueWhenError)
|
||
} else {
|
||
err = syncStoreSkuNew(ctx, task, true, vendorID, storeID, nil, nil, nil, isContinueWhenError)
|
||
}
|
||
}
|
||
return retVal, err
|
||
}, []int{0, 1})
|
||
tasksch.HandleTask(task, parentTask, true).Run()
|
||
if !isAsync {
|
||
_, err = task.GetResult(0)
|
||
} else {
|
||
hint = task.GetID()
|
||
}
|
||
return hint, err
|
||
}
|
||
|
||
func isStoreSkuSyncNeedDelete(storeSku *dao.StoreSkuSyncInfo) bool {
|
||
return model.IsSyncStatusDelete(storeSku.StoreSkuSyncStatus) || storeSku.DeletedAt != utils.DefaultTimeValue || storeSku.BindID == 0 || storeSku.NameID == 0
|
||
}
|
||
|
||
func storeSkuSyncInfo2Bare(inSku *dao.StoreSkuSyncInfo) (outSku *partner.StoreSkuInfo) {
|
||
outSku = &partner.StoreSkuInfo{
|
||
SkuID: inSku.SkuID,
|
||
VendorSkuID: inSku.VendorSkuID,
|
||
NameID: inSku.NameID,
|
||
VendorNameID: inSku.VendorNameID,
|
||
|
||
Status: inSku.MergedStatus,
|
||
VendorPrice: inSku.VendorPrice,
|
||
}
|
||
if !isStoreSkuSyncNeedDelete(inSku) {
|
||
outSku.Stock = model.MaxStoreSkuStockQty
|
||
}
|
||
return outSku
|
||
}
|
||
|
||
func calVendorPrice4StoreSku(inSku *dao.StoreSkuSyncInfo, pricePercentagePack model.PricePercentagePack, pricePercentage int) (outSku *dao.StoreSkuSyncInfo) {
|
||
pricePercentage = jxutils.GetPricePercentage(pricePercentagePack, int(inSku.Price), pricePercentage)
|
||
inSku.VendorPrice = int64(jxutils.CaculateSkuVendorPrice(int(inSku.Price), pricePercentage, 0))
|
||
if inSku.VendorPrice <= 0 {
|
||
inSku.VendorPrice = 1 // 最少1分钱
|
||
}
|
||
return inSku
|
||
}
|
||
|
||
func formalizeStoreSkuList(inSkuList []*dao.StoreSkuSyncInfo) []*dao.StoreSkuSyncInfo {
|
||
for _, skuItem := range inSkuList {
|
||
skuItem.MergedStatus = jxutils.MergeSkuStatus(skuItem.Status, skuItem.StoreSkuStatus)
|
||
skuItem.SkuName = jxutils.ComposeSkuName(skuItem.Prefix, skuItem.Name, skuItem.Comment, skuItem.Unit, skuItem.SpecQuality, skuItem.SpecUnit, 0)
|
||
}
|
||
return inSkuList
|
||
}
|
||
|
||
func sku2Update(vendorID int, sku *dao.StoreSkuSyncInfo, syncStatus int8) (item *dao.KVUpdateItem) {
|
||
kvs := map[string]interface{}{}
|
||
if syncStatus&(model.SyncFlagDeletedMask|model.SyncFlagNewMask|model.SyncFlagModifiedMask) != 0 {
|
||
if model.IsSyncStatusNew(syncStatus) {
|
||
sku.StoreSkuSyncStatus = 0
|
||
kvs[dao.GetVendorThingIDStructField(model.VendorNames[vendorID])] = utils.Str2Int64WithDefault(sku.VendorSkuID, 0)
|
||
} else if model.IsSyncStatusDelete(syncStatus) {
|
||
sku.StoreSkuSyncStatus = 0
|
||
if utils.IsTimeZero(sku.BindDeletedAt) {
|
||
kvs[model.FieldDeletedAt] = time.Now()
|
||
}
|
||
kvs[dao.GetVendorThingIDStructField(model.VendorNames[vendorID])] = 0
|
||
} else {
|
||
sku.StoreSkuSyncStatus = sku.StoreSkuSyncStatus & model.SyncFlagPriceMask
|
||
}
|
||
} else if syncStatus&model.SyncFlagStockMask != 0 {
|
||
if isStoreSkuSyncNeedDelete(sku) {
|
||
sku.StoreSkuSyncStatus = 0
|
||
} else {
|
||
sku.StoreSkuSyncStatus = sku.StoreSkuSyncStatus & (model.SyncFlagPriceMask | model.SyncFlagSaleMask)
|
||
}
|
||
} else {
|
||
sku.StoreSkuSyncStatus = sku.StoreSkuSyncStatus & ^syncStatus
|
||
}
|
||
kvs[dao.GetSyncStatusStructField(model.VendorNames[vendorID])] = sku.StoreSkuSyncStatus
|
||
if sku.VendorPrice > 0 {
|
||
kvs[dao.GetVendorPriceStructField(model.VendorNames[vendorID])] = sku.VendorPrice
|
||
}
|
||
storeSku := &model.StoreSkuBind{}
|
||
storeSku.ID = sku.BindID
|
||
item = &dao.KVUpdateItem{
|
||
Item: storeSku,
|
||
KVs: kvs,
|
||
}
|
||
return item
|
||
}
|
||
|
||
func updateStoreSku(db *dao.DaoDB, vendorID int, storeSkuList []*dao.StoreSkuSyncInfo, syncStatus int8) (num int64, err error) {
|
||
if len(storeSkuList) > 0 {
|
||
updateItemList := make([]*dao.KVUpdateItem, len(storeSkuList))
|
||
for k, v := range storeSkuList {
|
||
updateItemList[k] = sku2Update(vendorID, v, syncStatus)
|
||
}
|
||
num, err = dao.BatchUpdateEntityByKV(db, updateItemList)
|
||
}
|
||
return num, err
|
||
}
|
||
|
||
func syncStoreSkuNew(ctx *jxcontext.Context, parentTask tasksch.ITask, isFull bool, vendorID, storeID int, nameIDs, skuIDs, excludeSkuIDs []int, isContinueWhenError bool) (err error) {
|
||
db := dao.GetDB()
|
||
storeDetail, err := dao.GetStoreDetail(db, storeID, vendorID)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
vendorStoreID := storeDetail.VendorStoreID
|
||
|
||
var skus []*dao.StoreSkuSyncInfo
|
||
if isFull {
|
||
skus, err = dao.GetFullStoreSkus(db, vendorID, storeID)
|
||
} else {
|
||
skus, err = dao.GetStoreSkus(db, vendorID, storeID, skuIDs)
|
||
}
|
||
if err != nil || len(skus) == 0 {
|
||
return err
|
||
}
|
||
if len(excludeSkuIDs) > 0 {
|
||
excludeSkuMap := jxutils.IntList2Map(excludeSkuIDs)
|
||
var skus2 []*dao.StoreSkuSyncInfo
|
||
for _, v := range skus {
|
||
if excludeSkuMap[v.SkuID] == 0 {
|
||
skus2 = append(skus2, v)
|
||
}
|
||
}
|
||
skus = skus2
|
||
}
|
||
formalizeStoreSkuList(skus)
|
||
|
||
singleStoreHandler, _ := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler)
|
||
storeSkuHandler := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.IPurchasePlatformStoreSkuHandler)
|
||
|
||
var (
|
||
createList, updateList []*dao.StoreSkuSyncInfo
|
||
deleteList, stockList, onlineList, offlineList, priceList []*partner.StoreSkuInfo
|
||
updateItems []*dao.KVUpdateItem
|
||
)
|
||
skuMap := make(map[*partner.StoreSkuInfo]*dao.StoreSkuSyncInfo)
|
||
|
||
for _, sku := range skus {
|
||
var bareSku *partner.StoreSkuInfo
|
||
if isStoreSkuSyncNeedDelete(sku) {
|
||
if !dao.IsVendorThingIDEmpty(sku.VendorSkuID) {
|
||
bareSku = storeSkuSyncInfo2Bare(sku)
|
||
if singleStoreHandler == nil {
|
||
stockList = append(stockList, bareSku)
|
||
} else {
|
||
deleteList = append(deleteList, bareSku)
|
||
}
|
||
} else {
|
||
updateItems = append(updateItems, sku2Update(vendorID, sku, model.SyncFlagDeletedMask))
|
||
}
|
||
} else if model.IsSyncStatusNew(sku.StoreSkuSyncStatus) {
|
||
calVendorPrice4StoreSku(sku, storeDetail.PricePercentagePackObj, int(storeDetail.PricePercentage))
|
||
if singleStoreHandler == nil {
|
||
sku.StoreSkuSyncStatus |= model.SyncFlagSaleMask | model.SyncFlagPriceMask
|
||
bareSku = storeSkuSyncInfo2Bare(calVendorPrice4StoreSku(sku, storeDetail.PricePercentagePackObj, int(storeDetail.PricePercentage)))
|
||
stockList = append(stockList, bareSku)
|
||
priceList = append(priceList, bareSku)
|
||
if sku.MergedStatus == model.SkuStatusNormal {
|
||
onlineList = append(onlineList, bareSku)
|
||
} else {
|
||
offlineList = append(offlineList, bareSku)
|
||
}
|
||
} else {
|
||
if sku.MergedStatus == model.SkuStatusNormal /*&& !dao.IsVendorThingIDEmpty(sku.VendorCatID)*/ {
|
||
createList = append(createList, sku)
|
||
}
|
||
}
|
||
} else {
|
||
if dao.IsVendorThingIDEmpty(sku.VendorSkuID) {
|
||
err = fmt.Errorf("门店:%d,修改没有创建的商品:%d", storeID, sku.SkuID)
|
||
if parentTask == nil {
|
||
return err
|
||
}
|
||
parentTask.AddBatchErr(err)
|
||
} else {
|
||
isAdded2Update := false
|
||
// 修改商品信息时不改价(以免活动引起的失败),而用单独的改价来改
|
||
if model.IsSyncStatusUpdate(sku.StoreSkuSyncStatus) && singleStoreHandler != nil {
|
||
isAdded2Update = true
|
||
updateList = append(updateList, calVendorPrice4StoreSku(sku, storeDetail.PricePercentagePackObj, int(storeDetail.PricePercentage)))
|
||
}
|
||
if model.IsSyncStatusPrice(sku.StoreSkuSyncStatus) {
|
||
bareSku = storeSkuSyncInfo2Bare(calVendorPrice4StoreSku(sku, storeDetail.PricePercentagePackObj, int(storeDetail.PricePercentage)))
|
||
priceList = append(priceList, bareSku)
|
||
}
|
||
if !isAdded2Update {
|
||
if model.IsSyncStatusUpdate(sku.StoreSkuSyncStatus) && singleStoreHandler == nil { // 正常就不应该进到这里
|
||
if bareSku == nil {
|
||
bareSku = storeSkuSyncInfo2Bare(sku)
|
||
}
|
||
updateItems = append(updateItems, sku2Update(vendorID, sku, model.SyncFlagStockMask))
|
||
}
|
||
if model.IsSyncStatusSale(sku.StoreSkuSyncStatus) {
|
||
if bareSku == nil {
|
||
bareSku = storeSkuSyncInfo2Bare(sku)
|
||
}
|
||
if sku.MergedStatus == model.SkuStatusNormal {
|
||
onlineList = append(onlineList, bareSku)
|
||
stockList = append(stockList, bareSku)
|
||
} else {
|
||
offlineList = append(offlineList, bareSku)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
if bareSku != nil {
|
||
skuMap[bareSku] = sku
|
||
}
|
||
}
|
||
if _, err = dao.BatchUpdateEntityByKV(db, updateItems); err != nil {
|
||
return err
|
||
}
|
||
bareSku2Sync := func(bareSkuList []*partner.StoreSkuInfo) (skuList []*dao.StoreSkuSyncInfo) {
|
||
if len(bareSkuList) > 0 {
|
||
skuList = make([]*dao.StoreSkuSyncInfo, len(bareSkuList))
|
||
for k, v := range bareSkuList {
|
||
skuList[k] = skuMap[v]
|
||
}
|
||
}
|
||
return skuList
|
||
}
|
||
task := tasksch.NewParallelTask("syncStoreSkuNew", tasksch.NewParallelConfig().SetParallelCount(1).SetIsContinueWhenError(isContinueWhenError), ctx,
|
||
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||
step := batchItemList[0].(int)
|
||
// globals.SugarLogger.Debugf("step:%d", step)
|
||
switch step {
|
||
case 0:
|
||
if len(deleteList) > 0 {
|
||
_, err = putils.FreeBatchStoreSkuInfo("删除门店商品", func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
||
var successList []*partner.StoreSkuInfo
|
||
if successList, err = singleStoreHandler.DeleteStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList); singleStoreHandler.IsErrSkuNotExist(err) {
|
||
err = nil
|
||
}
|
||
if err == nil {
|
||
successList = batchedStoreSkuList
|
||
}
|
||
if len(successList) > 0 {
|
||
updateStoreSku(dao.GetDB(), vendorID, bareSku2Sync(successList), model.SyncFlagDeletedMask)
|
||
}
|
||
return nil, len(successList), err
|
||
}, ctx, task, deleteList, 1 /*singleStoreHandler.GetStoreSkusBatchSize(partner.FuncDeleteStoreSkus)*/, isContinueWhenError)
|
||
}
|
||
case 1:
|
||
if len(createList) > 0 {
|
||
_, err = putils.FreeBatchStoreSkuSyncInfo("创建门店商品", func(task tasksch.ITask, batchedStoreSkuList []*dao.StoreSkuSyncInfo) (result interface{}, successCount int, err error) {
|
||
var successList []*dao.StoreSkuSyncInfo
|
||
if successList, err = singleStoreHandler.CreateStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList); singleStoreHandler.IsErrSkuExist(err) {
|
||
if skuNameList, err2 := singleStoreHandler.GetStoreSkusFullInfo(ctx, task, storeID, vendorStoreID, []*partner.StoreSkuInfo{
|
||
&partner.StoreSkuInfo{
|
||
SkuID: batchedStoreSkuList[0].SkuID,
|
||
},
|
||
}); err2 == nil && len(skuNameList) > 0 {
|
||
batchedStoreSkuList[0].VendorNameID = skuNameList[0].VendorNameID
|
||
batchedStoreSkuList[0].VendorSkuID = skuNameList[0].SkuList[0].VendorSkuID
|
||
|
||
// 如果创建商品时已经存在,需要更新
|
||
updateList = append(updateList, calVendorPrice4StoreSku(batchedStoreSkuList[0], storeDetail.PricePercentagePackObj, int(storeDetail.PricePercentage)))
|
||
err = nil
|
||
}
|
||
}
|
||
if err == nil {
|
||
successList = batchedStoreSkuList
|
||
} else {
|
||
//handle error for sensitive words, if find, then insert to table sensitive_words
|
||
if sensitiveWord := GetSensitiveWord(singleStoreHandler, err.Error()); sensitiveWord != "" {
|
||
dao.InsertSensitiveWord(sensitiveWord, vendorID, ctx.GetUserName())
|
||
}
|
||
}
|
||
if len(successList) > 0 {
|
||
updateStoreSku(dao.GetDB(), vendorID, successList, model.SyncFlagNewMask)
|
||
}
|
||
return nil, len(successList), err
|
||
}, ctx, task, createList, 1 /*singleStoreHandler.GetStoreSkusBatchSize(partner.FuncCreateStoreSkus)*/, isContinueWhenError)
|
||
}
|
||
case 2:
|
||
if len(updateList) > 0 {
|
||
_, err = putils.FreeBatchStoreSkuSyncInfo("更新门店商品基础信息", func(task tasksch.ITask, batchedStoreSkuList []*dao.StoreSkuSyncInfo) (result interface{}, successCount int, err error) {
|
||
var successList []*dao.StoreSkuSyncInfo
|
||
if successList, err = singleStoreHandler.UpdateStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil {
|
||
successList = batchedStoreSkuList
|
||
}
|
||
if len(successList) > 0 {
|
||
updateStoreSku(dao.GetDB(), vendorID, successList, model.SyncFlagModifiedMask)
|
||
}
|
||
return nil, len(successList), err
|
||
}, ctx, task, updateList, singleStoreHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkus), isContinueWhenError)
|
||
}
|
||
case 3:
|
||
for k, list := range [][]*partner.StoreSkuInfo{stockList /*, onlineList*/} {
|
||
if len(list) > 0 {
|
||
_, err = putils.FreeBatchStoreSkuInfo("更新门店商品库存", func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
||
var successList []*partner.StoreSkuInfo
|
||
if successList, err = storeSkuHandler.UpdateStoreSkusStock(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil {
|
||
successList = batchedStoreSkuList
|
||
}
|
||
if k == 0 && len(successList) > 0 {
|
||
updateStoreSku(dao.GetDB(), vendorID, bareSku2Sync(successList), model.SyncFlagStockMask)
|
||
}
|
||
return nil, len(successList), err
|
||
}, ctx, task, list, storeSkuHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusStock), isContinueWhenError)
|
||
}
|
||
}
|
||
case 4, 5:
|
||
statusList := onlineList
|
||
status := model.SkuStatusNormal
|
||
name := "可售门店商品"
|
||
if step == 5 {
|
||
statusList = offlineList
|
||
status = model.SkuStatusDontSale
|
||
name = "不可售门店商品"
|
||
}
|
||
if len(statusList) > 0 {
|
||
_, err = putils.FreeBatchStoreSkuInfo(name, func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
||
var successList []*partner.StoreSkuInfo
|
||
if successList, err = storeSkuHandler.UpdateStoreSkusStatus(ctx, storeID, vendorStoreID, batchedStoreSkuList, status); err == nil {
|
||
successList = batchedStoreSkuList
|
||
}
|
||
if len(successList) > 0 {
|
||
updateStoreSku(dao.GetDB(), vendorID, bareSku2Sync(successList), model.SyncFlagSaleMask)
|
||
}
|
||
return nil, len(successList), err
|
||
}, ctx, task, statusList, storeSkuHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusStatus), isContinueWhenError)
|
||
}
|
||
case 6:
|
||
if len(priceList) > 0 {
|
||
_, err = putils.FreeBatchStoreSkuInfo("更新门店商品价格", func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
||
var successList []*partner.StoreSkuInfo
|
||
if successList, err = storeSkuHandler.UpdateStoreSkusPrice(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil {
|
||
successList = batchedStoreSkuList
|
||
}
|
||
if len(successList) > 0 {
|
||
updateStoreSku(dao.GetDB(), vendorID, bareSku2Sync(successList), model.SyncFlagPriceMask)
|
||
}
|
||
return nil, len(successList), err
|
||
}, ctx, task, priceList, storeSkuHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusPrice), isContinueWhenError)
|
||
}
|
||
}
|
||
return retVal, err
|
||
}, []int{0, 1, 2, 3, 4, 5, 6})
|
||
tasksch.HandleTask(task, parentTask, true).Run()
|
||
_, err = task.GetResult(0)
|
||
return err
|
||
}
|
||
|
||
// 清除京西没有,平台有的商品
|
||
func PruneMissingStoreSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID, storeID int, vendorStoreID string, isAsync, isContinueWhenError bool) (hint string, err error) {
|
||
handler, _ := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler)
|
||
if handler == nil {
|
||
return "", fmt.Errorf("平台:%s不支持此操作", model.VendorChineseNames[vendorID])
|
||
}
|
||
db := dao.GetDB()
|
||
localSkuList, err := dao.GetStoreSkus2(db, vendorID, storeID, nil, false)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
localSkuMap := make(map[int]*dao.StoreSkuSyncInfo)
|
||
for _, v := range localSkuList {
|
||
localSkuMap[v.SkuID] = v
|
||
}
|
||
var sku2Delete []*partner.StoreSkuInfo
|
||
task := tasksch.NewSeqTask(fmt.Sprintf("清除平台:%s上多余的门店商品", model.VendorChineseNames[vendorID]), ctx,
|
||
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||
switch step {
|
||
case 0:
|
||
remoteSkuList, err2 := handler.GetStoreSkusFullInfo(ctx, task, storeID, vendorStoreID, nil)
|
||
if err = err2; err == nil {
|
||
for _, v := range remoteSkuList {
|
||
if localSkuMap[v.SkuList[0].SkuID] == nil {
|
||
sku2Delete = append(sku2Delete, &partner.StoreSkuInfo{
|
||
SkuID: v.SkuList[0].SkuID,
|
||
VendorSkuID: v.SkuList[0].VendorSkuID,
|
||
})
|
||
}
|
||
}
|
||
}
|
||
case 1:
|
||
if len(sku2Delete) > 0 {
|
||
_, err = putils.FreeBatchStoreSkuInfo("删除门店商品", func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
||
_, err = handler.DeleteStoreSkus(ctx, storeID, vendorStoreID, sku2Delete)
|
||
return nil, 0, err
|
||
}, ctx, parentTask, sku2Delete, handler.GetStoreSkusBatchSize(partner.FuncDeleteStoreSkus), isContinueWhenError)
|
||
}
|
||
}
|
||
return nil, err
|
||
}, 2)
|
||
tasksch.HandleTask(task, parentTask, true).Run()
|
||
if isAsync {
|
||
hint = task.GetID()
|
||
} else {
|
||
_, err = task.GetResult(0)
|
||
}
|
||
return hint, err
|
||
}
|
||
|
||
// 把京西有,平台无且没有待创建标记的商品加上待创建标记
|
||
func AddCreateFlagForJxStoreSku(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID, storeID int, vendorStoreID string, isAsync, isContinueWhenError bool) (hint string, err error) {
|
||
handler, _ := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler)
|
||
if handler == nil {
|
||
return "", fmt.Errorf("平台:%s不支持此操作", model.VendorChineseNames[vendorID])
|
||
}
|
||
db := dao.GetDB()
|
||
localSkuList, err := dao.GetStoreSkus2(db, vendorID, storeID, nil, false)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
|
||
var skuIDList []int
|
||
seqTaskFunc := func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||
switch step {
|
||
case 0:
|
||
remoteSkuList, err2 := handler.GetStoreSkusFullInfo(ctx, task, storeID, vendorStoreID, nil)
|
||
if err = err2; err == nil {
|
||
remoteSkuMap := make(map[int]*partner.SkuNameInfo)
|
||
for _, value := range remoteSkuList {
|
||
for _, skuInfo := range value.SkuList {
|
||
remoteSkuMap[skuInfo.SkuID] = value
|
||
}
|
||
}
|
||
for _, v := range localSkuList {
|
||
if remoteSkuMap[v.SkuID] == nil && !model.IsSyncStatusNew(v.StoreSkuSyncStatus) && !model.IsSyncStatusDelete(v.StoreSkuSyncStatus) {
|
||
skuIDList = append(skuIDList, v.SkuID)
|
||
}
|
||
}
|
||
}
|
||
case 1:
|
||
if len(skuIDList) > 0 {
|
||
storeSkuList, err := dao.GetStoresSkusInfo(db, []int{storeID}, skuIDList)
|
||
if err == nil {
|
||
for _, skuBind := range storeSkuList {
|
||
fieldStatus := dao.GetSyncStatusStructField(model.VendorNames[vendorID])
|
||
dao.UpdateEntityLogicallyAndUpdateSyncStatus(db, skuBind, nil, ctx.GetUserName(), nil, fieldStatus, model.SyncFlagNewMask)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
return nil, err
|
||
}
|
||
task := tasksch.NewSeqTask(fmt.Sprintf("处理京西门店商品加待创建标记:%s", model.VendorChineseNames[vendorID]), ctx, seqTaskFunc, 2)
|
||
tasksch.HandleTask(task, parentTask, true).Run()
|
||
if isAsync {
|
||
hint = task.GetID()
|
||
} else {
|
||
_, err = task.GetResult(0)
|
||
}
|
||
return hint, err
|
||
}
|
||
|
||
func ClearRemoteStoreStuffAndSetNew(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID, storeID int, vendorStoreID string, isAsync, isContinueWhenError bool) (hint string, err error) {
|
||
userName := ctx.GetUserName()
|
||
globals.SugarLogger.Debugf("ClearRemoteStoreStuffAndSetNew storeID:%d, isContinueWhenError:%t, userName:%s", storeID, isContinueWhenError, userName)
|
||
handler, _ := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler)
|
||
if handler == nil {
|
||
return "", fmt.Errorf("平台:%s不支持此操作", model.VendorChineseNames[vendorID])
|
||
}
|
||
|
||
db := dao.GetDB()
|
||
var errDeleteSku error
|
||
rootTask := tasksch.NewSeqTask(fmt.Sprintf("ClearRemoteStoreStuffAndSetNew:%s", model.VendorChineseNames[vendorID]), ctx,
|
||
func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||
switch step {
|
||
case 0:
|
||
err = handler.DeleteStoreAllSkus(ctx, rootTask, storeID, vendorStoreID, isContinueWhenError)
|
||
errDeleteSku = err
|
||
// 强制忽略删除SKU错误
|
||
if isContinueWhenError {
|
||
err = nil
|
||
}
|
||
if err == nil {
|
||
_, err = dao.SetStoreSkuSyncStatus(db, vendorID, []int{storeID}, nil, model.SyncFlagNewMask)
|
||
}
|
||
case 1:
|
||
if err = handler.DeleteStoreAllCategories(ctx, rootTask, storeID, vendorStoreID, isContinueWhenError); err == nil {
|
||
_, err = dao.SetStoreCategorySyncStatus(db, vendorID, []int{storeID}, nil, model.SyncFlagNewMask)
|
||
}
|
||
}
|
||
return nil, err
|
||
}, 2)
|
||
tasksch.AddChild(parentTask, rootTask).Run()
|
||
if !isAsync {
|
||
_, err = rootTask.GetResult(0)
|
||
}
|
||
if err == nil {
|
||
err = errDeleteSku
|
||
}
|
||
return rootTask.ID, err
|
||
}
|
||
|
||
func GetSensitiveWord(singleStoreHandler partner.ISingleStoreStoreSkuHandler, str string) string {
|
||
sensitiveWordRegexp := singleStoreHandler.GetSensitiveWordRegexp()
|
||
findResult := sensitiveWordRegexp.FindStringSubmatch(str)
|
||
if findResult != nil && len(findResult) > 1 {
|
||
findSubResult := subSensitiveWordRegexp.FindAllString(findResult[1], -1)
|
||
return strings.Join(findSubResult, "")
|
||
}
|
||
|
||
return ""
|
||
}
|