diff --git a/app.py b/app.py index 6338401..2e6a0c5 100644 --- a/app.py +++ b/app.py @@ -6,8 +6,9 @@ import json import logging import os import time +import cv2 from contextlib import asynccontextmanager -from fastapi import FastAPI, WebSocket, HTTPException, Request +from fastapi import FastAPI, WebSocket, HTTPException from fastapi.responses import Response # Import new modular communication system @@ -27,8 +28,8 @@ logging.basicConfig( logger = logging.getLogger("detector_worker") logger.setLevel(logging.DEBUG) -# Store cached frames for REST API access (temporary storage) -latest_frames = {} +# Frames are now stored in the shared cache buffer from core.streaming.buffers +# latest_frames = {} # Deprecated - using shared_cache_buffer instead # Lifespan event handler (modern FastAPI approach) @asynccontextmanager @@ -49,7 +50,7 @@ async def lifespan(app: FastAPI): worker_state.set_subscriptions([]) worker_state.session_ids.clear() worker_state.progression_stages.clear() - latest_frames.clear() + # latest_frames.clear() # No longer needed - frames are in shared_cache_buffer logger.info("Detector Worker shutdown complete") # Create FastAPI application with detailed WebSocket logging @@ -90,8 +91,8 @@ from core.streaming import initialize_stream_manager initialize_stream_manager(max_streams=config.get('max_streams', 10)) logger.info(f"Initialized stream manager with max_streams={config.get('max_streams', 10)}") -# Store cached frames for REST API access (temporary storage) -latest_frames = {} +# Frames are now stored in the shared cache buffer from core.streaming.buffers +# latest_frames = {} # Deprecated - using shared_cache_buffer instead logger.info("Starting detector worker application (refactored)") logger.info(f"Configuration: Target FPS: {config.get('target_fps', 10)}, " @@ -150,31 +151,36 @@ async def get_camera_image(camera_id: str): detail=f"Camera {camera_id} not found or not active" ) - # Check if we have a cached frame for this camera - if camera_id not in latest_frames: - logger.warning(f"No cached frame available for camera '{camera_id}'") + # Extract actual camera_id from subscription identifier (displayId;cameraId) + # Frames are stored using just the camera_id part + actual_camera_id = camera_id.split(';')[-1] if ';' in camera_id else camera_id + + # Get frame from the shared cache buffer + from core.streaming.buffers import shared_cache_buffer + + # Debug: Log available cameras in buffer + available_cameras = shared_cache_buffer.frame_buffer.get_camera_list() + logger.debug(f"Available cameras in buffer: {available_cameras}") + logger.debug(f"Looking for camera: '{actual_camera_id}'") + + frame = shared_cache_buffer.get_frame(actual_camera_id) + if frame is None: + logger.warning(f"No cached frame available for camera '{actual_camera_id}' (from subscription '{camera_id}')") + logger.warning(f"Available cameras in buffer: {available_cameras}") raise HTTPException( status_code=404, - detail=f"No frame available for camera {camera_id}" + detail=f"No frame available for camera {actual_camera_id}" ) - frame = latest_frames[camera_id] - logger.debug(f"Retrieved cached frame for camera '{camera_id}', shape: {frame.shape}") + logger.debug(f"Retrieved cached frame for camera '{actual_camera_id}' (from subscription '{camera_id}'), shape: {frame.shape}") - # TODO: This import will be replaced in Phase 3 (Streaming System) - # For now, we need to handle the case where OpenCV is not available - try: - import cv2 - # Encode frame as JPEG - success, buffer_img = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85]) - if not success: - raise HTTPException(status_code=500, detail="Failed to encode image as JPEG") + # Encode frame as JPEG + success, buffer_img = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85]) + if not success: + raise HTTPException(status_code=500, detail="Failed to encode image as JPEG") - # Return image as binary response - return Response(content=buffer_img.tobytes(), media_type="image/jpeg") - except ImportError: - logger.error("OpenCV not available for image encoding") - raise HTTPException(status_code=500, detail="Image processing not available") + # Return image as binary response + return Response(content=buffer_img.tobytes(), media_type="image/jpeg") except HTTPException: raise diff --git a/core/communication/websocket.py b/core/communication/websocket.py index 813350e..077c6dc 100644 --- a/core/communication/websocket.py +++ b/core/communication/websocket.py @@ -377,6 +377,8 @@ class WebSocketHandler: camera_id = subscription_id.split(';')[-1] model_id = payload['modelId'] + logger.info(f"[SUBSCRIPTION_MAPPING] subscription_id='{subscription_id}' → camera_id='{camera_id}'") + # Get tracking integration for this model tracking_integration = tracking_integrations.get(model_id) diff --git a/core/streaming/buffers.py b/core/streaming/buffers.py index fd29fbb..f2c5787 100644 --- a/core/streaming/buffers.py +++ b/core/streaming/buffers.py @@ -46,13 +46,7 @@ class FrameBuffer: frame_data = self._frames[camera_id] - # Check if frame is too old - age = time.time() - frame_data['timestamp'] - if age > self.max_age_seconds: - logger.debug(f"Frame for camera {camera_id} is {age:.1f}s old, discarding") - del self._frames[camera_id] - return None - + # Return frame regardless of age - frames persist until replaced return frame_data['frame'].copy() def get_frame_info(self, camera_id: str) -> Optional[Dict[str, Any]]: @@ -64,10 +58,7 @@ class FrameBuffer: frame_data = self._frames[camera_id] age = time.time() - frame_data['timestamp'] - if age > self.max_age_seconds: - del self._frames[camera_id] - return None - + # Return frame info regardless of age - frames persist until replaced return { 'timestamp': frame_data['timestamp'], 'age': age, @@ -95,24 +86,10 @@ class FrameBuffer: logger.debug(f"Cleared all frames ({count} cameras)") def get_camera_list(self) -> list: - """Get list of cameras with valid frames.""" + """Get list of cameras with frames - all frames persist until replaced.""" with self._lock: - current_time = time.time() - valid_cameras = [] - expired_cameras = [] - - for camera_id, frame_data in self._frames.items(): - age = current_time - frame_data['timestamp'] - if age <= self.max_age_seconds: - valid_cameras.append(camera_id) - else: - expired_cameras.append(camera_id) - - # Clean up expired frames - for camera_id in expired_cameras: - del self._frames[camera_id] - - return valid_cameras + # Return all cameras that have frames - no age-based filtering + return list(self._frames.keys()) def get_stats(self) -> Dict[str, Any]: """Get buffer statistics.""" @@ -120,8 +97,8 @@ class FrameBuffer: current_time = time.time() stats = { 'total_cameras': len(self._frames), - 'valid_cameras': 0, - 'expired_cameras': 0, + 'recent_cameras': 0, + 'stale_cameras': 0, 'total_memory_mb': 0, 'cameras': {} } @@ -130,16 +107,17 @@ class FrameBuffer: age = current_time - frame_data['timestamp'] size_mb = frame_data.get('size_mb', 0) + # All frames are valid/available, but categorize by freshness for monitoring if age <= self.max_age_seconds: - stats['valid_cameras'] += 1 + stats['recent_cameras'] += 1 else: - stats['expired_cameras'] += 1 + stats['stale_cameras'] += 1 stats['total_memory_mb'] += size_mb stats['cameras'][camera_id] = { 'age': age, - 'valid': age <= self.max_age_seconds, + 'recent': age <= self.max_age_seconds, # Recent but all frames available 'shape': frame_data['shape'], 'dtype': frame_data['dtype'], 'size_mb': size_mb diff --git a/core/streaming/manager.py b/core/streaming/manager.py index 156daf1..0c172ac 100644 --- a/core/streaming/manager.py +++ b/core/streaming/manager.py @@ -130,6 +130,7 @@ class StreamManager: try: if stream_config.rtsp_url: # RTSP stream using FFmpeg subprocess with CUDA acceleration + logger.info(f"[STREAM_START] Starting FFmpeg RTSP stream for camera_id='{camera_id}' URL={stream_config.rtsp_url}") reader = FFmpegRTSPReader( camera_id=camera_id, rtsp_url=stream_config.rtsp_url, @@ -138,10 +139,11 @@ class StreamManager: reader.set_frame_callback(self._frame_callback) reader.start() self._streams[camera_id] = reader - logger.info(f"Started FFmpeg RTSP stream for camera {camera_id}") + logger.info(f"[STREAM_START] ✅ Started FFmpeg RTSP stream for camera_id='{camera_id}'") elif stream_config.snapshot_url: # HTTP snapshot stream + logger.info(f"[STREAM_START] Starting HTTP snapshot stream for camera_id='{camera_id}' URL={stream_config.snapshot_url}") reader = HTTPSnapshotReader( camera_id=camera_id, snapshot_url=stream_config.snapshot_url, @@ -151,7 +153,7 @@ class StreamManager: reader.set_frame_callback(self._frame_callback) reader.start() self._streams[camera_id] = reader - logger.info(f"Started HTTP snapshot stream for camera {camera_id}") + logger.info(f"[STREAM_START] ✅ Started HTTP snapshot stream for camera_id='{camera_id}'") else: logger.error(f"No valid URL provided for camera {camera_id}") @@ -169,8 +171,9 @@ class StreamManager: try: self._streams[camera_id].stop() del self._streams[camera_id] - shared_cache_buffer.clear_camera(camera_id) - logger.info(f"Stopped stream for camera {camera_id}") + # DON'T clear frames - they should persist until replaced + # shared_cache_buffer.clear_camera(camera_id) # REMOVED - frames should persist + logger.info(f"Stopped stream for camera {camera_id} (frames preserved in buffer)") except Exception as e: logger.error(f"Error stopping stream for camera {camera_id}: {e}") @@ -179,6 +182,11 @@ class StreamManager: try: # Store frame in shared buffer shared_cache_buffer.put_frame(camera_id, frame) + logger.info(f"[FRAME_CALLBACK] Stored frame for camera_id='{camera_id}' in shared_cache_buffer, shape={frame.shape}") + + # Log current buffer state + available_cameras = shared_cache_buffer.frame_buffer.get_camera_list() + logger.info(f"[FRAME_CALLBACK] Buffer now contains {len(available_cameras)} cameras: {available_cameras}") # Process tracking for subscriptions with tracking integration self._process_tracking_for_camera(camera_id, frame) diff --git a/core/streaming/readers.py b/core/streaming/readers.py index d8d4b4d..4b5c8ba 100644 --- a/core/streaming/readers.py +++ b/core/streaming/readers.py @@ -101,14 +101,14 @@ class FFmpegRTSPReader: # This ensures each file is complete when written camera_id_safe = self.camera_id.replace(' ', '_') self.frame_prefix = f"camera_{camera_id_safe}" - # Using strftime pattern with microseconds for unique filenames - self.frame_pattern = f"{self.frame_dir}/{self.frame_prefix}_%Y%m%d_%H%M%S_%f.ppm" + # Using strftime pattern with seconds for unique filenames (avoid %f which may not work) + self.frame_pattern = f"{self.frame_dir}/{self.frame_prefix}_%Y%m%d_%H%M%S.ppm" cmd = [ 'ffmpeg', # DO NOT REMOVE - '-hwaccel', 'cuda', - '-hwaccel_device', '0', + # '-hwaccel', 'cuda', + # '-hwaccel_device', '0', '-rtsp_transport', 'tcp', '-i', self.rtsp_url, '-f', 'image2', @@ -201,14 +201,17 @@ class FFmpegRTSPReader: # Sort by filename (which includes timestamp) and get the latest frame_files.sort() latest_frame = frame_files[-1] + logger.debug(f"Camera {self.camera_id}: Found {len(frame_files)} frames, processing latest: {latest_frame}") # Read the latest frame (it's complete since FFmpeg wrote it atomically) frame = cv2.imread(latest_frame) - if frame is not None and frame.shape == (self.height, self.width, 3): - # Call frame callback directly + if frame is not None: + logger.debug(f"Camera {self.camera_id}: Successfully read frame {frame.shape} from {latest_frame}") + # Accept any frame dimensions initially for debugging if self.frame_callback: self.frame_callback(self.camera_id, frame) + logger.debug(f"Camera {self.camera_id}: Called frame callback") frame_count += 1 @@ -217,6 +220,8 @@ class FFmpegRTSPReader: if current_time - last_log_time >= 30: logger.info(f"Camera {self.camera_id}: {frame_count} frames processed") last_log_time = current_time + else: + logger.warning(f"Camera {self.camera_id}: Failed to read frame from {latest_frame}") # Clean up old frame files to prevent disk filling # Keep only the latest 5 frames @@ -226,6 +231,8 @@ class FFmpegRTSPReader: os.remove(old_file) except: pass + else: + logger.warning(f"Camera {self.camera_id}: No frame files found in {self.frame_dir} with pattern {self.frame_prefix}*.ppm") except Exception as e: logger.debug(f"Camera {self.camera_id}: Error reading frames: {e}")