From dd401f14d7e14dbd36f15142f538b8742502438e Mon Sep 17 00:00:00 2001 From: ziesorx Date: Tue, 23 Sep 2025 23:06:03 +0700 Subject: [PATCH] feat: tracking works 100% --- core/communication/messages.py | 7 +- core/communication/models.py | 4 +- core/streaming/__init__.py | 3 +- core/streaming/buffers.py | 205 ++++++++++-- core/streaming/manager.py | 46 ++- core/streaming/readers.py | 551 +++++++++++++++++---------------- 6 files changed, 511 insertions(+), 305 deletions(-) diff --git a/core/communication/messages.py b/core/communication/messages.py index 5afde80..d94f1c4 100644 --- a/core/communication/messages.py +++ b/core/communication/messages.py @@ -82,7 +82,12 @@ def serialize_outgoing_message(message: OutgoingMessage) -> str: JSON string representation """ try: - return message.model_dump_json(exclude_none=True) + # For ImageDetectionMessage, we need to include None values for abandonment detection + from .models import ImageDetectionMessage + if isinstance(message, ImageDetectionMessage): + return message.model_dump_json(exclude_none=False) + else: + return message.model_dump_json(exclude_none=True) except Exception as e: logger.error(f"Failed to serialize outgoing message: {e}") raise diff --git a/core/communication/models.py b/core/communication/models.py index eb55cc6..14ca881 100644 --- a/core/communication/models.py +++ b/core/communication/models.py @@ -36,7 +36,9 @@ class CameraConnection(BaseModel): class DetectionData(BaseModel): """Detection result data structure.""" - detection: Dict[str, Any] = Field(..., description="Flat key-value detection results") + model_config = {"json_encoders": {type(None): lambda v: None}} + + detection: Optional[Dict[str, Any]] = Field(None, description="Flat key-value detection results, null for abandonment") modelId: int modelName: str diff --git a/core/streaming/__init__.py b/core/streaming/__init__.py index 0863b6e..bed8399 100644 --- a/core/streaming/__init__.py +++ b/core/streaming/__init__.py @@ -2,7 +2,7 @@ Streaming system for RTSP and HTTP camera feeds. Provides modular frame readers, buffers, and stream management. """ -from .readers import RTSPReader, HTTPSnapshotReader, fetch_snapshot +from .readers import RTSPReader, HTTPSnapshotReader from .buffers import FrameBuffer, CacheBuffer, shared_frame_buffer, shared_cache_buffer, save_frame_for_testing from .manager import StreamManager, StreamConfig, SubscriptionInfo, shared_stream_manager @@ -10,7 +10,6 @@ __all__ = [ # Readers 'RTSPReader', 'HTTPSnapshotReader', - 'fetch_snapshot', # Buffers 'FrameBuffer', diff --git a/core/streaming/buffers.py b/core/streaming/buffers.py index dbb8e73..875207c 100644 --- a/core/streaming/buffers.py +++ b/core/streaming/buffers.py @@ -1,37 +1,75 @@ """ -Frame buffering and caching system for stream management. -Provides efficient frame storage and retrieval for multiple consumers. +Frame buffering and caching system optimized for different stream formats. +Supports 1280x720 RTSP streams and 2560x1440 HTTP snapshots. """ import threading import time import cv2 import logging import numpy as np -from typing import Optional, Dict, Any +from typing import Optional, Dict, Any, Tuple from collections import defaultdict +from enum import Enum logger = logging.getLogger(__name__) +class StreamType(Enum): + """Stream type enumeration.""" + RTSP = "rtsp" # 1280x720 @ 6fps + HTTP = "http" # 2560x1440 high quality + + class FrameBuffer: - """Thread-safe frame buffer that stores the latest frame for each camera.""" + """Thread-safe frame buffer optimized for different stream types.""" def __init__(self, max_age_seconds: int = 5): self.max_age_seconds = max_age_seconds self._frames: Dict[str, Dict[str, Any]] = {} + self._stream_types: Dict[str, StreamType] = {} self._lock = threading.RLock() - def put_frame(self, camera_id: str, frame: np.ndarray): - """Store a frame for the given camera ID.""" + # Stream-specific settings + self.rtsp_config = { + 'width': 1280, + 'height': 720, + 'fps': 6, + 'max_size_mb': 3 # 1280x720x3 bytes = ~2.6MB + } + self.http_config = { + 'width': 2560, + 'height': 1440, + 'max_size_mb': 10 + } + + def put_frame(self, camera_id: str, frame: np.ndarray, stream_type: Optional[StreamType] = None): + """Store a frame for the given camera ID with type-specific validation.""" with self._lock: + # Detect stream type if not provided + if stream_type is None: + stream_type = self._detect_stream_type(frame) + + # Store stream type + self._stream_types[camera_id] = stream_type + + # Validate frame based on stream type + if not self._validate_frame(frame, stream_type): + logger.warning(f"Frame validation failed for camera {camera_id} ({stream_type.value})") + return + self._frames[camera_id] = { - 'frame': frame.copy(), # Make a copy to avoid reference issues + 'frame': frame.copy(), 'timestamp': time.time(), 'shape': frame.shape, - 'dtype': str(frame.dtype) + 'dtype': str(frame.dtype), + 'stream_type': stream_type.value, + 'size_mb': frame.nbytes / (1024 * 1024) } + logger.debug(f"Stored {stream_type.value} frame for camera {camera_id}: " + f"{frame.shape[1]}x{frame.shape[0]}, {frame.nbytes / (1024 * 1024):.2f}MB") + def get_frame(self, camera_id: str) -> Optional[np.ndarray]: """Get the latest frame for the given camera ID.""" with self._lock: @@ -45,6 +83,8 @@ class FrameBuffer: if age > self.max_age_seconds: logger.debug(f"Frame for camera {camera_id} is {age:.1f}s old, discarding") del self._frames[camera_id] + if camera_id in self._stream_types: + del self._stream_types[camera_id] return None return frame_data['frame'].copy() @@ -60,13 +100,17 @@ class FrameBuffer: if age > self.max_age_seconds: del self._frames[camera_id] + if camera_id in self._stream_types: + del self._stream_types[camera_id] return None return { 'timestamp': frame_data['timestamp'], 'age': age, 'shape': frame_data['shape'], - 'dtype': frame_data['dtype'] + 'dtype': frame_data['dtype'], + 'stream_type': frame_data.get('stream_type', 'unknown'), + 'size_mb': frame_data.get('size_mb', 0) } def has_frame(self, camera_id: str) -> bool: @@ -78,13 +122,16 @@ class FrameBuffer: with self._lock: if camera_id in self._frames: del self._frames[camera_id] - logger.debug(f"Cleared frames for camera {camera_id}") + if camera_id in self._stream_types: + del self._stream_types[camera_id] + logger.debug(f"Cleared frames for camera {camera_id}") def clear_all(self): """Clear all stored frames.""" with self._lock: count = len(self._frames) self._frames.clear() + self._stream_types.clear() logger.debug(f"Cleared all frames ({count} cameras)") def get_camera_list(self) -> list: @@ -104,6 +151,8 @@ class FrameBuffer: # Clean up expired frames for camera_id in expired_cameras: del self._frames[camera_id] + if camera_id in self._stream_types: + del self._stream_types[camera_id] return valid_cameras @@ -115,44 +164,110 @@ class FrameBuffer: 'total_cameras': len(self._frames), 'valid_cameras': 0, 'expired_cameras': 0, + 'rtsp_cameras': 0, + 'http_cameras': 0, + 'total_memory_mb': 0, 'cameras': {} } for camera_id, frame_data in self._frames.items(): age = current_time - frame_data['timestamp'] + stream_type = frame_data.get('stream_type', 'unknown') + size_mb = frame_data.get('size_mb', 0) + if age <= self.max_age_seconds: stats['valid_cameras'] += 1 else: stats['expired_cameras'] += 1 + if stream_type == StreamType.RTSP.value: + stats['rtsp_cameras'] += 1 + elif stream_type == StreamType.HTTP.value: + stats['http_cameras'] += 1 + + stats['total_memory_mb'] += size_mb + stats['cameras'][camera_id] = { 'age': age, 'valid': age <= self.max_age_seconds, 'shape': frame_data['shape'], - 'dtype': frame_data['dtype'] + 'dtype': frame_data['dtype'], + 'stream_type': stream_type, + 'size_mb': size_mb } return stats + def _detect_stream_type(self, frame: np.ndarray) -> StreamType: + """Detect stream type based on frame dimensions.""" + h, w = frame.shape[:2] + + # Check if it matches RTSP dimensions (1280x720) + if w == self.rtsp_config['width'] and h == self.rtsp_config['height']: + return StreamType.RTSP + + # Check if it matches HTTP dimensions (2560x1440) or close to it + if w >= 2000 and h >= 1000: + return StreamType.HTTP + + # Default based on size + if w <= 1920 and h <= 1080: + return StreamType.RTSP + else: + return StreamType.HTTP + + def _validate_frame(self, frame: np.ndarray, stream_type: StreamType) -> bool: + """Validate frame based on stream type.""" + if frame is None or frame.size == 0: + return False + + h, w = frame.shape[:2] + size_mb = frame.nbytes / (1024 * 1024) + + if stream_type == StreamType.RTSP: + config = self.rtsp_config + # Allow some tolerance for RTSP streams + if abs(w - config['width']) > 100 or abs(h - config['height']) > 100: + logger.warning(f"RTSP frame size mismatch: {w}x{h} (expected {config['width']}x{config['height']})") + if size_mb > config['max_size_mb']: + logger.warning(f"RTSP frame too large: {size_mb:.2f}MB (max {config['max_size_mb']}MB)") + return False + + elif stream_type == StreamType.HTTP: + config = self.http_config + # More flexible for HTTP snapshots + if size_mb > config['max_size_mb']: + logger.warning(f"HTTP snapshot too large: {size_mb:.2f}MB (max {config['max_size_mb']}MB)") + return False + + return True + class CacheBuffer: - """Enhanced frame cache with support for cropping and REST API access.""" + """Enhanced frame cache with support for cropping and optimized for different formats.""" def __init__(self, max_age_seconds: int = 10): self.frame_buffer = FrameBuffer(max_age_seconds) self._crop_cache: Dict[str, Dict[str, Any]] = {} self._cache_lock = threading.RLock() - def put_frame(self, camera_id: str, frame: np.ndarray): + # Quality settings for different stream types + self.jpeg_quality = { + StreamType.RTSP: 90, # Good quality for 720p + StreamType.HTTP: 95 # High quality for 2K + } + + def put_frame(self, camera_id: str, frame: np.ndarray, stream_type: Optional[StreamType] = None): """Store a frame and clear any associated crop cache.""" - self.frame_buffer.put_frame(camera_id, frame) + self.frame_buffer.put_frame(camera_id, frame, stream_type) # Clear crop cache for this camera since we have a new frame with self._cache_lock: - if camera_id in self._crop_cache: - del self._crop_cache[camera_id] + keys_to_remove = [key for key in self._crop_cache.keys() if key.startswith(f"{camera_id}_")] + for key in keys_to_remove: + del self._crop_cache[key] - def get_frame(self, camera_id: str, crop_coords: Optional[tuple] = None) -> Optional[np.ndarray]: + def get_frame(self, camera_id: str, crop_coords: Optional[Tuple[int, int, int, int]] = None) -> Optional[np.ndarray]: """Get frame with optional cropping.""" if crop_coords is None: return self.frame_buffer.get_frame(camera_id) @@ -175,6 +290,7 @@ class CacheBuffer: try: x1, y1, x2, y2 = crop_coords + # Ensure coordinates are within frame bounds h, w = original_frame.shape[:2] x1 = max(0, min(x1, w)) @@ -186,6 +302,14 @@ class CacheBuffer: # Cache the cropped frame with self._cache_lock: + # Limit cache size to prevent memory issues + if len(self._crop_cache) > 100: + # Remove oldest entries + oldest_keys = sorted(self._crop_cache.keys(), + key=lambda k: self._crop_cache[k]['timestamp'])[:50] + for key in oldest_keys: + del self._crop_cache[key] + self._crop_cache[crop_key] = { 'cropped_frame': cropped_frame.copy(), 'timestamp': time.time(), @@ -198,19 +322,33 @@ class CacheBuffer: logger.error(f"Error cropping frame for camera {camera_id}: {e}") return original_frame - def get_frame_as_jpeg(self, camera_id: str, crop_coords: Optional[tuple] = None, - quality: int = 100) -> Optional[bytes]: - """Get frame as JPEG bytes for HTTP responses with highest quality by default.""" + def get_frame_as_jpeg(self, camera_id: str, crop_coords: Optional[Tuple[int, int, int, int]] = None, + quality: Optional[int] = None) -> Optional[bytes]: + """Get frame as JPEG bytes with format-specific quality settings.""" frame = self.get_frame(camera_id, crop_coords) if frame is None: return None try: - # Encode as JPEG with specified quality (default 100 for highest) + # Determine quality based on stream type if not specified + if quality is None: + frame_info = self.frame_buffer.get_frame_info(camera_id) + if frame_info: + stream_type_str = frame_info.get('stream_type', StreamType.RTSP.value) + stream_type = StreamType.RTSP if stream_type_str == StreamType.RTSP.value else StreamType.HTTP + quality = self.jpeg_quality[stream_type] + else: + quality = 90 # Default + + # Encode as JPEG with specified quality encode_params = [cv2.IMWRITE_JPEG_QUALITY, quality] success, encoded_img = cv2.imencode('.jpg', frame, encode_params) + if success: - return encoded_img.tobytes() + jpeg_bytes = encoded_img.tobytes() + logger.debug(f"Encoded JPEG for camera {camera_id}: quality={quality}, size={len(jpeg_bytes)} bytes") + return jpeg_bytes + return None except Exception as e: @@ -243,12 +381,17 @@ class CacheBuffer: with self._cache_lock: cache_stats = { 'crop_cache_entries': len(self._crop_cache), - 'crop_cache_cameras': len(set(key.split('_')[0] for key in self._crop_cache.keys())) + 'crop_cache_cameras': len(set(key.split('_')[0] for key in self._crop_cache.keys() if '_' in key)), + 'crop_cache_memory_mb': sum( + entry['cropped_frame'].nbytes / (1024 * 1024) + for entry in self._crop_cache.values() + ) } return { 'buffer': buffer_stats, - 'cache': cache_stats + 'cache': cache_stats, + 'total_memory_mb': buffer_stats.get('total_memory_mb', 0) + cache_stats.get('crop_cache_memory_mb', 0) } @@ -267,9 +410,19 @@ def save_frame_for_testing(camera_id: str, frame: np.ndarray, test_dir: str = "t filename = f"{camera_id}_{timestamp}.jpg" filepath = os.path.join(test_dir, filename) - success = cv2.imwrite(filepath, frame) + # Use appropriate quality based on frame size + h, w = frame.shape[:2] + if w >= 2000: # High resolution + quality = 95 + else: # Standard resolution + quality = 90 + + encode_params = [cv2.IMWRITE_JPEG_QUALITY, quality] + success = cv2.imwrite(filepath, frame, encode_params) + if success: - logger.info(f"Saved test frame: {filepath}") + size_kb = os.path.getsize(filepath) / 1024 + logger.info(f"Saved test frame: {filepath} ({w}x{h}, {size_kb:.1f}KB)") else: logger.error(f"Failed to save test frame: {filepath}") diff --git a/core/streaming/manager.py b/core/streaming/manager.py index 893f128..ea6fb20 100644 --- a/core/streaming/manager.py +++ b/core/streaming/manager.py @@ -1,6 +1,6 @@ """ Stream coordination and lifecycle management. -Handles shared streams, subscription reconciliation, and resource optimization. +Optimized for 1280x720@6fps RTSP and 2560x1440 HTTP snapshots. """ import logging import threading @@ -10,7 +10,7 @@ from dataclasses import dataclass from collections import defaultdict from .readers import RTSPReader, HTTPSnapshotReader -from .buffers import shared_cache_buffer, save_frame_for_testing +from .buffers import shared_cache_buffer, save_frame_for_testing, StreamType from ..tracking.integration import TrackingPipelineIntegration @@ -174,8 +174,11 @@ class StreamManager: def _frame_callback(self, camera_id: str, frame): """Callback for when a new frame is available.""" try: - # Store frame in shared buffer - shared_cache_buffer.put_frame(camera_id, frame) + # Detect stream type based on frame dimensions + stream_type = self._detect_stream_type(frame) + + # Store frame in shared buffer with stream type + shared_cache_buffer.put_frame(camera_id, frame, stream_type) # Save test frames if enabled for any subscription with self._lock: @@ -406,23 +409,56 @@ class StreamManager: stats[subscription_id] = subscription_info.tracking_integration.get_statistics() return stats + def _detect_stream_type(self, frame) -> StreamType: + """Detect stream type based on frame dimensions.""" + if frame is None: + return StreamType.RTSP # Default + + h, w = frame.shape[:2] + + # RTSP: 1280x720 + if w == 1280 and h == 720: + return StreamType.RTSP + + # HTTP: 2560x1440 or larger + if w >= 2000 and h >= 1000: + return StreamType.HTTP + + # Default based on size + if w <= 1920 and h <= 1080: + return StreamType.RTSP + else: + return StreamType.HTTP + def get_stats(self) -> Dict[str, Any]: """Get comprehensive streaming statistics.""" with self._lock: buffer_stats = shared_cache_buffer.get_stats() tracking_stats = self.get_tracking_stats() + # Add stream type information + stream_types = {} + for camera_id in self._streams.keys(): + if isinstance(self._streams[camera_id], RTSPReader): + stream_types[camera_id] = 'rtsp' + elif isinstance(self._streams[camera_id], HTTPSnapshotReader): + stream_types[camera_id] = 'http' + else: + stream_types[camera_id] = 'unknown' + return { 'active_subscriptions': len(self._subscriptions), 'active_streams': len(self._streams), 'cameras_with_subscribers': len(self._camera_subscribers), 'max_streams': self.max_streams, + 'stream_types': stream_types, 'subscriptions_by_camera': { camera_id: len(subscribers) for camera_id, subscribers in self._camera_subscribers.items() }, 'buffer_stats': buffer_stats, - 'tracking_stats': tracking_stats + 'tracking_stats': tracking_stats, + 'memory_usage_mb': buffer_stats.get('total_memory_mb', 0) } diff --git a/core/streaming/readers.py b/core/streaming/readers.py index f2da909..e6856d8 100644 --- a/core/streaming/readers.py +++ b/core/streaming/readers.py @@ -1,6 +1,6 @@ """ Frame readers for RTSP streams and HTTP snapshots. -Extracted from app.py for modular architecture. +Optimized for 1280x720@6fps RTSP and 2560x1440 HTTP snapshots. """ import cv2 import logging @@ -8,15 +8,19 @@ import time import threading import requests import numpy as np +import os from typing import Optional, Callable -from queue import Queue +# Suppress FFMPEG/H.264 error messages if needed +# Set this environment variable to reduce noise from decoder errors +os.environ["OPENCV_LOG_LEVEL"] = "ERROR" +os.environ["OPENCV_FFMPEG_LOGLEVEL"] = "-8" # Suppress FFMPEG warnings logger = logging.getLogger(__name__) class RTSPReader: - """RTSP stream frame reader using OpenCV.""" + """RTSP stream frame reader optimized for 1280x720 @ 6fps streams.""" def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3): self.camera_id = camera_id @@ -27,6 +31,17 @@ class RTSPReader: self.thread = None self.frame_callback: Optional[Callable] = None + # Expected stream specifications + self.expected_width = 1280 + self.expected_height = 720 + self.expected_fps = 6 + + # Frame processing parameters + self.frame_interval = 1.0 / self.expected_fps # ~167ms for 6fps + self.error_recovery_delay = 2.0 + self.max_consecutive_errors = 10 + self.stream_timeout = 30.0 + def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]): """Set callback function to handle captured frames.""" self.frame_callback = callback @@ -52,212 +67,186 @@ class RTSPReader: logger.info(f"Stopped RTSP reader for camera {self.camera_id}") def _read_frames(self): - """Main frame reading loop with improved error handling and stream recovery.""" - retries = 0 + """Main frame reading loop with H.264 error recovery.""" + consecutive_errors = 0 frame_count = 0 last_log_time = time.time() - consecutive_errors = 0 last_successful_frame_time = time.time() + last_frame_time = 0 - try: - # Initialize video capture with optimized parameters - self._initialize_capture() - - while not self.stop_event.is_set(): - try: - # Check if stream needs recovery - if not self.cap or not self.cap.isOpened(): - logger.warning(f"Camera {self.camera_id} not open, reinitializing") - self._initialize_capture() - time.sleep(1) + while not self.stop_event.is_set(): + try: + # Initialize/reinitialize capture if needed + if not self.cap or not self.cap.isOpened(): + if not self._initialize_capture(): + time.sleep(self.error_recovery_delay) continue - - # Check for stream timeout (no frames for 30 seconds) - if time.time() - last_successful_frame_time > 30: - logger.warning(f"Camera {self.camera_id} stream timeout, reinitializing") - self._initialize_capture() - last_successful_frame_time = time.time() - continue - - ret, frame = self.cap.read() - - if not ret or frame is None: - consecutive_errors += 1 - logger.warning(f"Failed to read frame from camera {self.camera_id} (consecutive errors: {consecutive_errors})") - - # Force stream recovery after multiple consecutive errors - if consecutive_errors >= 5: - logger.warning(f"Camera {self.camera_id}: Too many consecutive errors, reinitializing stream") - self._initialize_capture() - consecutive_errors = 0 - continue - - retries += 1 - if retries > self.max_retries and self.max_retries != -1: - logger.error(f"Max retries reached for camera {self.camera_id}") - break - time.sleep(0.1) - continue - - # Skip frame validation for now - let YOLO handle corrupted frames - # if not self._is_frame_valid(frame): - # logger.debug(f"Invalid frame detected for camera {self.camera_id}, skipping") - # consecutive_errors += 1 - # if consecutive_errors >= 10: # Reinitialize after many invalid frames - # logger.warning(f"Camera {self.camera_id}: Too many invalid frames, reinitializing") - # self._initialize_capture() - # consecutive_errors = 0 - # continue - - # Reset counters on successful read - retries = 0 - consecutive_errors = 0 - frame_count += 1 last_successful_frame_time = time.time() - # Call frame callback if set - if self.frame_callback: - self.frame_callback(self.camera_id, frame) + # Check for stream timeout + if time.time() - last_successful_frame_time > self.stream_timeout: + logger.warning(f"Camera {self.camera_id}: Stream timeout, reinitializing") + self._reinitialize_capture() + last_successful_frame_time = time.time() + continue - # Log progress every 30 seconds - current_time = time.time() - if current_time - last_log_time >= 30: - logger.info(f"Camera {self.camera_id}: {frame_count} frames processed, {consecutive_errors} consecutive errors") - last_log_time = current_time + # Rate limiting for 6fps + current_time = time.time() + if current_time - last_frame_time < self.frame_interval: + time.sleep(0.01) # Small sleep to avoid busy waiting + continue - # Adaptive delay based on stream FPS and performance - if consecutive_errors == 0: - # Calculate frame delay based on actual FPS - try: - actual_fps = self.cap.get(cv2.CAP_PROP_FPS) - if actual_fps > 0 and actual_fps <= 120: # Reasonable bounds - delay = 1.0 / actual_fps - # Mock cam: 60fps -> ~16.7ms delay - # Real cam: 6fps -> ~167ms delay - else: - # Fallback for invalid FPS values - delay = 0.033 # Default 30 FPS (33ms) - except Exception as e: - logger.debug(f"Failed to get FPS for delay calculation: {e}") - delay = 0.033 # Fallback to 30 FPS - else: - delay = 0.1 # Slower when having issues (100ms) + ret, frame = self.cap.read() - time.sleep(delay) - - except Exception as e: - logger.error(f"Error reading frame from camera {self.camera_id}: {e}") + if not ret or frame is None: consecutive_errors += 1 - retries += 1 - # Force reinitialization on severe errors - if consecutive_errors >= 3: - logger.warning(f"Camera {self.camera_id}: Severe errors detected, reinitializing stream") - self._initialize_capture() + if consecutive_errors >= self.max_consecutive_errors: + logger.error(f"Camera {self.camera_id}: Too many consecutive errors, reinitializing") + self._reinitialize_capture() consecutive_errors = 0 + time.sleep(self.error_recovery_delay) + else: + # Skip corrupted frame and continue + logger.debug(f"Camera {self.camera_id}: Frame read failed (error {consecutive_errors})") + time.sleep(0.1) + continue - if retries > self.max_retries and self.max_retries != -1: - break - time.sleep(1) + # Validate frame dimensions + if frame.shape[1] != self.expected_width or frame.shape[0] != self.expected_height: + logger.warning(f"Camera {self.camera_id}: Unexpected frame dimensions {frame.shape[1]}x{frame.shape[0]}") + # Try to resize if dimensions are wrong + if frame.shape[1] > 0 and frame.shape[0] > 0: + frame = cv2.resize(frame, (self.expected_width, self.expected_height)) + else: + consecutive_errors += 1 + continue - except Exception as e: - logger.error(f"Fatal error in RTSP reader for camera {self.camera_id}: {e}") - finally: - if self.cap: - self.cap.release() - logger.info(f"RTSP reader thread ended for camera {self.camera_id}") + # Check for corrupted frames (all black, all white, excessive noise) + if self._is_frame_corrupted(frame): + logger.debug(f"Camera {self.camera_id}: Corrupted frame detected, skipping") + consecutive_errors += 1 + continue - def _initialize_capture(self): - """Initialize or reinitialize video capture with optimized settings.""" + # Frame is valid + consecutive_errors = 0 + frame_count += 1 + last_successful_frame_time = time.time() + last_frame_time = current_time + + # Call frame callback + if self.frame_callback: + try: + self.frame_callback(self.camera_id, frame) + except Exception as e: + logger.error(f"Camera {self.camera_id}: Frame callback error: {e}") + + # Log progress every 30 seconds + if current_time - last_log_time >= 30: + logger.info(f"Camera {self.camera_id}: {frame_count} frames processed") + last_log_time = current_time + + except Exception as e: + logger.error(f"Camera {self.camera_id}: Error in frame reading loop: {e}") + consecutive_errors += 1 + if consecutive_errors >= self.max_consecutive_errors: + self._reinitialize_capture() + consecutive_errors = 0 + time.sleep(self.error_recovery_delay) + + # Cleanup + if self.cap: + self.cap.release() + logger.info(f"RTSP reader thread ended for camera {self.camera_id}") + + def _initialize_capture(self) -> bool: + """Initialize video capture with optimized settings for 1280x720@6fps.""" try: # Release previous capture if exists if self.cap: self.cap.release() - time.sleep(0.1) + time.sleep(0.5) - # Create new capture with enhanced RTSP URL parameters - enhanced_url = self._enhance_rtsp_url(self.rtsp_url) - logger.debug(f"Initializing capture for camera {self.camera_id} with URL: {enhanced_url}") + logger.info(f"Initializing capture for camera {self.camera_id}") - self.cap = cv2.VideoCapture(enhanced_url) + # Create capture with FFMPEG backend + self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG) if not self.cap.isOpened(): - # Try again with different backend - logger.debug(f"Retrying capture initialization with different backend for camera {self.camera_id}") - self.cap = cv2.VideoCapture(enhanced_url, cv2.CAP_FFMPEG) - - if self.cap.isOpened(): - # Get actual stream properties first - width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) - height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) - fps = self.cap.get(cv2.CAP_PROP_FPS) - - # Adaptive buffer settings based on FPS and resolution - # Mock cam: 1920x1080@60fps, Real cam: 1280x720@6fps - if fps > 30: - # High FPS streams (like mock cam) need larger buffer - buffer_size = 5 - elif fps > 15: - # Medium FPS streams - buffer_size = 3 - else: - # Low FPS streams (like real cam) can use smaller buffer - buffer_size = 2 - - # Apply buffer size with bounds checking - try: - self.cap.set(cv2.CAP_PROP_BUFFERSIZE, buffer_size) - actual_buffer = int(self.cap.get(cv2.CAP_PROP_BUFFERSIZE)) - logger.debug(f"Camera {self.camera_id}: Buffer size set to {buffer_size}, actual: {actual_buffer}") - except Exception as e: - logger.warning(f"Failed to set buffer size for camera {self.camera_id}: {e}") - - # Don't override FPS - let stream use its natural rate - # This works for both mock cam (60fps) and real cam (6fps) - logger.debug(f"Camera {self.camera_id}: Using native FPS {fps}") - - # Additional optimization for high resolution streams - if width * height > 1920 * 1080: - logger.info(f"Camera {self.camera_id}: High resolution stream detected, applying optimizations") - - logger.info(f"Camera {self.camera_id} initialized: {width}x{height}, FPS: {fps}, Buffer: {buffer_size}") - return True - else: - logger.error(f"Failed to initialize camera {self.camera_id}") + logger.error(f"Failed to open stream for camera {self.camera_id}") return False + # Set capture properties for 1280x720@6fps + self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.expected_width) + self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.expected_height) + self.cap.set(cv2.CAP_PROP_FPS, self.expected_fps) + + # Set small buffer to reduce latency and avoid accumulating corrupted frames + self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) + + # Set FFMPEG options for better H.264 handling + self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) + + # Verify stream properties + actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + actual_fps = self.cap.get(cv2.CAP_PROP_FPS) + + logger.info(f"Camera {self.camera_id} initialized: {actual_width}x{actual_height} @ {actual_fps}fps") + + # Read and discard first few frames to stabilize stream + for _ in range(5): + ret, _ = self.cap.read() + if not ret: + logger.warning(f"Camera {self.camera_id}: Failed to read initial frames") + time.sleep(0.1) + + return True + except Exception as e: logger.error(f"Error initializing capture for camera {self.camera_id}: {e}") return False - def _enhance_rtsp_url(self, rtsp_url: str) -> str: - """Use RTSP URL exactly as provided by backend without modification.""" - return rtsp_url + def _reinitialize_capture(self): + """Reinitialize capture after errors.""" + logger.info(f"Reinitializing capture for camera {self.camera_id}") + if self.cap: + self.cap.release() + self.cap = None + time.sleep(1.0) + self._initialize_capture() - def _is_frame_valid(self, frame) -> bool: - """Validate frame integrity to detect corrupted frames.""" - if frame is None: - return False + def _is_frame_corrupted(self, frame: np.ndarray) -> bool: + """Check if frame is corrupted (all black, all white, or excessive noise).""" + if frame is None or frame.size == 0: + return True - # Check frame dimensions - if frame.shape[0] < 10 or frame.shape[1] < 10: - return False + # Check mean and standard deviation + mean = np.mean(frame) + std = np.std(frame) - # Check if frame is completely black or completely white (possible corruption) - mean_val = np.mean(frame) - if mean_val < 1 or mean_val > 254: - return False + # All black or all white + if mean < 5 or mean > 250: + return True - # Check for excessive noise/corruption (very high standard deviation) - std_val = np.std(frame) - if std_val > 100: # Threshold for detecting very noisy/corrupted frames - return False + # No variation (stuck frame) + if std < 1: + return True - return True + # Excessive noise (corrupted H.264 decode) + # Calculate edge density as corruption indicator + edges = cv2.Canny(frame, 50, 150) + edge_density = np.sum(edges > 0) / edges.size + + # Too many edges indicate corruption + if edge_density > 0.5: + return True + + return False class HTTPSnapshotReader: - """HTTP snapshot reader for periodic image capture.""" + """HTTP snapshot reader optimized for 2560x1440 (2K) high quality images.""" def __init__(self, camera_id: str, snapshot_url: str, interval_ms: int = 5000, max_retries: int = 3): self.camera_id = camera_id @@ -268,6 +257,11 @@ class HTTPSnapshotReader: self.thread = None self.frame_callback: Optional[Callable] = None + # Expected snapshot specifications + self.expected_width = 2560 + self.expected_height = 1440 + self.max_file_size = 10 * 1024 * 1024 # 10MB max for 2K image + def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]): """Set callback function to handle captured frames.""" self.frame_callback = callback @@ -291,7 +285,7 @@ class HTTPSnapshotReader: logger.info(f"Stopped snapshot reader for camera {self.camera_id}") def _read_snapshots(self): - """Main snapshot reading loop.""" + """Main snapshot reading loop for high quality 2K images.""" retries = 0 frame_count = 0 last_log_time = time.time() @@ -299,66 +293,78 @@ class HTTPSnapshotReader: logger.info(f"Snapshot interval for camera {self.camera_id}: {interval_seconds}s") - try: - while not self.stop_event.is_set(): - try: - start_time = time.time() - frame = self._fetch_snapshot() + while not self.stop_event.is_set(): + try: + start_time = time.time() + frame = self._fetch_snapshot() - if frame is None: - logger.warning(f"Failed to fetch snapshot for camera {self.camera_id}, retry {retries+1}/{self.max_retries}") - retries += 1 - if retries > self.max_retries and self.max_retries != -1: - logger.error(f"Max retries reached for snapshot camera {self.camera_id}") - break - time.sleep(1) - continue - - # Reset retry counter on successful fetch - retries = 0 - frame_count += 1 - - # Call frame callback if set - if self.frame_callback: - self.frame_callback(self.camera_id, frame) - - # Log progress every 30 seconds - current_time = time.time() - if current_time - last_log_time >= 30: - logger.info(f"Camera {self.camera_id}: {frame_count} snapshots processed") - last_log_time = current_time - - # Wait for next interval, accounting for processing time - elapsed = time.time() - start_time - sleep_time = max(0, interval_seconds - elapsed) - if sleep_time > 0: - self.stop_event.wait(sleep_time) - - except Exception as e: - logger.error(f"Error fetching snapshot for camera {self.camera_id}: {e}") + if frame is None: retries += 1 - if retries > self.max_retries and self.max_retries != -1: - break - time.sleep(1) + logger.warning(f"Failed to fetch snapshot for camera {self.camera_id}, retry {retries}/{self.max_retries}") - except Exception as e: - logger.error(f"Fatal error in snapshot reader for camera {self.camera_id}: {e}") - finally: - logger.info(f"Snapshot reader thread ended for camera {self.camera_id}") + if self.max_retries != -1 and retries > self.max_retries: + logger.error(f"Max retries reached for snapshot camera {self.camera_id}") + break + + time.sleep(min(2.0, interval_seconds)) + continue + + # Validate image dimensions + if frame.shape[1] != self.expected_width or frame.shape[0] != self.expected_height: + logger.info(f"Camera {self.camera_id}: Snapshot dimensions {frame.shape[1]}x{frame.shape[0]} " + f"(expected {self.expected_width}x{self.expected_height})") + # Resize if needed (maintaining aspect ratio for high quality) + if frame.shape[1] > 0 and frame.shape[0] > 0: + # Only resize if significantly different + if abs(frame.shape[1] - self.expected_width) > 100: + frame = self._resize_maintain_aspect(frame, self.expected_width, self.expected_height) + + # Reset retry counter on successful fetch + retries = 0 + frame_count += 1 + + # Call frame callback + if self.frame_callback: + try: + self.frame_callback(self.camera_id, frame) + except Exception as e: + logger.error(f"Camera {self.camera_id}: Frame callback error: {e}") + + # Log progress every 30 seconds + current_time = time.time() + if current_time - last_log_time >= 30: + logger.info(f"Camera {self.camera_id}: {frame_count} snapshots processed") + last_log_time = current_time + + # Wait for next interval + elapsed = time.time() - start_time + sleep_time = max(0, interval_seconds - elapsed) + if sleep_time > 0: + self.stop_event.wait(sleep_time) + + except Exception as e: + logger.error(f"Error in snapshot loop for camera {self.camera_id}: {e}") + retries += 1 + if self.max_retries != -1 and retries > self.max_retries: + break + time.sleep(min(2.0, interval_seconds)) + + logger.info(f"Snapshot reader thread ended for camera {self.camera_id}") def _fetch_snapshot(self) -> Optional[np.ndarray]: - """Fetch a single snapshot from HTTP URL.""" + """Fetch a single high quality snapshot from HTTP URL.""" try: - # Parse URL to extract auth credentials if present + # Parse URL for authentication from urllib.parse import urlparse parsed_url = urlparse(self.snapshot_url) - # Prepare headers with proper authentication - headers = {} + headers = { + 'User-Agent': 'Python-Detector-Worker/1.0', + 'Accept': 'image/jpeg, image/png, image/*' + } auth = None if parsed_url.username and parsed_url.password: - # Use HTTP Basic Auth properly from requests.auth import HTTPBasicAuth, HTTPDigestAuth auth = HTTPBasicAuth(parsed_url.username, parsed_url.password) @@ -370,71 +376,76 @@ class HTTPSnapshotReader: if parsed_url.query: clean_url += f"?{parsed_url.query}" - # Try with Basic Auth first - response = requests.get(clean_url, auth=auth, timeout=10, headers=headers) + # Try Basic Auth first + response = requests.get(clean_url, auth=auth, timeout=15, headers=headers, + stream=True, verify=False) - # If Basic Auth fails, try Digest Auth (common for IP cameras) + # If Basic Auth fails, try Digest Auth if response.status_code == 401: auth = HTTPDigestAuth(parsed_url.username, parsed_url.password) - response = requests.get(clean_url, auth=auth, timeout=10, headers=headers) + response = requests.get(clean_url, auth=auth, timeout=15, headers=headers, + stream=True, verify=False) else: - # No auth in URL, use as-is - response = requests.get(self.snapshot_url, timeout=10, headers=headers) + response = requests.get(self.snapshot_url, timeout=15, headers=headers, + stream=True, verify=False) if response.status_code == 200: - # Convert bytes to numpy array - image_array = np.frombuffer(response.content, np.uint8) - # Decode as image + # Check content size + content_length = int(response.headers.get('content-length', 0)) + if content_length > self.max_file_size: + logger.warning(f"Snapshot too large for camera {self.camera_id}: {content_length} bytes") + return None + + # Read content + content = response.content + + # Convert to numpy array + image_array = np.frombuffer(content, np.uint8) + + # Decode as high quality image frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR) + + if frame is None: + logger.error(f"Failed to decode snapshot for camera {self.camera_id}") + return None + + logger.debug(f"Fetched snapshot for camera {self.camera_id}: {frame.shape[1]}x{frame.shape[0]}") return frame else: - logger.warning(f"HTTP {response.status_code} from {self.snapshot_url}") + logger.warning(f"HTTP {response.status_code} from {self.camera_id}") return None + except requests.RequestException as e: - logger.error(f"Request error fetching snapshot: {e}") + logger.error(f"Request error fetching snapshot for {self.camera_id}: {e}") return None except Exception as e: - logger.error(f"Error decoding snapshot: {e}") + logger.error(f"Error decoding snapshot for {self.camera_id}: {e}") return None + def _resize_maintain_aspect(self, frame: np.ndarray, target_width: int, target_height: int) -> np.ndarray: + """Resize image while maintaining aspect ratio for high quality.""" + h, w = frame.shape[:2] + aspect = w / h + target_aspect = target_width / target_height -def fetch_snapshot(url: str) -> Optional[np.ndarray]: - """Standalone function to fetch a snapshot (for compatibility).""" - try: - # Parse URL to extract auth credentials if present - from urllib.parse import urlparse - parsed_url = urlparse(url) - - auth = None - if parsed_url.username and parsed_url.password: - # Use HTTP Basic Auth properly - from requests.auth import HTTPBasicAuth, HTTPDigestAuth - auth = HTTPBasicAuth(parsed_url.username, parsed_url.password) - - # Reconstruct URL without credentials - clean_url = f"{parsed_url.scheme}://{parsed_url.hostname}" - if parsed_url.port: - clean_url += f":{parsed_url.port}" - clean_url += parsed_url.path - if parsed_url.query: - clean_url += f"?{parsed_url.query}" - - # Try with Basic Auth first - response = requests.get(clean_url, auth=auth, timeout=10) - - # If Basic Auth fails, try Digest Auth (common for IP cameras) - if response.status_code == 401: - auth = HTTPDigestAuth(parsed_url.username, parsed_url.password) - response = requests.get(clean_url, auth=auth, timeout=10) + if aspect > target_aspect: + # Image is wider + new_width = target_width + new_height = int(target_width / aspect) else: - # No auth in URL, use as-is - response = requests.get(url, timeout=10) + # Image is taller + new_height = target_height + new_width = int(target_height * aspect) - if response.status_code == 200: - image_array = np.frombuffer(response.content, np.uint8) - frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR) - return frame - return None - except Exception as e: - logger.error(f"Error fetching snapshot from {url}: {e}") - return None \ No newline at end of file + # Use INTER_LANCZOS4 for high quality downsampling + resized = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4) + + # Pad to target size if needed + if new_width < target_width or new_height < target_height: + top = (target_height - new_height) // 2 + bottom = target_height - new_height - top + left = (target_width - new_width) // 2 + right = target_width - new_width - left + resized = cv2.copyMakeBorder(resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=[0, 0, 0]) + + return resized \ No newline at end of file