package cms

import (
	"errors"
	"fmt"
	"reflect"
	"strings"

	"git.rosy.net.cn/baseapi/utils"
	"git.rosy.net.cn/jx-callback/business/jxutils"
	"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
	"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
	"git.rosy.net.cn/jx-callback/business/model"
	"git.rosy.net.cn/jx-callback/business/model/dao"
	"git.rosy.net.cn/jx-callback/business/partner"
	"git.rosy.net.cn/jx-callback/globals"
	"git.rosy.net.cn/jx-callback/globals/api"
	"git.rosy.net.cn/jx-callback/globals/refutil"
)

// LoopStoreMapInfo groups the store mappings that belong to one vendor; it is
// the work item handed to the per-vendor handlers of LoopStoresMap2.
type LoopStoreMapInfo struct {
	VendorID     int
	StoreMapList []*model.StoreMap
}

// VendorSync holds the registered purchase platform handlers, keyed by vendor
// ID, together with the vendor IDs split by handler kind.
type VendorSync struct {
	MultiStoreVendorIDs  []int
	SingleStoreVendorIDs []int
	PurchaseHandlers     map[int]partner.IPurchasePlatformHandler
}

// SyncError wraps an error raised while syncing to a vendor platform.
type SyncError struct {
	Original error  `json:"original"`
	Message  string `json:"message"`
}

// MultiStoreHandlerWrapper adds common handling on top of a multi-store
// platform handler.
type MultiStoreHandlerWrapper struct {
	partner.IMultipleStoresHandler
}

// SingleStoreHandlerWrapper adds common handling on top of a single-store
// platform handler.
type SingleStoreHandlerWrapper struct {
	partner.ISingleStoreHandler
}

var (
	// CurVendorSync is the package-level VendorSync populated by Init.
	CurVendorSync VendorSync
)

var (
	ErrHaveNotImplementedYet = errors.New("还没有实现")
	ErrEntityNotExist        = errors.New("找不到相应实体")
)

// isNilAPI reports whether apiObj is absent: either a nil interface or a nil
// value of a nillable kind. Unlike calling reflect.Value.IsNil directly, it
// never panics (IsNil panics on the zero Value and on non-nillable kinds).
func isNilAPI(apiObj interface{}) bool {
	if apiObj == nil {
		return true
	}
	switch rv := reflect.ValueOf(apiObj); rv.Kind() {
	case reflect.Ptr, reflect.Map, reflect.Slice, reflect.Chan, reflect.Func, reflect.Interface:
		return rv.IsNil()
	}
	return false
}

// Init registers into CurVendorSync every handler of
// partner.PurchasePlatformHandlers whose vendor API object is non-nil,
// wrapping it according to its kind. Vendors without an API object (nil, or
// not listed in the local API map) are skipped. It panics when a handler
// implements neither the multi-store nor the single-store interface.
func Init() {
	apiMap := map[int]interface{}{
		model.VendorIDJD:   api.JdAPI,
		model.VendorIDELM:  api.ElmAPI,
		model.VendorIDEBAI: api.EbaiAPI,
		model.VendorIDMTWM: api.MtwmAPI,
		model.VendorIDWSC:  api.WeimobAPI,
	}
	CurVendorSync.PurchaseHandlers = make(map[int]partner.IPurchasePlatformHandler)
	for vendorID, rawHandler := range partner.PurchasePlatformHandlers {
		if isNilAPI(apiMap[vendorID]) {
			continue
		}
		// The multi-store interface is tested first, as before.
		switch handler := rawHandler.(type) {
		case partner.IMultipleStoresHandler:
			CurVendorSync.MultiStoreVendorIDs = append(CurVendorSync.MultiStoreVendorIDs, vendorID)
			CurVendorSync.PurchaseHandlers[vendorID] = &MultiStoreHandlerWrapper{
				IMultipleStoresHandler: handler,
			}
		case partner.ISingleStoreHandler:
			CurVendorSync.SingleStoreVendorIDs = append(CurVendorSync.SingleStoreVendorIDs, vendorID)
			CurVendorSync.PurchaseHandlers[vendorID] = &SingleStoreHandlerWrapper{
				ISingleStoreHandler: handler,
			}
		default:
			panic(fmt.Sprintf("platform:%d type is wrong!", vendorID))
		}
	}
}

// DeleteCategory forwards to the wrapped handler unless cat.JdID is an empty
// ID, in which case it does nothing and returns nil.
func (p *MultiStoreHandlerWrapper) DeleteCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) {
	if jxutils.IsEmptyID(cat.JdID) {
		return nil
	}
	return p.IMultipleStoresHandler.DeleteCategory(db, cat, userName)
}

// UpdateCategory forwards to the wrapped handler unless cat.JdID is an empty
// ID, in which case it logs a warning and returns nil.
func (p *MultiStoreHandlerWrapper) UpdateCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) {
	if jxutils.IsEmptyID(cat.JdID) {
		globals.SugarLogger.Warnf("UpdateCategory fakeid cat:%s should not get here", utils.Format4Output(cat, true))
		return nil
	}
	return p.IMultipleStoresHandler.UpdateCategory(db, cat, userName)
}

// DeleteSku forwards to the wrapped handler unless sku.JdID is an empty ID,
// in which case it does nothing and returns nil.
func (p *MultiStoreHandlerWrapper) DeleteSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) {
	globals.SugarLogger.Debugf("wrapper DeleteSku, sku:%s", utils.Format4Output(sku, false))
	if jxutils.IsEmptyID(sku.JdID) {
		return nil
	}
	return p.IMultipleStoresHandler.DeleteSku(db, sku, userName)
}

// UpdateSku forwards to the wrapped handler unless sku.JdID is an empty ID,
// in which case it logs a warning and returns nil.
func (p *MultiStoreHandlerWrapper) UpdateSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) {
	if jxutils.IsEmptyID(sku.JdID) {
		globals.SugarLogger.Warnf("UpdateSku fakeid sku:%s should not get here", utils.Format4Output(sku, true))
		return nil
	}
	return p.IMultipleStoresHandler.UpdateSku(db, sku, userName)
}

// GetStoreHandler returns the handler registered for vendorID, or nil when
// none is registered.
func (v *VendorSync) GetStoreHandler(vendorID int) partner.IPurchasePlatformHandler {
	return v.PurchaseHandlers[vendorID]
}

// GetMultiStoreHandler returns the handler registered for vendorID when it
// implements partner.IMultipleStoresHandler, otherwise nil.
func (v *VendorSync) GetMultiStoreHandler(vendorID int) partner.IMultipleStoresHandler {
	if handler, ok := v.PurchaseHandlers[vendorID].(partner.IMultipleStoresHandler); ok {
		return handler
	}
	return nil
}

// GetSingleStoreHandler returns the handler registered for vendorID when it
// implements partner.ISingleStoreHandler, otherwise nil.
func (v *VendorSync) GetSingleStoreHandler(vendorID int) partner.ISingleStoreHandler {
	if handler, ok := v.PurchaseHandlers[vendorID].(partner.ISingleStoreHandler); ok {
		return handler
	}
	return nil
}

// syncCategories pushes the given categories to the multi-store vendor
// vendorID in a parallel task attached to parentTask. The vendor sync-status
// field of each category selects delete / create / update; on success the
// status is reset to 0 and persisted.
func (v *VendorSync) syncCategories(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID int, db *dao.DaoDB, cats []*model.SkuCategory, userName string) (err error) {
	multiStoresHandler := v.GetMultiStoreHandler(vendorID)
	syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[vendorID])
	task := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[vendorID]), nil, ctx,
		func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
			// Worker-local error: workers run in parallel and must not share
			// the enclosing function's named result.
			var catErr error
			cat := batchItemList[0].(*model.SkuCategory)
			updateFields := []string{syncStatusFieldName}
			syncStatus := refutil.GetObjFieldByName(cat, syncStatusFieldName).(int8)
			switch {
			case (syncStatus & model.SyncFlagDeletedMask) != 0: // delete
				// Created and deleted before ever being synced: nothing to remove remotely.
				if (syncStatus & model.SyncFlagNewMask) == 0 {
					catErr = multiStoresHandler.DeleteCategory(db, cat, userName)
				}
			case (syncStatus & model.SyncFlagNewMask) != 0: // create
				catErr = multiStoresHandler.CreateCategory(db, cat, userName)
				updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()]))
			case (syncStatus & model.SyncFlagModifiedMask) != 0: // update
				catErr = multiStoresHandler.UpdateCategory(db, cat, userName)
				// NOTE(review): error code -3 is deliberately ignored; its meaning is not visible here.
				if intErr, ok := catErr.(*utils.ErrorWithCode); ok && intErr.IntCode() == -3 {
					catErr = nil
				}
			}
			if catErr == nil {
				refutil.SetObjFieldByName(cat, syncStatusFieldName, int8(0))
				_, catErr = dao.UpdateEntity(db, cat, updateFields...)
			}
			return nil, catErr
		}, cats)
	tasksch.HandleTask(task, parentTask, false).Run()
	_, err = task.GetResult(0)
	return err
}

// SyncCategory syncs category categoryID to every multi-store vendor. When
// categoryID is not positive it syncs all level-1 categories and then all
// level-2 categories.
// NOTE(review): the hint produced by LoopMultiStoresVendors is discarded and
// "" is always returned — confirm this is intended.
func (v *VendorSync) SyncCategory(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
	globals.SugarLogger.Debug(v.MultiStoreVendorIDs)
	hint, err = v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("同步分类信息:%d", categoryID), isAsync, userName,
		func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
			vendorID := batchItemList[0].(int)
			var cats []*model.SkuCategory
			cond := make(map[string]interface{})
			if categoryID > 0 {
				cond[model.FieldID] = categoryID
			} else {
				cond[model.FieldLevel] = 1
			}
			err := dao.GetEntitiesByKV(db, &cats, cond, true)
			if err == nil {
				err = v.syncCategories(ctx, t, vendorID, db, cats, userName)
			}
			if err != nil || categoryID > 0 {
				return nil, err
			}
			// Full sync: level-2 categories follow once level 1 succeeded.
			cond[model.FieldLevel] = 2
			err = dao.GetEntitiesByKV(db, &cats, cond, true)
			if err == nil {
				err = v.syncCategories(ctx, t, vendorID, db, cats, userName)
			}
			return nil, err
		})
	return "", err
}

// SyncReorderCategories asks every multi-store vendor to reorder the children
// of category categoryID and, on success, resets that vendor's sync status on
// the categories whose parent is categoryID.
// NOTE(review): the hint produced by LoopMultiStoresVendors is discarded and
// "" is always returned — confirm this is intended.
func (v *VendorSync) SyncReorderCategories(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
	hint, err = v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("分类重排序:%d", categoryID), isAsync, userName,
		func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
			multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int))
			err2 := multiStoresHandler.ReorderCategories(db, categoryID, userName)
			if err2 == nil {
				cat := &model.SkuCategory{}
				_, err2 = dao.UpdateEntityByKV(db, cat,
					utils.Params2Map(dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()]), 0),
					utils.Params2Map(model.FieldParentID, categoryID))
			}
			return nil, err2
		})
	return "", err
}

// SyncStore pushes the information of store storeID to the vendor platforms.
// A vendorID of -1 passes no vendor filter to LoopStoresMap. After each
// successful update the store mapping's SyncStatus is reset and persisted.
// Vendors without a registered handler are skipped.
func (v *VendorSync) SyncStore(ctx *jxcontext.Context, db *dao.DaoDB, vendorID, storeID int, isAsync bool, userName string) (hint string, err error) {
	globals.SugarLogger.Debugf("SyncStore, storeID:%d", storeID)
	var vendorIDs []int
	if vendorID != -1 {
		vendorIDs = []int{
			vendorID,
		}
	}
	hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("同步门店信息:%d", storeID), isAsync, false, vendorIDs, []int{storeID},
		func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (resultList interface{}, err error) {
			loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
			handler := v.GetStoreHandler(loopMapInfo.VendorID)
			if handler == nil {
				// No handler registered for this vendor: nothing to do (same
				// policy as the other Sync* functions).
				return nil, partner.AddVendorInfo2Err(nil, loopMapInfo.VendorID)
			}
			if len(loopMapInfo.StoreMapList) > 1 {
				loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), nil, ctx,
					func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
						var resultList []interface{}
						storeMap := batchItemList[0].(*model.StoreMap)
						if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil {
							storeMap.SyncStatus = 0
							_, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus)
							resultList = append(resultList, 1)
						}
						return resultList, err
					}, loopMapInfo.StoreMapList)
				t.AddChild(loopStoreTask).Run()
				resultList, err = loopStoreTask.GetResult(0)
			} else {
				storeMap := loopMapInfo.StoreMapList[0]
				if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil {
					storeMap.SyncStatus = 0
					_, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus)
				}
				if err == nil {
					resultList = []interface{}{1}
				}
			}
			return resultList, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
		}, true)
	return hint, makeSyncError(err)
}

// SyncSku is a convenience wrapper around SyncSkus for at most one sku name
// and one sku; a value of -1 means "no filter" for the respective ID.
func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuID int, isAsync, isContinueWhenError bool, userName string) (hint string, err error) {
	var (
		nameIDs []int
		skuIDs  []int
	)
	if nameID != -1 {
		nameIDs = []int{nameID}
	}
	if skuID != -1 {
		skuIDs = []int{skuID}
	}
	return v.SyncSkus(ctx, db, nameIDs, skuIDs, isAsync, isContinueWhenError, userName)
}

// SyncSkus syncs, for every multi-store vendor, the skus whose vendor sync
// status is non-zero, optionally restricted to nameIDs and/or skuIDs. Sku
// names are processed in parallel, the skus of one name sequentially. A sku's
// status is cleared after it has been synced; the sku name's status is
// cleared only when all of its processed skus succeeded, otherwise the first
// error of that name is returned.
func (v *VendorSync) SyncSkus(ctx *jxcontext.Context, db *dao.DaoDB, nameIDs []int, skuIDs []int, isAsync, isContinueWhenError bool, userName string) (hint string, err error) {
	globals.SugarLogger.Debugf("SyncSku trackInfo:%s, nameIDs:%v, skuIDs:%v, userName:%s", ctx.GetTrackInfo(), nameIDs, skuIDs, userName)
	return v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("同步商品信息, nameIDs:%v, skuIDs:%v", nameIDs, skuIDs), isAsync, userName,
		func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
			var resultList []interface{}
			vendorID := batchItemList[0].(int)
			multiStoresHandler := v.GetMultiStoreHandler(vendorID)
			syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()])
			dbField := dao.ConvertDBFieldPrefix(model.VendorNames[multiStoresHandler.GetVendorID()])
			skuMap := make(map[int]bool)
			sql := fmt.Sprintf(`
				SELECT DISTINCT t2.*
				FROM sku t1
				JOIN sku_name t2 ON t2.id = t1.name_id
				WHERE t1.%s_sync_status <> 0
			`, dbField)
			sqlParams := []interface{}{}
			if len(nameIDs) > 1 {
				sql += " AND t1.name_id IN (" + dao.GenQuestionMarks(len(nameIDs)) + ")"
				sqlParams = append(sqlParams, nameIDs)
			} else if len(nameIDs) == 1 {
				sql += " AND t1.name_id = ? "
				sqlParams = append(sqlParams, nameIDs[0])
			}
			if len(skuIDs) > 1 {
				sql += " AND t1.id IN(" + dao.GenQuestionMarks(len(skuIDs)) + ")"
				sqlParams = append(sqlParams, skuIDs)
			} else if len(skuIDs) == 1 {
				sql += " AND t1.id = ? "
				sqlParams = append(sqlParams, skuIDs[0])
			}
			for _, skuID := range skuIDs {
				skuMap[skuID] = true
			}
			sql += " ORDER BY t2.id"
			var skuNameList []*model.SkuName
			err := dao.GetRows(db, &skuNameList, sql, sqlParams...)
			if err == nil && len(skuNameList) > 0 {
				// TODO: the skus of one sku name are processed sequentially because
				// JD's special SPU type must be synced serially; the db may have
				// multi-threading issues.
				task := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[vendorID]),
					tasksch.NewParallelConfig().SetParallelCount(10).SetIsContinueWhenError(isContinueWhenError), ctx,
					func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
						var (
							resultList []interface{}
							skuList    []*model.Sku
						)
						skuName := batchItemList[0].(*model.SkuName)
						// nameErr is local to this worker (workers run in parallel) and
						// keeps the first failure seen for this sku name.
						nameErr := dao.GetRows(db, &skuList, fmt.Sprintf(`
							SELECT *
							FROM sku
							WHERE name_id = ? AND %s_sync_status <> 0
							ORDER BY IF(spec_unit IN('kg', 'L'), 1000, 1) * spec_quality
						`, dbField), skuName.ID)
						if nameErr == nil {
							for _, sku := range skuList {
								syncStatus := refutil.GetObjFieldByName(sku, syncStatusFieldName).(int8)
								globals.SugarLogger.Debugf("SyncSku trackInfo:%s, skuID:%d, syncStatus:%d", ctx.GetTrackInfo(), sku.ID, syncStatus)
								if syncStatus == 0 || (len(skuIDs) > 0 && !skuMap[sku.ID]) {
									continue
								}
								updateFields := []string{syncStatusFieldName}
								var skuErr error
								switch {
								case syncStatus&model.SyncFlagDeletedMask != 0: // delete
									// Never synced before: nothing to delete remotely.
									if syncStatus&model.SyncFlagNewMask == 0 {
										skuErr = multiStoresHandler.DeleteSku(db, sku, userName)
									}
								case syncStatus&model.SyncFlagNewMask != 0: // create
									if skuErr = multiStoresHandler.CreateSku(db, sku, userName); skuErr == nil {
										var tmpStruct struct {
											MaxIndex int
										}
										// TODO hard-coded: get the position of the sku inside the JD SPU
										// (to ease later changes of sale attributes); this must be
										// re-read from the database every time.
										if dao.GetRow(db, &tmpStruct, "SELECT MAX(sku_index) max_index FROM sku WHERE name_id = ? AND jd_id > 0 AND jd_id < 4024012631406 ", sku.NameID) == nil {
											sku.SkuIndex = tmpStruct.MaxIndex + 1
											updateFields = append(updateFields, "SkuIndex")
										}
										updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()]))
									}
								case syncStatus&model.SyncFlagModifiedMask != 0: // update
									skuErr = multiStoresHandler.UpdateSku(db, sku, userName)
								}
								if skuErr != nil {
									// Remember the failure but keep going with the remaining skus.
									if nameErr == nil {
										nameErr = skuErr
									}
									continue
								}
								refutil.SetObjFieldByName(sku, syncStatusFieldName, int8(0))
								if _, skuErr = dao.UpdateEntity(db, sku, updateFields...); skuErr != nil {
									if nameErr == nil {
										nameErr = skuErr
									}
									break
								}
								resultList = append(resultList, 1)
							}
						}
						if nameErr == nil {
							refutil.SetObjFieldByName(skuName, syncStatusFieldName, int8(0))
							_, nameErr = dao.UpdateEntity(db, skuName, syncStatusFieldName)
						}
						return resultList, nameErr
					}, skuNameList)
				t.AddChild(task).Run()
				result, err2 := task.GetResult(0)
				if err = err2; err == nil {
					resultList = append(resultList, result...)
				}
			}
			return resultList, err
		})
}

// SyncStoresCategory syncs store categories for the given vendors and stores
// through the single-store handlers; vendors without such a handler are
// skipped. With isForce the store categories are first flagged as modified.
func (v *VendorSync) SyncStoresCategory(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) {
	globals.SugarLogger.Debug("SyncStoresCategory")
	isManageIt := len(storeIDs) != 1
	hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("同步门店分类信息:%v", storeIDs), isAsync, isManageIt, vendorIDs, storeIDs,
		func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
			loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
			// Local to this vendor's worker; vendors are processed in parallel.
			var loopErr error
			if handler := v.GetSingleStoreHandler(loopMapInfo.VendorID); handler != nil {
				if isForce {
					dao.SetStoreCategorySyncStatus(db, loopMapInfo.VendorID, storeIDs, nil, model.SyncFlagModifiedMask)
				}
				if len(loopMapInfo.StoreMapList) > 1 {
					loopStoreTask := tasksch.NewSeqTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), ctx,
						func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
							storeID := loopMapInfo.StoreMapList[step].StoreID
							_, err = handler.SyncStoreCategory(ctx, task, storeID, false)
							return nil, err
						}, len(loopMapInfo.StoreMapList))
					t.AddChild(loopStoreTask).Run()
					_, loopErr = loopStoreTask.GetResult(0)
				} else {
					_, loopErr = handler.SyncStoreCategory(ctx, t, loopMapInfo.StoreMapList[0].StoreID, false)
				}
			}
			return nil, partner.AddVendorInfo2Err(loopErr, loopMapInfo.VendorID)
		}, isContinueWhenError)
	return hint, makeSyncError(err)
}

// SyncStoresSkus syncs store skus for the given vendors and stores. With
// isForce the store skus are first flagged as modified. Stores of a vendor
// flagged in model.MultiStoresVendorMap are processed sequentially, those of
// other vendors in parallel. When the task's original error is a
// change-price failure, the returned error lists the failing platforms.
func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, skuIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) {
	globals.SugarLogger.Debug("SyncStoresSkus")
	isManageIt := isAsync || len(storeIDs) != 1 || len(skuIDs) == 0 || len(skuIDs) > 8
	task, hint, err := v.LoopStoresMap2(ctx, db, fmt.Sprintf("同步门店商品信息:%v", storeIDs), isAsync, isManageIt, vendorIDs, storeIDs,
		func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
			loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
			// Local to this vendor's worker; vendors are processed in parallel.
			var loopErr error
			if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
				if isForce {
					dao.SetStoreSkuSyncStatus(db, loopMapInfo.VendorID, storeIDs, skuIDs, model.SyncFlagStoreSkuModifiedMask)
				}
				if len(loopMapInfo.StoreMapList) > 1 {
					var loopStoreTask tasksch.ITask
					if model.MultiStoresVendorMap[loopMapInfo.VendorID] == 1 {
						loopStoreTask = tasksch.NewSeqTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), ctx,
							func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
								storeID := loopMapInfo.StoreMapList[step].StoreID
								if _, err = handler.SyncStoreSkus(ctx, task, storeID, skuIDs, false, isContinueWhenError); err != nil {
									globals.SugarLogger.Debugf("SyncStoresSkus failed1 store:%d failed with error:%v", storeID, err)
									if isContinueWhenError {
										err = nil
									}
								}
								return nil, err
							}, len(loopMapInfo.StoreMapList))
					} else {
						loopStoreTask = tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
							tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), ctx,
							func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
								storeMap := batchItemList[0].(*model.StoreMap)
								if _, err = handler.SyncStoreSkus(ctx, task, storeMap.StoreID, skuIDs, false, isContinueWhenError); err != nil {
									globals.SugarLogger.Debugf("SyncStoresSkus failed2 store:%d failed with error:%v", storeMap.StoreID, err)
								}
								return nil, err
							}, loopMapInfo.StoreMapList)
					}
					t.AddChild(loopStoreTask).Run()
					_, loopErr = loopStoreTask.GetResult(0)
				} else {
					_, loopErr = handler.SyncStoreSkus(ctx, t, loopMapInfo.StoreMapList[0].StoreID, skuIDs, false, isContinueWhenError)
				}
			}
			return nil, partner.AddVendorInfo2Err(loopErr, loopMapInfo.VendorID)
		}, isContinueWhenError)
	if task != nil {
		if priceErr := partner.IsErrChangePriceFailed(task.GetOriginalErr()); priceErr != nil {
			detailErrs := task.GetDetailErrList()
			platformList := make([]string, len(detailErrs))
			for i, detailErr := range detailErrs {
				if vendorErr := partner.IsErrVendorError(detailErr); vendorErr != nil {
					platformList[i] = model.VendorChineseNames[vendorErr.VendorID()]
				} else {
					platformList[i] = "未知"
				}
			}
			err = fmt.Errorf("同步价格失败\n失败平台:%s", strings.Join(platformList, ","))
		} else {
			err = makeSyncError(err)
		}
	}
	return hint, err
}

// FullSyncStoresSkus runs FullSyncStoreSkus on every matching store. Stores
// of a vendor flagged in model.MultiStoresVendorMap are processed
// sequentially, those of other vendors in parallel.
func (v *VendorSync) FullSyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
	globals.SugarLogger.Debug("FullSyncStoresSkus")
	hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("初始化门店商品信息:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
		func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
			loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
			// Local to this vendor's worker; vendors are processed in parallel.
			var loopErr error
			if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
				if len(loopMapInfo.StoreMapList) > 1 {
					var loopStoreTask tasksch.ITask
					if model.MultiStoresVendorMap[loopMapInfo.VendorID] == 1 {
						loopStoreTask = tasksch.NewSeqTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), ctx,
							func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
								storeID := loopMapInfo.StoreMapList[step].StoreID
								if _, err = handler.FullSyncStoreSkus(ctx, task, storeID, false, isContinueWhenError); err != nil {
									globals.SugarLogger.Debugf("FullSyncStoresSkus failed1 store:%d failed with error:%v", storeID, err)
									if isContinueWhenError {
										err = nil
									}
								}
								return nil, err
							}, len(loopMapInfo.StoreMapList))
					} else {
						loopStoreTask = tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
							tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), ctx,
							func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
								storeMap := batchItemList[0].(*model.StoreMap)
								if _, err = handler.FullSyncStoreSkus(ctx, task, storeMap.StoreID, false, isContinueWhenError); err != nil {
									globals.SugarLogger.Debugf("FullSyncStoresSkus failed2 store:%d failed with error:%v", storeMap.StoreID, err)
								}
								return nil, err
							}, loopMapInfo.StoreMapList)
					}
					t.AddChild(loopStoreTask).Run()
					_, loopErr = loopStoreTask.GetResult(0)
				} else {
					_, loopErr = handler.FullSyncStoreSkus(ctx, t, loopMapInfo.StoreMapList[0].StoreID, false, isContinueWhenError)
				}
			}
			return nil, partner.AddVendorInfo2Err(loopErr, loopMapInfo.VendorID)
		}, isContinueWhenError)
	return hint, makeSyncError(err)
}

// DeleteRemoteStoreSkus runs DeleteRemoteStoreSkus on every matching store,
// processing the stores of one vendor sequentially.
func (v *VendorSync) DeleteRemoteStoreSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
	globals.SugarLogger.Debug("DeleteRemoteStoreSkus")
	hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("删除远程门店商品信息:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
		func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
			loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
			// Local to this vendor's worker; vendors are processed in parallel.
			var loopErr error
			if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
				if len(loopMapInfo.StoreMapList) > 1 {
					loopStoreTask := tasksch.NewSeqTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), ctx,
						func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
							storeID := loopMapInfo.StoreMapList[step].StoreID
							_, err = handler.DeleteRemoteStoreSkus(ctx, task, storeID, false, isContinueWhenError)
							return nil, err
						}, len(loopMapInfo.StoreMapList))
					t.AddChild(loopStoreTask).Run()
					_, loopErr = loopStoreTask.GetResult(0)
				} else {
					_, loopErr = handler.DeleteRemoteStoreSkus(ctx, t, loopMapInfo.StoreMapList[0].StoreID, false, isContinueWhenError)
				}
			}
			return nil, partner.AddVendorInfo2Err(loopErr, loopMapInfo.VendorID)
		}, isContinueWhenError)
	return hint, makeSyncError(err)
}

// PruneMissingStoreSkus removes store skus that exist on the platform but not
// locally. Only vendors with a single-store handler are processed.
// TODO: JD Daojia should be supported as well.
func (v *VendorSync) PruneMissingStoreSkus(ctx *jxcontext.Context, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
	globals.SugarLogger.Debug("PruneMissingStoreSkus")
	hint, err = v.LoopStoresMap(ctx, dao.GetDB(), fmt.Sprintf("删除远程无关联的门店商品信息:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
		func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
			loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
			// Local to this vendor's worker; vendors are processed in parallel.
			var loopErr error
			if handler := v.GetSingleStoreHandler(loopMapInfo.VendorID); handler != nil {
				if len(loopMapInfo.StoreMapList) > 1 {
					loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
						tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), ctx,
						func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
							storeMap := batchItemList[0].(*model.StoreMap)
							storeID := storeMap.StoreID
							_, err = handler.PruneMissingStoreSkus(ctx, task, storeID, false, isContinueWhenError)
							return nil, err
						}, loopMapInfo.StoreMapList)
					t.AddChild(loopStoreTask).Run()
					_, loopErr = loopStoreTask.GetResult(0)
				} else {
					_, loopErr = handler.PruneMissingStoreSkus(ctx, t, loopMapInfo.StoreMapList[0].StoreID, false, isContinueWhenError)
				}
			}
			return nil, partner.AddVendorInfo2Err(loopErr, loopMapInfo.VendorID)
		}, isContinueWhenError)
	return hint, makeSyncError(err)
}

// LoopStoresMap2 loads the store mappings matching vendorIDs/storeIDs, groups
// them by vendor and runs handler once per vendor in a parallel task; each
// work item is a *LoopStoreMapInfo. It returns (nil, "", nil) when no mapping
// matches. In synchronous mode it waits for the task and derives hint from
// its results; in asynchronous mode hint is the task ID.
// NOTE(review): isContinueWhenError is not used here — the vendor-level task
// always continues on error; confirm this is intended.
func (v *VendorSync) LoopStoresMap2(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync, isManageIt bool, vendorIDs []int, storeIDs []int, handler tasksch.WorkFunc, isContinueWhenError bool) (task tasksch.ITask, hint string, err error) {
	var storeMapList []*model.StoreMap
	if storeMapList, err = dao.GetStoresMapList(db, vendorIDs, storeIDs, model.StoreStatusAll, model.StoreIsSyncYes); err != nil {
		return nil, "", err
	}
	if len(storeMapList) == 0 {
		return nil, "", nil
	}
	vendorStoreMap := make(map[int][]*model.StoreMap)
	for _, storeMap := range storeMapList {
		vendorStoreMap[storeMap.VendorID] = append(vendorStoreMap[storeMap.VendorID], storeMap)
	}
	loopInfoList := make([]*LoopStoreMapInfo, 0, len(vendorStoreMap))
	for vendorID, vendorStores := range vendorStoreMap {
		loopInfoList = append(loopInfoList, &LoopStoreMapInfo{
			VendorID:     vendorID,
			StoreMapList: vendorStores,
		})
	}
	if len(loopInfoList) == 1 {
		taskName = fmt.Sprintf("%s,处理平台%s", taskName, model.VendorChineseNames[loopInfoList[0].VendorID])
	}
	task = tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, loopInfoList)
	tasksch.HandleTask(task, nil, isManageIt).Run()
	if !isAsync {
		resultList, err2 := task.GetResult(0)
		if err = err2; err == nil {
			if len(resultList) == 0 {
				hint = "1" // TODO: temporary
			} else {
				hint = jxutils.TaskResult2Hint(resultList)
			}
		}
	} else {
		hint = task.GetID()
	}
	return task, hint, err
}

// LoopStoresMap is LoopStoresMap2 without the task in the result.
func (v *VendorSync) LoopStoresMap(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync, isManageIt bool, vendorIDs []int, storeIDs []int, handler tasksch.WorkFunc, isContinueWhenError bool) (hint string, err error) {
	_, hint, err = v.LoopStoresMap2(ctx, db, taskName, isAsync, isManageIt, vendorIDs, storeIDs, handler, isContinueWhenError)
	return hint, err
}

// LoopMultiStoresVendors runs handler once per multi-store vendor ID in a
// parallel task that continues on error. In synchronous mode hint is the
// number of results; in asynchronous mode it is the task ID. A non-nil error
// is wrapped by makeSyncError.
func (v *VendorSync) LoopMultiStoresVendors(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, userName string, handler tasksch.WorkFunc) (hint string, err error) {
	task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, v.MultiStoreVendorIDs)
	tasksch.HandleTask(task, nil, true).Run()
	if !isAsync {
		result, err2 := task.GetResult(0)
		if err = err2; err == nil {
			hint = utils.Int2Str(len(result))
		}
	} else {
		hint = task.ID
	}
	return hint, makeSyncError(err)
}

// RefreshAllSkusID calls, for each vendor in vendorIDs, RefreshAllSkusID on a
// multi-store handler or RefreshStoresAllSkusID (restricted to storeIDs) on a
// single-store handler. It returns the task ID; in synchronous mode it also
// waits for the task and returns its error.
func (v *VendorSync) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int, storeIDs []int) (hint string, err error) {
	task := tasksch.NewParallelTask("RefreshAllSkusID", nil, ctx,
		func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
			vendorID := batchItemList[0].(int)
			if handler := v.GetStoreHandler(vendorID); handler != nil {
				if multiHandler, ok := handler.(partner.IMultipleStoresHandler); ok {
					_, err = multiHandler.RefreshAllSkusID(ctx, task, false)
				} else if singleHandler, ok := handler.(partner.ISingleStoreHandler); ok {
					_, err = singleHandler.RefreshStoresAllSkusID(ctx, task, false, storeIDs)
				}
			}
			return nil, err
		}, vendorIDs)
	tasksch.HandleTask(task, nil, true).Run()
	if !isAsync {
		_, err = task.GetResult(0)
	}
	return task.ID, err
}

// RefreshAllStoresID calls RefreshAllStoresID on the handler of each vendor
// in vendorIDs. It returns the task ID; in synchronous mode it also waits for
// the task and returns its error.
func (v *VendorSync) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int) (hint string, err error) {
	task := tasksch.NewParallelTask("RefreshAllStoresID", nil, ctx,
		func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
			vendorID := batchItemList[0].(int)
			if handler := v.GetStoreHandler(vendorID); handler != nil {
				_, err = handler.RefreshAllStoresID(ctx, task, false)
			}
			return nil, err
		}, vendorIDs)
	tasksch.HandleTask(task, nil, true).Run()
	if !isAsync {
		_, err = task.GetResult(0)
	}
	return task.ID, err
}

// makeSyncError wraps a non-nil err in a *SyncError unless it already is one;
// a nil err stays nil.
func makeSyncError(err error) (newErr error) {
	if err == nil {
		return nil
	}
	if _, ok := err.(*SyncError); ok {
		return err
	}
	return &SyncError{
		Original: err,
	}
}

// Error implements the error interface. The detail is the original error's
// message, or Message when no original error is set.
func (e *SyncError) Error() string {
	detail := e.Message
	if e.Original != nil {
		detail = e.Original.Error()
	}
	return fmt.Sprintf("本地数据修改成功,但同步失败,请根据错误提示处理!,同步错误信息:%s", detail)
}

// isSyncError reports whether err is a *SyncError.
func isSyncError(err error) bool {
	_, ok := err.(*SyncError)
	return ok
}

// SyncSkuNames syncs the skus of the given sku names via SyncSkus. With
// isForce, the matching non-deleted skus (all of them when nameIDs is empty)
// are first flagged as modified.
// NOTE(review): the forced flag is written to jd_sync_status only — confirm
// other multi-store vendors do not need it.
func (v *VendorSync) SyncSkuNames(ctx *jxcontext.Context, nameIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) {
	db := dao.GetDB()
	if isForce {
		sql := `
			UPDATE sku t1
			SET t1.jd_sync_status = t1.jd_sync_status | ?
			WHERE t1.deleted_at = ?
		`
		sqlParams := []interface{}{
			model.SyncFlagModifiedMask,
			utils.DefaultTimeValue,
		}
		if len(nameIDs) > 0 {
			sql += " AND t1.name_id IN(" + dao.GenQuestionMarks(len(nameIDs)) + ")"
			sqlParams = append(sqlParams, nameIDs)
		}
		if _, err = dao.ExecuteSQL(db, sql, sqlParams...); err != nil {
			return "", err
		}
	}
	return v.SyncSkus(ctx, db, nameIDs, nil, isAsync, isContinueWhenError, ctx.GetUserName())
}