diff --git a/docker-compose.yml b/docker-compose.yml index f6ff9c184..6070e881b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -99,15 +99,62 @@ services: environment: #KAFKA_HEAP_OPTS: "-Xms128m -Xmx256m" TZ: Asia/Shanghai + # Unique identifier for the Kafka node (required in controller mode) KAFKA_CFG_NODE_ID: 0 + # Defines the roles this Kafka node plays: broker, controller, or both KAFKA_CFG_PROCESS_ROLES: controller,broker + # Specifies which nodes are controller nodes for quorum voting. + # The syntax follows the KRaft mode (no ZooKeeper): node.id@host:port + # The controller listener endpoint here is kafka:9093 KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093 - KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 - KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT + # Specifies which listener is used for controller-to-controller communication KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + # Default number of partitions for new topics KAFKA_NUM_PARTITIONS: 8 + # Whether to enable automatic topic creation KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true" + # Kafka internal listeners; Kafka supports multiple ports with different protocols + # Each port is used for a specific purpose: INTERNAL for internal broker communication, + # CONTROLLER for controller communication, EXTERNAL for external client connections. + # These logical listener names are mapped to actual protocols via KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP + # In short, Kafka is listening on three logical ports: 9092 for internal communication, + # 9093 for controller traffic, and 9094 for external access. + KAFKA_CFG_LISTENERS: "INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9094" + # Addresses advertised to clients. INTERNAL://kafka:9092 uses the internal Docker service name 'kafka', + # so other containers can access Kafka via kafka:9092. + # EXTERNAL://localhost:19094 is the address external clients (e.g., in the LAN) should use to connect. + # If Kafka is deployed on a different machine than IM, 'localhost' should be replaced with the LAN IP. + KAFKA_CFG_ADVERTISED_LISTENERS: "INTERNAL://kafka:9092,EXTERNAL://localhost:19094" + # Maps logical listener names to actual protocols. + # Supported protocols include: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT" + # Defines which listener is used for inter-broker communication within the Kafka cluster + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "INTERNAL" + + + # Authentication configuration variables - comment out to disable auth + # KAFKA_USERNAME: "openIM" + # KAFKA_PASSWORD: "openIM123" + command: > + /bin/sh -c ' + if [ -n "$${KAFKA_USERNAME}" ] && [ -n "$${KAFKA_PASSWORD}" ]; then + echo "=== Kafka SASL Authentication ENABLED ===" + echo "Username: $${KAFKA_USERNAME}" + + # Set environment variables for SASL authentication + export KAFKA_CFG_LISTENERS="SASL_PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094" + export KAFKA_CFG_ADVERTISED_LISTENERS="SASL_PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094" + export KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP="CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT" + export KAFKA_CFG_SASL_ENABLED_MECHANISMS="PLAIN" + export KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL="PLAIN" + export KAFKA_CFG_INTER_BROKER_LISTENER_NAME="SASL_PLAINTEXT" + export KAFKA_CLIENT_USERS="$${KAFKA_USERNAME}" + export KAFKA_CLIENT_PASSWORDS="$${KAFKA_PASSWORD}" + fi + + # Start Kafka with the configured environment + exec /opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh + ' networks: - openim diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 7a173d67b..722b6e8b3 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -26,6 +26,9 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/tools/discovery" + "github.com/go-redis/redis" + "google.golang.org/protobuf/proto" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/IBM/sarama" @@ -41,7 +44,6 @@ import ( "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/stringutil" - "google.golang.org/protobuf/proto" ) const ( @@ -140,53 +142,48 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) { - var conversationID string - var userSeqMap map[string]int64 + // Outer map: conversationID -> (userID -> maxHasReadSeq) + conversationUserSeq := make(map[string]map[string]int64) + for _, msg := range msgs { if msg.message.ContentType != constant.HasReadReceipt { continue } var elem sdkws.NotificationElem if err := json.Unmarshal(msg.message.Content, &elem); err != nil { - log.ZWarn(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg) + log.ZWarn(ctx, "Unmarshal NotificationElem error", err, "msg", msg) continue } var tips sdkws.MarkAsReadTips if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil { - log.ZWarn(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg) + log.ZWarn(ctx, "Unmarshal MarkAsReadTips error", err, "msg", msg) continue } - //The conversation ID for each batch of messages processed by the batcher is the same. - conversationID = tips.ConversationID - if len(tips.Seqs) > 0 { - for _, seq := range tips.Seqs { - if tips.HasReadSeq < seq { - tips.HasReadSeq = seq - } - } - clear(tips.Seqs) - tips.Seqs = nil - } - if tips.HasReadSeq < 0 { + if len(tips.ConversationID) == 0 || tips.HasReadSeq < 0 { continue } - if userSeqMap == nil { - userSeqMap = make(map[string]int64) - } - if userSeqMap[tips.MarkAsReadUserID] > tips.HasReadSeq { - continue + // Calculate the max seq from tips.Seqs + for _, seq := range tips.Seqs { + if tips.HasReadSeq < seq { + tips.HasReadSeq = seq + } + } + + if _, ok := conversationUserSeq[tips.ConversationID]; !ok { + conversationUserSeq[tips.ConversationID] = make(map[string]int64) + } + if conversationUserSeq[tips.ConversationID][tips.MarkAsReadUserID] < tips.HasReadSeq { + conversationUserSeq[tips.ConversationID][tips.MarkAsReadUserID] = tips.HasReadSeq } - userSeqMap[tips.MarkAsReadUserID] = tips.HasReadSeq } - if userSeqMap == nil { - return - } - if len(conversationID) == 0 { - log.ZWarn(ctx, "conversation err", nil, "conversationID", conversationID) - } - if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, conversationID, userSeqMap); err != nil { - log.ZWarn(ctx, "set read seq to db error", err, "conversationID", conversationID, "userSeqMap", userSeqMap) + log.ZInfo(ctx, "doSetReadSeq", "conversationUserSeq", conversationUserSeq) + + // persist to db + for convID, userSeqMap := range conversationUserSeq { + if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, convID, userSeqMap); err != nil { + log.ZWarn(ctx, "SetHasReadSeqToDB error", err, "conversationID", convID, "userSeqMap", userSeqMap) + } } }