mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-26 21:22:16 +08:00 
			
		
		
		
	Merge branch 'openimsdk:main' into fix/image-services
This commit is contained in:
		
						commit
						33e48fa7d5
					
				| @ -1,8 +1,6 @@ | |||||||
| rpc: | rpc: | ||||||
|   # The IP address where this RPC service registers itself; if left blank, it defaults to the internal network IP |   # The IP address where this RPC service registers itself; if left blank, it defaults to the internal network IP | ||||||
|   registerIP:  |   registerIP: | ||||||
|   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports |  | ||||||
|   ports: [ 10140, 10141, 10142, 10143, 10144, 10145, 10146, 10147, 10148, 10149, 10150, 10151, 10152, 10153, 10154, 10155 ] |  | ||||||
| 
 | 
 | ||||||
| prometheus: | prometheus: | ||||||
|   # Enable or disable Prometheus monitoring |   # Enable or disable Prometheus monitoring | ||||||
|  | |||||||
| @ -3,8 +3,6 @@ rpc: | |||||||
|   registerIP: |   registerIP: | ||||||
|   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP |   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP | ||||||
|   listenIP: 0.0.0.0 |   listenIP: 0.0.0.0 | ||||||
|   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports |  | ||||||
|   ports: [ 10170, 10171, 10172, 10173, 10174, 10175, 10176, 10177, 10178, 10179, 10180, 10181, 10182, 10183, 10184, 10185 ] |  | ||||||
| 
 | 
 | ||||||
| prometheus: | prometheus: | ||||||
|   # Enable or disable Prometheus monitoring |   # Enable or disable Prometheus monitoring | ||||||
|  | |||||||
| @ -3,8 +3,7 @@ rpc: | |||||||
|   registerIP:  |   registerIP:  | ||||||
|   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP |   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP | ||||||
|   listenIP: 0.0.0.0 |   listenIP: 0.0.0.0 | ||||||
|   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports | 
 | ||||||
|   ports: [ 10200 ] |  | ||||||
| 
 | 
 | ||||||
| prometheus: | prometheus: | ||||||
|   # Enable or disable Prometheus monitoring |   # Enable or disable Prometheus monitoring | ||||||
|  | |||||||
| @ -3,8 +3,6 @@ rpc: | |||||||
|   registerIP:  |   registerIP:  | ||||||
|   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP |   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP | ||||||
|   listenIP: 0.0.0.0 |   listenIP: 0.0.0.0 | ||||||
|   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports |  | ||||||
|   ports: [ 10220 ] |  | ||||||
| 
 | 
 | ||||||
| prometheus: | prometheus: | ||||||
|   # Enable or disable Prometheus monitoring |   # Enable or disable Prometheus monitoring | ||||||
|  | |||||||
| @ -3,8 +3,6 @@ rpc: | |||||||
|   registerIP:  |   registerIP:  | ||||||
|   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP |   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP | ||||||
|   listenIP: 0.0.0.0 |   listenIP: 0.0.0.0 | ||||||
|   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports |  | ||||||
|   ports: [ 10240 ] |  | ||||||
| 
 | 
 | ||||||
| prometheus: | prometheus: | ||||||
|   # Enable or disable Prometheus monitoring |   # Enable or disable Prometheus monitoring | ||||||
|  | |||||||
| @ -3,8 +3,6 @@ rpc: | |||||||
|   registerIP: |   registerIP: | ||||||
|   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP |   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP | ||||||
|   listenIP: 0.0.0.0 |   listenIP: 0.0.0.0 | ||||||
|   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports |  | ||||||
|   ports: [ 10260 ] |  | ||||||
| 
 | 
 | ||||||
| prometheus: | prometheus: | ||||||
|   # Enable or disable Prometheus monitoring |   # Enable or disable Prometheus monitoring | ||||||
|  | |||||||
| @ -3,8 +3,6 @@ rpc: | |||||||
|   registerIP:  |   registerIP:  | ||||||
|   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP |   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP | ||||||
|   listenIP: 0.0.0.0 |   listenIP: 0.0.0.0 | ||||||
|   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports |  | ||||||
|   ports: [ 10280 ] |  | ||||||
| 
 | 
 | ||||||
| prometheus: | prometheus: | ||||||
|   # Enable or disable Prometheus monitoring |   # Enable or disable Prometheus monitoring | ||||||
|  | |||||||
| @ -3,8 +3,6 @@ rpc: | |||||||
|   registerIP:  |   registerIP:  | ||||||
|   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP |   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP | ||||||
|   listenIP: 0.0.0.0 |   listenIP: 0.0.0.0 | ||||||
|   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports |  | ||||||
|   ports: [ 10300 ] |  | ||||||
| 
 | 
 | ||||||
| prometheus: | prometheus: | ||||||
|   # Enable or disable Prometheus monitoring |   # Enable or disable Prometheus monitoring | ||||||
|  | |||||||
| @ -3,8 +3,6 @@ rpc: | |||||||
|   registerIP:  |   registerIP:  | ||||||
|   # Listening IP; 0.0.0.0 means both internal and external IPs are listened to, if blank, the internal network IP is automatically obtained by default |   # Listening IP; 0.0.0.0 means both internal and external IPs are listened to, if blank, the internal network IP is automatically obtained by default | ||||||
|   listenIP: 0.0.0.0 |   listenIP: 0.0.0.0 | ||||||
|   # Listening ports; if multiple are configured, multiple instances will be launched, and must be consistent with the number of prometheus.ports |  | ||||||
|   ports: [ 10320 ] |  | ||||||
| 
 | 
 | ||||||
| prometheus: | prometheus: | ||||||
|   # Whether to enable prometheus |   # Whether to enable prometheus | ||||||
|  | |||||||
| @ -46,8 +46,7 @@ func (s *Server) InitServer(ctx context.Context, config *Config, disCov discover | |||||||
| 
 | 
 | ||||||
| func (s *Server) Start(ctx context.Context, index int, conf *Config) error { | func (s *Server) Start(ctx context.Context, index int, conf *Config) error { | ||||||
| 	return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP, | 	return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP, | ||||||
| 		conf.MsgGateway.RPC.RegisterIP, | 		index, | ||||||
| 		conf.MsgGateway.RPC.Ports, index, |  | ||||||
| 		conf.Share.RpcRegisterName.MessageGateway, | 		conf.Share.RpcRegisterName.MessageGateway, | ||||||
| 		&conf.Share, | 		&conf.Share, | ||||||
| 		conf, | 		conf, | ||||||
| @ -57,7 +56,7 @@ func (s *Server) Start(ctx context.Context, index int, conf *Config) error { | |||||||
| 
 | 
 | ||||||
| type Server struct { | type Server struct { | ||||||
| 	msggateway.UnimplementedMsgGatewayServer | 	msggateway.UnimplementedMsgGatewayServer | ||||||
| 	rpcPort        int | 
 | ||||||
| 	LongConnServer LongConnServer | 	LongConnServer LongConnServer | ||||||
| 	config         *Config | 	config         *Config | ||||||
| 	pushTerminal   map[int]struct{} | 	pushTerminal   map[int]struct{} | ||||||
| @ -70,9 +69,8 @@ func (s *Server) SetLongConnServer(LongConnServer LongConnServer) { | |||||||
| 	s.LongConnServer = LongConnServer | 	s.LongConnServer = LongConnServer | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config, ready func(srv *Server) error) *Server { | func NewServer(longConnServer LongConnServer, conf *Config, ready func(srv *Server) error) *Server { | ||||||
| 	s := &Server{ | 	s := &Server{ | ||||||
| 		rpcPort:        rpcPort, |  | ||||||
| 		LongConnServer: longConnServer, | 		LongConnServer: longConnServer, | ||||||
| 		pushTerminal:   make(map[int]struct{}), | 		pushTerminal:   make(map[int]struct{}), | ||||||
| 		config:         conf, | 		config:         conf, | ||||||
|  | |||||||
| @ -35,16 +35,13 @@ type Config struct { | |||||||
| 
 | 
 | ||||||
| // Start run ws server. | // Start run ws server. | ||||||
| func Start(ctx context.Context, index int, conf *Config) error { | func Start(ctx context.Context, index int, conf *Config) error { | ||||||
| 	log.CInfo(ctx, "MSG-GATEWAY server is initializing", "rpcPorts", conf.MsgGateway.RPC.Ports, | 	log.CInfo(ctx, "MSG-GATEWAY server is initializing", | ||||||
| 		"wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports) | 		"wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports) | ||||||
| 	wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, index) | 	wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, index) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	rpcPort, err := datautil.GetElemByIndex(conf.MsgGateway.RPC.Ports, index) | 
 | ||||||
| 	if err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
| 	rdb, err := redisutil.NewRedisClient(ctx, conf.RedisConfig.Build()) | 	rdb, err := redisutil.NewRedisClient(ctx, conf.RedisConfig.Build()) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| @ -57,7 +54,7 @@ func Start(ctx context.Context, index int, conf *Config) error { | |||||||
| 		WithMessageMaxMsgLength(conf.MsgGateway.LongConnSvr.WebsocketMaxMsgLen), | 		WithMessageMaxMsgLength(conf.MsgGateway.LongConnSvr.WebsocketMaxMsgLen), | ||||||
| 	) | 	) | ||||||
| 
 | 
 | ||||||
| 	hubServer := NewServer(rpcPort, longServer, conf, func(srv *Server) error { | 	hubServer := NewServer(longServer, conf, func(srv *Server) error { | ||||||
| 		longServer.online, _ = rpccache.NewOnlineCache(srv.userRcp, nil, rdb, false, longServer.subscriberUserOnlineStatusChanges) | 		longServer.online, _ = rpccache.NewOnlineCache(srv.userRcp, nil, rdb, false, longServer.subscriberUserOnlineStatusChanges) | ||||||
| 		return nil | 		return nil | ||||||
| 	}) | 	}) | ||||||
|  | |||||||
							
								
								
									
										165
									
								
								internal/tools/addr/addr.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										165
									
								
								internal/tools/addr/addr.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,165 @@ | |||||||
|  | // addr provides functions to retrieve local IP addresses from device interfaces. | ||||||
|  | package addr | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"net" | ||||||
|  | 
 | ||||||
|  | 	"github.com/pkg/errors" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | var ( | ||||||
|  | 	// ErrIPNotFound no IP address found, and explicit IP not provided. | ||||||
|  | 	ErrIPNotFound = errors.New("no IP address found, and explicit IP not provided") | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | // IsLocal checks whether an IP belongs to one of the device's interfaces. | ||||||
|  | func IsLocal(addr string) bool { | ||||||
|  | 	// Extract the host | ||||||
|  | 	host, _, err := net.SplitHostPort(addr) | ||||||
|  | 	if err == nil { | ||||||
|  | 		addr = host | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if addr == "localhost" { | ||||||
|  | 		return true | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// Check against all local ips | ||||||
|  | 	for _, ip := range IPs() { | ||||||
|  | 		if addr == ip { | ||||||
|  | 			return true | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return false | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Extract returns a valid IP address. If the address provided is a valid | ||||||
|  | // address, it will be returned directly. Otherwise, the available interfaces | ||||||
|  | // will be iterated over to find an IP address, preferably private. | ||||||
|  | func Extract(addr string) (string, error) { | ||||||
|  | 	// if addr is already specified then it's directly returned | ||||||
|  | 	if len(addr) > 0 && (addr != "0.0.0.0" && addr != "[::]" && addr != "::") { | ||||||
|  | 		return addr, nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	var ( | ||||||
|  | 		addrs   []net.Addr | ||||||
|  | 		loAddrs []net.Addr | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	ifaces, err := net.Interfaces() | ||||||
|  | 	if err != nil { | ||||||
|  | 		return "", errors.Wrap(err, "failed to get interfaces") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for _, iface := range ifaces { | ||||||
|  | 		ifaceAddrs, err := iface.Addrs() | ||||||
|  | 		if err != nil { | ||||||
|  | 			// ignore error, interface can disappear from system | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if iface.Flags&net.FlagLoopback != 0 { | ||||||
|  | 			loAddrs = append(loAddrs, ifaceAddrs...) | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		addrs = append(addrs, ifaceAddrs...) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// Add loopback addresses to the end of the list | ||||||
|  | 	addrs = append(addrs, loAddrs...) | ||||||
|  | 
 | ||||||
|  | 	// Try to find private IP in list, public IP otherwise | ||||||
|  | 	ip, err := findIP(addrs) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return "", err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return ip.String(), nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // IPs returns all available interface IP addresses. | ||||||
|  | func IPs() []string { | ||||||
|  | 	ifaces, err := net.Interfaces() | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	var ipAddrs []string | ||||||
|  | 
 | ||||||
|  | 	for _, i := range ifaces { | ||||||
|  | 		addrs, err := i.Addrs() | ||||||
|  | 		if err != nil { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		for _, addr := range addrs { | ||||||
|  | 			var ip net.IP | ||||||
|  | 			switch v := addr.(type) { | ||||||
|  | 			case *net.IPNet: | ||||||
|  | 				ip = v.IP | ||||||
|  | 			case *net.IPAddr: | ||||||
|  | 				ip = v.IP | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			if ip == nil { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			ipAddrs = append(ipAddrs, ip.String()) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return ipAddrs | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // findIP will return the first private IP available in the list. | ||||||
|  | // If no private IP is available it will return the first public IP, if present. | ||||||
|  | // If no public IP is available, it will return the first loopback IP, if present. | ||||||
|  | func findIP(addresses []net.Addr) (net.IP, error) { | ||||||
|  | 	var publicIP net.IP | ||||||
|  | 	var localIP net.IP | ||||||
|  | 
 | ||||||
|  | 	for _, rawAddr := range addresses { | ||||||
|  | 		var ip net.IP | ||||||
|  | 		switch addr := rawAddr.(type) { | ||||||
|  | 		case *net.IPAddr: | ||||||
|  | 			ip = addr.IP | ||||||
|  | 		case *net.IPNet: | ||||||
|  | 			ip = addr.IP | ||||||
|  | 		default: | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if ip.IsLoopback() { | ||||||
|  | 			if localIP == nil { | ||||||
|  | 				localIP = ip | ||||||
|  | 			} | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if !ip.IsPrivate() { | ||||||
|  | 			if publicIP == nil { | ||||||
|  | 				publicIP = ip | ||||||
|  | 			} | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		// Return private IP if available | ||||||
|  | 		return ip, nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// Return public or virtual IP | ||||||
|  | 	if len(publicIP) > 0 { | ||||||
|  | 		return publicIP, nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// Return local IP | ||||||
|  | 	if len(localIP) > 0 { | ||||||
|  | 		return localIP, nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return nil, ErrIPNotFound | ||||||
|  | } | ||||||
| @ -55,6 +55,5 @@ func (a *AuthRpcCmd) Exec() error { | |||||||
| 
 | 
 | ||||||
| func (a *AuthRpcCmd) runE() error { | func (a *AuthRpcCmd) runE() error { | ||||||
| 	return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, | 	return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, | ||||||
| 		a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.Ports, |  | ||||||
| 		a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig.Share, a.authConfig, auth.Start) | 		a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig.Share, a.authConfig, auth.Start) | ||||||
| } | } | ||||||
|  | |||||||
| @ -57,6 +57,5 @@ func (a *ConversationRpcCmd) Exec() error { | |||||||
| 
 | 
 | ||||||
| func (a *ConversationRpcCmd) runE() error { | func (a *ConversationRpcCmd) runE() error { | ||||||
| 	return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, | 	return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, | ||||||
| 		a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.Ports, |  | ||||||
| 		a.Index(), a.conversationConfig.Share.RpcRegisterName.Conversation, &a.conversationConfig.Share, a.conversationConfig, conversation.Start) | 		a.Index(), a.conversationConfig.Share.RpcRegisterName.Conversation, &a.conversationConfig.Share, a.conversationConfig, conversation.Start) | ||||||
| } | } | ||||||
|  | |||||||
| @ -58,6 +58,5 @@ func (a *FriendRpcCmd) Exec() error { | |||||||
| 
 | 
 | ||||||
| func (a *FriendRpcCmd) runE() error { | func (a *FriendRpcCmd) runE() error { | ||||||
| 	return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, | 	return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, | ||||||
| 		a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.Ports, |  | ||||||
| 		a.Index(), a.relationConfig.Share.RpcRegisterName.Friend, &a.relationConfig.Share, a.relationConfig, relation.Start) | 		a.Index(), a.relationConfig.Share.RpcRegisterName.Friend, &a.relationConfig.Share, a.relationConfig, relation.Start) | ||||||
| } | } | ||||||
|  | |||||||
| @ -59,6 +59,5 @@ func (a *GroupRpcCmd) Exec() error { | |||||||
| 
 | 
 | ||||||
| func (a *GroupRpcCmd) runE() error { | func (a *GroupRpcCmd) runE() error { | ||||||
| 	return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, | 	return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, | ||||||
| 		a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.Ports, |  | ||||||
| 		a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, group.Start, versionctx.EnableVersionCtx()) | 		a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, group.Start, versionctx.EnableVersionCtx()) | ||||||
| } | } | ||||||
|  | |||||||
| @ -59,6 +59,5 @@ func (a *MsgRpcCmd) Exec() error { | |||||||
| 
 | 
 | ||||||
| func (a *MsgRpcCmd) runE() error { | func (a *MsgRpcCmd) runE() error { | ||||||
| 	return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, | 	return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, | ||||||
| 		a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.Ports, |  | ||||||
| 		a.Index(), a.msgConfig.Share.RpcRegisterName.Msg, &a.msgConfig.Share, a.msgConfig, msg.Start) | 		a.Index(), a.msgConfig.Share.RpcRegisterName.Msg, &a.msgConfig.Share, a.msgConfig, msg.Start) | ||||||
| } | } | ||||||
|  | |||||||
| @ -59,6 +59,5 @@ func (a *PushRpcCmd) Exec() error { | |||||||
| 
 | 
 | ||||||
| func (a *PushRpcCmd) runE() error { | func (a *PushRpcCmd) runE() error { | ||||||
| 	return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, | 	return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, | ||||||
| 		a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.Ports, |  | ||||||
| 		a.Index(), a.pushConfig.Share.RpcRegisterName.Push, &a.pushConfig.Share, a.pushConfig, push.Start) | 		a.Index(), a.pushConfig.Share.RpcRegisterName.Push, &a.pushConfig.Share, a.pushConfig, push.Start) | ||||||
| } | } | ||||||
|  | |||||||
| @ -58,6 +58,5 @@ func (a *ThirdRpcCmd) Exec() error { | |||||||
| 
 | 
 | ||||||
| func (a *ThirdRpcCmd) runE() error { | func (a *ThirdRpcCmd) runE() error { | ||||||
| 	return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, | 	return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, | ||||||
| 		a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.Ports, |  | ||||||
| 		a.Index(), a.thirdConfig.Share.RpcRegisterName.Third, &a.thirdConfig.Share, a.thirdConfig, third.Start) | 		a.Index(), a.thirdConfig.Share.RpcRegisterName.Third, &a.thirdConfig.Share, a.thirdConfig, third.Start) | ||||||
| } | } | ||||||
|  | |||||||
| @ -59,6 +59,5 @@ func (a *UserRpcCmd) Exec() error { | |||||||
| 
 | 
 | ||||||
| func (a *UserRpcCmd) runE() error { | func (a *UserRpcCmd) runE() error { | ||||||
| 	return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, | 	return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, | ||||||
| 		a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.Ports, |  | ||||||
| 		a.Index(), a.userConfig.Share.RpcRegisterName.User, &a.userConfig.Share, a.userConfig, user.Start) | 		a.Index(), a.userConfig.Share.RpcRegisterName.User, &a.userConfig.Share, a.userConfig, user.Start) | ||||||
| } | } | ||||||
|  | |||||||
| @ -197,7 +197,6 @@ type Push struct { | |||||||
| 	RPC struct { | 	RPC struct { | ||||||
| 		RegisterIP string `mapstructure:"registerIP"` | 		RegisterIP string `mapstructure:"registerIP"` | ||||||
| 		ListenIP   string `mapstructure:"listenIP"` | 		ListenIP   string `mapstructure:"listenIP"` | ||||||
| 		Ports      []int  `mapstructure:"ports"` |  | ||||||
| 	} `mapstructure:"rpc"` | 	} `mapstructure:"rpc"` | ||||||
| 	Prometheus           Prometheus `mapstructure:"prometheus"` | 	Prometheus           Prometheus `mapstructure:"prometheus"` | ||||||
| 	MaxConcurrentWorkers int        `mapstructure:"maxConcurrentWorkers"` | 	MaxConcurrentWorkers int        `mapstructure:"maxConcurrentWorkers"` | ||||||
|  | |||||||
| @ -24,10 +24,10 @@ import ( | |||||||
| 	"net/http" | 	"net/http" | ||||||
| 	"os" | 	"os" | ||||||
| 	"os/signal" | 	"os/signal" | ||||||
| 	"strconv" |  | ||||||
| 	"syscall" | 	"syscall" | ||||||
| 	"time" | 	"time" | ||||||
| 
 | 
 | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/internal/tools/addr" | ||||||
| 	kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" | 	kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | ||||||
| 	"github.com/openimsdk/tools/discovery" | 	"github.com/openimsdk/tools/discovery" | ||||||
| @ -37,22 +37,15 @@ import ( | |||||||
| 	"github.com/openimsdk/tools/utils/network" | 	"github.com/openimsdk/tools/utils/network" | ||||||
| 	"google.golang.org/grpc" | 	"google.golang.org/grpc" | ||||||
| 	"google.golang.org/grpc/credentials/insecure" | 	"google.golang.org/grpc/credentials/insecure" | ||||||
|  | 	"strconv" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| // Start rpc server. | // Start rpc server. | ||||||
| func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusConfig *config.Prometheus, listenIP, | func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusConfig *config.Prometheus, listenIP string, | ||||||
| 	registerIP string, rpcPorts []int, index int, rpcRegisterName string, share *config.Share, config T, rpcFn func(ctx context.Context, | 	index int, rpcRegisterName string, share *config.Share, config T, rpcFn func(ctx context.Context, | ||||||
| 	config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { | 		config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { | ||||||
| 
 |  | ||||||
| 	rpcPort, err := datautil.GetElemByIndex(rpcPorts, index) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", rpcPort, |  | ||||||
| 		"prometheusPorts", prometheusConfig.Ports) |  | ||||||
| 	rpcTcpAddr := net.JoinHostPort(network.GetListenIP(listenIP), strconv.Itoa(rpcPort)) |  | ||||||
| 
 | 
 | ||||||
|  | 	rpcTcpAddr := net.JoinHostPort(network.GetListenIP(listenIP), "0") | ||||||
| 	listener, err := net.Listen( | 	listener, err := net.Listen( | ||||||
| 		"tcp", | 		"tcp", | ||||||
| 		rpcTcpAddr, | 		rpcTcpAddr, | ||||||
| @ -60,6 +53,14 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr) | 		return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr) | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	h, portStr, _ := net.SplitHostPort(listener.Addr().String()) | ||||||
|  | 	host, _ := addr.Extract(h) | ||||||
|  | 	port, _ := strconv.Atoi(portStr) | ||||||
|  | 
 | ||||||
|  | 	log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", portStr, | ||||||
|  | 		"prometheusPorts", prometheusConfig.Ports) | ||||||
|  | 
 | ||||||
| 	defer listener.Close() | 	defer listener.Close() | ||||||
| 	client, err := kdisc.NewDiscoveryRegister(discovery, share) | 	client, err := kdisc.NewDiscoveryRegister(discovery, share) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| @ -68,17 +69,13 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo | |||||||
| 
 | 
 | ||||||
| 	defer client.Close() | 	defer client.Close() | ||||||
| 	client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) | 	client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) | ||||||
| 	registerIP, err = network.GetRpcRegisterIP(registerIP) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
| 
 | 
 | ||||||
| 	//var reg *prometheus.Registry | 	// var reg *prometheus.Registry | ||||||
| 	//var metric *grpcprometheus.ServerMetrics | 	// var metric *grpcprometheus.ServerMetrics | ||||||
| 	if prometheusConfig.Enable { | 	if prometheusConfig.Enable { | ||||||
| 		//cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share) | 		// cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share) | ||||||
| 		//reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics) | 		// reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics) | ||||||
| 		//options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()), | 		// options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()), | ||||||
| 		//	grpc.UnaryInterceptor(metric.UnaryServerInterceptor())) | 		//	grpc.UnaryInterceptor(metric.UnaryServerInterceptor())) | ||||||
| 		options = append( | 		options = append( | ||||||
| 			options, mw.GrpcServer(), | 			options, mw.GrpcServer(), | ||||||
| @ -98,8 +95,8 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo | |||||||
| 
 | 
 | ||||||
| 	err = client.Register( | 	err = client.Register( | ||||||
| 		rpcRegisterName, | 		rpcRegisterName, | ||||||
| 		registerIP, | 		host, | ||||||
| 		rpcPort, | 		port, | ||||||
| 		grpc.WithTransportCredentials(insecure.NewCredentials()), | 		grpc.WithTransportCredentials(insecure.NewCredentials()), | ||||||
| 	) | 	) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| @ -123,13 +120,13 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo | |||||||
| 				netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort)) | 				netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort)) | ||||||
| 				netDone <- struct{}{} | 				netDone <- struct{}{} | ||||||
| 			} | 			} | ||||||
| 			//metric.InitializeMetrics(srv) | 			// metric.InitializeMetrics(srv) | ||||||
| 			// Create a HTTP server for prometheus. | 			// Create a HTTP server for prometheus. | ||||||
| 			//httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} | 			// httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} | ||||||
| 			//if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { | 			// if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { | ||||||
| 			//	netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr) | 			//	netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr) | ||||||
| 			//	netDone <- struct{}{} | 			//	netDone <- struct{}{} | ||||||
| 			//} | 			// } | ||||||
| 		}() | 		}() | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user