diff --git a/go.mod b/go.mod index 3a6ae0bd9..a87a3b6d7 100644 --- a/go.mod +++ b/go.mod @@ -40,6 +40,7 @@ require ( github.com/shirou/gopsutil v3.21.11+incompatible github.com/spf13/viper v1.18.2 github.com/stathat/consistent v1.0.0 + go.etcd.io/etcd/client/v3 v3.5.13 go.uber.org/automaxprocs v1.5.3 golang.org/x/sync v0.6.0 ) @@ -58,6 +59,8 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect github.com/clbanning/mxj v1.8.4 // indirect + github.com/coreos/go-semver v0.3.0 // indirect + github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dustin/go-humanize v1.0.1 // indirect @@ -137,6 +140,8 @@ require ( github.com/xdg-go/stringprep v1.0.4 // indirect github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect + go.etcd.io/etcd/api/v3 v3.5.13 // indirect + go.etcd.io/etcd/client/pkg/v3 v3.5.13 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect diff --git a/go.sum b/go.sum index fa72d61a2..5611a6ca6 100644 --- a/go.sum +++ b/go.sum @@ -47,6 +47,10 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ= github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM= +github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= +github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= +github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -111,6 +115,7 @@ github.com/go-zookeeper/zk v1.0.3 h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= @@ -378,6 +383,12 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.etcd.io/etcd/api/v3 v3.5.13 h1:8WXU2/NBge6AUF1K1gOexB6e07NgsN1hXK0rSTtgSp4= +go.etcd.io/etcd/api/v3 v3.5.13/go.mod h1:gBqlqkcMMZMVTMm4NDZloEVJzxQOQIls8splbqBDa0c= +go.etcd.io/etcd/client/pkg/v3 v3.5.13 h1:RVZSAnWWWiI5IrYAXjQorajncORbS0zI48LQlE2kQWg= +go.etcd.io/etcd/client/pkg/v3 v3.5.13/go.mod h1:XxHT4u1qU12E2+po+UVPrEeL94Um6zL58ppuJWXSAB8= +go.etcd.io/etcd/client/v3 v3.5.13 h1:o0fHTNJLeO0MyVbc7I3fsCf6nrOqn5d+diSarKnB2js= +go.etcd.io/etcd/client/v3 v3.5.13/go.mod h1:cqiAeY8b5DEEcpxvgWKsbLIWNM/8Wy2xJSDMtioMcoI= go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd80= go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index 38d7382fa..4aeee67bc 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -16,6 +16,7 @@ package discoveryregister import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" + getcd "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/etcd" "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/zookeeper" @@ -27,10 +28,11 @@ const ( zookeeperConst = "zookeeper" kubenetesConst = "k8s" directConst = "direct" + etcd = "etcd" ) // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. -func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper, share *config.Share) (discovery.SvcDiscoveryRegistry, error) { +func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper, share *config.Share) (discovery.SvcDiscoveryRegistry1, error) { switch share.Env { case zookeeperConst: @@ -44,6 +46,8 @@ func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper, share *config.Share ) case kubenetesConst: return kubernetes.NewK8sDiscoveryRegister(share.RpcRegisterName.MessageGateway) + case etcd: + return getcd.NewSvcDiscoveryRegistry("openim", []string{"http://localhost:2379"}) case directConst: //return direct.NewConnDirect(config) default: diff --git a/pkg/common/discoveryregister/etcd/etcd.go b/pkg/common/discoveryregister/etcd/etcd.go new file mode 100644 index 000000000..5c6bae849 --- /dev/null +++ b/pkg/common/discoveryregister/etcd/etcd.go @@ -0,0 +1,110 @@ +package etcd + +import ( + "context" + "fmt" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/naming/endpoints" + "go.etcd.io/etcd/client/v3/naming/resolver" + "google.golang.org/grpc" + gresolver "google.golang.org/grpc/resolver" + + "log" + "time" +) + +// SvcDiscoveryRegistryImpl implementation +type SvcDiscoveryRegistryImpl struct { + client *clientv3.Client + resolver gresolver.Builder + dialOptions []grpc.DialOption + serviceKey string + endpointMgr endpoints.Manager + leaseID clientv3.LeaseID + schema string +} + +func NewSvcDiscoveryRegistry(schema string, endpoints []string) (*SvcDiscoveryRegistryImpl, error) { + cfg := clientv3.Config{ + Endpoints: endpoints, + DialTimeout: 5 * time.Second, + } + client, err := clientv3.New(cfg) + if err != nil { + return nil, err + } + r, err := resolver.NewBuilder(client) + if err != nil { + return nil, err + } + return &SvcDiscoveryRegistryImpl{ + client: client, + resolver: r, + schema: schema, + }, nil +} + +func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { + target := fmt.Sprintf("%s:///%s", r.schema, serviceName) + conn, err := grpc.DialContext(ctx, target, append(r.dialOptions, opts...)...) + if err != nil { + return nil, err + } + return []*grpc.ClientConn{conn}, nil +} + +func (r *SvcDiscoveryRegistryImpl) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + target := fmt.Sprintf("%s:///%s", r.schema, serviceName) + return grpc.DialContext(ctx, target, append(r.dialOptions, opts...)...) +} + +func (r *SvcDiscoveryRegistryImpl) GetSelfConnTarget() string { + return fmt.Sprintf("%s:///%s", r.schema, r.serviceKey) +} + +func (r *SvcDiscoveryRegistryImpl) AddOption(opts ...grpc.DialOption) { + r.dialOptions = append(r.dialOptions, opts...) +} + +func (r *SvcDiscoveryRegistryImpl) CloseConn(conn *grpc.ClientConn) { + if err := conn.Close(); err != nil { + log.Printf("Failed to close connection: %v", err) + } +} + +func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { + r.serviceKey = fmt.Sprintf("%s/%s:%d", serviceName, host, port) + em, err := endpoints.NewManager(r.client, serviceName) + if err != nil { + return err + } + r.endpointMgr = em + + leaseResp, err := r.client.Grant(context.Background(), 5) + if err != nil { + return err + } + r.leaseID = leaseResp.ID + + endpoint := endpoints.Endpoint{Addr: fmt.Sprintf("%s:%d", host, port)} + err = em.AddEndpoint(context.TODO(), r.serviceKey, endpoint, clientv3.WithLease(leaseResp.ID)) + if err != nil { + return err + } + + _, err = r.client.KeepAlive(context.Background(), r.leaseID) + return err +} + +func (r *SvcDiscoveryRegistryImpl) UnRegister() error { + if r.endpointMgr == nil { + return fmt.Errorf("endpoint manager is not initialized") + } + return r.endpointMgr.DeleteEndpoint(context.TODO(), r.serviceKey) +} + +func (r *SvcDiscoveryRegistryImpl) Close() { + if r.client != nil { + _ = r.client.Close() + } +}