This commit is contained in:
richboo111
2023-04-25 10:31:54 +08:00
parent 352bb15636
commit 0901cfc573
12 changed files with 784 additions and 90 deletions

View File

@@ -1,8 +1,50 @@
package mtwmapi
const (
MsgSourceStore = 1 //商家
MsgSourceUser = 2 //用户
MsgTypeText = 1 //文字
MsgTypePic = 2 //图片
MsgTypeVoice = 3 //语音
MsgTypeGoodsCard = 4 //商品卡片
MsgTypeOrderCard = 5 //订单卡片
)
//单聊信息体
type SingleChat struct {
AppID int `json:"app_id"` //应用标识
AppPoiCode string `json:"app_poi_code"` //门店标识
Cts int `json:"cts"` //消息发送时间,10位时间戳
MsgContent string `json:"msg_content"` //消息内容
MsgID int `json:"msg_id"` //消息id确保消息唯一性发送消息时为三方的消息id接收消息时为美团的消息id
MsgSource int `json:"msg_source"` //消息发送方 商家1用户2
MsgType int `json:"msg_type"` //消息类型: 文字-1; 图片-2;语音-3注意b2c不支持语音; 商品卡片-4发送商品卡片类型则不关注msg_content; 订单卡片-5订单卡片类型商家只能接收消息不支持给用户发送消息只支持单聊
OpenUserID int `json:"open_user_id"` //用户id
OrderID int `json:"order_id"` // 订单id
AppSpuCodes string `json:"app_spu_codes"` //开放平台侧商品标识(无须加密)
}
//获取长链接token返回参数
type GetConnTokenResp struct {
ConnectionToken string `json:"connectionToken"` //建立长连接的token
UserCount int `json:"userCount"` //30分钟内消息发送失败的用户数
AppKey string `json:"appKey"` //建立长连接的appkey
}
//获取长连接的token
//https://developer.waimai.meituan.com/home/docDetail/461
func (a *API) GetConnectionToken() (err error) {
_, err = a.AccessAPI("wm/IM/getConnectionToken", false, nil)
func (a *API) GetConnectionToken() (retVal interface{}, err error) {
retVal, err = a.AccessAPI("wm/IM/getConnectionToken", false, nil)
return retVal, err
}
//设置消息已读
//https://open-shangou.meituan.com/home/docDetail/465
func (a *API) MsgRead(appPoiCode string, msgID, openUserID int) error {
_, err := a.AccessAPI("/wm/IM/msgRead", false, map[string]interface{}{
"app_poi_code": appPoiCode,
"msg_id": msgID,
"open_user_id": openUserID,
})
return err
}

View File

@@ -0,0 +1,171 @@
package mtwmapi
import (
"bytes"
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"encoding/base64"
"errors"
"io"
"log"
"sync"
)
//ip配置信息
type global struct {
LocalHost string //本机内网IP
RemoteHost string //远程端IP
RemotePort string //远程端口
ServerList map[string]string
ServerListLock sync.RWMutex
}
type commonConf struct {
HttpPort string
RPCPort string
Cluster bool
CryptoKey string
}
var (
GlobalSetting = &global{}
CommonSetting = &commonConf{
HttpPort: "6000",
RPCPort: "7000",
Cluster: false,
CryptoKey: "Adba723b7fe06819",
}
)
/*
以下为clientID相关逻辑
*/
//对称加密IP和端口当做clientId
func GenClientId() string {
raw := []byte(GlobalSetting.LocalHost + ":" + CommonSetting.RPCPort)
//raw := []byte(hostStr)
str, err := Encrypt(raw, []byte(CommonSetting.CryptoKey))
if err != nil {
log.Fatal(err)
}
return str
}
func Encrypt(rawData, key []byte) (string, error) {
data, err := aesCBCEncrypt(rawData, key)
if err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(data), nil
}
//AES加密
func aesCBCEncrypt(rawData, key []byte) ([]byte, error) {
block, err := aes.NewCipher(key)
if err != nil {
return []byte{}, err
}
//填充原文
blockSize := block.BlockSize()
rawData = pKCS7Padding(rawData, blockSize)
//初始向量IV必须是唯一但不需要保密
cipherText := make([]byte, blockSize+len(rawData))
//block大小 16
iv := cipherText[:blockSize]
if _, err := io.ReadFull(rand.Reader, iv); err != nil {
return []byte{}, err
}
//block大小和初始向量大小一定要一致
mode := cipher.NewCBCEncrypter(block, iv)
mode.CryptBlocks(cipherText[blockSize:], rawData)
return cipherText, nil
}
func DecryptDESECB(d, key []byte) string {
data, err := base64.StdEncoding.DecodeString(string(d))
if err != nil {
return ""
}
block, err := aes.NewCipher(key)
if err != nil {
return ""
}
bs := block.BlockSize()
if len(data)%bs != 0 {
return ""
}
out := make([]byte, len(data))
dst := out
for len(data) > 0 {
block.Decrypt(dst, data[:bs])
data = data[bs:]
dst = dst[bs:]
}
out = PKCS5UnPadding(out)
return string(out)
}
func PKCS5UnPadding(origData []byte) []byte {
length := len(origData)
unpadding := int(origData[length-1])
return origData[:(length - unpadding)]
}
func Decrypt(rawData string, key []byte) (string, error) {
data, err := base64.StdEncoding.DecodeString(rawData)
if err != nil {
return "", err
}
dnData, err := aesCBCDncrypt(data, key)
if err != nil {
return "", err
}
return string(dnData), nil
}
//AES解密
func aesCBCDncrypt(encryptData, key []byte) ([]byte, error) {
block, err := aes.NewCipher(key)
if err != nil {
return []byte{}, err
}
blockSize := block.BlockSize()
if len(encryptData) < blockSize {
return []byte{}, errors.New("ciphertext too short")
}
iv := encryptData[:blockSize]
encryptData = encryptData[blockSize:]
if len(encryptData)%blockSize != 0 {
return []byte{}, errors.New("ciphertext is not a multiple of the block size")
}
mode := cipher.NewCBCDecrypter(block, iv)
mode.CryptBlocks(encryptData, encryptData)
//解填充
encryptData, err = pKCS7UnPadding(encryptData)
return encryptData, err
}
func pKCS7Padding(ciphertext []byte, blockSize int) []byte {
padding := blockSize - len(ciphertext)%blockSize
padText := bytes.Repeat([]byte{byte(padding)}, padding)
return append(ciphertext, padText...)
}
func pKCS7UnPadding(origData []byte) ([]byte, error) {
length := len(origData)
unPadding := int(origData[length-1])
if length-unPadding < 0 || length-unPadding > len(origData) {
return nil, errors.New("unPadding error")
}
return origData[:(length - unPadding)], nil
}

View File

@@ -1,13 +1,267 @@
package mtwmapi
import (
"encoding/json"
"fmt"
"git.rosy.net.cn/baseapi/utils"
"github.com/gazeboxu/mapstructure"
"github.com/go-redis/redis"
"github.com/gorilla/websocket"
"sync"
"testing"
"time"
)
const (
MTIMPushUrl = "wss://wpush.meituan.com/websocket"
TestAppID = "589_WMOPEN"
TestToken = "wo589i4VsZHFH2fh4uVsr6Dtc3k6vG8Xu0vxpreBQFy6QAvg"
TestMTIMPushUrl = "wss://wpush.meituan.com/websocket/589_WMOPEN/wo589i4VsZHFH2fh4uVsr6Dtc3k6vG8Xu0vxpreBQFy6QAvg"
)
type ClientManager struct {
ClientIdMap map[string]*Client // 全部的连接
ClientIdMapLock sync.RWMutex // 读写锁
Connect chan *Client // 连接处理
DisConnect chan *Client // 断开连接处理
GroupLock sync.RWMutex
Groups map[string][]string
//SystemClientsLock sync.RWMutex
//SystemClients map[string][]string
Clients map[string]*Client // 保存连接
Accounts map[string][]string // 账号和连接关系,map的key是账号id即AccountId这里主要考虑到一个账号多个连接
mu *sync.Mutex
}
var Manager = NewClientManager()
func NewClientManager() (clientManager *ClientManager) {
clientManager = &ClientManager{
Accounts: make(map[string][]string),
ClientIdMap: make(map[string]*Client, 100),
Connect: make(chan *Client, 10000),
DisConnect: make(chan *Client, 10000),
mu: new(sync.Mutex),
}
return
}
var RegisterChan = make(chan *Client, 100)
type Client struct {
ID string // 连接ID
AccountId string // 账号id, 一个账号可能有多个连接
Socket *websocket.Conn // 连接
HeartbeatTime int64 // 前一次心跳时间
}
var rdb = redis.NewClient(&redis.Options{
//Addr: "www.jxc4.com:6379",
//Password: "",
Addr: "127.0.0.1:6379",
Password: "123456",
DB: 0,
})
//测试心跳
func TestHeartCheck(t *testing.T) {
//go func() {
// ticker := time.NewTicker(5 * time.Second)
// defer ticker.Stop()
// for {
// <-ticker.C
//发送心跳
conn, resp, err := websocket.DefaultDialer.Dial(TestMTIMPushUrl, nil)
fmt.Println(resp, err)
err1 := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
for {
_, msg, err := conn.ReadMessage()
if err != nil {
break
}
fmt.Printf("%s receive: %s\n", conn.RemoteAddr(), string(msg))
}
fmt.Println(err1)
//}
//}()
}
func TestGetConnectionToken(t *testing.T) {
err := api.GetConnectionToken()
resp, err := api.GetConnectionToken()
if err != nil {
t.Fatal(err)
}
// t.Log(utils.Format4Output(result, false))
retVal := GetConnTokenResp{}
err = mapstructure.Decode(resp, &retVal)
fmt.Println(err)
fmt.Println(utils.Format4Output(retVal, false))
}
//解密测试
func TestAesCBCDecrypt(t *testing.T) {
secret := "a81eb3df418d83d6a1a4b7c572156d2f"
key := secret[:16]
str := "qodhoVd4IGtgPKrvYwq6QrzBecJZkeSUPYR88iGRUsCRFmCFxDHpUhqsbBztNXQx"
//str := "Vv+Y/K8vfS42W+P7xq26aIb6uoaG/nL0ZoMMXpitc5QQ3XJm3Roh10NuSoojYrG/3JZwbzgtYA+kBvodoY2eJV00f9MBY+kLkxToP+aSofsYva9tHbipvjVtexebc+eP7aQMtzbwU4BNNnuRG6e7TkXP+BLdtiGsyvHolGfky+p2fZgWes9R6JIxkuRCXW/yBhUo8F+wWCZ2YQl/szp5lHJ3cmneD6cwem36E0FBcvxZNB9an4pRkBrqi1p43V8QBLO719oXkQ+dqTqJMi1/xDSBrCDYN8QORnARP8+j1oDuqE34Kklcse4WL9rwTJ2sOmOu/O2h6Gx3ZaFaMaWRXBDYv8JpzTZjCbRrLSENlEHTof29BmvXTJ0QZ7qi6iAD"
data, err := Decrypt(str, []byte(key))
//data, err := DecryptAES(key, str)
fmt.Println(data)
fmt.Println(err)
}
var wsList []*websocket.Conn
func sendmsg() {
for _, conn := range wsList {
if err := conn.WriteMessage(websocket.TextMessage, []byte("~#HHHBBB#~")); err != nil {
fmt.Printf("%s", err) //"use of closed network connection"
}
}
}
func TestPut(t *testing.T) {
fmt.Println(wsList)
}
func TestWebSocketClient(t *testing.T) {
//发送webSocket请求
conn, resp, err := websocket.DefaultDialer.Dial(TestMTIMPushUrl, nil)
if err != nil {
fmt.Printf("连接失败:%v", err)
}
fmt.Printf("响应:%s", fmt.Sprint(resp))
//wsList = append(wsList, conn)
//关闭
conn.SetCloseHandler(func(code int, text string) error {
fmt.Printf("WebSocket connection closed with code %d and text: %s\n", code, text)
return nil
})
defer func(conn *websocket.Conn) {
err := conn.Close()
if err != nil {
return
}
}(conn)
//赋入全局变量
//Default(conn)
//生成clientID
clientID := GenClientId()
//创建实例连接
client := &Client{
ID: clientID,
//AccountId:conn. ,
Socket: conn,
HeartbeatTime: time.Now().Unix(),
}
//rdb.Set("testPush", client, 0)
//注册到连接管理
RegisterChan <- client
//todo 暂时不确定放哪
//go Start()
done := make(chan SingleChat)
//err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
err = conn.WriteMessage(websocket.TextMessage, []byte("~#HHHBBB#~"))
if err != nil {
fmt.Println(err)
}
for {
_, msg, err := conn.ReadMessage()
if err != nil {
//log.Fatal(err)
break
}
fmt.Printf("%s receive: %s\n", conn.RemoteAddr(), string(msg))
}
<-done
}
func TestPUSH(t *testing.T) {
key := "589:7954977:10"
//rdb.RPush(key, "1111111111")
//rdb.RPush(key, "{\"vendorID\":10,\"userID\":11158569333,\"NewMessageNum\":3,\"latestMsg\":\"hhhhhhhhhhh\",\"latestTime\":1681983980}")
//rdb.RPush(key, "{\"vendorID\":10,\"userID\":11158569333,\"NewMessageNum\":3,\"latestMsg\":\"oooooooooo\",\"latestTime\":1681983980}")
//rdb.RPush(key, "2222222222222")
rdb.RPush(key, "{\"vendorID\":10,\"userID\":11158569333,\"NewMessageNum\":4,\"latestMsg\":\"成功插入新数据看下cnt\",\"latestTime\":1681983980}")
rdb.RPush(key, "{\"vendorID\":10,\"userID\":11158569333,\"NewMessageNum\":5,\"latestMsg\":\"成功插入新数据看下cnt\",\"latestTime\":1681983980}")
}
//用户消息列表
type UserMessageList struct {
VendorID int `json:"vendorID"` //平台品牌 10-美团 11-饿了么
UserID int `json:"userID"` //用户ID
NewMessageNum int `json:"NewMessageNum"` //新消息数量
LatestMsg string `json:"latestMsg"` //最新一条消息
LatestTime int `json:"latestTime"` //最新一条消息发送时间
}
func TestNewRedis(t *testing.T) {
var flag = 11158569333
var key = "589:7954977:10"
s2 := rdb.LRange(key, 0, -1).Val()
fmt.Printf("before len %d\n", len(s2))
fmt.Printf("before ans %s\n", s2)
cnt := 0
n := rdb.Exists(key).Val()
if n > 0 {
for i := 0; i < len(s2); i++ {
v := UserMessageList{}
_ = json.Unmarshal([]byte(s2[i]), &v)
if v.UserID == flag {
rdb.LSet(key, int64(i), "del")
rdb.LRem(key, 0, "del")
s2 = append(s2[:i], s2[i+1:]...)
i--
if v.NewMessageNum == 0 { //目前为首条
cnt++ //赋值1
} else {
cnt = v.NewMessageNum
}
}
}
}
fmt.Printf("after cnt %d\n", cnt)
fmt.Printf("after len %d\n", len(s2))
fmt.Printf("after ans %s\n", s2)
//存入flag数据
ans := UserMessageList{
VendorID: 10,
UserID: 11158569333,
NewMessageNum: cnt,
LatestMsg: "成功插入新数据看下cnt",
LatestTime: 1681983980,
}
param, _ := json.Marshal(ans)
rdb.RPush(key, param)
}
// 根据账号获取连接
func TestGetClient(t *testing.T) {
accountId := "QW+r2FtsRKGGLJnlgyDNlChzcKcSZ8Kfgh0qw//ONuQCDKzky4x+nlbnx3k1JX13"
clients := make([]*Client, 0)
Manager.mu.Lock()
defer Manager.mu.Unlock()
if len(Manager.Accounts[accountId]) > 0 {
for _, clientId := range Manager.Accounts[accountId] {
if c, ok := Manager.Clients[clientId]; ok {
clients = append(clients, c)
}
}
}
fmt.Printf(utils.Format4Output(clients, false))
}
func TestMal(t *testing.T) {
}