diff --git a/go.mod b/go.mod index 1b80650c5..2db34a8b1 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( firebase.google.com/go v3.13.0+incompatible - github.com/OpenIMSDK/protocol v0.0.40 + github.com/OpenIMSDK/protocol v0.0.41 github.com/OpenIMSDK/tools v0.0.21 github.com/bwmarrin/snowflake v0.3.0 // indirect github.com/dtm-labs/rockscache v0.1.1 @@ -156,5 +156,3 @@ require ( golang.org/x/crypto v0.14.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) - -replace github.com/OpenIMSDK/protocol v0.0.40 => github.com/luhaoling/protocol v0.0.0-20231227040641-2f934a0d64a3 diff --git a/go.sum b/go.sum index 881000381..fc1d15242 100644 --- a/go.sum +++ b/go.sum @@ -18,8 +18,8 @@ firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIw github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c= github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= -github.com/OpenIMSDK/protocol v0.0.40 h1:1/Oij6RSAaePCPrWGwp9Cyz976/8Uxr94hM5M5FXzlg= -github.com/OpenIMSDK/protocol v0.0.40/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= +github.com/OpenIMSDK/protocol v0.0.42 h1:vIWXqZJZZ1ddleJA25fxhjZ1GyEHATpYM3wVWh4/+PY= +github.com/OpenIMSDK/protocol v0.0.42/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= github.com/OpenIMSDK/tools v0.0.21 h1:iTapc2mIEVH/xl5Nd6jfwPub11Pgp44tVcE1rjB3a48= github.com/OpenIMSDK/tools v0.0.21/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= @@ -227,8 +227,6 @@ github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205Ah github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw= github.com/lithammer/shortuuid v3.0.0+incompatible h1:NcD0xWW/MZYXEHa6ITy6kaXN5nwm/V115vj2YXfhS0w= github.com/lithammer/shortuuid v3.0.0+incompatible/go.mod h1:FR74pbAuElzOUuenUHTK2Tciko1/vKuIKS9dSkDrA4w= -github.com/luhaoling/protocol v0.0.0-20231227040641-2f934a0d64a3 h1:HZz2U/M3T4x9SqPxWdrD9MZy7jxx7nS+nx/aRN9m3RQ= -github.com/luhaoling/protocol v0.0.0-20231227040641-2f934a0d64a3/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= diff --git a/internal/api/conversation.go b/internal/api/conversation.go index e422de677..6463cbde6 100644 --- a/internal/api/conversation.go +++ b/internal/api/conversation.go @@ -33,6 +33,10 @@ func (o *ConversationApi) GetAllConversations(c *gin.Context) { a2r.Call(conversation.ConversationClient.GetAllConversations, o.Client, c) } +func (o *ConversationApi) GetConversationsList(c *gin.Context) { + a2r.Call(conversation.ConversationClient.GetConversationList, o.Client, c) +} + func (o *ConversationApi) GetConversation(c *gin.Context) { a2r.Call(conversation.ConversationClient.GetConversation, o.Client, c) } diff --git a/internal/api/msg.go b/internal/api/msg.go index 8346ad860..548450534 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -250,13 +250,14 @@ func (m *MessageApi) SendBusinessNotification(c *gin.Context) { req := struct { Key string `json:"key"` Data string `json:"data"` - SendUserID string `json:"sendUserID"` - RecvUserID string `json:"recvUserID"` + SendUserID string `json:"sendUserID" binding:"required"` + RecvUserID string `json:"recvUserID" binding:"required"` }{} if err := c.BindJSON(&req); err != nil { apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) return } + if !authverify.IsAppManagerUid(c) { apiresp.GinError(c, errs.ErrNoPermission.Wrap("only app manager can send message")) return diff --git a/internal/api/route.go b/internal/api/route.go index 766912d45..1c91f4dde 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -204,6 +204,7 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive conversationGroup := r.Group("/conversation", ParseToken) { c := NewConversationApi(*conversationRpc) + conversationGroup.POST("/get_conversations_list", c.GetConversationsList) conversationGroup.POST("/get_all_conversations", c.GetAllConversations) conversationGroup.POST("/get_conversation", c.GetConversation) conversationGroup.POST("/get_conversations", c.GetConversations) diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 88c9ff7ff..b80e32953 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -17,6 +17,8 @@ package conversation import ( "context" "errors" + "github.com/OpenIMSDK/protocol/sdkws" + "sort" "github.com/OpenIMSDK/tools/tx" @@ -41,6 +43,8 @@ import ( ) type conversationServer struct { + msgRpcClient *rpcclient.MessageRpcClient + user *rpcclient.UserRpcClient groupRpcClient *rpcclient.GroupRpcClient conversationDatabase controller.ConversationDatabase conversationNotificationSender *notification.ConversationNotificationSender @@ -61,7 +65,10 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e } groupRpcClient := rpcclient.NewGroupRpcClient(client) msgRpcClient := rpcclient.NewMessageRpcClient(client) + userRpcClient := rpcclient.NewUserRpcClient(client) pbconversation.RegisterConversationServer(server, &conversationServer{ + msgRpcClient: &msgRpcClient, + user: &userRpcClient, conversationNotificationSender: notification.NewConversationNotificationSender(&msgRpcClient), groupRpcClient: &groupRpcClient, conversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), tx.NewMongo(mongo.GetClient())), @@ -82,6 +89,73 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbconvers return resp, nil } +func (m *conversationServer) GetConversationList(ctx context.Context, req *pbconversation.GetConversationListReq) (resp *pbconversation.GetConversationListResp, err error) { + log.ZDebug(ctx, "GetConversationList", "seqs", req, "userID", req.UserID) + var conversationIDs []string + if len(req.ConversationIDs) == 0 { + conversationIDs, err = m.conversationDatabase.GetConversationIDs(ctx, req.UserID) + if err != nil { + return nil, err + } + } else { + conversationIDs = req.ConversationIDs + } + + conversations, err := m.conversationDatabase.FindConversations(ctx, req.UserID, conversationIDs) + if err != nil { + return nil, err + } + if len(conversations) == 0 { + return nil, errs.ErrRecordNotFound.Wrap() + } + + maxSeqs, err := m.msgRpcClient.GetMaxSeqs(ctx, conversationIDs) + if err != nil { + return nil, err + } + + chatLogs, err := m.msgRpcClient.GetMsgByConversationIDs(ctx, conversationIDs, maxSeqs) + if err != nil { + return nil, err + } + + conversationMsg, err := m.getConversationInfo(ctx, chatLogs, req.UserID) + if err != nil { + return nil, err + } + + hasReadSeqs, err := m.msgRpcClient.GetHasReadSeqs(ctx, req.UserID, conversationIDs) + if err != nil { + return nil, err + } + + conversation_unreadCount := make(map[string]int64) + for conversationID, maxSeq := range maxSeqs { + conversation_unreadCount[conversationID] = maxSeq - hasReadSeqs[conversationID] + } + + conversation_isPinkTime := make(map[int64]string) + conversation_notPinkTime := make(map[int64]string) + for _, v := range conversations { + conversationID := v.ConversationID + time := conversationMsg[conversationID].MsgInfo.LatestMsgRecvTime + conversationMsg[conversationID].RecvMsgOpt = v.RecvMsgOpt + if v.IsPinned { + conversationMsg[conversationID].IsPinned = v.IsPinned + conversation_isPinkTime[time] = conversationID + continue + } + conversation_notPinkTime[time] = conversationID + } + resp = &pbconversation.GetConversationListResp{ + ConversationElems: []*pbconversation.ConversationElem{}, + } + + m.conversationSort(conversation_isPinkTime, resp, conversation_unreadCount, conversationMsg) + m.conversationSort(conversation_notPinkTime, resp, conversation_unreadCount, conversationMsg) + return resp, nil +} + func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbconversation.GetAllConversationsReq) (*pbconversation.GetAllConversationsResp, error) { conversations, err := c.conversationDatabase.GetUserAllConversation(ctx, req.OwnerUserID) if err != nil { @@ -348,3 +422,102 @@ func (c *conversationServer) GetConversationOfflinePushUserIDs( } return &pbconversation.GetConversationOfflinePushUserIDsResp{UserIDs: utils.Keys(userIDSet)}, nil } + +func (c *conversationServer) conversationSort( + conversations map[int64]string, + resp *pbconversation.GetConversationListResp, + conversation_unreadCount map[string]int64, + conversationMsg map[string]*pbconversation.ConversationElem, +) { + keys := []int64{} + for key := range conversations { + keys = append(keys, key) + } + + sort.Slice(keys[:], func(i, j int) bool { + return keys[i] > keys[j] + }) + index := 0 + + cons := make([]*pbconversation.ConversationElem, len(conversations)) + for _, v := range keys { + conversationID := conversations[v] + conversationElem := conversationMsg[conversationID] + conversationElem.UnreadCount = conversation_unreadCount[conversationID] + cons[index] = conversationElem + index++ + } + resp.ConversationElems = append(resp.ConversationElems, cons...) +} + +func (c *conversationServer) getConversationInfo( + ctx context.Context, + chatLogs map[string]*sdkws.MsgData, + userID string) (map[string]*pbconversation.ConversationElem, error) { + var ( + sendIDs []string + groupIDs []string + sendMap = make(map[string]*sdkws.UserInfo) + groupMap = make(map[string]*sdkws.GroupInfo) + conversationMsg = make(map[string]*pbconversation.ConversationElem) + ) + for _, chatLog := range chatLogs { + switch chatLog.SessionType { + case constant.SingleChatType: + if chatLog.SendID == userID { + sendIDs = append(sendIDs, chatLog.RecvID) + } + sendIDs = append(sendIDs, chatLog.SendID) + case constant.GroupChatType, constant.SuperGroupChatType: + groupIDs = append(groupIDs, chatLog.GroupID) + sendIDs = append(sendIDs, chatLog.SendID) + } + } + if len(sendIDs) != 0 { + sendInfos, err := c.user.GetUsersInfo(ctx, sendIDs) + if err != nil { + return nil, err + } + for _, sendInfo := range sendInfos { + sendMap[sendInfo.UserID] = sendInfo + } + } + if len(groupIDs) != 0 { + groupInfos, err := c.groupRpcClient.GetGroupInfos(ctx, groupIDs, false) + if err != nil { + return nil, err + } + for _, groupInfo := range groupInfos { + groupMap[groupInfo.GroupID] = groupInfo + } + } + for conversationID, chatLog := range chatLogs { + pbchatLog := &pbconversation.ConversationElem{} + msgInfo := &pbconversation.MsgInfo{} + if err := utils.CopyStructFields(msgInfo, chatLog); err != nil { + return nil, err + } + switch chatLog.SessionType { + case constant.SingleChatType: + if chatLog.SendID == userID { + msgInfo.FaceURL = sendMap[chatLog.RecvID].FaceURL + msgInfo.SenderName = sendMap[chatLog.RecvID].Nickname + break + } + msgInfo.FaceURL = sendMap[chatLog.SendID].FaceURL + msgInfo.SenderName = sendMap[chatLog.SendID].Nickname + case constant.GroupChatType, constant.SuperGroupChatType: + msgInfo.GroupName = groupMap[chatLog.GroupID].GroupName + msgInfo.GroupFaceURL = groupMap[chatLog.GroupID].FaceURL + msgInfo.GroupMemberCount = groupMap[chatLog.GroupID].MemberCount + msgInfo.GroupID = chatLog.GroupID + msgInfo.GroupType = groupMap[chatLog.GroupID].GroupType + msgInfo.SenderName = sendMap[chatLog.SendID].Nickname + } + pbchatLog.ConversationID = conversationID + msgInfo.LatestMsgRecvTime = chatLog.SendTime + pbchatLog.MsgInfo = msgInfo + conversationMsg[conversationID] = pbchatLog + } + return conversationMsg, nil +} diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index 5324ccba8..71e038b39 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -18,6 +18,7 @@ import ( "context" utils2 "github.com/OpenIMSDK/tools/utils" + cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/redis/go-redis/v9" @@ -26,8 +27,6 @@ import ( "github.com/OpenIMSDK/protocol/sdkws" "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/log" - - cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" ) func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *msg.GetConversationsHasReadAndMaxSeqReq) (resp *msg.GetConversationsHasReadAndMaxSeqResp, err error) { diff --git a/internal/rpc/msg/seq.go b/internal/rpc/msg/seq.go index 4f6a01e8d..c12f258b7 100644 --- a/internal/rpc/msg/seq.go +++ b/internal/rpc/msg/seq.go @@ -16,7 +16,6 @@ package msg import ( "context" - pbmsg "github.com/OpenIMSDK/protocol/msg" ) @@ -30,3 +29,27 @@ func (m *msgServer) GetConversationMaxSeq( } return &pbmsg.GetConversationMaxSeqResp{MaxSeq: maxSeq}, nil } + +func (m *msgServer) GetMaxSeqs(ctx context.Context, req *pbmsg.GetMaxSeqsReq) (*pbmsg.SeqsInfoResp, error) { + maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, req.ConversationIDs) + if err != nil { + return nil, err + } + return &pbmsg.SeqsInfoResp{MaxSeqs: maxSeqs}, nil +} + +func (m *msgServer) GetHasReadSeqs(ctx context.Context, req *pbmsg.GetHasReadSeqsReq) (*pbmsg.SeqsInfoResp, error) { + hasReadSeqs, err := m.MsgDatabase.GetHasReadSeqs(ctx, req.UserID, req.ConversationIDs) + if err != nil { + return nil, err + } + return &pbmsg.SeqsInfoResp{MaxSeqs: hasReadSeqs}, nil +} + +func (m *msgServer) GetMsgByConversationIDs(ctx context.Context, req *pbmsg.GetMsgByConversationIDsReq) (*pbmsg.GetMsgByConversationIDsResp, error) { + Msgs, err := m.MsgDatabase.FindOneByDocIDs(ctx, req.ConversationIDs, req.MaxSeqs) + if err != nil { + return nil, err + } + return &pbmsg.GetMsgByConversationIDsResp{MsgDatas: Msgs}, nil +} diff --git a/pkg/apistruct/manage.go b/pkg/apistruct/manage.go index a48f5253c..f9f542835 100644 --- a/pkg/apistruct/manage.go +++ b/pkg/apistruct/manage.go @@ -64,6 +64,30 @@ type SendMsgReq struct { SendMsg } +type GetConversationListReq struct { + // userID uniquely identifies the user. + UserID string `protobuf:"bytes,1,opt,name=userID,proto3" json:"userID,omitempty" binding:"required"` + + // ConversationIDs contains a list of unique identifiers for conversations. + ConversationIDs []string `protobuf:"bytes,2,rep,name=conversationIDs,proto3" json:"conversationIDs,omitempty"` +} + +type GetConversationListResp struct { + // ConversationElems is a map that associates conversation IDs with their respective details. + ConversationElems map[string]*ConversationElem `protobuf:"bytes,1,rep,name=conversationElems,proto3" json:"conversationElems,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +type ConversationElem struct { + // MaxSeq represents the maximum sequence number within the conversation. + MaxSeq int64 `protobuf:"varint,1,opt,name=maxSeq,proto3" json:"maxSeq,omitempty"` + + // UnreadSeq represents the number of unread messages in the conversation. + UnreadSeq int64 `protobuf:"varint,2,opt,name=unreadSeq,proto3" json:"unreadSeq,omitempty"` + + // LastSeqTime represents the timestamp of the last sequence in the conversation. + LastSeqTime int64 `protobuf:"varint,3,opt,name=LastSeqTime,proto3" json:"LastSeqTime,omitempty"` +} + // BatchSendMsgReq defines the structure for sending a message to multiple recipients. type BatchSendMsgReq struct { SendMsg diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index fb0a9c702..dfff5c61d 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -98,6 +98,7 @@ type CommonMsgDatabase interface { SetSendMsgStatus(ctx context.Context, id string, status int32) error GetSendMsgStatus(ctx context.Context, id string) (int32, error) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error) + FindOneByDocIDs(ctx context.Context, docIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) // to mq MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error @@ -1051,6 +1052,21 @@ func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.Searc return total, totalMsgs, nil } +func (db *commonMsgDatabase) FindOneByDocIDs(ctx context.Context, conversationIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) { + totalMsgs := make(map[string]*sdkws.MsgData) + for _, conversationID := range conversationIDs { + seq := seqs[conversationID] + docID := db.msg.GetDocID(conversationID, seq) + msgs, err := db.msgDocDatabase.FindOneByDocID(ctx, docID) + if err != nil { + return nil, err + } + index := db.msg.GetMsgIndex(seq) + totalMsgs[conversationID] = convert.MsgDB2Pb(msgs.Msg[index].Msg) + } + return totalMsgs, nil +} + func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) { db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs) } diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go index 03769dc74..abad0075a 100644 --- a/pkg/rpcclient/msg.go +++ b/pkg/rpcclient/msg.go @@ -17,7 +17,6 @@ package rpcclient import ( "context" "encoding/json" - "google.golang.org/grpc" "google.golang.org/protobuf/proto" @@ -157,6 +156,30 @@ func (m *MessageRpcClient) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqRe return resp, err } +func (m *MessageRpcClient) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { + log.ZDebug(ctx, "GetMaxSeqs", "conversationIDs", conversationIDs) + resp, err := m.Client.GetMaxSeqs(ctx, &msg.GetMaxSeqsReq{ + ConversationIDs: conversationIDs, + }) + return resp.MaxSeqs, err +} + +func (m *MessageRpcClient) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) { + resp, err := m.Client.GetHasReadSeqs(ctx, &msg.GetHasReadSeqsReq{ + UserID: userID, + ConversationIDs: conversationIDs, + }) + return resp.MaxSeqs, err +} + +func (m *MessageRpcClient) GetMsgByConversationIDs(ctx context.Context, docIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) { + resp, err := m.Client.GetMsgByConversationIDs(ctx, &msg.GetMsgByConversationIDsReq{ + ConversationIDs: docIDs, + MaxSeqs: seqs, + }) + return resp.MsgDatas, err +} + func (m *MessageRpcClient) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) { resp, err := m.Client.PullMessageBySeqs(ctx, req) return resp, err