This commit is contained in:
richboo111
2023-04-25 16:52:01 +08:00
parent 2e8b699b4f
commit a3a663c729
9 changed files with 1032 additions and 1128 deletions

View File

@@ -1,263 +1,264 @@
package im
import (
"errors"
"fmt"
"net/http"
"time"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/globals"
"github.com/gorilla/websocket"
)
func Init() {
//初始化
ToClientChan = make(chan clientInfo, 1000)
//写入全局变量
//Default()
Setup()
//建立长链接
//StartWebSocket(res, req)
Send([]byte(HeartCheckMsg))
//启动定时器
PingTimer()
go WriteMessage()
go Manager.Start()
fmt.Printf("服务器启动成功,端口号:%s\n", CommonSetting.HttpPort)
}
func Run(w http.ResponseWriter, r *http.Request) {
conn, err := (&websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
// 允许所有CORS跨域请求
CheckOrigin: func(r *http.Request) bool {
return true
},
}).Upgrade(w, r, nil)
if err != nil {
globals.SugarLogger.Debugf("upgrade error: %v", err)
http.NotFound(w, r)
return
}
//设置读取消息大小上线
conn.SetReadLimit(maxMessageSize)
clientID := r.FormValue("clientId")
clientSocket := NewClient(clientID, conn)
//读取客户端消息
clientSocket.Read()
if err = ConnRender(conn, renderData{ClientId: clientID}); err != nil {
_ = conn.Close()
return
}
// 用户连接事件
Manager.Connect <- clientSocket
}
func StartWebSocket(conn *websocket.Conn, clientID string, err error) {
//设置读取消息大小上线
conn.SetReadLimit(maxMessageSize)
clientSocket := NewClient(clientID, conn)
//读取客户端消息
clientSocket.Read()
if err = ConnRender(conn, renderData{ClientId: clientID}); err != nil {
_ = conn.Close()
return
}
// 用户连接事件
Manager.Connect <- clientSocket
}
// PingTimer 定时器发送心跳
func PingTimer() {
go func() {
ticker := time.NewTicker(heartbeatInterval)
defer ticker.Stop()
//测试用
i := 0
for {
i++
<-ticker.C
for clientId, conn := range Manager.AllClient() {
if err := conn.Socket.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
Manager.DisConnect <- conn
globals.SugarLogger.Debugf("发送心跳失败: %s 总连接数:%d", clientId, Manager.Count())
}
if err := ConnRender(conn.Socket, renderData{ClientId: clientId}); err != nil {
return
}
globals.SugarLogger.Debugf("clientId=%s,i=%d", clientId, i)
}
}
}()
}
// WriteMessage 监听并发送给客户端信息
func WriteMessage() {
i := 0
for {
clientInfo := <-ToClientChan
//广播发送通知所有客户端
i++
fmt.Printf("WriteMessage clientInfo=%s i=%d", utils.Format4Output(clientInfo, false), i)
if Manager.AllClient() != nil {
for _, conn := range Manager.AllClient() {
if err := Render(conn.Socket, clientInfo.MessageId, clientInfo.Code, clientInfo.Msg, clientInfo.Data); err != nil {
Manager.DisConnect <- conn
}
}
} else {
globals.SugarLogger.Debugf("无客户端连接,请检查")
return
}
//if conn, err := Manager.GetByClientId(clientInfo.ClientId); err == nil && conn != nil {
// if err := Render(conn.Socket, clientInfo.MessageId, clientInfo.Code, clientInfo.Msg, clientInfo.Data); err != nil {
// Manager.DisConnect <- conn
// }
//}
}
}
// Start 管道处理程序
func (manager *ClientManager) Start() {
for {
select {
case client := <-manager.Connect:
// 建立连接事件
manager.EventConnect(client)
case conn := <-manager.DisConnect:
// 断开连接事件
manager.EventDisconnect(conn)
}
}
}
//从客户端读取数据
func (c *Client) Read() {
go func() {
for {
messageType, msg, err := c.Socket.ReadMessage()
if err != nil {
if messageType == -1 && websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
Manager.DisConnect <- c
return
} else if messageType != websocket.PingMessage {
return
}
} else {
SendToVendor(msg)
}
}
}()
}
// 以下为连接事件操作*******************************************
// EventConnect 建立连接事件
func (manager *ClientManager) EventConnect(client *Client) {
manager.AddClient(client)
}
// EventDisconnect 断开连接事件
func (manager *ClientManager) EventDisconnect(client *Client) {
//关闭连接
_ = client.Socket.Close()
manager.DelClient(client)
//标记销毁
client.IsDeleted = true
client = nil
}
//以下为客户端Client操作*******************************************
// NewClient 初始化Client
func NewClient(clientId string, socket *websocket.Conn) *Client {
return &Client{
ClientId: clientId,
Socket: socket,
ConnectTime: uint64(time.Now().Unix()),
IsDeleted: false,
}
}
// AddClient 添加客户端
func (manager *ClientManager) AddClient(client *Client) {
manager.ClientIdMapLock.Lock()
defer manager.ClientIdMapLock.Unlock()
manager.ClientIdMap[client.ClientId] = client
}
// DelClient 删除客户端
func (manager *ClientManager) DelClient(client *Client) {
manager.delClientIdMap(client.ClientId)
}
// 删除clientIdMap
func (manager *ClientManager) delClientIdMap(clientId string) {
manager.ClientIdMapLock.Lock()
defer manager.ClientIdMapLock.Unlock()
delete(manager.ClientIdMap, clientId)
}
// GetByClientId 通过clientId获取client
func (manager *ClientManager) GetByClientId(clientId string) (*Client, error) {
manager.ClientIdMapLock.RLock()
defer manager.ClientIdMapLock.RUnlock()
if client, ok := manager.ClientIdMap[clientId]; !ok {
return nil, errors.New("客户端不存在")
} else {
return client, nil
}
}
// AllClient 获取所有的客户端
func (manager *ClientManager) AllClient() map[string]*Client {
manager.ClientIdMapLock.RLock()
defer manager.ClientIdMapLock.RUnlock()
return manager.ClientIdMap
}
//与客户端的交互操作*************************
// NewClientManager 初始化客户端管理
func NewClientManager() (clientManager *ClientManager) {
clientManager = &ClientManager{
ClientIdMap: make(map[string]*Client),
Connect: make(chan *Client, 10000),
DisConnect: make(chan *Client, 10000),
Groups: make(map[string][]string, 100),
SystemClients: make(map[string][]string, 100),
}
return
}
// Count 获取客户端数量
func (manager *ClientManager) Count() int {
manager.ClientIdMapLock.RLock()
defer manager.ClientIdMapLock.RUnlock()
return len(manager.ClientIdMap)
}
//
//import (
// "errors"
// "fmt"
// "net/http"
// "time"
//
// "git.rosy.net.cn/baseapi/utils"
//
// "git.rosy.net.cn/jx-callback/globals"
// "github.com/gorilla/websocket"
//)
//
//func Init() {
// //初始化
// ToClientChan = make(chan clientInfo, 1000)
// //写入全局变量
// //Default()
//
// Setup()
// //建立长链接
// //StartWebSocket(res, req)
// Send([]byte(HeartCheckMsg))
//
// //启动定时器
// PingTimer()
//
// go WriteMessage()
//
// go Manager.Start()
//
// fmt.Printf("服务器启动成功,端口号:%s\n", CommonSetting.HttpPort)
//}
//
//func Run(w http.ResponseWriter, r *http.Request) {
// conn, err := (&websocket.Upgrader{
// ReadBufferSize: 1024,
// WriteBufferSize: 1024,
// // 允许所有CORS跨域请求
// CheckOrigin: func(r *http.Request) bool {
// return true
// },
// }).Upgrade(w, r, nil)
// if err != nil {
// globals.SugarLogger.Debugf("upgrade error: %v", err)
// http.NotFound(w, r)
// return
// }
//
// //设置读取消息大小上线
// conn.SetReadLimit(maxMessageSize)
//
// clientID := r.FormValue("clientId")
// clientSocket := NewClient(clientID, conn)
//
// //读取客户端消息
// clientSocket.Read()
//
// if err = ConnRender(conn, renderData{ClientId: clientID}); err != nil {
// _ = conn.Close()
// return
// }
//
// // 用户连接事件
// Manager.Connect <- clientSocket
//}
//
//func StartWebSocket(conn *websocket.Conn, clientID string, err error) {
//
// //设置读取消息大小上线
// conn.SetReadLimit(maxMessageSize)
//
// clientSocket := NewClient(clientID, conn)
//
// //读取客户端消息
// clientSocket.Read()
//
// if err = ConnRender(conn, renderData{ClientId: clientID}); err != nil {
// _ = conn.Close()
// return
// }
//
// // 用户连接事件
// Manager.Connect <- clientSocket
//
//}
//
//// PingTimer 定时器发送心跳
//func PingTimer() {
// go func() {
// ticker := time.NewTicker(heartbeatInterval)
// defer ticker.Stop()
// //测试用
// i := 0
// for {
// i++
// <-ticker.C
// for clientId, conn := range Manager.AllClient() {
// if err := conn.Socket.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
// Manager.DisConnect <- conn
// globals.SugarLogger.Debugf("发送心跳失败: %s 总连接数:%d", clientId, Manager.Count())
// }
// if err := ConnRender(conn.Socket, renderData{ClientId: clientId}); err != nil {
// return
// }
// globals.SugarLogger.Debugf("clientId=%s,i=%d", clientId, i)
// }
// }
// }()
//}
//
//// WriteMessage 监听并发送给客户端信息
//func WriteMessage() {
// i := 0
// for {
// clientInfo := <-ToClientChan
// //广播发送通知所有客户端
// i++
// fmt.Printf("WriteMessage clientInfo=%s i=%d", utils.Format4Output(clientInfo, false), i)
// if Manager.AllClient() != nil {
// for _, conn := range Manager.AllClient() {
// if err := Render(conn.Socket, clientInfo.MessageId, clientInfo.Code, clientInfo.Msg, clientInfo.Data); err != nil {
// Manager.DisConnect <- conn
// }
// }
// } else {
// globals.SugarLogger.Debugf("无客户端连接,请检查")
// return
// }
// //if conn, err := Manager.GetByClientId(clientInfo.ClientId); err == nil && conn != nil {
// // if err := Render(conn.Socket, clientInfo.MessageId, clientInfo.Code, clientInfo.Msg, clientInfo.Data); err != nil {
// // Manager.DisConnect <- conn
// // }
// //}
// }
//}
//
//// Start 管道处理程序
//func (manager *ClientManager) Start() {
// for {
// select {
// case client := <-manager.Connect:
// // 建立连接事件
// manager.EventConnect(client)
// case conn := <-manager.DisConnect:
// // 断开连接事件
// manager.EventDisconnect(conn)
// }
// }
//}
//
////从客户端读取数据
//func (c *Client) Read() {
// go func() {
// for {
// messageType, msg, err := c.Socket.ReadMessage()
// if err != nil {
// if messageType == -1 && websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
// Manager.DisConnect <- c
// return
// } else if messageType != websocket.PingMessage {
// return
// }
// } else {
// SendToVendor(msg)
// }
// }
// }()
//}
//
//// 以下为连接事件操作*******************************************
//
//// EventConnect 建立连接事件
//func (manager *ClientManager) EventConnect(client *Client) {
// manager.AddClient(client)
//}
//
//// EventDisconnect 断开连接事件
//func (manager *ClientManager) EventDisconnect(client *Client) {
// //关闭连接
// _ = client.Socket.Close()
// manager.DelClient(client)
// //标记销毁
// client.IsDeleted = true
// client = nil
//}
//
////以下为客户端Client操作*******************************************
//
//// NewClient 初始化Client
//func NewClient(clientId string, socket *websocket.Conn) *Client {
// return &Client{
// ClientId: clientId,
// Socket: socket,
// ConnectTime: uint64(time.Now().Unix()),
// IsDeleted: false,
// }
//}
//
//// AddClient 添加客户端
//func (manager *ClientManager) AddClient(client *Client) {
// manager.ClientIdMapLock.Lock()
// defer manager.ClientIdMapLock.Unlock()
//
// manager.ClientIdMap[client.ClientId] = client
//}
//
//// DelClient 删除客户端
//func (manager *ClientManager) DelClient(client *Client) {
// manager.delClientIdMap(client.ClientId)
//
//}
//
//// 删除clientIdMap
//func (manager *ClientManager) delClientIdMap(clientId string) {
// manager.ClientIdMapLock.Lock()
// defer manager.ClientIdMapLock.Unlock()
//
// delete(manager.ClientIdMap, clientId)
//}
//
//// GetByClientId 通过clientId获取client
//func (manager *ClientManager) GetByClientId(clientId string) (*Client, error) {
// manager.ClientIdMapLock.RLock()
// defer manager.ClientIdMapLock.RUnlock()
//
// if client, ok := manager.ClientIdMap[clientId]; !ok {
// return nil, errors.New("客户端不存在")
// } else {
// return client, nil
// }
//}
//
//// AllClient 获取所有的客户端
//func (manager *ClientManager) AllClient() map[string]*Client {
// manager.ClientIdMapLock.RLock()
// defer manager.ClientIdMapLock.RUnlock()
//
// return manager.ClientIdMap
//}
//
////与客户端的交互操作*************************
//
//// NewClientManager 初始化客户端管理
//func NewClientManager() (clientManager *ClientManager) {
// clientManager = &ClientManager{
// ClientIdMap: make(map[string]*Client),
// Connect: make(chan *Client, 10000),
// DisConnect: make(chan *Client, 10000),
// Groups: make(map[string][]string, 100),
// SystemClients: make(map[string][]string, 100),
// }
//
// return
//}
//
//// Count 获取客户端数量
//func (manager *ClientManager) Count() int {
// manager.ClientIdMapLock.RLock()
// defer manager.ClientIdMapLock.RUnlock()
// return len(manager.ClientIdMap)
//}