This commit is contained in:
苏尹岚
2020-12-30 14:07:34 +08:00
parent d1d592c633
commit cce7839324
3 changed files with 157 additions and 156 deletions

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"regexp"
"strings"
"sync"
"time"
"git.rosy.net.cn/baseapi/utils"
@@ -12,6 +13,8 @@ import (
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
"git.rosy.net.cn/jx-callback/globals"
"github.com/gorilla/websocket"
)
var (
@@ -21,8 +24,151 @@ var (
"UpdateUser": "UpdateUser",
}
regexpToken = regexp.MustCompile(`,"token":".*"`)
wsClient = &WSClient{}
//广播频道(通道)
broadcast = make(chan *model.ImMessageRecord)
)
//连接的客户端,吧每个客户端都放进来
type WSClient struct {
Clients map[int]map[string]*websocket.Conn
ClientsHeart map[string]*websocket.Conn
s *sync.RWMutex
}
func init() {
clients := make(map[int]map[string]*websocket.Conn)
clientsHeart := make(map[string]*websocket.Conn)
wsClient.Clients = clients
wsClient.ClientsHeart = clientsHeart
wsClient.s = new(sync.RWMutex)
go handleMessages()
}
//广播推送消息
func handleMessages() {
for {
//读取通道中的消息
msg := <-broadcast
if msg.GroupID == 0 {
// globals.SugarLogger.Debugf("heart %v", utils.Format4Output(msg, false))
if wsClient.ClientsHeart[msg.UserID] != nil {
if err := wsClient.ClientsHeart[msg.UserID].WriteJSON(&model.ImMessageRecord{
Key: "pang",
}); err != nil {
globals.SugarLogger.Debugf("heart client.WriteJSON error: %v", err)
wsClient.ClientsHeart[msg.UserID].Close() //关闭
delete(wsClient.ClientsHeart, msg.UserID)
}
}
} else {
globals.SugarLogger.Debugf("clients len %v", len(wsClient.Clients))
//循环map客户端
for userID, client := range wsClient.Clients[msg.GroupID] {
//把通道中的消息发送给客户端
user, err := dao.GetUser(dao.GetDB(), msg.UserID)
if err == nil {
msg.UserInfo = user
}
globals.SugarLogger.Debugf("msg %v", utils.Format4Output(msg, false))
if msg.CreatedAt == utils.ZeroTimeValue {
msg.CreatedAt = time.Now()
}
err = client.WriteJSON(msg)
if err != nil {
globals.SugarLogger.Debugf("client.WriteJSON error: %v", err)
client.Close() //关闭
delete(wsClient.Clients[msg.GroupID], userID)
// delete(clients, client) //删除map中的客户端
}
}
}
}
}
func ImMessage(userID string, ws *websocket.Conn) (err error) {
var (
clientUser = make(map[string]*websocket.Conn)
db = dao.GetDB()
)
//将当前客户端放入map中
messageGroups, _ := dao.GetUserMessageGroups(dao.GetDB(), userID)
if len(messageGroups) == 0 {
return
}
wsClient.s.Lock()
clientUser[userID] = ws
wsClient.ClientsHeart[userID] = ws
for _, v := range messageGroups {
if len(wsClient.Clients[v.GroupID]) > 0 {
wsClient.Clients[v.GroupID][userID] = ws
} else {
wsClient.Clients[v.GroupID] = clientUser
}
}
wsClient.s.Unlock()
globals.SugarLogger.Debugf("userID :%v ,clients :%v", userID, utils.Format4Output(wsClient.Clients, false))
var s *model.ImMessageRecord
for {
//接收客户端的消息
err := ws.ReadJSON(&s)
if err != nil {
globals.SugarLogger.Debugf("页面可能断开啦 ws.ReadJSON error: %v", err.Error())
wsClient.s.RLock()
for k, _ := range wsClient.Clients {
delete(wsClient.Clients[k], userID)
}
wsClient.s.RUnlock()
wsClient.s.Lock()
delete(wsClient.ClientsHeart, userID)
wsClient.s.Unlock()
// delete(clients, ws) //删除map中的客户端
break //结束循环
} else {
//接受消息 业务逻辑
broadcast <- s
if s.GroupID != 0 {
//发聊天消息时这个组所有的成员包括创建者都在userIDs里
userIDs := []string{}
if results, err := dao.GetMessageGroups(db, "", s.GroupID, 0, true, ""); err == nil {
for _, v := range results {
userIDs = append(userIDs, v.UserID)
for _, vv := range v.MessageGroupMembers {
userIDs = append(userIDs, vv.UserID)
}
}
}
//如果这些人不在这个组的ws池子里就打上未读标记
for _, v := range userIDs {
if wsClient.ClientsHeart[v] == nil {
messageGroupReads, _ := dao.GetMessageGroupRead(db, v, s.GroupID)
for _, vv := range messageGroupReads {
vv.UnReadCount++
dao.UpdateEntity(db, vv, "UnReadCount")
}
}
}
}
utils.CallFuncAsync(func() {
if s.GroupID != 0 {
dao.WrapAddIDCULDEntity(s, "")
dao.CreateEntity(db, s)
}
})
}
}
return err
}
func GetOnlineUserCount() (count int) {
wsClient.s.RLock()
defer wsClient.s.RUnlock()
count = len(wsClient.ClientsHeart)
return count
}
func AddOperateEvent(ctx *jxcontext.Context, accessUUID, jsonData string, errCode, errMsg string, useTime int, apiFunctionSpec string) (err error) {
var (
apiFunction string
@@ -584,5 +730,6 @@ func GetUserStatistics(ctx *jxcontext.Context) (getUserStatisticsResult *GetUser
err = dao.GetRow(db, &getUserStatisticsResult, sql, sqlParams)
paged, _ := dao.GetUsers2(db, "", "", 0, "", utils.ZeroTimeValue, utils.ZeroTimeValue, 0, nil, nil, 0, -1)
getUserStatisticsResult.RegisterUserCount = paged.TotalCount
getUserStatisticsResult.OnlineUserCount = GetOnlineUserCount()
return getUserStatisticsResult, err
}

View File

@@ -302,6 +302,9 @@ func GetUserMessageGroups(db *DaoDB, userID string) (messageGroup []*model.Messa
utils.DefaultTimeValue, userID,
}
err = GetRows(db, &messageGroup, sql, sqlParams)
messageGroup = append(messageGroup, &model.MessageGroup{
GroupID: model.SysGroupID,
})
return messageGroup, err
}