- RefreshAllStoresID

- RefreshAllSkusID
- use new tasksch when possible(not use run directly).
This commit is contained in:
gazebo
2018-10-23 16:34:42 +08:00
parent ad3d548824
commit 93a7202423
18 changed files with 323 additions and 155 deletions

View File

@@ -122,7 +122,7 @@ func (v *VendorSync) GetSingleStoreHandler(vendorID int) partner.ISingleStoreHan
func (v *VendorSync) syncCategories(ctx *jxcontext.Context, multiStoresHandler partner.IMultipleStoresHandler, db *dao.DaoDB, cats []*model.SkuCategory, userName string) (err error) {
syncStatusFieldName := multiStoresHandler.GetFieldSyncStatusName()
task := tasksch.RunParallelTask("syncCategories", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
task := tasksch.NewParallelTask("syncCategories", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
cat := batchItemList[0].(*model.SkuCategory)
updateFields := []string{syncStatusFieldName}
syncStatus := jxutils.GetObjFieldByName(cat, syncStatusFieldName).(int8)
@@ -141,6 +141,7 @@ func (v *VendorSync) syncCategories(ctx *jxcontext.Context, multiStoresHandler p
return nil, err
}, cats)
ctx.SetTaskOrAddChild(task)
task.Run()
_, err = task.GetResult(0)
return err
}
@@ -220,7 +221,7 @@ func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuI
err := dao.GetEntitiesByKV(db, &skuList, cond, true)
if err == nil {
// globals.SugarLogger.Debug(utils.Format4Output(skuList, false))
task := tasksch.RunParallelTask("SyncSku", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
task := tasksch.NewParallelTask("SyncSku", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
sku := batchItemList[0].(*model.Sku)
syncStatus := jxutils.GetObjFieldByName(sku, syncStatusFieldName).(int8)
if (skuID == -1 || skuID == sku.ID) && (syncStatus != 0) {
@@ -240,7 +241,7 @@ func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuI
}
return nil, err
}, skuList)
t.AddChild(task)
t.AddChild(task).Run()
_, err = task.GetResult(0)
}
return nil, err
@@ -262,8 +263,9 @@ func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendo
func (v *VendorSync) LoopStoreMap(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, userName string, storeID int, handler tasksch.WorkFunc) (hint string, err error) {
storeMaps, err := GetStoreVendorMaps(ctx, db, storeID, -1)
if err == nil {
task := tasksch.RunManagedParallelTask(taskName, nil, userName, handler, storeMaps)
task := tasksch.NewParallelTask(taskName, nil, userName, handler, storeMaps)
ctx.SetTaskOrAddChild(task)
tasksch.ManageTask(task).Run()
hint = task.ID
if !isAsync {
_, err = task.GetResult(0)
@@ -276,8 +278,9 @@ func (v *VendorSync) LoopMultiStoresVendors(ctx *jxcontext.Context, db *dao.DaoD
if taskName == "" {
taskName = "LoopMultiStoresVendors"
}
task := tasksch.RunManagedParallelTask(taskName, nil, userName, handler, v.MultiStoreVendorIDs)
task := tasksch.NewParallelTask(taskName, nil, userName, handler, v.MultiStoreVendorIDs)
ctx.SetTaskOrAddChild(task)
tasksch.ManageTask(task).Run()
if !isAsync {
_, err = task.GetResult(0)
}
@@ -308,8 +311,9 @@ func (v *VendorSync) LoopStoreVendors(ctx *jxcontext.Context, db *dao.DaoDB, ven
}
allHandlers = allHandlers[:count]
}
task := tasksch.RunManagedParallelTask(taskName, nil, userName, handler, allHandlers)
task := tasksch.NewParallelTask(taskName, nil, userName, handler, allHandlers)
ctx.SetTaskOrAddChild(task)
tasksch.ManageTask(task).Run()
if !isAsync {
_, err = task.GetResult(0)
}
@@ -325,8 +329,9 @@ func (v *VendorSync) LoopSingleStoreVendors(ctx *jxcontext.Context, db *dao.DaoD
SELECT *
FROM store_map
WHERE vendor_id IN (`+dao.GenQuestionMarks(len(v.SingleStoreVendorIDs))+")", v.SingleStoreVendorIDs); err == nil {
task := tasksch.RunManagedParallelTask(taskName, nil, userName, handler, storeMaps)
task := tasksch.NewParallelTask(taskName, nil, userName, handler, storeMaps)
ctx.SetTaskOrAddChild(task)
tasksch.ManageTask(task).Run()
hint = task.ID
if !isAsync {
_, err = task.GetResult(0)
@@ -335,35 +340,40 @@ func (v *VendorSync) LoopSingleStoreVendors(ctx *jxcontext.Context, db *dao.DaoD
return hint, makeSyncError(err)
}
func (v *VendorSync) RefreshSkuIDs(ctx *jxcontext.Context, nameID, skuID int, userName string) (err error) {
sql := `
SELECT t1.id
FROM sku t1
JOIN sku_name t2 ON t1.name_id = t2.id
WHERE 1 = 1
`
sqlParams := []interface{}{}
if nameID != -1 {
sql += " AND t1.name_id = ?"
sqlParams = append(sqlParams, nameID)
}
if skuID != -1 {
sql += " AND t1.id = ?"
sqlParams = append(sqlParams, skuID)
func (v *VendorSync) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int, storeIDs []int) (hint string, err error) {
task := tasksch.NewParallelTask("RefreshAllSkusID", nil, ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
vendorID := batchItemList[0].(int)
if handler := v.GetStoreHandler(vendorID); handler != nil {
if multiHandler, ok := handler.(partner.IMultipleStoresHandler); ok {
_, err = multiHandler.RefreshAllSkusID(ctx, false)
} else if singleHandler, ok := handler.(partner.ISingleStoreHandler); ok {
_, err = singleHandler.RefreshStoresAllSkusID(ctx, false, storeIDs)
}
}
return nil, err
}, vendorIDs)
ctx.SetTaskOrAddChild(task)
tasksch.ManageTask(task).Run()
if !isAsync {
_, err = task.GetResult(0)
}
return task.ID, err
}
var ids []int
db := dao.GetDB()
if err = dao.GetRows(db, &ids, sql, sqlParams); err == nil {
// globals.SugarLogger.Debug(utils.Format4Output(ids, false))
_, err = v.LoopMultiStoresVendors(ctx, db, "RefreshSkuIDs", false, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
multiStoresHandler := v.GetMultiStoreHandler(batchItemList[0].(int))
err := multiStoresHandler.SyncSkusIDMap(db, ids, userName)
globals.SugarLogger.Debug(err)
return nil, err
})
func (v *VendorSync) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int) (hint string, err error) {
task := tasksch.NewParallelTask("RefreshAllStoresID", nil, ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
vendorID := batchItemList[0].(int)
if handler := v.GetStoreHandler(vendorID); handler != nil {
_, err = handler.RefreshAllStoresID(ctx, false)
}
return nil, err
}, vendorIDs)
ctx.SetTaskOrAddChild(task)
tasksch.ManageTask(task).Run()
if !isAsync {
_, err = task.GetResult(0)
}
return err
return task.ID, err
}
func makeSyncError(err error) (newErr error) {