Files
jx-callback/business/jxstore/cms/sync.go
2019-04-25 08:54:40 +08:00

593 lines
24 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package cms
import (
"errors"
"fmt"
"reflect"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
"git.rosy.net.cn/jx-callback/business/partner"
"git.rosy.net.cn/jx-callback/globals"
"git.rosy.net.cn/jx-callback/globals/api"
"git.rosy.net.cn/jx-callback/globals/refutil"
)
type LoopStoreMapInfo struct {
VendorID int
StoreMapList []*model.StoreMap
}
type VendorSync struct {
MultiStoreVendorIDs []int
SingleStoreVendorIDs []int
PurchaseHandlers map[int]partner.IPurchasePlatformHandler
}
type SyncError struct {
Original error `json:"original"`
Message string `json:"message"`
}
// 对于多门店平台接口的通用处理
type MultiStoreHandlerWrapper struct {
partner.IMultipleStoresHandler
}
// 对于单门店平台接口的通用处理
type SingleStoreHandlerWrapper struct {
partner.ISingleStoreHandler
}
var (
CurVendorSync VendorSync
)
var (
ErrHaveNotImplementedYet = errors.New("还没有实现")
ErrEntityNotExist = errors.New("找不到相应实体")
)
func Init() {
apiMap := map[int]interface{}{
model.VendorIDJD: api.JdAPI,
model.VendorIDELM: api.ElmAPI,
model.VendorIDEBAI: api.EbaiAPI,
model.VendorIDMTWM: api.MtwmAPI,
model.VendorIDWSC: api.WeimobAPI,
}
CurVendorSync.PurchaseHandlers = make(map[int]partner.IPurchasePlatformHandler)
for k, v := range partner.PurchasePlatformHandlers {
if !reflect.ValueOf(apiMap[k]).IsNil() {
if multiHandler, ok := v.(partner.IMultipleStoresHandler); ok {
CurVendorSync.MultiStoreVendorIDs = append(CurVendorSync.MultiStoreVendorIDs, k)
CurVendorSync.PurchaseHandlers[k] = &MultiStoreHandlerWrapper{
IMultipleStoresHandler: multiHandler,
}
} else if singleHandler, ok := v.(partner.ISingleStoreHandler); ok {
CurVendorSync.SingleStoreVendorIDs = append(CurVendorSync.SingleStoreVendorIDs, k)
CurVendorSync.PurchaseHandlers[k] = &SingleStoreHandlerWrapper{
ISingleStoreHandler: singleHandler,
}
} else {
panic(fmt.Sprintf("platform:%d type is wrong!", k))
}
}
}
}
func (p *MultiStoreHandlerWrapper) DeleteCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) {
if jxutils.IsFakeID(cat.JdID) {
return nil
}
return p.IMultipleStoresHandler.DeleteCategory(db, cat, userName)
}
func (p *MultiStoreHandlerWrapper) UpdateCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) {
if jxutils.IsFakeID(cat.JdID) {
globals.SugarLogger.Warnf("UpdateCategory fakeid cat:%s should not get here", utils.Format4Output(cat, true))
return nil
}
return p.IMultipleStoresHandler.UpdateCategory(db, cat, userName)
}
func (p *MultiStoreHandlerWrapper) DeleteSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) {
globals.SugarLogger.Debugf("wrapper DeleteSku, sku:%s", utils.Format4Output(sku, false))
if jxutils.IsFakeID(sku.JdID) {
return nil
}
return p.IMultipleStoresHandler.DeleteSku(db, sku, userName)
}
func (p *MultiStoreHandlerWrapper) UpdateSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) {
if jxutils.IsFakeID(sku.JdID) {
globals.SugarLogger.Warnf("UpdateSku fakeid sku:%s should not get here", utils.Format4Output(sku, true))
return nil
}
return p.IMultipleStoresHandler.UpdateSku(db, sku, userName)
}
func (v *VendorSync) GetStoreHandler(vendorID int) partner.IPurchasePlatformHandler {
return v.PurchaseHandlers[vendorID]
}
func (v *VendorSync) GetMultiStoreHandler(vendorID int) partner.IMultipleStoresHandler {
if handler, ok := v.PurchaseHandlers[vendorID].(partner.IMultipleStoresHandler); ok {
return handler
}
return nil
}
func (v *VendorSync) GetSingleStoreHandler(vendorID int) partner.ISingleStoreHandler {
if handler, ok := v.PurchaseHandlers[vendorID].(partner.ISingleStoreHandler); ok {
return handler
}
return nil
}
func (v *VendorSync) syncCategories(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID int, db *dao.DaoDB, cats []*model.SkuCategory, userName string) (err error) {
multiStoresHandler := v.GetMultiStoreHandler(vendorID)
syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[vendorID])
task := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[vendorID]), nil, ctx,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
cat := batchItemList[0].(*model.SkuCategory)
updateFields := []string{syncStatusFieldName}
syncStatus := refutil.GetObjFieldByName(cat, syncStatusFieldName).(int8)
if (syncStatus & model.SyncFlagDeletedMask) != 0 { //删除
if (syncStatus & model.SyncFlagNewMask) == 0 {
err = multiStoresHandler.DeleteCategory(db, cat, userName)
}
} else if (syncStatus & model.SyncFlagNewMask) != 0 { // 新增
err = multiStoresHandler.CreateCategory(db, cat, userName)
updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()]))
} else if (syncStatus & model.SyncFlagModifiedMask) != 0 { // 修改
err = multiStoresHandler.UpdateCategory(db, cat, userName)
if intErr, ok := err.(*utils.ErrorWithCode); ok && intErr.IntCode() == -3 {
err = nil
}
}
if err == nil {
refutil.SetObjFieldByName(cat, syncStatusFieldName, int8(0))
_, err = dao.UpdateEntity(db, cat, updateFields...)
}
return nil, err
}, cats)
tasksch.HandleTask(task, parentTask, false).Run()
_, err = task.GetResult(0)
return err
}
func (v *VendorSync) SyncCategory(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
globals.SugarLogger.Debug(v.MultiStoreVendorIDs)
hint, err = v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("同步分类信息:%d", categoryID), isAsync, userName,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
vendorID := batchItemList[0].(int)
var cats []*model.SkuCategory
cond := make(map[string]interface{})
if categoryID > 0 {
cond[model.FieldID] = categoryID
} else {
cond[model.FieldLevel] = 1
}
err := dao.GetEntitiesByKV(db, &cats, cond, true)
if err == nil {
err = v.syncCategories(ctx, t, vendorID, db, cats, userName)
}
if err != nil || categoryID > 0 {
return nil, err
}
cond[model.FieldLevel] = 2
err = dao.GetEntitiesByKV(db, &cats, cond, true)
if err == nil {
err = v.syncCategories(ctx, t, vendorID, db, cats, userName)
}
return nil, err
})
return "", err
}
func (v *VendorSync) SyncReorderCategories(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
hint, err = v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("分类重排序:%d", categoryID), isAsync, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int))
err2 := multiStoresHandler.ReorderCategories(db, categoryID, userName)
if err2 == nil {
cat := &model.SkuCategory{}
_, err2 = dao.UpdateEntityByKV(db, cat, utils.Params2Map(dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()]), 0), utils.Params2Map(model.FieldParentID, categoryID))
return nil, err2
}
return nil, err2
})
return "", err
}
func (v *VendorSync) SyncStore(ctx *jxcontext.Context, db *dao.DaoDB, vendorID, storeID int, isAsync bool, userName string) (hint string, err error) {
globals.SugarLogger.Debugf("SyncStore, storeID:%d", storeID)
var vendorIDs []int
if vendorID != -1 {
vendorIDs = []int{
vendorID,
}
}
hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("同步门店信息:%d", storeID), isAsync, false, vendorIDs, []int{storeID}, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
handler := v.GetStoreHandler(loopMapInfo.VendorID)
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), nil, ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeMap := batchItemList[0].(*model.StoreMap)
if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil {
storeMap.SyncStatus = 0
_, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus)
}
return nil, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
return nil, err
}
storeMap := loopMapInfo.StoreMapList[0]
if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil {
storeMap.SyncStatus = 0
_, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus)
}
err = jxutils.AddVendorInfo2Err(err, loopMapInfo.VendorID)
return nil, err
})
return hint, err
}
func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuID int, isAsync, isContinueWhenError bool, userName string) (hint string, err error) {
var (
nameIDs []int
skuIDs []int
)
if nameID != -1 {
nameIDs = []int{nameID}
}
if skuID != -1 {
skuIDs = []int{skuID}
}
return v.SyncSkus(ctx, db, nameIDs, skuIDs, isAsync, isContinueWhenError, userName)
}
func (v *VendorSync) SyncSkus(ctx *jxcontext.Context, db *dao.DaoDB, nameIDs []int, skuIDs []int, isAsync, isContinueWhenError bool, userName string) (hint string, err error) {
globals.SugarLogger.Debugf("SyncSku trackInfo:%s, nameIDs:%v, skuIDs:%v, userName:%s", ctx.GetTrackInfo(), nameIDs, skuIDs, userName)
return v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("同步商品信息, nameIDs:%v, skuIDs:%v", nameIDs, skuIDs), isAsync, userName,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
vendorID := batchItemList[0].(int)
multiStoresHandler := v.GetMultiStoreHandler(vendorID)
syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()])
dbField := dao.ConvertDBFieldPrefix(model.VendorNames[multiStoresHandler.GetVendorID()])
skuMap := make(map[int]bool)
sql := fmt.Sprintf(`
SELECT DISTINCT t2.*
FROM sku t1
JOIN sku_name t2 ON t2.id = t1.name_id
WHERE t1.%s_sync_status <> 0
`, dbField)
sqlParams := []interface{}{}
if len(nameIDs) > 1 {
sql += " AND t1.name_id IN (" + dao.GenQuestionMarks(len(nameIDs)) + ")"
sqlParams = append(sqlParams, nameIDs)
} else if len(nameIDs) == 1 {
sql += " AND t1.name_id = ? "
sqlParams = append(sqlParams, nameIDs[0])
}
if len(skuIDs) > 0 {
sql += " AND t1.id IN(" + dao.GenQuestionMarks(len(skuIDs)) + ")"
sqlParams = append(sqlParams, skuIDs)
} else if len(skuIDs) == 1 {
sql += " AND t1.id = ? "
sqlParams = append(sqlParams, skuIDs[0])
}
for _, v := range skuIDs {
skuMap[v] = true
}
sql += " ORDER BY t2.id"
var skuNameList []*model.SkuName
err := dao.GetRows(db, &skuNameList, sql, sqlParams...)
if err == nil && len(skuNameList) > 0 {
// todo 同一skuName下的sku顺序处理的原因是京东SPU特殊类型必须要序列化同步才能正常处理, db可能会有多线程问题
task := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[vendorID]), tasksch.NewParallelConfig().SetParallelCount(10).SetIsContinueWhenError(isContinueWhenError), ctx,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
skuName := batchItemList[0].(*model.SkuName)
var skuList []*model.Sku
if err = dao.GetRows(db, &skuList, fmt.Sprintf(`
SELECT *
FROM sku
WHERE name_id = ? AND %s_sync_status <> 0
ORDER BY IF(spec_unit IN('kg', 'L'), 1000, 1) * spec_quality
`, dbField), skuName.ID); err == nil && len(skuList) > 0 {
for _, sku := range skuList {
syncStatus := refutil.GetObjFieldByName(sku, syncStatusFieldName).(int8)
globals.SugarLogger.Debugf("SyncSku trackInfo:%s, skuID:%d, syncStatus:%d", ctx.GetTrackInfo(), sku.ID, syncStatus)
if (len(skuIDs) == 0 || skuMap[sku.ID]) && (syncStatus != 0) {
updateFields := []string{syncStatusFieldName}
if syncStatus&model.SyncFlagDeletedMask != 0 { // 删除
if syncStatus&model.SyncFlagNewMask == 0 {
err = multiStoresHandler.DeleteSku(db, sku, userName)
}
} else if syncStatus&model.SyncFlagNewMask != 0 { // 新增
if err = multiStoresHandler.CreateSku(db, sku, userName); err == nil {
var tmpStruct struct {
MaxIndex int
}
// todo hard code 得到京东spu中sku的顺序以方便以后修改销售属性这个必须要每次重新从数据库取
if dao.GetRow(db, &tmpStruct, "SELECT MAX(sku_index) max_index FROM sku WHERE name_id = ? AND jd_id > 0 AND jd_id < 4024012631406 ", sku.NameID) == nil {
sku.SkuIndex = tmpStruct.MaxIndex + 1
updateFields = append(updateFields, "SkuIndex")
}
updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()]))
}
} else if syncStatus&model.SyncFlagModifiedMask != 0 { // 修改
err = multiStoresHandler.UpdateSku(db, sku, userName)
}
if err == nil {
refutil.SetObjFieldByName(sku, syncStatusFieldName, int8(0))
if _, err = dao.UpdateEntity(db, sku, updateFields...); err != nil {
break
}
}
}
}
}
if err == nil {
refutil.SetObjFieldByName(skuName, syncStatusFieldName, int8(0))
_, err = dao.UpdateEntity(db, skuName, syncStatusFieldName)
}
return nil, err
}, skuNameList)
t.AddChild(task).Run()
_, err = task.GetResult(0)
}
return nil, err
})
}
func (v *VendorSync) SyncStoresCategory(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync bool) (hint string, err error) {
globals.SugarLogger.Debug("SyncStoresCategory")
isManageIt := len(storeIDs) != 1
return v.LoopStoresMap(ctx, db, fmt.Sprintf("同步门店分类信息:%v", storeIDs), isAsync, isManageIt, vendorIDs, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if handler := v.GetSingleStoreHandler(loopMapInfo.VendorID); handler != nil {
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewSeqTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), ctx,
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
storeID := loopMapInfo.StoreMapList[step].StoreID
_, err = handler.SyncStoreCategory(ctx, task, storeID, false)
return nil, err
}, len(loopMapInfo.StoreMapList))
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
return nil, err
}
_, err = handler.SyncStoreCategory(ctx, t, loopMapInfo.StoreMapList[0].StoreID, false)
err = jxutils.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}
return nil, err
})
}
//
func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, skuIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
globals.SugarLogger.Debug("SyncStoresSkus")
isManageIt := len(storeIDs) != 1 || len(skuIDs) == 0 || len(skuIDs) > 8
return v.LoopStoresMap(ctx, db, fmt.Sprintf("同步门店商品信息:%v", storeIDs), isAsync, isManageIt, vendorIDs, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewSeqTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), ctx,
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
storeID := loopMapInfo.StoreMapList[step].StoreID
if _, err = handler.SyncStoreSkus(ctx, task, storeID, skuIDs, false, isContinueWhenError); err != nil {
globals.SugarLogger.Debugf("SyncStoresSkus failed1 store:%d failed with error:%v", storeID, err)
if isContinueWhenError {
err = nil
}
}
return nil, err
}, len(loopMapInfo.StoreMapList))
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
return nil, err
}
_, err = handler.SyncStoreSkus(ctx, t, loopMapInfo.StoreMapList[0].StoreID, skuIDs, false, isContinueWhenError)
err = jxutils.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}
return nil, err
})
}
func (v *VendorSync) FullSyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
globals.SugarLogger.Debug("FullSyncStoresSkus")
return v.LoopStoresMap(ctx, db, fmt.Sprintf("初始化门店商品信息:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewSeqTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), ctx,
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
storeID := loopMapInfo.StoreMapList[step].StoreID
_, err = handler.FullSyncStoreSkus(ctx, task, storeID, false, isContinueWhenError)
return nil, err
}, len(loopMapInfo.StoreMapList))
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
return nil, err
}
_, err = handler.FullSyncStoreSkus(ctx, t, loopMapInfo.StoreMapList[0].StoreID, false, isContinueWhenError)
err = jxutils.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}
return nil, err
})
}
func (v *VendorSync) DeleteRemoteStoreSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
globals.SugarLogger.Debug("DeleteRemoteStoreSkus")
return v.LoopStoresMap(ctx, db, fmt.Sprintf("删除远程门店商品信息:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewSeqTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), ctx,
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
storeID := loopMapInfo.StoreMapList[step].StoreID
_, err = handler.DeleteRemoteStoreSkus(ctx, task, storeID, false, isContinueWhenError)
return nil, err
}, len(loopMapInfo.StoreMapList))
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
return nil, err
}
_, err = handler.DeleteRemoteStoreSkus(ctx, t, loopMapInfo.StoreMapList[0].StoreID, false, isContinueWhenError)
err = jxutils.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}
return nil, err
})
}
func (v *VendorSync) LoopStoresMap(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync, isManageIt bool, vendorIDs []int, storeIDs []int, handler tasksch.WorkFunc) (hint string, err error) {
sql := `
SELECT t1.*
FROM store_map t1
WHERE t1.is_sync = 1 AND t1.deleted_at = ?
`
sqlParams := []interface{}{
utils.DefaultTimeValue,
}
if len(vendorIDs) > 0 {
sql += " AND t1.vendor_id IN (" + dao.GenQuestionMarks(len(vendorIDs)) + ")"
sqlParams = append(sqlParams, vendorIDs)
}
if len(storeIDs) > 0 {
sql += " AND t1.store_id IN (" + dao.GenQuestionMarks(len(storeIDs)) + ")"
sqlParams = append(sqlParams, storeIDs)
}
sql += " ORDER BY t1.store_id, t1.vendor_id"
var storeMapList []*model.StoreMap
if err = dao.GetRows(db, &storeMapList, sql, sqlParams...); err != nil {
return "", err
}
if len(storeMapList) == 0 {
return "", nil
}
vendorStoreMap := make(map[int][]*model.StoreMap)
for _, v := range storeMapList {
vendorStoreMap[v.VendorID] = append(vendorStoreMap[v.VendorID], v)
}
loopInfoList := make([]*LoopStoreMapInfo, len(vendorStoreMap))
index := 0
for k, v := range vendorStoreMap {
loopInfoList[index] = &LoopStoreMapInfo{
VendorID: k,
StoreMapList: v,
}
index++
}
if len(loopInfoList) == 1 {
taskName = fmt.Sprintf("%s,处理平台%s", taskName, model.VendorChineseNames[loopInfoList[0].VendorID])
}
task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, loopInfoList)
tasksch.HandleTask(task, nil, isManageIt).Run()
if !isAsync {
_, err = task.GetResult(0)
}
return task.ID, makeSyncError(err)
}
func (v *VendorSync) LoopMultiStoresVendors(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, userName string, handler tasksch.WorkFunc) (hint string, err error) {
task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, v.MultiStoreVendorIDs)
tasksch.HandleTask(task, nil, true).Run()
if !isAsync {
_, err = task.GetResult(0)
}
return task.ID, makeSyncError(err)
}
func (v *VendorSync) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int, storeIDs []int) (hint string, err error) {
task := tasksch.NewParallelTask("RefreshAllSkusID", nil, ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
vendorID := batchItemList[0].(int)
if handler := v.GetStoreHandler(vendorID); handler != nil {
if multiHandler, ok := handler.(partner.IMultipleStoresHandler); ok {
_, err = multiHandler.RefreshAllSkusID(ctx, task, false)
} else if singleHandler, ok := handler.(partner.ISingleStoreHandler); ok {
_, err = singleHandler.RefreshStoresAllSkusID(ctx, task, false, storeIDs)
}
}
return nil, err
}, vendorIDs)
tasksch.HandleTask(task, nil, true).Run()
if !isAsync {
_, err = task.GetResult(0)
}
return task.ID, err
}
func (v *VendorSync) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int) (hint string, err error) {
task := tasksch.NewParallelTask("RefreshAllStoresID", nil, ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
vendorID := batchItemList[0].(int)
if handler := v.GetStoreHandler(vendorID); handler != nil {
_, err = handler.RefreshAllStoresID(ctx, task, false)
}
return nil, err
}, vendorIDs)
tasksch.HandleTask(task, nil, true).Run()
if !isAsync {
_, err = task.GetResult(0)
}
return task.ID, err
}
func makeSyncError(err error) (newErr error) {
if err != nil {
return &SyncError{
Original: err,
}
}
return err
}
func (e *SyncError) Error() string {
return fmt.Sprintf("本地数据修改成功,但同步失败,请根据错误提示处理!,同步错误信息:%s", e.Original.Error())
}
func isSyncError(err error) bool {
_, ok := err.(*SyncError)
return ok
}
func (v *VendorSync) SyncSkuNames(ctx *jxcontext.Context, nameIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) {
db := dao.GetDB()
if isForce {
sql := `
UPDATE sku t1
SET t1.jd_sync_status = t1.jd_sync_status | ?
WHERE t1.deleted_at = ?
`
sqlParams := []interface{}{
model.SyncFlagModifiedMask,
utils.DefaultTimeValue,
}
if len(nameIDs) > 0 {
sql += " AND t1.name_id IN(" + dao.GenQuestionMarks(len(nameIDs)) + ")"
sqlParams = append(sqlParams, nameIDs)
}
if _, err = dao.ExecuteSQL(db, sql, sqlParams...); err != nil {
return "", err
}
}
return v.SyncSkus(ctx, db, nameIDs, nil, isAsync, isContinueWhenError, ctx.GetUserName())
}