This commit is contained in:
Gordon 2023-03-21 11:27:20 +08:00
parent c92e48dfa3
commit 1e8eb76013
3 changed files with 88 additions and 84 deletions

View File

@ -270,9 +270,10 @@ func (m *msgServer) modifyMessageByUserMessageReceiveOpt(ctx context.Context, us
} }
conversationID := utils.GetConversationIDBySessionType(sourceID, sessionType) conversationID := utils.GetConversationIDBySessionType(sourceID, sessionType)
singleOpt, err := m.Conversation.GetSingleConversationRecvMsgOpt(ctx, userID, conversationID) singleOpt, err := m.Conversation.GetSingleConversationRecvMsgOpt(ctx, userID, conversationID)
if err != nil { //if err != nil {
return false, err // return false, err
} //}
return true, nil
switch singleOpt { switch singleOpt {
case constant.ReceiveMessage: case constant.ReceiveMessage:
return true, nil return true, nil

View File

@ -4,13 +4,11 @@ import (
"context" "context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
promePkg "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome" promePkg "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
pbConversation "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation" pbConversation "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/go-redis/redis/v8"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"sync" "sync"
) )
@ -233,82 +231,3 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *msg.SendMsgReq) (
resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID
return resp, nil return resp, nil
} }
func (m *msgServer) SendMsg(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, error error) {
resp = &msg.SendMsgResp{}
flag := isMessageHasReadEnabled(req.MsgData)
if !flag {
return nil, errs.ErrMessageHasReadDisable.Wrap()
}
m.encapsulateMsgData(req.MsgData)
if err := CallbackMsgModify(ctx, req); err != nil && err != errs.ErrCallbackContinue {
return nil, err
}
switch req.MsgData.SessionType {
case constant.SingleChatType:
return m.sendMsgSingleChat(ctx, req)
case constant.GroupChatType:
return m.sendMsgGroupChat(ctx, req)
case constant.NotificationChatType:
return m.sendMsgNotification(ctx, req)
case constant.SuperGroupChatType:
return m.sendMsgSuperGroupChat(ctx, req)
default:
return nil, errs.ErrArgs.Wrap("unknown sessionType")
}
}
func (m *msgServer) GetMaxAndMinSeq(ctx context.Context, req *sdkws.GetMaxAndMinSeqReq) (*sdkws.GetMaxAndMinSeqResp, error) {
if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil {
return nil, err
}
resp := new(sdkws.GetMaxAndMinSeqResp)
m2 := make(map[string]*sdkws.MaxAndMinSeq)
maxSeq, err := m.MsgDatabase.GetUserMaxSeq(ctx, req.UserID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return nil, err
}
minSeq, err := m.MsgDatabase.GetUserMinSeq(ctx, req.UserID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return nil, err
}
resp.MaxSeq = maxSeq
resp.MinSeq = minSeq
if len(req.GroupIDs) > 0 {
for _, groupID := range req.GroupIDs {
maxSeq, err := m.MsgDatabase.GetGroupMaxSeq(ctx, groupID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return nil, err
}
minSeq, err := m.MsgDatabase.GetGroupMinSeq(ctx, groupID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return nil, err
}
m2[groupID] = &sdkws.MaxAndMinSeq{
MaxSeq: maxSeq,
MinSeq: minSeq,
}
}
}
resp.GroupMaxAndMinSeq = m2
return resp, nil
}
func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) {
resp := &sdkws.PullMessageBySeqsResp{GroupMsgDataList: make(map[string]*sdkws.MsgDataList)}
msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, req.UserID, req.Seqs)
if err != nil {
return nil, err
}
resp.List = msgs
for groupID, list := range req.GroupSeqs {
msgs, err := m.MsgDatabase.GetSuperGroupMsgBySeqs(ctx, groupID, list.Seqs)
if err != nil {
return nil, err
}
resp.GroupMsgDataList[groupID] = &sdkws.MsgDataList{
MsgDataList: msgs,
}
}
return resp, nil
}

View File

@ -1,14 +1,20 @@
package msg package msg
import ( import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/localcache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/localcache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/check" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/check"
"github.com/go-redis/redis/v8"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -74,3 +80,81 @@ func (m *msgServer) initPrometheus() {
prome.NewWorkSuperGroupChatMsgProcessSuccessCounter() prome.NewWorkSuperGroupChatMsgProcessSuccessCounter()
prome.NewWorkSuperGroupChatMsgProcessFailedCounter() prome.NewWorkSuperGroupChatMsgProcessFailedCounter()
} }
func (m *msgServer) SendMsg(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, error error) {
resp = &msg.SendMsgResp{}
flag := isMessageHasReadEnabled(req.MsgData)
if !flag {
return nil, errs.ErrMessageHasReadDisable.Wrap()
}
m.encapsulateMsgData(req.MsgData)
if err := CallbackMsgModify(ctx, req); err != nil && err != errs.ErrCallbackContinue {
return nil, err
}
switch req.MsgData.SessionType {
case constant.SingleChatType:
return m.sendMsgSingleChat(ctx, req)
case constant.GroupChatType:
return m.sendMsgGroupChat(ctx, req)
case constant.NotificationChatType:
return m.sendMsgNotification(ctx, req)
case constant.SuperGroupChatType:
return m.sendMsgSuperGroupChat(ctx, req)
default:
return nil, errs.ErrArgs.Wrap("unknown sessionType")
}
}
func (m *msgServer) GetMaxAndMinSeq(ctx context.Context, req *sdkws.GetMaxAndMinSeqReq) (*sdkws.GetMaxAndMinSeqResp, error) {
if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil {
return nil, err
}
resp := new(sdkws.GetMaxAndMinSeqResp)
m2 := make(map[string]*sdkws.MaxAndMinSeq)
maxSeq, err := m.MsgDatabase.GetUserMaxSeq(ctx, req.UserID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return nil, err
}
minSeq, err := m.MsgDatabase.GetUserMinSeq(ctx, req.UserID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return nil, err
}
resp.MaxSeq = maxSeq
resp.MinSeq = minSeq
if len(req.GroupIDs) > 0 {
for _, groupID := range req.GroupIDs {
maxSeq, err := m.MsgDatabase.GetGroupMaxSeq(ctx, groupID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return nil, err
}
minSeq, err := m.MsgDatabase.GetGroupMinSeq(ctx, groupID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return nil, err
}
m2[groupID] = &sdkws.MaxAndMinSeq{
MaxSeq: maxSeq,
MinSeq: minSeq,
}
}
}
resp.GroupMaxAndMinSeq = m2
return resp, nil
}
func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) {
resp := &sdkws.PullMessageBySeqsResp{GroupMsgDataList: make(map[string]*sdkws.MsgDataList)}
msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, req.UserID, req.Seqs)
if err != nil {
return nil, err
}
resp.List = msgs
for groupID, list := range req.GroupSeqs {
msgs, err := m.MsgDatabase.GetSuperGroupMsgBySeqs(ctx, groupID, list.Seqs)
if err != nil {
return nil, err
}
resp.GroupMsgDataList[groupID] = &sdkws.MsgDataList{
MsgDataList: msgs,
}
}
return resp, nil
}