From bb0f5e97a2a6f0c2c0236032b1c46aa9b06e5d44 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Fri, 16 Sep 2022 16:51:12 +0800 Subject: [PATCH] kafka --- config/config.yaml | 4 ++-- pkg/common/config/config.go | 6 +++--- pkg/common/kafka/consumer.go | 6 +++--- pkg/common/kafka/producer.go | 6 +++--- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index f474fc06a..0385c8831 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -48,8 +48,8 @@ redis: enableCluster: false #如果外部redis以集群方式启动,需要打开此开关 kafka: - userName: - password: + SASLUserName: + SASLPassword: ws2mschat: addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可 topic: "ws2ms_chat" #用于mongo和mysql保存消息 diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index e246c76bf..62b3eb373 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -221,9 +221,9 @@ type config struct { } Kafka struct { - UserName string `yaml:"userName"` - Password string `yaml:"password"` - Ws2mschat struct { + SASLUserName string `yaml:"SASLUserName"` + SASLPassword string `yaml:"SASLPassword"` + Ws2mschat struct { Addr []string `yaml:"addr"` Topic string `yaml:"topic"` } diff --git a/pkg/common/kafka/consumer.go b/pkg/common/kafka/consumer.go index dd3182fad..f794a244c 100644 --- a/pkg/common/kafka/consumer.go +++ b/pkg/common/kafka/consumer.go @@ -20,10 +20,10 @@ func NewKafkaConsumer(addr []string, topic string) *Consumer { p.Topic = topic p.addr = addr consumerConfig := sarama.NewConfig() - if config.Config.Kafka.UserName != "" && config.Config.Kafka.Password != "" { + if config.Config.Kafka.SASLUserName != "" && config.Config.Kafka.SASLPassword != "" { consumerConfig.Net.SASL.Enable = true - consumerConfig.Net.SASL.User = config.Config.Kafka.UserName - consumerConfig.Net.SASL.Password = config.Config.Kafka.Password + consumerConfig.Net.SASL.User = config.Config.Kafka.SASLUserName + consumerConfig.Net.SASL.Password = config.Config.Kafka.SASLPassword } consumer, err := sarama.NewConsumer(p.addr, consumerConfig) if err != nil { diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 23dfc3085..1df0e0ed8 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -26,10 +26,10 @@ func NewKafkaProducer(addr []string, topic string) *Producer { p.config.Producer.Return.Errors = true p.config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all p.config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly - if config.Config.Kafka.UserName != "" && config.Config.Kafka.Password != "" { + if config.Config.Kafka.SASLUserName != "" && config.Config.Kafka.SASLPassword != "" { p.config.Net.SASL.Enable = true - p.config.Net.SASL.User = config.Config.Kafka.UserName - p.config.Net.SASL.Password = config.Config.Kafka.Password + p.config.Net.SASL.User = config.Config.Kafka.SASLUserName + p.config.Net.SASL.Password = config.Config.Kafka.SASLPassword } p.addr = addr p.topic = topic