tripweb/backend/trip/routers/settings.py
2025-10-16 18:44:12 +02:00

124 lines
4.3 KiB
Python

from pathlib import Path
from typing import Annotated
from fastapi import (APIRouter, BackgroundTasks, Depends, File, HTTPException,
UploadFile)
from fastapi.responses import FileResponse
from sqlmodel import select
from ..config import settings
from ..deps import SessionDep, get_current_username
from ..models.models import (Backup, BackupRead, BackupStatus, User, UserRead,
UserUpdate)
from ..utils.utils import check_update
from ..utils.zip import (process_backup_export, process_backup_import,
process_legacy_import)
# Router for the per-user settings endpoints; every route registered on it
# is served under the /api/settings prefix and tagged "settings".
router = APIRouter(prefix="/api/settings", tags=["settings"])
@router.get("", response_model=UserRead)
def get_user_settings(
    session: SessionDep,
    current_user: Annotated[str, Depends(get_current_username)],
) -> UserRead:
    """Return the settings of the user making the request.

    The ``User`` row is fetched by primary key, using the username produced
    by the ``get_current_username`` dependency, and handed to
    ``UserRead.serialize``.
    """
    return UserRead.serialize(session.get(User, current_user))
@router.put("", response_model=UserRead)
def put_user_settings(
    session: SessionDep,
    data: UserUpdate,
    current_user: Annotated[str, Depends(get_current_username)],
) -> UserRead:
    """Apply a partial update to the current user's settings.

    Only the fields explicitly set in ``data`` are written. When
    ``do_not_display`` is among them it is stored as a comma-joined string
    (an empty or missing list becomes ``""``).

    Returns the refreshed user, serialized through ``UserRead.serialize``.
    """
    user_row = session.get(User, current_user)

    # exclude_unset keeps fields the client did not send out of the update.
    changes = data.model_dump(exclude_unset=True)

    if "do_not_display" in changes:
        hidden = changes["do_not_display"]
        changes["do_not_display"] = ",".join(hidden) if hidden else ""

    for field_name, new_value in changes.items():
        setattr(user_row, field_name, new_value)

    session.add(user_row)
    session.commit()
    session.refresh(user_row)
    return UserRead.serialize(user_row)
@router.get("/checkversion")
async def check_version(
    session: SessionDep,
    current_user: Annotated[str, Depends(get_current_username)],
):
    """Return the result of ``check_update()``.

    ``session`` and ``current_user`` are not used in the body; they are
    still resolved as dependencies, so the route is subject to whatever
    ``get_current_username`` enforces.
    """
    update_info = await check_update()
    return update_info
@router.post("/backups", response_model=BackupRead)
def create_backup_export(
    background_tasks: BackgroundTasks,
    session: SessionDep,
    current_user: Annotated[str, Depends(get_current_username)],
) -> BackupRead:
    """Register a new backup for the current user and schedule its export.

    A ``Backup`` row owned by ``current_user`` is persisted first so that it
    has an id, then ``process_backup_export`` is queued as a background task
    with that id. The freshly created row is returned, serialized through
    ``BackupRead.serialize``.
    """
    new_backup = Backup(user=current_user)
    session.add(new_backup)
    session.commit()
    # Refresh so database-populated fields (such as the id) are loaded.
    session.refresh(new_backup)

    # NOTE(review): the injected session is handed to the background task;
    # confirm it is still usable when the task actually runs.
    background_tasks.add_task(process_backup_export, session, new_backup.id)

    return BackupRead.serialize(new_backup)
@router.get("/backups", response_model=list[BackupRead])
def read_backups(
    session: SessionDep,
    current_user: Annotated[str, Depends(get_current_username)],
) -> list[BackupRead]:
    """List every backup owned by the current user.

    Each matching ``Backup`` row is serialized through
    ``BackupRead.serialize``.
    """
    owned_query = select(Backup).where(Backup.user == current_user)
    rows = session.exec(owned_query).all()
    return [BackupRead.serialize(row) for row in rows]
@router.get("/backups/{backup_id}/download")
def download_backup(
    backup_id: int,
    session: SessionDep,
    current_user: Annotated[str, Depends(get_current_username)],
):
    """Send a completed backup archive to its owner.

    Raises:
        HTTPException: 404 when no completed backup with ``backup_id``
            belongs to ``current_user``, when the row has no filename, or
            when the archive is missing from ``settings.BACKUPS_FOLDER``.
    """
    # Chained where() clauses are ANDed together: id, owner and status must
    # all match.
    query = (
        select(Backup)
        .where(Backup.id == backup_id)
        .where(Backup.user == current_user)
        .where(Backup.status == BackupStatus.COMPLETED)
    )
    backup_row = session.exec(query).first()
    if not (backup_row and backup_row.filename):
        raise HTTPException(status_code=404, detail="Not found")

    archive_path = Path(settings.BACKUPS_FOLDER) / backup_row.filename
    if not archive_path.exists():
        raise HTTPException(status_code=404, detail="Not found")

    # The download name is built from the backup's creation date and the
    # username rather than from the stored filename.
    iso_date = backup_row.created_at.strftime("%Y-%m-%d")
    download_name = f"TRIP_{iso_date}_{current_user}_backup.zip"
    return FileResponse(
        path=archive_path,
        filename=download_name,
        media_type="application/zip",
    )
@router.delete("/backups/{backup_id}")
async def delete_backup(
    backup_id: int,
    session: SessionDep,
    current_user: Annotated[str, Depends(get_current_username)],
):
    """Delete one of the current user's backup records.

    Returns an empty JSON object on success.

    Raises:
        HTTPException: 404 when no backup with ``backup_id`` exists;
            403 when the backup belongs to a different user.
    """
    db_backup = session.get(Backup, backup_id)

    # session.get() yields None for an unknown id; without this guard the
    # ownership check below would raise AttributeError and surface as a 500.
    if db_backup is None:
        raise HTTPException(status_code=404, detail="Not found")

    if db_backup.user != current_user:
        raise HTTPException(status_code=403, detail="Forbidden")

    # NOTE(review): only the database row is removed here; confirm whether
    # the archive file on disk is cleaned up elsewhere.
    session.delete(db_backup)
    session.commit()
    return {}
@router.post("/backups/import")
async def backup_import(
    session: SessionDep,
    current_user: Annotated[str, Depends(get_current_username)],
    file: UploadFile = File(...),
):
    """Import an uploaded backup, dispatching on the upload's content type.

    ``application/json`` uploads go to ``process_legacy_import``; zip uploads
    (``application/x-zip-compressed`` or ``application/zip``) go to
    ``process_backup_import``.

    Raises:
        HTTPException: 400 for any other content type.
    """
    mime = file.content_type

    if mime == "application/json":
        return await process_legacy_import(session, current_user, file)

    if mime in ("application/x-zip-compressed", "application/zip"):
        return await process_backup_import(session, current_user, file)

    raise HTTPException(status_code=400, detail="Bad request, invalid file")