- refactor file structure.
This commit is contained in:
144
business/jxcallback/orderman/orderman.go
Normal file
144
business/jxcallback/orderman/orderman.go
Normal file
@@ -0,0 +1,144 @@
|
||||
package orderman
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"git.rosy.net.cn/baseapi/utils"
|
||||
"git.rosy.net.cn/jx-callback/business/jxcallback/scheduler"
|
||||
_ "git.rosy.net.cn/jx-callback/business/jxcallback/scheduler/defsch" // 导入缺省订单调度器
|
||||
"git.rosy.net.cn/jx-callback/business/jxutils"
|
||||
"git.rosy.net.cn/jx-callback/business/model"
|
||||
"git.rosy.net.cn/jx-callback/business/partner"
|
||||
"git.rosy.net.cn/jx-callback/globals"
|
||||
"github.com/astaxie/beego/orm"
|
||||
)
|
||||
|
||||
const (
|
||||
pendingOrderGapMax = 2 * 24 * time.Hour // 每次重启机子时,要检查几天内的订单状态
|
||||
maxTimeHandlePendingOrder = 2 * time.Second //处理pending order的最长时间
|
||||
maxSleepGapHandlePendingOrder = 5 * time.Millisecond // 每个pending order的最长时间间隙
|
||||
)
|
||||
|
||||
var (
|
||||
CurOrderManager *OrderManager
|
||||
)
|
||||
|
||||
// 所有公共接口调用前,要求在order里或status中设置合适的Status
|
||||
type OrderManager struct {
|
||||
}
|
||||
|
||||
func NewOrderManager() *OrderManager {
|
||||
return &OrderManager{}
|
||||
}
|
||||
|
||||
type StatusTimer interface {
|
||||
GetStatusTime() time.Time
|
||||
}
|
||||
|
||||
type StatusTimerSlice []StatusTimer
|
||||
|
||||
func (s StatusTimerSlice) Len() int {
|
||||
return len(s)
|
||||
}
|
||||
|
||||
func (s StatusTimerSlice) Less(i, j int) bool {
|
||||
return s[i].GetStatusTime().Sub(s[j].GetStatusTime()) < 0
|
||||
}
|
||||
|
||||
func (s StatusTimerSlice) Swap(i, j int) {
|
||||
tmp := s[i]
|
||||
s[i] = s[j]
|
||||
s[j] = tmp
|
||||
}
|
||||
|
||||
func init() {
|
||||
CurOrderManager = NewOrderManager()
|
||||
partner.Init(CurOrderManager)
|
||||
}
|
||||
|
||||
func addOrderOrWaybillStatus(status *model.OrderStatus, db orm.Ormer) (isDuplicated bool, err error) {
|
||||
if status.OrderType == model.OrderTypeOrder {
|
||||
globals.SugarLogger.Debugf("addOrderStatus order:%v", status)
|
||||
} else {
|
||||
globals.SugarLogger.Debugf("addOrderStatus waybill:%v", status)
|
||||
}
|
||||
|
||||
status.ID = 0
|
||||
created, _, err := db.ReadOrCreate(status, "VendorOrderID", "VendorID", "OrderType", "VendorStatus", "StatusTime")
|
||||
if err == nil {
|
||||
if !created {
|
||||
globals.SugarLogger.Debugf("duplicated event:%v", status)
|
||||
isDuplicated = true
|
||||
status.DuplicatedCount++
|
||||
utils.CallFuncLogError(func() error {
|
||||
_, err = db.Update(status, "DuplicatedCount")
|
||||
return err
|
||||
}, "addOrderOrWaybillStatus update DuplicatedCount, status:%v", status)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
// todo 这里居然会有主键重复错误,逻辑上是不应该的
|
||||
globals.SugarLogger.Warnf("addOrderOrWaybillStatus status:%v, access db error:%v", status, err)
|
||||
}
|
||||
return isDuplicated, err
|
||||
}
|
||||
|
||||
// todo 最好还是改成全事件回放算了
|
||||
func LoadPendingOrders() {
|
||||
orders := CurOrderManager.LoadPendingOrders()
|
||||
globals.SugarLogger.Infof("LoadPendingOrders orders count:%d", len(orders))
|
||||
|
||||
ordersCount := len(orders)
|
||||
if ordersCount > 0 {
|
||||
bills := CurOrderManager.LoadPendingWaybills()
|
||||
globals.SugarLogger.Infof("LoadPendingOrders waybills count:%d", len(bills))
|
||||
var sortOrders StatusTimerSlice
|
||||
for _, order := range orders {
|
||||
if order.Status > model.OrderStatusNew {
|
||||
status := model.Order2Status(order)
|
||||
sortOrders = append(sortOrders, status)
|
||||
}
|
||||
// order.Status = model.OrderStatusNew // 就是要以实际order状态来调用scheduler.OnOrderNew
|
||||
order.StatusTime = order.OrderCreatedAt
|
||||
sortOrders = append(sortOrders, order)
|
||||
}
|
||||
for _, bill := range bills {
|
||||
if bill.Status > model.WaybillStatusNew {
|
||||
bill2 := *bill
|
||||
sortOrders = append(sortOrders, &bill2)
|
||||
}
|
||||
bill.Status = model.WaybillStatusNew
|
||||
bill.StatusTime = bill.WaybillCreatedAt
|
||||
sortOrders = append(sortOrders, bill)
|
||||
}
|
||||
sort.Sort(sortOrders)
|
||||
sleepGap := maxTimeHandlePendingOrder / time.Duration(ordersCount)
|
||||
if sleepGap > maxSleepGapHandlePendingOrder {
|
||||
sleepGap = maxSleepGapHandlePendingOrder
|
||||
}
|
||||
lastTime := time.Now()
|
||||
for _, item := range sortOrders {
|
||||
if order, ok := item.(*model.GoodsOrder); ok {
|
||||
jxutils.CallMsgHandlerAsync(func() {
|
||||
scheduler.CurrentScheduler.OnOrderNew(order, true)
|
||||
}, order.VendorOrderID)
|
||||
} else if status, ok := item.(*model.OrderStatus); ok {
|
||||
jxutils.CallMsgHandlerAsync(func() {
|
||||
scheduler.CurrentScheduler.OnOrderStatusChanged(status, true)
|
||||
}, status.VendorOrderID)
|
||||
} else {
|
||||
bill := item.(*model.Waybill)
|
||||
jxutils.CallMsgHandlerAsync(func() {
|
||||
scheduler.CurrentScheduler.OnWaybillStatusChanged(bill, true)
|
||||
}, bill.VendorOrderID)
|
||||
}
|
||||
curTime := time.Now()
|
||||
timeout := sleepGap - curTime.Sub(lastTime)
|
||||
if timeout > 0 {
|
||||
time.Sleep(timeout)
|
||||
}
|
||||
lastTime = curTime
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user