This commit is contained in:
wangchuxiao 2023-05-06 18:51:10 +08:00
parent 287b028ba8
commit 121523626d
8 changed files with 1065 additions and 931 deletions

View File

@ -132,12 +132,19 @@ func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sd
if err != nil { if err != nil {
return nil, err return nil, err
} }
seqs, err := m.MsgDatabase.GetMaxSeqs(ctx, conversationIDs) maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, conversationIDs)
if err != nil { if err != nil {
log.ZWarn(ctx, "GetMaxSeqs error", err, "conversationIDs", conversationIDs) log.ZWarn(ctx, "GetMaxSeqs error", err, "conversationIDs", conversationIDs)
return nil, err
}
minSeqs, err := m.MsgDatabase.GetMinSeqs(ctx, conversationIDs)
if err != nil {
log.ZWarn(ctx, "GetMinSeqs error", err, "conversationIDs", conversationIDs)
return nil, err
} }
resp := new(sdkws.GetMaxSeqResp) resp := new(sdkws.GetMaxSeqResp)
resp.MaxSeqs = seqs resp.MaxSeqs = maxSeqs
resp.MinSeqs = minSeqs
return resp, nil return resp, nil
} }

View File

@ -2,6 +2,7 @@ package user
import ( import (
"context" "context"
"strings"
"time" "time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
@ -179,6 +180,9 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR
if user.UserID == "" { if user.UserID == "" {
return nil, errs.ErrArgs.Wrap("userID is empty") return nil, errs.ErrArgs.Wrap("userID is empty")
} }
if strings.Contains(user.UserID, ":") {
return nil, errs.ErrArgs.Wrap("userID contains ':' is invalid userID")
}
userIDs = append(userIDs, user.UserID) userIDs = append(userIDs, user.UserID)
} }
exist, err := s.IsExist(ctx, userIDs) exist, err := s.IsExist(ctx, userIDs)

View File

@ -270,15 +270,15 @@ func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string,
} else { } else {
msg := sdkws.MsgData{} msg := sdkws.MsgData{}
err = jsonpb.UnmarshalString(v.String(), &msg) err = jsonpb.UnmarshalString(v.String(), &msg)
if err != nil { if err == nil {
failedSeqs = append(failedSeqs, seqs[i])
} else {
if msg.Status != constant.MsgDeleted { if msg.Status != constant.MsgDeleted {
seqMsgs = append(seqMsgs, &msg) seqMsgs = append(seqMsgs, &msg)
} else { continue
failedSeqs = append(failedSeqs, seqs[i])
} }
} else {
log.ZWarn(ctx, "UnmarshalString failed", err, "conversationID", conversationID, "seq", seqs[i])
} }
failedSeqs = append(failedSeqs, seqs[i])
} }
} }
return seqMsgs, failedSeqs, err return seqMsgs, failedSeqs, err

View File

@ -2,6 +2,7 @@ package controller
import ( import (
"fmt" "fmt"
"sort"
"sync" "sync"
"time" "time"
@ -12,8 +13,8 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/gogo/protobuf/sortkeys" "github.com/gogo/protobuf/sortkeys"
"context" "context"
@ -50,7 +51,7 @@ type MsgDatabase interface {
// DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error // DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error
// 删除会话消息重置最小seq remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache) // 删除会话消息重置最小seq remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
DeleteConversationMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error
// 获取会话 seq mongo和redis // 获取会话 seq mongo和redis
GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
@ -82,9 +83,9 @@ type MsgDatabase interface {
// to mq // to mq
MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error
MsgToModifyMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData) error MsgToModifyMQ(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error
MsgToPushMQ(ctx context.Context, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) MsgToPushMQ(ctx context.Context, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error)
MsgToMongoMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error MsgToMongoMQ(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, lastSeq int64) error
} }
func NewMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.MsgModel) MsgDatabase { func NewMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.MsgModel) MsgDatabase {
@ -153,7 +154,7 @@ func (db *msgDatabase) GetExtendMsg(ctx context.Context, conversationID string,
} }
extendMsg, ok := extendMsgSet.ExtendMsgs[clientMsgID] extendMsg, ok := extendMsgSet.ExtendMsgs[clientMsgID]
if !ok { if !ok {
return nil, errors.New(fmt.Sprintf("cant find client msg id: %s", clientMsgID)) return nil, errs.ErrRecordNotFound.Wrap(fmt.Sprintf("cant find client msg id: %s", clientMsgID))
} }
reactionExtensionList := make(map[string]*pbMsg.KeyValueResp) reactionExtensionList := make(map[string]*pbMsg.KeyValueResp)
for key, model := range extendMsg.ReactionExtensionList { for key, model := range extendMsg.ReactionExtensionList {
@ -462,28 +463,119 @@ func (db *msgDatabase) getMsgBySeqs(ctx context.Context, conversationID string,
return seqMsgs, nil return seqMsgs, nil
} }
func (db *msgDatabase) getMsgBySeqsRange(ctx context.Context, conversationID string, seqs []int64, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) { func (db *msgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversationID string, delNums, rangeBegin, begin int64) (seqMsgs []*sdkws.MsgData, err error) {
m := db.msg.GetDocIDSeqsMap(conversationID, seqs) var reFetchSeqs []int64
for int64(len(seqMsg)) != num { if delNums > 0 {
for docID, value := range m { newBeginSeq := rangeBegin - delNums
beginSeq, endSeq := db.msg.GetSeqsBeginEnd(value) if newBeginSeq >= begin {
msgs, _, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, beginSeq, endSeq) newEndSeq := rangeBegin - 1
if err != nil { for i := newBeginSeq; i <= newEndSeq; i++ {
log.ZError(ctx, "GetMsgBySeqIndexIn1Doc error", err, "docID", docID, "beginSeq", beginSeq, "endSeq", endSeq) reFetchSeqs = append(reFetchSeqs, i)
continue
}
var newMsgs []*sdkws.MsgData
for _, msg := range msgs {
if msg.Status != constant.MsgDeleted {
newMsgs = append(newMsgs, msg)
}
}
if int64(len(newMsgs)) != num {
} }
} }
} }
return seqMsg, nil if len(reFetchSeqs) == 0 {
return
}
if len(reFetchSeqs) > 0 {
m := db.msg.GetDocIDSeqsMap(conversationID, reFetchSeqs)
for docID, seq := range m {
msgs, _, err := db.findMsgBySeq(ctx, docID, seq)
if err != nil {
return nil, err
}
for _, msg := range msgs {
if msg.Status != constant.MsgDeleted {
seqMsgs = append(seqMsgs, msg)
}
}
}
}
if len(seqMsgs) < int(delNums) {
seqMsgs2, err := db.refetchDelSeqsMsgs(ctx, conversationID, delNums-int64(len(seqMsgs)), rangeBegin-1, begin)
if err != nil {
return seqMsgs, err
}
seqMsgs = append(seqMsgs, seqMsgs2...)
}
return seqMsgs, nil
}
func (db *msgDatabase) findMsgBySeq(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, unExistSeqs []int64, err error) {
beginSeq, endSeq := db.msg.GetSeqsBeginEnd(seqs)
msgs, _, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, beginSeq, endSeq)
if err != nil {
return nil, nil, err
}
for _, seq := range seqs {
for i, msg := range msgs {
if seq == msg.Seq {
seqMsgs = append(seqMsgs, msg)
continue
}
if i == len(msgs)-1 {
unExistSeqs = append(unExistSeqs, seq)
}
}
}
msgs, _, unExistSeqs, err = db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
if err != nil {
return nil, nil, err
}
seqMsgs = append(seqMsgs, msgs...)
return seqMsgs, unExistSeqs, nil
}
func (db *msgDatabase) getMsgBySeqsRange(ctx context.Context, conversationID string, allSeqs []int64, begin, end, num int64) (seqMsgs []*sdkws.MsgData, err error) {
log.ZDebug(ctx, "getMsgBySeqsRange", "conversationID", conversationID, "allSeqs", allSeqs, "begin", begin, "end", end, "num", num)
m := db.msg.GetDocIDSeqsMap(conversationID, allSeqs)
var totalNotExistSeqs []int64
// mongo index
for docID, seqs := range m {
msgs, notExistSeqs, err := db.findMsgBySeq(ctx, docID, seqs)
if err != nil {
return nil, err
}
log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs, "unExistSeqs", notExistSeqs, "msgs", msgs)
seqMsgs = append(seqMsgs, msgs...)
totalNotExistSeqs = append(totalNotExistSeqs, notExistSeqs...)
}
log.ZDebug(ctx, "getMsgBySeqsRange", "totalNotExistSeqs", totalNotExistSeqs)
// find by next doc
if len(totalNotExistSeqs) > 0 {
m = db.msg.GetDocIDSeqsMap(conversationID, totalNotExistSeqs)
for docID, seqs := range m {
docID = db.msg.ToNextDoc(docID)
msgs, _, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
if err != nil {
log.ZError(ctx, "get message from mongo exception", err, "docID", docID, "seqs", seqs)
continue
}
seqMsgs = append(seqMsgs, msgs...)
if len(unExistSeqs) > 0 {
log.ZWarn(ctx, "some seqs lost in mongo", err, "docID", docID, "seqs", seqs, "unExistSeqs", unExistSeqs)
}
}
}
var delSeqs []int64
for _, msg := range seqMsgs {
if msg.Status == constant.MsgDeleted {
delSeqs = append(delSeqs, msg.Seq)
}
}
if len(delSeqs) > 0 {
msgs, err := db.refetchDelSeqsMsgs(ctx, conversationID, int64(len(delSeqs)), allSeqs[0], begin)
if err != nil {
log.ZWarn(ctx, "refetchDelSeqsMsgs", err, "delSeqs", delSeqs, "begin", begin)
}
seqMsgs = append(seqMsgs, msgs...)
}
// sort by seq
if len(totalNotExistSeqs) > 0 || len(delSeqs) > 0 {
sort.Sort(utils.MsgBySeq(seqMsgs))
}
// missSeqs为依然缺失的
return seqMsgs, nil
} }
func (db *msgDatabase) GetMsgBySeqsRange(ctx context.Context, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) { func (db *msgDatabase) GetMsgBySeqsRange(ctx context.Context, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) {
@ -521,7 +613,7 @@ func (db *msgDatabase) GetMsgBySeqs(ctx context.Context, conversationID string,
if err != nil { if err != nil {
if err != redis.Nil { if err != redis.Nil {
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
log.Error(mcontext.GetOperationID(ctx), "get message from redis exception", err.Error(), failedSeqs) log.ZError(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID)
} }
} }
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs)) prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
@ -577,7 +669,7 @@ func (db *msgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Context, c
var delStruct delMsgRecursionStruct var delStruct delMsgRecursionStruct
minSeq, err := db.deleteMsgRecursion(ctx, conversationID, unRelationTb.OldestList, &delStruct, remainTime) minSeq, err := db.deleteMsgRecursion(ctx, conversationID, unRelationTb.OldestList, &delStruct, remainTime)
if err != nil { if err != nil {
return utils.Wrap(err, "") return err
} }
if minSeq == 0 { if minSeq == 0 {
return nil return nil

View File

@ -76,14 +76,12 @@ func (m MsgDocModel) GetSeqDocIDList(userID string, maxSeq int64) []string {
return seqUserIDs return seqUserIDs
} }
// func (m MsgDocModel) getSeqSuperGroupID(groupID string, seq int64) string { func (m MsgDocModel) ToNextDoc(docID string) string {
// seqSuffix := seq / singleGocMsgNum l := strings.Split(docID, ":")
// return m.superGroupIndexGen(groupID, seqSuffix) index, _ := strconv.Atoi(l[len(l)-1])
// } index++
return strings.Split(docID, ":")[0] + ":" + strconv.Itoa(index)
// func (m MsgDocModel) superGroupIndexGen(groupID string, seqSuffix int64) string { }
// return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10)
// }
func (m MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[string][]int64 { func (m MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[string][]int64 {
t := make(map[string][]int64) t := make(map[string][]int64)

File diff suppressed because it is too large Load Diff

View File

@ -148,6 +148,7 @@ message GetMaxSeqReq {
message GetMaxSeqResp { message GetMaxSeqResp {
map<string, int64> maxSeqs = 1; map<string, int64> maxSeqs = 1;
map<string, int64> minSeqs = 2;
} }
message UserSendMsgResp { message UserSendMsgResp {

View File

@ -238,3 +238,17 @@ func GetNotificationConversationIDBySessionType(sessionType int, ids ...string)
func IsNotification(conversationID string) bool { func IsNotification(conversationID string) bool {
return strings.HasPrefix(conversationID, "n_") return strings.HasPrefix(conversationID, "n_")
} }
type MsgBySeq []*sdkws.MsgData
func (s MsgBySeq) Len() int {
return len(s)
}
func (s MsgBySeq) Less(i, j int) bool {
return s[i].Seq < s[j].Seq
}
func (s MsgBySeq) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}