diff --git a/internal/api/init.go b/internal/api/init.go index 2a388655c..360ea77f5 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -1,25 +1,8 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package api import ( "context" "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/tools/utils/network" "net" "net/http" "os" @@ -28,6 +11,10 @@ import ( "syscall" "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/network" + kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/tools/discovery" @@ -51,7 +38,9 @@ func Start(ctx context.Context, index int, config *Config) error { var client discovery.SvcDiscoveryRegistry // Determine whether zk is passed according to whether it is a clustered deployment - client, err = kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share) + client, err = kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share, []string{ + config.Share.RpcRegisterName.MessageGateway, + }) if err != nil { return errs.WrapMsg(err, "failed to register discovery service") } diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 441cdbd13..13f63f8b9 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -16,9 +16,10 @@ package msggateway import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "sync/atomic" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" @@ -57,6 +58,9 @@ func (s *Server) Start(ctx context.Context, index int, conf *Config) error { conf.Share.RpcRegisterName.MessageGateway, &conf.Share, conf, + []string{ + conf.Share.RpcRegisterName.MessageGateway, + }, s.InitServer, ) } diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 42612e294..bf315bd83 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -74,7 +74,7 @@ func Start(ctx context.Context, index int, config *Config) error { if err != nil { return err } - client, err := discRegister.NewDiscoveryRegister(&config.Discovery, &config.Share) + client, err := discRegister.NewDiscoveryRegister(&config.Discovery, &config.Share, nil) if err != nil { return err } diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 79df7500a..689244dd1 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -16,6 +16,7 @@ package tools import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" pbconversation "github.com/openimsdk/protocol/conversation" @@ -43,7 +44,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error { if config.CronTask.RetainChatRecords < 1 { return errs.New("msg destruct time must be greater than 1").Wrap() } - client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share) + client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share, nil) if err != nil { return errs.WrapMsg(err, "failed to register discovery service") } diff --git a/pkg/common/cmd/auth.go b/pkg/common/cmd/auth.go index b35a95f39..cc7077385 100644 --- a/pkg/common/cmd/auth.go +++ b/pkg/common/cmd/auth.go @@ -56,5 +56,9 @@ func (a *AuthRpcCmd) Exec() error { func (a *AuthRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.Ports, - a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig.Share, a.authConfig, auth.Start) + a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig.Share, a.authConfig, + []string{ + a.authConfig.Share.RpcRegisterName.MessageGateway, + }, + auth.Start) } diff --git a/pkg/common/cmd/conversation.go b/pkg/common/cmd/conversation.go index bdb4447f4..77b88b79a 100644 --- a/pkg/common/cmd/conversation.go +++ b/pkg/common/cmd/conversation.go @@ -58,5 +58,7 @@ func (a *ConversationRpcCmd) Exec() error { func (a *ConversationRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.Ports, - a.Index(), a.conversationConfig.Share.RpcRegisterName.Conversation, &a.conversationConfig.Share, a.conversationConfig, conversation.Start) + a.Index(), a.conversationConfig.Share.RpcRegisterName.Conversation, &a.conversationConfig.Share, a.conversationConfig, + nil, + conversation.Start) } diff --git a/pkg/common/cmd/friend.go b/pkg/common/cmd/friend.go index a564facd0..bbab2a93f 100644 --- a/pkg/common/cmd/friend.go +++ b/pkg/common/cmd/friend.go @@ -59,5 +59,7 @@ func (a *FriendRpcCmd) Exec() error { func (a *FriendRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.Ports, - a.Index(), a.relationConfig.Share.RpcRegisterName.Friend, &a.relationConfig.Share, a.relationConfig, relation.Start) + a.Index(), a.relationConfig.Share.RpcRegisterName.Friend, &a.relationConfig.Share, a.relationConfig, + nil, + relation.Start) } diff --git a/pkg/common/cmd/group.go b/pkg/common/cmd/group.go index 9b0fbf8de..ecfc08603 100644 --- a/pkg/common/cmd/group.go +++ b/pkg/common/cmd/group.go @@ -60,5 +60,7 @@ func (a *GroupRpcCmd) Exec() error { func (a *GroupRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.Ports, - a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, group.Start, versionctx.EnableVersionCtx()) + a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, + nil, + group.Start, versionctx.EnableVersionCtx()) } diff --git a/pkg/common/cmd/msg.go b/pkg/common/cmd/msg.go index bfd29398e..0feeb7b64 100644 --- a/pkg/common/cmd/msg.go +++ b/pkg/common/cmd/msg.go @@ -60,5 +60,7 @@ func (a *MsgRpcCmd) Exec() error { func (a *MsgRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.Ports, - a.Index(), a.msgConfig.Share.RpcRegisterName.Msg, &a.msgConfig.Share, a.msgConfig, msg.Start) + a.Index(), a.msgConfig.Share.RpcRegisterName.Msg, &a.msgConfig.Share, a.msgConfig, + nil, + msg.Start) } diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go index ca22a697d..121d59f9d 100644 --- a/pkg/common/cmd/push.go +++ b/pkg/common/cmd/push.go @@ -60,5 +60,9 @@ func (a *PushRpcCmd) Exec() error { func (a *PushRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.Ports, - a.Index(), a.pushConfig.Share.RpcRegisterName.Push, &a.pushConfig.Share, a.pushConfig, push.Start) + a.Index(), a.pushConfig.Share.RpcRegisterName.Push, &a.pushConfig.Share, a.pushConfig, + []string{ + a.pushConfig.Share.RpcRegisterName.MessageGateway, + }, + push.Start) } diff --git a/pkg/common/cmd/third.go b/pkg/common/cmd/third.go index a301b738f..850b0aae5 100644 --- a/pkg/common/cmd/third.go +++ b/pkg/common/cmd/third.go @@ -59,5 +59,7 @@ func (a *ThirdRpcCmd) Exec() error { func (a *ThirdRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.Ports, - a.Index(), a.thirdConfig.Share.RpcRegisterName.Third, &a.thirdConfig.Share, a.thirdConfig, third.Start) + a.Index(), a.thirdConfig.Share.RpcRegisterName.Third, &a.thirdConfig.Share, a.thirdConfig, + nil, + third.Start) } diff --git a/pkg/common/cmd/user.go b/pkg/common/cmd/user.go index 9a614afca..6ba6ba3bd 100644 --- a/pkg/common/cmd/user.go +++ b/pkg/common/cmd/user.go @@ -60,5 +60,7 @@ func (a *UserRpcCmd) Exec() error { func (a *UserRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.Ports, - a.Index(), a.userConfig.Share.RpcRegisterName.User, &a.userConfig.Share, a.userConfig, user.Start) + a.Index(), a.userConfig.Share.RpcRegisterName.User, &a.userConfig.Share, a.userConfig, + nil, + user.Start) } diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index 5731ffcdf..f2d96cda4 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -15,17 +15,18 @@ package discoveryregister import ( + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/discovery/zookeeper" "github.com/openimsdk/tools/errs" - "time" ) // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. -func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share) (discovery.SvcDiscoveryRegistry, error) { +func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share, watchNames []string) (discovery.SvcDiscoveryRegistry, error) { switch discovery.Enable { case "zookeeper": return zookeeper.NewZkClient( @@ -42,7 +43,7 @@ func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share) (dis return etcd.NewSvcDiscoveryRegistry( discovery.Etcd.RootDirectory, discovery.Etcd.Address, - nil, + watchNames, etcd.WithDialTimeout(10*time.Second), etcd.WithMaxCallSendMsgSize(20*1024*1024), etcd.WithUsernameAndPassword(discovery.Etcd.Username, discovery.Etcd.Password)) diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 31f1a060a..0a59a90b8 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -17,9 +17,6 @@ package startrpc import ( "context" "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/tools/utils/datautil" - "google.golang.org/grpc/status" "net" "net/http" "os" @@ -28,6 +25,10 @@ import ( "syscall" "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/utils/datautil" + "google.golang.org/grpc/status" + kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/tools/discovery" @@ -41,8 +42,10 @@ import ( // Start rpc server. func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusConfig *config.Prometheus, listenIP, - registerIP string, rpcPorts []int, index int, rpcRegisterName string, share *config.Share, config T, rpcFn func(ctx context.Context, - config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { + registerIP string, rpcPorts []int, index int, rpcRegisterName string, share *config.Share, config T, + watchServiceNames []string, + rpcFn func(ctx context.Context, + config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { rpcPort, err := datautil.GetElemByIndex(rpcPorts, index) if err != nil { @@ -61,7 +64,7 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr) } defer listener.Close() - client, err := kdisc.NewDiscoveryRegister(discovery, share) + client, err := kdisc.NewDiscoveryRegister(discovery, share, watchServiceNames) if err != nil { return err } diff --git a/version/version b/version/version index 00e897bda..6a39225f1 100644 --- a/version/version +++ b/version/version @@ -1 +1 @@ -3.8.2 \ No newline at end of file +v3.8.3