This commit is contained in:
richboo111
2023-04-26 09:57:25 +08:00
parent a555ab70e5
commit 140fc00c68
10 changed files with 1055 additions and 1053 deletions

View File

@@ -12,13 +12,13 @@ type ICacher interface {
GetAs(key string, ptr interface{}) error
Keys(prefix string) ([]string, error)
//FlushDB() error
//Incr(key string) error
//LRange(key string) (retVal []string)
//Exists(keys ...string) (int64, error)
//RPush(key string, value interface{}) error
//Expire(key string, expiration time.Duration) error
//LRem(key string, count int, value interface{}) error
//LSet(key string, index int, value interface{}) error
//ExpireResult(key string, expiration time.Duration) (bool, error)
FlushDB() error
Incr(key string) error
LRange(key string) (retVal []string)
Exists(keys ...string) (int64, error)
RPush(key string, value interface{}) error
Expire(key string, expiration time.Duration) error
LRem(key string, count int, value interface{}) error
LSet(key string, index int, value interface{}) error
ExpireResult(key string, expiration time.Duration) (bool, error)
}

View File

@@ -74,46 +74,46 @@ func (c *Cacher) Keys(prefix string) ([]string, error) {
return c.client.Keys(prefix + "*").Result()
}
//func (c *Cacher) RPush(key string, value interface{}) error {
// return c.client.RPush(key, value).Err()
//}
//
//func (c *Cacher) FlushDB() error {
// return c.client.FlushDB().Err()
//}
//
//func (c *Cacher) Expire(key string, expiration time.Duration) error {
// return c.client.Expire(key, expiration).Err()
//}
//
//func (c *Cacher) ExpireResult(key string, expiration time.Duration) (bool, error) {
// ok, err := c.client.Expire(key, expiration).Result()
// if err != nil {
// return false, err
// }
// return ok, nil
//}
//
//func (c *Cacher) Exists(keys ...string) (int64, error) {
// ret := c.client.Exists(keys...)
// return ret.Val(), ret.Err()
//}
//
//func (c *Cacher) Incr(key string) error {
// return c.client.Incr(key).Err()
//}
//
//func (c *Cacher) LRange(key string) (retVal []string) {
// if c.client.LLen(key).Val() > 0 {
// retVal = c.client.LRange(key, 0, -1).Val()
// }
// return retVal
//}
//
//func (c *Cacher) LSet(key string, index int, value interface{}) error {
// return c.client.LSet(key, int64(index), value).Err()
//}
//
//func (c *Cacher) LRem(key string, count int, value interface{}) error {
// return c.client.LRem(key, int64(count), value).Err()
//}
func (c *Cacher) RPush(key string, value interface{}) error {
return c.client.RPush(key, value).Err()
}
func (c *Cacher) FlushDB() error {
return c.client.FlushDB().Err()
}
func (c *Cacher) Expire(key string, expiration time.Duration) error {
return c.client.Expire(key, expiration).Err()
}
func (c *Cacher) ExpireResult(key string, expiration time.Duration) (bool, error) {
ok, err := c.client.Expire(key, expiration).Result()
if err != nil {
return false, err
}
return ok, nil
}
func (c *Cacher) Exists(keys ...string) (int64, error) {
ret := c.client.Exists(keys...)
return ret.Val(), ret.Err()
}
func (c *Cacher) Incr(key string) error {
return c.client.Incr(key).Err()
}
func (c *Cacher) LRange(key string) (retVal []string) {
if c.client.LLen(key).Val() > 0 {
retVal = c.client.LRange(key, 0, -1).Val()
}
return retVal
}
func (c *Cacher) LSet(key string, index int, value interface{}) error {
return c.client.LSet(key, int64(index), value).Err()
}
func (c *Cacher) LRem(key string, count int, value interface{}) error {
return c.client.LRem(key, int64(count), value).Err()
}

View File

@@ -1,7 +1,11 @@
package ebai
import (
"encoding/json"
"git.rosy.net.cn/baseapi/platformapi/ebaiapi"
"git.rosy.net.cn/jx-callback/business/partner/purchase/im"
"git.rosy.net.cn/jx-callback/globals/api"
)
const (
@@ -10,9 +14,9 @@ const (
// OnImMessage 用户/骑手 发送/已读消息 回调
func (p *PurchaseHandler) OnImMessage(msg *ebaiapi.CallbackMsg) (response *ebaiapi.CallbackResponse) {
//str, err := json.Marshal(msg.Data)
str, err := json.Marshal(msg.Data)
//im.ReadMsgFromVendor(IMVendorIDELM, msg.Source, str)
return nil
//return api.EbaiAPI.Err2CallbackResponse(msg.Cmd, err, nil)
im.ReadMsgFromVendor(IMVendorIDELM, msg.Source, str)
//return nil
return api.EbaiAPI.Err2CallbackResponse(msg.Cmd, err, nil)
}

View File

@@ -1,356 +1,355 @@
package im
//
//import (
// "encoding/json"
// "errors"
// "fmt"
// "net/http"
//
// "git.rosy.net.cn/jx-callback/globals"
//
// "git.rosy.net.cn/baseapi/platformapi/ebaiapi"
//
// "git.rosy.net.cn/baseapi/platformapi/mtwmapi"
// "git.rosy.net.cn/baseapi/utils"
// push "git.rosy.net.cn/jx-callback/business/jxutils/unipush"
// "git.rosy.net.cn/jx-callback/globals/api"
// "github.com/gorilla/websocket"
//)
//
//// SendToVendor 向平台发消息
//func SendToVendor(msg []byte) {
// var (
// w http.ResponseWriter
// sendData SendData
// err error
// elmAppID = api.EbaiAPI.GetSource()
// )
//
// //解析数据
// if err = json.Unmarshal(msg, &sendData); err != nil {
// return
// }
//
// //存储数据
// ReadMsgFromClient(sendData.VendorID, elmAppID, sendData.Data)
//
// //发送信息
// if sendData.VendorID == VendorIDMT {
// temp, _ := json.Marshal(sendData.Data)
// Send(temp)
// }
// if sendData.VendorID == VendorIDELM {
// param := sendData.Data.(ebaiapi.BusinessSendMsgReq)
// if err := api.EbaiAPI.BusinessSendMsg(&param); err != nil {
// globals.SugarLogger.Debugf("elm发送信息错误%v", err)
// return
// }
// }
//
// if err != nil {
// ClientRender(w, Fail, FailMsg, map[string]string{
// "errMsg": fmt.Sprintf("%v", err),
// })
// } else {
// ClientRender(w, SuccessCode, SuccessMsg, map[string]interface{}{
// "vendorID": sendData.VendorID,
// "msg": "ok",
// })
// }
// return
//}
//
//func Send(data []byte) {
// //生成完整url
// fullUrl := GenFullUrl() //clientID暂时不用
//
// conn, resp, err := websocket.DefaultDialer.Dial(fullUrl, nil)
// if err != nil || resp.StatusCode != 101 {
// fmt.Printf("连接失败:%v http响应不成功", err)
// }
// //关闭
// defer func(conn *websocket.Conn) {
// err := conn.Close()
// if err != nil {
// return
// }
// }(conn)
//
// err = conn.WriteMessage(websocket.TextMessage, data)
// if err != nil {
// fmt.Println(err)
// }
//
// for {
// _, msg, err := conn.ReadMessage()
// if err != nil {
// break
// } else {
// temp := string(msg)
// if temp != HeartCheckSuccess {
// ReadMsgFromVendor(VendorIDMT, "", msg)
// }
// }
// fmt.Printf("%s receive: %s\n", conn.RemoteAddr(), string(msg))
// }
//}
//
//// ReadMsgFromClient 存储客户端发送的消息
//func ReadMsgFromClient(vendorID int, elmAppID string, msg interface{}) {
// var (
// err error
// jxMsg = &JXMsg{}
// userList = &UserMessageList{}
// )
//
// data, err := json.Marshal(msg)
// if err != nil {
// return
// }
//
// if vendorID == VendorIDMT {
// var MtSingleChat = mtwmapi.SingleChat{}
// err = json.Unmarshal(data, &MtSingleChat)
// jxMsg = &JXMsg{
// SendType: SendTypeJx,
// Data: MtSingleChat,
// }
// userList = &UserMessageList{
// VendorID: VendorIDMT,
// UserID: utils.Int2Str(MtSingleChat.OpenUserID),
// LatestMsg: MtSingleChat.MsgContent,
// LatestTime: MtSingleChat.Cts,
// }
// }
// if vendorID == VendorIDELM {
// var ElmData = ebaiapi.ImMessageSend{}
// err = json.Unmarshal(data, &ElmData)
// jxMsg = &JXMsg{
// SendType: SendTypeJx,
// Data: ElmData,
// }
// userList = &UserMessageList{
// VendorID: VendorIDMT,
// UserID: ElmData.PayLoad.GroupID,
// LatestMsg: ElmData.PayLoad.Content,
// LatestTime: int(ElmData.PayLoad.CreateTime),
// }
// }
//
// //1 存储详细聊天记录list
// if err = SetMessageDetail(jxMsg, vendorID, elmAppID); err != nil {
// return
// }
// //2 存储展示列表时单条数据
// if err = SetUserList(jxMsg, userList, vendorID, elmAppID); err != nil {
// return
// }
//}
//
//// ReadMsgFromVendor 读取数据并存储到redis
//func ReadMsgFromVendor(vendorID int, elmAppID string, msg []byte) {
// if string(msg) == "" {
// return
// }
// var (
// err error
// vendorStoreID string
// jxMsg = &JXMsg{}
// userList = &UserMessageList{}
// )
// if vendorID == VendorIDMT {
// var MtSingleChat = mtwmapi.SingleChat{}
// err = json.Unmarshal(msg, &MtSingleChat)
// jxMsg = &JXMsg{
// SendType: SendTypeMt,
// Data: MtSingleChat,
// }
// userList = &UserMessageList{
// VendorID: VendorIDMT,
// UserID: utils.Int2Str(MtSingleChat.OpenUserID),
// LatestMsg: MtSingleChat.MsgContent,
// LatestTime: MtSingleChat.Cts,
// }
// vendorStoreID = MtSingleChat.AppPoiCode
// }
// if vendorID == VendorIDELM {
// var ElmData = ebaiapi.ImMessageSend{}
// err = json.Unmarshal(msg, &ElmData)
// jxMsg = &JXMsg{
// SendType: SendTypeElm,
// Data: ElmData,
// }
// userList = &UserMessageList{
// VendorID: VendorIDMT,
// UserID: ElmData.PayLoad.GroupID,
// LatestMsg: ElmData.PayLoad.Content,
// LatestTime: int(ElmData.PayLoad.CreateTime),
// }
// }
//
// //1 存储详细聊天记录list
// if err = SetMessageDetail(jxMsg, vendorID, elmAppID); err != nil {
// return
// }
// //2 存储展示列表时单条数据
// if err = SetUserList(jxMsg, userList, vendorID, elmAppID); err != nil {
// return
// }
// //3 cid推送新消息
// err = PushMsgByCid(vendorStoreID, vendorID)
// //4 长链接通知给客户端
// ToClientChan <- clientInfo{Code: SuccessCode, Msg: fmt.Sprintf("%v", err), Data: jxMsg}
//}
//
//// PushMsgByCid 通过cid push用户
//func PushMsgByCid(vendorStoreID string, vendorID int) error {
// if err := push.NotifyImNewMessage(vendorStoreID, vendorID); err != nil {
// return err
// }
// return nil
//}
//
//// SetMessageDetail 赋值
////格式 AppID:AppPoiCode:10:OpenUserID
//func SetMessageDetail(req *JXMsg, vendorID int, elmAppID string) error {
// //生成京西消息ID detail
// msgID := GenMsgDetailID(req, vendorID, elmAppID)
//
// data, _ := json.Marshal(req)
// err := rdb.RPush(msgID, string(data))
// ok, err := rdb.ExpireResult(msgID, ExpireTimeDay)
// if err != nil || !ok {
// return err
// }
// return nil
//}
//
//// SetUserList 赋值
////AppPoiCode:10 [userid1:{SingleChat},userid2:{}]
//func SetUserList(jxMsg *JXMsg, userList *UserMessageList, vendorID int, elmAppID string) error {
// //生成msgID
// msgID := GenMsgListID(jxMsg, vendorID, elmAppID)
//
// //获取未读消息条数并删除旧数据
// cnt, err := GetNewAndTrim(msgID, userList.UserID)
// if cnt > 0 {
// userList.NewMessageNum = cnt
// } else {
// userList.NewMessageNum = 1
// }
// //存储当前数据
// data, _ := json.Marshal(userList)
// err = rdb.RPush(msgID, string(data))
//
// ok, err := rdb.ExpireResult(msgID, ExpireTimeDay)
// if err != nil || !ok {
// return err
// }
// return nil
//}
//
//// GetNewAndTrim 获取未读条数并清除旧数据
//func GetNewAndTrim(key string, flag string) (cnt int, err error) {
// cnt = 0
// if n, err := rdb.Exists(key); n > 0 && err == nil {
// s2 := rdb.LRange(key)
// for i := 0; i < len(s2); i++ {
// v := UserMessageList{}
// _ = json.Unmarshal([]byte(s2[i]), &v)
// if v.UserID == flag {
// err = rdb.LSet(key, i, "del")
// err = rdb.LRem(key, 0, "del")
// s2 = append(s2[:i], s2[i+1:]...)
// i--
// if v.NewMessageNum == 0 { //目前为首条
// cnt++ //赋值1
// } else {
// cnt = v.NewMessageNum
// }
// }
// }
// } else {
// return 0, nil
// }
// return cnt, err
//}
//
//// GenMsgDetailID 生成查询详细聊天记录ID
//func GenMsgDetailID(jxMsg *JXMsg, vendorID int, elmAppID string) (msgID string) {
// if vendorID == VendorIDMT {
// var d1 = jxMsg.Data.(mtwmapi.SingleChat)
// msgID = utils.Int2Str(d1.AppID) + ":" + d1.AppPoiCode + ":10:" + utils.Int2Str(d1.OpenUserID)
// }
// if vendorID == VendorIDELM {
// var d2 = jxMsg.Data.(ebaiapi.ImMessageSend)
// msgID = elmAppID + ":" + d2.PlatformShopID + ":11:" + d2.PayLoad.GroupID
// }
// return msgID
//}
//
//// GenMsgListID 生成展示列表时单条数据ID部分
//func GenMsgListID(jxMsg *JXMsg, vendorID int, elmAppID string) (msgID string) {
// if vendorID == VendorIDMT {
// var d1 = jxMsg.Data.(mtwmapi.SingleChat)
// msgID = utils.Int2Str(d1.AppID) + ":" + d1.AppPoiCode + ":10"
// }
// if vendorID == VendorIDELM {
// var d2 = jxMsg.Data.(ebaiapi.ImMessageSend)
// msgID = elmAppID + ":" + d2.PlatformShopID + ":11"
// }
// return msgID
//}
//
//// GetImUserList 获取门店用户聊天列表
//func GetImUserList(req []RelInfo) (retVal []interface{}, err error) {
// if len(req) == 0 {
// return nil, errors.New("msgID不允许为空")
// }
// var keys []string
// for _, i := range req {
// key := i.AppID + ":" + i.VendorStoreID + ":" + i.VendorID
// keys = append(keys, key)
// }
// for _, j := range keys {
// temp := rdb.Get(j)
// retVal = append(retVal, temp)
// }
// return retVal, err
//}
//
//// GetImChatDetail 获取门店用户聊天详情
//func GetImChatDetail(req []UserRelInfo) (retVal []interface{}, err error) {
// if len(req) == 0 {
// return nil, errors.New("msgID不允许为空")
// }
// var keys []string
// for _, i := range req {
// key := i.AppID + ":" + i.VendorStoreID + ":" + i.VendorID + ":" + i.UserID
// keys = append(keys, key)
// }
// for _, j := range keys {
// temp := rdb.Get(j)
// retVal = append(retVal, temp)
// }
// return retVal, err
//}
//
//// SetJxMsgRead 设置jx消息已读 userID(美团openUserID;饿了么groupID)
//func SetJxMsgRead(appID, vendorStoreID, vendorID, userID string) error {
// key := appID + ":" + vendorStoreID + ":" + vendorID
// if n, err := rdb.Exists(key); n > 0 && err == nil {
// s2 := rdb.LRange(key)
// for i := 0; i < len(s2); i++ {
// v := UserMessageList{}
// _ = json.Unmarshal([]byte(s2[i]), &v)
// if v.UserID == userID {
// err = rdb.LSet(key, i, "del")
// err = rdb.LRem(key, 0, "del")
// s2 = append(s2[:i], s2[i+1:]...)
// i--
// }
// }
// }
// return nil
//}
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"git.rosy.net.cn/jx-callback/globals"
"git.rosy.net.cn/baseapi/platformapi/ebaiapi"
"git.rosy.net.cn/baseapi/platformapi/mtwmapi"
"git.rosy.net.cn/baseapi/utils"
push "git.rosy.net.cn/jx-callback/business/jxutils/unipush"
"git.rosy.net.cn/jx-callback/globals/api"
"github.com/gorilla/websocket"
)
// SendToVendor 向平台发消息
func SendToVendor(msg []byte) {
var (
w http.ResponseWriter
sendData SendData
err error
elmAppID = api.EbaiAPI.GetSource()
)
//解析数据
if err = json.Unmarshal(msg, &sendData); err != nil {
return
}
//存储数据
ReadMsgFromClient(sendData.VendorID, elmAppID, sendData.Data)
//发送信息
if sendData.VendorID == VendorIDMT {
temp, _ := json.Marshal(sendData.Data)
Send(temp)
}
if sendData.VendorID == VendorIDELM {
param := sendData.Data.(ebaiapi.BusinessSendMsgReq)
if err := api.EbaiAPI.BusinessSendMsg(&param); err != nil {
globals.SugarLogger.Debugf("elm发送信息错误%v", err)
return
}
}
if err != nil {
ClientRender(w, Fail, FailMsg, map[string]string{
"errMsg": fmt.Sprintf("%v", err),
})
} else {
ClientRender(w, SuccessCode, SuccessMsg, map[string]interface{}{
"vendorID": sendData.VendorID,
"msg": "ok",
})
}
return
}
func Send(data []byte) {
//生成完整url
fullUrl := GenFullUrl() //clientID暂时不用
conn, resp, err := websocket.DefaultDialer.Dial(fullUrl, nil)
if err != nil || resp.StatusCode != 101 {
fmt.Printf("连接失败:%v http响应不成功", err)
}
//关闭
defer func(conn *websocket.Conn) {
err := conn.Close()
if err != nil {
return
}
}(conn)
err = conn.WriteMessage(websocket.TextMessage, data)
if err != nil {
fmt.Println(err)
}
for {
_, msg, err := conn.ReadMessage()
if err != nil {
break
} else {
temp := string(msg)
if temp != HeartCheckSuccess {
ReadMsgFromVendor(VendorIDMT, "", msg)
}
}
fmt.Printf("%s receive: %s\n", conn.RemoteAddr(), string(msg))
}
}
// ReadMsgFromClient 存储客户端发送的消息
func ReadMsgFromClient(vendorID int, elmAppID string, msg interface{}) {
var (
err error
jxMsg = &JXMsg{}
userList = &UserMessageList{}
)
data, err := json.Marshal(msg)
if err != nil {
return
}
if vendorID == VendorIDMT {
var MtSingleChat = mtwmapi.SingleChat{}
err = json.Unmarshal(data, &MtSingleChat)
jxMsg = &JXMsg{
SendType: SendTypeJx,
Data: MtSingleChat,
}
userList = &UserMessageList{
VendorID: VendorIDMT,
UserID: utils.Int2Str(MtSingleChat.OpenUserID),
LatestMsg: MtSingleChat.MsgContent,
LatestTime: MtSingleChat.Cts,
}
}
if vendorID == VendorIDELM {
var ElmData = ebaiapi.ImMessageSend{}
err = json.Unmarshal(data, &ElmData)
jxMsg = &JXMsg{
SendType: SendTypeJx,
Data: ElmData,
}
userList = &UserMessageList{
VendorID: VendorIDMT,
UserID: ElmData.PayLoad.GroupID,
LatestMsg: ElmData.PayLoad.Content,
LatestTime: int(ElmData.PayLoad.CreateTime),
}
}
//1 存储详细聊天记录list
if err = SetMessageDetail(jxMsg, vendorID, elmAppID); err != nil {
return
}
//2 存储展示列表时单条数据
if err = SetUserList(jxMsg, userList, vendorID, elmAppID); err != nil {
return
}
}
// ReadMsgFromVendor 读取数据并存储到redis
func ReadMsgFromVendor(vendorID int, elmAppID string, msg []byte) {
if string(msg) == "" {
return
}
var (
err error
vendorStoreID string
jxMsg = &JXMsg{}
userList = &UserMessageList{}
)
if vendorID == VendorIDMT {
var MtSingleChat = mtwmapi.SingleChat{}
err = json.Unmarshal(msg, &MtSingleChat)
jxMsg = &JXMsg{
SendType: SendTypeMt,
Data: MtSingleChat,
}
userList = &UserMessageList{
VendorID: VendorIDMT,
UserID: utils.Int2Str(MtSingleChat.OpenUserID),
LatestMsg: MtSingleChat.MsgContent,
LatestTime: MtSingleChat.Cts,
}
vendorStoreID = MtSingleChat.AppPoiCode
}
if vendorID == VendorIDELM {
var ElmData = ebaiapi.ImMessageSend{}
err = json.Unmarshal(msg, &ElmData)
jxMsg = &JXMsg{
SendType: SendTypeElm,
Data: ElmData,
}
userList = &UserMessageList{
VendorID: VendorIDMT,
UserID: ElmData.PayLoad.GroupID,
LatestMsg: ElmData.PayLoad.Content,
LatestTime: int(ElmData.PayLoad.CreateTime),
}
}
//1 存储详细聊天记录list
if err = SetMessageDetail(jxMsg, vendorID, elmAppID); err != nil {
return
}
//2 存储展示列表时单条数据
if err = SetUserList(jxMsg, userList, vendorID, elmAppID); err != nil {
return
}
//3 cid推送新消息
err = PushMsgByCid(vendorStoreID, vendorID)
//4 长链接通知给客户端
ToClientChan <- clientInfo{Code: SuccessCode, Msg: fmt.Sprintf("%v", err), Data: jxMsg}
}
// PushMsgByCid 通过cid push用户
func PushMsgByCid(vendorStoreID string, vendorID int) error {
if err := push.NotifyImNewMessage(vendorStoreID, vendorID); err != nil {
return err
}
return nil
}
// SetMessageDetail 赋值
//格式 AppID:AppPoiCode:10:OpenUserID
func SetMessageDetail(req *JXMsg, vendorID int, elmAppID string) error {
//生成京西消息ID detail
msgID := GenMsgDetailID(req, vendorID, elmAppID)
data, _ := json.Marshal(req)
err := rdb.RPush(msgID, string(data))
ok, err := rdb.ExpireResult(msgID, ExpireTimeDay)
if err != nil || !ok {
return err
}
return nil
}
// SetUserList 赋值
//AppPoiCode:10 [userid1:{SingleChat},userid2:{}]
func SetUserList(jxMsg *JXMsg, userList *UserMessageList, vendorID int, elmAppID string) error {
//生成msgID
msgID := GenMsgListID(jxMsg, vendorID, elmAppID)
//获取未读消息条数并删除旧数据
cnt, err := GetNewAndTrim(msgID, userList.UserID)
if cnt > 0 {
userList.NewMessageNum = cnt
} else {
userList.NewMessageNum = 1
}
//存储当前数据
data, _ := json.Marshal(userList)
err = rdb.RPush(msgID, string(data))
ok, err := rdb.ExpireResult(msgID, ExpireTimeDay)
if err != nil || !ok {
return err
}
return nil
}
// GetNewAndTrim 获取未读条数并清除旧数据
func GetNewAndTrim(key string, flag string) (cnt int, err error) {
cnt = 0
if n, err := rdb.Exists(key); n > 0 && err == nil {
s2 := rdb.LRange(key)
for i := 0; i < len(s2); i++ {
v := UserMessageList{}
_ = json.Unmarshal([]byte(s2[i]), &v)
if v.UserID == flag {
err = rdb.LSet(key, i, "del")
err = rdb.LRem(key, 0, "del")
s2 = append(s2[:i], s2[i+1:]...)
i--
if v.NewMessageNum == 0 { //目前为首条
cnt++ //赋值1
} else {
cnt = v.NewMessageNum
}
}
}
} else {
return 0, nil
}
return cnt, err
}
// GenMsgDetailID 生成查询详细聊天记录ID
func GenMsgDetailID(jxMsg *JXMsg, vendorID int, elmAppID string) (msgID string) {
if vendorID == VendorIDMT {
var d1 = jxMsg.Data.(mtwmapi.SingleChat)
msgID = utils.Int2Str(d1.AppID) + ":" + d1.AppPoiCode + ":10:" + utils.Int2Str(d1.OpenUserID)
}
if vendorID == VendorIDELM {
var d2 = jxMsg.Data.(ebaiapi.ImMessageSend)
msgID = elmAppID + ":" + d2.PlatformShopID + ":11:" + d2.PayLoad.GroupID
}
return msgID
}
// GenMsgListID 生成展示列表时单条数据ID部分
func GenMsgListID(jxMsg *JXMsg, vendorID int, elmAppID string) (msgID string) {
if vendorID == VendorIDMT {
var d1 = jxMsg.Data.(mtwmapi.SingleChat)
msgID = utils.Int2Str(d1.AppID) + ":" + d1.AppPoiCode + ":10"
}
if vendorID == VendorIDELM {
var d2 = jxMsg.Data.(ebaiapi.ImMessageSend)
msgID = elmAppID + ":" + d2.PlatformShopID + ":11"
}
return msgID
}
// GetImUserList 获取门店用户聊天列表
func GetImUserList(req []RelInfo) (retVal []interface{}, err error) {
if len(req) == 0 {
return nil, errors.New("msgID不允许为空")
}
var keys []string
for _, i := range req {
key := i.AppID + ":" + i.VendorStoreID + ":" + i.VendorID
keys = append(keys, key)
}
for _, j := range keys {
temp := rdb.Get(j)
retVal = append(retVal, temp)
}
return retVal, err
}
// GetImChatDetail 获取门店用户聊天详情
func GetImChatDetail(req []UserRelInfo) (retVal []interface{}, err error) {
if len(req) == 0 {
return nil, errors.New("msgID不允许为空")
}
var keys []string
for _, i := range req {
key := i.AppID + ":" + i.VendorStoreID + ":" + i.VendorID + ":" + i.UserID
keys = append(keys, key)
}
for _, j := range keys {
temp := rdb.Get(j)
retVal = append(retVal, temp)
}
return retVal, err
}
// SetJxMsgRead 设置jx消息已读 userID(美团openUserID;饿了么groupID)
func SetJxMsgRead(appID, vendorStoreID, vendorID, userID string) error {
key := appID + ":" + vendorStoreID + ":" + vendorID
if n, err := rdb.Exists(key); n > 0 && err == nil {
s2 := rdb.LRange(key)
for i := 0; i < len(s2); i++ {
v := UserMessageList{}
_ = json.Unmarshal([]byte(s2[i]), &v)
if v.UserID == userID {
err = rdb.LSet(key, i, "del")
err = rdb.LRem(key, 0, "del")
s2 = append(s2[:i], s2[i+1:]...)
i--
}
}
}
return nil
}

View File

@@ -1,276 +1,276 @@
package im
//
//import (
// "encoding/json"
// "flag"
// "fmt"
// "git.rosy.net.cn/baseapi/utils"
// "io"
// "log"
// "net"
// "net/http"
// "sync"
// "time"
//
// "git.rosy.net.cn/baseapi/platformapi/mtwmapi"
// "git.rosy.net.cn/jx-callback/globals/api"
// "github.com/gazeboxu/mapstructure"
// "github.com/gorilla/websocket"
// "gopkg.in/ini.v1"
//)
//
//// ClientManager 连接管理
//type ClientManager struct {
// ClientIdMap map[string]*Client // 全部的连接
// ClientIdMapLock sync.RWMutex // 读写锁
//
// Connect chan *Client // 连接处理
// DisConnect chan *Client // 断开连接处理
//
// GroupLock sync.RWMutex
// Groups map[string][]string
//
// SystemClientsLock sync.RWMutex
// SystemClients map[string][]string
//}
//
//// Client 客户端连接信息
//type Client struct {
// ClientId string // 标识ID
// Socket *websocket.Conn // 用户连接
// ConnectTime uint64 // 首次连接时间
// IsDeleted bool // 是否删除或下线
// UserId string // 业务端标识用户ID
// //Extend string // 扩展字段,用户可以自定义
// //GroupList []string
//}
//
////channel通道结构体
//type clientInfo struct {
// ClientId string `json:"clientId" validate:"required"` //链接ID
// Data interface{}
// SendUserId string
// MessageId string
// Code int
// Msg string
//}
//
//// RetData 统一返回值结构体
//type RetData struct {
// Code int `json:"code"` //响应code
// Msg string `json:"msg"` //响应msg success/fail
// Data interface{} `json:"data"` //信息
//
// //MessageType string `json:"messageType"` //消息类型 heart-心跳检测send-发送消息receive-接收消息
// //MessageId string `json:"messageId"` //发送/接收信息 id
// //UserId string `json:"userId"` //必须是平台方userID
//}
//
//type global struct {
// LocalHost string //本机内网IP
// ServerList map[string]string
// ServerListLock sync.RWMutex
//}
//type commonConf struct {
// HttpPort string
// RPCPort string
// Cluster bool
// CryptoKey string
//}
//
//// SendData 客户端写入参数
//type SendData struct {
// //ClientId string `json:"clientId" validate:"required"` //链接ID
// VendorID int `json:"vendorID"` //消息来源平台ID
// Data interface{} `json:"data"` //发送给平台 美团/饿了么消息结构体
// //返回值
// //Code int `json:"code"`
// //Msg string `json:"msg"`
// //SendUserId string `json:"sendUserId"`
//}
//
//// JXMsg 京西消息结构体
//type JXMsg struct {
// SendType string `json:"sendType"` //消息发送方 jx-商家mt-美团elm-饿了么
// Data interface{} `json:"data"` //美团/饿了么 单聊消息
//}
//
//// GetUserListReq 获取门店用户聊天列表
//type GetUserListReq struct {
// VendorStoreID string `json:"vendorStoreID"` //平台门店id
// VendorID string `json:"vendorID"` //平台标识id
// AppID string `json:"appID"` //应用ID
//}
//
//type GetChatDetailReq struct {
// VendorStoreID string `json:"vendorStoreID"` //平台门店id
// VendorID string `json:"vendorID"` //平台标识id
// AppID string `json:"appID"` //应用ID
// UserID string `json:"userID"` //userID/groupID
//}
//
//// UserMessageList 用户消息列表
//type UserMessageList struct {
// VendorID int `json:"vendorID"` //平台品牌 10-美团 11-饿了么
// UserID string `json:"userID"` //用户ID
// NewMessageNum int `json:"NewMessageNum"` //新消息数量
// LatestMsg string `json:"latestMsg"` //最新一条消息
// LatestTime int `json:"latestTime"` //最新一条消息发送时间
//}
//
//type RelInfo struct {
// VendorStoreID string `json:"vendorStoreID"` //平台门店id
// VendorID string `json:"vendorID"` //平台标识id
// AppID string `json:"appID"` //应用ID
//}
//
//type UserRelInfo struct {
// VendorStoreID string `json:"vendorStoreID"` //平台门店id
// VendorID string `json:"vendorID"` //平台标识id
// AppID string `json:"appID"` //应用ID
// UserID string `json:"userID"` //用户id/groupID
//}
//
//var (
// cfg *ini.File
// rdb = api.Cacher
// Manager = NewClientManager() // 管理者
// CommonSetting = &commonConf{}
// GlobalSetting = &global{}
// ToClientChan chan clientInfo
// heartbeatInterval = 60 * time.Second // 心跳间隔
// HeartCheckMsg = "~#HHHBBB#~" //心跳检测消息
// HeartCheckSuccess = "HB" //成功发送返回心跳消息
// VendorIDMT = 10 //im美团
// VendorIDELM = 11 //im饿了么
// SendTypeJx = "jx" //京西客户端发送方标识
// SendTypeMt = "mt" //美团用户发送方标识符
// SendTypeElm = "elm" //饿了么用户发送方标识符
// MTIMPushUrl = "wss://wpush.meituan.com/websocket" //buildPushConnect建立长连接
//)
//
//const (
// ExpireTimeDay = 24 * time.Hour //redis一天过期时间
// maxMessageSize = 8192 // 最大的消息大小
//)
//
//type renderData struct {
// ClientId string `json:"clientId"`
//}
//
//const (
// SuccessCode = 0
// SuccessMsg = "success"
// Fail = -1
// FailMsg = "fail"
//
// SYSTEM_ID_ERROR = -1001
// ONLINE_MESSAGE_CODE = 1001
// OFFLINE_MESSAGE_CODE = 1002
//)
//
//// Render 统一返回值
//func Render(conn *websocket.Conn, messageId string, code int, message string, data interface{}) error {
// return conn.WriteJSON(RetData{
// Code: code,
// Msg: message,
// Data: data,
// })
//}
//
//func ClientRender(w http.ResponseWriter, code int, msg string, data interface{}) (str string) {
// var retData RetData
//
// retData.Code = code
// retData.Msg = msg
// retData.Data = data
//
// retJson, _ := json.Marshal(retData)
// str = string(retJson)
//
// w.Header().Set("Content-Type", "application/json; charset=utf-8")
// _, _ = io.WriteString(w, str)
// return
//}
//
//func ConnRender(conn *websocket.Conn, data interface{}) (err error) {
// err = conn.WriteJSON(RetData{
// Code: SuccessCode,
// Msg: "success",
// Data: data,
// })
// return
//}
//
//// Default 给默认值
//func Default() {
// CommonSetting = &commonConf{
// HttpPort: "6000",
// RPCPort: "7000",
// Cluster: false,
// CryptoKey: "Adba723b7fe06819",
// }
//
// GlobalSetting = &global{
// LocalHost: getIntranetIp(),
// ServerList: make(map[string]string),
// }
//}
//
//// Setup 初始化全局设置变量
//func Setup() {
// configFile := flag.String("c", "conf/app.ini", "-c conf/app.ini")
//
// var err error
// cfg, err = ini.Load(*configFile)
// if err != nil {
// log.Fatalf("setting.Setup, fail to parse 'conf/app.ini': %v", err)
// }
//
// mapTo("common", CommonSetting)
//
// GlobalSetting = &global{
// LocalHost: getIntranetIp(),
// ServerList: make(map[string]string),
// }
// fmt.Printf("LocalHost=%s\n ServerList=%s\n", GlobalSetting.LocalHost, utils.Format4Output(GlobalSetting.ServerList, false))
//}
//
//// mapTo map section
//func mapTo(section string, v interface{}) {
// err := cfg.Section(section).MapTo(v)
// if err != nil {
// log.Fatalf("Cfg.MapTo %s err: %v", section, err)
// }
//}
//
////获取本机IP
//func getIntranetIp() string {
// addrs, _ := net.InterfaceAddrs()
// for _, addr := range addrs {
// // 检查ip地址判断是否回环地址
// if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
// if ipnet.IP.To4() != nil {
// return ipnet.IP.String()
// }
//
// }
// }
// return ""
//}
//
//// GenFullUrl 组装完整websocket url以及生成clientID
//func GenFullUrl() (fullUrl string) {
// resp, err := api.MtwmAPI.GetConnectionToken()
// if err != nil {
// return ""
// }
// retVal := mtwmapi.GetConnTokenResp{}
// err = mapstructure.Decode(resp, &retVal)
// fullUrl = MTIMPushUrl + "/" + retVal.AppKey + "/" + retVal.ConnectionToken
// //clientID = api.MtwmAPI.GetAppID() + ":" + retVal.ConnectionToken
// //打印输出
// //fmt.Printf("Create websocket connect failCount:%d", retVal.UserCount)
// return fullUrl
//}
import (
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net"
"net/http"
"sync"
"time"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/baseapi/platformapi/mtwmapi"
"git.rosy.net.cn/jx-callback/globals/api"
"github.com/gazeboxu/mapstructure"
"github.com/gorilla/websocket"
"gopkg.in/ini.v1"
)
// ClientManager 连接管理
type ClientManager struct {
ClientIdMap map[string]*Client // 全部的连接
ClientIdMapLock sync.RWMutex // 读写锁
Connect chan *Client // 连接处理
DisConnect chan *Client // 断开连接处理
GroupLock sync.RWMutex
Groups map[string][]string
SystemClientsLock sync.RWMutex
SystemClients map[string][]string
}
// Client 客户端连接信息
type Client struct {
ClientId string // 标识ID
Socket *websocket.Conn // 用户连接
ConnectTime uint64 // 首次连接时间
IsDeleted bool // 是否删除或下线
UserId string // 业务端标识用户ID
//Extend string // 扩展字段,用户可以自定义
//GroupList []string
}
//channel通道结构体
type clientInfo struct {
ClientId string `json:"clientId" validate:"required"` //链接ID
Data interface{}
SendUserId string
MessageId string
Code int
Msg string
}
// RetData 统一返回值结构体
type RetData struct {
Code int `json:"code"` //响应code
Msg string `json:"msg"` //响应msg success/fail
Data interface{} `json:"data"` //信息
//MessageType string `json:"messageType"` //消息类型 heart-心跳检测send-发送消息receive-接收消息
//MessageId string `json:"messageId"` //发送/接收信息 id
//UserId string `json:"userId"` //必须是平台方userID
}
type global struct {
LocalHost string //本机内网IP
ServerList map[string]string
ServerListLock sync.RWMutex
}
type commonConf struct {
HttpPort string
RPCPort string
Cluster bool
CryptoKey string
}
// SendData 客户端写入参数
type SendData struct {
//ClientId string `json:"clientId" validate:"required"` //链接ID
VendorID int `json:"vendorID"` //消息来源平台ID
Data interface{} `json:"data"` //发送给平台 美团/饿了么消息结构体
//返回值
//Code int `json:"code"`
//Msg string `json:"msg"`
//SendUserId string `json:"sendUserId"`
}
// JXMsg 京西消息结构体
type JXMsg struct {
SendType string `json:"sendType"` //消息发送方 jx-商家mt-美团elm-饿了么
Data interface{} `json:"data"` //美团/饿了么 单聊消息
}
// GetUserListReq 获取门店用户聊天列表
type GetUserListReq struct {
VendorStoreID string `json:"vendorStoreID"` //平台门店id
VendorID string `json:"vendorID"` //平台标识id
AppID string `json:"appID"` //应用ID
}
type GetChatDetailReq struct {
VendorStoreID string `json:"vendorStoreID"` //平台门店id
VendorID string `json:"vendorID"` //平台标识id
AppID string `json:"appID"` //应用ID
UserID string `json:"userID"` //userID/groupID
}
// UserMessageList 用户消息列表
type UserMessageList struct {
VendorID int `json:"vendorID"` //平台品牌 10-美团 11-饿了么
UserID string `json:"userID"` //用户ID
NewMessageNum int `json:"NewMessageNum"` //新消息数量
LatestMsg string `json:"latestMsg"` //最新一条消息
LatestTime int `json:"latestTime"` //最新一条消息发送时间
}
type RelInfo struct {
VendorStoreID string `json:"vendorStoreID"` //平台门店id
VendorID string `json:"vendorID"` //平台标识id
AppID string `json:"appID"` //应用ID
}
type UserRelInfo struct {
VendorStoreID string `json:"vendorStoreID"` //平台门店id
VendorID string `json:"vendorID"` //平台标识id
AppID string `json:"appID"` //应用ID
UserID string `json:"userID"` //用户id/groupID
}
var (
cfg *ini.File
rdb = api.Cacher
Manager = NewClientManager() // 管理者
CommonSetting = &commonConf{}
GlobalSetting = &global{}
ToClientChan chan clientInfo
heartbeatInterval = 60 * time.Second // 心跳间隔
HeartCheckMsg = "~#HHHBBB#~" //心跳检测消息
HeartCheckSuccess = "HB" //成功发送返回心跳消息
VendorIDMT = 10 //im美团
VendorIDELM = 11 //im饿了么
SendTypeJx = "jx" //京西客户端发送方标识
SendTypeMt = "mt" //美团用户发送方标识符
SendTypeElm = "elm" //饿了么用户发送方标识符
MTIMPushUrl = "wss://wpush.meituan.com/websocket" //buildPushConnect建立长连接
)
const (
ExpireTimeDay = 24 * time.Hour //redis一天过期时间
maxMessageSize = 8192 // 最大的消息大小
)
type renderData struct {
ClientId string `json:"clientId"`
}
const (
SuccessCode = 0
SuccessMsg = "success"
Fail = -1
FailMsg = "fail"
SYSTEM_ID_ERROR = -1001
ONLINE_MESSAGE_CODE = 1001
OFFLINE_MESSAGE_CODE = 1002
)
// Render 统一返回值
func Render(conn *websocket.Conn, messageId string, code int, message string, data interface{}) error {
return conn.WriteJSON(RetData{
Code: code,
Msg: message,
Data: data,
})
}
func ClientRender(w http.ResponseWriter, code int, msg string, data interface{}) (str string) {
var retData RetData
retData.Code = code
retData.Msg = msg
retData.Data = data
retJson, _ := json.Marshal(retData)
str = string(retJson)
w.Header().Set("Content-Type", "application/json; charset=utf-8")
_, _ = io.WriteString(w, str)
return
}
func ConnRender(conn *websocket.Conn, data interface{}) (err error) {
err = conn.WriteJSON(RetData{
Code: SuccessCode,
Msg: "success",
Data: data,
})
return
}
// Default 给默认值
func Default() {
CommonSetting = &commonConf{
HttpPort: "6000",
RPCPort: "7000",
Cluster: false,
CryptoKey: "Adba723b7fe06819",
}
GlobalSetting = &global{
LocalHost: getIntranetIp(),
ServerList: make(map[string]string),
}
}
// Setup 初始化全局设置变量
func Setup() {
configFile := flag.String("c", "conf/app.ini", "-c conf/app.ini")
var err error
cfg, err = ini.Load(*configFile)
if err != nil {
log.Fatalf("setting.Setup, fail to parse 'conf/app.ini': %v", err)
}
mapTo("common", CommonSetting)
GlobalSetting = &global{
LocalHost: getIntranetIp(),
ServerList: make(map[string]string),
}
fmt.Printf("LocalHost=%s\n ServerList=%s\n", GlobalSetting.LocalHost, utils.Format4Output(GlobalSetting.ServerList, false))
}
// mapTo map section
func mapTo(section string, v interface{}) {
err := cfg.Section(section).MapTo(v)
if err != nil {
log.Fatalf("Cfg.MapTo %s err: %v", section, err)
}
}
//获取本机IP
func getIntranetIp() string {
addrs, _ := net.InterfaceAddrs()
for _, addr := range addrs {
// 检查ip地址判断是否回环地址
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
return ipnet.IP.String()
}
}
}
return ""
}
// GenFullUrl 组装完整websocket url以及生成clientID
func GenFullUrl() (fullUrl string) {
resp, err := api.MtwmAPI.GetConnectionToken()
if err != nil {
return ""
}
retVal := mtwmapi.GetConnTokenResp{}
err = mapstructure.Decode(resp, &retVal)
fullUrl = MTIMPushUrl + "/" + retVal.AppKey + "/" + retVal.ConnectionToken
//clientID = api.MtwmAPI.GetAppID() + ":" + retVal.ConnectionToken
//打印输出
//fmt.Printf("Create websocket connect failCount:%d", retVal.UserCount)
return fullUrl
}

View File

@@ -1,264 +1,263 @@
package im
//
//import (
// "errors"
// "fmt"
// "net/http"
// "time"
//
// "git.rosy.net.cn/baseapi/utils"
//
// "git.rosy.net.cn/jx-callback/globals"
// "github.com/gorilla/websocket"
//)
//
//func Init() {
// //初始化
// ToClientChan = make(chan clientInfo, 1000)
// //写入全局变量
// //Default()
//
// Setup()
// //建立长链接
// //StartWebSocket(res, req)
// Send([]byte(HeartCheckMsg))
//
// //启动定时器
// PingTimer()
//
// go WriteMessage()
//
// go Manager.Start()
//
// fmt.Printf("服务器启动成功,端口号:%s\n", CommonSetting.HttpPort)
//}
//
//func Run(w http.ResponseWriter, r *http.Request) {
// conn, err := (&websocket.Upgrader{
// ReadBufferSize: 1024,
// WriteBufferSize: 1024,
// // 允许所有CORS跨域请求
// CheckOrigin: func(r *http.Request) bool {
// return true
// },
// }).Upgrade(w, r, nil)
// if err != nil {
// globals.SugarLogger.Debugf("upgrade error: %v", err)
// http.NotFound(w, r)
// return
// }
//
// //设置读取消息大小上线
// conn.SetReadLimit(maxMessageSize)
//
// clientID := r.FormValue("clientId")
// clientSocket := NewClient(clientID, conn)
//
// //读取客户端消息
// clientSocket.Read()
//
// if err = ConnRender(conn, renderData{ClientId: clientID}); err != nil {
// _ = conn.Close()
// return
// }
//
// // 用户连接事件
// Manager.Connect <- clientSocket
//}
//
//func StartWebSocket(conn *websocket.Conn, clientID string, err error) {
//
// //设置读取消息大小上线
// conn.SetReadLimit(maxMessageSize)
//
// clientSocket := NewClient(clientID, conn)
//
// //读取客户端消息
// clientSocket.Read()
//
// if err = ConnRender(conn, renderData{ClientId: clientID}); err != nil {
// _ = conn.Close()
// return
// }
//
// // 用户连接事件
// Manager.Connect <- clientSocket
//
//}
//
//// PingTimer 定时器发送心跳
//func PingTimer() {
// go func() {
// ticker := time.NewTicker(heartbeatInterval)
// defer ticker.Stop()
// //测试用
// i := 0
// for {
// i++
// <-ticker.C
// for clientId, conn := range Manager.AllClient() {
// if err := conn.Socket.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
// Manager.DisConnect <- conn
// globals.SugarLogger.Debugf("发送心跳失败: %s 总连接数:%d", clientId, Manager.Count())
// }
// if err := ConnRender(conn.Socket, renderData{ClientId: clientId}); err != nil {
// return
// }
// globals.SugarLogger.Debugf("clientId=%s,i=%d", clientId, i)
// }
// }
// }()
//}
//
//// WriteMessage 监听并发送给客户端信息
//func WriteMessage() {
// i := 0
// for {
// clientInfo := <-ToClientChan
// //广播发送通知所有客户端
// i++
// fmt.Printf("WriteMessage clientInfo=%s i=%d", utils.Format4Output(clientInfo, false), i)
// if Manager.AllClient() != nil {
// for _, conn := range Manager.AllClient() {
// if err := Render(conn.Socket, clientInfo.MessageId, clientInfo.Code, clientInfo.Msg, clientInfo.Data); err != nil {
// Manager.DisConnect <- conn
// }
// }
// } else {
// globals.SugarLogger.Debugf("无客户端连接,请检查")
// return
// }
// //if conn, err := Manager.GetByClientId(clientInfo.ClientId); err == nil && conn != nil {
// // if err := Render(conn.Socket, clientInfo.MessageId, clientInfo.Code, clientInfo.Msg, clientInfo.Data); err != nil {
// // Manager.DisConnect <- conn
// // }
// //}
// }
//}
//
//// Start 管道处理程序
//func (manager *ClientManager) Start() {
// for {
// select {
// case client := <-manager.Connect:
// // 建立连接事件
// manager.EventConnect(client)
// case conn := <-manager.DisConnect:
// // 断开连接事件
// manager.EventDisconnect(conn)
// }
// }
//}
//
////从客户端读取数据
//func (c *Client) Read() {
// go func() {
// for {
// messageType, msg, err := c.Socket.ReadMessage()
// if err != nil {
// if messageType == -1 && websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
// Manager.DisConnect <- c
// return
// } else if messageType != websocket.PingMessage {
// return
// }
// } else {
// SendToVendor(msg)
// }
// }
// }()
//}
//
//// 以下为连接事件操作*******************************************
//
//// EventConnect 建立连接事件
//func (manager *ClientManager) EventConnect(client *Client) {
// manager.AddClient(client)
//}
//
//// EventDisconnect 断开连接事件
//func (manager *ClientManager) EventDisconnect(client *Client) {
// //关闭连接
// _ = client.Socket.Close()
// manager.DelClient(client)
// //标记销毁
// client.IsDeleted = true
// client = nil
//}
//
////以下为客户端Client操作*******************************************
//
//// NewClient 初始化Client
//func NewClient(clientId string, socket *websocket.Conn) *Client {
// return &Client{
// ClientId: clientId,
// Socket: socket,
// ConnectTime: uint64(time.Now().Unix()),
// IsDeleted: false,
// }
//}
//
//// AddClient 添加客户端
//func (manager *ClientManager) AddClient(client *Client) {
// manager.ClientIdMapLock.Lock()
// defer manager.ClientIdMapLock.Unlock()
//
// manager.ClientIdMap[client.ClientId] = client
//}
//
//// DelClient 删除客户端
//func (manager *ClientManager) DelClient(client *Client) {
// manager.delClientIdMap(client.ClientId)
//
//}
//
//// 删除clientIdMap
//func (manager *ClientManager) delClientIdMap(clientId string) {
// manager.ClientIdMapLock.Lock()
// defer manager.ClientIdMapLock.Unlock()
//
// delete(manager.ClientIdMap, clientId)
//}
//
//// GetByClientId 通过clientId获取client
//func (manager *ClientManager) GetByClientId(clientId string) (*Client, error) {
// manager.ClientIdMapLock.RLock()
// defer manager.ClientIdMapLock.RUnlock()
//
// if client, ok := manager.ClientIdMap[clientId]; !ok {
// return nil, errors.New("客户端不存在")
// } else {
// return client, nil
// }
//}
//
//// AllClient 获取所有的客户端
//func (manager *ClientManager) AllClient() map[string]*Client {
// manager.ClientIdMapLock.RLock()
// defer manager.ClientIdMapLock.RUnlock()
//
// return manager.ClientIdMap
//}
//
////与客户端的交互操作*************************
//
//// NewClientManager 初始化客户端管理
//func NewClientManager() (clientManager *ClientManager) {
// clientManager = &ClientManager{
// ClientIdMap: make(map[string]*Client),
// Connect: make(chan *Client, 10000),
// DisConnect: make(chan *Client, 10000),
// Groups: make(map[string][]string, 100),
// SystemClients: make(map[string][]string, 100),
// }
//
// return
//}
//
//// Count 获取客户端数量
//func (manager *ClientManager) Count() int {
// manager.ClientIdMapLock.RLock()
// defer manager.ClientIdMapLock.RUnlock()
// return len(manager.ClientIdMap)
//}
import (
"errors"
"fmt"
"net/http"
"time"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/globals"
"github.com/gorilla/websocket"
)
func Init() {
//初始化
ToClientChan = make(chan clientInfo, 1000)
//写入全局变量
//Default()
Setup()
//建立长链接
//StartWebSocket(res, req)
Send([]byte(HeartCheckMsg))
//启动定时器
PingTimer()
go WriteMessage()
go Manager.Start()
fmt.Printf("服务器启动成功,端口号:%s\n", CommonSetting.HttpPort)
}
func Run(w http.ResponseWriter, r *http.Request) {
conn, err := (&websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
// 允许所有CORS跨域请求
CheckOrigin: func(r *http.Request) bool {
return true
},
}).Upgrade(w, r, nil)
if err != nil {
globals.SugarLogger.Debugf("upgrade error: %v", err)
http.NotFound(w, r)
return
}
//设置读取消息大小上线
conn.SetReadLimit(maxMessageSize)
clientID := r.FormValue("clientId")
clientSocket := NewClient(clientID, conn)
//读取客户端消息
clientSocket.Read()
if err = ConnRender(conn, renderData{ClientId: clientID}); err != nil {
_ = conn.Close()
return
}
// 用户连接事件
Manager.Connect <- clientSocket
}
func StartWebSocket(conn *websocket.Conn, clientID string, err error) {
//设置读取消息大小上线
conn.SetReadLimit(maxMessageSize)
clientSocket := NewClient(clientID, conn)
//读取客户端消息
clientSocket.Read()
if err = ConnRender(conn, renderData{ClientId: clientID}); err != nil {
_ = conn.Close()
return
}
// 用户连接事件
Manager.Connect <- clientSocket
}
// PingTimer 定时器发送心跳
func PingTimer() {
go func() {
ticker := time.NewTicker(heartbeatInterval)
defer ticker.Stop()
//测试用
i := 0
for {
i++
<-ticker.C
for clientId, conn := range Manager.AllClient() {
if err := conn.Socket.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
Manager.DisConnect <- conn
globals.SugarLogger.Debugf("发送心跳失败: %s 总连接数:%d", clientId, Manager.Count())
}
if err := ConnRender(conn.Socket, renderData{ClientId: clientId}); err != nil {
return
}
globals.SugarLogger.Debugf("clientId=%s,i=%d", clientId, i)
}
}
}()
}
// WriteMessage 监听并发送给客户端信息
func WriteMessage() {
i := 0
for {
clientInfo := <-ToClientChan
//广播发送通知所有客户端
i++
fmt.Printf("WriteMessage clientInfo=%s i=%d", utils.Format4Output(clientInfo, false), i)
if Manager.AllClient() != nil {
for _, conn := range Manager.AllClient() {
if err := Render(conn.Socket, clientInfo.MessageId, clientInfo.Code, clientInfo.Msg, clientInfo.Data); err != nil {
Manager.DisConnect <- conn
}
}
} else {
globals.SugarLogger.Debugf("无客户端连接,请检查")
return
}
//if conn, err := Manager.GetByClientId(clientInfo.ClientId); err == nil && conn != nil {
// if err := Render(conn.Socket, clientInfo.MessageId, clientInfo.Code, clientInfo.Msg, clientInfo.Data); err != nil {
// Manager.DisConnect <- conn
// }
//}
}
}
// Start 管道处理程序
func (manager *ClientManager) Start() {
for {
select {
case client := <-manager.Connect:
// 建立连接事件
manager.EventConnect(client)
case conn := <-manager.DisConnect:
// 断开连接事件
manager.EventDisconnect(conn)
}
}
}
//从客户端读取数据
func (c *Client) Read() {
go func() {
for {
messageType, msg, err := c.Socket.ReadMessage()
if err != nil {
if messageType == -1 && websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
Manager.DisConnect <- c
return
} else if messageType != websocket.PingMessage {
return
}
} else {
SendToVendor(msg)
}
}
}()
}
// 以下为连接事件操作*******************************************
// EventConnect 建立连接事件
func (manager *ClientManager) EventConnect(client *Client) {
manager.AddClient(client)
}
// EventDisconnect 断开连接事件
func (manager *ClientManager) EventDisconnect(client *Client) {
//关闭连接
_ = client.Socket.Close()
manager.DelClient(client)
//标记销毁
client.IsDeleted = true
client = nil
}
//以下为客户端Client操作*******************************************
// NewClient 初始化Client
func NewClient(clientId string, socket *websocket.Conn) *Client {
return &Client{
ClientId: clientId,
Socket: socket,
ConnectTime: uint64(time.Now().Unix()),
IsDeleted: false,
}
}
// AddClient 添加客户端
func (manager *ClientManager) AddClient(client *Client) {
manager.ClientIdMapLock.Lock()
defer manager.ClientIdMapLock.Unlock()
manager.ClientIdMap[client.ClientId] = client
}
// DelClient 删除客户端
func (manager *ClientManager) DelClient(client *Client) {
manager.delClientIdMap(client.ClientId)
}
// 删除clientIdMap
func (manager *ClientManager) delClientIdMap(clientId string) {
manager.ClientIdMapLock.Lock()
defer manager.ClientIdMapLock.Unlock()
delete(manager.ClientIdMap, clientId)
}
// GetByClientId 通过clientId获取client
func (manager *ClientManager) GetByClientId(clientId string) (*Client, error) {
manager.ClientIdMapLock.RLock()
defer manager.ClientIdMapLock.RUnlock()
if client, ok := manager.ClientIdMap[clientId]; !ok {
return nil, errors.New("客户端不存在")
} else {
return client, nil
}
}
// AllClient 获取所有的客户端
func (manager *ClientManager) AllClient() map[string]*Client {
manager.ClientIdMapLock.RLock()
defer manager.ClientIdMapLock.RUnlock()
return manager.ClientIdMap
}
//与客户端的交互操作*************************
// NewClientManager 初始化客户端管理
func NewClientManager() (clientManager *ClientManager) {
clientManager = &ClientManager{
ClientIdMap: make(map[string]*Client),
Connect: make(chan *Client, 10000),
DisConnect: make(chan *Client, 10000),
Groups: make(map[string][]string, 100),
SystemClients: make(map[string][]string, 100),
}
return
}
// Count 获取客户端数量
func (manager *ClientManager) Count() int {
manager.ClientIdMapLock.RLock()
defer manager.ClientIdMapLock.RUnlock()
return len(manager.ClientIdMap)
}

View File

@@ -1,106 +1,105 @@
package controllers
//
//import (
// "encoding/json"
// "net/http"
//
// "git.rosy.net.cn/jx-callback/globals"
//
// "git.rosy.net.cn/jx-callback/business/partner/purchase/im"
// "github.com/astaxie/beego/server/web"
//)
//
//type IMController struct {
// web.Controller
//}
//
//// @Title IM初始化长链接
//// @Description IM初始化长链接
//// @Success 200 {object} controllers.CallResult
//// @Failure 200 {object} controllers.CallResult
//// @router /StartWebSocket [get]
//func (c *IMController) StartWebSocket() {
// upgrader.CheckOrigin = func(r *http.Request) bool {
// return true
// }
// ws, err := upgrader.Upgrade(c.Ctx.ResponseWriter, c.Ctx.Request, nil)
// if err != nil {
// globals.SugarLogger.Errorf("upgrade error: %v", err)
// return
// }
// defer ws.Close()
//
// clientID := c.GetString("clientID")
// globals.SugarLogger.Debugf("clientID=%s", clientID)
//
// im.StartWebSocket(ws, clientID, err)
//}
//
//// @Title IM获取门店用户聊天列表
//// @Description IM获取门店用户聊天列表
//// @Param token header string true "认证token"
//// @Param payLoad formData string true "平台应用映射关系"
//// @Success 200 {object} controllers.CallResult
//// @Failure 200 {object} controllers.CallResult
//// @router /GetIMUserList [get]
//func (c *IMController) GetIMUserList() {
// c.callGetIMUserList(func(params *tImGetIMUserListParams) (retVal interface{}, errCode string, err error) {
// var relInfo []im.RelInfo
// if err = json.Unmarshal([]byte(params.PayLoad), &relInfo); err != nil {
// retVal, err = im.GetImUserList(relInfo)
import (
"encoding/json"
"net/http"
"git.rosy.net.cn/jx-callback/globals"
"git.rosy.net.cn/jx-callback/business/partner/purchase/im"
"github.com/astaxie/beego/server/web"
)
type IMController struct {
web.Controller
}
// @Title IM初始化长链接
// @Description IM初始化长链接
// @Success 200 {object} controllers.CallResult
// @Failure 200 {object} controllers.CallResult
// @router /StartWebSocket [get]
func (c *IMController) StartWebSocket() {
upgrader.CheckOrigin = func(r *http.Request) bool {
return true
}
ws, err := upgrader.Upgrade(c.Ctx.ResponseWriter, c.Ctx.Request, nil)
if err != nil {
globals.SugarLogger.Errorf("upgrade error: %v", err)
return
}
defer ws.Close()
clientID := c.GetString("clientID")
globals.SugarLogger.Debugf("clientID=%s", clientID)
im.StartWebSocket(ws, clientID, err)
}
// @Title IM获取门店用户聊天列表
// @Description IM获取门店用户聊天列表
// @Param token header string true "认证token"
// @Param payLoad formData string true "平台应用映射关系"
// @Success 200 {object} controllers.CallResult
// @Failure 200 {object} controllers.CallResult
// @router /GetIMUserList [get]
func (c *IMController) GetIMUserList() {
c.callGetIMUserList(func(params *tImGetIMUserListParams) (retVal interface{}, errCode string, err error) {
var relInfo []im.RelInfo
if err = json.Unmarshal([]byte(params.PayLoad), &relInfo); err != nil {
retVal, err = im.GetImUserList(relInfo)
}
return retVal, "", err
})
}
// @Title IM获取单个用户聊天详情
// @Description IM获取单个用户聊天详情
// @Param token header string true "认证token"
// @Param payLoad formData string true "平台用户应用映射关系"
// @Success 200 {object} controllers.CallResult
// @Failure 200 {object} controllers.CallResult
// @router /GetImChatDetail [get]
func (c *IMController) GetImChatDetail() {
c.callGetImChatDetail(func(params *tImGetImChatDetailParams) (retVal interface{}, errCode string, err error) {
var temp []im.UserRelInfo
if err = json.Unmarshal([]byte(params.PayLoad), &temp); err == nil {
retVal, err = im.GetImChatDetail(temp)
}
return retVal, "", err
})
}
// @Title IM设置门店与单个用户已读
// @Description IM设置门店与单个用户已读
// @Param token header string true "认证token"
// @Param appID formData string true "应用id"
// @Param vendorStoreID formData string true "平台门店id"
// @Param vendorID formData string true "平台id"
// @Param userID formData string true "用户id/会话id"
// @Success 200 {object} controllers.CallResult
// @Failure 200 {object} controllers.CallResult
// @router /SetImMsgRead [post]
func (c *IMController) SetImMsgRead() {
c.callSetImMsgRead(func(params *tImSetImMsgReadParams) (retVal interface{}, errCode string, err error) {
err = im.SetJxMsgRead(params.AppID, params.VendorStoreID, params.VendorID, params.UserID)
return nil, "", err
})
}
// @Title 向平台商发送信息
// @Description 向平台商发送信息
// @Param token header string true "认证token"
// @Param sendData formData string true "平台商消息结构体"
// @Success 200 {object} controllers.CallResult
// @Failure 200 {object} controllers.CallResult
// @router /SendToVendor [post]
//func (c *IMController) SendToVendor() {
// c.callSendToVendor(func(params *tImSendToVendorParams) (retVal interface{}, errCode string, err error) {
// var sendData im.SendData
// if err = json.Unmarshal([]byte(params.SendData), &sendData); err == nil {
// im.SendToVendor(sendData)
// }
// return retVal, "", err
// })
//}
//
//// @Title IM获取单个用户聊天详情
//// @Description IM获取单个用户聊天详情
//// @Param token header string true "认证token"
//// @Param payLoad formData string true "平台用户应用映射关系"
//// @Success 200 {object} controllers.CallResult
//// @Failure 200 {object} controllers.CallResult
//// @router /GetImChatDetail [get]
//func (c *IMController) GetImChatDetail() {
// c.callGetImChatDetail(func(params *tImGetImChatDetailParams) (retVal interface{}, errCode string, err error) {
// var temp []im.UserRelInfo
// if err = json.Unmarshal([]byte(params.PayLoad), &temp); err == nil {
// retVal, err = im.GetImChatDetail(temp)
// }
// return retVal, "", err
// })
//}
//
//// @Title IM设置门店与单个用户已读
//// @Description IM设置门店与单个用户已读
//// @Param token header string true "认证token"
//// @Param appID formData string true "应用id"
//// @Param vendorStoreID formData string true "平台门店id"
//// @Param vendorID formData string true "平台id"
//// @Param userID formData string true "用户id/会话id"
//// @Success 200 {object} controllers.CallResult
//// @Failure 200 {object} controllers.CallResult
//// @router /SetImMsgRead [post]
//func (c *IMController) SetImMsgRead() {
// c.callSetImMsgRead(func(params *tImSetImMsgReadParams) (retVal interface{}, errCode string, err error) {
// err = im.SetJxMsgRead(params.AppID, params.VendorStoreID, params.VendorID, params.UserID)
// return nil, "", err
// })
//}
//
//// @Title 向平台商发送信息
//// @Description 向平台商发送信息
//// @Param token header string true "认证token"
//// @Param sendData formData string true "平台商消息结构体"
//// @Success 200 {object} controllers.CallResult
//// @Failure 200 {object} controllers.CallResult
//// @router /SendToVendor [post]
////func (c *IMController) SendToVendor() {
//// c.callSendToVendor(func(params *tImSendToVendorParams) (retVal interface{}, errCode string, err error) {
//// var sendData im.SendData
//// if err = json.Unmarshal([]byte(params.SendData), &sendData); err == nil {
//// im.SendToVendor(sendData)
//// }
//// return nil, "", err
//// })
////}

View File

@@ -294,7 +294,7 @@ func Init() {
QiniuAPI = qbox.NewMac(beego.AppConfig.DefaultString("qiniuAK", ""), beego.AppConfig.DefaultString("qiniuSK", ""))
ShowAPI = showapi.New(beego.AppConfig.DefaultInt("showAppID", 0), beego.AppConfig.DefaultString("showAppSecret", ""))
Cacher = redis.New(beego.AppConfig.DefaultString("redisHost", ""), beego.AppConfig.DefaultInt("redisPort", 0), beego.AppConfig.DefaultString("redisPassword", ""))
Cacher = redis.New(beego.AppConfig.DefaultString("redisHost", "localhost"), beego.AppConfig.DefaultInt("redisPort", 0), beego.AppConfig.DefaultString("redisPassword", ""))
//Todo 本地测试用
//Cacher = redis.New(beego.AppConfig.DefaultString("redisHost", "127.0.0.1"), beego.AppConfig.DefaultInt("redisPort", 6379), beego.AppConfig.DefaultString("redisPassword", "123456"))

View File

@@ -3,6 +3,7 @@ package main
import (
"flag"
"fmt"
"git.rosy.net.cn/jx-callback/business/partner/purchase/im"
"net/http"
_ "net/http/pprof"
"os"
@@ -94,7 +95,7 @@ func Init() {
enterprise.Init() // 初始化enterprise key
auto_delivery.Init() // 初始化骑手列表
//im.Init() //初始化ws连接
im.Init() //初始化ws连接
//go http.HandleFunc("/v2/im/StartWebSocket", im.Run)
//test

View File

@@ -165,11 +165,11 @@ func init() {
&controllers.VersionController{},
),
),
//web.NSNamespace("/im",
// web.NSInclude(
// &controllers.IMController{},
// ),
//),
web.NSNamespace("/im",
web.NSInclude(
&controllers.IMController{},
),
),
)
web.AddNamespace(ns)