mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-26 21:22:16 +08:00 
			
		
		
		
	Merge: main
This commit is contained in:
		
						commit
						69ccbb25a3
					
				| @ -1,6 +1,11 @@ | |||||||
| rpc: | rpc: | ||||||
|   # The IP address where this RPC service registers itself; if left blank, it defaults to the internal network IP |   # The IP address where this RPC service registers itself; if left blank, it defaults to the internal network IP | ||||||
|   registerIP: |   registerIP: | ||||||
|  |   # autoSetPorts indicates whether to automatically set the ports | ||||||
|  |   autoSetPorts: true | ||||||
|  |   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports | ||||||
|  |   # It will only take effect when autoSetPorts is set to false. | ||||||
|  |   ports: [ 10140, 10141, 10142, 10143, 10144, 10145, 10146, 10147, 10148, 10149, 10150, 10151, 10152, 10153, 10154, 10155 ] | ||||||
| 
 | 
 | ||||||
| prometheus: | prometheus: | ||||||
|   # Enable or disable Prometheus monitoring |   # Enable or disable Prometheus monitoring | ||||||
|  | |||||||
| @ -3,6 +3,12 @@ rpc: | |||||||
|   registerIP: |   registerIP: | ||||||
|   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP |   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP | ||||||
|   listenIP: 0.0.0.0 |   listenIP: 0.0.0.0 | ||||||
|  |   # autoSetPorts indicates whether to automatically set the ports | ||||||
|  |   autoSetPorts: true | ||||||
|  |   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports | ||||||
|  |   # It will only take effect when autoSetPorts is set to false. | ||||||
|  |   ports: [ 10170, 10171, 10172, 10173, 10174, 10175, 10176, 10177, 10178, 10179, 10180, 10181, 10182, 10183, 10184, 10185 ] | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| prometheus: | prometheus: | ||||||
|   # Enable or disable Prometheus monitoring |   # Enable or disable Prometheus monitoring | ||||||
|  | |||||||
| @ -3,6 +3,11 @@ rpc: | |||||||
|   registerIP:  |   registerIP:  | ||||||
|   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP |   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP | ||||||
|   listenIP: 0.0.0.0 |   listenIP: 0.0.0.0 | ||||||
|  |   # autoSetPorts indicates whether to automatically set the ports | ||||||
|  |   autoSetPorts: true | ||||||
|  |   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports | ||||||
|  |   # It will only take effect when autoSetPorts is set to false. | ||||||
|  |   ports: [ 10200 ] | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| prometheus: | prometheus: | ||||||
|  | |||||||
| @ -3,6 +3,11 @@ rpc: | |||||||
|   registerIP:  |   registerIP:  | ||||||
|   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP |   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP | ||||||
|   listenIP: 0.0.0.0 |   listenIP: 0.0.0.0 | ||||||
|  |   # autoSetPorts indicates whether to automatically set the ports | ||||||
|  |   autoSetPorts: true | ||||||
|  |   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports | ||||||
|  |   # It will only take effect when autoSetPorts is set to false. | ||||||
|  |   ports: [ 10220 ] | ||||||
| 
 | 
 | ||||||
| prometheus: | prometheus: | ||||||
|   # Enable or disable Prometheus monitoring |   # Enable or disable Prometheus monitoring | ||||||
|  | |||||||
| @ -3,6 +3,11 @@ rpc: | |||||||
|   registerIP:  |   registerIP:  | ||||||
|   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP |   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP | ||||||
|   listenIP: 0.0.0.0 |   listenIP: 0.0.0.0 | ||||||
|  |   # autoSetPorts indicates whether to automatically set the ports | ||||||
|  |   autoSetPorts: true | ||||||
|  |   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports | ||||||
|  |   # It will only take effect when autoSetPorts is set to false. | ||||||
|  |   ports: [ 10240 ] | ||||||
| 
 | 
 | ||||||
| prometheus: | prometheus: | ||||||
|   # Enable or disable Prometheus monitoring |   # Enable or disable Prometheus monitoring | ||||||
|  | |||||||
| @ -3,6 +3,11 @@ rpc: | |||||||
|   registerIP: |   registerIP: | ||||||
|   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP |   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP | ||||||
|   listenIP: 0.0.0.0 |   listenIP: 0.0.0.0 | ||||||
|  |   # autoSetPorts indicates whether to automatically set the ports | ||||||
|  |   autoSetPorts: true | ||||||
|  |   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports | ||||||
|  |   # It will only take effect when autoSetPorts is set to false. | ||||||
|  |   ports: [ 10260 ] | ||||||
| 
 | 
 | ||||||
| prometheus: | prometheus: | ||||||
|   # Enable or disable Prometheus monitoring |   # Enable or disable Prometheus monitoring | ||||||
|  | |||||||
| @ -3,6 +3,11 @@ rpc: | |||||||
|   registerIP:  |   registerIP:  | ||||||
|   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP |   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP | ||||||
|   listenIP: 0.0.0.0 |   listenIP: 0.0.0.0 | ||||||
|  |   # autoSetPorts indicates whether to automatically set the ports | ||||||
|  |   autoSetPorts: true | ||||||
|  |   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports | ||||||
|  |   # It will only take effect when autoSetPorts is set to false. | ||||||
|  |   ports: [ 10280 ] | ||||||
| 
 | 
 | ||||||
| prometheus: | prometheus: | ||||||
|   # Enable or disable Prometheus monitoring |   # Enable or disable Prometheus monitoring | ||||||
|  | |||||||
| @ -3,6 +3,11 @@ rpc: | |||||||
|   registerIP:  |   registerIP:  | ||||||
|   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP |   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP | ||||||
|   listenIP: 0.0.0.0 |   listenIP: 0.0.0.0 | ||||||
|  |   # autoSetPorts indicates whether to automatically set the ports | ||||||
|  |   autoSetPorts: true | ||||||
|  |   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports | ||||||
|  |   # It will only take effect when autoSetPorts is set to false. | ||||||
|  |   ports: [ 10300 ] | ||||||
| 
 | 
 | ||||||
| prometheus: | prometheus: | ||||||
|   # Enable or disable Prometheus monitoring |   # Enable or disable Prometheus monitoring | ||||||
|  | |||||||
| @ -3,6 +3,11 @@ rpc: | |||||||
|   registerIP:  |   registerIP:  | ||||||
|   # Listening IP; 0.0.0.0 means both internal and external IPs are listened to, if blank, the internal network IP is automatically obtained by default |   # Listening IP; 0.0.0.0 means both internal and external IPs are listened to, if blank, the internal network IP is automatically obtained by default | ||||||
|   listenIP: 0.0.0.0 |   listenIP: 0.0.0.0 | ||||||
|  |   # autoSetPorts indicates whether to automatically set the ports | ||||||
|  |   autoSetPorts: true | ||||||
|  |   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports | ||||||
|  |   # It will only take effect when autoSetPorts is set to false. | ||||||
|  |   ports: [ 10320 ] | ||||||
| 
 | 
 | ||||||
| prometheus: | prometheus: | ||||||
|   # Whether to enable prometheus |   # Whether to enable prometheus | ||||||
|  | |||||||
| @ -46,7 +46,8 @@ func (s *Server) InitServer(ctx context.Context, config *Config, disCov discover | |||||||
| 
 | 
 | ||||||
| func (s *Server) Start(ctx context.Context, index int, conf *Config) error { | func (s *Server) Start(ctx context.Context, index int, conf *Config) error { | ||||||
| 	return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP, | 	return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP, | ||||||
| 		index, | 		conf.MsgGateway.RPC.RegisterIP, | ||||||
|  | 		conf.MsgGateway.RPC.AutoSetPorts, conf.MsgGateway.RPC.Ports, index, | ||||||
| 		conf.Discovery.RpcService.MessageGateway, | 		conf.Discovery.RpcService.MessageGateway, | ||||||
| 		&conf.Share, | 		&conf.Share, | ||||||
| 		conf, | 		conf, | ||||||
|  | |||||||
| @ -35,7 +35,8 @@ type Config struct { | |||||||
| 
 | 
 | ||||||
| // Start run ws server. | // Start run ws server. | ||||||
| func Start(ctx context.Context, index int, conf *Config) error { | func Start(ctx context.Context, index int, conf *Config) error { | ||||||
| 	log.CInfo(ctx, "MSG-GATEWAY server is initializing", | 	log.CInfo(ctx, "MSG-GATEWAY server is initializing", "autoSetPorts", conf.MsgGateway.RPC.AutoSetPorts, | ||||||
|  | 		"rpcPorts", conf.MsgGateway.RPC.Ports, | ||||||
| 		"wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports) | 		"wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports) | ||||||
| 	wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, index) | 	wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, index) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
|  | |||||||
| @ -1,165 +0,0 @@ | |||||||
| // addr provides functions to retrieve local IP addresses from device interfaces. |  | ||||||
| package addr |  | ||||||
| 
 |  | ||||||
| import ( |  | ||||||
| 	"net" |  | ||||||
| 
 |  | ||||||
| 	"github.com/pkg/errors" |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| var ( |  | ||||||
| 	// ErrIPNotFound no IP address found, and explicit IP not provided. |  | ||||||
| 	ErrIPNotFound = errors.New("no IP address found, and explicit IP not provided") |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| // IsLocal checks whether an IP belongs to one of the device's interfaces. |  | ||||||
| func IsLocal(addr string) bool { |  | ||||||
| 	// Extract the host |  | ||||||
| 	host, _, err := net.SplitHostPort(addr) |  | ||||||
| 	if err == nil { |  | ||||||
| 		addr = host |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	if addr == "localhost" { |  | ||||||
| 		return true |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	// Check against all local ips |  | ||||||
| 	for _, ip := range IPs() { |  | ||||||
| 		if addr == ip { |  | ||||||
| 			return true |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return false |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // Extract returns a valid IP address. If the address provided is a valid |  | ||||||
| // address, it will be returned directly. Otherwise, the available interfaces |  | ||||||
| // will be iterated over to find an IP address, preferably private. |  | ||||||
| func Extract(addr string) (string, error) { |  | ||||||
| 	// if addr is already specified then it's directly returned |  | ||||||
| 	if len(addr) > 0 && (addr != "0.0.0.0" && addr != "[::]" && addr != "::") { |  | ||||||
| 		return addr, nil |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	var ( |  | ||||||
| 		addrs   []net.Addr |  | ||||||
| 		loAddrs []net.Addr |  | ||||||
| 	) |  | ||||||
| 
 |  | ||||||
| 	ifaces, err := net.Interfaces() |  | ||||||
| 	if err != nil { |  | ||||||
| 		return "", errors.Wrap(err, "failed to get interfaces") |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	for _, iface := range ifaces { |  | ||||||
| 		ifaceAddrs, err := iface.Addrs() |  | ||||||
| 		if err != nil { |  | ||||||
| 			// ignore error, interface can disappear from system |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		if iface.Flags&net.FlagLoopback != 0 { |  | ||||||
| 			loAddrs = append(loAddrs, ifaceAddrs...) |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		addrs = append(addrs, ifaceAddrs...) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	// Add loopback addresses to the end of the list |  | ||||||
| 	addrs = append(addrs, loAddrs...) |  | ||||||
| 
 |  | ||||||
| 	// Try to find private IP in list, public IP otherwise |  | ||||||
| 	ip, err := findIP(addrs) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return "", err |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return ip.String(), nil |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // IPs returns all available interface IP addresses. |  | ||||||
| func IPs() []string { |  | ||||||
| 	ifaces, err := net.Interfaces() |  | ||||||
| 	if err != nil { |  | ||||||
| 		return nil |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	var ipAddrs []string |  | ||||||
| 
 |  | ||||||
| 	for _, i := range ifaces { |  | ||||||
| 		addrs, err := i.Addrs() |  | ||||||
| 		if err != nil { |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		for _, addr := range addrs { |  | ||||||
| 			var ip net.IP |  | ||||||
| 			switch v := addr.(type) { |  | ||||||
| 			case *net.IPNet: |  | ||||||
| 				ip = v.IP |  | ||||||
| 			case *net.IPAddr: |  | ||||||
| 				ip = v.IP |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			if ip == nil { |  | ||||||
| 				continue |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			ipAddrs = append(ipAddrs, ip.String()) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return ipAddrs |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // findIP will return the first private IP available in the list. |  | ||||||
| // If no private IP is available it will return the first public IP, if present. |  | ||||||
| // If no public IP is available, it will return the first loopback IP, if present. |  | ||||||
| func findIP(addresses []net.Addr) (net.IP, error) { |  | ||||||
| 	var publicIP net.IP |  | ||||||
| 	var localIP net.IP |  | ||||||
| 
 |  | ||||||
| 	for _, rawAddr := range addresses { |  | ||||||
| 		var ip net.IP |  | ||||||
| 		switch addr := rawAddr.(type) { |  | ||||||
| 		case *net.IPAddr: |  | ||||||
| 			ip = addr.IP |  | ||||||
| 		case *net.IPNet: |  | ||||||
| 			ip = addr.IP |  | ||||||
| 		default: |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		if ip.IsLoopback() { |  | ||||||
| 			if localIP == nil { |  | ||||||
| 				localIP = ip |  | ||||||
| 			} |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		if !ip.IsPrivate() { |  | ||||||
| 			if publicIP == nil { |  | ||||||
| 				publicIP = ip |  | ||||||
| 			} |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		// Return private IP if available |  | ||||||
| 		return ip, nil |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	// Return public or virtual IP |  | ||||||
| 	if len(publicIP) > 0 { |  | ||||||
| 		return publicIP, nil |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	// Return local IP |  | ||||||
| 	if len(localIP) > 0 { |  | ||||||
| 		return localIP, nil |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return nil, ErrIPNotFound |  | ||||||
| } |  | ||||||
| @ -55,5 +55,6 @@ func (a *AuthRpcCmd) Exec() error { | |||||||
| 
 | 
 | ||||||
| func (a *AuthRpcCmd) runE() error { | func (a *AuthRpcCmd) runE() error { | ||||||
| 	return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, | 	return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, | ||||||
|  | 		a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.AutoSetPorts, a.authConfig.RpcConfig.RPC.Ports, | ||||||
| 		a.Index(), a.authConfig.Discovery.RpcService.Auth, &a.authConfig.Share, a.authConfig, auth.Start) | 		a.Index(), a.authConfig.Discovery.RpcService.Auth, &a.authConfig.Share, a.authConfig, auth.Start) | ||||||
| } | } | ||||||
|  | |||||||
| @ -57,5 +57,6 @@ func (a *ConversationRpcCmd) Exec() error { | |||||||
| 
 | 
 | ||||||
| func (a *ConversationRpcCmd) runE() error { | func (a *ConversationRpcCmd) runE() error { | ||||||
| 	return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, | 	return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, | ||||||
|  | 		a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.AutoSetPorts, a.conversationConfig.RpcConfig.RPC.Ports, | ||||||
| 		a.Index(), a.conversationConfig.Discovery.RpcService.Conversation, &a.conversationConfig.Share, a.conversationConfig, conversation.Start) | 		a.Index(), a.conversationConfig.Discovery.RpcService.Conversation, &a.conversationConfig.Share, a.conversationConfig, conversation.Start) | ||||||
| } | } | ||||||
|  | |||||||
| @ -58,5 +58,6 @@ func (a *FriendRpcCmd) Exec() error { | |||||||
| 
 | 
 | ||||||
| func (a *FriendRpcCmd) runE() error { | func (a *FriendRpcCmd) runE() error { | ||||||
| 	return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, | 	return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, | ||||||
|  | 		a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.AutoSetPorts, a.relationConfig.RpcConfig.RPC.Ports, | ||||||
| 		a.Index(), a.relationConfig.Discovery.RpcService.Friend, &a.relationConfig.Share, a.relationConfig, relation.Start) | 		a.Index(), a.relationConfig.Discovery.RpcService.Friend, &a.relationConfig.Share, a.relationConfig, relation.Start) | ||||||
| } | } | ||||||
|  | |||||||
| @ -59,5 +59,6 @@ func (a *GroupRpcCmd) Exec() error { | |||||||
| 
 | 
 | ||||||
| func (a *GroupRpcCmd) runE() error { | func (a *GroupRpcCmd) runE() error { | ||||||
| 	return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, | 	return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, | ||||||
|  | 		a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.AutoSetPorts, a.groupConfig.RpcConfig.RPC.Ports, | ||||||
| 		a.Index(), a.groupConfig.Discovery.RpcService.Group, &a.groupConfig.Share, a.groupConfig, group.Start, versionctx.EnableVersionCtx()) | 		a.Index(), a.groupConfig.Discovery.RpcService.Group, &a.groupConfig.Share, a.groupConfig, group.Start, versionctx.EnableVersionCtx()) | ||||||
| } | } | ||||||
|  | |||||||
| @ -59,5 +59,6 @@ func (a *MsgRpcCmd) Exec() error { | |||||||
| 
 | 
 | ||||||
| func (a *MsgRpcCmd) runE() error { | func (a *MsgRpcCmd) runE() error { | ||||||
| 	return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, | 	return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, | ||||||
|  | 		a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.AutoSetPorts, a.msgConfig.RpcConfig.RPC.Ports, | ||||||
| 		a.Index(), a.msgConfig.Discovery.RpcService.Msg, &a.msgConfig.Share, a.msgConfig, msg.Start) | 		a.Index(), a.msgConfig.Discovery.RpcService.Msg, &a.msgConfig.Share, a.msgConfig, msg.Start) | ||||||
| } | } | ||||||
|  | |||||||
| @ -59,5 +59,6 @@ func (a *PushRpcCmd) Exec() error { | |||||||
| 
 | 
 | ||||||
| func (a *PushRpcCmd) runE() error { | func (a *PushRpcCmd) runE() error { | ||||||
| 	return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, | 	return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, | ||||||
|  | 		a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.AutoSetPorts, a.pushConfig.RpcConfig.RPC.Ports, | ||||||
| 		a.Index(), a.pushConfig.Discovery.RpcService.Push, &a.pushConfig.Share, a.pushConfig, push.Start) | 		a.Index(), a.pushConfig.Discovery.RpcService.Push, &a.pushConfig.Share, a.pushConfig, push.Start) | ||||||
| } | } | ||||||
|  | |||||||
| @ -58,5 +58,6 @@ func (a *ThirdRpcCmd) Exec() error { | |||||||
| 
 | 
 | ||||||
| func (a *ThirdRpcCmd) runE() error { | func (a *ThirdRpcCmd) runE() error { | ||||||
| 	return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, | 	return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, | ||||||
|  | 		a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.AutoSetPorts, a.thirdConfig.RpcConfig.RPC.Ports, | ||||||
| 		a.Index(), a.thirdConfig.Discovery.RpcService.Third, &a.thirdConfig.Share, a.thirdConfig, third.Start) | 		a.Index(), a.thirdConfig.Discovery.RpcService.Third, &a.thirdConfig.Share, a.thirdConfig, third.Start) | ||||||
| } | } | ||||||
|  | |||||||
| @ -59,5 +59,6 @@ func (a *UserRpcCmd) Exec() error { | |||||||
| 
 | 
 | ||||||
| func (a *UserRpcCmd) runE() error { | func (a *UserRpcCmd) runE() error { | ||||||
| 	return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, | 	return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, | ||||||
|  | 		a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.AutoSetPorts, a.userConfig.RpcConfig.RPC.Ports, | ||||||
| 		a.Index(), a.userConfig.Discovery.RpcService.User, &a.userConfig.Share, a.userConfig, user.Start) | 		a.Index(), a.userConfig.Discovery.RpcService.User, &a.userConfig.Share, a.userConfig, user.Start) | ||||||
| } | } | ||||||
|  | |||||||
| @ -177,6 +177,7 @@ type Prometheus struct { | |||||||
| type MsgGateway struct { | type MsgGateway struct { | ||||||
| 	RPC struct { | 	RPC struct { | ||||||
| 		RegisterIP   string `mapstructure:"registerIP"` | 		RegisterIP   string `mapstructure:"registerIP"` | ||||||
|  | 		AutoSetPorts bool   `mapstructure:"autoSetPorts"` | ||||||
| 		Ports        []int  `mapstructure:"ports"` | 		Ports        []int  `mapstructure:"ports"` | ||||||
| 	} `mapstructure:"rpc"` | 	} `mapstructure:"rpc"` | ||||||
| 	Prometheus  Prometheus `mapstructure:"prometheus"` | 	Prometheus  Prometheus `mapstructure:"prometheus"` | ||||||
| @ -197,6 +198,8 @@ type Push struct { | |||||||
| 	RPC struct { | 	RPC struct { | ||||||
| 		RegisterIP   string `mapstructure:"registerIP"` | 		RegisterIP   string `mapstructure:"registerIP"` | ||||||
| 		ListenIP     string `mapstructure:"listenIP"` | 		ListenIP     string `mapstructure:"listenIP"` | ||||||
|  | 		AutoSetPorts bool   `mapstructure:"autoSetPorts"` | ||||||
|  | 		Ports        []int  `mapstructure:"ports"` | ||||||
| 	} `mapstructure:"rpc"` | 	} `mapstructure:"rpc"` | ||||||
| 	Prometheus           Prometheus `mapstructure:"prometheus"` | 	Prometheus           Prometheus `mapstructure:"prometheus"` | ||||||
| 	MaxConcurrentWorkers int        `mapstructure:"maxConcurrentWorkers"` | 	MaxConcurrentWorkers int        `mapstructure:"maxConcurrentWorkers"` | ||||||
| @ -231,6 +234,7 @@ type Auth struct { | |||||||
| 	RPC struct { | 	RPC struct { | ||||||
| 		RegisterIP   string `mapstructure:"registerIP"` | 		RegisterIP   string `mapstructure:"registerIP"` | ||||||
| 		ListenIP     string `mapstructure:"listenIP"` | 		ListenIP     string `mapstructure:"listenIP"` | ||||||
|  | 		AutoSetPorts bool   `mapstructure:"autoSetPorts"` | ||||||
| 		Ports        []int  `mapstructure:"ports"` | 		Ports        []int  `mapstructure:"ports"` | ||||||
| 	} `mapstructure:"rpc"` | 	} `mapstructure:"rpc"` | ||||||
| 	Prometheus  Prometheus `mapstructure:"prometheus"` | 	Prometheus  Prometheus `mapstructure:"prometheus"` | ||||||
| @ -243,6 +247,7 @@ type Conversation struct { | |||||||
| 	RPC struct { | 	RPC struct { | ||||||
| 		RegisterIP   string `mapstructure:"registerIP"` | 		RegisterIP   string `mapstructure:"registerIP"` | ||||||
| 		ListenIP     string `mapstructure:"listenIP"` | 		ListenIP     string `mapstructure:"listenIP"` | ||||||
|  | 		AutoSetPorts bool   `mapstructure:"autoSetPorts"` | ||||||
| 		Ports        []int  `mapstructure:"ports"` | 		Ports        []int  `mapstructure:"ports"` | ||||||
| 	} `mapstructure:"rpc"` | 	} `mapstructure:"rpc"` | ||||||
| 	Prometheus Prometheus `mapstructure:"prometheus"` | 	Prometheus Prometheus `mapstructure:"prometheus"` | ||||||
| @ -252,6 +257,7 @@ type Friend struct { | |||||||
| 	RPC struct { | 	RPC struct { | ||||||
| 		RegisterIP   string `mapstructure:"registerIP"` | 		RegisterIP   string `mapstructure:"registerIP"` | ||||||
| 		ListenIP     string `mapstructure:"listenIP"` | 		ListenIP     string `mapstructure:"listenIP"` | ||||||
|  | 		AutoSetPorts bool   `mapstructure:"autoSetPorts"` | ||||||
| 		Ports        []int  `mapstructure:"ports"` | 		Ports        []int  `mapstructure:"ports"` | ||||||
| 	} `mapstructure:"rpc"` | 	} `mapstructure:"rpc"` | ||||||
| 	Prometheus Prometheus `mapstructure:"prometheus"` | 	Prometheus Prometheus `mapstructure:"prometheus"` | ||||||
| @ -261,6 +267,7 @@ type Group struct { | |||||||
| 	RPC struct { | 	RPC struct { | ||||||
| 		RegisterIP   string `mapstructure:"registerIP"` | 		RegisterIP   string `mapstructure:"registerIP"` | ||||||
| 		ListenIP     string `mapstructure:"listenIP"` | 		ListenIP     string `mapstructure:"listenIP"` | ||||||
|  | 		AutoSetPorts bool   `mapstructure:"autoSetPorts"` | ||||||
| 		Ports        []int  `mapstructure:"ports"` | 		Ports        []int  `mapstructure:"ports"` | ||||||
| 	} `mapstructure:"rpc"` | 	} `mapstructure:"rpc"` | ||||||
| 	Prometheus                 Prometheus `mapstructure:"prometheus"` | 	Prometheus                 Prometheus `mapstructure:"prometheus"` | ||||||
| @ -271,6 +278,7 @@ type Msg struct { | |||||||
| 	RPC struct { | 	RPC struct { | ||||||
| 		RegisterIP   string `mapstructure:"registerIP"` | 		RegisterIP   string `mapstructure:"registerIP"` | ||||||
| 		ListenIP     string `mapstructure:"listenIP"` | 		ListenIP     string `mapstructure:"listenIP"` | ||||||
|  | 		AutoSetPorts bool   `mapstructure:"autoSetPorts"` | ||||||
| 		Ports        []int  `mapstructure:"ports"` | 		Ports        []int  `mapstructure:"ports"` | ||||||
| 	} `mapstructure:"rpc"` | 	} `mapstructure:"rpc"` | ||||||
| 	Prometheus   Prometheus `mapstructure:"prometheus"` | 	Prometheus   Prometheus `mapstructure:"prometheus"` | ||||||
| @ -281,6 +289,7 @@ type Third struct { | |||||||
| 	RPC struct { | 	RPC struct { | ||||||
| 		RegisterIP   string `mapstructure:"registerIP"` | 		RegisterIP   string `mapstructure:"registerIP"` | ||||||
| 		ListenIP     string `mapstructure:"listenIP"` | 		ListenIP     string `mapstructure:"listenIP"` | ||||||
|  | 		AutoSetPorts bool   `mapstructure:"autoSetPorts"` | ||||||
| 		Ports        []int  `mapstructure:"ports"` | 		Ports        []int  `mapstructure:"ports"` | ||||||
| 	} `mapstructure:"rpc"` | 	} `mapstructure:"rpc"` | ||||||
| 	Prometheus Prometheus `mapstructure:"prometheus"` | 	Prometheus Prometheus `mapstructure:"prometheus"` | ||||||
| @ -330,6 +339,7 @@ type User struct { | |||||||
| 	RPC struct { | 	RPC struct { | ||||||
| 		RegisterIP   string `mapstructure:"registerIP"` | 		RegisterIP   string `mapstructure:"registerIP"` | ||||||
| 		ListenIP     string `mapstructure:"listenIP"` | 		ListenIP     string `mapstructure:"listenIP"` | ||||||
|  | 		AutoSetPorts bool   `mapstructure:"autoSetPorts"` | ||||||
| 		Ports        []int  `mapstructure:"ports"` | 		Ports        []int  `mapstructure:"ports"` | ||||||
| 	} `mapstructure:"rpc"` | 	} `mapstructure:"rpc"` | ||||||
| 	Prometheus Prometheus `mapstructure:"prometheus"` | 	Prometheus Prometheus `mapstructure:"prometheus"` | ||||||
|  | |||||||
| @ -16,6 +16,7 @@ package startrpc | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 	"errors" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||||
| 	"github.com/openimsdk/tools/utils/datautil" | 	"github.com/openimsdk/tools/utils/datautil" | ||||||
| @ -27,7 +28,6 @@ import ( | |||||||
| 	"syscall" | 	"syscall" | ||||||
| 	"time" | 	"time" | ||||||
| 
 | 
 | ||||||
| 	"github.com/openimsdk/open-im-server/v3/internal/tools/addr" |  | ||||||
| 	kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" | 	kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | ||||||
| 	"github.com/openimsdk/tools/discovery" | 	"github.com/openimsdk/tools/discovery" | ||||||
| @ -41,11 +41,60 @@ import ( | |||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| // Start rpc server. | // Start rpc server. | ||||||
| func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusConfig *config.Prometheus, listenIP string, | func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusConfig *config.Prometheus, listenIP, | ||||||
| 	index int, rpcRegisterName string, share *config.Share, config T, rpcFn func(ctx context.Context, | 	registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, share *config.Share, config T, rpcFn func(ctx context.Context, | ||||||
| 	config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { | 	config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { | ||||||
| 
 | 
 | ||||||
| 	rpcTcpAddr := net.JoinHostPort(network.GetListenIP(listenIP), "0") | 	var ( | ||||||
|  | 		rpcTcpAddr string | ||||||
|  | 		netDone    = make(chan struct{}, 2) | ||||||
|  | 		netErr     error | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	if !autoSetPorts { | ||||||
|  | 		rpcPort, err := datautil.GetElemByIndex(rpcPorts, index) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 		rpcTcpAddr = net.JoinHostPort(network.GetListenIP(listenIP), strconv.Itoa(rpcPort)) | ||||||
|  | 	} else { | ||||||
|  | 		rpcTcpAddr = net.JoinHostPort(network.GetListenIP(listenIP), "0") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// var reg *prometheus.Registry | ||||||
|  | 	// var metric *grpcprometheus.ServerMetrics | ||||||
|  | 	if prometheusConfig.Enable { | ||||||
|  | 		// cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share) | ||||||
|  | 		// reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics) | ||||||
|  | 		// options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()), | ||||||
|  | 		//	grpc.UnaryInterceptor(metric.UnaryServerInterceptor())) | ||||||
|  | 		options = append( | ||||||
|  | 			options, mw.GrpcServer(), | ||||||
|  | 			prommetricsUnaryInterceptor(rpcRegisterName), | ||||||
|  | 			prommetricsStreamInterceptor(rpcRegisterName), | ||||||
|  | 		) | ||||||
|  | 		prometheusPort, err := datautil.GetElemByIndex(prometheusConfig.Ports, index) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 		cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, discovery) | ||||||
|  | 		go func() { | ||||||
|  | 			if err := prommetrics.RpcInit(cs, prometheusPort); err != nil && !errors.Is(err, http.ErrServerClosed) { | ||||||
|  | 				netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort)) | ||||||
|  | 				netDone <- struct{}{} | ||||||
|  | 			} | ||||||
|  | 			// metric.InitializeMetrics(srv) | ||||||
|  | 			// Create a HTTP server for prometheus. | ||||||
|  | 			// httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} | ||||||
|  | 			// if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { | ||||||
|  | 			//	netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr) | ||||||
|  | 			//	netDone <- struct{}{} | ||||||
|  | 			// } | ||||||
|  | 		}() | ||||||
|  | 	} else { | ||||||
|  | 		options = append(options, mw.GrpcServer()) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	listener, err := net.Listen( | 	listener, err := net.Listen( | ||||||
| 		"tcp", | 		"tcp", | ||||||
| 		rpcTcpAddr, | 		rpcTcpAddr, | ||||||
| @ -54,8 +103,8 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo | |||||||
| 		return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr) | 		return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	h, portStr, _ := net.SplitHostPort(listener.Addr().String()) | 	_, portStr, _ := net.SplitHostPort(listener.Addr().String()) | ||||||
| 	host, _ := addr.Extract(h) | 	registerIP = network.GetListenIP(registerIP) | ||||||
| 	port, _ := strconv.Atoi(portStr) | 	port, _ := strconv.Atoi(portStr) | ||||||
| 
 | 
 | ||||||
| 	log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", portStr, | 	log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", portStr, | ||||||
| @ -70,22 +119,6 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo | |||||||
| 	defer client.Close() | 	defer client.Close() | ||||||
| 	client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) | 	client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) | ||||||
| 
 | 
 | ||||||
| 	// var reg *prometheus.Registry |  | ||||||
| 	// var metric *grpcprometheus.ServerMetrics |  | ||||||
| 	if prometheusConfig.Enable { |  | ||||||
| 		// cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share) |  | ||||||
| 		// reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics) |  | ||||||
| 		// options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()), |  | ||||||
| 		//	grpc.UnaryInterceptor(metric.UnaryServerInterceptor())) |  | ||||||
| 		options = append( |  | ||||||
| 			options, mw.GrpcServer(), |  | ||||||
| 			prommetricsUnaryInterceptor(rpcRegisterName), |  | ||||||
| 			prommetricsStreamInterceptor(rpcRegisterName), |  | ||||||
| 		) |  | ||||||
| 	} else { |  | ||||||
| 		options = append(options, mw.GrpcServer()) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	srv := grpc.NewServer(options...) | 	srv := grpc.NewServer(options...) | ||||||
| 
 | 
 | ||||||
| 	err = rpcFn(ctx, config, client, srv) | 	err = rpcFn(ctx, config, client, srv) | ||||||
| @ -95,7 +128,7 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo | |||||||
| 
 | 
 | ||||||
| 	err = client.Register( | 	err = client.Register( | ||||||
| 		rpcRegisterName, | 		rpcRegisterName, | ||||||
| 		host, | 		registerIP, | ||||||
| 		port, | 		port, | ||||||
| 		grpc.WithTransportCredentials(insecure.NewCredentials()), | 		grpc.WithTransportCredentials(insecure.NewCredentials()), | ||||||
| 	) | 	) | ||||||
| @ -103,33 +136,6 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo | |||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	var ( |  | ||||||
| 		netDone = make(chan struct{}, 2) |  | ||||||
| 		netErr  error |  | ||||||
| 	) |  | ||||||
| 	if prometheusConfig.Enable { |  | ||||||
| 		go func() { |  | ||||||
| 			prometheusPort, err := datautil.GetElemByIndex(prometheusConfig.Ports, index) |  | ||||||
| 			if err != nil { |  | ||||||
| 				netErr = err |  | ||||||
| 				netDone <- struct{}{} |  | ||||||
| 				return |  | ||||||
| 			} |  | ||||||
| 			cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, discovery) |  | ||||||
| 			if err := prommetrics.RpcInit(cs, prometheusPort); err != nil && err != http.ErrServerClosed { |  | ||||||
| 				netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort)) |  | ||||||
| 				netDone <- struct{}{} |  | ||||||
| 			} |  | ||||||
| 			// metric.InitializeMetrics(srv) |  | ||||||
| 			// Create a HTTP server for prometheus. |  | ||||||
| 			// httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} |  | ||||||
| 			// if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { |  | ||||||
| 			//	netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr) |  | ||||||
| 			//	netDone <- struct{}{} |  | ||||||
| 			// } |  | ||||||
| 		}() |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	go func() { | 	go func() { | ||||||
| 		err := srv.Serve(listener) | 		err := srv.Serve(listener) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user