Files
jx-callback/business/jxstore/cms/sync2.go
2019-12-11 22:30:32 +08:00

511 lines
19 KiB
Go

package cms
import (
"fmt"
"time"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/baseapi/utils/errlist"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
"git.rosy.net.cn/jx-callback/business/partner"
"git.rosy.net.cn/jx-callback/business/partner/putils"
"git.rosy.net.cn/jx-callback/globals"
)
type MultiStoreVendorInfo struct {
VendorID int
OrgCode string
}
func CombineVendorIDAndOrgCode(vendorID int, orgCode string) string {
return fmt.Sprintf("%d-%s", vendorID, orgCode)
}
func getMultiStoreVendorInfoList() (list []*MultiStoreVendorInfo) {
vendorIDs := partner.GetMultiStoreVendorIDs()
for _, vendorID := range vendorIDs {
orgCodeList := partner.CurAPIManager.GetAppOrgCodeList(vendorID)
for _, v := range orgCodeList {
list = append(list, &MultiStoreVendorInfo{
VendorID: vendorID,
OrgCode: v,
})
}
}
return list
}
func SyncCategories(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorIDs []int, appOrgCodes []string, catIDs []int, isAsync bool) (hint string, err error) {
globals.SugarLogger.Debugf("SyncCategories catIDs:%v", catIDs)
db := dao.GetDB()
catList, err := dao.GetSkuCategoryWithVendor(db, nil, nil, -1, catIDs, true)
if err == nil && len(catList) > 0 {
// todo 按vendorID orgCode合并操作
task := tasksch.NewParallelTask(fmt.Sprintf("同步分类:%v", catIDs), nil, ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
catVendorInfo := batchItemList[0].(*dao.SkuStoreCatInfo)
if multiStoresHandler, ok := partner.GetPurchasePlatformFromVendorID(catVendorInfo.VendorID).(partner.IMultipleStoresHandler); ok {
if model.IsSyncStatusDelete(catVendorInfo.CatSyncStatus) { //删除
if !dao.IsVendorThingIDEmpty(catVendorInfo.VendorCatID) &&
model.IsSyncStatusNeedDelete(catVendorInfo.CatSyncStatus) {
err = multiStoresHandler.DeleteCategory2(ctx, catVendorInfo.VendorOrgCode, catVendorInfo.VendorCatID)
}
} else if model.IsSyncStatusNew(catVendorInfo.CatSyncStatus) { // 新增
err = multiStoresHandler.CreateCategory2(ctx, catVendorInfo)
} else if model.IsSyncStatusUpdate(catVendorInfo.CatSyncStatus) { // 修改
err = multiStoresHandler.UpdateCategory2(ctx, catVendorInfo)
}
} else {
err = fmt.Errorf("平台:%d不合法", catVendorInfo.VendorID)
}
if err = OnThingSync(ctx, dao.GetDB(), SkuCategoryVendor2ThingMap(catVendorInfo), err); err == nil {
retVal = []int{1}
}
return retVal, err
}, catList)
tasksch.HandleTask(task, parentTask, len(catIDs) > 0).Run()
if isAsync {
hint = task.GetID()
} else {
resultList, err2 := task.GetResult(0)
if err = err2; err == nil {
hint = utils.Int2Str(len(resultList))
}
}
}
return hint, err
}
func SyncSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorIDs []int, appOrgCodes []string, nameIDs, skuIDs []int, isAsync bool) (hint string, err error) {
globals.SugarLogger.Debugf("SyncSkus nameIDs:%v, skuIDs:%v", nameIDs, skuIDs)
db := dao.GetDB()
skuList, err := dao.GetSkusWithVendor(db, nil, nil, nameIDs, skuIDs, true)
if err == nil && len(skuList) > 0 {
// todo 按vendorID orgCode合并操作
task := tasksch.NewParallelTask(fmt.Sprintf("同步商品:%v,%v", nameIDs, skuIDs), nil, ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
skuVendorInfo := batchItemList[0].(*dao.StoreSkuSyncInfo)
skuVendorInfo.SkuName = jxutils.ComposeSkuName(skuVendorInfo.Prefix, skuVendorInfo.Name, skuVendorInfo.Comment, skuVendorInfo.Unit, skuVendorInfo.SpecQuality, skuVendorInfo.SpecUnit, 0)
skuVendorInfo.MergedStatus = jxutils.MergeSkuStatus(skuVendorInfo.Status, skuVendorInfo.NameStatus)
if multiStoresHandler, ok := partner.GetPurchasePlatformFromVendorID(skuVendorInfo.VendorID).(partner.IMultipleStoresHandler); ok {
if model.IsSyncStatusDelete(skuVendorInfo.SkuSyncStatus) { //删除
if !dao.IsVendorThingIDEmpty(skuVendorInfo.VendorSkuID) &&
model.IsSyncStatusNeedDelete(skuVendorInfo.SkuSyncStatus) {
err = multiStoresHandler.DeleteSku2(ctx, skuVendorInfo.VendorOrgCode, storeSkuSyncInfo2Bare(skuVendorInfo))
}
} else if model.IsSyncStatusNew(skuVendorInfo.SkuSyncStatus) { // 新增
err = multiStoresHandler.CreateSku2(ctx, skuVendorInfo)
} else if model.IsSyncStatusUpdate(skuVendorInfo.SkuSyncStatus) { // 修改
err = multiStoresHandler.UpdateSku2(ctx, skuVendorInfo)
}
} else {
err = fmt.Errorf("平台:%d不合法", skuVendorInfo.VendorID)
}
if err = OnThingSync(ctx, dao.GetDB(), SkuVendor2ThingMap(skuVendorInfo), err); err == nil {
retVal = []int{1}
}
return retVal, err
}, skuList)
tasksch.HandleTask(task, parentTask, len(skuList) > 0).Run()
if isAsync {
hint = task.GetID()
} else {
resultList, err2 := task.GetResult(0)
if err = err2; err == nil {
hint = utils.Int2Str(len(resultList))
}
}
}
return hint, err
}
func SyncReorderCategories(ctx *jxcontext.Context, parentCatID int, isAsync bool) (hint string, err error) {
globals.SugarLogger.Debugf("SyncReorderCategories parentCatID:%d", parentCatID)
db := dao.GetDB()
hint, err = CurVendorSync.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("分类重排序:%d", parentCatID), isAsync, false,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
vendorInfo := batchItemList[0].(*MultiStoreVendorInfo)
multiStoresHandler := CurVendorSync.GetMultiStoreHandler(vendorInfo.VendorID)
if multiStoresHandler != nil {
catList, err2 := dao.GetSkuCategoryWithVendor(db, []int{vendorInfo.VendorID}, []string{vendorInfo.OrgCode}, parentCatID, nil, false)
if err = err2; err == nil {
var vendorCatIDList []string
for _, v := range catList {
if v.VendorCatID != "" {
vendorCatIDList = append(vendorCatIDList, v.VendorCatID)
}
}
if len(vendorCatIDList) > 0 {
if err = multiStoresHandler.ReorderCategories2(ctx, vendorInfo.OrgCode, catList[0].ParentVendorCatID, vendorCatIDList); err == nil {
retVal = []int{len(vendorCatIDList)}
}
}
}
} else {
err = fmt.Errorf("非法平台:%d", vendorInfo.VendorID)
}
return retVal, err
})
return hint, err
}
func getThingMap(db *dao.DaoDB, thingMap *model.ThingMap) (err error) {
return dao.GetEntity(db, thingMap, "ThingID", "ThingType", "VendorID", "VendorOrgCode", model.FieldDeletedAt)
}
func OnCreateThing(ctx *jxcontext.Context, db *dao.DaoDB, vendorInfoList []*MultiStoreVendorInfo, thingID int64, thingType int8) (err error) {
if thingType == model.ThingTypeSkuName {
return nil
}
if len(vendorInfoList) == 0 {
vendorInfoList = getMultiStoreVendorInfoList()
}
errList := errlist.New()
for _, v := range vendorInfoList {
thingMap := &model.ThingMap{
ThingID: thingID,
ThingType: thingType,
VendorID: v.VendorID,
VendorOrgCode: v.OrgCode,
SyncStatus: model.SyncFlagNewMask,
}
dao.WrapAddIDCULDEntity(thingMap, ctx.GetUserName())
errList.AddErr(dao.CreateEntity(db, thingMap))
updateThingMapEntity(db, thingMap)
}
if globals.IsUseThingMap {
err = errList.GetErrListAsOne()
}
return err
}
func OnUpdateThing(ctx *jxcontext.Context, db *dao.DaoDB, vendorInfoList []*MultiStoreVendorInfo, thingID int64, thingType int8) (err error) {
if thingType == model.ThingTypeSkuName {
return nil
}
if len(vendorInfoList) == 0 {
vendorInfoList = getMultiStoreVendorInfoList()
}
errList := errlist.New()
for _, v := range vendorInfoList {
thingMap := &model.ThingMap{
ThingID: thingID,
ThingType: thingType,
VendorID: v.VendorID,
VendorOrgCode: v.OrgCode,
}
thingMap.DeletedAt = utils.DefaultTimeValue
if err2 := getThingMap(db, thingMap); err2 == nil {
thingMap.SyncStatus |= model.SyncFlagModifiedMask
thingMap.LastOperator = ctx.GetUserName()
_, err2 = dao.UpdateEntity(db, thingMap)
errList.AddErr(err2)
updateThingMapEntity(db, thingMap)
} else {
errList.AddErr(err2)
}
}
if globals.IsUseThingMap {
err = errList.GetErrListAsOne()
}
return err
}
func OnDeleteThing(ctx *jxcontext.Context, db *dao.DaoDB, vendorInfoList []*MultiStoreVendorInfo, thingID int64, thingType int8) (err error) {
if thingType == model.ThingTypeSkuName {
return nil
}
if len(vendorInfoList) == 0 {
vendorInfoList = getMultiStoreVendorInfoList()
}
errList := errlist.New()
for _, v := range vendorInfoList {
thingMap := &model.ThingMap{
ThingID: thingID,
ThingType: thingType,
VendorID: v.VendorID,
VendorOrgCode: v.OrgCode,
}
thingMap.DeletedAt = utils.DefaultTimeValue
if err2 := getThingMap(db, thingMap); err2 == nil {
if model.IsSyncStatusNew(thingMap.SyncStatus) {
thingMap.SyncStatus = 0
thingMap.DeletedAt = time.Now()
} else {
thingMap.SyncStatus |= model.SyncFlagDeletedMask
}
thingMap.LastOperator = ctx.GetUserName()
_, err2 = dao.UpdateEntity(db, thingMap)
errList.AddErr(err2)
updateThingMapEntity(db, thingMap)
} else if !dao.IsNoRowsError(err2) {
errList.AddErr(err2)
}
}
if globals.IsUseThingMap {
err = errList.GetErrListAsOne()
}
return err
}
func SkuCategoryVendor2ThingMap(cat *dao.SkuStoreCatInfo) (thingMap *model.ThingMap) {
thingMap = &model.ThingMap{
ThingID: int64(cat.ID),
ThingType: model.ThingTypeCategory,
VendorID: cat.VendorID,
VendorOrgCode: cat.VendorOrgCode,
SyncStatus: cat.CatSyncStatus,
VendorThingID: cat.VendorCatID,
}
thingMap.ID = cat.MapID // 一定要赋值
return thingMap
}
func SkuVendor2ThingMap(sku *dao.StoreSkuSyncInfo) (thingMap *model.ThingMap) {
thingMap = &model.ThingMap{
ThingID: int64(sku.SkuID),
ThingType: model.ThingTypeSku,
VendorID: sku.VendorID,
VendorOrgCode: sku.VendorOrgCode,
SyncStatus: sku.SkuSyncStatus,
VendorThingID: sku.VendorSkuID,
}
thingMap.DeletedAt = utils.DefaultTimeValue
thingMap.ID = sku.BindID // 一定要赋值
return thingMap
}
func OnThingSync(ctx *jxcontext.Context, db *dao.DaoDB, thingMap *model.ThingMap, syncErr error) (err error) {
globals.SugarLogger.Debugf("OnThingSync thingMap:%s", utils.Format4Output(thingMap, true))
if syncErr != nil {
err = syncErr
thingMap.Remark = utils.LimitUTF8StringLen(err.Error(), 255)
dao.UpdateEntity(db, thingMap, "Remark")
} else {
updateFields := []string{
model.FieldSyncStatus,
model.FieldUpdatedAt,
model.FieldLastOperator,
"Remark",
}
if model.IsSyncStatusDelete(thingMap.SyncStatus) { //删除
thingMap.DeletedAt = time.Now()
thingMap.VendorThingID = ""
updateFields = append(updateFields, "VendorThingID", model.FieldDeletedAt)
} else if model.IsSyncStatusNew(thingMap.SyncStatus) { // 新增
updateFields = append(updateFields, "VendorThingID")
}
thingMap.SyncStatus = 0
thingMap.LastOperator = ctx.GetUserName()
thingMap.UpdatedAt = time.Now()
thingMap.Remark = ""
_, err = dao.UpdateEntity(db, thingMap, updateFields...)
updateThingMapEntity(db, thingMap)
}
return err
}
func updateThingMapEntity(db *dao.DaoDB, thingMap *model.ThingMap) {
if thingMap.ThingType == model.ThingTypeCategory {
cat := &model.SkuCategory{
JdID: utils.Str2Int64WithDefault(thingMap.VendorThingID, 0),
JdSyncStatus: thingMap.SyncStatus,
}
cat.ID = int(thingMap.ThingID)
dao.UpdateEntity(db, cat, "JdID", "JdSyncStatus")
} else if thingMap.ThingType == model.ThingTypeSku {
sku := &model.Sku{
JdID: utils.Str2Int64WithDefault(thingMap.VendorThingID, 0),
JdSyncStatus: thingMap.SyncStatus,
}
sku.ID = int(thingMap.ThingID)
dao.UpdateEntity(db, sku, "JdID", "JdSyncStatus")
}
}
func amendAndPruneVendorStuff(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID int, vendorOrgCode string, isAsync, isContinueWhenError bool, opType int, isForceUpdate bool) (hint string, err error) {
handler, _ := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.IMultipleStoresHandler)
if handler == nil {
return "", fmt.Errorf("平台:%s不支持此操作", model.VendorChineseNames[vendorID])
}
db := dao.GetDB()
vendorInfo := []*MultiStoreVendorInfo{
&MultiStoreVendorInfo{
VendorID: vendorID,
OrgCode: vendorOrgCode,
},
}
var sku2Delete []*partner.StoreSkuInfo
var cat2Delete []*partner.BareCategoryInfo
task := tasksch.NewParallelTask(fmt.Sprintf("平台:%s,账号:%s上的商品与商家分类", model.VendorChineseNames[vendorID], vendorOrgCode),
tasksch.NewParallelConfig().SetParallelCount(1).SetIsContinueWhenError(isContinueWhenError), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
step := batchItemList[0].(int)
switch step {
case 0:
localSkuList, err := dao.GetSkusWithVendor(db, []int{vendorID}, []string{vendorOrgCode}, nil, nil, false)
if err != nil {
return nil, err
}
localSkuMap := make(map[string]*dao.StoreSkuSyncInfo)
for _, v := range localSkuList {
if v.VendorSkuID != "" {
localSkuMap[v.VendorSkuID] = v
}
}
remoteSkuList, err2 := handler.GetSkus(ctx, vendorOrgCode, 0, "")
if err = err2; err == nil {
remoteSkuMap := make(map[string]int)
for _, v := range remoteSkuList {
if vendorSkuID := v.SkuList[0].VendorSkuID; vendorSkuID != "" {
if localSkuMap[vendorSkuID] == nil {
sku2Delete = append(sku2Delete, &partner.StoreSkuInfo{
SkuID: v.SkuList[0].SkuID,
VendorSkuID: vendorSkuID,
})
} else {
remoteSkuMap[vendorSkuID] = 1
}
} else if v.VendorNameID != "" {
sku2Delete = append(sku2Delete, &partner.StoreSkuInfo{
SkuID: v.NameID,
VendorSkuID: v.VendorNameID,
})
}
}
if opType == AmendPruneOnlyAmend || opType == AmendPruneAll {
for _, v := range localSkuList {
if v.BindID != 0 {
if !model.IsSyncStatusDelete(v.SkuSyncStatus) {
if remoteSkuMap[v.VendorSkuID] == 0 {
if !model.IsSyncStatusNew(v.SkuSyncStatus) {
OnCreateThing(ctx, db, vendorInfo, int64(v.SkuID), model.ThingTypeSku)
}
} else if isForceUpdate {
OnUpdateThing(ctx, db, vendorInfo, int64(v.SkuID), model.ThingTypeSku)
}
}
} else {
OnCreateThing(ctx, db, vendorInfo, int64(v.SkuID), model.ThingTypeSku)
}
}
}
}
case 1:
if (opType == AmendPruneOnlyPrune || opType == AmendPruneAll) && len(sku2Delete) > 0 {
_, err = putils.FreeBatchStoreSkuInfo("删除商品", func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
if err = handler.DeleteSku2(ctx, vendorOrgCode, batchedStoreSkuList[0]); err == nil {
successCount = 1
}
return nil, successCount, err
}, ctx, task, sku2Delete, 1, isContinueWhenError)
}
sku2Delete = nil
case 2:
localCatList, err := dao.GetSkuCategoryWithVendor(db, []int{vendorID}, []string{vendorOrgCode}, -1, nil, false)
if err != nil {
return nil, err
}
localCatMap := make(map[string]*dao.SkuStoreCatInfo)
for _, v := range localCatList {
localCatMap[v.VendorCatID] = v
localCatMap[v.Name] = v
localCatMap[utils.Int2Str(v.ID)] = v
}
remoteCatList, err2 := handler.GetAllCategories(ctx, vendorOrgCode)
if err = err2; err == nil {
remoteCatMap := make(map[string]int)
cat2Delete = checkRemoteCatExist(remoteCatMap, localCatMap, remoteCatList)
for _, v := range localCatList {
if v.MapID != 0 {
if !model.IsSyncStatusDelete(v.CatSyncStatus) {
if remoteCatMap[v.VendorCatID] == 0 {
if !model.IsSyncStatusNew(v.CatSyncStatus) {
OnCreateThing(ctx, db, vendorInfo, int64(v.ID), model.ThingTypeCategory)
}
} else if isForceUpdate && !model.IsSyncStatusUpdate(v.CatSyncStatus) {
OnUpdateThing(ctx, db, vendorInfo, int64(v.ID), model.ThingTypeCategory)
}
}
} else {
OnCreateThing(ctx, db, vendorInfo, int64(v.ID), model.ThingTypeCategory)
}
}
}
case 3:
if (opType == AmendPruneOnlyPrune || opType == AmendPruneAll) && len(cat2Delete) > 0 {
for i := 0; i < 2; i++ {
level := 2 - i
var levelCat2Delete []*partner.BareCategoryInfo
for _, v := range cat2Delete {
if v.Level == level {
levelCat2Delete = append(levelCat2Delete, v)
}
}
if len(levelCat2Delete) > 0 {
task4Delete := tasksch.NewParallelTask(fmt.Sprintf("删除商家分类,level:%d", level), tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
cat := batchItemList[0].(*partner.BareCategoryInfo)
err = handler.DeleteCategory2(ctx, vendorOrgCode, cat.VendorCatID)
return nil, err
}, levelCat2Delete)
tasksch.HandleTask(task4Delete, task, true).Run()
_, err = task4Delete.GetResult(0)
}
}
}
cat2Delete = nil
}
return nil, err
}, []int{0, 1, 2, 3})
tasksch.HandleTask(task, parentTask, true).Run()
if !isAsync {
_, err = task.GetResult(0)
hint = "1"
} else {
hint = task.ID
}
return hint, err
}
func FullSyncVendorStuff(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID int, vendorOrgCode string, isAsync, isContinueWhenError bool) (hint string, err error) {
multiStoreHandler, _ := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.IMultipleStoresHandler)
if multiStoreHandler == nil {
return "", fmt.Errorf("vendorID:%d不是多门店平台", vendorID)
}
task := tasksch.NewParallelTask("FullSyncStoreSkuNew", tasksch.NewParallelConfig().SetParallelCount(1).SetIsContinueWhenError(isContinueWhenError), ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
step := batchItemList[0].(int)
switch step {
case 0:
_, err = amendAndPruneVendorStuff(ctx, task, vendorID, vendorOrgCode, false, isContinueWhenError, AmendPruneAll, false)
case 1:
_, err = SyncCategories(ctx, task, []int{vendorID}, []string{vendorOrgCode}, nil, false)
case 2:
_, err = SyncSkus(ctx, task, []int{vendorID}, []string{vendorOrgCode}, nil, nil, false)
}
return retVal, err
}, []int{0, 1, 2})
tasksch.HandleTask(task, parentTask, true).Run()
if !isAsync {
_, err = task.GetResult(0)
} else {
hint = task.GetID()
}
return hint, err
}