From ae214d38b9dd60e5dcb98227418a477da2eeae6d Mon Sep 17 00:00:00 2001 From: gazebo Date: Sun, 22 Jul 2018 23:11:20 +0800 Subject: [PATCH] - timer task shceuled to order goroutine to avoid concurrent problem. --- business/controller/controller.go | 25 ++++++++----------------- business/controller/dada/waybill.go | 2 +- business/controller/elm/elm.go | 3 +-- business/controller/elm/order.go | 6 +++--- business/controller/elm/waybill.go | 2 +- business/controller/jd/order.go | 2 +- business/controller/jd/waybill.go | 2 +- business/controller/mtps/waybill.go | 4 ++-- business/jxutils/jxutils.go | 18 ++++++++++++++++++ business/scheduler/defsch/defsch.go | 23 +++++++++++++++++++---- 10 files changed, 55 insertions(+), 32 deletions(-) diff --git a/business/controller/controller.go b/business/controller/controller.go index 3ac63aac5..eda51f2e8 100644 --- a/business/controller/controller.go +++ b/business/controller/controller.go @@ -5,7 +5,7 @@ import ( "time" "git.rosy.net.cn/baseapi/utils" - "git.rosy.net.cn/baseapi/utils/routinepool" + "git.rosy.net.cn/jx-callback/business/jxutils" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/scheduler" _ "git.rosy.net.cn/jx-callback/business/scheduler/defsch" // 导入缺省订单调度器 @@ -20,11 +20,9 @@ const ( var ( OrderManager *OrderController WaybillManager *WaybillController - routinePool *routinepool.Pool ) func init() { - routinePool = routinepool.New(1000, 1000) OrderManager = NewOrderManager() WaybillManager = NewWaybillManager() scheduler.CurrentScheduler.RegisterOrderManager(OrderManager) @@ -51,13 +49,6 @@ func addOrderOrWaybillStatus(status *model.OrderStatus, db orm.Ormer) (isDuplica return isDuplicated, err } -func CallMsgHandler(handler func(), primaryID string) { - // handler() - routinePool.CallFun(func() { - handler() - }, primaryID) -} - func GetDataCityCodeFromOrder(order *model.GoodsOrder) (retVal string, err error) { var sql string if order.VendorID == model.VendorIDJD { @@ -110,47 +101,47 @@ func LoadPendingOrders() { orderNew := *order orderNew.Status = model.OrderStatusNew orderNew.StatusTime = order.OrderCreatedAt - routinePool.CallFunAsync(func() { + jxutils.CallMsgHandlerAsync(func() { scheduler.CurrentScheduler.OnOrderNew(&orderNew) }, order.VendorOrderID) for _, bill := range billsMap[order.VendorOrderID] { if order.Status > model.OrderStatusNew && !isNoNewSent && order.StatusTime.Sub(bill.WaybillCreatedAt) < 0 { isNoNewSent = true order2 := *order - routinePool.CallFunAsync(func() { + jxutils.CallMsgHandlerAsync(func() { scheduler.CurrentScheduler.OnOrderStatusChanged(model.Order2Status(&order2)) }, order.VendorOrderID) } billNew := *bill billNew.Status = model.OrderStatusNew billNew.StatusTime = order.OrderCreatedAt - routinePool.CallFunAsync(func() { + jxutils.CallMsgHandlerAsync(func() { scheduler.CurrentScheduler.OnWaybillStatusChanged(&billNew) }, bill.VendorOrderID) if order.Status > model.OrderStatusNew && !isNoNewSent && order.StatusTime.Sub(bill.StatusTime) < 0 { isNoNewSent = true order2 := *order - routinePool.CallFunAsync(func() { + jxutils.CallMsgHandlerAsync(func() { scheduler.CurrentScheduler.OnOrderStatusChanged(model.Order2Status(&order2)) }, order.VendorOrderID) } if bill.Status > model.WaybillStatusNew { bill2 := *bill - routinePool.CallFunAsync(func() { + jxutils.CallMsgHandlerAsync(func() { scheduler.CurrentScheduler.OnWaybillStatusChanged(&bill2) }, bill.VendorOrderID) } if order.Status > model.OrderStatusNew && !isNoNewSent { isNoNewSent = true order2 := *order - routinePool.CallFunAsync(func() { + jxutils.CallMsgHandlerAsync(func() { scheduler.CurrentScheduler.OnOrderStatusChanged(model.Order2Status(&order2)) }, order.VendorOrderID) } } if order.Status > model.OrderStatusNew && !isNoNewSent { order2 := *order - routinePool.CallFunAsync(func() { + jxutils.CallMsgHandlerAsync(func() { scheduler.CurrentScheduler.OnOrderStatusChanged(model.Order2Status(&order2)) }, order.VendorOrderID) } diff --git a/business/controller/dada/waybill.go b/business/controller/dada/waybill.go index 7d51389c6..352ce47d7 100644 --- a/business/controller/dada/waybill.go +++ b/business/controller/dada/waybill.go @@ -18,7 +18,7 @@ func init() { } func (c *WaybillController) OnWaybillMsg(msg *dadaapi.CallbackMsg) (retVal *dadaapi.CallbackResponse) { - controller.CallMsgHandler(func() { + jxutils.CallMsgHandler(func() { retVal = c.onWaybillMsg(msg) }, msg.OrderID) return retVal diff --git a/business/controller/elm/elm.go b/business/controller/elm/elm.go index 2005973da..79b1f20ab 100644 --- a/business/controller/elm/elm.go +++ b/business/controller/elm/elm.go @@ -3,7 +3,6 @@ package elm import ( "git.rosy.net.cn/baseapi/platformapi/elmapi" "git.rosy.net.cn/baseapi/utils" - "git.rosy.net.cn/jx-callback/business/controller" "git.rosy.net.cn/jx-callback/business/jxutils" "git.rosy.net.cn/jx-callback/business/model" ) @@ -46,7 +45,7 @@ func (c *Controller) OnCallbackMsg(msg *elmapi.CallbackMsg) (retVal *elmapi.Call retVal = elmapi.Err2CallbackResponse(err, "") } else { innerMsg.MsgType = msg.Type - controller.CallMsgHandler(func() { + jxutils.CallMsgHandler(func() { retVal = new(OrderController).onOrderUserUrgeOrder(&innerMsg) }, jxutils.ComposeUniversalOrderID(innerMsg.OrderID, model.VendorIDELM)) } diff --git a/business/controller/elm/order.go b/business/controller/elm/order.go index 52ab91a1b..3a3866ce0 100644 --- a/business/controller/elm/order.go +++ b/business/controller/elm/order.go @@ -43,21 +43,21 @@ func init() { } func (c *OrderController) OnOrderStatusMsg(msg *elmapi.CallbackOrderStatusMsg) (retVal *elmapi.CallbackResponse) { - controller.CallMsgHandler(func() { + jxutils.CallMsgHandler(func() { retVal = c.onOrderStatusMsg(msg) }, jxutils.ComposeUniversalOrderID(msg.OrderID, model.VendorIDELM)) return retVal } func (c *OrderController) OnOrderNewMsg(msg map[string]interface{}) (retVal *elmapi.CallbackResponse) { - controller.CallMsgHandler(func() { + jxutils.CallMsgHandler(func() { retVal = c.onOrderNew(msg) }, jxutils.ComposeUniversalOrderID(msg["orderId"].(string), model.VendorIDELM)) return retVal } func (c *OrderController) OnOrderCancelRefundMsg(msg *elmapi.CallbackOrderCancelRefundMsg) (retVal *elmapi.CallbackResponse) { - controller.CallMsgHandler(func() { + jxutils.CallMsgHandler(func() { retVal = c.onOrderCancelRefundMsg(msg) }, jxutils.ComposeUniversalOrderID(msg.OrderID, model.VendorIDELM)) return retVal diff --git a/business/controller/elm/waybill.go b/business/controller/elm/waybill.go index 2652bae14..ae2a98fd9 100644 --- a/business/controller/elm/waybill.go +++ b/business/controller/elm/waybill.go @@ -15,7 +15,7 @@ type WaybillController struct { } func (c *WaybillController) OnWaybillStatusMsg(msg *elmapi.CallbackWaybillStatusMsg) (retVal *elmapi.CallbackResponse) { - controller.CallMsgHandler(func() { + jxutils.CallMsgHandler(func() { retVal = c.onWaybillStatusMsg(msg) }, jxutils.ComposeUniversalOrderID(msg.OrderID, model.VendorIDELM)) return retVal diff --git a/business/controller/jd/order.go b/business/controller/jd/order.go index 5e1d5ddf0..26591677c 100644 --- a/business/controller/jd/order.go +++ b/business/controller/jd/order.go @@ -36,7 +36,7 @@ func init() { } func (c *OrderController) OnOrderMsg(msg *jdapi.CallbackOrderMsg) (retVal *jdapi.CallbackResponse) { - controller.CallMsgHandler(func() { + jxutils.CallMsgHandler(func() { retVal = c.onOrderMsg(msg) }, jxutils.ComposeUniversalOrderID(msg.BillID, model.VendorIDJD)) return retVal diff --git a/business/controller/jd/waybill.go b/business/controller/jd/waybill.go index 7a9ca6ce7..abf0774a9 100644 --- a/business/controller/jd/waybill.go +++ b/business/controller/jd/waybill.go @@ -13,7 +13,7 @@ type WaybillController struct { } func (c *WaybillController) OnWaybillMsg(msg *jdapi.CallbackDeliveryStatusMsg) (retVal *jdapi.CallbackResponse) { - controller.CallMsgHandler(func() { + jxutils.CallMsgHandler(func() { retVal = c.onWaybillMsg(msg) }, jxutils.ComposeUniversalOrderID(msg.OrderID, model.VendorIDJD)) return retVal diff --git a/business/controller/mtps/waybill.go b/business/controller/mtps/waybill.go index 0eda6d91f..18b805ddb 100644 --- a/business/controller/mtps/waybill.go +++ b/business/controller/mtps/waybill.go @@ -23,14 +23,14 @@ func init() { } func (c *WaybillController) OnWaybillMsg(msg *mtpsapi.CallbackOrderMsg) (retVal *mtpsapi.CallbackResponse) { - controller.CallMsgHandler(func() { + jxutils.CallMsgHandler(func() { retVal = c.onWaybillMsg(msg) }, msg.OrderID) return retVal } func (c *WaybillController) OnWaybillExcept(msg *mtpsapi.CallbackOrderExceptionMsg) (retVal *mtpsapi.CallbackResponse) { - controller.CallMsgHandler(func() { + jxutils.CallMsgHandler(func() { order := &model.Waybill{ VendorWaybillID: msg.MtPeisongID, VendorWaybillID2: utils.Int64ToStr(msg.DeliveryID), diff --git a/business/jxutils/jxutils.go b/business/jxutils/jxutils.go index 6be0904fc..2cc6575c4 100644 --- a/business/jxutils/jxutils.go +++ b/business/jxutils/jxutils.go @@ -10,6 +10,7 @@ import ( "git.rosy.net.cn/baseapi/platformapi/autonavi" "git.rosy.net.cn/baseapi/utils" + "git.rosy.net.cn/baseapi/utils/routinepool" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/globals/api" ) @@ -18,12 +19,17 @@ const ( DefaultOrderCacheTimeout = 24 * time.Hour ) +var ( + routinePool *routinepool.Pool +) + type SyncMapWithTimeout struct { sync.Map } func init() { rand.Seed(time.Now().Unix()) + routinePool = routinepool.New(1000, 1000) } func (m *SyncMapWithTimeout) StoreWithTimeout(key, value interface{}, timeout time.Duration) { @@ -142,3 +148,15 @@ func StandardPrice2Int(value float64) int64 { func IntPrice2StandardString(value int64) string { return fmt.Sprintf("%.2f", IntPrice2Standard(value)) } + +func CallMsgHandler(handler func(), primaryID string) { + routinePool.CallFun(func() { + handler() + }, primaryID) +} + +func CallMsgHandlerAsync(handler func(), primaryID string) { + routinePool.CallFunAsync(func() { + handler() + }, primaryID) +} diff --git a/business/scheduler/defsch/defsch.go b/business/scheduler/defsch/defsch.go index e7fa704d5..c82106191 100644 --- a/business/scheduler/defsch/defsch.go +++ b/business/scheduler/defsch/defsch.go @@ -48,7 +48,18 @@ func init() { Timeout: 1 * time.Second, TimeoutAction: func(order *model.GoodsOrder) (err error) { _ = sch.handleAutoAcceptOrder(order.VendorOrderID, order.VendorID, order.ConsigneeMobile, jxutils.GetJxStoreIDFromOrder(order), nil, func(isAcceptIt bool) error { - return sch.AcceptOrRefuseOrder(order, isAcceptIt) + if err = sch.AcceptOrRefuseOrder(order, isAcceptIt); err != nil { + // 为了解决京东新消息与接单消息乱序的问题 + if errWithCode, ok := err.(*utils.ErrorWithCode); ok && errWithCode.Level() == 1 && errWithCode.IntCode() == -1 { + if order2, err2 := sch.CurOrderManager.LoadOrder(order.VendorOrderID, order.VendorID); err2 == nil && order2.Status > order.Status { + sch.OnOrderStatusChanged(model.Order2Status(order2)) + err = nil + } else { + err = err2 + } + } + } + return err }) return nil }, @@ -75,7 +86,7 @@ func (s *DefScheduler) OnOrderNew(order *model.GoodsOrder) (err error) { order: order, } s.orderMap.Store(jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID), watchInfo) - s.resetTimer(watchInfo, model.OrderStatusNew, order.OrderCreatedAt, 0) + s.resetTimer(watchInfo, watchInfo.order.Status, order.OrderCreatedAt, 0) return err } @@ -301,9 +312,13 @@ func (s *DefScheduler) resetTimer(savedOrderInfo *WatchOrderInfo, status int, be timeout := jxutils.GetRealTimeout(beginTime, config.Timeout, minTimeout) + gap globals.SugarLogger.Debugf("resetTimer timeout:%v, orderID:%s", timeout, savedOrderInfo.order.VendorOrderID) savedOrderInfo.timerStatus = status + order := savedOrderInfo.order savedOrderInfo.timer = time.AfterFunc(timeout, func() { - config.TimeoutAction(savedOrderInfo.order) - savedOrderInfo.timerStatus = 0 // todo 可能有线程安全问题,考虑加入订单队列 + // order 事件序列化 + jxutils.CallMsgHandlerAsync(func() { + config.TimeoutAction(order) + savedOrderInfo.timerStatus = 0 + }, order.VendorOrderID) }) } } else {