From df148ae5afcf2d2ad069fb675a86de82c369df39 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Thu, 25 May 2023 15:40:19 +0800 Subject: [PATCH] RevokeMsg --- .../online_msg_to_mongo_handler.go | 63 +++++++++---------- internal/rpc/msg/revoke.go | 49 +++++++++++++++ pkg/common/db/controller/msg.go | 10 +-- pkg/utils/utils_v2.go | 9 +++ 4 files changed, 93 insertions(+), 38 deletions(-) diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 85f7211cc..ab5a4e35d 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -2,9 +2,6 @@ package msgtransfer import ( "context" - "encoding/json" - "errors" - "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" @@ -66,36 +63,36 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont log.ZError(ctx, "DelMsgBySeqs", err, "userIDs", deleteMessageTips.UserID, "seqs", deleteMessageTips.Seqs, "totalUnExistSeqs", totalUnExistSeqs) continue } - case constant.MsgRevokeNotification: - var elem sdkws.NotificationElem - if err := json.Unmarshal(v.Content, &elem); err != nil { - log.ZError(ctx, "json.Unmarshal NotificationElem", err, "content", string(v.Content)) - continue - } - var tips sdkws.RevokeMsgTips - if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil { - log.ZError(ctx, "json.Unmarshal RevokeMsgTips", err, "content", string(v.Content)) - continue - } - msgs, err := mc.msgDatabase.GetMsgBySeqs(ctx, tips.ConversationID, []int64{tips.Seq}) - if err != nil { - log.ZError(ctx, "GetMsgBySeqs", err, "conversationID", tips.ConversationID, "seq", tips.Seq) - continue - } - if len(msgs) == 0 { - log.ZError(ctx, "GetMsgBySeqs empty", errors.New("seq not found"), "conversationID", tips.ConversationID, "seq", tips.Seq) - continue - } - msgs[0].Content = []byte(elem.Detail) - data, err := proto.Marshal(msgs[0]) - if err != nil { - log.ZError(ctx, "proto.Marshal MsgData", err) - continue - } - if err := mc.msgDatabase.RevokeMsg(ctx, tips.ConversationID, tips.Seq, data); err != nil { - log.ZError(ctx, "RevokeMsg", err, "conversationID", tips.ConversationID, "seq", tips.Seq) - continue - } + //case constant.MsgRevokeNotification: + // var elem sdkws.NotificationElem + // if err := json.Unmarshal(v.Content, &elem); err != nil { + // log.ZError(ctx, "json.Unmarshal NotificationElem", err, "content", string(v.Content)) + // continue + // } + // var tips sdkws.RevokeMsgTips + // if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil { + // log.ZError(ctx, "json.Unmarshal RevokeMsgTips", err, "content", string(v.Content)) + // continue + // } + // msgs, err := mc.msgDatabase.GetMsgBySeqs(ctx, tips.ConversationID, []int64{tips.Seq}) + // if err != nil { + // log.ZError(ctx, "GetMsgBySeqs", err, "conversationID", tips.ConversationID, "seq", tips.Seq) + // continue + // } + // if len(msgs) == 0 { + // log.ZError(ctx, "GetMsgBySeqs empty", errors.New("seq not found"), "conversationID", tips.ConversationID, "seq", tips.Seq) + // continue + // } + // msgs[0].Content = []byte(elem.Detail) + // data, err := proto.Marshal(msgs[0]) + // if err != nil { + // log.ZError(ctx, "proto.Marshal MsgData", err) + // continue + // } + // if err := mc.msgDatabase.RevokeMsg(ctx, tips.ConversationID, tips.Seq, data); err != nil { + // log.ZError(ctx, "RevokeMsg", err, "conversationID", tips.ConversationID, "seq", tips.Seq) + // continue + // } } } } diff --git a/internal/rpc/msg/revoke.go b/internal/rpc/msg/revoke.go index f0a510212..6d9064f05 100644 --- a/internal/rpc/msg/revoke.go +++ b/internal/rpc/msg/revoke.go @@ -5,11 +5,13 @@ import ( "encoding/json" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" + unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" + "time" ) func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.RevokeMsgResp, error) { @@ -22,9 +24,16 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. if req.RecvID != "" && req.GroupID != "" { return nil, errs.ErrArgs.Wrap("recv_id and group_id cannot exist at the same time") } + if req.Seq < 0 { + return nil, errs.ErrArgs.Wrap("seq is invalid") + } if err := tokenverify.CheckAccessV3(ctx, req.RecvID); err != nil { return nil, err } + user, err := m.User.GetUserInfo(ctx, req.UserID) + if err != nil { + return nil, err + } var sessionType int32 var conversationID string if req.GroupID == "" { @@ -34,6 +43,46 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. sessionType = constant.SuperGroupChatType conversationID = utils.GenConversationUniqueKeyForGroup(req.GroupID) } + msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, conversationID, []int64{req.Seq}) + if err != nil { + return nil, err + } + if len(msgs) == 0 { + return nil, errs.ErrRecordNotFound.Wrap("msg not found") + } + sendID := msgs[0].SendID + if !tokenverify.IsAppManagerUid(ctx) { + if req.GroupID == "" { + if req.UserID != sendID { + return nil, errs.ErrNoPermission.Wrap("no permission") + } + } else { + members, err := m.Group.GetGroupMemberInfoMap(ctx, req.GroupID, utils.Distinct([]string{req.UserID, sendID}), true) + if err != nil { + return nil, err + } + if req.UserID != sendID { + roleLevel := members[req.UserID].RoleLevel + switch members[req.UserID].RoleLevel { + case constant.GroupOwner: + case constant.GroupAdmin: + if roleLevel != constant.GroupOrdinaryUsers { + return nil, errs.ErrNoPermission.Wrap("no permission") + } + default: + return nil, errs.ErrNoPermission.Wrap("no permission") + } + } + } + } + err = m.MsgDatabase.RevokeMsg(ctx, conversationID, req.Seq, &unRelationTb.RevokeModel{ + UserID: req.UserID, + Nickname: user.Nickname, + Time: time.Now().UnixMilli(), + }) + if err != nil { + return nil, err + } tips := sdkws.RevokeMsgTips{ RevokerUserID: req.UserID, ClientMsgID: "", diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 24faab98e..9bd3693b9 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -32,7 +32,7 @@ type CommonMsgDatabase interface { // 批量插入消息 BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error // 撤回消息 - RevokeMsg(ctx context.Context, conversationID string, seq int64, msg []byte) error + RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unRelationTb.RevokeModel) error // 刪除redis中消息缓存 DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error // incrSeq然后批量插入缓存 @@ -316,10 +316,10 @@ func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversatio return nil } -func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, msg []byte) error { - index := seq / db.msg.GetSingleGocMsgNum() - docID := db.msg.IndexDocID(conversationID, index) - return db.msgDocDatabase.UpdateMsgContent(ctx, docID, seq%db.msg.GetSingleGocMsgNum(), msg) +func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unRelationTb.RevokeModel) error { + msgs := []*unRelationTb.MsgInfoModel{{Revoke: revoke}} + return db.BatchInsertBlock(ctx, conversationID, msgs, seq) + //return db.msgDocDatabase.UpdateMsgContent(ctx, docID, seq%db.msg.GetSingleGocMsgNum(), msg) } func (db *commonMsgDatabase) DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error { diff --git a/pkg/utils/utils_v2.go b/pkg/utils/utils_v2.go index 9143b0685..777308fd7 100644 --- a/pkg/utils/utils_v2.go +++ b/pkg/utils/utils_v2.go @@ -80,6 +80,15 @@ func DistinctAnyGetComparable[E any, K comparable](es []E, fn func(e E) K) []K { // Distinct 去重 func Distinct[T comparable](ts []T) []T { + if len(ts) < 2 { + return ts + } else if len(ts) == 2 { + if ts[0] == ts[1] { + return ts[:1] + } else { + return ts + } + } return DistinctAny(ts, func(t T) T { return t })