diff --git a/go.mod b/go.mod index 2fe0631e8..4caa277da 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.73-alpha.12 - github.com/openimsdk/tools v0.0.50-alpha.117 + github.com/openimsdk/tools v0.0.50-alpha.119 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.11.1 diff --git a/go.sum b/go.sum index c6f331882..47059e30d 100644 --- a/go.sum +++ b/go.sum @@ -365,6 +365,8 @@ github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= github.com/openimsdk/tools v0.0.50-alpha.117 h1:ACfijEVCeBcttT7OOkNGOOOvq14pJtb9szNIMHLm6Vc= github.com/openimsdk/tools v0.0.50-alpha.117/go.mod h1:I0WESSa7ghPIo9BL+ETlH/qEIbO6+KZioM1jwNuDwz0= +github.com/openimsdk/tools v0.0.50-alpha.119 h1:S/RjRtL0ciwiG6pZKssbU//qVSWi7AuKHOPCHsQgz68= +github.com/openimsdk/tools v0.0.50-alpha.119/go.mod h1:I0WESSa7ghPIo9BL+ETlH/qEIbO6+KZioM1jwNuDwz0= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index 404751df3..e3ab26f2d 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -14,6 +14,7 @@ import ( "github.com/openimsdk/tools/apiresp" "github.com/go-playground/validator/v10" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" @@ -288,25 +289,12 @@ func (ws *WsServer) registerClient(client *Client) { } } - wg := sync.WaitGroup{} log.ZDebug(client.ctx, "ws.msgGatewayConfig.Discovery.Enable", "discoveryEnable", ws.msgGatewayConfig.Discovery.Enable) - if ws.msgGatewayConfig.Discovery.Enable != "k8s" { - wg.Add(1) - go func() { - defer wg.Done() - _ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client) - }() + if ws.msgGatewayConfig.Discovery.Enable != config.KUBERNETES { + _ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client) } - //wg.Add(1) - //go func() { - // defer wg.Done() - // ws.SetUserOnlineStatus(client.ctx, client, constant.Online) - //}() - - wg.Wait() - log.ZDebug(client.ctx, "user online", "online user Num", ws.onlineUserNum.Load(), "online user conn Num", ws.onlineUserConnNum.Load()) } diff --git a/internal/push/onlinepusher.go b/internal/push/onlinepusher.go index 2393c3567..6db61059a 100644 --- a/internal/push/onlinepusher.go +++ b/internal/push/onlinepusher.go @@ -4,6 +4,7 @@ import ( "context" "sync" + conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/protocol/msggateway" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/discovery" @@ -39,11 +40,11 @@ func (u emptyOnlinePusher) GetOnlinePushFailedUserIDs(ctx context.Context, msg * func NewOnlinePusher(disCov discovery.SvcDiscoveryRegistry, config *Config) OnlinePusher { switch config.Discovery.Enable { - case "k8s": - return NewK8sStaticConsistentHash(disCov, config) + case conf.KUBERNETES: + return NewDefaultAllNode(disCov, config) case "zookeeper": return NewDefaultAllNode(disCov, config) - case "etcd": + case conf.ETCD: return NewDefaultAllNode(disCov, config) default: return newEmptyOnlinePusher() diff --git a/internal/tools/cron_test.go b/internal/tools/cron_test.go index 890349069..8aa830935 100644 --- a/internal/tools/cron_test.go +++ b/internal/tools/cron_test.go @@ -23,7 +23,7 @@ func TestName(t *testing.T) { Address: []string{"localhost:12379"}, }, } - client, err := kdisc.NewDiscoveryRegister(conf, "source") + client, err := kdisc.NewDiscoveryRegister(conf, &config.Share{}, nil) if err != nil { panic(err) } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 4cd202db4..7e54bf4d4 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -484,9 +484,14 @@ type ZooKeeper struct { } type Discovery struct { - Enable string `mapstructure:"enable"` - Etcd Etcd `mapstructure:"etcd"` - ZooKeeper ZooKeeper `mapstructure:"zooKeeper"` + Enable string `mapstructure:"enable"` + Etcd Etcd `mapstructure:"etcd"` + Kubernetes Kubernetes `mapstructure:"kubernetes"` + ZooKeeper ZooKeeper `mapstructure:"zooKeeper"` +} + +type Kubernetes struct { + Namespace string `mapstructure:"namespace"` } type Etcd struct { diff --git a/pkg/common/config/load_config_test.go b/pkg/common/config/load_config_test.go index 763bffd9f..fab85c3a6 100644 --- a/pkg/common/config/load_config_test.go +++ b/pkg/common/config/load_config_test.go @@ -1,8 +1,12 @@ package config import ( - "github.com/stretchr/testify/assert" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" ) func TestLoadLogConfig(t *testing.T) { @@ -27,6 +31,24 @@ func TestLoadWebhooksConfig(t *testing.T) { } +func TestLoadDiscoveryKubernetesConfig(t *testing.T) { + path := filepath.Join(t.TempDir(), "discovery.yml") + err := os.WriteFile(path, []byte(`enable: kubernetes +kubernetes: + namespace: openim +etcd: + rootDirectory: openim + address: [localhost:12379] +`), 0600) + assert.Nil(t, err) + + var discovery Discovery + err = LoadConfig(path, "IMENV_DISCOVERY", &discovery) + assert.Nil(t, err) + assert.Equal(t, KUBERNETES, discovery.Enable) + assert.Equal(t, "openim", discovery.Kubernetes.Namespace) +} + func TestLoadOpenIMRpcUserConfig(t *testing.T) { var user User err := LoadConfig("../../../config/openim-rpc-user.yml", "IMENV_OPENIM_RPC_USER", &user) diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index 110a08aa8..ec444defb 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -22,19 +22,26 @@ import ( "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/discovery/kubernetes" "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/utils/runtimeenv" "google.golang.org/grpc" ) // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share, watchNames []string) (discovery.SvcDiscoveryRegistry, error) { - switch discovery.Enable { - case "k8s": - return kubernetes.NewConnManager("default", watchNames, + if runtimeenv.RuntimeEnvironment() == config.KUBERNETES { + namespace := discovery.Kubernetes.Namespace + if namespace == "" { + namespace = "default" + } + return kubernetes.NewConnManager(namespace, watchNames, grpc.WithDefaultCallOptions( grpc.MaxCallSendMsgSize(1024*1024*20), ), ) - case "etcd": + } + + switch discovery.Enable { + case config.ETCD: return etcd.NewSvcDiscoveryRegistry( discovery.Etcd.RootDirectory, discovery.Etcd.Address, diff --git a/pkg/common/discoveryregister/discoveryregister_test.go b/pkg/common/discoveryregister/discoveryregister_test.go index 417226645..3e90e324a 100644 --- a/pkg/common/discoveryregister/discoveryregister_test.go +++ b/pkg/common/discoveryregister/discoveryregister_test.go @@ -15,16 +15,12 @@ package discoveryregister import ( - "os" -) + "strings" + "testing" -func setupTestEnvironment() { - os.Setenv("ZOOKEEPER_SCHEMA", "openim") - os.Setenv("ZOOKEEPER_ADDRESS", "172.28.0.1") - os.Setenv("ZOOKEEPER_PORT", "12181") - os.Setenv("ZOOKEEPER_USERNAME", "") - os.Setenv("ZOOKEEPER_PASSWORD", "") -} + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/utils/runtimeenv" +) //func TestNewDiscoveryRegister(t *testing.T) { // setupTestEnvironment() @@ -58,3 +54,23 @@ func setupTestEnvironment() { // } // } //} + +func TestNewDiscoveryRegisterRejectsKubernetesOutsideCluster(t *testing.T) { + if runtimeenv.RuntimeEnvironment() == config.KUBERNETES { + t.Skip("outside-cluster fallback is only relevant outside Kubernetes") + } + discovery := &config.Discovery{ + Enable: config.KUBERNETES, + Kubernetes: config.Kubernetes{ + Namespace: "default", + }, + } + + client, err := NewDiscoveryRegister(discovery, &config.Share{}, nil) + if err == nil && client != nil { + client.Close() + } + if err == nil || !strings.Contains(err.Error(), "unsupported discovery type") { + t.Fatalf("%q outside Kubernetes should not select Kubernetes discovery: %v", config.KUBERNETES, err) + } +}