diff --git a/business/jxstore/cms/sync.go b/business/jxstore/cms/sync.go index 6089e70f3..1a405c9e8 100644 --- a/business/jxstore/cms/sync.go +++ b/business/jxstore/cms/sync.go @@ -119,9 +119,9 @@ func (v *VendorSync) GetSingleStoreHandler(vendorID int) partner.ISingleStoreHan return nil } -func (v *VendorSync) syncCategories(multiStoresHandler partner.IMultipleStoresHandler, db *dao.DaoDB, cats []*model.SkuCategory, userName string) (err error) { +func (v *VendorSync) syncCategories(parentTask tasksch.ITask, multiStoresHandler partner.IMultipleStoresHandler, db *dao.DaoDB, cats []*model.SkuCategory, userName string) (err error) { syncStatusFieldName := multiStoresHandler.GetFieldSyncStatusName() - task := tasksch.RunTask("syncCategories", false, nil, len(cats), 1, userName, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) { + task := tasksch.RunParallelTask("syncCategories", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { cat := batchItemList[0].(*model.SkuCategory) updateFields := []string{syncStatusFieldName} syncStatus := jxutils.GetObjFieldByName(cat, syncStatusFieldName).(int8) @@ -139,13 +139,14 @@ func (v *VendorSync) syncCategories(multiStoresHandler partner.IMultipleStoresHa } return nil, err }, cats) + parentTask.AddChild(task) _, err = task.GetResult(0) return err } func (v *VendorSync) SyncCategory(db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) { globals.SugarLogger.Debug(v.MultiStoreVendorIDs) - hint, err = v.LoopMultiStoresVendors(db, "SyncCategory", isAsync, userName, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) { + hint, err = v.LoopMultiStoresVendors(db, "SyncCategory", isAsync, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int)) var cats []*model.SkuCategory cond := make(map[string]interface{}) @@ -156,7 +157,7 @@ func (v *VendorSync) SyncCategory(db *dao.DaoDB, categoryID int, isAsync bool, u } err := dao.GetEntitiesByKV(db, &cats, cond, true) if err == nil { - err = v.syncCategories(multiStoresHandler, db, cats, userName) + err = v.syncCategories(t, multiStoresHandler, db, cats, userName) } if err != nil || categoryID > 0 { return nil, err @@ -164,7 +165,7 @@ func (v *VendorSync) SyncCategory(db *dao.DaoDB, categoryID int, isAsync bool, u cond[model.FieldLevel] = 2 err = dao.GetEntitiesByKV(db, &cats, cond, true) if err == nil { - err = v.syncCategories(multiStoresHandler, db, cats, userName) + err = v.syncCategories(t, multiStoresHandler, db, cats, userName) } return nil, err }) @@ -172,7 +173,7 @@ func (v *VendorSync) SyncCategory(db *dao.DaoDB, categoryID int, isAsync bool, u } func (v *VendorSync) SyncReorderCategories(db *dao.DaoDB, categoryID int, isAsync bool, userName string) (hint string, err error) { - hint, err = v.LoopMultiStoresVendors(db, "SyncReorderCategories", isAsync, userName, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) { + hint, err = v.LoopMultiStoresVendors(db, "SyncReorderCategories", isAsync, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int)) err2 := multiStoresHandler.ReorderCategories(db, categoryID, userName) if err2 == nil { @@ -187,7 +188,7 @@ func (v *VendorSync) SyncReorderCategories(db *dao.DaoDB, categoryID int, isAsyn func (v *VendorSync) SyncStore(db *dao.DaoDB, vendorID, storeID int, isAsync bool, userName string) (hint string, err error) { globals.SugarLogger.Debugf("SyncStore, storeID:%d", storeID) - hint, err = v.LoopStoreMap(db, "SyncStore", isAsync, userName, storeID, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) { + hint, err = v.LoopStoreMap(db, "SyncStore", isAsync, userName, storeID, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { storeMap := batchItemList[0].(*model.StoreMap) if (vendorID == -1 || vendorID == storeMap.VendorID) && (storeMap.SyncStatus != 0) { if handler := v.GetStoreHandler(storeMap.VendorID); handler != nil { @@ -204,7 +205,7 @@ func (v *VendorSync) SyncStore(db *dao.DaoDB, vendorID, storeID int, isAsync boo func (v *VendorSync) SyncSku(db *dao.DaoDB, nameID, skuID int, isAsync bool, userName string) (hint string, err error) { globals.SugarLogger.Debugf("SyncSku, nameID:%d, skuID:%d, userName:%s", nameID, skuID, userName) - hint, err = v.LoopMultiStoresVendors(db, "SyncSku", isAsync, userName, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) { + hint, err = v.LoopMultiStoresVendors(db, "SyncSku", isAsync, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int)) syncStatusFieldName := multiStoresHandler.GetFieldSyncStatusName() var skuList []*model.Sku @@ -218,7 +219,7 @@ func (v *VendorSync) SyncSku(db *dao.DaoDB, nameID, skuID int, isAsync bool, use err := dao.GetEntitiesByKV(db, &skuList, cond, true) if err == nil { // globals.SugarLogger.Debug(utils.Format4Output(skuList, false)) - task := tasksch.RunTask("SyncSku", false, nil, len(skuList), 1, userName, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) { + task := tasksch.RunParallelTask("SyncSku", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { sku := batchItemList[0].(*model.Sku) syncStatus := jxutils.GetObjFieldByName(sku, syncStatusFieldName).(int8) if (skuID == -1 || skuID == sku.ID) && (syncStatus != 0) { @@ -238,6 +239,7 @@ func (v *VendorSync) SyncSku(db *dao.DaoDB, nameID, skuID int, isAsync bool, use } return nil, err }, skuList) + t.AddChild(task) _, err = task.GetResult(0) } return nil, err @@ -248,7 +250,7 @@ func (v *VendorSync) SyncSku(db *dao.DaoDB, nameID, skuID int, isAsync bool, use // func (v *VendorSync) SyncStoresSkus(db *dao.DaoDB, vendorIDs []int, storeIDs []int, skuIDs []int, isAsync bool, userName string) (hint string, err error) { globals.SugarLogger.Debug("SyncStoresSkus") - hint, err = v.LoopStoreVendors(db, vendorIDs, "SyncStoresSkus", isAsync, userName, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) { + hint, err = v.LoopStoreVendors(db, vendorIDs, "SyncStoresSkus", isAsync, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { handler := v.GetStoreHandler(batchItemList[0].(int)) _, err = handler.SyncStoresSkus(db, storeIDs, skuIDs, false, userName) return nil, err @@ -259,25 +261,24 @@ func (v *VendorSync) SyncStoresSkus(db *dao.DaoDB, vendorIDs []int, storeIDs []i func (v *VendorSync) LoopStoreMap(db *dao.DaoDB, taskName string, isAsync bool, userName string, storeID int, handler tasksch.WorkFunc) (hint string, err error) { storeMaps, err := GetStoreVendorMaps(db, storeID, -1) if err == nil { - task := tasksch.RunManagedTask(taskName, false, nil, len(storeMaps), 1, userName, handler, storeMaps) + task := tasksch.RunManagedParallelTask(taskName, nil, userName, handler, storeMaps) hint = task.ID if !isAsync { _, err = task.GetResult(0) } } - return "", makeSyncError(err) + return hint, makeSyncError(err) } func (v *VendorSync) LoopMultiStoresVendors(db *dao.DaoDB, taskName string, isAsync bool, userName string, handler tasksch.WorkFunc) (hint string, err error) { if taskName == "" { taskName = "LoopMultiStoresVendors" } - task := tasksch.RunManagedTask(taskName, false, nil, len(v.MultiStoreVendorIDs), 1, userName, handler, v.MultiStoreVendorIDs) - hint = task.ID + task := tasksch.RunManagedParallelTask(taskName, nil, userName, handler, v.MultiStoreVendorIDs) if !isAsync { _, err = task.GetResult(0) } - return "", makeSyncError(err) + return task.ID, makeSyncError(err) } func (v *VendorSync) LoopStoreVendors(db *dao.DaoDB, vendorIDs []int, taskName string, isAsync bool, userName string, handler tasksch.WorkFunc) (hint string, err error) { @@ -304,12 +305,11 @@ func (v *VendorSync) LoopStoreVendors(db *dao.DaoDB, vendorIDs []int, taskName s } allHandlers = allHandlers[:count] } - task := tasksch.RunManagedTask(taskName, false, nil, len(allHandlers), 1, userName, handler, allHandlers) - if isAsync { - return task.ID, nil + task := tasksch.RunManagedParallelTask(taskName, nil, userName, handler, allHandlers) + if !isAsync { + _, err = task.GetResult(0) } - _, err = task.GetResult(0) - return "", makeSyncError(err) + return task.ID, err } func (v *VendorSync) LoopSingleStoreVendors(db *dao.DaoDB, taskName string, isAsync bool, userName string, handler tasksch.WorkFunc) (hint string, err error) { @@ -321,17 +321,13 @@ func (v *VendorSync) LoopSingleStoreVendors(db *dao.DaoDB, taskName string, isAs SELECT * FROM store_map WHERE vendor_id IN (`+dao.GenQuestionMarks(len(v.SingleStoreVendorIDs))+")", v.SingleStoreVendorIDs); err == nil { - parellelCount := len(storeMaps) - if parellelCount > 20 { - parellelCount = 20 - } - task := tasksch.RunManagedTask(taskName, false, nil, parellelCount, 1, userName, handler, storeMaps) + task := tasksch.RunManagedParallelTask(taskName, nil, userName, handler, storeMaps) hint = task.ID if !isAsync { _, err = task.GetResult(0) } } - return "", makeSyncError(err) + return hint, makeSyncError(err) } func (v *VendorSync) RefreshSkuIDs(nameID, skuID int, userName string) (err error) { @@ -355,7 +351,7 @@ func (v *VendorSync) RefreshSkuIDs(nameID, skuID int, userName string) (err erro db := dao.GetDB() if err = dao.GetRows(db, &ids, sql, sqlParams); err == nil { // globals.SugarLogger.Debug(utils.Format4Output(ids, false)) - _, err = v.LoopMultiStoresVendors(db, "RefreshSkuIDs", false, userName, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) { + _, err = v.LoopMultiStoresVendors(db, "RefreshSkuIDs", false, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int)) err := multiStoresHandler.SyncSkusIDMap(db, ids, userName) globals.SugarLogger.Debug(err) diff --git a/business/jxstore/financial/financial.go b/business/jxstore/financial/financial.go index e42bf401b..a754ebb6e 100644 --- a/business/jxstore/financial/financial.go +++ b/business/jxstore/financial/financial.go @@ -25,7 +25,7 @@ type tUploadFileInfo struct { StoreID int } -func SendFilesToStores(files []*multipart.FileHeader, isAsync bool, userName string) (msg string, err error) { +func SendFilesToStores(files []*multipart.FileHeader, isAsync bool, userName string) (hint string, err error) { globals.SugarLogger.Debugf("SendFilesToStores, fileCount:%d isAsync:%t, userName:%s", len(files), isAsync, userName) if len(files) == 0 { return "", errors.New("没有文件上传!") @@ -51,7 +51,7 @@ func SendFilesToStores(files []*multipart.FileHeader, isAsync bool, userName str } upToken := putPolicy.UploadToken(api.QiniuAPI) cfg := &storage.Config{} - task := tasksch.RunManagedTask("SendFilesToStores", false, nil, 0, 1, userName, func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := tasksch.RunManagedParallelTask("SendFilesToStores", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { fileInfo := batchItemList[0].(*tUploadFileInfo) fileHeader := fileInfo.FileHeader storeID := fileInfo.StoreID @@ -83,11 +83,9 @@ func SendFilesToStores(files []*multipart.FileHeader, isAsync bool, userName str return retVal, err }, fileList) - if isAsync { - return task.ID, nil + hint = task.ID + if !isAsync { + _, err = task.GetResult(0) } - if _, err = task.GetResult(0); err == nil { - return "Done", nil - } - return "", err + return task.ID, err } diff --git a/business/jxstore/promotion/jd_promotion.go b/business/jxstore/promotion/jd_promotion.go index fac82f7b1..518deb718 100644 --- a/business/jxstore/promotion/jd_promotion.go +++ b/business/jxstore/promotion/jd_promotion.go @@ -258,40 +258,52 @@ func CreateJdPromotion(isIDJd bool, isAsync bool, params *PromotionParams, userN } dao.Commit(db) - task := tasksch.RunTask("CreateJdPromotion update sku price", false, nil, 0, 1, userName, func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { - storeID := batchItemList[0].(int) - modifyPricesList := jxutils.SplitSlice(modifyPricesList[storeID], jdapi.MaxStoreSkuBatchSize) - for _, modifyPrices := range modifyPricesList { - modifyPrices2 := make([]*jdapi.SkuPriceInfo, len(modifyPrices)) - for k, v := range modifyPrices { - modifyPrices2[k] = v.(*jdapi.SkuPriceInfo) + rootTask := tasksch.RunManagedSeqTask("CreateJdPromotion", userName, func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { + if step == 0 { + task1 := tasksch.RunParallelTask("CreateJdPromotion update sku price", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + storeID := batchItemList[0].(int) + modifyPricesList := jxutils.SplitSlice(modifyPricesList[storeID], jdapi.MaxStoreSkuBatchSize) + for _, modifyPrices := range modifyPricesList { + modifyPrices2 := make([]*jdapi.SkuPriceInfo, len(modifyPrices)) + for k, v := range modifyPrices { + modifyPrices2[k] = v.(*jdapi.SkuPriceInfo) + } + if _, err = api.JdAPI.UpdateVendorStationPrice(utils.Int2Str(storeID), "", modifyPrices2); err != nil { + return nil, err + } + } + return nil, nil + }, jxStoreIDs) + task.AddChild(task1) + if _, err = task1.GetResult(0); err != nil { + return "", err } - if _, err = api.JdAPI.UpdateVendorStationPrice(utils.Int2Str(storeID), "", modifyPrices2); err != nil { + } else if step == 1 { + if err = promotionHandler.CreatePromotionRules(infoId, "", 1, 1, 1, 1); err != nil { + return "", err + } + } else if step == 2 { + task2 := tasksch.RunParallelTask("CreateJdPromotion CreatePromotionSku", tasksch.NewParallelConfig().SetBatchSize(jdapi.MaxPromotionSkuCount), userName, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + skus := make([]map[string]interface{}, len(batchItemList)) + for k, v := range batchItemList { + skus[k] = v.(map[string]interface{}) + } + _, err = promotionHandler.CreatePromotionSku(infoId, "", skus) return nil, err + }, promotionPrices) + task.AddChild(task2) + if _, err = task2.GetResult(0); err != nil { + return "", err } + } else if step == 3 { + err = promotionHandler.ConfirmPromotion(infoId, "") } - return nil, nil - }, jxStoreIDs) - if _, err = task.GetResult(0); err != nil { - return "", err - } - if err = promotionHandler.CreatePromotionRules(infoId, "", 1, 1, 1, 1); err != nil { - return "", err - } - task = tasksch.RunManagedTask("CreateJdPromotion CreatePromotionSku", false, nil, 0, jdapi.MaxPromotionSkuCount, userName, func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { - skus := make([]map[string]interface{}, len(batchItemList)) - for k, v := range batchItemList { - skus[k] = v.(map[string]interface{}) - } - _, err = promotionHandler.CreatePromotionSku(infoId, "", skus) return nil, err - }, promotionPrices) - if _, err = task.GetResult(0); err != nil { - return "", err + }, 4) + if !isAsync { + _, err = rootTask.GetResult(0) } - err = promotionHandler.ConfirmPromotion(infoId, "") - - return "", err + return rootTask.ID, err } func CreatePromotionByExcel(isAsync bool, promotionType int, fileHeader *multipart.FileHeader, userName string) (hint string, err error) { diff --git a/business/jxstore/promotion/promotion.go b/business/jxstore/promotion/promotion.go index e32c04f8c..0eda2ef73 100644 --- a/business/jxstore/promotion/promotion.go +++ b/business/jxstore/promotion/promotion.go @@ -64,7 +64,7 @@ func SendAdvertingByGoodsOrder(advertising string, days int, isAsync bool, userN mobileNumbers = append(mobileNumbers, "18180948107") smsClient := aliyunsmsclient.New("http://dysmsapi.aliyuncs.com/") - task := tasksch.RunManagedTask("SendAdvertingByGoodsOrder", true, nil, 0, MaxBatchSize, userName, func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := tasksch.RunManagedParallelTask("SendAdvertingByGoodsOrder", tasksch.NewParallelConfig().SetBatchSize(MaxBatchSize), userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { numbers := make([]string, len(batchItemList)) for k, v := range batchItemList { numbers[k] = v.(string) diff --git a/business/jxutils/tasksch/parallel_task.go b/business/jxutils/tasksch/parallel_task.go index ed6e6d3c5..91eb6a197 100644 --- a/business/jxutils/tasksch/parallel_task.go +++ b/business/jxutils/tasksch/parallel_task.go @@ -14,7 +14,7 @@ const ( MaxParallelCount = 10 ) -type WorkFunc func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) +type WorkFunc func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) type ResultHandlerFunc func(taskName string, result []interface{}, err error) type ParallelConfig struct { @@ -68,7 +68,7 @@ func (c *ParallelConfig) SetResultHandler(resultHandler ResultHandlerFunc) *Para return c } -func NewParallelTask(taskName string, userName string, config *ParallelConfig, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask { +func NewParallelTask(taskName string, config *ParallelConfig, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask { if config == nil { config = NewParallelConfig() } @@ -95,19 +95,8 @@ func NewParallelTask(taskName string, userName string, config *ParallelConfig, w return task } -func RunParallelTask(taskName string, userName string, config *ParallelConfig, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask { - task := NewParallelTask(taskName, userName, config, worker, itemList, params...) - task.Run() - return task -} - -func RunTask(taskName string, isContinueWhenError bool, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask { - config := NewParallelConfig() - config.BatchSize = batchSize - config.IsContinueWhenError = isContinueWhenError - config.ParallelCount = parallelCount - config.ResultHandler = resultHandler - task := NewParallelTask(taskName, userName, config, worker, itemList, params...) +func RunParallelTask(taskName string, config *ParallelConfig, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask { + task := NewParallelTask(taskName, config, userName, worker, itemList, params...) task.Run() return task } @@ -128,7 +117,7 @@ func (task *ParallelTask) Run() { chanRetVal = retVal goto end } else { - result, err := task.worker(job, task.params...) + result, err := task.worker(task, job, task.params...) // globals.SugarLogger.Debugf("ParallelTask.Run %s, after call worker result:%v, err:%v", task.Name, result, err) task.finishedOneJob(len(job), err) if err == nil { @@ -204,3 +193,8 @@ func (task *ParallelTask) Run() { } }) } + +func (t *ParallelTask) AddChild(task ITask) { + t.BaseTask.AddChild(task) + task.SetParent(t) +} diff --git a/business/jxutils/tasksch/parallel_task_test.go b/business/jxutils/tasksch/parallel_task_test.go index f489f3493..61e0fd828 100644 --- a/business/jxutils/tasksch/parallel_task_test.go +++ b/business/jxutils/tasksch/parallel_task_test.go @@ -8,15 +8,12 @@ import ( "git.rosy.net.cn/baseapi/utils" ) -func TestRunTask(t *testing.T) { +func TestRunParallelTask(t *testing.T) { itemList := make([]int, 100) for k := range itemList { itemList[k] = k } - task := RunTask("test", false, func(taskName string, result []interface{}, err error) { - // t.Log("finished here") - // t.Log(utils.Format4Output(result, false)) - }, 100, 7, "autotest", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := RunParallelTask("test", NewParallelConfig().SetParallelCount(100).SetBatchSize(7), "autotest", func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { sleepSecond := rand.Intn(5) t.Logf("sleep %d seconds", sleepSecond) time.Sleep(time.Duration(sleepSecond) * time.Second) @@ -41,15 +38,12 @@ func TestRunTask(t *testing.T) { t.Log(task.GetStatus()) } -func TestCancelTask(t *testing.T) { +func TestCancelParallelTask(t *testing.T) { itemList := make([]int, 100) for k := range itemList { itemList[k] = k } - task := RunTask("test", false, func(taskName string, result []interface{}, err error) { - // t.Log("finished here") - // t.Log(utils.Format4Output(result, false)) - }, 100, 7, "autotest", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := RunParallelTask("test", NewParallelConfig().SetParallelCount(100).SetBatchSize(7), "autotest", func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { sleepSecond := rand.Intn(5) t.Logf("sleep %d seconds", sleepSecond) time.Sleep(time.Duration(sleepSecond) * time.Second) diff --git a/business/jxutils/tasksch/sequence_task.go b/business/jxutils/tasksch/sequence_task.go index 9fe04f151..65f42364e 100644 --- a/business/jxutils/tasksch/sequence_task.go +++ b/business/jxutils/tasksch/sequence_task.go @@ -12,7 +12,7 @@ const ( StepEnd = "End" ) -type SeqWorkFunc func(step int, params ...interface{}) (result interface{}, err error) // 只有最后一次返回结果保留 +type SeqWorkFunc func(task *SeqTask, step int, params ...interface{}) (result interface{}, err error) type SeqTask struct { BaseTask @@ -27,6 +27,12 @@ func NewSeqTask(taskName string, userName string, worker SeqWorkFunc, stepCount return task } +func RunSeqTask(taskName string, userName string, worker SeqWorkFunc, stepCount int, params ...interface{}) *SeqTask { + task := NewSeqTask(taskName, userName, worker, stepCount, params...) + task.Run() + return task +} + func (task *SeqTask) Run() { task.run(func() { globals.SugarLogger.Debugf("SeqTask.Run %s", task.Name) @@ -38,7 +44,7 @@ func (task *SeqTask) Run() { goto EndFor default: } - result, err := task.worker(i, task.params...) + result, err := task.worker(task, i, task.params...) task.finishedOneJob(1, err) if taskErr = err; taskErr != nil { globals.SugarLogger.Infof("SeqTask.Run %s step:%d failed with error:%v", task.Name, i, err) @@ -72,3 +78,8 @@ func (task *SeqTask) Run() { close(task.quitChan) }) } + +func (t *SeqTask) AddChild(task ITask) { + t.BaseTask.AddChild(task) + task.SetParent(t) +} diff --git a/business/jxutils/tasksch/sequence_task_test.go b/business/jxutils/tasksch/sequence_task_test.go index fc3ea8d7b..e28709d30 100644 --- a/business/jxutils/tasksch/sequence_task_test.go +++ b/business/jxutils/tasksch/sequence_task_test.go @@ -11,11 +11,11 @@ import ( func TestRunSeqTask(t *testing.T) { var seqTask ITask - seqTask = NewSeqTask("TestSeqTask", "autotest", func(step int, params ...interface{}) (result interface{}, err error) { + seqTask = NewSeqTask("TestSeqTask", "autotest", func(task *SeqTask, step int, params ...interface{}) (result interface{}, err error) { switch step { case 0: fmt.Println("ONE") - task2 := NewParallelTask("hello", "xjh", nil, func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task2 := NewParallelTask("hello", nil, "xjh", func(parallelTask *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { i := batchItemList[0].(int) time.Sleep(2 * time.Second) fmt.Println(i * 2) diff --git a/business/jxutils/tasksch/task.go b/business/jxutils/tasksch/task.go index 177401d32..afedf23fc 100644 --- a/business/jxutils/tasksch/task.go +++ b/business/jxutils/tasksch/task.go @@ -2,6 +2,7 @@ package tasksch import ( "encoding/json" + "fmt" "sync" "time" @@ -37,6 +38,7 @@ type ITask interface { AddChild(task ITask) GetChildren() TaskList + SetParent(parentTask ITask) json.Marshaler } @@ -69,6 +71,7 @@ type BaseTask struct { quitChan chan int locker sync.RWMutex + parent ITask } func (s TaskList) Len() int { @@ -199,6 +202,16 @@ func (t *BaseTask) GetChildren() (children TaskList) { return children } +func (t *BaseTask) SetParent(parentTask ITask) { + t.locker.Lock() + defer t.locker.Unlock() + + if t.parent != nil { + panic(fmt.Sprintf("task:%s already have parent!", utils.Format4Output(t, false))) + } + t.parent = parentTask +} + ///////// func (t *BaseTask) MarshalJSON() ([]byte, error) { diff --git a/business/jxutils/tasksch/task_man.go b/business/jxutils/tasksch/task_man.go index c117e4b0c..f42ee1847 100644 --- a/business/jxutils/tasksch/task_man.go +++ b/business/jxutils/tasksch/task_man.go @@ -18,8 +18,14 @@ func init() { defTaskMan.taskList = make(map[string]ITask) } -func (m *TaskMan) RunTask(taskName string, isContinueWhenError bool, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask { - task := RunTask(taskName, isContinueWhenError, resultHandler, parallelCount, batchSize, userName, worker, itemList, params...) +func (m *TaskMan) RunParallelTask(taskName string, config *ParallelConfig, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask { + task := RunParallelTask(taskName, config, userName, worker, itemList, params...) + m.ManageTask(task) + return task +} + +func (m *TaskMan) RunSeqTask(taskName string, userName string, worker SeqWorkFunc, stepCount int, params ...interface{}) *SeqTask { + task := RunSeqTask(taskName, userName, worker, stepCount, params...) m.ManageTask(task) return task } @@ -44,8 +50,12 @@ func (m *TaskMan) ManageTask(task ITask) ITask { return task } -func RunManagedTask(taskName string, isContinueWhenError bool, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask { - return defTaskMan.RunTask(taskName, isContinueWhenError, resultHandler, parallelCount, batchSize, userName, worker, itemList, params...) +func RunManagedParallelTask(taskName string, config *ParallelConfig, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask { + return defTaskMan.RunParallelTask(taskName, config, userName, worker, itemList, params...) +} + +func RunManagedSeqTask(taskName string, userName string, worker SeqWorkFunc, stepCount int, params ...interface{}) *SeqTask { + return defTaskMan.RunSeqTask(taskName, userName, worker, stepCount, params...) } func GetTasks(taskID string, fromStatus, toStatus int, lastHours int) (taskList TaskList) { diff --git a/business/jxutils/tasksch/task_man_test.go b/business/jxutils/tasksch/task_man_test.go index 59b19eebd..4aa6ce4c2 100644 --- a/business/jxutils/tasksch/task_man_test.go +++ b/business/jxutils/tasksch/task_man_test.go @@ -13,10 +13,11 @@ func TestTaskMan(t *testing.T) { for k := range itemList { itemList[k] = k } - task1 := RunManagedTask("test", false, func(taskName string, result []interface{}, err error) { + config1 := NewParallelConfig().SetResultHandler(func(taskName string, result []interface{}, err error) { // t.Log("finished here") // t.Log(utils.Format4Output(result, false)) - }, 100, 7, "autotest", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + }).SetParallelCount(100).SetBatchSize(7) + task1 := RunManagedParallelTask("test", config1, "autotest", func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { sleepSecond := rand.Intn(5) t.Logf("sleep %d seconds", sleepSecond) time.Sleep(time.Duration(sleepSecond) * time.Second) @@ -27,10 +28,11 @@ func TestTaskMan(t *testing.T) { return retSlice, nil }, itemList, "a", "b", 1, 2) - task2 := RunManagedTask("test", false, func(taskName string, result []interface{}, err error) { + config2 := NewParallelConfig().SetResultHandler(func(taskName string, result []interface{}, err error) { // t.Log("finished here") // t.Log(utils.Format4Output(result, false)) - }, 100, 7, "autotest", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + }).SetParallelCount(100).SetBatchSize(7) + task2 := RunManagedParallelTask("test", config2, "autotest", func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { sleepSecond := rand.Intn(5) t.Logf("sleep %d seconds", sleepSecond) time.Sleep(time.Duration(sleepSecond) * time.Second) diff --git a/business/partner/purchase/ebai/common.go b/business/partner/purchase/ebai/common.go index b86ca4748..ddb3ae2ec 100644 --- a/business/partner/purchase/ebai/common.go +++ b/business/partner/purchase/ebai/common.go @@ -11,7 +11,7 @@ import ( func (p *PurchaseHandler) UpdatePlaces() (err error) { provinces, err := api.EbaiAPI.CommonShopCities(0) if err == nil { - task := tasksch.RunTask("UpdatePlaces", false, nil, 0, 1, "", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := tasksch.RunParallelTask("UpdatePlaces", nil, "test", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { province := batchItemList[0].(*ebaiapi.CityInfo) retSlice := make([]*ebaiapi.CityInfo, 0) if province.IsOpen != 0 { diff --git a/business/partner/purchase/ebai/store_sku.go b/business/partner/purchase/ebai/store_sku.go index 71c2f3228..08d24fbb6 100644 --- a/business/partner/purchase/ebai/store_sku.go +++ b/business/partner/purchase/ebai/store_sku.go @@ -73,24 +73,26 @@ var ( func (p *PurchaseHandler) SyncStoresSkus(db *dao.DaoDB, storeIDs []int, skuIDs []int, isAsync bool, userName string) (hint string, err error) { if globals.EnableStoreWrite { - for _, storeID := range storeIDs { - err = p.syncOneStoreSkus(db, storeID, skuIDs, isAsync, userName) - if err != nil { - break - } + task := tasksch.RunSeqTask("ebai.SyncStoresSkus", userName, func(t *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { + storeID := storeIDs[step] + err = p.syncOneStoreSkus(t, db, storeID, skuIDs, false, userName) + return nil, err + }, len(storeIDs)) + hint = task.ID + if !isAsync { + _, err = task.GetResult(0) } } return hint, err } -func (p *PurchaseHandler) syncOneStoreSkus(db *dao.DaoDB, storeID int, skuIDs []int, isAsync bool, userName string) (err error) { +func (p *PurchaseHandler) syncOneStoreSkus(parentTask tasksch.ITask, db *dao.DaoDB, storeID int, skuIDs []int, isAsync bool, userName string) (err error) { globals.SugarLogger.Debugf("syncOneStoreSkus storeID:%d, skuIDs:%v, userName:%s", storeID, skuIDs, userName) - doThing := func() (err error) { - if err = p.syncOneStoreCategoriesFromRemote2Local(db, storeID, userName); err != nil { - return err - } - sql := ` + if err = p.syncOneStoreCategoriesFromRemote2Local(db, storeID, userName); err != nil { + return err + } + sql := ` SELECT t1.*, t2.spec_quality, t2.spec_unit, t2.weight, t2.status sku_status, t3.prefix, t3.name, t2.comment, t3.is_global, t3.unit, t3.img, t4.name cat_name, @@ -110,74 +112,68 @@ func (p *PurchaseHandler) syncOneStoreSkus(db *dao.DaoDB, storeID int, skuIDs [] WHERE t1.store_id = ? AND (t1.ebai_sync_status <> 0) ` - sqlParams := []interface{}{ - model.VendorIDEBAI, - model.VendorIDEBAI, - storeID, - } - if skuIDs != nil && len(skuIDs) > 0 { - sql += " AND t1.sku_id IN (" + dao.GenQuestionMarks(len(skuIDs)) + ")" - sqlParams = append(sqlParams, skuIDs) - } - strStoreID := utils.Int2Str(storeID) - var storeSkuInfoList []*tStoreSkuFullInfo - if err = dao.GetRows(db, &storeSkuInfoList, sql, sqlParams...); err == nil { - // globals.SugarLogger.Debug(utils.Format4Output(storeSkuInfoList, false)) - catList2Add := make(map[int]int) - for _, storeSku := range storeSkuInfoList { - if storeSku.EbaiSyncStatus&model.SyncFlagNewMask != 0 { - if storeSku.ParentCatEbaiID == 0 { - catList2Add[storeSku.ParentCatID] = 1 - } - if storeSku.CatEbaiID == 0 { - catList2Add[storeSku.CatID] = 1 - } + sqlParams := []interface{}{ + model.VendorIDEBAI, + model.VendorIDEBAI, + storeID, + } + if skuIDs != nil && len(skuIDs) > 0 { + sql += " AND t1.sku_id IN (" + dao.GenQuestionMarks(len(skuIDs)) + ")" + sqlParams = append(sqlParams, skuIDs) + } + strStoreID := utils.Int2Str(storeID) + var storeSkuInfoList []*tStoreSkuFullInfo + if err = dao.GetRows(db, &storeSkuInfoList, sql, sqlParams...); err == nil { + // globals.SugarLogger.Debug(utils.Format4Output(storeSkuInfoList, false)) + catList2Add := make(map[int]int) + for _, storeSku := range storeSkuInfoList { + if storeSku.EbaiSyncStatus&model.SyncFlagNewMask != 0 { + if storeSku.ParentCatEbaiID == 0 { + catList2Add[storeSku.ParentCatID] = 1 + } + if storeSku.CatEbaiID == 0 { + catList2Add[storeSku.CatID] = 1 } } - for k := range catList2Add { - if err = dao.AddStoreCategoryMap(db, storeID, k, model.VendorIDEBAI, "", model.SyncFlagNewMask, userName); err != nil { - return err - } - } - if err = p.SyncOneStoreCategories(db, storeID, userName); err != nil { + } + for k := range catList2Add { + if err = dao.AddStoreCategoryMap(db, storeID, k, model.VendorIDEBAI, "", model.SyncFlagNewMask, userName); err != nil { return err } - if err = dao.GetRows(db, &storeSkuInfoList, sql, sqlParams...); err == nil { - task := tasksch.RunManagedTask("syncOneStoreSkus skus", false, nil, 0, 1, userName, func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { - storeSku := batchItemList[0].(*tStoreSkuFullInfo) - updateFields := []string{model.FieldEbaiSyncStatus} - if storeSku.EbaiSyncStatus&model.SyncFlagDeletedMask != 0 { - err = api.EbaiAPI.SkuDelete(strStoreID, utils.Int64ToStr(storeSku.EbaiID)) - } else if storeSku.EbaiSyncStatus&model.SyncFlagNewMask != 0 { - // globals.SugarLogger.Debug(utils.Format4Output(genSkuParamsFromStoreSkuInfo(storeSku), false)) - if storeSku.EbaiID, err = api.EbaiAPI.SkuCreate(strStoreID, storeSku.SkuID, genSkuParamsFromStoreSkuInfo(storeSku)); err == nil { - // todo 创建SKU后马上绑定分类,会失败,待解决 - updateFields = append(updateFields, model.FieldEbaiID) - time.AfterFunc(3*time.Second, func() { - api.EbaiAPI.SkuShopCategoryMap(strStoreID, storeSku.EbaiID, utils.Int64ToStr(storeSku.CatEbaiID)) - }) - } - } else if storeSku.EbaiSyncStatus&model.SyncFlagModifiedMask != 0 { - if _, err = api.EbaiAPI.SkuUpdate(strStoreID, storeSku.EbaiID, genSkuParamsFromStoreSkuInfo(storeSku)); err == nil { - err = api.EbaiAPI.SkuShopCategoryMap(strStoreID, storeSku.EbaiID, utils.Int64ToStr(storeSku.CatEbaiID)) - } - } - - if err == nil { - storeSku.EbaiSyncStatus = 0 - _, err = dao.UpdateEntity(nil, &storeSku.StoreSkuBind, updateFields...) - } - return nil, err - }, storeSkuInfoList) - _, err = task.GetResult(0) - } } - return err - } - if !isAsync { - err = doThing() - } else { - go doThing() + if err = p.SyncOneStoreCategories(db, storeID, userName); err != nil { + return err + } + if err = dao.GetRows(db, &storeSkuInfoList, sql, sqlParams...); err == nil { + task := tasksch.RunParallelTask("syncOneStoreSkus skus", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + storeSku := batchItemList[0].(*tStoreSkuFullInfo) + updateFields := []string{model.FieldEbaiSyncStatus} + if storeSku.EbaiSyncStatus&model.SyncFlagDeletedMask != 0 { + err = api.EbaiAPI.SkuDelete(strStoreID, utils.Int64ToStr(storeSku.EbaiID)) + } else if storeSku.EbaiSyncStatus&model.SyncFlagNewMask != 0 { + // globals.SugarLogger.Debug(utils.Format4Output(genSkuParamsFromStoreSkuInfo(storeSku), false)) + if storeSku.EbaiID, err = api.EbaiAPI.SkuCreate(strStoreID, storeSku.SkuID, genSkuParamsFromStoreSkuInfo(storeSku)); err == nil { + // todo 创建SKU后马上绑定分类,会失败,待解决 + updateFields = append(updateFields, model.FieldEbaiID) + time.AfterFunc(3*time.Second, func() { + api.EbaiAPI.SkuShopCategoryMap(strStoreID, storeSku.EbaiID, utils.Int64ToStr(storeSku.CatEbaiID)) + }) + } + } else if storeSku.EbaiSyncStatus&model.SyncFlagModifiedMask != 0 { + if _, err = api.EbaiAPI.SkuUpdate(strStoreID, storeSku.EbaiID, genSkuParamsFromStoreSkuInfo(storeSku)); err == nil { + err = api.EbaiAPI.SkuShopCategoryMap(strStoreID, storeSku.EbaiID, utils.Int64ToStr(storeSku.CatEbaiID)) + } + } + + if err == nil { + storeSku.EbaiSyncStatus = 0 + _, err = dao.UpdateEntity(nil, &storeSku.StoreSkuBind, updateFields...) + } + return nil, err + }, storeSkuInfoList) + parentTask.AddChild(task) + _, err = task.GetResult(0) + } } return err } @@ -212,7 +208,7 @@ func (p *PurchaseHandler) GetAllRemoteSkus(storeID int) (skus []map[string]inter for i := 2; i <= page1.Pages; i++ { pages[i-2] = i } - task := tasksch.RunTask("GetAllRemoteSkus", false, nil, 0, 1, "", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := tasksch.RunParallelTask("GetAllRemoteSkus", nil, "", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { callParams := map[string]interface{}{ "pagesize": MaxPageSize, "page": batchItemList[0], @@ -245,7 +241,7 @@ func (p *PurchaseHandler) DeleteRemoteSkus(storeID int, vendorSkuIDs []string) ( } } } - task := tasksch.RunTask("DeleteRemoteSkus", false, nil, 0, 100, "", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := tasksch.RunParallelTask("DeleteRemoteSkus", tasksch.NewParallelConfig().SetBatchSize(100), "", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { strList := make([]string, len(batchItemList)) for k, v := range batchItemList { strList[k] = v.(string) @@ -267,7 +263,7 @@ func (p *PurchaseHandler) DeleteRemoteCategories(storeID int, vendorCatIDs []int } } } - task := tasksch.RunTask("DeleteRemoteCategories", false, nil, 0, 1, "", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := tasksch.RunParallelTask("DeleteRemoteCategories", nil, "", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { return nil, api.EbaiAPI.ShopCategoryDelete(strStoreID, batchItemList[0].(int64)) }, vendorCatIDs) _, err = task.GetResult(0) @@ -384,7 +380,7 @@ func (p *PurchaseHandler) SyncOneStoreCategories(db *dao.DaoDB, storeID int, use } if err = dao.GetRows(db, &catList, sql, sqlParams...); err == nil { strStoreID := utils.Int2Str(storeID) - task := tasksch.RunTask("syncOneStoreCategoriesFromLocal2Remote", false, nil, 0, 1, userName, func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := tasksch.RunParallelTask("syncOneStoreCategoriesFromLocal2Remote", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { updateFields := []string{model.FieldEbaiSyncStatus} catInfo := batchItemList[0].(*tStoreCatInfo) // globals.SugarLogger.Debug(utils.Format4Output(catInfo, false)) diff --git a/business/partner/purchase/jd/sku.go b/business/partner/purchase/jd/sku.go index a577b8b64..547416e63 100644 --- a/business/partner/purchase/jd/sku.go +++ b/business/partner/purchase/jd/sku.go @@ -269,7 +269,7 @@ func (p *PurchaseHandler) SyncSkusIDMap(db *dao.DaoDB, skuIDs []int, userName st if err = dao.GetRows(db, &skuPairs, sql, sqlParams); err == nil { // globals.SugarLogger.Debug(utils.Format4Output(skuPairs, false)) globals.SugarLogger.Debug(len(skuPairs)) - task := tasksch.RunTask("SyncSkusIDMap", true, nil, 10, 1, userName, func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := tasksch.RunParallelTask("SyncSkusIDMap", tasksch.NewParallelConfig().SetIsContinueWhenError(true), userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { skuPairs := make([]*jdapi.SkuIDPair, len(batchItemList)) for k, v := range batchItemList { pair := v.(*jdapi.SkuIDPair) diff --git a/business/partner/purchase/jd/store_sku.go b/business/partner/purchase/jd/store_sku.go index b94e065d9..505df782a 100644 --- a/business/partner/purchase/jd/store_sku.go +++ b/business/partner/purchase/jd/store_sku.go @@ -27,7 +27,7 @@ func (p *PurchaseHandler) SyncStoresSkus(db *dao.DaoDB, storeIDs []int, skuIDs [ if len(skuIDs) < MaxSkuBatchSize { parallelCount = 10 } - task := tasksch.RunManagedTask("SyncStoresSkus", false, nil, parallelCount, 1, userName, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) { + task := tasksch.RunManagedParallelTask("SyncStoresSkus", tasksch.NewParallelConfig().SetParallelCount(parallelCount), userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { storeID := batchItemList[0].(int) sqlParams := []interface{}{ utils.DefaultTimeValue, @@ -47,7 +47,7 @@ func (p *PurchaseHandler) SyncStoresSkus(db *dao.DaoDB, storeIDs []int, skuIDs [ // globals.SugarLogger.Debug(sql, sqlParams) if err = dao.GetRows(db, &storeSkus, sql, sqlParams); err == nil { outStationNo := utils.Int2Str(storeID) - task := tasksch.RunTask("SyncStoresSkus inner", false, nil, 0, MaxSkuBatchSize, userName, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) { + task := tasksch.RunParallelTask("SyncStoresSkus inner", tasksch.NewParallelConfig().SetBatchSize(MaxSkuBatchSize), userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { var skuPriceInfoList []*jdapi.SkuPriceInfo var skuVendibilityList []*jdapi.StockVendibility var skuStockList []*jdapi.SkuStock