open-meteo-service/app/service.py
2026-02-15 20:04:11 +02:00

78 lines
2.0 KiB
Python

import json
import os
from typing import Dict, Any
import requests
from .metrics import CACHE_HITS_TOTAL, CACHE_MISSES_TOTAL, OPENMETEO_CALLS_TOTAL
API_URL = "https://geocoding-api.open-meteo.com/v1/search"
CITIES = ["Tel Aviv", "Beer Sheva", "Jerusalem", "Szeged"]
CACHE_FILE = os.environ.get("CACHE_FILE", "coordinates_cache.json")
def _fetch_coordinates(city: str) -> Dict[str, Any]:
OPENMETEO_CALLS_TOTAL.labels(city=city).inc()
params = {"name": city, "count": 1}
r = requests.get(API_URL, params=params, timeout=10)
r.raise_for_status()
data = r.json()
if "results" not in data or not data["results"]:
raise ValueError(f"No results found for {city}")
result = data["results"][0]
return {
"name": result.get("name"),
"latitude": result.get("latitude"),
"longitude": result.get("longitude"),
"country": result.get("country"),
}
def _load_cache() -> Dict[str, Any] | None:
if os.path.exists(CACHE_FILE):
with open(CACHE_FILE, "r", encoding="utf-8") as f:
return json.load(f)
return None
def _save_cache(data: Dict[str, Any]) -> None:
with open(CACHE_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
def get_all_coordinates() -> Dict[str, Any]:
cached = _load_cache()
if cached:
CACHE_HITS_TOTAL.inc()
return {"source": "cache", "data": cached}
CACHE_MISSES_TOTAL.inc()
results: Dict[str, Any] = {}
for city in CITIES:
results[city] = _fetch_coordinates(city)
_save_cache(results)
return {"source": "open-meteo", "data": results}
def get_coordinates_for_city(city: str) -> Dict[str, Any]:
cached = _load_cache()
if cached and city in cached:
CACHE_HITS_TOTAL.inc()
return {"source": "cache", "data": cached[city]}
CACHE_MISSES_TOTAL.inc()
result = _fetch_coordinates(city)
# Update cache with the new city
if cached is None:
cached = {}
cached[city] = result
_save_cache(cached)
return {"source": "open-meteo", "data": result}