Files
jx-callback/business/jxstore/cms/sync.go
2018-10-12 11:41:16 +08:00

329 lines
12 KiB
Go

package cms
import (
"errors"
"fmt"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxcallback/scheduler/basesch"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
"git.rosy.net.cn/jx-callback/business/partner"
"git.rosy.net.cn/jx-callback/globals"
)
// VendorSync holds the purchase platform handlers known to this package,
// together with the vendor IDs grouped by the kind of handler they use.
type VendorSync struct {
	// MultiStoreVendorIDs lists the vendor IDs whose handler implements
	// partner.IMultipleStoresHandler.
	MultiStoreVendorIDs []int
	// SingleStoreVendorIDs lists the vendor IDs whose handler implements
	// partner.ISingleStoreHandler.
	SingleStoreVendorIDs []int
	// PurchaseHandlers maps a vendor ID to its (wrapped) platform handler.
	PurchaseHandlers map[int]partner.IPurchasePlatformHandler
}
// SyncError is the error type returned by the Loop* helpers when a
// synchronization task fails. It keeps the underlying error so callers can
// inspect it.
type SyncError struct {
	// Original is the underlying error that caused the synchronization to fail.
	Original error `json:"original"`
	// Message is an optional free-form description.
	Message string `json:"message"`
}
// MultiStoreHandlerWrapper provides common handling shared by all multi-store
// platform handlers. It embeds the real handler and overrides selected
// methods to add checks before delegating to it.
type MultiStoreHandlerWrapper struct {
	partner.IMultipleStoresHandler
}
// SingleStoreHandlerWrapper provides common handling shared by all
// single-store platform handlers. It embeds the real handler so every call
// is forwarded to it unless overridden.
type SingleStoreHandlerWrapper struct {
	partner.ISingleStoreHandler
}
// CurVendorSync is the package-wide VendorSync instance. It is populated by
// Init and is empty until Init has been called.
var CurVendorSync VendorSync
// ErrHaveNotImplementedYet reports that the requested operation has not been
// implemented yet (the message text is Chinese for "not implemented yet").
var ErrHaveNotImplementedYet = errors.New("还没有实现")

// ErrEntityNotExist reports that the requested entity could not be found
// (the message text is Chinese for "cannot find the corresponding entity").
var ErrEntityNotExist = errors.New("找不到相应实体")
func Init() {
CurVendorSync.PurchaseHandlers = make(map[int]partner.IPurchasePlatformHandler)
for k, v := range basesch.FixedBaseScheduler.PurchasePlatformHandlers {
if multiHandler, ok := v.(partner.IMultipleStoresHandler); ok {
CurVendorSync.MultiStoreVendorIDs = append(CurVendorSync.MultiStoreVendorIDs, k)
CurVendorSync.PurchaseHandlers[k] = &MultiStoreHandlerWrapper{
IMultipleStoresHandler: multiHandler,
}
} else if singleHandler, ok := v.(partner.ISingleStoreHandler); ok {
CurVendorSync.SingleStoreVendorIDs = append(CurVendorSync.SingleStoreVendorIDs, k)
CurVendorSync.PurchaseHandlers[k] = &SingleStoreHandlerWrapper{
ISingleStoreHandler: singleHandler,
}
} else {
panic(fmt.Sprintf("platform:%d type is wrong!", k))
}
}
}
// DeleteCategory forwards the deletion of cat to the wrapped handler, unless
// cat.JdID is a fake ID (as reported by jxutils.IsFakeID), in which case
// nothing is done and nil is returned.
func (p *MultiStoreHandlerWrapper) DeleteCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) {
	if !jxutils.IsFakeID(cat.JdID) {
		err = p.IMultipleStoresHandler.DeleteCategory(db, cat, userName)
	}
	return err
}
// UpdateCategory forwards the update of cat to the wrapped handler. When
// cat.JdID is a fake ID (as reported by jxutils.IsFakeID) the call is
// unexpected: a warning is logged and nil is returned without touching the
// wrapped handler.
func (p *MultiStoreHandlerWrapper) UpdateCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) {
	if !jxutils.IsFakeID(cat.JdID) {
		return p.IMultipleStoresHandler.UpdateCategory(db, cat, userName)
	}
	globals.SugarLogger.Warnf("UpdateCategory fakeid cat:%s should not get here", utils.Format4Output(cat, true))
	return nil
}
// DeleteSku forwards the deletion of sku to the wrapped handler, unless
// sku.JdID is a fake ID (as reported by jxutils.IsFakeID), in which case
// nothing is done and nil is returned.
func (p *MultiStoreHandlerWrapper) DeleteSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) {
	if !jxutils.IsFakeID(sku.JdID) {
		err = p.IMultipleStoresHandler.DeleteSku(db, sku, userName)
	}
	return err
}
// UpdateSku forwards the update of sku to the wrapped handler. When sku.JdID
// is a fake ID (as reported by jxutils.IsFakeID) the call is unexpected: a
// warning is logged and nil is returned without touching the wrapped handler.
func (p *MultiStoreHandlerWrapper) UpdateSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) {
	if !jxutils.IsFakeID(sku.JdID) {
		return p.IMultipleStoresHandler.UpdateSku(db, sku, userName)
	}
	globals.SugarLogger.Warnf("UpdateSku fakeid sku:%s should not get here", utils.Format4Output(sku, true))
	return nil
}
// GetStoreHandler returns the platform handler registered for vendorID, or
// nil when no handler is registered for it.
func (v *VendorSync) GetStoreHandler(vendorID int) partner.IPurchasePlatformHandler {
	if handler, found := v.PurchaseHandlers[vendorID]; found {
		return handler
	}
	return nil
}
// GetMultiStoreHandler returns the handler registered for vendorID as a
// partner.IMultipleStoresHandler.
//
// It returns nil when no handler is registered for vendorID or when the
// registered handler is not a multi-store handler. The comma-ok assertion is
// used so that an unknown vendor ID does not panic inside the getter, which
// matches GetStoreHandler's nil-on-miss behavior; callers should check for
// nil when the vendor ID does not come from MultiStoreVendorIDs.
func (v *VendorSync) GetMultiStoreHandler(vendorID int) partner.IMultipleStoresHandler {
	handler, _ := v.PurchaseHandlers[vendorID].(partner.IMultipleStoresHandler)
	return handler
}
// GetSingleStoreHandler returns the handler registered for vendorID as a
// partner.ISingleStoreHandler.
//
// It returns nil when no handler is registered for vendorID or when the
// registered handler is not a single-store handler. The comma-ok assertion is
// used so that an unknown vendor ID does not panic inside the getter, which
// matches GetStoreHandler's nil-on-miss behavior; callers should check for
// nil when the vendor ID does not come from SingleStoreVendorIDs.
func (v *VendorSync) GetSingleStoreHandler(vendorID int) partner.ISingleStoreHandler {
	handler, _ := v.PurchaseHandlers[vendorID].(partner.ISingleStoreHandler)
	return handler
}
// SyncCategory pushes pending SKU category changes to every multi-store
// vendor platform.
//
// When categoryID > 0 only that category is loaded; otherwise no ID filter is
// applied. For each loaded category the vendor-specific sync status field
// decides the action:
//   - deleted flag set: DeleteCategory
//   - new flag set, or isForce is true: CreateCategory (the vendor ID field
//     is then persisted as well)
//   - modified flag set: UpdateCategory
//
// After a successful action the sync status is reset to 0 and stored. The
// returned error, if any, is the one produced by LoopMultiStoresVendors.
func (v *VendorSync) SyncCategory(db *dao.DaoDB, categoryID int, isForce bool, userName string) (err error) {
	return v.LoopMultiStoresVendors(db, "SyncCategory", userName, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) {
		multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int))
		syncStatusFieldName := multiStoresHandler.GetFieldSyncStatusName()

		cond := make(map[string]interface{})
		if categoryID > 0 {
			cond[model.FieldID] = categoryID
		}
		var cats []*model.SkuCategory
		if err := dao.GetEntitiesByKV(db, &cats, cond, true); err != nil {
			return nil, err
		}

		task := tasksch.RunTask("", false, nil, len(cats), 1, userName, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) {
			for _, item := range batchItemList {
				cat := item.(*model.SkuCategory)
				updateFields := []string{syncStatusFieldName}
				syncStatus := jxutils.GetObjFieldByName(cat, syncStatusFieldName).(int8)

				// err must be local to this worker invocation: the task is
				// started with a parallel count of len(cats), so a variable
				// captured from the enclosing closure would be written by
				// several workers at once and could leak one category's
				// error into another's result.
				var err error
				switch {
				case syncStatus&model.SyncFlagDeletedMask != 0: // delete
					err = multiStoresHandler.DeleteCategory(db, cat, userName)
				case syncStatus&model.SyncFlagNewMask != 0 || isForce: // create
					err = multiStoresHandler.CreateCategory(db, cat, userName)
					updateFields = append(updateFields, multiStoresHandler.GetFieldIDName())
				case syncStatus&model.SyncFlagModifiedMask != 0: // modify
					err = multiStoresHandler.UpdateCategory(db, cat, userName)
				}
				if err != nil {
					return nil, err
				}

				jxutils.SetObjFieldByName(cat, syncStatusFieldName, int8(0))
				// Stop at the first persistence failure instead of carrying a
				// stale error into the next item.
				if _, err = dao.UpdateEntity(db, cat, updateFields...); err != nil {
					return nil, err
				}
			}
			return nil, nil
		}, cats)
		_, err := task.GetResult(0)
		return nil, err
	})
}
// SyncReorderCategories asks every multi-store vendor platform to reorder
// the categories under categoryID. When the platform call succeeds, the
// vendor's sync status field is set to 0 through dao.UpdateEntityByKV, keyed
// on model.FieldParentID == categoryID.
//
// isForce is accepted for signature compatibility but is not used here.
func (v *VendorSync) SyncReorderCategories(db *dao.DaoDB, categoryID int, isForce bool, userName string) (err error) {
	return v.LoopMultiStoresVendors(db, "SyncReorderCategories", userName, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) {
		vendorHandler := v.GetMultiStoreHandler(batchItemList[0].(int))
		if reorderErr := vendorHandler.ReorderCategories(db, categoryID, userName); reorderErr != nil {
			return nil, reorderErr
		}
		// NOTE(review): presumably the first map holds the values to write
		// and the second the match conditions — confirm against dao.UpdateEntityByKV.
		entity := &model.SkuCategory{}
		statusValues := utils.Params2Map(vendorHandler.GetFieldSyncStatusName(), 0)
		parentCond := utils.Params2Map(model.FieldParentID, categoryID)
		_, updateErr := dao.UpdateEntityByKV(db, entity, statusValues, parentCond)
		return nil, updateErr
	})
}
// SyncStore pushes store information to the vendor platforms the store is
// mapped to.
//
// Each store map returned for storeID is processed when its vendor matches
// vendorID (or vendorID is -1, meaning no vendor filter) and either isForce is
// true or its SyncStatus is non-zero. For such a map the vendor handler's
// UpdateStore is called and, on success, SyncStatus is reset to 0 and stored.
// Store maps whose vendor has no registered handler are skipped.
func (v *VendorSync) SyncStore(db *dao.DaoDB, vendorID, storeID int, isForce bool, userName string) (err error) {
	globals.SugarLogger.Debugf("SyncStore, storeID:%d", storeID)
	return v.LoopStoreMap(db, "SyncStore", userName, storeID, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) {
		storeMap := batchItemList[0].(*model.StoreMap)
		if vendorID != -1 && vendorID != storeMap.VendorID {
			return nil, nil
		}
		if !isForce && storeMap.SyncStatus == 0 {
			return nil, nil
		}
		handler := v.GetStoreHandler(storeMap.VendorID)
		if handler == nil {
			return nil, nil
		}
		// Errors are kept local to this worker: LoopStoreMap runs one worker
		// per store map in parallel, so writing to SyncStore's named result
		// from here would be a data race between workers.
		if err := handler.UpdateStore(db, storeID, userName); err != nil {
			return nil, err
		}
		storeMap.SyncStatus = 0
		_, err := dao.UpdateEntity(db, storeMap, model.FieldSyncStatus)
		return nil, err
	})
}
// SyncSku pushes pending SKU changes to every multi-store vendor platform.
//
// nameID and skuID restrict the SKUs that are loaded; -1 disables the
// respective filter. A loaded SKU is processed when isForce is true or its
// vendor-specific sync status is non-zero. The sync status decides the action:
//   - deleted flag set: DeleteSku
//   - new flag set: CreateSku (the vendor ID field is then persisted as well)
//   - modified flag set: UpdateSku
//
// After a successful action the sync status is reset to 0 and stored. The
// returned error, if any, is the one produced by LoopMultiStoresVendors.
func (v *VendorSync) SyncSku(db *dao.DaoDB, nameID, skuID int, isForce bool, userName string) (err error) {
	globals.SugarLogger.Debugf("SyncSku, nameID:%d, skuID:%d, isForce:%t, userName:%s", nameID, skuID, isForce, userName)
	return v.LoopMultiStoresVendors(db, "SyncSku", userName, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) {
		multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int))
		syncStatusFieldName := multiStoresHandler.GetFieldSyncStatusName()

		cond := make(map[string]interface{})
		if nameID != -1 {
			cond[model.FieldNameID] = nameID
		}
		if skuID != -1 {
			cond[model.FieldID] = skuID
		}
		var skuList []*model.Sku
		if err := dao.GetEntitiesByKV(db, &skuList, cond, true); err != nil {
			return nil, err
		}

		task := tasksch.RunTask("SyncSku", false, nil, len(skuList), 1, userName, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) {
			for _, item := range batchItemList {
				sku := item.(*model.Sku)
				syncStatus := jxutils.GetObjFieldByName(sku, syncStatusFieldName).(int8)
				if (skuID != -1 && skuID != sku.ID) || (!isForce && syncStatus == 0) {
					continue
				}

				// err must be local to this worker invocation: the task is
				// started with a parallel count of len(skuList), so a
				// variable captured from the enclosing closure would be
				// shared (and raced on) by all workers.
				var err error
				updateFields := []string{syncStatusFieldName}
				switch {
				case syncStatus&model.SyncFlagDeletedMask != 0: // delete
					err = multiStoresHandler.DeleteSku(db, sku, userName)
				case syncStatus&model.SyncFlagNewMask != 0: // create
					err = multiStoresHandler.CreateSku(db, sku, userName)
					updateFields = append(updateFields, multiStoresHandler.GetFieldIDName())
				case syncStatus&model.SyncFlagModifiedMask != 0: // modify
					err = multiStoresHandler.UpdateSku(db, sku, userName)
				}
				if err != nil {
					return nil, err
				}

				jxutils.SetObjFieldByName(sku, syncStatusFieldName, int8(0))
				// Report a failed status write instead of dropping it, so a
				// SKU whose flag could not be cleared is not silently treated
				// as synchronized (same handling as SyncCategory).
				if _, err = dao.UpdateEntity(db, sku, updateFields...); err != nil {
					return nil, err
				}
			}
			return nil, nil
		}, skuList)
		_, err := task.GetResult(0)
		return nil, err
	})
}
// SyncStoresSkus asks the handler of every registered vendor (multi-store and
// single-store) to synchronize the given SKUs of the given stores. The
// arguments are passed through unchanged to each handler's SyncStoresSkus.
func (v *VendorSync) SyncStoresSkus(db *dao.DaoDB, storeIDs []int, skuIDs []int, isForce bool, userName string) (err error) {
	globals.SugarLogger.Debug("SyncStoresSkus")
	return v.LoopStoreVendors(db, "SyncStoresSkus", userName, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) {
		handler := v.GetStoreHandler(batchItemList[0].(int))
		// Return the handler's error directly rather than assigning it to
		// SyncStoresSkus' named result: the workers run in parallel and must
		// not share an error variable.
		return nil, handler.SyncStoresSkus(db, storeIDs, skuIDs, isForce, userName)
	})
}
// LoopStoreMap loads the store-vendor maps for storeID via GetStoreVendorMaps
// and runs handler over them as a managed task named taskName, using one
// parallel slot per map and a batch size of 1. Any error, from loading or
// from the task, is returned wrapped in a *SyncError; nil is returned on
// success.
func (v *VendorSync) LoopStoreMap(db *dao.DaoDB, taskName, userName string, storeID int, handler tasksch.WorkFunc) (err error) {
	storeMaps, loadErr := GetStoreVendorMaps(db, storeID, -1)
	if loadErr != nil {
		return makeSyncError(loadErr)
	}
	task := tasksch.RunManagedTask(taskName, false, nil, len(storeMaps), 1, userName, handler, storeMaps)
	_, taskErr := task.GetResult(0)
	return makeSyncError(taskErr)
}
// LoopMultiStoresVendors runs handler once per multi-store vendor ID as a
// managed task, with one parallel slot per vendor and a batch size of 1. An
// empty taskName defaults to "LoopMultiStoresVendors". A task error is
// returned wrapped in a *SyncError; nil is returned on success.
func (v *VendorSync) LoopMultiStoresVendors(db *dao.DaoDB, taskName, userName string, handler tasksch.WorkFunc) (err error) {
	name := taskName
	if name == "" {
		name = "LoopMultiStoresVendors"
	}
	vendorIDs := v.MultiStoreVendorIDs
	task := tasksch.RunManagedTask(name, false, nil, len(vendorIDs), 1, userName, handler, vendorIDs)
	_, taskErr := task.GetResult(0)
	return makeSyncError(taskErr)
}
// LoopStoreVendors runs handler once per registered vendor ID — multi-store
// vendors first, then single-store vendors — as a managed task with one
// parallel slot per vendor and a batch size of 1. An empty taskName defaults
// to "LoopStoreVendors". A task error is returned wrapped in a *SyncError;
// nil is returned on success.
func (v *VendorSync) LoopStoreVendors(db *dao.DaoDB, taskName, userName string, handler tasksch.WorkFunc) (err error) {
	name := taskName
	if name == "" {
		name = "LoopStoreVendors"
	}
	// Build a fresh slice so the receiver's ID lists are never aliased.
	vendorIDs := make([]int, 0, len(v.MultiStoreVendorIDs)+len(v.SingleStoreVendorIDs))
	vendorIDs = append(vendorIDs, v.MultiStoreVendorIDs...)
	vendorIDs = append(vendorIDs, v.SingleStoreVendorIDs...)

	task := tasksch.RunManagedTask(name, false, nil, len(vendorIDs), 1, userName, handler, vendorIDs)
	_, taskErr := task.GetResult(0)
	return makeSyncError(taskErr)
}
// LoopSingleStoreVendors loads every store_map row that belongs to a
// single-store vendor and runs handler over those rows as a managed task with
// a batch size of 1 and at most 20 parallel slots. An empty taskName defaults
// to "LoopSingleStoreVendors".
//
// When no single-store vendor is registered there is nothing to do and nil is
// returned without querying the database. Any query or task error is returned
// wrapped in a *SyncError.
func (v *VendorSync) LoopSingleStoreVendors(db *dao.DaoDB, taskName, userName string, handler tasksch.WorkFunc) (err error) {
	if taskName == "" {
		taskName = "LoopSingleStoreVendors"
	}
	// With no vendor IDs the placeholder list would be empty and the query
	// would end in "IN ()", which is not valid SQL (assuming
	// dao.GenQuestionMarks(0) yields an empty string — TODO confirm).
	if len(v.SingleStoreVendorIDs) == 0 {
		return nil
	}

	query := `
SELECT *
FROM store_map
WHERE vendor_id IN (` + dao.GenQuestionMarks(len(v.SingleStoreVendorIDs)) + ")"
	var storeMaps []*model.StoreMap
	if err = dao.GetRows(db, &storeMaps, query, v.SingleStoreVendorIDs); err != nil {
		return makeSyncError(err)
	}

	// Cap the parallelism so a large number of stores does not start an
	// unbounded number of workers.
	const maxParallelCount = 20
	parallelCount := len(storeMaps)
	if parallelCount > maxParallelCount {
		parallelCount = maxParallelCount
	}
	task := tasksch.RunManagedTask(taskName, false, nil, parallelCount, 1, userName, handler, storeMaps)
	_, err = task.GetResult(0)
	return makeSyncError(err)
}
// RefreshSkuIDs collects the IDs of the SKUs selected by nameID and skuID
// (-1 disables the respective filter) and passes them to SyncSkusIDMap of
// every multi-store vendor handler. The result of each SyncSkusIDMap call is
// written to the debug log.
//
// isForce is accepted for signature compatibility but is not used here.
func (v *VendorSync) RefreshSkuIDs(nameID, skuID int, isForce bool, userName string) (err error) {
	query := `
SELECT t1.id
FROM sku t1
JOIN sku_name t2 ON t1.name_id = t2.id
WHERE 1 = 1
`
	queryArgs := []interface{}{}
	if nameID != -1 {
		query += " AND t1.name_id = ?"
		queryArgs = append(queryArgs, nameID)
	}
	if skuID != -1 {
		query += " AND t1.id = ?"
		queryArgs = append(queryArgs, skuID)
	}

	db := dao.GetDB()
	var skuIDs []int
	if err = dao.GetRows(db, &skuIDs, query, queryArgs); err != nil {
		return err
	}
	return v.LoopMultiStoresVendors(db, "RefreshSkuIDs", userName, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) {
		vendorHandler := v.GetMultiStoreHandler(batchItemList[0].(int))
		syncErr := vendorHandler.SyncSkusIDMap(db, skuIDs, userName)
		globals.SugarLogger.Debug(syncErr)
		return nil, syncErr
	})
}
// makeSyncError wraps a non-nil err in a *SyncError. A nil err yields a nil
// error, so the result can be compared against nil as usual.
func makeSyncError(err error) (newErr error) {
	if err == nil {
		return nil
	}
	return &SyncError{Original: err}
}
// Error implements the error interface. The returned text (in Chinese) tells
// the user that the local data was modified successfully but synchronization
// failed, followed by the detail of the failure.
//
// The detail is the text of Original; when Original is nil (for example a
// SyncError built with only Message set) Message is used instead, so calling
// Error never dereferences a nil error.
func (e *SyncError) Error() string {
	detail := e.Message
	if e.Original != nil {
		detail = e.Original.Error()
	}
	return fmt.Sprintf("本地数据修改成功,但同步失败,请根据错误提示处理!,同步错误信息:%s", detail)
}