From 48928d7d848f65ec7abfcfe3cfaa5224b5b1a913 Mon Sep 17 00:00:00 2001 From: Dean Wang Date: Sat, 18 Jan 2025 16:37:51 +0800 Subject: [PATCH 1/2] resolve the prometheus discovery issue of multiple instances with same service --- internal/api/init.go | 2 +- internal/api/prometheus_discovery.go | 2 +- internal/msgtransfer/init.go | 2 +- pkg/common/prommetrics/discovery.go | 8 ++++++-- pkg/common/startrpc/start.go | 2 +- 5 files changed, 10 insertions(+), 6 deletions(-) diff --git a/internal/api/init.go b/internal/api/init.go index 20237ebc2..d5084d96e 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -109,7 +109,7 @@ func Start(ctx context.Context, index int, config *Config) error { etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() - _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName, registerIP, prometheusPort), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) if err != nil { return errs.WrapMsg(err, "etcd put err") } diff --git a/internal/api/prometheus_discovery.go b/internal/api/prometheus_discovery.go index 5e1a9cae2..583c3ae8d 100644 --- a/internal/api/prometheus_discovery.go +++ b/internal/api/prometheus_discovery.go @@ -38,7 +38,7 @@ func (p *PrometheusDiscoveryApi) Enable(c *gin.Context) { } func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { - eResp, err := p.client.Get(c, prommetrics.BuildDiscoveryKey(key)) + eResp, err := p.client.Get(c, prommetrics.BuildDiscoveryKeyPrefix(key)) if err != nil { // Log and respond with an error if preparation fails. apiresp.GinError(c, errs.WrapMsg(err, "etcd get err")) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 96e6bbde0..8d791de31 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -194,7 +194,7 @@ func (m *MsgTransfer) Start(index int, config *Config, client discovery.SvcDisco etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() - _, err = etcdClient.Put(context.TODO(), prommetrics.BuildDiscoveryKey(prommetrics.MessageTransferKeyName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + _, err = etcdClient.Put(context.TODO(), prommetrics.BuildDiscoveryKey(prommetrics.MessageTransferKeyName, registerIP, prometheusPort), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) if err != nil { return errs.WrapMsg(err, "etcd put err") } diff --git a/pkg/common/prommetrics/discovery.go b/pkg/common/prommetrics/discovery.go index 8f03bc2ae..b429adb7f 100644 --- a/pkg/common/prommetrics/discovery.go +++ b/pkg/common/prommetrics/discovery.go @@ -17,8 +17,12 @@ type RespTarget struct { Labels map[string]string `json:"labels"` } -func BuildDiscoveryKey(name string) string { - return fmt.Sprintf("%s/%s/%s", "openim", "prometheus_discovery", name) +func BuildDiscoveryKeyPrefix(name string) string { + return fmt.Sprintf("%s/%s/%s/", "openim", "prometheus_discovery", name) +} + +func BuildDiscoveryKey(name string, host string, port int) string { + return fmt.Sprintf("%s/%s/%s/%s:%d", "openim", "prometheus_discovery", name, host, port) } func BuildDefaultTarget(host string, ip int) Target { diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 27aabca95..0401f83be 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -128,7 +128,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() - _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName, registerIP, prometheusPort), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) if err != nil { return errs.WrapMsg(err, "etcd put err") } From 2eb30b2df00bc0a819edd9a626aeb47f2601b464 Mon Sep 17 00:00:00 2001 From: Dean Wang Date: Tue, 21 Jan 2025 21:33:29 +0800 Subject: [PATCH 2/2] resolve the prometheus discovery issue of multiple instances with same service --- internal/api/init.go | 7 +----- internal/msgtransfer/init.go | 7 +----- pkg/common/prommetrics/discovery.go | 36 ++++++++++++++++++++++++++++- pkg/common/startrpc/start.go | 7 +----- 4 files changed, 38 insertions(+), 19 deletions(-) diff --git a/internal/api/init.go b/internal/api/init.go index d5084d96e..1cac873eb 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -36,7 +36,6 @@ import ( "github.com/openimsdk/tools/mw" "github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/tools/utils/jsonutil" "github.com/openimsdk/tools/utils/network" "github.com/openimsdk/tools/utils/runtimeenv" "google.golang.org/grpc" @@ -108,11 +107,7 @@ func Start(ctx context.Context, index int, config *Config) error { } etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() - - _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName, registerIP, prometheusPort), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) - if err != nil { - return errs.WrapMsg(err, "etcd put err") - } + prommetrics.Register(ctx, etcdClient, prommetrics.APIKeyName, registerIP, prometheusPort) } else { prometheusPort, err = datautil.GetElemByIndex(config.API.Prometheus.Ports, index) if err != nil { diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 8d791de31..257a98c01 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -28,7 +28,6 @@ import ( disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/etcd" - "github.com/openimsdk/tools/utils/jsonutil" "github.com/openimsdk/tools/utils/network" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" @@ -193,11 +192,7 @@ func (m *MsgTransfer) Start(index int, config *Config, client discovery.SvcDisco } etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() - - _, err = etcdClient.Put(context.TODO(), prommetrics.BuildDiscoveryKey(prommetrics.MessageTransferKeyName, registerIP, prometheusPort), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) - if err != nil { - return errs.WrapMsg(err, "etcd put err") - } + prommetrics.Register(context.TODO(), etcdClient, prommetrics.MessageTransferKeyName, registerIP, prometheusPort) } else { prometheusPort, err = datautil.GetElemByIndex(config.MsgTransfer.Prometheus.Ports, index) if err != nil { diff --git a/pkg/common/prommetrics/discovery.go b/pkg/common/prommetrics/discovery.go index b429adb7f..b45902d63 100644 --- a/pkg/common/prommetrics/discovery.go +++ b/pkg/common/prommetrics/discovery.go @@ -1,6 +1,13 @@ package prommetrics -import "fmt" +import ( + "context" + "fmt" + + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/utils/jsonutil" + clientv3 "go.etcd.io/etcd/client/v3" +) const ( APIKeyName = "api" @@ -33,3 +40,30 @@ func BuildDefaultTarget(host string, ip int) Target { }, } } + +func Register(ctx context.Context, etcdClient *clientv3.Client, rpcRegisterName string, registerIP string, prometheusPort int) error { + // create lease + leaseResp, err := etcdClient.Grant(ctx, 30) + if err != nil { + return errs.WrapMsg(err, "failed to create lease in etcd") + } + // release + keepAliveChan, err := etcdClient.KeepAlive(ctx, leaseResp.ID) + if err != nil { + return errs.WrapMsg(err, "failed to keep alive lease") + } + // release resp + go func() { + for range keepAliveChan { + } + }() + putOpts := []clientv3.OpOption{} + if leaseResp != nil { + putOpts = append(putOpts, clientv3.WithLease(leaseResp.ID)) + } + _, err = etcdClient.Put(ctx, BuildDiscoveryKey(rpcRegisterName, registerIP, prometheusPort), jsonutil.StructToJsonString(BuildDefaultTarget(registerIP, prometheusPort)), putOpts...) + if err != nil { + return errs.WrapMsg(err, "etcd put err") + } + return nil +} diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 0401f83be..58601f9a7 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -30,7 +30,6 @@ import ( disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/tools/utils/jsonutil" "google.golang.org/grpc/status" "github.com/openimsdk/tools/utils/runtimeenv" @@ -127,11 +126,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf } etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() - - _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName, registerIP, prometheusPort), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) - if err != nil { - return errs.WrapMsg(err, "etcd put err") - } + prommetrics.Register(ctx, etcdClient, rpcRegisterName, registerIP, prometheusPort) } else { prometheusPort, err = datautil.GetElemByIndex(prometheusConfig.Ports, index) if err != nil {