diff --git a/internal/api/init.go b/internal/api/init.go index e07069886..3fb5364f3 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -94,9 +94,16 @@ func Start(ctx context.Context, index int, cfg *Config) error { return err } - etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + etcdClient, ok := client.(*etcd.SvcDiscoveryRegistryImpl) + if !ok { + return errs.New("only etcd support autoSetPorts").Wrap() + } - _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName, index), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + target, err := jsonutil.JsonMarshal(prommetrics.BuildDefaultTarget(registerIP, prometheusPort)) + if err != nil { + return errs.Wrap(err) + } + err = etcdClient.SetWithLease(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName, index), target, prommetrics.TTL) if err != nil { return errs.WrapMsg(err, "etcd put err") } diff --git a/internal/api/prometheus_discovery.go b/internal/api/prometheus_discovery.go index 26735c41c..3369d9e6b 100644 --- a/internal/api/prometheus_discovery.go +++ b/internal/api/prometheus_discovery.go @@ -11,7 +11,8 @@ import ( "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/utils/datautil" + "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -40,33 +41,28 @@ func (p *PrometheusDiscoveryApi) Enable(c *gin.Context) { func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { eResp, err := p.client.Get(c, prommetrics.BuildDiscoveryKeyPrefix(key), clientv3.WithPrefix()) if err != nil { - // Log and respond with an error if preparation fails. - apiresp.GinError(c, errs.WrapMsg(err, "etcd get err")) + apiresp.GinError(c, errs.WrapMsg(err, "get key value")) return } - if len(eResp.Kvs) == 0 { - c.JSON(http.StatusOK, []*prommetrics.Target{}) + value := datautil.Batch(func(kv *mvccpb.KeyValue) []byte { return kv.Value }, eResp.Kvs) + + if len(value) == 0 { + c.JSON(http.StatusOK, []*prommetrics.RespTarget{}) + return + } + var resp prommetrics.RespTarget + for i := range value { + var tmp prommetrics.Target + if err = json.Unmarshal(value[i], &tmp); err != nil { + apiresp.GinError(c, errs.WrapMsg(err, "json unmarshal err")) + return + } + + resp.Targets = append(resp.Targets, tmp.Target) + resp.Labels = tmp.Labels // default label is fixed. See prommetrics.BuildDefaultTarget } - var ( - resp = &prommetrics.RespTarget{ - Targets: make([]string, 0, len(eResp.Kvs)), - } - ) - - for i := range eResp.Kvs { - var target prommetrics.Target - err = json.Unmarshal(eResp.Kvs[i].Value, &target) - if err != nil { - log.ZError(c, "prometheus unmarshal err", errs.Wrap(err)) - } - resp.Targets = append(resp.Targets, target.Target) - if resp.Labels == nil { - resp.Labels = target.Labels - } - } - - c.JSON(200, []*prommetrics.RespTarget{resp}) + c.JSON(http.StatusOK, []*prommetrics.RespTarget{&resp}) } func (p *PrometheusDiscoveryApi) Api(c *gin.Context) { diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index b600b5682..be858ace1 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -176,9 +176,16 @@ func (m *MsgTransfer) Start(index int, cfg *Config) error { return err } - etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + etcdClient, ok := client.(*etcd.SvcDiscoveryRegistryImpl) + if !ok { + return errs.New("only etcd support autoSetPorts").Wrap() + } - _, err = etcdClient.Put(context.TODO(), prommetrics.BuildDiscoveryKey(prommetrics.MessageTransferKeyName, index), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + target, err := jsonutil.JsonMarshal(prommetrics.BuildDefaultTarget(registerIP, prometheusPort)) + if err != nil { + return errs.Wrap(err) + } + err = etcdClient.SetWithLease(context.TODO(), prommetrics.BuildDiscoveryKey(prommetrics.MessageTransferKeyName, index), target, prommetrics.TTL) if err != nil { return errs.WrapMsg(err, "etcd put err") } diff --git a/pkg/common/prommetrics/discovery.go b/pkg/common/prommetrics/discovery.go index a3cb54225..f989caddc 100644 --- a/pkg/common/prommetrics/discovery.go +++ b/pkg/common/prommetrics/discovery.go @@ -5,6 +5,7 @@ import "fmt" const ( APIKeyName = "api" MessageTransferKeyName = "message-transfer" + TTL = 300 ) type Target struct { diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 525c7cb23..8b300ff8f 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -164,9 +164,16 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf return err } - etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + etcdClient, ok := client.(*etcd.SvcDiscoveryRegistryImpl) + if !ok { + return errs.New("only etcd support autoSetPorts").Wrap() + } - _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName, index), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + target, err := jsonutil.JsonMarshal(prommetrics.BuildDefaultTarget(registerIP, prometheusPort)) + if err != nil { + return errs.Wrap(err) + } + err = etcdClient.SetWithLease(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName, index), target, prommetrics.TTL) if err != nil { return errs.WrapMsg(err, "etcd put err") }