From 51ee11f7d85d6eadc6e2fee24c019c6d370d9ac1 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 8 Jun 2023 15:16:40 +0800 Subject: [PATCH] del msg --- .../online_msg_to_mongo_handler.go | 7 +- internal/rpc/group/group.go | 6 +- internal/rpc/msg/revoke.go | 2 +- pkg/common/db/cache/msg.go | 156 +++++++++++------- pkg/common/db/controller/msg.go | 45 ++++- 5 files changed, 145 insertions(+), 71 deletions(-) diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index e2b2557b4..5d052fac4 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -44,10 +44,15 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont if err != nil { log.ZError(ctx, "single data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) } - err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData) + var seqs []int64 + for _, msg := range msgFromMQ.MsgData { + seqs = append(seqs, msg.Seq) + } + err = mc.msgDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs) if err != nil { log.ZError(ctx, "remove cache msg from redis err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) } + mc.msgDatabase.DelUserDeleteMsgsList(ctx, msgFromMQ.ConversationID, seqs) } func (OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index fb19ed68e..d39e00e1e 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -624,15 +624,17 @@ func (s *groupServer) GetGroupMembersInfo(ctx context.Context, req *pbGroup.GetG } func (s *groupServer) GetGroupApplicationList(ctx context.Context, req *pbGroup.GetGroupApplicationListReq) (*pbGroup.GetGroupApplicationListResp, error) { - resp := &pbGroup.GetGroupApplicationListResp{} + pageNumber, showNumber := utils.GetPage(req.Pagination) + groupIDs, err := s.GroupDatabase.FindUserManagedGroupID(ctx, req.FromUserID) if err != nil { return nil, err } + resp := &pbGroup.GetGroupApplicationListResp{} if len(groupIDs) == 0 { return resp, nil } - total, groupRequests, err := s.GroupDatabase.PageGroupRequest(ctx, groupIDs, req.Pagination.PageNumber, req.Pagination.ShowNumber) + total, groupRequests, err := s.GroupDatabase.PageGroupRequest(ctx, groupIDs, pageNumber, showNumber) if err != nil { return nil, err } diff --git a/internal/rpc/msg/revoke.go b/internal/rpc/msg/revoke.go index 33856a109..bc6a7a551 100644 --- a/internal/rpc/msg/revoke.go +++ b/internal/rpc/msg/revoke.go @@ -16,7 +16,7 @@ import ( ) func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.RevokeMsgResp, error) { - defer log.ZInfo(ctx, "RevokeMsg return line") + defer log.ZDebug(ctx, "RevokeMsg return line") if req.UserID == "" { return nil, errs.ErrArgs.Wrap("user_id is empty") } diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 2dfb93bc1..af8ef2f3d 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -31,12 +31,13 @@ const ( appleDeviceToken = "DEVICE_TOKEN" getuiToken = "GETUI_TOKEN" getuiTaskID = "GETUI_TASK_ID" - messageCache = "MESSAGE_CACHE:" - messageReadCache = "MESSAGE_READ_CACHE:" signalCache = "SIGNAL_CACHE:" signalListCache = "SIGNAL_LIST_CACHE:" fcmToken = "FCM_TOKEN:" + messageCache = "MESSAGE_CACHE:" + messageDelUserList = "MESSAGE_DEL_USER_LIST:" + userDelMessagesList = "USER_DEL_MESSAGES_LIST:" sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:" exTypeKeyLocker = "EX_LOCK:" @@ -63,33 +64,40 @@ type SeqCache interface { GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) } -type MsgModel interface { - SeqCache - AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error - GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) - SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error - DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error - GetMessagesBySeq(ctx context.Context, conversationID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) - SetMessageToCache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData) (int, error) - DeleteMessageFromCache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData) error - CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error +type thirdCache interface { HandleSignalInvite(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) GetSignalInvitationInfoByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *sdkws.SignalInviteReq, err error) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *sdkws.SignalInviteReq, err error) DelUserSignalList(ctx context.Context, userID string) error - DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error - SetGetuiToken(ctx context.Context, token string, expireTime int64) error - GetGetuiToken(ctx context.Context) (string, error) - SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error - GetGetuiTaskID(ctx context.Context) (string, error) - SetSendMsgStatus(ctx context.Context, id string, status int32) error - GetSendMsgStatus(ctx context.Context, id string) (int32, error) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) DelFcmToken(ctx context.Context, account string, platformID int) error IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) + SetGetuiToken(ctx context.Context, token string, expireTime int64) error + GetGetuiToken(ctx context.Context) (string, error) + SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error + GetGetuiTaskID(ctx context.Context) (string, error) +} + +type MsgModel interface { + SeqCache + thirdCache + AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error + GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) + SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error + DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error + GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) + SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) + UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error + DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64) + DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error + GetUserDelList(ctx context.Context, userID, conversationID string) (seqs []int64, err error) + CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error + DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error + SetSendMsgStatus(ctx context.Context, id string, status int32) error + GetSendMsgStatus(ctx context.Context, id string) (int32, error) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error @@ -98,9 +106,6 @@ type MsgModel interface { SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error - - GetMsgsByConversationIDAndSeq(ctx context.Context, userID, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) - DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel } func NewMsgCacheModel(client redis.UniversalClient) MsgModel { @@ -327,10 +332,73 @@ func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string, return len(failedMsgs), err } -func (c *msgCache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) error { +func (c *msgCache) getMessageDelUserListKey(conversationID string, seq int64) string { + return messageDelUserList + conversationID + ":" + strconv.Itoa(int(seq)) +} + +func (c *msgCache) getUserDelList(conversationID, userID string) string { + return userDelMessagesList + conversationID + ":" + userID +} + +func (c *msgCache) UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error { pipe := c.rdb.Pipeline() - for _, v := range msgList { - if err := pipe.Del(ctx, c.getMessageCacheKey(userID, v.Seq)).Err(); err != nil { + for _, seq := range seqs { + delUserListKey := c.getMessageDelUserListKey(conversationID, seq) + userDelListKey := c.getUserDelList(conversationID, userID) + err := pipe.SAdd(ctx, delUserListKey, userID).Err() + if err != nil { + return errs.Wrap(err) + } + err = pipe.SAdd(ctx, userDelListKey, seq).Err() + if err != nil { + return errs.Wrap(err) + } + if err := pipe.Expire(ctx, delUserListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + return errs.Wrap(err) + } + if err := pipe.Expire(ctx, userDelListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + return errs.Wrap(err) + } + } + _, err := pipe.Exec(ctx) + return errs.Wrap(err) +} + +func (c *msgCache) GetUserDelList(ctx context.Context, userID, conversationID string) (seqs []int64, err error) { + result, err := c.rdb.SMembers(ctx, c.getUserDelList(userID, conversationID)).Result() + if err != nil { + return nil, errs.Wrap(err) + } + seqs = make([]int64, len(result)) + for i, v := range result { + seqs[i] = utils.StringToInt64(v) + } + return seqs, nil +} + +func (c *msgCache) DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64) { + for _, seq := range seqs { + delUsers, err := c.rdb.SMembers(ctx, c.getMessageDelUserListKey(conversationID, seq)).Result() + if err != nil { + log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq) + continue + } + for _, userID := range delUsers { + if err := c.rdb.SRem(ctx, c.getUserDelList(conversationID, userID), seq).Err(); err != nil { + log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq, "userID", userID) + } + + if err := c.rdb.Del(ctx, c.getMessageDelUserListKey(conversationID, seq)).Err(); err != nil { + log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq) + } + } + } +} + +func (c *msgCache) DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error { + pipe := c.rdb.Pipeline() + for _, seq := range seqs { + if err := pipe.Del(ctx, c.getMessageCacheKey(conversationID, seq)).Err(); err != nil { return errs.Wrap(err) } } @@ -572,41 +640,3 @@ func (c *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err()) } - -func (c *msgCache) NewCache() MsgModel { - return &msgCache{ - metaCache: NewMetaCacheRedis(c.rcClient, c.metaCache.GetPreDelKeys()...), - expireTime: c.expireTime, - rcClient: c.rcClient, - } -} - -func (c msgCache) getMsgReadCacheKey(docID string, seq int64) string { - return messageReadCache + docID + "_" + strconv.Itoa(int(seq)) -} - -func (c *msgCache) getMsgsIndex(msg *unRelationTb.MsgInfoModel, keys []string) (int, error) { - key := c.getMsgReadCacheKey(utils.GetConversationIDByMsgModel(msg.Msg), msg.Msg.Seq) - for i, _key := range keys { - if key == _key { - return i, nil - } - } - return 0, errIndex -} - -func (c *msgCache) GetMsgsByConversationIDAndSeq(ctx context.Context, userID, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) { - var keys []string - for _, seq := range seqs { - keys = append(keys, c.getMsgReadCacheKey(docID, seq)) - } - return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getMsgsIndex, func(ctx context.Context) ([]*unRelationTb.MsgInfoModel, error) { - return c.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, docID, seqs) - }) -} - -func (c *msgCache) DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel { - cache := c.NewCache() - c.AddKeys(c.getMsgReadCacheKey(docID, seq)) - return cache -} diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 9a59e9030..5f4ad098c 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -37,7 +37,8 @@ type CommonMsgDatabase interface { // mark as read MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, seqs []int64) error // 刪除redis中消息缓存 - DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error + DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error + DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64) // incrSeq然后批量插入缓存 BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error) @@ -316,8 +317,12 @@ func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userI return nil } -func (db *commonMsgDatabase) DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error { - return db.cache.DeleteMessageFromCache(ctx, conversationID, msgs) +func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error { + return db.cache.DeleteMessages(ctx, conversationID, seqs) +} + +func (db *commonMsgDatabase) DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64) { + db.cache.DelUserDeleteMsgsList(ctx, conversationID, seqs) } func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { @@ -474,13 +479,29 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin break } } - successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs) + cachedMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs) if err != nil { if err != redis.Nil { prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) log.ZError(ctx, "get message from redis exception", err, conversationID, seqs) } } + var successMsgs []*sdkws.MsgData + if len(cachedMsgs) > 0 { + delSeqs, err := db.cache.GetUserDelList(ctx, userID, conversationID) + if err != nil && errs.Unwrap(err) != redis.Nil { + return 0, 0, nil, err + } else { + log.ZDebug(ctx, "get delSeqs from redis", "delSeqs", delSeqs, "userID", userID, "conversationID", conversationID) + } + + for _, msg := range cachedMsgs { + if !utils.Contain(msg.Seq, delSeqs...) { + successMsgs = append(successMsgs, msg) + } + } + } + log.ZDebug(ctx, "get msgs from cache", "successMsgs", successMsgs) if len(failedSeqs) != 0 { log.ZDebug(ctx, "msgs not exist in redis", err, "seqs", seqs) } @@ -635,6 +656,9 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio } func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error { + if err := db.cache.DeleteMessages(ctx, conversationID, allSeqs); err != nil { + return err + } for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) { var indexes []int for _, seq := range seqs { @@ -648,6 +672,19 @@ func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conve } func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error { + msgs, _, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs) + if err != nil { + log.ZWarn(ctx, "DeleteUserMsgsBySeqs", err, "conversationID", conversationID, "seqs", seqs) + return err + } + var cacheSeqs []int64 + for _, msg := range msgs { + cacheSeqs = append(cacheSeqs, msg.Seq) + } + if err := db.cache.UserDeleteMsgs(ctx, conversationID, cacheSeqs, userID); err != nil { + return err + } + for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) { for _, seq := range seqs { if _, err := db.msgDocDatabase.PushUnique(ctx, docID, db.msg.GetMsgIndex(seq), "del_list", []string{userID}); err != nil {