Files
voyage/backend/server/adventures/views/location_image_view.py
alex df8d1adf15 Rename AdventureLog to Voyage and add fork attribution
- Replace all AdventureLog references with Voyage across ~102 files
  (7 case variants: AdventureLog, adventurelog, Adventurelog, ADVENTURELOG,
  AdventUrelog, AdventureLOG, adventure-log, adventure_log)
- Rename brand, static, and documentation assets to use voyage naming
- Rename install_adventurelog.sh → install_voyage.sh
- Update README.md and voyage_overview.md to credit AdventureLog as
  the upstream project and Sean Morley as its original creator
2026-03-06 11:05:26 +00:00

471 lines
19 KiB
Python

from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from rest_framework.throttling import UserRateThrottle
from django.http import HttpResponse
import ipaddress
import socket
from urllib.parse import urlparse
from django.db.models import Q
from django.core.files.base import ContentFile
from django.contrib.contenttypes.models import ContentType
from adventures.models import Location, Transportation, Note, Lodging, Visit, ContentImage
from adventures.serializers import ContentImageSerializer
from integrations.models import ImmichIntegration
from adventures.permissions import IsOwnerOrSharedWithFullAccess # Your existing permission class
import requests
from adventures.permissions import ContentImagePermission
import logging
logger = logging.getLogger(__name__)
class ImageProxyThrottle(UserRateThrottle):
scope = 'image_proxy'
def _is_safe_url(image_url):
"""
Validate a URL for safe proxy use.
Returns (True, parsed) on success or (False, error_message) on failure.
Checks:
- Scheme is http or https
- No non-standard ports (only 80 and 443 allowed)
- All resolved IPs are public (no private/loopback/reserved/link-local/multicast)
"""
parsed = urlparse(image_url)
if parsed.scheme not in ('http', 'https'):
return False, "Invalid URL scheme. Only http and https are allowed."
port = parsed.port
if port is not None and port not in (80, 443):
return False, "Non-standard ports are not allowed."
hostname = parsed.hostname
if not hostname:
return False, "Invalid URL: missing hostname."
try:
addr_infos = socket.getaddrinfo(hostname, None)
except socket.gaierror:
return False, "Could not resolve hostname."
if not addr_infos:
return False, "Could not resolve hostname."
for addr_info in addr_infos:
try:
ip = ipaddress.ip_address(addr_info[4][0])
except ValueError:
return False, "Invalid IP address resolved from hostname."
if (ip.is_private or ip.is_loopback or ip.is_reserved
or ip.is_link_local or ip.is_multicast):
return False, "Access to internal networks is not allowed."
return True, parsed
class ContentImageViewSet(viewsets.ModelViewSet):
serializer_class = ContentImageSerializer
permission_classes = [ContentImagePermission]
def get_queryset(self):
"""Get all images the user has access to"""
if not self.request.user.is_authenticated:
return ContentImage.objects.none()
# Import here to avoid circular imports
from adventures.models import Location, Transportation, Note, Lodging, Visit
# Build a single query with all conditions
return ContentImage.objects.filter(
# User owns the image directly (if user field exists on ContentImage)
Q(user=self.request.user) |
# Or user has access to the content object
(
# Locations owned by user
Q(content_type=ContentType.objects.get_for_model(Location)) &
Q(object_id__in=Location.objects.filter(user=self.request.user).values_list('id', flat=True))
) |
(
# Shared locations
Q(content_type=ContentType.objects.get_for_model(Location)) &
Q(object_id__in=Location.objects.filter(collections__shared_with=self.request.user).values_list('id', flat=True))
) |
(
# Collections owned by user containing locations
Q(content_type=ContentType.objects.get_for_model(Location)) &
Q(object_id__in=Location.objects.filter(collections__user=self.request.user).values_list('id', flat=True))
) |
(
# Transportation owned by user
Q(content_type=ContentType.objects.get_for_model(Transportation)) &
Q(object_id__in=Transportation.objects.filter(user=self.request.user).values_list('id', flat=True))
) |
(
# Notes owned by user
Q(content_type=ContentType.objects.get_for_model(Note)) &
Q(object_id__in=Note.objects.filter(user=self.request.user).values_list('id', flat=True))
) |
(
# Lodging owned by user
Q(content_type=ContentType.objects.get_for_model(Lodging)) &
Q(object_id__in=Lodging.objects.filter(user=self.request.user).values_list('id', flat=True))
) |
(
# Notes shared with user
Q(content_type=ContentType.objects.get_for_model(Note)) &
Q(object_id__in=Note.objects.filter(collection__shared_with=self.request.user).values_list('id', flat=True))
) |
(
# Lodging shared with user
Q(content_type=ContentType.objects.get_for_model(Lodging)) &
Q(object_id__in=Lodging.objects.filter(collection__shared_with=self.request.user).values_list('id', flat=True))
) |
(
# Transportation shared with user
Q(content_type=ContentType.objects.get_for_model(Transportation)) &
Q(object_id__in=Transportation.objects.filter(collection__shared_with=self.request.user).values_list('id', flat=True))
) |
(
# Visits - access through location's user
Q(content_type=ContentType.objects.get_for_model(Visit)) &
Q(object_id__in=Visit.objects.filter(location__user=self.request.user).values_list('id', flat=True))
) |
(
# Visits - access through shared locations
Q(content_type=ContentType.objects.get_for_model(Visit)) &
Q(object_id__in=Visit.objects.filter(location__collections__shared_with=self.request.user).values_list('id', flat=True))
) |
(
# Visits - access through collections owned by user
Q(content_type=ContentType.objects.get_for_model(Visit)) &
Q(object_id__in=Visit.objects.filter(location__collections__user=self.request.user).values_list('id', flat=True))
)
).distinct()
@action(detail=True, methods=['post'])
def image_delete(self, request, *args, **kwargs):
return self.destroy(request, *args, **kwargs)
@action(detail=True, methods=['post'])
def toggle_primary(self, request, *args, **kwargs):
instance = self.get_object()
# Check if the image is already the primary image
if instance.is_primary:
return Response(
{"error": "Image is already the primary image"},
status=status.HTTP_400_BAD_REQUEST
)
# Set other images of the same content object to not primary
ContentImage.objects.filter(
content_type=instance.content_type,
object_id=instance.object_id,
is_primary=True
).update(is_primary=False)
# Set the new image to primary
instance.is_primary = True
instance.save()
return Response({"success": "Image set as primary image"})
@action(detail=False, methods=['post'],
permission_classes=[IsAuthenticated],
throttle_classes=[ImageProxyThrottle])
def fetch_from_url(self, request):
"""
Authenticated proxy endpoint to fetch images from external URLs.
Avoids CORS issues when the frontend downloads images from third-party
servers (e.g. wikimedia.org). Requires a logged-in user and is
rate-limited to 60 requests/minute.
"""
image_url = request.data.get('url')
if not image_url:
return Response(
{"error": "URL is required"},
status=status.HTTP_400_BAD_REQUEST
)
# Validate the initial URL (scheme, port, SSRF check on all resolved IPs)
safe, result = _is_safe_url(image_url)
if not safe:
return Response({"error": result}, status=status.HTTP_400_BAD_REQUEST)
try:
headers = {'User-Agent': 'Voyage/1.0 (Image Proxy)'}
max_redirects = 3
current_url = image_url
for _ in range(max_redirects + 1):
response = requests.get(
current_url,
timeout=10,
headers=headers,
stream=True,
allow_redirects=False,
)
if not response.is_redirect:
break
# Re-validate every redirect destination before following
redirect_url = response.headers.get('Location', '')
if not redirect_url:
return Response(
{"error": "Redirect with missing Location header"},
status=status.HTTP_502_BAD_GATEWAY,
)
safe, result = _is_safe_url(redirect_url)
if not safe:
return Response(
{"error": f"Redirect blocked: {result}"},
status=status.HTTP_400_BAD_REQUEST,
)
current_url = redirect_url
else:
return Response(
{"error": "Too many redirects"},
status=status.HTTP_400_BAD_REQUEST,
)
response.raise_for_status()
content_type = response.headers.get('Content-Type', '')
if not content_type.startswith('image/'):
return Response(
{"error": "URL does not point to an image"},
status=status.HTTP_400_BAD_REQUEST
)
content_length = response.headers.get('Content-Length')
if content_length and int(content_length) > 20 * 1024 * 1024:
return Response(
{"error": "Image too large (max 20MB)"},
status=status.HTTP_400_BAD_REQUEST
)
image_data = response.content
return HttpResponse(image_data, content_type=content_type, status=200)
except requests.exceptions.Timeout:
logger.error("Timeout fetching image from URL %s", image_url)
return Response(
{"error": "Download timeout - image may be too large or server too slow"},
status=status.HTTP_504_GATEWAY_TIMEOUT
)
except requests.exceptions.RequestException as e:
logger.error("Failed to fetch image from URL %s: %s", image_url, str(e))
return Response(
{"error": "Failed to fetch image from the remote server"},
status=status.HTTP_502_BAD_GATEWAY
)
def create(self, request, *args, **kwargs):
# Get content type and object ID from request
content_type_name = request.data.get('content_type')
object_id = request.data.get('object_id')
if not content_type_name or not object_id:
return Response({
"error": "content_type and object_id are required"
}, status=status.HTTP_400_BAD_REQUEST)
# Get the content object and validate permissions
content_object = self._get_and_validate_content_object(content_type_name, object_id)
if isinstance(content_object, Response): # Error response
return content_object
content_type = ContentType.objects.get_for_model(content_object.__class__)
# Handle Immich ID for shared users by downloading the image
if (hasattr(content_object, 'user') and
request.user != content_object.user and
'immich_id' in request.data and
request.data.get('immich_id')):
return self._handle_immich_image_creation(request, content_object, content_type, object_id)
# Standard image creation
return self._create_standard_image(request, content_object, content_type, object_id)
def _get_and_validate_content_object(self, content_type_name, object_id):
"""Get and validate the content object exists and user has access"""
# Map content type names to model classes
content_type_map = {
'location': Location,
'transportation': Transportation,
'note': Note,
'lodging': Lodging,
'visit': Visit,
}
if content_type_name not in content_type_map:
return Response({
"error": f"Invalid content_type. Must be one of: {', '.join(content_type_map.keys())}"
}, status=status.HTTP_400_BAD_REQUEST)
# Validate object_id format (must be a valid UUID, not "undefined" or empty)
if not object_id or object_id == 'undefined':
return Response({
"error": "object_id is required and must be a valid UUID"
}, status=status.HTTP_400_BAD_REQUEST)
import uuid as uuid_module
try:
uuid_module.UUID(str(object_id))
except (ValueError, AttributeError):
return Response({
"error": f"Invalid object_id format: {object_id}"
}, status=status.HTTP_400_BAD_REQUEST)
# Get the content object
try:
content_object = content_type_map[content_type_name].objects.get(id=object_id)
except (ValueError, content_type_map[content_type_name].DoesNotExist):
return Response({
"error": f"{content_type_name} not found"
}, status=status.HTTP_404_NOT_FOUND)
# Check permissions using the permission class
permission_checker = IsOwnerOrSharedWithFullAccess()
if not permission_checker.has_object_permission(self.request, self, content_object):
return Response({
"error": "User does not have permission to access this content"
}, status=status.HTTP_403_FORBIDDEN)
return content_object
def _handle_immich_image_creation(self, request, content_object, content_type, object_id):
"""Handle creation of image from Immich for shared users"""
immich_id = request.data.get('immich_id')
# Get the shared user's Immich integration
try:
user_integration = ImmichIntegration.objects.get(user=request.user)
except ImmichIntegration.DoesNotExist:
return Response({
"error": "No Immich integration found for your account. Please set up Immich integration first.",
"code": "immich_integration_not_found"
}, status=status.HTTP_400_BAD_REQUEST)
# Download the image from the shared user's Immich server
try:
immich_response = requests.get(
f'{user_integration.server_url}/assets/{immich_id}/thumbnail?size=preview',
headers={'x-api-key': user_integration.api_key},
timeout=10
)
immich_response.raise_for_status()
# Create a temporary file with the downloaded content
content_type_header = immich_response.headers.get('Content-Type', 'image/jpeg')
if not content_type_header.startswith('image/'):
return Response({
"error": "Invalid content type returned from Immich server.",
"code": "invalid_content_type"
}, status=status.HTTP_400_BAD_REQUEST)
# Determine file extension from content type
ext_map = {
'image/jpeg': '.jpg',
'image/png': '.png',
'image/webp': '.webp',
'image/gif': '.gif'
}
file_ext = ext_map.get(content_type_header, '.jpg')
filename = f"immich_{immich_id}{file_ext}"
# Create a Django ContentFile from the downloaded image
image_file = ContentFile(immich_response.content, name=filename)
# Modify request data to use the downloaded image instead of immich_id
request_data = request.data.copy()
request_data.pop('immich_id', None) # Remove immich_id
request_data['image'] = image_file # Add the image file
request_data['content_type'] = content_type.id
request_data['object_id'] = object_id
# Create the serializer with the modified data
serializer = self.get_serializer(data=request_data)
serializer.is_valid(raise_exception=True)
# Save with the downloaded image
serializer.save(
user=content_object.user if hasattr(content_object, 'user') else request.user,
image=image_file,
content_type=content_type,
object_id=object_id
)
return Response(serializer.data, status=status.HTTP_201_CREATED)
except requests.exceptions.RequestException:
return Response({
"error": f"Failed to fetch image from Immich server",
"code": "immich_fetch_failed"
}, status=status.HTTP_502_BAD_GATEWAY)
except Exception:
return Response({
"error": f"Unexpected error processing Immich image",
"code": "immich_processing_error"
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def _create_standard_image(self, request, content_object, content_type, object_id):
"""Handle standard image creation without deepcopy issues"""
# Get uploaded image file safely
image_file = request.FILES.get('image')
immich_id = request.data.get('immich_id')
if not image_file and not immich_id:
return Response({"error": "No image uploaded"}, status=status.HTTP_400_BAD_REQUEST)
# Build a clean dict for serializer input
request_data = {
'content_type': content_type.id,
'object_id': object_id,
}
# Add immich_id if provided
if immich_id:
request_data['immich_id'] = immich_id
# Optionally add other fields (e.g., caption, alt text) from request.data
for key in ['caption', 'alt_text', 'description']: # update as needed
if key in request.data:
request_data[key] = request.data[key]
# Create and validate serializer
serializer = self.get_serializer(data=request_data)
serializer.is_valid(raise_exception=True)
# Prepare save parameters
save_kwargs = {
'user': getattr(content_object, 'user', request.user),
'content_type': content_type,
'object_id': object_id,
}
# Add image file if provided
if image_file:
save_kwargs['image'] = image_file
# Save with appropriate parameters
serializer.save(**save_kwargs)
return Response(serializer.data, status=status.HTTP_201_CREATED)
def perform_create(self, serializer):
# The content_type and object_id are already set in the create method
# Just ensure the user is set correctly
pass