1
This commit is contained in:
@@ -2,6 +2,8 @@ package basesch
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
tiktokShop "git.rosy.net.cn/baseapi/platformapi/tiktok_shop/tiktok_api"
|
tiktokShop "git.rosy.net.cn/baseapi/platformapi/tiktok_shop/tiktok_api"
|
||||||
"git.rosy.net.cn/baseapi/utils"
|
"git.rosy.net.cn/baseapi/utils"
|
||||||
"git.rosy.net.cn/jx-callback/business/jxcallback/scheduler"
|
"git.rosy.net.cn/jx-callback/business/jxcallback/scheduler"
|
||||||
@@ -12,7 +14,6 @@ import (
|
|||||||
"git.rosy.net.cn/jx-callback/business/model/dao"
|
"git.rosy.net.cn/jx-callback/business/model/dao"
|
||||||
"git.rosy.net.cn/jx-callback/business/partner"
|
"git.rosy.net.cn/jx-callback/business/partner"
|
||||||
"git.rosy.net.cn/jx-callback/globals"
|
"git.rosy.net.cn/jx-callback/globals"
|
||||||
"strings"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -159,6 +160,7 @@ func (c *BaseScheduler) SelfDeliverDelivering(order *model.GoodsOrder, userName
|
|||||||
} else {
|
} else {
|
||||||
if err := partner.GetPurchaseOrderHandlerFromVendorID(order.VendorID).SelfDeliverDelivering(order, userName); err != nil && (err != scheduler.ErrOrderStatusAlreadySatisfyCurOperation) {
|
if err := partner.GetPurchaseOrderHandlerFromVendorID(order.VendorID).SelfDeliverDelivering(order, userName); err != nil && (err != scheduler.ErrOrderStatusAlreadySatisfyCurOperation) {
|
||||||
partner.CurOrderManager.OnOrderMsg(order, "SelfDeliverDelivering 调用[SelfDeliverDelivering]转自送", err.Error())
|
partner.CurOrderManager.OnOrderMsg(order, "SelfDeliverDelivering 调用[SelfDeliverDelivering]转自送", err.Error())
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
|||||||
@@ -4,6 +4,11 @@ import (
|
|||||||
"crypto/sha1"
|
"crypto/sha1"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"git.rosy.net.cn/baseapi/platformapi/mtpsapi"
|
"git.rosy.net.cn/baseapi/platformapi/mtpsapi"
|
||||||
"git.rosy.net.cn/baseapi/utils"
|
"git.rosy.net.cn/baseapi/utils"
|
||||||
"git.rosy.net.cn/jx-callback/business/jxcallback/orderman"
|
"git.rosy.net.cn/jx-callback/business/jxcallback/orderman"
|
||||||
@@ -17,10 +22,6 @@ import (
|
|||||||
"git.rosy.net.cn/jx-callback/globals/api"
|
"git.rosy.net.cn/jx-callback/globals/api"
|
||||||
"github.com/astaxie/beego/client/orm"
|
"github.com/astaxie/beego/client/orm"
|
||||||
beego "github.com/astaxie/beego/server/web"
|
beego "github.com/astaxie/beego/server/web"
|
||||||
"net/http"
|
|
||||||
"net/url"
|
|
||||||
"sort"
|
|
||||||
"strings"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -243,12 +244,15 @@ func (c *DeliveryHandler) callbackMsg2Waybill(msg *mtpsapi.CallbackOrderMsg) (re
|
|||||||
StatusTime: utils.Timestamp2Time(msg.Timestamp),
|
StatusTime: utils.Timestamp2Time(msg.Timestamp),
|
||||||
Remark: msg.CancelReason,
|
Remark: msg.CancelReason,
|
||||||
}
|
}
|
||||||
retVal.VendorOrderID, retVal.OrderVendorID = jxutils.SplitUniversalOrderID(msg.OrderID)
|
var good *model.GoodsOrder
|
||||||
good, err := partner.CurOrderManager.LoadOrder(msg.OrderID, model.VendorIDMTWM)
|
sql := `SELECT * FROM goods_order WHERE vendor_order_id = ? ORDER BY order_created_at DESC LIMIT 1 OFFSET 0`
|
||||||
|
sqlParams := []interface{}{msg.OrderID}
|
||||||
|
err := dao.GetRow(dao.GetDB(), &good, sql, sqlParams)
|
||||||
if err != nil || good == nil || good.VendorOrderID == "" {
|
if err != nil || good == nil || good.VendorOrderID == "" {
|
||||||
retVal.OrderVendorID = 0
|
retVal.OrderVendorID = 0
|
||||||
} else {
|
} else {
|
||||||
retVal.OrderVendorID = good.VendorID
|
retVal.OrderVendorID = good.VendorID
|
||||||
|
retVal.VendorOrderID = good.VendorOrderID
|
||||||
}
|
}
|
||||||
return retVal, good
|
return retVal, good
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.rosy.net.cn/jx-callback/business/jxutils"
|
"git.rosy.net.cn/jx-callback/business/model"
|
||||||
|
|
||||||
"git.rosy.net.cn/baseapi/utils/errlist"
|
"git.rosy.net.cn/baseapi/utils/errlist"
|
||||||
|
|
||||||
@@ -17,192 +17,53 @@ import (
|
|||||||
"git.rosy.net.cn/baseapi/utils"
|
"git.rosy.net.cn/baseapi/utils"
|
||||||
push "git.rosy.net.cn/jx-callback/business/jxutils/unipush"
|
push "git.rosy.net.cn/jx-callback/business/jxutils/unipush"
|
||||||
"git.rosy.net.cn/jx-callback/globals/api"
|
"git.rosy.net.cn/jx-callback/globals/api"
|
||||||
"github.com/gorilla/websocket"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// SendToVendor 向平台发消息
|
func SendVendorV2(data SendData) (err error) {
|
||||||
func SendToVendor(msg []byte) {
|
if data.VendorID == model.VendorIDMTWM {
|
||||||
var (
|
dataStr, _ := json.Marshal(data.Data)
|
||||||
sendData SendData
|
temp := string(dataStr)
|
||||||
err error
|
fmt.Println(temp)
|
||||||
elmAppID = api.EbaiAPI.GetSource()
|
if _, err = api.MtwmAPI.MsgSend(string(dataStr)); err != nil {
|
||||||
)
|
return err
|
||||||
|
|
||||||
//解析数据
|
|
||||||
if err = json.Unmarshal(msg, &sendData); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
//存储数据
|
|
||||||
ReadMsgFromClient(sendData.VendorID, elmAppID, sendData.Data)
|
|
||||||
|
|
||||||
//发送信息
|
|
||||||
if sendData.VendorID == VendorIDMT {
|
|
||||||
temp, _ := json.Marshal(sendData.Data)
|
|
||||||
if sendData.Data.(map[string]interface{})["app_id"] == nil {
|
|
||||||
globals.SugarLogger.Debug("SendToVendor appId=null")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
Send(temp, sendData.Data.(map[string]interface{})["app_id"])
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if sendData.VendorID == VendorIDELM {
|
|
||||||
param := sendData.Data.(ebaiapi.BusinessSendMsgReq)
|
|
||||||
if err := api.EbaiAPI.BusinessSendMsg(¶m); err != nil {
|
|
||||||
globals.SugarLogger.Debugf("elm发送信息错误:%v", err)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
//if data.VendorID == model.VendorIDEBAI { //todo 后续添加
|
||||||
return
|
// err = nil
|
||||||
}
|
//}
|
||||||
|
err = ReadMsgFromClient(data.VendorID, "", data.Data)
|
||||||
func Send(data []byte, appID interface{}) {
|
|
||||||
//根据appID生成完整url
|
|
||||||
fullUrl := GenFullUrl(appID.(float64)) //clientID暂时不用
|
|
||||||
|
|
||||||
conn, resp, err := websocket.DefaultDialer.Dial(fullUrl, nil)
|
|
||||||
if err != nil || resp.StatusCode != 101 {
|
|
||||||
fmt.Printf("连接失败:%v http响应不成功", err)
|
|
||||||
}
|
|
||||||
//关闭
|
|
||||||
defer func(conn *websocket.Conn) {
|
|
||||||
err := conn.Close()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}(conn)
|
|
||||||
|
|
||||||
err = conn.WriteMessage(websocket.TextMessage, data)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
globals.SugarLogger.Debugf("SendVendorV2:%v", err)
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
for {
|
|
||||||
_, msg, err := conn.ReadMessage()
|
|
||||||
temp := string(msg)
|
|
||||||
res := JsonCommon(HeartSuccessWord)
|
|
||||||
fmt.Printf("Send %s receive: %s\n", conn.RemoteAddr(), string(msg))
|
|
||||||
if err != nil {
|
|
||||||
break
|
|
||||||
} else if temp == res {
|
|
||||||
continue
|
|
||||||
} else {
|
|
||||||
ReadMsgFromVendor(VendorIDMT, "", msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// MtInit 发送心跳
|
|
||||||
func MtInit() {
|
|
||||||
data := []byte(HeartCheckMsg)
|
|
||||||
//生成完整url
|
|
||||||
url := GenFullUrl2()
|
|
||||||
//主连接
|
|
||||||
jxutils.CallMsgHandlerAsync(func() {
|
|
||||||
conn, resp, err := websocket.DefaultDialer.Dial(url.UrlMain, nil)
|
|
||||||
if err != nil || resp.StatusCode != 101 {
|
|
||||||
fmt.Printf("连接失败:%v http响应不成功", err)
|
|
||||||
}
|
|
||||||
//关闭
|
|
||||||
defer func(conn *websocket.Conn) {
|
|
||||||
err := conn.Close()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}(conn)
|
|
||||||
|
|
||||||
//client连接事件
|
|
||||||
client := NewClient(url.ClientIDMain, conn, ClientTypeMt)
|
|
||||||
Manager.Connect <- client
|
|
||||||
|
|
||||||
err = conn.WriteMessage(websocket.TextMessage, data)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
_, msg, err := conn.ReadMessage()
|
|
||||||
temp := string(msg)
|
|
||||||
res := JsonCommon(HeartCheckSuccess)
|
|
||||||
fmt.Printf("MtInit %s receive: %s\n", conn.RemoteAddr(), string(msg))
|
|
||||||
if err != nil {
|
|
||||||
break
|
|
||||||
} else if temp == res {
|
|
||||||
continue
|
|
||||||
} else {
|
|
||||||
ReadMsgFromVendor(VendorIDMT, "", msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, url.ClientIDMain)
|
|
||||||
|
|
||||||
//副连接
|
|
||||||
if url.UrlSub != "" {
|
|
||||||
jxutils.CallMsgHandlerAsync(func() {
|
|
||||||
connSub, respSub, errSub := websocket.DefaultDialer.Dial(url.UrlSub, nil)
|
|
||||||
if errSub != nil || respSub.StatusCode != 101 {
|
|
||||||
fmt.Printf("连接失败:%v http响应不成功", errSub)
|
|
||||||
}
|
|
||||||
|
|
||||||
//关闭
|
|
||||||
defer func(conn *websocket.Conn) {
|
|
||||||
err := conn.Close()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}(connSub)
|
|
||||||
|
|
||||||
//client连接事件
|
|
||||||
client := NewClient(url.ClientIDSub, connSub, ClientTypeMt)
|
|
||||||
Manager.Connect <- client
|
|
||||||
|
|
||||||
errSub = connSub.WriteMessage(websocket.TextMessage, data)
|
|
||||||
if errSub != nil {
|
|
||||||
fmt.Println(errSub)
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
_, msg, err := connSub.ReadMessage()
|
|
||||||
temp := string(msg)
|
|
||||||
res := JsonCommon(HeartCheckSuccess)
|
|
||||||
if err != nil || temp == res {
|
|
||||||
break
|
|
||||||
} else {
|
|
||||||
ReadMsgFromVendor(VendorIDMT, "", msg)
|
|
||||||
}
|
|
||||||
fmt.Printf("MtInit %s connSub:receive: %s\n", connSub.RemoteAddr(), string(msg))
|
|
||||||
}
|
|
||||||
|
|
||||||
}, url.ClientIDSub)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadMsgFromClient 存储客户端发送的消息
|
// ReadMsgFromClient 存储客户端发送的消息
|
||||||
func ReadMsgFromClient(vendorID int, elmAppID string, msg interface{}) {
|
func ReadMsgFromClient(vendorID int, elmAppID string, msg interface{}) error {
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
jxMsg = &JXMsg{}
|
jxMsg = &JXMsg{}
|
||||||
|
errList errlist.ErrList
|
||||||
userList = &UserMessageList{}
|
userList = &UserMessageList{}
|
||||||
)
|
)
|
||||||
|
|
||||||
data, err := json.Marshal(msg)
|
data, err := json.Marshal(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
errList.AddErr(fmt.Errorf("json处理数据错误:%v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
if vendorID == VendorIDMT {
|
if vendorID == VendorIDMT {
|
||||||
var MtSingleChat = mtwmapi.SingleChat{}
|
var pushContent = mtwmapi.PushContentReq{}
|
||||||
err = json.Unmarshal(data, &MtSingleChat)
|
err = json.Unmarshal(data, &pushContent)
|
||||||
jxMsg = &JXMsg{
|
jxMsg = &JXMsg{
|
||||||
SendType: SendTypeJx,
|
SendType: SendTypeJx,
|
||||||
MsgContent: MtSingleChat,
|
MsgContent: pushContent,
|
||||||
}
|
}
|
||||||
userList = &UserMessageList{
|
userList = &UserMessageList{
|
||||||
VendorID: VendorIDMT,
|
VendorID: VendorIDMT,
|
||||||
UserID: utils.Int2Str(MtSingleChat.OpenUserID),
|
UserID: utils.Int2Str(pushContent.OpenUserID),
|
||||||
LatestMsg: MtSingleChat.MsgContent,
|
LatestMsg: pushContent.MsgContent,
|
||||||
LatestTime: MtSingleChat.Cts,
|
LatestTime: pushContent.Cts,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if vendorID == VendorIDELM {
|
if vendorID == VendorIDELM {
|
||||||
@@ -222,41 +83,44 @@ func ReadMsgFromClient(vendorID int, elmAppID string, msg interface{}) {
|
|||||||
|
|
||||||
//1 存储详细聊天记录list
|
//1 存储详细聊天记录list
|
||||||
if err = SetMessageDetail(jxMsg, vendorID, elmAppID); err != nil {
|
if err = SetMessageDetail(jxMsg, vendorID, elmAppID); err != nil {
|
||||||
globals.SugarLogger.Debugf("ReadMsgFromClient SetMessageDetail err:=%v\n", err)
|
errList.AddErr(fmt.Errorf("存储详细聊天记录错误:%v", err))
|
||||||
//return
|
|
||||||
}
|
}
|
||||||
//2 存储展示列表时单条数据
|
//2 存储展示列表时单条数据
|
||||||
if err = SetUserList(jxMsg, userList, vendorID, elmAppID); err != nil {
|
if err = SetUserList(jxMsg, userList, vendorID, elmAppID); err != nil {
|
||||||
globals.SugarLogger.Debugf("ReadMsgFromClient SetUserList err:=%v\n", err)
|
errList.AddErr(fmt.Errorf("存储STU聊天记录错误:%v", err))
|
||||||
//return
|
|
||||||
}
|
}
|
||||||
|
if errList.GetErrListAsOne() != nil {
|
||||||
|
return fmt.Errorf("ReadMsgFromClient:%v", errList.GetErrListAsOne())
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadMsgFromVendor 读取数据并存储到redis
|
// ReadMsgFromVendor 读取数据并存储到redis
|
||||||
func ReadMsgFromVendor(vendorID int, elmAppID string, msg []byte) {
|
func ReadMsgFromVendor(vendorID int, elmAppID string, msg []byte) error {
|
||||||
if string(msg) == "" {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
//vendorStoreID string
|
jxMsg = &JXMsg{}
|
||||||
jxMsg = &JXMsg{}
|
vendorStoreID string
|
||||||
userList = &UserMessageList{}
|
errList errlist.ErrList
|
||||||
|
userList = &UserMessageList{}
|
||||||
)
|
)
|
||||||
|
if string(msg) == "" {
|
||||||
|
errList.AddErr(fmt.Errorf("读取平台数据为空,请检查"))
|
||||||
|
}
|
||||||
if vendorID == VendorIDMT {
|
if vendorID == VendorIDMT {
|
||||||
var MtSingleChat = mtwmapi.SingleChat{}
|
var PushContentReq = mtwmapi.PushContentReq{}
|
||||||
err = json.Unmarshal(msg, &MtSingleChat)
|
err = json.Unmarshal(msg, &PushContentReq)
|
||||||
jxMsg = &JXMsg{
|
jxMsg = &JXMsg{
|
||||||
SendType: SendTypeMt,
|
SendType: SendTypeMt,
|
||||||
MsgContent: MtSingleChat,
|
MsgContent: PushContentReq,
|
||||||
}
|
}
|
||||||
userList = &UserMessageList{
|
userList = &UserMessageList{
|
||||||
VendorID: VendorIDMT,
|
VendorID: VendorIDMT,
|
||||||
UserID: utils.Int2Str(MtSingleChat.OpenUserID),
|
UserID: utils.Int2Str(PushContentReq.OpenUserID),
|
||||||
LatestMsg: MtSingleChat.MsgContent,
|
LatestMsg: PushContentReq.MsgContent,
|
||||||
LatestTime: MtSingleChat.Cts,
|
LatestTime: PushContentReq.Cts,
|
||||||
}
|
}
|
||||||
//vendorStoreID = MtSingleChat.AppPoiCode
|
vendorStoreID = PushContentReq.AppPoiCode
|
||||||
}
|
}
|
||||||
if vendorID == VendorIDELM {
|
if vendorID == VendorIDELM {
|
||||||
var ElmData = ebaiapi.ImMessageSend{}
|
var ElmData = ebaiapi.ImMessageSend{}
|
||||||
@@ -275,23 +139,21 @@ func ReadMsgFromVendor(vendorID int, elmAppID string, msg []byte) {
|
|||||||
|
|
||||||
//1 存储详细聊天记录list
|
//1 存储详细聊天记录list
|
||||||
if err = SetMessageDetail(jxMsg, vendorID, elmAppID); err != nil {
|
if err = SetMessageDetail(jxMsg, vendorID, elmAppID); err != nil {
|
||||||
globals.SugarLogger.Debugf("ReadMsgFromVendor SetMessageDetail err:=%v\n", err)
|
errList.AddErr(fmt.Errorf("存储详细聊天记录错误:%v", err))
|
||||||
//return
|
|
||||||
}
|
}
|
||||||
//2 存储展示列表时单条数据
|
//2 存储展示列表时单条数据
|
||||||
if err = SetUserList(jxMsg, userList, vendorID, elmAppID); err != nil {
|
if err = SetUserList(jxMsg, userList, vendorID, elmAppID); err != nil {
|
||||||
globals.SugarLogger.Debugf("ReadMsgFromVendor SetUserList err:=%v\n", err)
|
errList.AddErr(fmt.Errorf("存储STU聊天记录错误:%v", err))
|
||||||
//return
|
|
||||||
}
|
}
|
||||||
//3 cid推送新消息
|
//3 cid推送新消息
|
||||||
//err = PushMsgByCid(vendorStoreID, vendorID)
|
if err = PushMsgByCid(vendorStoreID, vendorID); err != nil {
|
||||||
//4 长链接通知给客户端
|
errList.AddErr(fmt.Errorf("向商家cid推送新消息错误:%v", err))
|
||||||
if err != nil {
|
|
||||||
ToClientChan <- clientInfo{Code: SuccessCode, Msg: fmt.Sprintf("%v", err), Data: jxMsg}
|
|
||||||
} else {
|
|
||||||
ToClientChan <- clientInfo{Code: SuccessCode, Msg: SuccessMsg, Data: jxMsg}
|
|
||||||
}
|
}
|
||||||
return
|
|
||||||
|
if errList.GetErrListAsOne() != nil {
|
||||||
|
return fmt.Errorf("ReadMsgFromVendor:%v", errList.GetErrListAsOne())
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// PushMsgByCid 通过cid push用户
|
// PushMsgByCid 通过cid push用户
|
||||||
@@ -303,7 +165,7 @@ func PushMsgByCid(vendorStoreID string, vendorID int) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SetMessageDetail 赋值
|
// SetMessageDetail 赋值
|
||||||
//格式 AppID:AppPoiCode:10:OpenUserID
|
//格式 AppID:AppPoiCode:1:OpenUserID
|
||||||
func SetMessageDetail(req *JXMsg, vendorID int, elmAppID string) error {
|
func SetMessageDetail(req *JXMsg, vendorID int, elmAppID string) error {
|
||||||
//生成京西消息ID detail
|
//生成京西消息ID detail
|
||||||
msgID := GenMsgDetailID(req, vendorID, elmAppID)
|
msgID := GenMsgDetailID(req, vendorID, elmAppID)
|
||||||
@@ -368,7 +230,7 @@ func GetNewAndTrim(key string, flag string) (cnt int, err error) {
|
|||||||
// GenMsgDetailID 生成查询详细聊天记录ID
|
// GenMsgDetailID 生成查询详细聊天记录ID
|
||||||
func GenMsgDetailID(jxMsg *JXMsg, vendorID int, elmAppID string) (msgID string) {
|
func GenMsgDetailID(jxMsg *JXMsg, vendorID int, elmAppID string) (msgID string) {
|
||||||
if vendorID == VendorIDMT {
|
if vendorID == VendorIDMT {
|
||||||
var d1 = jxMsg.MsgContent.(mtwmapi.SingleChat)
|
var d1 = jxMsg.MsgContent.(mtwmapi.PushContentReq)
|
||||||
msgID = utils.Int2Str(d1.AppID) + ":" + d1.AppPoiCode + ":1:" + utils.Int2Str(d1.OpenUserID)
|
msgID = utils.Int2Str(d1.AppID) + ":" + d1.AppPoiCode + ":1:" + utils.Int2Str(d1.OpenUserID)
|
||||||
}
|
}
|
||||||
if vendorID == VendorIDELM {
|
if vendorID == VendorIDELM {
|
||||||
@@ -381,7 +243,7 @@ func GenMsgDetailID(jxMsg *JXMsg, vendorID int, elmAppID string) (msgID string)
|
|||||||
// GenMsgListID 生成展示列表时单条数据ID(部分)
|
// GenMsgListID 生成展示列表时单条数据ID(部分)
|
||||||
func GenMsgListID(jxMsg *JXMsg, vendorID int, elmAppID string) (msgID string) {
|
func GenMsgListID(jxMsg *JXMsg, vendorID int, elmAppID string) (msgID string) {
|
||||||
if vendorID == VendorIDMT {
|
if vendorID == VendorIDMT {
|
||||||
var d1 = jxMsg.MsgContent.(mtwmapi.SingleChat)
|
var d1 = jxMsg.MsgContent.(mtwmapi.PushContentReq)
|
||||||
msgID = utils.Int2Str(d1.AppID) + ":" + d1.AppPoiCode + ":1"
|
msgID = utils.Int2Str(d1.AppID) + ":" + d1.AppPoiCode + ":1"
|
||||||
}
|
}
|
||||||
if vendorID == VendorIDELM {
|
if vendorID == VendorIDELM {
|
||||||
|
|||||||
@@ -1,85 +1,11 @@
|
|||||||
package im
|
package im
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"log"
|
|
||||||
r "math/rand"
|
|
||||||
"net"
|
|
||||||
"net/http/httptest"
|
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.rosy.net.cn/baseapi/platformapi/mtwmapi"
|
|
||||||
"git.rosy.net.cn/jx-callback/globals/api"
|
"git.rosy.net.cn/jx-callback/globals/api"
|
||||||
"github.com/gazeboxu/mapstructure"
|
|
||||||
"github.com/gorilla/websocket"
|
|
||||||
"gopkg.in/ini.v1"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ClientManager 连接管理
|
|
||||||
type ClientManager struct {
|
|
||||||
ClientIdMap map[string]*Client // 全部的连接
|
|
||||||
ClientIdMapLock sync.RWMutex // 读写锁
|
|
||||||
|
|
||||||
Connect chan *Client // 连接处理
|
|
||||||
DisConnect chan *Client // 断开连接处理
|
|
||||||
|
|
||||||
GroupLock sync.RWMutex
|
|
||||||
Groups map[string][]string
|
|
||||||
|
|
||||||
SystemClientsLock sync.RWMutex
|
|
||||||
SystemClients map[string][]string
|
|
||||||
}
|
|
||||||
|
|
||||||
// Client 客户端连接信息
|
|
||||||
type Client struct {
|
|
||||||
ClientId string // 标识ID
|
|
||||||
Socket *websocket.Conn // 用户连接
|
|
||||||
ClientType string //标识是美团/客户端长链接
|
|
||||||
ConnectTime uint64 // 首次连接时间
|
|
||||||
IsDeleted bool // 是否删除或下线
|
|
||||||
UserId string // 业务端标识用户ID
|
|
||||||
//Extend string // 扩展字段,用户可以自定义
|
|
||||||
//GroupList []string
|
|
||||||
}
|
|
||||||
|
|
||||||
//channel通道结构体
|
|
||||||
type clientInfo struct {
|
|
||||||
ClientId string `json:"clientId" validate:"required"` //链接ID
|
|
||||||
Data interface{}
|
|
||||||
SendUserId string
|
|
||||||
MessageId string
|
|
||||||
Code int
|
|
||||||
Msg string
|
|
||||||
}
|
|
||||||
|
|
||||||
// RetData 统一返回值结构体
|
|
||||||
type RetData struct {
|
|
||||||
Code int `json:"code"` //响应code
|
|
||||||
Msg string `json:"msg"` //响应msg success/fail
|
|
||||||
//MsgType string `json:"msgType"` //发送消息方类型:mt;elm;jx
|
|
||||||
Data interface{} `json:"data"` //信息
|
|
||||||
|
|
||||||
//MessageType string `json:"messageType"` //消息类型 heart-心跳检测;send-发送消息;receive-接收消息
|
|
||||||
//MessageId string `json:"messageId"` //发送/接收信息 id
|
|
||||||
//UserId string `json:"userId"` //必须是平台方userID
|
|
||||||
}
|
|
||||||
|
|
||||||
type global struct {
|
|
||||||
LocalHost string //本机内网IP
|
|
||||||
ServerList map[string]string
|
|
||||||
ServerListLock sync.RWMutex
|
|
||||||
}
|
|
||||||
type commonConf struct {
|
|
||||||
HttpPort string
|
|
||||||
RPCPort string
|
|
||||||
Cluster bool
|
|
||||||
CryptoKey string
|
|
||||||
}
|
|
||||||
|
|
||||||
// SendData 客户端写入参数
|
// SendData 客户端写入参数
|
||||||
type SendData struct {
|
type SendData struct {
|
||||||
VendorID int `json:"vendorID"` //消息来源平台ID 1-美团 3-饿了么
|
VendorID int `json:"vendorID"` //消息来源平台ID 1-美团 3-饿了么
|
||||||
@@ -92,23 +18,9 @@ type JXMsg struct {
|
|||||||
MsgContent interface{} `json:"msgContent"` //美团/饿了么 单聊消息
|
MsgContent interface{} `json:"msgContent"` //美团/饿了么 单聊消息
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetUserListReq 获取门店用户聊天列表
|
|
||||||
type GetUserListReq struct {
|
|
||||||
VendorStoreID string `json:"vendorStoreID"` //平台门店id
|
|
||||||
VendorID string `json:"vendorID"` //平台标识id
|
|
||||||
AppID string `json:"appID"` //应用ID
|
|
||||||
}
|
|
||||||
|
|
||||||
type GetChatDetailReq struct {
|
|
||||||
VendorStoreID string `json:"vendorStoreID"` //平台门店id
|
|
||||||
VendorID string `json:"vendorID"` //平台标识id
|
|
||||||
AppID string `json:"appID"` //应用ID
|
|
||||||
UserID string `json:"userID"` //userID/groupID
|
|
||||||
}
|
|
||||||
|
|
||||||
// UserMessageList 用户消息列表
|
// UserMessageList 用户消息列表
|
||||||
type UserMessageList struct {
|
type UserMessageList struct {
|
||||||
VendorID int `json:"vendorID"` //平台品牌 10-美团 11-饿了么
|
VendorID int `json:"vendorID"` //平台品牌 1-美团 3-饿了么
|
||||||
UserID string `json:"userID"` //用户ID
|
UserID string `json:"userID"` //用户ID
|
||||||
NewMessageNum int `json:"NewMessageNum"` //新消息数量
|
NewMessageNum int `json:"NewMessageNum"` //新消息数量
|
||||||
LatestMsg string `json:"latestMsg"` //最新一条消息
|
LatestMsg string `json:"latestMsg"` //最新一条消息
|
||||||
@@ -128,222 +40,16 @@ type UserRelInfo struct {
|
|||||||
UserID string `json:"userID"` //用户id/groupID
|
UserID string `json:"userID"` //用户id/groupID
|
||||||
}
|
}
|
||||||
|
|
||||||
// UrlInfo 生成美团长链接url信息
|
|
||||||
type UrlInfo struct {
|
|
||||||
UrlMain string `json:"urlMain"` //主连接路由
|
|
||||||
ClientIDMain string `json:"ClientIDMain"` //主连接id
|
|
||||||
UrlSub string `json:"urlSub"` //副连接路由
|
|
||||||
ClientIDSub string `json:"ClientIDSub"` //副连接id
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
cfg *ini.File
|
rdb = api.Cacher
|
||||||
rdb = api.Cacher
|
VendorIDMT = 1 //im美团
|
||||||
//客户端相关
|
VendorIDELM = 3 //im饿了么
|
||||||
Manager = NewClientManager() // 管理者
|
SendTypeJx = "jx" //京西客户端发送方标识
|
||||||
ToClientChan chan clientInfo
|
SendTypeMt = "mt" //美团用户发送方标识符
|
||||||
ClientTypeJx = "jx" //京西客户端
|
SendTypeElm = "elm" //饿了么用户发送方标识符
|
||||||
ClientTypeMt = "mt" //美团客户端
|
|
||||||
//配置文件
|
|
||||||
CommonSetting = &commonConf{}
|
|
||||||
GlobalSetting = &global{}
|
|
||||||
//心跳相关
|
|
||||||
heartbeatInterval = 20 * time.Second // 心跳间隔
|
|
||||||
HeartCheckMsg = "~#HHHBBB#~" //心跳检测消息
|
|
||||||
HeartCheckSuccess = "HB" //成功发送返回心跳消息
|
|
||||||
HeartSuccessWord = "成功" //成功发送返回心跳消息
|
|
||||||
//平台标识
|
|
||||||
AppID5873 = float64(5873)
|
|
||||||
AppID589 = "589"
|
|
||||||
VendorIDMT = 1 //im美团
|
|
||||||
VendorIDELM = 3 //im饿了么
|
|
||||||
SendTypeJx = "jx" //京西客户端发送方标识
|
|
||||||
SendTypeMt = "mt" //美团用户发送方标识符
|
|
||||||
SendTypeElm = "elm" //饿了么用户发送方标识符
|
|
||||||
MTIMPushUrl = "wss://wpush.meituan.com/websocket" //buildPushConnect建立长连接
|
|
||||||
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
ExpireTimeDay = 2 * time.Hour //redis一天过期时间
|
ExpireTimeDay = 4 * time.Hour //redis过期时间
|
||||||
maxMessageSize = 8192 // 最大的消息大小
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type renderData struct {
|
|
||||||
ClientId string `json:"clientId"`
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
|
||||||
SuccessCode = 0
|
|
||||||
SuccessMsg = "success"
|
|
||||||
FailCode = -1
|
|
||||||
FailMsg = "fail"
|
|
||||||
|
|
||||||
SYSTEM_ID_ERROR = -1001
|
|
||||||
ONLINE_MESSAGE_CODE = 1001
|
|
||||||
OFFLINE_MESSAGE_CODE = 1002
|
|
||||||
)
|
|
||||||
|
|
||||||
// Render 统一返回值
|
|
||||||
func Render(conn *websocket.Conn, messageId string, code int, message string, data interface{}) error {
|
|
||||||
if data != nil {
|
|
||||||
str, _ := json.Marshal(data)
|
|
||||||
temp := string(str)
|
|
||||||
return conn.WriteJSON(RetData{
|
|
||||||
Code: code,
|
|
||||||
Msg: message,
|
|
||||||
//MsgType: temp.SendType,
|
|
||||||
Data: temp,
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
return conn.WriteJSON(RetData{
|
|
||||||
Code: code,
|
|
||||||
Msg: message,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ClientRender http响应
|
|
||||||
func ClientRender(code int, msg string) (str string) {
|
|
||||||
var retData RetData
|
|
||||||
|
|
||||||
retData.Code = code
|
|
||||||
retData.Msg = msg
|
|
||||||
//retData.Data = data
|
|
||||||
|
|
||||||
retJson, _ := json.Marshal(retData)
|
|
||||||
str = string(retJson)
|
|
||||||
|
|
||||||
w := httptest.NewRecorder()
|
|
||||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
|
||||||
_, _ = io.WriteString(w, str)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func ConnRender(conn *websocket.Conn, data interface{}) (err error) {
|
|
||||||
err = conn.WriteJSON(RetData{
|
|
||||||
Code: SuccessCode,
|
|
||||||
Msg: "success",
|
|
||||||
Data: data,
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Default 给默认值
|
|
||||||
func Default() {
|
|
||||||
CommonSetting = &commonConf{
|
|
||||||
HttpPort: "6000",
|
|
||||||
RPCPort: "7000",
|
|
||||||
Cluster: false,
|
|
||||||
CryptoKey: "Adba723b7fe06819",
|
|
||||||
}
|
|
||||||
|
|
||||||
GlobalSetting = &global{
|
|
||||||
LocalHost: getIntranetIp(),
|
|
||||||
ServerList: make(map[string]string),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Setup 初始化全局设置变量
|
|
||||||
func Setup() {
|
|
||||||
configFile := flag.String("c", "conf/app.ini", "-c conf/app.ini")
|
|
||||||
|
|
||||||
var err error
|
|
||||||
cfg, err = ini.Load(*configFile)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("setting.Setup, fail to parse 'conf/app.ini': %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
mapTo("common", CommonSetting)
|
|
||||||
|
|
||||||
GlobalSetting = &global{
|
|
||||||
LocalHost: getIntranetIp(),
|
|
||||||
ServerList: make(map[string]string),
|
|
||||||
}
|
|
||||||
fmt.Printf("LocalHost=%s\n ", GlobalSetting.LocalHost)
|
|
||||||
}
|
|
||||||
|
|
||||||
// mapTo map section
|
|
||||||
func mapTo(section string, v interface{}) {
|
|
||||||
err := cfg.Section(section).MapTo(v)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Cfg.MapTo %s err: %v", section, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//获取本机IP
|
|
||||||
func getIntranetIp() string {
|
|
||||||
addrs, _ := net.InterfaceAddrs()
|
|
||||||
for _, addr := range addrs {
|
|
||||||
// 检查ip地址判断是否回环地址
|
|
||||||
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
|
|
||||||
if ipnet.IP.To4() != nil {
|
|
||||||
return ipnet.IP.String()
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
|
|
||||||
// GenFullUrl 组装完整websocket url以及生成clientID
|
|
||||||
func GenFullUrl(appID float64) (fullUrl string) {
|
|
||||||
if appID == AppID5873 {
|
|
||||||
if resp5873, err := api.Mtwm2API.GetConnectionToken(); err == nil {
|
|
||||||
r1 := mtwmapi.GetConnTokenResp{}
|
|
||||||
err = mapstructure.Decode(resp5873, &r1)
|
|
||||||
fullUrl = MTIMPushUrl + "/" + r1.AppKey + "/" + r1.ConnectionToken
|
|
||||||
return fullUrl
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//589/4123
|
|
||||||
resp, err := api.MtwmAPI.GetConnectionToken()
|
|
||||||
if err != nil {
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
retVal := mtwmapi.GetConnTokenResp{}
|
|
||||||
err = mapstructure.Decode(resp, &retVal)
|
|
||||||
fullUrl = MTIMPushUrl + "/" + retVal.AppKey + "/" + retVal.ConnectionToken
|
|
||||||
return fullUrl
|
|
||||||
}
|
|
||||||
|
|
||||||
// GenFullUrl2 组装完整websocket url以及生成clientID
|
|
||||||
func GenFullUrl2() *UrlInfo {
|
|
||||||
urlInfo := &UrlInfo{}
|
|
||||||
//1 589/4123
|
|
||||||
resp, err := api.MtwmAPI.GetConnectionToken()
|
|
||||||
if err != nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
retVal := mtwmapi.GetConnTokenResp{}
|
|
||||||
err = mapstructure.Decode(resp, &retVal)
|
|
||||||
urlInfo.UrlMain = MTIMPushUrl + "/" + retVal.AppKey + "/" + retVal.ConnectionToken
|
|
||||||
urlInfo.ClientIDMain = retVal.AppKey + ":" + retVal.ConnectionToken
|
|
||||||
|
|
||||||
if api.MtwmAPI.GetAppID() == AppID589 { //目前果园无4123
|
|
||||||
if resp5873, err := api.Mtwm2API.GetConnectionToken(); err == nil {
|
|
||||||
r1 := mtwmapi.GetConnTokenResp{}
|
|
||||||
err = mapstructure.Decode(resp5873, &r1)
|
|
||||||
urlInfo.UrlSub = MTIMPushUrl + "/" + r1.AppKey + "/" + r1.ConnectionToken
|
|
||||||
urlInfo.ClientIDSub = r1.AppKey + ":" + r1.ConnectionToken
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return urlInfo
|
|
||||||
}
|
|
||||||
|
|
||||||
// RandString 生成随机字符串
|
|
||||||
func RandString() string {
|
|
||||||
bytes := make([]byte, 16)
|
|
||||||
for i := 0; i < 16; i++ {
|
|
||||||
b := r.Intn(26) + 65
|
|
||||||
bytes[i] = byte(b)
|
|
||||||
}
|
|
||||||
return string(bytes)
|
|
||||||
}
|
|
||||||
|
|
||||||
// JsonCommon json格式化
|
|
||||||
func JsonCommon(str string) (retVal string) {
|
|
||||||
temp, _ := json.Marshal(str)
|
|
||||||
_ = json.Unmarshal(temp, &retVal)
|
|
||||||
return retVal
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -1,259 +0,0 @@
|
|||||||
package im
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"net/http"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.rosy.net.cn/jx-callback/business/jxutils"
|
|
||||||
|
|
||||||
"git.rosy.net.cn/jx-callback/globals"
|
|
||||||
"github.com/gorilla/websocket"
|
|
||||||
)
|
|
||||||
|
|
||||||
func Init() {
|
|
||||||
//初始化
|
|
||||||
ToClientChan = make(chan clientInfo, 1000)
|
|
||||||
//写入全局变量
|
|
||||||
Setup()
|
|
||||||
//建立长链接
|
|
||||||
jxutils.CallMsgHandlerAsync(func() {
|
|
||||||
MtInit()
|
|
||||||
}, "MtInit:"+RandString())
|
|
||||||
|
|
||||||
//启动定时器
|
|
||||||
PingTimer()
|
|
||||||
|
|
||||||
jxutils.CallMsgHandlerAsync(func() {
|
|
||||||
WriteMessage()
|
|
||||||
}, "WriteMessage:"+RandString())
|
|
||||||
|
|
||||||
jxutils.CallMsgHandlerAsync(func() {
|
|
||||||
Manager.Start()
|
|
||||||
}, "Manager Start:"+RandString())
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func Run(w http.ResponseWriter, r *http.Request) {
|
|
||||||
conn, err := (&websocket.Upgrader{
|
|
||||||
ReadBufferSize: 1024,
|
|
||||||
WriteBufferSize: 1024,
|
|
||||||
// 允许所有CORS跨域请求
|
|
||||||
CheckOrigin: func(r *http.Request) bool {
|
|
||||||
return true
|
|
||||||
},
|
|
||||||
}).Upgrade(w, r, nil)
|
|
||||||
if err != nil {
|
|
||||||
globals.SugarLogger.Debugf("upgrade error: %v", err)
|
|
||||||
http.NotFound(w, r)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
//设置读取消息大小上线
|
|
||||||
conn.SetReadLimit(maxMessageSize)
|
|
||||||
|
|
||||||
clientID := ""
|
|
||||||
if temp := r.Header.Get("Clientid"); len(temp) == 0 {
|
|
||||||
clientID = RandString()
|
|
||||||
} else {
|
|
||||||
clientID = temp
|
|
||||||
}
|
|
||||||
|
|
||||||
clientSocket := NewClient(clientID, conn, ClientTypeJx)
|
|
||||||
|
|
||||||
//读取客户端消息
|
|
||||||
clientSocket.Read()
|
|
||||||
|
|
||||||
if err = ConnRender(conn, renderData{ClientId: clientID}); err != nil {
|
|
||||||
_ = conn.Close()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// 用户连接事件
|
|
||||||
Manager.Connect <- clientSocket
|
|
||||||
}
|
|
||||||
|
|
||||||
// PingTimer 定时器发送心跳
|
|
||||||
func PingTimer() {
|
|
||||||
go func() {
|
|
||||||
ticker := time.NewTicker(heartbeatInterval)
|
|
||||||
defer ticker.Stop()
|
|
||||||
for {
|
|
||||||
<-ticker.C
|
|
||||||
for clientId, conn := range Manager.AllClient() {
|
|
||||||
if conn.ClientType == ClientTypeJx {
|
|
||||||
if err := conn.Socket.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
|
|
||||||
Manager.DisConnect <- conn
|
|
||||||
fmt.Printf("发送心跳失败: %s 总连接数:%d", clientId, Manager.Count())
|
|
||||||
}
|
|
||||||
if err := ConnRender(conn.Socket, renderData{ClientId: clientId}); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
globals.SugarLogger.Debugf("PingTimer jx clientID ")
|
|
||||||
} else {
|
|
||||||
fmt.Printf("PingTimer mt心跳,conn%s,%s", conn.ClientId, conn.ClientType)
|
|
||||||
if err := conn.Socket.WriteMessage(websocket.TextMessage, []byte(HeartCheckMsg)); err != nil {
|
|
||||||
fmt.Printf("PingTimer mtHeartBeat err:%v", err)
|
|
||||||
//对美团重新建立连接
|
|
||||||
MtInit()
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
// WriteMessage 监听并发送给客户端信息
|
|
||||||
func WriteMessage() {
|
|
||||||
i := 0
|
|
||||||
for {
|
|
||||||
clientInfo := <-ToClientChan
|
|
||||||
//广播发送通知所有京西客户端
|
|
||||||
i++
|
|
||||||
//fmt.Printf("WriteMessage clientInfo=%s i=%d\n", utils.Format4Output(clientInfo, false), i)
|
|
||||||
if Manager.AllClient() != nil {
|
|
||||||
for _, conn := range Manager.AllClient() {
|
|
||||||
if conn.ClientType == ClientTypeJx { //只发送给京西
|
|
||||||
fmt.Printf("WriteMessage conn.ClientId=%s\n", conn.ClientId)
|
|
||||||
if err := Render(conn.Socket, clientInfo.MessageId, clientInfo.Code, clientInfo.Msg, clientInfo.Data); err != nil {
|
|
||||||
Manager.DisConnect <- conn
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
globals.SugarLogger.Debugf("无客户端连接,请检查")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start 管道处理程序
|
|
||||||
func (manager *ClientManager) Start() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case client := <-manager.Connect:
|
|
||||||
// 建立连接事件
|
|
||||||
manager.EventConnect(client)
|
|
||||||
case conn := <-manager.DisConnect:
|
|
||||||
// 断开连接事件
|
|
||||||
manager.EventDisconnect(conn)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//从客户端读取数据
|
|
||||||
func (c *Client) Read() {
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
messageType, msg, err := c.Socket.ReadMessage()
|
|
||||||
//temp := string(msg)
|
|
||||||
//fmt.Print(temp)
|
|
||||||
if err != nil {
|
|
||||||
if messageType == -1 && websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
|
|
||||||
Manager.DisConnect <- c
|
|
||||||
return
|
|
||||||
} else if messageType != websocket.PingMessage {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fmt.Printf("Client Read:receive: %s\n", string(msg))
|
|
||||||
//SendToVendor(msg)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
// 以下为连接事件操作*******************************************
|
|
||||||
|
|
||||||
// EventConnect 建立连接事件
|
|
||||||
func (manager *ClientManager) EventConnect(client *Client) {
|
|
||||||
manager.AddClient(client)
|
|
||||||
}
|
|
||||||
|
|
||||||
// EventDisconnect 断开连接事件
|
|
||||||
func (manager *ClientManager) EventDisconnect(client *Client) {
|
|
||||||
//关闭连接
|
|
||||||
_ = client.Socket.Close()
|
|
||||||
manager.DelClient(client)
|
|
||||||
//标记销毁
|
|
||||||
client.IsDeleted = true
|
|
||||||
client = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
//以下为客户端Client操作*******************************************
|
|
||||||
|
|
||||||
// NewClient 初始化Client
|
|
||||||
func NewClient(clientId string, socket *websocket.Conn, clientType string) *Client {
|
|
||||||
return &Client{
|
|
||||||
ClientId: clientId,
|
|
||||||
Socket: socket,
|
|
||||||
ClientType: clientType,
|
|
||||||
ConnectTime: uint64(time.Now().Unix()),
|
|
||||||
IsDeleted: false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddClient 添加客户端
|
|
||||||
func (manager *ClientManager) AddClient(client *Client) {
|
|
||||||
manager.ClientIdMapLock.Lock()
|
|
||||||
defer manager.ClientIdMapLock.Unlock()
|
|
||||||
|
|
||||||
manager.ClientIdMap[client.ClientId] = client
|
|
||||||
}
|
|
||||||
|
|
||||||
// DelClient 删除客户端
|
|
||||||
func (manager *ClientManager) DelClient(client *Client) {
|
|
||||||
manager.delClientIdMap(client.ClientId)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// 删除clientIdMap
|
|
||||||
func (manager *ClientManager) delClientIdMap(clientId string) {
|
|
||||||
manager.ClientIdMapLock.Lock()
|
|
||||||
defer manager.ClientIdMapLock.Unlock()
|
|
||||||
|
|
||||||
delete(manager.ClientIdMap, clientId)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetByClientId 通过clientId获取client
|
|
||||||
func (manager *ClientManager) GetByClientId(clientId string) (*Client, error) {
|
|
||||||
manager.ClientIdMapLock.RLock()
|
|
||||||
defer manager.ClientIdMapLock.RUnlock()
|
|
||||||
|
|
||||||
if client, ok := manager.ClientIdMap[clientId]; !ok {
|
|
||||||
return nil, errors.New("客户端不存在")
|
|
||||||
} else {
|
|
||||||
return client, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// AllClient 获取所有的客户端
|
|
||||||
func (manager *ClientManager) AllClient() map[string]*Client {
|
|
||||||
manager.ClientIdMapLock.RLock()
|
|
||||||
defer manager.ClientIdMapLock.RUnlock()
|
|
||||||
|
|
||||||
return manager.ClientIdMap
|
|
||||||
}
|
|
||||||
|
|
||||||
//与客户端的交互操作*************************
|
|
||||||
|
|
||||||
// NewClientManager 初始化客户端管理
|
|
||||||
func NewClientManager() (clientManager *ClientManager) {
|
|
||||||
clientManager = &ClientManager{
|
|
||||||
ClientIdMap: make(map[string]*Client),
|
|
||||||
Connect: make(chan *Client, 10000),
|
|
||||||
DisConnect: make(chan *Client, 10000),
|
|
||||||
Groups: make(map[string][]string, 100),
|
|
||||||
SystemClients: make(map[string][]string, 100),
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Count 获取客户端数量
|
|
||||||
func (manager *ClientManager) Count() int {
|
|
||||||
manager.ClientIdMapLock.RLock()
|
|
||||||
defer manager.ClientIdMapLock.RUnlock()
|
|
||||||
return len(manager.ClientIdMap)
|
|
||||||
}
|
|
||||||
@@ -1,6 +1,12 @@
|
|||||||
package mtwm
|
package mtwm
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"git.rosy.net.cn/jx-callback/business/partner/purchase/im"
|
||||||
|
|
||||||
"git.rosy.net.cn/baseapi/platformapi/mtwmapi"
|
"git.rosy.net.cn/baseapi/platformapi/mtwmapi"
|
||||||
"git.rosy.net.cn/jx-callback/business/jxutils"
|
"git.rosy.net.cn/jx-callback/business/jxutils"
|
||||||
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
|
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
|
||||||
@@ -8,8 +14,6 @@ import (
|
|||||||
"git.rosy.net.cn/jx-callback/business/model"
|
"git.rosy.net.cn/jx-callback/business/model"
|
||||||
"git.rosy.net.cn/jx-callback/business/model/dao"
|
"git.rosy.net.cn/jx-callback/business/model/dao"
|
||||||
"git.rosy.net.cn/jx-callback/globals"
|
"git.rosy.net.cn/jx-callback/globals"
|
||||||
"net/http"
|
|
||||||
"strings"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// 美团回调接口
|
// 美团回调接口
|
||||||
@@ -115,3 +119,14 @@ func GetMsgCallBackUrl(msgType, appId string) string {
|
|||||||
}
|
}
|
||||||
return interfaceUrl
|
return interfaceUrl
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// OnImMsg im消息回调
|
||||||
|
func OnImMsg(msg *mtwmapi.ImCallbackMsg) (response *mtwmapi.CallbackResponse) {
|
||||||
|
if str, err := json.Marshal(msg.PushContent); err == nil {
|
||||||
|
err = im.ReadMsgFromVendor(model.VendorIDMTWM, "", str)
|
||||||
|
if err != nil {
|
||||||
|
globals.SugarLogger.Debugf("OnImMsg提示:%v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return mtwmapi.SuccessResponse
|
||||||
|
}
|
||||||
|
|||||||
@@ -13,15 +13,6 @@ type IMController struct {
|
|||||||
web.Controller
|
web.Controller
|
||||||
}
|
}
|
||||||
|
|
||||||
// @Title IM初始化长链接
|
|
||||||
// @Description IM初始化长链接
|
|
||||||
// @Success 200 {object} controllers.CallResult
|
|
||||||
// @Failure 200 {object} controllers.CallResult
|
|
||||||
// @router /StartWebSocket [get]
|
|
||||||
func (c *IMController) StartWebSocket() {
|
|
||||||
im.Run(c.Ctx.ResponseWriter, c.Ctx.Request)
|
|
||||||
}
|
|
||||||
|
|
||||||
// @Title IM获取门店用户聊天列表
|
// @Title IM获取门店用户聊天列表
|
||||||
// @Description IM获取门店用户聊天列表
|
// @Description IM获取门店用户聊天列表
|
||||||
// @Param token header string true "认证token"
|
// @Param token header string true "认证token"
|
||||||
@@ -73,24 +64,24 @@ func (c *IMController) SetImMsgRead() {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// @Title 向平台商发送信息
|
// @Title 向平台商发送信息(https方式)
|
||||||
// @Description 向平台商发送信息
|
// @Description 向平台商发送信息(https方式)
|
||||||
// @Param token header string true "认证token"
|
// @Param token header string true "认证token"
|
||||||
// @Param sendData formData string true "平台商消息结构体"
|
// @Param sendData formData string true "平台商消息结构体"
|
||||||
// @Success 200 {object} controllers.CallResult
|
// @Success 200 {object} controllers.CallResult
|
||||||
// @Failure 200 {object} controllers.CallResult
|
// @Failure 200 {object} controllers.CallResult
|
||||||
// @router /SendToVendor [post]
|
// @router /SendToVendorV2 [post]
|
||||||
func (c *IMController) SendToVendor() {
|
func (c *IMController) SendToVendorV2() {
|
||||||
c.callSendToVendor(func(params *tImSendToVendorParams) (retVal interface{}, errCode string, err error) {
|
c.callSendToVendorV2(func(params *tImSendToVendorV2Params) (retVal interface{}, errCode string, err error) {
|
||||||
var sendData im.SendData
|
var sendData im.SendData
|
||||||
b := bytes.NewBufferString(params.SendData)
|
b := bytes.NewBufferString(params.SendData)
|
||||||
decoder := json.NewDecoder(b)
|
decoder := json.NewDecoder(b)
|
||||||
_ = decoder.Decode(&sendData)
|
if err = decoder.Decode(&sendData); err == nil {
|
||||||
fmt.Println(sendData)
|
fmt.Println(sendData)
|
||||||
|
if err = im.SendVendorV2(sendData); err != nil {
|
||||||
if dataStr, err := json.Marshal(sendData); err == nil {
|
return nil, "", err
|
||||||
im.SendToVendor(dataStr)
|
}
|
||||||
}
|
}
|
||||||
return nil, "", err
|
return nil, "", nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -87,3 +87,20 @@ func (c *MtwmController) SkuDelete() {
|
|||||||
func (c *MtwmController) StoreBind() {
|
func (c *MtwmController) StoreBind() {
|
||||||
c.onCallbackMsg(mtwmapi.MsgTypeStoreBind)
|
c.onCallbackMsg(mtwmapi.MsgTypeStoreBind)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *MtwmController) IMCallback() {
|
||||||
|
c.OnIMCallback()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *MtwmController) OnIMCallback() {
|
||||||
|
c.Data["json"] = mtwmapi.Err2CallbackResponse(nil, "")
|
||||||
|
msg, callbackResponse := api.MtwmAPI.GetIMCallbackMsg(c.Ctx.Request)
|
||||||
|
if callbackResponse == nil {
|
||||||
|
callbackResponse = mtwm.OnImMsg(msg)
|
||||||
|
if callbackResponse == nil {
|
||||||
|
callbackResponse = mtwmapi.Err2CallbackResponse(nil, "")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
c.Data["json"] = callbackResponse
|
||||||
|
c.ServeJSON()
|
||||||
|
}
|
||||||
|
|||||||
9
main.go
9
main.go
@@ -8,8 +8,6 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.rosy.net.cn/jx-callback/business/partner/purchase/im"
|
|
||||||
|
|
||||||
"git.rosy.net.cn/jx-callback/business/enterprise"
|
"git.rosy.net.cn/jx-callback/business/enterprise"
|
||||||
"git.rosy.net.cn/jx-callback/business/partner/purchase/jdshop"
|
"git.rosy.net.cn/jx-callback/business/partner/purchase/jdshop"
|
||||||
|
|
||||||
@@ -93,13 +91,6 @@ func Init() {
|
|||||||
misc.Init()
|
misc.Init()
|
||||||
enterprise.Init() // 初始化enterprise key
|
enterprise.Init() // 初始化enterprise key
|
||||||
|
|
||||||
im.Init() //初始化ws连接
|
|
||||||
|
|
||||||
//test
|
|
||||||
//mux := http.NewServeMux()
|
|
||||||
//mux.HandleFunc("/v2/im/SendToVendor", im.Run)
|
|
||||||
//go http.ListenAndServe(":8082", mux)
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 返回true表示非运行服务
|
// 返回true表示非运行服务
|
||||||
|
|||||||
@@ -4395,20 +4395,12 @@ func init() {
|
|||||||
Params: nil})
|
Params: nil})
|
||||||
web.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:IMController"] = append(web.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:IMController"],
|
web.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:IMController"] = append(web.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:IMController"],
|
||||||
web.ControllerComments{
|
web.ControllerComments{
|
||||||
Method: "SendToVendor",
|
Method: "SendToVendorV2",
|
||||||
Router: `/SendToVendor`,
|
Router: `/SendToVendorV2`,
|
||||||
AllowHTTPMethods: []string{"post"},
|
AllowHTTPMethods: []string{"post"},
|
||||||
MethodParams: param.Make(),
|
MethodParams: param.Make(),
|
||||||
Filters: nil,
|
Filters: nil,
|
||||||
Params: nil})
|
Params: nil})
|
||||||
web.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:IMController"] = append(web.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:IMController"],
|
|
||||||
web.ControllerComments{
|
|
||||||
Method: "StartWebSocket",
|
|
||||||
Router: `/StartWebSocket`,
|
|
||||||
AllowHTTPMethods: []string{"get"},
|
|
||||||
MethodParams: param.Make(),
|
|
||||||
Filters: nil,
|
|
||||||
Params: nil})
|
|
||||||
|
|
||||||
//web.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:FnController"] = append(web.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:FnController"],
|
//web.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:FnController"] = append(web.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:FnController"],
|
||||||
// web.ControllerComments{
|
// web.ControllerComments{
|
||||||
|
|||||||
Reference in New Issue
Block a user