This commit is contained in:
wangchuxiao 2023-05-26 18:45:00 +08:00
parent 5fbfafb1ab
commit dd1c6154d8
5 changed files with 43 additions and 23 deletions

View File

@ -91,7 +91,7 @@ type MsgModel interface {
LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) GetMsgsByConversationIDAndSeq(ctx context.Context, userID, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error)
DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel
} }
@ -568,13 +568,13 @@ func (c *msgCache) getMsgsIndex(msg *unRelationTb.MsgInfoModel, keys []string) (
return 0, errIndex return 0, errIndex
} }
func (c *msgCache) GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) { func (c *msgCache) GetMsgsByConversationIDAndSeq(ctx context.Context, userID, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) {
var keys []string var keys []string
for _, seq := range seqs { for _, seq := range seqs {
keys = append(keys, c.getMsgReadCacheKey(docID, seq)) keys = append(keys, c.getMsgReadCacheKey(docID, seq))
} }
return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getMsgsIndex, func(ctx context.Context) ([]*unRelationTb.MsgInfoModel, error) { return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getMsgsIndex, func(ctx context.Context) ([]*unRelationTb.MsgInfoModel, error) {
return c.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs) return c.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, docID, seqs)
}) })
} }

View File

@ -338,11 +338,11 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
return lastMaxSeq, isNew, utils.Wrap(err, "") return lastMaxSeq, isNew, utils.Wrap(err, "")
} }
func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) {
var totalUnExistSeqs []int64 var totalUnExistSeqs []int64
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) { for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) {
log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs) log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs)
msgs, unexistSeqs, err := db.findMsgInfoBySeq(ctx, docID, seqs) msgs, unexistSeqs, err := db.findMsgInfoBySeq(ctx, userID, docID, seqs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -395,8 +395,8 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID st
// return seqMsgs, nil // return seqMsgs, nil
// } // }
func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, docID string, seqs []int64) (totalMsgs []*unRelationTb.MsgInfoModel, unExistSeqs []int64, err error) { func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, seqs []int64) (totalMsgs []*unRelationTb.MsgInfoModel, unExistSeqs []int64, err error) {
msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs) msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, userID, seqs)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -426,7 +426,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin
var delSeqs []int64 var delSeqs []int64
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) { for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) {
log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs) log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs)
msgs, notExistSeqs, err := db.findMsgInfoBySeq(ctx, docID, seqs) msgs, notExistSeqs, err := db.findMsgInfoBySeq(ctx, userID, docID, seqs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -449,6 +449,25 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin
} }
func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) { func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) {
userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return nil, err
}
minSeq, err := db.cache.GetMinSeq(ctx, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return nil, err
}
if userMinSeq < minSeq {
minSeq = userMinSeq
}
if minSeq > end {
log.ZInfo(ctx, "minSeq > end", "minSeq", minSeq, "end", end)
return nil, nil
}
if begin < minSeq {
begin = minSeq
}
var seqs []int64 var seqs []int64
for i := end; i > end-num; i-- { for i := end; i > end-num; i-- {
if i >= begin { if i >= begin {
@ -508,7 +527,7 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co
} }
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs)) prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
if len(failedSeqs) > 0 { if len(failedSeqs) > 0 {
mongoMsgs, err := db.getMsgBySeqs(ctx, conversationID, failedSeqs) mongoMsgs, err := db.getMsgBySeqs(ctx, userID, conversationID, failedSeqs)
if err != nil { if err != nil {
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs)) prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
return nil, err return nil, err

View File

@ -185,17 +185,17 @@ func Test_Revoke(t *testing.T) {
} }
} }
func Test_Delete(t *testing.T) { // func Test_Delete(t *testing.T) {
db := GetDB() // db := GetDB()
ctx := context.Background() // ctx := context.Background()
var arr []any // var arr []any
for i := 0; i < 123; i++ { // for i := 0; i < 123; i++ {
arr = append(arr, []string{"uid_1", "uid_2"}) // arr = append(arr, []string{"uid_1", "uid_2"})
} // }
if err := db.BatchInsertBlock(ctx, "test", arr, updateKeyDel, 210); err != nil { // if err := db.BatchInsertBlock(ctx, "test", arr, "", 210); err != nil {
t.Fatal(err) // t.Fatal(err)
} // }
} // }
//func Test_Delete1(t *testing.T) { //func Test_Delete1(t *testing.T) {
// config.Config.Mongo.DBAddress = []string{"192.168.44.128:37017"} // config.Config.Mongo.DBAddress = []string{"192.168.44.128:37017"}

View File

@ -72,7 +72,7 @@ type MsgDocModelInterface interface {
UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error
IsExistDocID(ctx context.Context, docID string) (bool, error) IsExistDocID(ctx context.Context, docID string) (bool, error)
FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error) FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error)
GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) ([]*MsgInfoModel, error) GetMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID string, seqs []int64) ([]*MsgInfoModel, error)
GetNewestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error) GetNewestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error)
GetOldestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error) GetOldestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error)
DeleteDocs(ctx context.Context, docIDs []string) error DeleteDocs(ctx context.Context, docIDs []string) error

View File

@ -178,7 +178,7 @@ func (m *MsgMongoDriver) DeleteDocs(ctx context.Context, docIDs []string) error
return err return err
} }
func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) (msgs []*table.MsgInfoModel, err error) { func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID string, seqs []int64) (msgs []*table.MsgInfoModel, err error) {
beginSeq, endSeq := utils.GetSeqsBeginEnd(seqs) beginSeq, endSeq := utils.GetSeqsBeginEnd(seqs)
beginIndex := m.model.GetMsgIndex(beginSeq) beginIndex := m.model.GetMsgIndex(beginSeq)
num := endSeq - beginSeq + 1 num := endSeq - beginSeq + 1
@ -206,9 +206,10 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID strin
if err != nil { if err != nil {
return nil, err return nil, err
} }
if i == 0 { if i == 1 {
break break
} }
i++
} }
log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg), "docID", docID) log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg), "docID", docID)
for _, v := range doc.Msg { for _, v := range doc.Msg {