GetMsgBySeqIndexIn1Doc

This commit is contained in:
withchao 2023-05-29 14:36:41 +08:00
parent 4d7b49aa1f
commit edad1eee41
5 changed files with 143 additions and 65 deletions

View File

@ -574,7 +574,7 @@ func (c *msgCache) GetMsgsByConversationIDAndSeq(ctx context.Context, docID stri
keys = append(keys, c.getMsgReadCacheKey(docID, seq)) keys = append(keys, c.getMsgReadCacheKey(docID, seq))
} }
return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getMsgsIndex, func(ctx context.Context) ([]*unRelationTb.MsgInfoModel, error) { return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getMsgsIndex, func(ctx context.Context) ([]*unRelationTb.MsgInfoModel, error) {
return c.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs) return c.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, "", seqs)
}) })
} }

View File

@ -338,11 +338,11 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
return lastMaxSeq, isNew, utils.Wrap(err, "") return lastMaxSeq, isNew, utils.Wrap(err, "")
} }
func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID string, userID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) {
var totalUnExistSeqs []int64 var totalUnExistSeqs []int64
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) { for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) {
log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs) log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs)
msgs, unexistSeqs, err := db.findMsgInfoBySeq(ctx, docID, seqs) msgs, unexistSeqs, err := db.findMsgInfoBySeq(ctx, docID, userID, seqs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -395,8 +395,8 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID st
// return seqMsgs, nil // return seqMsgs, nil
// } // }
func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, docID string, seqs []int64) (totalMsgs []*unRelationTb.MsgInfoModel, unExistSeqs []int64, err error) { func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, docID string, userID string, seqs []int64) (totalMsgs []*unRelationTb.MsgInfoModel, unExistSeqs []int64, err error) {
msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs) msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, userID, seqs)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -426,7 +426,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin
var delSeqs []int64 var delSeqs []int64
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) { for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) {
log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs) log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs)
msgs, notExistSeqs, err := db.findMsgInfoBySeq(ctx, docID, seqs) msgs, notExistSeqs, err := db.findMsgInfoBySeq(ctx, docID, userID, seqs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -509,7 +509,7 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co
log.ZInfo(ctx, "db.cache.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs", seqs, "successMsgs", len(successMsgs), "failedSeqs", failedSeqs, "conversationID", conversationID) log.ZInfo(ctx, "db.cache.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs", seqs, "successMsgs", len(successMsgs), "failedSeqs", failedSeqs, "conversationID", conversationID)
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs)) prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
if len(failedSeqs) > 0 { if len(failedSeqs) > 0 {
mongoMsgs, err := db.getMsgBySeqs(ctx, conversationID, failedSeqs) mongoMsgs, err := db.getMsgBySeqs(ctx, conversationID, userID, failedSeqs)
if err != nil { if err != nil {
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs)) prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
return nil, err return nil, err

View File

@ -2,6 +2,7 @@ package controller
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"math/rand" "math/rand"
"strconv" "strconv"
@ -154,17 +155,17 @@ func Test_Insert(t *testing.T) {
db := GetDB() db := GetDB()
ctx := context.Background() ctx := context.Background()
var arr []any var arr []any
for i := 0; i < 345; i++ { for i := 1; i <= 2000; i++ {
if i%2 == 0 { //if i%2 == 0 {
arr = append(arr, (*unRelationTb.MsgDataModel)(nil)) // arr = append(arr, (*unRelationTb.MsgDataModel)(nil))
continue // continue
} //}
arr = append(arr, &unRelationTb.MsgDataModel{ arr = append(arr, &unRelationTb.MsgDataModel{
Seq: int64(i), Seq: int64(i),
Content: fmt.Sprintf("test-%d", i), Content: fmt.Sprintf("seq-%d", i),
}) })
} }
if err := db.BatchInsertBlock(ctx, "test", arr, updateKeyMsg, 0); err != nil { if err := db.BatchInsertBlock(ctx, "test", arr, updateKeyMsg, 1); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
@ -185,18 +186,29 @@ func Test_Revoke(t *testing.T) {
} }
} }
func Test_Delete(t *testing.T) { func Test_FindBySeq(t *testing.T) {
db := GetDB() db := GetDB()
ctx := context.Background() ctx := context.Background()
var arr []any res, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, "test:0", "123456", []int64{1, 2, 3})
for i := 0; i < 123; i++ { if err != nil {
arr = append(arr, []string{"uid_1", "uid_2"})
}
if err := db.BatchInsertBlock(ctx, "test", arr, updateKeyDel, 210); err != nil {
t.Fatal(err) t.Fatal(err)
} }
data, _ := json.Marshal(res)
fmt.Println(string(data))
} }
//func Test_Delete(t *testing.T) {
// db := GetDB()
// ctx := context.Background()
// var arr []any
// for i := 0; i < 123; i++ {
// arr = append(arr, []string{"uid_1", "uid_2"})
// }
// if err := db.BatchInsertBlock(ctx, "test", arr, updateKeyDel, 210); err != nil {
// t.Fatal(err)
// }
//}
//func Test_Delete1(t *testing.T) { //func Test_Delete1(t *testing.T) {
// config.Config.Mongo.DBAddress = []string{"192.168.44.128:37017"} // config.Config.Mongo.DBAddress = []string{"192.168.44.128:37017"}
// config.Config.Mongo.DBTimeout = 60 // config.Config.Mongo.DBTimeout = 60

View File

@ -64,6 +64,11 @@ type MsgInfoModel struct {
DelList []string `bson:"del_list"` DelList []string `bson:"del_list"`
} }
//type MsgDocModel struct {
// DocID string `bson:"doc_id"`
// Msgs []*MsgInfoModel `bson:"msgs"`
//}
type MsgDocModelInterface interface { type MsgDocModelInterface interface {
PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []MsgInfoModel) error PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []MsgInfoModel) error
Create(ctx context.Context, model *MsgDocModel) error Create(ctx context.Context, model *MsgDocModel) error
@ -72,7 +77,7 @@ type MsgDocModelInterface interface {
UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error
IsExistDocID(ctx context.Context, docID string) (bool, error) IsExistDocID(ctx context.Context, docID string) (bool, error)
FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error) FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error)
GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) ([]*MsgInfoModel, error) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, userID string, seqs []int64) ([]*MsgInfoModel, error)
GetNewestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error) GetNewestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error)
GetOldestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error) GetOldestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error)
DeleteDocs(ctx context.Context, docIDs []string) error DeleteDocs(ctx context.Context, docIDs []string) error

View File

@ -6,7 +6,6 @@ import (
"fmt" "fmt"
table "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" table "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
@ -178,59 +177,121 @@ func (m *MsgMongoDriver) DeleteDocs(ctx context.Context, docIDs []string) error
return err return err
} }
func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) (msgs []*table.MsgInfoModel, err error) { func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, userID string, seqs []int64) (msgs []*table.MsgInfoModel, err error) {
beginSeq, endSeq := utils.GetSeqsBeginEnd(seqs) indexs := make([]int64, 0, len(seqs))
beginIndex := m.model.GetMsgIndex(beginSeq) for _, seq := range seqs {
num := endSeq - beginSeq + 1 indexs = append(indexs, m.model.GetMsgIndex(seq))
log.ZInfo(ctx, "GetMsgBySeqIndexIn1Doc", "docID", docID, "seqs", seqs, "beginSeq", beginSeq, "endSeq", endSeq, "beginIndex", beginIndex, "num", num) }
pipeline := bson.A{ pipeline := mongo.Pipeline{
bson.M{ {
"$match": bson.M{"doc_id": docID}, {"$match", bson.D{
{"doc_id", docID},
}},
}, },
bson.M{ {
"$project": bson.M{ {"$project", bson.D{
"msgs": bson.M{ {"_id", 0},
"$slice": bson.A{"$msgs", beginIndex, num}, {"msgs", bson.D{
}, {"$map", bson.D{
}, {"input", indexs},
{"as", "index"},
{"in", bson.D{
{"$let", bson.D{
{"vars", bson.D{
{"currentMsg", bson.D{
{"$arrayElemAt", []string{"$msgs", "$$index"}},
}},
}},
{"in", bson.D{
{"$cond", bson.D{
{"if", bson.D{
{"$in", []string{userID, "$$currentMsg.del_list"}},
}},
{"then", nil},
{"else", "$$currentMsg"},
}},
}},
}},
}},
}},
}},
}},
},
{
{"$project", bson.D{
{"doc_id", 0},
{"msgs.del_list", 0},
}},
}, },
} }
cursor, err := m.MsgCollection.Aggregate(ctx, pipeline) cur, err := m.MsgCollection.Aggregate(ctx, pipeline)
if err != nil { if err != nil {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
} }
defer cursor.Close(ctx) defer cur.Close(ctx)
var doc table.MsgDocModel var msgDocModel []table.MsgDocModel
if cursor.Next(ctx) { if err := cur.All(ctx, &msgDocModel); err != nil {
if err := cursor.Decode(&doc); err != nil { return nil, errs.Wrap(err)
return nil, err
}
} }
////i := 0 if len(msgDocModel) == 0 {
//for cursor.Next(ctx) { return nil, errs.Wrap(mongo.ErrNoDocuments)
// err := cursor.Decode(&doc)
// if err != nil {
// return nil, err
// }
// //if i == 0 {
// // break
// //}
//}
log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg), "docID", docID)
for _, v := range doc.Msg {
if v.Msg == nil {
continue
}
if v.Msg.Seq >= beginSeq && v.Msg.Seq <= endSeq {
log.ZDebug(ctx, "find msg", "msg", v.Msg)
msgs = append(msgs, v)
} else {
log.ZWarn(ctx, "this msg is at wrong position", nil, "msg", v.Msg)
}
} }
return msgs, nil return msgDocModel[0].Msg, nil
} }
//func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) (msgs []*table.MsgInfoModel, err error) {
// beginSeq, endSeq := utils.GetSeqsBeginEnd(seqs)
// beginIndex := m.model.GetMsgIndex(beginSeq)
// num := endSeq - beginSeq + 1
// log.ZInfo(ctx, "GetMsgBySeqIndexIn1Doc", "docID", docID, "seqs", seqs, "beginSeq", beginSeq, "endSeq", endSeq, "beginIndex", beginIndex, "num", num)
// pipeline := bson.A{
// bson.M{
// "$match": bson.M{"doc_id": docID},
// },
// bson.M{
// "$project": bson.M{
// "msgs": bson.M{
// "$slice": bson.A{"$msgs", beginIndex, num},
// },
// },
// },
// }
// cursor, err := m.MsgCollection.Aggregate(ctx, pipeline)
// if err != nil {
// return nil, errs.Wrap(err)
// }
// defer cursor.Close(ctx)
// var doc table.MsgDocModel
// if cursor.Next(ctx) {
// if err := cursor.Decode(&doc); err != nil {
// return nil, err
// }
// }
// ////i := 0
// //for cursor.Next(ctx) {
// // err := cursor.Decode(&doc)
// // if err != nil {
// // return nil, err
// // }
// // //if i == 0 {
// // // break
// // //}
// //}
// log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg), "docID", docID)
// for _, v := range doc.Msg {
// if v.Msg == nil {
// continue
// }
// if v.Msg.Seq >= beginSeq && v.Msg.Seq <= endSeq {
// log.ZDebug(ctx, "find msg", "msg", v.Msg)
// msgs = append(msgs, v)
// } else {
// log.ZWarn(ctx, "this msg is at wrong position", nil, "msg", v.Msg)
// }
// }
// return msgs, nil
//}
func (m *MsgMongoDriver) IsExistDocID(ctx context.Context, docID string) (bool, error) { func (m *MsgMongoDriver) IsExistDocID(ctx context.Context, docID string) (bool, error) {
count, err := m.MsgCollection.CountDocuments(ctx, bson.M{"doc_id": docID}) count, err := m.MsgCollection.CountDocuments(ctx, bson.M{"doc_id": docID})
if err != nil { if err != nil {