Files
jx-callback/business/scheduler/defsch/defsch.go

484 lines
21 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package defsch
import (
"time"
"math/rand"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/business/legacymodel"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/scheduler"
"git.rosy.net.cn/jx-callback/globals"
"github.com/astaxie/beego/orm"
)
const (
time2Delivered = 1 * time.Hour // 正常从下单到送达的时间。
time2Schedule3rdCarrier = 330 * time.Second // 京东要求5分钟后才能转自送保险起见设置为5分半钟
// time2Schedule3rdCarrierGap4OrderStatus = 3 * time.Minute // 京东要求是运单状态为待抢单且超时5分钟但为了防止没有运单事件所以就拣货完成事件开始算添加3分钟
time2AutoPickupMin = 15 * time.Minute
time2AutoPickupGap = 5 * 60 //随机5分钟
// 把pending order timerout 在-pendingOrderTimerMinMinSecond至pendingOrderTimerMaxSecond映射到pendingOrderTimerMinSecond至pendingOrderTimerMaxSecond
pendingOrderTimerMinMinSecond = 5 * 60 // 10分钟
pendingOrderTimerMinSecond = 2
pendingOrderTimerMaxSecond = 5
maxWaybillRetryCount = 2
orderMapStoreMaxTime = 4 * 24 * time.Hour // cache最长存储时间
)
type WatchOrderInfo struct {
order *model.GoodsOrder // order里的信息是保持更新的
waybills []*model.Waybill // 这个waybills里的状态信息是不真实的只使用id相关的信息
timerStatusType int // 0表示订单1表示运单
timerStatus int
timer *time.Timer
retryCount int // 失败后尝试的次数,调试阶段可能出现死循化,阻止这种情况发生
}
// 重要:此调度器要求同一定单的处理逻辑必须是序列化了的,不然会有并发问题
type DefScheduler struct {
scheduler.BaseScheduler
defWorkflowConfig []map[int]*scheduler.StatusActionConfig
orderMap jxutils.SyncMapWithTimeout
}
func init() {
sch := &DefScheduler{}
sch.IsReallyCallPlatformAPI = globals.ReallyCallPlatformAPI
sch.Init()
scheduler.CurrentScheduler = sch
sch.defWorkflowConfig = []map[int]*scheduler.StatusActionConfig{
map[int]*scheduler.StatusActionConfig{
model.OrderStatusNew: &scheduler.StatusActionConfig{ // 自动接单
TimerType: scheduler.TimerTypeBaseNow,
Timeout: 1 * time.Second,
TimeoutAction: func(order *model.GoodsOrder) (err error) {
_ = sch.handleAutoAcceptOrder(order.VendorOrderID, order.VendorID, order.ConsigneeMobile, jxutils.GetJxStoreIDFromOrder(order), nil, func(isAcceptIt bool) error {
if err = sch.AcceptOrRefuseOrder(order, isAcceptIt); err != nil {
// 为了解决京东新消息与接单消息乱序的问题
if errWithCode, ok := err.(*utils.ErrorWithCode); ok && errWithCode.Level() == 1 && errWithCode.IntCode() == -1 {
if order2, err2 := sch.GetPurchasePlatformFromVendorID(order.VendorID).GetOrder(order.VendorOrderID); err2 == nil && order2.Status > order.Status {
sch.OnOrderStatusChanged(model.Order2Status(order2), false)
err = nil
} else {
err = err2
}
}
}
return err
})
return nil
},
},
model.OrderStatusAccepted: &scheduler.StatusActionConfig{ // 自动拣货
TimerType: scheduler.TimerTypeBaseStatusTime,
Timeout: time2AutoPickupMin,
TimeoutGap: time2AutoPickupGap,
TimeoutAction: func(order *model.GoodsOrder) (err error) {
return sch.PickedUpGoods(order)
},
},
},
map[int]*scheduler.StatusActionConfig{
model.WaybillStatusNew: &scheduler.StatusActionConfig{ // 尝试召唤更多物流
TimerType: scheduler.TimerTypeBaseStatusTime,
Timeout: time2Schedule3rdCarrier,
TimeoutAction: func(order *model.GoodsOrder) (err error) {
return sch.createWaybillOn3rdProviders(order, nil)
},
},
},
}
}
// 以下是订单
func (s *DefScheduler) OnOrderNew(order *model.GoodsOrder, isPending bool) (err error) {
globals.SugarLogger.Debugf("OnOrderNew orderID:%s", order.VendorOrderID)
savedOrderInfo := s.loadSavedOrderFromMap(model.Order2Status(order), false)
if savedOrderInfo == nil {
savedOrderInfo = &WatchOrderInfo{
order: order,
}
s.orderMap.StoreWithTimeout(jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID), savedOrderInfo, orderMapStoreMaxTime)
} else {
savedOrderInfo.order = order // 调整单或消息错序都可能进到这里来
}
s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeOrder, savedOrderInfo.order.Status, false)
return err
}
func (s *DefScheduler) OnOrderStatusChanged(status *model.OrderStatus, isPending bool) (err error) {
if status.Status > model.OrderStatusUnknown { // 只处理状态转换,一般消息不处理
globals.SugarLogger.Debugf("OnOrderStatusChanged orderID:%s %s, status:%v", status.VendorOrderID, model.OrderStatusName[status.Status], status)
savedOrderInfo := s.loadSavedOrderFromMap(status, true)
s.updateOrderByStatus(savedOrderInfo.order, status)
s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeOrder, savedOrderInfo.order.Status, false)
if status.Status >= model.OrderStatusEndBegin {
s.cancelOtherWaybills(savedOrderInfo, nil)
s.orderMap.Delete(jxutils.GetUniversalOrderIDFromOrderStatus(status))
}
}
return err
}
// 以下是运单
func (s *DefScheduler) OnWaybillStatusChanged(bill *model.Waybill, isPending bool) (err error) {
if bill.Status > model.WaybillStatusUnknown {
globals.SugarLogger.Debugf("OnWaybillStatusChanged orderID:%s %s, bill:%v", bill.VendorOrderID, model.WaybillStatusName[bill.Status], bill)
savedOrderInfo := s.loadSavedOrderFromMap(model.Waybill2Status(bill), true)
order := savedOrderInfo.order
if order.Status < model.OrderStatusFinishedPickup || order.Status > model.OrderStatusEndBegin { // 如果当前order状态是不应该出现运单状态
globals.SugarLogger.Infof("OnWaybillStatusChanged orderID:%s status:%s is not suitable for waybill", order.VendorOrderID, model.OrderStatusName[order.Status])
s.CancelWaybill(bill)
s.stopTimer(savedOrderInfo)
s.orderMap.Delete(jxutils.GetUniversalOrderIDFromOrderStatus(model.Order2Status(order)))
return nil
}
if bill.Status == model.WaybillStatusNew {
s.addWaybill2Map(savedOrderInfo, bill)
if order.WaybillVendorID != model.VendorIDUnknown {
globals.SugarLogger.Debugf("OnWaybillStatusChanged multiple waybill created, bill:%v", bill)
if !(order.WaybillVendorID == bill.WaybillVendorID && order.VendorWaybillID == bill.VendorWaybillID) && bill.WaybillVendorID != order.VendorID {
s.CancelWaybill(bill)
} else if bill.WaybillVendorID == order.VendorID {
globals.SugarLogger.Warnf("OnWaybillStatusChanged bill:%v purchase platform bill came later than others, strange!!!", bill)
}
}
s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false)
} else {
switch bill.Status {
case model.WaybillStatusAccepted:
if order.WaybillVendorID == model.VendorIDUnknown {
s.cancelOtherWaybills(savedOrderInfo, bill)
s.updateOrderByBill(order, bill, false)
} else {
s.CancelWaybill(bill)
globals.SugarLogger.Warnf("OnWaybillStatusChanged Accepted orderID:%s got multiple bill:%v, order details:%v", order.VendorOrderID, bill, order)
}
s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false)
case model.WaybillStatusAcceptCanceled:
if order.WaybillVendorID == bill.WaybillVendorID && order.VendorWaybillID == bill.VendorWaybillID {
bill.WaybillVendorID = model.VendorIDUnknown
s.updateOrderByBill(order, bill, false)
s.createWaybillOn3rdProviders(order, bill)
s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false)
} else if order.WaybillVendorID != model.VendorIDUnknown {
s.CancelWaybill(bill)
globals.SugarLogger.Warnf("OnWaybillStatusChanged AcceptCanceled orderID:%s got multiple bill:%v, order details:%v", order.VendorOrderID, bill, order)
}
case model.WaybillStatusCourierArrived: // do nothing
if order.WaybillVendorID == bill.WaybillVendorID && order.VendorWaybillID == bill.VendorWaybillID {
} else {
// s.CancelWaybill(bill)
globals.SugarLogger.Warnf("OnWaybillStatusChanged CourierArrived order(%d, %s) bill(%d, %s), bill:%v shouldn't got here", order.WaybillVendorID, order.VendorWaybillID, bill.WaybillVendorID, bill.VendorWaybillID, bill)
}
s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false)
case model.WaybillStatusFailed: // todo WaybillStatusFailed理解成订单整个失败了不需要再尝试创建运单了注意这里应该加个zabbix日志的报警
s.removeWaybillFromMap(savedOrderInfo, bill)
if order.WaybillVendorID == bill.WaybillVendorID && order.VendorWaybillID == bill.VendorWaybillID {
globals.SugarLogger.Infof("OnWaybillStatusChanged WaybillStatusFailed, bill:%v", bill)
if order.WaybillVendorID == bill.WaybillVendorID {
bill.WaybillVendorID = model.VendorIDUnknown
s.updateOrderByBill(order, bill, true)
}
s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false)
} else {
globals.SugarLogger.Warnf("OnWaybillStatusChanged Failed bill:%v shouldn't got here", bill)
}
case model.WaybillStatusCanceled:
s.removeWaybillFromMap(savedOrderInfo, bill)
if order.WaybillVendorID == bill.WaybillVendorID && order.VendorWaybillID == bill.VendorWaybillID {
bill.WaybillVendorID = model.VendorIDUnknown
s.updateOrderByBill(order, bill, true)
savedOrderInfo.retryCount++
if savedOrderInfo.retryCount <= maxWaybillRetryCount {
s.createWaybillOn3rdProviders(order, nil)
} else {
globals.SugarLogger.Warnf("OnWaybillStatusChanged Canceled bill:%v failed %d times, stop schedule", bill, savedOrderInfo.retryCount)
}
s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false)
}
case model.WaybillStatusDelivering:
if order.WaybillVendorID == bill.WaybillVendorID && order.VendorWaybillID == bill.VendorWaybillID {
if order.VendorID != bill.WaybillVendorID {
s.SelfDeliverDelievering(order)
}
} else {
// s.CancelWaybill(bill)
globals.SugarLogger.Warnf("OnWaybillStatusChanged Delivering order(%d, %s) bill(%d, %s), bill:%v shouldn't got here", order.WaybillVendorID, order.VendorWaybillID, bill.WaybillVendorID, bill.VendorWaybillID, bill)
}
s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false)
case model.WaybillStatusDelivered:
s.removeWaybillFromMap(savedOrderInfo, bill)
if order.WaybillVendorID == bill.WaybillVendorID && order.VendorWaybillID == bill.VendorWaybillID {
if order.VendorID != bill.WaybillVendorID {
s.SelfDeliverDelievered(order)
}
} else {
globals.SugarLogger.Warnf("OnWaybillStatusChanged Delivered order(%d, %s) bill(%d, %s), bill:%v shouldn't got here", order.WaybillVendorID, order.VendorWaybillID, bill.WaybillVendorID, bill.VendorWaybillID, bill)
}
s.resetTimer(savedOrderInfo, scheduler.TimerStatusTypeWaybill, bill.Status, false)
}
}
}
return err
}
func (s *DefScheduler) addWaybill2Map(savedOrderInfo *WatchOrderInfo, bill *model.Waybill) {
for _, v := range savedOrderInfo.waybills {
if v.VendorWaybillID == bill.VendorWaybillID && v.WaybillVendorID == bill.WaybillVendorID {
// 如果已经存在,不做处理
globals.SugarLogger.Warnf("addWaybill2Map bill:%v already exists", bill)
return
}
}
savedOrderInfo.waybills = append(savedOrderInfo.waybills, bill)
}
func (s *DefScheduler) removeWaybillFromMap(savedOrderInfo *WatchOrderInfo, bill *model.Waybill) {
for k, v := range savedOrderInfo.waybills {
if v.VendorWaybillID == bill.VendorWaybillID && v.WaybillVendorID == bill.WaybillVendorID {
savedOrderInfo.waybills = append(savedOrderInfo.waybills[0:k], savedOrderInfo.waybills[k+1:]...)
break
}
}
}
func (s *DefScheduler) createWaybillOn3rdProviders(order *model.GoodsOrder, excludeBill *model.Waybill) (err error) {
globals.SugarLogger.Debugf("createWaybillOn3rdProviders, orderID:%s, status:%d, excludeBill:%v", order.VendorOrderID, order.Status, excludeBill)
if order.Status == model.OrderStatusFinishedPickup {
if s.isOrderSupport3rdDelivery(order) {
successCount := 0
for vendorID := range s.DeliveryPlatformHandlers {
if (excludeBill == nil || vendorID != excludeBill.WaybillVendorID) && s.DeliveryPlatformHandlers[vendorID].Use4CreateWaybill {
if err = s.CreateWaybill(vendorID, order); err == nil {
successCount++
}
}
}
if successCount != 0 {
return nil
}
globals.SugarLogger.Infof("createWaybillOn3rdProviders, orderID:%s all failed", order.VendorOrderID)
return scheduler.ErrCanNotCreateAtLeastOneWaybill
}
globals.SugarLogger.Debugf("createWaybillOn3rdProviders, orderID:%s, store:%d dont't support 3rd delivery platform", order.VendorOrderID, jxutils.GetJxStoreIDFromOrder(order))
} else {
globals.SugarLogger.Warnf("createWaybillOn3rdProviders, orderID:%s, status:%d doesn't match model.OrderStatusFinishedPickup, bypass", order.VendorOrderID, order.Status)
}
return nil
}
func (s *DefScheduler) cancelOtherWaybills(savedOrderInfo *WatchOrderInfo, bill2Keep *model.Waybill) (err error) {
globals.SugarLogger.Debugf("cancelOtherWaybills, orderID:%s, bill:%v", savedOrderInfo.order.VendorOrderID, bill2Keep)
for _, v := range savedOrderInfo.waybills {
if (v.OrderVendorID != v.WaybillVendorID) && (bill2Keep == nil || !(v.WaybillVendorID == bill2Keep.WaybillVendorID && v.VendorWaybillID == bill2Keep.VendorWaybillID)) {
s.CancelWaybill(v)
}
}
if bill2Keep != nil && bill2Keep.WaybillVendorID != bill2Keep.OrderVendorID {
s.swtich2SelfDeliverWithRetry(savedOrderInfo.order, bill2Keep, 2, 10*time.Second)
}
return nil
}
func (s *DefScheduler) swtich2SelfDeliverWithRetry(order *model.GoodsOrder, bill *model.Waybill, retryCount int, duration time.Duration) {
globals.SugarLogger.Debugf("swtich2SelfDeliverWithRetry orderID:%s", order.VendorOrderID)
// 当前先不加RETRY逻辑
if err := s.Swtich2SelfDeliver(order); err != nil {
globals.SugarLogger.Infof("swtich2SelfDeliverWithRetry failed, cancel bill:%v, err:%v", bill, err)
s.CancelWaybill(bill)
}
}
// 这个函数这样写的原因是适应一些消息错序
func (s *DefScheduler) loadSavedOrderFromMap(status *model.OrderStatus, isAutoLoad bool) *WatchOrderInfo {
globals.SugarLogger.Debugf("loadSavedOrderFromMap status:%v", status)
universalOrderID := jxutils.ComposeUniversalOrderID(status.RefVendorOrderID, status.RefVendorID)
var realSavedInfo *WatchOrderInfo
if savedInfo, ok := s.orderMap.Load(universalOrderID); ok {
realSavedInfo = savedInfo.(*WatchOrderInfo)
}
if isAutoLoad && (realSavedInfo == nil || !model.IsOrderSolid(realSavedInfo.order)) {
if realSavedInfo == nil {
realSavedInfo = new(WatchOrderInfo)
s.orderMap.StoreWithTimeout(universalOrderID, realSavedInfo, orderMapStoreMaxTime)
} else {
globals.SugarLogger.Infof("loadSavedOrderFromMap order is incomplete, orderID:%s, load it", status.RefVendorOrderID)
}
if order, err := s.CurOrderManager.LoadOrder(status.RefVendorOrderID, status.RefVendorID); err == nil {
realSavedInfo.order = order
} else {
realSavedInfo.order = &model.GoodsOrder{
VendorOrderID: status.RefVendorOrderID,
VendorID: status.RefVendorID,
Status: status.Status,
StatusTime: status.StatusTime,
OrderCreatedAt: status.StatusTime,
WaybillVendorID: model.VendorIDUnknown,
}
globals.SugarLogger.Infof("loadSavedOrderFromMap can not load order orderID:%s", status.VendorOrderID)
}
}
return realSavedInfo
}
func (s *DefScheduler) stopTimer(savedOrderInfo *WatchOrderInfo) {
if savedOrderInfo.timer != nil {
globals.SugarLogger.Debugf("stopTimer orderID:%s", savedOrderInfo.order.VendorOrderID)
savedOrderInfo.timer.Stop()
savedOrderInfo.timerStatus = 0
savedOrderInfo.timer = nil
}
}
func (s *DefScheduler) resetTimer(savedOrderInfo *WatchOrderInfo, statusType, status int, isPending bool) {
order := savedOrderInfo.order
globals.SugarLogger.Debugf("resetTimer, orderID:%s status:%v", order.VendorOrderID, status)
if statusType != savedOrderInfo.timerStatusType || status >= savedOrderInfo.timerStatus { // 新设置的TIMER不能覆盖状态在其后的TIMER如果状态回绕需要注意
config := s.mergeOrderStatusConfig(statusType, status, s.GetPurchasePlatformFromVendorID(order.VendorID).GetStatusActionConfig(statusType, status))
if config == nil || config.TimerType != scheduler.TimerTypeByPass {
s.stopTimer(savedOrderInfo)
}
if config != nil && config.TimeoutAction != nil && config.TimerType != scheduler.TimerTypeNoTimer && config.TimerType != scheduler.TimerTypeByPass {
var timeout time.Duration
switch config.TimerType {
case scheduler.TimerTypeBaseNow:
timeout = config.Timeout
case scheduler.TimerTypeBaseStatusTime:
timeout = order.StatusTime.Sub(time.Now()) + config.Timeout
case scheduler.TimerTypeBaseExpectedDeliveredTime:
expectedDeliveredTime := order.ExpectedDeliveredTime
if expectedDeliveredTime == utils.DefaultTimeValue { // 如果没有期望送达时间则以订单创建时间加DefaultTimeValue来表示
expectedDeliveredTime = order.OrderCreatedAt.Add(time2Delivered)
}
timeout = expectedDeliveredTime.Add(-config.Timeout).Sub(time.Now())
default:
panic("TimerType is wrong!!!")
}
if config.TimeoutGap != 0 {
timeout += time.Duration(rand.Int31n(int32(config.TimeoutGap))) * time.Second
}
if isPending && timeout < pendingOrderTimerMaxSecond*time.Second { // 如果是PENDING的订单则将其分布到2--5秒内让后续事件有机会执行
timeout = time.Duration(jxutils.MapValue2Scope(int64(timeout), -pendingOrderTimerMinMinSecond*1000, pendingOrderTimerMaxSecond*1000, pendingOrderTimerMinSecond*1000, pendingOrderTimerMaxSecond*1000)) * time.Millisecond
} else if timeout < 0 {
timeout = 0
}
if timeout == 0 {
config.TimeoutAction(order)
} else {
savedOrderInfo.timerStatusType = statusType
savedOrderInfo.timerStatus = status
savedOrderInfo.timer = time.AfterFunc(timeout, func() {
jxutils.CallMsgHandlerAsync(func() {
config.TimeoutAction(order)
savedOrderInfo.timerStatus = 0
savedOrderInfo.timerStatusType = scheduler.TimerStatusTypeUnknown
}, order.VendorOrderID)
})
}
globals.SugarLogger.Debugf("resetTimer orderID:%s, status:%d, timeout:%v", order.VendorOrderID, status, timeout)
}
}
}
func (s *DefScheduler) handleAutoAcceptOrder(orderID string, vendorID int, userMobile string, jxStoreID int, db orm.Ormer, handler func(accepted bool) error) int {
handleType := 0
if userMobile != "" {
if db == nil {
db = orm.NewOrm()
}
user := &legacymodel.BlackClient{
Mobile: userMobile,
}
if err := db.Read(user, "Mobile"); err != nil {
if err != orm.ErrNoRows {
globals.SugarLogger.Errorf("read data error:%v, data:%v, vendorID:%d", err, user, vendorID)
}
// 在访问数据库出错的情况下,也需要自动接单
handleType = 1
} else {
// 强制拒单
globals.SugarLogger.Infof("force reject order:%s, vendorID:%d", orderID, vendorID)
handleType = -1
}
} else {
globals.SugarLogger.Infof("order:%s, vendorID:%d, mobile is empty, should accept order", orderID, vendorID)
handleType = 1
}
if handleType == 1 {
handler(true)
} else if handleType == -1 {
handler(false)
}
return handleType
}
func (s *DefScheduler) mergeOrderStatusConfig(statusType, status int, config *scheduler.StatusActionConfig) (retVal *scheduler.StatusActionConfig) {
defConfig := s.defWorkflowConfig[statusType][status]
if defConfig == nil && config == nil {
return nil
}
retVal = &scheduler.StatusActionConfig{}
if defConfig != nil {
retVal.Timeout = defConfig.Timeout
retVal.TimeoutAction = defConfig.TimeoutAction
}
if config != nil {
if config.Timeout >= 0 {
retVal.Timeout = config.Timeout
}
if config.TimeoutAction != nil {
retVal.TimeoutAction = config.TimeoutAction
}
}
return retVal
}
func (s *DefScheduler) updateOrderByStatus(order *model.GoodsOrder, status *model.OrderStatus) (retVal *model.GoodsOrder) {
order.Status = status.Status
order.VendorStatus = status.VendorStatus
order.StatusTime = status.StatusTime
return order
}
func (s *DefScheduler) isOrderSupport3rdDelivery(order *model.GoodsOrder) (retVal bool) {
storefeature := &model.Jxstorefeature{
Id: jxutils.GetJxStoreIDFromOrder(order),
}
db := orm.NewOrm()
utils.CallFuncLogError(func() error {
err := db.Read(storefeature, "Id")
if err == nil {
if (order.VendorID == model.VendorIDJD && storefeature.Deliverycompetition == 1) ||
(order.VendorID == model.VendorIDELM && storefeature.Transmtzs == 1) {
retVal = true
}
}
return err
}, "isOrderSupport3rdDelivery")
return retVal
}
func (s *DefScheduler) updateOrderByBill(order *model.GoodsOrder, bill *model.Waybill, revertStatus bool) {
if bill.WaybillVendorID == model.VendorIDUnknown {
bill.VendorWaybillID = ""
}
s.CurOrderManager.UpdateWaybillVendorID(bill, revertStatus)
order.WaybillVendorID = bill.WaybillVendorID
order.VendorWaybillID = bill.VendorWaybillID
if revertStatus {
order.Status = model.OrderStatusFinishedPickup
}
}