- big refactor for scheduler.

This commit is contained in:
gazebo
2018-07-25 20:43:41 +08:00
parent f3df85c8e0
commit c0770e9ab5
16 changed files with 515 additions and 321 deletions

View File

@@ -15,26 +15,37 @@ import (
)
const (
time2Delivered = 1 * time.Hour // 正常订单都是1小时达
time2Schedule3rdCarrier = 330 * time.Second // 京东要求5分钟后才能转自送保险起见设置为5分半钟
time2Schedule3rdCarrierGap4OrderStatus = 3 * time.Minute // 京东要求是运单状态为待抢单且超时5分钟但为了防止没有运单事件所以就拣货完成事件开始算添加3分钟
time2AutoPickupMin = 15 * time.Minute
time2AutoPickupGap = 5 * time.Minute
minTimeout = 5 * time.Second // timer的最小时间这样写的上的是在load pending orders让延迟的事件有机会执行
time2Delivered = 1 * time.Hour // 正常从下单到送达的时间。
time2Schedule3rdCarrier = 330 * time.Second // 京东要求5分钟后才能转自送保险起见设置为5分半钟
// time2Schedule3rdCarrierGap4OrderStatus = 3 * time.Minute // 京东要求是运单状态为待抢单且超时5分钟但为了防止没有运单事件所以就拣货完成事件开始算添加3分钟
time2AutoPickupMin = 15 * time.Minute
time2AutoPickupGap = 5 * 60 //随机5分钟
// 把pending order timerout 在-pendingOrderTimerMinMinSecond至pendingOrderTimerMaxSecond映射到pendingOrderTimerMinSecond至pendingOrderTimerMaxSecond
pendingOrderTimerMinMinSecond = 5 * 60 // 10分钟
pendingOrderTimerMinSecond = 2
pendingOrderTimerMaxSecond = 5
maxWaybillRetryCount = 2
orderMapStoreMaxTime = 4 * 24 * time.Hour // cache最长存储时间
)
type WatchOrderInfo struct {
order *model.GoodsOrder // order里的信息是保持更新的
waybills []*model.Waybill // 这个waybills里的状态信息是不真实的只使用id相关的信息
timerStatus int
timer *time.Timer
retryCount int
order *model.GoodsOrder // order里的信息是保持更新的
waybills []*model.Waybill // 这个waybills里的状态信息是不真实的只使用id相关的信息
timerStatusType int // 0表示订单1表示运单
timerStatus int
timer *time.Timer
retryCount int // 失败后尝试的次数,调试阶段可能出现死循化,阻止这种情况发生
}
// 重要:此调度器要求同一定单的处理逻辑必须是序列化了的,不然会有并发问题
type DefScheduler struct {
scheduler.BaseScheduler
defWorkflowConfig map[int]*scheduler.StatusActionConfig
defWorkflowConfig []map[int]*scheduler.StatusActionConfig
orderMap jxutils.SyncMapWithTimeout
}
@@ -43,78 +54,73 @@ func init() {
sch.IsReallyCallPlatformAPI = globals.ReallyCallPlatformAPI
sch.Init()
scheduler.CurrentScheduler = sch
sch.defWorkflowConfig = map[int]*scheduler.StatusActionConfig{
model.OrderStatusNew: &scheduler.StatusActionConfig{ // 自动接单
Timeout: 1 * time.Second,
TimeoutAction: func(order *model.GoodsOrder) (err error) {
_ = sch.handleAutoAcceptOrder(order.VendorOrderID, order.VendorID, order.ConsigneeMobile, jxutils.GetJxStoreIDFromOrder(order), nil, func(isAcceptIt bool) error {
if err = sch.AcceptOrRefuseOrder(order, isAcceptIt); err != nil {
// 为了解决京东新消息与接单消息乱序的问题
if errWithCode, ok := err.(*utils.ErrorWithCode); ok && errWithCode.Level() == 1 && errWithCode.IntCode() == -1 {
if order2, err2 := sch.GetPurchasePlatformFromVendorID(order.VendorID).GetOrder(order.VendorOrderID); err2 == nil && order2.Status > order.Status {
sch.OnOrderStatusChanged(model.Order2Status(order2))
err = nil
} else {
err = err2
sch.defWorkflowConfig = []map[int]*scheduler.StatusActionConfig{
map[int]*scheduler.StatusActionConfig{
model.OrderStatusNew: &scheduler.StatusActionConfig{ // 自动接单
TimerType: scheduler.TimerTypeBaseNow,
Timeout: 1 * time.Second,
TimeoutAction: func(order *model.GoodsOrder) (err error) {
_ = sch.handleAutoAcceptOrder(order.VendorOrderID, order.VendorID, order.ConsigneeMobile, jxutils.GetJxStoreIDFromOrder(order), nil, func(isAcceptIt bool) error {
if err = sch.AcceptOrRefuseOrder(order, isAcceptIt); err != nil {
// 为了解决京东新消息与接单消息乱序的问题
if errWithCode, ok := err.(*utils.ErrorWithCode); ok && errWithCode.Level() == 1 && errWithCode.IntCode() == -1 {
if order2, err2 := sch.GetPurchasePlatformFromVendorID(order.VendorID).GetOrder(order.VendorOrderID); err2 == nil && order2.Status > order.Status {
sch.OnOrderStatusChanged(model.Order2Status(order2), false)
err = nil
} else {
err = err2
}
}
}
}
return err
})
return nil
return err
})
return nil
},
},
model.OrderStatusAccepted: &scheduler.StatusActionConfig{ // 自动拣货
TimerType: scheduler.TimerTypeBaseStatusTime,
Timeout: time2AutoPickupMin,
TimeoutGap: time2AutoPickupGap,
TimeoutAction: func(order *model.GoodsOrder) (err error) {
return sch.PickedUpGoods(order)
},
},
},
model.OrderStatusAccepted: &scheduler.StatusActionConfig{ // 自动拣货
Timeout: time2AutoPickupMin,
TimeoutAction: func(order *model.GoodsOrder) (err error) {
return sch.PickedUpGoods(order)
},
},
model.OrderStatusFinishedPickup: &scheduler.StatusActionConfig{ // 尝试召唤更多物流
Timeout: time2Schedule3rdCarrier,
TimeoutAction: func(order *model.GoodsOrder) (err error) {
return sch.createWaybillOn3rdProviders(order, nil)
map[int]*scheduler.StatusActionConfig{
model.WaybillStatusNew: &scheduler.StatusActionConfig{ // 尝试召唤更多物流
TimerType: scheduler.TimerTypeBaseStatusTime,
Timeout: time2Schedule3rdCarrier,
TimeoutAction: func(order *model.GoodsOrder) (err error) {
return sch.createWaybillOn3rdProviders(order, nil)
},
},
},
}
}
// 以下是订单
func (s *DefScheduler) OnOrderNew(order *model.GoodsOrder) (err error) {
globals.SugarLogger.Debugf("OnOrderNew, orderID:%s", order.VendorOrderID)
watchInfo := &WatchOrderInfo{
order: order,
func (s *DefScheduler) OnOrderNew(order *model.GoodsOrder, isPending bool) (err error) {
globals.SugarLogger.Debugf("OnOrderNew orderID:%s", order.VendorOrderID)
savedOrderInfo := s.loadSavedOrderFromMap(model.Order2Status(order), false)
if savedOrderInfo == nil {
savedOrderInfo = &WatchOrderInfo{
order: order,
}
s.orderMap.StoreWithTimeout(jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID), savedOrderInfo, orderMapStoreMaxTime)
} else {
savedOrderInfo.order = order // 调整单可能进到这里来
}
s.orderMap.Store(jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID), watchInfo)
s.resetTimer(watchInfo, watchInfo.order.Status, order.OrderCreatedAt, 0)
s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeOrder, savedOrderInfo.order.Status, false)
return err
}
func (s *DefScheduler) OnOrderStatusChanged(status *model.OrderStatus) (err error) {
func (s *DefScheduler) OnOrderStatusChanged(status *model.OrderStatus, isPending bool) (err error) {
if status.Status > model.OrderStatusUnknown { // 只处理状态转换,一般消息不处理
globals.SugarLogger.Debugf("OnOrderStatusChanged, status:%v", status)
savedOrderInfo := s.loadSavedOrderFromMap(status)
globals.SugarLogger.Debugf("OnOrderStatusChanged orderID:%s %s, status:%v", status.VendorOrderID, model.OrderStatusName[status.Status], status)
savedOrderInfo := s.loadSavedOrderFromMap(status, true)
s.updateOrderByStatus(savedOrderInfo.order, status)
if status.Status > model.OrderStatusUnknown && status.Status < model.OrderStatusEndBegin {
if !(status.Status == model.OrderStatusFinishedPickup && len(savedOrderInfo.waybills) > 0) && status.Status != model.OrderStatusFinishedPickup { //饿了么还观察到运单消息早于拣货完成消息
gap := 0 * time.Second
beginTime := status.StatusTime
if status.Status == model.OrderStatusNew {
beginTime = time.Now()
} else if status.Status == model.OrderStatusAccepted {
gap = time.Duration(rand.Int63n(int64(time2AutoPickupGap)))
beginTime = s.getBeginTime4LatestPickup(savedOrderInfo.order)
} else if status.Status == model.OrderStatusFinishedPickup {
// 召唤三方配送
// 正常应该是只依赖于购物平台的第一个运单消息但饿了么有观察到极少数情况下没有此事件所以还是需要在这里加个保险的TIMER来驱动运单调度
gap = time2Schedule3rdCarrierGap4OrderStatus
}
s.resetTimer(savedOrderInfo, status.Status, beginTime, gap)
} else {
s.stopTimer(savedOrderInfo)
}
} else {
s.stopTimer(savedOrderInfo)
s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeOrder, savedOrderInfo.order.Status, false)
if status.Status >= model.OrderStatusEndBegin {
s.cancelOtherWaybills(savedOrderInfo, nil)
s.orderMap.Delete(jxutils.GetUniversalOrderIDFromOrderStatus(status))
}
@@ -123,78 +129,102 @@ func (s *DefScheduler) OnOrderStatusChanged(status *model.OrderStatus) (err erro
}
// 以下是运单
func (s *DefScheduler) OnWaybillStatusChanged(bill *model.Waybill) (err error) {
func (s *DefScheduler) OnWaybillStatusChanged(bill *model.Waybill, isPending bool) (err error) {
if bill.Status > model.WaybillStatusUnknown {
globals.SugarLogger.Debugf("OnWaybillStatusChanged, bill:%v", bill)
savedOrderInfo := s.loadSavedOrderFromMap(model.Waybill2Status(bill))
if bill.Status >= model.WaybillStatusNew {
if bill.Status == model.WaybillStatusNew {
s.addWaybill2Map(savedOrderInfo, bill)
if bill.OrderVendorID == bill.WaybillVendorID {
if savedOrderInfo.timerStatus == model.OrderStatusFinishedPickup { // 如果当前TIMER还是OrderStatusFinishedPickup在OnOrderStatusChanged中设置的则重置
s.resetTimer(savedOrderInfo, model.OrderStatusFinishedPickup, bill.StatusTime, 0)
} else if savedOrderInfo.timerStatus != 0 {
globals.SugarLogger.Infof("OnWaybillStatusChanged met other timer, status:%d", savedOrderInfo.timerStatus)
}
globals.SugarLogger.Debugf("OnWaybillStatusChanged orderID:%s %s, bill:%v", bill.VendorOrderID, model.WaybillStatusName[bill.Status], bill)
savedOrderInfo := s.loadSavedOrderFromMap(model.Waybill2Status(bill), true)
order := savedOrderInfo.order
if order.Status < model.OrderStatusFinishedPickup || order.Status > model.OrderStatusEndBegin { // 如果当前order状态是不应该出现运单状态
globals.SugarLogger.Infof("OnWaybillStatusChanged orderID:%s status:%s is not suitable for waybill", order.VendorOrderID, model.OrderStatusName[order.Status])
s.CancelWaybill(bill)
s.stopTimer(savedOrderInfo)
s.orderMap.Delete(jxutils.GetUniversalOrderIDFromOrderStatus(model.Order2Status(order)))
return nil
}
if bill.Status == model.WaybillStatusNew {
s.addWaybill2Map(savedOrderInfo, bill)
if order.WaybillVendorID != model.VendorIDUnknown && order.WaybillVendorID != bill.WaybillVendorID {
globals.SugarLogger.Infof("OnWaybillStatusChanged multiple waybill created, bill:%v", bill)
if bill.WaybillVendorID != bill.WaybillVendorID {
s.CancelWaybill(bill)
} else {
globals.SugarLogger.Warnf("OnWaybillStatusChanged bill:%v purchase platform bill came later than others, strange!!!", bill)
}
if savedOrderInfo.order.WaybillVendorID != model.VendorIDUnknown && savedOrderInfo.order.WaybillVendorID != bill.WaybillVendorID {
globals.SugarLogger.Infof("OnWaybillStatusChanged multiple waybill created, bill:%v", bill)
if bill.WaybillVendorID != bill.WaybillVendorID {
s.CancelWaybill(bill)
}
}
s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false)
} else {
switch bill.Status {
case model.WaybillStatusAccepted:
if order.WaybillVendorID == model.VendorIDUnknown {
s.cancelOtherWaybills(savedOrderInfo, bill)
s.updateOrderByBill(order, bill, false)
} else {
s.CancelWaybill(bill)
globals.SugarLogger.Warnf("OnWaybillStatusChanged Accepted orderID:%s got multiple bill:%v, order details:%v", order.VendorOrderID, bill, order)
}
} else {
if savedOrderInfo.order.WaybillVendorID == model.VendorIDUnknown || savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID {
s.stopTimer(savedOrderInfo) // todo 这样写可能不太合适!!!
}
switch bill.Status {
case model.WaybillStatusAccepted:
if savedOrderInfo.order.WaybillVendorID == model.VendorIDUnknown {
s.cancelOtherWaybills(savedOrderInfo, bill)
s.CurOrderManager.UpdateWaybillVendorID(bill)
savedOrderInfo.order.WaybillVendorID = bill.WaybillVendorID
} else {
globals.SugarLogger.Infof("orderID:%s got multiple bill:%v, order details:%v", savedOrderInfo.order.VendorOrderID, bill, savedOrderInfo.order)
}
case model.WaybillStatusAcceptCanceled:
if savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID {
bill.WaybillVendorID = model.VendorIDUnknown
s.CurOrderManager.UpdateWaybillVendorID(bill)
savedOrderInfo.order.WaybillVendorID = model.VendorIDUnknown
s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false)
case model.WaybillStatusAcceptCanceled:
if order.WaybillVendorID == bill.WaybillVendorID {
bill.WaybillVendorID = model.VendorIDUnknown
s.updateOrderByBill(order, bill, false)
s.createWaybillOn3rdProviders(savedOrderInfo.order, bill)
}
case model.WaybillStatusCourierArrived: // do nothing
case model.WaybillStatusFailed: // todo WaybillStatusFailed理解成订单整个失败了不需要再尝试创建运单了注意这里应该加个zabbix日志的报警
s.removeWaybillFromMap(savedOrderInfo, bill)
globals.SugarLogger.Infof("OnWaybillStatusChanged WaybillStatusFailed, bill:%v", bill)
if savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID {
bill.WaybillVendorID = model.VendorIDUnknown
s.CurOrderManager.UpdateWaybillVendorID(bill)
savedOrderInfo.order.WaybillVendorID = model.VendorIDUnknown
}
case model.WaybillStatusCanceled:
s.removeWaybillFromMap(savedOrderInfo, bill)
if savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID {
bill.WaybillVendorID = model.VendorIDUnknown
s.CurOrderManager.UpdateWaybillVendorID(bill)
savedOrderInfo.order.WaybillVendorID = model.VendorIDUnknown
savedOrderInfo.retryCount++
savedOrderInfo.order.Status = model.OrderStatusFinishedPickup // 如果运单被取消且是主运单将订单状态强制回滚到model.OrderStatusFinishedPickup
if savedOrderInfo.retryCount < 2 {
s.createWaybillOn3rdProviders(savedOrderInfo.order, nil)
}
}
case model.WaybillStatusDelivering:
if savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID && savedOrderInfo.order.VendorID != bill.WaybillVendorID {
s.SelfDeliverDelievering(savedOrderInfo.order)
}
case model.WaybillStatusDelivered:
s.removeWaybillFromMap(savedOrderInfo, bill)
if savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID && savedOrderInfo.order.VendorID != bill.WaybillVendorID {
s.SelfDeliverDelievered(savedOrderInfo.order)
}
s.createWaybillOn3rdProviders(order, bill)
s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false)
} else if order.WaybillVendorID != model.VendorIDUnknown {
s.CancelWaybill(bill)
globals.SugarLogger.Warnf("OnWaybillStatusChanged AcceptCanceled orderID:%s got multiple bill:%v, order details:%v", order.VendorOrderID, bill, order)
}
case model.WaybillStatusCourierArrived: // do nothing
if order.WaybillVendorID == bill.WaybillVendorID {
} else {
globals.SugarLogger.Warnf("OnWaybillStatusChanged CourierArrived bill:%v shouldn't got here", bill)
}
s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false)
case model.WaybillStatusFailed: // todo WaybillStatusFailed理解成订单整个失败了不需要再尝试创建运单了注意这里应该加个zabbix日志的报警
s.removeWaybillFromMap(savedOrderInfo, bill)
if order.WaybillVendorID == bill.WaybillVendorID {
globals.SugarLogger.Infof("OnWaybillStatusChanged WaybillStatusFailed, bill:%v", bill)
if order.WaybillVendorID == bill.WaybillVendorID {
bill.WaybillVendorID = model.VendorIDUnknown
s.updateOrderByBill(order, bill, true)
}
s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false)
} else {
globals.SugarLogger.Warnf("OnWaybillStatusChanged Failed bill:%v shouldn't got here", bill)
}
case model.WaybillStatusCanceled:
s.removeWaybillFromMap(savedOrderInfo, bill)
if order.WaybillVendorID == bill.WaybillVendorID {
bill.WaybillVendorID = model.VendorIDUnknown
s.updateOrderByBill(order, bill, true)
savedOrderInfo.retryCount++
if savedOrderInfo.retryCount <= maxWaybillRetryCount {
s.createWaybillOn3rdProviders(order, nil)
} else {
globals.SugarLogger.Warnf("OnWaybillStatusChanged Canceled bill:%v failed %d times, stop schedule", bill, savedOrderInfo.retryCount)
}
s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false)
}
case model.WaybillStatusDelivering:
if order.WaybillVendorID == bill.WaybillVendorID {
if order.VendorID != bill.WaybillVendorID {
s.SelfDeliverDelievering(order)
}
} else {
globals.SugarLogger.Warnf("OnWaybillStatusChanged Delivering bill:%v shouldn't got here", bill)
}
s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false)
case model.WaybillStatusDelivered:
s.removeWaybillFromMap(savedOrderInfo, bill)
if order.WaybillVendorID == bill.WaybillVendorID {
if order.VendorID != bill.WaybillVendorID {
s.SelfDeliverDelievered(order)
}
} else {
globals.SugarLogger.Warnf("OnWaybillStatusChanged Delivered bill:%v shouldn't got here", bill)
}
s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false)
}
}
}
@@ -205,7 +235,7 @@ func (s *DefScheduler) addWaybill2Map(savedOrderInfo *WatchOrderInfo, bill *mode
for _, v := range savedOrderInfo.waybills {
if v.VendorWaybillID == bill.VendorWaybillID && v.WaybillVendorID == bill.WaybillVendorID {
// 如果已经存在,不做处理
// globals.SugarLogger.Infof("addWaybill2Map bill:%v already exists", bill)
globals.SugarLogger.Warnf("addWaybill2Map bill:%v already exists", bill)
return
}
}
@@ -227,7 +257,7 @@ func (s *DefScheduler) createWaybillOn3rdProviders(order *model.GoodsOrder, excl
if s.isOrderSupport3rdDelivery(order) {
successCount := 0
for vendorID := range s.DeliveryPlatformHandlers {
if excludeBill == nil || vendorID != excludeBill.WaybillVendorID {
if (excludeBill == nil || vendorID != excludeBill.WaybillVendorID) && s.DeliveryPlatformHandlers[vendorID].Use4CreateWaybill {
if err = s.CreateWaybill(vendorID, order); err == nil {
successCount++
}
@@ -236,12 +266,12 @@ func (s *DefScheduler) createWaybillOn3rdProviders(order *model.GoodsOrder, excl
if successCount != 0 {
return nil
}
globals.SugarLogger.Warnf("createWaybillOn3rdProviders, orderID:%s all failed", order.VendorOrderID)
globals.SugarLogger.Infof("createWaybillOn3rdProviders, orderID:%s all failed", order.VendorOrderID)
return scheduler.ErrCanNotCreateAtLeastOneWaybill
}
globals.SugarLogger.Debugf("createWaybillOn3rdProviders, orderID:%s, store:%d dont't support 3rd delivery platform", order.VendorOrderID, jxutils.GetJxStoreIDFromOrder(order))
} else {
globals.SugarLogger.Debugf("createWaybillOn3rdProviders, orderID:%s, status:%d doesn't match model.OrderStatusFinishedPickup, bypass", order.VendorOrderID, order.Status)
globals.SugarLogger.Warnf("createWaybillOn3rdProviders, orderID:%s, status:%d doesn't match model.OrderStatusFinishedPickup, bypass", order.VendorOrderID, order.Status)
}
return nil
}
@@ -259,35 +289,27 @@ func (s *DefScheduler) cancelOtherWaybills(savedOrderInfo *WatchOrderInfo, bill
return nil
}
// todo 这个函数也可能有线程安全问题
func (s *DefScheduler) swtich2SelfDeliverWithRetry(order *model.GoodsOrder, bill *model.Waybill, retryCount int, duration time.Duration) {
globals.SugarLogger.Debugf("Swtich2SelfDeliver orderID:%s", order.VendorOrderID)
utils.CallFuncRetryAsync(func(index int) error {
err := s.Swtich2SelfDeliver(order)
if err != nil {
globals.SugarLogger.Infof("Swtich2SelfDeliver failed, orderID:%s, error:%v", order.VendorOrderID, err)
if err != nil && index == 0 {
globals.SugarLogger.Warnf("Swtich2SelfDeliver finally failed, orderID:%s, error:%v, have to cancel bill:%v", order.VendorOrderID, err, bill)
// 如果购买平台转商家自送失败最终还是要取消3方物流
s.CancelWaybill(bill)
}
}
return err
}, duration, retryCount)
globals.SugarLogger.Debugf("swtich2SelfDeliverWithRetry orderID:%s", order.VendorOrderID)
// 当前先不加RETRY逻辑
if err := s.Swtich2SelfDeliver(order); err != nil {
globals.SugarLogger.Infof("swtich2SelfDeliverWithRetry failed, cancel bill:%v, err:%v", bill, err)
s.CancelWaybill(bill)
}
}
// 这个函数这样写的原因是适应一些消息错序
func (s *DefScheduler) loadSavedOrderFromMap(status *model.OrderStatus) *WatchOrderInfo {
func (s *DefScheduler) loadSavedOrderFromMap(status *model.OrderStatus, isAutoLoad bool) *WatchOrderInfo {
globals.SugarLogger.Debugf("loadSavedOrderFromMap status:%v", status)
universalOrderID := jxutils.ComposeUniversalOrderID(status.RefVendorOrderID, status.RefVendorID)
var realSavedInfo *WatchOrderInfo
if savedInfo, ok := s.orderMap.Load(universalOrderID); ok {
realSavedInfo = savedInfo.(*WatchOrderInfo)
}
if realSavedInfo == nil || !model.IsOrderSolid(realSavedInfo.order) {
if isAutoLoad && (realSavedInfo == nil || !model.IsOrderSolid(realSavedInfo.order)) {
if realSavedInfo == nil {
realSavedInfo = new(WatchOrderInfo)
s.orderMap.Store(universalOrderID, realSavedInfo)
s.orderMap.StoreWithTimeout(universalOrderID, realSavedInfo, orderMapStoreMaxTime)
} else {
globals.SugarLogger.Infof("loadSavedOrderFromMap order is incomplete, orderID:%s, load it", status.RefVendorOrderID)
}
@@ -308,41 +330,63 @@ func (s *DefScheduler) loadSavedOrderFromMap(status *model.OrderStatus) *WatchOr
return realSavedInfo
}
func (s *DefScheduler) getBeginTime4LatestPickup(order *model.GoodsOrder) (retVal time.Time) {
if order.ExpectedDeliveredTime != utils.DefaultTimeValue {
return order.ExpectedDeliveredTime.Add(-time2Delivered)
}
return order.StatusTime
}
func (s *DefScheduler) stopTimer(savedOrderInfo *WatchOrderInfo) {
if savedOrderInfo.timer != nil {
globals.SugarLogger.Debugf("stopTimer orderid:%v", savedOrderInfo.order.VendorOrderID)
globals.SugarLogger.Debugf("stopTimer orderID:%s", savedOrderInfo.order.VendorOrderID)
savedOrderInfo.timer.Stop()
savedOrderInfo.timerStatus = 0
savedOrderInfo.timer = nil
}
}
func (s *DefScheduler) resetTimer(savedOrderInfo *WatchOrderInfo, status int, beginTime time.Time, gap time.Duration) {
globals.SugarLogger.Debugf("resetTimer status:%v, orderID:%s", status, savedOrderInfo.order.VendorOrderID)
if status >= savedOrderInfo.timerStatus { // 新设置的TIMER不能覆盖状态在其后的TIMER如果状态回绕需要注意
s.stopTimer(savedOrderInfo)
config := s.mergeOrderStatusConfig(status, s.GetPurchasePlatformFromVendorID(savedOrderInfo.order.VendorID).GetStatusActionConfig(status))
if config != nil && config.TimeoutAction != nil {
timeout := jxutils.GetRealTimeout(beginTime, config.Timeout, minTimeout) + gap
globals.SugarLogger.Debugf("resetTimer timeout:%v, orderID:%s", timeout, savedOrderInfo.order.VendorOrderID)
savedOrderInfo.timerStatus = status
order := savedOrderInfo.order
savedOrderInfo.timer = time.AfterFunc(timeout, func() {
// order 事件序列化
jxutils.CallMsgHandlerAsync(func() {
config.TimeoutAction(order)
savedOrderInfo.timerStatus = 0
}, order.VendorOrderID)
})
func (s *DefScheduler) resetTimer(savedOrderInfo *WatchOrderInfo, statusType, status int, isPending bool) {
order := savedOrderInfo.order
globals.SugarLogger.Debugf("resetTimer, orderID:%s status:%v", order.VendorOrderID, status)
if statusType != savedOrderInfo.timerStatusType || status >= savedOrderInfo.timerStatus { // 新设置的TIMER不能覆盖状态在其后的TIMER如果状态回绕需要注意
config := s.mergeOrderStatusConfig(statusType, status, s.GetPurchasePlatformFromVendorID(order.VendorID).GetStatusActionConfig(statusType, status))
if config == nil || config.TimerType != scheduler.TimerTypeByPass {
s.stopTimer(savedOrderInfo)
}
if config != nil && config.TimeoutAction != nil && config.TimerType != scheduler.TimerTypeNoTimer && config.TimerType != scheduler.TimerTypeByPass {
var timeout time.Duration
switch config.TimerType {
case scheduler.TimerTypeBaseNow:
timeout = config.Timeout
case scheduler.TimerTypeBaseStatusTime:
timeout = order.StatusTime.Sub(time.Now()) + config.Timeout
case scheduler.TimerTypeBaseExpectedDeliveredTime:
expectedDeliveredTime := order.ExpectedDeliveredTime
if expectedDeliveredTime == utils.DefaultTimeValue { // 如果没有期望送达时间则以订单创建时间加DefaultTimeValue来表示
expectedDeliveredTime = order.OrderCreatedAt.Add(time2Delivered)
}
timeout = expectedDeliveredTime.Add(-config.Timeout).Sub(time.Now())
default:
panic("TimerType is wrong!!!")
}
if config.TimeoutGap != 0 {
timeout += time.Duration(rand.Int31n(int32(config.TimeoutGap))) * time.Second
}
if isPending && timeout < pendingOrderTimerMaxSecond*time.Second { // 如果是PENDING的订单则将其分布到2--5秒内让后续事件有机会执行
timeout = time.Duration(jxutils.MapValue2Scope(int64(timeout), -pendingOrderTimerMinMinSecond*1000, pendingOrderTimerMaxSecond*1000, pendingOrderTimerMinSecond*1000, pendingOrderTimerMaxSecond*1000)) * time.Millisecond
} else if timeout < 0 {
timeout = 0
}
if timeout == 0 {
config.TimeoutAction(order)
} else {
savedOrderInfo.timerStatusType = statusType
savedOrderInfo.timerStatus = status
savedOrderInfo.timer = time.AfterFunc(timeout, func() {
jxutils.CallMsgHandlerAsync(func() {
config.TimeoutAction(order)
savedOrderInfo.timerStatus = 0
savedOrderInfo.timerStatusType = scheduler.TimerStatusTypeUnknown
}, order.VendorOrderID)
})
}
globals.SugarLogger.Debugf("resetTimer orderID:%s, status:%d, timeout:%v", order.VendorOrderID, status, timeout)
}
} else {
globals.SugarLogger.Infof("resetTimer status revert, orderID:%s, current timer status:%d, status:%d", savedOrderInfo.order.VendorOrderID, savedOrderInfo.timerStatus, status)
}
}
@@ -378,8 +422,8 @@ func (s *DefScheduler) handleAutoAcceptOrder(orderID string, vendorID int, userM
return handleType
}
func (s *DefScheduler) mergeOrderStatusConfig(status int, config *scheduler.StatusActionConfig) (retVal *scheduler.StatusActionConfig) {
defConfig := s.defWorkflowConfig[status]
func (s *DefScheduler) mergeOrderStatusConfig(statusType, status int, config *scheduler.StatusActionConfig) (retVal *scheduler.StatusActionConfig) {
defConfig := s.defWorkflowConfig[statusType][status]
if defConfig == nil && config == nil {
return nil
}
@@ -423,3 +467,11 @@ func (s *DefScheduler) isOrderSupport3rdDelivery(order *model.GoodsOrder) (retVa
}, "isOrderSupport3rdDelivery")
return retVal
}
func (s *DefScheduler) updateOrderByBill(order *model.GoodsOrder, bill *model.Waybill, revertStatus bool) {
s.CurOrderManager.UpdateWaybillVendorID(bill, revertStatus)
order.WaybillVendorID = bill.WaybillVendorID
if revertStatus {
order.Status = model.OrderStatusFinishedPickup
}
}