refactor(core): support kubernetes discovery registry (#3749)

This commit is contained in:
dsx137 2026-06-26 18:44:39 +08:00 committed by GitHub
parent 2a6935e910
commit 2b42ff8c71
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 78 additions and 37 deletions

2
go.mod
View File

@ -13,7 +13,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.73-alpha.12
github.com/openimsdk/tools v0.0.50-alpha.117
github.com/openimsdk/tools v0.0.50-alpha.119
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.11.1

2
go.sum
View File

@ -365,6 +365,8 @@ github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce
github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
github.com/openimsdk/tools v0.0.50-alpha.117 h1:ACfijEVCeBcttT7OOkNGOOOvq14pJtb9szNIMHLm6Vc=
github.com/openimsdk/tools v0.0.50-alpha.117/go.mod h1:I0WESSa7ghPIo9BL+ETlH/qEIbO6+KZioM1jwNuDwz0=
github.com/openimsdk/tools v0.0.50-alpha.119 h1:S/RjRtL0ciwiG6pZKssbU//qVSWi7AuKHOPCHsQgz68=
github.com/openimsdk/tools v0.0.50-alpha.119/go.mod h1:I0WESSa7ghPIo9BL+ETlH/qEIbO6+KZioM1jwNuDwz0=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=

View File

@ -14,6 +14,7 @@ import (
"github.com/openimsdk/tools/apiresp"
"github.com/go-playground/validator/v10"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
@ -288,25 +289,12 @@ func (ws *WsServer) registerClient(client *Client) {
}
}
wg := sync.WaitGroup{}
log.ZDebug(client.ctx, "ws.msgGatewayConfig.Discovery.Enable", "discoveryEnable", ws.msgGatewayConfig.Discovery.Enable)
if ws.msgGatewayConfig.Discovery.Enable != "k8s" {
wg.Add(1)
go func() {
defer wg.Done()
_ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client)
}()
if ws.msgGatewayConfig.Discovery.Enable != config.KUBERNETES {
_ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client)
}
//wg.Add(1)
//go func() {
// defer wg.Done()
// ws.SetUserOnlineStatus(client.ctx, client, constant.Online)
//}()
wg.Wait()
log.ZDebug(client.ctx, "user online", "online user Num", ws.onlineUserNum.Load(), "online user conn Num", ws.onlineUserConnNum.Load())
}

View File

@ -4,6 +4,7 @@ import (
"context"
"sync"
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/protocol/msggateway"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/discovery"
@ -39,11 +40,11 @@ func (u emptyOnlinePusher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *
func NewOnlinePusher(disCov discovery.SvcDiscoveryRegistry, config *Config) OnlinePusher {
switch config.Discovery.Enable {
case "k8s":
return NewK8sStaticConsistentHash(disCov, config)
case conf.KUBERNETES:
return NewDefaultAllNode(disCov, config)
case "zookeeper":
return NewDefaultAllNode(disCov, config)
case "etcd":
case conf.ETCD:
return NewDefaultAllNode(disCov, config)
default:
return newEmptyOnlinePusher()

View File

@ -23,7 +23,7 @@ func TestName(t *testing.T) {
Address: []string{"localhost:12379"},
},
}
client, err := kdisc.NewDiscoveryRegister(conf, "source")
client, err := kdisc.NewDiscoveryRegister(conf, &config.Share{}, nil)
if err != nil {
panic(err)
}

View File

@ -484,9 +484,14 @@ type ZooKeeper struct {
}
type Discovery struct {
Enable string `mapstructure:"enable"`
Etcd Etcd `mapstructure:"etcd"`
ZooKeeper ZooKeeper `mapstructure:"zooKeeper"`
Enable string `mapstructure:"enable"`
Etcd Etcd `mapstructure:"etcd"`
Kubernetes Kubernetes `mapstructure:"kubernetes"`
ZooKeeper ZooKeeper `mapstructure:"zooKeeper"`
}
type Kubernetes struct {
Namespace string `mapstructure:"namespace"`
}
type Etcd struct {

View File

@ -1,8 +1,12 @@
package config
import (
"github.com/stretchr/testify/assert"
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/assert"
)
func TestLoadLogConfig(t *testing.T) {
@ -27,6 +31,24 @@ func TestLoadWebhooksConfig(t *testing.T) {
}
func TestLoadDiscoveryKubernetesConfig(t *testing.T) {
path := filepath.Join(t.TempDir(), "discovery.yml")
err := os.WriteFile(path, []byte(`enable: kubernetes
kubernetes:
namespace: openim
etcd:
rootDirectory: openim
address: [localhost:12379]
`), 0600)
assert.Nil(t, err)
var discovery Discovery
err = LoadConfig(path, "IMENV_DISCOVERY", &discovery)
assert.Nil(t, err)
assert.Equal(t, KUBERNETES, discovery.Enable)
assert.Equal(t, "openim", discovery.Kubernetes.Namespace)
}
func TestLoadOpenIMRpcUserConfig(t *testing.T) {
var user User
err := LoadConfig("../../../config/openim-rpc-user.yml", "IMENV_OPENIM_RPC_USER", &user)

View File

@ -22,19 +22,26 @@ import (
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/discovery/kubernetes"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/runtimeenv"
"google.golang.org/grpc"
)
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share, watchNames []string) (discovery.SvcDiscoveryRegistry, error) {
switch discovery.Enable {
case "k8s":
return kubernetes.NewConnManager("default", watchNames,
if runtimeenv.RuntimeEnvironment() == config.KUBERNETES {
namespace := discovery.Kubernetes.Namespace
if namespace == "" {
namespace = "default"
}
return kubernetes.NewConnManager(namespace, watchNames,
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(1024*1024*20),
),
)
case "etcd":
}
switch discovery.Enable {
case config.ETCD:
return etcd.NewSvcDiscoveryRegistry(
discovery.Etcd.RootDirectory,
discovery.Etcd.Address,

View File

@ -15,16 +15,12 @@
package discoveryregister
import (
"os"
)
"strings"
"testing"
func setupTestEnvironment() {
os.Setenv("ZOOKEEPER_SCHEMA", "openim")
os.Setenv("ZOOKEEPER_ADDRESS", "172.28.0.1")
os.Setenv("ZOOKEEPER_PORT", "12181")
os.Setenv("ZOOKEEPER_USERNAME", "")
os.Setenv("ZOOKEEPER_PASSWORD", "")
}
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/utils/runtimeenv"
)
//func TestNewDiscoveryRegister(t *testing.T) {
// setupTestEnvironment()
@ -58,3 +54,23 @@ func setupTestEnvironment() {
// }
// }
//}
func TestNewDiscoveryRegisterRejectsKubernetesOutsideCluster(t *testing.T) {
if runtimeenv.RuntimeEnvironment() == config.KUBERNETES {
t.Skip("outside-cluster fallback is only relevant outside Kubernetes")
}
discovery := &config.Discovery{
Enable: config.KUBERNETES,
Kubernetes: config.Kubernetes{
Namespace: "default",
},
}
client, err := NewDiscoveryRegister(discovery, &config.Share{}, nil)
if err == nil && client != nil {
client.Close()
}
if err == nil || !strings.Contains(err.Error(), "unsupported discovery type") {
t.Fatalf("%q outside Kubernetes should not select Kubernetes discovery: %v", config.KUBERNETES, err)
}
}