Files
jx-callback/business/scheduler/defsch/defsch.go
gazebo 0cb34fe089 - all call CallFuncLogError add order info
- callLegacyMsgHandler, callNewMsgHandler and generateLegacyJxOrder config
- call legacy lefted msg handler in new process(elm urge order and bad comment on jd)
2018-07-22 13:45:16 +08:00

374 lines
15 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package defsch
import (
"time"
"math/rand"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/scheduler"
"git.rosy.net.cn/jx-callback/globals"
"git.rosy.net.cn/jx-callback/legacy/models"
"github.com/astaxie/beego/orm"
)
const (
time2Delivered = 1 * time.Hour // 正常订单都是1小时达
time2Schedule3rdCarrier = 330 * time.Second // 京东要求5分钟后才能转自送保险起见设置为5分半钟
time2Schedule3rdCarrierGap4OrderStatus = 3 * time.Minute // 京东要求是运单状态为待抢单且超时5分钟但为了防止没有运单事件所以就拣货完成事件开始算添加3分钟
time2AutoPickupMin = 15 * time.Minute
time2AutoPickupGap = 5 * time.Minute
minTimeout = 5 * time.Second // timer的最小时间这样写的上的是在load pending orders让延迟的事件有机会执行
)
type WatchOrderInfo struct {
order *model.GoodsOrder // order里的信息是保持更新的
dirty int // 因为京东事件序列New与Accepted有极少数情况下会错序处理延迟加载
waybills []*model.Waybill // 这个waybills里的状态信息是不真实的只使用id相关的信息
timerStatus int
timer *time.Timer
}
// 重要:此调度器要求同一定单的处理逻辑必须是序列化了的,不然会有并发问题
type DefScheduler struct {
scheduler.BaseScheduler
defWorkflowConfig map[int]*scheduler.StatusActionConfig
orderMap jxutils.SyncMapWithTimeout
}
func init() {
sch := &DefScheduler{}
sch.IsReallyCallPlatformAPI = globals.ReallyCallPlatformAPI
sch.Init()
scheduler.CurrentScheduler = sch
sch.defWorkflowConfig = map[int]*scheduler.StatusActionConfig{
model.OrderStatusNew: &scheduler.StatusActionConfig{ // 自动接单
Timeout: 1 * time.Second,
TimeoutAction: func(order *model.GoodsOrder) (err error) {
_ = sch.handleAutoAcceptOrder(order.VendorOrderID, order.VendorID, order.ConsigneeMobile, jxutils.GetJxStoreIDFromOrder(order), nil, func(isAcceptIt bool) error {
return sch.AcceptOrRefuseOrder(order, isAcceptIt)
})
return nil
},
},
model.OrderStatusAccepted: &scheduler.StatusActionConfig{ // 自动拣货
Timeout: time2AutoPickupMin,
TimeoutAction: func(order *model.GoodsOrder) (err error) {
return sch.PickedUpGoods(order)
},
},
model.OrderStatusFinishedPickup: &scheduler.StatusActionConfig{ // 尝试召唤更多物流
Timeout: time2Schedule3rdCarrier,
TimeoutAction: func(order *model.GoodsOrder) (err error) {
return sch.createWaybillOn3rdProviders(order, nil)
},
},
}
}
// 以下是订单
func (s *DefScheduler) OnOrderNew(order *model.GoodsOrder) (err error) {
globals.SugarLogger.Debugf("OnOrderNew, order:%v", order)
watchInfo := &WatchOrderInfo{
order: order,
}
s.orderMap.Store(jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID), watchInfo)
s.resetTimer(watchInfo, model.OrderStatusNew, order.OrderCreatedAt, 0)
return err
}
func (s *DefScheduler) OnOrderStatusChanged(status *model.OrderStatus) (err error) {
if status.Status > model.OrderStatusUnknown { // 只处理状态转换,一般消息不处理
globals.SugarLogger.Debugf("OnOrderStatusChanged, status:%v", status)
if savedOrderInfo := s.loadSavedOrderFromMap(status); savedOrderInfo != nil {
s.updateOrderByStatus(savedOrderInfo.order, status)
if status.Status > model.OrderStatusUnknown && status.Status < model.OrderStatusEndBegin {
if !(status.Status == model.OrderStatusFinishedPickup && len(savedOrderInfo.waybills) > 0) { //饿了么还观察到运单消息早于拣货完成消息
gap := 0 * time.Second
beginTime := status.StatusTime
if status.Status == model.OrderStatusNew {
beginTime = time.Now()
} else if status.Status == model.OrderStatusAccepted {
gap = time.Duration(rand.Int63n(int64(time2AutoPickupGap)))
beginTime = s.getBeginTime4LatestPickup(savedOrderInfo.order)
} else if status.Status == model.OrderStatusFinishedPickup {
// 召唤三方配送
// 正常应该是只依赖于购物平台的第一个运单消息但饿了么有观察到极少数情况下没有此事件所以还是需要在这里加个保险的TIMER来驱动运单调度
gap = time2Schedule3rdCarrierGap4OrderStatus
}
s.resetTimer(savedOrderInfo, status.Status, beginTime, gap)
}
} else {
s.stopTimer(savedOrderInfo)
s.cancelOtherWaybills(savedOrderInfo, nil)
s.orderMap.Delete(jxutils.GetUniversalOrderIDFromOrderStatus(status))
}
} else {
err = scheduler.ErrCanNotFindOrder
}
}
return err
}
// 以下是运单
func (s *DefScheduler) OnWaybillStatusChanged(bill *model.Waybill) (err error) {
if bill.Status > model.WaybillStatusUnknown {
globals.SugarLogger.Debugf("OnWaybillStatusChanged, bill:%v", bill)
if savedOrderInfo := s.loadSavedOrderFromMap(model.Waybill2Status(bill)); savedOrderInfo != nil {
s.addWaybill2Map(savedOrderInfo, bill) // 这样写的原因是因为调试时程度从中途运行没有接受到WaybillStatusNew事件
if bill.Status == model.WaybillStatusNew {
if bill.OrderVendorID == bill.WaybillVendorID {
if savedOrderInfo.timerStatus == model.OrderStatusFinishedPickup { // 如果当前TIMER还是OrderStatusFinishedPickup在OnOrderStatusChanged中设置的则重置
s.resetTimer(savedOrderInfo, model.OrderStatusFinishedPickup, bill.StatusTime, 0)
} else if savedOrderInfo.timerStatus != 0 {
globals.SugarLogger.Infof("OnWaybillStatusChanged met other timer, status:%d", savedOrderInfo.timerStatus)
}
}
if savedOrderInfo.order.WaybillVendorID != model.VendorIDUnknown {
globals.SugarLogger.Infof("OnWaybillStatusChanged multiple waybill created, bill:%v", bill)
if bill.WaybillVendorID != bill.WaybillVendorID {
s.CancelWaybill(bill)
}
}
} else {
switch bill.Status {
case model.WaybillStatusAccepted:
s.stopTimer(savedOrderInfo) // todo 这里应该另外启动一个TIMER
s.cancelOtherWaybills(savedOrderInfo, bill)
s.CurOrderManager.UpdateWaybillVendorID(bill)
savedOrderInfo.order.WaybillVendorID = bill.WaybillVendorID
case model.WaybillStatusAcceptCanceled:
if savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID {
s.createWaybillOn3rdProviders(savedOrderInfo.order, bill)
bill.WaybillVendorID = model.VendorIDUnknown
s.CurOrderManager.UpdateWaybillVendorID(bill)
savedOrderInfo.order.WaybillVendorID = bill.WaybillVendorID
}
case model.WaybillStatusFailed: // todo WaybillStatusFailed理解成订单整个失败了不需要再尝试创建运单了注意这里应该加个zabbix日志的报警
s.removeWaybillFromMap(savedOrderInfo, bill)
globals.SugarLogger.Infof("OnWaybillStatusChanged WaybillStatusFailed, bill:%v", bill)
case model.WaybillStatusCanceled:
s.removeWaybillFromMap(savedOrderInfo, bill)
if savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID {
s.createWaybillOn3rdProviders(savedOrderInfo.order, nil)
bill.WaybillVendorID = model.VendorIDUnknown
s.CurOrderManager.UpdateWaybillVendorID(bill)
savedOrderInfo.order.WaybillVendorID = bill.WaybillVendorID
}
case model.WaybillStatusDelivering:
if savedOrderInfo.order.VendorID != bill.WaybillVendorID {
s.SelfDeliverDelievering(savedOrderInfo.order)
}
case model.WaybillStatusDelivered:
s.removeWaybillFromMap(savedOrderInfo, bill)
if savedOrderInfo.order.VendorID != bill.WaybillVendorID {
s.SelfDeliverDelievered(savedOrderInfo.order)
}
}
}
} else {
err = scheduler.ErrCanNotFindOrder
}
}
return err
}
func (s *DefScheduler) addWaybill2Map(savedOrderInfo *WatchOrderInfo, bill *model.Waybill) {
for _, v := range savedOrderInfo.waybills {
if v.VendorWaybillID == bill.VendorWaybillID && v.WaybillVendorID == bill.WaybillVendorID {
// 如果已经存在,不做处理
// globals.SugarLogger.Infof("addWaybill2Map bill:%v already exists", bill)
return
}
}
savedOrderInfo.waybills = append(savedOrderInfo.waybills, bill)
}
func (s *DefScheduler) removeWaybillFromMap(savedOrderInfo *WatchOrderInfo, bill *model.Waybill) {
for k, v := range savedOrderInfo.waybills {
if v.VendorWaybillID == bill.VendorWaybillID && v.WaybillVendorID == bill.WaybillVendorID {
savedOrderInfo.waybills = append(savedOrderInfo.waybills[0:k], savedOrderInfo.waybills[k+1:]...)
break
}
}
}
func (s *DefScheduler) createWaybillOn3rdProviders(order *model.GoodsOrder, excludeBill *model.Waybill) (err error) {
globals.SugarLogger.Debugf("createWaybillOn3rdProviders, orderID:%s, excludeBill:%v", order.VendorOrderID, excludeBill)
successCount := 0
for vendorID := range s.DeliveryPlatformHandlers {
if excludeBill == nil || vendorID != excludeBill.WaybillVendorID {
if err = s.CreateWaybill(vendorID, order); err == nil {
successCount++
}
}
}
if successCount != 0 {
return nil
}
globals.SugarLogger.Warnf("createWaybillOn3rdProviders, orderID:%s all failed", order.VendorOrderID)
return scheduler.ErrCanNotCreateAtLeastOneWaybill
}
func (s *DefScheduler) cancelOtherWaybills(savedOrderInfo *WatchOrderInfo, bill *model.Waybill) (err error) {
globals.SugarLogger.Debugf("cancelOtherWaybills, orderID:%s, bill:%v", savedOrderInfo.order.VendorOrderID, bill)
for _, v := range savedOrderInfo.waybills {
if (v.OrderVendorID != v.WaybillVendorID) && (bill == nil || !(v.WaybillVendorID == bill.WaybillVendorID && v.VendorWaybillID == bill.VendorWaybillID)) {
s.CancelWaybill(v)
}
}
if bill != nil && bill.WaybillVendorID != bill.OrderVendorID {
s.swtich2SelfDeliverWithRetry(savedOrderInfo.order, bill, 2, 10*time.Second)
}
return nil
}
// todo 这个函数也可能有线程安全问题
func (s *DefScheduler) swtich2SelfDeliverWithRetry(order *model.GoodsOrder, bill *model.Waybill, retryCount int, duration time.Duration) {
globals.SugarLogger.Debugf("Swtich2SelfDeliver orderID:%s", order.VendorOrderID)
utils.CallFuncRetryAsync(func(index int) error {
err := s.Swtich2SelfDeliver(order)
if err != nil {
globals.SugarLogger.Infof("Swtich2SelfDeliver failed, orderID:%s, error:%v", order.VendorOrderID, err)
if err != nil && index == 0 {
globals.SugarLogger.Warnf("Swtich2SelfDeliver finally failed, orderID:%s, error:%v, have to cancel bill:%v", order.VendorOrderID, err, bill)
// 如果购买平台转商家自送失败最终还是要取消3方物流
s.CancelWaybill(bill)
}
}
return err
}, duration, retryCount)
}
// 这个函数这样写的原因是适应一些消息错序
func (s *DefScheduler) loadSavedOrderFromMap(status *model.OrderStatus) *WatchOrderInfo {
universalOrderID := jxutils.ComposeUniversalOrderID(status.VendorOrderID, status.VendorID)
var realSavedInfo *WatchOrderInfo
if savedInfo, ok := s.orderMap.Load(universalOrderID); ok {
realSavedInfo = savedInfo.(*WatchOrderInfo)
}
if realSavedInfo == nil || realSavedInfo.dirty == 1 {
if realSavedInfo == nil {
realSavedInfo = new(WatchOrderInfo)
s.orderMap.Store(universalOrderID, realSavedInfo)
} else {
realSavedInfo.dirty = 0
globals.SugarLogger.Infof("order is dirty, orderID:%s, load it", status.VendorOrderID)
}
if order, err := s.CurOrderManager.LoadOrder(status.VendorOrderID, status.VendorID); err == nil {
realSavedInfo.order = order
} else {
realSavedInfo.order = &model.GoodsOrder{
VendorOrderID: status.VendorOrderID,
VendorID: status.VendorID,
Status: status.Status,
StatusTime: status.StatusTime,
OrderCreatedAt: status.StatusTime,
WaybillVendorID: model.VendorIDUnknown,
}
realSavedInfo.dirty = 1
globals.SugarLogger.Infof("can not load order orderID:%s", status.VendorOrderID)
}
}
return realSavedInfo
}
func (s *DefScheduler) getBeginTime4LatestPickup(order *model.GoodsOrder) (retVal time.Time) {
if order.ExpectedDeliveredTime != utils.DefaultTimeValue {
return order.ExpectedDeliveredTime.Add(-time2Delivered)
}
return order.StatusTime
}
func (s *DefScheduler) stopTimer(savedOrderInfo *WatchOrderInfo) {
if savedOrderInfo.timer != nil {
globals.SugarLogger.Debugf("stopTimer orderid:%v", savedOrderInfo.order.VendorOrderID)
savedOrderInfo.timer.Stop()
}
}
func (s *DefScheduler) resetTimer(savedOrderInfo *WatchOrderInfo, status int, beginTime time.Time, gap time.Duration) {
globals.SugarLogger.Debugf("resetTimer status:%v, orderID:%s", status, savedOrderInfo.order.VendorOrderID)
if status >= savedOrderInfo.timerStatus { // 新设置的TIMER不能覆盖状态在其后的TIMER
s.stopTimer(savedOrderInfo)
config := s.mergeOrderStatusConfig(status, s.GetPurchasePlatformFromVendorID(savedOrderInfo.order.VendorID).GetStatusActionConfig(status))
if config != nil && config.TimeoutAction != nil {
timeout := jxutils.GetRealTimeout(beginTime, config.Timeout, minTimeout) + gap
globals.SugarLogger.Debugf("resetTimer timeout:%v, orderID:%s", timeout, savedOrderInfo.order.VendorOrderID)
savedOrderInfo.timerStatus = status
savedOrderInfo.timer = time.AfterFunc(timeout, func() {
config.TimeoutAction(savedOrderInfo.order)
savedOrderInfo.timerStatus = 0 // todo 可能有线程安全问题,考虑加入订单队列
})
}
} else {
globals.SugarLogger.Infof("resetTimer status revert, orderID:%s, current timer status:%d, status:%d", savedOrderInfo.order.VendorOrderID, savedOrderInfo.timerStatus, status)
}
}
func (s *DefScheduler) handleAutoAcceptOrder(orderID string, vendorID int, userMobile string, jxStoreID int, db orm.Ormer, handler func(accepted bool) error) int {
handleType := 0
if userMobile != "" {
if db == nil {
db = orm.NewOrm()
}
user := &models.BlackClient{
Mobile: userMobile,
}
if err := db.Read(user, "Mobile"); err != nil {
if err != orm.ErrNoRows {
globals.SugarLogger.Errorf("read data error:%v, data:%v, vendorID:%d", err, user, vendorID)
}
// 在访问数据库出错的情况下,也需要自动接单
handleType = 1
} else {
// 强制拒单
globals.SugarLogger.Infof("force reject order:%s, vendorID:%d", orderID, vendorID)
handleType = -1
}
} else {
globals.SugarLogger.Infof("order:%s, vendorID:%d, mobile is empty, should accept order", orderID, vendorID)
handleType = 1
}
if handleType == 1 {
handler(true)
} else if handleType == -1 {
handler(false)
}
return handleType
}
func (s *DefScheduler) mergeOrderStatusConfig(status int, config *scheduler.StatusActionConfig) (retVal *scheduler.StatusActionConfig) {
defConfig := s.defWorkflowConfig[status]
if defConfig == nil && config == nil {
return nil
}
retVal = &scheduler.StatusActionConfig{}
if defConfig != nil {
retVal.Timeout = defConfig.Timeout
retVal.TimeoutAction = defConfig.TimeoutAction
}
if config != nil {
if config.Timeout >= 0 {
retVal.Timeout = config.Timeout
}
if config.TimeoutAction != nil {
retVal.TimeoutAction = config.TimeoutAction
}
}
return retVal
}
func (s *DefScheduler) updateOrderByStatus(order *model.GoodsOrder, status *model.OrderStatus) (retVal *model.GoodsOrder) {
order.Status = status.Status
order.VendorStatus = status.VendorStatus
order.StatusTime = status.StatusTime
return order
}