2026-02-04 16:50:33 +02:00

571 lines
20 KiB
Python

from typing import List, Dict, Any
import os
import secrets
from pathlib import Path
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File
from sqlalchemy.orm import Session
from sqlalchemy import func
from app.core.config import settings
from app.deps import get_db, require_admin, get_current_user
from app.models import Course, Module, QuizQuestion, QuizChoice, CourseGroup, Group, GroupMembership, User, UserRole, Upload, Enrollment, ModuleProgress, QuizAttempt, ProgressStatus
from app.schemas import (
AdminCourseCreate,
AdminCourseOut,
AdminCourseUpdate,
AdminUserOut,
AdminUserUpdate,
GroupCreate,
GroupOut,
GroupUpdate,
GroupMemberAdd,
ModuleCreate,
ModuleOut,
ModuleUpdate,
QuizQuestionCreate,
QuizQuestionOut,
QuizQuestionUpdate,
UploadOut,
)
# Admin API router: every route registered on it is served under the "/admin"
# prefix, tagged "admin", and declares `require_admin` as a router-level dependency.
router = APIRouter(prefix="/admin", tags=["admin"], dependencies=[Depends(require_admin)])
@router.get("/courses", response_model=List[AdminCourseOut])
def list_courses(db: Session = Depends(get_db)):
    """Return every course, most recently created first.

    Each entry carries the course columns plus the ids of the groups linked
    to it through ``course.course_groups``.
    """
    newest_first = db.query(Course).order_by(Course.created_at.desc())
    return [
        AdminCourseOut(
            id=row.id,
            title=row.title,
            description=row.description,
            is_published=row.is_published,
            created_at=row.created_at,
            updated_at=row.updated_at,
            group_ids=[link.group_id for link in row.course_groups],
        )
        for row in newest_first.all()
    ]
@router.post("/courses", response_model=AdminCourseOut)
def create_course(payload: AdminCourseCreate, db: Session = Depends(get_db)):
    """Create a course and link it to the requested groups in one transaction.

    Args:
        payload: Course fields plus an optional ``group_ids`` list.
        db: Request-scoped database session.

    Returns:
        The created course, including the ids of its linked groups.
    """
    course = Course(**payload.model_dump(exclude={"group_ids"}))
    db.add(course)
    # Flush (not commit) so the course gets its primary key while staying in
    # the same transaction as the group links: if a link cannot be inserted,
    # this function has committed nothing, instead of leaving a course behind
    # without its groups.
    db.flush()
    # dict.fromkeys drops repeated ids while keeping the submitted order.
    for gid in dict.fromkeys(payload.group_ids or []):
        db.add(CourseGroup(course_id=course.id, group_id=gid))
    db.commit()
    db.refresh(course)
    return AdminCourseOut(
        id=course.id,
        title=course.title,
        description=course.description,
        is_published=course.is_published,
        created_at=course.created_at,
        updated_at=course.updated_at,
        group_ids=[cg.group_id for cg in course.course_groups],
    )
@router.put("/courses/{course_id}", response_model=AdminCourseOut)
def update_course(course_id: int, payload: AdminCourseUpdate, db: Session = Depends(get_db)):
    """Update the supplied course fields and, optionally, its group links.

    Only fields explicitly present in the request are applied. When
    ``group_ids`` is present and not null, the course's links are reconciled
    with that list; when it is absent or null the links are left untouched.

    Raises:
        HTTPException: 404 when the course does not exist.
    """
    course = db.get(Course, course_id)
    if not course:
        raise HTTPException(status_code=404, detail="Course not found")
    data = payload.model_dump(exclude_unset=True)
    group_ids = data.pop("group_ids", None)
    for key, value in data.items():
        setattr(course, key, value)
    if group_ids is not None:
        # Reconcile instead of clear-and-recreate: links that stay are left
        # alone, so unchanged associations are never deleted and re-inserted
        # within the same flush.
        missing = set(group_ids)
        for link in list(course.course_groups):  # copy: we mutate the collection
            if link.group_id in missing:
                missing.discard(link.group_id)  # already linked, keep it
            else:
                course.course_groups.remove(link)
        for gid in group_ids:
            if gid in missing:
                course.course_groups.append(CourseGroup(group_id=gid))
                missing.discard(gid)  # guards against repeated ids in the payload
    db.commit()
    db.refresh(course)
    return AdminCourseOut(
        id=course.id,
        title=course.title,
        description=course.description,
        is_published=course.is_published,
        created_at=course.created_at,
        updated_at=course.updated_at,
        group_ids=[cg.group_id for cg in course.course_groups],
    )
@router.delete("/courses/{course_id}")
def delete_course(course_id: int, db: Session = Depends(get_db)):
    """Delete the course with the given id; respond 404 when it is not found."""
    doomed = db.get(Course, course_id)
    if doomed:
        db.delete(doomed)
        db.commit()
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Course not found")
@router.patch("/courses/{course_id}/publish", response_model=AdminCourseOut)
def toggle_publish(course_id: int, db: Session = Depends(get_db)):
    """Flip the ``is_published`` flag of a course and return the updated course.

    Raises:
        HTTPException: 404 when the course does not exist.
    """
    target = db.get(Course, course_id)
    if not target:
        raise HTTPException(status_code=404, detail="Course not found")

    target.is_published = False if target.is_published else True
    db.commit()
    db.refresh(target)

    linked_group_ids = [link.group_id for link in target.course_groups]
    return AdminCourseOut(
        id=target.id,
        title=target.title,
        description=target.description,
        is_published=target.is_published,
        created_at=target.created_at,
        updated_at=target.updated_at,
        group_ids=linked_group_ids,
    )
@router.get("/courses/{course_id}/modules", response_model=List[ModuleOut])
def list_modules(course_id: int, db: Session = Depends(get_db)):
    """Return the modules of a course sorted by ``order_index``."""
    in_course = Module.course_id == course_id
    ordered = db.query(Module).filter(in_course).order_by(Module.order_index)
    return ordered.all()
@router.post("/courses/{course_id}/modules", response_model=ModuleOut)
def create_module(course_id: int, payload: ModuleCreate, db: Session = Depends(get_db)):
    """Add a new module to an existing course.

    Raises:
        HTTPException: 404 when the course does not exist.
    """
    if not db.get(Course, course_id):
        raise HTTPException(status_code=404, detail="Course not found")

    fields = payload.model_dump()
    new_module = Module(course_id=course_id, **fields)
    db.add(new_module)
    db.commit()
    db.refresh(new_module)
    return new_module
@router.put("/modules/{module_id}", response_model=ModuleOut)
def update_module(module_id: int, payload: ModuleUpdate, db: Session = Depends(get_db)):
    """Apply the fields explicitly set in the request to a module.

    Raises:
        HTTPException: 404 when the module does not exist.
    """
    target = db.get(Module, module_id)
    if not target:
        raise HTTPException(status_code=404, detail="Module not found")

    changes = payload.model_dump(exclude_unset=True)
    for field_name in changes:
        setattr(target, field_name, changes[field_name])

    db.commit()
    db.refresh(target)
    return target
@router.get("/modules/{module_id}", response_model=ModuleOut)
def get_module(module_id: int, db: Session = Depends(get_db)):
    """Fetch a single module by id; respond 404 when it is not found."""
    found = db.get(Module, module_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Module not found")
@router.get("/users", response_model=List[AdminUserOut])
def list_users(q: str | None = None, db: Session = Depends(get_db)):
    """List users, newest first, optionally filtered by an email substring.

    Args:
        q: Optional case-insensitive substring to look for in the email.
            It is matched literally: ``%``, ``_`` and ``\\`` are escaped so
            they are not interpreted as LIKE wildcards.
        db: Request-scoped database session.
    """
    query = db.query(User)
    if q:
        # Escape the escape character first, then the LIKE wildcards.
        literal = q.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        query = query.filter(User.email.ilike(f"%{literal}%", escape="\\"))
    return query.order_by(User.created_at.desc()).all()
@router.get("/users/{user_id}", response_model=AdminUserOut)
def get_user(user_id: int, db: Session = Depends(get_db)):
    """Fetch a single user by id; respond 404 when it is not found."""
    found = db.get(User, user_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="User not found")
@router.patch("/users/{user_id}", response_model=AdminUserOut)
def update_user(user_id: int, payload: AdminUserUpdate, db: Session = Depends(get_db)):
    """Change a user's role and/or active flag.

    Only fields explicitly present in the request are applied. A null role is
    treated as "no change". A role other than admin or learner is rejected
    rather than silently ignored, so a successful response always means the
    requested change was applied.

    Raises:
        HTTPException: 404 when the user does not exist; 400 when the
            requested role is not admin or learner.
    """
    user = db.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    data = payload.model_dump(exclude_unset=True)
    requested_role = data.get("role")
    if requested_role is not None:
        try:
            # Accepts both enum members and their raw values.
            new_role = UserRole(requested_role)
        except ValueError:
            new_role = None
        if new_role not in (UserRole.admin, UserRole.learner):
            raise HTTPException(status_code=400, detail="Invalid role")
        user.role = new_role
    if "is_active" in data:
        user.is_active = data["is_active"]
    db.commit()
    db.refresh(user)
    return user
@router.get("/groups", response_model=List[GroupOut])
def list_groups(db: Session = Depends(get_db)):
    """Return all groups, most recently created first."""
    newest_first = Group.created_at.desc()
    all_groups = db.query(Group).order_by(newest_first)
    return all_groups.all()
@router.post("/groups", response_model=GroupOut)
def create_group(payload: GroupCreate, db: Session = Depends(get_db)):
    """Create a group with the requested name.

    Raises:
        HTTPException: 400 when a group with that name already exists.
    """
    same_name = Group.name == payload.name
    if db.query(Group).filter(same_name).first():
        raise HTTPException(status_code=400, detail="Group name already exists")

    new_group = Group(name=payload.name)
    db.add(new_group)
    db.commit()
    db.refresh(new_group)
    return new_group
@router.put("/groups/{group_id}", response_model=GroupOut)
def update_group(group_id: int, payload: GroupUpdate, db: Session = Depends(get_db)):
    """Apply the fields explicitly set in the request to a group.

    A rename is checked against existing group names, with the same 400
    response that group creation gives for a duplicate name.

    Raises:
        HTTPException: 404 when the group does not exist; 400 when the new
            name is already used by another group.
    """
    group = db.get(Group, group_id)
    if not group:
        raise HTTPException(status_code=404, detail="Group not found")
    data = payload.model_dump(exclude_unset=True)
    new_name = data.get("name")
    if new_name is not None and new_name != group.name:
        clash = db.query(Group).filter(Group.name == new_name).first()
        # `is not group` keeps a case-insensitive match on this same row
        # from being reported as a conflict.
        if clash is not None and clash is not group:
            raise HTTPException(status_code=400, detail="Group name already exists")
    for key, value in data.items():
        setattr(group, key, value)
    db.commit()
    db.refresh(group)
    return group
@router.delete("/groups/{group_id}")
def delete_group(group_id: int, db: Session = Depends(get_db)):
    """Delete the group with the given id; respond 404 when it is not found."""
    doomed = db.get(Group, group_id)
    if doomed:
        db.delete(doomed)
        db.commit()
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Group not found")
@router.get("/groups/{group_id}/members", response_model=List[AdminUserOut])
def list_group_members(group_id: int, db: Session = Depends(get_db)):
    """Return the users that have a membership row for the given group.

    Raises:
        HTTPException: 404 when the group does not exist.
    """
    if not db.get(Group, group_id):
        raise HTTPException(status_code=404, detail="Group not found")

    users_with_membership = db.query(User).join(
        GroupMembership, GroupMembership.user_id == User.id
    )
    in_this_group = GroupMembership.group_id == group_id
    return users_with_membership.filter(in_this_group).all()
@router.post("/groups/{group_id}/members")
def add_group_member(group_id: int, payload: GroupMemberAdd, db: Session = Depends(get_db)):
    """Add a user to a group, identified by ``user_id`` or, failing that, ``email``.

    The id lookup is tried first; the email lookup is used only when the id
    is missing or matches nobody. Adding an existing member is a no-op that
    still reports success.

    Raises:
        HTTPException: 404 when the group or the user cannot be found.
    """
    if not db.get(Group, group_id):
        raise HTTPException(status_code=404, detail="Group not found")

    member = db.get(User, payload.user_id) if payload.user_id else None
    if not member and payload.email:
        member = db.query(User).filter(User.email == payload.email).first()
    if not member:
        raise HTTPException(status_code=404, detail="User not found")

    same_pair = (
        GroupMembership.group_id == group_id,
        GroupMembership.user_id == member.id,
    )
    already_member = db.query(GroupMembership).filter(*same_pair).first()
    if not already_member:
        db.add(GroupMembership(group_id=group_id, user_id=member.id))
        db.commit()
    return {"ok": True}
@router.delete("/groups/{group_id}/members/{user_id}")
def remove_group_member(group_id: int, user_id: int, db: Session = Depends(get_db)):
    """Remove a user's membership in a group; respond 404 when there is none."""
    same_pair = (
        GroupMembership.group_id == group_id,
        GroupMembership.user_id == user_id,
    )
    link = db.query(GroupMembership).filter(*same_pair).first()
    if link:
        db.delete(link)
        db.commit()
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Membership not found")
@router.delete("/modules/{module_id}")
def delete_module(module_id: int, db: Session = Depends(get_db)):
    """Delete the module with the given id; respond 404 when it is not found."""
    doomed = db.get(Module, module_id)
    if doomed:
        db.delete(doomed)
        db.commit()
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Module not found")
@router.get("/modules/{module_id}/questions", response_model=List[QuizQuestionOut])
def list_questions(module_id: int, db: Session = Depends(get_db)):
    """Return the quiz questions attached to a module (no explicit ordering)."""
    belongs_to_module = QuizQuestion.module_id == module_id
    matching = db.query(QuizQuestion).filter(belongs_to_module)
    return matching.all()
@router.post("/modules/{module_id}/questions", response_model=QuizQuestionOut)
def create_question(module_id: int, payload: QuizQuestionCreate, db: Session = Depends(get_db)):
    """Create a quiz question and its answer choices for a module.

    Raises:
        HTTPException: 404 when the module does not exist.
    """
    if not db.get(Module, module_id):
        raise HTTPException(status_code=404, detail="Module not found")

    new_question = QuizQuestion(
        module_id=module_id,
        prompt=payload.prompt,
        question_type=payload.question_type,
    )
    db.add(new_question)
    # Flush so the question id is available for the choices below.
    db.flush()

    for option in payload.choices:
        answer = QuizChoice(
            question_id=new_question.id,
            text=option.text,
            is_correct=option.is_correct,
        )
        db.add(answer)

    db.commit()
    db.refresh(new_question)
    return new_question
@router.put("/questions/{question_id}", response_model=QuizQuestionOut)
def update_question(question_id: int, payload: QuizQuestionUpdate, db: Session = Depends(get_db)):
    """Update a quiz question's prompt, type and/or answer choices.

    Only fields explicitly present in the request are applied. When
    ``choices`` is present, the existing choices are replaced by the
    submitted ones (a null or empty list removes them all).

    Raises:
        HTTPException: 404 when the question does not exist.
    """
    question = db.get(QuizQuestion, question_id)
    if not question:
        raise HTTPException(status_code=404, detail="Question not found")
    data = payload.model_dump(exclude_unset=True)
    if "prompt" in data:
        question.prompt = data["prompt"]
    if "question_type" in data:
        question.question_type = data["question_type"]
    if "choices" in data:
        question.choices.clear()
        # Iterate the payload's own objects, not data["choices"]: model_dump()
        # converts nested models to plain dicts, on which `.text` and
        # `.is_correct` attribute access would raise AttributeError.
        for choice in payload.choices or []:
            question.choices.append(QuizChoice(text=choice.text, is_correct=choice.is_correct))
    db.commit()
    db.refresh(question)
    return question
@router.delete("/questions/{question_id}")
def delete_question(question_id: int, db: Session = Depends(get_db)):
    """Delete the quiz question with the given id; respond 404 when it is not found."""
    doomed = db.get(QuizQuestion, question_id)
    if doomed:
        db.delete(doomed)
        db.commit()
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Question not found")
# Accepted upload content types, mapped to the extension given to the stored file.
_UPLOAD_EXTENSIONS = {
    "application/pdf": ".pdf",
    "application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx",
    "application/vnd.ms-powerpoint": ".ppt",
}
# Size of each read while copying the upload to disk.
_UPLOAD_CHUNK_BYTES = 1024 * 1024


@router.post("/uploads", response_model=UploadOut)
async def upload_file(file: UploadFile = File(...), db: Session = Depends(get_db), user: User = Depends(get_current_user)):
    """Store an uploaded PDF or PowerPoint file and record its metadata.

    The file is saved under ``settings.upload_dir`` with a random name. The
    stored extension is derived from the accepted content type rather than
    from the client-supplied filename, so the client cannot choose it.

    Args:
        file: The uploaded file.
        db: Request-scoped database session.
        user: The authenticated user, recorded as the uploader.

    Returns:
        The persisted ``Upload`` row.

    Raises:
        HTTPException: 400 when the content type is not an accepted one.
    """
    file_ext = _UPLOAD_EXTENSIONS.get(file.content_type)
    if file_ext is None:
        raise HTTPException(status_code=400, detail="Only PDF and PPTX files are allowed")

    upload_dir = Path(settings.upload_dir)
    upload_dir.mkdir(parents=True, exist_ok=True)

    unique_name = f"{secrets.token_urlsafe(16)}{file_ext}"
    file_path = upload_dir / unique_name

    size_bytes = 0
    try:
        # Copy in chunks so the whole upload is never held in memory at once.
        with open(file_path, "wb") as out:
            while chunk := await file.read(_UPLOAD_CHUNK_BYTES):
                out.write(chunk)
                size_bytes += len(chunk)

        upload = Upload(
            # The client may omit the filename; fall back to the stored name.
            filename=file.filename or unique_name,
            content_type=file.content_type,
            file_path=str(file_path),
            size_bytes=size_bytes,
            uploaded_by=user.id,
        )
        db.add(upload)
        db.commit()
    except Exception:
        # Do not leave a file on disk that no database row refers to.
        db.rollback()
        file_path.unlink(missing_ok=True)
        raise

    db.refresh(upload)
    return upload
@router.get("/uploads/{upload_id}", response_model=UploadOut)
def get_upload(upload_id: int, db: Session = Depends(get_db)):
    """Return the metadata of a stored upload.

    The response is serialized through ``UploadOut``, the same schema the
    upload endpoint returns, instead of encoding the raw ORM object.

    Raises:
        HTTPException: 404 when the upload does not exist.
    """
    upload = db.get(Upload, upload_id)
    if not upload:
        raise HTTPException(status_code=404, detail="Upload not found")
    return upload
@router.get("/stats/users")
def stats_users(db: Session = Depends(get_db)) -> List[Dict[str, Any]]:
    """Get statistics for all users with the learner role.

    For each learner: number of enrollments, number of distinct completed
    modules (only modules that still exist are counted, via the join on
    ``Module``), and number of quiz attempts.

    The counts come from three grouped queries in total rather than three
    queries per learner.
    """
    enrollments_by_user = {
        uid: total
        for uid, total in db.query(Enrollment.user_id, func.count(Enrollment.id))
        .group_by(Enrollment.user_id)
        .all()
    }
    completed_by_user = {
        uid: total
        for uid, total in db.query(
            ModuleProgress.user_id,
            func.count(func.distinct(ModuleProgress.module_id)),
        )
        .join(Module, Module.id == ModuleProgress.module_id)
        .filter(ModuleProgress.status == ProgressStatus.completed)
        .group_by(ModuleProgress.user_id)
        .all()
    }
    attempts_by_user = {
        uid: total
        for uid, total in db.query(QuizAttempt.user_id, func.count(QuizAttempt.id))
        .group_by(QuizAttempt.user_id)
        .all()
    }

    learners = db.query(User).filter(User.role == UserRole.learner).all()
    # Users absent from a grouped result have no matching rows, hence 0.
    return [
        {
            "id": user.id,
            "email": user.email,
            "enrollments_count": enrollments_by_user.get(user.id, 0),
            "completed_modules_count": completed_by_user.get(user.id, 0),
            "attempts_count": attempts_by_user.get(user.id, 0),
            "created_at": user.created_at,
        }
        for user in learners
    ]
@router.get("/stats/users/{user_id}")
def stats_user(user_id: int, db: Session = Depends(get_db)) -> Dict[str, Any]:
    """Get detailed statistics for a specific user.

    For every course the user is enrolled in, reports module completion
    (count and percentage) and the user's attempts on the course's quiz
    modules, newest attempt first within each module.

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    user = db.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # The user's progress does not depend on the course being processed, so
    # it is loaded once here rather than once per enrollment.
    status_by_module = {
        progress.module_id: progress.status
        for progress in db.query(ModuleProgress).filter(ModuleProgress.user_id == user_id).all()
    }

    enrollments = db.query(Enrollment).filter(Enrollment.user_id == user_id).all()
    courses_data = []
    for enrollment in enrollments:
        course = db.get(Course, enrollment.course_id)
        if not course:
            continue
        modules = db.query(Module).filter(Module.course_id == course.id).order_by(Module.order_index).all()
        total_modules = len(modules)
        completed_modules = sum(
            1 for m in modules if status_by_module.get(m.id) == ProgressStatus.completed
        )
        progress_percent = (completed_modules / total_modules * 100) if total_modules > 0 else 0

        # Get quiz attempts for this course
        attempts_data = []
        for module in modules:
            if module.type.value != "quiz":
                continue
            attempts = (
                db.query(QuizAttempt)
                .filter(QuizAttempt.user_id == user_id, QuizAttempt.module_id == module.id)
                .order_by(QuizAttempt.submitted_at.desc())
                .all()
            )
            attempts_data.extend(
                {
                    "module_id": module.id,
                    "module_title": module.title,
                    "score": attempt.score,
                    "passed": attempt.passed,
                    "submitted_at": attempt.submitted_at,
                }
                for attempt in attempts
            )

        courses_data.append({
            "course_id": course.id,
            "course_title": course.title,
            "enrolled_at": enrollment.enrolled_at,
            "progress_percent": progress_percent,
            "completed_modules": completed_modules,
            "total_modules": total_modules,
            "attempts": attempts_data,
        })
    return {
        "user_id": user_id,
        "email": user.email,
        "courses": courses_data,
    }
@router.get("/stats/courses")
def stats_courses(db: Session = Depends(get_db)) -> List[Dict[str, Any]]:
    """Get statistics for all courses.

    For each course: number of enrollments, number of enrollments whose user
    has completed every module, and the average score on the final exam (the
    last module by ``order_index``, when that module is a quiz).
    """
    result = []
    for course in db.query(Course).all():
        # Ordered so that modules[-1] really is the last module of the course.
        modules = (
            db.query(Module)
            .filter(Module.course_id == course.id)
            .order_by(Module.order_index)
            .all()
        )
        total_modules = len(modules)

        # One entry per enrollment row.
        enrolled_user_ids = [
            uid
            for (uid,) in db.query(Enrollment.user_id)
            .filter(Enrollment.course_id == course.id)
            .all()
        ]

        # Count users who completed all modules
        completions_count = 0
        if total_modules > 0 and enrolled_user_ids:
            # A single grouped query per course. Distinct module ids are
            # counted so duplicate progress rows for one module cannot make
            # a partially finished course look complete.
            completed_by_user = {
                uid: total
                for uid, total in db.query(
                    ModuleProgress.user_id,
                    func.count(func.distinct(ModuleProgress.module_id)),
                )
                .join(Module, Module.id == ModuleProgress.module_id)
                .filter(
                    Module.course_id == course.id,
                    ModuleProgress.status == ProgressStatus.completed,
                )
                .group_by(ModuleProgress.user_id)
                .all()
            }
            completions_count = sum(
                1 for uid in enrolled_user_ids if completed_by_user.get(uid, 0) >= total_modules
            )

        # Get final exam average (last module if it's a quiz)
        final_exam_avg = None
        if modules and modules[-1].type.value == "quiz":
            avg_score = (
                db.query(func.avg(QuizAttempt.score))
                .filter(QuizAttempt.module_id == modules[-1].id)
                .scalar()
            )
            # `is not None`, not truthiness: an average of 0 is a real value.
            if avg_score is not None:
                final_exam_avg = float(avg_score)

        result.append({
            "id": course.id,
            "title": course.title,
            "enrollments_count": len(enrolled_user_ids),
            "completions_count": completions_count,
            "final_exam_avg_score": final_exam_avg,
        })
    return result
@router.get("/stats/courses/{course_id}")
def stats_course(course_id: int, db: Session = Depends(get_db)) -> Dict[str, Any]:
    """Get detailed statistics for a specific course.

    Reports the enrollment count, the module count and, for each quiz
    module, the number of attempts, the average score and the pass rate in
    percent (both 0 when the module has no attempts).

    Raises:
        HTTPException: 404 when the course does not exist.
    """
    course = db.get(Course, course_id)
    if not course:
        raise HTTPException(status_code=404, detail="Course not found")

    # Count in the database instead of loading every enrollment row.
    enrollments_count = (
        db.query(func.count(Enrollment.id))
        .filter(Enrollment.course_id == course_id)
        .scalar()
    )
    modules = db.query(Module).filter(Module.course_id == course_id).order_by(Module.order_index).all()

    attempts_distribution = []
    for module in modules:
        if module.type.value != "quiz":
            continue
        attempts = db.query(QuizAttempt).filter(QuizAttempt.module_id == module.id).all()
        total_attempts = len(attempts)
        if total_attempts:
            avg_score = sum(a.score for a in attempts) / total_attempts
            pass_rate = sum(1 for a in attempts if a.passed) / total_attempts * 100
        else:
            avg_score = 0
            pass_rate = 0
        attempts_distribution.append({
            "module_id": module.id,
            "module_title": module.title,
            "total_attempts": total_attempts,
            "avg_score": avg_score,
            "pass_rate": pass_rate,
        })

    return {
        "course_id": course.id,
        "title": course.title,
        "enrollments_count": enrollments_count,
        "total_modules": len(modules),
        "attempts_distribution": attempts_distribution,
    }