From 1a5c094e13d2ebe19d934155b55b8ccf004e198d Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Thu, 5 Jun 2025 15:57:53 +0800 Subject: [PATCH] fix: prometheus discovery --- cmd/main.go | 17 ++------------ go.mod | 2 +- go.sum | 4 ++-- internal/api/init.go | 2 +- internal/api/prometheus_discovery.go | 27 ++++++++++++----------- internal/api/router.go | 2 +- internal/msggateway/init.go | 2 +- internal/msgtransfer/init.go | 2 +- internal/push/push.go | 2 +- internal/rpc/auth/auth.go | 2 +- internal/rpc/conversation/conversation.go | 2 +- internal/rpc/group/group.go | 2 +- internal/rpc/msg/server.go | 2 +- internal/rpc/relation/friend.go | 2 +- internal/rpc/third/third.go | 2 +- internal/rpc/user/user.go | 2 +- internal/tools/cron/cron_task.go | 2 +- pkg/common/cmd/api.go | 3 ++- pkg/common/cmd/msg_transfer.go | 3 ++- pkg/common/prommetrics/prommetrics.go | 8 ++++++- pkg/common/startrpc/start.go | 10 +++++---- 21 files changed, 49 insertions(+), 51 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 1d0b82be8..7e19f1c98 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -3,7 +3,6 @@ package main import ( "bytes" "context" - "encoding/json" "flag" "fmt" "net" @@ -39,7 +38,6 @@ import ( "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/tools/utils/network" "github.com/spf13/viper" "google.golang.org/grpc" ) @@ -250,23 +248,12 @@ func (x *cmds) run(ctx context.Context) error { return err } } - ip, err := network.GetLocalIP() - if err != nil { - return err - } listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { return fmt.Errorf("prometheus listen %d error %w", port, err) } defer listener.Close() log.ZDebug(ctx, "prometheus start", "addr", listener.Addr()) - target, err := json.Marshal(prommetrics.BuildDefaultTarget(ip, listener.Addr().(*net.TCPAddr).Port)) - if err != nil { - return err - } - if err := standalone.GetKeyValue().SetKey(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), target); err != nil { - return err - } go func() { err := prommetrics.Start(listener) if err == nil { @@ -342,7 +329,7 @@ func (x *cmds) run(ctx context.Context) error { } } -func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error) { +func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error) { name := path.Base(runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()) if index := strings.Index(name, "."); index >= 0 { name = name[:index] @@ -352,7 +339,7 @@ func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C if err := cmd.parseConf(&conf); err != nil { return err } - return fn(ctx, &conf, standalone.GetDiscoveryConn(), standalone.GetServiceRegistrar()) + return fn(ctx, &conf, standalone.GetSvcDiscoveryRegistry(), standalone.GetServiceRegistrar()) }) } diff --git a/go.mod b/go.mod index 221e28b72..845f75bb2 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.73-alpha.12 - github.com/openimsdk/tools v0.0.50-alpha.84 + github.com/openimsdk/tools v0.0.50-alpha.85 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 6298f98c9..b775a1056 100644 --- a/go.sum +++ b/go.sum @@ -349,8 +349,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.5 h1:eEZCEHm+NsmcO3onXZPIUbGFCYPYbsX5b github.com/openimsdk/gomake v0.0.15-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk= github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.84 h1:jN60Ys/0edZjL/TDmm/5VSJFP4pGYRipkWqhILJbq/8= -github.com/openimsdk/tools v0.0.50-alpha.84/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= +github.com/openimsdk/tools v0.0.50-alpha.85 h1:OqTUYx6r7Zp/eH8FKB08XeNjPV405TUIG9QT6QQ+F+s= +github.com/openimsdk/tools v0.0.50-alpha.85/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/api/init.go b/internal/api/init.go index 378f03eda..f3548e29a 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -39,7 +39,7 @@ type Config struct { Index conf.Index } -func Start(ctx context.Context, config *Config, client discovery.Conn, service grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, service grpc.ServiceRegistrar) error { apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, int(config.Index)) if err != nil { return err diff --git a/internal/api/prometheus_discovery.go b/internal/api/prometheus_discovery.go index bdcca4e26..c861003a0 100644 --- a/internal/api/prometheus_discovery.go +++ b/internal/api/prometheus_discovery.go @@ -6,35 +6,29 @@ import ( "net/http" "github.com/gin-gonic/gin" - conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/tools/apiresp" "github.com/openimsdk/tools/discovery" - "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/errs" - clientv3 "go.etcd.io/etcd/client/v3" ) type PrometheusDiscoveryApi struct { config *Config - client *clientv3.Client kv discovery.KeyValue } -func NewPrometheusDiscoveryApi(config *Config, client discovery.Conn) *PrometheusDiscoveryApi { +func NewPrometheusDiscoveryApi(config *Config, client discovery.SvcDiscoveryRegistry) *PrometheusDiscoveryApi { api := &PrometheusDiscoveryApi{ config: config, - } - if config.Discovery.Enable == conf.ETCD { - api.client = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + kv: client, } return api } func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { - value, err := p.kv.GetKey(c, prommetrics.BuildDiscoveryKey(key)) + value, err := p.kv.GetKeyWithPrefix(c, prommetrics.BuildDiscoveryKeyPrefix(key)) if err != nil { - if errors.Is(err, discovery.ErrNotSupportedKeyValue) { + if errors.Is(err, discovery.ErrNotSupported) { c.JSON(http.StatusOK, []struct{}{}) return } @@ -46,10 +40,17 @@ func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { return } var resp prommetrics.RespTarget - if err := json.Unmarshal(value, &resp); err != nil { - apiresp.GinError(c, errs.WrapMsg(err, "json unmarshal err")) - return + for i := range value { + var tmp prommetrics.Target + if err = json.Unmarshal(value[i], &tmp); err != nil { + apiresp.GinError(c, errs.WrapMsg(err, "json unmarshal err")) + return + } + + resp.Targets = append(resp.Targets, tmp.Target) + resp.Labels = tmp.Labels // default label is fixed. See prommetrics.BuildDefaultTarget } + c.JSON(http.StatusOK, []*prommetrics.RespTarget{&resp}) } diff --git a/internal/api/router.go b/internal/api/router.go index 5e5f35a60..fcad104b8 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -53,7 +53,7 @@ func prommetricsGin() gin.HandlerFunc { } } -func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin.Engine, error) { +func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cfg *Config) (*gin.Engine, error) { authConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Auth) if err != nil { return nil, err diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 8772693cc..40a57b1da 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -39,7 +39,7 @@ type Config struct { } // Start run ws server. -func Start(ctx context.Context, conf *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, conf *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(), "rpcPorts", conf.MsgGateway.RPC.Ports, "wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 175813552..a529edfe2 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -58,7 +58,7 @@ type Config struct { Index conf.Index } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { builder := mqbuild.NewBuilder(&config.KafkaConfig) log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "prometheusPorts", diff --git a/internal/push/push.go b/internal/push/push.go index f720a52ac..1d6f8cb30 100644 --- a/internal/push/push.go +++ b/internal/push/push.go @@ -50,7 +50,7 @@ func (p pushServer) DelUserPushToken(ctx context.Context, return &pbpush.DelUserPushTokenResp{}, nil } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { dbb := dbbuild.NewBuilder(&config.MongoConfig, &config.RedisConfig) rdb, err := dbb.Redis(ctx) if err != nil { diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 2c2691d1d..5ed9cdf12 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -59,7 +59,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { dbb := dbbuild.NewBuilder(&config.MongoConfig, &config.RedisConfig) rdb, err := dbb.Redis(ctx) if err != nil { diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index ba9e7746b..cf5a2b9c6 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -69,7 +69,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) mgocli, err := dbb.Mongo(ctx) if err != nil { diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 5219546b7..ce8a2c7aa 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -76,7 +76,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) mgocli, err := dbb.Mongo(ctx) if err != nil { diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index cfc750c5b..9d5391cc9 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -78,7 +78,7 @@ func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorF } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { builder := mqbuild.NewBuilder(&config.KafkaConfig) redisProducer, err := builder.GetTopicProducer(ctx, config.KafkaConfig.ToRedisTopic) if err != nil { diff --git a/internal/rpc/relation/friend.go b/internal/rpc/relation/friend.go index 8c7c40536..d05cf7d77 100644 --- a/internal/rpc/relation/friend.go +++ b/internal/rpc/relation/friend.go @@ -66,7 +66,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) mgocli, err := dbb.Mongo(ctx) if err != nil { diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index c6dcb2ea4..cea6a8522 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -64,7 +64,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) mgocli, err := dbb.Mongo(ctx) if err != nil { diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 5639baab9..28461ae0a 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -79,7 +79,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) mgocli, err := dbb.Mongo(ctx) if err != nil { diff --git a/internal/tools/cron/cron_task.go b/internal/tools/cron/cron_task.go index 7ae314193..c18e47a33 100644 --- a/internal/tools/cron/cron_task.go +++ b/internal/tools/cron/cron_task.go @@ -25,7 +25,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, conf *Config, client discovery.Conn, service grpc.ServiceRegistrar) error { +func Start(ctx context.Context, conf *Config, client discovery.SvcDiscoveryRegistry, service grpc.ServiceRegistrar) error { log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords) if conf.CronTask.RetainChatRecords < 1 { log.ZInfo(ctx, "disable cron") diff --git a/pkg/common/cmd/api.go b/pkg/common/cmd/api.go index 484467798..7b7dbc89b 100644 --- a/pkg/common/cmd/api.go +++ b/pkg/common/cmd/api.go @@ -19,6 +19,7 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/api" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" @@ -84,7 +85,7 @@ func (a *ApiCmd) runE() error { a.apiConfig.API.Api.ListenIP, "", a.apiConfig.API.Prometheus.AutoSetPorts, nil, int(a.apiConfig.Index), - a.apiConfig.Discovery.RpcService.MessageGateway, + prommetrics.APIKeyName, &a.apiConfig.Notification, a.apiConfig, []string{}, diff --git a/pkg/common/cmd/msg_transfer.go b/pkg/common/cmd/msg_transfer.go index 9411a2cd0..fe6c27e54 100644 --- a/pkg/common/cmd/msg_transfer.go +++ b/pkg/common/cmd/msg_transfer.go @@ -19,6 +19,7 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/msgtransfer" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" @@ -65,7 +66,7 @@ func (m *MsgTransferCmd) runE() error { "", "", true, nil, int(m.msgTransferConfig.Index), - "", + prommetrics.MessageTransferKeyName, nil, m.msgTransferConfig, []string{}, diff --git a/pkg/common/prommetrics/prommetrics.go b/pkg/common/prommetrics/prommetrics.go index 153314bbb..3f683a50e 100644 --- a/pkg/common/prommetrics/prommetrics.go +++ b/pkg/common/prommetrics/prommetrics.go @@ -85,6 +85,8 @@ func Start(listener net.Listener) error { const ( APIKeyName = "api" MessageTransferKeyName = "message-transfer" + + TTL = 300 ) type Target struct { @@ -97,10 +99,14 @@ type RespTarget struct { Labels map[string]string `json:"labels"` } -func BuildDiscoveryKey(name string) string { +func BuildDiscoveryKeyPrefix(name string) string { return fmt.Sprintf("%s/%s/%s", "openim", "prometheus_discovery", name) } +func BuildDiscoveryKey(name string, index int) string { + return fmt.Sprintf("%s/%s/%s/%d", "openim", "prometheus_discovery", name, index) +} + func BuildDefaultTarget(host string, ip int) Target { return Target{ Target: fmt.Sprintf("%s:%d", host, ip), diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index b99d32db1..06e19d8d2 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -50,7 +50,7 @@ func init() { func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, watchConfigNames []string, watchServiceNames []string, - rpcFn func(ctx context.Context, config T, client discovery.Conn, server grpc.ServiceRegistrar) error, + rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error, options ...grpc.ServerOption) error { if notification != nil { @@ -148,9 +148,11 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c if err != nil { return err } - if err := client.SetKey(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), target); err != nil { - if !errors.Is(err, discovery.ErrNotSupportedKeyValue) { - return err + if autoSetPorts { + if err = client.SetWithLease(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName, index), target, prommetrics.TTL); err != nil { + if !errors.Is(err, discovery.ErrNotSupported) { + return err + } } } go func() {