This commit is contained in:
withchao 2025-02-08 14:52:56 +08:00
parent 1f337b416c
commit 585e56783b
17 changed files with 247 additions and 276 deletions

View File

@ -53,9 +53,9 @@ func main() {
putCmd1(cmd, false, third.Start)
putCmd1(cmd, false, user.Start)
putCmd1(cmd, false, push.Start)
putCmd3(cmd, true, msggateway.Start)
putCmd2(cmd, true, msgtransfer.Start)
putCmd2(cmd, true, api.Start)
putCmd1(cmd, true, msggateway.Start)
putCmd1(cmd, true, msgtransfer.Start)
putCmd1(cmd, true, api.Start)
ctx := context.Background()
if err := cmd.run(ctx); err != nil {
fmt.Println(err)
@ -231,22 +231,23 @@ func putCmd1[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *
})
}
func putCmd2[C any](cmd *cmds, block bool, fn func(ctx context.Context, index int, config *C) error) {
cmd.add(getFuncPacketName(fn), block, func(ctx context.Context) error {
var conf C
if err := cmd.parseConf(&conf); err != nil {
return err
}
return fn(ctx, 0, &conf)
})
}
func putCmd3[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar, index int) error) {
cmd.add(getFuncPacketName(fn), block, func(ctx context.Context) error {
var conf C
if err := cmd.parseConf(&conf); err != nil {
return err
}
return fn(ctx, &conf, standalone.GetDiscoveryConn(), standalone.GetServiceRegistrar(), 0)
})
}
//
//func putCmd2[C any](cmd *cmds, block bool, fn func(ctx context.Context, index int, config *C) error) {
// cmd.add(getFuncPacketName(fn), block, func(ctx context.Context) error {
// var conf C
// if err := cmd.parseConf(&conf); err != nil {
// return err
// }
// return fn(ctx, 0, &conf)
// })
//}
//
//func putCmd3[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar, index int) error) {
// cmd.add(getFuncPacketName(fn), block, func(ctx context.Context) error {
// var conf C
// if err := cmd.parseConf(&conf); err != nil {
// return err
// }
// return fn(ctx, &conf, standalone.GetDiscoveryConn(), standalone.GetServiceRegistrar(), 0)
// })
//}

View File

@ -29,6 +29,7 @@ import (
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery"
disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
@ -45,21 +46,22 @@ type Config struct {
conf.AllConfig
ConfigPath conf.Path
Index conf.Index
}
func Start(ctx context.Context, index int, config *Config) error {
apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, index)
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, int(config.Index))
if err != nil {
return err
}
client, err := kdisc.NewDiscoveryRegister(&config.Discovery, []string{
config.Discovery.RpcService.MessageGateway,
})
if err != nil {
return errs.WrapMsg(err, "failed to register discovery service")
}
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
//client, err := kdisc.NewDiscoveryRegister(&config.Discovery, []string{
// config.Discovery.RpcService.MessageGateway,
//})
//if err != nil {
// return errs.WrapMsg(err, "failed to register discovery service")
//}
//client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
var (
netDone = make(chan struct{}, 1)

View File

@ -20,7 +20,7 @@ type PrometheusDiscoveryApi struct {
client *clientv3.Client
}
func NewPrometheusDiscoveryApi(config *Config, client discovery.SvcDiscoveryRegistry) *PrometheusDiscoveryApi {
func NewPrometheusDiscoveryApi(config *Config, client discovery.Conn) *PrometheusDiscoveryApi {
api := &PrometheusDiscoveryApi{
config: config,
}

View File

@ -52,7 +52,7 @@ func prommetricsGin() gin.HandlerFunc {
}
}
func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cfg *Config) (*gin.Engine, error) {
func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin.Engine, error) {
authConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Auth)
if err != nil {
return nil, err

View File

@ -29,11 +29,11 @@ import (
type UserApi struct {
Client user.UserClient
discov discovery.SvcDiscoveryRegistry
discov discovery.Conn
config config.RpcService
}
func NewUserApi(client user.UserClient, discov discovery.SvcDiscoveryRegistry, config config.RpcService) UserApi {
func NewUserApi(client user.UserClient, discov discovery.Conn, config config.RpcService) UserApi {
return UserApi{Client: client, discov: discov, config: config}
}

View File

@ -35,14 +35,15 @@ type Config struct {
RedisConfig config.Redis
WebhooksConfig config.Webhooks
Discovery config.Discovery
Index config.Index
}
// Start run ws server.
func Start(ctx context.Context, conf *Config, client discovery.Conn, server grpc.ServiceRegistrar, index int) error {
func Start(ctx context.Context, conf *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(),
"rpcPorts", conf.MsgGateway.RPC.Ports,
"wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports)
wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, index)
wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, int(conf.Index))
if err != nil {
return err
}
@ -71,11 +72,6 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, server grpc
go longServer.ChangeOnlineStatus(4)
//netDone := make(chan error)
//go func() {
// err = hubServer.Start(ctx, index, conf)
// netDone <- err
//}()
return hubServer.LongConnServer.Run(ctx)
}

View File

@ -42,14 +42,11 @@ import (
"github.com/openimsdk/tools/utils/runtimeenv"
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
discRegister "github.com/openimsdk/open-im-server/v3/pkg/common/discovery"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mw"
"github.com/openimsdk/tools/system/program"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
type MsgTransfer struct {
@ -73,14 +70,14 @@ type Config struct {
Share conf.Share
WebhooksConfig conf.Webhooks
Discovery conf.Discovery
Index conf.Index
}
func Start(ctx context.Context, index int, config *Config) error {
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
builder := mqbuild.NewBuilder(&config.Discovery, &config.KafkaConfig)
log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "prometheusPorts",
config.MsgTransfer.Prometheus.Ports, "index", index)
config.MsgTransfer.Prometheus.Ports, "index", config.Index)
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
@ -90,12 +87,12 @@ func Start(ctx context.Context, index int, config *Config) error {
if err != nil {
return err
}
client, err := discRegister.NewDiscoveryRegister(&config.Discovery, nil)
if err != nil {
return err
}
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
//client, err := discRegister.NewDiscoveryRegister(&config.Discovery, nil)
//if err != nil {
// return err
//}
//client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
// grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
if config.Discovery.Enable == conf.ETCD {
cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), []string{
@ -158,10 +155,10 @@ func Start(ctx context.Context, index int, config *Config) error {
historyMongoHandler: historyMongoHandler,
}
return msgTransfer.Start(index, config, client)
return msgTransfer.Start(int(config.Index), config, client)
}
func (m *MsgTransfer) Start(index int, config *Config, client discovery.SvcDiscoveryRegistry) error {
func (m *MsgTransfer) Start(index int, config *Config, client discovery.Conn) error {
m.ctx, m.cancel = context.WithCancel(context.Background())
var (
netDone = make(chan struct{}, 1)

View File

@ -78,7 +78,7 @@ type ConsumerMessage struct {
Value []byte
}
func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.SvcDiscoveryRegistry, config *Config, database controller.MsgTransferDatabase) (*OnlineHistoryRedisConsumerHandler, error) {
func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.Conn, config *Config, database controller.MsgTransferDatabase) (*OnlineHistoryRedisConsumerHandler, error) {
groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group)
if err != nil {
return nil, err

View File

@ -19,6 +19,7 @@ import (
"github.com/openimsdk/open-im-server/v3/internal/msggateway"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
@ -55,5 +56,22 @@ func (m *MsgGatewayCmd) Exec() error {
}
func (m *MsgGatewayCmd) runE() error {
return msggateway.Start(m.ctx, m.Index(), m.msgGatewayConfig)
m.msgGatewayConfig.Index = config.Index(m.Index())
//return msggateway.Start(m.ctx, m.msgGatewayConfig, "", "")
//return msggateway.Start(m.ctx, m.msgGatewayConfig, "", "")
var prometheus config.Prometheus
rpc := m.msgGatewayConfig.MsgGateway.RPC
return startrpc.Start(
m.ctx, &m.msgGatewayConfig.Discovery,
&prometheus,
rpc.ListenIP, rpc.RegisterIP,
rpc.AutoSetPorts,
rpc.Ports, int(m.msgGatewayConfig.Index),
m.msgGatewayConfig.Discovery.RpcService.MessageGateway,
"",
m.msgGatewayConfig.MsgGateway,
[]string{},
[]string{},
msggateway.Start,
)
}

View File

@ -56,5 +56,6 @@ func (m *MsgTransferCmd) Exec() error {
}
func (m *MsgTransferCmd) runE() error {
m.msgTransferConfig.Index = config.Index(m.Index())
return msgtransfer.Start(m.ctx, m.Index(), m.msgTransferConfig)
}

View File

@ -48,7 +48,7 @@ func NewPushRpcCmd() *PushRpcCmd {
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
ret.pushConfig.FcmConfigPath = ret.ConfigPath()
ret.pushConfig.FcmConfigPath = config.Path(ret.ConfigPath())
return ret.runE()
}
return ret

View File

@ -12,7 +12,6 @@ import (
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/runtimeenv"
"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
)
@ -86,14 +85,12 @@ func (r *RootCmd) initEtcd() error {
return err
}
disConfig := config.Discovery{}
env := runtimeenv.PrintRuntimeEnvironment()
err = config.Load(configDirectory, config.DiscoveryConfigFilename, config.EnvPrefixMap[config.DiscoveryConfigFilename],
env, &disConfig)
err = config.Load(configDirectory, config.DiscoveryConfigFilename, config.EnvPrefixMap[config.DiscoveryConfigFilename], &disConfig)
if err != nil {
return err
}
if disConfig.Enable == config.ETCD {
discov, _ := kdisc.NewDiscoveryRegister(&disConfig, env, nil)
discov, _ := kdisc.NewDiscoveryRegister(&disConfig, nil)
r.etcdClient = discov.(*etcd.SvcDiscoveryRegistryImpl).GetClient()
}
return nil
@ -125,18 +122,16 @@ func (r *RootCmd) initializeConfiguration(cmd *cobra.Command, opts *CmdOpts) err
return err
}
runtimeEnv := runtimeenv.PrintRuntimeEnvironment()
// Load common configuration file
//opts.configMap[ShareFileName] = StructEnvPrefix{EnvPrefix: shareEnvPrefix, ConfigStruct: &r.share}
for configFileName, configStruct := range opts.configMap {
err := config.Load(configDirectory, configFileName, config.EnvPrefixMap[configFileName], runtimeEnv, configStruct)
err := config.Load(configDirectory, configFileName, config.EnvPrefixMap[configFileName], configStruct)
if err != nil {
return err
}
}
// Load common log configuration file
return config.Load(configDirectory, config.LogConfigFileName, config.EnvPrefixMap[config.LogConfigFileName], runtimeEnv, &r.log)
return config.Load(configDirectory, config.LogConfigFileName, config.EnvPrefixMap[config.LogConfigFileName], &r.log)
}
func (r *RootCmd) updateConfigFromEtcd(opts *CmdOpts) error {

40
pkg/common/cmd/start.go Normal file
View File

@ -0,0 +1,40 @@
package cmd
//
//type StartFunc[C any] func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error
//
//func Start[C any](fn StartFunc[C]) {
// var _ RootCmd
// cmd := cobra.Command{
// Use: "Start openIM application",
// Long: fmt.Sprintf(`Start %s `, program.GetProcessName()),
// PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
// return rootCmd.persistentPreRun(cmd, opts...)
// },
// SilenceUsage: true,
// SilenceErrors: false,
// }
// cmd.Flags().StringP(config.FlagConf, "c", "", "path of config directory")
// cmd.Flags().IntP(config.FlagTransferIndex, "i", 0, "process startup sequence number")
//
//
//
//}
//
//func start[C any](fn StartFunc[C]) error {
//
//
// v := viper.New()
// v.SetConfigType("yaml")
// if err := v.ReadConfig(bytes.NewReader(confData)); err != nil {
// return err
// }
// fn := func(conf *mapstructure.DecoderConfig) {
// conf.TagName = config.StructTagName
// }
// if err := v.Unmarshal(val, fn); err != nil {
// return err
// }
//
// return nil
//}

View File

@ -22,6 +22,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/openimsdk/tools/utils/network"
"github.com/spf13/cobra"
)

View File

@ -32,6 +32,8 @@ const StructTagName = "yaml"
type Path string
type Index int
type CacheConfig struct {
Topic string `yaml:"topic"`
SlotNum int `yaml:"slotNum"`
@ -181,11 +183,7 @@ type Prometheus struct {
}
type MsgGateway struct {
RPC struct {
RegisterIP string `yaml:"registerIP"`
AutoSetPorts bool `yaml:"autoSetPorts"`
Ports []int `yaml:"ports"`
} `yaml:"rpc"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
ListenIP string `yaml:"listenIP"`
LongConnSvr struct {
@ -205,12 +203,7 @@ type MsgTransfer struct {
}
type Push struct {
RPC struct {
RegisterIP string `yaml:"registerIP"`
ListenIP string `yaml:"listenIP"`
AutoSetPorts bool `yaml:"autoSetPorts"`
Ports []int `yaml:"ports"`
} `yaml:"rpc"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
MaxConcurrentWorkers int `yaml:"maxConcurrentWorkers"`
Enable string `yaml:"enable"`
@ -241,12 +234,7 @@ type Push struct {
}
type Auth struct {
RPC struct {
RegisterIP string `yaml:"registerIP"`
ListenIP string `yaml:"listenIP"`
AutoSetPorts bool `yaml:"autoSetPorts"`
Ports []int `yaml:"ports"`
} `yaml:"rpc"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
TokenPolicy struct {
Expire int64 `yaml:"expire"`
@ -254,54 +242,29 @@ type Auth struct {
}
type Conversation struct {
RPC struct {
RegisterIP string `yaml:"registerIP"`
ListenIP string `yaml:"listenIP"`
AutoSetPorts bool `yaml:"autoSetPorts"`
Ports []int `yaml:"ports"`
} `yaml:"rpc"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
}
type Friend struct {
RPC struct {
RegisterIP string `yaml:"registerIP"`
ListenIP string `yaml:"listenIP"`
AutoSetPorts bool `yaml:"autoSetPorts"`
Ports []int `yaml:"ports"`
} `yaml:"rpc"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
}
type Group struct {
RPC struct {
RegisterIP string `yaml:"registerIP"`
ListenIP string `yaml:"listenIP"`
AutoSetPorts bool `yaml:"autoSetPorts"`
Ports []int `yaml:"ports"`
} `yaml:"rpc"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
EnableHistoryForNewMembers bool `yaml:"enableHistoryForNewMembers"`
}
type Msg struct {
RPC struct {
RegisterIP string `yaml:"registerIP"`
ListenIP string `yaml:"listenIP"`
AutoSetPorts bool `yaml:"autoSetPorts"`
Ports []int `yaml:"ports"`
} `yaml:"rpc"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
FriendVerify bool `yaml:"friendVerify"`
}
type Third struct {
RPC struct {
RegisterIP string `yaml:"registerIP"`
ListenIP string `yaml:"listenIP"`
AutoSetPorts bool `yaml:"autoSetPorts"`
Ports []int `yaml:"ports"`
} `yaml:"rpc"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
Object struct {
Enable string `yaml:"enable"`
@ -348,15 +311,17 @@ type Aws struct {
}
type User struct {
RPC struct {
RegisterIP string `yaml:"registerIP"`
ListenIP string `yaml:"listenIP"`
AutoSetPorts bool `yaml:"autoSetPorts"`
Ports []int `yaml:"ports"`
} `yaml:"rpc"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
}
type RPC struct {
RegisterIP string `yaml:"registerIP"`
ListenIP string `yaml:"listenIP"`
AutoSetPorts bool `yaml:"autoSetPorts"`
Ports []int `yaml:"ports"`
}
type Redis struct {
Address []string `yaml:"address"`
Username string `yaml:"username"`

View File

@ -1,15 +0,0 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package startrpc // import "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"

View File

@ -16,10 +16,8 @@ package startrpc
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"strconv"
@ -27,10 +25,8 @@ import (
"time"
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/jsonutil"
"github.com/openimsdk/tools/utils/network"
"google.golang.org/grpc/status"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery"
@ -39,12 +35,10 @@ import (
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mw"
"github.com/openimsdk/tools/utils/network"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// Start rpc server.
func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP,
registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T,
watchConfigNames []string, watchServiceNames []string,
@ -59,182 +53,142 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
return nil
}
watchConfigNames = append(watchConfigNames, conf.LogConfigFileName)
var (
rpcTcpAddr string
netDone = make(chan struct{}, 2)
netErr error
prometheusPort int
)
options = append(options, mw.GrpcServer())
registerIP, err := network.GetRpcRegisterIP(registerIP)
if err != nil {
return err
}
if !autoSetPorts {
var (
rpcListenAddr string
prometheusListenAddr string
)
if autoSetPorts {
rpcListenAddr = net.JoinHostPort(listenIP, "0")
prometheusListenAddr = net.JoinHostPort("", "0")
} else {
rpcPort, err := datautil.GetElemByIndex(rpcPorts, index)
if err != nil {
return err
}
rpcTcpAddr = net.JoinHostPort(network.GetListenIP(listenIP), strconv.Itoa(rpcPort))
} else {
rpcTcpAddr = net.JoinHostPort(network.GetListenIP(listenIP), "0")
}
getAutoPort := func() (net.Listener, int, error) {
listener, err := net.Listen("tcp", rpcTcpAddr)
prometheusPort, err := datautil.GetElemByIndex(prometheusConfig.Ports, index)
if err != nil {
return nil, 0, errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr)
return err
}
_, portStr, _ := net.SplitHostPort(listener.Addr().String())
port, _ := strconv.Atoi(portStr)
return listener, port, nil
rpcListenAddr = net.JoinHostPort(listenIP, strconv.Itoa(rpcPort))
prometheusListenAddr = net.JoinHostPort("", strconv.Itoa(prometheusPort))
}
if autoSetPorts && discovery.Enable != conf.ETCD {
return errs.New("only etcd support autoSetPorts", "rpcRegisterName", rpcRegisterName).Wrap()
}
log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcAddr", rpcListenAddr, "prometheusAddr", prometheusListenAddr)
watchConfigNames = append(watchConfigNames, conf.LogConfigFileName)
client, err := kdisc.NewDiscoveryRegister(discovery, watchServiceNames)
if err != nil {
return err
}
defer client.Close()
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
client.AddOption(
mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")),
)
// var reg *prometheus.Registry
// var metric *grpcprometheus.ServerMetrics
if prometheusConfig.Enable {
// cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share)
// reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics)
// options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()),
// grpc.UnaryInterceptor(metric.UnaryServerInterceptor()))
var gsrv grpcServiceRegistrar
err = rpcFn(ctx, config, client, &gsrv)
if err != nil {
return err
}
ctx, cancel := context.WithCancelCause(ctx)
if prometheusListenAddr != "" {
options = append(
options, mw.GrpcServer(),
options,
prommetricsUnaryInterceptor(rpcRegisterName),
prommetricsStreamInterceptor(rpcRegisterName),
)
var (
listener net.Listener
)
if autoSetPorts {
listener, prometheusPort, err = getAutoPort()
if err != nil {
return err
}
etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient()
_, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort)))
if err != nil {
return errs.WrapMsg(err, "etcd put err")
}
} else {
prometheusPort, err = datautil.GetElemByIndex(prometheusConfig.Ports, index)
if err != nil {
return err
}
listener, err = net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort))
if err != nil {
return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr)
}
prometheusListener, prometheusPort, err := listenTCP(prometheusListenAddr)
if err != nil {
return err
}
if err := client.Register(ctx, "prometheus_"+rpcRegisterName, registerIP, prometheusPort); err != nil {
return err
}
cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, discovery)
go func() {
if err := prommetrics.RpcInit(cs, listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort))
netDone <- struct{}{}
err := prommetrics.RpcInit(cs, prometheusListener)
if err == nil {
err = fmt.Errorf("serve end")
}
//metric.InitializeMetrics(srv)
// Create a HTTP server for prometheus.
// httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)}
// if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
// netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr)
// netDone <- struct{}{}
// }
cancel(fmt.Errorf("prommetrics %s %w", rpcRegisterName, err))
}()
} else {
options = append(options, mw.GrpcServer())
}
listener, port, err := getAutoPort()
if err != nil {
return err
}
var rpcGracefulStop chan struct{}
log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", port,
"prometheusPort", prometheusPort)
defer listener.Close()
srv := grpc.NewServer(options...)
err = rpcFn(ctx, config, client, srv)
if err != nil {
return err
}
// todo
//err = client.Register(
// ctx,
// "prometheus"+rpcRegisterName,
// registerIP,
// port,
// grpc.WithTransportCredentials(insecure.NewCredentials()),
//)
err = client.Register(
ctx,
rpcRegisterName,
registerIP,
port,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return err
}
go func() {
err := srv.Serve(listener)
if err != nil && !errors.Is(err, http.ErrServerClosed) {
netErr = errs.WrapMsg(err, "rpc start err: ", rpcTcpAddr)
netDone <- struct{}{}
if len(gsrv.services) > 0 {
rpcListener, rpcPort, err := listenTCP(rpcListenAddr)
if err != nil {
return err
}
}()
srv := grpc.NewServer(options...)
if discovery.Enable == conf.ETCD {
cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), watchConfigNames)
cm.Watch(ctx)
for _, service := range gsrv.services {
srv.RegisterService(service.desc, service.impl)
}
grpcOpt := grpc.WithTransportCredentials(insecure.NewCredentials())
if err := client.Register(ctx, rpcRegisterName, registerIP, rpcPort, grpcOpt); err != nil {
return err
}
rpcGracefulStop = make(chan struct{})
go func() {
err := srv.Serve(rpcListener)
if err == nil {
err = fmt.Errorf("serve end")
}
cancel(fmt.Errorf("rpc %s %w", rpcRegisterName, err))
}()
go func() {
<-ctx.Done()
srv.GracefulStop()
close(rpcGracefulStop)
}()
}
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)
select {
case <-sigs:
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := gracefulStopWithCtx(ctx, srv.GracefulStop); err != nil {
return err
}
return nil
case <-netDone:
return netErr
case val := <-sigs:
log.ZDebug(ctx, "recv exit", "signal", val.String())
cancel(fmt.Errorf("signal %s", val.String()))
case <-ctx.Done():
}
if rpcGracefulStop != nil {
timeout := time.NewTimer(time.Second * 15)
defer timeout.Stop()
select {
case <-timeout.C:
log.ZWarn(ctx, "rcp graceful stop timeout", nil)
case <-rpcGracefulStop:
log.ZDebug(ctx, "rcp graceful stop done")
}
}
return context.Cause(ctx)
}
func gracefulStopWithCtx(ctx context.Context, f func()) error {
done := make(chan struct{}, 1)
go func() {
f()
close(done)
}()
select {
case <-ctx.Done():
return errs.New("timeout, ctx graceful stop")
case <-done:
return nil
func listenTCP(addr string) (net.Listener, int, error) {
listener, err := net.Listen("tcp", addr)
if err != nil {
return nil, 0, errs.WrapMsg(err, "listen err", "addr", addr)
}
return listener, listener.Addr().(*net.TCPAddr).Port, nil
}
func prommetricsUnaryInterceptor(rpcRegisterName string) grpc.ServerOption {
@ -258,3 +212,19 @@ func prommetricsUnaryInterceptor(rpcRegisterName string) grpc.ServerOption {
func prommetricsStreamInterceptor(rpcRegisterName string) grpc.ServerOption {
return grpc.ChainStreamInterceptor()
}
type grpcService struct {
desc *grpc.ServiceDesc
impl any
}
type grpcServiceRegistrar struct {
services []*grpcService
}
func (x *grpcServiceRegistrar) RegisterService(desc *grpc.ServiceDesc, impl any) {
x.services = append(x.services, &grpcService{
desc: desc,
impl: impl,
})
}