From d61495b047a5e649c99e9845f8ab06577462e715 Mon Sep 17 00:00:00 2001 From: "Xinwei Xiong (cubxxw)" <3293172751nss@gmail.com> Date: Wed, 13 Dec 2023 20:14:45 +0800 Subject: [PATCH] feat: add zk and redis mongo env Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com> --- pkg/common/db/unrelation/mongo.go | 1 + .../discoveryregister/discoveryregister.go | 85 +--- .../discoveryregister_test.go | 416 ++---------------- .../kubernetes/kubernetes.go | 90 ++++ .../discoveryregister/zookeeper/zookeeper.go | 46 ++ 5 files changed, 171 insertions(+), 467 deletions(-) create mode 100644 pkg/common/discoveryregister/kubernetes/kubernetes.go create mode 100644 pkg/common/discoveryregister/zookeeper/zookeeper.go diff --git a/pkg/common/db/unrelation/mongo.go b/pkg/common/db/unrelation/mongo.go index 228e09b63..893ff7400 100644 --- a/pkg/common/db/unrelation/mongo.go +++ b/pkg/common/db/unrelation/mongo.go @@ -37,6 +37,7 @@ func NewMongo() (*Mongo, error) { var mongoClient *mongo.Client var err error + // Retry connecting to MongoDB for i := 0; i <= maxRetry; i++ { ctx, cancel := context.WithTimeout(context.Background(), mongoConnTimeout) defer cancel() diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index c204184ff..ee6374a7c 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -1,93 +1,22 @@ package discoveryregister import ( - "context" "errors" - "fmt" - "time" + + "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes" + "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/zookeeper" "github.com/OpenIMSDK/tools/discoveryregistry" - openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper" - "github.com/OpenIMSDK/tools/log" - "google.golang.org/grpc" - - "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) +// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. func NewDiscoveryRegister(envType string) (discoveryregistry.SvcDiscoveryRegistry, error) { - var client discoveryregistry.SvcDiscoveryRegistry - var err error switch envType { case "zookeeper": - client, err = openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, - openkeeper.WithFreq(time.Hour), openkeeper.WithUserNameAndPassword( - config.Config.Zookeeper.Username, - config.Config.Zookeeper.Password, - ), openkeeper.WithRoundRobin(), openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger())) + return zookeeper.NewZookeeperDiscoveryRegister() case "k8s": - client, err = NewK8sDiscoveryRegister() + return kubernetes.NewK8sDiscoveryRegister() default: - client = nil - err = errors.New("envType not correct") + return nil, errors.New("envType not correct") } - return client, err -} - -type K8sDR struct { - options []grpc.DialOption - rpcRegisterAddr string -} - -func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) { - return &K8sDR{}, nil -} - -func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { - cli.rpcRegisterAddr = serviceName - return nil -} -func (cli *K8sDR) UnRegister() error { - - return nil -} -func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error { - - return nil -} -func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error { - - return nil -} - -func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) { - - return nil, nil -} -func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { - - conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) - return []*grpc.ClientConn{conn}, err -} -func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { - - return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) -} -func (cli *K8sDR) GetSelfConnTarget() string { - - return cli.rpcRegisterAddr -} -func (cli *K8sDR) AddOption(opts ...grpc.DialOption) { - cli.options = append(cli.options, opts...) -} -func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) { - conn.Close() -} - -// do not use this method for call rpc -func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn { - fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!") - return nil -} -func (cli *K8sDR) Close() { - return } diff --git a/pkg/common/discoveryregister/discoveryregister_test.go b/pkg/common/discoveryregister/discoveryregister_test.go index 8426598f9..c1b10df58 100644 --- a/pkg/common/discoveryregister/discoveryregister_test.go +++ b/pkg/common/discoveryregister/discoveryregister_test.go @@ -1,407 +1,45 @@ package discoveryregister import ( - "context" - "reflect" + "os" "testing" "github.com/OpenIMSDK/tools/discoveryregistry" - "google.golang.org/grpc" + "github.com/stretchr/testify/assert" ) +func setupTestEnvironment() { + os.Setenv("ZOOKEEPER_SCHEMA", "openim") + os.Setenv("ZOOKEEPER_ADDRESS", "172.28.0.1:12181") + os.Setenv("ZOOKEEPER_USERNAME", "") + os.Setenv("ZOOKEEPER_PASSWORD", "") +} + func TestNewDiscoveryRegister(t *testing.T) { - type args struct { - envType string - } - tests := []struct { - name string - args args - want discoveryregistry.SvcDiscoveryRegistry - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := NewDiscoveryRegister(tt.args.envType) - if (err != nil) != tt.wantErr { - t.Errorf("NewDiscoveryRegister() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("NewDiscoveryRegister() = %v, want %v", got, tt.want) - } - }) - } -} + setupTestEnvironment() -func TestNewK8sDiscoveryRegister(t *testing.T) { tests := []struct { - name string - want discoveryregistry.SvcDiscoveryRegistry - wantErr bool + envType string + expectedError bool + expectedResult bool }{ - // TODO: Add test cases. + {"zookeeper", false, true}, + {"k8s", false, true}, // 假设 k8s 配置也已正确设置 + {"invalid", true, false}, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := NewK8sDiscoveryRegister() - if (err != nil) != tt.wantErr { - t.Errorf("NewK8sDiscoveryRegister() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("NewK8sDiscoveryRegister() = %v, want %v", got, tt.want) - } - }) - } -} -func TestK8sDR_Register(t *testing.T) { - type fields struct { - options []grpc.DialOption - rpcRegisterAddr string - } - type args struct { - serviceName string - host string - port int - opts []grpc.DialOption - } - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cli := &K8sDR{ - options: tt.fields.options, - rpcRegisterAddr: tt.fields.rpcRegisterAddr, - } - if err := cli.Register(tt.args.serviceName, tt.args.host, tt.args.port, tt.args.opts...); (err != nil) != tt.wantErr { - t.Errorf("K8sDR.Register() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} + for _, test := range tests { + client, err := NewDiscoveryRegister(test.envType) -func TestK8sDR_UnRegister(t *testing.T) { - type fields struct { - options []grpc.DialOption - rpcRegisterAddr string - } - tests := []struct { - name string - fields fields - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cli := &K8sDR{ - options: tt.fields.options, - rpcRegisterAddr: tt.fields.rpcRegisterAddr, + if test.expectedError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + if test.expectedResult { + assert.Implements(t, (*discoveryregistry.SvcDiscoveryRegistry)(nil), client) + } else { + assert.Nil(t, client) } - if err := cli.UnRegister(); (err != nil) != tt.wantErr { - t.Errorf("K8sDR.UnRegister() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} - -func TestK8sDR_CreateRpcRootNodes(t *testing.T) { - type fields struct { - options []grpc.DialOption - rpcRegisterAddr string - } - type args struct { - serviceNames []string - } - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cli := &K8sDR{ - options: tt.fields.options, - rpcRegisterAddr: tt.fields.rpcRegisterAddr, - } - if err := cli.CreateRpcRootNodes(tt.args.serviceNames); (err != nil) != tt.wantErr { - t.Errorf("K8sDR.CreateRpcRootNodes() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} - -func TestK8sDR_RegisterConf2Registry(t *testing.T) { - type fields struct { - options []grpc.DialOption - rpcRegisterAddr string - } - type args struct { - key string - conf []byte - } - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cli := &K8sDR{ - options: tt.fields.options, - rpcRegisterAddr: tt.fields.rpcRegisterAddr, - } - if err := cli.RegisterConf2Registry(tt.args.key, tt.args.conf); (err != nil) != tt.wantErr { - t.Errorf("K8sDR.RegisterConf2Registry() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} - -func TestK8sDR_GetConfFromRegistry(t *testing.T) { - type fields struct { - options []grpc.DialOption - rpcRegisterAddr string - } - type args struct { - key string - } - tests := []struct { - name string - fields fields - args args - want []byte - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cli := &K8sDR{ - options: tt.fields.options, - rpcRegisterAddr: tt.fields.rpcRegisterAddr, - } - got, err := cli.GetConfFromRegistry(tt.args.key) - if (err != nil) != tt.wantErr { - t.Errorf("K8sDR.GetConfFromRegistry() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("K8sDR.GetConfFromRegistry() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestK8sDR_GetConns(t *testing.T) { - type fields struct { - options []grpc.DialOption - rpcRegisterAddr string - } - type args struct { - ctx context.Context - serviceName string - opts []grpc.DialOption - } - tests := []struct { - name string - fields fields - args args - want []*grpc.ClientConn - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cli := &K8sDR{ - options: tt.fields.options, - rpcRegisterAddr: tt.fields.rpcRegisterAddr, - } - got, err := cli.GetConns(tt.args.ctx, tt.args.serviceName, tt.args.opts...) - if (err != nil) != tt.wantErr { - t.Errorf("K8sDR.GetConns() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("K8sDR.GetConns() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestK8sDR_GetConn(t *testing.T) { - type fields struct { - options []grpc.DialOption - rpcRegisterAddr string - } - type args struct { - ctx context.Context - serviceName string - opts []grpc.DialOption - } - tests := []struct { - name string - fields fields - args args - want *grpc.ClientConn - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cli := &K8sDR{ - options: tt.fields.options, - rpcRegisterAddr: tt.fields.rpcRegisterAddr, - } - got, err := cli.GetConn(tt.args.ctx, tt.args.serviceName, tt.args.opts...) - if (err != nil) != tt.wantErr { - t.Errorf("K8sDR.GetConn() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("K8sDR.GetConn() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestK8sDR_GetSelfConnTarget(t *testing.T) { - type fields struct { - options []grpc.DialOption - rpcRegisterAddr string - } - tests := []struct { - name string - fields fields - want string - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cli := &K8sDR{ - options: tt.fields.options, - rpcRegisterAddr: tt.fields.rpcRegisterAddr, - } - if got := cli.GetSelfConnTarget(); got != tt.want { - t.Errorf("K8sDR.GetSelfConnTarget() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestK8sDR_AddOption(t *testing.T) { - type fields struct { - options []grpc.DialOption - rpcRegisterAddr string - } - type args struct { - opts []grpc.DialOption - } - tests := []struct { - name string - fields fields - args args - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cli := &K8sDR{ - options: tt.fields.options, - rpcRegisterAddr: tt.fields.rpcRegisterAddr, - } - cli.AddOption(tt.args.opts...) - }) - } -} - -func TestK8sDR_CloseConn(t *testing.T) { - type fields struct { - options []grpc.DialOption - rpcRegisterAddr string - } - type args struct { - conn *grpc.ClientConn - } - tests := []struct { - name string - fields fields - args args - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cli := &K8sDR{ - options: tt.fields.options, - rpcRegisterAddr: tt.fields.rpcRegisterAddr, - } - cli.CloseConn(tt.args.conn) - }) - } -} - -func TestK8sDR_GetClientLocalConns(t *testing.T) { - type fields struct { - options []grpc.DialOption - rpcRegisterAddr string - } - tests := []struct { - name string - fields fields - want map[string][]*grpc.ClientConn - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cli := &K8sDR{ - options: tt.fields.options, - rpcRegisterAddr: tt.fields.rpcRegisterAddr, - } - if got := cli.GetClientLocalConns(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("K8sDR.GetClientLocalConns() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestK8sDR_Close(t *testing.T) { - type fields struct { - options []grpc.DialOption - rpcRegisterAddr string - } - tests := []struct { - name string - fields fields - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cli := &K8sDR{ - options: tt.fields.options, - rpcRegisterAddr: tt.fields.rpcRegisterAddr, - } - cli.Close() - }) + } } } diff --git a/pkg/common/discoveryregister/kubernetes/kubernetes.go b/pkg/common/discoveryregister/kubernetes/kubernetes.go new file mode 100644 index 000000000..31c61c49f --- /dev/null +++ b/pkg/common/discoveryregister/kubernetes/kubernetes.go @@ -0,0 +1,90 @@ +package kubernetes + +import ( + "context" + "fmt" + + "google.golang.org/grpc" + + "github.com/OpenIMSDK/tools/discoveryregistry" +) + +// K8sDR represents the Kubernetes service discovery and registration client. +type K8sDR struct { + options []grpc.DialOption + rpcRegisterAddr string +} + +// NewK8sDiscoveryRegister creates a new instance of K8sDR for Kubernetes service discovery and registration. +func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) { + + return &K8sDR{}, nil +} + +// Register registers a service with Kubernetes. +func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { + cli.rpcRegisterAddr = serviceName + + return nil +} + +// UnRegister removes a service registration from Kubernetes. +func (cli *K8sDR) UnRegister() error { + + return nil +} + +// CreateRpcRootNodes creates root nodes for RPC in Kubernetes. +func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error { + + return nil +} + +// RegisterConf2Registry registers a configuration to the registry. +func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error { + + return nil +} + +// GetConfFromRegistry retrieves a configuration from the registry. +func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) { + return nil, nil +} + +// GetConns returns a list of gRPC client connections for a given service. +func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { + conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) + return []*grpc.ClientConn{conn}, err +} + +// GetConn returns a single gRPC client connection for a given service. +func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) +} + +// GetSelfConnTarget returns the connection target of the client itself. +func (cli *K8sDR) GetSelfConnTarget() string { + return cli.rpcRegisterAddr +} + +// AddOption adds gRPC dial options to the client. +func (cli *K8sDR) AddOption(opts ...grpc.DialOption) { + cli.options = append(cli.options, opts...) +} + +// CloseConn closes a given gRPC client connection. +func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) { + conn.Close() +} + +// do not use this method for call rpc +func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn { + fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!") + return nil +} + +// Close closes the K8sDR client. +func (cli *K8sDR) Close() { + // Close any open resources here (if applicable) + return +} diff --git a/pkg/common/discoveryregister/zookeeper/zookeeper.go b/pkg/common/discoveryregister/zookeeper/zookeeper.go new file mode 100644 index 000000000..c7549fc58 --- /dev/null +++ b/pkg/common/discoveryregister/zookeeper/zookeeper.go @@ -0,0 +1,46 @@ +package zookeeper + +import ( + "os" + "strings" + "time" + + "github.com/OpenIMSDK/tools/discoveryregistry" + openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper" + "github.com/OpenIMSDK/tools/log" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" +) + +// NewZookeeperDiscoveryRegister creates a new instance of ZookeeperDR for Zookeeper service discovery and registration. +func NewZookeeperDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) { + schema := getEnv("ZOOKEEPER_SCHEMA", config.Config.Zookeeper.Schema) + zkAddr := getZkAddrFromEnv(config.Config.Zookeeper.ZkAddr) + username := getEnv("ZOOKEEPER_USERNAME", config.Config.Zookeeper.Username) + password := getEnv("ZOOKEEPER_PASSWORD", config.Config.Zookeeper.Password) + + return openkeeper.NewClient( + zkAddr, + schema, + openkeeper.WithFreq(time.Hour), + openkeeper.WithUserNameAndPassword(username, password), + openkeeper.WithRoundRobin(), + openkeeper.WithTimeout(10), + openkeeper.WithLogger(log.NewZkLogger()), + ) +} + +// getEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value. +func getEnv(key, fallback string) string { + if value, exists := os.LookupEnv(key); exists { + return value + } + return fallback +} + +// getZkAddrFromEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value. +func getZkAddrFromEnv(fallback []string) []string { + if value, exists := os.LookupEnv("ZOOKEEPER_ADDRESS"); exists { + return strings.Split(value, ",") + } + return fallback +}