fix: minor #12
5 changed files with 61 additions and 18 deletions
7
app.py
7
app.py
|
@ -76,7 +76,7 @@ else:
|
||||||
"poll_interval_ms": 100,
|
"poll_interval_ms": 100,
|
||||||
"reconnect_interval_sec": 5,
|
"reconnect_interval_sec": 5,
|
||||||
"target_fps": 10,
|
"target_fps": 10,
|
||||||
"max_streams": 5,
|
"max_streams": 20,
|
||||||
"max_retries": 3
|
"max_retries": 3
|
||||||
}
|
}
|
||||||
logger.warning(f"Configuration file {config_path} not found, using defaults")
|
logger.warning(f"Configuration file {config_path} not found, using defaults")
|
||||||
|
@ -85,6 +85,11 @@ else:
|
||||||
os.makedirs("models", exist_ok=True)
|
os.makedirs("models", exist_ok=True)
|
||||||
logger.info("Ensured models directory exists")
|
logger.info("Ensured models directory exists")
|
||||||
|
|
||||||
|
# Initialize stream manager with config value
|
||||||
|
from core.streaming import initialize_stream_manager
|
||||||
|
initialize_stream_manager(max_streams=config.get('max_streams', 10))
|
||||||
|
logger.info(f"Initialized stream manager with max_streams={config.get('max_streams', 10)}")
|
||||||
|
|
||||||
# Store cached frames for REST API access (temporary storage)
|
# Store cached frames for REST API access (temporary storage)
|
||||||
latest_frames = {}
|
latest_frames = {}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
{
|
{
|
||||||
"poll_interval_ms": 100,
|
"poll_interval_ms": 100,
|
||||||
"max_streams": 5,
|
"max_streams": 20,
|
||||||
"target_fps": 2,
|
"target_fps": 2,
|
||||||
"reconnect_interval_sec": 5,
|
"reconnect_interval_sec": 10,
|
||||||
"max_retries": -1
|
"max_retries": -1,
|
||||||
|
"rtsp_buffer_size": 3,
|
||||||
|
"rtsp_tcp_transport": true
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,7 +4,7 @@ Provides modular frame readers, buffers, and stream management.
|
||||||
"""
|
"""
|
||||||
from .readers import RTSPReader, HTTPSnapshotReader
|
from .readers import RTSPReader, HTTPSnapshotReader
|
||||||
from .buffers import FrameBuffer, CacheBuffer, shared_frame_buffer, shared_cache_buffer
|
from .buffers import FrameBuffer, CacheBuffer, shared_frame_buffer, shared_cache_buffer
|
||||||
from .manager import StreamManager, StreamConfig, SubscriptionInfo, shared_stream_manager
|
from .manager import StreamManager, StreamConfig, SubscriptionInfo, shared_stream_manager, initialize_stream_manager
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
# Readers
|
# Readers
|
||||||
|
@ -21,5 +21,6 @@ __all__ = [
|
||||||
'StreamManager',
|
'StreamManager',
|
||||||
'StreamConfig',
|
'StreamConfig',
|
||||||
'SubscriptionInfo',
|
'SubscriptionInfo',
|
||||||
'shared_stream_manager'
|
'shared_stream_manager',
|
||||||
|
'initialize_stream_manager'
|
||||||
]
|
]
|
|
@ -458,4 +458,11 @@ class StreamManager:
|
||||||
|
|
||||||
|
|
||||||
# Global shared instance for application use
|
# Global shared instance for application use
|
||||||
shared_stream_manager = StreamManager(max_streams=10)
|
# Will be initialized with config value in app.py
|
||||||
|
shared_stream_manager = None
|
||||||
|
|
||||||
|
def initialize_stream_manager(max_streams: int = 10):
|
||||||
|
"""Initialize the global stream manager with config value."""
|
||||||
|
global shared_stream_manager
|
||||||
|
shared_stream_manager = StreamManager(max_streams=max_streams)
|
||||||
|
return shared_stream_manager
|
|
@ -38,8 +38,8 @@ class RTSPReader:
|
||||||
|
|
||||||
# Frame processing parameters
|
# Frame processing parameters
|
||||||
self.frame_interval = 1.0 / self.expected_fps # ~167ms for 6fps
|
self.frame_interval = 1.0 / self.expected_fps # ~167ms for 6fps
|
||||||
self.error_recovery_delay = 2.0
|
self.error_recovery_delay = 5.0 # Increased from 2.0 for stability
|
||||||
self.max_consecutive_errors = 10
|
self.max_consecutive_errors = 30 # Increased from 10 to handle network jitter
|
||||||
self.stream_timeout = 30.0
|
self.stream_timeout = 30.0
|
||||||
|
|
||||||
def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
|
def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
|
||||||
|
@ -107,9 +107,15 @@ class RTSPReader:
|
||||||
consecutive_errors = 0
|
consecutive_errors = 0
|
||||||
time.sleep(self.error_recovery_delay)
|
time.sleep(self.error_recovery_delay)
|
||||||
else:
|
else:
|
||||||
# Skip corrupted frame and continue
|
# Skip corrupted frame and continue with exponential backoff
|
||||||
|
if consecutive_errors <= 5:
|
||||||
logger.debug(f"Camera {self.camera_id}: Frame read failed (error {consecutive_errors})")
|
logger.debug(f"Camera {self.camera_id}: Frame read failed (error {consecutive_errors})")
|
||||||
time.sleep(0.1)
|
elif consecutive_errors % 10 == 0: # Log every 10th error after 5
|
||||||
|
logger.warning(f"Camera {self.camera_id}: Continuing frame read failures (error {consecutive_errors})")
|
||||||
|
|
||||||
|
# Exponential backoff with cap at 1 second
|
||||||
|
sleep_time = min(0.1 * (1.5 ** min(consecutive_errors, 10)), 1.0)
|
||||||
|
time.sleep(sleep_time)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Validate frame dimensions
|
# Validate frame dimensions
|
||||||
|
@ -169,7 +175,18 @@ class RTSPReader:
|
||||||
|
|
||||||
logger.info(f"Initializing capture for camera {self.camera_id}")
|
logger.info(f"Initializing capture for camera {self.camera_id}")
|
||||||
|
|
||||||
# Create capture with FFMPEG backend
|
# Create capture with FFMPEG backend and TCP transport for reliability
|
||||||
|
# Use TCP instead of UDP to prevent packet loss
|
||||||
|
rtsp_url_tcp = self.rtsp_url.replace('rtsp://', 'rtsp://')
|
||||||
|
if '?' in rtsp_url_tcp:
|
||||||
|
rtsp_url_tcp += '&tcp'
|
||||||
|
else:
|
||||||
|
rtsp_url_tcp += '?tcp'
|
||||||
|
|
||||||
|
# Alternative: Set environment variable for RTSP transport
|
||||||
|
import os
|
||||||
|
os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp'
|
||||||
|
|
||||||
self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
|
self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
|
||||||
|
|
||||||
if not self.cap.isOpened():
|
if not self.cap.isOpened():
|
||||||
|
@ -181,8 +198,9 @@ class RTSPReader:
|
||||||
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.expected_height)
|
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.expected_height)
|
||||||
self.cap.set(cv2.CAP_PROP_FPS, self.expected_fps)
|
self.cap.set(cv2.CAP_PROP_FPS, self.expected_fps)
|
||||||
|
|
||||||
# Set small buffer to reduce latency and avoid accumulating corrupted frames
|
# Set moderate buffer to handle network jitter while avoiding excessive latency
|
||||||
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
|
# Buffer of 3 frames provides resilience without major delay
|
||||||
|
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 3)
|
||||||
|
|
||||||
# Set FFMPEG options for better H.264 handling
|
# Set FFMPEG options for better H.264 handling
|
||||||
self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
|
self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
|
||||||
|
@ -208,13 +226,23 @@ class RTSPReader:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def _reinitialize_capture(self):
|
def _reinitialize_capture(self):
|
||||||
"""Reinitialize capture after errors."""
|
"""Reinitialize capture after errors with retry logic."""
|
||||||
logger.info(f"Reinitializing capture for camera {self.camera_id}")
|
logger.info(f"Reinitializing capture for camera {self.camera_id}")
|
||||||
if self.cap:
|
if self.cap:
|
||||||
self.cap.release()
|
self.cap.release()
|
||||||
self.cap = None
|
self.cap = None
|
||||||
time.sleep(1.0)
|
|
||||||
self._initialize_capture()
|
# Longer delay before reconnection to avoid rapid reconnect loops
|
||||||
|
time.sleep(3.0)
|
||||||
|
|
||||||
|
# Retry initialization up to 3 times
|
||||||
|
for attempt in range(3):
|
||||||
|
if self._initialize_capture():
|
||||||
|
logger.info(f"Successfully reinitialized camera {self.camera_id} on attempt {attempt + 1}")
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
logger.warning(f"Failed to reinitialize camera {self.camera_id} on attempt {attempt + 1}")
|
||||||
|
time.sleep(2.0)
|
||||||
|
|
||||||
def _is_frame_corrupted(self, frame: np.ndarray) -> bool:
|
def _is_frame_corrupted(self, frame: np.ndarray) -> bool:
|
||||||
"""Check if frame is corrupted (all black, all white, or excessive noise)."""
|
"""Check if frame is corrupted (all black, all white, or excessive noise)."""
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue