From 2446a5d48c7a6fda4f2bc4e732944e531278083b Mon Sep 17 00:00:00 2001
From: wangchuxiao <wangchuxiao97@outlook.com>
Date: Tue, 16 May 2023 16:31:35 +0800
Subject: [PATCH] msgCache

---
 pkg/common/db/cache/conversation.go           |   8 +-
 pkg/common/db/cache/msg.go                    |  82 +++++++-----
 pkg/common/db/controller/black.go             |  38 +++---
 pkg/common/db/controller/conversation.go      |  75 +++++++----
 pkg/common/db/controller/friend.go            |  49 +++----
 pkg/common/db/controller/group.go             | 125 ++++++++++--------
 .../db/controller/{common_msg.go => msg.go}   |  46 +------
 pkg/common/db/controller/user.go              |  40 +++---
 pkg/common/db/table/unrelation/msg.go         |  10 +-
 pkg/common/db/unrelation/msg.go               |  34 ++++-
 pkg/utils/utils.go                            |   7 +
 11 files changed, 279 insertions(+), 235 deletions(-)
 rename pkg/common/db/controller/{common_msg.go => msg.go} (95%)

diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go
index 77979e153..f52ef6cd4 100644
--- a/pkg/common/db/cache/conversation.go
+++ b/pkg/common/db/cache/conversation.go
@@ -30,10 +30,10 @@ type ConversationCache interface {
 	NewCache() ConversationCache
 	// get user's conversationIDs from msgCache
 	GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error)
-	DelConversationIDs(userIDs []string) ConversationCache
+	DelConversationIDs(userIDs ...string) ConversationCache
 	// get one conversation from msgCache
 	GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationTb.ConversationModel, error)
-	DelConvsersations(ownerUserID string, conversationIDs []string) ConversationCache
+	DelConvsersations(ownerUserID string, conversationIDs ...string) ConversationCache
 	DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache
 	// get one conversation from msgCache
 	GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error)
@@ -97,7 +97,7 @@ func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, own
 	})
 }
 
-func (c *ConversationRedisCache) DelConversationIDs(userIDs []string) ConversationCache {
+func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) ConversationCache {
 	var keys []string
 	for _, userID := range userIDs {
 		keys = append(keys, c.getConversationIDsKey(userID))
@@ -113,7 +113,7 @@ func (c *ConversationRedisCache) GetConversation(ctx context.Context, ownerUserI
 	})
 }
 
-func (c *ConversationRedisCache) DelConvsersations(ownerUserID string, convsersationIDs []string) ConversationCache {
+func (c *ConversationRedisCache) DelConvsersations(ownerUserID string, convsersationIDs ...string) ConversationCache {
 	var keys []string
 	for _, conversationID := range convsersationIDs {
 		keys = append(keys, c.getConversationKey(ownerUserID, conversationID))
diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go
index 89f6c9967..038eee303 100644
--- a/pkg/common/db/cache/msg.go
+++ b/pkg/common/db/cache/msg.go
@@ -7,9 +7,11 @@ import (
 	"time"
 
 	"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
+	"github.com/dtm-labs/rockscache"
 
 	"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
 	"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
+	unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
 	"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
 	"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
 	"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
@@ -29,6 +31,7 @@ const (
 	getuiToken       = "GETUI_TOKEN"
 	getuiTaskID      = "GETUI_TASK_ID"
 	messageCache     = "MESSAGE_CACHE:"
+	messageReadCache = "MESSAGE_READ_CACHE:"
 	signalCache      = "SIGNAL_CACHE:"
 	signalListCache  = "SIGNAL_LIST_CACHE:"
 	fcmToken         = "FCM_TOKEN:"
@@ -84,6 +87,9 @@ type MsgModel interface {
 	SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
 	LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
 	UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
+
+	GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*sdkws.MsgData, error)
+	DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel
 }
 
 func NewMsgCacheModel(client redis.UniversalClient) MsgModel {
@@ -91,39 +97,11 @@ func NewMsgCacheModel(client redis.UniversalClient) MsgModel {
 }
 
 type msgCache struct {
-	rdb redis.UniversalClient
-}
-
-// 兼容老版本调用
-func (c *msgCache) DelKeys() {
-	for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE:", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE:", "JOINED_GROUP_LIST_CACHE:",
-		"GROUP_MEMBER_INFO_CACHE:", "GROUP_ALL_MEMBER_INFO_CACHE:", "ALL_FRIEND_INFO_CACHE:"} {
-		fName := utils.GetSelfFuncName()
-		var cursor uint64
-		var n int
-		for {
-			var keys []string
-			var err error
-			keys, cursor, err = c.rdb.Scan(context.Background(), cursor, key+"*", scanCount).Result()
-			if err != nil {
-				panic(err.Error())
-			}
-			n += len(keys)
-			// for each for redis cluster
-			for _, key := range keys {
-				if err = c.rdb.Del(context.Background(), key).Err(); err != nil {
-					log.NewError("", fName, key, err.Error())
-					err = c.rdb.Del(context.Background(), key).Err()
-					if err != nil {
-						panic(err.Error())
-					}
-				}
-			}
-			if cursor == 0 {
-				break
-			}
-		}
-	}
+	metaCache
+	rdb            redis.UniversalClient
+	expireTime     time.Duration
+	rcClient       *rockscache.Client
+	msgDocDatabase unRelationTb.MsgDocModelInterface
 }
 
 func (c *msgCache) getMaxSeqKey(conversationID string) string {
@@ -550,3 +528,41 @@ func (c *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID
 func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
 	return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err())
 }
+
+func (c *msgCache) NewCache() MsgModel {
+	return &msgCache{
+		metaCache:  NewMetaCacheRedis(c.rcClient, c.metaCache.GetPreDelKeys()...),
+		expireTime: c.expireTime,
+		rcClient:   c.rcClient,
+	}
+}
+
+func (c msgCache) getMsgReadCacheKey(docID string, seq int64) string {
+	return messageReadCache + docID + "_" + strconv.Itoa(int(seq))
+}
+
+func (c *msgCache) getMsgsIndex(msg *sdkws.MsgData, keys []string) (int, error) {
+	key := c.getMsgReadCacheKey(utils.GetConversationIDByMsg(msg), msg.Seq)
+	for i, _key := range keys {
+		if key == _key {
+			return i, nil
+		}
+	}
+	return 0, errIndex
+}
+
+func (c *msgCache) GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*sdkws.MsgData, error) {
+	var keys []string
+	for _, seq := range seqs {
+		keys = append(keys, c.getMsgReadCacheKey(docID, seq))
+	}
+	return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getMsgsIndex, func(ctx context.Context) ([]*sdkws.MsgData, error) {
+		return c.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs)
+	})
+}
+
+func (c *msgCache) DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel {
+	cache := c.NewCache()
+	c.AddKeys(c.getMsgReadCacheKey(docID, seq))
+	return cache
+}
diff --git a/pkg/common/db/controller/black.go b/pkg/common/db/controller/black.go
index 07380053a..7479722b8 100644
--- a/pkg/common/db/controller/black.go
+++ b/pkg/common/db/controller/black.go
@@ -2,10 +2,10 @@ package controller
 
 import (
 	"context"
+
 	"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
 	"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
-	"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
-	"gorm.io/gorm"
+	"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
 )
 
 type BlackDatabase interface {
@@ -31,12 +31,26 @@ func NewBlackDatabase(black relation.BlackModelInterface, cache cache.BlackCache
 
 // Create 增加黑名单
 func (b *blackDatabase) Create(ctx context.Context, blacks []*relation.BlackModel) (err error) {
-	return b.black.Create(ctx, blacks)
+	if err := b.black.Create(ctx, blacks); err != nil {
+		return err
+	}
+	return b.deleteBlackIDsCache(ctx, blacks)
 }
 
 // Delete 删除黑名单
 func (b *blackDatabase) Delete(ctx context.Context, blacks []*relation.BlackModel) (err error) {
-	return b.black.Delete(ctx, blacks)
+	if err := b.black.Delete(ctx, blacks); err != nil {
+		return err
+	}
+	return b.deleteBlackIDsCache(ctx, blacks)
+}
+
+func (b *blackDatabase) deleteBlackIDsCache(ctx context.Context, blacks []*relation.BlackModel) (err error) {
+	cache := b.cache.NewCache()
+	for _, black := range blacks {
+		cache = cache.DelBlackIDs(ctx, black.OwnerUserID)
+	}
+	return cache.ExecDel(ctx)
 }
 
 // FindOwnerBlacks 获取黑名单列表
@@ -46,21 +60,15 @@ func (b *blackDatabase) FindOwnerBlacks(ctx context.Context, ownerUserID string,
 
 // CheckIn 检查user2是否在user1的黑名单列表中(inUser1Blacks==true) 检查user1是否在user2的黑名单列表中(inUser2Blacks==true)
 func (b *blackDatabase) CheckIn(ctx context.Context, userID1, userID2 string) (inUser1Blacks bool, inUser2Blacks bool, err error) {
-	_, err = b.black.Take(ctx, userID1, userID2)
+	userID1BlackIDs, err := b.cache.GetBlackIDs(ctx, userID1)
 	if err != nil {
-		if errs.Unwrap(err) != gorm.ErrRecordNotFound {
-			return
-		}
+		return
 	}
-	inUser1Blacks = err == nil
-	_, err = b.black.Take(ctx, userID2, userID1)
+	userID2BlackIDs, err := b.cache.GetBlackIDs(ctx, userID2)
 	if err != nil {
-		if errs.Unwrap(err) != gorm.ErrRecordNotFound {
-			return
-		}
+		return
 	}
-	inUser2Blacks = err == nil
-	return inUser1Blacks, inUser2Blacks, nil
+	return utils.IsContain(userID2, userID1BlackIDs), utils.IsContain(userID1, userID2BlackIDs), nil
 }
 
 func (b *blackDatabase) FindBlackIDs(ctx context.Context, ownerUserID string) (blackIDs []string, err error) {
diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go
index aa3eec74d..bcdd36eba 100644
--- a/pkg/common/db/controller/conversation.go
+++ b/pkg/common/db/controller/conversation.go
@@ -46,14 +46,14 @@ type ConversationDataBase struct {
 	tx             tx.Tx
 }
 
-func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error {
-	return c.tx.Transaction(func(tx any) error {
+func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) (err error) {
+	cache := c.cache.NewCache()
+	if err := c.tx.Transaction(func(tx any) error {
 		conversationTx := c.conversationDB.NewTx(tx)
 		haveUserIDs, err := conversationTx.FindUserID(ctx, userIDs, []string{conversation.ConversationID})
 		if err != nil {
 			return err
 		}
-		cache := c.cache.NewCache()
 		if len(haveUserIDs) > 0 {
 			_, err = conversationTx.UpdateByMap(ctx, haveUserIDs, conversation.ConversationID, filedMap)
 			if err != nil {
@@ -71,19 +71,20 @@ func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context,
 			}
 			temp.OwnerUserID = v
 			conversations = append(conversations, temp)
-		}
 
+		}
 		if len(conversations) > 0 {
 			err = conversationTx.Create(ctx, conversations)
 			if err != nil {
 				return err
 			}
-			cache = cache.DelConversationIDs(NotUserIDs)
+			cache = cache.DelConversationIDs(NotUserIDs...)
 		}
-		// clear cache
-		log.ZDebug(ctx, "SetUsersConversationFiledTx", "cache", cache.GetPreDelKeys(), "addr", &cache)
-		return cache.ExecDel(ctx)
-	})
+		return nil
+	}); err != nil {
+		return err
+	}
+	return cache.ExecDel(ctx)
 }
 
 func (c *ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) error {
@@ -98,13 +99,17 @@ func (c *ConversationDataBase) CreateConversation(ctx context.Context, conversat
 	if err := c.conversationDB.Create(ctx, conversations); err != nil {
 		return err
 	}
-	return nil
+	var userIDs []string
+	for _, conversation := range conversations {
+		userIDs = append(userIDs, conversation.OwnerUserID)
+	}
+	return c.cache.DelConversationIDs(userIDs...).ExecDel(ctx)
 }
 
 func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationTb.ConversationModel) error {
-	return c.tx.Transaction(func(tx any) error {
+	cache := c.cache.NewCache()
+	if err := c.tx.Transaction(func(tx any) error {
 		conversationTx := c.conversationDB.NewTx(tx)
-		cache := c.cache.NewCache()
 		for _, conversation := range conversations {
 			for _, v := range [][2]string{{conversation.OwnerUserID, conversation.UserID}, {conversation.UserID, conversation.OwnerUserID}} {
 				haveUserIDs, err := conversationTx.FindUserID(ctx, []string{v[0]}, []string{conversation.ConversationID})
@@ -126,12 +131,15 @@ func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Con
 					if err := conversationTx.Create(ctx, []*relationTb.ConversationModel{&newConversation}); err != nil {
 						return err
 					}
-					cache = cache.DelConversationIDs([]string{v[0]})
+					cache = cache.DelConversationIDs([]string{v[0]}...)
 				}
 			}
 		}
-		return c.cache.ExecDel(ctx)
-	})
+		return nil
+	}); err != nil {
+		return err
+	}
+	return c.cache.ExecDel(ctx)
 }
 
 func (c *ConversationDataBase) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) {
@@ -147,7 +155,8 @@ func (c *ConversationDataBase) GetUserAllConversation(ctx context.Context, owner
 }
 
 func (c *ConversationDataBase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error {
-	return c.tx.Transaction(func(tx any) error {
+	cache := c.cache.NewCache()
+	if err := c.tx.Transaction(func(tx any) error {
 		var conversationIDs []string
 		for _, conversation := range conversations {
 			conversationIDs = append(conversationIDs, conversation.ConversationID)
@@ -181,13 +190,14 @@ func (c *ConversationDataBase) SetUserConversations(ctx context.Context, ownerUs
 			if err != nil {
 				return err
 			}
+			cache = cache.DelConversationIDs([]string{ownerUserID}...)
 		}
-		cache := c.cache.NewCache()
-		if len(notExistConversations) > 0 {
-			cache = cache.DelConversationIDs([]string{ownerUserID})
-		}
-		return cache.DelConvsersations(ownerUserID, existConversationIDs).ExecDel(ctx)
-	})
+		cache = cache.DelConvsersations(ownerUserID, existConversationIDs...)
+		return nil
+	}); err != nil {
+		return err
+	}
+	return cache.ExecDel(ctx)
 }
 
 func (c *ConversationDataBase) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) {
@@ -195,27 +205,36 @@ func (c *ConversationDataBase) FindRecvMsgNotNotifyUserIDs(ctx context.Context,
 }
 
 func (c *ConversationDataBase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error {
+	cache := c.cache.NewCache()
 	conversationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID)
-	return c.tx.Transaction(func(tx any) error {
-		existConversationUserIDs, err := c.conversationDB.FindUserID(ctx, userIDs, []string{groupID})
+	if err := c.tx.Transaction(func(tx any) error {
+		existConversationUserIDs, err := c.conversationDB.FindUserID(ctx, userIDs, []string{conversationID})
 		if err != nil {
 			return err
 		}
 		notExistUserIDs := utils.DifferenceString(userIDs, existConversationUserIDs)
-
 		var conversations []*relationTb.ConversationModel
 		for _, v := range notExistUserIDs {
 			conversation := relationTb.ConversationModel{ConversationType: constant.SuperGroupChatType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID}
 			conversations = append(conversations, &conversation)
 		}
+		cache = cache.DelConversationIDs(notExistUserIDs...)
 		err = c.conversationDB.Create(ctx, conversations)
 		if err != nil {
 			return err
 		}
-		_, err = c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, map[string]interface{}{"max_seq": 0})
+		_, err = c.conversationDB.UpdateByMap(ctx, existConversationUserIDs, conversationID, map[string]interface{}{"max_seq": 0})
+		if err != nil {
+			return err
+		}
+		for _, v := range existConversationUserIDs {
+			cache = cache.DelConvsersations(v, conversationID)
+		}
+		return nil
+	}); err != nil {
 		return err
-	})
-
+	}
+	return cache.ExecDel(ctx)
 }
 
 func (c *ConversationDataBase) GetConversationIDs(ctx context.Context, userID string) ([]string, error) {
diff --git a/pkg/common/db/controller/friend.go b/pkg/common/db/controller/friend.go
index e24fc9165..1c3fc0364 100644
--- a/pkg/common/db/controller/friend.go
+++ b/pkg/common/db/controller/friend.go
@@ -55,19 +55,15 @@ func NewFriendDatabase(friend relation.FriendModelInterface, friendRequest relat
 
 // ok 检查user2是否在user1的好友列表中(inUser1Friends==true) 检查user1是否在user2的好友列表中(inUser2Friends==true)
 func (f *friendDatabase) CheckIn(ctx context.Context, userID1, userID2 string) (inUser1Friends bool, inUser2Friends bool, err error) {
-	friends, err := f.friend.FindUserState(ctx, userID1, userID2)
+	userID1FriendIDs, err := f.cache.GetFriendIDs(ctx, userID1)
 	if err != nil {
-		return false, false, err
+		return
 	}
-	for _, v := range friends {
-		if v.OwnerUserID == userID1 && v.FriendUserID == userID2 {
-			inUser1Friends = true
-		}
-		if v.OwnerUserID == userID2 && v.FriendUserID == userID1 {
-			inUser2Friends = true
-		}
+	userID2FriendIDs, err := f.cache.GetFriendIDs(ctx, userID2)
+	if err != nil {
+		return
 	}
-	return
+	return utils.IsContain(userID2, userID1FriendIDs), utils.IsContain(userID1, userID2FriendIDs), nil
 }
 
 // 增加或者更新好友申请 如果之前有记录则更新,没有记录则新增
@@ -100,7 +96,8 @@ func (f *friendDatabase) AddFriendRequest(ctx context.Context, fromUserID, toUse
 
 // (1)先判断是否在好友表 (在不在都不返回错误) (2)对于不在好友列表的 插入即可
 func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string, friendUserIDs []string, addSource int32) (err error) {
-	return f.tx.Transaction(func(tx any) error {
+	cache := f.cache.NewCache()
+	if err := f.tx.Transaction(func(tx any) error {
 		//先find 找出重复的 去掉重复的
 		fs1, err := f.friend.NewTx(tx).FindFriends(ctx, ownerUserID, friendUserIDs)
 		if err != nil {
@@ -135,8 +132,12 @@ func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string,
 			return err
 		}
 		newFriendIDs = append(newFriendIDs, ownerUserID)
-		return f.cache.DelFriendIDs(newFriendIDs...).ExecDel(ctx)
-	})
+		cache = cache.DelFriendIDs(newFriendIDs...)
+		return nil
+	}); err != nil {
+		return nil
+	}
+	return cache.ExecDel(ctx)
 }
 
 // 拒绝好友申请 (1)检查是否有申请记录且为未处理状态 (没有记录返回错误) (2)修改申请记录 已拒绝
@@ -199,24 +200,18 @@ func (f *friendDatabase) AgreeFriendRequest(ctx context.Context, friendRequest *
 
 // 删除好友  外部判断是否好友关系
 func (f *friendDatabase) Delete(ctx context.Context, ownerUserID string, friendUserIDs []string) (err error) {
-	return f.tx.Transaction(func(tx any) error {
-		if err := f.friend.Delete(ctx, ownerUserID, friendUserIDs); err != nil {
-			return err
-		}
-		return f.cache.DelFriendIDs(append(friendUserIDs, ownerUserID)...).ExecDel(ctx)
-	})
-
+	if err := f.friend.Delete(ctx, ownerUserID, friendUserIDs); err != nil {
+		return err
+	}
+	return f.cache.DelFriendIDs(append(friendUserIDs, ownerUserID)...).ExecDel(ctx)
 }
 
 // 更新好友备注 零值也支持
 func (f *friendDatabase) UpdateRemark(ctx context.Context, ownerUserID, friendUserID, remark string) (err error) {
-	return f.tx.Transaction(func(tx any) error {
-		err := f.friend.UpdateRemark(ctx, ownerUserID, friendUserID, remark)
-		if err != nil {
-			return err
-		}
-		return f.cache.DelFriend(ownerUserID, friendUserID).ExecDel(ctx)
-	})
+	if err := f.friend.UpdateRemark(ctx, ownerUserID, friendUserID, remark); err != nil {
+		return err
+	}
+	return f.cache.DelFriend(ownerUserID, friendUserID).ExecDel(ctx)
 }
 
 // 获取ownerUserID的好友列表 无结果不返回错误
diff --git a/pkg/common/db/controller/group.go b/pkg/common/db/controller/group.go
index 57c5f1fb2..7f5cc4101 100644
--- a/pkg/common/db/controller/group.go
+++ b/pkg/common/db/controller/group.go
@@ -113,7 +113,8 @@ func (g *groupDatabase) FindGroupMemberUserID(ctx context.Context, groupID strin
 }
 
 func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*relationTb.GroupModel, groupMembers []*relationTb.GroupMemberModel) error {
-	return g.tx.Transaction(func(tx any) error {
+	var cache = g.cache.NewCache()
+	if err := g.tx.Transaction(func(tx any) error {
 		if len(groups) > 0 {
 			if err := g.groupDB.NewTx(tx).Create(ctx, groups); err != nil {
 				return err
@@ -128,7 +129,7 @@ func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*relationTb.Gr
 			return group.GroupID
 		})
 		m := make(map[string]struct{})
-		var cache = g.cache.NewCache()
+
 		for _, groupMember := range groupMembers {
 			if _, ok := m[groupMember.GroupID]; !ok {
 				m[groupMember.GroupID] = struct{}{}
@@ -137,8 +138,11 @@ func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*relationTb.Gr
 			cache = cache.DelJoinedGroupID(groupMember.UserID).DelGroupMembersInfo(groupMember.GroupID, groupMember.UserID)
 		}
 		cache = cache.DelGroupsInfo(createGroupIDs...)
-		return cache.ExecDel(ctx)
-	})
+		return nil
+	}); err != nil {
+		return err
+	}
+	return cache.ExecDel(ctx)
 }
 
 func (g *groupDatabase) TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) {
@@ -154,16 +158,15 @@ func (g *groupDatabase) SearchGroup(ctx context.Context, keyword string, pageNum
 }
 
 func (g *groupDatabase) UpdateGroup(ctx context.Context, groupID string, data map[string]any) error {
-	return g.tx.Transaction(func(tx any) error {
-		if err := g.groupDB.NewTx(tx).UpdateMap(ctx, groupID, data); err != nil {
-			return err
-		}
-		return g.cache.DelGroupsInfo(groupID).ExecDel(ctx)
-	})
+	if err := g.groupDB.UpdateMap(ctx, groupID, data); err != nil {
+		return err
+	}
+	return g.cache.DelGroupsInfo(groupID).ExecDel(ctx)
 }
 
 func (g *groupDatabase) DismissGroup(ctx context.Context, groupID string) error {
-	return g.tx.Transaction(func(tx any) error {
+	cache := g.cache.NewCache()
+	if err := g.tx.Transaction(func(tx any) error {
 		if err := g.groupDB.NewTx(tx).UpdateStatus(ctx, groupID, constant.GroupStatusDismissed); err != nil {
 			return err
 		}
@@ -174,8 +177,12 @@ func (g *groupDatabase) DismissGroup(ctx context.Context, groupID string) error
 		if err != nil {
 			return err
 		}
-		return g.cache.DelJoinedGroupID(userIDs...).DelGroupsInfo(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelGroupMembersHash(groupID).ExecDel(ctx)
-	})
+		cache = cache.DelJoinedGroupID(userIDs...).DelGroupsInfo(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelGroupMembersHash(groupID)
+		return nil
+	}); err != nil {
+		return err
+	}
+	return cache.ExecDel(ctx)
 }
 
 func (g *groupDatabase) TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error) {
@@ -236,7 +243,8 @@ func (g *groupDatabase) SearchGroupMember(ctx context.Context, keyword string, g
 }
 
 func (g *groupDatabase) HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relationTb.GroupMemberModel) error {
-	return g.tx.Transaction(func(tx any) error {
+	cache := g.cache.NewCache()
+	if err := g.tx.Transaction(func(tx any) error {
 		if err := g.groupRequestDB.NewTx(tx).UpdateHandler(ctx, groupID, userID, handledMsg, handleResult); err != nil {
 			return err
 		}
@@ -244,19 +252,20 @@ func (g *groupDatabase) HandlerGroupRequest(ctx context.Context, groupID string,
 			if err := g.groupMemberDB.NewTx(tx).Create(ctx, []*relationTb.GroupMemberModel{member}); err != nil {
 				return err
 			}
-			return g.cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(member.UserID).ExecDel(ctx)
+			cache = cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(member.UserID)
 		}
 		return nil
-	})
+	}); err != nil {
+		return err
+	}
+	return cache.ExecDel(ctx)
 }
 
 func (g *groupDatabase) DeleteGroupMember(ctx context.Context, groupID string, userIDs []string) error {
-	return g.tx.Transaction(func(tx any) error {
-		if err := g.groupMemberDB.NewTx(tx).Delete(ctx, groupID, userIDs); err != nil {
-			return err
-		}
-		return g.cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(userIDs...).DelGroupMembersInfo(groupID, userIDs...).ExecDel(ctx)
-	})
+	if err := g.groupMemberDB.Delete(ctx, groupID, userIDs); err != nil {
+		return err
+	}
+	return g.cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(userIDs...).DelGroupMembersInfo(groupID, userIDs...).ExecDel(ctx)
 }
 
 func (g *groupDatabase) MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) {
@@ -276,7 +285,7 @@ func (g *groupDatabase) MapGroupMemberNum(ctx context.Context, groupIDs []string
 }
 
 func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error {
-	return g.tx.Transaction(func(tx any) error {
+	if err := g.tx.Transaction(func(tx any) error {
 		rowsAffected, err := g.groupMemberDB.NewTx(tx).UpdateRoleLevel(ctx, groupID, oldOwnerUserID, roleLevel)
 		if err != nil {
 			return err
@@ -291,30 +300,34 @@ func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string,
 		if rowsAffected != 1 {
 			return utils.Wrap(fmt.Errorf("newOwnerUserID %s rowsAffected = %d", newOwnerUserID, rowsAffected), "")
 		}
-		return g.cache.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID).ExecDel(ctx)
-	})
+		return nil
+	}); err != nil {
+		return err
+	}
+	return g.cache.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID).ExecDel(ctx)
 }
 
 func (g *groupDatabase) UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error {
-	return g.tx.Transaction(func(tx any) error {
-		if err := g.groupMemberDB.NewTx(tx).Update(ctx, groupID, userID, data); err != nil {
-			return err
-		}
-		return g.cache.DelGroupMembersInfo(groupID, userID).ExecDel(ctx)
-	})
+	if err := g.groupMemberDB.Update(ctx, groupID, userID, data); err != nil {
+		return err
+	}
+	return g.cache.DelGroupMembersInfo(groupID, userID).ExecDel(ctx)
 }
 
 func (g *groupDatabase) UpdateGroupMembers(ctx context.Context, data []*relationTb.BatchUpdateGroupMember) error {
-	return g.tx.Transaction(func(tx any) error {
-		var cache = g.cache.NewCache()
+	var cache = g.cache.NewCache()
+	if err := g.tx.Transaction(func(tx any) error {
 		for _, item := range data {
 			if err := g.groupMemberDB.NewTx(tx).Update(ctx, item.GroupID, item.UserID, item.Map); err != nil {
 				return err
 			}
 			cache = cache.DelGroupMembersInfo(item.GroupID, item.UserID)
 		}
-		return cache.ExecDel(ctx)
-	})
+		return nil
+	}); err != nil {
+		return err
+	}
+	return cache.ExecDel(ctx)
 }
 
 func (g *groupDatabase) CreateGroupRequest(ctx context.Context, requests []*relationTb.GroupRequestModel) error {
@@ -346,16 +359,15 @@ func (g *groupDatabase) FindJoinSuperGroup(ctx context.Context, userID string) (
 }
 
 func (g *groupDatabase) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDs []string) error {
-	return g.ctxTx.Transaction(ctx, func(ctx context.Context) error {
-		if err := g.mongoDB.CreateSuperGroup(ctx, groupID, initMemberIDs); err != nil {
-			return err
-		}
-		return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(initMemberIDs...).ExecDel(ctx)
-	})
+	if err := g.mongoDB.CreateSuperGroup(ctx, groupID, initMemberIDs); err != nil {
+		return err
+	}
+	return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(initMemberIDs...).ExecDel(ctx)
 }
 
 func (g *groupDatabase) DeleteSuperGroup(ctx context.Context, groupID string) error {
-	return g.ctxTx.Transaction(ctx, func(ctx context.Context) error {
+	cache := g.cache.NewCache()
+	if err := g.ctxTx.Transaction(ctx, func(ctx context.Context) error {
 		if err := g.mongoDB.DeleteSuperGroup(ctx, groupID); err != nil {
 			return err
 		}
@@ -363,28 +375,27 @@ func (g *groupDatabase) DeleteSuperGroup(ctx context.Context, groupID string) er
 		if err != nil {
 			return err
 		}
-		cache := g.cache.DelSuperGroupMemberIDs(groupID)
+		cache = cache.DelSuperGroupMemberIDs(groupID)
 		if len(models) > 0 {
 			cache = cache.DelJoinedSuperGroupIDs(models[0].MemberIDs...)
 		}
-		return cache.ExecDel(ctx)
-	})
+		return nil
+	}); err != nil {
+		return err
+	}
+	return cache.ExecDel(ctx)
 }
 
 func (g *groupDatabase) DeleteSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error {
-	return g.ctxTx.Transaction(ctx, func(ctx context.Context) error {
-		if err := g.mongoDB.RemoverUserFromSuperGroup(ctx, groupID, userIDs); err != nil {
-			return err
-		}
-		return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx)
-	})
+	if err := g.mongoDB.RemoverUserFromSuperGroup(ctx, groupID, userIDs); err != nil {
+		return err
+	}
+	return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx)
 }
 
 func (g *groupDatabase) CreateSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error {
-	return g.ctxTx.Transaction(ctx, func(ctx context.Context) error {
-		if err := g.mongoDB.AddUserToSuperGroup(ctx, groupID, userIDs); err != nil {
-			return err
-		}
-		return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx)
-	})
+	if err := g.mongoDB.AddUserToSuperGroup(ctx, groupID, userIDs); err != nil {
+		return err
+	}
+	return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx)
 }
diff --git a/pkg/common/db/controller/common_msg.go b/pkg/common/db/controller/msg.go
similarity index 95%
rename from pkg/common/db/controller/common_msg.go
rename to pkg/common/db/controller/msg.go
index 819d20811..d544e0595 100644
--- a/pkg/common/db/controller/common_msg.go
+++ b/pkg/common/db/controller/msg.go
@@ -100,13 +100,12 @@ type commonMsgDatabase struct {
 	msgDocDatabase    unRelationTb.MsgDocModelInterface
 	extendMsgDatabase unRelationTb.ExtendMsgSetModelInterface
 	extendMsgSetModel unRelationTb.ExtendMsgSetModel
+	msg               unRelationTb.MsgDocModel
 	cache             cache.MsgModel
 	producer          *kafka.Producer
 	producerToMongo   *kafka.Producer
 	producerToModify  *kafka.Producer
 	producerToPush    *kafka.Producer
-	// model
-	msg unRelationTb.MsgDocModel
 }
 
 func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error {
@@ -277,7 +276,7 @@ func (db *commonMsgDatabase) DelMsgBySeqs(ctx context.Context, conversationID st
 }
 
 func (db *commonMsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) {
-	seqMsgs, indexes, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
+	seqMsgs, indexes, unExistSeqs, err := db.msgDocDatabase.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
 	if err != nil {
 		return nil, err
 	}
@@ -289,37 +288,6 @@ func (db *commonMsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID str
 	return unExistSeqs, nil
 }
 
-func (db *commonMsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) {
-	doc, err := db.msgDocDatabase.FindOneByDocID(ctx, docID)
-	if err != nil {
-		return nil, nil, nil, err
-	}
-	singleCount := 0
-	var hasSeqList []int64
-	for i := 0; i < len(doc.Msg); i++ {
-		msgPb, err := db.unmarshalMsg(&doc.Msg[i])
-		if err != nil {
-			return nil, nil, nil, err
-		}
-		if utils.Contain(msgPb.Seq, seqs...) {
-			indexes = append(indexes, i)
-			seqMsgs = append(seqMsgs, msgPb)
-			hasSeqList = append(hasSeqList, msgPb.Seq)
-			singleCount++
-			if singleCount == len(seqs) {
-				break
-			}
-		}
-	}
-	for _, i := range seqs {
-		if utils.Contain(i, hasSeqList...) {
-			continue
-		}
-		unExistSeqs = append(unExistSeqs, i)
-	}
-	return seqMsgs, indexes, unExistSeqs, nil
-}
-
 func (db *commonMsgDatabase) GetNewestMsg(ctx context.Context, conversationID string) (msgPb *sdkws.MsgData, err error) {
 	msgInfo, err := db.msgDocDatabase.GetNewestMsg(ctx, conversationID)
 	if err != nil {
@@ -395,12 +363,11 @@ func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversatio
 }
 
 func (db *commonMsgDatabase) findMsgBySeq(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, unExistSeqs []int64, err error) {
-	beginSeq, endSeq := db.msg.GetSeqsBeginEnd(seqs)
-	msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, beginSeq, endSeq)
+	msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs)
 	if err != nil {
 		return nil, nil, err
 	}
-	log.ZDebug(ctx, "findMsgBySeq", "docID", docID, "seqs", seqs, "beginSeq", beginSeq, "endSeq", endSeq, "len(msgs)", len(msgs))
+	log.ZDebug(ctx, "findMsgBySeq", "docID", docID, "seqs", seqs, "len(msgs)", len(msgs))
 	seqMsgs = append(seqMsgs, msgs...)
 	if len(msgs) == 0 {
 		unExistSeqs = seqs
@@ -416,7 +383,7 @@ func (db *commonMsgDatabase) findMsgBySeq(ctx context.Context, docID string, seq
 			}
 		}
 	}
-	msgs, _, unExistSeqs, err = db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, unExistSeqs)
+	msgs, _, unExistSeqs, err = db.msgDocDatabase.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, unExistSeqs)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -446,7 +413,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, conversation
 		m = db.msg.GetDocIDSeqsMap(conversationID, totalNotExistSeqs)
 		for docID, seqs := range m {
 			docID = db.msg.ToNextDoc(docID)
-			msgs, _, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
+			msgs, _, unExistSeqs, err := db.msgDocDatabase.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
 			if err != nil {
 				missedSeqs = append(missedSeqs, seqs...)
 				log.ZError(ctx, "get message from mongo exception", err, "docID", docID, "seqs", seqs)
@@ -477,7 +444,6 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, conversation
 	if len(totalNotExistSeqs) > 0 || len(delSeqs) > 0 {
 		sort.Sort(utils.MsgBySeq(seqMsgs))
 	}
-	// missSeqs为依然缺失的
 	return seqMsgs, nil
 }
 
diff --git a/pkg/common/db/controller/user.go b/pkg/common/db/controller/user.go
index 019942cda..72b1a9c49 100644
--- a/pkg/common/db/controller/user.go
+++ b/pkg/common/db/controller/user.go
@@ -76,40 +76,36 @@ func (u *userDatabase) Find(ctx context.Context, userIDs []string) (users []*rel
 
 // 插入多条 外部保证userID 不重复 且在db中不存在
 func (u *userDatabase) Create(ctx context.Context, users []*relation.UserModel) (err error) {
-	return u.tx.Transaction(func(tx any) error {
+	if err := u.tx.Transaction(func(tx any) error {
 		err = u.userDB.Create(ctx, users)
 		if err != nil {
 			return err
 		}
-		var userIDs []string
-		for _, user := range users {
-			userIDs = append(userIDs, user.UserID)
-		}
-		return u.cache.DelUsersInfo(userIDs...).ExecDel(ctx)
-	})
+		return nil
+	}); err != nil {
+		return err
+	}
+	var userIDs []string
+	for _, user := range users {
+		userIDs = append(userIDs, user.UserID)
+	}
+	return u.cache.DelUsersInfo(userIDs...).ExecDel(ctx)
 }
 
 // 更新(非零值) 外部保证userID存在
 func (u *userDatabase) Update(ctx context.Context, user *relation.UserModel) (err error) {
-	return u.tx.Transaction(func(tx any) error {
-		err = u.userDB.Update(ctx, user)
-		if err != nil {
-			return err
-		}
-		return u.cache.DelUsersInfo(user.UserID).ExecDel(ctx)
-	})
+	if err := u.userDB.Update(ctx, user); err != nil {
+		return err
+	}
+	return u.cache.DelUsersInfo(user.UserID).ExecDel(ctx)
 }
 
 // 更新(零值) 外部保证userID存在
 func (u *userDatabase) UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error) {
-	return u.tx.Transaction(func(tx any) error {
-		err = u.userDB.UpdateByMap(ctx, userID, args)
-		if err != nil {
-			return err
-		}
-		return u.cache.DelUsersInfo(userID).ExecDel(ctx)
-	})
-
+	if err := u.userDB.UpdateByMap(ctx, userID, args); err != nil {
+		return err
+	}
+	return u.cache.DelUsersInfo(userID).ExecDel(ctx)
 }
 
 // 获取,如果没找到,不返回错误
diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/unrelation/msg.go
index 6e6916cb1..7d67269d5 100644
--- a/pkg/common/db/table/unrelation/msg.go
+++ b/pkg/common/db/table/unrelation/msg.go
@@ -31,7 +31,8 @@ type MsgDocModelInterface interface {
 	Create(ctx context.Context, model *MsgDocModel) error
 	UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error
 	FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error)
-	GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, beginSeq, endSeq int64) ([]*sdkws.MsgData, error)
+	GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) ([]*sdkws.MsgData, error)
+	GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error)
 	GetNewestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error)
 	GetOldestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error)
 	Delete(ctx context.Context, docIDs []string) error
@@ -97,13 +98,6 @@ func (m MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[st
 	return t
 }
 
-func (m MsgDocModel) GetSeqsBeginEnd(seqs []int64) (int64, int64) {
-	if len(seqs) == 0 {
-		return 0, 0
-	}
-	return seqs[len(seqs)-1], seqs[0]
-}
-
 func (m MsgDocModel) GetMsgIndex(seq int64) int64 {
 	seqSuffix := seq / singleGocMsgNum
 	var index int64
diff --git a/pkg/common/db/unrelation/msg.go b/pkg/common/db/unrelation/msg.go
index b0ace47ae..7e738864b 100644
--- a/pkg/common/db/unrelation/msg.go
+++ b/pkg/common/db/unrelation/msg.go
@@ -57,6 +57,37 @@ func (m *MsgMongoDriver) FindOneByDocID(ctx context.Context, docID string) (*tab
 	return doc, err
 }
 
+func (m *MsgMongoDriver) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) {
+	doc, err := m.FindOneByDocID(ctx, docID)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	singleCount := 0
+	var hasSeqList []int64
+	for i := 0; i < len(doc.Msg); i++ {
+		var msg sdkws.MsgData
+		if err := proto.Unmarshal(doc.Msg[i].Msg, &msg); err != nil {
+			return nil, nil, nil, err
+		}
+		if utils.Contain(msg.Seq, seqs...) {
+			indexes = append(indexes, i)
+			seqMsgs = append(seqMsgs, &msg)
+			hasSeqList = append(hasSeqList, msg.Seq)
+			singleCount++
+			if singleCount == len(seqs) {
+				break
+			}
+		}
+	}
+	for _, i := range seqs {
+		if utils.Contain(i, hasSeqList...) {
+			continue
+		}
+		unExistSeqs = append(unExistSeqs, i)
+	}
+	return seqMsgs, indexes, unExistSeqs, nil
+}
+
 func (m *MsgMongoDriver) GetMsgsByIndex(ctx context.Context, conversationID string, index int64) (*table.MsgDocModel, error) {
 	findOpts := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"doc_id": 1})
 	cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)}}, findOpts)
@@ -134,7 +165,8 @@ func (m *MsgMongoDriver) UpdateOneDoc(ctx context.Context, msg *table.MsgDocMode
 	return err
 }
 
-func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, beginSeq, endSeq int64) (msgs []*sdkws.MsgData, err error) {
+func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) (msgs []*sdkws.MsgData, err error) {
+	beginSeq, endSeq := utils.GetSeqsBeginEnd(seqs)
 	beginIndex := m.msg.GetMsgIndex(beginSeq)
 	num := endSeq - beginSeq + 1
 	pipeline := bson.A{
diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go
index 4e28b631c..8f8af972d 100644
--- a/pkg/utils/utils.go
+++ b/pkg/utils/utils.go
@@ -252,6 +252,13 @@ func GetSelfNotificationConversationID(userID string) []string {
 	return []string{"n_" + userID + "_" + userID, "si_" + userID + "_" + userID}
 }
 
+func GetSeqsBeginEnd(seqs []int64) (int64, int64) {
+	if len(seqs) == 0 {
+		return 0, 0
+	}
+	return seqs[len(seqs)-1], seqs[0]
+}
+
 type MsgBySeq []*sdkws.MsgData
 
 func (s MsgBySeq) Len() int {