{{- $serviceHostName := printf "%s-http.%s" (include "nifi.fullname" . ) .Release.Namespace }}
{{- /*
ConfigMap carrying two scripts:
  custom-startup.sh  edits the NiFi configuration files, then execs start.sh
  pre-stop.sh        disconnects, offloads and deletes this node through the toolkit CLI
$serviceHostName is "<fullname>-http.<namespace>" and is referenced by both scripts.
*/}}
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{ include "nifi.fullname" . }}
  labels:
    {{- include "nifi.labels" . | nindent 4 }}
data:
  custom-startup.sh: |
    #!/bin/bash -e

    # Append "<name>=<value>" to a properties file.
    #   $1 - property name
    #   $2 - property value
    #   $3 - target file (optional; falls back to ${nifi_props_file}, which is not
    #        assigned in this script - presumably provided by common.sh, confirm)
    prop_add () {
      target_file="${3:-${nifi_props_file}}"
      echo "adding property to target file ${target_file}"
      echo "$1=$2" >> "${target_file}"
    }

    # Blank out every "<name>=..." line in a properties file (the line itself is kept, empty).
    #   $1 - property name
    #   $3 - target file (optional; same fallback as prop_add)
    prop_remove () {
      target_file="${3:-${nifi_props_file}}"
      echo "removing property from target file ${target_file}"
      sed -i -e "s|^$1=.*$||" "${target_file}"
    }

    authorizers_file='conf/authorizers.xml'
    bootstrap_file='conf/bootstrap.conf'
    nifi_properties_file='conf/nifi.properties'
    logback_file='conf/logback.xml'

    scripts_dir='/opt/nifi/scripts'
    {{- /* prop_replace is not defined in this script; presumably it comes from common.sh - confirm */}}
    [ -f "${scripts_dir}/common.sh" ] && . "${scripts_dir}/common.sh"

    {{- /* Set host connection properties so the node is reachable, with TLS hostname verification */}}
    host_name="${HOSTNAME}.{{ include "nifi.fullname" . }}.{{ .Release.Namespace }}"
    {{- /* Everything after the last hyphen, so multi-digit ordinals such as "-10" are kept whole */}}
    pod_index="${HOSTNAME##*-}"
    prop_replace 'nifi.web.https.host' "${host_name}"
    export NIFI_WEB_HTTPS_HOST="${host_name}"
    export NIFI_WEB_PROXY_HOST=" \
      ${host_name}, \
      {{ .Values.ingress.hostName }}, \
      {{ include "nifi.siteToSiteHostName" . }}, \
      {{ include "nifi.siteToSiteHostName" . }}:443, \
      {{ printf "%s-${pod_index}.%s" (include "nifi.fullname" .) (include "nifi.siteToSiteHostName" .) }}, \
      {{ printf "%s-${pod_index}.%s" (include "nifi.fullname" .) (include "nifi.siteToSiteHostName" .) }}:443, \
      {{ $serviceHostName }}, \
      {{ $serviceHostName }}:{{ .Values.ports.https }}"

    {{- /* S2S cluster-local connections */}}
    export NIFI_REMOTE_INPUT_HOST="${HOSTNAME}"
    prop_add 'nifi.remote.route.raw.cluster.when' '${s2s.source.hostname:equals('\''{{ $serviceHostName }}'\'')}'
    prop_add 'nifi.remote.route.raw.cluster.hostname' '${s2s.target.hostname}'
    prop_add 'nifi.remote.route.raw.cluster.port' {{ .Values.ports.remoteinput | squote }}
    prop_add 'nifi.remote.route.raw.cluster.secure' 'true'

    {{- /* S2S connections via Ingress */}}
    prop_add 'nifi.remote.route.http.ingress.when' '${X-ProxyHost:contains('\''{{ include "nifi.siteToSiteHostName" . }}'\'')}'
    prop_add 'nifi.remote.route.http.ingress.hostname' '${s2s.target.hostname}.{{ include "nifi.siteToSiteHostName" . }}'
    prop_add 'nifi.remote.route.http.ingress.port' '443'
    prop_add 'nifi.remote.route.http.ingress.secure' 'true'

    {{- /* Replace properties not exposed by environment variables */}}
    {{- if eq (include "nifi.useZooKeeper" .) "true" }}
    prop_replace 'nifi.zookeeper.client.secure' 'false'
    {{- else }}
    prop_replace 'nifi.state.management.provider.cluster' 'kubernetes-provider'
    xmlstarlet ed --inplace --update "/stateManagement/cluster-provider[id='kubernetes-provider']/property[@name='ConfigMap Name Prefix']" --value '{{ .Values.stateManagement.kubernetes.statePrefix }}' "conf/state-management.xml"
    xmlstarlet ed --inplace --update "/stateManagement/cluster-provider[id='kubernetes-provider']/property[@name='ConfigMap Namespace']" --value '{{ include "nifi.stateManagementNamespace" . }}' "conf/state-management.xml"
    {{- end }}

    {{- /* Grant nodes cluster permissions */}}
    {{- /*
    Builds one "CN=<fullname>-<i>.<fullname>.<namespace>" entry per node plus one
    "CN=<fullname>.<namespace>" entry, then inserts the list into the authorizers file.
    NOTE(review): the argument passed by the two calls below is never referenced as $1,
    and the sed pattern "(.*)" matches every line of the file - confirm the intended
    pattern and entry format before relying on this function.
    */}}
    list_identities () {
      node_identities=""
      for (( i = 0; i < {{ .Values.global.nifi.nodeCount }}; i++ )); do
        node_dn="CN={{ include "nifi.fullname" . }}-${i}.{{ include "nifi.fullname" . }}.{{ .Release.Namespace }}"
        node_identities="${node_identities}${node_dn}\n"
      done
      ingress_dn="CN={{ include "nifi.fullname" . }}.{{ .Release.Namespace }}"
      node_identities="${node_identities}${ingress_dn}\n"
      sed -i -E "s|(.*)|\1\n${node_identities}|g" "${authorizers_file}"
    }
    list_identities 'Node Identity'
    list_identities 'Initial User Identity'

    {{- /* Set file and directory paths to persistent locations */}}
    {{- /* NOTE(review): both capture groups in the two sed patterns below are empty as written, so they match every line - confirm the intended patterns */}}
    {{- with .Values.persistence }}
    conf_dir='./persistent_conf'
    prop_replace 'nifi.flow.configuration.file' "${conf_dir}/{{ .config.files.flowJson }}"
    prop_replace 'nifi.flow.configuration.archive.dir' "${conf_dir}/archive"
    sed -i -E "s|().*()|\1${conf_dir}/{{ .config.files.authorizations }}\2|g" "${authorizers_file}"
    sed -i -E "s|().*()|\1${conf_dir}/{{ .config.files.users }}\2|g" "${authorizers_file}"
    {{- end }}

    {{- /* Define flowfile repository */}}
    {{- with .Values.persistence.repo.flowfile }}
    prop_replace 'nifi.flowfile.repository.directory' {{ printf "./%s" .mountDir | squote }}
    {{- end }}

    {{- /* Define content repositories */}}
    prop_remove 'nifi.content.repository.directory.default'
    {{- range .Values.persistence.repo.content }}
    prop_add 'nifi.content.repository.directory.{{ .name }}' {{ printf "./%s" .mountDir | squote }}
    {{- end }}

    {{- /* Define provenance repositories */}}
    prop_remove 'nifi.provenance.repository.directory.default'
    {{- range .Values.persistence.repo.provenance }}
    prop_add 'nifi.provenance.repository.directory.{{ .name }}' {{ printf "./%s" .mountDir | squote }}
    {{- end }}

    {{- /* Define custom nar library path */}}
    prop_add 'nifi.nar.library.directory.custom' {{ .Values.customLibPath | squote }}

    {{- /* Generate a TLS cert for this node from the CSI-provided certificates and private key */}}
    {{- if .Values.global.tls.certificate }}
    cert_dir='/opt/certmanager'
    tls_dir='/opt/tls'
    rm -f "${tls_dir}"/*

    openssl pkcs12 -export \
      -in "${cert_dir}/tls.crt" \
      -inkey "${cert_dir}/tls.key" \
      -CAfile "${cert_dir}/ca.crt" \
      -passout "pass:${KEYSTORE_PASSWORD}" \
      -out "${tls_dir}/keystore.p12"

    keytool -import -noprompt -trustcacerts \
      -file "${cert_dir}/ca.crt" \
      -storepass "${TRUSTSTORE_PASSWORD}" \
      -destkeystore "${tls_dir}/truststore.p12" \
      -deststoretype pkcs12
    {{- end }}

    {{- /* Task termination period */}}
    prop_replace 'graceful.shutdown.seconds' {{ .Values.shutdown.gracefulShutdownSeconds | squote }} "${bootstrap_file}"

    {{- /* Set UI autorefresh interval */}}
    prop_replace 'nifi.ui.autorefresh.interval' {{ .Values.ui.refreshInterval | squote }}

    {{- with .Values.ui.timeZone }}
    echo 'java.arg.8=-Duser.timezone={{ . }}' >> "${bootstrap_file}"
    {{- end }}

    {{- with .Values.ui.maxThreads }}
    prop_replace 'nifi.web.jetty.threads' {{ . | squote }}
    {{- end }}

    {{- /* Set user logging levels */}}
    {{- range $logger, $level := .Values.logging.levels }}
    xmlstarlet ed --inplace --update "//logger[@name='{{ $logger }}']/@level" --value '{{ $level }}' "${logback_file}"
    {{- end }}

    {{- /* Set user logging max size capping */}}
    {{- range $appender, $size := .Values.logging.totalSizeCap }}
    xmlstarlet ed -L -s '//appender[@name="{{ $appender }}"]/rollingPolicy' -t elem -n 'totalSizeCap' -v '{{ $size }}' "${logback_file}"
    {{- end }}

    {{- /* Apply user-supplied nifi.properties overrides */}}
    {{- range $key, $value := .Values.extraConfig.nifiProperties }}
    prop_replace {{ $key | squote }} {{ $value | quote }}
    {{- end }}

    {{- /* Optional long sleep before start.sh runs, enabled by .Values.debugStartup */}}
    {{- if .Values.debugStartup }}
    sleep 1000000
    {{- end }}

    {{- with .Values.umask }}
    umask {{ . }}
    {{- end }}

    exec "${scripts_dir}/start.sh"

  pre-stop.sh: |
    #!/bin/bash

    # NiFi toolkit CLI path
    NIFI_CLI="/opt/nifi/nifi-toolkit-current/bin/cli.sh"

    # Cluster URL: the chart's HTTP service host name plus the configured HTTPS port
    NIFI_URL="https://{{ $serviceHostName }}:{{ .Values.ports.https }}"

    # Log file path (update to your desired destination)
    LOG_FILE="/opt/nifi/nifi-current/logs/k8s-pre-stop.log"

    # Redirect all output (stdout and stderr) to the log file
    exec > >(tee -a "$LOG_FILE") 2>&1

    # Print the status of a node as reported by the CLI.
    #   $1 - NiFi node ID
    get_node_status() {
      local node_id="$1"
      local node_status
      node_status=$("$NIFI_CLI" nifi get-node --nifiNodeId "$node_id" -u "$NIFI_URL" -ot json | jq -r '.node.status')
      echo "$node_status"
    }

    # Run a command up to 5 times, 5 seconds apart; exits the script with status 1
    # when every attempt fails.
    #   $@ - command and its arguments, one word each (run directly, not through eval)
    retry_command() {
      local retries=5
      local wait_time=5
      local try
      for ((try = 1; try <= retries; try++)); do
        if "$@"; then
          return 0
        fi
        echo "$(date): Command failed. Retrying in $wait_time seconds... (Attempt $try/$retries)"
        sleep "$wait_time"
      done
      echo "$(date): Command failed after $retries attempts. Exiting."
      exit 1
    }

    # Get the hostname
    HOSTNAME=$(hostname)

    echo "$(date): Retrieving node information for the hostname: $HOSTNAME..."

    # Retrieve the list of nodes
    NODE_INFO=$("$NIFI_CLI" nifi get-nodes -u "$NIFI_URL")

    # Check if NODE_INFO is empty (failed retrieval)
    if [[ -z "$NODE_INFO" ]]; then
      echo "$(date): Failed to retrieve node information. Exiting."
      exit 1
    fi

    # Extract the node ID based on the hostname. The hostname must match as a whole
    # token, so that e.g. "nifi-1" does not also select the "nifi-10" row.
    NODE_ID=$(echo "$NODE_INFO" | grep -E -- "(^|[^[:alnum:]-])${HOSTNAME}([^[:alnum:]-]|\$)" | awk '{print $2}')

    # Check if the NODE_ID is empty
    if [[ -z "$NODE_ID" ]]; then
      echo "$(date): Node ID for $HOSTNAME not found. Exiting."
      exit 1
    fi

    echo "$(date): Node ID for $HOSTNAME is $NODE_ID"

    # Get the current node status
    CURRENT_STATUS=$(get_node_status "$NODE_ID")
    echo "$(date): Current node status: $CURRENT_STATUS"

    # Disconnect the current node if it is not already disconnected
    if [[ "$CURRENT_STATUS" != "DISCONNECTED" ]]; then
      echo "$(date): Disconnecting node $NODE_ID..."
      retry_command "$NIFI_CLI" nifi disconnect-node --nifiNodeId "$NODE_ID" -u "$NIFI_URL"
      echo "$(date): Node $NODE_ID disconnected."
    else
      echo "$(date): Node $NODE_ID is already disconnected."
    fi

    # Offload the current node if not already offloaded
    # (CURRENT_STATUS still holds the value read before the disconnect step)
    if [[ "$CURRENT_STATUS" != "OFFLOADED" ]]; then
      echo "$(date): Offloading node $NODE_ID..."
      retry_command "$NIFI_CLI" nifi offload-node --nifiNodeId "$NODE_ID" --connectionTimeout 60000 --readTimeout 60000 -u "$NIFI_URL"
      echo "$(date): Node $NODE_ID offloading..."
    else
      echo "$(date): Node $NODE_ID is already offloaded."
    fi

    # Wait for the node to be fully offloaded with a retry limit
    MAX_ATTEMPTS=12  # Limit the retries to 12 (with 5-second interval = 1 minute total)
    attempt=1
    while (( attempt <= MAX_ATTEMPTS )); do
      CURRENT_STATUS=$(get_node_status "$NODE_ID")
      if [[ "$CURRENT_STATUS" == "OFFLOADED" ]]; then
        echo "$(date): Node $NODE_ID successfully offloaded."
        break
      else
        echo "$(date): Current node status: $CURRENT_STATUS. Retrying in 5 seconds... (Attempt $attempt/$MAX_ATTEMPTS)"
        attempt=$((attempt+1))
        sleep 5
      fi
    done

    # If the node hasn't offloaded after the max attempts, exit with an error
    CURRENT_STATUS=$(get_node_status "$NODE_ID")
    if [[ "$CURRENT_STATUS" != "OFFLOADED" ]]; then
      echo "$(date): Node $NODE_ID failed to offload after $MAX_ATTEMPTS attempts. Exiting."
      exit 1
    fi

    # Remove the node
    echo "$(date): Removing node $NODE_ID..."
    retry_command "$NIFI_CLI" nifi delete-node --nifiNodeId "$NODE_ID" -u "$NIFI_URL"
    echo "$(date): Node $NODE_ID offloaded and removed successfully."
{{- with .Values.filebeat }}
{{- if .enabled }}
{{- /*
Filebeat configuration, rendered only when .Values.filebeat.enabled is set.
Inside this block "." is .Values.filebeat, so chart-level helpers are called with "$".
*/}}
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{ include "nifi.fullname" $ }}-filebeat
  labels:
    {{- include "nifi.labels" $ | nindent 4 }}
data:
  filebeat.yml: |
    tags:
      {{- toYaml .tags | nindent 6 }}
    filebeat.inputs:
      - type: filestream
        id: nifi-app
        fields:
          log_id: app
        paths: ["/nifi/logs/nifi-app*.log"]
      - type: filestream
        id: nifi-request
        fields:
          log_id: request
        paths: ["/nifi/logs/nifi-request*.log"]
      - type: filestream
        id: nifi-user
        fields:
          log_id: user
        paths: ["/nifi/logs/nifi-user*.log"]
    {{- if or .labels .processors }}
    processors:
      {{- if .labels }}
      - add_labels:
          labels:
            {{- toYaml .labels | nindent 12 }}
      {{- end }}
      {{- /* Guarded so that an unset value does not render a stray "null" into the list when only labels are configured */}}
      {{- with .processors }}
      {{- toYaml . | nindent 6 }}
      {{- end }}
    {{- end }}
    {{ printf "output.%s:" .output.type }}
      {{- toYaml .output.parameters | nindent 6 }}
    queue.mem:
      flush.timeout: {{ .queue.flushTimeout }}
{{- end }}
{{- end }}