From 1e68f99d1180afe674c8e09870a4903fb60e1187 Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Tue, 10 Dec 2024 11:43:19 +0800 Subject: [PATCH] fix: add autoPort && prometheus port discovery --- config/openim-api.yml | 3 + config/openim-msggateway.yml | 6 +- config/openim-msgtransfer.yml | 4 +- config/openim-push.yml | 4 + config/openim-rpc-auth.yml | 4 + config/openim-rpc-conversation.yml | 4 + config/openim-rpc-friend.yml | 4 + config/openim-rpc-group.yml | 4 + config/openim-rpc-msg.yml | 4 + config/openim-rpc-third.yml | 4 + config/openim-rpc-user.yml | 6 +- config/prometheus.yml | 127 ++++++++++------ docker-compose.yml | 111 ++++++++------ go.mod | 2 +- internal/api/init.go | 77 ++++++++-- internal/api/prometheus_discovery.go | 114 ++++++++++++++ internal/api/router.go | 35 +++-- internal/msggateway/hub_server.go | 1 + internal/msggateway/init.go | 9 +- internal/msgtransfer/init.go | 91 +++++++++-- pkg/common/cmd/auth.go | 2 +- pkg/common/cmd/conversation.go | 2 +- pkg/common/cmd/friend.go | 2 +- pkg/common/cmd/group.go | 2 +- pkg/common/cmd/msg.go | 2 +- pkg/common/cmd/push.go | 2 +- pkg/common/cmd/third.go | 2 +- pkg/common/cmd/user.go | 2 +- pkg/common/config/config.go | 76 ++++++---- pkg/common/config/constant.go | 5 +- pkg/common/config/load_config_test.go | 8 + .../discoveryregister/discoveryregister.go | 10 -- pkg/common/prommetrics/api.go | 5 +- pkg/common/prommetrics/discovery.go | 31 ++++ pkg/common/prommetrics/prommetrics.go | 6 +- pkg/common/prommetrics/rpc.go | 5 +- pkg/common/prommetrics/transfer.go | 5 +- pkg/common/startrpc/start.go | 142 ++++++++++++------ 38 files changed, 670 insertions(+), 253 deletions(-) create mode 100644 internal/api/prometheus_discovery.go create mode 100644 pkg/common/prommetrics/discovery.go diff --git a/config/openim-api.yml b/config/openim-api.yml index 4c38e1005..a23b5fb31 100644 --- a/config/openim-api.yml +++ b/config/openim-api.yml @@ -10,7 +10,10 @@ api: prometheus: # Whether to enable prometheus enable: true + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true # Prometheus listening ports, must match the number of api.ports + # It will only take effect when autoSetPorts is set to false. ports: [ 12002 ] # This address can be accessed via a browser grafanaURL: http://127.0.0.1:13000/ diff --git a/config/openim-msggateway.yml b/config/openim-msggateway.yml index 6c46b52a8..d374ce3c7 100644 --- a/config/openim-msggateway.yml +++ b/config/openim-msggateway.yml @@ -1,13 +1,17 @@ rpc: # The IP address where this RPC service registers itself; if left blank, it defaults to the internal network IP - registerIP: + registerIP: + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports + # It will only take effect when autoSetPorts is set to false. ports: [ 10140, 10141, 10142, 10143, 10144, 10145, 10146, 10147, 10148, 10149, 10150, 10151, 10152, 10153, 10154, 10155 ] prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12140, 12141, 12142, 12143, 12144, 12145, 12146, 12147, 12148, 12149, 12150, 12151, 12152, 12153, 12154, 12155 ] # IP address that the RPC/WebSocket service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP diff --git a/config/openim-msgtransfer.yml b/config/openim-msgtransfer.yml index 94ed073d8..39b23b222 100644 --- a/config/openim-msgtransfer.yml +++ b/config/openim-msgtransfer.yml @@ -1,6 +1,8 @@ prometheus: # Enable or disable Prometheus monitoring enable: true + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true # List of ports that Prometheus listens on; each port corresponds to an instance of monitoring. Ensure these are managed accordingly - # Because four instances have been launched, four ports need to be specified + # It will only take effect when autoSetPorts is set to false. ports: [ 12020, 12021, 12022, 12023, 12024, 12025, 12026, 12027, 12028, 12029, 12030, 12031, 12032, 12033, 12034, 12035 ] diff --git a/config/openim-push.yml b/config/openim-push.yml index 70e67add2..1bb84a172 100644 --- a/config/openim-push.yml +++ b/config/openim-push.yml @@ -3,13 +3,17 @@ rpc: registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports + # It will only take effect when autoSetPorts is set to false. ports: [ 10170, 10171, 10172, 10173, 10174, 10175, 10176, 10177, 10178, 10179, 10180, 10181, 10182, 10183, 10184, 10185 ] prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12170, 12171, 12172, 12173, 12174, 12175, 12176, 12177, 12178, 12179, 12180, 12182, 12183, 12184, 12185, 12186 ] maxConcurrentWorkers: 3 diff --git a/config/openim-rpc-auth.yml b/config/openim-rpc-auth.yml index 496803e43..d6e234b63 100644 --- a/config/openim-rpc-auth.yml +++ b/config/openim-rpc-auth.yml @@ -3,13 +3,17 @@ rpc: registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports + # It will only take effect when autoSetPorts is set to false. ports: [ 10200 ] prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12200 ] tokenPolicy: diff --git a/config/openim-rpc-conversation.yml b/config/openim-rpc-conversation.yml index 3581d7e19..0636a76e3 100644 --- a/config/openim-rpc-conversation.yml +++ b/config/openim-rpc-conversation.yml @@ -3,11 +3,15 @@ rpc: registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports + # It will only take effect when autoSetPorts is set to false. ports: [ 10220 ] prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12220 ] diff --git a/config/openim-rpc-friend.yml b/config/openim-rpc-friend.yml index 3022c09f3..e2b150cec 100644 --- a/config/openim-rpc-friend.yml +++ b/config/openim-rpc-friend.yml @@ -3,11 +3,15 @@ rpc: registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports + # It will only take effect when autoSetPorts is set to false. ports: [ 10240 ] prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12240 ] diff --git a/config/openim-rpc-group.yml b/config/openim-rpc-group.yml index 9a634d12f..a8c2d5ec1 100644 --- a/config/openim-rpc-group.yml +++ b/config/openim-rpc-group.yml @@ -3,13 +3,17 @@ rpc: registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports + # It will only take effect when autoSetPorts is set to false. ports: [ 10260 ] prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12260 ] diff --git a/config/openim-rpc-msg.yml b/config/openim-rpc-msg.yml index 82d6e2f53..fdb6d8035 100644 --- a/config/openim-rpc-msg.yml +++ b/config/openim-rpc-msg.yml @@ -3,13 +3,17 @@ rpc: registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports + # It will only take effect when autoSetPorts is set to false. ports: [ 10280 ] prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12280 ] diff --git a/config/openim-rpc-third.yml b/config/openim-rpc-third.yml index 639affe5b..50088fc03 100644 --- a/config/openim-rpc-third.yml +++ b/config/openim-rpc-third.yml @@ -3,13 +3,17 @@ rpc: registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports + # It will only take effect when autoSetPorts is set to false. ports: [ 10300 ] prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12300 ] diff --git a/config/openim-rpc-user.yml b/config/openim-rpc-user.yml index 798105472..7da94ca0d 100644 --- a/config/openim-rpc-user.yml +++ b/config/openim-rpc-user.yml @@ -3,11 +3,15 @@ rpc: registerIP: # Listening IP; 0.0.0.0 means both internal and external IPs are listened to, if blank, the internal network IP is automatically obtained by default listenIP: 0.0.0.0 - # Listening ports; if multiple are configured, multiple instances will be launched, and must be consistent with the number of prometheus.ports + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true + # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports + # It will only take effect when autoSetPorts is set to false. ports: [ 10320 ] prometheus: # Whether to enable prometheus enable: true # Prometheus listening ports, must be consistent with the number of rpc.ports + # It will only take effect when autoSetPorts is set to false. ports: [ 12320 ] diff --git a/config/prometheus.yml b/config/prometheus.yml index ab427ee82..6fb112824 100644 --- a/config/prometheus.yml +++ b/config/prometheus.yml @@ -26,61 +26,94 @@ scrape_configs: - job_name: node_exporter static_configs: - targets: [ internal_ip:20500 ] + - job_name: openimserver-openim-api - static_configs: - - targets: [ internal_ip:12002 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/api" +# static_configs: +# - targets: [ internal_ip:12002 ] +# labels: +# namespace: default + - job_name: openimserver-openim-msggateway - static_configs: - - targets: [ internal_ip:12140 ] -# - targets: [ internal_ip:12140, internal_ip:12141, internal_ip:12142, internal_ip:12143, internal_ip:12144, internal_ip:12145, internal_ip:12146, internal_ip:12147, internal_ip:12148, internal_ip:12149, internal_ip:12150, internal_ip:12151, internal_ip:12152, internal_ip:12153, internal_ip:12154, internal_ip:12155 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/msg_gateway" +# static_configs: +# - targets: [ internal_ip:12140 ] +# # - targets: [ internal_ip:12140, internal_ip:12141, internal_ip:12142, internal_ip:12143, internal_ip:12144, internal_ip:12145, internal_ip:12146, internal_ip:12147, internal_ip:12148, internal_ip:12149, internal_ip:12150, internal_ip:12151, internal_ip:12152, internal_ip:12153, internal_ip:12154, internal_ip:12155 ] +# labels: +# namespace: default + - job_name: openimserver-openim-msgtransfer - static_configs: - - targets: [ internal_ip:12020, internal_ip:12021, internal_ip:12022, internal_ip:12023, internal_ip:12024, internal_ip:12025, internal_ip:12026, internal_ip:12027 ] -# - targets: [ internal_ip:12020, internal_ip:12021, internal_ip:12022, internal_ip:12023, internal_ip:12024, internal_ip:12025, internal_ip:12026, internal_ip:12027, internal_ip:12028, internal_ip:12029, internal_ip:12030, internal_ip:12031, internal_ip:12032, internal_ip:12033, internal_ip:12034, internal_ip:12035 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/msg_transfer" +# static_configs: +# - targets: [ internal_ip:12020, internal_ip:12021, internal_ip:12022, internal_ip:12023, internal_ip:12024, internal_ip:12025, internal_ip:12026, internal_ip:12027 ] +# # - targets: [ internal_ip:12020, internal_ip:12021, internal_ip:12022, internal_ip:12023, internal_ip:12024, internal_ip:12025, internal_ip:12026, internal_ip:12027, internal_ip:12028, internal_ip:12029, internal_ip:12030, internal_ip:12031, internal_ip:12032, internal_ip:12033, internal_ip:12034, internal_ip:12035 ] +# labels: +# namespace: default + - job_name: openimserver-openim-push - static_configs: - - targets: [ internal_ip:12170, internal_ip:12171, internal_ip:12172, internal_ip:12173, internal_ip:12174, internal_ip:12175, internal_ip:12176, internal_ip:12177 ] -# - targets: [ internal_ip:12170, internal_ip:12171, internal_ip:12172, internal_ip:12173, internal_ip:12174, internal_ip:12175, internal_ip:12176, internal_ip:12177, internal_ip:12178, internal_ip:12179, internal_ip:12180, internal_ip:12182, internal_ip:12183, internal_ip:12184, internal_ip:12185, internal_ip:12186 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/push" +# static_configs: +# - targets: [ internal_ip:12170, internal_ip:12171, internal_ip:12172, internal_ip:12173, internal_ip:12174, internal_ip:12175, internal_ip:12176, internal_ip:12177 ] +## - targets: [ internal_ip:12170, internal_ip:12171, internal_ip:12172, internal_ip:12173, internal_ip:12174, internal_ip:12175, internal_ip:12176, internal_ip:12177, internal_ip:12178, internal_ip:12179, internal_ip:12180, internal_ip:12182, internal_ip:12183, internal_ip:12184, internal_ip:12185, internal_ip:12186 ] +# labels: +# namespace: default + - job_name: openimserver-openim-rpc-auth - static_configs: - - targets: [ internal_ip:12200 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/auth" +# static_configs: +# - targets: [ internal_ip:12200 ] +# labels: +# namespace: default + - job_name: openimserver-openim-rpc-conversation - static_configs: - - targets: [ internal_ip:12220 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/conversation" +# static_configs: +# - targets: [ internal_ip:12220 ] +# labels: +# namespace: default + - job_name: openimserver-openim-rpc-friend - static_configs: - - targets: [ internal_ip:12240 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/friend" +# static_configs: +# - targets: [ internal_ip:12240 ] +# labels: +# namespace: default + - job_name: openimserver-openim-rpc-group - static_configs: - - targets: [ internal_ip:12260 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/group" +# static_configs: +# - targets: [ internal_ip:12260 ] +# labels: +# namespace: default. + - job_name: openimserver-openim-rpc-msg - static_configs: - - targets: [ internal_ip:12280 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/msg" +# static_configs: +# - targets: [ internal_ip:12280 ] +# labels: +# namespace: default + - job_name: openimserver-openim-rpc-third - static_configs: - - targets: [ internal_ip:12300 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/third" +# static_configs: +# - targets: [ internal_ip:12300 ] +# labels: +# namespace: default + - job_name: openimserver-openim-rpc-user - static_configs: - - targets: [ internal_ip:12320 ] - labels: - namespace: default \ No newline at end of file + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/user" +# static_configs: +# - targets: [ internal_ip:12320 ] +# labels: +# namespace: default \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 8d25383bc..0c022c91e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -146,49 +146,68 @@ services: networks: - openim -# prometheus: -# image: ${PROMETHEUS_IMAGE} -# container_name: prometheus -# restart: always -# user: root -# volumes: -# - ./config/prometheus.yml:/etc/prometheus/prometheus.yml -# - ./config/instance-down-rules.yml:/etc/prometheus/instance-down-rules.yml -# - ${DATA_DIR}/components/prometheus/data:/prometheus -# command: -# - '--config.file=/etc/prometheus/prometheus.yml' -# - '--storage.tsdb.path=/prometheus' -# ports: -# - "19091:9090" -# networks: -# - openim -# -# alertmanager: -# image: ${ALERTMANAGER_IMAGE} -# container_name: alertmanager -# restart: always -# volumes: -# - ./config/alertmanager.yml:/etc/alertmanager/alertmanager.yml -# - ./config/email.tmpl:/etc/alertmanager/email.tmpl -# ports: -# - "19093:9093" -# networks: -# - openim -# -# grafana: -# image: ${GRAFANA_IMAGE} -# container_name: grafana -# user: root -# restart: always -# environment: -# - GF_SECURITY_ALLOW_EMBEDDING=true -# - GF_SESSION_COOKIE_SAMESITE=none -# - GF_SESSION_COOKIE_SECURE=true -# - GF_AUTH_ANONYMOUS_ENABLED=true -# - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin -# ports: -# - "13000:3000" -# volumes: -# - ${DATA_DIR:-./}/components/grafana:/var/lib/grafana -# networks: -# - openim + prometheus: + image: ${PROMETHEUS_IMAGE} + container_name: prometheus + restart: always + user: root + profiles: + - m + volumes: + - ./config/prometheus.yml:/etc/prometheus/prometheus.yml + - ./config/instance-down-rules.yml:/etc/prometheus/instance-down-rules.yml + - ${DATA_DIR}/components/prometheus/data:/prometheus + command: + - '--config.file=/etc/prometheus/prometheus.yml' + - '--storage.tsdb.path=/prometheus' + - '--web.listen-address=:${PROMETHEUS_PORT}' + network_mode: host + + alertmanager: + image: ${ALERTMANAGER_IMAGE} + container_name: alertmanager + restart: always + profiles: + - m + volumes: + - ./config/alertmanager.yml:/etc/alertmanager/alertmanager.yml + - ./config/email.tmpl:/etc/alertmanager/email.tmpl + command: + - '--config.file=/etc/alertmanager/alertmanager.yml' + - '--web.listen-address=:${ALERTMANAGER_PORT}' + network_mode: host + + grafana: + image: ${GRAFANA_IMAGE} + container_name: grafana + user: root + restart: always + profiles: + - m + environment: + - GF_SECURITY_ALLOW_EMBEDDING=true + - GF_SESSION_COOKIE_SAMESITE=none + - GF_SESSION_COOKIE_SECURE=true + - GF_AUTH_ANONYMOUS_ENABLED=true + - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin + - GF_SERVER_HTTP_PORT=${GRAFANA_PORT} + volumes: + - ${DATA_DIR:-./}/components/grafana:/var/lib/grafana + network_mode: host + + node-exporter: + image: ${NODE_EXPORTER_IMAGE} + container_name: node-exporter + restart: always + profiles: + - m + volumes: + - /proc:/host/proc:ro + - /sys:/host/sys:ro + - /:/rootfs:ro + command: + - '--path.procfs=/host/proc' + - '--path.sysfs=/host/sys' + - '--path.rootfs=/rootfs' + - '--web.listen-address=:{NODE_EXPORTER_PORT}' + network_mode: host diff --git a/go.mod b/go.mod index 99fa917fa..fe0328031 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,7 @@ require ( github.com/shirou/gopsutil v3.21.11+incompatible github.com/spf13/viper v1.18.2 github.com/stathat/consistent v1.0.0 + go.etcd.io/etcd/client/v3 v3.5.13 go.uber.org/automaxprocs v1.5.3 golang.org/x/sync v0.8.0 ) @@ -162,7 +163,6 @@ require ( github.com/yusufpapurcu/wmi v1.2.4 // indirect go.etcd.io/etcd/api/v3 v3.5.13 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.13 // indirect - go.etcd.io/etcd/client/v3 v3.5.13 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect diff --git a/internal/api/init.go b/internal/api/init.go index 360ea77f5..fe8ac1cd0 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -2,6 +2,7 @@ package api import ( "context" + "errors" "fmt" "net" "net/http" @@ -12,15 +13,20 @@ import ( "time" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/mw" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/network" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/system/program" + "github.com/openimsdk/tools/utils/jsonutil" ) type Config struct { @@ -29,8 +35,8 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, index int, config *Config) error { - apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, index) +func Start(ctx context.Context, index int, cfg *Config) error { + apiPort, err := datautil.GetElemByIndex(cfg.API.Api.Ports, index) if err != nil { return err } @@ -38,12 +44,14 @@ func Start(ctx context.Context, index int, config *Config) error { var client discovery.SvcDiscoveryRegistry // Determine whether zk is passed according to whether it is a clustered deployment - client, err = kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share, []string{ - config.Share.RpcRegisterName.MessageGateway, + client, err = kdisc.NewDiscoveryRegister(&cfg.Discovery, &cfg.Share, []string{ + cfg.Share.RpcRegisterName.MessageGateway, }) if err != nil { return errs.WrapMsg(err, "failed to register discovery service") } + client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) var ( netDone = make(chan struct{}, 1) @@ -51,32 +59,73 @@ func Start(ctx context.Context, index int, config *Config) error { prometheusPort int ) - router, err := newGinRouter(ctx, client, config) + router, err := newGinRouter(ctx, client, cfg) if err != nil { return err } - if config.API.Prometheus.Enable { - go func() { - prometheusPort, err = datautil.GetElemByIndex(config.API.Prometheus.Ports, index) + registerIP, err := network.GetRpcRegisterIP("") + if err != nil { + return err + } + + getAutoPort := func() (net.Listener, int, error) { + registerAddr := net.JoinHostPort(registerIP, "0") + listener, err := net.Listen("tcp", registerAddr) + if err != nil { + return nil, 0, errs.WrapMsg(err, "listen err", "registerAddr", registerAddr) + } + _, portStr, _ := net.SplitHostPort(listener.Addr().String()) + port, _ := strconv.Atoi(portStr) + return listener, port, nil + } + + if cfg.API.Prometheus.AutoSetPorts && cfg.Discovery.Enable != config.ETCD { + return errs.New("only etcd support autoSetPorts", "RegisterName", "api").Wrap() + } + + if cfg.API.Prometheus.Enable { + var ( + listener net.Listener + ) + + if cfg.API.Prometheus.AutoSetPorts { + listener, prometheusPort, err = getAutoPort() if err != nil { - netErr = err - netDone <- struct{}{} - return + return err } - if err := prommetrics.ApiInit(prometheusPort); err != nil && err != http.ErrServerClosed { + + etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + + _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + if err != nil { + return errs.WrapMsg(err, "etcd put err") + } + } else { + prometheusPort, err = datautil.GetElemByIndex(cfg.API.Prometheus.Ports, index) + if err != nil { + return err + } + listener, err = net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort)) + if err != nil { + return errs.WrapMsg(err, "listen err", "addr", fmt.Sprintf(":%d", prometheusPort)) + } + } + + go func() { + if err := prommetrics.ApiInit(listener); err != nil && !errors.Is(err, http.ErrServerClosed) { netErr = errs.WrapMsg(err, fmt.Sprintf("api prometheus start err: %d", prometheusPort)) netDone <- struct{}{} } }() } - address := net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort)) + address := net.JoinHostPort(network.GetListenIP(cfg.API.Api.ListenIP), strconv.Itoa(apiPort)) server := http.Server{Addr: address, Handler: router} log.CInfo(ctx, "API server is initializing", "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort) go func() { err = server.ListenAndServe() - if err != nil && err != http.ErrServerClosed { + if err != nil && !errors.Is(err, http.ErrServerClosed) { netErr = errs.WrapMsg(err, fmt.Sprintf("api start err: %s", server.Addr)) netDone <- struct{}{} diff --git a/internal/api/prometheus_discovery.go b/internal/api/prometheus_discovery.go new file mode 100644 index 000000000..6f17953ae --- /dev/null +++ b/internal/api/prometheus_discovery.go @@ -0,0 +1,114 @@ +package api + +import ( + "encoding/json" + "net/http" + + "github.com/gin-gonic/gin" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" + "github.com/openimsdk/tools/apiresp" + "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/discovery/etcd" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" + clientv3 "go.etcd.io/etcd/client/v3" +) + +type PrometheusDiscoveryApi struct { + config *Config + client *clientv3.Client +} + +func NewPrometheusDiscoveryApi(cfg *Config, client discovery.SvcDiscoveryRegistry) *PrometheusDiscoveryApi { + api := &PrometheusDiscoveryApi{ + config: cfg, + } + if cfg.Discovery.Enable == config.ETCD { + api.client = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + } + return api +} + +func (p *PrometheusDiscoveryApi) Enable(c *gin.Context) { + if p.config.Discovery.Enable != config.ETCD { + c.JSON(http.StatusOK, []struct{}{}) + c.Abort() + } +} + +func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { + eResp, err := p.client.Get(c, prommetrics.BuildDiscoveryKey(key)) + if err != nil { + // Log and respond with an error if preparation fails. + apiresp.GinError(c, errs.WrapMsg(err, "etcd get err")) + return + } + if len(eResp.Kvs) == 0 { + c.JSON(http.StatusOK, []*prommetrics.Target{}) + } + + var ( + resp = &prommetrics.RespTarget{ + Targets: make([]string, 0, len(eResp.Kvs)), + } + ) + + for i := range eResp.Kvs { + var target prommetrics.Target + err = json.Unmarshal(eResp.Kvs[i].Value, &target) + if err != nil { + log.ZError(c, "prometheus unmarshal err", errs.Wrap(err)) + } + resp.Targets = append(resp.Targets, target.Target) + if resp.Labels == nil { + resp.Labels = target.Labels + } + } + + c.JSON(200, []*prommetrics.RespTarget{resp}) +} + +func (p *PrometheusDiscoveryApi) Api(c *gin.Context) { + p.discovery(c, prommetrics.APIKeyName) +} + +func (p *PrometheusDiscoveryApi) User(c *gin.Context) { + p.discovery(c, p.config.Share.RpcRegisterName.User) +} + +func (p *PrometheusDiscoveryApi) Group(c *gin.Context) { + p.discovery(c, p.config.Share.RpcRegisterName.Group) +} + +func (p *PrometheusDiscoveryApi) Msg(c *gin.Context) { + p.discovery(c, p.config.Share.RpcRegisterName.Msg) +} + +func (p *PrometheusDiscoveryApi) Friend(c *gin.Context) { + p.discovery(c, p.config.Share.RpcRegisterName.Friend) +} + +func (p *PrometheusDiscoveryApi) Conversation(c *gin.Context) { + p.discovery(c, p.config.Share.RpcRegisterName.Conversation) +} + +func (p *PrometheusDiscoveryApi) Third(c *gin.Context) { + p.discovery(c, p.config.Share.RpcRegisterName.Third) +} + +func (p *PrometheusDiscoveryApi) Auth(c *gin.Context) { + p.discovery(c, p.config.Share.RpcRegisterName.Auth) +} + +func (p *PrometheusDiscoveryApi) Push(c *gin.Context) { + p.discovery(c, p.config.Share.RpcRegisterName.Push) +} + +func (p *PrometheusDiscoveryApi) MessageGateway(c *gin.Context) { + p.discovery(c, p.config.Share.RpcRegisterName.MessageGateway) +} + +func (p *PrometheusDiscoveryApi) MessageTransfer(c *gin.Context) { + p.discovery(c, prommetrics.MessageTransferKeyName) +} diff --git a/internal/api/router.go b/internal/api/router.go index 9b3fac24f..72cafc6b3 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -2,7 +2,9 @@ package api import ( "context" - "fmt" + "net/http" + "strings" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" pbAuth "github.com/openimsdk/protocol/auth" "github.com/openimsdk/protocol/conversation" @@ -11,8 +13,6 @@ import ( "github.com/openimsdk/protocol/relation" "github.com/openimsdk/protocol/third" "github.com/openimsdk/protocol/user" - "net/http" - "strings" "github.com/openimsdk/open-im-server/v3/internal/api/jssdk" @@ -21,9 +21,6 @@ import ( "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" "github.com/go-playground/validator/v10" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/protocol/constant" @@ -56,8 +53,6 @@ func prommetricsGin() gin.HandlerFunc { } func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, config *Config) (*gin.Engine, error) { - client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) authConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Auth) if err != nil { return nil, err @@ -100,12 +95,11 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co case BestSpeed: r.Use(gzip.Gzip(gzip.BestSpeed)) } - r.Use(prommetricsGin(), gin.RecoveryWithWriter(gin.DefaultErrorWriter, mw.GinPanicErr), mw.CorsHandler(), - mw.GinParseOperationID(), GinParseToken(rpcli.NewAuthClient(authConn))) - + r.Use(prommetricsGin(), gin.RecoveryWithWriter(gin.DefaultErrorWriter, mw.GinPanicErr), mw.CorsHandler(), mw.GinParseOperationID(), GinParseToken(rpcli.NewAuthClient(authConn))) u := NewUserApi(user.NewUserClient(userConn), client, config.Share.RpcRegisterName) + m := NewMessageApi(msg.NewMsgClient(msgConn), rpcli.NewUserClient(userConn), config.Share.IMAdminUserID) + userRouterGroup := r.Group("/user") { - userRouterGroup := r.Group("/user") userRouterGroup.POST("/user_register", u.UserRegister) userRouterGroup.POST("/update_user_info", u.UpdateUserInfo) userRouterGroup.POST("/update_user_info_ex", u.UpdateUserInfoEx) @@ -228,7 +222,6 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co objectGroup.GET("/*name", t.ObjectRedirect) } // Message - m := NewMessageApi(msg.NewMsgClient(msgConn), rpcli.NewUserClient(userConn), config.Share.IMAdminUserID) { msgGroup := r.Group("/msg") msgGroup.POST("/newest_seq", m.GetSeq) @@ -284,7 +277,21 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co jssdk.POST("/get_conversations", j.GetConversations) jssdk.POST("/get_active_conversations", j.GetActiveConversations) } - + { + pd := NewPrometheusDiscoveryApi(config, client) + proDiscoveryGroup := r.Group("/prometheus_discovery", pd.Enable) + proDiscoveryGroup.GET("/api", pd.Api) + proDiscoveryGroup.GET("/user", pd.User) + proDiscoveryGroup.GET("/group", pd.Group) + proDiscoveryGroup.GET("/msg", pd.Msg) + proDiscoveryGroup.GET("/friend", pd.Friend) + proDiscoveryGroup.GET("/conversation", pd.Conversation) + proDiscoveryGroup.GET("/third", pd.Third) + proDiscoveryGroup.GET("/auth", pd.Auth) + proDiscoveryGroup.GET("/push", pd.Push) + proDiscoveryGroup.GET("/msg_gateway", pd.MessageGateway) + proDiscoveryGroup.GET("/msg_transfer", pd.MessageTransfer) + } return r, nil } diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 13f63f8b9..2e6a0f2de 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -54,6 +54,7 @@ func (s *Server) InitServer(ctx context.Context, config *Config, disCov discover func (s *Server) Start(ctx context.Context, index int, conf *Config) error { return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP, conf.MsgGateway.RPC.RegisterIP, + conf.MsgGateway.RPC.AutoSetPorts, conf.MsgGateway.RPC.Ports, index, conf.Share.RpcRegisterName.MessageGateway, &conf.Share, diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index ed7e04b7b..982d0b82e 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -16,13 +16,13 @@ package msggateway import ( "context" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/rpccache" "github.com/openimsdk/tools/db/redisutil" - "github.com/openimsdk/tools/utils/datautil" - "time" - "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/utils/datautil" ) type Config struct { @@ -35,7 +35,8 @@ type Config struct { // Start run ws server. func Start(ctx context.Context, index int, conf *Config) error { - log.CInfo(ctx, "MSG-GATEWAY server is initializing", "rpcPorts", conf.MsgGateway.RPC.Ports, + log.CInfo(ctx, "MSG-GATEWAY server is initializing", "autoSetPorts", conf.MsgGateway.RPC.AutoSetPorts, + "rpcPorts", conf.MsgGateway.RPC.Ports, "wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports) wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, index) if err != nil { diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index bf315bd83..1ac97eeb1 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -18,11 +18,17 @@ import ( "context" "errors" "fmt" + "net" "net/http" "os" "os/signal" + "strconv" "syscall" + "github.com/openimsdk/tools/discovery/etcd" + "github.com/openimsdk/tools/utils/jsonutil" + "github.com/openimsdk/tools/utils/network" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" @@ -30,8 +36,9 @@ import ( "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" + conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" discRegister "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" + kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" @@ -53,16 +60,17 @@ type MsgTransfer struct { } type Config struct { - MsgTransfer config.MsgTransfer - RedisConfig config.Redis - MongodbConfig config.Mongo - KafkaConfig config.Kafka - Share config.Share - WebhooksConfig config.Webhooks - Discovery config.Discovery + MsgTransfer conf.MsgTransfer + RedisConfig conf.Redis + MongodbConfig conf.Mongo + KafkaConfig conf.Kafka + Share conf.Share + WebhooksConfig conf.Webhooks + Discovery conf.Discovery } func Start(ctx context.Context, index int, config *Config) error { + log.CInfo(ctx, "MSG-TRANSFER server is initializing", "prometheusPorts", config.MsgTransfer.Prometheus.Ports, "index", index) @@ -116,7 +124,7 @@ func Start(ctx context.Context, index int, config *Config) error { return msgTransfer.Start(index, config) } -func (m *MsgTransfer) Start(index int, config *Config) error { +func (m *MsgTransfer) Start(index int, cfg *Config) error { m.ctx, m.cancel = context.WithCancel(context.Background()) var ( netDone = make(chan struct{}, 1) @@ -131,16 +139,67 @@ func (m *MsgTransfer) Start(index int, config *Config) error { return err } - if config.MsgTransfer.Prometheus.Enable { - go func() { - prometheusPort, err := datautil.GetElemByIndex(config.MsgTransfer.Prometheus.Ports, index) + client, err := kdisc.NewDiscoveryRegister(&cfg.Discovery, &cfg.Share, nil) + if err != nil { + return errs.WrapMsg(err, "failed to register discovery service") + } + + registerIP, err := network.GetRpcRegisterIP("") + if err != nil { + return err + } + + getAutoPort := func() (net.Listener, int, error) { + registerAddr := net.JoinHostPort(registerIP, "0") + listener, err := net.Listen("tcp", registerAddr) + if err != nil { + return nil, 0, errs.WrapMsg(err, "listen err", "registerAddr", registerAddr) + } + _, portStr, _ := net.SplitHostPort(listener.Addr().String()) + port, _ := strconv.Atoi(portStr) + return listener, port, nil + } + + if cfg.MsgTransfer.Prometheus.AutoSetPorts && cfg.Discovery.Enable != conf.ETCD { + return errs.New("only etcd support autoSetPorts", "RegisterName", "api").Wrap() + } + + if cfg.MsgTransfer.Prometheus.Enable { + var ( + listener net.Listener + prometheusPort int + ) + + if cfg.MsgTransfer.Prometheus.AutoSetPorts { + listener, prometheusPort, err = getAutoPort() if err != nil { - netErr = err - netDone <- struct{}{} - return + return err } - if err := prommetrics.TransferInit(prometheusPort); err != nil && !errors.Is(err, http.ErrServerClosed) { + etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + + _, err = etcdClient.Put(context.TODO(), prommetrics.BuildDiscoveryKey(prommetrics.MessageTransferKeyName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + if err != nil { + return errs.WrapMsg(err, "etcd put err") + } + } else { + prometheusPort, err = datautil.GetElemByIndex(cfg.MsgTransfer.Prometheus.Ports, index) + if err != nil { + return err + } + listener, err = net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort)) + if err != nil { + return errs.WrapMsg(err, "listen err", "addr", fmt.Sprintf(":%d", prometheusPort)) + } + } + + go func() { + defer func() { + if r := recover(); r != nil { + log.ZPanic(m.ctx, "MsgTransfer Start Panic", errs.ErrPanic(r)) + } + }() + if err := prommetrics.TransferInit(listener); err != nil && !errors.Is(err, http.ErrServerClosed) { netErr = errs.WrapMsg(err, "prometheus start error", "prometheusPort", prometheusPort) netDone <- struct{}{} } diff --git a/pkg/common/cmd/auth.go b/pkg/common/cmd/auth.go index cc7077385..54f65bc37 100644 --- a/pkg/common/cmd/auth.go +++ b/pkg/common/cmd/auth.go @@ -55,7 +55,7 @@ func (a *AuthRpcCmd) Exec() error { func (a *AuthRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, - a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.Ports, + a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.AutoSetPorts, a.authConfig.RpcConfig.RPC.Ports, a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig.Share, a.authConfig, []string{ a.authConfig.Share.RpcRegisterName.MessageGateway, diff --git a/pkg/common/cmd/conversation.go b/pkg/common/cmd/conversation.go index 77b88b79a..3d252e267 100644 --- a/pkg/common/cmd/conversation.go +++ b/pkg/common/cmd/conversation.go @@ -57,7 +57,7 @@ func (a *ConversationRpcCmd) Exec() error { func (a *ConversationRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, - a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.Ports, + a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.AutoSetPorts, a.conversationConfig.RpcConfig.RPC.Ports, a.Index(), a.conversationConfig.Share.RpcRegisterName.Conversation, &a.conversationConfig.Share, a.conversationConfig, nil, conversation.Start) diff --git a/pkg/common/cmd/friend.go b/pkg/common/cmd/friend.go index bbab2a93f..363ca375f 100644 --- a/pkg/common/cmd/friend.go +++ b/pkg/common/cmd/friend.go @@ -58,7 +58,7 @@ func (a *FriendRpcCmd) Exec() error { func (a *FriendRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, - a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.Ports, + a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.AutoSetPorts, a.relationConfig.RpcConfig.RPC.Ports, a.Index(), a.relationConfig.Share.RpcRegisterName.Friend, &a.relationConfig.Share, a.relationConfig, nil, relation.Start) diff --git a/pkg/common/cmd/group.go b/pkg/common/cmd/group.go index ecfc08603..44fa712f6 100644 --- a/pkg/common/cmd/group.go +++ b/pkg/common/cmd/group.go @@ -59,7 +59,7 @@ func (a *GroupRpcCmd) Exec() error { func (a *GroupRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, - a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.Ports, + a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.AutoSetPorts, a.groupConfig.RpcConfig.RPC.Ports, a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, nil, group.Start, versionctx.EnableVersionCtx()) diff --git a/pkg/common/cmd/msg.go b/pkg/common/cmd/msg.go index 0feeb7b64..64124696d 100644 --- a/pkg/common/cmd/msg.go +++ b/pkg/common/cmd/msg.go @@ -59,7 +59,7 @@ func (a *MsgRpcCmd) Exec() error { func (a *MsgRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, - a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.Ports, + a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.AutoSetPorts, a.msgConfig.RpcConfig.RPC.Ports, a.Index(), a.msgConfig.Share.RpcRegisterName.Msg, &a.msgConfig.Share, a.msgConfig, nil, msg.Start) diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go index 121d59f9d..62bdfceaf 100644 --- a/pkg/common/cmd/push.go +++ b/pkg/common/cmd/push.go @@ -59,7 +59,7 @@ func (a *PushRpcCmd) Exec() error { func (a *PushRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, - a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.Ports, + a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.AutoSetPorts, a.pushConfig.RpcConfig.RPC.Ports, a.Index(), a.pushConfig.Share.RpcRegisterName.Push, &a.pushConfig.Share, a.pushConfig, []string{ a.pushConfig.Share.RpcRegisterName.MessageGateway, diff --git a/pkg/common/cmd/third.go b/pkg/common/cmd/third.go index 850b0aae5..30828cd53 100644 --- a/pkg/common/cmd/third.go +++ b/pkg/common/cmd/third.go @@ -58,7 +58,7 @@ func (a *ThirdRpcCmd) Exec() error { func (a *ThirdRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, - a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.Ports, + a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.AutoSetPorts, a.thirdConfig.RpcConfig.RPC.Ports, a.Index(), a.thirdConfig.Share.RpcRegisterName.Third, &a.thirdConfig.Share, a.thirdConfig, nil, third.Start) diff --git a/pkg/common/cmd/user.go b/pkg/common/cmd/user.go index 6ba6ba3bd..54aae7382 100644 --- a/pkg/common/cmd/user.go +++ b/pkg/common/cmd/user.go @@ -59,7 +59,7 @@ func (a *UserRpcCmd) Exec() error { func (a *UserRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, - a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.Ports, + a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.AutoSetPorts, a.userConfig.RpcConfig.RPC.Ports, a.Index(), a.userConfig.Share.RpcRegisterName.User, &a.userConfig.Share, a.userConfig, nil, user.Start) diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index b4d77f394..e2b9f6c8d 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -15,13 +15,13 @@ package config import ( - "github.com/openimsdk/tools/s3/aws" "strings" "time" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/mq/kafka" + "github.com/openimsdk/tools/s3/aws" "github.com/openimsdk/tools/s3/cos" "github.com/openimsdk/tools/s3/kodo" "github.com/openimsdk/tools/s3/minio" @@ -107,9 +107,10 @@ type API struct { CompressionLevel int `mapstructure:"compressionLevel"` } `mapstructure:"api"` Prometheus struct { - Enable bool `mapstructure:"enable"` - Ports []int `mapstructure:"ports"` - GrafanaURL string `mapstructure:"grafanaURL"` + Enable bool `mapstructure:"enable"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` + GrafanaURL string `mapstructure:"grafanaURL"` } `mapstructure:"prometheus"` } @@ -177,8 +178,9 @@ type Prometheus struct { type MsgGateway struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - Ports []int `mapstructure:"ports"` + RegisterIP string `mapstructure:"registerIP"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` Prometheus Prometheus `mapstructure:"prometheus"` ListenIP string `mapstructure:"listenIP"` @@ -191,14 +193,19 @@ type MsgGateway struct { } type MsgTransfer struct { - Prometheus Prometheus `mapstructure:"prometheus"` + Prometheus struct { + Enable bool `mapstructure:"enable"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` + } `mapstructure:"prometheus"` } type Push struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - Ports []int `mapstructure:"ports"` + RegisterIP string `mapstructure:"registerIP"` + ListenIP string `mapstructure:"listenIP"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` Prometheus Prometheus `mapstructure:"prometheus"` MaxConcurrentWorkers int `mapstructure:"maxConcurrentWorkers"` @@ -231,9 +238,10 @@ type Push struct { type Auth struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - Ports []int `mapstructure:"ports"` + RegisterIP string `mapstructure:"registerIP"` + ListenIP string `mapstructure:"listenIP"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` Prometheus Prometheus `mapstructure:"prometheus"` TokenPolicy struct { @@ -243,27 +251,30 @@ type Auth struct { type Conversation struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - Ports []int `mapstructure:"ports"` + RegisterIP string `mapstructure:"registerIP"` + ListenIP string `mapstructure:"listenIP"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` Prometheus Prometheus `mapstructure:"prometheus"` } type Friend struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - Ports []int `mapstructure:"ports"` + RegisterIP string `mapstructure:"registerIP"` + ListenIP string `mapstructure:"listenIP"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` Prometheus Prometheus `mapstructure:"prometheus"` } type Group struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - Ports []int `mapstructure:"ports"` + RegisterIP string `mapstructure:"registerIP"` + ListenIP string `mapstructure:"listenIP"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` Prometheus Prometheus `mapstructure:"prometheus"` EnableHistoryForNewMembers bool `mapstructure:"enableHistoryForNewMembers"` @@ -271,9 +282,10 @@ type Group struct { type Msg struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - Ports []int `mapstructure:"ports"` + RegisterIP string `mapstructure:"registerIP"` + ListenIP string `mapstructure:"listenIP"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` Prometheus Prometheus `mapstructure:"prometheus"` FriendVerify bool `mapstructure:"friendVerify"` @@ -281,9 +293,10 @@ type Msg struct { type Third struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - Ports []int `mapstructure:"ports"` + RegisterIP string `mapstructure:"registerIP"` + ListenIP string `mapstructure:"listenIP"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` Prometheus Prometheus `mapstructure:"prometheus"` Object struct { @@ -332,9 +345,10 @@ type Aws struct { type User struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - Ports []int `mapstructure:"ports"` + RegisterIP string `mapstructure:"registerIP"` + ListenIP string `mapstructure:"listenIP"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` Prometheus Prometheus `mapstructure:"prometheus"` } diff --git a/pkg/common/config/constant.go b/pkg/common/config/constant.go index f425a624c..96ca88353 100644 --- a/pkg/common/config/constant.go +++ b/pkg/common/config/constant.go @@ -14,7 +14,10 @@ package config -const ConfKey = "conf" +const ( + ConfKey = "conf" + ETCD = "etcd" +) const ( // DefaultDirPerm is used for creating general directories, allowing the owner to read, write, and execute, diff --git a/pkg/common/config/load_config_test.go b/pkg/common/config/load_config_test.go index a0345fc7a..763bffd9f 100644 --- a/pkg/common/config/load_config_test.go +++ b/pkg/common/config/load_config_test.go @@ -59,3 +59,11 @@ func TestLoadOpenIMThirdConfig(t *testing.T) { // Environment: IMENV_OPENIM_RPC_THIRD_OBJECT_ENABLE=enabled;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_ENDPOINT=https://oss-cn-chengdu.aliyuncs.com;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_BUCKET=my_bucket_name;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_BUCKETURL=https://my_bucket_name.oss-cn-chengdu.aliyuncs.com;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_ACCESSKEYID=AKID1234567890;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_ACCESSKEYSECRET=abc123xyz789;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_SESSIONTOKEN=session_token_value;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_PUBLICREAD=true } + +func TestTransferConfig(t *testing.T) { + var tran MsgTransfer + err := LoadConfig("../../../config/openim-msgtransfer.yml", "IMENV_OPENIM-MSGTRANSFER", &tran) + assert.Nil(t, err) + assert.Equal(t, true, tran.Prometheus.Enable) + assert.Equal(t, true, tran.Prometheus.AutoSetPorts) +} diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index f2d96cda4..f7175491c 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -21,22 +21,12 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/etcd" - "github.com/openimsdk/tools/discovery/zookeeper" "github.com/openimsdk/tools/errs" ) // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share, watchNames []string) (discovery.SvcDiscoveryRegistry, error) { switch discovery.Enable { - case "zookeeper": - return zookeeper.NewZkClient( - discovery.ZooKeeper.Address, - discovery.ZooKeeper.Schema, - zookeeper.WithFreq(time.Hour), - zookeeper.WithUserNameAndPassword(discovery.ZooKeeper.Username, discovery.ZooKeeper.Password), - zookeeper.WithRoundRobin(), - zookeeper.WithTimeout(10), - ) case "k8s": return kubernetes.NewK8sDiscoveryRegister(share.RpcRegisterName.MessageGateway) case "etcd": diff --git a/pkg/common/prommetrics/api.go b/pkg/common/prommetrics/api.go index 95b5c06b6..2dc5cb65d 100644 --- a/pkg/common/prommetrics/api.go +++ b/pkg/common/prommetrics/api.go @@ -3,6 +3,7 @@ package prommetrics import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "net" "strconv" ) @@ -23,14 +24,14 @@ var ( ) ) -func ApiInit(prometheusPort int) error { +func ApiInit(listener net.Listener) error { apiRegistry := prometheus.NewRegistry() cs := append( baseCollector, apiCounter, httpCounter, ) - return Init(apiRegistry, prometheusPort, commonPath, promhttp.HandlerFor(apiRegistry, promhttp.HandlerOpts{}), cs...) + return Init(apiRegistry, listener, commonPath, promhttp.HandlerFor(apiRegistry, promhttp.HandlerOpts{}), cs...) } func APICall(path string, method string, apiCode int) { diff --git a/pkg/common/prommetrics/discovery.go b/pkg/common/prommetrics/discovery.go new file mode 100644 index 000000000..8f03bc2ae --- /dev/null +++ b/pkg/common/prommetrics/discovery.go @@ -0,0 +1,31 @@ +package prommetrics + +import "fmt" + +const ( + APIKeyName = "api" + MessageTransferKeyName = "message-transfer" +) + +type Target struct { + Target string `json:"target"` + Labels map[string]string `json:"labels"` +} + +type RespTarget struct { + Targets []string `json:"targets"` + Labels map[string]string `json:"labels"` +} + +func BuildDiscoveryKey(name string) string { + return fmt.Sprintf("%s/%s/%s", "openim", "prometheus_discovery", name) +} + +func BuildDefaultTarget(host string, ip int) Target { + return Target{ + Target: fmt.Sprintf("%s:%d", host, ip), + Labels: map[string]string{ + "namespace": "default", + }, + } +} diff --git a/pkg/common/prommetrics/prommetrics.go b/pkg/common/prommetrics/prommetrics.go index 02e408d63..2fc5d76b4 100644 --- a/pkg/common/prommetrics/prommetrics.go +++ b/pkg/common/prommetrics/prommetrics.go @@ -15,9 +15,9 @@ package prommetrics import ( - "fmt" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" + "net" "net/http" ) @@ -30,9 +30,9 @@ var ( } ) -func Init(registry *prometheus.Registry, prometheusPort int, path string, handler http.Handler, cs ...prometheus.Collector) error { +func Init(registry *prometheus.Registry, listener net.Listener, path string, handler http.Handler, cs ...prometheus.Collector) error { registry.MustRegister(cs...) srv := http.NewServeMux() srv.Handle(path, handler) - return http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), srv) + return http.Serve(listener, srv) } diff --git a/pkg/common/prommetrics/rpc.go b/pkg/common/prommetrics/rpc.go index 7162fa7e8..ddad4c671 100644 --- a/pkg/common/prommetrics/rpc.go +++ b/pkg/common/prommetrics/rpc.go @@ -5,6 +5,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "net" "strconv" ) @@ -21,13 +22,13 @@ var ( ) ) -func RpcInit(cs []prometheus.Collector, prometheusPort int) error { +func RpcInit(cs []prometheus.Collector, listener net.Listener) error { reg := prometheus.NewRegistry() cs = append(append( baseCollector, rpcCounter, ), cs...) - return Init(reg, prometheusPort, rpcPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}), cs...) + return Init(reg, listener, rpcPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}), cs...) } func RPCCall(name string, path string, code int) { diff --git a/pkg/common/prommetrics/transfer.go b/pkg/common/prommetrics/transfer.go index f0abb8285..36fe1d568 100644 --- a/pkg/common/prommetrics/transfer.go +++ b/pkg/common/prommetrics/transfer.go @@ -17,6 +17,7 @@ package prommetrics import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "net" ) var ( @@ -42,7 +43,7 @@ var ( }) ) -func TransferInit(prometheusPort int) error { +func TransferInit(listener net.Listener) error { reg := prometheus.NewRegistry() cs := append( baseCollector, @@ -52,5 +53,5 @@ func TransferInit(prometheusPort int) error { MsgInsertMongoFailedCounter, SeqSetFailedCounter, ) - return Init(reg, prometheusPort, commonPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}), cs...) + return Init(reg, listener, commonPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}), cs...) } diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 0a59a90b8..3d4394c51 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -16,6 +16,7 @@ package startrpc import ( "context" + "errors" "fmt" "net" "net/http" @@ -25,8 +26,10 @@ import ( "syscall" "time" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" + conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/jsonutil" "google.golang.org/grpc/status" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" @@ -41,29 +44,47 @@ import ( ) // Start rpc server. -func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusConfig *config.Prometheus, listenIP, - registerIP string, rpcPorts []int, index int, rpcRegisterName string, share *config.Share, config T, +func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, + registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, share *conf.Share, config T, watchServiceNames []string, - rpcFn func(ctx context.Context, - config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { + rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, + options ...grpc.ServerOption) error { - rpcPort, err := datautil.GetElemByIndex(rpcPorts, index) + var ( + rpcTcpAddr string + netDone = make(chan struct{}, 2) + netErr error + prometheusPort int + ) + + registerIP, err := network.GetRpcRegisterIP(registerIP) if err != nil { return err } - log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", rpcPort, - "prometheusPorts", prometheusConfig.Ports) - rpcTcpAddr := net.JoinHostPort(network.GetListenIP(listenIP), strconv.Itoa(rpcPort)) - - listener, err := net.Listen( - "tcp", - rpcTcpAddr, - ) - if err != nil { - return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr) + if !autoSetPorts { + rpcPort, err := datautil.GetElemByIndex(rpcPorts, index) + if err != nil { + return err + } + rpcTcpAddr = net.JoinHostPort(network.GetListenIP(listenIP), strconv.Itoa(rpcPort)) + } else { + rpcTcpAddr = net.JoinHostPort(network.GetListenIP(listenIP), "0") + } + + getAutoPort := func() (net.Listener, int, error) { + listener, err := net.Listen("tcp", rpcTcpAddr) + if err != nil { + return nil, 0, errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr) + } + _, portStr, _ := net.SplitHostPort(listener.Addr().String()) + port, _ := strconv.Atoi(portStr) + return listener, port, nil + } + + if autoSetPorts && discovery.Enable != conf.ETCD { + return errs.New("only etcd support autoSetPorts", "rpcRegisterName", rpcRegisterName).Wrap() } - defer listener.Close() client, err := kdisc.NewDiscoveryRegister(discovery, share, watchServiceNames) if err != nil { return err @@ -75,19 +96,69 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo // var reg *prometheus.Registry // var metric *grpcprometheus.ServerMetrics if prometheusConfig.Enable { - //cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share) - //reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics) - //options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()), + // cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share) + // reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics) + // options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()), // grpc.UnaryInterceptor(metric.UnaryServerInterceptor())) options = append( options, mw.GrpcServer(), prommetricsUnaryInterceptor(rpcRegisterName), prommetricsStreamInterceptor(rpcRegisterName), ) + + var ( + listener net.Listener + ) + + if autoSetPorts { + listener, prometheusPort, err = getAutoPort() + if err != nil { + return err + } + + etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + + _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + if err != nil { + return errs.WrapMsg(err, "etcd put err") + } + } else { + prometheusPort, err = datautil.GetElemByIndex(prometheusConfig.Ports, index) + if err != nil { + return err + } + listener, err = net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort)) + if err != nil { + return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr) + } + } + cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share) + go func() { + if err := prommetrics.RpcInit(cs, listener); err != nil && !errors.Is(err, http.ErrServerClosed) { + netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort)) + netDone <- struct{}{} + } + //metric.InitializeMetrics(srv) + // Create a HTTP server for prometheus. + // httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} + // if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + // netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr) + // netDone <- struct{}{} + // } + }() } else { options = append(options, mw.GrpcServer()) } + listener, port, err := getAutoPort() + if err != nil { + return err + } + + log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", port, + "prometheusPort", prometheusPort) + + defer listener.Close() srv := grpc.NewServer(options...) err = rpcFn(ctx, config, client, srv) @@ -98,43 +169,16 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo err = client.Register( rpcRegisterName, registerIP, - rpcPort, + port, grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { return err } - var ( - netDone = make(chan struct{}, 2) - netErr error - ) - if prometheusConfig.Enable { - go func() { - prometheusPort, err := datautil.GetElemByIndex(prometheusConfig.Ports, index) - if err != nil { - netErr = err - netDone <- struct{}{} - return - } - cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share) - if err := prommetrics.RpcInit(cs, prometheusPort); err != nil && err != http.ErrServerClosed { - netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort)) - netDone <- struct{}{} - } - //metric.InitializeMetrics(srv) - // Create a HTTP server for prometheus. - //httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} - //if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { - // netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr) - // netDone <- struct{}{} - //} - }() - } - go func() { err := srv.Serve(listener) - if err != nil { + if err != nil && !errors.Is(err, http.ErrServerClosed) { netErr = errs.WrapMsg(err, "rpc start err: ", rpcTcpAddr) netDone <- struct{}{} }