- first version of order schedule.
This commit is contained in:
@@ -1,6 +1,10 @@
|
||||
package defsch
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"git.rosy.net.cn/baseapi/utils"
|
||||
"git.rosy.net.cn/jx-callback/business/jxutils"
|
||||
"git.rosy.net.cn/jx-callback/business/model"
|
||||
"git.rosy.net.cn/jx-callback/business/scheduler"
|
||||
"git.rosy.net.cn/jx-callback/globals"
|
||||
@@ -8,50 +12,235 @@ import (
|
||||
"github.com/astaxie/beego/orm"
|
||||
)
|
||||
|
||||
const (
|
||||
defTime2Delivery = 1 * time.Hour
|
||||
)
|
||||
|
||||
type WatchOrderInfo struct {
|
||||
order *model.GoodsOrder
|
||||
waybills []*model.Waybill
|
||||
timer *time.Timer
|
||||
}
|
||||
|
||||
// 重要:此调度器要求同一定单的处理逻辑必须是序列化了的,不然会有并发问题
|
||||
type DefScheduler struct {
|
||||
scheduler.BaseScheduler
|
||||
defWorkflowConfig map[int]*scheduler.StatusActionConfig
|
||||
orderMap jxutils.SyncMapWithTimeout
|
||||
}
|
||||
|
||||
func init() {
|
||||
sch := &DefScheduler{}
|
||||
sch.Init()
|
||||
scheduler.CurrentScheduler = sch
|
||||
|
||||
sch.defWorkflowConfig = map[int]*scheduler.StatusActionConfig{
|
||||
model.OrderStatusNew: &scheduler.StatusActionConfig{ // 自动接单
|
||||
Timeout: 1 * time.Second,
|
||||
TimeoutAction: func(order *model.GoodsOrder) (err error) {
|
||||
_ = sch.handleAutoAcceptOrder(order.VendorOrderID, order.VendorID, order.ConsigneeMobile, jxutils.GetJxStoreIDFromOrder(order), nil, func(isAcceptIt bool) error {
|
||||
return sch.GetPurchasePlatformFromVendorID(order.VendorID).AcceptOrRefuseOrder(order, isAcceptIt)
|
||||
})
|
||||
return nil
|
||||
},
|
||||
},
|
||||
model.OrderStatusAccepted: &scheduler.StatusActionConfig{ // 20分钟后自动拣货
|
||||
Timeout: 20 * time.Minute,
|
||||
TimeoutAction: func(order *model.GoodsOrder) (err error) {
|
||||
return sch.GetPurchasePlatformFromVendorID(order.VendorID).PickedUpGoods(order)
|
||||
},
|
||||
},
|
||||
model.OrderStatusFinishedPickup: &scheduler.StatusActionConfig{ // 拣货完成8分钟后开始在外部快递平台创建运单
|
||||
Timeout: 8 * time.Minute,
|
||||
TimeoutAction: func(order *model.GoodsOrder) (err error) {
|
||||
return sch.createWaybillOn3rdProviders(order)
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// 以下是订单
|
||||
func (s *DefScheduler) OnOrderNew(order *model.GoodsOrder) (err error) {
|
||||
return nil
|
||||
config := s.mergeOrderStartConfig(order.Status, s.GetPurchasePlatformFromVendorID(order.VendorID).GetStatusActionConfig(order.Status))
|
||||
watchInfo := &WatchOrderInfo{
|
||||
order: order,
|
||||
timer: time.AfterFunc(jxutils.GetRealTimeout(order.OrderCreatedAt, config.Timeout), func() {
|
||||
config.TimeoutAction(order)
|
||||
}),
|
||||
}
|
||||
s.orderMap.Store(jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID), watchInfo)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *DefScheduler) OnOrderStatusChanged(status *model.OrderStatus) (err error) {
|
||||
return nil
|
||||
if savedOrderInfo := s.loadWatchOrderFromMap(status.VendorOrderID, status.VendorID); savedOrderInfo != nil {
|
||||
if savedOrderInfo.timer != nil {
|
||||
savedOrderInfo.timer.Stop()
|
||||
}
|
||||
if status.Status < model.OrderStatusEndBegin {
|
||||
s.updateOrderByStatus(savedOrderInfo.order, status)
|
||||
config := s.mergeOrderStartConfig(status.Status, s.GetPurchasePlatformFromVendorID(status.VendorID).GetStatusActionConfig(status.Status))
|
||||
if config.TimeoutAction != nil {
|
||||
var timeout time.Duration
|
||||
if status.Status == model.OrderStatusAccepted {
|
||||
savedOrderInfo := s.loadWatchOrderFromMap(status.VendorOrderID, status.VendorID)
|
||||
timeout = s.getLatestPickupTimeout(savedOrderInfo.order, config.Timeout)
|
||||
} else {
|
||||
timeout = jxutils.GetRealTimeout(savedOrderInfo.order.StatusTime, config.Timeout)
|
||||
}
|
||||
savedOrderInfo.timer = time.AfterFunc(timeout, func() {
|
||||
config.TimeoutAction(savedOrderInfo.order)
|
||||
})
|
||||
}
|
||||
} else {
|
||||
s.orderMap.Delete(jxutils.GetUniversalOrderIDFromOrderStatus(status))
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// 以下是运单
|
||||
func (s *DefScheduler) OnWaybillStatusChanged(bill *model.Waybill) (err error) {
|
||||
if bill.Status == model.WaybillStatusAccepted {
|
||||
s.CurOrderManager.UpdateWaybillVendorID(bill)
|
||||
} else if bill.Status == model.WaybillStatusAcceptCanceled {
|
||||
bill.WaybillVendorID = model.VendorIDUnknown
|
||||
s.CurOrderManager.UpdateWaybillVendorID(bill)
|
||||
savedOrderInfo := s.loadWatchOrderFromMap(bill.VendorOrderID, bill.OrderVendorID)
|
||||
if bill.Status == model.WaybillStatusNew {
|
||||
err = s.addWaybill2Map(bill)
|
||||
} else {
|
||||
findIt := false
|
||||
for _, v := range savedOrderInfo.waybills {
|
||||
if v.VendorWaybillID == bill.VendorWaybillID && v.WaybillVendorID == bill.WaybillVendorID {
|
||||
findIt = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if findIt {
|
||||
switch bill.Status {
|
||||
case model.WaybillStatusAccepted:
|
||||
s.cancelOtherWaybills(bill)
|
||||
if bill.WaybillVendorID != bill.OrderVendorID {
|
||||
s.swtich2SelfDeliverWithRetry(bill, 2, 10*time.Second)
|
||||
}
|
||||
s.CurOrderManager.UpdateWaybillVendorID(bill)
|
||||
savedOrderInfo.order.WaybillVendorID = bill.WaybillVendorID
|
||||
case model.WaybillStatusAcceptCanceled, model.WaybillStatusCanceled, model.WaybillStatusFailed:
|
||||
s.removeWaybillFromMap(bill, savedOrderInfo)
|
||||
if savedOrderInfo.order.WaybillVendorID == bill.WaybillVendorID {
|
||||
s.createWaybillOn3rdProviders(savedOrderInfo.order)
|
||||
bill.WaybillVendorID = model.VendorIDUnknown
|
||||
s.CurOrderManager.UpdateWaybillVendorID(bill)
|
||||
}
|
||||
case model.WaybillStatusDelivering:
|
||||
s.GetPurchasePlatformFromVendorID(bill.OrderVendorID).SelfDeliverDelievering(savedOrderInfo.order)
|
||||
case model.WaybillStatusDelivered:
|
||||
s.GetPurchasePlatformFromVendorID(bill.OrderVendorID).SelfDeliverDelievered(savedOrderInfo.order)
|
||||
s.removeWaybillFromMap(bill, savedOrderInfo)
|
||||
}
|
||||
} else {
|
||||
globals.SugarLogger.Infof("OnWaybillStatusChanged can not find bill:%v in saved info", bill)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *DefScheduler) getPurchasePlatformFromVendorID(vendorID int) scheduler.PurchasePlatformHandler {
|
||||
return s.PurchasePlatformHandlers[vendorID]
|
||||
func (s *DefScheduler) addWaybill2Map(bill *model.Waybill) (err error) {
|
||||
savedOrderInfo := s.loadWatchOrderFromMap(bill.VendorOrderID, bill.OrderVendorID)
|
||||
for _, v := range savedOrderInfo.waybills {
|
||||
if v.VendorWaybillID == bill.VendorWaybillID && v.WaybillVendorID == bill.WaybillVendorID {
|
||||
// 如果已经存在,不做处理
|
||||
globals.SugarLogger.Infof("addWaybill2Map bill:%v already exists", bill)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
savedOrderInfo.waybills = append(savedOrderInfo.waybills, bill)
|
||||
return nil
|
||||
}
|
||||
|
||||
// c.handleAutoAcceptOrder(order.VendorOrderID, order.VendorID, order.ConsigneeMobile, order.StoreID, db, func(isAccept bool) {
|
||||
// // c.purchasePlatformHandlers[order.VendorID].AcceptOrRefuseOrder(order, isAccept)
|
||||
// if isAccept {
|
||||
// order.Status = model.OrderStatusAccepted
|
||||
// } else {
|
||||
// order.Status = model.OrderStatusFailed
|
||||
// }
|
||||
// })
|
||||
func (s *DefScheduler) onWaybillFailed(bill *model.Waybill) (err error) {
|
||||
savedOrderInfo := s.loadWatchOrderFromMap(bill.VendorOrderID, bill.OrderVendorID)
|
||||
s.removeWaybillFromMap(bill, savedOrderInfo)
|
||||
if len(savedOrderInfo.waybills) == 0 {
|
||||
s.createWaybillOn3rdProviders(savedOrderInfo.order)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *DefScheduler) handleAutoAcceptOrder(orderID string, vendorID int, userMobile string, jxStoreID int, db orm.Ormer, handler func(accepted bool)) int {
|
||||
func (s *DefScheduler) createWaybillOn3rdProviders(order *model.GoodsOrder) (err error) {
|
||||
successCount := 0
|
||||
for _, v := range s.DeliveryPlatformHandlers {
|
||||
if err = v.CreateWaybill(order); err == nil {
|
||||
successCount++
|
||||
}
|
||||
}
|
||||
if successCount != 0 {
|
||||
return nil
|
||||
}
|
||||
return scheduler.ErrCanNotCreateAtLeastOneWaybill
|
||||
}
|
||||
|
||||
func (s *DefScheduler) cancelOtherWaybills(bill *model.Waybill) (err error) {
|
||||
savedOrderInfo := s.loadWatchOrderFromMap(bill.VendorOrderID, bill.OrderVendorID)
|
||||
for _, v := range savedOrderInfo.waybills {
|
||||
if !(v.WaybillVendorID == bill.WaybillVendorID && v.VendorWaybillID == bill.VendorWaybillID) {
|
||||
_ = s.GetDeliveryPlatformFromVendorID(v.WaybillVendorID).CancelWaybill(v)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *DefScheduler) swtich2SelfDeliverWithRetry(bill *model.Waybill, retryCount int, duration time.Duration) {
|
||||
if err := s.GetPurchasePlatformFromVendorID(bill.OrderVendorID).Swtich2SelfDeliver(bill.VendorOrderID); err != nil {
|
||||
if retryCount >= 0 {
|
||||
time.AfterFunc(duration, func() {
|
||||
s.swtich2SelfDeliverWithRetry(bill, retryCount-1, duration)
|
||||
})
|
||||
} else {
|
||||
// 如果购买平台转商家自送失败,最终还是要取消3方物流
|
||||
s.GetDeliveryPlatformFromVendorID(bill.WaybillVendorID).CancelWaybill(bill)
|
||||
}
|
||||
} else {
|
||||
s.removeWaybillFromMap(bill, nil)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *DefScheduler) loadWatchOrderFromMap(vendorOrderID string, vendorID int) *WatchOrderInfo {
|
||||
universalOrderID := jxutils.ComposeUniversalOrderID(vendorOrderID, vendorID)
|
||||
var realSavedInfo *WatchOrderInfo
|
||||
if savedInfo, ok := s.orderMap.Load(universalOrderID); ok {
|
||||
realSavedInfo = savedInfo.(*WatchOrderInfo)
|
||||
} else {
|
||||
globals.SugarLogger.Infof("can not get saved order, vendorOrderID:%s, vendorID:%d, load it", vendorOrderID, vendorID)
|
||||
if order, err := s.CurOrderManager.LoadOrder(vendorOrderID, vendorID); err == nil {
|
||||
realSavedInfo = &WatchOrderInfo{
|
||||
order: order,
|
||||
}
|
||||
s.orderMap.Store(universalOrderID, realSavedInfo)
|
||||
} else {
|
||||
globals.SugarLogger.Errorf("can not load order vendorOrderID:%s, vendorID:%d", vendorOrderID, vendorID)
|
||||
}
|
||||
}
|
||||
return realSavedInfo
|
||||
}
|
||||
|
||||
func (s *DefScheduler) removeWaybillFromMap(bill *model.Waybill, savedOrderInfo *WatchOrderInfo) {
|
||||
if savedOrderInfo == nil {
|
||||
savedOrderInfo = s.loadWatchOrderFromMap(bill.VendorOrderID, bill.OrderVendorID)
|
||||
}
|
||||
for k, v := range savedOrderInfo.waybills {
|
||||
if v.VendorWaybillID == bill.VendorWaybillID && v.WaybillVendorID == bill.WaybillVendorID {
|
||||
savedOrderInfo.waybills = append(savedOrderInfo.waybills[0:k], savedOrderInfo.waybills[k+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *DefScheduler) getLatestPickupTimeout(order *model.GoodsOrder, configTimeout time.Duration) (retVal time.Duration) {
|
||||
beginTime := order.StatusTime
|
||||
if order.ExpectedDeliveredTime != utils.DefaultTimeValue {
|
||||
beginTime = order.ExpectedDeliveredTime.Add(-defTime2Delivery)
|
||||
}
|
||||
return jxutils.GetRealTimeout(beginTime, configTimeout)
|
||||
}
|
||||
|
||||
func (s *DefScheduler) handleAutoAcceptOrder(orderID string, vendorID int, userMobile string, jxStoreID int, db orm.Ormer, handler func(accepted bool) error) int {
|
||||
handleType := 0
|
||||
if userMobile != "" {
|
||||
if db == nil {
|
||||
@@ -83,3 +272,26 @@ func (s *DefScheduler) handleAutoAcceptOrder(orderID string, vendorID int, userM
|
||||
}
|
||||
return handleType
|
||||
}
|
||||
|
||||
func (s *DefScheduler) mergeOrderStartConfig(status int, config *scheduler.StatusActionConfig) (retVal *scheduler.StatusActionConfig) {
|
||||
retVal = &scheduler.StatusActionConfig{
|
||||
Timeout: s.defWorkflowConfig[status].Timeout,
|
||||
TimeoutAction: s.defWorkflowConfig[status].TimeoutAction,
|
||||
}
|
||||
if config != nil {
|
||||
if config.Timeout >= 0 {
|
||||
retVal.Timeout = config.Timeout
|
||||
}
|
||||
if config.TimeoutAction != nil {
|
||||
retVal.TimeoutAction = config.TimeoutAction
|
||||
}
|
||||
}
|
||||
return retVal
|
||||
}
|
||||
|
||||
func (s *DefScheduler) updateOrderByStatus(order *model.GoodsOrder, status *model.OrderStatus) (retVal *model.GoodsOrder) {
|
||||
order.Status = status.Status
|
||||
order.VendorStatus = status.VendorStatus
|
||||
order.StatusTime = status.StatusTime
|
||||
return order
|
||||
}
|
||||
|
||||
@@ -13,26 +13,25 @@ var (
|
||||
)
|
||||
|
||||
var (
|
||||
ErrStatusIsNotOKForOperation = errors.New("当前状态操作无效")
|
||||
ErrStatusIsNotOKForOperation = errors.New("当前状态操作无效")
|
||||
ErrCanNotCreateAtLeastOneWaybill = errors.New("一个运单都不能创建")
|
||||
)
|
||||
|
||||
type StatusTimeoutAction struct {
|
||||
Action func(order *model.GoodsOrder) (err error)
|
||||
Timeout time.Duration
|
||||
}
|
||||
|
||||
type StatusConfig struct {
|
||||
handler PurchasePlatformHandler
|
||||
AutoStatusChange map[int]*StatusTimeoutAction
|
||||
type StatusActionConfig struct {
|
||||
Timeout time.Duration // 超时时间,为0的话表示立即执行
|
||||
TimeoutAction func(order *model.GoodsOrder) (err error) // 超时后需要执行的动作,为nil表示此状态不需要执行监控
|
||||
}
|
||||
|
||||
type PurchasePlatformHandler interface {
|
||||
AcceptOrRefuseOrder(order *model.GoodsOrder, isAcceptIt bool) (err error)
|
||||
PickedUpGoods(order *model.GoodsOrder) (err error)
|
||||
Swtich2SelfDeliver(order *model.GoodsOrder) (err error)
|
||||
SelfDeliverPickedUpGoods(order *model.GoodsOrder) (err error)
|
||||
|
||||
Swtich2SelfDeliver(vendorOrderID string) (err error)
|
||||
|
||||
SelfDeliverDelievering(order *model.GoodsOrder) (err error)
|
||||
SelfDeliverDelievered(order *model.GoodsOrder) (err error)
|
||||
|
||||
GetStatusActionConfig(status int) *StatusActionConfig
|
||||
}
|
||||
|
||||
type DeliveryPlatformHandler interface {
|
||||
@@ -41,6 +40,7 @@ type DeliveryPlatformHandler interface {
|
||||
}
|
||||
|
||||
type OrderManager interface {
|
||||
LoadOrder(vendorOrderID string, vendorID int) (order *model.GoodsOrder, err error)
|
||||
OnOrderStatusChanged(status *model.OrderStatus) (err error) // 此消息是否使用还不确定
|
||||
UpdateWaybillVendorID(bill *model.Waybill) (err error)
|
||||
}
|
||||
@@ -92,3 +92,18 @@ func (c *BaseScheduler) RegisterDeliveryPlatform(vendorID int, handler DeliveryP
|
||||
}
|
||||
c.DeliveryPlatformHandlers[vendorID] = handler
|
||||
}
|
||||
|
||||
func (c *BaseScheduler) GetPurchasePlatformFromVendorID(vendorID int) PurchasePlatformHandler {
|
||||
return c.PurchasePlatformHandlers[vendorID]
|
||||
}
|
||||
|
||||
func (c *BaseScheduler) GetDeliveryPlatformFromVendorID(vendorID int) DeliveryPlatformHandler {
|
||||
return c.DeliveryPlatformHandlers[vendorID]
|
||||
}
|
||||
|
||||
type BasePurchasePlatform struct {
|
||||
}
|
||||
|
||||
func (p *BasePurchasePlatform) GetStatusActionConfig(status int) *StatusActionConfig {
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user