feat: compatible autoSetPorts

This commit is contained in:
icey-yu 2024-12-05 12:23:00 +08:00
parent 17d5df88d1
commit 4fd0b55c64
22 changed files with 150 additions and 243 deletions

View File

@ -1,6 +1,11 @@
rpc: rpc:
# The IP address where this RPC service registers itself; if left blank, it defaults to the internal network IP # The IP address where this RPC service registers itself; if left blank, it defaults to the internal network IP
registerIP: registerIP:
# autoSetPorts indicates whether to automatically set the ports
autoSetPorts: true
# List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
# It will only take effect when autoSetPorts is set to false.
ports: [ 10140, 10141, 10142, 10143, 10144, 10145, 10146, 10147, 10148, 10149, 10150, 10151, 10152, 10153, 10154, 10155 ]
prometheus: prometheus:
# Enable or disable Prometheus monitoring # Enable or disable Prometheus monitoring

View File

@ -3,6 +3,12 @@ rpc:
registerIP: registerIP:
# IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
listenIP: 0.0.0.0 listenIP: 0.0.0.0
# autoSetPorts indicates whether to automatically set the ports
autoSetPorts: true
# List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
# It will only take effect when autoSetPorts is set to false.
ports: [ 10170, 10171, 10172, 10173, 10174, 10175, 10176, 10177, 10178, 10179, 10180, 10181, 10182, 10183, 10184, 10185 ]
prometheus: prometheus:
# Enable or disable Prometheus monitoring # Enable or disable Prometheus monitoring

View File

@ -3,6 +3,11 @@ rpc:
registerIP: registerIP:
# IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
listenIP: 0.0.0.0 listenIP: 0.0.0.0
# autoSetPorts indicates whether to automatically set the ports
autoSetPorts: true
# List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
# It will only take effect when autoSetPorts is set to false.
ports: [ 10200 ]
prometheus: prometheus:

View File

@ -3,6 +3,11 @@ rpc:
registerIP: registerIP:
# IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
listenIP: 0.0.0.0 listenIP: 0.0.0.0
# autoSetPorts indicates whether to automatically set the ports
autoSetPorts: true
# List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
# It will only take effect when autoSetPorts is set to false.
ports: [ 10220 ]
prometheus: prometheus:
# Enable or disable Prometheus monitoring # Enable or disable Prometheus monitoring

View File

@ -3,6 +3,11 @@ rpc:
registerIP: registerIP:
# IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
listenIP: 0.0.0.0 listenIP: 0.0.0.0
# autoSetPorts indicates whether to automatically set the ports
autoSetPorts: true
# List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
# It will only take effect when autoSetPorts is set to false.
ports: [ 10240 ]
prometheus: prometheus:
# Enable or disable Prometheus monitoring # Enable or disable Prometheus monitoring

View File

@ -3,6 +3,11 @@ rpc:
registerIP: registerIP:
# IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
listenIP: 0.0.0.0 listenIP: 0.0.0.0
# autoSetPorts indicates whether to automatically set the ports
autoSetPorts: true
# List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
# It will only take effect when autoSetPorts is set to false.
ports: [ 10260 ]
prometheus: prometheus:
# Enable or disable Prometheus monitoring # Enable or disable Prometheus monitoring

View File

@ -3,6 +3,11 @@ rpc:
registerIP: registerIP:
# IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
listenIP: 0.0.0.0 listenIP: 0.0.0.0
# autoSetPorts indicates whether to automatically set the ports
autoSetPorts: true
# List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
# It will only take effect when autoSetPorts is set to false.
ports: [ 10280 ]
prometheus: prometheus:
# Enable or disable Prometheus monitoring # Enable or disable Prometheus monitoring

View File

@ -3,6 +3,11 @@ rpc:
registerIP: registerIP:
# IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
listenIP: 0.0.0.0 listenIP: 0.0.0.0
# autoSetPorts indicates whether to automatically set the ports
autoSetPorts: true
# List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
# It will only take effect when autoSetPorts is set to false.
ports: [ 10300 ]
prometheus: prometheus:
# Enable or disable Prometheus monitoring # Enable or disable Prometheus monitoring

View File

@ -3,6 +3,11 @@ rpc:
registerIP: registerIP:
# Listening IP; 0.0.0.0 means both internal and external IPs are listened to, if blank, the internal network IP is automatically obtained by default # Listening IP; 0.0.0.0 means both internal and external IPs are listened to, if blank, the internal network IP is automatically obtained by default
listenIP: 0.0.0.0 listenIP: 0.0.0.0
# autoSetPorts indicates whether to automatically set the ports
autoSetPorts: true
# List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
# It will only take effect when autoSetPorts is set to false.
ports: [ 10320 ]
prometheus: prometheus:
# Whether to enable prometheus # Whether to enable prometheus

View File

@ -46,7 +46,8 @@ func (s *Server) InitServer(ctx context.Context, config *Config, disCov discover
func (s *Server) Start(ctx context.Context, index int, conf *Config) error { func (s *Server) Start(ctx context.Context, index int, conf *Config) error {
return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP, return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP,
index, conf.MsgGateway.RPC.RegisterIP,
conf.MsgGateway.RPC.AutoSetPorts, conf.MsgGateway.RPC.Ports, index,
conf.Share.RpcRegisterName.MessageGateway, conf.Share.RpcRegisterName.MessageGateway,
&conf.Share, &conf.Share,
conf, conf,

View File

@ -35,7 +35,8 @@ type Config struct {
// Start run ws server. // Start run ws server.
func Start(ctx context.Context, index int, conf *Config) error { func Start(ctx context.Context, index int, conf *Config) error {
log.CInfo(ctx, "MSG-GATEWAY server is initializing", log.CInfo(ctx, "MSG-GATEWAY server is initializing", "autoSetPorts", conf.MsgGateway.RPC.AutoSetPorts,
"rpcPorts", conf.MsgGateway.RPC.Ports,
"wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports) "wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports)
wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, index) wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, index)
if err != nil { if err != nil {

View File

@ -1,165 +0,0 @@
// addr provides functions to retrieve local IP addresses from device interfaces.
package addr
import (
"net"
"github.com/pkg/errors"
)
var (
// ErrIPNotFound no IP address found, and explicit IP not provided.
ErrIPNotFound = errors.New("no IP address found, and explicit IP not provided")
)
// IsLocal checks whether an IP belongs to one of the device's interfaces.
func IsLocal(addr string) bool {
// Extract the host
host, _, err := net.SplitHostPort(addr)
if err == nil {
addr = host
}
if addr == "localhost" {
return true
}
// Check against all local ips
for _, ip := range IPs() {
if addr == ip {
return true
}
}
return false
}
// Extract returns a valid IP address. If the address provided is a valid
// address, it will be returned directly. Otherwise, the available interfaces
// will be iterated over to find an IP address, preferably private.
func Extract(addr string) (string, error) {
// if addr is already specified then it's directly returned
if len(addr) > 0 && (addr != "0.0.0.0" && addr != "[::]" && addr != "::") {
return addr, nil
}
var (
addrs []net.Addr
loAddrs []net.Addr
)
ifaces, err := net.Interfaces()
if err != nil {
return "", errors.Wrap(err, "failed to get interfaces")
}
for _, iface := range ifaces {
ifaceAddrs, err := iface.Addrs()
if err != nil {
// ignore error, interface can disappear from system
continue
}
if iface.Flags&net.FlagLoopback != 0 {
loAddrs = append(loAddrs, ifaceAddrs...)
continue
}
addrs = append(addrs, ifaceAddrs...)
}
// Add loopback addresses to the end of the list
addrs = append(addrs, loAddrs...)
// Try to find private IP in list, public IP otherwise
ip, err := findIP(addrs)
if err != nil {
return "", err
}
return ip.String(), nil
}
// IPs returns all available interface IP addresses.
func IPs() []string {
ifaces, err := net.Interfaces()
if err != nil {
return nil
}
var ipAddrs []string
for _, i := range ifaces {
addrs, err := i.Addrs()
if err != nil {
continue
}
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
}
if ip == nil {
continue
}
ipAddrs = append(ipAddrs, ip.String())
}
}
return ipAddrs
}
// findIP will return the first private IP available in the list.
// If no private IP is available it will return the first public IP, if present.
// If no public IP is available, it will return the first loopback IP, if present.
func findIP(addresses []net.Addr) (net.IP, error) {
var publicIP net.IP
var localIP net.IP
for _, rawAddr := range addresses {
var ip net.IP
switch addr := rawAddr.(type) {
case *net.IPAddr:
ip = addr.IP
case *net.IPNet:
ip = addr.IP
default:
continue
}
if ip.IsLoopback() {
if localIP == nil {
localIP = ip
}
continue
}
if !ip.IsPrivate() {
if publicIP == nil {
publicIP = ip
}
continue
}
// Return private IP if available
return ip, nil
}
// Return public or virtual IP
if len(publicIP) > 0 {
return publicIP, nil
}
// Return local IP
if len(localIP) > 0 {
return localIP, nil
}
return nil, ErrIPNotFound
}

View File

@ -55,5 +55,6 @@ func (a *AuthRpcCmd) Exec() error {
func (a *AuthRpcCmd) runE() error { func (a *AuthRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP,
a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.AutoSetPorts, a.authConfig.RpcConfig.RPC.Ports,
a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig.Share, a.authConfig, auth.Start) a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig.Share, a.authConfig, auth.Start)
} }

View File

@ -57,5 +57,6 @@ func (a *ConversationRpcCmd) Exec() error {
func (a *ConversationRpcCmd) runE() error { func (a *ConversationRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP,
a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.AutoSetPorts, a.conversationConfig.RpcConfig.RPC.Ports,
a.Index(), a.conversationConfig.Share.RpcRegisterName.Conversation, &a.conversationConfig.Share, a.conversationConfig, conversation.Start) a.Index(), a.conversationConfig.Share.RpcRegisterName.Conversation, &a.conversationConfig.Share, a.conversationConfig, conversation.Start)
} }

View File

@ -58,5 +58,6 @@ func (a *FriendRpcCmd) Exec() error {
func (a *FriendRpcCmd) runE() error { func (a *FriendRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP,
a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.AutoSetPorts, a.relationConfig.RpcConfig.RPC.Ports,
a.Index(), a.relationConfig.Share.RpcRegisterName.Friend, &a.relationConfig.Share, a.relationConfig, relation.Start) a.Index(), a.relationConfig.Share.RpcRegisterName.Friend, &a.relationConfig.Share, a.relationConfig, relation.Start)
} }

View File

@ -59,5 +59,6 @@ func (a *GroupRpcCmd) Exec() error {
func (a *GroupRpcCmd) runE() error { func (a *GroupRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP,
a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.AutoSetPorts, a.groupConfig.RpcConfig.RPC.Ports,
a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, group.Start, versionctx.EnableVersionCtx()) a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, group.Start, versionctx.EnableVersionCtx())
} }

View File

@ -59,5 +59,6 @@ func (a *MsgRpcCmd) Exec() error {
func (a *MsgRpcCmd) runE() error { func (a *MsgRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP,
a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.AutoSetPorts, a.msgConfig.RpcConfig.RPC.Ports,
a.Index(), a.msgConfig.Share.RpcRegisterName.Msg, &a.msgConfig.Share, a.msgConfig, msg.Start) a.Index(), a.msgConfig.Share.RpcRegisterName.Msg, &a.msgConfig.Share, a.msgConfig, msg.Start)
} }

View File

@ -59,5 +59,6 @@ func (a *PushRpcCmd) Exec() error {
func (a *PushRpcCmd) runE() error { func (a *PushRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP,
a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.AutoSetPorts, a.pushConfig.RpcConfig.RPC.Ports,
a.Index(), a.pushConfig.Share.RpcRegisterName.Push, &a.pushConfig.Share, a.pushConfig, push.Start) a.Index(), a.pushConfig.Share.RpcRegisterName.Push, &a.pushConfig.Share, a.pushConfig, push.Start)
} }

View File

@ -58,5 +58,6 @@ func (a *ThirdRpcCmd) Exec() error {
func (a *ThirdRpcCmd) runE() error { func (a *ThirdRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP,
a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.AutoSetPorts, a.thirdConfig.RpcConfig.RPC.Ports,
a.Index(), a.thirdConfig.Share.RpcRegisterName.Third, &a.thirdConfig.Share, a.thirdConfig, third.Start) a.Index(), a.thirdConfig.Share.RpcRegisterName.Third, &a.thirdConfig.Share, a.thirdConfig, third.Start)
} }

View File

@ -59,5 +59,6 @@ func (a *UserRpcCmd) Exec() error {
func (a *UserRpcCmd) runE() error { func (a *UserRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP,
a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.AutoSetPorts, a.userConfig.RpcConfig.RPC.Ports,
a.Index(), a.userConfig.Share.RpcRegisterName.User, &a.userConfig.Share, a.userConfig, user.Start) a.Index(), a.userConfig.Share.RpcRegisterName.User, &a.userConfig.Share, a.userConfig, user.Start)
} }

View File

@ -176,8 +176,9 @@ type Prometheus struct {
type MsgGateway struct { type MsgGateway struct {
RPC struct { RPC struct {
RegisterIP string `mapstructure:"registerIP"` RegisterIP string `mapstructure:"registerIP"`
Ports []int `mapstructure:"ports"` AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"` } `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"` Prometheus Prometheus `mapstructure:"prometheus"`
ListenIP string `mapstructure:"listenIP"` ListenIP string `mapstructure:"listenIP"`
@ -195,8 +196,10 @@ type MsgTransfer struct {
type Push struct { type Push struct {
RPC struct { RPC struct {
RegisterIP string `mapstructure:"registerIP"` RegisterIP string `mapstructure:"registerIP"`
ListenIP string `mapstructure:"listenIP"` ListenIP string `mapstructure:"listenIP"`
AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"` } `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"` Prometheus Prometheus `mapstructure:"prometheus"`
MaxConcurrentWorkers int `mapstructure:"maxConcurrentWorkers"` MaxConcurrentWorkers int `mapstructure:"maxConcurrentWorkers"`
@ -229,9 +232,10 @@ type Push struct {
type Auth struct { type Auth struct {
RPC struct { RPC struct {
RegisterIP string `mapstructure:"registerIP"` RegisterIP string `mapstructure:"registerIP"`
ListenIP string `mapstructure:"listenIP"` ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"` AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"` } `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"` Prometheus Prometheus `mapstructure:"prometheus"`
TokenPolicy struct { TokenPolicy struct {
@ -241,27 +245,30 @@ type Auth struct {
type Conversation struct { type Conversation struct {
RPC struct { RPC struct {
RegisterIP string `mapstructure:"registerIP"` RegisterIP string `mapstructure:"registerIP"`
ListenIP string `mapstructure:"listenIP"` ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"` AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"` } `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"` Prometheus Prometheus `mapstructure:"prometheus"`
} }
type Friend struct { type Friend struct {
RPC struct { RPC struct {
RegisterIP string `mapstructure:"registerIP"` RegisterIP string `mapstructure:"registerIP"`
ListenIP string `mapstructure:"listenIP"` ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"` AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"` } `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"` Prometheus Prometheus `mapstructure:"prometheus"`
} }
type Group struct { type Group struct {
RPC struct { RPC struct {
RegisterIP string `mapstructure:"registerIP"` RegisterIP string `mapstructure:"registerIP"`
ListenIP string `mapstructure:"listenIP"` ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"` AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"` } `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"` Prometheus Prometheus `mapstructure:"prometheus"`
EnableHistoryForNewMembers bool `mapstructure:"enableHistoryForNewMembers"` EnableHistoryForNewMembers bool `mapstructure:"enableHistoryForNewMembers"`
@ -269,9 +276,10 @@ type Group struct {
type Msg struct { type Msg struct {
RPC struct { RPC struct {
RegisterIP string `mapstructure:"registerIP"` RegisterIP string `mapstructure:"registerIP"`
ListenIP string `mapstructure:"listenIP"` ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"` AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"` } `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"` Prometheus Prometheus `mapstructure:"prometheus"`
FriendVerify bool `mapstructure:"friendVerify"` FriendVerify bool `mapstructure:"friendVerify"`
@ -279,9 +287,10 @@ type Msg struct {
type Third struct { type Third struct {
RPC struct { RPC struct {
RegisterIP string `mapstructure:"registerIP"` RegisterIP string `mapstructure:"registerIP"`
ListenIP string `mapstructure:"listenIP"` ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"` AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"` } `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"` Prometheus Prometheus `mapstructure:"prometheus"`
Object struct { Object struct {
@ -328,9 +337,10 @@ type Kodo struct {
type User struct { type User struct {
RPC struct { RPC struct {
RegisterIP string `mapstructure:"registerIP"` RegisterIP string `mapstructure:"registerIP"`
ListenIP string `mapstructure:"listenIP"` ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"` AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"` } `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"` Prometheus Prometheus `mapstructure:"prometheus"`
} }

View File

@ -16,6 +16,7 @@ package startrpc
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
@ -27,7 +28,6 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/openimsdk/open-im-server/v3/internal/tools/addr"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery"
@ -41,11 +41,60 @@ import (
) )
// Start rpc server. // Start rpc server.
func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusConfig *config.Prometheus, listenIP string, func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusConfig *config.Prometheus, listenIP,
index int, rpcRegisterName string, share *config.Share, config T, rpcFn func(ctx context.Context, registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, share *config.Share, config T, rpcFn func(ctx context.Context,
config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error {
var (
rpcTcpAddr string
netDone = make(chan struct{}, 2)
netErr error
)
if !autoSetPorts {
rpcPort, err := datautil.GetElemByIndex(rpcPorts, index)
if err != nil {
return err
}
rpcTcpAddr = net.JoinHostPort(network.GetListenIP(listenIP), strconv.Itoa(rpcPort))
} else {
rpcTcpAddr = net.JoinHostPort(network.GetListenIP(listenIP), "0")
}
// var reg *prometheus.Registry
// var metric *grpcprometheus.ServerMetrics
if prometheusConfig.Enable {
// cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share)
// reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics)
// options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()),
// grpc.UnaryInterceptor(metric.UnaryServerInterceptor()))
options = append(
options, mw.GrpcServer(),
prommetricsUnaryInterceptor(rpcRegisterName),
prommetricsStreamInterceptor(rpcRegisterName),
)
prometheusPort, err := datautil.GetElemByIndex(prometheusConfig.Ports, index)
if err != nil {
return err
}
cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share)
go func() {
if err := prommetrics.RpcInit(cs, prometheusPort); err != nil && !errors.Is(err, http.ErrServerClosed) {
netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort))
netDone <- struct{}{}
}
// metric.InitializeMetrics(srv)
// Create a HTTP server for prometheus.
// httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)}
// if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
// netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr)
// netDone <- struct{}{}
// }
}()
} else {
options = append(options, mw.GrpcServer())
}
rpcTcpAddr := net.JoinHostPort(network.GetListenIP(listenIP), "0")
listener, err := net.Listen( listener, err := net.Listen(
"tcp", "tcp",
rpcTcpAddr, rpcTcpAddr,
@ -54,8 +103,8 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo
return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr) return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr)
} }
h, portStr, _ := net.SplitHostPort(listener.Addr().String()) _, portStr, _ := net.SplitHostPort(listener.Addr().String())
host, _ := addr.Extract(h) registerIP = network.GetListenIP(registerIP)
port, _ := strconv.Atoi(portStr) port, _ := strconv.Atoi(portStr)
log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", portStr, log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", portStr,
@ -70,22 +119,6 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo
defer client.Close() defer client.Close()
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
// var reg *prometheus.Registry
// var metric *grpcprometheus.ServerMetrics
if prometheusConfig.Enable {
// cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share)
// reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics)
// options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()),
// grpc.UnaryInterceptor(metric.UnaryServerInterceptor()))
options = append(
options, mw.GrpcServer(),
prommetricsUnaryInterceptor(rpcRegisterName),
prommetricsStreamInterceptor(rpcRegisterName),
)
} else {
options = append(options, mw.GrpcServer())
}
srv := grpc.NewServer(options...) srv := grpc.NewServer(options...)
err = rpcFn(ctx, config, client, srv) err = rpcFn(ctx, config, client, srv)
@ -95,7 +128,7 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo
err = client.Register( err = client.Register(
rpcRegisterName, rpcRegisterName,
host, registerIP,
port, port,
grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithTransportCredentials(insecure.NewCredentials()),
) )
@ -103,33 +136,6 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo
return err return err
} }
var (
netDone = make(chan struct{}, 2)
netErr error
)
if prometheusConfig.Enable {
go func() {
prometheusPort, err := datautil.GetElemByIndex(prometheusConfig.Ports, index)
if err != nil {
netErr = err
netDone <- struct{}{}
return
}
cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share)
if err := prommetrics.RpcInit(cs, prometheusPort); err != nil && err != http.ErrServerClosed {
netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort))
netDone <- struct{}{}
}
// metric.InitializeMetrics(srv)
// Create a HTTP server for prometheus.
// httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)}
// if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
// netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr)
// netDone <- struct{}{}
// }
}()
}
go func() { go func() {
err := srv.Serve(listener) err := srv.Serve(listener)
if err != nil { if err != nil {