671 lines
29 KiB
Go
671 lines
29 KiB
Go
package cms
|
||
|
||
import (
|
||
"errors"
|
||
"fmt"
|
||
"reflect"
|
||
|
||
"git.rosy.net.cn/baseapi/utils"
|
||
"git.rosy.net.cn/jx-callback/business/jxutils"
|
||
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
|
||
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
|
||
"git.rosy.net.cn/jx-callback/business/model"
|
||
"git.rosy.net.cn/jx-callback/business/model/dao"
|
||
"git.rosy.net.cn/jx-callback/business/partner"
|
||
"git.rosy.net.cn/jx-callback/globals"
|
||
"git.rosy.net.cn/jx-callback/globals/api"
|
||
"git.rosy.net.cn/jx-callback/globals/refutil"
|
||
)
|
||
|
||
type LoopStoreMapInfo struct {
|
||
VendorID int
|
||
StoreMapList []*model.StoreMap
|
||
}
|
||
|
||
type VendorSync struct {
|
||
MultiStoreVendorIDs []int
|
||
SingleStoreVendorIDs []int
|
||
PurchaseHandlers map[int]partner.IPurchasePlatformHandler
|
||
}
|
||
|
||
// SyncError marks an error that was produced by a synchronisation step.
// makeSyncError wraps arbitrary errors into this type and isSyncError detects
// it.
type SyncError struct {
	// Original is the underlying error that made the sync fail.
	Original error `json:"original"`
	// Message is an optional free-form text; nothing in the visible code sets it.
	Message string `json:"message"`
}
|
||
|
||
// 对于多门店平台接口的通用处理
|
||
type MultiStoreHandlerWrapper struct {
|
||
partner.IMultipleStoresHandler
|
||
}
|
||
|
||
// 对于单门店平台接口的通用处理
|
||
type SingleStoreHandlerWrapper struct {
|
||
partner.ISingleStoreHandler
|
||
}
|
||
|
||
var (
|
||
CurVendorSync VendorSync
|
||
)
|
||
|
||
// ErrHaveNotImplementedYet reports that an operation has not been implemented.
var ErrHaveNotImplementedYet = errors.New("还没有实现")

// ErrEntityNotExist reports that the requested entity could not be found.
var ErrEntityNotExist = errors.New("找不到相应实体")
|
||
|
||
func Init() {
|
||
apiMap := map[int]interface{}{
|
||
model.VendorIDJD: api.JdAPI,
|
||
model.VendorIDELM: api.ElmAPI,
|
||
model.VendorIDEBAI: api.EbaiAPI,
|
||
model.VendorIDMTWM: api.MtwmAPI,
|
||
model.VendorIDWSC: api.WeimobAPI,
|
||
}
|
||
CurVendorSync.PurchaseHandlers = make(map[int]partner.IPurchasePlatformHandler)
|
||
for k, v := range partner.PurchasePlatformHandlers {
|
||
if !reflect.ValueOf(apiMap[k]).IsNil() {
|
||
if multiHandler, ok := v.(partner.IMultipleStoresHandler); ok {
|
||
CurVendorSync.MultiStoreVendorIDs = append(CurVendorSync.MultiStoreVendorIDs, k)
|
||
CurVendorSync.PurchaseHandlers[k] = &MultiStoreHandlerWrapper{
|
||
IMultipleStoresHandler: multiHandler,
|
||
}
|
||
} else if singleHandler, ok := v.(partner.ISingleStoreHandler); ok {
|
||
CurVendorSync.SingleStoreVendorIDs = append(CurVendorSync.SingleStoreVendorIDs, k)
|
||
CurVendorSync.PurchaseHandlers[k] = &SingleStoreHandlerWrapper{
|
||
ISingleStoreHandler: singleHandler,
|
||
}
|
||
} else {
|
||
panic(fmt.Sprintf("platform:%d type is wrong!", k))
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
func (p *MultiStoreHandlerWrapper) DeleteCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) {
|
||
if jxutils.IsEmptyID(cat.JdID) {
|
||
return nil
|
||
}
|
||
return p.IMultipleStoresHandler.DeleteCategory(db, cat, userName)
|
||
}
|
||
|
||
func (p *MultiStoreHandlerWrapper) UpdateCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) {
|
||
if jxutils.IsEmptyID(cat.JdID) {
|
||
globals.SugarLogger.Warnf("UpdateCategory fakeid cat:%s should not get here", utils.Format4Output(cat, true))
|
||
return nil
|
||
}
|
||
return p.IMultipleStoresHandler.UpdateCategory(db, cat, userName)
|
||
}
|
||
|
||
func (p *MultiStoreHandlerWrapper) DeleteSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) {
|
||
globals.SugarLogger.Debugf("wrapper DeleteSku, sku:%s", utils.Format4Output(sku, false))
|
||
if jxutils.IsEmptyID(sku.JdID) {
|
||
return nil
|
||
}
|
||
return p.IMultipleStoresHandler.DeleteSku(db, sku, userName)
|
||
}
|
||
|
||
func (p *MultiStoreHandlerWrapper) UpdateSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) {
|
||
if jxutils.IsEmptyID(sku.JdID) {
|
||
globals.SugarLogger.Warnf("UpdateSku fakeid sku:%s should not get here", utils.Format4Output(sku, true))
|
||
return nil
|
||
}
|
||
return p.IMultipleStoresHandler.UpdateSku(db, sku, userName)
|
||
}
|
||
|
||
func (v *VendorSync) GetStoreHandler(vendorID int) partner.IPurchasePlatformHandler {
|
||
return v.PurchaseHandlers[vendorID]
|
||
}
|
||
|
||
func (v *VendorSync) GetMultiStoreHandler(vendorID int) partner.IMultipleStoresHandler {
|
||
if handler, ok := v.PurchaseHandlers[vendorID].(partner.IMultipleStoresHandler); ok {
|
||
return handler
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (v *VendorSync) GetSingleStoreHandler(vendorID int) partner.ISingleStoreHandler {
|
||
if handler, ok := v.PurchaseHandlers[vendorID].(partner.ISingleStoreHandler); ok {
|
||
return handler
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (v *VendorSync) syncCategories(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID int, db *dao.DaoDB, cats []*model.SkuCategory, userName string) (err error) {
|
||
multiStoresHandler := v.GetMultiStoreHandler(vendorID)
|
||
syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[vendorID])
|
||
task := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[vendorID]), nil, ctx,
|
||
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||
cat := batchItemList[0].(*model.SkuCategory)
|
||
updateFields := []string{syncStatusFieldName}
|
||
syncStatus := refutil.GetObjFieldByName(cat, syncStatusFieldName).(int8)
|
||
if (syncStatus & model.SyncFlagDeletedMask) != 0 { //删除
|
||
if (syncStatus & model.SyncFlagNewMask) == 0 {
|
||
err = multiStoresHandler.DeleteCategory(db, cat, userName)
|
||
}
|
||
} else if (syncStatus & model.SyncFlagNewMask) != 0 { // 新增
|
||
err = multiStoresHandler.CreateCategory(db, cat, userName)
|
||
updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()]))
|
||
} else if (syncStatus & model.SyncFlagModifiedMask) != 0 { // 修改
|
||
err = multiStoresHandler.UpdateCategory(db, cat, userName)
|
||
if intErr, ok := err.(*utils.ErrorWithCode); ok && intErr.IntCode() == -3 {
|
||
err = nil
|
||
}
|
||
}
|
||
if err == nil {
|
||
refutil.SetObjFieldByName(cat, syncStatusFieldName, int8(0))
|
||
_, err = dao.UpdateEntity(db, cat, updateFields...)
|
||
}
|
||
return nil, err
|
||
}, cats)
|
||
tasksch.HandleTask(task, parentTask, false).Run()
|
||
_, err = task.GetResult(0)
|
||
return err
|
||
}
|
||
|
||
func (v *VendorSync) SyncCategory(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
|
||
globals.SugarLogger.Debug(v.MultiStoreVendorIDs)
|
||
hint, err = v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("同步分类信息:%d", categoryID), isAsync, false,
|
||
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||
vendorID := batchItemList[0].(int)
|
||
var cats []*model.SkuCategory
|
||
cond := make(map[string]interface{})
|
||
if categoryID > 0 {
|
||
cond[model.FieldID] = categoryID
|
||
} else {
|
||
cond[model.FieldLevel] = 1
|
||
}
|
||
err := dao.GetEntitiesByKV(db, &cats, cond, true)
|
||
if err == nil {
|
||
err = v.syncCategories(ctx, t, vendorID, db, cats, userName)
|
||
}
|
||
if err != nil || categoryID > 0 {
|
||
return nil, err
|
||
}
|
||
cond[model.FieldLevel] = 2
|
||
err = dao.GetEntitiesByKV(db, &cats, cond, true)
|
||
if err == nil {
|
||
err = v.syncCategories(ctx, t, vendorID, db, cats, userName)
|
||
}
|
||
return nil, err
|
||
})
|
||
return "", err
|
||
}
|
||
|
||
func (v *VendorSync) SyncReorderCategories(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
|
||
hint, err = v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("分类重排序:%d", categoryID), isAsync, false, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||
multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int))
|
||
err2 := multiStoresHandler.ReorderCategories(db, categoryID, userName)
|
||
if err2 == nil {
|
||
cat := &model.SkuCategory{}
|
||
_, err2 = dao.UpdateEntityByKV(db, cat, utils.Params2Map(dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()]), 0), utils.Params2Map(model.FieldParentID, categoryID))
|
||
return nil, err2
|
||
}
|
||
return nil, err2
|
||
})
|
||
return "", err
|
||
}
|
||
|
||
func (v *VendorSync) SyncStore(ctx *jxcontext.Context, db *dao.DaoDB, vendorID, storeID int, isAsync bool, userName string) (hint string, err error) {
|
||
globals.SugarLogger.Debugf("SyncStore, storeID:%d", storeID)
|
||
|
||
var vendorIDs []int
|
||
if vendorID != -1 {
|
||
vendorIDs = []int{
|
||
vendorID,
|
||
}
|
||
}
|
||
hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("同步门店信息:%d", storeID), isAsync, false, vendorIDs, []int{storeID}, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (resultList interface{}, err error) {
|
||
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
|
||
handler := v.GetStoreHandler(loopMapInfo.VendorID)
|
||
if len(loopMapInfo.StoreMapList) > 1 {
|
||
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), nil, ctx,
|
||
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||
var resultList []interface{}
|
||
storeMap := batchItemList[0].(*model.StoreMap)
|
||
if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil {
|
||
storeMap.SyncStatus = 0
|
||
_, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus)
|
||
resultList = append(resultList, 1)
|
||
}
|
||
return resultList, err
|
||
}, loopMapInfo.StoreMapList)
|
||
t.AddChild(loopStoreTask).Run()
|
||
resultList, err = loopStoreTask.GetResult(0)
|
||
} else {
|
||
storeMap := loopMapInfo.StoreMapList[0]
|
||
if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil {
|
||
storeMap.SyncStatus = 0
|
||
_, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus)
|
||
}
|
||
if err == nil {
|
||
resultList = []interface{}{1}
|
||
}
|
||
}
|
||
return resultList, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
|
||
}, true)
|
||
return hint, makeSyncError(err)
|
||
}
|
||
|
||
func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuID int, isAsync, isContinueWhenError bool, userName string) (hint string, err error) {
|
||
var (
|
||
nameIDs []int
|
||
skuIDs []int
|
||
)
|
||
if nameID != -1 {
|
||
nameIDs = []int{nameID}
|
||
}
|
||
if skuID != -1 {
|
||
skuIDs = []int{skuID}
|
||
}
|
||
return v.SyncSkus(ctx, db, nameIDs, skuIDs, isAsync, isContinueWhenError, userName)
|
||
}
|
||
|
||
func (v *VendorSync) SyncSkus(ctx *jxcontext.Context, db *dao.DaoDB, nameIDs []int, skuIDs []int, isAsync, isContinueWhenError bool, userName string) (hint string, err error) {
|
||
globals.SugarLogger.Debugf("SyncSku trackInfo:%s, nameIDs:%v, skuIDs:%v, userName:%s", ctx.GetTrackInfo(), nameIDs, skuIDs, userName)
|
||
isManagedIt := len(nameIDs) > 1 || len(skuIDs) > 1
|
||
return v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("同步商品信息, nameIDs:%v, skuIDs:%v", nameIDs, skuIDs), isAsync, isManagedIt,
|
||
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||
var resultList []interface{}
|
||
vendorID := batchItemList[0].(int)
|
||
multiStoresHandler := v.GetMultiStoreHandler(vendorID)
|
||
syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()])
|
||
dbField := dao.ConvertDBFieldPrefix(model.VendorNames[multiStoresHandler.GetVendorID()])
|
||
skuMap := make(map[int]bool)
|
||
sql := fmt.Sprintf(`
|
||
SELECT DISTINCT t2.*
|
||
FROM sku t1
|
||
JOIN sku_name t2 ON t2.id = t1.name_id
|
||
WHERE t1.%s_sync_status <> 0
|
||
`, dbField)
|
||
sqlParams := []interface{}{}
|
||
if len(nameIDs) > 1 {
|
||
sql += " AND t1.name_id IN (" + dao.GenQuestionMarks(len(nameIDs)) + ")"
|
||
sqlParams = append(sqlParams, nameIDs)
|
||
} else if len(nameIDs) == 1 {
|
||
sql += " AND t1.name_id = ? "
|
||
sqlParams = append(sqlParams, nameIDs[0])
|
||
}
|
||
if len(skuIDs) > 0 {
|
||
sql += " AND t1.id IN(" + dao.GenQuestionMarks(len(skuIDs)) + ")"
|
||
sqlParams = append(sqlParams, skuIDs)
|
||
|
||
} else if len(skuIDs) == 1 {
|
||
sql += " AND t1.id = ? "
|
||
sqlParams = append(sqlParams, skuIDs[0])
|
||
}
|
||
for _, v := range skuIDs {
|
||
skuMap[v] = true
|
||
}
|
||
sql += " ORDER BY t2.id"
|
||
|
||
var skuNameList []*model.SkuName
|
||
err := dao.GetRows(db, &skuNameList, sql, sqlParams...)
|
||
if err == nil && len(skuNameList) > 0 {
|
||
// todo 同一skuName下的sku顺序处理的原因是京东SPU特殊类型必须要序列化同步才能正常处理, db可能会有多线程问题
|
||
task := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[vendorID]), tasksch.NewParallelConfig().SetParallelCount(10).SetIsContinueWhenError(isContinueWhenError), ctx,
|
||
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||
var resultList []interface{}
|
||
skuName := batchItemList[0].(*model.SkuName)
|
||
var skuList []*model.Sku
|
||
if err = dao.GetRows(db, &skuList, fmt.Sprintf(`
|
||
SELECT *
|
||
FROM sku
|
||
WHERE name_id = ? AND %s_sync_status <> 0
|
||
ORDER BY IF(spec_unit IN('kg', 'L'), 1000, 1) * spec_quality
|
||
`, dbField), skuName.ID); err == nil && len(skuList) > 0 {
|
||
for _, sku := range skuList {
|
||
syncStatus := refutil.GetObjFieldByName(sku, syncStatusFieldName).(int8)
|
||
globals.SugarLogger.Debugf("SyncSku trackInfo:%s, skuID:%d, syncStatus:%d", ctx.GetTrackInfo(), sku.ID, syncStatus)
|
||
if (len(skuIDs) == 0 || skuMap[sku.ID]) && (syncStatus != 0) {
|
||
updateFields := []string{syncStatusFieldName}
|
||
if syncStatus&model.SyncFlagDeletedMask != 0 { // 删除
|
||
if syncStatus&model.SyncFlagNewMask == 0 {
|
||
err = multiStoresHandler.DeleteSku(db, sku, userName)
|
||
}
|
||
} else if syncStatus&model.SyncFlagNewMask != 0 { // 新增
|
||
if err = multiStoresHandler.CreateSku(db, sku, userName); err == nil {
|
||
var tmpStruct struct {
|
||
MaxIndex int
|
||
}
|
||
// todo hard code 得到京东spu中sku的顺序(以方便以后修改销售属性),这个必须要每次重新从数据库取
|
||
if dao.GetRow(db, &tmpStruct, "SELECT MAX(sku_index) max_index FROM sku WHERE name_id = ? AND jd_id > 0 AND jd_id < 4024012631406 ", sku.NameID) == nil {
|
||
sku.SkuIndex = tmpStruct.MaxIndex + 1
|
||
updateFields = append(updateFields, "SkuIndex")
|
||
}
|
||
updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()]))
|
||
}
|
||
} else if syncStatus&model.SyncFlagModifiedMask != 0 { // 修改
|
||
err = multiStoresHandler.UpdateSku(db, sku, userName)
|
||
}
|
||
if err == nil {
|
||
refutil.SetObjFieldByName(sku, syncStatusFieldName, int8(0))
|
||
if _, err = dao.UpdateEntity(db, sku, updateFields...); err != nil {
|
||
break
|
||
} else {
|
||
resultList = append(resultList, 1)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
if err == nil {
|
||
refutil.SetObjFieldByName(skuName, syncStatusFieldName, int8(0))
|
||
_, err = dao.UpdateEntity(db, skuName, syncStatusFieldName)
|
||
}
|
||
return resultList, err
|
||
}, skuNameList)
|
||
t.AddChild(task).Run()
|
||
result, err2 := task.GetResult(0)
|
||
if err = err2; err == nil {
|
||
resultList = append(resultList, result...)
|
||
}
|
||
}
|
||
return resultList, err
|
||
})
|
||
}
|
||
|
||
func (v *VendorSync) SyncStoresCategory(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) {
|
||
globals.SugarLogger.Debug("SyncStoresCategory")
|
||
isManageIt := len(storeIDs) != 1
|
||
hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("同步门店分类信息:%v", storeIDs), isAsync, isManageIt, vendorIDs, storeIDs,
|
||
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
|
||
if handler := v.GetSingleStoreHandler(loopMapInfo.VendorID); handler != nil {
|
||
if isForce {
|
||
dao.SetStoreCategorySyncStatus(db, loopMapInfo.VendorID, storeIDs, nil, model.SyncFlagModifiedMask)
|
||
}
|
||
if len(loopMapInfo.StoreMapList) > 1 {
|
||
loopStoreTask := tasksch.NewSeqTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), ctx,
|
||
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||
storeID := loopMapInfo.StoreMapList[step].StoreID
|
||
_, err = handler.SyncStoreCategory(ctx, task, storeID, false)
|
||
return nil, err
|
||
}, len(loopMapInfo.StoreMapList))
|
||
t.AddChild(loopStoreTask).Run()
|
||
_, err = loopStoreTask.GetResult(0)
|
||
} else {
|
||
_, err = handler.SyncStoreCategory(ctx, t, loopMapInfo.StoreMapList[0].StoreID, false)
|
||
}
|
||
}
|
||
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
|
||
}, isContinueWhenError)
|
||
return hint, makeSyncError(err)
|
||
}
|
||
|
||
//
|
||
func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, skuIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) {
|
||
globals.SugarLogger.Debug("SyncStoresSkus")
|
||
isManageIt := isAsync || len(storeIDs) != 1 || len(skuIDs) == 0 || len(skuIDs) > 8
|
||
task, hint, err := v.LoopStoresMap2(ctx, db, fmt.Sprintf("同步门店商品信息:%v", storeIDs), isAsync, isManageIt, vendorIDs, storeIDs,
|
||
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
|
||
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
|
||
if isForce {
|
||
dao.SetStoreSkuSyncStatus(db, loopMapInfo.VendorID, storeIDs, skuIDs, model.SyncFlagStoreSkuModifiedMask)
|
||
}
|
||
parallelCount := 5
|
||
if model.MultiStoresVendorMap[loopMapInfo.VendorID] == 1 {
|
||
parallelCount = 1
|
||
}
|
||
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
|
||
tasksch.NewParallelConfig().SetParallelCount(parallelCount).SetIsContinueWhenError(isContinueWhenError), ctx,
|
||
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||
storeMap := batchItemList[0].(*model.StoreMap)
|
||
if _, err = v.proxySyncStoreSku(ctx, task, storeMap, nil, skuIDs, false, isContinueWhenError); err != nil {
|
||
globals.SugarLogger.Debugf("SyncStoresSkus failed2 store:%d failed with error:%v", storeMap.StoreID, err)
|
||
}
|
||
return nil, err
|
||
}, loopMapInfo.StoreMapList)
|
||
t.AddChild(loopStoreTask).Run()
|
||
_, err = loopStoreTask.GetResult(0)
|
||
}
|
||
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
|
||
}, isContinueWhenError)
|
||
if task != nil {
|
||
// if vendorErr := partner.IsErrChangePriceFailed(task.GetOriginalErr()); vendorErr != nil {
|
||
// platformList := make([]string, len(task.GetDetailErrList()))
|
||
// for k, v := range task.GetDetailErrList() {
|
||
// if vendorErr := partner.IsErrVendorError(v); vendorErr != nil {
|
||
// platformList[k] = model.VendorChineseNames[vendorErr.VendorID()]
|
||
// } else {
|
||
// platformList[k] = "未知"
|
||
// }
|
||
// }
|
||
// err = fmt.Errorf("同步价格失败\n失败平台:%s", strings.Join(platformList, ","))
|
||
// } else {
|
||
// }
|
||
err = makeSyncError(err)
|
||
}
|
||
return hint, err
|
||
}
|
||
|
||
func isUseOldSyncLogic(storeMap *model.StoreMap) bool {
|
||
return false
|
||
return globals.IsProductEnv() && storeMap.StoreID != 102652 // 绿城四季鲜店
|
||
}
|
||
|
||
func (v *VendorSync) proxySyncStoreSku(ctx *jxcontext.Context, parentTask tasksch.ITask, storeMap *model.StoreMap, nameIDs, skuIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
|
||
if isUseOldSyncLogic(storeMap) {
|
||
return v.GetStoreHandler(storeMap.VendorID).SyncStoreSkus(ctx, parentTask, storeMap.StoreID, skuIDs, isAsync, isContinueWhenError)
|
||
}
|
||
return SyncStoreSkuNew(ctx, parentTask, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, nil, skuIDs, isAsync, isContinueWhenError)
|
||
}
|
||
|
||
func (v *VendorSync) proxyFullSyncStoreSku(ctx *jxcontext.Context, parentTask tasksch.ITask, storeMap *model.StoreMap, isAsync, isContinueWhenError bool) (hint string, err error) {
|
||
if isUseOldSyncLogic(storeMap) {
|
||
return v.GetStoreHandler(storeMap.VendorID).FullSyncStoreSkus(ctx, parentTask, storeMap.StoreID, isAsync, isContinueWhenError)
|
||
}
|
||
return FullSyncStoreSkuNew(ctx, parentTask, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, isAsync, isContinueWhenError)
|
||
}
|
||
|
||
func (v *VendorSync) FullSyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
|
||
globals.SugarLogger.Debug("FullSyncStoresSkus")
|
||
hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("初始化门店商品信息:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
|
||
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
|
||
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
|
||
parallelCount := 5
|
||
if model.MultiStoresVendorMap[loopMapInfo.VendorID] == 1 {
|
||
parallelCount = 1
|
||
}
|
||
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
|
||
tasksch.NewParallelConfig().SetParallelCount(parallelCount).SetIsContinueWhenError(isContinueWhenError), ctx,
|
||
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||
storeMap := batchItemList[0].(*model.StoreMap)
|
||
if _, err = v.proxyFullSyncStoreSku(ctx, task, storeMap, false, isContinueWhenError); err != nil {
|
||
globals.SugarLogger.Debugf("FullSyncStoresSkus failed2 store:%d failed with error:%v", storeMap.StoreID, err)
|
||
}
|
||
return nil, err
|
||
}, loopMapInfo.StoreMapList)
|
||
t.AddChild(loopStoreTask).Run()
|
||
_, err = loopStoreTask.GetResult(0)
|
||
}
|
||
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
|
||
}, isContinueWhenError)
|
||
return hint, makeSyncError(err)
|
||
}
|
||
|
||
func (v *VendorSync) DeleteRemoteStoreSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
|
||
globals.SugarLogger.Debug("DeleteRemoteStoreSkus")
|
||
hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("删除远程门店商品信息:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
|
||
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
|
||
if len(loopMapInfo.StoreMapList) > 1 {
|
||
loopStoreTask := tasksch.NewSeqTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), ctx,
|
||
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||
storeMap := loopMapInfo.StoreMapList[step]
|
||
_, err = ClearRemoteStoreStuffAndSetNew(ctx, task, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, false, isContinueWhenError)
|
||
return nil, err
|
||
}, len(loopMapInfo.StoreMapList))
|
||
t.AddChild(loopStoreTask).Run()
|
||
_, err = loopStoreTask.GetResult(0)
|
||
} else {
|
||
_, err = ClearRemoteStoreStuffAndSetNew(ctx, t, loopMapInfo.StoreMapList[0].VendorID, loopMapInfo.StoreMapList[0].StoreID, loopMapInfo.StoreMapList[0].VendorStoreID, false, isContinueWhenError)
|
||
}
|
||
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
|
||
}, isContinueWhenError)
|
||
return hint, makeSyncError(err)
|
||
}
|
||
|
||
// 将平台有,但本地没有的门店商品清除
|
||
// todo,京东到家也应该支持
|
||
func (v *VendorSync) PruneMissingStoreSkus(ctx *jxcontext.Context, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
|
||
globals.SugarLogger.Debug("PruneMissingStoreSkus")
|
||
hint, err = v.LoopStoresMap(ctx, dao.GetDB(), fmt.Sprintf("删除远程无关联的门店商品信息:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
|
||
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
|
||
if len(loopMapInfo.StoreMapList) > 1 {
|
||
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), ctx,
|
||
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||
storeMap := batchItemList[0].(*model.StoreMap)
|
||
_, err = PruneMissingStoreSkus(ctx, task, loopMapInfo.VendorID, storeMap.StoreID, storeMap.VendorStoreID, false, isContinueWhenError)
|
||
return nil, err
|
||
}, loopMapInfo.StoreMapList)
|
||
t.AddChild(loopStoreTask).Run()
|
||
_, err = loopStoreTask.GetResult(0)
|
||
} else {
|
||
_, err = PruneMissingStoreSkus(ctx, t, loopMapInfo.VendorID, loopMapInfo.StoreMapList[0].StoreID, loopMapInfo.StoreMapList[0].VendorStoreID, false, isContinueWhenError)
|
||
}
|
||
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
|
||
}, isContinueWhenError)
|
||
return hint, makeSyncError(err)
|
||
}
|
||
|
||
func (v *VendorSync) LoopStoresMap2(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync, isManageIt bool, vendorIDs []int, storeIDs []int, handler tasksch.WorkFunc, isContinueWhenError bool) (task tasksch.ITask, hint string, err error) {
|
||
var storeMapList []*model.StoreMap
|
||
if storeMapList, err = dao.GetStoresMapList(db, vendorIDs, storeIDs, model.StoreStatusAll, model.StoreIsSyncYes, ""); err != nil {
|
||
return nil, "", err
|
||
}
|
||
if len(storeMapList) == 0 {
|
||
return nil, "", nil
|
||
}
|
||
vendorStoreMap := make(map[int][]*model.StoreMap)
|
||
for _, v := range storeMapList {
|
||
vendorStoreMap[v.VendorID] = append(vendorStoreMap[v.VendorID], v)
|
||
}
|
||
loopInfoList := make([]*LoopStoreMapInfo, len(vendorStoreMap))
|
||
index := 0
|
||
for k, v := range vendorStoreMap {
|
||
loopInfoList[index] = &LoopStoreMapInfo{
|
||
VendorID: k,
|
||
StoreMapList: v,
|
||
}
|
||
index++
|
||
}
|
||
if len(loopInfoList) == 1 {
|
||
taskName = fmt.Sprintf("%s,处理平台%s", taskName, model.VendorChineseNames[loopInfoList[0].VendorID])
|
||
}
|
||
task = tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, loopInfoList)
|
||
tasksch.HandleTask(task, nil, isManageIt).Run()
|
||
if !isAsync {
|
||
resultList, err2 := task.GetResult(0)
|
||
if err = err2; err == nil {
|
||
if len(resultList) == 0 {
|
||
hint = "1" // todo 暂时这样
|
||
} else {
|
||
hint = jxutils.TaskResult2Hint(resultList)
|
||
}
|
||
}
|
||
} else {
|
||
hint = task.GetID()
|
||
}
|
||
return task, hint, err
|
||
}
|
||
|
||
func (v *VendorSync) LoopStoresMap(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync, isManageIt bool, vendorIDs []int, storeIDs []int, handler tasksch.WorkFunc, isContinueWhenError bool) (hint string, err error) {
|
||
_, hint, err = v.LoopStoresMap2(ctx, db, taskName, isAsync, isManageIt, vendorIDs, storeIDs, handler, isContinueWhenError)
|
||
return hint, err
|
||
}
|
||
|
||
func (v *VendorSync) LoopMultiStoresVendors(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, isManageIt bool, handler tasksch.WorkFunc) (hint string, err error) {
|
||
task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, v.MultiStoreVendorIDs)
|
||
tasksch.HandleTask(task, nil, isManageIt).Run()
|
||
if !isAsync {
|
||
result, err2 := task.GetResult(0)
|
||
if err = err2; err == nil {
|
||
hint = utils.Int2Str(len(result))
|
||
}
|
||
} else {
|
||
hint = task.ID
|
||
}
|
||
return hint, makeSyncError(err)
|
||
}
|
||
|
||
func (v *VendorSync) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int, storeIDs []int) (hint string, err error) {
|
||
task := tasksch.NewParallelTask("RefreshAllSkusID", nil, ctx,
|
||
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||
vendorID := batchItemList[0].(int)
|
||
if handler := v.GetStoreHandler(vendorID); handler != nil {
|
||
if multiHandler, ok := handler.(partner.IMultipleStoresHandler); ok {
|
||
_, err = multiHandler.RefreshAllSkusID(ctx, task, false)
|
||
} else if singleHandler, ok := handler.(partner.ISingleStoreHandler); ok {
|
||
_, err = singleHandler.RefreshStoresAllSkusID(ctx, task, false, storeIDs)
|
||
}
|
||
}
|
||
return nil, err
|
||
}, vendorIDs)
|
||
tasksch.HandleTask(task, nil, true).Run()
|
||
if !isAsync {
|
||
_, err = task.GetResult(0)
|
||
}
|
||
return task.ID, err
|
||
}
|
||
|
||
func (v *VendorSync) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int) (hint string, err error) {
|
||
task := tasksch.NewParallelTask("RefreshAllStoresID", nil, ctx,
|
||
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||
vendorID := batchItemList[0].(int)
|
||
if handler := v.GetStoreHandler(vendorID); handler != nil {
|
||
_, err = handler.RefreshAllStoresID(ctx, task, false)
|
||
}
|
||
return nil, err
|
||
}, vendorIDs)
|
||
tasksch.HandleTask(task, nil, true).Run()
|
||
if !isAsync {
|
||
_, err = task.GetResult(0)
|
||
}
|
||
return task.ID, err
|
||
}
|
||
|
||
func makeSyncError(err error) (newErr error) {
|
||
if err != nil {
|
||
if _, ok := err.(*SyncError); !ok {
|
||
return &SyncError{
|
||
Original: err,
|
||
}
|
||
}
|
||
}
|
||
return err
|
||
}
|
||
|
||
func (e *SyncError) Error() string {
|
||
return fmt.Sprintf("本地数据修改成功,但同步失败,请根据错误提示处理!,同步错误信息:%s", e.Original.Error())
|
||
}
|
||
|
||
func isSyncError(err error) bool {
|
||
_, ok := err.(*SyncError)
|
||
return ok
|
||
}
|
||
|
||
func (v *VendorSync) SyncSkuNames(ctx *jxcontext.Context, nameIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) {
|
||
db := dao.GetDB()
|
||
if isForce {
|
||
sql := `
|
||
UPDATE sku t1
|
||
SET t1.jd_sync_status = t1.jd_sync_status | ?
|
||
WHERE t1.deleted_at = ?
|
||
`
|
||
sqlParams := []interface{}{
|
||
model.SyncFlagModifiedMask,
|
||
utils.DefaultTimeValue,
|
||
}
|
||
if len(nameIDs) > 0 {
|
||
sql += " AND t1.name_id IN(" + dao.GenQuestionMarks(len(nameIDs)) + ")"
|
||
sqlParams = append(sqlParams, nameIDs)
|
||
}
|
||
if _, err = dao.ExecuteSQL(db, sql, sqlParams...); err != nil {
|
||
return "", err
|
||
}
|
||
}
|
||
return v.SyncSkus(ctx, db, nameIDs, nil, isAsync, isContinueWhenError, ctx.GetUserName())
|
||
}
|