feat: config (#2934)

This commit is contained in:
icey-yu 2024-12-07 12:11:12 +08:00 committed by GitHub
parent fceaaa199b
commit bbbf17cbe4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 85 additions and 103 deletions

View File

@ -5,4 +5,15 @@ etcd:
username: '' username: ''
password: '' password: ''
rpcService:
user: user-rpc-service
friend: friend-rpc-service
msg: msg-rpc-service
push: push-rpc-service
messageGateway: messageGateway-rpc-service
group: group-rpc-service
auth: auth-rpc-service
conversation: conversation-rpc-service
third: third-rpc-service

View File

@ -51,7 +51,7 @@ func Start(ctx context.Context, index int, config *Config) error {
var client discovery.SvcDiscoveryRegistry var client discovery.SvcDiscoveryRegistry
// Determine whether zk is passed according to whether it is a clustered deployment // Determine whether zk is passed according to whether it is a clustered deployment
client, err = kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share) client, err = kdisc.NewDiscoveryRegister(&config.Discovery)
if err != nil { if err != nil {
return errs.WrapMsg(err, "failed to register discovery service") return errs.WrapMsg(err, "failed to register discovery service")
} }

View File

@ -57,14 +57,14 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
_ = v.RegisterValidation("required_if", RequiredIf) _ = v.RegisterValidation("required_if", RequiredIf)
} }
// init rpc client here // init rpc client here
userRpc := rpcclient.NewUser(disCov, config.Share.RpcRegisterName.User, config.Share.RpcRegisterName.MessageGateway, userRpc := rpcclient.NewUser(disCov, config.Discovery.RpcService.User, config.Discovery.RpcService.MessageGateway,
config.Share.IMAdminUserID) config.Share.IMAdminUserID)
groupRpc := rpcclient.NewGroup(disCov, config.Share.RpcRegisterName.Group) groupRpc := rpcclient.NewGroup(disCov, config.Discovery.RpcService.Group)
friendRpc := rpcclient.NewFriend(disCov, config.Share.RpcRegisterName.Friend) friendRpc := rpcclient.NewFriend(disCov, config.Discovery.RpcService.Friend)
messageRpc := rpcclient.NewMessage(disCov, config.Share.RpcRegisterName.Msg) messageRpc := rpcclient.NewMessage(disCov, config.Discovery.RpcService.Msg)
conversationRpc := rpcclient.NewConversation(disCov, config.Share.RpcRegisterName.Conversation) conversationRpc := rpcclient.NewConversation(disCov, config.Discovery.RpcService.Conversation)
authRpc := rpcclient.NewAuth(disCov, config.Share.RpcRegisterName.Auth) authRpc := rpcclient.NewAuth(disCov, config.Discovery.RpcService.Auth)
thirdRpc := rpcclient.NewThird(disCov, config.Share.RpcRegisterName.Third, config.API.Prometheus.GrafanaURL) thirdRpc := rpcclient.NewThird(disCov, config.Discovery.RpcService.Third, config.API.Prometheus.GrafanaURL)
switch config.API.Api.CompressionLevel { switch config.API.Api.CompressionLevel {
case NoCompression: case NoCompression:
case DefaultCompression: case DefaultCompression:

View File

@ -37,7 +37,7 @@ import (
func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error { func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
s.LongConnServer.SetDiscoveryRegistry(disCov, config) s.LongConnServer.SetDiscoveryRegistry(disCov, config)
msggateway.RegisterMsgGatewayServer(server, s) msggateway.RegisterMsgGatewayServer(server, s)
s.userRcp = rpcclient.NewUserRpcClient(disCov, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) s.userRcp = rpcclient.NewUserRpcClient(disCov, config.Discovery.RpcService.User, config.Share.IMAdminUserID)
if s.ready != nil { if s.ready != nil {
return s.ready(s) return s.ready(s)
} }
@ -48,7 +48,7 @@ func (s *Server) Start(ctx context.Context, index int, conf *Config) error {
return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP, return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP,
conf.MsgGateway.RPC.RegisterIP, conf.MsgGateway.RPC.RegisterIP,
conf.MsgGateway.RPC.AutoSetPorts, conf.MsgGateway.RPC.Ports, index, conf.MsgGateway.RPC.AutoSetPorts, conf.MsgGateway.RPC.Ports, index,
conf.Share.RpcRegisterName.MessageGateway, conf.Discovery.RpcService.MessageGateway,
&conf.Share, &conf.Share,
conf, conf,
s.InitServer, s.InitServer,

View File

@ -120,7 +120,7 @@ type GrpcHandler struct {
validate *validator.Validate validate *validator.Validate
} }
func NewGrpcHandler(validate *validator.Validate, client discovery.SvcDiscoveryRegistry, rpcRegisterName *config.RpcRegisterName) *GrpcHandler { func NewGrpcHandler(validate *validator.Validate, client discovery.SvcDiscoveryRegistry, rpcRegisterName *config.RpcService) *GrpcHandler {
msgRpcClient := rpcclient.NewMessageRpcClient(client, rpcRegisterName.Msg) msgRpcClient := rpcclient.NewMessageRpcClient(client, rpcRegisterName.Msg)
pushRpcClient := rpcclient.NewPushRpcClient(client, rpcRegisterName.Push) pushRpcClient := rpcclient.NewPushRpcClient(client, rpcRegisterName.Push)
return &GrpcHandler{ return &GrpcHandler{

View File

@ -72,9 +72,9 @@ type kickHandler struct {
} }
func (ws *WsServer) SetDiscoveryRegistry(disCov discovery.SvcDiscoveryRegistry, config *Config) { func (ws *WsServer) SetDiscoveryRegistry(disCov discovery.SvcDiscoveryRegistry, config *Config) {
ws.MessageHandler = NewGrpcHandler(ws.validate, disCov, &config.Share.RpcRegisterName) ws.MessageHandler = NewGrpcHandler(ws.validate, disCov, &config.Discovery.RpcService)
u := rpcclient.NewUserRpcClient(disCov, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) u := rpcclient.NewUserRpcClient(disCov, config.Discovery.RpcService.User, config.Share.IMAdminUserID)
ws.authClient = rpcclient.NewAuth(disCov, config.Share.RpcRegisterName.Auth) ws.authClient = rpcclient.NewAuth(disCov, config.Discovery.RpcService.Auth)
ws.userClient = &u ws.userClient = &u
ws.disCov = disCov ws.disCov = disCov
} }
@ -113,7 +113,7 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer {
for _, o := range opts { for _, o := range opts {
o(&config) o(&config)
} }
//userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) //userRpcClient := rpcclient.NewUserRpcClient(client, config.Discovery.RpcService.User, config.Share.IMAdminUserID)
v := validator.New() v := validator.New()
return &WsServer{ return &WsServer{
@ -192,7 +192,7 @@ func (ws *WsServer) Run(done chan error) error {
var concurrentRequest = 3 var concurrentRequest = 3
func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *Client) error { func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *Client) error {
conns, err := ws.disCov.GetConns(ctx, ws.msgGatewayConfig.Share.RpcRegisterName.MessageGateway) conns, err := ws.disCov.GetConns(ctx, ws.msgGatewayConfig.Discovery.RpcService.MessageGateway)
if err != nil { if err != nil {
return err return err
} }

View File

@ -75,7 +75,7 @@ func Start(ctx context.Context, index int, config *Config) error {
if err != nil { if err != nil {
return err return err
} }
client, err := discRegister.NewDiscoveryRegister(&config.Discovery, &config.Share) client, err := discRegister.NewDiscoveryRegister(&config.Discovery)
if err != nil { if err != nil {
return err return err
} }
@ -101,8 +101,8 @@ func Start(ctx context.Context, index int, config *Config) error {
if err != nil { if err != nil {
return err return err
} }
conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Discovery.RpcService.Conversation)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Discovery.RpcService.Group)
historyCH, err := NewOnlineHistoryRedisConsumerHandler(&config.KafkaConfig, msgTransferDatabase, &conversationRpcClient, &groupRpcClient) historyCH, err := NewOnlineHistoryRedisConsumerHandler(&config.KafkaConfig, msgTransferDatabase, &conversationRpcClient, &groupRpcClient)
if err != nil { if err != nil {
return err return err

View File

@ -60,7 +60,7 @@ func NewDefaultAllNode(disCov discovery.SvcDiscoveryRegistry, config *Config) *D
func (d *DefaultAllNode) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, func (d *DefaultAllNode) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
conns, err := d.disCov.GetConns(ctx, d.config.Share.RpcRegisterName.MessageGateway) conns, err := d.disCov.GetConns(ctx, d.config.Discovery.RpcService.MessageGateway)
if len(conns) == 0 { if len(conns) == 0 {
log.ZWarn(ctx, "get gateway conn 0 ", nil) log.ZWarn(ctx, "get gateway conn 0 ", nil)
} else { } else {

View File

@ -58,14 +58,14 @@ func NewConsumerHandler(config *Config, database controller.PushDatabase, offlin
return nil, err return nil, err
} }
userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) userRpcClient := rpcclient.NewUserRpcClient(client, config.Discovery.RpcService.User, config.Share.IMAdminUserID)
consumerHandler.offlinePusher = offlinePusher consumerHandler.offlinePusher = offlinePusher
consumerHandler.onlinePusher = NewOnlinePusher(client, config) consumerHandler.onlinePusher = NewOnlinePusher(client, config)
consumerHandler.groupRpcClient = rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) consumerHandler.groupRpcClient = rpcclient.NewGroupRpcClient(client, config.Discovery.RpcService.Group)
consumerHandler.groupLocalCache = rpccache.NewGroupLocalCache(consumerHandler.groupRpcClient, &config.LocalCacheConfig, rdb) consumerHandler.groupLocalCache = rpccache.NewGroupLocalCache(consumerHandler.groupRpcClient, &config.LocalCacheConfig, rdb)
consumerHandler.msgRpcClient = rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) consumerHandler.msgRpcClient = rpcclient.NewMessageRpcClient(client, config.Discovery.RpcService.Msg)
consumerHandler.conversationRpcClient = rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) consumerHandler.conversationRpcClient = rpcclient.NewConversationRpcClient(client, config.Discovery.RpcService.Conversation)
consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(consumerHandler.conversationRpcClient, &config.LocalCacheConfig, rdb) consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(consumerHandler.conversationRpcClient, &config.LocalCacheConfig, rdb)
consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL) consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL)
consumerHandler.config = config consumerHandler.config = config

View File

@ -59,7 +59,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil { if err != nil {
return err return err
} }
userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) userRpcClient := rpcclient.NewUserRpcClient(client, config.Discovery.RpcService.User, config.Share.IMAdminUserID)
pbauth.RegisterAuthServer(server, &authServer{ pbauth.RegisterAuthServer(server, &authServer{
userRpcClient: &userRpcClient, userRpcClient: &userRpcClient,
RegisterCenter: client, RegisterCenter: client,
@ -182,7 +182,7 @@ func (s *authServer) ForceLogout(ctx context.Context, req *pbauth.ForceLogoutReq
} }
func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID int32) error { func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID int32) error {
conns, err := s.RegisterCenter.GetConns(ctx, s.config.Share.RpcRegisterName.MessageGateway) conns, err := s.RegisterCenter.GetConns(ctx, s.config.Discovery.RpcService.MessageGateway)
if err != nil { if err != nil {
return err return err
} }

View File

@ -78,9 +78,9 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil { if err != nil {
return err return err
} }
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Discovery.RpcService.Group)
msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Discovery.RpcService.Msg)
userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) userRpcClient := rpcclient.NewUserRpcClient(client, config.Discovery.RpcService.User, config.Share.IMAdminUserID)
localcache.InitLocalCache(&config.LocalCacheConfig) localcache.InitLocalCache(&config.LocalCacheConfig)
pbconversation.RegisterConversationServer(server, &conversationServer{ pbconversation.RegisterConversationServer(server, &conversationServer{
msgRpcClient: &msgRpcClient, msgRpcClient: &msgRpcClient,

View File

@ -99,9 +99,9 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil { if err != nil {
return err return err
} }
userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) userRpcClient := rpcclient.NewUserRpcClient(client, config.Discovery.RpcService.User, config.Share.IMAdminUserID)
msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Discovery.RpcService.Msg)
conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Discovery.RpcService.Conversation)
var gs groupServer var gs groupServer
database := controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs)) database := controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs))
gs.db = database gs.db = database

View File

@ -89,10 +89,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
return err return err
} }
msgModel := redis.NewMsgCache(rdb) msgModel := redis.NewMsgCache(rdb)
conversationClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) conversationClient := rpcclient.NewConversationRpcClient(client, config.Discovery.RpcService.Conversation)
userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) userRpcClient := rpcclient.NewUserRpcClient(client, config.Discovery.RpcService.User, config.Share.IMAdminUserID)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Discovery.RpcService.Group)
friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend) friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Discovery.RpcService.Friend)
seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB()) seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB())
if err != nil { if err != nil {
return err return err

View File

@ -93,8 +93,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
} }
// Initialize RPC clients // Initialize RPC clients
userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) userRpcClient := rpcclient.NewUserRpcClient(client, config.Discovery.RpcService.User, config.Share.IMAdminUserID)
msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Discovery.RpcService.Msg)
// Initialize notification sender // Initialize notification sender
notificationSender := NewFriendNotificationSender( notificationSender := NewFriendNotificationSender(
@ -119,7 +119,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
userRpcClient: &userRpcClient, userRpcClient: &userRpcClient,
notificationSender: notificationSender, notificationSender: notificationSender,
RegisterCenter: client, RegisterCenter: client,
conversationRpcClient: rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation), conversationRpcClient: rpcclient.NewConversationRpcClient(client, config.Discovery.RpcService.Conversation),
config: config, config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
queue: memamq.NewMemoryQueue(16, 1024*1024), queue: memamq.NewMemoryQueue(16, 1024*1024),

View File

@ -101,7 +101,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
localcache.InitLocalCache(&config.LocalCacheConfig) localcache.InitLocalCache(&config.LocalCacheConfig)
third.RegisterThirdServer(server, &thirdServer{ third.RegisterThirdServer(server, &thirdServer{
thirdDatabase: controller.NewThirdDatabase(redis.NewThirdCache(rdb), logdb), thirdDatabase: controller.NewThirdDatabase(redis.NewThirdCache(rdb), logdb),
userRpcClient: rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID), userRpcClient: rpcclient.NewUserRpcClient(client, config.Discovery.RpcService.User, config.Share.IMAdminUserID),
s3dataBase: controller.NewS3Database(rdb, o, s3db), s3dataBase: controller.NewS3Database(rdb, o, s3db),
defaultExpire: time.Hour * 24 * 7, defaultExpire: time.Hour * 24 * 7,
config: config, config: config,

View File

@ -96,9 +96,9 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi
} }
userCache := redis.NewUserCacheRedis(rdb, &config.LocalCacheConfig, userDB, redis.GetRocksCacheOptions()) userCache := redis.NewUserCacheRedis(rdb, &config.LocalCacheConfig, userDB, redis.GetRocksCacheOptions())
database := controller.NewUserDatabase(userDB, userCache, mgocli.GetTx()) database := controller.NewUserDatabase(userDB, userCache, mgocli.GetTx())
friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend) friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Discovery.RpcService.Friend)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Discovery.RpcService.Group)
msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Discovery.RpcService.Msg)
localcache.InitLocalCache(&config.LocalCacheConfig) localcache.InitLocalCache(&config.LocalCacheConfig)
u := &userServer{ u := &userServer{
online: redis.NewUserOnline(rdb), online: redis.NewUserOnline(rdb),

View File

@ -1,17 +1,3 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tools package tools
import ( import (
@ -47,24 +33,24 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
if config.CronTask.RetainChatRecords < 1 { if config.CronTask.RetainChatRecords < 1 {
return errs.New("msg destruct time must be greater than 1").Wrap() return errs.New("msg destruct time must be greater than 1").Wrap()
} }
client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share) client, err := kdisc.NewDiscoveryRegister(&config.Discovery)
if err != nil { if err != nil {
return errs.WrapMsg(err, "failed to register discovery service") return errs.WrapMsg(err, "failed to register discovery service")
} }
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0]) ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0])
msgConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg) msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg)
if err != nil { if err != nil {
return err return err
} }
thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) thirdConn, err := client.GetConn(ctx, config.Discovery.RpcService.Third)
if err != nil { if err != nil {
return err return err
} }
conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation) conversationConn, err := client.GetConn(ctx, config.Discovery.RpcService.Conversation)
if err != nil { if err != nil {
return err return err
} }

View File

@ -56,5 +56,5 @@ func (a *AuthRpcCmd) Exec() error {
func (a *AuthRpcCmd) runE() error { func (a *AuthRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP,
a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.AutoSetPorts, a.authConfig.RpcConfig.RPC.Ports, a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.AutoSetPorts, a.authConfig.RpcConfig.RPC.Ports,
a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig.Share, a.authConfig, auth.Start) a.Index(), a.authConfig.Discovery.RpcService.Auth, &a.authConfig.Share, a.authConfig, auth.Start)
} }

View File

@ -58,5 +58,5 @@ func (a *ConversationRpcCmd) Exec() error {
func (a *ConversationRpcCmd) runE() error { func (a *ConversationRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP,
a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.AutoSetPorts, a.conversationConfig.RpcConfig.RPC.Ports, a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.AutoSetPorts, a.conversationConfig.RpcConfig.RPC.Ports,
a.Index(), a.conversationConfig.Share.RpcRegisterName.Conversation, &a.conversationConfig.Share, a.conversationConfig, conversation.Start) a.Index(), a.conversationConfig.Discovery.RpcService.Conversation, &a.conversationConfig.Share, a.conversationConfig, conversation.Start)
} }

View File

@ -59,5 +59,5 @@ func (a *FriendRpcCmd) Exec() error {
func (a *FriendRpcCmd) runE() error { func (a *FriendRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP,
a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.AutoSetPorts, a.relationConfig.RpcConfig.RPC.Ports, a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.AutoSetPorts, a.relationConfig.RpcConfig.RPC.Ports,
a.Index(), a.relationConfig.Share.RpcRegisterName.Friend, &a.relationConfig.Share, a.relationConfig, relation.Start) a.Index(), a.relationConfig.Discovery.RpcService.Friend, &a.relationConfig.Share, a.relationConfig, relation.Start)
} }

View File

@ -60,5 +60,5 @@ func (a *GroupRpcCmd) Exec() error {
func (a *GroupRpcCmd) runE() error { func (a *GroupRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP,
a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.AutoSetPorts, a.groupConfig.RpcConfig.RPC.Ports, a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.AutoSetPorts, a.groupConfig.RpcConfig.RPC.Ports,
a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, group.Start, versionctx.EnableVersionCtx()) a.Index(), a.groupConfig.Discovery.RpcService.Group, &a.groupConfig.Share, a.groupConfig, group.Start, versionctx.EnableVersionCtx())
} }

View File

@ -60,5 +60,5 @@ func (a *MsgRpcCmd) Exec() error {
func (a *MsgRpcCmd) runE() error { func (a *MsgRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP,
a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.AutoSetPorts, a.msgConfig.RpcConfig.RPC.Ports, a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.AutoSetPorts, a.msgConfig.RpcConfig.RPC.Ports,
a.Index(), a.msgConfig.Share.RpcRegisterName.Msg, &a.msgConfig.Share, a.msgConfig, msg.Start) a.Index(), a.msgConfig.Discovery.RpcService.Msg, &a.msgConfig.Share, a.msgConfig, msg.Start)
} }

View File

@ -60,5 +60,5 @@ func (a *PushRpcCmd) Exec() error {
func (a *PushRpcCmd) runE() error { func (a *PushRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP,
a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.AutoSetPorts, a.pushConfig.RpcConfig.RPC.Ports, a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.AutoSetPorts, a.pushConfig.RpcConfig.RPC.Ports,
a.Index(), a.pushConfig.Share.RpcRegisterName.Push, &a.pushConfig.Share, a.pushConfig, push.Start) a.Index(), a.pushConfig.Discovery.RpcService.Push, &a.pushConfig.Share, a.pushConfig, push.Start)
} }

View File

@ -59,5 +59,5 @@ func (a *ThirdRpcCmd) Exec() error {
func (a *ThirdRpcCmd) runE() error { func (a *ThirdRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP,
a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.AutoSetPorts, a.thirdConfig.RpcConfig.RPC.Ports, a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.AutoSetPorts, a.thirdConfig.RpcConfig.RPC.Ports,
a.Index(), a.thirdConfig.Share.RpcRegisterName.Third, &a.thirdConfig.Share, a.thirdConfig, third.Start) a.Index(), a.thirdConfig.Discovery.RpcService.Third, &a.thirdConfig.Share, a.thirdConfig, third.Start)
} }

View File

@ -60,5 +60,5 @@ func (a *UserRpcCmd) Exec() error {
func (a *UserRpcCmd) runE() error { func (a *UserRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP,
a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.AutoSetPorts, a.userConfig.RpcConfig.RPC.Ports, a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.AutoSetPorts, a.userConfig.RpcConfig.RPC.Ports,
a.Index(), a.userConfig.Share.RpcRegisterName.User, &a.userConfig.Share, a.userConfig, user.Start) a.Index(), a.userConfig.Discovery.RpcService.User, &a.userConfig.Share, a.userConfig, user.Start)
} }

View File

@ -373,7 +373,6 @@ type AfterConfig struct {
type Share struct { type Share struct {
Secret string `mapstructure:"secret"` Secret string `mapstructure:"secret"`
RpcRegisterName RpcRegisterName `mapstructure:"rpcRegisterName"`
IMAdminUserID []string `mapstructure:"imAdminUserID"` IMAdminUserID []string `mapstructure:"imAdminUserID"`
MultiLogin MultiLogin `mapstructure:"multiLogin"` MultiLogin MultiLogin `mapstructure:"multiLogin"`
} }
@ -383,7 +382,7 @@ type MultiLogin struct {
MaxNumOneEnd int `mapstructure:"maxNumOneEnd"` MaxNumOneEnd int `mapstructure:"maxNumOneEnd"`
} }
type RpcRegisterName struct { type RpcService struct {
User string `mapstructure:"user"` User string `mapstructure:"user"`
Friend string `mapstructure:"friend"` Friend string `mapstructure:"friend"`
Msg string `mapstructure:"msg"` Msg string `mapstructure:"msg"`
@ -395,7 +394,7 @@ type RpcRegisterName struct {
Third string `mapstructure:"third"` Third string `mapstructure:"third"`
} }
func (r *RpcRegisterName) GetServiceNames() []string { func (r *RpcService) GetServiceNames() []string {
return []string{ return []string{
r.User, r.User,
r.Friend, r.Friend,
@ -472,7 +471,7 @@ type ZooKeeper struct {
type Discovery struct { type Discovery struct {
Enable string `mapstructure:"enable"` Enable string `mapstructure:"enable"`
Etcd Etcd `mapstructure:"etcd"` Etcd Etcd `mapstructure:"etcd"`
ZooKeeper ZooKeeper `mapstructure:"zooKeeper"` RpcService RpcService `mapstructure:"rpcService"`
} }
type Etcd struct { type Etcd struct {

View File

@ -19,25 +19,15 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes" "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes"
"github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/discovery/zookeeper"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"time" "time"
) )
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share) (discovery.SvcDiscoveryRegistry, error) { func NewDiscoveryRegister(discovery *config.Discovery) (discovery.SvcDiscoveryRegistry, error) {
switch discovery.Enable { switch discovery.Enable {
case "zookeeper":
return zookeeper.NewZkClient(
discovery.ZooKeeper.Address,
discovery.ZooKeeper.Schema,
zookeeper.WithFreq(time.Hour),
zookeeper.WithUserNameAndPassword(discovery.ZooKeeper.Username, discovery.ZooKeeper.Password),
zookeeper.WithRoundRobin(),
zookeeper.WithTimeout(10),
)
case "k8s": case "k8s":
return kubernetes.NewK8sDiscoveryRegister(share.RpcRegisterName.MessageGateway) return kubernetes.NewK8sDiscoveryRegister(discovery.RpcService.MessageGateway)
case "etcd": case "etcd":
return etcd.NewSvcDiscoveryRegistry( return etcd.NewSvcDiscoveryRegistry(
discovery.Etcd.RootDirectory, discovery.Etcd.RootDirectory,

View File

@ -42,25 +42,25 @@ func GetGrpcServerMetrics() *gp.ServerMetrics {
return grpcMetrics return grpcMetrics
} }
func GetGrpcCusMetrics(registerName string, share *config.Share) []prometheus.Collector { func GetGrpcCusMetrics(registerName string, discovery *config.Discovery) []prometheus.Collector {
switch registerName { switch registerName {
case share.RpcRegisterName.MessageGateway: case discovery.RpcService.MessageGateway:
return []prometheus.Collector{OnlineUserGauge} return []prometheus.Collector{OnlineUserGauge}
case share.RpcRegisterName.Msg: case discovery.RpcService.Msg:
return []prometheus.Collector{ return []prometheus.Collector{
SingleChatMsgProcessSuccessCounter, SingleChatMsgProcessSuccessCounter,
SingleChatMsgProcessFailedCounter, SingleChatMsgProcessFailedCounter,
GroupChatMsgProcessSuccessCounter, GroupChatMsgProcessSuccessCounter,
GroupChatMsgProcessFailedCounter, GroupChatMsgProcessFailedCounter,
} }
case share.RpcRegisterName.Push: case discovery.RpcService.Push:
return []prometheus.Collector{ return []prometheus.Collector{
MsgOfflinePushFailedCounter, MsgOfflinePushFailedCounter,
MsgLoneTimePushCounter, MsgLoneTimePushCounter,
} }
case share.RpcRegisterName.Auth: case discovery.RpcService.Auth:
return []prometheus.Collector{UserLoginCounter} return []prometheus.Collector{UserLoginCounter}
case share.RpcRegisterName.User: case discovery.RpcService.User:
return []prometheus.Collector{UserRegisterCounter} return []prometheus.Collector{UserRegisterCounter}
default: default:
return nil return nil

View File

@ -77,7 +77,7 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo
if err != nil { if err != nil {
return err return err
} }
cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share) cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, discovery)
go func() { go func() {
if err := prommetrics.RpcInit(cs, prometheusPort); err != nil && !errors.Is(err, http.ErrServerClosed) { if err := prommetrics.RpcInit(cs, prometheusPort); err != nil && !errors.Is(err, http.ErrServerClosed) {
netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort)) netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort))
@ -111,7 +111,7 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo
"prometheusPorts", prometheusConfig.Ports) "prometheusPorts", prometheusConfig.Ports)
defer listener.Close() defer listener.Close()
client, err := kdisc.NewDiscoveryRegister(discovery, share) client, err := kdisc.NewDiscoveryRegister(discovery)
if err != nil { if err != nil {
return err return err
} }

View File

@ -160,10 +160,6 @@ func performChecks(ctx context.Context, mongoConfig *config.Mongo, redisConfig *
checks["Etcd"] = func(ctx context.Context) error { checks["Etcd"] = func(ctx context.Context) error {
return CheckEtcd(ctx, &discovery.Etcd) return CheckEtcd(ctx, &discovery.Etcd)
} }
} else if discovery.Enable == "zookeeper" {
checks["Zookeeper"] = func(ctx context.Context) error {
return CheckZookeeper(ctx, &discovery.ZooKeeper)
}
} }
for i := 0; i < maxRetry; i++ { for i := 0; i < maxRetry; i++ {