- big refactor tasksch

This commit is contained in:
gazebo
2018-10-20 15:05:02 +08:00
parent 2bfa5fe17f
commit 0d5a5189ed
15 changed files with 210 additions and 184 deletions

View File

@@ -11,7 +11,7 @@ import (
func (p *PurchaseHandler) UpdatePlaces() (err error) {
provinces, err := api.EbaiAPI.CommonShopCities(0)
if err == nil {
task := tasksch.RunTask("UpdatePlaces", false, nil, 0, 1, "", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
task := tasksch.RunParallelTask("UpdatePlaces", nil, "test", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
province := batchItemList[0].(*ebaiapi.CityInfo)
retSlice := make([]*ebaiapi.CityInfo, 0)
if province.IsOpen != 0 {

View File

@@ -73,24 +73,26 @@ var (
func (p *PurchaseHandler) SyncStoresSkus(db *dao.DaoDB, storeIDs []int, skuIDs []int, isAsync bool, userName string) (hint string, err error) {
if globals.EnableStoreWrite {
for _, storeID := range storeIDs {
err = p.syncOneStoreSkus(db, storeID, skuIDs, isAsync, userName)
if err != nil {
break
}
task := tasksch.RunSeqTask("ebai.SyncStoresSkus", userName, func(t *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
storeID := storeIDs[step]
err = p.syncOneStoreSkus(t, db, storeID, skuIDs, false, userName)
return nil, err
}, len(storeIDs))
hint = task.ID
if !isAsync {
_, err = task.GetResult(0)
}
}
return hint, err
}
func (p *PurchaseHandler) syncOneStoreSkus(db *dao.DaoDB, storeID int, skuIDs []int, isAsync bool, userName string) (err error) {
func (p *PurchaseHandler) syncOneStoreSkus(parentTask tasksch.ITask, db *dao.DaoDB, storeID int, skuIDs []int, isAsync bool, userName string) (err error) {
globals.SugarLogger.Debugf("syncOneStoreSkus storeID:%d, skuIDs:%v, userName:%s", storeID, skuIDs, userName)
doThing := func() (err error) {
if err = p.syncOneStoreCategoriesFromRemote2Local(db, storeID, userName); err != nil {
return err
}
sql := `
if err = p.syncOneStoreCategoriesFromRemote2Local(db, storeID, userName); err != nil {
return err
}
sql := `
SELECT t1.*, t2.spec_quality, t2.spec_unit, t2.weight, t2.status sku_status,
t3.prefix, t3.name, t2.comment, t3.is_global, t3.unit, t3.img,
t4.name cat_name,
@@ -110,74 +112,68 @@ func (p *PurchaseHandler) syncOneStoreSkus(db *dao.DaoDB, storeID int, skuIDs []
WHERE t1.store_id = ? AND (t1.ebai_sync_status <> 0)
`
sqlParams := []interface{}{
model.VendorIDEBAI,
model.VendorIDEBAI,
storeID,
}
if skuIDs != nil && len(skuIDs) > 0 {
sql += " AND t1.sku_id IN (" + dao.GenQuestionMarks(len(skuIDs)) + ")"
sqlParams = append(sqlParams, skuIDs)
}
strStoreID := utils.Int2Str(storeID)
var storeSkuInfoList []*tStoreSkuFullInfo
if err = dao.GetRows(db, &storeSkuInfoList, sql, sqlParams...); err == nil {
// globals.SugarLogger.Debug(utils.Format4Output(storeSkuInfoList, false))
catList2Add := make(map[int]int)
for _, storeSku := range storeSkuInfoList {
if storeSku.EbaiSyncStatus&model.SyncFlagNewMask != 0 {
if storeSku.ParentCatEbaiID == 0 {
catList2Add[storeSku.ParentCatID] = 1
}
if storeSku.CatEbaiID == 0 {
catList2Add[storeSku.CatID] = 1
}
sqlParams := []interface{}{
model.VendorIDEBAI,
model.VendorIDEBAI,
storeID,
}
if skuIDs != nil && len(skuIDs) > 0 {
sql += " AND t1.sku_id IN (" + dao.GenQuestionMarks(len(skuIDs)) + ")"
sqlParams = append(sqlParams, skuIDs)
}
strStoreID := utils.Int2Str(storeID)
var storeSkuInfoList []*tStoreSkuFullInfo
if err = dao.GetRows(db, &storeSkuInfoList, sql, sqlParams...); err == nil {
// globals.SugarLogger.Debug(utils.Format4Output(storeSkuInfoList, false))
catList2Add := make(map[int]int)
for _, storeSku := range storeSkuInfoList {
if storeSku.EbaiSyncStatus&model.SyncFlagNewMask != 0 {
if storeSku.ParentCatEbaiID == 0 {
catList2Add[storeSku.ParentCatID] = 1
}
if storeSku.CatEbaiID == 0 {
catList2Add[storeSku.CatID] = 1
}
}
for k := range catList2Add {
if err = dao.AddStoreCategoryMap(db, storeID, k, model.VendorIDEBAI, "", model.SyncFlagNewMask, userName); err != nil {
return err
}
}
if err = p.SyncOneStoreCategories(db, storeID, userName); err != nil {
}
for k := range catList2Add {
if err = dao.AddStoreCategoryMap(db, storeID, k, model.VendorIDEBAI, "", model.SyncFlagNewMask, userName); err != nil {
return err
}
if err = dao.GetRows(db, &storeSkuInfoList, sql, sqlParams...); err == nil {
task := tasksch.RunManagedTask("syncOneStoreSkus skus", false, nil, 0, 1, userName, func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeSku := batchItemList[0].(*tStoreSkuFullInfo)
updateFields := []string{model.FieldEbaiSyncStatus}
if storeSku.EbaiSyncStatus&model.SyncFlagDeletedMask != 0 {
err = api.EbaiAPI.SkuDelete(strStoreID, utils.Int64ToStr(storeSku.EbaiID))
} else if storeSku.EbaiSyncStatus&model.SyncFlagNewMask != 0 {
// globals.SugarLogger.Debug(utils.Format4Output(genSkuParamsFromStoreSkuInfo(storeSku), false))
if storeSku.EbaiID, err = api.EbaiAPI.SkuCreate(strStoreID, storeSku.SkuID, genSkuParamsFromStoreSkuInfo(storeSku)); err == nil {
// todo 创建SKU后马上绑定分类会失败待解决
updateFields = append(updateFields, model.FieldEbaiID)
time.AfterFunc(3*time.Second, func() {
api.EbaiAPI.SkuShopCategoryMap(strStoreID, storeSku.EbaiID, utils.Int64ToStr(storeSku.CatEbaiID))
})
}
} else if storeSku.EbaiSyncStatus&model.SyncFlagModifiedMask != 0 {
if _, err = api.EbaiAPI.SkuUpdate(strStoreID, storeSku.EbaiID, genSkuParamsFromStoreSkuInfo(storeSku)); err == nil {
err = api.EbaiAPI.SkuShopCategoryMap(strStoreID, storeSku.EbaiID, utils.Int64ToStr(storeSku.CatEbaiID))
}
}
if err == nil {
storeSku.EbaiSyncStatus = 0
_, err = dao.UpdateEntity(nil, &storeSku.StoreSkuBind, updateFields...)
}
return nil, err
}, storeSkuInfoList)
_, err = task.GetResult(0)
}
}
return err
}
if !isAsync {
err = doThing()
} else {
go doThing()
if err = p.SyncOneStoreCategories(db, storeID, userName); err != nil {
return err
}
if err = dao.GetRows(db, &storeSkuInfoList, sql, sqlParams...); err == nil {
task := tasksch.RunParallelTask("syncOneStoreSkus skus", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeSku := batchItemList[0].(*tStoreSkuFullInfo)
updateFields := []string{model.FieldEbaiSyncStatus}
if storeSku.EbaiSyncStatus&model.SyncFlagDeletedMask != 0 {
err = api.EbaiAPI.SkuDelete(strStoreID, utils.Int64ToStr(storeSku.EbaiID))
} else if storeSku.EbaiSyncStatus&model.SyncFlagNewMask != 0 {
// globals.SugarLogger.Debug(utils.Format4Output(genSkuParamsFromStoreSkuInfo(storeSku), false))
if storeSku.EbaiID, err = api.EbaiAPI.SkuCreate(strStoreID, storeSku.SkuID, genSkuParamsFromStoreSkuInfo(storeSku)); err == nil {
// todo 创建SKU后马上绑定分类会失败待解决
updateFields = append(updateFields, model.FieldEbaiID)
time.AfterFunc(3*time.Second, func() {
api.EbaiAPI.SkuShopCategoryMap(strStoreID, storeSku.EbaiID, utils.Int64ToStr(storeSku.CatEbaiID))
})
}
} else if storeSku.EbaiSyncStatus&model.SyncFlagModifiedMask != 0 {
if _, err = api.EbaiAPI.SkuUpdate(strStoreID, storeSku.EbaiID, genSkuParamsFromStoreSkuInfo(storeSku)); err == nil {
err = api.EbaiAPI.SkuShopCategoryMap(strStoreID, storeSku.EbaiID, utils.Int64ToStr(storeSku.CatEbaiID))
}
}
if err == nil {
storeSku.EbaiSyncStatus = 0
_, err = dao.UpdateEntity(nil, &storeSku.StoreSkuBind, updateFields...)
}
return nil, err
}, storeSkuInfoList)
parentTask.AddChild(task)
_, err = task.GetResult(0)
}
}
return err
}
@@ -212,7 +208,7 @@ func (p *PurchaseHandler) GetAllRemoteSkus(storeID int) (skus []map[string]inter
for i := 2; i <= page1.Pages; i++ {
pages[i-2] = i
}
task := tasksch.RunTask("GetAllRemoteSkus", false, nil, 0, 1, "", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
task := tasksch.RunParallelTask("GetAllRemoteSkus", nil, "", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
callParams := map[string]interface{}{
"pagesize": MaxPageSize,
"page": batchItemList[0],
@@ -245,7 +241,7 @@ func (p *PurchaseHandler) DeleteRemoteSkus(storeID int, vendorSkuIDs []string) (
}
}
}
task := tasksch.RunTask("DeleteRemoteSkus", false, nil, 0, 100, "", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
task := tasksch.RunParallelTask("DeleteRemoteSkus", tasksch.NewParallelConfig().SetBatchSize(100), "", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
strList := make([]string, len(batchItemList))
for k, v := range batchItemList {
strList[k] = v.(string)
@@ -267,7 +263,7 @@ func (p *PurchaseHandler) DeleteRemoteCategories(storeID int, vendorCatIDs []int
}
}
}
task := tasksch.RunTask("DeleteRemoteCategories", false, nil, 0, 1, "", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
task := tasksch.RunParallelTask("DeleteRemoteCategories", nil, "", func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
return nil, api.EbaiAPI.ShopCategoryDelete(strStoreID, batchItemList[0].(int64))
}, vendorCatIDs)
_, err = task.GetResult(0)
@@ -384,7 +380,7 @@ func (p *PurchaseHandler) SyncOneStoreCategories(db *dao.DaoDB, storeID int, use
}
if err = dao.GetRows(db, &catList, sql, sqlParams...); err == nil {
strStoreID := utils.Int2Str(storeID)
task := tasksch.RunTask("syncOneStoreCategoriesFromLocal2Remote", false, nil, 0, 1, userName, func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
task := tasksch.RunParallelTask("syncOneStoreCategoriesFromLocal2Remote", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
updateFields := []string{model.FieldEbaiSyncStatus}
catInfo := batchItemList[0].(*tStoreCatInfo)
// globals.SugarLogger.Debug(utils.Format4Output(catInfo, false))

View File

@@ -269,7 +269,7 @@ func (p *PurchaseHandler) SyncSkusIDMap(db *dao.DaoDB, skuIDs []int, userName st
if err = dao.GetRows(db, &skuPairs, sql, sqlParams); err == nil {
// globals.SugarLogger.Debug(utils.Format4Output(skuPairs, false))
globals.SugarLogger.Debug(len(skuPairs))
task := tasksch.RunTask("SyncSkusIDMap", true, nil, 10, 1, userName, func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
task := tasksch.RunParallelTask("SyncSkusIDMap", tasksch.NewParallelConfig().SetIsContinueWhenError(true), userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
skuPairs := make([]*jdapi.SkuIDPair, len(batchItemList))
for k, v := range batchItemList {
pair := v.(*jdapi.SkuIDPair)

View File

@@ -27,7 +27,7 @@ func (p *PurchaseHandler) SyncStoresSkus(db *dao.DaoDB, storeIDs []int, skuIDs [
if len(skuIDs) < MaxSkuBatchSize {
parallelCount = 10
}
task := tasksch.RunManagedTask("SyncStoresSkus", false, nil, parallelCount, 1, userName, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) {
task := tasksch.RunManagedParallelTask("SyncStoresSkus", tasksch.NewParallelConfig().SetParallelCount(parallelCount), userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
storeID := batchItemList[0].(int)
sqlParams := []interface{}{
utils.DefaultTimeValue,
@@ -47,7 +47,7 @@ func (p *PurchaseHandler) SyncStoresSkus(db *dao.DaoDB, storeIDs []int, skuIDs [
// globals.SugarLogger.Debug(sql, sqlParams)
if err = dao.GetRows(db, &storeSkus, sql, sqlParams); err == nil {
outStationNo := utils.Int2Str(storeID)
task := tasksch.RunTask("SyncStoresSkus inner", false, nil, 0, MaxSkuBatchSize, userName, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) {
task := tasksch.RunParallelTask("SyncStoresSkus inner", tasksch.NewParallelConfig().SetBatchSize(MaxSkuBatchSize), userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
var skuPriceInfoList []*jdapi.SkuPriceInfo
var skuVendibilityList []*jdapi.StockVendibility
var skuStockList []*jdapi.SkuStock