124 lines
4.3 KiB
Python
124 lines
4.3 KiB
Python
from pathlib import Path
|
|
from typing import Annotated
|
|
|
|
from fastapi import (APIRouter, BackgroundTasks, Depends, File, HTTPException,
|
|
UploadFile)
|
|
from fastapi.responses import FileResponse
|
|
from sqlmodel import select
|
|
|
|
from ..config import settings
|
|
from ..deps import SessionDep, get_current_username
|
|
from ..models.models import (Backup, BackupRead, BackupStatus, User, UserRead,
|
|
UserUpdate)
|
|
from ..utils.utils import check_update
|
|
from ..utils.zip import (process_backup_export, process_backup_import,
|
|
process_legacy_import)
|
|
|
|
# Router for the endpoints defined below; all routes share the "/api/settings" prefix and the "settings" tag.
router = APIRouter(prefix="/api/settings", tags=["settings"])
|
|
|
|
|
|
@router.get("", response_model=UserRead)
def get_user_settings(
    session: SessionDep,
    current_user: Annotated[str, Depends(get_current_username)],
) -> UserRead:
    """Return the settings of the authenticated user.

    The ``User`` row is fetched by the username resolved through
    ``get_current_username`` and passed to ``UserRead.serialize``.
    """
    return UserRead.serialize(session.get(User, current_user))
|
|
|
|
|
|
@router.put("", response_model=UserRead)
def put_user_settings(
    session: SessionDep,
    data: UserUpdate,
    current_user: Annotated[str, Depends(get_current_username)],
) -> UserRead:
    """Apply a partial update to the authenticated user's settings.

    Only the fields that were explicitly set on ``data`` are written to the
    ``User`` row. A ``do_not_display`` value is stored as a comma-joined
    string; a falsy value is stored as the empty string.

    Returns:
        The updated user, serialized with ``UserRead.serialize``.
    """
    changes = data.model_dump(exclude_unset=True)
    if "do_not_display" in changes:
        # Joining an empty list yields "", so falsy values collapse to the empty string.
        changes["do_not_display"] = ",".join(changes["do_not_display"] or [])

    user_row = session.get(User, current_user)
    for field_name, new_value in changes.items():
        setattr(user_row, field_name, new_value)

    session.add(user_row)
    session.commit()
    # Reload the row after the commit before serializing it.
    session.refresh(user_row)
    return UserRead.serialize(user_row)
|
|
|
|
|
|
@router.get("/checkversion")
async def check_version(
    session: SessionDep,
    current_user: Annotated[str, Depends(get_current_username)],
):
    """Return whatever ``check_update()`` reports.

    ``session`` and ``current_user`` are not read in the body; they are only
    declared as route dependencies.
    """
    update_info = await check_update()
    return update_info
|
|
|
|
|
|
@router.post("/backups", response_model=BackupRead)
def create_backup_export(
    background_tasks: BackgroundTasks,
    session: SessionDep,
    current_user: Annotated[str, Depends(get_current_username)],
) -> BackupRead:
    """Create a backup record for the user and schedule its export.

    The ``Backup`` row is committed first, then ``process_backup_export`` is
    queued as a background task with the session and the new row's id.

    Returns:
        The freshly created backup, serialized with ``BackupRead.serialize``.
    """
    backup_row = Backup(user=current_user)
    session.add(backup_row)
    session.commit()
    # Reload the persisted row before reading its id below.
    session.refresh(backup_row)

    # NOTE(review): the request's session object is handed to the background
    # task; confirm it is still usable once the response has been sent.
    background_tasks.add_task(process_backup_export, session, backup_row.id)
    return BackupRead.serialize(backup_row)
|
|
|
|
|
|
@router.get("/backups", response_model=list[BackupRead])
def read_backups(
    session: SessionDep,
    current_user: Annotated[str, Depends(get_current_username)],
) -> list[BackupRead]:
    """List every backup whose ``user`` matches the authenticated user."""
    owned_query = select(Backup).where(Backup.user == current_user)
    owned_rows = session.exec(owned_query).all()
    return list(map(BackupRead.serialize, owned_rows))
|
|
|
|
|
|
@router.get("/backups/{backup_id}/download")
def download_backup(
    backup_id: int,
    session: SessionDep,
    current_user: Annotated[str, Depends(get_current_username)],
):
    """Send a completed backup archive that belongs to the authenticated user.

    Returns:
        A ``FileResponse`` for the zip file, with a download name built from
        the backup's creation date and the username.

    Raises:
        HTTPException: 404 when no completed backup with this id belongs to
            the user, when the record has no filename, or when the file does
            not exist under ``settings.BACKUPS_FOLDER``.
    """
    # Chained where() clauses are AND-ed: id, owner and status must all match.
    query = (
        select(Backup)
        .where(Backup.id == backup_id)
        .where(Backup.user == current_user)
        .where(Backup.status == BackupStatus.COMPLETED)
    )
    backup_row = session.exec(query).first()
    if not (backup_row and backup_row.filename):
        raise HTTPException(status_code=404, detail="Not found")

    archive_path = Path(settings.BACKUPS_FOLDER) / backup_row.filename
    if not archive_path.exists():
        raise HTTPException(status_code=404, detail="Not found")

    day_stamp = backup_row.created_at.strftime("%Y-%m-%d")
    download_name = f"TRIP_{day_stamp}_{current_user}_backup.zip"
    return FileResponse(
        path=archive_path,
        filename=download_name,
        media_type="application/zip",
    )
|
|
|
|
|
|
@router.delete("/backups/{backup_id}")
async def delete_backup(
    backup_id: int,
    session: SessionDep,
    current_user: Annotated[str, Depends(get_current_username)],
):
    """Delete a backup record owned by the authenticated user.

    Returns:
        An empty dict on success.

    Raises:
        HTTPException: 404 when no backup has this id; 403 when the backup
            belongs to a different user.
    """
    db_backup = session.get(Backup, backup_id)
    # session.get() returns None for an unknown primary key. Without this guard
    # the ownership check below raises AttributeError and surfaces as a 500.
    if db_backup is None:
        raise HTTPException(status_code=404, detail="Not found")
    if db_backup.user != current_user:
        raise HTTPException(status_code=403, detail="Forbidden")

    # NOTE(review): only the database row is removed here; whether the archive
    # file on disk is cleaned up elsewhere is not visible from this function.
    session.delete(db_backup)
    session.commit()
    return {}
|
|
|
|
|
|
@router.post("/backups/import")
async def backup_import(
    session: SessionDep,
    current_user: Annotated[str, Depends(get_current_username)],
    file: UploadFile = File(...),
):
    """Import an uploaded backup, dispatching on the upload's content type.

    ``application/json`` uploads are handed to ``process_legacy_import``;
    ``application/zip`` and ``application/x-zip-compressed`` uploads are
    handed to ``process_backup_import``.

    Returns:
        Whatever the selected import coroutine returns.

    Raises:
        HTTPException: 400 when the content type is none of the above.
    """
    zip_types = ("application/x-zip-compressed", "application/zip")
    declared_type = file.content_type

    if declared_type == "application/json":
        importer = process_legacy_import
    elif declared_type in zip_types:
        importer = process_backup_import
    else:
        raise HTTPException(status_code=400, detail="Bad request, invalid file")

    return await importer(session, current_user, file)
|