my-recipes/backend/backup_restore_api.py

"""
Backup and Restore API endpoints for database management.
Admin-only access required.
"""
import os
import subprocess
import gzip
import shutil
from datetime import datetime
from typing import List
import boto3
from botocore.exceptions import ClientError
from dotenv import load_dotenv
load_dotenv()
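
# The functions below read their configuration from environment variables
# (loaded from .env above). A sketch of the expected keys, with placeholder
# values only -- the real values are deployment-specific:
#
#   FRONTEND_URL=https://myrecipes.dvirlabs.com   # used to detect prod vs dev
#   R2_ENDPOINT=https://<account-id>.r2.cloudflarestorage.com
#   R2_ACCESS_KEY=...
#   R2_SECRET_KEY=...
#   DB_HOST=localhost
#   DB_PORT=5432
#   DB_NAME=recipes_db
#   DB_USER=postgres
#   DB_PASSWORD=...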


def get_environment() -> str:
    """Detect environment based on FRONTEND_URL"""
    frontend_url = os.getenv('FRONTEND_URL', 'http://localhost:5174')
    if 'myrecipes.dvirlabs.com' in frontend_url or 'my-recipes.dvirlabs.com' in frontend_url:
        return 'prod'
    return 'dev'


def get_r2_bucket() -> str:
    """Get R2 bucket name based on environment"""
    env = get_environment()
    return f'my-recipes-db-bkp-{env}'


def get_r2_client():
    """Get configured R2 client"""
    return boto3.client(
        's3',
        endpoint_url=os.getenv('R2_ENDPOINT'),
        aws_access_key_id=os.getenv('R2_ACCESS_KEY'),
        aws_secret_access_key=os.getenv('R2_SECRET_KEY'),
        region_name='auto'
    )


def create_db_dump() -> str:
    """Create a database dump file with pg_dump and return its path"""
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    env_name = get_environment()
    backup_dir = os.path.join(os.path.dirname(__file__), 'backups')
    os.makedirs(backup_dir, exist_ok=True)
    dump_file = os.path.join(backup_dir, f'recipes_db_{env_name}_{timestamp}.sql')
    db_host = os.getenv('DB_HOST', 'localhost')
    db_port = os.getenv('DB_PORT', '5432')
    db_name = os.getenv('DB_NAME', 'recipes_db')
    db_user = os.getenv('DB_USER', 'postgres')
    db_password = os.getenv('DB_PASSWORD', 'postgres')
    # Pass the password via PGPASSWORD instead of the command line
    proc_env = os.environ.copy()
    proc_env['PGPASSWORD'] = db_password
    cmd = [
        'pg_dump',
        '-h', db_host,
        '-p', db_port,
        '-U', db_user,
        '-d', db_name,
        '--no-owner',
        '--no-acl',
        '-f', dump_file
    ]
    result = subprocess.run(cmd, env=proc_env, capture_output=True, text=True)
    if result.returncode != 0:
        raise Exception(f"pg_dump failed: {result.stderr}")
    return dump_file


def compress_file(file_path: str) -> str:
    """Compress a file with gzip"""
    compressed_path = f"{file_path}.gz"
    with open(file_path, 'rb') as f_in:
        with gzip.open(compressed_path, 'wb', compresslevel=9) as f_out:
            shutil.copyfileobj(f_in, f_out)
    os.remove(file_path)
    return compressed_path


def upload_to_r2(file_path: str) -> str:
    """Upload file to R2"""
    s3_client = get_r2_client()
    bucket_name = get_r2_bucket()
    file_name = os.path.basename(file_path)
    try:
        s3_client.upload_file(file_path, bucket_name, file_name)
        return file_name
    except ClientError as e:
        raise Exception(f"R2 upload failed: {str(e)}")


def list_r2_backups() -> List[dict]:
    """List all backups in R2"""
    s3_client = get_r2_client()
    bucket_name = get_r2_bucket()
    try:
        response = s3_client.list_objects_v2(Bucket=bucket_name)
        if 'Contents' not in response:
            return []
        backups = []
        for obj in response['Contents']:
            backups.append({
                'filename': obj['Key'],
                'size': obj['Size'],
                'last_modified': obj['LastModified'].isoformat()
            })
        backups.sort(key=lambda x: x['last_modified'], reverse=True)
        return backups
    except ClientError as e:
        raise Exception(f"Failed to list R2 backups: {str(e)}")


def download_from_r2(filename: str) -> str:
    """Download a backup from R2"""
    s3_client = get_r2_client()
    bucket_name = get_r2_bucket()
    backup_dir = os.path.join(os.path.dirname(__file__), 'backups')
    os.makedirs(backup_dir, exist_ok=True)
    local_path = os.path.join(backup_dir, filename)
    try:
        s3_client.download_file(bucket_name, filename, local_path)
        return local_path
    except ClientError as e:
        raise Exception(f"R2 download failed: {str(e)}")


def decompress_file(compressed_path: str) -> str:
    """Decompress a gzipped file"""
    if not compressed_path.endswith('.gz'):
        raise ValueError("File must be gzipped (.gz)")
    decompressed_path = compressed_path[:-3]
    with gzip.open(compressed_path, 'rb') as f_in:
        with open(decompressed_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
    return decompressed_path


def restore_database(sql_file: str) -> None:
    """Restore database from SQL file"""
    db_host = os.getenv('DB_HOST', 'localhost')
    db_port = os.getenv('DB_PORT', '5432')
    db_name = os.getenv('DB_NAME', 'recipes_db')
    db_user = os.getenv('DB_USER', 'postgres')
    db_password = os.getenv('DB_PASSWORD', 'postgres')
    proc_env = os.environ.copy()
    proc_env['PGPASSWORD'] = db_password
    # Drop all tables first by recreating the public schema
    drop_cmd = [
        'psql',
        '-h', db_host,
        '-p', db_port,
        '-U', db_user,
        '-d', db_name,
        '-c', 'DROP SCHEMA public CASCADE; CREATE SCHEMA public;'
    ]
    drop_result = subprocess.run(drop_cmd, env=proc_env, capture_output=True, text=True)
    if drop_result.returncode != 0:
        raise Exception(f"Failed to drop schema: {drop_result.stderr}")
    # Restore from backup
    restore_cmd = [
        'psql',
        '-h', db_host,
        '-p', db_port,
        '-U', db_user,
        '-d', db_name,
        '-f', sql_file
    ]
    restore_result = subprocess.run(restore_cmd, env=proc_env, capture_output=True, text=True)
    if restore_result.returncode != 0:
        raise Exception(f"Database restore failed: {restore_result.stderr}")


def perform_backup() -> dict:
    """Perform complete backup process"""
    try:
        # Create dump
        dump_file = create_db_dump()
        # Compress
        compressed_file = compress_file(dump_file)
        # Upload to R2
        r2_filename = upload_to_r2(compressed_file)
        # Get file size
        file_size = os.path.getsize(compressed_file)
        # Clean up local file
        os.remove(compressed_file)
        return {
            'success': True,
            'filename': r2_filename,
            'size': file_size,
            'timestamp': datetime.now().isoformat()
        }
    except Exception as e:
        return {
            'success': False,
            'error': str(e)
        }


def perform_restore(filename: str) -> dict:
    """Perform complete restore process"""
    try:
        # Download from R2
        compressed_file = download_from_r2(filename)
        # Decompress
        sql_file = decompress_file(compressed_file)
        # Restore database
        restore_database(sql_file)
        # Clean up
        os.remove(compressed_file)
        os.remove(sql_file)
        return {
            'success': True,
            'filename': filename,
            'timestamp': datetime.now().isoformat()
        }
    except Exception as e:
        return {
            'success': False,
            'error': str(e)
        }
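

# A minimal manual-usage sketch (an illustrative assumption, not part of the
# admin API surface this module exposes): running the file directly triggers
# a backup, or "restore <filename>" pulls a named backup from R2 and loads it.
if __name__ == '__main__':
    import sys
    if len(sys.argv) >= 3 and sys.argv[1] == 'restore':
        # e.g. python backup_restore_api.py restore recipes_db_dev_20240101_120000.sql.gz
        print(perform_restore(sys.argv[2]))
    else:
        print(perform_backup())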