package im

import (
	"errors"
	"fmt"
	"net/http"
	"time"

	"git.rosy.net.cn/jx-callback/business/jxutils"
	"git.rosy.net.cn/jx-callback/globals"
	"github.com/gorilla/websocket"
)

// Init wires up the package: it allocates the outgoing message channel, runs
// Setup, and starts the background workers (MtInit, the heartbeat timer, the
// broadcast writer and the connection manager loop).
func Init() {
	// Buffered queue of messages waiting to be pushed to clients.
	ToClientChan = make(chan clientInfo, 1000)
	// Populate package-level state.
	Setup()
	// Establish the long-lived connection.
	jxutils.CallMsgHandlerAsync(func() {
		MtInit()
	}, "MtInit:"+RandString())
	// Start the heartbeat timer.
	PingTimer()
	jxutils.CallMsgHandlerAsync(func() {
		WriteMessage()
	}, "WriteMessage:"+RandString())
	jxutils.CallMsgHandlerAsync(func() {
		Manager.Start()
	}, "Manager Start:"+RandString())
}

// Run is the HTTP handler that upgrades a request to a websocket connection,
// starts reading from it, sends the client its ID and registers the client
// with Manager.
//
// The client ID is taken from the "Clientid" request header; when the header
// is absent or empty a random ID is generated.
func Run(w http.ResponseWriter, r *http.Request) {
	upgrader := websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
		// Accept requests from any origin.
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
	}
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		// Upgrade has already replied to the client with an HTTP error, so
		// nothing more may be written to w here.
		globals.SugarLogger.Debugf("upgrade error: %v", err)
		return
	}

	// Cap the size of a single incoming message.
	conn.SetReadLimit(maxMessageSize)

	clientID := r.Header.Get("Clientid")
	if len(clientID) == 0 {
		clientID = RandString()
	}

	clientSocket := NewClient(clientID, conn, ClientTypeJx)

	// Start consuming messages sent by the client.
	clientSocket.Read()

	if err = ConnRender(conn, renderData{ClientId: clientID}); err != nil {
		_ = conn.Close()
		return
	}

	// Publish the connect event.
	Manager.Connect <- clientSocket
}

// PingTimer starts a goroutine that, every heartbeatInterval, sends a
// heartbeat to each registered client.
//
// ClientTypeJx clients receive a websocket ping followed by a ConnRender
// message; a client whose write fails is queued for disconnection and the
// loop moves on to the next client. Any other client type receives
// HeartCheckMsg as a text message, and MtInit is called when that write
// fails.
//
// NOTE(review): this goroutine and WriteMessage both write data frames to the
// same sockets; gorilla/websocket supports only one concurrent writer per
// connection, so these writes probably need to be serialized per client.
func PingTimer() {
	go func() {
		ticker := time.NewTicker(heartbeatInterval)
		defer ticker.Stop()

		for range ticker.C {
			for id, conn := range Manager.AllClient() {
				if conn.ClientType != ClientTypeJx {
					if err := conn.Socket.WriteMessage(websocket.TextMessage, []byte(HeartCheckMsg)); err != nil {
						fmt.Printf("PingTimer mtHeartBeat err:%v", err)
						// Re-establish the connection to Meituan.
						MtInit()
					}
					continue
				}

				deadline := time.Now().Add(time.Second)
				if err := conn.Socket.WriteControl(websocket.PingMessage, []byte{}, deadline); err != nil {
					Manager.DisConnect <- conn
					fmt.Printf("发送心跳失败: %s 总连接数:%d", id, Manager.Count())
					// The connection is already queued for teardown; do not
					// write to it again.
					continue
				}

				if err := ConnRender(conn.Socket, renderData{ClientId: id}); err != nil {
					// A single broken client must not stop the heartbeat for
					// everyone else: drop this client and keep going.
					Manager.DisConnect <- conn
				}
			}
		}
	}()
}

// WriteMessage blocks forever, taking messages off ToClientChan and
// broadcasting each one to every registered ClientTypeJx client. A client
// whose write fails is queued for disconnection.
//
// The loop stops only if Manager.AllClient returns nil.
func WriteMessage() {
	for {
		msg := <-ToClientChan

		clients := Manager.AllClient()
		if clients == nil {
			globals.SugarLogger.Debugf("无客户端连接,请检查")
			return
		}

		for _, conn := range clients {
			// Only Jingxi clients receive broadcasts.
			if conn.ClientType != ClientTypeJx {
				continue
			}
			fmt.Printf("WriteMessage conn.ClientId=%s\n", conn.ClientId)
			if err := Render(conn.Socket, msg.MessageId, msg.Code, msg.Msg, msg.Data); err != nil {
				Manager.DisConnect <- conn
			}
		}
	}
}

// Start runs the manager's event loop, dispatching connect and disconnect
// events received on the manager's channels. It never returns.
func (manager *ClientManager) Start() {
	for {
		select {
		case client := <-manager.Connect:
			// A client connected.
			manager.EventConnect(client)
		case conn := <-manager.DisConnect:
			// A client disconnected.
			manager.EventDisconnect(conn)
		}
	}
}

// Read starts a goroutine that reads messages from the client's socket and
// forwards each one to SendToVendor.
//
// The goroutine exits on the first read error. When the error is one of the
// listed close codes (going away, normal closure, no status received) the
// client is also queued for disconnection.
//
// NOTE(review): other read errors leave the client registered; it appears to
// be removed only once a later heartbeat or broadcast write fails.
func (c *Client) Read() {
	go func() {
		for {
			messageType, msg, err := c.Socket.ReadMessage()
			if err != nil {
				if messageType == -1 && websocket.IsCloseError(err,
					websocket.CloseGoingAway,
					websocket.CloseNormalClosure,
					websocket.CloseNoStatusReceived) {
					Manager.DisConnect <- c
				}
				// A read error is permanent for this connection, so stop
				// reading instead of processing an invalid message.
				return
			}

			fmt.Printf("Client Read:receive: %s\n", string(msg))
			SendToVendor(msg)
		}
	}()
}

// ---- Connection event handlers ----

// EventConnect handles a connect event by registering the client.
func (manager *ClientManager) EventConnect(client *Client) {
	manager.AddClient(client)
}

// EventDisconnect handles a disconnect event: it closes the client's socket,
// removes the client from the manager and marks it as deleted.
func (manager *ClientManager) EventDisconnect(client *Client) {
	// Close the connection; the error is ignored because the socket may
	// already be closed by the peer.
	_ = client.Socket.Close()

	manager.DelClient(client)

	// Mark the client as destroyed.
	client.IsDeleted = true
}

// ---- Client operations ----

// NewClient builds a Client for the given ID, socket and client type, with
// ConnectTime set to the current Unix time in seconds.
func NewClient(clientId string, socket *websocket.Conn, clientType string) *Client {
	return &Client{
		ClientId:    clientId,
		Socket:      socket,
		ClientType:  clientType,
		ConnectTime: uint64(time.Now().Unix()),
		IsDeleted:   false,
	}
}

// AddClient registers the client under its ClientId, replacing any client
// already stored under the same ID.
func (manager *ClientManager) AddClient(client *Client) {
	manager.ClientIdMapLock.Lock()
	defer manager.ClientIdMapLock.Unlock()

	manager.ClientIdMap[client.ClientId] = client
}

// DelClient removes the entry stored under the client's ClientId.
//
// NOTE(review): removal is by ID, not by identity, so a stale client can
// remove a newer client registered under the same ID.
func (manager *ClientManager) DelClient(client *Client) {
	manager.delClientIdMap(client.ClientId)
}

// delClientIdMap removes clientId from ClientIdMap under the write lock.
func (manager *ClientManager) delClientIdMap(clientId string) {
	manager.ClientIdMapLock.Lock()
	defer manager.ClientIdMapLock.Unlock()

	delete(manager.ClientIdMap, clientId)
}

// GetByClientId returns the client registered under clientId, or an error
// when no such client exists.
func (manager *ClientManager) GetByClientId(clientId string) (*Client, error) {
	manager.ClientIdMapLock.RLock()
	defer manager.ClientIdMapLock.RUnlock()

	client, ok := manager.ClientIdMap[clientId]
	if !ok {
		return nil, errors.New("客户端不存在")
	}
	return client, nil
}

// AllClient returns a snapshot of all registered clients keyed by client ID.
//
// The returned map is a copy made while holding the read lock, so callers can
// iterate it while other goroutines add or remove clients. Changes to the
// returned map do not affect the manager. It returns nil when the manager's
// map is nil.
func (manager *ClientManager) AllClient() map[string]*Client {
	manager.ClientIdMapLock.RLock()
	defer manager.ClientIdMapLock.RUnlock()

	if manager.ClientIdMap == nil {
		return nil
	}

	snapshot := make(map[string]*Client, len(manager.ClientIdMap))
	for id, client := range manager.ClientIdMap {
		snapshot[id] = client
	}
	return snapshot
}

// ---- Client manager ----

// NewClientManager returns a ClientManager with its maps and its buffered
// connect/disconnect channels allocated.
func NewClientManager() (clientManager *ClientManager) {
	clientManager = &ClientManager{
		ClientIdMap:   make(map[string]*Client),
		Connect:       make(chan *Client, 10000),
		DisConnect:    make(chan *Client, 10000),
		Groups:        make(map[string][]string, 100),
		SystemClients: make(map[string][]string, 100),
	}
	return clientManager
}

// Count returns the number of registered clients.
func (manager *ClientManager) Count() int {
	manager.ClientIdMapLock.RLock()
	defer manager.ClientIdMapLock.RUnlock()

	return len(manager.ClientIdMap)
}