diff --git a/internal/api/init.go b/internal/api/init.go index 3b6a40913..5c3a25224 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -29,14 +29,12 @@ import ( conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery" disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" - "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mw" "github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/tools/utils/jsonutil" "github.com/openimsdk/tools/utils/network" "github.com/openimsdk/tools/utils/runtimeenv" "google.golang.org/grpc" @@ -69,66 +67,67 @@ func Start(ctx context.Context, index int, config *Config) error { prometheusPort int ) - registerIP, err := network.GetRpcRegisterIP("") - if err != nil { - return err - } - - getAutoPort := func() (net.Listener, int, error) { - registerAddr := net.JoinHostPort(registerIP, "0") - listener, err := net.Listen("tcp", registerAddr) - if err != nil { - return nil, 0, errs.WrapMsg(err, "listen err", "registerAddr", registerAddr) - } - _, portStr, _ := net.SplitHostPort(listener.Addr().String()) - port, _ := strconv.Atoi(portStr) - return listener, port, nil - } - - if config.API.Prometheus.AutoSetPorts && config.Discovery.Enable != conf.ETCD { - return errs.New("only etcd support autoSetPorts", "RegisterName", "api").Wrap() - } + //registerIP, err := network.GetRpcRegisterIP("") + //if err != nil { + // return err + //} + // + // todo + //getAutoPort := func() (net.Listener, int, error) { + // registerAddr := net.JoinHostPort(registerIP, "0") + // listener, err := net.Listen("tcp", registerAddr) + // if err != nil { + // return nil, 0, errs.WrapMsg(err, "listen err", "registerAddr", registerAddr) + // } + // _, portStr, _ := net.SplitHostPort(listener.Addr().String()) + // port, _ := strconv.Atoi(portStr) + // return listener, port, nil + //} + // + //if config.API.Prometheus.AutoSetPorts && config.Discovery.Enable != conf.ETCD { + // return errs.New("only etcd support autoSetPorts", "RegisterName", "api").Wrap() + //} router, err := newGinRouter(ctx, client, config) if err != nil { return err } - if config.API.Prometheus.Enable { - var ( - listener net.Listener - ) - - if config.API.Prometheus.AutoSetPorts { - listener, prometheusPort, err = getAutoPort() - if err != nil { - return err - } - - etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() - - _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) - if err != nil { - return errs.WrapMsg(err, "etcd put err") - } - } else { - prometheusPort, err = datautil.GetElemByIndex(config.API.Prometheus.Ports, index) - if err != nil { - return err - } - listener, err = net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort)) - if err != nil { - return errs.WrapMsg(err, "listen err", "addr", fmt.Sprintf(":%d", prometheusPort)) - } - } - - go func() { - if err := prommetrics.ApiInit(listener); err != nil && !errors.Is(err, http.ErrServerClosed) { - netErr = errs.WrapMsg(err, fmt.Sprintf("api prometheus start err: %d", prometheusPort)) - netDone <- struct{}{} - } - }() - - } + //if config.API.Prometheus.Enable { + // var ( + // listener net.Listener + // ) + // + // if config.API.Prometheus.AutoSetPorts { + // listener, prometheusPort, err = getAutoPort() + // if err != nil { + // return err + // } + // + // etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + // + // _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + // if err != nil { + // return errs.WrapMsg(err, "etcd put err") + // } + // } else { + // prometheusPort, err = datautil.GetElemByIndex(config.API.Prometheus.Ports, index) + // if err != nil { + // return err + // } + // listener, err = net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort)) + // if err != nil { + // return errs.WrapMsg(err, "listen err", "addr", fmt.Sprintf(":%d", prometheusPort)) + // } + // } + // + // go func() { + // if err := prommetrics.ApiInit(listener); err != nil && !errors.Is(err, http.ErrServerClosed) { + // netErr = errs.WrapMsg(err, fmt.Sprintf("api prometheus start err: %d", prometheusPort)) + // netDone <- struct{}{} + // } + // }() + // + //} address := net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort)) server := http.Server{Addr: address, Handler: router} diff --git a/internal/push/onlinepusher.go b/internal/push/onlinepusher.go index 4ebe4413a..f4a3285ad 100644 --- a/internal/push/onlinepusher.go +++ b/internal/push/onlinepusher.go @@ -47,6 +47,8 @@ func NewOnlinePusher(disCov discovery.Conn, config *Config) OnlinePusher { switch config.Discovery.Enable { case conf.ETCD: return NewDefaultAllNode(disCov, config) + case conf.Standalone: + return nil default: log.ZWarn(context.Background(), "NewOnlinePusher is error", errs.Wrap(errors.New("unsupported discovery type")), "type", config.Discovery.Enable) return nil diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index b0b6c9a36..5c27b2f87 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -127,7 +127,6 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf } etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() - _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) if err != nil { return errs.WrapMsg(err, "etcd put err") @@ -176,7 +175,17 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf return err } + // todo + //err = client.Register( + // ctx, + // "prometheus"+rpcRegisterName, + // registerIP, + // port, + // grpc.WithTransportCredentials(insecure.NewCredentials()), + //) + err = client.Register( + ctx, rpcRegisterName, registerIP, port,