Files
jx-callback/business/jxstore/cms/sync.go
2020-05-20 18:23:17 +08:00

1086 lines
47 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package cms
import (
"errors"
"fmt"
"sync"
"time"
"git.rosy.net.cn/jx-callback/business/partner/putils"
"git.rosy.net.cn/baseapi"
"git.rosy.net.cn/baseapi/platformapi/dingdingapi"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/business/jxutils/ddmsg"
"git.rosy.net.cn/jx-callback/business/jxutils/excel"
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
"git.rosy.net.cn/jx-callback/business/partner"
"git.rosy.net.cn/jx-callback/globals"
)
type SyncErrResult struct {
SkuID int `json:"商品ID"`
CategoryName string `json:"分类名"`
StoreID int `json:"门店ID"`
VendorName string `json:"平台名"`
VendorSkuID string `json:"平台商品ID"`
NameID int `json:"商品nameID"`
VendorPrice int64 `json:"平台价"`
SyncType string `json:"同步类型"`
ErrMsg string `json:"错误信息"`
}
type SyncErrResultLock struct {
syncErrResult []SyncErrResult
locker sync.RWMutex
}
type LoopStoreMapInfo struct {
VendorID int
StoreMapList []*model.StoreMap
}
type VendorSync struct {
}
type SyncError struct {
Original error `json:"original"`
Message string `json:"message"`
}
type SpecSyncError struct {
SpecErr error `json:"specErr"`
}
// 对于多门店平台接口的通用处理
type MultiStoreHandlerWrapper struct {
partner.IMultipleStoresHandler
}
// 对于单门店平台接口的通用处理
type SingleStoreHandlerWrapper struct {
partner.ISingleStoreHandler
}
var (
CurVendorSync VendorSync
)
var (
ErrHaveNotImplementedYet = errors.New("还没有实现")
ErrEntityNotExist = errors.New("找不到相应实体")
SyncErrResultTitle = []string{
"商品ID",
"分类名",
"门店ID",
"平台名",
"平台商品ID",
"商品nameID",
"平台价",
"同步类型",
"错误信息",
}
syncErrResultLock SyncErrResultLock
)
// func (p *MultiStoreHandlerWrapper) DeleteCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) {
// if jxutils.IsEmptyID(cat.JdID) {
// return nil
// }
// return p.IMultipleStoresHandler.DeleteCategory(db, cat, userName)
// }
// func (p *MultiStoreHandlerWrapper) UpdateCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) {
// if jxutils.IsEmptyID(cat.JdID) {
// globals.SugarLogger.Warnf("UpdateCategory fakeid cat:%s should not get here", utils.Format4Output(cat, true))
// return nil
// }
// return p.IMultipleStoresHandler.UpdateCategory(db, cat, userName)
// }
// func (p *MultiStoreHandlerWrapper) DeleteSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) {
// globals.SugarLogger.Debugf("wrapper DeleteSku, sku:%s", utils.Format4Output(sku, false))
// if jxutils.IsEmptyID(sku.JdID) {
// return nil
// }
// return p.IMultipleStoresHandler.DeleteSku(db, sku, userName)
// }
// func (p *MultiStoreHandlerWrapper) UpdateSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) {
// if jxutils.IsEmptyID(sku.JdID) {
// globals.SugarLogger.Warnf("UpdateSku fakeid sku:%s should not get here", utils.Format4Output(sku, true))
// return nil
// }
// return p.IMultipleStoresHandler.UpdateSku(db, sku, userName)
// }
func (v *VendorSync) GetStoreHandler(vendorID int) partner.IPurchasePlatformHandler {
return partner.GetPurchasePlatformFromVendorID(vendorID)
}
func (v *VendorSync) GetMultiStoreHandler(vendorID int) partner.IMultipleStoresHandler {
if handler, ok := v.GetStoreHandler(vendorID).(partner.IMultipleStoresHandler); ok {
return handler
}
return nil
}
func (v *VendorSync) GetSingleStoreHandler(vendorID int) partner.ISingleStoreHandler {
if handler, ok := v.GetStoreHandler(vendorID).(partner.ISingleStoreHandler); ok {
return handler
}
return nil
}
// func (v *VendorSync) syncCategories(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID int, db *dao.DaoDB, cats []*model.SkuCategory, userName string) (err error) {
// multiStoresHandler := v.GetMultiStoreHandler(vendorID)
// syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[vendorID])
// task := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[vendorID]), nil, ctx,
// func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
// cat := batchItemList[0].(*model.SkuCategory)
// updateFields := []string{syncStatusFieldName}
// syncStatus := refutil.GetObjFieldByName(cat, syncStatusFieldName).(int8)
// if (syncStatus & model.SyncFlagDeletedMask) != 0 { //删除
// if (syncStatus & model.SyncFlagNewMask) == 0 {
// err = multiStoresHandler.DeleteCategory(db, cat, userName)
// }
// } else if (syncStatus & model.SyncFlagNewMask) != 0 { // 新增
// err = multiStoresHandler.CreateCategory(db, cat, userName)
// updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()]))
// } else if (syncStatus & model.SyncFlagModifiedMask) != 0 { // 修改
// err = multiStoresHandler.UpdateCategory(db, cat, userName)
// if intErr, ok := err.(*utils.ErrorWithCode); ok && intErr.IntCode() == -3 {
// err = nil
// }
// }
// if err == nil {
// refutil.SetObjFieldByName(cat, syncStatusFieldName, int8(0))
// _, err = dao.UpdateEntity(db, cat, updateFields...)
// }
// return nil, err
// }, cats)
// tasksch.HandleTask(task, parentTask, false).Run()
// _, err = task.GetResult(0)
// return err
// }
func (v *VendorSync) SyncCategory(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
return SyncCategories(ctx, nil, nil, nil, []int{categoryID}, isAsync)
}
// func (v *VendorSync) oldSyncCategory(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
// globals.SugarLogger.Debug("SyncCategory")
// hint, err = v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("同步分类信息:%d", categoryID), isAsync, false,
// func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
// vendorInfo := batchItemList[0].(*MultiStoreVendorInfo)
// var cats []*model.SkuCategory
// cond := make(map[string]interface{})
// if categoryID > 0 {
// cond[model.FieldID] = categoryID
// } else {
// cond[model.FieldLevel] = 1
// }
// err := dao.GetEntitiesByKV(db, &cats, cond, true)
// if err == nil {
// err = v.syncCategories(ctx, t, vendorInfo.VendorID, db, cats, userName)
// }
// if err != nil || categoryID > 0 {
// return nil, err
// }
// cond[model.FieldLevel] = 2
// err = dao.GetEntitiesByKV(db, &cats, cond, true)
// if err == nil {
// err = v.syncCategories(ctx, t, vendorInfo.VendorID, db, cats, userName)
// }
// return nil, err
// })
// return "", err
// }
func (v *VendorSync) SyncReorderCategories(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
return SyncReorderCategories(ctx, categoryID, isAsync)
}
// func (v *VendorSync) oldSyncReorderCategories(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
// hint, err = v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("分类重排序:%d", categoryID), isAsync, false, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
// vendorInfo := batchItemList[0].(*MultiStoreVendorInfo)
// multiStoresHandler := v.GetMultiStoreHandler(vendorInfo.VendorID)
// err2 := multiStoresHandler.ReorderCategories(db, categoryID, userName)
// if err2 == nil {
// cat := &model.SkuCategory{}
// _, err2 = dao.UpdateEntityByKV(db, cat, utils.Params2Map(dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()]), 0), utils.Params2Map(model.FieldParentID, categoryID))
// return nil, err2
// }
// return nil, err2
// })
// return "", err
// }
func (v *VendorSync) SyncStore2(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs, storeIDs []int, mustDirty, isAsync bool) (hint string, err error) {
globals.SugarLogger.Debugf("SyncStore2, storeIDs:%d", storeIDs)
userName := ctx.GetUserName()
isManageIt := len(storeIDs) == 0 || len(storeIDs) > 5
_, hint, err = v.LoopStoresMap2(ctx, nil, db, fmt.Sprintf("同步门店信息:%v", storeIDs), isAsync, isManageIt, vendorIDs, storeIDs, mustDirty, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (resultList interface{}, err error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
var failedList []*partner.StoreSkuInfoWithErr
handler := v.GetStoreHandler(loopMapInfo.VendorID)
if handler != nil {
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
var resultList []interface{}
var vendorStoreID string
storeMap := batchItemList[0].(*model.StoreMap)
db2 := db
if len(loopMapInfo.StoreMapList) > 1 {
db2 = dao.GetDB()
}
if model.IsSyncStatusNew(storeMap.SyncStatus) {
if vendorStoreID, err = handler.CreateStore2(db2, storeMap.StoreID, userName); err == nil {
resultList = append(resultList, 1)
} else {
failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "创建门店")
}
} else if model.IsSyncStatusDelete(storeMap.SyncStatus) {
if err = handler.DeleteStore(db2, storeMap.StoreID, userName); err == nil {
resultList = append(resultList, 1)
} else {
failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "删除门店")
}
} else {
if err = handler.UpdateStore(db2, storeMap.StoreID, userName); err == nil {
resultList = append(resultList, 1)
} else {
failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "更新门店")
}
}
if err == nil {
if model.IsSyncStatusNew(storeMap.SyncStatus) {
storeMap.VendorStoreID = vendorStoreID
storeMap.SyncStatus = 0
_, err = dao.UpdateEntity(db, storeMap, "VendorStoreID", model.FieldSyncStatus)
} else {
storeMap.SyncStatus = 0
_, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus)
}
}
return resultList, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
resultList, err = loopStoreTask.GetResult(0)
} else {
var resultList []interface{}
db2 := db
var vendorStoreID string
storeMap := loopMapInfo.StoreMapList[0]
if model.IsSyncStatusNew(storeMap.SyncStatus) {
if vendorStoreID, err = handler.CreateStore2(db2, storeMap.StoreID, userName); err == nil {
resultList = append(resultList, 1)
} else {
failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "创建门店")
}
} else if model.IsSyncStatusDelete(storeMap.SyncStatus) {
if err = handler.DeleteStore(db2, storeMap.StoreID, userName); err == nil {
resultList = append(resultList, 1)
} else {
failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "删除门店")
}
} else {
if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil {
resultList = append(resultList, 1)
} else {
failedList = putils.GetErrMsg2FailedSingleList(nil, err, storeMap.StoreID, model.VendorChineseNames[storeMap.VendorID], "更新门店")
}
}
if err == nil {
resultList = []interface{}{1}
if model.IsSyncStatusNew(storeMap.SyncStatus) {
storeMap.VendorStoreID = vendorStoreID
storeMap.SyncStatus = 0
_, err = dao.UpdateEntity(db, storeMap, "VendorStoreID", model.FieldSyncStatus)
} else {
storeMap.SyncStatus = 0
_, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus)
}
}
}
err = partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}
if len(failedList) > 0 {
t.AddFailedList(failedList)
}
return resultList, err
}, true)
return hint, makeSyncError(err)
}
func (v *VendorSync) SyncStore(ctx *jxcontext.Context, db *dao.DaoDB, vendorID, storeID int, isAsync bool, userName string) (hint string, err error) {
var vendorIDs []int
if vendorID != -1 {
vendorIDs = []int{
vendorID,
}
}
return v.SyncStore2(ctx, db, vendorIDs, []int{storeID}, false, isAsync)
}
func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuID int, isAsync, isContinueWhenError bool, userName string) (hint string, err error) {
var (
nameIDs []int
skuIDs []int
)
if nameID != -1 {
nameIDs = []int{nameID}
}
if skuID != -1 {
skuIDs = []int{skuID}
}
return v.SyncSkus(ctx, db, nameIDs, skuIDs, isAsync, isContinueWhenError, userName)
}
func (v *VendorSync) SyncSkus(ctx *jxcontext.Context, db *dao.DaoDB, nameIDs []int, skuIDs []int, isAsync, isContinueWhenError bool, userName string) (hint string, err error) {
return SyncSkus(ctx, nil, nil, nil, nameIDs, skuIDs, isAsync)
}
// func (v *VendorSync) oldSyncSkus(ctx *jxcontext.Context, db *dao.DaoDB, nameIDs []int, skuIDs []int, isAsync, isContinueWhenError bool, userName string) (hint string, err error) {
// globals.SugarLogger.Debugf("SyncSku trackInfo:%s, nameIDs:%v, skuIDs:%v, userName:%s", ctx.GetTrackInfo(), nameIDs, skuIDs, userName)
// isManagedIt := !(len(nameIDs) > 0 && len(nameIDs) <= 2 || len(skuIDs) > 0 && len(skuIDs) < 8)
// return v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("同步商品信息, nameIDs:%v, skuIDs:%v", nameIDs, skuIDs), isAsync, isManagedIt,
// func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
// var resultList []interface{}
// vendorInfo := batchItemList[0].(*MultiStoreVendorInfo)
// multiStoresHandler := v.GetMultiStoreHandler(vendorInfo.VendorID)
// syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()])
// dbField := dao.ConvertDBFieldPrefix(model.VendorNames[multiStoresHandler.GetVendorID()])
// skuMap := make(map[int]bool)
// sql := fmt.Sprintf(`
// SELECT DISTINCT t2.*
// FROM sku t1
// JOIN sku_name t2 ON t2.id = t1.name_id
// WHERE t1.%s_sync_status <> 0
// `, dbField)
// sqlParams := []interface{}{}
// if len(nameIDs) > 0 {
// sql += " AND t1.name_id IN (" + dao.GenQuestionMarks(len(nameIDs)) + ")"
// sqlParams = append(sqlParams, nameIDs)
// }
// if len(skuIDs) > 0 {
// sql += " AND t1.id IN(" + dao.GenQuestionMarks(len(skuIDs)) + ")"
// sqlParams = append(sqlParams, skuIDs)
// }
// for _, v := range skuIDs {
// skuMap[v] = true
// }
// sql += " ORDER BY t2.id"
// var skuNameList []*model.SkuName
// err := dao.GetRows(db, &skuNameList, sql, sqlParams...)
// if err == nil && len(skuNameList) > 0 {
// // todo 同一skuName下的sku顺序处理的原因是京东SPU特殊类型必须要序列化同步才能正常处理, db可能会有多线程问题
// task := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[vendorInfo.VendorID]), tasksch.NewParallelConfig().SetParallelCount(10).SetIsContinueWhenError(isContinueWhenError), ctx,
// func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
// var resultList []interface{}
// skuName := batchItemList[0].(*model.SkuName)
// var skuList []*model.Sku
// if err = dao.GetRows(db, &skuList, fmt.Sprintf(`
// SELECT *
// FROM sku
// WHERE name_id = ? AND %s_sync_status <> 0
// ORDER BY IF(spec_unit IN('kg', 'L'), 1000, 1) * spec_quality
// `, dbField), skuName.ID); err == nil && len(skuList) > 0 {
// for _, sku := range skuList {
// syncStatus := refutil.GetObjFieldByName(sku, syncStatusFieldName).(int8)
// globals.SugarLogger.Debugf("SyncSku trackInfo2:%s, skuID:%d, syncStatus:%d", ctx.GetTrackInfo(), sku.ID, syncStatus)
// if (len(skuIDs) == 0 || skuMap[sku.ID]) && (syncStatus != 0) {
// updateFields := []string{syncStatusFieldName}
// if syncStatus&model.SyncFlagDeletedMask != 0 { // 删除
// if syncStatus&model.SyncFlagNewMask == 0 {
// err = multiStoresHandler.DeleteSku(db, sku, userName)
// }
// } else if syncStatus&model.SyncFlagNewMask != 0 { // 新增
// if err = multiStoresHandler.CreateSku(db, sku, userName); err == nil {
// var tmpStruct struct {
// MaxIndex int
// }
// // todo hard code 得到京东spu中sku的顺序以方便以后修改销售属性这个必须要每次重新从数据库取
// if dao.GetRow(db, &tmpStruct, "SELECT MAX(sku_index) max_index FROM sku WHERE name_id = ? AND jd_id > 0 AND jd_id < 4024012631406 ", sku.NameID) == nil {
// sku.SkuIndex = tmpStruct.MaxIndex + 1
// updateFields = append(updateFields, "SkuIndex")
// }
// updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()]))
// }
// } else if syncStatus&model.SyncFlagModifiedMask != 0 { // 修改
// err = multiStoresHandler.UpdateSku(db, sku, userName)
// }
// if err == nil {
// refutil.SetObjFieldByName(sku, syncStatusFieldName, int8(0))
// if _, err = dao.UpdateEntity(db, sku, updateFields...); err != nil {
// break
// } else {
// resultList = append(resultList, 1)
// }
// }
// }
// }
// }
// if err == nil {
// refutil.SetObjFieldByName(skuName, syncStatusFieldName, int8(0))
// _, err = dao.UpdateEntity(db, skuName, syncStatusFieldName)
// }
// return resultList, err
// }, skuNameList)
// t.AddChild(task).Run()
// result, err2 := task.GetResult(0)
// if err = err2; err == nil {
// resultList = append(resultList, result...)
// }
// }
// return resultList, err
// })
// }
func (v *VendorSync) SyncStoresCategory(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) {
globals.SugarLogger.Debug("SyncStoresCategory")
isManageIt := len(storeIDs) != 1
hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("同步门店分类信息:%v", storeIDs), isAsync, isManageIt, vendorIDs, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if handler := v.GetSingleStoreHandler(loopMapInfo.VendorID); handler != nil {
if isForce {
dao.SetStoreCategorySyncStatus(db, loopMapInfo.VendorID, storeIDs, nil, model.SyncFlagModifiedMask)
}
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
tasksch.NewParallelConfig().SetParallelCount(5).SetIsContinueWhenError(isContinueWhenError), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeMap := batchItemList[0].(*model.StoreMap)
_, err = SyncStoreCategories(ctx, task, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, nil, nil, false, isContinueWhenError)
return nil, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
} else {
storeMap := loopMapInfo.StoreMapList[0]
_, err = SyncStoreCategories(ctx, t, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, nil, nil, false, isContinueWhenError)
}
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, makeSyncError(err)
}
//
func (v *VendorSync) SyncStoresSkus2(ctx *jxcontext.Context, parentTask tasksch.ITask, causeFlag int, db *dao.DaoDB, vendorIDs []int, storeIDs []int, syncDisabled bool, skuIDs, excludeSkuIDs []int, setSyncStatus int, isAsync, isContinueWhenError bool) (hint string, err error) {
globals.SugarLogger.Debug("SyncStoresSkus2")
isManageIt := len(storeIDs) != 1 || len(skuIDs) == 0 || len(skuIDs) > 8
task, hint, err := v.LoopStoresMap2(ctx, parentTask, db, fmt.Sprintf("同步门店商品信息:%v", storeIDs), isAsync, isManageIt, vendorIDs, storeIDs, false,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
parallelCount := 5
if model.MultiStoresVendorMap[loopMapInfo.VendorID] == 1 {
parallelCount = 2
}
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
tasksch.NewParallelConfig().SetParallelCount(parallelCount).SetIsContinueWhenError(isContinueWhenError), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeMap := batchItemList[0].(*model.StoreMap)
if syncDisabled || storeMap.Status > model.StoreStatusDisabled {
if setSyncStatus != 0 {
dao.SetStoreSkuSyncStatus(db, storeMap.VendorID, []int{storeMap.StoreID}, skuIDs, setSyncStatus)
}
if _, err = SyncStoreSkuNew(ctx, task, causeFlag, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, nil, skuIDs, excludeSkuIDs, false, isContinueWhenError); err != nil {
globals.SugarLogger.Debugf("SyncStoresSkus2 failed2 store:%d failed with error:%v", storeMap.StoreID, err)
}
}
return nil, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
if task != nil {
err = makeSyncError(err)
}
return hint, err
}
func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, causeFlag int, db *dao.DaoDB, vendorIDs []int, storeIDs []int, skuIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) {
setSyncStatus := 0
if isForce {
setSyncStatus = model.SyncFlagStoreSkuModifiedMask
}
return v.SyncStoresSkus2(ctx, parentTask, causeFlag, db, vendorIDs, storeIDs, true, skuIDs, nil, setSyncStatus, isAsync, isContinueWhenError)
}
func (v *VendorSync) FullSyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, syncDisabled bool, excludeSkuIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
globals.SugarLogger.Debug("FullSyncStoresSkus")
hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("初始化门店商品信息:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
parallelCount := 5
if model.MultiStoresVendorMap[loopMapInfo.VendorID] == 1 {
parallelCount = 1
}
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
tasksch.NewParallelConfig().SetParallelCount(parallelCount).SetIsContinueWhenError(isContinueWhenError), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeMap := batchItemList[0].(*model.StoreMap)
if syncDisabled || storeMap.Status > model.StoreStatusDisabled {
if _, err = FullSyncStoreSkuNew(ctx, task, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, excludeSkuIDs, false, isContinueWhenError); err != nil {
globals.SugarLogger.Debugf("FullSyncStoresSkus failed2 store:%d failed with error:%v", storeMap.StoreID, err)
}
}
return nil, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, makeSyncError(err)
}
func (v *VendorSync) DeleteRemoteStoreSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
globals.SugarLogger.Debug("DeleteRemoteStoreSkus")
hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("删除远程门店商品信息:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewSeqTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), ctx,
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
storeMap := loopMapInfo.StoreMapList[step]
_, err = ClearRemoteStoreStuffAndSetNew(ctx, task, storeMap.VendorID, storeMap.StoreID, storeMap.VendorStoreID, false, isContinueWhenError)
return nil, err
}, len(loopMapInfo.StoreMapList))
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
} else {
_, err = ClearRemoteStoreStuffAndSetNew(ctx, t, loopMapInfo.StoreMapList[0].VendorID, loopMapInfo.StoreMapList[0].StoreID, loopMapInfo.StoreMapList[0].VendorStoreID, false, isContinueWhenError)
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, makeSyncError(err)
}
// 将平台有,但本地没有的门店商品清除
// todo京东到家也应该支持
func (v *VendorSync) PruneMissingStoreSkus(ctx *jxcontext.Context, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
globals.SugarLogger.Debug("PruneMissingStoreSkus")
hint, err = v.LoopStoresMap(ctx, dao.GetDB(), fmt.Sprintf("删除远程无关联的门店商品信息:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(5), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeMap := batchItemList[0].(*model.StoreMap)
_, err = PruneMissingStoreSkus(ctx, task, loopMapInfo.VendorID, storeMap.StoreID, storeMap.VendorStoreID, false, isContinueWhenError)
return nil, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
} else {
_, err = PruneMissingStoreSkus(ctx, t, loopMapInfo.VendorID, loopMapInfo.StoreMapList[0].StoreID, loopMapInfo.StoreMapList[0].VendorStoreID, false, isContinueWhenError)
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, makeSyncError(err)
}
// 把京西有,平台无且没有待创建标记的商品加上待创建标记
// todo京东到家也应该支持
func (v *VendorSync) AddCreateFlagForJxStoreSku(ctx *jxcontext.Context, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
globals.SugarLogger.Debug("AddCreateFlagForJxStoreSku")
hint, err = v.LoopStoresMap(ctx, dao.GetDB(), fmt.Sprintf("处理京西有,平台无且没有待创建标记的商品加上待创建标记:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(5), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeMap := batchItemList[0].(*model.StoreMap)
_, err = AddCreateFlagForJxStoreSku(ctx, task, loopMapInfo.VendorID, storeMap.StoreID, storeMap.VendorStoreID, false, isContinueWhenError)
return nil, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
} else {
_, err = AddCreateFlagForJxStoreSku(ctx, t, loopMapInfo.VendorID, loopMapInfo.StoreMapList[0].StoreID, loopMapInfo.StoreMapList[0].VendorStoreID, false, isContinueWhenError)
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, makeSyncError(err)
}
func (v *VendorSync) AmendAndPruneStoreStuff(ctx *jxcontext.Context, vendorIDs []int, storeIDs []int, isAsync, isContinueWhenError bool, optType int, isForceUpdate bool) (hint string, err error) {
globals.SugarLogger.Debug("AmendAndPruneStoreStuff")
hint, err = v.LoopStoresMap(ctx, dao.GetDB(), fmt.Sprintf("处理京西平台商家分类与商品差异:%v", storeIDs), isAsync, true, vendorIDs, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]),
tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(5), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeMap := batchItemList[0].(*model.StoreMap)
_, err = amendAndPruneStoreStuff(ctx, task, loopMapInfo.VendorID, storeMap.StoreID, storeMap.VendorStoreID, false, isContinueWhenError, optType, isForceUpdate)
return nil, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
} else {
_, err = amendAndPruneStoreStuff(ctx, t, loopMapInfo.VendorID, loopMapInfo.StoreMapList[0].StoreID, loopMapInfo.StoreMapList[0].VendorStoreID, false, isContinueWhenError, optType, isForceUpdate)
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, makeSyncError(err)
}
func (v *VendorSync) LoopStoresMap2(ctx *jxcontext.Context, parentTask tasksch.ITask, db *dao.DaoDB, taskName string, isAsync, isManageIt bool, vendorIDs []int, storeIDs []int, mustDirty bool, handler tasksch.WorkFunc, isContinueWhenError bool) (task tasksch.ITask, hint string, err error) {
var storeMapList []*model.StoreMap
if storeMapList, err = dao.GetStoresMapList2(db, vendorIDs, storeIDs, nil, model.StoreStatusAll, model.StoreIsSyncYes, "", mustDirty); err != nil {
return nil, "", err
}
if len(storeMapList) == 0 {
return nil, "", nil
}
vendorStoreMap := make(map[int][]*model.StoreMap)
for _, v := range storeMapList {
vendorStoreMap[v.VendorID] = append(vendorStoreMap[v.VendorID], v)
}
loopInfoList := make([]*LoopStoreMapInfo, len(vendorStoreMap))
index := 0
for k, v := range vendorStoreMap {
loopInfoList[index] = &LoopStoreMapInfo{
VendorID: k,
StoreMapList: v,
}
index++
}
if len(loopInfoList) == 1 {
taskName = fmt.Sprintf("%s,处理平台%s", taskName, model.VendorChineseNames[loopInfoList[0].VendorID])
}
// 临时把京东的并发改为2
task = tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, loopInfoList)
if isAsync {
buildSetFinishHook(task, ctx)
}
tasksch.HandleTask(task, parentTask, isManageIt).Run()
if !isAsync {
resultList, err2 := task.GetResult(0)
if len(task.GetFailedList()) > 0 {
err2 = buildErrMsg(task)
}
if err = err2; err == nil {
if len(resultList) == 0 {
hint = "1" // todo 暂时这样
} else {
hint = jxutils.TaskResult2Hint(resultList)
}
}
} else {
hint = task.GetID()
}
return task, hint, err
}
func buildSetFinishHook(task tasksch.ITask, ctx *jxcontext.Context) {
task.SetFinishHook(func(task tasksch.ITask) {
var noticeMsg = "您此次的同步任务错误详情返回如下: \n"
if ctx.GetUserName() != "jxadmin" {
if len(task.GetFailedList()) > 10 {
downloadURL, _, _ := WirteToExcelBySyncFailed(task)
noticeMsg += fmt.Sprintf("[详情点我]%s/billshow/?normal=true&path=%s \n", globals.BackstageHost, downloadURL)
} else if len(task.GetFailedList()) > 0 && len(task.GetFailedList()) <= 10 {
if task.GetErr() != nil {
noticeMsg += utils.Format4Output(buildErrMsgJson(task), true)
}
} else {
noticeMsg = "您的同步任务执行完成,没有错误返回。"
}
if authInfo, err := ctx.GetV2AuthInfo(); err == nil {
ddmsg.SendUserMessage(dingdingapi.MsgTyeText, authInfo.UserID, "同步错误返回", noticeMsg)
} else {
globals.SugarLogger.Debugf("同步错误发送钉钉消息失败, authinfo [%v] , [%v]", *authInfo, err)
}
} else {
if time.Now().Hour() >= 20 || time.Now().Hour() < 7 {
downloadURL, _, _ := WirteToExcelBySyncFailed(task)
user, err := dao.GetUserByID(dao.GetDB(), "mobile", "18160030913")
noticeMsg += fmt.Sprintf("[详情点我]%s/billshow/?normal=true&path=%s \n", globals.BackstageHost, downloadURL)
if user != nil && err == nil {
ddmsg.SendUserMessage(dingdingapi.MsgTyeText, user.UserID, "同步错误返回", noticeMsg)
}
}
}
})
}
func buildErrMsg(task tasksch.ITask) (err error) {
err = fmt.Errorf(utils.Format4Output(buildErrMsgJson(task), true))
return makeSpecSyncError(err)
}
func buildErrMsgJson(task tasksch.ITask) (resultL []*SyncErrResult) {
failedList := task.GetFailedList()
for _, v := range failedList {
for _, vv := range v.([]*partner.StoreSkuInfoWithErr) {
result := &SyncErrResult{
SkuID: 0,
StoreID: vv.StoreID,
CategoryName: vv.CategoryName,
VendorName: vv.VendoreName,
VendorSkuID: "",
NameID: 0,
VendorPrice: 0,
SyncType: vv.SyncType,
ErrMsg: vv.ErrMsg,
}
if vv.StoreSkuInfo != nil {
result.SkuID = vv.StoreSkuInfo.SkuID
result.VendorSkuID = vv.StoreSkuInfo.VendorSkuID
result.NameID = vv.StoreSkuInfo.NameID
result.VendorPrice = vv.StoreSkuInfo.VendorPrice
}
resultL = append(resultL, result)
}
}
return resultL
}
func (v *VendorSync) LoopStoresMap(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync, isManageIt bool, vendorIDs []int, storeIDs []int, handler tasksch.WorkFunc, isContinueWhenError bool) (hint string, err error) {
_, hint, err = v.LoopStoresMap2(ctx, nil, db, taskName, isAsync, isManageIt, vendorIDs, storeIDs, false, handler, isContinueWhenError)
return hint, err
}
func (v *VendorSync) LoopMultiStoresVendors(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, isManageIt bool, handler tasksch.WorkFunc) (hint string, err error) {
task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, getMultiStoreVendorInfoList())
tasksch.HandleTask(task, nil, isManageIt).Run()
if !isAsync {
result, err2 := task.GetResult(0)
if err = err2; err == nil {
hint = utils.Int2Str(len(result))
}
} else {
hint = task.ID
}
return hint, makeSyncError(err)
}
// func (v *VendorSync) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int, storeIDs []int) (hint string, err error) {
// task := tasksch.NewParallelTask("RefreshAllSkusID", nil, ctx,
// func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
// vendorID := batchItemList[0].(int)
// if handler := v.GetStoreHandler(vendorID); handler != nil {
// if multiHandler, ok := handler.(partner.IMultipleStoresHandler); ok {
// _, err = multiHandler.RefreshAllSkusID(ctx, task, false)
// } else if singleHandler, ok := handler.(partner.ISingleStoreHandler); ok {
// _, err = singleHandler.RefreshStoresAllSkusID(ctx, task, false, storeIDs)
// }
// }
// return nil, err
// }, vendorIDs)
// tasksch.HandleTask(task, nil, true).Run()
// if !isAsync {
// _, err = task.GetResult(0)
// }
// return task.ID, err
// }
func (v *VendorSync) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int) (hint string, err error) {
task := tasksch.NewParallelTask("RefreshAllStoresID", nil, ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
vendorID := batchItemList[0].(int)
if handler := v.GetStoreHandler(vendorID); handler != nil {
_, err = handler.RefreshAllStoresID(ctx, task, false)
}
return nil, err
}, vendorIDs)
tasksch.HandleTask(task, nil, true).Run()
if !isAsync {
_, err = task.GetResult(0)
}
return task.ID, err
}
func makeSyncError(err error) (newErr error) {
if err != nil {
if _, ok := err.(*SyncError); !ok {
return &SyncError{
Original: err,
}
}
}
return err
}
func makeSpecSyncError(err error) (newErr error) {
if err != nil {
if _, ok := err.(*SpecSyncError); !ok {
return &SpecSyncError{
SpecErr: err,
}
}
}
return err
}
func (e *SpecSyncError) Error() string {
return e.SpecErr.Error()
}
func (e *SyncError) Error() string {
return fmt.Sprintf("本地数据修改成功,但同步失败,请根据错误提示处理!,同步错误信息:%s", e.Original.Error())
}
func isSyncError(err error) bool {
_, ok := err.(*SyncError)
return ok
}
func (v *VendorSync) SyncSkuNames(ctx *jxcontext.Context, nameIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) {
return SyncSkus(ctx, nil, nil, nil, nameIDs, nil, isAsync)
}
func (v *VendorSync) oldSyncSkuNames(ctx *jxcontext.Context, nameIDs []int, isForce, isAsync, isContinueWhenError bool) (hint string, err error) {
db := dao.GetDB()
if isForce {
dao.SetSkuNameSyncStatus(db, nil, nil, nameIDs, model.SyncFlagModifiedMask)
}
return v.SyncSkus(ctx, db, nameIDs, nil, isAsync, isContinueWhenError, ctx.GetUserName())
}
func (v *VendorSync) ChangeStoreSkuSaleStatus(ctx *jxcontext.Context, storeID int, isAsync, isContinueWhenError bool) (err error) {
var (
storeIDs []int
skuIDs []int
)
db := dao.GetDB()
storeSkuList, err := dao.GetStoresSkusInfoBySaleTime(db, storeID)
if len(storeSkuList) < 1 || err != nil {
return nil
}
for _, v := range storeSkuList {
storeIDs = append(storeIDs, v.StoreID)
skuIDs = append(skuIDs, v.SkuID)
}
vendorIDs := partner.GetPurchasePlatformVendorIDs()
dao.UpdateStoreSkuBindSyncStatusForSaleStatus(db, vendorIDs, storeID)
v.SyncStoresSkus(ctx, nil, model.SyncFlagSaleMask, db, vendorIDs, storeIDs, skuIDs, false, isAsync, isContinueWhenError)
return err
}
func GetTimeMixByInt(begin1, end1, begin2, end2 int16) (beginAt, endAt int16) {
if (begin1 > begin2 && begin1 > end2) || (begin2 > end1 && end2 > end1) {
return 0, 0
}
if begin1 > begin2 {
beginAt = begin1
if end1 > end2 {
endAt = end2
} else {
endAt = end1
}
} else {
beginAt = begin2
if end1 > end2 {
endAt = end2
} else {
endAt = end1
}
}
return beginAt, endAt
}
func WirteToExcelBySyncFailed(task tasksch.ITask) (downloadURL, fileName string, err error) {
var (
sheetList1 []*excel.Obj2ExcelSheetConfig
)
syncErrResultLock.syncErrResult = syncErrResultLock.syncErrResult[0:0]
list := buildErrMsgJson(task)
for _, v := range list {
syncErrResultLock.AppendData(*v)
}
excelConf1 := &excel.Obj2ExcelSheetConfig{
Title: "同步错误",
Data: syncErrResultLock.syncErrResult,
CaptionList: SyncErrResultTitle,
}
sheetList1 = append(sheetList1, excelConf1)
if excelConf1 != nil {
downloadURL, fileName, err = jxutils.UploadExeclAndPushMsg(sheetList1, time.Now().Format("200601021504")+"同步错误返回")
baseapi.SugarLogger.Debug("WriteToExcel: download is [%v]", downloadURL)
} else {
baseapi.SugarLogger.Debug("WriteToExcel: dataSuccess is nil!")
}
if err != nil {
baseapi.SugarLogger.Errorf("WriteToExcel:upload %s , failed error:%v", fileName, err)
}
return downloadURL, fileName, err
}
func (d *SyncErrResultLock) AppendData(syncErrResult SyncErrResult) {
d.locker.Lock()
defer d.locker.Unlock()
d.syncErrResult = append(d.syncErrResult, syncErrResult)
}
func (v *VendorSync) SyncStoreSkusFromYb(ctx *jxcontext.Context, storeIDs []int, isAsync, isContinueWhenError bool) (hint string, err error) {
var (
vendorID = model.VendorIDYB
)
hint, err = v.LoopStoresMap(ctx, dao.GetDB(), fmt.Sprintf("同步银豹到京西:%v", storeIDs), isAsync, true, []int{vendorID}, storeIDs,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
for _, v := range loopMapInfo.StoreMapList {
hint, err = syncStoreSkusFromYb(ctx, v.StoreID, vendorID, v.VendorStoreID, isAsync, isContinueWhenError)
}
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError)
return hint, err
}
func syncStoreSkusFromYb(ctx *jxcontext.Context, storeID, vendorID int, vendorStoreID string, isAsync, isContinueWhenError bool) (hint string, err error) {
var (
db = dao.GetDB()
localSkuMap = make(map[string]*dao.StoreSkuSyncInfo)
vendorSkuMap = make(map[string]*partner.SkuNameInfo)
skuBindInfosDel []*StoreSkuBindInfo
skuBindInfosUpt []*StoreSkuBindInfo
addList []*partner.SkuNameInfo
updateList []*partner.SkuNameInfo
deleteList []*dao.StoreSkuSyncInfo
)
handler, _ := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler)
localSkuList, err := dao.GetStoreSkus2(db, vendorID, storeID, nil, false)
if err != nil {
return "", err
}
for _, v := range localSkuList {
localSkuMap[v.VendorSkuID] = v
}
remoteSkuList, err := handler.GetStoreSkusFullInfo(ctx, nil, storeID, vendorStoreID, nil)
if err != nil {
return "", err
}
for _, v := range remoteSkuList {
if localSkuMap[v.SkuList[0].VendorSkuID] == nil {
if len(v.YbBarCode) > 7 {
addList = append(addList, v)
}
} else {
updateList = append(updateList, v)
}
vendorSkuMap[v.SkuList[0].VendorSkuID] = v
}
for _, v := range localSkuList {
if vendorSkuMap[v.VendorSkuID] == nil {
deleteList = append(deleteList, v)
}
}
// fmt.Println("remoteSkuList", len(remoteSkuList))
// fmt.Println("addList", len(addList))
// fmt.Println("updateList", len(updateList))
taskSeqFunc := func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
store, _ := dao.GetStoreDetail(db, storeID, vendorID)
switch step {
case 0:
if len(addList) > 0 {
taskFunc := func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
var (
v = batchItemList[0].(*partner.SkuNameInfo)
upc = v.YbBarCode
)
err = AddSkuNameByUpc(ctx, upc, store, v)
if err != nil {
task.AddFailedList(putils.GetErrMsg2FailedSingleList(nil, err, storeID, model.VendorChineseNames[vendorID], "根据upc创建京西商品"))
}
return retVal, err
}
taskParallel := tasksch.NewParallelTask("创建商品", tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, taskFunc, addList)
tasksch.HandleTask(taskParallel, task, true).Run()
_, err = taskParallel.GetResult(0)
}
case 1:
if len(deleteList) > 0 {
taskFunc := func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
var (
v = batchItemList[0].(*dao.StoreSkuSyncInfo)
)
skuBindInfo := &StoreSkuBindInfo{
NameID: v.NameID,
IsFocus: -1,
}
retVal = []*StoreSkuBindInfo{skuBindInfo}
return retVal, err
}
taskParallel := tasksch.NewParallelTask("删除商品", tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, taskFunc, deleteList)
tasksch.HandleTask(taskParallel, task, true).Run()
resultDel, _ := taskParallel.GetResult(0)
for _, v := range resultDel {
skuBindInfosDel = append(skuBindInfosDel, v.(*StoreSkuBindInfo))
}
_, err = updateStoresSkusWithoutSync(ctx, db, []int{storeID}, skuBindInfosDel, false)
}
case 2:
if len(updateList) > 0 {
taskFunc := func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
var (
v = batchItemList[0].(*partner.SkuNameInfo)
skuBindInfo = &StoreSkuBindInfo{}
storeSkus []*dao.StoreSkuExt
pricePercentagePack []*model.PricePercentageItem
)
sql := `
SELECT a.*, c.id name_id
FROM store_sku_bind a
JOIN sku b ON a.sku_id = b.id
JOIN sku_name c ON c.id = b.name_id
WHERE a.store_id = ? AND a.yb_id = ? AND a.deleted_at = ?
`
sqlParams := []interface{}{storeID, v.SkuList[0].VendorSkuID, utils.DefaultTimeValue}
err = dao.GetRows(db, &storeSkus, sql, sqlParams)
if len(storeSkus) > 0 {
if storeSkus[0].YbPrice != int(v.SkuList[0].VendorPrice) {
err = jxutils.Strings2Objs(store.PricePercentagePackStr, &pricePercentagePack)
skuBindInfo.UnitPrice = jxutils.CaculateJxPriceByPricePack(pricePercentagePack, 0, int(v.SkuList[0].VendorPrice))
}
} else {
return retVal, fmt.Errorf("未查询到门店商品yb_id [%v]", v.SkuList[0].VendorSkuID)
}
if v.SkuList[0].Stock < 1 {
skuBindInfo.IsSale = model.StoreSkuBindStatusDontSale
} else {
skuBindInfo.IsSale = model.StoreSkuBindStatusNormal
}
skuBindInfo.NameID = storeSkus[0].NameID
retVal = []*StoreSkuBindInfo{skuBindInfo}
return retVal, err
}
taskParallel := tasksch.NewParallelTask("更新商品价格和库存", tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, taskFunc, updateList)
tasksch.HandleTask(taskParallel, task, true).Run()
resultUpt, _ := taskParallel.GetResult(0)
for _, v := range resultUpt {
skuBindInfosUpt = append(skuBindInfosUpt, v.(*StoreSkuBindInfo))
}
_, err = updateStoresSkusWithoutSync(ctx, db, []int{storeID}, skuBindInfosUpt, false)
}
case 3:
_, err = CurVendorSync.SyncStoresSkus2(jxcontext.AdminCtx, nil, 0, db, []int{0, 1, 3}, nil, false, nil, nil, 0, true, true)
}
return result, err
}
taskSeq := tasksch.NewSeqTask2("同步银豹商品到京西", ctx, true, taskSeqFunc, 3)
tasksch.HandleTask(taskSeq, nil, true).Run()
hint = taskSeq.GetID()
return hint, err
}