apps-gitops/automation/generate-scrape-config.py
2025-06-26 05:53:26 +03:00

96 lines
2.8 KiB
Python

import os
from ruamel.yaml import YAML
from ruamel.yaml.scalarstring import LiteralScalarString
from pathlib import Path
from io import StringIO
REPOS = {
"dev-tools": "https://git.dvirlabs.com/dvirlabs/dev-tools.git",
"infra": "https://git.dvirlabs.com/dvirlabs/infra.git",
"observability-stack": "https://git.dvirlabs.com/dvirlabs/observability-stack.git"
}
TMP_DIR = ".tmp-repos"
OUTPUT_FILE = os.path.join(
TMP_DIR,
"observability-stack/manifests/prometheus-scrape-secret/additional-scrape-configs.yaml"
)
os.makedirs(TMP_DIR, exist_ok=True)
def collect_jobs():
grouped_jobs = {}
for repo_name, repo_url in REPOS.items():
repo_path = os.path.join(TMP_DIR, repo_name)
if not os.path.exists(repo_path):
os.system(f"git clone --depth 1 {repo_url} {repo_path}")
for path in Path(repo_path, "manifests").glob("*/monitoring.yaml"):
with open(path) as f:
data = YAML().load(f)
if not data.get("enabled") or "targets" not in data:
continue
entry = {
"targets": data["targets"]
}
# Optional Prometheus config
for field in ("basic_auth", "bearer_token", "bearer_token_file", "metrics_path", "scheme"):
if field in data:
entry[field] = data[field]
if repo_name not in grouped_jobs:
grouped_jobs[repo_name] = []
grouped_jobs[repo_name].append(entry)
# Convert to Prometheus scrape_config format
result = []
for repo, entries in grouped_jobs.items():
result.append({
"job_name": repo,
"static_configs": [{"targets": e["targets"]} for e in entries],
**{k: v for e in entries for k, v in e.items() if k not in ["targets"]}
})
return result
def write_scrape_config(jobs, output_file):
stream = StringIO()
yaml_writer = YAML()
yaml_writer.default_flow_style = False
yaml_writer.dump(jobs, stream)
scrape_yaml = "# This content will be auto-updated by the pipeline\n" + stream.getvalue()
secret = {
"apiVersion": "v1",
"kind": "Secret",
"metadata": {
"name": "prometheus-additional-scrape-configs",
"namespace": "monitoring",
"labels": {
"app.kubernetes.io/name": "prometheus"
}
},
"type": "Opaque",
"stringData": {
"additional-scrape-configs.yaml": LiteralScalarString(scrape_yaml)
}
}
os.makedirs(os.path.dirname(output_file), exist_ok=True)
with open(output_file, "w") as f:
yaml_writer.dump(secret, f)
if __name__ == "__main__":
jobs = collect_jobs()
write_scrape_config(jobs, OUTPUT_FILE)
print(f"✅ Generated: {OUTPUT_FILE}")