From 2169484361d2d087a1693417e6d2fbde6a5bc77d Mon Sep 17 00:00:00 2001 From: gazebo Date: Fri, 18 Jan 2019 16:50:05 +0800 Subject: [PATCH] - refactor SyncSku, group by skuName --- business/jxstore/cms/sync.go | 106 ++++++++++++++++++++--------------- 1 file changed, 60 insertions(+), 46 deletions(-) diff --git a/business/jxstore/cms/sync.go b/business/jxstore/cms/sync.go index 97a763f20..d38146565 100644 --- a/business/jxstore/cms/sync.go +++ b/business/jxstore/cms/sync.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "reflect" - "sort" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxutils" @@ -237,59 +236,74 @@ func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuI return v.LoopMultiStoresVendors(ctx, db, "SyncSku", isAsync, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int)) syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()]) - var skuList []*model.Sku - cond := make(map[string]interface{}) + dbField := dao.ConvertDBFieldPrefix(model.VendorNames[multiStoresHandler.GetVendorID()]) + sql := fmt.Sprintf(` + SELECT DISTINCT t2.* + FROM sku t1 + JOIN sku_name t2 ON t2.id = t1.name_id + WHERE t1.%s_sync_status <> 0 + `, dbField) + sqlParams := []interface{}{} if nameID != -1 { - cond[model.FieldNameID] = nameID + sql += " AND t1.name_id = ?" + sqlParams = append(sqlParams, nameID) } if skuID != -1 { - cond[model.FieldID] = skuID + sql += " AND t1.id = ?" + sqlParams = append(sqlParams, skuID) + } - err := dao.GetEntitiesByKV(db, &skuList, cond, true) - if err == nil { - var dirtySkuList []*model.Sku - for _, v := range skuList { - syncStatus := jxutils.GetObjFieldByName(v, syncStatusFieldName).(int8) - if syncStatus != 0 { - dirtySkuList = append(dirtySkuList, v) - } - } - if len(dirtySkuList) > 0 { - sort.Sort(jxutils.SkuList(dirtySkuList)) - // globals.SugarLogger.Debug(utils.Format4Output(dirtySkuList, false)) - // todo 这里SetParallelCount(1)的原因是京东SPU特殊类型必须要序列化同步才能正常处理, db可能会有多线程问题 - task := tasksch.NewParallelTask("SyncSku loop sku", tasksch.NewParallelConfig().SetParallelCount(1).SetIsContinueWhenError(isContinueWhenError), userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { - sku := batchItemList[0].(*model.Sku) - syncStatus := jxutils.GetObjFieldByName(sku, syncStatusFieldName).(int8) - if (skuID == -1 || skuID == sku.ID) && (syncStatus != 0) { - updateFields := []string{syncStatusFieldName} - if syncStatus&model.SyncFlagDeletedMask != 0 { // 删除 - err = multiStoresHandler.DeleteSku(db, sku, userName) - } else if syncStatus&model.SyncFlagNewMask != 0 { // 新增 - if err = multiStoresHandler.CreateSku(db, sku, userName); err == nil { - var tmpStruct struct { - MaxIndex int + sql += " ORDER BY t2.id" + + var skuNameList []*model.SkuName + err := dao.GetRows(db, &skuNameList, sql, sqlParams...) + if err == nil && len(skuNameList) > 0 { + // todo 同一skuName下的sku顺序处理的原因是京东SPU特殊类型必须要序列化同步才能正常处理, db可能会有多线程问题 + task := tasksch.NewParallelTask("SyncSku loop sku", tasksch.NewParallelConfig().SetParallelCount(10).SetIsContinueWhenError(isContinueWhenError), userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { + skuName := batchItemList[0].(*model.SkuName) + var skuList []*model.Sku + if err = dao.GetRows(db, &skuList, fmt.Sprintf(` + SELECT * + FROM sku + WHERE name_id = ? AND %s_sync_status <> 0 + ORDER BY updated_at, id + `, dbField), skuName.ID); err == nil && len(skuList) > 0 { + for _, sku := range skuList { + syncStatus := jxutils.GetObjFieldByName(sku, syncStatusFieldName).(int8) + if (skuID == -1 || skuID == sku.ID) && (syncStatus != 0) { + updateFields := []string{syncStatusFieldName} + if syncStatus&model.SyncFlagDeletedMask != 0 { // 删除 + err = multiStoresHandler.DeleteSku(db, sku, userName) + } else if syncStatus&model.SyncFlagNewMask != 0 { // 新增 + if err = multiStoresHandler.CreateSku(db, sku, userName); err == nil { + var tmpStruct struct { + MaxIndex int + } + // todo hard code 得到京东spu中sku的顺序(以方便以后修改销售属性),这个必须要每次重新从数据库取 + if dao.GetRow(db, &tmpStruct, "SELECT MAX(sku_index) max_index FROM sku WHERE name_id = ? AND jd_id > 0 AND jd_id < 4024012631406 ", sku.NameID) == nil { + sku.SkuIndex = tmpStruct.MaxIndex + 1 + updateFields = append(updateFields, "SkuIndex") + } + updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()])) } - // todo hard code 得到京东spu中sku的顺序(以方便以后修改销售属性) - if dao.GetRow(db, &tmpStruct, "SELECT MAX(sku_index) max_index FROM sku WHERE name_id = ? AND jd_id > 0 AND jd_id < 4024012631406 ", sku.NameID) == nil { - sku.SkuIndex = tmpStruct.MaxIndex + 1 - updateFields = append(updateFields, "SkuIndex") - } - updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()])) + } else if syncStatus&model.SyncFlagModifiedMask != 0 { // 修改 + err = multiStoresHandler.UpdateSku(db, sku, userName) + } + if err == nil { + jxutils.SetObjFieldByName(sku, syncStatusFieldName, int8(0)) + dao.UpdateEntity(db, sku, updateFields...) } - } else if syncStatus&model.SyncFlagModifiedMask != 0 { // 修改 - err = multiStoresHandler.UpdateSku(db, sku, userName) - } - if err == nil { - jxutils.SetObjFieldByName(sku, syncStatusFieldName, int8(0)) - dao.UpdateEntity(db, sku, updateFields...) } } - return nil, err - }, dirtySkuList) - t.AddChild(task).Run() - _, err = task.GetResult(0) - } + } + if err == nil { + jxutils.SetObjFieldByName(skuName, syncStatusFieldName, int8(0)) + dao.UpdateEntity(db, skuName, syncStatusFieldName) + } + return nil, err + }, skuNameList) + t.AddChild(task).Run() + _, err = task.GetResult(0) } return nil, err })