diff --git a/business/jxstore/initdata/temp_op.go b/business/jxstore/initdata/temp_op.go index 7c4c1cdd7..d29d4e81a 100644 --- a/business/jxstore/initdata/temp_op.go +++ b/business/jxstore/initdata/temp_op.go @@ -1,6 +1,7 @@ package initdata import ( + "math" "time" "git.rosy.net.cn/baseapi/platformapi/jdapi" @@ -16,98 +17,116 @@ import ( ) func TransferLegacyJdOrder(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) { - sql := ` - SELECT t1.* - FROM jdorder t1 - LEFT JOIN goods_order t2 ON t2.vendor_order_id = t1.vendor_order_id - WHERE t2.id IS NULL - ` - db := dao.GetDB() - var jdOrderList []*legacymodel2.Jdorder - if err = dao.GetRows(db, &jdOrderList, sql); err != nil { - return "", err - } - - task := tasksch.NewParallelTask("TransferLegacyJdOrder", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetBatchSize(40), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { - var orderDetailList []*model.GoodsOrderOriginal - for _, v := range batchItemList { - jdOrder := v.(*legacymodel2.Jdorder) - var detail map[string]interface{} - if err = utils.UnmarshalUseNumber([]byte(jdOrder.Data), &detail); err != nil { - return nil, err - } - resultList := detail["result"].(map[string]interface{})["resultList"].([]interface{}) - if len(resultList) > 0 { - originalData := resultList[0].(map[string]interface{}) - orgCode := originalData["orgCode"].(string) - if orgCode == "320406" { - orderDetail := &model.GoodsOrderOriginal{ - VendorOrderID: jdOrder.VendorOrderID, - VendorID: model.VendorIDJD, - AccountNo: orgCode, - OrderCreatedAt: utils.Str2Time(originalData["orderPurchaseTime"].(string)), - OriginalData: string(utils.MustMarshal(originalData)), - } - orderDetailList = append(orderDetailList, orderDetail) - } - } + sqlBatchCount := 1000 + rootTask := tasksch.NewSeqTask("TransferLegacyJdOrder", ctx.GetUserName(), func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { + sql := ` + SELECT t1.* + FROM jdorder t1 + LEFT JOIN goods_order_original t2 ON t2.vendor_order_id = t1.vendor_order_id + WHERE t2.id IS NULL + LIMIT ? + ` + db := dao.GetDB() + var jdOrderList []*legacymodel2.Jdorder + if err = dao.GetRows(db, &jdOrderList, sql, sqlBatchCount); err != nil { + return nil, err } - if len(orderDetailList) > 0 { - err = dao.CreateMultiEntities(db, orderDetailList) + if len(jdOrderList) > 0 { + task := tasksch.NewParallelTask("TransferLegacyJdOrder2", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetBatchSize(40), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + var orderDetailList []*model.GoodsOrderOriginal + for _, v := range batchItemList { + jdOrder := v.(*legacymodel2.Jdorder) + var detail map[string]interface{} + if err = utils.UnmarshalUseNumber([]byte(jdOrder.Data), &detail); err != nil { + return nil, err + } + resultList := detail["result"].(map[string]interface{})["resultList"].([]interface{}) + if len(resultList) > 0 { + originalData := resultList[0].(map[string]interface{}) + orgCode := originalData["orgCode"].(string) + if orgCode == "320406" { + orderDetail := &model.GoodsOrderOriginal{ + VendorOrderID: jdOrder.VendorOrderID, + VendorID: model.VendorIDJD, + AccountNo: orgCode, + OrderCreatedAt: utils.Str2Time(originalData["orderPurchaseTime"].(string)), + OriginalData: string(utils.MustMarshal(originalData)), + } + orderDetailList = append(orderDetailList, orderDetail) + } + } + } + if len(orderDetailList) > 0 { + err = dao.CreateMultiEntities(db, orderDetailList) + } + return nil, err + }, jdOrderList) + rootTask.AddChild(task).Run() + _, err = task.GetResult(0) + } else { + rootTask.Cancel() } return nil, err - }, jdOrderList) - tasksch.ManageTask(task).Run() - + }, math.MaxInt32) + tasksch.ManageTask(rootTask).Run() if !isAsync { - _, err = task.GetResult(0) + _, err = rootTask.GetResult(0) } else { - hint = task.ID + hint = rootTask.ID } return hint, err } func TransferLegacyElmOrder(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) { - sql := ` - SELECT t1.* - FROM elemeorder t1 - LEFT JOIN goods_order t2 ON t2.vendor_order_id = t1.orderid - WHERE t2.id IS NULL + sqlBatchCount := 1000 + rootTask := tasksch.NewSeqTask("TransferLegacyElmOrder", ctx.GetUserName(), func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { + sql := ` + SELECT t1.* + FROM elemeorder t1 + LEFT JOIN goods_order_original t2 ON t2.vendor_order_id = t1.orderid + WHERE t2.id IS NULL + LIMIT ? ` - db := dao.GetDB() - var elmOrderList []*legacymodel2.Elemeorder - if err = dao.GetRows(db, &elmOrderList, sql); err != nil { - return "", err - } - - task := tasksch.NewParallelTask("TransferLegacyElmOrder", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetBatchSize(40), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { - var orderDetailList []*model.GoodsOrderOriginal - for _, v := range batchItemList { - elmOrder := v.(*legacymodel2.Elemeorder) - var detail map[string]interface{} - if err = utils.UnmarshalUseNumber([]byte(elmOrder.Data), &detail); err != nil { - return nil, err - } - orderDetail := &model.GoodsOrderOriginal{ - VendorOrderID: elmOrder.Orderid, - VendorID: model.VendorIDELM, - AccountNo: "fakeelm", - OrderCreatedAt: utils.Str2Time(detail["activeAt"].(string)), - OriginalData: elmOrder.Data, - } - orderDetailList = append(orderDetailList, orderDetail) + db := dao.GetDB() + var elmOrderList []*legacymodel2.Elemeorder + if err = dao.GetRows(db, &elmOrderList, sql, sqlBatchCount); err != nil { + return "", err } - if len(orderDetailList) > 0 { - err = dao.CreateMultiEntities(db, orderDetailList) + if len(elmOrderList) > 0 { + task := tasksch.NewParallelTask("TransferLegacyElmOrder2", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetBatchSize(40), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + var orderDetailList []*model.GoodsOrderOriginal + for _, v := range batchItemList { + elmOrder := v.(*legacymodel2.Elemeorder) + var detail map[string]interface{} + if err = utils.UnmarshalUseNumber([]byte(elmOrder.Data), &detail); err != nil { + return nil, err + } + orderDetail := &model.GoodsOrderOriginal{ + VendorOrderID: elmOrder.Orderid, + VendorID: model.VendorIDELM, + AccountNo: "fakeelm", + OrderCreatedAt: utils.Str2Time(detail["activeAt"].(string)), + OriginalData: elmOrder.Data, + } + orderDetailList = append(orderDetailList, orderDetail) + } + if len(orderDetailList) > 0 { + err = dao.CreateMultiEntities(db, orderDetailList) + } + return nil, err + }, elmOrderList) + rootTask.AddChild(task).Run() + _, err = task.GetResult(0) + } else { + rootTask.Cancel() } return nil, err - }, elmOrderList) - tasksch.ManageTask(task).Run() - + }, math.MaxInt32) + tasksch.ManageTask(rootTask).Run() if !isAsync { - _, err = task.GetResult(0) + _, err = rootTask.GetResult(0) } else { - hint = task.ID + hint = rootTask.ID } return hint, err }