From f6ed94261927ad588d5f6c8f2f613f9dd6e00076 Mon Sep 17 00:00:00 2001 From: gazebo Date: Thu, 10 Jan 2019 18:49:42 +0800 Subject: [PATCH] - temp op for jd order --- business/jxstore/initdata/temp_op.go | 203 +++++++++++++++++++++++++ business/model/legacymodel2/jdorder.go | 1 + controllers/temp_op.go | 27 ++++ routers/commentsRouter_controllers.go | 8 + 4 files changed, 239 insertions(+) create mode 100644 business/jxstore/initdata/temp_op.go create mode 100644 controllers/temp_op.go diff --git a/business/jxstore/initdata/temp_op.go b/business/jxstore/initdata/temp_op.go new file mode 100644 index 000000000..ebd94e71a --- /dev/null +++ b/business/jxstore/initdata/temp_op.go @@ -0,0 +1,203 @@ +package initdata + +import ( + "time" + + "git.rosy.net.cn/baseapi/utils" + "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" + "git.rosy.net.cn/jx-callback/business/jxutils/tasksch" + "git.rosy.net.cn/jx-callback/business/model" + "git.rosy.net.cn/jx-callback/business/model/dao" + "git.rosy.net.cn/jx-callback/business/model/legacymodel2" + "git.rosy.net.cn/jx-callback/globals" + "git.rosy.net.cn/jx-callback/globals/api" +) + +func TransferLegacyJdOrder(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) { + sql := ` + SELECT t1.* + FROM jdorder t1 + LEFT JOIN goods_order t2 ON t2.vendor_order_id = t1.vendor_order_id + WHERE t2.id IS NULL + ` + db := dao.GetDB() + var jdOrderList []*legacymodel2.Jdorder + if err = dao.GetRows(db, &jdOrderList, sql); err != nil { + return "", err + } + + task := tasksch.NewParallelTask("TransferLegacyJdOrder", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetBatchSize(40), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + var orderDetailList []*model.GoodsOrderOriginal + for _, v := range batchItemList { + jdOrder := v.(*legacymodel2.Jdorder) + var detail map[string]interface{} + if err = utils.UnmarshalUseNumber([]byte(jdOrder.Data), &detail); err != nil { + return nil, err + } + resultList := detail["result"].(map[string]interface{})["resultList"].([]interface{}) + if len(resultList) > 0 { + originalData := resultList[0].(map[string]interface{}) + if originalData["orgCode"].(string) == "320406" { + orderDetail := &model.GoodsOrderOriginal{ + VendorOrderID: jdOrder.VendorOrderID, + VendorID: model.VendorIDJD, + OrderCreatedAt: utils.Str2Time(originalData["orderPurchaseTime"].(string)), + OriginalData: string(utils.MustMarshal(originalData)), + } + orderDetailList = append(orderDetailList, orderDetail) + } + } + } + if len(orderDetailList) > 0 { + err = dao.CreateMultiEntities(db, orderDetailList) + } + return nil, err + }, jdOrderList) + tasksch.ManageTask(task).Run() + + if !isAsync { + _, err = task.GetResult(0) + } else { + hint = task.ID + } + return hint, err +} + +func TransferLegacyElmOrder(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) { + sql := ` + SELECT t1.* + FROM elemeorder t1 + LEFT JOIN goods_order t2 ON t2.vendor_order_id = t1.orderid + WHERE t2.id IS NULL + ` + db := dao.GetDB() + var elmOrderList []*legacymodel2.Elemeorder + if err = dao.GetRows(db, &elmOrderList, sql); err != nil { + return "", err + } + + task := tasksch.NewParallelTask("TransferLegacyElmOrder", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetBatchSize(40), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + var orderDetailList []*model.GoodsOrderOriginal + for _, v := range batchItemList { + elmOrder := v.(*legacymodel2.Elemeorder) + var detail map[string]interface{} + if err = utils.UnmarshalUseNumber([]byte(elmOrder.Data), &detail); err != nil { + return nil, err + } + orderDetail := &model.GoodsOrderOriginal{ + VendorOrderID: elmOrder.Orderid, + VendorID: model.VendorIDELM, + OrderCreatedAt: utils.Str2Time(detail["activeAt"].(string)), + OriginalData: elmOrder.Data, + } + orderDetailList = append(orderDetailList, orderDetail) + } + if len(orderDetailList) > 0 { + err = dao.CreateMultiEntities(db, orderDetailList) + } + return nil, err + }, elmOrderList) + tasksch.ManageTask(task).Run() + + if !isAsync { + _, err = task.GetResult(0) + } else { + hint = task.ID + } + return hint, err +} + +func saveJdOrderList(existJdIDMap map[string]int, jdOrderList []interface{}) (err error) { + var orderDetailList []*model.GoodsOrderOriginal + for _, v := range jdOrderList { + orderMap := v.(map[string]interface{}) + orderID := utils.Int64ToStr(utils.MustInterface2Int64(orderMap["orderId"])) + if existJdIDMap[orderID] == 0 { + orderDetail := &model.GoodsOrderOriginal{ + VendorOrderID: orderID, + VendorID: model.VendorIDJD, + OrderCreatedAt: utils.Str2Time(orderMap["orderPurchaseTime"].(string)), + OriginalData: string(utils.MustMarshal(orderMap)), + } + orderDetailList = append(orderDetailList, orderDetail) + } + } + if len(orderDetailList) > 0 { + // for _, v := range orderDetailList { + // globals.SugarLogger.Debug(v.VendorOrderID) + // } + // db := dao.GetDB() + // err = dao.CreateMultiEntities(db, orderDetailList) + } + return err +} + +func PullJdOrder(ctx *jxcontext.Context, fromTime, toTime time.Time, isAsync, isContinueWhenError bool) (hint string, err error) { + var existJdIDs []string + db := dao.GetDB() + if err = dao.GetRows(db, &existJdIDs, ` + SELECT vendor_order_id + FROM goods_order_original + WHERE vendor_id = ?`, model.VendorIDJD); err != nil { + return "", err + } + existJdIDMap := make(map[string]int) + for _, v := range existJdIDs { + existJdIDMap[v] = 1 + } + + pageSize := 100 + hours := int((toTime.Sub(fromTime)-time.Hour)/time.Hour + 1) + globals.SugarLogger.Debug(hours) + rootTask := tasksch.NewSeqTask("PullJdOrder", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { + subFromTime := fromTime.Add(time.Duration(step) * time.Hour) + subToTime := fromTime.Add(time.Duration(step+1) * time.Hour) + if subToTime.Sub(toTime) > 0 { + subToTime = toTime + } + if step < hours-1 { + subToTime = subToTime.Add(-1 * time.Second) // 减一秒 + } + commonParams := map[string]interface{}{ + "pageSize": pageSize, + "orderPurchaseTime_begin": utils.Time2Str(subFromTime), + "orderPurchaseTime_end": utils.Time2Str(subToTime), + } + orderList, totalCount, err := api.JdAPI.OrderQuery(utils.MergeMaps(commonParams, utils.Params2Map("pageNo", 1))) + if err != nil { + return "", err + } + if err = saveJdOrderList(existJdIDMap, orderList); err != nil { + return "", err + } + leftPageCount := (totalCount - 1) / pageSize + globals.SugarLogger.Debugf("subFromTime:%s, subToTime:%s, leftPageCount:%d, totalCount:%d", utils.Time2Str(subFromTime), utils.Time2Str(subToTime), leftPageCount, totalCount) + if leftPageCount > 0 { + pageNoList := make([]int, leftPageCount) + for k := range pageNoList { + pageNoList[k] = k + 2 + } + subTask := tasksch.NewParallelTask("PullJdOrder sub", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(10), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + subOrderList, subTotalCount, err := api.JdAPI.OrderQuery(utils.MergeMaps(commonParams, utils.Params2Map("pageNo", batchItemList[0]))) + if err == nil { + if totalCount != subTotalCount { + globals.SugarLogger.Warnf("PullJdOrder totalCount:%d, subTotalCount:%d are not same", totalCount, subTotalCount) + } + err = saveJdOrderList(existJdIDMap, subOrderList) + } + return nil, err + }, pageNoList) + task.AddChild(subTask).Run() + _, err = subTask.GetResult(0) + } + return nil, err + }, hours) + tasksch.ManageTask(rootTask).Run() + + if !isAsync { + _, err = rootTask.GetResult(0) + } else { + hint = rootTask.ID + } + return hint, err +} diff --git a/business/model/legacymodel2/jdorder.go b/business/model/legacymodel2/jdorder.go index aeb4b0e91..874e8057a 100644 --- a/business/model/legacymodel2/jdorder.go +++ b/business/model/legacymodel2/jdorder.go @@ -7,6 +7,7 @@ type Jdorder struct { Data string `orm:"column(data);null"` Success int8 `orm:"column(success);null"` Jdorderid int64 `orm:"column(jdorderid);null;unique"` + VendorOrderID string `orm:"column(vendor_order_id);size(48);unique" json:"vendorOrderID"` Cityname string `orm:"column(cityname);size(20);null"` Orderstatus int `orm:"column(orderstatus);null"` Orderstatustime string `orm:"column(orderstatustime);size(50);null;index"` diff --git a/controllers/temp_op.go b/controllers/temp_op.go new file mode 100644 index 000000000..1cb30b958 --- /dev/null +++ b/controllers/temp_op.go @@ -0,0 +1,27 @@ +package controllers + +import ( + "git.rosy.net.cn/jx-callback/business/jxstore/initdata" + "git.rosy.net.cn/jx-callback/business/jxutils" +) + +// @Title 拉取京东订单补齐本地 +// @Description 拉取京东订单补齐本地 +// @Param token header string true "认证token" +// @Param fromTime formData string true "开始时间" +// @Param toTime formData string true "结束时间" +// @Param isAsync formData bool false "是否异步操作" +// @Param isContinueWhenError formData bool false "单个同步失败是否继续,缺省false" +// @Success 200 {object} controllers.CallResult +// @Failure 200 {object} controllers.CallResult +// @router /PullJdOrder [post] +func (c *InitDataController) PullJdOrder() { + c.callPullJdOrder(func(params *tInitdataPullJdOrderParams) (retVal interface{}, errCode string, err error) { + timeList, err := jxutils.BatchStr2Time(params.FromTime, params.ToTime) + if err != nil { + return retVal, "", err + } + retVal, err = initdata.PullJdOrder(params.Ctx, timeList[0], timeList[1], params.IsAsync, params.IsContinueWhenError) + return retVal, "", err + }) +} diff --git a/routers/commentsRouter_controllers.go b/routers/commentsRouter_controllers.go index b317672ed..16ddab0d8 100644 --- a/routers/commentsRouter_controllers.go +++ b/routers/commentsRouter_controllers.go @@ -207,6 +207,14 @@ func init() { MethodParams: param.Make(), Params: nil}) + beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:InitDataController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:InitDataController"], + beego.ControllerComments{ + Method: "PullJdOrder", + Router: `/PullJdOrder`, + AllowHTTPMethods: []string{"post"}, + MethodParams: param.Make(), + Params: nil}) + beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:MsgController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:MsgController"], beego.ControllerComments{ Method: "GetStoreMessageStatuses",