From 7d8886a364972626b1b5f2b847f61a9ad4907d0c Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Fri, 19 Apr 2024 16:25:41 +0800 Subject: [PATCH 1/2] fix: update message. --- config/openim-rpc-msg.yml | 4 +-- internal/rpc/msg/message_interceptor.go | 42 ------------------------- internal/rpc/msg/send.go | 5 --- internal/rpc/msg/server.go | 3 +- internal/rpc/msg/utils.go | 20 ------------ pkg/common/config/config.go | 6 ++-- 6 files changed, 5 insertions(+), 75 deletions(-) delete mode 100644 internal/rpc/msg/message_interceptor.go diff --git a/config/openim-rpc-msg.yml b/config/openim-rpc-msg.yml index 86d1dd7e2..de5e2324d 100644 --- a/config/openim-rpc-msg.yml +++ b/config/openim-rpc-msg.yml @@ -10,7 +10,5 @@ prometheus: #发消息是否需要好友验证 friendVerify: false -# -groupMessageHasReadReceiptEnable: true -singleMessageHasReadReceiptEnable: true + diff --git a/internal/rpc/msg/message_interceptor.go b/internal/rpc/msg/message_interceptor.go deleted file mode 100644 index 0b0566029..000000000 --- a/internal/rpc/msg/message_interceptor.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package msg - -import ( - "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" - - "github.com/openimsdk/protocol/constant" - "github.com/openimsdk/protocol/msg" - "github.com/openimsdk/protocol/sdkws" -) - -type MessageInterceptorFunc func(ctx context.Context, globalConfig *Config, req *msg.SendMsgReq) (*sdkws.MsgData, error) - -func MessageHasReadEnabled(ctx context.Context, config *Config, req *msg.SendMsgReq) (*sdkws.MsgData, error) { - switch { - case req.MsgData.ContentType == constant.HasReadReceipt && req.MsgData.SessionType == constant.SingleChatType: - if !config.RpcConfig.SingleMessageHasReadReceiptEnable { - return nil, servererrs.ErrMessageHasReadDisable.Wrap() - } - return req.MsgData, nil - case req.MsgData.ContentType == constant.HasReadReceipt && req.MsgData.SessionType == constant.SuperGroupChatType: - if !config.RpcConfig.GroupMessageHasReadReceiptEnable { - return nil, servererrs.ErrMessageHasReadDisable.Wrap() - } - return req.MsgData, nil - } - return req.MsgData, nil -} diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index 03030f593..5514788d8 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -18,7 +18,6 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" - "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil" "github.com/openimsdk/protocol/constant" @@ -35,10 +34,6 @@ import ( func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) { if req.MsgData != nil { - flag := isMessageHasReadEnabled(req.MsgData, m.config) - if !flag { - return nil, servererrs.ErrMessageHasReadDisable.Wrap() - } m.encapsulateMsgData(req.MsgData) switch req.MsgData.SessionType { case constant.SingleChatType: diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 794fbec72..2a0325391 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -18,6 +18,7 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" @@ -33,6 +34,7 @@ import ( "google.golang.org/grpc" ) +type MessageInterceptorFunc func(ctx context.Context, globalConfig *Config, req *msg.SendMsgReq) (*sdkws.MsgData, error) type ( // MessageInterceptorChain defines a chain of message interceptor functions. MessageInterceptorChain []MessageInterceptorFunc @@ -107,7 +109,6 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg } s.notificationSender = rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithLocalSendMsg(s.SendMsg)) - s.addInterceptorHandler(MessageHasReadEnabled) msg.RegisterMsgServer(server, s) return nil } diff --git a/internal/rpc/msg/utils.go b/internal/rpc/msg/utils.go index 15ebd6274..69b4d0bf6 100644 --- a/internal/rpc/msg/utils.go +++ b/internal/rpc/msg/utils.go @@ -15,31 +15,11 @@ package msg import ( - "github.com/openimsdk/protocol/constant" - "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" "github.com/redis/go-redis/v9" "go.mongodb.org/mongo-driver/mongo" ) -func isMessageHasReadEnabled(msgData *sdkws.MsgData, config *Config) bool { - switch { - case msgData.ContentType == constant.HasReadReceipt && msgData.SessionType == constant.SingleChatType: - if config.RpcConfig.SingleMessageHasReadReceiptEnable { - return true - } else { - return false - } - case msgData.ContentType == constant.HasReadReceipt && msgData.SessionType == constant.SuperGroupChatType: - if config.RpcConfig.GroupMessageHasReadReceiptEnable { - return true - } else { - return false - } - } - return true -} - func IsNotFound(err error) bool { switch errs.Unwrap(err) { case redis.Nil, mongo.ErrNoDocuments: diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 002f6113e..6ad3d9994 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -267,10 +267,8 @@ type Msg struct { ListenIP string `mapstructure:"listenIP"` Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` - Prometheus Prometheus `mapstructure:"prometheus"` - FriendVerify bool `mapstructure:"friendVerify"` - GroupMessageHasReadReceiptEnable bool `mapstructure:"groupMessageHasReadReceiptEnable"` - SingleMessageHasReadReceiptEnable bool `mapstructure:"singleMessageHasReadReceiptEnable"` + Prometheus Prometheus `mapstructure:"prometheus"` + FriendVerify bool `mapstructure:"friendVerify"` } type Third struct { From 7969f56d44d121d10ecf7d40d6168be33bca50f0 Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Fri, 19 Apr 2024 16:32:42 +0800 Subject: [PATCH 2/2] fix: msg cache timeout. --- config/openim-msgtransfer.yml | 2 -- internal/msgtransfer/init.go | 2 +- internal/rpc/msg/server.go | 2 +- pkg/common/config/config.go | 3 +-- pkg/common/db/cache/msg.go | 4 +++- 5 files changed, 6 insertions(+), 7 deletions(-) diff --git a/config/openim-msgtransfer.yml b/config/openim-msgtransfer.yml index a92890089..9cb1598b5 100644 --- a/config/openim-msgtransfer.yml +++ b/config/openim-msgtransfer.yml @@ -1,5 +1,3 @@ prometheus: enable: true ports: [ 20108, 20109, 20110, 20111 ] - -msgCacheTimeout: 86400 \ No newline at end of file diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index a1f10b3b3..685e847b8 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -88,7 +88,7 @@ func Start(ctx context.Context, index int, config *Config) error { client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) //todo MsgCacheTimeout - msgModel := cache.NewMsgCache(rdb, 86400, config.RedisConfig.EnablePipeline) + msgModel := cache.NewMsgCache(rdb, config.RedisConfig.EnablePipeline) seqModel := cache.NewSeqCache(rdb) msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB()) if err != nil { diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 2a0325391..81720d674 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -86,7 +86,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg return err } //todo MsgCacheTimeout - msgModel := cache.NewMsgCache(rdb, 86400, config.RedisConfig.EnablePipeline) + msgModel := cache.NewMsgCache(rdb, config.RedisConfig.EnablePipeline) seqModel := cache.NewSeqCache(rdb) conversationClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 6ad3d9994..19edccbab 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -185,8 +185,7 @@ type MsgGateway struct { } type MsgTransfer struct { - Prometheus Prometheus `mapstructure:"prometheus"` - MsgCacheTimeout int `mapstructure:"msgCacheTimeout"` + Prometheus Prometheus `mapstructure:"prometheus"` } type Push struct { diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 56ea2dd8b..94307343f 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -31,6 +31,8 @@ import ( "golang.org/x/sync/errgroup" ) +const msgCacheTimeout = 86400 + const ( maxSeq = "MAX_SEQ:" minSeq = "MIN_SEQ:" @@ -82,7 +84,7 @@ type MsgCache interface { // return &msgCache{rdb: client, msgCacheTimeout: msgCacheTimeout, redisConf: redisConf} //} -func NewMsgCache(client redis.UniversalClient, msgCacheTimeout time.Duration, redisEnablePipeline bool) MsgCache { +func NewMsgCache(client redis.UniversalClient, redisEnablePipeline bool) MsgCache { return &msgCache{rdb: client, msgCacheTimeout: msgCacheTimeout, redisEnablePipeline: redisEnablePipeline} }