Files
jx-callback/business/jxstore/cms/sync.go
2020-07-03 08:38:17 +08:00

1366 lines
58 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package cms
import (
"errors"
"fmt"
"strconv"
"sync"
"time"
"git.rosy.net.cn/baseapi/platformapi/mtpsapi"
"git.rosy.net.cn/jx-callback/business/partner/putils"
"git.rosy.net.cn/jx-callback/globals/api"
"git.rosy.net.cn/baseapi"
"git.rosy.net.cn/baseapi/platformapi/dingdingapi"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/business/jxutils/ddmsg"
"git.rosy.net.cn/jx-callback/business/jxutils/excel"
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
"git.rosy.net.cn/jx-callback/business/partner"
"git.rosy.net.cn/jx-callback/globals"
)
type SyncErrResult struct {
SkuID int `json:"商品ID"`
CategoryName string `json:"分类名"`
StoreID int `json:"门店ID"`
VendorName string `json:"平台名"`
VendorSkuID string `json:"平台商品ID"`
NameID int `json:"商品nameID"`
VendorPrice int64 `json:"平台价"`
SyncType string `json:"同步类型"`
ErrMsg string `json:"错误信息"`
}
type SyncErrResultLock struct {
syncErrResult []SyncErrResult
locker sync.RWMutex
}
type LoopStoreMapInfo struct {
VendorID int
StoreMapList []*model.StoreMap
}
type VendorSync struct {
}
type SyncError struct {
Original error `json:"original"`
Message string `json:"message"`
}
type SpecSyncError struct {
SpecErr error `json:"specErr"`
}
// 对于多门店平台接口的通用处理
type MultiStoreHandlerWrapper struct {
partner.IMultipleStoresHandler
}
// 对于单门店平台接口的通用处理
type SingleStoreHandlerWrapper struct {
partner.ISingleStoreHandler
}
var (
CurVendorSync VendorSync
)
var (
ErrHaveNotImplementedYet = errors.New("还没有实现")
ErrEntityNotExist = errors.New("找不到相应实体")
SyncErrResultTitle = []string{
"商品ID",
"分类名",
"门店ID",
"平台名",
"平台商品ID",
"商品nameID",
"平台价",
"同步类型",
"错误信息",
}
syncErrResultLock SyncErrResultLock
)
// func (p *MultiStoreHandlerWrapper) DeleteCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) {
// if jxutils.IsEmptyID(cat.JdID) {
// return nil
// }
// return p.IMultipleStoresHandler.DeleteCategory(db, cat, userName)
// }
// func (p *MultiStoreHandlerWrapper) UpdateCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) {
// if jxutils.IsEmptyID(cat.JdID) {
// globals.SugarLogger.Warnf("UpdateCategory fakeid cat:%s should not get here", utils.Format4Output(cat, true))
// return nil
// }
// return p.IMultipleStoresHandler.UpdateCategory(db, cat, userName)
// }
// func (p *MultiStoreHandlerWrapper) DeleteSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) {
// globals.SugarLogger.Debugf("wrapper DeleteSku, sku:%s", utils.Format4Output(sku, false))
// if jxutils.IsEmptyID(sku.JdID) {
// return nil
// }
// return p.IMultipleStoresHandler.DeleteSku(db, sku, userName)
// }
// func (p *MultiStoreHandlerWrapper) UpdateSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) {
// if jxutils.IsEmptyID(sku.JdID) {
// globals.SugarLogger.Warnf("UpdateSku fakeid sku:%s should not get here", utils.Format4Output(sku, true))
// return nil
// }
// return p.IMultipleStoresHandler.UpdateSku(db, sku, userName)
// }
func (v *VendorSync) GetStoreHandler(vendorID int) partner.IPurchasePlatformHandler {
return partner.GetPurchasePlatformFromVendorID(vendorID)
}
func (v *VendorSync) GetMultiStoreHandler(vendorID int) partner.IMultipleStoresHandler {
if handler, ok := v.GetStoreHandler(vendorID).(partner.IMultipleStoresHandler); ok {
return handler
}
return nil
}
func (v *VendorSync) GetSingleStoreHandler(vendorID int) partner.ISingleStoreHandler {
if handler, ok := v.GetStoreHandler(vendorID).(partner.ISingleStoreHandler); ok {
return handler
}
return nil
}
// func (v *VendorSync) syncCategories(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID int, db *dao.DaoDB, cats []*model.SkuCategory, userName string) (err error) {
// multiStoresHandler := v.GetMultiStoreHandler(vendorID)
// syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[vendorID])
// task := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[vendorID]), nil, ctx,
// func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
// cat := batchItemList[0].(*model.SkuCategory)
// updateFields := []string{syncStatusFieldName}
// syncStatus := refutil.GetObjFieldByName(cat, syncStatusFieldName).(int8)
// if (syncStatus & model.SyncFlagDeletedMask) != 0 { //删除
// if (syncStatus & model.SyncFlagNewMask) == 0 {
// err = multiStoresHandler.DeleteCategory(db, cat, userName)
// }
// } else if (syncStatus & model.SyncFlagNewMask) != 0 { // 新增
// err = multiStoresHandler.CreateCategory(db, cat, userName)
// updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()]))
// } else if (syncStatus & model.SyncFlagModifiedMask) != 0 { // 修改
// err = multiStoresHandler.UpdateCategory(db, cat, userName)
// if intErr, ok := err.(*utils.ErrorWithCode); ok && intErr.IntCode() == -3 {
// err = nil
// }
// }
// if err == nil {
// refutil.SetObjFieldByName(cat, syncStatusFieldName, int8(0))
// _, err = dao.UpdateEntity(db, cat, updateFields...)
// }
// return nil, err
// }, cats)
// tasksch.HandleTask(task, parentTask, false).Run()
// _, err = task.GetResult(0)
// return err
// }
func (v *VendorSync) SyncCategory(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
return SyncCategories(ctx, nil, nil, nil, []int{categoryID}, isAsync)
}
// func (v *VendorSync) oldSyncCategory(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
// globals.SugarLogger.Debug("SyncCategory")
// hint, err = v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("同步分类信息:%d", categoryID), isAsync, false,
// func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
// vendorInfo := batchItemList[0].(*MultiStoreVendorInfo)
// var cats []*model.SkuCategory
// cond := make(map[string]interface{})
// if categoryID > 0 {
// cond[model.FieldID] = categoryID
// } else {
// cond[model.FieldLevel] = 1
// }
// err := dao.GetEntitiesByKV(db, &cats, cond, true)
// if err == nil {
// err = v.syncCategories(ctx, t, vendorInfo.VendorID, db, cats, userName)
// }
// if err != nil || categoryID > 0 {
// return nil, err
// }
// cond[model.FieldLevel] = 2
// err = dao.GetEntitiesByKV(db, &cats, cond, true)
// if err == nil {
// err = v.syncCategories(ctx, t, vendorInfo.VendorID, db, cats, userName)
// }
// return nil, err
// })
// return "", err
// }
func (v *VendorSync) SyncReorderCategories(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
return SyncReorderCategories(ctx, categoryID, isAsync)
}
// func (v *VendorSync) oldSyncReorderCategories(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
// hint, err = v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("分类重排序:%d", categoryID), isAsync, false, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
// vendorInfo := batchItemList[0].(*MultiStoreVendorInfo)
// multiStoresHandler := v.GetMultiStoreHandler(vendorInfo.VendorID)
// err2 := multiStoresHandler.ReorderCategories(db, categoryID, userName)
// if err2 == nil {
// cat := &model.SkuCategory{}
// _, err2 = dao.UpdateEntityByKV(db, cat, utils.Params2Map(dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()]), 0), utils.Params2Map(model.FieldParentID, categoryID))
// return nil, err2
// }
// return nil, err2
// })
// return "", err
// }
func (v *VendorSync) SyncStore2(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs, storeIDs []int, mustDirty, isAsync bool) (hint string, err error) {
globals.SugarLogger.Debugf("SyncStore2, storeIDs:%d", storeIDs)
userName := ctx.GetUserName()
isManageIt := len(storeIDs) == 0 || len(storeIDs) > 5
_, hint, err = v.LoopStoresMap2(ctx, nil, db, fmt.Sprintf("同步门店信息:%v", storeIDs), isAsync, isManageIt, vendorIDs, storeIDs, mustDirty, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (resultList interface{}, err error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
var failedList []*partner.StoreSkuInfoWithErr
handler := v.GetStoreHandler(loopMapInfo.VendorID)
if handler != nil {
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
var resultList []interface{}
var vendorStoreID string
storeMap := batchItemList[0].(*model.StoreMap)
db2 := db
if len(loopMapInfo.StoreMapList) > 1 {
db2 = dao.GetDB()
}
if model.IsSyncStatusNew(storeMap.SyncStatus) {
if vendorStoreID, err = handler.CreateStore2(db2, storeMap.StoreID, userName); err == nil {
resultList = append(resultList, 1)
} else {
failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "创建门店")
}
} else if model.IsSyncStatusDelete(storeMap.SyncStatus) {
if err = handler.DeleteStore(db2, storeMap.StoreID, userName); err == nil {
resultList = append(resultList, 1)
} else {
failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "删除门店")
}
} else {
if err = handler.UpdateStore(db2, storeMap.StoreID, userName); err == nil {
resultList = append(resultList, 1)
} else {
failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "更新门店")
}
}
if err == nil {
if model.IsSyncStatusNew(storeMap.SyncStatus) {
storeMap.VendorStoreID = vendorStoreID
storeMap.SyncStatus = 0
_, err = dao.UpdateEntity(db, storeMap, "VendorStoreID", model.FieldSyncStatus)
} else {
storeMap.SyncStatus = 0
_, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus)
}
}
return resultList, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
resultList, err = loopStoreTask.GetResult(0)
} else {
var resultList []interface{}
db2 := db
var vendorStoreID string
storeMap := loopMapInfo.StoreMapList[0]
if model.IsSyncStatusNew(storeMap.SyncStatus) {
if vendorStoreID, err = handler.CreateStore2(db2, storeMap.StoreID, userName); err == nil {
resultList = append(resultList, 1)
} else {
failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "创建门店")
}
} else if model.IsSyncStatusDelete(storeMap.SyncStatus) {
if err = handler.DeleteStore(db2, storeMap.StoreID, userName); err == nil {
resultList = append(resultList, 1)
} else {
failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "删除门店")
}
} else {
if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil {
resultList = append(resultList, 1)
} else {
failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "更新门店")
}
}
if err == nil {
resultList = []interface{}{1}
if model.IsSyncStatusNew(storeMap.SyncStatus) {
storeMap.VendorStoreID = vendorStoreID
storeMap.SyncStatus = 0
_, err = dao.UpdateEntity(db, storeMap, "VendorStoreID", model.FieldSyncStatus)
} else {
storeMap.SyncStatus = 0
_, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus)
}
}
}
err = partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}
if len(failedList) > 0 {
t.AddFailedList(failedList)
}
return resultList, err
}, true)
return hint, makeSyncError(err)
}
func (v *VendorSync) SyncStore(ctx *jxcontext.Context, db *dao.DaoDB, vendorID, storeID int, isAsync bool, userName string) (hint string, err error) {
var vendorIDs []int
if vendorID != -1 {
vendorIDs = []int{
vendorID,
}
}
return v.SyncStore2(ctx, db, vendorIDs, []int{storeID}, false, isAsync)
}
func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuID int, isAsync, isContinueWhenError bool, userName string) (hint string, err error) {
var (
nameIDs []int
skuIDs []int
)
if nameID != -1 {
nameIDs = []int{nameID}
}
if skuID != -1 {
skuIDs = []int{skuID}
}
return v.SyncSkus(ctx, db, nameIDs, skuIDs, isAsync, isContinueWhenError, userName)
}
func (v *VendorSync) SyncSkus(ctx *jxcontext.Context, db *dao.DaoDB, nameIDs []int, skuIDs []int, isAsync, isContinueWhenError bool, userName string) (hint string, err error) {
return SyncSkus(ctx, nil, nil, nil, nameIDs, skuIDs, isAsync)
}
// func (v *VendorSync) oldSyncSkus(ctx *jxcontext.Context, db *dao.DaoDB, nameIDs []int, skuIDs []int, isAsync, isContinueWhenError bool, userName string) (hint string, err error) {
// globals.SugarLogger.Debugf("SyncSku trackInfo:%s, nameIDs:%v, skuIDs:%v, userName:%s", ctx.GetTrackInfo(), nameIDs, skuIDs, userName)
// isManagedIt := !(len(nameIDs) > 0 && len(nameIDs) <= 2 || len(skuIDs) > 0 && len(skuIDs) < 8)
// return v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("同步商品信息, nameIDs:%v, skuIDs:%v", nameIDs, skuIDs), isAsync, isManagedIt,
// func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
// var resultList []interface{}
// vendorInfo := batchItemList[0].(*MultiStoreVendorInfo)
// multiStoresHandler := v.GetMultiStoreHandler(vendorInfo.VendorID)
// syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()])
// dbField := dao.ConvertDBFieldPrefix(model.VendorNames[multiStoresHandler.GetVendorID()])
// skuMap := make(map[int]bool)
// sql := fmt.Sprintf(`
// SELECT DISTINCT t2.*
// FROM sku t1
// JOIN sku_name t2 ON t2.id = t1.name_id
// WHERE t1.%s_sync_status <> 0
// `, dbField)
// sqlParams := []interface{}{}
// if len(nameIDs) > 0 {
// sql += " AND t1.name_id IN (" + dao.GenQuestionMarks(len(nameIDs)) + ")"
// sqlParams = append(sqlParams, nameIDs)
// }
// if len(skuIDs) > 0 {
// sql += " AND t1.id IN(" + dao.GenQuestionMarks(len(skuIDs)) + ")"
// sqlParams = append(sqlParams, skuIDs)
// }
// for _, v := range skuIDs {
// skuMap[v] = true
// }
// sql += " ORDER BY t2.id"
// var skuNameList []*model.SkuName
// err := dao.GetRows(db, &skuNameList, sql, sqlParams...)
// if err == nil && len(skuNameList) > 0 {
// // todo 同一skuName下的sku顺序处理的原因是京东SPU特殊类型必须要序列化同步才能正常处理, db可能会有多线程问题
// task := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[vendorInfo.VendorID]), tasksch.NewParallelConfig().SetParallelCount(10).SetIsContinueWhenError(isContinueWhenError), ctx,
// func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
// var resultList []interface{}
// skuName := batchItemList[0].(*model.SkuName)
// var skuList []*model.Sku
// if err = dao.GetRows(db, &skuList, fmt.Sprintf(`
// SELECT *
// FROM sku
// WHERE name_id = ? AND %s_sync_status <> 0
// ORDER BY IF(spec_unit IN('kg', 'L'), 1000, 1) * spec_quality
// `, dbField), skuName.ID); err == nil && len(skuList) > 0 {
// for _, sku := range skuList {
// syncStatus := refutil.GetObjFieldByName(sku, syncStatusFieldName).(int8)
// globals.SugarLogger.Debugf("SyncSku trackInfo2:%s, skuID:%d, syncStatus:%d", ctx.GetTrackInfo(), sku.ID, syncStatus)
// if (len(skuIDs) == 0 || skuMap[sku.ID]) && (syncStatus != 0) {
// updateFields := []string{syncStatusFieldName}
// if syncStatus&model.SyncFlagDeletedMask != 0 { // 删除
// if syncStatus&model.SyncFlagNewMask == 0 {
// err = multiStoresHandler.DeleteSku(db, sku, userName)
// }
// } else if syncStatus&model.SyncFlagNewMask != 0 { // 新增
// if err = multiStoresHandler.CreateSku(db, sku, userName); err == nil {
// var tmpStruct struct {
// MaxIndex int
// }
// // todo hard code 得到京东spu中sku的顺序以方便以后修改销售属性这个必须要每次重新从数据库取
// if dao.GetRow(db, &tmpStruct, "SELECT MAX(sku_index) max_index FROM sku WHERE name_id = ? AND jd_id > 0 AND jd_id < 4024012631406 ", sku.NameID) == nil {
// sku.SkuIndex = tmpStruct.MaxIndex + 1
// updateFields = append(updateFields, "SkuIndex")
// }
// updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()]))
// }
// } else if syncStatus&model.SyncFlagModifiedMask != 0 { // 修改
// err = multiStoresHandler.UpdateSku(db, sku, userName)
// }
// if err == nil {
// refutil.SetObjFieldByName(sku, syncStatusFieldName, int8(0))
// if _, err = dao.UpdateEntity(db, sku, updateFields...); err != nil {
// break
// } else {
// resultList = append(resultList, 1)
// }
// }
// }
// }
// }
// if err == nil {
// refutil.SetObjFieldByName(skuName, syncStatusFieldName, int8(0))
// _, err = dao.UpdateEntity(db, skuName, syncStatusFieldName)
// }
// return resultList, err
// }, skuNameList)
// t.AddChild(task).Run()
// result, err2 := task.GetResult(0)
// if err = err2; err == nil {
// resultList = append(resultList, result...)
// }
// }
// return resultList, err
// })
// }
func (v *VendorSync) SyncStoresCategory(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) {
globals.SugarLogger.Debug("SyncStoresCategory")
isManageIt := len(storeIDs) != 1
hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("同步门店分类信息:%v", storeIDs), isAsync, isManageIt, vendorIDs, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if handler := v.GetSingleStoreHandler(loopMapInfo.VendorID); handler != nil {
if isForce {
dao.SetStoreCategorySyncStatus(db, loopMapInfo.VendorID, storeIDs, nil, model.SyncFlagModifiedMask)
}
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
tasksch.NewParallelConfig().SetParallelCount(5).SetIsContinueWhenError(isContinueWhenError), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeMap := batchItemList[0].(*model.StoreMap)
_, err = SyncStoreCategories(ctx, task, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, nil, nil, false, isContinueWhenError)
return nil, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
} else {
storeMap := loopMapInfo.StoreMapList[0]
_, err = SyncStoreCategories(ctx, t, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, nil, nil, false, isContinueWhenError)
}
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, makeSyncError(err)
}
//
func (v *VendorSync) SyncStoresSkus2(ctx *jxcontext.Context, parentTask tasksch.ITask, causeFlag int, db *dao.DaoDB, vendorIDs []int, storeIDs []int, syncDisabled bool, skuIDs, excludeSkuIDs []int, setSyncStatus int, isAsync, isContinueWhenError bool) (hint string, err error) {
globals.SugarLogger.Debug("SyncStoresSkus2")
isManageIt := len(storeIDs) != 1 || len(skuIDs) == 0 || len(skuIDs) > 8
task, hint, err := v.LoopStoresMap2(ctx, parentTask, db, fmt.Sprintf("同步门店商品信息:%v", storeIDs), isAsync, isManageIt, vendorIDs, storeIDs, false,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
parallelCount := 5
if model.MultiStoresVendorMap[loopMapInfo.VendorID] == 1 {
parallelCount = 2
}
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
tasksch.NewParallelConfig().SetParallelCount(parallelCount).SetIsContinueWhenError(isContinueWhenError), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeMap := batchItemList[0].(*model.StoreMap)
if syncDisabled || storeMap.Status > model.StoreStatusDisabled {
if setSyncStatus != 0 {
dao.SetStoreSkuSyncStatus(db, storeMap.VendorID, []int{storeMap.StoreID}, skuIDs, setSyncStatus)
}
if _, err = SyncStoreSkuNew(ctx, task, causeFlag, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, nil, skuIDs, excludeSkuIDs, false, isContinueWhenError); err != nil {
globals.SugarLogger.Debugf("SyncStoresSkus2 failed2 store:%d failed with error:%v", storeMap.StoreID, err)
}
}
return nil, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
if task != nil {
err = makeSyncError(err)
}
return hint, err
}
func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, causeFlag int, db *dao.DaoDB, vendorIDs []int, storeIDs []int, skuIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) {
setSyncStatus := 0
if isForce {
setSyncStatus = model.SyncFlagStoreSkuModifiedMask
}
return v.SyncStoresSkus2(ctx, parentTask, causeFlag, db, vendorIDs, storeIDs, true, skuIDs, nil, setSyncStatus, isAsync, isContinueWhenError)
}
func (v *VendorSync) FullSyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, syncDisabled bool, excludeSkuIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
globals.SugarLogger.Debug("FullSyncStoresSkus")
hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("初始化门店商品信息:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
parallelCount := 5
if model.MultiStoresVendorMap[loopMapInfo.VendorID] == 1 {
parallelCount = 1
}
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
tasksch.NewParallelConfig().SetParallelCount(parallelCount).SetIsContinueWhenError(isContinueWhenError), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeMap := batchItemList[0].(*model.StoreMap)
if syncDisabled || storeMap.Status > model.StoreStatusDisabled {
if _, err = FullSyncStoreSkuNew(ctx, task, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, excludeSkuIDs, false, isContinueWhenError); err != nil {
globals.SugarLogger.Debugf("FullSyncStoresSkus failed2 store:%d failed with error:%v", storeMap.StoreID, err)
}
}
return nil, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, makeSyncError(err)
}
func (v *VendorSync) DeleteRemoteStoreSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
globals.SugarLogger.Debug("DeleteRemoteStoreSkus")
hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("删除远程门店商品信息:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewSeqTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), ctx,
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
storeMap := loopMapInfo.StoreMapList[step]
_, err = ClearRemoteStoreStuffAndSetNew(ctx, task, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, false, isContinueWhenError)
return nil, err
}, len(loopMapInfo.StoreMapList))
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
} else {
_, err = ClearRemoteStoreStuffAndSetNew(ctx, t, loopMapInfo.StoreMapList[0].VendorID, loopMapInfo.StoreMapList[0].StoreID, loopMapInfo.StoreMapList[0].VendorStoreID, false, isContinueWhenError)
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, makeSyncError(err)
}
// 将平台有,但本地没有的门店商品清除
// todo京东到家也应该支持
func (v *VendorSync) PruneMissingStoreSkus(ctx *jxcontext.Context, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
globals.SugarLogger.Debug("PruneMissingStoreSkus")
hint, err = v.LoopStoresMap(ctx, dao.GetDB(), fmt.Sprintf("删除远程无关联的门店商品信息:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(5), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeMap := batchItemList[0].(*model.StoreMap)
_, err = PruneMissingStoreSkus(ctx, task, loopMapInfo.VendorID, storeMap.StoreID, storeMap.VendorStoreID, false, isContinueWhenError)
return nil, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
} else {
_, err = PruneMissingStoreSkus(ctx, t, loopMapInfo.VendorID, loopMapInfo.StoreMapList[0].StoreID, loopMapInfo.StoreMapList[0].VendorStoreID, false, isContinueWhenError)
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, makeSyncError(err)
}
// 把京西有,平台无且没有待创建标记的商品加上待创建标记
// todo京东到家也应该支持
func (v *VendorSync) AddCreateFlagForJxStoreSku(ctx *jxcontext.Context, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
globals.SugarLogger.Debug("AddCreateFlagForJxStoreSku")
hint, err = v.LoopStoresMap(ctx, dao.GetDB(), fmt.Sprintf("处理京西有,平台无且没有待创建标记的商品加上待创建标记:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(5), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeMap := batchItemList[0].(*model.StoreMap)
_, err = AddCreateFlagForJxStoreSku(ctx, task, loopMapInfo.VendorID, storeMap.StoreID, storeMap.VendorStoreID, false, isContinueWhenError)
return nil, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
} else {
_, err = AddCreateFlagForJxStoreSku(ctx, t, loopMapInfo.VendorID, loopMapInfo.StoreMapList[0].StoreID, loopMapInfo.StoreMapList[0].VendorStoreID, false, isContinueWhenError)
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, makeSyncError(err)
}
func (v *VendorSync) AmendAndPruneStoreStuff(ctx *jxcontext.Context, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool, optType int, isForceUpdate bool) (hint string, err error) {
globals.SugarLogger.Debug("AmendAndPruneStoreStuff")
hint, err = v.LoopStoresMap(ctx, dao.GetDB(), fmt.Sprintf("处理京西平台商家分类与商品差异:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(5), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeMap := batchItemList[0].(*model.StoreMap)
_, err = amendAndPruneStoreStuff(ctx, task, loopMapInfo.VendorID, storeMap.StoreID, storeMap.VendorStoreID, false, isContinueWhenError, optType, isForceUpdate)
return nil, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
} else {
_, err = amendAndPruneStoreStuff(ctx, t, loopMapInfo.VendorID, loopMapInfo.StoreMapList[0].StoreID, loopMapInfo.StoreMapList[0].VendorStoreID, false, isContinueWhenError, optType, isForceUpdate)
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, makeSyncError(err)
}
func (v *VendorSync) LoopStoresMap2(ctx *jxcontext.Context, parentTask tasksch.ITask, db *dao.DaoDB, taskName string, isAsync, isManageIt bool, vendorIDs []int, storeIDs []int, mustDirty bool, handler tasksch.WorkFunc, isContinueWhenError bool) (task tasksch.ITask, hint string, err error) {
var storeMapList []*model.StoreMap
if storeMapList, err = dao.GetStoresMapList2(db, vendorIDs, storeIDs, nil, model.StoreStatusAll, model.StoreIsSyncYes, "", "", mustDirty); err != nil {
return nil, "", err
}
if len(storeMapList) == 0 {
return nil, "", nil
}
vendorStoreMap := make(map[int][]*model.StoreMap)
for _, v := range storeMapList {
vendorStoreMap[v.VendorID] = append(vendorStoreMap[v.VendorID], v)
}
loopInfoList := make([]*LoopStoreMapInfo, len(vendorStoreMap))
index := 0
for k, v := range vendorStoreMap {
loopInfoList[index] = &LoopStoreMapInfo{
VendorID: k,
StoreMapList: v,
}
index++
}
if len(loopInfoList) == 1 {
taskName = fmt.Sprintf("%s,处理平台%s", taskName, model.VendorChineseNames[loopInfoList[0].VendorID])
}
// 临时把京东的并发改为2
task = tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, loopInfoList)
if isAsync {
buildSetFinishHook(task, ctx)
}
tasksch.HandleTask(task, parentTask, isManageIt).Run()
if !isAsync {
resultList, err2 := task.GetResult(0)
if len(task.GetFailedList()) > 0 {
err2 = buildErrMsg(task)
err = err2
}
if err = err2; err == nil {
if len(resultList) == 0 {
hint = "1" // todo 暂时这样
} else {
hint = jxutils.TaskResult2Hint(resultList)
}
}
} else {
hint = task.GetID()
}
return task, hint, err
}
func buildSetFinishHook(task tasksch.ITask, ctx *jxcontext.Context) {
task.SetFinishHook(func(task tasksch.ITask) {
var noticeMsg = "您此次的同步任务错误详情返回如下: \n"
if ctx.GetUserName() != "jxadmin" {
if len(task.GetFailedList()) > 10 {
downloadURL, _, _ := WirteToExcelBySyncFailed(task)
noticeMsg += fmt.Sprintf("[详情点我]%s/billshow/?normal=true&path=%s \n", globals.BackstageHost, downloadURL)
} else if len(task.GetFailedList()) > 0 && len(task.GetFailedList()) <= 10 {
if task.GetErr() != nil {
noticeMsg += utils.Format4Output(buildErrMsgJson(task), true)
}
} else {
noticeMsg = "您的同步任务执行完成,没有错误返回。"
}
if authInfo, err := ctx.GetV2AuthInfo(); err == nil {
ddmsg.SendUserMessage(dingdingapi.MsgTyeText, authInfo.UserID, "同步错误返回", noticeMsg)
} else {
globals.SugarLogger.Debugf("同步错误发送钉钉消息失败, authinfo [%v] , [%v]", *authInfo, err)
}
}
// else {
// if len(task.GetFailedList()) > 1 {
// if time.Now().Hour() >= 20 || time.Now().Hour() < 7 {
// downloadURL, _, _ := WirteToExcelBySyncFailed(task)
// user, err := dao.GetUserByID(dao.GetDB(), "mobile", "18160030913")
// noticeMsg += fmt.Sprintf("[详情点我]%s/billshow/?normal=true&path=%s \n", globals.BackstageHost, downloadURL)
// if user != nil && err == nil {
// ddmsg.SendUserMessage(dingdingapi.MsgTyeText, user.UserID, "同步错误返回", noticeMsg)
// }
// }
// }
// }
})
}
func buildErrMsg(task tasksch.ITask) (err error) {
err = fmt.Errorf(utils.Format4Output(buildErrMsgJson(task), true))
return makeSpecSyncError(err)
}
func buildErrMsgJson(task tasksch.ITask) (resultL []*SyncErrResult) {
failedList := task.GetFailedList()
for _, v := range failedList {
for _, vv := range v.([]*partner.StoreSkuInfoWithErr) {
result := &SyncErrResult{
SkuID: 0,
StoreID: vv.StoreID,
CategoryName: vv.CategoryName,
VendorName: vv.VendoreName,
VendorSkuID: "",
NameID: 0,
VendorPrice: 0,
SyncType: vv.SyncType,
ErrMsg: vv.ErrMsg,
}
if vv.StoreSkuInfo != nil {
result.SkuID = vv.StoreSkuInfo.SkuID
result.VendorSkuID = vv.StoreSkuInfo.VendorSkuID
result.NameID = vv.StoreSkuInfo.NameID
result.VendorPrice = vv.StoreSkuInfo.VendorPrice
}
resultL = append(resultL, result)
}
}
return resultL
}
func (v *VendorSync) LoopStoresMap(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync, isManageIt bool, vendorIDs []int, storeIDs []int, handler tasksch.WorkFunc, isContinueWhenError bool) (hint string, err error) {
_, hint, err = v.LoopStoresMap2(ctx, nil, db, taskName, isAsync, isManageIt, vendorIDs, storeIDs, false, handler, isContinueWhenError)
return hint, err
}
func (v *VendorSync) LoopMultiStoresVendors(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, isManageIt bool, handler tasksch.WorkFunc) (hint string, err error) {
task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, getMultiStoreVendorInfoList())
tasksch.HandleTask(task, nil, isManageIt).Run()
if !isAsync {
result, err2 := task.GetResult(0)
if err = err2; err == nil {
hint = utils.Int2Str(len(result))
}
} else {
hint = task.ID
}
return hint, makeSyncError(err)
}
// func (v *VendorSync) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int, storeIDs []int) (hint string, err error) {
// task := tasksch.NewParallelTask("RefreshAllSkusID", nil, ctx,
// func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
// vendorID := batchItemList[0].(int)
// if handler := v.GetStoreHandler(vendorID); handler != nil {
// if multiHandler, ok := handler.(partner.IMultipleStoresHandler); ok {
// _, err = multiHandler.RefreshAllSkusID(ctx, task, false)
// } else if singleHandler, ok := handler.(partner.ISingleStoreHandler); ok {
// _, err = singleHandler.RefreshStoresAllSkusID(ctx, task, false, storeIDs)
// }
// }
// return nil, err
// }, vendorIDs)
// tasksch.HandleTask(task, nil, true).Run()
// if !isAsync {
// _, err = task.GetResult(0)
// }
// return task.ID, err
// }
func (v *VendorSync) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int) (hint string, err error) {
task := tasksch.NewParallelTask("RefreshAllStoresID", nil, ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
vendorID := batchItemList[0].(int)
if handler := v.GetStoreHandler(vendorID); handler != nil {
_, err = handler.RefreshAllStoresID(ctx, task, false)
}
return nil, err
}, vendorIDs)
tasksch.HandleTask(task, nil, true).Run()
if !isAsync {
_, err = task.GetResult(0)
}
return task.ID, err
}
func makeSyncError(err error) (newErr error) {
if err != nil {
if _, ok := err.(*SyncError); !ok {
return &SyncError{
Original: err,
}
}
}
return err
}
func makeSpecSyncError(err error) (newErr error) {
if err != nil {
if _, ok := err.(*SpecSyncError); !ok {
return &SpecSyncError{
SpecErr: err,
}
}
}
return err
}
func (e *SpecSyncError) Error() string {
return e.SpecErr.Error()
}
func (e *SyncError) Error() string {
return fmt.Sprintf("本地数据修改成功,但同步失败,请根据错误提示处理!,同步错误信息:%s", e.Original.Error())
}
func isSyncError(err error) bool {
_, ok := err.(*SyncError)
return ok
}
func (v *VendorSync) SyncSkuNames(ctx *jxcontext.Context, nameIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) {
return SyncSkus(ctx, nil, nil, nil, nameIDs, nil, isAsync)
}
func (v *VendorSync) oldSyncSkuNames(ctx *jxcontext.Context, nameIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) {
db := dao.GetDB()
if isForce {
dao.SetSkuNameSyncStatus(db, nil, nil, nameIDs, model.SyncFlagModifiedMask)
}
return v.SyncSkus(ctx, db, nameIDs, nil, isAsync, isContinueWhenError, ctx.GetUserName())
}
func (v *VendorSync) ChangeStoreSkuSaleStatus(ctx *jxcontext.Context, storeID int, isAsync, isContinueWhenError bool) (err error) {
var (
storeIDs []int
skuIDs []int
)
db := dao.GetDB()
storeSkuList, err := dao.GetStoresSkusInfoBySaleTime(db, storeID)
if len(storeSkuList) < 1 || err != nil {
return nil
}
for _, v := range storeSkuList {
storeIDs = append(storeIDs, v.StoreID)
skuIDs = append(skuIDs, v.SkuID)
}
vendorIDs := partner.GetPurchasePlatformVendorIDs()
dao.UpdateStoreSkuBindSyncStatusForSaleStatus(db, vendorIDs, storeID)
v.SyncStoresSkus(ctx, nil, model.SyncFlagSaleMask, db, vendorIDs, storeIDs, skuIDs, false, isAsync, isContinueWhenError)
return err
}
func GetTimeMixByInt(begin1, end1, begin2, end2 int16) (beginAt, endAt int16) {
if (begin1 > begin2 && begin1 > end2) || (begin2 > end1 && end2 > end1) {
return 0, 0
}
if begin1 > begin2 {
beginAt = begin1
if end1 > end2 {
endAt = end2
} else {
endAt = end1
}
} else {
beginAt = begin2
if end1 > end2 {
endAt = end2
} else {
endAt = end1
}
}
return beginAt, endAt
}
func WirteToExcelBySyncFailed(task tasksch.ITask) (downloadURL, fileName string, err error) {
var (
sheetList1 []*excel.Obj2ExcelSheetConfig
)
syncErrResultLock.syncErrResult = syncErrResultLock.syncErrResult[0:0]
list := buildErrMsgJson(task)
for _, v := range list {
syncErrResultLock.AppendData(*v)
}
excelConf1 := &excel.Obj2ExcelSheetConfig{
Title: "同步错误",
Data: syncErrResultLock.syncErrResult,
CaptionList: SyncErrResultTitle,
}
sheetList1 = append(sheetList1, excelConf1)
if excelConf1 != nil {
downloadURL, fileName, err = jxutils.UploadExeclAndPushMsg(sheetList1, time.Now().Format("200601021504")+"同步错误返回")
baseapi.SugarLogger.Debug("WriteToExcel: download is [%v]", downloadURL)
} else {
baseapi.SugarLogger.Debug("WriteToExcel: dataSuccess is nil!")
}
if err != nil {
baseapi.SugarLogger.Errorf("WriteToExcel:upload %s , failed error:%v", fileName, err)
}
return downloadURL, fileName, err
}
func (d *SyncErrResultLock) AppendData(syncErrResult SyncErrResult) {
d.locker.Lock()
defer d.locker.Unlock()
d.syncErrResult = append(d.syncErrResult, syncErrResult)
}
func (v *VendorSync) SyncStoreSkusFromYb(ctx *jxcontext.Context, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
var (
vendorID = model.VendorIDYB
)
hint, err = v.LoopStoresMap(ctx, dao.GetDB(), fmt.Sprintf("同步银豹到京西:%v", storeIDs), isAsync, true, []int{vendorID}, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
for _, v := range loopMapInfo.StoreMapList {
hint, err = syncStoreSkusFromYb(ctx, v.StoreID, vendorID, v.VendorStoreID, isAsync, isContinueWhenError)
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, err
}
func syncStoreSkusFromYb(ctx *jxcontext.Context, storeID, vendorID int, vendorStoreID string, isAsync, isContinueWhenError bool) (hint string, err error) {
var (
db = dao.GetDB()
localSkuMap = make(map[string]*dao.StoreSkuSyncInfo)
vendorSkuMap = make(map[string]*partner.SkuNameInfo)
// skuBindInfosDel []*StoreSkuBindInfo
// skuBindInfosUpt []*StoreSkuBindInfo
addList []*partner.SkuNameInfo
updateList []*partner.SkuNameInfo
deleteList []*dao.StoreSkuSyncInfo
)
handler, _ := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler)
localSkuList, err := dao.GetStoreSkus2(db, vendorID, storeID, nil, false)
if err != nil {
return "", err
}
for _, v := range localSkuList {
localSkuMap[v.VendorSkuID] = v
}
remoteSkuList, err := handler.GetStoreSkusFullInfo(ctx, nil, storeID, vendorStoreID, nil)
if err != nil {
return "", err
}
for _, v := range remoteSkuList {
if localSkuMap[v.SkuList[0].VendorSkuID] == nil {
if len(v.YbBarCode) > 7 {
addList = append(addList, v)
}
} else {
updateList = append(updateList, v)
}
vendorSkuMap[v.SkuList[0].VendorSkuID] = v
}
for _, v := range localSkuList {
if vendorSkuMap[v.VendorSkuID] == nil {
deleteList = append(deleteList, v)
}
}
fmt.Println("remoteSkuList", len(remoteSkuList))
fmt.Println("addList", len(addList))
fmt.Println("updateList", utils.Format4Output(updateList, false))
fmt.Println("deleteList", utils.Format4Output(deleteList, false))
// taskSeqFunc := func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
// store, _ := dao.GetStoreDetail(db, storeID, vendorID)
// switch step {
// case 0:
// if len(addList) > 0 {
// taskFunc := func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
// var (
// v = batchItemList[0].(*partner.SkuNameInfo)
// upc = v.YbBarCode
// )
// err = AddSkuNameByUpc(ctx, upc, store, v)
// if err != nil {
// task.AddFailedList(putils.GetErrMsg2FailedSingleList(nil, err, storeID, model.VendorChineseNames[vendorID], "根据upc创建京西商品"))
// }
// return retVal, err
// }
// taskParallel := tasksch.NewParallelTask("创建商品", tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, taskFunc, addList)
// tasksch.HandleTask(taskParallel, task, true).Run()
// _, err = taskParallel.GetResult(0)
// }
// case 1:
// if len(deleteList) > 0 {
// taskFunc := func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
// var (
// v = batchItemList[0].(*dao.StoreSkuSyncInfo)
// )
// skuBindInfo := &StoreSkuBindInfo{
// NameID: v.NameID,
// IsFocus: -1,
// }
// retVal = []*StoreSkuBindInfo{skuBindInfo}
// return retVal, err
// }
// taskParallel := tasksch.NewParallelTask("删除商品", tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, taskFunc, deleteList)
// tasksch.HandleTask(taskParallel, task, true).Run()
// resultDel, _ := taskParallel.GetResult(0)
// for _, v := range resultDel {
// skuBindInfosDel = append(skuBindInfosDel, v.(*StoreSkuBindInfo))
// }
// _, err = updateStoresSkusWithoutSync(ctx, db, []int{storeID}, skuBindInfosDel, false, false)
// }
// case 2:
// if len(updateList) > 0 {
// taskFunc := func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
// var (
// v = batchItemList[0].(*partner.SkuNameInfo)
// skuBindInfo = &StoreSkuBindInfo{}
// storeSkus []*dao.StoreSkuExt
// pricePercentagePack []*model.PricePercentageItem
// )
// sql := `
// SELECT a.*, c.id name_id
// FROM store_sku_bind a
// JOIN sku b ON a.sku_id = b.id
// JOIN sku_name c ON c.id = b.name_id
// WHERE a.store_id = ? AND a.yb_id = ? AND a.deleted_at = ?
// `
// sqlParams := []interface{}{storeID, v.SkuList[0].VendorSkuID, utils.DefaultTimeValue}
// err = dao.GetRows(db, &storeSkus, sql, sqlParams)
// if len(storeSkus) > 0 {
// if storeSkus[0].YbPrice != int(v.SkuList[0].VendorPrice) {
// err = jxutils.Strings2Objs(store.PricePercentagePackStr, &pricePercentagePack)
// skuBindInfo.UnitPrice = jxutils.CaculateJxPriceByPricePack(pricePercentagePack, 0, int(v.SkuList[0].VendorPrice))
// }
// } else {
// return retVal, fmt.Errorf("未查询到门店商品yb_id [%v]", v.SkuList[0].VendorSkuID)
// }
// if v.SkuList[0].Stock < 1 {
// skuBindInfo.IsSale = model.StoreSkuBindStatusDontSale
// } else {
// skuBindInfo.IsSale = model.StoreSkuBindStatusNormal
// }
// skuBindInfo.NameID = storeSkus[0].NameID
// retVal = []*StoreSkuBindInfo{skuBindInfo}
// return retVal, err
// }
// taskParallel := tasksch.NewParallelTask("更新商品价格和库存", tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, taskFunc, updateList)
// tasksch.HandleTask(taskParallel, task, true).Run()
// resultUpt, _ := taskParallel.GetResult(0)
// for _, v := range resultUpt {
// skuBindInfosUpt = append(skuBindInfosUpt, v.(*StoreSkuBindInfo))
// }
// _, err = updateStoresSkusWithoutSync(ctx, db, []int{storeID}, skuBindInfosUpt, false, false)
// }
// case 3:
// _, err = CurVendorSync.SyncStoresSkus2(jxcontext.AdminCtx, nil, 0, db, []int{0, 1, 3}, nil, false, nil, nil, 0, true, true)
// }
// return result, err
// }
// taskSeq := tasksch.NewSeqTask2("同步银豹商品到京西", ctx, true, taskSeqFunc, 3)
// tasksch.HandleTask(taskSeq, nil, true).Run()
// hint = taskSeq.GetID()
return hint, err
}
func (v *VendorSync) SyncJdsStoresSkus(ctx *jxcontext.Context, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
var (
db = dao.GetDB()
)
storeSkus, _ := dao.GetStoresSkusInfo(db, []int{model.JdShopMainStoreID}, nil)
_, hint, err = v.LoopStoresMap2(ctx, nil, db, fmt.Sprintf("同步京东商城库存商品信息:%v", storeIDs), isAsync, true, []int{model.VendorIDJDShop}, storeIDs, false,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
for _, storeMap := range loopMapInfo.StoreMapList {
if storeMap.Status > model.StoreStatusDisabled && storeMap.StoreID != model.JdShopMainStoreID && storeMap.SyncRule != 0 {
err = syncJdsStoresSkus(ctx, db, t, storeMap, isAsync, isContinueWhenError)
}
err = syncJdsStoreStock(ctx, db, t, storeSkus, storeMap, isAsync, isContinueWhenError)
}
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, err
}
func syncJdsStoreStock(ctx *jxcontext.Context, db *dao.DaoDB, parentTask tasksch.ITask, storeSkus []*model.StoreSkuBind, storeMap *model.StoreMap, isAsync, isContinueWhenError bool) (err error) {
// for _, storeSku := range storeSkus {
// stock := 0
// if storeSku.Status == model.StoreSkuBindStatusNormal {
// stock = 9999
// }
// storeSku2, _ := dao.GetStoresSkusInfo(db, []int{storeMap.StoreID}, []int{storeSku.SkuID})
// if storeSku.JdsID != 0 {
// if len(storeSku2) > 0 {
// if storeSku.Status != storeSku2[0].Status && storeMap.VendorStoreID != "" {
// err = api.JdShopAPI.UpdateSkuSiteStock(storeSku.JdsID, stock, utils.Str2Int(storeMap.VendorStoreID))
// }
// } else {
// err = api.JdShopAPI.UpdateSkuSiteStock(storeSku.JdsID, 0, utils.Str2Int(storeMap.VendorStoreID))
// }
// }
// }
task := tasksch.NewParallelTask("syncJdsStoreStock", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeSku := batchItemList[0].(*model.StoreSkuBind)
stock := 0
if storeSku.Status == model.StoreSkuBindStatusNormal {
stock = 9999
}
storeSku2, _ := dao.GetStoresSkusInfo(db, []int{storeMap.StoreID}, []int{storeSku.SkuID})
if storeSku.JdsID != 0 {
if len(storeSku2) > 0 {
if storeSku.Status != storeSku2[0].Status && storeMap.VendorStoreID != "" {
err = api.JdShopAPI.UpdateSkuSiteStock(storeSku.JdsID, stock, utils.Str2Int(storeMap.VendorStoreID))
}
} else {
err = api.JdShopAPI.UpdateSkuSiteStock(storeSku.JdsID, 0, utils.Str2Int(storeMap.VendorStoreID))
}
}
return retVal, err
}, storeSkus)
tasksch.HandleTask(task, parentTask, true).Run()
if !isAsync {
_, err = task.GetResult(0)
}
return err
}
func syncJdsStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, parentTask tasksch.ITask, storeMap *model.StoreMap, isAsync, isContinueWhenError bool) (err error) {
globals.SugarLogger.Debugf("syncJdsStoresSkus")
var (
mainSkusMap = make(map[int][]*dao.StoreSkuSyncInfo)
skusMap = make(map[int][]*dao.StoreSkuSyncInfo)
updateList []*dao.StoreSkuSyncInfo
addList []*dao.StoreSkuSyncInfo
skuBindInfos1 []*StoreSkuBindInfo
skuBindInfos2 []*StoreSkuBindInfo
)
storeSkusMain, err := dao.GetStoreSkusByNameIDs(db, []int{model.JdShopMainStoreID}, 0)
for _, v := range storeSkusMain {
mainSkusMap[v.NameID] = append(mainSkusMap[v.NameID], v)
}
storeSkus, err := dao.GetStoreSkusByNameIDs(db, []int{storeMap.StoreID}, 0)
for _, v := range storeSkus {
skusMap[v.NameID] = append(skusMap[v.NameID], v)
}
for k, v := range skusMap {
if mainSkusMap[k] != nil {
flag := false
for _, storeSku := range v {
if storeSku.StoreSkuStatus == model.StoreSkuBindStatusNormal {
flag = true
}
}
if !flag {
continue
}
for _, storeSku := range v {
for _, storeSkuMain := range mainSkusMap[k] {
if storeSkuMain.StoreSkuStatus == model.StoreSkuBindStatusNormal && storeSku.StoreSkuStatus == model.StoreSkuBindStatusDontSale &&
storeSkuMain.SkuID == storeSku.SkuID {
updateList = append(updateList, storeSkuMain)
}
}
}
}
}
for k, v := range mainSkusMap {
if skusMap[k] == nil {
if storeMap.SyncRule == 2 {
for _, storeSkuMain := range v {
addList = append(addList, storeSkuMain)
}
}
}
}
// fmt.Println("updateList", utils.Format4Output(updateList, false))
// fmt.Println("addList", utils.Format4Output(addList, false))
if len(updateList) > 0 {
for _, v := range updateList {
skuBindInfos1 = append(skuBindInfos1, buildStoreSkuBindInfo(db, storeMap.StoreID, v, false))
}
UpdateStoresSkusByBind(ctx, parentTask, skuBindInfos1, isAsync, isContinueWhenError, false)
}
if len(addList) > 0 {
for _, v := range addList {
skuBindInfos2 = append(skuBindInfos2, buildStoreSkuBindInfo(db, storeMap.StoreID, v, true))
}
UpdateStoresSkusByBind(ctx, parentTask, skuBindInfos2, isAsync, isContinueWhenError, false)
}
return err
}
func buildStoreSkuBindInfo(db *dao.DaoDB, storeID int, storeBind *dao.StoreSkuSyncInfo, isFocus bool) (skuBindInfo *StoreSkuBindInfo) {
skus := []*StoreSkuBindSkuInfo{
&StoreSkuBindSkuInfo{
SkuID: storeBind.SkuID,
},
}
skuBindInfo = &StoreSkuBindInfo{
StoreID: storeID,
NameID: storeBind.NameID,
}
if isFocus {
skuBindInfo.IsFocus = 1
}
if storeBind.StoreSkuStatus == model.SkuStatusNormal {
skus[0].IsSale = 1
} else {
skus[0].IsSale = -1
}
skuBindInfo.Skus = skus
return skuBindInfo
}
func SyncSkuExperfixAndWatermark(ctx *jxcontext.Context) (err error) {
var (
db = dao.GetDB()
)
skuExinfos, err := dao.GetSkuExinfos(db, nil, []int{model.VendorIDMTWM, model.VendorIDEBAI, model.VendorIDJD}, "", utils.ZeroTimeValue, utils.ZeroTimeValue)
task := tasksch.NewParallelTask("SyncSkuExperfixAndWatermark", tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
skuExinfo := batchItemList[0].(*model.SkuExinfoMap)
skus, err := dao.GetSkus(db, nil, []int{skuExinfo.NameID}, nil, nil, nil)
var skuIDs []int
for _, v := range skus {
skuIDs = append(skuIDs, v.ID)
}
if partner.IsMultiStore(skuExinfo.VendorID) {
for _, v := range skuIDs {
OnUpdateThing(ctx, db, nil, int64(v), model.ThingTypeSku)
}
// CurVendorSync.SyncSkus(ctx, db, nil, skuIDs, true, true, ctx.GetUserName())
} else {
if len(skuIDs) > 0 {
SetStoreSkuSyncStatus2(db, nil, []int{skuExinfo.VendorID}, skuIDs, model.SyncFlagModifiedMask)
}
// CurVendorSync.SyncStoresSkus2(ctx, nil, 0, db, []int{skuExinfo.VendorID}, nil, false, skuIDs, nil, model.SyncFlagModifiedMask, true, true)
}
return retVal, err
}, skuExinfos)
tasksch.HandleTask(task, nil, true).Run()
_, err = task.GetResult(0)
return err
}
func SetMTPSStatus(ctx *jxcontext.Context) {
globals.SugarLogger.Debug("StoreOpenAll skuID is start")
/*获取美团门店信息*/
StoreInfoList, _ := api.MtpsAPI.GetStoreStatusAll()
StoreInfoList2 := make(map[string]string)
for _, store := range StoreInfoList {
for _, data := range store.DataList {
StoreInfoList2[data.OuterPoiID] = data.PoiName
}
}
db := dao.GetDB()
/*比较营业状态*/
/*把获取的京西状态和名称存一下*/
StoreCourierList, _ := dao.GetStoreCourierList(db, []int{}, model.StoreStatusAll, model.StoreStatusAll)
/*循环美团*/
for _, StoreInfoList1 := range StoreInfoList {
for _, StoreInfoList11 := range StoreInfoList1.DataList {
/*循环京西*/
for _, StoreCourierList1 := range StoreCourierList {
/*只比较美团*/
if StoreCourierList1.VendorID != model.VendorIDMTPS {
continue
}
/*如果门店ID相同的时候进入判断,一个门店只用判断一次就行*/
if StoreCourierList1.VendorStoreID == StoreInfoList11.OuterPoiID {
if StoreCourierList1.Status != StoreInfoList11.OpenType {
sl := make(map[string]interface{})
sl["vendorStoreID"] = StoreInfoList11.OuterPoiID
sl["status"] = StoreInfoList11.OpenType
sl["vendorStatus"] = StoreInfoList11.OpenType
globals.SugarLogger.Debugf("被修改配送状态的VendorStoreID是:%s,名称是:%s,美团状态是:%s,本地状态是:%s",
StoreInfoList11.OuterPoiID, StoreInfoList11.PoiName, strconv.Itoa(StoreInfoList11.OpenType), strconv.Itoa(StoreCourierList1.Status))
UpdateStoreCourierMap(ctx, nil, StoreCourierList1.StoreID, StoreCourierList1.VendorID, sl, ctx.GetUserName())
break
}
}
}
}
}
/* 美团配送的门店是否存在调用美团配送的api有可能接了查询京西门店对应的美团配送门店是否存在若不存在则要在京西这边解绑美团配送门店
怎么解绑可以在网页上门店管理那点一下看看调的什么接口,传的什么参数*/
/*获取所有门店信息*/
//test:
for _, StoreCourierList1 := range StoreCourierList {
diff := false
StoreLists, _ := dao.GetStoreList(db, []int{StoreCourierList1.StoreID}, nil, nil, nil, "")
if StoreLists == nil {
globals.SugarLogger.Debugf("StoreID为:%s,在store表未找到", StoreCourierList1.StoreID)
continue
}
if StoreCourierList1.VendorID != model.VendorIDMTPS || StoreCourierList1.VendorStoreID == "" {
continue
}
if StoreCourierList1.Status == model.StoreStatusDisabled || StoreCourierList1.Status == model.StoreStatusClosed {
continue
}
/*京西不为空,容错*/
//if {
/*调用API获取美团的商店信息*/
MTPSInfo := new(mtpsapi.ShopInfo)
MTPSInfo, _ = api.MtpsAPI.ShopQuery(StoreCourierList1.VendorStoreID)
if MTPSInfo == nil {
globals.SugarLogger.Debug("美团未找到该门店," + StoreLists[0].Name + strconv.Itoa(StoreCourierList1.StoreID) + " 被解绑关联的ID为" + StoreCourierList1.VendorStoreID)
diff = true
}
if MTPSInfo != nil && MTPSInfo.ShopName == "" {
MTPSInfo.ShopName = StoreInfoList2[MTPSInfo.ShopID]
}
if MTPSInfo != nil && MTPSInfo.ShopLng != StoreCourierList1.Lng && MTPSInfo.ShopLat == StoreCourierList1.Lat {
/*平台上但是坐标不同,解绑*/
globals.SugarLogger.Debug("商店与美团配送上的坐标不同," + StoreLists[0].Name + strconv.Itoa(StoreCourierList1.StoreID) + " 被解绑关联的ID为" + StoreCourierList1.VendorStoreID)
if _, err := DeleteStoreCourierMap(ctx, db, StoreCourierList1.StoreID, StoreCourierList1.VendorID, ctx.GetUserName()); err != nil {
globals.SugarLogger.Debug(err.Error())
return
}
diff = true
}
if diff {
if _, err := DeleteStoreCourierMap(ctx, db, StoreCourierList1.StoreID, StoreCourierList1.VendorID, ctx.GetUserName()); err != nil {
globals.SugarLogger.Debug(err.Error())
return
}
//break test
}
}
globals.SugarLogger.Debug("StoreOpenAll skuID is Complete")
}