package cms import ( "errors" "fmt" "reflect" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxutils" "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" "git.rosy.net.cn/jx-callback/business/jxutils/tasksch" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/dao" "git.rosy.net.cn/jx-callback/business/partner" "git.rosy.net.cn/jx-callback/globals" "git.rosy.net.cn/jx-callback/globals/api" ) type LoopStoreMapInfo struct { VendorID int StoreMapList []*model.StoreMap } type VendorSync struct { MultiStoreVendorIDs []int SingleStoreVendorIDs []int PurchaseHandlers map[int]partner.IPurchasePlatformHandler } type SyncError struct { Original error `json:"original"` Message string `json:"message"` } // 对于多门店平台接口的通用处理 type MultiStoreHandlerWrapper struct { partner.IMultipleStoresHandler } // 对于单门店平台接口的通用处理 type SingleStoreHandlerWrapper struct { partner.ISingleStoreHandler } var ( CurVendorSync VendorSync ) var ( ErrHaveNotImplementedYet = errors.New("还没有实现") ErrEntityNotExist = errors.New("找不到相应实体") ) func Init() { apiMap := map[int]interface{}{ model.VendorIDJD: api.JdAPI, model.VendorIDELM: api.ElmAPI, model.VendorIDEBAI: api.EbaiAPI, model.VendorIDMTWM: api.MtwmAPI, } CurVendorSync.PurchaseHandlers = make(map[int]partner.IPurchasePlatformHandler) for k, v := range partner.PurchasePlatformHandlers { if !reflect.ValueOf(apiMap[k]).IsNil() { if multiHandler, ok := v.(partner.IMultipleStoresHandler); ok { CurVendorSync.MultiStoreVendorIDs = append(CurVendorSync.MultiStoreVendorIDs, k) CurVendorSync.PurchaseHandlers[k] = &MultiStoreHandlerWrapper{ IMultipleStoresHandler: multiHandler, } } else if singleHandler, ok := v.(partner.ISingleStoreHandler); ok { CurVendorSync.SingleStoreVendorIDs = append(CurVendorSync.SingleStoreVendorIDs, k) CurVendorSync.PurchaseHandlers[k] = &SingleStoreHandlerWrapper{ ISingleStoreHandler: singleHandler, } } else { panic(fmt.Sprintf("platform:%d type is wrong!", k)) } } } } func (p *MultiStoreHandlerWrapper) DeleteCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) { if jxutils.IsFakeID(cat.JdID) { return nil } return p.IMultipleStoresHandler.DeleteCategory(db, cat, userName) } func (p *MultiStoreHandlerWrapper) UpdateCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) { if jxutils.IsFakeID(cat.JdID) { globals.SugarLogger.Warnf("UpdateCategory fakeid cat:%s should not get here", utils.Format4Output(cat, true)) return nil } return p.IMultipleStoresHandler.UpdateCategory(db, cat, userName) } func (p *MultiStoreHandlerWrapper) DeleteSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) { globals.SugarLogger.Debugf("wrapper DeleteSku, sku:%s", utils.Format4Output(sku, false)) if jxutils.IsFakeID(sku.JdID) { return nil } return p.IMultipleStoresHandler.DeleteSku(db, sku, userName) } func (p *MultiStoreHandlerWrapper) UpdateSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) { if jxutils.IsFakeID(sku.JdID) { globals.SugarLogger.Warnf("UpdateSku fakeid sku:%s should not get here", utils.Format4Output(sku, true)) return nil } return p.IMultipleStoresHandler.UpdateSku(db, sku, userName) } func (v *VendorSync) GetStoreHandler(vendorID int) partner.IPurchasePlatformHandler { return v.PurchaseHandlers[vendorID] } func (v *VendorSync) GetMultiStoreHandler(vendorID int) partner.IMultipleStoresHandler { if handler, ok := v.PurchaseHandlers[vendorID].(partner.IMultipleStoresHandler); ok { return handler } return nil } func (v *VendorSync) GetSingleStoreHandler(vendorID int) partner.ISingleStoreHandler { if handler, ok := v.PurchaseHandlers[vendorID].(partner.ISingleStoreHandler); ok { return handler } return nil } func (v *VendorSync) syncCategories(ctx *jxcontext.Context, parentTask tasksch.ITask, multiStoresHandler partner.IMultipleStoresHandler, db *dao.DaoDB, cats []*model.SkuCategory, userName string) (err error) { syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()]) task := tasksch.NewParallelTask("syncCategories", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { cat := batchItemList[0].(*model.SkuCategory) updateFields := []string{syncStatusFieldName} syncStatus := jxutils.GetObjFieldByName(cat, syncStatusFieldName).(int8) if (syncStatus & model.SyncFlagDeletedMask) != 0 { //删除 err = multiStoresHandler.DeleteCategory(db, cat, userName) } else if (syncStatus & model.SyncFlagNewMask) != 0 { // 新增 err = multiStoresHandler.CreateCategory(db, cat, userName) updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()])) } else if (syncStatus & model.SyncFlagModifiedMask) != 0 { // 修改 err = multiStoresHandler.UpdateCategory(db, cat, userName) } if err == nil { jxutils.SetObjFieldByName(cat, syncStatusFieldName, int8(0)) _, err = dao.UpdateEntity(db, cat, updateFields...) } return nil, err }, cats) ctx.SetTaskOrAddChild(task, parentTask) task.Run() _, err = task.GetResult(0) return err } func (v *VendorSync) SyncCategory(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) { globals.SugarLogger.Debug(v.MultiStoreVendorIDs) hint, err = v.LoopMultiStoresVendors(ctx, db, "SyncCategory", isAsync, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int)) var cats []*model.SkuCategory cond := make(map[string]interface{}) if categoryID > 0 { cond[model.FieldID] = categoryID } else { cond[model.FieldLevel] = 1 } err := dao.GetEntitiesByKV(db, &cats, cond, true) if err == nil { err = v.syncCategories(ctx, t, multiStoresHandler, db, cats, userName) } if err != nil || categoryID > 0 { return nil, err } cond[model.FieldLevel] = 2 err = dao.GetEntitiesByKV(db, &cats, cond, true) if err == nil { err = v.syncCategories(ctx, t, multiStoresHandler, db, cats, userName) } return nil, err }) return "", err } func (v *VendorSync) SyncReorderCategories(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) { hint, err = v.LoopMultiStoresVendors(ctx, db, "SyncReorderCategories", isAsync, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int)) err2 := multiStoresHandler.ReorderCategories(db, categoryID, userName) if err2 == nil { cat := &model.SkuCategory{} _, err2 = dao.UpdateEntityByKV(db, cat, utils.Params2Map(dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()]), 0), utils.Params2Map(model.FieldParentID, categoryID)) return nil, err2 } return nil, err2 }) return "", err } func (v *VendorSync) SyncStore(ctx *jxcontext.Context, db *dao.DaoDB, vendorID, storeID int, isAsync bool, userName string) (hint string, err error) { globals.SugarLogger.Debugf("SyncStore, storeID:%d", storeID) var vendorIDs []int if vendorID != -1 { vendorIDs = []int{ vendorID, } } hint, err = v.LoopStoresMap(ctx, db, "SyncStore", isAsync, vendorIDs, []int{storeID}, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { loopMapInfo := batchItemList[0].(*LoopStoreMapInfo) handler := v.GetStoreHandler(loopMapInfo.VendorID) if len(loopMapInfo.StoreMapList) > 1 { loopStoreTask := tasksch.NewParallelTask("SyncStore loop stores", nil, ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { storeMap := batchItemList[0].(*model.StoreMap) if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil { storeMap.SyncStatus = 0 _, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus) } return nil, err }, loopMapInfo.StoreMapList) t.AddChild(loopStoreTask).Run() _, err = loopStoreTask.GetResult(0) return nil, err } storeMap := loopMapInfo.StoreMapList[0] if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil { storeMap.SyncStatus = 0 _, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus) } return nil, err }) return hint, err } func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuID int, isAsync bool, userName string) (hint string, err error) { globals.SugarLogger.Debugf("SyncSku, nameID:%d, skuID:%d, userName:%s", nameID, skuID, userName) return v.LoopMultiStoresVendors(ctx, db, "SyncSku", isAsync, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int)) syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()]) var skuList []*model.Sku cond := make(map[string]interface{}) if nameID != -1 { cond[model.FieldNameID] = nameID } if skuID != -1 { cond[model.FieldID] = skuID } err := dao.GetEntitiesByKV(db, &skuList, cond, true) if err == nil && len(skuList) > 0 { // globals.SugarLogger.Debug(utils.Format4Output(skuList, false)) // todo 这里SetParallelCount(1)的原因是京东SPU特殊类型必须要序列化同步才能正常处理, db可能会有多线程问题 task := tasksch.NewParallelTask("SyncSku loop sku", tasksch.NewParallelConfig().SetParallelCount(1), userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { sku := batchItemList[0].(*model.Sku) syncStatus := jxutils.GetObjFieldByName(sku, syncStatusFieldName).(int8) if (skuID == -1 || skuID == sku.ID) && (syncStatus != 0) { updateFields := []string{syncStatusFieldName} if syncStatus&model.SyncFlagDeletedMask != 0 { // 删除 err = multiStoresHandler.DeleteSku(db, sku, userName) } else if syncStatus&model.SyncFlagNewMask != 0 { // 新增 err = multiStoresHandler.CreateSku(db, sku, userName) updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()])) } else if syncStatus&model.SyncFlagModifiedMask != 0 { // 修改 err = multiStoresHandler.UpdateSku(db, sku, userName) } if err == nil { jxutils.SetObjFieldByName(sku, syncStatusFieldName, int8(0)) dao.UpdateEntity(db, sku, updateFields...) } } return nil, err }, skuList) t.AddChild(task).Run() _, err = task.GetResult(0) } return nil, err }) } func (v *VendorSync) SyncStoresCategory(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync bool) (hint string, err error) { globals.SugarLogger.Debug("SyncStoresCategory") return v.LoopStoresMap(ctx, db, "SyncStoresCategory", isAsync, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { loopMapInfo := batchItemList[0].(*LoopStoreMapInfo) if handler := v.GetSingleStoreHandler(loopMapInfo.VendorID); handler != nil { if len(loopMapInfo.StoreMapList) > 1 { loopStoreTask := tasksch.NewSeqTask("SyncStoresCategory loop stores", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { storeID := loopMapInfo.StoreMapList[step].StoreID _, err = handler.SyncStoreCategory(ctx, task, storeID, false) return nil, err }, len(loopMapInfo.StoreMapList)) t.AddChild(loopStoreTask).Run() _, err = loopStoreTask.GetResult(0) return nil, err } _, err = handler.SyncStoreCategory(ctx, t, loopMapInfo.StoreMapList[0].StoreID, false) } return nil, err }) } // func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, skuIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) { globals.SugarLogger.Debug("SyncStoresSkus") return v.LoopStoresMap(ctx, db, "SyncStoresSkus顶层", isAsync, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { loopMapInfo := batchItemList[0].(*LoopStoreMapInfo) if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil { if len(loopMapInfo.StoreMapList) > 1 { loopStoreTask := tasksch.NewSeqTask("SyncStoresSkus相同平台循环门店", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { storeID := loopMapInfo.StoreMapList[step].StoreID _, err = handler.SyncStoreSkus(ctx, task, storeID, skuIDs, false, isContinueWhenError) return nil, err }, len(loopMapInfo.StoreMapList)) t.AddChild(loopStoreTask).Run() _, err = loopStoreTask.GetResult(0) return nil, err } _, err = handler.SyncStoreSkus(ctx, t, loopMapInfo.StoreMapList[0].StoreID, skuIDs, false, isContinueWhenError) } return nil, err }) } func (v *VendorSync) FullSyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) { globals.SugarLogger.Debug("FullSyncStoresSkus") return v.LoopStoresMap(ctx, db, "FullSyncStoresSkus顶层", isAsync, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { loopMapInfo := batchItemList[0].(*LoopStoreMapInfo) if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil { if len(loopMapInfo.StoreMapList) > 1 { loopStoreTask := tasksch.NewSeqTask("FullSyncStoresSkus相同平台循环门店", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { storeID := loopMapInfo.StoreMapList[step].StoreID _, err = handler.FullSyncStoreSkus(ctx, task, storeID, false, isContinueWhenError) return nil, err }, len(loopMapInfo.StoreMapList)) t.AddChild(loopStoreTask).Run() _, err = loopStoreTask.GetResult(0) return nil, err } _, err = handler.FullSyncStoreSkus(ctx, t, loopMapInfo.StoreMapList[0].StoreID, false, isContinueWhenError) } return nil, err }) } func (v *VendorSync) LoopStoresMap(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, vendorIDs []int, storeIDs []int, handler tasksch.WorkFunc) (hint string, err error) { sql := ` SELECT t1.* FROM store_map t1 WHERE t1.is_sync = 1 AND t1.deleted_at = ? ` sqlParams := []interface{}{ utils.DefaultTimeValue, } if len(vendorIDs) > 0 { sql += " AND t1.vendor_id IN (" + dao.GenQuestionMarks(len(vendorIDs)) + ")" sqlParams = append(sqlParams, vendorIDs) } if len(storeIDs) > 0 { sql += " AND t1.store_id IN (" + dao.GenQuestionMarks(len(storeIDs)) + ")" sqlParams = append(sqlParams, storeIDs) } sql += " ORDER BY t1.store_id, t1.vendor_id" var storeMapList []*model.StoreMap if err = dao.GetRows(db, &storeMapList, sql, sqlParams...); err != nil { return "", err } if len(storeMapList) == 0 { return "", nil } vendorStoreMap := make(map[int][]*model.StoreMap) for _, v := range storeMapList { vendorStoreMap[v.VendorID] = append(vendorStoreMap[v.VendorID], v) } loopInfoList := make([]*LoopStoreMapInfo, len(vendorStoreMap)) index := 0 for k, v := range vendorStoreMap { loopInfoList[index] = &LoopStoreMapInfo{ VendorID: k, StoreMapList: v, } index++ } task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx.GetUserName(), handler, loopInfoList) ctx.SetTaskOrAddChild(task, nil) tasksch.ManageTask(task).Run() if !isAsync { _, err = task.GetResult(0) } return task.ID, makeSyncError(err) } func (v *VendorSync) LoopMultiStoresVendors(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, userName string, handler tasksch.WorkFunc) (hint string, err error) { task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), userName, handler, v.MultiStoreVendorIDs) ctx.SetTaskOrAddChild(task, nil) tasksch.ManageTask(task).Run() if !isAsync { _, err = task.GetResult(0) } return task.ID, makeSyncError(err) } // func (v *VendorSync) LoopStoreVendors(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, taskName string, isAsync bool, userName string, handler tasksch.WorkFunc) (hint string, err error) { // if taskName == "" { // taskName = "LoopStoreVendors" // } // var vendorIDMap map[int]int // if len(vendorIDs) != 0 { // vendorIDMap = make(map[int]int) // for _, v := range vendorIDs { // vendorIDMap[v] = 1 // } // } // allHandlers := make([]int, len(v.MultiStoreVendorIDs)+len(v.SingleStoreVendorIDs)) // copy(allHandlers, v.MultiStoreVendorIDs) // copy(allHandlers[len(v.MultiStoreVendorIDs):], v.SingleStoreVendorIDs) // if vendorIDMap != nil { // count := 0 // for _, v := range allHandlers { // if vendorIDMap[v] == 1 { // allHandlers[count] = v // count++ // } // } // allHandlers = allHandlers[:count] // } // task := tasksch.NewParallelTask(taskName, nil, userName, handler, allHandlers) // ctx.SetTaskOrAddChild(task, nil) // tasksch.ManageTask(task).Run() // if !isAsync { // _, err = task.GetResult(0) // } // return task.ID, err // } func (v *VendorSync) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int, storeIDs []int) (hint string, err error) { task := tasksch.NewParallelTask("RefreshAllSkusID", nil, ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { vendorID := batchItemList[0].(int) if handler := v.GetStoreHandler(vendorID); handler != nil { if multiHandler, ok := handler.(partner.IMultipleStoresHandler); ok { _, err = multiHandler.RefreshAllSkusID(ctx, task, false) } else if singleHandler, ok := handler.(partner.ISingleStoreHandler); ok { _, err = singleHandler.RefreshStoresAllSkusID(ctx, task, false, storeIDs) } } return nil, err }, vendorIDs) ctx.SetTaskOrAddChild(task, nil) tasksch.ManageTask(task).Run() if !isAsync { _, err = task.GetResult(0) } return task.ID, err } func (v *VendorSync) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int) (hint string, err error) { task := tasksch.NewParallelTask("RefreshAllStoresID", nil, ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { vendorID := batchItemList[0].(int) if handler := v.GetStoreHandler(vendorID); handler != nil { _, err = handler.RefreshAllStoresID(ctx, task, false) } return nil, err }, vendorIDs) ctx.SetTaskOrAddChild(task, nil) tasksch.ManageTask(task).Run() if !isAsync { _, err = task.GetResult(0) } return task.ID, err } func makeSyncError(err error) (newErr error) { if err != nil { return &SyncError{ Original: err, } } return err } func (e *SyncError) Error() string { return fmt.Sprintf("本地数据修改成功,但同步失败,请根据错误提示处理!,同步错误信息:%s", e.Original.Error()) } func isSyncError(err error) bool { _, ok := err.(*SyncError) return ok }