Update the script of uptimekuma

This commit is contained in:
dvirlabs 2025-07-18 19:14:12 +03:00
parent 5a6bd190bc
commit b6b28dce6d

View File

@ -1,80 +1,94 @@
import os import os
import requests
import yaml import yaml
import subprocess import subprocess
from uptime_kuma_api import UptimeKumaApi, MonitorType, AuthMethod
KUMA_URL = os.environ["KUMA_URL"].rstrip("/") KUMA_URL = os.environ["KUMA_URL"].rstrip("/")
KUMA_USERNAME = os.environ["KUMA_USERNAME"] KUMA_USERNAME = os.environ["KUMA_USERNAME"]
KUMA_PASSWORD = os.environ["KUMA_PASSWORD"] KUMA_PASSWORD = os.environ["KUMA_PASSWORD"]
GIT_TOKEN = os.environ["GIT_TOKEN"]
GIT_REPOS = [ GIT_REPOS = [
# List your app repos to scan here
"https://git.dvirlabs.com/dvirlabs/dev-tools.git", "https://git.dvirlabs.com/dvirlabs/dev-tools.git",
"https://git.dvirlabs.com/dvirlabs/infra.git", "https://git.dvirlabs.com/dvirlabs/infra.git",
"https://git.dvirlabs.com/dvirlabs/observability-stack.git", "https://git.dvirlabs.com/dvirlabs/observability-stack.git",
"https://git.dvirlabs.com/dvirlabs/my-apps.git" "https://git.dvirlabs.com/dvirlabs/my-apps.git"
] ]
GIT_TOKEN = os.environ["GIT_TOKEN"]
def login_kuma():
session = requests.Session()
r = session.post(f"{KUMA_URL}/api/login", json={
"username": KUMA_USERNAME,
"password": KUMA_PASSWORD
})
r.raise_for_status()
return session
def find_monitoring_files(clone_path): def find_monitoring_files(clone_path):
result = [] for root, _, files in os.walk(clone_path):
for root, dirs, files in os.walk(clone_path):
for file in files: for file in files:
if file == "monitoring.yaml": if file == "monitoring.yaml":
result.append(os.path.join(root, file)) yield os.path.join(root, file)
return result
def sync_monitors(session, yaml_path): def sync_monitor(api, path):
with open(yaml_path, "r") as f: with open(path, "r") as f:
content = yaml.safe_load(f) config = yaml.safe_load(f)
kuma = content.get("uptime_kuma") kuma_cfg = config.get("uptime_kuma")
if not kuma or not kuma.get("enabled"): if not kuma_cfg or not kuma_cfg.get("enabled"):
return return
payload = { name = config.get("app") or os.path.basename(os.path.dirname(path))
"type": kuma.get("type", "http"), url = kuma_cfg["url"]
"name": kuma["name"], tag_name = kuma_cfg.get("tag", "uncategorized")
"url": kuma["url"],
"interval": kuma.get("interval", 60), print(f"🔄 Syncing monitor: {name}")
"retryInterval": 60,
"maxretries": 3, # Create or get tag
"method": "GET", tag_id = None
"upsideDown": kuma.get("upsideDown", False), for tag in api.get_tags():
"tags": kuma.get("tag", "") if tag["name"] == tag_name:
tag_id = tag["id"]
break
if not tag_id:
tag_id = api.add_tag(name=tag_name, color="#2196f3")["tagID"]
# Prepare Basic Auth using Kuma credentials
auth_kwargs = {
"authMethod": AuthMethod.HTTP_BASIC,
"basic_auth_user": KUMA_USERNAME,
"basic_auth_pass": KUMA_PASSWORD,
} }
if "keyword" in kuma: # Add the monitor
payload["keyword"] = kuma["keyword"] resp = api.add_monitor(
type=MonitorType.HTTP,
name=name,
url=url,
interval=60,
retryInterval=30,
resendInterval=0,
maxretries=3,
timeout=30,
upsideDown=False,
ignoreTls=False,
maxredirects=10,
accepted_statuscodes=["200-299"],
expiryNotification=True,
method="GET",
**auth_kwargs
)
print(f"➡️ Creating/updating monitor: {payload['name']}") monitor_id = resp["monitorID"]
r = session.post(f"{KUMA_URL}/api/monitor/add", json=payload) api.add_monitor_tag(tag_id=tag_id, monitor_id=monitor_id, value=tag_name)
if r.status_code != 200: print(f"✅ Synced monitor '{name}' with tag '{tag_name}'")
print(f"❌ Failed to add monitor {payload['name']}: {r.text}")
else:
print(f"✅ Monitor synced: {payload['name']}")
def clone_repo(repo_url, dest): def clone_repo(url, dest):
subprocess.run(["git", "clone", "--depth=1", repo_url, dest], check=True) print(f"📥 Cloning {url} into {dest}")
subprocess.run(["rm", "-rf", dest], check=True)
subprocess.run(["git", "clone", "--depth=1", url.replace("https://", f"https://{GIT_TOKEN}@"), dest], check=True)
def main(): def main():
session = login_kuma() api = UptimeKumaApi(KUMA_URL)
for repo_url in GIT_REPOS: api.login(KUMA_USERNAME, KUMA_PASSWORD)
name = repo_url.split("/")[-1].replace(".git", "")
dest = f"/tmp/repos/{name}" for repo in GIT_REPOS:
clone_repo(repo_url.format(token=GIT_TOKEN), dest) name = repo.split("/")[-1].replace(".git", "")
yamls = find_monitoring_files(dest) path = f"/tmp/repos/{name}"
for yml in yamls: clone_repo(repo, path)
sync_monitors(session, yml) for yml in find_monitoring_files(path):
sync_monitor(api, yml)
if __name__ == "__main__": if __name__ == "__main__":
main() main()