This commit is contained in:
withchao 2025-02-07 17:18:21 +08:00
parent 31a912c013
commit 9bc6a651cf
3 changed files with 68 additions and 58 deletions

View File

@ -29,14 +29,12 @@ import (
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery"
disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mw"
"github.com/openimsdk/tools/system/program"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/jsonutil"
"github.com/openimsdk/tools/utils/network"
"github.com/openimsdk/tools/utils/runtimeenv"
"google.golang.org/grpc"
@ -69,66 +67,67 @@ func Start(ctx context.Context, index int, config *Config) error {
prometheusPort int
)
registerIP, err := network.GetRpcRegisterIP("")
if err != nil {
return err
}
getAutoPort := func() (net.Listener, int, error) {
registerAddr := net.JoinHostPort(registerIP, "0")
listener, err := net.Listen("tcp", registerAddr)
if err != nil {
return nil, 0, errs.WrapMsg(err, "listen err", "registerAddr", registerAddr)
}
_, portStr, _ := net.SplitHostPort(listener.Addr().String())
port, _ := strconv.Atoi(portStr)
return listener, port, nil
}
if config.API.Prometheus.AutoSetPorts && config.Discovery.Enable != conf.ETCD {
return errs.New("only etcd support autoSetPorts", "RegisterName", "api").Wrap()
}
//registerIP, err := network.GetRpcRegisterIP("")
//if err != nil {
// return err
//}
//
// todo
//getAutoPort := func() (net.Listener, int, error) {
// registerAddr := net.JoinHostPort(registerIP, "0")
// listener, err := net.Listen("tcp", registerAddr)
// if err != nil {
// return nil, 0, errs.WrapMsg(err, "listen err", "registerAddr", registerAddr)
// }
// _, portStr, _ := net.SplitHostPort(listener.Addr().String())
// port, _ := strconv.Atoi(portStr)
// return listener, port, nil
//}
//
//if config.API.Prometheus.AutoSetPorts && config.Discovery.Enable != conf.ETCD {
// return errs.New("only etcd support autoSetPorts", "RegisterName", "api").Wrap()
//}
router, err := newGinRouter(ctx, client, config)
if err != nil {
return err
}
if config.API.Prometheus.Enable {
var (
listener net.Listener
)
if config.API.Prometheus.AutoSetPorts {
listener, prometheusPort, err = getAutoPort()
if err != nil {
return err
}
etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient()
_, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort)))
if err != nil {
return errs.WrapMsg(err, "etcd put err")
}
} else {
prometheusPort, err = datautil.GetElemByIndex(config.API.Prometheus.Ports, index)
if err != nil {
return err
}
listener, err = net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort))
if err != nil {
return errs.WrapMsg(err, "listen err", "addr", fmt.Sprintf(":%d", prometheusPort))
}
}
go func() {
if err := prommetrics.ApiInit(listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
netErr = errs.WrapMsg(err, fmt.Sprintf("api prometheus start err: %d", prometheusPort))
netDone <- struct{}{}
}
}()
}
//if config.API.Prometheus.Enable {
// var (
// listener net.Listener
// )
//
// if config.API.Prometheus.AutoSetPorts {
// listener, prometheusPort, err = getAutoPort()
// if err != nil {
// return err
// }
//
// etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient()
//
// _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort)))
// if err != nil {
// return errs.WrapMsg(err, "etcd put err")
// }
// } else {
// prometheusPort, err = datautil.GetElemByIndex(config.API.Prometheus.Ports, index)
// if err != nil {
// return err
// }
// listener, err = net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort))
// if err != nil {
// return errs.WrapMsg(err, "listen err", "addr", fmt.Sprintf(":%d", prometheusPort))
// }
// }
//
// go func() {
// if err := prommetrics.ApiInit(listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
// netErr = errs.WrapMsg(err, fmt.Sprintf("api prometheus start err: %d", prometheusPort))
// netDone <- struct{}{}
// }
// }()
//
//}
address := net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort))
server := http.Server{Addr: address, Handler: router}

View File

@ -47,6 +47,8 @@ func NewOnlinePusher(disCov discovery.Conn, config *Config) OnlinePusher {
switch config.Discovery.Enable {
case conf.ETCD:
return NewDefaultAllNode(disCov, config)
case conf.Standalone:
return nil
default:
log.ZWarn(context.Background(), "NewOnlinePusher is error", errs.Wrap(errors.New("unsupported discovery type")), "type", config.Discovery.Enable)
return nil

View File

@ -127,7 +127,6 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
}
etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient()
_, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort)))
if err != nil {
return errs.WrapMsg(err, "etcd put err")
@ -176,7 +175,17 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
return err
}
// todo
//err = client.Register(
// ctx,
// "prometheus"+rpcRegisterName,
// registerIP,
// port,
// grpc.WithTransportCredentials(insecure.NewCredentials()),
//)
err = client.Register(
ctx,
rpcRegisterName,
registerIP,
port,