392 lines
15 KiB
Go
392 lines
15 KiB
Go
package cms
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"reflect"
|
|
|
|
"git.rosy.net.cn/baseapi/utils"
|
|
"git.rosy.net.cn/jx-callback/business/jxutils"
|
|
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
|
|
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
|
|
"git.rosy.net.cn/jx-callback/business/model"
|
|
"git.rosy.net.cn/jx-callback/business/model/dao"
|
|
"git.rosy.net.cn/jx-callback/business/partner"
|
|
"git.rosy.net.cn/jx-callback/globals"
|
|
"git.rosy.net.cn/jx-callback/globals/api"
|
|
)
|
|
|
|
type VendorSync struct {
|
|
MultiStoreVendorIDs []int
|
|
SingleStoreVendorIDs []int
|
|
PurchaseHandlers map[int]partner.IPurchasePlatformHandler
|
|
}
|
|
|
|
// SyncError is the error type produced by makeSyncError. It carries the
// underlying error of a failed synchronisation so that its Error method can
// report it.
type SyncError struct {
	// Original is the underlying error that made the synchronisation fail.
	Original error `json:"original"`
	// Message is an additional free-form text; nothing in this file sets it.
	Message string `json:"message"`
}
|
|
|
|
// 对于多门店平台接口的通用处理
|
|
type MultiStoreHandlerWrapper struct {
|
|
partner.IMultipleStoresHandler
|
|
}
|
|
|
|
// 对于单门店平台接口的通用处理
|
|
type SingleStoreHandlerWrapper struct {
|
|
partner.ISingleStoreHandler
|
|
}
|
|
|
|
var (
|
|
CurVendorSync VendorSync
|
|
)
|
|
|
|
// ErrHaveNotImplementedYet is a sentinel error; its message reads
// "not implemented yet".
var ErrHaveNotImplementedYet = errors.New("还没有实现")

// ErrEntityNotExist is a sentinel error; its message reads
// "the corresponding entity could not be found".
var ErrEntityNotExist = errors.New("找不到相应实体")
|
|
|
|
func Init() {
|
|
apiMap := map[int]interface{}{
|
|
model.VendorIDJD: api.JdAPI,
|
|
model.VendorIDELM: api.ElmAPI,
|
|
model.VendorIDEBAI: api.EbaiAPI,
|
|
}
|
|
CurVendorSync.PurchaseHandlers = make(map[int]partner.IPurchasePlatformHandler)
|
|
for k, v := range partner.PurchasePlatformHandlers {
|
|
if !reflect.ValueOf(apiMap[k]).IsNil() {
|
|
if multiHandler, ok := v.(partner.IMultipleStoresHandler); ok {
|
|
CurVendorSync.MultiStoreVendorIDs = append(CurVendorSync.MultiStoreVendorIDs, k)
|
|
CurVendorSync.PurchaseHandlers[k] = &MultiStoreHandlerWrapper{
|
|
IMultipleStoresHandler: multiHandler,
|
|
}
|
|
} else if singleHandler, ok := v.(partner.ISingleStoreHandler); ok {
|
|
CurVendorSync.SingleStoreVendorIDs = append(CurVendorSync.SingleStoreVendorIDs, k)
|
|
CurVendorSync.PurchaseHandlers[k] = &SingleStoreHandlerWrapper{
|
|
ISingleStoreHandler: singleHandler,
|
|
}
|
|
} else {
|
|
panic(fmt.Sprintf("platform:%d type is wrong!", k))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *MultiStoreHandlerWrapper) DeleteCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) {
|
|
if jxutils.IsFakeID(cat.JdID) {
|
|
return nil
|
|
}
|
|
return p.IMultipleStoresHandler.DeleteCategory(db, cat, userName)
|
|
}
|
|
|
|
func (p *MultiStoreHandlerWrapper) UpdateCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) {
|
|
if jxutils.IsFakeID(cat.JdID) {
|
|
globals.SugarLogger.Warnf("UpdateCategory fakeid cat:%s should not get here", utils.Format4Output(cat, true))
|
|
return nil
|
|
}
|
|
return p.IMultipleStoresHandler.UpdateCategory(db, cat, userName)
|
|
}
|
|
|
|
func (p *MultiStoreHandlerWrapper) DeleteSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) {
|
|
globals.SugarLogger.Debugf("wrapper DeleteSku, sku:%s", utils.Format4Output(sku, false))
|
|
if jxutils.IsFakeID(sku.JdID) {
|
|
return nil
|
|
}
|
|
return p.IMultipleStoresHandler.DeleteSku(db, sku, userName)
|
|
}
|
|
|
|
func (p *MultiStoreHandlerWrapper) UpdateSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) {
|
|
if jxutils.IsFakeID(sku.JdID) {
|
|
globals.SugarLogger.Warnf("UpdateSku fakeid sku:%s should not get here", utils.Format4Output(sku, true))
|
|
return nil
|
|
}
|
|
return p.IMultipleStoresHandler.UpdateSku(db, sku, userName)
|
|
}
|
|
|
|
func (v *VendorSync) GetStoreHandler(vendorID int) partner.IPurchasePlatformHandler {
|
|
return v.PurchaseHandlers[vendorID]
|
|
}
|
|
|
|
func (v *VendorSync) GetMultiStoreHandler(vendorID int) partner.IMultipleStoresHandler {
|
|
if handler, ok := v.PurchaseHandlers[vendorID].(partner.IMultipleStoresHandler); ok {
|
|
return handler
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (v *VendorSync) GetSingleStoreHandler(vendorID int) partner.ISingleStoreHandler {
|
|
if handler, ok := v.PurchaseHandlers[vendorID].(partner.ISingleStoreHandler); ok {
|
|
return handler
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (v *VendorSync) syncCategories(ctx *jxcontext.Context, parentTask tasksch.ITask, multiStoresHandler partner.IMultipleStoresHandler, db *dao.DaoDB, cats []*model.SkuCategory, userName string) (err error) {
|
|
syncStatusFieldName := multiStoresHandler.GetFieldSyncStatusName()
|
|
task := tasksch.NewParallelTask("syncCategories", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
|
cat := batchItemList[0].(*model.SkuCategory)
|
|
updateFields := []string{syncStatusFieldName}
|
|
syncStatus := jxutils.GetObjFieldByName(cat, syncStatusFieldName).(int8)
|
|
if (syncStatus & model.SyncFlagDeletedMask) != 0 { //删除
|
|
err = multiStoresHandler.DeleteCategory(db, cat, userName)
|
|
} else if (syncStatus & model.SyncFlagNewMask) != 0 { // 新增
|
|
err = multiStoresHandler.CreateCategory(db, cat, userName)
|
|
updateFields = append(updateFields, multiStoresHandler.GetFieldIDName())
|
|
} else if (syncStatus & model.SyncFlagModifiedMask) != 0 { // 修改
|
|
err = multiStoresHandler.UpdateCategory(db, cat, userName)
|
|
}
|
|
if err == nil {
|
|
jxutils.SetObjFieldByName(cat, syncStatusFieldName, int8(0))
|
|
_, err = dao.UpdateEntity(db, cat, updateFields...)
|
|
}
|
|
return nil, err
|
|
}, cats)
|
|
ctx.SetTaskOrAddChild(task, parentTask)
|
|
task.Run()
|
|
_, err = task.GetResult(0)
|
|
return err
|
|
}
|
|
|
|
func (v *VendorSync) SyncCategory(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
|
|
globals.SugarLogger.Debug(v.MultiStoreVendorIDs)
|
|
hint, err = v.LoopMultiStoresVendors(ctx, db, "SyncCategory", isAsync, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
|
multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int))
|
|
var cats []*model.SkuCategory
|
|
cond := make(map[string]interface{})
|
|
if categoryID > 0 {
|
|
cond[model.FieldID] = categoryID
|
|
} else {
|
|
cond[model.FieldLevel] = 1
|
|
}
|
|
err := dao.GetEntitiesByKV(db, &cats, cond, true)
|
|
if err == nil {
|
|
err = v.syncCategories(ctx, t, multiStoresHandler, db, cats, userName)
|
|
}
|
|
if err != nil || categoryID > 0 {
|
|
return nil, err
|
|
}
|
|
cond[model.FieldLevel] = 2
|
|
err = dao.GetEntitiesByKV(db, &cats, cond, true)
|
|
if err == nil {
|
|
err = v.syncCategories(ctx, t, multiStoresHandler, db, cats, userName)
|
|
}
|
|
return nil, err
|
|
})
|
|
return "", err
|
|
}
|
|
|
|
func (v *VendorSync) SyncReorderCategories(ctx *jxcontext.Context, db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) {
|
|
hint, err = v.LoopMultiStoresVendors(ctx, db, "SyncReorderCategories", isAsync, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
|
multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int))
|
|
err2 := multiStoresHandler.ReorderCategories(db, categoryID, userName)
|
|
if err2 == nil {
|
|
cat := &model.SkuCategory{}
|
|
_, err2 = dao.UpdateEntityByKV(db, cat, utils.Params2Map(multiStoresHandler.GetFieldSyncStatusName(), 0), utils.Params2Map(model.FieldParentID, categoryID))
|
|
return nil, err2
|
|
}
|
|
return nil, err2
|
|
})
|
|
return "", err
|
|
}
|
|
|
|
func (v *VendorSync) SyncStore(ctx *jxcontext.Context, db *dao.DaoDB, vendorID, storeID int, isAsync bool, userName string) (hint string, err error) {
|
|
globals.SugarLogger.Debugf("SyncStore, storeID:%d", storeID)
|
|
hint, err = v.LoopStoreMap(ctx, db, "SyncStore", isAsync, userName, storeID, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
|
storeMap := batchItemList[0].(*model.StoreMap)
|
|
if (vendorID == -1 || vendorID == storeMap.VendorID) && (storeMap.SyncStatus != 0) {
|
|
if handler := v.GetStoreHandler(storeMap.VendorID); handler != nil {
|
|
if err = handler.UpdateStore(db, storeID, userName); err == nil {
|
|
storeMap.SyncStatus = 0
|
|
_, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus)
|
|
}
|
|
}
|
|
}
|
|
return nil, err
|
|
})
|
|
return "", err
|
|
}
|
|
|
|
func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuID int, isAsync bool, userName string) (hint string, err error) {
|
|
globals.SugarLogger.Debugf("SyncSku, nameID:%d, skuID:%d, userName:%s", nameID, skuID, userName)
|
|
hint, err = v.LoopMultiStoresVendors(ctx, db, "SyncSku", isAsync, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
|
multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int))
|
|
syncStatusFieldName := multiStoresHandler.GetFieldSyncStatusName()
|
|
var skuList []*model.Sku
|
|
cond := make(map[string]interface{})
|
|
if nameID != -1 {
|
|
cond[model.FieldNameID] = nameID
|
|
}
|
|
if skuID != -1 {
|
|
cond[model.FieldID] = skuID
|
|
}
|
|
err := dao.GetEntitiesByKV(db, &skuList, cond, true)
|
|
if err == nil {
|
|
// globals.SugarLogger.Debug(utils.Format4Output(skuList, false))
|
|
task := tasksch.NewParallelTask("SyncSku", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
|
sku := batchItemList[0].(*model.Sku)
|
|
syncStatus := jxutils.GetObjFieldByName(sku, syncStatusFieldName).(int8)
|
|
if (skuID == -1 || skuID == sku.ID) && (syncStatus != 0) {
|
|
updateFields := []string{syncStatusFieldName}
|
|
if syncStatus&model.SyncFlagDeletedMask != 0 { // 删除
|
|
err = multiStoresHandler.DeleteSku(db, sku, userName)
|
|
} else if syncStatus&model.SyncFlagNewMask != 0 { // 新增
|
|
err = multiStoresHandler.CreateSku(db, sku, userName)
|
|
updateFields = append(updateFields, multiStoresHandler.GetFieldIDName())
|
|
} else if syncStatus&model.SyncFlagModifiedMask != 0 { // 修改
|
|
err = multiStoresHandler.UpdateSku(db, sku, userName)
|
|
}
|
|
if err == nil {
|
|
jxutils.SetObjFieldByName(sku, syncStatusFieldName, int8(0))
|
|
dao.UpdateEntity(db, sku, updateFields...)
|
|
}
|
|
}
|
|
return nil, err
|
|
}, skuList)
|
|
t.AddChild(task).Run()
|
|
_, err = task.GetResult(0)
|
|
}
|
|
return nil, err
|
|
})
|
|
return "", err
|
|
}
|
|
|
|
//
|
|
func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, skuIDs []int, isAsync bool, userName string) (hint string, err error) {
|
|
globals.SugarLogger.Debug("SyncStoresSkus")
|
|
hint, err = v.LoopStoreVendors(ctx, db, vendorIDs, "SyncStoresSkus", isAsync, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
|
|
handler := v.GetStoreHandler(batchItemList[0].(int))
|
|
_, err = handler.SyncStoresSkus(ctx, t, db, storeIDs, skuIDs, false)
|
|
return nil, err
|
|
})
|
|
return hint, err
|
|
}
|
|
|
|
func (v *VendorSync) LoopStoreMap(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, userName string, storeID int, handler tasksch.WorkFunc) (hint string, err error) {
|
|
storeMaps, err := GetStoreVendorMaps(ctx, db, storeID, -1)
|
|
if err == nil {
|
|
task := tasksch.NewParallelTask(taskName, nil, userName, handler, storeMaps)
|
|
ctx.SetTaskOrAddChild(task, nil)
|
|
tasksch.ManageTask(task).Run()
|
|
hint = task.ID
|
|
if !isAsync {
|
|
_, err = task.GetResult(0)
|
|
}
|
|
}
|
|
return hint, makeSyncError(err)
|
|
}
|
|
|
|
func (v *VendorSync) LoopMultiStoresVendors(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, userName string, handler tasksch.WorkFunc) (hint string, err error) {
|
|
if taskName == "" {
|
|
taskName = "LoopMultiStoresVendors"
|
|
}
|
|
task := tasksch.NewParallelTask(taskName, nil, userName, handler, v.MultiStoreVendorIDs)
|
|
ctx.SetTaskOrAddChild(task, nil)
|
|
tasksch.ManageTask(task).Run()
|
|
if !isAsync {
|
|
_, err = task.GetResult(0)
|
|
}
|
|
return task.ID, makeSyncError(err)
|
|
}
|
|
|
|
func (v *VendorSync) LoopStoreVendors(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, taskName string, isAsync bool, userName string, handler tasksch.WorkFunc) (hint string, err error) {
|
|
if taskName == "" {
|
|
taskName = "LoopStoreVendors"
|
|
}
|
|
var vendorIDMap map[int]int
|
|
if len(vendorIDs) != 0 {
|
|
vendorIDMap = make(map[int]int)
|
|
for _, v := range vendorIDs {
|
|
vendorIDMap[v] = 1
|
|
}
|
|
}
|
|
allHandlers := make([]int, len(v.MultiStoreVendorIDs)+len(v.SingleStoreVendorIDs))
|
|
copy(allHandlers, v.MultiStoreVendorIDs)
|
|
copy(allHandlers[len(v.MultiStoreVendorIDs):], v.SingleStoreVendorIDs)
|
|
if vendorIDMap != nil {
|
|
count := 0
|
|
for _, v := range allHandlers {
|
|
if vendorIDMap[v] == 1 {
|
|
allHandlers[count] = v
|
|
count++
|
|
}
|
|
}
|
|
allHandlers = allHandlers[:count]
|
|
}
|
|
task := tasksch.NewParallelTask(taskName, nil, userName, handler, allHandlers)
|
|
ctx.SetTaskOrAddChild(task, nil)
|
|
tasksch.ManageTask(task).Run()
|
|
if !isAsync {
|
|
_, err = task.GetResult(0)
|
|
}
|
|
return task.ID, err
|
|
}
|
|
|
|
func (v *VendorSync) LoopSingleStoreVendors(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, userName string, handler tasksch.WorkFunc) (hint string, err error) {
|
|
if taskName == "" {
|
|
taskName = "LoopSingleStoreVendors"
|
|
}
|
|
var storeMaps []*model.StoreMap
|
|
if err = dao.GetRows(db, &storeMaps, `
|
|
SELECT *
|
|
FROM store_map
|
|
WHERE deleted_at = ? AND vendor_id IN (`+dao.GenQuestionMarks(len(v.SingleStoreVendorIDs))+")", utils.DefaultTimeValue, v.SingleStoreVendorIDs); err == nil {
|
|
task := tasksch.NewParallelTask(taskName, nil, userName, handler, storeMaps)
|
|
ctx.SetTaskOrAddChild(task, nil)
|
|
tasksch.ManageTask(task).Run()
|
|
hint = task.ID
|
|
if !isAsync {
|
|
_, err = task.GetResult(0)
|
|
}
|
|
}
|
|
return hint, makeSyncError(err)
|
|
}
|
|
|
|
func (v *VendorSync) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int, storeIDs []int) (hint string, err error) {
|
|
task := tasksch.NewParallelTask("RefreshAllSkusID", nil, ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
|
vendorID := batchItemList[0].(int)
|
|
if handler := v.GetStoreHandler(vendorID); handler != nil {
|
|
if multiHandler, ok := handler.(partner.IMultipleStoresHandler); ok {
|
|
_, err = multiHandler.RefreshAllSkusID(ctx, task, false)
|
|
} else if singleHandler, ok := handler.(partner.ISingleStoreHandler); ok {
|
|
_, err = singleHandler.RefreshStoresAllSkusID(ctx, task, false, storeIDs)
|
|
}
|
|
}
|
|
return nil, err
|
|
}, vendorIDs)
|
|
ctx.SetTaskOrAddChild(task, nil)
|
|
tasksch.ManageTask(task).Run()
|
|
if !isAsync {
|
|
_, err = task.GetResult(0)
|
|
}
|
|
return task.ID, err
|
|
}
|
|
|
|
func (v *VendorSync) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int) (hint string, err error) {
|
|
task := tasksch.NewParallelTask("RefreshAllStoresID", nil, ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
|
vendorID := batchItemList[0].(int)
|
|
if handler := v.GetStoreHandler(vendorID); handler != nil {
|
|
_, err = handler.RefreshAllStoresID(ctx, task, false)
|
|
}
|
|
return nil, err
|
|
}, vendorIDs)
|
|
ctx.SetTaskOrAddChild(task, nil)
|
|
tasksch.ManageTask(task).Run()
|
|
if !isAsync {
|
|
_, err = task.GetResult(0)
|
|
}
|
|
return task.ID, err
|
|
}
|
|
|
|
func makeSyncError(err error) (newErr error) {
|
|
if err != nil {
|
|
return &SyncError{
|
|
Original: err,
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (e *SyncError) Error() string {
|
|
return fmt.Sprintf("本地数据修改成功,但同步失败,请根据错误提示处理!,同步错误信息:%s", e.Original.Error())
|
|
}
|