feat: add save frame if there is sessionId
All checks were successful
Build Worker Base and Application Images / check-base-changes (push) Successful in 8s
Build Worker Base and Application Images / build-base (push) Successful in 5m23s
Build Worker Base and Application Images / build-docker (push) Successful in 3m11s
Build Worker Base and Application Images / deploy-stack (push) Successful in 23s

This commit is contained in:
ziesorx 2025-09-25 00:45:09 +07:00
parent 965a0d0a72
commit 2eba1f94ea
2 changed files with 95 additions and 0 deletions

View file

@ -20,5 +20,9 @@ RUN pip install --no-cache-dir -r requirements.base.txt
# Set working directory
WORKDIR /app
# Create images directory for bind mount
RUN mkdir -p /app/images && \
chmod 755 /app/images
# This base image will be reused for all worker builds
CMD ["python3", "-m", "fastapi", "run", "--host", "0.0.0.0", "--port", "8000"]

View file

@ -4,6 +4,10 @@ WebSocket message handling and protocol implementation.
import asyncio
import json
import logging
import os
import cv2
from datetime import datetime
from pathlib import Path
from typing import Optional
from fastapi import WebSocket, WebSocketDisconnect
from websockets.exceptions import ConnectionClosedError
@ -447,6 +451,89 @@ class WebSocketHandler:
logger.error(f"[Model Management] Exception ensuring model {model_id}: {str(e)}", exc_info=True)
return False
async def _save_snapshot(self, display_identifier: str, session_id: int) -> None:
"""
Save snapshot image to images folder after receiving sessionId.
Args:
display_identifier: Display identifier to match with subscriptionIdentifier
session_id: Session ID to include in filename
"""
try:
# Find subscription that matches the displayIdentifier
matching_subscription = None
for subscription in worker_state.get_all_subscriptions():
# Extract display ID from subscriptionIdentifier (format: displayId;cameraId)
from .messages import extract_display_identifier
sub_display_id = extract_display_identifier(subscription.subscriptionIdentifier)
if sub_display_id == display_identifier:
matching_subscription = subscription
break
if not matching_subscription:
logger.error(f"[Snapshot Save] No subscription found for display {display_identifier}")
return
if not matching_subscription.snapshotUrl:
logger.error(f"[Snapshot Save] No snapshotUrl found for display {display_identifier}")
return
# Ensure images directory exists (relative path for Docker bind mount)
images_dir = Path("images")
images_dir.mkdir(exist_ok=True)
# Generate filename with timestamp and session ID
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{display_identifier}_{session_id}_{timestamp}.jpg"
filepath = images_dir / filename
# Use existing HTTPSnapshotReader to fetch snapshot
logger.info(f"[Snapshot Save] Fetching snapshot from {matching_subscription.snapshotUrl}")
# Run snapshot fetch in thread pool to avoid blocking async loop
loop = asyncio.get_event_loop()
frame = await loop.run_in_executor(None, self._fetch_snapshot_sync, matching_subscription.snapshotUrl)
if frame is not None:
# Save the image using OpenCV
success = cv2.imwrite(str(filepath), frame)
if success:
logger.info(f"[Snapshot Save] Successfully saved snapshot to {filepath}")
else:
logger.error(f"[Snapshot Save] Failed to save image file {filepath}")
else:
logger.error(f"[Snapshot Save] Failed to fetch snapshot from {matching_subscription.snapshotUrl}")
except Exception as e:
logger.error(f"[Snapshot Save] Error saving snapshot for display {display_identifier}: {e}", exc_info=True)
def _fetch_snapshot_sync(self, snapshot_url: str):
"""
Synchronous snapshot fetching using existing HTTPSnapshotReader infrastructure.
Args:
snapshot_url: URL to fetch snapshot from
Returns:
np.ndarray or None: Fetched frame or None on error
"""
try:
from ..streaming.readers import HTTPSnapshotReader
# Create temporary snapshot reader for single fetch
snapshot_reader = HTTPSnapshotReader(
camera_id="temp_snapshot",
snapshot_url=snapshot_url,
interval_ms=5000 # Not used for single fetch
)
# Use existing fetch_single_snapshot method
return snapshot_reader.fetch_single_snapshot()
except Exception as e:
logger.error(f"Error in sync snapshot fetch: {e}")
return None
async def _handle_set_session_id(self, message: SetSessionIdMessage) -> None:
"""Handle setSessionId message."""
display_identifier = message.payload.displayIdentifier
@ -460,6 +547,10 @@ class WebSocketHandler:
# Update tracking integrations with session ID
shared_stream_manager.set_session_id(display_identifier, session_id)
# Save snapshot image after getting sessionId
if session_id:
await self._save_snapshot(display_identifier, session_id)
async def _handle_set_progression_stage(self, message: SetProgressionStageMessage) -> None:
"""Handle setProgressionStage message."""
display_identifier = message.payload.displayIdentifier