327 lines
12 KiB
YAML
327 lines
12 KiB
YAML
{{- $serviceHostName := printf "%s-http.%s" (include "nifi.fullname" . ) .Release.Namespace }}
|
|
apiVersion: v1
|
|
kind: ConfigMap
|
|
metadata:
|
|
name: {{ include "nifi.fullname" . }}
|
|
labels:
|
|
{{- include "nifi.labels" . | nindent 4 }}
|
|
data:
|
|
custom-startup.sh: |
|
|
#!/bin/bash -e
|
|
|
|
prop_add () {
|
|
target_file="${3:-${nifi_props_file}}"
|
|
echo "adding property to target file ${target_file}"
|
|
echo "$1=$2" >> "${target_file}"
|
|
}
|
|
|
|
prop_remove () {
|
|
target_file="${3:-${nifi_props_file}}"
|
|
echo "removing property from target file ${target_file}"
|
|
sed -i -e "s|^$1=.*$||" "${target_file}"
|
|
}
|
|
|
|
authorizers_file='conf/authorizers.xml'
|
|
bootstrap_file='conf/bootstrap.conf'
|
|
nifi_properties_file='conf/nifi.properties'
|
|
logback_file='conf/logback.xml'
|
|
scripts_dir='/opt/nifi/scripts'
|
|
[ -f "${scripts_dir}/common.sh" ] && . "${scripts_dir}/common.sh"
|
|
|
|
{{- /* Set host connection properties so the node is reachable, with TLS hostname verification */}}
|
|
host_name="${HOSTNAME}.{{ include "nifi.fullname" . }}.{{ .Release.Namespace }}"
|
|
pod_index=$(echo ${HOSTNAME} | sed -E 's/^.*([0-9]+)$/\1/g')
|
|
|
|
prop_replace 'nifi.web.https.host' "${host_name}"
|
|
export NIFI_WEB_HTTPS_HOST="${host_name}"
|
|
export NIFI_WEB_PROXY_HOST=" \
|
|
${host_name}, \
|
|
{{ .Values.ingress.hostName }}, \
|
|
{{ include "nifi.siteToSiteHostName" . }}, \
|
|
{{ include "nifi.siteToSiteHostName" . }}:443, \
|
|
{{ printf "%s-${pod_index}.%s" (include "nifi.fullname" .) (include "nifi.siteToSiteHostName" .) }}, \
|
|
{{ printf "%s-${pod_index}.%s" (include "nifi.fullname" .) (include "nifi.siteToSiteHostName" .) }}:443, \
|
|
{{ $serviceHostName }}, \
|
|
{{ $serviceHostName }}:{{ .Values.ports.https }}"
|
|
|
|
{{- /* S2S cluster-local connections */}}
|
|
export NIFI_REMOTE_INPUT_HOST="${HOSTNAME}"
|
|
prop_add 'nifi.remote.route.raw.cluster.when' '${s2s.source.hostname:equals('\''{{ $serviceHostName }}'\'')}'
|
|
prop_add 'nifi.remote.route.raw.cluster.hostname' '${s2s.target.hostname}'
|
|
prop_add 'nifi.remote.route.raw.cluster.port' {{ .Values.ports.remoteinput | squote }}
|
|
prop_add 'nifi.remote.route.raw.cluster.secure' 'true'
|
|
|
|
{{- /* S2S connections via Ingress */}}
|
|
prop_add 'nifi.remote.route.http.ingress.when' '${X-ProxyHost:contains('\''{{ include "nifi.siteToSiteHostName" . }}'\'')}'
|
|
prop_add 'nifi.remote.route.http.ingress.hostname' '${s2s.target.hostname}.{{ include "nifi.siteToSiteHostName" . }}'
|
|
prop_add 'nifi.remote.route.http.ingress.port' '443'
|
|
prop_add 'nifi.remote.route.http.ingress.secure' 'true'
|
|
|
|
{{- /* Replace properties not exposed by environment variables */}}
|
|
{{- if eq (include "nifi.useZooKeeper" .) "true" }}
|
|
prop_replace 'nifi.zookeeper.client.secure' 'false'
|
|
{{- else }}
|
|
prop_replace 'nifi.state.management.provider.cluster' 'kubernetes-provider'
|
|
xmlstarlet ed --inplace --update "/stateManagement/cluster-provider[id='kubernetes-provider']/property[@name='ConfigMap Name Prefix']" --value '{{ .Values.stateManagement.kubernetes.statePrefix }}' "conf/state-management.xml"
|
|
xmlstarlet ed --inplace --update "/stateManagement/cluster-provider[id='kubernetes-provider']/property[@name='ConfigMap Namespace']" --value '{{ include "nifi.stateManagementNamespace" . }}' "conf/state-management.xml"
|
|
{{- end }}
|
|
|
|
{{- /* Grant nodes cluster permissions */}}
|
|
list_identities () {
|
|
node_identities=""
|
|
for (( i = 0; i < {{ .Values.global.nifi.nodeCount }}; i++ )); do
|
|
node_dn="CN={{ include "nifi.fullname" . }}-${i}.{{ include "nifi.fullname" . }}.{{ .Release.Namespace }}"
|
|
node_identities="${node_identities}<property name=\"$1 Node-${i}\">${node_dn}</property>\n"
|
|
done
|
|
ingress_dn="CN={{ include "nifi.fullname" . }}.{{ .Release.Namespace }}"
|
|
node_identities="${node_identities}<property name=\"$1 Node-Ingress\">${ingress_dn}</property>\n"
|
|
sed -i -E "s|(<property name=\"$1 1\">.*</property>)|\1\n${node_identities}|g" "${authorizers_file}"
|
|
}
|
|
list_identities 'Node Identity'
|
|
list_identities 'Initial User Identity'
|
|
|
|
{{- /* Set file and directory paths to persistent locations */}}
|
|
{{- with .Values.persistence }}
|
|
conf_dir='./persistent_conf'
|
|
prop_replace 'nifi.flow.configuration.file' "${conf_dir}/{{ .config.files.flowJson }}"
|
|
prop_replace 'nifi.flow.configuration.archive.dir' "${conf_dir}/archive"
|
|
|
|
sed -i -E "s|(<property name=\"Authorizations File\">).*(</property>)|\1${conf_dir}/{{ .config.files.authorizations }}\2|g" "${authorizers_file}"
|
|
sed -i -E "s|(<property name=\"Users File\">).*(</property>)|\1${conf_dir}/{{ .config.files.users }}\2|g" "${authorizers_file}"
|
|
{{- end }}
|
|
|
|
{{- /* Define flowfile repository */}}
|
|
{{- with .Values.persistence.repo.flowfile }}
|
|
prop_replace 'nifi.flowfile.repository.directory' {{ printf "./%s" .mountDir | squote }}
|
|
{{- end }}
|
|
|
|
{{- /* Define content repositories */}}
|
|
prop_remove 'nifi.content.repository.directory.default'
|
|
{{- range .Values.persistence.repo.content }}
|
|
prop_add 'nifi.content.repository.directory.{{ .name }}' {{ printf "./%s" .mountDir | squote }}
|
|
{{- end }}
|
|
|
|
{{- /* Define provenance repositories */}}
|
|
prop_remove 'nifi.provenance.repository.directory.default'
|
|
{{- range .Values.persistence.repo.provenance }}
|
|
prop_add 'nifi.provenance.repository.directory.{{ .name }}' {{ printf "./%s" .mountDir | squote }}
|
|
{{- end }}
|
|
|
|
{{- /* Define custom nar library path */}}
|
|
prop_add 'nifi.nar.library.directory.custom' {{ .Values.customLibPath | squote }}
|
|
|
|
{{- /* Generate a TLS cert for this node from the CSI-provided certificates and private key */}}
|
|
{{- if .Values.global.tls.certificate }}
|
|
cert_dir='/opt/certmanager'
|
|
tls_dir='/opt/tls'
|
|
rm -f $tls_dir/*
|
|
openssl pkcs12 -export \
|
|
-in $cert_dir/tls.crt \
|
|
-inkey $cert_dir/tls.key \
|
|
-CAfile $cert_dir/ca.crt \
|
|
-passout "pass:${KEYSTORE_PASSWORD}" \
|
|
-out $tls_dir/keystore.p12
|
|
keytool -import -noprompt -trustcacerts \
|
|
-file $cert_dir/ca.crt \
|
|
-storepass "${TRUSTSTORE_PASSWORD}" \
|
|
-destkeystore $tls_dir/truststore.p12 \
|
|
-deststoretype pkcs12
|
|
{{- end }}
|
|
|
|
{{- /* Task termination period */}}
|
|
prop_replace 'graceful.shutdown.seconds' {{ .Values.shutdown.gracefulShutdownSeconds }} "${bootstrap_file}"
|
|
|
|
{{- /* Set UI autorefresh interval */}}
|
|
prop_replace 'nifi.ui.autorefresh.interval' {{ .Values.ui.refreshInterval | squote }}
|
|
|
|
{{- with .Values.ui.timeZone }}
|
|
echo 'java.arg.8=-Duser.timezone={{ . }}' >> "${bootstrap_file}"
|
|
{{- end }}
|
|
|
|
{{- with .Values.ui.maxThreads }}
|
|
prop_replace 'nifi.web.jetty.threads' {{ . | squote }}
|
|
{{- end }}
|
|
|
|
{{- /* Set user logging levels */}}
|
|
{{- range $logger, $level := .Values.logging.levels }}
|
|
xmlstarlet ed --inplace --update "//logger[@name='{{ $logger }}']/@level" --value '{{ $level }}' "${logback_file}"
|
|
{{- end }}
|
|
|
|
{{- /* Set user logging max size capping */}}
|
|
{{- range $appender, $size := .Values.logging.totalSizeCap }}
|
|
xmlstarlet ed -L -s '//appender[@name="{{ $appender }}"]/rollingPolicy' -t elem -n 'totalSizeCap' -v '{{ $size }}' "${logback_file}"
|
|
{{- end }}
|
|
|
|
{{- range $key, $value := .Values.extraConfig.nifiProperties }}
|
|
prop_replace {{ $key | squote }} {{ $value | quote }}
|
|
{{- end }}
|
|
|
|
{{- if .Values.debugStartup }}
|
|
sleep 1000000
|
|
{{- end }}
|
|
|
|
{{- with .Values.umask }}
|
|
umask {{ . }}
|
|
{{- end }}
|
|
|
|
exec $scripts_dir/start.sh
|
|
pre-stop.sh: |
|
|
#!/bin/bash
|
|
|
|
# NiFi toolkit CLI path
|
|
NIFI_CLI="/opt/nifi/nifi-toolkit-current/bin/cli.sh"
|
|
|
|
# NiFi Cluster variables (use a proper hostname substitution here)
|
|
NIFI_URL="https://{{ $serviceHostName }}:8443"
|
|
|
|
# Log file path (update to your desired destination)
|
|
LOG_FILE="/opt/nifi/nifi-current/logs/k8s-pre-stop.log"
|
|
|
|
# Redirect all output (stdout and stderr) to the log file
|
|
exec > >(tee -a "$LOG_FILE") 2>&1
|
|
|
|
# Function to get the status of the node
|
|
get_node_status() {
|
|
NODE_ID="$1"
|
|
NODE_STATUS=$($NIFI_CLI nifi get-node --nifiNodeId "$NODE_ID" -u "$NIFI_URL" -ot json | jq -r '.node.status')
|
|
echo "$NODE_STATUS"
|
|
}
|
|
|
|
# Retry function for critical steps (like disconnecting or offloading nodes)
|
|
retry_command() {
|
|
local retries=5
|
|
local wait_time=5
|
|
local cmd="$@"
|
|
|
|
for ((i=1; i<=retries; i++)); do
|
|
eval "$cmd"
|
|
if [ $? -eq 0 ]; then
|
|
return 0
|
|
fi
|
|
echo "$(date): Command failed. Retrying in $wait_time seconds... (Attempt $i/$retries)"
|
|
sleep $wait_time
|
|
done
|
|
|
|
echo "$(date): Command failed after $retries attempts. Exiting."
|
|
exit 1
|
|
}
|
|
|
|
# Get the hostname
|
|
HOSTNAME=$(hostname)
|
|
echo "$(date): Retrieving node information for the hostname: $HOSTNAME..."
|
|
|
|
# Retrieve the list of nodes
|
|
NODE_INFO=$($NIFI_CLI nifi get-nodes -u "$NIFI_URL")
|
|
|
|
# Check if NODE_INFO is empty (failed retrieval)
|
|
if [ -z "$NODE_INFO" ]; then
|
|
echo "$(date): Failed to retrieve node information. Exiting."
|
|
exit 1
|
|
fi
|
|
|
|
# Extract the node ID based on the hostname
|
|
NODE_ID=$(echo "$NODE_INFO" | grep "$HOSTNAME" | awk '{print $2}')
|
|
|
|
# Check if the NODE_ID is empty
|
|
if [ -z "$NODE_ID" ]; then
|
|
echo "$(date): Node ID for $HOSTNAME not found. Exiting."
|
|
exit 1
|
|
fi
|
|
|
|
echo "$(date): Node ID for $HOSTNAME is $NODE_ID"
|
|
|
|
# Get the current node status
|
|
CURRENT_STATUS=$(get_node_status "$NODE_ID")
|
|
echo "$(date): Current node status: $CURRENT_STATUS"
|
|
|
|
# Disconnect the current node if it is not already disconnected
|
|
if [ "$CURRENT_STATUS" != "DISCONNECTED" ]; then
|
|
echo "$(date): Disconnecting node $NODE_ID..."
|
|
retry_command "$NIFI_CLI nifi disconnect-node --nifiNodeId $NODE_ID"
|
|
echo "$(date): Node $NODE_ID disconnected."
|
|
else
|
|
echo "$(date): Node $NODE_ID is already disconnected."
|
|
fi
|
|
|
|
# Offload the current node if not already offloaded
|
|
if [ "$CURRENT_STATUS" != "OFFLOADED" ]; then
|
|
echo "$(date): Offloading node $NODE_ID..."
|
|
retry_command "$NIFI_CLI nifi offload-node --nifiNodeId $NODE_ID --connectionTimeout 60000 --readTimeout 60000 -u $NIFI_URL"
|
|
echo "$(date): Node $NODE_ID offloading..."
|
|
else
|
|
echo "$(date): Node $NODE_ID is already offloaded."
|
|
fi
|
|
|
|
# Wait for the node to be fully offloaded with a retry limit
|
|
MAX_ATTEMPTS=12 # Limit the retries to 12 (with 5-second interval = 1 minute total)
|
|
attempt=1
|
|
while [ $attempt -le $MAX_ATTEMPTS ]; do
|
|
CURRENT_STATUS=$(get_node_status "$NODE_ID")
|
|
if [ "$CURRENT_STATUS" == "OFFLOADED" ]; then
|
|
echo "$(date): Node $NODE_ID successfully offloaded."
|
|
break
|
|
else
|
|
echo "$(date): Current node status: $CURRENT_STATUS. Retrying in 5 seconds... (Attempt $attempt/$MAX_ATTEMPTS)"
|
|
attempt=$((attempt+1))
|
|
sleep 5
|
|
fi
|
|
done
|
|
|
|
# If the node hasn't offloaded after the max attempts, exit with an error
|
|
CURRENT_STATUS=$(get_node_status "$NODE_ID")
|
|
if [ "$CURRENT_STATUS" != "OFFLOADED" ]; then
|
|
echo "$(date): Node $NODE_ID failed to offload after $MAX_ATTEMPTS attempts. Exiting."
|
|
exit 1
|
|
fi
|
|
|
|
# Remove the node
|
|
echo "$(date): Removing node $NODE_ID..."
|
|
retry_command "$NIFI_CLI nifi delete-node --nifiNodeId $NODE_ID -u $NIFI_URL"
|
|
echo "$(date): Node $NODE_ID offloaded and removed successfully."
|
|
|
|
|
|
{{- with .Values.filebeat }}
|
|
{{- if .enabled }}
|
|
---
|
|
apiVersion: v1
|
|
kind: ConfigMap
|
|
metadata:
|
|
name: {{ include "nifi.fullname" $ }}-filebeat
|
|
labels:
|
|
{{- include "nifi.labels" $ | nindent 4 }}
|
|
data:
|
|
filebeat.yml: |
|
|
tags:
|
|
{{- toYaml .tags | nindent 6 }}
|
|
filebeat.inputs:
|
|
- type: filestream
|
|
id: nifi-app
|
|
fields:
|
|
log_id: app
|
|
paths: ["/nifi/logs/nifi-app*.log"]
|
|
- type: filestream
|
|
id: nifi-request
|
|
fields:
|
|
log_id: request
|
|
paths: ["/nifi/logs/nifi-request*.log"]
|
|
- type: filestream
|
|
id: nifi-user
|
|
fields:
|
|
log_id: user
|
|
paths: ["/nifi/logs/nifi-user*.log"]
|
|
{{- if or .labels .processors }}
|
|
processors:
|
|
{{- if .labels }}
|
|
- add_labels:
|
|
labels:
|
|
{{- toYaml .labels | nindent 12 }}
|
|
{{- end }}
|
|
{{- toYaml .processors | nindent 6 }}
|
|
{{- end }}
|
|
{{ printf "output.%s:" .output.type }}
|
|
{{- toYaml .output.parameters | nindent 6 }}
|
|
queue.mem:
|
|
flush.timeout: {{ .queue.flushTimeout }}
|
|
{{- end }}
|
|
{{- end }}
|