mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
feat: add zk and redis mongo env
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
This commit is contained in:
parent
6271a61c36
commit
d61495b047
@ -37,6 +37,7 @@ func NewMongo() (*Mongo, error) {
|
|||||||
var mongoClient *mongo.Client
|
var mongoClient *mongo.Client
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
// Retry connecting to MongoDB
|
||||||
for i := 0; i <= maxRetry; i++ {
|
for i := 0; i <= maxRetry; i++ {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), mongoConnTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), mongoConnTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
@ -1,93 +1,22 @@
|
|||||||
package discoveryregister
|
package discoveryregister
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"time"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/zookeeper"
|
||||||
|
|
||||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||||
openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
|
|
||||||
"github.com/OpenIMSDK/tools/log"
|
|
||||||
"google.golang.org/grpc"
|
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
|
||||||
func NewDiscoveryRegister(envType string) (discoveryregistry.SvcDiscoveryRegistry, error) {
|
func NewDiscoveryRegister(envType string) (discoveryregistry.SvcDiscoveryRegistry, error) {
|
||||||
var client discoveryregistry.SvcDiscoveryRegistry
|
|
||||||
var err error
|
|
||||||
switch envType {
|
switch envType {
|
||||||
case "zookeeper":
|
case "zookeeper":
|
||||||
client, err = openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
|
return zookeeper.NewZookeeperDiscoveryRegister()
|
||||||
openkeeper.WithFreq(time.Hour), openkeeper.WithUserNameAndPassword(
|
|
||||||
config.Config.Zookeeper.Username,
|
|
||||||
config.Config.Zookeeper.Password,
|
|
||||||
), openkeeper.WithRoundRobin(), openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger()))
|
|
||||||
case "k8s":
|
case "k8s":
|
||||||
client, err = NewK8sDiscoveryRegister()
|
return kubernetes.NewK8sDiscoveryRegister()
|
||||||
default:
|
default:
|
||||||
client = nil
|
return nil, errors.New("envType not correct")
|
||||||
err = errors.New("envType not correct")
|
|
||||||
}
|
}
|
||||||
return client, err
|
|
||||||
}
|
|
||||||
|
|
||||||
type K8sDR struct {
|
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
|
|
||||||
return &K8sDR{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
|
|
||||||
cli.rpcRegisterAddr = serviceName
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (cli *K8sDR) UnRegister() error {
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error {
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error {
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) {
|
|
||||||
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
|
|
||||||
|
|
||||||
conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
|
|
||||||
return []*grpc.ClientConn{conn}, err
|
|
||||||
}
|
|
||||||
func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
|
||||||
|
|
||||||
return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
|
|
||||||
}
|
|
||||||
func (cli *K8sDR) GetSelfConnTarget() string {
|
|
||||||
|
|
||||||
return cli.rpcRegisterAddr
|
|
||||||
}
|
|
||||||
func (cli *K8sDR) AddOption(opts ...grpc.DialOption) {
|
|
||||||
cli.options = append(cli.options, opts...)
|
|
||||||
}
|
|
||||||
func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) {
|
|
||||||
conn.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// do not use this method for call rpc
|
|
||||||
func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn {
|
|
||||||
fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (cli *K8sDR) Close() {
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
@ -1,407 +1,45 @@
|
|||||||
package discoveryregister
|
package discoveryregister
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"os"
|
||||||
"reflect"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||||
"google.golang.org/grpc"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func setupTestEnvironment() {
|
||||||
|
os.Setenv("ZOOKEEPER_SCHEMA", "openim")
|
||||||
|
os.Setenv("ZOOKEEPER_ADDRESS", "172.28.0.1:12181")
|
||||||
|
os.Setenv("ZOOKEEPER_USERNAME", "")
|
||||||
|
os.Setenv("ZOOKEEPER_PASSWORD", "")
|
||||||
|
}
|
||||||
|
|
||||||
func TestNewDiscoveryRegister(t *testing.T) {
|
func TestNewDiscoveryRegister(t *testing.T) {
|
||||||
type args struct {
|
setupTestEnvironment()
|
||||||
envType string
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
args args
|
|
||||||
want discoveryregistry.SvcDiscoveryRegistry
|
|
||||||
wantErr bool
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
got, err := NewDiscoveryRegister(tt.args.envType)
|
|
||||||
if (err != nil) != tt.wantErr {
|
|
||||||
t.Errorf("NewDiscoveryRegister() error = %v, wantErr %v", err, tt.wantErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(got, tt.want) {
|
|
||||||
t.Errorf("NewDiscoveryRegister() = %v, want %v", got, tt.want)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestNewK8sDiscoveryRegister(t *testing.T) {
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
envType string
|
||||||
want discoveryregistry.SvcDiscoveryRegistry
|
expectedError bool
|
||||||
wantErr bool
|
expectedResult bool
|
||||||
}{
|
}{
|
||||||
// TODO: Add test cases.
|
{"zookeeper", false, true},
|
||||||
|
{"k8s", false, true}, // 假设 k8s 配置也已正确设置
|
||||||
|
{"invalid", true, false},
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
got, err := NewK8sDiscoveryRegister()
|
|
||||||
if (err != nil) != tt.wantErr {
|
|
||||||
t.Errorf("NewK8sDiscoveryRegister() error = %v, wantErr %v", err, tt.wantErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(got, tt.want) {
|
|
||||||
t.Errorf("NewK8sDiscoveryRegister() = %v, want %v", got, tt.want)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_Register(t *testing.T) {
|
for _, test := range tests {
|
||||||
type fields struct {
|
client, err := NewDiscoveryRegister(test.envType)
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
type args struct {
|
|
||||||
serviceName string
|
|
||||||
host string
|
|
||||||
port int
|
|
||||||
opts []grpc.DialOption
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
fields fields
|
|
||||||
args args
|
|
||||||
wantErr bool
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
|
||||||
if err := cli.Register(tt.args.serviceName, tt.args.host, tt.args.port, tt.args.opts...); (err != nil) != tt.wantErr {
|
|
||||||
t.Errorf("K8sDR.Register() error = %v, wantErr %v", err, tt.wantErr)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_UnRegister(t *testing.T) {
|
if test.expectedError {
|
||||||
type fields struct {
|
assert.Error(t, err)
|
||||||
options []grpc.DialOption
|
} else {
|
||||||
rpcRegisterAddr string
|
assert.NoError(t, err)
|
||||||
}
|
if test.expectedResult {
|
||||||
tests := []struct {
|
assert.Implements(t, (*discoveryregistry.SvcDiscoveryRegistry)(nil), client)
|
||||||
name string
|
} else {
|
||||||
fields fields
|
assert.Nil(t, client)
|
||||||
wantErr bool
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
}
|
||||||
if err := cli.UnRegister(); (err != nil) != tt.wantErr {
|
}
|
||||||
t.Errorf("K8sDR.UnRegister() error = %v, wantErr %v", err, tt.wantErr)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_CreateRpcRootNodes(t *testing.T) {
|
|
||||||
type fields struct {
|
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
type args struct {
|
|
||||||
serviceNames []string
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
fields fields
|
|
||||||
args args
|
|
||||||
wantErr bool
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
|
||||||
if err := cli.CreateRpcRootNodes(tt.args.serviceNames); (err != nil) != tt.wantErr {
|
|
||||||
t.Errorf("K8sDR.CreateRpcRootNodes() error = %v, wantErr %v", err, tt.wantErr)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_RegisterConf2Registry(t *testing.T) {
|
|
||||||
type fields struct {
|
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
type args struct {
|
|
||||||
key string
|
|
||||||
conf []byte
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
fields fields
|
|
||||||
args args
|
|
||||||
wantErr bool
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
|
||||||
if err := cli.RegisterConf2Registry(tt.args.key, tt.args.conf); (err != nil) != tt.wantErr {
|
|
||||||
t.Errorf("K8sDR.RegisterConf2Registry() error = %v, wantErr %v", err, tt.wantErr)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_GetConfFromRegistry(t *testing.T) {
|
|
||||||
type fields struct {
|
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
type args struct {
|
|
||||||
key string
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
fields fields
|
|
||||||
args args
|
|
||||||
want []byte
|
|
||||||
wantErr bool
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
|
||||||
got, err := cli.GetConfFromRegistry(tt.args.key)
|
|
||||||
if (err != nil) != tt.wantErr {
|
|
||||||
t.Errorf("K8sDR.GetConfFromRegistry() error = %v, wantErr %v", err, tt.wantErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(got, tt.want) {
|
|
||||||
t.Errorf("K8sDR.GetConfFromRegistry() = %v, want %v", got, tt.want)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_GetConns(t *testing.T) {
|
|
||||||
type fields struct {
|
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
type args struct {
|
|
||||||
ctx context.Context
|
|
||||||
serviceName string
|
|
||||||
opts []grpc.DialOption
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
fields fields
|
|
||||||
args args
|
|
||||||
want []*grpc.ClientConn
|
|
||||||
wantErr bool
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
|
||||||
got, err := cli.GetConns(tt.args.ctx, tt.args.serviceName, tt.args.opts...)
|
|
||||||
if (err != nil) != tt.wantErr {
|
|
||||||
t.Errorf("K8sDR.GetConns() error = %v, wantErr %v", err, tt.wantErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(got, tt.want) {
|
|
||||||
t.Errorf("K8sDR.GetConns() = %v, want %v", got, tt.want)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_GetConn(t *testing.T) {
|
|
||||||
type fields struct {
|
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
type args struct {
|
|
||||||
ctx context.Context
|
|
||||||
serviceName string
|
|
||||||
opts []grpc.DialOption
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
fields fields
|
|
||||||
args args
|
|
||||||
want *grpc.ClientConn
|
|
||||||
wantErr bool
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
|
||||||
got, err := cli.GetConn(tt.args.ctx, tt.args.serviceName, tt.args.opts...)
|
|
||||||
if (err != nil) != tt.wantErr {
|
|
||||||
t.Errorf("K8sDR.GetConn() error = %v, wantErr %v", err, tt.wantErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(got, tt.want) {
|
|
||||||
t.Errorf("K8sDR.GetConn() = %v, want %v", got, tt.want)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_GetSelfConnTarget(t *testing.T) {
|
|
||||||
type fields struct {
|
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
fields fields
|
|
||||||
want string
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
|
||||||
if got := cli.GetSelfConnTarget(); got != tt.want {
|
|
||||||
t.Errorf("K8sDR.GetSelfConnTarget() = %v, want %v", got, tt.want)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_AddOption(t *testing.T) {
|
|
||||||
type fields struct {
|
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
type args struct {
|
|
||||||
opts []grpc.DialOption
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
fields fields
|
|
||||||
args args
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
|
||||||
cli.AddOption(tt.args.opts...)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_CloseConn(t *testing.T) {
|
|
||||||
type fields struct {
|
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
type args struct {
|
|
||||||
conn *grpc.ClientConn
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
fields fields
|
|
||||||
args args
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
|
||||||
cli.CloseConn(tt.args.conn)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_GetClientLocalConns(t *testing.T) {
|
|
||||||
type fields struct {
|
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
fields fields
|
|
||||||
want map[string][]*grpc.ClientConn
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
|
||||||
if got := cli.GetClientLocalConns(); !reflect.DeepEqual(got, tt.want) {
|
|
||||||
t.Errorf("K8sDR.GetClientLocalConns() = %v, want %v", got, tt.want)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestK8sDR_Close(t *testing.T) {
|
|
||||||
type fields struct {
|
|
||||||
options []grpc.DialOption
|
|
||||||
rpcRegisterAddr string
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
fields fields
|
|
||||||
}{
|
|
||||||
// TODO: Add test cases.
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
cli := &K8sDR{
|
|
||||||
options: tt.fields.options,
|
|
||||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
|
||||||
}
|
|
||||||
cli.Close()
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
90
pkg/common/discoveryregister/kubernetes/kubernetes.go
Normal file
90
pkg/common/discoveryregister/kubernetes/kubernetes.go
Normal file
@ -0,0 +1,90 @@
|
|||||||
|
package kubernetes
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
|
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||||
|
)
|
||||||
|
|
||||||
|
// K8sDR represents the Kubernetes service discovery and registration client.
|
||||||
|
type K8sDR struct {
|
||||||
|
options []grpc.DialOption
|
||||||
|
rpcRegisterAddr string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewK8sDiscoveryRegister creates a new instance of K8sDR for Kubernetes service discovery and registration.
|
||||||
|
func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
|
||||||
|
|
||||||
|
return &K8sDR{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register registers a service with Kubernetes.
|
||||||
|
func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
|
||||||
|
cli.rpcRegisterAddr = serviceName
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnRegister removes a service registration from Kubernetes.
|
||||||
|
func (cli *K8sDR) UnRegister() error {
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateRpcRootNodes creates root nodes for RPC in Kubernetes.
|
||||||
|
func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error {
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterConf2Registry registers a configuration to the registry.
|
||||||
|
func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error {
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetConfFromRegistry retrieves a configuration from the registry.
|
||||||
|
func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetConns returns a list of gRPC client connections for a given service.
|
||||||
|
func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
|
||||||
|
conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
|
||||||
|
return []*grpc.ClientConn{conn}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetConn returns a single gRPC client connection for a given service.
|
||||||
|
func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
||||||
|
return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetSelfConnTarget returns the connection target of the client itself.
|
||||||
|
func (cli *K8sDR) GetSelfConnTarget() string {
|
||||||
|
return cli.rpcRegisterAddr
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddOption adds gRPC dial options to the client.
|
||||||
|
func (cli *K8sDR) AddOption(opts ...grpc.DialOption) {
|
||||||
|
cli.options = append(cli.options, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// CloseConn closes a given gRPC client connection.
|
||||||
|
func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) {
|
||||||
|
conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// do not use this method for call rpc
|
||||||
|
func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn {
|
||||||
|
fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the K8sDR client.
|
||||||
|
func (cli *K8sDR) Close() {
|
||||||
|
// Close any open resources here (if applicable)
|
||||||
|
return
|
||||||
|
}
|
46
pkg/common/discoveryregister/zookeeper/zookeeper.go
Normal file
46
pkg/common/discoveryregister/zookeeper/zookeeper.go
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
package zookeeper
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||||
|
openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
|
||||||
|
"github.com/OpenIMSDK/tools/log"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewZookeeperDiscoveryRegister creates a new instance of ZookeeperDR for Zookeeper service discovery and registration.
|
||||||
|
func NewZookeeperDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
|
||||||
|
schema := getEnv("ZOOKEEPER_SCHEMA", config.Config.Zookeeper.Schema)
|
||||||
|
zkAddr := getZkAddrFromEnv(config.Config.Zookeeper.ZkAddr)
|
||||||
|
username := getEnv("ZOOKEEPER_USERNAME", config.Config.Zookeeper.Username)
|
||||||
|
password := getEnv("ZOOKEEPER_PASSWORD", config.Config.Zookeeper.Password)
|
||||||
|
|
||||||
|
return openkeeper.NewClient(
|
||||||
|
zkAddr,
|
||||||
|
schema,
|
||||||
|
openkeeper.WithFreq(time.Hour),
|
||||||
|
openkeeper.WithUserNameAndPassword(username, password),
|
||||||
|
openkeeper.WithRoundRobin(),
|
||||||
|
openkeeper.WithTimeout(10),
|
||||||
|
openkeeper.WithLogger(log.NewZkLogger()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value.
|
||||||
|
func getEnv(key, fallback string) string {
|
||||||
|
if value, exists := os.LookupEnv(key); exists {
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
return fallback
|
||||||
|
}
|
||||||
|
|
||||||
|
// getZkAddrFromEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value.
|
||||||
|
func getZkAddrFromEnv(fallback []string) []string {
|
||||||
|
if value, exists := os.LookupEnv("ZOOKEEPER_ADDRESS"); exists {
|
||||||
|
return strings.Split(value, ",")
|
||||||
|
}
|
||||||
|
return fallback
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user