216 lines
7.1 KiB
Go
216 lines
7.1 KiB
Go
package orderman
|
||
|
||
import (
|
||
"time"
|
||
|
||
"git.rosy.net.cn/baseapi/utils"
|
||
"git.rosy.net.cn/jx-callback/business/jxcallback/scheduler"
|
||
"git.rosy.net.cn/jx-callback/business/model"
|
||
"git.rosy.net.cn/jx-callback/business/model/dao"
|
||
"git.rosy.net.cn/jx-callback/globals"
|
||
"github.com/astaxie/beego/orm"
|
||
)
|
||
|
||
var (
|
||
waybillOrderStatusMap = map[int]int{
|
||
model.WaybillStatusApplyFailedGetGoods: model.OrderStatusApplyFailedGetGoods,
|
||
model.WaybillStatusAgreeFailedGetGoods: model.OrderStatusAgreeFailedGetGoods,
|
||
model.WaybillStatusRefuseFailedGetGoods: model.OrderStatusRefuseFailedGetGoods,
|
||
model.WaybillStatusDeliverFailed: model.OrderStatusDeliverFailed,
|
||
}
|
||
)
|
||
|
||
func (w *OrderManager) LoadPendingWaybills() []*model.Waybill {
|
||
db := orm.NewOrm()
|
||
var bills []*model.Waybill
|
||
tillTime := time.Now().Add(-pendingOrderGapMax)
|
||
_, err := db.Raw(`
|
||
SELECT t1.*
|
||
FROM waybill t1
|
||
JOIN goods_order t2 ON t2.vendor_order_id = t1.vendor_order_id
|
||
AND t2.vendor_id = t1.order_vendor_id
|
||
AND t2.order_created_at >= ?
|
||
AND t2.status < ?
|
||
WHERE t1.waybill_created_at >= ?
|
||
AND t1.status < ?
|
||
`, tillTime, model.OrderStatusEndBegin, tillTime, model.WaybillStatusEndBegin).QueryRows(&bills)
|
||
if err != nil {
|
||
globals.SugarLogger.Warnf("LoadPendingWaybills load pending waybills error:%v", err)
|
||
return nil
|
||
}
|
||
return bills
|
||
}
|
||
|
||
func (w *OrderManager) onWaybillNew(bill2 *model.Waybill, db *dao.DaoDB) (isDuplicated bool, err error) {
|
||
globals.SugarLogger.Debugf("onWaybillNew bill:%v", bill2)
|
||
isDuplicated, err = addOrderOrWaybillStatus(model.Waybill2Status(bill2), db)
|
||
if err == nil && !isDuplicated {
|
||
bill2.ID = 0
|
||
bill2.WaybillCreatedAt = bill2.StatusTime
|
||
bill2.WaybillFinishedAt = utils.DefaultTimeValue
|
||
billCopied := *bill2
|
||
bill := &billCopied
|
||
created, _, err2 := db.Db.ReadOrCreate(bill, "VendorWaybillID", "WaybillVendorID")
|
||
if err = err2; err == nil {
|
||
if !created {
|
||
bill.DuplicatedCount++
|
||
if bill2.VendorOrderID == bill2.VendorWaybillID { // 购物平台(比如京东)重新建的运单,单号始终是与订单相同的
|
||
globals.SugarLogger.Infof("onWaybillNew duplicated1, DuplicatedCount:%d, bill:%v msg received", bill2.DuplicatedCount, bill2)
|
||
bill2.ID = bill.ID
|
||
bill2.CreatedAt = bill.CreatedAt
|
||
bill2.DuplicatedCount = bill.DuplicatedCount
|
||
err = utils.CallFuncLogError(func() error {
|
||
_, err = db.Db.Update(bill2) //更新所有字段
|
||
return err
|
||
}, "onWaybillNew Update1")
|
||
} else {
|
||
globals.SugarLogger.Infof("onWaybillNew duplicated2 DuplicatedCount:%d, bill:%v msg received", bill.DuplicatedCount, bill2)
|
||
isDuplicated = true
|
||
err = utils.CallFuncLogError(func() error {
|
||
_, err = db.Db.Update(bill, "DuplicatedCount")
|
||
return err
|
||
}, "onWaybillNew Update2")
|
||
}
|
||
} else {
|
||
globals.SugarLogger.Debugf("onWaybillNew created bill:%v", bill2)
|
||
*bill2 = *bill
|
||
}
|
||
} else {
|
||
globals.SugarLogger.Warnf("onWaybillNew create bill:%v, error:%v", bill2, err)
|
||
}
|
||
}
|
||
return isDuplicated, err
|
||
}
|
||
|
||
func (w *OrderManager) OnWaybillStatusChanged(bill *model.Waybill) (err error) {
|
||
var isDuplicated bool
|
||
db := dao.GetDB()
|
||
dao.Begin(db)
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
dao.Rollback(db)
|
||
panic(r)
|
||
}
|
||
}()
|
||
duplicatedCount := 0
|
||
if bill.Status == model.WaybillStatusNew {
|
||
isDuplicated, err = w.onWaybillNew(bill, db)
|
||
if isDuplicated {
|
||
duplicatedCount = 1
|
||
}
|
||
} else {
|
||
if bill.Status == model.WaybillStatusAccepted { // 处理美团配送丢失新运单消息的情况
|
||
existingBill, err2 := w.LoadWaybill(bill.VendorWaybillID, bill.WaybillVendorID)
|
||
if err2 != nil {
|
||
if dao.IsNoRowsError(err2) || err2 == ErrCanNotFindWaybill {
|
||
existingBill = bill
|
||
billCopy := *bill
|
||
billCopy.Status = model.WaybillStatusNew
|
||
if isDuplicated, err = w.onWaybillNew(&billCopy, db); err != nil {
|
||
dao.Rollback(db)
|
||
return err
|
||
}
|
||
dao.Commit(db)
|
||
// 进运单调度器OnWaybillStatusChanged之前要确保事务是提交了的,否则会导致死锁
|
||
scheduler.CurrentScheduler.OnWaybillStatusChanged(&billCopy, false)
|
||
dao.Begin(db)
|
||
} else {
|
||
dao.Rollback(db)
|
||
return err2
|
||
}
|
||
}
|
||
// 运单消息错序,之前已经结束了,直接返回
|
||
if existingBill.Status >= model.WaybillStatusEndBegin {
|
||
dao.Commit(db)
|
||
return nil
|
||
}
|
||
}
|
||
addParams := orm.Params{}
|
||
if bill.Status >= model.WaybillStatusAccepted && bill.Status < model.WaybillStatusEndBegin {
|
||
if bill.Status == model.WaybillStatusAccepted {
|
||
if bill.DesiredFee > 0 {
|
||
addParams["desired_fee"] = bill.DesiredFee
|
||
}
|
||
if bill.ActualFee > 0 {
|
||
addParams["actual_fee"] = bill.ActualFee
|
||
}
|
||
}
|
||
if bill.CourierMobile != "" {
|
||
addParams["courier_name"] = bill.CourierName
|
||
addParams["courier_mobile"] = bill.CourierMobile
|
||
}
|
||
} else if bill.Status >= model.WaybillStatusEndBegin {
|
||
addParams["waybill_finished_at"] = bill.StatusTime
|
||
}
|
||
duplicatedCount, err = w.addWaybillStatus(bill, db, addParams)
|
||
if err != nil {
|
||
dao.Rollback(db)
|
||
return err
|
||
}
|
||
}
|
||
if err == nil {
|
||
dao.Commit(db)
|
||
if duplicatedCount == 0 {
|
||
scheduler.CurrentScheduler.OnWaybillStatusChanged(bill, false)
|
||
}
|
||
} else {
|
||
dao.Rollback(db)
|
||
}
|
||
if bill.VendorOrderID == bill.VendorWaybillID {
|
||
if status, ok := waybillOrderStatusMap[bill.Status]; ok {
|
||
fakeOrderStatus := &model.OrderStatus{
|
||
VendorOrderID: bill.VendorOrderID,
|
||
VendorID: bill.OrderVendorID,
|
||
OrderType: model.OrderTypeOrder,
|
||
RefVendorOrderID: bill.VendorOrderID,
|
||
RefVendorID: bill.OrderVendorID,
|
||
Status: status,
|
||
VendorStatus: bill.VendorStatus,
|
||
StatusTime: bill.StatusTime,
|
||
Remark: bill.Remark,
|
||
}
|
||
w.OnOrderStatusChanged(fakeOrderStatus)
|
||
}
|
||
}
|
||
return err
|
||
}
|
||
|
||
func (w *OrderManager) addWaybillStatus(bill *model.Waybill, db *dao.DaoDB, addParams orm.Params) (duplicatedCount int, err error) {
|
||
waybillStatus := model.Waybill2Status(bill)
|
||
isDuplicated, err := addOrderOrWaybillStatus(waybillStatus, db)
|
||
if err == nil && !isDuplicated {
|
||
if waybillStatus.Status > model.WaybillStatusUnknown { // todo 这里应该和addOrderStatus一样的改法,状态不能回绕
|
||
params := utils.MergeMaps(orm.Params{
|
||
"status": bill.Status,
|
||
"vendor_status": bill.VendorStatus,
|
||
"status_time": bill.StatusTime,
|
||
}, addParams)
|
||
utils.CallFuncLogError(func() error {
|
||
_, err = db.Db.QueryTable("waybill").Filter("vendor_waybill_id", bill.VendorWaybillID).Filter("waybill_vendor_id", bill.WaybillVendorID).Filter("status__lte", bill.Status).Update(params)
|
||
return err
|
||
}, "addWaybillStatus update waybill status, bill:%v", bill)
|
||
} else {
|
||
duplicatedCount = -1
|
||
}
|
||
} else {
|
||
duplicatedCount = 1
|
||
}
|
||
return duplicatedCount, err
|
||
}
|
||
|
||
func (c *OrderManager) LoadWaybill(vendorWaybillID string, waybillVendorID int) (bill *model.Waybill, err error) {
|
||
db := orm.NewOrm()
|
||
bill = &model.Waybill{
|
||
VendorWaybillID: vendorWaybillID,
|
||
WaybillVendorID: waybillVendorID,
|
||
}
|
||
if err = db.Read(bill, "VendorWaybillID", "WaybillVendorID"); err != nil {
|
||
bill = nil
|
||
if err == orm.ErrNoRows {
|
||
err = ErrCanNotFindWaybill
|
||
}
|
||
globals.SugarLogger.Infof("LoadWaybill vendorWaybillID:%s failed with error:%v", vendorWaybillID, err)
|
||
}
|
||
return bill, err
|
||
}
|