diff --git a/business/jxutils/cache/cache.go b/business/jxutils/cache/cache.go index 01da965ee..391173fdc 100644 --- a/business/jxutils/cache/cache.go +++ b/business/jxutils/cache/cache.go @@ -12,13 +12,13 @@ type ICacher interface { GetAs(key string, ptr interface{}) error Keys(prefix string) ([]string, error) - //FlushDB() error - //Incr(key string) error - //LRange(key string) (retVal []string) - //Exists(keys ...string) (int64, error) - //RPush(key string, value interface{}) error - //Expire(key string, expiration time.Duration) error - //LRem(key string, count int, value interface{}) error - //LSet(key string, index int, value interface{}) error - //ExpireResult(key string, expiration time.Duration) (bool, error) + FlushDB() error + Incr(key string) error + LRange(key string) (retVal []string) + Exists(keys ...string) (int64, error) + RPush(key string, value interface{}) error + Expire(key string, expiration time.Duration) error + LRem(key string, count int, value interface{}) error + LSet(key string, index int, value interface{}) error + ExpireResult(key string, expiration time.Duration) (bool, error) } diff --git a/business/jxutils/cache/redis/redis.go b/business/jxutils/cache/redis/redis.go index bf4ef4377..38dfd5d59 100644 --- a/business/jxutils/cache/redis/redis.go +++ b/business/jxutils/cache/redis/redis.go @@ -74,46 +74,46 @@ func (c *Cacher) Keys(prefix string) ([]string, error) { return c.client.Keys(prefix + "*").Result() } -//func (c *Cacher) RPush(key string, value interface{}) error { -// return c.client.RPush(key, value).Err() -//} -// -//func (c *Cacher) FlushDB() error { -// return c.client.FlushDB().Err() -//} -// -//func (c *Cacher) Expire(key string, expiration time.Duration) error { -// return c.client.Expire(key, expiration).Err() -//} -// -//func (c *Cacher) ExpireResult(key string, expiration time.Duration) (bool, error) { -// ok, err := c.client.Expire(key, expiration).Result() -// if err != nil { -// return false, err -// } -// return ok, nil -//} -// -//func (c *Cacher) Exists(keys ...string) (int64, error) { -// ret := c.client.Exists(keys...) -// return ret.Val(), ret.Err() -//} -// -//func (c *Cacher) Incr(key string) error { -// return c.client.Incr(key).Err() -//} -// -//func (c *Cacher) LRange(key string) (retVal []string) { -// if c.client.LLen(key).Val() > 0 { -// retVal = c.client.LRange(key, 0, -1).Val() -// } -// return retVal -//} -// -//func (c *Cacher) LSet(key string, index int, value interface{}) error { -// return c.client.LSet(key, int64(index), value).Err() -//} -// -//func (c *Cacher) LRem(key string, count int, value interface{}) error { -// return c.client.LRem(key, int64(count), value).Err() -//} +func (c *Cacher) RPush(key string, value interface{}) error { + return c.client.RPush(key, value).Err() +} + +func (c *Cacher) FlushDB() error { + return c.client.FlushDB().Err() +} + +func (c *Cacher) Expire(key string, expiration time.Duration) error { + return c.client.Expire(key, expiration).Err() +} + +func (c *Cacher) ExpireResult(key string, expiration time.Duration) (bool, error) { + ok, err := c.client.Expire(key, expiration).Result() + if err != nil { + return false, err + } + return ok, nil +} + +func (c *Cacher) Exists(keys ...string) (int64, error) { + ret := c.client.Exists(keys...) + return ret.Val(), ret.Err() +} + +func (c *Cacher) Incr(key string) error { + return c.client.Incr(key).Err() +} + +func (c *Cacher) LRange(key string) (retVal []string) { + if c.client.LLen(key).Val() > 0 { + retVal = c.client.LRange(key, 0, -1).Val() + } + return retVal +} + +func (c *Cacher) LSet(key string, index int, value interface{}) error { + return c.client.LSet(key, int64(index), value).Err() +} + +func (c *Cacher) LRem(key string, count int, value interface{}) error { + return c.client.LRem(key, int64(count), value).Err() +} diff --git a/business/partner/purchase/ebai/im.go b/business/partner/purchase/ebai/im.go index 520cdca10..a1edd851e 100644 --- a/business/partner/purchase/ebai/im.go +++ b/business/partner/purchase/ebai/im.go @@ -1,7 +1,11 @@ package ebai import ( + "encoding/json" + "git.rosy.net.cn/baseapi/platformapi/ebaiapi" + "git.rosy.net.cn/jx-callback/business/partner/purchase/im" + "git.rosy.net.cn/jx-callback/globals/api" ) const ( @@ -10,9 +14,9 @@ const ( // OnImMessage 用户/骑手 发送/已读消息 回调 func (p *PurchaseHandler) OnImMessage(msg *ebaiapi.CallbackMsg) (response *ebaiapi.CallbackResponse) { - //str, err := json.Marshal(msg.Data) + str, err := json.Marshal(msg.Data) - //im.ReadMsgFromVendor(IMVendorIDELM, msg.Source, str) - return nil - //return api.EbaiAPI.Err2CallbackResponse(msg.Cmd, err, nil) + im.ReadMsgFromVendor(IMVendorIDELM, msg.Source, str) + //return nil + return api.EbaiAPI.Err2CallbackResponse(msg.Cmd, err, nil) } diff --git a/business/partner/purchase/im/im.go b/business/partner/purchase/im/im.go index 8851a333e..551c6d810 100644 --- a/business/partner/purchase/im/im.go +++ b/business/partner/purchase/im/im.go @@ -1,356 +1,355 @@ package im -// -//import ( -// "encoding/json" -// "errors" -// "fmt" -// "net/http" -// -// "git.rosy.net.cn/jx-callback/globals" -// -// "git.rosy.net.cn/baseapi/platformapi/ebaiapi" -// -// "git.rosy.net.cn/baseapi/platformapi/mtwmapi" -// "git.rosy.net.cn/baseapi/utils" -// push "git.rosy.net.cn/jx-callback/business/jxutils/unipush" -// "git.rosy.net.cn/jx-callback/globals/api" -// "github.com/gorilla/websocket" -//) -// -//// SendToVendor 向平台发消息 -//func SendToVendor(msg []byte) { -// var ( -// w http.ResponseWriter -// sendData SendData -// err error -// elmAppID = api.EbaiAPI.GetSource() -// ) -// -// //解析数据 -// if err = json.Unmarshal(msg, &sendData); err != nil { -// return -// } -// -// //存储数据 -// ReadMsgFromClient(sendData.VendorID, elmAppID, sendData.Data) -// -// //发送信息 -// if sendData.VendorID == VendorIDMT { -// temp, _ := json.Marshal(sendData.Data) -// Send(temp) -// } -// if sendData.VendorID == VendorIDELM { -// param := sendData.Data.(ebaiapi.BusinessSendMsgReq) -// if err := api.EbaiAPI.BusinessSendMsg(¶m); err != nil { -// globals.SugarLogger.Debugf("elm发送信息错误:%v", err) -// return -// } -// } -// -// if err != nil { -// ClientRender(w, Fail, FailMsg, map[string]string{ -// "errMsg": fmt.Sprintf("%v", err), -// }) -// } else { -// ClientRender(w, SuccessCode, SuccessMsg, map[string]interface{}{ -// "vendorID": sendData.VendorID, -// "msg": "ok", -// }) -// } -// return -//} -// -//func Send(data []byte) { -// //生成完整url -// fullUrl := GenFullUrl() //clientID暂时不用 -// -// conn, resp, err := websocket.DefaultDialer.Dial(fullUrl, nil) -// if err != nil || resp.StatusCode != 101 { -// fmt.Printf("连接失败:%v http响应不成功", err) -// } -// //关闭 -// defer func(conn *websocket.Conn) { -// err := conn.Close() -// if err != nil { -// return -// } -// }(conn) -// -// err = conn.WriteMessage(websocket.TextMessage, data) -// if err != nil { -// fmt.Println(err) -// } -// -// for { -// _, msg, err := conn.ReadMessage() -// if err != nil { -// break -// } else { -// temp := string(msg) -// if temp != HeartCheckSuccess { -// ReadMsgFromVendor(VendorIDMT, "", msg) -// } -// } -// fmt.Printf("%s receive: %s\n", conn.RemoteAddr(), string(msg)) -// } -//} -// -//// ReadMsgFromClient 存储客户端发送的消息 -//func ReadMsgFromClient(vendorID int, elmAppID string, msg interface{}) { -// var ( -// err error -// jxMsg = &JXMsg{} -// userList = &UserMessageList{} -// ) -// -// data, err := json.Marshal(msg) -// if err != nil { -// return -// } -// -// if vendorID == VendorIDMT { -// var MtSingleChat = mtwmapi.SingleChat{} -// err = json.Unmarshal(data, &MtSingleChat) -// jxMsg = &JXMsg{ -// SendType: SendTypeJx, -// Data: MtSingleChat, -// } -// userList = &UserMessageList{ -// VendorID: VendorIDMT, -// UserID: utils.Int2Str(MtSingleChat.OpenUserID), -// LatestMsg: MtSingleChat.MsgContent, -// LatestTime: MtSingleChat.Cts, -// } -// } -// if vendorID == VendorIDELM { -// var ElmData = ebaiapi.ImMessageSend{} -// err = json.Unmarshal(data, &ElmData) -// jxMsg = &JXMsg{ -// SendType: SendTypeJx, -// Data: ElmData, -// } -// userList = &UserMessageList{ -// VendorID: VendorIDMT, -// UserID: ElmData.PayLoad.GroupID, -// LatestMsg: ElmData.PayLoad.Content, -// LatestTime: int(ElmData.PayLoad.CreateTime), -// } -// } -// -// //1 存储详细聊天记录list -// if err = SetMessageDetail(jxMsg, vendorID, elmAppID); err != nil { -// return -// } -// //2 存储展示列表时单条数据 -// if err = SetUserList(jxMsg, userList, vendorID, elmAppID); err != nil { -// return -// } -//} -// -//// ReadMsgFromVendor 读取数据并存储到redis -//func ReadMsgFromVendor(vendorID int, elmAppID string, msg []byte) { -// if string(msg) == "" { -// return -// } -// var ( -// err error -// vendorStoreID string -// jxMsg = &JXMsg{} -// userList = &UserMessageList{} -// ) -// if vendorID == VendorIDMT { -// var MtSingleChat = mtwmapi.SingleChat{} -// err = json.Unmarshal(msg, &MtSingleChat) -// jxMsg = &JXMsg{ -// SendType: SendTypeMt, -// Data: MtSingleChat, -// } -// userList = &UserMessageList{ -// VendorID: VendorIDMT, -// UserID: utils.Int2Str(MtSingleChat.OpenUserID), -// LatestMsg: MtSingleChat.MsgContent, -// LatestTime: MtSingleChat.Cts, -// } -// vendorStoreID = MtSingleChat.AppPoiCode -// } -// if vendorID == VendorIDELM { -// var ElmData = ebaiapi.ImMessageSend{} -// err = json.Unmarshal(msg, &ElmData) -// jxMsg = &JXMsg{ -// SendType: SendTypeElm, -// Data: ElmData, -// } -// userList = &UserMessageList{ -// VendorID: VendorIDMT, -// UserID: ElmData.PayLoad.GroupID, -// LatestMsg: ElmData.PayLoad.Content, -// LatestTime: int(ElmData.PayLoad.CreateTime), -// } -// } -// -// //1 存储详细聊天记录list -// if err = SetMessageDetail(jxMsg, vendorID, elmAppID); err != nil { -// return -// } -// //2 存储展示列表时单条数据 -// if err = SetUserList(jxMsg, userList, vendorID, elmAppID); err != nil { -// return -// } -// //3 cid推送新消息 -// err = PushMsgByCid(vendorStoreID, vendorID) -// //4 长链接通知给客户端 -// ToClientChan <- clientInfo{Code: SuccessCode, Msg: fmt.Sprintf("%v", err), Data: jxMsg} -//} -// -//// PushMsgByCid 通过cid push用户 -//func PushMsgByCid(vendorStoreID string, vendorID int) error { -// if err := push.NotifyImNewMessage(vendorStoreID, vendorID); err != nil { -// return err -// } -// return nil -//} -// -//// SetMessageDetail 赋值 -////格式 AppID:AppPoiCode:10:OpenUserID -//func SetMessageDetail(req *JXMsg, vendorID int, elmAppID string) error { -// //生成京西消息ID detail -// msgID := GenMsgDetailID(req, vendorID, elmAppID) -// -// data, _ := json.Marshal(req) -// err := rdb.RPush(msgID, string(data)) -// ok, err := rdb.ExpireResult(msgID, ExpireTimeDay) -// if err != nil || !ok { -// return err -// } -// return nil -//} -// -//// SetUserList 赋值 -////AppPoiCode:10 [userid1:{SingleChat},userid2:{}] -//func SetUserList(jxMsg *JXMsg, userList *UserMessageList, vendorID int, elmAppID string) error { -// //生成msgID -// msgID := GenMsgListID(jxMsg, vendorID, elmAppID) -// -// //获取未读消息条数并删除旧数据 -// cnt, err := GetNewAndTrim(msgID, userList.UserID) -// if cnt > 0 { -// userList.NewMessageNum = cnt -// } else { -// userList.NewMessageNum = 1 -// } -// //存储当前数据 -// data, _ := json.Marshal(userList) -// err = rdb.RPush(msgID, string(data)) -// -// ok, err := rdb.ExpireResult(msgID, ExpireTimeDay) -// if err != nil || !ok { -// return err -// } -// return nil -//} -// -//// GetNewAndTrim 获取未读条数并清除旧数据 -//func GetNewAndTrim(key string, flag string) (cnt int, err error) { -// cnt = 0 -// if n, err := rdb.Exists(key); n > 0 && err == nil { -// s2 := rdb.LRange(key) -// for i := 0; i < len(s2); i++ { -// v := UserMessageList{} -// _ = json.Unmarshal([]byte(s2[i]), &v) -// if v.UserID == flag { -// err = rdb.LSet(key, i, "del") -// err = rdb.LRem(key, 0, "del") -// s2 = append(s2[:i], s2[i+1:]...) -// i-- -// if v.NewMessageNum == 0 { //目前为首条 -// cnt++ //赋值1 -// } else { -// cnt = v.NewMessageNum -// } -// } -// } -// } else { -// return 0, nil -// } -// return cnt, err -//} -// -//// GenMsgDetailID 生成查询详细聊天记录ID -//func GenMsgDetailID(jxMsg *JXMsg, vendorID int, elmAppID string) (msgID string) { -// if vendorID == VendorIDMT { -// var d1 = jxMsg.Data.(mtwmapi.SingleChat) -// msgID = utils.Int2Str(d1.AppID) + ":" + d1.AppPoiCode + ":10:" + utils.Int2Str(d1.OpenUserID) -// } -// if vendorID == VendorIDELM { -// var d2 = jxMsg.Data.(ebaiapi.ImMessageSend) -// msgID = elmAppID + ":" + d2.PlatformShopID + ":11:" + d2.PayLoad.GroupID -// } -// return msgID -//} -// -//// GenMsgListID 生成展示列表时单条数据ID(部分) -//func GenMsgListID(jxMsg *JXMsg, vendorID int, elmAppID string) (msgID string) { -// if vendorID == VendorIDMT { -// var d1 = jxMsg.Data.(mtwmapi.SingleChat) -// msgID = utils.Int2Str(d1.AppID) + ":" + d1.AppPoiCode + ":10" -// } -// if vendorID == VendorIDELM { -// var d2 = jxMsg.Data.(ebaiapi.ImMessageSend) -// msgID = elmAppID + ":" + d2.PlatformShopID + ":11" -// } -// return msgID -//} -// -//// GetImUserList 获取门店用户聊天列表 -//func GetImUserList(req []RelInfo) (retVal []interface{}, err error) { -// if len(req) == 0 { -// return nil, errors.New("msgID不允许为空") -// } -// var keys []string -// for _, i := range req { -// key := i.AppID + ":" + i.VendorStoreID + ":" + i.VendorID -// keys = append(keys, key) -// } -// for _, j := range keys { -// temp := rdb.Get(j) -// retVal = append(retVal, temp) -// } -// return retVal, err -//} -// -//// GetImChatDetail 获取门店用户聊天详情 -//func GetImChatDetail(req []UserRelInfo) (retVal []interface{}, err error) { -// if len(req) == 0 { -// return nil, errors.New("msgID不允许为空") -// } -// var keys []string -// for _, i := range req { -// key := i.AppID + ":" + i.VendorStoreID + ":" + i.VendorID + ":" + i.UserID -// keys = append(keys, key) -// } -// for _, j := range keys { -// temp := rdb.Get(j) -// retVal = append(retVal, temp) -// } -// return retVal, err -//} -// -//// SetJxMsgRead 设置jx消息已读 userID(美团:openUserID;饿了么:groupID) -//func SetJxMsgRead(appID, vendorStoreID, vendorID, userID string) error { -// key := appID + ":" + vendorStoreID + ":" + vendorID -// if n, err := rdb.Exists(key); n > 0 && err == nil { -// s2 := rdb.LRange(key) -// for i := 0; i < len(s2); i++ { -// v := UserMessageList{} -// _ = json.Unmarshal([]byte(s2[i]), &v) -// if v.UserID == userID { -// err = rdb.LSet(key, i, "del") -// err = rdb.LRem(key, 0, "del") -// s2 = append(s2[:i], s2[i+1:]...) -// i-- -// } -// } -// } -// return nil -//} +import ( + "encoding/json" + "errors" + "fmt" + "net/http" + + "git.rosy.net.cn/jx-callback/globals" + + "git.rosy.net.cn/baseapi/platformapi/ebaiapi" + + "git.rosy.net.cn/baseapi/platformapi/mtwmapi" + "git.rosy.net.cn/baseapi/utils" + push "git.rosy.net.cn/jx-callback/business/jxutils/unipush" + "git.rosy.net.cn/jx-callback/globals/api" + "github.com/gorilla/websocket" +) + +// SendToVendor 向平台发消息 +func SendToVendor(msg []byte) { + var ( + w http.ResponseWriter + sendData SendData + err error + elmAppID = api.EbaiAPI.GetSource() + ) + + //解析数据 + if err = json.Unmarshal(msg, &sendData); err != nil { + return + } + + //存储数据 + ReadMsgFromClient(sendData.VendorID, elmAppID, sendData.Data) + + //发送信息 + if sendData.VendorID == VendorIDMT { + temp, _ := json.Marshal(sendData.Data) + Send(temp) + } + if sendData.VendorID == VendorIDELM { + param := sendData.Data.(ebaiapi.BusinessSendMsgReq) + if err := api.EbaiAPI.BusinessSendMsg(¶m); err != nil { + globals.SugarLogger.Debugf("elm发送信息错误:%v", err) + return + } + } + + if err != nil { + ClientRender(w, Fail, FailMsg, map[string]string{ + "errMsg": fmt.Sprintf("%v", err), + }) + } else { + ClientRender(w, SuccessCode, SuccessMsg, map[string]interface{}{ + "vendorID": sendData.VendorID, + "msg": "ok", + }) + } + return +} + +func Send(data []byte) { + //生成完整url + fullUrl := GenFullUrl() //clientID暂时不用 + + conn, resp, err := websocket.DefaultDialer.Dial(fullUrl, nil) + if err != nil || resp.StatusCode != 101 { + fmt.Printf("连接失败:%v http响应不成功", err) + } + //关闭 + defer func(conn *websocket.Conn) { + err := conn.Close() + if err != nil { + return + } + }(conn) + + err = conn.WriteMessage(websocket.TextMessage, data) + if err != nil { + fmt.Println(err) + } + + for { + _, msg, err := conn.ReadMessage() + if err != nil { + break + } else { + temp := string(msg) + if temp != HeartCheckSuccess { + ReadMsgFromVendor(VendorIDMT, "", msg) + } + } + fmt.Printf("%s receive: %s\n", conn.RemoteAddr(), string(msg)) + } +} + +// ReadMsgFromClient 存储客户端发送的消息 +func ReadMsgFromClient(vendorID int, elmAppID string, msg interface{}) { + var ( + err error + jxMsg = &JXMsg{} + userList = &UserMessageList{} + ) + + data, err := json.Marshal(msg) + if err != nil { + return + } + + if vendorID == VendorIDMT { + var MtSingleChat = mtwmapi.SingleChat{} + err = json.Unmarshal(data, &MtSingleChat) + jxMsg = &JXMsg{ + SendType: SendTypeJx, + Data: MtSingleChat, + } + userList = &UserMessageList{ + VendorID: VendorIDMT, + UserID: utils.Int2Str(MtSingleChat.OpenUserID), + LatestMsg: MtSingleChat.MsgContent, + LatestTime: MtSingleChat.Cts, + } + } + if vendorID == VendorIDELM { + var ElmData = ebaiapi.ImMessageSend{} + err = json.Unmarshal(data, &ElmData) + jxMsg = &JXMsg{ + SendType: SendTypeJx, + Data: ElmData, + } + userList = &UserMessageList{ + VendorID: VendorIDMT, + UserID: ElmData.PayLoad.GroupID, + LatestMsg: ElmData.PayLoad.Content, + LatestTime: int(ElmData.PayLoad.CreateTime), + } + } + + //1 存储详细聊天记录list + if err = SetMessageDetail(jxMsg, vendorID, elmAppID); err != nil { + return + } + //2 存储展示列表时单条数据 + if err = SetUserList(jxMsg, userList, vendorID, elmAppID); err != nil { + return + } +} + +// ReadMsgFromVendor 读取数据并存储到redis +func ReadMsgFromVendor(vendorID int, elmAppID string, msg []byte) { + if string(msg) == "" { + return + } + var ( + err error + vendorStoreID string + jxMsg = &JXMsg{} + userList = &UserMessageList{} + ) + if vendorID == VendorIDMT { + var MtSingleChat = mtwmapi.SingleChat{} + err = json.Unmarshal(msg, &MtSingleChat) + jxMsg = &JXMsg{ + SendType: SendTypeMt, + Data: MtSingleChat, + } + userList = &UserMessageList{ + VendorID: VendorIDMT, + UserID: utils.Int2Str(MtSingleChat.OpenUserID), + LatestMsg: MtSingleChat.MsgContent, + LatestTime: MtSingleChat.Cts, + } + vendorStoreID = MtSingleChat.AppPoiCode + } + if vendorID == VendorIDELM { + var ElmData = ebaiapi.ImMessageSend{} + err = json.Unmarshal(msg, &ElmData) + jxMsg = &JXMsg{ + SendType: SendTypeElm, + Data: ElmData, + } + userList = &UserMessageList{ + VendorID: VendorIDMT, + UserID: ElmData.PayLoad.GroupID, + LatestMsg: ElmData.PayLoad.Content, + LatestTime: int(ElmData.PayLoad.CreateTime), + } + } + + //1 存储详细聊天记录list + if err = SetMessageDetail(jxMsg, vendorID, elmAppID); err != nil { + return + } + //2 存储展示列表时单条数据 + if err = SetUserList(jxMsg, userList, vendorID, elmAppID); err != nil { + return + } + //3 cid推送新消息 + err = PushMsgByCid(vendorStoreID, vendorID) + //4 长链接通知给客户端 + ToClientChan <- clientInfo{Code: SuccessCode, Msg: fmt.Sprintf("%v", err), Data: jxMsg} +} + +// PushMsgByCid 通过cid push用户 +func PushMsgByCid(vendorStoreID string, vendorID int) error { + if err := push.NotifyImNewMessage(vendorStoreID, vendorID); err != nil { + return err + } + return nil +} + +// SetMessageDetail 赋值 +//格式 AppID:AppPoiCode:10:OpenUserID +func SetMessageDetail(req *JXMsg, vendorID int, elmAppID string) error { + //生成京西消息ID detail + msgID := GenMsgDetailID(req, vendorID, elmAppID) + + data, _ := json.Marshal(req) + err := rdb.RPush(msgID, string(data)) + ok, err := rdb.ExpireResult(msgID, ExpireTimeDay) + if err != nil || !ok { + return err + } + return nil +} + +// SetUserList 赋值 +//AppPoiCode:10 [userid1:{SingleChat},userid2:{}] +func SetUserList(jxMsg *JXMsg, userList *UserMessageList, vendorID int, elmAppID string) error { + //生成msgID + msgID := GenMsgListID(jxMsg, vendorID, elmAppID) + + //获取未读消息条数并删除旧数据 + cnt, err := GetNewAndTrim(msgID, userList.UserID) + if cnt > 0 { + userList.NewMessageNum = cnt + } else { + userList.NewMessageNum = 1 + } + //存储当前数据 + data, _ := json.Marshal(userList) + err = rdb.RPush(msgID, string(data)) + + ok, err := rdb.ExpireResult(msgID, ExpireTimeDay) + if err != nil || !ok { + return err + } + return nil +} + +// GetNewAndTrim 获取未读条数并清除旧数据 +func GetNewAndTrim(key string, flag string) (cnt int, err error) { + cnt = 0 + if n, err := rdb.Exists(key); n > 0 && err == nil { + s2 := rdb.LRange(key) + for i := 0; i < len(s2); i++ { + v := UserMessageList{} + _ = json.Unmarshal([]byte(s2[i]), &v) + if v.UserID == flag { + err = rdb.LSet(key, i, "del") + err = rdb.LRem(key, 0, "del") + s2 = append(s2[:i], s2[i+1:]...) + i-- + if v.NewMessageNum == 0 { //目前为首条 + cnt++ //赋值1 + } else { + cnt = v.NewMessageNum + } + } + } + } else { + return 0, nil + } + return cnt, err +} + +// GenMsgDetailID 生成查询详细聊天记录ID +func GenMsgDetailID(jxMsg *JXMsg, vendorID int, elmAppID string) (msgID string) { + if vendorID == VendorIDMT { + var d1 = jxMsg.Data.(mtwmapi.SingleChat) + msgID = utils.Int2Str(d1.AppID) + ":" + d1.AppPoiCode + ":10:" + utils.Int2Str(d1.OpenUserID) + } + if vendorID == VendorIDELM { + var d2 = jxMsg.Data.(ebaiapi.ImMessageSend) + msgID = elmAppID + ":" + d2.PlatformShopID + ":11:" + d2.PayLoad.GroupID + } + return msgID +} + +// GenMsgListID 生成展示列表时单条数据ID(部分) +func GenMsgListID(jxMsg *JXMsg, vendorID int, elmAppID string) (msgID string) { + if vendorID == VendorIDMT { + var d1 = jxMsg.Data.(mtwmapi.SingleChat) + msgID = utils.Int2Str(d1.AppID) + ":" + d1.AppPoiCode + ":10" + } + if vendorID == VendorIDELM { + var d2 = jxMsg.Data.(ebaiapi.ImMessageSend) + msgID = elmAppID + ":" + d2.PlatformShopID + ":11" + } + return msgID +} + +// GetImUserList 获取门店用户聊天列表 +func GetImUserList(req []RelInfo) (retVal []interface{}, err error) { + if len(req) == 0 { + return nil, errors.New("msgID不允许为空") + } + var keys []string + for _, i := range req { + key := i.AppID + ":" + i.VendorStoreID + ":" + i.VendorID + keys = append(keys, key) + } + for _, j := range keys { + temp := rdb.Get(j) + retVal = append(retVal, temp) + } + return retVal, err +} + +// GetImChatDetail 获取门店用户聊天详情 +func GetImChatDetail(req []UserRelInfo) (retVal []interface{}, err error) { + if len(req) == 0 { + return nil, errors.New("msgID不允许为空") + } + var keys []string + for _, i := range req { + key := i.AppID + ":" + i.VendorStoreID + ":" + i.VendorID + ":" + i.UserID + keys = append(keys, key) + } + for _, j := range keys { + temp := rdb.Get(j) + retVal = append(retVal, temp) + } + return retVal, err +} + +// SetJxMsgRead 设置jx消息已读 userID(美团:openUserID;饿了么:groupID) +func SetJxMsgRead(appID, vendorStoreID, vendorID, userID string) error { + key := appID + ":" + vendorStoreID + ":" + vendorID + if n, err := rdb.Exists(key); n > 0 && err == nil { + s2 := rdb.LRange(key) + for i := 0; i < len(s2); i++ { + v := UserMessageList{} + _ = json.Unmarshal([]byte(s2[i]), &v) + if v.UserID == userID { + err = rdb.LSet(key, i, "del") + err = rdb.LRem(key, 0, "del") + s2 = append(s2[:i], s2[i+1:]...) + i-- + } + } + } + return nil +} diff --git a/business/partner/purchase/im/im_model.go b/business/partner/purchase/im/im_model.go index 2228668ab..ca1269331 100644 --- a/business/partner/purchase/im/im_model.go +++ b/business/partner/purchase/im/im_model.go @@ -1,276 +1,276 @@ package im -// -//import ( -// "encoding/json" -// "flag" -// "fmt" -// "git.rosy.net.cn/baseapi/utils" -// "io" -// "log" -// "net" -// "net/http" -// "sync" -// "time" -// -// "git.rosy.net.cn/baseapi/platformapi/mtwmapi" -// "git.rosy.net.cn/jx-callback/globals/api" -// "github.com/gazeboxu/mapstructure" -// "github.com/gorilla/websocket" -// "gopkg.in/ini.v1" -//) -// -//// ClientManager 连接管理 -//type ClientManager struct { -// ClientIdMap map[string]*Client // 全部的连接 -// ClientIdMapLock sync.RWMutex // 读写锁 -// -// Connect chan *Client // 连接处理 -// DisConnect chan *Client // 断开连接处理 -// -// GroupLock sync.RWMutex -// Groups map[string][]string -// -// SystemClientsLock sync.RWMutex -// SystemClients map[string][]string -//} -// -//// Client 客户端连接信息 -//type Client struct { -// ClientId string // 标识ID -// Socket *websocket.Conn // 用户连接 -// ConnectTime uint64 // 首次连接时间 -// IsDeleted bool // 是否删除或下线 -// UserId string // 业务端标识用户ID -// //Extend string // 扩展字段,用户可以自定义 -// //GroupList []string -//} -// -////channel通道结构体 -//type clientInfo struct { -// ClientId string `json:"clientId" validate:"required"` //链接ID -// Data interface{} -// SendUserId string -// MessageId string -// Code int -// Msg string -//} -// -//// RetData 统一返回值结构体 -//type RetData struct { -// Code int `json:"code"` //响应code -// Msg string `json:"msg"` //响应msg success/fail -// Data interface{} `json:"data"` //信息 -// -// //MessageType string `json:"messageType"` //消息类型 heart-心跳检测;send-发送消息;receive-接收消息 -// //MessageId string `json:"messageId"` //发送/接收信息 id -// //UserId string `json:"userId"` //必须是平台方userID -//} -// -//type global struct { -// LocalHost string //本机内网IP -// ServerList map[string]string -// ServerListLock sync.RWMutex -//} -//type commonConf struct { -// HttpPort string -// RPCPort string -// Cluster bool -// CryptoKey string -//} -// -//// SendData 客户端写入参数 -//type SendData struct { -// //ClientId string `json:"clientId" validate:"required"` //链接ID -// VendorID int `json:"vendorID"` //消息来源平台ID -// Data interface{} `json:"data"` //发送给平台 美团/饿了么消息结构体 -// //返回值 -// //Code int `json:"code"` -// //Msg string `json:"msg"` -// //SendUserId string `json:"sendUserId"` -//} -// -//// JXMsg 京西消息结构体 -//type JXMsg struct { -// SendType string `json:"sendType"` //消息发送方 jx-商家;mt-美团;elm-饿了么 -// Data interface{} `json:"data"` //美团/饿了么 单聊消息 -//} -// -//// GetUserListReq 获取门店用户聊天列表 -//type GetUserListReq struct { -// VendorStoreID string `json:"vendorStoreID"` //平台门店id -// VendorID string `json:"vendorID"` //平台标识id -// AppID string `json:"appID"` //应用ID -//} -// -//type GetChatDetailReq struct { -// VendorStoreID string `json:"vendorStoreID"` //平台门店id -// VendorID string `json:"vendorID"` //平台标识id -// AppID string `json:"appID"` //应用ID -// UserID string `json:"userID"` //userID/groupID -//} -// -//// UserMessageList 用户消息列表 -//type UserMessageList struct { -// VendorID int `json:"vendorID"` //平台品牌 10-美团 11-饿了么 -// UserID string `json:"userID"` //用户ID -// NewMessageNum int `json:"NewMessageNum"` //新消息数量 -// LatestMsg string `json:"latestMsg"` //最新一条消息 -// LatestTime int `json:"latestTime"` //最新一条消息发送时间 -//} -// -//type RelInfo struct { -// VendorStoreID string `json:"vendorStoreID"` //平台门店id -// VendorID string `json:"vendorID"` //平台标识id -// AppID string `json:"appID"` //应用ID -//} -// -//type UserRelInfo struct { -// VendorStoreID string `json:"vendorStoreID"` //平台门店id -// VendorID string `json:"vendorID"` //平台标识id -// AppID string `json:"appID"` //应用ID -// UserID string `json:"userID"` //用户id/groupID -//} -// -//var ( -// cfg *ini.File -// rdb = api.Cacher -// Manager = NewClientManager() // 管理者 -// CommonSetting = &commonConf{} -// GlobalSetting = &global{} -// ToClientChan chan clientInfo -// heartbeatInterval = 60 * time.Second // 心跳间隔 -// HeartCheckMsg = "~#HHHBBB#~" //心跳检测消息 -// HeartCheckSuccess = "HB" //成功发送返回心跳消息 -// VendorIDMT = 10 //im美团 -// VendorIDELM = 11 //im饿了么 -// SendTypeJx = "jx" //京西客户端发送方标识 -// SendTypeMt = "mt" //美团用户发送方标识符 -// SendTypeElm = "elm" //饿了么用户发送方标识符 -// MTIMPushUrl = "wss://wpush.meituan.com/websocket" //buildPushConnect建立长连接 -//) -// -//const ( -// ExpireTimeDay = 24 * time.Hour //redis一天过期时间 -// maxMessageSize = 8192 // 最大的消息大小 -//) -// -//type renderData struct { -// ClientId string `json:"clientId"` -//} -// -//const ( -// SuccessCode = 0 -// SuccessMsg = "success" -// Fail = -1 -// FailMsg = "fail" -// -// SYSTEM_ID_ERROR = -1001 -// ONLINE_MESSAGE_CODE = 1001 -// OFFLINE_MESSAGE_CODE = 1002 -//) -// -//// Render 统一返回值 -//func Render(conn *websocket.Conn, messageId string, code int, message string, data interface{}) error { -// return conn.WriteJSON(RetData{ -// Code: code, -// Msg: message, -// Data: data, -// }) -//} -// -//func ClientRender(w http.ResponseWriter, code int, msg string, data interface{}) (str string) { -// var retData RetData -// -// retData.Code = code -// retData.Msg = msg -// retData.Data = data -// -// retJson, _ := json.Marshal(retData) -// str = string(retJson) -// -// w.Header().Set("Content-Type", "application/json; charset=utf-8") -// _, _ = io.WriteString(w, str) -// return -//} -// -//func ConnRender(conn *websocket.Conn, data interface{}) (err error) { -// err = conn.WriteJSON(RetData{ -// Code: SuccessCode, -// Msg: "success", -// Data: data, -// }) -// return -//} -// -//// Default 给默认值 -//func Default() { -// CommonSetting = &commonConf{ -// HttpPort: "6000", -// RPCPort: "7000", -// Cluster: false, -// CryptoKey: "Adba723b7fe06819", -// } -// -// GlobalSetting = &global{ -// LocalHost: getIntranetIp(), -// ServerList: make(map[string]string), -// } -//} -// -//// Setup 初始化全局设置变量 -//func Setup() { -// configFile := flag.String("c", "conf/app.ini", "-c conf/app.ini") -// -// var err error -// cfg, err = ini.Load(*configFile) -// if err != nil { -// log.Fatalf("setting.Setup, fail to parse 'conf/app.ini': %v", err) -// } -// -// mapTo("common", CommonSetting) -// -// GlobalSetting = &global{ -// LocalHost: getIntranetIp(), -// ServerList: make(map[string]string), -// } -// fmt.Printf("LocalHost=%s\n ServerList=%s\n", GlobalSetting.LocalHost, utils.Format4Output(GlobalSetting.ServerList, false)) -//} -// -//// mapTo map section -//func mapTo(section string, v interface{}) { -// err := cfg.Section(section).MapTo(v) -// if err != nil { -// log.Fatalf("Cfg.MapTo %s err: %v", section, err) -// } -//} -// -////获取本机IP -//func getIntranetIp() string { -// addrs, _ := net.InterfaceAddrs() -// for _, addr := range addrs { -// // 检查ip地址判断是否回环地址 -// if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { -// if ipnet.IP.To4() != nil { -// return ipnet.IP.String() -// } -// -// } -// } -// return "" -//} -// -//// GenFullUrl 组装完整websocket url以及生成clientID -//func GenFullUrl() (fullUrl string) { -// resp, err := api.MtwmAPI.GetConnectionToken() -// if err != nil { -// return "" -// } -// retVal := mtwmapi.GetConnTokenResp{} -// err = mapstructure.Decode(resp, &retVal) -// fullUrl = MTIMPushUrl + "/" + retVal.AppKey + "/" + retVal.ConnectionToken -// //clientID = api.MtwmAPI.GetAppID() + ":" + retVal.ConnectionToken -// //打印输出 -// //fmt.Printf("Create websocket connect failCount:%d", retVal.UserCount) -// return fullUrl -//} +import ( + "encoding/json" + "flag" + "fmt" + "io" + "log" + "net" + "net/http" + "sync" + "time" + + "git.rosy.net.cn/baseapi/utils" + + "git.rosy.net.cn/baseapi/platformapi/mtwmapi" + "git.rosy.net.cn/jx-callback/globals/api" + "github.com/gazeboxu/mapstructure" + "github.com/gorilla/websocket" + "gopkg.in/ini.v1" +) + +// ClientManager 连接管理 +type ClientManager struct { + ClientIdMap map[string]*Client // 全部的连接 + ClientIdMapLock sync.RWMutex // 读写锁 + + Connect chan *Client // 连接处理 + DisConnect chan *Client // 断开连接处理 + + GroupLock sync.RWMutex + Groups map[string][]string + + SystemClientsLock sync.RWMutex + SystemClients map[string][]string +} + +// Client 客户端连接信息 +type Client struct { + ClientId string // 标识ID + Socket *websocket.Conn // 用户连接 + ConnectTime uint64 // 首次连接时间 + IsDeleted bool // 是否删除或下线 + UserId string // 业务端标识用户ID + //Extend string // 扩展字段,用户可以自定义 + //GroupList []string +} + +//channel通道结构体 +type clientInfo struct { + ClientId string `json:"clientId" validate:"required"` //链接ID + Data interface{} + SendUserId string + MessageId string + Code int + Msg string +} + +// RetData 统一返回值结构体 +type RetData struct { + Code int `json:"code"` //响应code + Msg string `json:"msg"` //响应msg success/fail + Data interface{} `json:"data"` //信息 + + //MessageType string `json:"messageType"` //消息类型 heart-心跳检测;send-发送消息;receive-接收消息 + //MessageId string `json:"messageId"` //发送/接收信息 id + //UserId string `json:"userId"` //必须是平台方userID +} + +type global struct { + LocalHost string //本机内网IP + ServerList map[string]string + ServerListLock sync.RWMutex +} +type commonConf struct { + HttpPort string + RPCPort string + Cluster bool + CryptoKey string +} + +// SendData 客户端写入参数 +type SendData struct { + //ClientId string `json:"clientId" validate:"required"` //链接ID + VendorID int `json:"vendorID"` //消息来源平台ID + Data interface{} `json:"data"` //发送给平台 美团/饿了么消息结构体 + //返回值 + //Code int `json:"code"` + //Msg string `json:"msg"` + //SendUserId string `json:"sendUserId"` +} + +// JXMsg 京西消息结构体 +type JXMsg struct { + SendType string `json:"sendType"` //消息发送方 jx-商家;mt-美团;elm-饿了么 + Data interface{} `json:"data"` //美团/饿了么 单聊消息 +} + +// GetUserListReq 获取门店用户聊天列表 +type GetUserListReq struct { + VendorStoreID string `json:"vendorStoreID"` //平台门店id + VendorID string `json:"vendorID"` //平台标识id + AppID string `json:"appID"` //应用ID +} + +type GetChatDetailReq struct { + VendorStoreID string `json:"vendorStoreID"` //平台门店id + VendorID string `json:"vendorID"` //平台标识id + AppID string `json:"appID"` //应用ID + UserID string `json:"userID"` //userID/groupID +} + +// UserMessageList 用户消息列表 +type UserMessageList struct { + VendorID int `json:"vendorID"` //平台品牌 10-美团 11-饿了么 + UserID string `json:"userID"` //用户ID + NewMessageNum int `json:"NewMessageNum"` //新消息数量 + LatestMsg string `json:"latestMsg"` //最新一条消息 + LatestTime int `json:"latestTime"` //最新一条消息发送时间 +} + +type RelInfo struct { + VendorStoreID string `json:"vendorStoreID"` //平台门店id + VendorID string `json:"vendorID"` //平台标识id + AppID string `json:"appID"` //应用ID +} + +type UserRelInfo struct { + VendorStoreID string `json:"vendorStoreID"` //平台门店id + VendorID string `json:"vendorID"` //平台标识id + AppID string `json:"appID"` //应用ID + UserID string `json:"userID"` //用户id/groupID +} + +var ( + cfg *ini.File + rdb = api.Cacher + Manager = NewClientManager() // 管理者 + CommonSetting = &commonConf{} + GlobalSetting = &global{} + ToClientChan chan clientInfo + heartbeatInterval = 60 * time.Second // 心跳间隔 + HeartCheckMsg = "~#HHHBBB#~" //心跳检测消息 + HeartCheckSuccess = "HB" //成功发送返回心跳消息 + VendorIDMT = 10 //im美团 + VendorIDELM = 11 //im饿了么 + SendTypeJx = "jx" //京西客户端发送方标识 + SendTypeMt = "mt" //美团用户发送方标识符 + SendTypeElm = "elm" //饿了么用户发送方标识符 + MTIMPushUrl = "wss://wpush.meituan.com/websocket" //buildPushConnect建立长连接 +) + +const ( + ExpireTimeDay = 24 * time.Hour //redis一天过期时间 + maxMessageSize = 8192 // 最大的消息大小 +) + +type renderData struct { + ClientId string `json:"clientId"` +} + +const ( + SuccessCode = 0 + SuccessMsg = "success" + Fail = -1 + FailMsg = "fail" + + SYSTEM_ID_ERROR = -1001 + ONLINE_MESSAGE_CODE = 1001 + OFFLINE_MESSAGE_CODE = 1002 +) + +// Render 统一返回值 +func Render(conn *websocket.Conn, messageId string, code int, message string, data interface{}) error { + return conn.WriteJSON(RetData{ + Code: code, + Msg: message, + Data: data, + }) +} + +func ClientRender(w http.ResponseWriter, code int, msg string, data interface{}) (str string) { + var retData RetData + + retData.Code = code + retData.Msg = msg + retData.Data = data + + retJson, _ := json.Marshal(retData) + str = string(retJson) + + w.Header().Set("Content-Type", "application/json; charset=utf-8") + _, _ = io.WriteString(w, str) + return +} + +func ConnRender(conn *websocket.Conn, data interface{}) (err error) { + err = conn.WriteJSON(RetData{ + Code: SuccessCode, + Msg: "success", + Data: data, + }) + return +} + +// Default 给默认值 +func Default() { + CommonSetting = &commonConf{ + HttpPort: "6000", + RPCPort: "7000", + Cluster: false, + CryptoKey: "Adba723b7fe06819", + } + + GlobalSetting = &global{ + LocalHost: getIntranetIp(), + ServerList: make(map[string]string), + } +} + +// Setup 初始化全局设置变量 +func Setup() { + configFile := flag.String("c", "conf/app.ini", "-c conf/app.ini") + + var err error + cfg, err = ini.Load(*configFile) + if err != nil { + log.Fatalf("setting.Setup, fail to parse 'conf/app.ini': %v", err) + } + + mapTo("common", CommonSetting) + + GlobalSetting = &global{ + LocalHost: getIntranetIp(), + ServerList: make(map[string]string), + } + fmt.Printf("LocalHost=%s\n ServerList=%s\n", GlobalSetting.LocalHost, utils.Format4Output(GlobalSetting.ServerList, false)) +} + +// mapTo map section +func mapTo(section string, v interface{}) { + err := cfg.Section(section).MapTo(v) + if err != nil { + log.Fatalf("Cfg.MapTo %s err: %v", section, err) + } +} + +//获取本机IP +func getIntranetIp() string { + addrs, _ := net.InterfaceAddrs() + for _, addr := range addrs { + // 检查ip地址判断是否回环地址 + if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { + if ipnet.IP.To4() != nil { + return ipnet.IP.String() + } + + } + } + return "" +} + +// GenFullUrl 组装完整websocket url以及生成clientID +func GenFullUrl() (fullUrl string) { + resp, err := api.MtwmAPI.GetConnectionToken() + if err != nil { + return "" + } + retVal := mtwmapi.GetConnTokenResp{} + err = mapstructure.Decode(resp, &retVal) + fullUrl = MTIMPushUrl + "/" + retVal.AppKey + "/" + retVal.ConnectionToken + //clientID = api.MtwmAPI.GetAppID() + ":" + retVal.ConnectionToken + //打印输出 + //fmt.Printf("Create websocket connect failCount:%d", retVal.UserCount) + return fullUrl +} diff --git a/business/partner/purchase/im/im_server.go b/business/partner/purchase/im/im_server.go index e1186190a..1ff6aeed5 100644 --- a/business/partner/purchase/im/im_server.go +++ b/business/partner/purchase/im/im_server.go @@ -1,264 +1,263 @@ package im -// -//import ( -// "errors" -// "fmt" -// "net/http" -// "time" -// -// "git.rosy.net.cn/baseapi/utils" -// -// "git.rosy.net.cn/jx-callback/globals" -// "github.com/gorilla/websocket" -//) -// -//func Init() { -// //初始化 -// ToClientChan = make(chan clientInfo, 1000) -// //写入全局变量 -// //Default() -// -// Setup() -// //建立长链接 -// //StartWebSocket(res, req) -// Send([]byte(HeartCheckMsg)) -// -// //启动定时器 -// PingTimer() -// -// go WriteMessage() -// -// go Manager.Start() -// -// fmt.Printf("服务器启动成功,端口号:%s\n", CommonSetting.HttpPort) -//} -// -//func Run(w http.ResponseWriter, r *http.Request) { -// conn, err := (&websocket.Upgrader{ -// ReadBufferSize: 1024, -// WriteBufferSize: 1024, -// // 允许所有CORS跨域请求 -// CheckOrigin: func(r *http.Request) bool { -// return true -// }, -// }).Upgrade(w, r, nil) -// if err != nil { -// globals.SugarLogger.Debugf("upgrade error: %v", err) -// http.NotFound(w, r) -// return -// } -// -// //设置读取消息大小上线 -// conn.SetReadLimit(maxMessageSize) -// -// clientID := r.FormValue("clientId") -// clientSocket := NewClient(clientID, conn) -// -// //读取客户端消息 -// clientSocket.Read() -// -// if err = ConnRender(conn, renderData{ClientId: clientID}); err != nil { -// _ = conn.Close() -// return -// } -// -// // 用户连接事件 -// Manager.Connect <- clientSocket -//} -// -//func StartWebSocket(conn *websocket.Conn, clientID string, err error) { -// -// //设置读取消息大小上线 -// conn.SetReadLimit(maxMessageSize) -// -// clientSocket := NewClient(clientID, conn) -// -// //读取客户端消息 -// clientSocket.Read() -// -// if err = ConnRender(conn, renderData{ClientId: clientID}); err != nil { -// _ = conn.Close() -// return -// } -// -// // 用户连接事件 -// Manager.Connect <- clientSocket -// -//} -// -//// PingTimer 定时器发送心跳 -//func PingTimer() { -// go func() { -// ticker := time.NewTicker(heartbeatInterval) -// defer ticker.Stop() -// //测试用 -// i := 0 -// for { -// i++ -// <-ticker.C -// for clientId, conn := range Manager.AllClient() { -// if err := conn.Socket.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { -// Manager.DisConnect <- conn -// globals.SugarLogger.Debugf("发送心跳失败: %s 总连接数:%d", clientId, Manager.Count()) -// } -// if err := ConnRender(conn.Socket, renderData{ClientId: clientId}); err != nil { -// return -// } -// globals.SugarLogger.Debugf("clientId=%s,i=%d", clientId, i) -// } -// } -// }() -//} -// -//// WriteMessage 监听并发送给客户端信息 -//func WriteMessage() { -// i := 0 -// for { -// clientInfo := <-ToClientChan -// //广播发送通知所有客户端 -// i++ -// fmt.Printf("WriteMessage clientInfo=%s i=%d", utils.Format4Output(clientInfo, false), i) -// if Manager.AllClient() != nil { -// for _, conn := range Manager.AllClient() { -// if err := Render(conn.Socket, clientInfo.MessageId, clientInfo.Code, clientInfo.Msg, clientInfo.Data); err != nil { -// Manager.DisConnect <- conn -// } -// } -// } else { -// globals.SugarLogger.Debugf("无客户端连接,请检查") -// return -// } -// //if conn, err := Manager.GetByClientId(clientInfo.ClientId); err == nil && conn != nil { -// // if err := Render(conn.Socket, clientInfo.MessageId, clientInfo.Code, clientInfo.Msg, clientInfo.Data); err != nil { -// // Manager.DisConnect <- conn -// // } -// //} -// } -//} -// -//// Start 管道处理程序 -//func (manager *ClientManager) Start() { -// for { -// select { -// case client := <-manager.Connect: -// // 建立连接事件 -// manager.EventConnect(client) -// case conn := <-manager.DisConnect: -// // 断开连接事件 -// manager.EventDisconnect(conn) -// } -// } -//} -// -////从客户端读取数据 -//func (c *Client) Read() { -// go func() { -// for { -// messageType, msg, err := c.Socket.ReadMessage() -// if err != nil { -// if messageType == -1 && websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) { -// Manager.DisConnect <- c -// return -// } else if messageType != websocket.PingMessage { -// return -// } -// } else { -// SendToVendor(msg) -// } -// } -// }() -//} -// -//// 以下为连接事件操作******************************************* -// -//// EventConnect 建立连接事件 -//func (manager *ClientManager) EventConnect(client *Client) { -// manager.AddClient(client) -//} -// -//// EventDisconnect 断开连接事件 -//func (manager *ClientManager) EventDisconnect(client *Client) { -// //关闭连接 -// _ = client.Socket.Close() -// manager.DelClient(client) -// //标记销毁 -// client.IsDeleted = true -// client = nil -//} -// -////以下为客户端Client操作******************************************* -// -//// NewClient 初始化Client -//func NewClient(clientId string, socket *websocket.Conn) *Client { -// return &Client{ -// ClientId: clientId, -// Socket: socket, -// ConnectTime: uint64(time.Now().Unix()), -// IsDeleted: false, -// } -//} -// -//// AddClient 添加客户端 -//func (manager *ClientManager) AddClient(client *Client) { -// manager.ClientIdMapLock.Lock() -// defer manager.ClientIdMapLock.Unlock() -// -// manager.ClientIdMap[client.ClientId] = client -//} -// -//// DelClient 删除客户端 -//func (manager *ClientManager) DelClient(client *Client) { -// manager.delClientIdMap(client.ClientId) -// -//} -// -//// 删除clientIdMap -//func (manager *ClientManager) delClientIdMap(clientId string) { -// manager.ClientIdMapLock.Lock() -// defer manager.ClientIdMapLock.Unlock() -// -// delete(manager.ClientIdMap, clientId) -//} -// -//// GetByClientId 通过clientId获取client -//func (manager *ClientManager) GetByClientId(clientId string) (*Client, error) { -// manager.ClientIdMapLock.RLock() -// defer manager.ClientIdMapLock.RUnlock() -// -// if client, ok := manager.ClientIdMap[clientId]; !ok { -// return nil, errors.New("客户端不存在") -// } else { -// return client, nil -// } -//} -// -//// AllClient 获取所有的客户端 -//func (manager *ClientManager) AllClient() map[string]*Client { -// manager.ClientIdMapLock.RLock() -// defer manager.ClientIdMapLock.RUnlock() -// -// return manager.ClientIdMap -//} -// -////与客户端的交互操作************************* -// -//// NewClientManager 初始化客户端管理 -//func NewClientManager() (clientManager *ClientManager) { -// clientManager = &ClientManager{ -// ClientIdMap: make(map[string]*Client), -// Connect: make(chan *Client, 10000), -// DisConnect: make(chan *Client, 10000), -// Groups: make(map[string][]string, 100), -// SystemClients: make(map[string][]string, 100), -// } -// -// return -//} -// -//// Count 获取客户端数量 -//func (manager *ClientManager) Count() int { -// manager.ClientIdMapLock.RLock() -// defer manager.ClientIdMapLock.RUnlock() -// return len(manager.ClientIdMap) -//} +import ( + "errors" + "fmt" + "net/http" + "time" + + "git.rosy.net.cn/baseapi/utils" + + "git.rosy.net.cn/jx-callback/globals" + "github.com/gorilla/websocket" +) + +func Init() { + //初始化 + ToClientChan = make(chan clientInfo, 1000) + //写入全局变量 + //Default() + + Setup() + //建立长链接 + //StartWebSocket(res, req) + Send([]byte(HeartCheckMsg)) + + //启动定时器 + PingTimer() + + go WriteMessage() + + go Manager.Start() + + fmt.Printf("服务器启动成功,端口号:%s\n", CommonSetting.HttpPort) +} + +func Run(w http.ResponseWriter, r *http.Request) { + conn, err := (&websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + // 允许所有CORS跨域请求 + CheckOrigin: func(r *http.Request) bool { + return true + }, + }).Upgrade(w, r, nil) + if err != nil { + globals.SugarLogger.Debugf("upgrade error: %v", err) + http.NotFound(w, r) + return + } + + //设置读取消息大小上线 + conn.SetReadLimit(maxMessageSize) + + clientID := r.FormValue("clientId") + clientSocket := NewClient(clientID, conn) + + //读取客户端消息 + clientSocket.Read() + + if err = ConnRender(conn, renderData{ClientId: clientID}); err != nil { + _ = conn.Close() + return + } + + // 用户连接事件 + Manager.Connect <- clientSocket +} + +func StartWebSocket(conn *websocket.Conn, clientID string, err error) { + + //设置读取消息大小上线 + conn.SetReadLimit(maxMessageSize) + + clientSocket := NewClient(clientID, conn) + + //读取客户端消息 + clientSocket.Read() + + if err = ConnRender(conn, renderData{ClientId: clientID}); err != nil { + _ = conn.Close() + return + } + + // 用户连接事件 + Manager.Connect <- clientSocket + +} + +// PingTimer 定时器发送心跳 +func PingTimer() { + go func() { + ticker := time.NewTicker(heartbeatInterval) + defer ticker.Stop() + //测试用 + i := 0 + for { + i++ + <-ticker.C + for clientId, conn := range Manager.AllClient() { + if err := conn.Socket.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { + Manager.DisConnect <- conn + globals.SugarLogger.Debugf("发送心跳失败: %s 总连接数:%d", clientId, Manager.Count()) + } + if err := ConnRender(conn.Socket, renderData{ClientId: clientId}); err != nil { + return + } + globals.SugarLogger.Debugf("clientId=%s,i=%d", clientId, i) + } + } + }() +} + +// WriteMessage 监听并发送给客户端信息 +func WriteMessage() { + i := 0 + for { + clientInfo := <-ToClientChan + //广播发送通知所有客户端 + i++ + fmt.Printf("WriteMessage clientInfo=%s i=%d", utils.Format4Output(clientInfo, false), i) + if Manager.AllClient() != nil { + for _, conn := range Manager.AllClient() { + if err := Render(conn.Socket, clientInfo.MessageId, clientInfo.Code, clientInfo.Msg, clientInfo.Data); err != nil { + Manager.DisConnect <- conn + } + } + } else { + globals.SugarLogger.Debugf("无客户端连接,请检查") + return + } + //if conn, err := Manager.GetByClientId(clientInfo.ClientId); err == nil && conn != nil { + // if err := Render(conn.Socket, clientInfo.MessageId, clientInfo.Code, clientInfo.Msg, clientInfo.Data); err != nil { + // Manager.DisConnect <- conn + // } + //} + } +} + +// Start 管道处理程序 +func (manager *ClientManager) Start() { + for { + select { + case client := <-manager.Connect: + // 建立连接事件 + manager.EventConnect(client) + case conn := <-manager.DisConnect: + // 断开连接事件 + manager.EventDisconnect(conn) + } + } +} + +//从客户端读取数据 +func (c *Client) Read() { + go func() { + for { + messageType, msg, err := c.Socket.ReadMessage() + if err != nil { + if messageType == -1 && websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) { + Manager.DisConnect <- c + return + } else if messageType != websocket.PingMessage { + return + } + } else { + SendToVendor(msg) + } + } + }() +} + +// 以下为连接事件操作******************************************* + +// EventConnect 建立连接事件 +func (manager *ClientManager) EventConnect(client *Client) { + manager.AddClient(client) +} + +// EventDisconnect 断开连接事件 +func (manager *ClientManager) EventDisconnect(client *Client) { + //关闭连接 + _ = client.Socket.Close() + manager.DelClient(client) + //标记销毁 + client.IsDeleted = true + client = nil +} + +//以下为客户端Client操作******************************************* + +// NewClient 初始化Client +func NewClient(clientId string, socket *websocket.Conn) *Client { + return &Client{ + ClientId: clientId, + Socket: socket, + ConnectTime: uint64(time.Now().Unix()), + IsDeleted: false, + } +} + +// AddClient 添加客户端 +func (manager *ClientManager) AddClient(client *Client) { + manager.ClientIdMapLock.Lock() + defer manager.ClientIdMapLock.Unlock() + + manager.ClientIdMap[client.ClientId] = client +} + +// DelClient 删除客户端 +func (manager *ClientManager) DelClient(client *Client) { + manager.delClientIdMap(client.ClientId) + +} + +// 删除clientIdMap +func (manager *ClientManager) delClientIdMap(clientId string) { + manager.ClientIdMapLock.Lock() + defer manager.ClientIdMapLock.Unlock() + + delete(manager.ClientIdMap, clientId) +} + +// GetByClientId 通过clientId获取client +func (manager *ClientManager) GetByClientId(clientId string) (*Client, error) { + manager.ClientIdMapLock.RLock() + defer manager.ClientIdMapLock.RUnlock() + + if client, ok := manager.ClientIdMap[clientId]; !ok { + return nil, errors.New("客户端不存在") + } else { + return client, nil + } +} + +// AllClient 获取所有的客户端 +func (manager *ClientManager) AllClient() map[string]*Client { + manager.ClientIdMapLock.RLock() + defer manager.ClientIdMapLock.RUnlock() + + return manager.ClientIdMap +} + +//与客户端的交互操作************************* + +// NewClientManager 初始化客户端管理 +func NewClientManager() (clientManager *ClientManager) { + clientManager = &ClientManager{ + ClientIdMap: make(map[string]*Client), + Connect: make(chan *Client, 10000), + DisConnect: make(chan *Client, 10000), + Groups: make(map[string][]string, 100), + SystemClients: make(map[string][]string, 100), + } + + return +} + +// Count 获取客户端数量 +func (manager *ClientManager) Count() int { + manager.ClientIdMapLock.RLock() + defer manager.ClientIdMapLock.RUnlock() + return len(manager.ClientIdMap) +} diff --git a/controllers/im.go b/controllers/im.go index 656be4c4f..01aec4bd3 100644 --- a/controllers/im.go +++ b/controllers/im.go @@ -1,106 +1,105 @@ package controllers -// -//import ( -// "encoding/json" -// "net/http" -// -// "git.rosy.net.cn/jx-callback/globals" -// -// "git.rosy.net.cn/jx-callback/business/partner/purchase/im" -// "github.com/astaxie/beego/server/web" -//) -// -//type IMController struct { -// web.Controller -//} -// -//// @Title IM初始化长链接 -//// @Description IM初始化长链接 -//// @Success 200 {object} controllers.CallResult -//// @Failure 200 {object} controllers.CallResult -//// @router /StartWebSocket [get] -//func (c *IMController) StartWebSocket() { -// upgrader.CheckOrigin = func(r *http.Request) bool { -// return true -// } -// ws, err := upgrader.Upgrade(c.Ctx.ResponseWriter, c.Ctx.Request, nil) -// if err != nil { -// globals.SugarLogger.Errorf("upgrade error: %v", err) -// return -// } -// defer ws.Close() -// -// clientID := c.GetString("clientID") -// globals.SugarLogger.Debugf("clientID=%s", clientID) -// -// im.StartWebSocket(ws, clientID, err) -//} -// -//// @Title IM获取门店用户聊天列表 -//// @Description IM获取门店用户聊天列表 -//// @Param token header string true "认证token" -//// @Param payLoad formData string true "平台应用映射关系" -//// @Success 200 {object} controllers.CallResult -//// @Failure 200 {object} controllers.CallResult -//// @router /GetIMUserList [get] -//func (c *IMController) GetIMUserList() { -// c.callGetIMUserList(func(params *tImGetIMUserListParams) (retVal interface{}, errCode string, err error) { -// var relInfo []im.RelInfo -// if err = json.Unmarshal([]byte(params.PayLoad), &relInfo); err != nil { -// retVal, err = im.GetImUserList(relInfo) +import ( + "encoding/json" + "net/http" + + "git.rosy.net.cn/jx-callback/globals" + + "git.rosy.net.cn/jx-callback/business/partner/purchase/im" + "github.com/astaxie/beego/server/web" +) + +type IMController struct { + web.Controller +} + +// @Title IM初始化长链接 +// @Description IM初始化长链接 +// @Success 200 {object} controllers.CallResult +// @Failure 200 {object} controllers.CallResult +// @router /StartWebSocket [get] +func (c *IMController) StartWebSocket() { + upgrader.CheckOrigin = func(r *http.Request) bool { + return true + } + ws, err := upgrader.Upgrade(c.Ctx.ResponseWriter, c.Ctx.Request, nil) + if err != nil { + globals.SugarLogger.Errorf("upgrade error: %v", err) + return + } + defer ws.Close() + + clientID := c.GetString("clientID") + globals.SugarLogger.Debugf("clientID=%s", clientID) + + im.StartWebSocket(ws, clientID, err) +} + +// @Title IM获取门店用户聊天列表 +// @Description IM获取门店用户聊天列表 +// @Param token header string true "认证token" +// @Param payLoad formData string true "平台应用映射关系" +// @Success 200 {object} controllers.CallResult +// @Failure 200 {object} controllers.CallResult +// @router /GetIMUserList [get] +func (c *IMController) GetIMUserList() { + c.callGetIMUserList(func(params *tImGetIMUserListParams) (retVal interface{}, errCode string, err error) { + var relInfo []im.RelInfo + if err = json.Unmarshal([]byte(params.PayLoad), &relInfo); err != nil { + retVal, err = im.GetImUserList(relInfo) + } + return retVal, "", err + }) +} + +// @Title IM获取单个用户聊天详情 +// @Description IM获取单个用户聊天详情 +// @Param token header string true "认证token" +// @Param payLoad formData string true "平台用户应用映射关系" +// @Success 200 {object} controllers.CallResult +// @Failure 200 {object} controllers.CallResult +// @router /GetImChatDetail [get] +func (c *IMController) GetImChatDetail() { + c.callGetImChatDetail(func(params *tImGetImChatDetailParams) (retVal interface{}, errCode string, err error) { + var temp []im.UserRelInfo + if err = json.Unmarshal([]byte(params.PayLoad), &temp); err == nil { + retVal, err = im.GetImChatDetail(temp) + } + return retVal, "", err + }) +} + +// @Title IM设置门店与单个用户已读 +// @Description IM设置门店与单个用户已读 +// @Param token header string true "认证token" +// @Param appID formData string true "应用id" +// @Param vendorStoreID formData string true "平台门店id" +// @Param vendorID formData string true "平台id" +// @Param userID formData string true "用户id/会话id" +// @Success 200 {object} controllers.CallResult +// @Failure 200 {object} controllers.CallResult +// @router /SetImMsgRead [post] +func (c *IMController) SetImMsgRead() { + c.callSetImMsgRead(func(params *tImSetImMsgReadParams) (retVal interface{}, errCode string, err error) { + err = im.SetJxMsgRead(params.AppID, params.VendorStoreID, params.VendorID, params.UserID) + return nil, "", err + }) +} + +// @Title 向平台商发送信息 +// @Description 向平台商发送信息 +// @Param token header string true "认证token" +// @Param sendData formData string true "平台商消息结构体" +// @Success 200 {object} controllers.CallResult +// @Failure 200 {object} controllers.CallResult +// @router /SendToVendor [post] +//func (c *IMController) SendToVendor() { +// c.callSendToVendor(func(params *tImSendToVendorParams) (retVal interface{}, errCode string, err error) { +// var sendData im.SendData +// if err = json.Unmarshal([]byte(params.SendData), &sendData); err == nil { +// im.SendToVendor(sendData) // } -// return retVal, "", err -// }) -//} -// -//// @Title IM获取单个用户聊天详情 -//// @Description IM获取单个用户聊天详情 -//// @Param token header string true "认证token" -//// @Param payLoad formData string true "平台用户应用映射关系" -//// @Success 200 {object} controllers.CallResult -//// @Failure 200 {object} controllers.CallResult -//// @router /GetImChatDetail [get] -//func (c *IMController) GetImChatDetail() { -// c.callGetImChatDetail(func(params *tImGetImChatDetailParams) (retVal interface{}, errCode string, err error) { -// var temp []im.UserRelInfo -// if err = json.Unmarshal([]byte(params.PayLoad), &temp); err == nil { -// retVal, err = im.GetImChatDetail(temp) -// } -// return retVal, "", err -// }) -//} -// -//// @Title IM设置门店与单个用户已读 -//// @Description IM设置门店与单个用户已读 -//// @Param token header string true "认证token" -//// @Param appID formData string true "应用id" -//// @Param vendorStoreID formData string true "平台门店id" -//// @Param vendorID formData string true "平台id" -//// @Param userID formData string true "用户id/会话id" -//// @Success 200 {object} controllers.CallResult -//// @Failure 200 {object} controllers.CallResult -//// @router /SetImMsgRead [post] -//func (c *IMController) SetImMsgRead() { -// c.callSetImMsgRead(func(params *tImSetImMsgReadParams) (retVal interface{}, errCode string, err error) { -// err = im.SetJxMsgRead(params.AppID, params.VendorStoreID, params.VendorID, params.UserID) // return nil, "", err // }) //} -// -//// @Title 向平台商发送信息 -//// @Description 向平台商发送信息 -//// @Param token header string true "认证token" -//// @Param sendData formData string true "平台商消息结构体" -//// @Success 200 {object} controllers.CallResult -//// @Failure 200 {object} controllers.CallResult -//// @router /SendToVendor [post] -////func (c *IMController) SendToVendor() { -//// c.callSendToVendor(func(params *tImSendToVendorParams) (retVal interface{}, errCode string, err error) { -//// var sendData im.SendData -//// if err = json.Unmarshal([]byte(params.SendData), &sendData); err == nil { -//// im.SendToVendor(sendData) -//// } -//// return nil, "", err -//// }) -////} diff --git a/globals/api/api.go b/globals/api/api.go index 5a1740de4..a682ce751 100644 --- a/globals/api/api.go +++ b/globals/api/api.go @@ -294,7 +294,7 @@ func Init() { QiniuAPI = qbox.NewMac(beego.AppConfig.DefaultString("qiniuAK", ""), beego.AppConfig.DefaultString("qiniuSK", "")) ShowAPI = showapi.New(beego.AppConfig.DefaultInt("showAppID", 0), beego.AppConfig.DefaultString("showAppSecret", "")) - Cacher = redis.New(beego.AppConfig.DefaultString("redisHost", ""), beego.AppConfig.DefaultInt("redisPort", 0), beego.AppConfig.DefaultString("redisPassword", "")) + Cacher = redis.New(beego.AppConfig.DefaultString("redisHost", "localhost"), beego.AppConfig.DefaultInt("redisPort", 0), beego.AppConfig.DefaultString("redisPassword", "")) //Todo 本地测试用 //Cacher = redis.New(beego.AppConfig.DefaultString("redisHost", "127.0.0.1"), beego.AppConfig.DefaultInt("redisPort", 6379), beego.AppConfig.DefaultString("redisPassword", "123456")) diff --git a/main.go b/main.go index 0c9cfff73..ef347c489 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "flag" "fmt" + "git.rosy.net.cn/jx-callback/business/partner/purchase/im" "net/http" _ "net/http/pprof" "os" @@ -94,7 +95,7 @@ func Init() { enterprise.Init() // 初始化enterprise key auto_delivery.Init() // 初始化骑手列表 - //im.Init() //初始化ws连接 + im.Init() //初始化ws连接 //go http.HandleFunc("/v2/im/StartWebSocket", im.Run) //test diff --git a/routers/router.go b/routers/router.go index 13f7deab4..2554e47ab 100644 --- a/routers/router.go +++ b/routers/router.go @@ -165,11 +165,11 @@ func init() { &controllers.VersionController{}, ), ), - //web.NSNamespace("/im", - // web.NSInclude( - // &controllers.IMController{}, - // ), - //), + web.NSNamespace("/im", + web.NSInclude( + &controllers.IMController{}, + ), + ), ) web.AddNamespace(ns)