mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-25 04:32:10 +08:00 
			
		
		
		
	fix: discovery
This commit is contained in:
		
							parent
							
								
									1339121e29
								
							
						
					
					
						commit
						8c6d734f88
					
				| @ -1,25 +1,8 @@ | ||||
| // Copyright © 2023 OpenIM. All rights reserved. | ||||
| // | ||||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| // you may not use this file except in compliance with the License. | ||||
| // You may obtain a copy of the License at | ||||
| // | ||||
| //     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| // | ||||
| // Unless required by applicable law or agreed to in writing, software | ||||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| // See the License for the specific language governing permissions and | ||||
| // limitations under the License. | ||||
| 
 | ||||
| package api | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||
| 	"github.com/openimsdk/tools/utils/datautil" | ||||
| 	"github.com/openimsdk/tools/utils/network" | ||||
| 	"net" | ||||
| 	"net/http" | ||||
| 	"os" | ||||
| @ -28,6 +11,10 @@ import ( | ||||
| 	"syscall" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||
| 	"github.com/openimsdk/tools/utils/datautil" | ||||
| 	"github.com/openimsdk/tools/utils/network" | ||||
| 
 | ||||
| 	kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | ||||
| 	"github.com/openimsdk/tools/discovery" | ||||
| @ -51,7 +38,9 @@ func Start(ctx context.Context, index int, config *Config) error { | ||||
| 	var client discovery.SvcDiscoveryRegistry | ||||
| 
 | ||||
| 	// Determine whether zk is passed according to whether it is a clustered deployment | ||||
| 	client, err = kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share) | ||||
| 	client, err = kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share, []string{ | ||||
| 		config.Share.RpcRegisterName.MessageGateway, | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return errs.WrapMsg(err, "failed to register discovery service") | ||||
| 	} | ||||
|  | ||||
| @ -16,9 +16,10 @@ package msggateway | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/rpcli" | ||||
| 	"sync/atomic" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/rpcli" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/authverify" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" | ||||
| @ -57,6 +58,9 @@ func (s *Server) Start(ctx context.Context, index int, conf *Config) error { | ||||
| 		conf.Share.RpcRegisterName.MessageGateway, | ||||
| 		&conf.Share, | ||||
| 		conf, | ||||
| 		[]string{ | ||||
| 			conf.Share.RpcRegisterName.MessageGateway, | ||||
| 		}, | ||||
| 		s.InitServer, | ||||
| 	) | ||||
| } | ||||
|  | ||||
| @ -74,7 +74,7 @@ func Start(ctx context.Context, index int, config *Config) error { | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	client, err := discRegister.NewDiscoveryRegister(&config.Discovery, &config.Share) | ||||
| 	client, err := discRegister.NewDiscoveryRegister(&config.Discovery, &config.Share, nil) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| @ -16,6 +16,7 @@ package tools | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||
| 	kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" | ||||
| 	pbconversation "github.com/openimsdk/protocol/conversation" | ||||
| @ -43,7 +44,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error { | ||||
| 	if config.CronTask.RetainChatRecords < 1 { | ||||
| 		return errs.New("msg destruct time must be greater than 1").Wrap() | ||||
| 	} | ||||
| 	client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share) | ||||
| 	client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share, nil) | ||||
| 	if err != nil { | ||||
| 		return errs.WrapMsg(err, "failed to register discovery service") | ||||
| 	} | ||||
|  | ||||
| @ -56,5 +56,9 @@ func (a *AuthRpcCmd) Exec() error { | ||||
| func (a *AuthRpcCmd) runE() error { | ||||
| 	return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, | ||||
| 		a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.Ports, | ||||
| 		a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig.Share, a.authConfig, auth.Start) | ||||
| 		a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig.Share, a.authConfig, | ||||
| 		[]string{ | ||||
| 			a.authConfig.Share.RpcRegisterName.MessageGateway, | ||||
| 		}, | ||||
| 		auth.Start) | ||||
| } | ||||
|  | ||||
| @ -58,5 +58,7 @@ func (a *ConversationRpcCmd) Exec() error { | ||||
| func (a *ConversationRpcCmd) runE() error { | ||||
| 	return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, | ||||
| 		a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.Ports, | ||||
| 		a.Index(), a.conversationConfig.Share.RpcRegisterName.Conversation, &a.conversationConfig.Share, a.conversationConfig, conversation.Start) | ||||
| 		a.Index(), a.conversationConfig.Share.RpcRegisterName.Conversation, &a.conversationConfig.Share, a.conversationConfig, | ||||
| 		nil, | ||||
| 		conversation.Start) | ||||
| } | ||||
|  | ||||
| @ -59,5 +59,7 @@ func (a *FriendRpcCmd) Exec() error { | ||||
| func (a *FriendRpcCmd) runE() error { | ||||
| 	return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, | ||||
| 		a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.Ports, | ||||
| 		a.Index(), a.relationConfig.Share.RpcRegisterName.Friend, &a.relationConfig.Share, a.relationConfig, relation.Start) | ||||
| 		a.Index(), a.relationConfig.Share.RpcRegisterName.Friend, &a.relationConfig.Share, a.relationConfig, | ||||
| 		nil, | ||||
| 		relation.Start) | ||||
| } | ||||
|  | ||||
| @ -60,5 +60,7 @@ func (a *GroupRpcCmd) Exec() error { | ||||
| func (a *GroupRpcCmd) runE() error { | ||||
| 	return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, | ||||
| 		a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.Ports, | ||||
| 		a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, group.Start, versionctx.EnableVersionCtx()) | ||||
| 		a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, | ||||
| 		nil, | ||||
| 		group.Start, versionctx.EnableVersionCtx()) | ||||
| } | ||||
|  | ||||
| @ -60,5 +60,7 @@ func (a *MsgRpcCmd) Exec() error { | ||||
| func (a *MsgRpcCmd) runE() error { | ||||
| 	return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, | ||||
| 		a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.Ports, | ||||
| 		a.Index(), a.msgConfig.Share.RpcRegisterName.Msg, &a.msgConfig.Share, a.msgConfig, msg.Start) | ||||
| 		a.Index(), a.msgConfig.Share.RpcRegisterName.Msg, &a.msgConfig.Share, a.msgConfig, | ||||
| 		nil, | ||||
| 		msg.Start) | ||||
| } | ||||
|  | ||||
| @ -60,5 +60,9 @@ func (a *PushRpcCmd) Exec() error { | ||||
| func (a *PushRpcCmd) runE() error { | ||||
| 	return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, | ||||
| 		a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.Ports, | ||||
| 		a.Index(), a.pushConfig.Share.RpcRegisterName.Push, &a.pushConfig.Share, a.pushConfig, push.Start) | ||||
| 		a.Index(), a.pushConfig.Share.RpcRegisterName.Push, &a.pushConfig.Share, a.pushConfig, | ||||
| 		[]string{ | ||||
| 			a.pushConfig.Share.RpcRegisterName.MessageGateway, | ||||
| 		}, | ||||
| 		push.Start) | ||||
| } | ||||
|  | ||||
| @ -59,5 +59,7 @@ func (a *ThirdRpcCmd) Exec() error { | ||||
| func (a *ThirdRpcCmd) runE() error { | ||||
| 	return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, | ||||
| 		a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.Ports, | ||||
| 		a.Index(), a.thirdConfig.Share.RpcRegisterName.Third, &a.thirdConfig.Share, a.thirdConfig, third.Start) | ||||
| 		a.Index(), a.thirdConfig.Share.RpcRegisterName.Third, &a.thirdConfig.Share, a.thirdConfig, | ||||
| 		nil, | ||||
| 		third.Start) | ||||
| } | ||||
|  | ||||
| @ -60,5 +60,7 @@ func (a *UserRpcCmd) Exec() error { | ||||
| func (a *UserRpcCmd) runE() error { | ||||
| 	return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, | ||||
| 		a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.Ports, | ||||
| 		a.Index(), a.userConfig.Share.RpcRegisterName.User, &a.userConfig.Share, a.userConfig, user.Start) | ||||
| 		a.Index(), a.userConfig.Share.RpcRegisterName.User, &a.userConfig.Share, a.userConfig, | ||||
| 		nil, | ||||
| 		user.Start) | ||||
| } | ||||
|  | ||||
| @ -15,17 +15,18 @@ | ||||
| package discoveryregister | ||||
| 
 | ||||
| import ( | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes" | ||||
| 	"github.com/openimsdk/tools/discovery" | ||||
| 	"github.com/openimsdk/tools/discovery/etcd" | ||||
| 	"github.com/openimsdk/tools/discovery/zookeeper" | ||||
| 	"github.com/openimsdk/tools/errs" | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. | ||||
| func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share) (discovery.SvcDiscoveryRegistry, error) { | ||||
| func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share, watchNames []string) (discovery.SvcDiscoveryRegistry, error) { | ||||
| 	switch discovery.Enable { | ||||
| 	case "zookeeper": | ||||
| 		return zookeeper.NewZkClient( | ||||
| @ -42,7 +43,7 @@ func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share) (dis | ||||
| 		return etcd.NewSvcDiscoveryRegistry( | ||||
| 			discovery.Etcd.RootDirectory, | ||||
| 			discovery.Etcd.Address, | ||||
| 			nil, | ||||
| 			watchNames, | ||||
| 			etcd.WithDialTimeout(10*time.Second), | ||||
| 			etcd.WithMaxCallSendMsgSize(20*1024*1024), | ||||
| 			etcd.WithUsernameAndPassword(discovery.Etcd.Username, discovery.Etcd.Password)) | ||||
|  | ||||
| @ -17,9 +17,6 @@ package startrpc | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||
| 	"github.com/openimsdk/tools/utils/datautil" | ||||
| 	"google.golang.org/grpc/status" | ||||
| 	"net" | ||||
| 	"net/http" | ||||
| 	"os" | ||||
| @ -28,6 +25,10 @@ import ( | ||||
| 	"syscall" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||
| 	"github.com/openimsdk/tools/utils/datautil" | ||||
| 	"google.golang.org/grpc/status" | ||||
| 
 | ||||
| 	kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | ||||
| 	"github.com/openimsdk/tools/discovery" | ||||
| @ -41,8 +42,10 @@ import ( | ||||
| 
 | ||||
| // Start rpc server. | ||||
| func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusConfig *config.Prometheus, listenIP, | ||||
| 	registerIP string, rpcPorts []int, index int, rpcRegisterName string, share *config.Share, config T, rpcFn func(ctx context.Context, | ||||
| 	config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { | ||||
| 	registerIP string, rpcPorts []int, index int, rpcRegisterName string, share *config.Share, config T, | ||||
| 	watchServiceNames []string, | ||||
| 	rpcFn func(ctx context.Context, | ||||
| 		config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { | ||||
| 
 | ||||
| 	rpcPort, err := datautil.GetElemByIndex(rpcPorts, index) | ||||
| 	if err != nil { | ||||
| @ -61,7 +64,7 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo | ||||
| 		return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr) | ||||
| 	} | ||||
| 	defer listener.Close() | ||||
| 	client, err := kdisc.NewDiscoveryRegister(discovery, share) | ||||
| 	client, err := kdisc.NewDiscoveryRegister(discovery, share, watchServiceNames) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user