This commit is contained in:
wangchuxiao 2023-06-08 15:16:40 +08:00
parent 6a43ca8302
commit 51ee11f7d8
5 changed files with 145 additions and 71 deletions

View File

@ -44,10 +44,15 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
if err != nil {
log.ZError(ctx, "single data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID)
}
err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData)
var seqs []int64
for _, msg := range msgFromMQ.MsgData {
seqs = append(seqs, msg.Seq)
}
err = mc.msgDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs)
if err != nil {
log.ZError(ctx, "remove cache msg from redis err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID)
}
mc.msgDatabase.DelUserDeleteMsgsList(ctx, msgFromMQ.ConversationID, seqs)
}
func (OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }

View File

@ -624,15 +624,17 @@ func (s *groupServer) GetGroupMembersInfo(ctx context.Context, req *pbGroup.GetG
}
func (s *groupServer) GetGroupApplicationList(ctx context.Context, req *pbGroup.GetGroupApplicationListReq) (*pbGroup.GetGroupApplicationListResp, error) {
resp := &pbGroup.GetGroupApplicationListResp{}
pageNumber, showNumber := utils.GetPage(req.Pagination)
groupIDs, err := s.GroupDatabase.FindUserManagedGroupID(ctx, req.FromUserID)
if err != nil {
return nil, err
}
resp := &pbGroup.GetGroupApplicationListResp{}
if len(groupIDs) == 0 {
return resp, nil
}
total, groupRequests, err := s.GroupDatabase.PageGroupRequest(ctx, groupIDs, req.Pagination.PageNumber, req.Pagination.ShowNumber)
total, groupRequests, err := s.GroupDatabase.PageGroupRequest(ctx, groupIDs, pageNumber, showNumber)
if err != nil {
return nil, err
}

View File

@ -16,7 +16,7 @@ import (
)
func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.RevokeMsgResp, error) {
defer log.ZInfo(ctx, "RevokeMsg return line")
defer log.ZDebug(ctx, "RevokeMsg return line")
if req.UserID == "" {
return nil, errs.ErrArgs.Wrap("user_id is empty")
}

View File

@ -31,12 +31,13 @@ const (
appleDeviceToken = "DEVICE_TOKEN"
getuiToken = "GETUI_TOKEN"
getuiTaskID = "GETUI_TASK_ID"
messageCache = "MESSAGE_CACHE:"
messageReadCache = "MESSAGE_READ_CACHE:"
signalCache = "SIGNAL_CACHE:"
signalListCache = "SIGNAL_LIST_CACHE:"
fcmToken = "FCM_TOKEN:"
messageCache = "MESSAGE_CACHE:"
messageDelUserList = "MESSAGE_DEL_USER_LIST:"
userDelMessagesList = "USER_DEL_MESSAGES_LIST:"
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:"
exTypeKeyLocker = "EX_LOCK:"
@ -63,33 +64,40 @@ type SeqCache interface {
GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
}
type MsgModel interface {
SeqCache
AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error)
SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error
DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error
GetMessagesBySeq(ctx context.Context, conversationID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
SetMessageToCache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData) (int, error)
DeleteMessageFromCache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData) error
CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error
type thirdCache interface {
HandleSignalInvite(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error)
GetSignalInvitationInfoByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *sdkws.SignalInviteReq, err error)
GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *sdkws.SignalInviteReq, err error)
DelUserSignalList(ctx context.Context, userID string) error
DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error
SetGetuiToken(ctx context.Context, token string, expireTime int64) error
GetGetuiToken(ctx context.Context) (string, error)
SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error
GetGetuiTaskID(ctx context.Context) (string, error)
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error)
GetFcmToken(ctx context.Context, account string, platformID int) (string, error)
DelFcmToken(ctx context.Context, account string, platformID int) error
IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error)
SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error
GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error)
SetGetuiToken(ctx context.Context, token string, expireTime int64) error
GetGetuiToken(ctx context.Context) (string, error)
SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error
GetGetuiTaskID(ctx context.Context) (string, error)
}
type MsgModel interface {
SeqCache
thirdCache
AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error)
SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error
DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error
GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error)
UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error
DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64)
DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error
GetUserDelList(ctx context.Context, userID, conversationID string) (seqs []int64, err error)
CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error
DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error)
GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error)
DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error
@ -98,9 +106,6 @@ type MsgModel interface {
SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
GetMsgsByConversationIDAndSeq(ctx context.Context, userID, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error)
DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel
}
func NewMsgCacheModel(client redis.UniversalClient) MsgModel {
@ -327,10 +332,73 @@ func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string,
return len(failedMsgs), err
}
func (c *msgCache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) error {
func (c *msgCache) getMessageDelUserListKey(conversationID string, seq int64) string {
return messageDelUserList + conversationID + ":" + strconv.Itoa(int(seq))
}
func (c *msgCache) getUserDelList(conversationID, userID string) string {
return userDelMessagesList + conversationID + ":" + userID
}
func (c *msgCache) UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error {
pipe := c.rdb.Pipeline()
for _, v := range msgList {
if err := pipe.Del(ctx, c.getMessageCacheKey(userID, v.Seq)).Err(); err != nil {
for _, seq := range seqs {
delUserListKey := c.getMessageDelUserListKey(conversationID, seq)
userDelListKey := c.getUserDelList(conversationID, userID)
err := pipe.SAdd(ctx, delUserListKey, userID).Err()
if err != nil {
return errs.Wrap(err)
}
err = pipe.SAdd(ctx, userDelListKey, seq).Err()
if err != nil {
return errs.Wrap(err)
}
if err := pipe.Expire(ctx, delUserListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
return errs.Wrap(err)
}
if err := pipe.Expire(ctx, userDelListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
return errs.Wrap(err)
}
}
_, err := pipe.Exec(ctx)
return errs.Wrap(err)
}
func (c *msgCache) GetUserDelList(ctx context.Context, userID, conversationID string) (seqs []int64, err error) {
result, err := c.rdb.SMembers(ctx, c.getUserDelList(userID, conversationID)).Result()
if err != nil {
return nil, errs.Wrap(err)
}
seqs = make([]int64, len(result))
for i, v := range result {
seqs[i] = utils.StringToInt64(v)
}
return seqs, nil
}
func (c *msgCache) DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64) {
for _, seq := range seqs {
delUsers, err := c.rdb.SMembers(ctx, c.getMessageDelUserListKey(conversationID, seq)).Result()
if err != nil {
log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq)
continue
}
for _, userID := range delUsers {
if err := c.rdb.SRem(ctx, c.getUserDelList(conversationID, userID), seq).Err(); err != nil {
log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq, "userID", userID)
}
if err := c.rdb.Del(ctx, c.getMessageDelUserListKey(conversationID, seq)).Err(); err != nil {
log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq)
}
}
}
}
func (c *msgCache) DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error {
pipe := c.rdb.Pipeline()
for _, seq := range seqs {
if err := pipe.Del(ctx, c.getMessageCacheKey(conversationID, seq)).Err(); err != nil {
return errs.Wrap(err)
}
}
@ -572,41 +640,3 @@ func (c *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID
func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err())
}
func (c *msgCache) NewCache() MsgModel {
return &msgCache{
metaCache: NewMetaCacheRedis(c.rcClient, c.metaCache.GetPreDelKeys()...),
expireTime: c.expireTime,
rcClient: c.rcClient,
}
}
func (c msgCache) getMsgReadCacheKey(docID string, seq int64) string {
return messageReadCache + docID + "_" + strconv.Itoa(int(seq))
}
func (c *msgCache) getMsgsIndex(msg *unRelationTb.MsgInfoModel, keys []string) (int, error) {
key := c.getMsgReadCacheKey(utils.GetConversationIDByMsgModel(msg.Msg), msg.Msg.Seq)
for i, _key := range keys {
if key == _key {
return i, nil
}
}
return 0, errIndex
}
func (c *msgCache) GetMsgsByConversationIDAndSeq(ctx context.Context, userID, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) {
var keys []string
for _, seq := range seqs {
keys = append(keys, c.getMsgReadCacheKey(docID, seq))
}
return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getMsgsIndex, func(ctx context.Context) ([]*unRelationTb.MsgInfoModel, error) {
return c.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, docID, seqs)
})
}
func (c *msgCache) DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel {
cache := c.NewCache()
c.AddKeys(c.getMsgReadCacheKey(docID, seq))
return cache
}

View File

@ -37,7 +37,8 @@ type CommonMsgDatabase interface {
// mark as read
MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, seqs []int64) error
// 刪除redis中消息缓存
DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error
DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error
DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64)
// incrSeq然后批量插入缓存
BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error)
@ -316,8 +317,12 @@ func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userI
return nil
}
func (db *commonMsgDatabase) DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error {
return db.cache.DeleteMessageFromCache(ctx, conversationID, msgs)
func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error {
return db.cache.DeleteMessages(ctx, conversationID, seqs)
}
func (db *commonMsgDatabase) DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64) {
db.cache.DelUserDeleteMsgsList(ctx, conversationID, seqs)
}
func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
@ -474,13 +479,29 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
break
}
}
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs)
cachedMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs)
if err != nil {
if err != redis.Nil {
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
log.ZError(ctx, "get message from redis exception", err, conversationID, seqs)
}
}
var successMsgs []*sdkws.MsgData
if len(cachedMsgs) > 0 {
delSeqs, err := db.cache.GetUserDelList(ctx, userID, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return 0, 0, nil, err
} else {
log.ZDebug(ctx, "get delSeqs from redis", "delSeqs", delSeqs, "userID", userID, "conversationID", conversationID)
}
for _, msg := range cachedMsgs {
if !utils.Contain(msg.Seq, delSeqs...) {
successMsgs = append(successMsgs, msg)
}
}
}
log.ZDebug(ctx, "get msgs from cache", "successMsgs", successMsgs)
if len(failedSeqs) != 0 {
log.ZDebug(ctx, "msgs not exist in redis", err, "seqs", seqs)
}
@ -635,6 +656,9 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio
}
func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error {
if err := db.cache.DeleteMessages(ctx, conversationID, allSeqs); err != nil {
return err
}
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) {
var indexes []int
for _, seq := range seqs {
@ -648,6 +672,19 @@ func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conve
}
func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error {
msgs, _, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs)
if err != nil {
log.ZWarn(ctx, "DeleteUserMsgsBySeqs", err, "conversationID", conversationID, "seqs", seqs)
return err
}
var cacheSeqs []int64
for _, msg := range msgs {
cacheSeqs = append(cacheSeqs, msg.Seq)
}
if err := db.cache.UserDeleteMsgs(ctx, conversationID, cacheSeqs, userID); err != nil {
return err
}
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) {
for _, seq := range seqs {
if _, err := db.msgDocDatabase.PushUnique(ctx, docID, db.msg.GetMsgIndex(seq), "del_list", []string{userID}); err != nil {