package im import ( "errors" "fmt" "net/http" "time" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/globals" "github.com/gorilla/websocket" ) func Init() { //初始化 ToClientChan = make(chan clientInfo, 1000) //写入全局变量 //Default() Setup() //建立长链接 //StartWebSocket(res, req) Send([]byte(HeartCheckMsg)) //启动定时器 PingTimer() go WriteMessage() go Manager.Start() fmt.Printf("服务器启动成功,端口号:%s\n", CommonSetting.HttpPort) } func StartWebSocket(w http.ResponseWriter, r *http.Request) { conn, err := (&websocket.Upgrader{ ReadBufferSize: 8182, WriteBufferSize: 8182, // 允许所有CORS跨域请求 CheckOrigin: func(r *http.Request) bool { return true }, }).Upgrade(w, r, nil) if err != nil { globals.SugarLogger.Errorf("upgrade error: %v", err) http.NotFound(w, r) return } //设置读取消息大小上线 conn.SetReadLimit(maxMessageSize) clientId := "" temp := r.Header["Clientid"] if temp[0] != "" { clientId = temp[0] } else { clientId = "defaultClientIDJXCS" } clientSocket := NewClient(clientId, conn) //读取客户端消息 clientSocket.Read() if err = ConnRender(conn, renderData{ClientId: clientId}); err != nil { _ = conn.Close() return } // 用户连接事件 Manager.Connect <- clientSocket } // PingTimer 定时器发送心跳 func PingTimer() { go func() { ticker := time.NewTicker(heartbeatInterval) defer ticker.Stop() //测试用 i := 0 for { i++ <-ticker.C for clientId, conn := range Manager.AllClient() { if err := conn.Socket.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { Manager.DisConnect <- conn globals.SugarLogger.Debugf("发送心跳失败: %s 总连接数:%d", clientId, Manager.Count()) } if err := ConnRender(conn.Socket, renderData{ClientId: clientId}); err != nil { return } globals.SugarLogger.Debugf("clientId=%s,i=%d", clientId, i) } } }() } // WriteMessage 监听并发送给客户端信息 func WriteMessage() { i := 0 for { clientInfo := <-ToClientChan //广播发送通知所有客户端 i++ fmt.Printf("WriteMessage clientInfo=%s i=%d", utils.Format4Output(clientInfo, false), i) if Manager.AllClient() != nil { for _, conn := range Manager.AllClient() { if err := Render(conn.Socket, clientInfo.MessageId, clientInfo.Code, clientInfo.Msg, clientInfo.Data); err != nil { Manager.DisConnect <- conn } } } else { globals.SugarLogger.Debugf("无客户端连接,请检查") return } //if conn, err := Manager.GetByClientId(clientInfo.ClientId); err == nil && conn != nil { // if err := Render(conn.Socket, clientInfo.MessageId, clientInfo.Code, clientInfo.Msg, clientInfo.Data); err != nil { // Manager.DisConnect <- conn // } //} } } // Start 管道处理程序 func (manager *ClientManager) Start() { for { select { case client := <-manager.Connect: // 建立连接事件 manager.EventConnect(client) case conn := <-manager.DisConnect: // 断开连接事件 manager.EventDisconnect(conn) } } } //从客户端读取数据 func (c *Client) Read() { go func() { for { messageType, msg, err := c.Socket.ReadMessage() if err != nil { if messageType == -1 && websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) { Manager.DisConnect <- c return } else if messageType != websocket.PingMessage { return } } else { SendToVendor(msg) } } }() } // 以下为连接事件操作******************************************* // EventConnect 建立连接事件 func (manager *ClientManager) EventConnect(client *Client) { manager.AddClient(client) } // EventDisconnect 断开连接事件 func (manager *ClientManager) EventDisconnect(client *Client) { //关闭连接 _ = client.Socket.Close() manager.DelClient(client) //标记销毁 client.IsDeleted = true client = nil } //以下为客户端Client操作******************************************* // NewClient 初始化Client func NewClient(clientId string, socket *websocket.Conn) *Client { return &Client{ ClientId: clientId, Socket: socket, ConnectTime: uint64(time.Now().Unix()), IsDeleted: false, } } // AddClient 添加客户端 func (manager *ClientManager) AddClient(client *Client) { manager.ClientIdMapLock.Lock() defer manager.ClientIdMapLock.Unlock() manager.ClientIdMap[client.ClientId] = client } // DelClient 删除客户端 func (manager *ClientManager) DelClient(client *Client) { manager.delClientIdMap(client.ClientId) } // 删除clientIdMap func (manager *ClientManager) delClientIdMap(clientId string) { manager.ClientIdMapLock.Lock() defer manager.ClientIdMapLock.Unlock() delete(manager.ClientIdMap, clientId) } // GetByClientId 通过clientId获取client func (manager *ClientManager) GetByClientId(clientId string) (*Client, error) { manager.ClientIdMapLock.RLock() defer manager.ClientIdMapLock.RUnlock() if client, ok := manager.ClientIdMap[clientId]; !ok { return nil, errors.New("客户端不存在") } else { return client, nil } } // AllClient 获取所有的客户端 func (manager *ClientManager) AllClient() map[string]*Client { manager.ClientIdMapLock.RLock() defer manager.ClientIdMapLock.RUnlock() return manager.ClientIdMap } //与客户端的交互操作************************* // NewClientManager 初始化客户端管理 func NewClientManager() (clientManager *ClientManager) { clientManager = &ClientManager{ ClientIdMap: make(map[string]*Client), Connect: make(chan *Client, 10000), DisConnect: make(chan *Client, 10000), Groups: make(map[string][]string, 100), SystemClients: make(map[string][]string, 100), } return } // Count 获取客户端数量 func (manager *ClientManager) Count() int { manager.ClientIdMapLock.RLock() defer manager.ClientIdMapLock.RUnlock() return len(manager.ClientIdMap) }