2025-11-09 19:18:13 +01:00

102 lines
3.4 KiB
Python

from typing import Any
import httpx
from fastapi import HTTPException
gmaps_types_mapper: dict[str, list] = {
"Nature & Outdoor": ["natural_feature", "landmark"],
"Entertainment & Leisure": ["amusement", "aquarium"],
"Culture": ["museum", "historical", "art_", "church"],
"Food & Drink": ["food", "bar", "bakery", "coffee_shop", "restaurant"],
"Adventure & Sports": ["adventure_sports_center"],
"Wellness": ["wellness"],
"Accommodation": ["hotel", "camping"],
}
def compute_avg_price(price_range: dict | None) -> float | None:
if not price_range:
return None
start = price_range.get("startPrice", {}).get("units")
end = price_range.get("endPrice", {}).get("units")
if start and end:
return (int(start) + int(end)) / 2
elif start:
return int(start)
elif end:
return int(end)
else:
return None
def compute_description(place: dict[str, Any]):
description = ""
address = place.get("formattedAddress")
phone = place.get("internationalPhoneNumber")
website = place.get("websiteUri")
if address:
description += f"{address}\n"
if phone:
description += f"Phone: {phone}\n"
if website:
description += f"Website: {website}"
return description.rstrip()
async def gmaps_textsearch(search: str, api_key: str) -> list[dict[str, Any]]:
url = "https://places.googleapis.com/v1/places:searchText"
body = {"textQuery": search}
headers = {
"Content-Type": "application/json",
"X-Goog-Api-Key": api_key,
"X-Goog-FieldMask": "places.id,places.types,places.location,places.priceRange,places.formattedAddress,places.websiteUri,places.internationalPhoneNumber,places.displayName,places.allowsDogs,places.photos",
}
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(url, json=body, headers=headers)
response.raise_for_status()
data = response.json()
return data.get("places", [])
except Exception:
raise HTTPException(status_code=400, detail="Bad Request")
async def gmaps_photo(name: str, api_key: str) -> str | None:
url = f"https://places.googleapis.com/v1/{name}/media"
params = {"key": api_key, "maxWidthPx": 1000}
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(url, params=params, follow_redirects=True)
response.raise_for_status()
return str(response.url) if response.url else None
except Exception:
raise HTTPException(status_code=400, detail="Bad Request")
async def gmaps_get_boundaries(name: str, api_key: str) -> dict[str, Any] | None:
url = "https://maps.googleapis.com/maps/api/geocode/json"
params = {"address": name, "key": api_key}
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(url, params=params)
response.raise_for_status()
data = response.json()
if data.get("status") != "OK" or not data.get("results"):
return None
result = data["results"][0]
geometry = result.get("geometry", {})
bounds = geometry.get("bounds")
if not bounds:
bounds = geometry.get("viewport")
return bounds
except Exception:
raise HTTPException(status_code=400, detail="Bad Request")