!!!重新实现订单调度timer,可以允许多个timer.

This commit is contained in:
gazebo
2019-11-25 15:49:38 +08:00
parent f1b3547645
commit 622bfc89df

View File

@@ -63,14 +63,14 @@ var (
FixedScheduler *DefScheduler
)
// type tTimerInfo struct {
// statusType int
// vendorID int
// status int
type tTimerInfo struct {
statusType int
vendorID int
status int
// timer *time.Timer
// timerTime time.Time
// }
timer *time.Timer
timerTime time.Time
}
type WatchOrderInfo struct {
order *model.GoodsOrder // order里的信息是保持更新的
@@ -83,12 +83,12 @@ type WatchOrderInfo struct {
waybills map[int]*model.Waybill // 这个waybills里的状态信息是不真实的只使用id相关的信息
timerStatusType int // 0表示订单1表示运单
timerStatus int
timer *time.Timer
timerTime time.Time
// timerStatusType int // 0表示订单1表示运单
// timerStatus int
// timer *time.Timer
// timerTime time.Time
// timerList []*tTimerInfo
timerList []*tTimerInfo
retryCount int // 失败后尝试的次数,调试阶段可能出现死循化,阻止这种情况发生
}
@@ -167,34 +167,34 @@ func (s *WatchOrderInfo) GetWaybillVendorIDs() (vendorIDs []int) {
// orderType-1全部
// vendorID-1全部
// status-1全部
// func (w *WatchOrderInfo) StopTimer(statusType, vendorID, status int) {
// var newTimerList []*tTimerInfo
// for _, timerInfo := range w.timerList {
// if (statusType == -1 || statusType == timerInfo.statusType) ||
// (vendorID == -1 || vendorID == timerInfo.vendorID) ||
// (status == -1 || status <= timerInfo.status) {
// if timerInfo.timer != nil {
// timerInfo.timer.Stop()
// timerInfo.timer = nil
// }
// } else {
// newTimerList = append(newTimerList, timerInfo)
// }
// }
// w.timerList = newTimerList
// }
func (w *WatchOrderInfo) StopTimer(statusType, vendorID, status int) {
var newTimerList []*tTimerInfo
for _, timerInfo := range w.timerList {
if (statusType == -1 || statusType == timerInfo.statusType) &&
(vendorID == -1 || vendorID == timerInfo.vendorID) &&
(status == -1 || status <= timerInfo.status) {
if timerInfo.timer != nil {
timerInfo.timer.Stop()
timerInfo.timer = nil
}
} else {
newTimerList = append(newTimerList, timerInfo)
}
}
w.timerList = newTimerList
}
func (w *WatchOrderInfo) GetCreateWaybillTimeout() (timeoutSecond int) {
if w.timerStatusType == scheduler.TimerStatusTypeWaybill && w.timerStatus == model.WaybillStatusNew {
timeoutSecond = int(w.timerTime.Sub(time.Now()) / time.Second)
}
// for _, timerInfo := range w.timerList {
// if timerInfo.statusType == scheduler.TimerStatusTypeWaybill &&
// timerInfo.status == model.WaybillStatusNew {
// timeoutSecond = int(timerInfo.timerTime.Sub(time.Now()) / time.Second)
// break
// }
// if w.timerStatusType == scheduler.TimerStatusTypeWaybill && w.timerStatus == model.WaybillStatusNew {
// timeoutSecond = int(w.timerTime.Sub(time.Now()) / time.Second)
// }
for _, timerInfo := range w.timerList {
if timerInfo.statusType == scheduler.TimerStatusTypeWaybill &&
timerInfo.status == model.WaybillStatusNew {
timeoutSecond = int(timerInfo.timerTime.Sub(time.Now()) / time.Second)
break
}
}
return timeoutSecond
}
@@ -771,169 +771,169 @@ func (s *DefScheduler) loadSavedOrderFromMap(status *model.OrderStatus, isForceL
return realSavedInfo
}
// func (s *DefScheduler) resetTimer(savedOrderInfo *WatchOrderInfo, bill *model.Waybill, isPending bool) {
// order := savedOrderInfo.order
// status := order.Status
// statusType := scheduler.TimerStatusTypeOrder
// vendorID := order.VendorID
// statusTime := order.StatusTime
// if bill != nil {
// status = bill.Status
// statusType = scheduler.TimerStatusTypeWaybill
// vendorID = bill.WaybillVendorID
// statusTime = bill.StatusTime
// }
// globals.SugarLogger.Debugf("resetTimer, orderID:%s statusType:%d status:%d", order.VendorOrderID, statusType, status)
// config := s.mergeOrderStatusConfig(savedOrderInfo, statusTime, statusType, status)
// stopStatusType := statusType
// stopStatus := status
// if statusType == scheduler.TimerStatusTypeOrder {
// if status > model.OrderStatusEndBegin {
// stopStatusType = -1
// stopStatus = -1
// }
// }
// if config == nil || config.TimerType != partner.TimerTypeByPass {
// savedOrderInfo.StopTimer(stopStatusType, -1, stopStatus)
// }
// if config != nil && config.TimeoutAction != nil && config.TimerType != partner.TimerTypeByPass {
// if config.CallShouldSetTimer(savedOrderInfo, bill) {
// timeout := config.GetRefTimeout(statusTime, order.OrderCreatedAt)
// if config.TimeoutGap != 0 {
// timeout += time.Duration(rand.Intn(int(config.TimeoutGap))) * time.Second
// }
// if isPending && timeout < pendingOrderTimerMaxSecond*time.Second { // 如果是PENDING的订单则将其分布到2--5秒内让后续事件有机会执行
// timeout = time.Duration(jxutils.MapValue2Scope(int64(timeout), -pendingOrderTimerMinMinSecond*1000, pendingOrderTimerMaxSecond*1000, pendingOrderTimerMinSecond*1000, pendingOrderTimerMaxSecond*1000)) * time.Millisecond
// } else if timeout < 0 {
// timeout = 0
// }
// if timeout == 0 {
// config.CallTimeoutAction(savedOrderInfo, bill)
// } else {
// timerName := ""
// if statusType == model.OrderTypeOrder {
// timerName = model.OrderStatusName[status]
// } else if statusType == model.OrderTypeWaybill {
// timerName = model.WaybillStatusName[status]
// }
// timerInfo := &tTimerInfo{
// statusType: statusType,
// vendorID: vendorID,
// status: status,
// timerTime: time.Now().Add(timeout),
// }
// timerInfo.timer = utils.AfterFuncWithRecover(timeout, func() {
// jxutils.CallMsgHandlerAsync(func() {
// globals.SugarLogger.Debugf("fire timer:%s, orderID:%s", timerName, order.VendorOrderID)
// ts := s.loadSavedOrderFromMap(model.Order2Status(order), true)
// config.CallTimeoutAction(ts, bill)
// timerInfo.timer = nil
// ts.StopTimer(statusType, vendorID, status)
// }, jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID))
// })
// }
// globals.SugarLogger.Debugf("resetTimer, orderID:%s, statusType:%d, status:%d, timeout:%v", order.VendorOrderID, statusType, status, timeout)
// } else {
// globals.SugarLogger.Debugf("resetTimer, orderID:%s, statusType:%d, status:%d, should not set timer", order.VendorOrderID, statusType, status)
// }
// } else {
// globals.SugarLogger.Debugf("resetTimer bypass2, orderID:%s statusType:%d status:%v, config:%s", order.VendorOrderID, statusType, status, utils.Format4Output(config, true))
// }
// }
// func (s *DefScheduler) stopTimer(savedOrderInfo *WatchOrderInfo) {
// savedOrderInfo.StopTimer(0, -1, 0)
// }
func (s *DefScheduler) stopTimer(savedOrderInfo *WatchOrderInfo) {
if savedOrderInfo.timer != nil {
globals.SugarLogger.Debugf("stopTimer orderID:%s", savedOrderInfo.order.VendorOrderID)
savedOrderInfo.timer.Stop()
savedOrderInfo.timerStatus = model.OrderStatusUnknown
savedOrderInfo.timerStatusType = scheduler.TimerStatusTypeUnknown
savedOrderInfo.timer = nil
}
}
func (s *DefScheduler) resetTimer(savedOrderInfo *WatchOrderInfo, bill *model.Waybill, isPending bool) {
order := savedOrderInfo.order
status := order.Status
statusType := scheduler.TimerStatusTypeOrder
vendorID := order.VendorID
statusTime := order.StatusTime
if bill != nil {
status = bill.Status
statusType = scheduler.TimerStatusTypeWaybill
vendorID = bill.WaybillVendorID
statusTime = bill.StatusTime
}
globals.SugarLogger.Debugf("resetTimer, orderID:%s statusType:%d status:%d", order.VendorOrderID, statusType, status)
if isStatusNewer(order.VendorID, savedOrderInfo.timerStatusType, savedOrderInfo.timerStatus, statusType, status) { // 新设置的TIMER不能覆盖状态在其后的TIMER如果状态回绕需要注意
config := s.mergeOrderStatusConfig(savedOrderInfo, statusTime, statusType, status)
if config == nil || config.TimerType != partner.TimerTypeByPass {
s.stopTimer(savedOrderInfo)
config := s.mergeOrderStatusConfig(savedOrderInfo, statusTime, statusType, status)
stopStatusType := statusType
stopStatus := status
if statusType == scheduler.TimerStatusTypeOrder {
if status >= model.OrderStatusDelivering {
stopStatusType = -1
stopStatus = -1
}
if config != nil && config.TimeoutAction != nil && config.TimerType != partner.TimerTypeByPass {
if config.CallShouldSetTimer(savedOrderInfo, bill) {
timeout := config.GetRefTimeout(statusTime, order.OrderCreatedAt)
if config.TimeoutGap != 0 {
timeout += time.Duration(rand.Intn(int(config.TimeoutGap))) * time.Second
}
if isPending && timeout < pendingOrderTimerMaxSecond*time.Second { // 如果是PENDING的订单则将其分布到2--5秒内让后续事件有机会执行
timeout = time.Duration(jxutils.MapValue2Scope(int64(timeout), -pendingOrderTimerMinMinSecond*1000, pendingOrderTimerMaxSecond*1000, pendingOrderTimerMinSecond*1000, pendingOrderTimerMaxSecond*1000)) * time.Millisecond
} else if timeout < 0 {
timeout = 0
}
if timeout == 0 {
config.CallTimeoutAction(savedOrderInfo, bill)
} else {
timerName := ""
if statusType == scheduler.TimerStatusTypeOrder {
timerName = model.OrderStatusName[status]
} else if statusType == scheduler.TimerStatusTypeWaybill {
timerName = model.WaybillStatusName[status]
}
savedOrderInfo.timerStatusType = statusType
savedOrderInfo.timerStatus = status
savedOrderInfo.timerTime = time.Now().Add(timeout)
savedOrderInfo.timer = utils.AfterFuncWithRecover(timeout, func() {
jxutils.CallMsgHandlerAsync(func() {
globals.SugarLogger.Debugf("fire timer:%s, orderID:%s", timerName, order.VendorOrderID)
savedOrderInfo := s.loadSavedOrderFromMap(model.Order2Status(order), true)
config.CallTimeoutAction(savedOrderInfo, bill)
savedOrderInfo.timerStatus = 0
savedOrderInfo.timerStatusType = scheduler.TimerStatusTypeUnknown
}, jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID))
})
}
globals.SugarLogger.Debugf("resetTimer, orderID:%s, statusType:%d, status:%d, timeout:%v", order.VendorOrderID, statusType, status, timeout)
} else {
globals.SugarLogger.Debugf("resetTimer, orderID:%s, statusType:%d, status:%d, should not set timer", order.VendorOrderID, statusType, status)
}
if config == nil || config.TimerType != partner.TimerTypeByPass {
savedOrderInfo.StopTimer(stopStatusType, -1, stopStatus)
}
if config != nil && config.TimeoutAction != nil && config.TimerType != partner.TimerTypeByPass {
if config.CallShouldSetTimer(savedOrderInfo, bill) {
timeout := config.GetRefTimeout(statusTime, order.OrderCreatedAt)
if config.TimeoutGap != 0 {
timeout += time.Duration(rand.Intn(int(config.TimeoutGap))) * time.Second
}
if isPending && timeout < pendingOrderTimerMaxSecond*time.Second { // 如果是PENDING的订单则将其分布到2--5秒内让后续事件有机会执行
timeout = time.Duration(jxutils.MapValue2Scope(int64(timeout), -pendingOrderTimerMinMinSecond*1000, pendingOrderTimerMaxSecond*1000, pendingOrderTimerMinSecond*1000, pendingOrderTimerMaxSecond*1000)) * time.Millisecond
} else if timeout < 0 {
timeout = 0
}
if timeout == 0 {
config.CallTimeoutAction(savedOrderInfo, bill)
} else {
timerName := ""
if statusType == model.OrderTypeOrder {
timerName = model.OrderStatusName[status]
} else if statusType == model.OrderTypeWaybill {
timerName = model.WaybillStatusName[status]
}
timerInfo := &tTimerInfo{
statusType: statusType,
vendorID: vendorID,
status: status,
timerTime: time.Now().Add(timeout),
}
timerInfo.timer = utils.AfterFuncWithRecover(timeout, func() {
jxutils.CallMsgHandlerAsync(func() {
globals.SugarLogger.Debugf("fire timer:%s, orderID:%s", timerName, order.VendorOrderID)
ts := s.loadSavedOrderFromMap(model.Order2Status(order), true)
config.CallTimeoutAction(ts, bill)
timerInfo.timer = nil
ts.StopTimer(statusType, vendorID, status)
}, jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID))
})
}
globals.SugarLogger.Debugf("resetTimer, orderID:%s, statusType:%d, status:%d, timeout:%v", order.VendorOrderID, statusType, status, timeout)
} else {
globals.SugarLogger.Debugf("resetTimer bypass2, orderID:%s statusType:%d status:%v, config:%s", order.VendorOrderID, statusType, status, utils.Format4Output(config, true))
globals.SugarLogger.Debugf("resetTimer, orderID:%s, statusType:%d, status:%d, should not set timer", order.VendorOrderID, statusType, status)
}
} else {
globals.SugarLogger.Debugf("resetTimer bypass1, orderID:%s statusType:%d status:%v", order.VendorOrderID, statusType, status)
globals.SugarLogger.Debugf("resetTimer bypass2, orderID:%s statusType:%d status:%v, config:%s", order.VendorOrderID, statusType, status, utils.Format4Output(config, true))
}
}
func isStatusNewer(vendorID int, curStatusType, curStatus, statusType, status int) bool {
// 拣货完成及之前的订单事件TIMER不能覆盖运单TIMER一般是消息错序引起的
// 美团订单在接单后就会收到新运单事件因当前只支持一个TIMER暂时舍弃三方配送调度而要自动拣货调度
if vendorID != model.VendorIDMTWM {
if curStatusType == scheduler.TimerStatusTypeWaybill && statusType == scheduler.TimerStatusTypeOrder && status <= model.OrderStatusFinishedPickup {
return false
}
} else {
return statusType == scheduler.TimerStatusTypeOrder && status >= curStatus
}
if curStatusType == scheduler.TimerStatusTypeWaybill {
return curStatus != status
}
return curStatusType != statusType || status >= curStatus
func (s *DefScheduler) stopTimer(savedOrderInfo *WatchOrderInfo) {
savedOrderInfo.StopTimer(-1, -1, -1)
}
// func (s *DefScheduler) stopTimer(savedOrderInfo *WatchOrderInfo) {
// if savedOrderInfo.timer != nil {
// globals.SugarLogger.Debugf("stopTimer orderID:%s", savedOrderInfo.order.VendorOrderID)
// savedOrderInfo.timer.Stop()
// savedOrderInfo.timerStatus = model.OrderStatusUnknown
// savedOrderInfo.timerStatusType = scheduler.TimerStatusTypeUnknown
// savedOrderInfo.timer = nil
// }
// }
// func (s *DefScheduler) resetTimer(savedOrderInfo *WatchOrderInfo, bill *model.Waybill, isPending bool) {
// order := savedOrderInfo.order
// status := order.Status
// statusType := scheduler.TimerStatusTypeOrder
// statusTime := order.StatusTime
// if bill != nil {
// status = bill.Status
// statusType = scheduler.TimerStatusTypeWaybill
// statusTime = bill.StatusTime
// }
// globals.SugarLogger.Debugf("resetTimer, orderID:%s statusType:%d status:%d", order.VendorOrderID, statusType, status)
// if isStatusNewer(order.VendorID, savedOrderInfo.timerStatusType, savedOrderInfo.timerStatus, statusType, status) { // 新设置的TIMER不能覆盖状态在其后的TIMER如果状态回绕需要注意
// config := s.mergeOrderStatusConfig(savedOrderInfo, statusTime, statusType, status)
// if config == nil || config.TimerType != partner.TimerTypeByPass {
// s.stopTimer(savedOrderInfo)
// }
// if config != nil && config.TimeoutAction != nil && config.TimerType != partner.TimerTypeByPass {
// if config.CallShouldSetTimer(savedOrderInfo, bill) {
// timeout := config.GetRefTimeout(statusTime, order.OrderCreatedAt)
// if config.TimeoutGap != 0 {
// timeout += time.Duration(rand.Intn(int(config.TimeoutGap))) * time.Second
// }
// if isPending && timeout < pendingOrderTimerMaxSecond*time.Second { // 如果是PENDING的订单则将其分布到2--5秒内让后续事件有机会执行
// timeout = time.Duration(jxutils.MapValue2Scope(int64(timeout), -pendingOrderTimerMinMinSecond*1000, pendingOrderTimerMaxSecond*1000, pendingOrderTimerMinSecond*1000, pendingOrderTimerMaxSecond*1000)) * time.Millisecond
// } else if timeout < 0 {
// timeout = 0
// }
// if timeout == 0 {
// config.CallTimeoutAction(savedOrderInfo, bill)
// } else {
// timerName := ""
// if statusType == scheduler.TimerStatusTypeOrder {
// timerName = model.OrderStatusName[status]
// } else if statusType == scheduler.TimerStatusTypeWaybill {
// timerName = model.WaybillStatusName[status]
// }
// savedOrderInfo.timerStatusType = statusType
// savedOrderInfo.timerStatus = status
// savedOrderInfo.timerTime = time.Now().Add(timeout)
// savedOrderInfo.timer = utils.AfterFuncWithRecover(timeout, func() {
// jxutils.CallMsgHandlerAsync(func() {
// globals.SugarLogger.Debugf("fire timer:%s, orderID:%s", timerName, order.VendorOrderID)
// savedOrderInfo := s.loadSavedOrderFromMap(model.Order2Status(order), true)
// config.CallTimeoutAction(savedOrderInfo, bill)
// savedOrderInfo.timerStatus = 0
// savedOrderInfo.timerStatusType = scheduler.TimerStatusTypeUnknown
// }, jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID))
// })
// }
// globals.SugarLogger.Debugf("resetTimer, orderID:%s, statusType:%d, status:%d, timeout:%v", order.VendorOrderID, statusType, status, timeout)
// } else {
// globals.SugarLogger.Debugf("resetTimer, orderID:%s, statusType:%d, status:%d, should not set timer", order.VendorOrderID, statusType, status)
// }
// } else {
// globals.SugarLogger.Debugf("resetTimer bypass2, orderID:%s statusType:%d status:%v, config:%s", order.VendorOrderID, statusType, status, utils.Format4Output(config, true))
// }
// } else {
// globals.SugarLogger.Debugf("resetTimer bypass1, orderID:%s statusType:%d status:%v", order.VendorOrderID, statusType, status)
// }
// }
// func isStatusNewer(vendorID int, curStatusType, curStatus, statusType, status int) bool {
// // 拣货完成及之前的订单事件TIMER不能覆盖运单TIMER一般是消息错序引起的
// // 美团订单在接单后就会收到新运单事件因当前只支持一个TIMER暂时舍弃三方配送调度而要自动拣货调度
// if vendorID != model.VendorIDMTWM {
// if curStatusType == scheduler.TimerStatusTypeWaybill && statusType == scheduler.TimerStatusTypeOrder && status <= model.OrderStatusFinishedPickup {
// return false
// }
// } else {
// return statusType == scheduler.TimerStatusTypeOrder && status >= curStatus
// }
// if curStatusType == scheduler.TimerStatusTypeWaybill {
// return curStatus != status
// }
// return curStatusType != statusType || status >= curStatus
// }
func (s *DefScheduler) mergeOrderStatusConfig(savedOrderInfo *WatchOrderInfo, statusTime time.Time, statusType, status int) (retVal *StatusActionConfig) {
s.locker.RLock()
defer func() {