This commit is contained in:
skiffer-git 2024-05-10 21:30:12 +08:00
parent 797b13e4a4
commit 62b4a6a0ca
4 changed files with 136 additions and 20 deletions

4
go.mod
View File

@ -14,7 +14,7 @@ require (
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.65
github.com/openimsdk/tools v0.0.49-alpha.18
github.com/openimsdk/tools v0.0.49-alpha.6
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0
@ -40,6 +40,7 @@ require (
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/spf13/viper v1.18.2
github.com/stathat/consistent v1.0.0
go.etcd.io/etcd/client/v3 v3.5.13
go.uber.org/automaxprocs v1.5.3
golang.org/x/sync v0.6.0
)
@ -141,7 +142,6 @@ require (
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.etcd.io/etcd/api/v3 v3.5.13 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.13 // indirect
go.etcd.io/etcd/client/v3 v3.5.13 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect

4
go.sum
View File

@ -288,8 +288,8 @@ github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJ
github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.65 h1:SPT9qyUsFRTTKSKb/FjpS+xr6sxz/Kbnu+su1bxYagc=
github.com/openimsdk/protocol v0.0.65/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.18 h1:ARQeCiRmExvtB6XYItegThuV63JGOTxddwhSLHYXd78=
github.com/openimsdk/tools v0.0.49-alpha.18/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM=
github.com/openimsdk/tools v0.0.49-alpha.6 h1:I3HSyNWqOMTzZ1DlD0ZDKAgVzxGAFnN9EYW0xkA+8To=
github.com/openimsdk/tools v0.0.49-alpha.6/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=

View File

@ -16,36 +16,42 @@ package discoveryregister
import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
getcd "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/etcd"
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/discovery/zookeeper"
"github.com/openimsdk/tools/errs"
"time"
)
const (
zookeeperConst = "zookeeper"
kubenetesConst = "k8s"
directConst = "direct"
etcd = "etcd"
)
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share) (discovery.SvcDiscoveryRegistry, error) {
switch discovery.Enable {
case "zookeeper":
func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper, share *config.Share) (discovery.SvcDiscoveryRegistry1, error) {
switch share.Env {
case zookeeperConst:
return zookeeper.NewZkClient(
discovery.ZooKeeper.Address,
discovery.ZooKeeper.Schema,
zookeeperConfig.Address,
zookeeperConfig.Schema,
zookeeper.WithFreq(time.Hour),
zookeeper.WithUserNameAndPassword(discovery.ZooKeeper.Username, discovery.ZooKeeper.Password),
zookeeper.WithUserNameAndPassword(zookeeperConfig.Username, zookeeperConfig.Password),
zookeeper.WithRoundRobin(),
zookeeper.WithTimeout(10),
)
case "k8s":
case kubenetesConst:
return kubernetes.NewK8sDiscoveryRegister(share.RpcRegisterName.MessageGateway)
case "etcd":
return etcd.NewSvcDiscoveryRegistry(
discovery.Etcd.RootDirectory,
discovery.Etcd.Address,
etcd.WithDialTimeout(10*time.Second),
etcd.WithMaxCallSendMsgSize(20*1024*1024),
etcd.WithUsernameAndPassword(discovery.Etcd.Username, discovery.Etcd.Password))
case etcd:
return getcd.NewSvcDiscoveryRegistry("openim", []string{"http://localhost:2379"})
case directConst:
//return direct.NewConnDirect(config)
default:
return nil, errs.New("unsupported discovery type", "type", discovery.Enable).Wrap()
return nil, errs.New("unsupported discovery type", "type", share.Env).Wrap()
}
return nil, nil
}

View File

@ -0,0 +1,110 @@
package etcd
import (
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/naming/endpoints"
"go.etcd.io/etcd/client/v3/naming/resolver"
"google.golang.org/grpc"
gresolver "google.golang.org/grpc/resolver"
"log"
"time"
)
// SvcDiscoveryRegistryImpl implementation
type SvcDiscoveryRegistryImpl struct {
client *clientv3.Client
resolver gresolver.Builder
dialOptions []grpc.DialOption
serviceKey string
endpointMgr endpoints.Manager
leaseID clientv3.LeaseID
schema string
}
func NewSvcDiscoveryRegistry(schema string, endpoints []string) (*SvcDiscoveryRegistryImpl, error) {
cfg := clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
}
client, err := clientv3.New(cfg)
if err != nil {
return nil, err
}
r, err := resolver.NewBuilder(client)
if err != nil {
return nil, err
}
return &SvcDiscoveryRegistryImpl{
client: client,
resolver: r,
schema: schema,
}, nil
}
func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
target := fmt.Sprintf("%s:///%s", r.schema, serviceName)
conn, err := grpc.DialContext(ctx, target, append(r.dialOptions, opts...)...)
if err != nil {
return nil, err
}
return []*grpc.ClientConn{conn}, nil
}
func (r *SvcDiscoveryRegistryImpl) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
target := fmt.Sprintf("%s:///%s", r.schema, serviceName)
return grpc.DialContext(ctx, target, append(r.dialOptions, opts...)...)
}
func (r *SvcDiscoveryRegistryImpl) GetSelfConnTarget() string {
return fmt.Sprintf("%s:///%s", r.schema, r.serviceKey)
}
func (r *SvcDiscoveryRegistryImpl) AddOption(opts ...grpc.DialOption) {
r.dialOptions = append(r.dialOptions, opts...)
}
func (r *SvcDiscoveryRegistryImpl) CloseConn(conn *grpc.ClientConn) {
if err := conn.Close(); err != nil {
log.Printf("Failed to close connection: %v", err)
}
}
func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
r.serviceKey = fmt.Sprintf("%s/%s:%d", serviceName, host, port)
em, err := endpoints.NewManager(r.client, serviceName)
if err != nil {
return err
}
r.endpointMgr = em
leaseResp, err := r.client.Grant(context.Background(), 5)
if err != nil {
return err
}
r.leaseID = leaseResp.ID
endpoint := endpoints.Endpoint{Addr: fmt.Sprintf("%s:%d", host, port)}
err = em.AddEndpoint(context.TODO(), r.serviceKey, endpoint, clientv3.WithLease(leaseResp.ID))
if err != nil {
return err
}
_, err = r.client.KeepAlive(context.Background(), r.leaseID)
return err
}
func (r *SvcDiscoveryRegistryImpl) UnRegister() error {
if r.endpointMgr == nil {
return fmt.Errorf("endpoint manager is not initialized")
}
return r.endpointMgr.DeleteEndpoint(context.TODO(), r.serviceKey)
}
func (r *SvcDiscoveryRegistryImpl) Close() {
if r.client != nil {
_ = r.client.Close()
}
}