From db906fcea0272abb24aa542a89ecc2fe7188186d Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Sat, 11 May 2024 15:40:17 +0800 Subject: [PATCH] Add etcd as a service discovery mechanism --- config/discovery.yml | 5 +- internal/msgtransfer/init.go | 15 ++-- internal/rpc/conversation/conversaion.go | 7 +- .../discoveryregister/discoveryregister.go | 34 ++++--- pkg/common/discoveryregister/etcd/etcd.go | 90 ++++++++++++++----- 5 files changed, 96 insertions(+), 55 deletions(-) diff --git a/config/discovery.yml b/config/discovery.yml index 2cc54be72..3d96ff9b6 100644 --- a/config/discovery.yml +++ b/config/discovery.yml @@ -1,16 +1,13 @@ - enable: "etcd" etcd: + rootDirectory: openim address: [ localhost:12379 ] username: '' password: '' - - zookeeper: schema: openim address: [ localhost:12181 ] username: '' password: '' - diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 3384b8493..8f72e979d 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -56,13 +56,14 @@ type MsgTransfer struct { } type Config struct { - MsgTransfer config.MsgTransfer - RedisConfig config.Redis - MongodbConfig config.Mongo - KafkaConfig config.Kafka - Share config.Share - WebhooksConfig config.Webhooks - Discovery config.Discovery + MsgTransfer config.MsgTransfer + RedisConfig config.Redis + MongodbConfig config.Mongo + KafkaConfig config.Kafka + ZookeeperConfig config.ZooKeeper + Share config.Share + WebhooksConfig config.Webhooks + Discovery config.Discovery } func Start(ctx context.Context, index int, config *Config) error { diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 4c7828610..ec7522212 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -48,9 +48,10 @@ type conversationServer struct { } type Config struct { - RpcConfig config.Conversation - RedisConfig config.Redis - MongodbConfig config.Mongo + RpcConfig config.Conversation + RedisConfig config.Redis + MongodbConfig config.Mongo + // ZookeeperConfig config.ZooKeeper NotificationConfig config.Notification Share config.Share LocalCacheConfig config.LocalCache diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index dbf16eda0..1085ec1ea 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -24,33 +24,31 @@ import ( "time" ) -const ( - zookeeperConst = "zookeeper" - kubenetesConst = "k8s" - directConst = "direct" - etcdConst = "etcd" -) - // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. -func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper, share *config.Share) (discovery.SvcDiscoveryRegistry, error) { - switch share.Env { - case zookeeperConst: +func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share) (discovery.SvcDiscoveryRegistry, error) { + switch discovery.Enable { + case "zookeeper": return zookeeper.NewZkClient( - zookeeperConfig.Address, - zookeeperConfig.Schema, + discovery.ZooKeeper.Address, + discovery.ZooKeeper.Schema, zookeeper.WithFreq(time.Hour), - zookeeper.WithUserNameAndPassword(zookeeperConfig.Username, zookeeperConfig.Password), + zookeeper.WithUserNameAndPassword(discovery.ZooKeeper.Username, discovery.ZooKeeper.Password), zookeeper.WithRoundRobin(), zookeeper.WithTimeout(10), ) - case kubenetesConst: + case "k8s": return kubernetes.NewK8sDiscoveryRegister(share.RpcRegisterName.MessageGateway) - case etcdConst: - return getcd.NewSvcDiscoveryRegistry("etcd", []string{"localhost:2379"}) - case directConst: + case "etcd": + return getcd.NewSvcDiscoveryRegistry( + discovery.Etcd.RootDirectory, + discovery.Etcd.Address, + getcd.WithDialTimeout(10*time.Second), + getcd.WithMaxCallSendMsgSize(20*1024*1024), + getcd.WithUsernameAndPassword(discovery.Etcd.Username, discovery.Etcd.Password)) + case "direct": //return direct.NewConnDirect(config) default: - return nil, errs.New("unsupported discovery type", "type", share.Env).Wrap() + return nil, errs.New("unsupported discovery type", "type", discovery.Enable).Wrap() } return nil, nil } diff --git a/pkg/common/discoveryregister/etcd/etcd.go b/pkg/common/discoveryregister/etcd/etcd.go index 62d6df150..904f83dd0 100644 --- a/pkg/common/discoveryregister/etcd/etcd.go +++ b/pkg/common/discoveryregister/etcd/etcd.go @@ -8,27 +8,38 @@ import ( "go.etcd.io/etcd/client/v3/naming/resolver" "google.golang.org/grpc" gresolver "google.golang.org/grpc/resolver" - - "log" "time" ) +// ZkOption defines a function type for modifying clientv3.Config +type ZkOption func(*clientv3.Config) + // SvcDiscoveryRegistryImpl implementation type SvcDiscoveryRegistryImpl struct { - client *clientv3.Client - resolver gresolver.Builder - dialOptions []grpc.DialOption - serviceKey string - endpointMgr endpoints.Manager - leaseID clientv3.LeaseID - schema string + client *clientv3.Client + resolver gresolver.Builder + dialOptions []grpc.DialOption + serviceKey string + endpointMgr endpoints.Manager + leaseID clientv3.LeaseID + rootDirectory string } -func NewSvcDiscoveryRegistry(schema string, endpoints []string) (*SvcDiscoveryRegistryImpl, error) { +// NewSvcDiscoveryRegistry creates a new service discovery registry implementation +func NewSvcDiscoveryRegistry(rootDirectory string, endpoints []string, options ...ZkOption) (*SvcDiscoveryRegistryImpl, error) { cfg := clientv3.Config{ Endpoints: endpoints, DialTimeout: 5 * time.Second, + // Increase keep-alive queue capacity and message size + PermitWithoutStream: true, + MaxCallSendMsgSize: 10 * 1024 * 1024, // 10 MB } + + // Apply provided options to the config + for _, opt := range options { + opt(&cfg) + } + client, err := clientv3.New(cfg) if err != nil { return nil, err @@ -38,17 +49,42 @@ func NewSvcDiscoveryRegistry(schema string, endpoints []string) (*SvcDiscoveryRe return nil, err } return &SvcDiscoveryRegistryImpl{ - client: client, - resolver: r, - schema: schema, + client: client, + resolver: r, + rootDirectory: rootDirectory, }, nil } +// WithDialTimeout sets a custom dial timeout for the etcd client +func WithDialTimeout(timeout time.Duration) ZkOption { + return func(cfg *clientv3.Config) { + cfg.DialTimeout = timeout + } +} + +// WithMaxCallSendMsgSize sets a custom max call send message size for the etcd client +func WithMaxCallSendMsgSize(size int) ZkOption { + return func(cfg *clientv3.Config) { + cfg.MaxCallSendMsgSize = size + } +} + +// WithUsernameAndPassword sets a username and password for the etcd client +func WithUsernameAndPassword(username, password string) ZkOption { + return func(cfg *clientv3.Config) { + cfg.Username = username + cfg.Password = password + } +} + +// GetUserIdHashGatewayHost returns the gateway host for a given user ID hash func (r *SvcDiscoveryRegistryImpl) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) { return "", nil } + +// GetConns returns gRPC client connections for a given service name func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { - target := fmt.Sprintf("%s:///%s", r.schema, serviceName) + target := fmt.Sprintf("etcd:///%s", serviceName) conn, err := grpc.DialContext(ctx, target, append(append(r.dialOptions, opts...), grpc.WithResolvers(r.resolver))...) if err != nil { return nil, err @@ -56,34 +92,39 @@ func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName str return []*grpc.ClientConn{conn}, nil } +// GetConn returns a single gRPC client connection for a given service name func (r *SvcDiscoveryRegistryImpl) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { - target := fmt.Sprintf("%s:///%s", r.schema, serviceName) + target := fmt.Sprintf("etcd:///%s", serviceName) return grpc.DialContext(ctx, target, append(append(r.dialOptions, opts...), grpc.WithResolvers(r.resolver))...) } +// GetSelfConnTarget returns the connection target for the current service func (r *SvcDiscoveryRegistryImpl) GetSelfConnTarget() string { - return fmt.Sprintf("%s:///%s", r.schema, r.serviceKey) + return fmt.Sprintf("etcd:///%s", r.serviceKey) } +// AddOption appends gRPC dial options to the existing options func (r *SvcDiscoveryRegistryImpl) AddOption(opts ...grpc.DialOption) { r.dialOptions = append(r.dialOptions, opts...) } +// CloseConn closes a given gRPC client connection func (r *SvcDiscoveryRegistryImpl) CloseConn(conn *grpc.ClientConn) { if err := conn.Close(); err != nil { - log.Printf("Failed to close connection: %v", err) + fmt.Printf("Failed to close connection: %v\n", err) } } +// Register registers a new service endpoint with etcd func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { - r.serviceKey = fmt.Sprintf("%s/%s-%d", serviceName, host, port) - em, err := endpoints.NewManager(r.client, serviceName) + r.serviceKey = fmt.Sprintf("%s/%s/%s-%d", r.rootDirectory, serviceName, host, port) + em, err := endpoints.NewManager(r.client, r.rootDirectory+"/"+serviceName) if err != nil { return err } r.endpointMgr = em - leaseResp, err := r.client.Grant(context.Background(), 30) + leaseResp, err := r.client.Grant(context.Background(), 60) // Increase TTL time if err != nil { return err } @@ -100,10 +141,11 @@ func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int, return nil } +// keepAliveLease maintains the lease alive by sending keep-alive requests func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) { ch, err := r.client.KeepAlive(context.Background(), leaseID) if err != nil { - log.Printf("Failed to keep lease alive: %v", err) + fmt.Printf("Failed to keep lease alive: %v\n", err) return } @@ -111,12 +153,13 @@ func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) { if ka != nil { fmt.Printf("Received lease keep-alive response: %v\n", ka) } else { - fmt.Printf("Lease keep-alive response channel closed") - break + fmt.Printf("Lease keep-alive response channel closed\n") + return } } } +// UnRegister removes the service endpoint from etcd func (r *SvcDiscoveryRegistryImpl) UnRegister() error { if r.endpointMgr == nil { return fmt.Errorf("endpoint manager is not initialized") @@ -124,6 +167,7 @@ func (r *SvcDiscoveryRegistryImpl) UnRegister() error { return r.endpointMgr.DeleteEndpoint(context.TODO(), r.serviceKey) } +// Close closes the etcd client connection func (r *SvcDiscoveryRegistryImpl) Close() { if r.client != nil { _ = r.client.Close()