"""Password hashing, JWT creation and resource-ownership helpers."""

from datetime import UTC, datetime, timedelta

import jwt
from argon2 import PasswordHasher
from argon2 import exceptions as argon_exceptions
from fastapi import HTTPException

from .config import settings
from .models.models import Token

# Module-wide hasher shared by hash_password() and verify_password().
ph = PasswordHasher()


def hash_password(password: str) -> str:
    """Return the Argon2 hash of *password* produced by the shared hasher."""
    return ph.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check *plain_password* against *hashed_password*.

    Returns:
        Whatever ``ph.verify`` returns when verification succeeds.

    Raises:
        HTTPException: 401 "Invalid credentials" when the hasher reports a
            mismatch, a verification failure, or an invalid stored hash.
            Callers never see a falsy return for a wrong password; the
            failure is always signalled by the exception.
    """
    try:
        return ph.verify(hashed_password, plain_password)
    except (
        argon_exceptions.VerifyMismatchError,
        argon_exceptions.VerificationError,
        argon_exceptions.InvalidHashError,
    ) as exc:
        # Keep the argon2 error as the cause for debugging; the HTTP response
        # itself only carries the generic 401 detail.
        raise HTTPException(status_code=401, detail="Invalid credentials") from exc


def _encode_with_expiry(data: dict, lifetime_minutes: int) -> str:
    """Encode a copy of *data* as a JWT expiring *lifetime_minutes* from now."""
    claims = data.copy()  # never mutate the caller's dict
    # Pass the aware datetime itself: the JWT library accepts a datetime for
    # "exp", so both token kinds end up with the same claim format.
    claims["exp"] = datetime.now(UTC) + timedelta(minutes=lifetime_minutes)
    return jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)


def create_access_token(data: dict) -> str:
    """Return a signed JWT for *data* valid for ACCESS_TOKEN_EXPIRE_MINUTES."""
    return _encode_with_expiry(data, settings.ACCESS_TOKEN_EXPIRE_MINUTES)


def create_refresh_token(data: dict) -> str:
    """Return a signed JWT for *data* valid for REFRESH_TOKEN_EXPIRE_MINUTES.

    The "exp" claim is built exactly as in ``create_access_token`` (previously
    this function passed a float timestamp while the access token passed a
    datetime).
    """
    return _encode_with_expiry(data, settings.REFRESH_TOKEN_EXPIRE_MINUTES)


def create_tokens(data: dict) -> Token:
    """Build a ``Token`` holding a fresh access and refresh token for *data*."""
    return Token(
        access_token=create_access_token(data),
        refresh_token=create_refresh_token(data),
    )


def verify_exists_and_owns(username: str, obj) -> None:
    """Ensure *obj* exists and belongs to *username*.

    Args:
        username: Name compared against ``obj.user``.
        obj: The looked-up resource; any falsy value is treated as missing.

    Raises:
        HTTPException: 404 when *obj* is falsy.
        PermissionError: when ``obj.user`` differs from *username*.
    """
    if not obj:
        raise HTTPException(status_code=404, detail="The resource does not exist")
    if obj.user != username:
        # NOTE(review): this is the builtin PermissionError, not an
        # HTTPException; presumably an exception handler elsewhere maps it to
        # a 403 response. Confirm against the app setup.
        raise PermissionError