From ccf9a4d6b9b9ec2db72ebd5f78fe5a529f6ae28e Mon Sep 17 00:00:00 2001 From: gazebo Date: Tue, 5 Mar 2019 22:35:20 +0800 Subject: [PATCH] - first edition of long pulling --- business/jxcallback/orderman/order.go | 3 + business/model/dao/dao_order.go | 25 +++++ business/msghub/msghub.go | 153 ++++++++++++++++++++++++++ controllers/cms.go | 41 +++++++ routers/commentsRouter_controllers.go | 16 +++ 5 files changed, 238 insertions(+) create mode 100644 business/model/dao/dao_order.go create mode 100644 business/msghub/msghub.go diff --git a/business/jxcallback/orderman/order.go b/business/jxcallback/orderman/order.go index 73fbdbafe..e7a8fd94a 100644 --- a/business/jxcallback/orderman/order.go +++ b/business/jxcallback/orderman/order.go @@ -11,6 +11,7 @@ import ( "git.rosy.net.cn/jx-callback/business/jxutils" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/dao" + "git.rosy.net.cn/jx-callback/business/msghub" "git.rosy.net.cn/jx-callback/globals" "github.com/astaxie/beego/orm" ) @@ -64,6 +65,7 @@ func (c *OrderManager) OnOrderNew(order *model.GoodsOrder, msgVendorStatus strin isDuplicated, err := addOrderOrWaybillStatus(status, db) if err == nil && !isDuplicated { if isDuplicated, err = c.SaveOrder(order, false, db); err == nil && !isDuplicated { + msghub.OnNewOrder(order) err = scheduler.CurrentScheduler.OnOrderNew(order, false) } } @@ -97,6 +99,7 @@ func (c *OrderManager) OnOrderAdjust(order *model.GoodsOrder, msgVendorStatus st return err } if isDuplicated, err = c.SaveOrder(order, true, db); err == nil && !isDuplicated { + msghub.OnNewOrder(order) // 因为订单调度器需要的是真实状态,所以用order的状态 err = scheduler.CurrentScheduler.OnOrderNew(order, false) err = scheduler.CurrentScheduler.OnOrderStatusChanged(model.Order2Status(order), false) diff --git a/business/model/dao/dao_order.go b/business/model/dao/dao_order.go new file mode 100644 index 000000000..24f855ecd --- /dev/null +++ b/business/model/dao/dao_order.go @@ -0,0 +1,25 @@ +package dao + +import ( + "time" + + "git.rosy.net.cn/jx-callback/business/model" +) + +func GetStoreOrderAfterTime(db *DaoDB, storeID int, orderTime time.Time, lastOrderSeqID int64) (orderList []*model.GoodsOrderExt, err error) { + sql := ` + SELECT t1.*, + t2.status waybill_status, t2.courier_name, t2.courier_mobile, + t2.actual_fee, t2.desired_fee, t2.waybill_created_at, t2.waybill_finished_at + FROM goods_order t1 + LEFT JOIN waybill t2 ON t1.vendor_waybill_id = t2.vendor_waybill_id AND t1.waybill_vendor_id = t2.waybill_vendor_id + WHERE IF(t1.jx_store_id <> 0, t1.jx_store_id, t1.store_id) = ? AND t1.order_created_at >= ? AND t1.id > ? + LIMIT 50; + ` + sqlParams := []interface{}{ + storeID, + orderTime, + lastOrderSeqID, + } + return orderList, GetRows(db, &orderList, sql, sqlParams...) +} diff --git a/business/msghub/msghub.go b/business/msghub/msghub.go new file mode 100644 index 000000000..0dc10ea5f --- /dev/null +++ b/business/msghub/msghub.go @@ -0,0 +1,153 @@ +package msghub + +import ( + "time" + + "git.rosy.net.cn/baseapi/utils" + "git.rosy.net.cn/jx-callback/business/jxutils" + "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" + "git.rosy.net.cn/jx-callback/business/model" + "git.rosy.net.cn/jx-callback/business/model/dao" + "git.rosy.net.cn/jx-callback/globals" +) + +const ( + ServerMsgRegister = "register" + ServerMsgUnregister = "unregister" + + ServerMsgPing = "ping" + ServerMsgNewOrder = "newOrder" +) + +const ( + maxGetOrderTimeDuration = 2 * time.Hour + pollingDuration = 5 * time.Minute +) + +var ( + channelMap map[int]map[chan<- *ServerMsg]int + msgChan chan *ServerMsg +) + +type MsgOp struct { + StoreID int + Chan2Listen chan<- *ServerMsg + Chan2Close chan int +} + +type ServerMsg struct { + Type string + StoreID int + MsgData interface{} +} + +func init() { + utils.CallFuncAsync(routinueFunc) +} + +func routinueFunc() { + msgChan = make(chan *ServerMsg, 100) + channelMap = make(map[int]map[chan<- *ServerMsg]int) + + for { + msg, ok := <-msgChan + if ok { + switch msg.Type { + case ServerMsgRegister: + registerMsg := msg.MsgData.(*MsgOp) + if channelMap[registerMsg.StoreID] == nil { + channelMap[registerMsg.StoreID] = make(map[chan<- *ServerMsg]int) + } + channelMap[registerMsg.StoreID][registerMsg.Chan2Listen] = 1 + case ServerMsgUnregister: + registerMsg := msg.MsgData.(*MsgOp) + delete(channelMap[registerMsg.StoreID], registerMsg.Chan2Listen) + close(registerMsg.Chan2Close) + case ServerMsgNewOrder: + globals.SugarLogger.Debugf("msghub routinueFunc, msg:%s", utils.Format4Output(msg, false)) + go func() { + for chan2Send := range channelMap[msg.StoreID] { + chan2Send <- msg + delete(channelMap[msg.StoreID], chan2Send) + } + }() + } + } + } +} + +func registerChan(storeID int, chan2Listen chan<- *ServerMsg) { + msgChan <- &ServerMsg{ + Type: ServerMsgRegister, + MsgData: &MsgOp{ + StoreID: storeID, + Chan2Listen: chan2Listen, + }, + } +} + +func unregisterChan(storeID int, chan2Listen chan<- *ServerMsg) { + chan2Close := make(chan int) + msgChan <- &ServerMsg{ + Type: ServerMsgRegister, + MsgData: &MsgOp{ + StoreID: storeID, + Chan2Listen: chan2Listen, + Chan2Close: chan2Close, + }, + } + <-chan2Close +} + +func getPendingOrderList(storeID int, lastOrderTime time.Time, lastOrderSeqID int64) (orderList []*model.GoodsOrderExt, err error) { + if !jxutils.IsTimeEmpty(lastOrderTime) { + if time.Now().Sub(lastOrderTime) > maxGetOrderTimeDuration { + // lastOrderTime = time.Now().Add(-maxGetOrderTimeDuration) + } + orderList, err = dao.GetStoreOrderAfterTime(dao.GetDB(), storeID, lastOrderTime, lastOrderSeqID) + } + return orderList, err +} + +func GetMsg(ctx *jxcontext.Context, storeID int, lastOrderTime time.Time, lastOrderSeqID int64, msgTypeList []string) (msg *ServerMsg, err error) { + orderList, err := getPendingOrderList(storeID, lastOrderTime, lastOrderSeqID) + if err == nil { + if len(orderList) == 0 { + chan2Listen := make(chan *ServerMsg, 1) + registerChan(storeID, chan2Listen) + timer := time.NewTimer(pollingDuration) + select { + case msg2, ok := <-chan2Listen: + timer.Stop() + if ok { + msg = msg2 + } + case <-timer.C: + unregisterChan(storeID, chan2Listen) + } + close(chan2Listen) + } else { + msg = &ServerMsg{ + Type: ServerMsgNewOrder, + StoreID: storeID, + MsgData: orderList, + } + } + } + return msg, err +} + +func OnNewOrder(order *model.GoodsOrder) { + globals.SugarLogger.Debugf("msghub OnNewOrder, order:%s", utils.Format4Output(order, false)) + go func() { + msgChan <- &ServerMsg{ + Type: ServerMsgNewOrder, + StoreID: jxutils.GetSaleStoreIDFromOrder(order), + MsgData: []*model.GoodsOrderExt{ + &model.GoodsOrderExt{ + GoodsOrder: *order, + }, + }, + } + }() +} diff --git a/controllers/cms.go b/controllers/cms.go index 68dcbb619..e727b7465 100644 --- a/controllers/cms.go +++ b/controllers/cms.go @@ -2,8 +2,11 @@ package controllers import ( "git.rosy.net.cn/baseapi/utils" + "git.rosy.net.cn/jx-callback/business/jxcallback/orderman" "git.rosy.net.cn/jx-callback/business/jxstore/cms" + "git.rosy.net.cn/jx-callback/business/jxutils" "git.rosy.net.cn/jx-callback/business/jxutils/configindb" + "git.rosy.net.cn/jx-callback/business/msghub" "git.rosy.net.cn/jx-callback/globals/api" "github.com/astaxie/beego" ) @@ -149,3 +152,41 @@ func (c *CmsController) GetProductInfoByBarCode() { return retVal, "", err }) } + +// @Title 得到服务器消息通知 +// @Description 得到服务器消息通知 +// @Param token header string true "认证token" +// @Param storeID query int true "京西门店ID" +// @Param lastOrderTime query string true "最后订单时间" +// @Param lastOrderSeqID query string true "最后订单流水ID" +// @Success 200 {object} controllers.CallResult +// @Failure 200 {object} controllers.CallResult +// @router /GetNewOrderMsg [get] +func (c *CmsController) GetNewOrderMsg() { + c.callGetNewOrderMsg(func(params *tCmsGetNewOrderMsgParams) (retVal interface{}, errCode string, err error) { + timeList, err := jxutils.BatchStr2Time(params.LastOrderTime) + if err == nil { + lastOrderSeqID := utils.Str2Int64WithDefault(params.LastOrderSeqID, 0) + retVal, err = msghub.GetMsg(params.Ctx, params.StoreID, timeList[0], lastOrderSeqID, nil) + } + return retVal, "", err + }) +} + +// @Title 发送新订单消息(测试用) +// @Description 发送新订单消息(测试用) +// @Param token header string true "认证token" +// @Param vendorOrderID formData string true "订单ID" +// @Param vendorID formData int true "订单厂商ID" +// @Success 200 {object} controllers.CallResult +// @Failure 200 {object} controllers.CallResult +// @router /FakeNewOrder [post] +func (c *CmsController) FakeNewOrder() { + c.callFakeNewOrder(func(params *tCmsFakeNewOrderParams) (retVal interface{}, errCode string, err error) { + order, err := orderman.FixedOrderManager.LoadOrder(params.VendorOrderID, params.VendorID) + if err == nil { + msghub.OnNewOrder(order) + } + return retVal, "", err + }) +} diff --git a/routers/commentsRouter_controllers.go b/routers/commentsRouter_controllers.go index bf6028cb2..6ebd692c2 100644 --- a/routers/commentsRouter_controllers.go +++ b/routers/commentsRouter_controllers.go @@ -159,6 +159,14 @@ func init() { MethodParams: param.Make(), Params: nil}) + beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:CmsController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:CmsController"], + beego.ControllerComments{ + Method: "FakeNewOrder", + Router: `/FakeNewOrder`, + AllowHTTPMethods: []string{"post"}, + MethodParams: param.Make(), + Params: nil}) + beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:CmsController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:CmsController"], beego.ControllerComments{ Method: "GetConfig", @@ -175,6 +183,14 @@ func init() { MethodParams: param.Make(), Params: nil}) + beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:CmsController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:CmsController"], + beego.ControllerComments{ + Method: "GetNewOrderMsg", + Router: `/GetNewOrderMsg`, + AllowHTTPMethods: []string{"get"}, + MethodParams: param.Make(), + Params: nil}) + beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:CmsController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:CmsController"], beego.ControllerComments{ Method: "GetPlaces",