package defsch import ( "time" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxutils" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/scheduler" "git.rosy.net.cn/jx-callback/globals" "git.rosy.net.cn/jx-callback/legacy/models" "github.com/astaxie/beego/orm" ) const ( defTime2Delivery = 1 * time.Hour ) type WatchOrderInfo struct { order *model.GoodsOrder waybills []*model.Waybill timer *time.Timer } // 重要:此调度器要求同一定单的处理逻辑必须是序列化了的,不然会有并发问题 type DefScheduler struct { scheduler.BaseScheduler defWorkflowConfig map[int]*scheduler.StatusActionConfig orderMap jxutils.SyncMapWithTimeout } func init() { sch := &DefScheduler{} sch.Init() scheduler.CurrentScheduler = sch sch.defWorkflowConfig = map[int]*scheduler.StatusActionConfig{ model.OrderStatusNew: &scheduler.StatusActionConfig{ // 自动接单 Timeout: 1 * time.Second, TimeoutAction: func(order *model.GoodsOrder) (err error) { _ = sch.handleAutoAcceptOrder(order.VendorOrderID, order.VendorID, order.ConsigneeMobile, jxutils.GetJxStoreIDFromOrder(order), nil, func(isAcceptIt bool) error { return sch.GetPurchasePlatformFromVendorID(order.VendorID).AcceptOrRefuseOrder(order, isAcceptIt) }) return nil }, }, model.OrderStatusAccepted: &scheduler.StatusActionConfig{ // 20分钟后自动拣货 Timeout: 20 * time.Minute, TimeoutAction: func(order *model.GoodsOrder) (err error) { return sch.GetPurchasePlatformFromVendorID(order.VendorID).PickedUpGoods(order) }, }, model.OrderStatusFinishedPickup: &scheduler.StatusActionConfig{ // 拣货完成8分钟后开始在外部快递平台创建运单 Timeout: 8 * time.Minute, TimeoutAction: func(order *model.GoodsOrder) (err error) { return sch.createWaybillOn3rdProviders(order) }, }, } } // 以下是订单 func (s *DefScheduler) OnOrderNew(order *model.GoodsOrder) (err error) { config := s.mergeOrderStatusConfig(order.Status, s.GetPurchasePlatformFromVendorID(order.VendorID).GetStatusActionConfig(order.Status)) watchInfo := &WatchOrderInfo{ order: order, timer: time.AfterFunc(jxutils.GetRealTimeout(order.OrderCreatedAt, config.Timeout), func() { config.TimeoutAction(order) }), } s.orderMap.Store(jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID), watchInfo) return err } func (s *DefScheduler) OnOrderStatusChanged(status *model.OrderStatus) (err error) { if savedOrderInfo := s.loadWatchOrderFromMap(status.VendorOrderID, status.VendorID); savedOrderInfo != nil { if savedOrderInfo.timer != nil { savedOrderInfo.timer.Stop() } if status.Status > model.OrderStatusUnknown && status.Status < model.OrderStatusEndBegin { s.updateOrderByStatus(savedOrderInfo.order, status) config := s.mergeOrderStatusConfig(status.Status, s.GetPurchasePlatformFromVendorID(status.VendorID).GetStatusActionConfig(status.Status)) if config != nil && config.TimeoutAction != nil { var timeout time.Duration if status.Status == model.OrderStatusAccepted { savedOrderInfo := s.loadWatchOrderFromMap(status.VendorOrderID, status.VendorID) timeout = s.getLatestPickupTimeout(savedOrderInfo.order, config.Timeout) } else { timeout = jxutils.GetRealTimeout(savedOrderInfo.order.StatusTime, config.Timeout) } savedOrderInfo.timer = time.AfterFunc(timeout, func() { config.TimeoutAction(savedOrderInfo.order) }) } } else { s.orderMap.Delete(jxutils.GetUniversalOrderIDFromOrderStatus(status)) } } return err } // 以下是运单 func (s *DefScheduler) OnWaybillStatusChanged(bill *model.Waybill) (err error) { savedOrderInfo := s.loadWatchOrderFromMap(bill.VendorOrderID, bill.OrderVendorID) if bill.Status == model.WaybillStatusNew { err = s.addWaybill2Map(bill) } else { findIt := false for _, v := range savedOrderInfo.waybills { if v.VendorWaybillID == bill.VendorWaybillID && v.WaybillVendorID == bill.WaybillVendorID { findIt = true break } } if findIt { switch bill.Status { case model.WaybillStatusAccepted: s.cancelOtherWaybills(bill) if bill.WaybillVendorID != bill.OrderVendorID { s.swtich2SelfDeliverWithRetry(bill, 2, 10*time.Second) } s.CurOrderManager.UpdateWaybillVendorID(bill) savedOrderInfo.order.WaybillVendorID = bill.WaybillVendorID case model.WaybillStatusAcceptCanceled, model.WaybillStatusCanceled, model.WaybillStatusFailed: s.removeWaybillFromMap(bill, savedOrderInfo) if savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID { s.createWaybillOn3rdProviders(savedOrderInfo.order) bill.WaybillVendorID = model.VendorIDUnknown s.CurOrderManager.UpdateWaybillVendorID(bill) } case model.WaybillStatusDelivering: s.GetPurchasePlatformFromVendorID(bill.OrderVendorID).SelfDeliverDelievering(savedOrderInfo.order) case model.WaybillStatusDelivered: s.GetPurchasePlatformFromVendorID(bill.OrderVendorID).SelfDeliverDelievered(savedOrderInfo.order) s.removeWaybillFromMap(bill, savedOrderInfo) } } else { globals.SugarLogger.Infof("OnWaybillStatusChanged can not find bill:%v in saved info", bill) } } return nil } func (s *DefScheduler) addWaybill2Map(bill *model.Waybill) (err error) { savedOrderInfo := s.loadWatchOrderFromMap(bill.VendorOrderID, bill.OrderVendorID) for _, v := range savedOrderInfo.waybills { if v.VendorWaybillID == bill.VendorWaybillID && v.WaybillVendorID == bill.WaybillVendorID { // 如果已经存在,不做处理 globals.SugarLogger.Infof("addWaybill2Map bill:%v already exists", bill) return nil } } savedOrderInfo.waybills = append(savedOrderInfo.waybills, bill) return nil } func (s *DefScheduler) onWaybillFailed(bill *model.Waybill) (err error) { savedOrderInfo := s.loadWatchOrderFromMap(bill.VendorOrderID, bill.OrderVendorID) s.removeWaybillFromMap(bill, savedOrderInfo) if len(savedOrderInfo.waybills) == 0 { s.createWaybillOn3rdProviders(savedOrderInfo.order) } return nil } func (s *DefScheduler) createWaybillOn3rdProviders(order *model.GoodsOrder) (err error) { successCount := 0 for _, v := range s.DeliveryPlatformHandlers { if err = v.CreateWaybill(order); err == nil { successCount++ } } if successCount != 0 { return nil } return scheduler.ErrCanNotCreateAtLeastOneWaybill } func (s *DefScheduler) cancelOtherWaybills(bill *model.Waybill) (err error) { savedOrderInfo := s.loadWatchOrderFromMap(bill.VendorOrderID, bill.OrderVendorID) for _, v := range savedOrderInfo.waybills { if !(v.WaybillVendorID == bill.WaybillVendorID && v.VendorWaybillID == bill.VendorWaybillID) { _ = s.GetDeliveryPlatformFromVendorID(v.WaybillVendorID).CancelWaybill(v) } } return nil } func (s *DefScheduler) swtich2SelfDeliverWithRetry(bill *model.Waybill, retryCount int, duration time.Duration) { if err := s.GetPurchasePlatformFromVendorID(bill.OrderVendorID).Swtich2SelfDeliver(bill.VendorOrderID); err != nil { if retryCount >= 0 { time.AfterFunc(duration, func() { s.swtich2SelfDeliverWithRetry(bill, retryCount-1, duration) }) } else { // 如果购买平台转商家自送失败,最终还是要取消3方物流 s.GetDeliveryPlatformFromVendorID(bill.WaybillVendorID).CancelWaybill(bill) } } else { s.removeWaybillFromMap(bill, nil) } } func (s *DefScheduler) loadWatchOrderFromMap(vendorOrderID string, vendorID int) *WatchOrderInfo { universalOrderID := jxutils.ComposeUniversalOrderID(vendorOrderID, vendorID) var realSavedInfo *WatchOrderInfo if savedInfo, ok := s.orderMap.Load(universalOrderID); ok { realSavedInfo = savedInfo.(*WatchOrderInfo) } else { globals.SugarLogger.Infof("can not get saved order, vendorOrderID:%s, vendorID:%d, load it", vendorOrderID, vendorID) if order, err := s.CurOrderManager.LoadOrder(vendorOrderID, vendorID); err == nil { realSavedInfo = &WatchOrderInfo{ order: order, } s.orderMap.Store(universalOrderID, realSavedInfo) } else { globals.SugarLogger.Errorf("can not load order vendorOrderID:%s, vendorID:%d", vendorOrderID, vendorID) } } return realSavedInfo } func (s *DefScheduler) removeWaybillFromMap(bill *model.Waybill, savedOrderInfo *WatchOrderInfo) { if savedOrderInfo == nil { savedOrderInfo = s.loadWatchOrderFromMap(bill.VendorOrderID, bill.OrderVendorID) } for k, v := range savedOrderInfo.waybills { if v.VendorWaybillID == bill.VendorWaybillID && v.WaybillVendorID == bill.WaybillVendorID { savedOrderInfo.waybills = append(savedOrderInfo.waybills[0:k], savedOrderInfo.waybills[k+1:]...) break } } } func (s *DefScheduler) getLatestPickupTimeout(order *model.GoodsOrder, configTimeout time.Duration) (retVal time.Duration) { beginTime := order.StatusTime if order.ExpectedDeliveredTime != utils.DefaultTimeValue { beginTime = order.ExpectedDeliveredTime.Add(-defTime2Delivery) } return jxutils.GetRealTimeout(beginTime, configTimeout) } func (s *DefScheduler) handleAutoAcceptOrder(orderID string, vendorID int, userMobile string, jxStoreID int, db orm.Ormer, handler func(accepted bool) error) int { handleType := 0 if userMobile != "" { if db == nil { db = orm.NewOrm() } user := &models.BlackClient{ Mobile: userMobile, } if err := db.Read(user, "Mobile"); err != nil { if err != orm.ErrNoRows { globals.SugarLogger.Errorf("read data error:%v, data:%v, vendorID:%d", err, user, vendorID) } // 在访问数据库出错的情况下,也需要自动接单 handleType = 1 } else { // 强制拒单 globals.SugarLogger.Infof("force reject order:%s, vendorID:%d", orderID, vendorID) handleType = -1 } } else { globals.SugarLogger.Infof("order:%s, vendorID:%d, mobile is empty, should accept order", orderID, vendorID) handleType = 1 } if handleType == 1 { handler(true) } else if handleType == -1 { handler(false) } return handleType } func (s *DefScheduler) mergeOrderStatusConfig(status int, config *scheduler.StatusActionConfig) (retVal *scheduler.StatusActionConfig) { defConfig := s.defWorkflowConfig[status] if defConfig == nil && config == nil { return nil } retVal = &scheduler.StatusActionConfig{} if defConfig != nil { retVal.Timeout = defConfig.Timeout retVal.TimeoutAction = defConfig.TimeoutAction } if config != nil { if config.Timeout >= 0 { retVal.Timeout = config.Timeout } if config.TimeoutAction != nil { retVal.TimeoutAction = config.TimeoutAction } } return retVal } func (s *DefScheduler) updateOrderByStatus(order *model.GoodsOrder, status *model.OrderStatus) (retVal *model.GoodsOrder) { order.Status = status.Status order.VendorStatus = status.VendorStatus order.StatusTime = status.StatusTime return order }