- 统一清理调用CallMsgHandlerAsync时的primaryID设置

- defsch中,自动接单调用OnOrderStatusChanged改为异步,防止死循环
This commit is contained in:
gazebo
2019-05-14 11:54:22 +08:00
parent 313b9ca19b
commit 135563ea4d
7 changed files with 21 additions and 24 deletions

View File

@@ -142,17 +142,17 @@ func LoadPendingOrders() {
if order, ok := item.(*model.GoodsOrder); ok { if order, ok := item.(*model.GoodsOrder); ok {
jxutils.CallMsgHandlerAsync(func() { jxutils.CallMsgHandlerAsync(func() {
scheduler.CurrentScheduler.OnOrderNew(order, true) scheduler.CurrentScheduler.OnOrderNew(order, true)
}, order.VendorOrderID) }, jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID))
} else if status, ok := item.(*model.OrderStatus); ok { } else if status, ok := item.(*model.OrderStatus); ok {
jxutils.CallMsgHandlerAsync(func() { jxutils.CallMsgHandlerAsync(func() {
order := orderMap[jxutils.ComposeUniversalOrderID(status.VendorOrderID, status.VendorID)] order := orderMap[jxutils.ComposeUniversalOrderID(status.VendorOrderID, status.VendorID)]
scheduler.CurrentScheduler.OnOrderStatusChanged(order, status, true) scheduler.CurrentScheduler.OnOrderStatusChanged(order, status, true)
}, status.VendorOrderID) }, jxutils.ComposeUniversalOrderID(status.RefVendorOrderID, status.RefVendorID))
} else { } else {
bill := item.(*model.Waybill) bill := item.(*model.Waybill)
jxutils.CallMsgHandlerAsync(func() { jxutils.CallMsgHandlerAsync(func() {
scheduler.CurrentScheduler.OnWaybillStatusChanged(bill, true) scheduler.CurrentScheduler.OnWaybillStatusChanged(bill, true)
}, bill.VendorOrderID) }, jxutils.ComposeUniversalOrderID(bill.VendorOrderID, bill.OrderVendorID))
} }
curTime := time.Now() curTime := time.Now()
timeout := sleepGap - curTime.Sub(lastTime) timeout := sleepGap - curTime.Sub(lastTime)

View File

@@ -143,7 +143,10 @@ func init() {
// 为了解决京东新消息与接单消息乱序的问题 // 为了解决京东新消息与接单消息乱序的问题
if errWithCode, ok := err.(*utils.ErrorWithCode); ok && errWithCode.Level() == 1 && errWithCode.IntCode() == -1 { if errWithCode, ok := err.(*utils.ErrorWithCode); ok && errWithCode.Level() == 1 && errWithCode.IntCode() == -1 {
if order2, err2 := partner.GetPurchasePlatformFromVendorID(order.VendorID).GetOrder(order.VendorOrderID); err2 == nil && order2.Status > order.Status { if order2, err2 := partner.GetPurchasePlatformFromVendorID(order.VendorID).GetOrder(order.VendorOrderID); err2 == nil && order2.Status > order.Status {
// sch.OnOrderStatusChanged(order, model.Order2Status(order2), false) order.Status = order2.Status
jxutils.CallMsgHandlerAsync(func() {
sch.OnOrderStatusChanged(order, model.Order2Status(order2), false)
}, jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID))
err = nil err = nil
} else { } else {
err = err2 err = err2
@@ -574,7 +577,7 @@ func (s *DefScheduler) swtich2SelfDeliverWithRetry(savedOrderInfo *WatchOrderInf
utils.AfterFuncWithRecover(duration, func() { utils.AfterFuncWithRecover(duration, func() {
jxutils.CallMsgHandlerAsync(func() { jxutils.CallMsgHandlerAsync(func() {
s.swtich2SelfDeliverWithRetry(savedOrderInfo, bill, retryCount-1, duration) s.swtich2SelfDeliverWithRetry(savedOrderInfo, bill, retryCount-1, duration)
}, order.VendorOrderID) }, jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID))
}) })
} else { } else {
errStr := fmt.Sprintf("订单:%s转自配送失败, 错误信息:%v", order.VendorOrderID, err) errStr := fmt.Sprintf("订单:%s转自配送失败, 错误信息:%v", order.VendorOrderID, err)
@@ -685,7 +688,7 @@ func (s *DefScheduler) resetTimer(savedOrderInfo *WatchOrderInfo, bill *model.Wa
config.TimeoutAction(savedOrderInfo) config.TimeoutAction(savedOrderInfo)
savedOrderInfo.timerStatus = 0 savedOrderInfo.timerStatus = 0
savedOrderInfo.timerStatusType = scheduler.TimerStatusTypeUnknown savedOrderInfo.timerStatusType = scheduler.TimerStatusTypeUnknown
}, order.VendorOrderID) }, jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID))
}) })
} }
globals.SugarLogger.Debugf("resetTimer, orderID:%s, status:%d, timeout:%v", order.VendorOrderID, status, timeout) globals.SugarLogger.Debugf("resetTimer, orderID:%s, status:%d, timeout:%v", order.VendorOrderID, status, timeout)

View File

@@ -50,7 +50,7 @@ func (c *DeliveryHandler) GetVendorID() int {
func (c *DeliveryHandler) OnWaybillMsg(msg *dadaapi.CallbackMsg) (retVal *dadaapi.CallbackResponse) { func (c *DeliveryHandler) OnWaybillMsg(msg *dadaapi.CallbackMsg) (retVal *dadaapi.CallbackResponse) {
jxutils.CallMsgHandler(func() { jxutils.CallMsgHandler(func() {
retVal = c.onWaybillMsg(msg) retVal = c.onWaybillMsg(msg)
}, msg.OrderID) }, jxutils.ComposeUniversalOrderID(msg.OrderID, model.VendorIDDada))
return retVal return retVal
} }

View File

@@ -55,7 +55,7 @@ func OnWaybillExcept(msg *mtpsapi.CallbackOrderExceptionMsg) (retVal *mtpsapi.Ca
func (c *DeliveryHandler) OnWaybillMsg(msg *mtpsapi.CallbackOrderMsg) (retVal *mtpsapi.CallbackResponse) { func (c *DeliveryHandler) OnWaybillMsg(msg *mtpsapi.CallbackOrderMsg) (retVal *mtpsapi.CallbackResponse) {
jxutils.CallMsgHandler(func() { jxutils.CallMsgHandler(func() {
retVal = c.onWaybillMsg(msg) retVal = c.onWaybillMsg(msg)
}, msg.OrderID) }, jxutils.ComposeUniversalOrderID(msg.OrderID, model.VendorIDMTPS))
return retVal return retVal
} }
@@ -73,7 +73,7 @@ func (c *DeliveryHandler) OnWaybillExcept(msg *mtpsapi.CallbackOrderExceptionMsg
} }
order.VendorOrderID, order.OrderVendorID = jxutils.SplitUniversalOrderID(msg.OrderID) order.VendorOrderID, order.OrderVendorID = jxutils.SplitUniversalOrderID(msg.OrderID)
retVal = mtpsapi.Err2CallbackResponse(partner.CurOrderManager.OnWaybillStatusChanged(order), "mtps OnWaybillExcept") retVal = mtpsapi.Err2CallbackResponse(partner.CurOrderManager.OnWaybillStatusChanged(order), "mtps OnWaybillExcept")
}, msg.OrderID) }, jxutils.ComposeUniversalOrderID(msg.OrderID, model.VendorIDDada))
return retVal return retVal
} }

View File

@@ -45,11 +45,9 @@ func (c *PurchaseHandler) isAfsMsg(msg *ebaiapi.CallbackMsg) bool {
} }
func (c *PurchaseHandler) OnAfsOrderMsg(msg *ebaiapi.CallbackMsg) (retVal *ebaiapi.CallbackResponse) { func (c *PurchaseHandler) OnAfsOrderMsg(msg *ebaiapi.CallbackMsg) (retVal *ebaiapi.CallbackResponse) {
utils.CallFuncAsync(func() { jxutils.CallMsgHandlerAsync(func() {
jxutils.CallMsgHandler(func() { retVal = c.onAfsOrderMsg(msg)
retVal = c.onAfsOrderMsg(msg) }, jxutils.ComposeUniversalOrderID(GetOrderIDFromMsg(msg), model.VendorIDEBAI))
}, jxutils.ComposeUniversalOrderID(GetOrderIDFromMsg(msg), model.VendorIDEBAI))
})
return retVal return retVal
} }

View File

@@ -60,11 +60,9 @@ var (
) )
func (c *PurchaseHandler) OnAfsOrderMsg(msg *jdapi.CallbackOrderMsg) (retVal *jdapi.CallbackResponse) { func (c *PurchaseHandler) OnAfsOrderMsg(msg *jdapi.CallbackOrderMsg) (retVal *jdapi.CallbackResponse) {
utils.CallFuncAsync(func() { jxutils.CallMsgHandlerAsync(func() {
jxutils.CallMsgHandler(func() { retVal = c.onAfsOrderMsg(msg)
retVal = c.onAfsOrderMsg(msg) }, jxutils.ComposeUniversalOrderID(msg.BillID, model.VendorIDJD))
}, jxutils.ComposeUniversalOrderID(msg.BillID, model.VendorIDJD))
})
return retVal return retVal
} }

View File

@@ -42,11 +42,9 @@ func (c *PurchaseHandler) isAfsMsg(msg *mtwmapi.CallbackMsg) bool {
} }
func (c *PurchaseHandler) OnAfsOrderMsg(msg *mtwmapi.CallbackMsg) (retVal *mtwmapi.CallbackResponse) { func (c *PurchaseHandler) OnAfsOrderMsg(msg *mtwmapi.CallbackMsg) (retVal *mtwmapi.CallbackResponse) {
utils.CallFuncAsync(func() { jxutils.CallMsgHandlerAsync(func() {
jxutils.CallMsgHandler(func() { retVal = c.onAfsOrderMsg(msg)
retVal = c.onAfsOrderMsg(msg) }, jxutils.ComposeUniversalOrderID(GetOrderIDFromMsg(msg), model.VendorIDEBAI))
}, jxutils.ComposeUniversalOrderID(GetOrderIDFromMsg(msg), model.VendorIDEBAI))
})
return retVal return retVal
} }