Files
jx-callback/business/scheduler/defsch/defsch.go
gazebo a56cb64f8e - delivery fee added.
- fixed scheduler bug
2018-07-19 18:05:40 +08:00

347 lines
13 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package defsch
import (
"time"
"math/rand"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/scheduler"
"git.rosy.net.cn/jx-callback/globals"
"git.rosy.net.cn/jx-callback/legacy/models"
"github.com/astaxie/beego/orm"
)
// Timing parameters of the default scheduler.
const (
	// defTime2Delivered: ordinary orders are all delivered within one hour.
	defTime2Delivered = time.Hour
	// defTime2Schedule3rdCarrier: JD only allows switching to self-delivery after
	// 5 minutes; to be on the safe side this is set to five and a half minutes.
	defTime2Schedule3rdCarrier = 5*time.Minute + 30*time.Second
	// time2Schedule3rdCarrierGap4OrderStatus: JD's rule is that the waybill must be
	// waiting to be grabbed and have timed out for 5 minutes. In case no waybill event
	// arrives, the countdown starts at the pickup-finished event with 3 minutes added.
	time2Schedule3rdCarrierGap4OrderStatus = 3 * time.Minute
	// defTime2AutoPickupMin is the default timeout configured for the accepted status.
	defTime2AutoPickupMin = 25 * time.Minute
	// time2AutoPickupGap is the exclusive upper bound of the random extra delay added
	// to the auto-pickup timer.
	time2AutoPickupGap = 5 * time.Minute
)
// WatchOrderInfo is the per-order state the scheduler stores in its order map:
// the order itself, the waybills recorded for it, and the currently armed timer.
type WatchOrderInfo struct {
order *model.GoodsOrder // the information in order is kept up to date
waybills []*model.Waybill // status information in these waybills is NOT accurate; only the ID-related fields are used
timerStatus int // order status the timer was last armed for (written by resetTimer)
timer *time.Timer // pending timeout action, nil until resetTimer arms one
}
// DefScheduler is the default scheduler. It embeds scheduler.BaseScheduler and keeps
// a table of default per-status timeout actions plus a map of watched orders.
//
// IMPORTANT: this scheduler requires that all handling for one and the same order is
// serialized; otherwise there will be concurrency problems.
type DefScheduler struct {
scheduler.BaseScheduler
defWorkflowConfig map[int]*scheduler.StatusActionConfig // default timeout/action per order status, filled in init
orderMap jxutils.SyncMapWithTimeout // universal order ID -> *WatchOrderInfo
}
// init constructs the default scheduler, registers it as scheduler.CurrentScheduler and
// installs the default timeout actions for the New, Accepted and FinishedPickup statuses.
func init() {
	defSch := new(DefScheduler)
	defSch.Init()
	scheduler.CurrentScheduler = defSch

	// Auto-accept: handleAutoAcceptOrder decides between accepting and refusing, and the
	// decision is forwarded to the order's purchase platform. The outcome is deliberately
	// discarded, so this action always reports success.
	autoAccept := func(order *model.GoodsOrder) error {
		storeID := jxutils.GetJxStoreIDFromOrder(order)
		reply := func(isAcceptIt bool) error {
			return defSch.GetPurchasePlatformFromVendorID(order.VendorID).AcceptOrRefuseOrder(order, isAcceptIt)
		}
		_ = defSch.handleAutoAcceptOrder(order.VendorOrderID, order.VendorID, order.ConsigneeMobile, storeID, nil, reply)
		return nil
	}

	// Auto-pickup: tell the purchase platform the goods have been picked.
	autoPickup := func(order *model.GoodsOrder) error {
		return defSch.GetPurchasePlatformFromVendorID(order.VendorID).PickedUpGoods(order)
	}

	// Try to summon more carriers: create waybills on the delivery platforms.
	summonCarriers := func(order *model.GoodsOrder) error {
		return defSch.createWaybillOn3rdProviders(order, nil)
	}

	defSch.defWorkflowConfig = map[int]*scheduler.StatusActionConfig{
		model.OrderStatusNew: {
			Timeout:       1 * time.Second,
			TimeoutAction: autoAccept,
		},
		model.OrderStatusAccepted: {
			Timeout:       defTime2AutoPickupMin,
			TimeoutAction: autoPickup,
		},
		model.OrderStatusFinishedPickup: {
			Timeout:       defTime2Schedule3rdCarrier,
			TimeoutAction: summonCarriers,
		},
	}
}
// Order event handlers follow.

// OnOrderNew starts watching a newly received order: it stores a fresh watch entry under
// the order's universal ID and arms the timer for the New status. It always returns nil.
func (s *DefScheduler) OnOrderNew(order *model.GoodsOrder) (err error) {
	globals.SugarLogger.Debugf("OnOrderNew, order:%v", order)
	info := &WatchOrderInfo{order: order}
	universalID := jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID)
	s.orderMap.Store(universalID, info)
	s.resetTimer(model.OrderStatusNew, info, 0)
	return nil
}
// OnOrderStatusChanged reacts to an order status event.
//
// For a status strictly between OrderStatusUnknown and OrderStatusEndBegin the watched
// order is updated from the event and the timer is re-armed for the new status. An extra
// gap is added on top of the configured timeout: a random duration below
// time2AutoPickupGap for Accepted, time2Schedule3rdCarrierGap4OrderStatus for
// FinishedPickup, and none otherwise. Any other status stops the timer and drops the
// order from the watch map.
//
// If the order is neither cached nor loadable, the event is logged and ignored instead of
// dereferencing a nil watch entry. The method always returns nil.
func (s *DefScheduler) OnOrderStatusChanged(status *model.OrderStatus) (err error) {
	globals.SugarLogger.Debugf("OnOrderStatusChanged, status:%v", status)
	savedOrderInfo := s.loadWatchOrderFromMap(status.VendorOrderID, status.VendorID)
	if savedOrderInfo == nil {
		// loadWatchOrderFromMap returns nil when the order cannot be loaded; nothing was
		// stored in the map in that case, so there is no timer to stop and no entry to delete.
		globals.SugarLogger.Infof("OnOrderStatusChanged ignored, order not available, status:%v", status)
		return nil
	}

	if status.Status > model.OrderStatusUnknown && status.Status < model.OrderStatusEndBegin {
		s.updateOrderByStatus(savedOrderInfo.order, status)
		var gap time.Duration
		switch status.Status {
		case model.OrderStatusAccepted:
			// Random extra delay in [0, time2AutoPickupGap) before auto-pickup.
			gap = time.Duration(rand.Int63n(int64(time2AutoPickupGap)))
		case model.OrderStatusFinishedPickup:
			gap = time2Schedule3rdCarrierGap4OrderStatus
		}
		s.resetTimer(status.Status, savedOrderInfo, gap)
		return nil
	}

	s.stopTimer(savedOrderInfo)
	s.orderMap.Delete(jxutils.GetUniversalOrderIDFromOrderStatus(status))
	return nil
}
// Waybill event handlers follow.

// OnWaybillStatusChanged reacts to a waybill status event for a watched order.
//
// New waybill:
//   - if it was issued by the order's own platform, it is recorded on the watch entry and,
//     when the armed timer is the FinishedPickup one, that timer is re-armed without gap;
//   - if the order already has a waybill vendor assigned, the new waybill is redundant. It
//     is cancelled when it comes from a third-party carrier other than the assigned vendor.
//
// Any other status is only processed for waybills already recorded on the watch entry;
// unknown waybills are logged and ignored.
//
// If the order is neither cached nor loadable, the event is logged and ignored instead of
// dereferencing a nil watch entry. The method always returns nil; results of the platform
// calls it makes are not checked (best effort, as before).
//
// NOTE(review): only waybills issued by the order's own platform are ever recorded here,
// so the branches below that deal with third-party waybills look unreachable unless such
// waybills are added elsewhere — confirm whether addWaybill2Map should run for every new
// waybill.
func (s *DefScheduler) OnWaybillStatusChanged(bill *model.Waybill) (err error) {
	globals.SugarLogger.Debugf("OnWaybillStatusChanged, bill:%v", bill)
	savedOrderInfo := s.loadWatchOrderFromMap(bill.VendorOrderID, bill.OrderVendorID)
	if savedOrderInfo == nil {
		// loadWatchOrderFromMap returns nil when the order cannot be loaded.
		globals.SugarLogger.Infof("OnWaybillStatusChanged ignored, order not available, bill:%v", bill)
		return nil
	}

	if bill.Status == model.WaybillStatusNew {
		if bill.OrderVendorID == bill.WaybillVendorID {
			if savedOrderInfo.timerStatus == model.OrderStatusFinishedPickup {
				s.resetTimer(model.OrderStatusFinishedPickup, savedOrderInfo, 0)
			} else {
				globals.SugarLogger.Infof("OnWaybillStatusChanged met other timer, status:%d", savedOrderInfo.timerStatus)
			}
			_ = s.addWaybill2Map(savedOrderInfo, bill)
		}
		if savedOrderInfo.order.WaybillVendorID != model.VendorIDUnknown {
			globals.SugarLogger.Infof("OnWaybillStatusChanged multiple waybill created, bill:%v", bill)
			// The original condition compared bill.WaybillVendorID with itself and was
			// therefore never true. Cancel only third-party waybills (the same restriction
			// cancelOtherWaybills applies) that do not belong to the assigned vendor.
			if bill.WaybillVendorID != bill.OrderVendorID && bill.WaybillVendorID != savedOrderInfo.order.WaybillVendorID {
				s.GetDeliveryPlatformFromVendorID(bill.WaybillVendorID).CancelWaybill(bill)
			}
		}
		return nil
	}

	findIt := false
	for _, v := range savedOrderInfo.waybills {
		if v.VendorWaybillID == bill.VendorWaybillID && v.WaybillVendorID == bill.WaybillVendorID {
			findIt = true
			break
		}
	}
	if !findIt {
		globals.SugarLogger.Infof("OnWaybillStatusChanged can not find bill:%v in saved info", bill)
		return nil
	}

	switch bill.Status {
	case model.WaybillStatusAccepted:
		s.stopTimer(savedOrderInfo) // TODO: another timer should be started here
		s.cancelOtherWaybills(savedOrderInfo, bill)
		s.CurOrderManager.UpdateWaybillVendorID(bill)
		savedOrderInfo.order.WaybillVendorID = bill.WaybillVendorID
	case model.WaybillStatusAcceptCanceled:
		// Ask the delivery platforms again, skipping the vendor that just backed out.
		s.createWaybillOn3rdProviders(savedOrderInfo.order, bill)
		if savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID {
			// The bill is mutated on purpose so that the unknown vendor ID is what gets
			// passed to UpdateWaybillVendorID and stored on the order.
			bill.WaybillVendorID = model.VendorIDUnknown
			s.CurOrderManager.UpdateWaybillVendorID(bill)
			savedOrderInfo.order.WaybillVendorID = bill.WaybillVendorID
		}
	case model.WaybillStatusCanceled, model.WaybillStatusFailed:
		s.removeWaybillFromMap(savedOrderInfo, bill)
		if savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID {
			s.createWaybillOn3rdProviders(savedOrderInfo.order, nil)
			bill.WaybillVendorID = model.VendorIDUnknown
			s.CurOrderManager.UpdateWaybillVendorID(bill)
			savedOrderInfo.order.WaybillVendorID = bill.WaybillVendorID
		}
	case model.WaybillStatusDelivering:
		if savedOrderInfo.order.VendorID != bill.WaybillVendorID {
			s.GetPurchasePlatformFromVendorID(bill.OrderVendorID).SelfDeliverDelievering(savedOrderInfo.order)
		}
	case model.WaybillStatusDelivered:
		if savedOrderInfo.order.VendorID != bill.WaybillVendorID {
			s.GetPurchasePlatformFromVendorID(bill.OrderVendorID).SelfDeliverDelievered(savedOrderInfo.order)
		}
		s.removeWaybillFromMap(savedOrderInfo, bill)
	}
	return nil
}
// addWaybill2Map records bill on the watch entry unless a waybill with the same vendor
// and vendor waybill ID is already recorded. It always returns nil.
func (s *DefScheduler) addWaybill2Map(savedOrderInfo *WatchOrderInfo, bill *model.Waybill) (err error) {
	for i := range savedOrderInfo.waybills {
		known := savedOrderInfo.waybills[i]
		if known.VendorWaybillID != bill.VendorWaybillID || known.WaybillVendorID != bill.WaybillVendorID {
			continue
		}
		// Already recorded: nothing to do.
		globals.SugarLogger.Infof("addWaybill2Map bill:%v already exists", bill)
		return nil
	}
	savedOrderInfo.waybills = append(savedOrderInfo.waybills, bill)
	return nil
}
// createWaybillOn3rdProviders asks every registered delivery platform handler to create a
// waybill for order, skipping the handler keyed by excludeBill's waybill vendor when
// excludeBill is non-nil. It returns nil if at least one creation succeeded and
// scheduler.ErrCanNotCreateAtLeastOneWaybill otherwise; individual errors are not reported.
func (s *DefScheduler) createWaybillOn3rdProviders(order *model.GoodsOrder, excludeBill *model.Waybill) (err error) {
	globals.SugarLogger.Debugf("createWaybillOn3rdProviders, order:%v", order)
	created := 0
	for vendorID, handler := range s.DeliveryPlatformHandlers {
		if excludeBill != nil && vendorID == excludeBill.WaybillVendorID {
			continue
		}
		if err = handler.CreateWaybill(order); err == nil {
			created++
		}
	}
	if created == 0 {
		return scheduler.ErrCanNotCreateAtLeastOneWaybill
	}
	return nil
}
// cancelOtherWaybills cancels every recorded waybill except bill itself and those issued
// by the order's own platform. If bill comes from a carrier other than the order's
// platform, it then asks that platform (asynchronously, with retries) to switch the order
// to self-delivery. Cancellation results are ignored; the method always returns nil.
func (s *DefScheduler) cancelOtherWaybills(savedOrderInfo *WatchOrderInfo, bill *model.Waybill) (err error) {
	globals.SugarLogger.Debugf("cancelOtherWaybills, order:%v, bill:%v", savedOrderInfo.order, bill)
	for _, other := range savedOrderInfo.waybills {
		if other.WaybillVendorID == bill.OrderVendorID {
			continue // issued by the order's own platform
		}
		if other.WaybillVendorID == bill.WaybillVendorID && other.VendorWaybillID == bill.VendorWaybillID {
			continue // this is the waybill being kept
		}
		_ = s.GetDeliveryPlatformFromVendorID(other.WaybillVendorID).CancelWaybill(other)
	}
	if bill.WaybillVendorID == bill.OrderVendorID {
		return nil
	}
	s.swtich2SelfDeliverWithRetry(bill, 2, 10*time.Second)
	return nil
}
// swtich2SelfDeliverWithRetry hands an attempt function to utils.CallFuncRetryAsync
// together with duration and retryCount. Each attempt asks the order's purchase platform
// to switch the order to self-delivery; on success the waybill is dropped from the watch
// entry.
func (s *DefScheduler) swtich2SelfDeliverWithRetry(bill *model.Waybill, retryCount int, duration time.Duration) {
	attempt := func(index int) error {
		err := s.GetPurchasePlatformFromVendorID(bill.OrderVendorID).Swtich2SelfDeliver(bill.VendorOrderID)
		switch {
		case err == nil:
			// TODO: should the waybill be removed here, or is it better handled in the
			// waybill event?
			s.removeWaybillFromMap(nil, bill)
		case index == 0:
			// If the purchase platform ultimately fails to switch to merchant
			// self-delivery, the third-party waybill still has to be cancelled.
			// NOTE(review): presumably index 0 marks the final attempt — confirm against
			// utils.CallFuncRetryAsync.
			s.GetDeliveryPlatformFromVendorID(bill.WaybillVendorID).CancelWaybill(bill)
		}
		return err
	}
	utils.CallFuncRetryAsync(attempt, duration, retryCount)
}
// loadWatchOrderFromMap returns the watch entry for the given order. When the entry is not
// in the order map, the order is loaded through CurOrderManager, wrapped in a new entry
// and stored. It returns nil if that load fails.
func (s *DefScheduler) loadWatchOrderFromMap(vendorOrderID string, vendorID int) *WatchOrderInfo {
	key := jxutils.ComposeUniversalOrderID(vendorOrderID, vendorID)
	if cached, found := s.orderMap.Load(key); found {
		return cached.(*WatchOrderInfo)
	}

	globals.SugarLogger.Infof("can not get saved order, vendorOrderID:%s, vendorID:%d, load it", vendorOrderID, vendorID)
	order, err := s.CurOrderManager.LoadOrder(vendorOrderID, vendorID)
	if err != nil {
		globals.SugarLogger.Errorf("can not load order vendorOrderID:%s, vendorID:%d", vendorOrderID, vendorID)
		return nil
	}

	info := &WatchOrderInfo{order: order}
	s.orderMap.Store(key, info)
	return info
}
// removeWaybillFromMap drops the recorded waybill that has the same vendor and vendor
// waybill ID as bill from the watch entry. When savedOrderInfo is nil the entry is looked
// up (and loaded if necessary) from bill's order IDs; if that lookup fails the call is a
// no-op rather than a nil dereference, which matters because this method is also invoked
// from an asynchronous retry callback.
func (s *DefScheduler) removeWaybillFromMap(savedOrderInfo *WatchOrderInfo, bill *model.Waybill) {
	if savedOrderInfo == nil {
		savedOrderInfo = s.loadWatchOrderFromMap(bill.VendorOrderID, bill.OrderVendorID)
		if savedOrderInfo == nil {
			// loadWatchOrderFromMap has already logged the failure.
			return
		}
	}
	for k, v := range savedOrderInfo.waybills {
		if v.VendorWaybillID == bill.VendorWaybillID && v.WaybillVendorID == bill.WaybillVendorID {
			savedOrderInfo.waybills = append(savedOrderInfo.waybills[0:k], savedOrderInfo.waybills[k+1:]...)
			break
		}
	}
}
// getLatestPickupTimeout returns jxutils.GetRealTimeout for configTimeout measured from
// the pickup reference time: the expected delivery time minus defTime2Delivered when the
// order carries an expected delivery time, otherwise the order's status time.
func (s *DefScheduler) getLatestPickupTimeout(order *model.GoodsOrder, configTimeout time.Duration) (retVal time.Duration) {
	if order.ExpectedDeliveredTime == utils.DefaultTimeValue {
		// No expected delivery time set: count from the last status change.
		return jxutils.GetRealTimeout(order.StatusTime, configTimeout)
	}
	pickupBase := order.ExpectedDeliveredTime.Add(-defTime2Delivered)
	return jxutils.GetRealTimeout(pickupBase, configTimeout)
}
// stopTimer stops the watch entry's timer if one has been armed; the timer field itself
// is left in place.
func (s *DefScheduler) stopTimer(savedOrderInfo *WatchOrderInfo) {
	pending := savedOrderInfo.timer
	if pending == nil {
		return
	}
	globals.SugarLogger.Debugf("stopTimer orderid:%v", savedOrderInfo.order.VendorOrderID)
	pending.Stop()
}
// resetTimer stops the entry's current timer and, if the merged configuration for status
// has a timeout action, arms a new timer that runs that action on the watched order after
// the computed timeout plus gap. When there is no action the old timer stays stopped and
// timerStatus is left unchanged.
func (s *DefScheduler) resetTimer(status int, savedOrderInfo *WatchOrderInfo, gap time.Duration) {
	order := savedOrderInfo.order
	globals.SugarLogger.Debugf("resetTimer status:%v, orderid:%v", status, order.VendorOrderID)
	s.stopTimer(savedOrderInfo)

	platformConfig := s.GetPurchasePlatformFromVendorID(order.VendorID).GetStatusActionConfig(status)
	config := s.mergeOrderStatusConfig(status, platformConfig)
	if config == nil || config.TimeoutAction == nil {
		return
	}

	var timeout time.Duration
	switch status {
	case model.OrderStatusNew:
		timeout = config.Timeout // absolute value, used as configured
	case model.OrderStatusAccepted:
		timeout = s.getLatestPickupTimeout(order, config.Timeout)
	default:
		timeout = jxutils.GetRealTimeout(order.StatusTime, config.Timeout)
	}
	globals.SugarLogger.Debugf("resetTimer timeout:%v, orderid:%v", timeout, order.VendorOrderID)

	savedOrderInfo.timerStatus = status
	savedOrderInfo.timer = time.AfterFunc(timeout+gap, func() {
		// The order is read from the entry when the timer fires, not when it is armed.
		config.TimeoutAction(savedOrderInfo.order)
	})
}
// handleAutoAcceptOrder decides whether an order is accepted or refused and reports the
// decision through handler (whose returned error is ignored).
//
// The order is refused only when userMobile is non-empty and a models.BlackClient row
// with that mobile can be read. An empty mobile, a missing row, or any other read error
// leads to acceptance. db may be nil, in which case a new Ormer is created.
//
// It returns 1 when handler(true) was called and -1 when handler(false) was called.
func (s *DefScheduler) handleAutoAcceptOrder(orderID string, vendorID int, userMobile string, jxStoreID int, db orm.Ormer, handler func(accepted bool) error) int {
	const (
		acceptOrder = 1
		rejectOrder = -1
	)

	if userMobile == "" {
		globals.SugarLogger.Infof("order:%s, vendorID:%d, mobile is empty, should accept order", orderID, vendorID)
		handler(true)
		return acceptOrder
	}

	if db == nil {
		db = orm.NewOrm()
	}
	user := &models.BlackClient{Mobile: userMobile}
	readErr := db.Read(user, "Mobile")
	if readErr == nil {
		// The mobile is blacklisted: force a refusal.
		globals.SugarLogger.Infof("force reject order:%s, vendorID:%d", orderID, vendorID)
		handler(false)
		return rejectOrder
	}

	if readErr != orm.ErrNoRows {
		globals.SugarLogger.Errorf("read data error:%v, data:%v, vendorID:%d", readErr, user, vendorID)
	}
	// Even when the database access failed, the order must still be auto-accepted.
	handler(true)
	return acceptOrder
}
// mergeOrderStatusConfig combines the scheduler's default configuration for status with
// the supplied config into a new StatusActionConfig. Values from config win: its Timeout
// when it is >= 0 and its TimeoutAction when it is non-nil. It returns nil when neither a
// default nor config exists.
func (s *DefScheduler) mergeOrderStatusConfig(status int, config *scheduler.StatusActionConfig) (retVal *scheduler.StatusActionConfig) {
	base := s.defWorkflowConfig[status]
	if base == nil && config == nil {
		return nil
	}

	merged := new(scheduler.StatusActionConfig)
	if base != nil {
		merged.Timeout, merged.TimeoutAction = base.Timeout, base.TimeoutAction
	}
	if config == nil {
		return merged
	}
	if config.Timeout >= 0 {
		merged.Timeout = config.Timeout
	}
	if config.TimeoutAction != nil {
		merged.TimeoutAction = config.TimeoutAction
	}
	return merged
}
// updateOrderByStatus copies Status, VendorStatus and StatusTime from the status event
// onto order in place and returns the same order pointer.
func (s *DefScheduler) updateOrderByStatus(order *model.GoodsOrder, status *model.OrderStatus) (retVal *model.GoodsOrder) {
	order.Status, order.VendorStatus, order.StatusTime = status.Status, status.VendorStatus, status.StatusTime
	return order
}