"""
Backup and Restore API endpoints for database management.

Helpers that create gzip-compressed ``pg_dump`` snapshots, store them in an
environment-specific R2 bucket (accessed through the S3 API), and restore the
database from a stored snapshot.

Admin-only access required.
"""

import gzip
import os
import shutil
import subprocess
from datetime import datetime
from typing import List

import boto3
from boto3.exceptions import S3UploadFailedError
from botocore.config import Config
from botocore.exceptions import ClientError
from dotenv import load_dotenv

load_dotenv()

# Frontend hostnames that identify the production deployment.
_PROD_FRONTEND_HOSTS = ('myrecipes.dvirlabs.com', 'my-recipes.dvirlabs.com')
# Values of ENVIRONMENT (lower-cased) that mean production.
_PROD_ENV_NAMES = ('prod', 'production')
# Database hosts that are treated as a local/dev database.
_LOCAL_DB_HOSTS = ('localhost', '127.0.0.1', 'postgres')


def get_environment() -> str:
    """Detect the environment based on FRONTEND_URL or deployment settings.

    Checks, in order: the ``FRONTEND_URL`` hostname, an explicit
    ``ENVIRONMENT`` variable, and finally ``DB_HOST`` (any non-local host is
    treated as production).

    Returns:
        ``'prod'`` or ``'dev'``.
    """
    frontend_url = os.getenv('FRONTEND_URL', '')
    if any(host in frontend_url for host in _PROD_FRONTEND_HOSTS):
        return 'prod'

    if os.getenv('ENVIRONMENT', '').lower() in _PROD_ENV_NAMES:
        return 'prod'

    # A database host that is not local is assumed to be production.
    if os.getenv('DB_HOST', 'localhost') not in _LOCAL_DB_HOSTS:
        return 'prod'

    return 'dev'


def get_r2_bucket() -> str:
    """Return the R2 bucket name for the current environment."""
    return f'my-recipes-db-bkp-{get_environment()}'


def get_r2_client():
    """Return an S3 client configured for R2 from the ``R2_*`` env variables.

    Reads ``R2_ENDPOINT``, ``R2_ACCESS_KEY`` and ``R2_SECRET_KEY``.
    """
    return boto3.client(
        's3',
        endpoint_url=os.getenv('R2_ENDPOINT'),
        aws_access_key_id=os.getenv('R2_ACCESS_KEY'),
        aws_secret_access_key=os.getenv('R2_SECRET_KEY'),
        config=Config(
            signature_version='s3v4',
            s3={'addressing_style': 'path'},
        ),
    )


def _get_backup_dir() -> str:
    """Return the local ``backups`` directory next to this file, creating it."""
    backup_dir = os.path.join(os.path.dirname(__file__), 'backups')
    os.makedirs(backup_dir, exist_ok=True)
    return backup_dir


def _pg_connection_args() -> List[str]:
    """Return the host/port/user/database flags shared by pg_dump and psql."""
    return [
        '-h', os.getenv('DB_HOST', 'localhost'),
        '-p', os.getenv('DB_PORT', '5432'),
        '-U', os.getenv('DB_USER', 'postgres'),
        '-d', os.getenv('DB_NAME', 'recipes_db'),
    ]


def _pg_env() -> dict:
    """Return a copy of the process environment with ``PGPASSWORD`` set."""
    process_env = os.environ.copy()
    process_env['PGPASSWORD'] = os.getenv('DB_PASSWORD', 'postgres')
    return process_env


def _remove_if_exists(path: str) -> None:
    """Best-effort removal of a local file; cleanup must never mask errors."""
    try:
        os.remove(path)
    except OSError:
        # Already gone or not removable: nothing useful to do during cleanup.
        pass


def create_db_dump() -> str:
    """Create a plain-SQL database dump with ``pg_dump``.

    Returns:
        Path of the created ``.sql`` file inside the local backups directory.

    Raises:
        RuntimeError: If ``pg_dump`` exits with a non-zero status.
    """
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    environment = get_environment()
    dump_file = os.path.join(
        _get_backup_dir(), f'recipes_db_{environment}_{timestamp}.sql'
    )

    cmd = [
        'pg_dump',
        *_pg_connection_args(),
        '--no-owner',
        '--no-acl',
        '-f', dump_file,
    ]
    result = subprocess.run(cmd, env=_pg_env(), capture_output=True, text=True)
    if result.returncode != 0:
        # Do not leave a truncated dump behind.
        _remove_if_exists(dump_file)
        raise RuntimeError(f"pg_dump failed: {result.stderr}")

    return dump_file


def compress_file(file_path: str) -> str:
    """Compress a file with gzip and delete the uncompressed original.

    Args:
        file_path: Path of the file to compress.

    Returns:
        Path of the compressed file (``file_path`` + ``.gz``).
    """
    compressed_path = f"{file_path}.gz"

    with open(file_path, 'rb') as f_in:
        try:
            with gzip.open(compressed_path, 'wb', compresslevel=9) as f_out:
                shutil.copyfileobj(f_in, f_out)
        except Exception:
            # Remove the partial archive so it is never mistaken for a backup.
            _remove_if_exists(compressed_path)
            raise

    os.remove(file_path)
    return compressed_path


def upload_to_r2(file_path: str) -> str:
    """Upload a local file to the environment's R2 bucket.

    Args:
        file_path: Local path of the file; its basename becomes the object key.

    Returns:
        The object key the file was stored under.

    Raises:
        RuntimeError: If the upload fails.
    """
    s3_client = get_r2_client()
    bucket_name = get_r2_bucket()
    file_name = os.path.basename(file_path)

    try:
        s3_client.upload_file(file_path, bucket_name, file_name)
    except (ClientError, S3UploadFailedError) as e:
        # boto3's managed transfer wraps client errors in S3UploadFailedError.
        raise RuntimeError(f"R2 upload failed: {str(e)}") from e
    return file_name


def list_r2_backups() -> List[dict]:
    """List all backups in the environment's R2 bucket, newest first.

    Returns:
        A list of dicts with ``filename``, ``size`` and ``last_modified``
        (ISO-8601 string) keys; empty when the bucket has no objects.

    Raises:
        RuntimeError: If the listing request fails.
    """
    s3_client = get_r2_client()
    bucket_name = get_r2_bucket()

    try:
        # A single list_objects_v2 call is capped at 1000 keys; paginate so
        # large buckets are listed completely.
        paginator = s3_client.get_paginator('list_objects_v2')
        objects = []
        for page in paginator.paginate(Bucket=bucket_name):
            objects.extend(page.get('Contents', []))
    except ClientError as e:
        raise RuntimeError(f"Failed to list R2 backups: {str(e)}") from e

    objects.sort(key=lambda obj: obj['LastModified'], reverse=True)
    return [
        {
            'filename': obj['Key'],
            'size': obj['Size'],
            'last_modified': obj['LastModified'].isoformat(),
        }
        for obj in objects
    ]


def download_from_r2(filename: str) -> str:
    """Download a backup from R2 into the local backups directory.

    Args:
        filename: Object key of the backup in the bucket.

    Returns:
        Local path of the downloaded file.

    Raises:
        ValueError: If ``filename`` has no usable file-name component.
        RuntimeError: If the download fails.
    """
    # Only the final path component is used locally, so a key containing
    # directory parts (or ``..``) can never write outside the backups dir.
    local_name = os.path.basename(filename)
    if local_name in ('', '.', '..'):
        raise ValueError(f"Invalid backup filename: {filename!r}")

    s3_client = get_r2_client()
    bucket_name = get_r2_bucket()
    local_path = os.path.join(_get_backup_dir(), local_name)

    try:
        s3_client.download_file(bucket_name, filename, local_path)
    except ClientError as e:
        raise RuntimeError(f"R2 download failed: {str(e)}") from e
    return local_path


def decompress_file(compressed_path: str) -> str:
    """Decompress a gzipped file next to the original.

    The compressed file is left in place.

    Args:
        compressed_path: Path of a ``.gz`` file.

    Returns:
        Path of the decompressed file (``compressed_path`` without ``.gz``).

    Raises:
        ValueError: If ``compressed_path`` does not end with ``.gz``.
    """
    if not compressed_path.endswith('.gz'):
        raise ValueError("File must be gzipped (.gz)")

    decompressed_path = compressed_path[:-3]

    with gzip.open(compressed_path, 'rb') as f_in:
        try:
            with open(decompressed_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        except Exception:
            # A truncated SQL file must never be fed to the restore step.
            _remove_if_exists(decompressed_path)
            raise

    return decompressed_path


def restore_database(sql_file: str) -> None:
    """Restore the database from a plain-SQL file.

    Drops and recreates the ``public`` schema, then replays ``sql_file`` with
    ``psql``.

    NOTE: the drop and the restore run as two separate psql invocations, so a
    failed restore leaves the schema dropped or partially restored.

    Args:
        sql_file: Path of the SQL dump to replay.

    Raises:
        RuntimeError: If dropping the schema or replaying the dump fails.
    """
    process_env = _pg_env()
    # Without ON_ERROR_STOP psql keeps going after a failed statement and
    # still exits 0, so a broken restore would be reported as a success.
    psql_base = ['psql', '-v', 'ON_ERROR_STOP=1', *_pg_connection_args()]

    # Drop all tables first
    drop_cmd = [
        *psql_base,
        '-c', 'DROP SCHEMA public CASCADE; CREATE SCHEMA public;',
    ]
    drop_result = subprocess.run(
        drop_cmd, env=process_env, capture_output=True, text=True
    )
    if drop_result.returncode != 0:
        raise RuntimeError(f"Failed to drop schema: {drop_result.stderr}")

    # Restore from backup
    restore_cmd = [*psql_base, '-f', sql_file]
    restore_result = subprocess.run(
        restore_cmd, env=process_env, capture_output=True, text=True
    )
    if restore_result.returncode != 0:
        raise RuntimeError(
            f"Database restore failed: {restore_result.stderr}"
        )


def perform_backup() -> dict:
    """Perform the complete backup process: dump, compress, upload.

    Local intermediate files are removed whether or not the backup succeeds.

    Returns:
        On success: ``{'success': True, 'filename', 'size', 'timestamp'}``.
        On failure: ``{'success': False, 'error': <message>}``.
    """
    dump_file = None
    compressed_file = None
    try:
        dump_file = create_db_dump()
        compressed_file = compress_file(dump_file)
        file_size = os.path.getsize(compressed_file)
        r2_filename = upload_to_r2(compressed_file)

        return {
            'success': True,
            'filename': r2_filename,
            'size': file_size,
            'timestamp': datetime.now().isoformat()
        }
    except Exception as e:
        # Top-level boundary: report the failure to the caller as data.
        return {
            'success': False,
            'error': str(e)
        }
    finally:
        for path in (dump_file, compressed_file):
            if path:
                _remove_if_exists(path)


def perform_restore(filename: str) -> dict:
    """Perform the complete restore process: download, decompress, restore.

    Local intermediate files are removed whether or not the restore succeeds.

    Args:
        filename: Object key of the backup in the R2 bucket.

    Returns:
        On success: ``{'success': True, 'filename', 'timestamp'}``.
        On failure: ``{'success': False, 'error': <message>}``.
    """
    compressed_file = None
    sql_file = None
    try:
        compressed_file = download_from_r2(filename)
        sql_file = decompress_file(compressed_file)
        restore_database(sql_file)

        return {
            'success': True,
            'filename': filename,
            'timestamp': datetime.now().isoformat()
        }
    except Exception as e:
        # Top-level boundary: report the failure to the caller as data.
        return {
            'success': False,
            'error': str(e)
        }
    finally:
        for path in (compressed_file, sql_file):
            if path:
                _remove_if_exists(path)