2026-06-04 11:46:59 +03:00

327 lines
12 KiB
YAML

{{- $serviceHostName := printf "%s-http.%s" (include "nifi.fullname" . ) .Release.Namespace }}
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ include "nifi.fullname" . }}
labels:
{{- include "nifi.labels" . | nindent 4 }}
data:
custom-startup.sh: |
#!/bin/bash -e
prop_add () {
target_file="${3:-${nifi_props_file}}"
echo "adding property to target file ${target_file}"
echo "$1=$2" >> "${target_file}"
}
prop_remove () {
target_file="${3:-${nifi_props_file}}"
echo "removing property from target file ${target_file}"
sed -i -e "s|^$1=.*$||" "${target_file}"
}
authorizers_file='conf/authorizers.xml'
bootstrap_file='conf/bootstrap.conf'
nifi_properties_file='conf/nifi.properties'
logback_file='conf/logback.xml'
scripts_dir='/opt/nifi/scripts'
[ -f "${scripts_dir}/common.sh" ] && . "${scripts_dir}/common.sh"
{{- /* Set host connection properties so the node is reachable, with TLS hostname verification */}}
host_name="${HOSTNAME}.{{ include "nifi.fullname" . }}.{{ .Release.Namespace }}"
pod_index=$(echo ${HOSTNAME} | sed -E 's/^.*([0-9]+)$/\1/g')
prop_replace 'nifi.web.https.host' "${host_name}"
export NIFI_WEB_HTTPS_HOST="${host_name}"
export NIFI_WEB_PROXY_HOST=" \
${host_name}, \
{{ .Values.ingress.hostName }}, \
{{ include "nifi.siteToSiteHostName" . }}, \
{{ include "nifi.siteToSiteHostName" . }}:443, \
{{ printf "%s-${pod_index}.%s" (include "nifi.fullname" .) (include "nifi.siteToSiteHostName" .) }}, \
{{ printf "%s-${pod_index}.%s" (include "nifi.fullname" .) (include "nifi.siteToSiteHostName" .) }}:443, \
{{ $serviceHostName }}, \
{{ $serviceHostName }}:{{ .Values.ports.https }}"
{{- /* S2S cluster-local connections */}}
export NIFI_REMOTE_INPUT_HOST="${HOSTNAME}"
prop_add 'nifi.remote.route.raw.cluster.when' '${s2s.source.hostname:equals('\''{{ $serviceHostName }}'\'')}'
prop_add 'nifi.remote.route.raw.cluster.hostname' '${s2s.target.hostname}'
prop_add 'nifi.remote.route.raw.cluster.port' {{ .Values.ports.remoteinput | squote }}
prop_add 'nifi.remote.route.raw.cluster.secure' 'true'
{{- /* S2S connections via Ingress */}}
prop_add 'nifi.remote.route.http.ingress.when' '${X-ProxyHost:contains('\''{{ include "nifi.siteToSiteHostName" . }}'\'')}'
prop_add 'nifi.remote.route.http.ingress.hostname' '${s2s.target.hostname}.{{ include "nifi.siteToSiteHostName" . }}'
prop_add 'nifi.remote.route.http.ingress.port' '443'
prop_add 'nifi.remote.route.http.ingress.secure' 'true'
{{- /* Replace properties not exposed by environment variables */}}
{{- if eq (include "nifi.useZooKeeper" .) "true" }}
prop_replace 'nifi.zookeeper.client.secure' 'false'
{{- else }}
prop_replace 'nifi.state.management.provider.cluster' 'kubernetes-provider'
xmlstarlet ed --inplace --update "/stateManagement/cluster-provider[id='kubernetes-provider']/property[@name='ConfigMap Name Prefix']" --value '{{ .Values.stateManagement.kubernetes.statePrefix }}' "conf/state-management.xml"
xmlstarlet ed --inplace --update "/stateManagement/cluster-provider[id='kubernetes-provider']/property[@name='ConfigMap Namespace']" --value '{{ include "nifi.stateManagementNamespace" . }}' "conf/state-management.xml"
{{- end }}
{{- /* Grant nodes cluster permissions */}}
list_identities () {
node_identities=""
for (( i = 0; i < {{ .Values.global.nifi.nodeCount }}; i++ )); do
node_dn="CN={{ include "nifi.fullname" . }}-${i}.{{ include "nifi.fullname" . }}.{{ .Release.Namespace }}"
node_identities="${node_identities}<property name=\"$1 Node-${i}\">${node_dn}</property>\n"
done
ingress_dn="CN={{ include "nifi.fullname" . }}.{{ .Release.Namespace }}"
node_identities="${node_identities}<property name=\"$1 Node-Ingress\">${ingress_dn}</property>\n"
sed -i -E "s|(<property name=\"$1 1\">.*</property>)|\1\n${node_identities}|g" "${authorizers_file}"
}
list_identities 'Node Identity'
list_identities 'Initial User Identity'
{{- /* Set file and directory paths to persistent locations */}}
{{- with .Values.persistence }}
conf_dir='./persistent_conf'
prop_replace 'nifi.flow.configuration.file' "${conf_dir}/{{ .config.files.flowJson }}"
prop_replace 'nifi.flow.configuration.archive.dir' "${conf_dir}/archive"
sed -i -E "s|(<property name=\"Authorizations File\">).*(</property>)|\1${conf_dir}/{{ .config.files.authorizations }}\2|g" "${authorizers_file}"
sed -i -E "s|(<property name=\"Users File\">).*(</property>)|\1${conf_dir}/{{ .config.files.users }}\2|g" "${authorizers_file}"
{{- end }}
{{- /* Define flowfile repository */}}
{{- with .Values.persistence.repo.flowfile }}
prop_replace 'nifi.flowfile.repository.directory' {{ printf "./%s" .mountDir | squote }}
{{- end }}
{{- /* Define content repositories */}}
prop_remove 'nifi.content.repository.directory.default'
{{- range .Values.persistence.repo.content }}
prop_add 'nifi.content.repository.directory.{{ .name }}' {{ printf "./%s" .mountDir | squote }}
{{- end }}
{{- /* Define provenance repositories */}}
prop_remove 'nifi.provenance.repository.directory.default'
{{- range .Values.persistence.repo.provenance }}
prop_add 'nifi.provenance.repository.directory.{{ .name }}' {{ printf "./%s" .mountDir | squote }}
{{- end }}
{{- /* Define custom nar library path */}}
prop_add 'nifi.nar.library.directory.custom' {{ .Values.customLibPath | squote }}
{{- /* Generate a TLS cert for this node from the CSI-provided certificates and private key */}}
{{- if .Values.global.tls.certificate }}
cert_dir='/opt/certmanager'
tls_dir='/opt/tls'
rm -f $tls_dir/*
openssl pkcs12 -export \
-in $cert_dir/tls.crt \
-inkey $cert_dir/tls.key \
-CAfile $cert_dir/ca.crt \
-passout "pass:${KEYSTORE_PASSWORD}" \
-out $tls_dir/keystore.p12
keytool -import -noprompt -trustcacerts \
-file $cert_dir/ca.crt \
-storepass "${TRUSTSTORE_PASSWORD}" \
-destkeystore $tls_dir/truststore.p12 \
-deststoretype pkcs12
{{- end }}
{{- /* Task termination period */}}
prop_replace 'graceful.shutdown.seconds' {{ .Values.shutdown.gracefulShutdownSeconds }} "${bootstrap_file}"
{{- /* Set UI autorefresh interval */}}
prop_replace 'nifi.ui.autorefresh.interval' {{ .Values.ui.refreshInterval | squote }}
{{- with .Values.ui.timeZone }}
echo 'java.arg.8=-Duser.timezone={{ . }}' >> "${bootstrap_file}"
{{- end }}
{{- with .Values.ui.maxThreads }}
prop_replace 'nifi.web.jetty.threads' {{ . | squote }}
{{- end }}
{{- /* Set user logging levels */}}
{{- range $logger, $level := .Values.logging.levels }}
xmlstarlet ed --inplace --update "//logger[@name='{{ $logger }}']/@level" --value '{{ $level }}' "${logback_file}"
{{- end }}
{{- /* Set user logging max size capping */}}
{{- range $appender, $size := .Values.logging.totalSizeCap }}
xmlstarlet ed -L -s '//appender[@name="{{ $appender }}"]/rollingPolicy' -t elem -n 'totalSizeCap' -v '{{ $size }}' "${logback_file}"
{{- end }}
{{- range $key, $value := .Values.extraConfig.nifiProperties }}
prop_replace {{ $key | squote }} {{ $value | quote }}
{{- end }}
{{- if .Values.debugStartup }}
sleep 1000000
{{- end }}
{{- with .Values.umask }}
umask {{ . }}
{{- end }}
exec $scripts_dir/start.sh
pre-stop.sh: |
#!/bin/bash
# NiFi toolkit CLI path
NIFI_CLI="/opt/nifi/nifi-toolkit-current/bin/cli.sh"
# NiFi Cluster variables (use a proper hostname substitution here)
NIFI_URL="https://{{ $serviceHostName }}:8443"
# Log file path (update to your desired destination)
LOG_FILE="/opt/nifi/nifi-current/logs/k8s-pre-stop.log"
# Redirect all output (stdout and stderr) to the log file
exec > >(tee -a "$LOG_FILE") 2>&1
# Function to get the status of the node
get_node_status() {
NODE_ID="$1"
NODE_STATUS=$($NIFI_CLI nifi get-node --nifiNodeId "$NODE_ID" -u "$NIFI_URL" -ot json | jq -r '.node.status')
echo "$NODE_STATUS"
}
# Retry function for critical steps (like disconnecting or offloading nodes)
retry_command() {
local retries=5
local wait_time=5
local cmd="$@"
for ((i=1; i<=retries; i++)); do
eval "$cmd"
if [ $? -eq 0 ]; then
return 0
fi
echo "$(date): Command failed. Retrying in $wait_time seconds... (Attempt $i/$retries)"
sleep $wait_time
done
echo "$(date): Command failed after $retries attempts. Exiting."
exit 1
}
# Get the hostname
HOSTNAME=$(hostname)
echo "$(date): Retrieving node information for the hostname: $HOSTNAME..."
# Retrieve the list of nodes
NODE_INFO=$($NIFI_CLI nifi get-nodes -u "$NIFI_URL")
# Check if NODE_INFO is empty (failed retrieval)
if [ -z "$NODE_INFO" ]; then
echo "$(date): Failed to retrieve node information. Exiting."
exit 1
fi
# Extract the node ID based on the hostname
NODE_ID=$(echo "$NODE_INFO" | grep "$HOSTNAME" | awk '{print $2}')
# Check if the NODE_ID is empty
if [ -z "$NODE_ID" ]; then
echo "$(date): Node ID for $HOSTNAME not found. Exiting."
exit 1
fi
echo "$(date): Node ID for $HOSTNAME is $NODE_ID"
# Get the current node status
CURRENT_STATUS=$(get_node_status "$NODE_ID")
echo "$(date): Current node status: $CURRENT_STATUS"
# Disconnect the current node if it is not already disconnected
if [ "$CURRENT_STATUS" != "DISCONNECTED" ]; then
echo "$(date): Disconnecting node $NODE_ID..."
retry_command "$NIFI_CLI nifi disconnect-node --nifiNodeId $NODE_ID"
echo "$(date): Node $NODE_ID disconnected."
else
echo "$(date): Node $NODE_ID is already disconnected."
fi
# Offload the current node if not already offloaded
if [ "$CURRENT_STATUS" != "OFFLOADED" ]; then
echo "$(date): Offloading node $NODE_ID..."
retry_command "$NIFI_CLI nifi offload-node --nifiNodeId $NODE_ID --connectionTimeout 60000 --readTimeout 60000 -u $NIFI_URL"
echo "$(date): Node $NODE_ID offloading..."
else
echo "$(date): Node $NODE_ID is already offloaded."
fi
# Wait for the node to be fully offloaded with a retry limit
MAX_ATTEMPTS=12 # Limit the retries to 12 (with 5-second interval = 1 minute total)
attempt=1
while [ $attempt -le $MAX_ATTEMPTS ]; do
CURRENT_STATUS=$(get_node_status "$NODE_ID")
if [ "$CURRENT_STATUS" == "OFFLOADED" ]; then
echo "$(date): Node $NODE_ID successfully offloaded."
break
else
echo "$(date): Current node status: $CURRENT_STATUS. Retrying in 5 seconds... (Attempt $attempt/$MAX_ATTEMPTS)"
attempt=$((attempt+1))
sleep 5
fi
done
# If the node hasn't offloaded after the max attempts, exit with an error
CURRENT_STATUS=$(get_node_status "$NODE_ID")
if [ "$CURRENT_STATUS" != "OFFLOADED" ]; then
echo "$(date): Node $NODE_ID failed to offload after $MAX_ATTEMPTS attempts. Exiting."
exit 1
fi
# Remove the node
echo "$(date): Removing node $NODE_ID..."
retry_command "$NIFI_CLI nifi delete-node --nifiNodeId $NODE_ID -u $NIFI_URL"
echo "$(date): Node $NODE_ID offloaded and removed successfully."
{{- with .Values.filebeat }}
{{- if .enabled }}
---
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ include "nifi.fullname" $ }}-filebeat
labels:
{{- include "nifi.labels" $ | nindent 4 }}
data:
filebeat.yml: |
tags:
{{- toYaml .tags | nindent 6 }}
filebeat.inputs:
- type: filestream
id: nifi-app
fields:
log_id: app
paths: ["/nifi/logs/nifi-app*.log"]
- type: filestream
id: nifi-request
fields:
log_id: request
paths: ["/nifi/logs/nifi-request*.log"]
- type: filestream
id: nifi-user
fields:
log_id: user
paths: ["/nifi/logs/nifi-user*.log"]
{{- if or .labels .processors }}
processors:
{{- if .labels }}
- add_labels:
labels:
{{- toYaml .labels | nindent 12 }}
{{- end }}
{{- toYaml .processors | nindent 6 }}
{{- end }}
{{ printf "output.%s:" .output.type }}
{{- toYaml .output.parameters | nindent 6 }}
queue.mem:
flush.timeout: {{ .queue.flushTimeout }}
{{- end }}
{{- end }}