120 lines
3.2 KiB
Python
120 lines
3.2 KiB
Python
import os
|
|
import json
|
|
from typing import List, Optional, Dict, Any
|
|
|
|
import psycopg2
|
|
from psycopg2.extras import RealDictCursor
|
|
from dotenv import load_dotenv
|
|
|
|
# Read a local .env file (development convenience); in Kubernetes the
# settings are expected to arrive as real environment variables instead.
load_dotenv()

# Discrete connection settings take priority when they are all present
# (the K8s style); DATABASE_URL is the fallback used for local development.
DB_HOST = os.environ.get("DB_HOST")
DB_PORT = os.environ.get("DB_PORT", "5432")
DB_NAME = os.environ.get("DB_NAME")
DB_USER = os.environ.get("DB_USER")
DB_PASSWORD = os.environ.get("DB_PASSWORD")

DATABASE_URL = os.environ.get(
    "DATABASE_URL", "postgresql://user:password@localhost:5432/recipes_db"
)
|
|
|
|
|
|
def _quote_conninfo_value(value: str) -> str:
    """Return *value* in a form that is safe inside a libpq keyword/value DSN.

    Plain values are returned unchanged. Empty values, and values containing
    whitespace, a single quote or a backslash, are wrapped in single quotes
    with embedded quotes and backslashes backslash-escaped, as required by
    the libpq connection-string syntax.
    """
    needs_quoting = not value or any(
        ch.isspace() or ch in "'\\" for ch in value
    )
    if not needs_quoting:
        return value
    # Escape backslashes first so the backslash added for quotes is not doubled.
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def _build_dsn() -> str:
    """Build the connection string handed to ``psycopg2.connect``.

    When DB_HOST, DB_NAME, DB_USER and DB_PASSWORD are all set, a libpq
    keyword/value string is assembled from them (plus DB_PORT). Each value is
    quoted/escaped when necessary so that, for example, a password containing
    a space or a quote cannot corrupt the string. Otherwise DATABASE_URL is
    returned as-is.

    Returns:
        A DSN in keyword/value form, or the DATABASE_URL value.

    Raises:
        RuntimeError: if neither configuration style is available. With the
            non-empty default given to DATABASE_URL in this module, that
            only happens when DATABASE_URL is explicitly set to an empty
            string.
    """
    if DB_HOST and DB_NAME and DB_USER and DB_PASSWORD:
        return (
            f"dbname={_quote_conninfo_value(DB_NAME)} "
            f"user={_quote_conninfo_value(DB_USER)} "
            f"password={_quote_conninfo_value(DB_PASSWORD)} "
            f"host={_quote_conninfo_value(DB_HOST)} "
            f"port={_quote_conninfo_value(DB_PORT)}"
        )
    if DATABASE_URL:
        return DATABASE_URL
    raise RuntimeError(
        "No DB configuration found. Set DB_HOST/DB_NAME/DB_USER/DB_PASSWORD "
        "or DATABASE_URL."
    )
|
|
|
|
|
|
def get_conn():
    """Open and return a new psycopg2 connection using the configured DSN.

    The connection is created with ``RealDictCursor`` as its cursor factory.
    Nothing here closes the connection; the functions in this module that
    call it do so in a ``finally`` clause.
    """
    return psycopg2.connect(_build_dsn(), cursor_factory=RealDictCursor)
|
|
|
|
|
|
def list_recipes_db() -> List[Dict[str, Any]]:
    """Fetch every row of the ``recipes`` table, ordered by ``id``.

    A fresh connection is opened for the call and always closed afterwards,
    whether or not the query succeeds.

    Returns:
        All rows returned by the query, as produced by ``cursor.fetchall()``.
    """
    select_all = """
        SELECT id, name, meal_type, time_minutes,
               tags, ingredients, steps
        FROM recipes
        ORDER BY id
        """
    connection = get_conn()
    try:
        with connection.cursor() as cursor:
            cursor.execute(select_all)
            return cursor.fetchall()
    finally:
        connection.close()
|
|
|
|
|
|
def create_recipe_db(recipe_data: Dict[str, Any]) -> Dict[str, Any]:
    """Insert one recipe and return the stored row.

    Args:
        recipe_data: mapping with the keys ``name``, ``meal_type`` and
            ``time_minutes`` (looked up with ``[]``, so a missing key raises
            ``KeyError``) and optionally ``tags``, ``ingredients`` and
            ``steps`` (each defaults to an empty list). The three optional
            values are serialized with ``json.dumps`` before being sent.

    Returns:
        The row produced by the statement's ``RETURNING`` clause.
    """
    insert_sql = """
        INSERT INTO recipes (name, meal_type, time_minutes, tags, ingredients, steps)
        VALUES (%s, %s, %s, %s, %s, %s)
        RETURNING id, name, meal_type, time_minutes, tags, ingredients, steps
        """
    connection = get_conn()
    try:
        # Built after connecting so failures occur in the same order as before.
        values = (
            recipe_data["name"],
            recipe_data["meal_type"],
            recipe_data["time_minutes"],
            *(
                json.dumps(recipe_data.get(field, []))
                for field in ("tags", "ingredients", "steps")
            ),
        )
        with connection.cursor() as cursor:
            cursor.execute(insert_sql, values)
            inserted = cursor.fetchone()
        connection.commit()
        return inserted
    finally:
        connection.close()
|
|
|
|
|
|
def get_recipes_by_filters_db(
    meal_type: Optional[str],
    max_time: Optional[int],
) -> List[Dict[str, Any]]:
    """Return recipes matching the optional filters, ordered by ``id``.

    Args:
        meal_type: when non-empty, only rows whose ``meal_type`` equals the
            lower-cased value are returned. ``None`` or ``""`` disables the
            filter. NOTE(review): this presumes stored ``meal_type`` values
            are lowercase -- confirm against whatever writes the table.
        max_time: when not ``None``, only rows with
            ``time_minutes <= max_time`` are returned. ``0`` is a real
            bound, not "no filter".

    Returns:
        The matching rows, as produced by ``cursor.fetchall()``.
    """
    conditions: List[str] = []
    params: List[Any] = []

    if meal_type:
        conditions.append("meal_type = %s")
        params.append(meal_type.lower())

    # Compare against None explicitly: a truthiness test would silently
    # drop the filter for max_time == 0 and return every recipe.
    if max_time is not None:
        conditions.append("time_minutes <= %s")
        params.append(max_time)

    query = """
        SELECT id, name, meal_type, time_minutes,
               tags, ingredients, steps
        FROM recipes
        """
    if conditions:
        query += " WHERE " + " AND ".join(conditions)
    # Same ordering as list_recipes_db so results are deterministic.
    query += " ORDER BY id"

    conn = get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute(query, params)
            return cur.fetchall()
    finally:
        conn.close()
|