From 6c7b9afd10555790b16cb32f91562ab81fd50aa4 Mon Sep 17 00:00:00 2001 From: gazebo Date: Fri, 26 Oct 2018 13:38:35 +0800 Subject: [PATCH] - remove RunParallelTask - refactor partner.SyncStoreCategory. --- business/jxstore/cms/sync.go | 3 +- business/jxutils/tasksch/parallel_task.go | 6 --- .../jxutils/tasksch/parallel_task_test.go | 6 ++- business/jxutils/tasksch/sequence_task.go | 6 --- business/partner/partner.go | 2 +- business/partner/purchase/ebai/common.go | 3 +- business/partner/purchase/ebai/store_sku.go | 41 +++++++++---------- business/partner/purchase/elm/store_sku.go | 5 +-- 8 files changed, 30 insertions(+), 42 deletions(-) diff --git a/business/jxstore/cms/sync.go b/business/jxstore/cms/sync.go index 22999df22..3ca28962a 100644 --- a/business/jxstore/cms/sync.go +++ b/business/jxstore/cms/sync.go @@ -337,11 +337,10 @@ func (v *VendorSync) LoopStoresMap(ctx *jxcontext.Context, db *dao.DaoDB, taskNa task := tasksch.NewParallelTask(taskName, nil, ctx.GetUserName(), handler, loopInfoList) ctx.SetTaskOrAddChild(task, nil) tasksch.ManageTask(task).Run() - hint = task.ID if !isAsync { _, err = task.GetResult(0) } - return hint, makeSyncError(err) + return task.ID, makeSyncError(err) } func (v *VendorSync) LoopMultiStoresVendors(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, userName string, handler tasksch.WorkFunc) (hint string, err error) { diff --git a/business/jxutils/tasksch/parallel_task.go b/business/jxutils/tasksch/parallel_task.go index c2d34a11a..6aae8e859 100644 --- a/business/jxutils/tasksch/parallel_task.go +++ b/business/jxutils/tasksch/parallel_task.go @@ -95,12 +95,6 @@ func NewParallelTask(taskName string, config *ParallelConfig, userName string, w return task } -func RunParallelTask(taskName string, config *ParallelConfig, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask { - task := NewParallelTask(taskName, config, userName, worker, itemList, params...) - task.Run() - return task -} - func (task *ParallelTask) Run() { task.run(func() { globals.SugarLogger.Debugf("ParallelTask.Run %s", task.Name) diff --git a/business/jxutils/tasksch/parallel_task_test.go b/business/jxutils/tasksch/parallel_task_test.go index 08ca36274..45223a48e 100644 --- a/business/jxutils/tasksch/parallel_task_test.go +++ b/business/jxutils/tasksch/parallel_task_test.go @@ -14,7 +14,7 @@ func TestRunParallelTask(t *testing.T) { for k := range itemList { itemList[k] = k } - task := RunParallelTask("test", NewParallelConfig().SetParallelCount(100).SetBatchSize(7), "autotest", func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := NewParallelTask("test", NewParallelConfig().SetParallelCount(100).SetBatchSize(7), "autotest", func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { sleepSecond := rand.Intn(5) t.Logf("sleep %d seconds", sleepSecond) time.Sleep(time.Duration(sleepSecond) * time.Second) @@ -24,6 +24,7 @@ func TestRunParallelTask(t *testing.T) { } return retSlice, nil }, itemList, "a", "b", 1, 2) + task.Run() result, err := task.GetResult(1 * time.Microsecond) if err == nil || task.GetStatus() != TaskStatusWorking { t.Fatal("task can not be done in 1 microsecond") @@ -44,7 +45,7 @@ func TestCancelParallelTask(t *testing.T) { for k := range itemList { itemList[k] = k } - task := RunParallelTask("test", NewParallelConfig().SetParallelCount(100).SetBatchSize(7), "autotest", func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := NewParallelTask("test", NewParallelConfig().SetParallelCount(100).SetBatchSize(7), "autotest", func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { sleepSecond := rand.Intn(5) fmt.Printf("sleep %d seconds\n", sleepSecond) time.Sleep(time.Duration(sleepSecond) * time.Second) @@ -54,6 +55,7 @@ func TestCancelParallelTask(t *testing.T) { } return retSlice, nil }, itemList, "a", "b", 1, 2) + task.Run() // time.Sleep(time.Second * 6) fmt.Printf("finishedItemCount:%d, finishedJobCount:%d\n", task.GetFinishedItemCount(), task.GetFinishedJobCount()) task.Cancel() diff --git a/business/jxutils/tasksch/sequence_task.go b/business/jxutils/tasksch/sequence_task.go index 70c51a937..873459744 100644 --- a/business/jxutils/tasksch/sequence_task.go +++ b/business/jxutils/tasksch/sequence_task.go @@ -27,12 +27,6 @@ func NewSeqTask(taskName string, userName string, worker SeqWorkFunc, stepCount return task } -func RunSeqTask(taskName string, userName string, worker SeqWorkFunc, stepCount int, params ...interface{}) *SeqTask { - task := NewSeqTask(taskName, userName, worker, stepCount, params...) - task.Run() - return task -} - func (task *SeqTask) Run() { task.run(func() { globals.SugarLogger.Debugf("SeqTask.Run %s", task.Name) diff --git a/business/partner/partner.go b/business/partner/partner.go index 7b4792ce2..eff59323e 100644 --- a/business/partner/partner.go +++ b/business/partner/partner.go @@ -114,7 +114,7 @@ type IMultipleStoresHandler interface { type ISingleStoreHandler interface { IPurchasePlatformHandler - SyncStoresCategories(db *dao.DaoDB, storeIDs []int, userName string) (err error) + SyncStoreCategory(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, isAsync bool) (hint string, err error) ReadStoreCategories(storeID int) (cats []*model.SkuCategory, err error) ReadStoreSku(storeID, skuID int) (skuNameExt *model.SkuNameExt, err error) diff --git a/business/partner/purchase/ebai/common.go b/business/partner/purchase/ebai/common.go index ddb3ae2ec..588685ec8 100644 --- a/business/partner/purchase/ebai/common.go +++ b/business/partner/purchase/ebai/common.go @@ -11,7 +11,7 @@ import ( func (p *PurchaseHandler) UpdatePlaces() (err error) { provinces, err := api.EbaiAPI.CommonShopCities(0) if err == nil { - task := tasksch.RunParallelTask("UpdatePlaces", nil, "test", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := tasksch.NewParallelTask("UpdatePlaces", nil, "test", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { province := batchItemList[0].(*ebaiapi.CityInfo) retSlice := make([]*ebaiapi.CityInfo, 0) if province.IsOpen != 0 { @@ -35,6 +35,7 @@ func (p *PurchaseHandler) UpdatePlaces() (err error) { } return retSlice, err }, provinces) + task.Run() places, err2 := task.GetResult(0) if err = err2; err == nil { globals.SugarLogger.Debug(utils.Format4Output(places, false)) diff --git a/business/partner/purchase/ebai/store_sku.go b/business/partner/purchase/ebai/store_sku.go index 58e3afd4f..5ed2481f5 100644 --- a/business/partner/purchase/ebai/store_sku.go +++ b/business/partner/purchase/ebai/store_sku.go @@ -133,7 +133,7 @@ func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasks return "", err } } - if err = p.SyncOneStoreCategories(db, storeID, userName); err != nil { + if _, err = p.SyncStoreCategory(ctx, parentTask, storeID, false); err != nil { return "", err } if err = dao.GetRows(db, &storeSkuInfoList, sql, sqlParams...); err == nil { @@ -171,19 +171,6 @@ func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasks return "", err } -func (p *PurchaseHandler) SyncStoresCategories(db *dao.DaoDB, storeIDs []int, userName string) (err error) { - globals.SugarLogger.Debugf("SyncStoresCategories storeIDs:%d, userName:%s", storeIDs, userName) - - if globals.EnableStoreWrite { - for _, storeID := range storeIDs { - if err = p.SyncOneStoreCategories(db, storeID, userName); err != nil { - break - } - } - } - return err -} - func (p *PurchaseHandler) ReadStoreCategories(storeID int) (cats []*model.SkuCategory, err error) { return nil, err } @@ -201,7 +188,7 @@ func (p *PurchaseHandler) GetAllRemoteSkus(storeID int) (skus []map[string]inter for i := 2; i <= page1.Pages; i++ { pages[i-2] = i } - task := tasksch.RunParallelTask("GetAllRemoteSkus", nil, "", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := tasksch.NewParallelTask("GetAllRemoteSkus", nil, "", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { callParams := map[string]interface{}{ "pagesize": MaxPageSize, "page": batchItemList[0], @@ -213,6 +200,7 @@ func (p *PurchaseHandler) GetAllRemoteSkus(storeID int) (skus []map[string]inter globals.SugarLogger.Debug(utils.Format4Output(callParams, false)) return nil, err2 }, pages) + task.Run() result, err2 := task.GetResult(0) if err = err2; err == nil { for _, v := range result { @@ -234,7 +222,7 @@ func (p *PurchaseHandler) DeleteRemoteSkus(storeID int, vendorSkuIDs []string) ( } } } - task := tasksch.RunParallelTask("DeleteRemoteSkus", tasksch.NewParallelConfig().SetBatchSize(100), "", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := tasksch.NewParallelTask("DeleteRemoteSkus", tasksch.NewParallelConfig().SetBatchSize(100), "", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { strList := make([]string, len(batchItemList)) for k, v := range batchItemList { strList[k] = v.(string) @@ -244,6 +232,7 @@ func (p *PurchaseHandler) DeleteRemoteSkus(storeID int, vendorSkuIDs []string) ( } return nil, err }, vendorSkuIDs) + task.Run() _, err = task.GetResult(0) return err } @@ -259,12 +248,13 @@ func (p *PurchaseHandler) DeleteRemoteCategories(storeID int, vendorCatIDs []int } } } - task := tasksch.RunParallelTask("DeleteRemoteCategories", nil, "", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := tasksch.NewParallelTask("DeleteRemoteCategories", nil, "", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { if globals.EnableStoreWrite { err = api.EbaiAPI.ShopCategoryDelete(strStoreID, batchItemList[0].(int64)) } return nil, err }, vendorCatIDs) + task.Run() _, err = task.GetResult(0) return err } @@ -365,10 +355,13 @@ func (p *PurchaseHandler) syncOneStoreCategoriesFromRemote2Local(db *dao.DaoDB, // 从本地同步分类信息到饿百 // 测试过程中出现过,父分类创建成功后马上创建子分类会报没有父分类错 // todo 对于deleted_at的处理有问题 -func (p *PurchaseHandler) SyncOneStoreCategories(db *dao.DaoDB, storeID int, userName string) (err error) { +func (p *PurchaseHandler) SyncStoreCategory(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, isAsync bool) (hint string, err error) { + userName := ctx.GetUserName() globals.SugarLogger.Debugf("SyncOneStoreCategories storeID:%d, userName:%s", storeID, userName) - for level := 1; level <= 2; level++ { + db := dao.GetDB() + rootTask := tasksch.NewSeqTask("ebai SyncStoreCategory", userName, func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { + level := step + 1 sql := ` SELECT t2.*, t1.name, t1.parent_id, t1.level, t1.type, t1.seq, t2p.ebai_id parent_ebai_id FROM sku_category t1 @@ -385,7 +378,7 @@ func (p *PurchaseHandler) SyncOneStoreCategories(db *dao.DaoDB, storeID int, use } if err = dao.GetRows(db, &catList, sql, sqlParams...); err == nil { strStoreID := utils.Int2Str(storeID) - task := tasksch.RunParallelTask("syncOneStoreCategoriesFromLocal2Remote", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := tasksch.NewParallelTask("syncOneStoreCategoriesFromLocal2Remote", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { updateFields := []string{model.FieldEbaiSyncStatus} catInfo := batchItemList[0].(*tStoreCatInfo) // globals.SugarLogger.Debug(utils.Format4Output(catInfo, false)) @@ -409,10 +402,16 @@ func (p *PurchaseHandler) SyncOneStoreCategories(db *dao.DaoDB, storeID int, use } return nil, err }, catList) + rootTask.AddChild(task).Run() _, err = task.GetResult(0) } + return nil, err + }, 2) + parentTask.AddChild(rootTask).Run() + if !isAsync { + _, err = rootTask.GetResult(0) } - return err + return rootTask.ID, err } func (p *PurchaseHandler) processLocalCatByRemote(db *dao.DaoDB, storeID int, localCatMap map[string]*tStoreCatInfo, remoteCatList []*ebaiapi.CategoryInfo, userName string) (err error) { diff --git a/business/partner/purchase/elm/store_sku.go b/business/partner/purchase/elm/store_sku.go index aa67df6e1..d54263c30 100644 --- a/business/partner/purchase/elm/store_sku.go +++ b/business/partner/purchase/elm/store_sku.go @@ -4,11 +4,10 @@ import ( "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" "git.rosy.net.cn/jx-callback/business/jxutils/tasksch" "git.rosy.net.cn/jx-callback/business/model" - "git.rosy.net.cn/jx-callback/business/model/dao" ) -func (p *PurchaseHandler) SyncStoresCategories(db *dao.DaoDB, storeIDs []int, userName string) (err error) { - return nil +func (p *PurchaseHandler) SyncStoreCategory(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, isAsync bool) (hint string, err error) { + return "", nil } func (p *PurchaseHandler) ReadStoreCategories(storeID int) (cats []*model.SkuCategory, err error) { return nil, nil