From 719d16ae4d32c25c35a09bdd4e8fe1a7c9b83488 Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Fri, 26 Sep 2025 00:07:48 +0700 Subject: [PATCH] refactor: simplify frame handling by removing stream type management and enhancing validation --- .claude/settings.local.json | 9 +++ core/streaming/buffers.py | 134 +++++++----------------------------- core/streaming/manager.py | 41 +---------- core/streaming/readers.py | 49 ++++--------- 4 files changed, 51 insertions(+), 182 deletions(-) create mode 100644 .claude/settings.local.json diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..b06024d --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,9 @@ +{ + "permissions": { + "allow": [ + "Bash(dir:*)" + ], + "deny": [], + "ask": [] + } +} \ No newline at end of file diff --git a/core/streaming/buffers.py b/core/streaming/buffers.py index 602e028..fd29fbb 100644 --- a/core/streaming/buffers.py +++ b/core/streaming/buffers.py @@ -9,53 +9,25 @@ import logging import numpy as np from typing import Optional, Dict, Any, Tuple from collections import defaultdict -from enum import Enum logger = logging.getLogger(__name__) -class StreamType(Enum): - """Stream type enumeration.""" - RTSP = "rtsp" # 1280x720 @ 6fps - HTTP = "http" # 2560x1440 high quality - - class FrameBuffer: - """Thread-safe frame buffer optimized for different stream types.""" + """Thread-safe frame buffer for all camera streams.""" def __init__(self, max_age_seconds: int = 5): self.max_age_seconds = max_age_seconds self._frames: Dict[str, Dict[str, Any]] = {} - self._stream_types: Dict[str, StreamType] = {} self._lock = threading.RLock() - # Stream-specific settings - self.rtsp_config = { - 'width': 1280, - 'height': 720, - 'fps': 6, - 'max_size_mb': 3 # 1280x720x3 bytes = ~2.6MB - } - self.http_config = { - 'width': 2560, - 'height': 1440, - 'max_size_mb': 10 - } - - def put_frame(self, camera_id: str, frame: np.ndarray, stream_type: Optional[StreamType] = None): - """Store a frame for the given camera ID with type-specific validation.""" + def put_frame(self, camera_id: str, frame: np.ndarray): + """Store a frame for the given camera ID.""" with self._lock: - # Detect stream type if not provided - if stream_type is None: - stream_type = self._detect_stream_type(frame) - - # Store stream type - self._stream_types[camera_id] = stream_type - - # Validate frame based on stream type - if not self._validate_frame(frame, stream_type): - logger.warning(f"Frame validation failed for camera {camera_id} ({stream_type.value})") + # Validate frame + if not self._validate_frame(frame): + logger.warning(f"Frame validation failed for camera {camera_id}") return self._frames[camera_id] = { @@ -63,14 +35,9 @@ class FrameBuffer: 'timestamp': time.time(), 'shape': frame.shape, 'dtype': str(frame.dtype), - 'stream_type': stream_type.value, 'size_mb': frame.nbytes / (1024 * 1024) } - # Commented out verbose frame storage logging - # logger.debug(f"Stored {stream_type.value} frame for camera {camera_id}: " - # f"{frame.shape[1]}x{frame.shape[0]}, {frame.nbytes / (1024 * 1024):.2f}MB") - def get_frame(self, camera_id: str) -> Optional[np.ndarray]: """Get the latest frame for the given camera ID.""" with self._lock: @@ -84,8 +51,6 @@ class FrameBuffer: if age > self.max_age_seconds: logger.debug(f"Frame for camera {camera_id} is {age:.1f}s old, discarding") del self._frames[camera_id] - if camera_id in self._stream_types: - del self._stream_types[camera_id] return None return frame_data['frame'].copy() @@ -101,8 +66,6 @@ class FrameBuffer: if age > self.max_age_seconds: del self._frames[camera_id] - if camera_id in self._stream_types: - del self._stream_types[camera_id] return None return { @@ -110,7 +73,6 @@ class FrameBuffer: 'age': age, 'shape': frame_data['shape'], 'dtype': frame_data['dtype'], - 'stream_type': frame_data.get('stream_type', 'unknown'), 'size_mb': frame_data.get('size_mb', 0) } @@ -123,8 +85,6 @@ class FrameBuffer: with self._lock: if camera_id in self._frames: del self._frames[camera_id] - if camera_id in self._stream_types: - del self._stream_types[camera_id] logger.debug(f"Cleared frames for camera {camera_id}") def clear_all(self): @@ -132,7 +92,6 @@ class FrameBuffer: with self._lock: count = len(self._frames) self._frames.clear() - self._stream_types.clear() logger.debug(f"Cleared all frames ({count} cameras)") def get_camera_list(self) -> list: @@ -152,8 +111,6 @@ class FrameBuffer: # Clean up expired frames for camera_id in expired_cameras: del self._frames[camera_id] - if camera_id in self._stream_types: - del self._stream_types[camera_id] return valid_cameras @@ -165,15 +122,12 @@ class FrameBuffer: 'total_cameras': len(self._frames), 'valid_cameras': 0, 'expired_cameras': 0, - 'rtsp_cameras': 0, - 'http_cameras': 0, 'total_memory_mb': 0, 'cameras': {} } for camera_id, frame_data in self._frames.items(): age = current_time - frame_data['timestamp'] - stream_type = frame_data.get('stream_type', 'unknown') size_mb = frame_data.get('size_mb', 0) if age <= self.max_age_seconds: @@ -181,11 +135,6 @@ class FrameBuffer: else: stats['expired_cameras'] += 1 - if stream_type == StreamType.RTSP.value: - stats['rtsp_cameras'] += 1 - elif stream_type == StreamType.HTTP.value: - stats['http_cameras'] += 1 - stats['total_memory_mb'] += size_mb stats['cameras'][camera_id] = { @@ -193,74 +142,45 @@ class FrameBuffer: 'valid': age <= self.max_age_seconds, 'shape': frame_data['shape'], 'dtype': frame_data['dtype'], - 'stream_type': stream_type, 'size_mb': size_mb } return stats - def _detect_stream_type(self, frame: np.ndarray) -> StreamType: - """Detect stream type based on frame dimensions.""" - h, w = frame.shape[:2] - - # Check if it matches RTSP dimensions (1280x720) - if w == self.rtsp_config['width'] and h == self.rtsp_config['height']: - return StreamType.RTSP - - # Check if it matches HTTP dimensions (2560x1440) or close to it - if w >= 2000 and h >= 1000: - return StreamType.HTTP - - # Default based on size - if w <= 1920 and h <= 1080: - return StreamType.RTSP - else: - return StreamType.HTTP - - def _validate_frame(self, frame: np.ndarray, stream_type: StreamType) -> bool: - """Validate frame based on stream type.""" + def _validate_frame(self, frame: np.ndarray) -> bool: + """Validate frame - basic validation for any stream type.""" if frame is None or frame.size == 0: return False h, w = frame.shape[:2] size_mb = frame.nbytes / (1024 * 1024) - if stream_type == StreamType.RTSP: - config = self.rtsp_config - # Allow some tolerance for RTSP streams - if abs(w - config['width']) > 100 or abs(h - config['height']) > 100: - logger.warning(f"RTSP frame size mismatch: {w}x{h} (expected {config['width']}x{config['height']})") - if size_mb > config['max_size_mb']: - logger.warning(f"RTSP frame too large: {size_mb:.2f}MB (max {config['max_size_mb']}MB)") - return False + # Basic size validation - reject extremely large frames regardless of type + max_size_mb = 50 # Generous limit for any frame type + if size_mb > max_size_mb: + logger.warning(f"Frame too large: {size_mb:.2f}MB (max {max_size_mb}MB) for {w}x{h}") + return False - elif stream_type == StreamType.HTTP: - config = self.http_config - # More flexible for HTTP snapshots - if size_mb > config['max_size_mb']: - logger.warning(f"HTTP snapshot too large: {size_mb:.2f}MB (max {config['max_size_mb']}MB)") - return False + # Basic dimension validation + if w < 100 or h < 100: + logger.warning(f"Frame too small: {w}x{h}") + return False return True class CacheBuffer: - """Enhanced frame cache with support for cropping and optimized for different formats.""" + """Enhanced frame cache with support for cropping.""" def __init__(self, max_age_seconds: int = 10): self.frame_buffer = FrameBuffer(max_age_seconds) self._crop_cache: Dict[str, Dict[str, Any]] = {} self._cache_lock = threading.RLock() + self.jpeg_quality = 95 # High quality for all frames - # Quality settings for different stream types - self.jpeg_quality = { - StreamType.RTSP: 90, # Good quality for 720p - StreamType.HTTP: 95 # High quality for 2K - } - - def put_frame(self, camera_id: str, frame: np.ndarray, stream_type: Optional[StreamType] = None): + def put_frame(self, camera_id: str, frame: np.ndarray): """Store a frame and clear any associated crop cache.""" - self.frame_buffer.put_frame(camera_id, frame, stream_type) + self.frame_buffer.put_frame(camera_id, frame) # Clear crop cache for this camera since we have a new frame with self._cache_lock: @@ -325,21 +245,15 @@ class CacheBuffer: def get_frame_as_jpeg(self, camera_id: str, crop_coords: Optional[Tuple[int, int, int, int]] = None, quality: Optional[int] = None) -> Optional[bytes]: - """Get frame as JPEG bytes with format-specific quality settings.""" + """Get frame as JPEG bytes.""" frame = self.get_frame(camera_id, crop_coords) if frame is None: return None try: - # Determine quality based on stream type if not specified + # Use specified quality or default if quality is None: - frame_info = self.frame_buffer.get_frame_info(camera_id) - if frame_info: - stream_type_str = frame_info.get('stream_type', StreamType.RTSP.value) - stream_type = StreamType.RTSP if stream_type_str == StreamType.RTSP.value else StreamType.HTTP - quality = self.jpeg_quality[stream_type] - else: - quality = 90 # Default + quality = self.jpeg_quality # Encode as JPEG with specified quality encode_params = [cv2.IMWRITE_JPEG_QUALITY, quality] diff --git a/core/streaming/manager.py b/core/streaming/manager.py index 7bd44c1..1e3719f 100644 --- a/core/streaming/manager.py +++ b/core/streaming/manager.py @@ -10,7 +10,7 @@ from dataclasses import dataclass from collections import defaultdict from .readers import RTSPReader, HTTPSnapshotReader -from .buffers import shared_cache_buffer, StreamType +from .buffers import shared_cache_buffer from ..tracking.integration import TrackingPipelineIntegration @@ -177,12 +177,8 @@ class StreamManager: def _frame_callback(self, camera_id: str, frame): """Callback for when a new frame is available.""" try: - # Detect stream type based on frame dimensions - stream_type = self._detect_stream_type(frame) - - # Store frame in shared buffer with stream type - shared_cache_buffer.put_frame(camera_id, frame, stream_type) - + # Store frame in shared buffer + shared_cache_buffer.put_frame(camera_id, frame) # Process tracking for subscriptions with tracking integration self._process_tracking_for_camera(camera_id, frame) @@ -404,26 +400,6 @@ class StreamManager: stats[subscription_id] = subscription_info.tracking_integration.get_statistics() return stats - def _detect_stream_type(self, frame) -> StreamType: - """Detect stream type based on frame dimensions.""" - if frame is None: - return StreamType.RTSP # Default - - h, w = frame.shape[:2] - - # RTSP: 1280x720 - if w == 1280 and h == 720: - return StreamType.RTSP - - # HTTP: 2560x1440 or larger - if w >= 2000 and h >= 1000: - return StreamType.HTTP - - # Default based on size - if w <= 1920 and h <= 1080: - return StreamType.RTSP - else: - return StreamType.HTTP def get_stats(self) -> Dict[str, Any]: """Get comprehensive streaming statistics.""" @@ -431,22 +407,11 @@ class StreamManager: buffer_stats = shared_cache_buffer.get_stats() tracking_stats = self.get_tracking_stats() - # Add stream type information - stream_types = {} - for camera_id in self._streams.keys(): - if isinstance(self._streams[camera_id], RTSPReader): - stream_types[camera_id] = 'rtsp' - elif isinstance(self._streams[camera_id], HTTPSnapshotReader): - stream_types[camera_id] = 'http' - else: - stream_types[camera_id] = 'unknown' - return { 'active_subscriptions': len(self._subscriptions), 'active_streams': len(self._streams), 'cameras_with_subscribers': len(self._camera_subscribers), 'max_streams': self.max_streams, - 'stream_types': stream_types, 'subscriptions_by_camera': { camera_id: len(subscribers) for camera_id, subscribers in self._camera_subscribers.items() diff --git a/core/streaming/readers.py b/core/streaming/readers.py index 9a3db6d..53c9643 100644 --- a/core/streaming/readers.py +++ b/core/streaming/readers.py @@ -37,7 +37,6 @@ class RTSPReader: self.expected_fps = 6 # Frame processing parameters - self.frame_interval = 1.0 / self.expected_fps # ~167ms for 6fps self.error_recovery_delay = 5.0 # Increased from 2.0 for stability self.max_consecutive_errors = 30 # Increased from 10 to handle network jitter self.stream_timeout = 30.0 @@ -72,7 +71,6 @@ class RTSPReader: frame_count = 0 last_log_time = time.time() last_successful_frame_time = time.time() - last_frame_time = 0 while not self.stop_event.is_set(): try: @@ -90,12 +88,7 @@ class RTSPReader: last_successful_frame_time = time.time() continue - # Rate limiting for 6fps - current_time = time.time() - if current_time - last_frame_time < self.frame_interval: - time.sleep(0.01) # Small sleep to avoid busy waiting - continue - + # Read frame immediately without rate limiting for minimum latency ret, frame = self.cap.read() if not ret or frame is None: @@ -118,15 +111,10 @@ class RTSPReader: time.sleep(sleep_time) continue - # Validate frame dimensions - if frame.shape[1] != self.expected_width or frame.shape[0] != self.expected_height: - logger.warning(f"Camera {self.camera_id}: Unexpected frame dimensions {frame.shape[1]}x{frame.shape[0]}") - # Try to resize if dimensions are wrong - if frame.shape[1] > 0 and frame.shape[0] > 0: - frame = cv2.resize(frame, (self.expected_width, self.expected_height)) - else: - consecutive_errors += 1 - continue + # Accept any valid frame dimensions - don't force specific resolution + if frame.shape[1] <= 0 or frame.shape[0] <= 0: + consecutive_errors += 1 + continue # Check for corrupted frames (all black, all white, excessive noise) if self._is_frame_corrupted(frame): @@ -138,7 +126,6 @@ class RTSPReader: consecutive_errors = 0 frame_count += 1 last_successful_frame_time = time.time() - last_frame_time = current_time # Call frame callback if self.frame_callback: @@ -148,6 +135,7 @@ class RTSPReader: logger.error(f"Camera {self.camera_id}: Frame callback error: {e}") # Log progress every 30 seconds + current_time = time.time() if current_time - last_log_time >= 30: logger.info(f"Camera {self.camera_id}: {frame_count} frames processed") last_log_time = current_time @@ -261,14 +249,12 @@ class RTSPReader: logger.error(f"Failed to open stream for camera {self.camera_id}") return False - # Set capture properties for 1280x720@6fps - self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.expected_width) - self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.expected_height) - self.cap.set(cv2.CAP_PROP_FPS, self.expected_fps) + # Don't force resolution/fps - let the stream determine its natural specs + # The camera will provide whatever resolution/fps it supports - # Set moderate buffer to handle network jitter while avoiding excessive latency - # Buffer of 3 frames provides resilience without major delay - self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 3) + # Set minimal buffer for lowest latency - single frame buffer + # This ensures we always get the most recent frame + self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # Set FFMPEG options for better H.264 handling self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) @@ -405,15 +391,10 @@ class HTTPSnapshotReader: time.sleep(min(2.0, interval_seconds)) continue - # Validate image dimensions - if frame.shape[1] != self.expected_width or frame.shape[0] != self.expected_height: - logger.info(f"Camera {self.camera_id}: Snapshot dimensions {frame.shape[1]}x{frame.shape[0]} " - f"(expected {self.expected_width}x{self.expected_height})") - # Resize if needed (maintaining aspect ratio for high quality) - if frame.shape[1] > 0 and frame.shape[0] > 0: - # Only resize if significantly different - if abs(frame.shape[1] - self.expected_width) > 100: - frame = self._resize_maintain_aspect(frame, self.expected_width, self.expected_height) + # Accept any valid image dimensions - don't force specific resolution + if frame.shape[1] <= 0 or frame.shape[0] <= 0: + logger.warning(f"Camera {self.camera_id}: Invalid frame dimensions {frame.shape[1]}x{frame.shape[0]}") + continue # Reset retry counter on successful fetch retries = 0