// Copyright © 2023 OpenIM. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package msg import ( "context" cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "sort" utils2 "github.com/OpenIMSDK/tools/utils" "github.com/redis/go-redis/v9" "github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/msg" "github.com/OpenIMSDK/protocol/sdkws" "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/log" ) func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *msg.GetConversationsHasReadAndMaxSeqReq) (resp *msg.GetConversationsHasReadAndMaxSeqResp, err error) { var conversationIDs []string if len(req.ConversationIDs) == 0 { conversationIDs, err = m.ConversationLocalCache.GetConversationIDs(ctx, req.UserID) if err != nil { return nil, err } } else { conversationIDs = req.ConversationIDs } hasReadSeqs, err := m.MsgDatabase.GetHasReadSeqs(ctx, req.UserID, conversationIDs) if err != nil { return nil, err } conversations, err := m.Conversation.GetConversations(ctx, req.UserID, conversationIDs) if err != nil { return nil, err } conversationMaxSeqMap := make(map[string]int64) for _, conversation := range conversations { if conversation.MaxSeq != 0 { conversationMaxSeqMap[conversation.ConversationID] = conversation.MaxSeq } } maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, conversationIDs) if err != nil { return nil, err } resp = &msg.GetConversationsHasReadAndMaxSeqResp{Seqs: make(map[string]*msg.Seqs)} for conversarionID, maxSeq := range maxSeqs { resp.Seqs[conversarionID] = &msg.Seqs{ HasReadSeq: hasReadSeqs[conversarionID], MaxSeq: maxSeq, } if v, ok := conversationMaxSeqMap[conversarionID]; ok { resp.Seqs[conversarionID].MaxSeq = v } } return resp, nil } func (m *msgServer) GetConversationList(ctx context.Context, req *msg.GetConversationListReq) (resp *msg.GetConversationListResp, err error) { var conversationIDs []string if len(req.ConversationIDs) == 0 { conversationIDs, err = m.ConversationLocalCache.GetConversationIDs(ctx, req.UserID) if err != nil { return nil, err } } else { conversationIDs = req.ConversationIDs } conversations, err := m.Conversation.GetConversations(ctx, req.UserID, conversationIDs) if err != nil { return nil, err } maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, conversationIDs) if err != nil { return nil, err } chatLogs, err := m.MsgDatabase.FindOneByDocIDs(ctx, conversationIDs, maxSeqs) if err != nil { return nil, err } conversationMsg, err := m.getConversationInfo(ctx, chatLogs, req.UserID) if err != nil { return nil, err } hasReadSeqs, err := m.MsgDatabase.GetHasReadSeqs(ctx, req.UserID, conversationIDs) if err != nil { return nil, err } conversation_unreadCount := make(map[string]int64) for conversationID, maxSeq := range maxSeqs { conversation_unreadCount[conversationID] = maxSeq - hasReadSeqs[conversationID] } conversation_isPinkTime := make(map[int64]string) conversation_notPinkTime := make(map[int64]string) for _, v := range conversations { conversationID := v.ConversationID time := conversationMsg[conversationID].MsgInfo.LatestMsgRecvTime conversationMsg[conversationID].RecvMsgOpt = v.RecvMsgOpt if v.IsPinned { conversationMsg[conversationID].IsPinned = v.IsPinned conversation_isPinkTime[time] = conversationID continue } conversation_notPinkTime[time] = conversationID } resp = &msg.GetConversationListResp{ ConversationElems: []*msg.ConversationElem{}, } m.conversationSort(conversation_isPinkTime, resp, conversation_unreadCount, conversationMsg) m.conversationSort(conversation_notPinkTime, resp, conversation_unreadCount, conversationMsg) return resp, nil } func (m *msgServer) SetConversationHasReadSeq( ctx context.Context, req *msg.SetConversationHasReadSeqReq, ) (resp *msg.SetConversationHasReadSeqResp, err error) { maxSeq, err := m.MsgDatabase.GetMaxSeq(ctx, req.ConversationID) if err != nil { return } if req.HasReadSeq > maxSeq { return nil, errs.ErrArgs.Wrap("hasReadSeq must not be bigger than maxSeq") } if err := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil { return nil, err } if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, req.UserID, nil, req.HasReadSeq); err != nil { return } return &msg.SetConversationHasReadSeqResp{}, nil } func (m *msgServer) MarkMsgsAsRead( ctx context.Context, req *msg.MarkMsgsAsReadReq, ) (resp *msg.MarkMsgsAsReadResp, err error) { if len(req.Seqs) < 1 { return nil, errs.ErrArgs.Wrap("seqs must not be empty") } maxSeq, err := m.MsgDatabase.GetMaxSeq(ctx, req.ConversationID) if err != nil { return } hasReadSeq := req.Seqs[len(req.Seqs)-1] if hasReadSeq > maxSeq { return nil, errs.ErrArgs.Wrap("hasReadSeq must not be bigger than maxSeq") } conversation, err := m.Conversation.GetConversation(ctx, req.UserID, req.ConversationID) if err != nil { return } if err = m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.UserID, req.ConversationID, req.Seqs); err != nil { return } currentHasReadSeq, err := m.MsgDatabase.GetHasReadSeq(ctx, req.UserID, req.ConversationID) if err != nil && errs.Unwrap(err) != redis.Nil { return } if hasReadSeq > currentHasReadSeq { err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, hasReadSeq) if err != nil { return } } if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID, m.conversationAndGetRecvID(conversation, req.UserID), req.Seqs, hasReadSeq); err != nil { return } return &msg.MarkMsgsAsReadResp{}, nil } func (m *msgServer) MarkConversationAsRead( ctx context.Context, req *msg.MarkConversationAsReadReq, ) (resp *msg.MarkConversationAsReadResp, err error) { conversation, err := m.Conversation.GetConversation(ctx, req.UserID, req.ConversationID) if err != nil { return nil, err } hasReadSeq, err := m.MsgDatabase.GetHasReadSeq(ctx, req.UserID, req.ConversationID) if err != nil && errs.Unwrap(err) != redis.Nil { return nil, err } var seqs []int64 log.ZDebug(ctx, "MarkConversationAsRead", "hasReadSeq", hasReadSeq, "req.HasReadSeq", req.HasReadSeq) if conversation.ConversationType == constant.SingleChatType { for i := hasReadSeq + 1; i <= req.HasReadSeq; i++ { seqs = append(seqs, i) } //avoid client missed call MarkConversationMessageAsRead by order for _, val := range req.Seqs { if !utils2.Contain(val, seqs...) { seqs = append(seqs, val) } } if len(seqs) > 0 { log.ZDebug(ctx, "MarkConversationAsRead", "seqs", seqs, "conversationID", req.ConversationID) if err = m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.UserID, req.ConversationID, seqs); err != nil { return nil, err } } if req.HasReadSeq > hasReadSeq { err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq) if err != nil { return nil, err } hasReadSeq = req.HasReadSeq } if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID, m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq); err != nil { return nil, err } } else if conversation.ConversationType == constant.SuperGroupChatType || conversation.ConversationType == constant.NotificationChatType { if req.HasReadSeq > hasReadSeq { err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq) if err != nil { return nil, err } hasReadSeq = req.HasReadSeq } if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, req.UserID, seqs, hasReadSeq); err != nil { return nil, err } } reqCall := &cbapi.CallbackGroupMsgReadReq{ SendID: conversation.OwnerUserID, ReceiveID: req.UserID, UnreadMsgNum: req.HasReadSeq, ContentType: int64(conversation.ConversationType), } if err := CallbackGroupMsgRead(ctx, reqCall); err != nil { return nil, err } return &msg.MarkConversationAsReadResp{}, nil } func (m *msgServer) sendMarkAsReadNotification( ctx context.Context, conversationID string, sessionType int32, sendID, recvID string, seqs []int64, hasReadSeq int64, ) error { tips := &sdkws.MarkAsReadTips{ MarkAsReadUserID: sendID, ConversationID: conversationID, Seqs: seqs, HasReadSeq: hasReadSeq, } err := m.notificationSender.NotificationWithSesstionType(ctx, sendID, recvID, constant.HasReadReceipt, sessionType, tips) if err != nil { log.ZWarn(ctx, "send has read Receipt err", err) } return nil } func (m *msgServer) conversationSort( conversations map[int64]string, resp *msg.GetConversationListResp, conversation_unreadCount map[string]int64, conversationMsg map[string]*msg.ConversationElem, ) { keys := []int64{} for key := range conversations { keys = append(keys, key) } sort.Slice(keys[:], func(i, j int) bool { return keys[i] > keys[j] }) index := 0 cons := make([]*msg.ConversationElem, len(conversations)) for _, v := range keys { conversationID := conversations[v] conversationElem := conversationMsg[conversationID] conversationElem.UnreadCount = conversation_unreadCount[conversationID] cons[index] = conversationElem index++ } resp.ConversationElems = append(resp.ConversationElems, cons...) } func (m *msgServer) getConversationInfo( ctx context.Context, chatLogs map[string]*sdkws.MsgData, userID string) (map[string]*msg.ConversationElem, error) { var ( sendIDs []string groupIDs []string sendMap = make(map[string]*sdkws.UserInfo) groupMap = make(map[string]*sdkws.GroupInfo) conversationMsg = make(map[string]*msg.ConversationElem) ) for _, chatLog := range chatLogs { switch chatLog.SessionType { case constant.SingleChatType: if chatLog.SendID == userID { sendIDs = append(sendIDs, chatLog.RecvID) } sendIDs = append(sendIDs, chatLog.SendID) case constant.GroupChatType, constant.SuperGroupChatType: groupIDs = append(groupIDs, chatLog.GroupID) sendIDs = append(sendIDs, chatLog.SendID) } } if len(sendIDs) != 0 { sendInfos, err := m.User.GetUsersInfo(ctx, sendIDs) if err != nil { return nil, err } for _, sendInfo := range sendInfos { sendMap[sendInfo.UserID] = sendInfo } } if len(groupIDs) != 0 { groupInfos, err := m.Group.GetGroupInfos(ctx, groupIDs, false) if err != nil { return nil, err } for _, groupInfo := range groupInfos { groupMap[groupInfo.GroupID] = groupInfo } } for conversationID, chatLog := range chatLogs { pbchatLog := &msg.ConversationElem{} msgInfo := &msg.MsgInfo{} if err := utils2.CopyStructFields(msgInfo, chatLog); err != nil { return nil, err } switch chatLog.SessionType { case constant.SingleChatType: if chatLog.SendID == userID { msgInfo.FaceURL = sendMap[chatLog.RecvID].FaceURL msgInfo.SenderName = sendMap[chatLog.RecvID].Nickname break } msgInfo.FaceURL = sendMap[chatLog.SendID].FaceURL msgInfo.SenderName = sendMap[chatLog.SendID].Nickname case constant.GroupChatType, constant.SuperGroupChatType: msgInfo.GroupName = groupMap[chatLog.GroupID].GroupName msgInfo.GroupFaceURL = groupMap[chatLog.GroupID].FaceURL msgInfo.GroupMemberCount = groupMap[chatLog.GroupID].MemberCount msgInfo.GroupID = chatLog.GroupID msgInfo.GroupType = groupMap[chatLog.GroupID].GroupType msgInfo.SenderName = sendMap[chatLog.SendID].Nickname } pbchatLog.ConversationID = conversationID msgInfo.LatestMsgRecvTime = chatLog.SendTime pbchatLog.MsgInfo = msgInfo conversationMsg[conversationID] = pbchatLog } return conversationMsg, nil }