Merge remote-tracking branch 'origin/errcode' into errcode

# Conflicts:
#	pkg/common/db/cache/msg.go
#	pkg/common/db/controller/msg.go
#	pkg/common/db/controller/msg_test.go
#	pkg/common/db/table/unrelation/msg.go
#	pkg/common/db/unrelation/msg.go
This commit is contained in:
withchao 2023-05-29 14:39:54 +08:00
commit aa9042bd62
4 changed files with 35 additions and 22 deletions

View File

@ -91,7 +91,7 @@ type MsgModel interface {
LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) GetMsgsByConversationIDAndSeq(ctx context.Context, userID, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error)
DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel
} }
@ -568,13 +568,13 @@ func (c *msgCache) getMsgsIndex(msg *unRelationTb.MsgInfoModel, keys []string) (
return 0, errIndex return 0, errIndex
} }
func (c *msgCache) GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) { func (c *msgCache) GetMsgsByConversationIDAndSeq(ctx context.Context, userID, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) {
var keys []string var keys []string
for _, seq := range seqs { for _, seq := range seqs {
keys = append(keys, c.getMsgReadCacheKey(docID, seq)) keys = append(keys, c.getMsgReadCacheKey(docID, seq))
} }
return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getMsgsIndex, func(ctx context.Context) ([]*unRelationTb.MsgInfoModel, error) { return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getMsgsIndex, func(ctx context.Context) ([]*unRelationTb.MsgInfoModel, error) {
return c.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, "", seqs) return c.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, docID, seqs)
}) })
} }

View File

@ -338,11 +338,11 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
return lastMaxSeq, isNew, utils.Wrap(err, "") return lastMaxSeq, isNew, utils.Wrap(err, "")
} }
func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID string, userID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) {
var totalUnExistSeqs []int64 var totalUnExistSeqs []int64
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) { for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) {
log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs) log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs)
msgs, unexistSeqs, err := db.findMsgInfoBySeq(ctx, docID, userID, seqs) msgs, unexistSeqs, err := db.findMsgInfoBySeq(ctx, userID, docID, seqs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -395,7 +395,7 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID st
// return seqMsgs, nil // return seqMsgs, nil
// } // }
func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, docID string, userID string, seqs []int64) (totalMsgs []*unRelationTb.MsgInfoModel, unExistSeqs []int64, err error) { func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, seqs []int64) (totalMsgs []*unRelationTb.MsgInfoModel, unExistSeqs []int64, err error) {
msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, userID, seqs) msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, userID, seqs)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
@ -426,7 +426,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin
var delSeqs []int64 var delSeqs []int64
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) { for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) {
log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs) log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs)
msgs, notExistSeqs, err := db.findMsgInfoBySeq(ctx, docID, userID, seqs) msgs, notExistSeqs, err := db.findMsgInfoBySeq(ctx, userID, docID, seqs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -449,6 +449,25 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin
} }
func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) { func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) {
userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return nil, err
}
minSeq, err := db.cache.GetMinSeq(ctx, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return nil, err
}
if userMinSeq < minSeq {
minSeq = userMinSeq
}
if minSeq > end {
log.ZInfo(ctx, "minSeq > end", "minSeq", minSeq, "end", end)
return nil, nil
}
if begin < minSeq {
begin = minSeq
}
var seqs []int64 var seqs []int64
for i := end; i > end-num; i-- { for i := end; i > end-num; i-- {
if i >= begin { if i >= begin {
@ -509,7 +528,7 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co
log.ZInfo(ctx, "db.cache.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs", seqs, "successMsgs", len(successMsgs), "failedSeqs", failedSeqs, "conversationID", conversationID) log.ZInfo(ctx, "db.cache.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs", seqs, "successMsgs", len(successMsgs), "failedSeqs", failedSeqs, "conversationID", conversationID)
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs)) prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
if len(failedSeqs) > 0 { if len(failedSeqs) > 0 {
mongoMsgs, err := db.getMsgBySeqs(ctx, conversationID, userID, failedSeqs) mongoMsgs, err := db.getMsgBySeqs(ctx, userID, conversationID, failedSeqs)
if err != nil { if err != nil {
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs)) prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
return nil, err return nil, err

View File

@ -2,7 +2,6 @@ package controller
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"math/rand" "math/rand"
"strconv" "strconv"
@ -155,17 +154,17 @@ func Test_Insert(t *testing.T) {
db := GetDB() db := GetDB()
ctx := context.Background() ctx := context.Background()
var arr []any var arr []any
for i := 1; i <= 2000; i++ { for i := 0; i < 345; i++ {
//if i%2 == 0 { if i%2 == 0 {
// arr = append(arr, (*unRelationTb.MsgDataModel)(nil)) arr = append(arr, (*unRelationTb.MsgDataModel)(nil))
// continue continue
//} }
arr = append(arr, &unRelationTb.MsgDataModel{ arr = append(arr, &unRelationTb.MsgDataModel{
Seq: int64(i), Seq: int64(i),
Content: fmt.Sprintf("seq-%d", i), Content: fmt.Sprintf("test-%d", i),
}) })
} }
if err := db.BatchInsertBlock(ctx, "test", arr, updateKeyMsg, 1); err != nil { if err := db.BatchInsertBlock(ctx, "test", arr, updateKeyMsg, 0); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }

View File

@ -64,11 +64,6 @@ type MsgInfoModel struct {
DelList []string `bson:"del_list"` DelList []string `bson:"del_list"`
} }
//type MsgDocModel struct {
// DocID string `bson:"doc_id"`
// Msgs []*MsgInfoModel `bson:"msgs"`
//}
type MsgDocModelInterface interface { type MsgDocModelInterface interface {
PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []MsgInfoModel) error PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []MsgInfoModel) error
Create(ctx context.Context, model *MsgDocModel) error Create(ctx context.Context, model *MsgDocModel) error
@ -77,7 +72,7 @@ type MsgDocModelInterface interface {
UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error
IsExistDocID(ctx context.Context, docID string) (bool, error) IsExistDocID(ctx context.Context, docID string) (bool, error)
FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error) FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error)
GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, userID string, seqs []int64) ([]*MsgInfoModel, error) GetMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID string, seqs []int64) ([]*MsgInfoModel, error)
GetNewestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error) GetNewestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error)
GetOldestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error) GetOldestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error)
DeleteDocs(ctx context.Context, docIDs []string) error DeleteDocs(ctx context.Context, docIDs []string) error