diff --git a/business/partner/purchase/im/im.go b/business/partner/purchase/im/im.go index 5e8fd4d02..10ed3b540 100644 --- a/business/partner/purchase/im/im.go +++ b/business/partner/purchase/im/im.go @@ -5,7 +5,7 @@ import ( "errors" "fmt" - "git.rosy.net.cn/jx-callback/business/jxutils" + "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/baseapi/utils/errlist" @@ -17,192 +17,53 @@ import ( "git.rosy.net.cn/baseapi/utils" push "git.rosy.net.cn/jx-callback/business/jxutils/unipush" "git.rosy.net.cn/jx-callback/globals/api" - "github.com/gorilla/websocket" ) -// SendToVendor 向平台发消息 -func SendToVendor(msg []byte) { - var ( - sendData SendData - err error - elmAppID = api.EbaiAPI.GetSource() - ) - - //解析数据 - if err = json.Unmarshal(msg, &sendData); err != nil { - return - } - - //存储数据 - ReadMsgFromClient(sendData.VendorID, elmAppID, sendData.Data) - - //发送信息 - if sendData.VendorID == VendorIDMT { - temp, _ := json.Marshal(sendData.Data) - if sendData.Data.(map[string]interface{})["app_id"] == nil { - globals.SugarLogger.Debug("SendToVendor appId=null") - return - } - Send(temp, sendData.Data.(map[string]interface{})["app_id"]) - return - } - if sendData.VendorID == VendorIDELM { - param := sendData.Data.(ebaiapi.BusinessSendMsgReq) - if err := api.EbaiAPI.BusinessSendMsg(¶m); err != nil { - globals.SugarLogger.Debugf("elm发送信息错误:%v", err) - return +func SendVendorV2(data SendData) (err error) { + if data.VendorID == model.VendorIDMTWM { + dataStr, _ := json.Marshal(data.Data) + temp := string(dataStr) + fmt.Println(temp) + if _, err = api.MtwmAPI.MsgSend(string(dataStr)); err != nil { + return err } } - - return -} - -func Send(data []byte, appID interface{}) { - //根据appID生成完整url - fullUrl := GenFullUrl(appID.(float64)) //clientID暂时不用 - - conn, resp, err := websocket.DefaultDialer.Dial(fullUrl, nil) - if err != nil || resp.StatusCode != 101 { - fmt.Printf("连接失败:%v http响应不成功", err) - } - //关闭 - defer func(conn *websocket.Conn) { - err := conn.Close() - if err != nil { - return - } - }(conn) - - err = conn.WriteMessage(websocket.TextMessage, data) + //if data.VendorID == model.VendorIDEBAI { //todo 后续添加 + // err = nil + //} + err = ReadMsgFromClient(data.VendorID, "", data.Data) if err != nil { - fmt.Println(err) + globals.SugarLogger.Debugf("SendVendorV2:%v", err) } - - for { - _, msg, err := conn.ReadMessage() - temp := string(msg) - res := JsonCommon(HeartSuccessWord) - fmt.Printf("Send %s receive: %s\n", conn.RemoteAddr(), string(msg)) - if err != nil { - break - } else if temp == res { - continue - } else { - ReadMsgFromVendor(VendorIDMT, "", msg) - } - } -} - -// MtInit 发送心跳 -func MtInit() { - data := []byte(HeartCheckMsg) - //生成完整url - url := GenFullUrl2() - //主连接 - jxutils.CallMsgHandlerAsync(func() { - conn, resp, err := websocket.DefaultDialer.Dial(url.UrlMain, nil) - if err != nil || resp.StatusCode != 101 { - fmt.Printf("连接失败:%v http响应不成功", err) - } - //关闭 - defer func(conn *websocket.Conn) { - err := conn.Close() - if err != nil { - return - } - }(conn) - - //client连接事件 - client := NewClient(url.ClientIDMain, conn, ClientTypeMt) - Manager.Connect <- client - - err = conn.WriteMessage(websocket.TextMessage, data) - if err != nil { - fmt.Println(err) - } - - for { - _, msg, err := conn.ReadMessage() - temp := string(msg) - res := JsonCommon(HeartCheckSuccess) - fmt.Printf("MtInit %s receive: %s\n", conn.RemoteAddr(), string(msg)) - if err != nil { - break - } else if temp == res { - continue - } else { - ReadMsgFromVendor(VendorIDMT, "", msg) - } - } - }, url.ClientIDMain) - - //副连接 - if url.UrlSub != "" { - jxutils.CallMsgHandlerAsync(func() { - connSub, respSub, errSub := websocket.DefaultDialer.Dial(url.UrlSub, nil) - if errSub != nil || respSub.StatusCode != 101 { - fmt.Printf("连接失败:%v http响应不成功", errSub) - } - - //关闭 - defer func(conn *websocket.Conn) { - err := conn.Close() - if err != nil { - return - } - }(connSub) - - //client连接事件 - client := NewClient(url.ClientIDSub, connSub, ClientTypeMt) - Manager.Connect <- client - - errSub = connSub.WriteMessage(websocket.TextMessage, data) - if errSub != nil { - fmt.Println(errSub) - } - - for { - _, msg, err := connSub.ReadMessage() - temp := string(msg) - res := JsonCommon(HeartCheckSuccess) - if err != nil || temp == res { - break - } else { - ReadMsgFromVendor(VendorIDMT, "", msg) - } - fmt.Printf("MtInit %s connSub:receive: %s\n", connSub.RemoteAddr(), string(msg)) - } - - }, url.ClientIDSub) - } - + return nil } // ReadMsgFromClient 存储客户端发送的消息 -func ReadMsgFromClient(vendorID int, elmAppID string, msg interface{}) { +func ReadMsgFromClient(vendorID int, elmAppID string, msg interface{}) error { var ( err error jxMsg = &JXMsg{} + errList errlist.ErrList userList = &UserMessageList{} ) data, err := json.Marshal(msg) if err != nil { - return + errList.AddErr(fmt.Errorf("json处理数据错误:%v", err)) } if vendorID == VendorIDMT { - var MtSingleChat = mtwmapi.SingleChat{} - err = json.Unmarshal(data, &MtSingleChat) + var pushContent = mtwmapi.PushContentReq{} + err = json.Unmarshal(data, &pushContent) jxMsg = &JXMsg{ SendType: SendTypeJx, - MsgContent: MtSingleChat, + MsgContent: pushContent, } userList = &UserMessageList{ VendorID: VendorIDMT, - UserID: utils.Int2Str(MtSingleChat.OpenUserID), - LatestMsg: MtSingleChat.MsgContent, - LatestTime: MtSingleChat.Cts, + UserID: utils.Int2Str(pushContent.OpenUserID), + LatestMsg: pushContent.MsgContent, + LatestTime: pushContent.Cts, } } if vendorID == VendorIDELM { @@ -222,41 +83,44 @@ func ReadMsgFromClient(vendorID int, elmAppID string, msg interface{}) { //1 存储详细聊天记录list if err = SetMessageDetail(jxMsg, vendorID, elmAppID); err != nil { - globals.SugarLogger.Debugf("ReadMsgFromClient SetMessageDetail err:=%v\n", err) - //return + errList.AddErr(fmt.Errorf("存储详细聊天记录错误:%v", err)) } //2 存储展示列表时单条数据 if err = SetUserList(jxMsg, userList, vendorID, elmAppID); err != nil { - globals.SugarLogger.Debugf("ReadMsgFromClient SetUserList err:=%v\n", err) - //return + errList.AddErr(fmt.Errorf("存储STU聊天记录错误:%v", err)) } + if errList.GetErrListAsOne() != nil { + return fmt.Errorf("ReadMsgFromClient:%v", errList.GetErrListAsOne()) + } + return nil } // ReadMsgFromVendor 读取数据并存储到redis -func ReadMsgFromVendor(vendorID int, elmAppID string, msg []byte) { - if string(msg) == "" { - return - } +func ReadMsgFromVendor(vendorID int, elmAppID string, msg []byte) error { var ( - err error - //vendorStoreID string - jxMsg = &JXMsg{} - userList = &UserMessageList{} + err error + jxMsg = &JXMsg{} + vendorStoreID string + errList errlist.ErrList + userList = &UserMessageList{} ) + if string(msg) == "" { + errList.AddErr(fmt.Errorf("读取平台数据为空,请检查")) + } if vendorID == VendorIDMT { - var MtSingleChat = mtwmapi.SingleChat{} - err = json.Unmarshal(msg, &MtSingleChat) + var PushContentReq = mtwmapi.PushContentReq{} + err = json.Unmarshal(msg, &PushContentReq) jxMsg = &JXMsg{ SendType: SendTypeMt, - MsgContent: MtSingleChat, + MsgContent: PushContentReq, } userList = &UserMessageList{ VendorID: VendorIDMT, - UserID: utils.Int2Str(MtSingleChat.OpenUserID), - LatestMsg: MtSingleChat.MsgContent, - LatestTime: MtSingleChat.Cts, + UserID: utils.Int2Str(PushContentReq.OpenUserID), + LatestMsg: PushContentReq.MsgContent, + LatestTime: PushContentReq.Cts, } - //vendorStoreID = MtSingleChat.AppPoiCode + vendorStoreID = PushContentReq.AppPoiCode } if vendorID == VendorIDELM { var ElmData = ebaiapi.ImMessageSend{} @@ -275,23 +139,21 @@ func ReadMsgFromVendor(vendorID int, elmAppID string, msg []byte) { //1 存储详细聊天记录list if err = SetMessageDetail(jxMsg, vendorID, elmAppID); err != nil { - globals.SugarLogger.Debugf("ReadMsgFromVendor SetMessageDetail err:=%v\n", err) - //return + errList.AddErr(fmt.Errorf("存储详细聊天记录错误:%v", err)) } //2 存储展示列表时单条数据 if err = SetUserList(jxMsg, userList, vendorID, elmAppID); err != nil { - globals.SugarLogger.Debugf("ReadMsgFromVendor SetUserList err:=%v\n", err) - //return + errList.AddErr(fmt.Errorf("存储STU聊天记录错误:%v", err)) } //3 cid推送新消息 - //err = PushMsgByCid(vendorStoreID, vendorID) - //4 长链接通知给客户端 - if err != nil { - ToClientChan <- clientInfo{Code: SuccessCode, Msg: fmt.Sprintf("%v", err), Data: jxMsg} - } else { - ToClientChan <- clientInfo{Code: SuccessCode, Msg: SuccessMsg, Data: jxMsg} + if err = PushMsgByCid(vendorStoreID, vendorID); err != nil { + errList.AddErr(fmt.Errorf("向商家cid推送新消息错误:%v", err)) } - return + + if errList.GetErrListAsOne() != nil { + return fmt.Errorf("ReadMsgFromVendor:%v", errList.GetErrListAsOne()) + } + return nil } // PushMsgByCid 通过cid push用户 @@ -303,7 +165,7 @@ func PushMsgByCid(vendorStoreID string, vendorID int) error { } // SetMessageDetail 赋值 -//格式 AppID:AppPoiCode:10:OpenUserID +//格式 AppID:AppPoiCode:1:OpenUserID func SetMessageDetail(req *JXMsg, vendorID int, elmAppID string) error { //生成京西消息ID detail msgID := GenMsgDetailID(req, vendorID, elmAppID) @@ -368,7 +230,7 @@ func GetNewAndTrim(key string, flag string) (cnt int, err error) { // GenMsgDetailID 生成查询详细聊天记录ID func GenMsgDetailID(jxMsg *JXMsg, vendorID int, elmAppID string) (msgID string) { if vendorID == VendorIDMT { - var d1 = jxMsg.MsgContent.(mtwmapi.SingleChat) + var d1 = jxMsg.MsgContent.(mtwmapi.PushContentReq) msgID = utils.Int2Str(d1.AppID) + ":" + d1.AppPoiCode + ":1:" + utils.Int2Str(d1.OpenUserID) } if vendorID == VendorIDELM { @@ -381,7 +243,7 @@ func GenMsgDetailID(jxMsg *JXMsg, vendorID int, elmAppID string) (msgID string) // GenMsgListID 生成展示列表时单条数据ID(部分) func GenMsgListID(jxMsg *JXMsg, vendorID int, elmAppID string) (msgID string) { if vendorID == VendorIDMT { - var d1 = jxMsg.MsgContent.(mtwmapi.SingleChat) + var d1 = jxMsg.MsgContent.(mtwmapi.PushContentReq) msgID = utils.Int2Str(d1.AppID) + ":" + d1.AppPoiCode + ":1" } if vendorID == VendorIDELM { diff --git a/business/partner/purchase/im/im_model.go b/business/partner/purchase/im/im_model.go index d69edcaa9..4a6789c03 100644 --- a/business/partner/purchase/im/im_model.go +++ b/business/partner/purchase/im/im_model.go @@ -1,85 +1,11 @@ package im import ( - "encoding/json" - "flag" - "fmt" - "io" - "log" - r "math/rand" - "net" - "net/http/httptest" - "sync" "time" - "git.rosy.net.cn/baseapi/platformapi/mtwmapi" "git.rosy.net.cn/jx-callback/globals/api" - "github.com/gazeboxu/mapstructure" - "github.com/gorilla/websocket" - "gopkg.in/ini.v1" ) -// ClientManager 连接管理 -type ClientManager struct { - ClientIdMap map[string]*Client // 全部的连接 - ClientIdMapLock sync.RWMutex // 读写锁 - - Connect chan *Client // 连接处理 - DisConnect chan *Client // 断开连接处理 - - GroupLock sync.RWMutex - Groups map[string][]string - - SystemClientsLock sync.RWMutex - SystemClients map[string][]string -} - -// Client 客户端连接信息 -type Client struct { - ClientId string // 标识ID - Socket *websocket.Conn // 用户连接 - ClientType string //标识是美团/客户端长链接 - ConnectTime uint64 // 首次连接时间 - IsDeleted bool // 是否删除或下线 - UserId string // 业务端标识用户ID - //Extend string // 扩展字段,用户可以自定义 - //GroupList []string -} - -//channel通道结构体 -type clientInfo struct { - ClientId string `json:"clientId" validate:"required"` //链接ID - Data interface{} - SendUserId string - MessageId string - Code int - Msg string -} - -// RetData 统一返回值结构体 -type RetData struct { - Code int `json:"code"` //响应code - Msg string `json:"msg"` //响应msg success/fail - //MsgType string `json:"msgType"` //发送消息方类型:mt;elm;jx - Data interface{} `json:"data"` //信息 - - //MessageType string `json:"messageType"` //消息类型 heart-心跳检测;send-发送消息;receive-接收消息 - //MessageId string `json:"messageId"` //发送/接收信息 id - //UserId string `json:"userId"` //必须是平台方userID -} - -type global struct { - LocalHost string //本机内网IP - ServerList map[string]string - ServerListLock sync.RWMutex -} -type commonConf struct { - HttpPort string - RPCPort string - Cluster bool - CryptoKey string -} - // SendData 客户端写入参数 type SendData struct { VendorID int `json:"vendorID"` //消息来源平台ID 1-美团 3-饿了么 @@ -92,23 +18,9 @@ type JXMsg struct { MsgContent interface{} `json:"msgContent"` //美团/饿了么 单聊消息 } -// GetUserListReq 获取门店用户聊天列表 -type GetUserListReq struct { - VendorStoreID string `json:"vendorStoreID"` //平台门店id - VendorID string `json:"vendorID"` //平台标识id - AppID string `json:"appID"` //应用ID -} - -type GetChatDetailReq struct { - VendorStoreID string `json:"vendorStoreID"` //平台门店id - VendorID string `json:"vendorID"` //平台标识id - AppID string `json:"appID"` //应用ID - UserID string `json:"userID"` //userID/groupID -} - // UserMessageList 用户消息列表 type UserMessageList struct { - VendorID int `json:"vendorID"` //平台品牌 10-美团 11-饿了么 + VendorID int `json:"vendorID"` //平台品牌 1-美团 3-饿了么 UserID string `json:"userID"` //用户ID NewMessageNum int `json:"NewMessageNum"` //新消息数量 LatestMsg string `json:"latestMsg"` //最新一条消息 @@ -128,222 +40,16 @@ type UserRelInfo struct { UserID string `json:"userID"` //用户id/groupID } -// UrlInfo 生成美团长链接url信息 -type UrlInfo struct { - UrlMain string `json:"urlMain"` //主连接路由 - ClientIDMain string `json:"ClientIDMain"` //主连接id - UrlSub string `json:"urlSub"` //副连接路由 - ClientIDSub string `json:"ClientIDSub"` //副连接id -} - var ( - cfg *ini.File - rdb = api.Cacher - //客户端相关 - Manager = NewClientManager() // 管理者 - ToClientChan chan clientInfo - ClientTypeJx = "jx" //京西客户端 - ClientTypeMt = "mt" //美团客户端 - //配置文件 - CommonSetting = &commonConf{} - GlobalSetting = &global{} - //心跳相关 - heartbeatInterval = 20 * time.Second // 心跳间隔 - HeartCheckMsg = "~#HHHBBB#~" //心跳检测消息 - HeartCheckSuccess = "HB" //成功发送返回心跳消息 - HeartSuccessWord = "成功" //成功发送返回心跳消息 - //平台标识 - AppID5873 = float64(5873) - AppID589 = "589" - VendorIDMT = 1 //im美团 - VendorIDELM = 3 //im饿了么 - SendTypeJx = "jx" //京西客户端发送方标识 - SendTypeMt = "mt" //美团用户发送方标识符 - SendTypeElm = "elm" //饿了么用户发送方标识符 - MTIMPushUrl = "wss://wpush.meituan.com/websocket" //buildPushConnect建立长连接 + rdb = api.Cacher + VendorIDMT = 1 //im美团 + VendorIDELM = 3 //im饿了么 + SendTypeJx = "jx" //京西客户端发送方标识 + SendTypeMt = "mt" //美团用户发送方标识符 + SendTypeElm = "elm" //饿了么用户发送方标识符 ) const ( - ExpireTimeDay = 2 * time.Hour //redis一天过期时间 - maxMessageSize = 8192 // 最大的消息大小 + ExpireTimeDay = 4 * time.Hour //redis过期时间 ) - -type renderData struct { - ClientId string `json:"clientId"` -} - -const ( - SuccessCode = 0 - SuccessMsg = "success" - FailCode = -1 - FailMsg = "fail" - - SYSTEM_ID_ERROR = -1001 - ONLINE_MESSAGE_CODE = 1001 - OFFLINE_MESSAGE_CODE = 1002 -) - -// Render 统一返回值 -func Render(conn *websocket.Conn, messageId string, code int, message string, data interface{}) error { - if data != nil { - str, _ := json.Marshal(data) - temp := string(str) - return conn.WriteJSON(RetData{ - Code: code, - Msg: message, - //MsgType: temp.SendType, - Data: temp, - }) - } else { - return conn.WriteJSON(RetData{ - Code: code, - Msg: message, - }) - } -} - -// ClientRender http响应 -func ClientRender(code int, msg string) (str string) { - var retData RetData - - retData.Code = code - retData.Msg = msg - //retData.Data = data - - retJson, _ := json.Marshal(retData) - str = string(retJson) - - w := httptest.NewRecorder() - w.Header().Set("Content-Type", "application/json; charset=utf-8") - _, _ = io.WriteString(w, str) - return -} - -func ConnRender(conn *websocket.Conn, data interface{}) (err error) { - err = conn.WriteJSON(RetData{ - Code: SuccessCode, - Msg: "success", - Data: data, - }) - return -} - -// Default 给默认值 -func Default() { - CommonSetting = &commonConf{ - HttpPort: "6000", - RPCPort: "7000", - Cluster: false, - CryptoKey: "Adba723b7fe06819", - } - - GlobalSetting = &global{ - LocalHost: getIntranetIp(), - ServerList: make(map[string]string), - } -} - -// Setup 初始化全局设置变量 -func Setup() { - configFile := flag.String("c", "conf/app.ini", "-c conf/app.ini") - - var err error - cfg, err = ini.Load(*configFile) - if err != nil { - log.Fatalf("setting.Setup, fail to parse 'conf/app.ini': %v", err) - } - - mapTo("common", CommonSetting) - - GlobalSetting = &global{ - LocalHost: getIntranetIp(), - ServerList: make(map[string]string), - } - fmt.Printf("LocalHost=%s\n ", GlobalSetting.LocalHost) -} - -// mapTo map section -func mapTo(section string, v interface{}) { - err := cfg.Section(section).MapTo(v) - if err != nil { - log.Fatalf("Cfg.MapTo %s err: %v", section, err) - } -} - -//获取本机IP -func getIntranetIp() string { - addrs, _ := net.InterfaceAddrs() - for _, addr := range addrs { - // 检查ip地址判断是否回环地址 - if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { - if ipnet.IP.To4() != nil { - return ipnet.IP.String() - } - - } - } - return "" -} - -// GenFullUrl 组装完整websocket url以及生成clientID -func GenFullUrl(appID float64) (fullUrl string) { - if appID == AppID5873 { - if resp5873, err := api.Mtwm2API.GetConnectionToken(); err == nil { - r1 := mtwmapi.GetConnTokenResp{} - err = mapstructure.Decode(resp5873, &r1) - fullUrl = MTIMPushUrl + "/" + r1.AppKey + "/" + r1.ConnectionToken - return fullUrl - } - } - //589/4123 - resp, err := api.MtwmAPI.GetConnectionToken() - if err != nil { - return "" - } - retVal := mtwmapi.GetConnTokenResp{} - err = mapstructure.Decode(resp, &retVal) - fullUrl = MTIMPushUrl + "/" + retVal.AppKey + "/" + retVal.ConnectionToken - return fullUrl -} - -// GenFullUrl2 组装完整websocket url以及生成clientID -func GenFullUrl2() *UrlInfo { - urlInfo := &UrlInfo{} - //1 589/4123 - resp, err := api.MtwmAPI.GetConnectionToken() - if err != nil { - return nil - } - retVal := mtwmapi.GetConnTokenResp{} - err = mapstructure.Decode(resp, &retVal) - urlInfo.UrlMain = MTIMPushUrl + "/" + retVal.AppKey + "/" + retVal.ConnectionToken - urlInfo.ClientIDMain = retVal.AppKey + ":" + retVal.ConnectionToken - - if api.MtwmAPI.GetAppID() == AppID589 { //目前果园无4123 - if resp5873, err := api.Mtwm2API.GetConnectionToken(); err == nil { - r1 := mtwmapi.GetConnTokenResp{} - err = mapstructure.Decode(resp5873, &r1) - urlInfo.UrlSub = MTIMPushUrl + "/" + r1.AppKey + "/" + r1.ConnectionToken - urlInfo.ClientIDSub = r1.AppKey + ":" + r1.ConnectionToken - } - } - return urlInfo -} - -// RandString 生成随机字符串 -func RandString() string { - bytes := make([]byte, 16) - for i := 0; i < 16; i++ { - b := r.Intn(26) + 65 - bytes[i] = byte(b) - } - return string(bytes) -} - -// JsonCommon json格式化 -func JsonCommon(str string) (retVal string) { - temp, _ := json.Marshal(str) - _ = json.Unmarshal(temp, &retVal) - return retVal -} diff --git a/business/partner/purchase/im/im_server.go b/business/partner/purchase/im/im_server.go deleted file mode 100644 index 27f5a489a..000000000 --- a/business/partner/purchase/im/im_server.go +++ /dev/null @@ -1,259 +0,0 @@ -package im - -import ( - "errors" - "fmt" - "net/http" - "time" - - "git.rosy.net.cn/jx-callback/business/jxutils" - - "git.rosy.net.cn/jx-callback/globals" - "github.com/gorilla/websocket" -) - -func Init() { - //初始化 - ToClientChan = make(chan clientInfo, 1000) - //写入全局变量 - Setup() - //建立长链接 - jxutils.CallMsgHandlerAsync(func() { - MtInit() - }, "MtInit:"+RandString()) - - //启动定时器 - PingTimer() - - jxutils.CallMsgHandlerAsync(func() { - WriteMessage() - }, "WriteMessage:"+RandString()) - - jxutils.CallMsgHandlerAsync(func() { - Manager.Start() - }, "Manager Start:"+RandString()) - -} - -func Run(w http.ResponseWriter, r *http.Request) { - conn, err := (&websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - // 允许所有CORS跨域请求 - CheckOrigin: func(r *http.Request) bool { - return true - }, - }).Upgrade(w, r, nil) - if err != nil { - globals.SugarLogger.Debugf("upgrade error: %v", err) - http.NotFound(w, r) - return - } - - //设置读取消息大小上线 - conn.SetReadLimit(maxMessageSize) - - clientID := "" - if temp := r.Header.Get("Clientid"); len(temp) == 0 { - clientID = RandString() - } else { - clientID = temp - } - - clientSocket := NewClient(clientID, conn, ClientTypeJx) - - //读取客户端消息 - clientSocket.Read() - - if err = ConnRender(conn, renderData{ClientId: clientID}); err != nil { - _ = conn.Close() - return - } - - // 用户连接事件 - Manager.Connect <- clientSocket -} - -// PingTimer 定时器发送心跳 -func PingTimer() { - go func() { - ticker := time.NewTicker(heartbeatInterval) - defer ticker.Stop() - for { - <-ticker.C - for clientId, conn := range Manager.AllClient() { - if conn.ClientType == ClientTypeJx { - if err := conn.Socket.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { - Manager.DisConnect <- conn - fmt.Printf("发送心跳失败: %s 总连接数:%d", clientId, Manager.Count()) - } - if err := ConnRender(conn.Socket, renderData{ClientId: clientId}); err != nil { - return - } - globals.SugarLogger.Debugf("PingTimer jx clientID ") - } else { - fmt.Printf("PingTimer mt心跳,conn%s,%s", conn.ClientId, conn.ClientType) - if err := conn.Socket.WriteMessage(websocket.TextMessage, []byte(HeartCheckMsg)); err != nil { - fmt.Printf("PingTimer mtHeartBeat err:%v", err) - //对美团重新建立连接 - MtInit() - } - - } - } - } - }() -} - -// WriteMessage 监听并发送给客户端信息 -func WriteMessage() { - i := 0 - for { - clientInfo := <-ToClientChan - //广播发送通知所有京西客户端 - i++ - //fmt.Printf("WriteMessage clientInfo=%s i=%d\n", utils.Format4Output(clientInfo, false), i) - if Manager.AllClient() != nil { - for _, conn := range Manager.AllClient() { - if conn.ClientType == ClientTypeJx { //只发送给京西 - fmt.Printf("WriteMessage conn.ClientId=%s\n", conn.ClientId) - if err := Render(conn.Socket, clientInfo.MessageId, clientInfo.Code, clientInfo.Msg, clientInfo.Data); err != nil { - Manager.DisConnect <- conn - } - } - } - } else { - globals.SugarLogger.Debugf("无客户端连接,请检查") - return - } - } -} - -// Start 管道处理程序 -func (manager *ClientManager) Start() { - for { - select { - case client := <-manager.Connect: - // 建立连接事件 - manager.EventConnect(client) - case conn := <-manager.DisConnect: - // 断开连接事件 - manager.EventDisconnect(conn) - } - } -} - -//从客户端读取数据 -func (c *Client) Read() { - go func() { - for { - messageType, msg, err := c.Socket.ReadMessage() - //temp := string(msg) - //fmt.Print(temp) - if err != nil { - if messageType == -1 && websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) { - Manager.DisConnect <- c - return - } else if messageType != websocket.PingMessage { - return - } - } - fmt.Printf("Client Read:receive: %s\n", string(msg)) - //SendToVendor(msg) - } - }() -} - -// 以下为连接事件操作******************************************* - -// EventConnect 建立连接事件 -func (manager *ClientManager) EventConnect(client *Client) { - manager.AddClient(client) -} - -// EventDisconnect 断开连接事件 -func (manager *ClientManager) EventDisconnect(client *Client) { - //关闭连接 - _ = client.Socket.Close() - manager.DelClient(client) - //标记销毁 - client.IsDeleted = true - client = nil -} - -//以下为客户端Client操作******************************************* - -// NewClient 初始化Client -func NewClient(clientId string, socket *websocket.Conn, clientType string) *Client { - return &Client{ - ClientId: clientId, - Socket: socket, - ClientType: clientType, - ConnectTime: uint64(time.Now().Unix()), - IsDeleted: false, - } -} - -// AddClient 添加客户端 -func (manager *ClientManager) AddClient(client *Client) { - manager.ClientIdMapLock.Lock() - defer manager.ClientIdMapLock.Unlock() - - manager.ClientIdMap[client.ClientId] = client -} - -// DelClient 删除客户端 -func (manager *ClientManager) DelClient(client *Client) { - manager.delClientIdMap(client.ClientId) - -} - -// 删除clientIdMap -func (manager *ClientManager) delClientIdMap(clientId string) { - manager.ClientIdMapLock.Lock() - defer manager.ClientIdMapLock.Unlock() - - delete(manager.ClientIdMap, clientId) -} - -// GetByClientId 通过clientId获取client -func (manager *ClientManager) GetByClientId(clientId string) (*Client, error) { - manager.ClientIdMapLock.RLock() - defer manager.ClientIdMapLock.RUnlock() - - if client, ok := manager.ClientIdMap[clientId]; !ok { - return nil, errors.New("客户端不存在") - } else { - return client, nil - } -} - -// AllClient 获取所有的客户端 -func (manager *ClientManager) AllClient() map[string]*Client { - manager.ClientIdMapLock.RLock() - defer manager.ClientIdMapLock.RUnlock() - - return manager.ClientIdMap -} - -//与客户端的交互操作************************* - -// NewClientManager 初始化客户端管理 -func NewClientManager() (clientManager *ClientManager) { - clientManager = &ClientManager{ - ClientIdMap: make(map[string]*Client), - Connect: make(chan *Client, 10000), - DisConnect: make(chan *Client, 10000), - Groups: make(map[string][]string, 100), - SystemClients: make(map[string][]string, 100), - } - - return -} - -// Count 获取客户端数量 -func (manager *ClientManager) Count() int { - manager.ClientIdMapLock.RLock() - defer manager.ClientIdMapLock.RUnlock() - return len(manager.ClientIdMap) -} diff --git a/business/partner/purchase/mtwm/callback.go b/business/partner/purchase/mtwm/callback.go index ff1c42e4b..8c0a0dc81 100644 --- a/business/partner/purchase/mtwm/callback.go +++ b/business/partner/purchase/mtwm/callback.go @@ -1,6 +1,12 @@ package mtwm import ( + "encoding/json" + "net/http" + "strings" + + "git.rosy.net.cn/jx-callback/business/partner/purchase/im" + "git.rosy.net.cn/baseapi/platformapi/mtwmapi" "git.rosy.net.cn/jx-callback/business/jxutils" "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" @@ -8,8 +14,6 @@ import ( "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/dao" "git.rosy.net.cn/jx-callback/globals" - "net/http" - "strings" ) // 美团回调接口 @@ -115,3 +119,14 @@ func GetMsgCallBackUrl(msgType, appId string) string { } return interfaceUrl } + +// OnImMsg im消息回调 +func OnImMsg(msg *mtwmapi.ImCallbackMsg) (response *mtwmapi.CallbackResponse) { + if str, err := json.Marshal(msg.PushContent); err == nil { + err = im.ReadMsgFromVendor(model.VendorIDMTWM, "", str) + if err != nil { + globals.SugarLogger.Debugf("OnImMsg提示:%v", err) + } + } + return mtwmapi.SuccessResponse +} diff --git a/controllers/im.go b/controllers/im.go index a748a01fa..a84099571 100644 --- a/controllers/im.go +++ b/controllers/im.go @@ -13,15 +13,6 @@ type IMController struct { web.Controller } -// @Title IM初始化长链接 -// @Description IM初始化长链接 -// @Success 200 {object} controllers.CallResult -// @Failure 200 {object} controllers.CallResult -// @router /StartWebSocket [get] -func (c *IMController) StartWebSocket() { - im.Run(c.Ctx.ResponseWriter, c.Ctx.Request) -} - // @Title IM获取门店用户聊天列表 // @Description IM获取门店用户聊天列表 // @Param token header string true "认证token" @@ -73,24 +64,24 @@ func (c *IMController) SetImMsgRead() { }) } -// @Title 向平台商发送信息 -// @Description 向平台商发送信息 +// @Title 向平台商发送信息(https方式) +// @Description 向平台商发送信息(https方式) // @Param token header string true "认证token" // @Param sendData formData string true "平台商消息结构体" // @Success 200 {object} controllers.CallResult // @Failure 200 {object} controllers.CallResult -// @router /SendToVendor [post] -func (c *IMController) SendToVendor() { - c.callSendToVendor(func(params *tImSendToVendorParams) (retVal interface{}, errCode string, err error) { +// @router /SendToVendorV2 [post] +func (c *IMController) SendToVendorV2() { + c.callSendToVendorV2(func(params *tImSendToVendorV2Params) (retVal interface{}, errCode string, err error) { var sendData im.SendData b := bytes.NewBufferString(params.SendData) decoder := json.NewDecoder(b) - _ = decoder.Decode(&sendData) - fmt.Println(sendData) - - if dataStr, err := json.Marshal(sendData); err == nil { - im.SendToVendor(dataStr) + if err = decoder.Decode(&sendData); err == nil { + fmt.Println(sendData) + if err = im.SendVendorV2(sendData); err != nil { + return nil, "", err + } } - return nil, "", err + return nil, "", nil }) } diff --git a/controllers/mtwm_callback.go b/controllers/mtwm_callback.go index c505227cc..c4ad7b0c8 100644 --- a/controllers/mtwm_callback.go +++ b/controllers/mtwm_callback.go @@ -87,3 +87,16 @@ func (c *MtwmController) SkuDelete() { func (c *MtwmController) StoreBind() { c.onCallbackMsg(mtwmapi.MsgTypeStoreBind) } + +func (c *MtwmController) IMCallback() { + c.Data["json"] = mtwmapi.Err2CallbackResponse(nil, "") + msg, callbackResponse := api.MtwmAPI.GetIMCallbackMsg(c.Ctx.Request) + if callbackResponse == nil { + callbackResponse = mtwm.OnImMsg(msg) + if callbackResponse == nil { + callbackResponse = mtwmapi.Err2CallbackResponse(nil, "") + } + } + c.Data["json"] = callbackResponse + c.ServeJSON() +} diff --git a/globals/api/api.go b/globals/api/api.go index db27ab9fc..1c3f208ef 100644 --- a/globals/api/api.go +++ b/globals/api/api.go @@ -131,7 +131,7 @@ var ( LogisticsApi *ali_logistics_query.API // 阿里云提供获取物流订单的配送信息 KuaiShouApi *kuaishou_mini.API // 快手平台 UniAppApi *uinapp.API // uinapp 消息通知 - TaoVegetableApi tao_vegetable.API // 淘菜菜 + TaoVegetableApi *tao_vegetable.API // 淘菜菜 ) func init() { diff --git a/main.go b/main.go index 3276adf0b..c48dcc0d0 100644 --- a/main.go +++ b/main.go @@ -8,8 +8,6 @@ import ( "os" "time" - "git.rosy.net.cn/jx-callback/business/partner/purchase/im" - "git.rosy.net.cn/jx-callback/business/enterprise" "git.rosy.net.cn/jx-callback/business/partner/purchase/jdshop" @@ -93,13 +91,6 @@ func Init() { misc.Init() enterprise.Init() // 初始化enterprise key - im.Init() //初始化ws连接 - - //test - //mux := http.NewServeMux() - //mux.HandleFunc("/v2/im/SendToVendor", im.Run) - //go http.ListenAndServe(":8082", mux) - } // 返回true表示非运行服务 diff --git a/routers/commentsRouter_controllers.go b/routers/commentsRouter_controllers.go index f24364bdd..e5f20286e 100644 --- a/routers/commentsRouter_controllers.go +++ b/routers/commentsRouter_controllers.go @@ -4395,20 +4395,12 @@ func init() { Params: nil}) web.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:IMController"] = append(web.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:IMController"], web.ControllerComments{ - Method: "SendToVendor", - Router: `/SendToVendor`, + Method: "SendToVendorV2", + Router: `/SendToVendorV2`, AllowHTTPMethods: []string{"post"}, MethodParams: param.Make(), Filters: nil, Params: nil}) - web.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:IMController"] = append(web.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:IMController"], - web.ControllerComments{ - Method: "StartWebSocket", - Router: `/StartWebSocket`, - AllowHTTPMethods: []string{"get"}, - MethodParams: param.Make(), - Filters: nil, - Params: nil}) //web.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:FnController"] = append(web.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:FnController"], // web.ControllerComments{