From cce7839324701cad678b2425fb91d1d1a0ebc65a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E5=B0=B9=E5=B2=9A?= <770236076@qq.com> Date: Wed, 30 Dec 2020 14:07:34 +0800 Subject: [PATCH] a --- business/jxstore/event/event.go | 147 ++++++++++++++++++++++++++++ business/model/dao/event.go | 3 + controllers/event_controller.go | 163 ++------------------------------ 3 files changed, 157 insertions(+), 156 deletions(-) diff --git a/business/jxstore/event/event.go b/business/jxstore/event/event.go index 712524668..708adcd00 100644 --- a/business/jxstore/event/event.go +++ b/business/jxstore/event/event.go @@ -4,6 +4,7 @@ import ( "fmt" "regexp" "strings" + "sync" "time" "git.rosy.net.cn/baseapi/utils" @@ -12,6 +13,8 @@ import ( "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/dao" + "git.rosy.net.cn/jx-callback/globals" + "github.com/gorilla/websocket" ) var ( @@ -21,8 +24,151 @@ var ( "UpdateUser": "UpdateUser", } regexpToken = regexp.MustCompile(`,"token":".*"`) + wsClient = &WSClient{} + //广播频道(通道) + broadcast = make(chan *model.ImMessageRecord) ) +//连接的客户端,吧每个客户端都放进来 +type WSClient struct { + Clients map[int]map[string]*websocket.Conn + ClientsHeart map[string]*websocket.Conn + s *sync.RWMutex +} + +func init() { + clients := make(map[int]map[string]*websocket.Conn) + clientsHeart := make(map[string]*websocket.Conn) + wsClient.Clients = clients + wsClient.ClientsHeart = clientsHeart + wsClient.s = new(sync.RWMutex) + + go handleMessages() +} + +//广播推送消息 +func handleMessages() { + for { + //读取通道中的消息 + msg := <-broadcast + if msg.GroupID == 0 { + // globals.SugarLogger.Debugf("heart %v", utils.Format4Output(msg, false)) + if wsClient.ClientsHeart[msg.UserID] != nil { + if err := wsClient.ClientsHeart[msg.UserID].WriteJSON(&model.ImMessageRecord{ + Key: "pang", + }); err != nil { + globals.SugarLogger.Debugf("heart client.WriteJSON error: %v", err) + wsClient.ClientsHeart[msg.UserID].Close() //关闭 + delete(wsClient.ClientsHeart, msg.UserID) + } + } + } else { + globals.SugarLogger.Debugf("clients len %v", len(wsClient.Clients)) + //循环map客户端 + for userID, client := range wsClient.Clients[msg.GroupID] { + //把通道中的消息发送给客户端 + user, err := dao.GetUser(dao.GetDB(), msg.UserID) + if err == nil { + msg.UserInfo = user + } + globals.SugarLogger.Debugf("msg %v", utils.Format4Output(msg, false)) + if msg.CreatedAt == utils.ZeroTimeValue { + msg.CreatedAt = time.Now() + } + err = client.WriteJSON(msg) + if err != nil { + globals.SugarLogger.Debugf("client.WriteJSON error: %v", err) + client.Close() //关闭 + delete(wsClient.Clients[msg.GroupID], userID) + // delete(clients, client) //删除map中的客户端 + } + } + } + } +} + +func ImMessage(userID string, ws *websocket.Conn) (err error) { + var ( + clientUser = make(map[string]*websocket.Conn) + db = dao.GetDB() + ) + //将当前客户端放入map中 + messageGroups, _ := dao.GetUserMessageGroups(dao.GetDB(), userID) + if len(messageGroups) == 0 { + return + } + wsClient.s.Lock() + clientUser[userID] = ws + wsClient.ClientsHeart[userID] = ws + for _, v := range messageGroups { + if len(wsClient.Clients[v.GroupID]) > 0 { + wsClient.Clients[v.GroupID][userID] = ws + } else { + wsClient.Clients[v.GroupID] = clientUser + } + } + wsClient.s.Unlock() + globals.SugarLogger.Debugf("userID :%v ,clients :%v", userID, utils.Format4Output(wsClient.Clients, false)) + + var s *model.ImMessageRecord + for { + //接收客户端的消息 + err := ws.ReadJSON(&s) + if err != nil { + globals.SugarLogger.Debugf("页面可能断开啦 ws.ReadJSON error: %v", err.Error()) + wsClient.s.RLock() + for k, _ := range wsClient.Clients { + delete(wsClient.Clients[k], userID) + } + wsClient.s.RUnlock() + wsClient.s.Lock() + delete(wsClient.ClientsHeart, userID) + wsClient.s.Unlock() + // delete(clients, ws) //删除map中的客户端 + break //结束循环 + } else { + //接受消息 业务逻辑 + broadcast <- s + if s.GroupID != 0 { + //发聊天消息时,这个组所有的成员包括创建者都在userIDs里 + userIDs := []string{} + if results, err := dao.GetMessageGroups(db, "", s.GroupID, 0, true, ""); err == nil { + for _, v := range results { + userIDs = append(userIDs, v.UserID) + for _, vv := range v.MessageGroupMembers { + userIDs = append(userIDs, vv.UserID) + } + } + } + //如果这些人不在这个组的ws池子里就打上未读标记 + for _, v := range userIDs { + if wsClient.ClientsHeart[v] == nil { + messageGroupReads, _ := dao.GetMessageGroupRead(db, v, s.GroupID) + for _, vv := range messageGroupReads { + vv.UnReadCount++ + dao.UpdateEntity(db, vv, "UnReadCount") + } + } + } + } + utils.CallFuncAsync(func() { + if s.GroupID != 0 { + dao.WrapAddIDCULDEntity(s, "") + dao.CreateEntity(db, s) + } + }) + } + } + return err +} + +func GetOnlineUserCount() (count int) { + wsClient.s.RLock() + defer wsClient.s.RUnlock() + count = len(wsClient.ClientsHeart) + return count +} + func AddOperateEvent(ctx *jxcontext.Context, accessUUID, jsonData string, errCode, errMsg string, useTime int, apiFunctionSpec string) (err error) { var ( apiFunction string @@ -584,5 +730,6 @@ func GetUserStatistics(ctx *jxcontext.Context) (getUserStatisticsResult *GetUser err = dao.GetRow(db, &getUserStatisticsResult, sql, sqlParams) paged, _ := dao.GetUsers2(db, "", "", 0, "", utils.ZeroTimeValue, utils.ZeroTimeValue, 0, nil, nil, 0, -1) getUserStatisticsResult.RegisterUserCount = paged.TotalCount + getUserStatisticsResult.OnlineUserCount = GetOnlineUserCount() return getUserStatisticsResult, err } diff --git a/business/model/dao/event.go b/business/model/dao/event.go index 8ef087161..1d4c9fd88 100644 --- a/business/model/dao/event.go +++ b/business/model/dao/event.go @@ -302,6 +302,9 @@ func GetUserMessageGroups(db *DaoDB, userID string) (messageGroup []*model.Messa utils.DefaultTimeValue, userID, } err = GetRows(db, &messageGroup, sql, sqlParams) + messageGroup = append(messageGroup, &model.MessageGroup{ + GroupID: model.SysGroupID, + }) return messageGroup, err } diff --git a/controllers/event_controller.go b/controllers/event_controller.go index 0d1e95991..791ac3725 100644 --- a/controllers/event_controller.go +++ b/controllers/event_controller.go @@ -7,15 +7,13 @@ import ( "net/http" "os" "path" - "sync" "time" "git.rosy.net.cn/jx-callback/business/jxstore/event" + "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" + "git.rosy.net.cn/jx-callback/globals" "git.rosy.net.cn/jx-callback/business/jxutils" - "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" - - "git.rosy.net.cn/jx-callback/globals" "git.rosy.net.cn/jx-callback/business/model/dao" @@ -33,32 +31,13 @@ type EventController struct { beego.Controller } -//连接的客户端,吧每个客户端都放进来 -type WSClient struct { - Clients map[int]map[string]*websocket.Conn - ClientsHeart map[string]*websocket.Conn - s *sync.RWMutex -} - -var ( - wsClient = &WSClient{} -) - -//广播频道(通道) -var broadcast = make(chan *model.ImMessageRecord) - -// 配置升级程序(升级为websocket) -var upgrader = websocket.Upgrader{} - -// 定义我们的消息对象 -type Message struct { - Data interface{} `json:"data"` -} - const ( audioPath = "/jxdata/cthrgw/dist/audio/" ) +// 配置升级程序(升级为websocket) +var upgrader = websocket.Upgrader{} + // @Title 测试websocket // @Description 测试websocket // @Success 200 {object} controllers.CallResult @@ -80,29 +59,11 @@ func (c *EventController) TestWebsocket() { } defer ws.Close() var ( - userID = c.GetString("userID") - clientUser = make(map[string]*websocket.Conn) + userID = c.GetString("userID") ) if userID == "" { return } - //将当前客户端放入map中 - messageGroups, _ := dao.GetUserMessageGroups(dao.GetDB(), userID) - if len(messageGroups) == 0 { - return - } - wsClient.s.Lock() - clientUser[userID] = ws - wsClient.ClientsHeart[userID] = ws - for _, v := range messageGroups { - if len(wsClient.Clients[v.GroupID]) > 0 { - wsClient.Clients[v.GroupID][userID] = ws - } else { - wsClient.Clients[v.GroupID] = clientUser - } - } - wsClient.s.Unlock() - db := dao.GetDB() if globals.IsProductEnv() { _, _, err = jxcontext.New(nil, c.GetString("token"), c.Ctx.ResponseWriter, c.Ctx.Request) if err != nil { @@ -114,109 +75,8 @@ func (c *EventController) TestWebsocket() { } globals.SugarLogger.Debugf("TestWebsocket connection...") } - globals.SugarLogger.Debugf("userID :%v ,clients :%v", userID, utils.Format4Output(wsClient.Clients, false)) + event.ImMessage(userID, ws) c.EnableRender = false //Beego不启用渲染 - - var s *model.ImMessageRecord - for { - //接收客户端的消息 - err := ws.ReadJSON(&s) - if err != nil { - globals.SugarLogger.Debugf("页面可能断开啦 ws.ReadJSON error: %v", err.Error()) - wsClient.s.RLock() - for k, _ := range wsClient.Clients { - delete(wsClient.Clients[k], userID) - } - wsClient.s.RUnlock() - wsClient.s.Lock() - delete(wsClient.ClientsHeart, userID) - wsClient.s.Unlock() - // delete(clients, ws) //删除map中的客户端 - break //结束循环 - } else { - //接受消息 业务逻辑 - broadcast <- s - if s.GroupID != 0 { - //发聊天消息时,这个组所有的成员包括创建者都在userIDs里 - userIDs := []string{} - if results, err := dao.GetMessageGroups(db, "", s.GroupID, 0, true, ""); err == nil { - for _, v := range results { - userIDs = append(userIDs, v.UserID) - for _, vv := range v.MessageGroupMembers { - userIDs = append(userIDs, vv.UserID) - } - } - } - //如果这些人不在这个组的ws池子里就打上未读标记 - for _, v := range userIDs { - if wsClient.ClientsHeart[v] == nil { - messageGroupReads, _ := dao.GetMessageGroupRead(db, v, s.GroupID) - for _, vv := range messageGroupReads { - vv.UnReadCount++ - dao.UpdateEntity(db, vv, "UnReadCount") - } - } - } - } - utils.CallFuncAsync(func() { - if s.GroupID != 0 { - dao.WrapAddIDCULDEntity(s, "") - dao.CreateEntity(db, s) - } - }) - } - } -} - -func init() { - clients := make(map[int]map[string]*websocket.Conn) - clientsHeart := make(map[string]*websocket.Conn) - wsClient.Clients = clients - wsClient.ClientsHeart = clientsHeart - wsClient.s = new(sync.RWMutex) - - go handleMessages() -} - -//广播推送消息 -func handleMessages() { - for { - //读取通道中的消息 - msg := <-broadcast - if msg.GroupID == 0 { - // globals.SugarLogger.Debugf("heart %v", utils.Format4Output(msg, false)) - if wsClient.ClientsHeart[msg.UserID] != nil { - if err := wsClient.ClientsHeart[msg.UserID].WriteJSON(&model.ImMessageRecord{ - Key: "pang", - }); err != nil { - globals.SugarLogger.Debugf("heart client.WriteJSON error: %v", err) - wsClient.ClientsHeart[msg.UserID].Close() //关闭 - delete(wsClient.ClientsHeart, msg.UserID) - } - } - } else { - globals.SugarLogger.Debugf("clients len %v", len(wsClient.Clients)) - //循环map客户端 - for userID, client := range wsClient.Clients[msg.GroupID] { - //把通道中的消息发送给客户端 - user, err := dao.GetUser(dao.GetDB(), msg.UserID) - if err == nil { - msg.UserInfo = user - } - globals.SugarLogger.Debugf("msg %v", utils.Format4Output(msg, false)) - if msg.CreatedAt == utils.ZeroTimeValue { - msg.CreatedAt = time.Now() - } - err = client.WriteJSON(msg) - if err != nil { - globals.SugarLogger.Debugf("client.WriteJSON error: %v", err) - client.Close() //关闭 - delete(wsClient.Clients[msg.GroupID], userID) - // delete(clients, client) //删除map中的客户端 - } - } - } - } } // @Title 查询聊天记录 @@ -455,13 +315,6 @@ func (c *EventController) UploadAudio() { }) } -func GetOnlineUserCount() (count int) { - wsClient.s.RLock() - defer wsClient.s.RUnlock() - count = len(wsClient.ClientsHeart) - return count -} - // @Title 得到用户统计数据 // @Description 得到用户统计数据 // @Param token header string true "认证token" @@ -471,8 +324,6 @@ func GetOnlineUserCount() (count int) { func (c *EventController) GetUserStatistics() { c.callGetUserStatistics(func(params *tEventGetUserStatisticsParams) (retVal interface{}, errCode string, err error) { retVal, err = event.GetUserStatistics(params.Ctx) - result := retVal.(*event.GetUserStatisticsResult) - result.OnlineUserCount = GetOnlineUserCount() return result, "", err }) }