From edad1eee412891112c38e81ebd903ef86bdf5380 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Mon, 29 May 2023 14:36:41 +0800 Subject: [PATCH] GetMsgBySeqIndexIn1Doc --- pkg/common/db/cache/msg.go | 2 +- pkg/common/db/controller/msg.go | 12 +-- pkg/common/db/controller/msg_test.go | 38 ++++--- pkg/common/db/table/unrelation/msg.go | 7 +- pkg/common/db/unrelation/msg.go | 149 ++++++++++++++++++-------- 5 files changed, 143 insertions(+), 65 deletions(-) diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 5d6574568..24b35e0c7 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -574,7 +574,7 @@ func (c *msgCache) GetMsgsByConversationIDAndSeq(ctx context.Context, docID stri keys = append(keys, c.getMsgReadCacheKey(docID, seq)) } return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getMsgsIndex, func(ctx context.Context) ([]*unRelationTb.MsgInfoModel, error) { - return c.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs) + return c.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, "", seqs) }) } diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index c6a095780..4f2b3a0b7 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -338,11 +338,11 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa return lastMaxSeq, isNew, utils.Wrap(err, "") } -func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { +func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID string, userID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { var totalUnExistSeqs []int64 for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) { log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs) - msgs, unexistSeqs, err := db.findMsgInfoBySeq(ctx, docID, seqs) + msgs, unexistSeqs, err := db.findMsgInfoBySeq(ctx, docID, userID, seqs) if err != nil { return nil, err } @@ -395,8 +395,8 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID st // return seqMsgs, nil // } -func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, docID string, seqs []int64) (totalMsgs []*unRelationTb.MsgInfoModel, unExistSeqs []int64, err error) { - msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs) +func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, docID string, userID string, seqs []int64) (totalMsgs []*unRelationTb.MsgInfoModel, unExistSeqs []int64, err error) { + msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, userID, seqs) if err != nil { return nil, nil, err } @@ -426,7 +426,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin var delSeqs []int64 for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) { log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs) - msgs, notExistSeqs, err := db.findMsgInfoBySeq(ctx, docID, seqs) + msgs, notExistSeqs, err := db.findMsgInfoBySeq(ctx, docID, userID, seqs) if err != nil { return nil, err } @@ -509,7 +509,7 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co log.ZInfo(ctx, "db.cache.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs", seqs, "successMsgs", len(successMsgs), "failedSeqs", failedSeqs, "conversationID", conversationID) prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs)) if len(failedSeqs) > 0 { - mongoMsgs, err := db.getMsgBySeqs(ctx, conversationID, failedSeqs) + mongoMsgs, err := db.getMsgBySeqs(ctx, conversationID, userID, failedSeqs) if err != nil { prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs)) return nil, err diff --git a/pkg/common/db/controller/msg_test.go b/pkg/common/db/controller/msg_test.go index 520549850..754ca9808 100644 --- a/pkg/common/db/controller/msg_test.go +++ b/pkg/common/db/controller/msg_test.go @@ -2,6 +2,7 @@ package controller import ( "context" + "encoding/json" "fmt" "math/rand" "strconv" @@ -154,17 +155,17 @@ func Test_Insert(t *testing.T) { db := GetDB() ctx := context.Background() var arr []any - for i := 0; i < 345; i++ { - if i%2 == 0 { - arr = append(arr, (*unRelationTb.MsgDataModel)(nil)) - continue - } + for i := 1; i <= 2000; i++ { + //if i%2 == 0 { + // arr = append(arr, (*unRelationTb.MsgDataModel)(nil)) + // continue + //} arr = append(arr, &unRelationTb.MsgDataModel{ Seq: int64(i), - Content: fmt.Sprintf("test-%d", i), + Content: fmt.Sprintf("seq-%d", i), }) } - if err := db.BatchInsertBlock(ctx, "test", arr, updateKeyMsg, 0); err != nil { + if err := db.BatchInsertBlock(ctx, "test", arr, updateKeyMsg, 1); err != nil { t.Fatal(err) } } @@ -185,18 +186,29 @@ func Test_Revoke(t *testing.T) { } } -func Test_Delete(t *testing.T) { +func Test_FindBySeq(t *testing.T) { db := GetDB() ctx := context.Background() - var arr []any - for i := 0; i < 123; i++ { - arr = append(arr, []string{"uid_1", "uid_2"}) - } - if err := db.BatchInsertBlock(ctx, "test", arr, updateKeyDel, 210); err != nil { + res, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, "test:0", "123456", []int64{1, 2, 3}) + if err != nil { t.Fatal(err) } + data, _ := json.Marshal(res) + fmt.Println(string(data)) } +//func Test_Delete(t *testing.T) { +// db := GetDB() +// ctx := context.Background() +// var arr []any +// for i := 0; i < 123; i++ { +// arr = append(arr, []string{"uid_1", "uid_2"}) +// } +// if err := db.BatchInsertBlock(ctx, "test", arr, updateKeyDel, 210); err != nil { +// t.Fatal(err) +// } +//} + //func Test_Delete1(t *testing.T) { // config.Config.Mongo.DBAddress = []string{"192.168.44.128:37017"} // config.Config.Mongo.DBTimeout = 60 diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/unrelation/msg.go index 323e4a7fc..6ba5a4cc1 100644 --- a/pkg/common/db/table/unrelation/msg.go +++ b/pkg/common/db/table/unrelation/msg.go @@ -64,6 +64,11 @@ type MsgInfoModel struct { DelList []string `bson:"del_list"` } +//type MsgDocModel struct { +// DocID string `bson:"doc_id"` +// Msgs []*MsgInfoModel `bson:"msgs"` +//} + type MsgDocModelInterface interface { PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []MsgInfoModel) error Create(ctx context.Context, model *MsgDocModel) error @@ -72,7 +77,7 @@ type MsgDocModelInterface interface { UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error IsExistDocID(ctx context.Context, docID string) (bool, error) FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error) - GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) ([]*MsgInfoModel, error) + GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, userID string, seqs []int64) ([]*MsgInfoModel, error) GetNewestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error) GetOldestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error) DeleteDocs(ctx context.Context, docIDs []string) error diff --git a/pkg/common/db/unrelation/msg.go b/pkg/common/db/unrelation/msg.go index 41d70506d..d4fcd1097 100644 --- a/pkg/common/db/unrelation/msg.go +++ b/pkg/common/db/unrelation/msg.go @@ -6,7 +6,6 @@ import ( "fmt" table "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" @@ -178,59 +177,121 @@ func (m *MsgMongoDriver) DeleteDocs(ctx context.Context, docIDs []string) error return err } -func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) (msgs []*table.MsgInfoModel, err error) { - beginSeq, endSeq := utils.GetSeqsBeginEnd(seqs) - beginIndex := m.model.GetMsgIndex(beginSeq) - num := endSeq - beginSeq + 1 - log.ZInfo(ctx, "GetMsgBySeqIndexIn1Doc", "docID", docID, "seqs", seqs, "beginSeq", beginSeq, "endSeq", endSeq, "beginIndex", beginIndex, "num", num) - pipeline := bson.A{ - bson.M{ - "$match": bson.M{"doc_id": docID}, +func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, userID string, seqs []int64) (msgs []*table.MsgInfoModel, err error) { + indexs := make([]int64, 0, len(seqs)) + for _, seq := range seqs { + indexs = append(indexs, m.model.GetMsgIndex(seq)) + } + pipeline := mongo.Pipeline{ + { + {"$match", bson.D{ + {"doc_id", docID}, + }}, }, - bson.M{ - "$project": bson.M{ - "msgs": bson.M{ - "$slice": bson.A{"$msgs", beginIndex, num}, - }, - }, + { + {"$project", bson.D{ + {"_id", 0}, + {"msgs", bson.D{ + {"$map", bson.D{ + {"input", indexs}, + {"as", "index"}, + {"in", bson.D{ + {"$let", bson.D{ + {"vars", bson.D{ + {"currentMsg", bson.D{ + {"$arrayElemAt", []string{"$msgs", "$$index"}}, + }}, + }}, + {"in", bson.D{ + {"$cond", bson.D{ + {"if", bson.D{ + {"$in", []string{userID, "$$currentMsg.del_list"}}, + }}, + {"then", nil}, + {"else", "$$currentMsg"}, + }}, + }}, + }}, + }}, + }}, + }}, + }}, + }, + { + {"$project", bson.D{ + {"doc_id", 0}, + {"msgs.del_list", 0}, + }}, }, } - cursor, err := m.MsgCollection.Aggregate(ctx, pipeline) + cur, err := m.MsgCollection.Aggregate(ctx, pipeline) if err != nil { return nil, errs.Wrap(err) } - defer cursor.Close(ctx) - var doc table.MsgDocModel - if cursor.Next(ctx) { - if err := cursor.Decode(&doc); err != nil { - return nil, err - } + defer cur.Close(ctx) + var msgDocModel []table.MsgDocModel + if err := cur.All(ctx, &msgDocModel); err != nil { + return nil, errs.Wrap(err) } - ////i := 0 - //for cursor.Next(ctx) { - // err := cursor.Decode(&doc) - // if err != nil { - // return nil, err - // } - // //if i == 0 { - // // break - // //} - //} - log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg), "docID", docID) - for _, v := range doc.Msg { - if v.Msg == nil { - continue - } - if v.Msg.Seq >= beginSeq && v.Msg.Seq <= endSeq { - log.ZDebug(ctx, "find msg", "msg", v.Msg) - msgs = append(msgs, v) - } else { - log.ZWarn(ctx, "this msg is at wrong position", nil, "msg", v.Msg) - } + if len(msgDocModel) == 0 { + return nil, errs.Wrap(mongo.ErrNoDocuments) } - return msgs, nil + return msgDocModel[0].Msg, nil } +//func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) (msgs []*table.MsgInfoModel, err error) { +// beginSeq, endSeq := utils.GetSeqsBeginEnd(seqs) +// beginIndex := m.model.GetMsgIndex(beginSeq) +// num := endSeq - beginSeq + 1 +// log.ZInfo(ctx, "GetMsgBySeqIndexIn1Doc", "docID", docID, "seqs", seqs, "beginSeq", beginSeq, "endSeq", endSeq, "beginIndex", beginIndex, "num", num) +// pipeline := bson.A{ +// bson.M{ +// "$match": bson.M{"doc_id": docID}, +// }, +// bson.M{ +// "$project": bson.M{ +// "msgs": bson.M{ +// "$slice": bson.A{"$msgs", beginIndex, num}, +// }, +// }, +// }, +// } +// cursor, err := m.MsgCollection.Aggregate(ctx, pipeline) +// if err != nil { +// return nil, errs.Wrap(err) +// } +// defer cursor.Close(ctx) +// var doc table.MsgDocModel +// if cursor.Next(ctx) { +// if err := cursor.Decode(&doc); err != nil { +// return nil, err +// } +// } +// ////i := 0 +// //for cursor.Next(ctx) { +// // err := cursor.Decode(&doc) +// // if err != nil { +// // return nil, err +// // } +// // //if i == 0 { +// // // break +// // //} +// //} +// log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg), "docID", docID) +// for _, v := range doc.Msg { +// if v.Msg == nil { +// continue +// } +// if v.Msg.Seq >= beginSeq && v.Msg.Seq <= endSeq { +// log.ZDebug(ctx, "find msg", "msg", v.Msg) +// msgs = append(msgs, v) +// } else { +// log.ZWarn(ctx, "this msg is at wrong position", nil, "msg", v.Msg) +// } +// } +// return msgs, nil +//} + func (m *MsgMongoDriver) IsExistDocID(ctx context.Context, docID string) (bool, error) { count, err := m.MsgCollection.CountDocuments(ctx, bson.M{"doc_id": docID}) if err != nil {