""" Stream coordination and lifecycle management. Optimized for 1280x720@6fps RTSP and 2560x1440 HTTP snapshots. """ import logging import threading import time from typing import Dict, Set, Optional, List, Any from dataclasses import dataclass from collections import defaultdict from .readers import RTSPReader, HTTPSnapshotReader from .buffers import shared_cache_buffer from ..tracking.integration import TrackingPipelineIntegration logger = logging.getLogger(__name__) @dataclass class StreamConfig: """Configuration for a stream.""" camera_id: str rtsp_url: Optional[str] = None snapshot_url: Optional[str] = None snapshot_interval: int = 5000 # milliseconds max_retries: int = 3 @dataclass class SubscriptionInfo: """Information about a subscription.""" subscription_id: str camera_id: str stream_config: StreamConfig created_at: float crop_coords: Optional[tuple] = None model_id: Optional[str] = None model_url: Optional[str] = None tracking_integration: Optional[TrackingPipelineIntegration] = None class StreamManager: """Manages multiple camera streams with shared optimization.""" def __init__(self, max_streams: int = 10): self.max_streams = max_streams self._streams: Dict[str, Any] = {} # camera_id -> reader instance self._subscriptions: Dict[str, SubscriptionInfo] = {} # subscription_id -> info self._camera_subscribers: Dict[str, Set[str]] = defaultdict(set) # camera_id -> set of subscription_ids self._lock = threading.RLock() def add_subscription(self, subscription_id: str, stream_config: StreamConfig, crop_coords: Optional[tuple] = None, model_id: Optional[str] = None, model_url: Optional[str] = None, tracking_integration: Optional[TrackingPipelineIntegration] = None) -> bool: """Add a new subscription. Returns True if successful.""" with self._lock: if subscription_id in self._subscriptions: logger.warning(f"Subscription {subscription_id} already exists") return False camera_id = stream_config.camera_id # Create subscription info subscription_info = SubscriptionInfo( subscription_id=subscription_id, camera_id=camera_id, stream_config=stream_config, created_at=time.time(), crop_coords=crop_coords, model_id=model_id, model_url=model_url, tracking_integration=tracking_integration ) # Pass subscription info to tracking integration for snapshot access if tracking_integration: tracking_integration.set_subscription_info(subscription_info) self._subscriptions[subscription_id] = subscription_info self._camera_subscribers[camera_id].add(subscription_id) # Start stream if not already running if camera_id not in self._streams: if len(self._streams) >= self.max_streams: logger.error(f"Maximum streams ({self.max_streams}) reached, cannot add {camera_id}") self._remove_subscription_internal(subscription_id) return False success = self._start_stream(camera_id, stream_config) if not success: self._remove_subscription_internal(subscription_id) return False logger.info(f"Added subscription {subscription_id} for camera {camera_id} " f"({len(self._camera_subscribers[camera_id])} total subscribers)") return True def remove_subscription(self, subscription_id: str) -> bool: """Remove a subscription. Returns True if found and removed.""" with self._lock: return self._remove_subscription_internal(subscription_id) def _remove_subscription_internal(self, subscription_id: str) -> bool: """Internal method to remove subscription (assumes lock is held).""" if subscription_id not in self._subscriptions: logger.warning(f"Subscription {subscription_id} not found") return False subscription_info = self._subscriptions[subscription_id] camera_id = subscription_info.camera_id # Remove from tracking del self._subscriptions[subscription_id] self._camera_subscribers[camera_id].discard(subscription_id) # Stop stream if no more subscribers if not self._camera_subscribers[camera_id]: self._stop_stream(camera_id) del self._camera_subscribers[camera_id] logger.info(f"Removed subscription {subscription_id} for camera {camera_id} " f"({len(self._camera_subscribers[camera_id])} remaining subscribers)") return True def _start_stream(self, camera_id: str, stream_config: StreamConfig) -> bool: """Start a stream for the given camera.""" try: if stream_config.rtsp_url: # RTSP stream reader = RTSPReader( camera_id=camera_id, rtsp_url=stream_config.rtsp_url, max_retries=stream_config.max_retries ) reader.set_frame_callback(self._frame_callback) reader.start() self._streams[camera_id] = reader logger.info(f"Started RTSP stream for camera {camera_id}") elif stream_config.snapshot_url: # HTTP snapshot stream reader = HTTPSnapshotReader( camera_id=camera_id, snapshot_url=stream_config.snapshot_url, interval_ms=stream_config.snapshot_interval, max_retries=stream_config.max_retries ) reader.set_frame_callback(self._frame_callback) reader.start() self._streams[camera_id] = reader logger.info(f"Started HTTP snapshot stream for camera {camera_id}") else: logger.error(f"No valid URL provided for camera {camera_id}") return False return True except Exception as e: logger.error(f"Error starting stream for camera {camera_id}: {e}") return False def _stop_stream(self, camera_id: str): """Stop a stream for the given camera.""" if camera_id in self._streams: try: self._streams[camera_id].stop() del self._streams[camera_id] shared_cache_buffer.clear_camera(camera_id) logger.info(f"Stopped stream for camera {camera_id}") except Exception as e: logger.error(f"Error stopping stream for camera {camera_id}: {e}") def _frame_callback(self, camera_id: str, frame): """Callback for when a new frame is available.""" try: # Store frame in shared buffer shared_cache_buffer.put_frame(camera_id, frame) # Process tracking for subscriptions with tracking integration self._process_tracking_for_camera(camera_id, frame) except Exception as e: logger.error(f"Error in frame callback for camera {camera_id}: {e}") def _process_tracking_for_camera(self, camera_id: str, frame): """Process tracking for all subscriptions of a camera.""" try: with self._lock: for subscription_id in self._camera_subscribers[camera_id]: subscription_info = self._subscriptions[subscription_id] # Skip if no tracking integration if not subscription_info.tracking_integration: continue # Extract display_id from subscription_id display_id = subscription_id.split(';')[0] if ';' in subscription_id else subscription_id # Process frame through tracking asynchronously # Note: This is synchronous for now, can be made async in future try: # Create a simple asyncio event loop for this frame import asyncio loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: result = loop.run_until_complete( subscription_info.tracking_integration.process_frame( frame, display_id, subscription_id ) ) # Log tracking results if result: tracked_count = len(result.get('tracked_vehicles', [])) validated_vehicle = result.get('validated_vehicle') pipeline_result = result.get('pipeline_result') if tracked_count > 0: logger.info(f"[Tracking] {camera_id}: {tracked_count} vehicles tracked") if validated_vehicle: logger.info(f"[Tracking] {camera_id}: Vehicle {validated_vehicle['track_id']} " f"validated as {validated_vehicle['state']} " f"(confidence: {validated_vehicle['confidence']:.2f})") if pipeline_result: logger.info(f"[Pipeline] {camera_id}: {pipeline_result.get('status', 'unknown')} - " f"{pipeline_result.get('message', 'no message')}") finally: loop.close() except Exception as track_e: logger.error(f"Error in tracking for {subscription_id}: {track_e}") except Exception as e: logger.error(f"Error processing tracking for camera {camera_id}: {e}") def get_frame(self, camera_id: str, crop_coords: Optional[tuple] = None): """Get the latest frame for a camera with optional cropping.""" return shared_cache_buffer.get_frame(camera_id, crop_coords) def get_frame_as_jpeg(self, camera_id: str, crop_coords: Optional[tuple] = None, quality: int = 100) -> Optional[bytes]: """Get frame as JPEG bytes for HTTP responses with highest quality by default.""" return shared_cache_buffer.get_frame_as_jpeg(camera_id, crop_coords, quality) def has_frame(self, camera_id: str) -> bool: """Check if a frame is available for the camera.""" return shared_cache_buffer.has_frame(camera_id) def get_subscription_info(self, subscription_id: str) -> Optional[SubscriptionInfo]: """Get information about a subscription.""" with self._lock: return self._subscriptions.get(subscription_id) def get_camera_subscribers(self, camera_id: str) -> Set[str]: """Get all subscription IDs for a camera.""" with self._lock: return self._camera_subscribers[camera_id].copy() def get_active_cameras(self) -> List[str]: """Get list of cameras with active streams.""" with self._lock: return list(self._streams.keys()) def get_all_subscriptions(self) -> List[SubscriptionInfo]: """Get all active subscriptions.""" with self._lock: return list(self._subscriptions.values()) def reconcile_subscriptions(self, target_subscriptions: List[Dict[str, Any]]) -> Dict[str, Any]: """ Reconcile current subscriptions with target list. Returns summary of changes made. """ with self._lock: current_subscription_ids = set(self._subscriptions.keys()) target_subscription_ids = {sub['subscriptionIdentifier'] for sub in target_subscriptions} # Find subscriptions to remove and add to_remove = current_subscription_ids - target_subscription_ids to_add = target_subscription_ids - current_subscription_ids # Remove old subscriptions removed_count = 0 for subscription_id in to_remove: if self._remove_subscription_internal(subscription_id): removed_count += 1 # Add new subscriptions added_count = 0 failed_count = 0 for target_sub in target_subscriptions: subscription_id = target_sub['subscriptionIdentifier'] if subscription_id in to_add: success = self._add_subscription_from_payload(subscription_id, target_sub) if success: added_count += 1 else: failed_count += 1 result = { 'removed': removed_count, 'added': added_count, 'failed': failed_count, 'total_active': len(self._subscriptions), 'active_streams': len(self._streams) } logger.info(f"Subscription reconciliation: {result}") return result def _add_subscription_from_payload(self, subscription_id: str, payload: Dict[str, Any]) -> bool: """Add subscription from WebSocket payload format.""" try: # Extract camera ID from subscription identifier # Format: "display-001;cam-001" -> camera_id = "cam-001" camera_id = subscription_id.split(';')[-1] # Extract crop coordinates if present crop_coords = None if all(key in payload for key in ['cropX1', 'cropY1', 'cropX2', 'cropY2']): crop_coords = ( payload['cropX1'], payload['cropY1'], payload['cropX2'], payload['cropY2'] ) # Create stream configuration stream_config = StreamConfig( camera_id=camera_id, rtsp_url=payload.get('rtspUrl'), snapshot_url=payload.get('snapshotUrl'), snapshot_interval=payload.get('snapshotInterval', 5000), max_retries=3, ) return self.add_subscription( subscription_id, stream_config, crop_coords, model_id=payload.get('modelId'), model_url=payload.get('modelUrl') ) except Exception as e: logger.error(f"Error adding subscription from payload {subscription_id}: {e}") return False def stop_all(self): """Stop all streams and clear all subscriptions.""" with self._lock: # Stop all streams for camera_id in list(self._streams.keys()): self._stop_stream(camera_id) # Clear all tracking self._subscriptions.clear() self._camera_subscribers.clear() shared_cache_buffer.clear_all() logger.info("Stopped all streams and cleared all subscriptions") def set_session_id(self, display_id: str, session_id: str): """Set session ID for tracking integration.""" with self._lock: for subscription_info in self._subscriptions.values(): # Check if this subscription matches the display_id subscription_display_id = subscription_info.subscription_id.split(';')[0] if subscription_display_id == display_id and subscription_info.tracking_integration: subscription_info.tracking_integration.set_session_id(display_id, session_id) logger.debug(f"Set session {session_id} for display {display_id}") def clear_session_id(self, session_id: str): """Clear session ID from tracking integrations.""" with self._lock: for subscription_info in self._subscriptions.values(): if subscription_info.tracking_integration: subscription_info.tracking_integration.clear_session_id(session_id) logger.debug(f"Cleared session {session_id}") def set_progression_stage(self, session_id: str, stage: str): """Set progression stage for tracking integrations.""" with self._lock: for subscription_info in self._subscriptions.values(): if subscription_info.tracking_integration: subscription_info.tracking_integration.set_progression_stage(session_id, stage) logger.debug(f"Set progression stage for session {session_id}: {stage}") def get_tracking_stats(self) -> Dict[str, Any]: """Get tracking statistics from all subscriptions.""" stats = {} with self._lock: for subscription_id, subscription_info in self._subscriptions.items(): if subscription_info.tracking_integration: stats[subscription_id] = subscription_info.tracking_integration.get_statistics() return stats def get_stats(self) -> Dict[str, Any]: """Get comprehensive streaming statistics.""" with self._lock: buffer_stats = shared_cache_buffer.get_stats() tracking_stats = self.get_tracking_stats() return { 'active_subscriptions': len(self._subscriptions), 'active_streams': len(self._streams), 'cameras_with_subscribers': len(self._camera_subscribers), 'max_streams': self.max_streams, 'subscriptions_by_camera': { camera_id: len(subscribers) for camera_id, subscribers in self._camera_subscribers.items() }, 'buffer_stats': buffer_stats, 'tracking_stats': tracking_stats, 'memory_usage_mb': buffer_stats.get('total_memory_mb', 0) } # Global shared instance for application use # Default initialization, will be updated with config value in app.py shared_stream_manager = StreamManager(max_streams=20) def initialize_stream_manager(max_streams: int = 10): """Re-initialize the global stream manager with config value.""" global shared_stream_manager # Release old manager if exists if shared_stream_manager: try: # Stop all existing streams gracefully shared_stream_manager.stop_all() except Exception as e: logger.warning(f"Error stopping previous stream manager: {e}") shared_stream_manager = StreamManager(max_streams=max_streams) return shared_stream_manager