From c0770e9ab581df0b58672daecfb44961d1c6544f Mon Sep 17 00:00:00 2001 From: gazebo Date: Wed, 25 Jul 2018 20:43:41 +0800 Subject: [PATCH] - big refactor for scheduler. --- business/controller/controller.go | 12 +- business/controller/dada/waybill.go | 3 +- business/controller/elm/order.go | 14 +- business/controller/elm/waybill.go | 2 +- business/controller/jd/order.go | 24 +- business/controller/jd/order_test.go | 35 +++ business/controller/mtps/waybill.go | 2 +- business/controller/order.go | 18 +- business/controller/order_legacy.go | 146 ++++----- business/controller/waybill.go | 2 +- business/jxutils/jxutils.go | 36 ++- business/jxutils/jxutils_test.go | 17 ++ business/model/const.go | 27 ++ business/model/order.go | 5 +- business/scheduler/defsch/defsch.go | 422 +++++++++++++++------------ business/scheduler/scheduler.go | 71 +++-- 16 files changed, 515 insertions(+), 321 deletions(-) create mode 100644 business/controller/jd/order_test.go diff --git a/business/controller/controller.go b/business/controller/controller.go index 5f2a08882..4e8350c8a 100644 --- a/business/controller/controller.go +++ b/business/controller/controller.go @@ -49,6 +49,12 @@ func init() { } func addOrderOrWaybillStatus(status *model.OrderStatus, db orm.Ormer) (isDuplicated bool, err error) { + if status.OrderType == model.OrderTypeOrder { + globals.SugarLogger.Debugf("addOrderStatus order:%v", status) + } else { + globals.SugarLogger.Debugf("addOrderStatus waybill:%v", status) + } + status.ID = 0 created, _, err := db.ReadOrCreate(status, "VendorOrderID", "VendorID", "OrderType", "VendorStatus", "StatusTime") if err == nil { @@ -99,17 +105,17 @@ func LoadPendingOrders() { if order, ok := item.(*model.GoodsOrder); ok { if order.Status == model.OrderStatusNew { jxutils.CallMsgHandlerAsync(func() { - scheduler.CurrentScheduler.OnOrderNew(order) + scheduler.CurrentScheduler.OnOrderNew(order, true) }, order.VendorOrderID) } else { jxutils.CallMsgHandlerAsync(func() { - scheduler.CurrentScheduler.OnOrderStatusChanged(model.Order2Status(order)) + scheduler.CurrentScheduler.OnOrderStatusChanged(model.Order2Status(order), true) }, order.VendorOrderID) } } else { bill := item.(*model.Waybill) jxutils.CallMsgHandlerAsync(func() { - scheduler.CurrentScheduler.OnWaybillStatusChanged(bill) + scheduler.CurrentScheduler.OnWaybillStatusChanged(bill, true) }, bill.VendorOrderID) } } diff --git a/business/controller/dada/waybill.go b/business/controller/dada/waybill.go index 5305496e3..286a7c2ae 100644 --- a/business/controller/dada/waybill.go +++ b/business/controller/dada/waybill.go @@ -9,6 +9,7 @@ import ( "git.rosy.net.cn/jx-callback/business/controller" "git.rosy.net.cn/jx-callback/business/jxutils" "git.rosy.net.cn/jx-callback/business/model" + "git.rosy.net.cn/jx-callback/business/scheduler" "git.rosy.net.cn/jx-callback/globals" "git.rosy.net.cn/jx-callback/globals/api" "github.com/astaxie/beego/orm" @@ -22,7 +23,7 @@ type WaybillController struct { } func init() { - //scheduler.CurrentScheduler.RegisterDeliveryPlatform(model.VendorIDDada, new(WaybillController)) + scheduler.CurrentScheduler.RegisterDeliveryPlatform(model.VendorIDDada, new(WaybillController), false) } func (c *WaybillController) OnWaybillMsg(msg *dadaapi.CallbackMsg) (retVal *dadaapi.CallbackResponse) { diff --git a/business/controller/elm/order.go b/business/controller/elm/order.go index 3a3866ce0..e0976249e 100644 --- a/business/controller/elm/order.go +++ b/business/controller/elm/order.go @@ -158,6 +158,11 @@ func (c *OrderController) GetOrder(orderID string) (order *model.GoodsOrder, err Skus: []*model.OrderSku{}, } order.Status = c.GetStatusFromVendorStatus(order.VendorStatus) + if result["book"].(bool) { + order.BusinessType = model.BusinessTypeDingshida + } else { + order.BusinessType = model.BusinessTypeImmediate + } deliveryGeo := strings.Split(utils.Interface2String(result["deliveryGeo"]), ",") if len(deliveryGeo) == 2 { order.CoordinateType = model.CoordinateTypeMars @@ -273,8 +278,11 @@ func (c *OrderController) SelfDeliverDelievered(order *model.GoodsOrder) (err er return api.ElmAPI.CompleteDeliveryBySelf(order.VendorOrderID, order.ConsigneeMobile) } -func (c *OrderController) GetStatusActionConfig(status int) *scheduler.StatusActionConfig { - return &scheduler.StatusActionConfig{ - Timeout: acceptOrderDelay, // 饿了么没有拣货状态,接单后就为拣货完成,所以要延迟接单,否则门店来不及备货 +func (c *OrderController) GetStatusActionConfig(statusType, status int) *scheduler.StatusActionConfig { + if statusType == scheduler.TimerStatusTypeOrder && status == model.OrderStatusNew { + return &scheduler.StatusActionConfig{ + Timeout: acceptOrderDelay, // 饿了么没有拣货状态,接单后就为拣货完成,所以要延迟接单,否则门店来不及备货 + } } + return nil } diff --git a/business/controller/elm/waybill.go b/business/controller/elm/waybill.go index ae2a98fd9..351c68030 100644 --- a/business/controller/elm/waybill.go +++ b/business/controller/elm/waybill.go @@ -23,7 +23,7 @@ func (c *WaybillController) OnWaybillStatusMsg(msg *elmapi.CallbackWaybillStatus func (c *WaybillController) onWaybillStatusMsg(msg *elmapi.CallbackWaybillStatusMsg) (retVal *elmapi.CallbackResponse) { order := c.callbackMsg2Waybill(msg) - if msg.MsgType == elmapi.MsgTypeWaybillWait4DeliveryVendor { + if msg.MsgType == elmapi.MsgTypeWaybillWait4Courier { //MsgTypeWaybillWait4Courier事件与JD的新运单事件的时间机制更相似 order.Status = model.WaybillStatusNew } else if msg.MsgType == elmapi.MsgTypeWaybillPickingUp { if result, err := api.ElmAPI.GetOrder(msg.OrderID); err == nil { diff --git a/business/controller/jd/order.go b/business/controller/jd/order.go index 5aeda6882..b0ecfe0c4 100644 --- a/business/controller/jd/order.go +++ b/business/controller/jd/order.go @@ -90,6 +90,12 @@ func (c *OrderController) GetOrder(orderID string) (order *model.GoodsOrder, err Skus: []*model.OrderSku{}, } order.Status = c.GetStatusFromVendorStatus(order.VendorStatus) + businessTage := utils.Interface2String(result["businessTag"]) + if strings.Index(businessTage, "dj_aging_immediately") >= 0 { + order.BusinessType = model.BusinessTypeImmediate + } else { + order.BusinessType = model.BusinessTypeDingshida + } coordinateType := utils.Interface2Int64WithDefault(result["buyerCoordType"], 1) originalLng := utils.MustInterface2Float64(result["buyerLng"]) originalLat := utils.MustInterface2Float64(result["buyerLat"]) @@ -183,32 +189,20 @@ func (c *OrderController) AcceptOrRefuseOrder(order *model.GoodsOrder, isAcceptI func (c *OrderController) PickedUpGoods(order *model.GoodsOrder) (err error) { _, err = api.JdAPI.OrderJDZBDelivery(order.VendorOrderID) - return c.translateOrderRelatedErr(err) + return err } func (c *OrderController) Swtich2SelfDeliver(order *model.GoodsOrder) (err error) { _, err = api.JdAPI.ModifySellerDelivery(order.VendorOrderID) - return c.translateOrderRelatedErr(err) + return err } func (c *OrderController) SelfDeliverDelievering(order *model.GoodsOrder) (err error) { _, err = api.JdAPI.OrderSerllerDelivery(order.VendorOrderID) - return c.translateOrderRelatedErr(err) + return err } func (c *OrderController) SelfDeliverDelievered(order *model.GoodsOrder) (err error) { _, err = api.JdAPI.DeliveryEndOrder(order.VendorOrderID) return err } - -func (c *OrderController) translateOrderRelatedErr(err error) (retVal error) { - if err != nil { - if errWithCode, ok := err.(*utils.ErrorWithCode); ok { - if errWithCode.Level() == 1 && errWithCode.Code() == jdapi.ResponseInnerCodeOrderAlreadyPickedUp { - return nil - } - } - return scheduler.ErrStatusIsNotOKForOperation - } - return nil -} diff --git a/business/controller/jd/order_test.go b/business/controller/jd/order_test.go new file mode 100644 index 000000000..0cbda7a2c --- /dev/null +++ b/business/controller/jd/order_test.go @@ -0,0 +1,35 @@ +package jd + +import ( + "testing" + + "git.rosy.net.cn/jx-callback/business/controller" + "git.rosy.net.cn/jx-callback/business/model" + "git.rosy.net.cn/jx-callback/globals" + "git.rosy.net.cn/jx-callback/globals/api" + "git.rosy.net.cn/jx-callback/globals/db" + "github.com/astaxie/beego" +) + +func init() { + beego.InitBeegoBeforeTest("/Users/xujianhua/go/src/git.rosy.net.cn/jx-callback/conf/app.conf") + beego.BConfig.RunMode = "dev" // InitBeegoBeforeTest会将runmode设置为test + + globals.Init() + db.Init() + api.Init() +} + +func TestSwitch2SelfDeliver(t *testing.T) { + orderID := "817540316000041" + if order, err := controller.OrderManager.LoadOrder(orderID, model.VendorIDJD); err == nil { + // globals.SugarLogger.Debug(order) + c := new(OrderController) + if err = c.Swtich2SelfDeliver(order); err == nil { + } else { + t.Fatal(err.Error()) + } + } else { + t.Fatal(err.Error()) + } +} diff --git a/business/controller/mtps/waybill.go b/business/controller/mtps/waybill.go index a894c3cbe..66a1c0671 100644 --- a/business/controller/mtps/waybill.go +++ b/business/controller/mtps/waybill.go @@ -24,7 +24,7 @@ type WaybillController struct { } func init() { - scheduler.CurrentScheduler.RegisterDeliveryPlatform(model.VendorIDMTPS, new(WaybillController)) + scheduler.CurrentScheduler.RegisterDeliveryPlatform(model.VendorIDMTPS, new(WaybillController), true) } func (c *WaybillController) OnWaybillMsg(msg *mtpsapi.CallbackOrderMsg) (retVal *mtpsapi.CallbackResponse) { diff --git a/business/controller/order.go b/business/controller/order.go index 07560a8e5..049de0973 100644 --- a/business/controller/order.go +++ b/business/controller/order.go @@ -48,13 +48,16 @@ func (c *OrderController) LoadPendingOrders() []*model.GoodsOrder { // OnOrderAdjust也类似,而OrderStatus要记录的是消息,所以添加这个 func (c *OrderController) OnOrderNew(order *model.GoodsOrder, msgVendorStatus string) (err error) { db := orm.NewOrm() + if order.Status == model.OrderStatusUnknown { + order.Status = model.OrderStatusNew + } status := model.Order2Status(order) status.Status = model.OrderStatusNew status.VendorStatus = msgVendorStatus isDuplicated, err := addOrderOrWaybillStatus(status, db) if err == nil && !isDuplicated { if err = c.saveOrder(order, false, db); err == nil { - err = scheduler.CurrentScheduler.OnOrderNew(order) + err = scheduler.CurrentScheduler.OnOrderNew(order, false) weixinmsg.NotifyNewOrder(order) } } @@ -64,6 +67,9 @@ func (c *OrderController) OnOrderNew(order *model.GoodsOrder, msgVendorStatus st // todo 调整单的处理可能还需要再细化一点,当前只是简单的删除重建 func (c *OrderController) OnOrderAdjust(order *model.GoodsOrder, msgVendorStatus string) (err error) { db := orm.NewOrm() + if order.Status == model.OrderStatusUnknown { + order.Status = model.OrderStatusNew + } status := model.Order2Status(order) status.Status = model.OrderStatusAdjust status.VendorStatus = msgVendorStatus @@ -85,7 +91,9 @@ func (c *OrderController) OnOrderAdjust(order *model.GoodsOrder, msgVendorStatus } if err = c.saveOrder(order, true, db); err == nil { // 因为订单调度器需要的是真实状态,所以用order的状态 - err = scheduler.CurrentScheduler.OnOrderStatusChanged(model.Order2Status(order)) + err = scheduler.CurrentScheduler.OnOrderNew(order, false) + err = scheduler.CurrentScheduler.OnOrderStatusChanged(model.Order2Status(order), false) + weixinmsg.NotifyNewOrder(order) } } return err @@ -94,7 +102,7 @@ func (c *OrderController) OnOrderAdjust(order *model.GoodsOrder, msgVendorStatus func (c *OrderController) OnOrderStatusChanged(orderStatus *model.OrderStatus) (err error) { isDuplicated, err := c.addOrderStatus(orderStatus, nil) if err == nil && !isDuplicated { - err = scheduler.CurrentScheduler.OnOrderStatusChanged(orderStatus) + err = scheduler.CurrentScheduler.OnOrderStatusChanged(orderStatus, false) if globals.GenerateLegacyJxOrder { c.legacyJxOrderStatusChanged(orderStatus, nil) } @@ -258,14 +266,14 @@ func (c *OrderController) LoadOrder(vendorOrderID string, vendorID int) (order * } //Waybill -func (c *OrderController) UpdateWaybillVendorID(bill *model.Waybill) (err error) { +func (c *OrderController) UpdateWaybillVendorID(bill *model.Waybill, revertStatus bool) (err error) { globals.SugarLogger.Debugf("UpdateWaybillVendorID bill:%v", bill) db := orm.NewOrm() params := orm.Params{ "waybill_vendor_id": bill.WaybillVendorID, } // 如果运单被取消,则要保持在已拣货状态 - if bill.WaybillVendorID == model.VendorIDUnknown { + if revertStatus && bill.WaybillVendorID == model.VendorIDUnknown { params["status"] = model.OrderStatusFinishedPickup } utils.CallFuncLogError(func() error { diff --git a/business/controller/order_legacy.go b/business/controller/order_legacy.go index e81feb5a7..9ce477877 100644 --- a/business/controller/order_legacy.go +++ b/business/controller/order_legacy.go @@ -34,6 +34,8 @@ const ( JX_DELIVERY_STATUS_DELIVERY_FAIL = 5 //投递失败 JX_DELIVERY_STATUS_DELIVERY_DONE = 6 //已完成 JX_DELIVERY_STATUS_DELIVERY_CANCEL = 7 //已取消 + + JX_STATUS_UNKNOWN = -100 ) const ( @@ -84,7 +86,7 @@ func legacyMapOrderStatus(orderStatus int) (retVal int8) { retVal = JX_ORDER_STATUS_PICKING case model.OrderStatusDelivering: retVal = JX_ORDER_STATUS_DELIVERING - case model.OrderStatusDelivered: + case model.OrderStatusDelivered, model.OrderStatusFinished: retVal = JX_ORDER_STATUS_DELIVERY_DONE case model.OrderStatusAdjust: retVal = JX_ORDER_STATUS_ADJUST @@ -92,6 +94,8 @@ func legacyMapOrderStatus(orderStatus int) (retVal int8) { retVal = JX_ORDER_STATUS_EXCEPTION_APPLY case model.OrderStatusCanceled: retVal = JX_ORDER_STATUS_CANCEL + default: + retVal = JX_STATUS_UNKNOWN } return retVal } @@ -112,6 +116,8 @@ func legacyMapWaybillStatus(status int) (retVal int8) { retVal = JX_DELIVERY_STATUS_DELIVERY_CANCEL case model.WaybillStatusFailed: retVal = JX_DELIVERY_STATUS_DELIVERY_FAIL + default: + retVal = JX_STATUS_UNKNOWN } return retVal } @@ -132,8 +138,8 @@ func (c *OrderController) legacyWriteJxOrder(order *model.GoodsOrder, db orm.Orm db.Begin() if isDelFirst { - db.Raw("DELETE FROM "+globals.JxorderTableName+" WHERE order_id = ?", utils.Str2Int64(order.VendorOrderID)) - db.Raw("DELETE FROM "+globals.JxorderskuTableName+" WHERE order_id = ?", utils.Str2Int64(order.VendorOrderID)) + db.Raw("DELETE FROM "+globals.JxorderTableName+" WHERE order_id = ?", utils.Str2Int64(order.VendorOrderID)).Exec() + db.Raw("DELETE FROM "+globals.JxorderskuTableName+" WHERE order_id = ?", utils.Str2Int64(order.VendorOrderID)).Exec() } jxorder := &legacymodel.Jxorder2{ @@ -210,81 +216,87 @@ func (c *OrderController) legacyWriteJxOrder(order *model.GoodsOrder, db orm.Orm } func (c *OrderController) legacyJxOrderStatusChanged(status *model.OrderStatus, db orm.Ormer) (err error) { - if db == nil { - db = orm.NewOrm() - } - jxorder := &legacymodel.Jxorder2{ - OrderId: utils.Str2Int64(status.VendorOrderID), - } - if err = db.Read(jxorder, "OrderId"); err == nil { - utils.CallFuncLogError(func() error { - jxorder.OrderStatus = legacyMapOrderStatus(status.Status) - jxorder.OrderStatusTime = utils.Time2Str(status.StatusTime) - updateFields := []string{ - "OrderStatus", - "OrderStatusTime", - } - if status.Status >= model.OrderStatusEndBegin { - jxorder.DeliveryFinishTime = utils.Time2Str(status.StatusTime) - updateFields = append(updateFields, "DeliveryFinishTime") - } - _, err = db.Update(jxorder, updateFields...) - - db.Raw(` + orderStatus := legacyMapOrderStatus(status.Status) + if orderStatus != JX_STATUS_UNKNOWN { + if db == nil { + db = orm.NewOrm() + } + jxorder := &legacymodel.Jxorder2{ + OrderId: utils.Str2Int64(status.VendorOrderID), + } + if err = db.Read(jxorder, "OrderId"); err == nil { + utils.CallFuncLogError(func() error { + jxorder.OrderStatus = orderStatus + jxorder.OrderStatusTime = utils.Time2Str(status.StatusTime) + updateFields := []string{ + "OrderStatus", + "OrderStatusTime", + } + if status.Status >= model.OrderStatusEndBegin { + jxorder.DeliveryFinishTime = utils.Time2Str(status.StatusTime) + updateFields = append(updateFields, "DeliveryFinishTime") + } + _, err = db.Update(jxorder, updateFields...) + if orderStatus == JX_ORDER_STATUS_DELIVERY_DONE { + db.Raw(` UPDATE `+globals.JxorderTableName+` t1 JOIN waybill t2 ON t2.vendor_order_id = t1.order_id AND t2.status = 105 SET t1.delivery_price = IF(t2.waybill_vendor_id = 102, t2.desired_fee/100, t1.delivery_price), t1.delivery_price1 = IF(t2.waybill_vendor_id = 101, t2.desired_fee/100, t1.delivery_price1) WHERE t1.order_id = ? `, jxorder.OrderId).Exec() - - return err - }, "legacyJxOrderStatusChanged") - } else { - globals.SugarLogger.Infof("read legacyJxOrder orderID:%d error:%v, ", jxorder.OrderId, err) + } + return err + }, "legacyJxOrderStatusChanged") + } else { + globals.SugarLogger.Infof("read legacyJxOrder orderID:%d error:%v, ", jxorder.OrderId, err) + } } return err } func (c *WaybillController) legacyWaybillStatusChanged(bill *model.Waybill, db orm.Ormer) (err error) { - if db == nil { - db = orm.NewOrm() - } - jxorder := &legacymodel.Jxorder2{ - OrderId: utils.Str2Int64(bill.VendorOrderID), - } - if err = db.Read(jxorder, "OrderId"); err == nil { - utils.CallFuncLogError(func() error { - updateFields := []string{ - "DeliveryCarrierNo", - "DeliveryCarrierName", - "DeliveryManNo", - "DeliveryManName", - "DeliveryManPhone", - "DeliveryBillNo", - "DeliveryStatus", - } - // jxorder.DeliveryPackageWeight - jxorder.DeliveryCarrierNo = VENDOR_ID2CARRIER_NO[bill.WaybillVendorID] - jxorder.DeliveryCarrierName = CARRIERS_NAMES[jxorder.DeliveryCarrierNo] - jxorder.DeliveryManNo = bill.CourierMobile - jxorder.DeliveryManName = bill.CourierName - jxorder.DeliveryManPhone = bill.CourierMobile - jxorder.DeliveryBillNo = bill.VendorWaybillID - jxorder.DeliveryStatus = legacyMapWaybillStatus(bill.Status) - // jxorder.DeliveryConfirmTime - if bill.Status == model.WaybillStatusNew { - updateFields = append(updateFields, "DeliveryStartTime") - jxorder.DeliveryStartTime = utils.Time2Str(bill.StatusTime) - } else if bill.Status >= model.WaybillStatusEndBegin { - jxorder.DeliveryFinishTime = utils.Time2Str(bill.StatusTime) - updateFields = append(updateFields, "DeliveryFinishTime") - } - _, err = db.Update(jxorder, updateFields...) - return err - }, "legacyJxOrderStatusChanged") - } else { - globals.SugarLogger.Infof("read legacyJxOrder, orderID:%d error:%v", jxorder.OrderId, err) + deliveryStatus := legacyMapWaybillStatus(bill.Status) + if deliveryStatus != JX_STATUS_UNKNOWN { + if db == nil { + db = orm.NewOrm() + } + jxorder := &legacymodel.Jxorder2{ + OrderId: utils.Str2Int64(bill.VendorOrderID), + } + if err = db.Read(jxorder, "OrderId"); err == nil { + utils.CallFuncLogError(func() error { + updateFields := []string{ + "DeliveryCarrierNo", + "DeliveryCarrierName", + "DeliveryManNo", + "DeliveryManName", + "DeliveryManPhone", + "DeliveryBillNo", + "DeliveryStatus", + } + // jxorder.DeliveryPackageWeight + jxorder.DeliveryCarrierNo = VENDOR_ID2CARRIER_NO[bill.WaybillVendorID] + jxorder.DeliveryCarrierName = CARRIERS_NAMES[jxorder.DeliveryCarrierNo] + jxorder.DeliveryManNo = bill.CourierMobile + jxorder.DeliveryManName = bill.CourierName + jxorder.DeliveryManPhone = bill.CourierMobile + jxorder.DeliveryBillNo = bill.VendorWaybillID + jxorder.DeliveryStatus = deliveryStatus + // jxorder.DeliveryConfirmTime + if bill.Status == model.WaybillStatusNew { + updateFields = append(updateFields, "DeliveryStartTime") + jxorder.DeliveryStartTime = utils.Time2Str(bill.StatusTime) + } else if bill.Status >= model.WaybillStatusEndBegin { + jxorder.DeliveryFinishTime = utils.Time2Str(bill.StatusTime) + updateFields = append(updateFields, "DeliveryFinishTime") + } + _, err = db.Update(jxorder, updateFields...) + return err + }, "legacyJxOrderStatusChanged") + } else { + globals.SugarLogger.Infof("read legacyJxOrder, orderID:%d error:%v", jxorder.OrderId, err) + } } return err } diff --git a/business/controller/waybill.go b/business/controller/waybill.go index b505030b5..1ddb7ece0 100644 --- a/business/controller/waybill.go +++ b/business/controller/waybill.go @@ -78,7 +78,7 @@ func (w *WaybillController) OnWaybillStatusChanged(bill *model.Waybill) (err err isDuplicated, err = w.addWaybillStatus(bill, db, addParams) } if err == nil && !isDuplicated { - scheduler.CurrentScheduler.OnWaybillStatusChanged(bill) + scheduler.CurrentScheduler.OnWaybillStatusChanged(bill, false) if bill.Status == model.WaybillStatusAccepted || bill.Status == model.WaybillStatusDelivered { if order, err2 := OrderManager.LoadOrder(bill.VendorOrderID, bill.OrderVendorID); err2 == nil { weixinmsg.NotifyWaybillStatus(bill, order) diff --git a/business/jxutils/jxutils.go b/business/jxutils/jxutils.go index 2ccb99249..96d6cd619 100644 --- a/business/jxutils/jxutils.go +++ b/business/jxutils/jxutils.go @@ -15,16 +15,13 @@ import ( "git.rosy.net.cn/jx-callback/globals/api" ) -const ( - DefaultOrderCacheTimeout = 24 * time.Hour -) - var ( routinePool *routinepool.Pool ) type SyncMapWithTimeout struct { sync.Map + timers sync.Map } func init() { @@ -34,13 +31,18 @@ func init() { func (m *SyncMapWithTimeout) StoreWithTimeout(key, value interface{}, timeout time.Duration) { m.Map.Store(key, value) - time.AfterFunc(timeout, func() { + m.timers.Store(key, time.AfterFunc(timeout, func() { m.Delete(key) - }) + })) } -func (m *SyncMapWithTimeout) Store(key, value interface{}) { - m.StoreWithTimeout(key, value, DefaultOrderCacheTimeout) +func (m *SyncMapWithTimeout) Delete(key interface{}) { + m.Map.Delete(key) + if value, ok := m.timers.Load(key); ok { + timer := value.(*time.Timer) + timer.Stop() + } + m.timers.Delete(key) } func GetJxStoreIDFromOrder(order *model.GoodsOrder) (retVal int) { @@ -90,14 +92,6 @@ func GetUniversalOrderIDFromOrderStatus(status *model.OrderStatus) string { return ComposeUniversalOrderID(status.VendorOrderID, status.VendorID) } -func GetRealTimeout(beginTime time.Time, timeout time.Duration, minTimeout time.Duration) time.Duration { - retVal := beginTime.Add(timeout).Sub(time.Now()) - if retVal < minTimeout { - retVal = minTimeout + time.Duration(rand.Int31n(5*1000))*time.Millisecond // 随机分布在5秒内这样写的原因是避免启动时加载订单,TIMER同一瞬间启动 - } - return retVal -} - func EarthDistance(lat1, lng1, lat2, lng2 float64) float64 { radius := 6378.137 rad := math.Pi / 180.0 @@ -168,3 +162,13 @@ func SplitSkuName(fullName string) (name string, unit string) { } return fullName, "份" } + +func MapValue2Scope(value, fromMin, fromMax, toMin, toMax int64) int64 { + if value < fromMin { + value = fromMin + } + if value > fromMax { + value = fromMax + } + return int64(math.Round(float64(value-fromMin)/float64(fromMax-fromMin)*float64(toMax-toMin) + float64(toMin))) +} diff --git a/business/jxutils/jxutils_test.go b/business/jxutils/jxutils_test.go index 46fe309a0..40b04ec50 100644 --- a/business/jxutils/jxutils_test.go +++ b/business/jxutils/jxutils_test.go @@ -11,3 +11,20 @@ func TestEarthDistance(t *testing.T) { distance := EarthDistance(lat1, lng1, lat2, lng2) fmt.Print(distance) } + +func TestMapValue2Scope(t *testing.T) { + result := MapValue2Scope(-4, -10, 0, 0, 100) + if result != 60 { + t.Fatalf("result:%d is wrong", result) + } + + result = MapValue2Scope(-4, 0, 10, 0, 100) + if result != 0 { + t.Fatalf("result:%d is wrong", result) + } + + result = MapValue2Scope(100, 0, 10, 0, 100) + if result != 100 { + t.Fatalf("result:%d is wrong", result) + } +} diff --git a/business/model/const.go b/business/model/const.go index 0db6cb6d5..573f9e2df 100644 --- a/business/model/const.go +++ b/business/model/const.go @@ -30,6 +30,28 @@ var ( VendorIDDada: "达达众包", VendorIDMTPS: "美团配送", } + + OrderStatusName = map[int]string{ + OrderStatusNew: "OrderStatusNew", + OrderStatusAdjust: "OrderStatusAdjust", + OrderStatusAccepted: "OrderStatusAccepted", + OrderStatusFinishedPickup: "OrderStatusFinishedPickup", + OrderStatusDelivering: "OrderStatusDelivering", + OrderStatusDelivered: "OrderStatusDelivered", + OrderStatusFinished: "OrderStatusFinished", + OrderStatusCanceled: "OrderStatusCanceled", + OrderStatusFailed: "OrderStatusFailed", + } + WaybillStatusName = map[int]string{ + WaybillStatusNew: "WaybillStatusNew", + WaybillStatusAcceptCanceled: "WaybillStatusAcceptCanceled", + WaybillStatusAccepted: "WaybillStatusAccepted", + WaybillStatusCourierArrived: "WaybillStatusCourierArrived", + WaybillStatusDelivering: "WaybillStatusDelivering", + WaybillStatusDelivered: "WaybillStatusDelivered", + WaybillStatusCanceled: "WaybillStatusCanceled", + WaybillStatusFailed: "WaybillStatusFailed", + } ) const ( @@ -93,3 +115,8 @@ const ( WaybillStatusCanceled = 115 WaybillStatusFailed = 120 ) + +const ( + BusinessTypeImmediate = 1 + BusinessTypeDingshida = 2 +) diff --git a/business/model/order.go b/business/model/order.go index d465f8a23..0d47887bd 100644 --- a/business/model/order.go +++ b/business/model/order.go @@ -32,8 +32,9 @@ type GoodsOrder struct { Status int // 参见OrderStatus*相关的常量定义 VendorStatus string `orm:"size(255)"` LockStatus int - OrderSeq int // 门店订单序号 - BuyerComment string `orm:"size(255)"` + OrderSeq int // 门店订单序号 + BuyerComment string `orm:"size(255)"` + BusinessType int ExpectedDeliveredTime time.Time `orm:"type(datetime)"` // 预期送达时间 CancelApplyReason string `orm:"size(255)"` // ""表示没有申请,不为null表示用户正在取消申请 WaybillVendorID int `orm:"column(waybill_vendor_id)"` // 表示当前承运商,-1表示还没有安排 diff --git a/business/scheduler/defsch/defsch.go b/business/scheduler/defsch/defsch.go index 4eeece867..a5fccb2e4 100644 --- a/business/scheduler/defsch/defsch.go +++ b/business/scheduler/defsch/defsch.go @@ -15,26 +15,37 @@ import ( ) const ( - time2Delivered = 1 * time.Hour // 正常订单都是1小时达 - time2Schedule3rdCarrier = 330 * time.Second // 京东要求5分钟后才能转自送,保险起见,设置为5分半钟 - time2Schedule3rdCarrierGap4OrderStatus = 3 * time.Minute // 京东要求是运单状态为待抢单且超时5分钟,但为了防止没有运单事件,所以就拣货完成事件开始算,添加3分钟 - time2AutoPickupMin = 15 * time.Minute - time2AutoPickupGap = 5 * time.Minute - minTimeout = 5 * time.Second // timer的最小时间,这样写的上的是在load pending orders,让延迟的事件有机会执行 + time2Delivered = 1 * time.Hour // 正常从下单到送达的时间。 + time2Schedule3rdCarrier = 330 * time.Second // 京东要求5分钟后才能转自送,保险起见,设置为5分半钟 + // time2Schedule3rdCarrierGap4OrderStatus = 3 * time.Minute // 京东要求是运单状态为待抢单且超时5分钟,但为了防止没有运单事件,所以就拣货完成事件开始算,添加3分钟 + time2AutoPickupMin = 15 * time.Minute + time2AutoPickupGap = 5 * 60 //随机5分钟 + + // (把pending order timerout 在-pendingOrderTimerMinMinSecond至pendingOrderTimerMaxSecond映射到pendingOrderTimerMinSecond至pendingOrderTimerMaxSecond) + pendingOrderTimerMinMinSecond = 5 * 60 // 10分钟 + pendingOrderTimerMinSecond = 2 + pendingOrderTimerMaxSecond = 5 + + maxWaybillRetryCount = 2 + + orderMapStoreMaxTime = 4 * 24 * time.Hour // cache最长存储时间 ) type WatchOrderInfo struct { - order *model.GoodsOrder // order里的信息是保持更新的 - waybills []*model.Waybill // 这个waybills里的状态信息是不真实的,只使用id相关的信息 - timerStatus int - timer *time.Timer - retryCount int + order *model.GoodsOrder // order里的信息是保持更新的 + waybills []*model.Waybill // 这个waybills里的状态信息是不真实的,只使用id相关的信息 + + timerStatusType int // 0表示订单,1表示运单 + timerStatus int + timer *time.Timer + + retryCount int // 失败后尝试的次数,调试阶段可能出现死循化,阻止这种情况发生 } // 重要:此调度器要求同一定单的处理逻辑必须是序列化了的,不然会有并发问题 type DefScheduler struct { scheduler.BaseScheduler - defWorkflowConfig map[int]*scheduler.StatusActionConfig + defWorkflowConfig []map[int]*scheduler.StatusActionConfig orderMap jxutils.SyncMapWithTimeout } @@ -43,78 +54,73 @@ func init() { sch.IsReallyCallPlatformAPI = globals.ReallyCallPlatformAPI sch.Init() scheduler.CurrentScheduler = sch - sch.defWorkflowConfig = map[int]*scheduler.StatusActionConfig{ - model.OrderStatusNew: &scheduler.StatusActionConfig{ // 自动接单 - Timeout: 1 * time.Second, - TimeoutAction: func(order *model.GoodsOrder) (err error) { - _ = sch.handleAutoAcceptOrder(order.VendorOrderID, order.VendorID, order.ConsigneeMobile, jxutils.GetJxStoreIDFromOrder(order), nil, func(isAcceptIt bool) error { - if err = sch.AcceptOrRefuseOrder(order, isAcceptIt); err != nil { - // 为了解决京东新消息与接单消息乱序的问题 - if errWithCode, ok := err.(*utils.ErrorWithCode); ok && errWithCode.Level() == 1 && errWithCode.IntCode() == -1 { - if order2, err2 := sch.GetPurchasePlatformFromVendorID(order.VendorID).GetOrder(order.VendorOrderID); err2 == nil && order2.Status > order.Status { - sch.OnOrderStatusChanged(model.Order2Status(order2)) - err = nil - } else { - err = err2 + sch.defWorkflowConfig = []map[int]*scheduler.StatusActionConfig{ + map[int]*scheduler.StatusActionConfig{ + model.OrderStatusNew: &scheduler.StatusActionConfig{ // 自动接单 + TimerType: scheduler.TimerTypeBaseNow, + Timeout: 1 * time.Second, + TimeoutAction: func(order *model.GoodsOrder) (err error) { + _ = sch.handleAutoAcceptOrder(order.VendorOrderID, order.VendorID, order.ConsigneeMobile, jxutils.GetJxStoreIDFromOrder(order), nil, func(isAcceptIt bool) error { + if err = sch.AcceptOrRefuseOrder(order, isAcceptIt); err != nil { + // 为了解决京东新消息与接单消息乱序的问题 + if errWithCode, ok := err.(*utils.ErrorWithCode); ok && errWithCode.Level() == 1 && errWithCode.IntCode() == -1 { + if order2, err2 := sch.GetPurchasePlatformFromVendorID(order.VendorID).GetOrder(order.VendorOrderID); err2 == nil && order2.Status > order.Status { + sch.OnOrderStatusChanged(model.Order2Status(order2), false) + err = nil + } else { + err = err2 + } } } - } - return err - }) - return nil + return err + }) + return nil + }, + }, + model.OrderStatusAccepted: &scheduler.StatusActionConfig{ // 自动拣货 + TimerType: scheduler.TimerTypeBaseStatusTime, + Timeout: time2AutoPickupMin, + TimeoutGap: time2AutoPickupGap, + TimeoutAction: func(order *model.GoodsOrder) (err error) { + return sch.PickedUpGoods(order) + }, }, }, - model.OrderStatusAccepted: &scheduler.StatusActionConfig{ // 自动拣货 - Timeout: time2AutoPickupMin, - TimeoutAction: func(order *model.GoodsOrder) (err error) { - return sch.PickedUpGoods(order) - }, - }, - model.OrderStatusFinishedPickup: &scheduler.StatusActionConfig{ // 尝试召唤更多物流 - Timeout: time2Schedule3rdCarrier, - TimeoutAction: func(order *model.GoodsOrder) (err error) { - return sch.createWaybillOn3rdProviders(order, nil) + map[int]*scheduler.StatusActionConfig{ + model.WaybillStatusNew: &scheduler.StatusActionConfig{ // 尝试召唤更多物流 + TimerType: scheduler.TimerTypeBaseStatusTime, + Timeout: time2Schedule3rdCarrier, + TimeoutAction: func(order *model.GoodsOrder) (err error) { + return sch.createWaybillOn3rdProviders(order, nil) + }, }, }, } } // 以下是订单 -func (s *DefScheduler) OnOrderNew(order *model.GoodsOrder) (err error) { - globals.SugarLogger.Debugf("OnOrderNew, orderID:%s", order.VendorOrderID) - watchInfo := &WatchOrderInfo{ - order: order, +func (s *DefScheduler) OnOrderNew(order *model.GoodsOrder, isPending bool) (err error) { + globals.SugarLogger.Debugf("OnOrderNew orderID:%s", order.VendorOrderID) + savedOrderInfo := s.loadSavedOrderFromMap(model.Order2Status(order), false) + if savedOrderInfo == nil { + savedOrderInfo = &WatchOrderInfo{ + order: order, + } + s.orderMap.StoreWithTimeout(jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID), savedOrderInfo, orderMapStoreMaxTime) + } else { + savedOrderInfo.order = order // 调整单可能进到这里来 } - s.orderMap.Store(jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID), watchInfo) - s.resetTimer(watchInfo, watchInfo.order.Status, order.OrderCreatedAt, 0) + s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeOrder, savedOrderInfo.order.Status, false) return err } -func (s *DefScheduler) OnOrderStatusChanged(status *model.OrderStatus) (err error) { +func (s *DefScheduler) OnOrderStatusChanged(status *model.OrderStatus, isPending bool) (err error) { if status.Status > model.OrderStatusUnknown { // 只处理状态转换,一般消息不处理 - globals.SugarLogger.Debugf("OnOrderStatusChanged, status:%v", status) - savedOrderInfo := s.loadSavedOrderFromMap(status) + globals.SugarLogger.Debugf("OnOrderStatusChanged orderID:%s %s, status:%v", status.VendorOrderID, model.OrderStatusName[status.Status], status) + savedOrderInfo := s.loadSavedOrderFromMap(status, true) s.updateOrderByStatus(savedOrderInfo.order, status) - if status.Status > model.OrderStatusUnknown && status.Status < model.OrderStatusEndBegin { - if !(status.Status == model.OrderStatusFinishedPickup && len(savedOrderInfo.waybills) > 0) && status.Status != model.OrderStatusFinishedPickup { //饿了么还观察到运单消息早于拣货完成消息 - gap := 0 * time.Second - beginTime := status.StatusTime - if status.Status == model.OrderStatusNew { - beginTime = time.Now() - } else if status.Status == model.OrderStatusAccepted { - gap = time.Duration(rand.Int63n(int64(time2AutoPickupGap))) - beginTime = s.getBeginTime4LatestPickup(savedOrderInfo.order) - } else if status.Status == model.OrderStatusFinishedPickup { - // 召唤三方配送 - // 正常应该是只依赖于购物平台的第一个运单消息,但饿了么有观察到极少数情况下没有此事件,所以还是需要在这里加个保险的TIMER来驱动运单调度 - gap = time2Schedule3rdCarrierGap4OrderStatus - } - s.resetTimer(savedOrderInfo, status.Status, beginTime, gap) - } else { - s.stopTimer(savedOrderInfo) - } - } else { - s.stopTimer(savedOrderInfo) + s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeOrder, savedOrderInfo.order.Status, false) + if status.Status >= model.OrderStatusEndBegin { s.cancelOtherWaybills(savedOrderInfo, nil) s.orderMap.Delete(jxutils.GetUniversalOrderIDFromOrderStatus(status)) } @@ -123,78 +129,102 @@ func (s *DefScheduler) OnOrderStatusChanged(status *model.OrderStatus) (err erro } // 以下是运单 -func (s *DefScheduler) OnWaybillStatusChanged(bill *model.Waybill) (err error) { +func (s *DefScheduler) OnWaybillStatusChanged(bill *model.Waybill, isPending bool) (err error) { if bill.Status > model.WaybillStatusUnknown { - globals.SugarLogger.Debugf("OnWaybillStatusChanged, bill:%v", bill) - savedOrderInfo := s.loadSavedOrderFromMap(model.Waybill2Status(bill)) - if bill.Status >= model.WaybillStatusNew { - if bill.Status == model.WaybillStatusNew { - s.addWaybill2Map(savedOrderInfo, bill) - if bill.OrderVendorID == bill.WaybillVendorID { - if savedOrderInfo.timerStatus == model.OrderStatusFinishedPickup { // 如果当前TIMER还是OrderStatusFinishedPickup(在OnOrderStatusChanged中设置的),则重置 - s.resetTimer(savedOrderInfo, model.OrderStatusFinishedPickup, bill.StatusTime, 0) - } else if savedOrderInfo.timerStatus != 0 { - globals.SugarLogger.Infof("OnWaybillStatusChanged met other timer, status:%d", savedOrderInfo.timerStatus) - } + globals.SugarLogger.Debugf("OnWaybillStatusChanged orderID:%s %s, bill:%v", bill.VendorOrderID, model.WaybillStatusName[bill.Status], bill) + savedOrderInfo := s.loadSavedOrderFromMap(model.Waybill2Status(bill), true) + order := savedOrderInfo.order + if order.Status < model.OrderStatusFinishedPickup || order.Status > model.OrderStatusEndBegin { // 如果当前order状态是不应该出现运单状态 + globals.SugarLogger.Infof("OnWaybillStatusChanged orderID:%s status:%s is not suitable for waybill", order.VendorOrderID, model.OrderStatusName[order.Status]) + s.CancelWaybill(bill) + s.stopTimer(savedOrderInfo) + s.orderMap.Delete(jxutils.GetUniversalOrderIDFromOrderStatus(model.Order2Status(order))) + return nil + } + if bill.Status == model.WaybillStatusNew { + s.addWaybill2Map(savedOrderInfo, bill) + if order.WaybillVendorID != model.VendorIDUnknown && order.WaybillVendorID != bill.WaybillVendorID { + globals.SugarLogger.Infof("OnWaybillStatusChanged multiple waybill created, bill:%v", bill) + if bill.WaybillVendorID != bill.WaybillVendorID { + s.CancelWaybill(bill) + } else { + globals.SugarLogger.Warnf("OnWaybillStatusChanged bill:%v purchase platform bill came later than others, strange!!!", bill) } - if savedOrderInfo.order.WaybillVendorID != model.VendorIDUnknown && savedOrderInfo.order.WaybillVendorID != bill.WaybillVendorID { - globals.SugarLogger.Infof("OnWaybillStatusChanged multiple waybill created, bill:%v", bill) - if bill.WaybillVendorID != bill.WaybillVendorID { - s.CancelWaybill(bill) - } + } + s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false) + } else { + switch bill.Status { + case model.WaybillStatusAccepted: + if order.WaybillVendorID == model.VendorIDUnknown { + s.cancelOtherWaybills(savedOrderInfo, bill) + s.updateOrderByBill(order, bill, false) + } else { + s.CancelWaybill(bill) + globals.SugarLogger.Warnf("OnWaybillStatusChanged Accepted orderID:%s got multiple bill:%v, order details:%v", order.VendorOrderID, bill, order) } - } else { - if savedOrderInfo.order.WaybillVendorID == model.VendorIDUnknown || savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID { - s.stopTimer(savedOrderInfo) // todo 这样写可能不太合适!!! - } - switch bill.Status { - case model.WaybillStatusAccepted: - if savedOrderInfo.order.WaybillVendorID == model.VendorIDUnknown { - s.cancelOtherWaybills(savedOrderInfo, bill) - s.CurOrderManager.UpdateWaybillVendorID(bill) - savedOrderInfo.order.WaybillVendorID = bill.WaybillVendorID - } else { - globals.SugarLogger.Infof("orderID:%s got multiple bill:%v, order details:%v", savedOrderInfo.order.VendorOrderID, bill, savedOrderInfo.order) - } - case model.WaybillStatusAcceptCanceled: - if savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID { - bill.WaybillVendorID = model.VendorIDUnknown - s.CurOrderManager.UpdateWaybillVendorID(bill) - savedOrderInfo.order.WaybillVendorID = model.VendorIDUnknown + s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false) + case model.WaybillStatusAcceptCanceled: + if order.WaybillVendorID == bill.WaybillVendorID { + bill.WaybillVendorID = model.VendorIDUnknown + s.updateOrderByBill(order, bill, false) - s.createWaybillOn3rdProviders(savedOrderInfo.order, bill) - } - case model.WaybillStatusCourierArrived: // do nothing - case model.WaybillStatusFailed: // todo WaybillStatusFailed理解成订单整个失败了,不需要再尝试创建运单了,注意这里应该加个zabbix日志的报警 - s.removeWaybillFromMap(savedOrderInfo, bill) - globals.SugarLogger.Infof("OnWaybillStatusChanged WaybillStatusFailed, bill:%v", bill) - if savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID { - bill.WaybillVendorID = model.VendorIDUnknown - s.CurOrderManager.UpdateWaybillVendorID(bill) - savedOrderInfo.order.WaybillVendorID = model.VendorIDUnknown - } - case model.WaybillStatusCanceled: - s.removeWaybillFromMap(savedOrderInfo, bill) - if savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID { - bill.WaybillVendorID = model.VendorIDUnknown - s.CurOrderManager.UpdateWaybillVendorID(bill) - savedOrderInfo.order.WaybillVendorID = model.VendorIDUnknown - savedOrderInfo.retryCount++ - savedOrderInfo.order.Status = model.OrderStatusFinishedPickup // 如果运单被取消,且是主运单,将订单状态强制回滚到model.OrderStatusFinishedPickup - if savedOrderInfo.retryCount < 2 { - s.createWaybillOn3rdProviders(savedOrderInfo.order, nil) - } - } - case model.WaybillStatusDelivering: - if savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID && savedOrderInfo.order.VendorID != bill.WaybillVendorID { - s.SelfDeliverDelievering(savedOrderInfo.order) - } - case model.WaybillStatusDelivered: - s.removeWaybillFromMap(savedOrderInfo, bill) - if savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID && savedOrderInfo.order.VendorID != bill.WaybillVendorID { - s.SelfDeliverDelievered(savedOrderInfo.order) - } + s.createWaybillOn3rdProviders(order, bill) + s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false) + } else if order.WaybillVendorID != model.VendorIDUnknown { + s.CancelWaybill(bill) + globals.SugarLogger.Warnf("OnWaybillStatusChanged AcceptCanceled orderID:%s got multiple bill:%v, order details:%v", order.VendorOrderID, bill, order) } + case model.WaybillStatusCourierArrived: // do nothing + if order.WaybillVendorID == bill.WaybillVendorID { + } else { + globals.SugarLogger.Warnf("OnWaybillStatusChanged CourierArrived bill:%v shouldn't got here", bill) + } + s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false) + case model.WaybillStatusFailed: // todo WaybillStatusFailed理解成订单整个失败了,不需要再尝试创建运单了,注意这里应该加个zabbix日志的报警 + s.removeWaybillFromMap(savedOrderInfo, bill) + if order.WaybillVendorID == bill.WaybillVendorID { + globals.SugarLogger.Infof("OnWaybillStatusChanged WaybillStatusFailed, bill:%v", bill) + if order.WaybillVendorID == bill.WaybillVendorID { + bill.WaybillVendorID = model.VendorIDUnknown + s.updateOrderByBill(order, bill, true) + } + s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false) + } else { + globals.SugarLogger.Warnf("OnWaybillStatusChanged Failed bill:%v shouldn't got here", bill) + } + case model.WaybillStatusCanceled: + s.removeWaybillFromMap(savedOrderInfo, bill) + if order.WaybillVendorID == bill.WaybillVendorID { + bill.WaybillVendorID = model.VendorIDUnknown + s.updateOrderByBill(order, bill, true) + + savedOrderInfo.retryCount++ + if savedOrderInfo.retryCount <= maxWaybillRetryCount { + s.createWaybillOn3rdProviders(order, nil) + } else { + globals.SugarLogger.Warnf("OnWaybillStatusChanged Canceled bill:%v failed %d times, stop schedule", bill, savedOrderInfo.retryCount) + } + s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false) + } + case model.WaybillStatusDelivering: + if order.WaybillVendorID == bill.WaybillVendorID { + if order.VendorID != bill.WaybillVendorID { + s.SelfDeliverDelievering(order) + } + } else { + globals.SugarLogger.Warnf("OnWaybillStatusChanged Delivering bill:%v shouldn't got here", bill) + } + s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false) + case model.WaybillStatusDelivered: + s.removeWaybillFromMap(savedOrderInfo, bill) + if order.WaybillVendorID == bill.WaybillVendorID { + if order.VendorID != bill.WaybillVendorID { + s.SelfDeliverDelievered(order) + } + } else { + globals.SugarLogger.Warnf("OnWaybillStatusChanged Delivered bill:%v shouldn't got here", bill) + } + s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false) } } } @@ -205,7 +235,7 @@ func (s *DefScheduler) addWaybill2Map(savedOrderInfo *WatchOrderInfo, bill *mode for _, v := range savedOrderInfo.waybills { if v.VendorWaybillID == bill.VendorWaybillID && v.WaybillVendorID == bill.WaybillVendorID { // 如果已经存在,不做处理 - // globals.SugarLogger.Infof("addWaybill2Map bill:%v already exists", bill) + globals.SugarLogger.Warnf("addWaybill2Map bill:%v already exists", bill) return } } @@ -227,7 +257,7 @@ func (s *DefScheduler) createWaybillOn3rdProviders(order *model.GoodsOrder, excl if s.isOrderSupport3rdDelivery(order) { successCount := 0 for vendorID := range s.DeliveryPlatformHandlers { - if excludeBill == nil || vendorID != excludeBill.WaybillVendorID { + if (excludeBill == nil || vendorID != excludeBill.WaybillVendorID) && s.DeliveryPlatformHandlers[vendorID].Use4CreateWaybill { if err = s.CreateWaybill(vendorID, order); err == nil { successCount++ } @@ -236,12 +266,12 @@ func (s *DefScheduler) createWaybillOn3rdProviders(order *model.GoodsOrder, excl if successCount != 0 { return nil } - globals.SugarLogger.Warnf("createWaybillOn3rdProviders, orderID:%s all failed", order.VendorOrderID) + globals.SugarLogger.Infof("createWaybillOn3rdProviders, orderID:%s all failed", order.VendorOrderID) return scheduler.ErrCanNotCreateAtLeastOneWaybill } globals.SugarLogger.Debugf("createWaybillOn3rdProviders, orderID:%s, store:%d dont't support 3rd delivery platform", order.VendorOrderID, jxutils.GetJxStoreIDFromOrder(order)) } else { - globals.SugarLogger.Debugf("createWaybillOn3rdProviders, orderID:%s, status:%d doesn't match model.OrderStatusFinishedPickup, bypass", order.VendorOrderID, order.Status) + globals.SugarLogger.Warnf("createWaybillOn3rdProviders, orderID:%s, status:%d doesn't match model.OrderStatusFinishedPickup, bypass", order.VendorOrderID, order.Status) } return nil } @@ -259,35 +289,27 @@ func (s *DefScheduler) cancelOtherWaybills(savedOrderInfo *WatchOrderInfo, bill return nil } -// todo 这个函数也可能有线程安全问题 func (s *DefScheduler) swtich2SelfDeliverWithRetry(order *model.GoodsOrder, bill *model.Waybill, retryCount int, duration time.Duration) { - globals.SugarLogger.Debugf("Swtich2SelfDeliver orderID:%s", order.VendorOrderID) - utils.CallFuncRetryAsync(func(index int) error { - err := s.Swtich2SelfDeliver(order) - if err != nil { - globals.SugarLogger.Infof("Swtich2SelfDeliver failed, orderID:%s, error:%v", order.VendorOrderID, err) - if err != nil && index == 0 { - globals.SugarLogger.Warnf("Swtich2SelfDeliver finally failed, orderID:%s, error:%v, have to cancel bill:%v", order.VendorOrderID, err, bill) - // 如果购买平台转商家自送失败,最终还是要取消3方物流 - s.CancelWaybill(bill) - } - } - return err - }, duration, retryCount) + globals.SugarLogger.Debugf("swtich2SelfDeliverWithRetry orderID:%s", order.VendorOrderID) + // 当前先不加RETRY逻辑 + if err := s.Swtich2SelfDeliver(order); err != nil { + globals.SugarLogger.Infof("swtich2SelfDeliverWithRetry failed, cancel bill:%v, err:%v", bill, err) + s.CancelWaybill(bill) + } } // 这个函数这样写的原因是适应一些消息错序 -func (s *DefScheduler) loadSavedOrderFromMap(status *model.OrderStatus) *WatchOrderInfo { +func (s *DefScheduler) loadSavedOrderFromMap(status *model.OrderStatus, isAutoLoad bool) *WatchOrderInfo { globals.SugarLogger.Debugf("loadSavedOrderFromMap status:%v", status) universalOrderID := jxutils.ComposeUniversalOrderID(status.RefVendorOrderID, status.RefVendorID) var realSavedInfo *WatchOrderInfo if savedInfo, ok := s.orderMap.Load(universalOrderID); ok { realSavedInfo = savedInfo.(*WatchOrderInfo) } - if realSavedInfo == nil || !model.IsOrderSolid(realSavedInfo.order) { + if isAutoLoad && (realSavedInfo == nil || !model.IsOrderSolid(realSavedInfo.order)) { if realSavedInfo == nil { realSavedInfo = new(WatchOrderInfo) - s.orderMap.Store(universalOrderID, realSavedInfo) + s.orderMap.StoreWithTimeout(universalOrderID, realSavedInfo, orderMapStoreMaxTime) } else { globals.SugarLogger.Infof("loadSavedOrderFromMap order is incomplete, orderID:%s, load it", status.RefVendorOrderID) } @@ -308,41 +330,63 @@ func (s *DefScheduler) loadSavedOrderFromMap(status *model.OrderStatus) *WatchOr return realSavedInfo } -func (s *DefScheduler) getBeginTime4LatestPickup(order *model.GoodsOrder) (retVal time.Time) { - if order.ExpectedDeliveredTime != utils.DefaultTimeValue { - return order.ExpectedDeliveredTime.Add(-time2Delivered) - } - return order.StatusTime -} - func (s *DefScheduler) stopTimer(savedOrderInfo *WatchOrderInfo) { if savedOrderInfo.timer != nil { - globals.SugarLogger.Debugf("stopTimer orderid:%v", savedOrderInfo.order.VendorOrderID) + globals.SugarLogger.Debugf("stopTimer orderID:%s", savedOrderInfo.order.VendorOrderID) savedOrderInfo.timer.Stop() savedOrderInfo.timerStatus = 0 + savedOrderInfo.timer = nil } } -func (s *DefScheduler) resetTimer(savedOrderInfo *WatchOrderInfo, status int, beginTime time.Time, gap time.Duration) { - globals.SugarLogger.Debugf("resetTimer status:%v, orderID:%s", status, savedOrderInfo.order.VendorOrderID) - if status >= savedOrderInfo.timerStatus { // 新设置的TIMER不能覆盖状态在其后的TIMER,如果状态回绕,需要注意 - s.stopTimer(savedOrderInfo) - config := s.mergeOrderStatusConfig(status, s.GetPurchasePlatformFromVendorID(savedOrderInfo.order.VendorID).GetStatusActionConfig(status)) - if config != nil && config.TimeoutAction != nil { - timeout := jxutils.GetRealTimeout(beginTime, config.Timeout, minTimeout) + gap - globals.SugarLogger.Debugf("resetTimer timeout:%v, orderID:%s", timeout, savedOrderInfo.order.VendorOrderID) - savedOrderInfo.timerStatus = status - order := savedOrderInfo.order - savedOrderInfo.timer = time.AfterFunc(timeout, func() { - // order 事件序列化 - jxutils.CallMsgHandlerAsync(func() { - config.TimeoutAction(order) - savedOrderInfo.timerStatus = 0 - }, order.VendorOrderID) - }) +func (s *DefScheduler) resetTimer(savedOrderInfo *WatchOrderInfo, statusType, status int, isPending bool) { + order := savedOrderInfo.order + globals.SugarLogger.Debugf("resetTimer, orderID:%s status:%v", order.VendorOrderID, status) + + if statusType != savedOrderInfo.timerStatusType || status >= savedOrderInfo.timerStatus { // 新设置的TIMER不能覆盖状态在其后的TIMER,如果状态回绕,需要注意 + config := s.mergeOrderStatusConfig(statusType, status, s.GetPurchasePlatformFromVendorID(order.VendorID).GetStatusActionConfig(statusType, status)) + if config == nil || config.TimerType != scheduler.TimerTypeByPass { + s.stopTimer(savedOrderInfo) + } + if config != nil && config.TimeoutAction != nil && config.TimerType != scheduler.TimerTypeNoTimer && config.TimerType != scheduler.TimerTypeByPass { + var timeout time.Duration + switch config.TimerType { + case scheduler.TimerTypeBaseNow: + timeout = config.Timeout + case scheduler.TimerTypeBaseStatusTime: + timeout = order.StatusTime.Sub(time.Now()) + config.Timeout + case scheduler.TimerTypeBaseExpectedDeliveredTime: + expectedDeliveredTime := order.ExpectedDeliveredTime + if expectedDeliveredTime == utils.DefaultTimeValue { // 如果没有期望送达时间,则以订单创建时间加DefaultTimeValue来表示 + expectedDeliveredTime = order.OrderCreatedAt.Add(time2Delivered) + } + timeout = expectedDeliveredTime.Add(-config.Timeout).Sub(time.Now()) + default: + panic("TimerType is wrong!!!") + } + if config.TimeoutGap != 0 { + timeout += time.Duration(rand.Int31n(int32(config.TimeoutGap))) * time.Second + } + if isPending && timeout < pendingOrderTimerMaxSecond*time.Second { // 如果是PENDING的订单,则将其分布到2--5秒内,让后续事件有机会执行 + timeout = time.Duration(jxutils.MapValue2Scope(int64(timeout), -pendingOrderTimerMinMinSecond*1000, pendingOrderTimerMaxSecond*1000, pendingOrderTimerMinSecond*1000, pendingOrderTimerMaxSecond*1000)) * time.Millisecond + } else if timeout < 0 { + timeout = 0 + } + if timeout == 0 { + config.TimeoutAction(order) + } else { + savedOrderInfo.timerStatusType = statusType + savedOrderInfo.timerStatus = status + savedOrderInfo.timer = time.AfterFunc(timeout, func() { + jxutils.CallMsgHandlerAsync(func() { + config.TimeoutAction(order) + savedOrderInfo.timerStatus = 0 + savedOrderInfo.timerStatusType = scheduler.TimerStatusTypeUnknown + }, order.VendorOrderID) + }) + } + globals.SugarLogger.Debugf("resetTimer orderID:%s, status:%d, timeout:%v", order.VendorOrderID, status, timeout) } - } else { - globals.SugarLogger.Infof("resetTimer status revert, orderID:%s, current timer status:%d, status:%d", savedOrderInfo.order.VendorOrderID, savedOrderInfo.timerStatus, status) } } @@ -378,8 +422,8 @@ func (s *DefScheduler) handleAutoAcceptOrder(orderID string, vendorID int, userM return handleType } -func (s *DefScheduler) mergeOrderStatusConfig(status int, config *scheduler.StatusActionConfig) (retVal *scheduler.StatusActionConfig) { - defConfig := s.defWorkflowConfig[status] +func (s *DefScheduler) mergeOrderStatusConfig(statusType, status int, config *scheduler.StatusActionConfig) (retVal *scheduler.StatusActionConfig) { + defConfig := s.defWorkflowConfig[statusType][status] if defConfig == nil && config == nil { return nil } @@ -423,3 +467,11 @@ func (s *DefScheduler) isOrderSupport3rdDelivery(order *model.GoodsOrder) (retVa }, "isOrderSupport3rdDelivery") return retVal } + +func (s *DefScheduler) updateOrderByBill(order *model.GoodsOrder, bill *model.Waybill, revertStatus bool) { + s.CurOrderManager.UpdateWaybillVendorID(bill, revertStatus) + order.WaybillVendorID = bill.WaybillVendorID + if revertStatus { + order.Status = model.OrderStatusFinishedPickup + } +} diff --git a/business/scheduler/scheduler.go b/business/scheduler/scheduler.go index b1596d134..e39aa575b 100644 --- a/business/scheduler/scheduler.go +++ b/business/scheduler/scheduler.go @@ -10,6 +10,20 @@ import ( "git.rosy.net.cn/jx-callback/globals" ) +const ( + TimerStatusTypeUnknown = -1 + TimerStatusTypeOrder = 0 + TimerStatusTypeWaybill = 1 +) + +const ( + TimerTypeNoTimer = 0 // 即此状态没有TIMER(但为停掉之前的TIMER,如果要保持TIMER应该用TimerTypeByPass),状态没有配置缺省就是没有TIMER + TimerTypeByPass = 1 + TimerTypeBaseNow = 2 + TimerTypeBaseStatusTime = 3 + TimerTypeBaseExpectedDeliveredTime = 4 // 以expected delivery time倒推的时间(减去StatusActionConfig.Timeout) +) + var ( CurrentScheduler Scheduler ) @@ -23,14 +37,16 @@ var ( ) type StatusActionConfig struct { - Timeout time.Duration // 超时时间,为0的话表示禁用 + TimerType int // 参见上面的相关常量定义 + Timeout time.Duration // 超时时间,为0表示立即执行(其实也不能立即执行,因为有最小时间限制) + TimeoutGap int // 以秒为单位的随机时间,0表示不随机 TimeoutAction func(order *model.GoodsOrder) (err error) // 超时后需要执行的动作,为nil表示此状态不需要执行监控 } type PurchasePlatformHandler interface { GetStatusFromVendorStatus(vendorStatus string) int GetOrder(vendorOrderID string) (order *model.GoodsOrder, err error) - GetStatusActionConfig(status int) *StatusActionConfig + GetStatusActionConfig(statusType, status int) *StatusActionConfig AcceptOrRefuseOrder(order *model.GoodsOrder, isAcceptIt bool) (err error) PickedUpGoods(order *model.GoodsOrder) (err error) @@ -45,35 +61,40 @@ type DeliveryPlatformHandler interface { CancelWaybill(bill *model.Waybill) (err error) } +type DeliveryPlatformHandlerInfo struct { + Handler DeliveryPlatformHandler + Use4CreateWaybill bool +} + type OrderManager interface { LoadOrder(vendorOrderID string, vendorID int) (order *model.GoodsOrder, err error) // OnOrderStatusChanged(status *model.OrderStatus) (err error) // 此消息是否使用还不确定 - UpdateWaybillVendorID(bill *model.Waybill) (err error) + UpdateWaybillVendorID(bill *model.Waybill, revertStatus bool) (err error) } type Scheduler interface { RegisterOrderManager(handler OrderManager) RegisterPurchasePlatform(vendorID int, handler PurchasePlatformHandler) - RegisterDeliveryPlatform(vendorID int, handler DeliveryPlatformHandler) + RegisterDeliveryPlatform(vendorID int, handler DeliveryPlatformHandler, isUse4CreateWaybill bool) // 以下是订单 - OnOrderNew(order *model.GoodsOrder) (err error) - OnOrderStatusChanged(status *model.OrderStatus) (err error) + OnOrderNew(order *model.GoodsOrder, isPending bool) (err error) + OnOrderStatusChanged(status *model.OrderStatus, isPending bool) (err error) // 以下是运单 - OnWaybillStatusChanged(bill *model.Waybill) (err error) + OnWaybillStatusChanged(bill *model.Waybill, isPending bool) (err error) } type BaseScheduler struct { CurOrderManager OrderManager PurchasePlatformHandlers map[int]PurchasePlatformHandler - DeliveryPlatformHandlers map[int]DeliveryPlatformHandler + DeliveryPlatformHandlers map[int]*DeliveryPlatformHandlerInfo IsReallyCallPlatformAPI bool } func (c *BaseScheduler) Init() { c.PurchasePlatformHandlers = make(map[int]PurchasePlatformHandler) - c.DeliveryPlatformHandlers = make(map[int]DeliveryPlatformHandler) + c.DeliveryPlatformHandlers = make(map[int]*DeliveryPlatformHandlerInfo) } func (c *BaseScheduler) RegisterOrderManager(handler OrderManager) { @@ -90,21 +111,24 @@ func (c *BaseScheduler) RegisterPurchasePlatform(vendorID int, handler PurchaseP c.PurchasePlatformHandlers[vendorID] = handler } -func (c *BaseScheduler) RegisterDeliveryPlatform(vendorID int, handler DeliveryPlatformHandler) { +func (c *BaseScheduler) RegisterDeliveryPlatform(vendorID int, handler DeliveryPlatformHandler, isUse4CreateWaybill bool) { if !(vendorID >= model.VendorIDDeliveryBegin && vendorID <= model.VendorIDDeliveryEnd) { panic(fmt.Sprintf("delivery vendor:%d is illegal", vendorID)) } if _, ok := c.DeliveryPlatformHandlers[vendorID]; ok { panic(fmt.Sprintf("delivery vendor:%d, already exists", vendorID)) } - c.DeliveryPlatformHandlers[vendorID] = handler + c.DeliveryPlatformHandlers[vendorID] = &DeliveryPlatformHandlerInfo{ + Handler: handler, + Use4CreateWaybill: isUse4CreateWaybill, + } } func (c *BaseScheduler) GetPurchasePlatformFromVendorID(vendorID int) PurchasePlatformHandler { return c.PurchasePlatformHandlers[vendorID] } -func (c *BaseScheduler) GetDeliveryPlatformFromVendorID(vendorID int) DeliveryPlatformHandler { +func (c *BaseScheduler) GetDeliveryPlatformFromVendorID(vendorID int) *DeliveryPlatformHandlerInfo { return c.DeliveryPlatformHandlers[vendorID] } @@ -144,7 +168,7 @@ func (c *BaseScheduler) Swtich2SelfDeliver(order *model.GoodsOrder) (err error) }, "Swtich2SelfDeliver orderID:%s", order.VendorOrderID) } } else { - globals.SugarLogger.Infof("Swtich2SelfDeliver orderID:%s, status:% is not suitable", order.VendorOrderID, order.Status) + globals.SugarLogger.Infof("Swtich2SelfDeliver orderID:%s, status:%d is not suitable", order.VendorOrderID, order.Status) } return err } @@ -184,19 +208,24 @@ func (c *BaseScheduler) CreateWaybill(platformVendorID int, order *model.GoodsOr return ErrOrderIsNotSolid } if c.IsReallyCallPlatformAPI { - err = utils.CallFuncLogError(func() error { - return c.GetDeliveryPlatformFromVendorID(platformVendorID).CreateWaybill(order) - }, "CreateWaybill orderID:%s, vendorID:%d", order.VendorOrderID, platformVendorID) + handlerInfo := c.GetDeliveryPlatformFromVendorID(platformVendorID) + if handlerInfo.Use4CreateWaybill { + err = utils.CallFuncLogError(func() error { + return handlerInfo.Handler.CreateWaybill(order) + }, "CreateWaybill orderID:%s, vendorID:%d", order.VendorOrderID, platformVendorID) + } } return err } func (c *BaseScheduler) CancelWaybill(bill *model.Waybill) (err error) { globals.SugarLogger.Infof("CancelWaybill bill:%v", bill) - if c.IsReallyCallPlatformAPI { - err = utils.CallFuncLogError(func() error { - return c.GetDeliveryPlatformFromVendorID(bill.WaybillVendorID).CancelWaybill(bill) - }, "CancelWaybill bill:%v", bill) + if c.IsReallyCallPlatformAPI && bill.OrderVendorID != bill.WaybillVendorID { + if handlerInfo := c.GetDeliveryPlatformFromVendorID(bill.WaybillVendorID); handlerInfo != nil { + err = utils.CallFuncLogError(func() error { + return handlerInfo.Handler.CancelWaybill(bill) + }, "CancelWaybill bill:%v", bill) + } } return err } @@ -204,6 +233,6 @@ func (c *BaseScheduler) CancelWaybill(bill *model.Waybill) (err error) { type BasePurchasePlatform struct { } -func (p *BasePurchasePlatform) GetStatusActionConfig(status int) *StatusActionConfig { +func (p *BasePurchasePlatform) GetStatusActionConfig(statusType, status int) *StatusActionConfig { return nil }