from pathlib import Path
from typing import Annotated

from fastapi import (APIRouter, BackgroundTasks, Depends, File, HTTPException,
                     UploadFile)
from fastapi.responses import FileResponse
from sqlmodel import select

from ..config import settings
from ..deps import SessionDep, get_current_username
from ..models.models import (Backup, BackupRead, BackupStatus, User, UserRead,
                             UserUpdate)
from ..utils.utils import check_update
from ..utils.zip import (process_backup_export, process_backup_import,
                         process_legacy_import)

# Every route in this module is mounted under /api/settings.
router = APIRouter(prefix="/api/settings", tags=["settings"])


@router.get("", response_model=UserRead)
def get_user_settings(
    session: SessionDep, current_user: Annotated[str, Depends(get_current_username)]
) -> UserRead:
    """Return the stored settings of the authenticated user.

    The user row is looked up by primary key using the username resolved by
    ``get_current_username``.

    Raises:
        HTTPException: 404 when no ``User`` row exists for that username.
    """
    db_user = session.get(User, current_user)
    if db_user is None:
        # session.get() yields None for an unknown key; fail with a clean 404
        # rather than passing None on to the serializer.
        raise HTTPException(status_code=404, detail="Not found")
    return UserRead.serialize(db_user)


@router.put("", response_model=UserRead)
def put_user_settings(
    session: SessionDep, data: UserUpdate, current_user: Annotated[str, Depends(get_current_username)]
) -> UserRead:
    """Apply a partial update to the authenticated user's settings.

    Only the fields explicitly present in the request body are written
    (``exclude_unset=True``); every other attribute keeps its stored value.

    Raises:
        HTTPException: 404 when no ``User`` row exists for that username.
    """
    db_user = session.get(User, current_user)
    if db_user is None:
        # Without this guard the setattr()/session.add() calls below would
        # blow up on None and surface as an unhandled 500.
        raise HTTPException(status_code=404, detail="Not found")

    user_data = data.model_dump(exclude_unset=True)
    if "do_not_display" in user_data:
        # The incoming value is joined into one comma-separated string before
        # being stored; an empty or missing value is stored as "".
        user_data["do_not_display"] = (
            ",".join(user_data["do_not_display"]) if user_data["do_not_display"] else ""
        )

    for key, value in user_data.items():
        setattr(db_user, key, value)

    session.add(db_user)
    session.commit()
    # Reload the row so the response reflects what was actually persisted.
    session.refresh(db_user)
    return UserRead.serialize(db_user)


@router.get("/checkversion")
async def check_version(session: SessionDep, current_user: Annotated[str, Depends(get_current_username)]):
    """Return the result of ``check_update()`` to an authenticated caller.

    ``session`` is not used by the body; it is kept because it is part of the
    route's declared dependencies.
    """
    return await check_update()


@router.post("/backups", response_model=BackupRead)
def create_backup_export(
    background_tasks: BackgroundTasks,
    session: SessionDep,
    current_user: Annotated[str, Depends(get_current_username)],
) -> BackupRead:
    """Register a new backup for the current user and schedule its export.

    A ``Backup`` row is created and committed first so that it has an id,
    then ``process_backup_export`` is queued as a background task with that
    id. The serialized row is returned without waiting for the task.
    """
    db_backup = Backup(user=current_user)
    session.add(db_backup)
    session.commit()
    # Refresh to load database-generated values (the id is needed just below).
    session.refresh(db_backup)

    # NOTE(review): the request-scoped session is handed to the background
    # task. Depending on the FastAPI version and on how SessionDep is
    # implemented, that session may already be closed when the task runs --
    # confirm, or have the task open its own session.
    background_tasks.add_task(process_backup_export, session, db_backup.id)
    return BackupRead.serialize(db_backup)
@router.get("/backups", response_model=list[BackupRead])
def read_backups(
    session: SessionDep, current_user: Annotated[str, Depends(get_current_username)]
) -> list[BackupRead]:
    """List every backup record whose ``user`` is the authenticated user."""
    db_backups = session.exec(select(Backup).where(Backup.user == current_user)).all()
    return [BackupRead.serialize(backup) for backup in db_backups]


@router.get("/backups/{backup_id}/download")
def download_backup(
    backup_id: int, session: SessionDep, current_user: Annotated[str, Depends(get_current_username)]
):
    """Send the archive of a completed backup owned by the current user.

    The lookup filters on id, owner and ``BackupStatus.COMPLETED`` at once,
    so a backup that is unfinished or belongs to someone else is
    indistinguishable from a missing one.

    Raises:
        HTTPException: 404 when no matching record exists, the record has no
            filename, or the file is absent from ``settings.BACKUPS_FOLDER``.
    """
    db_backup = session.exec(
        select(Backup).where(
            Backup.id == backup_id,
            Backup.user == current_user,
            Backup.status == BackupStatus.COMPLETED,
        )
    ).first()
    if not db_backup or not db_backup.filename:
        raise HTTPException(status_code=404, detail="Not found")

    file_path = Path(settings.BACKUPS_FOLDER) / db_backup.filename
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="Not found")

    # The client receives a readable name built from the creation date and
    # the username instead of the stored on-disk filename.
    iso_date = db_backup.created_at.strftime("%Y-%m-%d")
    filename = f"TRIP_{iso_date}_{current_user}_backup.zip"
    return FileResponse(path=file_path, filename=filename, media_type="application/zip")


@router.delete("/backups/{backup_id}")
async def delete_backup(
    backup_id: int, session: SessionDep, current_user: Annotated[str, Depends(get_current_username)]
):
    """Delete one of the current user's backup records.

    Returns an empty JSON object on success.

    Raises:
        HTTPException: 404 when ``backup_id`` matches no record; 403 when the
            record belongs to a different user.
    """
    db_backup = session.get(Backup, backup_id)
    if db_backup is None:
        # session.get() returns None for an unknown id; without this guard
        # the ownership check below raises AttributeError (HTTP 500).
        raise HTTPException(status_code=404, detail="Not found")
    if db_backup.user != current_user:
        raise HTTPException(status_code=403, detail="Forbidden")

    # NOTE(review): only the database row is removed here; this handler does
    # not touch the archive under settings.BACKUPS_FOLDER -- confirm the file
    # is cleaned up elsewhere.
    session.delete(db_backup)
    session.commit()
    return {}


@router.post("/backups/import")
async def backup_import(
    session: SessionDep,
    current_user: Annotated[str, Depends(get_current_username)],
    file: UploadFile = File(...),
):
    """Import an uploaded backup, choosing the importer by declared MIME type.

    ``application/json`` uploads go to ``process_legacy_import``; ZIP uploads
    (``application/zip`` or ``application/x-zip-compressed``) go to
    ``process_backup_import``.

    Raises:
        HTTPException: 400 for any other content type.
    """
    # Dispatch relies on the content type declared by the client; the file
    # bytes are not inspected in this handler.
    content_type = file.content_type
    if content_type == "application/json":
        return await process_legacy_import(session, current_user, file)
    if content_type in ("application/x-zip-compressed", "application/zip"):
        return await process_backup_import(session, current_user, file)
    raise HTTPException(status_code=400, detail="Bad request, invalid file")