From aeee3f33b19810ed9c5214e2b8827574be48026a Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 7 Mar 2025 14:48:44 +0800 Subject: [PATCH] resolving merge conflicts --- go.sum | 4 +- internal/api/config_manager.go | 2 +- internal/api/init.go | 2 +- internal/msggateway/ws_server.go | 12 ++- internal/msgtransfer/init.go | 2 +- .../msgtransfer/online_history_msg_handler.go | 2 +- .../online_msg_to_mongo_handler.go | 2 +- internal/push/offlinepush_handler.go | 2 +- internal/push/onlinepusher.go | 2 +- internal/push/push_handler.go | 2 +- internal/rpc/auth/auth.go | 2 +- internal/rpc/msg/send.go | 5 -- internal/rpc/third/s3.go | 8 +- internal/tools/cron_task.go | 2 +- pkg/common/cmd/root.go | 4 +- pkg/common/config/config.go | 2 +- pkg/common/startrpc/start.go | 3 +- pkg/common/storage/controller/msg.go | 2 +- pkg/common/storage/controller/msg_transfer.go | 2 +- pkg/common/storage/controller/push.go | 2 +- pkg/common/storage/controller/s3.go | 4 +- pkg/common/storage/kafka/config.go | 33 +++++++ pkg/common/storage/kafka/consumer_group.go | 68 +++++++++++++++ pkg/common/storage/kafka/producer.go | 82 ++++++++++++++++++ pkg/common/storage/kafka/sarama.go | 85 +++++++++++++++++++ pkg/common/storage/kafka/tls.go | 83 ++++++++++++++++++ pkg/common/storage/kafka/util.go | 34 ++++++++ pkg/common/storage/kafka/verify.go | 79 +++++++++++++++++ tools/check-component/main.go | 4 +- tools/seq/internal/main.go | 2 +- 30 files changed, 503 insertions(+), 35 deletions(-) create mode 100644 pkg/common/storage/kafka/config.go create mode 100644 pkg/common/storage/kafka/consumer_group.go create mode 100644 pkg/common/storage/kafka/producer.go create mode 100644 pkg/common/storage/kafka/sarama.go create mode 100644 pkg/common/storage/kafka/tls.go create mode 100644 pkg/common/storage/kafka/util.go create mode 100644 pkg/common/storage/kafka/verify.go diff --git a/go.sum b/go.sum index 2a86d97ea..66af77379 100644 --- a/go.sum +++ b/go.sum @@ -345,8 +345,8 @@ github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= -github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= -github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= +github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= +github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.72-alpha.79 h1:e46no8WVAsmTzyy405klrdoUiG7u+1ohDsXvQuFng4s= github.com/openimsdk/protocol v0.0.72-alpha.79/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc= diff --git a/internal/api/config_manager.go b/internal/api/config_manager.go index c61b2cb0b..4d846dd9b 100644 --- a/internal/api/config_manager.go +++ b/internal/api/config_manager.go @@ -73,7 +73,7 @@ func (cm *ConfigManager) GetConfig(c *gin.Context) { func (cm *ConfigManager) GetConfigList(c *gin.Context) { var resp apistruct.GetConfigListResp resp.ConfigNames = cm.config.GetConfigNames() - resp.Environment = runtimeenv.PrintRuntimeEnvironment() + resp.Environment = runtimeenv.RuntimeEnvironment() resp.Version = version.Version apiresp.GinSuccess(c, resp) diff --git a/internal/api/init.go b/internal/api/init.go index 20237ebc2..4bd29c9e0 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -56,7 +56,7 @@ func Start(ctx context.Context, index int, config *Config) error { return err } - config.RuntimeEnv = runtimeenv.PrintRuntimeEnvironment() + config.RuntimeEnv = runtimeenv.RuntimeEnvironment() client, err := kdisc.NewDiscoveryRegister(&config.Discovery, config.RuntimeEnv, []string{ config.Discovery.RpcService.MessageGateway, diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index 24dd823f6..0731074c0 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -223,15 +223,19 @@ func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *C if err != nil { return err } + if len(conns) == 0 || (len(conns) == 1 && ws.disCov.IsSelfNode(conns[0])) { + return nil + } + wg := errgroup.Group{} wg.SetLimit(concurrentRequest) // Online push user online message to other node for _, v := range conns { v := v - log.ZDebug(ctx, " sendUserOnlineInfoToOtherNode conn ", "target", v.Target()) - if v.Target() == ws.disCov.GetSelfConnTarget() { - log.ZDebug(ctx, "Filter out this node", "node", v.Target()) + log.ZDebug(ctx, "sendUserOnlineInfoToOtherNode conn") + if ws.disCov.IsSelfNode(v) { + log.ZDebug(ctx, "Filter out this node") continue } @@ -242,7 +246,7 @@ func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *C PlatformID: int32(client.PlatformID), Token: client.token, }) if err != nil { - log.ZWarn(ctx, "MultiTerminalLoginCheck err", err, "node", v.Target()) + log.ZWarn(ctx, "MultiTerminalLoginCheck err", err) } return nil }) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 96e6bbde0..ea7ebb24c 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -74,7 +74,7 @@ type Config struct { } func Start(ctx context.Context, index int, config *Config) error { - runTimeEnv := runtimeenv.PrintRuntimeEnvironment() + runTimeEnv := runtimeenv.RuntimeEnvironment() log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runTimeEnv, "prometheusPorts", config.MsgTransfer.Prometheus.Ports, "index", index) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 6334c95fd..e1d3f71fd 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -29,6 +29,7 @@ import ( "github.com/go-redis/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/tools/batcher" "github.com/openimsdk/protocol/constant" @@ -37,7 +38,6 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" - "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/utils/stringutil" "google.golang.org/protobuf/proto" ) diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 8405be7fe..01872562e 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -21,9 +21,9 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mq/kafka" "google.golang.org/protobuf/proto" ) diff --git a/internal/push/offlinepush_handler.go b/internal/push/offlinepush_handler.go index 5c69da005..c685a188a 100644 --- a/internal/push/offlinepush_handler.go +++ b/internal/push/offlinepush_handler.go @@ -7,12 +7,12 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/protocol/constant" pbpush "github.com/openimsdk/protocol/push" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/utils/jsonutil" "google.golang.org/protobuf/proto" ) diff --git a/internal/push/onlinepusher.go b/internal/push/onlinepusher.go index 23e68339c..8bec6e60b 100644 --- a/internal/push/onlinepusher.go +++ b/internal/push/onlinepusher.go @@ -166,7 +166,7 @@ func (k *K8sStaticConsistentHash) GetConnsAndOnlinePush(ctx context.Context, msg } } log.ZDebug(ctx, "genUsers send hosts struct:", "usersHost", usersHost) - var usersConns = make(map[*grpc.ClientConn][]string) + var usersConns = make(map[grpc.ClientConnInterface][]string) for host, userIds := range usersHost { tconn, _ := k.disCov.GetConn(ctx, host) usersConns[tconn] = userIds diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 826a6e944..23233ef2e 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -14,6 +14,7 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/rpccache" @@ -25,7 +26,6 @@ import ( "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" - "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/jsonutil" "github.com/openimsdk/tools/utils/timeutil" diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 3e096aa64..2e64c365c 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -192,7 +192,7 @@ func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID return err } for _, v := range conns { - log.ZDebug(ctx, "forceKickOff", "conn", v.Target()) + log.ZDebug(ctx, "forceKickOff", "userID", userID, "platformID", platformID) client := msggateway.NewMsgGatewayClient(v) kickReq := &msggateway.KickUserOfflineReq{KickUserIDList: []string{userID}, PlatformID: platformID} _, err := client.KickUserOffline(ctx, kickReq) diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index 0731dd8d4..2b66f7a9a 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -49,11 +49,6 @@ func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg. func (m *msgServer) sendMsg(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (*pbmsg.SendMsgResp, error) { m.encapsulateMsgData(req.MsgData) - if req.MsgData.ContentType == constant.Stream { - if err := m.handlerStreamMsg(ctx, req.MsgData); err != nil { - return nil, err - } - } switch req.MsgData.SessionType { case constant.SingleChatType: return m.sendMsgSingleChat(ctx, req, before) diff --git a/internal/rpc/third/s3.go b/internal/rpc/third/s3.go index 8796fe824..97206dd6d 100644 --- a/internal/rpc/third/s3.go +++ b/internal/rpc/third/s3.go @@ -19,11 +19,12 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" - "github.com/openimsdk/open-im-server/v3/pkg/authverify" "path" "strconv" "time" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" + "github.com/google/uuid" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" @@ -37,7 +38,10 @@ import ( ) func (t *thirdServer) PartLimit(ctx context.Context, req *third.PartLimitReq) (*third.PartLimitResp, error) { - limit := t.s3dataBase.PartLimit() + limit, err := t.s3dataBase.PartLimit() + if err != nil { + return nil, err + } return &third.PartLimitResp{ MinPartSize: limit.MinPartSize, MaxPartSize: limit.MaxPartSize, diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index da1c6320e..5932053d0 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -31,7 +31,7 @@ type CronTaskConfig struct { } func Start(ctx context.Context, conf *CronTaskConfig) error { - conf.runTimeEnv = runtimeenv.PrintRuntimeEnvironment() + conf.runTimeEnv = runtimeenv.RuntimeEnvironment() log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", conf.runTimeEnv, "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords) if conf.CronTask.RetainChatRecords < 1 { diff --git a/pkg/common/cmd/root.go b/pkg/common/cmd/root.go index 0a405fb6e..80912679c 100644 --- a/pkg/common/cmd/root.go +++ b/pkg/common/cmd/root.go @@ -86,7 +86,7 @@ func (r *RootCmd) initEtcd() error { return err } disConfig := config.Discovery{} - env := runtimeenv.PrintRuntimeEnvironment() + env := runtimeenv.RuntimeEnvironment() err = config.Load(configDirectory, config.DiscoveryConfigFilename, config.EnvPrefixMap[config.DiscoveryConfigFilename], env, &disConfig) if err != nil { @@ -125,7 +125,7 @@ func (r *RootCmd) initializeConfiguration(cmd *cobra.Command, opts *CmdOpts) err return err } - runtimeEnv := runtimeenv.PrintRuntimeEnvironment() + runtimeEnv := runtimeenv.RuntimeEnvironment() // Load common configuration file //opts.configMap[ShareFileName] = StructEnvPrefix{EnvPrefix: shareEnvPrefix, ConfigStruct: &r.share} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index eb38b64ac..ca448083c 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -18,9 +18,9 @@ import ( "strings" "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" - "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/s3/aws" "github.com/openimsdk/tools/s3/cos" "github.com/openimsdk/tools/s3/kodo" diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 27aabca95..99df537f7 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -70,7 +70,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf return err } - runTimeEnv := runtimeenv.PrintRuntimeEnvironment() + runTimeEnv := runtimeenv.RuntimeEnvironment() if !autoSetPorts { rpcPort, err := datautil.GetElemByIndex(rpcPorts, index) @@ -177,6 +177,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf } err = client.Register( + ctx, rpcRegisterName, registerIP, port, diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index 0069dc7cc..5f924e55a 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -33,12 +33,12 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/protocol/constant" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/utils/datautil" ) diff --git a/pkg/common/storage/controller/msg_transfer.go b/pkg/common/storage/controller/msg_transfer.go index f4c0c6270..15f3b968f 100644 --- a/pkg/common/storage/controller/msg_transfer.go +++ b/pkg/common/storage/controller/msg_transfer.go @@ -9,12 +9,12 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mq/kafka" "go.mongodb.org/mongo-driver/mongo" ) diff --git a/pkg/common/storage/controller/push.go b/pkg/common/storage/controller/push.go index 91ef126fe..a805eaf00 100644 --- a/pkg/common/storage/controller/push.go +++ b/pkg/common/storage/controller/push.go @@ -19,10 +19,10 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/protocol/push" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mq/kafka" ) type PushDatabase interface { diff --git a/pkg/common/storage/controller/s3.go b/pkg/common/storage/controller/s3.go index 6693d2dde..30d8d20ec 100644 --- a/pkg/common/storage/controller/s3.go +++ b/pkg/common/storage/controller/s3.go @@ -30,7 +30,7 @@ import ( ) type S3Database interface { - PartLimit() *s3.PartLimit + PartLimit() (*s3.PartLimit, error) PartSize(ctx context.Context, size int64) (int64, error) AuthSign(ctx context.Context, uploadID string, partNumbers []int) (*s3.AuthSignResult, error) InitiateMultipartUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int) (*cont.InitiateUploadResult, error) @@ -65,7 +65,7 @@ func (s *s3Database) PartSize(ctx context.Context, size int64) (int64, error) { return s.s3.PartSize(ctx, size) } -func (s *s3Database) PartLimit() *s3.PartLimit { +func (s *s3Database) PartLimit() (*s3.PartLimit, error) { return s.s3.PartLimit() } diff --git a/pkg/common/storage/kafka/config.go b/pkg/common/storage/kafka/config.go new file mode 100644 index 000000000..1c9c4b0a0 --- /dev/null +++ b/pkg/common/storage/kafka/config.go @@ -0,0 +1,33 @@ +// Copyright © 2024 OpenIM open source community. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +type TLSConfig struct { + EnableTLS bool `yaml:"enableTLS"` + CACrt string `yaml:"caCrt"` + ClientCrt string `yaml:"clientCrt"` + ClientKey string `yaml:"clientKey"` + ClientKeyPwd string `yaml:"clientKeyPwd"` + InsecureSkipVerify bool `yaml:"insecureSkipVerify"` +} + +type Config struct { + Username string `yaml:"username"` + Password string `yaml:"password"` + ProducerAck string `yaml:"producerAck"` + CompressType string `yaml:"compressType"` + Addr []string `yaml:"addr"` + TLS TLSConfig `yaml:"tls"` +} diff --git a/pkg/common/storage/kafka/consumer_group.go b/pkg/common/storage/kafka/consumer_group.go new file mode 100644 index 000000000..f0e84bbc9 --- /dev/null +++ b/pkg/common/storage/kafka/consumer_group.go @@ -0,0 +1,68 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + "errors" + + "github.com/IBM/sarama" + "github.com/openimsdk/tools/log" +) + +type MConsumerGroup struct { + sarama.ConsumerGroup + groupID string + topics []string +} + +func NewMConsumerGroup(conf *Config, groupID string, topics []string, autoCommitEnable bool) (*MConsumerGroup, error) { + config, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, autoCommitEnable) + if err != nil { + return nil, err + } + group, err := NewConsumerGroup(config, conf.Addr, groupID) + if err != nil { + return nil, err + } + return &MConsumerGroup{ + ConsumerGroup: group, + groupID: groupID, + topics: topics, + }, nil +} + +func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context { + return GetContextWithMQHeader(cMsg.Headers) +} + +func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) { + for { + err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler) + if errors.Is(err, sarama.ErrClosedConsumerGroup) { + return + } + if errors.Is(err, context.Canceled) { + return + } + if err != nil { + log.ZWarn(ctx, "consume err", err, "topic", mc.topics, "groupID", mc.groupID) + } + } +} + +func (mc *MConsumerGroup) Close() error { + return mc.ConsumerGroup.Close() +} diff --git a/pkg/common/storage/kafka/producer.go b/pkg/common/storage/kafka/producer.go new file mode 100644 index 000000000..5f6be29ed --- /dev/null +++ b/pkg/common/storage/kafka/producer.go @@ -0,0 +1,82 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + "github.com/IBM/sarama" + "github.com/openimsdk/tools/errs" + "google.golang.org/protobuf/proto" +) + +// Producer represents a Kafka producer. +type Producer struct { + addr []string + topic string + config *sarama.Config + producer sarama.SyncProducer +} + +func NewKafkaProducer(config *sarama.Config, addr []string, topic string) (*Producer, error) { + producer, err := NewProducer(config, addr) + if err != nil { + return nil, err + } + return &Producer{ + addr: addr, + topic: topic, + config: config, + producer: producer, + }, nil +} + +// SendMessage sends a message to the Kafka topic configured in the Producer. +func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error) { + // Marshal the protobuf message + bMsg, err := proto.Marshal(msg) + if err != nil { + return 0, 0, errs.WrapMsg(err, "kafka proto Marshal err") + } + if len(bMsg) == 0 { + return 0, 0, errs.WrapMsg(errEmptyMsg, "kafka proto Marshal err") + } + + // Prepare Kafka message + kMsg := &sarama.ProducerMessage{ + Topic: p.topic, + Key: sarama.StringEncoder(key), + Value: sarama.ByteEncoder(bMsg), + } + + // Validate message key and value + if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 { + return 0, 0, errs.Wrap(errEmptyMsg) + } + + // Attach context metadata as headers + header, err := GetMQHeaderWithContext(ctx) + if err != nil { + return 0, 0, err + } + kMsg.Headers = header + + // Send the message + partition, offset, err := p.producer.SendMessage(kMsg) + if err != nil { + return 0, 0, errs.WrapMsg(err, "p.producer.SendMessage error") + } + + return partition, offset, nil +} diff --git a/pkg/common/storage/kafka/sarama.go b/pkg/common/storage/kafka/sarama.go new file mode 100644 index 000000000..23220b4d0 --- /dev/null +++ b/pkg/common/storage/kafka/sarama.go @@ -0,0 +1,85 @@ +package kafka + +import ( + "bytes" + "strings" + + "github.com/IBM/sarama" + "github.com/openimsdk/tools/errs" +) + +func BuildConsumerGroupConfig(conf *Config, initial int64, autoCommitEnable bool) (*sarama.Config, error) { + kfk := sarama.NewConfig() + kfk.Version = sarama.V2_0_0_0 + kfk.Consumer.Offsets.Initial = initial + kfk.Consumer.Offsets.AutoCommit.Enable = autoCommitEnable + kfk.Consumer.Return.Errors = false + if conf.Username != "" || conf.Password != "" { + kfk.Net.SASL.Enable = true + kfk.Net.SASL.User = conf.Username + kfk.Net.SASL.Password = conf.Password + } + if conf.TLS.EnableTLS { + tls, err := newTLSConfig(conf.TLS.ClientCrt, conf.TLS.ClientKey, conf.TLS.CACrt, []byte(conf.TLS.ClientKeyPwd), conf.TLS.InsecureSkipVerify) + if err != nil { + return nil, err + } + kfk.Net.TLS.Config = tls + kfk.Net.TLS.Enable = true + } + return kfk, nil +} + +func NewConsumerGroup(conf *sarama.Config, addr []string, groupID string) (sarama.ConsumerGroup, error) { + cg, err := sarama.NewConsumerGroup(addr, groupID, conf) + if err != nil { + return nil, errs.WrapMsg(err, "NewConsumerGroup failed", "addr", addr, "groupID", groupID, "conf", *conf) + } + return cg, nil +} + +func BuildProducerConfig(conf Config) (*sarama.Config, error) { + kfk := sarama.NewConfig() + kfk.Producer.Return.Successes = true + kfk.Producer.Return.Errors = true + kfk.Producer.Partitioner = sarama.NewHashPartitioner + if conf.Username != "" || conf.Password != "" { + kfk.Net.SASL.Enable = true + kfk.Net.SASL.User = conf.Username + kfk.Net.SASL.Password = conf.Password + } + switch strings.ToLower(conf.ProducerAck) { + case "no_response": + kfk.Producer.RequiredAcks = sarama.NoResponse + case "wait_for_local": + kfk.Producer.RequiredAcks = sarama.WaitForLocal + case "wait_for_all": + kfk.Producer.RequiredAcks = sarama.WaitForAll + default: + kfk.Producer.RequiredAcks = sarama.WaitForAll + } + if conf.CompressType == "" { + kfk.Producer.Compression = sarama.CompressionNone + } else { + if err := kfk.Producer.Compression.UnmarshalText(bytes.ToLower([]byte(conf.CompressType))); err != nil { + return nil, errs.WrapMsg(err, "UnmarshalText failed", "compressType", conf.CompressType) + } + } + if conf.TLS.EnableTLS { + tls, err := newTLSConfig(conf.TLS.ClientCrt, conf.TLS.ClientKey, conf.TLS.CACrt, []byte(conf.TLS.ClientKeyPwd), conf.TLS.InsecureSkipVerify) + if err != nil { + return nil, err + } + kfk.Net.TLS.Config = tls + kfk.Net.TLS.Enable = true + } + return kfk, nil +} + +func NewProducer(conf *sarama.Config, addr []string) (sarama.SyncProducer, error) { + producer, err := sarama.NewSyncProducer(addr, conf) + if err != nil { + return nil, errs.WrapMsg(err, "NewSyncProducer failed", "addr", addr, "conf", *conf) + } + return producer, nil +} diff --git a/pkg/common/storage/kafka/tls.go b/pkg/common/storage/kafka/tls.go new file mode 100644 index 000000000..00c89dcc1 --- /dev/null +++ b/pkg/common/storage/kafka/tls.go @@ -0,0 +1,83 @@ +// Copyright © 2024 OpenIM open source community. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "crypto/tls" + "crypto/x509" + "encoding/pem" + "os" + + "github.com/openimsdk/tools/errs" +) + +// decryptPEM decrypts a PEM block using a password. +func decryptPEM(data []byte, passphrase []byte) ([]byte, error) { + if len(passphrase) == 0 { + return data, nil + } + b, _ := pem.Decode(data) + d, err := x509.DecryptPEMBlock(b, passphrase) + if err != nil { + return nil, errs.WrapMsg(err, "DecryptPEMBlock failed") + } + return pem.EncodeToMemory(&pem.Block{ + Type: b.Type, + Bytes: d, + }), nil +} + +func readEncryptablePEMBlock(path string, pwd []byte) ([]byte, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, errs.WrapMsg(err, "ReadFile failed", "path", path) + } + return decryptPEM(data, pwd) +} + +// newTLSConfig setup the TLS config from general config file. +func newTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byte, insecureSkipVerify bool) (*tls.Config, error) { + var tlsConfig tls.Config + if clientCertFile != "" && clientKeyFile != "" { + certPEMBlock, err := os.ReadFile(clientCertFile) + if err != nil { + return nil, errs.WrapMsg(err, "ReadFile failed", "clientCertFile", clientCertFile) + } + keyPEMBlock, err := readEncryptablePEMBlock(clientKeyFile, keyPwd) + if err != nil { + return nil, err + } + + cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock) + if err != nil { + return nil, errs.WrapMsg(err, "X509KeyPair failed") + } + tlsConfig.Certificates = []tls.Certificate{cert} + } + + if caCertFile != "" { + caCert, err := os.ReadFile(caCertFile) + if err != nil { + return nil, errs.WrapMsg(err, "ReadFile failed", "caCertFile", caCertFile) + } + caCertPool := x509.NewCertPool() + if ok := caCertPool.AppendCertsFromPEM(caCert); !ok { + return nil, errs.New("AppendCertsFromPEM failed") + } + tlsConfig.RootCAs = caCertPool + } + tlsConfig.InsecureSkipVerify = insecureSkipVerify + return &tlsConfig, nil +} diff --git a/pkg/common/storage/kafka/util.go b/pkg/common/storage/kafka/util.go new file mode 100644 index 000000000..61abe5450 --- /dev/null +++ b/pkg/common/storage/kafka/util.go @@ -0,0 +1,34 @@ +package kafka + +import ( + "context" + "errors" + "github.com/IBM/sarama" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/tools/mcontext" +) + +var errEmptyMsg = errors.New("kafka binary msg is empty") + +// GetMQHeaderWithContext extracts message queue headers from the context. +func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) { + operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx) + if err != nil { + return nil, err + } + return []sarama.RecordHeader{ + {Key: []byte(constant.OperationID), Value: []byte(operationID)}, + {Key: []byte(constant.OpUserID), Value: []byte(opUserID)}, + {Key: []byte(constant.OpUserPlatform), Value: []byte(platform)}, + {Key: []byte(constant.ConnID), Value: []byte(connID)}, + }, nil +} + +// GetContextWithMQHeader creates a context from message queue headers. +func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context { + var values []string + for _, recordHeader := range header { + values = append(values, string(recordHeader.Value)) + } + return mcontext.WithMustInfoCtx(values) // Attach extracted values to context +} diff --git a/pkg/common/storage/kafka/verify.go b/pkg/common/storage/kafka/verify.go new file mode 100644 index 000000000..0a09eed4e --- /dev/null +++ b/pkg/common/storage/kafka/verify.go @@ -0,0 +1,79 @@ +// Copyright © 2024 OpenIM open source community. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + "fmt" + + "github.com/IBM/sarama" + "github.com/openimsdk/tools/errs" +) + +func CheckTopics(ctx context.Context, conf *Config, topics []string) error { + kfk, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, false) + if err != nil { + return err + } + cli, err := sarama.NewClient(conf.Addr, kfk) + if err != nil { + return errs.WrapMsg(err, "NewClient failed", "config: ", fmt.Sprintf("%+v", conf)) + } + defer cli.Close() + + existingTopics, err := cli.Topics() + if err != nil { + return errs.WrapMsg(err, "Failed to list topics") + } + + existingTopicsMap := make(map[string]bool) + for _, t := range existingTopics { + existingTopicsMap[t] = true + } + + for _, topic := range topics { + if !existingTopicsMap[topic] { + return errs.New("topic not exist", "topic", topic).Wrap() + } + } + return nil +} + +func CheckHealth(ctx context.Context, conf *Config) error { + kfk, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, false) + if err != nil { + return err + } + cli, err := sarama.NewClient(conf.Addr, kfk) + if err != nil { + return errs.WrapMsg(err, "NewClient failed", "config: ", fmt.Sprintf("%+v", conf)) + } + defer cli.Close() + + // Get broker list + brokers := cli.Brokers() + if len(brokers) == 0 { + return errs.New("no brokers found").Wrap() + } + + // Check if all brokers are reachable + for _, broker := range brokers { + if err := broker.Open(kfk); err != nil { + return errs.WrapMsg(err, "failed to open broker", "broker", broker.Addr()) + } + } + + return nil +} diff --git a/tools/check-component/main.go b/tools/check-component/main.go index 9df0da7de..ad53c1921 100644 --- a/tools/check-component/main.go +++ b/tools/check-component/main.go @@ -25,11 +25,11 @@ import ( "time" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/discovery/zookeeper" - "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/s3/minio" "github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/utils/runtimeenv" @@ -84,7 +84,7 @@ func initConfig(configDir string) (*config.Mongo, *config.Redis, *config.Kafka, discovery = &config.Discovery{} thirdConfig = &config.Third{} ) - runtimeEnv := runtimeenv.PrintRuntimeEnvironment() + runtimeEnv := runtimeenv.RuntimeEnvironment() err := config.Load(configDir, config.MongodbConfigFileName, config.EnvPrefixMap[config.MongodbConfigFileName], runtimeEnv, mongoConfig) if err != nil { diff --git a/tools/seq/internal/main.go b/tools/seq/internal/main.go index 574e7cef9..7e5d5598c 100644 --- a/tools/seq/internal/main.go +++ b/tools/seq/internal/main.go @@ -43,7 +43,7 @@ const ( ) func readConfig[T any](dir string, name string) (*T, error) { - if runtimeenv.PrintRuntimeEnvironment() == config.KUBERNETES { + if runtimeenv.RuntimeEnvironment() == config.KUBERNETES { dir = os.Getenv(config.MountConfigFilePath) } v := viper.New()