diff --git a/pkg/common/db/cache/init_redis.go b/pkg/common/db/cache/init_redis.go index 63d2f5707..1308e9649 100644 --- a/pkg/common/db/cache/init_redis.go +++ b/pkg/common/db/cache/init_redis.go @@ -87,7 +87,15 @@ func NewRedis() (redis.UniversalClient, error) { // overrideConfigFromEnv overrides configuration fields with environment variables if present. func overrideConfigFromEnv() { if envAddr := os.Getenv("REDIS_ADDRESS"); envAddr != "" { - config.Config.Redis.Address = strings.Split(envAddr, ",") // Assuming addresses are comma-separated + if envPort := os.Getenv("REDIS_PORT"); envPort != "" { + addresses := strings.Split(envAddr, ",") + for i, addr := range addresses { + addresses[i] = addr + ":" + envPort + } + config.Config.Redis.Address = addresses + } else { + config.Config.Redis.Address = strings.Split(envAddr, ",") + } } if envUser := os.Getenv("REDIS_USERNAME"); envUser != "" { config.Config.Redis.Username = envUser diff --git a/pkg/common/discoveryregister/discoveryregister_test.go b/pkg/common/discoveryregister/discoveryregister_test.go index d4a634b91..d83da1285 100644 --- a/pkg/common/discoveryregister/discoveryregister_test.go +++ b/pkg/common/discoveryregister/discoveryregister_test.go @@ -24,7 +24,8 @@ import ( func setupTestEnvironment() { os.Setenv("ZOOKEEPER_SCHEMA", "openim") - os.Setenv("ZOOKEEPER_ADDRESS", "172.28.0.1:12181") + os.Setenv("ZOOKEEPER_ADDRESS", "172.28.0.1") + os.Setenv("ZOOKEEPER_PORT", "12181") os.Setenv("ZOOKEEPER_USERNAME", "") os.Setenv("ZOOKEEPER_PASSWORD", "") } diff --git a/pkg/common/discoveryregister/zookeeper/zookeeper.go b/pkg/common/discoveryregister/zookeeper/zookeeper.go index a4edffc43..db1a5c6c4 100644 --- a/pkg/common/discoveryregister/zookeeper/zookeeper.go +++ b/pkg/common/discoveryregister/zookeeper/zookeeper.go @@ -52,10 +52,18 @@ func getEnv(key, fallback string) string { return fallback } -// getZkAddrFromEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value. +// getZkAddrFromEnv returns the Zookeeper addresses combined from the ZOOKEEPER_ADDRESS and ZOOKEEPER_PORT environment variables. +// If the environment variables are not set, it returns the fallback value. func getZkAddrFromEnv(fallback []string) []string { - if value, exists := os.LookupEnv("ZOOKEEPER_ADDRESS"); exists { - return strings.Split(value, ",") + address, addrExists := os.LookupEnv("ZOOKEEPER_ADDRESS") + port, portExists := os.LookupEnv("ZOOKEEPER_PORT") + + if addrExists && portExists { + addresses := strings.Split(address, ",") + for i, addr := range addresses { + addresses[i] = addr + ":" + port + } + return addresses } return fallback } diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 9a06809b7..06b1e2b4c 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -31,15 +31,14 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) -const ( - maxRetry = 10 // Maximum number of retries for producer creation -) +const maxRetry = 10 // number of retries -var errEmptyMsg = errors.New("binary msg is empty") +var errEmptyMsg = errors.New("kafka binary msg is empty") +// Producer represents a Kafka producer. type Producer struct { - topic string addr []string + topic string config *sarama.Config producer sarama.SyncProducer } @@ -68,7 +67,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer { // Get Kafka configuration from environment variables or fallback to config file kafkaUsername := getEnvOrConfig("KAFKA_USERNAME", config.Config.Kafka.Username) kafkaPassword := getEnvOrConfig("KAFKA_PASSWORD", config.Config.Kafka.Password) - kafkaAddr := getEnvOrConfig("KAFKA_ADDRESS", addr[0]) // Assuming addr[0] contains address from config + kafkaAddr := getKafkaAddrFromEnv(addr) // Updated to use the new function // Configure SASL authentication if credentials are provided if kafkaUsername != "" && kafkaPassword != "" { @@ -78,7 +77,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer { } // Set the Kafka address - p.addr = []string{kafkaAddr} + p.addr = kafkaAddr // Set up TLS configuration (if required) SetupTLSConfig(p.config) diff --git a/pkg/common/kafka/util.go b/pkg/common/kafka/util.go index da0c5349b..f318ecf73 100644 --- a/pkg/common/kafka/util.go +++ b/pkg/common/kafka/util.go @@ -15,7 +15,9 @@ package kafka import ( + "fmt" "os" + "strings" "github.com/IBM/sarama" @@ -44,3 +46,20 @@ func getEnvOrConfig(envName string, configValue string) string { } return configValue } + +// getKafkaAddrFromEnv returns the Kafka addresses combined from the KAFKA_ADDRESS and KAFKA_PORT environment variables. +// If the environment variables are not set, it returns the fallback value. +func getKafkaAddrFromEnv(fallback []string) []string { + envAddr := os.Getenv("KAFKA_ADDRESS") + envPort := os.Getenv("KAFKA_PORT") + + if envAddr != "" && envPort != "" { + addresses := strings.Split(envAddr, ",") + for i, addr := range addresses { + addresses[i] = fmt.Sprintf("%s:%s", addr, envPort) + } + return addresses + } + + return fallback +} diff --git a/tools/component/component_test.go b/tools/component/component_test.go index 08ea0fda8..4488c029e 100644 --- a/tools/component/component_test.go +++ b/tools/component/component_test.go @@ -21,22 +21,10 @@ import ( "time" "github.com/redis/go-redis/v9" - "github.com/stretchr/testify/assert" "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) -func TestCheckMysql(t *testing.T) { - err := mockInitCfg() - assert.NoError(t, err, "Initialization should not produce errors") - - err = checkMysql() - if err != nil { - // You might expect an error if MySQL isn't running locally with the mock credentials. - t.Logf("Expected error due to mock configuration: %v", err) - } -} - // Mock for initCfg for testing purpose func mockInitCfg() error { config.Config.Mysql.Username = "root"