diff --git a/internal/api/msg.go b/internal/api/msg.go index a5514bf12..627918ed8 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -62,8 +62,6 @@ func (m Message) newUserSendMsgReq(c *gin.Context, params *apistruct.ManagementS fallthrough case constant.CustomOnlineOnly: fallthrough - case constant.AdvancedRevoke: - newContent = utils.StructToJsonString(params.Content) case constant.Revoke: newContent = params.Content["revokeMsgClientID"].(string) default: @@ -197,8 +195,6 @@ func (m *Message) SendMessage(c *gin.Context) { data = apistruct.CustomElem{} case constant.Revoke: data = apistruct.RevokeElem{} - case constant.AdvancedRevoke: - data = apistruct.MessageRevoked{} case constant.OANotification: data = apistruct.OANotificationElem{} params.SessionType = constant.NotificationChatType diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index 5330408e1..b996d89c2 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -5,23 +5,14 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" ) func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadReq) (resp *msg.MarkMsgsAsReadResp, err error) { - conversations, err := m.Conversation.GetConversationsByConversationID(ctx, []string{req.ConversationID}) + recvID, err := m.getConversationAndGetRecvID(ctx, req.ConversationID, req.UserID) if err != nil { return } - var recvID string - if conversations[0].ConversationType == constant.SingleChatType || conversations[0].ConversationType == constant.NotificationChatType { - if req.UserID == conversations[0].OwnerUserID { - recvID = conversations[0].UserID - } else { - recvID = conversations[0].OwnerUserID - } - } else if conversations[0].ConversationType == constant.SuperGroupChatType { - recvID = conversations[0].GroupID - } err = m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.ConversationID, req.UserID, req.Seqs) if err != nil { return @@ -33,11 +24,11 @@ func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadR } func (m *msgServer) sendMarkAsReadNotification(ctx context.Context, conversationID string, sendID, recvID string, seqs []int64) error { - // tips := &sdkws.MarkAsReadTips{ - // MarkAsReadUserID: sendID, - // ConversationID: conversationID, - // Seqs: seqs, - // } - // m.notificationSender.NotificationWithSesstionType(ctx) + tips := &sdkws.MarkAsReadTips{ + MarkAsReadUserID: sendID, + ConversationID: conversationID, + Seqs: seqs, + } + m.notificationSender.NotificationWithSesstionType(ctx, sendID, recvID, constant.HasReadReceipt, constant.SingleChatType, tips) return nil } diff --git a/internal/rpc/msg/delete.go b/internal/rpc/msg/delete.go index 57086475a..a3020d4c2 100644 --- a/internal/rpc/msg/delete.go +++ b/internal/rpc/msg/delete.go @@ -64,18 +64,8 @@ func (m *msgServer) DeleteMsgs(ctx context.Context, req *msg.DeleteMsgsReq) (*ms if err != nil { return nil, err } - var recvID string - if conversations[0].ConversationType == constant.SingleChatType || conversations[0].ConversationType == constant.NotificationChatType { - if conversations[0].OwnerUserID == recvID { - recvID = conversations[0].UserID - } else if conversations[0].UserID == recvID { - recvID = conversations[0].OwnerUserID - } - } else if conversations[0].ConversationType == constant.SuperGroupChatType { - recvID = conversations[0].GroupID - } tips := &sdkws.DeleteMsgsTips{UserID: req.UserID, ConversationID: req.ConversationID, Seqs: req.Seqs} - m.notificationSender.NotificationWithSesstionType(ctx, req.UserID, recvID, constant.DeleteMsgsNotification, conversations[0].ConversationType, tips) + m.notificationSender.NotificationWithSesstionType(ctx, req.UserID, m.conversationAndGetRecvID(conversations[0], req.UserID), constant.DeleteMsgsNotification, conversations[0].ConversationType, tips) } else { if err := m.MsgDatabase.DeleteUserMsgsBySeqs(ctx, req.UserID, req.ConversationID, req.Seqs); err != nil { return nil, err @@ -139,18 +129,8 @@ func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []str return err } for _, conversation := range existConversations { - var recvID string - if conversation.ConversationType == constant.SingleChatType || conversation.ConversationType == constant.NotificationChatType { - if conversation.OwnerUserID == recvID { - recvID = conversation.UserID - } else if conversation.UserID == recvID { - recvID = conversation.OwnerUserID - } - } else if conversation.ConversationType == constant.SuperGroupChatType { - recvID = conversation.GroupID - } tips := &sdkws.ClearConversationTips{UserID: userID, ConversationIDs: []string{conversation.ConversationID}} - m.notificationSender.NotificationWithSesstionType(ctx, userID, recvID, constant.ClearConversationNotification, conversation.ConversationType, tips) + m.notificationSender.NotificationWithSesstionType(ctx, userID, m.conversationAndGetRecvID(conversation, userID), constant.ClearConversationNotification, conversation.ConversationType, tips) } } return nil diff --git a/internal/rpc/msg/message_interceptor.go b/internal/rpc/msg/message_interceptor.go index 9d4314dff..d142abbbe 100644 --- a/internal/rpc/msg/message_interceptor.go +++ b/internal/rpc/msg/message_interceptor.go @@ -14,14 +14,14 @@ import ( type MessageInterceptorFunc func(ctx context.Context, req *msg.SendMsgReq) (*sdkws.MsgData, error) func MessageHasReadEnabled(_ context.Context, req *msg.SendMsgReq) (*sdkws.MsgData, error) { - switch req.MsgData.ContentType { - case constant.HasReadReceipt: + switch { + case req.MsgData.ContentType == constant.HasReadReceipt && req.MsgData.SessionType == constant.SingleChatType: if config.Config.SingleMessageHasReadReceiptEnable { return req.MsgData, nil } else { return nil, errs.ErrMessageHasReadDisable.Wrap() } - case constant.GroupHasReadReceipt: + case req.MsgData.ContentType == constant.HasReadReceipt && req.MsgData.SessionType == constant.SuperGroupChatType: if config.Config.GroupMessageHasReadReceiptEnable { return req.MsgData, nil } else { diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index f51ed2c2a..d4e0ac2be 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -3,6 +3,7 @@ package msg import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/localcache" @@ -10,6 +11,7 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" "google.golang.org/grpc" @@ -100,3 +102,24 @@ func (m *msgServer) initPrometheus() { prome.NewWorkSuperGroupChatMsgProcessSuccessCounter() prome.NewWorkSuperGroupChatMsgProcessFailedCounter() } + +func (m *msgServer) getConversationAndGetRecvID(ctx context.Context, userID, conversationID string) (recvID string, err error) { + conversations, err := m.Conversation.GetConversationsByConversationID(ctx, []string{conversationID}) + if err != nil { + return + } + return m.conversationAndGetRecvID(conversations[0], userID), nil +} + +func (m *msgServer) conversationAndGetRecvID(conversation *conversation.Conversation, userID string) (recvID string) { + if conversation.ConversationType == constant.SingleChatType || conversation.ConversationType == constant.NotificationChatType { + if userID == conversation.OwnerUserID { + recvID = conversation.UserID + } else { + recvID = conversation.OwnerUserID + } + } else if conversation.ConversationType == constant.SuperGroupChatType { + recvID = conversation.GroupID + } + return +} diff --git a/internal/rpc/msg/utils.go b/internal/rpc/msg/utils.go index cb31f97f2..c4790241b 100644 --- a/internal/rpc/msg/utils.go +++ b/internal/rpc/msg/utils.go @@ -10,14 +10,14 @@ import ( ) func isMessageHasReadEnabled(msgData *sdkws.MsgData) bool { - switch msgData.ContentType { - case constant.HasReadReceipt: + switch { + case msgData.ContentType == constant.HasReadReceipt && msgData.SessionType == constant.SingleChatType: if config.Config.SingleMessageHasReadReceiptEnable { return true } else { return false } - case constant.GroupHasReadReceipt: + case msgData.ContentType == constant.HasReadReceipt && msgData.SessionType == constant.SuperGroupChatType: if config.Config.GroupMessageHasReadReceiptEnable { return true } else { diff --git a/internal/rpc/msg/verify.go b/internal/rpc/msg/verify.go index 5a190bbe3..319a37e4e 100644 --- a/internal/rpc/msg/verify.go +++ b/internal/rpc/msg/verify.go @@ -15,7 +15,7 @@ import ( ) var ( - ExcludeContentType = []int{constant.HasReadReceipt, constant.GroupHasReadReceipt} + ExcludeContentType = []int{constant.HasReadReceipt} ) type Validator interface { diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index a727c56d9..0d3f7005c 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -4,23 +4,22 @@ const ( ///ContentType //UserRelated - Text = 101 - Picture = 102 - Voice = 103 - Video = 104 - File = 105 - AtText = 106 - Merger = 107 - Card = 108 - Location = 109 - Custom = 110 - Revoke = 111 - HasReadReceipt = 112 - Typing = 113 - Quote = 114 - GroupHasReadReceipt = 116 - AdvancedText = 117 - AdvancedRevoke = 118 //影响前者消息 + Text = 101 + Picture = 102 + Voice = 103 + Video = 104 + File = 105 + AtText = 106 + Merger = 107 + Card = 108 + Location = 109 + Custom = 110 + Revoke = 111 + Typing = 113 + Quote = 114 + + AdvancedText = 117 + CustomNotTriggerConversation = 119 CustomOnlineOnly = 120 ReactionMessageModifier = 121 @@ -93,6 +92,8 @@ const ( ClearConversationNotification = 2101 DeleteMsgsNotification = 2102 + HasReadReceipt = 2200 + NotificationEnd = 5000 //status diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 18fba6ee6..b970a2758 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -299,8 +299,18 @@ func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID strin return db.BatchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq) } -func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, seqs []int64) error { - return db.msgDocDatabase.MarkSingleChatMsgsAsRead(ctx, userID, conversationID, seqs) +func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, totalSeqs []int64) error { + for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, totalSeqs) { + var indexes []int64 + for _, seq := range seqs { + indexes = append(indexes, db.msg.GetMsgIndex(seq)) + } + if err := db.msgDocDatabase.MarkSingleChatMsgsAsRead(ctx, userID, docID, indexes); err != nil { + log.ZError(ctx, "MarkSingleChatMsgsAsRead", err, "userID", userID, "docID", docID, "indexes", indexes) + return err + } + } + return nil } func (db *commonMsgDatabase) DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error { diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/unrelation/msg.go index 3d2af60be..329610ebf 100644 --- a/pkg/common/db/table/unrelation/msg.go +++ b/pkg/common/db/table/unrelation/msg.go @@ -51,6 +51,7 @@ type MsgDataModel struct { SendTime int64 `bson:"send_time"` CreateTime int64 `bson:"create_time"` Status int32 `bson:"status"` + IsRead bool `bson:"is_read"` Options map[string]bool `bson:"options"` OfflinePush *OfflinePushModel `bson:"offline_push"` AtUserIDList []string `bson:"at_user_id_list"` @@ -62,6 +63,7 @@ type MsgInfoModel struct { Msg *MsgDataModel `bson:"msg"` Revoke *RevokeModel `bson:"revoke"` DelList []string `bson:"del_list"` + IsRead bool `bson:"is_read"` } type MsgDocModelInterface interface { @@ -78,7 +80,7 @@ type MsgDocModelInterface interface { DeleteDocs(ctx context.Context, docIDs []string) error GetMsgDocModelByIndex(ctx context.Context, conversationID string, index, sort int64) (*MsgDocModel, error) DeleteMsgsInOneDocByIndex(ctx context.Context, docID string, indexes []int) error - MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, seqs []int64) error + MarkSingleChatMsgsAsRead(ctx context.Context, userID string, docID string, indexes []int64) error } func (MsgDocModel) TableName() string { diff --git a/pkg/common/db/unrelation/msg.go b/pkg/common/db/unrelation/msg.go index 629f13b81..194aa6e69 100644 --- a/pkg/common/db/unrelation/msg.go +++ b/pkg/common/db/unrelation/msg.go @@ -280,11 +280,17 @@ func (m *MsgMongoDriver) IsExistDocID(ctx context.Context, docID string) (bool, return count > 0, nil } -func (m *MsgMongoDriver) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, seqs []int64) error { - indexs := make([]int64, 0, len(seqs)) - for _, seq := range seqs { - indexs = append(indexs, m.model.GetMsgIndex(seq)) +func (m *MsgMongoDriver) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, docID string, indexes []int64) error { + updates := bson.M{ + "$set": bson.M{}, + } + for _, index := range indexes { + updates["$set"].(bson.M)[fmt.Sprintf("msgs.%d.is_read", index)] = true + } + filter := bson.M{"doc_id": docID} + _, err := m.MsgCollection.UpdateMany(ctx, filter, updates) + if err != nil { + return utils.Wrap(err, "") } - return nil }