diff --git a/go.mod b/go.mod index e1500ac11..28b4a872d 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.73-alpha.12 - github.com/openimsdk/tools v0.0.50-alpha.84 + github.com/openimsdk/tools v0.0.50-alpha.89 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 6298f98c9..d9ea83329 100644 --- a/go.sum +++ b/go.sum @@ -349,8 +349,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.5 h1:eEZCEHm+NsmcO3onXZPIUbGFCYPYbsX5b github.com/openimsdk/gomake v0.0.15-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk= github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.84 h1:jN60Ys/0edZjL/TDmm/5VSJFP4pGYRipkWqhILJbq/8= -github.com/openimsdk/tools v0.0.50-alpha.84/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= +github.com/openimsdk/tools v0.0.50-alpha.89 h1:aAbWSc3gOI//+KQ70i7ilOTiLqQNotmp+bobg4Gu8qI= +github.com/openimsdk/tools v0.0.50-alpha.89/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/api/init.go b/internal/api/init.go index fe8ac1cd0..3fb5364f3 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -94,9 +94,16 @@ func Start(ctx context.Context, index int, cfg *Config) error { return err } - etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + etcdClient, ok := client.(*etcd.SvcDiscoveryRegistryImpl) + if !ok { + return errs.New("only etcd support autoSetPorts").Wrap() + } - _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + target, err := jsonutil.JsonMarshal(prommetrics.BuildDefaultTarget(registerIP, prometheusPort)) + if err != nil { + return errs.Wrap(err) + } + err = etcdClient.SetWithLease(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName, index), target, prommetrics.TTL) if err != nil { return errs.WrapMsg(err, "etcd put err") } diff --git a/internal/api/prometheus_discovery.go b/internal/api/prometheus_discovery.go index 6f17953ae..6ec563721 100644 --- a/internal/api/prometheus_discovery.go +++ b/internal/api/prometheus_discovery.go @@ -2,6 +2,7 @@ package api import ( "encoding/json" + "errors" "net/http" "github.com/gin-gonic/gin" @@ -11,13 +12,11 @@ import ( "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/log" - clientv3 "go.etcd.io/etcd/client/v3" ) type PrometheusDiscoveryApi struct { config *Config - client *clientv3.Client + client discovery.SvcDiscoveryRegistry } func NewPrometheusDiscoveryApi(cfg *Config, client discovery.SvcDiscoveryRegistry) *PrometheusDiscoveryApi { @@ -25,7 +24,7 @@ func NewPrometheusDiscoveryApi(cfg *Config, client discovery.SvcDiscoveryRegistr config: cfg, } if cfg.Discovery.Enable == config.ETCD { - api.client = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + api.client = client.(*etcd.SvcDiscoveryRegistryImpl) } return api } @@ -38,35 +37,32 @@ func (p *PrometheusDiscoveryApi) Enable(c *gin.Context) { } func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { - eResp, err := p.client.Get(c, prommetrics.BuildDiscoveryKey(key)) + value, err := p.client.GetKeyWithPrefix(c, prommetrics.BuildDiscoveryKeyPrefix(key)) if err != nil { - // Log and respond with an error if preparation fails. - apiresp.GinError(c, errs.WrapMsg(err, "etcd get err")) + if errors.Is(err, discovery.ErrNotSupported) { + c.JSON(http.StatusOK, []struct{}{}) + return + } + apiresp.GinError(c, errs.WrapMsg(err, "get key value")) return } - if len(eResp.Kvs) == 0 { - c.JSON(http.StatusOK, []*prommetrics.Target{}) + if len(value) == 0 { + c.JSON(http.StatusOK, []*prommetrics.RespTarget{}) + return + } + var resp prommetrics.RespTarget + for i := range value { + var tmp prommetrics.Target + if err = json.Unmarshal(value[i], &tmp); err != nil { + apiresp.GinError(c, errs.WrapMsg(err, "json unmarshal err")) + return + } + + resp.Targets = append(resp.Targets, tmp.Target) + resp.Labels = tmp.Labels // default label is fixed. See prommetrics.BuildDefaultTarget } - var ( - resp = &prommetrics.RespTarget{ - Targets: make([]string, 0, len(eResp.Kvs)), - } - ) - - for i := range eResp.Kvs { - var target prommetrics.Target - err = json.Unmarshal(eResp.Kvs[i].Value, &target) - if err != nil { - log.ZError(c, "prometheus unmarshal err", errs.Wrap(err)) - } - resp.Targets = append(resp.Targets, target.Target) - if resp.Labels == nil { - resp.Labels = target.Labels - } - } - - c.JSON(200, []*prommetrics.RespTarget{resp}) + c.JSON(http.StatusOK, []*prommetrics.RespTarget{&resp}) } func (p *PrometheusDiscoveryApi) Api(c *gin.Context) { diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 1ac97eeb1..be858ace1 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -176,9 +176,16 @@ func (m *MsgTransfer) Start(index int, cfg *Config) error { return err } - etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + etcdClient, ok := client.(*etcd.SvcDiscoveryRegistryImpl) + if !ok { + return errs.New("only etcd support autoSetPorts").Wrap() + } - _, err = etcdClient.Put(context.TODO(), prommetrics.BuildDiscoveryKey(prommetrics.MessageTransferKeyName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + target, err := jsonutil.JsonMarshal(prommetrics.BuildDefaultTarget(registerIP, prometheusPort)) + if err != nil { + return errs.Wrap(err) + } + err = etcdClient.SetWithLease(context.TODO(), prommetrics.BuildDiscoveryKey(prommetrics.MessageTransferKeyName, index), target, prommetrics.TTL) if err != nil { return errs.WrapMsg(err, "etcd put err") } diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index 740d27eac..110a08aa8 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -29,7 +29,7 @@ import ( func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share, watchNames []string) (discovery.SvcDiscoveryRegistry, error) { switch discovery.Enable { case "k8s": - return kubernetes.NewKubernetesConnManager("default", + return kubernetes.NewConnManager("default", watchNames, grpc.WithDefaultCallOptions( grpc.MaxCallSendMsgSize(1024*1024*20), ), diff --git a/pkg/common/prommetrics/discovery.go b/pkg/common/prommetrics/discovery.go index 8f03bc2ae..f989caddc 100644 --- a/pkg/common/prommetrics/discovery.go +++ b/pkg/common/prommetrics/discovery.go @@ -5,6 +5,7 @@ import "fmt" const ( APIKeyName = "api" MessageTransferKeyName = "message-transfer" + TTL = 300 ) type Target struct { @@ -17,10 +18,14 @@ type RespTarget struct { Labels map[string]string `json:"labels"` } -func BuildDiscoveryKey(name string) string { +func BuildDiscoveryKeyPrefix(name string) string { return fmt.Sprintf("%s/%s/%s", "openim", "prometheus_discovery", name) } +func BuildDiscoveryKey(name string, index int) string { + return fmt.Sprintf("%s/%s/%s/%d", "openim", "prometheus_discovery", name, index) +} + func BuildDefaultTarget(host string, ip int) Target { return Target{ Target: fmt.Sprintf("%s:%d", host, ip), diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index c82ba0326..8b300ff8f 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -164,9 +164,16 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf return err } - etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + etcdClient, ok := client.(*etcd.SvcDiscoveryRegistryImpl) + if !ok { + return errs.New("only etcd support autoSetPorts").Wrap() + } - _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + target, err := jsonutil.JsonMarshal(prommetrics.BuildDefaultTarget(registerIP, prometheusPort)) + if err != nil { + return errs.Wrap(err) + } + err = etcdClient.SetWithLease(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName, index), target, prommetrics.TTL) if err != nil { return errs.WrapMsg(err, "etcd put err") }