mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-10-26 21:22:16 +08:00
update discovery in kubernetes.
This commit is contained in:
parent
a8fad30507
commit
fbaa7792ca
2
go.mod
2
go.mod
@ -15,7 +15,7 @@ require (
|
|||||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||||
github.com/mitchellh/mapstructure v1.5.0
|
github.com/mitchellh/mapstructure v1.5.0
|
||||||
github.com/openimsdk/protocol v0.0.72-alpha.61
|
github.com/openimsdk/protocol v0.0.72-alpha.61
|
||||||
github.com/openimsdk/tools v0.0.50-alpha.46
|
github.com/openimsdk/tools v0.0.50-alpha.51
|
||||||
github.com/pkg/errors v0.9.1 // indirect
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
github.com/prometheus/client_golang v1.18.0
|
github.com/prometheus/client_golang v1.18.0
|
||||||
github.com/stretchr/testify v1.9.0
|
github.com/stretchr/testify v1.9.0
|
||||||
|
|||||||
4
go.sum
4
go.sum
@ -349,8 +349,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrk
|
|||||||
github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
||||||
github.com/openimsdk/protocol v0.0.72-alpha.61 h1:RuZR9/Sg3p6Bpb2CKPjPoA2AUmTvHITmhZ3PT/RbWMs=
|
github.com/openimsdk/protocol v0.0.72-alpha.61 h1:RuZR9/Sg3p6Bpb2CKPjPoA2AUmTvHITmhZ3PT/RbWMs=
|
||||||
github.com/openimsdk/protocol v0.0.72-alpha.61/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
|
github.com/openimsdk/protocol v0.0.72-alpha.61/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
|
||||||
github.com/openimsdk/tools v0.0.50-alpha.46 h1:j3HxPxhDptVHwr7eChL2rCH8mKfpUEcr4nHi5k4yDME=
|
github.com/openimsdk/tools v0.0.50-alpha.51 h1:M3dMUoHjggx5Ry6XSkK0FTSJmRQjjkSBpuzXiFzKtC4=
|
||||||
github.com/openimsdk/tools v0.0.50-alpha.46/go.mod h1:muCtxguNJv8lFwLei27UASu2Nvg4ERSeN0R4K5tivk0=
|
github.com/openimsdk/tools v0.0.50-alpha.51/go.mod h1:muCtxguNJv8lFwLei27UASu2Nvg4ERSeN0R4K5tivk0=
|
||||||
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
||||||
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
|
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
|
||||||
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
||||||
|
|||||||
@ -3,15 +3,16 @@ package msggateway
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
|
|
||||||
pbAuth "github.com/openimsdk/protocol/auth"
|
|
||||||
"github.com/openimsdk/tools/mcontext"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
|
||||||
|
pbAuth "github.com/openimsdk/protocol/auth"
|
||||||
|
"github.com/openimsdk/tools/mcontext"
|
||||||
|
|
||||||
"github.com/go-playground/validator/v10"
|
"github.com/go-playground/validator/v10"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
||||||
|
|||||||
@ -2,14 +2,19 @@ package push
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/openimsdk/protocol/msggateway"
|
"github.com/openimsdk/protocol/msggateway"
|
||||||
"github.com/openimsdk/protocol/sdkws"
|
"github.com/openimsdk/protocol/sdkws"
|
||||||
"github.com/openimsdk/tools/discovery"
|
"github.com/openimsdk/tools/discovery"
|
||||||
|
"github.com/openimsdk/tools/errs"
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/openimsdk/tools/utils/datautil"
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"sync"
|
|
||||||
|
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
type OnlinePusher interface {
|
type OnlinePusher interface {
|
||||||
@ -37,15 +42,16 @@ func (u emptyOnlinePusher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewOnlinePusher(disCov discovery.SvcDiscoveryRegistry, config *Config) OnlinePusher {
|
func NewOnlinePusher(disCov discovery.SvcDiscoveryRegistry, config *Config) OnlinePusher {
|
||||||
switch config.Discovery.Enable {
|
|
||||||
case "k8s":
|
if config.runTimeEnv == conf.KUBERNETES {
|
||||||
return NewK8sStaticConsistentHash(disCov, config)
|
|
||||||
case "zookeeper":
|
|
||||||
return NewDefaultAllNode(disCov, config)
|
return NewDefaultAllNode(disCov, config)
|
||||||
case "etcd":
|
}
|
||||||
|
switch config.Discovery.Enable {
|
||||||
|
case conf.ETCD:
|
||||||
return NewDefaultAllNode(disCov, config)
|
return NewDefaultAllNode(disCov, config)
|
||||||
default:
|
default:
|
||||||
return newEmptyOnlinePusher()
|
log.ZError(context.Background(), "NewOnlinePusher is error", errs.Wrap(errors.New("unsupported discovery type")), "type", config.Discovery.Enable)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -10,6 +10,7 @@ import (
|
|||||||
pbpush "github.com/openimsdk/protocol/push"
|
pbpush "github.com/openimsdk/protocol/push"
|
||||||
"github.com/openimsdk/tools/db/redisutil"
|
"github.com/openimsdk/tools/db/redisutil"
|
||||||
"github.com/openimsdk/tools/discovery"
|
"github.com/openimsdk/tools/discovery"
|
||||||
|
"github.com/openimsdk/tools/utils/runtimeenv"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -32,6 +33,8 @@ type Config struct {
|
|||||||
LocalCacheConfig config.LocalCache
|
LocalCacheConfig config.LocalCache
|
||||||
Discovery config.Discovery
|
Discovery config.Discovery
|
||||||
FcmConfigPath string
|
FcmConfigPath string
|
||||||
|
|
||||||
|
runTimeEnv string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p pushServer) PushMsg(ctx context.Context, req *pbpush.PushMsgReq) (*pbpush.PushMsgResp, error) {
|
func (p pushServer) PushMsg(ctx context.Context, req *pbpush.PushMsgReq) (*pbpush.PushMsgResp, error) {
|
||||||
@ -48,6 +51,8 @@ func (p pushServer) DelUserPushToken(ctx context.Context,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
|
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||||
|
config.runTimeEnv = runtimeenv.PrintRuntimeEnvironment()
|
||||||
|
|
||||||
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
|
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|||||||
@ -20,6 +20,7 @@ const (
|
|||||||
MountConfigFilePath = "CONFIG_PATH"
|
MountConfigFilePath = "CONFIG_PATH"
|
||||||
DeploymentType = "DEPLOYMENT_TYPE"
|
DeploymentType = "DEPLOYMENT_TYPE"
|
||||||
KUBERNETES = "kubernetes"
|
KUBERNETES = "kubernetes"
|
||||||
|
ETCD = "etcd"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@ -19,6 +19,7 @@ import (
|
|||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
"github.com/openimsdk/tools/discovery"
|
"github.com/openimsdk/tools/discovery"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
"github.com/openimsdk/tools/discovery/kubernetes"
|
"github.com/openimsdk/tools/discovery/kubernetes"
|
||||||
|
|
||||||
@ -28,14 +29,16 @@ import (
|
|||||||
|
|
||||||
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
|
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
|
||||||
func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string) (discovery.SvcDiscoveryRegistry, error) {
|
func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string) (discovery.SvcDiscoveryRegistry, error) {
|
||||||
if runtimeEnv == "kubernetes" {
|
if runtimeEnv == config.KUBERNETES {
|
||||||
discovery.Enable = "kubernetes"
|
return kubernetes.NewKubernetesConnManager(discovery.Kubernetes.Namespace,
|
||||||
|
grpc.WithDefaultCallOptions(
|
||||||
|
grpc.MaxCallSendMsgSize(1024*1024*20),
|
||||||
|
),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
switch discovery.Enable {
|
switch discovery.Enable {
|
||||||
case "kubernetes":
|
case config.ETCD:
|
||||||
return kubernetes.NewKubernetesConnManager(discovery.Kubernetes.Namespace)
|
|
||||||
case "etcd":
|
|
||||||
return etcd.NewSvcDiscoveryRegistry(
|
return etcd.NewSvcDiscoveryRegistry(
|
||||||
discovery.Etcd.RootDirectory,
|
discovery.Etcd.RootDirectory,
|
||||||
discovery.Etcd.Address,
|
discovery.Etcd.Address,
|
||||||
|
|||||||
@ -1,22 +1,10 @@
|
|||||||
// Copyright © 2023 OpenIM. All rights reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package kubernetes
|
package kubernetes
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -35,6 +23,7 @@ type KubernetesConnManager struct {
|
|||||||
namespace string
|
namespace string
|
||||||
dialOptions []grpc.DialOption
|
dialOptions []grpc.DialOption
|
||||||
|
|
||||||
|
rpcTargets map[string]string
|
||||||
selfTarget string
|
selfTarget string
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
@ -76,11 +65,14 @@ func (k *KubernetesConnManager) initializeConns(serviceName string) error {
|
|||||||
return fmt.Errorf("failed to get endpoints for service %s: %v", serviceName, err)
|
return fmt.Errorf("failed to get endpoints for service %s: %v", serviceName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// fmt.Println("Endpoints:", endpoints, "endpoints.Subsets:", endpoints.Subsets)
|
||||||
|
|
||||||
var conns []*grpc.ClientConn
|
var conns []*grpc.ClientConn
|
||||||
for _, subset := range endpoints.Subsets {
|
for _, subset := range endpoints.Subsets {
|
||||||
for _, address := range subset.Addresses {
|
for _, address := range subset.Addresses {
|
||||||
target := fmt.Sprintf("%s:%d", address.IP, port)
|
target := fmt.Sprintf("%s:%d", address.IP, port)
|
||||||
conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
// fmt.Println("IP target:", target)
|
||||||
|
conn, err := grpc.Dial(target, append(k.dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to dial endpoint %s: %v", target, err)
|
return fmt.Errorf("failed to dial endpoint %s: %v", target, err)
|
||||||
}
|
}
|
||||||
@ -89,10 +81,8 @@ func (k *KubernetesConnManager) initializeConns(serviceName string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
k.mu.Lock()
|
k.mu.Lock()
|
||||||
defer k.mu.Unlock()
|
|
||||||
k.connMap[serviceName] = conns
|
k.connMap[serviceName] = conns
|
||||||
|
k.mu.Unlock()
|
||||||
// go k.watchEndpoints(serviceName)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -100,23 +90,23 @@ func (k *KubernetesConnManager) initializeConns(serviceName string) error {
|
|||||||
// GetConns returns gRPC client connections for a given Kubernetes service name.
|
// GetConns returns gRPC client connections for a given Kubernetes service name.
|
||||||
func (k *KubernetesConnManager) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
|
func (k *KubernetesConnManager) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
|
||||||
k.mu.RLock()
|
k.mu.RLock()
|
||||||
conns, exists := k.connMap[serviceName]
|
|
||||||
defer k.mu.RUnlock()
|
|
||||||
|
|
||||||
|
conns, exists := k.connMap[serviceName]
|
||||||
|
k.mu.RUnlock()
|
||||||
if exists {
|
if exists {
|
||||||
return conns, nil
|
return conns, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
k.mu.Lock()
|
k.mu.Lock()
|
||||||
defer k.mu.Unlock()
|
|
||||||
|
|
||||||
// Check if another goroutine has already initialized the connections when we released the read lock
|
// Check if another goroutine has already initialized the connections when we released the read lock
|
||||||
conns, exists = k.connMap[serviceName]
|
conns, exists = k.connMap[serviceName]
|
||||||
if exists {
|
if exists {
|
||||||
return conns, nil
|
return conns, nil
|
||||||
}
|
}
|
||||||
|
k.mu.Unlock()
|
||||||
|
|
||||||
if err := k.initializeConns(serviceName); err != nil {
|
if err := k.initializeConns(serviceName); err != nil {
|
||||||
|
fmt.Println("Failed to initialize connections:", err)
|
||||||
return nil, fmt.Errorf("failed to initialize connections for service %s: %v", serviceName, err)
|
return nil, fmt.Errorf("failed to initialize connections for service %s: %v", serviceName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -125,26 +115,64 @@ func (k *KubernetesConnManager) GetConns(ctx context.Context, serviceName string
|
|||||||
|
|
||||||
// GetConn returns a single gRPC client connection for a given Kubernetes service name.
|
// GetConn returns a single gRPC client connection for a given Kubernetes service name.
|
||||||
func (k *KubernetesConnManager) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
func (k *KubernetesConnManager) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
||||||
port, err := k.getServicePort(serviceName)
|
var target string
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
if k.rpcTargets[serviceName] == "" {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
svcPort, err := k.getServicePort(serviceName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
target = fmt.Sprintf("%s.%s.svc.cluster.local:%d", serviceName, k.namespace, svcPort)
|
||||||
|
|
||||||
|
// fmt.Println("SVC target:", target)
|
||||||
|
} else {
|
||||||
|
target = k.rpcTargets[serviceName]
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("SVC port:", port)
|
|
||||||
|
|
||||||
target := fmt.Sprintf("%s.%s.svc.cluster.local:%d", serviceName, k.namespace, port)
|
|
||||||
|
|
||||||
fmt.Println("SVC target:", target)
|
|
||||||
|
|
||||||
return grpc.DialContext(
|
return grpc.DialContext(
|
||||||
ctx,
|
ctx,
|
||||||
target,
|
target,
|
||||||
append([]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, k.dialOptions...)...,
|
append([]grpc.DialOption{
|
||||||
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||||
|
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024*10), grpc.MaxCallSendMsgSize(1024*1024*20)),
|
||||||
|
}, k.dialOptions...)...,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetSelfConnTarget returns the connection target for the current service.
|
// GetSelfConnTarget returns the connection target for the current service.
|
||||||
func (k *KubernetesConnManager) GetSelfConnTarget() string {
|
func (k *KubernetesConnManager) GetSelfConnTarget() string {
|
||||||
|
if k.selfTarget == "" {
|
||||||
|
hostName := os.Getenv("HOSTNAME")
|
||||||
|
|
||||||
|
pod, err := k.clientset.CoreV1().Pods(k.namespace).Get(context.Background(), hostName, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("failed to get pod %s: %v \n", hostName, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for pod.Status.PodIP == "" {
|
||||||
|
pod, err = k.clientset.CoreV1().Pods(k.namespace).Get(context.TODO(), hostName, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error getting pod: %v \n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
var selfPort int32
|
||||||
|
|
||||||
|
for _, port := range pod.Spec.Containers[0].Ports {
|
||||||
|
if port.ContainerPort != 10001 {
|
||||||
|
selfPort = port.ContainerPort
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
k.selfTarget = fmt.Sprintf("%s:%d", pod.Status.PodIP, selfPort)
|
||||||
|
}
|
||||||
|
|
||||||
return k.selfTarget
|
return k.selfTarget
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -175,6 +203,7 @@ func (k *KubernetesConnManager) Close() {
|
|||||||
func (k *KubernetesConnManager) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
|
func (k *KubernetesConnManager) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *KubernetesConnManager) UnRegister() error {
|
func (k *KubernetesConnManager) UnRegister() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -184,6 +213,8 @@ func (k *KubernetesConnManager) GetUserIdHashGatewayHost(ctx context.Context, us
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (k *KubernetesConnManager) getServicePort(serviceName string) (int32, error) {
|
func (k *KubernetesConnManager) getServicePort(serviceName string) (int32, error) {
|
||||||
|
var svcPort int32
|
||||||
|
|
||||||
svc, err := k.clientset.CoreV1().Services(k.namespace).Get(context.Background(), serviceName, metav1.GetOptions{})
|
svc, err := k.clientset.CoreV1().Services(k.namespace).Get(context.Background(), serviceName, metav1.GetOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Print("namespace:", k.namespace)
|
fmt.Print("namespace:", k.namespace)
|
||||||
@ -194,7 +225,15 @@ func (k *KubernetesConnManager) getServicePort(serviceName string) (int32, error
|
|||||||
return 0, fmt.Errorf("service %s has no ports defined", serviceName)
|
return 0, fmt.Errorf("service %s has no ports defined", serviceName)
|
||||||
}
|
}
|
||||||
|
|
||||||
return svc.Spec.Ports[0].Port, nil
|
for _, port := range svc.Spec.Ports {
|
||||||
|
// fmt.Println(serviceName, " Now Get Port:", port.Port)
|
||||||
|
if port.Port != 10001 {
|
||||||
|
svcPort = port.Port
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return svcPort, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// watchEndpoints listens for changes in Pod resources.
|
// watchEndpoints listens for changes in Pod resources.
|
||||||
@ -229,68 +268,3 @@ func (k *KubernetesConnManager) handleEndpointChange(obj interface{}) {
|
|||||||
fmt.Printf("Error initializing connections for %s: %v\n", serviceName, err)
|
fmt.Printf("Error initializing connections for %s: %v\n", serviceName, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// =================
|
|
||||||
|
|
||||||
// initEndpoints initializes connections by fetching all available endpoints in the specified namespace.
|
|
||||||
|
|
||||||
// func (k *KubernetesConnManager) initEndpoints() error {
|
|
||||||
// k.mu.Lock()
|
|
||||||
// defer k.mu.Unlock()
|
|
||||||
|
|
||||||
// pods, err := k.clientset.CoreV1().Pods(k.namespace).List(context.TODO(), metav1.ListOptions{})
|
|
||||||
// if err != nil {
|
|
||||||
// return fmt.Errorf("failed to list pods: %v", err)
|
|
||||||
// }
|
|
||||||
|
|
||||||
// for _, pod := range pods.Items {
|
|
||||||
// if pod.Status.Phase == v1.PodRunning {
|
|
||||||
// target := fmt.Sprintf("%s:%d", address.IP, port)
|
|
||||||
// conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
||||||
// conn, err := k.createGRPCConnection(pod)
|
|
||||||
// if err != nil {
|
|
||||||
// return fmt.Errorf("failed to create GRPC connection for pod %s: %v", pod.Name, err)
|
|
||||||
// }
|
|
||||||
// k.connMap[pod.Name] = append(k.connMap[pod.Name], conn)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// return nil
|
|
||||||
// }
|
|
||||||
|
|
||||||
// -----
|
|
||||||
|
|
||||||
// func (k *KubernetesConnManager) watchEndpoints1(serviceName string) {
|
|
||||||
// // watch for changes to the service's endpoints
|
|
||||||
// informerFactory := informers.NewSharedInformerFactory(k.clientset, time.Minute)
|
|
||||||
// endpointsInformer := informerFactory.Core().V1().Endpoints().Informer()
|
|
||||||
|
|
||||||
// endpointsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
||||||
// AddFunc: func(obj interface{}) {
|
|
||||||
// eps := obj.(*v1.Endpoints)
|
|
||||||
// if eps.Name == serviceName {
|
|
||||||
// k.initializeConns(serviceName)
|
|
||||||
// }
|
|
||||||
// },
|
|
||||||
// UpdateFunc: func(oldObj, newObj interface{}) {
|
|
||||||
// eps := newObj.(*v1.Endpoints)
|
|
||||||
// if eps.Name == serviceName {
|
|
||||||
// k.initializeConns(serviceName)
|
|
||||||
// }
|
|
||||||
// },
|
|
||||||
// DeleteFunc: func(obj interface{}) {
|
|
||||||
// eps := obj.(*v1.Endpoints)
|
|
||||||
// if eps.Name == serviceName {
|
|
||||||
// k.mu.Lock()
|
|
||||||
// defer k.mu.Unlock()
|
|
||||||
// for _, conn := range k.connMap[serviceName] {
|
|
||||||
// _ = conn.Close()
|
|
||||||
// }
|
|
||||||
// delete(k.connMap, serviceName)
|
|
||||||
// }
|
|
||||||
// },
|
|
||||||
// })
|
|
||||||
|
|
||||||
// informerFactory.Start(wait.NeverStop)
|
|
||||||
// informerFactory.WaitForCacheSync(wait.NeverStop)
|
|
||||||
// }
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user