删一些东西

This commit is contained in:
suyl
2021-07-08 11:41:25 +08:00
parent 9ad4d52352
commit 9aac434f1a
26 changed files with 1 additions and 3372 deletions

View File

@@ -1,10 +1,8 @@
package event
import (
"fmt"
"regexp"
"strings"
"sync"
"time"
"git.rosy.net.cn/baseapi/utils"
@@ -12,8 +10,6 @@ import (
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
"git.rosy.net.cn/jx-callback/globals"
"github.com/gorilla/websocket"
)
var (
@@ -23,161 +19,12 @@ var (
"UpdateUser": "UpdateUser",
}
regexpToken = regexp.MustCompile(`,"token":".*"`)
wsClient = &WSClient{}
//广播频道(通道)
broadcast = make(chan *model.ImMessageRecord)
)
const (
sysMessageTitle = ""
)
//连接的客户端,吧每个客户端都放进来
type WSClient struct {
Clients map[int]map[string]*websocket.Conn
ClientsHeart map[string]*websocket.Conn
s *sync.RWMutex
}
func init() {
clients := make(map[int]map[string]*websocket.Conn)
clientsHeart := make(map[string]*websocket.Conn)
wsClient.Clients = clients
wsClient.ClientsHeart = clientsHeart
wsClient.s = new(sync.RWMutex)
go handleMessages()
}
//广播推送消息
func handleMessages() {
for {
//读取通道中的消息
msg := <-broadcast
if msg.GroupID == 0 {
// globals.SugarLogger.Debugf("heart %v", utils.Format4Output(msg, false))
if wsClient.ClientsHeart[msg.UserID] != nil {
if err := wsClient.ClientsHeart[msg.UserID].WriteJSON(&model.ImMessageRecord{
Key: "pang",
}); err != nil {
globals.SugarLogger.Debugf("heart client.WriteJSON error: %v", err)
wsClient.ClientsHeart[msg.UserID].Close() //关闭
delete(wsClient.ClientsHeart, msg.UserID)
}
}
} else {
globals.SugarLogger.Debugf("clients len %v", len(wsClient.Clients))
//循环map客户端
for userID, client := range wsClient.Clients[msg.GroupID] {
//把通道中的消息发送给客户端
user, err := dao.GetUser(dao.GetDB(), msg.UserID)
if err == nil {
msg.UserInfo = user
}
globals.SugarLogger.Debugf("msg %v", utils.Format4Output(msg, false))
if msg.CreatedAt == utils.ZeroTimeValue {
msg.CreatedAt = time.Now()
}
err = client.WriteJSON(msg)
if err != nil {
globals.SugarLogger.Debugf("client.WriteJSON error: %v", err)
client.Close() //关闭
delete(wsClient.Clients[msg.GroupID], userID)
// delete(clients, client) //删除map中的客户端
}
}
}
}
}
func ImMessage(userID string, ws *websocket.Conn) (err error) {
var (
clientUser = make(map[string]*websocket.Conn)
db = dao.GetDB()
)
//将当前客户端放入map中
messageGroups, _ := dao.GetUserMessageGroups(dao.GetDB(), userID)
if len(messageGroups) == 0 {
return
}
wsClient.s.Lock()
clientUser[userID] = ws
wsClient.ClientsHeart[userID] = ws
for _, v := range messageGroups {
if len(wsClient.Clients[v.GroupID]) > 0 {
wsClient.Clients[v.GroupID][userID] = ws
} else {
wsClient.Clients[v.GroupID] = clientUser
}
}
wsClient.s.Unlock()
globals.SugarLogger.Debugf("userID :%v ,clients :%v", userID, utils.Format4Output(wsClient.Clients, false))
var s *model.ImMessageRecord
for {
//接收客户端的消息
err := ws.ReadJSON(&s)
if err != nil {
globals.SugarLogger.Debugf("页面可能断开啦 ws.ReadJSON error: %v", err.Error())
for k, _ := range wsClient.Clients {
delete(wsClient.Clients[k], userID)
}
delete(wsClient.ClientsHeart, userID)
// delete(clients, ws) //删除map中的客户端
break //结束循环
} else {
//接受消息 业务逻辑
broadcast <- s
if s.GroupID != 0 {
if s.GroupID != model.SysGroupID {
//发聊天消息时这个组所有的成员包括创建者都在userIDs里
userIDs := []string{}
if results, err := dao.GetMessageGroups(db, "", s.GroupID, 0, true, ""); err == nil {
for _, v := range results {
userIDs = append(userIDs, v.UserID)
for _, vv := range v.MessageGroupMembers {
userIDs = append(userIDs, vv.UserID)
}
}
}
//如果这些人不在这个组的ws池子里就打上未读标记
for _, v := range userIDs {
if wsClient.ClientsHeart[v] == nil {
messageGroupReads, _ := dao.GetMessageGroupRead(db, v, s.GroupID)
for _, vv := range messageGroupReads {
vv.UnReadCount++
dao.UpdateEntity(db, vv, "UnReadCount")
}
}
}
} else {
if wsClient.ClientsHeart[s.ToUserID] == nil {
messageGroupReads, _ := dao.GetMessageGroupRead(db, s.ToUserID, s.GroupID)
for _, vv := range messageGroupReads {
vv.UnReadCount++
dao.UpdateEntity(db, vv, "UnReadCount")
}
}
}
}
utils.CallFuncAsync(func() {
if s.GroupID != 0 {
dao.WrapAddIDCULDEntity(s, "")
dao.CreateEntity(db, s)
}
})
}
}
ws.Close()
return err
}
func GetOnlineUserCount() (count int) {
wsClient.s.RLock()
count = len(wsClient.ClientsHeart)
wsClient.s.RUnlock()
return count
}
func AddOperateEvent(ctx *jxcontext.Context, accessUUID, jsonData string, errCode, errMsg string, useTime int, apiFunctionSpec string) (err error) {
var (
apiFunction string
@@ -282,430 +129,3 @@ func GetOperateEvents(ctx *jxcontext.Context, name string, apiFunctions []string
}
return pageInfo, err
}
func CreateMessageGroup(ctx *jxcontext.Context, userID, userID2, groupName string, dividePercentage, quitPrice int) (messageGroupResult *dao.GetMessageGroupsResult, err error) {
var (
db = dao.GetDB()
groupID int
)
for {
groupID = jxutils.GenRand6()
temp := &model.MessageGroup{
GroupID: groupID,
}
dao.GetEntity(db, temp, "GroupID")
if temp.UserID == "" {
break
}
}
if userID2 != "" {
messageGroups, err := dao.GetMessageGroups(db, userID, 0, model.GroupTypeSingle, true, userID2)
if len(messageGroups) > 0 && len(messageGroups[0].MessageGroupMembers) > 0 {
return messageGroups[0], err
}
user, err := dao.GetUserByID(db, "user_id", userID2)
if err != nil {
return nil, err
}
if user == nil {
return nil, fmt.Errorf("无法找到要联系的用户!")
}
messageGroup := &model.MessageGroup{
GroupID: groupID,
UserID: userID,
// Name: user.Name,
Type: model.GroupTypeSingle,
MaxCount: 2,
}
messageGroupMember := &model.MessageGroupMember{
GroupID: groupID,
MemberUserID: userID2,
Type: model.GroupMemberTypeNormal,
}
dao.WrapAddIDCULDEntity(messageGroup, ctx.GetUserName())
dao.WrapAddIDCULDEntity(messageGroupMember, ctx.GetUserName())
err = dao.CreateEntity(db, messageGroup)
err = dao.CreateEntity(db, messageGroupMember)
if err == nil {
messageGroupRead := &model.MessageGroupRead{
GroupID: groupID,
UserID: userID,
}
dao.WrapAddIDCULEntity(messageGroupRead, ctx.GetUserName())
messageGroupRead2 := messageGroupRead
messageGroupRead2.UserID = userID2
dao.CreateEntity(db, messageGroupRead)
if err = dao.CreateEntity(db, messageGroupRead2); err == nil {
if messageGroupReads, err := dao.GetMessageGroupRead(db, userID2, model.SysGroupID); len(messageGroupReads) == 0 && err == nil {
messageGroupRead := &model.MessageGroupRead{
GroupID: model.SysGroupID,
UserID: userID2,
}
dao.WrapAddIDCULEntity(messageGroupRead, ctx.GetUserName())
dao.CreateEntity(db, messageGroupRead)
}
}
}
} else {
userMembers, err := dao.GetUserMember(db, userID, model.MemberTypeNormal)
messageGroupsResult, err := dao.GetMessageGroups(db, userID, 0, model.GroupTypeMulit, false, "")
messageGroupMembers, err := dao.GetMessageGroupMembers(db, 0, model.GroupTypeMulit, userID)
if err != nil {
return nil, err
}
if len(userMembers) == 0 {
return nil, fmt.Errorf("抱歉,只有会员才能创建群聊!")
}
if len(messageGroupsResult) > 0 {
return nil, fmt.Errorf("您已经有群组了,请勿重复创建!群号为:%d", messageGroupsResult[0].GroupID)
}
if len(messageGroupMembers) > 0 {
return nil, fmt.Errorf("您已加入了[%v]群,请先退出后再创建!", messageGroupMembers[0].GroupID)
}
messageGroup := &model.MessageGroup{
GroupID: groupID,
UserID: userID,
Name: groupName,
Type: model.GroupTypeMulit,
MaxCount: 2000,
DividePercentage: dividePercentage,
QuitPrice: quitPrice,
}
user, err := dao.GetUserByID(db, "user_id", userID)
if groupName == "" {
messageGroup.Name = user.Name + "的集团"
}
dao.WrapAddIDCULDEntity(messageGroup, ctx.GetUserName())
if err = dao.CreateEntity(db, messageGroup); err == nil {
messageGroupRead := &model.MessageGroupRead{
GroupID: groupID,
UserID: userID,
}
dao.WrapAddIDCULEntity(messageGroupRead, ctx.GetUserName())
dao.CreateEntity(db, messageGroupRead)
}
}
if err == nil {
if messageGroupReads, err := dao.GetMessageGroupRead(db, userID, model.SysGroupID); len(messageGroupReads) == 0 && err == nil {
messageGroupRead := &model.MessageGroupRead{
GroupID: model.SysGroupID,
UserID: userID,
}
dao.WrapAddIDCULEntity(messageGroupRead, ctx.GetUserName())
dao.CreateEntity(db, messageGroupRead)
}
}
return messageGroupResult, err
}
func GetMessageGroupByUser(ctx *jxcontext.Context, userID string) (messageGroupResult []*dao.GetMessageGroupsResult, err error) {
var (
db = dao.GetDB()
)
messageGroups, err := dao.GetMessageGroups(db, "", model.SysGroupID, 0, false, "")
messageGroups2, err := dao.GetMessageGroups(db, userID, 0, 0, true, "")
messageGroups = append(messageGroups, messageGroups2...)
messageGroupMembers, err := dao.GetMessageGroupMembers(db, 0, 0, userID)
for _, v := range messageGroupMembers {
if messageGroupList, err := dao.GetMessageGroups(db, "", v.GroupID, 0, false, ""); err == nil {
messageGroups = append(messageGroups, messageGroupList...)
}
}
for _, v := range messageGroups {
//是这个人创建的群聊,如果是单聊就返回对方的头像
if v.Type == model.GroupTypeSingle {
var userID2 string
if v.UserID == userID {
userID2 = v.MessageGroupMembers[0].MemberUserID
} else {
userID2 = userID
}
if user, err := dao.GetUserByID(db, "user_id", userID2); err == nil {
v.Avatar = user.Avatar
v.Name = user.Name
}
}
var (
imMessageRecord *model.ImMessageRecord
sql string
sqlParams = []interface{}{}
)
if v.GroupID != model.SysGroupID {
//最后一条记录和时间和人
sql = `
SELECT * FROM im_message_record WHERE group_id = ? ORDER BY created_at DESC LIMIT 1
`
sqlParams = append(sqlParams, v.GroupID)
} else {
//最后一条记录和时间和人
sql = `
SELECT * FROM im_message_record WHERE group_id = ? AND to_user_id = ? ORDER BY created_at DESC LIMIT 1
`
sqlParams = append(sqlParams, v.GroupID, userID)
}
if err = dao.GetRow(db, &imMessageRecord, sql, sqlParams); err == nil {
v.LastTime = imMessageRecord.CreatedAt
v.LastContent = imMessageRecord.Content
v.LastMessageType = imMessageRecord.MessageType
if user3, err := dao.GetUserByID(db, "user_id", imMessageRecord.UserID); err == nil {
v.LastUserName = user3.Name
}
} else {
err = nil
}
//该用户各组的未读消息数
var unReadCount int
if messageGroupReads, err := dao.GetMessageGroupRead(db, userID, v.GroupID); err == nil && len(messageGroupReads) > 0 {
for _, vv := range messageGroupReads {
unReadCount += vv.UnReadCount
}
}
v.UnReadMessageCount = unReadCount
}
return messageGroups, err
}
func AddMessageGroup(ctx *jxcontext.Context, groupID int, userID string) (err error) {
var (
db = dao.GetDB()
)
messageGroupMembers, err := dao.GetMessageGroupMembers(db, groupID, 0, userID)
messageGroupMembers3, err := dao.GetMessageGroupMembers(db, 0, 0, userID)
messageGroupsResult, err := dao.GetMessageGroups(db, userID, groupID, model.GroupTypeMulit, false, "")
messageGroupsResult2, err := dao.GetMessageGroups(db, "", groupID, model.GroupTypeMulit, false, "")
messageGroupsResult3, err := dao.GetMessageGroups(db, userID, 0, model.GroupTypeMulit, false, "")
messageGroupMembers2, err := dao.GetMessageGroupMembers(db, groupID, 0, "")
if err != nil {
return err
}
if len(messageGroupMembers) > 0 {
return fmt.Errorf("此用户已经在该群组中了!")
}
if len(messageGroupMembers3) > 0 {
return fmt.Errorf("您已经有群组[%v]了,不能申请加入其它群!", messageGroupMembers3[0].GroupID)
}
if len(messageGroupsResult) > 0 {
return fmt.Errorf("请不要加入自己创建的群!")
}
if len(messageGroupsResult2) > 0 && len(messageGroupMembers2) > 0 {
if len(messageGroupMembers2)+1 > messageGroupsResult2[0].MaxCount {
return fmt.Errorf("抱歉该群组已经满员了!")
}
}
if len(messageGroupsResult3) > 0 {
return fmt.Errorf("您已经拥有[%v]群了,不能再加入其它群!", messageGroupsResult3[0].GroupID)
}
messageGroupMember := &model.MessageGroupMember{
GroupID: groupID,
MemberUserID: userID,
Type: model.GroupMemberTypeNormal,
}
dao.WrapAddIDCULDEntity(messageGroupMember, ctx.GetUserName())
if err = dao.CreateEntity(db, messageGroupMember); err == nil {
messageGroupRead := &model.MessageGroupRead{
GroupID: groupID,
UserID: userID,
}
dao.WrapAddIDCULEntity(messageGroupRead, ctx.GetUserName())
if err = dao.CreateEntity(db, messageGroupRead); err == nil {
if messageGroupReads, err := dao.GetMessageGroupRead(db, userID, model.SysGroupID); len(messageGroupReads) == 0 && err == nil {
messageGroupRead := &model.MessageGroupRead{
GroupID: model.SysGroupID,
UserID: userID,
}
dao.WrapAddIDCULEntity(messageGroupRead, ctx.GetUserName())
dao.CreateEntity(db, messageGroupRead)
}
}
}
return err
}
func UpdateMessageGroup(ctx *jxcontext.Context, groupID int, payload map[string]interface{}) (num int64, err error) {
var (
db = dao.GetDB()
messageGroup = &model.MessageGroup{
GroupID: groupID,
}
)
if err = dao.GetEntity(db, messageGroup, "GroupID"); err != nil {
return 0, err
}
if messageGroup.UserID != ctx.GetUserID() {
return 0, fmt.Errorf("只有群主才能修改群信息!")
}
valid := dao.StrictMakeMapByStructObject(payload, messageGroup, ctx.GetUserName())
if len(valid) > 0 {
txDB, _ := dao.Begin(db)
defer func() {
if r := recover(); r != nil {
dao.Rollback(db, txDB)
panic(r)
}
}()
if num, err = dao.UpdateEntityLogically(db, messageGroup, valid, ctx.GetUserName(), nil); err != nil {
dao.Rollback(db, txDB)
return 0, err
}
dao.Commit(db, txDB)
}
return num, err
}
func TransferMessageGroupMaster(ctx *jxcontext.Context, groupID int, userID string) (err error) {
var (
db = dao.GetDB()
messageGroup = &model.MessageGroup{
GroupID: groupID,
}
)
if err = dao.GetEntity(db, messageGroup, "GroupID"); err != nil {
return err
}
if messageGroup.UserID != ctx.GetUserID() {
return fmt.Errorf("只有群主才能转让群主!")
}
//群主换成选的那个人
messageGroup.UserID = userID
messageGroup.LastOperator = ctx.GetUserName()
//之前那个群成员换成之前的群主
messageGroupMembers, err := dao.GetMessageGroupMembers(db, groupID, 0, userID)
if err != nil && len(messageGroupMembers) == 0 {
return err
}
messageGroupMember := messageGroupMembers[0]
messageGroupMember.MemberUserID = ctx.GetUserID()
messageGroupMember.LastOperator = ctx.GetUserName()
txDB, _ := dao.Begin(db)
defer func() {
if r := recover(); r != nil {
dao.Rollback(db, txDB)
panic(r)
}
}()
if _, err = dao.UpdateEntity(db, messageGroup, "UserID", "LastOperator"); err != nil {
dao.Rollback(db, txDB)
return err
}
if _, err = dao.UpdateEntity(db, messageGroupMember, "MemberUserID", "LastOperator"); err != nil {
dao.Rollback(db, txDB)
return err
}
dao.Commit(db, txDB)
return err
}
func SendSysMessageSimple(content, toUserID string) (err error) {
return SendSysMessage(jxcontext.AdminCtx, &model.ImMessageRecord{
Content: sysMessageTitle + " " + content,
ToUserID: toUserID,
GroupID: model.SysGroupID,
MessageType: 1, // 普通文字消息
})
}
func SendSysMessage(ctx *jxcontext.Context, imMessageRecord *model.ImMessageRecord) (err error) {
var (
db = dao.GetDB()
userID = imMessageRecord.ToUserID
groupID = imMessageRecord.GroupID
)
if groupID != model.SysGroupID {
return fmt.Errorf("只能给系统组发消息!")
}
imMessageRecord.CreatedAt = time.Now()
imMessageRecord.LastOperator = ctx.GetUserName()
imMessageRecord.DeletedAt = utils.DefaultTimeValue
imMessageRecord.UpdatedAt = utils.DefaultTimeValue
imMessageRecord.Seq = time.Now().Unix()
err = dao.CreateEntity(db, imMessageRecord)
if userID == "" {
//循环map客户端
for _, client := range wsClient.Clients[groupID] {
globals.SugarLogger.Debugf("msg %v", utils.Format4Output(imMessageRecord, false))
err = client.WriteJSON(imMessageRecord)
if err != nil {
globals.SugarLogger.Debugf("client.WriteJSON error: %v", err)
client.Close() //关闭
// delete(wsClient.Clients[msg.GroupID], userID)
}
}
} else {
client := wsClient.Clients[groupID][userID]
globals.SugarLogger.Debugf("msg %v", utils.Format4Output(imMessageRecord, false))
if client == nil {
return
}
err = client.WriteJSON(imMessageRecord)
if err != nil {
globals.SugarLogger.Debugf("client.WriteJSON error: %v", err)
client.Close() //关闭
// delete(wsClient.Clients[msg.GroupID], userID)
}
}
return err
}
func DeleteMessageRecord(ctx *jxcontext.Context) (err error) {
var (
db = dao.GetDB()
)
sql := `
DELETE FROM im_message_record WHERE created_at < ? AND group_id <> ?
`
sqlParams := []interface{}{
time.Now().AddDate(0, 0, -3), model.SysGroupID,
}
_, err = dao.ExecuteSQL(db, sql, sqlParams)
return err
}
func UpdateUserMessageGroupRead(ctx *jxcontext.Context, reads []*model.MessageGroupRead) (err error) {
var (
db = dao.GetDB()
userID = ctx.GetUserID()
)
for _, v := range reads {
if messageReads, err := dao.GetMessageGroupRead(db, userID, v.GroupID); err == nil {
if len(messageReads) > 0 {
messageReads[0].UnReadCount += v.UnReadCount
dao.UpdateEntity(db, messageReads[0], "UnReadCount")
} else {
}
}
}
return err
}
type GetUserStatisticsResult struct {
RegisterUserCount int `json:"registerUserCount"` //注册数
OnlineUserCount int `json:"onlineUserCount"` //在线用户数
ConsumeUserCount int `json:"consumeUserCount"` //消费用户数
MemberUserCount int `json:"memberUserCount"` //会员用户数
}
func GetUserStatistics(ctx *jxcontext.Context) (getUserStatisticsResult *GetUserStatisticsResult, err error) {
var (
db = dao.GetDB()
)
getUserStatisticsResult = &GetUserStatisticsResult{}
sql := `
SELECT a.member_user_count, b.consume_user_count FROM
( SELECT DISTINCT COUNT(a.user_id) member_user_count
FROM user a
JOIN user_member b ON a.user_id = b.user_id AND b.deleted_at = ? )a,
( SELECT DISTINCT COUNT(a.user_id) consume_user_count
FROM user a
JOIN ` + "`order`" + `b ON a.user_id = b.user_id AND b.status = ?)b
`
sqlParams := []interface{}{
utils.DefaultTimeValue, model.OrderStatusFinished,
}
err = dao.GetRow(db, &getUserStatisticsResult, sql, sqlParams)
paged, _ := dao.GetUsers2(db, "", "", 0, "", utils.ZeroTimeValue, utils.ZeroTimeValue, 0, nil, nil, 0, -1)
getUserStatisticsResult.RegisterUserCount = paged.TotalCount
getUserStatisticsResult.OnlineUserCount = GetOnlineUserCount()
return getUserStatisticsResult, err
}