This commit is contained in:
苏尹岚
2020-12-28 16:47:43 +08:00
parent 1d78471933
commit e456ee1be9
3 changed files with 98 additions and 18 deletions

View File

@@ -7,6 +7,7 @@ import (
"net/http"
"os"
"path"
"sync"
"time"
"git.rosy.net.cn/jx-callback/business/jxstore/event"
@@ -33,8 +34,15 @@ type EventController struct {
}
//连接的客户端,吧每个客户端都放进来
var clients = make(map[int]map[string]*websocket.Conn)
var ClientsHeart = make(map[string]*websocket.Conn)
type WSClient struct {
Clients map[int]map[string]*websocket.Conn
ClientsHeart map[string]*websocket.Conn
s *sync.RWMutex
}
var (
wsClient = &WSClient{}
)
//广播频道(通道)
var broadcast = make(chan *model.ImMessageRecord)
@@ -83,15 +91,17 @@ func (c *EventController) TestWebsocket() {
if len(messageGroups) == 0 {
return
}
wsClient.s.Lock()
clientUser[userID] = ws
ClientsHeart[userID] = ws
wsClient.ClientsHeart[userID] = ws
for _, v := range messageGroups {
if len(clients[v.GroupID]) > 0 {
clients[v.GroupID][userID] = ws
if len(wsClient.Clients[v.GroupID]) > 0 {
wsClient.Clients[v.GroupID][userID] = ws
} else {
clients[v.GroupID] = clientUser
wsClient.Clients[v.GroupID] = clientUser
}
}
wsClient.s.Unlock()
db := dao.GetDB()
if globals.IsProductEnv() {
_, _, err = jxcontext.New(nil, c.GetString("token"), c.Ctx.ResponseWriter, c.Ctx.Request)
@@ -104,7 +114,7 @@ func (c *EventController) TestWebsocket() {
}
globals.SugarLogger.Debugf("TestWebsocket connection...")
}
globals.SugarLogger.Debugf("userID :%v ,clients :%v", userID, utils.Format4Output(clients, false))
globals.SugarLogger.Debugf("userID :%v ,clients :%v", userID, utils.Format4Output(wsClient.Clients, false))
c.EnableRender = false //Beego不启用渲染
var s *model.ImMessageRecord
@@ -113,10 +123,14 @@ func (c *EventController) TestWebsocket() {
err := ws.ReadJSON(&s)
if err != nil {
globals.SugarLogger.Debugf("页面可能断开啦 ws.ReadJSON error: %v", err.Error())
for k, _ := range clients {
delete(clients[k], userID)
wsClient.s.RLock()
for k, _ := range wsClient.Clients {
delete(wsClient.Clients[k], userID)
}
delete(ClientsHeart, userID)
wsClient.s.RUnlock()
wsClient.s.Lock()
delete(wsClient.ClientsHeart, userID)
wsClient.s.Unlock()
// delete(clients, ws) //删除map中的客户端
break //结束循环
} else {
@@ -135,7 +149,7 @@ func (c *EventController) TestWebsocket() {
}
//如果这些人不在这个组的ws池子里就打上未读标记
for _, v := range userIDs {
if ClientsHeart[v] == nil {
if wsClient.ClientsHeart[v] == nil {
messageGroupReads, _ := dao.GetMessageGroupRead(db, v, s.GroupID)
for _, vv := range messageGroupReads {
vv.UnReadCount++
@@ -155,6 +169,12 @@ func (c *EventController) TestWebsocket() {
}
func init() {
clients := make(map[int]map[string]*websocket.Conn)
clientsHeart := make(map[string]*websocket.Conn)
wsClient.Clients = clients
wsClient.ClientsHeart = clientsHeart
wsClient.s = new(sync.RWMutex)
go handleMessages()
}
@@ -165,19 +185,19 @@ func handleMessages() {
msg := <-broadcast
if msg.GroupID == 0 {
// globals.SugarLogger.Debugf("heart %v", utils.Format4Output(msg, false))
if ClientsHeart[msg.UserID] != nil {
if err := ClientsHeart[msg.UserID].WriteJSON(&model.ImMessageRecord{
if wsClient.ClientsHeart[msg.UserID] != nil {
if err := wsClient.ClientsHeart[msg.UserID].WriteJSON(&model.ImMessageRecord{
Key: "pang",
}); err != nil {
globals.SugarLogger.Debugf("heart client.WriteJSON error: %v", err)
ClientsHeart[msg.UserID].Close() //关闭
delete(ClientsHeart, msg.UserID)
wsClient.ClientsHeart[msg.UserID].Close() //关闭
delete(wsClient.ClientsHeart, msg.UserID)
}
}
} else {
globals.SugarLogger.Debugf("clients len %v", len(clients))
globals.SugarLogger.Debugf("clients len %v", len(wsClient.Clients))
//循环map客户端
for userID, client := range clients[msg.GroupID] {
for userID, client := range wsClient.Clients[msg.GroupID] {
//把通道中的消息发送给客户端
user, err := dao.GetUser(dao.GetDB(), msg.UserID)
if err == nil {
@@ -191,7 +211,7 @@ func handleMessages() {
if err != nil {
globals.SugarLogger.Debugf("client.WriteJSON error: %v", err)
client.Close() //关闭
delete(clients[msg.GroupID], userID)
delete(wsClient.Clients[msg.GroupID], userID)
// delete(clients, client) //删除map中的客户端
}
}
@@ -418,3 +438,25 @@ func (c *EventController) UploadAudio() {
}, "", err
})
}
func GetOnlineUserCount() (count int) {
wsClient.s.RLock()
defer wsClient.s.RUnlock()
count = len(wsClient.ClientsHeart)
return count
}
// @Title 得到用户统计数据
// @Description 得到用户统计数据
// @Param token header string true "认证token"
// @Success 200 {object} controllers.CallResult
// @Failure 200 {object} controllers.CallResult
// @router /GetUserStatistics [get]
func (c *EventController) GetUserStatistics() {
c.callGetUserStatistics(func(params *tEventGetUserStatisticsParams) (retVal interface{}, errCode string, err error) {
retVal, err = event.GetUserStatistics(params.Ctx)
result := retVal.(*event.GetUserStatisticsResult)
result.OnlineUserCount = GetOnlineUserCount()
return result, "", err
})
}