diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index a24eba7ca..175813552 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -18,7 +18,6 @@ import ( "context" "fmt" - disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/mcache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" @@ -26,7 +25,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/mqbuild" "github.com/openimsdk/tools/discovery" - "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/mq" "github.com/openimsdk/tools/utils/runtimeenv" @@ -75,19 +73,19 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr return err } - if config.Discovery.Enable == conf.ETCD { - cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), []string{ - config.MsgTransfer.GetConfigFileName(), - config.RedisConfig.GetConfigFileName(), - config.MongodbConfig.GetConfigFileName(), - config.KafkaConfig.GetConfigFileName(), - config.Share.GetConfigFileName(), - config.WebhooksConfig.GetConfigFileName(), - config.Discovery.GetConfigFileName(), - conf.LogConfigFileName, - }) - cm.Watch(ctx) - } + //if config.Discovery.Enable == conf.ETCD { + // cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), []string{ + // config.MsgTransfer.GetConfigFileName(), + // config.RedisConfig.GetConfigFileName(), + // config.MongodbConfig.GetConfigFileName(), + // config.KafkaConfig.GetConfigFileName(), + // config.Share.GetConfigFileName(), + // config.WebhooksConfig.GetConfigFileName(), + // config.Discovery.GetConfigFileName(), + // conf.LogConfigFileName, + // }) + // cm.Watch(ctx) + //} mongoProducer, err := builder.GetTopicProducer(ctx, config.KafkaConfig.ToMongoTopic) if err != nil { return err diff --git a/pkg/common/cmd/cron_task.go b/pkg/common/cmd/cron_task.go index c90bce174..af2cbbee9 100644 --- a/pkg/common/cmd/cron_task.go +++ b/pkg/common/cmd/cron_task.go @@ -58,7 +58,7 @@ func (a *CronTaskCmd) runE() error { a.ctx, &a.cronTaskConfig.Discovery, &prometheus, "", "", - false, + true, nil, 0, "", nil, diff --git a/pkg/common/cmd/msg_transfer.go b/pkg/common/cmd/msg_transfer.go index 3dc4c80b1..9411a2cd0 100644 --- a/pkg/common/cmd/msg_transfer.go +++ b/pkg/common/cmd/msg_transfer.go @@ -63,9 +63,9 @@ func (m *MsgTransferCmd) runE() error { m.ctx, &m.msgTransferConfig.Discovery, &prometheus, "", "", - false, + true, nil, int(m.msgTransferConfig.Index), - m.msgTransferConfig.Discovery.RpcService.MessageGateway, + "", nil, m.msgTransferConfig, []string{}, diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index d39d4862d..a69edae20 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -61,28 +61,17 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c if err != nil { return err } - var ( - rpcListenAddr string - prometheusListenAddr string - ) + var prometheusListenAddr string if autoSetPorts { - rpcListenAddr = net.JoinHostPort(listenIP, "0") prometheusListenAddr = net.JoinHostPort(listenIP, "0") } else { - rpcPort, err := datautil.GetElemByIndex(rpcPorts, index) - if err != nil { - return err - } prometheusPort, err := datautil.GetElemByIndex(prometheusConfig.Ports, index) if err != nil { return err } - rpcListenAddr = net.JoinHostPort(listenIP, strconv.Itoa(rpcPort)) prometheusListenAddr = net.JoinHostPort(listenIP, strconv.Itoa(prometheusPort)) } - log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcAddr", rpcListenAddr, "prometheusAddr", prometheusListenAddr) - watchConfigNames = append(watchConfigNames, conf.LogConfigFileName) client, err := kdisc.NewDiscoveryRegister(disc, watchServiceNames) @@ -149,6 +138,17 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c rpcServer.RegisterService(desc, impl) return } + var rpcListenAddr string + if autoSetPorts { + rpcListenAddr = net.JoinHostPort(listenIP, "0") + } else { + rpcPort, err := datautil.GetElemByIndex(rpcPorts, index) + if err != nil { + cancel(fmt.Errorf("rpcPorts index out of range %s %w", rpcRegisterName, err)) + return + } + rpcListenAddr = net.JoinHostPort(listenIP, strconv.Itoa(rpcPort)) + } rpcListener, err := net.Listen("tcp", rpcListenAddr) if err != nil { cancel(fmt.Errorf("listen rpc %s %s %w", rpcRegisterName, rpcListenAddr, err)) @@ -186,6 +186,7 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c return err } <-ctx.Done() + log.ZDebug(ctx, "cmd wait done", "err", context.Cause(ctx)) if rpcGracefulStop != nil { timeout := time.NewTimer(time.Second * 15) defer timeout.Stop()