{{/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */}} {{/* Create a default fully qualified app name. We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec). If release name contains chart name it will be used as a full name. */}} {{- define "airflow.fullname" -}} {{- if not .Values.useStandardNaming }} {{- .Release.Name }} {{- else if .Values.fullnameOverride }} {{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }} {{- else }} {{- $name := default .Chart.Name .Values.nameOverride }} {{- if contains $name .Release.Name }} {{- .Release.Name | trunc 63 | trimSuffix "-" }} {{- else }} {{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }} {{- end }} {{- end }} {{- end }} {{- define "airflow.serviceAccountName" -}} {{ if .Values.fullnameOverride }} {{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }} {{- else }} {{- $name := default .Chart.Name .Values.nameOverride }} {{- if contains $name .Release.Name }} {{- .Release.Name | trunc 63 | trimSuffix "-" }} {{- else }} {{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }} {{- end }} {{- end }} {{- end }} {{- define "airflow.tplDict" -}} {{- $rendered := dict -}} {{- range $key, $value := .values }} {{- $_ := set $rendered $key (tpl (toString $value) $.context) -}} {{- end }} {{- toYaml $rendered -}} {{- end }} {{/* Standard Airflow environment variables */}} {{- define "standard_airflow_environment" }} # Hard Coded Airflow Envs - name: AIRFLOW_HOME value: {{ .Values.airflowHome }} {{- if .Values.enableBuiltInSecretEnvVars.AIRFLOW__CORE__FERNET_KEY }} - name: AIRFLOW__CORE__FERNET_KEY valueFrom: secretKeyRef: name: {{ template "fernet_key_secret" . }} key: fernet-key {{- end }} {{- if .Values.enableBuiltInSecretEnvVars.AIRFLOW__DATABASE__SQL_ALCHEMY_CONN }} - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN valueFrom: secretKeyRef: name: {{ template "airflow_metadata_secret" . }} key: connection {{- end }} {{- if .Values.enableBuiltInSecretEnvVars.AIRFLOW_CONN_AIRFLOW_DB }} - name: AIRFLOW_CONN_AIRFLOW_DB valueFrom: secretKeyRef: name: {{ template "airflow_metadata_secret" . }} key: connection {{- end }} {{- $kedaEnabled := .Values.workers.keda.enabled }} {{- $kedaUsePgBouncer := .Values.workers.keda.usePgbouncer }} {{- if hasKey .Values.workers "celery" }} {{- $kedaEnabled = or .Values.workers.celery.keda.enabled (and (not (has .Values.workers.celery.keda.enabled (list true false))) .Values.workers.keda.enabled) }} {{- $kedaUsePgBouncer = or .Values.workers.celery.keda.usePgbouncer (and (not (has .Values.workers.celery.keda.usePgbouncer (list true false))) .Values.workers.keda.usePgbouncer) }} {{- end }} {{- if and $kedaEnabled (or (eq .Values.data.metadataConnection.protocol "mysql") (and .Values.pgbouncer.enabled (not $kedaUsePgBouncer))) }} - name: KEDA_DB_CONN valueFrom: secretKeyRef: name: {{ template "airflow_metadata_secret" . }} key: kedaConnection {{- end }} {{- if and (semverCompare "<3.0.0" .Values.airflowVersion) .Values.enableBuiltInSecretEnvVars.AIRFLOW__WEBSERVER__SECRET_KEY }} - name: AIRFLOW__WEBSERVER__SECRET_KEY valueFrom: secretKeyRef: name: {{ template "webserver_secret_key_secret" . }} key: webserver-secret-key {{- end }} {{- if and (semverCompare ">=3.0.0" .Values.airflowVersion) .Values.enableBuiltInSecretEnvVars.AIRFLOW__API__SECRET_KEY }} - name: AIRFLOW__API__SECRET_KEY valueFrom: secretKeyRef: name: {{ template "api_secret_key_secret" . }} key: api-secret-key {{- end }} {{- if and .IncludeJwtSecret (semverCompare ">=3.0.0" .Values.airflowVersion) .Values.enableBuiltInSecretEnvVars.AIRFLOW__API_AUTH__JWT_SECRET }} - name: AIRFLOW__API_AUTH__JWT_SECRET valueFrom: secretKeyRef: name: {{ template "jwt_secret" . }} key: jwt-secret {{- end }} {{- if or (contains "CeleryExecutor" .Values.executor) (contains "CeleryKubernetesExecutor" .Values.executor) }} {{- if and .Values.enableBuiltInSecretEnvVars.AIRFLOW__CELERY__RESULT_BACKEND (or .Values.data.resultBackendSecretName .Values.data.resultBackendConnection) }} - name: AIRFLOW__CELERY__RESULT_BACKEND valueFrom: secretKeyRef: name: {{ template "airflow_result_backend_secret" . }} key: connection {{- end }} {{- if .Values.enableBuiltInSecretEnvVars.AIRFLOW__CELERY__BROKER_URL }} - name: AIRFLOW__CELERY__BROKER_URL valueFrom: secretKeyRef: name: {{ template "airflow_broker_url_secret" . }} key: connection {{- end }} {{- end }} {{- if and .Values.elasticsearch.enabled .Values.enableBuiltInSecretEnvVars.AIRFLOW__ELASTICSEARCH__HOST }} - name: AIRFLOW__ELASTICSEARCH__HOST valueFrom: secretKeyRef: name: {{ template "elasticsearch_secret" . }} key: connection {{- end }} {{- if and .Values.opensearch.enabled .Values.enableBuiltInSecretEnvVars.AIRFLOW__OPENSEARCH__HOST }} - name: AIRFLOW__OPENSEARCH__HOST valueFrom: secretKeyRef: name: {{ template "opensearch_secret" . }} key: connection {{- end }} {{- end }} {{/* User defined Airflow environment variables */}} {{- define "custom_airflow_environment" }} # Dynamically created environment variables {{- range $i, $config := .Values.env }} - name: {{ $config.name }} value: {{ $config.value | quote }} {{- end }} # Dynamically created secret envs {{- range $i, $config := .Values.secret }} - name: {{ $config.envName }} valueFrom: secretKeyRef: name: {{ $config.secretName }} key: {{ default "value" $config.secretKey }} {{- end }} # Extra env {{- $Global := . }} {{- with .Values.extraEnv }} {{- tpl . $Global | nindent 2 }} {{- end }} {{- end }} {{/* User defined Airflow environment from */}} {{- define "custom_airflow_environment_from" }} {{- $Global := . }} {{- with .Values.extraEnvFrom }} {{- tpl . $Global | nindent 2 }} {{- end }} {{- end }} {{/* User defined gitSync container environment from */}} {{- define "custom_git_sync_environment_from" }} {{- $Global := . }} {{- with .Values.dags.gitSync.envFrom }} {{- tpl . $Global | nindent 2 }} {{- end }} {{- end }} {{/* Git ssh key volume */}} {{- define "git_sync_ssh_key_volume" }} - name: git-sync-ssh-key secret: secretName: {{ template "git_sync_ssh_key" . }} defaultMode: 288 {{- end }} {{/* Git sync container */}} {{- define "git_sync_container" }} - name: {{ .Values.dags.gitSync.containerName }}{{ if .is_init }}-init{{ end }} image: {{ template "git_sync_image" . }} imagePullPolicy: {{ .Values.images.gitSync.pullPolicy }} securityContext: {{- include "localContainerSecurityContext" .Values.dags.gitSync | nindent 4 }} envFrom: {{- include "custom_git_sync_environment_from" . | default "\n []" | indent 2 }} env: - name: GIT_SYNC_REV value: {{ .Values.dags.gitSync.rev | quote }} - name: GITSYNC_REF value: {{ .Values.dags.gitSync.ref | quote }} - name: GIT_SYNC_BRANCH value: {{ .Values.dags.gitSync.branch | quote }} - name: GIT_SYNC_REPO value: {{ .Values.dags.gitSync.repo | quote }} - name: GITSYNC_REPO value: {{ .Values.dags.gitSync.repo | quote }} - name: GIT_SYNC_DEPTH value: {{ .Values.dags.gitSync.depth | quote }} - name: GITSYNC_DEPTH value: {{ .Values.dags.gitSync.depth | quote }} - name: GIT_SYNC_ROOT value: "/git" - name: GITSYNC_ROOT value: "/git" - name: GIT_SYNC_DEST value: "repo" - name: GITSYNC_LINK value: "repo" - name: GIT_SYNC_ADD_USER value: "true" - name: GITSYNC_ADD_USER value: "true" - name: GITSYNC_PERIOD value: {{ .Values.dags.gitSync.period | quote }} - name: GIT_SYNC_MAX_SYNC_FAILURES value: {{ .Values.dags.gitSync.maxFailures | quote }} - name: GITSYNC_MAX_FAILURES value: {{ .Values.dags.gitSync.maxFailures | quote }} {{- if or .Values.dags.gitSync.sshKeySecret .Values.dags.gitSync.sshKey }} - name: GIT_SSH_KEY_FILE value: "/etc/git-secret/ssh" - name: GITSYNC_SSH_KEY_FILE value: "/etc/git-secret/ssh" - name: GIT_SYNC_SSH value: "true" - name: GITSYNC_SSH value: "true" {{- if .Values.dags.gitSync.knownHosts }} - name: GIT_KNOWN_HOSTS value: "true" - name: GITSYNC_SSH_KNOWN_HOSTS value: "true" - name: GIT_SSH_KNOWN_HOSTS_FILE value: "/etc/git-secret/known_hosts" - name: GITSYNC_SSH_KNOWN_HOSTS_FILE value: "/etc/git-secret/known_hosts" {{- else }} - name: GIT_KNOWN_HOSTS value: "false" - name: GITSYNC_SSH_KNOWN_HOSTS value: "false" {{- end }} {{ else if .Values.dags.gitSync.credentialsSecret }} - name: GIT_SYNC_USERNAME valueFrom: secretKeyRef: name: {{ .Values.dags.gitSync.credentialsSecret | quote }} key: GIT_SYNC_USERNAME - name: GITSYNC_USERNAME valueFrom: secretKeyRef: name: {{ .Values.dags.gitSync.credentialsSecret | quote }} key: GITSYNC_USERNAME - name: GIT_SYNC_PASSWORD valueFrom: secretKeyRef: name: {{ .Values.dags.gitSync.credentialsSecret | quote }} key: GIT_SYNC_PASSWORD - name: GITSYNC_PASSWORD valueFrom: secretKeyRef: name: {{ .Values.dags.gitSync.credentialsSecret | quote }} key: GITSYNC_PASSWORD {{- end }} {{- if .Values.dags.gitSync.wait }} - name: GIT_SYNC_WAIT value: {{ .Values.dags.gitSync.wait | quote }} {{- end }} {{- if .is_init }} - name: GIT_SYNC_ONE_TIME value: "true" - name: GITSYNC_ONE_TIME value: "true" {{- else }} - name: GIT_SYNC_HTTP_BIND value: ":{{ .Values.dags.gitSync.httpPort }}" - name: GITSYNC_HTTP_BIND value: ":{{ .Values.dags.gitSync.httpPort }}" {{- end }} {{- with .Values.dags.gitSync.env }} {{- toYaml . | nindent 4 }} {{- end }} resources: {{ toYaml .Values.dags.gitSync.resources | nindent 4 }} {{- if not .is_init }} {{- if .Values.dags.gitSync.startupProbe.enabled }} startupProbe: httpGet: path: / port: {{ .Values.dags.gitSync.httpPort }} timeoutSeconds: {{ .Values.dags.gitSync.startupProbe.timeoutSeconds }} initialDelaySeconds: {{ .Values.dags.gitSync.startupProbe.initialDelaySeconds }} periodSeconds: {{ .Values.dags.gitSync.startupProbe.periodSeconds }} failureThreshold: {{ .Values.dags.gitSync.startupProbe.failureThreshold }} {{- end }} {{- if and .Values.dags.gitSync.recommendedProbeSetting (hasKey .Values.dags.gitSync.livenessProbe "enabled") .Values.dags.gitSync.livenessProbe.enabled }} livenessProbe: httpGet: path: / port: {{ .Values.dags.gitSync.httpPort }} timeoutSeconds: {{ .Values.dags.gitSync.livenessProbe.timeoutSeconds | default 1 }} initialDelaySeconds: {{ .Values.dags.gitSync.livenessProbe.initialDelaySeconds | default 0 }} periodSeconds: {{ .Values.dags.gitSync.livenessProbe.periodSeconds | default 5 }} failureThreshold: {{ .Values.dags.gitSync.livenessProbe.failureThreshold | default 10 }} {{- else if .Values.dags.gitSync.livenessProbe }} livenessProbe: {{ tpl (toYaml .Values.dags.gitSync.livenessProbe) . | nindent 4 }} {{- end }} {{- if and .Values.dags.gitSync.readinessProbe (not .Values.dags.gitSync.recommendedProbeSetting) }} readinessProbe: {{ tpl (toYaml .Values.dags.gitSync.readinessProbe) . | nindent 4 }} {{- end }} {{- end }} volumeMounts: - name: dags mountPath: /git {{- if or .Values.dags.gitSync.sshKeySecret .Values.dags.gitSync.sshKey }} - name: git-sync-ssh-key mountPath: /etc/git-secret/ssh readOnly: true subPath: gitSshKey {{- if .Values.dags.gitSync.knownHosts }} - name: config mountPath: /etc/git-secret/known_hosts readOnly: true subPath: known_hosts {{- end }} {{- end }} {{- if .Values.dags.gitSync.extraVolumeMounts }} {{- tpl (toYaml .Values.dags.gitSync.extraVolumeMounts) . | nindent 2 }} {{- end }} {{- if and .Values.dags.gitSync.containerLifecycleHooks (not .is_init) }} lifecycle: {{- tpl (toYaml .Values.dags.gitSync.containerLifecycleHooks) . | nindent 4 }} {{- end }} {{- end }} {{/* This helper will change when customers deploy a new image */}} {{- define "airflow_image" -}} {{- $repository := .Values.images.airflow.repository | default .Values.defaultAirflowRepository -}} {{- $tag := .Values.images.airflow.tag | default .Values.defaultAirflowTag -}} {{- $digest := .Values.images.airflow.digest | default .Values.defaultAirflowDigest -}} {{- if $digest }} {{- printf "%s@%s" $repository $digest -}} {{- else }} {{- printf "%s:%s" $repository $tag -}} {{- end }} {{- end }} {{- define "pod_template_image" -}} {{- printf "%s:%s" (.Values.images.pod_template.repository | default .Values.defaultAirflowRepository) (.Values.images.pod_template.tag | default .Values.defaultAirflowTag) }} {{- end }} {{/* This helper is used for airflow containers that do not need the users code */}} {{ define "default_airflow_image" -}} {{- $repository := .Values.defaultAirflowRepository -}} {{- $tag := .Values.defaultAirflowTag -}} {{- $digest := .Values.defaultAirflowDigest -}} {{- if $digest }} {{- printf "%s@%s" $repository $digest -}} {{- else }} {{- printf "%s:%s" $repository $tag -}} {{- end }} {{- end }} {{ define "airflow_image_for_migrations" -}} {{- if .Values.images.useDefaultImageForMigration }} {{- template "default_airflow_image" . }} {{- else }} {{- template "airflow_image" . }} {{- end }} {{- end }} {{- define "flower_image" -}} {{- printf "%s:%s" (.Values.images.flower.repository | default .Values.defaultAirflowRepository) (.Values.images.flower.tag | default .Values.defaultAirflowTag) }} {{- end }} {{- define "statsd_image" -}} {{- printf "%s:%s" .Values.images.statsd.repository .Values.images.statsd.tag }} {{- end }} {{- define "redis_image" -}} {{- printf "%s:%s" .Values.images.redis.repository .Values.images.redis.tag }} {{- end }} {{- define "pgbouncer_image" -}} {{- printf "%s:%s" .Values.images.pgbouncer.repository .Values.images.pgbouncer.tag }} {{- end }} {{- define "pgbouncer_exporter_image" -}} {{- printf "%s:%s" .Values.images.pgbouncerExporter.repository .Values.images.pgbouncerExporter.tag }} {{- end }} {{- define "git_sync_image" -}} {{- printf "%s:%s" .Values.images.gitSync.repository .Values.images.gitSync.tag }} {{- end }} {{- define "fernet_key_secret" -}} {{- default (printf "%s-fernet-key" (include "airflow.fullname" .)) .Values.fernetKeySecretName }} {{- end }} {{- define "jwt_secret" -}} {{- default (printf "%s-jwt-secret" (include "airflow.fullname" .)) .Values.jwtSecretName }} {{- end }} {{- define "webserver_secret_key_secret" -}} {{- default (printf "%s-webserver-secret-key" (include "airflow.fullname" .)) .Values.webserverSecretKeySecretName }} {{- end }} {{- define "api_secret_key_secret" -}} {{- default (printf "%s-api-secret-key" (include "airflow.fullname" .)) .Values.apiSecretKeySecretName }} {{- end }} {{- define "redis_password_secret" -}} {{- default (printf "%s-redis-password" (include "airflow.fullname" .)) .Values.redis.passwordSecretName }} {{- end }} {{- define "airflow_broker_url_secret" -}} {{- default (printf "%s-broker-url" (include "airflow.fullname" .)) .Values.data.brokerUrlSecretName }} {{- end }} {{- define "airflow_metadata_secret" -}} {{- default (printf "%s-metadata" (include "airflow.fullname" .)) .Values.data.metadataSecretName }} {{- end }} {{- define "airflow_result_backend_secret" -}} {{- default (printf "%s-result-backend" (include "airflow.fullname" .)) .Values.data.resultBackendSecretName }} {{- end }} {{- define "airflow_pod_template_file" -}} {{- printf "%s/pod_templates" .Values.airflowHome }} {{- end }} {{- define "pgbouncer_config_secret" -}} {{- default (printf "%s-pgbouncer-config" (include "airflow.fullname" .)) .Values.pgbouncer.configSecretName }} {{- end }} {{- define "pgbouncer_certificates_secret" -}} {{- printf "%s-pgbouncer-certificates" (include "airflow.fullname" .) }} {{- end }} {{- define "pgbouncer_stats_secret" -}} {{- default (printf "%s-pgbouncer-stats" (include "airflow.fullname" .)) .Values.pgbouncer.metricsExporterSidecar.statsSecretName }} {{- end }} {{- define "image_pull_secrets" -}} {{- $secrets := default (list .Values.registry.secretName) .Values.imagePullSecrets -}} {{- $secrets = ($secrets | compact | uniq) -}} {{- if and (not $secrets) .Values.registry.connection -}} {{- $secrets = append $secrets (printf "%s-registry" (include "airflow.fullname" .)) -}} {{- end -}} {{- $out := list -}} {{- range $n := $secrets }} {{- if kindIs "string" $n }} {{- $n = dict "name" $n -}} {{- end -}} {{- $out = append $out (dict "name" $n.name) -}} {{- end -}} {{- toYaml $out -}} {{- end }} {{- define "elasticsearch_secret" -}} {{- default (printf "%s-elasticsearch" (include "airflow.fullname" .)) .Values.elasticsearch.secretName }} {{- end }} {{- define "opensearch_secret" -}} {{- default (printf "%s-opensearch" (include "airflow.fullname" .)) .Values.opensearch.secretName }} {{- end }} {{- define "flower_secret" -}} {{- default (printf "%s-flower" (include "airflow.fullname" .)) .Values.flower.secretName }} {{- end }} {{- define "kerberos_keytab_secret" -}} {{- printf "%s-kerberos-keytab" (include "airflow.fullname" .) }} {{- end }} {{- define "kerberos_ccache_path" -}} {{- printf "%s/%s" .Values.kerberos.ccacheMountPath .Values.kerberos.ccacheFileName }} {{- end }} {{/* Create the name of the git sync ssh secret to use */}} {{- define "git_sync_ssh_key" -}} {{- default (printf "%s-ssh-secret" (include "airflow.fullname" .)) .Values.dags.gitSync.sshKeySecret }} {{- end }} {{- define "celery_executor_namespace" -}} {{- print "airflow.providers.celery.executors.celery_executor.app" -}} {{- end }} {{- define "pgbouncer_config" -}} {{ $resultBackendConnection := .Values.data.resultBackendConnection | default .Values.data.metadataConnection }} {{ $pgMetadataHost := .Values.data.metadataConnection.host | default (printf "%s-%s.%s" .Release.Name "postgresql" .Release.Namespace) }} {{ $pgResultBackendHost := $resultBackendConnection.host | default (printf "%s-%s.%s" .Release.Name "postgresql" .Release.Namespace) }} [databases] {{ .Release.Name }}-metadata = host={{ $pgMetadataHost }} dbname={{ tpl .Values.data.metadataConnection.db . }} port={{ .Values.data.metadataConnection.port }} pool_size={{ .Values.pgbouncer.metadataPoolSize }} {{ .Values.pgbouncer.extraIniMetadata | default "" }} {{ .Release.Name }}-result-backend = host={{ $pgResultBackendHost }} dbname={{ tpl $resultBackendConnection.db . }} port={{ $resultBackendConnection.port }} pool_size={{ .Values.pgbouncer.resultBackendPoolSize }} {{ .Values.pgbouncer.extraIniResultBackend | default "" }} [pgbouncer] pool_mode = transaction listen_port = {{ .Values.ports.pgbouncer }} listen_addr = * auth_type = {{ .Values.pgbouncer.auth_type }} auth_file = {{ .Values.pgbouncer.auth_file }} stats_users = {{ tpl .Values.data.metadataConnection.user . }} ignore_startup_parameters = extra_float_digits max_client_conn = {{ .Values.pgbouncer.maxClientConn }} verbose = {{ .Values.pgbouncer.verbose }} log_disconnections = {{ .Values.pgbouncer.logDisconnections }} log_connections = {{ .Values.pgbouncer.logConnections }} server_tls_sslmode = {{ .Values.pgbouncer.sslmode }} server_tls_ciphers = {{ .Values.pgbouncer.ciphers }} {{- if .Values.pgbouncer.ssl.ca }} server_tls_ca_file = /etc/pgbouncer/root.crt {{- end }} {{- if .Values.pgbouncer.ssl.cert }} server_tls_cert_file = /etc/pgbouncer/server.crt {{- end }} {{- if .Values.pgbouncer.ssl.key }} server_tls_key_file = /etc/pgbouncer/server.key {{- end }} {{- if .Values.pgbouncer.extraIni }} {{ .Values.pgbouncer.extraIni }} {{- end }} {{- end }} {{ define "pgbouncer_users" }} {{- $resultBackendConnection := .Values.data.resultBackendConnection | default .Values.data.metadataConnection }} {{ tpl .Values.data.metadataConnection.user . | quote }} {{ .Values.data.metadataConnection.pass | quote }} {{ tpl $resultBackendConnection.user . | quote }} {{ $resultBackendConnection.pass | quote }} {{- end }} {{- define "airflow_logs" -}} {{- printf "%s/logs" .Values.airflowHome | quote }} {{- end }} {{- define "airflow_logs_no_quote" -}} {{- printf "%s/logs" .Values.airflowHome }} {{- end }} {{- define "airflow_logs_volume_claim" -}} {{- if .Values.logs.persistence.existingClaim }} {{- .Values.logs.persistence.existingClaim }} {{- else }} {{- printf "%s-logs" .Release.Name }} {{- end }} {{- end }} {{- define "airflow_dags" -}} {{- if .Values.dags.mountPath }} {{- if .Values.dags.gitSync.enabled }} {{- printf "%s/repo/%s" .Values.dags.mountPath .Values.dags.gitSync.subPath }} {{- else }} {{- printf "%s" .Values.dags.mountPath }} {{- end }} {{- else }} {{- if .Values.dags.gitSync.enabled }} {{- printf "%s/dags/repo/%s" .Values.airflowHome .Values.dags.gitSync.subPath }} {{- else }} {{- printf "%s/dags" .Values.airflowHome }} {{- end }} {{- end }} {{- end }} {{- define "airflow_dags_volume_claim" -}} {{- if .Values.dags.persistence.existingClaim }} {{- .Values.dags.persistence.existingClaim }} {{- else }} {{- printf "%s-dags" .Release.Name }} {{- end }} {{- end }} {{- define "airflow_dags_mount" -}} - name: dags {{- if .Values.dags.mountPath }} mountPath: {{ .Values.dags.mountPath }} {{- else }} mountPath: {{ printf "%s/dags" .Values.airflowHome }} {{- end }} {{- if .Values.dags.persistence.subPath }} subPath: {{ .Values.dags.persistence.subPath }} {{- end }} readOnly: {{ .Values.dags.gitSync.enabled | ternary "True" "False" }} {{- end }} {{- define "airflow_config_path" -}} {{- printf "%s/airflow.cfg" .Values.airflowHome | quote }} {{- end }} {{- define "airflow_webserver_config_path" -}} {{- printf "%s/webserver_config.py" .Values.airflowHome | quote }} {{- end }} {{- define "airflow_webserver_config_configmap_name" -}} {{- if .Values.webserver.webserverConfigConfigMapName }} {{- tpl .Values.webserver.webserverConfigConfigMapName . }} {{- else }} {{- printf "%s-webserver-config" (include "airflow.fullname" .) }} {{- end }} {{- end }} {{- define "airflow_webserver_config_mount" -}} - name: webserver-config mountPath: {{ template "airflow_webserver_config_path" . }} subPath: webserver_config.py readOnly: True {{- end }} {{- define "airflow_api_server_config_configmap_name" -}} {{- if .Values.apiServer.apiServerConfigConfigMapName }} {{- tpl .Values.apiServer.apiServerConfigConfigMapName . }} {{- else }} {{- printf "%s-api-server-config" (include "airflow.fullname" .) }} {{- end }} {{- end }} {{- define "airflow_api_server_config_mount" -}} - name: api-server-config mountPath: {{ template "airflow_webserver_config_path" . }} subPath: webserver_config.py readOnly: True {{- end }} {{- define "airflow_local_setting_path" -}} {{- printf "%s/config/airflow_local_settings.py" .Values.airflowHome | quote }} {{- end }} {{- define "airflow_config" -}} {{- printf "%s-config" (include "airflow.fullname" .) }} {{- end }} {{- define "airflow_config_mount" -}} - name: config mountPath: {{ template "airflow_config_path" . }} subPath: airflow.cfg readOnly: true {{- if .Values.airflowLocalSettings }} - name: config mountPath: {{ template "airflow_local_setting_path" . }} subPath: airflow_local_settings.py readOnly: true {{- end }} {{- end }} {{/* Helper for service account name generation */}} {{- define "_serviceAccountNameGen" -}} {{- if .sa.create }} {{- default (printf "%s-%s" (include "airflow.serviceAccountName" .) (default .key .nameSuffix)) .sa.name | quote }} {{- else }} {{- default "default" .sa.name | quote }} {{- end }} {{- end }} {{/* Helper to generate service account name respecting .Values.$section.serviceAccount or .Values.$section.$subSection.serviceAccount flags */}} {{- define "_serviceAccountName" -}} {{- if .subKey }} {{- $sa := get (get (get .Values .key) .subKey) "serviceAccount" -}} {{- include "_serviceAccountNameGen" (merge (dict "sa" $sa "key" .key "nameSuffix" .nameSuffix) .) }} {{- else }} {{- $sa := get (get .Values .key) "serviceAccount" }} {{- include "_serviceAccountNameGen" (merge (dict "sa" $sa "key" .key "nameSuffix" .nameSuffix) .) }} {{- end }} {{- end }} {{/* Create the name of the webserver service account to use */}} {{- define "webserver.serviceAccountName" -}} {{- include "_serviceAccountName" (merge (dict "key" "webserver") .) -}} {{- end }} {{/* Create the name of the API server service account to use */}} {{- define "apiServer.serviceAccountName" -}} {{- include "_serviceAccountName" (merge (dict "key" "apiServer" "nameSuffix" "api-server" ) .) -}} {{- end }} {{/* Create the name of the redis service account to use */}} {{- define "redis.serviceAccountName" -}} {{- include "_serviceAccountName" (merge (dict "key" "redis") .) -}} {{- end }} {{/* Create the name of the flower service account to use */}} {{- define "flower.serviceAccountName" -}} {{- include "_serviceAccountName" (merge (dict "key" "flower") .) -}} {{- end }} {{/* Create the name of the scheduler service account to use */}} {{- define "scheduler.serviceAccountName" -}} {{- include "_serviceAccountName" (merge (dict "key" "scheduler") .) -}} {{- end }} {{/* Create the name of the StatsD service account to use */}} {{- define "statsd.serviceAccountName" -}} {{- include "_serviceAccountName" (merge (dict "key" "statsd") .) -}} {{- end }} {{/* Create the name of the create user job service account to use */}} {{- define "createUserJob.serviceAccountName" -}} {{- include "_serviceAccountName" (merge (dict "key" "createUserJob" "nameSuffix" "create-user-job") .) -}} {{- end }} {{/* Create the name of the migrate database job service account to use */}} {{- define "migrateDatabaseJob.serviceAccountName" -}} {{- include "_serviceAccountName" (merge (dict "key" "migrateDatabaseJob" "nameSuffix" "migrate-database-job") .) -}} {{- end }} {{/* Create the name of the worker service account to use */}} {{- define "worker.serviceAccountName" -}} {{- if and (hasKey .Values.workers "name") (ne .Values.workers.name "default") }} {{- include "_serviceAccountName" (merge (dict "key" "workers" "nameSuffix" (printf "%s-%s" "worker" .Values.workers.name)) .) -}} {{- else }} {{- include "_serviceAccountName" (merge (dict "key" "workers" "nameSuffix" "worker") .) -}} {{- end }} {{- end }} {{/* Create the name of the worker kubernetes service account to use */}} {{- define "worker.kubernetes.serviceAccountName" -}} {{- include "_serviceAccountName" (merge (dict "key" "workers" "subKey" "kubernetes" "nameSuffix" "worker-kubernetes") .) -}} {{- end }} {{/* Create the name of the triggerer service account to use */}} {{- define "triggerer.serviceAccountName" -}} {{- include "_serviceAccountName" (merge (dict "key" "triggerer") .) -}} {{- end }} {{/* Determine trigger capacity, taking Airflow 2 and 3 config option differences into account */}} {{- define "triggerer.capacity" -}} {{- $triggerer_section := .Values.config.triggerer | default dict }} {{- $triggerer_section.capacity | default $triggerer_section.default_capacity | default 1000 | int -}} {{- end -}} {{/* Create the name of the dag processor service account to use */}} {{- define "dagProcessor.serviceAccountName" -}} {{- include "_serviceAccountName" (merge (dict "key" "dagProcessor" "nameSuffix" "dag-processor") .) -}} {{- end }} {{/* Create the name of the pgbouncer service account to use */}} {{- define "pgbouncer.serviceAccountName" -}} {{- include "_serviceAccountName" (merge (dict "key" "pgbouncer") .) -}} {{- end }} {{/* Create the name of the cleanup service account to use */}} {{- define "cleanup.serviceAccountName" -}} {{- include "_serviceAccountName" (merge (dict "key" "cleanup") .) -}} {{- end }} {{/* Create the name of the database cleanup service account to use */}} {{- define "databaseCleanup.serviceAccountName" -}} {{- include "_serviceAccountName" (merge (dict "key" "databaseCleanup" "nameSuffix" "database-cleanup") .) -}} {{- end }} {{- define "wait-for-migrations-command" }} - airflow - db - check-migrations - --migration-wait-timeout={{ .Values.images.migrationsWaitTimeout }} {{- end }} {{- define "scheduler_liveness_check_command" }} - sh - -c - | CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \ airflow jobs check --job-type SchedulerJob --local {{- end }} {{- define "scheduler_startup_check_command" }} - sh - -c - | CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \ airflow jobs check --job-type SchedulerJob --local {{- end }} {{- define "triggerer_liveness_check_command" }} - sh - -c - | CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \ airflow jobs check --job-type TriggererJob --local {{- end }} {{- define "dag_processor_liveness_check_command" }} - sh - -c - | CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \ airflow jobs check --local --job-type DagProcessorJob {{- end }} {{- define "registry_docker_config" }} {{- $user := .Values.registry.connection.user }} {{- $pass := .Values.registry.connection.pass }} {{- $config := dict "auths" }} {{- $auth := dict }} {{- $data := dict }} {{- $_ := set $data "username" $user }} {{- $_ := set $data "password" $pass }} {{- $_ := set $data "email" .Values.registry.connection.email }} {{- $_ := set $data "auth" (printf "%v:%v" $user $pass | b64enc) }} {{- $_ := set $auth .Values.registry.connection.host $data }} {{- $_ := set $config "auths" $auth }} {{ $config | toJson | print }} {{- end }} {{/* Set the default value for pod securityContext If no value is passed for securityContexts.pod or .securityContexts.pod or legacy securityContext and .securityContext, defaults to global uid and gid. +-----------------------------+ +------------------------+ +----------------------+ +-----------------+ +-------------------------+ | .securityContexts.pod | -> | .securityContext | -> | securityContexts.pod | -> | securityContext | -> | Values.uid + Values.gid | +-----------------------------+ +------------------------+ +----------------------+ +-----------------+ +-------------------------+ Values are not accumulated meaning that if runAsUser is set to 10 in .securityContexts.pod, any extra values set to securityContext or uid+gid will be ignored. The template can be called like so: include "airflowPodSecurityContext" (list .Values.webserver .Values) Where `.Values` is the global variables scope and `.Values.webserver` the local variables scope for the webserver template. Priority of values are from left to right, meaning if first value is not empty, the rest will not be evaluated. */}} {{- define "airflowPodSecurityContext" }} {{- $ := last . }} {{- $result := dict }} {{- range . }} {{- if and (hasKey . "securityContexts") (hasKey .securityContexts "pod") .securityContexts.pod }} {{- $result = .securityContexts.pod }} {{- break }} {{- end }} {{- if and (hasKey . "securityContext") .securityContext }} {{- $result = .securityContext }} {{- break }} {{- end }} {{- end }} {{- if $result }} {{- toYaml $result | print }} {{- else }} runAsUser: {{ $.uid }} fsGroup: {{ $.gid }} {{- end }} {{- end }} {{/* Set the default value for pod securityContext If no value is passed for .securityContexts.pod or .securityContext, defaults to UID in the local node. +-----------------------------+ +------------------------+ +------------+ | .securityContexts.pod | -> | .securityContext | -> | .uid | +-----------------------------+ +------------------------+ +------------+ The template can be called like so: include "localPodSecurityContext" (list . .Values.schedule) It is important to pass the local variables scope to this template as it is used to determine the local node value for uid. */}} {{- define "localPodSecurityContext" -}} {{- if .securityContexts.pod -}} {{ toYaml .securityContexts.pod | print }} {{- else if .securityContext -}} {{ toYaml .securityContext | print }} {{- else -}} runAsUser: {{ .uid }} {{- end -}} {{- end -}} {{/* Set the default value for container securityContext If no value is passed for .securityContexts.container or .securityContext, defaults to UID in the local node. +-----------------------------------+ +------------------------+ +------------+ | .securityContexts.container | -> | .securityContext | -> | .uid | +-----------------------------------+ +------------------------+ +------------+ The template can be called like so: include "localContainerSecurityContext" .Values.statsd It is important to pass the local variables scope to this template as it is used to determine the local node value for uid. */}} {{- define "localContainerSecurityContext" -}} {{- if .securityContexts.container -}} {{ toYaml .securityContexts.container | print }} {{- else if .securityContext -}} {{ toYaml .securityContext | print }} {{- else -}} runAsUser: {{ .uid }} {{- end -}} {{- end -}} {{/* Set the default value for workers chown for persistent storage If no value is passed for securityContexts.pod or .securityContexts.pod or legacy securityContext and .securityContext, defaults to global uid and gid. The template looks for `runAsUser` and `fsGroup` specifically, any other parameter will be ignored. +-----------------------------+ +----------------------------------------------------+ +------------------+ +-------------------------+ | .securityContexts.pod | -> | securityContexts.pod | .securityContexts.pod | -> | securityContexts | -> | Values.uid + Values.gid | +-----------------------------+ +----------------------------------------------------+ +------------------+ +-------------------------+ Values are not accumulated meaning that if runAsUser is set to 10 in .securityContexts.pod, any extra values set to securityContexts or uid+gid will be ignored. The template can be called like so: include "airflowPodSecurityContextsIds" (list . .Values.webserver) Where `.` is the global variables scope and `.Values.workers` the local variables scope for the workers template. */}} {{- define "airflowPodSecurityContextsIds" -}} {{- $ := index . 0 -}} {{- with index . 1 }} {{- if .securityContexts.pod -}} {{ pluck "runAsUser" .securityContexts.pod | first | default $.Values.uid }}:{{ pluck "fsGroup" .securityContexts.pod | first | default $.Values.gid }} {{- else if $.Values.securityContext -}} {{ pluck "runAsUser" $.Values.securityContext | first | default $.Values.uid }}:{{ pluck "fsGroup" $.Values.securityContext | first | default $.Values.gid }} {{- else if $.Values.securityContexts.pod -}} {{ pluck "runAsUser" $.Values.securityContexts.pod | first | default $.Values.uid }}:{{ pluck "fsGroup" $.Values.securityContexts.pod | first | default $.Values.gid }} {{- else -}} {{ $.Values.uid }}:{{ $.Values.gid }} {{- end -}} {{- end -}} {{- end -}} {{/* Set the default value for container securityContext If no value is passed for securityContexts.container or .securityContexts.container, defaults to deny privileges escalation and dropping all POSIX capabilities. +-----------------------------------+ +-----------------------------+ +------------------------------------------------------------+ | .securityContexts.container | -> | securityContexts.containers | -> | allowPrivilegesEscalation: false, capabilities.drop: [ALL] | +-----------------------------------+ +-----------------------------+ +------------------------------------------------------------+ The template can be called like so: include "containerSecurityContext" (list .Values.webserver .Values) Where `.Values` is the global variables scope and `.Values.webserver` the local variables scope for the webserver template. Priority of values are from left to right, meaning if first value is not empty, the rest will not be evaluated. */}} {{- define "containerSecurityContext" -}} {{- $ := last . }} {{- $result := dict }} {{- range . }} {{- if and (hasKey . "securityContexts") (hasKey .securityContexts "container") .securityContexts.container }} {{- $result = .securityContexts.container }} {{- break }} {{- end }} {{- end }} {{- if $result }} {{- toYaml $result | print }} {{- else if and (hasKey $ "securityContexts") (hasKey $.securityContexts "containers") $.securityContexts.containers }} {{- toYaml $.securityContexts.containers | print }} {{- else -}} allowPrivilegeEscalation: false capabilities: drop: - ALL {{- end }} {{- end -}} {{/* Set the default value for external container securityContext(redis and statsd). If no value is passed for .securityContexts.container, defaults to deny privileges escalation and dropping all POSIX capabilities. +-----------------------------------+ +------------------------------------------------------------+ | .securityContexts.container | -> | allowPrivilegesEscalation: false, capabilities.drop: [ALL] | +-----------------------------------+ +------------------------------------------------------------+ The template can be called like so: include "externalContainerSecurityContext" .Values.statsd */}} {{- define "externalContainerSecurityContext" -}} {{- if .securityContexts.container -}} {{ toYaml .securityContexts.container | print }} {{- else -}} allowPrivilegeEscalation: false capabilities: drop: - ALL {{- end -}} {{- end -}} {{- define "container_extra_envs" -}} {{- $ := index . 0 -}} {{- $env := index . 1 -}} {{- range $i, $config := $env }} - name: {{ $config.name }} {{- if $config.value }} value: {{ $config.value | quote }} {{- else if $config.valueFrom }} valueFrom: {{- if $config.valueFrom.secretKeyRef }} secretKeyRef: name: {{ $config.valueFrom.secretKeyRef.name }} key: {{ $config.valueFrom.secretKeyRef.key }} {{- else if $config.valueFrom.configMapKeyRef }} configMapKeyRef: name: {{ $config.valueFrom.configMapKeyRef.name }} key: {{ $config.valueFrom.configMapKeyRef.key }} {{- end }} {{- end }} {{- end }} {{- end }} {{/* Convert a Kubernetes CPU limit (e.g., "500m", "1.5", "2", "750m") into an integer number of CPU cores. */}} {{- define "cpu_count" -}} {{- $v := toString . -}} {{- if hasSuffix "m" $v -}} {{- /* millicores path: e.g. 500m, 1500m */ -}} {{- $m := float64 (trimSuffix "m" $v) -}} {{- int (ceil (divf $m 1000)) -}} {{- else -}} {{- /* plain cores: e.g. 0.5, 1, 1.5 */ -}} {{- int (ceil (float64 $v)) -}} {{- end -}} {{- end }} {{/* Check if the current executor launches pods This helper function returns true if the executor can launch pods (Kubernetes-based executors) */}} {{- define "airflow.podLaunchingExecutor" -}} {{- if or (contains "CeleryExecutor" .Values.executor) (contains "CeleryKubernetesExecutor" .Values.executor) (contains "KubernetesExecutor" .Values.executor) (contains "LocalKubernetesExecutor" .Values.executor) -}} {{- print "true" -}} {{- end -}} {{- end -}} {{/* Get revisionHistoryLimit with nil-aware fallback. Unlike `or`, this properly handles 0 as a valid value. Pass a list of values to check in order of priority. Usage: include "airflow.revisionHistoryLimit" (list .Values.scheduler.revisionHistoryLimit .Values.revisionHistoryLimit) */}} {{- define "airflow.revisionHistoryLimit" -}} {{- $result := "" -}} {{- range . -}} {{- if and (not (kindIs "invalid" .)) -}} {{- $result = . -}} {{- break -}} {{- end -}} {{- end -}} {{- $result -}} {{- end -}} {{/* Determine if the create-user job should be enabled. When webserver.defaultUser is set (deprecated), it takes precedence to preserve backwards compatibility. Otherwise, fall back to createUserJob.enabled. */}} {{- define "createUserJob.isEnabled" -}} {{- if .Values.webserver.defaultUser -}} {{- .Values.webserver.defaultUser.enabled -}} {{- else -}} {{- .Values.createUserJob.enabled -}} {{- end -}} {{- end -}} {{/* Convert dagBundleConfigList YAML list to JSON string for dag_bundle_config_list. This helper function converts the structured YAML format to the JSON string format required by Airflow's dag_bundle_config_list configuration. Usage: config: dag_processor: dag_bundle_config_list: '{{ include "dag_bundle_config_list" . }}' */}} {{- define "dag_bundle_config_list" -}} {{- if .Values.dagProcessor.dagBundleConfigList -}} {{- $bundles := list -}} {{- range .Values.dagProcessor.dagBundleConfigList -}} {{- $bundle := dict "name" .name "classpath" .classpath "kwargs" (.kwargs | default dict) -}} {{- $bundles = append $bundles $bundle -}} {{- end -}} {{- $bundles | toJson -}} {{- else -}} {{- "[]" -}} {{- end -}} {{- end }} {{/* Custom merge function which enables full map overwrite and `or` logic for boolean overwrite. It takes 4 arguments where: 1. Input map (source for merge) 2. Input map with values which will overwrite values from first map 3. Root section name (used for `or` boolean logic) 4. List of section names for which logic for boolean parameters should be `or` instead of overwrite Usage: {{ include "workersMergeValues" (list .Values.workers .Values.workers.celery "" (list "kerberosInitContainer")) }} */}} {{- define "workersMergeValues" -}} {{- $inputMap := index . 0 -}} {{- $overwriteMap := index . 1 -}} {{- $sectionName := index . 2 -}} {{- $orBoolean := index . 3 -}} {{- $outputMap := dict -}} {{- $fullOverwrite := list "annotations" "podAnnotations" "persistentVolumeClaimRetentionPolicy" "pod" "container" "securityContext" "containerLifecycleHooks" "config" "advanced" "behavior" "resources" "nodeSelector" "affinity" "labels" -}} {{- range $key, $val := $inputMap -}} {{/* Below if holds logic for full overwrite map logic */}} {{- if and (hasKey $overwriteMap $key) (has $key $fullOverwrite) -}} {{- $_ := set $outputMap $key (get $overwriteMap $key) -}} {{/* Below if holds base logic for merge values */}} {{- else if and (hasKey $overwriteMap $key) (kindIs "map" $val) -}} {{- $nested := include "workersMergeValues" (list $val (get $overwriteMap $key) $key $orBoolean) | fromYaml -}} {{- if gt (len $nested) 0 -}} {{- $_ := set $outputMap $key $nested -}} {{- end -}} {{/* Below if contains check for `slice` to fully overwrite list parameters */}} {{- else if and (hasKey $overwriteMap $key) (not (and (kindIs "slice" (get $overwriteMap $key)) (eq (len (get $overwriteMap $key)) 0))) -}} {{/* Below if holds `or` logic for boolean fields in a map */}} {{- if and (kindIs "bool" $val) (has $sectionName $orBoolean) -}} {{- $_ := set $outputMap $key (or $val (get $overwriteMap $key)) -}} {{- else -}} {{- $_ := set $outputMap $key (get $overwriteMap $key) -}} {{- end -}} {{- else -}} {{- $_ := set $outputMap $key $val -}} {{- end -}} {{- end -}} {{/* Below if makes sure that all fields in overwrite map input parameter will be copied to source map (if they don't exist) */}} {{- range $key, $val := $overwriteMap -}} {{- if not (hasKey $inputMap $key) -}} {{- $_ := set $outputMap $key $val -}} {{- end -}} {{- end -}} {{- toYaml $outputMap -}} {{- end -}} {{/* Remove all nil fields in a map and return new dict object. Usage: {{ include "removeNilFields" . }} */}} {{- define "removeNilFields" -}} {{- $newValues := dict -}} {{- range $key, $val := . -}} {{- if kindIs "map" $val -}} {{- $nested := include "removeNilFields" $val | fromYaml -}} {{- if gt (len $nested) 0 -}} {{- $_ := set $newValues $key $nested -}} {{- end -}} {{- else if not (eq $val nil) -}} {{- $_ := set $newValues $key $val -}} {{- end -}} {{- end -}} {{- toYaml $newValues -}} {{- end -}}