mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-05 20:11:14 +08:00
Add TLS config for Kafka (#956)
* feat: add TLS utility. * chore: rename pkg/tls/tls.go to pkg/common/tls/tls.go . * feat: add util for kafka TLS config. * feat: setup TLS config for kafka consumer. * feat: add TLS config to kafka consumer group. * feat: add TLS config for kafka producer. * chore: add TLS config for kafka. * feat: add TLS config for kafka checker.
This commit is contained in:
parent
6ca1afe641
commit
303162f8a5
@ -81,9 +81,16 @@ type configStruct struct {
|
||||
} `yaml:"redis"`
|
||||
|
||||
Kafka struct {
|
||||
Username string `yaml:"username"`
|
||||
Password string `yaml:"password"`
|
||||
Addr []string `yaml:"addr"`
|
||||
Username string `yaml:"username"`
|
||||
Password string `yaml:"password"`
|
||||
Addr []string `yaml:"addr"`
|
||||
TLS *struct {
|
||||
CACrt string `yaml:"caCrt"`
|
||||
ClientCrt string `yaml:"clientCrt"`
|
||||
ClientKey string `yaml:"clientKey"`
|
||||
ClientKeyPwd string `yaml:"clientKeyPwd"`
|
||||
InsecureSkipVerify bool `yaml:"insecureSkipVerify"`
|
||||
} `yaml:"tls"`
|
||||
LatestMsgToRedis struct {
|
||||
Topic string `yaml:"topic"`
|
||||
} `yaml:"latestMsgToRedis"`
|
||||
|
@ -40,6 +40,7 @@ func NewKafkaConsumer(addr []string, topic string) *Consumer {
|
||||
consumerConfig.Net.SASL.User = config.Config.Kafka.Username
|
||||
consumerConfig.Net.SASL.Password = config.Config.Kafka.Password
|
||||
}
|
||||
SetupTLSConfig(consumerConfig)
|
||||
consumer, err := sarama.NewConsumer(p.addr, consumerConfig)
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
|
@ -17,6 +17,7 @@ package kafka
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
@ -35,11 +36,17 @@ type MConsumerGroupConfig struct {
|
||||
}
|
||||
|
||||
func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) *MConsumerGroup {
|
||||
config := sarama.NewConfig()
|
||||
config.Version = consumerConfig.KafkaVersion
|
||||
config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial
|
||||
config.Consumer.Return.Errors = consumerConfig.IsReturnErr
|
||||
consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, config)
|
||||
consumerGroupConfig := sarama.NewConfig()
|
||||
consumerGroupConfig.Version = consumerConfig.KafkaVersion
|
||||
consumerGroupConfig.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial
|
||||
consumerGroupConfig.Consumer.Return.Errors = consumerConfig.IsReturnErr
|
||||
if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" {
|
||||
consumerGroupConfig.Net.SASL.Enable = true
|
||||
consumerGroupConfig.Net.SASL.User = config.Config.Kafka.Username
|
||||
consumerGroupConfig.Net.SASL.Password = config.Config.Kafka.Password
|
||||
}
|
||||
SetupTLSConfig(consumerGroupConfig)
|
||||
consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, consumerGroupConfig)
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
|
@ -60,6 +60,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
|
||||
}
|
||||
p.addr = addr
|
||||
p.topic = topic
|
||||
SetupTLSConfig(p.config)
|
||||
var producer sarama.SyncProducer
|
||||
var err error
|
||||
for i := 0; i <= maxRetry; i++ {
|
||||
|
20
pkg/common/kafka/util.go
Normal file
20
pkg/common/kafka/util.go
Normal file
@ -0,0 +1,20 @@
|
||||
package kafka
|
||||
|
||||
import (
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tls"
|
||||
"github.com/Shopify/sarama"
|
||||
)
|
||||
|
||||
// SetupTLSConfig set up the TLS config from config file.
|
||||
func SetupTLSConfig(cfg *sarama.Config) {
|
||||
if config.Config.Kafka.TLS != nil {
|
||||
cfg.Net.TLS.Enable = true
|
||||
cfg.Net.TLS.Config = tls.NewTLSConfig(
|
||||
config.Config.Kafka.TLS.ClientCrt,
|
||||
config.Config.Kafka.TLS.ClientKey,
|
||||
config.Config.Kafka.TLS.CACrt,
|
||||
[]byte(config.Config.Kafka.TLS.ClientKeyPwd),
|
||||
)
|
||||
}
|
||||
}
|
71
pkg/common/tls/tls.go
Normal file
71
pkg/common/tls/tls.go
Normal file
@ -0,0 +1,71 @@
|
||||
package tls
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/pem"
|
||||
"errors"
|
||||
"os"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
)
|
||||
|
||||
|
||||
func decryptPEM(data []byte, passphrase []byte) ([]byte, error) {
|
||||
if len(passphrase) == 0 {
|
||||
return data, nil
|
||||
}
|
||||
b, _ := pem.Decode(data)
|
||||
d, err := x509.DecryptPEMBlock(b, passphrase)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return pem.EncodeToMemory(&pem.Block{
|
||||
Type: b.Type,
|
||||
Bytes: d,
|
||||
}), nil
|
||||
}
|
||||
|
||||
func readEncryptablePEMBlock(path string, pwd []byte) ([]byte, error) {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return decryptPEM(data, pwd)
|
||||
}
|
||||
|
||||
// NewTLSConfig setup the TLS config from general config file.
|
||||
func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byte) *tls.Config {
|
||||
tlsConfig := tls.Config{}
|
||||
|
||||
if clientCertFile != "" && clientKeyFile != "" {
|
||||
certPEMBlock, err := os.ReadFile(clientCertFile)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
keyPEMBlock, err := readEncryptablePEMBlock(clientKeyFile, keyPwd)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
tlsConfig.Certificates = []tls.Certificate{cert}
|
||||
}
|
||||
|
||||
caCert, err := os.ReadFile(caCertFile)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
caCertPool := x509.NewCertPool()
|
||||
ok := caCertPool.AppendCertsFromPEM(caCert)
|
||||
if !ok {
|
||||
panic(errors.New("not a valid CA cert"))
|
||||
}
|
||||
tlsConfig.RootCAs = caCertPool
|
||||
|
||||
tlsConfig.InsecureSkipVerify = config.Config.Kafka.TLS.InsecureSkipVerify
|
||||
|
||||
return &tlsConfig
|
||||
}
|
@ -39,6 +39,7 @@ import (
|
||||
"gorm.io/gorm"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
|
||||
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
)
|
||||
@ -274,6 +275,7 @@ func checkKafka() error {
|
||||
cfg.Net.SASL.User = config.Config.Kafka.Username
|
||||
cfg.Net.SASL.Password = config.Config.Kafka.Password
|
||||
}
|
||||
kafka.SetupTLSConfig(cfg)
|
||||
kafkaClient, err := sarama.NewClient(config.Config.Kafka.Addr, cfg)
|
||||
if err != nil {
|
||||
return errs.Wrap(err)
|
||||
|
Loading…
x
Reference in New Issue
Block a user