From dd1c6154d85f8bfc27a8501a7c136411fc1de5b2 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Fri, 26 May 2023 18:45:00 +0800 Subject: [PATCH] msg --- pkg/common/db/cache/msg.go | 6 +++--- pkg/common/db/controller/msg.go | 31 +++++++++++++++++++++------ pkg/common/db/controller/msg_test.go | 22 +++++++++---------- pkg/common/db/table/unrelation/msg.go | 2 +- pkg/common/db/unrelation/msg.go | 5 +++-- 5 files changed, 43 insertions(+), 23 deletions(-) diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 5d6574568..54690cb51 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -91,7 +91,7 @@ type MsgModel interface { LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error - GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) + GetMsgsByConversationIDAndSeq(ctx context.Context, userID, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel } @@ -568,13 +568,13 @@ func (c *msgCache) getMsgsIndex(msg *unRelationTb.MsgInfoModel, keys []string) ( return 0, errIndex } -func (c *msgCache) GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) { +func (c *msgCache) GetMsgsByConversationIDAndSeq(ctx context.Context, userID, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) { var keys []string for _, seq := range seqs { keys = append(keys, c.getMsgReadCacheKey(docID, seq)) } return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getMsgsIndex, func(ctx context.Context) ([]*unRelationTb.MsgInfoModel, error) { - return c.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs) + return c.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, docID, seqs) }) } diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 5d61add0b..405d6e0ef 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -338,11 +338,11 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa return lastMaxSeq, isNew, utils.Wrap(err, "") } -func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { +func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { var totalUnExistSeqs []int64 for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) { log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs) - msgs, unexistSeqs, err := db.findMsgInfoBySeq(ctx, docID, seqs) + msgs, unexistSeqs, err := db.findMsgInfoBySeq(ctx, userID, docID, seqs) if err != nil { return nil, err } @@ -395,8 +395,8 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID st // return seqMsgs, nil // } -func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, docID string, seqs []int64) (totalMsgs []*unRelationTb.MsgInfoModel, unExistSeqs []int64, err error) { - msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs) +func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, seqs []int64) (totalMsgs []*unRelationTb.MsgInfoModel, unExistSeqs []int64, err error) { + msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, userID, seqs) if err != nil { return nil, nil, err } @@ -426,7 +426,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin var delSeqs []int64 for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) { log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs) - msgs, notExistSeqs, err := db.findMsgInfoBySeq(ctx, docID, seqs) + msgs, notExistSeqs, err := db.findMsgInfoBySeq(ctx, userID, docID, seqs) if err != nil { return nil, err } @@ -449,6 +449,25 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin } func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) { + userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID) + if err != nil && errs.Unwrap(err) != redis.Nil { + return nil, err + } + minSeq, err := db.cache.GetMinSeq(ctx, conversationID) + if err != nil && errs.Unwrap(err) != redis.Nil { + return nil, err + } + if userMinSeq < minSeq { + minSeq = userMinSeq + } + if minSeq > end { + log.ZInfo(ctx, "minSeq > end", "minSeq", minSeq, "end", end) + return nil, nil + } + if begin < minSeq { + begin = minSeq + } + var seqs []int64 for i := end; i > end-num; i-- { if i >= begin { @@ -508,7 +527,7 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co } prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs)) if len(failedSeqs) > 0 { - mongoMsgs, err := db.getMsgBySeqs(ctx, conversationID, failedSeqs) + mongoMsgs, err := db.getMsgBySeqs(ctx, userID, conversationID, failedSeqs) if err != nil { prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs)) return nil, err diff --git a/pkg/common/db/controller/msg_test.go b/pkg/common/db/controller/msg_test.go index 520549850..e76da93c4 100644 --- a/pkg/common/db/controller/msg_test.go +++ b/pkg/common/db/controller/msg_test.go @@ -185,17 +185,17 @@ func Test_Revoke(t *testing.T) { } } -func Test_Delete(t *testing.T) { - db := GetDB() - ctx := context.Background() - var arr []any - for i := 0; i < 123; i++ { - arr = append(arr, []string{"uid_1", "uid_2"}) - } - if err := db.BatchInsertBlock(ctx, "test", arr, updateKeyDel, 210); err != nil { - t.Fatal(err) - } -} +// func Test_Delete(t *testing.T) { +// db := GetDB() +// ctx := context.Background() +// var arr []any +// for i := 0; i < 123; i++ { +// arr = append(arr, []string{"uid_1", "uid_2"}) +// } +// if err := db.BatchInsertBlock(ctx, "test", arr, "", 210); err != nil { +// t.Fatal(err) +// } +// } //func Test_Delete1(t *testing.T) { // config.Config.Mongo.DBAddress = []string{"192.168.44.128:37017"} diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/unrelation/msg.go index 323e4a7fc..bbb388107 100644 --- a/pkg/common/db/table/unrelation/msg.go +++ b/pkg/common/db/table/unrelation/msg.go @@ -72,7 +72,7 @@ type MsgDocModelInterface interface { UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error IsExistDocID(ctx context.Context, docID string) (bool, error) FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error) - GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) ([]*MsgInfoModel, error) + GetMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID string, seqs []int64) ([]*MsgInfoModel, error) GetNewestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error) GetOldestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error) DeleteDocs(ctx context.Context, docIDs []string) error diff --git a/pkg/common/db/unrelation/msg.go b/pkg/common/db/unrelation/msg.go index a583009dd..d842a61be 100644 --- a/pkg/common/db/unrelation/msg.go +++ b/pkg/common/db/unrelation/msg.go @@ -178,7 +178,7 @@ func (m *MsgMongoDriver) DeleteDocs(ctx context.Context, docIDs []string) error return err } -func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) (msgs []*table.MsgInfoModel, err error) { +func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID string, seqs []int64) (msgs []*table.MsgInfoModel, err error) { beginSeq, endSeq := utils.GetSeqsBeginEnd(seqs) beginIndex := m.model.GetMsgIndex(beginSeq) num := endSeq - beginSeq + 1 @@ -206,9 +206,10 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID strin if err != nil { return nil, err } - if i == 0 { + if i == 1 { break } + i++ } log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg), "docID", docID) for _, v := range doc.Msg {