Files
jx-callback/business/jxstore/tempop/tempop.go
2019-04-17 14:42:40 +08:00

959 lines
33 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package tempop
import (
"fmt"
"math"
"regexp"
"strings"
"sync"
"time"
"git.rosy.net.cn/baseapi/platformapi/jdapi"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxcallback/orderman"
"git.rosy.net.cn/jx-callback/business/jxstore/cms"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
"git.rosy.net.cn/jx-callback/business/model/legacymodel"
"git.rosy.net.cn/jx-callback/business/model/legacymodel2"
"git.rosy.net.cn/jx-callback/business/partner"
"git.rosy.net.cn/jx-callback/business/partner/purchase/ebai"
"git.rosy.net.cn/jx-callback/globals"
"git.rosy.net.cn/jx-callback/globals/api"
"github.com/astaxie/beego/orm"
)
// innerDataPat captures, as submatch 1, the text found between `"result":`
// and `,"code":200` in a raw response string.
var innerDataPat = regexp.MustCompile(`"result":(.*),"code":200`)
// Convert2JDSPU clones plain sku_name rows (is_spu = 0) into SPU-flagged
// copies and then pushes the copies through cms.CurVendorSync.SyncSku.
//
// Phase 1 runs synchronously. It selects sku_name rows with is_spu = 0 whose
// status is not model.SkuStatusDeleted and that no other sku_name row links to
// via link_id, ordered by id and limited to count rows when count > 0. For
// each row a copy is created between dao.Begin and dao.Commit with
// LinkID = original ID, IsSpu = 1, JdID = 0 and
// JdSyncStatus = model.SyncFlagNewMask.
//
// Phase 2 selects every sku_name with link_id > 0 that owns a sku whose
// jd_sync_status is non-zero and syncs each of them in a parallel task.
//
// When isAsync is false the call waits for the task and returns its error;
// otherwise hint carries the task ID. isContinueWhenError is forwarded to the
// task configuration and to SyncSku.
//
// NOTE(review): the results of dao.Begin/dao.Commit are not inspected here.
func Convert2JDSPU(ctx *jxcontext.Context, count int, isAsync, isContinueWhenError bool) (hint string, err error) {
	// deleted_at is compared against utils.DefaultTimeValue throughout;
	// presumably that value marks a row as not deleted (confirm in utils).
	sql := `
SELECT t1.*
FROM sku_name t1
LEFT JOIN sku_name t2 ON t2.link_id = t1.id AND t2.deleted_at = ?
WHERE t1.deleted_at = ? AND t1.status <> ? AND t1.is_spu = 0/* AND t1.unit = '份'*/
AND t2.id IS NULL
ORDER BY t1.id
`
	if count > 0 {
		sql += " LIMIT " + utils.Int2Str(count)
	}
	sqlParams := []interface{}{
		utils.DefaultTimeValue,
		utils.DefaultTimeValue,
		model.SkuStatusDeleted,
	}
	db := dao.GetDB()
	var skuNameList []*model.SkuName
	if err = dao.GetRows(db, &skuNameList, sql, sqlParams...); err != nil {
		return "", err
	}
	for _, skuName := range skuNameList {
		// Only skus with status model.SkuStatusNormal are cloned.
		sql = `
SELECT *
FROM sku
WHERE name_id = ? AND deleted_at = ? AND status = ?;
`
		sqlParams := []interface{}{
			skuName.ID,
			utils.DefaultTimeValue,
			model.SkuStatusNormal,
		}
		var skuList []*model.Sku
		if err = dao.GetRows(db, &skuList, sql, sqlParams...); err != nil {
			return "", err
		}
		sql = `
SELECT t1.*
FROM sku_name_place_bind t1
WHERE t1.name_id = ?
`
		sqlParams = []interface{}{
			skuName.ID,
		}
		var skuNamePlaceBindList []*model.SkuNamePlaceBind
		if err = dao.GetRows(db, &skuNamePlaceBindList, sql, sqlParams...); err != nil {
			return "", err
		}
		globals.SugarLogger.Debugf("Convert2JDSPU, skuName:%s, skuCount:%d", skuName.Name, len(skuList))
		dao.Begin(db)
		// Work on a value copy so the loaded row itself is left untouched.
		skuNameNew2 := *skuName
		skuNameNew := &skuNameNew2
		// presumably resets the ID and audit columns for a fresh insert — confirm in dao.
		dao.WrapAddIDCULEntity(skuNameNew, ctx.GetUserName())
		skuNameNew.JdID = 0
		skuNameNew.LinkID = skuName.ID
		skuNameNew.IsSpu = 1
		skuNameNew.JdSyncStatus = model.SyncFlagNewMask
		// skuNameNew.Status = model.SkuStatusDontSale
		if err = dao.CreateEntity(db, skuNameNew); err != nil {
			dao.Rollback(db)
			return "", err
		}
		// NOTE(review): the place binds are copied only when at least one sku
		// exists, because their loop sits inside this guard — confirm intent.
		if len(skuList) > 0 {
			for _, sku := range skuList {
				skuNew2 := *sku
				skuNew := &skuNew2
				dao.WrapAddIDCULEntity(skuNew, ctx.GetUserName())
				skuNew.JdID = 0 //jxutils.GenFakeID()
				skuNew.LinkID = sku.ID
				skuNew.NameID = skuNameNew.ID
				skuNew.JdSyncStatus = model.SyncFlagNewMask
				if skuNameNew.Status == model.SkuStatusDontSale {
					skuNew.Status = model.SkuStatusDontSale
				}
				globals.SugarLogger.Debugf("Convert2JDSPU, sku:%s", utils.Format4Output(skuNew, false))
				if err = dao.CreateEntity(db, skuNew); err != nil {
					dao.Rollback(db)
					return "", err
				}
			}
			// Unlike the skus, the loaded place-bind rows are modified in
			// place and re-inserted under the new sku_name ID.
			for _, placeBind := range skuNamePlaceBindList {
				dao.WrapAddIDCULEntity(placeBind, ctx.GetUserName())
				placeBind.NameID = skuNameNew.ID
				globals.SugarLogger.Debugf("Convert2JDSPU, placeBind:%s", utils.Format4Output(placeBind, false))
				if err = dao.CreateEntity(db, placeBind); err != nil {
					dao.Rollback(db)
					return "", err
				}
			}
		}
		dao.Commit(db)
	}
	// Phase 2: every linked sku_name that still owns a sku with a pending
	// (non-zero) jd_sync_status, not only the ones created above.
	sql = `
SELECT DISTINCT t1.*
FROM sku_name t1
JOIN sku t2 ON t1.id = t2.name_id AND t2.jd_sync_status <> 0 AND t2.deleted_at = ?
WHERE t1.link_id > 0;
`
	skuNameList = []*model.SkuName{}
	if err = dao.GetRows(db, &skuNameList, sql, utils.DefaultTimeValue); err != nil {
		return "", err
	}
	rootTask := tasksch.NewParallelTask("Convert2JDSPU", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), ctx,
		func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
			skuName := batchItemList[0].(*model.SkuName)
			_, err = cms.CurVendorSync.SyncSku(ctx, db, skuName.ID, -1, false, isContinueWhenError, ctx.GetUserName())
			return nil, err
		}, skuNameList)
	tasksch.ManageTask(rootTask).Run()
	if !isAsync {
		_, err = rootTask.GetResult(0)
	} else {
		hint = rootTask.ID
	}
	return hint, err
}
// Change2JDSPU4Store moves store-level sku bindings over to the linked skus
// (sku.link_id > 0) in two separately invoked steps, then triggers a JD sync.
//
// When storeIDs is empty it is filled from the store table: stores that have a
// store_map row with vendor_id = 0, where neither the store nor the map has
// status model.StoreStatusDisabled.
//
// step 1: delete the store_sku_bind rows of linked skus for those stores, then
// insert new bindings for the linked skus, copying price, unit_price and
// status from the binding of the sku they link to. Only sku names that are
// global or bound to the store's city, with price > 0, are inserted.
//
// step 2: fail unless bindings for linked skus exist, then set status = 0 and
// jd_sync_status = SyncFlagSaleMask|SyncFlagModifiedMask on the bindings of
// the skus that linked skus point to.
//
// Any other step yields an error. After the commit the affected sku IDs are
// collected and handed to cms.CurVendorSync.SyncStoresSkus for
// model.VendorIDJD; hint and err are whatever that call returns.
func Change2JDSPU4Store(ctx *jxcontext.Context, storeIDs []int, step int, isAsync, isContinueWhenError bool) (hint string, err error) {
	db := dao.GetDB()
	if len(storeIDs) == 0 {
		if err = dao.GetRows(db, &storeIDs, `
SELECT t1.id
FROM store t1
JOIN store_map t2 ON t2.store_id = t1.id AND t2.vendor_id = 0 AND t2.deleted_at = ? AND t2.status <> ?
WHERE t1.deleted_at = ? AND t1.status <> ? /* AND t1.city_code IN (110100, 120100, 440100, 440300, 510100)*/
`, utils.DefaultTimeValue, model.StoreStatusDisabled, utils.DefaultTimeValue, model.StoreStatusDisabled); err != nil {
			return "", err
		}
	}
	var sql string
	var sqlParams []interface{}
	dao.Begin(db)
	// NOTE(review): this deferred rollback also runs after the explicit
	// dao.Commit below; presumably it is a no-op then — confirm in dao.
	defer dao.Rollback(db)
	if step == 1 {
		sql = `
DELETE t1
FROM store_sku_bind t1
JOIN sku t2 ON t2.id = t1.sku_id AND t2.link_id > 0
WHERE 1 = 1
`
		sqlParams = []interface{}{}
		if len(storeIDs) > 0 {
			// The slice is appended as one argument while one placeholder is
			// generated per element; this relies on the dao layer expanding
			// slice arguments — confirm.
			sql += " AND store_id IN (" + dao.GenQuestionMarks(len(storeIDs)) + ")"
			sqlParams = append(sqlParams, storeIDs)
		}
		if _, err = dao.ExecuteSQL(db, sql, sqlParams...); err != nil {
			return "", err
		}
		// The INSERT is only prepared here; it is executed by the shared
		// ExecuteSQL call after the step branches.
		sql = `
INSERT INTO store_sku_bind(created_at, updated_at, last_operator, deleted_at, store_id, sku_id, price, unit_price, status, ebai_id, mtwm_id, jd_sync_status, ebai_sync_status, mtwm_sync_status)
SELECT NOW(), NOW(), ?, ?, t1.store_id, t2.id, t1.price, t1.unit_price, t1.status , 0, 0, ?, ?, ?
FROM store_sku_bind t1
JOIN sku t2 ON t2.link_id = t1.sku_id AND t2.deleted_at = ?
JOIN store t3 ON t3.id = t1.store_id
JOIN sku_name t4 ON t4.id = t2.name_id
LEFT JOIN sku_name_place_bind t5 ON t5.place_code = t3.city_code AND t5.name_id = t4.id
WHERE t1.deleted_at = ? AND (t4.is_global = 1 OR t5.id IS NOT NULL) AND t1.price > 0
`
		sqlParams = []interface{}{
			ctx.GetUserName(),
			utils.DefaultTimeValue,
			// model.SkuStatusDontSale,
			model.SyncFlagNewMask,
			0, //model.SyncFlagNewMask,
			0, //model.SyncFlagNewMask,
			utils.DefaultTimeValue,
			utils.DefaultTimeValue,
		}
	} else if step == 2 {
		// Guard: step 2 only makes sense once step 1 has created bindings for
		// linked skus.
		sql = `
SELECT COUNT(*) ct
FROM store_sku_bind t1
JOIN sku t2 ON t2.id = t1.sku_id AND t2.link_id > 0
WHERE 1 = 1
`
		sqlParams = []interface{}{}
		if len(storeIDs) > 0 {
			sql += " AND store_id IN (" + dao.GenQuestionMarks(len(storeIDs)) + ")"
			sqlParams = append(sqlParams, storeIDs)
		}
		ct := 0
		if err = dao.GetRow(db, &ct, sql, sqlParams...); err != nil {
			return "", err
		}
		if ct == 0 {
			// The message says the "put the converted SPUs on sale in the
			// store" step does not appear to have been executed yet.
			return "", fmt.Errorf("%s看起来还没有执行《将转化的SPU在门店上架》", utils.Format4Output(storeIDs, true))
		}
		sql = `
UPDATE store_sku_bind t1
JOIN sku t2 ON t2.link_id = t1.sku_id
SET t1.status = 0,
t1.jd_sync_status = ?
WHERE t1.deleted_at = ?
`
		sqlParams = []interface{}{
			model.SyncFlagSaleMask | model.SyncFlagModifiedMask,
			utils.DefaultTimeValue,
		}
	} else {
		// The message reads "illegal step".
		return "", fmt.Errorf("非法的step")
	}
	// Shared tail for both steps: restrict the prepared statement to the
	// requested stores and execute it.
	if len(storeIDs) > 0 {
		sql += " AND t1.store_id IN (" + dao.GenQuestionMarks(len(storeIDs)) + ")"
		sqlParams = append(sqlParams, storeIDs)
	}
	globals.SugarLogger.Debug(sql)
	globals.SugarLogger.Debug(utils.Format4Output(sqlParams, false))
	var num int64
	if num, err = dao.ExecuteSQL(db, sql, sqlParams...); err != nil {
		return "", err
	}
	globals.SugarLogger.Debug(num)
	dao.Commit(db)
	// Collect the skus to sync: the linked skus themselves for step 1, the
	// skus they link to for step 2.
	var skuIDs []int
	if step == 1 {
		sql = `
SELECT id
FROM sku t1
WHERE t1.link_id > 0 AND t1.deleted_at = ?
`
		sqlParams = []interface{}{
			utils.DefaultTimeValue,
		}
	} else if step == 2 {
		sql = `
SELECT t1.link_id
FROM sku t1
WHERE t1.link_id > 0 AND t1.deleted_at = ?
`
		sqlParams = []interface{}{
			utils.DefaultTimeValue,
		}
	}
	if err = dao.GetRows(db, &skuIDs, sql, sqlParams...); err != nil {
		return "", err
	}
	hint, err = cms.CurVendorSync.SyncStoresSkus(ctx, db, []int{model.VendorIDJD}, storeIDs, skuIDs, isAsync, isContinueWhenError)
	return hint, err
}
// TransferLegacyJdOrder copies legacy jdorder rows into goods_order_original.
//
// A sequential task (scheduled with math.MaxInt32 steps) reads up to 1000
// jdorder rows per step that have no goods_order_original row with the same
// vendor_order_id, have more than 10 characters of data and cityname = 'all'.
// Each batch is processed by a parallel sub-task in groups of 40. When a step
// finds no rows the root task is cancelled.
//
// Only orders whose orgCode is "320406" are transferred. OrderCreatedAt comes
// from orderPurchaseTime, falling back to orderStartTime; orders with neither
// are logged and skipped.
//
// When isAsync is false the call waits for the root task and returns its
// error; otherwise hint carries the task ID.
//
// NOTE(review): skipped rows never get a goods_order_original row, so the
// same rows appear to be selected again on the following step — confirm how
// the loop is expected to terminate in that case.
// NOTE(review): the type assertions on "result", "resultList" and "orgCode"
// are unchecked and panic on unexpected payloads.
func TransferLegacyJdOrder(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) {
	sqlBatchCount := 1000
	rootTask := tasksch.NewSeqTask("TransferLegacyJdOrder", ctx,
		func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
			sql := `
SELECT t1.*
FROM jdorder t1
LEFT JOIN goods_order_original t2 ON t2.vendor_order_id = t1.vendor_order_id
WHERE t2.id IS NULL AND LENGTH(t1.data) > 10 AND t1.cityname = 'all'
ORDER BY t1.orderstatustime
LIMIT ?
`
			db := dao.GetDB()
			var jdOrderList []*legacymodel2.Jdorder
			if err = dao.GetRows(db, &jdOrderList, sql, sqlBatchCount); err != nil {
				return nil, err
			}
			if len(jdOrderList) > 0 {
				task := tasksch.NewParallelTask("TransferLegacyJdOrder2", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetBatchSize(40), ctx,
					func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
						var orderDetailList []*model.GoodsOrderOriginal
						for _, v := range batchItemList {
							jdOrder := v.(*legacymodel2.Jdorder)
							var detail map[string]interface{}
							// CR/LF characters are stripped from the stored payload before decoding.
							if err = utils.UnmarshalUseNumber([]byte(strings.Replace(strings.Replace(jdOrder.Data, "\n", "", -1), "\r", "", -1)), &detail); err != nil {
								return nil, err
							}
							resultList := detail["result"].(map[string]interface{})["resultList"].([]interface{})
							if len(resultList) > 0 {
								// Only the first entry of resultList is used.
								originalData := resultList[0].(map[string]interface{})
								orgCode := originalData["orgCode"].(string)
								if orgCode == "320406" {
									orderPurchaseTime := utils.Interface2String(originalData["orderPurchaseTime"])
									if orderPurchaseTime == "" {
										globals.SugarLogger.Debugf("TransferLegacyJdOrder abnormal order:%s", jdOrder.VendorOrderID)
										orderPurchaseTime = utils.Interface2String(originalData["orderStartTime"])
									}
									if orderPurchaseTime != "" {
										orderDetail := &model.GoodsOrderOriginal{
											VendorOrderID:  jdOrder.VendorOrderID,
											VendorID:       model.VendorIDJD,
											AccountNo:      orgCode,
											OrderCreatedAt: utils.Str2Time(orderPurchaseTime),
											OriginalData:   string(utils.MustMarshal(originalData)),
										}
										orderDetailList = append(orderDetailList, orderDetail)
									} else {
										globals.SugarLogger.Debugf("TransferLegacyJdOrder abnormal2 order:%s", jdOrder.VendorOrderID)
									}
								}
							}
						}
						if len(orderDetailList) > 0 {
							err = dao.CreateMultiEntities(db, orderDetailList)
						}
						return nil, err
					}, jdOrderList)
				// rootTask.AddChild(task).Run()
				task.Run()
				_, err = task.GetResult(0)
			} else {
				// Nothing left to transfer: stop the remaining steps.
				rootTask.Cancel()
			}
			return nil, err
		}, math.MaxInt32)
	tasksch.ManageTask(rootTask).Run()
	if !isAsync {
		_, err = rootTask.GetResult(0)
	} else {
		hint = rootTask.ID
	}
	return hint, err
}
// TransferLegacyElmOrder is currently a no-op: its implementation is disabled
// and the function always returns an empty hint and a nil error, regardless
// of its arguments.
func TransferLegacyElmOrder(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) {
	// Disabled reference implementation, kept for the record. It mirrored
	// TransferLegacyJdOrder for the elemeorder table.
	//
	// sqlBatchCount := 1000
	// rootTask := tasksch.NewSeqTask("TransferLegacyElmOrder", ctx.GetUserName(), func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
	// sql := `
	// SELECT t1.*
	// FROM elemeorder t1
	// LEFT JOIN goods_order_original t2 ON t2.vendor_order_id = t1.orderid
	// WHERE t2.id IS NULL AND LENGTH(t1.data) > 10
	// ORDER BY t1.order_created_at
	// LIMIT ?
	// `
	// db := dao.GetDB()
	// var elmOrderList []*legacymodel2.Elemeorder
	// if err = dao.GetRows(db, &elmOrderList, sql, sqlBatchCount); err != nil {
	// return "", err
	// }
	// if len(elmOrderList) > 0 {
	// task := tasksch.NewParallelTask("TransferLegacyElmOrder2", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetBatchSize(40), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
	// var orderDetailList []*model.GoodsOrderOriginal
	// for _, v := range batchItemList {
	// elmOrder := v.(*legacymodel2.Elemeorder)
	// if len(elmOrder.Data) > 10 {
	// var detail map[string]interface{}
	// if err = utils.UnmarshalUseNumber([]byte(strings.Replace(strings.Replace(elmOrder.Data, "\n", "", -1), "\r", "", -1)), &detail); err != nil {
	// return nil, err
	// }
	// orderDetail := &model.GoodsOrderOriginal{
	// VendorOrderID: elmOrder.Orderid,
	// VendorID: model.VendorIDELM,
	// AccountNo: "fakeelm",
	// OrderCreatedAt: utils.Str2Time(detail["activeAt"].(string)),
	// OriginalData: elmOrder.Data,
	// }
	// orderDetailList = append(orderDetailList, orderDetail)
	// }
	// }
	// if len(orderDetailList) > 0 {
	// err = dao.CreateMultiEntities(db, orderDetailList)
	// }
	// return nil, err
	// }, elmOrderList)
	// // rootTask.AddChild(task).Run()
	// task.Run()
	// _, err = task.GetResult(0)
	// } else {
	// rootTask.Cancel()
	// }
	// return nil, err
	// }, math.MaxInt32)
	// tasksch.ManageTask(rootTask).Run()
	// if !isAsync {
	// _, err = rootTask.GetResult(0)
	// } else {
	// hint = rootTask.ID
	// }
	return "", nil
}
// saveJdOrderList persists one pull window of JD data.
//
// jdStoreOrderList entries become model.TempGoodsOrderMobile rows (order ID,
// vendor ID, paid time and mobile). jdOrderList entries become
// model.GoodsOrderOriginal rows, except those whose order ID has a non-zero
// entry in existJdIDMap. existJdIDMap is only read, never modified.
//
// Saving is best-effort and row by row: a row that fails to insert does not
// stop the remaining rows, and the function always returns nil. Failures are
// written to the debug log instead of being dropped silently.
//
// NOTE(review): the type assertions on the incoming maps ("orderId",
// "venderId", "mobile", "orgCode", "orderPurchaseTime") are unchecked and
// panic when a field is missing or has another type.
func saveJdOrderList(existJdIDMap map[string]int, jdOrderList []interface{}, jdStoreOrderList []map[string]interface{}) (err error) {
	storeOrderList := make([]*model.TempGoodsOrderMobile, 0, len(jdStoreOrderList))
	for _, v := range jdStoreOrderList {
		storeOrderList = append(storeOrderList, &model.TempGoodsOrderMobile{
			VendorOrderID: v["orderId"].(string),
			VendorID:      model.VendorIDJD,
			AccountNo:     v["venderId"].(string),
			// orderPaidTime is divided by 1000 before conversion
			// (presumably milliseconds to seconds — confirm against the API).
			OrderCreatedAt: utils.Timestamp2Time(utils.MustInterface2Int64(v["orderPaidTime"]) / 1000),
			Mobile:         v["mobile"].(string),
		})
	}

	var orderDetailList []*model.GoodsOrderOriginal
	for _, v := range jdOrderList {
		orderMap := v.(map[string]interface{})
		orderID := utils.Int64ToStr(utils.MustInterface2Int64(orderMap["orderId"]))
		if existJdIDMap[orderID] != 0 {
			continue // already stored
		}
		orderDetailList = append(orderDetailList, &model.GoodsOrderOriginal{
			VendorOrderID:  orderID,
			VendorID:       model.VendorIDJD,
			AccountNo:      orderMap["orgCode"].(string),
			OrderCreatedAt: utils.Str2Time(orderMap["orderPurchaseTime"].(string)),
			OriginalData:   string(utils.MustMarshal(orderMap)),
		})
	}

	if len(orderDetailList) == 0 && len(storeOrderList) == 0 {
		return nil
	}
	db := dao.GetDB()
	// Rows are inserted one at a time (instead of dao.CreateMultiEntities) so
	// that a single failing row does not prevent the others from being saved.
	for _, v := range orderDetailList {
		if createErr := dao.CreateEntity(db, v); createErr != nil {
			globals.SugarLogger.Debugf("saveJdOrderList save order:%s failed with error:%v", v.VendorOrderID, createErr)
		}
	}
	for _, v := range storeOrderList {
		if createErr := dao.CreateEntity(db, v); createErr != nil {
			globals.SugarLogger.Debugf("saveJdOrderList save store order:%s failed with error:%v", v.VendorOrderID, createErr)
		}
	}
	return nil
}
// PullJdOrder pulls JD orders purchased between fromTime and toTime and
// stores them through saveJdOrderList.
//
// The range is cut into one-hour windows that are processed by a parallel
// task with 20 workers. The number of windows is rounded up, so a trailing
// partial hour is pulled as well; a non-positive range still yields one
// window. Every window end is moved back by one second so that adjacent
// windows do not share a boundary second.
//
// Orders already present in goods_order_original for model.VendorIDJD are
// loaded once up front and skipped when saving.
//
// When isAsync is false the call waits for the task and returns its error;
// otherwise hint carries the task ID.
//
// NOTE(review): only one OrderQuery call with a page size of 100 is made per
// window; the returned total count is not used to fetch further pages.
func PullJdOrder(ctx *jxcontext.Context, fromTime, toTime time.Time, isAsync, isContinueWhenError bool) (hint string, err error) {
	var existJdIDs []string
	db := dao.GetDB()
	if err = dao.GetRows(db, &existJdIDs, `
SELECT vendor_order_id
FROM goods_order_original
WHERE vendor_id = ?`, model.VendorIDJD); err != nil {
		return "", err
	}
	// Built before the task starts and only read by the workers afterwards.
	existJdIDMap := make(map[string]int, len(existJdIDs))
	for _, v := range existJdIDs {
		existJdIDMap[v] = 1
	}

	pageSize := 100
	hourGap := 1
	gapDuration := time.Hour * time.Duration(hourGap)
	// Ceiling division: the previous formula rounded down for ranges longer
	// than one window, which silently skipped the last partial window.
	gapCount := int((toTime.Sub(fromTime) + gapDuration - 1) / gapDuration)
	if gapCount <= 0 {
		gapCount = 1
	}
	gapList := make([]int, gapCount)
	for k := range gapList {
		gapList[k] = k
	}

	rootTask := tasksch.NewParallelTask("PullJdOrder", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(20), ctx,
		func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
			gapIndex := batchItemList[0].(int)
			hourIndex := gapIndex * hourGap
			subFromTime := fromTime.Add(time.Duration(hourIndex) * time.Hour)
			subToTime := fromTime.Add(time.Duration(hourIndex+hourGap) * time.Hour)
			if subToTime.After(toTime) {
				subToTime = toTime
			}
			// Minus one second, applied to every window including the last.
			subToTime = subToTime.Add(-1 * time.Second)

			beginStr := utils.Time2Str(subFromTime)
			endStr := utils.Time2Str(subToTime)
			commonParams := map[string]interface{}{
				jdapi.KeyPageSize:         pageSize,
				"orderPurchaseTime_begin": beginStr,
				"orderPurchaseTime_end":   endStr,
			}
			orderList, _, err := api.JdAPI.OrderQuery(commonParams)
			if err != nil {
				return nil, err
			}
			// The store order list is best-effort: a failure here must not
			// prevent the queried orders from being saved, but it is logged.
			storeOrderList, storeErr := api.JdAPI.GetStoreOrderInfoList(beginStr, endStr)
			if storeErr != nil {
				globals.SugarLogger.Debugf("PullJdOrder GetStoreOrderInfoList from:%s to:%s failed with error:%v", beginStr, endStr, storeErr)
			}
			return nil, saveJdOrderList(existJdIDMap, orderList, storeOrderList)
		}, gapList)
	tasksch.ManageTask(rootTask).Run()
	if !isAsync {
		_, err = rootTask.GetResult(0)
	} else {
		hint = rootTask.ID
	}
	return hint, err
}
// UpdateJdOrderRealMobile is a placeholder with no implementation: it ignores
// all of its arguments and returns an empty hint and a nil error.
func UpdateJdOrderRealMobile(ctx *jxcontext.Context, fromTime, toTime time.Time, isAsync, isContinueWhenError bool) (hint string, err error) {
	return "", nil
}
// DeleteWrongSpu walks every non-deleted sku_name with is_spu = 1 and
// jd_id > 0 (ordered by id) and calls cms.UpdateSkuName on it, passing the
// row's current name back as the only changed field.
//
// Despite the function name, the active code deletes nothing; the deleting
// variant is the disabled block inside the body. A failing update is logged
// and ignored, so a single row never aborts the run.
//
// When isAsync is false the call waits for the task and returns its error;
// otherwise hint carries the task ID.
func DeleteWrongSpu(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) {
	// Disabled earlier variant, kept for the record:
	//
	// sql := `
	// SELECT t1.*
	// FROM sku_name t1
	// JOIN sku_name t2 ON t2.link_id = t1.id AND t2.deleted_at = ?
	// WHERE t1.deleted_at = ?;
	// `
	// db := dao.GetDB()
	// var skuNameList []*model.SkuName
	// if err = dao.GetRows(db, &skuNameList, sql, utils.DefaultTimeValue, utils.DefaultTimeValue); err != nil {
	// return "", err
	// }
	// rootTask := tasksch.NewSeqTask("DeleteWrongSpu", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
	// _, err = cms.DeleteSkuName(ctx, skuNameList[step].ID, ctx.GetUserName())
	// if err != nil {
	// globals.SugarLogger.Debugf("DeleteWrongSpu failed nameid:%d, name:%s, with error:%v", skuNameList[step].ID, skuNameList[step].Name, err)
	// err = nil // force-ignore the error
	// }
	// return nil, err
	// }, len(skuNameList))
	query := `
SELECT t1.*
FROM sku_name t1
WHERE t1.deleted_at = ? AND t1.is_spu = 1 AND t1.jd_id > 0
ORDER BY t1.id
`
	var spuNames []*model.SkuName
	if err = dao.GetRows(dao.GetDB(), &spuNames, query, utils.DefaultTimeValue); err != nil {
		return "", err
	}

	refreshOne := func(task *tasksch.SeqTask, step int, params ...interface{}) (interface{}, error) {
		target := spuNames[step]
		payload := map[string]interface{}{
			"name": target.Name,
		}
		if _, updateErr := cms.UpdateSkuName(ctx, target.ID, payload, ctx.GetUserName()); updateErr != nil {
			// Errors are deliberately swallowed after logging.
			globals.SugarLogger.Debugf("DeleteWrongSpu failed nameid:%d, name:%s, with error:%v", target.ID, target.Name, updateErr)
		}
		return nil, nil
	}
	rootTask := tasksch.NewSeqTask("DeleteWrongSpu", ctx, refreshOne, len(spuNames))
	tasksch.ManageTask(rootTask).Run()
	if isAsync {
		return rootTask.ID, nil
	}
	_, err = rootTask.GetResult(0)
	return "", err
}
// GoodsOrderOriginalEx is the row shape read by CreateOrderFromOriginal: a
// goods_order_original row plus the order_status column that its query
// selects from the jxorder table.
type GoodsOrderOriginalEx struct {
	model.GoodsOrderOriginal
	// OrderStatus holds jxorder.order_status; CreateOrderFromOriginal treats
	// 3 as finished and 7 as canceled.
	OrderStatus int
}
// CreateOrderFromOriginal builds goods_order entries from goods_order_original
// rows that have no matching goods_order row yet.
//
// A sequential task (scheduled with math.MaxInt32 steps) reads up to 5000
// such rows per step, together with the order_status of the jxorder row with
// the same order ID, and processes them one at a time in a parallel sub-task
// configured with a parallel count of 1. When a step finds no rows the root
// task is cancelled.
//
// For each row the stored payload is decoded and converted with the purchase
// handler's Map2Order. If the resulting status is below
// model.OrderStatusEndBegin, the status is taken from the legacy order_status
// (3 = finished, 7 = canceled) or, failing that, from handler.GetOrder; an
// error from GetOrder is ignored. The order is then saved through
// orderman.FixedOrderManager.SaveOrder.
//
// When isAsync is false the call waits for the root task and returns its
// error; otherwise hint carries the task ID.
//
// NOTE(review): rows whose vendor has no purchase handler are only logged, so
// they stay without a goods_order row and appear to be selected again on the
// next step — confirm how the loop is expected to terminate.
func CreateOrderFromOriginal(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) {
	sqlBatchCount := 5000
	rootTask := tasksch.NewSeqTask("CreateOrderFromOriginal", ctx,
		func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
			sql := `
SELECT t1.*, t3.order_status
FROM goods_order_original t1
LEFT JOIN goods_order t2 ON t2.vendor_order_id = t1.vendor_order_id
LEFT JOIN jxorder t3 ON t3.order_id = t1.vendor_order_id
WHERE t2.id IS NULL
LIMIT ?;
`
			// The query uses a freshly wrapped orm instance, while SaveOrder
			// below is given dao.GetDB().
			rawDB := orm.NewOrm()
			db := dao.WrapDB(rawDB)
			var orderList []*GoodsOrderOriginalEx
			if err = dao.GetRows(db, &orderList, sql, sqlBatchCount); err != nil {
				return nil, err
			}
			if len(orderList) > 0 {
				task := tasksch.NewParallelTask("CreateOrderFromOriginal", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(1), ctx,
					func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
						orderOriginal := batchItemList[0].(*GoodsOrderOriginalEx)
						globals.SugarLogger.Debugf("CreateOrderFromOriginal processing orderID:%s", orderOriginal.VendorOrderID)
						if handler := partner.GetPurchasePlatformFromVendorID(orderOriginal.VendorID); handler != nil {
							var detail map[string]interface{}
							// CR/LF characters are stripped from the stored payload before decoding.
							if err = utils.UnmarshalUseNumber([]byte(strings.Replace(strings.Replace(orderOriginal.OriginalData, "\n", "", -1), "\r", "", -1)), &detail); err != nil {
								globals.SugarLogger.Debugf("CreateOrderFromOriginal abnormal orderID:%s, error:%v", orderOriginal.VendorOrderID, err)
								return nil, err
							}
							order := handler.Map2Order(detail)
							// The stored payload may describe an order that had not
							// reached an end state yet; resolve the final status.
							if order.Status < model.OrderStatusEndBegin {
								if orderOriginal.OrderStatus == 3 {
									order.Status = model.OrderStatusFinished
								} else if orderOriginal.OrderStatus == 7 {
									order.Status = model.OrderStatusCanceled
								} else {
									order2, err2 := handler.GetOrder(order.VendorOrderID)
									if err = err2; err == nil {
										order.Status = order2.Status
									} else {
										err = nil // ignore get status error
									}
								}
							}
							if err == nil {
								if _, err = orderman.FixedOrderManager.SaveOrder(order, false, dao.GetDB()); err != nil {
									globals.SugarLogger.Debugf("CreateOrderFromOriginal abnormal orderID:%s, error:%v", orderOriginal.VendorOrderID, err)
								}
							} else {
								globals.SugarLogger.Debugf("CreateOrderFromOriginal abnormal orderID:%s, error:%v", orderOriginal.VendorOrderID, err)
							}
						} else {
							// No purchase handler for this vendor: log only, no error.
							globals.SugarLogger.Debugf("CreateOrderFromOriginal abnormal orderID:%s", orderOriginal.VendorOrderID)
						}
						return nil, err
					}, orderList)
				// rootTask.AddChild(task).Run()
				task.Run()
				_, err = task.GetResult(0)
			} else {
				// Nothing left to convert: stop the remaining steps.
				rootTask.Cancel()
			}
			return nil, err
		}, math.MaxInt32)
	tasksch.ManageTask(rootTask).Run()
	if !isAsync {
		_, err = rootTask.GetResult(0)
	} else {
		hint = rootTask.ID
	}
	return hint, err
}
// TransformJdSpu2Sku turns sku_name rows that are flagged as SPU on JD
// (is_spu = 1, jd_id <> 0) back into plain sku names.
//
// Candidates are non-deleted SPU rows, optionally restricted to skuNameIDs and
// limited to count rows (count > 0). An error is returned when nothing
// matches. The candidates are processed in sequential batches of 40; inside a
// batch each sku name is handled by a parallel sub-task:
//
//  1. unless the sku name's JdID is a fake ID, its non-deleted skus are
//     loaded and, when globals.EnableStoreWrite is set, each sku with a real
//     JdID is marked deleted on JD and the SPU is set offline and then deleted;
//  2. locally IsSpu and JdID are cleared on the sku name, its skus get
//     jd_id = 0 and jd_sync_status = model.SyncFlagNewMask, and the sku name
//     is synced again through cms.CurVendorSync.SyncSku.
//
// After a batch succeeds, the store bindings of all touched skus are flagged
// for JD (modified, price, sale), the step sleeps 20 seconds and then runs
// SyncStoresSkus for them.
//
// When isAsync is false the call waits for the root task and returns its
// error; otherwise hint carries the task ID.
func TransformJdSpu2Sku(ctx *jxcontext.Context, skuNameIDs []int, count int, isAsync, isContinueWhenError bool) (hint string, err error) {
	sql := `
SELECT t1.*
FROM sku_name t1
WHERE t1.deleted_at = ? AND t1.status <> ? AND t1.is_spu = 1 AND jd_id <> 0
`
	sqlParams := []interface{}{
		utils.DefaultTimeValue,
		model.SkuStatusDeleted,
	}
	if len(skuNameIDs) > 0 {
		sql += " AND t1.id IN (" + dao.GenQuestionMarks(len(skuNameIDs)) + ")"
		sqlParams = append(sqlParams, skuNameIDs)
	}
	sql += " ORDER BY t1.id"
	if count > 0 {
		sql += " LIMIT ?"
		sqlParams = append(sqlParams, count)
	}
	db := dao.GetDB()
	var skuNameList []*model.SkuName
	if err = dao.GetRows(db, &skuNameList, sql, sqlParams...); err != nil {
		return "", err
	}
	if len(skuNameList) == 0 {
		return "", fmt.Errorf("待转换的skuName为空")
	}
	batchSize := 40
	rootTask := tasksch.NewSeqTask("TransformJdSpu2Sku", ctx,
		func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
			var (
				locker sync.Mutex // guards skuIDs, which the parallel workers append to
				skuIDs []int
			)
			lastIndex := (step + 1) * batchSize
			if lastIndex > len(skuNameList) {
				lastIndex = len(skuNameList)
			}
			batchSkNameList := skuNameList[step*batchSize : lastIndex]
			subTask := tasksch.NewParallelTask(fmt.Sprintf("TransformJdSpu2Sku:%d", step), tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), ctx,
				func(subTask *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
					skuName := batchItemList[0].(*model.SkuName)
					if !jxutils.IsFakeID(skuName.JdID) {
						// Worker-local statement. Assigning to the enclosing
						// function's sql variable from parallel workers was a
						// data race.
						skuSQL := `
SELECT *
FROM sku
WHERE name_id = ? AND deleted_at = ? AND status <> ?;
`
						skuSQLParams := []interface{}{
							skuName.ID,
							utils.DefaultTimeValue,
							model.SkuStatusDeleted,
						}
						var skuList []*model.Sku
						if err = dao.GetRows(db, &skuList, skuSQL, skuSQLParams...); err != nil {
							return nil, err
						}
						globals.SugarLogger.Debugf("TransformJdSpu2Sku skuList:%s", utils.Format4Output(skuList, false))
						for _, sku := range skuList {
							locker.Lock()
							skuIDs = append(skuIDs, sku.ID)
							locker.Unlock()
							if !jxutils.IsFakeID(sku.JdID) && globals.EnableStoreWrite {
								if err = api.JdAPI.UpdateSkuBaseInfo(utils.Int2Str(skuName.ID), utils.Int2Str(sku.ID), utils.Params2Map(jdapi.KeyFixedStatus, jdapi.SkuFixedStatusDeleted)); err != nil {
									break
								}
							}
						}
						if err == nil && globals.EnableStoreWrite {
							// The SPU is set offline first and deleted afterwards.
							if err = api.JdAPI.UpdateSpu(utils.Int2Str(skuName.ID), utils.Params2Map(jdapi.KeyFixedStatus, jdapi.SkuFixedStatusOffline)); err == nil {
								err = api.JdAPI.UpdateSpu(utils.Int2Str(skuName.ID), utils.Params2Map(jdapi.KeyFixedStatus, jdapi.SkuFixedStatusDeleted))
							}
						}
						if err == nil {
							skuName.IsSpu = 0
							skuName.JdID = 0
							if _, err = dao.UpdateEntity(db, skuName, "IsSpu", "JdID"); err == nil {
								updateSQL := `
UPDATE sku t1
SET
t1.jd_sync_status = ?,
t1.jd_id = 0
WHERE t1.name_id = ? AND t1.deleted_at = ? AND t1.status <> ?
`
								updateParams := []interface{}{
									model.SyncFlagNewMask,
									skuName.ID,
									utils.DefaultTimeValue,
									model.SkuStatusDeleted,
								}
								if _, err = dao.ExecuteSQL(db, updateSQL, updateParams...); err == nil {
									_, err = cms.CurVendorSync.SyncSku(ctx, db, skuName.ID, -1, false, isContinueWhenError, ctx.GetUserName())
								}
							}
						}
					} else {
						globals.SugarLogger.Debugf("TransformJdSpu2Sku skuName:%d is fake", skuName.ID)
					}
					return nil, err
				}, batchSkNameList)
			rootTask.AddChild(subTask).Run()
			// skuIDs is read only after the sub-task has finished.
			if _, err = subTask.GetResult(0); err == nil {
				if len(skuIDs) > 0 {
					if _, err = dao.SetStoreSkuSyncStatus(db, model.VendorIDJD, -1, skuIDs, model.SyncFlagModifiedMask|model.SyncFlagPriceMask|model.SyncFlagSaleMask); err == nil {
						// NOTE(review): fixed 20s pause before the store sync; the
						// reason is not visible here (presumably to let JD settle).
						time.Sleep(20 * time.Second)
						_, err = cms.CurVendorSync.SyncStoresSkus(ctx, db, []int{model.VendorIDJD}, nil, skuIDs, false, isContinueWhenError)
					}
				}
			}
			return nil, err
		}, (len(skuNameList)-1)/batchSize+1)
	tasksch.ManageTask(rootTask).Run()
	if !isAsync {
		_, err = rootTask.GetResult(0)
	} else {
		hint = rootTask.ID
	}
	return hint, err
}
// ReProcessJdBadComment re-parses the stored JD bad-comment payloads and
// rewrites their normalized form and timestamps.
//
// All jx_bad_comments rows are processed when isForce is true; otherwise only
// rows whose createtime or updatetime is NULL or empty. For each row Msg and
// UpdatedMsg are unwrapped with unmarshalCommentText. When Msg yields a
// non-empty object, Createtime and Msg are rewritten from it (and Updatetime
// and UpdatedMsg likewise when UpdatedMsg yields one) and the row is updated.
// Rows whose Msg cannot be unwrapped are left untouched.
//
// isContinueWhenError is forwarded to the task configuration. When isAsync is
// false the call waits for the task and returns its error; otherwise hint
// carries the task ID. With no matching rows it returns ("", nil) without
// creating a task.
func ReProcessJdBadComment(ctx *jxcontext.Context, isForce, isAsync, isContinueWhenError bool) (hint string, err error) {
	sql := `
SELECT *
FROM jx_bad_comments
`
	if !isForce {
		sql += " WHERE (createtime IS NULL OR createtime = '') OR (updatetime IS NULL OR updatetime = '')"
	}
	db := dao.GetDB()
	var commentList []*legacymodel.JxBadComments
	if err = dao.GetRows(db, &commentList, sql); err != nil {
		return "", err
	}
	if len(commentList) == 0 {
		return "", nil
	}
	// The caller's isContinueWhenError is honoured here, as in the sibling
	// tasks of this file; previously a nil configuration was passed and the
	// flag was ignored.
	rootTask := tasksch.NewParallelTask("ReProcessJdBadComment", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), ctx,
		func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
			badComment := batchItemList[0].(*legacymodel.JxBadComments)
			comment1, _ := unmarshalCommentText(badComment.Msg)
			comment2, _ := unmarshalCommentText(badComment.UpdatedMsg)
			if len(comment1) == 0 {
				return nil, nil
			}
			if badComment.Createtime, err = jdBadCommentTimeStr(comment1); err != nil {
				return nil, err
			}
			badComment.Msg = string(utils.MustMarshal(comment1))
			if len(comment2) > 0 {
				if badComment.Updatetime, err = jdBadCommentTimeStr(comment2); err != nil {
					return nil, err
				}
				badComment.UpdatedMsg = string(utils.MustMarshal(comment2))
			}
			_, err = dao.UpdateEntity(db, badComment)
			return nil, err
		}, commentList)
	tasksch.ManageTask(rootTask).Run()
	if !isAsync {
		_, err = rootTask.GetResult(0)
	} else {
		hint = rootTask.ID
	}
	return hint, err
}

// jdBadCommentTimeStr formats comment["createTime"]["time"], divided by 1000,
// with utils.Timestamp2Str. It returns an error instead of panicking when the
// nested field is missing or is not an object.
func jdBadCommentTimeStr(comment map[string]interface{}) (string, error) {
	createTime, ok := comment["createTime"].(map[string]interface{})
	if !ok || createTime["time"] == nil {
		return "", fmt.Errorf("bad comment has no usable createTime.time field")
	}
	return utils.Timestamp2Str(utils.MustInterface2Int64(createTime["time"]) / 1000), nil
}
// unmarshalCommentText peels the wrapping layers off a stored comment string
// and returns the innermost object.
//
// Each round decodes commentStr with jxutils.Strings2Objs:
//   - a non-nil "data" field is taken as the next string to decode;
//   - otherwise a non-nil "result" field is returned with isNeedUpdate = true;
//   - otherwise the decoded object itself is returned with isNeedUpdate = false.
//
// When decoding fails, the text captured by innerDataPat (between `"result":`
// and `,"code":200`) is tried next. The function returns (nil, false) for an
// empty string, for text that neither decodes nor matches the pattern, and
// when "data" is not a string or "result" is not an object.
func unmarshalCommentText(commentStr string) (retVal map[string]interface{}, isNeedUpdate bool) {
	for {
		if commentStr == "" {
			return nil, false
		}
		// A fresh map is required on every round (per the original author's
		// note); presumably decoding into a reused map would keep keys of the
		// previous layer.
		var layer map[string]interface{}
		if err := jxutils.Strings2Objs(commentStr, &layer); err != nil {
			// FindStringSubmatch returns nil when nothing matches, so the
			// length must be checked before indexing.
			match := innerDataPat.FindStringSubmatch(commentStr)
			if len(match) < 2 || match[1] == "" {
				return nil, false
			}
			commentStr = match[1]
			continue
		}
		if data := layer["data"]; data != nil {
			inner, ok := data.(string)
			if !ok {
				return nil, false
			}
			commentStr = inner
			continue
		}
		if result := layer["result"]; result != nil {
			resultMap, ok := result.(map[string]interface{})
			if !ok {
				return nil, false
			}
			return resultMap, true
		}
		return layer, false
	}
}
// RefreshEbaiBadComment refreshes Ebai comments day by day between fromTime
// and toTime through ebai.CurPurchaseHandler.RefreshComment.
//
// A zero fromTime defaults to "2018-05-03 00:00:00" and a zero toTime to three
// hours before now. The range is walked by a sequential task in 24-hour
// slices; each slice ends one second before the next begins and is clipped to
// toTime.
//
// When isAsync is false the call waits for the task and returns its error;
// otherwise hint carries the task ID.
func RefreshEbaiBadComment(ctx *jxcontext.Context, fromTime, toTime time.Time, isAsync, isContinueWhenError bool) (hint string, err error) {
	const oneDay = 24 * time.Hour

	if utils.IsTimeZero(fromTime) {
		fromTime = utils.Str2Time("2018-05-03 00:00:00")
	}
	if utils.IsTimeZero(toTime) {
		toTime = time.Now().Add(-3 * time.Hour)
	}
	days := int(toTime.Sub(fromTime)/oneDay + 1)
	globals.SugarLogger.Debugf("RefreshEbaiBadComment fromTime:%s, toTime:%s, days:%d", utils.Time2Str(fromTime), utils.Time2Str(toTime), days)

	refreshDay := func(task *tasksch.SeqTask, step int, params ...interface{}) (interface{}, error) {
		sliceStart := fromTime.Add(time.Duration(step) * oneDay)
		sliceEnd := sliceStart.Add(oneDay - time.Second)
		if sliceEnd.After(toTime) {
			sliceEnd = toTime
		}
		return nil, ebai.CurPurchaseHandler.RefreshComment(sliceStart, sliceEnd)
	}
	rootTask := tasksch.NewSeqTask("RefreshEbaiBadComment", ctx, refreshDay, days)
	tasksch.ManageTask(rootTask).Run()
	if isAsync {
		return rootTask.ID, nil
	}
	_, err = rootTask.GetResult(0)
	return "", err
}
// PrintMsg forwards a message to the printer platform registered for
// vendorID and returns whatever that platform's PrintMsg returns. When no
// platform is registered for vendorID it returns a nil status and an error
// naming the vendor.
func PrintMsg(ctx *jxcontext.Context, vendorID int, id1, id2, msgTitle, msgContent string) (printerStatus *partner.PrinterStatus, err error) {
	if handler := partner.GetPrinterPlatformFromVendorID(vendorID); handler != nil {
		return handler.PrintMsg(ctx, id1, id2, msgTitle, msgContent)
	}
	return nil, fmt.Errorf("打印机厂商:%d当前不被支持请检查vendorID", vendorID)
}
// UpdateAllWeiXinRemark runs jxutils.HandleUserWXRemark for the phone number
// of every weixins row that has both a non-empty openid and a non-empty tel.
//
// The rows are handled by a parallel task that honours isContinueWhenError.
// When isAsync is false the call waits for the task and returns its error;
// otherwise hint carries the task ID. A failure to load the rows is returned
// directly.
func UpdateAllWeiXinRemark(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) {
	var weixinList []*legacymodel.WeiXins
	err = dao.GetRows(dao.GetDB(), &weixinList, `
SELECT *
FROM weixins
WHERE openid <> '' AND tel <> ''
`)
	if err != nil {
		return "", err
	}

	taskConfig := tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError)
	refreshRemark := func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
		account := batchItemList[0].(*legacymodel.WeiXins)
		return nil, jxutils.HandleUserWXRemark(dao.GetDB(), account.Tel)
	}
	rootTask := tasksch.NewParallelTask("刷新微信备注", taskConfig, ctx, refreshRemark, weixinList)
	tasksch.ManageTask(rootTask).Run()
	if isAsync {
		return rootTask.ID, nil
	}
	_, err = rootTask.GetResult(0)
	return "", err
}
// RetrieveEbaiShopLicence fetches licence information from Ebai for every
// non-deleted store_map row of model.VendorIDEBAI and stores it as
// legacymodel.EbaiShopLicence rows.
//
// For each store the aptitude list is requested first. Only when it has more
// than one entry is the shop itself fetched and a licence row created from
// the shop's name, address and service phone plus the second aptitude entry's
// licence number, legal representative name and licence name. Stores with
// fewer aptitude entries are skipped without error.
//
// When isAsync is false the call waits for the task and returns its error;
// otherwise hint carries the task ID. A failure to load the store list is
// returned directly.
func RetrieveEbaiShopLicence(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) {
	db := dao.GetDB()
	var ebaiStoreList []*model.StoreMap
	err = dao.GetRows(db, &ebaiStoreList, `
SELECT *
FROM store_map
WHERE vendor_id = ? AND deleted_at = ?
`, model.VendorIDEBAI, utils.DefaultTimeValue)
	if err != nil {
		return "", err
	}
	globals.SugarLogger.Debugf("RetrieveEbaiShopLicence, count:%d", len(ebaiStoreList))

	fetchLicence := func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
		storeMap := batchItemList[0].(*model.StoreMap)
		vendorStoreID := utils.Str2Int64(storeMap.VendorStoreID)
		aptitudes, err := api.EbaiAPI.ShopAptitudeGet("", vendorStoreID)
		if err != nil || storeMap == nil || len(aptitudes) <= 1 {
			return nil, err
		}
		shopInfo, err := api.EbaiAPI.ShopGet("", vendorStoreID)
		if err != nil {
			return nil, err
		}
		// The licence fields are read from the second aptitude entry.
		licenceInfo := aptitudes[1]
		return nil, dao.CreateEntity(db, &legacymodel.EbaiShopLicence{
			ShopName:    utils.Interface2String(shopInfo["name"]),
			Licence:     utils.Interface2String(licenceInfo["license_number"]),
			Address:     utils.Interface2String(shopInfo["address"]),
			Owner:       utils.Interface2String(licenceInfo["legal_representative_name"]),
			Tel:         utils.Interface2String(shopInfo["service_phone"]),
			LicenceName: utils.Interface2String(licenceInfo["license_name"]),
		})
	}
	taskConfig := tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError)
	rootTask := tasksch.NewParallelTask("XXXX", taskConfig, ctx, fetchLicence, ebaiStoreList)
	tasksch.ManageTask(rootTask).Run()
	if isAsync {
		return rootTask.ID, nil
	}
	_, err = rootTask.GetResult(0)
	return "", err
}