// Copyright © 2023 OpenIM. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package controller import ( "context" "encoding/json" "errors" "github.com/openimsdk/tools/utils/jsonutil" "strconv" "strings" "time" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/redis/go-redis/v9" "go.mongodb.org/mongo-driver/mongo" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/protocol/constant" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/utils/datautil" ) const ( updateKeyMsg = iota updateKeyRevoke ) // CommonMsgDatabase defines the interface for message database operations. type CommonMsgDatabase interface { // RevokeMsg revokes a message in a conversation. RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *model.RevokeModel) error // MarkSingleChatMsgsAsRead marks messages as read for a single chat by sequence numbers. MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, seqs []int64) error // GetMsgBySeqsRange retrieves messages from MongoDB by a range of sequence numbers. GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error) // GetMsgBySeqs retrieves messages for large groups from MongoDB by sequence numbers. GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error) GetMessagesBySeqWithBounds(ctx context.Context, userID string, conversationID string, seqs []int64, pullOrder sdkws.PullOrder) (bool, int64, []*sdkws.MsgData, error) // DeleteUserMsgsBySeqs allows a user to delete messages based on sequence numbers. DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error // DeleteMsgsPhysicalBySeqs physically deletes messages by emptying them based on sequence numbers. DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, seqs []int64) error //SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) SetMinSeqs(ctx context.Context, seqs map[string]int64) error SetMinSeq(ctx context.Context, conversationID string, seq int64) error SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error GetMaxSeqsWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) GetMaxSeqWithTime(ctx context.Context, conversationID string) (database.SeqTime, error) GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) SetSendMsgStatus(ctx context.Context, id string, status int32) error GetSendMsgStatus(ctx context.Context, id string) (int32, error) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int64, msgData []*pbmsg.SearchedMsgData, err error) FindOneByDocIDs(ctx context.Context, docIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) // to mq MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) SetUserConversationsMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error SetUserConversationsMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error DeleteDoc(ctx context.Context, docID string) error GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) GetLastMessage(ctx context.Context, conversationIDS []string, userID string) (map[string]*sdkws.MsgData, error) } func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { conf, err := kafka.BuildProducerConfig(*kafkaConf.Build()) if err != nil { return nil, err } producerToRedis, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToRedisTopic) if err != nil { return nil, err } return &commonMsgDatabase{ msgDocDatabase: msgDocModel, msgCache: msg, seqUser: seqUser, seqConversation: seqConversation, producer: producerToRedis, }, nil } type commonMsgDatabase struct { msgDocDatabase database.Msg msgTable model.MsgDocModel msgCache cache.MsgCache seqConversation cache.SeqConversationCache seqUser cache.SeqUser producer *kafka.Producer } func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error { _, _, err := db.producer.SendMessage(ctx, key, msg2mq) return err } func (db *commonMsgDatabase) batchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error { if len(fields) == 0 { return nil } num := db.msgTable.GetSingleGocMsgNum() // num = 100 for i, field := range fields { // Check the type of the field var ok bool switch key { case updateKeyMsg: var msg *model.MsgDataModel msg, ok = field.(*model.MsgDataModel) if msg != nil && msg.Seq != firstSeq+int64(i) { return errs.ErrInternalServer.WrapMsg("seq is invalid") } case updateKeyRevoke: _, ok = field.(*model.RevokeModel) default: return errs.ErrInternalServer.WrapMsg("key is invalid") } if !ok { return errs.ErrInternalServer.WrapMsg("field type is invalid") } } // Returns true if the document exists in the database, false if the document does not exist in the database updateMsgModel := func(seq int64, i int) (bool, error) { var ( res *mongo.UpdateResult err error ) docID := db.msgTable.GetDocID(conversationID, seq) index := db.msgTable.GetMsgIndex(seq) field := fields[i] switch key { case updateKeyMsg: res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", field) case updateKeyRevoke: res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", field) } if err != nil { return false, err } return res.MatchedCount > 0, nil } tryUpdate := true for i := 0; i < len(fields); i++ { seq := firstSeq + int64(i) // Current sequence number if tryUpdate { matched, err := updateMsgModel(seq, i) if err != nil { return err } if matched { continue // The current data has been updated, skip the current data } } doc := model.MsgDocModel{ DocID: db.msgTable.GetDocID(conversationID, seq), Msg: make([]*model.MsgInfoModel, num), } var insert int // Inserted data number for j := i; j < len(fields); j++ { seq = firstSeq + int64(j) if db.msgTable.GetDocID(conversationID, seq) != doc.DocID { break } insert++ switch key { case updateKeyMsg: doc.Msg[db.msgTable.GetMsgIndex(seq)] = &model.MsgInfoModel{ Msg: fields[j].(*model.MsgDataModel), } case updateKeyRevoke: doc.Msg[db.msgTable.GetMsgIndex(seq)] = &model.MsgInfoModel{ Revoke: fields[j].(*model.RevokeModel), } } } for i, msgInfo := range doc.Msg { if msgInfo == nil { msgInfo = &model.MsgInfoModel{} doc.Msg[i] = msgInfo } if msgInfo.DelList == nil { doc.Msg[i].DelList = []string{} } } if err := db.msgDocDatabase.Create(ctx, &doc); err != nil { if mongo.IsDuplicateKeyError(err) { i-- // already inserted tryUpdate = true // next block use update mode continue } return err } tryUpdate = false // The current block is inserted successfully, and the next block is inserted preferentially i += insert - 1 // Skip the inserted data } return nil } func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *model.RevokeModel) error { if err := db.batchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq); err != nil { return err } return db.msgCache.DelMessageBySeqs(ctx, conversationID, []int64{seq}) } func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, totalSeqs []int64) error { for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, totalSeqs) { var indexes []int64 for _, seq := range seqs { indexes = append(indexes, db.msgTable.GetMsgIndex(seq)) } log.ZDebug(ctx, "MarkSingleChatMsgsAsRead", "userID", userID, "docID", docID, "indexes", indexes) if err := db.msgDocDatabase.MarkSingleChatMsgsAsRead(ctx, userID, docID, indexes); err != nil { log.ZError(ctx, "MarkSingleChatMsgsAsRead", err, "userID", userID, "docID", docID, "indexes", indexes) return err } } return db.msgCache.DelMessageBySeqs(ctx, conversationID, totalSeqs) } func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { return db.GetMessageBySeqs(ctx, conversationID, userID, seqs) } func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][]*model.MsgInfoModel, userID, conversationID string, msg *model.MsgInfoModel) { if msg == nil || msg.Msg == nil { return } if msg.IsRead { msg.Msg.IsRead = true } if msg.Msg.ContentType != constant.Quote { return } if msg.Msg.Content == "" { return } type MsgData struct { SendID string `json:"sendID"` RecvID string `json:"recvID"` GroupID string `json:"groupID"` ClientMsgID string `json:"clientMsgID"` ServerMsgID string `json:"serverMsgID"` SenderPlatformID int32 `json:"senderPlatformID"` SenderNickname string `json:"senderNickname"` SenderFaceURL string `json:"senderFaceURL"` SessionType int32 `json:"sessionType"` MsgFrom int32 `json:"msgFrom"` ContentType int32 `json:"contentType"` Content string `json:"content"` Seq int64 `json:"seq"` SendTime int64 `json:"sendTime"` CreateTime int64 `json:"createTime"` Status int32 `json:"status"` IsRead bool `json:"isRead"` Options map[string]bool `json:"options,omitempty"` OfflinePushInfo *sdkws.OfflinePushInfo `json:"offlinePushInfo"` AtUserIDList []string `json:"atUserIDList"` AttachedInfo string `json:"attachedInfo"` Ex string `json:"ex"` KeyVersion int32 `json:"keyVersion"` DstUserIDs []string `json:"dstUserIDs"` } var quoteMsg struct { Text string `json:"text,omitempty"` QuoteMessage *MsgData `json:"quoteMessage,omitempty"` MessageEntityList json.RawMessage `json:"messageEntityList,omitempty"` } if err := json.Unmarshal([]byte(msg.Msg.Content), "eMsg); err != nil { log.ZError(ctx, "json.Unmarshal", err) return } if quoteMsg.QuoteMessage == nil || quoteMsg.QuoteMessage.Content == "" { return } if quoteMsg.QuoteMessage.Content == "e30=" { quoteMsg.QuoteMessage.Content = "{}" data, err := json.Marshal("eMsg) if err != nil { return } msg.Msg.Content = string(data) } if quoteMsg.QuoteMessage.Seq <= 0 && quoteMsg.QuoteMessage.ContentType == constant.MsgRevokeNotification { return } var msgs []*model.MsgInfoModel if v, ok := cache[quoteMsg.QuoteMessage.Seq]; ok { msgs = v } else { if quoteMsg.QuoteMessage.Seq > 0 { ms, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, db.msgTable.GetDocID(conversationID, quoteMsg.QuoteMessage.Seq), []int64{quoteMsg.QuoteMessage.Seq}) if err != nil { log.ZError(ctx, "GetMsgBySeqIndexIn1Doc", err, "conversationID", conversationID, "seq", quoteMsg.QuoteMessage.Seq) return } msgs = ms cache[quoteMsg.QuoteMessage.Seq] = ms } } if len(msgs) != 0 && msgs[0].Msg.ContentType != constant.MsgRevokeNotification { return } quoteMsg.QuoteMessage.ContentType = constant.MsgRevokeNotification if len(msgs) > 0 { quoteMsg.QuoteMessage.Content = msgs[0].Msg.Content } else { quoteMsg.QuoteMessage.Content = "{}" } data, err := json.Marshal("eMsg) if err != nil { log.ZError(ctx, "json.Marshal", err) return } msg.Msg.Content = string(data) } func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, conversationID string, seqs []int64) (totalMsgs []*model.MsgInfoModel, err error) { msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, docID, seqs) if err != nil { return nil, err } tempCache := make(map[int64][]*model.MsgInfoModel) for _, msg := range msgs { db.handlerDBMsg(ctx, tempCache, userID, conversationID, msg) } return msgs, err } // GetMsgBySeqsRange In the context of group chat, we have the following parameters: // // "maxSeq" of a conversation: It represents the maximum value of messages in the group conversation. // "minSeq" of a conversation (default: 1): It represents the minimum value of messages in the group conversation. // // For a user's perspective regarding the group conversation, we have the following parameters: // // "userMaxSeq": It represents the user's upper limit for message retrieval in the group. If not set (default: 0), // it means the upper limit is the same as the conversation's "maxSeq". // "userMinSeq": It represents the user's starting point for message retrieval in the group. If not set (default: 0), // it means the starting point is the same as the conversation's "minSeq". // // The scenarios for these parameters are as follows: // // For users who have been kicked out of the group, "userMaxSeq" can be set as the maximum value they had before // being kicked out. This limits their ability to retrieve messages up to a certain point. // For new users joining the group, if they don't need to receive old messages, // "userMinSeq" can be set as the same value as the conversation's "maxSeq" at the moment they join the group. // This ensures that their message retrieval starts from the point they joined. func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (int64, int64, []*sdkws.MsgData, error) { userMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID) if err != nil && !errors.Is(err, redis.Nil) { return 0, 0, nil, err } minSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID) if err != nil { return 0, 0, nil, err } if userMinSeq > minSeq { minSeq = userMinSeq } // "minSeq" represents the startSeq value that the user can retrieve. if minSeq > end { log.ZWarn(ctx, "minSeq > end", errs.New("minSeq>end"), "minSeq", minSeq, "end", end) return 0, 0, nil, nil } maxSeq, err := db.seqConversation.GetMaxSeq(ctx, conversationID) if err != nil { return 0, 0, nil, err } log.ZDebug(ctx, "GetMsgBySeqsRange", "userMinSeq", userMinSeq, "conMinSeq", minSeq, "conMaxSeq", maxSeq, "userMaxSeq", userMaxSeq) if userMaxSeq != 0 { if userMaxSeq < maxSeq { maxSeq = userMaxSeq } } // "maxSeq" represents the endSeq value that the user can retrieve. if begin < minSeq { begin = minSeq } if end > maxSeq { end = maxSeq } // "begin" and "end" represent the actual startSeq and endSeq values that the user can retrieve. if end < begin { return 0, 0, nil, errs.ErrArgs.WrapMsg("seq end < begin") } var seqs []int64 if end-begin+1 <= num { for i := begin; i <= end; i++ { seqs = append(seqs, i) } } else { for i := end - num + 1; i <= end; i++ { seqs = append(seqs, i) } } successMsgs, err := db.GetMessageBySeqs(ctx, conversationID, userID, seqs) if err != nil { return 0, 0, nil, err } return minSeq, maxSeq, successMsgs, nil } func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (int64, int64, []*sdkws.MsgData, error) { userMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID) if err != nil { return 0, 0, nil, err } minSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID) if err != nil { return 0, 0, nil, err } maxSeq, err := db.seqConversation.GetMaxSeq(ctx, conversationID) if err != nil { return 0, 0, nil, err } userMaxSeq, err := db.seqUser.GetUserMaxSeq(ctx, conversationID, userID) if err != nil { return 0, 0, nil, err } if userMinSeq > minSeq { minSeq = userMinSeq } if userMaxSeq > 0 && userMaxSeq < maxSeq { maxSeq = userMaxSeq } newSeqs := make([]int64, 0, len(seqs)) for _, seq := range seqs { if seq <= 0 { continue } if seq >= minSeq && seq <= maxSeq { newSeqs = append(newSeqs, seq) } } successMsgs, err := db.GetMessageBySeqs(ctx, conversationID, userID, newSeqs) if err != nil { return 0, 0, nil, err } return minSeq, maxSeq, successMsgs, nil } func (db *commonMsgDatabase) GetMessagesBySeqWithBounds(ctx context.Context, userID string, conversationID string, seqs []int64, pullOrder sdkws.PullOrder) (bool, int64, []*sdkws.MsgData, error) { var endSeq int64 var isEnd bool userMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID) if err != nil { return false, 0, nil, err } minSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID) if err != nil { return false, 0, nil, err } maxSeq, err := db.seqConversation.GetMaxSeq(ctx, conversationID) if err != nil { return false, 0, nil, err } userMaxSeq, err := db.seqUser.GetUserMaxSeq(ctx, conversationID, userID) if err != nil { return false, 0, nil, err } if userMinSeq > minSeq { minSeq = userMinSeq } if userMaxSeq > 0 && userMaxSeq < maxSeq { maxSeq = userMaxSeq } newSeqs := make([]int64, 0, len(seqs)) for _, seq := range seqs { if seq <= 0 { continue } // The normal range and can fetch messages if seq >= minSeq && seq <= maxSeq { newSeqs = append(newSeqs, seq) continue } // If the requested seq is smaller than the minimum seq and the pull order is descending (pulling older messages) if seq < minSeq && pullOrder == sdkws.PullOrder_PullOrderDesc { isEnd = true endSeq = minSeq } // If the requested seq is larger than the maximum seq and the pull order is ascending (pulling newer messages) if seq > maxSeq && pullOrder == sdkws.PullOrder_PullOrderAsc { isEnd = true endSeq = maxSeq } } if len(newSeqs) == 0 { return isEnd, endSeq, nil, nil } successMsgs, err := db.GetMessageBySeqs(ctx, conversationID, userID, newSeqs) if err != nil { return false, 0, nil, err } return isEnd, endSeq, successMsgs, nil } func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error { for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, allSeqs) { var indexes []int for _, seq := range seqs { indexes = append(indexes, int(db.msgTable.GetMsgIndex(seq))) } if err := db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, docID, indexes); err != nil { return err } } return db.msgCache.DelMessageBySeqs(ctx, conversationID, allSeqs) } func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error { for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, seqs) { for _, seq := range seqs { if _, err := db.msgDocDatabase.PushUnique(ctx, docID, db.msgTable.GetMsgIndex(seq), "del_list", []string{userID}); err != nil { return err } } } return db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs) } func (db *commonMsgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { return db.seqConversation.GetMaxSeqs(ctx, conversationIDs) } func (db *commonMsgDatabase) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { return db.seqConversation.GetMaxSeq(ctx, conversationID) } func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { return db.seqConversation.SetMinSeqs(ctx, seqs) } func (db *commonMsgDatabase) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error { return db.seqUser.SetUserMinSeqs(ctx, userID, seqs) } func (db *commonMsgDatabase) SetUserConversationsMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error { return db.seqUser.SetUserMaxSeq(ctx, conversationID, userID, seq) } func (db *commonMsgDatabase) SetUserConversationsMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error { return db.seqUser.SetUserMinSeq(ctx, conversationID, userID, seq) } func (db *commonMsgDatabase) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error { return db.seqUser.SetUserReadSeqs(ctx, userID, hasReadSeqs) } func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq) } func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) { return db.seqUser.GetUserReadSeqs(ctx, userID, conversationIDs) } func (db *commonMsgDatabase) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) { return db.seqUser.GetUserReadSeq(ctx, conversationID, userID) } func (db *commonMsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error { return db.msgCache.SetSendMsgStatus(ctx, id, status) } func (db *commonMsgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { return db.msgCache.GetSendMsgStatus(ctx, id) } func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) { minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, conversationID) if err != nil { return } minSeqCache, err = db.seqConversation.GetMinSeq(ctx, conversationID) if err != nil { return } maxSeqCache, err = db.seqConversation.GetMaxSeq(ctx, conversationID) if err != nil { return } return } func (db *commonMsgDatabase) GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) { return db.GetMinMaxSeqMongo(ctx, conversationID) } func (db *commonMsgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) { oldestMsgMongo, err := db.msgDocDatabase.GetOldestMsg(ctx, conversationID) if err != nil { return } minSeqMongo = oldestMsgMongo.Msg.Seq newestMsgMongo, err := db.msgDocDatabase.GetNewestMsg(ctx, conversationID) if err != nil { return } maxSeqMongo = newestMsgMongo.Msg.Seq return } func (db *commonMsgDatabase) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) { return db.msgDocDatabase.RangeUserSendCount(ctx, start, end, group, ase, pageNumber, showNumber) } func (db *commonMsgDatabase) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) { return db.msgDocDatabase.RangeGroupSendCount(ctx, start, end, ase, pageNumber, showNumber) } func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int64, msgData []*pbmsg.SearchedMsgData, err error) { var totalMsgs []*pbmsg.SearchedMsgData total, msgs, err := db.msgDocDatabase.SearchMessage(ctx, req) if err != nil { return 0, nil, err } for _, msg := range msgs { if msg.IsRead { msg.Msg.IsRead = true } searchedMsgData := &pbmsg.SearchedMsgData{MsgData: convert.MsgDB2Pb(msg.Msg)} if msg.Revoke != nil { searchedMsgData.IsRevoked = true } totalMsgs = append(totalMsgs, searchedMsgData) } return total, totalMsgs, nil } func (db *commonMsgDatabase) FindOneByDocIDs(ctx context.Context, conversationIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) { totalMsgs := make(map[string]*sdkws.MsgData) for _, conversationID := range conversationIDs { seq := seqs[conversationID] docID := db.msgTable.GetDocID(conversationID, seq) msgs, err := db.msgDocDatabase.FindOneByDocID(ctx, docID) if err != nil { return nil, err } index := db.msgTable.GetMsgIndex(seq) totalMsgs[conversationID] = convert.MsgDB2Pb(msgs.Msg[index].Msg) } return totalMsgs, nil } func (db *commonMsgDatabase) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) { return db.msgDocDatabase.GetRandBeforeMsg(ctx, ts, limit) } func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, seq int64) error { dbSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID) if err != nil { if errors.Is(errs.Unwrap(err), redis.Nil) { return nil } return err } if dbSeq >= seq { return nil } return db.seqConversation.SetMinSeq(ctx, conversationID, seq) } func (db *commonMsgDatabase) GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) { return db.seqConversation.GetCacheMaxSeqWithTime(ctx, conversationIDs) } func (db *commonMsgDatabase) GetMaxSeqWithTime(ctx context.Context, conversationID string) (database.SeqTime, error) { return db.seqConversation.GetMaxSeqWithTime(ctx, conversationID) } func (db *commonMsgDatabase) GetMaxSeqsWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) { // todo: only the time in the redis cache will be taken, not the message time return db.seqConversation.GetMaxSeqsWithTime(ctx, conversationIDs) } func (db *commonMsgDatabase) DeleteDoc(ctx context.Context, docID string) error { index := strings.LastIndex(docID, ":") if index <= 0 { return errs.ErrInternalServer.WrapMsg("docID is invalid", "docID", docID) } index, err := strconv.Atoi(docID[index+1:]) if err != nil { return errs.WrapMsg(err, "strconv.Atoi", "docID", docID) } conversationID := docID[:index] seqs := make([]int64, db.msgTable.GetSingleGocMsgNum()) minSeq := db.msgTable.GetMinSeq(index) for i := range seqs { seqs[i] = minSeq + int64(i) } if err := db.msgDocDatabase.DeleteDoc(ctx, docID); err != nil { return err } return db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs) } func (db *commonMsgDatabase) GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) { return db.msgDocDatabase.GetLastMessageSeqByTime(ctx, conversationID, time) } func (db *commonMsgDatabase) handlerDeleteAndRevoked(ctx context.Context, userID string, msgs []*model.MsgInfoModel) { for i := range msgs { msg := msgs[i] if msg == nil || msg.Msg == nil { continue } msg.Msg.IsRead = msg.IsRead if datautil.Contain(userID, msg.DelList...) { msg.Msg.Content = "" msg.Msg.Status = constant.MsgDeleted } if msg.Revoke == nil { continue } msg.Msg.ContentType = constant.MsgRevokeNotification revokeContent := sdkws.MessageRevokedContent{ RevokerID: msg.Revoke.UserID, RevokerRole: msg.Revoke.Role, ClientMsgID: msg.Msg.ClientMsgID, RevokerNickname: msg.Revoke.Nickname, RevokeTime: msg.Revoke.Time, SourceMessageSendTime: msg.Msg.SendTime, SourceMessageSendID: msg.Msg.SendID, SourceMessageSenderNickname: msg.Msg.SenderNickname, SessionType: msg.Msg.SessionType, Seq: msg.Msg.Seq, Ex: msg.Msg.Ex, } data, err := jsonutil.JsonMarshal(&revokeContent) if err != nil { log.ZWarn(ctx, "handlerDeleteAndRevoked JsonMarshal MessageRevokedContent", err, "msg", msg) continue } elem := sdkws.NotificationElem{ Detail: string(data), } content, err := jsonutil.JsonMarshal(&elem) if err != nil { log.ZWarn(ctx, "handlerDeleteAndRevoked JsonMarshal NotificationElem", err, "msg", msg) continue } msg.Msg.Content = string(content) } } func (db *commonMsgDatabase) handlerQuote(ctx context.Context, userID, conversationID string, msgs []*model.MsgInfoModel) { temp := make(map[int64][]*model.MsgInfoModel) for i := range msgs { db.handlerDBMsg(ctx, temp, userID, conversationID, msgs[i]) } } func (db *commonMsgDatabase) GetMessageBySeqs(ctx context.Context, conversationID string, userID string, seqs []int64) ([]*sdkws.MsgData, error) { msgs, err := db.msgCache.GetMessageBySeqs(ctx, conversationID, seqs) if err != nil { return nil, err } db.handlerDeleteAndRevoked(ctx, userID, msgs) db.handlerQuote(ctx, userID, conversationID, msgs) seqMsgs := make(map[int64]*model.MsgInfoModel) for i, msg := range msgs { if msg.Msg == nil { continue } seqMsgs[msg.Msg.Seq] = msgs[i] } res := make([]*sdkws.MsgData, 0, len(seqs)) for _, seq := range seqs { if v, ok := seqMsgs[seq]; ok { res = append(res, convert.MsgDB2Pb(v.Msg)) } else { res = append(res, &sdkws.MsgData{Seq: seq, Status: constant.MsgStatusHasDeleted}) } } return res, nil } func (db *commonMsgDatabase) GetLastMessage(ctx context.Context, conversationIDs []string, userID string) (map[string]*sdkws.MsgData, error) { res := make(map[string]*sdkws.MsgData) for _, conversationID := range conversationIDs { if _, ok := res[conversationID]; ok { continue } msg, err := db.msgDocDatabase.GetLastMessage(ctx, conversationID) if err != nil { if errs.Unwrap(err) == mongo.ErrNoDocuments { continue } return nil, err } tmp := []*model.MsgInfoModel{msg} db.handlerDeleteAndRevoked(ctx, userID, tmp) db.handlerQuote(ctx, userID, conversationID, tmp) res[conversationID] = convert.MsgDB2Pb(msg.Msg) } return res, nil }