{{/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */}} ################################ ## Airflow Scheduler Deployment/StatefulSet ################################# {{- if .Values.scheduler.enabled }} {{- $local := contains "Local" .Values.executor }} # Is persistence enabled on the _workers_? # This is important because in $local mode, the scheduler assumes the role of the worker {{- $persistence := or .Values.workers.celery.persistence.enabled (and (not (has .Values.workers.celery.persistence.enabled (list true false))) .Values.workers.persistence.enabled) }} {{- $stateful := and $local $persistence }} # We can skip DAGs mounts on scheduler if dagProcessor is enabled, except with $local mode {{- $dagProcessorEnabled := .Values.dagProcessor.enabled }} {{- if eq $dagProcessorEnabled nil}} {{ $dagProcessorEnabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }} {{- end }} {{- $localOrDagProcessorDisabled := or (not $dagProcessorEnabled) $local }} {{- $remoteLogging := or .Values.elasticsearch.enabled .Values.opensearch.enabled }} {{- $nodeSelector := or .Values.scheduler.nodeSelector .Values.nodeSelector }} {{- $affinity := or .Values.scheduler.affinity .Values.affinity }} {{- $tolerations := or .Values.scheduler.tolerations .Values.tolerations }} {{- $topologySpreadConstraints := or .Values.scheduler.topologySpreadConstraints .Values.topologySpreadConstraints }} {{- $revisionHistoryLimit := include "airflow.revisionHistoryLimit" (list .Values.scheduler.revisionHistoryLimit .Values.revisionHistoryLimit) }} {{- $securityContext := include "airflowPodSecurityContext" (list .Values.scheduler .Values) }} {{- $containerSecurityContext := include "containerSecurityContext" (list .Values.scheduler .Values) }} {{- $containerSecurityContextWaitForMigrations := include "containerSecurityContext" (list .Values.scheduler.waitForMigrations .Values) }} {{- $containerSecurityContextLogGroomerSidecar := include "containerSecurityContext" (list .Values.scheduler.logGroomerSidecar .Values) }} {{- $containerLifecycleHooks := or .Values.scheduler.containerLifecycleHooks .Values.containerLifecycleHooks }} {{- $containerLifecycleHooksLogGroomerSidecar := or .Values.scheduler.logGroomerSidecar.containerLifecycleHooks .Values.containerLifecycleHooks }} apiVersion: apps/v1 kind: {{ if $stateful }}StatefulSet{{ else }}Deployment{{ end }} metadata: name: {{ include "airflow.fullname" . }}-scheduler labels: tier: airflow component: scheduler release: {{ .Release.Name }} chart: "{{ .Chart.Name }}-{{ .Chart.Version }}" heritage: {{ .Release.Service }} executor: {{ .Values.executor | replace "," "-" | trunc 63 | trimSuffix "-" | trimSuffix ":" | trimSuffix "_" | trimSuffix "." | quote }} {{- with .Values.labels }} {{- toYaml . | nindent 4 }} {{- end }} {{- if .Values.scheduler.annotations }} annotations: {{- toYaml .Values.scheduler.annotations | nindent 4 }} {{- end }} spec: {{- if $stateful }} serviceName: {{ include "airflow.fullname" . }}-scheduler {{- end }} replicas: {{ .Values.scheduler.replicas }} {{- if ne $revisionHistoryLimit "" }} revisionHistoryLimit: {{ $revisionHistoryLimit }} {{- end }} {{- if and $stateful .Values.scheduler.updateStrategy }} updateStrategy: {{- toYaml .Values.scheduler.updateStrategy | nindent 4 }} {{- end }} {{- if and $stateful (or .Values.workers.celery.persistence.persistentVolumeClaimRetentionPolicy .Values.workers.persistence.persistentVolumeClaimRetentionPolicy) }} persistentVolumeClaimRetentionPolicy: {{- toYaml (.Values.workers.celery.persistence.persistentVolumeClaimRetentionPolicy | default .Values.workers.persistence.persistentVolumeClaimRetentionPolicy) | nindent 4 }} {{- end }} {{- if and (not $stateful) .Values.scheduler.strategy }} strategy: {{- toYaml .Values.scheduler.strategy | nindent 4 }} {{- end }} selector: matchLabels: tier: airflow component: scheduler release: {{ .Release.Name }} template: metadata: labels: tier: airflow component: scheduler release: {{ .Release.Name }} {{- if or .Values.labels .Values.scheduler.labels }} {{- mustMerge .Values.scheduler.labels .Values.labels | toYaml | nindent 8 }} {{- end }} annotations: checksum/metadata-secret: {{ include (print $.Template.BasePath "/secrets/metadata-connection-secret.yaml") . | sha256sum }} checksum/result-backend-secret: {{ include (print $.Template.BasePath "/secrets/result-backend-connection-secret.yaml") . | sha256sum }} checksum/pgbouncer-config-secret: {{ include (print $.Template.BasePath "/secrets/pgbouncer-config-secret.yaml") . | sha256sum }} checksum/airflow-config: {{ include (print $.Template.BasePath "/configmaps/configmap.yaml") . | sha256sum }} checksum/extra-configmaps: {{ include (print $.Template.BasePath "/configmaps/extra-configmaps.yaml") . | sha256sum }} checksum/extra-secrets: {{ include (print $.Template.BasePath "/secrets/extra-secrets.yaml") . | sha256sum }} {{- if and (semverCompare ">=3.0.0" .Values.airflowVersion) .Values.apiServer.enabled (not .Values.jwtSecretName) }} checksum/jwt-secret: {{ include (print $.Template.BasePath "/secrets/jwt-secret.yaml") . | sha256sum }} {{- end }} {{- if .Values.scheduler.safeToEvict }} cluster-autoscaler.kubernetes.io/safe-to-evict: "true" {{- end }} {{- if .Values.airflowPodAnnotations }} {{- tpl (toYaml .Values.airflowPodAnnotations) . | nindent 8 }} {{- end }} {{- if .Values.scheduler.podAnnotations }} {{- tpl (toYaml .Values.scheduler.podAnnotations) . | nindent 8 }} {{- end }} spec: {{- if .Values.scheduler.priorityClassName }} priorityClassName: {{ .Values.scheduler.priorityClassName }} {{- end }} {{- if .Values.schedulerName }} schedulerName: {{ .Values.schedulerName }} {{- end }} nodeSelector: {{- toYaml $nodeSelector | nindent 8 }} affinity: {{- if $affinity }} {{- toYaml $affinity | nindent 8 }} {{- else }} podAntiAffinity: preferredDuringSchedulingIgnoredDuringExecution: - podAffinityTerm: labelSelector: matchLabels: component: scheduler topologyKey: kubernetes.io/hostname weight: 100 {{- end }} tolerations: {{- toYaml $tolerations | nindent 8 }} topologySpreadConstraints: {{- toYaml $topologySpreadConstraints | nindent 8 }} restartPolicy: Always terminationGracePeriodSeconds: {{ .Values.scheduler.terminationGracePeriodSeconds }} serviceAccountName: {{ include "scheduler.serviceAccountName" . }} {{- if and (eq (include "airflow.podLaunchingExecutor" .) "true") (not .Values.scheduler.serviceAccount.automountServiceAccountToken) }} automountServiceAccountToken: false {{- end }} securityContext: {{ $securityContext | nindent 8 }} imagePullSecrets: {{ include "image_pull_secrets" . | nindent 8 }} {{- if .Values.scheduler.hostAliases }} hostAliases: {{- toYaml .Values.scheduler.hostAliases | nindent 8 }} {{- end }} initContainers: {{- if .Values.scheduler.waitForMigrations.enabled }} - name: wait-for-airflow-migrations resources: {{- toYaml .Values.scheduler.resources | nindent 12 }} image: {{ template "airflow_image_for_migrations" . }} imagePullPolicy: {{ .Values.images.airflow.pullPolicy }} securityContext: {{ $containerSecurityContextWaitForMigrations | nindent 12 }} volumeMounts: - name: logs mountPath: "/opt/airflow/logs" {{- if .Values.logs.persistence.subPath }} subPath: {{ .Values.logs.persistence.subPath }} {{- end }} {{- include "airflow_config_mount" . | nindent 12 }} {{- if .Values.volumeMounts }} {{- toYaml .Values.volumeMounts | nindent 12 }} {{- end }} {{- if .Values.scheduler.extraVolumeMounts }} {{- tpl (toYaml .Values.scheduler.extraVolumeMounts) . | nindent 12 }} {{- end }} {{- if or .Values.webserver.webserverConfig .Values.webserver.webserverConfigConfigMapName }} {{- include "airflow_webserver_config_mount" . | nindent 12 }} {{- end }} args: {{- include "wait-for-migrations-command" . | indent 10 }} envFrom: {{- include "custom_airflow_environment_from" . | default "\n []" | indent 10 }} env: {{- include "custom_airflow_environment" . | indent 10 }} {{- include "standard_airflow_environment" (merge (dict "IncludeJwtSecret" false) .) | indent 10 }} {{- if .Values.scheduler.waitForMigrations.env }} {{- tpl (toYaml .Values.scheduler.waitForMigrations.env) $ | nindent 12 }} {{- end }} {{- end }} {{- if and $localOrDagProcessorDisabled .Values.dags.gitSync.enabled }} {{- include "git_sync_container" (dict "Values" .Values "is_init" "true" "Template" .Template) | nindent 8 }} {{- end }} {{- if .Values.scheduler.extraInitContainers }} {{- tpl (toYaml .Values.scheduler.extraInitContainers) . | nindent 8 }} {{- end }} containers: - name: scheduler image: {{ template "airflow_image" . }} imagePullPolicy: {{ .Values.images.airflow.pullPolicy }} securityContext: {{ $containerSecurityContext | nindent 12 }} {{- if $containerLifecycleHooks }} lifecycle: {{- tpl (toYaml $containerLifecycleHooks) . | nindent 12 }} {{- end }} {{- if .Values.scheduler.command }} command: {{ tpl (toYaml .Values.scheduler.command) . | nindent 12 }} {{- end }} {{- if .Values.scheduler.args }} args: {{ tpl (toYaml .Values.scheduler.args) . | nindent 12 }} {{- end }} envFrom: {{- include "custom_airflow_environment_from" . | default "\n []" | indent 10 }} env: {{- include "custom_airflow_environment" . | indent 10 }} {{- include "standard_airflow_environment" (merge (dict "IncludeJwtSecret" true) .) | indent 10 }} {{- include "container_extra_envs" (list . .Values.scheduler.env) | indent 10 }} livenessProbe: initialDelaySeconds: {{ .Values.scheduler.livenessProbe.initialDelaySeconds }} timeoutSeconds: {{ .Values.scheduler.livenessProbe.timeoutSeconds }} failureThreshold: {{ .Values.scheduler.livenessProbe.failureThreshold }} periodSeconds: {{ .Values.scheduler.livenessProbe.periodSeconds }} exec: command: {{- if .Values.scheduler.livenessProbe.command }} {{- toYaml .Values.scheduler.livenessProbe.command | nindent 16 }} {{- else }} {{- include "scheduler_liveness_check_command" . | indent 14 }} {{- end }} startupProbe: initialDelaySeconds: {{ .Values.scheduler.startupProbe.initialDelaySeconds }} timeoutSeconds: {{ .Values.scheduler.startupProbe.timeoutSeconds }} failureThreshold: {{ .Values.scheduler.startupProbe.failureThreshold }} periodSeconds: {{ .Values.scheduler.startupProbe.periodSeconds }} exec: command: {{- if .Values.scheduler.startupProbe.command }} {{- toYaml .Values.scheduler.startupProbe.command | nindent 16 }} {{- else }} {{- include "scheduler_startup_check_command" . | indent 14 }} {{- end }} {{- if and $local (not $remoteLogging) }} # Serve logs if we're in local mode and we have neither elasticsearch nor opensearch enabled. ports: - name: worker-logs containerPort: {{ .Values.ports.workerLogs }} {{- end }} resources: {{- toYaml .Values.scheduler.resources | nindent 12 }} volumeMounts: - name: config mountPath: {{ include "airflow_pod_template_file" . }}/pod_template_file.yaml subPath: pod_template_file.yaml readOnly: true - name: logs mountPath: {{ template "airflow_logs" . }} {{- if .Values.logs.persistence.subPath }} subPath: {{ .Values.logs.persistence.subPath }} {{- end }} {{- include "airflow_config_mount" . | nindent 12 }} {{- if or .Values.webserver.webserverConfig .Values.webserver.webserverConfigConfigMapName }} {{- include "airflow_webserver_config_mount" . | nindent 12 }} {{- end }} {{- if and $localOrDagProcessorDisabled (or .Values.dags.persistence.enabled .Values.dags.gitSync.enabled) }} {{- include "airflow_dags_mount" . | nindent 12 }} {{- end }} {{- if .Values.volumeMounts }} {{- toYaml .Values.volumeMounts | nindent 12 }} {{- end }} {{- if .Values.scheduler.extraVolumeMounts }} {{- tpl (toYaml .Values.scheduler.extraVolumeMounts) . | nindent 12 }} {{- end }} {{- if and (eq (include "airflow.podLaunchingExecutor" .) "true") (not .Values.scheduler.serviceAccount.automountServiceAccountToken) .Values.scheduler.serviceAccount.serviceAccountTokenVolume.enabled }} - name: {{ .Values.scheduler.serviceAccount.serviceAccountTokenVolume.volumeName }} mountPath: {{ .Values.scheduler.serviceAccount.serviceAccountTokenVolume.mountPath }} readOnly: true {{- end }} {{- if and $localOrDagProcessorDisabled .Values.dags.gitSync.enabled }} {{- include "git_sync_container" . | indent 8 }} {{- end }} {{- if .Values.scheduler.logGroomerSidecar.enabled }} - name: scheduler-log-groomer resources: {{- toYaml .Values.scheduler.logGroomerSidecar.resources | nindent 12 }} image: {{ template "airflow_image" . }} imagePullPolicy: {{ .Values.images.airflow.pullPolicy }} securityContext: {{ $containerSecurityContextLogGroomerSidecar | nindent 12 }} {{- if $containerLifecycleHooksLogGroomerSidecar }} lifecycle: {{- tpl (toYaml $containerLifecycleHooksLogGroomerSidecar) . | nindent 12 }} {{- end }} {{- if .Values.scheduler.logGroomerSidecar.command }} command: {{ tpl (toYaml .Values.scheduler.logGroomerSidecar.command) . | nindent 12 }} {{- end }} {{- if .Values.scheduler.logGroomerSidecar.args }} args: {{- tpl (toYaml .Values.scheduler.logGroomerSidecar.args) . | nindent 12 }} {{- end }} env: - name: AIRFLOW_HOME value: "{{ .Values.airflowHome }}" {{- if .Values.scheduler.logGroomerSidecar.retentionDays }} - name: AIRFLOW__LOG_RETENTION_DAYS value: "{{ .Values.scheduler.logGroomerSidecar.retentionDays }}" {{- end }} {{- if .Values.scheduler.logGroomerSidecar.retentionMinutes }} - name: AIRFLOW__LOG_RETENTION_MINUTES value: "{{ .Values.scheduler.logGroomerSidecar.retentionMinutes }}" {{- end }} {{- if .Values.scheduler.logGroomerSidecar.frequencyMinutes }} - name: AIRFLOW__LOG_CLEANUP_FREQUENCY_MINUTES value: "{{ .Values.scheduler.logGroomerSidecar.frequencyMinutes }}" {{- end }} {{- if .Values.scheduler.logGroomerSidecar.maxSizeBytes }} - name: AIRFLOW__LOG_MAX_SIZE_BYTES value: "{{ .Values.scheduler.logGroomerSidecar.maxSizeBytes | int64 }}" {{- end }} {{- if .Values.scheduler.logGroomerSidecar.maxSizePercent }} - name: AIRFLOW__LOG_MAX_SIZE_PERCENT value: "{{ .Values.scheduler.logGroomerSidecar.maxSizePercent }}" {{- end }} {{- if .Values.scheduler.logGroomerSidecar.env }} {{- tpl (toYaml .Values.scheduler.logGroomerSidecar.env) $ | nindent 12 }} {{- end }} volumeMounts: - name: logs mountPath: {{ template "airflow_logs" . }} {{- if .Values.logs.persistence.subPath }} subPath: {{ .Values.logs.persistence.subPath }} {{- end }} {{- if .Values.volumeMounts }} {{- toYaml .Values.volumeMounts | nindent 12 }} {{- end }} {{- if .Values.scheduler.extraVolumeMounts }} {{- tpl (toYaml .Values.scheduler.extraVolumeMounts) . | nindent 12 }} {{- end }} {{- if or .Values.webserver.webserverConfig .Values.webserver.webserverConfigConfigMapName }} {{- include "airflow_webserver_config_mount" . | nindent 12 }} {{- end }} {{- end }} {{- if .Values.scheduler.extraContainers }} {{- tpl (toYaml .Values.scheduler.extraContainers) . | nindent 8 }} {{- end }} volumes: - name: config configMap: name: {{ template "airflow_config" . }} {{- if or .Values.webserver.webserverConfig .Values.webserver.webserverConfigConfigMapName }} - name: webserver-config configMap: name: {{ template "airflow_webserver_config_configmap_name" . }} {{- end }} {{- if $localOrDagProcessorDisabled }} {{- if .Values.dags.persistence.enabled }} - name: dags persistentVolumeClaim: claimName: {{ template "airflow_dags_volume_claim" . }} {{- else if .Values.dags.gitSync.enabled }} - name: dags emptyDir: {{- toYaml (default (dict) .Values.dags.gitSync.emptyDirConfig) | nindent 12 }} {{- if or .Values.dags.gitSync.sshKeySecret .Values.dags.gitSync.sshKey}} {{- include "git_sync_ssh_key_volume" . | indent 8 }} {{- end }} {{- end }} {{- end }} {{- if .Values.volumes }} {{- toYaml .Values.volumes | nindent 8 }} {{- end }} {{- if .Values.scheduler.extraVolumes }} {{- tpl (toYaml .Values.scheduler.extraVolumes) . | nindent 8 }} {{- end }} {{- if and (eq (include "airflow.podLaunchingExecutor" .) "true") (not .Values.scheduler.serviceAccount.automountServiceAccountToken) .Values.scheduler.serviceAccount.serviceAccountTokenVolume.enabled }} - name: {{ .Values.scheduler.serviceAccount.serviceAccountTokenVolume.volumeName }} projected: defaultMode: 420 sources: - serviceAccountToken: {{- if .Values.scheduler.serviceAccount.serviceAccountTokenVolume.audience }} audience: {{ .Values.scheduler.serviceAccount.serviceAccountTokenVolume.audience }} {{- end }} expirationSeconds: {{ .Values.scheduler.serviceAccount.serviceAccountTokenVolume.expirationSeconds }} path: token - configMap: items: - key: ca.crt path: ca.crt name: kube-root-ca.crt - downwardAPI: items: - fieldRef: apiVersion: v1 fieldPath: metadata.namespace path: namespace {{- end }} {{- if .Values.logs.persistence.enabled }} - name: logs persistentVolumeClaim: claimName: {{ template "airflow_logs_volume_claim" . }} {{- else if not $stateful }} - name: logs emptyDir: {{- toYaml (default (dict) .Values.logs.emptyDirConfig) | nindent 12 }} {{- else }} volumeClaimTemplates: - apiVersion: v1 kind: PersistentVolumeClaim metadata: name: logs {{- if or .Values.workers.celery.persistence.annotations .Values.workers.persistence.annotations }} annotations: {{- toYaml (.Values.workers.celery.persistence.annotations | default .Values.workers.persistence.annotations) | nindent 10 }} {{- end }} spec: {{- if or .Values.workers.celery.persistence.storageClassName .Values.workers.persistence.storageClassName }} storageClassName: {{ tpl (.Values.workers.celery.persistence.storageClassName | default .Values.workers.persistence.storageClassName) . | quote }} {{- end }} accessModes: ["ReadWriteOnce"] resources: requests: storage: {{ .Values.workers.celery.persistence.size | default .Values.workers.persistence.size }} {{- end }} {{- end }}