Files
jx-callback/business/jxcallback/orderman/orderman.go
苏尹岚 0dd7235485 aa
2021-03-30 17:39:07 +08:00

181 lines
5.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package orderman
import (
"errors"
"sort"
"time"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxcallback/scheduler"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
"git.rosy.net.cn/jx-callback/business/partner"
"git.rosy.net.cn/jx-callback/globals"
)
// Tuning constants used when pending orders are replayed by LoadPendingOrders.
const (
pendingOrderGapMax = 2 * 24 * time.Hour // on every machine restart, order statuses from this far back are re-checked
maxTimeHandlePendingOrder = 2 * time.Second // time budget for handling pending orders; divided by the order count to derive the per-item pause
maxSleepGapHandlePendingOrder = 5 * time.Millisecond // upper bound of the pause between two pending items
)
// Sentinel errors declared by this package.
var (
// ErrCanNotFindOrder: the message reads "cannot find the corresponding order".
ErrCanNotFindOrder = errors.New("找不到相应订单")
// ErrCanNotFindWaybill: the message reads "cannot find the corresponding waybill".
ErrCanNotFindWaybill = errors.New("找不到相应运单")
)
var (
// FixedOrderManager is the package-level OrderManager instance. It is
// assigned in this package's init function.
FixedOrderManager *OrderManager
)
// OrderManager is a field-less type whose methods form the order-management
// API of this package.
//
// Before any of its public interfaces is called, an appropriate Status must
// already be set on the order or on the status record that is passed in.
type OrderManager struct{}

// NewOrderManager returns a pointer to a freshly allocated OrderManager.
func NewOrderManager() *OrderManager {
	return new(OrderManager)
}
// IStatusTimer is implemented by any value that can report the time of its
// status.
type IStatusTimer interface {
	GetStatusTime() time.Time
}

// StatusTimerSlice is a slice of IStatusTimer that implements sort.Interface,
// ordering its elements by ascending status time.
type StatusTimerSlice []IStatusTimer

// Len returns the number of elements in the slice.
func (s StatusTimerSlice) Len() int { return len(s) }

// Less reports whether the status time of element i is strictly earlier than
// the status time of element j.
func (s StatusTimerSlice) Less(i, j int) bool {
	left, right := s[i].GetStatusTime(), s[j].GetStatusTime()
	return left.Before(right)
}

// Swap exchanges the elements at indexes i and j.
func (s StatusTimerSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
// init builds the package-level OrderManager, publishes it through
// FixedOrderManager and hands the same instance to partner.InitOrderManager.
func init() {
	manager := NewOrderManager()
	FixedOrderManager = manager
	partner.InitOrderManager(manager)
}
// addOrderOrWaybillStatus stores a status event and detects repeated
// deliveries of the same event.
//
// The event is looked up, and inserted when absent, by the combination of
// VendorOrderID, VendorID, OrderType, Status, VendorStatus and StatusTime.
// When a matching row already exists the event is reported as duplicated and
// its DuplicatedCount is incremented and written back.
//
// status is modified in place: ID is reset to 0 and Remark is passed through
// utils.LimitUTF8StringLen with a limit of 255 before the lookup.
//
// It returns isDuplicated == true when the event already existed, and a
// non-nil err when starting the transaction, the lookup/insert, or the
// DuplicatedCount update failed. On error (or panic) the transaction is rolled
// back; otherwise it is committed.
func addOrderOrWaybillStatus(status *model.OrderStatus, db *dao.DaoDB) (isDuplicated bool, err error) {
	switch status.OrderType {
	case model.OrderTypeOrder:
		globals.SugarLogger.Debugf("addOrderStatus order:%v", status)
	case model.OrderTypeWaybill:
		globals.SugarLogger.Debugf("addOrderStatus waybill:%v", status)
	default:
		globals.SugarLogger.Debugf("addOrderStatus afsOrder:%v", status)
	}

	// Do not continue without a transaction: committing or rolling back a
	// transaction that never started cannot work.
	// NOTE(review): assumes the second result of dao.Begin is an error — confirm.
	txDB, err := dao.Begin(db)
	if err != nil {
		globals.SugarLogger.Warnf("addOrderOrWaybillStatus status:%v, begin transaction error:%v", status, err)
		return false, err
	}
	defer func() {
		// Roll back on panic or on any error recorded in the named result,
		// then re-raise the panic so the caller still sees it.
		if r := recover(); r != nil || err != nil {
			globals.SugarLogger.Debug("rollback")
			dao.Rollback(db, txDB)
			if r != nil {
				panic(r)
			}
		}
	}()

	status.ID = 0
	status.Remark = utils.LimitUTF8StringLen(status.Remark, 255)
	// NOTE(review): the statements below go through db.Db rather than txDB —
	// confirm that they really take part in the transaction started above.
	created, _, err := db.Db.ReadOrCreate(status, "VendorOrderID", "VendorID", "OrderType", "Status", "VendorStatus", "StatusTime")
	if err == nil {
		if !created {
			globals.SugarLogger.Debugf("duplicated event:%v", status)
			isDuplicated = true
			status.DuplicatedCount++
			utils.CallFuncLogError(func() error {
				// The closure writes to the named result err, so a failed
				// counter update skips the commit below and is returned.
				_, err = db.Db.Update(status, "DuplicatedCount")
				return err
			}, "addOrderOrWaybillStatus update DuplicatedCount, status:%v", status)
		}
	}
	if err != nil {
		// TODO: duplicate-primary-key errors have been observed here, which
		// logically should not happen.
		globals.SugarLogger.Warnf("addOrderOrWaybillStatus status:%v, access db error:%v", status, err)
	} else {
		dao.Commit(db, txDB)
	}
	return isDuplicated, err
}
// GetStatusDuplicatedCount loads the stored record matching status and returns
// its DuplicatedCount.
//
// The record is fetched with dao.GetEntity into status itself, keyed by
// VendorOrderID, VendorID, OrderType, VendorStatus and StatusTime. It returns 0
// when status is nil or when the lookup reports an error.
func (c *OrderManager) GetStatusDuplicatedCount(status *model.OrderStatus) (duplicatedCount int) {
	if status != nil {
		lookupErr := dao.GetEntity(dao.GetDB(), status, "VendorOrderID", "VendorID", "OrderType", "VendorStatus", "StatusTime")
		if lookupErr == nil {
			duplicatedCount = status.DuplicatedCount
		}
	}
	return duplicatedCount
}
// LoadPendingOrders reloads recent pending orders and waybills and replays
// them through the current scheduler.
//
// Orders are loaded with dao.LoadPendingOrders, going back pendingOrderGapMax
// from now; waybills come from FixedOrderManager.LoadPendingWaybills. For
// every order or waybill whose status is past "new", two entries are queued: a
// snapshot carrying the current status, and the record itself re-timed to its
// creation time. Entries are sorted by status time and dispatched one by one
// through jxutils.CallMsgHandlerAsync, with a short pause between dispatches.
//
// Nothing is replayed when loading the orders fails or yields no orders.
//
// TODO: it would probably be better to switch to a full event replay.
func LoadPendingOrders() {
	orders, err := dao.LoadPendingOrders(dao.GetDB(), time.Now().Add(-pendingOrderGapMax), model.OrderStatusEndBegin)
	ordersCount := len(orders)
	globals.SugarLogger.Infof("LoadPendingOrders orders count:%d, err:%v", ordersCount, err)
	if err != nil || ordersCount == 0 {
		return
	}

	bills := FixedOrderManager.LoadPendingWaybills()
	globals.SugarLogger.Infof("LoadPendingOrders waybills count:%d", len(bills))

	var timeline StatusTimerSlice
	orderMap := make(map[string]*model.GoodsOrder)
	for _, order := range orders {
		if order.Status > model.OrderStatusNew {
			// Queue the current status first, before StatusTime is rewritten.
			timeline = append(timeline, model.Order2Status(order))
		}
		// order.Status is deliberately NOT reset to model.OrderStatusNew:
		// scheduler.OnOrderNew is meant to be called with the actual order status.
		order.StatusTime = order.OrderCreatedAt
		timeline = append(timeline, order)
		orderMap[jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID)] = order
	}
	for _, bill := range bills {
		if bill.Status > model.WaybillStatusNew {
			// Keep a copy with the current status; the original is reset below.
			snapshot := *bill
			timeline = append(timeline, &snapshot)
		}
		bill.Status = model.WaybillStatusNew
		bill.StatusTime = bill.WaybillCreatedAt
		timeline = append(timeline, bill)
	}
	sort.Sort(timeline)

	// Spread the time budget over the orders, capped at the maximum pause.
	sleepGap := maxTimeHandlePendingOrder / time.Duration(ordersCount)
	if sleepGap > maxSleepGapHandlePendingOrder {
		sleepGap = maxSleepGapHandlePendingOrder
	}

	lastTime := time.Now()
	for _, item := range timeline {
		switch entry := item.(type) {
		case *model.GoodsOrder:
			jxutils.CallMsgHandlerAsync(func() {
				scheduler.CurrentScheduler.OnOrderNew(entry, true, true)
			}, jxutils.ComposeUniversalOrderID(entry.VendorOrderID, entry.VendorID))
		case *model.OrderStatus:
			jxutils.CallMsgHandlerAsync(func() {
				order := orderMap[jxutils.ComposeUniversalOrderID(entry.VendorOrderID, entry.VendorID)]
				scheduler.CurrentScheduler.OnOrderStatusChanged(order, entry, true)
			}, jxutils.ComposeUniversalOrderID(entry.RefVendorOrderID, entry.RefVendorID))
		default:
			// Anything else must be a waybill; the assertion panics otherwise.
			bill := item.(*model.Waybill)
			jxutils.CallMsgHandlerAsync(func() {
				scheduler.CurrentScheduler.OnWaybillStatusChanged(bill, true)
			}, jxutils.ComposeUniversalOrderID(bill.VendorOrderID, bill.OrderVendorID))
		}

		// Sleep for whatever is left of sleepGap since the previous timestamp.
		now := time.Now()
		if pause := sleepGap - now.Sub(lastTime); pause > 0 {
			time.Sleep(pause)
		}
		lastTime = now
	}
}