523 lines
21 KiB
Go
523 lines
21 KiB
Go
package cms
|
||
|
||
import (
|
||
"errors"
|
||
"fmt"
|
||
"reflect"
|
||
"sort"
|
||
|
||
"git.rosy.net.cn/baseapi/utils"
|
||
"git.rosy.net.cn/jx-callback/business/jxutils"
|
||
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
|
||
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
|
||
"git.rosy.net.cn/jx-callback/business/model"
|
||
"git.rosy.net.cn/jx-callback/business/model/dao"
|
||
"git.rosy.net.cn/jx-callback/business/partner"
|
||
"git.rosy.net.cn/jx-callback/globals"
|
||
"git.rosy.net.cn/jx-callback/globals/api"
|
||
)
|
||
|
||
type LoopStoreMapInfo struct {
|
||
VendorID int
|
||
StoreMapList []*model.StoreMap
|
||
}
|
||
|
||
type VendorSync struct {
|
||
MultiStoreVendorIDs []int
|
||
SingleStoreVendorIDs []int
|
||
PurchaseHandlers map[int]partner.IPurchasePlatformHandler
|
||
}
|
||
|
||
// SyncError wraps an error raised while synchronising data to a vendor
// platform, so callers can tell it apart from other failures (see isSyncError).
type SyncError struct {
	// Original is the underlying error returned by the sync task.
	Original error `json:"original"`
	// Message is an optional text; nothing in this file sets or reads it.
	Message string `json:"message"`
}
|
||
|
||
// 对于多门店平台接口的通用处理
|
||
type MultiStoreHandlerWrapper struct {
|
||
partner.IMultipleStoresHandler
|
||
}
|
||
|
||
// 对于单门店平台接口的通用处理
|
||
type SingleStoreHandlerWrapper struct {
|
||
partner.ISingleStoreHandler
|
||
}
|
||
|
||
var (
|
||
CurVendorSync VendorSync
|
||
)
|
||
|
||
// ErrHaveNotImplementedYet reports that an operation has no implementation yet.
var ErrHaveNotImplementedYet = errors.New("还没有实现")

// ErrEntityNotExist reports that the requested entity could not be found.
var ErrEntityNotExist = errors.New("找不到相应实体")
|
||
|
||
func Init() {
|
||
apiMap := map[int]interface{}{
|
||
model.VendorIDJD: api.JdAPI,
|
||
model.VendorIDELM: api.ElmAPI,
|
||
model.VendorIDEBAI: api.EbaiAPI,
|
||
model.VendorIDMTWM: api.MtwmAPI,
|
||
model.VendorIDWSC: api.WeimobAPI,
|
||
}
|
||
CurVendorSync.PurchaseHandlers = make(map[int]partner.IPurchasePlatformHandler)
|
||
for k, v := range partner.PurchasePlatformHandlers {
|
||
if !reflect.ValueOf(apiMap[k]).IsNil() {
|
||
if multiHandler, ok := v.(partner.IMultipleStoresHandler); ok {
|
||
CurVendorSync.MultiStoreVendorIDs = append(CurVendorSync.MultiStoreVendorIDs, k)
|
||
CurVendorSync.PurchaseHandlers[k] = &MultiStoreHandlerWrapper{
|
||
IMultipleStoresHandler: multiHandler,
|
||
}
|
||
} else if singleHandler, ok := v.(partner.ISingleStoreHandler); ok {
|
||
CurVendorSync.SingleStoreVendorIDs = append(CurVendorSync.SingleStoreVendorIDs, k)
|
||
CurVendorSync.PurchaseHandlers[k] = &SingleStoreHandlerWrapper{
|
||
ISingleStoreHandler: singleHandler,
|
||
}
|
||
} else {
|
||
panic(fmt.Sprintf("platform:%d type is wrong!", k))
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
func (p *MultiStoreHandlerWrapper) DeleteCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) {
|
||
if jxutils.IsFakeID(cat.JdID) {
|
||
return nil
|
||
}
|
||
return p.IMultipleStoresHandler.DeleteCategory(db, cat, userName)
|
||
}
|
||
|
||
func (p *MultiStoreHandlerWrapper) UpdateCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) {
|
||
if jxutils.IsFakeID(cat.JdID) {
|
||
globals.SugarLogger.Warnf("UpdateCategory fakeid cat:%s should not get here", utils.Format4Output(cat, true))
|
||
return nil
|
||
}
|
||
return p.IMultipleStoresHandler.UpdateCategory(db, cat, userName)
|
||
}
|
||
|
||
func (p *MultiStoreHandlerWrapper) DeleteSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) {
|
||
globals.SugarLogger.Debugf("wrapper DeleteSku, sku:%s", utils.Format4Output(sku, false))
|
||
if jxutils.IsFakeID(sku.JdID) {
|
||
return nil
|
||
}
|
||
return p.IMultipleStoresHandler.DeleteSku(db, sku, userName)
|
||
}
|
||
|
||
func (p *MultiStoreHandlerWrapper) UpdateSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) {
|
||
if jxutils.IsFakeID(sku.JdID) {
|
||
globals.SugarLogger.Warnf("UpdateSku fakeid sku:%s should not get here", utils.Format4Output(sku, true))
|
||
return nil
|
||
}
|
||
return p.IMultipleStoresHandler.UpdateSku(db, sku, userName)
|
||
}
|
||
|
||
func (v *VendorSync) GetStoreHandler(vendorID int) partner.IPurchasePlatformHandler {
|
||
return v.PurchaseHandlers[vendorID]
|
||
}
|
||
|
||
func (v *VendorSync) GetMultiStoreHandler(vendorID int) partner.IMultipleStoresHandler {
|
||
if handler, ok := v.PurchaseHandlers[vendorID].(partner.IMultipleStoresHandler); ok {
|
||
return handler
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (v *VendorSync) GetSingleStoreHandler(vendorID int) partner.ISingleStoreHandler {
|
||
if handler, ok := v.PurchaseHandlers[vendorID].(partner.ISingleStoreHandler); ok {
|
||
return handler
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (v *VendorSync) syncCategories(ctx *jxcontext.Context, parentTask tasksch.ITask, multiStoresHandler partner.IMultipleStoresHandler, db *dao.DaoDB, cats []*model.SkuCategory, userName string) (err error) {
|
||
syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()])
|
||
task := tasksch.NewParallelTask("syncCategories", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||
cat := batchItemList[0].(*model.SkuCategory)
|
||
updateFields := []string{syncStatusFieldName}
|
||
syncStatus := jxutils.GetObjFieldByName(cat, syncStatusFieldName).(int8)
|
||
if (syncStatus & model.SyncFlagDeletedMask) != 0 { //删除
|
||
err = multiStoresHandler.DeleteCategory(db, cat, userName)
|
||
} else if (syncStatus & model.SyncFlagNewMask) != 0 { // 新增
|
||
err = multiStoresHandler.CreateCategory(db, cat, userName)
|
||
updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()]))
|
||
} else if (syncStatus & model.SyncFlagModifiedMask) != 0 { // 修改
|
||
err = multiStoresHandler.UpdateCategory(db, cat, userName)
|
||
}
|
||
if err == nil {
|
||
jxutils.SetObjFieldByName(cat, syncStatusFieldName, int8(0))
|
||
_, err = dao.UpdateEntity(db, cat, updateFields...)
|
||
}
|
||
return nil, err
|
||
}, cats)
|
||
ctx.SetTaskOrAddChild(task, parentTask)
|
||
task.Run()
|
||
_, err = task.GetResult(0)
|
||
return err
|
||
}
|
||
|
||
func (v *VendorSync) SyncCategory(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
|
||
globals.SugarLogger.Debug(v.MultiStoreVendorIDs)
|
||
hint, err = v.LoopMultiStoresVendors(ctx, db, "SyncCategory", isAsync, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||
multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int))
|
||
var cats []*model.SkuCategory
|
||
cond := make(map[string]interface{})
|
||
if categoryID > 0 {
|
||
cond[model.FieldID] = categoryID
|
||
} else {
|
||
cond[model.FieldLevel] = 1
|
||
}
|
||
err := dao.GetEntitiesByKV(db, &cats, cond, true)
|
||
if err == nil {
|
||
err = v.syncCategories(ctx, t, multiStoresHandler, db, cats, userName)
|
||
}
|
||
if err != nil || categoryID > 0 {
|
||
return nil, err
|
||
}
|
||
cond[model.FieldLevel] = 2
|
||
err = dao.GetEntitiesByKV(db, &cats, cond, true)
|
||
if err == nil {
|
||
err = v.syncCategories(ctx, t, multiStoresHandler, db, cats, userName)
|
||
}
|
||
return nil, err
|
||
})
|
||
return "", err
|
||
}
|
||
|
||
func (v *VendorSync) SyncReorderCategories(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
|
||
hint, err = v.LoopMultiStoresVendors(ctx, db, "SyncReorderCategories", isAsync, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||
multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int))
|
||
err2 := multiStoresHandler.ReorderCategories(db, categoryID, userName)
|
||
if err2 == nil {
|
||
cat := &model.SkuCategory{}
|
||
_, err2 = dao.UpdateEntityByKV(db, cat, utils.Params2Map(dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()]), 0), utils.Params2Map(model.FieldParentID, categoryID))
|
||
return nil, err2
|
||
}
|
||
return nil, err2
|
||
})
|
||
return "", err
|
||
}
|
||
|
||
func (v *VendorSync) SyncStore(ctx *jxcontext.Context, db *dao.DaoDB, vendorID, storeID int, isAsync bool, userName string) (hint string, err error) {
|
||
globals.SugarLogger.Debugf("SyncStore, storeID:%d", storeID)
|
||
|
||
var vendorIDs []int
|
||
if vendorID != -1 {
|
||
vendorIDs = []int{
|
||
vendorID,
|
||
}
|
||
}
|
||
hint, err = v.LoopStoresMap(ctx, db, "SyncStore", isAsync, vendorIDs, []int{storeID}, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
|
||
handler := v.GetStoreHandler(loopMapInfo.VendorID)
|
||
if len(loopMapInfo.StoreMapList) > 1 {
|
||
loopStoreTask := tasksch.NewParallelTask("SyncStore loop stores", nil, ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||
storeMap := batchItemList[0].(*model.StoreMap)
|
||
if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil {
|
||
storeMap.SyncStatus = 0
|
||
_, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus)
|
||
}
|
||
return nil, err
|
||
}, loopMapInfo.StoreMapList)
|
||
t.AddChild(loopStoreTask).Run()
|
||
_, err = loopStoreTask.GetResult(0)
|
||
return nil, err
|
||
}
|
||
storeMap := loopMapInfo.StoreMapList[0]
|
||
if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil {
|
||
storeMap.SyncStatus = 0
|
||
_, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus)
|
||
}
|
||
return nil, err
|
||
})
|
||
return hint, err
|
||
}
|
||
|
||
func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuID int, isAsync bool, userName string) (hint string, err error) {
|
||
globals.SugarLogger.Debugf("SyncSku, nameID:%d, skuID:%d, userName:%s", nameID, skuID, userName)
|
||
return v.LoopMultiStoresVendors(ctx, db, "SyncSku", isAsync, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||
multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int))
|
||
syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()])
|
||
var skuList []*model.Sku
|
||
cond := make(map[string]interface{})
|
||
if nameID != -1 {
|
||
cond[model.FieldNameID] = nameID
|
||
}
|
||
if skuID != -1 {
|
||
cond[model.FieldID] = skuID
|
||
}
|
||
err := dao.GetEntitiesByKV(db, &skuList, cond, true)
|
||
if err == nil && len(skuList) > 0 {
|
||
sort.Sort(jxutils.SkuList(skuList))
|
||
// globals.SugarLogger.Debug(utils.Format4Output(skuList, false))
|
||
// todo 这里SetParallelCount(1)的原因是京东SPU特殊类型必须要序列化同步才能正常处理, db可能会有多线程问题
|
||
task := tasksch.NewParallelTask("SyncSku loop sku", tasksch.NewParallelConfig().SetParallelCount(1), userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||
sku := batchItemList[0].(*model.Sku)
|
||
syncStatus := jxutils.GetObjFieldByName(sku, syncStatusFieldName).(int8)
|
||
if (skuID == -1 || skuID == sku.ID) && (syncStatus != 0) {
|
||
updateFields := []string{syncStatusFieldName}
|
||
if syncStatus&model.SyncFlagDeletedMask != 0 { // 删除
|
||
err = multiStoresHandler.DeleteSku(db, sku, userName)
|
||
} else if syncStatus&model.SyncFlagNewMask != 0 { // 新增
|
||
if err = multiStoresHandler.CreateSku(db, sku, userName); err == nil {
|
||
var tmpStruct struct {
|
||
MaxIndex int
|
||
}
|
||
// todo hard code 得到京东spu中sku的顺序(以方便以后修改销售属性)
|
||
if dao.GetRow(db, &tmpStruct, "SELECT MAX(sku_index) max_index FROM sku WHERE name_id = ? AND jd_id > 0 AND jd_id < 4024012631406 ", sku.NameID) == nil {
|
||
sku.SkuIndex = tmpStruct.MaxIndex + 1
|
||
updateFields = append(updateFields, "SkuIndex")
|
||
}
|
||
updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()]))
|
||
}
|
||
} else if syncStatus&model.SyncFlagModifiedMask != 0 { // 修改
|
||
err = multiStoresHandler.UpdateSku(db, sku, userName)
|
||
}
|
||
if err == nil {
|
||
jxutils.SetObjFieldByName(sku, syncStatusFieldName, int8(0))
|
||
dao.UpdateEntity(db, sku, updateFields...)
|
||
}
|
||
}
|
||
return nil, err
|
||
}, skuList)
|
||
t.AddChild(task).Run()
|
||
_, err = task.GetResult(0)
|
||
}
|
||
return nil, err
|
||
})
|
||
}
|
||
|
||
func (v *VendorSync) SyncStoresCategory(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync bool) (hint string, err error) {
|
||
globals.SugarLogger.Debug("SyncStoresCategory")
|
||
return v.LoopStoresMap(ctx, db, "SyncStoresCategory", isAsync, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
|
||
if handler := v.GetSingleStoreHandler(loopMapInfo.VendorID); handler != nil {
|
||
if len(loopMapInfo.StoreMapList) > 1 {
|
||
loopStoreTask := tasksch.NewSeqTask("SyncStoresCategory loop stores", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||
storeID := loopMapInfo.StoreMapList[step].StoreID
|
||
_, err = handler.SyncStoreCategory(ctx, task, storeID, false)
|
||
return nil, err
|
||
}, len(loopMapInfo.StoreMapList))
|
||
t.AddChild(loopStoreTask).Run()
|
||
_, err = loopStoreTask.GetResult(0)
|
||
return nil, err
|
||
}
|
||
_, err = handler.SyncStoreCategory(ctx, t, loopMapInfo.StoreMapList[0].StoreID, false)
|
||
}
|
||
return nil, err
|
||
})
|
||
}
|
||
|
||
//
|
||
func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, skuIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
|
||
globals.SugarLogger.Debug("SyncStoresSkus")
|
||
return v.LoopStoresMap(ctx, db, "SyncStoresSkus顶层", isAsync, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
|
||
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
|
||
if len(loopMapInfo.StoreMapList) > 1 {
|
||
loopStoreTask := tasksch.NewSeqTask("SyncStoresSkus相同平台循环门店", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||
storeID := loopMapInfo.StoreMapList[step].StoreID
|
||
if _, err = handler.SyncStoreSkus(ctx, task, storeID, skuIDs, false, isContinueWhenError); err != nil {
|
||
globals.SugarLogger.Debugf("SyncStoresSkus failed1 store:%d failed with error:%v", storeID, err)
|
||
if isContinueWhenError {
|
||
err = nil
|
||
}
|
||
}
|
||
return nil, err
|
||
}, len(loopMapInfo.StoreMapList))
|
||
t.AddChild(loopStoreTask).Run()
|
||
_, err = loopStoreTask.GetResult(0)
|
||
return nil, err
|
||
}
|
||
_, err = handler.SyncStoreSkus(ctx, t, loopMapInfo.StoreMapList[0].StoreID, skuIDs, false, isContinueWhenError)
|
||
}
|
||
return nil, err
|
||
})
|
||
}
|
||
|
||
func (v *VendorSync) FullSyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
|
||
globals.SugarLogger.Debug("FullSyncStoresSkus")
|
||
return v.LoopStoresMap(ctx, db, "FullSyncStoresSkus顶层", isAsync, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
|
||
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
|
||
if len(loopMapInfo.StoreMapList) > 1 {
|
||
loopStoreTask := tasksch.NewSeqTask("FullSyncStoresSkus相同平台循环门店", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||
storeID := loopMapInfo.StoreMapList[step].StoreID
|
||
_, err = handler.FullSyncStoreSkus(ctx, task, storeID, false, isContinueWhenError)
|
||
return nil, err
|
||
}, len(loopMapInfo.StoreMapList))
|
||
t.AddChild(loopStoreTask).Run()
|
||
_, err = loopStoreTask.GetResult(0)
|
||
return nil, err
|
||
}
|
||
_, err = handler.FullSyncStoreSkus(ctx, t, loopMapInfo.StoreMapList[0].StoreID, false, isContinueWhenError)
|
||
}
|
||
return nil, err
|
||
})
|
||
}
|
||
|
||
func (v *VendorSync) DeleteRemoteStoreSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
|
||
globals.SugarLogger.Debug("DeleteRemoteStoreSkus")
|
||
return v.LoopStoresMap(ctx, db, "DeleteRemoteStoreSkus顶层", isAsync, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
||
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
|
||
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
|
||
if len(loopMapInfo.StoreMapList) > 1 {
|
||
loopStoreTask := tasksch.NewSeqTask("DeleteRemoteStoreSkus相同平台循环门店", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||
storeID := loopMapInfo.StoreMapList[step].StoreID
|
||
_, err = handler.DeleteRemoteStoreSkus(ctx, task, storeID, false, isContinueWhenError)
|
||
return nil, err
|
||
}, len(loopMapInfo.StoreMapList))
|
||
t.AddChild(loopStoreTask).Run()
|
||
_, err = loopStoreTask.GetResult(0)
|
||
return nil, err
|
||
}
|
||
_, err = handler.DeleteRemoteStoreSkus(ctx, t, loopMapInfo.StoreMapList[0].StoreID, false, isContinueWhenError)
|
||
}
|
||
return nil, err
|
||
})
|
||
}
|
||
|
||
func (v *VendorSync) LoopStoresMap(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, vendorIDs []int, storeIDs []int, handler tasksch.WorkFunc) (hint string, err error) {
|
||
sql := `
|
||
SELECT t1.*
|
||
FROM store_map t1
|
||
WHERE t1.is_sync = 1 AND t1.deleted_at = ?
|
||
`
|
||
sqlParams := []interface{}{
|
||
utils.DefaultTimeValue,
|
||
}
|
||
if len(vendorIDs) > 0 {
|
||
sql += " AND t1.vendor_id IN (" + dao.GenQuestionMarks(len(vendorIDs)) + ")"
|
||
sqlParams = append(sqlParams, vendorIDs)
|
||
}
|
||
if len(storeIDs) > 0 {
|
||
sql += " AND t1.store_id IN (" + dao.GenQuestionMarks(len(storeIDs)) + ")"
|
||
sqlParams = append(sqlParams, storeIDs)
|
||
}
|
||
sql += " ORDER BY t1.store_id, t1.vendor_id"
|
||
var storeMapList []*model.StoreMap
|
||
if err = dao.GetRows(db, &storeMapList, sql, sqlParams...); err != nil {
|
||
return "", err
|
||
}
|
||
|
||
if len(storeMapList) == 0 {
|
||
return "", nil
|
||
}
|
||
vendorStoreMap := make(map[int][]*model.StoreMap)
|
||
for _, v := range storeMapList {
|
||
vendorStoreMap[v.VendorID] = append(vendorStoreMap[v.VendorID], v)
|
||
}
|
||
loopInfoList := make([]*LoopStoreMapInfo, len(vendorStoreMap))
|
||
index := 0
|
||
for k, v := range vendorStoreMap {
|
||
loopInfoList[index] = &LoopStoreMapInfo{
|
||
VendorID: k,
|
||
StoreMapList: v,
|
||
}
|
||
index++
|
||
}
|
||
task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx.GetUserName(), handler, loopInfoList)
|
||
ctx.SetTaskOrAddChild(task, nil)
|
||
tasksch.ManageTask(task).Run()
|
||
if !isAsync {
|
||
_, err = task.GetResult(0)
|
||
}
|
||
return task.ID, makeSyncError(err)
|
||
}
|
||
|
||
func (v *VendorSync) LoopMultiStoresVendors(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, userName string, handler tasksch.WorkFunc) (hint string, err error) {
|
||
task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), userName, handler, v.MultiStoreVendorIDs)
|
||
ctx.SetTaskOrAddChild(task, nil)
|
||
tasksch.ManageTask(task).Run()
|
||
if !isAsync {
|
||
_, err = task.GetResult(0)
|
||
}
|
||
return task.ID, makeSyncError(err)
|
||
}
|
||
|
||
// func (v *VendorSync) LoopStoreVendors(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, taskName string, isAsync bool, userName string, handler tasksch.WorkFunc) (hint string, err error) {
|
||
// if taskName == "" {
|
||
// taskName = "LoopStoreVendors"
|
||
// }
|
||
// var vendorIDMap map[int]int
|
||
// if len(vendorIDs) != 0 {
|
||
// vendorIDMap = make(map[int]int)
|
||
// for _, v := range vendorIDs {
|
||
// vendorIDMap[v] = 1
|
||
// }
|
||
// }
|
||
// allHandlers := make([]int, len(v.MultiStoreVendorIDs)+len(v.SingleStoreVendorIDs))
|
||
// copy(allHandlers, v.MultiStoreVendorIDs)
|
||
// copy(allHandlers[len(v.MultiStoreVendorIDs):], v.SingleStoreVendorIDs)
|
||
// if vendorIDMap != nil {
|
||
// count := 0
|
||
// for _, v := range allHandlers {
|
||
// if vendorIDMap[v] == 1 {
|
||
// allHandlers[count] = v
|
||
// count++
|
||
// }
|
||
// }
|
||
// allHandlers = allHandlers[:count]
|
||
// }
|
||
// task := tasksch.NewParallelTask(taskName, nil, userName, handler, allHandlers)
|
||
// ctx.SetTaskOrAddChild(task, nil)
|
||
// tasksch.ManageTask(task).Run()
|
||
// if !isAsync {
|
||
// _, err = task.GetResult(0)
|
||
// }
|
||
// return task.ID, err
|
||
// }
|
||
|
||
func (v *VendorSync) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int, storeIDs []int) (hint string, err error) {
|
||
task := tasksch.NewParallelTask("RefreshAllSkusID", nil, ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||
vendorID := batchItemList[0].(int)
|
||
if handler := v.GetStoreHandler(vendorID); handler != nil {
|
||
if multiHandler, ok := handler.(partner.IMultipleStoresHandler); ok {
|
||
_, err = multiHandler.RefreshAllSkusID(ctx, task, false)
|
||
} else if singleHandler, ok := handler.(partner.ISingleStoreHandler); ok {
|
||
_, err = singleHandler.RefreshStoresAllSkusID(ctx, task, false, storeIDs)
|
||
}
|
||
}
|
||
return nil, err
|
||
}, vendorIDs)
|
||
ctx.SetTaskOrAddChild(task, nil)
|
||
tasksch.ManageTask(task).Run()
|
||
if !isAsync {
|
||
_, err = task.GetResult(0)
|
||
}
|
||
return task.ID, err
|
||
}
|
||
|
||
func (v *VendorSync) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int) (hint string, err error) {
|
||
task := tasksch.NewParallelTask("RefreshAllStoresID", nil, ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||
vendorID := batchItemList[0].(int)
|
||
if handler := v.GetStoreHandler(vendorID); handler != nil {
|
||
_, err = handler.RefreshAllStoresID(ctx, task, false)
|
||
}
|
||
return nil, err
|
||
}, vendorIDs)
|
||
ctx.SetTaskOrAddChild(task, nil)
|
||
tasksch.ManageTask(task).Run()
|
||
if !isAsync {
|
||
_, err = task.GetResult(0)
|
||
}
|
||
return task.ID, err
|
||
}
|
||
|
||
func makeSyncError(err error) (newErr error) {
|
||
if err != nil {
|
||
return &SyncError{
|
||
Original: err,
|
||
}
|
||
}
|
||
return err
|
||
}
|
||
|
||
func (e *SyncError) Error() string {
|
||
return fmt.Sprintf("本地数据修改成功,但同步失败,请根据错误提示处理!,同步错误信息:%s", e.Original.Error())
|
||
}
|
||
|
||
func isSyncError(err error) bool {
|
||
_, ok := err.(*SyncError)
|
||
return ok
|
||
}
|