package cms import ( "errors" "fmt" "strconv" "strings" "sync" "time" "git.rosy.net.cn/baseapi/platformapi/mtpsapi" "git.rosy.net.cn/jx-callback/business/partner/putils" "git.rosy.net.cn/jx-callback/globals/api" "git.rosy.net.cn/baseapi" "git.rosy.net.cn/baseapi/platformapi/dingdingapi" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxutils" "git.rosy.net.cn/jx-callback/business/jxutils/ddmsg" "git.rosy.net.cn/jx-callback/business/jxutils/excel" "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" "git.rosy.net.cn/jx-callback/business/jxutils/tasksch" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/dao" "git.rosy.net.cn/jx-callback/business/partner" "git.rosy.net.cn/jx-callback/globals" ) type SyncErrResult struct { SkuID int `json:"商品ID"` CategoryName string `json:"分类名"` StoreID int `json:"门店ID"` VendorName string `json:"平台名"` VendorSkuID string `json:"平台商品ID"` NameID int `json:"商品nameID"` VendorPrice int64 `json:"平台价"` SyncType string `json:"同步类型"` ErrMsg string `json:"错误信息"` } type SyncErrResultLock struct { syncErrResult []SyncErrResult locker sync.RWMutex } type LoopStoreMapInfo struct { VendorID int StoreMapList []*model.StoreMap } type VendorSync struct { } type SyncError struct { Original error `json:"original"` Message string `json:"message"` } type SpecSyncError struct { SpecErr error `json:"specErr"` } // 对于多门店平台接口的通用处理 type MultiStoreHandlerWrapper struct { partner.IMultipleStoresHandler } // 对于单门店平台接口的通用处理 type SingleStoreHandlerWrapper struct { partner.ISingleStoreHandler } var ( CurVendorSync VendorSync ) var ( ErrHaveNotImplementedYet = errors.New("还没有实现") ErrEntityNotExist = errors.New("找不到相应实体") SyncErrResultTitle = []string{ "商品ID", "分类名", "门店ID", "平台名", "平台商品ID", "商品nameID", "平台价", "同步类型", "错误信息", } syncErrResultLock SyncErrResultLock ) // func (p *MultiStoreHandlerWrapper) DeleteCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) { // if jxutils.IsEmptyID(cat.JdID) { // return nil // } // return p.IMultipleStoresHandler.DeleteCategory(db, cat, userName) // } // func (p *MultiStoreHandlerWrapper) UpdateCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) { // if jxutils.IsEmptyID(cat.JdID) { // globals.SugarLogger.Warnf("UpdateCategory fakeid cat:%s should not get here", utils.Format4Output(cat, true)) // return nil // } // return p.IMultipleStoresHandler.UpdateCategory(db, cat, userName) // } // func (p *MultiStoreHandlerWrapper) DeleteSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) { // globals.SugarLogger.Debugf("wrapper DeleteSku, sku:%s", utils.Format4Output(sku, false)) // if jxutils.IsEmptyID(sku.JdID) { // return nil // } // return p.IMultipleStoresHandler.DeleteSku(db, sku, userName) // } // func (p *MultiStoreHandlerWrapper) UpdateSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) { // if jxutils.IsEmptyID(sku.JdID) { // globals.SugarLogger.Warnf("UpdateSku fakeid sku:%s should not get here", utils.Format4Output(sku, true)) // return nil // } // return p.IMultipleStoresHandler.UpdateSku(db, sku, userName) // } func (v *VendorSync) GetStoreHandler(vendorID int) partner.IPurchasePlatformHandler { return partner.GetPurchasePlatformFromVendorID(vendorID) } func (v *VendorSync) GetMultiStoreHandler(vendorID int) partner.IMultipleStoresHandler { if handler, ok := v.GetStoreHandler(vendorID).(partner.IMultipleStoresHandler); ok { return handler } return nil } func (v *VendorSync) GetSingleStoreHandler(vendorID int) partner.ISingleStoreHandler { if handler, ok := v.GetStoreHandler(vendorID).(partner.ISingleStoreHandler); ok { return handler } return nil } // func (v *VendorSync) syncCategories(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID int, db *dao.DaoDB, cats []*model.SkuCategory, userName string) (err error) { // multiStoresHandler := v.GetMultiStoreHandler(vendorID) // syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[vendorID]) // task := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[vendorID]), nil, ctx, // func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { // cat := batchItemList[0].(*model.SkuCategory) // updateFields := []string{syncStatusFieldName} // syncStatus := refutil.GetObjFieldByName(cat, syncStatusFieldName).(int8) // if (syncStatus & model.SyncFlagDeletedMask) != 0 { //删除 // if (syncStatus & model.SyncFlagNewMask) == 0 { // err = multiStoresHandler.DeleteCategory(db, cat, userName) // } // } else if (syncStatus & model.SyncFlagNewMask) != 0 { // 新增 // err = multiStoresHandler.CreateCategory(db, cat, userName) // updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()])) // } else if (syncStatus & model.SyncFlagModifiedMask) != 0 { // 修改 // err = multiStoresHandler.UpdateCategory(db, cat, userName) // if intErr, ok := err.(*utils.ErrorWithCode); ok && intErr.IntCode() == -3 { // err = nil // } // } // if err == nil { // refutil.SetObjFieldByName(cat, syncStatusFieldName, int8(0)) // _, err = dao.UpdateEntity(db, cat, updateFields...) // } // return nil, err // }, cats) // tasksch.HandleTask(task, parentTask, false).Run() // _, err = task.GetResult(0) // return err // } func (v *VendorSync) SyncCategory(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) { return SyncCategories(ctx, nil, nil, nil, []int{categoryID}, isAsync) } // func (v *VendorSync) oldSyncCategory(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) { // globals.SugarLogger.Debug("SyncCategory") // hint, err = v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("同步分类信息:%d", categoryID), isAsync, false, // func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { // vendorInfo := batchItemList[0].(*MultiStoreVendorInfo) // var cats []*model.SkuCategory // cond := make(map[string]interface{}) // if categoryID > 0 { // cond[model.FieldID] = categoryID // } else { // cond[model.FieldLevel] = 1 // } // err := dao.GetEntitiesByKV(db, &cats, cond, true) // if err == nil { // err = v.syncCategories(ctx, t, vendorInfo.VendorID, db, cats, userName) // } // if err != nil || categoryID > 0 { // return nil, err // } // cond[model.FieldLevel] = 2 // err = dao.GetEntitiesByKV(db, &cats, cond, true) // if err == nil { // err = v.syncCategories(ctx, t, vendorInfo.VendorID, db, cats, userName) // } // return nil, err // }) // return "", err // } func (v *VendorSync) SyncReorderCategories(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) { return SyncReorderCategories(ctx, categoryID, isAsync) } // func (v *VendorSync) oldSyncReorderCategories(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) { // hint, err = v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("分类重排序:%d", categoryID), isAsync, false, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { // vendorInfo := batchItemList[0].(*MultiStoreVendorInfo) // multiStoresHandler := v.GetMultiStoreHandler(vendorInfo.VendorID) // err2 := multiStoresHandler.ReorderCategories(db, categoryID, userName) // if err2 == nil { // cat := &model.SkuCategory{} // _, err2 = dao.UpdateEntityByKV(db, cat, utils.Params2Map(dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()]), 0), utils.Params2Map(model.FieldParentID, categoryID)) // return nil, err2 // } // return nil, err2 // }) // return "", err // } func (v *VendorSync) SyncStore2(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs, storeIDs []int, mustDirty, isAsync bool) (hint string, err error) { globals.SugarLogger.Debugf("SyncStore2, storeIDs:%d", storeIDs) userName := ctx.GetUserName() isManageIt := len(storeIDs) == 0 || len(storeIDs) > 5 _, hint, err = v.LoopStoresMap2(ctx, nil, db, fmt.Sprintf("同步门店信息:%v", storeIDs), isAsync, isManageIt, vendorIDs, storeIDs, mustDirty, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (resultList interface{}, err error) { loopMapInfo := batchItemList[0].(*LoopStoreMapInfo) var failedList []*partner.StoreSkuInfoWithErr handler := v.GetStoreHandler(loopMapInfo.VendorID) if handler != nil { if len(loopMapInfo.StoreMapList) > 1 { loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { var resultList []interface{} var vendorStoreID string storeMap := batchItemList[0].(*model.StoreMap) db2 := db if len(loopMapInfo.StoreMapList) > 1 { db2 = dao.GetDB() } if model.IsSyncStatusNew(storeMap.SyncStatus) { if vendorStoreID, err = handler.CreateStore2(db2, storeMap.StoreID, userName); err == nil { resultList = append(resultList, 1) } else { failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "创建门店") } } else if model.IsSyncStatusDelete(storeMap.SyncStatus) { if err = handler.DeleteStore(db2, storeMap.StoreID, userName); err == nil { resultList = append(resultList, 1) } else { failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "删除门店") } } else { if err = handler.UpdateStore(db2, storeMap.StoreID, userName); err == nil { resultList = append(resultList, 1) } else { failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "更新门店") } } if err == nil { if model.IsSyncStatusNew(storeMap.SyncStatus) { storeMap.VendorStoreID = vendorStoreID storeMap.SyncStatus = 0 _, err = dao.UpdateEntity(db, storeMap, "VendorStoreID", model.FieldSyncStatus) } else { storeMap.SyncStatus = 0 _, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus) } } return resultList, err }, loopMapInfo.StoreMapList) t.AddChild(loopStoreTask).Run() resultList, err = loopStoreTask.GetResult(0) } else { var resultList []interface{} db2 := db var vendorStoreID string storeMap := loopMapInfo.StoreMapList[0] if model.IsSyncStatusNew(storeMap.SyncStatus) { if vendorStoreID, err = handler.CreateStore2(db2, storeMap.StoreID, userName); err == nil { resultList = append(resultList, 1) } else { failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "创建门店") } } else if model.IsSyncStatusDelete(storeMap.SyncStatus) { if err = handler.DeleteStore(db2, storeMap.StoreID, userName); err == nil { resultList = append(resultList, 1) } else { failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "删除门店") } } else { if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil { resultList = append(resultList, 1) } else { failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "更新门店") } } if err == nil { resultList = []interface{}{1} if model.IsSyncStatusNew(storeMap.SyncStatus) { storeMap.VendorStoreID = vendorStoreID storeMap.SyncStatus = 0 _, err = dao.UpdateEntity(db, storeMap, "VendorStoreID", model.FieldSyncStatus) } else { storeMap.SyncStatus = 0 _, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus) } } } err = partner.AddVendorInfo2Err(err, loopMapInfo.VendorID) } if len(failedList) > 0 { t.AddFailedList(failedList) } return resultList, err }, true) return hint, makeSyncError(err) } func (v *VendorSync) SyncStore(ctx *jxcontext.Context, db *dao.DaoDB, vendorID, storeID int, isAsync bool, userName string) (hint string, err error) { var vendorIDs []int if vendorID != -1 { vendorIDs = []int{ vendorID, } } return v.SyncStore2(ctx, db, vendorIDs, []int{storeID}, false, isAsync) } func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuID int, isAsync, isContinueWhenError bool, userName string) (hint string, err error) { var ( nameIDs []int skuIDs []int ) if nameID != -1 { nameIDs = []int{nameID} } if skuID != -1 { skuIDs = []int{skuID} } return v.SyncSkus(ctx, db, nameIDs, skuIDs, isAsync, isContinueWhenError, userName) } func (v *VendorSync) SyncSkus(ctx *jxcontext.Context, db *dao.DaoDB, nameIDs []int, skuIDs []int, isAsync, isContinueWhenError bool, userName string) (hint string, err error) { return SyncSkus(ctx, nil, nil, nil, nameIDs, skuIDs, isAsync) } // func (v *VendorSync) oldSyncSkus(ctx *jxcontext.Context, db *dao.DaoDB, nameIDs []int, skuIDs []int, isAsync, isContinueWhenError bool, userName string) (hint string, err error) { // globals.SugarLogger.Debugf("SyncSku trackInfo:%s, nameIDs:%v, skuIDs:%v, userName:%s", ctx.GetTrackInfo(), nameIDs, skuIDs, userName) // isManagedIt := !(len(nameIDs) > 0 && len(nameIDs) <= 2 || len(skuIDs) > 0 && len(skuIDs) < 8) // return v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("同步商品信息, nameIDs:%v, skuIDs:%v", nameIDs, skuIDs), isAsync, isManagedIt, // func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { // var resultList []interface{} // vendorInfo := batchItemList[0].(*MultiStoreVendorInfo) // multiStoresHandler := v.GetMultiStoreHandler(vendorInfo.VendorID) // syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()]) // dbField := dao.ConvertDBFieldPrefix(model.VendorNames[multiStoresHandler.GetVendorID()]) // skuMap := make(map[int]bool) // sql := fmt.Sprintf(` // SELECT DISTINCT t2.* // FROM sku t1 // JOIN sku_name t2 ON t2.id = t1.name_id // WHERE t1.%s_sync_status <> 0 // `, dbField) // sqlParams := []interface{}{} // if len(nameIDs) > 0 { // sql += " AND t1.name_id IN (" + dao.GenQuestionMarks(len(nameIDs)) + ")" // sqlParams = append(sqlParams, nameIDs) // } // if len(skuIDs) > 0 { // sql += " AND t1.id IN(" + dao.GenQuestionMarks(len(skuIDs)) + ")" // sqlParams = append(sqlParams, skuIDs) // } // for _, v := range skuIDs { // skuMap[v] = true // } // sql += " ORDER BY t2.id" // var skuNameList []*model.SkuName // err := dao.GetRows(db, &skuNameList, sql, sqlParams...) // if err == nil && len(skuNameList) > 0 { // // todo 同一skuName下的sku顺序处理的原因是京东SPU特殊类型必须要序列化同步才能正常处理, db可能会有多线程问题 // task := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[vendorInfo.VendorID]), tasksch.NewParallelConfig().SetParallelCount(10).SetIsContinueWhenError(isContinueWhenError), ctx, // func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { // var resultList []interface{} // skuName := batchItemList[0].(*model.SkuName) // var skuList []*model.Sku // if err = dao.GetRows(db, &skuList, fmt.Sprintf(` // SELECT * // FROM sku // WHERE name_id = ? AND %s_sync_status <> 0 // ORDER BY IF(spec_unit IN('kg', 'L'), 1000, 1) * spec_quality // `, dbField), skuName.ID); err == nil && len(skuList) > 0 { // for _, sku := range skuList { // syncStatus := refutil.GetObjFieldByName(sku, syncStatusFieldName).(int8) // globals.SugarLogger.Debugf("SyncSku trackInfo2:%s, skuID:%d, syncStatus:%d", ctx.GetTrackInfo(), sku.ID, syncStatus) // if (len(skuIDs) == 0 || skuMap[sku.ID]) && (syncStatus != 0) { // updateFields := []string{syncStatusFieldName} // if syncStatus&model.SyncFlagDeletedMask != 0 { // 删除 // if syncStatus&model.SyncFlagNewMask == 0 { // err = multiStoresHandler.DeleteSku(db, sku, userName) // } // } else if syncStatus&model.SyncFlagNewMask != 0 { // 新增 // if err = multiStoresHandler.CreateSku(db, sku, userName); err == nil { // var tmpStruct struct { // MaxIndex int // } // // todo hard code 得到京东spu中sku的顺序(以方便以后修改销售属性),这个必须要每次重新从数据库取 // if dao.GetRow(db, &tmpStruct, "SELECT MAX(sku_index) max_index FROM sku WHERE name_id = ? AND jd_id > 0 AND jd_id < 4024012631406 ", sku.NameID) == nil { // sku.SkuIndex = tmpStruct.MaxIndex + 1 // updateFields = append(updateFields, "SkuIndex") // } // updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()])) // } // } else if syncStatus&model.SyncFlagModifiedMask != 0 { // 修改 // err = multiStoresHandler.UpdateSku(db, sku, userName) // } // if err == nil { // refutil.SetObjFieldByName(sku, syncStatusFieldName, int8(0)) // if _, err = dao.UpdateEntity(db, sku, updateFields...); err != nil { // break // } else { // resultList = append(resultList, 1) // } // } // } // } // } // if err == nil { // refutil.SetObjFieldByName(skuName, syncStatusFieldName, int8(0)) // _, err = dao.UpdateEntity(db, skuName, syncStatusFieldName) // } // return resultList, err // }, skuNameList) // t.AddChild(task).Run() // result, err2 := task.GetResult(0) // if err = err2; err == nil { // resultList = append(resultList, result...) // } // } // return resultList, err // }) // } func (v *VendorSync) SyncStoresCategory(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) { globals.SugarLogger.Debug("SyncStoresCategory") isManageIt := len(storeIDs) != 1 hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("同步门店分类信息:%v", storeIDs), isAsync, isManageIt, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { loopMapInfo := batchItemList[0].(*LoopStoreMapInfo) if handler := v.GetSingleStoreHandler(loopMapInfo.VendorID); handler != nil { if isForce { dao.SetStoreCategorySyncStatus(db, loopMapInfo.VendorID, storeIDs, nil, model.SyncFlagModifiedMask) } if len(loopMapInfo.StoreMapList) > 1 { loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), tasksch.NewParallelConfig().SetParallelCount(5).SetIsContinueWhenError(isContinueWhenError), ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { storeMap := batchItemList[0].(*model.StoreMap) _, err = SyncStoreCategories(ctx, task, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, nil, nil, false, isContinueWhenError) return nil, err }, loopMapInfo.StoreMapList) t.AddChild(loopStoreTask).Run() _, err = loopStoreTask.GetResult(0) } else { storeMap := loopMapInfo.StoreMapList[0] _, err = SyncStoreCategories(ctx, t, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, nil, nil, false, isContinueWhenError) } } return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID) }, isContinueWhenError) return hint, makeSyncError(err) } // func (v *VendorSync) SyncStoresSkus2(ctx *jxcontext.Context, parentTask tasksch.ITask, causeFlag int, db *dao.DaoDB, vendorIDs []int, storeIDs []int, syncDisabled bool, skuIDs, excludeSkuIDs []int, setSyncStatus int, isAsync, isContinueWhenError bool) (hint string, err error) { globals.SugarLogger.Debug("SyncStoresSkus2") isManageIt := len(storeIDs) != 1 || len(skuIDs) == 0 || len(skuIDs) > 8 task, hint, err := v.LoopStoresMap2(ctx, parentTask, db, fmt.Sprintf("同步门店商品信息:%v", storeIDs), isAsync, isManageIt, vendorIDs, storeIDs, false, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { loopMapInfo := batchItemList[0].(*LoopStoreMapInfo) if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil { parallelCount := 5 if model.MultiStoresVendorMap[loopMapInfo.VendorID] == 1 { parallelCount = 2 } loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), tasksch.NewParallelConfig().SetParallelCount(parallelCount).SetIsContinueWhenError(isContinueWhenError), ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { storeMap := batchItemList[0].(*model.StoreMap) if syncDisabled || storeMap.Status > model.StoreStatusDisabled { if setSyncStatus != 0 { dao.SetStoreSkuSyncStatus(db, storeMap.VendorID, []int{storeMap.StoreID}, skuIDs, setSyncStatus) } if _, err = SyncStoreSkuNew(ctx, task, causeFlag, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, nil, skuIDs, excludeSkuIDs, false, isContinueWhenError); err != nil { globals.SugarLogger.Debugf("SyncStoresSkus2 failed2 store:%d failed with error:%v", storeMap.StoreID, err) } } return nil, err }, loopMapInfo.StoreMapList) t.AddChild(loopStoreTask).Run() _, err = loopStoreTask.GetResult(0) } return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID) }, isContinueWhenError) if task != nil { err = makeSyncError(err) } return hint, err } func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, causeFlag int, db *dao.DaoDB, vendorIDs []int, storeIDs []int, skuIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) { setSyncStatus := 0 if isForce { setSyncStatus = model.SyncFlagStoreSkuModifiedMask } return v.SyncStoresSkus2(ctx, parentTask, causeFlag, db, vendorIDs, storeIDs, true, skuIDs, nil, setSyncStatus, isAsync, isContinueWhenError) } func (v *VendorSync) FullSyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, syncDisabled bool, excludeSkuIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) { globals.SugarLogger.Debug("FullSyncStoresSkus") hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("初始化门店商品信息:%v", storeIDs), isAsync, true, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { loopMapInfo := batchItemList[0].(*LoopStoreMapInfo) if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil { parallelCount := 5 if model.MultiStoresVendorMap[loopMapInfo.VendorID] == 1 { parallelCount = 1 } loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), tasksch.NewParallelConfig().SetParallelCount(parallelCount).SetIsContinueWhenError(isContinueWhenError), ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { storeMap := batchItemList[0].(*model.StoreMap) if syncDisabled || storeMap.Status > model.StoreStatusDisabled { if _, err = FullSyncStoreSkuNew(ctx, task, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, excludeSkuIDs, false, isContinueWhenError); err != nil { globals.SugarLogger.Debugf("FullSyncStoresSkus failed2 store:%d failed with error:%v", storeMap.StoreID, err) } } return nil, err }, loopMapInfo.StoreMapList) t.AddChild(loopStoreTask).Run() _, err = loopStoreTask.GetResult(0) } return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID) }, isContinueWhenError) return hint, makeSyncError(err) } func (v *VendorSync) DeleteRemoteStoreSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) { globals.SugarLogger.Debug("DeleteRemoteStoreSkus") hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("删除远程门店商品信息:%v", storeIDs), isAsync, true, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { loopMapInfo := batchItemList[0].(*LoopStoreMapInfo) if len(loopMapInfo.StoreMapList) > 1 { loopStoreTask := tasksch.NewSeqTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), ctx, func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { storeMap := loopMapInfo.StoreMapList[step] _, err = ClearRemoteStoreStuffAndSetNew(ctx, task, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, false, isContinueWhenError) return nil, err }, len(loopMapInfo.StoreMapList)) t.AddChild(loopStoreTask).Run() _, err = loopStoreTask.GetResult(0) } else { _, err = ClearRemoteStoreStuffAndSetNew(ctx, t, loopMapInfo.StoreMapList[0].VendorID, loopMapInfo.StoreMapList[0].StoreID, loopMapInfo.StoreMapList[0].VendorStoreID, false, isContinueWhenError) } return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID) }, isContinueWhenError) return hint, makeSyncError(err) } // 将平台有,但本地没有的门店商品清除 // todo,京东到家也应该支持 func (v *VendorSync) PruneMissingStoreSkus(ctx *jxcontext.Context, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) { globals.SugarLogger.Debug("PruneMissingStoreSkus") hint, err = v.LoopStoresMap(ctx, dao.GetDB(), fmt.Sprintf("删除远程无关联的门店商品信息:%v", storeIDs), isAsync, true, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { loopMapInfo := batchItemList[0].(*LoopStoreMapInfo) if len(loopMapInfo.StoreMapList) > 1 { loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(5), ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { storeMap := batchItemList[0].(*model.StoreMap) _, err = PruneMissingStoreSkus(ctx, task, loopMapInfo.VendorID, storeMap.StoreID, storeMap.VendorStoreID, false, isContinueWhenError) return nil, err }, loopMapInfo.StoreMapList) t.AddChild(loopStoreTask).Run() _, err = loopStoreTask.GetResult(0) } else { _, err = PruneMissingStoreSkus(ctx, t, loopMapInfo.VendorID, loopMapInfo.StoreMapList[0].StoreID, loopMapInfo.StoreMapList[0].VendorStoreID, false, isContinueWhenError) } return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID) }, isContinueWhenError) return hint, makeSyncError(err) } // 把京西有,平台无且没有待创建标记的商品加上待创建标记 // todo,京东到家也应该支持 func (v *VendorSync) AddCreateFlagForJxStoreSku(ctx *jxcontext.Context, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) { globals.SugarLogger.Debug("AddCreateFlagForJxStoreSku") hint, err = v.LoopStoresMap(ctx, dao.GetDB(), fmt.Sprintf("处理京西有,平台无且没有待创建标记的商品加上待创建标记:%v", storeIDs), isAsync, true, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { loopMapInfo := batchItemList[0].(*LoopStoreMapInfo) if len(loopMapInfo.StoreMapList) > 1 { loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(5), ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { storeMap := batchItemList[0].(*model.StoreMap) _, err = AddCreateFlagForJxStoreSku(ctx, task, loopMapInfo.VendorID, storeMap.StoreID, storeMap.VendorStoreID, false, isContinueWhenError) return nil, err }, loopMapInfo.StoreMapList) t.AddChild(loopStoreTask).Run() _, err = loopStoreTask.GetResult(0) } else { _, err = AddCreateFlagForJxStoreSku(ctx, t, loopMapInfo.VendorID, loopMapInfo.StoreMapList[0].StoreID, loopMapInfo.StoreMapList[0].VendorStoreID, false, isContinueWhenError) } return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID) }, isContinueWhenError) return hint, makeSyncError(err) } func (v *VendorSync) AmendAndPruneStoreStuff(ctx *jxcontext.Context, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool, optType int, isForceUpdate bool) (hint string, err error) { globals.SugarLogger.Debug("AmendAndPruneStoreStuff") hint, err = v.LoopStoresMap(ctx, dao.GetDB(), fmt.Sprintf("处理京西平台商家分类与商品差异:%v", storeIDs), isAsync, true, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { loopMapInfo := batchItemList[0].(*LoopStoreMapInfo) if len(loopMapInfo.StoreMapList) > 1 { loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(5), ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { storeMap := batchItemList[0].(*model.StoreMap) _, err = amendAndPruneStoreStuff(ctx, task, loopMapInfo.VendorID, storeMap.StoreID, storeMap.VendorStoreID, false, isContinueWhenError, optType, isForceUpdate) return nil, err }, loopMapInfo.StoreMapList) t.AddChild(loopStoreTask).Run() _, err = loopStoreTask.GetResult(0) } else { _, err = amendAndPruneStoreStuff(ctx, t, loopMapInfo.VendorID, loopMapInfo.StoreMapList[0].StoreID, loopMapInfo.StoreMapList[0].VendorStoreID, false, isContinueWhenError, optType, isForceUpdate) } return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID) }, isContinueWhenError) return hint, makeSyncError(err) } func (v *VendorSync) LoopStoresMap2(ctx *jxcontext.Context, parentTask tasksch.ITask, db *dao.DaoDB, taskName string, isAsync, isManageIt bool, vendorIDs []int, storeIDs []int, mustDirty bool, handler tasksch.WorkFunc, isContinueWhenError bool) (task tasksch.ITask, hint string, err error) { var storeMapList []*model.StoreMap if storeMapList, err = dao.GetStoresMapList2(db, vendorIDs, storeIDs, nil, model.StoreStatusAll, model.StoreIsSyncYes, "", "", mustDirty); err != nil { return nil, "", err } if len(storeMapList) == 0 { return nil, "", nil } vendorStoreMap := make(map[int][]*model.StoreMap) for _, v := range storeMapList { vendorStoreMap[v.VendorID] = append(vendorStoreMap[v.VendorID], v) } loopInfoList := make([]*LoopStoreMapInfo, len(vendorStoreMap)) index := 0 for k, v := range vendorStoreMap { loopInfoList[index] = &LoopStoreMapInfo{ VendorID: k, StoreMapList: v, } index++ } if len(loopInfoList) == 1 { taskName = fmt.Sprintf("%s,处理平台%s", taskName, model.VendorChineseNames[loopInfoList[0].VendorID]) } // 临时把京东的并发改为2 task = tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, loopInfoList) if isAsync { buildSetFinishHook(task, ctx) } tasksch.HandleTask(task, parentTask, isManageIt).Run() if !isAsync { resultList, err2 := task.GetResult(0) if len(task.GetFailedList()) > 0 { err2 = buildErrMsg(task) err = err2 } if err = err2; err == nil { if len(resultList) == 0 { hint = "1" // todo 暂时这样 } else { hint = jxutils.TaskResult2Hint(resultList) } } } else { hint = task.GetID() } return task, hint, err } func buildSetFinishHook(task tasksch.ITask, ctx *jxcontext.Context) { task.SetFinishHook(func(task tasksch.ITask) { var noticeMsg = "您此次的同步任务错误详情返回如下: \n" if ctx.GetUserName() != "jxadmin" { if len(task.GetFailedList()) > 10 { downloadURL, _, _ := WirteToExcelBySyncFailed(task) noticeMsg += fmt.Sprintf("[详情点我]%s/billshow/?normal=true&path=%s \n", globals.BackstageHost, downloadURL) } else if len(task.GetFailedList()) > 0 && len(task.GetFailedList()) <= 10 { if task.GetErr() != nil { noticeMsg += utils.Format4Output(buildErrMsgJson(task), true) } } else { noticeMsg = "您的同步任务执行完成,没有错误返回。" } if authInfo, err := ctx.GetV2AuthInfo(); err == nil { ddmsg.SendUserMessage(dingdingapi.MsgTyeText, authInfo.UserID, "同步错误返回", noticeMsg) } else { globals.SugarLogger.Debugf("同步错误发送钉钉消息失败, authinfo [%v] , [%v]", *authInfo, err) } } // else { // if len(task.GetFailedList()) > 1 { // if time.Now().Hour() >= 20 || time.Now().Hour() < 7 { // downloadURL, _, _ := WirteToExcelBySyncFailed(task) // user, err := dao.GetUserByID(dao.GetDB(), "mobile", "18160030913") // noticeMsg += fmt.Sprintf("[详情点我]%s/billshow/?normal=true&path=%s \n", globals.BackstageHost, downloadURL) // if user != nil && err == nil { // ddmsg.SendUserMessage(dingdingapi.MsgTyeText, user.UserID, "同步错误返回", noticeMsg) // } // } // } // } }) } func buildErrMsg(task tasksch.ITask) (err error) { err = fmt.Errorf(utils.Format4Output(buildErrMsgJson(task), true)) return makeSpecSyncError(err) } func buildErrMsgJson(task tasksch.ITask) (resultL []*SyncErrResult) { failedList := task.GetFailedList() for _, v := range failedList { for _, vv := range v.([]*partner.StoreSkuInfoWithErr) { result := &SyncErrResult{ SkuID: 0, StoreID: vv.StoreID, CategoryName: vv.CategoryName, VendorName: vv.VendoreName, VendorSkuID: "", NameID: 0, VendorPrice: 0, SyncType: vv.SyncType, ErrMsg: vv.ErrMsg, } if vv.StoreSkuInfo != nil { result.SkuID = vv.StoreSkuInfo.SkuID result.VendorSkuID = vv.StoreSkuInfo.VendorSkuID result.NameID = vv.StoreSkuInfo.NameID result.VendorPrice = vv.StoreSkuInfo.VendorPrice } resultL = append(resultL, result) } } return resultL } func (v *VendorSync) LoopStoresMap(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync, isManageIt bool, vendorIDs []int, storeIDs []int, handler tasksch.WorkFunc, isContinueWhenError bool) (hint string, err error) { _, hint, err = v.LoopStoresMap2(ctx, nil, db, taskName, isAsync, isManageIt, vendorIDs, storeIDs, false, handler, isContinueWhenError) return hint, err } func (v *VendorSync) LoopMultiStoresVendors(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, isManageIt bool, handler tasksch.WorkFunc) (hint string, err error) { task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, getMultiStoreVendorInfoList()) tasksch.HandleTask(task, nil, isManageIt).Run() if !isAsync { result, err2 := task.GetResult(0) if err = err2; err == nil { hint = utils.Int2Str(len(result)) } } else { hint = task.ID } return hint, makeSyncError(err) } // func (v *VendorSync) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int, storeIDs []int) (hint string, err error) { // task := tasksch.NewParallelTask("RefreshAllSkusID", nil, ctx, // func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { // vendorID := batchItemList[0].(int) // if handler := v.GetStoreHandler(vendorID); handler != nil { // if multiHandler, ok := handler.(partner.IMultipleStoresHandler); ok { // _, err = multiHandler.RefreshAllSkusID(ctx, task, false) // } else if singleHandler, ok := handler.(partner.ISingleStoreHandler); ok { // _, err = singleHandler.RefreshStoresAllSkusID(ctx, task, false, storeIDs) // } // } // return nil, err // }, vendorIDs) // tasksch.HandleTask(task, nil, true).Run() // if !isAsync { // _, err = task.GetResult(0) // } // return task.ID, err // } func (v *VendorSync) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int) (hint string, err error) { task := tasksch.NewParallelTask("RefreshAllStoresID", nil, ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { vendorID := batchItemList[0].(int) if handler := v.GetStoreHandler(vendorID); handler != nil { _, err = handler.RefreshAllStoresID(ctx, task, false) } return nil, err }, vendorIDs) tasksch.HandleTask(task, nil, true).Run() if !isAsync { _, err = task.GetResult(0) } return task.ID, err } func makeSyncError(err error) (newErr error) { if err != nil { if _, ok := err.(*SyncError); !ok { return &SyncError{ Original: err, } } } return err } func makeSpecSyncError(err error) (newErr error) { if err != nil { if _, ok := err.(*SpecSyncError); !ok { return &SpecSyncError{ SpecErr: err, } } } return err } func (e *SpecSyncError) Error() string { return e.SpecErr.Error() } func (e *SyncError) Error() string { return fmt.Sprintf("本地数据修改成功,但同步失败,请根据错误提示处理!,同步错误信息:%s", e.Original.Error()) } func isSyncError(err error) bool { _, ok := err.(*SyncError) return ok } func (v *VendorSync) SyncSkuNames(ctx *jxcontext.Context, nameIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) { return SyncSkus(ctx, nil, nil, nil, nameIDs, nil, isAsync) } func (v *VendorSync) oldSyncSkuNames(ctx *jxcontext.Context, nameIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) { db := dao.GetDB() if isForce { dao.SetSkuNameSyncStatus(db, nil, nil, nameIDs, model.SyncFlagModifiedMask) } return v.SyncSkus(ctx, db, nameIDs, nil, isAsync, isContinueWhenError, ctx.GetUserName()) } func (v *VendorSync) ChangeStoreSkuSaleStatus(ctx *jxcontext.Context, storeID int, isAsync, isContinueWhenError bool) (err error) { var ( storeIDs []int skuIDs []int ) db := dao.GetDB() storeSkuList, err := dao.GetStoresSkusInfoBySaleTime(db, storeID) if len(storeSkuList) < 1 || err != nil { return nil } for _, v := range storeSkuList { storeIDs = append(storeIDs, v.StoreID) skuIDs = append(skuIDs, v.SkuID) } vendorIDs := partner.GetPurchasePlatformVendorIDs() dao.UpdateStoreSkuBindSyncStatusForSaleStatus(db, vendorIDs, storeID) v.SyncStoresSkus(ctx, nil, model.SyncFlagSaleMask, db, vendorIDs, storeIDs, skuIDs, false, isAsync, isContinueWhenError) return err } func GetTimeMixByInt(begin1, end1, begin2, end2 int16) (beginAt, endAt int16) { if (begin1 > begin2 && begin1 > end2) || (begin2 > end1 && end2 > end1) { return 0, 0 } if begin1 > begin2 { beginAt = begin1 if end1 > end2 { endAt = end2 } else { endAt = end1 } } else { beginAt = begin2 if end1 > end2 { endAt = end2 } else { endAt = end1 } } return beginAt, endAt } func WirteToExcelBySyncFailed(task tasksch.ITask) (downloadURL, fileName string, err error) { var ( sheetList1 []*excel.Obj2ExcelSheetConfig ) syncErrResultLock.syncErrResult = syncErrResultLock.syncErrResult[0:0] list := buildErrMsgJson(task) for _, v := range list { syncErrResultLock.AppendData(*v) } excelConf1 := &excel.Obj2ExcelSheetConfig{ Title: "同步错误", Data: syncErrResultLock.syncErrResult, CaptionList: SyncErrResultTitle, } sheetList1 = append(sheetList1, excelConf1) if excelConf1 != nil { downloadURL, fileName, err = jxutils.UploadExeclAndPushMsg(sheetList1, time.Now().Format("200601021504")+"同步错误返回") baseapi.SugarLogger.Debug("WriteToExcel: download is [%v]", downloadURL) } else { baseapi.SugarLogger.Debug("WriteToExcel: dataSuccess is nil!") } if err != nil { baseapi.SugarLogger.Errorf("WriteToExcel:upload %s , failed error:%v", fileName, err) } return downloadURL, fileName, err } func (d *SyncErrResultLock) AppendData(syncErrResult SyncErrResult) { d.locker.Lock() defer d.locker.Unlock() d.syncErrResult = append(d.syncErrResult, syncErrResult) } func (v *VendorSync) SyncStoreSkusFromYb(ctx *jxcontext.Context, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) { var ( vendorID = model.VendorIDYB ) hint, err = v.LoopStoresMap(ctx, dao.GetDB(), fmt.Sprintf("同步银豹到京西:%v", storeIDs), isAsync, true, []int{vendorID}, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { loopMapInfo := batchItemList[0].(*LoopStoreMapInfo) for _, v := range loopMapInfo.StoreMapList { hint, err = syncStoreSkusFromYb(ctx, v.StoreID, vendorID, v.VendorStoreID, isAsync, isContinueWhenError) } return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID) }, isContinueWhenError) return hint, err } func syncStoreSkusFromYb(ctx *jxcontext.Context, storeID, vendorID int, vendorStoreID string, isAsync, isContinueWhenError bool) (hint string, err error) { var ( db = dao.GetDB() localSkuMap = make(map[string]*dao.StoreSkuSyncInfo) vendorSkuMap = make(map[string]*partner.SkuNameInfo) // skuBindInfosDel []*StoreSkuBindInfo // skuBindInfosUpt []*StoreSkuBindInfo addList []*partner.SkuNameInfo updateList []*partner.SkuNameInfo deleteList []*dao.StoreSkuSyncInfo ) handler, _ := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler) localSkuList, err := dao.GetStoreSkus2(db, vendorID, storeID, nil, false) if err != nil { return "", err } for _, v := range localSkuList { localSkuMap[v.VendorSkuID] = v } remoteSkuList, err := handler.GetStoreSkusFullInfo(ctx, nil, storeID, vendorStoreID, nil) if err != nil { return "", err } for _, v := range remoteSkuList { if localSkuMap[v.SkuList[0].VendorSkuID] == nil { if len(v.YbBarCode) > 7 { addList = append(addList, v) } } else { updateList = append(updateList, v) } vendorSkuMap[v.SkuList[0].VendorSkuID] = v } for _, v := range localSkuList { if vendorSkuMap[v.VendorSkuID] == nil { deleteList = append(deleteList, v) } } fmt.Println("remoteSkuList", len(remoteSkuList)) fmt.Println("addList", len(addList)) fmt.Println("updateList", utils.Format4Output(updateList, false)) fmt.Println("deleteList", utils.Format4Output(deleteList, false)) // taskSeqFunc := func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { // store, _ := dao.GetStoreDetail(db, storeID, vendorID) // switch step { // case 0: // if len(addList) > 0 { // taskFunc := func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { // var ( // v = batchItemList[0].(*partner.SkuNameInfo) // upc = v.YbBarCode // ) // err = AddSkuNameByUpc(ctx, upc, store, v) // if err != nil { // task.AddFailedList(putils.GetErrMsg2FailedSingleList(nil, err, storeID, model.VendorChineseNames[vendorID], "根据upc创建京西商品")) // } // return retVal, err // } // taskParallel := tasksch.NewParallelTask("创建商品", tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, taskFunc, addList) // tasksch.HandleTask(taskParallel, task, true).Run() // _, err = taskParallel.GetResult(0) // } // case 1: // if len(deleteList) > 0 { // taskFunc := func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { // var ( // v = batchItemList[0].(*dao.StoreSkuSyncInfo) // ) // skuBindInfo := &StoreSkuBindInfo{ // NameID: v.NameID, // IsFocus: -1, // } // retVal = []*StoreSkuBindInfo{skuBindInfo} // return retVal, err // } // taskParallel := tasksch.NewParallelTask("删除商品", tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, taskFunc, deleteList) // tasksch.HandleTask(taskParallel, task, true).Run() // resultDel, _ := taskParallel.GetResult(0) // for _, v := range resultDel { // skuBindInfosDel = append(skuBindInfosDel, v.(*StoreSkuBindInfo)) // } // _, err = updateStoresSkusWithoutSync(ctx, db, []int{storeID}, skuBindInfosDel, false, false) // } // case 2: // if len(updateList) > 0 { // taskFunc := func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { // var ( // v = batchItemList[0].(*partner.SkuNameInfo) // skuBindInfo = &StoreSkuBindInfo{} // storeSkus []*dao.StoreSkuExt // pricePercentagePack []*model.PricePercentageItem // ) // sql := ` // SELECT a.*, c.id name_id // FROM store_sku_bind a // JOIN sku b ON a.sku_id = b.id // JOIN sku_name c ON c.id = b.name_id // WHERE a.store_id = ? AND a.yb_id = ? AND a.deleted_at = ? // ` // sqlParams := []interface{}{storeID, v.SkuList[0].VendorSkuID, utils.DefaultTimeValue} // err = dao.GetRows(db, &storeSkus, sql, sqlParams) // if len(storeSkus) > 0 { // if storeSkus[0].YbPrice != int(v.SkuList[0].VendorPrice) { // err = jxutils.Strings2Objs(store.PricePercentagePackStr, &pricePercentagePack) // skuBindInfo.UnitPrice = jxutils.CaculateJxPriceByPricePack(pricePercentagePack, 0, int(v.SkuList[0].VendorPrice)) // } // } else { // return retVal, fmt.Errorf("未查询到门店商品,yb_id [%v]", v.SkuList[0].VendorSkuID) // } // if v.SkuList[0].Stock < 1 { // skuBindInfo.IsSale = model.StoreSkuBindStatusDontSale // } else { // skuBindInfo.IsSale = model.StoreSkuBindStatusNormal // } // skuBindInfo.NameID = storeSkus[0].NameID // retVal = []*StoreSkuBindInfo{skuBindInfo} // return retVal, err // } // taskParallel := tasksch.NewParallelTask("更新商品价格和库存", tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, taskFunc, updateList) // tasksch.HandleTask(taskParallel, task, true).Run() // resultUpt, _ := taskParallel.GetResult(0) // for _, v := range resultUpt { // skuBindInfosUpt = append(skuBindInfosUpt, v.(*StoreSkuBindInfo)) // } // _, err = updateStoresSkusWithoutSync(ctx, db, []int{storeID}, skuBindInfosUpt, false, false) // } // case 3: // _, err = CurVendorSync.SyncStoresSkus2(jxcontext.AdminCtx, nil, 0, db, []int{0, 1, 3}, nil, false, nil, nil, 0, true, true) // } // return result, err // } // taskSeq := tasksch.NewSeqTask2("同步银豹商品到京西", ctx, true, taskSeqFunc, 3) // tasksch.HandleTask(taskSeq, nil, true).Run() // hint = taskSeq.GetID() return hint, err } func (v *VendorSync) SyncJdsStoresSkus(ctx *jxcontext.Context, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) { var ( db = dao.GetDB() ) storeSkus, _ := dao.GetStoresSkusInfo(db, []int{model.JdShopMainStoreID}, nil) _, hint, err = v.LoopStoresMap2(ctx, nil, db, fmt.Sprintf("同步京东商城门店的可售信息:%v", storeIDs), isAsync, true, []int{model.VendorIDJDShop}, storeIDs, false, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { loopMapInfo := batchItemList[0].(*LoopStoreMapInfo) if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil { for _, storeMap := range loopMapInfo.StoreMapList { if storeMap.Status > model.StoreStatusDisabled && storeMap.StoreID != model.JdShopMainStoreID && storeMap.SyncRule != 0 { err = syncJdsStoresSkus(ctx, db, t, storeMap, isAsync, isContinueWhenError) } err = syncJdsStoreStock(ctx, db, t, storeSkus, storeMap, isAsync, isContinueWhenError) } } return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID) }, isContinueWhenError) return hint, err } func syncJdsStoreStock(ctx *jxcontext.Context, db *dao.DaoDB, parentTask tasksch.ITask, storeSkus []*model.StoreSkuBind, storeMap *model.StoreMap, isAsync, isContinueWhenError bool) (err error) { // storeMaps, err := dao.GetStoresMapList(db, []int{model.VendorIDJDShop}, nil, nil, model.StoreStatusAll, model.StoreIsSyncAll, "", "") // if err != nil { // return err // } // storeSkus, err := dao.GetStoresSkusInfo(db, []int{model.JdShopMainStoreID}, nil) // if err != nil { // return err // } // for _, storeMap := range storeMaps { task := tasksch.NewParallelTask("syncJdsStoreStock", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { storeSku := batchItemList[0].(*model.StoreSkuBind) stock := 0 storeSku2, _ := dao.GetStoresSkusInfo(db, []int{storeMap.StoreID}, []int{storeSku.SkuID}) if storeSku.JdsID != 0 { if len(storeSku2) > 0 { if storeSku2[0].Status == model.StoreSkuBindStatusNormal && storeSku.Status == model.StoreSkuBindStatusNormal { stock = 9999 } if storeMap.VendorStoreID != "" { err = api.JdShopAPI.UpdateSkuSiteStock(storeSku.JdsID, stock, utils.Str2Int(storeMap.VendorStoreID)) } } else { err = api.JdShopAPI.UpdateSkuSiteStock(storeSku.JdsID, 0, utils.Str2Int(storeMap.VendorStoreID)) } } return retVal, err }, storeSkus) tasksch.HandleTask(task, parentTask, true).Run() _, err = task.GetResult(0) // } return err } func syncJdsStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, parentTask tasksch.ITask, storeMap *model.StoreMap, isAsync, isContinueWhenError bool) (err error) { globals.SugarLogger.Debugf("syncJdsStoresSkus") var ( mainSkusMap = make(map[int][]*dao.StoreSkuSyncInfo) skusMap = make(map[int][]*dao.StoreSkuSyncInfo) updateList []*dao.StoreSkuSyncInfo addList []*dao.StoreSkuSyncInfo skuBindInfos1 []*StoreSkuBindInfo skuBindInfos2 []*StoreSkuBindInfo ) storeSkusMain, err := dao.GetStoreSkusByNameIDs(db, []int{model.JdShopMainStoreID}, 0) for _, v := range storeSkusMain { mainSkusMap[v.NameID] = append(mainSkusMap[v.NameID], v) } storeSkus, err := dao.GetStoreSkusByNameIDs(db, []int{storeMap.StoreID}, 0) for _, v := range storeSkus { skusMap[v.NameID] = append(skusMap[v.NameID], v) } for k, v := range skusMap { if mainSkusMap[k] != nil { flag := false for _, storeSku := range v { if storeSku.StoreSkuStatus == model.StoreSkuBindStatusNormal { flag = true } } if !flag { continue } for _, storeSku := range v { for _, storeSkuMain := range mainSkusMap[k] { if storeSkuMain.StoreSkuStatus == model.StoreSkuBindStatusNormal && storeSku.StoreSkuStatus == model.StoreSkuBindStatusDontSale && storeSkuMain.SkuID == storeSku.SkuID { updateList = append(updateList, storeSkuMain) } } } } } for k, v := range mainSkusMap { if skusMap[k] == nil { if storeMap.SyncRule == 2 { for _, storeSkuMain := range v { addList = append(addList, storeSkuMain) } } } } // fmt.Println("updateList", utils.Format4Output(updateList, false)) // fmt.Println("addList", utils.Format4Output(addList, false)) if len(updateList) > 0 { for _, v := range updateList { skuBindInfos1 = append(skuBindInfos1, buildStoreSkuBindInfo(db, storeMap.StoreID, v, false)) } UpdateStoresSkusByBind(ctx, parentTask, skuBindInfos1, isAsync, isContinueWhenError, false) } if len(addList) > 0 { for _, v := range addList { skuBindInfos2 = append(skuBindInfos2, buildStoreSkuBindInfo(db, storeMap.StoreID, v, true)) } UpdateStoresSkusByBind(ctx, parentTask, skuBindInfos2, isAsync, isContinueWhenError, false) } return err } func buildStoreSkuBindInfo(db *dao.DaoDB, storeID int, storeBind *dao.StoreSkuSyncInfo, isFocus bool) (skuBindInfo *StoreSkuBindInfo) { skus := []*StoreSkuBindSkuInfo{ &StoreSkuBindSkuInfo{ SkuID: storeBind.SkuID, }, } skuBindInfo = &StoreSkuBindInfo{ StoreID: storeID, NameID: storeBind.NameID, } if isFocus { skuBindInfo.IsFocus = 1 } if storeBind.StoreSkuStatus == model.SkuStatusNormal { skus[0].IsSale = 1 } else { skus[0].IsSale = -1 } skuBindInfo.Skus = skus return skuBindInfo } func SyncSkuExperfixAndWatermark(ctx *jxcontext.Context) (err error) { var ( db = dao.GetDB() ) skuExinfos, err := dao.GetSkuExinfos(db, nil, []int{model.VendorIDMTWM, model.VendorIDEBAI, model.VendorIDJD, model.VendorIDJDShop}, "", utils.ZeroTimeValue, utils.ZeroTimeValue) task := tasksch.NewParallelTask("SyncSkuExperfixAndWatermark", tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { skuExinfo := batchItemList[0].(*model.SkuExinfoMap) if utils.Time2Date(time.Now().Add(6*time.Hour)).Sub(skuExinfo.EndAt) > 0 { skuExinfo.DeletedAt = time.Now() dao.UpdateEntity(db, skuExinfo, "DeletedAt") goto setModifiyFlag } if utils.Time2Date(time.Now().Add(6*time.Hour)).Sub(skuExinfo.BeginAt) == 0 { goto setModifiyFlag } setModifiyFlag: { skus, _ := dao.GetSkus(db, nil, []int{skuExinfo.NameID}, nil, nil, nil) var skuIDs []int for _, v := range skus { skuIDs = append(skuIDs, v.ID) } if partner.IsMultiStore(skuExinfo.VendorID) { for _, v := range skuIDs { OnUpdateThing(ctx, db, nil, int64(v), model.ThingTypeSku) } } else { if len(skuIDs) > 0 { SetStoreSkuSyncStatus2(db, nil, []int{skuExinfo.VendorID}, skuIDs, model.SyncFlagModifiedMask) } } } return retVal, err }, skuExinfos) tasksch.HandleTask(task, nil, true).Run() _, err = task.GetResult(0) return err } func SetMTPSStatus(ctx *jxcontext.Context, storeId, courierStatus int) { globals.SugarLogger.Debug("SetMTPSStatus is start ") globals.SugarLogger.Debug("StoreId", "CourierStatus", storeId, courierStatus) /*获取美团门店信息*/ //&& CourierStatus != 0 if storeId != 0 { var ShopName string StoreLists, _ := dao.GetStoreList(nil, []int{storeId}, nil, nil, nil, "") ShopName = StoreLists[0].Name StoreInfoList := new(mtpsapi.GetStoreStatusResultAll) StoreInfoList, _ = api.MtpsAPI.GetStoreStatus(ShopName) /*通过名字找到了,正常情况*/ if StoreInfoList != nil && StoreInfoList.DataList != nil { goto ifExist } else { /*如果通过名字找不到,那就先通过ID去找名字*/ ShopInfo, _ := api.MtpsAPI.ShopQuery(strconv.Itoa(storeId)) if ShopInfo != nil { ShopName = ShopInfo.ShopName if ShopName == "" { ShopName = StoreLists[0].Name[:len(StoreLists[0].Name)-3] } StoreInfoList, _ = api.MtpsAPI.GetStoreStatus(ShopName) if StoreInfoList != nil && StoreInfoList.DataList != nil { goto ifExist } else { /*如果通过API返回的名字也找不到*/ goto ifNotExist } } else { /*如果通过ID找不到,那就直接去判断名字*/ goto NameProblem } NameProblem: { var NotOpen int if strings.Contains(StoreLists[0].Name, "不做") || strings.Contains(StoreLists[0].Name, "不想做") { NotOpen = strings.Index(StoreLists[0].Name, "-") if strings.Index(StoreLists[0].Name, "-") > 0 { StoreLists[0].Name = StoreLists[0].Name[:NotOpen-1] } NotOpen = strings.Index(StoreLists[0].Name, "不") if strings.Index(StoreLists[0].Name, "不") > 0 { StoreLists[0].Name = StoreLists[0].Name[:NotOpen-1] } NotOpen = strings.Index(StoreLists[0].Name, "(") if strings.Index(StoreLists[0].Name, "(") > 0 { StoreLists[0].Name = StoreLists[0].Name[:NotOpen-1] } NotOpen = strings.Index(StoreLists[0].Name, "(") if strings.Index(StoreLists[0].Name, "(") > 0 { StoreLists[0].Name = StoreLists[0].Name[:NotOpen-1] } NotOpen = strings.Index(StoreLists[0].Name, " ") if NotOpen > 0 { StoreLists[0].Name = StoreLists[0].Name[:NotOpen-1] } ShopName = StoreLists[0].Name StoreInfoList, _ = api.MtpsAPI.GetStoreStatus(ShopName) if StoreInfoList != nil && StoreInfoList.DataList != nil { goto ifExist } else { goto ifNotExist } } } } ifNotExist: { sl := make(map[string]interface{}) sl["vendorStatus"] = 0 globals.SugarLogger.Debug("因为没找到被修改配送状态的VendorStoreID是", ShopName, storeId) UpdateStoreCourierMap(ctx, nil, storeId, model.VendorIDMTPS, sl, ctx.GetUserName()) return } ifExist: { if StoreInfoList.DataList[0].OuterPoiID != "" { //若存在且名字不为空,就是找到了 // if StoreInfoList.DataList[0].OpenType != courierStatus ||{ sl := make(map[string]interface{}) sl["vendorStoreID"] = StoreInfoList.DataList[0].OuterPoiID sl["status"] = StoreInfoList.DataList[0].OpenType sl["vendorStatus"] = StoreInfoList.DataList[0].OpenType globals.SugarLogger.Debugf("被修改配送状态的VendorStoreID是:%s,名称是:%s,美团状态是:%s,本地状态是:%s", StoreInfoList.DataList[0].OuterPoiID, StoreInfoList.DataList[0].PoiName, strconv.Itoa(StoreInfoList.DataList[0].OpenType), strconv.Itoa(StoreLists[0].Status)) UpdateStoreCourierMap(ctx, nil, storeId, model.VendorIDMTPS, sl, ctx.GetUserName()) // } } } } else { StoreInfoList, _ := api.MtpsAPI.GetStoreStatusAll() StoreInfoList2 := make(map[string]string) for _, store := range StoreInfoList { for _, data := range store.DataList { StoreInfoList2[data.OuterPoiID] = data.PoiName } } db := dao.GetDB() /*比较营业状态*/ /*把获取的京西状态和名称存一下*/ StoreCourierList, _ := dao.GetStoreCourierList(db, []int{}, model.StoreStatusAll, model.StoreStatusAll) /*循环美团*/ for _, StoreInfoList1 := range StoreInfoList { for _, StoreInfoList11 := range StoreInfoList1.DataList { /*循环京西*/ for _, StoreCourierList1 := range StoreCourierList { /*只比较美团*/ if StoreCourierList1.VendorID != model.VendorIDMTPS { continue } /*如果门店ID相同的时候进入判断,一个门店只用判断一次就行*/ if StoreCourierList1.VendorStoreID == StoreInfoList11.OuterPoiID { if StoreCourierList1.Status != StoreInfoList11.OpenType { sl := make(map[string]interface{}) sl["vendorStoreID"] = StoreInfoList11.OuterPoiID sl["status"] = StoreInfoList11.OpenType sl["vendorStatus"] = StoreInfoList11.OpenType globals.SugarLogger.Debugf("被修改配送状态的VendorStoreID是:%s,名称是:%s,美团状态是:%s,本地状态是:%s", StoreInfoList11.OuterPoiID, StoreInfoList11.PoiName, strconv.Itoa(StoreInfoList11.OpenType), strconv.Itoa(StoreCourierList1.Status)) UpdateStoreCourierMap(ctx, nil, StoreCourierList1.StoreID, StoreCourierList1.VendorID, sl, ctx.GetUserName()) break } } } } } /* 美团配送的门店是否存在,调用美团配送的api(有可能接了),查询京西门店对应的美团配送门店是否存在,若不存在则要在京西这边解绑美团配送门店 怎么解绑可以在网页上门店管理那点一下看看调的什么接口,传的什么参数*/ /*获取所有门店信息*/ //test: for _, StoreCourierList1 := range StoreCourierList { diff := false StoreLists, _ := dao.GetStoreList(db, []int{StoreCourierList1.StoreID}, nil, nil, nil, "") if StoreLists == nil { globals.SugarLogger.Debugf("StoreID为:%s,在store表未找到", StoreCourierList1.StoreID) continue } if StoreCourierList1.VendorID != model.VendorIDMTPS || StoreCourierList1.VendorStoreID == "" { continue } if StoreCourierList1.Status == model.StoreStatusDisabled || StoreCourierList1.Status == model.StoreStatusClosed { continue } /*京西不为空,容错*/ //if { /*调用API获取美团的商店信息*/ MTPSInfo := new(mtpsapi.ShopInfo) MTPSInfo, _ = api.MtpsAPI.ShopQuery(StoreCourierList1.VendorStoreID) if MTPSInfo == nil { globals.SugarLogger.Debug("美团未找到该门店," + StoreLists[0].Name + strconv.Itoa(StoreCourierList1.StoreID) + " 被解绑,关联的ID为:" + StoreCourierList1.VendorStoreID) diff = true } if MTPSInfo != nil && MTPSInfo.ShopName == "" { MTPSInfo.ShopName = StoreInfoList2[MTPSInfo.ShopID] } if MTPSInfo != nil && MTPSInfo.ShopLng != StoreCourierList1.Lng && MTPSInfo.ShopLat == StoreCourierList1.Lat { /*平台上但是坐标不同,解绑*/ globals.SugarLogger.Debug("商店与美团配送上的坐标不同," + StoreLists[0].Name + strconv.Itoa(StoreCourierList1.StoreID) + " 被解绑,关联的ID为:" + StoreCourierList1.VendorStoreID) if _, err := DeleteStoreCourierMap(ctx, db, StoreCourierList1.StoreID, StoreCourierList1.VendorID, ctx.GetUserName()); err != nil { globals.SugarLogger.Debug(err.Error()) return } diff = true } if diff { if _, err := DeleteStoreCourierMap(ctx, db, StoreCourierList1.StoreID, StoreCourierList1.VendorID, ctx.GetUserName()); err != nil { globals.SugarLogger.Debug(err.Error()) return } //break test } } } globals.SugarLogger.Debug("SetMTPSStatus is Complete") }