199 lines
6.7 KiB
Go
199 lines
6.7 KiB
Go
package initdata
|
|
|
|
import (
	"fmt"
	"time"

	"git.rosy.net.cn/baseapi/utils"
	"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
	"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
	"git.rosy.net.cn/jx-callback/business/model"
	"git.rosy.net.cn/jx-callback/business/model/dao"
	"git.rosy.net.cn/jx-callback/business/model/legacymodel2"
	"git.rosy.net.cn/jx-callback/globals"
	"git.rosy.net.cn/jx-callback/globals/api"
)
|
|
|
|
func TransferLegacyJdOrder(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) {
|
|
sql := `
|
|
SELECT t1.*
|
|
FROM jdorder t1
|
|
LEFT JOIN goods_order t2 ON t2.vendor_order_id = t1.vendor_order_id
|
|
WHERE t2.id IS NULL
|
|
`
|
|
db := dao.GetDB()
|
|
var jdOrderList []*legacymodel2.Jdorder
|
|
if err = dao.GetRows(db, &jdOrderList, sql); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
task := tasksch.NewParallelTask("TransferLegacyJdOrder", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetBatchSize(40), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
|
var orderDetailList []*model.GoodsOrderOriginal
|
|
for _, v := range batchItemList {
|
|
jdOrder := v.(*legacymodel2.Jdorder)
|
|
var detail map[string]interface{}
|
|
if err = utils.UnmarshalUseNumber([]byte(jdOrder.Data), &detail); err != nil {
|
|
return nil, err
|
|
}
|
|
resultList := detail["result"].(map[string]interface{})["resultList"].([]interface{})
|
|
if len(resultList) > 0 {
|
|
originalData := resultList[0].(map[string]interface{})
|
|
orgCode := originalData["orgCode"].(string)
|
|
if orgCode == "320406" {
|
|
orderDetail := &model.GoodsOrderOriginal{
|
|
VendorOrderID: jdOrder.VendorOrderID,
|
|
VendorID: model.VendorIDJD,
|
|
AccountNo: orgCode,
|
|
OrderCreatedAt: utils.Str2Time(originalData["orderPurchaseTime"].(string)),
|
|
OriginalData: string(utils.MustMarshal(originalData)),
|
|
}
|
|
orderDetailList = append(orderDetailList, orderDetail)
|
|
}
|
|
}
|
|
}
|
|
if len(orderDetailList) > 0 {
|
|
err = dao.CreateMultiEntities(db, orderDetailList)
|
|
}
|
|
return nil, err
|
|
}, jdOrderList)
|
|
tasksch.ManageTask(task).Run()
|
|
|
|
if !isAsync {
|
|
_, err = task.GetResult(0)
|
|
} else {
|
|
hint = task.ID
|
|
}
|
|
return hint, err
|
|
}
|
|
|
|
func TransferLegacyElmOrder(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) {
|
|
sql := `
|
|
SELECT t1.*
|
|
FROM elemeorder t1
|
|
LEFT JOIN goods_order t2 ON t2.vendor_order_id = t1.orderid
|
|
WHERE t2.id IS NULL
|
|
`
|
|
db := dao.GetDB()
|
|
var elmOrderList []*legacymodel2.Elemeorder
|
|
if err = dao.GetRows(db, &elmOrderList, sql); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
task := tasksch.NewParallelTask("TransferLegacyElmOrder", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetBatchSize(40), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
|
var orderDetailList []*model.GoodsOrderOriginal
|
|
for _, v := range batchItemList {
|
|
elmOrder := v.(*legacymodel2.Elemeorder)
|
|
var detail map[string]interface{}
|
|
if err = utils.UnmarshalUseNumber([]byte(elmOrder.Data), &detail); err != nil {
|
|
return nil, err
|
|
}
|
|
orderDetail := &model.GoodsOrderOriginal{
|
|
VendorOrderID: elmOrder.Orderid,
|
|
VendorID: model.VendorIDELM,
|
|
AccountNo: "fakeelm",
|
|
OrderCreatedAt: utils.Str2Time(detail["activeAt"].(string)),
|
|
OriginalData: elmOrder.Data,
|
|
}
|
|
orderDetailList = append(orderDetailList, orderDetail)
|
|
}
|
|
if len(orderDetailList) > 0 {
|
|
err = dao.CreateMultiEntities(db, orderDetailList)
|
|
}
|
|
return nil, err
|
|
}, elmOrderList)
|
|
tasksch.ManageTask(task).Run()
|
|
|
|
if !isAsync {
|
|
_, err = task.GetResult(0)
|
|
} else {
|
|
hint = task.ID
|
|
}
|
|
return hint, err
|
|
}
|
|
|
|
func saveJdOrderList(existJdIDMap map[string]int, jdOrderList []interface{}) (err error) {
|
|
var orderDetailList []*model.GoodsOrderOriginal
|
|
for _, v := range jdOrderList {
|
|
orderMap := v.(map[string]interface{})
|
|
orderID := utils.Int64ToStr(utils.MustInterface2Int64(orderMap["orderId"]))
|
|
if existJdIDMap[orderID] == 0 {
|
|
orgCode := orderMap["orgCode"].(string)
|
|
orderDetail := &model.GoodsOrderOriginal{
|
|
VendorOrderID: orderID,
|
|
VendorID: model.VendorIDJD,
|
|
AccountNo: orgCode,
|
|
OrderCreatedAt: utils.Str2Time(orderMap["orderPurchaseTime"].(string)),
|
|
OriginalData: string(utils.MustMarshal(orderMap)),
|
|
}
|
|
orderDetailList = append(orderDetailList, orderDetail)
|
|
}
|
|
}
|
|
if len(orderDetailList) > 0 {
|
|
// for _, v := range orderDetailList {
|
|
// globals.SugarLogger.Debug(v.VendorOrderID)
|
|
// }
|
|
db := dao.GetDB()
|
|
err = dao.CreateMultiEntities(db, orderDetailList)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func PullJdOrder(ctx *jxcontext.Context, fromTime, toTime time.Time, isAsync, isContinueWhenError bool) (hint string, err error) {
|
|
var existJdIDs []string
|
|
db := dao.GetDB()
|
|
if err = dao.GetRows(db, &existJdIDs, `
|
|
SELECT vendor_order_id
|
|
FROM goods_order_original
|
|
WHERE vendor_id = ?`, model.VendorIDJD); err != nil {
|
|
return "", err
|
|
}
|
|
existJdIDMap := make(map[string]int)
|
|
for _, v := range existJdIDs {
|
|
existJdIDMap[v] = 1
|
|
}
|
|
|
|
pageSize := 100
|
|
hourGap := 1
|
|
gapCount := int((toTime.Sub(fromTime)-time.Hour*time.Duration(hourGap))/(time.Hour*time.Duration(hourGap)) + 1)
|
|
gapList := make([]int, gapCount)
|
|
for k := range gapList {
|
|
gapList[k] = k
|
|
|
|
}
|
|
rootTask := tasksch.NewParallelTask("PullJdOrder", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(20), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
|
gapIndex := batchItemList[0].(int)
|
|
hourIndex := gapIndex * hourGap
|
|
subFromTime := fromTime.Add(time.Duration(hourIndex) * time.Hour)
|
|
subToTime := fromTime.Add(time.Duration(hourIndex+hourGap) * time.Hour)
|
|
if subToTime.Sub(toTime) > 0 {
|
|
subToTime = toTime
|
|
}
|
|
if gapIndex < gapCount-1 {
|
|
subToTime = subToTime.Add(-1 * time.Second) // 减一秒
|
|
}
|
|
commonParams := map[string]interface{}{
|
|
"pageSize": pageSize,
|
|
"orderPurchaseTime_begin": utils.Time2Str(subFromTime),
|
|
"orderPurchaseTime_end": utils.Time2Str(subToTime),
|
|
}
|
|
orderList, totalCount, err := api.JdAPI.OrderQuery(utils.MergeMaps(commonParams, utils.Params2Map("pageNo", 0)))
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if err = saveJdOrderList(existJdIDMap, orderList); err != nil {
|
|
return "", err
|
|
}
|
|
if false {
|
|
globals.SugarLogger.Debugf("subFromTime:%s, subToTime:%s, totalCount:%d, len(orderList):%d", utils.Time2Str(subFromTime), utils.Time2Str(subToTime), totalCount, len(orderList))
|
|
}
|
|
return nil, err
|
|
}, gapList)
|
|
tasksch.ManageTask(rootTask).Run()
|
|
|
|
if !isAsync {
|
|
_, err = rootTask.GetResult(0)
|
|
} else {
|
|
hint = rootTask.ID
|
|
}
|
|
return hint, err
|
|
}
|