diff --git a/cmd/main.go b/cmd/main.go index 40bce080d..a2afb11fa 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -53,9 +53,9 @@ func main() { putCmd1(cmd, false, third.Start) putCmd1(cmd, false, user.Start) putCmd1(cmd, false, push.Start) - putCmd3(cmd, true, msggateway.Start) - putCmd2(cmd, true, msgtransfer.Start) - putCmd2(cmd, true, api.Start) + putCmd1(cmd, true, msggateway.Start) + putCmd1(cmd, true, msgtransfer.Start) + putCmd1(cmd, true, api.Start) ctx := context.Background() if err := cmd.run(ctx); err != nil { fmt.Println(err) @@ -231,22 +231,23 @@ func putCmd1[C any](cmd *cmds, block bool, fn func(ctx context.Context, config * }) } -func putCmd2[C any](cmd *cmds, block bool, fn func(ctx context.Context, index int, config *C) error) { - cmd.add(getFuncPacketName(fn), block, func(ctx context.Context) error { - var conf C - if err := cmd.parseConf(&conf); err != nil { - return err - } - return fn(ctx, 0, &conf) - }) -} - -func putCmd3[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar, index int) error) { - cmd.add(getFuncPacketName(fn), block, func(ctx context.Context) error { - var conf C - if err := cmd.parseConf(&conf); err != nil { - return err - } - return fn(ctx, &conf, standalone.GetDiscoveryConn(), standalone.GetServiceRegistrar(), 0) - }) -} +// +//func putCmd2[C any](cmd *cmds, block bool, fn func(ctx context.Context, index int, config *C) error) { +// cmd.add(getFuncPacketName(fn), block, func(ctx context.Context) error { +// var conf C +// if err := cmd.parseConf(&conf); err != nil { +// return err +// } +// return fn(ctx, 0, &conf) +// }) +//} +// +//func putCmd3[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar, index int) error) { +// cmd.add(getFuncPacketName(fn), block, func(ctx context.Context) error { +// var conf C +// if err := cmd.parseConf(&conf); err != nil { +// return err +// } +// return fn(ctx, &conf, standalone.GetDiscoveryConn(), standalone.GetServiceRegistrar(), 0) +// }) +//} diff --git a/internal/api/init.go b/internal/api/init.go index 5c3a25224..0222ebf9f 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -29,6 +29,7 @@ import ( conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery" disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" + "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" @@ -45,21 +46,22 @@ type Config struct { conf.AllConfig ConfigPath conf.Path + Index conf.Index } -func Start(ctx context.Context, index int, config *Config) error { - apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, index) +func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { + apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, int(config.Index)) if err != nil { return err } - client, err := kdisc.NewDiscoveryRegister(&config.Discovery, []string{ - config.Discovery.RpcService.MessageGateway, - }) - if err != nil { - return errs.WrapMsg(err, "failed to register discovery service") - } - client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) + //client, err := kdisc.NewDiscoveryRegister(&config.Discovery, []string{ + // config.Discovery.RpcService.MessageGateway, + //}) + //if err != nil { + // return errs.WrapMsg(err, "failed to register discovery service") + //} + //client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) var ( netDone = make(chan struct{}, 1) diff --git a/internal/api/prometheus_discovery.go b/internal/api/prometheus_discovery.go index 5e1a9cae2..2516d0d14 100644 --- a/internal/api/prometheus_discovery.go +++ b/internal/api/prometheus_discovery.go @@ -20,7 +20,7 @@ type PrometheusDiscoveryApi struct { client *clientv3.Client } -func NewPrometheusDiscoveryApi(config *Config, client discovery.SvcDiscoveryRegistry) *PrometheusDiscoveryApi { +func NewPrometheusDiscoveryApi(config *Config, client discovery.Conn) *PrometheusDiscoveryApi { api := &PrometheusDiscoveryApi{ config: config, } diff --git a/internal/api/router.go b/internal/api/router.go index d2f5d2c61..9febaade0 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -52,7 +52,7 @@ func prommetricsGin() gin.HandlerFunc { } } -func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cfg *Config) (*gin.Engine, error) { +func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin.Engine, error) { authConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Auth) if err != nil { return nil, err diff --git a/internal/api/user.go b/internal/api/user.go index a88f8f65a..6427e222e 100644 --- a/internal/api/user.go +++ b/internal/api/user.go @@ -29,11 +29,11 @@ import ( type UserApi struct { Client user.UserClient - discov discovery.SvcDiscoveryRegistry + discov discovery.Conn config config.RpcService } -func NewUserApi(client user.UserClient, discov discovery.SvcDiscoveryRegistry, config config.RpcService) UserApi { +func NewUserApi(client user.UserClient, discov discovery.Conn, config config.RpcService) UserApi { return UserApi{Client: client, discov: discov, config: config} } diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 916dd0385..6c6960220 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -35,14 +35,15 @@ type Config struct { RedisConfig config.Redis WebhooksConfig config.Webhooks Discovery config.Discovery + Index config.Index } // Start run ws server. -func Start(ctx context.Context, conf *Config, client discovery.Conn, server grpc.ServiceRegistrar, index int) error { +func Start(ctx context.Context, conf *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(), "rpcPorts", conf.MsgGateway.RPC.Ports, "wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports) - wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, index) + wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, int(conf.Index)) if err != nil { return err } @@ -71,11 +72,6 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, server grpc go longServer.ChangeOnlineStatus(4) - //netDone := make(chan error) - //go func() { - // err = hubServer.Start(ctx, index, conf) - // netDone <- err - //}() return hubServer.LongConnServer.Run(ctx) } diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index bf1c98f45..3c73b5032 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -42,14 +42,11 @@ import ( "github.com/openimsdk/tools/utils/runtimeenv" conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" - discRegister "github.com/openimsdk/open-im-server/v3/pkg/common/discovery" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mw" "github.com/openimsdk/tools/system/program" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) type MsgTransfer struct { @@ -73,14 +70,14 @@ type Config struct { Share conf.Share WebhooksConfig conf.Webhooks Discovery conf.Discovery + Index conf.Index } -func Start(ctx context.Context, index int, config *Config) error { - +func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { builder := mqbuild.NewBuilder(&config.Discovery, &config.KafkaConfig) log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "prometheusPorts", - config.MsgTransfer.Prometheus.Ports, "index", index) + config.MsgTransfer.Prometheus.Ports, "index", config.Index) mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { @@ -90,12 +87,12 @@ func Start(ctx context.Context, index int, config *Config) error { if err != nil { return err } - client, err := discRegister.NewDiscoveryRegister(&config.Discovery, nil) - if err != nil { - return err - } - client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) + //client, err := discRegister.NewDiscoveryRegister(&config.Discovery, nil) + //if err != nil { + // return err + //} + //client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), + // grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) if config.Discovery.Enable == conf.ETCD { cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), []string{ @@ -158,10 +155,10 @@ func Start(ctx context.Context, index int, config *Config) error { historyMongoHandler: historyMongoHandler, } - return msgTransfer.Start(index, config, client) + return msgTransfer.Start(int(config.Index), config, client) } -func (m *MsgTransfer) Start(index int, config *Config, client discovery.SvcDiscoveryRegistry) error { +func (m *MsgTransfer) Start(index int, config *Config, client discovery.Conn) error { m.ctx, m.cancel = context.WithCancel(context.Background()) var ( netDone = make(chan struct{}, 1) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index bbf204b47..a2d0cca67 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -78,7 +78,7 @@ type ConsumerMessage struct { Value []byte } -func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.SvcDiscoveryRegistry, config *Config, database controller.MsgTransferDatabase) (*OnlineHistoryRedisConsumerHandler, error) { +func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.Conn, config *Config, database controller.MsgTransferDatabase) (*OnlineHistoryRedisConsumerHandler, error) { groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group) if err != nil { return nil, err diff --git a/pkg/common/cmd/msg_gateway.go b/pkg/common/cmd/msg_gateway.go index 3f66ea720..362362927 100644 --- a/pkg/common/cmd/msg_gateway.go +++ b/pkg/common/cmd/msg_gateway.go @@ -19,6 +19,7 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/msggateway" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" @@ -55,5 +56,22 @@ func (m *MsgGatewayCmd) Exec() error { } func (m *MsgGatewayCmd) runE() error { - return msggateway.Start(m.ctx, m.Index(), m.msgGatewayConfig) + m.msgGatewayConfig.Index = config.Index(m.Index()) + //return msggateway.Start(m.ctx, m.msgGatewayConfig, "", "") + //return msggateway.Start(m.ctx, m.msgGatewayConfig, "", "") + var prometheus config.Prometheus + rpc := m.msgGatewayConfig.MsgGateway.RPC + return startrpc.Start( + m.ctx, &m.msgGatewayConfig.Discovery, + &prometheus, + rpc.ListenIP, rpc.RegisterIP, + rpc.AutoSetPorts, + rpc.Ports, int(m.msgGatewayConfig.Index), + m.msgGatewayConfig.Discovery.RpcService.MessageGateway, + "", + m.msgGatewayConfig.MsgGateway, + []string{}, + []string{}, + msggateway.Start, + ) } diff --git a/pkg/common/cmd/msg_transfer.go b/pkg/common/cmd/msg_transfer.go index fbb83c65f..0b21f5756 100644 --- a/pkg/common/cmd/msg_transfer.go +++ b/pkg/common/cmd/msg_transfer.go @@ -56,5 +56,6 @@ func (m *MsgTransferCmd) Exec() error { } func (m *MsgTransferCmd) runE() error { + m.msgTransferConfig.Index = config.Index(m.Index()) return msgtransfer.Start(m.ctx, m.Index(), m.msgTransferConfig) } diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go index c4ae84952..f9f5beb70 100644 --- a/pkg/common/cmd/push.go +++ b/pkg/common/cmd/push.go @@ -48,7 +48,7 @@ func NewPushRpcCmd() *PushRpcCmd { ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", version.Version) ret.Command.RunE = func(cmd *cobra.Command, args []string) error { - ret.pushConfig.FcmConfigPath = ret.ConfigPath() + ret.pushConfig.FcmConfigPath = config.Path(ret.ConfigPath()) return ret.runE() } return ret diff --git a/pkg/common/cmd/root.go b/pkg/common/cmd/root.go index 0a405fb6e..c3f58b8c9 100644 --- a/pkg/common/cmd/root.go +++ b/pkg/common/cmd/root.go @@ -12,7 +12,6 @@ import ( "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/utils/runtimeenv" "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -86,14 +85,12 @@ func (r *RootCmd) initEtcd() error { return err } disConfig := config.Discovery{} - env := runtimeenv.PrintRuntimeEnvironment() - err = config.Load(configDirectory, config.DiscoveryConfigFilename, config.EnvPrefixMap[config.DiscoveryConfigFilename], - env, &disConfig) + err = config.Load(configDirectory, config.DiscoveryConfigFilename, config.EnvPrefixMap[config.DiscoveryConfigFilename], &disConfig) if err != nil { return err } if disConfig.Enable == config.ETCD { - discov, _ := kdisc.NewDiscoveryRegister(&disConfig, env, nil) + discov, _ := kdisc.NewDiscoveryRegister(&disConfig, nil) r.etcdClient = discov.(*etcd.SvcDiscoveryRegistryImpl).GetClient() } return nil @@ -125,18 +122,16 @@ func (r *RootCmd) initializeConfiguration(cmd *cobra.Command, opts *CmdOpts) err return err } - runtimeEnv := runtimeenv.PrintRuntimeEnvironment() - // Load common configuration file //opts.configMap[ShareFileName] = StructEnvPrefix{EnvPrefix: shareEnvPrefix, ConfigStruct: &r.share} for configFileName, configStruct := range opts.configMap { - err := config.Load(configDirectory, configFileName, config.EnvPrefixMap[configFileName], runtimeEnv, configStruct) + err := config.Load(configDirectory, configFileName, config.EnvPrefixMap[configFileName], configStruct) if err != nil { return err } } // Load common log configuration file - return config.Load(configDirectory, config.LogConfigFileName, config.EnvPrefixMap[config.LogConfigFileName], runtimeEnv, &r.log) + return config.Load(configDirectory, config.LogConfigFileName, config.EnvPrefixMap[config.LogConfigFileName], &r.log) } func (r *RootCmd) updateConfigFromEtcd(opts *CmdOpts) error { diff --git a/pkg/common/cmd/start.go b/pkg/common/cmd/start.go new file mode 100644 index 000000000..1ac6d1c6f --- /dev/null +++ b/pkg/common/cmd/start.go @@ -0,0 +1,40 @@ +package cmd + +// +//type StartFunc[C any] func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error +// +//func Start[C any](fn StartFunc[C]) { +// var _ RootCmd +// cmd := cobra.Command{ +// Use: "Start openIM application", +// Long: fmt.Sprintf(`Start %s `, program.GetProcessName()), +// PersistentPreRunE: func(cmd *cobra.Command, args []string) error { +// return rootCmd.persistentPreRun(cmd, opts...) +// }, +// SilenceUsage: true, +// SilenceErrors: false, +// } +// cmd.Flags().StringP(config.FlagConf, "c", "", "path of config directory") +// cmd.Flags().IntP(config.FlagTransferIndex, "i", 0, "process startup sequence number") +// +// +// +//} +// +//func start[C any](fn StartFunc[C]) error { +// +// +// v := viper.New() +// v.SetConfigType("yaml") +// if err := v.ReadConfig(bytes.NewReader(confData)); err != nil { +// return err +// } +// fn := func(conf *mapstructure.DecoderConfig) { +// conf.TagName = config.StructTagName +// } +// if err := v.Unmarshal(val, fn); err != nil { +// return err +// } +// +// return nil +//} diff --git a/pkg/common/cmd/third.go b/pkg/common/cmd/third.go index e567234e4..d5f066d1b 100644 --- a/pkg/common/cmd/third.go +++ b/pkg/common/cmd/third.go @@ -22,6 +22,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" + "github.com/openimsdk/tools/utils/network" "github.com/spf13/cobra" ) diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 1a87fe1d6..f74fe2440 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -32,6 +32,8 @@ const StructTagName = "yaml" type Path string +type Index int + type CacheConfig struct { Topic string `yaml:"topic"` SlotNum int `yaml:"slotNum"` @@ -181,11 +183,7 @@ type Prometheus struct { } type MsgGateway struct { - RPC struct { - RegisterIP string `yaml:"registerIP"` - AutoSetPorts bool `yaml:"autoSetPorts"` - Ports []int `yaml:"ports"` - } `yaml:"rpc"` + RPC RPC `yaml:"rpc"` Prometheus Prometheus `yaml:"prometheus"` ListenIP string `yaml:"listenIP"` LongConnSvr struct { @@ -205,12 +203,7 @@ type MsgTransfer struct { } type Push struct { - RPC struct { - RegisterIP string `yaml:"registerIP"` - ListenIP string `yaml:"listenIP"` - AutoSetPorts bool `yaml:"autoSetPorts"` - Ports []int `yaml:"ports"` - } `yaml:"rpc"` + RPC RPC `yaml:"rpc"` Prometheus Prometheus `yaml:"prometheus"` MaxConcurrentWorkers int `yaml:"maxConcurrentWorkers"` Enable string `yaml:"enable"` @@ -241,12 +234,7 @@ type Push struct { } type Auth struct { - RPC struct { - RegisterIP string `yaml:"registerIP"` - ListenIP string `yaml:"listenIP"` - AutoSetPorts bool `yaml:"autoSetPorts"` - Ports []int `yaml:"ports"` - } `yaml:"rpc"` + RPC RPC `yaml:"rpc"` Prometheus Prometheus `yaml:"prometheus"` TokenPolicy struct { Expire int64 `yaml:"expire"` @@ -254,54 +242,29 @@ type Auth struct { } type Conversation struct { - RPC struct { - RegisterIP string `yaml:"registerIP"` - ListenIP string `yaml:"listenIP"` - AutoSetPorts bool `yaml:"autoSetPorts"` - Ports []int `yaml:"ports"` - } `yaml:"rpc"` + RPC RPC `yaml:"rpc"` Prometheus Prometheus `yaml:"prometheus"` } type Friend struct { - RPC struct { - RegisterIP string `yaml:"registerIP"` - ListenIP string `yaml:"listenIP"` - AutoSetPorts bool `yaml:"autoSetPorts"` - Ports []int `yaml:"ports"` - } `yaml:"rpc"` + RPC RPC `yaml:"rpc"` Prometheus Prometheus `yaml:"prometheus"` } type Group struct { - RPC struct { - RegisterIP string `yaml:"registerIP"` - ListenIP string `yaml:"listenIP"` - AutoSetPorts bool `yaml:"autoSetPorts"` - Ports []int `yaml:"ports"` - } `yaml:"rpc"` + RPC RPC `yaml:"rpc"` Prometheus Prometheus `yaml:"prometheus"` EnableHistoryForNewMembers bool `yaml:"enableHistoryForNewMembers"` } type Msg struct { - RPC struct { - RegisterIP string `yaml:"registerIP"` - ListenIP string `yaml:"listenIP"` - AutoSetPorts bool `yaml:"autoSetPorts"` - Ports []int `yaml:"ports"` - } `yaml:"rpc"` + RPC RPC `yaml:"rpc"` Prometheus Prometheus `yaml:"prometheus"` FriendVerify bool `yaml:"friendVerify"` } type Third struct { - RPC struct { - RegisterIP string `yaml:"registerIP"` - ListenIP string `yaml:"listenIP"` - AutoSetPorts bool `yaml:"autoSetPorts"` - Ports []int `yaml:"ports"` - } `yaml:"rpc"` + RPC RPC `yaml:"rpc"` Prometheus Prometheus `yaml:"prometheus"` Object struct { Enable string `yaml:"enable"` @@ -348,15 +311,17 @@ type Aws struct { } type User struct { - RPC struct { - RegisterIP string `yaml:"registerIP"` - ListenIP string `yaml:"listenIP"` - AutoSetPorts bool `yaml:"autoSetPorts"` - Ports []int `yaml:"ports"` - } `yaml:"rpc"` + RPC RPC `yaml:"rpc"` Prometheus Prometheus `yaml:"prometheus"` } +type RPC struct { + RegisterIP string `yaml:"registerIP"` + ListenIP string `yaml:"listenIP"` + AutoSetPorts bool `yaml:"autoSetPorts"` + Ports []int `yaml:"ports"` +} + type Redis struct { Address []string `yaml:"address"` Username string `yaml:"username"` diff --git a/pkg/common/startrpc/doc.go b/pkg/common/startrpc/doc.go deleted file mode 100644 index fce7309f4..000000000 --- a/pkg/common/startrpc/doc.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright © 2024 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package startrpc // import "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 5c27b2f87..8412e46b7 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -16,10 +16,8 @@ package startrpc import ( "context" - "errors" "fmt" "net" - "net/http" "os" "os/signal" "strconv" @@ -27,10 +25,8 @@ import ( "time" conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" - disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" - "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/tools/utils/jsonutil" + "github.com/openimsdk/tools/utils/network" "google.golang.org/grpc/status" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery" @@ -39,12 +35,10 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mw" - "github.com/openimsdk/tools/utils/network" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) -// Start rpc server. func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, watchConfigNames []string, watchServiceNames []string, @@ -59,182 +53,142 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf return nil } - watchConfigNames = append(watchConfigNames, conf.LogConfigFileName) - var ( - rpcTcpAddr string - netDone = make(chan struct{}, 2) - netErr error - prometheusPort int - ) + options = append(options, mw.GrpcServer()) registerIP, err := network.GetRpcRegisterIP(registerIP) if err != nil { return err } - - if !autoSetPorts { + var ( + rpcListenAddr string + prometheusListenAddr string + ) + if autoSetPorts { + rpcListenAddr = net.JoinHostPort(listenIP, "0") + prometheusListenAddr = net.JoinHostPort("", "0") + } else { rpcPort, err := datautil.GetElemByIndex(rpcPorts, index) if err != nil { return err } - rpcTcpAddr = net.JoinHostPort(network.GetListenIP(listenIP), strconv.Itoa(rpcPort)) - } else { - rpcTcpAddr = net.JoinHostPort(network.GetListenIP(listenIP), "0") - } - - getAutoPort := func() (net.Listener, int, error) { - listener, err := net.Listen("tcp", rpcTcpAddr) + prometheusPort, err := datautil.GetElemByIndex(prometheusConfig.Ports, index) if err != nil { - return nil, 0, errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr) + return err } - _, portStr, _ := net.SplitHostPort(listener.Addr().String()) - port, _ := strconv.Atoi(portStr) - return listener, port, nil + rpcListenAddr = net.JoinHostPort(listenIP, strconv.Itoa(rpcPort)) + prometheusListenAddr = net.JoinHostPort("", strconv.Itoa(prometheusPort)) } - if autoSetPorts && discovery.Enable != conf.ETCD { - return errs.New("only etcd support autoSetPorts", "rpcRegisterName", rpcRegisterName).Wrap() - } + log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcAddr", rpcListenAddr, "prometheusAddr", prometheusListenAddr) + + watchConfigNames = append(watchConfigNames, conf.LogConfigFileName) + client, err := kdisc.NewDiscoveryRegister(discovery, watchServiceNames) if err != nil { return err } defer client.Close() - client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) + client.AddOption( + mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")), + ) - // var reg *prometheus.Registry - // var metric *grpcprometheus.ServerMetrics - if prometheusConfig.Enable { - // cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share) - // reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics) - // options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()), - // grpc.UnaryInterceptor(metric.UnaryServerInterceptor())) + var gsrv grpcServiceRegistrar + + err = rpcFn(ctx, config, client, &gsrv) + if err != nil { + return err + } + + ctx, cancel := context.WithCancelCause(ctx) + + if prometheusListenAddr != "" { options = append( - options, mw.GrpcServer(), + options, prommetricsUnaryInterceptor(rpcRegisterName), prommetricsStreamInterceptor(rpcRegisterName), ) - - var ( - listener net.Listener - ) - - if autoSetPorts { - listener, prometheusPort, err = getAutoPort() - if err != nil { - return err - } - - etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() - _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) - if err != nil { - return errs.WrapMsg(err, "etcd put err") - } - } else { - prometheusPort, err = datautil.GetElemByIndex(prometheusConfig.Ports, index) - if err != nil { - return err - } - listener, err = net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort)) - if err != nil { - return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr) - } + prometheusListener, prometheusPort, err := listenTCP(prometheusListenAddr) + if err != nil { + return err } + if err := client.Register(ctx, "prometheus_"+rpcRegisterName, registerIP, prometheusPort); err != nil { + return err + } + cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, discovery) go func() { - if err := prommetrics.RpcInit(cs, listener); err != nil && !errors.Is(err, http.ErrServerClosed) { - netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort)) - netDone <- struct{}{} + err := prommetrics.RpcInit(cs, prometheusListener) + if err == nil { + err = fmt.Errorf("serve end") } - //metric.InitializeMetrics(srv) - // Create a HTTP server for prometheus. - // httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} - // if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { - // netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr) - // netDone <- struct{}{} - // } + cancel(fmt.Errorf("prommetrics %s %w", rpcRegisterName, err)) }() - } else { - options = append(options, mw.GrpcServer()) } - listener, port, err := getAutoPort() - if err != nil { - return err - } + var rpcGracefulStop chan struct{} - log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", port, - "prometheusPort", prometheusPort) - - defer listener.Close() - srv := grpc.NewServer(options...) - - err = rpcFn(ctx, config, client, srv) - if err != nil { - return err - } - - // todo - //err = client.Register( - // ctx, - // "prometheus"+rpcRegisterName, - // registerIP, - // port, - // grpc.WithTransportCredentials(insecure.NewCredentials()), - //) - - err = client.Register( - ctx, - rpcRegisterName, - registerIP, - port, - grpc.WithTransportCredentials(insecure.NewCredentials()), - ) - if err != nil { - return err - } - - go func() { - err := srv.Serve(listener) - if err != nil && !errors.Is(err, http.ErrServerClosed) { - netErr = errs.WrapMsg(err, "rpc start err: ", rpcTcpAddr) - netDone <- struct{}{} + if len(gsrv.services) > 0 { + rpcListener, rpcPort, err := listenTCP(rpcListenAddr) + if err != nil { + return err } - }() + srv := grpc.NewServer(options...) - if discovery.Enable == conf.ETCD { - cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), watchConfigNames) - cm.Watch(ctx) + for _, service := range gsrv.services { + srv.RegisterService(service.desc, service.impl) + } + grpcOpt := grpc.WithTransportCredentials(insecure.NewCredentials()) + if err := client.Register(ctx, rpcRegisterName, registerIP, rpcPort, grpcOpt); err != nil { + return err + } + + rpcGracefulStop = make(chan struct{}) + + go func() { + err := srv.Serve(rpcListener) + if err == nil { + err = fmt.Errorf("serve end") + } + cancel(fmt.Errorf("rpc %s %w", rpcRegisterName, err)) + }() + + go func() { + <-ctx.Done() + srv.GracefulStop() + close(rpcGracefulStop) + }() } sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGTERM) + select { - case <-sigs: - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - if err := gracefulStopWithCtx(ctx, srv.GracefulStop); err != nil { - return err - } - return nil - case <-netDone: - return netErr + case val := <-sigs: + log.ZDebug(ctx, "recv exit", "signal", val.String()) + cancel(fmt.Errorf("signal %s", val.String())) + case <-ctx.Done(): } + if rpcGracefulStop != nil { + timeout := time.NewTimer(time.Second * 15) + defer timeout.Stop() + select { + case <-timeout.C: + log.ZWarn(ctx, "rcp graceful stop timeout", nil) + case <-rpcGracefulStop: + log.ZDebug(ctx, "rcp graceful stop done") + } + } + return context.Cause(ctx) } -func gracefulStopWithCtx(ctx context.Context, f func()) error { - done := make(chan struct{}, 1) - go func() { - f() - close(done) - }() - select { - case <-ctx.Done(): - return errs.New("timeout, ctx graceful stop") - case <-done: - return nil +func listenTCP(addr string) (net.Listener, int, error) { + listener, err := net.Listen("tcp", addr) + if err != nil { + return nil, 0, errs.WrapMsg(err, "listen err", "addr", addr) } + return listener, listener.Addr().(*net.TCPAddr).Port, nil } func prommetricsUnaryInterceptor(rpcRegisterName string) grpc.ServerOption { @@ -258,3 +212,19 @@ func prommetricsUnaryInterceptor(rpcRegisterName string) grpc.ServerOption { func prommetricsStreamInterceptor(rpcRegisterName string) grpc.ServerOption { return grpc.ChainStreamInterceptor() } + +type grpcService struct { + desc *grpc.ServiceDesc + impl any +} + +type grpcServiceRegistrar struct { + services []*grpcService +} + +func (x *grpcServiceRegistrar) RegisterService(desc *grpc.ServiceDesc, impl any) { + x.services = append(x.services, &grpcService{ + desc: desc, + impl: impl, + }) +}