627 lines
22 KiB
Python
627 lines
22 KiB
Python
from fastapi import FastAPI, HTTPException, Header, Depends, Request
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import RedirectResponse
|
|
from starlette.middleware.sessions import SessionMiddleware
|
|
from pydantic import BaseModel, Field, ConfigDict
|
|
from typing import List, Optional
|
|
from datetime import datetime
|
|
from sqlalchemy.orm import Session
|
|
from authlib.integrations.starlette_client import OAuth
|
|
import uuid
|
|
import hashlib
|
|
import os
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv()
|
|
|
|
from database import init_db, get_db
|
|
import database as db_models
|
|
|
|
app = FastAPI(title="Tasko API", version="1.0.0")
|
|
|
|
# Initialize database
|
|
init_db()
|
|
|
|
# OAuth Configuration
|
|
oauth = OAuth()
|
|
oauth.register(
|
|
name='google',
|
|
client_id=os.getenv('GOOGLE_CLIENT_ID', ''),
|
|
client_secret=os.getenv('GOOGLE_CLIENT_SECRET', ''),
|
|
server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
|
|
client_kwargs={
|
|
'scope': 'openid email profile'
|
|
}
|
|
)
|
|
|
|
allowed_origins = ["http://localhost:5173", "https://tasko.dvirlabs.com"]
|
|
|
|
# Environment-aware configuration
|
|
ENVIRONMENT = os.getenv('ENVIRONMENT', 'development')
|
|
is_production = ENVIRONMENT == 'production'
|
|
is_development = ENVIRONMENT == 'development'
|
|
|
|
# Configure Session Middleware (required for OAuth state/nonce storage)
|
|
# Generate a strong SECRET_KEY: python -c "import secrets; print(secrets.token_hex(32))"
|
|
SESSION_SECRET = os.getenv('SESSION_SECRET', 'dev-secret-change-in-production')
|
|
|
|
# Session cookie configuration - environment aware
|
|
session_config = {
|
|
"secret_key": SESSION_SECRET,
|
|
"session_cookie": "tasko_session",
|
|
"max_age": 3600, # 1 hour
|
|
"path": "/", # Available on all routes (required for OAuth callback)
|
|
"same_site": "lax", # Allows OAuth redirects while preventing CSRF
|
|
"https_only": is_production, # False for HTTP (dev), True for HTTPS (prod)
|
|
}
|
|
|
|
# Development-specific: Ensure cookies work on localhost
|
|
if is_development:
|
|
session_config["domain"] = None # Don't set domain for localhost
|
|
|
|
app.add_middleware(SessionMiddleware, **session_config)
|
|
|
|
# Log session configuration in development
|
|
if is_development:
|
|
print(f"🔐 Session Configuration (Development Mode):")
|
|
print(f" - Cookie Name: {session_config['session_cookie']}")
|
|
print(f" - Path: {session_config['path']}")
|
|
print(f" - SameSite: {session_config['same_site']}")
|
|
print(f" - HTTPS Only: {session_config['https_only']}")
|
|
print(f" - Domain: {session_config.get('domain', 'None (localhost)')}")
|
|
|
|
# Configure CORS - MUST use specific origins (not "*") when allow_credentials=True
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=allowed_origins, # Specific origins required for credentials
|
|
allow_credentials=True, # Required for session cookies
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
expose_headers=["*"], # Allow frontend to read all response headers
|
|
)
|
|
|
|
# Log startup info
|
|
if is_development:
|
|
print(f"🚀 Tasko API starting in DEVELOPMENT mode")
|
|
print(f" - CORS Origins: {allowed_origins}")
|
|
print(f" - Allow Credentials: True (session cookies enabled)")
|
|
|
|
# Pydantic Models for API
|
|
class UserResponse(BaseModel):
|
|
model_config = ConfigDict(from_attributes=True)
|
|
|
|
id: str
|
|
username: str
|
|
email: str
|
|
created_at: datetime
|
|
|
|
class UserRegister(BaseModel):
|
|
username: str
|
|
email: str
|
|
password: str
|
|
|
|
class UserLogin(BaseModel):
|
|
model_config = ConfigDict(populate_by_name=True)
|
|
|
|
username_or_email: str = Field(..., alias='usernameOrEmail')
|
|
password: str
|
|
|
|
class AuthResponse(BaseModel):
|
|
user: UserResponse
|
|
token: str
|
|
|
|
class TaskListResponse(BaseModel):
|
|
model_config = ConfigDict(from_attributes=True)
|
|
|
|
id: str
|
|
user_id: str
|
|
name: str
|
|
icon: str = "📝"
|
|
color: str = "#667eea"
|
|
created_at: datetime
|
|
|
|
class TaskListCreate(BaseModel):
|
|
name: str
|
|
icon: Optional[str] = "📝"
|
|
color: Optional[str] = "#667eea"
|
|
|
|
class TaskListUpdate(BaseModel):
|
|
name: Optional[str] = None
|
|
icon: Optional[str] = None
|
|
color: Optional[str] = None
|
|
|
|
class TaskResponse(BaseModel):
|
|
model_config = ConfigDict(from_attributes=True)
|
|
|
|
id: str
|
|
list_id: str
|
|
user_id: str
|
|
title: str
|
|
description: Optional[str] = None
|
|
completed: bool = False
|
|
priority: str = "medium"
|
|
created_at: datetime
|
|
updated_at: datetime
|
|
|
|
class TaskCreate(BaseModel):
|
|
title: str
|
|
list_id: str
|
|
description: Optional[str] = None
|
|
priority: str = "medium"
|
|
|
|
class TaskUpdate(BaseModel):
|
|
title: Optional[str] = None
|
|
description: Optional[str] = None
|
|
completed: Optional[bool] = None
|
|
priority: Optional[str] = None
|
|
|
|
def hash_password(password: str) -> str:
|
|
return hashlib.sha256(password.encode()).hexdigest()
|
|
|
|
def verify_token(authorization: Optional[str], db: Session) -> str:
|
|
if not authorization or not authorization.startswith("Bearer "):
|
|
raise HTTPException(status_code=401, detail="Not authenticated")
|
|
|
|
token_str = authorization.replace("Bearer ", "")
|
|
token = db.query(db_models.Token).filter(db_models.Token.token == token_str).first()
|
|
|
|
if not token:
|
|
raise HTTPException(status_code=401, detail="Invalid token")
|
|
|
|
return token.user_id
|
|
|
|
@app.get("/")
|
|
def read_root():
|
|
return {"message": "Welcome to Tasko API"}
|
|
|
|
# Auth endpoints
|
|
@app.post("/register", response_model=AuthResponse)
|
|
def register(user_data: UserRegister, db: Session = Depends(get_db)):
|
|
"""Register a new user"""
|
|
# Check if username or email exists (case-insensitive for username)
|
|
existing_user = db.query(db_models.User).filter(
|
|
(db_models.User.username.ilike(user_data.username)) |
|
|
(db_models.User.email == user_data.email)
|
|
).first()
|
|
|
|
if existing_user:
|
|
if existing_user.username == user_data.username:
|
|
raise HTTPException(status_code=400, detail="Username already exists")
|
|
else:
|
|
raise HTTPException(status_code=400, detail="Email already exists")
|
|
|
|
# Create user
|
|
user_id = str(uuid.uuid4())
|
|
new_user = db_models.User(
|
|
id=user_id,
|
|
username=user_data.username,
|
|
email=user_data.email,
|
|
password_hash=hash_password(user_data.password)
|
|
)
|
|
db.add(new_user)
|
|
|
|
# Create token
|
|
token_str = str(uuid.uuid4())
|
|
new_token = db_models.Token(token=token_str, user_id=user_id)
|
|
db.add(new_token)
|
|
|
|
# Create default lists for new user
|
|
default_lists = [
|
|
{"name": "Personal", "icon": "🏠", "color": "#667eea"},
|
|
{"name": "Work", "icon": "💼", "color": "#f093fb"},
|
|
{"name": "Shopping", "icon": "🛒", "color": "#4facfe"},
|
|
]
|
|
|
|
for list_data in default_lists:
|
|
list_id = str(uuid.uuid4())
|
|
new_list = db_models.TaskList(
|
|
id=list_id,
|
|
user_id=user_id,
|
|
name=list_data["name"],
|
|
icon=list_data["icon"],
|
|
color=list_data["color"]
|
|
)
|
|
db.add(new_list)
|
|
|
|
db.commit()
|
|
db.refresh(new_user)
|
|
|
|
return AuthResponse(
|
|
user=UserResponse.model_validate(new_user),
|
|
token=token_str
|
|
)
|
|
|
|
@app.post("/login", response_model=AuthResponse)
|
|
def login(user_data: UserLogin, db: Session = Depends(get_db)):
|
|
"""Login a user with username or email"""
|
|
password_hash = hash_password(user_data.password)
|
|
|
|
# Try to find user by email or username (case-insensitive for username)
|
|
user = db.query(db_models.User).filter(
|
|
((db_models.User.email == user_data.username_or_email) |
|
|
(db_models.User.username.ilike(user_data.username_or_email))),
|
|
db_models.User.password_hash == password_hash
|
|
).first()
|
|
|
|
if not user:
|
|
raise HTTPException(status_code=401, detail="Invalid credentials")
|
|
|
|
# Create new token
|
|
token_str = str(uuid.uuid4())
|
|
new_token = db_models.Token(token=token_str, user_id=user.id)
|
|
db.add(new_token)
|
|
db.commit()
|
|
|
|
return AuthResponse(
|
|
user=UserResponse.model_validate(user),
|
|
token=token_str
|
|
)
|
|
|
|
@app.post("/logout")
|
|
def logout(authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
|
|
"""Logout a user"""
|
|
if authorization and authorization.startswith("Bearer "):
|
|
token_str = authorization.replace("Bearer ", "")
|
|
token = db.query(db_models.Token).filter(db_models.Token.token == token_str).first()
|
|
if token:
|
|
db.delete(token)
|
|
db.commit()
|
|
return {"message": "Logged out successfully"}
|
|
|
|
@app.get("/auth/google")
|
|
async def google_login(request: Request):
|
|
"""Initiate Google OAuth login - DIRECT BROWSER REDIRECT (not fetch)"""
|
|
redirect_uri = os.getenv('GOOGLE_REDIRECT_URI', 'http://localhost:8000/auth/google/callback')
|
|
|
|
# Debug logging for development
|
|
if is_development:
|
|
print(f"\n🔑 OAuth Login initiated (/auth/google):")
|
|
print(f" - Redirect URI: {redirect_uri}")
|
|
print(f" - Request URL: {request.url}")
|
|
print(f" - Request method: {request.method}")
|
|
print(f" - Request headers Cookie: {request.headers.get('cookie', 'NONE')}")
|
|
print(f" - Session available: {hasattr(request, 'session')}")
|
|
if hasattr(request, 'session'):
|
|
print(f" - Session keys BEFORE: {list(request.session.keys())}")
|
|
|
|
# This will set session state and redirect to Google
|
|
response = await oauth.google.authorize_redirect(request, redirect_uri)
|
|
|
|
if is_development:
|
|
print(f" - Session keys AFTER: {list(request.session.keys())}")
|
|
print(f" - Response status: {response.status_code}")
|
|
print(f" - Response Set-Cookie: {response.headers.get('set-cookie', 'NONE')}")
|
|
print(f" - Response Location: {response.headers.get('location', 'NONE')[:100]}...")
|
|
|
|
return response
|
|
|
|
@app.get("/auth/google/callback")
|
|
async def google_callback(request: Request, db: Session = Depends(get_db)):
|
|
"""Handle Google OAuth callback"""
|
|
try:
|
|
# Debug logging for development
|
|
if is_development:
|
|
print(f"\n🔄 OAuth Callback received (/auth/google/callback):")
|
|
print(f" - Request URL: {request.url}")
|
|
print(f" - Request method: {request.method}")
|
|
print(f" - Request headers Cookie: {request.headers.get('cookie', 'NONE')}")
|
|
print(f" - Request query params: {dict(request.query_params)}")
|
|
print(f" - Cookies from request.cookies: {list(request.cookies.keys())}")
|
|
print(f" - Session available: {hasattr(request, 'session')}")
|
|
if hasattr(request, 'session'):
|
|
session_keys = list(request.session.keys())
|
|
print(f" - Session keys: {session_keys}")
|
|
# Print state for debugging
|
|
for key in session_keys:
|
|
if 'state' in key.lower():
|
|
value_str = str(request.session[key])
|
|
if len(value_str) > 100:
|
|
print(f" - Session[{key}]: {value_str[:100]}...")
|
|
else:
|
|
print(f" - Session[{key}]: {value_str}")
|
|
|
|
# Get access token from Google
|
|
token = await oauth.google.authorize_access_token(request)
|
|
|
|
# Get user info from Google
|
|
user_info = token.get('userinfo')
|
|
if not user_info:
|
|
raise HTTPException(status_code=400, detail="Failed to get user info from Google")
|
|
|
|
email = user_info.get('email')
|
|
google_id = user_info.get('sub')
|
|
name = user_info.get('name', email.split('@')[0])
|
|
|
|
# Check if user exists
|
|
user = db.query(db_models.User).filter(db_models.User.email == email).first()
|
|
|
|
if not user:
|
|
# Create new user
|
|
user_id = str(uuid.uuid4())
|
|
# Use email username part as username, make it unique if needed
|
|
username = email.split('@')[0]
|
|
counter = 1
|
|
original_username = username
|
|
while db.query(db_models.User).filter(db_models.User.username.ilike(username)).first():
|
|
username = f"{original_username}{counter}"
|
|
counter += 1
|
|
|
|
user = db_models.User(
|
|
id=user_id,
|
|
username=username,
|
|
email=email,
|
|
password_hash=hashlib.sha256(google_id.encode()).hexdigest() # Use Google ID as password hash
|
|
)
|
|
db.add(user)
|
|
|
|
# Create default lists for new user
|
|
default_lists = [
|
|
{"name": "Personal", "icon": "🏠", "color": "#667eea"},
|
|
{"name": "Work", "icon": "💼", "color": "#f093fb"},
|
|
{"name": "Shopping", "icon": "🛒", "color": "#4facfe"},
|
|
]
|
|
|
|
for list_data in default_lists:
|
|
list_id = str(uuid.uuid4())
|
|
new_list = db_models.TaskList(
|
|
id=list_id,
|
|
user_id=user_id,
|
|
name=list_data["name"],
|
|
icon=list_data["icon"],
|
|
color=list_data["color"]
|
|
)
|
|
db.add(new_list)
|
|
|
|
db.commit()
|
|
db.refresh(user)
|
|
|
|
# Create auth token
|
|
token_str = str(uuid.uuid4())
|
|
new_token = db_models.Token(token=token_str, user_id=user.id)
|
|
db.add(new_token)
|
|
db.commit()
|
|
|
|
if is_development:
|
|
print(f"✅ OAuth Login SUCCESS!")
|
|
print(f" - User: {user.email} (ID: {user.id})")
|
|
print(f" - Token generated: {token_str[:20]}...")
|
|
print(f" - Redirecting to frontend with token")
|
|
|
|
# Redirect to frontend with token
|
|
frontend_url = os.getenv('FRONTEND_URL', 'http://localhost:5173')
|
|
return RedirectResponse(url=f"{frontend_url}?token={token_str}&user={user.id}")
|
|
|
|
except Exception as e:
|
|
if is_development:
|
|
print(f"❌ OAuth Login FAILED: {str(e)}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
raise HTTPException(status_code=400, detail=f"Authentication failed: {str(e)}")
|
|
|
|
@app.get("/auth/google/url")
|
|
def get_google_auth_url():
|
|
"""Get Google OAuth URL for frontend"""
|
|
redirect_uri = os.getenv('GOOGLE_REDIRECT_URI', 'http://localhost:8000/auth/google/callback')
|
|
authorization_url = f"https://accounts.google.com/o/oauth2/v2/auth?client_id={os.getenv('GOOGLE_CLIENT_ID', '')}&redirect_uri={redirect_uri}&response_type=code&scope=openid%20email%20profile&access_type=offline&prompt=consent"
|
|
return {"url": authorization_url}
|
|
|
|
@app.get("/api/user", response_model=UserResponse)
|
|
def get_current_user(authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
|
|
"""Get current user details"""
|
|
user_id = verify_token(authorization, db)
|
|
user = db.query(db_models.User).filter(db_models.User.id == user_id).first()
|
|
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
return UserResponse.model_validate(user)
|
|
|
|
# Task List endpoints
|
|
@app.get("/lists", response_model=List[TaskListResponse])
|
|
@app.get("/api/lists", response_model=List[TaskListResponse])
|
|
def get_task_lists(authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
|
|
"""Get all task lists for the authenticated user"""
|
|
user_id = verify_token(authorization, db)
|
|
task_lists = db.query(db_models.TaskList).filter(db_models.TaskList.user_id == user_id).all()
|
|
return [TaskListResponse.model_validate(tl) for tl in task_lists]
|
|
|
|
@app.get("/lists/{list_id}", response_model=TaskListResponse)
|
|
@app.get("/api/lists/{list_id}", response_model=TaskListResponse)
|
|
def get_task_list(list_id: str, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
|
|
"""Get a specific task list by ID"""
|
|
user_id = verify_token(authorization, db)
|
|
|
|
task_list = db.query(db_models.TaskList).filter(
|
|
db_models.TaskList.id == list_id,
|
|
db_models.TaskList.user_id == user_id
|
|
).first()
|
|
|
|
if not task_list:
|
|
raise HTTPException(status_code=404, detail="Task list not found")
|
|
|
|
return TaskListResponse.model_validate(task_list)
|
|
|
|
@app.post("/lists", response_model=TaskListResponse, status_code=201)
|
|
@app.post("/api/lists", response_model=TaskListResponse, status_code=201)
|
|
def create_task_list(task_list: TaskListCreate, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
|
|
"""Create a new task list"""
|
|
user_id = verify_token(authorization, db)
|
|
|
|
list_id = str(uuid.uuid4())
|
|
new_list = db_models.TaskList(
|
|
id=list_id,
|
|
user_id=user_id,
|
|
name=task_list.name,
|
|
icon=task_list.icon or "📝",
|
|
color=task_list.color or "#667eea"
|
|
)
|
|
|
|
db.add(new_list)
|
|
db.commit()
|
|
db.refresh(new_list)
|
|
|
|
return TaskListResponse.model_validate(new_list)
|
|
|
|
@app.put("/lists/{list_id}", response_model=TaskListResponse)
|
|
@app.put("/api/lists/{list_id}", response_model=TaskListResponse)
|
|
def update_task_list(list_id: str, list_update: TaskListUpdate, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
|
|
"""Update an existing task list"""
|
|
user_id = verify_token(authorization, db)
|
|
|
|
task_list = db.query(db_models.TaskList).filter(
|
|
db_models.TaskList.id == list_id,
|
|
db_models.TaskList.user_id == user_id
|
|
).first()
|
|
|
|
if not task_list:
|
|
raise HTTPException(status_code=404, detail="Task list not found")
|
|
|
|
update_data = list_update.model_dump(exclude_unset=True)
|
|
for field, value in update_data.items():
|
|
setattr(task_list, field, value)
|
|
|
|
db.commit()
|
|
db.refresh(task_list)
|
|
|
|
return TaskListResponse.model_validate(task_list)
|
|
|
|
@app.delete("/lists/{list_id}", status_code=204)
|
|
@app.delete("/api/lists/{list_id}", status_code=204)
|
|
def delete_task_list(list_id: str, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
|
|
"""Delete a task list and all its tasks"""
|
|
user_id = verify_token(authorization, db)
|
|
|
|
task_list = db.query(db_models.TaskList).filter(
|
|
db_models.TaskList.id == list_id,
|
|
db_models.TaskList.user_id == user_id
|
|
).first()
|
|
|
|
if not task_list:
|
|
raise HTTPException(status_code=404, detail="Task list not found")
|
|
|
|
# Tasks will be deleted automatically due to cascade relationship
|
|
db.delete(task_list)
|
|
db.commit()
|
|
|
|
return None
|
|
|
|
# Task endpoints
|
|
@app.get("/tasks", response_model=List[TaskResponse])
|
|
@app.get("/api/tasks", response_model=List[TaskResponse])
|
|
def get_tasks(list_id: Optional[str] = None, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
|
|
"""Get all tasks for authenticated user, optionally filtered by list_id"""
|
|
user_id = verify_token(authorization, db)
|
|
|
|
query = db.query(db_models.Task).filter(db_models.Task.user_id == user_id)
|
|
|
|
if list_id:
|
|
# Verify list belongs to user
|
|
task_list = db.query(db_models.TaskList).filter(
|
|
db_models.TaskList.id == list_id,
|
|
db_models.TaskList.user_id == user_id
|
|
).first()
|
|
|
|
if not task_list:
|
|
raise HTTPException(status_code=403, detail="Access denied")
|
|
|
|
query = query.filter(db_models.Task.list_id == list_id)
|
|
|
|
tasks = query.all()
|
|
return [TaskResponse.model_validate(t) for t in tasks]
|
|
|
|
@app.get("/tasks/{task_id}", response_model=TaskResponse)
|
|
@app.get("/api/tasks/{task_id}", response_model=TaskResponse)
|
|
def get_task(task_id: str, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
|
|
"""Get a specific task by ID"""
|
|
user_id = verify_token(authorization, db)
|
|
|
|
task = db.query(db_models.Task).filter(
|
|
db_models.Task.id == task_id,
|
|
db_models.Task.user_id == user_id
|
|
).first()
|
|
|
|
if not task:
|
|
raise HTTPException(status_code=404, detail="Task not found")
|
|
|
|
return TaskResponse.model_validate(task)
|
|
|
|
@app.post("/tasks", response_model=TaskResponse, status_code=201)
|
|
@app.post("/api/tasks", response_model=TaskResponse, status_code=201)
|
|
def create_task(task: TaskCreate, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
|
|
"""Create a new task"""
|
|
user_id = verify_token(authorization, db)
|
|
|
|
# Verify list exists and belongs to user
|
|
task_list = db.query(db_models.TaskList).filter(
|
|
db_models.TaskList.id == task.list_id,
|
|
db_models.TaskList.user_id == user_id
|
|
).first()
|
|
|
|
if not task_list:
|
|
raise HTTPException(status_code=404, detail="Task list not found")
|
|
|
|
task_id = str(uuid.uuid4())
|
|
new_task = db_models.Task(
|
|
id=task_id,
|
|
list_id=task.list_id,
|
|
user_id=user_id,
|
|
title=task.title,
|
|
description=task.description,
|
|
completed=False,
|
|
priority=task.priority
|
|
)
|
|
|
|
db.add(new_task)
|
|
db.commit()
|
|
db.refresh(new_task)
|
|
|
|
return TaskResponse.model_validate(new_task)
|
|
|
|
@app.put("/tasks/{task_id}", response_model=TaskResponse)
|
|
@app.put("/api/tasks/{task_id}", response_model=TaskResponse)
|
|
def update_task(task_id: str, task_update: TaskUpdate, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
|
|
"""Update an existing task"""
|
|
user_id = verify_token(authorization, db)
|
|
|
|
task = db.query(db_models.Task).filter(
|
|
db_models.Task.id == task_id,
|
|
db_models.Task.user_id == user_id
|
|
).first()
|
|
|
|
if not task:
|
|
raise HTTPException(status_code=404, detail="Task not found")
|
|
|
|
update_data = task_update.model_dump(exclude_unset=True)
|
|
for field, value in update_data.items():
|
|
setattr(task, field, value)
|
|
|
|
task.updated_at = datetime.now()
|
|
|
|
db.commit()
|
|
db.refresh(task)
|
|
|
|
return TaskResponse.model_validate(task)
|
|
|
|
@app.delete("/tasks/{task_id}", status_code=204)
|
|
@app.delete("/api/tasks/{task_id}", status_code=204)
|
|
def delete_task(task_id: str, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
|
|
"""Delete a task"""
|
|
user_id = verify_token(authorization, db)
|
|
|
|
task = db.query(db_models.Task).filter(
|
|
db_models.Task.id == task_id,
|
|
db_models.Task.user_id == user_id
|
|
).first()
|
|
|
|
if not task:
|
|
raise HTTPException(status_code=404, detail="Task not found")
|
|
|
|
db.delete(task)
|
|
db.commit()
|
|
|
|
return None
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
|