diff --git a/config/discovery.yml b/config/discovery.yml index 2cc54be72..3d96ff9b6 100644 --- a/config/discovery.yml +++ b/config/discovery.yml @@ -1,16 +1,13 @@ - enable: "etcd" etcd: + rootDirectory: openim address: [ localhost:12379 ] username: '' password: '' - - zookeeper: schema: openim address: [ localhost:12181 ] username: '' password: '' - diff --git a/config/share.yml b/config/share.yml index 2abbb77a0..fc97b6a1f 100644 --- a/config/share.yml +++ b/config/share.yml @@ -1,5 +1,4 @@ secret: openIM123 -env: zookeeper rpcRegisterName: user: user friend: friend diff --git a/internal/api/init.go b/internal/api/init.go index 6e784da9a..b49a14569 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -38,20 +38,17 @@ import ( ) type Config struct { - RpcConfig config.API - MongodbConfig config.Mongo - ZookeeperConfig config.ZooKeeper - NotificationConfig config.Notification - Share config.Share - MinioConfig config.Minio + API config.API + Share config.Share + Discovery config.Discovery } func Start(ctx context.Context, index int, config *Config) error { - apiPort, err := datautil.GetElemByIndex(config.RpcConfig.Api.Ports, index) + apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, index) if err != nil { return err } - prometheusPort, err := datautil.GetElemByIndex(config.RpcConfig.Prometheus.Ports, index) + prometheusPort, err := datautil.GetElemByIndex(config.API.Prometheus.Ports, index) if err != nil { return err } @@ -59,7 +56,7 @@ func Start(ctx context.Context, index int, config *Config) error { var client discovery.SvcDiscoveryRegistry // Determine whether zk is passed according to whether it is a clustered deployment - client, err = kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share) + client, err = kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share) if err != nil { return errs.WrapMsg(err, "failed to register discovery service") } @@ -70,7 +67,7 @@ func Start(ctx context.Context, index int, config *Config) error { ) router := newGinRouter(client, config) - if config.RpcConfig.Prometheus.Enable { + if config.API.Prometheus.Enable { go func() { p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api")) p.SetListenAddress(fmt.Sprintf(":%d", prometheusPort)) @@ -81,7 +78,7 @@ func Start(ctx context.Context, index int, config *Config) error { }() } - address := net.JoinHostPort(network.GetListenIP(config.RpcConfig.Api.ListenIP), strconv.Itoa(apiPort)) + address := net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort)) server := http.Server{Addr: address, Handler: router} log.CInfo(ctx, "API server is initializing", "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort) diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index bfe81b602..f9bb699ed 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -35,7 +35,7 @@ func (s *Server) InitServer(ctx context.Context, config *Config, disCov discover } func (s *Server) Start(ctx context.Context, index int, conf *Config) error { - return startrpc.Start(ctx, &conf.ZookeeperConfig, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP, + return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP, conf.MsgGateway.RPC.RegisterIP, conf.MsgGateway.RPC.Ports, index, conf.Share.RpcRegisterName.MessageGateway, diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 727ade0af..ef24d1bf9 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -24,10 +24,10 @@ import ( ) type Config struct { - MsgGateway config.MsgGateway - ZookeeperConfig config.ZooKeeper - Share config.Share - WebhooksConfig config.Webhooks + MsgGateway config.MsgGateway + Share config.Share + WebhooksConfig config.Webhooks + Discovery config.Discovery } // Start run ws server. diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 68d953e90..8f72e979d 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -63,6 +63,7 @@ type Config struct { ZookeeperConfig config.ZooKeeper Share config.Share WebhooksConfig config.Webhooks + Discovery config.Discovery } func Start(ctx context.Context, index int, config *Config) error { @@ -76,7 +77,7 @@ func Start(ctx context.Context, index int, config *Config) error { if err != nil { return err } - client, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share) + client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share) if err != nil { return err } diff --git a/internal/push/push.go b/internal/push/push.go index 18012a864..2e5c4e526 100644 --- a/internal/push/push.go +++ b/internal/push/push.go @@ -24,11 +24,11 @@ type Config struct { RedisConfig config.Redis MongodbConfig config.Mongo KafkaConfig config.Kafka - ZookeeperConfig config.ZooKeeper NotificationConfig config.Notification Share config.Share WebhooksConfig config.Webhooks LocalCacheConfig config.LocalCache + Discovery config.Discovery } func (p pushServer) PushMsg(ctx context.Context, req *pbpush.PushMsgReq) (*pbpush.PushMsgResp, error) { diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index c6d236b21..ddb655398 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -45,10 +45,10 @@ type authServer struct { } type Config struct { - RpcConfig config.Auth - RedisConfig config.Redis - ZookeeperConfig config.ZooKeeper - Share config.Share + RpcConfig config.Auth + RedisConfig config.Redis + Share config.Share + Discovery config.Discovery } func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 96d2a403f..ec7522212 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -48,13 +48,14 @@ type conversationServer struct { } type Config struct { - RpcConfig config.Conversation - RedisConfig config.Redis - MongodbConfig config.Mongo - ZookeeperConfig config.ZooKeeper + RpcConfig config.Conversation + RedisConfig config.Redis + MongodbConfig config.Mongo + // ZookeeperConfig config.ZooKeeper NotificationConfig config.Notification Share config.Share LocalCacheConfig config.LocalCache + Discovery config.Discovery } func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index bffda3c04..b49490f26 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -50,14 +50,15 @@ type friendServer struct { } type Config struct { - RpcConfig config.Friend - RedisConfig config.Redis - MongodbConfig config.Mongo - ZookeeperConfig config.ZooKeeper + RpcConfig config.Friend + RedisConfig config.Redis + MongodbConfig config.Mongo + //ZookeeperConfig config.ZooKeeper NotificationConfig config.Notification Share config.Share WebhooksConfig config.Webhooks LocalCacheConfig config.LocalCache + Discovery config.Discovery } func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 13bd7f9be..551554c23 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -68,11 +68,11 @@ type Config struct { RpcConfig config.Group RedisConfig config.Redis MongodbConfig config.Mongo - ZookeeperConfig config.ZooKeeper NotificationConfig config.Notification Share config.Share WebhooksConfig config.Webhooks LocalCacheConfig config.LocalCache + Discovery config.Discovery } func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 3f4df8d4b..5d7c0b297 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -59,11 +59,11 @@ type ( RedisConfig config.Redis MongodbConfig config.Mongo KafkaConfig config.Kafka - ZookeeperConfig config.ZooKeeper NotificationConfig config.Notification Share config.Share WebhooksConfig config.Webhooks LocalCacheConfig config.LocalCache + Discovery config.Discovery } ) diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index 9bf8cafa9..a3d9085d3 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -46,11 +46,11 @@ type Config struct { RpcConfig config.Third RedisConfig config.Redis MongodbConfig config.Mongo - ZookeeperConfig config.ZooKeeper NotificationConfig config.Notification Share config.Share MinioConfig config.Minio LocalCacheConfig config.LocalCache + Discovery config.Discovery } func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index c453ac9f8..a28fa24e2 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -61,11 +61,11 @@ type Config struct { RedisConfig config.Redis MongodbConfig config.Mongo KafkaConfig config.Kafka - ZookeeperConfig config.ZooKeeper NotificationConfig config.Notification Share config.Share WebhooksConfig config.Webhooks LocalCacheConfig config.LocalCache + Discovery config.Discovery } func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegistry, server *grpc.Server) error { diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 20baeffaf..bf037b694 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -33,9 +33,9 @@ import ( ) type CronTaskConfig struct { - CronTask config.CronTask - ZookeeperConfig config.ZooKeeper - Share config.Share + CronTask config.CronTask + Share config.Share + Discovery config.Discovery } func Start(ctx context.Context, config *CronTaskConfig) error { @@ -43,7 +43,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error { if config.CronTask.RetainChatRecords < 1 { return errs.New("msg destruct time must be greater than 1").Wrap() } - client, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share) + client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share) if err != nil { return errs.WrapMsg(err, "failed to register discovery service") } diff --git a/pkg/common/cmd/api.go b/pkg/common/cmd/api.go index 022fb1097..ecdb0dd3a 100644 --- a/pkg/common/cmd/api.go +++ b/pkg/common/cmd/api.go @@ -33,9 +33,9 @@ func NewApiCmd() *ApiCmd { var apiConfig api.Config ret := &ApiCmd{apiConfig: &apiConfig} ret.configMap = map[string]any{ - OpenIMAPICfgFileName: &apiConfig.RpcConfig, - ZookeeperConfigFileName: &apiConfig.ZookeeperConfig, + OpenIMAPICfgFileName: &apiConfig.API, ShareFileName: &apiConfig.Share, + DiscoveryConfigFilename: &apiConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) diff --git a/pkg/common/cmd/auth.go b/pkg/common/cmd/auth.go index 5ed02ffd0..7d75a7da6 100644 --- a/pkg/common/cmd/auth.go +++ b/pkg/common/cmd/auth.go @@ -36,8 +36,8 @@ func NewAuthRpcCmd() *AuthRpcCmd { ret.configMap = map[string]any{ OpenIMRPCAuthCfgFileName: &authConfig.RpcConfig, RedisConfigFileName: &authConfig.RedisConfig, - ZookeeperConfigFileName: &authConfig.ZookeeperConfig, ShareFileName: &authConfig.Share, + DiscoveryConfigFilename: &authConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -53,7 +53,7 @@ func (a *AuthRpcCmd) Exec() error { } func (a *AuthRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.authConfig.ZookeeperConfig, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.Ports, a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig.Share, a.authConfig, auth.Start) } diff --git a/pkg/common/cmd/constant.go b/pkg/common/cmd/constant.go index 55eb4a069..d7c7ee94e 100644 --- a/pkg/common/cmd/constant.go +++ b/pkg/common/cmd/constant.go @@ -42,6 +42,7 @@ var ( OpenIMRPCMsgCfgFileName string OpenIMRPCThirdCfgFileName string OpenIMRPCUserCfgFileName string + DiscoveryConfigFilename string ) var ConfigEnvPrefixMap map[string]string @@ -70,6 +71,7 @@ func init() { OpenIMRPCMsgCfgFileName = "openim-rpc-msg.yml" OpenIMRPCThirdCfgFileName = "openim-rpc-third.yml" OpenIMRPCUserCfgFileName = "openim-rpc-user.yml" + DiscoveryConfigFilename = "discovery.yml" ConfigEnvPrefixMap = make(map[string]string) fileNames := []string{ @@ -79,7 +81,7 @@ func init() { OpenIMAPICfgFileName, OpenIMCronTaskCfgFileName, OpenIMMsgGatewayCfgFileName, OpenIMMsgTransferCfgFileName, OpenIMPushCfgFileName, OpenIMRPCAuthCfgFileName, OpenIMRPCConversationCfgFileName, OpenIMRPCFriendCfgFileName, OpenIMRPCGroupCfgFileName, - OpenIMRPCMsgCfgFileName, OpenIMRPCThirdCfgFileName, OpenIMRPCUserCfgFileName, + OpenIMRPCMsgCfgFileName, OpenIMRPCThirdCfgFileName, OpenIMRPCUserCfgFileName, DiscoveryConfigFilename, } for _, fileName := range fileNames { diff --git a/pkg/common/cmd/conversation.go b/pkg/common/cmd/conversation.go index 0a617c729..57ffa52bc 100644 --- a/pkg/common/cmd/conversation.go +++ b/pkg/common/cmd/conversation.go @@ -36,11 +36,11 @@ func NewConversationRpcCmd() *ConversationRpcCmd { ret.configMap = map[string]any{ OpenIMRPCConversationCfgFileName: &conversationConfig.RpcConfig, RedisConfigFileName: &conversationConfig.RedisConfig, - ZookeeperConfigFileName: &conversationConfig.ZookeeperConfig, MongodbConfigFileName: &conversationConfig.MongodbConfig, ShareFileName: &conversationConfig.Share, NotificationFileName: &conversationConfig.NotificationConfig, LocalCacheConfigFileName: &conversationConfig.LocalCacheConfig, + DiscoveryConfigFilename: &conversationConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -55,7 +55,7 @@ func (a *ConversationRpcCmd) Exec() error { } func (a *ConversationRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.conversationConfig.ZookeeperConfig, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.Ports, a.Index(), a.conversationConfig.Share.RpcRegisterName.Conversation, &a.conversationConfig.Share, a.conversationConfig, conversation.Start) } diff --git a/pkg/common/cmd/cron_task.go b/pkg/common/cmd/cron_task.go index be26f5af3..fd4447524 100644 --- a/pkg/common/cmd/cron_task.go +++ b/pkg/common/cmd/cron_task.go @@ -34,8 +34,8 @@ func NewCronTaskCmd() *CronTaskCmd { ret := &CronTaskCmd{cronTaskConfig: &cronTaskConfig} ret.configMap = map[string]any{ OpenIMCronTaskCfgFileName: &cronTaskConfig.CronTask, - ZookeeperConfigFileName: &cronTaskConfig.ZookeeperConfig, ShareFileName: &cronTaskConfig.Share, + DiscoveryConfigFilename: &cronTaskConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) diff --git a/pkg/common/cmd/friend.go b/pkg/common/cmd/friend.go index b8d46f77e..8be1f7745 100644 --- a/pkg/common/cmd/friend.go +++ b/pkg/common/cmd/friend.go @@ -36,12 +36,12 @@ func NewFriendRpcCmd() *FriendRpcCmd { ret.configMap = map[string]any{ OpenIMRPCFriendCfgFileName: &friendConfig.RpcConfig, RedisConfigFileName: &friendConfig.RedisConfig, - ZookeeperConfigFileName: &friendConfig.ZookeeperConfig, MongodbConfigFileName: &friendConfig.MongodbConfig, ShareFileName: &friendConfig.Share, NotificationFileName: &friendConfig.NotificationConfig, WebhooksConfigFileName: &friendConfig.WebhooksConfig, LocalCacheConfigFileName: &friendConfig.LocalCacheConfig, + DiscoveryConfigFilename: &friendConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -56,7 +56,7 @@ func (a *FriendRpcCmd) Exec() error { } func (a *FriendRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.friendConfig.ZookeeperConfig, &a.friendConfig.RpcConfig.Prometheus, a.friendConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.friendConfig.Discovery, &a.friendConfig.RpcConfig.Prometheus, a.friendConfig.RpcConfig.RPC.ListenIP, a.friendConfig.RpcConfig.RPC.RegisterIP, a.friendConfig.RpcConfig.RPC.Ports, a.Index(), a.friendConfig.Share.RpcRegisterName.Friend, &a.friendConfig.Share, a.friendConfig, friend.Start) } diff --git a/pkg/common/cmd/group.go b/pkg/common/cmd/group.go index 8bf977824..f158b8c62 100644 --- a/pkg/common/cmd/group.go +++ b/pkg/common/cmd/group.go @@ -36,12 +36,12 @@ func NewGroupRpcCmd() *GroupRpcCmd { ret.configMap = map[string]any{ OpenIMRPCGroupCfgFileName: &groupConfig.RpcConfig, RedisConfigFileName: &groupConfig.RedisConfig, - ZookeeperConfigFileName: &groupConfig.ZookeeperConfig, MongodbConfigFileName: &groupConfig.MongodbConfig, ShareFileName: &groupConfig.Share, NotificationFileName: &groupConfig.NotificationConfig, WebhooksConfigFileName: &groupConfig.WebhooksConfig, LocalCacheConfigFileName: &groupConfig.LocalCacheConfig, + DiscoveryConfigFilename: &groupConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -56,7 +56,7 @@ func (a *GroupRpcCmd) Exec() error { } func (a *GroupRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.groupConfig.ZookeeperConfig, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.Ports, a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, group.Start) } diff --git a/pkg/common/cmd/msg.go b/pkg/common/cmd/msg.go index a3b521b4b..91f7931fb 100644 --- a/pkg/common/cmd/msg.go +++ b/pkg/common/cmd/msg.go @@ -36,13 +36,13 @@ func NewMsgRpcCmd() *MsgRpcCmd { ret.configMap = map[string]any{ OpenIMRPCMsgCfgFileName: &msgConfig.RpcConfig, RedisConfigFileName: &msgConfig.RedisConfig, - ZookeeperConfigFileName: &msgConfig.ZookeeperConfig, MongodbConfigFileName: &msgConfig.MongodbConfig, KafkaConfigFileName: &msgConfig.KafkaConfig, ShareFileName: &msgConfig.Share, NotificationFileName: &msgConfig.NotificationConfig, WebhooksConfigFileName: &msgConfig.WebhooksConfig, LocalCacheConfigFileName: &msgConfig.LocalCacheConfig, + DiscoveryConfigFilename: &msgConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -57,7 +57,7 @@ func (a *MsgRpcCmd) Exec() error { } func (a *MsgRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.msgConfig.ZookeeperConfig, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.Ports, a.Index(), a.msgConfig.Share.RpcRegisterName.Msg, &a.msgConfig.Share, a.msgConfig, msg.Start) } diff --git a/pkg/common/cmd/msg_gateway.go b/pkg/common/cmd/msg_gateway.go index 897fd7008..78004094c 100644 --- a/pkg/common/cmd/msg_gateway.go +++ b/pkg/common/cmd/msg_gateway.go @@ -36,9 +36,9 @@ func NewMsgGatewayCmd() *MsgGatewayCmd { ret := &MsgGatewayCmd{msgGatewayConfig: &msgGatewayConfig} ret.configMap = map[string]any{ OpenIMMsgGatewayCfgFileName: &msgGatewayConfig.MsgGateway, - ZookeeperConfigFileName: &msgGatewayConfig.ZookeeperConfig, ShareFileName: &msgGatewayConfig.Share, WebhooksConfigFileName: &msgGatewayConfig.WebhooksConfig, + DiscoveryConfigFilename: &msgGatewayConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) diff --git a/pkg/common/cmd/msg_transfer.go b/pkg/common/cmd/msg_transfer.go index 86f42dc56..99acc053c 100644 --- a/pkg/common/cmd/msg_transfer.go +++ b/pkg/common/cmd/msg_transfer.go @@ -40,6 +40,7 @@ func NewMsgTransferCmd() *MsgTransferCmd { ZookeeperConfigFileName: &msgTransferConfig.ZookeeperConfig, ShareFileName: &msgTransferConfig.Share, WebhooksConfigFileName: &msgTransferConfig.WebhooksConfig, + DiscoveryConfigFilename: &msgTransferConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go index 0140ced23..3e7c4c249 100644 --- a/pkg/common/cmd/push.go +++ b/pkg/common/cmd/push.go @@ -36,13 +36,13 @@ func NewPushRpcCmd() *PushRpcCmd { ret.configMap = map[string]any{ OpenIMPushCfgFileName: &pushConfig.RpcConfig, RedisConfigFileName: &pushConfig.RedisConfig, - ZookeeperConfigFileName: &pushConfig.ZookeeperConfig, MongodbConfigFileName: &pushConfig.MongodbConfig, KafkaConfigFileName: &pushConfig.KafkaConfig, ShareFileName: &pushConfig.Share, NotificationFileName: &pushConfig.NotificationConfig, WebhooksConfigFileName: &pushConfig.WebhooksConfig, LocalCacheConfigFileName: &pushConfig.LocalCacheConfig, + DiscoveryConfigFilename: &pushConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -57,7 +57,7 @@ func (a *PushRpcCmd) Exec() error { } func (a *PushRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.pushConfig.ZookeeperConfig, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.Ports, a.Index(), a.pushConfig.Share.RpcRegisterName.Push, &a.pushConfig.Share, a.pushConfig, push.Start) } diff --git a/pkg/common/cmd/third.go b/pkg/common/cmd/third.go index 0dfa7d5be..b6731f1ff 100644 --- a/pkg/common/cmd/third.go +++ b/pkg/common/cmd/third.go @@ -36,12 +36,12 @@ func NewThirdRpcCmd() *ThirdRpcCmd { ret.configMap = map[string]any{ OpenIMRPCThirdCfgFileName: &thirdConfig.RpcConfig, RedisConfigFileName: &thirdConfig.RedisConfig, - ZookeeperConfigFileName: &thirdConfig.ZookeeperConfig, MongodbConfigFileName: &thirdConfig.MongodbConfig, ShareFileName: &thirdConfig.Share, NotificationFileName: &thirdConfig.NotificationConfig, MinioConfigFileName: &thirdConfig.MinioConfig, LocalCacheConfigFileName: &thirdConfig.LocalCacheConfig, + DiscoveryConfigFilename: &thirdConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -56,7 +56,7 @@ func (a *ThirdRpcCmd) Exec() error { } func (a *ThirdRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.thirdConfig.ZookeeperConfig, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.Ports, a.Index(), a.thirdConfig.Share.RpcRegisterName.Third, &a.thirdConfig.Share, a.thirdConfig, third.Start) } diff --git a/pkg/common/cmd/user.go b/pkg/common/cmd/user.go index 315b93256..674f9e3a6 100644 --- a/pkg/common/cmd/user.go +++ b/pkg/common/cmd/user.go @@ -36,13 +36,13 @@ func NewUserRpcCmd() *UserRpcCmd { ret.configMap = map[string]any{ OpenIMRPCUserCfgFileName: &userConfig.RpcConfig, RedisConfigFileName: &userConfig.RedisConfig, - ZookeeperConfigFileName: &userConfig.ZookeeperConfig, MongodbConfigFileName: &userConfig.MongodbConfig, KafkaConfigFileName: &userConfig.KafkaConfig, ShareFileName: &userConfig.Share, NotificationFileName: &userConfig.NotificationConfig, WebhooksConfigFileName: &userConfig.WebhooksConfig, LocalCacheConfigFileName: &userConfig.LocalCacheConfig, + DiscoveryConfigFilename: &userConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -57,7 +57,7 @@ func (a *UserRpcCmd) Exec() error { } func (a *UserRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.userConfig.ZookeeperConfig, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.Ports, a.Index(), a.userConfig.Share.RpcRegisterName.User, &a.userConfig.Share, a.userConfig, user.Start) } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 24d04d8cc..12c4f7f78 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -345,7 +345,6 @@ type AfterConfig struct { type Share struct { Secret string `mapstructure:"secret"` - Env string `mapstructure:"env"` RpcRegisterName RpcRegisterName `mapstructure:"rpcRegisterName"` IMAdminUserID []string `mapstructure:"imAdminUserID"` } @@ -432,6 +431,19 @@ type ZooKeeper struct { Password string `mapstructure:"password"` } +type Discovery struct { + Enable string `mapstructure:"enable"` + Etcd Etcd `mapstructure:"etcd"` + ZooKeeper ZooKeeper `mapstructure:"zooKeeper"` +} + +type Etcd struct { + RootDirectory string `mapstructure:"rootDirectory"` + Address []string `mapstructure:"address"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` +} + func (m *Mongo) Build() *mongoutil.Config { return &mongoutil.Config{ Uri: m.URI, diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index dbf16eda0..1085ec1ea 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -24,33 +24,31 @@ import ( "time" ) -const ( - zookeeperConst = "zookeeper" - kubenetesConst = "k8s" - directConst = "direct" - etcdConst = "etcd" -) - // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. -func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper, share *config.Share) (discovery.SvcDiscoveryRegistry, error) { - switch share.Env { - case zookeeperConst: +func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share) (discovery.SvcDiscoveryRegistry, error) { + switch discovery.Enable { + case "zookeeper": return zookeeper.NewZkClient( - zookeeperConfig.Address, - zookeeperConfig.Schema, + discovery.ZooKeeper.Address, + discovery.ZooKeeper.Schema, zookeeper.WithFreq(time.Hour), - zookeeper.WithUserNameAndPassword(zookeeperConfig.Username, zookeeperConfig.Password), + zookeeper.WithUserNameAndPassword(discovery.ZooKeeper.Username, discovery.ZooKeeper.Password), zookeeper.WithRoundRobin(), zookeeper.WithTimeout(10), ) - case kubenetesConst: + case "k8s": return kubernetes.NewK8sDiscoveryRegister(share.RpcRegisterName.MessageGateway) - case etcdConst: - return getcd.NewSvcDiscoveryRegistry("etcd", []string{"localhost:2379"}) - case directConst: + case "etcd": + return getcd.NewSvcDiscoveryRegistry( + discovery.Etcd.RootDirectory, + discovery.Etcd.Address, + getcd.WithDialTimeout(10*time.Second), + getcd.WithMaxCallSendMsgSize(20*1024*1024), + getcd.WithUsernameAndPassword(discovery.Etcd.Username, discovery.Etcd.Password)) + case "direct": //return direct.NewConnDirect(config) default: - return nil, errs.New("unsupported discovery type", "type", share.Env).Wrap() + return nil, errs.New("unsupported discovery type", "type", discovery.Enable).Wrap() } return nil, nil } diff --git a/pkg/common/discoveryregister/etcd/etcd.go b/pkg/common/discoveryregister/etcd/etcd.go index 62d6df150..904f83dd0 100644 --- a/pkg/common/discoveryregister/etcd/etcd.go +++ b/pkg/common/discoveryregister/etcd/etcd.go @@ -8,27 +8,38 @@ import ( "go.etcd.io/etcd/client/v3/naming/resolver" "google.golang.org/grpc" gresolver "google.golang.org/grpc/resolver" - - "log" "time" ) +// ZkOption defines a function type for modifying clientv3.Config +type ZkOption func(*clientv3.Config) + // SvcDiscoveryRegistryImpl implementation type SvcDiscoveryRegistryImpl struct { - client *clientv3.Client - resolver gresolver.Builder - dialOptions []grpc.DialOption - serviceKey string - endpointMgr endpoints.Manager - leaseID clientv3.LeaseID - schema string + client *clientv3.Client + resolver gresolver.Builder + dialOptions []grpc.DialOption + serviceKey string + endpointMgr endpoints.Manager + leaseID clientv3.LeaseID + rootDirectory string } -func NewSvcDiscoveryRegistry(schema string, endpoints []string) (*SvcDiscoveryRegistryImpl, error) { +// NewSvcDiscoveryRegistry creates a new service discovery registry implementation +func NewSvcDiscoveryRegistry(rootDirectory string, endpoints []string, options ...ZkOption) (*SvcDiscoveryRegistryImpl, error) { cfg := clientv3.Config{ Endpoints: endpoints, DialTimeout: 5 * time.Second, + // Increase keep-alive queue capacity and message size + PermitWithoutStream: true, + MaxCallSendMsgSize: 10 * 1024 * 1024, // 10 MB } + + // Apply provided options to the config + for _, opt := range options { + opt(&cfg) + } + client, err := clientv3.New(cfg) if err != nil { return nil, err @@ -38,17 +49,42 @@ func NewSvcDiscoveryRegistry(schema string, endpoints []string) (*SvcDiscoveryRe return nil, err } return &SvcDiscoveryRegistryImpl{ - client: client, - resolver: r, - schema: schema, + client: client, + resolver: r, + rootDirectory: rootDirectory, }, nil } +// WithDialTimeout sets a custom dial timeout for the etcd client +func WithDialTimeout(timeout time.Duration) ZkOption { + return func(cfg *clientv3.Config) { + cfg.DialTimeout = timeout + } +} + +// WithMaxCallSendMsgSize sets a custom max call send message size for the etcd client +func WithMaxCallSendMsgSize(size int) ZkOption { + return func(cfg *clientv3.Config) { + cfg.MaxCallSendMsgSize = size + } +} + +// WithUsernameAndPassword sets a username and password for the etcd client +func WithUsernameAndPassword(username, password string) ZkOption { + return func(cfg *clientv3.Config) { + cfg.Username = username + cfg.Password = password + } +} + +// GetUserIdHashGatewayHost returns the gateway host for a given user ID hash func (r *SvcDiscoveryRegistryImpl) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) { return "", nil } + +// GetConns returns gRPC client connections for a given service name func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { - target := fmt.Sprintf("%s:///%s", r.schema, serviceName) + target := fmt.Sprintf("etcd:///%s", serviceName) conn, err := grpc.DialContext(ctx, target, append(append(r.dialOptions, opts...), grpc.WithResolvers(r.resolver))...) if err != nil { return nil, err @@ -56,34 +92,39 @@ func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName str return []*grpc.ClientConn{conn}, nil } +// GetConn returns a single gRPC client connection for a given service name func (r *SvcDiscoveryRegistryImpl) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { - target := fmt.Sprintf("%s:///%s", r.schema, serviceName) + target := fmt.Sprintf("etcd:///%s", serviceName) return grpc.DialContext(ctx, target, append(append(r.dialOptions, opts...), grpc.WithResolvers(r.resolver))...) } +// GetSelfConnTarget returns the connection target for the current service func (r *SvcDiscoveryRegistryImpl) GetSelfConnTarget() string { - return fmt.Sprintf("%s:///%s", r.schema, r.serviceKey) + return fmt.Sprintf("etcd:///%s", r.serviceKey) } +// AddOption appends gRPC dial options to the existing options func (r *SvcDiscoveryRegistryImpl) AddOption(opts ...grpc.DialOption) { r.dialOptions = append(r.dialOptions, opts...) } +// CloseConn closes a given gRPC client connection func (r *SvcDiscoveryRegistryImpl) CloseConn(conn *grpc.ClientConn) { if err := conn.Close(); err != nil { - log.Printf("Failed to close connection: %v", err) + fmt.Printf("Failed to close connection: %v\n", err) } } +// Register registers a new service endpoint with etcd func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { - r.serviceKey = fmt.Sprintf("%s/%s-%d", serviceName, host, port) - em, err := endpoints.NewManager(r.client, serviceName) + r.serviceKey = fmt.Sprintf("%s/%s/%s-%d", r.rootDirectory, serviceName, host, port) + em, err := endpoints.NewManager(r.client, r.rootDirectory+"/"+serviceName) if err != nil { return err } r.endpointMgr = em - leaseResp, err := r.client.Grant(context.Background(), 30) + leaseResp, err := r.client.Grant(context.Background(), 60) // Increase TTL time if err != nil { return err } @@ -100,10 +141,11 @@ func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int, return nil } +// keepAliveLease maintains the lease alive by sending keep-alive requests func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) { ch, err := r.client.KeepAlive(context.Background(), leaseID) if err != nil { - log.Printf("Failed to keep lease alive: %v", err) + fmt.Printf("Failed to keep lease alive: %v\n", err) return } @@ -111,12 +153,13 @@ func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) { if ka != nil { fmt.Printf("Received lease keep-alive response: %v\n", ka) } else { - fmt.Printf("Lease keep-alive response channel closed") - break + fmt.Printf("Lease keep-alive response channel closed\n") + return } } } +// UnRegister removes the service endpoint from etcd func (r *SvcDiscoveryRegistryImpl) UnRegister() error { if r.endpointMgr == nil { return fmt.Errorf("endpoint manager is not initialized") @@ -124,6 +167,7 @@ func (r *SvcDiscoveryRegistryImpl) UnRegister() error { return r.endpointMgr.DeleteEndpoint(context.TODO(), r.serviceKey) } +// Close closes the etcd client connection func (r *SvcDiscoveryRegistryImpl) Close() { if r.client != nil { _ = r.client.Close() diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index ebcd5aa7c..a36bcfe1c 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -44,7 +44,7 @@ import ( ) // Start rpc server. -func Start[T any](ctx context.Context, zookeeperConfig *config2.ZooKeeper, prometheusConfig *config2.Prometheus, listenIP, +func Start[T any](ctx context.Context, discovery *config2.Discovery, prometheusConfig *config2.Prometheus, listenIP, registerIP string, rpcPorts []int, index int, rpcRegisterName string, share *config2.Share, config T, rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { @@ -68,7 +68,7 @@ func Start[T any](ctx context.Context, zookeeperConfig *config2.ZooKeeper, prome } defer listener.Close() - client, err := kdisc.NewDiscoveryRegister(zookeeperConfig, share) + client, err := kdisc.NewDiscoveryRegister(discovery, share) if err != nil { return err }