refactor: simplify frame handling by removing stream type management and enhancing validation
All checks were successful
Build Worker Base and Application Images / check-base-changes (push) Successful in 7s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 2m55s
Build Worker Base and Application Images / deploy-stack (push) Successful in 12s

This commit is contained in:
Siwat Sirichai 2025-09-26 00:07:48 +07:00
parent dc1db635d0
commit 719d16ae4d
4 changed files with 51 additions and 182 deletions

View file

@ -0,0 +1,9 @@
{
"permissions": {
"allow": [
"Bash(dir:*)"
],
"deny": [],
"ask": []
}
}

View file

@ -9,53 +9,25 @@ import logging
import numpy as np import numpy as np
from typing import Optional, Dict, Any, Tuple from typing import Optional, Dict, Any, Tuple
from collections import defaultdict from collections import defaultdict
from enum import Enum
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class StreamType(Enum):
"""Stream type enumeration."""
RTSP = "rtsp" # 1280x720 @ 6fps
HTTP = "http" # 2560x1440 high quality
class FrameBuffer: class FrameBuffer:
"""Thread-safe frame buffer optimized for different stream types.""" """Thread-safe frame buffer for all camera streams."""
def __init__(self, max_age_seconds: int = 5): def __init__(self, max_age_seconds: int = 5):
self.max_age_seconds = max_age_seconds self.max_age_seconds = max_age_seconds
self._frames: Dict[str, Dict[str, Any]] = {} self._frames: Dict[str, Dict[str, Any]] = {}
self._stream_types: Dict[str, StreamType] = {}
self._lock = threading.RLock() self._lock = threading.RLock()
# Stream-specific settings def put_frame(self, camera_id: str, frame: np.ndarray):
self.rtsp_config = { """Store a frame for the given camera ID."""
'width': 1280,
'height': 720,
'fps': 6,
'max_size_mb': 3 # 1280x720x3 bytes = ~2.6MB
}
self.http_config = {
'width': 2560,
'height': 1440,
'max_size_mb': 10
}
def put_frame(self, camera_id: str, frame: np.ndarray, stream_type: Optional[StreamType] = None):
"""Store a frame for the given camera ID with type-specific validation."""
with self._lock: with self._lock:
# Detect stream type if not provided # Validate frame
if stream_type is None: if not self._validate_frame(frame):
stream_type = self._detect_stream_type(frame) logger.warning(f"Frame validation failed for camera {camera_id}")
# Store stream type
self._stream_types[camera_id] = stream_type
# Validate frame based on stream type
if not self._validate_frame(frame, stream_type):
logger.warning(f"Frame validation failed for camera {camera_id} ({stream_type.value})")
return return
self._frames[camera_id] = { self._frames[camera_id] = {
@ -63,14 +35,9 @@ class FrameBuffer:
'timestamp': time.time(), 'timestamp': time.time(),
'shape': frame.shape, 'shape': frame.shape,
'dtype': str(frame.dtype), 'dtype': str(frame.dtype),
'stream_type': stream_type.value,
'size_mb': frame.nbytes / (1024 * 1024) 'size_mb': frame.nbytes / (1024 * 1024)
} }
# Commented out verbose frame storage logging
# logger.debug(f"Stored {stream_type.value} frame for camera {camera_id}: "
# f"{frame.shape[1]}x{frame.shape[0]}, {frame.nbytes / (1024 * 1024):.2f}MB")
def get_frame(self, camera_id: str) -> Optional[np.ndarray]: def get_frame(self, camera_id: str) -> Optional[np.ndarray]:
"""Get the latest frame for the given camera ID.""" """Get the latest frame for the given camera ID."""
with self._lock: with self._lock:
@ -84,8 +51,6 @@ class FrameBuffer:
if age > self.max_age_seconds: if age > self.max_age_seconds:
logger.debug(f"Frame for camera {camera_id} is {age:.1f}s old, discarding") logger.debug(f"Frame for camera {camera_id} is {age:.1f}s old, discarding")
del self._frames[camera_id] del self._frames[camera_id]
if camera_id in self._stream_types:
del self._stream_types[camera_id]
return None return None
return frame_data['frame'].copy() return frame_data['frame'].copy()
@ -101,8 +66,6 @@ class FrameBuffer:
if age > self.max_age_seconds: if age > self.max_age_seconds:
del self._frames[camera_id] del self._frames[camera_id]
if camera_id in self._stream_types:
del self._stream_types[camera_id]
return None return None
return { return {
@ -110,7 +73,6 @@ class FrameBuffer:
'age': age, 'age': age,
'shape': frame_data['shape'], 'shape': frame_data['shape'],
'dtype': frame_data['dtype'], 'dtype': frame_data['dtype'],
'stream_type': frame_data.get('stream_type', 'unknown'),
'size_mb': frame_data.get('size_mb', 0) 'size_mb': frame_data.get('size_mb', 0)
} }
@ -123,8 +85,6 @@ class FrameBuffer:
with self._lock: with self._lock:
if camera_id in self._frames: if camera_id in self._frames:
del self._frames[camera_id] del self._frames[camera_id]
if camera_id in self._stream_types:
del self._stream_types[camera_id]
logger.debug(f"Cleared frames for camera {camera_id}") logger.debug(f"Cleared frames for camera {camera_id}")
def clear_all(self): def clear_all(self):
@ -132,7 +92,6 @@ class FrameBuffer:
with self._lock: with self._lock:
count = len(self._frames) count = len(self._frames)
self._frames.clear() self._frames.clear()
self._stream_types.clear()
logger.debug(f"Cleared all frames ({count} cameras)") logger.debug(f"Cleared all frames ({count} cameras)")
def get_camera_list(self) -> list: def get_camera_list(self) -> list:
@ -152,8 +111,6 @@ class FrameBuffer:
# Clean up expired frames # Clean up expired frames
for camera_id in expired_cameras: for camera_id in expired_cameras:
del self._frames[camera_id] del self._frames[camera_id]
if camera_id in self._stream_types:
del self._stream_types[camera_id]
return valid_cameras return valid_cameras
@ -165,15 +122,12 @@ class FrameBuffer:
'total_cameras': len(self._frames), 'total_cameras': len(self._frames),
'valid_cameras': 0, 'valid_cameras': 0,
'expired_cameras': 0, 'expired_cameras': 0,
'rtsp_cameras': 0,
'http_cameras': 0,
'total_memory_mb': 0, 'total_memory_mb': 0,
'cameras': {} 'cameras': {}
} }
for camera_id, frame_data in self._frames.items(): for camera_id, frame_data in self._frames.items():
age = current_time - frame_data['timestamp'] age = current_time - frame_data['timestamp']
stream_type = frame_data.get('stream_type', 'unknown')
size_mb = frame_data.get('size_mb', 0) size_mb = frame_data.get('size_mb', 0)
if age <= self.max_age_seconds: if age <= self.max_age_seconds:
@ -181,11 +135,6 @@ class FrameBuffer:
else: else:
stats['expired_cameras'] += 1 stats['expired_cameras'] += 1
if stream_type == StreamType.RTSP.value:
stats['rtsp_cameras'] += 1
elif stream_type == StreamType.HTTP.value:
stats['http_cameras'] += 1
stats['total_memory_mb'] += size_mb stats['total_memory_mb'] += size_mb
stats['cameras'][camera_id] = { stats['cameras'][camera_id] = {
@ -193,74 +142,45 @@ class FrameBuffer:
'valid': age <= self.max_age_seconds, 'valid': age <= self.max_age_seconds,
'shape': frame_data['shape'], 'shape': frame_data['shape'],
'dtype': frame_data['dtype'], 'dtype': frame_data['dtype'],
'stream_type': stream_type,
'size_mb': size_mb 'size_mb': size_mb
} }
return stats return stats
def _detect_stream_type(self, frame: np.ndarray) -> StreamType: def _validate_frame(self, frame: np.ndarray) -> bool:
"""Detect stream type based on frame dimensions.""" """Validate frame - basic validation for any stream type."""
h, w = frame.shape[:2]
# Check if it matches RTSP dimensions (1280x720)
if w == self.rtsp_config['width'] and h == self.rtsp_config['height']:
return StreamType.RTSP
# Check if it matches HTTP dimensions (2560x1440) or close to it
if w >= 2000 and h >= 1000:
return StreamType.HTTP
# Default based on size
if w <= 1920 and h <= 1080:
return StreamType.RTSP
else:
return StreamType.HTTP
def _validate_frame(self, frame: np.ndarray, stream_type: StreamType) -> bool:
"""Validate frame based on stream type."""
if frame is None or frame.size == 0: if frame is None or frame.size == 0:
return False return False
h, w = frame.shape[:2] h, w = frame.shape[:2]
size_mb = frame.nbytes / (1024 * 1024) size_mb = frame.nbytes / (1024 * 1024)
if stream_type == StreamType.RTSP: # Basic size validation - reject extremely large frames regardless of type
config = self.rtsp_config max_size_mb = 50 # Generous limit for any frame type
# Allow some tolerance for RTSP streams if size_mb > max_size_mb:
if abs(w - config['width']) > 100 or abs(h - config['height']) > 100: logger.warning(f"Frame too large: {size_mb:.2f}MB (max {max_size_mb}MB) for {w}x{h}")
logger.warning(f"RTSP frame size mismatch: {w}x{h} (expected {config['width']}x{config['height']})")
if size_mb > config['max_size_mb']:
logger.warning(f"RTSP frame too large: {size_mb:.2f}MB (max {config['max_size_mb']}MB)")
return False return False
elif stream_type == StreamType.HTTP: # Basic dimension validation
config = self.http_config if w < 100 or h < 100:
# More flexible for HTTP snapshots logger.warning(f"Frame too small: {w}x{h}")
if size_mb > config['max_size_mb']:
logger.warning(f"HTTP snapshot too large: {size_mb:.2f}MB (max {config['max_size_mb']}MB)")
return False return False
return True return True
class CacheBuffer: class CacheBuffer:
"""Enhanced frame cache with support for cropping and optimized for different formats.""" """Enhanced frame cache with support for cropping."""
def __init__(self, max_age_seconds: int = 10): def __init__(self, max_age_seconds: int = 10):
self.frame_buffer = FrameBuffer(max_age_seconds) self.frame_buffer = FrameBuffer(max_age_seconds)
self._crop_cache: Dict[str, Dict[str, Any]] = {} self._crop_cache: Dict[str, Dict[str, Any]] = {}
self._cache_lock = threading.RLock() self._cache_lock = threading.RLock()
self.jpeg_quality = 95 # High quality for all frames
# Quality settings for different stream types def put_frame(self, camera_id: str, frame: np.ndarray):
self.jpeg_quality = {
StreamType.RTSP: 90, # Good quality for 720p
StreamType.HTTP: 95 # High quality for 2K
}
def put_frame(self, camera_id: str, frame: np.ndarray, stream_type: Optional[StreamType] = None):
"""Store a frame and clear any associated crop cache.""" """Store a frame and clear any associated crop cache."""
self.frame_buffer.put_frame(camera_id, frame, stream_type) self.frame_buffer.put_frame(camera_id, frame)
# Clear crop cache for this camera since we have a new frame # Clear crop cache for this camera since we have a new frame
with self._cache_lock: with self._cache_lock:
@ -325,21 +245,15 @@ class CacheBuffer:
def get_frame_as_jpeg(self, camera_id: str, crop_coords: Optional[Tuple[int, int, int, int]] = None, def get_frame_as_jpeg(self, camera_id: str, crop_coords: Optional[Tuple[int, int, int, int]] = None,
quality: Optional[int] = None) -> Optional[bytes]: quality: Optional[int] = None) -> Optional[bytes]:
"""Get frame as JPEG bytes with format-specific quality settings.""" """Get frame as JPEG bytes."""
frame = self.get_frame(camera_id, crop_coords) frame = self.get_frame(camera_id, crop_coords)
if frame is None: if frame is None:
return None return None
try: try:
# Determine quality based on stream type if not specified # Use specified quality or default
if quality is None: if quality is None:
frame_info = self.frame_buffer.get_frame_info(camera_id) quality = self.jpeg_quality
if frame_info:
stream_type_str = frame_info.get('stream_type', StreamType.RTSP.value)
stream_type = StreamType.RTSP if stream_type_str == StreamType.RTSP.value else StreamType.HTTP
quality = self.jpeg_quality[stream_type]
else:
quality = 90 # Default
# Encode as JPEG with specified quality # Encode as JPEG with specified quality
encode_params = [cv2.IMWRITE_JPEG_QUALITY, quality] encode_params = [cv2.IMWRITE_JPEG_QUALITY, quality]

View file

@ -10,7 +10,7 @@ from dataclasses import dataclass
from collections import defaultdict from collections import defaultdict
from .readers import RTSPReader, HTTPSnapshotReader from .readers import RTSPReader, HTTPSnapshotReader
from .buffers import shared_cache_buffer, StreamType from .buffers import shared_cache_buffer
from ..tracking.integration import TrackingPipelineIntegration from ..tracking.integration import TrackingPipelineIntegration
@ -177,12 +177,8 @@ class StreamManager:
def _frame_callback(self, camera_id: str, frame): def _frame_callback(self, camera_id: str, frame):
"""Callback for when a new frame is available.""" """Callback for when a new frame is available."""
try: try:
# Detect stream type based on frame dimensions # Store frame in shared buffer
stream_type = self._detect_stream_type(frame) shared_cache_buffer.put_frame(camera_id, frame)
# Store frame in shared buffer with stream type
shared_cache_buffer.put_frame(camera_id, frame, stream_type)
# Process tracking for subscriptions with tracking integration # Process tracking for subscriptions with tracking integration
self._process_tracking_for_camera(camera_id, frame) self._process_tracking_for_camera(camera_id, frame)
@ -404,26 +400,6 @@ class StreamManager:
stats[subscription_id] = subscription_info.tracking_integration.get_statistics() stats[subscription_id] = subscription_info.tracking_integration.get_statistics()
return stats return stats
def _detect_stream_type(self, frame) -> StreamType:
"""Detect stream type based on frame dimensions."""
if frame is None:
return StreamType.RTSP # Default
h, w = frame.shape[:2]
# RTSP: 1280x720
if w == 1280 and h == 720:
return StreamType.RTSP
# HTTP: 2560x1440 or larger
if w >= 2000 and h >= 1000:
return StreamType.HTTP
# Default based on size
if w <= 1920 and h <= 1080:
return StreamType.RTSP
else:
return StreamType.HTTP
def get_stats(self) -> Dict[str, Any]: def get_stats(self) -> Dict[str, Any]:
"""Get comprehensive streaming statistics.""" """Get comprehensive streaming statistics."""
@ -431,22 +407,11 @@ class StreamManager:
buffer_stats = shared_cache_buffer.get_stats() buffer_stats = shared_cache_buffer.get_stats()
tracking_stats = self.get_tracking_stats() tracking_stats = self.get_tracking_stats()
# Add stream type information
stream_types = {}
for camera_id in self._streams.keys():
if isinstance(self._streams[camera_id], RTSPReader):
stream_types[camera_id] = 'rtsp'
elif isinstance(self._streams[camera_id], HTTPSnapshotReader):
stream_types[camera_id] = 'http'
else:
stream_types[camera_id] = 'unknown'
return { return {
'active_subscriptions': len(self._subscriptions), 'active_subscriptions': len(self._subscriptions),
'active_streams': len(self._streams), 'active_streams': len(self._streams),
'cameras_with_subscribers': len(self._camera_subscribers), 'cameras_with_subscribers': len(self._camera_subscribers),
'max_streams': self.max_streams, 'max_streams': self.max_streams,
'stream_types': stream_types,
'subscriptions_by_camera': { 'subscriptions_by_camera': {
camera_id: len(subscribers) camera_id: len(subscribers)
for camera_id, subscribers in self._camera_subscribers.items() for camera_id, subscribers in self._camera_subscribers.items()

View file

@ -37,7 +37,6 @@ class RTSPReader:
self.expected_fps = 6 self.expected_fps = 6
# Frame processing parameters # Frame processing parameters
self.frame_interval = 1.0 / self.expected_fps # ~167ms for 6fps
self.error_recovery_delay = 5.0 # Increased from 2.0 for stability self.error_recovery_delay = 5.0 # Increased from 2.0 for stability
self.max_consecutive_errors = 30 # Increased from 10 to handle network jitter self.max_consecutive_errors = 30 # Increased from 10 to handle network jitter
self.stream_timeout = 30.0 self.stream_timeout = 30.0
@ -72,7 +71,6 @@ class RTSPReader:
frame_count = 0 frame_count = 0
last_log_time = time.time() last_log_time = time.time()
last_successful_frame_time = time.time() last_successful_frame_time = time.time()
last_frame_time = 0
while not self.stop_event.is_set(): while not self.stop_event.is_set():
try: try:
@ -90,12 +88,7 @@ class RTSPReader:
last_successful_frame_time = time.time() last_successful_frame_time = time.time()
continue continue
# Rate limiting for 6fps # Read frame immediately without rate limiting for minimum latency
current_time = time.time()
if current_time - last_frame_time < self.frame_interval:
time.sleep(0.01) # Small sleep to avoid busy waiting
continue
ret, frame = self.cap.read() ret, frame = self.cap.read()
if not ret or frame is None: if not ret or frame is None:
@ -118,13 +111,8 @@ class RTSPReader:
time.sleep(sleep_time) time.sleep(sleep_time)
continue continue
# Validate frame dimensions # Accept any valid frame dimensions - don't force specific resolution
if frame.shape[1] != self.expected_width or frame.shape[0] != self.expected_height: if frame.shape[1] <= 0 or frame.shape[0] <= 0:
logger.warning(f"Camera {self.camera_id}: Unexpected frame dimensions {frame.shape[1]}x{frame.shape[0]}")
# Try to resize if dimensions are wrong
if frame.shape[1] > 0 and frame.shape[0] > 0:
frame = cv2.resize(frame, (self.expected_width, self.expected_height))
else:
consecutive_errors += 1 consecutive_errors += 1
continue continue
@ -138,7 +126,6 @@ class RTSPReader:
consecutive_errors = 0 consecutive_errors = 0
frame_count += 1 frame_count += 1
last_successful_frame_time = time.time() last_successful_frame_time = time.time()
last_frame_time = current_time
# Call frame callback # Call frame callback
if self.frame_callback: if self.frame_callback:
@ -148,6 +135,7 @@ class RTSPReader:
logger.error(f"Camera {self.camera_id}: Frame callback error: {e}") logger.error(f"Camera {self.camera_id}: Frame callback error: {e}")
# Log progress every 30 seconds # Log progress every 30 seconds
current_time = time.time()
if current_time - last_log_time >= 30: if current_time - last_log_time >= 30:
logger.info(f"Camera {self.camera_id}: {frame_count} frames processed") logger.info(f"Camera {self.camera_id}: {frame_count} frames processed")
last_log_time = current_time last_log_time = current_time
@ -261,14 +249,12 @@ class RTSPReader:
logger.error(f"Failed to open stream for camera {self.camera_id}") logger.error(f"Failed to open stream for camera {self.camera_id}")
return False return False
# Set capture properties for 1280x720@6fps # Don't force resolution/fps - let the stream determine its natural specs
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.expected_width) # The camera will provide whatever resolution/fps it supports
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.expected_height)
self.cap.set(cv2.CAP_PROP_FPS, self.expected_fps)
# Set moderate buffer to handle network jitter while avoiding excessive latency # Set minimal buffer for lowest latency - single frame buffer
# Buffer of 3 frames provides resilience without major delay # This ensures we always get the most recent frame
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 3) self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
# Set FFMPEG options for better H.264 handling # Set FFMPEG options for better H.264 handling
self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
@ -405,15 +391,10 @@ class HTTPSnapshotReader:
time.sleep(min(2.0, interval_seconds)) time.sleep(min(2.0, interval_seconds))
continue continue
# Validate image dimensions # Accept any valid image dimensions - don't force specific resolution
if frame.shape[1] != self.expected_width or frame.shape[0] != self.expected_height: if frame.shape[1] <= 0 or frame.shape[0] <= 0:
logger.info(f"Camera {self.camera_id}: Snapshot dimensions {frame.shape[1]}x{frame.shape[0]} " logger.warning(f"Camera {self.camera_id}: Invalid frame dimensions {frame.shape[1]}x{frame.shape[0]}")
f"(expected {self.expected_width}x{self.expected_height})") continue
# Resize if needed (maintaining aspect ratio for high quality)
if frame.shape[1] > 0 and frame.shape[0] > 0:
# Only resize if significantly different
if abs(frame.shape[1] - self.expected_width) > 100:
frame = self._resize_maintain_aspect(frame, self.expected_width, self.expected_height)
# Reset retry counter on successful fetch # Reset retry counter on successful fetch
retries = 0 retries = 0