- avoid memory overflow 4 func TransferLegacyJdOrder and TransferLegacyElmOrder

This commit is contained in:
gazebo
2019-01-12 17:06:53 +08:00
parent b7c1e555e9
commit 849653980f

View File

@@ -1,6 +1,7 @@
package initdata package initdata
import ( import (
"math"
"time" "time"
"git.rosy.net.cn/baseapi/platformapi/jdapi" "git.rosy.net.cn/baseapi/platformapi/jdapi"
@@ -16,98 +17,116 @@ import (
) )
func TransferLegacyJdOrder(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) { func TransferLegacyJdOrder(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) {
sql := ` sqlBatchCount := 1000
SELECT t1.* rootTask := tasksch.NewSeqTask("TransferLegacyJdOrder", ctx.GetUserName(), func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
FROM jdorder t1 sql := `
LEFT JOIN goods_order t2 ON t2.vendor_order_id = t1.vendor_order_id SELECT t1.*
WHERE t2.id IS NULL FROM jdorder t1
` LEFT JOIN goods_order_original t2 ON t2.vendor_order_id = t1.vendor_order_id
db := dao.GetDB() WHERE t2.id IS NULL
var jdOrderList []*legacymodel2.Jdorder LIMIT ?
if err = dao.GetRows(db, &jdOrderList, sql); err != nil { `
return "", err db := dao.GetDB()
} var jdOrderList []*legacymodel2.Jdorder
if err = dao.GetRows(db, &jdOrderList, sql, sqlBatchCount); err != nil {
task := tasksch.NewParallelTask("TransferLegacyJdOrder", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetBatchSize(40), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { return nil, err
var orderDetailList []*model.GoodsOrderOriginal
for _, v := range batchItemList {
jdOrder := v.(*legacymodel2.Jdorder)
var detail map[string]interface{}
if err = utils.UnmarshalUseNumber([]byte(jdOrder.Data), &detail); err != nil {
return nil, err
}
resultList := detail["result"].(map[string]interface{})["resultList"].([]interface{})
if len(resultList) > 0 {
originalData := resultList[0].(map[string]interface{})
orgCode := originalData["orgCode"].(string)
if orgCode == "320406" {
orderDetail := &model.GoodsOrderOriginal{
VendorOrderID: jdOrder.VendorOrderID,
VendorID: model.VendorIDJD,
AccountNo: orgCode,
OrderCreatedAt: utils.Str2Time(originalData["orderPurchaseTime"].(string)),
OriginalData: string(utils.MustMarshal(originalData)),
}
orderDetailList = append(orderDetailList, orderDetail)
}
}
} }
if len(orderDetailList) > 0 { if len(jdOrderList) > 0 {
err = dao.CreateMultiEntities(db, orderDetailList) task := tasksch.NewParallelTask("TransferLegacyJdOrder2", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetBatchSize(40), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
var orderDetailList []*model.GoodsOrderOriginal
for _, v := range batchItemList {
jdOrder := v.(*legacymodel2.Jdorder)
var detail map[string]interface{}
if err = utils.UnmarshalUseNumber([]byte(jdOrder.Data), &detail); err != nil {
return nil, err
}
resultList := detail["result"].(map[string]interface{})["resultList"].([]interface{})
if len(resultList) > 0 {
originalData := resultList[0].(map[string]interface{})
orgCode := originalData["orgCode"].(string)
if orgCode == "320406" {
orderDetail := &model.GoodsOrderOriginal{
VendorOrderID: jdOrder.VendorOrderID,
VendorID: model.VendorIDJD,
AccountNo: orgCode,
OrderCreatedAt: utils.Str2Time(originalData["orderPurchaseTime"].(string)),
OriginalData: string(utils.MustMarshal(originalData)),
}
orderDetailList = append(orderDetailList, orderDetail)
}
}
}
if len(orderDetailList) > 0 {
err = dao.CreateMultiEntities(db, orderDetailList)
}
return nil, err
}, jdOrderList)
rootTask.AddChild(task).Run()
_, err = task.GetResult(0)
} else {
rootTask.Cancel()
} }
return nil, err return nil, err
}, jdOrderList) }, math.MaxInt32)
tasksch.ManageTask(task).Run() tasksch.ManageTask(rootTask).Run()
if !isAsync { if !isAsync {
_, err = task.GetResult(0) _, err = rootTask.GetResult(0)
} else { } else {
hint = task.ID hint = rootTask.ID
} }
return hint, err return hint, err
} }
func TransferLegacyElmOrder(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) { func TransferLegacyElmOrder(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) {
sql := ` sqlBatchCount := 1000
SELECT t1.* rootTask := tasksch.NewSeqTask("TransferLegacyElmOrder", ctx.GetUserName(), func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
FROM elemeorder t1 sql := `
LEFT JOIN goods_order t2 ON t2.vendor_order_id = t1.orderid SELECT t1.*
WHERE t2.id IS NULL FROM elemeorder t1
LEFT JOIN goods_order_original t2 ON t2.vendor_order_id = t1.orderid
WHERE t2.id IS NULL
LIMIT ?
` `
db := dao.GetDB() db := dao.GetDB()
var elmOrderList []*legacymodel2.Elemeorder var elmOrderList []*legacymodel2.Elemeorder
if err = dao.GetRows(db, &elmOrderList, sql); err != nil { if err = dao.GetRows(db, &elmOrderList, sql, sqlBatchCount); err != nil {
return "", err return "", err
}
task := tasksch.NewParallelTask("TransferLegacyElmOrder", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetBatchSize(40), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
var orderDetailList []*model.GoodsOrderOriginal
for _, v := range batchItemList {
elmOrder := v.(*legacymodel2.Elemeorder)
var detail map[string]interface{}
if err = utils.UnmarshalUseNumber([]byte(elmOrder.Data), &detail); err != nil {
return nil, err
}
orderDetail := &model.GoodsOrderOriginal{
VendorOrderID: elmOrder.Orderid,
VendorID: model.VendorIDELM,
AccountNo: "fakeelm",
OrderCreatedAt: utils.Str2Time(detail["activeAt"].(string)),
OriginalData: elmOrder.Data,
}
orderDetailList = append(orderDetailList, orderDetail)
} }
if len(orderDetailList) > 0 { if len(elmOrderList) > 0 {
err = dao.CreateMultiEntities(db, orderDetailList) task := tasksch.NewParallelTask("TransferLegacyElmOrder2", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetBatchSize(40), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
var orderDetailList []*model.GoodsOrderOriginal
for _, v := range batchItemList {
elmOrder := v.(*legacymodel2.Elemeorder)
var detail map[string]interface{}
if err = utils.UnmarshalUseNumber([]byte(elmOrder.Data), &detail); err != nil {
return nil, err
}
orderDetail := &model.GoodsOrderOriginal{
VendorOrderID: elmOrder.Orderid,
VendorID: model.VendorIDELM,
AccountNo: "fakeelm",
OrderCreatedAt: utils.Str2Time(detail["activeAt"].(string)),
OriginalData: elmOrder.Data,
}
orderDetailList = append(orderDetailList, orderDetail)
}
if len(orderDetailList) > 0 {
err = dao.CreateMultiEntities(db, orderDetailList)
}
return nil, err
}, elmOrderList)
rootTask.AddChild(task).Run()
_, err = task.GetResult(0)
} else {
rootTask.Cancel()
} }
return nil, err return nil, err
}, elmOrderList) }, math.MaxInt32)
tasksch.ManageTask(task).Run() tasksch.ManageTask(rootTask).Run()
if !isAsync { if !isAsync {
_, err = task.GetResult(0) _, err = rootTask.GetResult(0)
} else { } else {
hint = task.ID hint = rootTask.ID
} }
return hint, err return hint, err
} }