// Package defsch implements the default order/waybill scheduler: it watches
// order and waybill status events and arms per-status timers that auto-accept
// orders, auto-pickup goods and dispatch third-party delivery waybills.
package defsch

import (
	"errors"
	"fmt"
	"math/rand"
	"time"

	"git.rosy.net.cn/baseapi/utils"
	"git.rosy.net.cn/jx-callback/business/jxcallback/scheduler"
	"git.rosy.net.cn/jx-callback/business/jxcallback/scheduler/basesch"
	"git.rosy.net.cn/jx-callback/business/jxutils"
	"git.rosy.net.cn/jx-callback/business/jxutils/weixinmsg"
	"git.rosy.net.cn/jx-callback/business/model"
	"git.rosy.net.cn/jx-callback/business/model/dao"
	"git.rosy.net.cn/jx-callback/business/model/legacymodel"
	"git.rosy.net.cn/jx-callback/business/model/legacymodel2"
	"git.rosy.net.cn/jx-callback/business/partner"
	"git.rosy.net.cn/jx-callback/globals"
	"github.com/astaxie/beego/orm"
)

const (
	time2Delivered = 1 * time.Hour // Normal time from order placement to delivery.
	// Original note: JD requires waiting 5 minutes before switching to
	// self-delivery, so 5.5 minutes was used to be safe.
	// NOTE(review): the value is 30 minutes, so that note looks stale — confirm.
	time2Schedule3rdCarrier = 30 * time.Minute
	// time2Schedule3rdCarrierGap4OrderStatus = 3 * time.Minute // JD requires the waybill to be "waiting for grab" and timed out for 5 minutes; to cope with missing waybill events the count started from the pickup-finished event, plus 3 minutes.
	time2AutoPickupMin = 15 * time.Minute
	time2AutoPickupGap = 5 * 60 // Random spread of up to 5 minutes, in seconds.

	// A pending order's timeout in the range
	// [-pendingOrderTimerMinMinSecond, pendingOrderTimerMaxSecond] is mapped to
	// [pendingOrderTimerMinSecond, pendingOrderTimerMaxSecond].
	pendingOrderTimerMinMinSecond = 5 * 60 // 5 minutes.
	pendingOrderTimerMinSecond    = 2
	pendingOrderTimerMaxSecond    = 5

	maxWaybillRetryCount = 3
	orderMapStoreMaxTime = 4 * 24 * time.Hour // Longest time an entry stays in the order cache.
)

const (
	maxAddFee = 200 // Maximum extra fee in cents (fen); above this no third-party waybill is created.
)

var (
	// ErrAddFeeExceeded is returned by the CreateWaybill fee check when the
	// extra fee is larger than maxAddFee.
	ErrAddFeeExceeded = errors.New("配送超过基准价太多")
)

var (
	// FixedScheduler is the package-wide scheduler instance created in init.
	FixedScheduler *DefScheduler
)

// WatchOrderInfo is the per-order state the scheduler keeps while an order is
// being watched.
type WatchOrderInfo struct {
	autoPickupTimeoutMinute int // 0 disables auto pickup, 1 uses the default time2AutoPickupMin, any other value is a number of minutes.
	storeDeliveryType       int
	supported3rdCarriers    []int

	order    *model.GoodsOrder      // The information in order is kept up to date.
	waybills map[int]*model.Waybill // Status information in these waybills is not reliable; only ID-related fields are used.

	timerStatusType int // 0 means order, 1 means waybill.
	timerStatus     int
	timer           *time.Timer

	retryCount int // Number of attempts after failure; stops endless loops that may show up while debugging.
}

// StatusActionConfig describes what to do when an order or waybill stays in a
// given status for too long.
type StatusActionConfig struct {
	TimerType     int                                             // See the related timer type constants.
	Timeout       time.Duration                                   // Timeout; 0 returned from GetStatusActionConfig means "keep the default".
	TimeoutGap    int                                             // Random extra time in seconds; 0 returned from GetStatusActionConfig means "keep the default".
	TimeoutAction func(savedOrderInfo *WatchOrderInfo) (err error) // Action run on timeout; nil means this status is not monitored, and nil returned from GetStatusActionConfig means "keep the default".
}

// DefScheduler is the default scheduler.
//
// Important: this scheduler requires that the handling of one order is
// serialized, otherwise there will be concurrency problems.
type DefScheduler struct {
	basesch.BaseScheduler
	defWorkflowConfig []map[int]*StatusActionConfig
	orderMap          jxutils.SyncMapWithTimeout
}

// NewWatchOrderInfo creates the watch state for order (which may be nil) with
// default store settings and then applies the order through SetOrder.
func NewWatchOrderInfo(order *model.GoodsOrder) (retVal *WatchOrderInfo) {
	retVal = &WatchOrderInfo{
		autoPickupTimeoutMinute: 1,
		storeDeliveryType:       scheduler.StoreDeliveryTypeCrowdSourcing,
		waybills:                map[int]*model.Waybill{},
		supported3rdCarriers:    []int{},
	}
	retVal.SetOrder(order)
	return retVal
}

// SetOrder replaces the watched order and returns the previous one. When the
// pointer actually changes and the new order is not nil, the store settings
// are reloaded for it.
func (s *WatchOrderInfo) SetOrder(order *model.GoodsOrder) (retVal *model.GoodsOrder) {
	retVal = s.order
	if s.order == order {
		return retVal
	}
	if order != nil {
		// Best effort: on failure the previous/default settings stay in place.
		if err := s.updateOrderStoreFeature(order); err != nil {
			globals.SugarLogger.Infof("SetOrder orderID:%s updateOrderStoreFeature failed:%v", order.VendorOrderID, err)
		}
	}
	s.order = order
	return retVal
}

// updateOrderStoreFeature loads the store's auto-pickup, delivery type and
// supported third-party carriers and sets the matching delivery flags on
// order. It reads the new store-map tables when globals.OrderUseNewTable is
// set (or the store is the debug store) and the legacy feature table otherwise.
func (s *WatchOrderInfo) updateOrderStoreFeature(order *model.GoodsOrder) (err error) {
	globals.SugarLogger.Debugf("updateOrderStoreFeature orderID:%s", order.VendorOrderID)
	jxStoreID := jxutils.GetJxStoreIDFromOrder(order)
	if globals.OrderUseNewTable || jxStoreID == globals.DebugStoreID {
		return s.loadStoreMapFeature(order, jxStoreID)
	}
	return s.loadLegacyStoreFeature(order, jxStoreID)
}

// loadStoreMapFeature fills the store settings from the new store-map tables.
func (s *WatchOrderInfo) loadStoreMapFeature(order *model.GoodsOrder, jxStoreID int) error {
	if jxStoreID <= 0 {
		return nil
	}
	db := dao.GetDB()
	storeMap, err := dao.GetStoreMapByStoreID(db, jxStoreID, order.VendorID)
	if err != nil {
		return err
	}
	s.autoPickupTimeoutMinute = int(storeMap.AutoPickup)
	s.storeDeliveryType = int(storeMap.DeliveryType)
	if s.storeDeliveryType == scheduler.StoreDeliveryTypeByStore {
		order.DeliveryFlag |= model.OrderDeliveryFlagMaskPurcahseDisabled
	}

	isNeedSchedule := s.storeDeliveryType == scheduler.StoreDeliveryTypeByStore || storeMap.DeliveryCompetition != 0
	if isNeedSchedule {
		vendorList, err := dao.GetStoreCouriersByStoreID(db, jxStoreID, -1)
		if err != nil {
			return err
		}
		// Build a fresh list instead of appending to the existing one:
		// SetOrder may run several times for the same order (adjusted orders,
		// out-of-order messages) and appending would duplicate carriers.
		carriers := make([]int, 0, len(vendorList))
		for _, v := range vendorList {
			carriers = append(carriers, v.VendorID)
		}
		s.supported3rdCarriers = carriers
		if len(carriers) == 0 {
			isNeedSchedule = false
		}
	}
	if !isNeedSchedule {
		order.DeliveryFlag |= model.OrderDeliveryFlagMaskScheduleDisabled
	}
	return nil
}

// loadLegacyStoreFeature fills the store settings from the legacy
// Jxstorefeature table. Read errors are logged through utils.CallFuncLogError
// and also returned.
func (s *WatchOrderInfo) loadLegacyStoreFeature(order *model.GoodsOrder, jxStoreID int) (err error) {
	storefeature := &legacymodel2.Jxstorefeature{
		Id: jxStoreID,
	}
	if storefeature.Id <= 0 {
		return nil
	}
	db := orm.NewOrm()
	utils.CallFuncLogError(func() error {
		err = db.Read(storefeature, "Id")
		if err != nil {
			return err
		}
		isElmLike := order.VendorID == model.VendorIDELM || order.VendorID == model.VendorIDEBAI
		isJD := order.VendorID == model.VendorIDJD

		s.autoPickupTimeoutMinute = int(storefeature.Autopickup)
		if isElmLike {
			s.storeDeliveryType = int(storefeature.ElmDeliveryType)
		} else if isJD {
			s.storeDeliveryType = int(storefeature.JdDeliveryType)
		}
		if s.storeDeliveryType == scheduler.StoreDeliveryTypeByStore {
			order.DeliveryFlag |= model.OrderDeliveryFlagMaskPurcahseDisabled
		}

		isNeedSchedule := s.storeDeliveryType == scheduler.StoreDeliveryTypeByStore ||
			(isJD && storefeature.JdCompetition != 0) ||
			(isElmLike && storefeature.ElmCompetition != 0)
		if isNeedSchedule {
			// Fresh list for the same reason as in loadStoreMapFeature.
			carriers := []int{}
			if storefeature.SupportMtps != 0 {
				carriers = append(carriers, model.VendorIDMTPS)
			}
			if storefeature.SupportDada != 0 {
				carriers = append(carriers, model.VendorIDDada)
			}
			if storefeature.SupportFengNiao != 0 {
				carriers = append(carriers, model.VendorIDFengNiao)
			}
			s.supported3rdCarriers = carriers
			if len(carriers) == 0 {
				isNeedSchedule = false
				globals.SugarLogger.Infof("updateOrderStoreFeature orderID:%s no at least one carrier supported", order.VendorOrderID)
			}
		}
		if !isNeedSchedule {
			order.DeliveryFlag |= model.OrderDeliveryFlagMaskScheduleDisabled
		}
		return nil
	}, "updateOrderStoreFeature")
	return err
}

// init creates the package scheduler, registers it as the fixed/current
// scheduler and installs the default workflow: index 0 holds the order status
// actions, index 1 the waybill status actions.
func init() {
	sch := &DefScheduler{}
	basesch.FixedBaseScheduler = &sch.BaseScheduler
	FixedScheduler = sch
	sch.IsReallyCallPlatformAPI = globals.ReallyCallPlatformAPI
	scheduler.CurrentScheduler = sch

	sch.defWorkflowConfig = []map[int]*StatusActionConfig{
		{
			model.OrderStatusNew: { // Auto accept the order.
				TimerType: scheduler.TimerTypeBaseStatusTime,
				Timeout:   1 * time.Second,
				TimeoutAction: func(savedOrderInfo *WatchOrderInfo) error {
					order := savedOrderInfo.order
					_ = sch.handleAutoAcceptOrder(order.VendorOrderID, order.VendorID, order.ConsigneeMobile, jxutils.GetJxStoreIDFromOrder(order), nil, func(isAcceptIt bool) error {
						err := sch.AcceptOrRefuseOrder(order, isAcceptIt, "")
						if err == nil || err == scheduler.ErrOrderStatusAlreadySatisfyCurOperation {
							return nil
						}
						// Works around JD delivering the "new order" and
						// "order accepted" messages out of order.
						if errWithCode, ok := err.(*utils.ErrorWithCode); ok && errWithCode.Level() == 1 && errWithCode.IntCode() == -1 {
							order2, err2 := partner.GetPurchasePlatformFromVendorID(order.VendorID).GetOrder(order.VendorOrderID)
							if err2 == nil && order2.Status > order.Status {
								sch.OnOrderStatusChanged(model.Order2Status(order2), false)
								return nil
							}
							if err2 != nil {
								// Keep the original error when the lookup
								// itself succeeded but the order did not
								// advance; a nil err2 must not hide it.
								err = err2
							}
						}
						return err
					})
					return nil
				},
			},
			model.OrderStatusAccepted: { // Auto pickup.
				TimerType:  scheduler.TimerTypeBaseExpectedDeliveredTime,
				Timeout:    time2AutoPickupMin, // Overridden by the store setting.
				TimeoutGap: time2AutoPickupGap,
				TimeoutAction: func(savedOrderInfo *WatchOrderInfo) (err error) {
					if savedOrderInfo.autoPickupTimeoutMinute > 0 {
						return sch.autoPickupGood(savedOrderInfo.order)
					}
					return nil
				},
			},
			model.OrderStatusFinishedPickup: {
				TimerType:  scheduler.TimerTypeBaseStatusTime,
				Timeout:    1 * time.Second,
				TimeoutGap: 0,
				TimeoutAction: func(savedOrderInfo *WatchOrderInfo) (err error) {
					if savedOrderInfo.storeDeliveryType == scheduler.StoreDeliveryTypeByStore { // Used by self-delivering stores.
						return sch.createWaybillOn3rdProviders(savedOrderInfo, nil)
					}
					return nil
				},
			},
		},
		{
			model.WaybillStatusNew: {
				TimerType: scheduler.TimerTypeBaseStatusTime,
				Timeout:   time2Schedule3rdCarrier,
				TimeoutAction: func(savedOrderInfo *WatchOrderInfo) (err error) {
					if savedOrderInfo.storeDeliveryType != scheduler.StoreDeliveryTypeByStore { // Used by stores that do not deliver themselves.
						return sch.createWaybillOn3rdProviders(savedOrderInfo, nil)
					}
					return nil
				},
			},
		},
	}
}

// Order events follow.

// OnOrderNew starts (or refreshes) watching order, arms the timer for its
// current status and, unless isPending, sends the new-order notification.
func (s *DefScheduler) OnOrderNew(order *model.GoodsOrder, isPending bool) (err error) {
	globals.SugarLogger.Debugf("OnOrderNew orderID:%s", order.VendorOrderID)
	savedOrderInfo := s.loadSavedOrderFromMap(model.Order2Status(order), false)
	if savedOrderInfo == nil {
		savedOrderInfo = NewWatchOrderInfo(order)
		s.orderMap.StoreWithTimeout(jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID), savedOrderInfo, orderMapStoreMaxTime)
	} else {
		// Adjusted orders and out-of-order messages can both end up here.
		savedOrderInfo.SetOrder(order)
	}
	s.resetTimer(savedOrderInfo, nil, isPending)
	if !isPending {
		weixinmsg.NotifyNewOrder(order)
	}
	return err
}

// OnOrderStatusChanged applies status to the watched order. A real status
// transition re-arms the timer and, for end statuses, cancels the remaining
// third-party waybills and drops the order from the cache; a lock status only
// stops the timer. Always returns nil.
//
// TODO: this interface could probably take the order directly, because
// OrderManager builds one every time.
func (s *DefScheduler) OnOrderStatusChanged(status *model.OrderStatus, isPending bool) (err error) {
	if status.LockStatus == model.OrderStatusUnknown && status.Status <= model.OrderStatusUnknown {
		return nil
	}
	globals.SugarLogger.Debugf("OnOrderStatusChanged orderID:%s %s, status:%v", status.VendorOrderID, model.OrderStatusName[status.Status], status)
	savedOrderInfo := s.loadSavedOrderFromMap(status, true)
	s.updateOrderByStatus(savedOrderInfo.order, status)

	if status.LockStatus != model.OrderStatusUnknown {
		s.stopTimer(savedOrderInfo)
		return nil
	}

	// Only status transitions are handled here; plain messages are not.
	s.resetTimer(savedOrderInfo, nil, isPending)
	if status.Status >= model.OrderStatusEndBegin {
		curWaybill := savedOrderInfo.waybills[savedOrderInfo.order.WaybillVendorID]
		isDone := status.Status == model.OrderStatusDelivered || status.Status == model.OrderStatusFinished
		if isDone && curWaybill != nil && curWaybill.WaybillVendorID != curWaybill.OrderVendorID {
			globals.SugarLogger.Infof("OnOrderStatusChanged [运营2]订单orderID:%s可能被手动点击送达,会对程序状态产生不利影响,请通知门店不要这样操作!", status.VendorOrderID)
		}
		s.cancelOtherWaybills(savedOrderInfo, curWaybill, partner.CancelWaybillReasonOther, partner.CancelWaybillReasonStrOrderAlreadyFinished)
		s.orderMap.Delete(jxutils.GetUniversalOrderIDFromOrderStatus(status))
	}
	return nil
}

// Waybill events follow.

// OnWaybillStatusChanged dispatches a waybill status event to the handler for
// that status. Events with an unknown status are ignored. Always returns nil.
func (s *DefScheduler) OnWaybillStatusChanged(bill *model.Waybill, isPending bool) (err error) {
	if bill.Status <= model.WaybillStatusUnknown {
		return nil
	}
	globals.SugarLogger.Debugf("OnWaybillStatusChanged orderID:%s %s, bill:%v", bill.VendorOrderID, model.WaybillStatusName[bill.Status], bill)
	savedOrderInfo := s.loadSavedOrderFromMap(model.Waybill2Status(bill), true)
	order := savedOrderInfo.order
	// TODO: the order of received events is sometimes wrong, so this cannot be
	// enforced strictly; disabled for now.
	// if order.Status < model.OrderStatusFinishedPickup || order.Status > model.OrderStatusEndBegin { // The order is in a status where no waybill event should show up.
	// 	globals.SugarLogger.Infof("OnWaybillStatusChanged orderID:%s status:%s is not suitable for waybill", order.VendorOrderID, model.OrderStatusName[order.Status])
	// 	s.ProxyCancelWaybill(order, bill)
	// 	s.stopTimer(savedOrderInfo)
	// 	s.orderMap.Delete(jxutils.GetUniversalOrderIDFromOrderStatus(model.Order2Status(order)))
	// 	return nil
	// }
	// if (order.DeliveryFlag & model.OrderDeliveryFlagMaskScheduleDisabled) == 0 { // Do nothing at all when scheduling is disabled.
	if bill.Status == model.WaybillStatusNew {
		s.onWaybillNew(savedOrderInfo, bill, isPending)
		return nil
	}

	switch bill.Status {
	case model.WaybillStatusAccepted:
		s.onWaybillAccepted(savedOrderInfo, bill, isPending)
	case model.WaybillStatusAcceptCanceled:
		s.onWaybillAcceptCanceled(savedOrderInfo, bill, isPending)
	case model.WaybillStatusCourierArrived:
		s.resetTimer(savedOrderInfo, bill, isPending)
		if !s.isBillCandidate(order, bill) {
			// s.ProxyCancelWaybill(order, bill, partner.CancelWaybillReasonNotAcceptIntime, partner.CancelWaybillReasonStrNotAcceptIntime)
			globals.SugarLogger.Infof("OnWaybillStatusChanged CourierArrived order(%d, %s) bill(%d, %s), bill:%v shouldn't get here", order.WaybillVendorID, order.VendorWaybillID, bill.WaybillVendorID, bill.VendorWaybillID, bill)
		}
	case model.WaybillStatusFailed:
		s.onWaybillFailed(savedOrderInfo, bill, isPending)
	case model.WaybillStatusCanceled:
		s.onWaybillCanceled(savedOrderInfo, bill, isPending)
	case model.WaybillStatusDelivering:
		s.resetTimer(savedOrderInfo, bill, isPending)
		if !s.isBillCandidate(order, bill) {
			// s.ProxyCancelWaybill(order, bill, partner.CancelWaybillReasonNotAcceptIntime, partner.CancelWaybillReasonStrNotAcceptIntime)
			globals.SugarLogger.Infof("OnWaybillStatusChanged Delivering order(%d, %s) bill(%d, %s), bill:%v shouldn't get here", order.WaybillVendorID, order.VendorWaybillID, bill.WaybillVendorID, bill.VendorWaybillID, bill)
		}
	case model.WaybillStatusDelivered:
		s.onWaybillDelivered(savedOrderInfo, bill, isPending)
	}
	// }
	return nil
}

// onWaybillNew records a newly created waybill, cancels it (or the bill it
// supersedes) when the order already has a waybill, and starts the grab timer
// for purchase-platform bills.
func (s *DefScheduler) onWaybillNew(savedOrderInfo *WatchOrderInfo, bill *model.Waybill, isPending bool) {
	order := savedOrderInfo.order
	// Decided before the bill is touched below: clearing bill.WaybillVendorID
	// used to make this test false for exactly the purchase-platform bills
	// that are supposed to start the timer.
	isPurchasePlatformBill := bill.OrderVendorID == bill.WaybillVendorID

	s.addWaybill2Map(savedOrderInfo, bill)
	if !isPending {
		if order.Status > model.OrderStatusEndBegin {
			s.ProxyCancelWaybill(order, bill, partner.CancelWaybillReasonNotAcceptIntime, partner.CancelWaybillReasonStrNotAcceptIntime)
		} else if order.WaybillVendorID != model.VendorIDUnknown {
			globals.SugarLogger.Debugf("OnWaybillStatusChanged multiple waybill created, bill:%v", bill)
			if order.VendorID == bill.WaybillVendorID { // This is a purchase-platform waybill.
				if order.VendorID != order.WaybillVendorID { // The existing waybill is not a purchase-platform one.
					globals.SugarLogger.Infof("OnWaybillStatusChanged bill:%v purchase platform bill came later than others, strange!!!", bill)
					if oldBill := savedOrderInfo.waybills[order.WaybillVendorID]; oldBill != nil {
						s.ProxyCancelWaybill(order, oldBill, partner.CancelWaybillReasonNotAcceptIntime, partner.CancelWaybillReasonStrNotAcceptIntime)
					} else {
						globals.SugarLogger.Warnf("OnWaybillStatusChanged bill:%v, oldBill is null, strange!!!", bill)
					}
				}
				// NOTE(review): this also changes the bill just stored in
				// savedOrderInfo.waybills (same pointer) — confirm intended.
				bill.WaybillVendorID = model.VendorIDUnknown
				s.updateOrderByBill(order, bill, false)
			} else {
				s.ProxyCancelWaybill(order, bill, partner.CancelWaybillReasonNotAcceptIntime, partner.CancelWaybillReasonStrNotAcceptIntime)
			}
		}
	}
	// Only a new waybill from the purchase platform starts the grab timer.
	if isPurchasePlatformBill {
		s.resetTimer(savedOrderInfo, bill, isPending)
	}
}

// onWaybillAccepted binds the accepted waybill to the order when the order has
// none yet (or when a purchase-platform bill overrides a third-party one),
// cancels the other waybills and switches the order to self delivery for
// third-party bills. A late competing third-party bill is cancelled instead.
func (s *DefScheduler) onWaybillAccepted(savedOrderInfo *WatchOrderInfo, bill *model.Waybill, isPending bool) {
	order := savedOrderInfo.order
	s.resetTimer(savedOrderInfo, bill, isPending)
	if isPending {
		return
	}
	// TODO: purchase-platform waybills have the highest priority, but this may
	// cause trouble: a third party may already have accepted and the switch to
	// self delivery may already have been sent (and may have succeeded), hence
	// the extra flag check.
	purchaseOverrides := order.VendorID == bill.WaybillVendorID &&
		order.VendorID != order.WaybillVendorID &&
		(order.DeliveryFlag&model.OrderDeliveryFlagMaskPurcahseDisabled) == 0
	if order.WaybillVendorID == model.VendorIDUnknown || purchaseOverrides {
		if order.WaybillVendorID != model.VendorIDUnknown {
			// We get here because the purchase platform's courier had already
			// grabbed the order but that message had not arrived yet
			// (for example: 818810379000941).
			globals.SugarLogger.Infof("OnWaybillStatusChanged orderID:%s purchase platform waybill arrvied later, may case problem", order.VendorOrderID)
		}
		s.updateOrderByBill(order, bill, false)
		s.cancelOtherWaybills(savedOrderInfo, bill, partner.CancelWaybillReasonNotAcceptIntime, partner.CancelWaybillReasonStrNotAcceptIntime)
		if bill.WaybillVendorID != bill.OrderVendorID {
			if savedOrderInfo.storeDeliveryType == scheduler.StoreDeliveryTypeByStore {
				s.SelfDeliverDelivering(savedOrderInfo.order, "")
			} else {
				s.swtich2SelfDeliverWithRetry(savedOrderInfo, bill, 2, 10*time.Second)
			}
		}
		weixinmsg.NotifyWaybillStatus(bill, order)
		return
	}
	if !s.isBillCandidate(order, bill) && bill.WaybillVendorID != order.VendorID {
		// Two accept events arrived almost at the same time (no time to cancel
		// the other one); this is considered normal.
		s.ProxyCancelWaybill(order, bill, partner.CancelWaybillReasonNotAcceptIntime, partner.CancelWaybillReasonStrNotAcceptIntime)
		globals.SugarLogger.Infof("OnWaybillStatusChanged Accepted orderID:%s got multiple bill:%v", order.VendorOrderID, bill)
	}
}

// onWaybillAcceptCanceled unbinds the order's current waybill when its courier
// gave the order back and asks the third-party carriers again; an unrelated
// bill is cancelled when the order already has a waybill.
func (s *DefScheduler) onWaybillAcceptCanceled(savedOrderInfo *WatchOrderInfo, bill *model.Waybill, isPending bool) {
	order := savedOrderInfo.order
	if s.isBillCandidate(order, bill) {
		s.resetTimer(savedOrderInfo, bill, isPending)
		if !isPending {
			// NOTE(review): after this assignment excludeBill no longer names
			// the cancelling carrier; that carrier is still skipped only while
			// its entry remains in savedOrderInfo.waybills — confirm.
			bill.WaybillVendorID = model.VendorIDUnknown
			s.updateOrderByBill(order, bill, false)
			s.createWaybillOn3rdProviders(savedOrderInfo, bill)
		}
		return
	}
	if order.WaybillVendorID != model.VendorIDUnknown {
		s.ProxyCancelWaybill(order, bill, partner.CancelWaybillReasonNotAcceptIntime, partner.CancelWaybillReasonStrNotAcceptIntime)
		globals.SugarLogger.Warnf("OnWaybillStatusChanged AcceptCanceled orderID:%s got multiple bill:%v, order details:%v", order.VendorOrderID, bill, order)
	}
}

// onWaybillFailed forgets the failed waybill and unbinds it from the order
// when it was the order's current one.
//
// TODO: WaybillStatusFailed is read as "the whole order failed", so no new
// waybill is attempted; a zabbix log alarm should be added here.
func (s *DefScheduler) onWaybillFailed(savedOrderInfo *WatchOrderInfo, bill *model.Waybill, isPending bool) {
	order := savedOrderInfo.order
	s.removeWaybillFromMap(savedOrderInfo, bill.WaybillVendorID)
	if !s.isBillCandidate(order, bill) {
		// Can be reached when creating a waybill fails (for example: 818874313000121).
		globals.SugarLogger.Infof("OnWaybillStatusChanged Failed bill:%v shouldn't got here, order details:%v", bill, order)
		return
	}
	s.resetTimer(savedOrderInfo, bill, isPending)
	if !isPending {
		globals.SugarLogger.Infof("OnWaybillStatusChanged WaybillStatusFailed, bill:%v", bill)
		bill.WaybillVendorID = model.VendorIDUnknown
		s.updateOrderByBill(order, bill, false)
	}
}

// onWaybillCanceled forgets the cancelled waybill, unbinds it from the order
// and, for third-party bills only, asks the third-party carriers again.
func (s *DefScheduler) onWaybillCanceled(savedOrderInfo *WatchOrderInfo, bill *model.Waybill, isPending bool) {
	order := savedOrderInfo.order
	s.removeWaybillFromMap(savedOrderInfo, bill.WaybillVendorID)
	if !s.isBillCandidate(order, bill) && order.WaybillVendorID != model.VendorIDUnknown {
		return
	}
	s.resetTimer(savedOrderInfo, bill, isPending)
	if isPending {
		return
	}
	// Decided before bill.WaybillVendorID is cleared below; testing afterwards
	// compared VendorIDUnknown with the order's vendor and so treated every
	// cancelled bill as a third-party one.
	isThirdPartyBill := bill.WaybillVendorID != order.VendorID
	if order.WaybillVendorID != model.VendorIDUnknown {
		bill.WaybillVendorID = model.VendorIDUnknown
		s.updateOrderByBill(order, bill, false)
	}
	// Only a cancelled third-party waybill triggers creating third-party
	// waybills again. After a purchase-platform waybill is cancelled the
	// platform creates a new one itself (its new-waybill event has its own
	// timer); at least JD behaves that way, so JD's behaviour is followed.
	if isThirdPartyBill {
		s.createWaybillOn3rdProviders(savedOrderInfo, nil)
	}
}

// onWaybillDelivered forgets the delivered waybill, marks the order delivered
// on the purchase platform for third-party bills and notifies the store.
func (s *DefScheduler) onWaybillDelivered(savedOrderInfo *WatchOrderInfo, bill *model.Waybill, isPending bool) {
	order := savedOrderInfo.order
	s.resetTimer(savedOrderInfo, bill, isPending)
	s.removeWaybillFromMap(savedOrderInfo, bill.WaybillVendorID)
	if order.VendorID != bill.WaybillVendorID && !isPending {
		if savedOrderInfo.storeDeliveryType == scheduler.StoreDeliveryTypeByStore {
			s.SelfDeliverDelievered(order, "")
		} else {
			s.Swtich2SelfDelivered(order, "")
		}
	}
	if !s.isBillCandidate(order, bill) {
		// Normally reached only with out-of-order messages, i.e. the new-order
		// message arrived after the waybill-accepted message.
		// A typical one: 1223633660228537567
		globals.SugarLogger.Infof("OnWaybillStatusChanged Delivered order(%d, %s) bill(%d, %s), bill:%v shouldn't get here", order.WaybillVendorID, order.VendorWaybillID, bill.WaybillVendorID, bill.VendorWaybillID, bill)
		if order.WaybillVendorID == model.VendorIDUnknown {
			s.updateOrderByBill(order, bill, false)
		}
	}
	if !isPending {
		weixinmsg.NotifyWaybillStatus(bill, order)
	}
}

// addWaybill2Map stores bill under its waybill vendor ID, replacing any
// previous entry. A replaced third-party entry is logged as a warning.
func (s *DefScheduler) addWaybill2Map(savedOrderInfo *WatchOrderInfo, bill *model.Waybill) {
	if _, ok := savedOrderInfo.waybills[bill.WaybillVendorID]; ok {
		// The purchase platform re-sending a new waybill with the same number
		// is normal (JD does it), so only third-party duplicates are reported.
		if bill.WaybillVendorID != bill.OrderVendorID {
			globals.SugarLogger.Warnf("addWaybill2Map bill:%v already exists", bill)
		}
	}
	savedOrderInfo.waybills[bill.WaybillVendorID] = bill
}

// removeWaybillFromMap drops the waybill stored for waybillVendorID, if any.
func (s *DefScheduler) removeWaybillFromMap(savedOrderInfo *WatchOrderInfo, waybillVendorID int) {
	delete(savedOrderInfo.waybills, waybillVendorID)
}

// recordTempLog inserts tmpLog through a new ORM handle and logs a warning
// when the insert fails.
func (s *DefScheduler) recordTempLog(tmpLog *legacymodel.TempLog) {
	if _, err := orm.NewOrm().Insert(tmpLog); err != nil {
		globals.SugarLogger.Warnf("recordTempLog insert failed, orderID:%s, err:%v", tmpLog.RefVendorOrderID, err)
	}
}

// createWaybillOn3rdProviders creates a waybill on every supported third-party
// carrier that has none yet, skipping excludeBill's carrier when excludeBill
// is not nil. Nothing is done (and nil is returned) when the order already has
// a waybill, is locked or outside the pickup-finished..end window, has
// scheduling disabled, or exceeded maxWaybillRetryCount. It returns
// scheduler.ErrCanNotCreateAtLeastOneWaybill when no carrier accepted.
func (s *DefScheduler) createWaybillOn3rdProviders(savedOrderInfo *WatchOrderInfo, excludeBill *model.Waybill) (err error) {
	order := savedOrderInfo.order
	globals.SugarLogger.Debugf("createWaybillOn3rdProviders, orderID:%s, status:%d, excludeBill:%v", order.VendorOrderID, order.Status, excludeBill)

	if order.WaybillVendorID != model.VendorIDUnknown {
		globals.SugarLogger.Debugf("createWaybillOn3rdProviders, orderID:%s, waybillVendorID:%d, vendorWaybilID:%s is not empty, bypass", order.VendorOrderID, order.WaybillVendorID, order.VendorWaybillID)
		return nil
	}
	// An order cancelled while being delivered is still in "delivering" status.
	inDispatchWindow := order.LockStatus == model.OrderStatusUnknown &&
		order.Status >= model.OrderStatusFinishedPickup &&
		order.Status < model.OrderStatusEndBegin
	if !inDispatchWindow {
		globals.SugarLogger.Debugf("createWaybillOn3rdProviders, orderID:%s, status:%d doesn't match model.OrderStatusFinishedPickup, bypass", order.VendorOrderID, order.Status)
		return nil
	}
	if (order.DeliveryFlag & model.OrderDeliveryFlagMaskScheduleDisabled) != 0 {
		globals.SugarLogger.Debugf("createWaybillOn3rdProviders, orderID:%s, store:%d dont't support 3rd delivery platform", order.VendorOrderID, jxutils.GetJxStoreIDFromOrder(order))
		return nil
	}
	if savedOrderInfo.retryCount > maxWaybillRetryCount {
		globals.SugarLogger.Infof("createWaybillOn3rdProviders [运营]同一订单orderID:%s尝试了%d次创建运单失败, 停止调度,如果还需要发单,请人工处理", order.VendorOrderID, savedOrderInfo.retryCount)
		s.recordTempLog(&legacymodel.TempLog{
			VendorOrderID:    order.VendorOrderID,
			RefVendorOrderID: order.VendorOrderID,
			Msg:              fmt.Sprintf("createWaybillOn3rdProviders, orderID:%s failed %d times, stop schedule", order.VendorOrderID, savedOrderInfo.retryCount),
		})
		return nil
	}

	// Rejects the waybill when the carrier's extra fee is too high.
	checkFee := func(deliveryFee, addFee int64) error {
		if addFee <= maxAddFee {
			return nil
		}
		globals.SugarLogger.Infof("CreateWaybill orderID:%s addFee exceeded too much, it's %d", order.VendorOrderID, addFee)
		s.recordTempLog(&legacymodel.TempLog{
			VendorOrderID:    order.VendorOrderID,
			RefVendorOrderID: order.VendorOrderID,
			IntValue1:        addFee,
			Msg:              fmt.Sprintf("CreateWaybill orderID:%s addFee exceeded too much, it's %d", order.VendorOrderID, addFee),
		})
		return ErrAddFeeExceeded
	}

	successCount := 0
	for _, vendorID := range savedOrderInfo.supported3rdCarriers {
		handlerInfo := partner.GetDeliveryPlatformFromVendorID(vendorID)
		if handlerInfo == nil || !handlerInfo.Use4CreateWaybill {
			continue
		}
		if savedOrderInfo.waybills[vendorID] != nil {
			continue
		}
		if excludeBill != nil && vendorID == excludeBill.WaybillVendorID {
			continue
		}
		if _, err = s.CreateWaybill(vendorID, order, checkFee); err == nil {
			successCount++
		}
	}
	if successCount == 0 {
		globals.SugarLogger.Infof("createWaybillOn3rdProviders, orderID:%s all failed", order.VendorOrderID)
		return scheduler.ErrCanNotCreateAtLeastOneWaybill
	}
	savedOrderInfo.retryCount++
	return nil
}

// cancelOtherWaybills cancels every third-party waybill of the order except
// bill2Keep (which may be nil) and forgets those cancelled successfully. It
// does nothing when scheduling is disabled for the order. The first
// cancellation error, if any, is returned.
func (s *DefScheduler) cancelOtherWaybills(savedOrderInfo *WatchOrderInfo, bill2Keep *model.Waybill, cancelReasonID int, cancelReason string) (err error) {
	order := savedOrderInfo.order
	globals.SugarLogger.Debugf("cancelOtherWaybills, orderID:%s, bill:%v", order.VendorOrderID, bill2Keep)
	if (order.DeliveryFlag & model.OrderDeliveryFlagMaskScheduleDisabled) != 0 {
		globals.SugarLogger.Debugf("cancelOtherWaybills, orderID:%s, bill:%v stop schedule", order.VendorOrderID, bill2Keep)
		return nil
	}

	var cancelled []*model.Waybill
	for _, v := range savedOrderInfo.waybills {
		if v.OrderVendorID == v.WaybillVendorID {
			continue // Purchase-platform bills are never cancelled here.
		}
		if bill2Keep != nil && v.WaybillVendorID == bill2Keep.WaybillVendorID && v.VendorWaybillID == bill2Keep.VendorWaybillID {
			continue
		}
		err2 := s.ProxyCancelWaybill(order, v, cancelReasonID, cancelReason)
		if err2 == nil {
			cancelled = append(cancelled, v)
		} else if err == nil {
			// Report at least one error.
			err = err2
		}
	}
	for _, v := range cancelled {
		s.removeWaybillFromMap(savedOrderInfo, v.WaybillVendorID)
	}
	return err
}

// swtich2SelfDeliverWithRetry asks the purchase platform to switch the order
// to self delivery, retrying up to retryCount more times with duration between
// attempts. When every attempt fails the third-party bill is cancelled and,
// if it was the order's current waybill, unbound from the order.
func (s *DefScheduler) swtich2SelfDeliverWithRetry(savedOrderInfo *WatchOrderInfo, bill *model.Waybill, retryCount int, duration time.Duration) {
	order := savedOrderInfo.order
	globals.SugarLogger.Debugf("swtich2SelfDeliverWithRetry orderID:%s", order.VendorOrderID)
	if (order.DeliveryFlag & model.OrderDeliveryFlagMaskPurcahseDisabled) != 0 {
		return
	}
	if order.WaybillVendorID == order.VendorID {
		// We get here because the purchase platform's courier had already
		// grabbed the order (but that message had not arrived yet), so the
		// switch would fail (for example: 818810379000941). A better approach
		// would be to inspect Swtich2SelfDeliver's result and not retry then.
		globals.SugarLogger.Infof("swtich2SelfDeliverWithRetry orderID:%s status is wrong(maybe purchase platform accepted waybill)", order.VendorOrderID)
		// globals.SugarLogger.Warnf("swtich2SelfDeliverWithRetry orderID:%s status is wrong, order details:%v", order.VendorOrderID, order)
		return
	}

	err := s.Swtich2SelfDeliver(order, "")
	if err == nil || err == scheduler.ErrOrderStatusAlreadySatisfyCurOperation {
		s.removeWaybillFromMap(savedOrderInfo, order.VendorID)
		partner.CurOrderManager.UpdateOrderStatusAndFlag(order)
		return
	}

	globals.SugarLogger.Infof("swtich2SelfDeliverWithRetry failed, bill:%v, err:%v", bill, err)
	if retryCount > 0 {
		time.AfterFunc(duration, func() {
			jxutils.CallMsgHandlerAsync(func() {
				s.swtich2SelfDeliverWithRetry(savedOrderInfo, bill, retryCount-1, duration)
			}, order.VendorOrderID)
		})
		return
	}

	globals.SugarLogger.Infof("swtich2SelfDeliverWithRetry finally failed, orderID:%s bill:%v, err:%v", order.VendorOrderID, bill, err)
	s.recordTempLog(&legacymodel.TempLog{
		VendorOrderID:    bill.VendorWaybillID,
		RefVendorOrderID: order.VendorOrderID,
		Msg:              fmt.Sprintf("swtich2SelfDeliverWithRetry finally failed, orderID:%s bill:%v, err:%v", order.VendorOrderID, bill, err),
	})
	if s.ProxyCancelWaybill(order, bill, partner.CancelWaybillReasonSwitch2SelfFailed, partner.CancelWaybillReasonStrSwitch2SelfFailed) == nil {
		// A cancel caused by a failed switch must also clear the waybill
		// recorded on the order.
		if s.isBillCandidate(order, bill) {
			bill.WaybillVendorID = model.VendorIDUnknown
			s.updateOrderByBill(order, bill, false)
		}
	}
}

// loadSavedOrderFromMap returns the watch state for the order status refers
// to, or nil when there is none and isAutoLoad is false. With isAutoLoad a
// missing or incomplete entry is (re)filled from the order manager, falling
// back to a minimal order built from status. It is written this way to
// tolerate out-of-order messages.
func (s *DefScheduler) loadSavedOrderFromMap(status *model.OrderStatus, isAutoLoad bool) *WatchOrderInfo {
	globals.SugarLogger.Debugf("loadSavedOrderFromMap status:%v", status)
	universalOrderID := jxutils.ComposeUniversalOrderID(status.RefVendorOrderID, status.RefVendorID)
	var realSavedInfo *WatchOrderInfo
	if savedInfo, ok := s.orderMap.Load(universalOrderID); ok {
		realSavedInfo = savedInfo.(*WatchOrderInfo)
	}
	if !isAutoLoad || (realSavedInfo != nil && model.IsOrderSolid(realSavedInfo.order)) {
		return realSavedInfo
	}

	if realSavedInfo == nil {
		realSavedInfo = NewWatchOrderInfo(nil)
		s.orderMap.StoreWithTimeout(universalOrderID, realSavedInfo, orderMapStoreMaxTime)
	} else {
		globals.SugarLogger.Infof("loadSavedOrderFromMap order is incomplete, orderID:%s, load it", status.RefVendorOrderID)
	}
	order, err := partner.CurOrderManager.LoadOrder(status.RefVendorOrderID, status.RefVendorID)
	if err == nil {
		realSavedInfo.SetOrder(order)
		return realSavedInfo
	}
	realSavedInfo.SetOrder(&model.GoodsOrder{
		VendorOrderID:   status.RefVendorOrderID,
		VendorID:        status.RefVendorID,
		Status:          status.Status,
		StatusTime:      status.StatusTime,
		OrderCreatedAt:  status.StatusTime,
		WaybillVendorID: model.VendorIDUnknown,
	})
	globals.SugarLogger.Infof("loadSavedOrderFromMap can not load order orderID:%s", status.VendorOrderID)
	return realSavedInfo
}

// stopTimer stops the armed timer, if any, and clears the timer bookkeeping.
func (s *DefScheduler) stopTimer(savedOrderInfo *WatchOrderInfo) {
	if savedOrderInfo.timer == nil {
		return
	}
	globals.SugarLogger.Debugf("stopTimer orderID:%s", savedOrderInfo.order.VendorOrderID)
	savedOrderInfo.timer.Stop()
	savedOrderInfo.timerStatus = 0
	savedOrderInfo.timerStatusType = scheduler.TimerStatusTypeUnknown
	savedOrderInfo.timer = nil
}

// resetTimer arms the timeout action configured for the current status of the
// order (bill == nil) or of bill. A timer of the same kind for a later status
// is never replaced by one for an earlier status. Pending events with a short
// or overdue timeout are spread over pendingOrderTimerMinSecond..
// pendingOrderTimerMaxSecond seconds; otherwise an overdue action runs
// immediately. It panics on an unknown TimerType.
func (s *DefScheduler) resetTimer(savedOrderInfo *WatchOrderInfo, bill *model.Waybill, isPending bool) {
	order := savedOrderInfo.order
	status := order.Status
	statusType := scheduler.TimerStatusTypeOrder
	statusTime := order.StatusTime
	if bill != nil {
		status = bill.Status
		statusType = scheduler.TimerStatusTypeWaybill
		statusTime = bill.StatusTime
	}
	globals.SugarLogger.Debugf("resetTimer, orderID:%s statusType:%d status:%v", order.VendorOrderID, statusType, status)

	// A newly set timer must not override a timer for a later status; take
	// care if statuses ever wrap around.
	if statusType == savedOrderInfo.timerStatusType && status < savedOrderInfo.timerStatus {
		return
	}
	config := s.mergeOrderStatusConfig(statusType, status, order.VendorID)
	if config == nil {
		s.stopTimer(savedOrderInfo)
		return
	}
	if config.TimerType == scheduler.TimerTypeByPass {
		return
	}
	s.stopTimer(savedOrderInfo)
	if config.TimeoutAction == nil {
		return
	}

	// For auto pickup the store's own setting wins.
	if statusType == scheduler.TimerStatusTypeOrder && status == model.OrderStatusAccepted && savedOrderInfo.autoPickupTimeoutMinute > 1 {
		config.Timeout = time.Duration(savedOrderInfo.autoPickupTimeoutMinute) * time.Minute
	}

	var timeout time.Duration
	switch config.TimerType {
	case scheduler.TimerTypeBaseNow:
		timeout = config.Timeout
	case scheduler.TimerTypeBaseStatusTime:
		timeout = statusTime.Sub(time.Now()) + config.Timeout
	case scheduler.TimerTypeBaseExpectedDeliveredTime:
		if order.BusinessType == model.BusinessTypeDingshida && order.ExpectedDeliveredTime != utils.DefaultTimeValue {
			statusTime = order.ExpectedDeliveredTime.Add(-time2Delivered)
		}
		timeout = statusTime.Sub(time.Now()) + config.Timeout
	default:
		panic("TimerType is wrong!!!")
	}
	// rand.Int31n panics on a non-positive argument, so only positive gaps
	// are randomized.
	if config.TimeoutGap > 0 {
		timeout += time.Duration(rand.Int31n(int32(config.TimeoutGap))) * time.Second
	}
	if isPending && timeout < pendingOrderTimerMaxSecond*time.Second {
		// Pending orders are spread over 2 to 5 seconds so that follow-up
		// events get a chance to run first. The ranges are in milliseconds,
		// so the timeout is converted to milliseconds as well (it used to be
		// passed in nanoseconds).
		// NOTE(review): assumes MapValue2Scope maps the first range onto the
		// second in the units it is given — confirm against jxutils.
		timeout = time.Duration(jxutils.MapValue2Scope(int64(timeout/time.Millisecond), -pendingOrderTimerMinMinSecond*1000, pendingOrderTimerMaxSecond*1000, pendingOrderTimerMinSecond*1000, pendingOrderTimerMaxSecond*1000)) * time.Millisecond
	} else if timeout < 0 {
		timeout = 0
	}

	runAction := func() {
		if err := config.TimeoutAction(savedOrderInfo); err != nil {
			globals.SugarLogger.Infof("resetTimer, orderID:%s, status:%d, timeout action failed:%v", order.VendorOrderID, status, err)
		}
	}
	if timeout == 0 {
		runAction()
	} else {
		timerName := ""
		if statusType == scheduler.TimerStatusTypeOrder {
			timerName = model.OrderStatusName[status]
		} else if statusType == scheduler.TimerStatusTypeWaybill {
			timerName = model.WaybillStatusName[status]
		}
		savedOrderInfo.timerStatusType = statusType
		savedOrderInfo.timerStatus = status
		savedOrderInfo.timer = time.AfterFunc(timeout, func() {
			jxutils.CallMsgHandlerAsync(func() {
				globals.SugarLogger.Debugf("fire timer:%s, orderID:%s", timerName, order.VendorOrderID)
				runAction()
				// NOTE(review): if the action itself armed a new timer, these
				// two lines clear that timer's bookkeeping — confirm intended.
				savedOrderInfo.timerStatus = 0
				savedOrderInfo.timerStatusType = scheduler.TimerStatusTypeUnknown
			}, order.VendorOrderID)
		})
	}
	globals.SugarLogger.Debugf("resetTimer, orderID:%s, status:%d, timeout:%v", order.VendorOrderID, status, timeout)
}

// handleAutoAcceptOrder decides whether an order is accepted or refused and
// calls handler with the decision. The order is refused only when userMobile
// is found in the BlackClient table; an empty mobile or any lookup error
// (including "no rows") accepts it. db may be nil, in which case a new ORM
// handle is used. It returns 1 for accept and -1 for refuse.
func (s *DefScheduler) handleAutoAcceptOrder(orderID string, vendorID int, userMobile string, jxStoreID int, db orm.Ormer, handler func(accepted bool) error) int {
	handleType := 1
	if userMobile == "" {
		globals.SugarLogger.Infof("order:%s, vendorID:%d, mobile is empty, should accept order", orderID, vendorID)
	} else {
		if db == nil {
			db = orm.NewOrm()
		}
		user := &legacymodel.BlackClient{
			Mobile: userMobile,
		}
		if err := db.Read(user, "Mobile"); err != nil {
			// The order is still accepted when the database lookup fails.
			if err != orm.ErrNoRows {
				globals.SugarLogger.Errorf("read data error:%v, data:%v, vendorID:%d", err, user, vendorID)
			}
		} else {
			// Black-listed customer: force refusal.
			globals.SugarLogger.Infof("force reject order:%s, vendorID:%d", orderID, vendorID)
			handleType = -1
		}
	}

	if err := handler(handleType == 1); err != nil {
		globals.SugarLogger.Infof("handleAutoAcceptOrder order:%s, vendorID:%d, handler failed:%v", orderID, vendorID, err)
	}
	return handleType
}

// mergeOrderStatusConfig returns a private copy of the default action config
// for (statusType, status) with the purchase platform's own timeout applied
// when that is non-zero. It returns nil when no default is configured.
func (s *DefScheduler) mergeOrderStatusConfig(statusType, status int, purchaseVendorID int) (retVal *StatusActionConfig) {
	if statusType < 0 || statusType >= len(s.defWorkflowConfig) {
		return nil
	}
	vendorTimeout := partner.GetPurchasePlatformFromVendorID(purchaseVendorID).GetStatusActionTimeout(statusType, status)
	defConfig := s.defWorkflowConfig[statusType][status]
	if defConfig == nil {
		return nil
	}
	merged := *defConfig // Copy, so callers may adjust it freely.
	if vendorTimeout != 0 {
		merged.Timeout = vendorTimeout
	}
	return &merged
}

// updateOrderByStatus copies the status fields of status onto order and
// returns order.
func (s *DefScheduler) updateOrderByStatus(order *model.GoodsOrder, status *model.OrderStatus) (retVal *model.GoodsOrder) {
	order.Status = status.Status
	order.VendorStatus = status.VendorStatus
	order.StatusTime = status.StatusTime
	order.LockStatus = status.LockStatus
	return order
}

// updateOrderByBill binds bill to order (or unbinds, when bill's waybill
// vendor is unknown) both through the order manager and on the in-memory
// order. Orders past OrderStatusEndBegin are left alone. With revertStatus the
// order goes back to OrderStatusFinishedPickup. Note that bill.VendorWaybillID
// is cleared when the bill's vendor is unknown.
func (s *DefScheduler) updateOrderByBill(order *model.GoodsOrder, bill *model.Waybill, revertStatus bool) {
	if order.Status > model.OrderStatusEndBegin {
		return
	}
	if bill.WaybillVendorID == model.VendorIDUnknown {
		bill.VendorWaybillID = ""
	}
	partner.CurOrderManager.UpdateWaybillVendorID(bill, revertStatus)
	order.WaybillVendorID = bill.WaybillVendorID
	order.VendorWaybillID = bill.VendorWaybillID
	if revertStatus {
		order.Status = model.OrderStatusFinishedPickup
	}
}

// autoPickupGood marks the goods as picked up; an order that is already in
// that state is not treated as an error.
func (s *DefScheduler) autoPickupGood(order *model.GoodsOrder) (err error) {
	err = s.PickupGoods(order, "")
	if err == scheduler.ErrOrderStatusAlreadySatisfyCurOperation {
		return nil
	}
	return err
}

// isBillCandidate reports whether bill is the waybill currently recorded on
// order (same waybill vendor and same vendor waybill ID).
func (s *DefScheduler) isBillCandidate(order *model.GoodsOrder, bill *model.Waybill) bool {
	return order.WaybillVendorID == bill.WaybillVendorID && order.VendorWaybillID == bill.VendorWaybillID
}

// ProxyCancelWaybill cancels bill unless scheduling is disabled for order, in
// which case it does nothing and returns nil.
func (s *DefScheduler) ProxyCancelWaybill(order *model.GoodsOrder, bill *model.Waybill, cancelReasonID int, cancelReason string) (err error) {
	globals.SugarLogger.Debugf("ProxyCancelWaybill orderID:%s", order.VendorOrderID)
	if (order.DeliveryFlag & model.OrderDeliveryFlagMaskScheduleDisabled) != 0 {
		globals.SugarLogger.Debugf("ProxyCancelWaybill orderID:%s stop schedule, bypass CancelWaybill", order.VendorOrderID)
		return nil
	}
	return s.CancelWaybill(bill, cancelReasonID, cancelReason)
}