2025-11-09 19:10:30 +01:00

243 lines
7.2 KiB
Python

import base64
from io import BytesIO
from pathlib import Path
from secrets import token_urlsafe
from uuid import uuid4
import httpx
from fastapi import HTTPException, UploadFile
from PIL import Image
from .. import __version__
from ..config import Settings
settings = Settings()
def generate_urlsafe() -> str:
return token_urlsafe(32)
def generate_filename(format: str) -> str:
return f"{uuid4()}.{format}"
def assets_folder_path() -> Path:
return Path(settings.ASSETS_FOLDER)
def attachments_folder_path() -> Path:
return Path(settings.ATTACHMENTS_FOLDER)
def attachments_trip_folder_path(trip_id: int | str) -> Path:
path = attachments_folder_path() / str(trip_id)
path.mkdir(parents=True, exist_ok=True)
return path
def b64img_decode(data: str) -> bytes:
return (
base64.b64decode(data.split(",", 1)[1]) if data.startswith("data:image/") else base64.b64decode(data)
)
def remove_attachment(trip_id: int, filename: str):
try:
att_fp = attachments_trip_folder_path(trip_id) / filename
if not att_fp.exists():
return
att_fp.unlink()
except OSError:
pass
def remove_backup(filename: str):
if not filename:
return
try:
backup_fp = Path(settings.BACKUPS_FOLDER) / filename
if not backup_fp.exists():
return
backup_fp.unlink()
except OSError:
pass
def remove_image(filename: str):
try:
image_fp = assets_folder_path() / filename
if not image_fp.exists():
return
image_fp.unlink()
except OSError:
pass
async def httpx_get(link: str) -> str:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Referer": link,
}
try:
async with httpx.AsyncClient(follow_redirects=True, headers=headers, timeout=5) as client:
response = await client.get(link)
response.raise_for_status()
return response.json()
except Exception:
raise HTTPException(status_code=400, detail="Bad Request")
async def download_file(link: str, raise_on_error: bool = False) -> str:
if not link[:4] == "http":
raise HTTPException(status_code=400, detail="Bad Request")
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
"Accept": "image/*",
"Accept-Language": "en-US,en;q=0.5",
"Referer": link,
}
try:
async with httpx.AsyncClient(follow_redirects=True, headers=headers, timeout=5) as client:
response = await client.get(link)
response.raise_for_status()
content_type = response.headers.get("Content-Type", "")
if not content_type.startswith("image/"):
raise HTTPException(
status_code=400, detail="Bad Request: provided image URL is not an image"
)
infer_extension = content_type.split("/")[1]
if not infer_extension:
infer_extension = "jpg"
path = assets_folder_path() / generate_filename(infer_extension)
with open(path, "wb") as f:
f.write(response.content)
return f.name
except Exception as e:
if raise_on_error:
raise HTTPException(status_code=400, detail=f"Failed to download file: {e}")
return ""
async def check_update():
url = "https://api.github.com/repos/itskovacs/trip/releases/latest"
try:
async with httpx.AsyncClient(follow_redirects=True, timeout=5) as client:
response = await client.get(url)
response.raise_for_status()
latest_version = response.json()["tag_name"]
if __version__ != latest_version:
return latest_version
return None
except Exception:
raise HTTPException(status_code=503, detail="Couldn't verify for update")
def patch_image(fp: str, size: int = 400) -> bool:
try:
with Image.open(fp) as im:
if im.mode not in ("RGB", "RGBA"):
im = im.convert("RGB")
# Resize and crop to square of size x size
if size > 0:
im_ratio = im.width / im.height
if im_ratio > 1:
new_height = size
new_width = int(size * im_ratio)
else:
new_width = size
new_height = int(size / im_ratio)
im = im.resize((new_width, new_height), Image.LANCZOS)
left = (im.width - size) // 2
top = (im.height - size) // 2
right = left + size
bottom = top + size
im = im.crop((left, top, right, bottom))
im.save(fp)
return True
except Exception:
...
return False
def save_image_to_file(content: bytes, size: int = 600) -> str:
try:
with Image.open(BytesIO(content)) as im:
if im.mode not in ("RGB", "RGBA"):
im = im.convert("RGB")
if size > 0: # Crop as square of (size * size)
im_ratio = im.width / im.height
target_ratio = 1 # Square ratio is 1
if im_ratio > target_ratio:
new_height = size
new_width = int(new_height * im_ratio)
else:
new_width = size
new_height = int(new_width / im_ratio)
im = im.resize((new_width, new_height), Image.LANCZOS)
left = (im.width - size) // 2
top = (im.height - size) // 2
right = left + size
bottom = top + size
im = im.crop((left, top, right, bottom))
if content.startswith(b"\x89PNG"):
image_ext = "png"
elif content.startswith(b"\xff\xd8"):
image_ext = "jpeg"
elif content.startswith(b"RIFF") and content[8:12] == b"WEBP":
image_ext = "webp"
else:
raise ValueError("Unsupported image format")
filename = generate_filename(image_ext)
filepath = assets_folder_path() / filename
im.save(filepath)
return filename
except Exception:
if filepath.exists():
filepath.unlink()
return ""
async def save_attachment(trip_id: int, file: UploadFile) -> str:
if file.content_type != "application/pdf":
raise ValueError("Unsupported attachment format")
if file.size > settings.ATTACHMENT_MAX_SIZE:
raise ValueError("File size is above ATTACHMENT_MAX_SIZE")
filename = generate_filename("pdf")
filepath = attachments_trip_folder_path(trip_id) / filename
try:
with open(filepath, "wb") as buf:
while chunk := await file.read(8192):
buf.write(chunk)
return filename
except Exception:
if filepath.exists():
filepath.unlink()
return ""