diff --git a/business/jxcallback/orderman/order.go b/business/jxcallback/orderman/order.go index 0251a1c44..ef9b9a898 100644 --- a/business/jxcallback/orderman/order.go +++ b/business/jxcallback/orderman/order.go @@ -56,8 +56,14 @@ func (c *OrderManager) OnOrderNew(order *model.GoodsOrder, msgVendorStatus strin order.ConsigneeMobile2 = order.ConsigneeMobile } - // todo transaction - db := orm.NewOrm() + db := dao.GetDB() + dao.Begin(db) + defer func() { + if r := recover(); r != nil { + dao.Rollback(db) + panic(r) + } + }() if order.Status == model.OrderStatusUnknown { order.Status = model.OrderStatusNew } @@ -68,9 +74,15 @@ func (c *OrderManager) OnOrderNew(order *model.GoodsOrder, msgVendorStatus strin status.VendorStatus = msgVendorStatus isDuplicated, err := addOrderOrWaybillStatus(status, db) if err == nil && !isDuplicated { - if isDuplicated, err = c.SaveOrder(order, false, db); err == nil && !isDuplicated { + isDuplicated, err = c.SaveOrder(order, false, db) + } + if err == nil { + dao.Commit(db) + if !isDuplicated { err = scheduler.CurrentScheduler.OnOrderNew(order, false) } + } else { + dao.Rollback(db) } return err } @@ -81,8 +93,14 @@ func (c *OrderManager) OnOrderAdjust(order *model.GoodsOrder, msgVendorStatus st order.ConsigneeMobile2 = order.ConsigneeMobile } - // todo transaction - db := orm.NewOrm() + db := dao.GetDB() + dao.Begin(db) + defer func() { + if r := recover(); r != nil { + dao.Rollback(db) + panic(r) + } + }() if order.Status == model.OrderStatusUnknown { order.Status = model.OrderStatusNew } @@ -92,33 +110,52 @@ func (c *OrderManager) OnOrderAdjust(order *model.GoodsOrder, msgVendorStatus st isDuplicated, err := addOrderOrWaybillStatus(status, db) if err == nil && !isDuplicated { err = utils.CallFuncLogError(func() error { - _, err = db.Raw("DELETE FROM order_sku WHERE vendor_order_id = ? AND vendor_id = ?", order.VendorOrderID, order.VendorID).Exec() + _, err = db.Db.Raw("DELETE FROM order_sku WHERE vendor_order_id = ? AND vendor_id = ?", order.VendorOrderID, order.VendorID).Exec() return err }, "OnAdjustOrder delete order, orderID:%s", order.VendorOrderID) if err != nil { return err } err = utils.CallFuncLogError(func() error { - _, err = db.Raw("DELETE FROM goods_order WHERE vendor_order_id = ? AND vendor_id = ?", order.VendorOrderID, order.VendorID).Exec() + _, err = db.Db.Raw("DELETE FROM goods_order WHERE vendor_order_id = ? AND vendor_id = ?", order.VendorOrderID, order.VendorID).Exec() return err }, "OnAdjustOrder delete order_sku, orderID:%s", order.VendorOrderID) if err != nil { return err } - if isDuplicated, err = c.SaveOrder(order, true, db); err == nil && !isDuplicated { + isDuplicated, err = c.SaveOrder(order, true, db) + } + if err == nil { + dao.Commit(db) + if !isDuplicated { msghub.OnNewOrder(order) // 因为订单调度器需要的是真实状态,所以用order的状态 - err = scheduler.CurrentScheduler.OnOrderNew(order, false) - err = scheduler.CurrentScheduler.OnOrderStatusChanged(model.Order2Status(order), false) + _ = scheduler.CurrentScheduler.OnOrderNew(order, false) + _ = scheduler.CurrentScheduler.OnOrderStatusChanged(model.Order2Status(order), false) } + } else { + dao.Rollback(db) } return err } func (c *OrderManager) OnOrderStatusChanged(orderStatus *model.OrderStatus) (err error) { - isDuplicated, err := c.addOrderStatus(orderStatus, nil) - if err == nil && !isDuplicated { - err = scheduler.CurrentScheduler.OnOrderStatusChanged(orderStatus, false) + db := dao.GetDB() + dao.Begin(db) + defer func() { + if r := recover(); r != nil { + dao.Rollback(db) + panic(r) + } + }() + isDuplicated, err := c.addOrderStatus(orderStatus, db) + if err == nil { + dao.Commit(db) + if !isDuplicated { + _ = scheduler.CurrentScheduler.OnOrderStatusChanged(orderStatus, false) + } + } else { + dao.Rollback(db) } return err } @@ -139,7 +176,7 @@ func (c *OrderManager) OnOrderMsg(order *model.GoodsOrder, vendorStatus, remark return err } -func (c *OrderManager) SaveOrder(order *model.GoodsOrder, isAdjust bool, db orm.Ormer) (isDuplicated bool, err error) { +func (c *OrderManager) SaveOrder(order *model.GoodsOrder, isAdjust bool, db *dao.DaoDB) (isDuplicated bool, err error) { globals.SugarLogger.Debugf("SaveOrder orderID:%s, VendorStoreID:%s", order.VendorOrderID, order.VendorStoreID) // 忽略查找JX信息错误 c.updateOrderOtherInfo(order, db) @@ -147,16 +184,25 @@ func (c *OrderManager) SaveOrder(order *model.GoodsOrder, isAdjust bool, db orm. order.WaybillVendorID = model.VendorIDUnknown order.OrderFinishedAt = utils.DefaultTimeValue + dao.Begin(db) + defer func() { + if r := recover(); r != nil || err != nil { + dao.Rollback(db) + if r != nil { + panic(r) + } + } + }() // todo hardcode 兼容京东消息错序问题 if true { //order.VendorID == model.VendorIDJD { orderStatus := &model.OrderStatus{} - if db.Raw(` + if dao.GetRow(db, orderStatus, ` SELECT * FROM order_status WHERE order_type = ? AND vendor_order_id = ? AND vendor_id = ? ORDER BY status_time DESC LIMIT 1 - `, model.OrderTypeOrder, order.VendorOrderID, order.VendorID).QueryRow(orderStatus) == nil { + `, model.OrderTypeOrder, order.VendorOrderID, order.VendorID) == nil { if orderStatus.Status > order.Status { order.Status = orderStatus.Status order.VendorStatus = orderStatus.VendorStatus @@ -166,13 +212,8 @@ func (c *OrderManager) SaveOrder(order *model.GoodsOrder, isAdjust bool, db orm. } order.OrderCreatedAt = order.StatusTime - // globals.SugarLogger.Debugf("saveOrder isAdjust:%t, order:%v", isAdjust, order) - db.Begin() - defer func() { - db.Rollback() - }() - created, _, err2 := db.ReadOrCreate(order, "VendorOrderID", "VendorID") + created, _, err2 := db.Db.ReadOrCreate(order, "VendorOrderID", "VendorID") if err = err2; err == nil { originalOrder := &model.GoodsOrderOriginal{ VendorOrderID: order.VendorOrderID, @@ -180,7 +221,7 @@ func (c *OrderManager) SaveOrder(order *model.GoodsOrder, isAdjust bool, db orm. OrderCreatedAt: order.OrderCreatedAt, OriginalData: order.OriginalData, } - if _, _, err = db.ReadOrCreate(originalOrder, "VendorOrderID", "VendorID"); err == nil { + if _, _, err = db.Db.ReadOrCreate(originalOrder, "VendorOrderID", "VendorID"); err == nil { if created { sql := `INSERT INTO order_sku(vendor_order_id, vendor_id, count, vendor_sku_id, sku_id, jx_sku_id, sku_name, shop_price, sale_price, weight, sku_type, promotion_type, order_created_at) VALUES` @@ -196,26 +237,26 @@ func (c *OrderManager) SaveOrder(order *model.GoodsOrder, isAdjust bool, db orm. sku.ShopPrice, sku.SalePrice, sku.Weight, sku.SkuType, sku.PromotionType, order.OrderCreatedAt) } sql = sql[:len(sql)-1] + ";" - if _, err = db.Raw(sql, params...).Exec(); err != nil { + if _, err = db.Db.Raw(sql, params...).Exec(); err != nil { baseapi.SugarLogger.Warnf("saveOrder insert order:%v, order_sku error:%v", order, err) - } else { - db.Commit() } } else { isDuplicated = true order.DuplicatedCount++ - db.Update(order, "DuplicatedCount") - db.Commit() + db.Db.Update(order, "DuplicatedCount") baseapi.SugarLogger.Infof("saveOrder duplicated orderid:%s msg received", order.VendorOrderID) } } } else { globals.SugarLogger.Warnf("saveOrder create order:%v, error:%v", order, err) } + if err == nil { + dao.Commit(db) + } return isDuplicated, err } -func (c *OrderManager) updateOrderSkuOtherInfo(order *model.GoodsOrder, db orm.Ormer) (err error) { +func (c *OrderManager) updateOrderSkuOtherInfo(order *model.GoodsOrder, db *dao.DaoDB) (err error) { globals.SugarLogger.Debugf("updateOrderSkuOtherInfo orderID:%s, VendorStoreID:%s", order.VendorOrderID, order.VendorStoreID) jxStoreID := jxutils.GetShowStoreIDFromOrder(order) opNumStr := "2" @@ -251,8 +292,7 @@ func (c *OrderManager) updateOrderSkuOtherInfo(order *model.GoodsOrder, db orm.O WHERE t1.deleted_at = ? AND %s.%s_id IN (-1, ` + dao.GenQuestionMarks(len(vendorSkuIDs)) + ")" sql = fmt.Sprintf(sql, tableName, fieldPrefix, tableName, fieldPrefix) var skuInfos []*tStoreSkuBindAndVendorSkuID - db2 := dao.WrapDB(db) - if err = dao.GetRows(db2, &skuInfos, sql, utils.DefaultTimeValue, jxStoreID, utils.DefaultTimeValue, vendorSkuIDs); err != nil { + if err = dao.GetRows(db, &skuInfos, sql, utils.DefaultTimeValue, jxStoreID, utils.DefaultTimeValue, vendorSkuIDs); err != nil { globals.SugarLogger.Errorf("updateOrderSkuOtherInfo can not get sku info for orderID:%s, error:%v", order.VendorOrderID, err) return err } @@ -282,7 +322,7 @@ func (c *OrderManager) updateOrderSkuOtherInfo(order *model.GoodsOrder, db orm.O return nil } -func (c *OrderManager) updateOrderOtherInfo(order *model.GoodsOrder, db orm.Ormer) (err error) { +func (c *OrderManager) updateOrderOtherInfo(order *model.GoodsOrder, db *dao.DaoDB) (err error) { globals.SugarLogger.Debugf("updateOrderOtherInfo orderID:%s, VendorStoreID:%s", order.VendorOrderID, order.VendorStoreID) storeMap := &model.StoreMap{ @@ -290,8 +330,7 @@ func (c *OrderManager) updateOrderOtherInfo(order *model.GoodsOrder, db orm.Orme VendorStoreID: order.VendorStoreID, } storeMap.DeletedAt = utils.DefaultTimeValue - db2 := dao.WrapDB(db) - if err = dao.GetEntity(db2, storeMap, model.FieldVendorID, model.FieldVendorStoreID, model.FieldDeletedAt); err != nil && err != orm.ErrNoRows { + if err = dao.GetEntity(db, storeMap, model.FieldVendorID, model.FieldVendorStoreID, model.FieldDeletedAt); err != nil && err != orm.ErrNoRows { globals.SugarLogger.Warnf("updateOrderOtherInfo GetEntity orderID:%s, VendorStoreID:%s, error:%v", order.VendorOrderID, order.VendorStoreID, err) return err } @@ -306,9 +345,9 @@ func (c *OrderManager) updateOrderOtherInfo(order *model.GoodsOrder, db orm.Orme return err } -func (c *OrderManager) addOrderStatus(orderStatus *model.OrderStatus, db orm.Ormer) (isDuplicated bool, err error) { +func (c *OrderManager) addOrderStatus(orderStatus *model.OrderStatus, db *dao.DaoDB) (isDuplicated bool, err error) { if db == nil { - db = orm.NewOrm() + db = dao.GetDB() } isDuplicated, err = addOrderOrWaybillStatus(orderStatus, db) if err == nil && !isDuplicated && @@ -318,7 +357,7 @@ func (c *OrderManager) addOrderStatus(orderStatus *model.OrderStatus, db orm.Orm VendorOrderID: orderStatus.VendorOrderID, VendorID: orderStatus.VendorID, } - if err = db.ReadForUpdate(order, "VendorOrderID", "VendorID"); err == nil { + if err = db.Db.ReadForUpdate(order, "VendorOrderID", "VendorID"); err == nil { if (orderStatus.Status == model.OrderStatusUnlocked || orderStatus.Status == model.OrderStatusLocked || orderStatus.Status == model.OrderStatusApplyCancel) || (orderStatus.Status > model.OrderStatusUnknown && orderStatus.Status >= order.Status) { // todo 要求status不能回绕 order.VendorStatus = orderStatus.VendorStatus @@ -345,7 +384,7 @@ func (c *OrderManager) addOrderStatus(orderStatus *model.OrderStatus, db orm.Orm updateFields = append(updateFields, "OrderFinishedAt") } utils.CallFuncLogError(func() error { - _, err = db.Update(order, updateFields...) + _, err = db.Db.Update(order, updateFields...) return err }, "addOrderStatus update orderID:%s, status:%v", order.VendorOrderID, orderStatus) } else { diff --git a/business/jxcallback/orderman/orderman.go b/business/jxcallback/orderman/orderman.go index cf4b0a7da..5e1a251a9 100644 --- a/business/jxcallback/orderman/orderman.go +++ b/business/jxcallback/orderman/orderman.go @@ -9,9 +9,9 @@ import ( "git.rosy.net.cn/jx-callback/business/jxcallback/scheduler" "git.rosy.net.cn/jx-callback/business/jxutils" "git.rosy.net.cn/jx-callback/business/model" + "git.rosy.net.cn/jx-callback/business/model/dao" "git.rosy.net.cn/jx-callback/business/partner" "git.rosy.net.cn/jx-callback/globals" - "github.com/astaxie/beego/orm" ) const ( @@ -62,22 +62,31 @@ func init() { partner.InitOrderManager(FixedOrderManager) } -func addOrderOrWaybillStatus(status *model.OrderStatus, db orm.Ormer) (isDuplicated bool, err error) { +func addOrderOrWaybillStatus(status *model.OrderStatus, db *dao.DaoDB) (isDuplicated bool, err error) { if status.OrderType == model.OrderTypeOrder { globals.SugarLogger.Debugf("addOrderStatus order:%v", status) } else { globals.SugarLogger.Debugf("addOrderStatus waybill:%v", status) } - + dao.Begin(db) + defer func() { + if r := recover(); r != nil || err != nil { + globals.SugarLogger.Debug("rollback") + dao.Rollback(db) + if r != nil { + panic(r) + } + } + }() status.ID = 0 - created, _, err := db.ReadOrCreate(status, "VendorOrderID", "VendorID", "OrderType", "VendorStatus", "StatusTime") + created, _, err := db.Db.ReadOrCreate(status, "VendorOrderID", "VendorID", "OrderType", "VendorStatus", "StatusTime") if err == nil { if !created { globals.SugarLogger.Debugf("duplicated event:%v", status) isDuplicated = true status.DuplicatedCount++ utils.CallFuncLogError(func() error { - _, err = db.Update(status, "DuplicatedCount") + _, err = db.Db.Update(status, "DuplicatedCount") return err }, "addOrderOrWaybillStatus update DuplicatedCount, status:%v", status) } @@ -85,6 +94,8 @@ func addOrderOrWaybillStatus(status *model.OrderStatus, db orm.Ormer) (isDuplica if err != nil { // todo 这里居然会有主键重复错误,逻辑上是不应该的 globals.SugarLogger.Warnf("addOrderOrWaybillStatus status:%v, access db error:%v", status, err) + } else { + dao.Commit(db) } return isDuplicated, err } diff --git a/business/jxcallback/orderman/waybill.go b/business/jxcallback/orderman/waybill.go index 782c025c0..7ffb2aab7 100644 --- a/business/jxcallback/orderman/waybill.go +++ b/business/jxcallback/orderman/waybill.go @@ -6,6 +6,7 @@ import ( "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxcallback/scheduler" "git.rosy.net.cn/jx-callback/business/model" + "git.rosy.net.cn/jx-callback/business/model/dao" "git.rosy.net.cn/jx-callback/globals" "github.com/astaxie/beego/orm" ) @@ -31,7 +32,7 @@ func (w *OrderManager) LoadPendingWaybills() []*model.Waybill { return bills } -func (w *OrderManager) onWaybillNew(bill2 *model.Waybill, db orm.Ormer) (isDuplicated bool, err error) { +func (w *OrderManager) onWaybillNew(bill2 *model.Waybill, db *dao.DaoDB) (isDuplicated bool, err error) { globals.SugarLogger.Debugf("onWaybillNew bill:%v", bill2) isDuplicated, err = addOrderOrWaybillStatus(model.Waybill2Status(bill2), db) if err == nil && !isDuplicated { @@ -40,16 +41,16 @@ func (w *OrderManager) onWaybillNew(bill2 *model.Waybill, db orm.Ormer) (isDupli bill2.WaybillFinishedAt = utils.DefaultTimeValue billCopied := *bill2 bill := &billCopied - created, _, err2 := db.ReadOrCreate(bill, "VendorWaybillID", "WaybillVendorID") + created, _, err2 := db.Db.ReadOrCreate(bill, "VendorWaybillID", "WaybillVendorID") if err = err2; err == nil { if !created { bill.DuplicatedCount++ if bill2.VendorOrderID == bill2.VendorWaybillID { // 购物平台(比如京东)重新建的运单,单号始终是与订单相同的 bill2.ID = bill.ID bill2.DuplicatedCount = bill.DuplicatedCount - db.Update(bill2) //更新所有字段 + db.Db.Update(bill2) //更新所有字段 } else { - db.Update(bill, "DuplicatedCount") + db.Db.Update(bill, "DuplicatedCount") isDuplicated = true globals.SugarLogger.Infof("onWaybillNew duplicated bill:%v msg received", bill2) } @@ -65,7 +66,14 @@ func (w *OrderManager) onWaybillNew(bill2 *model.Waybill, db orm.Ormer) (isDupli func (w *OrderManager) OnWaybillStatusChanged(bill *model.Waybill) (err error) { var isDuplicated bool - db := orm.NewOrm() + db := dao.GetDB() + dao.Begin(db) + defer func() { + if r := recover(); r != nil { + dao.Rollback(db) + panic(r) + } + }() if bill.Status == model.WaybillStatusNew { isDuplicated, err = w.onWaybillNew(bill, db) } else { @@ -83,13 +91,18 @@ func (w *OrderManager) OnWaybillStatusChanged(bill *model.Waybill) (err error) { } isDuplicated, err = w.addWaybillStatus(bill, db, addParams) } - if err == nil && !isDuplicated { - scheduler.CurrentScheduler.OnWaybillStatusChanged(bill, false) + if err == nil { + dao.Commit(db) + if !isDuplicated { + scheduler.CurrentScheduler.OnWaybillStatusChanged(bill, false) + } + } else { + dao.Rollback(db) } return err } -func (w *OrderManager) addWaybillStatus(bill *model.Waybill, db orm.Ormer, addParams orm.Params) (isDuplicated bool, err error) { +func (w *OrderManager) addWaybillStatus(bill *model.Waybill, db *dao.DaoDB, addParams orm.Params) (isDuplicated bool, err error) { waybillStatus := model.Waybill2Status(bill) isDuplicated, err = addOrderOrWaybillStatus(waybillStatus, db) if err == nil && !isDuplicated && waybillStatus.Status > model.WaybillStatusUnknown { // todo 这里应该和addOrderStatus一样的改法,状态不能回绕 @@ -99,7 +112,7 @@ func (w *OrderManager) addWaybillStatus(bill *model.Waybill, db orm.Ormer, addPa "status_time": bill.StatusTime, }, addParams) utils.CallFuncLogError(func() error { - _, err = db.QueryTable("waybill").Filter("vendor_waybill_id", bill.VendorWaybillID).Filter("waybill_vendor_id", bill.WaybillVendorID).Filter("status__lte", bill.Status).Update(params) + _, err = db.Db.QueryTable("waybill").Filter("vendor_waybill_id", bill.VendorWaybillID).Filter("waybill_vendor_id", bill.WaybillVendorID).Filter("status__lte", bill.Status).Update(params) return err }, "addWaybillStatus update waybill status, bill:%v", bill) } diff --git a/business/jxstore/tempop/tempop.go b/business/jxstore/tempop/tempop.go index dd9a82e48..29626cbc4 100644 --- a/business/jxstore/tempop/tempop.go +++ b/business/jxstore/tempop/tempop.go @@ -638,8 +638,7 @@ func CreateOrderFromOriginal(ctx *jxcontext.Context, isAsync, isContinueWhenErro } } if err == nil { - rawDB := orm.NewOrm() - if _, err = orderman.FixedOrderManager.SaveOrder(order, false, rawDB); err != nil { + if _, err = orderman.FixedOrderManager.SaveOrder(order, false, dao.GetDB()); err != nil { globals.SugarLogger.Debugf("CreateOrderFromOriginal abnormal orderID:%s, error:%v", orderOriginal.VendorOrderID, err) } } else { diff --git a/business/partner/partner.go b/business/partner/partner.go index d1d970b75..4d2fb54df 100644 --- a/business/partner/partner.go +++ b/business/partner/partner.go @@ -9,7 +9,6 @@ import ( "git.rosy.net.cn/jx-callback/business/jxutils/tasksch" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/dao" - "github.com/astaxie/beego/orm" ) const ( @@ -99,7 +98,7 @@ var ( ) type IOrderManager interface { - SaveOrder(order *model.GoodsOrder, isAdjust bool, db orm.Ormer) (isDuplicated bool, err error) + SaveOrder(order *model.GoodsOrder, isAdjust bool, db *dao.DaoDB) (isDuplicated bool, err error) OnOrderNew(order *model.GoodsOrder, msgVendorStatus string) (err error) OnOrderAdjust(order *model.GoodsOrder, msgVendorStatus string) (err error)