- refactor jxcontent and tasksch (remove dependency from jxcontent to tasksch)

- send dingding msg to user when async task finished
This commit is contained in:
gazebo
2019-03-22 15:04:28 +08:00
parent 0f5445020a
commit 25ac631c5c
31 changed files with 1527 additions and 1488 deletions

View File

@@ -130,29 +130,29 @@ func (v *VendorSync) GetSingleStoreHandler(vendorID int) partner.ISingleStoreHan
func (v *VendorSync) syncCategories(ctx *jxcontext.Context, parentTask tasksch.ITask, multiStoresHandler partner.IMultipleStoresHandler, db *dao.DaoDB, cats []*model.SkuCategory, userName string) (err error) {
syncStatusFieldName := dao.GetSyncStatusStructField(model.VendorNames[multiStoresHandler.GetVendorID()])
task := tasksch.NewParallelTask("syncCategories", nil, userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
cat := batchItemList[0].(*model.SkuCategory)
updateFields := []string{syncStatusFieldName}
syncStatus := jxutils.GetObjFieldByName(cat, syncStatusFieldName).(int8)
if (syncStatus & model.SyncFlagDeletedMask) != 0 { //删除
err = multiStoresHandler.DeleteCategory(db, cat, userName)
} else if (syncStatus & model.SyncFlagNewMask) != 0 { // 新增
err = multiStoresHandler.CreateCategory(db, cat, userName)
updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()]))
} else if (syncStatus & model.SyncFlagModifiedMask) != 0 { // 修改
err = multiStoresHandler.UpdateCategory(db, cat, userName)
if intErr, ok := err.(*utils.ErrorWithCode); ok && intErr.IntCode() == -3 {
err = nil
task := tasksch.NewParallelTask("syncCategories", nil, ctx,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
cat := batchItemList[0].(*model.SkuCategory)
updateFields := []string{syncStatusFieldName}
syncStatus := jxutils.GetObjFieldByName(cat, syncStatusFieldName).(int8)
if (syncStatus & model.SyncFlagDeletedMask) != 0 { //删除
err = multiStoresHandler.DeleteCategory(db, cat, userName)
} else if (syncStatus & model.SyncFlagNewMask) != 0 { // 新增
err = multiStoresHandler.CreateCategory(db, cat, userName)
updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()]))
} else if (syncStatus & model.SyncFlagModifiedMask) != 0 { // 修改
err = multiStoresHandler.UpdateCategory(db, cat, userName)
if intErr, ok := err.(*utils.ErrorWithCode); ok && intErr.IntCode() == -3 {
err = nil
}
}
}
if err == nil {
jxutils.SetObjFieldByName(cat, syncStatusFieldName, int8(0))
_, err = dao.UpdateEntity(db, cat, updateFields...)
}
return nil, err
}, cats)
ctx.SetTaskOrAddChild(task, parentTask)
task.Run()
if err == nil {
jxutils.SetObjFieldByName(cat, syncStatusFieldName, int8(0))
_, err = dao.UpdateEntity(db, cat, updateFields...)
}
return nil, err
}, cats)
tasksch.HandleTask(task, parentTask, false).Run()
_, err = task.GetResult(0)
return err
}
@@ -212,14 +212,15 @@ func (v *VendorSync) SyncStore(ctx *jxcontext.Context, db *dao.DaoDB, vendorID,
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
handler := v.GetStoreHandler(loopMapInfo.VendorID)
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("SyncStore loop store %s", model.VendorChineseNames[loopMapInfo.VendorID]), nil, ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeMap := batchItemList[0].(*model.StoreMap)
if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil {
storeMap.SyncStatus = 0
_, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus)
}
return nil, err
}, loopMapInfo.StoreMapList)
loopStoreTask := tasksch.NewParallelTask(fmt.Sprintf("SyncStore loop store %s", model.VendorChineseNames[loopMapInfo.VendorID]), nil, ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeMap := batchItemList[0].(*model.StoreMap)
if err = handler.UpdateStore(db, storeMap.StoreID, userName); err == nil {
storeMap.SyncStatus = 0
_, err = dao.UpdateEntity(db, storeMap, model.FieldSyncStatus)
}
return nil, err
}, loopMapInfo.StoreMapList)
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
return nil, err
@@ -262,51 +263,52 @@ func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuI
err := dao.GetRows(db, &skuNameList, sql, sqlParams...)
if err == nil && len(skuNameList) > 0 {
// todo 同一skuName下的sku顺序处理的原因是京东SPU特殊类型必须要序列化同步才能正常处理, db可能会有多线程问题
task := tasksch.NewParallelTask("SyncSku loop sku", tasksch.NewParallelConfig().SetParallelCount(10).SetIsContinueWhenError(isContinueWhenError), userName, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
skuName := batchItemList[0].(*model.SkuName)
var skuList []*model.Sku
if err = dao.GetRows(db, &skuList, fmt.Sprintf(`
task := tasksch.NewParallelTask("SyncSku loop sku", tasksch.NewParallelConfig().SetParallelCount(10).SetIsContinueWhenError(isContinueWhenError), ctx,
func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
skuName := batchItemList[0].(*model.SkuName)
var skuList []*model.Sku
if err = dao.GetRows(db, &skuList, fmt.Sprintf(`
SELECT *
FROM sku
WHERE name_id = ? AND %s_sync_status <> 0
ORDER BY IF(spec_unit IN('kg', 'L'), 1000, 1) * spec_quality
`, dbField), skuName.ID); err == nil && len(skuList) > 0 {
for _, sku := range skuList {
syncStatus := jxutils.GetObjFieldByName(sku, syncStatusFieldName).(int8)
if (skuID == -1 || skuID == sku.ID) && (syncStatus != 0) {
updateFields := []string{syncStatusFieldName}
if syncStatus&model.SyncFlagDeletedMask != 0 { // 删除
err = multiStoresHandler.DeleteSku(db, sku, userName)
} else if syncStatus&model.SyncFlagNewMask != 0 { // 新增
if err = multiStoresHandler.CreateSku(db, sku, userName); err == nil {
var tmpStruct struct {
MaxIndex int
for _, sku := range skuList {
syncStatus := jxutils.GetObjFieldByName(sku, syncStatusFieldName).(int8)
if (skuID == -1 || skuID == sku.ID) && (syncStatus != 0) {
updateFields := []string{syncStatusFieldName}
if syncStatus&model.SyncFlagDeletedMask != 0 { // 删除
err = multiStoresHandler.DeleteSku(db, sku, userName)
} else if syncStatus&model.SyncFlagNewMask != 0 { // 新增
if err = multiStoresHandler.CreateSku(db, sku, userName); err == nil {
var tmpStruct struct {
MaxIndex int
}
// todo hard code 得到京东spu中sku的顺序以方便以后修改销售属性这个必须要每次重新从数据库取
if dao.GetRow(db, &tmpStruct, "SELECT MAX(sku_index) max_index FROM sku WHERE name_id = ? AND jd_id > 0 AND jd_id < 4024012631406 ", sku.NameID) == nil {
sku.SkuIndex = tmpStruct.MaxIndex + 1
updateFields = append(updateFields, "SkuIndex")
}
updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()]))
}
// todo hard code 得到京东spu中sku的顺序以方便以后修改销售属性这个必须要每次重新从数据库取
if dao.GetRow(db, &tmpStruct, "SELECT MAX(sku_index) max_index FROM sku WHERE name_id = ? AND jd_id > 0 AND jd_id < 4024012631406 ", sku.NameID) == nil {
sku.SkuIndex = tmpStruct.MaxIndex + 1
updateFields = append(updateFields, "SkuIndex")
}
updateFields = append(updateFields, dao.GetVendorThingIDStructField(model.VendorNames[multiStoresHandler.GetVendorID()]))
} else if syncStatus&model.SyncFlagModifiedMask != 0 { // 修改
err = multiStoresHandler.UpdateSku(db, sku, userName)
}
} else if syncStatus&model.SyncFlagModifiedMask != 0 { // 修改
err = multiStoresHandler.UpdateSku(db, sku, userName)
}
if err == nil {
jxutils.SetObjFieldByName(sku, syncStatusFieldName, int8(0))
if _, err = dao.UpdateEntity(db, sku, updateFields...); err != nil {
break
if err == nil {
jxutils.SetObjFieldByName(sku, syncStatusFieldName, int8(0))
if _, err = dao.UpdateEntity(db, sku, updateFields...); err != nil {
break
}
}
}
}
}
}
if err == nil {
jxutils.SetObjFieldByName(skuName, syncStatusFieldName, int8(0))
_, err = dao.UpdateEntity(db, skuName, syncStatusFieldName)
}
return nil, err
}, skuNameList)
if err == nil {
jxutils.SetObjFieldByName(skuName, syncStatusFieldName, int8(0))
_, err = dao.UpdateEntity(db, skuName, syncStatusFieldName)
}
return nil, err
}, skuNameList)
t.AddChild(task).Run()
_, err = task.GetResult(0)
}
@@ -320,11 +322,12 @@ func (v *VendorSync) SyncStoresCategory(ctx *jxcontext.Context, db *dao.DaoDB, v
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if handler := v.GetSingleStoreHandler(loopMapInfo.VendorID); handler != nil {
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewSeqTask("SyncStoresCategory loop stores", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
storeID := loopMapInfo.StoreMapList[step].StoreID
_, err = handler.SyncStoreCategory(ctx, task, storeID, false)
return nil, err
}, len(loopMapInfo.StoreMapList))
loopStoreTask := tasksch.NewSeqTask("SyncStoresCategory loop stores", ctx,
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
storeID := loopMapInfo.StoreMapList[step].StoreID
_, err = handler.SyncStoreCategory(ctx, task, storeID, false)
return nil, err
}, len(loopMapInfo.StoreMapList))
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
return nil, err
@@ -342,16 +345,17 @@ func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendo
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewSeqTask("SyncStoresSkus相同平台循环门店", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
storeID := loopMapInfo.StoreMapList[step].StoreID
if _, err = handler.SyncStoreSkus(ctx, task, storeID, skuIDs, false, isContinueWhenError); err != nil {
globals.SugarLogger.Debugf("SyncStoresSkus failed1 store:%d failed with error:%v", storeID, err)
if isContinueWhenError {
err = nil
loopStoreTask := tasksch.NewSeqTask("SyncStoresSkus相同平台循环门店", ctx,
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
storeID := loopMapInfo.StoreMapList[step].StoreID
if _, err = handler.SyncStoreSkus(ctx, task, storeID, skuIDs, false, isContinueWhenError); err != nil {
globals.SugarLogger.Debugf("SyncStoresSkus failed1 store:%d failed with error:%v", storeID, err)
if isContinueWhenError {
err = nil
}
}
}
return nil, err
}, len(loopMapInfo.StoreMapList))
return nil, err
}, len(loopMapInfo.StoreMapList))
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
return nil, err
@@ -368,11 +372,12 @@ func (v *VendorSync) FullSyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, v
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewSeqTask("FullSyncStoresSkus相同平台循环门店", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
storeID := loopMapInfo.StoreMapList[step].StoreID
_, err = handler.FullSyncStoreSkus(ctx, task, storeID, false, isContinueWhenError)
return nil, err
}, len(loopMapInfo.StoreMapList))
loopStoreTask := tasksch.NewSeqTask("FullSyncStoresSkus相同平台循环门店", ctx,
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
storeID := loopMapInfo.StoreMapList[step].StoreID
_, err = handler.FullSyncStoreSkus(ctx, task, storeID, false, isContinueWhenError)
return nil, err
}, len(loopMapInfo.StoreMapList))
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
return nil, err
@@ -389,11 +394,12 @@ func (v *VendorSync) DeleteRemoteStoreSkus(ctx *jxcontext.Context, db *dao.DaoDB
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewSeqTask("DeleteRemoteStoreSkus相同平台循环门店", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
storeID := loopMapInfo.StoreMapList[step].StoreID
_, err = handler.DeleteRemoteStoreSkus(ctx, task, storeID, false, isContinueWhenError)
return nil, err
}, len(loopMapInfo.StoreMapList))
loopStoreTask := tasksch.NewSeqTask("DeleteRemoteStoreSkus相同平台循环门店", ctx,
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
storeID := loopMapInfo.StoreMapList[step].StoreID
_, err = handler.DeleteRemoteStoreSkus(ctx, task, storeID, false, isContinueWhenError)
return nil, err
}, len(loopMapInfo.StoreMapList))
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
return nil, err
@@ -443,9 +449,8 @@ func (v *VendorSync) LoopStoresMap(ctx *jxcontext.Context, db *dao.DaoDB, taskNa
}
index++
}
task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx.GetUserName(), handler, loopInfoList)
ctx.SetTaskOrAddChild(task, nil)
tasksch.ManageTask(task).Run()
task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, loopInfoList)
tasksch.HandleTask(task, nil, true).Run()
if !isAsync {
_, err = task.GetResult(0)
}
@@ -453,62 +458,28 @@ func (v *VendorSync) LoopStoresMap(ctx *jxcontext.Context, db *dao.DaoDB, taskNa
}
func (v *VendorSync) LoopMultiStoresVendors(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, userName string, handler tasksch.WorkFunc) (hint string, err error) {
task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), userName, handler, v.MultiStoreVendorIDs)
ctx.SetTaskOrAddChild(task, nil)
tasksch.ManageTask(task).Run()
task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, v.MultiStoreVendorIDs)
tasksch.HandleTask(task, nil, true).Run()
if !isAsync {
_, err = task.GetResult(0)
}
return task.ID, makeSyncError(err)
}
// func (v *VendorSync) LoopStoreVendors(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, taskName string, isAsync bool, userName string, handler tasksch.WorkFunc) (hint string, err error) {
// if taskName == "" {
// taskName = "LoopStoreVendors"
// }
// var vendorIDMap map[int]int
// if len(vendorIDs) != 0 {
// vendorIDMap = make(map[int]int)
// for _, v := range vendorIDs {
// vendorIDMap[v] = 1
// }
// }
// allHandlers := make([]int, len(v.MultiStoreVendorIDs)+len(v.SingleStoreVendorIDs))
// copy(allHandlers, v.MultiStoreVendorIDs)
// copy(allHandlers[len(v.MultiStoreVendorIDs):], v.SingleStoreVendorIDs)
// if vendorIDMap != nil {
// count := 0
// for _, v := range allHandlers {
// if vendorIDMap[v] == 1 {
// allHandlers[count] = v
// count++
// }
// }
// allHandlers = allHandlers[:count]
// }
// task := tasksch.NewParallelTask(taskName, nil, userName, handler, allHandlers)
// ctx.SetTaskOrAddChild(task, nil)
// tasksch.ManageTask(task).Run()
// if !isAsync {
// _, err = task.GetResult(0)
// }
// return task.ID, err
// }
func (v *VendorSync) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int, storeIDs []int) (hint string, err error) {
task := tasksch.NewParallelTask("RefreshAllSkusID", nil, ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
vendorID := batchItemList[0].(int)
if handler := v.GetStoreHandler(vendorID); handler != nil {
if multiHandler, ok := handler.(partner.IMultipleStoresHandler); ok {
_, err = multiHandler.RefreshAllSkusID(ctx, task, false)
} else if singleHandler, ok := handler.(partner.ISingleStoreHandler); ok {
_, err = singleHandler.RefreshStoresAllSkusID(ctx, task, false, storeIDs)
task := tasksch.NewParallelTask("RefreshAllSkusID", nil, ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
vendorID := batchItemList[0].(int)
if handler := v.GetStoreHandler(vendorID); handler != nil {
if multiHandler, ok := handler.(partner.IMultipleStoresHandler); ok {
_, err = multiHandler.RefreshAllSkusID(ctx, task, false)
} else if singleHandler, ok := handler.(partner.ISingleStoreHandler); ok {
_, err = singleHandler.RefreshStoresAllSkusID(ctx, task, false, storeIDs)
}
}
}
return nil, err
}, vendorIDs)
ctx.SetTaskOrAddChild(task, nil)
tasksch.ManageTask(task).Run()
return nil, err
}, vendorIDs)
tasksch.HandleTask(task, nil, true).Run()
if !isAsync {
_, err = task.GetResult(0)
}
@@ -516,15 +487,15 @@ func (v *VendorSync) RefreshAllSkusID(ctx *jxcontext.Context, isAsync bool, vend
}
func (v *VendorSync) RefreshAllStoresID(ctx *jxcontext.Context, isAsync bool, vendorIDs []int) (hint string, err error) {
task := tasksch.NewParallelTask("RefreshAllStoresID", nil, ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
vendorID := batchItemList[0].(int)
if handler := v.GetStoreHandler(vendorID); handler != nil {
_, err = handler.RefreshAllStoresID(ctx, task, false)
}
return nil, err
}, vendorIDs)
ctx.SetTaskOrAddChild(task, nil)
tasksch.ManageTask(task).Run()
task := tasksch.NewParallelTask("RefreshAllStoresID", nil, ctx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
vendorID := batchItemList[0].(int)
if handler := v.GetStoreHandler(vendorID); handler != nil {
_, err = handler.RefreshAllStoresID(ctx, task, false)
}
return nil, err
}, vendorIDs)
tasksch.HandleTask(task, nil, true).Run()
if !isAsync {
_, err = task.GetResult(0)
}