diff --git a/Dockerfile.base b/Dockerfile.base index 60999b1..ade3d69 100644 --- a/Dockerfile.base +++ b/Dockerfile.base @@ -20,5 +20,9 @@ RUN pip install --no-cache-dir -r requirements.base.txt # Set working directory WORKDIR /app +# Create images directory for bind mount +RUN mkdir -p /app/images && \ + chmod 755 /app/images + # This base image will be reused for all worker builds CMD ["python3", "-m", "fastapi", "run", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file diff --git a/core/communication/websocket.py b/core/communication/websocket.py index da3b7ee..9def134 100644 --- a/core/communication/websocket.py +++ b/core/communication/websocket.py @@ -4,6 +4,10 @@ WebSocket message handling and protocol implementation. import asyncio import json import logging +import os +import cv2 +from datetime import datetime +from pathlib import Path from typing import Optional from fastapi import WebSocket, WebSocketDisconnect from websockets.exceptions import ConnectionClosedError @@ -447,6 +451,89 @@ class WebSocketHandler: logger.error(f"[Model Management] Exception ensuring model {model_id}: {str(e)}", exc_info=True) return False + async def _save_snapshot(self, display_identifier: str, session_id: int) -> None: + """ + Save snapshot image to images folder after receiving sessionId. + + Args: + display_identifier: Display identifier to match with subscriptionIdentifier + session_id: Session ID to include in filename + """ + try: + # Find subscription that matches the displayIdentifier + matching_subscription = None + for subscription in worker_state.get_all_subscriptions(): + # Extract display ID from subscriptionIdentifier (format: displayId;cameraId) + from .messages import extract_display_identifier + sub_display_id = extract_display_identifier(subscription.subscriptionIdentifier) + if sub_display_id == display_identifier: + matching_subscription = subscription + break + + if not matching_subscription: + logger.error(f"[Snapshot Save] No subscription found for display {display_identifier}") + return + + if not matching_subscription.snapshotUrl: + logger.error(f"[Snapshot Save] No snapshotUrl found for display {display_identifier}") + return + + # Ensure images directory exists (relative path for Docker bind mount) + images_dir = Path("images") + images_dir.mkdir(exist_ok=True) + + # Generate filename with timestamp and session ID + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"{display_identifier}_{session_id}_{timestamp}.jpg" + filepath = images_dir / filename + + # Use existing HTTPSnapshotReader to fetch snapshot + logger.info(f"[Snapshot Save] Fetching snapshot from {matching_subscription.snapshotUrl}") + + # Run snapshot fetch in thread pool to avoid blocking async loop + loop = asyncio.get_event_loop() + frame = await loop.run_in_executor(None, self._fetch_snapshot_sync, matching_subscription.snapshotUrl) + + if frame is not None: + # Save the image using OpenCV + success = cv2.imwrite(str(filepath), frame) + if success: + logger.info(f"[Snapshot Save] Successfully saved snapshot to {filepath}") + else: + logger.error(f"[Snapshot Save] Failed to save image file {filepath}") + else: + logger.error(f"[Snapshot Save] Failed to fetch snapshot from {matching_subscription.snapshotUrl}") + + except Exception as e: + logger.error(f"[Snapshot Save] Error saving snapshot for display {display_identifier}: {e}", exc_info=True) + + def _fetch_snapshot_sync(self, snapshot_url: str): + """ + Synchronous snapshot fetching using existing HTTPSnapshotReader infrastructure. + + Args: + snapshot_url: URL to fetch snapshot from + + Returns: + np.ndarray or None: Fetched frame or None on error + """ + try: + from ..streaming.readers import HTTPSnapshotReader + + # Create temporary snapshot reader for single fetch + snapshot_reader = HTTPSnapshotReader( + camera_id="temp_snapshot", + snapshot_url=snapshot_url, + interval_ms=5000 # Not used for single fetch + ) + + # Use existing fetch_single_snapshot method + return snapshot_reader.fetch_single_snapshot() + + except Exception as e: + logger.error(f"Error in sync snapshot fetch: {e}") + return None + async def _handle_set_session_id(self, message: SetSessionIdMessage) -> None: """Handle setSessionId message.""" display_identifier = message.payload.displayIdentifier @@ -460,6 +547,10 @@ class WebSocketHandler: # Update tracking integrations with session ID shared_stream_manager.set_session_id(display_identifier, session_id) + # Save snapshot image after getting sessionId + if session_id: + await self._save_snapshot(display_identifier, session_id) + async def _handle_set_progression_stage(self, message: SetProgressionStageMessage) -> None: """Handle setProgressionStage message.""" display_identifier = message.payload.displayIdentifier