""" Stream coordination and lifecycle management. Optimized for 1280x720@6fps RTSP and 2560x1440 HTTP snapshots. """ import logging import threading import time import queue import asyncio from typing import Dict, Set, Optional, List, Any from dataclasses import dataclass from collections import defaultdict from .readers import HTTPSnapshotReader, FFmpegRTSPReader from .buffers import shared_cache_buffer from ..tracking.integration import TrackingPipelineIntegration logger = logging.getLogger(__name__) @dataclass class StreamConfig: """Configuration for a stream.""" camera_id: str rtsp_url: Optional[str] = None snapshot_url: Optional[str] = None snapshot_interval: int = 5000 # milliseconds max_retries: int = 3 @dataclass class SubscriptionInfo: """Information about a subscription.""" subscription_id: str camera_id: str stream_config: StreamConfig created_at: float crop_coords: Optional[tuple] = None model_id: Optional[str] = None model_url: Optional[str] = None tracking_integration: Optional[TrackingPipelineIntegration] = None class StreamManager: """Manages multiple camera streams with shared optimization.""" def __init__(self, max_streams: int = 10): self.max_streams = max_streams self._streams: Dict[str, Any] = {} # camera_id -> reader instance self._subscriptions: Dict[str, SubscriptionInfo] = {} # subscription_id -> info self._camera_subscribers: Dict[str, Set[str]] = defaultdict(set) # camera_id -> set of subscription_ids self._lock = threading.RLock() # Fair tracking queue system - per camera queues self._tracking_queues: Dict[str, queue.Queue] = {} # camera_id -> queue self._tracking_workers = [] self._stop_workers = threading.Event() self._dropped_frame_counts: Dict[str, int] = {} # per-camera drop counts # Round-robin scheduling state self._camera_list = [] # Ordered list of active cameras self._camera_round_robin_index = 0 self._round_robin_lock = threading.Lock() # Start worker threads for tracking processing num_workers = min(4, max_streams // 2 + 1) # Scale with streams for i in range(num_workers): worker = threading.Thread( target=self._tracking_worker_loop, name=f"TrackingWorker-{i}", daemon=True ) worker.start() self._tracking_workers.append(worker) logger.info(f"Started {num_workers} tracking worker threads") def _ensure_camera_queue(self, camera_id: str): """Ensure a tracking queue exists for the camera.""" if camera_id not in self._tracking_queues: self._tracking_queues[camera_id] = queue.Queue(maxsize=10) # 10 frames per camera self._dropped_frame_counts[camera_id] = 0 with self._round_robin_lock: if camera_id not in self._camera_list: self._camera_list.append(camera_id) logger.info(f"✅ Created tracking queue for camera {camera_id}, camera_list now has {len(self._camera_list)} cameras: {self._camera_list}") else: logger.warning(f"Camera {camera_id} already in camera_list") else: logger.debug(f"Camera {camera_id} already has tracking queue") def _remove_camera_queue(self, camera_id: str): """Remove tracking queue for a camera that's no longer active.""" if camera_id in self._tracking_queues: # Clear any remaining items while not self._tracking_queues[camera_id].empty(): try: self._tracking_queues[camera_id].get_nowait() except queue.Empty: break del self._tracking_queues[camera_id] del self._dropped_frame_counts[camera_id] with self._round_robin_lock: if camera_id in self._camera_list: self._camera_list.remove(camera_id) # Reset index if needed if self._camera_round_robin_index >= len(self._camera_list): self._camera_round_robin_index = 0 logger.info(f"Removed tracking 
queue for camera {camera_id}") def add_subscription(self, subscription_id: str, stream_config: StreamConfig, crop_coords: Optional[tuple] = None, model_id: Optional[str] = None, model_url: Optional[str] = None, tracking_integration: Optional[TrackingPipelineIntegration] = None) -> bool: """Add a new subscription. Returns True if successful.""" with self._lock: if subscription_id in self._subscriptions: logger.warning(f"Subscription {subscription_id} already exists") return False camera_id = stream_config.camera_id # Create subscription info subscription_info = SubscriptionInfo( subscription_id=subscription_id, camera_id=camera_id, stream_config=stream_config, created_at=time.time(), crop_coords=crop_coords, model_id=model_id, model_url=model_url, tracking_integration=tracking_integration ) # Pass subscription info to tracking integration for snapshot access if tracking_integration: tracking_integration.set_subscription_info(subscription_info) self._subscriptions[subscription_id] = subscription_info self._camera_subscribers[camera_id].add(subscription_id) # Start stream if not already running if camera_id not in self._streams: if len(self._streams) >= self.max_streams: logger.error(f"Maximum streams ({self.max_streams}) reached, cannot add {camera_id}") self._remove_subscription_internal(subscription_id) return False success = self._start_stream(camera_id, stream_config) if not success: self._remove_subscription_internal(subscription_id) return False else: # Stream already exists, but ensure queue exists too logger.info(f"Stream already exists for {camera_id}, ensuring queue exists") self._ensure_camera_queue(camera_id) logger.info(f"Added subscription {subscription_id} for camera {camera_id} " f"({len(self._camera_subscribers[camera_id])} total subscribers)") return True def remove_subscription(self, subscription_id: str) -> bool: """Remove a subscription. 
Returns True if found and removed.""" with self._lock: return self._remove_subscription_internal(subscription_id) def _remove_subscription_internal(self, subscription_id: str) -> bool: """Internal method to remove subscription (assumes lock is held).""" if subscription_id not in self._subscriptions: logger.warning(f"Subscription {subscription_id} not found") return False subscription_info = self._subscriptions[subscription_id] camera_id = subscription_info.camera_id # Remove from tracking del self._subscriptions[subscription_id] self._camera_subscribers[camera_id].discard(subscription_id) # Stop stream if no more subscribers if not self._camera_subscribers[camera_id]: self._stop_stream(camera_id) del self._camera_subscribers[camera_id] logger.info(f"Removed subscription {subscription_id} for camera {camera_id} " f"({len(self._camera_subscribers[camera_id])} remaining subscribers)") return True def _start_stream(self, camera_id: str, stream_config: StreamConfig) -> bool: """Start a stream for the given camera.""" try: logger.info(f"🚀 _start_stream called for {camera_id}") if stream_config.rtsp_url: # RTSP stream using FFmpeg subprocess with CUDA acceleration logger.info(f"\033[94m[RTSP] Starting {camera_id}\033[0m") reader = FFmpegRTSPReader( camera_id=camera_id, rtsp_url=stream_config.rtsp_url, max_retries=stream_config.max_retries ) reader.set_frame_callback(self._frame_callback) reader.start() self._streams[camera_id] = reader logger.info(f"🎬 About to call _ensure_camera_queue for {camera_id}") self._ensure_camera_queue(camera_id) # Create tracking queue logger.info(f"✅ _ensure_camera_queue completed for {camera_id}") logger.info(f"\033[92m[RTSP] {camera_id} connected\033[0m") elif stream_config.snapshot_url: # HTTP snapshot stream logger.info(f"\033[95m[HTTP] Starting {camera_id}\033[0m") reader = HTTPSnapshotReader( camera_id=camera_id, snapshot_url=stream_config.snapshot_url, interval_ms=stream_config.snapshot_interval, max_retries=stream_config.max_retries ) reader.set_frame_callback(self._frame_callback) reader.start() self._streams[camera_id] = reader logger.info(f"🎬 About to call _ensure_camera_queue for {camera_id}") self._ensure_camera_queue(camera_id) # Create tracking queue logger.info(f"✅ _ensure_camera_queue completed for {camera_id}") logger.info(f"\033[92m[HTTP] {camera_id} connected\033[0m") else: logger.error(f"No valid URL provided for camera {camera_id}") return False return True except Exception as e: logger.error(f"Error starting stream for camera {camera_id}: {e}") return False def _stop_stream(self, camera_id: str): """Stop a stream for the given camera.""" if camera_id in self._streams: try: self._streams[camera_id].stop() del self._streams[camera_id] self._remove_camera_queue(camera_id) # Remove tracking queue # DON'T clear frames - they should persist until replaced # shared_cache_buffer.clear_camera(camera_id) # REMOVED - frames should persist logger.info(f"Stopped stream for camera {camera_id} (frames preserved in buffer)") except Exception as e: logger.error(f"Error stopping stream for camera {camera_id}: {e}") def _frame_callback(self, camera_id: str, frame): """Callback for when a new frame is available.""" try: # Store frame in shared buffer shared_cache_buffer.put_frame(camera_id, frame) # Quieter frame callback logging - only log occasionally if hasattr(self, '_frame_log_count'): self._frame_log_count += 1 else: self._frame_log_count = 1 # Log every 100 frames to avoid spam if self._frame_log_count % 100 == 0: available_cameras = 
shared_cache_buffer.frame_buffer.get_camera_list() logger.info(f"\033[96m[BUFFER] {len(available_cameras)} active cameras: {', '.join(available_cameras)}\033[0m") # Queue for tracking processing (non-blocking) - route to camera-specific queue if camera_id in self._tracking_queues: try: self._tracking_queues[camera_id].put_nowait({ 'frame': frame, 'timestamp': time.time() }) except queue.Full: # Drop frame if camera queue is full (maintain real-time) self._dropped_frame_counts[camera_id] += 1 if self._dropped_frame_counts[camera_id] % 50 == 0: logger.warning(f"Dropped {self._dropped_frame_counts[camera_id]} frames for camera {camera_id} due to full queue") except Exception as e: logger.error(f"Error in frame callback for camera {camera_id}: {e}") def _process_tracking_for_camera(self, camera_id: str, frame): """Process tracking for all subscriptions of a camera.""" try: with self._lock: for subscription_id in self._camera_subscribers[camera_id]: subscription_info = self._subscriptions[subscription_id] # Skip if no tracking integration if not subscription_info.tracking_integration: continue # Extract display_id from subscription_id display_id = subscription_id.split(';')[0] if ';' in subscription_id else subscription_id # Process frame through tracking asynchronously # Note: This is synchronous for now, can be made async in future try: # Create a simple asyncio event loop for this frame import asyncio loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: result = loop.run_until_complete( subscription_info.tracking_integration.process_frame( frame, display_id, subscription_id ) ) # Log tracking results if result: tracked_count = len(result.get('tracked_vehicles', [])) validated_vehicle = result.get('validated_vehicle') pipeline_result = result.get('pipeline_result') if tracked_count > 0: logger.info(f"[Tracking] {camera_id}: {tracked_count} vehicles tracked") if validated_vehicle: logger.info(f"[Tracking] {camera_id}: Vehicle {validated_vehicle['track_id']} " f"validated as {validated_vehicle['state']} " f"(confidence: {validated_vehicle['confidence']:.2f})") if pipeline_result: logger.info(f"[Pipeline] {camera_id}: {pipeline_result.get('status', 'unknown')} - " f"{pipeline_result.get('message', 'no message')}") finally: loop.close() except Exception as track_e: logger.error(f"Error in tracking for {subscription_id}: {track_e}") except Exception as e: logger.error(f"Error processing tracking for camera {camera_id}: {e}") def _tracking_worker_loop(self): """Worker thread loop for round-robin processing of camera queues.""" logger.info(f"Tracking worker {threading.current_thread().name} started") consecutive_empty = 0 max_consecutive_empty = 10 # Sleep if all cameras empty this many times while not self._stop_workers.is_set(): try: logger.debug(f"Worker {threading.current_thread().name} loop iteration, stop_event={self._stop_workers.is_set()}") # Get next camera in round-robin fashion camera_id, item = self._get_next_camera_item() if camera_id is None: # No cameras have items, sleep briefly consecutive_empty += 1 logger.debug(f"Worker {threading.current_thread().name}: All queues empty ({consecutive_empty}/{max_consecutive_empty})") if consecutive_empty >= max_consecutive_empty: time.sleep(0.1) # Sleep 100ms if nothing to process consecutive_empty = 0 continue consecutive_empty = 0 # Reset counter when we find work logger.info(f"Worker {threading.current_thread().name}: Processing frame from {camera_id}") frame = item['frame'] timestamp = item['timestamp'] # Check if frame is too 
old (drop if > 1 second old) age = time.time() - timestamp if age > 1.0: logger.warning(f"Dropping old frame for {camera_id} (age: {age:.2f}s)") continue logger.info(f"Worker {threading.current_thread().name}: Calling tracking sync for {camera_id}") # Process tracking for this camera's frame self._process_tracking_for_camera_sync(camera_id, frame) logger.info(f"Worker {threading.current_thread().name}: Finished tracking sync for {camera_id}") except Exception as e: logger.error(f"Error in tracking worker: {e}", exc_info=True) logger.info(f"Tracking worker {threading.current_thread().name} stopped") def _get_next_camera_item(self): """Get next item from camera queues using round-robin scheduling.""" with self._round_robin_lock: # Get current list of cameras from actual tracking queues (central state) camera_list = list(self._tracking_queues.keys()) # Debug: show ALL state logger.info(f"🔍 _tracking_queues keys: {list(self._tracking_queues.keys())}") logger.info(f"🔍 _streams keys: {list(self._streams.keys())}") logger.info(f"🔍 _subscriptions keys: {list(self._subscriptions.keys())}") if not camera_list: logger.warning("⚠️ _get_next_camera_item: No cameras have tracking queues yet, but streams/subscriptions exist!") return None, None logger.debug(f"_get_next_camera_item: {len(camera_list)} cameras with queues: {camera_list}") attempts = 0 max_attempts = len(camera_list) while attempts < max_attempts: # Get current camera using round-robin index if self._camera_round_robin_index >= len(camera_list): self._camera_round_robin_index = 0 camera_id = camera_list[self._camera_round_robin_index] logger.debug(f"_get_next_camera_item: Trying camera {camera_id} (attempt {attempts + 1}/{max_attempts})") # Move to next camera for next call self._camera_round_robin_index = (self._camera_round_robin_index + 1) % len(camera_list) # Try to get item from this camera's queue queue_size = self._tracking_queues[camera_id].qsize() logger.debug(f"_get_next_camera_item: Camera {camera_id} queue has {queue_size} items") try: item = self._tracking_queues[camera_id].get_nowait() logger.info(f"_get_next_camera_item: Got item from {camera_id}") return camera_id, item except queue.Empty: logger.debug(f"_get_next_camera_item: Camera {camera_id} queue empty") pass # Try next camera attempts += 1 logger.debug("_get_next_camera_item: All cameras empty") return None, None # All cameras empty def _process_tracking_for_camera_sync(self, camera_id: str, frame): """Synchronous version of tracking processing for worker threads.""" try: with self._lock: subscription_ids = list(self._camera_subscribers.get(camera_id, [])) for subscription_id in subscription_ids: subscription_info = self._subscriptions.get(subscription_id) if not subscription_info: logger.warning(f"No subscription info found for {subscription_id}") continue if not subscription_info.tracking_integration: logger.debug(f"No tracking integration for {subscription_id} (camera {camera_id}), skipping inference") continue display_id = subscription_id.split(';')[0] if ';' in subscription_id else subscription_id try: # Run async tracking in thread's event loop loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: result = loop.run_until_complete( subscription_info.tracking_integration.process_frame( frame, display_id, subscription_id ) ) # Log tracking results if result: tracked_count = len(result.get('tracked_vehicles', [])) validated_vehicle = result.get('validated_vehicle') pipeline_result = result.get('pipeline_result') if tracked_count > 0: 
logger.info(f"[Tracking] {camera_id}: {tracked_count} vehicles tracked") if validated_vehicle: logger.info(f"[Tracking] {camera_id}: Vehicle {validated_vehicle['track_id']} " f"validated as {validated_vehicle['state']} " f"(confidence: {validated_vehicle['confidence']:.2f})") if pipeline_result: logger.info(f"[Pipeline] {camera_id}: {pipeline_result.get('status', 'unknown')} - " f"{pipeline_result.get('message', 'no message')}") finally: loop.close() except Exception as track_e: logger.error(f"Error in tracking for {subscription_id}: {track_e}") except Exception as e: logger.error(f"Error processing tracking for camera {camera_id}: {e}") def get_frame(self, camera_id: str, crop_coords: Optional[tuple] = None): """Get the latest frame for a camera with optional cropping.""" return shared_cache_buffer.get_frame(camera_id, crop_coords) def get_frame_as_jpeg(self, camera_id: str, crop_coords: Optional[tuple] = None, quality: int = 100) -> Optional[bytes]: """Get frame as JPEG bytes for HTTP responses with highest quality by default.""" return shared_cache_buffer.get_frame_as_jpeg(camera_id, crop_coords, quality) def has_frame(self, camera_id: str) -> bool: """Check if a frame is available for the camera.""" return shared_cache_buffer.has_frame(camera_id) def get_subscription_info(self, subscription_id: str) -> Optional[SubscriptionInfo]: """Get information about a subscription.""" with self._lock: return self._subscriptions.get(subscription_id) def get_camera_subscribers(self, camera_id: str) -> Set[str]: """Get all subscription IDs for a camera.""" with self._lock: return self._camera_subscribers[camera_id].copy() def get_active_cameras(self) -> List[str]: """Get list of cameras with active streams.""" with self._lock: return list(self._streams.keys()) def get_all_subscriptions(self) -> List[SubscriptionInfo]: """Get all active subscriptions.""" with self._lock: return list(self._subscriptions.values()) def reconcile_subscriptions(self, target_subscriptions: List[Dict[str, Any]]) -> Dict[str, Any]: """ Reconcile current subscriptions with target list. Returns summary of changes made. 
""" with self._lock: current_subscription_ids = set(self._subscriptions.keys()) target_subscription_ids = {sub['subscriptionIdentifier'] for sub in target_subscriptions} # Find subscriptions to remove and add to_remove = current_subscription_ids - target_subscription_ids to_add = target_subscription_ids - current_subscription_ids # Remove old subscriptions removed_count = 0 for subscription_id in to_remove: if self._remove_subscription_internal(subscription_id): removed_count += 1 # Add new subscriptions added_count = 0 failed_count = 0 for target_sub in target_subscriptions: subscription_id = target_sub['subscriptionIdentifier'] if subscription_id in to_add: success = self._add_subscription_from_payload(subscription_id, target_sub) if success: added_count += 1 else: failed_count += 1 result = { 'removed': removed_count, 'added': added_count, 'failed': failed_count, 'total_active': len(self._subscriptions), 'active_streams': len(self._streams) } logger.info(f"Subscription reconciliation: {result}") return result def _add_subscription_from_payload(self, subscription_id: str, payload: Dict[str, Any]) -> bool: """Add subscription from WebSocket payload format.""" try: # Extract camera ID from subscription identifier # Format: "display-001;cam-001" -> camera_id = "cam-001" camera_id = subscription_id.split(';')[-1] # Extract crop coordinates if present crop_coords = None if all(key in payload for key in ['cropX1', 'cropY1', 'cropX2', 'cropY2']): crop_coords = ( payload['cropX1'], payload['cropY1'], payload['cropX2'], payload['cropY2'] ) # Create stream configuration stream_config = StreamConfig( camera_id=camera_id, rtsp_url=payload.get('rtspUrl'), snapshot_url=payload.get('snapshotUrl'), snapshot_interval=payload.get('snapshotInterval', 5000), max_retries=3, ) return self.add_subscription( subscription_id, stream_config, crop_coords, model_id=payload.get('modelId'), model_url=payload.get('modelUrl') ) except Exception as e: logger.error(f"Error adding subscription from payload {subscription_id}: {e}") return False def stop_all(self): """Stop all streams and clear all subscriptions.""" # Signal workers to stop self._stop_workers.set() # Clear all camera queues for camera_id, camera_queue in list(self._tracking_queues.items()): while not camera_queue.empty(): try: camera_queue.get_nowait() except queue.Empty: break # Wait for workers to finish for worker in self._tracking_workers: worker.join(timeout=2.0) # Clear queue management structures self._tracking_queues.clear() self._dropped_frame_counts.clear() with self._round_robin_lock: self._camera_list.clear() self._camera_round_robin_index = 0 logger.info("Stopped all tracking worker threads") with self._lock: # Stop all streams for camera_id in list(self._streams.keys()): self._stop_stream(camera_id) # Clear all tracking self._subscriptions.clear() self._camera_subscribers.clear() shared_cache_buffer.clear_all() logger.info("Stopped all streams and cleared all subscriptions") def set_session_id(self, display_id: str, session_id: str): """Set session ID for tracking integration.""" # Ensure session_id is always a string for consistent type handling session_id = str(session_id) if session_id is not None else None with self._lock: for subscription_info in self._subscriptions.values(): # Check if this subscription matches the display_id subscription_display_id = subscription_info.subscription_id.split(';')[0] if subscription_display_id == display_id and subscription_info.tracking_integration: # Pass the full subscription_id (displayId;cameraId) to 
the tracking integration subscription_info.tracking_integration.set_session_id( display_id, session_id, subscription_id=subscription_info.subscription_id ) logger.debug(f"Set session {session_id} for display {display_id} with subscription {subscription_info.subscription_id}") def clear_session_id(self, session_id: str): """Clear session ID from the specific tracking integration handling this session.""" with self._lock: # Find the subscription that's handling this session session_subscription = None for subscription_info in self._subscriptions.values(): if subscription_info.tracking_integration: # Check if this integration is handling the given session_id integration = subscription_info.tracking_integration if session_id in integration.session_vehicles: session_subscription = subscription_info break if session_subscription and session_subscription.tracking_integration: session_subscription.tracking_integration.clear_session_id(session_id) logger.debug(f"Cleared session {session_id} from subscription {session_subscription.subscription_id}") else: logger.warning(f"No tracking integration found for session {session_id}, broadcasting to all subscriptions") # Fallback: broadcast to all (original behavior) for subscription_info in self._subscriptions.values(): if subscription_info.tracking_integration: subscription_info.tracking_integration.clear_session_id(session_id) def set_progression_stage(self, session_id: str, stage: str): """Set progression stage for the specific tracking integration handling this session.""" with self._lock: # Find the subscription that's handling this session session_subscription = None for subscription_info in self._subscriptions.values(): if subscription_info.tracking_integration: # Check if this integration is handling the given session_id # We need to check the integration's active sessions integration = subscription_info.tracking_integration if session_id in integration.session_vehicles: session_subscription = subscription_info break if session_subscription and session_subscription.tracking_integration: session_subscription.tracking_integration.set_progression_stage(session_id, stage) logger.debug(f"Set progression stage for session {session_id}: {stage} on subscription {session_subscription.subscription_id}") else: logger.warning(f"No tracking integration found for session {session_id}, broadcasting to all subscriptions") # Fallback: broadcast to all (original behavior) for subscription_info in self._subscriptions.values(): if subscription_info.tracking_integration: subscription_info.tracking_integration.set_progression_stage(session_id, stage) def get_tracking_stats(self) -> Dict[str, Any]: """Get tracking statistics from all subscriptions.""" stats = {} with self._lock: for subscription_id, subscription_info in self._subscriptions.items(): if subscription_info.tracking_integration: stats[subscription_id] = subscription_info.tracking_integration.get_statistics() return stats def get_stats(self) -> Dict[str, Any]: """Get comprehensive streaming statistics.""" with self._lock: buffer_stats = shared_cache_buffer.get_stats() tracking_stats = self.get_tracking_stats() return { 'active_subscriptions': len(self._subscriptions), 'active_streams': len(self._streams), 'cameras_with_subscribers': len(self._camera_subscribers), 'max_streams': self.max_streams, 'subscriptions_by_camera': { camera_id: len(subscribers) for camera_id, subscribers in self._camera_subscribers.items() }, 'buffer_stats': buffer_stats, 'tracking_stats': tracking_stats, 'memory_usage_mb': 
buffer_stats.get('total_memory_mb', 0) } # Global shared instance for application use # Default initialization, will be updated with config value in app.py shared_stream_manager = StreamManager(max_streams=20) def initialize_stream_manager(max_streams: int = 10): """Re-initialize the global stream manager with config value.""" global shared_stream_manager # Release old manager if exists if shared_stream_manager: try: # Stop all existing streams gracefully shared_stream_manager.stop_all() except Exception as e: logger.warning(f"Error stopping previous stream manager: {e}") shared_stream_manager = StreamManager(max_streams=max_streams) return shared_stream_manager
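

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only). This shows how a caller is expected to
# drive the manager using the public API defined above. The URLs and
# identifiers below are placeholders, not real endpoints, and the snippet
# assumes the reader/buffer/tracking modules are importable in this package.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    manager = initialize_stream_manager(max_streams=4)

    # Subscription IDs follow the "displayId;cameraId" convention used above.
    config = StreamConfig(
        camera_id="cam-001",
        snapshot_url="http://example.local/snapshot.jpg",  # placeholder URL
        snapshot_interval=5000,
    )
    manager.add_subscription("display-001;cam-001", config)

    # Alternatively, reconcile against a WebSocket-style payload list;
    # entries not in the target list are removed, new ones are added.
    manager.reconcile_subscriptions([
        {
            "subscriptionIdentifier": "display-001;cam-001",
            "snapshotUrl": "http://example.local/snapshot.jpg",  # placeholder URL
            "snapshotInterval": 5000,
        }
    ])

    time.sleep(10)  # let the reader fetch a few frames
    print(manager.get_stats())
    manager.stop_all()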