mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-27 05:52:29 +08:00 
			
		
		
		
	feat: add kafka and redis mongo env
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
This commit is contained in:
		
							parent
							
								
									d61495b047
								
							
						
					
					
						commit
						856b54e371
					
				| @ -1,17 +1,3 @@ | |||||||
| // Copyright © 2023 OpenIM. All rights reserved. |  | ||||||
| // |  | ||||||
| // Licensed under the Apache License, Version 2.0 (the "License"); |  | ||||||
| // you may not use this file except in compliance with the License. |  | ||||||
| // You may obtain a copy of the License at |  | ||||||
| // |  | ||||||
| //     http://www.apache.org/licenses/LICENSE-2.0 |  | ||||||
| // |  | ||||||
| // Unless required by applicable law or agreed to in writing, software |  | ||||||
| // distributed under the License is distributed on an "AS IS" BASIS, |  | ||||||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |  | ||||||
| // See the License for the specific language governing permissions and |  | ||||||
| // limitations under the License. |  | ||||||
| 
 |  | ||||||
| package kafka | package kafka | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| @ -21,19 +7,17 @@ import ( | |||||||
| 	"strings" | 	"strings" | ||||||
| 	"time" | 	"time" | ||||||
| 
 | 
 | ||||||
|  | 	"github.com/IBM/sarama" | ||||||
| 	"github.com/OpenIMSDK/protocol/constant" | 	"github.com/OpenIMSDK/protocol/constant" | ||||||
| 	log "github.com/OpenIMSDK/tools/log" | 	"github.com/OpenIMSDK/tools/log" | ||||||
| 	"github.com/OpenIMSDK/tools/mcontext" | 	"github.com/OpenIMSDK/tools/mcontext" | ||||||
| 	"github.com/OpenIMSDK/tools/utils" | 	"github.com/OpenIMSDK/tools/utils" | ||||||
| 
 |  | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||||
| 
 |  | ||||||
| 	"github.com/IBM/sarama" |  | ||||||
| 	"google.golang.org/protobuf/proto" | 	"google.golang.org/protobuf/proto" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| const ( | const ( | ||||||
| 	maxRetry = 10 // number of retries | 	maxRetry = 10 // Maximum number of retries for producer creation | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| var errEmptyMsg = errors.New("binary msg is empty") | var errEmptyMsg = errors.New("binary msg is empty") | ||||||
| @ -45,62 +29,85 @@ type Producer struct { | |||||||
| 	producer sarama.SyncProducer | 	producer sarama.SyncProducer | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // NewKafkaProducer Initialize kafka producer. | // NewKafkaProducer initializes a new Kafka producer. | ||||||
| func NewKafkaProducer(addr []string, topic string) *Producer { | func NewKafkaProducer(addr []string, topic string) *Producer { | ||||||
| 	p := Producer{} | 	p := Producer{ | ||||||
| 	p.config = sarama.NewConfig()             // Instantiate a sarama Config | 		addr:   addr, | ||||||
| 	p.config.Producer.Return.Successes = true // Whether to enable the successes channel to be notified after the message is sent successfully | 		topic:  topic, | ||||||
|  | 		config: sarama.NewConfig(), | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// Set producer return flags | ||||||
|  | 	p.config.Producer.Return.Successes = true | ||||||
| 	p.config.Producer.Return.Errors = true | 	p.config.Producer.Return.Errors = true | ||||||
| 	p.config.Producer.Partitioner = sarama.NewHashPartitioner // Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly |  | ||||||
| 
 | 
 | ||||||
| 	var producerAck = sarama.WaitForAll // default: WaitForAll | 	// Set partitioner strategy | ||||||
| 	switch strings.ToLower(config.Config.Kafka.ProducerAck) { | 	p.config.Producer.Partitioner = sarama.NewHashPartitioner | ||||||
| 	case "no_response": |  | ||||||
| 		producerAck = sarama.NoResponse |  | ||||||
| 	case "wait_for_local": |  | ||||||
| 		producerAck = sarama.WaitForLocal |  | ||||||
| 	case "wait_for_all": |  | ||||||
| 		producerAck = sarama.WaitForAll |  | ||||||
| 	} |  | ||||||
| 	p.config.Producer.RequiredAcks = producerAck |  | ||||||
| 
 | 
 | ||||||
| 	var compress = sarama.CompressionNone // default: no compress | 	// Configure producer acknowledgement level | ||||||
| 	_ = compress.UnmarshalText(bytes.ToLower([]byte(config.Config.Kafka.CompressType))) | 	configureProducerAck(&p, config.Config.Kafka.ProducerAck) | ||||||
| 	p.config.Producer.Compression = compress |  | ||||||
| 
 | 
 | ||||||
| 	if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" { | 	// Configure message compression | ||||||
|  | 	configureCompression(&p, config.Config.Kafka.CompressType) | ||||||
|  | 
 | ||||||
|  | 	// Get Kafka configuration from environment variables or fallback to config file | ||||||
|  | 	kafkaUsername := getEnvOrConfig("KAFKA_USERNAME", config.Config.Kafka.Username) | ||||||
|  | 	kafkaPassword := getEnvOrConfig("KAFKA_PASSWORD", config.Config.Kafka.Password) | ||||||
|  | 	kafkaAddr := getEnvOrConfig("KAFKA_ADDRESS", addr[0]) // Assuming addr[0] contains address from config | ||||||
|  | 
 | ||||||
|  | 	// Configure SASL authentication if credentials are provided | ||||||
|  | 	if kafkaUsername != "" && kafkaPassword != "" { | ||||||
| 		p.config.Net.SASL.Enable = true | 		p.config.Net.SASL.Enable = true | ||||||
| 		p.config.Net.SASL.User = config.Config.Kafka.Username | 		p.config.Net.SASL.User = kafkaUsername | ||||||
| 		p.config.Net.SASL.Password = config.Config.Kafka.Password | 		p.config.Net.SASL.Password = kafkaPassword | ||||||
| 	} | 	} | ||||||
| 	p.addr = addr | 
 | ||||||
| 	p.topic = topic | 	// Set the Kafka address | ||||||
|  | 	p.addr = []string{kafkaAddr} | ||||||
|  | 
 | ||||||
|  | 	// Set up TLS configuration (if required) | ||||||
| 	SetupTLSConfig(p.config) | 	SetupTLSConfig(p.config) | ||||||
| 	var producer sarama.SyncProducer | 
 | ||||||
|  | 	// Create the producer with retries | ||||||
| 	var err error | 	var err error | ||||||
| 	for i := 0; i <= maxRetry; i++ { | 	for i := 0; i <= maxRetry; i++ { | ||||||
| 		producer, err = sarama.NewSyncProducer(p.addr, p.config) // Initialize the client | 		p.producer, err = sarama.NewSyncProducer(p.addr, p.config) | ||||||
| 		if err == nil { | 		if err == nil { | ||||||
| 			p.producer = producer |  | ||||||
| 			return &p | 			return &p | ||||||
| 		} | 		} | ||||||
| 		//TODO If the password is wrong, exit directly | 		time.Sleep(1 * time.Second) // Wait before retrying | ||||||
| 		//if packetErr, ok := err.(*sarama.PacketEncodingError); ok { |  | ||||||
| 		//if _, ok := packetErr.Err.(sarama.AuthenticationError); ok { |  | ||||||
| 		//	fmt.Println("Kafka password is wrong.") |  | ||||||
| 		//} |  | ||||||
| 		//} else { |  | ||||||
| 		//	fmt.Printf("Failed to create Kafka producer: %v\n", err) |  | ||||||
| 		//} |  | ||||||
| 		time.Sleep(time.Duration(1) * time.Second) |  | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	// Panic if unable to create producer after retries | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		panic(err.Error()) | 		panic("Failed to create Kafka producer: " + err.Error()) | ||||||
| 	} | 	} | ||||||
| 	p.producer = producer | 
 | ||||||
| 	return &p | 	return &p | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // configureProducerAck configures the producer's acknowledgement level. | ||||||
|  | func configureProducerAck(p *Producer, ackConfig string) { | ||||||
|  | 	switch strings.ToLower(ackConfig) { | ||||||
|  | 	case "no_response": | ||||||
|  | 		p.config.Producer.RequiredAcks = sarama.NoResponse | ||||||
|  | 	case "wait_for_local": | ||||||
|  | 		p.config.Producer.RequiredAcks = sarama.WaitForLocal | ||||||
|  | 	case "wait_for_all": | ||||||
|  | 		p.config.Producer.RequiredAcks = sarama.WaitForAll | ||||||
|  | 	default: | ||||||
|  | 		p.config.Producer.RequiredAcks = sarama.WaitForAll | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // configureCompression configures the message compression type for the producer. | ||||||
|  | func configureCompression(p *Producer, compressType string) { | ||||||
|  | 	var compress sarama.CompressionCodec = sarama.CompressionNone | ||||||
|  | 	compress.UnmarshalText(bytes.ToLower([]byte(compressType))) | ||||||
|  | 	p.config.Producer.Compression = compress | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // GetMQHeaderWithContext extracts message queue headers from the context. | ||||||
| func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) { | func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) { | ||||||
| 	operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx) | 	operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| @ -111,22 +118,23 @@ func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) | |||||||
| 		{Key: []byte(constant.OpUserID), Value: []byte(opUserID)}, | 		{Key: []byte(constant.OpUserID), Value: []byte(opUserID)}, | ||||||
| 		{Key: []byte(constant.OpUserPlatform), Value: []byte(platform)}, | 		{Key: []byte(constant.OpUserPlatform), Value: []byte(platform)}, | ||||||
| 		{Key: []byte(constant.ConnID), Value: []byte(connID)}, | 		{Key: []byte(constant.ConnID), Value: []byte(connID)}, | ||||||
| 	}, err | 	}, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // GetContextWithMQHeader creates a context from message queue headers. | ||||||
| func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context { | func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context { | ||||||
| 	var values []string | 	var values []string | ||||||
| 	for _, recordHeader := range header { | 	for _, recordHeader := range header { | ||||||
| 		values = append(values, string(recordHeader.Value)) | 		values = append(values, string(recordHeader.Value)) | ||||||
| 	} | 	} | ||||||
| 	return mcontext.WithMustInfoCtx(values) // TODO | 	return mcontext.WithMustInfoCtx(values) // Attach extracted values to context | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // SendMessage sends a message to the Kafka topic configured in the Producer. | ||||||
| func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error) { | func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error) { | ||||||
| 	log.ZDebug(ctx, "SendMessage", "msg", msg, "topic", p.topic, "key", key) | 	log.ZDebug(ctx, "SendMessage", "msg", msg, "topic", p.topic, "key", key) | ||||||
| 	kMsg := &sarama.ProducerMessage{} | 
 | ||||||
| 	kMsg.Topic = p.topic | 	// Marshal the protobuf message | ||||||
| 	kMsg.Key = sarama.StringEncoder(key) |  | ||||||
| 	bMsg, err := proto.Marshal(msg) | 	bMsg, err := proto.Marshal(msg) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return 0, 0, utils.Wrap(err, "kafka proto Marshal err") | 		return 0, 0, utils.Wrap(err, "kafka proto Marshal err") | ||||||
| @ -134,20 +142,33 @@ func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Messag | |||||||
| 	if len(bMsg) == 0 { | 	if len(bMsg) == 0 { | ||||||
| 		return 0, 0, utils.Wrap(errEmptyMsg, "") | 		return 0, 0, utils.Wrap(errEmptyMsg, "") | ||||||
| 	} | 	} | ||||||
| 	kMsg.Value = sarama.ByteEncoder(bMsg) | 
 | ||||||
|  | 	// Prepare Kafka message | ||||||
|  | 	kMsg := &sarama.ProducerMessage{ | ||||||
|  | 		Topic: p.topic, | ||||||
|  | 		Key:   sarama.StringEncoder(key), | ||||||
|  | 		Value: sarama.ByteEncoder(bMsg), | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// Validate message key and value | ||||||
| 	if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 { | 	if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 { | ||||||
| 		return 0, 0, utils.Wrap(errEmptyMsg, "") | 		return 0, 0, utils.Wrap(errEmptyMsg, "") | ||||||
| 	} | 	} | ||||||
| 	kMsg.Metadata = ctx | 
 | ||||||
|  | 	// Attach context metadata as headers | ||||||
| 	header, err := GetMQHeaderWithContext(ctx) | 	header, err := GetMQHeaderWithContext(ctx) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return 0, 0, utils.Wrap(err, "") | 		return 0, 0, utils.Wrap(err, "") | ||||||
| 	} | 	} | ||||||
| 	kMsg.Headers = header | 	kMsg.Headers = header | ||||||
|  | 
 | ||||||
|  | 	// Send the message | ||||||
| 	partition, offset, err := p.producer.SendMessage(kMsg) | 	partition, offset, err := p.producer.SendMessage(kMsg) | ||||||
| 	log.ZDebug(ctx, "ByteEncoder SendMessage end", "key ", kMsg.Key, "key length", kMsg.Value.Length()) |  | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.ZWarn(ctx, "p.producer.SendMessage error", err) | 		log.ZWarn(ctx, "p.producer.SendMessage error", err) | ||||||
|  | 		return 0, 0, utils.Wrap(err, "") | ||||||
| 	} | 	} | ||||||
| 	return partition, offset, utils.Wrap(err, "") | 
 | ||||||
|  | 	log.ZDebug(ctx, "ByteEncoder SendMessage end", "key", kMsg.Key, "key length", kMsg.Value.Length()) | ||||||
|  | 	return partition, offset, nil | ||||||
| } | } | ||||||
|  | |||||||
| @ -15,6 +15,8 @@ | |||||||
| package kafka | package kafka | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
|  | 	"os" | ||||||
|  | 
 | ||||||
| 	"github.com/IBM/sarama" | 	"github.com/IBM/sarama" | ||||||
| 
 | 
 | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||||
| @ -33,3 +35,12 @@ func SetupTLSConfig(cfg *sarama.Config) { | |||||||
| 		) | 		) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | // getEnvOrConfig returns the value of the environment variable if it exists, | ||||||
|  | // otherwise, it returns the value from the configuration file. | ||||||
|  | func getEnvOrConfig(envName string, configValue string) string { | ||||||
|  | 	if value, exists := os.LookupEnv(envName); exists { | ||||||
|  | 		return value | ||||||
|  | 	} | ||||||
|  | 	return configValue | ||||||
|  | } | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user