diff --git a/pkg/common/db/cache/init_redis.go b/pkg/common/db/cache/init_redis.go index 63d2f5707..1308e9649 100644 --- a/pkg/common/db/cache/init_redis.go +++ b/pkg/common/db/cache/init_redis.go @@ -87,7 +87,15 @@ func NewRedis() (redis.UniversalClient, error) { // overrideConfigFromEnv overrides configuration fields with environment variables if present. func overrideConfigFromEnv() { if envAddr := os.Getenv("REDIS_ADDRESS"); envAddr != "" { - config.Config.Redis.Address = strings.Split(envAddr, ",") // Assuming addresses are comma-separated + if envPort := os.Getenv("REDIS_PORT"); envPort != "" { + addresses := strings.Split(envAddr, ",") + for i, addr := range addresses { + addresses[i] = addr + ":" + envPort + } + config.Config.Redis.Address = addresses + } else { + config.Config.Redis.Address = strings.Split(envAddr, ",") + } } if envUser := os.Getenv("REDIS_USERNAME"); envUser != "" { config.Config.Redis.Username = envUser diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 9a06809b7..06b1e2b4c 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -31,15 +31,14 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) -const ( - maxRetry = 10 // Maximum number of retries for producer creation -) +const maxRetry = 10 // number of retries -var errEmptyMsg = errors.New("binary msg is empty") +var errEmptyMsg = errors.New("kafka binary msg is empty") +// Producer represents a Kafka producer. type Producer struct { - topic string addr []string + topic string config *sarama.Config producer sarama.SyncProducer } @@ -68,7 +67,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer { // Get Kafka configuration from environment variables or fallback to config file kafkaUsername := getEnvOrConfig("KAFKA_USERNAME", config.Config.Kafka.Username) kafkaPassword := getEnvOrConfig("KAFKA_PASSWORD", config.Config.Kafka.Password) - kafkaAddr := getEnvOrConfig("KAFKA_ADDRESS", addr[0]) // Assuming addr[0] contains address from config + kafkaAddr := getKafkaAddrFromEnv(addr) // Updated to use the new function // Configure SASL authentication if credentials are provided if kafkaUsername != "" && kafkaPassword != "" { @@ -78,7 +77,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer { } // Set the Kafka address - p.addr = []string{kafkaAddr} + p.addr = kafkaAddr // Set up TLS configuration (if required) SetupTLSConfig(p.config) diff --git a/pkg/common/kafka/util.go b/pkg/common/kafka/util.go index da0c5349b..f318ecf73 100644 --- a/pkg/common/kafka/util.go +++ b/pkg/common/kafka/util.go @@ -15,7 +15,9 @@ package kafka import ( + "fmt" "os" + "strings" "github.com/IBM/sarama" @@ -44,3 +46,20 @@ func getEnvOrConfig(envName string, configValue string) string { } return configValue } + +// getKafkaAddrFromEnv returns the Kafka addresses combined from the KAFKA_ADDRESS and KAFKA_PORT environment variables. +// If the environment variables are not set, it returns the fallback value. +func getKafkaAddrFromEnv(fallback []string) []string { + envAddr := os.Getenv("KAFKA_ADDRESS") + envPort := os.Getenv("KAFKA_PORT") + + if envAddr != "" && envPort != "" { + addresses := strings.Split(envAddr, ",") + for i, addr := range addresses { + addresses[i] = fmt.Sprintf("%s:%s", addr, envPort) + } + return addresses + } + + return fallback +}