This commit is contained in:
richboo111
2023-06-16 17:33:51 +08:00
parent 64e4339aeb
commit 061baad58f
9 changed files with 109 additions and 798 deletions

View File

@@ -5,7 +5,7 @@ import (
"errors"
"fmt"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/baseapi/utils/errlist"
@@ -17,192 +17,53 @@ import (
"git.rosy.net.cn/baseapi/utils"
push "git.rosy.net.cn/jx-callback/business/jxutils/unipush"
"git.rosy.net.cn/jx-callback/globals/api"
"github.com/gorilla/websocket"
)
// SendToVendor 向平台发消息
func SendToVendor(msg []byte) {
var (
sendData SendData
err error
elmAppID = api.EbaiAPI.GetSource()
)
//解析数据
if err = json.Unmarshal(msg, &sendData); err != nil {
return
}
//存储数据
ReadMsgFromClient(sendData.VendorID, elmAppID, sendData.Data)
//发送信息
if sendData.VendorID == VendorIDMT {
temp, _ := json.Marshal(sendData.Data)
if sendData.Data.(map[string]interface{})["app_id"] == nil {
globals.SugarLogger.Debug("SendToVendor appId=null")
return
}
Send(temp, sendData.Data.(map[string]interface{})["app_id"])
return
}
if sendData.VendorID == VendorIDELM {
param := sendData.Data.(ebaiapi.BusinessSendMsgReq)
if err := api.EbaiAPI.BusinessSendMsg(&param); err != nil {
globals.SugarLogger.Debugf("elm发送信息错误%v", err)
return
func SendVendorV2(data SendData) (err error) {
if data.VendorID == model.VendorIDMTWM {
dataStr, _ := json.Marshal(data.Data)
temp := string(dataStr)
fmt.Println(temp)
if _, err = api.MtwmAPI.MsgSend(string(dataStr)); err != nil {
return err
}
}
return
}
func Send(data []byte, appID interface{}) {
//根据appID生成完整url
fullUrl := GenFullUrl(appID.(float64)) //clientID暂时不用
conn, resp, err := websocket.DefaultDialer.Dial(fullUrl, nil)
if err != nil || resp.StatusCode != 101 {
fmt.Printf("连接失败:%v http响应不成功", err)
}
//关闭
defer func(conn *websocket.Conn) {
err := conn.Close()
if err != nil {
return
}
}(conn)
err = conn.WriteMessage(websocket.TextMessage, data)
//if data.VendorID == model.VendorIDEBAI { //todo 后续添加
// err = nil
//}
err = ReadMsgFromClient(data.VendorID, "", data.Data)
if err != nil {
fmt.Println(err)
globals.SugarLogger.Debugf("SendVendorV2:%v", err)
}
for {
_, msg, err := conn.ReadMessage()
temp := string(msg)
res := JsonCommon(HeartSuccessWord)
fmt.Printf("Send %s receive: %s\n", conn.RemoteAddr(), string(msg))
if err != nil {
break
} else if temp == res {
continue
} else {
ReadMsgFromVendor(VendorIDMT, "", msg)
}
}
}
// MtInit 发送心跳
func MtInit() {
data := []byte(HeartCheckMsg)
//生成完整url
url := GenFullUrl2()
//主连接
jxutils.CallMsgHandlerAsync(func() {
conn, resp, err := websocket.DefaultDialer.Dial(url.UrlMain, nil)
if err != nil || resp.StatusCode != 101 {
fmt.Printf("连接失败:%v http响应不成功", err)
}
//关闭
defer func(conn *websocket.Conn) {
err := conn.Close()
if err != nil {
return
}
}(conn)
//client连接事件
client := NewClient(url.ClientIDMain, conn, ClientTypeMt)
Manager.Connect <- client
err = conn.WriteMessage(websocket.TextMessage, data)
if err != nil {
fmt.Println(err)
}
for {
_, msg, err := conn.ReadMessage()
temp := string(msg)
res := JsonCommon(HeartCheckSuccess)
fmt.Printf("MtInit %s receive: %s\n", conn.RemoteAddr(), string(msg))
if err != nil {
break
} else if temp == res {
continue
} else {
ReadMsgFromVendor(VendorIDMT, "", msg)
}
}
}, url.ClientIDMain)
//副连接
if url.UrlSub != "" {
jxutils.CallMsgHandlerAsync(func() {
connSub, respSub, errSub := websocket.DefaultDialer.Dial(url.UrlSub, nil)
if errSub != nil || respSub.StatusCode != 101 {
fmt.Printf("连接失败:%v http响应不成功", errSub)
}
//关闭
defer func(conn *websocket.Conn) {
err := conn.Close()
if err != nil {
return
}
}(connSub)
//client连接事件
client := NewClient(url.ClientIDSub, connSub, ClientTypeMt)
Manager.Connect <- client
errSub = connSub.WriteMessage(websocket.TextMessage, data)
if errSub != nil {
fmt.Println(errSub)
}
for {
_, msg, err := connSub.ReadMessage()
temp := string(msg)
res := JsonCommon(HeartCheckSuccess)
if err != nil || temp == res {
break
} else {
ReadMsgFromVendor(VendorIDMT, "", msg)
}
fmt.Printf("MtInit %s connSub:receive: %s\n", connSub.RemoteAddr(), string(msg))
}
}, url.ClientIDSub)
}
return nil
}
// ReadMsgFromClient 存储客户端发送的消息
func ReadMsgFromClient(vendorID int, elmAppID string, msg interface{}) {
func ReadMsgFromClient(vendorID int, elmAppID string, msg interface{}) error {
var (
err error
jxMsg = &JXMsg{}
errList errlist.ErrList
userList = &UserMessageList{}
)
data, err := json.Marshal(msg)
if err != nil {
return
errList.AddErr(fmt.Errorf("json处理数据错误%v", err))
}
if vendorID == VendorIDMT {
var MtSingleChat = mtwmapi.SingleChat{}
err = json.Unmarshal(data, &MtSingleChat)
var pushContent = mtwmapi.PushContentReq{}
err = json.Unmarshal(data, &pushContent)
jxMsg = &JXMsg{
SendType: SendTypeJx,
MsgContent: MtSingleChat,
MsgContent: pushContent,
}
userList = &UserMessageList{
VendorID: VendorIDMT,
UserID: utils.Int2Str(MtSingleChat.OpenUserID),
LatestMsg: MtSingleChat.MsgContent,
LatestTime: MtSingleChat.Cts,
UserID: utils.Int2Str(pushContent.OpenUserID),
LatestMsg: pushContent.MsgContent,
LatestTime: pushContent.Cts,
}
}
if vendorID == VendorIDELM {
@@ -222,41 +83,44 @@ func ReadMsgFromClient(vendorID int, elmAppID string, msg interface{}) {
//1 存储详细聊天记录list
if err = SetMessageDetail(jxMsg, vendorID, elmAppID); err != nil {
globals.SugarLogger.Debugf("ReadMsgFromClient SetMessageDetail err:=%v\n", err)
//return
errList.AddErr(fmt.Errorf("存储详细聊天记录错误:%v", err))
}
//2 存储展示列表时单条数据
if err = SetUserList(jxMsg, userList, vendorID, elmAppID); err != nil {
globals.SugarLogger.Debugf("ReadMsgFromClient SetUserList err:=%v\n", err)
//return
errList.AddErr(fmt.Errorf("存储STU聊天记录错误%v", err))
}
if errList.GetErrListAsOne() != nil {
return fmt.Errorf("ReadMsgFromClient:%v", errList.GetErrListAsOne())
}
return nil
}
// ReadMsgFromVendor 读取数据并存储到redis
func ReadMsgFromVendor(vendorID int, elmAppID string, msg []byte) {
if string(msg) == "" {
return
}
func ReadMsgFromVendor(vendorID int, elmAppID string, msg []byte) error {
var (
err error
//vendorStoreID string
jxMsg = &JXMsg{}
userList = &UserMessageList{}
err error
jxMsg = &JXMsg{}
vendorStoreID string
errList errlist.ErrList
userList = &UserMessageList{}
)
if string(msg) == "" {
errList.AddErr(fmt.Errorf("读取平台数据为空,请检查"))
}
if vendorID == VendorIDMT {
var MtSingleChat = mtwmapi.SingleChat{}
err = json.Unmarshal(msg, &MtSingleChat)
var PushContentReq = mtwmapi.PushContentReq{}
err = json.Unmarshal(msg, &PushContentReq)
jxMsg = &JXMsg{
SendType: SendTypeMt,
MsgContent: MtSingleChat,
MsgContent: PushContentReq,
}
userList = &UserMessageList{
VendorID: VendorIDMT,
UserID: utils.Int2Str(MtSingleChat.OpenUserID),
LatestMsg: MtSingleChat.MsgContent,
LatestTime: MtSingleChat.Cts,
UserID: utils.Int2Str(PushContentReq.OpenUserID),
LatestMsg: PushContentReq.MsgContent,
LatestTime: PushContentReq.Cts,
}
//vendorStoreID = MtSingleChat.AppPoiCode
vendorStoreID = PushContentReq.AppPoiCode
}
if vendorID == VendorIDELM {
var ElmData = ebaiapi.ImMessageSend{}
@@ -275,23 +139,21 @@ func ReadMsgFromVendor(vendorID int, elmAppID string, msg []byte) {
//1 存储详细聊天记录list
if err = SetMessageDetail(jxMsg, vendorID, elmAppID); err != nil {
globals.SugarLogger.Debugf("ReadMsgFromVendor SetMessageDetail err:=%v\n", err)
//return
errList.AddErr(fmt.Errorf("存储详细聊天记录错误:%v", err))
}
//2 存储展示列表时单条数据
if err = SetUserList(jxMsg, userList, vendorID, elmAppID); err != nil {
globals.SugarLogger.Debugf("ReadMsgFromVendor SetUserList err:=%v\n", err)
//return
errList.AddErr(fmt.Errorf("存储STU聊天记录错误%v", err))
}
//3 cid推送新消息
//err = PushMsgByCid(vendorStoreID, vendorID)
//4 长链接通知给客户端
if err != nil {
ToClientChan <- clientInfo{Code: SuccessCode, Msg: fmt.Sprintf("%v", err), Data: jxMsg}
} else {
ToClientChan <- clientInfo{Code: SuccessCode, Msg: SuccessMsg, Data: jxMsg}
if err = PushMsgByCid(vendorStoreID, vendorID); err != nil {
errList.AddErr(fmt.Errorf("向商家cid推送新消息错误%v", err))
}
return
if errList.GetErrListAsOne() != nil {
return fmt.Errorf("ReadMsgFromVendor:%v", errList.GetErrListAsOne())
}
return nil
}
// PushMsgByCid 通过cid push用户
@@ -303,7 +165,7 @@ func PushMsgByCid(vendorStoreID string, vendorID int) error {
}
// SetMessageDetail 赋值
//格式 AppID:AppPoiCode:10:OpenUserID
//格式 AppID:AppPoiCode:1:OpenUserID
func SetMessageDetail(req *JXMsg, vendorID int, elmAppID string) error {
//生成京西消息ID detail
msgID := GenMsgDetailID(req, vendorID, elmAppID)
@@ -368,7 +230,7 @@ func GetNewAndTrim(key string, flag string) (cnt int, err error) {
// GenMsgDetailID 生成查询详细聊天记录ID
func GenMsgDetailID(jxMsg *JXMsg, vendorID int, elmAppID string) (msgID string) {
if vendorID == VendorIDMT {
var d1 = jxMsg.MsgContent.(mtwmapi.SingleChat)
var d1 = jxMsg.MsgContent.(mtwmapi.PushContentReq)
msgID = utils.Int2Str(d1.AppID) + ":" + d1.AppPoiCode + ":1:" + utils.Int2Str(d1.OpenUserID)
}
if vendorID == VendorIDELM {
@@ -381,7 +243,7 @@ func GenMsgDetailID(jxMsg *JXMsg, vendorID int, elmAppID string) (msgID string)
// GenMsgListID 生成展示列表时单条数据ID部分
func GenMsgListID(jxMsg *JXMsg, vendorID int, elmAppID string) (msgID string) {
if vendorID == VendorIDMT {
var d1 = jxMsg.MsgContent.(mtwmapi.SingleChat)
var d1 = jxMsg.MsgContent.(mtwmapi.PushContentReq)
msgID = utils.Int2Str(d1.AppID) + ":" + d1.AppPoiCode + ":1"
}
if vendorID == VendorIDELM {

View File

@@ -1,85 +1,11 @@
package im
import (
"encoding/json"
"flag"
"fmt"
"io"
"log"
r "math/rand"
"net"
"net/http/httptest"
"sync"
"time"
"git.rosy.net.cn/baseapi/platformapi/mtwmapi"
"git.rosy.net.cn/jx-callback/globals/api"
"github.com/gazeboxu/mapstructure"
"github.com/gorilla/websocket"
"gopkg.in/ini.v1"
)
// ClientManager 连接管理
type ClientManager struct {
ClientIdMap map[string]*Client // 全部的连接
ClientIdMapLock sync.RWMutex // 读写锁
Connect chan *Client // 连接处理
DisConnect chan *Client // 断开连接处理
GroupLock sync.RWMutex
Groups map[string][]string
SystemClientsLock sync.RWMutex
SystemClients map[string][]string
}
// Client 客户端连接信息
type Client struct {
ClientId string // 标识ID
Socket *websocket.Conn // 用户连接
ClientType string //标识是美团/客户端长链接
ConnectTime uint64 // 首次连接时间
IsDeleted bool // 是否删除或下线
UserId string // 业务端标识用户ID
//Extend string // 扩展字段,用户可以自定义
//GroupList []string
}
//channel通道结构体
type clientInfo struct {
ClientId string `json:"clientId" validate:"required"` //链接ID
Data interface{}
SendUserId string
MessageId string
Code int
Msg string
}
// RetData 统一返回值结构体
type RetData struct {
Code int `json:"code"` //响应code
Msg string `json:"msg"` //响应msg success/fail
//MsgType string `json:"msgType"` //发送消息方类型mt;elm;jx
Data interface{} `json:"data"` //信息
//MessageType string `json:"messageType"` //消息类型 heart-心跳检测send-发送消息receive-接收消息
//MessageId string `json:"messageId"` //发送/接收信息 id
//UserId string `json:"userId"` //必须是平台方userID
}
type global struct {
LocalHost string //本机内网IP
ServerList map[string]string
ServerListLock sync.RWMutex
}
type commonConf struct {
HttpPort string
RPCPort string
Cluster bool
CryptoKey string
}
// SendData 客户端写入参数
type SendData struct {
VendorID int `json:"vendorID"` //消息来源平台ID 1-美团 3-饿了么
@@ -92,23 +18,9 @@ type JXMsg struct {
MsgContent interface{} `json:"msgContent"` //美团/饿了么 单聊消息
}
// GetUserListReq 获取门店用户聊天列表
type GetUserListReq struct {
VendorStoreID string `json:"vendorStoreID"` //平台门店id
VendorID string `json:"vendorID"` //平台标识id
AppID string `json:"appID"` //应用ID
}
type GetChatDetailReq struct {
VendorStoreID string `json:"vendorStoreID"` //平台门店id
VendorID string `json:"vendorID"` //平台标识id
AppID string `json:"appID"` //应用ID
UserID string `json:"userID"` //userID/groupID
}
// UserMessageList 用户消息列表
type UserMessageList struct {
VendorID int `json:"vendorID"` //平台品牌 10-美团 11-饿了么
VendorID int `json:"vendorID"` //平台品牌 1-美团 3-饿了么
UserID string `json:"userID"` //用户ID
NewMessageNum int `json:"NewMessageNum"` //新消息数量
LatestMsg string `json:"latestMsg"` //最新一条消息
@@ -128,222 +40,16 @@ type UserRelInfo struct {
UserID string `json:"userID"` //用户id/groupID
}
// UrlInfo 生成美团长链接url信息
type UrlInfo struct {
UrlMain string `json:"urlMain"` //主连接路由
ClientIDMain string `json:"ClientIDMain"` //主连接id
UrlSub string `json:"urlSub"` //副连接路由
ClientIDSub string `json:"ClientIDSub"` //副连接id
}
var (
cfg *ini.File
rdb = api.Cacher
//客户端相关
Manager = NewClientManager() // 管理者
ToClientChan chan clientInfo
ClientTypeJx = "jx" //京西客户端
ClientTypeMt = "mt" //美团客户端
//配置文件
CommonSetting = &commonConf{}
GlobalSetting = &global{}
//心跳相关
heartbeatInterval = 20 * time.Second // 心跳间隔
HeartCheckMsg = "~#HHHBBB#~" //心跳检测消息
HeartCheckSuccess = "HB" //成功发送返回心跳消息
HeartSuccessWord = "成功" //成功发送返回心跳消息
//平台标识
AppID5873 = float64(5873)
AppID589 = "589"
VendorIDMT = 1 //im美团
VendorIDELM = 3 //im饿了么
SendTypeJx = "jx" //京西客户端发送方标识
SendTypeMt = "mt" //美团用户发送方标识符
SendTypeElm = "elm" //饿了么用户发送方标识符
MTIMPushUrl = "wss://wpush.meituan.com/websocket" //buildPushConnect建立长连接
rdb = api.Cacher
VendorIDMT = 1 //im美团
VendorIDELM = 3 //im饿了么
SendTypeJx = "jx" //京西客户端发送方标识
SendTypeMt = "mt" //美团用户发送方标识符
SendTypeElm = "elm" //饿了么用户发送方标识符
)
const (
ExpireTimeDay = 2 * time.Hour //redis一天过期时间
maxMessageSize = 8192 // 最大的消息大小
ExpireTimeDay = 4 * time.Hour //redis过期时间
)
type renderData struct {
ClientId string `json:"clientId"`
}
const (
SuccessCode = 0
SuccessMsg = "success"
FailCode = -1
FailMsg = "fail"
SYSTEM_ID_ERROR = -1001
ONLINE_MESSAGE_CODE = 1001
OFFLINE_MESSAGE_CODE = 1002
)
// Render 统一返回值
func Render(conn *websocket.Conn, messageId string, code int, message string, data interface{}) error {
if data != nil {
str, _ := json.Marshal(data)
temp := string(str)
return conn.WriteJSON(RetData{
Code: code,
Msg: message,
//MsgType: temp.SendType,
Data: temp,
})
} else {
return conn.WriteJSON(RetData{
Code: code,
Msg: message,
})
}
}
// ClientRender http响应
func ClientRender(code int, msg string) (str string) {
var retData RetData
retData.Code = code
retData.Msg = msg
//retData.Data = data
retJson, _ := json.Marshal(retData)
str = string(retJson)
w := httptest.NewRecorder()
w.Header().Set("Content-Type", "application/json; charset=utf-8")
_, _ = io.WriteString(w, str)
return
}
func ConnRender(conn *websocket.Conn, data interface{}) (err error) {
err = conn.WriteJSON(RetData{
Code: SuccessCode,
Msg: "success",
Data: data,
})
return
}
// Default 给默认值
func Default() {
CommonSetting = &commonConf{
HttpPort: "6000",
RPCPort: "7000",
Cluster: false,
CryptoKey: "Adba723b7fe06819",
}
GlobalSetting = &global{
LocalHost: getIntranetIp(),
ServerList: make(map[string]string),
}
}
// Setup 初始化全局设置变量
func Setup() {
configFile := flag.String("c", "conf/app.ini", "-c conf/app.ini")
var err error
cfg, err = ini.Load(*configFile)
if err != nil {
log.Fatalf("setting.Setup, fail to parse 'conf/app.ini': %v", err)
}
mapTo("common", CommonSetting)
GlobalSetting = &global{
LocalHost: getIntranetIp(),
ServerList: make(map[string]string),
}
fmt.Printf("LocalHost=%s\n ", GlobalSetting.LocalHost)
}
// mapTo map section
func mapTo(section string, v interface{}) {
err := cfg.Section(section).MapTo(v)
if err != nil {
log.Fatalf("Cfg.MapTo %s err: %v", section, err)
}
}
//获取本机IP
func getIntranetIp() string {
addrs, _ := net.InterfaceAddrs()
for _, addr := range addrs {
// 检查ip地址判断是否回环地址
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
return ipnet.IP.String()
}
}
}
return ""
}
// GenFullUrl 组装完整websocket url以及生成clientID
func GenFullUrl(appID float64) (fullUrl string) {
if appID == AppID5873 {
if resp5873, err := api.Mtwm2API.GetConnectionToken(); err == nil {
r1 := mtwmapi.GetConnTokenResp{}
err = mapstructure.Decode(resp5873, &r1)
fullUrl = MTIMPushUrl + "/" + r1.AppKey + "/" + r1.ConnectionToken
return fullUrl
}
}
//589/4123
resp, err := api.MtwmAPI.GetConnectionToken()
if err != nil {
return ""
}
retVal := mtwmapi.GetConnTokenResp{}
err = mapstructure.Decode(resp, &retVal)
fullUrl = MTIMPushUrl + "/" + retVal.AppKey + "/" + retVal.ConnectionToken
return fullUrl
}
// GenFullUrl2 组装完整websocket url以及生成clientID
func GenFullUrl2() *UrlInfo {
urlInfo := &UrlInfo{}
//1 589/4123
resp, err := api.MtwmAPI.GetConnectionToken()
if err != nil {
return nil
}
retVal := mtwmapi.GetConnTokenResp{}
err = mapstructure.Decode(resp, &retVal)
urlInfo.UrlMain = MTIMPushUrl + "/" + retVal.AppKey + "/" + retVal.ConnectionToken
urlInfo.ClientIDMain = retVal.AppKey + ":" + retVal.ConnectionToken
if api.MtwmAPI.GetAppID() == AppID589 { //目前果园无4123
if resp5873, err := api.Mtwm2API.GetConnectionToken(); err == nil {
r1 := mtwmapi.GetConnTokenResp{}
err = mapstructure.Decode(resp5873, &r1)
urlInfo.UrlSub = MTIMPushUrl + "/" + r1.AppKey + "/" + r1.ConnectionToken
urlInfo.ClientIDSub = r1.AppKey + ":" + r1.ConnectionToken
}
}
return urlInfo
}
// RandString 生成随机字符串
func RandString() string {
bytes := make([]byte, 16)
for i := 0; i < 16; i++ {
b := r.Intn(26) + 65
bytes[i] = byte(b)
}
return string(bytes)
}
// JsonCommon json格式化
func JsonCommon(str string) (retVal string) {
temp, _ := json.Marshal(str)
_ = json.Unmarshal(temp, &retVal)
return retVal
}

View File

@@ -1,259 +0,0 @@
package im
import (
"errors"
"fmt"
"net/http"
"time"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/globals"
"github.com/gorilla/websocket"
)
func Init() {
//初始化
ToClientChan = make(chan clientInfo, 1000)
//写入全局变量
Setup()
//建立长链接
jxutils.CallMsgHandlerAsync(func() {
MtInit()
}, "MtInit:"+RandString())
//启动定时器
PingTimer()
jxutils.CallMsgHandlerAsync(func() {
WriteMessage()
}, "WriteMessage:"+RandString())
jxutils.CallMsgHandlerAsync(func() {
Manager.Start()
}, "Manager Start:"+RandString())
}
func Run(w http.ResponseWriter, r *http.Request) {
conn, err := (&websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
// 允许所有CORS跨域请求
CheckOrigin: func(r *http.Request) bool {
return true
},
}).Upgrade(w, r, nil)
if err != nil {
globals.SugarLogger.Debugf("upgrade error: %v", err)
http.NotFound(w, r)
return
}
//设置读取消息大小上线
conn.SetReadLimit(maxMessageSize)
clientID := ""
if temp := r.Header.Get("Clientid"); len(temp) == 0 {
clientID = RandString()
} else {
clientID = temp
}
clientSocket := NewClient(clientID, conn, ClientTypeJx)
//读取客户端消息
clientSocket.Read()
if err = ConnRender(conn, renderData{ClientId: clientID}); err != nil {
_ = conn.Close()
return
}
// 用户连接事件
Manager.Connect <- clientSocket
}
// PingTimer 定时器发送心跳
func PingTimer() {
go func() {
ticker := time.NewTicker(heartbeatInterval)
defer ticker.Stop()
for {
<-ticker.C
for clientId, conn := range Manager.AllClient() {
if conn.ClientType == ClientTypeJx {
if err := conn.Socket.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
Manager.DisConnect <- conn
fmt.Printf("发送心跳失败: %s 总连接数:%d", clientId, Manager.Count())
}
if err := ConnRender(conn.Socket, renderData{ClientId: clientId}); err != nil {
return
}
globals.SugarLogger.Debugf("PingTimer jx clientID ")
} else {
fmt.Printf("PingTimer mt心跳conn%s,%s", conn.ClientId, conn.ClientType)
if err := conn.Socket.WriteMessage(websocket.TextMessage, []byte(HeartCheckMsg)); err != nil {
fmt.Printf("PingTimer mtHeartBeat err:%v", err)
//对美团重新建立连接
MtInit()
}
}
}
}
}()
}
// WriteMessage 监听并发送给客户端信息
func WriteMessage() {
i := 0
for {
clientInfo := <-ToClientChan
//广播发送通知所有京西客户端
i++
//fmt.Printf("WriteMessage clientInfo=%s i=%d\n", utils.Format4Output(clientInfo, false), i)
if Manager.AllClient() != nil {
for _, conn := range Manager.AllClient() {
if conn.ClientType == ClientTypeJx { //只发送给京西
fmt.Printf("WriteMessage conn.ClientId=%s\n", conn.ClientId)
if err := Render(conn.Socket, clientInfo.MessageId, clientInfo.Code, clientInfo.Msg, clientInfo.Data); err != nil {
Manager.DisConnect <- conn
}
}
}
} else {
globals.SugarLogger.Debugf("无客户端连接,请检查")
return
}
}
}
// Start 管道处理程序
func (manager *ClientManager) Start() {
for {
select {
case client := <-manager.Connect:
// 建立连接事件
manager.EventConnect(client)
case conn := <-manager.DisConnect:
// 断开连接事件
manager.EventDisconnect(conn)
}
}
}
//从客户端读取数据
func (c *Client) Read() {
go func() {
for {
messageType, msg, err := c.Socket.ReadMessage()
//temp := string(msg)
//fmt.Print(temp)
if err != nil {
if messageType == -1 && websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
Manager.DisConnect <- c
return
} else if messageType != websocket.PingMessage {
return
}
}
fmt.Printf("Client Read:receive: %s\n", string(msg))
//SendToVendor(msg)
}
}()
}
// 以下为连接事件操作*******************************************
// EventConnect 建立连接事件
func (manager *ClientManager) EventConnect(client *Client) {
manager.AddClient(client)
}
// EventDisconnect 断开连接事件
func (manager *ClientManager) EventDisconnect(client *Client) {
//关闭连接
_ = client.Socket.Close()
manager.DelClient(client)
//标记销毁
client.IsDeleted = true
client = nil
}
//以下为客户端Client操作*******************************************
// NewClient 初始化Client
func NewClient(clientId string, socket *websocket.Conn, clientType string) *Client {
return &Client{
ClientId: clientId,
Socket: socket,
ClientType: clientType,
ConnectTime: uint64(time.Now().Unix()),
IsDeleted: false,
}
}
// AddClient 添加客户端
func (manager *ClientManager) AddClient(client *Client) {
manager.ClientIdMapLock.Lock()
defer manager.ClientIdMapLock.Unlock()
manager.ClientIdMap[client.ClientId] = client
}
// DelClient 删除客户端
func (manager *ClientManager) DelClient(client *Client) {
manager.delClientIdMap(client.ClientId)
}
// 删除clientIdMap
func (manager *ClientManager) delClientIdMap(clientId string) {
manager.ClientIdMapLock.Lock()
defer manager.ClientIdMapLock.Unlock()
delete(manager.ClientIdMap, clientId)
}
// GetByClientId 通过clientId获取client
func (manager *ClientManager) GetByClientId(clientId string) (*Client, error) {
manager.ClientIdMapLock.RLock()
defer manager.ClientIdMapLock.RUnlock()
if client, ok := manager.ClientIdMap[clientId]; !ok {
return nil, errors.New("客户端不存在")
} else {
return client, nil
}
}
// AllClient 获取所有的客户端
func (manager *ClientManager) AllClient() map[string]*Client {
manager.ClientIdMapLock.RLock()
defer manager.ClientIdMapLock.RUnlock()
return manager.ClientIdMap
}
//与客户端的交互操作*************************
// NewClientManager 初始化客户端管理
func NewClientManager() (clientManager *ClientManager) {
clientManager = &ClientManager{
ClientIdMap: make(map[string]*Client),
Connect: make(chan *Client, 10000),
DisConnect: make(chan *Client, 10000),
Groups: make(map[string][]string, 100),
SystemClients: make(map[string][]string, 100),
}
return
}
// Count 获取客户端数量
func (manager *ClientManager) Count() int {
manager.ClientIdMapLock.RLock()
defer manager.ClientIdMapLock.RUnlock()
return len(manager.ClientIdMap)
}

View File

@@ -1,6 +1,12 @@
package mtwm
import (
"encoding/json"
"net/http"
"strings"
"git.rosy.net.cn/jx-callback/business/partner/purchase/im"
"git.rosy.net.cn/baseapi/platformapi/mtwmapi"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
@@ -8,8 +14,6 @@ import (
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
"git.rosy.net.cn/jx-callback/globals"
"net/http"
"strings"
)
// 美团回调接口
@@ -115,3 +119,14 @@ func GetMsgCallBackUrl(msgType, appId string) string {
}
return interfaceUrl
}
// OnImMsg im消息回调
func OnImMsg(msg *mtwmapi.ImCallbackMsg) (response *mtwmapi.CallbackResponse) {
if str, err := json.Marshal(msg.PushContent); err == nil {
err = im.ReadMsgFromVendor(model.VendorIDMTWM, "", str)
if err != nil {
globals.SugarLogger.Debugf("OnImMsg提示%v", err)
}
}
return mtwmapi.SuccessResponse
}

View File

@@ -13,15 +13,6 @@ type IMController struct {
web.Controller
}
// @Title IM初始化长链接
// @Description IM初始化长链接
// @Success 200 {object} controllers.CallResult
// @Failure 200 {object} controllers.CallResult
// @router /StartWebSocket [get]
func (c *IMController) StartWebSocket() {
im.Run(c.Ctx.ResponseWriter, c.Ctx.Request)
}
// @Title IM获取门店用户聊天列表
// @Description IM获取门店用户聊天列表
// @Param token header string true "认证token"
@@ -73,24 +64,24 @@ func (c *IMController) SetImMsgRead() {
})
}
// @Title 向平台商发送信息
// @Description 向平台商发送信息
// @Title 向平台商发送信息(https方式)
// @Description 向平台商发送信息(https方式)
// @Param token header string true "认证token"
// @Param sendData formData string true "平台商消息结构体"
// @Success 200 {object} controllers.CallResult
// @Failure 200 {object} controllers.CallResult
// @router /SendToVendor [post]
func (c *IMController) SendToVendor() {
c.callSendToVendor(func(params *tImSendToVendorParams) (retVal interface{}, errCode string, err error) {
// @router /SendToVendorV2 [post]
func (c *IMController) SendToVendorV2() {
c.callSendToVendorV2(func(params *tImSendToVendorV2Params) (retVal interface{}, errCode string, err error) {
var sendData im.SendData
b := bytes.NewBufferString(params.SendData)
decoder := json.NewDecoder(b)
_ = decoder.Decode(&sendData)
fmt.Println(sendData)
if dataStr, err := json.Marshal(sendData); err == nil {
im.SendToVendor(dataStr)
if err = decoder.Decode(&sendData); err == nil {
fmt.Println(sendData)
if err = im.SendVendorV2(sendData); err != nil {
return nil, "", err
}
}
return nil, "", err
return nil, "", nil
})
}

View File

@@ -87,3 +87,16 @@ func (c *MtwmController) SkuDelete() {
func (c *MtwmController) StoreBind() {
c.onCallbackMsg(mtwmapi.MsgTypeStoreBind)
}
func (c *MtwmController) IMCallback() {
c.Data["json"] = mtwmapi.Err2CallbackResponse(nil, "")
msg, callbackResponse := api.MtwmAPI.GetIMCallbackMsg(c.Ctx.Request)
if callbackResponse == nil {
callbackResponse = mtwm.OnImMsg(msg)
if callbackResponse == nil {
callbackResponse = mtwmapi.Err2CallbackResponse(nil, "")
}
}
c.Data["json"] = callbackResponse
c.ServeJSON()
}

View File

@@ -131,7 +131,7 @@ var (
LogisticsApi *ali_logistics_query.API // 阿里云提供获取物流订单的配送信息
KuaiShouApi *kuaishou_mini.API // 快手平台
UniAppApi *uinapp.API // uinapp 消息通知
TaoVegetableApi tao_vegetable.API // 淘菜菜
TaoVegetableApi *tao_vegetable.API // 淘菜菜
)
func init() {

View File

@@ -8,8 +8,6 @@ import (
"os"
"time"
"git.rosy.net.cn/jx-callback/business/partner/purchase/im"
"git.rosy.net.cn/jx-callback/business/enterprise"
"git.rosy.net.cn/jx-callback/business/partner/purchase/jdshop"
@@ -93,13 +91,6 @@ func Init() {
misc.Init()
enterprise.Init() // 初始化enterprise key
im.Init() //初始化ws连接
//test
//mux := http.NewServeMux()
//mux.HandleFunc("/v2/im/SendToVendor", im.Run)
//go http.ListenAndServe(":8082", mux)
}
// 返回true表示非运行服务

View File

@@ -4395,20 +4395,12 @@ func init() {
Params: nil})
web.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:IMController"] = append(web.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:IMController"],
web.ControllerComments{
Method: "SendToVendor",
Router: `/SendToVendor`,
Method: "SendToVendorV2",
Router: `/SendToVendorV2`,
AllowHTTPMethods: []string{"post"},
MethodParams: param.Make(),
Filters: nil,
Params: nil})
web.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:IMController"] = append(web.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:IMController"],
web.ControllerComments{
Method: "StartWebSocket",
Router: `/StartWebSocket`,
AllowHTTPMethods: []string{"get"},
MethodParams: param.Make(),
Filters: nil,
Params: nil})
//web.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:FnController"] = append(web.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:FnController"],
// web.ControllerComments{