mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 11:22:10 +08:00 
			
		
		
		
	* chore: config * chore: config * chore: config * chore: config * chore: config * feat: config * fix: config * fix: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config
		
			
				
	
	
		
			271 lines
		
	
	
		
			7.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			271 lines
		
	
	
		
			7.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package kubernetes
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"log"
 | 
						|
	"os"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"google.golang.org/grpc"
 | 
						|
	"google.golang.org/grpc/credentials/insecure"
 | 
						|
	v1 "k8s.io/api/core/v1"
 | 
						|
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						|
	"k8s.io/client-go/informers"
 | 
						|
	"k8s.io/client-go/kubernetes"
 | 
						|
	"k8s.io/client-go/rest"
 | 
						|
	"k8s.io/client-go/tools/cache"
 | 
						|
)
 | 
						|
 | 
						|
type KubernetesConnManager struct {
 | 
						|
	clientset   *kubernetes.Clientset
 | 
						|
	namespace   string
 | 
						|
	dialOptions []grpc.DialOption
 | 
						|
 | 
						|
	rpcTargets map[string]string
 | 
						|
	selfTarget string
 | 
						|
 | 
						|
	mu      sync.RWMutex
 | 
						|
	connMap map[string][]*grpc.ClientConn
 | 
						|
}
 | 
						|
 | 
						|
// NewKubernetesConnManager creates a new connection manager that uses Kubernetes services for service discovery.
 | 
						|
func NewKubernetesConnManager(namespace string, options ...grpc.DialOption) (*KubernetesConnManager, error) {
 | 
						|
	config, err := rest.InClusterConfig()
 | 
						|
	if err != nil {
 | 
						|
		return nil, fmt.Errorf("failed to create in-cluster config: %v", err)
 | 
						|
	}
 | 
						|
 | 
						|
	clientset, err := kubernetes.NewForConfig(config)
 | 
						|
	if err != nil {
 | 
						|
		return nil, fmt.Errorf("failed to create clientset: %v", err)
 | 
						|
	}
 | 
						|
 | 
						|
	k := &KubernetesConnManager{
 | 
						|
		clientset:   clientset,
 | 
						|
		namespace:   namespace,
 | 
						|
		dialOptions: options,
 | 
						|
		connMap:     make(map[string][]*grpc.ClientConn),
 | 
						|
	}
 | 
						|
 | 
						|
	go k.watchEndpoints()
 | 
						|
 | 
						|
	return k, nil
 | 
						|
}
 | 
						|
 | 
						|
func (k *KubernetesConnManager) initializeConns(serviceName string) error {
 | 
						|
	port, err := k.getServicePort(serviceName)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	endpoints, err := k.clientset.CoreV1().Endpoints(k.namespace).Get(context.Background(), serviceName, metav1.GetOptions{})
 | 
						|
	if err != nil {
 | 
						|
		return fmt.Errorf("failed to get endpoints for service %s: %v", serviceName, err)
 | 
						|
	}
 | 
						|
 | 
						|
	// fmt.Println("Endpoints:", endpoints, "endpoints.Subsets:", endpoints.Subsets)
 | 
						|
 | 
						|
	var conns []*grpc.ClientConn
 | 
						|
	for _, subset := range endpoints.Subsets {
 | 
						|
		for _, address := range subset.Addresses {
 | 
						|
			target := fmt.Sprintf("%s:%d", address.IP, port)
 | 
						|
			// fmt.Println("IP target:", target)
 | 
						|
			conn, err := grpc.Dial(target, append(k.dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
 | 
						|
			if err != nil {
 | 
						|
				return fmt.Errorf("failed to dial endpoint %s: %v", target, err)
 | 
						|
			}
 | 
						|
			conns = append(conns, conn)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	k.mu.Lock()
 | 
						|
	k.connMap[serviceName] = conns
 | 
						|
	k.mu.Unlock()
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// GetConns returns gRPC client connections for a given Kubernetes service name.
 | 
						|
func (k *KubernetesConnManager) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
 | 
						|
	k.mu.RLock()
 | 
						|
 | 
						|
	conns, exists := k.connMap[serviceName]
 | 
						|
	k.mu.RUnlock()
 | 
						|
	if exists {
 | 
						|
		return conns, nil
 | 
						|
	}
 | 
						|
 | 
						|
	k.mu.Lock()
 | 
						|
	// Check if another goroutine has already initialized the connections when we released the read lock
 | 
						|
	conns, exists = k.connMap[serviceName]
 | 
						|
	if exists {
 | 
						|
		return conns, nil
 | 
						|
	}
 | 
						|
	k.mu.Unlock()
 | 
						|
 | 
						|
	if err := k.initializeConns(serviceName); err != nil {
 | 
						|
		fmt.Println("Failed to initialize connections:", err)
 | 
						|
		return nil, fmt.Errorf("failed to initialize connections for service %s: %v", serviceName, err)
 | 
						|
	}
 | 
						|
 | 
						|
	return k.connMap[serviceName], nil
 | 
						|
}
 | 
						|
 | 
						|
// GetConn returns a single gRPC client connection for a given Kubernetes service name.
 | 
						|
func (k *KubernetesConnManager) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
 | 
						|
	var target string
 | 
						|
 | 
						|
	if k.rpcTargets[serviceName] == "" {
 | 
						|
		var err error
 | 
						|
 | 
						|
		svcPort, err := k.getServicePort(serviceName)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
 | 
						|
		target = fmt.Sprintf("%s.%s.svc.cluster.local:%d", serviceName, k.namespace, svcPort)
 | 
						|
 | 
						|
		// fmt.Println("SVC target:", target)
 | 
						|
	} else {
 | 
						|
		target = k.rpcTargets[serviceName]
 | 
						|
	}
 | 
						|
 | 
						|
	return grpc.DialContext(
 | 
						|
		ctx,
 | 
						|
		target,
 | 
						|
		append([]grpc.DialOption{
 | 
						|
			grpc.WithTransportCredentials(insecure.NewCredentials()),
 | 
						|
			grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024*10), grpc.MaxCallSendMsgSize(1024*1024*20)),
 | 
						|
		}, k.dialOptions...)...,
 | 
						|
	)
 | 
						|
}
 | 
						|
 | 
						|
// GetSelfConnTarget returns the connection target for the current service.
 | 
						|
func (k *KubernetesConnManager) GetSelfConnTarget() string {
 | 
						|
	if k.selfTarget == "" {
 | 
						|
		hostName := os.Getenv("HOSTNAME")
 | 
						|
 | 
						|
		pod, err := k.clientset.CoreV1().Pods(k.namespace).Get(context.Background(), hostName, metav1.GetOptions{})
 | 
						|
		if err != nil {
 | 
						|
			log.Printf("failed to get pod %s: %v \n", hostName, err)
 | 
						|
		}
 | 
						|
 | 
						|
		for pod.Status.PodIP == "" {
 | 
						|
			pod, err = k.clientset.CoreV1().Pods(k.namespace).Get(context.TODO(), hostName, metav1.GetOptions{})
 | 
						|
			if err != nil {
 | 
						|
				log.Printf("Error getting pod: %v \n", err)
 | 
						|
			}
 | 
						|
 | 
						|
			time.Sleep(3 * time.Second)
 | 
						|
		}
 | 
						|
 | 
						|
		var selfPort int32
 | 
						|
 | 
						|
		for _, port := range pod.Spec.Containers[0].Ports {
 | 
						|
			if port.ContainerPort != 10001 {
 | 
						|
				selfPort = port.ContainerPort
 | 
						|
				break
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		k.selfTarget = fmt.Sprintf("%s:%d", pod.Status.PodIP, selfPort)
 | 
						|
	}
 | 
						|
 | 
						|
	return k.selfTarget
 | 
						|
}
 | 
						|
 | 
						|
// AddOption appends gRPC dial options to the existing options.
 | 
						|
func (k *KubernetesConnManager) AddOption(opts ...grpc.DialOption) {
 | 
						|
	k.mu.Lock()
 | 
						|
	defer k.mu.Unlock()
 | 
						|
	k.dialOptions = append(k.dialOptions, opts...)
 | 
						|
}
 | 
						|
 | 
						|
// CloseConn closes a given gRPC client connection.
 | 
						|
func (k *KubernetesConnManager) CloseConn(conn *grpc.ClientConn) {
 | 
						|
	conn.Close()
 | 
						|
}
 | 
						|
 | 
						|
// Close closes all gRPC connections managed by KubernetesConnManager.
 | 
						|
func (k *KubernetesConnManager) Close() {
 | 
						|
	k.mu.Lock()
 | 
						|
	defer k.mu.Unlock()
 | 
						|
	for _, conns := range k.connMap {
 | 
						|
		for _, conn := range conns {
 | 
						|
			_ = conn.Close()
 | 
						|
		}
 | 
						|
	}
 | 
						|
	k.connMap = make(map[string][]*grpc.ClientConn)
 | 
						|
}
 | 
						|
 | 
						|
func (k *KubernetesConnManager) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (k *KubernetesConnManager) UnRegister() error {
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (k *KubernetesConnManager) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) {
 | 
						|
	return "", nil
 | 
						|
}
 | 
						|
 | 
						|
func (k *KubernetesConnManager) getServicePort(serviceName string) (int32, error) {
 | 
						|
	var svcPort int32
 | 
						|
 | 
						|
	svc, err := k.clientset.CoreV1().Services(k.namespace).Get(context.Background(), serviceName, metav1.GetOptions{})
 | 
						|
	if err != nil {
 | 
						|
		fmt.Print("namespace:", k.namespace)
 | 
						|
		return 0, fmt.Errorf("failed to get service %s: %v", serviceName, err)
 | 
						|
	}
 | 
						|
 | 
						|
	if len(svc.Spec.Ports) == 0 {
 | 
						|
		return 0, fmt.Errorf("service %s has no ports defined", serviceName)
 | 
						|
	}
 | 
						|
 | 
						|
	for _, port := range svc.Spec.Ports {
 | 
						|
		// fmt.Println(serviceName, " Now Get Port:", port.Port)
 | 
						|
		if port.Port != 10001 {
 | 
						|
			svcPort = port.Port
 | 
						|
			break
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return svcPort, nil
 | 
						|
}
 | 
						|
 | 
						|
// watchEndpoints listens for changes in Pod resources.
 | 
						|
func (k *KubernetesConnManager) watchEndpoints() {
 | 
						|
	informerFactory := informers.NewSharedInformerFactory(k.clientset, time.Minute*10)
 | 
						|
	informer := informerFactory.Core().V1().Pods().Informer()
 | 
						|
 | 
						|
	// Watch for Pod changes (add, update, delete)
 | 
						|
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 | 
						|
		AddFunc: func(obj interface{}) {
 | 
						|
			k.handleEndpointChange(obj)
 | 
						|
		},
 | 
						|
		UpdateFunc: func(oldObj, newObj interface{}) {
 | 
						|
			k.handleEndpointChange(newObj)
 | 
						|
		},
 | 
						|
		DeleteFunc: func(obj interface{}) {
 | 
						|
			k.handleEndpointChange(obj)
 | 
						|
		},
 | 
						|
	})
 | 
						|
 | 
						|
	informerFactory.Start(context.Background().Done())
 | 
						|
	<-context.Background().Done() // Block forever
 | 
						|
}
 | 
						|
 | 
						|
func (k *KubernetesConnManager) handleEndpointChange(obj interface{}) {
 | 
						|
	endpoint, ok := obj.(*v1.Endpoints)
 | 
						|
	if !ok {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	serviceName := endpoint.Name
 | 
						|
	if err := k.initializeConns(serviceName); err != nil {
 | 
						|
		fmt.Printf("Error initializing connections for %s: %v\n", serviceName, err)
 | 
						|
	}
 | 
						|
}
 |