diff --git a/config/notification.yaml b/config/notification.yaml index fc5d1c071..3550933d6 100644 --- a/config/notification.yaml +++ b/config/notification.yaml @@ -13,6 +13,7 @@ groupCreated: groupInfoSet: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: false @@ -23,6 +24,7 @@ groupInfoSet: joinGroupApplication: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: false @@ -32,6 +34,7 @@ joinGroupApplication: memberQuit: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: false @@ -41,6 +44,7 @@ memberQuit: groupApplicationAccepted: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: false @@ -50,6 +54,7 @@ groupApplicationAccepted: groupApplicationRejected: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: false @@ -60,6 +65,7 @@ groupApplicationRejected: groupOwnerTransferred: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: false @@ -69,6 +75,7 @@ groupOwnerTransferred: memberKicked: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: false @@ -78,6 +85,7 @@ memberKicked: memberInvited: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: false @@ -87,6 +95,7 @@ memberInvited: memberEnter: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: false @@ -96,6 +105,7 @@ memberEnter: groupDismissed: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: false @@ -105,6 +115,7 @@ groupDismissed: groupMuted: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: false @@ -114,6 +125,7 @@ groupMuted: groupCancelMuted: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: false @@ -126,6 +138,7 @@ groupCancelMuted: groupMemberMuted: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: false @@ -135,6 +148,7 @@ groupMemberMuted: groupMemberCancelMuted: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: false @@ -144,6 +158,7 @@ groupMemberCancelMuted: groupMemberInfoSet: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: false @@ -154,6 +169,7 @@ groupMemberInfoSet: #############################friend################################# friendApplicationAdded: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: false @@ -163,6 +179,7 @@ friendApplicationAdded: friendApplicationApproved: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: true @@ -172,6 +189,7 @@ friendApplicationApproved: friendApplicationRejected: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: true @@ -181,6 +199,7 @@ friendApplicationRejected: friendAdded: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: true @@ -190,6 +209,7 @@ friendAdded: friendDeleted: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: true @@ -199,6 +219,7 @@ friendDeleted: friendRemarkSet: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: true @@ -208,6 +229,7 @@ friendRemarkSet: blackAdded: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: true @@ -217,6 +239,7 @@ blackAdded: blackDeleted: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: true @@ -226,6 +249,7 @@ blackDeleted: friendInfoUpdated: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: true @@ -236,6 +260,7 @@ friendInfoUpdated: #####################user######################### userInfoUpdated: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: true @@ -246,6 +271,7 @@ userInfoUpdated: #####################conversation######################### conversationChanged: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: true @@ -255,6 +281,7 @@ conversationChanged: conversationSetPrivate: isSendMsg: true + reliabilityLevel: 1 unreadCount: false offlinePush: enable: true diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 512528352..3a38202ba 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -235,14 +235,14 @@ func (c *Client) writeBinaryMsg(resp Resp) error { } encodedBuf := bufferPool.Get().([]byte) resultBuf := bufferPool.Get().([]byte) - encodeBuf, err := c.longConnServer.Encode(resp) + encodedBuf, err := c.longConnServer.Encode(resp) if err != nil { return utils.Wrap(err, "") } _ = c.conn.SetWriteDeadline(writeWait) if c.isCompress { var compressErr error - resultBuf, compressErr = c.longConnServer.Compress(encodeBuf) + resultBuf, compressErr = c.longConnServer.Compress(encodedBuf) if compressErr != nil { return utils.Wrap(compressErr, "") } diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 38ad1f376..8984d7ded 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -90,11 +90,11 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { ctxMsgList := msgChannelValue.ctxMsgList ctx := msgChannelValue.ctx log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "conversationID", msgChannelValue.conversationID) - storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(msgChannelValue.conversationID, ctxMsgList) + storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(ctxMsgList) log.ZDebug(ctx, "msg lens", "storageMsgList", len(storageMsgList), "notStorageMsgList", len(notStorageMsgList), "storageNotificationList", len(storageNotificationList), "notStorageNotificationList", len(notStorageNotificationList), "modifyMsgList", len(modifyMsgList)) - och.handleMsg(ctx, msgChannelValue.conversationID, storageMsgList, notStorageMsgList) - och.handleNotification(ctx, msgChannelValue.conversationID, storageNotificationList, notStorageNotificationList) + och.handleMsg(ctx, utils.GetChatConversationIDByMsg(ctxMsgList[0].message), storageMsgList, notStorageMsgList) + och.handleNotification(ctx, utils.GetNotificationConversationID(ctxMsgList[0].message), storageNotificationList, notStorageNotificationList) if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.conversationID, modifyMsgList); err != nil { log.ZError(ctx, "msg to modify mq error", err, "conversationID", msgChannelValue.conversationID, "modifyMsgList", modifyMsgList) } @@ -104,7 +104,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { } // 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表, -func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(conversationID string, totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*sdkws.MsgData) { +func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*sdkws.MsgData) { isStorage := func(msg *sdkws.MsgData) bool { options2 := utils.Options(msg.Options) if options2.IsHistory() { @@ -124,7 +124,7 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(conversation if options.IsSendMsg() { // 消息 if v.message.Options != nil { - v.message.Options = utils.WithOptions(utils.Options(v.message.Options), utils.WithNotification(false), utils.WithSendMsg(false)) + v.message.Options = utils.WithOptions(utils.Options(v.message.Options), utils.WithNotNotification(true), utils.WithSendMsg(false)) } storageMsgList = append(storageMsgList, v.message) } @@ -155,7 +155,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID, "storageList", storageList) return } - log.ZDebug(ctx, "success to next topic") + log.ZDebug(ctx, "success to next topic", "conversationID", conversationID) och.msgDatabase.MsgToMongoMQ(ctx, conversationID, storageList, lastSeq) och.toPushTopic(ctx, conversationID, storageList) } diff --git a/internal/rpc/msg/send_pull.go b/internal/rpc/msg/send_pull.go index eba6edf20..6b1c7bfe9 100644 --- a/internal/rpc/msg/send_pull.go +++ b/internal/rpc/msg/send_pull.go @@ -62,10 +62,14 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq) if err != nil { return nil, err } + + var isSend bool = true conversationID := utils.GetConversationIDByMsg(req.MsgData) - isSend, err := m.modifyMessageByUserMessageReceiveOpt(ctx, req.MsgData.RecvID, conversationID, constant.SingleChatType, req) - if err != nil { - return nil, err + if utils.MsgIsNotification(req.MsgData) { + isSend, err = m.modifyMessageByUserMessageReceiveOpt(ctx, req.MsgData.RecvID, conversationID, constant.SingleChatType, req) + if err != nil { + return nil, err + } } if isSend { err = m.MsgDatabase.MsgToMQ(ctx, conversationID, req.MsgData) diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 7488f68fb..8dadbc0bb 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -341,7 +341,7 @@ type Notification struct { ////////////////////////user/////////////////////// UserInfoUpdated NotificationConf `yaml:"userInfoUpdated"` //////////////////////friend/////////////////////// - FriendApplication NotificationConf `yaml:"friendApplicationAdded"` + FriendApplicationAdded NotificationConf `yaml:"friendApplicationAdded"` FriendApplicationApproved NotificationConf `yaml:"friendApplicationApproved"` FriendApplicationRejected NotificationConf `yaml:"friendApplicationRejected"` FriendAdded NotificationConf `yaml:"friendAdded"` diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index 8a65ec01e..11b012010 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -142,7 +142,7 @@ const ( IsSenderConversationUpdate = "senderConversationUpdate" IsSenderNotificationPush = "senderNotificationPush" IsReactionFromCache = "reactionFromCache" - IsNotification = "isNotification" + IsNotNotification = "isNotNotification" IsSendMsg = "isSendMsg" //GroupStatus diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index 77979e153..f52ef6cd4 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -30,10 +30,10 @@ type ConversationCache interface { NewCache() ConversationCache // get user's conversationIDs from msgCache GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) - DelConversationIDs(userIDs []string) ConversationCache + DelConversationIDs(userIDs ...string) ConversationCache // get one conversation from msgCache GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationTb.ConversationModel, error) - DelConvsersations(ownerUserID string, conversationIDs []string) ConversationCache + DelConvsersations(ownerUserID string, conversationIDs ...string) ConversationCache DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache // get one conversation from msgCache GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) @@ -97,7 +97,7 @@ func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, own }) } -func (c *ConversationRedisCache) DelConversationIDs(userIDs []string) ConversationCache { +func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) ConversationCache { var keys []string for _, userID := range userIDs { keys = append(keys, c.getConversationIDsKey(userID)) @@ -113,7 +113,7 @@ func (c *ConversationRedisCache) GetConversation(ctx context.Context, ownerUserI }) } -func (c *ConversationRedisCache) DelConvsersations(ownerUserID string, convsersationIDs []string) ConversationCache { +func (c *ConversationRedisCache) DelConvsersations(ownerUserID string, convsersationIDs ...string) ConversationCache { var keys []string for _, conversationID := range convsersationIDs { keys = append(keys, c.getConversationKey(ownerUserID, conversationID)) diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 89f6c9967..99dfe46b4 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -7,9 +7,11 @@ import ( "time" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" + "github.com/dtm-labs/rockscache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" + unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" @@ -29,6 +31,7 @@ const ( getuiToken = "GETUI_TOKEN" getuiTaskID = "GETUI_TASK_ID" messageCache = "MESSAGE_CACHE:" + messageReadCache = "MESSAGE_READ_CACHE:" signalCache = "SIGNAL_CACHE:" signalListCache = "SIGNAL_LIST_CACHE:" fcmToken = "FCM_TOKEN:" @@ -84,6 +87,9 @@ type MsgModel interface { SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error + + GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*sdkws.MsgData, error) + DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel } func NewMsgCacheModel(client redis.UniversalClient) MsgModel { @@ -91,39 +97,11 @@ func NewMsgCacheModel(client redis.UniversalClient) MsgModel { } type msgCache struct { - rdb redis.UniversalClient -} - -// 兼容老版本调用 -func (c *msgCache) DelKeys() { - for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE:", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE:", "JOINED_GROUP_LIST_CACHE:", - "GROUP_MEMBER_INFO_CACHE:", "GROUP_ALL_MEMBER_INFO_CACHE:", "ALL_FRIEND_INFO_CACHE:"} { - fName := utils.GetSelfFuncName() - var cursor uint64 - var n int - for { - var keys []string - var err error - keys, cursor, err = c.rdb.Scan(context.Background(), cursor, key+"*", scanCount).Result() - if err != nil { - panic(err.Error()) - } - n += len(keys) - // for each for redis cluster - for _, key := range keys { - if err = c.rdb.Del(context.Background(), key).Err(); err != nil { - log.NewError("", fName, key, err.Error()) - err = c.rdb.Del(context.Background(), key).Err() - if err != nil { - panic(err.Error()) - } - } - } - if cursor == 0 { - break - } - } - } + metaCache + rdb redis.UniversalClient + expireTime time.Duration + rcClient *rockscache.Client + msgDocDatabase unRelationTb.MsgDocModelInterface } func (c *msgCache) getMaxSeqKey(conversationID string) string { @@ -145,7 +123,6 @@ func (c *msgCache) getSeq(ctx context.Context, conversationID string, getkey fun func (c *msgCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) { pipe := c.rdb.Pipeline() for _, v := range items { - log.ZDebug(ctx, "getSeqs", "getkey", getkey(v)) if err := pipe.Get(ctx, getkey(v)).Err(); err != nil && err != redis.Nil { return nil, errs.Wrap(err) } @@ -550,3 +527,41 @@ func (c *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err()) } + +func (c *msgCache) NewCache() MsgModel { + return &msgCache{ + metaCache: NewMetaCacheRedis(c.rcClient, c.metaCache.GetPreDelKeys()...), + expireTime: c.expireTime, + rcClient: c.rcClient, + } +} + +func (c msgCache) getMsgReadCacheKey(docID string, seq int64) string { + return messageReadCache + docID + "_" + strconv.Itoa(int(seq)) +} + +func (c *msgCache) getMsgsIndex(msg *sdkws.MsgData, keys []string) (int, error) { + key := c.getMsgReadCacheKey(utils.GetConversationIDByMsg(msg), msg.Seq) + for i, _key := range keys { + if key == _key { + return i, nil + } + } + return 0, errIndex +} + +func (c *msgCache) GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*sdkws.MsgData, error) { + var keys []string + for _, seq := range seqs { + keys = append(keys, c.getMsgReadCacheKey(docID, seq)) + } + return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getMsgsIndex, func(ctx context.Context) ([]*sdkws.MsgData, error) { + return c.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs) + }) +} + +func (c *msgCache) DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel { + cache := c.NewCache() + c.AddKeys(c.getMsgReadCacheKey(docID, seq)) + return cache +} diff --git a/pkg/common/db/controller/black.go b/pkg/common/db/controller/black.go index 07380053a..7479722b8 100644 --- a/pkg/common/db/controller/black.go +++ b/pkg/common/db/controller/black.go @@ -2,10 +2,10 @@ package controller import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" - "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" - "gorm.io/gorm" + "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" ) type BlackDatabase interface { @@ -31,12 +31,26 @@ func NewBlackDatabase(black relation.BlackModelInterface, cache cache.BlackCache // Create 增加黑名单 func (b *blackDatabase) Create(ctx context.Context, blacks []*relation.BlackModel) (err error) { - return b.black.Create(ctx, blacks) + if err := b.black.Create(ctx, blacks); err != nil { + return err + } + return b.deleteBlackIDsCache(ctx, blacks) } // Delete 删除黑名单 func (b *blackDatabase) Delete(ctx context.Context, blacks []*relation.BlackModel) (err error) { - return b.black.Delete(ctx, blacks) + if err := b.black.Delete(ctx, blacks); err != nil { + return err + } + return b.deleteBlackIDsCache(ctx, blacks) +} + +func (b *blackDatabase) deleteBlackIDsCache(ctx context.Context, blacks []*relation.BlackModel) (err error) { + cache := b.cache.NewCache() + for _, black := range blacks { + cache = cache.DelBlackIDs(ctx, black.OwnerUserID) + } + return cache.ExecDel(ctx) } // FindOwnerBlacks 获取黑名单列表 @@ -46,21 +60,15 @@ func (b *blackDatabase) FindOwnerBlacks(ctx context.Context, ownerUserID string, // CheckIn 检查user2是否在user1的黑名单列表中(inUser1Blacks==true) 检查user1是否在user2的黑名单列表中(inUser2Blacks==true) func (b *blackDatabase) CheckIn(ctx context.Context, userID1, userID2 string) (inUser1Blacks bool, inUser2Blacks bool, err error) { - _, err = b.black.Take(ctx, userID1, userID2) + userID1BlackIDs, err := b.cache.GetBlackIDs(ctx, userID1) if err != nil { - if errs.Unwrap(err) != gorm.ErrRecordNotFound { - return - } + return } - inUser1Blacks = err == nil - _, err = b.black.Take(ctx, userID2, userID1) + userID2BlackIDs, err := b.cache.GetBlackIDs(ctx, userID2) if err != nil { - if errs.Unwrap(err) != gorm.ErrRecordNotFound { - return - } + return } - inUser2Blacks = err == nil - return inUser1Blacks, inUser2Blacks, nil + return utils.IsContain(userID2, userID1BlackIDs), utils.IsContain(userID1, userID2BlackIDs), nil } func (b *blackDatabase) FindBlackIDs(ctx context.Context, ownerUserID string) (blackIDs []string, err error) { diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index aa3eec74d..bcdd36eba 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -46,14 +46,14 @@ type ConversationDataBase struct { tx tx.Tx } -func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error { - return c.tx.Transaction(func(tx any) error { +func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) (err error) { + cache := c.cache.NewCache() + if err := c.tx.Transaction(func(tx any) error { conversationTx := c.conversationDB.NewTx(tx) haveUserIDs, err := conversationTx.FindUserID(ctx, userIDs, []string{conversation.ConversationID}) if err != nil { return err } - cache := c.cache.NewCache() if len(haveUserIDs) > 0 { _, err = conversationTx.UpdateByMap(ctx, haveUserIDs, conversation.ConversationID, filedMap) if err != nil { @@ -71,19 +71,20 @@ func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, } temp.OwnerUserID = v conversations = append(conversations, temp) - } + } if len(conversations) > 0 { err = conversationTx.Create(ctx, conversations) if err != nil { return err } - cache = cache.DelConversationIDs(NotUserIDs) + cache = cache.DelConversationIDs(NotUserIDs...) } - // clear cache - log.ZDebug(ctx, "SetUsersConversationFiledTx", "cache", cache.GetPreDelKeys(), "addr", &cache) - return cache.ExecDel(ctx) - }) + return nil + }); err != nil { + return err + } + return cache.ExecDel(ctx) } func (c *ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) error { @@ -98,13 +99,17 @@ func (c *ConversationDataBase) CreateConversation(ctx context.Context, conversat if err := c.conversationDB.Create(ctx, conversations); err != nil { return err } - return nil + var userIDs []string + for _, conversation := range conversations { + userIDs = append(userIDs, conversation.OwnerUserID) + } + return c.cache.DelConversationIDs(userIDs...).ExecDel(ctx) } func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationTb.ConversationModel) error { - return c.tx.Transaction(func(tx any) error { + cache := c.cache.NewCache() + if err := c.tx.Transaction(func(tx any) error { conversationTx := c.conversationDB.NewTx(tx) - cache := c.cache.NewCache() for _, conversation := range conversations { for _, v := range [][2]string{{conversation.OwnerUserID, conversation.UserID}, {conversation.UserID, conversation.OwnerUserID}} { haveUserIDs, err := conversationTx.FindUserID(ctx, []string{v[0]}, []string{conversation.ConversationID}) @@ -126,12 +131,15 @@ func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Con if err := conversationTx.Create(ctx, []*relationTb.ConversationModel{&newConversation}); err != nil { return err } - cache = cache.DelConversationIDs([]string{v[0]}) + cache = cache.DelConversationIDs([]string{v[0]}...) } } } - return c.cache.ExecDel(ctx) - }) + return nil + }); err != nil { + return err + } + return c.cache.ExecDel(ctx) } func (c *ConversationDataBase) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) { @@ -147,7 +155,8 @@ func (c *ConversationDataBase) GetUserAllConversation(ctx context.Context, owner } func (c *ConversationDataBase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error { - return c.tx.Transaction(func(tx any) error { + cache := c.cache.NewCache() + if err := c.tx.Transaction(func(tx any) error { var conversationIDs []string for _, conversation := range conversations { conversationIDs = append(conversationIDs, conversation.ConversationID) @@ -181,13 +190,14 @@ func (c *ConversationDataBase) SetUserConversations(ctx context.Context, ownerUs if err != nil { return err } + cache = cache.DelConversationIDs([]string{ownerUserID}...) } - cache := c.cache.NewCache() - if len(notExistConversations) > 0 { - cache = cache.DelConversationIDs([]string{ownerUserID}) - } - return cache.DelConvsersations(ownerUserID, existConversationIDs).ExecDel(ctx) - }) + cache = cache.DelConvsersations(ownerUserID, existConversationIDs...) + return nil + }); err != nil { + return err + } + return cache.ExecDel(ctx) } func (c *ConversationDataBase) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) { @@ -195,27 +205,36 @@ func (c *ConversationDataBase) FindRecvMsgNotNotifyUserIDs(ctx context.Context, } func (c *ConversationDataBase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error { + cache := c.cache.NewCache() conversationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID) - return c.tx.Transaction(func(tx any) error { - existConversationUserIDs, err := c.conversationDB.FindUserID(ctx, userIDs, []string{groupID}) + if err := c.tx.Transaction(func(tx any) error { + existConversationUserIDs, err := c.conversationDB.FindUserID(ctx, userIDs, []string{conversationID}) if err != nil { return err } notExistUserIDs := utils.DifferenceString(userIDs, existConversationUserIDs) - var conversations []*relationTb.ConversationModel for _, v := range notExistUserIDs { conversation := relationTb.ConversationModel{ConversationType: constant.SuperGroupChatType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID} conversations = append(conversations, &conversation) } + cache = cache.DelConversationIDs(notExistUserIDs...) err = c.conversationDB.Create(ctx, conversations) if err != nil { return err } - _, err = c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, map[string]interface{}{"max_seq": 0}) + _, err = c.conversationDB.UpdateByMap(ctx, existConversationUserIDs, conversationID, map[string]interface{}{"max_seq": 0}) + if err != nil { + return err + } + for _, v := range existConversationUserIDs { + cache = cache.DelConvsersations(v, conversationID) + } + return nil + }); err != nil { return err - }) - + } + return cache.ExecDel(ctx) } func (c *ConversationDataBase) GetConversationIDs(ctx context.Context, userID string) ([]string, error) { diff --git a/pkg/common/db/controller/friend.go b/pkg/common/db/controller/friend.go index e24fc9165..1c3fc0364 100644 --- a/pkg/common/db/controller/friend.go +++ b/pkg/common/db/controller/friend.go @@ -55,19 +55,15 @@ func NewFriendDatabase(friend relation.FriendModelInterface, friendRequest relat // ok 检查user2是否在user1的好友列表中(inUser1Friends==true) 检查user1是否在user2的好友列表中(inUser2Friends==true) func (f *friendDatabase) CheckIn(ctx context.Context, userID1, userID2 string) (inUser1Friends bool, inUser2Friends bool, err error) { - friends, err := f.friend.FindUserState(ctx, userID1, userID2) + userID1FriendIDs, err := f.cache.GetFriendIDs(ctx, userID1) if err != nil { - return false, false, err + return } - for _, v := range friends { - if v.OwnerUserID == userID1 && v.FriendUserID == userID2 { - inUser1Friends = true - } - if v.OwnerUserID == userID2 && v.FriendUserID == userID1 { - inUser2Friends = true - } + userID2FriendIDs, err := f.cache.GetFriendIDs(ctx, userID2) + if err != nil { + return } - return + return utils.IsContain(userID2, userID1FriendIDs), utils.IsContain(userID1, userID2FriendIDs), nil } // 增加或者更新好友申请 如果之前有记录则更新,没有记录则新增 @@ -100,7 +96,8 @@ func (f *friendDatabase) AddFriendRequest(ctx context.Context, fromUserID, toUse // (1)先判断是否在好友表 (在不在都不返回错误) (2)对于不在好友列表的 插入即可 func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string, friendUserIDs []string, addSource int32) (err error) { - return f.tx.Transaction(func(tx any) error { + cache := f.cache.NewCache() + if err := f.tx.Transaction(func(tx any) error { //先find 找出重复的 去掉重复的 fs1, err := f.friend.NewTx(tx).FindFriends(ctx, ownerUserID, friendUserIDs) if err != nil { @@ -135,8 +132,12 @@ func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string, return err } newFriendIDs = append(newFriendIDs, ownerUserID) - return f.cache.DelFriendIDs(newFriendIDs...).ExecDel(ctx) - }) + cache = cache.DelFriendIDs(newFriendIDs...) + return nil + }); err != nil { + return nil + } + return cache.ExecDel(ctx) } // 拒绝好友申请 (1)检查是否有申请记录且为未处理状态 (没有记录返回错误) (2)修改申请记录 已拒绝 @@ -199,24 +200,18 @@ func (f *friendDatabase) AgreeFriendRequest(ctx context.Context, friendRequest * // 删除好友 外部判断是否好友关系 func (f *friendDatabase) Delete(ctx context.Context, ownerUserID string, friendUserIDs []string) (err error) { - return f.tx.Transaction(func(tx any) error { - if err := f.friend.Delete(ctx, ownerUserID, friendUserIDs); err != nil { - return err - } - return f.cache.DelFriendIDs(append(friendUserIDs, ownerUserID)...).ExecDel(ctx) - }) - + if err := f.friend.Delete(ctx, ownerUserID, friendUserIDs); err != nil { + return err + } + return f.cache.DelFriendIDs(append(friendUserIDs, ownerUserID)...).ExecDel(ctx) } // 更新好友备注 零值也支持 func (f *friendDatabase) UpdateRemark(ctx context.Context, ownerUserID, friendUserID, remark string) (err error) { - return f.tx.Transaction(func(tx any) error { - err := f.friend.UpdateRemark(ctx, ownerUserID, friendUserID, remark) - if err != nil { - return err - } - return f.cache.DelFriend(ownerUserID, friendUserID).ExecDel(ctx) - }) + if err := f.friend.UpdateRemark(ctx, ownerUserID, friendUserID, remark); err != nil { + return err + } + return f.cache.DelFriend(ownerUserID, friendUserID).ExecDel(ctx) } // 获取ownerUserID的好友列表 无结果不返回错误 diff --git a/pkg/common/db/controller/group.go b/pkg/common/db/controller/group.go index 57c5f1fb2..7f5cc4101 100644 --- a/pkg/common/db/controller/group.go +++ b/pkg/common/db/controller/group.go @@ -113,7 +113,8 @@ func (g *groupDatabase) FindGroupMemberUserID(ctx context.Context, groupID strin } func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*relationTb.GroupModel, groupMembers []*relationTb.GroupMemberModel) error { - return g.tx.Transaction(func(tx any) error { + var cache = g.cache.NewCache() + if err := g.tx.Transaction(func(tx any) error { if len(groups) > 0 { if err := g.groupDB.NewTx(tx).Create(ctx, groups); err != nil { return err @@ -128,7 +129,7 @@ func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*relationTb.Gr return group.GroupID }) m := make(map[string]struct{}) - var cache = g.cache.NewCache() + for _, groupMember := range groupMembers { if _, ok := m[groupMember.GroupID]; !ok { m[groupMember.GroupID] = struct{}{} @@ -137,8 +138,11 @@ func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*relationTb.Gr cache = cache.DelJoinedGroupID(groupMember.UserID).DelGroupMembersInfo(groupMember.GroupID, groupMember.UserID) } cache = cache.DelGroupsInfo(createGroupIDs...) - return cache.ExecDel(ctx) - }) + return nil + }); err != nil { + return err + } + return cache.ExecDel(ctx) } func (g *groupDatabase) TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) { @@ -154,16 +158,15 @@ func (g *groupDatabase) SearchGroup(ctx context.Context, keyword string, pageNum } func (g *groupDatabase) UpdateGroup(ctx context.Context, groupID string, data map[string]any) error { - return g.tx.Transaction(func(tx any) error { - if err := g.groupDB.NewTx(tx).UpdateMap(ctx, groupID, data); err != nil { - return err - } - return g.cache.DelGroupsInfo(groupID).ExecDel(ctx) - }) + if err := g.groupDB.UpdateMap(ctx, groupID, data); err != nil { + return err + } + return g.cache.DelGroupsInfo(groupID).ExecDel(ctx) } func (g *groupDatabase) DismissGroup(ctx context.Context, groupID string) error { - return g.tx.Transaction(func(tx any) error { + cache := g.cache.NewCache() + if err := g.tx.Transaction(func(tx any) error { if err := g.groupDB.NewTx(tx).UpdateStatus(ctx, groupID, constant.GroupStatusDismissed); err != nil { return err } @@ -174,8 +177,12 @@ func (g *groupDatabase) DismissGroup(ctx context.Context, groupID string) error if err != nil { return err } - return g.cache.DelJoinedGroupID(userIDs...).DelGroupsInfo(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelGroupMembersHash(groupID).ExecDel(ctx) - }) + cache = cache.DelJoinedGroupID(userIDs...).DelGroupsInfo(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelGroupMembersHash(groupID) + return nil + }); err != nil { + return err + } + return cache.ExecDel(ctx) } func (g *groupDatabase) TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error) { @@ -236,7 +243,8 @@ func (g *groupDatabase) SearchGroupMember(ctx context.Context, keyword string, g } func (g *groupDatabase) HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relationTb.GroupMemberModel) error { - return g.tx.Transaction(func(tx any) error { + cache := g.cache.NewCache() + if err := g.tx.Transaction(func(tx any) error { if err := g.groupRequestDB.NewTx(tx).UpdateHandler(ctx, groupID, userID, handledMsg, handleResult); err != nil { return err } @@ -244,19 +252,20 @@ func (g *groupDatabase) HandlerGroupRequest(ctx context.Context, groupID string, if err := g.groupMemberDB.NewTx(tx).Create(ctx, []*relationTb.GroupMemberModel{member}); err != nil { return err } - return g.cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(member.UserID).ExecDel(ctx) + cache = cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(member.UserID) } return nil - }) + }); err != nil { + return err + } + return cache.ExecDel(ctx) } func (g *groupDatabase) DeleteGroupMember(ctx context.Context, groupID string, userIDs []string) error { - return g.tx.Transaction(func(tx any) error { - if err := g.groupMemberDB.NewTx(tx).Delete(ctx, groupID, userIDs); err != nil { - return err - } - return g.cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(userIDs...).DelGroupMembersInfo(groupID, userIDs...).ExecDel(ctx) - }) + if err := g.groupMemberDB.Delete(ctx, groupID, userIDs); err != nil { + return err + } + return g.cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(userIDs...).DelGroupMembersInfo(groupID, userIDs...).ExecDel(ctx) } func (g *groupDatabase) MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) { @@ -276,7 +285,7 @@ func (g *groupDatabase) MapGroupMemberNum(ctx context.Context, groupIDs []string } func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error { - return g.tx.Transaction(func(tx any) error { + if err := g.tx.Transaction(func(tx any) error { rowsAffected, err := g.groupMemberDB.NewTx(tx).UpdateRoleLevel(ctx, groupID, oldOwnerUserID, roleLevel) if err != nil { return err @@ -291,30 +300,34 @@ func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string, if rowsAffected != 1 { return utils.Wrap(fmt.Errorf("newOwnerUserID %s rowsAffected = %d", newOwnerUserID, rowsAffected), "") } - return g.cache.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID).ExecDel(ctx) - }) + return nil + }); err != nil { + return err + } + return g.cache.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID).ExecDel(ctx) } func (g *groupDatabase) UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error { - return g.tx.Transaction(func(tx any) error { - if err := g.groupMemberDB.NewTx(tx).Update(ctx, groupID, userID, data); err != nil { - return err - } - return g.cache.DelGroupMembersInfo(groupID, userID).ExecDel(ctx) - }) + if err := g.groupMemberDB.Update(ctx, groupID, userID, data); err != nil { + return err + } + return g.cache.DelGroupMembersInfo(groupID, userID).ExecDel(ctx) } func (g *groupDatabase) UpdateGroupMembers(ctx context.Context, data []*relationTb.BatchUpdateGroupMember) error { - return g.tx.Transaction(func(tx any) error { - var cache = g.cache.NewCache() + var cache = g.cache.NewCache() + if err := g.tx.Transaction(func(tx any) error { for _, item := range data { if err := g.groupMemberDB.NewTx(tx).Update(ctx, item.GroupID, item.UserID, item.Map); err != nil { return err } cache = cache.DelGroupMembersInfo(item.GroupID, item.UserID) } - return cache.ExecDel(ctx) - }) + return nil + }); err != nil { + return err + } + return cache.ExecDel(ctx) } func (g *groupDatabase) CreateGroupRequest(ctx context.Context, requests []*relationTb.GroupRequestModel) error { @@ -346,16 +359,15 @@ func (g *groupDatabase) FindJoinSuperGroup(ctx context.Context, userID string) ( } func (g *groupDatabase) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDs []string) error { - return g.ctxTx.Transaction(ctx, func(ctx context.Context) error { - if err := g.mongoDB.CreateSuperGroup(ctx, groupID, initMemberIDs); err != nil { - return err - } - return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(initMemberIDs...).ExecDel(ctx) - }) + if err := g.mongoDB.CreateSuperGroup(ctx, groupID, initMemberIDs); err != nil { + return err + } + return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(initMemberIDs...).ExecDel(ctx) } func (g *groupDatabase) DeleteSuperGroup(ctx context.Context, groupID string) error { - return g.ctxTx.Transaction(ctx, func(ctx context.Context) error { + cache := g.cache.NewCache() + if err := g.ctxTx.Transaction(ctx, func(ctx context.Context) error { if err := g.mongoDB.DeleteSuperGroup(ctx, groupID); err != nil { return err } @@ -363,28 +375,27 @@ func (g *groupDatabase) DeleteSuperGroup(ctx context.Context, groupID string) er if err != nil { return err } - cache := g.cache.DelSuperGroupMemberIDs(groupID) + cache = cache.DelSuperGroupMemberIDs(groupID) if len(models) > 0 { cache = cache.DelJoinedSuperGroupIDs(models[0].MemberIDs...) } - return cache.ExecDel(ctx) - }) + return nil + }); err != nil { + return err + } + return cache.ExecDel(ctx) } func (g *groupDatabase) DeleteSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error { - return g.ctxTx.Transaction(ctx, func(ctx context.Context) error { - if err := g.mongoDB.RemoverUserFromSuperGroup(ctx, groupID, userIDs); err != nil { - return err - } - return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx) - }) + if err := g.mongoDB.RemoverUserFromSuperGroup(ctx, groupID, userIDs); err != nil { + return err + } + return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx) } func (g *groupDatabase) CreateSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error { - return g.ctxTx.Transaction(ctx, func(ctx context.Context) error { - if err := g.mongoDB.AddUserToSuperGroup(ctx, groupID, userIDs); err != nil { - return err - } - return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx) - }) + if err := g.mongoDB.AddUserToSuperGroup(ctx, groupID, userIDs); err != nil { + return err + } + return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx) } diff --git a/pkg/common/db/controller/common_msg.go b/pkg/common/db/controller/msg.go similarity index 94% rename from pkg/common/db/controller/common_msg.go rename to pkg/common/db/controller/msg.go index 819d20811..762e4f19e 100644 --- a/pkg/common/db/controller/common_msg.go +++ b/pkg/common/db/controller/msg.go @@ -100,13 +100,12 @@ type commonMsgDatabase struct { msgDocDatabase unRelationTb.MsgDocModelInterface extendMsgDatabase unRelationTb.ExtendMsgSetModelInterface extendMsgSetModel unRelationTb.ExtendMsgSetModel + msg unRelationTb.MsgDocModel cache cache.MsgModel producer *kafka.Producer producerToMongo *kafka.Producer producerToModify *kafka.Producer producerToPush *kafka.Producer - // model - msg unRelationTb.MsgDocModel } func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error { @@ -277,7 +276,7 @@ func (db *commonMsgDatabase) DelMsgBySeqs(ctx context.Context, conversationID st } func (db *commonMsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) { - seqMsgs, indexes, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs) + seqMsgs, indexes, unExistSeqs, err := db.msgDocDatabase.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs) if err != nil { return nil, err } @@ -289,37 +288,6 @@ func (db *commonMsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID str return unExistSeqs, nil } -func (db *commonMsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) { - doc, err := db.msgDocDatabase.FindOneByDocID(ctx, docID) - if err != nil { - return nil, nil, nil, err - } - singleCount := 0 - var hasSeqList []int64 - for i := 0; i < len(doc.Msg); i++ { - msgPb, err := db.unmarshalMsg(&doc.Msg[i]) - if err != nil { - return nil, nil, nil, err - } - if utils.Contain(msgPb.Seq, seqs...) { - indexes = append(indexes, i) - seqMsgs = append(seqMsgs, msgPb) - hasSeqList = append(hasSeqList, msgPb.Seq) - singleCount++ - if singleCount == len(seqs) { - break - } - } - } - for _, i := range seqs { - if utils.Contain(i, hasSeqList...) { - continue - } - unExistSeqs = append(unExistSeqs, i) - } - return seqMsgs, indexes, unExistSeqs, nil -} - func (db *commonMsgDatabase) GetNewestMsg(ctx context.Context, conversationID string) (msgPb *sdkws.MsgData, err error) { msgInfo, err := db.msgDocDatabase.GetNewestMsg(ctx, conversationID) if err != nil { @@ -345,15 +313,22 @@ func (db *commonMsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (m return msgPb, nil } -func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, err error) { - seqMsgs, unexistSeqs, err := db.findMsgBySeq(ctx, conversationID, seqs) - if err != nil { - return nil, err +func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { + m := db.msg.GetDocIDSeqsMap(conversationID, seqs) + var totalUnExistSeqs []int64 + for docID, seqs := range m { + log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs) + seqMsgs, unexistSeqs, err := db.findMsgBySeq(ctx, docID, seqs) + if err != nil { + return nil, err + } + totalMsgs = append(totalMsgs, seqMsgs...) + totalUnExistSeqs = append(totalUnExistSeqs, unexistSeqs...) } - for _, unexistSeq := range unexistSeqs { - seqMsgs = append(seqMsgs, db.msg.GenExceptionMessageBySeqs([]int64{unexistSeq})...) + for _, unexistSeq := range totalUnExistSeqs { + totalMsgs = append(totalMsgs, db.msg.GenExceptionMessageBySeqs([]int64{unexistSeq})...) } - return seqMsgs, nil + return totalMsgs, nil } func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversationID string, delNums, rangeBegin, begin int64) (seqMsgs []*sdkws.MsgData, err error) { @@ -372,8 +347,8 @@ func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversatio } if len(reFetchSeqs) > 0 { m := db.msg.GetDocIDSeqsMap(conversationID, reFetchSeqs) - for docID, seq := range m { - msgs, _, err := db.findMsgBySeq(ctx, docID, seq) + for docID, seqs := range m { + msgs, _, err := db.findMsgBySeq(ctx, docID, seqs) if err != nil { return nil, err } @@ -395,12 +370,11 @@ func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversatio } func (db *commonMsgDatabase) findMsgBySeq(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, unExistSeqs []int64, err error) { - beginSeq, endSeq := db.msg.GetSeqsBeginEnd(seqs) - msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, beginSeq, endSeq) + msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs) if err != nil { return nil, nil, err } - log.ZDebug(ctx, "findMsgBySeq", "docID", docID, "seqs", seqs, "beginSeq", beginSeq, "endSeq", endSeq, "len(msgs)", len(msgs)) + log.ZDebug(ctx, "findMsgBySeq", "docID", docID, "seqs", seqs, "len(msgs)", len(msgs)) seqMsgs = append(seqMsgs, msgs...) if len(msgs) == 0 { unExistSeqs = seqs @@ -416,7 +390,7 @@ func (db *commonMsgDatabase) findMsgBySeq(ctx context.Context, docID string, seq } } } - msgs, _, unExistSeqs, err = db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, unExistSeqs) + msgs, _, unExistSeqs, err = db.msgDocDatabase.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, unExistSeqs) if err != nil { return nil, nil, err } @@ -446,7 +420,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, conversation m = db.msg.GetDocIDSeqsMap(conversationID, totalNotExistSeqs) for docID, seqs := range m { docID = db.msg.ToNextDoc(docID) - msgs, _, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs) + msgs, _, unExistSeqs, err := db.msgDocDatabase.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs) if err != nil { missedSeqs = append(missedSeqs, seqs...) log.ZError(ctx, "get message from mongo exception", err, "docID", docID, "seqs", seqs) @@ -477,7 +451,6 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, conversation if len(totalNotExistSeqs) > 0 || len(delSeqs) > 0 { sort.Sort(utils.MsgBySeq(seqMsgs)) } - // missSeqs为依然缺失的 return seqMsgs, nil } @@ -485,7 +458,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, conversation var seqs []int64 for i := end; i > end-num; i-- { if i >= begin { - seqs = append(seqs, i) + seqs = append([]int64{i}, seqs...) } else { break } diff --git a/pkg/common/db/controller/user.go b/pkg/common/db/controller/user.go index 019942cda..72b1a9c49 100644 --- a/pkg/common/db/controller/user.go +++ b/pkg/common/db/controller/user.go @@ -76,40 +76,36 @@ func (u *userDatabase) Find(ctx context.Context, userIDs []string) (users []*rel // 插入多条 外部保证userID 不重复 且在db中不存在 func (u *userDatabase) Create(ctx context.Context, users []*relation.UserModel) (err error) { - return u.tx.Transaction(func(tx any) error { + if err := u.tx.Transaction(func(tx any) error { err = u.userDB.Create(ctx, users) if err != nil { return err } - var userIDs []string - for _, user := range users { - userIDs = append(userIDs, user.UserID) - } - return u.cache.DelUsersInfo(userIDs...).ExecDel(ctx) - }) + return nil + }); err != nil { + return err + } + var userIDs []string + for _, user := range users { + userIDs = append(userIDs, user.UserID) + } + return u.cache.DelUsersInfo(userIDs...).ExecDel(ctx) } // 更新(非零值) 外部保证userID存在 func (u *userDatabase) Update(ctx context.Context, user *relation.UserModel) (err error) { - return u.tx.Transaction(func(tx any) error { - err = u.userDB.Update(ctx, user) - if err != nil { - return err - } - return u.cache.DelUsersInfo(user.UserID).ExecDel(ctx) - }) + if err := u.userDB.Update(ctx, user); err != nil { + return err + } + return u.cache.DelUsersInfo(user.UserID).ExecDel(ctx) } // 更新(零值) 外部保证userID存在 func (u *userDatabase) UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error) { - return u.tx.Transaction(func(tx any) error { - err = u.userDB.UpdateByMap(ctx, userID, args) - if err != nil { - return err - } - return u.cache.DelUsersInfo(userID).ExecDel(ctx) - }) - + if err := u.userDB.UpdateByMap(ctx, userID, args); err != nil { + return err + } + return u.cache.DelUsersInfo(userID).ExecDel(ctx) } // 获取,如果没找到,不返回错误 diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/unrelation/msg.go index 6e6916cb1..7d67269d5 100644 --- a/pkg/common/db/table/unrelation/msg.go +++ b/pkg/common/db/table/unrelation/msg.go @@ -31,7 +31,8 @@ type MsgDocModelInterface interface { Create(ctx context.Context, model *MsgDocModel) error UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error) - GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, beginSeq, endSeq int64) ([]*sdkws.MsgData, error) + GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) ([]*sdkws.MsgData, error) + GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) GetNewestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error) GetOldestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error) Delete(ctx context.Context, docIDs []string) error @@ -97,13 +98,6 @@ func (m MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[st return t } -func (m MsgDocModel) GetSeqsBeginEnd(seqs []int64) (int64, int64) { - if len(seqs) == 0 { - return 0, 0 - } - return seqs[len(seqs)-1], seqs[0] -} - func (m MsgDocModel) GetMsgIndex(seq int64) int64 { seqSuffix := seq / singleGocMsgNum var index int64 diff --git a/pkg/common/db/unrelation/msg.go b/pkg/common/db/unrelation/msg.go index b0ace47ae..0db2f2faf 100644 --- a/pkg/common/db/unrelation/msg.go +++ b/pkg/common/db/unrelation/msg.go @@ -57,6 +57,37 @@ func (m *MsgMongoDriver) FindOneByDocID(ctx context.Context, docID string) (*tab return doc, err } +func (m *MsgMongoDriver) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) { + doc, err := m.FindOneByDocID(ctx, docID) + if err != nil { + return nil, nil, nil, err + } + singleCount := 0 + var hasSeqList []int64 + for i := 0; i < len(doc.Msg); i++ { + var msg sdkws.MsgData + if err := proto.Unmarshal(doc.Msg[i].Msg, &msg); err != nil { + return nil, nil, nil, err + } + if utils.Contain(msg.Seq, seqs...) { + indexes = append(indexes, i) + seqMsgs = append(seqMsgs, &msg) + hasSeqList = append(hasSeqList, msg.Seq) + singleCount++ + if singleCount == len(seqs) { + break + } + } + } + for _, i := range seqs { + if utils.Contain(i, hasSeqList...) { + continue + } + unExistSeqs = append(unExistSeqs, i) + } + return seqMsgs, indexes, unExistSeqs, nil +} + func (m *MsgMongoDriver) GetMsgsByIndex(ctx context.Context, conversationID string, index int64) (*table.MsgDocModel, error) { findOpts := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"doc_id": 1}) cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)}}, findOpts) @@ -134,7 +165,8 @@ func (m *MsgMongoDriver) UpdateOneDoc(ctx context.Context, msg *table.MsgDocMode return err } -func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, beginSeq, endSeq int64) (msgs []*sdkws.MsgData, err error) { +func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) (msgs []*sdkws.MsgData, err error) { + beginSeq, endSeq := utils.GetSeqsBeginEnd(seqs) beginIndex := m.msg.GetMsgIndex(beginSeq) num := endSeq - beginSeq + 1 pipeline := bson.A{ @@ -165,7 +197,7 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID strin break } } - log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg)) + log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg), "docID", docID) for _, v := range doc.Msg { var msg sdkws.MsgData if err := proto.Unmarshal(v.Msg, &msg); err != nil { diff --git a/pkg/proto/friend/validate.go b/pkg/proto/friend/validate.go index aea3d09e5..9a864cfa5 100644 --- a/pkg/proto/friend/validate.go +++ b/pkg/proto/friend/validate.go @@ -3,7 +3,6 @@ package friend import "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" func (m *ApplyToAddFriendReq) Check() error { - *m = ApplyToAddFriendReq{} if m.GetToUserID() == "" { return errs.ErrArgs.Wrap("get toUserID is empty") } diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go index 3160235a4..fd72d0d27 100644 --- a/pkg/rpcclient/msg.go +++ b/pkg/rpcclient/msg.go @@ -41,7 +41,7 @@ func newContentTypeConf() map[int32]config.NotificationConf { // user constant.UserInfoUpdatedNotification: config.Config.Notification.UserInfoUpdated, // friend - constant.FriendApplicationNotification: config.Config.Notification.FriendApplication, + constant.FriendApplicationNotification: config.Config.Notification.FriendApplicationAdded, constant.FriendApplicationApprovedNotification: config.Config.Notification.FriendApplicationApproved, constant.FriendApplicationRejectedNotification: config.Config.Notification.FriendApplicationRejected, constant.FriendAddedNotification: config.Config.Notification.FriendAdded, diff --git a/pkg/utils/options.go b/pkg/utils/options.go index 49a4325d1..4381d8132 100644 --- a/pkg/utils/options.go +++ b/pkg/utils/options.go @@ -7,7 +7,7 @@ type OptionsOpt func(Options) func NewOptions(opts ...OptionsOpt) Options { options := make(map[string]bool, 11) - options[constant.IsNotification] = false + options[constant.IsNotNotification] = false options[constant.IsSendMsg] = false options[constant.IsHistory] = false options[constant.IsPersistent] = false @@ -32,9 +32,9 @@ func WithOptions(options Options, opts ...OptionsOpt) Options { return options } -func WithNotification(b bool) OptionsOpt { +func WithNotNotification(b bool) OptionsOpt { return func(options Options) { - options[constant.IsNotification] = b + options[constant.IsNotNotification] = b } } @@ -113,7 +113,7 @@ func (o Options) Is(notification string) bool { } func (o Options) IsNotNotification() bool { - return o.Is(constant.IsNotification) + return o.Is(constant.IsNotNotification) } func (o Options) IsSendMsg() bool { diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 4e28b631c..903abc78d 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -170,6 +170,43 @@ func GetHashCode(s string) uint32 { return crc32.ChecksumIEEE([]byte(s)) } +func MsgIsNotification(msg *sdkws.MsgData) bool { + options := Options(msg.Options) + return !options.IsNotNotification() +} + +func GetNotificationConversationID(msg *sdkws.MsgData) string { + switch msg.SessionType { + case constant.SingleChatType: + l := []string{msg.SendID, msg.RecvID} + sort.Strings(l) + return "n_" + strings.Join(l, "_") + case constant.GroupChatType: + return "n_" + msg.GroupID + case constant.SuperGroupChatType: + return "n_" + msg.GroupID + case constant.NotificationChatType: + return "n_" + msg.SendID + "_" + msg.RecvID + } + return "" +} + +func GetChatConversationIDByMsg(msg *sdkws.MsgData) string { + switch msg.SessionType { + case constant.SingleChatType: + l := []string{msg.SendID, msg.RecvID} + sort.Strings(l) + return "si_" + strings.Join(l, "_") + case constant.GroupChatType: + return "g_" + msg.GroupID + case constant.SuperGroupChatType: + return "sg_" + msg.GroupID + case constant.NotificationChatType: + return "sn_" + msg.SendID + "_" + msg.RecvID + } + return "" +} + func GetConversationIDByMsg(msg *sdkws.MsgData) string { options := Options(msg.Options) switch msg.SessionType { @@ -252,6 +289,13 @@ func GetSelfNotificationConversationID(userID string) []string { return []string{"n_" + userID + "_" + userID, "si_" + userID + "_" + userID} } +func GetSeqsBeginEnd(seqs []int64) (int64, int64) { + if len(seqs) == 0 { + return 0, 0 + } + return seqs[0], seqs[len(seqs)-1] +} + type MsgBySeq []*sdkws.MsgData func (s MsgBySeq) Len() int {