Merge branch 'jdshop' of e.coding.net:rosydev/jx-callback into jdshop

This commit is contained in:
邹宗楠
2023-05-11 10:58:55 +08:00
21 changed files with 471 additions and 214 deletions

View File

@@ -9,7 +9,7 @@ import (
)
const (
IMVendorIDELM = 11 //饿了么
IMVendorIDELM = 3 //饿了么
)
// OnImMessage 用户/骑手 发送/已读消息 回调

View File

@@ -5,6 +5,10 @@ import (
"errors"
"fmt"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/baseapi/utils/errlist"
"git.rosy.net.cn/jx-callback/globals"
"git.rosy.net.cn/baseapi/platformapi/ebaiapi"
@@ -19,7 +23,6 @@ import (
// SendToVendor 向平台发消息
func SendToVendor(msg []byte) {
var (
//w http.ResponseWriter
sendData SendData
err error
elmAppID = api.EbaiAPI.GetSource()
@@ -36,7 +39,11 @@ func SendToVendor(msg []byte) {
//发送信息
if sendData.VendorID == VendorIDMT {
temp, _ := json.Marshal(sendData.Data)
Send(temp)
if sendData.Data.(map[string]interface{})["app_id"] == nil {
globals.SugarLogger.Debug("SendToVendor appId=null")
return
}
Send(temp, sendData.Data.(map[string]interface{})["app_id"])
}
if sendData.VendorID == VendorIDELM {
param := sendData.Data.(ebaiapi.BusinessSendMsgReq)
@@ -46,23 +53,24 @@ func SendToVendor(msg []byte) {
}
}
if err != nil {
ClientRender(Fail, FailMsg, map[string]string{
"errMsg": fmt.Sprintf("%v", err),
})
return
} else {
ClientRender(SuccessCode, SuccessMsg, map[string]interface{}{
"vendorID": sendData.VendorID,
"msg": "ok",
})
return
}
//if err != nil {
// ClientRender(Fail, FailMsg, map[string]string{
// "errMsg": fmt.Sprintf("%v", err),
// })
// return err
//} else {
// ClientRender(SuccessCode, SuccessMsg, map[string]interface{}{
// "vendorID": sendData.VendorID,
// "msg": "ok",
// })
// return nil
//}
return
}
func Send(data []byte) {
//生成完整url
fullUrl := GenFullUrl() //clientID暂时不用
func Send(data []byte, appID interface{}) {
//根据appID生成完整url
fullUrl := GenFullUrl(appID.(float64)) //clientID暂时不用
conn, resp, err := websocket.DefaultDialer.Dial(fullUrl, nil)
if err != nil || resp.StatusCode != 101 {
@@ -83,11 +91,11 @@ func Send(data []byte) {
for {
_, msg, err := conn.ReadMessage()
temp := string(msg)
if err != nil {
break
} else {
temp := string(msg)
if temp != HeartCheckSuccess || temp != "成功" {
if temp != HeartSuccessWord {
ReadMsgFromVendor(VendorIDMT, "", msg)
}
}
@@ -96,6 +104,87 @@ func Send(data []byte) {
return
}
// MtInit 发送心跳
func MtInit() {
data := []byte(HeartCheckMsg)
//生成完整url
url := GenFullUrl2()
//主连接
jxutils.CallMsgHandlerAsync(func() {
conn, resp, err := websocket.DefaultDialer.Dial(url.UrlMain, nil)
if err != nil || resp.StatusCode != 101 {
fmt.Printf("连接失败:%v http响应不成功", err)
}
//关闭
defer func(conn *websocket.Conn) {
err := conn.Close()
if err != nil {
return
}
}(conn)
//client连接事件
client := NewClient(url.ClientIDMain, conn, ClientTypeMt)
Manager.Connect <- client
err = conn.WriteMessage(websocket.TextMessage, data)
if err != nil {
fmt.Println(err)
}
for {
_, msg, err := conn.ReadMessage()
temp := string(msg)
if err != nil || temp != "HB" {
break
} else {
ReadMsgFromVendor(VendorIDMT, "", msg)
}
fmt.Printf("%s receive: %s\n", conn.RemoteAddr(), string(msg))
}
}, url.ClientIDMain)
//副连接
if url.UrlSub != "" {
jxutils.CallMsgHandlerAsync(func() {
connSub, respSub, errSub := websocket.DefaultDialer.Dial(url.UrlSub, nil)
if errSub != nil || respSub.StatusCode != 101 {
fmt.Printf("连接失败:%v http响应不成功", errSub)
}
//关闭
defer func(conn *websocket.Conn) {
err := conn.Close()
if err != nil {
return
}
}(connSub)
//client连接事件
client := NewClient(url.ClientIDSub, connSub, ClientTypeMt)
Manager.Connect <- client
errSub = connSub.WriteMessage(websocket.TextMessage, data)
if errSub != nil {
fmt.Println(errSub)
}
for {
_, msg, err := connSub.ReadMessage()
temp := string(msg)
if err != nil || temp != HeartCheckSuccess {
break
} else {
ReadMsgFromVendor(VendorIDMT, "", msg)
}
fmt.Printf("%s connSub:receive: %s\n", connSub.RemoteAddr(), string(msg))
}
}, url.ClientIDSub)
}
}
// ReadMsgFromClient 存储客户端发送的消息
func ReadMsgFromClient(vendorID int, elmAppID string, msg interface{}) {
var (
@@ -221,13 +310,8 @@ func PushMsgByCid(vendorStoreID string, vendorID int) error {
func SetMessageDetail(req *JXMsg, vendorID int, elmAppID string) error {
//生成京西消息ID detail
msgID := GenMsgDetailID(req, vendorID, elmAppID)
err := rdb.Set("test", "可以插入数据sjdfoiqaj", ExpireTimeDay)
if err != nil {
globals.SugarLogger.Debugf("测试插入err:%v", err)
}
data, _ := json.Marshal(req)
err = rdb.RPush(msgID, string(data))
globals.SugarLogger.Debugf("im SetUserList err=%v", err)
err := rdb.RPush(msgID, string(data))
ok, err := rdb.ExpireResult(msgID, ExpireTimeDay)
if err != nil || !ok {
return err
@@ -251,12 +335,6 @@ func SetUserList(jxMsg *JXMsg, userList *UserMessageList, vendorID int, elmAppID
//存储当前数据
data, _ := json.Marshal(userList)
err = rdb.RPush(msgID, string(data))
globals.SugarLogger.Debugf("im SetUserList msgID=%s", msgID)
globals.SugarLogger.Debugf("im SetUserList err=%v", err)
//test
str := rdb.Get(msgID)
globals.SugarLogger.Debugf("im SetUserList str=%v", str)
//over
ok, err := rdb.ExpireResult(msgID, ExpireTimeDay)
if err != nil || !ok {
return err
@@ -331,6 +409,8 @@ func GetImUserList(req []RelInfo) (map[string][]interface{}, error) {
temp := rdb.LRange(j)
for _, v := range temp {
retVal[j] = append(retVal[j], v)
//暂时写死
//retVal["userList"] = append(retVal["userList"], v)
}
}
return retVal, nil
@@ -351,6 +431,7 @@ func GetImChatDetail(req []UserRelInfo) (map[string][]interface{}, error) {
temp := rdb.LRange(j)
for _, v := range temp {
retVal[j] = append(retVal[j], v)
//retVal["chatDetail"] = append(retVal["chatDetail"], v)
}
}
return retVal, nil
@@ -388,3 +469,18 @@ func SetJxMsgRead(appID, vendorStoreID, vendorID, userID string) error {
}
return nil
}
// DelRedisByKey 清除redis数据
func DelRedisByKey(keys []string) {
var errList errlist.ErrList
for _, key := range keys {
err := rdb.Del(key)
if err != nil {
errList.AddErr(err)
}
}
if errList.GetErrListAsOne() != nil {
globals.SugarLogger.Debugf("DelRedisByKey err=%v", errList.GetErrListAsOne())
}
return
}

View File

@@ -38,6 +38,7 @@ type ClientManager struct {
type Client struct {
ClientId string // 标识ID
Socket *websocket.Conn // 用户连接
ClientType string //标识是美团/客户端长链接
ConnectTime uint64 // 首次连接时间
IsDeleted bool // 是否删除或下线
UserId string // 业务端标识用户ID
@@ -132,27 +133,45 @@ type UserRelInfo struct {
UserID string `json:"userID"` //用户id/groupID
}
// UrlInfo 生成美团长链接url信息
type UrlInfo struct {
UrlMain string `json:"urlMain"` //主连接路由
ClientIDMain string `json:"ClientIDMain"` //主连接id
UrlSub string `json:"urlSub"` //副连接路由
ClientIDSub string `json:"ClientIDSub"` //副连接id
}
var (
cfg *ini.File
rdb = api.Cacher
Manager = NewClientManager() // 管理者
CommonSetting = &commonConf{}
GlobalSetting = &global{}
ToClientChan chan clientInfo
heartbeatInterval = 60 * time.Second // 心跳间隔
HeartCheckMsg = "~#HHHBBB#~" //心跳检测消息
HeartCheckSuccess = "HB" //成功发送返回心跳消息
VendorIDMT = 10 //im美团
VendorIDELM = 11 //im饿了么
SendTypeJx = "jx" //京西客户端发送方标识
SendTypeMt = "mt" //美团用户发送方标识符
SendTypeElm = "elm" //饿了么用户发送方标识符
MTIMPushUrl = "wss://wpush.meituan.com/websocket" //buildPushConnect建立长连接
cfg *ini.File
rdb = api.Cacher
//客户端相关
Manager = NewClientManager() // 管理者
ToClientChan chan clientInfo
ClientTypeJx = "jx" //京西客户端
ClientTypeMt = "mt" //美团客户端
//配置文件
CommonSetting = &commonConf{}
GlobalSetting = &global{}
//心跳相关
heartbeatInterval = 60 * time.Second // 心跳间隔
HeartCheckMsg = "~#HHHBBB#~" //心跳检测消息
HeartCheckSuccess = "HB" //成功发送返回心跳消息
HeartSuccessWord = "成功" //成功发送返回心跳消息
//平台标识
AppID5873 = float64(5873)
AppID589 = "589"
VendorIDMT = 1 //im美团
VendorIDELM = 3 //im饿了么
SendTypeJx = "jx" //京西客户端发送方标识
SendTypeMt = "mt" //美团用户发送方标识符
SendTypeElm = "elm" //饿了么用户发送方标识符
MTIMPushUrl = "wss://wpush.meituan.com/websocket" //buildPushConnect建立长连接
)
const (
ExpireTimeDay = 24 * time.Hour //redis一天过期时间
maxMessageSize = 8192 // 最大的消息大小
ExpireTimeDay = 2 * time.Hour //redis一天过期时间
maxMessageSize = 8192 // 最大的消息大小
)
type renderData struct {
@@ -162,7 +181,7 @@ type renderData struct {
const (
SuccessCode = 0
SuccessMsg = "success"
Fail = -1
FailCode = -1
FailMsg = "fail"
SYSTEM_ID_ERROR = -1001
@@ -189,12 +208,13 @@ func Render(conn *websocket.Conn, messageId string, code int, message string, da
}
}
func ClientRender(code int, msg string, data interface{}) (str string) {
// ClientRender http响应
func ClientRender(code int, msg string) (str string) {
var retData RetData
retData.Code = code
retData.Msg = msg
retData.Data = data
//retData.Data = data
retJson, _ := json.Marshal(retData)
str = string(retJson)
@@ -272,7 +292,16 @@ func getIntranetIp() string {
}
// GenFullUrl 组装完整websocket url以及生成clientID
func GenFullUrl() (fullUrl string) {
func GenFullUrl(appID float64) (fullUrl string) {
if appID == AppID5873 {
if resp5873, err := api.Mtwm2API.GetConnectionToken(); err == nil {
r1 := mtwmapi.GetConnTokenResp{}
err = mapstructure.Decode(resp5873, &r1)
fullUrl = MTIMPushUrl + "/" + r1.AppKey + "/" + r1.ConnectionToken
return fullUrl
}
}
//589/4123
resp, err := api.MtwmAPI.GetConnectionToken()
if err != nil {
return ""
@@ -280,13 +309,34 @@ func GenFullUrl() (fullUrl string) {
retVal := mtwmapi.GetConnTokenResp{}
err = mapstructure.Decode(resp, &retVal)
fullUrl = MTIMPushUrl + "/" + retVal.AppKey + "/" + retVal.ConnectionToken
//clientID = api.MtwmAPI.GetAppID() + ":" + retVal.ConnectionToken
//打印输出
//fmt.Printf("Create websocket connect failCount:%d", retVal.UserCount)
return fullUrl
}
//生成随机字符串
// GenFullUrl2 组装完整websocket url以及生成clientID
func GenFullUrl2() *UrlInfo {
urlInfo := &UrlInfo{}
//1 589/4123
resp, err := api.MtwmAPI.GetConnectionToken()
if err != nil {
return nil
}
retVal := mtwmapi.GetConnTokenResp{}
err = mapstructure.Decode(resp, &retVal)
urlInfo.UrlMain = MTIMPushUrl + "/" + retVal.AppKey + "/" + retVal.ConnectionToken
urlInfo.ClientIDMain = retVal.AppKey + ":" + retVal.ConnectionToken
if api.MtwmAPI.GetAppID() == AppID589 { //目前果园无4123
if resp5873, err := api.Mtwm2API.GetConnectionToken(); err == nil {
r1 := mtwmapi.GetConnTokenResp{}
err = mapstructure.Decode(resp5873, &r1)
urlInfo.UrlSub = MTIMPushUrl + "/" + r1.AppKey + "/" + r1.ConnectionToken
urlInfo.ClientIDSub = r1.AppKey + ":" + r1.ConnectionToken
}
}
return urlInfo
}
// RandString 生成随机字符串
func RandString() string {
bytes := make([]byte, 16)
for i := 0; i < 16; i++ {

View File

@@ -6,6 +6,8 @@ import (
"net/http"
"time"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/globals"
@@ -18,16 +20,21 @@ func Init() {
//写入全局变量
Setup()
//建立长链接
go Send([]byte(HeartCheckMsg))
jxutils.CallMsgHandlerAsync(func() {
MtInit()
}, "MtInit:"+RandString())
//启动定时器
PingTimer()
go WriteMessage()
jxutils.CallMsgHandlerAsync(func() {
WriteMessage()
}, "WriteMessage:"+RandString())
go Manager.Start()
jxutils.CallMsgHandlerAsync(func() {
Manager.Start()
}, "Manager Start:"+RandString())
//fmt.Printf("服务器启动成功,端口号:%s\n", CommonSetting.HttpPort)
}
func Run(w http.ResponseWriter, r *http.Request) {
@@ -54,8 +61,8 @@ func Run(w http.ResponseWriter, r *http.Request) {
} else {
clientID = temp
}
globals.SugarLogger.Debugf("Run clientID=%s", clientID)
clientSocket := NewClient(clientID, conn)
clientSocket := NewClient(clientID, conn, ClientTypeJx)
//读取客户端消息
clientSocket.Read()
@@ -69,48 +76,32 @@ func Run(w http.ResponseWriter, r *http.Request) {
Manager.Connect <- clientSocket
}
func StartWebSocket(conn *websocket.Conn, clientID string, err error) {
//设置读取消息大小上线
conn.SetReadLimit(maxMessageSize)
clientSocket := NewClient(clientID, conn)
//读取客户端消息
clientSocket.Read()
if err = ConnRender(conn, renderData{ClientId: clientID}); err != nil {
_ = conn.Close()
return
}
// 用户连接事件
Manager.Connect <- clientSocket
}
// PingTimer 定时器发送心跳
func PingTimer() {
go func() {
ticker := time.NewTicker(heartbeatInterval)
defer ticker.Stop()
//测试用
i := 0
//i := 0
for {
i++
//i++
<-ticker.C
//对美团发送心跳
Send([]byte(HeartCheckMsg))
for clientId, conn := range Manager.AllClient() {
if err := conn.Socket.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
Manager.DisConnect <- conn
globals.SugarLogger.Debugf("发送心跳失败: %s 总连接数:%d", clientId, Manager.Count())
if conn.ClientType == ClientTypeJx {
if err := conn.Socket.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
Manager.DisConnect <- conn
globals.SugarLogger.Debugf("发送心跳失败: %s 总连接数:%d", clientId, Manager.Count())
}
if err := ConnRender(conn.Socket, renderData{ClientId: clientId}); err != nil {
return
}
} else {
if err := conn.Socket.WriteMessage(websocket.TextMessage, []byte(HeartCheckMsg)); err != nil {
//对美团重新建立连接
MtInit()
}
}
if err := ConnRender(conn.Socket, renderData{ClientId: clientId}); err != nil {
return
}
globals.SugarLogger.Debugf("发送心跳 clientId=%s,i=%d", clientId, i)
//globals.SugarLogger.Debugf("发送心跳 clientId=%s,i=%d", clientId, i)
}
}
}()
@@ -121,14 +112,16 @@ func WriteMessage() {
i := 0
for {
clientInfo := <-ToClientChan
//广播发送通知所有客户端
//广播发送通知所有京西客户端
i++
fmt.Printf("WriteMessage clientInfo=%s i=%d", utils.Format4Output(clientInfo, false), i)
if Manager.AllClient() != nil {
for _, conn := range Manager.AllClient() {
globals.SugarLogger.Debugf("WriteMessage conn.ClientId=%s", conn.ClientId)
if err := Render(conn.Socket, clientInfo.MessageId, clientInfo.Code, clientInfo.Msg, clientInfo.Data); err != nil {
Manager.DisConnect <- conn
if conn.ClientType == ClientTypeJx { //只发送给京西
globals.SugarLogger.Debugf("WriteMessage conn.ClientId=%s", conn.ClientId)
if err := Render(conn.Socket, clientInfo.MessageId, clientInfo.Code, clientInfo.Msg, clientInfo.Data); err != nil {
Manager.DisConnect <- conn
}
}
}
} else {
@@ -162,6 +155,8 @@ func (c *Client) Read() {
go func() {
for {
messageType, msg, err := c.Socket.ReadMessage()
temp := string(msg)
fmt.Print(temp)
if err != nil {
if messageType == -1 && websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
Manager.DisConnect <- c
@@ -169,10 +164,8 @@ func (c *Client) Read() {
} else if messageType != websocket.PingMessage {
return
}
} else {
SendToVendor(msg)
return
}
SendToVendor(msg)
}
}()
}
@@ -197,10 +190,11 @@ func (manager *ClientManager) EventDisconnect(client *Client) {
//以下为客户端Client操作*******************************************
// NewClient 初始化Client
func NewClient(clientId string, socket *websocket.Conn) *Client {
func NewClient(clientId string, socket *websocket.Conn, clientType string) *Client {
return &Client{
ClientId: clientId,
Socket: socket,
ClientType: clientType,
ConnectTime: uint64(time.Now().Unix()),
IsDeleted: false,
}

View File

@@ -122,6 +122,10 @@ func (p *PurchaseHandler) Map2Order(orderData map[string]interface{}) (order *mo
// 因为美团外卖不能自动设置商家门店号,且只能通过商家门店号来访问门店,
// 为了在后台设置简单一致把app_poi_code直接当成平台门店号使用(即在后台设置时,平台门店号与商家门店号一样)
// 订单中wm_poi_id实际来平台门店号app_poi_code为商家门店号这样一来这两个就相同了
//_修改为,
caution := strings.ReplaceAll(utils.Interface2String(result["caution"]), "_", ",")
order = &model.GoodsOrder{
VendorOrderID: vendorOrderID,
// VendorOrderID2: utils.Int64ToStr(utils.MustInterface2Int64(result["wm_order_id_view"])),
@@ -130,12 +134,13 @@ func (p *PurchaseHandler) Map2Order(orderData map[string]interface{}) (order *mo
StoreID: 0,
// VendorStoreID: utils.Int64ToStr(utils.MustInterface2Int64(result["wm_poi_id"])),
// StoreID: int(utils.Str2Int64WithDefault(utils.Interface2String(result["app_poi_code"]), 0)),
StoreName: result["wm_poi_name"].(string),
ConsigneeName: result["recipient_name"].(string),
ConsigneeMobile: jxutils.FormalizeMobile(result["recipient_phone"].(string)),
ConsigneeAddress: result["recipient_address"].(string),
CoordinateType: model.CoordinateTypeMars,
BuyerComment: utils.TrimBlankChar(utils.Interface2String(result["caution"])),
StoreName: result["wm_poi_name"].(string),
ConsigneeName: result["recipient_name"].(string),
ConsigneeMobile: jxutils.FormalizeMobile(result["recipient_phone"].(string)),
ConsigneeAddress: result["recipient_address"].(string),
CoordinateType: model.CoordinateTypeMars,
//BuyerComment: utils.TrimBlankChar(utils.Interface2String(result["caution"])),
BuyerComment: utils.TrimBlankChar(caution),
ExpectedDeliveredTime: getTimeFromTimestamp(utils.Interface2Int64WithDefault(result["delivery_time"], 0)),
PickDeadline: utils.DefaultTimeValue,
VendorStatus: utils.Int64ToStr(utils.MustInterface2Int64(result["status"])),