- optimize PullJdOrder
This commit is contained in:
@@ -37,10 +37,12 @@ func TransferLegacyJdOrder(ctx *jxcontext.Context, isAsync, isContinueWhenError
|
|||||||
resultList := detail["result"].(map[string]interface{})["resultList"].([]interface{})
|
resultList := detail["result"].(map[string]interface{})["resultList"].([]interface{})
|
||||||
if len(resultList) > 0 {
|
if len(resultList) > 0 {
|
||||||
originalData := resultList[0].(map[string]interface{})
|
originalData := resultList[0].(map[string]interface{})
|
||||||
if originalData["orgCode"].(string) == "320406" {
|
orgCode := originalData["orgCode"].(string)
|
||||||
|
if orgCode == "320406" {
|
||||||
orderDetail := &model.GoodsOrderOriginal{
|
orderDetail := &model.GoodsOrderOriginal{
|
||||||
VendorOrderID: jdOrder.VendorOrderID,
|
VendorOrderID: jdOrder.VendorOrderID,
|
||||||
VendorID: model.VendorIDJD,
|
VendorID: model.VendorIDJD,
|
||||||
|
AccountNo: orgCode,
|
||||||
OrderCreatedAt: utils.Str2Time(originalData["orderPurchaseTime"].(string)),
|
OrderCreatedAt: utils.Str2Time(originalData["orderPurchaseTime"].(string)),
|
||||||
OriginalData: string(utils.MustMarshal(originalData)),
|
OriginalData: string(utils.MustMarshal(originalData)),
|
||||||
}
|
}
|
||||||
@@ -87,6 +89,7 @@ func TransferLegacyElmOrder(ctx *jxcontext.Context, isAsync, isContinueWhenError
|
|||||||
orderDetail := &model.GoodsOrderOriginal{
|
orderDetail := &model.GoodsOrderOriginal{
|
||||||
VendorOrderID: elmOrder.Orderid,
|
VendorOrderID: elmOrder.Orderid,
|
||||||
VendorID: model.VendorIDELM,
|
VendorID: model.VendorIDELM,
|
||||||
|
AccountNo: "fakeelm",
|
||||||
OrderCreatedAt: utils.Str2Time(detail["activeAt"].(string)),
|
OrderCreatedAt: utils.Str2Time(detail["activeAt"].(string)),
|
||||||
OriginalData: elmOrder.Data,
|
OriginalData: elmOrder.Data,
|
||||||
}
|
}
|
||||||
@@ -113,9 +116,11 @@ func saveJdOrderList(existJdIDMap map[string]int, jdOrderList []interface{}) (er
|
|||||||
orderMap := v.(map[string]interface{})
|
orderMap := v.(map[string]interface{})
|
||||||
orderID := utils.Int64ToStr(utils.MustInterface2Int64(orderMap["orderId"]))
|
orderID := utils.Int64ToStr(utils.MustInterface2Int64(orderMap["orderId"]))
|
||||||
if existJdIDMap[orderID] == 0 {
|
if existJdIDMap[orderID] == 0 {
|
||||||
|
orgCode := orderMap["orgCode"].(string)
|
||||||
orderDetail := &model.GoodsOrderOriginal{
|
orderDetail := &model.GoodsOrderOriginal{
|
||||||
VendorOrderID: orderID,
|
VendorOrderID: orderID,
|
||||||
VendorID: model.VendorIDJD,
|
VendorID: model.VendorIDJD,
|
||||||
|
AccountNo: orgCode,
|
||||||
OrderCreatedAt: utils.Str2Time(orderMap["orderPurchaseTime"].(string)),
|
OrderCreatedAt: utils.Str2Time(orderMap["orderPurchaseTime"].(string)),
|
||||||
OriginalData: string(utils.MustMarshal(orderMap)),
|
OriginalData: string(utils.MustMarshal(orderMap)),
|
||||||
}
|
}
|
||||||
@@ -126,8 +131,8 @@ func saveJdOrderList(existJdIDMap map[string]int, jdOrderList []interface{}) (er
|
|||||||
// for _, v := range orderDetailList {
|
// for _, v := range orderDetailList {
|
||||||
// globals.SugarLogger.Debug(v.VendorOrderID)
|
// globals.SugarLogger.Debug(v.VendorOrderID)
|
||||||
// }
|
// }
|
||||||
// db := dao.GetDB()
|
db := dao.GetDB()
|
||||||
// err = dao.CreateMultiEntities(db, orderDetailList)
|
err = dao.CreateMultiEntities(db, orderDetailList)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -147,15 +152,22 @@ func PullJdOrder(ctx *jxcontext.Context, fromTime, toTime time.Time, isAsync, is
|
|||||||
}
|
}
|
||||||
|
|
||||||
pageSize := 100
|
pageSize := 100
|
||||||
hours := int((toTime.Sub(fromTime)-time.Hour)/time.Hour + 1)
|
hourGap := 1
|
||||||
globals.SugarLogger.Debug(hours)
|
gapCount := int((toTime.Sub(fromTime)-time.Hour*time.Duration(hourGap))/(time.Hour*time.Duration(hourGap)) + 1)
|
||||||
rootTask := tasksch.NewSeqTask("PullJdOrder", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
gapList := make([]int, gapCount)
|
||||||
subFromTime := fromTime.Add(time.Duration(step) * time.Hour)
|
for k := range gapList {
|
||||||
subToTime := fromTime.Add(time.Duration(step+1) * time.Hour)
|
gapList[k] = k
|
||||||
|
|
||||||
|
}
|
||||||
|
rootTask := tasksch.NewParallelTask("PullJdOrder", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(20), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
||||||
|
gapIndex := batchItemList[0].(int)
|
||||||
|
hourIndex := gapIndex * hourGap
|
||||||
|
subFromTime := fromTime.Add(time.Duration(hourIndex) * time.Hour)
|
||||||
|
subToTime := fromTime.Add(time.Duration(hourIndex+hourGap) * time.Hour)
|
||||||
if subToTime.Sub(toTime) > 0 {
|
if subToTime.Sub(toTime) > 0 {
|
||||||
subToTime = toTime
|
subToTime = toTime
|
||||||
}
|
}
|
||||||
if step < hours-1 {
|
if gapIndex < gapCount-1 {
|
||||||
subToTime = subToTime.Add(-1 * time.Second) // 减一秒
|
subToTime = subToTime.Add(-1 * time.Second) // 减一秒
|
||||||
}
|
}
|
||||||
commonParams := map[string]interface{}{
|
commonParams := map[string]interface{}{
|
||||||
@@ -163,35 +175,18 @@ func PullJdOrder(ctx *jxcontext.Context, fromTime, toTime time.Time, isAsync, is
|
|||||||
"orderPurchaseTime_begin": utils.Time2Str(subFromTime),
|
"orderPurchaseTime_begin": utils.Time2Str(subFromTime),
|
||||||
"orderPurchaseTime_end": utils.Time2Str(subToTime),
|
"orderPurchaseTime_end": utils.Time2Str(subToTime),
|
||||||
}
|
}
|
||||||
orderList, totalCount, err := api.JdAPI.OrderQuery(utils.MergeMaps(commonParams, utils.Params2Map("pageNo", 1)))
|
orderList, totalCount, err := api.JdAPI.OrderQuery(utils.MergeMaps(commonParams, utils.Params2Map("pageNo", 0)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
if err = saveJdOrderList(existJdIDMap, orderList); err != nil {
|
if err = saveJdOrderList(existJdIDMap, orderList); err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
leftPageCount := (totalCount - 1) / pageSize
|
if false {
|
||||||
globals.SugarLogger.Debugf("subFromTime:%s, subToTime:%s, leftPageCount:%d, totalCount:%d", utils.Time2Str(subFromTime), utils.Time2Str(subToTime), leftPageCount, totalCount)
|
globals.SugarLogger.Debugf("subFromTime:%s, subToTime:%s, totalCount:%d, len(orderList):%d", utils.Time2Str(subFromTime), utils.Time2Str(subToTime), totalCount, len(orderList))
|
||||||
if leftPageCount > 0 {
|
|
||||||
pageNoList := make([]int, leftPageCount)
|
|
||||||
for k := range pageNoList {
|
|
||||||
pageNoList[k] = k + 2
|
|
||||||
}
|
|
||||||
subTask := tasksch.NewParallelTask("PullJdOrder sub", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(10), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
|
||||||
subOrderList, subTotalCount, err := api.JdAPI.OrderQuery(utils.MergeMaps(commonParams, utils.Params2Map("pageNo", batchItemList[0])))
|
|
||||||
if err == nil {
|
|
||||||
if totalCount != subTotalCount {
|
|
||||||
globals.SugarLogger.Warnf("PullJdOrder totalCount:%d, subTotalCount:%d are not same", totalCount, subTotalCount)
|
|
||||||
}
|
|
||||||
err = saveJdOrderList(existJdIDMap, subOrderList)
|
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}, pageNoList)
|
}, gapList)
|
||||||
task.AddChild(subTask).Run()
|
|
||||||
_, err = subTask.GetResult(0)
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}, hours)
|
|
||||||
tasksch.ManageTask(rootTask).Run()
|
tasksch.ManageTask(rootTask).Run()
|
||||||
|
|
||||||
if !isAsync {
|
if !isAsync {
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ import (
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
DefParallelCount = 10
|
DefParallelCount = 10
|
||||||
MaxParallelCount = 10
|
MaxParallelCount = 50
|
||||||
)
|
)
|
||||||
|
|
||||||
type WorkFunc func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error)
|
type WorkFunc func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error)
|
||||||
|
|||||||
@@ -66,6 +66,7 @@ type GoodsOrderOriginal struct {
|
|||||||
ID int64 `orm:"column(id)" json:"-"`
|
ID int64 `orm:"column(id)" json:"-"`
|
||||||
VendorOrderID string `orm:"column(vendor_order_id);size(48)" json:"vendorOrderID"`
|
VendorOrderID string `orm:"column(vendor_order_id);size(48)" json:"vendorOrderID"`
|
||||||
VendorID int `orm:"column(vendor_id)" json:"vendorID"`
|
VendorID int `orm:"column(vendor_id)" json:"vendorID"`
|
||||||
|
AccountNo string `orm:"size(32)" json:"accountNo"`
|
||||||
OrderCreatedAt time.Time `orm:"type(datetime);index" json:"orderCreatedAt"` // 这里记录的是订单生效时间,即用户支付完成(货到付款即为下单时间)
|
OrderCreatedAt time.Time `orm:"type(datetime);index" json:"orderCreatedAt"` // 这里记录的是订单生效时间,即用户支付完成(货到付款即为下单时间)
|
||||||
CreatedAt time.Time `orm:"auto_now_add;type(datetime)" json:"createdAt"`
|
CreatedAt time.Time `orm:"auto_now_add;type(datetime)" json:"createdAt"`
|
||||||
OriginalData string `orm:"type(text)" json:"-"`
|
OriginalData string `orm:"type(text)" json:"-"`
|
||||||
|
|||||||
@@ -5,6 +5,36 @@ import (
|
|||||||
"git.rosy.net.cn/jx-callback/business/jxutils"
|
"git.rosy.net.cn/jx-callback/business/jxutils"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// @Title 将遗留JD订单合并
|
||||||
|
// @Description 将遗留JD订单合并
|
||||||
|
// @Param token header string true "认证token"
|
||||||
|
// @Param isAsync formData bool false "是否异步操作"
|
||||||
|
// @Param isContinueWhenError formData bool false "单个同步失败是否继续,缺省false"
|
||||||
|
// @Success 200 {object} controllers.CallResult
|
||||||
|
// @Failure 200 {object} controllers.CallResult
|
||||||
|
// @router /TransferLegacyJdOrder [post]
|
||||||
|
func (c *InitDataController) TransferLegacyJdOrder() {
|
||||||
|
c.callTransferLegacyJdOrder(func(params *tInitdataTransferLegacyJdOrderParams) (retVal interface{}, errCode string, err error) {
|
||||||
|
retVal, err = initdata.TransferLegacyJdOrder(params.Ctx, params.IsAsync, params.IsContinueWhenError)
|
||||||
|
return retVal, "", err
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// @Title 将遗留ELM订单合并
|
||||||
|
// @Description 将遗留ELM订单合并
|
||||||
|
// @Param token header string true "认证token"
|
||||||
|
// @Param isAsync formData bool false "是否异步操作"
|
||||||
|
// @Param isContinueWhenError formData bool false "单个同步失败是否继续,缺省false"
|
||||||
|
// @Success 200 {object} controllers.CallResult
|
||||||
|
// @Failure 200 {object} controllers.CallResult
|
||||||
|
// @router /TransferLegacyElmOrder [post]
|
||||||
|
func (c *InitDataController) TransferLegacyElmOrder() {
|
||||||
|
c.callTransferLegacyElmOrder(func(params *tInitdataTransferLegacyElmOrderParams) (retVal interface{}, errCode string, err error) {
|
||||||
|
retVal, err = initdata.TransferLegacyElmOrder(params.Ctx, params.IsAsync, params.IsContinueWhenError)
|
||||||
|
return retVal, "", err
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// @Title 拉取京东订单补齐本地
|
// @Title 拉取京东订单补齐本地
|
||||||
// @Description 拉取京东订单补齐本地
|
// @Description 拉取京东订单补齐本地
|
||||||
// @Param token header string true "认证token"
|
// @Param token header string true "认证token"
|
||||||
|
|||||||
Reference in New Issue
Block a user