package kubernetes

import (
	"context"
	"fmt"
	"log"
	"os"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)

// KubernetesConnManager manages gRPC client connections to services
// discovered through the Kubernetes API.
type KubernetesConnManager struct {
	clientset   *kubernetes.Clientset
	namespace   string
	dialOptions []grpc.DialOption

	// rpcTargets maps a service name to an explicit dial target that, when
	// set, overrides the default <service>.<namespace>.svc.cluster.local form.
	rpcTargets map[string]string
	// selfTarget caches this pod's own <ip>:<port> target.
	selfTarget string

	mu      sync.RWMutex
	connMap map[string][]*grpc.ClientConn
}

// NewKubernetesConnManager creates a new connection manager that uses Kubernetes services for service discovery.
func NewKubernetesConnManager(namespace string, options ...grpc.DialOption) (*KubernetesConnManager, error) {
	config, err := rest.InClusterConfig()
	if err != nil {
		return nil, fmt.Errorf("failed to create in-cluster config: %w", err)
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("failed to create clientset: %w", err)
	}

	k := &KubernetesConnManager{
		clientset:   clientset,
		namespace:   namespace,
		dialOptions: options,
		connMap:     make(map[string][]*grpc.ClientConn),
	}

	go k.watchEndpoints()

	return k, nil
}

// initializeConns dials every endpoint address of the given service and
// replaces the cached connection list for that service.
func (k *KubernetesConnManager) initializeConns(serviceName string) error {
	port, err := k.getServicePort(serviceName)
	if err != nil {
		return err
	}

	endpoints, err := k.clientset.CoreV1().Endpoints(k.namespace).Get(context.Background(), serviceName, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("failed to get endpoints for service %s: %w", serviceName, err)
	}

	var conns []*grpc.ClientConn
	for _, subset := range endpoints.Subsets {
		for _, address := range subset.Addresses {
			target := fmt.Sprintf("%s:%d", address.IP, port)
			conn, err := grpc.Dial(target, append(k.dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
			if err != nil {
				return fmt.Errorf("failed to dial endpoint %s: %w", target, err)
			}
			conns = append(conns, conn)
		}
	}

	k.mu.Lock()
	k.connMap[serviceName] = conns
	k.mu.Unlock()

	return nil
}

// GetConns returns gRPC client connections for a given Kubernetes service name.
func (k *KubernetesConnManager) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
	k.mu.RLock()
	conns, exists := k.connMap[serviceName]
	k.mu.RUnlock()
	if exists {
		return conns, nil
	}

	// Check whether another goroutine already initialized the connections
	// after we released the read lock.
	k.mu.Lock()
	conns, exists = k.connMap[serviceName]
	k.mu.Unlock()
	if exists {
		return conns, nil
	}

	if err := k.initializeConns(serviceName); err != nil {
		return nil, fmt.Errorf("failed to initialize connections for service %s: %w", serviceName, err)
	}

	k.mu.RLock()
	defer k.mu.RUnlock()
	return k.connMap[serviceName], nil
}

// GetConn returns a single gRPC client connection for a given Kubernetes service name.
func (k *KubernetesConnManager) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
	var target string

	if k.rpcTargets[serviceName] == "" {
		svcPort, err := k.getServicePort(serviceName)
		if err != nil {
			return nil, err
		}

		// Fall back to the cluster-internal DNS name of the service.
		target = fmt.Sprintf("%s.%s.svc.cluster.local:%d", serviceName, k.namespace, svcPort)
	} else {
		target = k.rpcTargets[serviceName]
	}

	return grpc.DialContext(
		ctx,
		target,
		append([]grpc.DialOption{
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024*10), grpc.MaxCallSendMsgSize(1024*1024*20)),
		}, k.dialOptions...)...,
	)
}

// GetSelfConnTarget returns the connection target for the current service.
func (k *KubernetesConnManager) GetSelfConnTarget() string {
	if k.selfTarget == "" {
		hostName := os.Getenv("HOSTNAME")

		// Poll until the pod can be read and has been assigned an IP.
		var pod *v1.Pod
		for {
			var err error
			pod, err = k.clientset.CoreV1().Pods(k.namespace).Get(context.Background(), hostName, metav1.GetOptions{})
			if err != nil {
				log.Printf("failed to get pod %s: %v", hostName, err)
			} else if pod.Status.PodIP != "" {
				break
			}
			time.Sleep(3 * time.Second)
		}

		// Use the first container port that is not 10001 (assumed reserved
		// for a non-RPC listener in this deployment).
		var selfPort int32
		for _, port := range pod.Spec.Containers[0].Ports {
			if port.ContainerPort != 10001 {
				selfPort = port.ContainerPort
				break
			}
		}

		k.selfTarget = fmt.Sprintf("%s:%d", pod.Status.PodIP, selfPort)
	}

	return k.selfTarget
}

// AddOption appends gRPC dial options to the existing options.
func (k *KubernetesConnManager) AddOption(opts ...grpc.DialOption) {
	k.mu.Lock()
	defer k.mu.Unlock()
	k.dialOptions = append(k.dialOptions, opts...)
}

// CloseConn closes a given gRPC client connection.
func (k *KubernetesConnManager) CloseConn(conn *grpc.ClientConn) {
	conn.Close()
}

// Close closes all gRPC connections managed by KubernetesConnManager.
func (k *KubernetesConnManager) Close() {
	k.mu.Lock()
	defer k.mu.Unlock()
	for _, conns := range k.connMap {
		for _, conn := range conns {
			_ = conn.Close()
		}
	}
	k.connMap = make(map[string][]*grpc.ClientConn)
}

// Register is a no-op: in Kubernetes, service registration is handled by the
// cluster itself through Service and Endpoints objects.
func (k *KubernetesConnManager) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
	return nil
}

// UnRegister is a no-op for the same reason as Register.
func (k *KubernetesConnManager) UnRegister() error {
	return nil
}

// GetUserIdHashGatewayHost is not implemented for Kubernetes-based discovery.
func (k *KubernetesConnManager) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) {
	return "", nil
}

// getServicePort returns the first port of the service that is not 10001
// (assumed reserved for a non-RPC listener in this deployment).
func (k *KubernetesConnManager) getServicePort(serviceName string) (int32, error) {
	var svcPort int32

	svc, err := k.clientset.CoreV1().Services(k.namespace).Get(context.Background(), serviceName, metav1.GetOptions{})
	if err != nil {
		return 0, fmt.Errorf("failed to get service %s in namespace %s: %w", serviceName, k.namespace, err)
	}

	if len(svc.Spec.Ports) == 0 {
		return 0, fmt.Errorf("service %s has no ports defined", serviceName)
	}

	for _, port := range svc.Spec.Ports {
		if port.Port != 10001 {
			svcPort = port.Port
			break
		}
	}

	return svcPort, nil
}

// watchEndpoints listens for changes in Endpoints resources and refreshes the
// cached connections for the affected services.
func (k *KubernetesConnManager) watchEndpoints() {
	informerFactory := informers.NewSharedInformerFactory(k.clientset, time.Minute*10)
	// Watch Endpoints objects, since handleEndpointChange expects *v1.Endpoints.
	informer := informerFactory.Core().V1().Endpoints().Informer()

	// Watch for Endpoints changes (add, update, delete).
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			k.handleEndpointChange(obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			k.handleEndpointChange(newObj)
		},
		DeleteFunc: func(obj interface{}) {
			k.handleEndpointChange(obj)
		},
	})

	stopCh := make(chan struct{})
	informerFactory.Start(stopCh)
	<-stopCh // block forever; the watch runs for the lifetime of the process
}

// handleEndpointChange re-dials the cached connections for the service whose
// Endpoints object changed.
func (k *KubernetesConnManager) handleEndpointChange(obj interface{}) {
	endpoint, ok := obj.(*v1.Endpoints)
	if !ok {
		return
	}
	serviceName := endpoint.Name
	if err := k.initializeConns(serviceName); err != nil {
		log.Printf("failed to initialize connections for %s: %v", serviceName, err)
	}
}
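
For context, a minimal usage sketch. The import path, the namespace "openim", and the service name "msg-rpc-service" are all placeholders, not values from this file; the manager must run inside a cluster because it relies on rest.InClusterConfig.

package main

import (
	"context"
	"log"

	"google.golang.org/grpc"

	// Assumed import path; adjust to wherever this package lives in the repo.
	kdisc "github.com/openimsdk/open-im-server/pkg/kubernetes"
)

func main() {
	// Create a manager scoped to a (placeholder) namespace; extra dial
	// options are applied to every connection it opens.
	manager, err := kdisc.NewKubernetesConnManager("openim", grpc.WithBlock())
	if err != nil {
		log.Fatalf("create conn manager: %v", err)
	}
	defer manager.Close()

	// Resolve a single connection to a (placeholder) Kubernetes service.
	conn, err := manager.GetConn(context.Background(), "msg-rpc-service")
	if err != nil {
		log.Fatalf("get conn: %v", err)
	}
	_ = conn // pass to a generated gRPC client constructor
}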