mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-24 18:36:19 +08:00
kafka
This commit is contained in:
parent
665e2e3e83
commit
bb0f5e97a2
@ -48,8 +48,8 @@ redis:
|
|||||||
enableCluster: false #如果外部redis以集群方式启动,需要打开此开关
|
enableCluster: false #如果外部redis以集群方式启动,需要打开此开关
|
||||||
|
|
||||||
kafka:
|
kafka:
|
||||||
userName:
|
SASLUserName:
|
||||||
password:
|
SASLPassword:
|
||||||
ws2mschat:
|
ws2mschat:
|
||||||
addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可
|
addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可
|
||||||
topic: "ws2ms_chat" #用于mongo和mysql保存消息
|
topic: "ws2ms_chat" #用于mongo和mysql保存消息
|
||||||
|
@ -221,8 +221,8 @@ type config struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Kafka struct {
|
Kafka struct {
|
||||||
UserName string `yaml:"userName"`
|
SASLUserName string `yaml:"SASLUserName"`
|
||||||
Password string `yaml:"password"`
|
SASLPassword string `yaml:"SASLPassword"`
|
||||||
Ws2mschat struct {
|
Ws2mschat struct {
|
||||||
Addr []string `yaml:"addr"`
|
Addr []string `yaml:"addr"`
|
||||||
Topic string `yaml:"topic"`
|
Topic string `yaml:"topic"`
|
||||||
|
@ -20,10 +20,10 @@ func NewKafkaConsumer(addr []string, topic string) *Consumer {
|
|||||||
p.Topic = topic
|
p.Topic = topic
|
||||||
p.addr = addr
|
p.addr = addr
|
||||||
consumerConfig := sarama.NewConfig()
|
consumerConfig := sarama.NewConfig()
|
||||||
if config.Config.Kafka.UserName != "" && config.Config.Kafka.Password != "" {
|
if config.Config.Kafka.SASLUserName != "" && config.Config.Kafka.SASLPassword != "" {
|
||||||
consumerConfig.Net.SASL.Enable = true
|
consumerConfig.Net.SASL.Enable = true
|
||||||
consumerConfig.Net.SASL.User = config.Config.Kafka.UserName
|
consumerConfig.Net.SASL.User = config.Config.Kafka.SASLUserName
|
||||||
consumerConfig.Net.SASL.Password = config.Config.Kafka.Password
|
consumerConfig.Net.SASL.Password = config.Config.Kafka.SASLPassword
|
||||||
}
|
}
|
||||||
consumer, err := sarama.NewConsumer(p.addr, consumerConfig)
|
consumer, err := sarama.NewConsumer(p.addr, consumerConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -26,10 +26,10 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
|
|||||||
p.config.Producer.Return.Errors = true
|
p.config.Producer.Return.Errors = true
|
||||||
p.config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all
|
p.config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all
|
||||||
p.config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly
|
p.config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly
|
||||||
if config.Config.Kafka.UserName != "" && config.Config.Kafka.Password != "" {
|
if config.Config.Kafka.SASLUserName != "" && config.Config.Kafka.SASLPassword != "" {
|
||||||
p.config.Net.SASL.Enable = true
|
p.config.Net.SASL.Enable = true
|
||||||
p.config.Net.SASL.User = config.Config.Kafka.UserName
|
p.config.Net.SASL.User = config.Config.Kafka.SASLUserName
|
||||||
p.config.Net.SASL.Password = config.Config.Kafka.Password
|
p.config.Net.SASL.Password = config.Config.Kafka.SASLPassword
|
||||||
}
|
}
|
||||||
p.addr = addr
|
p.addr = addr
|
||||||
p.topic = topic
|
p.topic = topic
|
||||||
|
Loading…
x
Reference in New Issue
Block a user