package defsch import ( "time" "math/rand" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxutils" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/scheduler" "git.rosy.net.cn/jx-callback/globals" "git.rosy.net.cn/jx-callback/legacy/models" "github.com/astaxie/beego/orm" ) const ( time2Delivered = 1 * time.Hour // 正常订单都是1小时达 time2Schedule3rdCarrier = 330 * time.Second // 京东要求5分钟后才能转自送,保险起见,设置为5分半钟 time2Schedule3rdCarrierGap4OrderStatus = 3 * time.Minute // 京东要求是运单状态为待抢单且超时5分钟,但为了防止没有运单事件,所以就拣货完成事件开始算,添加3分钟 time2AutoPickupMin = 15 * time.Minute time2AutoPickupGap = 5 * time.Minute minTimeout = 5 * time.Second // timer的最小时间,这样写的上的是在load pending orders,让延迟的事件有机会执行 ) type WatchOrderInfo struct { order *model.GoodsOrder // order里的信息是保持更新的 dirty int // 因为京东事件序列New与Accepted有极少数情况下会错序,处理延迟加载 waybills []*model.Waybill // 这个waybills里的状态信息是不真实的,只使用id相关的信息 timerStatus int timer *time.Timer } // 重要:此调度器要求同一定单的处理逻辑必须是序列化了的,不然会有并发问题 type DefScheduler struct { scheduler.BaseScheduler defWorkflowConfig map[int]*scheduler.StatusActionConfig orderMap jxutils.SyncMapWithTimeout } func init() { sch := &DefScheduler{} sch.IsReallyCallPlatformAPI = globals.ReallyCallPlatformAPI sch.Init() scheduler.CurrentScheduler = sch sch.defWorkflowConfig = map[int]*scheduler.StatusActionConfig{ model.OrderStatusNew: &scheduler.StatusActionConfig{ // 自动接单 Timeout: 1 * time.Second, TimeoutAction: func(order *model.GoodsOrder) (err error) { _ = sch.handleAutoAcceptOrder(order.VendorOrderID, order.VendorID, order.ConsigneeMobile, jxutils.GetJxStoreIDFromOrder(order), nil, func(isAcceptIt bool) error { if err = sch.AcceptOrRefuseOrder(order, isAcceptIt); err != nil { // 为了解决京东新消息与接单消息乱序的问题 if errWithCode, ok := err.(*utils.ErrorWithCode); ok && errWithCode.Level() == 1 && errWithCode.IntCode() == -1 { if order2, err2 := sch.GetPurchasePlatformFromVendorID(order.VendorID).GetOrder(order.VendorOrderID); err2 == nil && order2.Status > order.Status { sch.OnOrderStatusChanged(model.Order2Status(order2)) err = nil } else { err = err2 } } } return err }) return nil }, }, model.OrderStatusAccepted: &scheduler.StatusActionConfig{ // 自动拣货 Timeout: time2AutoPickupMin, TimeoutAction: func(order *model.GoodsOrder) (err error) { return sch.PickedUpGoods(order) }, }, model.OrderStatusFinishedPickup: &scheduler.StatusActionConfig{ // 尝试召唤更多物流 Timeout: time2Schedule3rdCarrier, TimeoutAction: func(order *model.GoodsOrder) (err error) { return sch.createWaybillOn3rdProviders(order, nil) }, }, } } // 以下是订单 func (s *DefScheduler) OnOrderNew(order *model.GoodsOrder) (err error) { globals.SugarLogger.Debugf("OnOrderNew, orderID:%s", order.VendorOrderID) watchInfo := &WatchOrderInfo{ order: order, } s.orderMap.Store(jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID), watchInfo) s.resetTimer(watchInfo, watchInfo.order.Status, order.OrderCreatedAt, 0) return err } func (s *DefScheduler) OnOrderStatusChanged(status *model.OrderStatus) (err error) { if status.Status > model.OrderStatusUnknown { // 只处理状态转换,一般消息不处理 globals.SugarLogger.Debugf("OnOrderStatusChanged, status:%v", status) if savedOrderInfo := s.loadSavedOrderFromMap(status); savedOrderInfo != nil { s.updateOrderByStatus(savedOrderInfo.order, status) if status.Status > model.OrderStatusUnknown && status.Status < model.OrderStatusEndBegin { if !(status.Status == model.OrderStatusFinishedPickup && len(savedOrderInfo.waybills) > 0) { //饿了么还观察到运单消息早于拣货完成消息 gap := 0 * time.Second beginTime := status.StatusTime if status.Status == model.OrderStatusNew { beginTime = time.Now() } else if status.Status == model.OrderStatusAccepted { gap = time.Duration(rand.Int63n(int64(time2AutoPickupGap))) beginTime = s.getBeginTime4LatestPickup(savedOrderInfo.order) } else if status.Status == model.OrderStatusFinishedPickup { // 召唤三方配送 // 正常应该是只依赖于购物平台的第一个运单消息,但饿了么有观察到极少数情况下没有此事件,所以还是需要在这里加个保险的TIMER来驱动运单调度 gap = time2Schedule3rdCarrierGap4OrderStatus } s.resetTimer(savedOrderInfo, status.Status, beginTime, gap) } } else { s.stopTimer(savedOrderInfo) s.cancelOtherWaybills(savedOrderInfo, nil) s.orderMap.Delete(jxutils.GetUniversalOrderIDFromOrderStatus(status)) } } else { err = scheduler.ErrCanNotFindOrder } } return err } // 以下是运单 func (s *DefScheduler) OnWaybillStatusChanged(bill *model.Waybill) (err error) { if bill.Status > model.WaybillStatusUnknown { globals.SugarLogger.Debugf("OnWaybillStatusChanged, bill:%v", bill) if savedOrderInfo := s.loadSavedOrderFromMap(model.Waybill2Status(bill)); savedOrderInfo != nil { s.addWaybill2Map(savedOrderInfo, bill) // 这样写的原因是因为调试时,程度从中途运行,没有接受到WaybillStatusNew事件 if bill.Status == model.WaybillStatusNew { if bill.OrderVendorID == bill.WaybillVendorID { if savedOrderInfo.timerStatus == model.OrderStatusFinishedPickup { // 如果当前TIMER还是OrderStatusFinishedPickup(在OnOrderStatusChanged中设置的),则重置 s.resetTimer(savedOrderInfo, model.OrderStatusFinishedPickup, bill.StatusTime, 0) } else if savedOrderInfo.timerStatus != 0 { globals.SugarLogger.Infof("OnWaybillStatusChanged met other timer, status:%d", savedOrderInfo.timerStatus) } } if savedOrderInfo.order.WaybillVendorID != model.VendorIDUnknown { globals.SugarLogger.Infof("OnWaybillStatusChanged multiple waybill created, bill:%v", bill) if bill.WaybillVendorID != bill.WaybillVendorID { s.CancelWaybill(bill) } } } else { switch bill.Status { case model.WaybillStatusAccepted: s.stopTimer(savedOrderInfo) // todo 这里应该另外启动一个TIMER s.cancelOtherWaybills(savedOrderInfo, bill) s.CurOrderManager.UpdateWaybillVendorID(bill) savedOrderInfo.order.WaybillVendorID = bill.WaybillVendorID case model.WaybillStatusAcceptCanceled: if savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID { s.createWaybillOn3rdProviders(savedOrderInfo.order, bill) bill.WaybillVendorID = model.VendorIDUnknown s.CurOrderManager.UpdateWaybillVendorID(bill) savedOrderInfo.order.WaybillVendorID = bill.WaybillVendorID } case model.WaybillStatusFailed: // todo WaybillStatusFailed理解成订单整个失败了,不需要再尝试创建运单了,注意这里应该加个zabbix日志的报警 s.removeWaybillFromMap(savedOrderInfo, bill) globals.SugarLogger.Infof("OnWaybillStatusChanged WaybillStatusFailed, bill:%v", bill) case model.WaybillStatusCanceled: s.removeWaybillFromMap(savedOrderInfo, bill) if savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID { s.createWaybillOn3rdProviders(savedOrderInfo.order, nil) bill.WaybillVendorID = model.VendorIDUnknown s.CurOrderManager.UpdateWaybillVendorID(bill) savedOrderInfo.order.WaybillVendorID = bill.WaybillVendorID } case model.WaybillStatusDelivering: if savedOrderInfo.order.VendorID != bill.WaybillVendorID { s.SelfDeliverDelievering(savedOrderInfo.order) } case model.WaybillStatusDelivered: s.removeWaybillFromMap(savedOrderInfo, bill) if savedOrderInfo.order.VendorID != bill.WaybillVendorID { s.SelfDeliverDelievered(savedOrderInfo.order) } } } } else { err = scheduler.ErrCanNotFindOrder } } return err } func (s *DefScheduler) addWaybill2Map(savedOrderInfo *WatchOrderInfo, bill *model.Waybill) { for _, v := range savedOrderInfo.waybills { if v.VendorWaybillID == bill.VendorWaybillID && v.WaybillVendorID == bill.WaybillVendorID { // 如果已经存在,不做处理 // globals.SugarLogger.Infof("addWaybill2Map bill:%v already exists", bill) return } } savedOrderInfo.waybills = append(savedOrderInfo.waybills, bill) } func (s *DefScheduler) removeWaybillFromMap(savedOrderInfo *WatchOrderInfo, bill *model.Waybill) { for k, v := range savedOrderInfo.waybills { if v.VendorWaybillID == bill.VendorWaybillID && v.WaybillVendorID == bill.WaybillVendorID { savedOrderInfo.waybills = append(savedOrderInfo.waybills[0:k], savedOrderInfo.waybills[k+1:]...) break } } } func (s *DefScheduler) createWaybillOn3rdProviders(order *model.GoodsOrder, excludeBill *model.Waybill) (err error) { globals.SugarLogger.Debugf("createWaybillOn3rdProviders, orderID:%s, excludeBill:%v", order.VendorOrderID, excludeBill) successCount := 0 for vendorID := range s.DeliveryPlatformHandlers { if excludeBill == nil || vendorID != excludeBill.WaybillVendorID { if err = s.CreateWaybill(vendorID, order); err == nil { successCount++ } } } if successCount != 0 { return nil } globals.SugarLogger.Warnf("createWaybillOn3rdProviders, orderID:%s all failed", order.VendorOrderID) return scheduler.ErrCanNotCreateAtLeastOneWaybill } func (s *DefScheduler) cancelOtherWaybills(savedOrderInfo *WatchOrderInfo, bill *model.Waybill) (err error) { globals.SugarLogger.Debugf("cancelOtherWaybills, orderID:%s, bill:%v", savedOrderInfo.order.VendorOrderID, bill) for _, v := range savedOrderInfo.waybills { if (v.OrderVendorID != v.WaybillVendorID) && (bill == nil || !(v.WaybillVendorID == bill.WaybillVendorID && v.VendorWaybillID == bill.VendorWaybillID)) { s.CancelWaybill(v) } } if bill != nil && bill.WaybillVendorID != bill.OrderVendorID { s.swtich2SelfDeliverWithRetry(savedOrderInfo.order, bill, 2, 10*time.Second) } return nil } // todo 这个函数也可能有线程安全问题 func (s *DefScheduler) swtich2SelfDeliverWithRetry(order *model.GoodsOrder, bill *model.Waybill, retryCount int, duration time.Duration) { globals.SugarLogger.Debugf("Swtich2SelfDeliver orderID:%s", order.VendorOrderID) utils.CallFuncRetryAsync(func(index int) error { err := s.Swtich2SelfDeliver(order) if err != nil { globals.SugarLogger.Infof("Swtich2SelfDeliver failed, orderID:%s, error:%v", order.VendorOrderID, err) if err != nil && index == 0 { globals.SugarLogger.Warnf("Swtich2SelfDeliver finally failed, orderID:%s, error:%v, have to cancel bill:%v", order.VendorOrderID, err, bill) // 如果购买平台转商家自送失败,最终还是要取消3方物流 s.CancelWaybill(bill) } } return err }, duration, retryCount) } // 这个函数这样写的原因是适应一些消息错序 func (s *DefScheduler) loadSavedOrderFromMap(status *model.OrderStatus) *WatchOrderInfo { globals.SugarLogger.Debugf("loadSavedOrderFromMap status:%v", status) universalOrderID := jxutils.ComposeUniversalOrderID(status.RefVendorOrderID, status.RefVendorID) var realSavedInfo *WatchOrderInfo if savedInfo, ok := s.orderMap.Load(universalOrderID); ok { realSavedInfo = savedInfo.(*WatchOrderInfo) } if realSavedInfo == nil || realSavedInfo.dirty == 1 { if realSavedInfo == nil { realSavedInfo = new(WatchOrderInfo) s.orderMap.Store(universalOrderID, realSavedInfo) } else { realSavedInfo.dirty = 0 globals.SugarLogger.Infof("loadSavedOrderFromMap order is dirty, orderID:%s, load it", status.RefVendorOrderID) } if order, err := s.CurOrderManager.LoadOrder(status.RefVendorOrderID, status.RefVendorID); err == nil { realSavedInfo.order = order } else { realSavedInfo.order = &model.GoodsOrder{ VendorOrderID: status.RefVendorOrderID, VendorID: status.RefVendorID, Status: status.Status, StatusTime: status.StatusTime, OrderCreatedAt: status.StatusTime, WaybillVendorID: model.VendorIDUnknown, } realSavedInfo.dirty = 1 globals.SugarLogger.Infof("loadSavedOrderFromMap can not load order orderID:%s", status.VendorOrderID) } } return realSavedInfo } func (s *DefScheduler) getBeginTime4LatestPickup(order *model.GoodsOrder) (retVal time.Time) { if order.ExpectedDeliveredTime != utils.DefaultTimeValue { return order.ExpectedDeliveredTime.Add(-time2Delivered) } return order.StatusTime } func (s *DefScheduler) stopTimer(savedOrderInfo *WatchOrderInfo) { if savedOrderInfo.timer != nil { globals.SugarLogger.Debugf("stopTimer orderid:%v", savedOrderInfo.order.VendorOrderID) savedOrderInfo.timer.Stop() } } func (s *DefScheduler) resetTimer(savedOrderInfo *WatchOrderInfo, status int, beginTime time.Time, gap time.Duration) { globals.SugarLogger.Debugf("resetTimer status:%v, orderID:%s", status, savedOrderInfo.order.VendorOrderID) if status >= savedOrderInfo.timerStatus { // 新设置的TIMER不能覆盖状态在其后的TIMER s.stopTimer(savedOrderInfo) config := s.mergeOrderStatusConfig(status, s.GetPurchasePlatformFromVendorID(savedOrderInfo.order.VendorID).GetStatusActionConfig(status)) if config != nil && config.TimeoutAction != nil { timeout := jxutils.GetRealTimeout(beginTime, config.Timeout, minTimeout) + gap globals.SugarLogger.Debugf("resetTimer timeout:%v, orderID:%s", timeout, savedOrderInfo.order.VendorOrderID) savedOrderInfo.timerStatus = status order := savedOrderInfo.order savedOrderInfo.timer = time.AfterFunc(timeout, func() { // order 事件序列化 jxutils.CallMsgHandlerAsync(func() { config.TimeoutAction(order) savedOrderInfo.timerStatus = 0 }, order.VendorOrderID) }) } } else { globals.SugarLogger.Infof("resetTimer status revert, orderID:%s, current timer status:%d, status:%d", savedOrderInfo.order.VendorOrderID, savedOrderInfo.timerStatus, status) } } func (s *DefScheduler) handleAutoAcceptOrder(orderID string, vendorID int, userMobile string, jxStoreID int, db orm.Ormer, handler func(accepted bool) error) int { handleType := 0 if userMobile != "" { if db == nil { db = orm.NewOrm() } user := &models.BlackClient{ Mobile: userMobile, } if err := db.Read(user, "Mobile"); err != nil { if err != orm.ErrNoRows { globals.SugarLogger.Errorf("read data error:%v, data:%v, vendorID:%d", err, user, vendorID) } // 在访问数据库出错的情况下,也需要自动接单 handleType = 1 } else { // 强制拒单 globals.SugarLogger.Infof("force reject order:%s, vendorID:%d", orderID, vendorID) handleType = -1 } } else { globals.SugarLogger.Infof("order:%s, vendorID:%d, mobile is empty, should accept order", orderID, vendorID) handleType = 1 } if handleType == 1 { handler(true) } else if handleType == -1 { handler(false) } return handleType } func (s *DefScheduler) mergeOrderStatusConfig(status int, config *scheduler.StatusActionConfig) (retVal *scheduler.StatusActionConfig) { defConfig := s.defWorkflowConfig[status] if defConfig == nil && config == nil { return nil } retVal = &scheduler.StatusActionConfig{} if defConfig != nil { retVal.Timeout = defConfig.Timeout retVal.TimeoutAction = defConfig.TimeoutAction } if config != nil { if config.Timeout >= 0 { retVal.Timeout = config.Timeout } if config.TimeoutAction != nil { retVal.TimeoutAction = config.TimeoutAction } } return retVal } func (s *DefScheduler) updateOrderByStatus(order *model.GoodsOrder, status *model.OrderStatus) (retVal *model.GoodsOrder) { order.Status = status.Status order.VendorStatus = status.VendorStatus order.StatusTime = status.StatusTime return order }