Replace deprecated class Config with model_config = ConfigDict() to fix Pydantic v2 deprecation warnings
533 lines
18 KiB
Python
533 lines
18 KiB
Python
import hashlib
import os
import uuid
from datetime import datetime
from typing import List, Optional
from urllib.parse import quote, urlencode

from authlib.integrations.starlette_client import OAuth
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Header, Depends, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse
from pydantic import BaseModel, Field, ConfigDict
from sqlalchemy import func
from sqlalchemy.orm import Session
|
|
|
|
# Load environment variables (via python-dotenv) before anything below reads
# os.getenv().
# NOTE(review): presumably `database` reads its connection settings from the
# environment at import time, which would be why it is imported only after
# load_dotenv() -- confirm before reordering these statements.
load_dotenv()

from database import init_db, get_db
import database as db_models

# The ASGI application object; every route below is registered on it.
app = FastAPI(title="Tasko API", version="1.0.0")

# Initialize database
# Runs at import time, i.e. once per process that imports this module.
init_db()
|
|
|
|
# OAuth Configuration
# Registers a single "google" client on the Authlib registry. Credentials come
# from the environment and fall back to empty strings when unset; endpoints
# are discovered from Google's OpenID Connect metadata document.
oauth = OAuth()
oauth.register(
    name="google",
    client_id=os.environ.get("GOOGLE_CLIENT_ID", ""),
    client_secret=os.environ.get("GOOGLE_CLIENT_SECRET", ""),
    server_metadata_url="https://accounts.google.com/.well-known/openid-configuration",
    client_kwargs=dict(scope="openid email profile"),
)
|
|
|
|
# Browser origins that are allowed to call this API.
allowed_origins = ["http://localhost:5173", "https://tasko.dvirlabs.com"]

# Configure CORS
# Credentialed requests must not be combined with a wildcard origin, so the
# explicit allow-list above is passed instead of "*" (the list was previously
# defined but never used).
app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
# Pydantic Models for API
|
|
class UserResponse(BaseModel):
    # Public view of a user account; only the fields declared here are
    # serialized, so the stored password hash is never part of a response.
    id: str
    username: str
    email: str
    created_at: datetime

    # from_attributes lets model_validate() read the fields off an object's
    # attributes (the endpoints pass db_models.User instances).
    model_config = ConfigDict(from_attributes=True)
|
|
|
|
class UserRegister(BaseModel):
    # Request body for POST /register; all three fields are required.
    username: str
    email: str
    # Plain-text password as sent by the client; it is hashed before storage.
    password: str
|
|
|
|
class UserLogin(BaseModel):
    # Request body for POST /login.
    # The identifier is accepted under the camelCase key "usernameOrEmail";
    # populate_by_name additionally accepts the snake_case field name.
    username_or_email: str = Field(alias="usernameOrEmail")
    password: str

    model_config = ConfigDict(populate_by_name=True)
|
|
|
|
class AuthResponse(BaseModel):
    # Response body of /register and /login: the user plus a bearer token.
    user: UserResponse
    # Opaque token string; clients send it back as "Authorization: Bearer <token>".
    token: str
|
|
|
|
class TaskListResponse(BaseModel):
    # Serialized form of a task list as returned by the list endpoints.
    id: str
    user_id: str
    name: str
    icon: str = Field(default="📝")
    color: str = Field(default="#667eea")
    created_at: datetime

    # Allows model_validate() to read fields from object attributes.
    model_config = ConfigDict(from_attributes=True)
|
|
|
|
class TaskListCreate(BaseModel):
    # Request body for creating a task list; only `name` is required.
    name: str
    # Both may be omitted (defaults apply) or sent as null.
    icon: Optional[str] = Field(default="📝")
    color: Optional[str] = Field(default="#667eea")
|
|
|
|
class TaskListUpdate(BaseModel):
    # Partial-update body for a task list: every field is optional, and the
    # update endpoint applies only the fields the client actually sent.
    name: Optional[str] = Field(default=None)
    icon: Optional[str] = Field(default=None)
    color: Optional[str] = Field(default=None)
|
|
|
|
class TaskResponse(BaseModel):
    # Serialized form of a task as returned by the task endpoints.
    id: str
    list_id: str
    user_id: str
    title: str
    description: Optional[str] = Field(default=None)
    completed: bool = Field(default=False)
    priority: str = Field(default="medium")
    created_at: datetime
    updated_at: datetime

    # Allows model_validate() to read fields from object attributes.
    model_config = ConfigDict(from_attributes=True)
|
|
|
|
class TaskCreate(BaseModel):
    # Request body for creating a task; `title` and `list_id` are required.
    title: str
    list_id: str
    description: Optional[str] = Field(default=None)
    # Free-form string; no set of allowed values is enforced here.
    priority: str = Field(default="medium")
|
|
|
|
class TaskUpdate(BaseModel):
    # Partial-update body for a task: every field is optional, and the update
    # endpoint applies only the fields the client actually sent.
    title: Optional[str] = Field(default=None)
    description: Optional[str] = Field(default=None)
    completed: Optional[bool] = Field(default=None)
    priority: Optional[str] = Field(default=None)
|
|
|
|
def hash_password(password: str) -> str:
    """Return the hex-encoded SHA-256 digest of *password* (UTF-8 encoded).

    NOTE(review): this is a single unsalted hash. Moving to a salted,
    slow password hash would invalidate every stored value, so it needs a
    migration plan rather than an in-place change.
    """
    digest = hashlib.sha256()
    digest.update(password.encode("utf-8"))
    return digest.hexdigest()
|
|
|
|
def verify_token(authorization: Optional[str], db: Session) -> str:
    """Resolve an ``Authorization`` header value to the owning user's id.

    Args:
        authorization: Raw header value, expected as ``"Bearer <token>"``.
        db: Database session used to look up the token row.

    Returns:
        The ``user_id`` stored on the matching token row.

    Raises:
        HTTPException: 401 "Not authenticated" when the header is missing or
            does not start with ``"Bearer "``; 401 "Invalid token" when no
            stored token matches.
    """
    bearer_prefix = "Bearer "
    if not authorization or not authorization.startswith(bearer_prefix):
        raise HTTPException(status_code=401, detail="Not authenticated")

    # Strip only the leading scheme. str.replace() would also delete any later
    # "Bearer " occurrence inside the credential and look up a different token.
    token_str = authorization[len(bearer_prefix):]
    token = (
        db.query(db_models.Token)
        .filter(db_models.Token.token == token_str)
        .first()
    )

    if not token:
        raise HTTPException(status_code=401, detail="Invalid token")

    return token.user_id
|
|
|
|
@app.get("/")
def read_root():
    # Unauthenticated landing route: always returns the same greeting payload.
    greeting = {"message": "Welcome to Tasko API"}
    return greeting
|
|
|
|
# Auth endpoints
|
|
@app.post("/register", response_model=AuthResponse)
def register(user_data: UserRegister, db: Session = Depends(get_db)):
    """Register a new user"""
    # Flow: reject duplicates, then create the user, a first auth token and
    # three default task lists in one commit.
    # Raises HTTP 400 "Username already exists" / "Email already exists".

    # Check if username or email exists (case-insensitive for username).
    # Lowered equality is used instead of ilike(): LIKE treats "_" and "%" in
    # the requested username as wildcards and would report false conflicts
    # (e.g. "a_b" colliding with "axb").
    existing_user = db.query(db_models.User).filter(
        (func.lower(db_models.User.username) == func.lower(user_data.username)) |
        (db_models.User.email == user_data.email)
    ).first()

    if existing_user:
        # The lookup ignores case, so the message must be chosen the same way;
        # a case-sensitive comparison mislabels "Alice" vs "alice" as an
        # email conflict.
        if existing_user.username.lower() == user_data.username.lower():
            raise HTTPException(status_code=400, detail="Username already exists")
        else:
            raise HTTPException(status_code=400, detail="Email already exists")

    # Create user
    user_id = str(uuid.uuid4())
    new_user = db_models.User(
        id=user_id,
        username=user_data.username,
        email=user_data.email,
        password_hash=hash_password(user_data.password)
    )
    db.add(new_user)

    # Create token
    token_str = str(uuid.uuid4())
    new_token = db_models.Token(token=token_str, user_id=user_id)
    db.add(new_token)

    # Create default lists for new user
    default_lists = [
        {"name": "Personal", "icon": "🏠", "color": "#667eea"},
        {"name": "Work", "icon": "💼", "color": "#f093fb"},
        {"name": "Shopping", "icon": "🛒", "color": "#4facfe"},
    ]

    for list_data in default_lists:
        new_list = db_models.TaskList(
            id=str(uuid.uuid4()),
            user_id=user_id,
            name=list_data["name"],
            icon=list_data["icon"],
            color=list_data["color"]
        )
        db.add(new_list)

    # User, token and lists are persisted together.
    db.commit()
    db.refresh(new_user)

    return AuthResponse(
        user=UserResponse.model_validate(new_user),
        token=token_str
    )
|
|
|
|
@app.post("/login", response_model=AuthResponse)
def login(user_data: UserLogin, db: Session = Depends(get_db)):
    """Login a user with username or email"""
    # Raises HTTP 401 "Invalid credentials" when no user matches both the
    # identifier and the password hash. Each successful login issues a new
    # token row; existing tokens are left in place.
    password_hash = hash_password(user_data.password)

    # Try to find user by email or username (case-insensitive for username).
    # Lowered equality replaces ilike(): with LIKE, a submitted identifier of
    # "%" would match every username, letting one request test a password
    # against all accounts at once.
    user = db.query(db_models.User).filter(
        ((db_models.User.email == user_data.username_or_email) |
         (func.lower(db_models.User.username) == func.lower(user_data.username_or_email))),
        db_models.User.password_hash == password_hash
    ).first()

    if not user:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # Create new token
    token_str = str(uuid.uuid4())
    new_token = db_models.Token(token=token_str, user_id=user.id)
    db.add(new_token)
    db.commit()

    return AuthResponse(
        user=UserResponse.model_validate(user),
        token=token_str
    )
|
|
|
|
@app.post("/logout")
def logout(authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Logout a user"""
    # Best-effort: a missing, malformed or unknown token is not an error and
    # the success message is returned either way.
    bearer_prefix = "Bearer "
    if authorization and authorization.startswith(bearer_prefix):
        # Strip only the leading scheme; str.replace() would also remove any
        # later "Bearer " occurrence inside the credential.
        token_str = authorization[len(bearer_prefix):]
        token = db.query(db_models.Token).filter(db_models.Token.token == token_str).first()
        if token:
            db.delete(token)
            db.commit()
    return {"message": "Logged out successfully"}
|
|
|
|
@app.get("/auth/google")
async def google_login(request: Request):
    """Initiate Google OAuth login"""
    redirect_uri = os.getenv('GOOGLE_REDIRECT_URI', 'http://localhost:8000/auth/google/callback')
    # Authlib's Starlette client takes the incoming request as its first
    # argument; passing only the redirect URI made it treat the string as the
    # request object and fail.
    # NOTE(review): Authlib keeps the OAuth state in request.session, and no
    # SessionMiddleware is registered in the visible code -- confirm one is
    # installed before relying on this route.
    return await oauth.google.authorize_redirect(request, redirect_uri)
|
|
|
|
@app.get("/auth/google/callback")
async def google_callback(code: str, request: Request, db: Session = Depends(get_db)):
    """Handle Google OAuth callback"""
    # Flow: exchange the callback for Google tokens, find or create the local
    # user by email, issue an API token and redirect to the frontend with it.
    # `code` stays a required query parameter so a callback without it is
    # rejected up front; the token exchange itself reads from `request`.
    # Raises HTTP 400 on any failure.
    try:
        # Get access token from Google. Authlib's Starlette client needs the
        # incoming request to complete the exchange.
        token = await oauth.google.authorize_access_token(request)

        # Get user info from Google
        user_info = token.get('userinfo')
        if not user_info:
            raise HTTPException(status_code=400, detail="Failed to get user info from Google")

        email = user_info.get('email')
        google_id = user_info.get('sub')
        # Both values are dereferenced below; fail clearly instead of with an
        # AttributeError on None.
        if not email or not google_id:
            raise HTTPException(status_code=400, detail="Failed to get user info from Google")

        # Check if user exists
        user = db.query(db_models.User).filter(db_models.User.email == email).first()

        if not user:
            # Create new user
            user_id = str(uuid.uuid4())
            # Use email username part as username, make it unique if needed
            original_username = email.split('@')[0]
            username = original_username
            counter = 1
            # Case-insensitive equality (not LIKE) so "_" / "%" in the local
            # part are not treated as wildcards.
            while db.query(db_models.User).filter(
                func.lower(db_models.User.username) == func.lower(username)
            ).first():
                username = f"{original_username}{counter}"
                counter += 1

            # NOTE(review): the stored hash is SHA-256 of the Google `sub`, and
            # /login compares SHA-256 of the submitted password against this
            # column, so knowing the `sub` is enough to log in by password.
            user = db_models.User(
                id=user_id,
                username=username,
                email=email,
                password_hash=hashlib.sha256(google_id.encode()).hexdigest()  # Use Google ID as password hash
            )
            db.add(user)

            # Create default lists for new user
            default_lists = [
                {"name": "Personal", "icon": "🏠", "color": "#667eea"},
                {"name": "Work", "icon": "💼", "color": "#f093fb"},
                {"name": "Shopping", "icon": "🛒", "color": "#4facfe"},
            ]

            for list_data in default_lists:
                new_list = db_models.TaskList(
                    id=str(uuid.uuid4()),
                    user_id=user_id,
                    name=list_data["name"],
                    icon=list_data["icon"],
                    color=list_data["color"]
                )
                db.add(new_list)

            db.commit()
            db.refresh(user)

        # Create auth token
        token_str = str(uuid.uuid4())
        new_token = db_models.Token(token=token_str, user_id=user.id)
        db.add(new_token)
        db.commit()

        # Redirect to frontend with token
        frontend_url = os.getenv('FRONTEND_URL', 'http://localhost:5173')
        return RedirectResponse(url=f"{frontend_url}?token={token_str}&user={user.id}")

    except HTTPException:
        # Deliberate 400s raised above pass through unchanged instead of being
        # re-wrapped as "Authentication failed: 400: ...".
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Authentication failed: {str(e)}") from e
|
|
|
|
@app.get("/auth/google/url")
def get_google_auth_url():
    """Get Google OAuth URL for frontend"""
    # Returns {"url": <authorization URL>} built from GOOGLE_CLIENT_ID and
    # GOOGLE_REDIRECT_URI (with a localhost default for the latter).
    redirect_uri = os.getenv('GOOGLE_REDIRECT_URI', 'http://localhost:8000/auth/google/callback')
    query_params = {
        "client_id": os.getenv('GOOGLE_CLIENT_ID', ''),
        "redirect_uri": redirect_uri,
        "response_type": "code",
        "scope": "openid email profile",
        "access_type": "offline",
        "prompt": "consent",
    }
    # Percent-encode every value: a redirect URI or client id containing
    # "&", "?" or "#" would otherwise corrupt the query string. quote_via=quote
    # keeps spaces as %20, matching the previous hand-written scope value.
    # NOTE(review): this URL carries no `state` parameter -- confirm the
    # callback's token exchange accepts a flow started this way.
    authorization_url = (
        "https://accounts.google.com/o/oauth2/v2/auth?"
        + urlencode(query_params, quote_via=quote)
    )
    return {"url": authorization_url}
|
|
|
|
@app.get("/api/user", response_model=UserResponse)
def get_current_user(authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Get current user details"""
    # 401 comes from verify_token; 404 when the token's user row is missing.
    current_id = verify_token(authorization, db)
    account = (
        db.query(db_models.User)
        .filter(db_models.User.id == current_id)
        .first()
    )
    if account:
        return UserResponse.model_validate(account)

    raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
# Task List endpoints
|
|
@app.get("/lists", response_model=List[TaskListResponse])
@app.get("/api/lists", response_model=List[TaskListResponse])
def get_task_lists(authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Get all task lists for the authenticated user"""
    # Served under both /lists and /api/lists.
    owner_id = verify_token(authorization, db)
    owned_lists = (
        db.query(db_models.TaskList)
        .filter(db_models.TaskList.user_id == owner_id)
        .all()
    )
    return list(map(TaskListResponse.model_validate, owned_lists))
|
|
|
|
@app.get("/lists/{list_id}", response_model=TaskListResponse)
@app.get("/api/lists/{list_id}", response_model=TaskListResponse)
def get_task_list(list_id: str, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Get a specific task list by ID"""
    owner_id = verify_token(authorization, db)

    # Filtering on the owner as well means another user's list is reported as
    # not found.
    found = (
        db.query(db_models.TaskList)
        .filter(
            db_models.TaskList.id == list_id,
            db_models.TaskList.user_id == owner_id,
        )
        .first()
    )
    if found:
        return TaskListResponse.model_validate(found)

    raise HTTPException(status_code=404, detail="Task list not found")
|
|
|
|
@app.post("/lists", response_model=TaskListResponse, status_code=201)
@app.post("/api/lists", response_model=TaskListResponse, status_code=201)
def create_task_list(task_list: TaskListCreate, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Create a new task list"""
    owner_id = verify_token(authorization, db)

    # A null or empty icon/color from the client falls back to the defaults.
    chosen_icon = task_list.icon if task_list.icon else "📝"
    chosen_color = task_list.color if task_list.color else "#667eea"

    created = db_models.TaskList(
        id=str(uuid.uuid4()),
        user_id=owner_id,
        name=task_list.name,
        icon=chosen_icon,
        color=chosen_color,
    )
    db.add(created)
    db.commit()
    # Reload so database-populated columns are present on the response.
    db.refresh(created)

    return TaskListResponse.model_validate(created)
|
|
|
|
@app.put("/lists/{list_id}", response_model=TaskListResponse)
@app.put("/api/lists/{list_id}", response_model=TaskListResponse)
def update_task_list(list_id: str, list_update: TaskListUpdate, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Update an existing task list"""
    owner_id = verify_token(authorization, db)

    target = (
        db.query(db_models.TaskList)
        .filter(
            db_models.TaskList.id == list_id,
            db_models.TaskList.user_id == owner_id,
        )
        .first()
    )
    if not target:
        raise HTTPException(status_code=404, detail="Task list not found")

    # exclude_unset keeps only the keys present in the request body, so
    # omitted fields are left untouched (an explicit null is still applied).
    changes = list_update.model_dump(exclude_unset=True)
    for attr_name in changes:
        setattr(target, attr_name, changes[attr_name])

    db.commit()
    db.refresh(target)

    return TaskListResponse.model_validate(target)
|
|
|
|
@app.delete("/lists/{list_id}", status_code=204)
@app.delete("/api/lists/{list_id}", status_code=204)
def delete_task_list(list_id: str, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Delete a task list and all its tasks"""
    owner_id = verify_token(authorization, db)

    doomed = (
        db.query(db_models.TaskList)
        .filter(
            db_models.TaskList.id == list_id,
            db_models.TaskList.user_id == owner_id,
        )
        .first()
    )
    if not doomed:
        raise HTTPException(status_code=404, detail="Task list not found")

    # Only the list row is deleted here.
    # NOTE(review): removal of its tasks relies on a cascade configured on the
    # model, which is not visible in this file -- confirm in `database`.
    db.delete(doomed)
    db.commit()

    return None
|
|
|
|
# Task endpoints
|
|
@app.get("/tasks", response_model=List[TaskResponse])
@app.get("/api/tasks", response_model=List[TaskResponse])
def get_tasks(list_id: Optional[str] = None, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Get all tasks for authenticated user, optionally filtered by list_id"""
    owner_id = verify_token(authorization, db)

    task_query = db.query(db_models.Task).filter(db_models.Task.user_id == owner_id)

    if list_id:
        # Verify list belongs to user. A list that is missing or owned by
        # someone else is answered with 403 here (not 404).
        owned_list = (
            db.query(db_models.TaskList)
            .filter(
                db_models.TaskList.id == list_id,
                db_models.TaskList.user_id == owner_id,
            )
            .first()
        )
        if not owned_list:
            raise HTTPException(status_code=403, detail="Access denied")

        task_query = task_query.filter(db_models.Task.list_id == list_id)

    return [TaskResponse.model_validate(row) for row in task_query.all()]
|
|
|
|
@app.get("/tasks/{task_id}", response_model=TaskResponse)
@app.get("/api/tasks/{task_id}", response_model=TaskResponse)
def get_task(task_id: str, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Get a specific task by ID"""
    owner_id = verify_token(authorization, db)

    # Scoped to the caller, so another user's task is reported as not found.
    found = (
        db.query(db_models.Task)
        .filter(
            db_models.Task.id == task_id,
            db_models.Task.user_id == owner_id,
        )
        .first()
    )
    if found:
        return TaskResponse.model_validate(found)

    raise HTTPException(status_code=404, detail="Task not found")
|
|
|
|
@app.post("/tasks", response_model=TaskResponse, status_code=201)
@app.post("/api/tasks", response_model=TaskResponse, status_code=201)
def create_task(task: TaskCreate, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Create a new task"""
    owner_id = verify_token(authorization, db)

    # Verify list exists and belongs to user
    parent_list = (
        db.query(db_models.TaskList)
        .filter(
            db_models.TaskList.id == task.list_id,
            db_models.TaskList.user_id == owner_id,
        )
        .first()
    )
    if not parent_list:
        raise HTTPException(status_code=404, detail="Task list not found")

    # New tasks always start as not completed.
    created = db_models.Task(
        id=str(uuid.uuid4()),
        list_id=task.list_id,
        user_id=owner_id,
        title=task.title,
        description=task.description,
        completed=False,
        priority=task.priority,
    )
    db.add(created)
    db.commit()
    db.refresh(created)

    return TaskResponse.model_validate(created)
|
|
|
|
@app.put("/tasks/{task_id}", response_model=TaskResponse)
@app.put("/api/tasks/{task_id}", response_model=TaskResponse)
def update_task(task_id: str, task_update: TaskUpdate, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Update an existing task"""
    owner_id = verify_token(authorization, db)

    target = (
        db.query(db_models.Task)
        .filter(
            db_models.Task.id == task_id,
            db_models.Task.user_id == owner_id,
        )
        .first()
    )
    if not target:
        raise HTTPException(status_code=404, detail="Task not found")

    # Apply only the fields present in the request body.
    changes = task_update.model_dump(exclude_unset=True)
    for attr_name in changes:
        setattr(target, attr_name, changes[attr_name])

    # Stamped on every call, even when the body changed nothing; the value is
    # a naive local-time datetime.
    target.updated_at = datetime.now()

    db.commit()
    db.refresh(target)

    return TaskResponse.model_validate(target)
|
|
|
|
@app.delete("/tasks/{task_id}", status_code=204)
@app.delete("/api/tasks/{task_id}", status_code=204)
def delete_task(task_id: str, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Delete a task"""
    owner_id = verify_token(authorization, db)

    doomed = (
        db.query(db_models.Task)
        .filter(
            db_models.Task.id == task_id,
            db_models.Task.user_id == owner_id,
        )
        .first()
    )
    if not doomed:
        raise HTTPException(status_code=404, detail="Task not found")

    db.delete(doomed)
    db.commit()

    return None
|
|
|
|
if __name__ == "__main__":
    # Development entry point: serves the app named by the "main:app" import
    # string on all interfaces, port 8000, with auto-reload enabled.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000, "reload": True}
    uvicorn.run("main:app", **server_options)
|