From 98332eb6a12c1b67038ebb5fe9b8a9d40d445921 Mon Sep 17 00:00:00 2001 From: gazebo Date: Tue, 7 May 2019 14:19:51 +0800 Subject: [PATCH] =?UTF-8?q?-=20=E5=B0=BD=E9=87=8F=E8=AE=A9=E5=BC=82?= =?UTF-8?q?=E6=AD=A5=E4=BB=BB=E5=8A=A1=E7=9A=84hint=E8=BF=94=E5=9B=9E?= =?UTF-8?q?=E6=BB=A1=E8=B6=B3=E7=BA=A6=E5=AE=9A=EF=BC=88"",=20"0",=20"numb?= =?UTF-8?q?er",=20"taskID"=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- business/jxstore/cms/sync.go | 42 +++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/business/jxstore/cms/sync.go b/business/jxstore/cms/sync.go index 4ebf98873..a0309f0fc 100644 --- a/business/jxstore/cms/sync.go +++ b/business/jxstore/cms/sync.go @@ -213,22 +213,23 @@ func (v *VendorSync) SyncStore(ctx *jxcontext.Context, db *dao.DaoDB, vendorID, vendorID, } } - hint, err = v.LoopStoresMap(ctx, db, fmt.Sprintf("同步门店信息:%d", storeID), isAsync, false, vendorIDs, []int{storeID}, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { + return v.LoopStoresMap(ctx, db, fmt.Sprintf("同步门店信息:%d", storeID), isAsync, false, vendorIDs, []int{storeID}, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { loopMapInfo := batchItemList[0].(*LoopStoreMapInfo) handler := v.GetStoreHandler(loopMapInfo.VendorID) if len(loopMapInfo.StoreMapList) > 1 { loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[loopMapInfo.VendorID]), nil, ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + var resultList []interface{} storeMap := batchItemList[0].(*model.StoreMap) if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil { storeMap.SyncStatus = 0 _, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus) + resultList = append(resultList, 1) } - return nil, err + return resultList, err }, loopMapInfo.StoreMapList) t.AddChild(loopStoreTask).Run() - _, err = loopStoreTask.GetResult(0) - return nil, err + return loopStoreTask.GetResult(0) } storeMap := loopMapInfo.StoreMapList[0] if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil { @@ -236,9 +237,8 @@ func (v *VendorSync) SyncStore(ctx *jxcontext.Context, db *dao.DaoDB, vendorID, _, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus) } err = jxutils.AddVendorInfo2Err(err, loopMapInfo.VendorID) - return nil, err + return []interface{}{1}, err }) - return hint, err } func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuID int, isAsync, isContinueWhenError bool, userName string) (hint string, err error) { @@ -259,6 +259,7 @@ func (v *VendorSync) SyncSkus(ctx *jxcontext.Context, db *dao.DaoDB, nameIDs []i globals.SugarLogger.Debugf("SyncSku trackInfo:%s, nameIDs:%v, skuIDs:%v, userName:%s", ctx.GetTrackInfo(), nameIDs, skuIDs, userName) return v.LoopMultiStoresVendors(ctx, db, fmt.Sprintf("同步商品信息, nameIDs:%v, skuIDs:%v", nameIDs, skuIDs), isAsync, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { + var resultList []interface{} vendorID := batchItemList[0].(int) multiStoresHandler := v.GetMultiStoreHandler(vendorID) syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()]) @@ -297,6 +298,7 @@ func (v *VendorSync) SyncSkus(ctx *jxcontext.Context, db *dao.DaoDB, nameIDs []i // todo 同一skuName下的sku顺序处理的原因是京东SPU特殊类型必须要序列化同步才能正常处理, db可能会有多线程问题 task := tasksch.NewParallelTask(fmt.Sprintf("处理平台%s", model.VendorChineseNames[vendorID]), tasksch.NewParallelConfig().SetParallelCount(10).SetIsContinueWhenError(isContinueWhenError), ctx, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { + var resultList []interface{} skuName := batchItemList[0].(*model.SkuName) var skuList []*model.Sku if err = dao.GetRows(db, &skuList, fmt.Sprintf(` @@ -333,6 +335,8 @@ func (v *VendorSync) SyncSkus(ctx *jxcontext.Context, db *dao.DaoDB, nameIDs []i refutil.SetObjFieldByName(sku, syncStatusFieldName, int8(0)) if _, err = dao.UpdateEntity(db, sku, updateFields...); err != nil { break + } else { + resultList = append(resultList, 1) } } } @@ -342,12 +346,15 @@ func (v *VendorSync) SyncSkus(ctx *jxcontext.Context, db *dao.DaoDB, nameIDs []i refutil.SetObjFieldByName(skuName, syncStatusFieldName, int8(0)) _, err = dao.UpdateEntity(db, skuName, syncStatusFieldName) } - return nil, err + return resultList, err }, skuNameList) t.AddChild(task).Run() - _, err = task.GetResult(0) + result, err2 := task.GetResult(0) + if err = err2; err == nil { + resultList = append(resultList, result...) + } } - return nil, err + return resultList, err }) } @@ -506,18 +513,27 @@ func (v *VendorSync) LoopStoresMap(ctx *jxcontext.Context, db *dao.DaoDB, taskNa task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, loopInfoList) tasksch.HandleTask(task, nil, isManageIt).Run() if !isAsync { - _, err = task.GetResult(0) + if _, err = task.GetResult(0); err == nil { + hint = "1" // todo 暂时这样 + } + } else { + hint = task.ID } - return task.ID, makeSyncError(err) + return hint, makeSyncError(err) } func (v *VendorSync) LoopMultiStoresVendors(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, userName string, handler tasksch.WorkFunc) (hint string, err error) { task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, v.MultiStoreVendorIDs) tasksch.HandleTask(task, nil, true).Run() if !isAsync { - _, err = task.GetResult(0) + result, err2 := task.GetResult(0) + if err = err2; err == nil { + hint = utils.Int2Str(len(result)) + } + } else { + hint = task.ID } - return task.ID, makeSyncError(err) + return hint, makeSyncError(err) } func (v *VendorSync) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int, storeIDs []int) (hint string, err error) {