From 62b4a6a0ca44b5d1e3e35210bbbc8115b1adb3a3 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Fri, 10 May 2024 21:30:12 +0800 Subject: [PATCH] add etcd --- go.mod | 4 +- go.sum | 4 +- .../discoveryregister/discoveryregister.go | 38 +++--- pkg/common/discoveryregister/etcd/etcd.go | 110 ++++++++++++++++++ 4 files changed, 136 insertions(+), 20 deletions(-) create mode 100644 pkg/common/discoveryregister/etcd/etcd.go diff --git a/go.mod b/go.mod index e9777eaa8..6b6de7804 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.65 - github.com/openimsdk/tools v0.0.49-alpha.18 + github.com/openimsdk/tools v0.0.49-alpha.6 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 @@ -40,6 +40,7 @@ require ( github.com/shirou/gopsutil v3.21.11+incompatible github.com/spf13/viper v1.18.2 github.com/stathat/consistent v1.0.0 + go.etcd.io/etcd/client/v3 v3.5.13 go.uber.org/automaxprocs v1.5.3 golang.org/x/sync v0.6.0 ) @@ -141,7 +142,6 @@ require ( github.com/yusufpapurcu/wmi v1.2.4 // indirect go.etcd.io/etcd/api/v3 v3.5.13 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.13 // indirect - go.etcd.io/etcd/client/v3 v3.5.13 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect diff --git a/go.sum b/go.sum index 3acb4709c..9a1bd14aa 100644 --- a/go.sum +++ b/go.sum @@ -288,8 +288,8 @@ github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJ github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.65 h1:SPT9qyUsFRTTKSKb/FjpS+xr6sxz/Kbnu+su1bxYagc= github.com/openimsdk/protocol v0.0.65/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.49-alpha.18 h1:ARQeCiRmExvtB6XYItegThuV63JGOTxddwhSLHYXd78= -github.com/openimsdk/tools v0.0.49-alpha.18/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM= +github.com/openimsdk/tools v0.0.49-alpha.6 h1:I3HSyNWqOMTzZ1DlD0ZDKAgVzxGAFnN9EYW0xkA+8To= +github.com/openimsdk/tools v0.0.49-alpha.6/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index 559c937c1..4aeee67bc 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -16,36 +16,42 @@ package discoveryregister import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" + getcd "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/etcd" "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes" "github.com/openimsdk/tools/discovery" - "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/discovery/zookeeper" "github.com/openimsdk/tools/errs" "time" ) +const ( + zookeeperConst = "zookeeper" + kubenetesConst = "k8s" + directConst = "direct" + etcd = "etcd" +) + // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. -func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share) (discovery.SvcDiscoveryRegistry, error) { - switch discovery.Enable { - case "zookeeper": +func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper, share *config.Share) (discovery.SvcDiscoveryRegistry1, error) { + switch share.Env { + case zookeeperConst: + return zookeeper.NewZkClient( - discovery.ZooKeeper.Address, - discovery.ZooKeeper.Schema, + zookeeperConfig.Address, + zookeeperConfig.Schema, zookeeper.WithFreq(time.Hour), - zookeeper.WithUserNameAndPassword(discovery.ZooKeeper.Username, discovery.ZooKeeper.Password), + zookeeper.WithUserNameAndPassword(zookeeperConfig.Username, zookeeperConfig.Password), zookeeper.WithRoundRobin(), zookeeper.WithTimeout(10), ) - case "k8s": + case kubenetesConst: return kubernetes.NewK8sDiscoveryRegister(share.RpcRegisterName.MessageGateway) - case "etcd": - return etcd.NewSvcDiscoveryRegistry( - discovery.Etcd.RootDirectory, - discovery.Etcd.Address, - etcd.WithDialTimeout(10*time.Second), - etcd.WithMaxCallSendMsgSize(20*1024*1024), - etcd.WithUsernameAndPassword(discovery.Etcd.Username, discovery.Etcd.Password)) + case etcd: + return getcd.NewSvcDiscoveryRegistry("openim", []string{"http://localhost:2379"}) + case directConst: + //return direct.NewConnDirect(config) default: - return nil, errs.New("unsupported discovery type", "type", discovery.Enable).Wrap() + return nil, errs.New("unsupported discovery type", "type", share.Env).Wrap() } + return nil, nil } diff --git a/pkg/common/discoveryregister/etcd/etcd.go b/pkg/common/discoveryregister/etcd/etcd.go new file mode 100644 index 000000000..5c6bae849 --- /dev/null +++ b/pkg/common/discoveryregister/etcd/etcd.go @@ -0,0 +1,110 @@ +package etcd + +import ( + "context" + "fmt" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/naming/endpoints" + "go.etcd.io/etcd/client/v3/naming/resolver" + "google.golang.org/grpc" + gresolver "google.golang.org/grpc/resolver" + + "log" + "time" +) + +// SvcDiscoveryRegistryImpl implementation +type SvcDiscoveryRegistryImpl struct { + client *clientv3.Client + resolver gresolver.Builder + dialOptions []grpc.DialOption + serviceKey string + endpointMgr endpoints.Manager + leaseID clientv3.LeaseID + schema string +} + +func NewSvcDiscoveryRegistry(schema string, endpoints []string) (*SvcDiscoveryRegistryImpl, error) { + cfg := clientv3.Config{ + Endpoints: endpoints, + DialTimeout: 5 * time.Second, + } + client, err := clientv3.New(cfg) + if err != nil { + return nil, err + } + r, err := resolver.NewBuilder(client) + if err != nil { + return nil, err + } + return &SvcDiscoveryRegistryImpl{ + client: client, + resolver: r, + schema: schema, + }, nil +} + +func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { + target := fmt.Sprintf("%s:///%s", r.schema, serviceName) + conn, err := grpc.DialContext(ctx, target, append(r.dialOptions, opts...)...) + if err != nil { + return nil, err + } + return []*grpc.ClientConn{conn}, nil +} + +func (r *SvcDiscoveryRegistryImpl) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + target := fmt.Sprintf("%s:///%s", r.schema, serviceName) + return grpc.DialContext(ctx, target, append(r.dialOptions, opts...)...) +} + +func (r *SvcDiscoveryRegistryImpl) GetSelfConnTarget() string { + return fmt.Sprintf("%s:///%s", r.schema, r.serviceKey) +} + +func (r *SvcDiscoveryRegistryImpl) AddOption(opts ...grpc.DialOption) { + r.dialOptions = append(r.dialOptions, opts...) +} + +func (r *SvcDiscoveryRegistryImpl) CloseConn(conn *grpc.ClientConn) { + if err := conn.Close(); err != nil { + log.Printf("Failed to close connection: %v", err) + } +} + +func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { + r.serviceKey = fmt.Sprintf("%s/%s:%d", serviceName, host, port) + em, err := endpoints.NewManager(r.client, serviceName) + if err != nil { + return err + } + r.endpointMgr = em + + leaseResp, err := r.client.Grant(context.Background(), 5) + if err != nil { + return err + } + r.leaseID = leaseResp.ID + + endpoint := endpoints.Endpoint{Addr: fmt.Sprintf("%s:%d", host, port)} + err = em.AddEndpoint(context.TODO(), r.serviceKey, endpoint, clientv3.WithLease(leaseResp.ID)) + if err != nil { + return err + } + + _, err = r.client.KeepAlive(context.Background(), r.leaseID) + return err +} + +func (r *SvcDiscoveryRegistryImpl) UnRegister() error { + if r.endpointMgr == nil { + return fmt.Errorf("endpoint manager is not initialized") + } + return r.endpointMgr.DeleteEndpoint(context.TODO(), r.serviceKey) +} + +func (r *SvcDiscoveryRegistryImpl) Close() { + if r.client != nil { + _ = r.client.Close() + } +}