diff --git a/business/jxstore/initdata/temp_op.go b/business/jxstore/initdata/temp_op.go index ebd94e71a..5aa6a5f92 100644 --- a/business/jxstore/initdata/temp_op.go +++ b/business/jxstore/initdata/temp_op.go @@ -37,10 +37,12 @@ func TransferLegacyJdOrder(ctx *jxcontext.Context, isAsync, isContinueWhenError resultList := detail["result"].(map[string]interface{})["resultList"].([]interface{}) if len(resultList) > 0 { originalData := resultList[0].(map[string]interface{}) - if originalData["orgCode"].(string) == "320406" { + orgCode := originalData["orgCode"].(string) + if orgCode == "320406" { orderDetail := &model.GoodsOrderOriginal{ VendorOrderID: jdOrder.VendorOrderID, VendorID: model.VendorIDJD, + AccountNo: orgCode, OrderCreatedAt: utils.Str2Time(originalData["orderPurchaseTime"].(string)), OriginalData: string(utils.MustMarshal(originalData)), } @@ -87,6 +89,7 @@ func TransferLegacyElmOrder(ctx *jxcontext.Context, isAsync, isContinueWhenError orderDetail := &model.GoodsOrderOriginal{ VendorOrderID: elmOrder.Orderid, VendorID: model.VendorIDELM, + AccountNo: "fakeelm", OrderCreatedAt: utils.Str2Time(detail["activeAt"].(string)), OriginalData: elmOrder.Data, } @@ -113,9 +116,11 @@ func saveJdOrderList(existJdIDMap map[string]int, jdOrderList []interface{}) (er orderMap := v.(map[string]interface{}) orderID := utils.Int64ToStr(utils.MustInterface2Int64(orderMap["orderId"])) if existJdIDMap[orderID] == 0 { + orgCode := orderMap["orgCode"].(string) orderDetail := &model.GoodsOrderOriginal{ VendorOrderID: orderID, VendorID: model.VendorIDJD, + AccountNo: orgCode, OrderCreatedAt: utils.Str2Time(orderMap["orderPurchaseTime"].(string)), OriginalData: string(utils.MustMarshal(orderMap)), } @@ -126,8 +131,8 @@ func saveJdOrderList(existJdIDMap map[string]int, jdOrderList []interface{}) (er // for _, v := range orderDetailList { // globals.SugarLogger.Debug(v.VendorOrderID) // } - // db := dao.GetDB() - // err = dao.CreateMultiEntities(db, orderDetailList) + db := dao.GetDB() + err = dao.CreateMultiEntities(db, orderDetailList) } return err } @@ -147,15 +152,22 @@ func PullJdOrder(ctx *jxcontext.Context, fromTime, toTime time.Time, isAsync, is } pageSize := 100 - hours := int((toTime.Sub(fromTime)-time.Hour)/time.Hour + 1) - globals.SugarLogger.Debug(hours) - rootTask := tasksch.NewSeqTask("PullJdOrder", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { - subFromTime := fromTime.Add(time.Duration(step) * time.Hour) - subToTime := fromTime.Add(time.Duration(step+1) * time.Hour) + hourGap := 1 + gapCount := int((toTime.Sub(fromTime)-time.Hour*time.Duration(hourGap))/(time.Hour*time.Duration(hourGap)) + 1) + gapList := make([]int, gapCount) + for k := range gapList { + gapList[k] = k + + } + rootTask := tasksch.NewParallelTask("PullJdOrder", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(20), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + gapIndex := batchItemList[0].(int) + hourIndex := gapIndex * hourGap + subFromTime := fromTime.Add(time.Duration(hourIndex) * time.Hour) + subToTime := fromTime.Add(time.Duration(hourIndex+hourGap) * time.Hour) if subToTime.Sub(toTime) > 0 { subToTime = toTime } - if step < hours-1 { + if gapIndex < gapCount-1 { subToTime = subToTime.Add(-1 * time.Second) // 减一秒 } commonParams := map[string]interface{}{ @@ -163,35 +175,18 @@ func PullJdOrder(ctx *jxcontext.Context, fromTime, toTime time.Time, isAsync, is "orderPurchaseTime_begin": utils.Time2Str(subFromTime), "orderPurchaseTime_end": utils.Time2Str(subToTime), } - orderList, totalCount, err := api.JdAPI.OrderQuery(utils.MergeMaps(commonParams, utils.Params2Map("pageNo", 1))) + orderList, totalCount, err := api.JdAPI.OrderQuery(utils.MergeMaps(commonParams, utils.Params2Map("pageNo", 0))) if err != nil { return "", err } if err = saveJdOrderList(existJdIDMap, orderList); err != nil { return "", err } - leftPageCount := (totalCount - 1) / pageSize - globals.SugarLogger.Debugf("subFromTime:%s, subToTime:%s, leftPageCount:%d, totalCount:%d", utils.Time2Str(subFromTime), utils.Time2Str(subToTime), leftPageCount, totalCount) - if leftPageCount > 0 { - pageNoList := make([]int, leftPageCount) - for k := range pageNoList { - pageNoList[k] = k + 2 - } - subTask := tasksch.NewParallelTask("PullJdOrder sub", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(10), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { - subOrderList, subTotalCount, err := api.JdAPI.OrderQuery(utils.MergeMaps(commonParams, utils.Params2Map("pageNo", batchItemList[0]))) - if err == nil { - if totalCount != subTotalCount { - globals.SugarLogger.Warnf("PullJdOrder totalCount:%d, subTotalCount:%d are not same", totalCount, subTotalCount) - } - err = saveJdOrderList(existJdIDMap, subOrderList) - } - return nil, err - }, pageNoList) - task.AddChild(subTask).Run() - _, err = subTask.GetResult(0) + if false { + globals.SugarLogger.Debugf("subFromTime:%s, subToTime:%s, totalCount:%d, len(orderList):%d", utils.Time2Str(subFromTime), utils.Time2Str(subToTime), totalCount, len(orderList)) } return nil, err - }, hours) + }, gapList) tasksch.ManageTask(rootTask).Run() if !isAsync { diff --git a/business/jxutils/tasksch/parallel_task.go b/business/jxutils/tasksch/parallel_task.go index 71eff0da0..8655b495f 100644 --- a/business/jxutils/tasksch/parallel_task.go +++ b/business/jxutils/tasksch/parallel_task.go @@ -11,7 +11,7 @@ import ( const ( DefParallelCount = 10 - MaxParallelCount = 10 + MaxParallelCount = 50 ) type WorkFunc func(task *ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) diff --git a/business/model/order.go b/business/model/order.go index 5fc6f85cc..6ee044b7f 100644 --- a/business/model/order.go +++ b/business/model/order.go @@ -66,6 +66,7 @@ type GoodsOrderOriginal struct { ID int64 `orm:"column(id)" json:"-"` VendorOrderID string `orm:"column(vendor_order_id);size(48)" json:"vendorOrderID"` VendorID int `orm:"column(vendor_id)" json:"vendorID"` + AccountNo string `orm:"size(32)" json:"accountNo"` OrderCreatedAt time.Time `orm:"type(datetime);index" json:"orderCreatedAt"` // 这里记录的是订单生效时间,即用户支付完成(货到付款即为下单时间) CreatedAt time.Time `orm:"auto_now_add;type(datetime)" json:"createdAt"` OriginalData string `orm:"type(text)" json:"-"` diff --git a/controllers/temp_op.go b/controllers/temp_op.go index 1cb30b958..309e10107 100644 --- a/controllers/temp_op.go +++ b/controllers/temp_op.go @@ -5,6 +5,36 @@ import ( "git.rosy.net.cn/jx-callback/business/jxutils" ) +// @Title 将遗留JD订单合并 +// @Description 将遗留JD订单合并 +// @Param token header string true "认证token" +// @Param isAsync formData bool false "是否异步操作" +// @Param isContinueWhenError formData bool false "单个同步失败是否继续,缺省false" +// @Success 200 {object} controllers.CallResult +// @Failure 200 {object} controllers.CallResult +// @router /TransferLegacyJdOrder [post] +func (c *InitDataController) TransferLegacyJdOrder() { + c.callTransferLegacyJdOrder(func(params *tInitdataTransferLegacyJdOrderParams) (retVal interface{}, errCode string, err error) { + retVal, err = initdata.TransferLegacyJdOrder(params.Ctx, params.IsAsync, params.IsContinueWhenError) + return retVal, "", err + }) +} + +// @Title 将遗留ELM订单合并 +// @Description 将遗留ELM订单合并 +// @Param token header string true "认证token" +// @Param isAsync formData bool false "是否异步操作" +// @Param isContinueWhenError formData bool false "单个同步失败是否继续,缺省false" +// @Success 200 {object} controllers.CallResult +// @Failure 200 {object} controllers.CallResult +// @router /TransferLegacyElmOrder [post] +func (c *InitDataController) TransferLegacyElmOrder() { + c.callTransferLegacyElmOrder(func(params *tInitdataTransferLegacyElmOrderParams) (retVal interface{}, errCode string, err error) { + retVal, err = initdata.TransferLegacyElmOrder(params.Ctx, params.IsAsync, params.IsContinueWhenError) + return retVal, "", err + }) +} + // @Title 拉取京东订单补齐本地 // @Description 拉取京东订单补齐本地 // @Param token header string true "认证token"