! orderman中尽量使用事务

! orderman中全换成dao.DaoDB
This commit is contained in:
gazebo
2019-04-16 16:33:53 +08:00
parent 3b8209906f
commit 8083f2f638
5 changed files with 117 additions and 56 deletions

View File

@@ -56,8 +56,14 @@ func (c *OrderManager) OnOrderNew(order *model.GoodsOrder, msgVendorStatus strin
order.ConsigneeMobile2 = order.ConsigneeMobile
}
// todo transaction
db := orm.NewOrm()
db := dao.GetDB()
dao.Begin(db)
defer func() {
if r := recover(); r != nil {
dao.Rollback(db)
panic(r)
}
}()
if order.Status == model.OrderStatusUnknown {
order.Status = model.OrderStatusNew
}
@@ -68,9 +74,15 @@ func (c *OrderManager) OnOrderNew(order *model.GoodsOrder, msgVendorStatus strin
status.VendorStatus = msgVendorStatus
isDuplicated, err := addOrderOrWaybillStatus(status, db)
if err == nil && !isDuplicated {
if isDuplicated, err = c.SaveOrder(order, false, db); err == nil && !isDuplicated {
isDuplicated, err = c.SaveOrder(order, false, db)
}
if err == nil {
dao.Commit(db)
if !isDuplicated {
err = scheduler.CurrentScheduler.OnOrderNew(order, false)
}
} else {
dao.Rollback(db)
}
return err
}
@@ -81,8 +93,14 @@ func (c *OrderManager) OnOrderAdjust(order *model.GoodsOrder, msgVendorStatus st
order.ConsigneeMobile2 = order.ConsigneeMobile
}
// todo transaction
db := orm.NewOrm()
db := dao.GetDB()
dao.Begin(db)
defer func() {
if r := recover(); r != nil {
dao.Rollback(db)
panic(r)
}
}()
if order.Status == model.OrderStatusUnknown {
order.Status = model.OrderStatusNew
}
@@ -92,33 +110,52 @@ func (c *OrderManager) OnOrderAdjust(order *model.GoodsOrder, msgVendorStatus st
isDuplicated, err := addOrderOrWaybillStatus(status, db)
if err == nil && !isDuplicated {
err = utils.CallFuncLogError(func() error {
_, err = db.Raw("DELETE FROM order_sku WHERE vendor_order_id = ? AND vendor_id = ?", order.VendorOrderID, order.VendorID).Exec()
_, err = db.Db.Raw("DELETE FROM order_sku WHERE vendor_order_id = ? AND vendor_id = ?", order.VendorOrderID, order.VendorID).Exec()
return err
}, "OnAdjustOrder delete order, orderID:%s", order.VendorOrderID)
if err != nil {
return err
}
err = utils.CallFuncLogError(func() error {
_, err = db.Raw("DELETE FROM goods_order WHERE vendor_order_id = ? AND vendor_id = ?", order.VendorOrderID, order.VendorID).Exec()
_, err = db.Db.Raw("DELETE FROM goods_order WHERE vendor_order_id = ? AND vendor_id = ?", order.VendorOrderID, order.VendorID).Exec()
return err
}, "OnAdjustOrder delete order_sku, orderID:%s", order.VendorOrderID)
if err != nil {
return err
}
if isDuplicated, err = c.SaveOrder(order, true, db); err == nil && !isDuplicated {
isDuplicated, err = c.SaveOrder(order, true, db)
}
if err == nil {
dao.Commit(db)
if !isDuplicated {
msghub.OnNewOrder(order)
// 因为订单调度器需要的是真实状态所以用order的状态
err = scheduler.CurrentScheduler.OnOrderNew(order, false)
err = scheduler.CurrentScheduler.OnOrderStatusChanged(model.Order2Status(order), false)
_ = scheduler.CurrentScheduler.OnOrderNew(order, false)
_ = scheduler.CurrentScheduler.OnOrderStatusChanged(model.Order2Status(order), false)
}
} else {
dao.Rollback(db)
}
return err
}
func (c *OrderManager) OnOrderStatusChanged(orderStatus *model.OrderStatus) (err error) {
isDuplicated, err := c.addOrderStatus(orderStatus, nil)
if err == nil && !isDuplicated {
err = scheduler.CurrentScheduler.OnOrderStatusChanged(orderStatus, false)
db := dao.GetDB()
dao.Begin(db)
defer func() {
if r := recover(); r != nil {
dao.Rollback(db)
panic(r)
}
}()
isDuplicated, err := c.addOrderStatus(orderStatus, db)
if err == nil {
dao.Commit(db)
if !isDuplicated {
_ = scheduler.CurrentScheduler.OnOrderStatusChanged(orderStatus, false)
}
} else {
dao.Rollback(db)
}
return err
}
@@ -139,7 +176,7 @@ func (c *OrderManager) OnOrderMsg(order *model.GoodsOrder, vendorStatus, remark
return err
}
func (c *OrderManager) SaveOrder(order *model.GoodsOrder, isAdjust bool, db orm.Ormer) (isDuplicated bool, err error) {
func (c *OrderManager) SaveOrder(order *model.GoodsOrder, isAdjust bool, db *dao.DaoDB) (isDuplicated bool, err error) {
globals.SugarLogger.Debugf("SaveOrder orderID:%s, VendorStoreID:%s", order.VendorOrderID, order.VendorStoreID)
// 忽略查找JX信息错误
c.updateOrderOtherInfo(order, db)
@@ -147,16 +184,25 @@ func (c *OrderManager) SaveOrder(order *model.GoodsOrder, isAdjust bool, db orm.
order.WaybillVendorID = model.VendorIDUnknown
order.OrderFinishedAt = utils.DefaultTimeValue
dao.Begin(db)
defer func() {
if r := recover(); r != nil || err != nil {
dao.Rollback(db)
if r != nil {
panic(r)
}
}
}()
// todo hardcode 兼容京东消息错序问题
if true { //order.VendorID == model.VendorIDJD {
orderStatus := &model.OrderStatus{}
if db.Raw(`
if dao.GetRow(db, orderStatus, `
SELECT *
FROM order_status
WHERE order_type = ? AND vendor_order_id = ? AND vendor_id = ?
ORDER BY status_time DESC
LIMIT 1
`, model.OrderTypeOrder, order.VendorOrderID, order.VendorID).QueryRow(orderStatus) == nil {
`, model.OrderTypeOrder, order.VendorOrderID, order.VendorID) == nil {
if orderStatus.Status > order.Status {
order.Status = orderStatus.Status
order.VendorStatus = orderStatus.VendorStatus
@@ -166,13 +212,8 @@ func (c *OrderManager) SaveOrder(order *model.GoodsOrder, isAdjust bool, db orm.
}
order.OrderCreatedAt = order.StatusTime
// globals.SugarLogger.Debugf("saveOrder isAdjust:%t, order:%v", isAdjust, order)
db.Begin()
defer func() {
db.Rollback()
}()
created, _, err2 := db.ReadOrCreate(order, "VendorOrderID", "VendorID")
created, _, err2 := db.Db.ReadOrCreate(order, "VendorOrderID", "VendorID")
if err = err2; err == nil {
originalOrder := &model.GoodsOrderOriginal{
VendorOrderID: order.VendorOrderID,
@@ -180,7 +221,7 @@ func (c *OrderManager) SaveOrder(order *model.GoodsOrder, isAdjust bool, db orm.
OrderCreatedAt: order.OrderCreatedAt,
OriginalData: order.OriginalData,
}
if _, _, err = db.ReadOrCreate(originalOrder, "VendorOrderID", "VendorID"); err == nil {
if _, _, err = db.Db.ReadOrCreate(originalOrder, "VendorOrderID", "VendorID"); err == nil {
if created {
sql := `INSERT INTO order_sku(vendor_order_id, vendor_id, count, vendor_sku_id, sku_id, jx_sku_id, sku_name,
shop_price, sale_price, weight, sku_type, promotion_type, order_created_at) VALUES`
@@ -196,26 +237,26 @@ func (c *OrderManager) SaveOrder(order *model.GoodsOrder, isAdjust bool, db orm.
sku.ShopPrice, sku.SalePrice, sku.Weight, sku.SkuType, sku.PromotionType, order.OrderCreatedAt)
}
sql = sql[:len(sql)-1] + ";"
if _, err = db.Raw(sql, params...).Exec(); err != nil {
if _, err = db.Db.Raw(sql, params...).Exec(); err != nil {
baseapi.SugarLogger.Warnf("saveOrder insert order:%v, order_sku error:%v", order, err)
} else {
db.Commit()
}
} else {
isDuplicated = true
order.DuplicatedCount++
db.Update(order, "DuplicatedCount")
db.Commit()
db.Db.Update(order, "DuplicatedCount")
baseapi.SugarLogger.Infof("saveOrder duplicated orderid:%s msg received", order.VendorOrderID)
}
}
} else {
globals.SugarLogger.Warnf("saveOrder create order:%v, error:%v", order, err)
}
if err == nil {
dao.Commit(db)
}
return isDuplicated, err
}
func (c *OrderManager) updateOrderSkuOtherInfo(order *model.GoodsOrder, db orm.Ormer) (err error) {
func (c *OrderManager) updateOrderSkuOtherInfo(order *model.GoodsOrder, db *dao.DaoDB) (err error) {
globals.SugarLogger.Debugf("updateOrderSkuOtherInfo orderID:%s, VendorStoreID:%s", order.VendorOrderID, order.VendorStoreID)
jxStoreID := jxutils.GetShowStoreIDFromOrder(order)
opNumStr := "2"
@@ -251,8 +292,7 @@ func (c *OrderManager) updateOrderSkuOtherInfo(order *model.GoodsOrder, db orm.O
WHERE t1.deleted_at = ? AND %s.%s_id IN (-1, ` + dao.GenQuestionMarks(len(vendorSkuIDs)) + ")"
sql = fmt.Sprintf(sql, tableName, fieldPrefix, tableName, fieldPrefix)
var skuInfos []*tStoreSkuBindAndVendorSkuID
db2 := dao.WrapDB(db)
if err = dao.GetRows(db2, &skuInfos, sql, utils.DefaultTimeValue, jxStoreID, utils.DefaultTimeValue, vendorSkuIDs); err != nil {
if err = dao.GetRows(db, &skuInfos, sql, utils.DefaultTimeValue, jxStoreID, utils.DefaultTimeValue, vendorSkuIDs); err != nil {
globals.SugarLogger.Errorf("updateOrderSkuOtherInfo can not get sku info for orderID:%s, error:%v", order.VendorOrderID, err)
return err
}
@@ -282,7 +322,7 @@ func (c *OrderManager) updateOrderSkuOtherInfo(order *model.GoodsOrder, db orm.O
return nil
}
func (c *OrderManager) updateOrderOtherInfo(order *model.GoodsOrder, db orm.Ormer) (err error) {
func (c *OrderManager) updateOrderOtherInfo(order *model.GoodsOrder, db *dao.DaoDB) (err error) {
globals.SugarLogger.Debugf("updateOrderOtherInfo orderID:%s, VendorStoreID:%s", order.VendorOrderID, order.VendorStoreID)
storeMap := &model.StoreMap{
@@ -290,8 +330,7 @@ func (c *OrderManager) updateOrderOtherInfo(order *model.GoodsOrder, db orm.Orme
VendorStoreID: order.VendorStoreID,
}
storeMap.DeletedAt = utils.DefaultTimeValue
db2 := dao.WrapDB(db)
if err = dao.GetEntity(db2, storeMap, model.FieldVendorID, model.FieldVendorStoreID, model.FieldDeletedAt); err != nil && err != orm.ErrNoRows {
if err = dao.GetEntity(db, storeMap, model.FieldVendorID, model.FieldVendorStoreID, model.FieldDeletedAt); err != nil && err != orm.ErrNoRows {
globals.SugarLogger.Warnf("updateOrderOtherInfo GetEntity orderID:%s, VendorStoreID:%s, error:%v", order.VendorOrderID, order.VendorStoreID, err)
return err
}
@@ -306,9 +345,9 @@ func (c *OrderManager) updateOrderOtherInfo(order *model.GoodsOrder, db orm.Orme
return err
}
func (c *OrderManager) addOrderStatus(orderStatus *model.OrderStatus, db orm.Ormer) (isDuplicated bool, err error) {
func (c *OrderManager) addOrderStatus(orderStatus *model.OrderStatus, db *dao.DaoDB) (isDuplicated bool, err error) {
if db == nil {
db = orm.NewOrm()
db = dao.GetDB()
}
isDuplicated, err = addOrderOrWaybillStatus(orderStatus, db)
if err == nil && !isDuplicated &&
@@ -318,7 +357,7 @@ func (c *OrderManager) addOrderStatus(orderStatus *model.OrderStatus, db orm.Orm
VendorOrderID: orderStatus.VendorOrderID,
VendorID: orderStatus.VendorID,
}
if err = db.ReadForUpdate(order, "VendorOrderID", "VendorID"); err == nil {
if err = db.Db.ReadForUpdate(order, "VendorOrderID", "VendorID"); err == nil {
if (orderStatus.Status == model.OrderStatusUnlocked || orderStatus.Status == model.OrderStatusLocked || orderStatus.Status == model.OrderStatusApplyCancel) ||
(orderStatus.Status > model.OrderStatusUnknown && orderStatus.Status >= order.Status) { // todo 要求status不能回绕
order.VendorStatus = orderStatus.VendorStatus
@@ -345,7 +384,7 @@ func (c *OrderManager) addOrderStatus(orderStatus *model.OrderStatus, db orm.Orm
updateFields = append(updateFields, "OrderFinishedAt")
}
utils.CallFuncLogError(func() error {
_, err = db.Update(order, updateFields...)
_, err = db.Db.Update(order, updateFields...)
return err
}, "addOrderStatus update orderID:%s, status:%v", order.VendorOrderID, orderStatus)
} else {

View File

@@ -9,9 +9,9 @@ import (
"git.rosy.net.cn/jx-callback/business/jxcallback/scheduler"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
"git.rosy.net.cn/jx-callback/business/partner"
"git.rosy.net.cn/jx-callback/globals"
"github.com/astaxie/beego/orm"
)
const (
@@ -62,22 +62,31 @@ func init() {
partner.InitOrderManager(FixedOrderManager)
}
func addOrderOrWaybillStatus(status *model.OrderStatus, db orm.Ormer) (isDuplicated bool, err error) {
func addOrderOrWaybillStatus(status *model.OrderStatus, db *dao.DaoDB) (isDuplicated bool, err error) {
if status.OrderType == model.OrderTypeOrder {
globals.SugarLogger.Debugf("addOrderStatus order:%v", status)
} else {
globals.SugarLogger.Debugf("addOrderStatus waybill:%v", status)
}
dao.Begin(db)
defer func() {
if r := recover(); r != nil || err != nil {
globals.SugarLogger.Debug("rollback")
dao.Rollback(db)
if r != nil {
panic(r)
}
}
}()
status.ID = 0
created, _, err := db.ReadOrCreate(status, "VendorOrderID", "VendorID", "OrderType", "VendorStatus", "StatusTime")
created, _, err := db.Db.ReadOrCreate(status, "VendorOrderID", "VendorID", "OrderType", "VendorStatus", "StatusTime")
if err == nil {
if !created {
globals.SugarLogger.Debugf("duplicated event:%v", status)
isDuplicated = true
status.DuplicatedCount++
utils.CallFuncLogError(func() error {
_, err = db.Update(status, "DuplicatedCount")
_, err = db.Db.Update(status, "DuplicatedCount")
return err
}, "addOrderOrWaybillStatus update DuplicatedCount, status:%v", status)
}
@@ -85,6 +94,8 @@ func addOrderOrWaybillStatus(status *model.OrderStatus, db orm.Ormer) (isDuplica
if err != nil {
// todo 这里居然会有主键重复错误,逻辑上是不应该的
globals.SugarLogger.Warnf("addOrderOrWaybillStatus status:%v, access db error:%v", status, err)
} else {
dao.Commit(db)
}
return isDuplicated, err
}

View File

@@ -6,6 +6,7 @@ import (
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxcallback/scheduler"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
"git.rosy.net.cn/jx-callback/globals"
"github.com/astaxie/beego/orm"
)
@@ -31,7 +32,7 @@ func (w *OrderManager) LoadPendingWaybills() []*model.Waybill {
return bills
}
func (w *OrderManager) onWaybillNew(bill2 *model.Waybill, db orm.Ormer) (isDuplicated bool, err error) {
func (w *OrderManager) onWaybillNew(bill2 *model.Waybill, db *dao.DaoDB) (isDuplicated bool, err error) {
globals.SugarLogger.Debugf("onWaybillNew bill:%v", bill2)
isDuplicated, err = addOrderOrWaybillStatus(model.Waybill2Status(bill2), db)
if err == nil && !isDuplicated {
@@ -40,16 +41,16 @@ func (w *OrderManager) onWaybillNew(bill2 *model.Waybill, db orm.Ormer) (isDupli
bill2.WaybillFinishedAt = utils.DefaultTimeValue
billCopied := *bill2
bill := &billCopied
created, _, err2 := db.ReadOrCreate(bill, "VendorWaybillID", "WaybillVendorID")
created, _, err2 := db.Db.ReadOrCreate(bill, "VendorWaybillID", "WaybillVendorID")
if err = err2; err == nil {
if !created {
bill.DuplicatedCount++
if bill2.VendorOrderID == bill2.VendorWaybillID { // 购物平台(比如京东)重新建的运单,单号始终是与订单相同的
bill2.ID = bill.ID
bill2.DuplicatedCount = bill.DuplicatedCount
db.Update(bill2) //更新所有字段
db.Db.Update(bill2) //更新所有字段
} else {
db.Update(bill, "DuplicatedCount")
db.Db.Update(bill, "DuplicatedCount")
isDuplicated = true
globals.SugarLogger.Infof("onWaybillNew duplicated bill:%v msg received", bill2)
}
@@ -65,7 +66,14 @@ func (w *OrderManager) onWaybillNew(bill2 *model.Waybill, db orm.Ormer) (isDupli
func (w *OrderManager) OnWaybillStatusChanged(bill *model.Waybill) (err error) {
var isDuplicated bool
db := orm.NewOrm()
db := dao.GetDB()
dao.Begin(db)
defer func() {
if r := recover(); r != nil {
dao.Rollback(db)
panic(r)
}
}()
if bill.Status == model.WaybillStatusNew {
isDuplicated, err = w.onWaybillNew(bill, db)
} else {
@@ -83,13 +91,18 @@ func (w *OrderManager) OnWaybillStatusChanged(bill *model.Waybill) (err error) {
}
isDuplicated, err = w.addWaybillStatus(bill, db, addParams)
}
if err == nil && !isDuplicated {
scheduler.CurrentScheduler.OnWaybillStatusChanged(bill, false)
if err == nil {
dao.Commit(db)
if !isDuplicated {
scheduler.CurrentScheduler.OnWaybillStatusChanged(bill, false)
}
} else {
dao.Rollback(db)
}
return err
}
func (w *OrderManager) addWaybillStatus(bill *model.Waybill, db orm.Ormer, addParams orm.Params) (isDuplicated bool, err error) {
func (w *OrderManager) addWaybillStatus(bill *model.Waybill, db *dao.DaoDB, addParams orm.Params) (isDuplicated bool, err error) {
waybillStatus := model.Waybill2Status(bill)
isDuplicated, err = addOrderOrWaybillStatus(waybillStatus, db)
if err == nil && !isDuplicated && waybillStatus.Status > model.WaybillStatusUnknown { // todo 这里应该和addOrderStatus一样的改法状态不能回绕
@@ -99,7 +112,7 @@ func (w *OrderManager) addWaybillStatus(bill *model.Waybill, db orm.Ormer, addPa
"status_time": bill.StatusTime,
}, addParams)
utils.CallFuncLogError(func() error {
_, err = db.QueryTable("waybill").Filter("vendor_waybill_id", bill.VendorWaybillID).Filter("waybill_vendor_id", bill.WaybillVendorID).Filter("status__lte", bill.Status).Update(params)
_, err = db.Db.QueryTable("waybill").Filter("vendor_waybill_id", bill.VendorWaybillID).Filter("waybill_vendor_id", bill.WaybillVendorID).Filter("status__lte", bill.Status).Update(params)
return err
}, "addWaybillStatus update waybill status, bill:%v", bill)
}

View File

@@ -638,8 +638,7 @@ func CreateOrderFromOriginal(ctx *jxcontext.Context, isAsync, isContinueWhenErro
}
}
if err == nil {
rawDB := orm.NewOrm()
if _, err = orderman.FixedOrderManager.SaveOrder(order, false, rawDB); err != nil {
if _, err = orderman.FixedOrderManager.SaveOrder(order, false, dao.GetDB()); err != nil {
globals.SugarLogger.Debugf("CreateOrderFromOriginal abnormal orderID:%s, error:%v", orderOriginal.VendorOrderID, err)
}
} else {

View File

@@ -9,7 +9,6 @@ import (
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
"github.com/astaxie/beego/orm"
)
const (
@@ -99,7 +98,7 @@ var (
)
type IOrderManager interface {
SaveOrder(order *model.GoodsOrder, isAdjust bool, db orm.Ormer) (isDuplicated bool, err error)
SaveOrder(order *model.GoodsOrder, isAdjust bool, db *dao.DaoDB) (isDuplicated bool, err error)
OnOrderNew(order *model.GoodsOrder, msgVendorStatus string) (err error)
OnOrderAdjust(order *model.GoodsOrder, msgVendorStatus string) (err error)