"""Sync Uptime Kuma monitors from ``monitoring.yaml`` files found in git repos.

Each repository in ``GIT_REPOS`` is shallow-cloned under ``/tmp/repos``; every
``monitoring.yaml`` found in the clone whose ``uptime_kuma`` section is enabled
is turned into an HTTP monitor (plus a tag) in Uptime Kuma, unless a monitor
with the same name already exists.
"""

import os
import shutil
import subprocess
from typing import Iterator

import yaml
from uptime_kuma_api import UptimeKumaApi, MonitorType, AuthMethod

# Required configuration; a missing variable fails fast with KeyError at import.
KUMA_URL = os.environ["KUMA_URL"].rstrip("/")
KUMA_USERNAME = os.environ["KUMA_USERNAME"]
KUMA_PASSWORD = os.environ["KUMA_PASSWORD"]
GIT_TOKEN = os.environ["GIT_TOKEN"]

GIT_REPOS = [
    "https://git.dvirlabs.com/dvirlabs/dev-tools.git",
    "https://git.dvirlabs.com/dvirlabs/infra.git",
    "https://git.dvirlabs.com/dvirlabs/observability-stack.git",
    "https://git.dvirlabs.com/dvirlabs/my-apps.git",
]


def find_monitoring_files(clone_path: str) -> Iterator[str]:
    """Yield the path of every ``monitoring.yaml`` found under *clone_path*."""
    for root, _, files in os.walk(clone_path):
        for file in files:
            if file == "monitoring.yaml":
                yield os.path.join(root, file)


def _find_tag_id(api, tag_name):
    """Return the id of the existing tag named *tag_name*, or None if absent."""
    for tag in api.get_tags():
        if tag["name"] == tag_name:
            return tag["id"]
    return None


def _get_or_create_tag_id(api, tag_name):
    """Return the id of the tag named *tag_name*, creating the tag if needed.

    Raises:
        Exception: if the tag can neither be found nor created.
    """
    tag_id = _find_tag_id(api, tag_name)
    if tag_id:
        return tag_id

    tag_resp = api.add_tag(name=tag_name, color="#2196f3")
    # uptime-kuma-api documents add_tag as returning the created tag dict
    # (keyed by "id"); the original "tagID" lookup is kept as a fallback.
    tag_id = tag_resp.get("id") or tag_resp.get("tagID")
    if not tag_id:
        # Last resort: re-read the tag list in case the response had no id.
        tag_id = _find_tag_id(api, tag_name)
    if not tag_id:
        raise Exception(f"❌ Failed to create or find tag: {tag_name}")
    return tag_id


def sync_monitor(api, path):
    """Create an Uptime Kuma HTTP monitor from the monitoring file at *path*.

    The file is skipped when it is empty, is not a YAML mapping, has no
    ``uptime_kuma`` section, or that section is not ``enabled``. The monitor
    name is the file's ``app`` value, falling back to the name of the
    directory containing the file. Nothing is created when a monitor with
    that name already exists.

    Args:
        api: A logged-in ``UptimeKumaApi`` client.
        path: Path to a ``monitoring.yaml`` file.

    Raises:
        KeyError: if the enabled ``uptime_kuma`` section has no ``url``.
        Exception: if the tag can neither be found nor created.
    """
    with open(path, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)

    # safe_load returns None for an empty file and may return a non-mapping;
    # neither can describe a monitor, so skip instead of crashing on .get().
    if not isinstance(config, dict):
        return

    kuma_cfg = config.get("uptime_kuma")
    if not isinstance(kuma_cfg, dict) or not kuma_cfg.get("enabled"):
        return

    name = config.get("app") or os.path.basename(os.path.dirname(path))
    url = kuma_cfg["url"]
    tag_name = kuma_cfg.get("tag", "uncategorized")

    print(f"🔄 Syncing monitor: {name}")

    # Existing monitors are matched by name only and are never updated.
    for monitor in api.get_monitors():
        if monitor["name"] == name:
            print(f"⏭️ Monitor '{name}' already exists — skipping")
            return

    tag_id = _get_or_create_tag_id(api, tag_name)

    # NOTE(review): the Kuma login credentials are reused as the HTTP basic
    # auth sent to the monitored URL — confirm this is intended.
    auth_kwargs = {
        "authMethod": AuthMethod.HTTP_BASIC,
        "basic_auth_user": KUMA_USERNAME,
        "basic_auth_pass": KUMA_PASSWORD,
    }

    resp = api.add_monitor(
        type=MonitorType.HTTP,
        name=name,
        url=url,
        interval=60,
        retryInterval=30,
        resendInterval=0,
        maxretries=3,
        timeout=30,
        upsideDown=False,
        ignoreTls=False,
        maxredirects=10,
        accepted_statuscodes=["200-299"],
        expiryNotification=True,
        method="GET",
        **auth_kwargs,
    )

    monitor_id = resp["monitorID"]
    api.add_monitor_tag(tag_id=tag_id, monitor_id=monitor_id, value="")
    print(f"✅ Synced monitor '{name}' with tag '{tag_name}'")


def _remove_path(path):
    """Delete *path* whether it is a directory, file or symlink; no-op if absent."""
    if os.path.islink(path) or os.path.isfile(path):
        os.remove(path)
    elif os.path.isdir(path):
        shutil.rmtree(path)


def clone_repo(url, dest):
    """Shallow-clone *url* into *dest*, replacing anything already at *dest*.

    The clone authenticates by embedding ``GIT_TOKEN`` in the URL.

    Raises:
        subprocess.CalledProcessError: if ``git clone`` exits non-zero. The
            command recorded on the exception uses the token-free URL so the
            token does not end up in tracebacks or logs.
        OSError: if the previous contents of *dest* cannot be removed.
    """
    print(f"📥 Cloning {url} into {dest}")
    _remove_path(dest)

    authed_url = url.replace("https://", f"https://{GIT_TOKEN}@")
    try:
        subprocess.run(
            ["git", "clone", "--depth=1", authed_url, dest],
            check=True,
        )
    except subprocess.CalledProcessError as err:
        # Re-raise with the original (token-free) URL; `from None` drops the
        # chained exception whose cmd would expose the token.
        raise subprocess.CalledProcessError(
            err.returncode,
            ["git", "clone", "--depth=1", url, dest],
        ) from None


def _repo_name(repo_url):
    """Return the last path component of *repo_url* without a ``.git`` suffix."""
    name = repo_url.rstrip("/").split("/")[-1]
    if name.endswith(".git"):
        name = name[: -len(".git")]
    return name


def main():
    """Clone every configured repo and sync all monitoring files found in it."""
    api = UptimeKumaApi(KUMA_URL)
    try:
        api.login(KUMA_USERNAME, KUMA_PASSWORD)

        for repo in GIT_REPOS:
            path = f"/tmp/repos/{_repo_name(repo)}"
            clone_repo(repo, path)
            for yml in find_monitoring_files(path):
                sync_monitor(api, yml)
    finally:
        # Always close the connection to Uptime Kuma, even when login or a
        # sync step fails.
        api.disconnect()


if __name__ == "__main__":
    main()