- refactor
- load pending orders, fixed bug.
This commit is contained in:
@@ -35,10 +35,13 @@ func addOrderOrWaybillStatus(status *model.OrderStatus, db orm.Ormer) (isDuplica
|
||||
created, _, err := db.ReadOrCreate(status, "VendorOrderID", "VendorID", "OrderType", "VendorStatus", "StatusTime")
|
||||
if err == nil {
|
||||
if !created {
|
||||
globals.SugarLogger.Infof("duplicated event:%v", status)
|
||||
isDuplicated = true
|
||||
status.DuplicatedCount++
|
||||
_, err = db.Update(status, "DuplicatedCount")
|
||||
globals.SugarLogger.Infof("duplicated event:%v", status)
|
||||
utils.CallFuncLogError(func() error {
|
||||
_, err = db.Update(status, "DuplicatedCount")
|
||||
return err
|
||||
}, "addOrderOrWaybillStatus update DuplicatedCount")
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
@@ -86,3 +89,64 @@ func GetDataCityCodeFromOrder(order *model.GoodsOrder) (retVal string, err error
|
||||
}
|
||||
return retVal, err
|
||||
}
|
||||
|
||||
// todo 可以考虑改成完全按StatusTime来发送事件
|
||||
func LoadPendingOrders() {
|
||||
orders := OrderManager.LoadPendingOrders()
|
||||
globals.SugarLogger.Infof("LoadPendingOrders orders count:%d", len(orders))
|
||||
bills := WaybillManager.LoadPendingWaybills()
|
||||
globals.SugarLogger.Infof("LoadPendingOrders waybills count:%d", len(bills))
|
||||
|
||||
billsMap := map[string][]*model.Waybill{}
|
||||
for _, v := range bills {
|
||||
savedBills := []*model.Waybill{v}
|
||||
if bills, ok := billsMap[v.VendorOrderID]; ok {
|
||||
savedBills = append(bills, v)
|
||||
}
|
||||
billsMap[v.VendorOrderID] = savedBills
|
||||
}
|
||||
for _, order := range orders {
|
||||
isNoNewSent := false
|
||||
orderNew := *order
|
||||
orderNew.Status = model.OrderStatusNew
|
||||
orderNew.StatusTime = order.OrderCreatedAt
|
||||
routinePool.CallFunAsync(func() {
|
||||
scheduler.CurrentScheduler.OnOrderNew(&orderNew)
|
||||
}, order.VendorOrderID)
|
||||
for _, bill := range billsMap[order.VendorOrderID] {
|
||||
if order.Status > model.OrderStatusNew && !isNoNewSent && order.StatusTime.Sub(bill.WaybillCreatedAt) < 0 {
|
||||
isNoNewSent = true
|
||||
order2 := *order
|
||||
routinePool.CallFunAsync(func() {
|
||||
scheduler.CurrentScheduler.OnOrderStatusChanged(model.Order2Status(&order2))
|
||||
}, order.VendorOrderID)
|
||||
}
|
||||
billNew := *bill
|
||||
billNew.Status = model.OrderStatusNew
|
||||
billNew.StatusTime = order.OrderCreatedAt
|
||||
routinePool.CallFunAsync(func() {
|
||||
scheduler.CurrentScheduler.OnWaybillStatusChanged(&billNew)
|
||||
}, bill.VendorOrderID)
|
||||
if order.Status > model.OrderStatusNew && !isNoNewSent && order.StatusTime.Sub(bill.StatusTime) < 0 {
|
||||
isNoNewSent = true
|
||||
order2 := *order
|
||||
routinePool.CallFunAsync(func() {
|
||||
scheduler.CurrentScheduler.OnOrderStatusChanged(model.Order2Status(&order2))
|
||||
}, order.VendorOrderID)
|
||||
}
|
||||
if bill.Status > model.WaybillStatusNew {
|
||||
bill2 := *bill
|
||||
routinePool.CallFunAsync(func() {
|
||||
scheduler.CurrentScheduler.OnWaybillStatusChanged(&bill2)
|
||||
}, bill.VendorOrderID)
|
||||
}
|
||||
if order.Status > model.OrderStatusNew {
|
||||
isNoNewSent = true
|
||||
order2 := *order
|
||||
routinePool.CallFunAsync(func() {
|
||||
scheduler.CurrentScheduler.OnOrderStatusChanged(model.Order2Status(&order2))
|
||||
}, order.VendorOrderID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,12 +51,12 @@ func (c *WaybillController) onWaybillMsg(msg *dadaapi.CallbackMsg) (retVal *dada
|
||||
|
||||
func (c *WaybillController) callbackMsg2Waybill(msg *dadaapi.CallbackMsg) (retVal *model.Waybill) {
|
||||
retVal = &model.Waybill{
|
||||
VendorWaybillID: msg.ClientID,
|
||||
WaybillVendorID: model.VendorIDDada,
|
||||
CourierName: msg.DmName,
|
||||
CourierMobile: msg.DmMobile,
|
||||
VendorStatus: utils.Int2Str(msg.OrderStatus),
|
||||
WaybillCreatedAt: utils.Timestamp2Time(int64(msg.UpdateTime)),
|
||||
VendorWaybillID: msg.ClientID,
|
||||
WaybillVendorID: model.VendorIDDada,
|
||||
CourierName: msg.DmName,
|
||||
CourierMobile: msg.DmMobile,
|
||||
VendorStatus: utils.Int2Str(msg.OrderStatus),
|
||||
StatusTime: utils.Timestamp2Time(int64(msg.UpdateTime)),
|
||||
}
|
||||
retVal.VendorOrderID, retVal.OrderVendorID = jxutils.SplitUniversalOrderID(msg.OrderID)
|
||||
return retVal
|
||||
|
||||
@@ -140,7 +140,7 @@ func (c *OrderController) getOrderInfo(orderID string) (order *model.GoodsOrder,
|
||||
ExpectedDeliveredTime: utils.Str2TimeWithDefault(utils.Interface2String(result["deliverTime"]), utils.DefaultTimeValue),
|
||||
VendorStatus: utils.Interface2String(result["status"]), // 取订单的原始status,不合并消息类型(因为当前消息类型没有意义)
|
||||
OrderSeq: int(utils.MustInterface2Int64(result["daySn"])),
|
||||
OrderCreatedAt: utils.Str2Time(result["createdAt"].(string)),
|
||||
StatusTime: utils.Str2Time(result["createdAt"].(string)),
|
||||
OriginalData: utils.FilterMb4(string(utils.MustMarshal(result))),
|
||||
ActualPayPrice: jxutils.StandardPrice2Int(utils.MustInterface2Float64(result["totalPrice"])),
|
||||
Skus: []*model.OrderSku{},
|
||||
@@ -157,15 +157,14 @@ func (c *OrderController) getOrderInfo(orderID string) (order *model.GoodsOrder,
|
||||
for _, product2 := range group["items"].([]interface{}) {
|
||||
product := product2.(map[string]interface{})
|
||||
sku := &model.OrderSku{
|
||||
VendorOrderID: orderID,
|
||||
VendorID: model.VendorIDELM,
|
||||
Count: int(utils.MustInterface2Int64(product["quantity"])),
|
||||
SkuID: int(utils.Str2Int64WithDefault(utils.Interface2String(product["extendCode"]), 0)),
|
||||
VendorSkuID: utils.Int64ToStr(utils.MustInterface2Int64(product["skuId"])),
|
||||
SkuName: product["name"].(string),
|
||||
SalePrice: jxutils.StandardPrice2Int(utils.MustInterface2Float64(product["userPrice"])),
|
||||
Weight: int(math.Round(utils.Interface2FloatWithDefault(product["weight"], 0.0))),
|
||||
OrderCreatedAt: order.OrderCreatedAt,
|
||||
VendorOrderID: orderID,
|
||||
VendorID: model.VendorIDELM,
|
||||
Count: int(utils.MustInterface2Int64(product["quantity"])),
|
||||
SkuID: int(utils.Str2Int64WithDefault(utils.Interface2String(product["extendCode"]), 0)),
|
||||
VendorSkuID: utils.Int64ToStr(utils.MustInterface2Int64(product["skuId"])),
|
||||
SkuName: product["name"].(string),
|
||||
SalePrice: jxutils.StandardPrice2Int(utils.MustInterface2Float64(product["userPrice"])),
|
||||
Weight: int(math.Round(utils.Interface2FloatWithDefault(product["weight"], 0.0))),
|
||||
}
|
||||
order.Skus = append(order.Skus, sku)
|
||||
order.SkuCount++
|
||||
|
||||
@@ -20,7 +20,7 @@ func (c *OrderController) legacyWriteElmOrder(order *model.GoodsOrder) (err erro
|
||||
Type: msgType,
|
||||
Consignee: order.ConsigneeName,
|
||||
Mobile: order.ConsigneeMobile,
|
||||
OrderCreatedAt: utils.Time2Str(order.OrderCreatedAt),
|
||||
OrderCreatedAt: utils.Time2Str(order.StatusTime),
|
||||
}
|
||||
_, err = db.Insert(legacyOrder)
|
||||
if err != nil {
|
||||
|
||||
@@ -53,14 +53,14 @@ func (c *WaybillController) onWaybillStatusMsg(msg *elmapi.CallbackWaybillStatus
|
||||
|
||||
func (c *WaybillController) callbackMsg2Waybill(msg *elmapi.CallbackWaybillStatusMsg) (retVal *model.Waybill) {
|
||||
retVal = &model.Waybill{
|
||||
VendorOrderID: msg.OrderID,
|
||||
OrderVendorID: model.VendorIDELM,
|
||||
VendorWaybillID: msg.OrderID,
|
||||
WaybillVendorID: model.VendorIDELM,
|
||||
CourierName: msg.Name,
|
||||
CourierMobile: msg.Phone,
|
||||
VendorStatus: c.composeState(msg.State, msg.SubState, msg.MsgType),
|
||||
WaybillCreatedAt: utils.Timestamp2Time(msg.UpdateAt / 1000),
|
||||
VendorOrderID: msg.OrderID,
|
||||
OrderVendorID: model.VendorIDELM,
|
||||
VendorWaybillID: msg.OrderID,
|
||||
WaybillVendorID: model.VendorIDELM,
|
||||
CourierName: msg.Name,
|
||||
CourierMobile: msg.Phone,
|
||||
VendorStatus: c.composeState(msg.State, msg.SubState, msg.MsgType),
|
||||
StatusTime: utils.Timestamp2Time(msg.UpdateAt / 1000),
|
||||
}
|
||||
return retVal
|
||||
}
|
||||
|
||||
@@ -79,7 +79,7 @@ func (c *OrderController) getOrderInfo(msg *jdapi.CallbackOrderMsg) (order *mode
|
||||
ExpectedDeliveredTime: utils.Str2TimeWithDefault(utils.Interface2String(result["orderPreEndDeliveryTime"]), utils.DefaultTimeValue),
|
||||
VendorStatus: msg.StatusID,
|
||||
OrderSeq: int(utils.MustInterface2Int64(result["orderNum"])),
|
||||
OrderCreatedAt: utils.Str2Time(result["orderStartTime"].(string)),
|
||||
StatusTime: utils.Str2Time(result["orderStartTime"].(string)),
|
||||
OriginalData: utils.FilterMb4(string(utils.MustMarshal(result))),
|
||||
ActualPayPrice: utils.MustInterface2Int64(result["orderBuyerPayableMoney"]),
|
||||
Skus: []*model.OrderSku{},
|
||||
@@ -103,16 +103,15 @@ func (c *OrderController) getOrderInfo(msg *jdapi.CallbackOrderMsg) (order *mode
|
||||
for _, product2 := range result["product"].([]interface{}) {
|
||||
product := product2.(map[string]interface{})
|
||||
sku := &model.OrderSku{
|
||||
VendorOrderID: msg.BillID,
|
||||
VendorID: model.VendorIDJD,
|
||||
Count: int(utils.MustInterface2Int64(product["skuCount"])),
|
||||
SkuID: int(utils.Str2Int64WithDefault(utils.Interface2String(product["skuIdIsv"]), 0)),
|
||||
VendorSkuID: utils.Int64ToStr(utils.MustInterface2Int64(product["skuId"])),
|
||||
SkuName: product["skuName"].(string),
|
||||
Weight: int(utils.MustInterface2Float64(product["skuWeight"]) * 1000),
|
||||
SalePrice: utils.MustInterface2Int64(product["skuJdPrice"]),
|
||||
PromotionType: int(utils.MustInterface2Int64(product["promotionType"])),
|
||||
OrderCreatedAt: order.OrderCreatedAt,
|
||||
VendorOrderID: msg.BillID,
|
||||
VendorID: model.VendorIDJD,
|
||||
Count: int(utils.MustInterface2Int64(product["skuCount"])),
|
||||
SkuID: int(utils.Str2Int64WithDefault(utils.Interface2String(product["skuIdIsv"]), 0)),
|
||||
VendorSkuID: utils.Int64ToStr(utils.MustInterface2Int64(product["skuId"])),
|
||||
SkuName: product["skuName"].(string),
|
||||
Weight: int(utils.MustInterface2Float64(product["skuWeight"]) * 1000),
|
||||
SalePrice: utils.MustInterface2Int64(product["skuJdPrice"]),
|
||||
PromotionType: int(utils.MustInterface2Int64(product["promotionType"])),
|
||||
}
|
||||
if product["isGift"].(bool) {
|
||||
sku.SkuType = 1
|
||||
|
||||
@@ -18,7 +18,7 @@ func (c *OrderController) legacyWriteJdOrder(order *model.GoodsOrder, delOldFirs
|
||||
Code: "0",
|
||||
Jdorderid: utils.Str2Int64(order.VendorOrderID),
|
||||
Orderstatus: int(utils.Str2Int64(order.VendorStatus)),
|
||||
Orderstatustime: utils.Time2Str(order.OrderCreatedAt),
|
||||
Orderstatustime: utils.Time2Str(order.StatusTime),
|
||||
Success: 1,
|
||||
Cityname: "all",
|
||||
}
|
||||
|
||||
@@ -49,14 +49,14 @@ func (c *WaybillController) onWaybillMsg(msg *jdapi.CallbackDeliveryStatusMsg) (
|
||||
|
||||
func (c *WaybillController) callbackMsg2Waybill(msg *jdapi.CallbackDeliveryStatusMsg) (retVal *model.Waybill) {
|
||||
retVal = &model.Waybill{
|
||||
VendorOrderID: msg.OrderID,
|
||||
OrderVendorID: model.VendorIDJD,
|
||||
VendorWaybillID: msg.OrderID,
|
||||
WaybillVendorID: model.VendorIDJD,
|
||||
CourierName: msg.DeliveryManName,
|
||||
CourierMobile: msg.DeliveryManPhone,
|
||||
VendorStatus: msg.DeliveryStatus,
|
||||
WaybillCreatedAt: utils.Str2Time(msg.DeliveryStatusTime),
|
||||
VendorOrderID: msg.OrderID,
|
||||
OrderVendorID: model.VendorIDJD,
|
||||
VendorWaybillID: msg.OrderID,
|
||||
WaybillVendorID: model.VendorIDJD,
|
||||
CourierName: msg.DeliveryManName,
|
||||
CourierMobile: msg.DeliveryManPhone,
|
||||
VendorStatus: msg.DeliveryStatus,
|
||||
StatusTime: utils.Str2Time(msg.DeliveryStatusTime),
|
||||
}
|
||||
return retVal
|
||||
}
|
||||
|
||||
@@ -39,7 +39,7 @@ func (c *WaybillController) OnWaybillExcept(msg *mtpsapi.CallbackOrderExceptionM
|
||||
CourierMobile: msg.CourierPhone,
|
||||
Status: model.WaybillStatusFailed, // todo 这里要再确定一下是否只要收到订单异常消息就视为订单失败
|
||||
VendorStatus: utils.Int2Str(msg.ExceptionCode),
|
||||
WaybillCreatedAt: utils.Timestamp2Time(msg.Timestamp),
|
||||
StatusTime: utils.Timestamp2Time(msg.Timestamp),
|
||||
}
|
||||
order.VendorOrderID, order.OrderVendorID = jxutils.SplitUniversalOrderID(msg.OrderID)
|
||||
retVal = mtpsapi.Err2CallbackResponse(controller.WaybillManager.OnWaybillStatusChanged(order), "mtps OnWaybillExcept")
|
||||
@@ -76,7 +76,7 @@ func (c *WaybillController) callbackMsg2Waybill(msg *mtpsapi.CallbackOrderMsg) (
|
||||
CourierName: msg.CourierName,
|
||||
CourierMobile: msg.CourierPhone,
|
||||
VendorStatus: utils.Int2Str(msg.Status),
|
||||
WaybillCreatedAt: utils.Timestamp2Time(msg.Timestamp),
|
||||
StatusTime: utils.Timestamp2Time(msg.Timestamp),
|
||||
}
|
||||
retVal.VendorOrderID, retVal.OrderVendorID = jxutils.SplitUniversalOrderID(msg.OrderID)
|
||||
return retVal
|
||||
@@ -127,7 +127,7 @@ func (c *WaybillController) calculateDeliveryFee(bill *model.Waybill) (retVal in
|
||||
delieveryFee += jxutils.StandardPrice2Int(2.5 + 10 + 2*float64(order.Weight/1000-20))
|
||||
}
|
||||
|
||||
hour, min, sec := bill.WaybillCreatedAt.Clock()
|
||||
hour, min, sec := bill.StatusTime.Clock()
|
||||
totalSeconds := hour*3600 + min*60 + sec
|
||||
if totalSeconds >= 11*3600+30*60 && totalSeconds <= 13*3600 { // 11:30 -- 13:00
|
||||
delieveryFee += jxutils.StandardPrice2Int(3)
|
||||
|
||||
@@ -22,7 +22,7 @@ func NewOrderManager() *OrderController {
|
||||
return &OrderController{}
|
||||
}
|
||||
|
||||
func (c *OrderController) LoadPendingOrders() {
|
||||
func (c *OrderController) LoadPendingOrders() []*model.GoodsOrder {
|
||||
db := orm.NewOrm()
|
||||
var orders []*model.GoodsOrder
|
||||
_, err := db.Raw(`
|
||||
@@ -34,20 +34,13 @@ func (c *OrderController) LoadPendingOrders() {
|
||||
`, time.Now().Add(-pendingOrderGapMax), model.OrderStatusEndBegin).QueryRows(&orders)
|
||||
if err != nil {
|
||||
globals.SugarLogger.Warnf("init load pending orders error:%v", err)
|
||||
return
|
||||
}
|
||||
globals.SugarLogger.Info(len(orders))
|
||||
for _, v := range orders {
|
||||
v2 := v
|
||||
routinePool.CallFunAsync(func() {
|
||||
scheduler.CurrentScheduler.OnOrderNew(v2)
|
||||
}, v2.VendorOrderID)
|
||||
return nil
|
||||
}
|
||||
return orders
|
||||
}
|
||||
|
||||
func (c *OrderController) OnOrderNew(order *model.GoodsOrder) (err error) {
|
||||
db := orm.NewOrm()
|
||||
order.StatusTime = order.OrderCreatedAt
|
||||
isDuplicated, err := addOrderOrWaybillStatus(model.Order2Status(order), db)
|
||||
if err == nil && !isDuplicated {
|
||||
if err = c.saveOrder(order, false, db); err == nil {
|
||||
@@ -58,9 +51,9 @@ func (c *OrderController) OnOrderNew(order *model.GoodsOrder) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
// todo 调整单的处理可能还需要再细化一点,当前只是简单的删除重建
|
||||
func (c *OrderController) OnOrderAdjust(order *model.GoodsOrder) (err error) {
|
||||
db := orm.NewOrm()
|
||||
order.StatusTime = order.OrderCreatedAt
|
||||
status := model.Order2Status(order)
|
||||
isDuplicated, err := addOrderOrWaybillStatus(status, db)
|
||||
if err == nil && !isDuplicated {
|
||||
@@ -102,9 +95,10 @@ func (c *OrderController) saveOrder(order *model.GoodsOrder, isAdjust bool, db o
|
||||
c.updateOrderOtherInfo(order, db)
|
||||
db.Begin()
|
||||
// globals.SugarLogger.Debugf("new order:%v", order)
|
||||
order.ID = 0
|
||||
order.WaybillVendorID = model.VendorIDUnknown
|
||||
order.OrderFinishedAt = utils.DefaultTimeValue
|
||||
order.ID = 0
|
||||
order.OrderCreatedAt = order.StatusTime
|
||||
created, _, err2 := db.ReadOrCreate(order, "VendorOrderID", "VendorID")
|
||||
if err = err2; err == nil {
|
||||
if created {
|
||||
@@ -114,7 +108,7 @@ func (c *OrderController) saveOrder(order *model.GoodsOrder, isAdjust bool, db o
|
||||
for _, sku := range order.Skus {
|
||||
sql += "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?),"
|
||||
params = append(params, sku.VendorOrderID, sku.VendorID, sku.Count, sku.VendorSkuID, sku.SkuID, sku.JxSkuID, sku.SkuName,
|
||||
sku.ShopPrice, sku.SalePrice, sku.Weight, sku.SkuType, sku.PromotionType, order.OrderCreatedAt)
|
||||
sku.ShopPrice, sku.SalePrice, sku.Weight, sku.SkuType, sku.PromotionType, order.StatusTime)
|
||||
}
|
||||
sql = sql[:len(sql)-1] + ";"
|
||||
if _, err = db.Raw(sql, params...).Exec(); err != nil {
|
||||
@@ -215,10 +209,11 @@ func (c *OrderController) addOrderStatus(orderStatus *model.OrderStatus, db orm.
|
||||
db = orm.NewOrm()
|
||||
}
|
||||
isDuplicated, err = addOrderOrWaybillStatus(orderStatus, db)
|
||||
if !isDuplicated && orderStatus.Status > model.OrderStatusNew {
|
||||
if err == nil && !isDuplicated && orderStatus.Status > model.OrderStatusNew {
|
||||
params := orm.Params{
|
||||
"status": orderStatus.Status,
|
||||
"vendor_status": orderStatus.VendorStatus,
|
||||
"status_time": orderStatus.StatusTime,
|
||||
}
|
||||
if orderStatus.Status >= model.OrderStatusEndBegin {
|
||||
params["order_finished_at"] = orderStatus.StatusTime
|
||||
|
||||
@@ -143,7 +143,7 @@ func (c *OrderController) legacyWriteJxOrder(order *model.GoodsOrder, db orm.Orm
|
||||
JxStoreName: order.StoreName,
|
||||
OrderNum: order.OrderSeq,
|
||||
OrderStatus: legacyMapOrderStatus(order.Status),
|
||||
OrderStatusTime: utils.Time2Str(order.OrderCreatedAt),
|
||||
OrderStatusTime: utils.Time2Str(order.StatusTime),
|
||||
BusinessTag: businessTags,
|
||||
SkuCount: order.SkuCount,
|
||||
OrderBuyerRemark: order.BuyerComment,
|
||||
@@ -154,7 +154,7 @@ func (c *OrderController) legacyWriteJxOrder(order *model.GoodsOrder, db orm.Orm
|
||||
BuyerLng: jxutils.IntCoordinate2Standard(order.ConsigneeLng),
|
||||
BuyerLat: jxutils.IntCoordinate2Standard(order.ConsigneeLat),
|
||||
CityName: "all",
|
||||
OrderStartTime: utils.Time2Str(order.OrderCreatedAt),
|
||||
OrderStartTime: utils.Time2Str(order.StatusTime),
|
||||
JdStoreId: order.VendorStoreID,
|
||||
// DeliveryPackageWeight: float64(order.Weight) / 1000,
|
||||
}
|
||||
@@ -264,9 +264,9 @@ func (c *WaybillController) legacyWaybillStatusChanged(bill *model.Waybill, db o
|
||||
// jxorder.DeliveryConfirmTime
|
||||
if bill.Status == model.WaybillStatusNew {
|
||||
updateFields = append(updateFields, "DeliveryStartTime")
|
||||
jxorder.DeliveryStartTime = utils.Time2Str(bill.WaybillCreatedAt)
|
||||
jxorder.DeliveryStartTime = utils.Time2Str(bill.StatusTime)
|
||||
} else if bill.Status >= model.WaybillStatusEndBegin {
|
||||
jxorder.DeliveryFinishTime = utils.Time2Str(bill.WaybillCreatedAt)
|
||||
jxorder.DeliveryFinishTime = utils.Time2Str(bill.StatusTime)
|
||||
updateFields = append(updateFields, "DeliveryFinishTime")
|
||||
}
|
||||
_, err = db.Update(jxorder, updateFields...)
|
||||
|
||||
@@ -19,7 +19,7 @@ func NewWaybillManager() *WaybillController {
|
||||
return &WaybillController{}
|
||||
}
|
||||
|
||||
func (w *WaybillController) LoadPendingWaybills() {
|
||||
func (w *WaybillController) LoadPendingWaybills() []*model.Waybill {
|
||||
db := orm.NewOrm()
|
||||
var bills []*model.Waybill
|
||||
_, err := db.Raw(`
|
||||
@@ -36,29 +36,17 @@ func (w *WaybillController) LoadPendingWaybills() {
|
||||
time.Now().Add(-pendingOrderGapMax), model.WaybillStatusEndBegin).QueryRows(&bills)
|
||||
if err != nil {
|
||||
globals.SugarLogger.Warnf("init load pending waybills error:%v", err)
|
||||
return
|
||||
}
|
||||
globals.SugarLogger.Info(len(bills))
|
||||
for _, v := range bills {
|
||||
v2 := v
|
||||
routinePool.CallFunAsync(func() {
|
||||
if v2.Status != model.WaybillStatusNew {
|
||||
savedStatus := v2.Status
|
||||
v2.Status = model.WaybillStatusNew
|
||||
scheduler.CurrentScheduler.OnWaybillStatusChanged(v2)
|
||||
v2.Status = savedStatus
|
||||
}
|
||||
scheduler.CurrentScheduler.OnWaybillStatusChanged(v2)
|
||||
}, v2.VendorOrderID)
|
||||
return nil
|
||||
}
|
||||
return bills
|
||||
}
|
||||
|
||||
func (w *WaybillController) onWaybillNew(bill *model.Waybill) (err error) {
|
||||
db := orm.NewOrm()
|
||||
isDuplicated, err := addOrderOrWaybillStatus(model.Waybill2Status(bill), db)
|
||||
if !isDuplicated {
|
||||
bill.WaybillFinishedAt = utils.DefaultTimeValue
|
||||
func (w *WaybillController) onWaybillNew(bill *model.Waybill, db orm.Ormer) (isDuplicated bool, err error) {
|
||||
isDuplicated, err = addOrderOrWaybillStatus(model.Waybill2Status(bill), db)
|
||||
if err == nil && !isDuplicated {
|
||||
bill.ID = 0
|
||||
bill.WaybillCreatedAt = bill.StatusTime
|
||||
bill.WaybillFinishedAt = utils.DefaultTimeValue
|
||||
created, _, err2 := db.ReadOrCreate(bill, "VendorWaybillID", "WaybillVendorID")
|
||||
if err = err2; err == nil {
|
||||
if !created {
|
||||
@@ -66,54 +54,36 @@ func (w *WaybillController) onWaybillNew(bill *model.Waybill) (err error) {
|
||||
db.Update(bill, "DuplicatedCount")
|
||||
globals.SugarLogger.Infof("duplicated bill:%v vendorID:%d, msg received", bill.VendorWaybillID, bill.WaybillVendorID)
|
||||
}
|
||||
err = scheduler.CurrentScheduler.OnWaybillStatusChanged(bill)
|
||||
if globals.HandleLegacyJxOrder {
|
||||
w.legacyWaybillStatusChanged(bill, db)
|
||||
}
|
||||
} else {
|
||||
globals.SugarLogger.Warnf("create bill:%v, error:%v", bill, err)
|
||||
}
|
||||
}
|
||||
return err
|
||||
return isDuplicated, err
|
||||
}
|
||||
|
||||
func (w *WaybillController) OnWaybillStatusChanged(bill *model.Waybill) (err error) {
|
||||
var isDuplicated bool
|
||||
db := orm.NewOrm()
|
||||
if bill.Status == model.WaybillStatusNew {
|
||||
err = w.onWaybillNew(bill)
|
||||
} else if bill.Status == model.WaybillStatusAccepted {
|
||||
err = w.onWaybillAccepted(bill)
|
||||
isDuplicated, err = w.onWaybillNew(bill, db)
|
||||
} else {
|
||||
db := orm.NewOrm()
|
||||
isDuplicated, err2 := w.addWaybillStatus(bill, db)
|
||||
if err = err2; err == nil && !isDuplicated {
|
||||
err = scheduler.CurrentScheduler.OnWaybillStatusChanged(bill)
|
||||
if globals.HandleLegacyJxOrder {
|
||||
w.legacyWaybillStatusChanged(bill, db)
|
||||
var addParams orm.Params
|
||||
if bill.Status == model.WaybillStatusAccepted {
|
||||
addParams = orm.Params{
|
||||
"courier_name": bill.CourierName,
|
||||
"courier_mobile": bill.CourierMobile,
|
||||
"desired_fee": bill.DesiredFee,
|
||||
}
|
||||
}
|
||||
isDuplicated, err = w.addWaybillStatus(bill, db, addParams)
|
||||
}
|
||||
if bill.Status == model.WaybillStatusAccepted || bill.Status == model.WaybillStatusDelivered {
|
||||
if order, err2 := OrderManager.LoadOrder(bill.VendorOrderID, bill.OrderVendorID); err2 == nil {
|
||||
weixinmsg.NotifyWaybillStatus(bill, order)
|
||||
if err == nil && !isDuplicated {
|
||||
scheduler.CurrentScheduler.OnWaybillStatusChanged(bill)
|
||||
if bill.Status == model.WaybillStatusAccepted || bill.Status == model.WaybillStatusDelivered {
|
||||
if order, err2 := OrderManager.LoadOrder(bill.VendorOrderID, bill.OrderVendorID); err2 == nil {
|
||||
weixinmsg.NotifyWaybillStatus(bill, order)
|
||||
}
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *WaybillController) onWaybillAccepted(bill *model.Waybill) (err error) {
|
||||
db := orm.NewOrm()
|
||||
isDuplicated, err := w.addWaybillStatus(bill, db)
|
||||
if !isDuplicated {
|
||||
params := orm.Params{
|
||||
"courier_name": bill.CourierName,
|
||||
"courier_mobile": bill.CourierMobile,
|
||||
"desired_fee": bill.DesiredFee,
|
||||
}
|
||||
utils.CallFuncLogError(func() error {
|
||||
_, err = db.QueryTable("waybill").Filter("vendor_waybill_id", bill.VendorWaybillID).Filter("waybill_vendor_id", bill.WaybillVendorID).Update(params)
|
||||
return err
|
||||
}, "update waybill courier info")
|
||||
err = scheduler.CurrentScheduler.OnWaybillStatusChanged(bill)
|
||||
if globals.HandleLegacyJxOrder {
|
||||
w.legacyWaybillStatusChanged(bill, db)
|
||||
}
|
||||
@@ -121,19 +91,17 @@ func (w *WaybillController) onWaybillAccepted(bill *model.Waybill) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *WaybillController) addWaybillStatus(bill *model.Waybill, db orm.Ormer) (isDuplicated bool, err error) {
|
||||
if db == nil {
|
||||
db = orm.NewOrm()
|
||||
}
|
||||
func (w *WaybillController) addWaybillStatus(bill *model.Waybill, db orm.Ormer, addParams orm.Params) (isDuplicated bool, err error) {
|
||||
waybillStatus := model.Waybill2Status(bill)
|
||||
isDuplicated, err = addOrderOrWaybillStatus(waybillStatus, db)
|
||||
if !isDuplicated && waybillStatus.Status > model.WaybillStatusNew {
|
||||
params := orm.Params{
|
||||
if err == nil && !isDuplicated && waybillStatus.Status > model.WaybillStatusNew {
|
||||
params := utils.MergeMaps(orm.Params{
|
||||
"status": bill.Status,
|
||||
"vendor_status": bill.VendorStatus,
|
||||
}
|
||||
"status_time": bill.StatusTime,
|
||||
}, addParams)
|
||||
if bill.Status >= model.WaybillStatusEndBegin {
|
||||
params["waybill_finished_at"] = bill.WaybillCreatedAt
|
||||
params["waybill_finished_at"] = bill.StatusTime
|
||||
}
|
||||
utils.CallFuncLogError(func() error {
|
||||
_, err = db.QueryTable("waybill").Filter("vendor_waybill_id", bill.VendorWaybillID).Filter("waybill_vendor_id", bill.WaybillVendorID).Update(params)
|
||||
|
||||
@@ -84,10 +84,10 @@ func GetUniversalOrderIDFromOrderStatus(status *model.OrderStatus) string {
|
||||
return ComposeUniversalOrderID(status.VendorOrderID, status.VendorID)
|
||||
}
|
||||
|
||||
func GetRealTimeout(beginTime time.Time, timeout time.Duration) time.Duration {
|
||||
func GetRealTimeout(beginTime time.Time, timeout time.Duration, minTimeout time.Duration) time.Duration {
|
||||
retVal := beginTime.Add(timeout).Sub(time.Now())
|
||||
if retVal < 0 {
|
||||
retVal = 0
|
||||
if retVal < minTimeout {
|
||||
retVal = minTimeout
|
||||
}
|
||||
return retVal
|
||||
}
|
||||
|
||||
@@ -23,7 +23,7 @@ func Waybill2Status(bill *Waybill) (retVal *OrderStatus) {
|
||||
RefVendorID: bill.OrderVendorID,
|
||||
Status: bill.Status,
|
||||
VendorStatus: bill.VendorStatus,
|
||||
StatusTime: bill.WaybillCreatedAt,
|
||||
StatusTime: bill.StatusTime,
|
||||
}
|
||||
return retVal
|
||||
}
|
||||
|
||||
@@ -40,10 +40,10 @@ type GoodsOrder struct {
|
||||
DuplicatedCount int // 重复新订单消息数,这个一般不是由于消息重发造成的(消息重发由OrderStatus过滤),一般是业务逻辑造成的
|
||||
OrderCreatedAt time.Time `orm:"type(datetime);index"`
|
||||
OrderFinishedAt time.Time `orm:"type(datetime)"`
|
||||
StatusTime time.Time `orm:"type(datetime)"` // last status time
|
||||
ModelTimeInfo
|
||||
OriginalData string `orm:"type(text)"`
|
||||
Skus []*OrderSku `orm:"-"`
|
||||
StatusTime time.Time `orm:"-"` // 用于传递数据,不实际存储
|
||||
}
|
||||
|
||||
func (o *GoodsOrder) TableUnique() [][]string {
|
||||
@@ -91,8 +91,9 @@ type Waybill struct {
|
||||
ActualFee int64 // 实际要支付给快递公司的实际费用
|
||||
DesiredFee int64 // 根据合同计算出来的预期费用
|
||||
DuplicatedCount int // 重复新订单消息数,这个一般不是由于消息重发造成的(消息重发由OrderStatus过滤),一般是业务逻辑造成的
|
||||
WaybillCreatedAt time.Time `orm:"type(datetime);index"` // 此字段在此结构体用于传递非新运单消息时,为事件发生事件(而非运单创建时间)
|
||||
WaybillCreatedAt time.Time `orm:"type(datetime);index"`
|
||||
WaybillFinishedAt time.Time `orm:"type(datetime)"`
|
||||
StatusTime time.Time `orm:"type(datetime)"` // last status time
|
||||
ModelTimeInfo
|
||||
OriginalData string `orm:"type(text)"`
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ const (
|
||||
time2Schedule3rdCarrierGap4OrderStatus = 3 * time.Minute // 京东要求是运单状态为待抢单且超时5分钟,但为了防止没有运单事件,所以就拣货完成事件开始算,添加3分钟
|
||||
time2AutoPickupMin = 15 * time.Minute
|
||||
time2AutoPickupGap = 5 * time.Minute
|
||||
minTimeout = 10 * time.Second // timer的最小时间,这样写的上的是在load pending orders,让延迟的事件有机会执行
|
||||
)
|
||||
|
||||
type WatchOrderInfo struct {
|
||||
@@ -74,9 +75,6 @@ func (s *DefScheduler) OnOrderNew(order *model.GoodsOrder) (err error) {
|
||||
}
|
||||
s.orderMap.Store(jxutils.ComposeUniversalOrderID(order.VendorOrderID, order.VendorID), watchInfo)
|
||||
s.resetTimer(watchInfo, model.OrderStatusNew, order.OrderCreatedAt, 0)
|
||||
if order.Status > model.OrderStatusNew {
|
||||
return s.OnOrderStatusChanged(model.Order2Status(order))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -122,7 +120,7 @@ func (s *DefScheduler) OnWaybillStatusChanged(bill *model.Waybill) (err error) {
|
||||
if bill.Status == model.WaybillStatusNew {
|
||||
if bill.OrderVendorID == bill.WaybillVendorID {
|
||||
if savedOrderInfo.timerStatus == model.OrderStatusFinishedPickup { // 如果当前TIMER还是OrderStatusFinishedPickup(在OnOrderStatusChanged中设置的),则重置
|
||||
s.resetTimer(savedOrderInfo, model.OrderStatusFinishedPickup, bill.WaybillCreatedAt, 0)
|
||||
s.resetTimer(savedOrderInfo, model.OrderStatusFinishedPickup, bill.StatusTime, 0)
|
||||
} else if savedOrderInfo.timerStatus != 0 {
|
||||
globals.SugarLogger.Infof("OnWaybillStatusChanged met other timer, status:%d", savedOrderInfo.timerStatus)
|
||||
}
|
||||
@@ -292,7 +290,7 @@ func (s *DefScheduler) resetTimer(savedOrderInfo *WatchOrderInfo, status int, be
|
||||
s.stopTimer(savedOrderInfo)
|
||||
config := s.mergeOrderStatusConfig(status, s.GetPurchasePlatformFromVendorID(savedOrderInfo.order.VendorID).GetStatusActionConfig(status))
|
||||
if config != nil && config.TimeoutAction != nil {
|
||||
timeout := jxutils.GetRealTimeout(beginTime, config.Timeout) + gap
|
||||
timeout := jxutils.GetRealTimeout(beginTime, config.Timeout, minTimeout) + gap
|
||||
globals.SugarLogger.Debugf("resetTimer timeout:%v, orderid:%v", timeout, savedOrderInfo.order.VendorOrderID)
|
||||
savedOrderInfo.timerStatus = status
|
||||
savedOrderInfo.timer = time.AfterFunc(timeout, func() {
|
||||
|
||||
@@ -36,7 +36,7 @@ func InitOrder() {
|
||||
func handlePendingOrderMsg() {
|
||||
var ordersInfo []models.Jdorder
|
||||
db := orm.NewOrm()
|
||||
_, err := db.Raw("SELECT * FROM jdorder WHERE orderstatustime >= DATE_ADD(NOW(), interval -2 day) AND code = ?", MsgNotHandledCode).QueryRows(&ordersInfo)
|
||||
_, err := db.Raw("SELECT * FROM jdorder WHERE orderstatustime >= DATE_FORMAT(DATE_ADD(NOW(), interval -1 day), '%Y-%m-%d %H:%i:%s') AND code = ?", MsgNotHandledCode).QueryRows(&ordersInfo)
|
||||
if err != nil {
|
||||
globals.SugarLogger.Errorf("can not get jdorder from db, error:%v", err)
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user