243 lines
7.2 KiB
Python
243 lines
7.2 KiB
Python
import base64
from io import BytesIO
from pathlib import Path
from secrets import token_urlsafe
from typing import Any
from uuid import uuid4

import httpx
from fastapi import HTTPException, UploadFile
from PIL import Image

from .. import __version__
from ..config import Settings
|
|
|
|
# Module-wide settings instance, created once at import time. The helpers in
# this module read ASSETS_FOLDER, ATTACHMENTS_FOLDER, BACKUPS_FOLDER and
# ATTACHMENT_MAX_SIZE from it.
settings = Settings()
|
|
|
|
|
|
def generate_urlsafe() -> str:
    """Return a fresh random URL-safe text token generated from 32 random bytes."""
    token = token_urlsafe(32)
    return token
|
|
|
|
|
|
def generate_filename(format: str) -> str:
    """Build a random file name: a new UUID4, a dot, then ``format`` as the extension."""
    return "{}.{}".format(uuid4(), format)
|
|
|
|
|
|
def assets_folder_path() -> Path:
    """Return the configured ``ASSETS_FOLDER`` setting wrapped in a ``Path``."""
    assets_dir = settings.ASSETS_FOLDER
    return Path(assets_dir)
|
|
|
|
|
|
def attachments_folder_path() -> Path:
    """Return the configured ``ATTACHMENTS_FOLDER`` setting wrapped in a ``Path``."""
    attachments_dir = settings.ATTACHMENTS_FOLDER
    return Path(attachments_dir)
|
|
|
|
|
|
def attachments_trip_folder_path(trip_id: int | str) -> Path:
    """Return the attachments sub-folder named after ``trip_id``.

    The folder (and any missing parent) is created if it does not exist yet,
    so calling this function has a filesystem side effect.
    """
    trip_dir = attachments_folder_path().joinpath(str(trip_id))
    trip_dir.mkdir(exist_ok=True, parents=True)
    return trip_dir
|
|
|
|
|
|
def b64img_decode(data: str) -> bytes:
    """Decode a base64 image given either as a bare string or as a ``data:image/...`` URI."""
    if data.startswith("data:image/"):
        # Data URI: the base64 payload is everything after the first comma.
        payload = data.split(",", 1)[1]
    else:
        payload = data
    return base64.b64decode(payload)
|
|
|
|
|
|
def remove_attachment(trip_id: int, filename: str):
    """Best-effort removal of ``filename`` from the attachments folder of ``trip_id``.

    A missing file is a no-op, and any ``OSError`` raised while resolving the
    folder or deleting the file is swallowed on purpose.
    """
    try:
        target = attachments_trip_folder_path(trip_id).joinpath(filename)
        if target.exists():
            target.unlink()
    except OSError:
        # Deliberately ignored: deletion is best-effort.
        return
|
|
|
|
|
|
def remove_backup(filename: str):
    """Best-effort removal of ``filename`` from the configured ``BACKUPS_FOLDER``.

    An empty file name or a missing file is a no-op, and any ``OSError``
    raised while deleting is swallowed on purpose.
    """
    if not filename:
        return

    try:
        target = Path(settings.BACKUPS_FOLDER).joinpath(filename)
        if target.exists():
            target.unlink()
    except OSError:
        # Deliberately ignored: deletion is best-effort.
        return
|
|
|
|
|
|
def remove_image(filename: str):
    """Best-effort removal of ``filename`` from the assets folder.

    A missing file is a no-op, and any ``OSError`` raised while deleting is
    swallowed on purpose.
    """
    try:
        target = assets_folder_path().joinpath(filename)
        if target.exists():
            target.unlink()
    except OSError:
        # Deliberately ignored: deletion is best-effort.
        return
|
|
|
|
|
|
async def httpx_get(link: str) -> Any:
    """GET ``link`` with browser-like headers and return the decoded JSON body.

    Redirects are followed and the request times out after 5 seconds.

    Args:
        link: URL to fetch; it is also sent as the ``Referer`` header.

    Returns:
        Whatever ``response.json()`` yields. The previous ``-> str``
        annotation did not match the value actually returned.

    Raises:
        HTTPException: 400 "Bad Request" on any failure (transport error,
            non-success status, body that is not valid JSON, ...). The
            original error is chained as ``__cause__``.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Referer": link,
    }

    try:
        async with httpx.AsyncClient(follow_redirects=True, headers=headers, timeout=5) as client:
            response = await client.get(link)
            response.raise_for_status()
            return response.json()
    except Exception as exc:
        # Boundary handler: every failure is reported to the API caller as a 400.
        raise HTTPException(status_code=400, detail="Bad Request") from exc
|
|
|
|
|
|
async def download_file(link: str, raise_on_error: bool = False) -> str:
    """Download the image at ``link`` into the assets folder.

    The file extension is derived from the response ``Content-Type``
    (falling back to ``jpg`` when the subtype is empty) and the file gets a
    random UUID-based name.

    Args:
        link: URL to fetch; must start with ``http``. It is also sent as the
            ``Referer`` header.
        raise_on_error: when true, download failures raise instead of
            returning an empty string.

    Returns:
        The path of the saved file as a string, or ``""`` when the download
        failed and ``raise_on_error`` is false.

    Raises:
        HTTPException: 400 when ``link`` does not start with ``http``
            (regardless of ``raise_on_error``), or, when ``raise_on_error``
            is true, when the download fails or the response is not an image.
    """
    if not link.startswith("http"):
        raise HTTPException(status_code=400, detail="Bad Request")

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
        "Accept": "image/*",
        "Accept-Language": "en-US,en;q=0.5",
        "Referer": link,
    }

    try:
        async with httpx.AsyncClient(follow_redirects=True, headers=headers, timeout=5) as client:
            response = await client.get(link)
            response.raise_for_status()

            content_type = response.headers.get("Content-Type", "")
            if not content_type.startswith("image/"):
                raise HTTPException(
                    status_code=400, detail="Bad Request: provided image URL is not an image"
                )

            # Drop MIME parameters ("image/jpeg; charset=binary") so they do
            # not end up inside the file extension.
            media_type = content_type.split(";", 1)[0].strip()
            extension = media_type.split("/")[1] or "jpg"

            path = assets_folder_path() / generate_filename(extension)
            with open(path, "wb") as f:
                f.write(response.content)
            return str(path)
    except HTTPException:
        # Our own "not an image" error: surface it untouched instead of
        # wrapping it into a second HTTPException.
        if raise_on_error:
            raise
        return ""
    except Exception as e:
        if raise_on_error:
            raise HTTPException(status_code=400, detail=f"Failed to download file: {e}") from e
        return ""
|
|
|
|
|
|
async def check_update():
    """Ask the GitHub releases API whether a version other than the running one is published.

    Returns the ``tag_name`` of the latest release when it differs from
    ``__version__``, otherwise ``None``. Any failure (network, HTTP status,
    unexpected payload) is reported as an ``HTTPException`` with status 503.
    """
    release_endpoint = "https://api.github.com/repos/itskovacs/trip/releases/latest"
    try:
        async with httpx.AsyncClient(follow_redirects=True, timeout=5) as client:
            response = await client.get(release_endpoint)
            response.raise_for_status()

        latest_tag = response.json()["tag_name"]
        return latest_tag if __version__ != latest_tag else None
    except Exception:
        raise HTTPException(status_code=503, detail="Couldn't verify for update")
|
|
|
|
|
|
def patch_image(fp: str, size: int = 400) -> bool:
    """Normalise the image stored at ``fp`` in place.

    Images whose mode is neither RGB nor RGBA are converted to RGB. When
    ``size`` is positive the image is scaled so that its shorter side equals
    ``size`` and then centre-cropped to a ``size`` x ``size`` square. The
    result is written back to ``fp``.

    Returns ``True`` on success and ``False`` if anything raised.
    """
    try:
        with Image.open(fp) as source:
            img = source if source.mode in ("RGB", "RGBA") else source.convert("RGB")

            if size > 0:
                aspect = img.width / img.height
                # Scale so the shorter side lands exactly on ``size``.
                if aspect > 1:
                    scaled = (int(size * aspect), size)
                else:
                    scaled = (size, int(size / aspect))
                img = img.resize(scaled, Image.LANCZOS)

                # Centre the square crop window on the scaled image.
                x0 = (img.width - size) // 2
                y0 = (img.height - size) // 2
                img = img.crop((x0, y0, x0 + size, y0 + size))

            img.save(fp)
            return True
    except Exception:
        return False
|
|
|
|
|
|
def save_image_to_file(content: bytes, size: int = 600) -> str:
    """Decode ``content``, optionally square-crop it, and store it in the assets folder.

    Only data starting with a PNG, JPEG or WebP signature is accepted.
    Images whose mode is neither RGB nor RGBA are converted to RGB. When
    ``size`` is positive the image is scaled so that its shorter side equals
    ``size`` and then centre-cropped to a ``size`` x ``size`` square.

    Args:
        content: raw image bytes.
        size: edge length of the square output; a value <= 0 keeps the
            original dimensions.

    Returns:
        The generated file name (not the full path) on success, or ``""`` on
        any failure, including unsupported or undecodable data.
    """
    # Bound before the ``try`` so the cleanup in the ``except`` branch never
    # reads an unassigned name when the failure happens before the file exists.
    filepath = None
    try:
        # Sniff the container from its magic bytes first so unsupported data
        # is rejected before any decoding work is done.
        if content.startswith(b"\x89PNG"):
            image_ext = "png"
        elif content.startswith(b"\xff\xd8"):
            image_ext = "jpeg"
        elif content.startswith(b"RIFF") and content[8:12] == b"WEBP":
            image_ext = "webp"
        else:
            raise ValueError("Unsupported image format")

        with Image.open(BytesIO(content)) as im:
            if im.mode not in ("RGB", "RGBA"):
                im = im.convert("RGB")

            if size > 0:  # Crop as square of (size * size)
                im_ratio = im.width / im.height
                if im_ratio > 1:
                    new_height = size
                    new_width = int(new_height * im_ratio)
                else:
                    new_width = size
                    new_height = int(new_width / im_ratio)
                im = im.resize((new_width, new_height), Image.LANCZOS)

                # Centre the square crop window on the scaled image.
                left = (im.width - size) // 2
                top = (im.height - size) // 2
                im = im.crop((left, top, left + size, top + size))

            filename = generate_filename(image_ext)
            filepath = assets_folder_path() / filename
            im.save(filepath)
            return filename
    except Exception:
        # Remove a partially written file, if one was created at all.
        if filepath is not None and filepath.exists():
            filepath.unlink()
        return ""
|
|
|
|
|
|
async def save_attachment(trip_id: int, file: UploadFile) -> str:
    """Store an uploaded PDF in the attachments folder of ``trip_id``.

    The upload is streamed to disk in 8 KiB chunks under a random UUID-based
    ``.pdf`` name.

    Args:
        trip_id: trip whose attachments folder receives the file.
        file: the upload; its declared content type must be
            ``application/pdf``.

    Returns:
        The generated file name on success, or ``""`` if reading or writing
        failed (a partially written file is removed).

    Raises:
        ValueError: if the content type is not PDF, or if the file is larger
            than ``settings.ATTACHMENT_MAX_SIZE``.
    """
    if file.content_type != "application/pdf":
        raise ValueError("Unsupported attachment format")

    max_size = settings.ATTACHMENT_MAX_SIZE
    # ``file.size`` can be None when the size is unknown; comparing None with
    # an int would raise TypeError, so the early check only runs when a size
    # is reported. The streaming loop below enforces the limit in every case.
    if file.size is not None and file.size > max_size:
        raise ValueError("File size is above ATTACHMENT_MAX_SIZE")

    filename = generate_filename("pdf")
    filepath = attachments_trip_folder_path(trip_id) / filename
    oversized = False
    try:
        written = 0
        with open(filepath, "wb") as buf:
            while chunk := await file.read(8192):
                written += len(chunk)
                if written > max_size:
                    oversized = True
                    break
                buf.write(chunk)
    except Exception:
        if filepath.exists():
            filepath.unlink()
        return ""

    if oversized:
        # Do not keep the truncated file; report the same error as the early check.
        filepath.unlink(missing_ok=True)
        raise ValueError("File size is above ATTACHMENT_MAX_SIZE")

    return filename
|