import requests import time import socket import re import unicodedata from worldtravel.models import Region, City, VisitedRegion, VisitedCity from django.conf import settings # ----------------- # SEARCHING def search_google(query, lang="en"): try: api_key = settings.GOOGLE_MAPS_API_KEY if not api_key: return { "error": "Geocoding service unavailable. Please check configuration." } # Updated to use the new Places API (New) endpoint url = "https://places.googleapis.com/v1/places:searchText" headers = { "Content-Type": "application/json", "X-Goog-Api-Key": api_key, "X-Goog-FieldMask": "places.displayName.text,places.formattedAddress,places.location,places.types,places.rating,places.userRatingCount", } payload = { "textQuery": query, "maxResultCount": 20, # Adjust as needed "languageCode": lang, } response = requests.post(url, json=payload, headers=headers, timeout=(2, 5)) response.raise_for_status() data = response.json() # Check if we have places in the response places = data.get("places", []) if not places: return {"error": "No locations found for the given query."} results = [] for place in places: location = place.get("location", {}) types = place.get("types", []) primary_type = types[0] if types else None category = _extract_google_category(types) addresstype = _infer_addresstype(primary_type) importance = None rating = place.get("rating") ratings_total = place.get("userRatingCount") if rating is not None and ratings_total: importance = round(float(rating) * ratings_total / 100, 2) # Extract display name from the new API structure display_name_obj = place.get("displayName", {}) name = display_name_obj.get("text") if display_name_obj else None results.append( { "lat": location.get("latitude"), "lon": location.get("longitude"), "name": name, "display_name": place.get("formattedAddress"), "type": primary_type, "category": category, "importance": importance, "addresstype": addresstype, "powered_by": "google", } ) if results: results.sort( key=lambda r: r["importance"] if r["importance"] is not None else 0, reverse=True, ) return results except requests.exceptions.Timeout: return { "error": "Request timed out while contacting Google Maps. Please try again." } except requests.exceptions.ConnectionError: return { "error": "Unable to connect to Google Maps service. Please check your internet connection." } except requests.exceptions.HTTPError as e: if response.status_code == 400: return {"error": "Invalid request to Google Maps. Please check your query."} elif response.status_code == 401: return { "error": "Authentication failed with Google Maps. Please check API configuration." } elif response.status_code == 403: return { "error": "Access forbidden to Google Maps. Please check API permissions." } elif response.status_code == 429: return { "error": "Too many requests to Google Maps. Please try again later." } else: return {"error": "Google Maps service error. Please try again later."} except requests.exceptions.RequestException: return { "error": "Network error while contacting Google Maps. Please try again." } except Exception: return { "error": "An unexpected error occurred during Google search. Please try again." } def _extract_google_category(types): # Basic category inference based on common place types if not types: return None if "restaurant" in types: return "food" if "lodging" in types: return "accommodation" if "park" in types or "natural_feature" in types: return "nature" if "museum" in types or "tourist_attraction" in types: return "attraction" if "locality" in types or "administrative_area_level_1" in types: return "region" return types[0] # fallback to first type def _infer_addresstype(type_): # Rough mapping of Google place types to OSM-style addresstypes mapping = { "locality": "city", "sublocality": "neighborhood", "administrative_area_level_1": "region", "administrative_area_level_2": "county", "country": "country", "premise": "building", "point_of_interest": "poi", "route": "road", "street_address": "address", } return mapping.get(type_, None) def search_osm(query, lang="en"): try: url = f"https://nominatim.openstreetmap.org/search?q={query}&format=jsonv2" headers = {"User-Agent": "Voyage Server", "Accept-Language": lang} response = requests.get(url, headers=headers, timeout=(2, 5)) response.raise_for_status() data = response.json() return [ { "lat": item.get("lat"), "lon": item.get("lon"), "name": item.get("name"), "display_name": item.get("display_name"), "type": item.get("type"), "category": item.get("category"), "importance": item.get("importance"), "addresstype": item.get("addresstype"), "powered_by": "nominatim", } for item in data ] except requests.exceptions.Timeout: return { "error": "Request timed out while contacting OpenStreetMap. Please try again." } except requests.exceptions.ConnectionError: return { "error": "Unable to connect to OpenStreetMap service. Please check your internet connection." } except requests.exceptions.HTTPError as e: if response.status_code == 400: return { "error": "Invalid request to OpenStreetMap. Please check your query." } elif response.status_code == 429: return { "error": "Too many requests to OpenStreetMap. Please try again later." } else: return {"error": "OpenStreetMap service error. Please try again later."} except requests.exceptions.RequestException: return { "error": "Network error while contacting OpenStreetMap. Please try again." } except Exception: return { "error": "An unexpected error occurred during OpenStreetMap search. Please try again." } def search(query): """ Unified search function that tries Google Maps first, then falls back to OpenStreetMap. """ if getattr(settings, "GOOGLE_MAPS_API_KEY", None): google_result = search_google(query) if "error" not in google_result: return google_result # If Google fails, fallback to OSM return search_osm(query) # ----------------- # REVERSE GEOCODING # ----------------- def extractIsoCode(user, data): """ Extract the ISO code from the response data. Returns a dictionary containing the region name, country name, and ISO code if found. """ iso_code = None display_name = None country_code = None city = None visited_city = None location_name = None if "name" in data.keys(): location_name = data["name"] address = data.get("address", {}) or {} # Capture country code early for ISO selection and name fallback. country_code = address.get("ISO3166-1") state_name = address.get("state") # Prefer the most specific ISO 3166-2 code available before falling back to country-level. # France gets lvl4 (regions) first for city matching, then lvl6 (departments) as a fallback. preferred_iso_keys = ( [ "ISO3166-2-lvl10", "ISO3166-2-lvl9", "ISO3166-2-lvl8", "ISO3166-2-lvl4", "ISO3166-2-lvl6", "ISO3166-2-lvl7", "ISO3166-2-lvl5", "ISO3166-2-lvl3", "ISO3166-2-lvl2", "ISO3166-2-lvl1", "ISO3166-2", ] if country_code == "FR" else [ "ISO3166-2-lvl10", "ISO3166-2-lvl9", "ISO3166-2-lvl8", "ISO3166-2-lvl4", "ISO3166-2-lvl7", "ISO3166-2-lvl6", "ISO3166-2-lvl5", "ISO3166-2-lvl3", "ISO3166-2-lvl2", "ISO3166-2-lvl1", "ISO3166-2", ] ) iso_candidates = [] for key in preferred_iso_keys: value = address.get(key) if value and value not in iso_candidates: iso_candidates.append(value) # If no region-level code, fall back to country code only as a last resort. if not iso_candidates and "ISO3166-1" in address: iso_candidates.append(address.get("ISO3166-1")) iso_code = iso_candidates[0] if iso_candidates else None region_candidates = [] for candidate in iso_candidates: if len(str(candidate)) <= 2: continue match = Region.objects.filter(id=candidate).first() if match and match not in region_candidates: region_candidates.append(match) region = region_candidates[0] if region_candidates else None # Fallback: attempt to resolve region by name and country code when no ISO match. if not region and state_name: region_queryset = Region.objects.filter(name__iexact=state_name) if country_code: region_queryset = region_queryset.filter(country__country_code=country_code) region = region_queryset.first() if region: iso_code = region.id if not country_code: country_code = region.country.country_code if region not in region_candidates: region_candidates.insert(0, region) if not region: return {"error": "No region found"} if not country_code: country_code = region.country.country_code region_visited = False city_visited = False # ordered preference for best-effort locality matching locality_keys = [ "suburb", "neighbourhood", "neighborhood", # alternate spelling "city", "city_district", "town", "village", "hamlet", "locality", "municipality", "county", ] def _normalize_name(value): normalized = unicodedata.normalize("NFKD", value) ascii_only = normalized.encode("ascii", "ignore").decode("ascii") return re.sub(r"[^a-z0-9]", "", ascii_only.lower()) def match_locality(key_name, target_region): value = address.get(key_name) if not value: return None qs = City.objects.filter(region=target_region) # Use exact matches first to avoid broad county/name collisions (e.g. Troms vs Tromsø). exact_match = qs.filter(name__iexact=value).first() if exact_match: return exact_match normalized_value = _normalize_name(value) for candidate in qs.values_list("id", "name"): candidate_id, candidate_name = candidate if _normalize_name(candidate_name) == normalized_value: return qs.filter(id=candidate_id).first() # Allow partial matching for most locality fields but keep county strict. if key_name == "county": return None return qs.filter(name__icontains=value).first() chosen_region = region for candidate_region in region_candidates or [region]: for key_name in locality_keys: city = match_locality(key_name, candidate_region) if city: chosen_region = candidate_region iso_code = chosen_region.id break if city: break region = chosen_region iso_code = region.id visited_region = VisitedRegion.objects.filter(region=region, user=user).first() region_visited = bool(visited_region) if city: display_name = ( f"{city.name}, {region.name}, {country_code or region.country.country_code}" ) visited_city = VisitedCity.objects.filter(city=city, user=user).first() city_visited = bool(visited_city) else: display_name = f"{region.name}, {country_code or region.country.country_code}" return { "region_id": iso_code, "region": region.name, "country": region.country.name, "country_id": region.country.country_code, "region_visited": region_visited, "display_name": display_name, "city": city.name if city else None, "city_id": city.id if city else None, "city_visited": city_visited, "location_name": location_name, } def is_host_resolvable(hostname: str) -> bool: try: socket.gethostbyname(hostname) return True except socket.error: return False def reverse_geocode(lat, lon, user): if getattr(settings, "GOOGLE_MAPS_API_KEY", None): google_result = reverse_geocode_google(lat, lon, user) if "error" not in google_result: return google_result # If Google fails, fallback to OSM return reverse_geocode_osm(lat, lon, user) return reverse_geocode_osm(lat, lon, user) def reverse_geocode_osm(lat, lon, user): url = ( f"https://nominatim.openstreetmap.org/reverse?format=jsonv2&lat={lat}&lon={lon}" ) headers = {"User-Agent": "Voyage Server"} connect_timeout = 1 read_timeout = 5 if not is_host_resolvable("nominatim.openstreetmap.org"): return { "error": "Unable to resolve OpenStreetMap service. Please check your internet connection." } try: response = requests.get( url, headers=headers, timeout=(connect_timeout, read_timeout) ) response.raise_for_status() data = response.json() return extractIsoCode(user, data) except requests.exceptions.Timeout: return { "error": "Request timed out while contacting OpenStreetMap. Please try again." } except requests.exceptions.ConnectionError: return { "error": "Unable to connect to OpenStreetMap service. Please check your internet connection." } except requests.exceptions.HTTPError as e: if response.status_code == 400: return { "error": "Invalid request to OpenStreetMap. Please check coordinates." } elif response.status_code == 429: return { "error": "Too many requests to OpenStreetMap. Please try again later." } else: return {"error": "OpenStreetMap service error. Please try again later."} except requests.exceptions.RequestException: return { "error": "Network error while contacting OpenStreetMap. Please try again." } except Exception: return { "error": "An unexpected error occurred during OpenStreetMap geocoding. Please try again." } def reverse_geocode_google(lat, lon, user): api_key = settings.GOOGLE_MAPS_API_KEY # Updated to use the new Geocoding API endpoint (this one is still supported) # The Geocoding API is separate from Places API and still uses the old format url = "https://maps.googleapis.com/maps/api/geocode/json" params = {"latlng": f"{lat},{lon}", "key": api_key} try: response = requests.get(url, params=params, timeout=(2, 5)) response.raise_for_status() data = response.json() status = data.get("status") if status != "OK": if status == "ZERO_RESULTS": return {"error": "No location found for the given coordinates."} elif status == "OVER_QUERY_LIMIT": return { "error": "Query limit exceeded for Google Maps. Please try again later." } elif status == "REQUEST_DENIED": return { "error": "Request denied by Google Maps. Please check API configuration." } elif status == "INVALID_REQUEST": return { "error": "Invalid request to Google Maps. Please check coordinates." } else: return {"error": "Geocoding failed. Please try again."} # Convert Google schema to Nominatim-style for extractIsoCode first_result = data.get("results", [])[0] result_data = { "name": first_result.get("formatted_address"), "address": _parse_google_address_components( first_result.get("address_components", []) ), } return extractIsoCode(user, result_data) except requests.exceptions.Timeout: return { "error": "Request timed out while contacting Google Maps. Please try again." } except requests.exceptions.ConnectionError: return { "error": "Unable to connect to Google Maps service. Please check your internet connection." } except requests.exceptions.HTTPError as e: if response.status_code == 400: return { "error": "Invalid request to Google Maps. Please check coordinates." } elif response.status_code == 401: return { "error": "Authentication failed with Google Maps. Please check API configuration." } elif response.status_code == 403: return { "error": "Access forbidden to Google Maps. Please check API permissions." } elif response.status_code == 429: return { "error": "Too many requests to Google Maps. Please try again later." } else: return {"error": "Google Maps service error. Please try again later."} except requests.exceptions.RequestException: return { "error": "Network error while contacting Google Maps. Please try again." } except Exception: return { "error": "An unexpected error occurred during Google geocoding. Please try again." } def _parse_google_address_components(components): parsed = {} country_code = None state_code = None for comp in components: types = comp.get("types", []) long_name = comp.get("long_name") short_name = comp.get("short_name") if "country" in types: parsed["country"] = long_name country_code = short_name parsed["ISO3166-1"] = short_name if "administrative_area_level_1" in types: parsed["state"] = long_name state_code = short_name if "administrative_area_level_2" in types: parsed["county"] = long_name if "administrative_area_level_3" in types: parsed["municipality"] = long_name if "locality" in types: parsed["city"] = long_name if "postal_town" in types: parsed.setdefault("city", long_name) if "sublocality" in types or any( t.startswith("sublocality_level_") for t in types ): parsed["suburb"] = long_name if "neighborhood" in types: parsed["neighbourhood"] = long_name if "route" in types: parsed["road"] = long_name if "street_address" in types: parsed["address"] = long_name # Build composite ISO 3166-2 code like US-ME (matches Region.id in DB) if country_code and state_code: parsed["ISO3166-2-lvl1"] = f"{country_code}-{state_code}" return parsed