mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 19:32:17 +08:00 
			
		
		
		
	Merge remote-tracking branch 'upstream/main' into fixbug1
# Conflicts: # go.mod # go.sum
This commit is contained in:
		
						commit
						3dacb99587
					
				
							
								
								
									
										2
									
								
								.env
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								.env
									
									
									
									
									
								
							@ -4,7 +4,7 @@ REDIS_IMAGE=redis:7.0.0
 | 
			
		||||
ZOOKEEPER_IMAGE=bitnami/zookeeper:3.8
 | 
			
		||||
KAFKA_IMAGE=bitnami/kafka:3.5.1
 | 
			
		||||
MINIO_IMAGE=minio/minio:RELEASE.2024-01-11T07-46-16Z
 | 
			
		||||
 | 
			
		||||
ETCD_IMAGE=quay.io/coreos/etcd:v3.5.13
 | 
			
		||||
 | 
			
		||||
OPENIM_WEB_FRONT_IMAGE=openim/openim-web-front:release-v3.5.1
 | 
			
		||||
OPENIM_ADMIN_FRONT_IMAGE=openim/openim-admin-front:release-v1.7
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										13
									
								
								config/discovery.yml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										13
									
								
								config/discovery.yml
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,13 @@
 | 
			
		||||
enable: "etcd"
 | 
			
		||||
etcd:
 | 
			
		||||
  rootDirectory: openim
 | 
			
		||||
  address: [ localhost:12379 ]
 | 
			
		||||
  username: ''
 | 
			
		||||
  password: ''
 | 
			
		||||
 | 
			
		||||
zookeeper:
 | 
			
		||||
  schema: openim
 | 
			
		||||
  address: [ localhost:12181 ]
 | 
			
		||||
  username: ''
 | 
			
		||||
  password: ''
 | 
			
		||||
 | 
			
		||||
@ -1,5 +1,4 @@
 | 
			
		||||
secret: openIM123
 | 
			
		||||
env: zookeeper
 | 
			
		||||
rpcRegisterName:
 | 
			
		||||
  user: user
 | 
			
		||||
  friend: friend
 | 
			
		||||
 | 
			
		||||
@ -1,6 +0,0 @@
 | 
			
		||||
 | 
			
		||||
schema: openim
 | 
			
		||||
address: [ localhost:12181 ]
 | 
			
		||||
username: ''
 | 
			
		||||
password: ''
 | 
			
		||||
 | 
			
		||||
@ -58,6 +58,26 @@ services:
 | 
			
		||||
    networks:
 | 
			
		||||
      - openim
 | 
			
		||||
 | 
			
		||||
  etcd:
 | 
			
		||||
    image: "${ETCD_IMAGE}"
 | 
			
		||||
    container_name: etcd
 | 
			
		||||
    ports:
 | 
			
		||||
      - "12379:2379"
 | 
			
		||||
      - "12380:2380"
 | 
			
		||||
    environment:
 | 
			
		||||
      - ETCD_NAME=s1
 | 
			
		||||
      - ETCD_DATA_DIR=/etcd-data
 | 
			
		||||
      - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
 | 
			
		||||
      - ETCD_ADVERTISE_CLIENT_URLS=http://0.0.0.0:2379
 | 
			
		||||
      - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380
 | 
			
		||||
      - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://0.0.0.0:2380
 | 
			
		||||
      - ETCD_INITIAL_CLUSTER=s1=http://0.0.0.0:2380
 | 
			
		||||
      - ETCD_INITIAL_CLUSTER_TOKEN=tkn
 | 
			
		||||
      - ETCD_INITIAL_CLUSTER_STATE=new
 | 
			
		||||
    restart: always
 | 
			
		||||
    networks:
 | 
			
		||||
      - openim
 | 
			
		||||
 | 
			
		||||
  kafka:
 | 
			
		||||
    image: "${KAFKA_IMAGE}"
 | 
			
		||||
    container_name: kafka
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										5
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										5
									
								
								go.mod
									
									
									
									
									
								
							@ -58,6 +58,8 @@ require (
 | 
			
		||||
	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 | 
			
		||||
	github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
 | 
			
		||||
	github.com/clbanning/mxj v1.8.4 // indirect
 | 
			
		||||
	github.com/coreos/go-semver v0.3.0 // indirect
 | 
			
		||||
	github.com/coreos/go-systemd/v22 v22.3.2 // indirect
 | 
			
		||||
	github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
 | 
			
		||||
	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
 | 
			
		||||
	github.com/dustin/go-humanize v1.0.1 // indirect
 | 
			
		||||
@ -137,6 +139,9 @@ require (
 | 
			
		||||
	github.com/xdg-go/stringprep v1.0.4 // indirect
 | 
			
		||||
	github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
 | 
			
		||||
	github.com/yusufpapurcu/wmi v1.2.4 // indirect
 | 
			
		||||
	go.etcd.io/etcd/api/v3 v3.5.13 // indirect
 | 
			
		||||
	go.etcd.io/etcd/client/pkg/v3 v3.5.13 // indirect
 | 
			
		||||
	go.etcd.io/etcd/client/v3 v3.5.13 // indirect
 | 
			
		||||
	go.opencensus.io v0.24.0 // indirect
 | 
			
		||||
	go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect
 | 
			
		||||
	go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										15
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										15
									
								
								go.sum
									
									
									
									
									
								
							@ -47,6 +47,10 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
 | 
			
		||||
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
 | 
			
		||||
github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ=
 | 
			
		||||
github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM=
 | 
			
		||||
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
 | 
			
		||||
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
 | 
			
		||||
github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI=
 | 
			
		||||
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
 | 
			
		||||
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
 | 
			
		||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 | 
			
		||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 | 
			
		||||
@ -111,6 +115,7 @@ github.com/go-zookeeper/zk v1.0.3 h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg
 | 
			
		||||
github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
 | 
			
		||||
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
 | 
			
		||||
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
 | 
			
		||||
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
 | 
			
		||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
 | 
			
		||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
 | 
			
		||||
github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg=
 | 
			
		||||
@ -283,8 +288,8 @@ github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJ
 | 
			
		||||
github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
 | 
			
		||||
github.com/openimsdk/protocol v0.0.65 h1:SPT9qyUsFRTTKSKb/FjpS+xr6sxz/Kbnu+su1bxYagc=
 | 
			
		||||
github.com/openimsdk/protocol v0.0.65/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
 | 
			
		||||
github.com/openimsdk/tools v0.0.49-alpha.19 h1:CbASL0yefRSVAmWPVeRnhF7wZKd6umLfz31CIhEgrBs=
 | 
			
		||||
github.com/openimsdk/tools v0.0.49-alpha.19/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM=
 | 
			
		||||
github.com/openimsdk/tools v0.0.49-alpha.18 h1:ARQeCiRmExvtB6XYItegThuV63JGOTxddwhSLHYXd78=
 | 
			
		||||
github.com/openimsdk/tools v0.0.49-alpha.18/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM=
 | 
			
		||||
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
 | 
			
		||||
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
 | 
			
		||||
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
 | 
			
		||||
@ -378,6 +383,12 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
 | 
			
		||||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
 | 
			
		||||
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
 | 
			
		||||
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
 | 
			
		||||
go.etcd.io/etcd/api/v3 v3.5.13 h1:8WXU2/NBge6AUF1K1gOexB6e07NgsN1hXK0rSTtgSp4=
 | 
			
		||||
go.etcd.io/etcd/api/v3 v3.5.13/go.mod h1:gBqlqkcMMZMVTMm4NDZloEVJzxQOQIls8splbqBDa0c=
 | 
			
		||||
go.etcd.io/etcd/client/pkg/v3 v3.5.13 h1:RVZSAnWWWiI5IrYAXjQorajncORbS0zI48LQlE2kQWg=
 | 
			
		||||
go.etcd.io/etcd/client/pkg/v3 v3.5.13/go.mod h1:XxHT4u1qU12E2+po+UVPrEeL94Um6zL58ppuJWXSAB8=
 | 
			
		||||
go.etcd.io/etcd/client/v3 v3.5.13 h1:o0fHTNJLeO0MyVbc7I3fsCf6nrOqn5d+diSarKnB2js=
 | 
			
		||||
go.etcd.io/etcd/client/v3 v3.5.13/go.mod h1:cqiAeY8b5DEEcpxvgWKsbLIWNM/8Wy2xJSDMtioMcoI=
 | 
			
		||||
go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd80=
 | 
			
		||||
go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c=
 | 
			
		||||
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
 | 
			
		||||
 | 
			
		||||
@ -38,20 +38,17 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type Config struct {
 | 
			
		||||
	RpcConfig          config.API
 | 
			
		||||
	MongodbConfig      config.Mongo
 | 
			
		||||
	ZookeeperConfig    config.ZooKeeper
 | 
			
		||||
	NotificationConfig config.Notification
 | 
			
		||||
	Share              config.Share
 | 
			
		||||
	MinioConfig        config.Minio
 | 
			
		||||
	API       config.API
 | 
			
		||||
	Share     config.Share
 | 
			
		||||
	Discovery config.Discovery
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Start(ctx context.Context, index int, config *Config) error {
 | 
			
		||||
	apiPort, err := datautil.GetElemByIndex(config.RpcConfig.Api.Ports, index)
 | 
			
		||||
	apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, index)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	prometheusPort, err := datautil.GetElemByIndex(config.RpcConfig.Prometheus.Ports, index)
 | 
			
		||||
	prometheusPort, err := datautil.GetElemByIndex(config.API.Prometheus.Ports, index)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
@ -59,7 +56,7 @@ func Start(ctx context.Context, index int, config *Config) error {
 | 
			
		||||
	var client discovery.SvcDiscoveryRegistry
 | 
			
		||||
 | 
			
		||||
	// Determine whether zk is passed according to whether it is a clustered deployment
 | 
			
		||||
	client, err = kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share)
 | 
			
		||||
	client, err = kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return errs.WrapMsg(err, "failed to register discovery service")
 | 
			
		||||
	}
 | 
			
		||||
@ -70,7 +67,7 @@ func Start(ctx context.Context, index int, config *Config) error {
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	router := newGinRouter(client, config)
 | 
			
		||||
	if config.RpcConfig.Prometheus.Enable {
 | 
			
		||||
	if config.API.Prometheus.Enable {
 | 
			
		||||
		go func() {
 | 
			
		||||
			p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api"))
 | 
			
		||||
			p.SetListenAddress(fmt.Sprintf(":%d", prometheusPort))
 | 
			
		||||
@ -81,7 +78,7 @@ func Start(ctx context.Context, index int, config *Config) error {
 | 
			
		||||
		}()
 | 
			
		||||
 | 
			
		||||
	}
 | 
			
		||||
	address := net.JoinHostPort(network.GetListenIP(config.RpcConfig.Api.ListenIP), strconv.Itoa(apiPort))
 | 
			
		||||
	address := net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort))
 | 
			
		||||
 | 
			
		||||
	server := http.Server{Addr: address, Handler: router}
 | 
			
		||||
	log.CInfo(ctx, "API server is initializing", "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort)
 | 
			
		||||
 | 
			
		||||
@ -34,7 +34,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
 | 
			
		||||
	messageRpc := rpcclient.NewMessage(disCov, config.Share.RpcRegisterName.Msg)
 | 
			
		||||
	conversationRpc := rpcclient.NewConversation(disCov, config.Share.RpcRegisterName.Conversation)
 | 
			
		||||
	authRpc := rpcclient.NewAuth(disCov, config.Share.RpcRegisterName.Auth)
 | 
			
		||||
	thirdRpc := rpcclient.NewThird(disCov, config.Share.RpcRegisterName.Third, config.RpcConfig.Prometheus.GrafanaURL)
 | 
			
		||||
	thirdRpc := rpcclient.NewThird(disCov, config.Share.RpcRegisterName.Third, config.API.Prometheus.GrafanaURL)
 | 
			
		||||
 | 
			
		||||
	u := NewUserApi(*userRpc)
 | 
			
		||||
	m := NewMessageApi(messageRpc, userRpc, config.Share.IMAdminUserID)
 | 
			
		||||
 | 
			
		||||
@ -286,6 +286,7 @@ func (c *Client) KickOnlineMessage() error {
 | 
			
		||||
	resp := Resp{
 | 
			
		||||
		ReqIdentifier: WSKickOnlineMsg,
 | 
			
		||||
	}
 | 
			
		||||
	log.ZDebug(c.ctx, "KickOnlineMessage debug ")
 | 
			
		||||
	err := c.writeBinaryMsg(resp)
 | 
			
		||||
	c.close()
 | 
			
		||||
	return err
 | 
			
		||||
 | 
			
		||||
@ -35,7 +35,7 @@ func (s *Server) InitServer(ctx context.Context, config *Config, disCov discover
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Server) Start(ctx context.Context, index int, conf *Config) error {
 | 
			
		||||
	return startrpc.Start(ctx, &conf.ZookeeperConfig, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP,
 | 
			
		||||
	return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP,
 | 
			
		||||
		conf.MsgGateway.RPC.RegisterIP,
 | 
			
		||||
		conf.MsgGateway.RPC.Ports, index,
 | 
			
		||||
		conf.Share.RpcRegisterName.MessageGateway,
 | 
			
		||||
 | 
			
		||||
@ -24,10 +24,10 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type Config struct {
 | 
			
		||||
	MsgGateway      config.MsgGateway
 | 
			
		||||
	ZookeeperConfig config.ZooKeeper
 | 
			
		||||
	Share           config.Share
 | 
			
		||||
	WebhooksConfig  config.Webhooks
 | 
			
		||||
	MsgGateway     config.MsgGateway
 | 
			
		||||
	Share          config.Share
 | 
			
		||||
	WebhooksConfig config.Webhooks
 | 
			
		||||
	Discovery      config.Discovery
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Start run ws server.
 | 
			
		||||
 | 
			
		||||
@ -211,7 +211,8 @@ func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *C
 | 
			
		||||
 | 
			
		||||
	// Online push user online message to other node
 | 
			
		||||
	for _, v := range conns {
 | 
			
		||||
		v := v // safe closure var
 | 
			
		||||
		v := v
 | 
			
		||||
		log.ZDebug(ctx, " sendUserOnlineInfoToOtherNode conn ", "target", v.Target())
 | 
			
		||||
		if v.Target() == ws.disCov.GetSelfConnTarget() {
 | 
			
		||||
			log.ZDebug(ctx, "Filter out this node", "node", v.Target())
 | 
			
		||||
			continue
 | 
			
		||||
@ -267,7 +268,9 @@ func (ws *WsServer) registerClient(client *Client) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	wg := sync.WaitGroup{}
 | 
			
		||||
	if ws.msgGatewayConfig.Share.Env == "zookeeper" {
 | 
			
		||||
	log.ZDebug(client.ctx, "ws.msgGatewayConfig.Discovery.Enable", ws.msgGatewayConfig.Discovery.Enable)
 | 
			
		||||
 | 
			
		||||
	if ws.msgGatewayConfig.Discovery.Enable != "k8s" {
 | 
			
		||||
		wg.Add(1)
 | 
			
		||||
		go func() {
 | 
			
		||||
			defer wg.Done()
 | 
			
		||||
 | 
			
		||||
@ -56,13 +56,13 @@ type MsgTransfer struct {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Config struct {
 | 
			
		||||
	MsgTransfer     config.MsgTransfer
 | 
			
		||||
	RedisConfig     config.Redis
 | 
			
		||||
	MongodbConfig   config.Mongo
 | 
			
		||||
	KafkaConfig     config.Kafka
 | 
			
		||||
	ZookeeperConfig config.ZooKeeper
 | 
			
		||||
	Share           config.Share
 | 
			
		||||
	WebhooksConfig  config.Webhooks
 | 
			
		||||
	MsgTransfer    config.MsgTransfer
 | 
			
		||||
	RedisConfig    config.Redis
 | 
			
		||||
	MongodbConfig  config.Mongo
 | 
			
		||||
	KafkaConfig    config.Kafka
 | 
			
		||||
	Share          config.Share
 | 
			
		||||
	WebhooksConfig config.Webhooks
 | 
			
		||||
	Discovery      config.Discovery
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Start(ctx context.Context, index int, config *Config) error {
 | 
			
		||||
@ -76,7 +76,7 @@ func Start(ctx context.Context, index int, config *Config) error {
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	client, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share)
 | 
			
		||||
	client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -12,11 +12,6 @@ import (
 | 
			
		||||
	"sync"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	KUBERNETES = "k8s"
 | 
			
		||||
	ZOOKEEPER  = "zookeeper"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type OnlinePusher interface {
 | 
			
		||||
	GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
 | 
			
		||||
		pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error)
 | 
			
		||||
@ -42,10 +37,12 @@ func (u emptyOnlinePUsher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewOnlinePusher(disCov discovery.SvcDiscoveryRegistry, config *Config) OnlinePusher {
 | 
			
		||||
	switch config.Share.Env {
 | 
			
		||||
	case KUBERNETES:
 | 
			
		||||
	switch config.Discovery.Enable {
 | 
			
		||||
	case "k8s":
 | 
			
		||||
		return NewK8sStaticConsistentHash(disCov, config)
 | 
			
		||||
	case ZOOKEEPER:
 | 
			
		||||
	case "zookeeper":
 | 
			
		||||
		return NewDefaultAllNode(disCov, config)
 | 
			
		||||
	case "etcd":
 | 
			
		||||
		return NewDefaultAllNode(disCov, config)
 | 
			
		||||
	default:
 | 
			
		||||
		return newEmptyOnlinePUsher()
 | 
			
		||||
@ -64,7 +61,12 @@ func NewDefaultAllNode(disCov discovery.SvcDiscoveryRegistry, config *Config) *D
 | 
			
		||||
func (d *DefaultAllNode) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
 | 
			
		||||
	pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
 | 
			
		||||
	conns, err := d.disCov.GetConns(ctx, d.config.Share.RpcRegisterName.MessageGateway)
 | 
			
		||||
	log.ZDebug(ctx, "get gateway conn", "conn length", len(conns))
 | 
			
		||||
	if len(conns) == 0 {
 | 
			
		||||
		log.ZWarn(ctx, "get gateway conn 0 ", nil)
 | 
			
		||||
	} else {
 | 
			
		||||
		log.ZDebug(ctx, "get gateway conn", "conn length", len(conns))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
@ -85,10 +87,12 @@ func (d *DefaultAllNode) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.M
 | 
			
		||||
	// Online push message
 | 
			
		||||
	for _, conn := range conns {
 | 
			
		||||
		conn := conn // loop var safe
 | 
			
		||||
		ctx := ctx
 | 
			
		||||
		wg.Go(func() error {
 | 
			
		||||
			msgClient := msggateway.NewMsgGatewayClient(conn)
 | 
			
		||||
			reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				log.ZError(ctx, "SuperGroupOnlineBatchPushOneMsg ", err, "req:", input.String())
 | 
			
		||||
				return nil
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -24,11 +24,11 @@ type Config struct {
 | 
			
		||||
	RedisConfig        config.Redis
 | 
			
		||||
	MongodbConfig      config.Mongo
 | 
			
		||||
	KafkaConfig        config.Kafka
 | 
			
		||||
	ZookeeperConfig    config.ZooKeeper
 | 
			
		||||
	NotificationConfig config.Notification
 | 
			
		||||
	Share              config.Share
 | 
			
		||||
	WebhooksConfig     config.Webhooks
 | 
			
		||||
	LocalCacheConfig   config.LocalCache
 | 
			
		||||
	Discovery          config.Discovery
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (p pushServer) PushMsg(ctx context.Context, req *pbpush.PushMsgReq) (*pbpush.PushMsgResp, error) {
 | 
			
		||||
 | 
			
		||||
@ -45,10 +45,10 @@ type authServer struct {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Config struct {
 | 
			
		||||
	RpcConfig       config.Auth
 | 
			
		||||
	RedisConfig     config.Redis
 | 
			
		||||
	ZookeeperConfig config.ZooKeeper
 | 
			
		||||
	Share           config.Share
 | 
			
		||||
	RpcConfig   config.Auth
 | 
			
		||||
	RedisConfig config.Redis
 | 
			
		||||
	Share       config.Share
 | 
			
		||||
	Discovery   config.Discovery
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
 | 
			
		||||
 | 
			
		||||
@ -51,10 +51,10 @@ type Config struct {
 | 
			
		||||
	RpcConfig          config.Conversation
 | 
			
		||||
	RedisConfig        config.Redis
 | 
			
		||||
	MongodbConfig      config.Mongo
 | 
			
		||||
	ZookeeperConfig    config.ZooKeeper
 | 
			
		||||
	NotificationConfig config.Notification
 | 
			
		||||
	Share              config.Share
 | 
			
		||||
	LocalCacheConfig   config.LocalCache
 | 
			
		||||
	Discovery          config.Discovery
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
 | 
			
		||||
 | 
			
		||||
@ -50,14 +50,15 @@ type friendServer struct {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Config struct {
 | 
			
		||||
	RpcConfig          config.Friend
 | 
			
		||||
	RedisConfig        config.Redis
 | 
			
		||||
	MongodbConfig      config.Mongo
 | 
			
		||||
	ZookeeperConfig    config.ZooKeeper
 | 
			
		||||
	RpcConfig     config.Friend
 | 
			
		||||
	RedisConfig   config.Redis
 | 
			
		||||
	MongodbConfig config.Mongo
 | 
			
		||||
	//ZookeeperConfig    config.ZooKeeper
 | 
			
		||||
	NotificationConfig config.Notification
 | 
			
		||||
	Share              config.Share
 | 
			
		||||
	WebhooksConfig     config.Webhooks
 | 
			
		||||
	LocalCacheConfig   config.LocalCache
 | 
			
		||||
	Discovery          config.Discovery
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
 | 
			
		||||
 | 
			
		||||
@ -68,11 +68,11 @@ type Config struct {
 | 
			
		||||
	RpcConfig          config.Group
 | 
			
		||||
	RedisConfig        config.Redis
 | 
			
		||||
	MongodbConfig      config.Mongo
 | 
			
		||||
	ZookeeperConfig    config.ZooKeeper
 | 
			
		||||
	NotificationConfig config.Notification
 | 
			
		||||
	Share              config.Share
 | 
			
		||||
	WebhooksConfig     config.Webhooks
 | 
			
		||||
	LocalCacheConfig   config.LocalCache
 | 
			
		||||
	Discovery          config.Discovery
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
 | 
			
		||||
 | 
			
		||||
@ -59,11 +59,11 @@ type (
 | 
			
		||||
		RedisConfig        config.Redis
 | 
			
		||||
		MongodbConfig      config.Mongo
 | 
			
		||||
		KafkaConfig        config.Kafka
 | 
			
		||||
		ZookeeperConfig    config.ZooKeeper
 | 
			
		||||
		NotificationConfig config.Notification
 | 
			
		||||
		Share              config.Share
 | 
			
		||||
		WebhooksConfig     config.Webhooks
 | 
			
		||||
		LocalCacheConfig   config.LocalCache
 | 
			
		||||
		Discovery          config.Discovery
 | 
			
		||||
	}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -46,11 +46,11 @@ type Config struct {
 | 
			
		||||
	RpcConfig          config.Third
 | 
			
		||||
	RedisConfig        config.Redis
 | 
			
		||||
	MongodbConfig      config.Mongo
 | 
			
		||||
	ZookeeperConfig    config.ZooKeeper
 | 
			
		||||
	NotificationConfig config.Notification
 | 
			
		||||
	Share              config.Share
 | 
			
		||||
	MinioConfig        config.Minio
 | 
			
		||||
	LocalCacheConfig   config.LocalCache
 | 
			
		||||
	Discovery          config.Discovery
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
 | 
			
		||||
 | 
			
		||||
@ -61,11 +61,11 @@ type Config struct {
 | 
			
		||||
	RedisConfig        config.Redis
 | 
			
		||||
	MongodbConfig      config.Mongo
 | 
			
		||||
	KafkaConfig        config.Kafka
 | 
			
		||||
	ZookeeperConfig    config.ZooKeeper
 | 
			
		||||
	NotificationConfig config.Notification
 | 
			
		||||
	Share              config.Share
 | 
			
		||||
	WebhooksConfig     config.Webhooks
 | 
			
		||||
	LocalCacheConfig   config.LocalCache
 | 
			
		||||
	Discovery          config.Discovery
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
 | 
			
		||||
 | 
			
		||||
@ -33,9 +33,9 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type CronTaskConfig struct {
 | 
			
		||||
	CronTask        config.CronTask
 | 
			
		||||
	ZookeeperConfig config.ZooKeeper
 | 
			
		||||
	Share           config.Share
 | 
			
		||||
	CronTask  config.CronTask
 | 
			
		||||
	Share     config.Share
 | 
			
		||||
	Discovery config.Discovery
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Start(ctx context.Context, config *CronTaskConfig) error {
 | 
			
		||||
@ -43,7 +43,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
 | 
			
		||||
	if config.CronTask.RetainChatRecords < 1 {
 | 
			
		||||
		return errs.New("msg destruct time must be greater than 1").Wrap()
 | 
			
		||||
	}
 | 
			
		||||
	client, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share)
 | 
			
		||||
	client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return errs.WrapMsg(err, "failed to register discovery service")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -33,9 +33,9 @@ func NewApiCmd() *ApiCmd {
 | 
			
		||||
	var apiConfig api.Config
 | 
			
		||||
	ret := &ApiCmd{apiConfig: &apiConfig}
 | 
			
		||||
	ret.configMap = map[string]any{
 | 
			
		||||
		OpenIMAPICfgFileName:    &apiConfig.RpcConfig,
 | 
			
		||||
		ZookeeperConfigFileName: &apiConfig.ZookeeperConfig,
 | 
			
		||||
		OpenIMAPICfgFileName:    &apiConfig.API,
 | 
			
		||||
		ShareFileName:           &apiConfig.Share,
 | 
			
		||||
		DiscoveryConfigFilename: &apiConfig.Discovery,
 | 
			
		||||
	}
 | 
			
		||||
	ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
 | 
			
		||||
	ret.ctx = context.WithValue(context.Background(), "version", config.Version)
 | 
			
		||||
 | 
			
		||||
@ -36,8 +36,8 @@ func NewAuthRpcCmd() *AuthRpcCmd {
 | 
			
		||||
	ret.configMap = map[string]any{
 | 
			
		||||
		OpenIMRPCAuthCfgFileName: &authConfig.RpcConfig,
 | 
			
		||||
		RedisConfigFileName:      &authConfig.RedisConfig,
 | 
			
		||||
		ZookeeperConfigFileName:  &authConfig.ZookeeperConfig,
 | 
			
		||||
		ShareFileName:            &authConfig.Share,
 | 
			
		||||
		DiscoveryConfigFilename:  &authConfig.Discovery,
 | 
			
		||||
	}
 | 
			
		||||
	ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
 | 
			
		||||
	ret.ctx = context.WithValue(context.Background(), "version", config.Version)
 | 
			
		||||
@ -53,7 +53,7 @@ func (a *AuthRpcCmd) Exec() error {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a *AuthRpcCmd) runE() error {
 | 
			
		||||
	return startrpc.Start(a.ctx, &a.authConfig.ZookeeperConfig, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP,
 | 
			
		||||
	return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP,
 | 
			
		||||
		a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.Ports,
 | 
			
		||||
		a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig.Share, a.authConfig, auth.Start)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -26,7 +26,6 @@ var (
 | 
			
		||||
	LocalCacheConfigFileName         string
 | 
			
		||||
	KafkaConfigFileName              string
 | 
			
		||||
	RedisConfigFileName              string
 | 
			
		||||
	ZookeeperConfigFileName          string
 | 
			
		||||
	MongodbConfigFileName            string
 | 
			
		||||
	MinioConfigFileName              string
 | 
			
		||||
	LogConfigFileName                string
 | 
			
		||||
@ -42,6 +41,7 @@ var (
 | 
			
		||||
	OpenIMRPCMsgCfgFileName          string
 | 
			
		||||
	OpenIMRPCThirdCfgFileName        string
 | 
			
		||||
	OpenIMRPCUserCfgFileName         string
 | 
			
		||||
	DiscoveryConfigFilename          string
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var ConfigEnvPrefixMap map[string]string
 | 
			
		||||
@ -54,7 +54,6 @@ func init() {
 | 
			
		||||
	LocalCacheConfigFileName = "local-cache.yml"
 | 
			
		||||
	KafkaConfigFileName = "kafka.yml"
 | 
			
		||||
	RedisConfigFileName = "redis.yml"
 | 
			
		||||
	ZookeeperConfigFileName = "zookeeper.yml"
 | 
			
		||||
	MongodbConfigFileName = "mongodb.yml"
 | 
			
		||||
	MinioConfigFileName = "minio.yml"
 | 
			
		||||
	LogConfigFileName = "log.yml"
 | 
			
		||||
@ -70,16 +69,17 @@ func init() {
 | 
			
		||||
	OpenIMRPCMsgCfgFileName = "openim-rpc-msg.yml"
 | 
			
		||||
	OpenIMRPCThirdCfgFileName = "openim-rpc-third.yml"
 | 
			
		||||
	OpenIMRPCUserCfgFileName = "openim-rpc-user.yml"
 | 
			
		||||
	DiscoveryConfigFilename = "discovery.yml"
 | 
			
		||||
 | 
			
		||||
	ConfigEnvPrefixMap = make(map[string]string)
 | 
			
		||||
	fileNames := []string{
 | 
			
		||||
		FileName, NotificationFileName, ShareFileName, WebhooksConfigFileName,
 | 
			
		||||
		KafkaConfigFileName, RedisConfigFileName, ZookeeperConfigFileName,
 | 
			
		||||
		KafkaConfigFileName, RedisConfigFileName,
 | 
			
		||||
		MongodbConfigFileName, MinioConfigFileName, LogConfigFileName,
 | 
			
		||||
		OpenIMAPICfgFileName, OpenIMCronTaskCfgFileName, OpenIMMsgGatewayCfgFileName,
 | 
			
		||||
		OpenIMMsgTransferCfgFileName, OpenIMPushCfgFileName, OpenIMRPCAuthCfgFileName,
 | 
			
		||||
		OpenIMRPCConversationCfgFileName, OpenIMRPCFriendCfgFileName, OpenIMRPCGroupCfgFileName,
 | 
			
		||||
		OpenIMRPCMsgCfgFileName, OpenIMRPCThirdCfgFileName, OpenIMRPCUserCfgFileName,
 | 
			
		||||
		OpenIMRPCMsgCfgFileName, OpenIMRPCThirdCfgFileName, OpenIMRPCUserCfgFileName, DiscoveryConfigFilename,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, fileName := range fileNames {
 | 
			
		||||
 | 
			
		||||
@ -36,11 +36,11 @@ func NewConversationRpcCmd() *ConversationRpcCmd {
 | 
			
		||||
	ret.configMap = map[string]any{
 | 
			
		||||
		OpenIMRPCConversationCfgFileName: &conversationConfig.RpcConfig,
 | 
			
		||||
		RedisConfigFileName:              &conversationConfig.RedisConfig,
 | 
			
		||||
		ZookeeperConfigFileName:          &conversationConfig.ZookeeperConfig,
 | 
			
		||||
		MongodbConfigFileName:            &conversationConfig.MongodbConfig,
 | 
			
		||||
		ShareFileName:                    &conversationConfig.Share,
 | 
			
		||||
		NotificationFileName:             &conversationConfig.NotificationConfig,
 | 
			
		||||
		LocalCacheConfigFileName:         &conversationConfig.LocalCacheConfig,
 | 
			
		||||
		DiscoveryConfigFilename:          &conversationConfig.Discovery,
 | 
			
		||||
	}
 | 
			
		||||
	ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
 | 
			
		||||
	ret.ctx = context.WithValue(context.Background(), "version", config.Version)
 | 
			
		||||
@ -55,7 +55,7 @@ func (a *ConversationRpcCmd) Exec() error {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a *ConversationRpcCmd) runE() error {
 | 
			
		||||
	return startrpc.Start(a.ctx, &a.conversationConfig.ZookeeperConfig, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP,
 | 
			
		||||
	return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP,
 | 
			
		||||
		a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.Ports,
 | 
			
		||||
		a.Index(), a.conversationConfig.Share.RpcRegisterName.Conversation, &a.conversationConfig.Share, a.conversationConfig, conversation.Start)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -34,8 +34,8 @@ func NewCronTaskCmd() *CronTaskCmd {
 | 
			
		||||
	ret := &CronTaskCmd{cronTaskConfig: &cronTaskConfig}
 | 
			
		||||
	ret.configMap = map[string]any{
 | 
			
		||||
		OpenIMCronTaskCfgFileName: &cronTaskConfig.CronTask,
 | 
			
		||||
		ZookeeperConfigFileName:   &cronTaskConfig.ZookeeperConfig,
 | 
			
		||||
		ShareFileName:             &cronTaskConfig.Share,
 | 
			
		||||
		DiscoveryConfigFilename:   &cronTaskConfig.Discovery,
 | 
			
		||||
	}
 | 
			
		||||
	ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
 | 
			
		||||
	ret.ctx = context.WithValue(context.Background(), "version", config.Version)
 | 
			
		||||
 | 
			
		||||
@ -36,12 +36,12 @@ func NewFriendRpcCmd() *FriendRpcCmd {
 | 
			
		||||
	ret.configMap = map[string]any{
 | 
			
		||||
		OpenIMRPCFriendCfgFileName: &friendConfig.RpcConfig,
 | 
			
		||||
		RedisConfigFileName:        &friendConfig.RedisConfig,
 | 
			
		||||
		ZookeeperConfigFileName:    &friendConfig.ZookeeperConfig,
 | 
			
		||||
		MongodbConfigFileName:      &friendConfig.MongodbConfig,
 | 
			
		||||
		ShareFileName:              &friendConfig.Share,
 | 
			
		||||
		NotificationFileName:       &friendConfig.NotificationConfig,
 | 
			
		||||
		WebhooksConfigFileName:     &friendConfig.WebhooksConfig,
 | 
			
		||||
		LocalCacheConfigFileName:   &friendConfig.LocalCacheConfig,
 | 
			
		||||
		DiscoveryConfigFilename:    &friendConfig.Discovery,
 | 
			
		||||
	}
 | 
			
		||||
	ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
 | 
			
		||||
	ret.ctx = context.WithValue(context.Background(), "version", config.Version)
 | 
			
		||||
@ -56,7 +56,7 @@ func (a *FriendRpcCmd) Exec() error {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a *FriendRpcCmd) runE() error {
 | 
			
		||||
	return startrpc.Start(a.ctx, &a.friendConfig.ZookeeperConfig, &a.friendConfig.RpcConfig.Prometheus, a.friendConfig.RpcConfig.RPC.ListenIP,
 | 
			
		||||
	return startrpc.Start(a.ctx, &a.friendConfig.Discovery, &a.friendConfig.RpcConfig.Prometheus, a.friendConfig.RpcConfig.RPC.ListenIP,
 | 
			
		||||
		a.friendConfig.RpcConfig.RPC.RegisterIP, a.friendConfig.RpcConfig.RPC.Ports,
 | 
			
		||||
		a.Index(), a.friendConfig.Share.RpcRegisterName.Friend, &a.friendConfig.Share, a.friendConfig, friend.Start)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -36,12 +36,12 @@ func NewGroupRpcCmd() *GroupRpcCmd {
 | 
			
		||||
	ret.configMap = map[string]any{
 | 
			
		||||
		OpenIMRPCGroupCfgFileName: &groupConfig.RpcConfig,
 | 
			
		||||
		RedisConfigFileName:       &groupConfig.RedisConfig,
 | 
			
		||||
		ZookeeperConfigFileName:   &groupConfig.ZookeeperConfig,
 | 
			
		||||
		MongodbConfigFileName:     &groupConfig.MongodbConfig,
 | 
			
		||||
		ShareFileName:             &groupConfig.Share,
 | 
			
		||||
		NotificationFileName:      &groupConfig.NotificationConfig,
 | 
			
		||||
		WebhooksConfigFileName:    &groupConfig.WebhooksConfig,
 | 
			
		||||
		LocalCacheConfigFileName:  &groupConfig.LocalCacheConfig,
 | 
			
		||||
		DiscoveryConfigFilename:   &groupConfig.Discovery,
 | 
			
		||||
	}
 | 
			
		||||
	ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
 | 
			
		||||
	ret.ctx = context.WithValue(context.Background(), "version", config.Version)
 | 
			
		||||
@ -56,7 +56,7 @@ func (a *GroupRpcCmd) Exec() error {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a *GroupRpcCmd) runE() error {
 | 
			
		||||
	return startrpc.Start(a.ctx, &a.groupConfig.ZookeeperConfig, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP,
 | 
			
		||||
	return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP,
 | 
			
		||||
		a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.Ports,
 | 
			
		||||
		a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, group.Start)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -36,13 +36,13 @@ func NewMsgRpcCmd() *MsgRpcCmd {
 | 
			
		||||
	ret.configMap = map[string]any{
 | 
			
		||||
		OpenIMRPCMsgCfgFileName:  &msgConfig.RpcConfig,
 | 
			
		||||
		RedisConfigFileName:      &msgConfig.RedisConfig,
 | 
			
		||||
		ZookeeperConfigFileName:  &msgConfig.ZookeeperConfig,
 | 
			
		||||
		MongodbConfigFileName:    &msgConfig.MongodbConfig,
 | 
			
		||||
		KafkaConfigFileName:      &msgConfig.KafkaConfig,
 | 
			
		||||
		ShareFileName:            &msgConfig.Share,
 | 
			
		||||
		NotificationFileName:     &msgConfig.NotificationConfig,
 | 
			
		||||
		WebhooksConfigFileName:   &msgConfig.WebhooksConfig,
 | 
			
		||||
		LocalCacheConfigFileName: &msgConfig.LocalCacheConfig,
 | 
			
		||||
		DiscoveryConfigFilename:  &msgConfig.Discovery,
 | 
			
		||||
	}
 | 
			
		||||
	ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
 | 
			
		||||
	ret.ctx = context.WithValue(context.Background(), "version", config.Version)
 | 
			
		||||
@ -57,7 +57,7 @@ func (a *MsgRpcCmd) Exec() error {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a *MsgRpcCmd) runE() error {
 | 
			
		||||
	return startrpc.Start(a.ctx, &a.msgConfig.ZookeeperConfig, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP,
 | 
			
		||||
	return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP,
 | 
			
		||||
		a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.Ports,
 | 
			
		||||
		a.Index(), a.msgConfig.Share.RpcRegisterName.Msg, &a.msgConfig.Share, a.msgConfig, msg.Start)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -36,9 +36,9 @@ func NewMsgGatewayCmd() *MsgGatewayCmd {
 | 
			
		||||
	ret := &MsgGatewayCmd{msgGatewayConfig: &msgGatewayConfig}
 | 
			
		||||
	ret.configMap = map[string]any{
 | 
			
		||||
		OpenIMMsgGatewayCfgFileName: &msgGatewayConfig.MsgGateway,
 | 
			
		||||
		ZookeeperConfigFileName:     &msgGatewayConfig.ZookeeperConfig,
 | 
			
		||||
		ShareFileName:               &msgGatewayConfig.Share,
 | 
			
		||||
		WebhooksConfigFileName:      &msgGatewayConfig.WebhooksConfig,
 | 
			
		||||
		DiscoveryConfigFilename:     &msgGatewayConfig.Discovery,
 | 
			
		||||
	}
 | 
			
		||||
	ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
 | 
			
		||||
	ret.ctx = context.WithValue(context.Background(), "version", config.Version)
 | 
			
		||||
 | 
			
		||||
@ -37,9 +37,9 @@ func NewMsgTransferCmd() *MsgTransferCmd {
 | 
			
		||||
		RedisConfigFileName:          &msgTransferConfig.RedisConfig,
 | 
			
		||||
		MongodbConfigFileName:        &msgTransferConfig.MongodbConfig,
 | 
			
		||||
		KafkaConfigFileName:          &msgTransferConfig.KafkaConfig,
 | 
			
		||||
		ZookeeperConfigFileName:      &msgTransferConfig.ZookeeperConfig,
 | 
			
		||||
		ShareFileName:                &msgTransferConfig.Share,
 | 
			
		||||
		WebhooksConfigFileName:       &msgTransferConfig.WebhooksConfig,
 | 
			
		||||
		DiscoveryConfigFilename:      &msgTransferConfig.Discovery,
 | 
			
		||||
	}
 | 
			
		||||
	ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
 | 
			
		||||
	ret.ctx = context.WithValue(context.Background(), "version", config.Version)
 | 
			
		||||
 | 
			
		||||
@ -36,13 +36,13 @@ func NewPushRpcCmd() *PushRpcCmd {
 | 
			
		||||
	ret.configMap = map[string]any{
 | 
			
		||||
		OpenIMPushCfgFileName:    &pushConfig.RpcConfig,
 | 
			
		||||
		RedisConfigFileName:      &pushConfig.RedisConfig,
 | 
			
		||||
		ZookeeperConfigFileName:  &pushConfig.ZookeeperConfig,
 | 
			
		||||
		MongodbConfigFileName:    &pushConfig.MongodbConfig,
 | 
			
		||||
		KafkaConfigFileName:      &pushConfig.KafkaConfig,
 | 
			
		||||
		ShareFileName:            &pushConfig.Share,
 | 
			
		||||
		NotificationFileName:     &pushConfig.NotificationConfig,
 | 
			
		||||
		WebhooksConfigFileName:   &pushConfig.WebhooksConfig,
 | 
			
		||||
		LocalCacheConfigFileName: &pushConfig.LocalCacheConfig,
 | 
			
		||||
		DiscoveryConfigFilename:  &pushConfig.Discovery,
 | 
			
		||||
	}
 | 
			
		||||
	ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
 | 
			
		||||
	ret.ctx = context.WithValue(context.Background(), "version", config.Version)
 | 
			
		||||
@ -57,7 +57,7 @@ func (a *PushRpcCmd) Exec() error {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a *PushRpcCmd) runE() error {
 | 
			
		||||
	return startrpc.Start(a.ctx, &a.pushConfig.ZookeeperConfig, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP,
 | 
			
		||||
	return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP,
 | 
			
		||||
		a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.Ports,
 | 
			
		||||
		a.Index(), a.pushConfig.Share.RpcRegisterName.Push, &a.pushConfig.Share, a.pushConfig, push.Start)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -36,12 +36,12 @@ func NewThirdRpcCmd() *ThirdRpcCmd {
 | 
			
		||||
	ret.configMap = map[string]any{
 | 
			
		||||
		OpenIMRPCThirdCfgFileName: &thirdConfig.RpcConfig,
 | 
			
		||||
		RedisConfigFileName:       &thirdConfig.RedisConfig,
 | 
			
		||||
		ZookeeperConfigFileName:   &thirdConfig.ZookeeperConfig,
 | 
			
		||||
		MongodbConfigFileName:     &thirdConfig.MongodbConfig,
 | 
			
		||||
		ShareFileName:             &thirdConfig.Share,
 | 
			
		||||
		NotificationFileName:      &thirdConfig.NotificationConfig,
 | 
			
		||||
		MinioConfigFileName:       &thirdConfig.MinioConfig,
 | 
			
		||||
		LocalCacheConfigFileName:  &thirdConfig.LocalCacheConfig,
 | 
			
		||||
		DiscoveryConfigFilename:   &thirdConfig.Discovery,
 | 
			
		||||
	}
 | 
			
		||||
	ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
 | 
			
		||||
	ret.ctx = context.WithValue(context.Background(), "version", config.Version)
 | 
			
		||||
@ -56,7 +56,7 @@ func (a *ThirdRpcCmd) Exec() error {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a *ThirdRpcCmd) runE() error {
 | 
			
		||||
	return startrpc.Start(a.ctx, &a.thirdConfig.ZookeeperConfig, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP,
 | 
			
		||||
	return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP,
 | 
			
		||||
		a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.Ports,
 | 
			
		||||
		a.Index(), a.thirdConfig.Share.RpcRegisterName.Third, &a.thirdConfig.Share, a.thirdConfig, third.Start)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -36,13 +36,13 @@ func NewUserRpcCmd() *UserRpcCmd {
 | 
			
		||||
	ret.configMap = map[string]any{
 | 
			
		||||
		OpenIMRPCUserCfgFileName: &userConfig.RpcConfig,
 | 
			
		||||
		RedisConfigFileName:      &userConfig.RedisConfig,
 | 
			
		||||
		ZookeeperConfigFileName:  &userConfig.ZookeeperConfig,
 | 
			
		||||
		MongodbConfigFileName:    &userConfig.MongodbConfig,
 | 
			
		||||
		KafkaConfigFileName:      &userConfig.KafkaConfig,
 | 
			
		||||
		ShareFileName:            &userConfig.Share,
 | 
			
		||||
		NotificationFileName:     &userConfig.NotificationConfig,
 | 
			
		||||
		WebhooksConfigFileName:   &userConfig.WebhooksConfig,
 | 
			
		||||
		LocalCacheConfigFileName: &userConfig.LocalCacheConfig,
 | 
			
		||||
		DiscoveryConfigFilename:  &userConfig.Discovery,
 | 
			
		||||
	}
 | 
			
		||||
	ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
 | 
			
		||||
	ret.ctx = context.WithValue(context.Background(), "version", config.Version)
 | 
			
		||||
@ -57,7 +57,7 @@ func (a *UserRpcCmd) Exec() error {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a *UserRpcCmd) runE() error {
 | 
			
		||||
	return startrpc.Start(a.ctx, &a.userConfig.ZookeeperConfig, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP,
 | 
			
		||||
	return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP,
 | 
			
		||||
		a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.Ports,
 | 
			
		||||
		a.Index(), a.userConfig.Share.RpcRegisterName.User, &a.userConfig.Share, a.userConfig, user.Start)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -345,7 +345,6 @@ type AfterConfig struct {
 | 
			
		||||
 | 
			
		||||
type Share struct {
 | 
			
		||||
	Secret          string          `mapstructure:"secret"`
 | 
			
		||||
	Env             string          `mapstructure:"env"`
 | 
			
		||||
	RpcRegisterName RpcRegisterName `mapstructure:"rpcRegisterName"`
 | 
			
		||||
	IMAdminUserID   []string        `mapstructure:"imAdminUserID"`
 | 
			
		||||
}
 | 
			
		||||
@ -432,6 +431,19 @@ type ZooKeeper struct {
 | 
			
		||||
	Password string   `mapstructure:"password"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Discovery struct {
 | 
			
		||||
	Enable    string    `mapstructure:"enable"`
 | 
			
		||||
	Etcd      Etcd      `mapstructure:"etcd"`
 | 
			
		||||
	ZooKeeper ZooKeeper `mapstructure:"zooKeeper"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Etcd struct {
 | 
			
		||||
	RootDirectory string   `mapstructure:"rootDirectory"`
 | 
			
		||||
	Address       []string `mapstructure:"address"`
 | 
			
		||||
	Username      string   `mapstructure:"username"`
 | 
			
		||||
	Password      string   `mapstructure:"password"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *Mongo) Build() *mongoutil.Config {
 | 
			
		||||
	return &mongoutil.Config{
 | 
			
		||||
		Uri:         m.URI,
 | 
			
		||||
 | 
			
		||||
@ -18,36 +18,34 @@ import (
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes"
 | 
			
		||||
	"github.com/openimsdk/tools/discovery"
 | 
			
		||||
	"github.com/openimsdk/tools/discovery/etcd"
 | 
			
		||||
	"github.com/openimsdk/tools/discovery/zookeeper"
 | 
			
		||||
	"github.com/openimsdk/tools/errs"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	zookeeperConst = "zookeeper"
 | 
			
		||||
	kubenetesConst = "k8s"
 | 
			
		||||
	directConst    = "direct"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
 | 
			
		||||
func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper, share *config.Share) (discovery.SvcDiscoveryRegistry, error) {
 | 
			
		||||
	switch share.Env {
 | 
			
		||||
	case zookeeperConst:
 | 
			
		||||
 | 
			
		||||
func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share) (discovery.SvcDiscoveryRegistry, error) {
 | 
			
		||||
	switch discovery.Enable {
 | 
			
		||||
	case "zookeeper":
 | 
			
		||||
		return zookeeper.NewZkClient(
 | 
			
		||||
			zookeeperConfig.Address,
 | 
			
		||||
			zookeeperConfig.Schema,
 | 
			
		||||
			discovery.ZooKeeper.Address,
 | 
			
		||||
			discovery.ZooKeeper.Schema,
 | 
			
		||||
			zookeeper.WithFreq(time.Hour),
 | 
			
		||||
			zookeeper.WithUserNameAndPassword(zookeeperConfig.Username, zookeeperConfig.Password),
 | 
			
		||||
			zookeeper.WithUserNameAndPassword(discovery.ZooKeeper.Username, discovery.ZooKeeper.Password),
 | 
			
		||||
			zookeeper.WithRoundRobin(),
 | 
			
		||||
			zookeeper.WithTimeout(10),
 | 
			
		||||
		)
 | 
			
		||||
	case kubenetesConst:
 | 
			
		||||
	case "k8s":
 | 
			
		||||
		return kubernetes.NewK8sDiscoveryRegister(share.RpcRegisterName.MessageGateway)
 | 
			
		||||
	case directConst:
 | 
			
		||||
		//return direct.NewConnDirect(config)
 | 
			
		||||
	case "etcd":
 | 
			
		||||
		return etcd.NewSvcDiscoveryRegistry(
 | 
			
		||||
			discovery.Etcd.RootDirectory,
 | 
			
		||||
			discovery.Etcd.Address,
 | 
			
		||||
			etcd.WithDialTimeout(10*time.Second),
 | 
			
		||||
			etcd.WithMaxCallSendMsgSize(20*1024*1024),
 | 
			
		||||
			etcd.WithUsernameAndPassword(discovery.Etcd.Username, discovery.Etcd.Password))
 | 
			
		||||
	default:
 | 
			
		||||
		return nil, errs.New("unsupported discovery type", "type", share.Env).Wrap()
 | 
			
		||||
		return nil, errs.New("unsupported discovery type", "type", discovery.Enable).Wrap()
 | 
			
		||||
	}
 | 
			
		||||
	return nil, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										15
									
								
								pkg/common/discoveryregister/etcd/doc.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										15
									
								
								pkg/common/discoveryregister/etcd/doc.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,15 @@
 | 
			
		||||
// Copyright © 2024 OpenIM. All rights reserved.
 | 
			
		||||
//
 | 
			
		||||
// Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
// you may not use this file except in compliance with the License.
 | 
			
		||||
// You may obtain a copy of the License at
 | 
			
		||||
//
 | 
			
		||||
//     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
//
 | 
			
		||||
// Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
// See the License for the specific language governing permissions and
 | 
			
		||||
// limitations under the License.
 | 
			
		||||
 | 
			
		||||
package kubernetes // import "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/etcd"
 | 
			
		||||
@ -1,44 +0,0 @@
 | 
			
		||||
// Copyright © 2023 OpenIM. All rights reserved.
 | 
			
		||||
//
 | 
			
		||||
// Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
// you may not use this file except in compliance with the License.
 | 
			
		||||
// You may obtain a copy of the License at
 | 
			
		||||
//
 | 
			
		||||
//     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
//
 | 
			
		||||
// Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
// See the License for the specific language governing permissions and
 | 
			
		||||
// limitations under the License.
 | 
			
		||||
 | 
			
		||||
package zookeeper
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"os"
 | 
			
		||||
	"strings"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// getEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value.
 | 
			
		||||
func getEnv(key, fallback string) string {
 | 
			
		||||
	if value, exists := os.LookupEnv(key); exists {
 | 
			
		||||
		return value
 | 
			
		||||
	}
 | 
			
		||||
	return fallback
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getZkAddrFromEnv returns the Zookeeper addresses combined from the ZOOKEEPER_ADDRESS and ZOOKEEPER_PORT environment variables.
 | 
			
		||||
// If the environment variables are not set, it returns the fallback value.
 | 
			
		||||
func getZkAddrFromEnv(fallback []string) []string {
 | 
			
		||||
	address, addrExists := os.LookupEnv("ZOOKEEPER_ADDRESS")
 | 
			
		||||
	port, portExists := os.LookupEnv("ZOOKEEPER_PORT")
 | 
			
		||||
 | 
			
		||||
	if addrExists && portExists {
 | 
			
		||||
		addresses := strings.Split(address, ",")
 | 
			
		||||
		for i, addr := range addresses {
 | 
			
		||||
			addresses[i] = addr + ":" + port
 | 
			
		||||
		}
 | 
			
		||||
		return addresses
 | 
			
		||||
	}
 | 
			
		||||
	return fallback
 | 
			
		||||
}
 | 
			
		||||
@ -44,7 +44,7 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Start rpc server.
 | 
			
		||||
func Start[T any](ctx context.Context, zookeeperConfig *config2.ZooKeeper, prometheusConfig *config2.Prometheus, listenIP,
 | 
			
		||||
func Start[T any](ctx context.Context, discovery *config2.Discovery, prometheusConfig *config2.Prometheus, listenIP,
 | 
			
		||||
	registerIP string, rpcPorts []int, index int, rpcRegisterName string, share *config2.Share, config T, rpcFn func(ctx context.Context,
 | 
			
		||||
	config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error {
 | 
			
		||||
 | 
			
		||||
@ -68,7 +68,7 @@ func Start[T any](ctx context.Context, zookeeperConfig *config2.ZooKeeper, prome
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	defer listener.Close()
 | 
			
		||||
	client, err := kdisc.NewDiscoveryRegister(zookeeperConfig, share)
 | 
			
		||||
	client, err := kdisc.NewDiscoveryRegister(discovery, share)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -22,6 +22,7 @@ import (
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 | 
			
		||||
	"github.com/openimsdk/tools/db/mongoutil"
 | 
			
		||||
	"github.com/openimsdk/tools/db/redisutil"
 | 
			
		||||
	"github.com/openimsdk/tools/discovery/etcd"
 | 
			
		||||
	"github.com/openimsdk/tools/discovery/zookeeper"
 | 
			
		||||
	"github.com/openimsdk/tools/mq/kafka"
 | 
			
		||||
	"github.com/openimsdk/tools/s3/minio"
 | 
			
		||||
@ -43,6 +44,14 @@ func CheckZookeeper(ctx context.Context, config *config.ZooKeeper) error {
 | 
			
		||||
	return zookeeper.Check(ctx, config.Address, config.Schema, zookeeper.WithUserNameAndPassword(config.Username, config.Password))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func CheckEtcd(ctx context.Context, config *config.Etcd) error {
 | 
			
		||||
	return etcd.Check(ctx, config.Address, "/check_openim_component",
 | 
			
		||||
		true,
 | 
			
		||||
		etcd.WithDialTimeout(10*time.Second),
 | 
			
		||||
		etcd.WithMaxCallSendMsgSize(20*1024*1024),
 | 
			
		||||
		etcd.WithUsernameAndPassword(config.Username, config.Password))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func CheckMongo(ctx context.Context, config *config.Mongo) error {
 | 
			
		||||
	return mongoutil.Check(ctx, config.Build())
 | 
			
		||||
}
 | 
			
		||||
@ -59,14 +68,14 @@ func CheckKafka(ctx context.Context, conf *config.Kafka) error {
 | 
			
		||||
	return kafka.Check(ctx, conf.Build(), []string{conf.ToMongoTopic, conf.ToRedisTopic, conf.ToPushTopic})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func initConfig(configDir string) (*config.Mongo, *config.Redis, *config.Kafka, *config.Minio, *config.ZooKeeper, error) {
 | 
			
		||||
func initConfig(configDir string) (*config.Mongo, *config.Redis, *config.Kafka, *config.Minio, *config.Discovery, error) {
 | 
			
		||||
	var (
 | 
			
		||||
		mongoConfig     = &config.Mongo{}
 | 
			
		||||
		redisConfig     = &config.Redis{}
 | 
			
		||||
		kafkaConfig     = &config.Kafka{}
 | 
			
		||||
		minioConfig     = &config.Minio{}
 | 
			
		||||
		zookeeperConfig = &config.ZooKeeper{}
 | 
			
		||||
		thirdConfig     = &config.Third{}
 | 
			
		||||
		mongoConfig = &config.Mongo{}
 | 
			
		||||
		redisConfig = &config.Redis{}
 | 
			
		||||
		kafkaConfig = &config.Kafka{}
 | 
			
		||||
		minioConfig = &config.Minio{}
 | 
			
		||||
		discovery   = &config.Discovery{}
 | 
			
		||||
		thirdConfig = &config.Third{}
 | 
			
		||||
	)
 | 
			
		||||
	err := config.LoadConfig(filepath.Join(configDir, cmd.MongodbConfigFileName), cmd.ConfigEnvPrefixMap[cmd.MongodbConfigFileName], mongoConfig)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@ -96,11 +105,11 @@ func initConfig(configDir string) (*config.Mongo, *config.Redis, *config.Kafka,
 | 
			
		||||
	} else {
 | 
			
		||||
		minioConfig = nil
 | 
			
		||||
	}
 | 
			
		||||
	err = config.LoadConfig(filepath.Join(configDir, cmd.ZookeeperConfigFileName), cmd.ConfigEnvPrefixMap[cmd.ZookeeperConfigFileName], zookeeperConfig)
 | 
			
		||||
	err = config.LoadConfig(filepath.Join(configDir, cmd.DiscoveryConfigFilename), cmd.ConfigEnvPrefixMap[cmd.DiscoveryConfigFilename], discovery)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, nil, nil, nil, nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return mongoConfig, redisConfig, kafkaConfig, minioConfig, zookeeperConfig, nil
 | 
			
		||||
	return mongoConfig, redisConfig, kafkaConfig, minioConfig, discovery, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func main() {
 | 
			
		||||
@ -127,35 +136,40 @@ func main() {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func performChecks(ctx context.Context, mongoConfig *config.Mongo, redisConfig *config.Redis, kafkaConfig *config.Kafka, minioConfig *config.Minio, zookeeperConfig *config.ZooKeeper, maxRetry int) error {
 | 
			
		||||
func performChecks(ctx context.Context, mongoConfig *config.Mongo, redisConfig *config.Redis, kafkaConfig *config.Kafka, minioConfig *config.Minio, discovery *config.Discovery, maxRetry int) error {
 | 
			
		||||
	checksDone := make(map[string]bool)
 | 
			
		||||
 | 
			
		||||
	checks := map[string]func() error{
 | 
			
		||||
		"Zookeeper": func() error {
 | 
			
		||||
			return CheckZookeeper(ctx, zookeeperConfig)
 | 
			
		||||
		},
 | 
			
		||||
		"Mongo": func() error {
 | 
			
		||||
	checks := map[string]func(ctx context.Context) error{
 | 
			
		||||
		"Mongo": func(ctx context.Context) error {
 | 
			
		||||
			return CheckMongo(ctx, mongoConfig)
 | 
			
		||||
		},
 | 
			
		||||
		"Redis": func() error {
 | 
			
		||||
		"Redis": func(ctx context.Context) error {
 | 
			
		||||
			return CheckRedis(ctx, redisConfig)
 | 
			
		||||
		},
 | 
			
		||||
		"Kafka": func() error {
 | 
			
		||||
		"Kafka": func(ctx context.Context) error {
 | 
			
		||||
			return CheckKafka(ctx, kafkaConfig)
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if minioConfig != nil {
 | 
			
		||||
		checks["MinIO"] = func() error {
 | 
			
		||||
		checks["MinIO"] = func(ctx context.Context) error {
 | 
			
		||||
			return CheckMinIO(ctx, minioConfig)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if discovery.Enable == "etcd" {
 | 
			
		||||
		checks["Etcd"] = func(ctx context.Context) error {
 | 
			
		||||
			return CheckEtcd(ctx, &discovery.Etcd)
 | 
			
		||||
		}
 | 
			
		||||
	} else if discovery.Enable == "zookeeper" {
 | 
			
		||||
		checks["Zookeeper"] = func(ctx context.Context) error {
 | 
			
		||||
			return CheckZookeeper(ctx, &discovery.ZooKeeper)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for i := 0; i < maxRetry; i++ {
 | 
			
		||||
		allSuccess := true
 | 
			
		||||
		for name, check := range checks {
 | 
			
		||||
			if !checksDone[name] {
 | 
			
		||||
				if err := check(); err != nil {
 | 
			
		||||
				if err := check(ctx); err != nil {
 | 
			
		||||
					fmt.Printf("%s check failed: %v\n", name, err)
 | 
			
		||||
					allSuccess = false
 | 
			
		||||
				} else {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user