Files
jx-callback/business/jxstore/cms/sync_store_sku.go
2019-07-18 09:11:31 +08:00

470 lines
19 KiB
Go

package cms
import (
"fmt"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/business/partner/putils"
"git.rosy.net.cn/jx-callback/globals"
"git.rosy.net.cn/jx-callback/globals/refutil"
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
"git.rosy.net.cn/jx-callback/business/partner"
)
func CreateStoreCategoryByStoreSku(ctx *jxcontext.Context, vendorID, storeID int, vendorStoreID string, nameIDs, skuIDs []int) (err error) {
db := dao.GetDB()
dao.Begin(db)
defer func() {
if r := recover(); r != nil || err != nil {
dao.Rollback(db)
if r != nil {
panic(r)
}
}
}()
for i := 0; i < 2; i++ {
localCats, err2 := dao.GetSkusCategories(db, model.VendorIDMTWM, storeID, skuIDs, i+1)
if err = err2; err != nil {
return err
}
for _, v := range localCats {
if v.MapID == 0 {
if err = dao.AddStoreCategoryMap(db, storeID, v.ID, vendorID, "", model.SyncFlagNewMask, ctx.GetUserName()); err != nil {
return err
}
}
}
}
dao.Commit(db)
return err
}
func SyncStorCategories(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID, storeID int, vendorStoreID string, nameIDs, skuIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
globals.SugarLogger.Debugf("SyncStorCategories %s storeID:%d, %s, userName:%s", storeID, model.VendorChineseNames[vendorID], ctx.GetUserName())
handler := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler)
num := 0
db := dao.GetDB()
rootTask := tasksch.NewSeqTask(fmt.Sprintf("%s SyncStoreCategory step1", model.VendorChineseNames[vendorID]), ctx,
func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
level := step + 1
catList, err := dao.GetStoreCategories(db, vendorID, storeID, level)
if len(catList) > 0 {
num += len(catList)
task := tasksch.NewParallelTask(fmt.Sprintf("%s SyncStoreCategory step2, level=%d", model.VendorChineseNames[vendorID], level),
tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
updateFields := []string{dao.GetSyncStatusStructField(model.VendorNames[vendorID])}
syncStatusFieldName := dao.GetVendorThingIDStructField(model.VendorNames[vendorID])
catInfo := batchItemList[0].(*dao.SkuStoreCatInfo)
storeCatMap := &model.StoreSkuCategoryMap{}
storeCatMap.ID = catInfo.MapID
if model.IsSyncStatusDelete(catInfo.StoreCatSyncStatus) { // 删除
if model.IsSyncStatusDelete(catInfo.StoreCatSyncStatus) && !dao.IsVendorThingIDEmpty(catInfo.VendorCatID) {
err = handler.DeleteStoreCategory(ctx, storeID, vendorStoreID, catInfo.VendorCatID)
}
} else if model.IsSyncStatusNew(catInfo.StoreCatSyncStatus) { // 新增
if err = handler.CreateStoreCategory(ctx, storeID, vendorStoreID, catInfo); err == nil {
updateFields = append(updateFields, syncStatusFieldName)
}
} else if model.IsSyncStatusUpdate(catInfo.StoreCatSyncStatus) { // 修改
if err = handler.UpdateStoreCategory(ctx, storeID, vendorStoreID, catInfo); err == nil {
updateFields = append(updateFields, syncStatusFieldName)
}
}
if err == nil {
if vendorID == model.VendorIDMTWM {
refutil.SetObjFieldByName(storeCatMap, syncStatusFieldName, catInfo.VendorCatID)
} else {
refutil.SetObjFieldByName(storeCatMap, syncStatusFieldName, utils.Str2Int64WithDefault(catInfo.VendorCatID, 0))
}
_, err = dao.UpdateEntity(db, storeCatMap, updateFields...)
}
return nil, err
}, catList)
rootTask.AddChild(task).Run()
_, err = task.GetResult(0)
}
return nil, err
}, 2)
tasksch.AddChild(parentTask, rootTask).Run()
if !isAsync {
_, err = rootTask.GetResult(0)
} else {
hint = rootTask.GetID()
}
return hint, err
}
func SyncStoreSkuNew(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID, storeID int, vendorStoreID string, nameIDs, skuIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
singleStoreHandler, _ := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler)
if singleStoreHandler != nil {
if err = CreateStoreCategoryByStoreSku(ctx, vendorID, storeID, vendorStoreID, nameIDs, skuIDs); err != nil {
return "", err
}
}
task := tasksch.NewSeqTask("SyncStoreSkuNew", ctx,
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
switch step {
case 0:
if singleStoreHandler != nil {
_, err = SyncStorCategories(ctx, task, vendorID, storeID, vendorStoreID, nameIDs, skuIDs, false, isContinueWhenError)
}
case 1:
err = syncStoreSkuNew(ctx, task, false, vendorID, storeID, nameIDs, skuIDs, isContinueWhenError)
}
return result, err
}, 2)
tasksch.HandleTask(task, parentTask, true).Run()
if !isAsync {
_, err = task.GetResult(0)
} else {
hint = task.GetID()
}
return hint, err
}
func FullSyncStoreSkuNew(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID, storeID int, vendorStoreID string, isAsync, isContinueWhenError bool) (hint string, err error) {
singleStoreHandler, _ := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler)
task := tasksch.NewParallelTask("FullSyncStoreSkuNew", tasksch.NewParallelConfig().SetParallelCount(1).SetIsContinueWhenError(isContinueWhenError), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
step := batchItemList[0].(int)
switch step {
case 0:
if singleStoreHandler != nil {
_, err = ClearRemoteStoreStuffAndSetNew(ctx, task, vendorID, storeID, vendorStoreID, false, isContinueWhenError)
} else {
_, err = dao.SetStoreSkuSyncStatus(dao.GetDB(), vendorID, []int{storeID}, nil, model.SyncFlagStoreSkuOnlyMask)
}
case 1:
if singleStoreHandler != nil {
_, err = SyncStoreSkuNew(ctx, task, vendorID, storeID, vendorStoreID, nil, nil, false, isContinueWhenError)
} else {
err = syncStoreSkuNew(ctx, task, false, vendorID, storeID, nil, nil, isContinueWhenError)
}
}
return retVal, err
}, []int{0, 1})
tasksch.HandleTask(task, parentTask, true).Run()
if !isAsync {
_, err = task.GetResult(0)
} else {
hint = task.GetID()
}
return hint, err
}
func isStoreSkuSyncNeedDelete(storeSku *dao.StoreSkuSyncInfo) bool {
return model.IsSyncStatusDelete(storeSku.StoreSkuSyncStatus) || storeSku.DeletedAt != utils.DefaultTimeValue || storeSku.BindID == 0 || storeSku.NameID == 0
}
func storeSkuSyncInfo2Bare(inSku *dao.StoreSkuSyncInfo) (outSku *partner.StoreSkuInfo) {
outSku = &partner.StoreSkuInfo{
SkuID: inSku.SkuID,
VendorSkuID: inSku.VendorSkuID,
NameID: inSku.NameID,
VendorNameID: inSku.VendorNameID,
Status: inSku.MergedStatus,
VendorPrice: inSku.VendorPrice,
}
if !isStoreSkuSyncNeedDelete(inSku) {
outSku.Stock = model.MaxStoreSkuStockQty
}
return outSku
}
func calVendorPrice4StoreSku(inSku *dao.StoreSkuSyncInfo, pricePercentagePack model.PricePercentagePack, pricePercentage int) (outSku *dao.StoreSkuSyncInfo) {
pricePercentage = jxutils.GetPricePercentage(pricePercentagePack, int(inSku.Price), pricePercentage)
inSku.VendorPrice = int64(jxutils.CaculateSkuVendorPrice(int(inSku.Price), pricePercentage, 0))
return inSku
}
func formalizeStoreSkuList(inSkuList []*dao.StoreSkuSyncInfo) []*dao.StoreSkuSyncInfo {
for _, skuItem := range inSkuList {
skuItem.MergedStatus = jxutils.MergeSkuStatus(skuItem.Status, skuItem.StoreSkuStatus)
skuItem.SkuName = jxutils.ComposeSkuName(skuItem.Prefix, skuItem.Name, skuItem.Comment, skuItem.Unit, skuItem.SpecQuality, skuItem.SpecUnit, 0)
}
return inSkuList
}
func sku2Update(vendorID int, sku *dao.StoreSkuSyncInfo, syncStatus int8) (item *dao.KVUpdateItem) {
if syncStatus&(model.SyncFlagDeletedMask|model.SyncFlagNewMask|model.SyncFlagModifiedMask) != 0 {
sku.StoreSkuSyncStatus = 0
} else {
sku.StoreSkuSyncStatus = sku.StoreSkuSyncStatus & ^syncStatus
}
kvs := map[string]interface{}{
dao.GetSyncStatusStructField(model.VendorNames[vendorID]): sku.StoreSkuSyncStatus,
}
if syncStatus == model.SyncFlagNewMask {
kvs[dao.GetVendorThingIDStructField(model.VendorNames[vendorID])] = sku.VendorSkuID
}
storeSku := &model.StoreSkuBind{}
storeSku.ID = sku.BindID
item = &dao.KVUpdateItem{
Item: storeSku,
KVs: kvs,
}
return item
}
func updateStoreSku(db *dao.DaoDB, vendorID int, storeSkuList []*dao.StoreSkuSyncInfo, syncStatus int8) (num int64, err error) {
if len(storeSkuList) > 0 {
updateItemList := make([]*dao.KVUpdateItem, len(storeSkuList))
for k, v := range storeSkuList {
updateItemList[k] = sku2Update(vendorID, v, syncStatus)
}
num, err = dao.BatchUpdateEntityByKV(db, updateItemList)
}
return num, err
}
func syncStoreSkuNew(ctx *jxcontext.Context, parentTask tasksch.ITask, isFull bool, vendorID, storeID int, nameIDs, skuIDs []int, isContinueWhenError bool) (err error) {
db := dao.GetDB()
storeDetail, err := dao.GetStoreDetail(db, storeID, vendorID)
if err != nil {
return err
}
vendorStoreID := storeDetail.VendorStoreID
var skus []*dao.StoreSkuSyncInfo
if isFull {
skus, err = dao.GetFullStoreSkus(db, vendorID, storeID)
} else {
skus, err = dao.GetStoreSkus(db, vendorID, storeID, skuIDs)
}
if err != nil {
return err
}
formalizeStoreSkuList(skus)
singleStoreHandler, _ := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler)
storeSkuHandler := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.IPurchasePlatformStoreSkuHandler)
var (
createList, updateList []*dao.StoreSkuSyncInfo
deleteList, stockList, statusList, priceList []*partner.StoreSkuInfo
updateItems []*dao.KVUpdateItem
)
skuMap := make(map[*partner.StoreSkuInfo]*dao.StoreSkuSyncInfo)
for _, sku := range skus {
var bareSku *partner.StoreSkuInfo
if isStoreSkuSyncNeedDelete(sku) {
if !dao.IsVendorThingIDEmpty(sku.VendorSkuID) {
bareSku = storeSkuSyncInfo2Bare(sku)
if singleStoreHandler == nil {
stockList = append(stockList, bareSku)
} else {
deleteList = append(deleteList, bareSku)
}
} else {
updateItems = append(updateItems, sku2Update(vendorID, sku, model.SyncFlagDeletedMask))
}
} else if model.IsSyncStatusNew(sku.StoreSkuSyncStatus) {
calVendorPrice4StoreSku(sku, storeDetail.PricePercentagePackObj, int(storeDetail.PricePercentage))
bareSku = storeSkuSyncInfo2Bare(sku)
if singleStoreHandler == nil {
stockList = append(stockList, bareSku)
} else {
if bareSku.Status == model.SkuStatusNormal && !dao.IsVendorThingIDEmpty(sku.VendorCatID) {
createList = append(createList, sku)
}
}
} else {
if model.IsSyncStatusUpdate(sku.StoreSkuSyncStatus) {
if singleStoreHandler != nil {
updateList = append(updateList, calVendorPrice4StoreSku(sku, storeDetail.PricePercentagePackObj, int(storeDetail.PricePercentage)))
}
} else {
if model.IsSyncStatusPrice(sku.StoreSkuSyncStatus) {
bareSku = storeSkuSyncInfo2Bare(calVendorPrice4StoreSku(sku, storeDetail.PricePercentagePackObj, int(storeDetail.PricePercentage)))
priceList = append(priceList, bareSku)
}
if model.IsSyncStatusSale(sku.StoreSkuSyncStatus) {
if bareSku == nil {
bareSku = storeSkuSyncInfo2Bare(sku)
}
statusList = append(statusList, bareSku)
}
}
}
if bareSku != nil {
skuMap[bareSku] = sku
}
}
if _, err = dao.BatchUpdateEntityByKV(db, updateItems); err != nil {
return err
}
bareSku2Sync := func(bareSkuList []*partner.StoreSkuInfo) (skuList []*dao.StoreSkuSyncInfo) {
if len(bareSkuList) > 0 {
skuList = make([]*dao.StoreSkuSyncInfo, len(bareSkuList))
for k, v := range bareSkuList {
skuList[k] = skuMap[v]
}
}
return skuList
}
task := tasksch.NewParallelTask("syncStoreSkuNew", tasksch.NewParallelConfig().SetParallelCount(1).SetIsContinueWhenError(isContinueWhenError), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
step := batchItemList[0].(int)
globals.SugarLogger.Debugf("step:%d", step)
switch step {
case 0:
if len(deleteList) > 0 {
_, err = putils.FreeBatchStoreSkuInfo(func(batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) {
if err = singleStoreHandler.DeleteStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil {
_, err = updateStoreSku(dao.GetDB(), vendorID, bareSku2Sync(batchedStoreSkuList), model.SyncFlagDeletedMask)
}
return nil, err
}, ctx, task, deleteList, singleStoreHandler.GetStoreSkusBatchSize(partner.FuncDeleteStoreSkus), isContinueWhenError)
}
case 1:
if len(createList) > 0 {
_, err = putils.FreeBatchStoreSkuSyncInfo(func(batchedStoreSkuList []*dao.StoreSkuSyncInfo) (result interface{}, err error) {
globals.SugarLogger.Debug(utils.Format4Output(batchedStoreSkuList, false))
if err = singleStoreHandler.CreateStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil {
_, err = updateStoreSku(dao.GetDB(), vendorID, batchedStoreSkuList, model.SyncFlagNewMask)
}
return nil, err
}, ctx, task, createList, singleStoreHandler.GetStoreSkusBatchSize(partner.FuncCreateStoreSkus), isContinueWhenError)
}
case 2:
if len(updateList) > 0 {
_, err = putils.FreeBatchStoreSkuSyncInfo(func(batchedStoreSkuList []*dao.StoreSkuSyncInfo) (result interface{}, err error) {
if err = singleStoreHandler.UpdateStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil {
_, err = updateStoreSku(dao.GetDB(), vendorID, batchedStoreSkuList, model.SyncFlagModifiedMask)
}
return nil, err
}, ctx, task, updateList, singleStoreHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkus), isContinueWhenError)
}
case 3:
if len(stockList) > 0 {
_, err = putils.FreeBatchStoreSkuInfo(func(batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) {
if err = storeSkuHandler.UpdateStoreSkusStock(ctx, storeID, vendorStoreID, stockList); err == nil {
_, err = updateStoreSku(dao.GetDB(), vendorID, bareSku2Sync(batchedStoreSkuList), model.SyncFlagModifiedMask) // ?
}
return nil, err
}, ctx, task, stockList, storeSkuHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusStock), isContinueWhenError)
}
case 4:
if len(statusList) > 0 {
_, err = putils.FreeBatchStoreSkuInfo(func(batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) {
if err = storeSkuHandler.UpdateStoreSkusStatus(ctx, storeID, vendorStoreID, statusList); err == nil {
_, err = updateStoreSku(dao.GetDB(), vendorID, bareSku2Sync(batchedStoreSkuList), model.SyncFlagSaleMask)
}
return nil, err
}, ctx, task, statusList, storeSkuHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusStatus), isContinueWhenError)
}
case 5:
if len(priceList) > 0 {
_, err = putils.FreeBatchStoreSkuInfo(func(batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) {
if err = storeSkuHandler.UpdateStoreSkusPrice(ctx, storeID, vendorStoreID, priceList); err == nil {
_, err = updateStoreSku(dao.GetDB(), vendorID, bareSku2Sync(batchedStoreSkuList), model.SyncFlagPriceMask)
}
return nil, err
}, ctx, task, priceList, storeSkuHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusPrice), isContinueWhenError)
}
}
return retVal, err
}, []int{0, 1, 2, 3, 4, 5})
tasksch.HandleTask(task, parentTask, true).Run()
_, err = task.GetResult(0)
return err
}
func PruneMissingStoreSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID, storeID int, vendorStoreID string, isAsync, isContinueWhenError bool) (hint string, err error) {
handler, _ := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler)
if handler == nil {
return "", fmt.Errorf("平台:%s不支持此操作", model.VendorChineseNames[vendorID])
}
db := dao.GetDB()
localSkuList, err := dao.GetStoreSkus2(db, vendorID, storeID, nil, false)
if err != nil {
return "", err
}
localSkuMap := make(map[int]*dao.StoreSkuSyncInfo)
for _, v := range localSkuList {
localSkuMap[v.SkuID] = v
}
var sku2Delete []*partner.StoreSkuInfo
task := tasksch.NewSeqTask(fmt.Sprintf("PruneMissingStoreSkus平台:%s", model.VendorChineseNames[vendorID]), ctx,
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
switch step {
case 0:
remoteSkuList, err2 := handler.GetStoreSkusFullInfo(ctx, task, storeID, vendorStoreID, nil)
if err = err2; err == nil {
for _, v := range remoteSkuList {
if localSkuMap[v.SkuList[0].SkuID] == nil {
sku2Delete = append(sku2Delete, &partner.StoreSkuInfo{
SkuID: v.SkuList[0].SkuID,
VendorSkuID: v.SkuList[0].VendorSkuID,
})
}
}
}
case 1:
if len(sku2Delete) > 0 {
_, err = putils.FreeBatchStoreSkuInfo(func(batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, err error) {
err = handler.DeleteStoreSkus(ctx, storeID, vendorStoreID, sku2Delete)
return nil, err
}, ctx, parentTask, sku2Delete, handler.GetStoreSkusBatchSize(partner.FuncDeleteStoreSkus), isContinueWhenError)
}
}
return nil, err
}, 2)
tasksch.HandleTask(task, parentTask, true).Run()
if isAsync {
hint = task.GetID()
} else {
_, err = task.GetResult(0)
}
return hint, err
}
func ClearRemoteStoreStuffAndSetNew(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID, storeID int, vendorStoreID string, isAsync, isContinueWhenError bool) (hint string, err error) {
userName := ctx.GetUserName()
globals.SugarLogger.Debugf("ClearRemoteStoreStuffAndSetNew storeID:%d, isContinueWhenError:%t, userName:%s", storeID, isContinueWhenError, userName)
handler, _ := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler)
if handler == nil {
return "", fmt.Errorf("平台:%s不支持此操作", model.VendorChineseNames[vendorID])
}
db := dao.GetDB()
var errDeleteSku error
rootTask := tasksch.NewSeqTask(fmt.Sprintf("ClearRemoteStoreStuffAndSetNew:%s", model.VendorChineseNames[vendorID]), ctx,
func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
switch step {
case 0:
err = handler.DeleteStoreAllSkus(ctx, rootTask, storeID, vendorStoreID, isContinueWhenError)
errDeleteSku = err
// 强制忽略删除SKU错误
if isContinueWhenError {
err = nil
}
if err == nil {
_, err = dao.SetStoreSkuSyncStatus(db, vendorID, []int{storeID}, nil, model.SyncFlagNewMask)
}
case 1:
if err = handler.DeleteStoreAllCategories(ctx, rootTask, storeID, vendorStoreID, isContinueWhenError); err == nil {
_, err = dao.SetStoreCategorySyncStatus(db, vendorID, []int{storeID}, nil, model.SyncFlagNewMask)
}
}
return nil, err
}, 2)
tasksch.AddChild(parentTask, rootTask).Run()
if !isAsync {
_, err = rootTask.GetResult(0)
}
if err == nil {
err = errDeleteSku
}
return rootTask.ID, err
}