From 15d7129c46560a67e817d6750b710a2ba77ce085 Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Fri, 26 Sep 2025 12:29:20 +0800 Subject: [PATCH 1/7] chore: fix prometheus --- go.mod | 2 +- go.sum | 4 ++-- pkg/common/discoveryregister/discoveryregister.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index e1500ac11..28b4a872d 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.73-alpha.12 - github.com/openimsdk/tools v0.0.50-alpha.84 + github.com/openimsdk/tools v0.0.50-alpha.89 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 6298f98c9..d9ea83329 100644 --- a/go.sum +++ b/go.sum @@ -349,8 +349,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.5 h1:eEZCEHm+NsmcO3onXZPIUbGFCYPYbsX5b github.com/openimsdk/gomake v0.0.15-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk= github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.84 h1:jN60Ys/0edZjL/TDmm/5VSJFP4pGYRipkWqhILJbq/8= -github.com/openimsdk/tools v0.0.50-alpha.84/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= +github.com/openimsdk/tools v0.0.50-alpha.89 h1:aAbWSc3gOI//+KQ70i7ilOTiLqQNotmp+bobg4Gu8qI= +github.com/openimsdk/tools v0.0.50-alpha.89/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index 740d27eac..110a08aa8 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -29,7 +29,7 @@ import ( func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share, watchNames []string) (discovery.SvcDiscoveryRegistry, error) { switch discovery.Enable { case "k8s": - return kubernetes.NewKubernetesConnManager("default", + return kubernetes.NewConnManager("default", watchNames, grpc.WithDefaultCallOptions( grpc.MaxCallSendMsgSize(1024*1024*20), ), From 909f3a4a24679568b26c93b7c52dfd8da4558c18 Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Fri, 26 Sep 2025 15:27:46 +0800 Subject: [PATCH 2/7] fix: prometheus --- internal/api/init.go | 2 +- internal/api/prometheus_discovery.go | 2 +- internal/msgtransfer/init.go | 2 +- pkg/common/prommetrics/discovery.go | 6 +++++- pkg/common/startrpc/start.go | 2 +- 5 files changed, 9 insertions(+), 5 deletions(-) diff --git a/internal/api/init.go b/internal/api/init.go index fe8ac1cd0..e07069886 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -96,7 +96,7 @@ func Start(ctx context.Context, index int, cfg *Config) error { etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() - _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName, index), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) if err != nil { return errs.WrapMsg(err, "etcd put err") } diff --git a/internal/api/prometheus_discovery.go b/internal/api/prometheus_discovery.go index 6f17953ae..26735c41c 100644 --- a/internal/api/prometheus_discovery.go +++ b/internal/api/prometheus_discovery.go @@ -38,7 +38,7 @@ func (p *PrometheusDiscoveryApi) Enable(c *gin.Context) { } func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { - eResp, err := p.client.Get(c, prommetrics.BuildDiscoveryKey(key)) + eResp, err := p.client.Get(c, prommetrics.BuildDiscoveryKeyPrefix(key), clientv3.WithPrefix()) if err != nil { // Log and respond with an error if preparation fails. apiresp.GinError(c, errs.WrapMsg(err, "etcd get err")) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 1ac97eeb1..b600b5682 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -178,7 +178,7 @@ func (m *MsgTransfer) Start(index int, cfg *Config) error { etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() - _, err = etcdClient.Put(context.TODO(), prommetrics.BuildDiscoveryKey(prommetrics.MessageTransferKeyName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + _, err = etcdClient.Put(context.TODO(), prommetrics.BuildDiscoveryKey(prommetrics.MessageTransferKeyName, index), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) if err != nil { return errs.WrapMsg(err, "etcd put err") } diff --git a/pkg/common/prommetrics/discovery.go b/pkg/common/prommetrics/discovery.go index 8f03bc2ae..a3cb54225 100644 --- a/pkg/common/prommetrics/discovery.go +++ b/pkg/common/prommetrics/discovery.go @@ -17,10 +17,14 @@ type RespTarget struct { Labels map[string]string `json:"labels"` } -func BuildDiscoveryKey(name string) string { +func BuildDiscoveryKeyPrefix(name string) string { return fmt.Sprintf("%s/%s/%s", "openim", "prometheus_discovery", name) } +func BuildDiscoveryKey(name string, index int) string { + return fmt.Sprintf("%s/%s/%s/%d", "openim", "prometheus_discovery", name, index) +} + func BuildDefaultTarget(host string, ip int) Target { return Target{ Target: fmt.Sprintf("%s:%d", host, ip), diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index c82ba0326..525c7cb23 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -166,7 +166,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() - _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName, index), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) if err != nil { return errs.WrapMsg(err, "etcd put err") } From 0bd60efeab1c7a3027ca1eebcc61ec56936ccedb Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Fri, 26 Sep 2025 15:44:18 +0800 Subject: [PATCH 3/7] fix: prometheus --- internal/api/init.go | 11 +++++-- internal/api/prometheus_discovery.go | 44 +++++++++++++--------------- internal/msgtransfer/init.go | 11 +++++-- pkg/common/prommetrics/discovery.go | 1 + pkg/common/startrpc/start.go | 11 +++++-- 5 files changed, 48 insertions(+), 30 deletions(-) diff --git a/internal/api/init.go b/internal/api/init.go index e07069886..3fb5364f3 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -94,9 +94,16 @@ func Start(ctx context.Context, index int, cfg *Config) error { return err } - etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + etcdClient, ok := client.(*etcd.SvcDiscoveryRegistryImpl) + if !ok { + return errs.New("only etcd support autoSetPorts").Wrap() + } - _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName, index), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + target, err := jsonutil.JsonMarshal(prommetrics.BuildDefaultTarget(registerIP, prometheusPort)) + if err != nil { + return errs.Wrap(err) + } + err = etcdClient.SetWithLease(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName, index), target, prommetrics.TTL) if err != nil { return errs.WrapMsg(err, "etcd put err") } diff --git a/internal/api/prometheus_discovery.go b/internal/api/prometheus_discovery.go index 26735c41c..3369d9e6b 100644 --- a/internal/api/prometheus_discovery.go +++ b/internal/api/prometheus_discovery.go @@ -11,7 +11,8 @@ import ( "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/utils/datautil" + "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -40,33 +41,28 @@ func (p *PrometheusDiscoveryApi) Enable(c *gin.Context) { func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { eResp, err := p.client.Get(c, prommetrics.BuildDiscoveryKeyPrefix(key), clientv3.WithPrefix()) if err != nil { - // Log and respond with an error if preparation fails. - apiresp.GinError(c, errs.WrapMsg(err, "etcd get err")) + apiresp.GinError(c, errs.WrapMsg(err, "get key value")) return } - if len(eResp.Kvs) == 0 { - c.JSON(http.StatusOK, []*prommetrics.Target{}) + value := datautil.Batch(func(kv *mvccpb.KeyValue) []byte { return kv.Value }, eResp.Kvs) + + if len(value) == 0 { + c.JSON(http.StatusOK, []*prommetrics.RespTarget{}) + return + } + var resp prommetrics.RespTarget + for i := range value { + var tmp prommetrics.Target + if err = json.Unmarshal(value[i], &tmp); err != nil { + apiresp.GinError(c, errs.WrapMsg(err, "json unmarshal err")) + return + } + + resp.Targets = append(resp.Targets, tmp.Target) + resp.Labels = tmp.Labels // default label is fixed. See prommetrics.BuildDefaultTarget } - var ( - resp = &prommetrics.RespTarget{ - Targets: make([]string, 0, len(eResp.Kvs)), - } - ) - - for i := range eResp.Kvs { - var target prommetrics.Target - err = json.Unmarshal(eResp.Kvs[i].Value, &target) - if err != nil { - log.ZError(c, "prometheus unmarshal err", errs.Wrap(err)) - } - resp.Targets = append(resp.Targets, target.Target) - if resp.Labels == nil { - resp.Labels = target.Labels - } - } - - c.JSON(200, []*prommetrics.RespTarget{resp}) + c.JSON(http.StatusOK, []*prommetrics.RespTarget{&resp}) } func (p *PrometheusDiscoveryApi) Api(c *gin.Context) { diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index b600b5682..be858ace1 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -176,9 +176,16 @@ func (m *MsgTransfer) Start(index int, cfg *Config) error { return err } - etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + etcdClient, ok := client.(*etcd.SvcDiscoveryRegistryImpl) + if !ok { + return errs.New("only etcd support autoSetPorts").Wrap() + } - _, err = etcdClient.Put(context.TODO(), prommetrics.BuildDiscoveryKey(prommetrics.MessageTransferKeyName, index), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + target, err := jsonutil.JsonMarshal(prommetrics.BuildDefaultTarget(registerIP, prometheusPort)) + if err != nil { + return errs.Wrap(err) + } + err = etcdClient.SetWithLease(context.TODO(), prommetrics.BuildDiscoveryKey(prommetrics.MessageTransferKeyName, index), target, prommetrics.TTL) if err != nil { return errs.WrapMsg(err, "etcd put err") } diff --git a/pkg/common/prommetrics/discovery.go b/pkg/common/prommetrics/discovery.go index a3cb54225..f989caddc 100644 --- a/pkg/common/prommetrics/discovery.go +++ b/pkg/common/prommetrics/discovery.go @@ -5,6 +5,7 @@ import "fmt" const ( APIKeyName = "api" MessageTransferKeyName = "message-transfer" + TTL = 300 ) type Target struct { diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 525c7cb23..8b300ff8f 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -164,9 +164,16 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf return err } - etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + etcdClient, ok := client.(*etcd.SvcDiscoveryRegistryImpl) + if !ok { + return errs.New("only etcd support autoSetPorts").Wrap() + } - _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName, index), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + target, err := jsonutil.JsonMarshal(prommetrics.BuildDefaultTarget(registerIP, prometheusPort)) + if err != nil { + return errs.Wrap(err) + } + err = etcdClient.SetWithLease(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName, index), target, prommetrics.TTL) if err != nil { return errs.WrapMsg(err, "etcd put err") } From 36810b99a4b839559d4eb95012459b5843406124 Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Fri, 26 Sep 2025 15:53:19 +0800 Subject: [PATCH 4/7] fix: prometheus --- internal/api/prometheus_discovery.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/internal/api/prometheus_discovery.go b/internal/api/prometheus_discovery.go index 3369d9e6b..6bd9cd4a7 100644 --- a/internal/api/prometheus_discovery.go +++ b/internal/api/prometheus_discovery.go @@ -11,14 +11,11 @@ import ( "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/utils/datautil" - "go.etcd.io/etcd/api/v3/mvccpb" - clientv3 "go.etcd.io/etcd/client/v3" ) type PrometheusDiscoveryApi struct { config *Config - client *clientv3.Client + client discovery.SvcDiscoveryRegistry } func NewPrometheusDiscoveryApi(cfg *Config, client discovery.SvcDiscoveryRegistry) *PrometheusDiscoveryApi { @@ -26,7 +23,7 @@ func NewPrometheusDiscoveryApi(cfg *Config, client discovery.SvcDiscoveryRegistr config: cfg, } if cfg.Discovery.Enable == config.ETCD { - api.client = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + api.client = client.(*etcd.SvcDiscoveryRegistryImpl) } return api } @@ -39,12 +36,11 @@ func (p *PrometheusDiscoveryApi) Enable(c *gin.Context) { } func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { - eResp, err := p.client.Get(c, prommetrics.BuildDiscoveryKeyPrefix(key), clientv3.WithPrefix()) + value, err := p.client.GetKeyWithPrefix(c, prommetrics.BuildDiscoveryKeyPrefix(key)) if err != nil { apiresp.GinError(c, errs.WrapMsg(err, "get key value")) return } - value := datautil.Batch(func(kv *mvccpb.KeyValue) []byte { return kv.Value }, eResp.Kvs) if len(value) == 0 { c.JSON(http.StatusOK, []*prommetrics.RespTarget{}) From 95d383d656f167736e2a80a14e03b114cc64be22 Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Fri, 26 Sep 2025 15:53:52 +0800 Subject: [PATCH 5/7] fix: prometheus --- internal/api/prometheus_discovery.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/api/prometheus_discovery.go b/internal/api/prometheus_discovery.go index 6bd9cd4a7..6ec563721 100644 --- a/internal/api/prometheus_discovery.go +++ b/internal/api/prometheus_discovery.go @@ -2,6 +2,7 @@ package api import ( "encoding/json" + "errors" "net/http" "github.com/gin-gonic/gin" @@ -38,10 +39,13 @@ func (p *PrometheusDiscoveryApi) Enable(c *gin.Context) { func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { value, err := p.client.GetKeyWithPrefix(c, prommetrics.BuildDiscoveryKeyPrefix(key)) if err != nil { + if errors.Is(err, discovery.ErrNotSupported) { + c.JSON(http.StatusOK, []struct{}{}) + return + } apiresp.GinError(c, errs.WrapMsg(err, "get key value")) return } - if len(value) == 0 { c.JSON(http.StatusOK, []*prommetrics.RespTarget{}) return From fd479c15b46976f7a50c15a68045915197390ec8 Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Fri, 26 Sep 2025 15:55:33 +0800 Subject: [PATCH 6/7] fix: prometheus --- internal/api/prometheus_discovery.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/api/prometheus_discovery.go b/internal/api/prometheus_discovery.go index 6ec563721..c12ddfcfe 100644 --- a/internal/api/prometheus_discovery.go +++ b/internal/api/prometheus_discovery.go @@ -3,6 +3,7 @@ package api import ( "encoding/json" "errors" + "fmt" "net/http" "github.com/gin-gonic/gin" @@ -50,6 +51,7 @@ func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { c.JSON(http.StatusOK, []*prommetrics.RespTarget{}) return } + fmt.Println(len(value)) var resp prommetrics.RespTarget for i := range value { var tmp prommetrics.Target From 34c35099d15dcbd9113bafa232fce3781d7c37d4 Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Fri, 26 Sep 2025 16:38:02 +0800 Subject: [PATCH 7/7] fix: prometheus --- internal/api/prometheus_discovery.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/api/prometheus_discovery.go b/internal/api/prometheus_discovery.go index c12ddfcfe..6ec563721 100644 --- a/internal/api/prometheus_discovery.go +++ b/internal/api/prometheus_discovery.go @@ -3,7 +3,6 @@ package api import ( "encoding/json" "errors" - "fmt" "net/http" "github.com/gin-gonic/gin" @@ -51,7 +50,6 @@ func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { c.JSON(http.StatusOK, []*prommetrics.RespTarget{}) return } - fmt.Println(len(value)) var resp prommetrics.RespTarget for i := range value { var tmp prommetrics.Target