diff --git a/business/jxcallback/scheduler/defsch/defsch.go b/business/jxcallback/scheduler/defsch/defsch.go index 83832bf3f..5d0a0712c 100644 --- a/business/jxcallback/scheduler/defsch/defsch.go +++ b/business/jxcallback/scheduler/defsch/defsch.go @@ -63,14 +63,14 @@ var ( FixedScheduler *DefScheduler ) -// type tTimerInfo struct { -// statusType int -// vendorID int -// status int +type tTimerInfo struct { + statusType int + vendorID int + status int -// timer *time.Timer -// timerTime time.Time -// } + timer *time.Timer + timerTime time.Time +} type WatchOrderInfo struct { order *model.GoodsOrder // order里的信息是保持更新的 @@ -83,12 +83,12 @@ type WatchOrderInfo struct { waybills map[int]*model.Waybill // 这个waybills里的状态信息是不真实的,只使用id相关的信息 - timerStatusType int // 0表示订单,1表示运单 - timerStatus int - timer *time.Timer - timerTime time.Time + // timerStatusType int // 0表示订单,1表示运单 + // timerStatus int + // timer *time.Timer + // timerTime time.Time - // timerList []*tTimerInfo + timerList []*tTimerInfo retryCount int // 失败后尝试的次数,调试阶段可能出现死循化,阻止这种情况发生 } @@ -167,34 +167,34 @@ func (s *WatchOrderInfo) GetWaybillVendorIDs() (vendorIDs []int) { // orderType,-1:全部 // vendorID,-1:全部 // status,-1:全部 -// func (w *WatchOrderInfo) StopTimer(statusType, vendorID, status int) { -// var newTimerList []*tTimerInfo -// for _, timerInfo := range w.timerList { -// if (statusType == -1 || statusType == timerInfo.statusType) || -// (vendorID == -1 || vendorID == timerInfo.vendorID) || -// (status == -1 || status <= timerInfo.status) { -// if timerInfo.timer != nil { -// timerInfo.timer.Stop() -// timerInfo.timer = nil -// } -// } else { -// newTimerList = append(newTimerList, timerInfo) -// } -// } -// w.timerList = newTimerList -// } +func (w *WatchOrderInfo) StopTimer(statusType, vendorID, status int) { + var newTimerList []*tTimerInfo + for _, timerInfo := range w.timerList { + if (statusType == -1 || statusType == timerInfo.statusType) && + (vendorID == -1 || vendorID == timerInfo.vendorID) && + (status == -1 || status <= timerInfo.status) { + if timerInfo.timer != nil { + timerInfo.timer.Stop() + timerInfo.timer = nil + } + } else { + newTimerList = append(newTimerList, timerInfo) + } + } + w.timerList = newTimerList +} func (w *WatchOrderInfo) GetCreateWaybillTimeout() (timeoutSecond int) { - if w.timerStatusType == scheduler.TimerStatusTypeWaybill && w.timerStatus == model.WaybillStatusNew { - timeoutSecond = int(w.timerTime.Sub(time.Now()) / time.Second) - } - // for _, timerInfo := range w.timerList { - // if timerInfo.statusType == scheduler.TimerStatusTypeWaybill && - // timerInfo.status == model.WaybillStatusNew { - // timeoutSecond = int(timerInfo.timerTime.Sub(time.Now()) / time.Second) - // break - // } + // if w.timerStatusType == scheduler.TimerStatusTypeWaybill && w.timerStatus == model.WaybillStatusNew { + // timeoutSecond = int(w.timerTime.Sub(time.Now()) / time.Second) // } + for _, timerInfo := range w.timerList { + if timerInfo.statusType == scheduler.TimerStatusTypeWaybill && + timerInfo.status == model.WaybillStatusNew { + timeoutSecond = int(timerInfo.timerTime.Sub(time.Now()) / time.Second) + break + } + } return timeoutSecond } @@ -771,169 +771,169 @@ func (s *DefScheduler) loadSavedOrderFromMap(status *model.OrderStatus, isForceL return realSavedInfo } -// func (s *DefScheduler) resetTimer(savedOrderInfo *WatchOrderInfo, bill *model.Waybill, isPending bool) { -// order := savedOrderInfo.order -// status := order.Status -// statusType := scheduler.TimerStatusTypeOrder -// vendorID := order.VendorID -// statusTime := order.StatusTime -// if bill != nil { -// status = bill.Status -// statusType = scheduler.TimerStatusTypeWaybill -// vendorID = bill.WaybillVendorID -// statusTime = bill.StatusTime -// } -// globals.SugarLogger.Debugf("resetTimer, orderID:%s statusType:%d status:%d", order.VendorOrderID, statusType, status) -// config := s.mergeOrderStatusConfig(savedOrderInfo, statusTime, statusType, status) - -// stopStatusType := statusType -// stopStatus := status -// if statusType == scheduler.TimerStatusTypeOrder { -// if status > model.OrderStatusEndBegin { -// stopStatusType = -1 -// stopStatus = -1 -// } -// } -// if config == nil || config.TimerType != partner.TimerTypeByPass { -// savedOrderInfo.StopTimer(stopStatusType, -1, stopStatus) -// } - -// if config != nil && config.TimeoutAction != nil && config.TimerType != partner.TimerTypeByPass { -// if config.CallShouldSetTimer(savedOrderInfo, bill) { -// timeout := config.GetRefTimeout(statusTime, order.OrderCreatedAt) -// if config.TimeoutGap != 0 { -// timeout += time.Duration(rand.Intn(int(config.TimeoutGap))) * time.Second -// } -// if isPending && timeout < pendingOrderTimerMaxSecond*time.Second { // 如果是PENDING的订单,则将其分布到2--5秒内,让后续事件有机会执行 -// timeout = time.Duration(jxutils.MapValue2Scope(int64(timeout), -pendingOrderTimerMinMinSecond*1000, pendingOrderTimerMaxSecond*1000, pendingOrderTimerMinSecond*1000, pendingOrderTimerMaxSecond*1000)) * time.Millisecond -// } else if timeout < 0 { -// timeout = 0 -// } -// if timeout == 0 { -// config.CallTimeoutAction(savedOrderInfo, bill) -// } else { -// timerName := "" -// if statusType == model.OrderTypeOrder { -// timerName = model.OrderStatusName[status] -// } else if statusType == model.OrderTypeWaybill { -// timerName = model.WaybillStatusName[status] -// } -// timerInfo := &tTimerInfo{ -// statusType: statusType, -// vendorID: vendorID, -// status: status, -// timerTime: time.Now().Add(timeout), -// } -// timerInfo.timer = utils.AfterFuncWithRecover(timeout, func() { -// jxutils.CallMsgHandlerAsync(func() { -// globals.SugarLogger.Debugf("fire timer:%s, orderID:%s", timerName, order.VendorOrderID) -// ts := s.loadSavedOrderFromMap(model.Order2Status(order), true) -// config.CallTimeoutAction(ts, bill) -// timerInfo.timer = nil -// ts.StopTimer(statusType, vendorID, status) -// }, jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID)) -// }) -// } -// globals.SugarLogger.Debugf("resetTimer, orderID:%s, statusType:%d, status:%d, timeout:%v", order.VendorOrderID, statusType, status, timeout) -// } else { -// globals.SugarLogger.Debugf("resetTimer, orderID:%s, statusType:%d, status:%d, should not set timer", order.VendorOrderID, statusType, status) -// } -// } else { -// globals.SugarLogger.Debugf("resetTimer bypass2, orderID:%s statusType:%d status:%v, config:%s", order.VendorOrderID, statusType, status, utils.Format4Output(config, true)) -// } -// } - -// func (s *DefScheduler) stopTimer(savedOrderInfo *WatchOrderInfo) { -// savedOrderInfo.StopTimer(0, -1, 0) -// } - -func (s *DefScheduler) stopTimer(savedOrderInfo *WatchOrderInfo) { - if savedOrderInfo.timer != nil { - globals.SugarLogger.Debugf("stopTimer orderID:%s", savedOrderInfo.order.VendorOrderID) - savedOrderInfo.timer.Stop() - savedOrderInfo.timerStatus = model.OrderStatusUnknown - savedOrderInfo.timerStatusType = scheduler.TimerStatusTypeUnknown - savedOrderInfo.timer = nil - } -} - func (s *DefScheduler) resetTimer(savedOrderInfo *WatchOrderInfo, bill *model.Waybill, isPending bool) { order := savedOrderInfo.order status := order.Status statusType := scheduler.TimerStatusTypeOrder + vendorID := order.VendorID statusTime := order.StatusTime if bill != nil { status = bill.Status statusType = scheduler.TimerStatusTypeWaybill + vendorID = bill.WaybillVendorID statusTime = bill.StatusTime } globals.SugarLogger.Debugf("resetTimer, orderID:%s statusType:%d status:%d", order.VendorOrderID, statusType, status) - if isStatusNewer(order.VendorID, savedOrderInfo.timerStatusType, savedOrderInfo.timerStatus, statusType, status) { // 新设置的TIMER不能覆盖状态在其后的TIMER,如果状态回绕,需要注意 - config := s.mergeOrderStatusConfig(savedOrderInfo, statusTime, statusType, status) - if config == nil || config.TimerType != partner.TimerTypeByPass { - s.stopTimer(savedOrderInfo) + config := s.mergeOrderStatusConfig(savedOrderInfo, statusTime, statusType, status) + + stopStatusType := statusType + stopStatus := status + if statusType == scheduler.TimerStatusTypeOrder { + if status >= model.OrderStatusDelivering { + stopStatusType = -1 + stopStatus = -1 } - if config != nil && config.TimeoutAction != nil && config.TimerType != partner.TimerTypeByPass { - if config.CallShouldSetTimer(savedOrderInfo, bill) { - timeout := config.GetRefTimeout(statusTime, order.OrderCreatedAt) - if config.TimeoutGap != 0 { - timeout += time.Duration(rand.Intn(int(config.TimeoutGap))) * time.Second - } - if isPending && timeout < pendingOrderTimerMaxSecond*time.Second { // 如果是PENDING的订单,则将其分布到2--5秒内,让后续事件有机会执行 - timeout = time.Duration(jxutils.MapValue2Scope(int64(timeout), -pendingOrderTimerMinMinSecond*1000, pendingOrderTimerMaxSecond*1000, pendingOrderTimerMinSecond*1000, pendingOrderTimerMaxSecond*1000)) * time.Millisecond - } else if timeout < 0 { - timeout = 0 - } - if timeout == 0 { - config.CallTimeoutAction(savedOrderInfo, bill) - } else { - timerName := "" - if statusType == scheduler.TimerStatusTypeOrder { - timerName = model.OrderStatusName[status] - } else if statusType == scheduler.TimerStatusTypeWaybill { - timerName = model.WaybillStatusName[status] - } - savedOrderInfo.timerStatusType = statusType - savedOrderInfo.timerStatus = status - savedOrderInfo.timerTime = time.Now().Add(timeout) - savedOrderInfo.timer = utils.AfterFuncWithRecover(timeout, func() { - jxutils.CallMsgHandlerAsync(func() { - globals.SugarLogger.Debugf("fire timer:%s, orderID:%s", timerName, order.VendorOrderID) - savedOrderInfo := s.loadSavedOrderFromMap(model.Order2Status(order), true) - config.CallTimeoutAction(savedOrderInfo, bill) - savedOrderInfo.timerStatus = 0 - savedOrderInfo.timerStatusType = scheduler.TimerStatusTypeUnknown - }, jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID)) - }) - } - globals.SugarLogger.Debugf("resetTimer, orderID:%s, statusType:%d, status:%d, timeout:%v", order.VendorOrderID, statusType, status, timeout) - } else { - globals.SugarLogger.Debugf("resetTimer, orderID:%s, statusType:%d, status:%d, should not set timer", order.VendorOrderID, statusType, status) + } + if config == nil || config.TimerType != partner.TimerTypeByPass { + savedOrderInfo.StopTimer(stopStatusType, -1, stopStatus) + } + + if config != nil && config.TimeoutAction != nil && config.TimerType != partner.TimerTypeByPass { + if config.CallShouldSetTimer(savedOrderInfo, bill) { + timeout := config.GetRefTimeout(statusTime, order.OrderCreatedAt) + if config.TimeoutGap != 0 { + timeout += time.Duration(rand.Intn(int(config.TimeoutGap))) * time.Second } + if isPending && timeout < pendingOrderTimerMaxSecond*time.Second { // 如果是PENDING的订单,则将其分布到2--5秒内,让后续事件有机会执行 + timeout = time.Duration(jxutils.MapValue2Scope(int64(timeout), -pendingOrderTimerMinMinSecond*1000, pendingOrderTimerMaxSecond*1000, pendingOrderTimerMinSecond*1000, pendingOrderTimerMaxSecond*1000)) * time.Millisecond + } else if timeout < 0 { + timeout = 0 + } + if timeout == 0 { + config.CallTimeoutAction(savedOrderInfo, bill) + } else { + timerName := "" + if statusType == model.OrderTypeOrder { + timerName = model.OrderStatusName[status] + } else if statusType == model.OrderTypeWaybill { + timerName = model.WaybillStatusName[status] + } + timerInfo := &tTimerInfo{ + statusType: statusType, + vendorID: vendorID, + status: status, + timerTime: time.Now().Add(timeout), + } + timerInfo.timer = utils.AfterFuncWithRecover(timeout, func() { + jxutils.CallMsgHandlerAsync(func() { + globals.SugarLogger.Debugf("fire timer:%s, orderID:%s", timerName, order.VendorOrderID) + ts := s.loadSavedOrderFromMap(model.Order2Status(order), true) + config.CallTimeoutAction(ts, bill) + timerInfo.timer = nil + ts.StopTimer(statusType, vendorID, status) + }, jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID)) + }) + } + globals.SugarLogger.Debugf("resetTimer, orderID:%s, statusType:%d, status:%d, timeout:%v", order.VendorOrderID, statusType, status, timeout) } else { - globals.SugarLogger.Debugf("resetTimer bypass2, orderID:%s statusType:%d status:%v, config:%s", order.VendorOrderID, statusType, status, utils.Format4Output(config, true)) + globals.SugarLogger.Debugf("resetTimer, orderID:%s, statusType:%d, status:%d, should not set timer", order.VendorOrderID, statusType, status) } } else { - globals.SugarLogger.Debugf("resetTimer bypass1, orderID:%s statusType:%d status:%v", order.VendorOrderID, statusType, status) + globals.SugarLogger.Debugf("resetTimer bypass2, orderID:%s statusType:%d status:%v, config:%s", order.VendorOrderID, statusType, status, utils.Format4Output(config, true)) } } -func isStatusNewer(vendorID int, curStatusType, curStatus, statusType, status int) bool { - // 拣货完成及之前的订单事件TIMER不能覆盖运单TIMER(一般是消息错序引起的) - // 美团订单在接单后就会收到新运单事件,因当前只支持一个TIMER,暂时舍弃三方配送调度,而要自动拣货调度 - if vendorID != model.VendorIDMTWM { - if curStatusType == scheduler.TimerStatusTypeWaybill && statusType == scheduler.TimerStatusTypeOrder && status <= model.OrderStatusFinishedPickup { - return false - } - } else { - return statusType == scheduler.TimerStatusTypeOrder && status >= curStatus - } - if curStatusType == scheduler.TimerStatusTypeWaybill { - return curStatus != status - } - return curStatusType != statusType || status >= curStatus +func (s *DefScheduler) stopTimer(savedOrderInfo *WatchOrderInfo) { + savedOrderInfo.StopTimer(-1, -1, -1) } +// func (s *DefScheduler) stopTimer(savedOrderInfo *WatchOrderInfo) { +// if savedOrderInfo.timer != nil { +// globals.SugarLogger.Debugf("stopTimer orderID:%s", savedOrderInfo.order.VendorOrderID) +// savedOrderInfo.timer.Stop() +// savedOrderInfo.timerStatus = model.OrderStatusUnknown +// savedOrderInfo.timerStatusType = scheduler.TimerStatusTypeUnknown +// savedOrderInfo.timer = nil +// } +// } + +// func (s *DefScheduler) resetTimer(savedOrderInfo *WatchOrderInfo, bill *model.Waybill, isPending bool) { +// order := savedOrderInfo.order +// status := order.Status +// statusType := scheduler.TimerStatusTypeOrder +// statusTime := order.StatusTime +// if bill != nil { +// status = bill.Status +// statusType = scheduler.TimerStatusTypeWaybill +// statusTime = bill.StatusTime +// } +// globals.SugarLogger.Debugf("resetTimer, orderID:%s statusType:%d status:%d", order.VendorOrderID, statusType, status) +// if isStatusNewer(order.VendorID, savedOrderInfo.timerStatusType, savedOrderInfo.timerStatus, statusType, status) { // 新设置的TIMER不能覆盖状态在其后的TIMER,如果状态回绕,需要注意 +// config := s.mergeOrderStatusConfig(savedOrderInfo, statusTime, statusType, status) +// if config == nil || config.TimerType != partner.TimerTypeByPass { +// s.stopTimer(savedOrderInfo) +// } +// if config != nil && config.TimeoutAction != nil && config.TimerType != partner.TimerTypeByPass { +// if config.CallShouldSetTimer(savedOrderInfo, bill) { +// timeout := config.GetRefTimeout(statusTime, order.OrderCreatedAt) +// if config.TimeoutGap != 0 { +// timeout += time.Duration(rand.Intn(int(config.TimeoutGap))) * time.Second +// } +// if isPending && timeout < pendingOrderTimerMaxSecond*time.Second { // 如果是PENDING的订单,则将其分布到2--5秒内,让后续事件有机会执行 +// timeout = time.Duration(jxutils.MapValue2Scope(int64(timeout), -pendingOrderTimerMinMinSecond*1000, pendingOrderTimerMaxSecond*1000, pendingOrderTimerMinSecond*1000, pendingOrderTimerMaxSecond*1000)) * time.Millisecond +// } else if timeout < 0 { +// timeout = 0 +// } +// if timeout == 0 { +// config.CallTimeoutAction(savedOrderInfo, bill) +// } else { +// timerName := "" +// if statusType == scheduler.TimerStatusTypeOrder { +// timerName = model.OrderStatusName[status] +// } else if statusType == scheduler.TimerStatusTypeWaybill { +// timerName = model.WaybillStatusName[status] +// } +// savedOrderInfo.timerStatusType = statusType +// savedOrderInfo.timerStatus = status +// savedOrderInfo.timerTime = time.Now().Add(timeout) +// savedOrderInfo.timer = utils.AfterFuncWithRecover(timeout, func() { +// jxutils.CallMsgHandlerAsync(func() { +// globals.SugarLogger.Debugf("fire timer:%s, orderID:%s", timerName, order.VendorOrderID) +// savedOrderInfo := s.loadSavedOrderFromMap(model.Order2Status(order), true) +// config.CallTimeoutAction(savedOrderInfo, bill) +// savedOrderInfo.timerStatus = 0 +// savedOrderInfo.timerStatusType = scheduler.TimerStatusTypeUnknown +// }, jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID)) +// }) +// } +// globals.SugarLogger.Debugf("resetTimer, orderID:%s, statusType:%d, status:%d, timeout:%v", order.VendorOrderID, statusType, status, timeout) +// } else { +// globals.SugarLogger.Debugf("resetTimer, orderID:%s, statusType:%d, status:%d, should not set timer", order.VendorOrderID, statusType, status) +// } +// } else { +// globals.SugarLogger.Debugf("resetTimer bypass2, orderID:%s statusType:%d status:%v, config:%s", order.VendorOrderID, statusType, status, utils.Format4Output(config, true)) +// } +// } else { +// globals.SugarLogger.Debugf("resetTimer bypass1, orderID:%s statusType:%d status:%v", order.VendorOrderID, statusType, status) +// } +// } + +// func isStatusNewer(vendorID int, curStatusType, curStatus, statusType, status int) bool { +// // 拣货完成及之前的订单事件TIMER不能覆盖运单TIMER(一般是消息错序引起的) +// // 美团订单在接单后就会收到新运单事件,因当前只支持一个TIMER,暂时舍弃三方配送调度,而要自动拣货调度 +// if vendorID != model.VendorIDMTWM { +// if curStatusType == scheduler.TimerStatusTypeWaybill && statusType == scheduler.TimerStatusTypeOrder && status <= model.OrderStatusFinishedPickup { +// return false +// } +// } else { +// return statusType == scheduler.TimerStatusTypeOrder && status >= curStatus +// } +// if curStatusType == scheduler.TimerStatusTypeWaybill { +// return curStatus != status +// } +// return curStatusType != statusType || status >= curStatus +// } + func (s *DefScheduler) mergeOrderStatusConfig(savedOrderInfo *WatchOrderInfo, statusTime time.Time, statusType, status int) (retVal *StatusActionConfig) { s.locker.RLock() defer func() {