From e456ee1be974d439ca2a0f03d999714e50e746b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E5=B0=B9=E5=B2=9A?= <770236076@qq.com> Date: Mon, 28 Dec 2020 16:47:43 +0800 Subject: [PATCH] a --- business/jxstore/event/event.go | 29 ++++++++++ controllers/event_controller.go | 78 ++++++++++++++++++++------- routers/commentsRouter_controllers.go | 9 ++++ 3 files changed, 98 insertions(+), 18 deletions(-) diff --git a/business/jxstore/event/event.go b/business/jxstore/event/event.go index d64e17656..f1386394f 100644 --- a/business/jxstore/event/event.go +++ b/business/jxstore/event/event.go @@ -486,3 +486,32 @@ func UpdateUserMessageGroupRead(ctx *jxcontext.Context, reads []*model.MessageGr } return err } + +type GetUserStatisticsResult struct { + RegisterUserCount int `json:"registerUserCount"` //注册数 + OnlineUserCount int `json:"onlineUserCount"` //在线用户数 + ConsumeUserCount int `json:"consumeUserCount"` //消费用户数 + MemberUserCount int `json:"memberUserCount"` //会员用户数 +} + +func GetUserStatistics(ctx *jxcontext.Context) (getUserStatisticsResult *GetUserStatisticsResult, err error) { + var ( + db = dao.GetDB() + ) + sql := ` + SELECT a.member_user_count, b.consume_user_count FROM + ( SELECT DISTINCT COUNT(a.user_id) member_user_count + FROM user a + JOIN user_member b ON a.user_id = b.user_id AND b.deleted_at = ? )a, + ( SELECT DISTINCT COUNT(a.user_id) consume_user_count + FROM user a + JOIN order b ON a.user_id = b.user_id AND b.status = ?)b + ` + sqlParams := []interface{}{ + utils.DefaultTimeValue, model.OrderStatusFinished, + } + err = dao.GetRow(db, &getUserStatisticsResult, sql, sqlParams) + paged, _ := dao.GetUsers2(db, "", "", 0, "", utils.ZeroTimeValue, utils.ZeroTimeValue, 0, nil, nil, 0, -1) + getUserStatisticsResult.RegisterUserCount = paged.TotalCount + return getUserStatisticsResult, err +} diff --git a/controllers/event_controller.go b/controllers/event_controller.go index af42aaa28..20ea4c520 100644 --- a/controllers/event_controller.go +++ b/controllers/event_controller.go @@ -7,6 +7,7 @@ import ( "net/http" "os" "path" + "sync" "time" "git.rosy.net.cn/jx-callback/business/jxstore/event" @@ -33,8 +34,15 @@ type EventController struct { } //连接的客户端,吧每个客户端都放进来 -var clients = make(map[int]map[string]*websocket.Conn) -var ClientsHeart = make(map[string]*websocket.Conn) +type WSClient struct { + Clients map[int]map[string]*websocket.Conn + ClientsHeart map[string]*websocket.Conn + s *sync.RWMutex +} + +var ( + wsClient = &WSClient{} +) //广播频道(通道) var broadcast = make(chan *model.ImMessageRecord) @@ -83,15 +91,17 @@ func (c *EventController) TestWebsocket() { if len(messageGroups) == 0 { return } + wsClient.s.Lock() clientUser[userID] = ws - ClientsHeart[userID] = ws + wsClient.ClientsHeart[userID] = ws for _, v := range messageGroups { - if len(clients[v.GroupID]) > 0 { - clients[v.GroupID][userID] = ws + if len(wsClient.Clients[v.GroupID]) > 0 { + wsClient.Clients[v.GroupID][userID] = ws } else { - clients[v.GroupID] = clientUser + wsClient.Clients[v.GroupID] = clientUser } } + wsClient.s.Unlock() db := dao.GetDB() if globals.IsProductEnv() { _, _, err = jxcontext.New(nil, c.GetString("token"), c.Ctx.ResponseWriter, c.Ctx.Request) @@ -104,7 +114,7 @@ func (c *EventController) TestWebsocket() { } globals.SugarLogger.Debugf("TestWebsocket connection...") } - globals.SugarLogger.Debugf("userID :%v ,clients :%v", userID, utils.Format4Output(clients, false)) + globals.SugarLogger.Debugf("userID :%v ,clients :%v", userID, utils.Format4Output(wsClient.Clients, false)) c.EnableRender = false //Beego不启用渲染 var s *model.ImMessageRecord @@ -113,10 +123,14 @@ func (c *EventController) TestWebsocket() { err := ws.ReadJSON(&s) if err != nil { globals.SugarLogger.Debugf("页面可能断开啦 ws.ReadJSON error: %v", err.Error()) - for k, _ := range clients { - delete(clients[k], userID) + wsClient.s.RLock() + for k, _ := range wsClient.Clients { + delete(wsClient.Clients[k], userID) } - delete(ClientsHeart, userID) + wsClient.s.RUnlock() + wsClient.s.Lock() + delete(wsClient.ClientsHeart, userID) + wsClient.s.Unlock() // delete(clients, ws) //删除map中的客户端 break //结束循环 } else { @@ -135,7 +149,7 @@ func (c *EventController) TestWebsocket() { } //如果这些人不在这个组的ws池子里就打上未读标记 for _, v := range userIDs { - if ClientsHeart[v] == nil { + if wsClient.ClientsHeart[v] == nil { messageGroupReads, _ := dao.GetMessageGroupRead(db, v, s.GroupID) for _, vv := range messageGroupReads { vv.UnReadCount++ @@ -155,6 +169,12 @@ func (c *EventController) TestWebsocket() { } func init() { + clients := make(map[int]map[string]*websocket.Conn) + clientsHeart := make(map[string]*websocket.Conn) + wsClient.Clients = clients + wsClient.ClientsHeart = clientsHeart + wsClient.s = new(sync.RWMutex) + go handleMessages() } @@ -165,19 +185,19 @@ func handleMessages() { msg := <-broadcast if msg.GroupID == 0 { // globals.SugarLogger.Debugf("heart %v", utils.Format4Output(msg, false)) - if ClientsHeart[msg.UserID] != nil { - if err := ClientsHeart[msg.UserID].WriteJSON(&model.ImMessageRecord{ + if wsClient.ClientsHeart[msg.UserID] != nil { + if err := wsClient.ClientsHeart[msg.UserID].WriteJSON(&model.ImMessageRecord{ Key: "pang", }); err != nil { globals.SugarLogger.Debugf("heart client.WriteJSON error: %v", err) - ClientsHeart[msg.UserID].Close() //关闭 - delete(ClientsHeart, msg.UserID) + wsClient.ClientsHeart[msg.UserID].Close() //关闭 + delete(wsClient.ClientsHeart, msg.UserID) } } } else { - globals.SugarLogger.Debugf("clients len %v", len(clients)) + globals.SugarLogger.Debugf("clients len %v", len(wsClient.Clients)) //循环map客户端 - for userID, client := range clients[msg.GroupID] { + for userID, client := range wsClient.Clients[msg.GroupID] { //把通道中的消息发送给客户端 user, err := dao.GetUser(dao.GetDB(), msg.UserID) if err == nil { @@ -191,7 +211,7 @@ func handleMessages() { if err != nil { globals.SugarLogger.Debugf("client.WriteJSON error: %v", err) client.Close() //关闭 - delete(clients[msg.GroupID], userID) + delete(wsClient.Clients[msg.GroupID], userID) // delete(clients, client) //删除map中的客户端 } } @@ -418,3 +438,25 @@ func (c *EventController) UploadAudio() { }, "", err }) } + +func GetOnlineUserCount() (count int) { + wsClient.s.RLock() + defer wsClient.s.RUnlock() + count = len(wsClient.ClientsHeart) + return count +} + +// @Title 得到用户统计数据 +// @Description 得到用户统计数据 +// @Param token header string true "认证token" +// @Success 200 {object} controllers.CallResult +// @Failure 200 {object} controllers.CallResult +// @router /GetUserStatistics [get] +func (c *EventController) GetUserStatistics() { + c.callGetUserStatistics(func(params *tEventGetUserStatisticsParams) (retVal interface{}, errCode string, err error) { + retVal, err = event.GetUserStatistics(params.Ctx) + result := retVal.(*event.GetUserStatisticsResult) + result.OnlineUserCount = GetOnlineUserCount() + return result, "", err + }) +} diff --git a/routers/commentsRouter_controllers.go b/routers/commentsRouter_controllers.go index 4913ccd73..7104122f8 100644 --- a/routers/commentsRouter_controllers.go +++ b/routers/commentsRouter_controllers.go @@ -268,6 +268,15 @@ func init() { Filters: nil, Params: nil}) + beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:EventController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:EventController"], + beego.ControllerComments{ + Method: "GetUserStatistics", + Router: `/GetUserStatistics`, + AllowHTTPMethods: []string{"get"}, + MethodParams: param.Make(), + Filters: nil, + Params: nil}) + beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:EventController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:EventController"], beego.ControllerComments{ Method: "SendSysMessage",