import os import requests import yaml import subprocess KUMA_URL = os.environ["KUMA_URL"].rstrip("/") KUMA_USERNAME = os.environ["KUMA_USERNAME"] KUMA_PASSWORD = os.environ["KUMA_PASSWORD"] GIT_REPOS = [ # List your app repos to scan here "https://git.dvirlabs.com/dvirlabs/dev-tools.git", "https://git.dvirlabs.com/dvirlabs/infra.git", "https://git.dvirlabs.com/dvirlabs/observability-stack.git", "https://git.dvirlabs.com/dvirlabs/my-apps.git" ] GIT_TOKEN = os.environ["GIT_TOKEN"] def login_kuma(): session = requests.Session() r = session.post(f"{KUMA_URL}/api/login", json={ "username": KUMA_USERNAME, "password": KUMA_PASSWORD }) r.raise_for_status() return session def find_monitoring_files(clone_path): result = [] for root, dirs, files in os.walk(clone_path): for file in files: if file == "monitoring.yaml": result.append(os.path.join(root, file)) return result def sync_monitors(session, yaml_path): with open(yaml_path, "r") as f: content = yaml.safe_load(f) kuma = content.get("uptime_kuma") if not kuma or not kuma.get("enabled"): return payload = { "type": kuma.get("type", "http"), "name": kuma["name"], "url": kuma["url"], "interval": kuma.get("interval", 60), "retryInterval": 60, "maxretries": 3, "method": "GET", "upsideDown": kuma.get("upsideDown", False), "tags": kuma.get("tag", "") } if "keyword" in kuma: payload["keyword"] = kuma["keyword"] print(f"➡️ Creating/updating monitor: {payload['name']}") r = session.post(f"{KUMA_URL}/api/monitor/add", json=payload) if r.status_code != 200: print(f"❌ Failed to add monitor {payload['name']}: {r.text}") else: print(f"✅ Monitor synced: {payload['name']}") def clone_repo(repo_url, dest): subprocess.run(["git", "clone", "--depth=1", repo_url, dest], check=True) def main(): session = login_kuma() for repo_url in GIT_REPOS: name = repo_url.split("/")[-1].replace(".git", "") dest = f"/tmp/repos/{name}" clone_repo(repo_url.format(token=GIT_TOKEN), dest) yamls = find_monitoring_files(dest) for yml in yamls: sync_monitors(session, yml) if __name__ == "__main__": main()