"""Tasko API: FastAPI application exposing auth, task-list and task endpoints."""

import hashlib
import os
import uuid
from datetime import datetime
from typing import List, Optional

from authlib.integrations.starlette_client import OAuth
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, Header, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy import func
from sqlalchemy.orm import Session

# Keep the original ordering: the .env file is loaded before the local
# database module is imported (presumably that module reads its connection
# settings from the environment at import time -- confirm).
load_dotenv()

from database import init_db, get_db  # noqa: E402
import database as db_models  # noqa: E402

app = FastAPI(title="Tasko API", version="1.0.0")

# Initialize database
init_db()

# OAuth Configuration
oauth = OAuth()
oauth.register(
    name='google',
    client_id=os.getenv('GOOGLE_CLIENT_ID', ''),
    client_secret=os.getenv('GOOGLE_CLIENT_SECRET', ''),
    server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
    client_kwargs={
        'scope': 'openid email profile'
    }
)

allowed_origins = ["http://localhost:5173", "https://tasko.dvirlabs.com"]

# Configure CORS
# Restrict cross-origin access to the known frontends. Previously the list
# above was defined but never used, and a "*" wildcard was combined with
# allow_credentials=True.
app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Pydantic Models for API
class UserResponse(BaseModel):
    """Public view of a user returned by the API (built from ORM attributes)."""

    model_config = ConfigDict(from_attributes=True)

    id: str
    username: str
    email: str
    created_at: datetime


class UserRegister(BaseModel):
    """Request body for POST /register."""

    username: str
    email: str
    password: str


class UserLogin(BaseModel):
    """Request body for POST /login; accepts ``usernameOrEmail`` or the field name."""

    model_config = ConfigDict(populate_by_name=True)

    username_or_email: str = Field(..., alias='usernameOrEmail')
    password: str


class AuthResponse(BaseModel):
    """User details plus the bearer token issued for the session."""

    user: UserResponse
    token: str


class TaskListResponse(BaseModel):
    """Task list as returned by the API (built from ORM attributes)."""

    model_config = ConfigDict(from_attributes=True)

    id: str
    user_id: str
    name: str
    icon: str = "📝"
    color: str = "#667eea"
    created_at: datetime


class TaskListCreate(BaseModel):
    """Request body for creating a task list."""

    name: str
    icon: Optional[str] = "📝"
    color: Optional[str] = "#667eea"


class TaskListUpdate(BaseModel):
    """Partial update for a task list; only fields present in the request are applied."""

    name: Optional[str] = None
    icon: Optional[str] = None
    color: Optional[str] = None


class TaskResponse(BaseModel):
    """Task as returned by the API (built from ORM attributes)."""

    model_config = ConfigDict(from_attributes=True)

    id: str
    list_id: str
    user_id: str
    title: str
    description: Optional[str] = None
    completed: bool = False
    priority: str = "medium"
    created_at: datetime
    updated_at: datetime


class TaskCreate(BaseModel):
    """Request body for creating a task inside an existing list."""

    title: str
    list_id: str
    description: Optional[str] = None
    priority: str = "medium"


class TaskUpdate(BaseModel):
    """Partial update for a task; only fields present in the request are applied."""

    title: Optional[str] = None
    description: Optional[str] = None
    completed: Optional[bool] = None
    priority: Optional[str] = None


_BEARER_PREFIX = "Bearer "

# Lists created for every newly registered user.
_DEFAULT_LISTS = (
    {"name": "Personal", "icon": "🏠", "color": "#667eea"},
    {"name": "Work", "icon": "💼", "color": "#f093fb"},
    {"name": "Shopping", "icon": "🛒", "color": "#4facfe"},
)


def hash_password(password: str) -> str:
    """Return the hex SHA-256 digest of ``password``.

    NOTE(review): this is an unsalted, fast hash, which is weak for password
    storage. It is left unchanged because stored hashes and the login query
    depend on this exact format; moving to a dedicated password hash needs a
    migration plan.
    """
    return hashlib.sha256(password.encode()).hexdigest()


def _extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from a ``Bearer <token>`` header, or None if absent/malformed."""
    if not authorization or not authorization.startswith(_BEARER_PREFIX):
        return None
    # Strip only the leading scheme; str.replace would also remove any later
    # occurrence of "Bearer " inside the token itself.
    return authorization[len(_BEARER_PREFIX):]


def _add_default_lists(db: Session, user_id: str) -> None:
    """Stage the default task lists for ``user_id`` on the session (no commit)."""
    for list_data in _DEFAULT_LISTS:
        db.add(
            db_models.TaskList(
                id=str(uuid.uuid4()),
                user_id=user_id,
                name=list_data["name"],
                icon=list_data["icon"],
                color=list_data["color"],
            )
        )


def verify_token(authorization: Optional[str], db: Session) -> str:
    """Resolve an ``Authorization`` header to the owning user's id.

    Raises:
        HTTPException: 401 "Not authenticated" when the header is missing or
            not a Bearer header; 401 "Invalid token" when the token is unknown.
    """
    token_str = _extract_bearer_token(authorization)
    if token_str is None:
        raise HTTPException(status_code=401, detail="Not authenticated")

    token = db.query(db_models.Token).filter(db_models.Token.token == token_str).first()
    if not token:
        raise HTTPException(status_code=401, detail="Invalid token")

    return token.user_id


@app.get("/")
def read_root():
    """Return a static welcome message."""
    return {"message": "Welcome to Tasko API"}


# Auth endpoints
@app.post("/register", response_model=AuthResponse)
def register(user_data: UserRegister, db: Session = Depends(get_db)):
    """Register a new user, issue a token and create the default lists.

    Raises:
        HTTPException: 400 when the username (case-insensitive) or the email
            is already taken.
    """
    # Case-insensitive *equality* on the username. ilike() with raw user input
    # would treat "%" and "_" as wildcards.
    existing_user = (
        db.query(db_models.User)
        .filter(
            (func.lower(db_models.User.username) == func.lower(user_data.username))
            | (db_models.User.email == user_data.email)
        )
        .first()
    )

    if existing_user:
        # Compare case-insensitively so a case-variant duplicate username is
        # not misreported as a duplicate email.
        if existing_user.username.lower() == user_data.username.lower():
            raise HTTPException(status_code=400, detail="Username already exists")
        raise HTTPException(status_code=400, detail="Email already exists")

    # Create user
    user_id = str(uuid.uuid4())
    new_user = db_models.User(
        id=user_id,
        username=user_data.username,
        email=user_data.email,
        password_hash=hash_password(user_data.password),
    )
    db.add(new_user)

    # Create token
    token_str = str(uuid.uuid4())
    db.add(db_models.Token(token=token_str, user_id=user_id))

    # Create default lists for new user
    _add_default_lists(db, user_id)

    # User, token and lists are committed together.
    db.commit()
    db.refresh(new_user)

    return AuthResponse(
        user=UserResponse.model_validate(new_user),
        token=token_str,
    )


@app.post("/login", response_model=AuthResponse)
def login(user_data: UserLogin, db: Session = Depends(get_db)):
    """Login a user with username or email and issue a new token.

    Raises:
        HTTPException: 401 "Invalid credentials" when no user matches the
            identifier and password hash.
    """
    password_hash = hash_password(user_data.password)
    identifier = user_data.username_or_email

    # Email is matched exactly; username is matched case-insensitively with
    # plain equality so "%" / "_" in the identifier cannot act as wildcards.
    user = (
        db.query(db_models.User)
        .filter(
            (db_models.User.email == identifier)
            | (func.lower(db_models.User.username) == func.lower(identifier)),
            db_models.User.password_hash == password_hash,
        )
        .first()
    )

    if not user:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # Create new token
    token_str = str(uuid.uuid4())
    db.add(db_models.Token(token=token_str, user_id=user.id))
    db.commit()

    return AuthResponse(
        user=UserResponse.model_validate(user),
        token=token_str,
    )


@app.post("/logout")
def logout(authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Logout a user by deleting the presented token, if it exists.

    Always reports success, even when the header is missing or the token is
    unknown.
    """
    token_str = _extract_bearer_token(authorization)
    if token_str is not None:
        token = db.query(db_models.Token).filter(db_models.Token.token == token_str).first()
        if token:
            db.delete(token)
            db.commit()

    return {"message": "Logged out successfully"}


@app.get("/auth/google")
async def google_login(request: Request):
    """Initiate Google OAuth login by redirecting to Google's consent page.

    NOTE(review): Authlib's Starlette client appears to keep OAuth state in
    ``request.session``, which needs Starlette's SessionMiddleware; no such
    middleware is installed in this file -- confirm it is added elsewhere.
    """
    redirect_uri = os.getenv('GOOGLE_REDIRECT_URI', 'http://localhost:8000/auth/google/callback')
    return await oauth.google.authorize_redirect(request, redirect_uri)


def _create_google_user(db: Session, email: str):
    """Stage a new user for a Google sign-in plus their default lists (no commit).

    The username is the email's local part, suffixed with a counter until it
    is unique (case-insensitive). Returns the new, uncommitted User object.
    """
    # Function-scope imports keep this block self-contained.
    import secrets

    from sqlalchemy import func

    user_id = str(uuid.uuid4())

    # Use email username part as username, make it unique if needed.
    original_username = email.split('@')[0]
    username = original_username
    counter = 1
    # Case-insensitive equality rather than ilike(): "_" is common in email
    # local parts and would act as a single-character wildcard in LIKE.
    while (
        db.query(db_models.User)
        .filter(func.lower(db_models.User.username) == func.lower(username))
        .first()
    ):
        username = f"{original_username}{counter}"
        counter += 1

    user = db_models.User(
        id=user_id,
        username=username,
        email=email,
        # Store the hash of a random secret so the account cannot be used via
        # password login. The Google "sub" id used previously is an
        # identifier, not a secret. Same 64-hex-char SHA-256 format as before.
        password_hash=hashlib.sha256(secrets.token_bytes(32)).hexdigest(),
    )
    db.add(user)

    # Create default lists for new user
    default_lists = [
        {"name": "Personal", "icon": "🏠", "color": "#667eea"},
        {"name": "Work", "icon": "💼", "color": "#f093fb"},
        {"name": "Shopping", "icon": "🛒", "color": "#4facfe"},
    ]
    for list_data in default_lists:
        db.add(
            db_models.TaskList(
                id=str(uuid.uuid4()),
                user_id=user_id,
                name=list_data["name"],
                icon=list_data["icon"],
                color=list_data["color"],
            )
        )

    return user


def _get_owned_task_list(db: Session, list_id: str, user_id: str):
    """Return the task list ``list_id`` owned by ``user_id`` or raise 404."""
    task_list = db.query(db_models.TaskList).filter(
        db_models.TaskList.id == list_id,
        db_models.TaskList.user_id == user_id,
    ).first()
    if not task_list:
        raise HTTPException(status_code=404, detail="Task list not found")
    return task_list


def _get_owned_task(db: Session, task_id: str, user_id: str):
    """Return the task ``task_id`` owned by ``user_id`` or raise 404."""
    task = db.query(db_models.Task).filter(
        db_models.Task.id == task_id,
        db_models.Task.user_id == user_id,
    ).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return task


@app.get("/auth/google/callback")
async def google_callback(request: Request, db: Session = Depends(get_db)):
    """Handle Google OAuth callback.

    Exchanges the authorization response for tokens, finds or creates the
    local user by email, issues an API token and redirects to the frontend
    with ``token`` and ``user`` query parameters.

    Raises:
        HTTPException: 400 when Google returns no user info or no email, or
            when any other step of the exchange fails.
    """
    try:
        # Get access token from Google
        token = await oauth.google.authorize_access_token(request)

        # Get user info from Google
        user_info = token.get('userinfo')
        if not user_info:
            raise HTTPException(status_code=400, detail="Failed to get user info from Google")

        email = user_info.get('email')
        if not email:
            # Previously a missing email crashed on email.split() and surfaced
            # as an opaque AttributeError message.
            raise HTTPException(status_code=400, detail="Google account did not provide an email address")

        # NOTE(review): an existing local account with the same email is
        # logged in directly; consider also requiring the "email_verified"
        # claim from Google -- confirm the desired policy.
        user = db.query(db_models.User).filter(db_models.User.email == email).first()

        if not user:
            user = _create_google_user(db, email)
            db.commit()
            db.refresh(user)

        # Create auth token
        token_str = str(uuid.uuid4())
        db.add(db_models.Token(token=token_str, user_id=user.id))
        db.commit()

        # Redirect to frontend with token
        frontend_url = os.getenv('FRONTEND_URL', 'http://localhost:5173')
        return RedirectResponse(url=f"{frontend_url}?token={token_str}&user={user.id}")

    except HTTPException:
        # Let deliberate HTTP errors through instead of re-wrapping them in
        # the generic "Authentication failed" message below.
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Authentication failed: {str(e)}") from e


@app.get("/auth/google/url")
def get_google_auth_url():
    """Get Google OAuth URL for frontend.

    Returns a dict with a single ``url`` key. Query values are percent-encoded
    so a redirect URI containing reserved characters stays intact.
    """
    from urllib.parse import quote, urlencode

    redirect_uri = os.getenv('GOOGLE_REDIRECT_URI', 'http://localhost:8000/auth/google/callback')
    params = {
        "client_id": os.getenv('GOOGLE_CLIENT_ID', ''),
        "redirect_uri": redirect_uri,
        "response_type": "code",
        "scope": "openid email profile",
        "access_type": "offline",
        "prompt": "consent",
    }
    # quote_via=quote encodes spaces as %20 (as the original URL did), not "+".
    authorization_url = f"https://accounts.google.com/o/oauth2/v2/auth?{urlencode(params, quote_via=quote)}"
    return {"url": authorization_url}


@app.get("/api/user", response_model=UserResponse)
def get_current_user(authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Get current user details.

    Raises:
        HTTPException: 401 from ``verify_token``; 404 when the token's user no
            longer exists.
    """
    user_id = verify_token(authorization, db)
    user = db.query(db_models.User).filter(db_models.User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return UserResponse.model_validate(user)


# Task List endpoints
@app.get("/lists", response_model=List[TaskListResponse])
@app.get("/api/lists", response_model=List[TaskListResponse])
def get_task_lists(authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Get all task lists for the authenticated user"""
    user_id = verify_token(authorization, db)
    task_lists = db.query(db_models.TaskList).filter(db_models.TaskList.user_id == user_id).all()
    return [TaskListResponse.model_validate(tl) for tl in task_lists]


@app.get("/lists/{list_id}", response_model=TaskListResponse)
@app.get("/api/lists/{list_id}", response_model=TaskListResponse)
def get_task_list(list_id: str, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Get a specific task list by ID (404 if missing or owned by another user)."""
    user_id = verify_token(authorization, db)
    task_list = _get_owned_task_list(db, list_id, user_id)
    return TaskListResponse.model_validate(task_list)


@app.post("/lists", response_model=TaskListResponse, status_code=201)
@app.post("/api/lists", response_model=TaskListResponse, status_code=201)
def create_task_list(task_list: TaskListCreate, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Create a new task list for the authenticated user."""
    user_id = verify_token(authorization, db)

    new_list = db_models.TaskList(
        id=str(uuid.uuid4()),
        user_id=user_id,
        name=task_list.name,
        # Fall back to the defaults when the client sends null/empty values.
        icon=task_list.icon or "📝",
        color=task_list.color or "#667eea",
    )
    db.add(new_list)
    db.commit()
    db.refresh(new_list)

    return TaskListResponse.model_validate(new_list)


@app.put("/lists/{list_id}", response_model=TaskListResponse)
@app.put("/api/lists/{list_id}", response_model=TaskListResponse)
def update_task_list(list_id: str, list_update: TaskListUpdate, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Update an existing task list with the fields present in the request."""
    user_id = verify_token(authorization, db)
    task_list = _get_owned_task_list(db, list_id, user_id)

    # exclude_unset: only fields the client actually sent are written.
    for field, value in list_update.model_dump(exclude_unset=True).items():
        setattr(task_list, field, value)

    db.commit()
    db.refresh(task_list)

    return TaskListResponse.model_validate(task_list)


@app.delete("/lists/{list_id}", status_code=204)
@app.delete("/api/lists/{list_id}", status_code=204)
def delete_task_list(list_id: str, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Delete a task list and all its tasks"""
    user_id = verify_token(authorization, db)
    task_list = _get_owned_task_list(db, list_id, user_id)

    # Per the original author's note, the list's tasks are removed through a
    # cascade on the relationship (defined in the database module, not shown here).
    db.delete(task_list)
    db.commit()

    return None


# Task endpoints
@app.get("/tasks", response_model=List[TaskResponse])
@app.get("/api/tasks", response_model=List[TaskResponse])
def get_tasks(list_id: Optional[str] = None, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Get all tasks for authenticated user, optionally filtered by list_id

    Raises:
        HTTPException: 403 when ``list_id`` is given but does not name a list
            owned by the user.
    """
    user_id = verify_token(authorization, db)

    query = db.query(db_models.Task).filter(db_models.Task.user_id == user_id)

    if list_id:
        # Verify list belongs to user (this endpoint answers 403, not 404).
        task_list = db.query(db_models.TaskList).filter(
            db_models.TaskList.id == list_id,
            db_models.TaskList.user_id == user_id,
        ).first()
        if not task_list:
            raise HTTPException(status_code=403, detail="Access denied")
        query = query.filter(db_models.Task.list_id == list_id)

    return [TaskResponse.model_validate(t) for t in query.all()]


@app.get("/tasks/{task_id}", response_model=TaskResponse)
@app.get("/api/tasks/{task_id}", response_model=TaskResponse)
def get_task(task_id: str, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Get a specific task by ID (404 if missing or owned by another user)."""
    user_id = verify_token(authorization, db)
    task = _get_owned_task(db, task_id, user_id)
    return TaskResponse.model_validate(task)


@app.post("/tasks", response_model=TaskResponse, status_code=201)
@app.post("/api/tasks", response_model=TaskResponse, status_code=201)
def create_task(task: TaskCreate, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Create a new task in one of the authenticated user's lists.

    Raises:
        HTTPException: 404 when ``task.list_id`` is not a list owned by the user.
    """
    user_id = verify_token(authorization, db)

    # Verify list exists and belongs to user
    _get_owned_task_list(db, task.list_id, user_id)

    new_task = db_models.Task(
        id=str(uuid.uuid4()),
        list_id=task.list_id,
        user_id=user_id,
        title=task.title,
        description=task.description,
        completed=False,
        priority=task.priority,
    )
    db.add(new_task)
    db.commit()
    db.refresh(new_task)

    return TaskResponse.model_validate(new_task)


@app.put("/tasks/{task_id}", response_model=TaskResponse)
@app.put("/api/tasks/{task_id}", response_model=TaskResponse)
def update_task(task_id: str, task_update: TaskUpdate, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Update an existing task with the fields present in the request."""
    user_id = verify_token(authorization, db)
    task = _get_owned_task(db, task_id, user_id)

    # exclude_unset: only fields the client actually sent are written.
    for field, value in task_update.model_dump(exclude_unset=True).items():
        setattr(task, field, value)

    # Stamp the modification time explicitly (naive local time, as before).
    task.updated_at = datetime.now()
    db.commit()
    db.refresh(task)

    return TaskResponse.model_validate(task)


@app.delete("/tasks/{task_id}", status_code=204)
@app.delete("/api/tasks/{task_id}", status_code=204)
def delete_task(task_id: str, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Delete a task"""
    user_id = verify_token(authorization, db)
    task = _get_owned_task(db, task_id, user_id)

    db.delete(task)
    db.commit()

    return None


if __name__ == "__main__":
    import uvicorn

    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)