diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 24b35e0c7..54690cb51 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -91,7 +91,7 @@ type MsgModel interface { LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error - GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) + GetMsgsByConversationIDAndSeq(ctx context.Context, userID, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel } @@ -568,13 +568,13 @@ func (c *msgCache) getMsgsIndex(msg *unRelationTb.MsgInfoModel, keys []string) ( return 0, errIndex } -func (c *msgCache) GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) { +func (c *msgCache) GetMsgsByConversationIDAndSeq(ctx context.Context, userID, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) { var keys []string for _, seq := range seqs { keys = append(keys, c.getMsgReadCacheKey(docID, seq)) } return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getMsgsIndex, func(ctx context.Context) ([]*unRelationTb.MsgInfoModel, error) { - return c.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, "", seqs) + return c.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, docID, seqs) }) } diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 4f2b3a0b7..0a43c5ce5 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -338,11 +338,11 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa return lastMaxSeq, isNew, utils.Wrap(err, "") } -func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID string, userID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { +func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { var totalUnExistSeqs []int64 for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) { log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs) - msgs, unexistSeqs, err := db.findMsgInfoBySeq(ctx, docID, userID, seqs) + msgs, unexistSeqs, err := db.findMsgInfoBySeq(ctx, userID, docID, seqs) if err != nil { return nil, err } @@ -395,7 +395,7 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID st // return seqMsgs, nil // } -func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, docID string, userID string, seqs []int64) (totalMsgs []*unRelationTb.MsgInfoModel, unExistSeqs []int64, err error) { +func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, seqs []int64) (totalMsgs []*unRelationTb.MsgInfoModel, unExistSeqs []int64, err error) { msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, userID, seqs) if err != nil { return nil, nil, err @@ -426,7 +426,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin var delSeqs []int64 for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) { log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs) - msgs, notExistSeqs, err := db.findMsgInfoBySeq(ctx, docID, userID, seqs) + msgs, notExistSeqs, err := db.findMsgInfoBySeq(ctx, userID, docID, seqs) if err != nil { return nil, err } @@ -449,6 +449,25 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin } func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) { + userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID) + if err != nil && errs.Unwrap(err) != redis.Nil { + return nil, err + } + minSeq, err := db.cache.GetMinSeq(ctx, conversationID) + if err != nil && errs.Unwrap(err) != redis.Nil { + return nil, err + } + if userMinSeq < minSeq { + minSeq = userMinSeq + } + if minSeq > end { + log.ZInfo(ctx, "minSeq > end", "minSeq", minSeq, "end", end) + return nil, nil + } + if begin < minSeq { + begin = minSeq + } + var seqs []int64 for i := end; i > end-num; i-- { if i >= begin { @@ -509,7 +528,7 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co log.ZInfo(ctx, "db.cache.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs", seqs, "successMsgs", len(successMsgs), "failedSeqs", failedSeqs, "conversationID", conversationID) prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs)) if len(failedSeqs) > 0 { - mongoMsgs, err := db.getMsgBySeqs(ctx, conversationID, userID, failedSeqs) + mongoMsgs, err := db.getMsgBySeqs(ctx, userID, conversationID, failedSeqs) if err != nil { prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs)) return nil, err diff --git a/pkg/common/db/controller/msg_test.go b/pkg/common/db/controller/msg_test.go index 754ca9808..e3d0a57bd 100644 --- a/pkg/common/db/controller/msg_test.go +++ b/pkg/common/db/controller/msg_test.go @@ -2,7 +2,6 @@ package controller import ( "context" - "encoding/json" "fmt" "math/rand" "strconv" @@ -155,17 +154,17 @@ func Test_Insert(t *testing.T) { db := GetDB() ctx := context.Background() var arr []any - for i := 1; i <= 2000; i++ { - //if i%2 == 0 { - // arr = append(arr, (*unRelationTb.MsgDataModel)(nil)) - // continue - //} + for i := 0; i < 345; i++ { + if i%2 == 0 { + arr = append(arr, (*unRelationTb.MsgDataModel)(nil)) + continue + } arr = append(arr, &unRelationTb.MsgDataModel{ Seq: int64(i), - Content: fmt.Sprintf("seq-%d", i), + Content: fmt.Sprintf("test-%d", i), }) } - if err := db.BatchInsertBlock(ctx, "test", arr, updateKeyMsg, 1); err != nil { + if err := db.BatchInsertBlock(ctx, "test", arr, updateKeyMsg, 0); err != nil { t.Fatal(err) } } diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/unrelation/msg.go index 6ba5a4cc1..bbb388107 100644 --- a/pkg/common/db/table/unrelation/msg.go +++ b/pkg/common/db/table/unrelation/msg.go @@ -64,11 +64,6 @@ type MsgInfoModel struct { DelList []string `bson:"del_list"` } -//type MsgDocModel struct { -// DocID string `bson:"doc_id"` -// Msgs []*MsgInfoModel `bson:"msgs"` -//} - type MsgDocModelInterface interface { PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []MsgInfoModel) error Create(ctx context.Context, model *MsgDocModel) error @@ -77,7 +72,7 @@ type MsgDocModelInterface interface { UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error IsExistDocID(ctx context.Context, docID string) (bool, error) FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error) - GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, userID string, seqs []int64) ([]*MsgInfoModel, error) + GetMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID string, seqs []int64) ([]*MsgInfoModel, error) GetNewestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error) GetOldestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error) DeleteDocs(ctx context.Context, docIDs []string) error