tasko/backend/main.py
dvirlabs d6eeb9a079 feat: Add case-insensitive username login and Google OAuth authentication
- Make username login case-insensitive (Dvir = dvir = DVIR)
- Add Google OAuth login/signup functionality
- Install required dependencies (authlib, httpx, python-dotenv)
- Create OAuth endpoints: /auth/google/url, /auth/google/callback
- Add Google login button to frontend with styled UI
- Configure environment variables for OAuth credentials
- Add comprehensive setup documentation
- Secure .env file with .gitignore
2026-02-20 16:30:27 +02:00

537 lines
18 KiB
Python

from fastapi import FastAPI, HTTPException, Header, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime
from sqlalchemy.orm import Session
from authlib.integrations.starlette_client import OAuth
import uuid
import hashlib
import os
from dotenv import load_dotenv
load_dotenv()
from database import init_db, get_db
import database as db_models
# ASGI application object; every route below is registered on it.
app = FastAPI(version="1.0.0", title="Tasko API")

# Initialize database (runs once, at import time)
init_db()

# OAuth Configuration
# Google is registered as an OpenID Connect provider. Client credentials are
# read from the environment and default to empty strings when unset.
oauth = OAuth()
oauth.register(
    name="google",
    server_metadata_url="https://accounts.google.com/.well-known/openid-configuration",
    client_id=os.getenv("GOOGLE_CLIENT_ID", ""),
    client_secret=os.getenv("GOOGLE_CLIENT_SECRET", ""),
    client_kwargs={"scope": "openid email profile"},
)
# Origins that may call this API from a browser with credentials attached.
allowed_origins = ["http://localhost:5173", "https://tasko.dvirlabs.com"]

# Configure CORS
# The explicit allow-list above is passed instead of "*": a wildcard origin
# combined with allow_credentials=True would let any website make
# credentialed requests to this API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Pydantic Models for API
class UserResponse(BaseModel):
    """Public representation of a user returned by the API.

    The password hash is deliberately not part of this schema.
    """

    id: str
    username: str
    email: str
    created_at: datetime

    class Config:
        # Lets model_validate() read fields from object attributes, which is
        # how the handlers in this file build it from ORM rows.
        from_attributes = True
class UserRegister(BaseModel):
    """Request body for POST /register; all three fields are required."""

    username: str
    email: str
    # Plain-text password as submitted; it is hashed before being stored.
    password: str
class UserLogin(BaseModel):
    """Request body for POST /login.

    The identifier may be either a username or an email address and is sent
    by clients under the camelCase key ``usernameOrEmail``.
    """

    username_or_email: str = Field(..., alias="usernameOrEmail")
    password: str

    class Config:
        # Accept the snake_case field name as well as the alias.
        populate_by_name = True
class AuthResponse(BaseModel):
    """Response for successful register/login: the user plus a bearer token."""

    user: UserResponse
    # Opaque token the client sends back as "Authorization: Bearer <token>".
    token: str
class TaskListResponse(BaseModel):
    """A task list as returned by the list endpoints."""

    id: str
    user_id: str
    name: str
    # Presentation defaults applied when the source object carries no value.
    icon: str = "📝"
    color: str = "#667eea"
    created_at: datetime

    class Config:
        # Lets model_validate() read fields from object attributes (ORM rows).
        from_attributes = True
class TaskListCreate(BaseModel):
    """Request body for creating a task list; only ``name`` is required."""

    name: str
    # Optional presentation fields with the same defaults the response uses.
    icon: Optional[str] = "📝"
    color: Optional[str] = "#667eea"
class TaskListUpdate(BaseModel):
    """Partial-update body for a task list.

    Every field is optional; the update handler applies only the fields the
    client actually sent.
    """

    name: Optional[str] = None
    icon: Optional[str] = None
    color: Optional[str] = None
class TaskResponse(BaseModel):
    """A task as returned by the task endpoints."""

    id: str
    list_id: str
    user_id: str
    title: str
    description: Optional[str] = None
    completed: bool = False
    # Free-form string; no set of allowed values is enforced here.
    priority: str = "medium"
    created_at: datetime
    updated_at: datetime

    class Config:
        # Lets model_validate() read fields from object attributes (ORM rows).
        from_attributes = True
class TaskCreate(BaseModel):
    """Request body for creating a task inside an existing list."""

    title: str
    # ID of the list that will own the task.
    list_id: str
    description: Optional[str] = None
    priority: str = "medium"
class TaskUpdate(BaseModel):
    """Partial-update body for a task.

    Every field is optional; the update handler applies only the fields the
    client actually sent.
    """

    title: Optional[str] = None
    description: Optional[str] = None
    completed: Optional[bool] = None
    priority: Optional[str] = None
def hash_password(password: str) -> str:
    """Return the hex-encoded SHA-256 digest of ``password``.

    The text is encoded with ``str.encode()``'s default codec (UTF-8) before
    hashing, so equal passwords always produce equal digests.

    NOTE(review): this is a single unsalted SHA-256 round. Moving to a salted
    password KDF would also require changing how login compares hashes and
    migrating stored values, so it is left as-is here.
    """
    digest = hashlib.sha256()
    digest.update(password.encode())
    return digest.hexdigest()
def verify_token(authorization: Optional[str], db: Session) -> str:
    """Resolve an ``Authorization`` header value to the owning user's ID.

    Args:
        authorization: Raw header value, expected as ``"Bearer <token>"``.
        db: Database session used to look the token up.

    Returns:
        The ``user_id`` stored on the matching token row.

    Raises:
        HTTPException: 401 "Not authenticated" when the header is missing or
            does not start with ``"Bearer "``; 401 "Invalid token" when no
            stored token matches.
    """
    scheme = "Bearer "
    if not authorization or not authorization.startswith(scheme):
        raise HTTPException(status_code=401, detail="Not authenticated")

    # Drop only the leading scheme. str.replace() would also delete any later
    # "Bearer " occurrence inside the credential and so alter the token.
    token_str = authorization[len(scheme):]

    token = db.query(db_models.Token).filter(db_models.Token.token == token_str).first()
    if not token:
        raise HTTPException(status_code=401, detail="Invalid token")
    return token.user_id
@app.get("/")
def read_root():
    """Return the static welcome payload served at the API root."""
    greeting = "Welcome to Tasko API"
    return {"message": greeting}
# Auth endpoints
@app.post("/register", response_model=AuthResponse)
def register(user_data: UserRegister, db: Session = Depends(get_db)):
    """Register a new user and return it together with a fresh auth token.

    Usernames are unique case-insensitively; emails are compared exactly.
    A new user also receives three default task lists. The user, token and
    lists are written in a single commit.

    Raises:
        HTTPException: 400 "Username already exists" or 400 "Email already
            exists" when the requested identity collides with a stored user.
    """
    # Function-scope import: needed for SQL lower() in the lookup below.
    from sqlalchemy import func

    # Compare lower-cased values with "=" rather than ILIKE, so "%" and "_"
    # in a username are literal characters and not pattern wildcards.
    existing_user = db.query(db_models.User).filter(
        (func.lower(db_models.User.username) == func.lower(user_data.username)) |
        (db_models.User.email == user_data.email)
    ).first()
    if existing_user:
        # Same case-insensitive rule as the query; an exact "==" here would
        # report a case-variant username clash as an email clash.
        if existing_user.username.lower() == user_data.username.lower():
            raise HTTPException(status_code=400, detail="Username already exists")
        raise HTTPException(status_code=400, detail="Email already exists")

    # Create user
    user_id = str(uuid.uuid4())
    new_user = db_models.User(
        id=user_id,
        username=user_data.username,
        email=user_data.email,
        password_hash=hash_password(user_data.password),
    )
    db.add(new_user)

    # Create token
    token_str = str(uuid.uuid4())
    db.add(db_models.Token(token=token_str, user_id=user_id))

    # Create default lists for new user
    default_lists = [
        {"name": "Personal", "icon": "🏠", "color": "#667eea"},
        {"name": "Work", "icon": "💼", "color": "#f093fb"},
        {"name": "Shopping", "icon": "🛒", "color": "#4facfe"},
    ]
    for list_data in default_lists:
        db.add(
            db_models.TaskList(
                id=str(uuid.uuid4()),
                user_id=user_id,
                name=list_data["name"],
                icon=list_data["icon"],
                color=list_data["color"],
            )
        )

    db.commit()
    # Reload so database-generated columns are populated before serializing.
    db.refresh(new_user)
    return AuthResponse(
        user=UserResponse.model_validate(new_user),
        token=token_str,
    )
@app.post("/login", response_model=AuthResponse)
def login(user_data: UserLogin, db: Session = Depends(get_db)):
    """Log a user in by username or email and issue a new auth token.

    The identifier matches a username case-insensitively or an email
    exactly; the hashed password must match the stored hash.

    Raises:
        HTTPException: 401 "Invalid credentials" when no user matches both
            the identifier and the password.
    """
    # Function-scope import: needed for SQL lower() in the lookup below.
    from sqlalchemy import func

    identifier = user_data.username_or_email
    password_hash = hash_password(user_data.password)

    # Match the username with lower()/"=" instead of ILIKE. With ILIKE an
    # identifier such as "%" matches every username, so a single request
    # could test one password against all accounts.
    user = db.query(db_models.User).filter(
        ((db_models.User.email == identifier) |
         (func.lower(db_models.User.username) == func.lower(identifier))),
        db_models.User.password_hash == password_hash,
    ).first()
    if not user:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # Create new token; earlier tokens for this user stay valid.
    token_str = str(uuid.uuid4())
    db.add(db_models.Token(token=token_str, user_id=user.id))
    db.commit()

    return AuthResponse(
        user=UserResponse.model_validate(user),
        token=token_str,
    )
@app.post("/logout")
def logout(authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Log a user out by deleting the presented bearer token.

    The call is best-effort: a missing header, a non-Bearer header or an
    unknown token all still return the success message.
    """
    scheme = "Bearer "
    if authorization and authorization.startswith(scheme):
        # Drop only the leading scheme; str.replace() would also delete any
        # later "Bearer " occurrence inside the credential.
        token_str = authorization[len(scheme):]
        token = db.query(db_models.Token).filter(db_models.Token.token == token_str).first()
        if token:
            db.delete(token)
            db.commit()
    return {"message": "Logged out successfully"}
@app.get("/auth/google")
async def google_login():
    """Initiate Google OAuth login.

    Reads the callback URL from ``GOOGLE_REDIRECT_URI`` (falling back to the
    local development callback) and returns whatever the registered Google
    client's ``authorize_redirect`` produces.
    """
    # NOTE(review): Authlib's Starlette client appears to expect the incoming
    # request as the first argument -- authorize_redirect(request,
    # redirect_uri) -- and to need session middleware for the OAuth state.
    # Only the redirect URI is passed here; confirm this endpoint works.
    redirect_uri = os.getenv('GOOGLE_REDIRECT_URI', 'http://localhost:8000/auth/google/callback')
    return await oauth.google.authorize_redirect(redirect_uri)
@app.get("/auth/google/callback")
async def google_callback(code: str, db: Session = Depends(get_db)):
    """Handle Google OAuth callback.

    Exchanges the callback for Google user info, finds the local user by
    email or creates one (with a unique username and the default task
    lists), issues an auth token, and redirects to the frontend with the
    token and user ID in the query string.

    Args:
        code: Authorization code query parameter; declared so it is required.
        db: Database session.

    Raises:
        HTTPException: 400 when Google returns no user info, no email or no
            account ID, or when any other step of the flow fails.
    """
    # Function-scope import: needed for SQL lower() in the uniqueness check.
    from sqlalchemy import func

    try:
        # Get access token from Google
        # NOTE(review): Authlib's Starlette client appears to expect the
        # incoming request here -- authorize_access_token(request); confirm.
        token = await oauth.google.authorize_access_token()

        # Get user info from Google
        user_info = token.get('userinfo')
        if not user_info:
            raise HTTPException(status_code=400, detail="Failed to get user info from Google")

        email = user_info.get('email')
        google_id = user_info.get('sub')
        # Fail with a clear 400 instead of an AttributeError on None below.
        if not email or not google_id:
            raise HTTPException(status_code=400, detail="Google did not return an email address or account ID")

        # Check if user exists
        user = db.query(db_models.User).filter(db_models.User.email == email).first()

        if not user:
            # Create new user
            user_id = str(uuid.uuid4())

            # Use the email's local part as username; append a counter until
            # it is unique. lower()/"=" is used instead of ILIKE because "_"
            # (common in email addresses) is an ILIKE wildcard.
            original_username = email.split('@')[0]
            username = original_username
            counter = 1
            while db.query(db_models.User).filter(
                func.lower(db_models.User.username) == func.lower(username)
            ).first():
                username = f"{original_username}{counter}"
                counter += 1

            user = db_models.User(
                id=user_id,
                username=username,
                email=email,
                # Hash of random bytes, so no password can match it. Hashing
                # the Google account ID would let anyone who knows that ID
                # log in through the password endpoint.
                password_hash=hashlib.sha256(os.urandom(32)).hexdigest(),
            )
            db.add(user)

            # Create default lists for new user
            default_lists = [
                {"name": "Personal", "icon": "🏠", "color": "#667eea"},
                {"name": "Work", "icon": "💼", "color": "#f093fb"},
                {"name": "Shopping", "icon": "🛒", "color": "#4facfe"},
            ]
            for list_data in default_lists:
                db.add(
                    db_models.TaskList(
                        id=str(uuid.uuid4()),
                        user_id=user_id,
                        name=list_data["name"],
                        icon=list_data["icon"],
                        color=list_data["color"],
                    )
                )

            db.commit()
            db.refresh(user)

        # Create auth token
        token_str = str(uuid.uuid4())
        db.add(db_models.Token(token=token_str, user_id=user.id))
        db.commit()

        # Redirect to frontend with token
        frontend_url = os.getenv('FRONTEND_URL', 'http://localhost:5173')
        return RedirectResponse(url=f"{frontend_url}?token={token_str}&user={user.id}")

    except HTTPException:
        # Deliberate HTTP errors raised above pass through unchanged instead
        # of being re-wrapped by the generic handler below.
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Authentication failed: {str(e)}") from e
@app.get("/auth/google/url")
def get_google_auth_url():
    """Get Google OAuth URL for frontend.

    Builds the Google authorization URL from ``GOOGLE_CLIENT_ID`` and
    ``GOOGLE_REDIRECT_URI`` (falling back to the local development callback)
    and returns it as ``{"url": ...}``.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote, urlencode

    redirect_uri = os.getenv('GOOGLE_REDIRECT_URI', 'http://localhost:8000/auth/google/callback')

    # Percent-encode every value: the redirect URI contains ":" and may
    # contain "?" or "&", which would corrupt a hand-built query string.
    # quote_via=quote keeps spaces as %20, matching the previous scope form.
    query = urlencode(
        {
            "client_id": os.getenv('GOOGLE_CLIENT_ID', ''),
            "redirect_uri": redirect_uri,
            "response_type": "code",
            "scope": "openid email profile",
            "access_type": "offline",
            "prompt": "consent",
        },
        quote_via=quote,
    )
    authorization_url = f"https://accounts.google.com/o/oauth2/v2/auth?{query}"
    return {"url": authorization_url}
@app.get("/api/user", response_model=UserResponse)
def get_current_user(authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Return the profile of the user who owns the presented bearer token.

    Raises:
        HTTPException: 401 from token verification; 404 "User not found"
            when the token points at a user that no longer exists.
    """
    current_user_id = verify_token(authorization, db)
    account = (
        db.query(db_models.User)
        .filter(db_models.User.id == current_user_id)
        .first()
    )
    if account:
        return UserResponse.model_validate(account)
    raise HTTPException(status_code=404, detail="User not found")
# Task List endpoints
@app.get("/lists", response_model=List[TaskListResponse])
@app.get("/api/lists", response_model=List[TaskListResponse])
def get_task_lists(authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Return every task list owned by the authenticated user."""
    owner_id = verify_token(authorization, db)
    owned_lists = (
        db.query(db_models.TaskList)
        .filter(db_models.TaskList.user_id == owner_id)
        .all()
    )
    return [TaskListResponse.model_validate(row) for row in owned_lists]
@app.get("/lists/{list_id}", response_model=TaskListResponse)
@app.get("/api/lists/{list_id}", response_model=TaskListResponse)
def get_task_list(list_id: str, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Return one task list by ID, provided the caller owns it.

    Raises:
        HTTPException: 404 "Task list not found" when no list with this ID
            belongs to the authenticated user.
    """
    owner_id = verify_token(authorization, db)
    TaskList = db_models.TaskList
    found = (
        db.query(TaskList)
        .filter(TaskList.id == list_id, TaskList.user_id == owner_id)
        .first()
    )
    if found:
        return TaskListResponse.model_validate(found)
    raise HTTPException(status_code=404, detail="Task list not found")
@app.post("/lists", response_model=TaskListResponse, status_code=201)
@app.post("/api/lists", response_model=TaskListResponse, status_code=201)
def create_task_list(task_list: TaskListCreate, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Create a task list for the authenticated user and return it.

    A missing or empty icon/color falls back to the default icon and color.
    """
    owner_id = verify_token(authorization, db)

    # Falsy values (None or "") are replaced by the defaults.
    icon = task_list.icon if task_list.icon else "📝"
    color = task_list.color if task_list.color else "#667eea"

    created = db_models.TaskList(
        id=str(uuid.uuid4()),
        user_id=owner_id,
        name=task_list.name,
        icon=icon,
        color=color,
    )
    db.add(created)
    db.commit()
    db.refresh(created)
    return TaskListResponse.model_validate(created)
@app.put("/lists/{list_id}", response_model=TaskListResponse)
@app.put("/api/lists/{list_id}", response_model=TaskListResponse)
def update_task_list(list_id: str, list_update: TaskListUpdate, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Apply a partial update to one of the caller's task lists.

    Only the fields present in the request body are written.

    Raises:
        HTTPException: 404 "Task list not found" when no list with this ID
            belongs to the authenticated user.
    """
    owner_id = verify_token(authorization, db)
    TaskList = db_models.TaskList
    target = (
        db.query(TaskList)
        .filter(TaskList.id == list_id, TaskList.user_id == owner_id)
        .first()
    )
    if not target:
        raise HTTPException(status_code=404, detail="Task list not found")

    # exclude_unset keeps fields the client did not send out of the update.
    changes = list_update.model_dump(exclude_unset=True)
    for attr_name, new_value in changes.items():
        setattr(target, attr_name, new_value)

    db.commit()
    db.refresh(target)
    return TaskListResponse.model_validate(target)
@app.delete("/lists/{list_id}", status_code=204)
@app.delete("/api/lists/{list_id}", status_code=204)
def delete_task_list(list_id: str, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Delete one of the caller's task lists.

    Raises:
        HTTPException: 404 "Task list not found" when no list with this ID
            belongs to the authenticated user.
    """
    owner_id = verify_token(authorization, db)
    TaskList = db_models.TaskList
    doomed = (
        db.query(TaskList)
        .filter(TaskList.id == list_id, TaskList.user_id == owner_id)
        .first()
    )
    if not doomed:
        raise HTTPException(status_code=404, detail="Task list not found")

    # The list's tasks are expected to be removed by the ORM cascade
    # relationship, which is configured outside this file.
    db.delete(doomed)
    db.commit()
    return None
# Task endpoints
@app.get("/tasks", response_model=List[TaskResponse])
@app.get("/api/tasks", response_model=List[TaskResponse])
def get_tasks(list_id: Optional[str] = None, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Return the authenticated user's tasks, optionally limited to one list.

    Raises:
        HTTPException: 403 "Access denied" when ``list_id`` is given but no
            such list belongs to the caller.
    """
    owner_id = verify_token(authorization, db)
    Task = db_models.Task
    TaskList = db_models.TaskList

    task_query = db.query(Task).filter(Task.user_id == owner_id)

    if list_id:
        # Confirm the list is the caller's before filtering by it.
        owned_list = (
            db.query(TaskList)
            .filter(TaskList.id == list_id, TaskList.user_id == owner_id)
            .first()
        )
        if not owned_list:
            raise HTTPException(status_code=403, detail="Access denied")
        task_query = task_query.filter(Task.list_id == list_id)

    return [TaskResponse.model_validate(row) for row in task_query.all()]
@app.get("/tasks/{task_id}", response_model=TaskResponse)
@app.get("/api/tasks/{task_id}", response_model=TaskResponse)
def get_task(task_id: str, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Return one task by ID, provided the caller owns it.

    Raises:
        HTTPException: 404 "Task not found" when no task with this ID
            belongs to the authenticated user.
    """
    owner_id = verify_token(authorization, db)
    Task = db_models.Task
    found = (
        db.query(Task)
        .filter(Task.id == task_id, Task.user_id == owner_id)
        .first()
    )
    if found:
        return TaskResponse.model_validate(found)
    raise HTTPException(status_code=404, detail="Task not found")
@app.post("/tasks", response_model=TaskResponse, status_code=201)
@app.post("/api/tasks", response_model=TaskResponse, status_code=201)
def create_task(task: TaskCreate, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Create a task in one of the caller's lists and return it.

    New tasks always start as not completed.

    Raises:
        HTTPException: 404 "Task list not found" when the target list does
            not exist or belongs to another user.
    """
    owner_id = verify_token(authorization, db)

    # The target list must exist and be owned by the caller.
    TaskList = db_models.TaskList
    parent_list = (
        db.query(TaskList)
        .filter(TaskList.id == task.list_id, TaskList.user_id == owner_id)
        .first()
    )
    if not parent_list:
        raise HTTPException(status_code=404, detail="Task list not found")

    created = db_models.Task(
        id=str(uuid.uuid4()),
        list_id=task.list_id,
        user_id=owner_id,
        title=task.title,
        description=task.description,
        completed=False,
        priority=task.priority,
    )
    db.add(created)
    db.commit()
    db.refresh(created)
    return TaskResponse.model_validate(created)
@app.put("/tasks/{task_id}", response_model=TaskResponse)
@app.put("/api/tasks/{task_id}", response_model=TaskResponse)
def update_task(task_id: str, task_update: TaskUpdate, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Apply a partial update to one of the caller's tasks.

    Only the fields present in the request body are written, and
    ``updated_at`` is set to the current local time on every call.

    Raises:
        HTTPException: 404 "Task not found" when no task with this ID
            belongs to the authenticated user.
    """
    owner_id = verify_token(authorization, db)
    Task = db_models.Task
    target = (
        db.query(Task)
        .filter(Task.id == task_id, Task.user_id == owner_id)
        .first()
    )
    if not target:
        raise HTTPException(status_code=404, detail="Task not found")

    # exclude_unset keeps fields the client did not send out of the update.
    changes = task_update.model_dump(exclude_unset=True)
    for attr_name, new_value in changes.items():
        setattr(target, attr_name, new_value)
    target.updated_at = datetime.now()

    db.commit()
    db.refresh(target)
    return TaskResponse.model_validate(target)
@app.delete("/tasks/{task_id}", status_code=204)
@app.delete("/api/tasks/{task_id}", status_code=204)
def delete_task(task_id: str, authorization: Optional[str] = Header(None), db: Session = Depends(get_db)):
    """Delete one of the caller's tasks.

    Raises:
        HTTPException: 404 "Task not found" when no task with this ID
            belongs to the authenticated user.
    """
    owner_id = verify_token(authorization, db)
    Task = db_models.Task
    doomed = (
        db.query(Task)
        .filter(Task.id == task_id, Task.user_id == owner_id)
        .first()
    )
    if not doomed:
        raise HTTPException(status_code=404, detail="Task not found")

    db.delete(doomed)
    db.commit()
    return None
if __name__ == "__main__":
    # Development entry point: imported here so uvicorn is only needed when
    # the module is run as a script. Binds all interfaces on port 8000 with
    # auto-reload enabled.
    import uvicorn

    uvicorn.run(
        "main:app",
        port=8000,
        host="0.0.0.0",
        reload=True,
    )