Files
jx-callback/business/jxstore/cms/sync.go
2020-07-01 15:33:46 +08:00

1353 lines
58 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package cms
import (
"errors"
"fmt"
"strconv"
"sync"
"time"
"git.rosy.net.cn/jx-callback/business/partner/putils"
"git.rosy.net.cn/jx-callback/globals/api"
"git.rosy.net.cn/baseapi"
"git.rosy.net.cn/baseapi/platformapi/dingdingapi"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/business/jxutils/ddmsg"
"git.rosy.net.cn/jx-callback/business/jxutils/excel"
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
"git.rosy.net.cn/jx-callback/business/partner"
"git.rosy.net.cn/jx-callback/globals"
)
type SyncErrResult struct {
SkuID int `json:"商品ID"`
CategoryName string `json:"分类名"`
StoreID int `json:"门店ID"`
VendorName string `json:"平台名"`
VendorSkuID string `json:"平台商品ID"`
NameID int `json:"商品nameID"`
VendorPrice int64 `json:"平台价"`
SyncType string `json:"同步类型"`
ErrMsg string `json:"错误信息"`
}
type SyncErrResultLock struct {
syncErrResult []SyncErrResult
locker sync.RWMutex
}
type LoopStoreMapInfo struct {
VendorID int
StoreMapList []*model.StoreMap
}
type VendorSync struct {
}
type SyncError struct {
Original error `json:"original"`
Message string `json:"message"`
}
type SpecSyncError struct {
SpecErr error `json:"specErr"`
}
// 对于多门店平台接口的通用处理
type MultiStoreHandlerWrapper struct {
partner.IMultipleStoresHandler
}
// 对于单门店平台接口的通用处理
type SingleStoreHandlerWrapper struct {
partner.ISingleStoreHandler
}
var (
CurVendorSync VendorSync
)
var (
ErrHaveNotImplementedYet = errors.New("还没有实现")
ErrEntityNotExist = errors.New("找不到相应实体")
SyncErrResultTitle = []string{
"商品ID",
"分类名",
"门店ID",
"平台名",
"平台商品ID",
"商品nameID",
"平台价",
"同步类型",
"错误信息",
}
syncErrResultLock SyncErrResultLock
)
// func (p *MultiStoreHandlerWrapper) DeleteCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) {
// if jxutils.IsEmptyID(cat.JdID) {
// return nil
// }
// return p.IMultipleStoresHandler.DeleteCategory(db, cat, userName)
// }
// func (p *MultiStoreHandlerWrapper) UpdateCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) {
// if jxutils.IsEmptyID(cat.JdID) {
// globals.SugarLogger.Warnf("UpdateCategory fakeid cat:%s should not get here", utils.Format4Output(cat, true))
// return nil
// }
// return p.IMultipleStoresHandler.UpdateCategory(db, cat, userName)
// }
// func (p *MultiStoreHandlerWrapper) DeleteSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) {
// globals.SugarLogger.Debugf("wrapper DeleteSku, sku:%s", utils.Format4Output(sku, false))
// if jxutils.IsEmptyID(sku.JdID) {
// return nil
// }
// return p.IMultipleStoresHandler.DeleteSku(db, sku, userName)
// }
// func (p *MultiStoreHandlerWrapper) UpdateSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) {
// if jxutils.IsEmptyID(sku.JdID) {
// globals.SugarLogger.Warnf("UpdateSku fakeid sku:%s should not get here", utils.Format4Output(sku, true))
// return nil
// }
// return p.IMultipleStoresHandler.UpdateSku(db, sku, userName)
// }
func (v *VendorSync) GetStoreHandler(vendorID int) partner.IPurchasePlatformHandler {
return partner.GetPurchasePlatformFromVendorID(vendorID)
}
func (v *VendorSync) GetMultiStoreHandler(vendorID int) partner.IMultipleStoresHandler {
if handler, ok := v.GetStoreHandler(vendorID).(partner.IMultipleStoresHandler); ok {
return handler
}
return nil
}
func (v *VendorSync) GetSingleStoreHandler(vendorID int) partner.ISingleStoreHandler {
if handler, ok := v.GetStoreHandler(vendorID).(partner.ISingleStoreHandler); ok {
return handler
}
return nil
}
// func (v *VendorSync) syncCategories(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID int, db *dao.DaoDB, cats []*model.SkuCategory, userName string) (err error) {
// multiStoresHandler := v.GetMultiStoreHandler(vendorID)
// syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[vendorID])
// task := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[vendorID]), nil, ctx,
// func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
// cat := batchItemList[0].(*model.SkuCategory)
// updateFields := []string{syncStatusFieldName}
// syncStatus := refutil.GetObjFieldByName(cat, syncStatusFieldName).(int8)
// if (syncStatus & model.SyncFlagDeletedMask) != 0 { //删除
// if (syncStatus & model.SyncFlagNewMask) == 0 {
// err = multiStoresHandler.DeleteCategory(db, cat, userName)
// }
// } else if (syncStatus & model.SyncFlagNewMask) != 0 { // 新增
// err = multiStoresHandler.CreateCategory(db, cat, userName)
// updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()]))
// } else if (syncStatus & model.SyncFlagModifiedMask) != 0 { // 修改
// err = multiStoresHandler.UpdateCategory(db, cat, userName)
// if intErr, ok := err.(*utils.ErrorWithCode); ok && intErr.IntCode() == -3 {
// err = nil
// }
// }
// if err == nil {
// refutil.SetObjFieldByName(cat, syncStatusFieldName, int8(0))
// _, err = dao.UpdateEntity(db, cat, updateFields...)
// }
// return nil, err
// }, cats)
// tasksch.HandleTask(task, parentTask, false).Run()
// _, err = task.GetResult(0)
// return err
// }
func (v *VendorSync) SyncCategory(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
return SyncCategories(ctx, nil, nil, nil, []int{categoryID}, isAsync)
}
// func (v *VendorSync) oldSyncCategory(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
// globals.SugarLogger.Debug("SyncCategory")
// hint, err = v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("同步分类信息:%d", categoryID), isAsync, false,
// func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
// vendorInfo := batchItemList[0].(*MultiStoreVendorInfo)
// var cats []*model.SkuCategory
// cond := make(map[string]interface{})
// if categoryID > 0 {
// cond[model.FieldID] = categoryID
// } else {
// cond[model.FieldLevel] = 1
// }
// err := dao.GetEntitiesByKV(db, &cats, cond, true)
// if err == nil {
// err = v.syncCategories(ctx, t, vendorInfo.VendorID, db, cats, userName)
// }
// if err != nil || categoryID > 0 {
// return nil, err
// }
// cond[model.FieldLevel] = 2
// err = dao.GetEntitiesByKV(db, &cats, cond, true)
// if err == nil {
// err = v.syncCategories(ctx, t, vendorInfo.VendorID, db, cats, userName)
// }
// return nil, err
// })
// return "", err
// }
func (v *VendorSync) SyncReorderCategories(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
return SyncReorderCategories(ctx, categoryID, isAsync)
}
// func (v *VendorSync) oldSyncReorderCategories(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
// hint, err = v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("分类重排序:%d", categoryID), isAsync, false, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
// vendorInfo := batchItemList[0].(*MultiStoreVendorInfo)
// multiStoresHandler := v.GetMultiStoreHandler(vendorInfo.VendorID)
// err2 := multiStoresHandler.ReorderCategories(db, categoryID, userName)
// if err2 == nil {
// cat := &model.SkuCategory{}
// _, err2 = dao.UpdateEntityByKV(db, cat, utils.Params2Map(dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()]), 0), utils.Params2Map(model.FieldParentID, categoryID))
// return nil, err2
// }
// return nil, err2
// })
// return "", err
// }
func (v *VendorSync) SyncStore2(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs, storeIDs []int, mustDirty, isAsync bool) (hint string, err error) {
globals.SugarLogger.Debugf("SyncStore2, storeIDs:%d", storeIDs)
userName := ctx.GetUserName()
isManageIt := len(storeIDs) == 0 || len(storeIDs) > 5
_, hint, err = v.LoopStoresMap2(ctx, nil, db, fmt.Sprintf("同步门店信息:%v", storeIDs), isAsync, isManageIt, vendorIDs, storeIDs, mustDirty, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (resultList interface{}, err error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
var failedList []*partner.StoreSkuInfoWithErr
handler := v.GetStoreHandler(loopMapInfo.VendorID)
if handler != nil {
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
var resultList []interface{}
var vendorStoreID string
storeMap := batchItemList[0].(*model.StoreMap)
db2 := db
if len(loopMapInfo.StoreMapList) > 1 {
db2 = dao.GetDB()
}
if model.IsSyncStatusNew(storeMap.SyncStatus) {
if vendorStoreID, err = handler.CreateStore2(db2, storeMap.StoreID, userName); err == nil {
resultList = append(resultList, 1)
} else {
failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "创建门店")
}
} else if model.IsSyncStatusDelete(storeMap.SyncStatus) {
if err = handler.DeleteStore(db2, storeMap.StoreID, userName); err == nil {
resultList = append(resultList, 1)
} else {
failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "删除门店")
}
} else {
if err = handler.UpdateStore(db2, storeMap.StoreID, userName); err == nil {
resultList = append(resultList, 1)
} else {
failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "更新门店")
}
}
if err == nil {
if model.IsSyncStatusNew(storeMap.SyncStatus) {
storeMap.VendorStoreID = vendorStoreID
storeMap.SyncStatus = 0
_, err = dao.UpdateEntity(db, storeMap, "VendorStoreID", model.FieldSyncStatus)
} else {
storeMap.SyncStatus = 0
_, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus)
}
}
return resultList, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
resultList, err = loopStoreTask.GetResult(0)
} else {
var resultList []interface{}
db2 := db
var vendorStoreID string
storeMap := loopMapInfo.StoreMapList[0]
if model.IsSyncStatusNew(storeMap.SyncStatus) {
if vendorStoreID, err = handler.CreateStore2(db2, storeMap.StoreID, userName); err == nil {
resultList = append(resultList, 1)
} else {
failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "创建门店")
}
} else if model.IsSyncStatusDelete(storeMap.SyncStatus) {
if err = handler.DeleteStore(db2, storeMap.StoreID, userName); err == nil {
resultList = append(resultList, 1)
} else {
failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "删除门店")
}
} else {
if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil {
resultList = append(resultList, 1)
} else {
failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "更新门店")
}
}
if err == nil {
resultList = []interface{}{1}
if model.IsSyncStatusNew(storeMap.SyncStatus) {
storeMap.VendorStoreID = vendorStoreID
storeMap.SyncStatus = 0
_, err = dao.UpdateEntity(db, storeMap, "VendorStoreID", model.FieldSyncStatus)
} else {
storeMap.SyncStatus = 0
_, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus)
}
}
}
err = partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}
if len(failedList) > 0 {
t.AddFailedList(failedList)
}
return resultList, err
}, true)
return hint, makeSyncError(err)
}
func (v *VendorSync) SyncStore(ctx *jxcontext.Context, db *dao.DaoDB, vendorID, storeID int, isAsync bool, userName string) (hint string, err error) {
var vendorIDs []int
if vendorID != -1 {
vendorIDs = []int{
vendorID,
}
}
return v.SyncStore2(ctx, db, vendorIDs, []int{storeID}, false, isAsync)
}
func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuID int, isAsync, isContinueWhenError bool, userName string) (hint string, err error) {
var (
nameIDs []int
skuIDs []int
)
if nameID != -1 {
nameIDs = []int{nameID}
}
if skuID != -1 {
skuIDs = []int{skuID}
}
return v.SyncSkus(ctx, db, nameIDs, skuIDs, isAsync, isContinueWhenError, userName)
}
func (v *VendorSync) SyncSkus(ctx *jxcontext.Context, db *dao.DaoDB, nameIDs []int, skuIDs []int, isAsync, isContinueWhenError bool, userName string) (hint string, err error) {
return SyncSkus(ctx, nil, nil, nil, nameIDs, skuIDs, isAsync)
}
// func (v *VendorSync) oldSyncSkus(ctx *jxcontext.Context, db *dao.DaoDB, nameIDs []int, skuIDs []int, isAsync, isContinueWhenError bool, userName string) (hint string, err error) {
// globals.SugarLogger.Debugf("SyncSku trackInfo:%s, nameIDs:%v, skuIDs:%v, userName:%s", ctx.GetTrackInfo(), nameIDs, skuIDs, userName)
// isManagedIt := !(len(nameIDs) > 0 && len(nameIDs) <= 2 || len(skuIDs) > 0 && len(skuIDs) < 8)
// return v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("同步商品信息, nameIDs:%v, skuIDs:%v", nameIDs, skuIDs), isAsync, isManagedIt,
// func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
// var resultList []interface{}
// vendorInfo := batchItemList[0].(*MultiStoreVendorInfo)
// multiStoresHandler := v.GetMultiStoreHandler(vendorInfo.VendorID)
// syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()])
// dbField := dao.ConvertDBFieldPrefix(model.VendorNames[multiStoresHandler.GetVendorID()])
// skuMap := make(map[int]bool)
// sql := fmt.Sprintf(`
// SELECT DISTINCT t2.*
// FROM sku t1
// JOIN sku_name t2 ON t2.id = t1.name_id
// WHERE t1.%s_sync_status <> 0
// `, dbField)
// sqlParams := []interface{}{}
// if len(nameIDs) > 0 {
// sql += " AND t1.name_id IN (" + dao.GenQuestionMarks(len(nameIDs)) + ")"
// sqlParams = append(sqlParams, nameIDs)
// }
// if len(skuIDs) > 0 {
// sql += " AND t1.id IN(" + dao.GenQuestionMarks(len(skuIDs)) + ")"
// sqlParams = append(sqlParams, skuIDs)
// }
// for _, v := range skuIDs {
// skuMap[v] = true
// }
// sql += " ORDER BY t2.id"
// var skuNameList []*model.SkuName
// err := dao.GetRows(db, &skuNameList, sql, sqlParams...)
// if err == nil && len(skuNameList) > 0 {
// // todo 同一skuName下的sku顺序处理的原因是京东SPU特殊类型必须要序列化同步才能正常处理, db可能会有多线程问题
// task := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[vendorInfo.VendorID]), tasksch.NewParallelConfig().SetParallelCount(10).SetIsContinueWhenError(isContinueWhenError), ctx,
// func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
// var resultList []interface{}
// skuName := batchItemList[0].(*model.SkuName)
// var skuList []*model.Sku
// if err = dao.GetRows(db, &skuList, fmt.Sprintf(`
// SELECT *
// FROM sku
// WHERE name_id = ? AND %s_sync_status <> 0
// ORDER BY IF(spec_unit IN('kg', 'L'), 1000, 1) * spec_quality
// `, dbField), skuName.ID); err == nil && len(skuList) > 0 {
// for _, sku := range skuList {
// syncStatus := refutil.GetObjFieldByName(sku, syncStatusFieldName).(int8)
// globals.SugarLogger.Debugf("SyncSku trackInfo2:%s, skuID:%d, syncStatus:%d", ctx.GetTrackInfo(), sku.ID, syncStatus)
// if (len(skuIDs) == 0 || skuMap[sku.ID]) && (syncStatus != 0) {
// updateFields := []string{syncStatusFieldName}
// if syncStatus&model.SyncFlagDeletedMask != 0 { // 删除
// if syncStatus&model.SyncFlagNewMask == 0 {
// err = multiStoresHandler.DeleteSku(db, sku, userName)
// }
// } else if syncStatus&model.SyncFlagNewMask != 0 { // 新增
// if err = multiStoresHandler.CreateSku(db, sku, userName); err == nil {
// var tmpStruct struct {
// MaxIndex int
// }
// // todo hard code 得到京东spu中sku的顺序以方便以后修改销售属性这个必须要每次重新从数据库取
// if dao.GetRow(db, &tmpStruct, "SELECT MAX(sku_index) max_index FROM sku WHERE name_id = ? AND jd_id > 0 AND jd_id < 4024012631406 ", sku.NameID) == nil {
// sku.SkuIndex = tmpStruct.MaxIndex + 1
// updateFields = append(updateFields, "SkuIndex")
// }
// updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()]))
// }
// } else if syncStatus&model.SyncFlagModifiedMask != 0 { // 修改
// err = multiStoresHandler.UpdateSku(db, sku, userName)
// }
// if err == nil {
// refutil.SetObjFieldByName(sku, syncStatusFieldName, int8(0))
// if _, err = dao.UpdateEntity(db, sku, updateFields...); err != nil {
// break
// } else {
// resultList = append(resultList, 1)
// }
// }
// }
// }
// }
// if err == nil {
// refutil.SetObjFieldByName(skuName, syncStatusFieldName, int8(0))
// _, err = dao.UpdateEntity(db, skuName, syncStatusFieldName)
// }
// return resultList, err
// }, skuNameList)
// t.AddChild(task).Run()
// result, err2 := task.GetResult(0)
// if err = err2; err == nil {
// resultList = append(resultList, result...)
// }
// }
// return resultList, err
// })
// }
func (v *VendorSync) SyncStoresCategory(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) {
globals.SugarLogger.Debug("SyncStoresCategory")
isManageIt := len(storeIDs) != 1
hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("同步门店分类信息:%v", storeIDs), isAsync, isManageIt, vendorIDs, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if handler := v.GetSingleStoreHandler(loopMapInfo.VendorID); handler != nil {
if isForce {
dao.SetStoreCategorySyncStatus(db, loopMapInfo.VendorID, storeIDs, nil, model.SyncFlagModifiedMask)
}
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
tasksch.NewParallelConfig().SetParallelCount(5).SetIsContinueWhenError(isContinueWhenError), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeMap := batchItemList[0].(*model.StoreMap)
_, err = SyncStoreCategories(ctx, task, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, nil, nil, false, isContinueWhenError)
return nil, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
} else {
storeMap := loopMapInfo.StoreMapList[0]
_, err = SyncStoreCategories(ctx, t, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, nil, nil, false, isContinueWhenError)
}
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, makeSyncError(err)
}
//
func (v *VendorSync) SyncStoresSkus2(ctx *jxcontext.Context, parentTask tasksch.ITask, causeFlag int, db *dao.DaoDB, vendorIDs []int, storeIDs []int, syncDisabled bool, skuIDs, excludeSkuIDs []int, setSyncStatus int, isAsync, isContinueWhenError bool) (hint string, err error) {
globals.SugarLogger.Debug("SyncStoresSkus2")
isManageIt := len(storeIDs) != 1 || len(skuIDs) == 0 || len(skuIDs) > 8
task, hint, err := v.LoopStoresMap2(ctx, parentTask, db, fmt.Sprintf("同步门店商品信息:%v", storeIDs), isAsync, isManageIt, vendorIDs, storeIDs, false,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
parallelCount := 5
if model.MultiStoresVendorMap[loopMapInfo.VendorID] == 1 {
parallelCount = 2
}
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
tasksch.NewParallelConfig().SetParallelCount(parallelCount).SetIsContinueWhenError(isContinueWhenError), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeMap := batchItemList[0].(*model.StoreMap)
if syncDisabled || storeMap.Status > model.StoreStatusDisabled {
if setSyncStatus != 0 {
dao.SetStoreSkuSyncStatus(db, storeMap.VendorID, []int{storeMap.StoreID}, skuIDs, setSyncStatus)
}
if _, err = SyncStoreSkuNew(ctx, task, causeFlag, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, nil, skuIDs, excludeSkuIDs, false, isContinueWhenError); err != nil {
globals.SugarLogger.Debugf("SyncStoresSkus2 failed2 store:%d failed with error:%v", storeMap.StoreID, err)
}
}
return nil, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
if task != nil {
err = makeSyncError(err)
}
return hint, err
}
func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, causeFlag int, db *dao.DaoDB, vendorIDs []int, storeIDs []int, skuIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) {
setSyncStatus := 0
if isForce {
setSyncStatus = model.SyncFlagStoreSkuModifiedMask
}
return v.SyncStoresSkus2(ctx, parentTask, causeFlag, db, vendorIDs, storeIDs, true, skuIDs, nil, setSyncStatus, isAsync, isContinueWhenError)
}
func (v *VendorSync) FullSyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, syncDisabled bool, excludeSkuIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
globals.SugarLogger.Debug("FullSyncStoresSkus")
hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("初始化门店商品信息:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
parallelCount := 5
if model.MultiStoresVendorMap[loopMapInfo.VendorID] == 1 {
parallelCount = 1
}
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
tasksch.NewParallelConfig().SetParallelCount(parallelCount).SetIsContinueWhenError(isContinueWhenError), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeMap := batchItemList[0].(*model.StoreMap)
if syncDisabled || storeMap.Status > model.StoreStatusDisabled {
if _, err = FullSyncStoreSkuNew(ctx, task, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, excludeSkuIDs, false, isContinueWhenError); err != nil {
globals.SugarLogger.Debugf("FullSyncStoresSkus failed2 store:%d failed with error:%v", storeMap.StoreID, err)
}
}
return nil, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, makeSyncError(err)
}
func (v *VendorSync) DeleteRemoteStoreSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
globals.SugarLogger.Debug("DeleteRemoteStoreSkus")
hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("删除远程门店商品信息:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewSeqTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), ctx,
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
storeMap := loopMapInfo.StoreMapList[step]
_, err = ClearRemoteStoreStuffAndSetNew(ctx, task, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, false, isContinueWhenError)
return nil, err
}, len(loopMapInfo.StoreMapList))
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
} else {
_, err = ClearRemoteStoreStuffAndSetNew(ctx, t, loopMapInfo.StoreMapList[0].VendorID, loopMapInfo.StoreMapList[0].StoreID, loopMapInfo.StoreMapList[0].VendorStoreID, false, isContinueWhenError)
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, makeSyncError(err)
}
// 将平台有,但本地没有的门店商品清除
// todo京东到家也应该支持
func (v *VendorSync) PruneMissingStoreSkus(ctx *jxcontext.Context, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
globals.SugarLogger.Debug("PruneMissingStoreSkus")
hint, err = v.LoopStoresMap(ctx, dao.GetDB(), fmt.Sprintf("删除远程无关联的门店商品信息:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(5), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeMap := batchItemList[0].(*model.StoreMap)
_, err = PruneMissingStoreSkus(ctx, task, loopMapInfo.VendorID, storeMap.StoreID, storeMap.VendorStoreID, false, isContinueWhenError)
return nil, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
} else {
_, err = PruneMissingStoreSkus(ctx, t, loopMapInfo.VendorID, loopMapInfo.StoreMapList[0].StoreID, loopMapInfo.StoreMapList[0].VendorStoreID, false, isContinueWhenError)
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, makeSyncError(err)
}
// 把京西有,平台无且没有待创建标记的商品加上待创建标记
// todo京东到家也应该支持
func (v *VendorSync) AddCreateFlagForJxStoreSku(ctx *jxcontext.Context, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
globals.SugarLogger.Debug("AddCreateFlagForJxStoreSku")
hint, err = v.LoopStoresMap(ctx, dao.GetDB(), fmt.Sprintf("处理京西有,平台无且没有待创建标记的商品加上待创建标记:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(5), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeMap := batchItemList[0].(*model.StoreMap)
_, err = AddCreateFlagForJxStoreSku(ctx, task, loopMapInfo.VendorID, storeMap.StoreID, storeMap.VendorStoreID, false, isContinueWhenError)
return nil, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
} else {
_, err = AddCreateFlagForJxStoreSku(ctx, t, loopMapInfo.VendorID, loopMapInfo.StoreMapList[0].StoreID, loopMapInfo.StoreMapList[0].VendorStoreID, false, isContinueWhenError)
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, makeSyncError(err)
}
func (v *VendorSync) AmendAndPruneStoreStuff(ctx *jxcontext.Context, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool, optType int, isForceUpdate bool) (hint string, err error) {
globals.SugarLogger.Debug("AmendAndPruneStoreStuff")
hint, err = v.LoopStoresMap(ctx, dao.GetDB(), fmt.Sprintf("处理京西平台商家分类与商品差异:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(5), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeMap := batchItemList[0].(*model.StoreMap)
_, err = amendAndPruneStoreStuff(ctx, task, loopMapInfo.VendorID, storeMap.StoreID, storeMap.VendorStoreID, false, isContinueWhenError, optType, isForceUpdate)
return nil, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
} else {
_, err = amendAndPruneStoreStuff(ctx, t, loopMapInfo.VendorID, loopMapInfo.StoreMapList[0].StoreID, loopMapInfo.StoreMapList[0].VendorStoreID, false, isContinueWhenError, optType, isForceUpdate)
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, makeSyncError(err)
}
func (v *VendorSync) LoopStoresMap2(ctx *jxcontext.Context, parentTask tasksch.ITask, db *dao.DaoDB, taskName string, isAsync, isManageIt bool, vendorIDs []int, storeIDs []int, mustDirty bool, handler tasksch.WorkFunc, isContinueWhenError bool) (task tasksch.ITask, hint string, err error) {
var storeMapList []*model.StoreMap
if storeMapList, err = dao.GetStoresMapList2(db, vendorIDs, storeIDs, nil, model.StoreStatusAll, model.StoreIsSyncYes, "", "", mustDirty); err != nil {
return nil, "", err
}
if len(storeMapList) == 0 {
return nil, "", nil
}
vendorStoreMap := make(map[int][]*model.StoreMap)
for _, v := range storeMapList {
vendorStoreMap[v.VendorID] = append(vendorStoreMap[v.VendorID], v)
}
loopInfoList := make([]*LoopStoreMapInfo, len(vendorStoreMap))
index := 0
for k, v := range vendorStoreMap {
loopInfoList[index] = &LoopStoreMapInfo{
VendorID: k,
StoreMapList: v,
}
index++
}
if len(loopInfoList) == 1 {
taskName = fmt.Sprintf("%s,处理平台%s", taskName, model.VendorChineseNames[loopInfoList[0].VendorID])
}
// 临时把京东的并发改为2
task = tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, loopInfoList)
if isAsync {
buildSetFinishHook(task, ctx)
}
tasksch.HandleTask(task, parentTask, isManageIt).Run()
if !isAsync {
resultList, err2 := task.GetResult(0)
if len(task.GetFailedList()) > 0 {
err2 = buildErrMsg(task)
err = err2
}
if err = err2; err == nil {
if len(resultList) == 0 {
hint = "1" // todo 暂时这样
} else {
hint = jxutils.TaskResult2Hint(resultList)
}
}
} else {
hint = task.GetID()
}
return task, hint, err
}
func buildSetFinishHook(task tasksch.ITask, ctx *jxcontext.Context) {
task.SetFinishHook(func(task tasksch.ITask) {
var noticeMsg = "您此次的同步任务错误详情返回如下: \n"
if ctx.GetUserName() != "jxadmin" {
if len(task.GetFailedList()) > 10 {
downloadURL, _, _ := WirteToExcelBySyncFailed(task)
noticeMsg += fmt.Sprintf("[详情点我]%s/billshow/?normal=true&path=%s \n", globals.BackstageHost, downloadURL)
} else if len(task.GetFailedList()) > 0 && len(task.GetFailedList()) <= 10 {
if task.GetErr() != nil {
noticeMsg += utils.Format4Output(buildErrMsgJson(task), true)
}
} else {
noticeMsg = "您的同步任务执行完成,没有错误返回。"
}
if authInfo, err := ctx.GetV2AuthInfo(); err == nil {
ddmsg.SendUserMessage(dingdingapi.MsgTyeText, authInfo.UserID, "同步错误返回", noticeMsg)
} else {
globals.SugarLogger.Debugf("同步错误发送钉钉消息失败, authinfo [%v] , [%v]", *authInfo, err)
}
} else {
if len(task.GetFailedList()) > 1 {
if time.Now().Hour() >= 20 || time.Now().Hour() < 7 {
downloadURL, _, _ := WirteToExcelBySyncFailed(task)
user, err := dao.GetUserByID(dao.GetDB(), "mobile", "18160030913")
noticeMsg += fmt.Sprintf("[详情点我]%s/billshow/?normal=true&path=%s \n", globals.BackstageHost, downloadURL)
if user != nil && err == nil {
ddmsg.SendUserMessage(dingdingapi.MsgTyeText, user.UserID, "同步错误返回", noticeMsg)
}
}
}
}
})
}
func buildErrMsg(task tasksch.ITask) (err error) {
err = fmt.Errorf(utils.Format4Output(buildErrMsgJson(task), true))
return makeSpecSyncError(err)
}
func buildErrMsgJson(task tasksch.ITask) (resultL []*SyncErrResult) {
failedList := task.GetFailedList()
for _, v := range failedList {
for _, vv := range v.([]*partner.StoreSkuInfoWithErr) {
result := &SyncErrResult{
SkuID: 0,
StoreID: vv.StoreID,
CategoryName: vv.CategoryName,
VendorName: vv.VendoreName,
VendorSkuID: "",
NameID: 0,
VendorPrice: 0,
SyncType: vv.SyncType,
ErrMsg: vv.ErrMsg,
}
if vv.StoreSkuInfo != nil {
result.SkuID = vv.StoreSkuInfo.SkuID
result.VendorSkuID = vv.StoreSkuInfo.VendorSkuID
result.NameID = vv.StoreSkuInfo.NameID
result.VendorPrice = vv.StoreSkuInfo.VendorPrice
}
resultL = append(resultL, result)
}
}
return resultL
}
func (v *VendorSync) LoopStoresMap(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync, isManageIt bool, vendorIDs []int, storeIDs []int, handler tasksch.WorkFunc, isContinueWhenError bool) (hint string, err error) {
_, hint, err = v.LoopStoresMap2(ctx, nil, db, taskName, isAsync, isManageIt, vendorIDs, storeIDs, false, handler, isContinueWhenError)
return hint, err
}
func (v *VendorSync) LoopMultiStoresVendors(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, isManageIt bool, handler tasksch.WorkFunc) (hint string, err error) {
task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, getMultiStoreVendorInfoList())
tasksch.HandleTask(task, nil, isManageIt).Run()
if !isAsync {
result, err2 := task.GetResult(0)
if err = err2; err == nil {
hint = utils.Int2Str(len(result))
}
} else {
hint = task.ID
}
return hint, makeSyncError(err)
}
// func (v *VendorSync) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int, storeIDs []int) (hint string, err error) {
// task := tasksch.NewParallelTask("RefreshAllSkusID", nil, ctx,
// func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
// vendorID := batchItemList[0].(int)
// if handler := v.GetStoreHandler(vendorID); handler != nil {
// if multiHandler, ok := handler.(partner.IMultipleStoresHandler); ok {
// _, err = multiHandler.RefreshAllSkusID(ctx, task, false)
// } else if singleHandler, ok := handler.(partner.ISingleStoreHandler); ok {
// _, err = singleHandler.RefreshStoresAllSkusID(ctx, task, false, storeIDs)
// }
// }
// return nil, err
// }, vendorIDs)
// tasksch.HandleTask(task, nil, true).Run()
// if !isAsync {
// _, err = task.GetResult(0)
// }
// return task.ID, err
// }
func (v *VendorSync) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int) (hint string, err error) {
task := tasksch.NewParallelTask("RefreshAllStoresID", nil, ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
vendorID := batchItemList[0].(int)
if handler := v.GetStoreHandler(vendorID); handler != nil {
_, err = handler.RefreshAllStoresID(ctx, task, false)
}
return nil, err
}, vendorIDs)
tasksch.HandleTask(task, nil, true).Run()
if !isAsync {
_, err = task.GetResult(0)
}
return task.ID, err
}
func makeSyncError(err error) (newErr error) {
if err != nil {
if _, ok := err.(*SyncError); !ok {
return &SyncError{
Original: err,
}
}
}
return err
}
func makeSpecSyncError(err error) (newErr error) {
if err != nil {
if _, ok := err.(*SpecSyncError); !ok {
return &SpecSyncError{
SpecErr: err,
}
}
}
return err
}
func (e *SpecSyncError) Error() string {
return e.SpecErr.Error()
}
func (e *SyncError) Error() string {
return fmt.Sprintf("本地数据修改成功,但同步失败,请根据错误提示处理!,同步错误信息:%s", e.Original.Error())
}
func isSyncError(err error) bool {
_, ok := err.(*SyncError)
return ok
}
func (v *VendorSync) SyncSkuNames(ctx *jxcontext.Context, nameIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) {
return SyncSkus(ctx, nil, nil, nil, nameIDs, nil, isAsync)
}
func (v *VendorSync) oldSyncSkuNames(ctx *jxcontext.Context, nameIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) {
db := dao.GetDB()
if isForce {
dao.SetSkuNameSyncStatus(db, nil, nil, nameIDs, model.SyncFlagModifiedMask)
}
return v.SyncSkus(ctx, db, nameIDs, nil, isAsync, isContinueWhenError, ctx.GetUserName())
}
func (v *VendorSync) ChangeStoreSkuSaleStatus(ctx *jxcontext.Context, storeID int, isAsync, isContinueWhenError bool) (err error) {
var (
storeIDs []int
skuIDs []int
)
db := dao.GetDB()
storeSkuList, err := dao.GetStoresSkusInfoBySaleTime(db, storeID)
if len(storeSkuList) < 1 || err != nil {
return nil
}
for _, v := range storeSkuList {
storeIDs = append(storeIDs, v.StoreID)
skuIDs = append(skuIDs, v.SkuID)
}
vendorIDs := partner.GetPurchasePlatformVendorIDs()
dao.UpdateStoreSkuBindSyncStatusForSaleStatus(db, vendorIDs, storeID)
v.SyncStoresSkus(ctx, nil, model.SyncFlagSaleMask, db, vendorIDs, storeIDs, skuIDs, false, isAsync, isContinueWhenError)
return err
}
func GetTimeMixByInt(begin1, end1, begin2, end2 int16) (beginAt, endAt int16) {
if (begin1 > begin2 && begin1 > end2) || (begin2 > end1 && end2 > end1) {
return 0, 0
}
if begin1 > begin2 {
beginAt = begin1
if end1 > end2 {
endAt = end2
} else {
endAt = end1
}
} else {
beginAt = begin2
if end1 > end2 {
endAt = end2
} else {
endAt = end1
}
}
return beginAt, endAt
}
func WirteToExcelBySyncFailed(task tasksch.ITask) (downloadURL, fileName string, err error) {
var (
sheetList1 []*excel.Obj2ExcelSheetConfig
)
syncErrResultLock.syncErrResult = syncErrResultLock.syncErrResult[0:0]
list := buildErrMsgJson(task)
for _, v := range list {
syncErrResultLock.AppendData(*v)
}
excelConf1 := &excel.Obj2ExcelSheetConfig{
Title: "同步错误",
Data: syncErrResultLock.syncErrResult,
CaptionList: SyncErrResultTitle,
}
sheetList1 = append(sheetList1, excelConf1)
if excelConf1 != nil {
downloadURL, fileName, err = jxutils.UploadExeclAndPushMsg(sheetList1, time.Now().Format("200601021504")+"同步错误返回")
baseapi.SugarLogger.Debug("WriteToExcel: download is [%v]", downloadURL)
} else {
baseapi.SugarLogger.Debug("WriteToExcel: dataSuccess is nil!")
}
if err != nil {
baseapi.SugarLogger.Errorf("WriteToExcel:upload %s , failed error:%v", fileName, err)
}
return downloadURL, fileName, err
}
func (d *SyncErrResultLock) AppendData(syncErrResult SyncErrResult) {
d.locker.Lock()
defer d.locker.Unlock()
d.syncErrResult = append(d.syncErrResult, syncErrResult)
}
func (v *VendorSync) SyncStoreSkusFromYb(ctx *jxcontext.Context, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
var (
vendorID = model.VendorIDYB
)
hint, err = v.LoopStoresMap(ctx, dao.GetDB(), fmt.Sprintf("同步银豹到京西:%v", storeIDs), isAsync, true, []int{vendorID}, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
for _, v := range loopMapInfo.StoreMapList {
hint, err = syncStoreSkusFromYb(ctx, v.StoreID, vendorID, v.VendorStoreID, isAsync, isContinueWhenError)
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, err
}
func syncStoreSkusFromYb(ctx *jxcontext.Context, storeID, vendorID int, vendorStoreID string, isAsync, isContinueWhenError bool) (hint string, err error) {
var (
db = dao.GetDB()
localSkuMap = make(map[string]*dao.StoreSkuSyncInfo)
vendorSkuMap = make(map[string]*partner.SkuNameInfo)
// skuBindInfosDel []*StoreSkuBindInfo
// skuBindInfosUpt []*StoreSkuBindInfo
addList []*partner.SkuNameInfo
updateList []*partner.SkuNameInfo
deleteList []*dao.StoreSkuSyncInfo
)
handler, _ := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler)
localSkuList, err := dao.GetStoreSkus2(db, vendorID, storeID, nil, false)
if err != nil {
return "", err
}
for _, v := range localSkuList {
localSkuMap[v.VendorSkuID] = v
}
remoteSkuList, err := handler.GetStoreSkusFullInfo(ctx, nil, storeID, vendorStoreID, nil)
if err != nil {
return "", err
}
for _, v := range remoteSkuList {
if localSkuMap[v.SkuList[0].VendorSkuID] == nil {
if len(v.YbBarCode) > 7 {
addList = append(addList, v)
}
} else {
updateList = append(updateList, v)
}
vendorSkuMap[v.SkuList[0].VendorSkuID] = v
}
for _, v := range localSkuList {
if vendorSkuMap[v.VendorSkuID] == nil {
deleteList = append(deleteList, v)
}
}
fmt.Println("remoteSkuList", len(remoteSkuList))
fmt.Println("addList", len(addList))
fmt.Println("updateList", len(updateList))
// taskSeqFunc := func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
// store, _ := dao.GetStoreDetail(db, storeID, vendorID)
// switch step {
// case 0:
// if len(addList) > 0 {
// taskFunc := func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
// var (
// v = batchItemList[0].(*partner.SkuNameInfo)
// upc = v.YbBarCode
// )
// err = AddSkuNameByUpc(ctx, upc, store, v)
// if err != nil {
// task.AddFailedList(putils.GetErrMsg2FailedSingleList(nil, err, storeID, model.VendorChineseNames[vendorID], "根据upc创建京西商品"))
// }
// return retVal, err
// }
// taskParallel := tasksch.NewParallelTask("创建商品", tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, taskFunc, addList)
// tasksch.HandleTask(taskParallel, task, true).Run()
// _, err = taskParallel.GetResult(0)
// }
// case 1:
// if len(deleteList) > 0 {
// taskFunc := func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
// var (
// v = batchItemList[0].(*dao.StoreSkuSyncInfo)
// )
// skuBindInfo := &StoreSkuBindInfo{
// NameID: v.NameID,
// IsFocus: -1,
// }
// retVal = []*StoreSkuBindInfo{skuBindInfo}
// return retVal, err
// }
// taskParallel := tasksch.NewParallelTask("删除商品", tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, taskFunc, deleteList)
// tasksch.HandleTask(taskParallel, task, true).Run()
// resultDel, _ := taskParallel.GetResult(0)
// for _, v := range resultDel {
// skuBindInfosDel = append(skuBindInfosDel, v.(*StoreSkuBindInfo))
// }
// _, err = updateStoresSkusWithoutSync(ctx, db, []int{storeID}, skuBindInfosDel, false, false)
// }
// case 2:
// if len(updateList) > 0 {
// taskFunc := func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
// var (
// v = batchItemList[0].(*partner.SkuNameInfo)
// skuBindInfo = &StoreSkuBindInfo{}
// storeSkus []*dao.StoreSkuExt
// pricePercentagePack []*model.PricePercentageItem
// )
// sql := `
// SELECT a.*, c.id name_id
// FROM store_sku_bind a
// JOIN sku b ON a.sku_id = b.id
// JOIN sku_name c ON c.id = b.name_id
// WHERE a.store_id = ? AND a.yb_id = ? AND a.deleted_at = ?
// `
// sqlParams := []interface{}{storeID, v.SkuList[0].VendorSkuID, utils.DefaultTimeValue}
// err = dao.GetRows(db, &storeSkus, sql, sqlParams)
// if len(storeSkus) > 0 {
// if storeSkus[0].YbPrice != int(v.SkuList[0].VendorPrice) {
// err = jxutils.Strings2Objs(store.PricePercentagePackStr, &pricePercentagePack)
// skuBindInfo.UnitPrice = jxutils.CaculateJxPriceByPricePack(pricePercentagePack, 0, int(v.SkuList[0].VendorPrice))
// }
// } else {
// return retVal, fmt.Errorf("未查询到门店商品yb_id [%v]", v.SkuList[0].VendorSkuID)
// }
// if v.SkuList[0].Stock < 1 {
// skuBindInfo.IsSale = model.StoreSkuBindStatusDontSale
// } else {
// skuBindInfo.IsSale = model.StoreSkuBindStatusNormal
// }
// skuBindInfo.NameID = storeSkus[0].NameID
// retVal = []*StoreSkuBindInfo{skuBindInfo}
// return retVal, err
// }
// taskParallel := tasksch.NewParallelTask("更新商品价格和库存", tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, taskFunc, updateList)
// tasksch.HandleTask(taskParallel, task, true).Run()
// resultUpt, _ := taskParallel.GetResult(0)
// for _, v := range resultUpt {
// skuBindInfosUpt = append(skuBindInfosUpt, v.(*StoreSkuBindInfo))
// }
// _, err = updateStoresSkusWithoutSync(ctx, db, []int{storeID}, skuBindInfosUpt, false, false)
// }
// case 3:
// _, err = CurVendorSync.SyncStoresSkus2(jxcontext.AdminCtx, nil, 0, db, []int{0, 1, 3}, nil, false, nil, nil, 0, true, true)
// }
// return result, err
// }
// taskSeq := tasksch.NewSeqTask2("同步银豹商品到京西", ctx, true, taskSeqFunc, 3)
// tasksch.HandleTask(taskSeq, nil, true).Run()
// hint = taskSeq.GetID()
return hint, err
}
func (v *VendorSync) SyncJdsStoresSkus(ctx *jxcontext.Context, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
var (
db = dao.GetDB()
)
storeSkus, _ := dao.GetStoresSkusInfo(db, []int{model.JdShopMainStoreID}, nil)
_, hint, err = v.LoopStoresMap2(ctx, nil, db, fmt.Sprintf("同步京东商城库存商品信息:%v", storeIDs), isAsync, true, []int{model.VendorIDJDShop}, storeIDs, false,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
for _, storeMap := range loopMapInfo.StoreMapList {
if storeMap.Status > model.StoreStatusDisabled && storeMap.StoreID != model.JdShopMainStoreID && storeMap.SyncRule != 0 {
err = syncJdsStoresSkus(ctx, db, nil, storeMap, isAsync, isContinueWhenError)
}
err = syncJdsStoreStock(ctx, db, storeSkus, storeMap)
}
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, err
}
func syncJdsStoreStock(ctx *jxcontext.Context, db *dao.DaoDB, storeSkus []*model.StoreSkuBind, storeMap *model.StoreMap) (err error) {
for _, storeSku := range storeSkus {
stock := 0
if storeSku.Status == model.StoreSkuBindStatusNormal {
stock = 9999
}
storeSku2, _ := dao.GetStoresSkusInfo(db, []int{storeMap.StoreID}, []int{storeSku.SkuID})
if storeSku.JdsID != 0 && len(storeSku2) > 0 {
if storeSku.Status != storeSku2[0].Status && storeMap.VendorStoreID != "" {
err = api.JdShopAPI.UpdateSkuSiteStock(storeSku.JdsID, stock, utils.Str2Int(storeMap.VendorStoreID))
}
}
}
// task := tasksch.NewParallelTask("syncJdsStoreStock", tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx,
// func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
// storeSku := batchItemList[0].(*model.StoreSkuBind)
// return retVal, err
// }, storeSkus)
// tasksch.HandleTask(task, nil, true).Run()
// _, err = task.GetResult(0)
return err
}
func syncJdsStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, parentTask tasksch.ITask, storeMap *model.StoreMap, isAsync, isContinueWhenError bool) (err error) {
var (
mainSkusMap = make(map[int][]*dao.StoreSkuSyncInfo)
skusMap = make(map[int][]*dao.StoreSkuSyncInfo)
updateList []*dao.StoreSkuSyncInfo
addList []*dao.StoreSkuSyncInfo
skuBindInfos1 []*StoreSkuBindInfo
skuBindInfos2 []*StoreSkuBindInfo
)
storeSkusMain, err := dao.GetStoreSkusByNameIDs(db, []int{model.JdShopMainStoreID}, 0)
for _, v := range storeSkusMain {
mainSkusMap[v.NameID] = append(mainSkusMap[v.NameID], v)
}
storeSkus, err := dao.GetStoreSkusByNameIDs(db, []int{storeMap.StoreID}, 0)
for _, v := range storeSkus {
skusMap[v.NameID] = append(skusMap[v.NameID], v)
}
for k, v := range skusMap {
if mainSkusMap[k] != nil {
flag := false
for _, storeSku := range v {
if storeSku.StoreSkuStatus == model.StoreSkuBindStatusNormal {
flag = true
}
}
if !flag {
continue
}
for _, storeSku := range v {
for _, storeSkuMain := range mainSkusMap[k] {
if storeSkuMain.StoreSkuStatus == model.StoreSkuBindStatusNormal && storeSku.StoreSkuStatus == model.StoreSkuBindStatusDontSale &&
storeSkuMain.SkuID == storeSku.SkuID {
updateList = append(updateList, storeSkuMain)
}
}
}
}
}
for k, v := range mainSkusMap {
if skusMap[k] == nil {
if storeMap.SyncRule == 2 {
for _, storeSkuMain := range v {
addList = append(addList, storeSkuMain)
}
}
}
}
// fmt.Println("updateList", utils.Format4Output(updateList, false))
// fmt.Println("addList", utils.Format4Output(addList, false))
if len(updateList) > 0 {
for _, v := range updateList {
skuBindInfos1 = append(skuBindInfos1, buildStoreSkuBindInfo(db, storeMap.StoreID, v, false))
}
UpdateStoresSkusByBind(ctx, parentTask, skuBindInfos1, isAsync, isContinueWhenError, false)
}
if len(addList) > 0 {
for _, v := range addList {
skuBindInfos2 = append(skuBindInfos2, buildStoreSkuBindInfo(db, storeMap.StoreID, v, true))
}
UpdateStoresSkusByBind(ctx, parentTask, skuBindInfos2, isAsync, isContinueWhenError, false)
}
return err
}
func buildStoreSkuBindInfo(db *dao.DaoDB, storeID int, storeBind *dao.StoreSkuSyncInfo, isFocus bool) (skuBindInfo *StoreSkuBindInfo) {
skus := []*StoreSkuBindSkuInfo{
&StoreSkuBindSkuInfo{
SkuID: storeBind.SkuID,
},
}
skuBindInfo = &StoreSkuBindInfo{
StoreID: storeID,
NameID: storeBind.NameID,
}
if isFocus {
skuBindInfo.IsFocus = 1
}
if storeBind.StoreSkuStatus == model.SkuStatusNormal {
skus[0].IsSale = 1
} else {
skus[0].IsSale = -1
}
skuBindInfo.Skus = skus
return skuBindInfo
}
func SyncSkuExperfixAndWatermark(ctx *jxcontext.Context) (err error) {
var (
db = dao.GetDB()
)
skuExinfos, err := dao.GetSkuExinfos(db, nil, []int{model.VendorIDMTWM, model.VendorIDEBAI, model.VendorIDJD}, "", utils.ZeroTimeValue, utils.ZeroTimeValue)
task := tasksch.NewParallelTask("SyncSkuExperfixAndWatermark", tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
skuExinfo := batchItemList[0].(*model.SkuExinfoMap)
skus, err := dao.GetSkus(db, nil, []int{skuExinfo.NameID}, nil, nil, nil)
var skuIDs []int
for _, v := range skus {
skuIDs = append(skuIDs, v.ID)
}
if partner.IsMultiStore(skuExinfo.VendorID) {
for _, v := range skuIDs {
OnUpdateThing(ctx, db, nil, int64(v), model.ThingTypeSku)
}
// CurVendorSync.SyncSkus(ctx, db, nil, skuIDs, true, true, ctx.GetUserName())
} else {
if len(skuIDs) > 0 {
SetStoreSkuSyncStatus2(db, nil, []int{skuExinfo.VendorID}, skuIDs, model.SyncFlagModifiedMask)
}
// CurVendorSync.SyncStoresSkus2(ctx, nil, 0, db, []int{skuExinfo.VendorID}, nil, false, skuIDs, nil, model.SyncFlagModifiedMask, true, true)
}
return retVal, err
}, skuExinfos)
tasksch.HandleTask(task, nil, true).Run()
_, err = task.GetResult(0)
return err
}
func SetMTPSStatus(ctx *jxcontext.Context) {
/* 美团配送的门店营业状态,如果是休息,则京西这边不发单,如果是营业,京西这边状态就是发单
美团配送的门店营业状态要通过网页扒取(应该是接了的代码里要找一下)*/
/*获取门店信息*/
StoreInfoList, err := api.MtpsAPI.GetStoreStatusAll()
StoreInfoList1 := make(map[string]int)
for _, store := range StoreInfoList {
for _, data := range store.DataList {
StoreInfoList1[data.OuterPoiID] = data.OpenType
}
}
/* fmt.Println(num)
fmt.Println(StoreInfoList)*/
db := dao.GetDB()
/*比较营业状态*/
StoreCourierList, err := dao.GetStoreCourierList(db, []int{}, -9, -9)
for _, StoreCourierList1 := range StoreCourierList {
/*如果京西店在美团上没有,那下面有专门方法处理*/
if _, ok := StoreInfoList1[StoreCourierList1.VendorStoreID]; ok {
/*京西配送状态为配送,美团营业状态是没有营业的*/
if StoreCourierList1.Status == 1 && StoreInfoList1[StoreCourierList1.VendorStoreID] == 0 {
sl := make(map[string]interface{})
sl["status"] = 0
sl["vendorStoreID"] = StoreCourierList1.VendorStoreID
UpdateStoreCourierMap(ctx, nil, StoreCourierList1.StoreID, StoreCourierList1.VendorID, sl, ctx.GetUserName())
/*京西配送状态为不配送,美团营业状态是营业的*/
} else if StoreCourierList1.Status != 1 && StoreInfoList1[StoreCourierList1.VendorStoreID] == 1 {
sl := make(map[string]interface{})
sl["status"] = 1
sl["vendorStoreID"] = StoreCourierList1.VendorStoreID
UpdateStoreCourierMap(ctx, nil, StoreCourierList1.StoreID, StoreCourierList1.VendorID, sl, ctx.GetUserName())
}
}
}
/* 美团配送的门店是否存在调用美团配送的api有可能接了查询京西门店对应的美团配送门店是否存在若不存在则要在京西这边解绑美团配送门店
怎么解绑可以在网页上门店管理那点一下看看调的什么接口,传的什么参数*/
/*获取所有门店信息*/
StoreLists, err := dao.GetStoreList(db, nil, nil, []int{}, nil, "")
if err != nil {
globals.SugarLogger.Debug(err.Error())
}
for _, StoreList := range StoreLists {
StoreCourierList, err := dao.GetStoreCourierList(db, []int{StoreList.ID}, model.StoreStatusAll, model.StoreStatusAll)
if err != nil {
globals.SugarLogger.Debug(err.Error())
}
for _, StoreCourierList1 := range StoreCourierList {
if StoreCourierList1.VendorID != model.VendorIDMTPS {
continue
}
if StoreCourierList1.Status == model.StoreStatusDisabled {
continue
}
/*京西不为空*/
if StoreCourierList1.VendorStoreID != "" {
/*调用API获取美团的商店信息*/
MTPSInfo, _ := api.MtpsAPI.ShopQuery(StoreCourierList1.VendorStoreID)
/*京西有,美团没有*/
if MTPSInfo == nil {
fmt.Println(strconv.Itoa(StoreCourierList1.StoreID) + " 被解绑")
if _, err := DeleteStoreCourierMap(ctx, db, StoreCourierList1.StoreID, StoreCourierList1.VendorID, ctx.GetUserName()); err != nil {
globals.SugarLogger.Debug(err.Error())
return
}
continue
}
/* 果园和菜市的门店ID可能有重复导致在美团配送上建的门店只有一方的门店是生效的可以根据ID在美团配送上查查到是果园的店那菜市的店则要重新绑定美团配送要换一个美团配送的门店ID。不然要重复
反之也要,可根据其他信息,比如门店名(传到平台上的规则都是一样的),坐标等判断是果园的还是菜市的*/
/*美团上坐标不为空,容错 PS用名字的话本身就没办法判断是菜市还是果园*/
if MTPSInfo.ShopLng != 0 && MTPSInfo.ShopLat != 0 {
if MTPSInfo.ShopLng == StoreList.Lng && MTPSInfo.ShopLat == StoreList.Lat {
/*平台上有且坐标相同不做处理*/
continue
} else {
/*平台上但是坐标不同,解绑*/
fmt.Println(strconv.Itoa(StoreCourierList1.StoreID) + " 被解绑")
if _, err := DeleteStoreCourierMap(ctx, db, StoreCourierList1.StoreID, StoreCourierList1.VendorID, ctx.GetUserName()); err != nil {
globals.SugarLogger.Debug(err.Error())
return
}
}
} else { //容错
/*美团平台上面没找到,解绑*/
fmt.Println(strconv.Itoa(StoreCourierList1.StoreID) + " 被解绑")
if _, err := DeleteStoreCourierMap(ctx, db, StoreCourierList1.StoreID, StoreCourierList1.VendorID, ctx.GetUserName()); err != nil {
globals.SugarLogger.Debug(err.Error())
return
}
//} else {
// 美团平台上有,京西没得,调用方法可能需要做一些修改
// context.TODO()
//}
}
}
}
}
}