From 5811d3cb68a6ac8a9b9e0f834cfeb3469203164f Mon Sep 17 00:00:00 2001 From: gazebo Date: Wed, 24 Oct 2018 11:11:40 +0800 Subject: [PATCH] - add parentTask. --- business/jxstore/cms/sync.go | 26 ++++++++++----------- business/jxstore/financial/financial.go | 2 +- business/jxstore/promotion/jd_promotion.go | 2 +- business/jxstore/promotion/promotion.go | 2 +- business/jxutils/jxcontext/jxcontext.go | 21 +++++++++++------ business/partner/partner.go | 7 +++--- business/partner/purchase/ebai/store.go | 4 ++-- business/partner/purchase/ebai/store_sku.go | 2 +- business/partner/purchase/elm/store.go | 3 ++- business/partner/purchase/elm/store_sku.go | 3 ++- business/partner/purchase/jd/sku.go | 4 ++-- business/partner/purchase/jd/store.go | 4 ++-- 12 files changed, 45 insertions(+), 35 deletions(-) diff --git a/business/jxstore/cms/sync.go b/business/jxstore/cms/sync.go index 3ecb269ab..8715c6ac2 100644 --- a/business/jxstore/cms/sync.go +++ b/business/jxstore/cms/sync.go @@ -120,7 +120,7 @@ func (v *VendorSync) GetSingleStoreHandler(vendorID int) partner.ISingleStoreHan return nil } -func (v *VendorSync) syncCategories(ctx *jxcontext.Context, multiStoresHandler partner.IMultipleStoresHandler, db *dao.DaoDB, cats []*model.SkuCategory, userName string) (err error) { +func (v *VendorSync) syncCategories(ctx *jxcontext.Context, parentTask tasksch.ITask, multiStoresHandler partner.IMultipleStoresHandler, db *dao.DaoDB, cats []*model.SkuCategory, userName string) (err error) { syncStatusFieldName := multiStoresHandler.GetFieldSyncStatusName() task := tasksch.NewParallelTask("syncCategories", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) { cat := batchItemList[0].(*model.SkuCategory) @@ -140,7 +140,7 @@ func (v *VendorSync) syncCategories(ctx *jxcontext.Context, multiStoresHandler p } return nil, err }, cats) - ctx.SetTaskOrAddChild(task) + ctx.SetTaskOrAddChild(task, parentTask) task.Run() _, err = task.GetResult(0) return err @@ -159,7 +159,7 @@ func (v *VendorSync) SyncCategory(ctx *jxcontext.Context, db *dao.DaoDB, categor } err := dao.GetEntitiesByKV(db, &cats, cond, true) if err == nil { - err = v.syncCategories(ctx, multiStoresHandler, db, cats, userName) + err = v.syncCategories(ctx, t, multiStoresHandler, db, cats, userName) } if err != nil || categoryID > 0 { return nil, err @@ -167,7 +167,7 @@ func (v *VendorSync) SyncCategory(ctx *jxcontext.Context, db *dao.DaoDB, categor cond[model.FieldLevel] = 2 err = dao.GetEntitiesByKV(db, &cats, cond, true) if err == nil { - err = v.syncCategories(ctx, multiStoresHandler, db, cats, userName) + err = v.syncCategories(ctx, t, multiStoresHandler, db, cats, userName) } return nil, err }) @@ -264,7 +264,7 @@ func (v *VendorSync) LoopStoreMap(ctx *jxcontext.Context, db *dao.DaoDB, taskNam storeMaps, err := GetStoreVendorMaps(ctx, db, storeID, -1) if err == nil { task := tasksch.NewParallelTask(taskName, nil, userName, handler, storeMaps) - ctx.SetTaskOrAddChild(task) + ctx.SetTaskOrAddChild(task, nil) tasksch.ManageTask(task).Run() hint = task.ID if !isAsync { @@ -279,7 +279,7 @@ func (v *VendorSync) LoopMultiStoresVendors(ctx *jxcontext.Context, db *dao.DaoD taskName = "LoopMultiStoresVendors" } task := tasksch.NewParallelTask(taskName, nil, userName, handler, v.MultiStoreVendorIDs) - ctx.SetTaskOrAddChild(task) + ctx.SetTaskOrAddChild(task, nil) tasksch.ManageTask(task).Run() if !isAsync { _, err = task.GetResult(0) @@ -312,7 +312,7 @@ func (v *VendorSync) LoopStoreVendors(ctx *jxcontext.Context, db *dao.DaoDB, ven allHandlers = allHandlers[:count] } task := tasksch.NewParallelTask(taskName, nil, userName, handler, allHandlers) - ctx.SetTaskOrAddChild(task) + ctx.SetTaskOrAddChild(task, nil) tasksch.ManageTask(task).Run() if !isAsync { _, err = task.GetResult(0) @@ -330,7 +330,7 @@ func (v *VendorSync) LoopSingleStoreVendors(ctx *jxcontext.Context, db *dao.DaoD FROM store_map WHERE vendor_id IN (`+dao.GenQuestionMarks(len(v.SingleStoreVendorIDs))+")", v.SingleStoreVendorIDs); err == nil { task := tasksch.NewParallelTask(taskName, nil, userName, handler, storeMaps) - ctx.SetTaskOrAddChild(task) + ctx.SetTaskOrAddChild(task, nil) tasksch.ManageTask(task).Run() hint = task.ID if !isAsync { @@ -345,14 +345,14 @@ func (v *VendorSync) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool, vend vendorID := batchItemList[0].(int) if handler := v.GetStoreHandler(vendorID); handler != nil { if multiHandler, ok := handler.(partner.IMultipleStoresHandler); ok { - _, err = multiHandler.RefreshAllSkusID(ctx, false) + _, err = multiHandler.RefreshAllSkusID(ctx, task, false) } else if singleHandler, ok := handler.(partner.ISingleStoreHandler); ok { - _, err = singleHandler.RefreshStoresAllSkusID(ctx, false, storeIDs) + _, err = singleHandler.RefreshStoresAllSkusID(ctx, task, false, storeIDs) } } return nil, err }, vendorIDs) - ctx.SetTaskOrAddChild(task) + ctx.SetTaskOrAddChild(task, nil) tasksch.ManageTask(task).Run() if !isAsync { _, err = task.GetResult(0) @@ -364,11 +364,11 @@ func (v *VendorSync) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool, ve task := tasksch.NewParallelTask("RefreshAllStoresID", nil, ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { vendorID := batchItemList[0].(int) if handler := v.GetStoreHandler(vendorID); handler != nil { - _, err = handler.RefreshAllStoresID(ctx, false) + _, err = handler.RefreshAllStoresID(ctx, task, false) } return nil, err }, vendorIDs) - ctx.SetTaskOrAddChild(task) + ctx.SetTaskOrAddChild(task, nil) tasksch.ManageTask(task).Run() if !isAsync { _, err = task.GetResult(0) diff --git a/business/jxstore/financial/financial.go b/business/jxstore/financial/financial.go index 524ddabf1..55545d880 100644 --- a/business/jxstore/financial/financial.go +++ b/business/jxstore/financial/financial.go @@ -83,7 +83,7 @@ func SendFilesToStores(ctx *jxcontext.Context, files []*multipart.FileHeader, is } return retVal, err }, fileList) - ctx.SetTaskOrAddChild(task) + ctx.SetTaskOrAddChild(task, nil) tasksch.ManageTask(task).Run() hint = task.ID if !isAsync { diff --git a/business/jxstore/promotion/jd_promotion.go b/business/jxstore/promotion/jd_promotion.go index c4c14b078..4e4336205 100644 --- a/business/jxstore/promotion/jd_promotion.go +++ b/business/jxstore/promotion/jd_promotion.go @@ -301,7 +301,7 @@ func CreateJdPromotion(ctx *jxcontext.Context, isIDJd bool, isAsync bool, params } return nil, err }, 4) - ctx.SetTaskOrAddChild(rootTask) + ctx.SetTaskOrAddChild(rootTask, nil) tasksch.ManageTask(rootTask).Run() if !isAsync { _, err = rootTask.GetResult(0) diff --git a/business/jxstore/promotion/promotion.go b/business/jxstore/promotion/promotion.go index 8df86eacf..2fb9f8039 100644 --- a/business/jxstore/promotion/promotion.go +++ b/business/jxstore/promotion/promotion.go @@ -79,7 +79,7 @@ func SendAdvertingByGoodsOrder(ctx *jxcontext.Context, advertising string, days } return nil, err }, mobileNumbers) - ctx.SetTaskOrAddChild(task) + ctx.SetTaskOrAddChild(task, nil) tasksch.ManageTask(task).Run() if !isAsync { diff --git a/business/jxutils/jxcontext/jxcontext.go b/business/jxutils/jxcontext/jxcontext.go index 4202c6c15..eb379f49e 100644 --- a/business/jxutils/jxcontext/jxcontext.go +++ b/business/jxutils/jxcontext/jxcontext.go @@ -60,17 +60,24 @@ func (ctx *Context) GetRootTask() tasksch.ITask { return ctx.rootTask } -func (ctx *Context) SetTaskOrAddChild(task tasksch.ITask) bool { - ctx.locker.Lock() - +func (ctx *Context) SetTaskOrAddChild(task tasksch.ITask, parentTask tasksch.ITask) bool { + if parentTask != nil { + parentTask.AddChild(task) + } else { + parentTask = task + } + ctx.locker.RLock() if ctx.rootTask == nil { - ctx.rootTask = task + ctx.locker.RUnlock() + + ctx.locker.Lock() + if ctx.rootTask == nil { + ctx.rootTask = parentTask + } ctx.locker.Unlock() return true } - ctx.locker.Unlock() - - ctx.rootTask.AddChild(task) + ctx.locker.RUnlock() return false } diff --git a/business/partner/partner.go b/business/partner/partner.go index b1f6ccd7f..db767b910 100644 --- a/business/partner/partner.go +++ b/business/partner/partner.go @@ -6,6 +6,7 @@ import ( "time" "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" + "git.rosy.net.cn/jx-callback/business/jxutils/tasksch" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/dao" ) @@ -81,7 +82,7 @@ type IPurchasePlatformHandler interface { // CloseStore(vendorStoreID, closeNotice, userName string) error SyncStoresSkus(db *dao.DaoDB, storeIDs []int, skuIDs []int, isAsync bool, userName string) (hint string, err error) - RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool) (hint string, err error) + RefreshAllStoresID(ctx *jxcontext.Context, parentTask tasksch.ITask, isAsync bool) (hint string, err error) GetVendorID() int GetFieldIDName() string @@ -104,7 +105,7 @@ type IMultipleStoresHandler interface { UpdateSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) DeleteSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) - RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool) (hint string, err error) + RefreshAllSkusID(ctx *jxcontext.Context, parentTask tasksch.ITask, isAsync bool) (hint string, err error) } type ISingleStoreHandler interface { @@ -113,7 +114,7 @@ type ISingleStoreHandler interface { ReadStoreCategories(storeID int) (cats []*model.SkuCategory, err error) ReadStoreSku(storeID, skuID int) (skuNameExt *model.SkuNameExt, err error) - RefreshStoresAllSkusID(ctx *jxcontext.Context, isAsync bool, storeIDs []int) (hint string, err error) + RefreshStoresAllSkusID(ctx *jxcontext.Context, parentTask tasksch.ITask, isAsync bool, storeIDs []int) (hint string, err error) } type IDeliveryPlatformHandler interface { diff --git a/business/partner/purchase/ebai/store.go b/business/partner/purchase/ebai/store.go index d86e78516..96dd6d64f 100644 --- a/business/partner/purchase/ebai/store.go +++ b/business/partner/purchase/ebai/store.go @@ -181,7 +181,7 @@ func (p *PurchaseHandler) UpdateStore(db *dao.DaoDB, storeID int, userName strin return err } -func (p *PurchaseHandler) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool) (hint string, err error) { +func (p *PurchaseHandler) RefreshAllStoresID(ctx *jxcontext.Context, parentTask tasksch.ITask, isAsync bool) (hint string, err error) { globals.SugarLogger.Debugf("ebai RefreshAllStoresID") const batchSize = 50 const stepCount = 3 @@ -223,7 +223,7 @@ func (p *PurchaseHandler) RefreshAllStoresID(ctx *jxcontext.Context, isAsync boo return nil, err }, stepCount) - ctx.SetTaskOrAddChild(rootTask) + ctx.SetTaskOrAddChild(rootTask, parentTask) rootTask.Run() if !isAsync { _, err = rootTask.GetResult(0) diff --git a/business/partner/purchase/ebai/store_sku.go b/business/partner/purchase/ebai/store_sku.go index 8776a52f4..47f53ded0 100644 --- a/business/partner/purchase/ebai/store_sku.go +++ b/business/partner/purchase/ebai/store_sku.go @@ -276,7 +276,7 @@ func (p *PurchaseHandler) DeleteRemoteCategories(storeID int, vendorCatIDs []int return err } -func (p *PurchaseHandler) RefreshStoresAllSkusID(ctx *jxcontext.Context, isAsync bool, storeIDs []int) (hint string, err error) { +func (p *PurchaseHandler) RefreshStoresAllSkusID(ctx *jxcontext.Context, parentTask tasksch.ITask, isAsync bool, storeIDs []int) (hint string, err error) { return hint, err } diff --git a/business/partner/purchase/elm/store.go b/business/partner/purchase/elm/store.go index f2a751d24..9ffc29035 100644 --- a/business/partner/purchase/elm/store.go +++ b/business/partner/purchase/elm/store.go @@ -2,6 +2,7 @@ package elm import ( "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" + "git.rosy.net.cn/jx-callback/business/jxutils/tasksch" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/dao" ) @@ -22,7 +23,7 @@ func (p *PurchaseHandler) UpdateStore(db *dao.DaoDB, storeID int, userName strin return nil } -func (p *PurchaseHandler) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool) (hint string, err error) { +func (p *PurchaseHandler) RefreshAllStoresID(ctx *jxcontext.Context, parentTask tasksch.ITask, isAsync bool) (hint string, err error) { return hint, err } diff --git a/business/partner/purchase/elm/store_sku.go b/business/partner/purchase/elm/store_sku.go index 7f37e428c..a3b097561 100644 --- a/business/partner/purchase/elm/store_sku.go +++ b/business/partner/purchase/elm/store_sku.go @@ -2,6 +2,7 @@ package elm import ( "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" + "git.rosy.net.cn/jx-callback/business/jxutils/tasksch" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/dao" ) @@ -21,6 +22,6 @@ func (p *PurchaseHandler) SyncStoresSkus(db *dao.DaoDB, storeIDs []int, skuIDs [ return hint, err } -func (p *PurchaseHandler) RefreshStoresAllSkusID(ctx *jxcontext.Context, isAsync bool, storeIDs []int) (hint string, err error) { +func (p *PurchaseHandler) RefreshStoresAllSkusID(ctx *jxcontext.Context, parentTask tasksch.ITask, isAsync bool, storeIDs []int) (hint string, err error) { return hint, err } diff --git a/business/partner/purchase/jd/sku.go b/business/partner/purchase/jd/sku.go index 8f794b885..be6bea6db 100644 --- a/business/partner/purchase/jd/sku.go +++ b/business/partner/purchase/jd/sku.go @@ -254,7 +254,7 @@ func (p *PurchaseHandler) DeleteSku(db *dao.DaoDB, sku *model.Sku, userName stri return err } -func (p *PurchaseHandler) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool) (hint string, err error) { +func (p *PurchaseHandler) RefreshAllSkusID(ctx *jxcontext.Context, parentTask tasksch.ITask, isAsync bool) (hint string, err error) { globals.SugarLogger.Debugf("jd RefreshAllSkusID") db := dao.GetDB() @@ -295,7 +295,7 @@ func (p *PurchaseHandler) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool) } return nil, err }, 3) - ctx.SetTaskOrAddChild(rootTask) + ctx.SetTaskOrAddChild(rootTask, parentTask) rootTask.Run() if !isAsync { _, err = rootTask.GetResult(0) diff --git a/business/partner/purchase/jd/store.go b/business/partner/purchase/jd/store.go index 57b324935..e2c142457 100644 --- a/business/partner/purchase/jd/store.go +++ b/business/partner/purchase/jd/store.go @@ -190,7 +190,7 @@ func (p *PurchaseHandler) GetAllStoresFromRemote() ([]*model.Store, error) { return nil, err } -func (p *PurchaseHandler) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool) (hint string, err error) { +func (p *PurchaseHandler) RefreshAllStoresID(ctx *jxcontext.Context, parentTask tasksch.ITask, isAsync bool) (hint string, err error) { globals.SugarLogger.Debugf("jd RefreshAllStoresID") var stores []*tJdStoreInfo @@ -228,7 +228,7 @@ func (p *PurchaseHandler) RefreshAllStoresID(ctx *jxcontext.Context, isAsync boo return nil, err }, 3) - ctx.SetTaskOrAddChild(rootTask) + ctx.SetTaskOrAddChild(rootTask, parentTask) rootTask.Run() if !isAsync { _, err = rootTask.GetResult(0)