package orderman import ( "errors" "sort" "time" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxcallback/scheduler" "git.rosy.net.cn/jx-callback/business/jxutils" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/dao" "git.rosy.net.cn/jx-callback/business/partner" "git.rosy.net.cn/jx-callback/globals" ) const ( pendingOrderGapMax = 2 * 24 * time.Hour // 每次重启机子时,要检查几天内的订单状态 maxTimeHandlePendingOrder = 2 * time.Second //处理pending order的最长时间 maxSleepGapHandlePendingOrder = 5 * time.Millisecond // 每个pending order的最长时间间隙 ) var ( ErrCanNotFindOrder = errors.New("找不到相应订单") ErrCanNotFindWaybill = errors.New("找不到相应运单") ) var ( FixedOrderManager *OrderManager ) // 所有公共接口调用前,要求在order里或status中设置合适的Status type OrderManager struct { } func NewOrderManager() *OrderManager { return &OrderManager{} } type IStatusTimer interface { GetStatusTime() time.Time } type StatusTimerSlice []IStatusTimer func (s StatusTimerSlice) Len() int { return len(s) } func (s StatusTimerSlice) Less(i, j int) bool { return s[i].GetStatusTime().Sub(s[j].GetStatusTime()) < 0 } func (s StatusTimerSlice) Swap(i, j int) { tmp := s[i] s[i] = s[j] s[j] = tmp } func init() { FixedOrderManager = NewOrderManager() partner.InitOrderManager(FixedOrderManager) } // 美团回调错误信息 func addOrderOrWaybillStatus(status *model.OrderStatus, db *dao.DaoDB) (isDuplicated bool, err error) { if status.OrderType == model.OrderTypeOrder { globals.SugarLogger.Debugf("addOrderStatus order:%v", status) } else if status.OrderType == model.OrderTypeWaybill { globals.SugarLogger.Debugf("addOrderStatus waybill:%v", status) } else { globals.SugarLogger.Debugf("addOrderStatus afsOrder:%v", status) } txDB, _ := dao.Begin(db) defer func() { if r := recover(); r != nil || err != nil { globals.SugarLogger.Debug("rollback") dao.Rollback(db, txDB) if r != nil { panic(r) } } }() status.ID = 0 status.Remark = utils.LimitUTF8StringLen(status.Remark, 255) created, _, err := db.Db.ReadOrCreate(status, "VendorOrderID", "VendorID", "OrderType", "Status", "VendorStatus", "StatusTime") if err == nil { if !created { globals.SugarLogger.Debugf("duplicated event:%v", status) isDuplicated = true status.DuplicatedCount++ utils.CallFuncLogError(func() error { _, err = db.Db.Update(status, "DuplicatedCount") return err }, "addOrderOrWaybillStatus update DuplicatedCount, status:%v", status) } } if err != nil { // todo 这里居然会有主键重复错误,逻辑上是不应该的 globals.SugarLogger.Warnf("addOrderOrWaybillStatus status:%v, access db error:%v", status, err) } else { dao.Commit(db, txDB) } return isDuplicated, err } func (c *OrderManager) GetStatusDuplicatedCount(status *model.OrderStatus) (duplicatedCount int) { if status == nil { return 0 } db := dao.GetDB() if err := dao.GetEntity(db, status, "VendorOrderID", "VendorID", "OrderType", "VendorStatus", "StatusTime"); err == nil { return status.DuplicatedCount } return 0 } // todo 最好还是改成全事件回放算了 func LoadPendingOrders() { orders, err := dao.LoadPendingOrders(dao.GetDB(), time.Now().Add(-pendingOrderGapMax), model.OrderStatusEndBegin) globals.SugarLogger.Infof("LoadPendingOrders orders count:%d, err:%v", len(orders), err) if err != nil { return } ordersCount := len(orders) if ordersCount > 0 { bills := FixedOrderManager.LoadPendingWaybills() globals.SugarLogger.Infof("LoadPendingOrders waybills count:%d", len(bills)) var sortOrders StatusTimerSlice orderMap := make(map[string]*model.GoodsOrder) for _, order := range orders { if order.Status > model.OrderStatusNew { status := model.Order2Status(order) sortOrders = append(sortOrders, status) } // order.Status = model.OrderStatusNew // 就是要以实际order状态来调用scheduler.OnOrderNew order.StatusTime = order.OrderCreatedAt sortOrders = append(sortOrders, order) orderMap[jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID)] = order } for _, bill := range bills { if bill.Status > model.WaybillStatusNew { bill2 := *bill sortOrders = append(sortOrders, &bill2) } bill.Status = model.WaybillStatusNew bill.StatusTime = bill.WaybillCreatedAt sortOrders = append(sortOrders, bill) } sort.Sort(sortOrders) sleepGap := maxTimeHandlePendingOrder / time.Duration(ordersCount) if sleepGap > maxSleepGapHandlePendingOrder { sleepGap = maxSleepGapHandlePendingOrder } lastTime := time.Now() for _, item := range sortOrders { if order, ok := item.(*model.GoodsOrder); ok { jxutils.CallMsgHandlerAsync(func() { scheduler.CurrentScheduler.OnOrderNew(order, true, true) }, jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID)) } else if status, ok := item.(*model.OrderStatus); ok { jxutils.CallMsgHandlerAsync(func() { order := orderMap[jxutils.ComposeUniversalOrderID(status.VendorOrderID, status.VendorID)] scheduler.CurrentScheduler.OnOrderStatusChanged(order, status, true) }, jxutils.ComposeUniversalOrderID(status.RefVendorOrderID, status.RefVendorID)) } else { bill := item.(*model.Waybill) jxutils.CallMsgHandlerAsync(func() { scheduler.CurrentScheduler.OnWaybillStatusChanged(bill, true) }, jxutils.ComposeUniversalOrderID(bill.VendorOrderID, bill.OrderVendorID)) } curTime := time.Now() timeout := sleepGap - curTime.Sub(lastTime) if timeout > 0 { time.Sleep(timeout) } lastTime = curTime } } }