package defsch import ( "time" "math/rand" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxutils" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/scheduler" "git.rosy.net.cn/jx-callback/globals" "git.rosy.net.cn/jx-callback/legacy/models" "github.com/astaxie/beego/orm" ) const ( defTime2Delivered = 1 * time.Hour // 正常订单都是1小时达 defTime2Schedule3rdCarrier = 330 * time.Second // 京东要求5分钟后才能转自送,保险起见,设置为5分半钟 time2Schedule3rdCarrierGap4OrderStatus = 3 * time.Minute // 京东要求是运单状态为待抢单且超时5分钟,但为了防止没有运单事件,所以就拣货完成事件开始算,添加3分钟 defTime2AutoPickupMin = 25 * time.Minute time2AutoPickupGap = 5 * time.Minute ) type WatchOrderInfo struct { order *model.GoodsOrder // order里的信息是保持更新的 waybills []*model.Waybill // 这个waybills里的状态信息是不真实的,只使用id相关的信息 timerStatus int timer *time.Timer } // 重要:此调度器要求同一定单的处理逻辑必须是序列化了的,不然会有并发问题 type DefScheduler struct { scheduler.BaseScheduler defWorkflowConfig map[int]*scheduler.StatusActionConfig orderMap jxutils.SyncMapWithTimeout } func init() { sch := &DefScheduler{} sch.Init() scheduler.CurrentScheduler = sch sch.defWorkflowConfig = map[int]*scheduler.StatusActionConfig{ model.OrderStatusNew: &scheduler.StatusActionConfig{ // 自动接单 Timeout: 1 * time.Second, TimeoutAction: func(order *model.GoodsOrder) (err error) { _ = sch.handleAutoAcceptOrder(order.VendorOrderID, order.VendorID, order.ConsigneeMobile, jxutils.GetJxStoreIDFromOrder(order), nil, func(isAcceptIt bool) error { return sch.GetPurchasePlatformFromVendorID(order.VendorID).AcceptOrRefuseOrder(order, isAcceptIt) }) return nil }, }, model.OrderStatusAccepted: &scheduler.StatusActionConfig{ // 自动拣货 Timeout: defTime2AutoPickupMin, TimeoutAction: func(order *model.GoodsOrder) (err error) { return sch.GetPurchasePlatformFromVendorID(order.VendorID).PickedUpGoods(order) }, }, model.OrderStatusFinishedPickup: &scheduler.StatusActionConfig{ // 尝试召唤更多物流 Timeout: defTime2Schedule3rdCarrier, TimeoutAction: func(order *model.GoodsOrder) (err error) { return sch.createWaybillOn3rdProviders(order, nil) }, }, } } // 以下是订单 func (s *DefScheduler) OnOrderNew(order *model.GoodsOrder) (err error) { globals.SugarLogger.Debugf("OnOrderNew, order:%v", order) watchInfo := &WatchOrderInfo{ order: order, } s.orderMap.Store(jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID), watchInfo) s.resetTimer(model.OrderStatusNew, watchInfo, 0) return err } func (s *DefScheduler) OnOrderStatusChanged(status *model.OrderStatus) (err error) { globals.SugarLogger.Debugf("OnOrderStatusChanged, status:%v", status) if savedOrderInfo := s.loadWatchOrderFromMap(status.VendorOrderID, status.VendorID); savedOrderInfo != nil { if status.Status > model.OrderStatusUnknown && status.Status < model.OrderStatusEndBegin { s.updateOrderByStatus(savedOrderInfo.order, status) gap := 0 * time.Second if status.Status == model.OrderStatusAccepted { gap = time.Duration(rand.Int63n(int64(time2AutoPickupGap))) } else if status.Status == model.OrderStatusFinishedPickup { gap = time2Schedule3rdCarrierGap4OrderStatus } s.resetTimer(status.Status, savedOrderInfo, gap) } else { s.stopTimer(savedOrderInfo) s.orderMap.Delete(jxutils.GetUniversalOrderIDFromOrderStatus(status)) } } return err } // 以下是运单 func (s *DefScheduler) OnWaybillStatusChanged(bill *model.Waybill) (err error) { globals.SugarLogger.Debugf("OnWaybillStatusChanged, bill:%v", bill) if savedOrderInfo := s.loadWatchOrderFromMap(bill.VendorOrderID, bill.OrderVendorID); savedOrderInfo != nil { s.addWaybill2Map(savedOrderInfo, bill) // 这样写的原因是因为调试时,程度从中途运行,没有接受到WaybillStatusNew事件 if bill.Status == model.WaybillStatusNew { if bill.OrderVendorID == bill.WaybillVendorID { if savedOrderInfo.timerStatus == model.OrderStatusFinishedPickup { s.resetTimer(model.OrderStatusFinishedPickup, savedOrderInfo, 0) } else { globals.SugarLogger.Infof("OnWaybillStatusChanged met other timer, status:%d", savedOrderInfo.timerStatus) } } if savedOrderInfo.order.WaybillVendorID != model.VendorIDUnknown { globals.SugarLogger.Infof("OnWaybillStatusChanged multiple waybill created, bill:%v", bill) if bill.WaybillVendorID != bill.WaybillVendorID { s.GetDeliveryPlatformFromVendorID(bill.WaybillVendorID).CancelWaybill(bill) } } } else { switch bill.Status { case model.WaybillStatusAccepted: s.stopTimer(savedOrderInfo) // todo 这里应该另外启动一个TIMER s.cancelOtherWaybills(savedOrderInfo, bill) s.CurOrderManager.UpdateWaybillVendorID(bill) savedOrderInfo.order.WaybillVendorID = bill.WaybillVendorID case model.WaybillStatusAcceptCanceled: s.createWaybillOn3rdProviders(savedOrderInfo.order, bill) if savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID { bill.WaybillVendorID = model.VendorIDUnknown s.CurOrderManager.UpdateWaybillVendorID(bill) savedOrderInfo.order.WaybillVendorID = bill.WaybillVendorID } case model.WaybillStatusCanceled, model.WaybillStatusFailed: s.removeWaybillFromMap(savedOrderInfo, bill) if savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID { s.createWaybillOn3rdProviders(savedOrderInfo.order, nil) bill.WaybillVendorID = model.VendorIDUnknown s.CurOrderManager.UpdateWaybillVendorID(bill) savedOrderInfo.order.WaybillVendorID = bill.WaybillVendorID } case model.WaybillStatusDelivering: if savedOrderInfo.order.VendorID != bill.WaybillVendorID { s.GetPurchasePlatformFromVendorID(bill.OrderVendorID).SelfDeliverDelievering(savedOrderInfo.order) } case model.WaybillStatusDelivered: if savedOrderInfo.order.VendorID != bill.WaybillVendorID { s.GetPurchasePlatformFromVendorID(bill.OrderVendorID).SelfDeliverDelievered(savedOrderInfo.order) } s.removeWaybillFromMap(savedOrderInfo, bill) } } } return nil } func (s *DefScheduler) addWaybill2Map(savedOrderInfo *WatchOrderInfo, bill *model.Waybill) { for _, v := range savedOrderInfo.waybills { if v.VendorWaybillID == bill.VendorWaybillID && v.WaybillVendorID == bill.WaybillVendorID { // 如果已经存在,不做处理 // globals.SugarLogger.Infof("addWaybill2Map bill:%v already exists", bill) return } } savedOrderInfo.waybills = append(savedOrderInfo.waybills, bill) } func (s *DefScheduler) createWaybillOn3rdProviders(order *model.GoodsOrder, excludeBill *model.Waybill) (err error) { globals.SugarLogger.Debugf("createWaybillOn3rdProviders, order:%v", order) successCount := 0 for k, v := range s.DeliveryPlatformHandlers { if excludeBill == nil || k != excludeBill.WaybillVendorID { if err = v.CreateWaybill(order); err == nil { successCount++ } } } if successCount != 0 { return nil } return scheduler.ErrCanNotCreateAtLeastOneWaybill } func (s *DefScheduler) cancelOtherWaybills(savedOrderInfo *WatchOrderInfo, bill *model.Waybill) (err error) { globals.SugarLogger.Debugf("cancelOtherWaybills, order:%v, bill:%v", savedOrderInfo.order, bill) for _, v := range savedOrderInfo.waybills { if v.WaybillVendorID != bill.OrderVendorID && !(v.WaybillVendorID == bill.WaybillVendorID && v.VendorWaybillID == bill.VendorWaybillID) { _ = s.GetDeliveryPlatformFromVendorID(v.WaybillVendorID).CancelWaybill(v) } } if bill.WaybillVendorID != bill.OrderVendorID { s.swtich2SelfDeliverWithRetry(bill, 2, 10*time.Second) } return nil } func (s *DefScheduler) swtich2SelfDeliverWithRetry(bill *model.Waybill, retryCount int, duration time.Duration) { utils.CallFuncRetryAsync(func(index int) error { err := s.GetPurchasePlatformFromVendorID(bill.OrderVendorID).Swtich2SelfDeliver(bill.VendorOrderID) if err != nil && index == 0 { // 如果购买平台转商家自送失败,最终还是要取消3方物流 s.GetDeliveryPlatformFromVendorID(bill.WaybillVendorID).CancelWaybill(bill) } return err }, duration, retryCount) } func (s *DefScheduler) loadWatchOrderFromMap(vendorOrderID string, vendorID int) *WatchOrderInfo { universalOrderID := jxutils.ComposeUniversalOrderID(vendorOrderID, vendorID) var realSavedInfo *WatchOrderInfo if savedInfo, ok := s.orderMap.Load(universalOrderID); ok { realSavedInfo = savedInfo.(*WatchOrderInfo) } else { globals.SugarLogger.Infof("can not get saved order, vendorOrderID:%s, vendorID:%d, load it", vendorOrderID, vendorID) if order, err := s.CurOrderManager.LoadOrder(vendorOrderID, vendorID); err == nil { realSavedInfo = &WatchOrderInfo{ order: order, } s.orderMap.Store(universalOrderID, realSavedInfo) } else { globals.SugarLogger.Errorf("can not load order vendorOrderID:%s, vendorID:%d", vendorOrderID, vendorID) } } return realSavedInfo } func (s *DefScheduler) removeWaybillFromMap(savedOrderInfo *WatchOrderInfo, bill *model.Waybill) { if savedOrderInfo == nil { savedOrderInfo = s.loadWatchOrderFromMap(bill.VendorOrderID, bill.OrderVendorID) } if savedOrderInfo != nil { for k, v := range savedOrderInfo.waybills { if v.VendorWaybillID == bill.VendorWaybillID && v.WaybillVendorID == bill.WaybillVendorID { savedOrderInfo.waybills = append(savedOrderInfo.waybills[0:k], savedOrderInfo.waybills[k+1:]...) break } } } } func (s *DefScheduler) getLatestPickupTimeout(order *model.GoodsOrder, configTimeout time.Duration) (retVal time.Duration) { beginTime := order.StatusTime if order.ExpectedDeliveredTime != utils.DefaultTimeValue { beginTime = order.ExpectedDeliveredTime.Add(-defTime2Delivered) } return jxutils.GetRealTimeout(beginTime, configTimeout) } func (s *DefScheduler) stopTimer(savedOrderInfo *WatchOrderInfo) { if savedOrderInfo.timer != nil { globals.SugarLogger.Debugf("stopTimer orderid:%v", savedOrderInfo.order.VendorOrderID) savedOrderInfo.timer.Stop() } } func (s *DefScheduler) resetTimer(status int, savedOrderInfo *WatchOrderInfo, gap time.Duration) { globals.SugarLogger.Debugf("resetTimer status:%v, orderid:%v", status, savedOrderInfo.order.VendorOrderID) s.stopTimer(savedOrderInfo) config := s.mergeOrderStatusConfig(status, s.GetPurchasePlatformFromVendorID(savedOrderInfo.order.VendorID).GetStatusActionConfig(status)) if config != nil && config.TimeoutAction != nil { var timeout time.Duration if status == model.OrderStatusNew { timeout = config.Timeout // 绝对值 } else if status == model.OrderStatusAccepted { timeout = s.getLatestPickupTimeout(savedOrderInfo.order, config.Timeout) } else { timeout = jxutils.GetRealTimeout(savedOrderInfo.order.StatusTime, config.Timeout) } timeout += gap globals.SugarLogger.Debugf("resetTimer timeout:%v, orderid:%v", timeout, savedOrderInfo.order.VendorOrderID) savedOrderInfo.timerStatus = status savedOrderInfo.timer = time.AfterFunc(timeout, func() { config.TimeoutAction(savedOrderInfo.order) }) } } func (s *DefScheduler) handleAutoAcceptOrder(orderID string, vendorID int, userMobile string, jxStoreID int, db orm.Ormer, handler func(accepted bool) error) int { handleType := 0 if userMobile != "" { if db == nil { db = orm.NewOrm() } user := &models.BlackClient{ Mobile: userMobile, } if err := db.Read(user, "Mobile"); err != nil { if err != orm.ErrNoRows { globals.SugarLogger.Errorf("read data error:%v, data:%v, vendorID:%d", err, user, vendorID) } // 在访问数据库出错的情况下,也需要自动接单 handleType = 1 } else { // 强制拒单 globals.SugarLogger.Infof("force reject order:%s, vendorID:%d", orderID, vendorID) handleType = -1 } } else { globals.SugarLogger.Infof("order:%s, vendorID:%d, mobile is empty, should accept order", orderID, vendorID) handleType = 1 } if handleType == 1 { handler(true) } else if handleType == -1 { handler(false) } return handleType } func (s *DefScheduler) mergeOrderStatusConfig(status int, config *scheduler.StatusActionConfig) (retVal *scheduler.StatusActionConfig) { defConfig := s.defWorkflowConfig[status] if defConfig == nil && config == nil { return nil } retVal = &scheduler.StatusActionConfig{} if defConfig != nil { retVal.Timeout = defConfig.Timeout retVal.TimeoutAction = defConfig.TimeoutAction } if config != nil { if config.Timeout >= 0 { retVal.Timeout = config.Timeout } if config.TimeoutAction != nil { retVal.TimeoutAction = config.TimeoutAction } } return retVal } func (s *DefScheduler) updateOrderByStatus(order *model.GoodsOrder, status *model.OrderStatus) (retVal *model.GoodsOrder) { order.Status = status.Status order.VendorStatus = status.VendorStatus order.StatusTime = status.StatusTime return order }