- remove RunParallelTask
- refactor partner.SyncStoreCategory.
This commit is contained in:
@@ -337,11 +337,10 @@ func (v *VendorSync) LoopStoresMap(ctx *jxcontext.Context, db *dao.DaoDB, taskNa
|
||||
task := tasksch.NewParallelTask(taskName, nil, ctx.GetUserName(), handler, loopInfoList)
|
||||
ctx.SetTaskOrAddChild(task, nil)
|
||||
tasksch.ManageTask(task).Run()
|
||||
hint = task.ID
|
||||
if !isAsync {
|
||||
_, err = task.GetResult(0)
|
||||
}
|
||||
return hint, makeSyncError(err)
|
||||
return task.ID, makeSyncError(err)
|
||||
}
|
||||
|
||||
func (v *VendorSync) LoopMultiStoresVendors(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, userName string, handler tasksch.WorkFunc) (hint string, err error) {
|
||||
|
||||
@@ -95,12 +95,6 @@ func NewParallelTask(taskName string, config *ParallelConfig, userName string, w
|
||||
return task
|
||||
}
|
||||
|
||||
func RunParallelTask(taskName string, config *ParallelConfig, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask {
|
||||
task := NewParallelTask(taskName, config, userName, worker, itemList, params...)
|
||||
task.Run()
|
||||
return task
|
||||
}
|
||||
|
||||
func (task *ParallelTask) Run() {
|
||||
task.run(func() {
|
||||
globals.SugarLogger.Debugf("ParallelTask.Run %s", task.Name)
|
||||
|
||||
@@ -14,7 +14,7 @@ func TestRunParallelTask(t *testing.T) {
|
||||
for k := range itemList {
|
||||
itemList[k] = k
|
||||
}
|
||||
task := RunParallelTask("test", NewParallelConfig().SetParallelCount(100).SetBatchSize(7), "autotest", func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||||
task := NewParallelTask("test", NewParallelConfig().SetParallelCount(100).SetBatchSize(7), "autotest", func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||||
sleepSecond := rand.Intn(5)
|
||||
t.Logf("sleep %d seconds", sleepSecond)
|
||||
time.Sleep(time.Duration(sleepSecond) * time.Second)
|
||||
@@ -24,6 +24,7 @@ func TestRunParallelTask(t *testing.T) {
|
||||
}
|
||||
return retSlice, nil
|
||||
}, itemList, "a", "b", 1, 2)
|
||||
task.Run()
|
||||
result, err := task.GetResult(1 * time.Microsecond)
|
||||
if err == nil || task.GetStatus() != TaskStatusWorking {
|
||||
t.Fatal("task can not be done in 1 microsecond")
|
||||
@@ -44,7 +45,7 @@ func TestCancelParallelTask(t *testing.T) {
|
||||
for k := range itemList {
|
||||
itemList[k] = k
|
||||
}
|
||||
task := RunParallelTask("test", NewParallelConfig().SetParallelCount(100).SetBatchSize(7), "autotest", func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||||
task := NewParallelTask("test", NewParallelConfig().SetParallelCount(100).SetBatchSize(7), "autotest", func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||||
sleepSecond := rand.Intn(5)
|
||||
fmt.Printf("sleep %d seconds\n", sleepSecond)
|
||||
time.Sleep(time.Duration(sleepSecond) * time.Second)
|
||||
@@ -54,6 +55,7 @@ func TestCancelParallelTask(t *testing.T) {
|
||||
}
|
||||
return retSlice, nil
|
||||
}, itemList, "a", "b", 1, 2)
|
||||
task.Run()
|
||||
// time.Sleep(time.Second * 6)
|
||||
fmt.Printf("finishedItemCount:%d, finishedJobCount:%d\n", task.GetFinishedItemCount(), task.GetFinishedJobCount())
|
||||
task.Cancel()
|
||||
|
||||
@@ -27,12 +27,6 @@ func NewSeqTask(taskName string, userName string, worker SeqWorkFunc, stepCount
|
||||
return task
|
||||
}
|
||||
|
||||
func RunSeqTask(taskName string, userName string, worker SeqWorkFunc, stepCount int, params ...interface{}) *SeqTask {
|
||||
task := NewSeqTask(taskName, userName, worker, stepCount, params...)
|
||||
task.Run()
|
||||
return task
|
||||
}
|
||||
|
||||
func (task *SeqTask) Run() {
|
||||
task.run(func() {
|
||||
globals.SugarLogger.Debugf("SeqTask.Run %s", task.Name)
|
||||
|
||||
@@ -114,7 +114,7 @@ type IMultipleStoresHandler interface {
|
||||
|
||||
type ISingleStoreHandler interface {
|
||||
IPurchasePlatformHandler
|
||||
SyncStoresCategories(db *dao.DaoDB, storeIDs []int, userName string) (err error)
|
||||
SyncStoreCategory(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, isAsync bool) (hint string, err error)
|
||||
ReadStoreCategories(storeID int) (cats []*model.SkuCategory, err error)
|
||||
|
||||
ReadStoreSku(storeID, skuID int) (skuNameExt *model.SkuNameExt, err error)
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
func (p *PurchaseHandler) UpdatePlaces() (err error) {
|
||||
provinces, err := api.EbaiAPI.CommonShopCities(0)
|
||||
if err == nil {
|
||||
task := tasksch.RunParallelTask("UpdatePlaces", nil, "test", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||||
task := tasksch.NewParallelTask("UpdatePlaces", nil, "test", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||||
province := batchItemList[0].(*ebaiapi.CityInfo)
|
||||
retSlice := make([]*ebaiapi.CityInfo, 0)
|
||||
if province.IsOpen != 0 {
|
||||
@@ -35,6 +35,7 @@ func (p *PurchaseHandler) UpdatePlaces() (err error) {
|
||||
}
|
||||
return retSlice, err
|
||||
}, provinces)
|
||||
task.Run()
|
||||
places, err2 := task.GetResult(0)
|
||||
if err = err2; err == nil {
|
||||
globals.SugarLogger.Debug(utils.Format4Output(places, false))
|
||||
|
||||
@@ -133,7 +133,7 @@ func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasks
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
if err = p.SyncOneStoreCategories(db, storeID, userName); err != nil {
|
||||
if _, err = p.SyncStoreCategory(ctx, parentTask, storeID, false); err != nil {
|
||||
return "", err
|
||||
}
|
||||
if err = dao.GetRows(db, &storeSkuInfoList, sql, sqlParams...); err == nil {
|
||||
@@ -171,19 +171,6 @@ func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasks
|
||||
return "", err
|
||||
}
|
||||
|
||||
func (p *PurchaseHandler) SyncStoresCategories(db *dao.DaoDB, storeIDs []int, userName string) (err error) {
|
||||
globals.SugarLogger.Debugf("SyncStoresCategories storeIDs:%d, userName:%s", storeIDs, userName)
|
||||
|
||||
if globals.EnableStoreWrite {
|
||||
for _, storeID := range storeIDs {
|
||||
if err = p.SyncOneStoreCategories(db, storeID, userName); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (p *PurchaseHandler) ReadStoreCategories(storeID int) (cats []*model.SkuCategory, err error) {
|
||||
return nil, err
|
||||
}
|
||||
@@ -201,7 +188,7 @@ func (p *PurchaseHandler) GetAllRemoteSkus(storeID int) (skus []map[string]inter
|
||||
for i := 2; i <= page1.Pages; i++ {
|
||||
pages[i-2] = i
|
||||
}
|
||||
task := tasksch.RunParallelTask("GetAllRemoteSkus", nil, "", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||||
task := tasksch.NewParallelTask("GetAllRemoteSkus", nil, "", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||||
callParams := map[string]interface{}{
|
||||
"pagesize": MaxPageSize,
|
||||
"page": batchItemList[0],
|
||||
@@ -213,6 +200,7 @@ func (p *PurchaseHandler) GetAllRemoteSkus(storeID int) (skus []map[string]inter
|
||||
globals.SugarLogger.Debug(utils.Format4Output(callParams, false))
|
||||
return nil, err2
|
||||
}, pages)
|
||||
task.Run()
|
||||
result, err2 := task.GetResult(0)
|
||||
if err = err2; err == nil {
|
||||
for _, v := range result {
|
||||
@@ -234,7 +222,7 @@ func (p *PurchaseHandler) DeleteRemoteSkus(storeID int, vendorSkuIDs []string) (
|
||||
}
|
||||
}
|
||||
}
|
||||
task := tasksch.RunParallelTask("DeleteRemoteSkus", tasksch.NewParallelConfig().SetBatchSize(100), "", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||||
task := tasksch.NewParallelTask("DeleteRemoteSkus", tasksch.NewParallelConfig().SetBatchSize(100), "", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||||
strList := make([]string, len(batchItemList))
|
||||
for k, v := range batchItemList {
|
||||
strList[k] = v.(string)
|
||||
@@ -244,6 +232,7 @@ func (p *PurchaseHandler) DeleteRemoteSkus(storeID int, vendorSkuIDs []string) (
|
||||
}
|
||||
return nil, err
|
||||
}, vendorSkuIDs)
|
||||
task.Run()
|
||||
_, err = task.GetResult(0)
|
||||
return err
|
||||
}
|
||||
@@ -259,12 +248,13 @@ func (p *PurchaseHandler) DeleteRemoteCategories(storeID int, vendorCatIDs []int
|
||||
}
|
||||
}
|
||||
}
|
||||
task := tasksch.RunParallelTask("DeleteRemoteCategories", nil, "", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||||
task := tasksch.NewParallelTask("DeleteRemoteCategories", nil, "", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||||
if globals.EnableStoreWrite {
|
||||
err = api.EbaiAPI.ShopCategoryDelete(strStoreID, batchItemList[0].(int64))
|
||||
}
|
||||
return nil, err
|
||||
}, vendorCatIDs)
|
||||
task.Run()
|
||||
_, err = task.GetResult(0)
|
||||
return err
|
||||
}
|
||||
@@ -365,10 +355,13 @@ func (p *PurchaseHandler) syncOneStoreCategoriesFromRemote2Local(db *dao.DaoDB,
|
||||
// 从本地同步分类信息到饿百
|
||||
// 测试过程中出现过,父分类创建成功后马上创建子分类会报没有父分类错
|
||||
// todo 对于deleted_at的处理有问题
|
||||
func (p *PurchaseHandler) SyncOneStoreCategories(db *dao.DaoDB, storeID int, userName string) (err error) {
|
||||
func (p *PurchaseHandler) SyncStoreCategory(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, isAsync bool) (hint string, err error) {
|
||||
userName := ctx.GetUserName()
|
||||
globals.SugarLogger.Debugf("SyncOneStoreCategories storeID:%d, userName:%s", storeID, userName)
|
||||
|
||||
for level := 1; level <= 2; level++ {
|
||||
db := dao.GetDB()
|
||||
rootTask := tasksch.NewSeqTask("ebai SyncStoreCategory", userName, func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||||
level := step + 1
|
||||
sql := `
|
||||
SELECT t2.*, t1.name, t1.parent_id, t1.level, t1.type, t1.seq, t2p.ebai_id parent_ebai_id
|
||||
FROM sku_category t1
|
||||
@@ -385,7 +378,7 @@ func (p *PurchaseHandler) SyncOneStoreCategories(db *dao.DaoDB, storeID int, use
|
||||
}
|
||||
if err = dao.GetRows(db, &catList, sql, sqlParams...); err == nil {
|
||||
strStoreID := utils.Int2Str(storeID)
|
||||
task := tasksch.RunParallelTask("syncOneStoreCategoriesFromLocal2Remote", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||||
task := tasksch.NewParallelTask("syncOneStoreCategoriesFromLocal2Remote", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||||
updateFields := []string{model.FieldEbaiSyncStatus}
|
||||
catInfo := batchItemList[0].(*tStoreCatInfo)
|
||||
// globals.SugarLogger.Debug(utils.Format4Output(catInfo, false))
|
||||
@@ -409,10 +402,16 @@ func (p *PurchaseHandler) SyncOneStoreCategories(db *dao.DaoDB, storeID int, use
|
||||
}
|
||||
return nil, err
|
||||
}, catList)
|
||||
rootTask.AddChild(task).Run()
|
||||
_, err = task.GetResult(0)
|
||||
}
|
||||
return nil, err
|
||||
}, 2)
|
||||
parentTask.AddChild(rootTask).Run()
|
||||
if !isAsync {
|
||||
_, err = rootTask.GetResult(0)
|
||||
}
|
||||
return err
|
||||
return rootTask.ID, err
|
||||
}
|
||||
|
||||
func (p *PurchaseHandler) processLocalCatByRemote(db *dao.DaoDB, storeID int, localCatMap map[string]*tStoreCatInfo, remoteCatList []*ebaiapi.CategoryInfo, userName string) (err error) {
|
||||
|
||||
@@ -4,11 +4,10 @@ import (
|
||||
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
|
||||
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
|
||||
"git.rosy.net.cn/jx-callback/business/model"
|
||||
"git.rosy.net.cn/jx-callback/business/model/dao"
|
||||
)
|
||||
|
||||
func (p *PurchaseHandler) SyncStoresCategories(db *dao.DaoDB, storeIDs []int, userName string) (err error) {
|
||||
return nil
|
||||
func (p *PurchaseHandler) SyncStoreCategory(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, isAsync bool) (hint string, err error) {
|
||||
return "", nil
|
||||
}
|
||||
func (p *PurchaseHandler) ReadStoreCategories(storeID int) (cats []*model.SkuCategory, err error) {
|
||||
return nil, nil
|
||||
|
||||
Reference in New Issue
Block a user