- fix weixin push msg bug.

- handle jd out-of-order msg.
This commit is contained in:
gazebo
2018-07-20 23:22:31 +08:00
parent 6f0d27e54d
commit 65eeef9966
6 changed files with 97 additions and 65 deletions

View File

@@ -24,6 +24,7 @@ const (
type WatchOrderInfo struct {
order *model.GoodsOrder // order里的信息是保持更新的
dirty int // 因为京东事件序列New与Accepted有极少数情况下会错序处理延迟加载
waybills []*model.Waybill // 这个waybills里的状态信息是不真实的只使用id相关的信息
timerStatus int
timer *time.Timer
@@ -79,7 +80,7 @@ func (s *DefScheduler) OnOrderNew(order *model.GoodsOrder) (err error) {
func (s *DefScheduler) OnOrderStatusChanged(status *model.OrderStatus) (err error) {
if status.Status > model.OrderStatusUnknown {
globals.SugarLogger.Debugf("OnOrderStatusChanged, status:%v", status)
if savedOrderInfo := s.loadWatchOrderFromMap(status.VendorOrderID, status.VendorID); savedOrderInfo != nil {
if savedOrderInfo := s.loadSavedOrderFromMap(status); savedOrderInfo != nil {
s.updateOrderByStatus(savedOrderInfo.order, status)
if status.Status > model.OrderStatusUnknown && status.Status < model.OrderStatusEndBegin {
if !(status.Status == model.OrderStatusFinishedPickup && len(savedOrderInfo.waybills) > 0) { //饿了么还观察到运单消息早于拣货完成消息
@@ -99,6 +100,7 @@ func (s *DefScheduler) OnOrderStatusChanged(status *model.OrderStatus) (err erro
}
} else {
s.stopTimer(savedOrderInfo)
s.cancelOtherWaybills(savedOrderInfo, nil)
s.orderMap.Delete(jxutils.GetUniversalOrderIDFromOrderStatus(status))
}
} else {
@@ -112,7 +114,7 @@ func (s *DefScheduler) OnOrderStatusChanged(status *model.OrderStatus) (err erro
func (s *DefScheduler) OnWaybillStatusChanged(bill *model.Waybill) (err error) {
if bill.Status > model.WaybillStatusUnknown {
globals.SugarLogger.Debugf("OnWaybillStatusChanged, bill:%v", bill)
if savedOrderInfo := s.loadWatchOrderFromMap(bill.VendorOrderID, bill.OrderVendorID); savedOrderInfo != nil {
if savedOrderInfo := s.loadSavedOrderFromMap(model.Waybill2Status(bill)); savedOrderInfo != nil {
s.addWaybill2Map(savedOrderInfo, bill) // 这样写的原因是因为调试时程度从中途运行没有接受到WaybillStatusNew事件
if bill.Status == model.WaybillStatusNew {
if bill.OrderVendorID == bill.WaybillVendorID {
@@ -144,7 +146,10 @@ func (s *DefScheduler) OnWaybillStatusChanged(bill *model.Waybill) (err error) {
s.CurOrderManager.UpdateWaybillVendorID(bill)
savedOrderInfo.order.WaybillVendorID = bill.WaybillVendorID
}
case model.WaybillStatusCanceled, model.WaybillStatusFailed:
case model.WaybillStatusFailed: // WaybillStatusFailed理解成订单整个失败了,不需要再尝试创建运单了
s.removeWaybillFromMap(savedOrderInfo, bill)
globals.SugarLogger.Infof("OnWaybillStatusChanged WaybillStatusFailed, bill:%v", bill)
case model.WaybillStatusCanceled:
s.removeWaybillFromMap(savedOrderInfo, bill)
if savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID {
s.createWaybillOn3rdProviders(savedOrderInfo.order, nil)
@@ -158,10 +163,10 @@ func (s *DefScheduler) OnWaybillStatusChanged(bill *model.Waybill) (err error) {
s.GetPurchasePlatformFromVendorID(bill.OrderVendorID).SelfDeliverDelievering(savedOrderInfo.order)
}
case model.WaybillStatusDelivered:
s.removeWaybillFromMap(savedOrderInfo, bill)
if savedOrderInfo.order.VendorID != bill.WaybillVendorID {
s.GetPurchasePlatformFromVendorID(bill.OrderVendorID).SelfDeliverDelievered(savedOrderInfo.order)
}
s.removeWaybillFromMap(savedOrderInfo, bill)
}
}
} else {
@@ -210,11 +215,11 @@ func (s *DefScheduler) createWaybillOn3rdProviders(order *model.GoodsOrder, excl
func (s *DefScheduler) cancelOtherWaybills(savedOrderInfo *WatchOrderInfo, bill *model.Waybill) (err error) {
globals.SugarLogger.Debugf("cancelOtherWaybills, order:%v, bill:%v", savedOrderInfo.order, bill)
for _, v := range savedOrderInfo.waybills {
if v.WaybillVendorID != bill.OrderVendorID && !(v.WaybillVendorID == bill.WaybillVendorID && v.VendorWaybillID == bill.VendorWaybillID) {
if bill == nil || v.WaybillVendorID != bill.OrderVendorID && !(v.WaybillVendorID == bill.WaybillVendorID && v.VendorWaybillID == bill.VendorWaybillID) {
_ = s.GetDeliveryPlatformFromVendorID(v.WaybillVendorID).CancelWaybill(v)
}
}
if bill.WaybillVendorID != bill.OrderVendorID {
if bill != nil && bill.WaybillVendorID != bill.OrderVendorID {
s.swtich2SelfDeliverWithRetry(bill, 2, 10*time.Second)
}
return nil
@@ -231,20 +236,34 @@ func (s *DefScheduler) swtich2SelfDeliverWithRetry(bill *model.Waybill, retryCou
}, duration, retryCount)
}
func (s *DefScheduler) loadWatchOrderFromMap(vendorOrderID string, vendorID int) *WatchOrderInfo {
universalOrderID := jxutils.ComposeUniversalOrderID(vendorOrderID, vendorID)
// 这个函数这样写的原因是适应一些消息错序
func (s *DefScheduler) loadSavedOrderFromMap(status *model.OrderStatus) *WatchOrderInfo {
universalOrderID := jxutils.ComposeUniversalOrderID(status.VendorOrderID, status.VendorID)
var realSavedInfo *WatchOrderInfo
if savedInfo, ok := s.orderMap.Load(universalOrderID); ok {
realSavedInfo = savedInfo.(*WatchOrderInfo)
} else {
globals.SugarLogger.Infof("can not get saved order, vendorOrderID:%s, vendorID:%d, load it", vendorOrderID, vendorID)
if order, err := s.CurOrderManager.LoadOrder(vendorOrderID, vendorID); err == nil {
realSavedInfo = &WatchOrderInfo{
order: order,
}
}
if realSavedInfo == nil || realSavedInfo.dirty == 1 {
if realSavedInfo == nil {
realSavedInfo = new(WatchOrderInfo)
s.orderMap.Store(universalOrderID, realSavedInfo)
} else {
globals.SugarLogger.Errorf("can not load order vendorOrderID:%s, vendorID:%d", vendorOrderID, vendorID)
realSavedInfo.dirty = 0
globals.SugarLogger.Infof("order is dirty, vendorOrderID:%s, vendorID:%d, load it", status.VendorOrderID, status.VendorID)
}
if order, err := s.CurOrderManager.LoadOrder(status.VendorOrderID, status.VendorID); err == nil {
realSavedInfo.order = order
} else {
realSavedInfo.order = &model.GoodsOrder{
VendorOrderID: status.VendorOrderID,
VendorID: status.VendorID,
Status: status.Status,
StatusTime: status.StatusTime,
OrderCreatedAt: status.StatusTime,
WaybillVendorID: model.VendorIDUnknown,
}
realSavedInfo.dirty = 1
globals.SugarLogger.Infof("can not load order vendorOrderID:%s, vendorID:%d", status.VendorOrderID, status.VendorID)
}
}
return realSavedInfo
@@ -266,16 +285,20 @@ func (s *DefScheduler) stopTimer(savedOrderInfo *WatchOrderInfo) {
func (s *DefScheduler) resetTimer(savedOrderInfo *WatchOrderInfo, status int, beginTime time.Time, gap time.Duration) {
globals.SugarLogger.Debugf("resetTimer status:%v, orderid:%v", status, savedOrderInfo.order.VendorOrderID)
s.stopTimer(savedOrderInfo)
config := s.mergeOrderStatusConfig(status, s.GetPurchasePlatformFromVendorID(savedOrderInfo.order.VendorID).GetStatusActionConfig(status))
if config != nil && config.TimeoutAction != nil {
timeout := jxutils.GetRealTimeout(beginTime, config.Timeout) + gap
globals.SugarLogger.Debugf("resetTimer timeout:%v, orderid:%v", timeout, savedOrderInfo.order.VendorOrderID)
savedOrderInfo.timerStatus = status
savedOrderInfo.timer = time.AfterFunc(timeout, func() {
config.TimeoutAction(savedOrderInfo.order)
savedOrderInfo.timerStatus = 0
})
if status >= savedOrderInfo.timerStatus { // 新设置的TIMER不能覆盖状态在其后的TIMER
s.stopTimer(savedOrderInfo)
config := s.mergeOrderStatusConfig(status, s.GetPurchasePlatformFromVendorID(savedOrderInfo.order.VendorID).GetStatusActionConfig(status))
if config != nil && config.TimeoutAction != nil {
timeout := jxutils.GetRealTimeout(beginTime, config.Timeout) + gap
globals.SugarLogger.Debugf("resetTimer timeout:%v, orderid:%v", timeout, savedOrderInfo.order.VendorOrderID)
savedOrderInfo.timerStatus = status
savedOrderInfo.timer = time.AfterFunc(timeout, func() {
config.TimeoutAction(savedOrderInfo.order)
savedOrderInfo.timerStatus = 0 // todo 可能有线程安全问题,考虑加入订单队列
})
}
} else {
globals.SugarLogger.Infof("resetTimer status revert, orderid:%s, current timer status:%d, status:%d", savedOrderInfo.order.VendorOrderID, savedOrderInfo.timerStatus, status)
}
}