2025-07-23 18:18:41 +02:00

172 lines
5.0 KiB
Python

import base64
import mimetypes
from datetime import date
from io import BytesIO
from pathlib import Path, PurePosixPath
from urllib.parse import urlsplit
from uuid import uuid4

import httpx
from fastapi import HTTPException
from PIL import Image

from .. import __version__
from ..config import Settings
# Module-level settings instance, created once at import time; the helpers
# in this module read configuration (e.g. ASSETS_FOLDER) from it.
settings = Settings()
def generate_filename(format: str) -> str:
    """Return a random, UUID4-based filename ending with ``.<format>``."""
    unique_stem = uuid4()
    return f"{unique_stem}.{format}"
def assets_folder_path() -> Path:
    """Return the configured assets folder (``settings.ASSETS_FOLDER``) as a ``Path``."""
    configured_folder = settings.ASSETS_FOLDER
    return Path(configured_folder)
def b64e(data: bytes) -> bytes:
    """Return *data* encoded as Base64 bytes."""
    encoded = base64.b64encode(data)
    return encoded
def b64img_decode(data: str) -> bytes:
    """Decode a Base64 image string, with or without a ``data:image/...`` prefix.

    When *data* starts with ``data:image/``, everything up to and including
    the first comma is dropped before decoding; otherwise the whole string is
    decoded as-is.
    """
    if data.startswith("data:image/"):
        # Keep only the part after the first comma of the data URI.
        payload = data.split(",", 1)[1]
    else:
        payload = data
    return base64.b64decode(payload)
def remove_image(path: str):
    """Delete the file *path*, resolved relative to the assets folder.

    A file that does not exist is silently ignored.

    Raises:
        Exception: if the file could not be removed. The args are
            ``("Error deleting image:", <OSError>, path)`` and the original
            ``OSError`` is attached as ``__cause__``.
    """
    target = assets_folder_path() / path
    try:
        # missing_ok=True replaces the exists()-then-unlink() sequence, which
        # could still fail if the file vanished between the two calls.
        target.unlink(missing_ok=True)
    except OSError as exc:
        raise Exception("Error deleting image:", exc, path) from exc
def parse_str_or_date_to_date(cdate: str | date) -> date:
if isinstance(cdate, str):
try:
return date.fromisoformat(cdate)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid date format, use YYYY-MM-DD")
return cdate
def _extension_from_url(link: str) -> str:
    """Return the file extension of the URL path in *link* (no dot), or ""."""
    # urlsplit isolates the path, so query strings and fragments never leak
    # into the extension.
    suffix = PurePosixPath(urlsplit(link).path).suffix.lstrip(".")
    # Accept only plain alphanumeric suffixes so the value is always safe to
    # embed in a filename (no path separators or other punctuation).
    return suffix if suffix.isalnum() else ""


def _extension_from_content_type(content_type: str) -> str:
    """Return an extension (no dot) guessed from a Content-Type value, or ""."""
    mime = content_type.split(";", 1)[0].strip().lower()
    guessed = mimetypes.guess_extension(mime) if mime else None
    return guessed.lstrip(".") if guessed else ""


async def download_file(link: str, raise_on_error: bool = False) -> str:
    """Download *link* into the assets folder and return the saved file's path.

    The request is sent with browser-like headers, follows redirects and uses
    a timeout of 5. The file is stored under a random name whose extension is
    taken from the URL path or, when the URL has none, guessed from the
    response's Content-Type header.

    Args:
        link: URL of the file to download.
        raise_on_error: when true, failures raise instead of returning "".

    Returns:
        The path of the written file as a string, or "" if anything failed
        and *raise_on_error* is false.

    Raises:
        HTTPException: status 400 on any failure when *raise_on_error* is true.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Referer": link,
    }

    try:
        async with httpx.AsyncClient(follow_redirects=True, headers=headers, timeout=5) as client:
            response = await client.get(link)
            response.raise_for_status()

            # Prefer the extension in the URL; fall back to the declared
            # content type for extension-less URLs (e.g. ".../photo-123?w=400").
            extension = _extension_from_url(link) or _extension_from_content_type(
                response.headers.get("content-type", "")
            )
            if not extension:
                raise ValueError("could not determine a file extension")

            path = assets_folder_path() / generate_filename(extension)
            with open(path, "wb") as f:
                f.write(response.content)
            return str(path)
    except Exception as e:
        if raise_on_error:
            raise HTTPException(status_code=400, detail=f"Failed to download file: {e}") from e
        return ""
async def check_update():
    """Query GitHub for the latest release tag of the project.

    Returns:
        The latest release's ``tag_name`` when it differs from the running
        ``__version__``, otherwise ``None``.

    Raises:
        HTTPException: status 503 if the request or response handling fails.
    """
    releases_url = "https://api.github.com/repos/itskovacs/trip/releases/latest"

    try:
        async with httpx.AsyncClient(follow_redirects=True, timeout=5) as http:
            reply = await http.get(releases_url)
            reply.raise_for_status()

            newest_tag = reply.json()["tag_name"]
            return newest_tag if __version__ != newest_tag else None
    except Exception:
        raise HTTPException(status_code=503, detail="Couldn't verify for update")
def patch_image(fp: str, size: int = 400) -> bool:
    """Normalise the image stored at *fp* and overwrite it in place.

    The image is converted to RGB unless it is already RGB or RGBA. When
    *size* is positive it is scaled so that its shorter side equals *size*
    and then centre-cropped to a *size* x *size* square.

    Returns:
        True when the image was saved, False if any step raised.
    """
    try:
        with Image.open(fp) as im:
            if im.mode not in ("RGB", "RGBA"):
                im = im.convert("RGB")

            if size > 0:
                aspect = im.width / im.height
                # Scale the shorter side to `size`; the longer side keeps the ratio.
                if aspect > 1:
                    scaled_dims = (int(size * aspect), size)
                else:
                    scaled_dims = (size, int(size / aspect))
                im = im.resize(scaled_dims, Image.LANCZOS)

                # Centre-crop the scaled image to a square.
                x0 = (im.width - size) // 2
                y0 = (im.height - size) // 2
                im = im.crop((x0, y0, x0 + size, y0 + size))

            im.save(fp)
            return True
    except Exception:
        # Best-effort: any failure is reported to the caller as False.
        return False
def save_image_to_file(content: bytes, size: int = 600) -> str:
try:
with Image.open(BytesIO(content)) as im:
if im.mode not in ("RGB", "RGBA"):
im = im.convert("RGB")
if size > 0: # Crop as square of (size * size)
im_ratio = im.width / im.height
target_ratio = 1 # Square ratio is 1
if im_ratio > target_ratio:
new_height = size
new_width = int(new_height * im_ratio)
else:
new_width = size
new_height = int(new_width / im_ratio)
im = im.resize((new_width, new_height), Image.LANCZOS)
left = (im.width - size) // 2
top = (im.height - size) // 2
right = left + size
bottom = top + size
im = im.crop((left, top, right, bottom))
if content.startswith(b"\x89PNG"):
image_ext = "png"
elif content.startswith(b"\xff\xd8"):
image_ext = "jpeg"
elif content.startswith(b"RIFF") and content[8:12] == b"WEBP":
image_ext = "webp"
else:
raise ValueError("Unsupported image format")
filename = generate_filename(image_ext)
filepath = assets_folder_path() / filename
im.save(filepath)
return filename
except Exception:
...
return ""