- add parentTask.

This commit is contained in:
gazebo
2018-10-24 11:11:40 +08:00
parent f050ec926b
commit 5811d3cb68
12 changed files with 45 additions and 35 deletions

View File

@@ -120,7 +120,7 @@ func (v *VendorSync) GetSingleStoreHandler(vendorID int) partner.ISingleStoreHan
return nil
}
func (v *VendorSync) syncCategories(ctx *jxcontext.Context, multiStoresHandler partner.IMultipleStoresHandler, db *dao.DaoDB, cats []*model.SkuCategory, userName string) (err error) {
func (v *VendorSync) syncCategories(ctx *jxcontext.Context, parentTask tasksch.ITask, multiStoresHandler partner.IMultipleStoresHandler, db *dao.DaoDB, cats []*model.SkuCategory, userName string) (err error) {
syncStatusFieldName := multiStoresHandler.GetFieldSyncStatusName()
task := tasksch.NewParallelTask("syncCategories", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
cat := batchItemList[0].(*model.SkuCategory)
@@ -140,7 +140,7 @@ func (v *VendorSync) syncCategories(ctx *jxcontext.Context, multiStoresHandler p
}
return nil, err
}, cats)
ctx.SetTaskOrAddChild(task)
ctx.SetTaskOrAddChild(task, parentTask)
task.Run()
_, err = task.GetResult(0)
return err
@@ -159,7 +159,7 @@ func (v *VendorSync) SyncCategory(ctx *jxcontext.Context, db *dao.DaoDB, categor
}
err := dao.GetEntitiesByKV(db, &cats, cond, true)
if err == nil {
err = v.syncCategories(ctx, multiStoresHandler, db, cats, userName)
err = v.syncCategories(ctx, t, multiStoresHandler, db, cats, userName)
}
if err != nil || categoryID > 0 {
return nil, err
@@ -167,7 +167,7 @@ func (v *VendorSync) SyncCategory(ctx *jxcontext.Context, db *dao.DaoDB, categor
cond[model.FieldLevel] = 2
err = dao.GetEntitiesByKV(db, &cats, cond, true)
if err == nil {
err = v.syncCategories(ctx, multiStoresHandler, db, cats, userName)
err = v.syncCategories(ctx, t, multiStoresHandler, db, cats, userName)
}
return nil, err
})
@@ -264,7 +264,7 @@ func (v *VendorSync) LoopStoreMap(ctx *jxcontext.Context, db *dao.DaoDB, taskNam
storeMaps, err := GetStoreVendorMaps(ctx, db, storeID, -1)
if err == nil {
task := tasksch.NewParallelTask(taskName, nil, userName, handler, storeMaps)
ctx.SetTaskOrAddChild(task)
ctx.SetTaskOrAddChild(task, nil)
tasksch.ManageTask(task).Run()
hint = task.ID
if !isAsync {
@@ -279,7 +279,7 @@ func (v *VendorSync) LoopMultiStoresVendors(ctx *jxcontext.Context, db *dao.DaoD
taskName = "LoopMultiStoresVendors"
}
task := tasksch.NewParallelTask(taskName, nil, userName, handler, v.MultiStoreVendorIDs)
ctx.SetTaskOrAddChild(task)
ctx.SetTaskOrAddChild(task, nil)
tasksch.ManageTask(task).Run()
if !isAsync {
_, err = task.GetResult(0)
@@ -312,7 +312,7 @@ func (v *VendorSync) LoopStoreVendors(ctx *jxcontext.Context, db *dao.DaoDB, ven
allHandlers = allHandlers[:count]
}
task := tasksch.NewParallelTask(taskName, nil, userName, handler, allHandlers)
ctx.SetTaskOrAddChild(task)
ctx.SetTaskOrAddChild(task, nil)
tasksch.ManageTask(task).Run()
if !isAsync {
_, err = task.GetResult(0)
@@ -330,7 +330,7 @@ func (v *VendorSync) LoopSingleStoreVendors(ctx *jxcontext.Context, db *dao.DaoD
FROM store_map
WHERE vendor_id IN (`+dao.GenQuestionMarks(len(v.SingleStoreVendorIDs))+")", v.SingleStoreVendorIDs); err == nil {
task := tasksch.NewParallelTask(taskName, nil, userName, handler, storeMaps)
ctx.SetTaskOrAddChild(task)
ctx.SetTaskOrAddChild(task, nil)
tasksch.ManageTask(task).Run()
hint = task.ID
if !isAsync {
@@ -345,14 +345,14 @@ func (v *VendorSync) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool, vend
vendorID := batchItemList[0].(int)
if handler := v.GetStoreHandler(vendorID); handler != nil {
if multiHandler, ok := handler.(partner.IMultipleStoresHandler); ok {
_, err = multiHandler.RefreshAllSkusID(ctx, false)
_, err = multiHandler.RefreshAllSkusID(ctx, task, false)
} else if singleHandler, ok := handler.(partner.ISingleStoreHandler); ok {
_, err = singleHandler.RefreshStoresAllSkusID(ctx, false, storeIDs)
_, err = singleHandler.RefreshStoresAllSkusID(ctx, task, false, storeIDs)
}
}
return nil, err
}, vendorIDs)
ctx.SetTaskOrAddChild(task)
ctx.SetTaskOrAddChild(task, nil)
tasksch.ManageTask(task).Run()
if !isAsync {
_, err = task.GetResult(0)
@@ -364,11 +364,11 @@ func (v *VendorSync) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool, ve
task := tasksch.NewParallelTask("RefreshAllStoresID", nil, ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
vendorID := batchItemList[0].(int)
if handler := v.GetStoreHandler(vendorID); handler != nil {
_, err = handler.RefreshAllStoresID(ctx, false)
_, err = handler.RefreshAllStoresID(ctx, task, false)
}
return nil, err
}, vendorIDs)
ctx.SetTaskOrAddChild(task)
ctx.SetTaskOrAddChild(task, nil)
tasksch.ManageTask(task).Run()
if !isAsync {
_, err = task.GetResult(0)

View File

@@ -83,7 +83,7 @@ func SendFilesToStores(ctx *jxcontext.Context, files []*multipart.FileHeader, is
}
return retVal, err
}, fileList)
ctx.SetTaskOrAddChild(task)
ctx.SetTaskOrAddChild(task, nil)
tasksch.ManageTask(task).Run()
hint = task.ID
if !isAsync {

View File

@@ -301,7 +301,7 @@ func CreateJdPromotion(ctx *jxcontext.Context, isIDJd bool, isAsync bool, params
}
return nil, err
}, 4)
ctx.SetTaskOrAddChild(rootTask)
ctx.SetTaskOrAddChild(rootTask, nil)
tasksch.ManageTask(rootTask).Run()
if !isAsync {
_, err = rootTask.GetResult(0)

View File

@@ -79,7 +79,7 @@ func SendAdvertingByGoodsOrder(ctx *jxcontext.Context, advertising string, days
}
return nil, err
}, mobileNumbers)
ctx.SetTaskOrAddChild(task)
ctx.SetTaskOrAddChild(task, nil)
tasksch.ManageTask(task).Run()
if !isAsync {

View File

@@ -60,17 +60,24 @@ func (ctx *Context) GetRootTask() tasksch.ITask {
return ctx.rootTask
}
func (ctx *Context) SetTaskOrAddChild(task tasksch.ITask) bool {
ctx.locker.Lock()
func (ctx *Context) SetTaskOrAddChild(task tasksch.ITask, parentTask tasksch.ITask) bool {
if parentTask != nil {
parentTask.AddChild(task)
} else {
parentTask = task
}
ctx.locker.RLock()
if ctx.rootTask == nil {
ctx.rootTask = task
ctx.locker.RUnlock()
ctx.locker.Lock()
if ctx.rootTask == nil {
ctx.rootTask = parentTask
}
ctx.locker.Unlock()
return true
}
ctx.locker.Unlock()
ctx.rootTask.AddChild(task)
ctx.locker.RUnlock()
return false
}

View File

@@ -6,6 +6,7 @@ import (
"time"
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
)
@@ -81,7 +82,7 @@ type IPurchasePlatformHandler interface {
// CloseStore(vendorStoreID, closeNotice, userName string) error
SyncStoresSkus(db *dao.DaoDB, storeIDs []int, skuIDs []int, isAsync bool, userName string) (hint string, err error)
RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool) (hint string, err error)
RefreshAllStoresID(ctx *jxcontext.Context, parentTask tasksch.ITask, isAsync bool) (hint string, err error)
GetVendorID() int
GetFieldIDName() string
@@ -104,7 +105,7 @@ type IMultipleStoresHandler interface {
UpdateSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error)
DeleteSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error)
RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool) (hint string, err error)
RefreshAllSkusID(ctx *jxcontext.Context, parentTask tasksch.ITask, isAsync bool) (hint string, err error)
}
type ISingleStoreHandler interface {
@@ -113,7 +114,7 @@ type ISingleStoreHandler interface {
ReadStoreCategories(storeID int) (cats []*model.SkuCategory, err error)
ReadStoreSku(storeID, skuID int) (skuNameExt *model.SkuNameExt, err error)
RefreshStoresAllSkusID(ctx *jxcontext.Context, isAsync bool, storeIDs []int) (hint string, err error)
RefreshStoresAllSkusID(ctx *jxcontext.Context, parentTask tasksch.ITask, isAsync bool, storeIDs []int) (hint string, err error)
}
type IDeliveryPlatformHandler interface {

View File

@@ -181,7 +181,7 @@ func (p *PurchaseHandler) UpdateStore(db *dao.DaoDB, storeID int, userName strin
return err
}
func (p *PurchaseHandler) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool) (hint string, err error) {
func (p *PurchaseHandler) RefreshAllStoresID(ctx *jxcontext.Context, parentTask tasksch.ITask, isAsync bool) (hint string, err error) {
globals.SugarLogger.Debugf("ebai RefreshAllStoresID")
const batchSize = 50
const stepCount = 3
@@ -223,7 +223,7 @@ func (p *PurchaseHandler) RefreshAllStoresID(ctx *jxcontext.Context, isAsync boo
return nil, err
}, stepCount)
ctx.SetTaskOrAddChild(rootTask)
ctx.SetTaskOrAddChild(rootTask, parentTask)
rootTask.Run()
if !isAsync {
_, err = rootTask.GetResult(0)

View File

@@ -276,7 +276,7 @@ func (p *PurchaseHandler) DeleteRemoteCategories(storeID int, vendorCatIDs []int
return err
}
func (p *PurchaseHandler) RefreshStoresAllSkusID(ctx *jxcontext.Context, isAsync bool, storeIDs []int) (hint string, err error) {
func (p *PurchaseHandler) RefreshStoresAllSkusID(ctx *jxcontext.Context, parentTask tasksch.ITask, isAsync bool, storeIDs []int) (hint string, err error) {
return hint, err
}

View File

@@ -2,6 +2,7 @@ package elm
import (
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
)
@@ -22,7 +23,7 @@ func (p *PurchaseHandler) UpdateStore(db *dao.DaoDB, storeID int, userName strin
return nil
}
func (p *PurchaseHandler) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool) (hint string, err error) {
func (p *PurchaseHandler) RefreshAllStoresID(ctx *jxcontext.Context, parentTask tasksch.ITask, isAsync bool) (hint string, err error) {
return hint, err
}

View File

@@ -2,6 +2,7 @@ package elm
import (
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
)
@@ -21,6 +22,6 @@ func (p *PurchaseHandler) SyncStoresSkus(db *dao.DaoDB, storeIDs []int, skuIDs [
return hint, err
}
func (p *PurchaseHandler) RefreshStoresAllSkusID(ctx *jxcontext.Context, isAsync bool, storeIDs []int) (hint string, err error) {
func (p *PurchaseHandler) RefreshStoresAllSkusID(ctx *jxcontext.Context, parentTask tasksch.ITask, isAsync bool, storeIDs []int) (hint string, err error) {
return hint, err
}

View File

@@ -254,7 +254,7 @@ func (p *PurchaseHandler) DeleteSku(db *dao.DaoDB, sku *model.Sku, userName stri
return err
}
func (p *PurchaseHandler) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool) (hint string, err error) {
func (p *PurchaseHandler) RefreshAllSkusID(ctx *jxcontext.Context, parentTask tasksch.ITask, isAsync bool) (hint string, err error) {
globals.SugarLogger.Debugf("jd RefreshAllSkusID")
db := dao.GetDB()
@@ -295,7 +295,7 @@ func (p *PurchaseHandler) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool)
}
return nil, err
}, 3)
ctx.SetTaskOrAddChild(rootTask)
ctx.SetTaskOrAddChild(rootTask, parentTask)
rootTask.Run()
if !isAsync {
_, err = rootTask.GetResult(0)

View File

@@ -190,7 +190,7 @@ func (p *PurchaseHandler) GetAllStoresFromRemote() ([]*model.Store, error) {
return nil, err
}
func (p *PurchaseHandler) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool) (hint string, err error) {
func (p *PurchaseHandler) RefreshAllStoresID(ctx *jxcontext.Context, parentTask tasksch.ITask, isAsync bool) (hint string, err error) {
globals.SugarLogger.Debugf("jd RefreshAllStoresID")
var stores []*tJdStoreInfo
@@ -228,7 +228,7 @@ func (p *PurchaseHandler) RefreshAllStoresID(ctx *jxcontext.Context, isAsync boo
return nil, err
}, 3)
ctx.SetTaskOrAddChild(rootTask)
ctx.SetTaskOrAddChild(rootTask, parentTask)
rootTask.Run()
if !isAsync {
_, err = rootTask.GetResult(0)