Files
jx-callback/business/jxstore/initdata/temp_op.go

308 lines
11 KiB
Go

package initdata
import (
"math"
"runtime"
"time"
"git.rosy.net.cn/baseapi/platformapi/jdapi"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxstore/cms"
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
"git.rosy.net.cn/jx-callback/business/model/legacymodel2"
"git.rosy.net.cn/jx-callback/globals"
"git.rosy.net.cn/jx-callback/globals/api"
)
func TransferLegacyJdOrder(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) {
sqlBatchCount := 1000
rootTask := tasksch.NewSeqTask("TransferLegacyJdOrder", ctx.GetUserName(), func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
sql := `
SELECT t1.*
FROM jdorder t1
LEFT JOIN goods_order_original t2 ON t2.vendor_order_id = t1.vendor_order_id
WHERE t2.id IS NULL
LIMIT ?
`
db := dao.GetDB()
var jdOrderList []*legacymodel2.Jdorder
if err = dao.GetRows(db, &jdOrderList, sql, sqlBatchCount); err != nil {
return nil, err
}
if len(jdOrderList) > 0 {
task := tasksch.NewParallelTask("TransferLegacyJdOrder2", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetBatchSize(40), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
var orderDetailList []*model.GoodsOrderOriginal
for _, v := range batchItemList {
jdOrder := v.(*legacymodel2.Jdorder)
var detail map[string]interface{}
if err = utils.UnmarshalUseNumber([]byte(jdOrder.Data), &detail); err != nil {
return nil, err
}
resultList := detail["result"].(map[string]interface{})["resultList"].([]interface{})
if len(resultList) > 0 {
originalData := resultList[0].(map[string]interface{})
orgCode := originalData["orgCode"].(string)
if orgCode == "320406" {
orderDetail := &model.GoodsOrderOriginal{
VendorOrderID: jdOrder.VendorOrderID,
VendorID: model.VendorIDJD,
AccountNo: orgCode,
OrderCreatedAt: utils.Str2Time(originalData["orderPurchaseTime"].(string)),
OriginalData: string(utils.MustMarshal(originalData)),
}
orderDetailList = append(orderDetailList, orderDetail)
}
}
}
if len(orderDetailList) > 0 {
err = dao.CreateMultiEntities(db, orderDetailList)
}
return nil, err
}, jdOrderList)
rootTask.AddChild(task).Run()
_, err = task.GetResult(0)
runtime.GC()
} else {
rootTask.Cancel()
}
return nil, err
}, math.MaxInt32)
tasksch.ManageTask(rootTask).Run()
if !isAsync {
_, err = rootTask.GetResult(0)
} else {
hint = rootTask.ID
}
return hint, err
}
func TransferLegacyElmOrder(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) {
sqlBatchCount := 1000
rootTask := tasksch.NewSeqTask("TransferLegacyElmOrder", ctx.GetUserName(), func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
sql := `
SELECT t1.*
FROM elemeorder t1
LEFT JOIN goods_order_original t2 ON t2.vendor_order_id = t1.orderid
WHERE t2.id IS NULL
LIMIT ?
`
db := dao.GetDB()
var elmOrderList []*legacymodel2.Elemeorder
if err = dao.GetRows(db, &elmOrderList, sql, sqlBatchCount); err != nil {
return "", err
}
if len(elmOrderList) > 0 {
task := tasksch.NewParallelTask("TransferLegacyElmOrder2", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetBatchSize(40), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
var orderDetailList []*model.GoodsOrderOriginal
for _, v := range batchItemList {
elmOrder := v.(*legacymodel2.Elemeorder)
var detail map[string]interface{}
if err = utils.UnmarshalUseNumber([]byte(elmOrder.Data), &detail); err != nil {
return nil, err
}
orderDetail := &model.GoodsOrderOriginal{
VendorOrderID: elmOrder.Orderid,
VendorID: model.VendorIDELM,
AccountNo: "fakeelm",
OrderCreatedAt: utils.Str2Time(detail["activeAt"].(string)),
OriginalData: elmOrder.Data,
}
orderDetailList = append(orderDetailList, orderDetail)
}
if len(orderDetailList) > 0 {
err = dao.CreateMultiEntities(db, orderDetailList)
}
return nil, err
}, elmOrderList)
rootTask.AddChild(task).Run()
_, err = task.GetResult(0)
runtime.GC()
} else {
rootTask.Cancel()
}
return nil, err
}, math.MaxInt32)
tasksch.ManageTask(rootTask).Run()
if !isAsync {
_, err = rootTask.GetResult(0)
} else {
hint = rootTask.ID
}
return hint, err
}
func saveJdOrderList(existJdIDMap map[string]int, jdOrderList []interface{}, jdStoreOrderList []map[string]interface{}) (err error) {
var storeOrderList []*model.TempGoodsOrderMobile
for _, v := range jdStoreOrderList {
order := &model.TempGoodsOrderMobile{
VendorOrderID: v["orderId"].(string),
VendorID: model.VendorIDJD,
AccountNo: v["venderId"].(string),
OrderCreatedAt: utils.Timestamp2Time(utils.MustInterface2Int64(v["orderPaidTime"]) / 1000),
Mobile: v["mobile"].(string),
}
storeOrderList = append(storeOrderList, order)
}
var orderDetailList []*model.GoodsOrderOriginal
for _, v := range jdOrderList {
orderMap := v.(map[string]interface{})
orderID := utils.Int64ToStr(utils.MustInterface2Int64(orderMap["orderId"]))
if existJdIDMap[orderID] == 0 {
orgCode := orderMap["orgCode"].(string)
orderDetail := &model.GoodsOrderOriginal{
VendorOrderID: orderID,
VendorID: model.VendorIDJD,
AccountNo: orgCode,
OrderCreatedAt: utils.Str2Time(orderMap["orderPurchaseTime"].(string)),
OriginalData: string(utils.MustMarshal(orderMap)),
}
orderDetailList = append(orderDetailList, orderDetail)
}
}
if len(orderDetailList) > 0 || len(storeOrderList) > 0 {
db := dao.GetDB()
if len(orderDetailList) > 0 {
// err = dao.CreateMultiEntities(db, orderDetailList)
for _, v := range orderDetailList {
dao.CreateEntity(db, v)
}
}
if len(storeOrderList) > 0 {
// err = dao.CreateMultiEntities(db, storeOrderList)
for _, v := range storeOrderList {
dao.CreateEntity(db, v)
}
}
}
return err
}
func PullJdOrder(ctx *jxcontext.Context, fromTime, toTime time.Time, isAsync, isContinueWhenError bool) (hint string, err error) {
var existJdIDs []string
db := dao.GetDB()
if err = dao.GetRows(db, &existJdIDs, `
SELECT vendor_order_id
FROM goods_order_original
WHERE vendor_id = ?`, model.VendorIDJD); err != nil {
return "", err
}
existJdIDMap := make(map[string]int)
for _, v := range existJdIDs {
existJdIDMap[v] = 1
}
pageSize := 100
hourGap := 1
gapCount := int((toTime.Sub(fromTime)-time.Hour*time.Duration(hourGap))/(time.Hour*time.Duration(hourGap)) + 1)
if gapCount <= 0 {
gapCount = 1
}
gapList := make([]int, gapCount)
for k := range gapList {
gapList[k] = k
}
rootTask := tasksch.NewParallelTask("PullJdOrder", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(20), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
gapIndex := batchItemList[0].(int)
hourIndex := gapIndex * hourGap
subFromTime := fromTime.Add(time.Duration(hourIndex) * time.Hour)
subToTime := fromTime.Add(time.Duration(hourIndex+hourGap) * time.Hour)
if subToTime.Sub(toTime) > 0 {
subToTime = toTime
}
if true { //gapIndex < gapCount-1 {
subToTime = subToTime.Add(-1 * time.Second) // 减一秒
}
commonParams := map[string]interface{}{
jdapi.KeyPageSize: pageSize,
"orderPurchaseTime_begin": utils.Time2Str(subFromTime),
"orderPurchaseTime_end": utils.Time2Str(subToTime),
}
// globals.SugarLogger.Debugf("PullJdOrder, commonParams=%s", utils.Format4Output(commonParams, false))
orderList, totalCount, err := api.JdAPI.OrderQuery(commonParams)
if err != nil {
return "", err
}
storeOrderList, err := api.JdAPI.GetStoreOrderInfoList(utils.Time2Str(subFromTime), utils.Time2Str(subToTime))
if err != nil {
}
if err = saveJdOrderList(existJdIDMap, orderList, storeOrderList); err != nil {
return "", err
}
if false {
globals.SugarLogger.Debugf("subFromTime:%s, subToTime:%s, totalCount:%d, len(orderList):%d", utils.Time2Str(subFromTime), utils.Time2Str(subToTime), totalCount, len(orderList))
}
return nil, err
}, gapList)
tasksch.ManageTask(rootTask).Run()
if !isAsync {
_, err = rootTask.GetResult(0)
} else {
hint = rootTask.ID
}
return hint, err
}
func UpdateJdOrderRealMobile(ctx *jxcontext.Context, fromTime, toTime time.Time, isAsync, isContinueWhenError bool) (hint string, err error) {
return hint, err
}
func DeleteWrongSpu(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) {
// sql := `
// SELECT t1.*
// FROM sku_name t1
// JOIN sku_name t2 ON t2.link_id = t1.id AND t2.deleted_at = ?
// WHERE t1.deleted_at = ?;
// `
// db := dao.GetDB()
// var skuNameList []*model.SkuName
// if err = dao.GetRows(db, &skuNameList, sql, utils.DefaultTimeValue, utils.DefaultTimeValue); err != nil {
// return "", err
// }
// rootTask := tasksch.NewSeqTask("DeleteWrongSpu", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
// _, err = cms.DeleteSkuName(ctx, skuNameList[step].ID, ctx.GetUserName())
// if err != nil {
// globals.SugarLogger.Debugf("DeleteWrongSpu failed nameid:%d, name:%s, with error:%v", skuNameList[step].ID, skuNameList[step].Name, err)
// err = nil // 强制忽略错误
// }
// return nil, err
// }, len(skuNameList))
sql := `
SELECT t1.*
FROM sku_name t1
WHERE t1.deleted_at = ? AND t1.is_spu = 1 AND t1.jd_id > 0
ORDER BY t1.id
`
db := dao.GetDB()
var skuNameList []*model.SkuName
if err = dao.GetRows(db, &skuNameList, sql, utils.DefaultTimeValue); err != nil {
return "", err
}
rootTask := tasksch.NewSeqTask("DeleteWrongSpu", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
mapData := map[string]interface{}{
"name": skuNameList[step].Name,
}
_, err = cms.UpdateSkuName(ctx, skuNameList[step].ID, mapData, ctx.GetUserName())
if err != nil {
globals.SugarLogger.Debugf("DeleteWrongSpu failed nameid:%d, name:%s, with error:%v", skuNameList[step].ID, skuNameList[step].Name, err)
err = nil // 强制忽略错误
}
return nil, err
}, len(skuNameList))
tasksch.ManageTask(rootTask).Run()
if !isAsync {
_, err = rootTask.GetResult(0)
} else {
hint = rootTask.ID
}
return hint, err
}