Merge pull request 'fix: minor' (#12) from dev into main
	
		
			
	
		
	
	
		
	
		
			All checks were successful
		
		
	
	
		
			
				
	
				Build Worker Base and Application Images / check-base-changes (push) Successful in 8s
				
			
		
			
				
	
				Build Worker Base and Application Images / build-base (push) Has been skipped
				
			
		
			
				
	
				Build Worker Base and Application Images / build-docker (push) Successful in 2m44s
				
			
		
			
				
	
				Build Worker Base and Application Images / deploy-stack (push) Successful in 22s
				
			
		
		
	
	
				
					
				
			
		
			All checks were successful
		
		
	
	Build Worker Base and Application Images / check-base-changes (push) Successful in 8s
				
			Build Worker Base and Application Images / build-base (push) Has been skipped
				
			Build Worker Base and Application Images / build-docker (push) Successful in 2m44s
				
			Build Worker Base and Application Images / deploy-stack (push) Successful in 22s
				
			Reviewed-on: #12
This commit is contained in:
		
						commit
						1cc46e0663
					
				
					 5 changed files with 61 additions and 18 deletions
				
			
		
							
								
								
									
										7
									
								
								app.py
									
										
									
									
									
								
							
							
						
						
									
										7
									
								
								app.py
									
										
									
									
									
								
							| 
						 | 
				
			
			@ -76,7 +76,7 @@ else:
 | 
			
		|||
        "poll_interval_ms": 100,
 | 
			
		||||
        "reconnect_interval_sec": 5,
 | 
			
		||||
        "target_fps": 10,
 | 
			
		||||
        "max_streams": 5,
 | 
			
		||||
        "max_streams": 20,
 | 
			
		||||
        "max_retries": 3
 | 
			
		||||
    }
 | 
			
		||||
    logger.warning(f"Configuration file {config_path} not found, using defaults")
 | 
			
		||||
| 
						 | 
				
			
			@ -85,6 +85,11 @@ else:
 | 
			
		|||
os.makedirs("models", exist_ok=True)
 | 
			
		||||
logger.info("Ensured models directory exists")
 | 
			
		||||
 | 
			
		||||
# Initialize stream manager with config value
 | 
			
		||||
from core.streaming import initialize_stream_manager
 | 
			
		||||
initialize_stream_manager(max_streams=config.get('max_streams', 10))
 | 
			
		||||
logger.info(f"Initialized stream manager with max_streams={config.get('max_streams', 10)}")
 | 
			
		||||
 | 
			
		||||
# Store cached frames for REST API access (temporary storage)
 | 
			
		||||
latest_frames = {}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,7 +1,9 @@
 | 
			
		|||
{
 | 
			
		||||
  "poll_interval_ms": 100,
 | 
			
		||||
  "max_streams": 5,
 | 
			
		||||
  "max_streams": 20,
 | 
			
		||||
  "target_fps": 2,
 | 
			
		||||
  "reconnect_interval_sec": 5,
 | 
			
		||||
  "max_retries": -1
 | 
			
		||||
  "reconnect_interval_sec": 10,
 | 
			
		||||
  "max_retries": -1,
 | 
			
		||||
  "rtsp_buffer_size": 3,
 | 
			
		||||
  "rtsp_tcp_transport": true
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -4,7 +4,7 @@ Provides modular frame readers, buffers, and stream management.
 | 
			
		|||
"""
 | 
			
		||||
from .readers import RTSPReader, HTTPSnapshotReader
 | 
			
		||||
from .buffers import FrameBuffer, CacheBuffer, shared_frame_buffer, shared_cache_buffer
 | 
			
		||||
from .manager import StreamManager, StreamConfig, SubscriptionInfo, shared_stream_manager
 | 
			
		||||
from .manager import StreamManager, StreamConfig, SubscriptionInfo, shared_stream_manager, initialize_stream_manager
 | 
			
		||||
 | 
			
		||||
__all__ = [
 | 
			
		||||
    # Readers
 | 
			
		||||
| 
						 | 
				
			
			@ -21,5 +21,6 @@ __all__ = [
 | 
			
		|||
    'StreamManager',
 | 
			
		||||
    'StreamConfig',
 | 
			
		||||
    'SubscriptionInfo',
 | 
			
		||||
    'shared_stream_manager'
 | 
			
		||||
    'shared_stream_manager',
 | 
			
		||||
    'initialize_stream_manager'
 | 
			
		||||
]
 | 
			
		||||
| 
						 | 
				
			
			@ -458,4 +458,11 @@ class StreamManager:
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
# Global shared instance for application use
 | 
			
		||||
shared_stream_manager = StreamManager(max_streams=10)
 | 
			
		||||
# Will be initialized with config value in app.py
 | 
			
		||||
shared_stream_manager = None
 | 
			
		||||
 | 
			
		||||
def initialize_stream_manager(max_streams: int = 10):
 | 
			
		||||
    """Initialize the global stream manager with config value."""
 | 
			
		||||
    global shared_stream_manager
 | 
			
		||||
    shared_stream_manager = StreamManager(max_streams=max_streams)
 | 
			
		||||
    return shared_stream_manager
 | 
			
		||||
| 
						 | 
				
			
			@ -38,8 +38,8 @@ class RTSPReader:
 | 
			
		|||
 | 
			
		||||
        # Frame processing parameters
 | 
			
		||||
        self.frame_interval = 1.0 / self.expected_fps  # ~167ms for 6fps
 | 
			
		||||
        self.error_recovery_delay = 2.0
 | 
			
		||||
        self.max_consecutive_errors = 10
 | 
			
		||||
        self.error_recovery_delay = 5.0  # Increased from 2.0 for stability
 | 
			
		||||
        self.max_consecutive_errors = 30  # Increased from 10 to handle network jitter
 | 
			
		||||
        self.stream_timeout = 30.0
 | 
			
		||||
 | 
			
		||||
    def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
 | 
			
		||||
| 
						 | 
				
			
			@ -107,9 +107,15 @@ class RTSPReader:
 | 
			
		|||
                        consecutive_errors = 0
 | 
			
		||||
                        time.sleep(self.error_recovery_delay)
 | 
			
		||||
                    else:
 | 
			
		||||
                        # Skip corrupted frame and continue
 | 
			
		||||
                        logger.debug(f"Camera {self.camera_id}: Frame read failed (error {consecutive_errors})")
 | 
			
		||||
                        time.sleep(0.1)
 | 
			
		||||
                        # Skip corrupted frame and continue with exponential backoff
 | 
			
		||||
                        if consecutive_errors <= 5:
 | 
			
		||||
                            logger.debug(f"Camera {self.camera_id}: Frame read failed (error {consecutive_errors})")
 | 
			
		||||
                        elif consecutive_errors % 10 == 0:  # Log every 10th error after 5
 | 
			
		||||
                            logger.warning(f"Camera {self.camera_id}: Continuing frame read failures (error {consecutive_errors})")
 | 
			
		||||
 | 
			
		||||
                        # Exponential backoff with cap at 1 second
 | 
			
		||||
                        sleep_time = min(0.1 * (1.5 ** min(consecutive_errors, 10)), 1.0)
 | 
			
		||||
                        time.sleep(sleep_time)
 | 
			
		||||
                    continue
 | 
			
		||||
 | 
			
		||||
                # Validate frame dimensions
 | 
			
		||||
| 
						 | 
				
			
			@ -169,7 +175,18 @@ class RTSPReader:
 | 
			
		|||
 | 
			
		||||
            logger.info(f"Initializing capture for camera {self.camera_id}")
 | 
			
		||||
 | 
			
		||||
            # Create capture with FFMPEG backend
 | 
			
		||||
            # Create capture with FFMPEG backend and TCP transport for reliability
 | 
			
		||||
            # Use TCP instead of UDP to prevent packet loss
 | 
			
		||||
            rtsp_url_tcp = self.rtsp_url.replace('rtsp://', 'rtsp://')
 | 
			
		||||
            if '?' in rtsp_url_tcp:
 | 
			
		||||
                rtsp_url_tcp += '&tcp'
 | 
			
		||||
            else:
 | 
			
		||||
                rtsp_url_tcp += '?tcp'
 | 
			
		||||
 | 
			
		||||
            # Alternative: Set environment variable for RTSP transport
 | 
			
		||||
            import os
 | 
			
		||||
            os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp'
 | 
			
		||||
 | 
			
		||||
            self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
 | 
			
		||||
 | 
			
		||||
            if not self.cap.isOpened():
 | 
			
		||||
| 
						 | 
				
			
			@ -181,8 +198,9 @@ class RTSPReader:
 | 
			
		|||
            self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.expected_height)
 | 
			
		||||
            self.cap.set(cv2.CAP_PROP_FPS, self.expected_fps)
 | 
			
		||||
 | 
			
		||||
            # Set small buffer to reduce latency and avoid accumulating corrupted frames
 | 
			
		||||
            self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
 | 
			
		||||
            # Set moderate buffer to handle network jitter while avoiding excessive latency
 | 
			
		||||
            # Buffer of 3 frames provides resilience without major delay
 | 
			
		||||
            self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 3)
 | 
			
		||||
 | 
			
		||||
            # Set FFMPEG options for better H.264 handling
 | 
			
		||||
            self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
 | 
			
		||||
| 
						 | 
				
			
			@ -208,13 +226,23 @@ class RTSPReader:
 | 
			
		|||
            return False
 | 
			
		||||
 | 
			
		||||
    def _reinitialize_capture(self):
 | 
			
		||||
        """Reinitialize capture after errors."""
 | 
			
		||||
        """Reinitialize capture after errors with retry logic."""
 | 
			
		||||
        logger.info(f"Reinitializing capture for camera {self.camera_id}")
 | 
			
		||||
        if self.cap:
 | 
			
		||||
            self.cap.release()
 | 
			
		||||
            self.cap = None
 | 
			
		||||
        time.sleep(1.0)
 | 
			
		||||
        self._initialize_capture()
 | 
			
		||||
 | 
			
		||||
        # Longer delay before reconnection to avoid rapid reconnect loops
 | 
			
		||||
        time.sleep(3.0)
 | 
			
		||||
 | 
			
		||||
        # Retry initialization up to 3 times
 | 
			
		||||
        for attempt in range(3):
 | 
			
		||||
            if self._initialize_capture():
 | 
			
		||||
                logger.info(f"Successfully reinitialized camera {self.camera_id} on attempt {attempt + 1}")
 | 
			
		||||
                break
 | 
			
		||||
            else:
 | 
			
		||||
                logger.warning(f"Failed to reinitialize camera {self.camera_id} on attempt {attempt + 1}")
 | 
			
		||||
                time.sleep(2.0)
 | 
			
		||||
 | 
			
		||||
    def _is_frame_corrupted(self, frame: np.ndarray) -> bool:
 | 
			
		||||
        """Check if frame is corrupted (all black, all white, or excessive noise)."""
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue