Add etcd as a service discovery mechanism

This commit is contained in:
skiffer-git 2024-05-11 15:40:17 +08:00
parent add7ea8ef0
commit e94598f5c1
32 changed files with 161 additions and 108 deletions

View File

@ -1,16 +1,13 @@
enable: "etcd"
etcd:
rootDirectory: openim
address: [ localhost:12379 ]
username: ''
password: ''
zookeeper:
schema: openim
address: [ localhost:12181 ]
username: ''
password: ''

View File

@ -1,5 +1,4 @@
secret: openIM123
env: zookeeper
rpcRegisterName:
user: user
friend: friend

View File

@ -38,20 +38,17 @@ import (
)
type Config struct {
RpcConfig config.API
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
MinioConfig config.Minio
API config.API
Share config.Share
Discovery config.Discovery
}
func Start(ctx context.Context, index int, config *Config) error {
apiPort, err := datautil.GetElemByIndex(config.RpcConfig.Api.Ports, index)
apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, index)
if err != nil {
return err
}
prometheusPort, err := datautil.GetElemByIndex(config.RpcConfig.Prometheus.Ports, index)
prometheusPort, err := datautil.GetElemByIndex(config.API.Prometheus.Ports, index)
if err != nil {
return err
}
@ -59,7 +56,7 @@ func Start(ctx context.Context, index int, config *Config) error {
var client discovery.SvcDiscoveryRegistry
// Determine whether zk is passed according to whether it is a clustered deployment
client, err = kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share)
client, err = kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share)
if err != nil {
return errs.WrapMsg(err, "failed to register discovery service")
}
@ -70,7 +67,7 @@ func Start(ctx context.Context, index int, config *Config) error {
)
router := newGinRouter(client, config)
if config.RpcConfig.Prometheus.Enable {
if config.API.Prometheus.Enable {
go func() {
p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api"))
p.SetListenAddress(fmt.Sprintf(":%d", prometheusPort))
@ -81,7 +78,7 @@ func Start(ctx context.Context, index int, config *Config) error {
}()
}
address := net.JoinHostPort(network.GetListenIP(config.RpcConfig.Api.ListenIP), strconv.Itoa(apiPort))
address := net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort))
server := http.Server{Addr: address, Handler: router}
log.CInfo(ctx, "API server is initializing", "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort)

View File

@ -35,7 +35,7 @@ func (s *Server) InitServer(ctx context.Context, config *Config, disCov discover
}
func (s *Server) Start(ctx context.Context, index int, conf *Config) error {
return startrpc.Start(ctx, &conf.ZookeeperConfig, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP,
return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP,
conf.MsgGateway.RPC.RegisterIP,
conf.MsgGateway.RPC.Ports, index,
conf.Share.RpcRegisterName.MessageGateway,

View File

@ -24,10 +24,10 @@ import (
)
type Config struct {
MsgGateway config.MsgGateway
ZookeeperConfig config.ZooKeeper
Share config.Share
WebhooksConfig config.Webhooks
MsgGateway config.MsgGateway
Share config.Share
WebhooksConfig config.Webhooks
Discovery config.Discovery
}
// Start run ws server.

View File

@ -63,6 +63,7 @@ type Config struct {
ZookeeperConfig config.ZooKeeper
Share config.Share
WebhooksConfig config.Webhooks
Discovery config.Discovery
}
func Start(ctx context.Context, index int, config *Config) error {
@ -76,7 +77,7 @@ func Start(ctx context.Context, index int, config *Config) error {
if err != nil {
return err
}
client, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share)
client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share)
if err != nil {
return err
}

View File

@ -24,11 +24,11 @@ type Config struct {
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
LocalCacheConfig config.LocalCache
Discovery config.Discovery
}
func (p pushServer) PushMsg(ctx context.Context, req *pbpush.PushMsgReq) (*pbpush.PushMsgResp, error) {

View File

@ -45,10 +45,10 @@ type authServer struct {
}
type Config struct {
RpcConfig config.Auth
RedisConfig config.Redis
ZookeeperConfig config.ZooKeeper
Share config.Share
RpcConfig config.Auth
RedisConfig config.Redis
Share config.Share
Discovery config.Discovery
}
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {

View File

@ -48,13 +48,14 @@ type conversationServer struct {
}
type Config struct {
RpcConfig config.Conversation
RedisConfig config.Redis
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
RpcConfig config.Conversation
RedisConfig config.Redis
MongodbConfig config.Mongo
// ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
LocalCacheConfig config.LocalCache
Discovery config.Discovery
}
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {

View File

@ -50,14 +50,15 @@ type friendServer struct {
}
type Config struct {
RpcConfig config.Friend
RedisConfig config.Redis
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
RpcConfig config.Friend
RedisConfig config.Redis
MongodbConfig config.Mongo
//ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
LocalCacheConfig config.LocalCache
Discovery config.Discovery
}
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {

View File

@ -68,11 +68,11 @@ type Config struct {
RpcConfig config.Group
RedisConfig config.Redis
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
LocalCacheConfig config.LocalCache
Discovery config.Discovery
}
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {

View File

@ -59,11 +59,11 @@ type (
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
LocalCacheConfig config.LocalCache
Discovery config.Discovery
}
)

View File

@ -46,11 +46,11 @@ type Config struct {
RpcConfig config.Third
RedisConfig config.Redis
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
MinioConfig config.Minio
LocalCacheConfig config.LocalCache
Discovery config.Discovery
}
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {

View File

@ -61,11 +61,11 @@ type Config struct {
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
LocalCacheConfig config.LocalCache
Discovery config.Discovery
}
func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegistry, server *grpc.Server) error {

View File

@ -33,9 +33,9 @@ import (
)
type CronTaskConfig struct {
CronTask config.CronTask
ZookeeperConfig config.ZooKeeper
Share config.Share
CronTask config.CronTask
Share config.Share
Discovery config.Discovery
}
func Start(ctx context.Context, config *CronTaskConfig) error {
@ -43,7 +43,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
if config.CronTask.RetainChatRecords < 1 {
return errs.New("msg destruct time must be greater than 1").Wrap()
}
client, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share)
client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share)
if err != nil {
return errs.WrapMsg(err, "failed to register discovery service")
}

View File

@ -33,9 +33,9 @@ func NewApiCmd() *ApiCmd {
var apiConfig api.Config
ret := &ApiCmd{apiConfig: &apiConfig}
ret.configMap = map[string]any{
OpenIMAPICfgFileName: &apiConfig.RpcConfig,
ZookeeperConfigFileName: &apiConfig.ZookeeperConfig,
OpenIMAPICfgFileName: &apiConfig.API,
ShareFileName: &apiConfig.Share,
DiscoveryConfigFilename: &apiConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)

View File

@ -36,8 +36,8 @@ func NewAuthRpcCmd() *AuthRpcCmd {
ret.configMap = map[string]any{
OpenIMRPCAuthCfgFileName: &authConfig.RpcConfig,
RedisConfigFileName: &authConfig.RedisConfig,
ZookeeperConfigFileName: &authConfig.ZookeeperConfig,
ShareFileName: &authConfig.Share,
DiscoveryConfigFilename: &authConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
@ -53,7 +53,7 @@ func (a *AuthRpcCmd) Exec() error {
}
func (a *AuthRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.authConfig.ZookeeperConfig, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP,
a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.Ports,
a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig.Share, a.authConfig, auth.Start)
}

View File

@ -42,6 +42,7 @@ var (
OpenIMRPCMsgCfgFileName string
OpenIMRPCThirdCfgFileName string
OpenIMRPCUserCfgFileName string
DiscoveryConfigFilename string
)
var ConfigEnvPrefixMap map[string]string
@ -70,6 +71,7 @@ func init() {
OpenIMRPCMsgCfgFileName = "openim-rpc-msg.yml"
OpenIMRPCThirdCfgFileName = "openim-rpc-third.yml"
OpenIMRPCUserCfgFileName = "openim-rpc-user.yml"
DiscoveryConfigFilename = "discovery.yml"
ConfigEnvPrefixMap = make(map[string]string)
fileNames := []string{
@ -79,7 +81,7 @@ func init() {
OpenIMAPICfgFileName, OpenIMCronTaskCfgFileName, OpenIMMsgGatewayCfgFileName,
OpenIMMsgTransferCfgFileName, OpenIMPushCfgFileName, OpenIMRPCAuthCfgFileName,
OpenIMRPCConversationCfgFileName, OpenIMRPCFriendCfgFileName, OpenIMRPCGroupCfgFileName,
OpenIMRPCMsgCfgFileName, OpenIMRPCThirdCfgFileName, OpenIMRPCUserCfgFileName,
OpenIMRPCMsgCfgFileName, OpenIMRPCThirdCfgFileName, OpenIMRPCUserCfgFileName, DiscoveryConfigFilename,
}
for _, fileName := range fileNames {

View File

@ -36,11 +36,11 @@ func NewConversationRpcCmd() *ConversationRpcCmd {
ret.configMap = map[string]any{
OpenIMRPCConversationCfgFileName: &conversationConfig.RpcConfig,
RedisConfigFileName: &conversationConfig.RedisConfig,
ZookeeperConfigFileName: &conversationConfig.ZookeeperConfig,
MongodbConfigFileName: &conversationConfig.MongodbConfig,
ShareFileName: &conversationConfig.Share,
NotificationFileName: &conversationConfig.NotificationConfig,
LocalCacheConfigFileName: &conversationConfig.LocalCacheConfig,
DiscoveryConfigFilename: &conversationConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
@ -55,7 +55,7 @@ func (a *ConversationRpcCmd) Exec() error {
}
func (a *ConversationRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.conversationConfig.ZookeeperConfig, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP,
a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.Ports,
a.Index(), a.conversationConfig.Share.RpcRegisterName.Conversation, &a.conversationConfig.Share, a.conversationConfig, conversation.Start)
}

View File

@ -34,8 +34,8 @@ func NewCronTaskCmd() *CronTaskCmd {
ret := &CronTaskCmd{cronTaskConfig: &cronTaskConfig}
ret.configMap = map[string]any{
OpenIMCronTaskCfgFileName: &cronTaskConfig.CronTask,
ZookeeperConfigFileName: &cronTaskConfig.ZookeeperConfig,
ShareFileName: &cronTaskConfig.Share,
DiscoveryConfigFilename: &cronTaskConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)

View File

@ -36,12 +36,12 @@ func NewFriendRpcCmd() *FriendRpcCmd {
ret.configMap = map[string]any{
OpenIMRPCFriendCfgFileName: &friendConfig.RpcConfig,
RedisConfigFileName: &friendConfig.RedisConfig,
ZookeeperConfigFileName: &friendConfig.ZookeeperConfig,
MongodbConfigFileName: &friendConfig.MongodbConfig,
ShareFileName: &friendConfig.Share,
NotificationFileName: &friendConfig.NotificationConfig,
WebhooksConfigFileName: &friendConfig.WebhooksConfig,
LocalCacheConfigFileName: &friendConfig.LocalCacheConfig,
DiscoveryConfigFilename: &friendConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
@ -56,7 +56,7 @@ func (a *FriendRpcCmd) Exec() error {
}
func (a *FriendRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.friendConfig.ZookeeperConfig, &a.friendConfig.RpcConfig.Prometheus, a.friendConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.friendConfig.Discovery, &a.friendConfig.RpcConfig.Prometheus, a.friendConfig.RpcConfig.RPC.ListenIP,
a.friendConfig.RpcConfig.RPC.RegisterIP, a.friendConfig.RpcConfig.RPC.Ports,
a.Index(), a.friendConfig.Share.RpcRegisterName.Friend, &a.friendConfig.Share, a.friendConfig, friend.Start)
}

View File

@ -36,12 +36,12 @@ func NewGroupRpcCmd() *GroupRpcCmd {
ret.configMap = map[string]any{
OpenIMRPCGroupCfgFileName: &groupConfig.RpcConfig,
RedisConfigFileName: &groupConfig.RedisConfig,
ZookeeperConfigFileName: &groupConfig.ZookeeperConfig,
MongodbConfigFileName: &groupConfig.MongodbConfig,
ShareFileName: &groupConfig.Share,
NotificationFileName: &groupConfig.NotificationConfig,
WebhooksConfigFileName: &groupConfig.WebhooksConfig,
LocalCacheConfigFileName: &groupConfig.LocalCacheConfig,
DiscoveryConfigFilename: &groupConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
@ -56,7 +56,7 @@ func (a *GroupRpcCmd) Exec() error {
}
func (a *GroupRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.groupConfig.ZookeeperConfig, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP,
a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.Ports,
a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, group.Start)
}

View File

@ -36,13 +36,13 @@ func NewMsgRpcCmd() *MsgRpcCmd {
ret.configMap = map[string]any{
OpenIMRPCMsgCfgFileName: &msgConfig.RpcConfig,
RedisConfigFileName: &msgConfig.RedisConfig,
ZookeeperConfigFileName: &msgConfig.ZookeeperConfig,
MongodbConfigFileName: &msgConfig.MongodbConfig,
KafkaConfigFileName: &msgConfig.KafkaConfig,
ShareFileName: &msgConfig.Share,
NotificationFileName: &msgConfig.NotificationConfig,
WebhooksConfigFileName: &msgConfig.WebhooksConfig,
LocalCacheConfigFileName: &msgConfig.LocalCacheConfig,
DiscoveryConfigFilename: &msgConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
@ -57,7 +57,7 @@ func (a *MsgRpcCmd) Exec() error {
}
func (a *MsgRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.msgConfig.ZookeeperConfig, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP,
a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.Ports,
a.Index(), a.msgConfig.Share.RpcRegisterName.Msg, &a.msgConfig.Share, a.msgConfig, msg.Start)
}

View File

@ -36,9 +36,9 @@ func NewMsgGatewayCmd() *MsgGatewayCmd {
ret := &MsgGatewayCmd{msgGatewayConfig: &msgGatewayConfig}
ret.configMap = map[string]any{
OpenIMMsgGatewayCfgFileName: &msgGatewayConfig.MsgGateway,
ZookeeperConfigFileName: &msgGatewayConfig.ZookeeperConfig,
ShareFileName: &msgGatewayConfig.Share,
WebhooksConfigFileName: &msgGatewayConfig.WebhooksConfig,
DiscoveryConfigFilename: &msgGatewayConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)

View File

@ -40,6 +40,7 @@ func NewMsgTransferCmd() *MsgTransferCmd {
ZookeeperConfigFileName: &msgTransferConfig.ZookeeperConfig,
ShareFileName: &msgTransferConfig.Share,
WebhooksConfigFileName: &msgTransferConfig.WebhooksConfig,
DiscoveryConfigFilename: &msgTransferConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)

View File

@ -36,13 +36,13 @@ func NewPushRpcCmd() *PushRpcCmd {
ret.configMap = map[string]any{
OpenIMPushCfgFileName: &pushConfig.RpcConfig,
RedisConfigFileName: &pushConfig.RedisConfig,
ZookeeperConfigFileName: &pushConfig.ZookeeperConfig,
MongodbConfigFileName: &pushConfig.MongodbConfig,
KafkaConfigFileName: &pushConfig.KafkaConfig,
ShareFileName: &pushConfig.Share,
NotificationFileName: &pushConfig.NotificationConfig,
WebhooksConfigFileName: &pushConfig.WebhooksConfig,
LocalCacheConfigFileName: &pushConfig.LocalCacheConfig,
DiscoveryConfigFilename: &pushConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
@ -57,7 +57,7 @@ func (a *PushRpcCmd) Exec() error {
}
func (a *PushRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.pushConfig.ZookeeperConfig, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP,
a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.Ports,
a.Index(), a.pushConfig.Share.RpcRegisterName.Push, &a.pushConfig.Share, a.pushConfig, push.Start)
}

View File

@ -36,12 +36,12 @@ func NewThirdRpcCmd() *ThirdRpcCmd {
ret.configMap = map[string]any{
OpenIMRPCThirdCfgFileName: &thirdConfig.RpcConfig,
RedisConfigFileName: &thirdConfig.RedisConfig,
ZookeeperConfigFileName: &thirdConfig.ZookeeperConfig,
MongodbConfigFileName: &thirdConfig.MongodbConfig,
ShareFileName: &thirdConfig.Share,
NotificationFileName: &thirdConfig.NotificationConfig,
MinioConfigFileName: &thirdConfig.MinioConfig,
LocalCacheConfigFileName: &thirdConfig.LocalCacheConfig,
DiscoveryConfigFilename: &thirdConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
@ -56,7 +56,7 @@ func (a *ThirdRpcCmd) Exec() error {
}
func (a *ThirdRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.thirdConfig.ZookeeperConfig, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP,
a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.Ports,
a.Index(), a.thirdConfig.Share.RpcRegisterName.Third, &a.thirdConfig.Share, a.thirdConfig, third.Start)
}

View File

@ -36,13 +36,13 @@ func NewUserRpcCmd() *UserRpcCmd {
ret.configMap = map[string]any{
OpenIMRPCUserCfgFileName: &userConfig.RpcConfig,
RedisConfigFileName: &userConfig.RedisConfig,
ZookeeperConfigFileName: &userConfig.ZookeeperConfig,
MongodbConfigFileName: &userConfig.MongodbConfig,
KafkaConfigFileName: &userConfig.KafkaConfig,
ShareFileName: &userConfig.Share,
NotificationFileName: &userConfig.NotificationConfig,
WebhooksConfigFileName: &userConfig.WebhooksConfig,
LocalCacheConfigFileName: &userConfig.LocalCacheConfig,
DiscoveryConfigFilename: &userConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
@ -57,7 +57,7 @@ func (a *UserRpcCmd) Exec() error {
}
func (a *UserRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.userConfig.ZookeeperConfig, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP,
a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.Ports,
a.Index(), a.userConfig.Share.RpcRegisterName.User, &a.userConfig.Share, a.userConfig, user.Start)
}

View File

@ -345,7 +345,6 @@ type AfterConfig struct {
type Share struct {
Secret string `mapstructure:"secret"`
Env string `mapstructure:"env"`
RpcRegisterName RpcRegisterName `mapstructure:"rpcRegisterName"`
IMAdminUserID []string `mapstructure:"imAdminUserID"`
}
@ -432,6 +431,19 @@ type ZooKeeper struct {
Password string `mapstructure:"password"`
}
type Discovery struct {
Enable string `mapstructure:"enable"`
Etcd Etcd `mapstructure:"etcd"`
ZooKeeper ZooKeeper `mapstructure:"zooKeeper"`
}
type Etcd struct {
RootDirectory string `mapstructure:"rootDirectory"`
Address []string `mapstructure:"address"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
}
func (m *Mongo) Build() *mongoutil.Config {
return &mongoutil.Config{
Uri: m.URI,

View File

@ -24,33 +24,31 @@ import (
"time"
)
const (
zookeeperConst = "zookeeper"
kubenetesConst = "k8s"
directConst = "direct"
etcdConst = "etcd"
)
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper, share *config.Share) (discovery.SvcDiscoveryRegistry, error) {
switch share.Env {
case zookeeperConst:
func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share) (discovery.SvcDiscoveryRegistry, error) {
switch discovery.Enable {
case "zookeeper":
return zookeeper.NewZkClient(
zookeeperConfig.Address,
zookeeperConfig.Schema,
discovery.ZooKeeper.Address,
discovery.ZooKeeper.Schema,
zookeeper.WithFreq(time.Hour),
zookeeper.WithUserNameAndPassword(zookeeperConfig.Username, zookeeperConfig.Password),
zookeeper.WithUserNameAndPassword(discovery.ZooKeeper.Username, discovery.ZooKeeper.Password),
zookeeper.WithRoundRobin(),
zookeeper.WithTimeout(10),
)
case kubenetesConst:
case "k8s":
return kubernetes.NewK8sDiscoveryRegister(share.RpcRegisterName.MessageGateway)
case etcdConst:
return getcd.NewSvcDiscoveryRegistry("etcd", []string{"localhost:2379"})
case directConst:
case "etcd":
return getcd.NewSvcDiscoveryRegistry(
discovery.Etcd.RootDirectory,
discovery.Etcd.Address,
getcd.WithDialTimeout(10*time.Second),
getcd.WithMaxCallSendMsgSize(20*1024*1024),
getcd.WithUsernameAndPassword(discovery.Etcd.Username, discovery.Etcd.Password))
case "direct":
//return direct.NewConnDirect(config)
default:
return nil, errs.New("unsupported discovery type", "type", share.Env).Wrap()
return nil, errs.New("unsupported discovery type", "type", discovery.Enable).Wrap()
}
return nil, nil
}

View File

@ -8,27 +8,38 @@ import (
"go.etcd.io/etcd/client/v3/naming/resolver"
"google.golang.org/grpc"
gresolver "google.golang.org/grpc/resolver"
"log"
"time"
)
// ZkOption defines a function type for modifying clientv3.Config
type ZkOption func(*clientv3.Config)
// SvcDiscoveryRegistryImpl implementation
type SvcDiscoveryRegistryImpl struct {
client *clientv3.Client
resolver gresolver.Builder
dialOptions []grpc.DialOption
serviceKey string
endpointMgr endpoints.Manager
leaseID clientv3.LeaseID
schema string
client *clientv3.Client
resolver gresolver.Builder
dialOptions []grpc.DialOption
serviceKey string
endpointMgr endpoints.Manager
leaseID clientv3.LeaseID
rootDirectory string
}
func NewSvcDiscoveryRegistry(schema string, endpoints []string) (*SvcDiscoveryRegistryImpl, error) {
// NewSvcDiscoveryRegistry creates a new service discovery registry implementation
func NewSvcDiscoveryRegistry(rootDirectory string, endpoints []string, options ...ZkOption) (*SvcDiscoveryRegistryImpl, error) {
cfg := clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
// Increase keep-alive queue capacity and message size
PermitWithoutStream: true,
MaxCallSendMsgSize: 10 * 1024 * 1024, // 10 MB
}
// Apply provided options to the config
for _, opt := range options {
opt(&cfg)
}
client, err := clientv3.New(cfg)
if err != nil {
return nil, err
@ -38,17 +49,42 @@ func NewSvcDiscoveryRegistry(schema string, endpoints []string) (*SvcDiscoveryRe
return nil, err
}
return &SvcDiscoveryRegistryImpl{
client: client,
resolver: r,
schema: schema,
client: client,
resolver: r,
rootDirectory: rootDirectory,
}, nil
}
// WithDialTimeout sets a custom dial timeout for the etcd client
func WithDialTimeout(timeout time.Duration) ZkOption {
return func(cfg *clientv3.Config) {
cfg.DialTimeout = timeout
}
}
// WithMaxCallSendMsgSize sets a custom max call send message size for the etcd client
func WithMaxCallSendMsgSize(size int) ZkOption {
return func(cfg *clientv3.Config) {
cfg.MaxCallSendMsgSize = size
}
}
// WithUsernameAndPassword sets a username and password for the etcd client
func WithUsernameAndPassword(username, password string) ZkOption {
return func(cfg *clientv3.Config) {
cfg.Username = username
cfg.Password = password
}
}
// GetUserIdHashGatewayHost returns the gateway host for a given user ID hash
func (r *SvcDiscoveryRegistryImpl) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) {
return "", nil
}
// GetConns returns gRPC client connections for a given service name
func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
target := fmt.Sprintf("%s:///%s", r.schema, serviceName)
target := fmt.Sprintf("etcd:///%s", serviceName)
conn, err := grpc.DialContext(ctx, target, append(append(r.dialOptions, opts...), grpc.WithResolvers(r.resolver))...)
if err != nil {
return nil, err
@ -56,34 +92,39 @@ func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName str
return []*grpc.ClientConn{conn}, nil
}
// GetConn returns a single gRPC client connection for a given service name
func (r *SvcDiscoveryRegistryImpl) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
target := fmt.Sprintf("%s:///%s", r.schema, serviceName)
target := fmt.Sprintf("etcd:///%s", serviceName)
return grpc.DialContext(ctx, target, append(append(r.dialOptions, opts...), grpc.WithResolvers(r.resolver))...)
}
// GetSelfConnTarget returns the connection target for the current service
func (r *SvcDiscoveryRegistryImpl) GetSelfConnTarget() string {
return fmt.Sprintf("%s:///%s", r.schema, r.serviceKey)
return fmt.Sprintf("etcd:///%s", r.serviceKey)
}
// AddOption appends gRPC dial options to the existing options
func (r *SvcDiscoveryRegistryImpl) AddOption(opts ...grpc.DialOption) {
r.dialOptions = append(r.dialOptions, opts...)
}
// CloseConn closes a given gRPC client connection
func (r *SvcDiscoveryRegistryImpl) CloseConn(conn *grpc.ClientConn) {
if err := conn.Close(); err != nil {
log.Printf("Failed to close connection: %v", err)
fmt.Printf("Failed to close connection: %v\n", err)
}
}
// Register registers a new service endpoint with etcd
func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
r.serviceKey = fmt.Sprintf("%s/%s-%d", serviceName, host, port)
em, err := endpoints.NewManager(r.client, serviceName)
r.serviceKey = fmt.Sprintf("%s/%s/%s-%d", r.rootDirectory, serviceName, host, port)
em, err := endpoints.NewManager(r.client, r.rootDirectory+"/"+serviceName)
if err != nil {
return err
}
r.endpointMgr = em
leaseResp, err := r.client.Grant(context.Background(), 30)
leaseResp, err := r.client.Grant(context.Background(), 60) // Increase TTL time
if err != nil {
return err
}
@ -100,10 +141,11 @@ func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int,
return nil
}
// keepAliveLease maintains the lease alive by sending keep-alive requests
func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) {
ch, err := r.client.KeepAlive(context.Background(), leaseID)
if err != nil {
log.Printf("Failed to keep lease alive: %v", err)
fmt.Printf("Failed to keep lease alive: %v\n", err)
return
}
@ -111,12 +153,13 @@ func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) {
if ka != nil {
fmt.Printf("Received lease keep-alive response: %v\n", ka)
} else {
fmt.Printf("Lease keep-alive response channel closed")
break
fmt.Printf("Lease keep-alive response channel closed\n")
return
}
}
}
// UnRegister removes the service endpoint from etcd
func (r *SvcDiscoveryRegistryImpl) UnRegister() error {
if r.endpointMgr == nil {
return fmt.Errorf("endpoint manager is not initialized")
@ -124,6 +167,7 @@ func (r *SvcDiscoveryRegistryImpl) UnRegister() error {
return r.endpointMgr.DeleteEndpoint(context.TODO(), r.serviceKey)
}
// Close closes the etcd client connection
func (r *SvcDiscoveryRegistryImpl) Close() {
if r.client != nil {
_ = r.client.Close()

View File

@ -44,7 +44,7 @@ import (
)
// Start rpc server.
func Start[T any](ctx context.Context, zookeeperConfig *config2.ZooKeeper, prometheusConfig *config2.Prometheus, listenIP,
func Start[T any](ctx context.Context, discovery *config2.Discovery, prometheusConfig *config2.Prometheus, listenIP,
registerIP string, rpcPorts []int, index int, rpcRegisterName string, share *config2.Share, config T, rpcFn func(ctx context.Context,
config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error {
@ -68,7 +68,7 @@ func Start[T any](ctx context.Context, zookeeperConfig *config2.ZooKeeper, prome
}
defer listener.Close()
client, err := kdisc.NewDiscoveryRegister(zookeeperConfig, share)
client, err := kdisc.NewDiscoveryRegister(discovery, share)
if err != nil {
return err
}