""" Stream lifecycle management and coordination. This module provides centralized management of camera streams including lifecycle management, resource allocation, and stream coordination. """ import time import queue import logging import threading from typing import Dict, List, Any, Optional, Tuple, Set from dataclasses import dataclass, field from datetime import datetime from ..core.constants import ( DEFAULT_MAX_STREAMS, SHARED_STREAM_BUFFER_SIZE, DEFAULT_RECONNECT_INTERVAL_SEC, DEFAULT_MAX_RETRIES ) from ..core.exceptions import StreamError, create_stream_error from ..streams.frame_reader import create_frame_reader_thread from ..streams.camera_monitor import set_camera_connected logger = logging.getLogger(__name__) @dataclass class StreamInfo: """Information about a single camera stream.""" camera_id: str stream_url: str stream_type: str # "rtsp" or "snapshot" snapshot_interval: Optional[int] = None buffer: Optional[queue.Queue] = None stop_event: Optional[threading.Event] = None thread: Optional[threading.Thread] = None subscribers: Set[str] = field(default_factory=set) created_at: float = field(default_factory=time.time) last_frame_time: Optional[float] = None frame_count: int = 0 # Additional WebSocket fields subscriptionIdentifier: Optional[str] = None modelId: Optional[int] = None modelName: Optional[str] = None cropX1: Optional[int] = None cropY1: Optional[int] = None cropX2: Optional[int] = None cropY2: Optional[int] = None def to_dict(self) -> Dict[str, Any]: """Convert to dictionary format.""" return { "camera_id": self.camera_id, "stream_url": self.stream_url, "stream_type": self.stream_type, "snapshot_interval": self.snapshot_interval, "subscriber_count": len(self.subscribers), "subscribers": list(self.subscribers), "created_at": self.created_at, "last_frame_time": self.last_frame_time, "frame_count": self.frame_count, "is_active": self.thread is not None and self.thread.is_alive() } @dataclass class StreamSubscription: """Information about a stream subscription.""" subscription_id: str camera_id: str subscriber_id: str created_at: float = field(default_factory=time.time) last_access: float = field(default_factory=time.time) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary format.""" return { "subscription_id": self.subscription_id, "camera_id": self.camera_id, "subscriber_id": self.subscriber_id, "created_at": self.created_at, "last_access": self.last_access } class StreamManager: """ Manages camera stream lifecycle and resource allocation. This class provides centralized management of camera streams including: - Stream lifecycle management (start/stop/restart) - Resource allocation and sharing - Subscriber management - Connection state monitoring """ def __init__(self, max_streams: int = DEFAULT_MAX_STREAMS): """ Initialize stream manager. Args: max_streams: Maximum number of concurrent streams """ self.max_streams = max_streams self._streams: Dict[str, StreamInfo] = {} self._subscriptions: Dict[str, StreamSubscription] = {} self._lock = None @property def streams(self) -> Dict[str, StreamInfo]: """Public access to streams dictionary.""" return self._streams @property def streams_lock(self): """Public access to streams lock.""" self._ensure_thread_safety() return self._lock def _ensure_thread_safety(self): """Initialize thread safety if not already done.""" if self._lock is None: import threading self._lock = threading.RLock() def _connection_state_callback(self, camera_id: str, connected: bool, error_msg: Optional[str] = None): """Callback for connection state changes.""" set_camera_connected(camera_id, connected, error_msg) def _create_stream_info(self, camera_id: str, rtsp_url: Optional[str] = None, snapshot_url: Optional[str] = None, snapshot_interval: Optional[int] = None) -> StreamInfo: """Create StreamInfo object based on stream type.""" if snapshot_url and snapshot_interval: return StreamInfo( camera_id=camera_id, stream_url=snapshot_url, stream_type="snapshot", snapshot_interval=snapshot_interval, buffer=queue.Queue(maxsize=SHARED_STREAM_BUFFER_SIZE), stop_event=threading.Event() ) elif rtsp_url: return StreamInfo( camera_id=camera_id, stream_url=rtsp_url, stream_type="rtsp", buffer=queue.Queue(maxsize=SHARED_STREAM_BUFFER_SIZE), stop_event=threading.Event() ) else: raise ValueError("Must provide either RTSP URL or snapshot URL with interval") def create_subscription(self, subscription_id: str, camera_id: str, subscriber_id: str, rtsp_url: Optional[str] = None, snapshot_url: Optional[str] = None, snapshot_interval: Optional[int] = None) -> bool: """ Create a stream subscription. Args: subscription_id: Unique subscription identifier camera_id: Camera identifier subscriber_id: Subscriber identifier rtsp_url: RTSP stream URL (for RTSP streams) snapshot_url: HTTP snapshot URL (for snapshot streams) snapshot_interval: Snapshot interval in milliseconds Returns: True if subscription was created successfully """ self._ensure_thread_safety() with self._lock: try: # Check if subscription already exists if subscription_id in self._subscriptions: logger.warning(f"Subscription {subscription_id} already exists") return False # Check stream limit if len(self._streams) >= self.max_streams and camera_id not in self._streams: logger.error(f"Maximum streams ({self.max_streams}) reached, cannot create new stream for camera {camera_id}") return False # Create or get existing stream if camera_id not in self._streams: stream_info = self._create_stream_info( camera_id, rtsp_url, snapshot_url, snapshot_interval ) self._streams[camera_id] = stream_info # Create and start frame reader thread thread = create_frame_reader_thread( camera_id=camera_id, rtsp_url=rtsp_url, snapshot_url=snapshot_url, snapshot_interval=snapshot_interval, buffer=stream_info.buffer, stop_event=stream_info.stop_event, connection_callback=self._connection_state_callback ) if thread: stream_info.thread = thread thread.start() logger.info(f"Created new {stream_info.stream_type} stream for camera {camera_id}") else: # Clean up failed stream del self._streams[camera_id] return False # Add subscriber to stream stream_info = self._streams[camera_id] stream_info.subscribers.add(subscription_id) # Create subscription record subscription = StreamSubscription( subscription_id=subscription_id, camera_id=camera_id, subscriber_id=subscriber_id ) self._subscriptions[subscription_id] = subscription logger.info(f"Created subscription {subscription_id} for camera {camera_id}, subscribers: {len(stream_info.subscribers)}") return True except Exception as e: logger.error(f"Error creating subscription {subscription_id}: {e}") return False def remove_subscription(self, subscription_id: str) -> bool: """ Remove a stream subscription. Args: subscription_id: Unique subscription identifier Returns: True if subscription was removed successfully """ self._ensure_thread_safety() with self._lock: if subscription_id not in self._subscriptions: logger.warning(f"Subscription {subscription_id} not found") return False subscription = self._subscriptions[subscription_id] camera_id = subscription.camera_id # Remove subscription del self._subscriptions[subscription_id] # Remove subscriber from stream if stream exists if camera_id in self._streams: stream_info = self._streams[camera_id] stream_info.subscribers.discard(subscription_id) logger.info(f"Removed subscription {subscription_id} for camera {camera_id}, remaining subscribers: {len(stream_info.subscribers)}") # Stop stream if no more subscribers if not stream_info.subscribers: self._stop_stream(camera_id) return True def _stop_stream(self, camera_id: str) -> None: """Stop a stream and clean up resources.""" if camera_id not in self._streams: return stream_info = self._streams[camera_id] # Signal thread to stop if stream_info.stop_event: stream_info.stop_event.set() # Wait for thread to finish if stream_info.thread and stream_info.thread.is_alive(): stream_info.thread.join(timeout=5) if stream_info.thread.is_alive(): logger.warning(f"Stream thread for camera {camera_id} did not stop gracefully") # Clean up del self._streams[camera_id] logger.info(f"Stopped {stream_info.stream_type} stream for camera {camera_id}") def get_frame(self, subscription_id: str, timeout: float = 0.1) -> Optional[Any]: """ Get the latest frame for a subscription. Args: subscription_id: Unique subscription identifier timeout: Timeout for frame retrieval in seconds Returns: Latest frame or None if not available """ self._ensure_thread_safety() with self._lock: if subscription_id not in self._subscriptions: return None subscription = self._subscriptions[subscription_id] camera_id = subscription.camera_id if camera_id not in self._streams: return None stream_info = self._streams[camera_id] subscription.last_access = time.time() try: frame = stream_info.buffer.get(timeout=timeout) stream_info.last_frame_time = time.time() stream_info.frame_count += 1 return frame except queue.Empty: return None except Exception as e: logger.error(f"Error getting frame for subscription {subscription_id}: {e}") return None def is_stream_active(self, camera_id: str) -> bool: """ Check if a stream is active. Args: camera_id: Camera identifier Returns: True if stream is active """ self._ensure_thread_safety() with self._lock: if camera_id not in self._streams: return False stream_info = self._streams[camera_id] return stream_info.thread is not None and stream_info.thread.is_alive() def get_stream_stats(self, camera_id: str) -> Optional[Dict[str, Any]]: """ Get statistics for a stream. Args: camera_id: Camera identifier Returns: Stream statistics or None if stream not found """ self._ensure_thread_safety() with self._lock: if camera_id not in self._streams: return None stream_info = self._streams[camera_id] current_time = time.time() stats = stream_info.to_dict() stats["uptime_seconds"] = current_time - stream_info.created_at if stream_info.last_frame_time: stats["seconds_since_last_frame"] = current_time - stream_info.last_frame_time return stats def get_subscription_info(self, subscription_id: str) -> Optional[Dict[str, Any]]: """ Get information about a subscription. Args: subscription_id: Unique subscription identifier Returns: Subscription information or None if not found """ self._ensure_thread_safety() with self._lock: if subscription_id not in self._subscriptions: return None return self._subscriptions[subscription_id].to_dict() def get_all_streams(self) -> Dict[str, Dict[str, Any]]: """ Get information about all active streams. Returns: Dictionary mapping camera IDs to stream information """ self._ensure_thread_safety() with self._lock: return { camera_id: stream_info.to_dict() for camera_id, stream_info in self._streams.items() } def get_all_subscriptions(self) -> Dict[str, Dict[str, Any]]: """ Get information about all active subscriptions. Returns: Dictionary mapping subscription IDs to subscription information """ self._ensure_thread_safety() with self._lock: return { sub_id: subscription.to_dict() for sub_id, subscription in self._subscriptions.items() } def cleanup_inactive_streams(self, inactive_threshold_seconds: int = 3600) -> int: """ Clean up streams that have been inactive for too long. Args: inactive_threshold_seconds: Seconds of inactivity before cleanup Returns: Number of streams cleaned up """ self._ensure_thread_safety() current_time = time.time() cleanup_count = 0 with self._lock: streams_to_remove = [] for camera_id, stream_info in self._streams.items(): # Check if stream has subscribers if stream_info.subscribers: continue # Check if stream has been inactive last_activity = max( stream_info.created_at, stream_info.last_frame_time or 0 ) if current_time - last_activity > inactive_threshold_seconds: streams_to_remove.append(camera_id) for camera_id in streams_to_remove: self._stop_stream(camera_id) cleanup_count += 1 logger.info(f"Cleaned up inactive stream for camera {camera_id}") if cleanup_count > 0: logger.info(f"Cleaned up {cleanup_count} inactive streams") return cleanup_count def restart_stream(self, camera_id: str) -> bool: """ Restart a stream. Args: camera_id: Camera identifier Returns: True if stream was restarted successfully """ self._ensure_thread_safety() with self._lock: if camera_id not in self._streams: logger.warning(f"Cannot restart stream for camera {camera_id}: stream not found") return False stream_info = self._streams[camera_id] subscribers = stream_info.subscribers.copy() stream_url = stream_info.stream_url stream_type = stream_info.stream_type snapshot_interval = stream_info.snapshot_interval # Stop current stream self._stop_stream(camera_id) # Recreate stream try: new_stream_info = self._create_stream_info( camera_id, rtsp_url=stream_url if stream_type == "rtsp" else None, snapshot_url=stream_url if stream_type == "snapshot" else None, snapshot_interval=snapshot_interval ) new_stream_info.subscribers = subscribers self._streams[camera_id] = new_stream_info # Create and start new frame reader thread thread = create_frame_reader_thread( camera_id=camera_id, rtsp_url=stream_url if stream_type == "rtsp" else None, snapshot_url=stream_url if stream_type == "snapshot" else None, snapshot_interval=snapshot_interval, buffer=new_stream_info.buffer, stop_event=new_stream_info.stop_event, connection_callback=self._connection_state_callback ) if thread: new_stream_info.thread = thread thread.start() logger.info(f"Restarted {stream_type} stream for camera {camera_id}") return True else: # Clean up failed restart del self._streams[camera_id] return False except Exception as e: logger.error(f"Error restarting stream for camera {camera_id}: {e}") return False def shutdown_all(self) -> None: """Shutdown all streams and clean up resources.""" self._ensure_thread_safety() with self._lock: logger.info("Shutting down all streams...") # Stop all streams camera_ids = list(self._streams.keys()) for camera_id in camera_ids: self._stop_stream(camera_id) # Clear all subscriptions self._subscriptions.clear() logger.info("All streams shut down successfully") # ===== WEBSOCKET HANDLER COMPATIBILITY METHODS ===== # These methods provide compatibility with the WebSocketHandler interface def get_active_streams(self) -> Dict[str, Any]: """ Get all active streams for WebSocket handler compatibility. Returns: Dictionary of active streams with their information """ self._ensure_thread_safety() with self._lock: active_streams = {} for camera_id, stream_info in self._streams.items(): if stream_info.thread and stream_info.thread.is_alive(): active_streams[camera_id] = { 'camera_id': camera_id, 'status': 'active', 'stream_type': stream_info.stream_type, 'subscribers': len([sub_id for sub_id, sub in self._subscriptions.items() if sub.camera_id == camera_id]), 'last_frame_time': getattr(stream_info, 'last_frame_time', None), 'error_count': getattr(stream_info, 'error_count', 0) } return active_streams def get_latest_frame(self, camera_id: str) -> Optional[Any]: """ Get the latest frame for a camera for WebSocket handler compatibility. Args: camera_id: Camera identifier Returns: Latest frame data or None if not available """ self._ensure_thread_safety() with self._lock: stream_info = self._streams.get(camera_id) if stream_info and hasattr(stream_info, 'latest_frame'): return stream_info.latest_frame return None async def cleanup_all_streams(self) -> None: """ Cleanup all streams asynchronously for WebSocket handler compatibility. """ # This is an async wrapper around shutdown_all for compatibility self.shutdown_all() async def start_stream(self, camera_id: str, payload: Dict[str, Any]) -> bool: """ Start a stream for WebSocket handler compatibility with enhanced validation. Args: camera_id: Camera identifier payload: Stream configuration payload Returns: True if stream started successfully, False otherwise """ try: # Validate inputs if not camera_id: logger.error(f"Invalid camera_id provided: {camera_id}") return False if not payload: logger.error(f"Empty payload provided for camera {camera_id}") return False # Create a subscription ID for this stream subscription_id = f"ws_{camera_id}_{int(time.time() * 1000)}" # Extract stream parameters from payload with validation rtsp_url = payload.get('rtspUrl') snapshot_url = payload.get('snapshotUrl') snapshot_interval = payload.get('snapshotInterval', 5000) # Log payload details for debugging logger.info(f"Starting stream for camera {camera_id} with payload: " f"rtspUrl={rtsp_url}, snapshotUrl={snapshot_url}, " f"snapshotInterval={snapshot_interval}") # Validate URLs if rtsp_url and not isinstance(rtsp_url, str): logger.error(f"Invalid rtspUrl type for camera {camera_id}: {type(rtsp_url)}") rtsp_url = None if snapshot_url and not isinstance(snapshot_url, str): logger.error(f"Invalid snapshotUrl type for camera {camera_id}: {type(snapshot_url)}") snapshot_url = None # Create a subscriber_id (for WebSocket compatibility, use the subscription_id) subscriber_id = f"websocket_{int(time.time() * 1000)}" # Create subscription based on available URL type if rtsp_url and rtsp_url.strip(): logger.info(f"Creating RTSP stream for camera {camera_id}: {rtsp_url}") success = self.create_subscription( subscription_id=subscription_id, camera_id=camera_id, subscriber_id=subscriber_id, rtsp_url=rtsp_url.strip() ) elif snapshot_url and snapshot_url.strip(): logger.info(f"Creating snapshot stream for camera {camera_id}: {snapshot_url}") success = self.create_subscription( subscription_id=subscription_id, camera_id=camera_id, subscriber_id=subscriber_id, snapshot_url=snapshot_url.strip(), snapshot_interval=snapshot_interval ) else: logger.error(f"No valid stream URL provided for camera {camera_id}. " f"rtspUrl='{rtsp_url}', snapshotUrl='{snapshot_url}'. " f"Payload keys: {list(payload.keys())}") return False if success: logger.info(f"Started stream for camera {camera_id} with subscription {subscription_id}") return True else: logger.error(f"Failed to start stream for camera {camera_id}") return False except Exception as e: logger.error(f"Error starting stream for camera {camera_id}: {e}") import traceback traceback.print_exc() return False async def stop_stream(self, camera_id: str) -> bool: """ Stop a stream for WebSocket handler compatibility. Args: camera_id: Camera identifier Returns: True if stream stopped successfully, False otherwise """ try: # Find and remove subscriptions for this camera subscriptions_to_remove = [ sub_id for sub_id, sub in self._subscriptions.items() if sub.camera_id == camera_id ] success = True for sub_id in subscriptions_to_remove: if not self.remove_subscription(sub_id): success = False if success and subscriptions_to_remove: logger.info(f"Stopped stream for camera {camera_id}") return True elif not subscriptions_to_remove: logger.warning(f"No active subscriptions found for camera {camera_id}") return True else: logger.error(f"Failed to stop some subscriptions for camera {camera_id}") return False except Exception as e: logger.error(f"Error stopping stream for camera {camera_id}: {e}") return False # Global stream manager instance stream_manager = StreamManager() # ===== CONVENIENCE FUNCTIONS ===== # These provide a simplified interface for common operations def create_stream_subscription(subscription_id: str, camera_id: str, subscriber_id: str, rtsp_url: Optional[str] = None, snapshot_url: Optional[str] = None, snapshot_interval: Optional[int] = None) -> bool: """Create a stream subscription using global stream manager.""" return stream_manager.create_subscription( subscription_id, camera_id, subscriber_id, rtsp_url, snapshot_url, snapshot_interval ) def remove_stream_subscription(subscription_id: str) -> bool: """Remove a stream subscription using global stream manager.""" return stream_manager.remove_subscription(subscription_id) def get_stream_frame(subscription_id: str, timeout: float = 0.1) -> Optional[Any]: """Get the latest frame for a subscription using global stream manager.""" return stream_manager.get_frame(subscription_id, timeout) def is_stream_active(camera_id: str) -> bool: """Check if a stream is active using global stream manager.""" return stream_manager.is_stream_active(camera_id) def get_stream_statistics() -> Dict[str, Any]: """Get comprehensive stream statistics.""" return { "streams": stream_manager.get_all_streams(), "subscriptions": stream_manager.get_all_subscriptions(), "total_streams": len(stream_manager._streams), "total_subscriptions": len(stream_manager._subscriptions), "max_streams": stream_manager.max_streams }