From 8c10e4f5328f9af4647b56432b7d6feba810bc51 Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Sun, 7 Apr 2024 21:23:36 +0800 Subject: [PATCH] refactor: third update --- cmd/openim-rpc/openim-rpc-msg/main.go | 6 +-- cmd/openim-rpc/openim-rpc-third/main.go | 6 +-- internal/rpc/msg/server.go | 25 ++++----- internal/rpc/third/third.go | 32 +++++------ pkg/common/cmd/constant.go | 4 ++ pkg/common/cmd/friend.go | 2 +- pkg/common/cmd/group.go | 2 +- pkg/common/cmd/msg.go | 72 +++++++++++++++++++++++++ pkg/common/cmd/third.go | 70 ++++++++++++++++++++++++ pkg/common/config/config.go | 5 ++ 10 files changed, 184 insertions(+), 40 deletions(-) create mode 100644 pkg/common/cmd/msg.go create mode 100644 pkg/common/cmd/third.go diff --git a/cmd/openim-rpc/openim-rpc-msg/main.go b/cmd/openim-rpc/openim-rpc-msg/main.go index e0d3b2d63..37f6cf237 100644 --- a/cmd/openim-rpc/openim-rpc-msg/main.go +++ b/cmd/openim-rpc/openim-rpc-msg/main.go @@ -15,16 +15,12 @@ package main import ( - "github.com/openimsdk/open-im-server/v3/internal/rpc/msg" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/tools/system/program" ) func main() { - rpcCmd := cmd.NewRpcCmd(cmd.RpcMsgServer, msg.Start) - rpcCmd.AddPortFlag() - rpcCmd.AddPrometheusPortFlag() - if err := rpcCmd.Exec(); err != nil { + if err := cmd.NewMsgRpcCmd().Exec(); err != nil { program.ExitWithError(err) } } diff --git a/cmd/openim-rpc/openim-rpc-third/main.go b/cmd/openim-rpc/openim-rpc-third/main.go index 6e77a6081..fcead5f89 100644 --- a/cmd/openim-rpc/openim-rpc-third/main.go +++ b/cmd/openim-rpc/openim-rpc-third/main.go @@ -15,16 +15,12 @@ package main import ( - "github.com/openimsdk/open-im-server/v3/internal/rpc/third" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/tools/system/program" ) func main() { - rpcCmd := cmd.NewRpcCmd(cmd.RpcThirdServer, third.Start) - rpcCmd.AddPortFlag() - rpcCmd.AddPrometheusPortFlag() - if err := rpcCmd.Exec(); err != nil { + if err := cmd.NewThirdRpcCmd().Exec(); err != nil { program.ExitWithError(err) } } diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 0b6a24644..894216a6f 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -16,10 +16,10 @@ package msg import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" @@ -47,20 +47,21 @@ type ( ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data. Handlers MessageInterceptorChain // Chain of handlers for processing messages. notificationSender *rpcclient.NotificationSender // RPC client for sending notifications. - config *config.GlobalConfig // Global configuration settings. + config *cmd.MsgConfig // Global configuration settings. } ) func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorFunc) { - m.Handlers = append(m.Handlers, interceptorFunc...) + m.Handlers = append(m.Handlers, interceptorFunc... + } -func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { - mgocli, err := mongoutil.NewMongoDB(ctx, config.Mongo.Build()) +func Start(ctx context.Context, config *cmd.MsgConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { + mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return err } - rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build()) + rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) if err != nil { return err } @@ -71,11 +72,11 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discovery.Sv } msgModel := cache.NewMsgCache(rdb, config.MsgCacheTimeout, &config.Redis) seqModel := cache.NewSeqCache(rdb) - conversationClient := rpcclient.NewConversationRpcClient(client, config.RpcRegisterName.OpenImConversationName) - userRpcClient := rpcclient.NewUserRpcClient(client, config.RpcRegisterName.OpenImUserName, &config.Manager, &config.IMAdmin) - groupRpcClient := rpcclient.NewGroupRpcClient(client, config.RpcRegisterName.OpenImGroupName) - friendRpcClient := rpcclient.NewFriendRpcClient(client, config.RpcRegisterName.OpenImFriendName) - msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, &config.Kafka) + conversationClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) + userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, &config.Share.IMAdmin) + groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) + friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend) + msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, &config.KafkaConfig) if err != nil { return err } @@ -90,7 +91,7 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discovery.Sv config: config, } - s.notificationSender = rpcclient.NewNotificationSender(&config.Notification, rpcclient.WithLocalSendMsg(s.SendMsg)) + s.notificationSender = rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithLocalSendMsg(s.SendMsg)) s.addInterceptorHandler(MessageHasReadEnabled) msg.RegisterMsgServer(server, s) return nil diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index 85c5f1f0f..4db71020a 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -17,10 +17,10 @@ package third import ( "context" "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "net/url" "time" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" @@ -37,12 +37,21 @@ import ( "google.golang.org/grpc" ) -func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { - mgocli, err := mongoutil.NewMongoDB(ctx, config.Mongo.Build()) +type thirdServer struct { + apiURL string + thirdDatabase controller.ThirdDatabase + s3dataBase controller.S3Database + userRpcClient rpcclient.UserRpcClient + defaultExpire time.Duration + config *cmd.ThirdConfig +} + +func Start(ctx context.Context, config *cmd.ThirdConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { + mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return err } - rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build()) + rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) if err != nil { return err } @@ -54,11 +63,11 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discovery.Sv if err != nil { return err } - apiURL := config.Object.ApiURL + apiURL := config.MinioConfig.URL if apiURL == "" { return errs.Wrap(fmt.Errorf("api is empty")) } - if _, err := url.Parse(config.Object.ApiURL); err != nil { + if _, err := url.Parse(config.MinioConfig.URL); err != nil { return err } if apiURL[len(apiURL)-1] != '/' { @@ -67,7 +76,7 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discovery.Sv apiURL += "object/" // Select the oss method according to the profile policy - enable := config.Object.Enable + enable := config.RpcConfig.Object.Enable var o s3.Interface switch enable { case "minio": @@ -93,15 +102,6 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discovery.Sv return nil } -type thirdServer struct { - apiURL string - thirdDatabase controller.ThirdDatabase - s3dataBase controller.S3Database - userRpcClient rpcclient.UserRpcClient - defaultExpire time.Duration - config *config.GlobalConfig -} - func (t *thirdServer) FcmUpdateToken(ctx context.Context, req *third.FcmUpdateTokenReq) (resp *third.FcmUpdateTokenResp, err error) { err = t.thirdDatabase.FcmUpdateToken(ctx, req.Account, int(req.PlatformID), req.FcmToken, req.ExpireTime) if err != nil { diff --git a/pkg/common/cmd/constant.go b/pkg/common/cmd/constant.go index 1df90768f..28836f3ec 100644 --- a/pkg/common/cmd/constant.go +++ b/pkg/common/cmd/constant.go @@ -60,11 +60,15 @@ const ( logEnvPrefix = "openim-log" redisEnvPrefix = "openim-redis" mongodbEnvPrefix = "openim-mongodb" + minioEnvPrefix = "openim-minio" + kafkaEnvPrefix = "openim-kafka" zoopkeeperEnvPrefix = "openim-zookeeper" authEnvPrefix = "openim-auth" conversationEnvPrefix = "openim-conversation" friendEnvPrefix = "openim-friend" groupEnvPrefix = "openim-group" + msgEnvPrefix = "openim-msg" + thridEnvPrefix = "openim-third" ) const ( diff --git a/pkg/common/cmd/friend.go b/pkg/common/cmd/friend.go index 456709b09..e22d4157b 100644 --- a/pkg/common/cmd/friend.go +++ b/pkg/common/cmd/friend.go @@ -30,7 +30,7 @@ type FriendRpcCmd struct { friendConfig FriendConfig } type FriendConfig struct { - RpcConfig config.Conversation + RpcConfig config.Friend RedisConfig config.Redis MongodbConfig config.Mongo ZookeeperConfig config.ZooKeeper diff --git a/pkg/common/cmd/group.go b/pkg/common/cmd/group.go index 946771c87..192fa23b1 100644 --- a/pkg/common/cmd/group.go +++ b/pkg/common/cmd/group.go @@ -30,7 +30,7 @@ type GroupRpcCmd struct { groupConfig GroupConfig } type GroupConfig struct { - RpcConfig config.Conversation + RpcConfig config.Group RedisConfig config.Redis MongodbConfig config.Mongo ZookeeperConfig config.ZooKeeper diff --git a/pkg/common/cmd/msg.go b/pkg/common/cmd/msg.go new file mode 100644 index 000000000..e8592f214 --- /dev/null +++ b/pkg/common/cmd/msg.go @@ -0,0 +1,72 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/internal/rpc/msg" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" + "github.com/openimsdk/tools/system/program" + "github.com/spf13/cobra" +) + +type MsgRpcCmd struct { + *RootCmd + ctx context.Context + configMap map[string]StructEnvPrefix + msgConfig MsgConfig +} +type MsgConfig struct { + RpcConfig config.Msg + RedisConfig config.Redis + MongodbConfig config.Mongo + KafkaConfig config.Kafka + ZookeeperConfig config.ZooKeeper + NotificationConfig config.Notification + Share config.Share + WebhooksConfig config.Webhooks +} + +func NewMsgRpcCmd() *MsgRpcCmd { + var msgConfig MsgConfig + ret := &MsgRpcCmd{msgConfig: msgConfig} + ret.configMap = map[string]StructEnvPrefix{ + OpenIMRPCMsgCfgFileName: {EnvPrefix: msgEnvPrefix, ConfigStruct: &msgConfig.RpcConfig}, + RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &msgConfig.RedisConfig}, + ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &msgConfig.ZookeeperConfig}, + MongodbConfigFileName: {EnvPrefix: mongodbEnvPrefix, ConfigStruct: &msgConfig.MongodbConfig}, + KafkaConfigFileName: {EnvPrefix: kafkaEnvPrefix, ConfigStruct: &msgConfig.KafkaConfig}, + ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &msgConfig.Share}, + NotificationFileName: {EnvPrefix: notificationEnvPrefix, ConfigStruct: &msgConfig.NotificationConfig}, + WebhooksConfigFileName: {EnvPrefix: webhooksEnvPrefix, ConfigStruct: &msgConfig.WebhooksConfig}, + } + ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) + ret.ctx = context.WithValue(context.Background(), "version", config.Version) + ret.Command.PreRunE = func(cmd *cobra.Command, args []string) error { + return ret.preRunE() + } + return ret +} + +func (a *MsgRpcCmd) Exec() error { + return a.Execute() +} + +func (a *MsgRpcCmd) preRunE() error { + return startrpc.Start(a.ctx, &a.msgConfig.ZookeeperConfig, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, + a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.Ports, + a.Index(), a.msgConfig.Share.RpcRegisterName.Auth, &a.msgConfig, msg.Start) +} diff --git a/pkg/common/cmd/third.go b/pkg/common/cmd/third.go new file mode 100644 index 000000000..b6728d3c0 --- /dev/null +++ b/pkg/common/cmd/third.go @@ -0,0 +1,70 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/internal/rpc/third" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" + "github.com/openimsdk/tools/system/program" + "github.com/spf13/cobra" +) + +type ThirdRpcCmd struct { + *RootCmd + ctx context.Context + configMap map[string]StructEnvPrefix + thirdConfig ThirdConfig +} +type ThirdConfig struct { + RpcConfig config.Third + RedisConfig config.Redis + MongodbConfig config.Mongo + ZookeeperConfig config.ZooKeeper + NotificationConfig config.Notification + Share config.Share + MinioConfig config.Minio +} + +func NewThirdRpcCmd() *ThirdRpcCmd { + var thirdConfig ThirdConfig + ret := &ThirdRpcCmd{thirdConfig: thirdConfig} + ret.configMap = map[string]StructEnvPrefix{ + OpenIMRPCThirdCfgFileName: {EnvPrefix: thridEnvPrefix, ConfigStruct: &thirdConfig.RpcConfig}, + RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &thirdConfig.RedisConfig}, + ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &thirdConfig.ZookeeperConfig}, + MongodbConfigFileName: {EnvPrefix: mongodbEnvPrefix, ConfigStruct: &thirdConfig.MongodbConfig}, + ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &thirdConfig.Share}, + NotificationFileName: {EnvPrefix: notificationEnvPrefix, ConfigStruct: &thirdConfig.NotificationConfig}, + MinioConfigFileName: {EnvPrefix: minioEnvPrefix, ConfigStruct: &thirdConfig.MinioConfig}, + } + ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) + ret.ctx = context.WithValue(context.Background(), "version", config.Version) + ret.Command.PreRunE = func(cmd *cobra.Command, args []string) error { + return ret.preRunE() + } + return ret +} + +func (a *ThirdRpcCmd) Exec() error { + return a.Execute() +} + +func (a *ThirdRpcCmd) preRunE() error { + return startrpc.Start(a.ctx, &a.thirdConfig.ZookeeperConfig, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, + a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.Ports, + a.Index(), a.thirdConfig.Share.RpcRegisterName.Auth, &a.thirdConfig, third.Start) +} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 855464d18..86f592db2 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -17,6 +17,7 @@ package config import ( "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" + "github.com/openimsdk/tools/mq/kafka" "time" ) @@ -420,6 +421,10 @@ func (r *Redis) Build() *redisutil.Config { } } +func (k *Kafka) Build() *kafka.Config { + return &kafka.Config{} +} + func (l *CacheConfig) Failed() time.Duration { return time.Second * time.Duration(l.FailedExpire) }