Files
jx-callback/business/partner/purchase/im/im_server.go
richboo111 866aa16bc5 1
2023-06-12 14:16:00 +08:00

260 lines
6.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package im
import (
"errors"
"fmt"
"net/http"
"time"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/globals"
"github.com/gorilla/websocket"
)
func Init() {
//初始化
ToClientChan = make(chan clientInfo, 1000)
//写入全局变量
Setup()
//建立长链接
jxutils.CallMsgHandlerAsync(func() {
MtInit()
}, "MtInit:"+RandString())
//启动定时器
PingTimer()
jxutils.CallMsgHandlerAsync(func() {
WriteMessage()
}, "WriteMessage:"+RandString())
jxutils.CallMsgHandlerAsync(func() {
Manager.Start()
}, "Manager Start:"+RandString())
}
func Run(w http.ResponseWriter, r *http.Request) {
conn, err := (&websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
// 允许所有CORS跨域请求
CheckOrigin: func(r *http.Request) bool {
return true
},
}).Upgrade(w, r, nil)
if err != nil {
globals.SugarLogger.Debugf("upgrade error: %v", err)
http.NotFound(w, r)
return
}
//设置读取消息大小上线
conn.SetReadLimit(maxMessageSize)
clientID := ""
if temp := r.Header.Get("Clientid"); len(temp) == 0 {
clientID = RandString()
} else {
clientID = temp
}
clientSocket := NewClient(clientID, conn, ClientTypeJx)
//读取客户端消息
clientSocket.Read()
if err = ConnRender(conn, renderData{ClientId: clientID}); err != nil {
_ = conn.Close()
return
}
// 用户连接事件
Manager.Connect <- clientSocket
}
// PingTimer 定时器发送心跳
func PingTimer() {
go func() {
ticker := time.NewTicker(heartbeatInterval)
defer ticker.Stop()
for {
<-ticker.C
for clientId, conn := range Manager.AllClient() {
if conn.ClientType == ClientTypeJx {
if err := conn.Socket.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
Manager.DisConnect <- conn
fmt.Printf("发送心跳失败: %s 总连接数:%d", clientId, Manager.Count())
}
if err := ConnRender(conn.Socket, renderData{ClientId: clientId}); err != nil {
return
}
globals.SugarLogger.Debugf("PingTimer jx clientID ")
} else {
fmt.Printf("PingTimer mt心跳conn%s,%s", conn.ClientId, conn.ClientType)
if err := conn.Socket.WriteMessage(websocket.TextMessage, []byte(HeartCheckMsg)); err != nil {
fmt.Printf("PingTimer mtHeartBeat err:%v", err)
//对美团重新建立连接
MtInit()
}
}
}
}
}()
}
// WriteMessage 监听并发送给客户端信息
func WriteMessage() {
i := 0
for {
clientInfo := <-ToClientChan
//广播发送通知所有京西客户端
i++
//fmt.Printf("WriteMessage clientInfo=%s i=%d\n", utils.Format4Output(clientInfo, false), i)
if Manager.AllClient() != nil {
for _, conn := range Manager.AllClient() {
if conn.ClientType == ClientTypeJx { //只发送给京西
fmt.Printf("WriteMessage conn.ClientId=%s\n", conn.ClientId)
if err := Render(conn.Socket, clientInfo.MessageId, clientInfo.Code, clientInfo.Msg, clientInfo.Data); err != nil {
Manager.DisConnect <- conn
}
}
}
} else {
globals.SugarLogger.Debugf("无客户端连接,请检查")
return
}
}
}
// Start 管道处理程序
func (manager *ClientManager) Start() {
for {
select {
case client := <-manager.Connect:
// 建立连接事件
manager.EventConnect(client)
case conn := <-manager.DisConnect:
// 断开连接事件
manager.EventDisconnect(conn)
}
}
}
//从客户端读取数据
func (c *Client) Read() {
go func() {
for {
messageType, msg, err := c.Socket.ReadMessage()
//temp := string(msg)
//fmt.Print(temp)
if err != nil {
if messageType == -1 && websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
Manager.DisConnect <- c
return
} else if messageType != websocket.PingMessage {
return
}
}
fmt.Printf("Client Read:receive: %s\n", string(msg))
//SendToVendor(msg)
}
}()
}
// 以下为连接事件操作*******************************************
// EventConnect 建立连接事件
func (manager *ClientManager) EventConnect(client *Client) {
manager.AddClient(client)
}
// EventDisconnect 断开连接事件
func (manager *ClientManager) EventDisconnect(client *Client) {
//关闭连接
_ = client.Socket.Close()
manager.DelClient(client)
//标记销毁
client.IsDeleted = true
client = nil
}
//以下为客户端Client操作*******************************************
// NewClient 初始化Client
func NewClient(clientId string, socket *websocket.Conn, clientType string) *Client {
return &Client{
ClientId: clientId,
Socket: socket,
ClientType: clientType,
ConnectTime: uint64(time.Now().Unix()),
IsDeleted: false,
}
}
// AddClient 添加客户端
func (manager *ClientManager) AddClient(client *Client) {
manager.ClientIdMapLock.Lock()
defer manager.ClientIdMapLock.Unlock()
manager.ClientIdMap[client.ClientId] = client
}
// DelClient 删除客户端
func (manager *ClientManager) DelClient(client *Client) {
manager.delClientIdMap(client.ClientId)
}
// 删除clientIdMap
func (manager *ClientManager) delClientIdMap(clientId string) {
manager.ClientIdMapLock.Lock()
defer manager.ClientIdMapLock.Unlock()
delete(manager.ClientIdMap, clientId)
}
// GetByClientId 通过clientId获取client
func (manager *ClientManager) GetByClientId(clientId string) (*Client, error) {
manager.ClientIdMapLock.RLock()
defer manager.ClientIdMapLock.RUnlock()
if client, ok := manager.ClientIdMap[clientId]; !ok {
return nil, errors.New("客户端不存在")
} else {
return client, nil
}
}
// AllClient 获取所有的客户端
func (manager *ClientManager) AllClient() map[string]*Client {
manager.ClientIdMapLock.RLock()
defer manager.ClientIdMapLock.RUnlock()
return manager.ClientIdMap
}
//与客户端的交互操作*************************
// NewClientManager 初始化客户端管理
func NewClientManager() (clientManager *ClientManager) {
clientManager = &ClientManager{
ClientIdMap: make(map[string]*Client),
Connect: make(chan *Client, 10000),
DisConnect: make(chan *Client, 10000),
Groups: make(map[string][]string, 100),
SystemClients: make(map[string][]string, 100),
}
return
}
// Count 获取客户端数量
func (manager *ClientManager) Count() int {
manager.ClientIdMapLock.RLock()
defer manager.ClientIdMapLock.RUnlock()
return len(manager.ClientIdMap)
}