This commit is contained in:
richboo111
2023-05-05 14:48:54 +08:00
parent 2b7bfe4e5e
commit 82cd796b5c
11 changed files with 343 additions and 188 deletions

View File

@@ -9,7 +9,7 @@ import (
)
const (
IMVendorIDELM = 11 //饿了么
IMVendorIDELM = 3 //饿了么
)
// OnImMessage 用户/骑手 发送/已读消息 回调

View File

@@ -5,6 +5,10 @@ import (
"errors"
"fmt"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/baseapi/utils/errlist"
"git.rosy.net.cn/jx-callback/globals"
"git.rosy.net.cn/baseapi/platformapi/ebaiapi"
@@ -19,7 +23,6 @@ import (
// SendToVendor 向平台发消息
func SendToVendor(msg []byte) {
var (
//w http.ResponseWriter
sendData SendData
err error
elmAppID = api.EbaiAPI.GetSource()
@@ -36,7 +39,11 @@ func SendToVendor(msg []byte) {
//发送信息
if sendData.VendorID == VendorIDMT {
temp, _ := json.Marshal(sendData.Data)
Send(temp)
if sendData.Data.(map[string]interface{})["app_id"] == nil {
globals.SugarLogger.Debug("SendToVendor appId=null")
return
}
Send(temp, sendData.Data.(map[string]interface{})["app_id"])
}
if sendData.VendorID == VendorIDELM {
param := sendData.Data.(ebaiapi.BusinessSendMsgReq)
@@ -46,23 +53,24 @@ func SendToVendor(msg []byte) {
}
}
if err != nil {
ClientRender(Fail, FailMsg, map[string]string{
"errMsg": fmt.Sprintf("%v", err),
})
return
} else {
ClientRender(SuccessCode, SuccessMsg, map[string]interface{}{
"vendorID": sendData.VendorID,
"msg": "ok",
})
return
}
//if err != nil {
// ClientRender(Fail, FailMsg, map[string]string{
// "errMsg": fmt.Sprintf("%v", err),
// })
// return err
//} else {
// ClientRender(SuccessCode, SuccessMsg, map[string]interface{}{
// "vendorID": sendData.VendorID,
// "msg": "ok",
// })
// return nil
//}
return
}
func Send(data []byte) {
//生成完整url
fullUrl := GenFullUrl() //clientID暂时不用
func Send(data []byte, appID interface{}) {
//根据appID生成完整url
fullUrl := GenFullUrl(appID.(float64)) //clientID暂时不用
conn, resp, err := websocket.DefaultDialer.Dial(fullUrl, nil)
if err != nil || resp.StatusCode != 101 {
@@ -83,11 +91,11 @@ func Send(data []byte) {
for {
_, msg, err := conn.ReadMessage()
temp := string(msg)
if err != nil {
break
} else {
temp := string(msg)
if temp != HeartCheckSuccess || temp != "成功" {
if temp != HeartSuccessWord {
ReadMsgFromVendor(VendorIDMT, "", msg)
}
}
@@ -96,6 +104,83 @@ func Send(data []byte) {
return
}
// MtInit 发送心跳
func MtInit() {
data := []byte(HeartCheckMsg)
//生成完整url
url := GenFullUrl2()
//主连接
jxutils.CallMsgHandlerAsync(func() {
conn, resp, err := websocket.DefaultDialer.Dial(url.UrlMain, nil)
if err != nil || resp.StatusCode != 101 {
fmt.Printf("连接失败:%v http响应不成功", err)
}
//关闭
defer func(conn *websocket.Conn) {
err := conn.Close()
if err != nil {
return
}
}(conn)
//client连接事件
client := NewClient(url.ClientIDMain, conn, ClientTypeMt)
Manager.Connect <- client
err = conn.WriteMessage(websocket.TextMessage, data)
if err != nil {
fmt.Println(err)
}
for {
_, msg, err := conn.ReadMessage()
temp := string(msg)
if err != nil || temp != "HB" {
break
}
fmt.Printf("%s receive: %s\n", conn.RemoteAddr(), string(msg))
}
}, url.ClientIDMain)
//副连接
if url.UrlSub != "" {
jxutils.CallMsgHandlerAsync(func() {
connSub, respSub, errSub := websocket.DefaultDialer.Dial(url.UrlSub, nil)
if errSub != nil || respSub.StatusCode != 101 {
fmt.Printf("连接失败:%v http响应不成功", errSub)
}
//关闭
defer func(conn *websocket.Conn) {
err := conn.Close()
if err != nil {
return
}
}(connSub)
//client连接事件
client := NewClient(url.ClientIDSub, connSub, ClientTypeMt)
Manager.Connect <- client
errSub = connSub.WriteMessage(websocket.TextMessage, data)
if errSub != nil {
fmt.Println(errSub)
}
for {
_, msg, err := connSub.ReadMessage()
temp := string(msg)
if err != nil || temp != HeartCheckSuccess {
break
}
fmt.Printf("%s connSub:receive: %s\n", connSub.RemoteAddr(), string(msg))
}
}, url.ClientIDSub)
}
}
// ReadMsgFromClient 存储客户端发送的消息
func ReadMsgFromClient(vendorID int, elmAppID string, msg interface{}) {
var (
@@ -221,13 +306,8 @@ func PushMsgByCid(vendorStoreID string, vendorID int) error {
func SetMessageDetail(req *JXMsg, vendorID int, elmAppID string) error {
//生成京西消息ID detail
msgID := GenMsgDetailID(req, vendorID, elmAppID)
err := rdb.Set("test", "可以插入数据sjdfoiqaj", ExpireTimeDay)
if err != nil {
globals.SugarLogger.Debugf("测试插入err:%v", err)
}
data, _ := json.Marshal(req)
err = rdb.RPush(msgID, string(data))
globals.SugarLogger.Debugf("im SetUserList err=%v", err)
err := rdb.RPush(msgID, string(data))
ok, err := rdb.ExpireResult(msgID, ExpireTimeDay)
if err != nil || !ok {
return err
@@ -251,12 +331,6 @@ func SetUserList(jxMsg *JXMsg, userList *UserMessageList, vendorID int, elmAppID
//存储当前数据
data, _ := json.Marshal(userList)
err = rdb.RPush(msgID, string(data))
globals.SugarLogger.Debugf("im SetUserList msgID=%s", msgID)
globals.SugarLogger.Debugf("im SetUserList err=%v", err)
//test
str := rdb.Get(msgID)
globals.SugarLogger.Debugf("im SetUserList str=%v", str)
//over
ok, err := rdb.ExpireResult(msgID, ExpireTimeDay)
if err != nil || !ok {
return err
@@ -331,6 +405,8 @@ func GetImUserList(req []RelInfo) (map[string][]interface{}, error) {
temp := rdb.LRange(j)
for _, v := range temp {
retVal[j] = append(retVal[j], v)
//暂时写死
//retVal["userList"] = append(retVal["userList"], v)
}
}
return retVal, nil
@@ -351,6 +427,7 @@ func GetImChatDetail(req []UserRelInfo) (map[string][]interface{}, error) {
temp := rdb.LRange(j)
for _, v := range temp {
retVal[j] = append(retVal[j], v)
//retVal["chatDetail"] = append(retVal["chatDetail"], v)
}
}
return retVal, nil
@@ -388,3 +465,18 @@ func SetJxMsgRead(appID, vendorStoreID, vendorID, userID string) error {
}
return nil
}
// DelRedisByKey 清除redis数据
func DelRedisByKey(keys []string) {
var errList errlist.ErrList
for _, key := range keys {
err := rdb.Del(key)
if err != nil {
errList.AddErr(err)
}
}
if errList.GetErrListAsOne() != nil {
globals.SugarLogger.Debugf("DelRedisByKey err=%v", errList.GetErrListAsOne())
}
return
}

View File

@@ -12,6 +12,8 @@ import (
"sync"
"time"
"git.rosy.net.cn/jx-callback/globals"
"git.rosy.net.cn/baseapi/platformapi/mtwmapi"
"git.rosy.net.cn/jx-callback/globals/api"
"github.com/gazeboxu/mapstructure"
@@ -38,6 +40,7 @@ type ClientManager struct {
type Client struct {
ClientId string // 标识ID
Socket *websocket.Conn // 用户连接
ClientType string //标识是美团/客户端长链接
ConnectTime uint64 // 首次连接时间
IsDeleted bool // 是否删除或下线
UserId string // 业务端标识用户ID
@@ -132,27 +135,45 @@ type UserRelInfo struct {
UserID string `json:"userID"` //用户id/groupID
}
// UrlInfo 生成美团长链接url信息
type UrlInfo struct {
UrlMain string `json:"urlMain"` //主连接路由
ClientIDMain string `json:"ClientIDMain"` //主连接id
UrlSub string `json:"urlSub"` //副连接路由
ClientIDSub string `json:"ClientIDSub"` //副连接id
}
var (
cfg *ini.File
rdb = api.Cacher
Manager = NewClientManager() // 管理者
CommonSetting = &commonConf{}
GlobalSetting = &global{}
ToClientChan chan clientInfo
heartbeatInterval = 60 * time.Second // 心跳间隔
HeartCheckMsg = "~#HHHBBB#~" //心跳检测消息
HeartCheckSuccess = "HB" //成功发送返回心跳消息
VendorIDMT = 10 //im美团
VendorIDELM = 11 //im饿了么
SendTypeJx = "jx" //京西客户端发送方标识
SendTypeMt = "mt" //美团用户发送方标识符
SendTypeElm = "elm" //饿了么用户发送方标识符
MTIMPushUrl = "wss://wpush.meituan.com/websocket" //buildPushConnect建立长连接
cfg *ini.File
rdb = api.Cacher
//客户端相关
Manager = NewClientManager() // 管理者
ToClientChan chan clientInfo
ClientTypeJx = "jx" //京西客户端
ClientTypeMt = "mt" //美团客户端
//配置文件
CommonSetting = &commonConf{}
GlobalSetting = &global{}
//心跳相关
heartbeatInterval = 60 * time.Second // 心跳间隔
HeartCheckMsg = "~#HHHBBB#~" //心跳检测消息
HeartCheckSuccess = "HB" //成功发送返回心跳消息
HeartSuccessWord = "成功" //成功发送返回心跳消息
//平台标识
AppID5873 = float64(5873)
AppID589 = "589"
VendorIDMT = 1 //im美团
VendorIDELM = 3 //im饿了么
SendTypeJx = "jx" //京西客户端发送方标识
SendTypeMt = "mt" //美团用户发送方标识符
SendTypeElm = "elm" //饿了么用户发送方标识符
MTIMPushUrl = "wss://wpush.meituan.com/websocket" //buildPushConnect建立长连接
)
const (
ExpireTimeDay = 24 * time.Hour //redis一天过期时间
maxMessageSize = 8192 // 最大的消息大小
ExpireTimeDay = 2 * time.Hour //redis一天过期时间
maxMessageSize = 8192 // 最大的消息大小
)
type renderData struct {
@@ -162,7 +183,7 @@ type renderData struct {
const (
SuccessCode = 0
SuccessMsg = "success"
Fail = -1
FailCode = -1
FailMsg = "fail"
SYSTEM_ID_ERROR = -1001
@@ -189,12 +210,13 @@ func Render(conn *websocket.Conn, messageId string, code int, message string, da
}
}
func ClientRender(code int, msg string, data interface{}) (str string) {
// ClientRender http响应
func ClientRender(code int, msg string) (str string) {
var retData RetData
retData.Code = code
retData.Msg = msg
retData.Data = data
//retData.Data = data
retJson, _ := json.Marshal(retData)
str = string(retJson)
@@ -272,7 +294,16 @@ func getIntranetIp() string {
}
// GenFullUrl 组装完整websocket url以及生成clientID
func GenFullUrl() (fullUrl string) {
func GenFullUrl(appID float64) (fullUrl string) {
if appID == AppID5873 {
if resp5873, err := api.Mtwm2API.GetConnectionToken(); err == nil {
r1 := mtwmapi.GetConnTokenResp{}
err = mapstructure.Decode(resp5873, &r1)
fullUrl = MTIMPushUrl + "/" + r1.AppKey + "/" + r1.ConnectionToken
return fullUrl
}
}
//589/4123
resp, err := api.MtwmAPI.GetConnectionToken()
if err != nil {
return ""
@@ -280,13 +311,48 @@ func GenFullUrl() (fullUrl string) {
retVal := mtwmapi.GetConnTokenResp{}
err = mapstructure.Decode(resp, &retVal)
fullUrl = MTIMPushUrl + "/" + retVal.AppKey + "/" + retVal.ConnectionToken
//clientID = api.MtwmAPI.GetAppID() + ":" + retVal.ConnectionToken
//打印输出
//fmt.Printf("Create websocket connect failCount:%d", retVal.UserCount)
//todo 测试
tete := api.MtwmAPI.GetAppID()
globals.SugarLogger.Debugf("GenFullUrl appID=%s", tete)
//todo
fmt.Printf("GenFullUrl:%s", fullUrl)
return fullUrl
}
//生成随机字符串
// GenFullUrl2 组装完整websocket url以及生成clientID
func GenFullUrl2() *UrlInfo {
urlInfo := &UrlInfo{}
//1 589/4123
resp, err := api.MtwmAPI.GetConnectionToken()
if err != nil {
return nil
}
retVal := mtwmapi.GetConnTokenResp{}
err = mapstructure.Decode(resp, &retVal)
urlInfo.UrlMain = MTIMPushUrl + "/" + retVal.AppKey + "/" + retVal.ConnectionToken
urlInfo.ClientIDMain = retVal.AppKey + ":" + retVal.ConnectionToken
if api.MtwmAPI.GetAppID() == AppID589 { //目前果园无4123
if resp5873, err := api.Mtwm2API.GetConnectionToken(); err == nil {
r1 := mtwmapi.GetConnTokenResp{}
err = mapstructure.Decode(resp5873, &r1)
urlInfo.UrlSub = MTIMPushUrl + "/" + r1.AppKey + "/" + r1.ConnectionToken
urlInfo.ClientIDSub = r1.AppKey + ":" + r1.ConnectionToken
}
}
//todo 测试
tete := api.MtwmAPI.GetAppID()
globals.SugarLogger.Debugf("GenFullUrl appID=%s", tete)
//todo
fmt.Printf("GenFullUrl:urlMain=%s, urlSub=%s", urlInfo.UrlMain, urlInfo.UrlSub)
return urlInfo
}
// RandString 生成随机字符串
func RandString() string {
bytes := make([]byte, 16)
for i := 0; i < 16; i++ {

View File

@@ -6,6 +6,8 @@ import (
"net/http"
"time"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/globals"
@@ -18,16 +20,21 @@ func Init() {
//写入全局变量
Setup()
//建立长链接
go Send([]byte(HeartCheckMsg))
jxutils.CallMsgHandlerAsync(func() {
MtInit()
}, "MtInit:"+RandString())
//启动定时器
PingTimer()
go WriteMessage()
jxutils.CallMsgHandlerAsync(func() {
WriteMessage()
}, "WriteMessage:"+RandString())
go Manager.Start()
jxutils.CallMsgHandlerAsync(func() {
Manager.Start()
}, "Manager Start:"+RandString())
//fmt.Printf("服务器启动成功,端口号:%s\n", CommonSetting.HttpPort)
}
func Run(w http.ResponseWriter, r *http.Request) {
@@ -54,8 +61,8 @@ func Run(w http.ResponseWriter, r *http.Request) {
} else {
clientID = temp
}
globals.SugarLogger.Debugf("Run clientID=%s", clientID)
clientSocket := NewClient(clientID, conn)
clientSocket := NewClient(clientID, conn, ClientTypeJx)
//读取客户端消息
clientSocket.Read()
@@ -69,26 +76,6 @@ func Run(w http.ResponseWriter, r *http.Request) {
Manager.Connect <- clientSocket
}
func StartWebSocket(conn *websocket.Conn, clientID string, err error) {
//设置读取消息大小上线
conn.SetReadLimit(maxMessageSize)
clientSocket := NewClient(clientID, conn)
//读取客户端消息
clientSocket.Read()
if err = ConnRender(conn, renderData{ClientId: clientID}); err != nil {
_ = conn.Close()
return
}
// 用户连接事件
Manager.Connect <- clientSocket
}
// PingTimer 定时器发送心跳
func PingTimer() {
go func() {
@@ -99,16 +86,20 @@ func PingTimer() {
for {
i++
<-ticker.C
//对美团发送心跳
Send([]byte(HeartCheckMsg))
for clientId, conn := range Manager.AllClient() {
if err := conn.Socket.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
Manager.DisConnect <- conn
globals.SugarLogger.Debugf("发送心跳失败: %s 总连接数:%d", clientId, Manager.Count())
}
if err := ConnRender(conn.Socket, renderData{ClientId: clientId}); err != nil {
return
if conn.ClientType == ClientTypeJx {
if err := conn.Socket.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
Manager.DisConnect <- conn
globals.SugarLogger.Debugf("发送心跳失败: %s 总连接数:%d", clientId, Manager.Count())
}
if err := ConnRender(conn.Socket, renderData{ClientId: clientId}); err != nil {
return
}
} else {
if err := conn.Socket.WriteMessage(websocket.TextMessage, []byte(HeartCheckMsg)); err != nil {
//对美团重新建立连接
MtInit()
}
}
globals.SugarLogger.Debugf("发送心跳 clientId=%s,i=%d", clientId, i)
}
@@ -121,14 +112,16 @@ func WriteMessage() {
i := 0
for {
clientInfo := <-ToClientChan
//广播发送通知所有客户端
//广播发送通知所有京西客户端
i++
fmt.Printf("WriteMessage clientInfo=%s i=%d", utils.Format4Output(clientInfo, false), i)
if Manager.AllClient() != nil {
for _, conn := range Manager.AllClient() {
globals.SugarLogger.Debugf("WriteMessage conn.ClientId=%s", conn.ClientId)
if err := Render(conn.Socket, clientInfo.MessageId, clientInfo.Code, clientInfo.Msg, clientInfo.Data); err != nil {
Manager.DisConnect <- conn
if conn.ClientType == ClientTypeJx { //只发送给京西
globals.SugarLogger.Debugf("WriteMessage conn.ClientId=%s", conn.ClientId)
if err := Render(conn.Socket, clientInfo.MessageId, clientInfo.Code, clientInfo.Msg, clientInfo.Data); err != nil {
Manager.DisConnect <- conn
}
}
}
} else {
@@ -162,6 +155,8 @@ func (c *Client) Read() {
go func() {
for {
messageType, msg, err := c.Socket.ReadMessage()
//temp := string(msg)
//fmt.Print(temp)
if err != nil {
if messageType == -1 && websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
Manager.DisConnect <- c
@@ -169,10 +164,8 @@ func (c *Client) Read() {
} else if messageType != websocket.PingMessage {
return
}
} else {
SendToVendor(msg)
return
}
SendToVendor(msg)
}
}()
}
@@ -197,10 +190,11 @@ func (manager *ClientManager) EventDisconnect(client *Client) {
//以下为客户端Client操作*******************************************
// NewClient 初始化Client
func NewClient(clientId string, socket *websocket.Conn) *Client {
func NewClient(clientId string, socket *websocket.Conn, clientType string) *Client {
return &Client{
ClientId: clientId,
Socket: socket,
ClientType: clientType,
ConnectTime: uint64(time.Now().Unix()),
IsDeleted: false,
}