{{- /* Copyright VMware, Inc. SPDX-License-Identifier: APACHE-2.0 */}} {{- $replicaCount := int .Values.controller.replicaCount }} {{- if and .Values.kraft.enabled (gt $replicaCount 0) }} apiVersion: {{ include "common.capabilities.statefulset.apiVersion" . }} kind: StatefulSet metadata: name: {{ printf "%s-controller" (include "common.names.fullname" .) }} namespace: {{ include "common.names.namespace" . | quote }} labels: {{- include "common.labels.standard" ( dict "customLabels" .Values.commonLabels "context" $ ) | nindent 4 }} app.kubernetes.io/component: controller-eligible app.kubernetes.io/part-of: kafka {{- if .Values.commonAnnotations }} annotations: {{- include "common.tplvalues.render" ( dict "value" .Values.commonAnnotations "context" $ ) | nindent 4 }} {{- end }} spec: podManagementPolicy: {{ .Values.controller.podManagementPolicy }} replicas: {{ .Values.controller.replicaCount }} {{- $podLabels := include "common.tplvalues.merge" ( dict "values" ( list .Values.controller.podLabels .Values.commonLabels ) "context" . ) }} selector: matchLabels: {{- include "common.labels.matchLabels" ( dict "customLabels" $podLabels "context" $ ) | nindent 6 }} app.kubernetes.io/component: controller-eligible app.kubernetes.io/part-of: kafka serviceName: {{ printf "%s-controller-headless" (include "common.names.fullname" .) | trunc 63 | trimSuffix "-" }} updateStrategy: {{- include "common.tplvalues.render" (dict "value" .Values.controller.updateStrategy "context" $ ) | nindent 4 }} template: metadata: labels: {{- include "common.labels.standard" ( dict "customLabels" $podLabels "context" $ ) | nindent 8 }} app.kubernetes.io/component: controller-eligible app.kubernetes.io/part-of: kafka annotations: {{- if (include "kafka.controller.createConfigmap" .) }} checksum/configuration: {{ include (print $.Template.BasePath "/controller-eligible/configmap.yaml") . | sha256sum }} {{- end }} {{- if (include "kafka.createSaslSecret" .) }} checksum/passwords-secret: {{ include (print $.Template.BasePath "/secrets.yaml") . | sha256sum }} {{- end }} {{- if (include "kafka.createTlsSecret" .) }} checksum/tls-secret: {{ include (print $.Template.BasePath "/tls-secret.yaml") . | sha256sum }} {{- end }} {{- if (include "kafka.metrics.jmx.createConfigmap" .) }} checksum/jmx-configuration: {{ include (print $.Template.BasePath "/metrics/jmx-configmap.yaml") . | sha256sum }} {{- end }} {{- if .Values.controller.podAnnotations }} {{- include "common.tplvalues.render" (dict "value" .Values.controller.podAnnotations "context" $) | nindent 8 }} {{- end }} spec: {{- include "kafka.imagePullSecrets" . | nindent 6 }} {{- if .Values.controller.hostAliases }} hostAliases: {{- include "common.tplvalues.render" (dict "value" .Values.controller.hostAliases "context" $) | nindent 8 }} {{- end }} hostNetwork: {{ .Values.controller.hostNetwork }} hostIPC: {{ .Values.controller.hostIPC }} {{- if .Values.controller.schedulerName }} schedulerName: {{ .Values.controller.schedulerName | quote }} {{- end }} {{- if .Values.controller.affinity }} affinity: {{- include "common.tplvalues.render" (dict "value" .Values.controller.affinity "context" $) | nindent 8 }} {{- else }} affinity: podAffinity: {{- include "common.affinities.pods" (dict "type" .Values.controller.podAffinityPreset "component" "controller-eligible" "customLabels" $podLabels "context" $) | nindent 10 }} podAntiAffinity: {{- include "common.affinities.pods" (dict "type" .Values.controller.podAntiAffinityPreset "component" "controller-eligible" "customLabels" $podLabels "context" $) | nindent 10 }} nodeAffinity: {{- include "common.affinities.nodes" (dict "type" .Values.controller.nodeAffinityPreset.type "key" .Values.controller.nodeAffinityPreset.key "values" .Values.controller.nodeAffinityPreset.values) | nindent 10 }} {{- end }} {{- if .Values.controller.nodeSelector }} nodeSelector: {{- include "common.tplvalues.render" (dict "value" .Values.controller.nodeSelector "context" $) | nindent 8 }} {{- end }} {{- if .Values.controller.tolerations }} tolerations: {{- include "common.tplvalues.render" (dict "value" .Values.controller.tolerations "context" $) | nindent 8 }} {{- end }} {{- if .Values.controller.topologySpreadConstraints }} topologySpreadConstraints: {{- include "common.tplvalues.render" (dict "value" .Values.controller.topologySpreadConstraints "context" $) | nindent 8 }} {{- end }} {{- if .Values.controller.terminationGracePeriodSeconds }} terminationGracePeriodSeconds: {{ .Values.controller.terminationGracePeriodSeconds }} {{- end }} {{- if .Values.controller.priorityClassName }} priorityClassName: {{ .Values.controller.priorityClassName }} {{- end }} {{- if .Values.controller.runtimeClassName }} runtimeClassName: {{ .Values.controller.runtimeClassName }} {{- end }} {{- if .Values.controller.podSecurityContext.enabled }} securityContext: {{- omit .Values.controller.podSecurityContext "enabled" | toYaml | nindent 8 }} {{- end }} serviceAccountName: {{ include "kafka.serviceAccountName" . }} enableServiceLinks: {{ .Values.controller.enableServiceLinks }} initContainers: {{- if and .Values.volumePermissions.enabled .Values.controller.persistence.enabled }} - name: volume-permissions image: {{ include "kafka.volumePermissions.image" . }} imagePullPolicy: {{ .Values.volumePermissions.image.pullPolicy | quote }} command: - /bin/bash args: - -ec - | mkdir -p "{{ .Values.controller.persistence.mountPath }}" "{{ .Values.controller.logPersistence.mountPath }}" chown -R {{ .Values.controller.containerSecurityContext.runAsUser }}:{{ .Values.controller.podSecurityContext.fsGroup }} "{{ .Values.controller.persistence.mountPath }}" "{{ .Values.controller.logPersistence.mountPath }}" find "{{ .Values.controller.persistence.mountPath }}" -mindepth 1 -maxdepth 1 -not -name ".snapshot" -not -name "lost+found" | xargs -r chown -R {{ .Values.controller.containerSecurityContext.runAsUser }}:{{ .Values.controller.podSecurityContext.fsGroup }} find "{{ .Values.controller.logPersistence.mountPath }}" -mindepth 1 -maxdepth 1 -not -name ".snapshot" -not -name "lost+found" | xargs -r chown -R {{ .Values.controller.containerSecurityContext.runAsUser }}:{{ .Values.controller.podSecurityContext.fsGroup }} {{- if eq ( toString ( .Values.volumePermissions.containerSecurityContext.runAsUser )) "auto" }} securityContext: {{- omit .Values.volumePermissions.containerSecurityContext "runAsUser" | toYaml | nindent 12 }} {{- else }} securityContext: {{- .Values.volumePermissions.containerSecurityContext | toYaml | nindent 12 }} {{- end }} {{- if .Values.volumePermissions.resources }} resources: {{- toYaml .Values.volumePermissions.resources | nindent 12 }} {{- end }} volumeMounts: - name: data mountPath: {{ .Values.controller.persistence.mountPath }} - name: logs mountPath: {{ .Values.controller.logPersistence.mountPath }} {{- end }} {{- if and .Values.externalAccess.enabled .Values.externalAccess.autoDiscovery.enabled (or .Values.externalAccess.controller.forceExpose (not .Values.controller.controllerOnly))}} {{- include "kafka.autoDiscoveryInitContainer" ( dict "role" "controller" "context" $) | nindent 8 }} {{- end }} {{- include "kafka.prepareKafkaInitContainer" ( dict "role" "controller" "context" $) | nindent 8 }} {{- if .Values.controller.initContainers }} {{- include "common.tplvalues.render" ( dict "value" .Values.controller.initContainers "context" $ ) | nindent 8 }} {{- end }} {{- if .Values.initContainers }} {{- include "common.tplvalues.render" ( dict "value" .Values.initContainers "context" $ ) | nindent 8 }} {{- end }} containers: - name: kafka image: {{ include "kafka.image" . }} imagePullPolicy: {{ .Values.image.pullPolicy | quote }} {{- if .Values.controller.containerSecurityContext.enabled }} securityContext: {{- omit .Values.controller.containerSecurityContext "enabled" | toYaml | nindent 12 }} {{- end }} {{- if .Values.diagnosticMode.enabled }} command: {{- include "common.tplvalues.render" (dict "value" .Values.diagnosticMode.command "context" $) | nindent 12 }} {{- else if .Values.controller.command }} command: {{- include "common.tplvalues.render" (dict "value" .Values.controller.command "context" $) | nindent 12 }} {{- end }} {{- if .Values.diagnosticMode.enabled }} args: {{- include "common.tplvalues.render" (dict "value" .Values.diagnosticMode.args "context" $) | nindent 12 }} {{- else if .Values.controller.args }} args: {{- include "common.tplvalues.render" (dict "value" .Values.controller.args "context" $) | nindent 12 }} {{- end }} env: - name: BITNAMI_DEBUG value: {{ ternary "true" "false" (or .Values.image.debug .Values.diagnosticMode.enabled) | quote }} - name: KAFKA_HEAP_OPTS value: {{ coalesce .Values.controller.heapOpts .Values.heapOpts | quote }} - name: KAFKA_KRAFT_CLUSTER_ID valueFrom: secretKeyRef: name: {{ printf "%s-kraft-cluster-id" (include "common.names.fullname" .) }} key: kraft-cluster-id {{- if and (include "kafka.saslEnabled" .) (or (regexFind "SCRAM" (upper .Values.sasl.enabledMechanisms)) (regexFind "SCRAM" (upper .Values.sasl.controllerMechanism)) (regexFind "SCRAM" (upper .Values.sasl.interBrokerMechanism))) }} - name: KAFKA_KRAFT_BOOTSTRAP_SCRAM_USERS value: "true" {{- if and (include "kafka.client.saslEnabled" . ) .Values.sasl.client.users }} - name: KAFKA_CLIENT_USERS value: {{ join "," .Values.sasl.client.users | quote }} - name: KAFKA_CLIENT_PASSWORDS valueFrom: secretKeyRef: name: {{ include "kafka.saslSecretName" . }} key: client-passwords {{- end }} {{- if regexFind "SASL" (upper .Values.listeners.interbroker.protocol) }} - name: KAFKA_INTER_BROKER_USER value: {{ .Values.sasl.interbroker.user | quote }} - name: KAFKA_INTER_BROKER_PASSWORD valueFrom: secretKeyRef: name: {{ include "kafka.saslSecretName" . }} key: inter-broker-password {{- end }} {{- if regexFind "SASL" (upper .Values.listeners.controller.protocol) }} - name: KAFKA_CONTROLLER_USER value: {{ .Values.sasl.controller.user | quote }} - name: KAFKA_CONTROLLER_PASSWORD valueFrom: secretKeyRef: name: {{ include "kafka.saslSecretName" . }} key: controller-password {{- end }} {{- end }} {{- if .Values.metrics.jmx.enabled }} - name: JMX_PORT value: {{ .Values.metrics.jmx.kafkaJmxPort | quote }} {{- end }} {{- if .Values.controller.extraEnvVars }} {{- include "common.tplvalues.render" ( dict "value" .Values.controller.extraEnvVars "context" $) | nindent 12 }} {{- end }} {{- if .Values.extraEnvVars }} {{- include "common.tplvalues.render" ( dict "value" .Values.extraEnvVars "context" $) | nindent 12 }} {{- end }} {{- if or .Values.controller.extraEnvVarsCM .Values.extraEnvVarsCM .Values.controller.extraEnvVarsSecret .Values.extraEnvVarsSecret }} envFrom: {{- if .Values.controller.extraEnvVarsCM }} - configMapRef: name: {{ include "common.tplvalues.render" (dict "value" .Values.controller.extraEnvVarsCM "context" $) }} {{- end }} {{- if .Values.extraEnvVarsCM }} - configMapRef: name: {{ include "common.tplvalues.render" (dict "value" .Values.extraEnvVarsCM "context" $) }} {{- end }} {{- if .Values.controller.extraEnvVarsSecret }} - secretRef: name: {{ include "common.tplvalues.render" (dict "value" .Values.controller.extraEnvVarsSecret "context" $) }} {{- end }} {{- if .Values.extraEnvVarsSecret }} - secretRef: name: {{ include "common.tplvalues.render" (dict "value" .Values.extraEnvVarsSecret "context" $) }} {{- end }} {{- end }} ports: - name: controller containerPort: {{ .Values.listeners.controller.containerPort }} {{- if not .Values.controller.controllerOnly }} - name: client containerPort: {{ .Values.listeners.client.containerPort }} - name: interbroker containerPort: {{ .Values.listeners.interbroker.containerPort }} {{- if .Values.externalAccess.enabled }} - name: external containerPort: {{ .Values.listeners.external.containerPort }} {{- end }} {{- if .Values.listeners.extraListeners }} {{- include "kafka.extraListeners.containerPorts" . | nindent 12 }} {{- end }} {{- end }} {{- if .Values.controller.extraContainerPorts }} {{- include "common.tplvalues.render" (dict "value" .Values.controller.extraContainerPorts "context" $) | nindent 12 }} {{- end }} {{- if not .Values.diagnosticMode.enabled }} {{- if .Values.controller.customLivenessProbe }} livenessProbe: {{- include "common.tplvalues.render" (dict "value" .Values.controller.customLivenessProbe "context" $) | nindent 12 }} {{- else if .Values.controller.livenessProbe.enabled }} livenessProbe: {{- include "common.tplvalues.render" (dict "value" (omit .Values.controller.livenessProbe "enabled") "context" $) | nindent 12 }} tcpSocket: port: "controller" {{- end }} {{- if .Values.controller.customReadinessProbe }} readinessProbe: {{- include "common.tplvalues.render" (dict "value" .Values.controller.customReadinessProbe "context" $) | nindent 12 }} {{- else if .Values.controller.readinessProbe.enabled }} readinessProbe: {{- include "common.tplvalues.render" (dict "value" (omit .Values.controller.readinessProbe "enabled") "context" $) | nindent 12 }} tcpSocket: port: "controller" {{- end }} {{- if .Values.controller.customStartupProbe }} startupProbe: {{- include "common.tplvalues.render" (dict "value" .Values.controller.customStartupProbe "context" $) | nindent 12 }} {{- else if .Values.controller.startupProbe.enabled }} startupProbe: {{- include "common.tplvalues.render" (dict "value" (omit .Values.controller.startupProbe "enabled") "context" $) | nindent 12 }} tcpSocket: port: "controller" {{- end }} {{- end }} {{- if .Values.controller.lifecycleHooks }} lifecycle: {{- include "common.tplvalues.render" (dict "value" .Values.controller.lifecycleHooks "context" $) | nindent 12 }} {{- end }} {{- if .Values.controller.resources }} resources: {{- toYaml .Values.controller.resources | nindent 12 }} {{- end }} volumeMounts: - name: data mountPath: {{ .Values.controller.persistence.mountPath }} - name: logs mountPath: {{ .Values.controller.logPersistence.mountPath }} - name: kafka-config mountPath: /opt/bitnami/kafka/config/server.properties subPath: server.properties - name: tmp mountPath: /tmp {{- if or .Values.log4j .Values.existingLog4jConfigMap }} - name: log4j-config mountPath: /opt/bitnami/kafka/config/log4j.properties subPath: log4j.properties {{- end }} {{- if or .Values.tls.zookeeper.enabled (include "kafka.sslEnabled" .) }} - name: kafka-shared-certs mountPath: /opt/bitnami/kafka/config/certs readOnly: true {{- end }} {{- if .Values.extraVolumeMounts }} {{- include "common.tplvalues.render" (dict "value" .Values.extraVolumeMounts "context" $) | nindent 12 }} {{- end }} {{- if .Values.controller.extraVolumeMounts }} {{- include "common.tplvalues.render" (dict "value" .Values.controller.extraVolumeMounts "context" $) | nindent 12 }} {{- end }} {{- if .Values.metrics.jmx.enabled }} - name: jmx-exporter image: {{ include "kafka.metrics.jmx.image" . }} imagePullPolicy: {{ .Values.metrics.jmx.image.pullPolicy | quote }} {{- if .Values.metrics.jmx.containerSecurityContext.enabled }} securityContext: {{- omit .Values.metrics.jmx.containerSecurityContext "enabled" | toYaml | nindent 12 }} {{- end }} {{- if .Values.diagnosticMode.enabled }} command: {{- include "common.tplvalues.render" (dict "value" .Values.diagnosticMode.command "context" $) | nindent 12 }} args: {{- include "common.tplvalues.render" (dict "value" .Values.diagnosticMode.args "context" $) | nindent 12 }} {{- else }} command: - java args: - -XX:MaxRAMPercentage=100 - -XshowSettings:vm - -jar - jmx_prometheus_httpserver.jar - "5556" - /etc/jmx-kafka/jmx-kafka-prometheus.yml {{- end }} ports: - name: metrics containerPort: {{ .Values.metrics.jmx.containerPorts.metrics }} {{- if .Values.metrics.jmx.resources }} resources: {{- toYaml .Values.metrics.jmx.resources | nindent 12 }} {{- end }} volumeMounts: - name: jmx-config mountPath: /etc/jmx-kafka {{- end }} {{- if .Values.controller.sidecars }} {{- include "common.tplvalues.render" (dict "value" .Values.controller.sidecars "context" $) | nindent 8 }} {{- end }} {{- if .Values.sidecars }} {{- include "common.tplvalues.render" (dict "value" .Values.sidecars "context" $) | nindent 8 }} {{- end }} volumes: - name: kafka-configmaps configMap: name: {{ include "kafka.controller.configmapName" . }} - name: kafka-secret-config {{- if (include "kafka.controller.secretConfigExists" .) }} secret: secretName: {{ include "kafka.controller.secretConfigName" . }} {{- else }} emptyDir: {} {{- end }} - name: kafka-config emptyDir: {} - name: tmp emptyDir: {} - name: scripts configMap: name: {{ include "common.names.fullname" . }}-scripts defaultMode: 0755 {{- if and .Values.externalAccess.enabled .Values.externalAccess.autoDiscovery.enabled }} - name: kafka-autodiscovery-shared emptyDir: {} {{- end }} {{- if or .Values.log4j .Values.existingLog4jConfigMap }} - name: log4j-config configMap: name: {{ include "kafka.log4j.configMapName" . }} {{- end }} {{- if .Values.metrics.jmx.enabled }} - name: jmx-config configMap: name: {{ include "kafka.metrics.jmx.configmapName" . }} {{- end }} {{- if or .Values.tls.zookeeper.enabled (include "kafka.sslEnabled" .) }} - name: kafka-shared-certs emptyDir: {} {{- if and (include "kafka.sslEnabled" .) (or .Values.tls.existingSecret .Values.tls.autoGenerated) }} - name: kafka-certs projected: defaultMode: 256 sources: - secret: name: {{ include "kafka.tlsSecretName" . }} {{- if .Values.tls.jksTruststoreSecret }} - secret: name: {{ .Values.tls.jksTruststoreSecret }} {{- end }} {{- end }} {{- if and .Values.tls.zookeeper.enabled .Values.tls.zookeeper.existingSecret }} - name: kafka-zookeeper-cert secret: secretName: {{ .Values.tls.zookeeper.existingSecret }} defaultMode: 256 {{- end }} {{- end }} {{- if .Values.extraVolumes }} {{- include "common.tplvalues.render" (dict "value" .Values.extraVolumes "context" $) | nindent 8 }} {{- end }} {{- if .Values.controller.extraVolumes }} {{- include "common.tplvalues.render" (dict "value" .Values.controller.extraVolumes "context" $) | nindent 8 }} {{- end }} {{- if not .Values.controller.persistence.enabled }} - name: data emptyDir: {} {{- else if .Values.controller.persistence.existingClaim }} - name: data persistentVolumeClaim: claimName: {{ printf "%s" (tpl .Values.controller.persistence.existingClaim .) }} {{- end }} {{- if not .Values.controller.logPersistence.enabled }} - name: logs emptyDir: {} {{- else if .Values.controller.logPersistence.existingClaim }} - name: logs persistentVolumeClaim: claimName: {{ printf "%s" (tpl .Values.controller.logPersistence.existingClaim .) }} {{- end }} {{- if or (and .Values.controller.persistence.enabled (not .Values.controller.persistence.existingClaim)) (and .Values.controller.logPersistence.enabled (not .Values.controller.logPersistence.existingClaim)) }} volumeClaimTemplates: {{- if and .Values.controller.persistence.enabled (not .Values.controller.persistence.existingClaim) }} - metadata: name: data {{- if .Values.controller.persistence.annotations }} annotations: {{- include "common.tplvalues.render" (dict "value" .Values.controller.persistence.annotations "context" $) | nindent 10 }} {{- end }} {{- if .Values.controller.persistence.labels }} labels: {{- include "common.tplvalues.render" (dict "value" .Values.controller.persistence.labels "context" $) | nindent 10 }} {{- end }} spec: accessModes: {{- range .Values.controller.persistence.accessModes }} - {{ . | quote }} {{- end }} resources: requests: storage: {{ .Values.controller.persistence.size | quote }} {{- include "common.storage.class" (dict "persistence" .Values.controller.persistence "global" .Values.global) | nindent 8 }} {{- if .Values.controller.persistence.selector }} selector: {{- include "common.tplvalues.render" (dict "value" .Values.controller.persistence.selector "context" $) | nindent 10 }} {{- end -}} {{- end }} {{- if and .Values.controller.logPersistence.enabled (not .Values.controller.logPersistence.existingClaim) }} - metadata: name: logs {{- if .Values.controller.logPersistence.annotations }} annotations: {{- include "common.tplvalues.render" (dict "value" .Values.controller.logPersistence.annotations "context" $) | nindent 10 }} {{- end }} spec: accessModes: {{- range .Values.controller.logPersistence.accessModes }} - {{ . | quote }} {{- end }} resources: requests: storage: {{ .Values.controller.logPersistence.size | quote }} {{- include "common.storage.class" (dict "persistence" .Values.controller.logPersistence "global" .Values.global) | nindent 8 }} {{- if .Values.controller.logPersistence.selector }} selector: {{- include "common.tplvalues.render" (dict "value" .Values.controller.logPersistence.selector "context" $) | nindent 10 }} {{- end -}} {{- end }} {{- end }} {{- end }}