571 lines
20 KiB
Python
571 lines
20 KiB
Python
from typing import List, Dict, Any
|
|
import os
|
|
import secrets
|
|
from pathlib import Path
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import func
|
|
|
|
from app.core.config import settings
|
|
from app.deps import get_db, require_admin, get_current_user
|
|
from app.models import Course, Module, QuizQuestion, QuizChoice, CourseGroup, Group, GroupMembership, User, UserRole, Upload, Enrollment, ModuleProgress, QuizAttempt, ProgressStatus
|
|
from app.schemas import (
|
|
AdminCourseCreate,
|
|
AdminCourseOut,
|
|
AdminCourseUpdate,
|
|
AdminUserOut,
|
|
AdminUserUpdate,
|
|
GroupCreate,
|
|
GroupOut,
|
|
GroupUpdate,
|
|
GroupMemberAdd,
|
|
ModuleCreate,
|
|
ModuleOut,
|
|
ModuleUpdate,
|
|
QuizQuestionCreate,
|
|
QuizQuestionOut,
|
|
QuizQuestionUpdate,
|
|
UploadOut,
|
|
)
|
|
|
|
# Router for the admin API: routes registered on it share the "/admin" prefix
# and the "admin" tag, and declare require_admin as a router-level dependency.
router = APIRouter(prefix="/admin", tags=["admin"], dependencies=[Depends(require_admin)])
|
|
|
|
|
|
@router.get("/courses", response_model=List[AdminCourseOut])
def list_courses(db: Session = Depends(get_db)):
    """Return all courses, most recently created first.

    Each entry carries the course's own columns plus the ids of the groups
    linked to it through ``course.course_groups``.
    """
    newest_first = db.query(Course).order_by(Course.created_at.desc()).all()
    return [
        AdminCourseOut(
            id=item.id,
            title=item.title,
            description=item.description,
            is_published=item.is_published,
            created_at=item.created_at,
            updated_at=item.updated_at,
            group_ids=[link.group_id for link in item.course_groups],
        )
        for item in newest_first
    ]
|
|
|
|
|
|
@router.post("/courses", response_model=AdminCourseOut)
def create_course(payload: AdminCourseCreate, db: Session = Depends(get_db)):
    """Create a course and link it to the groups named in the payload.

    The course row and its group links are written in a single transaction,
    so a failure while inserting a link does not leave a committed course
    without its groups.

    Args:
        payload: Course fields plus an optional ``group_ids`` list.
        db: Request-scoped database session.

    Returns:
        The stored course, including the ids of its linked groups.
    """
    course = Course(**payload.model_dump(exclude={"group_ids"}))
    db.add(course)
    # flush() sends the INSERT so course.id is available, but keeps the
    # transaction open until the links have been added as well.
    db.flush()
    # dict.fromkeys drops repeated ids while keeping the caller's order.
    for gid in dict.fromkeys(payload.group_ids or []):
        db.add(CourseGroup(course_id=course.id, group_id=gid))
    db.commit()
    db.refresh(course)
    return AdminCourseOut(
        id=course.id,
        title=course.title,
        description=course.description,
        is_published=course.is_published,
        created_at=course.created_at,
        updated_at=course.updated_at,
        group_ids=[cg.group_id for cg in course.course_groups],
    )
|
|
|
|
|
|
@router.put("/courses/{course_id}", response_model=AdminCourseOut)
def update_course(course_id: int, payload: AdminCourseUpdate, db: Session = Depends(get_db)):
    """Update a course's fields and, when ``group_ids`` is sent, its group links.

    Only fields present in the request are applied. When ``group_ids`` is
    provided, the course's links are reconciled against it: links to groups
    no longer listed are removed, links to newly listed groups are added,
    and links that are already correct are left alone.

    Raises:
        HTTPException: 404 if the course does not exist.
    """
    course = db.get(Course, course_id)
    if not course:
        raise HTTPException(status_code=404, detail="Course not found")

    data = payload.model_dump(exclude_unset=True)
    group_ids = data.pop("group_ids", None)
    for key, value in data.items():
        setattr(course, key, value)

    if group_ids is not None:
        wanted = set(group_ids)
        # Iterate over a copy because the collection is mutated in the loop.
        for link in list(course.course_groups):
            if link.group_id not in wanted:
                course.course_groups.remove(link)
        already_linked = {link.group_id for link in course.course_groups}
        # Applying only the difference avoids deleting and re-inserting an
        # unchanged link in the same flush.
        # NOTE(review): this matters if CourseGroup is unique on
        # (course_id, group_id) - confirm against the model.
        for gid in dict.fromkeys(group_ids):
            if gid not in already_linked:
                course.course_groups.append(CourseGroup(group_id=gid))

    db.commit()
    db.refresh(course)
    return AdminCourseOut(
        id=course.id,
        title=course.title,
        description=course.description,
        is_published=course.is_published,
        created_at=course.created_at,
        updated_at=course.updated_at,
        group_ids=[cg.group_id for cg in course.course_groups],
    )
|
|
|
|
|
|
@router.delete("/courses/{course_id}")
def delete_course(course_id: int, db: Session = Depends(get_db)):
    """Delete the course with the given id.

    Returns:
        ``{"ok": True}`` once the deletion has been committed.

    Raises:
        HTTPException: 404 if the course does not exist.
    """
    target = db.get(Course, course_id)
    if target:
        db.delete(target)
        db.commit()
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Course not found")
|
|
|
|
|
|
@router.patch("/courses/{course_id}/publish", response_model=AdminCourseOut)
def toggle_publish(course_id: int, db: Session = Depends(get_db)):
    """Flip a course's ``is_published`` flag and return the updated course.

    Raises:
        HTTPException: 404 if the course does not exist.
    """
    course = db.get(Course, course_id)
    if not course:
        raise HTTPException(status_code=404, detail="Course not found")

    now_published = not course.is_published
    course.is_published = now_published
    db.commit()
    db.refresh(course)

    linked_group_ids = [link.group_id for link in course.course_groups]
    return AdminCourseOut(
        id=course.id,
        title=course.title,
        description=course.description,
        is_published=course.is_published,
        created_at=course.created_at,
        updated_at=course.updated_at,
        group_ids=linked_group_ids,
    )
|
|
|
|
|
|
@router.get("/courses/{course_id}/modules", response_model=List[ModuleOut])
def list_modules(course_id: int, db: Session = Depends(get_db)):
    """Return the modules whose ``course_id`` matches, sorted by ``order_index``."""
    course_modules = db.query(Module).filter(Module.course_id == course_id)
    ordered = course_modules.order_by(Module.order_index)
    return ordered.all()
|
|
|
|
|
|
@router.post("/courses/{course_id}/modules", response_model=ModuleOut)
def create_module(course_id: int, payload: ModuleCreate, db: Session = Depends(get_db)):
    """Create a module inside an existing course.

    Raises:
        HTTPException: 404 if the course does not exist.
    """
    parent = db.get(Course, course_id)
    if not parent:
        raise HTTPException(status_code=404, detail="Course not found")

    fields = payload.model_dump()
    new_module = Module(course_id=course_id, **fields)
    db.add(new_module)
    db.commit()
    db.refresh(new_module)
    return new_module
|
|
|
|
|
|
@router.put("/modules/{module_id}", response_model=ModuleOut)
def update_module(module_id: int, payload: ModuleUpdate, db: Session = Depends(get_db)):
    """Apply the fields present in the request to an existing module.

    Raises:
        HTTPException: 404 if the module does not exist.
    """
    target = db.get(Module, module_id)
    if not target:
        raise HTTPException(status_code=404, detail="Module not found")

    # exclude_unset keeps fields the client did not send out of the update.
    changes = payload.model_dump(exclude_unset=True)
    for field_name in changes:
        setattr(target, field_name, changes[field_name])

    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
@router.get("/modules/{module_id}", response_model=ModuleOut)
def get_module(module_id: int, db: Session = Depends(get_db)):
    """Return a single module by id.

    Raises:
        HTTPException: 404 if the module does not exist.
    """
    found = db.get(Module, module_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Module not found")
|
|
|
|
|
|
@router.get("/users", response_model=List[AdminUserOut])
def list_users(q: str | None = None, db: Session = Depends(get_db)):
    """Return users, newest first, optionally filtered by an email substring.

    Args:
        q: Optional search text. When given, only users whose email contains
            it (case-insensitively) are returned. The text is matched
            literally: ``%`` and ``_`` are not treated as wildcards.
        db: Request-scoped database session.
    """
    query = db.query(User)
    if q:
        # Escape the LIKE metacharacters (and the escape character itself) so
        # that a search such as "a_b" does not also match "axb".
        escaped = q.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        query = query.filter(User.email.ilike(f"%{escaped}%", escape="\\"))
    return query.order_by(User.created_at.desc()).all()
|
|
|
|
|
|
@router.get("/users/{user_id}", response_model=AdminUserOut)
def get_user(user_id: int, db: Session = Depends(get_db)):
    """Return a single user by id.

    Raises:
        HTTPException: 404 if the user does not exist.
    """
    found = db.get(User, user_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
|
|
@router.patch("/users/{user_id}", response_model=AdminUserOut)
def update_user(user_id: int, payload: AdminUserUpdate, db: Session = Depends(get_db)):
    """Change a user's role and/or active flag.

    Only ``role`` and ``is_active`` are read from the request. A ``role``
    value outside the accepted list is ignored rather than rejected.

    Raises:
        HTTPException: 404 if the user does not exist.
    """
    account = db.get(User, user_id)
    if not account:
        raise HTTPException(status_code=404, detail="User not found")

    changes = payload.model_dump(exclude_unset=True)
    accepted_roles = [UserRole.admin, UserRole.learner, "admin", "learner"]

    if "role" in changes:
        requested_role = changes["role"]
        if requested_role in accepted_roles:
            account.role = UserRole(requested_role)
    if "is_active" in changes:
        account.is_active = changes["is_active"]

    db.commit()
    db.refresh(account)
    return account
|
|
|
|
|
|
@router.get("/groups", response_model=List[GroupOut])
def list_groups(db: Session = Depends(get_db)):
    """Return all groups, most recently created first."""
    newest_first = Group.created_at.desc()
    all_groups = db.query(Group).order_by(newest_first)
    return all_groups.all()
|
|
|
|
|
|
@router.post("/groups", response_model=GroupOut)
def create_group(payload: GroupCreate, db: Session = Depends(get_db)):
    """Create a group with the requested name.

    Raises:
        HTTPException: 400 if a group with that name already exists.
    """
    same_name = db.query(Group).filter(Group.name == payload.name)
    if same_name.first():
        raise HTTPException(status_code=400, detail="Group name already exists")

    new_group = Group(name=payload.name)
    db.add(new_group)
    db.commit()
    db.refresh(new_group)
    return new_group
|
|
|
|
|
|
@router.put("/groups/{group_id}", response_model=GroupOut)
def update_group(group_id: int, payload: GroupUpdate, db: Session = Depends(get_db)):
    """Apply the fields present in the request to an existing group.

    A rename is checked against existing group names first, so this endpoint
    enforces the same "names are unique" rule as group creation.

    Raises:
        HTTPException: 404 if the group does not exist; 400 if the new name
            is already used by another group.
    """
    group = db.get(Group, group_id)
    if not group:
        raise HTTPException(status_code=404, detail="Group not found")

    changes = payload.model_dump(exclude_unset=True)
    new_name = changes.get("name")
    # Only a real rename can clash. The name differs from this group's
    # current name, so any row found with it is necessarily another group.
    if new_name is not None and new_name != group.name:
        clash = db.query(Group).filter(Group.name == new_name).first()
        if clash:
            raise HTTPException(status_code=400, detail="Group name already exists")

    for key, value in changes.items():
        setattr(group, key, value)
    db.commit()
    db.refresh(group)
    return group
|
|
|
|
|
|
@router.delete("/groups/{group_id}")
def delete_group(group_id: int, db: Session = Depends(get_db)):
    """Delete the group with the given id.

    Returns:
        ``{"ok": True}`` once the deletion has been committed.

    Raises:
        HTTPException: 404 if the group does not exist.
    """
    target = db.get(Group, group_id)
    if target:
        db.delete(target)
        db.commit()
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Group not found")
|
|
|
|
|
|
@router.get("/groups/{group_id}/members", response_model=List[AdminUserOut])
def list_group_members(group_id: int, db: Session = Depends(get_db)):
    """Return the users that have a membership row in the given group.

    Raises:
        HTTPException: 404 if the group does not exist.
    """
    if not db.get(Group, group_id):
        raise HTTPException(status_code=404, detail="Group not found")

    users_with_membership = db.query(User).join(
        GroupMembership, GroupMembership.user_id == User.id
    )
    in_this_group = users_with_membership.filter(GroupMembership.group_id == group_id)
    return in_this_group.all()
|
|
|
|
|
|
@router.post("/groups/{group_id}/members")
def add_group_member(group_id: int, payload: GroupMemberAdd, db: Session = Depends(get_db)):
    """Add a user to a group, identifying the user by id or by email.

    ``payload.user_id`` is tried first; ``payload.email`` is used when no
    user was found that way. Adding a user who is already a member succeeds
    without creating a second membership row.

    Returns:
        ``{"ok": True}``.

    Raises:
        HTTPException: 404 if the group or the user cannot be found.
    """
    if not db.get(Group, group_id):
        raise HTTPException(status_code=404, detail="Group not found")

    member = db.get(User, payload.user_id) if payload.user_id else None
    if not member and payload.email:
        member = db.query(User).filter(User.email == payload.email).first()
    if not member:
        raise HTTPException(status_code=404, detail="User not found")

    already_member = (
        db.query(GroupMembership)
        .filter(GroupMembership.group_id == group_id, GroupMembership.user_id == member.id)
        .first()
    )
    if not already_member:
        db.add(GroupMembership(group_id=group_id, user_id=member.id))
        db.commit()
    return {"ok": True}
|
|
|
|
|
|
@router.delete("/groups/{group_id}/members/{user_id}")
def remove_group_member(group_id: int, user_id: int, db: Session = Depends(get_db)):
    """Remove a user's membership in a group.

    Returns:
        ``{"ok": True}`` once the deletion has been committed.

    Raises:
        HTTPException: 404 if the user has no membership in that group.
    """
    matching = db.query(GroupMembership).filter(
        GroupMembership.group_id == group_id,
        GroupMembership.user_id == user_id,
    )
    link = matching.first()
    if link:
        db.delete(link)
        db.commit()
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Membership not found")
|
|
|
|
|
|
@router.delete("/modules/{module_id}")
def delete_module(module_id: int, db: Session = Depends(get_db)):
    """Delete the module with the given id.

    Returns:
        ``{"ok": True}`` once the deletion has been committed.

    Raises:
        HTTPException: 404 if the module does not exist.
    """
    target = db.get(Module, module_id)
    if target:
        db.delete(target)
        db.commit()
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Module not found")
|
|
|
|
|
|
@router.get("/modules/{module_id}/questions", response_model=List[QuizQuestionOut])
def list_questions(module_id: int, db: Session = Depends(get_db)):
    """Return the quiz questions whose ``module_id`` matches.

    No ordering is applied to the query.
    """
    for_module = db.query(QuizQuestion).filter(QuizQuestion.module_id == module_id)
    return for_module.all()
|
|
|
|
|
|
@router.post("/modules/{module_id}/questions", response_model=QuizQuestionOut)
def create_question(module_id: int, payload: QuizQuestionCreate, db: Session = Depends(get_db)):
    """Create a quiz question and its answer choices in an existing module.

    Raises:
        HTTPException: 404 if the module does not exist.
    """
    if not db.get(Module, module_id):
        raise HTTPException(status_code=404, detail="Module not found")

    new_question = QuizQuestion(
        module_id=module_id,
        prompt=payload.prompt,
        question_type=payload.question_type,
    )
    db.add(new_question)
    # Flush so the question has an id for the choices to reference.
    db.flush()

    for option in payload.choices:
        answer = QuizChoice(
            question_id=new_question.id,
            text=option.text,
            is_correct=option.is_correct,
        )
        db.add(answer)

    db.commit()
    db.refresh(new_question)
    return new_question
|
|
|
|
|
|
@router.put("/questions/{question_id}", response_model=QuizQuestionOut)
def update_question(question_id: int, payload: QuizQuestionUpdate, db: Session = Depends(get_db)):
    """Update a question's prompt, type and/or answer choices.

    Only fields present in the request are applied. When ``choices`` is sent,
    the question's existing choices are replaced by the new list; sending
    ``null`` or an empty list removes all choices.

    Raises:
        HTTPException: 404 if the question does not exist.
    """
    question = db.get(QuizQuestion, question_id)
    if not question:
        raise HTTPException(status_code=404, detail="Question not found")

    data = payload.model_dump(exclude_unset=True)
    if "prompt" in data:
        question.prompt = data["prompt"]
    if "question_type" in data:
        question.question_type = data["question_type"]
    if "choices" in data:
        question.choices.clear()
        # model_dump() turns nested models into plain dicts, so attribute
        # access on data["choices"] items fails. The choice objects are read
        # from the payload itself instead, as create_question does.
        for choice in payload.choices or []:
            question.choices.append(QuizChoice(text=choice.text, is_correct=choice.is_correct))

    db.commit()
    db.refresh(question)
    return question
|
|
|
|
|
|
@router.delete("/questions/{question_id}")
def delete_question(question_id: int, db: Session = Depends(get_db)):
    """Delete the quiz question with the given id.

    Returns:
        ``{"ok": True}`` once the deletion has been committed.

    Raises:
        HTTPException: 404 if the question does not exist.
    """
    target = db.get(QuizQuestion, question_id)
    if target:
        db.delete(target)
        db.commit()
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Question not found")
|
|
|
|
|
|
@router.post("/uploads", response_model=UploadOut)
async def upload_file(file: UploadFile = File(...), db: Session = Depends(get_db), user: User = Depends(get_current_user)):
    """Store an uploaded PDF/PowerPoint file on disk and record its metadata.

    The file is saved under ``settings.upload_dir`` with a random name. The
    stored extension is derived from the accepted content type rather than
    from the client-supplied filename, so the name on disk never carries an
    extension the client chose.

    Args:
        file: The uploaded file.
        db: Request-scoped database session.
        user: The authenticated user, recorded as the uploader.

    Returns:
        The stored ``Upload`` row.

    Raises:
        HTTPException: 400 if the content type is not an accepted one.
    """
    # Accepted content types and the extension each is stored with.
    extension_by_type = {
        "application/pdf": ".pdf",
        "application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx",
        "application/vnd.ms-powerpoint": ".ppt",
    }
    file_ext = extension_by_type.get(file.content_type)
    if file_ext is None:
        raise HTTPException(status_code=400, detail="Only PDF and PPTX files are allowed")

    # Ensure upload directory exists
    upload_dir = Path(settings.upload_dir)
    upload_dir.mkdir(parents=True, exist_ok=True)

    # Generate unique filename
    file_path = upload_dir / f"{secrets.token_urlsafe(16)}{file_ext}"

    size_bytes = 0
    try:
        # Copy in 1 MiB chunks so the upload is never held in memory whole.
        with open(file_path, "wb") as out:
            while chunk := await file.read(1024 * 1024):
                out.write(chunk)
                size_bytes += len(chunk)

        # Store metadata in database
        upload = Upload(
            # The client may omit the filename; fall back to the stored name.
            filename=file.filename or file_path.name,
            content_type=file.content_type,
            file_path=str(file_path),
            size_bytes=size_bytes,
            uploaded_by=user.id,
        )
        db.add(upload)
        db.commit()
    except Exception:
        # Do not leave a file on disk that no database row refers to.
        db.rollback()
        file_path.unlink(missing_ok=True)
        raise

    db.refresh(upload)
    return upload
|
|
|
|
|
|
@router.get("/uploads/{upload_id}")
def get_upload(upload_id: int, db: Session = Depends(get_db)):
    """Return the stored metadata row for one upload.

    Raises:
        HTTPException: 404 if the upload does not exist.
    """
    # NOTE(review): no response_model is declared on this route, unlike the
    # POST /uploads route which uses UploadOut - confirm that is intended.
    record = db.get(Upload, upload_id)
    if record:
        return record
    raise HTTPException(status_code=404, detail="Upload not found")
|
|
|
|
|
|
@router.get("/stats/users")
def stats_users(db: Session = Depends(get_db)) -> List[Dict[str, Any]]:
    """Get statistics for all users with the learner role.

    Each entry holds the user's id, email and creation time together with
    three counters: enrollments, distinct completed modules, and quiz
    attempts. The counters come from three grouped queries rather than three
    queries per user.
    """
    users = db.query(User).filter(User.role == UserRole.learner).all()

    # Each query yields (user_id, count) rows; users with no rows are absent
    # from the mapping and are reported as 0 below.
    enrollment_counts = dict(
        db.query(Enrollment.user_id, func.count(Enrollment.id))
        .group_by(Enrollment.user_id)
        .all()
    )
    completed_counts = dict(
        db.query(ModuleProgress.user_id, func.count(func.distinct(ModuleProgress.module_id)))
        # The inner join restricts the count to progress rows whose module exists.
        .join(Module, Module.id == ModuleProgress.module_id)
        .filter(ModuleProgress.status == ProgressStatus.completed)
        .group_by(ModuleProgress.user_id)
        .all()
    )
    attempt_counts = dict(
        db.query(QuizAttempt.user_id, func.count(QuizAttempt.id))
        .group_by(QuizAttempt.user_id)
        .all()
    )

    return [
        {
            "id": user.id,
            "email": user.email,
            "enrollments_count": enrollment_counts.get(user.id, 0),
            "completed_modules_count": completed_counts.get(user.id, 0),
            "attempts_count": attempt_counts.get(user.id, 0),
            "created_at": user.created_at,
        }
        for user in users
    ]
|
|
|
|
|
|
@router.get("/stats/users/{user_id}")
def stats_user(user_id: int, db: Session = Depends(get_db)) -> Dict[str, Any]:
    """Get detailed statistics for a specific user.

    For every course the user is enrolled in, reports module completion
    (count and percentage) and the user's attempts on that course's quiz
    modules, newest attempt first within each module.

    Raises:
        HTTPException: 404 if the user does not exist.
    """
    user = db.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    enrollments = db.query(Enrollment).filter(Enrollment.user_id == user_id).all()

    # The user's progress rows do not depend on the course, so they are
    # loaded once here instead of once per enrollment.
    progress_map = {
        p.module_id: p
        for p in db.query(ModuleProgress).filter(ModuleProgress.user_id == user_id).all()
    }

    courses_data = []
    for enrollment in enrollments:
        course = db.get(Course, enrollment.course_id)
        if not course:
            # Enrollment pointing at a course that no longer exists: skip it.
            continue

        modules = db.query(Module).filter(Module.course_id == course.id).order_by(Module.order_index).all()
        total_modules = len(modules)

        completed_modules = 0
        attempts_data = []
        for module in modules:
            progress = progress_map.get(module.id)
            if progress and progress.status == ProgressStatus.completed:
                completed_modules += 1

            # Get quiz attempts for this course
            if module.type.value == "quiz":
                attempts = (
                    db.query(QuizAttempt)
                    .filter(QuizAttempt.user_id == user_id, QuizAttempt.module_id == module.id)
                    .order_by(QuizAttempt.submitted_at.desc())
                    .all()
                )
                for attempt in attempts:
                    attempts_data.append({
                        "module_id": module.id,
                        "module_title": module.title,
                        "score": attempt.score,
                        "passed": attempt.passed,
                        "submitted_at": attempt.submitted_at,
                    })

        # A course without modules reports 0 rather than dividing by zero.
        progress_percent = (completed_modules / total_modules * 100) if total_modules > 0 else 0

        courses_data.append({
            "course_id": course.id,
            "course_title": course.title,
            "enrolled_at": enrollment.enrolled_at,
            "progress_percent": progress_percent,
            "completed_modules": completed_modules,
            "total_modules": total_modules,
            "attempts": attempts_data,
        })

    return {
        "user_id": user_id,
        "email": user.email,
        "courses": courses_data,
    }
|
|
|
|
|
|
@router.get("/stats/courses")
def stats_courses(db: Session = Depends(get_db)) -> List[Dict[str, Any]]:
    """Get statistics for all courses.

    Each entry reports the number of enrollments, the number of enrollments
    whose user has completed every module of the course, and the average
    score on the course's last module when that module is a quiz.
    """
    result = []
    for course in db.query(Course).all():
        enrollments_count = db.query(func.count(Enrollment.id)).filter(Enrollment.course_id == course.id).scalar()

        # Ordered by order_index so that modules[-1] really is the last module.
        modules = (
            db.query(Module)
            .filter(Module.course_id == course.id)
            .order_by(Module.order_index)
            .all()
        )
        total_modules = len(modules)

        # Count users who completed all modules
        completions_count = 0
        if total_modules > 0:
            # One grouped query gives every user's number of distinct
            # completed modules in this course. Counting distinct module ids
            # keeps repeated progress rows for the same module from inflating
            # the number.
            completed_by_user = dict(
                db.query(ModuleProgress.user_id, func.count(func.distinct(ModuleProgress.module_id)))
                .join(Module, Module.id == ModuleProgress.module_id)
                .filter(
                    Module.course_id == course.id,
                    ModuleProgress.status == ProgressStatus.completed,
                )
                .group_by(ModuleProgress.user_id)
                .all()
            )
            enrolled_user_ids = db.query(Enrollment.user_id).filter(Enrollment.course_id == course.id).all()
            completions_count = sum(
                1
                for (enrolled_user_id,) in enrolled_user_ids
                if completed_by_user.get(enrolled_user_id, 0) >= total_modules
            )

        # Get final exam average (last module if it's a quiz)
        final_exam_avg = None
        if modules:
            last_module = modules[-1]
            if last_module.type.value == "quiz":
                avg_score = db.query(func.avg(QuizAttempt.score)).filter(QuizAttempt.module_id == last_module.id).scalar()
                # Compare against None explicitly: an average of 0 is a real
                # value, while None means there were no attempts.
                final_exam_avg = float(avg_score) if avg_score is not None else None

        result.append({
            "id": course.id,
            "title": course.title,
            "enrollments_count": enrollments_count,
            "completions_count": completions_count,
            "final_exam_avg_score": final_exam_avg,
        })
    return result
|
|
|
|
|
|
@router.get("/stats/courses/{course_id}")
def stats_course(course_id: int, db: Session = Depends(get_db)) -> Dict[str, Any]:
    """Get detailed statistics for a specific course.

    Reports the enrollment and module counts and, for each quiz module in
    ``order_index`` order, the number of attempts, the mean score and the
    pass rate as a percentage. A quiz with no attempts reports 0 for both.

    Raises:
        HTTPException: 404 if the course does not exist.
    """
    course = db.get(Course, course_id)
    if not course:
        raise HTTPException(status_code=404, detail="Course not found")

    enrolled = db.query(Enrollment).filter(Enrollment.course_id == course_id).all()
    ordered_modules = (
        db.query(Module)
        .filter(Module.course_id == course_id)
        .order_by(Module.order_index)
        .all()
    )

    quiz_stats = []
    for mod in ordered_modules:
        if mod.type.value != "quiz":
            continue

        tries = db.query(QuizAttempt).filter(QuizAttempt.module_id == mod.id).all()
        n_tries = len(tries)
        if tries:
            mean_score = sum(t.score for t in tries) / n_tries
            pass_pct = sum(1 for t in tries if t.passed) / n_tries * 100
        else:
            mean_score = 0
            pass_pct = 0

        quiz_stats.append({
            "module_id": mod.id,
            "module_title": mod.title,
            "total_attempts": n_tries,
            "avg_score": mean_score,
            "pass_rate": pass_pct,
        })

    return {
        "course_id": course.id,
        "title": course.title,
        "enrollments_count": len(enrolled),
        "total_modules": len(ordered_modules),
        "attempts_distribution": quiz_stats,
    }
|
|
|
|
|