All checks were successful
Build Worker Base and Application Images / check-base-changes (push) Successful in 7s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 2m51s
Build Worker Base and Application Images / deploy-stack (push) Successful in 15s
710 lines
No EOL
31 KiB
Python
710 lines
No EOL
31 KiB
Python
"""
|
|
Stream coordination and lifecycle management.
|
|
Optimized for 1280x720@6fps RTSP and 2560x1440 HTTP snapshots.
|
|
"""
|
|
import logging
|
|
import threading
|
|
import time
|
|
import queue
|
|
import asyncio
|
|
from typing import Dict, Set, Optional, List, Any
|
|
from dataclasses import dataclass
|
|
from collections import defaultdict
|
|
|
|
from .readers import HTTPSnapshotReader, FFmpegRTSPReader
|
|
from .buffers import shared_cache_buffer
|
|
from ..tracking.integration import TrackingPipelineIntegration
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class StreamConfig:
|
|
"""Configuration for a stream."""
|
|
camera_id: str
|
|
rtsp_url: Optional[str] = None
|
|
snapshot_url: Optional[str] = None
|
|
snapshot_interval: int = 5000 # milliseconds
|
|
max_retries: int = 3
|
|
|
|
|
|
@dataclass
|
|
class SubscriptionInfo:
|
|
"""Information about a subscription."""
|
|
subscription_id: str
|
|
camera_id: str
|
|
stream_config: StreamConfig
|
|
created_at: float
|
|
crop_coords: Optional[tuple] = None
|
|
model_id: Optional[str] = None
|
|
model_url: Optional[str] = None
|
|
tracking_integration: Optional[TrackingPipelineIntegration] = None
|
|
|
|
|
|
class StreamManager:
|
|
"""Manages multiple camera streams with shared optimization."""
|
|
|
|
def __init__(self, max_streams: int = 10):
|
|
self.max_streams = max_streams
|
|
self._streams: Dict[str, Any] = {} # camera_id -> reader instance
|
|
self._subscriptions: Dict[str, SubscriptionInfo] = {} # subscription_id -> info
|
|
self._camera_subscribers: Dict[str, Set[str]] = defaultdict(set) # camera_id -> set of subscription_ids
|
|
self._lock = threading.RLock()
|
|
|
|
# Fair tracking queue system - per camera queues
|
|
self._tracking_queues: Dict[str, queue.Queue] = {} # camera_id -> queue
|
|
self._tracking_workers = []
|
|
self._stop_workers = threading.Event()
|
|
self._dropped_frame_counts: Dict[str, int] = {} # per-camera drop counts
|
|
|
|
# Round-robin scheduling state
|
|
self._camera_list = [] # Ordered list of active cameras
|
|
self._camera_round_robin_index = 0
|
|
self._round_robin_lock = threading.Lock()
|
|
|
|
# Start worker threads for tracking processing
|
|
num_workers = min(4, max_streams // 2 + 1) # Scale with streams
|
|
for i in range(num_workers):
|
|
worker = threading.Thread(
|
|
target=self._tracking_worker_loop,
|
|
name=f"TrackingWorker-{i}",
|
|
daemon=True
|
|
)
|
|
worker.start()
|
|
self._tracking_workers.append(worker)
|
|
|
|
logger.info(f"Started {num_workers} tracking worker threads")
|
|
|
|
def _ensure_camera_queue(self, camera_id: str):
|
|
"""Ensure a tracking queue exists for the camera."""
|
|
if camera_id not in self._tracking_queues:
|
|
self._tracking_queues[camera_id] = queue.Queue(maxsize=10) # 10 frames per camera
|
|
self._dropped_frame_counts[camera_id] = 0
|
|
|
|
with self._round_robin_lock:
|
|
if camera_id not in self._camera_list:
|
|
self._camera_list.append(camera_id)
|
|
|
|
logger.info(f"Created tracking queue for camera {camera_id}")
|
|
|
|
def _remove_camera_queue(self, camera_id: str):
|
|
"""Remove tracking queue for a camera that's no longer active."""
|
|
if camera_id in self._tracking_queues:
|
|
# Clear any remaining items
|
|
while not self._tracking_queues[camera_id].empty():
|
|
try:
|
|
self._tracking_queues[camera_id].get_nowait()
|
|
except queue.Empty:
|
|
break
|
|
|
|
del self._tracking_queues[camera_id]
|
|
del self._dropped_frame_counts[camera_id]
|
|
|
|
with self._round_robin_lock:
|
|
if camera_id in self._camera_list:
|
|
self._camera_list.remove(camera_id)
|
|
# Reset index if needed
|
|
if self._camera_round_robin_index >= len(self._camera_list):
|
|
self._camera_round_robin_index = 0
|
|
|
|
logger.info(f"Removed tracking queue for camera {camera_id}")
|
|
|
|
def add_subscription(self, subscription_id: str, stream_config: StreamConfig,
|
|
crop_coords: Optional[tuple] = None,
|
|
model_id: Optional[str] = None,
|
|
model_url: Optional[str] = None,
|
|
tracking_integration: Optional[TrackingPipelineIntegration] = None) -> bool:
|
|
"""Add a new subscription. Returns True if successful."""
|
|
with self._lock:
|
|
if subscription_id in self._subscriptions:
|
|
logger.warning(f"Subscription {subscription_id} already exists")
|
|
return False
|
|
|
|
camera_id = stream_config.camera_id
|
|
|
|
# Create subscription info
|
|
subscription_info = SubscriptionInfo(
|
|
subscription_id=subscription_id,
|
|
camera_id=camera_id,
|
|
stream_config=stream_config,
|
|
created_at=time.time(),
|
|
crop_coords=crop_coords,
|
|
model_id=model_id,
|
|
model_url=model_url,
|
|
tracking_integration=tracking_integration
|
|
)
|
|
|
|
# Pass subscription info to tracking integration for snapshot access
|
|
if tracking_integration:
|
|
tracking_integration.set_subscription_info(subscription_info)
|
|
|
|
self._subscriptions[subscription_id] = subscription_info
|
|
self._camera_subscribers[camera_id].add(subscription_id)
|
|
|
|
# Start stream if not already running
|
|
if camera_id not in self._streams:
|
|
if len(self._streams) >= self.max_streams:
|
|
logger.error(f"Maximum streams ({self.max_streams}) reached, cannot add {camera_id}")
|
|
self._remove_subscription_internal(subscription_id)
|
|
return False
|
|
|
|
success = self._start_stream(camera_id, stream_config)
|
|
if not success:
|
|
self._remove_subscription_internal(subscription_id)
|
|
return False
|
|
|
|
logger.info(f"Added subscription {subscription_id} for camera {camera_id} "
|
|
f"({len(self._camera_subscribers[camera_id])} total subscribers)")
|
|
return True
|
|
|
|
def remove_subscription(self, subscription_id: str) -> bool:
|
|
"""Remove a subscription. Returns True if found and removed."""
|
|
with self._lock:
|
|
return self._remove_subscription_internal(subscription_id)
|
|
|
|
def _remove_subscription_internal(self, subscription_id: str) -> bool:
|
|
"""Internal method to remove subscription (assumes lock is held)."""
|
|
if subscription_id not in self._subscriptions:
|
|
logger.warning(f"Subscription {subscription_id} not found")
|
|
return False
|
|
|
|
subscription_info = self._subscriptions[subscription_id]
|
|
camera_id = subscription_info.camera_id
|
|
|
|
# Remove from tracking
|
|
del self._subscriptions[subscription_id]
|
|
self._camera_subscribers[camera_id].discard(subscription_id)
|
|
|
|
# Stop stream if no more subscribers
|
|
if not self._camera_subscribers[camera_id]:
|
|
self._stop_stream(camera_id)
|
|
del self._camera_subscribers[camera_id]
|
|
|
|
logger.info(f"Removed subscription {subscription_id} for camera {camera_id} "
|
|
f"({len(self._camera_subscribers[camera_id])} remaining subscribers)")
|
|
return True
|
|
|
|
def _start_stream(self, camera_id: str, stream_config: StreamConfig) -> bool:
|
|
"""Start a stream for the given camera."""
|
|
try:
|
|
if stream_config.rtsp_url:
|
|
# RTSP stream using FFmpeg subprocess with CUDA acceleration
|
|
logger.info(f"\033[94m[RTSP] Starting {camera_id}\033[0m")
|
|
reader = FFmpegRTSPReader(
|
|
camera_id=camera_id,
|
|
rtsp_url=stream_config.rtsp_url,
|
|
max_retries=stream_config.max_retries
|
|
)
|
|
reader.set_frame_callback(self._frame_callback)
|
|
reader.start()
|
|
self._streams[camera_id] = reader
|
|
self._ensure_camera_queue(camera_id) # Create tracking queue
|
|
logger.info(f"\033[92m[RTSP] {camera_id} connected\033[0m")
|
|
|
|
elif stream_config.snapshot_url:
|
|
# HTTP snapshot stream
|
|
logger.info(f"\033[95m[HTTP] Starting {camera_id}\033[0m")
|
|
reader = HTTPSnapshotReader(
|
|
camera_id=camera_id,
|
|
snapshot_url=stream_config.snapshot_url,
|
|
interval_ms=stream_config.snapshot_interval,
|
|
max_retries=stream_config.max_retries
|
|
)
|
|
reader.set_frame_callback(self._frame_callback)
|
|
reader.start()
|
|
self._streams[camera_id] = reader
|
|
self._ensure_camera_queue(camera_id) # Create tracking queue
|
|
logger.info(f"\033[92m[HTTP] {camera_id} connected\033[0m")
|
|
|
|
else:
|
|
logger.error(f"No valid URL provided for camera {camera_id}")
|
|
return False
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error starting stream for camera {camera_id}: {e}")
|
|
return False
|
|
|
|
def _stop_stream(self, camera_id: str):
|
|
"""Stop a stream for the given camera."""
|
|
if camera_id in self._streams:
|
|
try:
|
|
self._streams[camera_id].stop()
|
|
del self._streams[camera_id]
|
|
self._remove_camera_queue(camera_id) # Remove tracking queue
|
|
# DON'T clear frames - they should persist until replaced
|
|
# shared_cache_buffer.clear_camera(camera_id) # REMOVED - frames should persist
|
|
logger.info(f"Stopped stream for camera {camera_id} (frames preserved in buffer)")
|
|
except Exception as e:
|
|
logger.error(f"Error stopping stream for camera {camera_id}: {e}")
|
|
|
|
def _frame_callback(self, camera_id: str, frame):
|
|
"""Callback for when a new frame is available."""
|
|
try:
|
|
# Store frame in shared buffer
|
|
shared_cache_buffer.put_frame(camera_id, frame)
|
|
# Quieter frame callback logging - only log occasionally
|
|
if hasattr(self, '_frame_log_count'):
|
|
self._frame_log_count += 1
|
|
else:
|
|
self._frame_log_count = 1
|
|
|
|
# Log every 100 frames to avoid spam
|
|
if self._frame_log_count % 100 == 0:
|
|
available_cameras = shared_cache_buffer.frame_buffer.get_camera_list()
|
|
logger.info(f"\033[96m[BUFFER] {len(available_cameras)} active cameras: {', '.join(available_cameras)}\033[0m")
|
|
|
|
# Queue for tracking processing (non-blocking) - route to camera-specific queue
|
|
if camera_id in self._tracking_queues:
|
|
try:
|
|
self._tracking_queues[camera_id].put_nowait({
|
|
'frame': frame,
|
|
'timestamp': time.time()
|
|
})
|
|
except queue.Full:
|
|
# Drop frame if camera queue is full (maintain real-time)
|
|
self._dropped_frame_counts[camera_id] += 1
|
|
|
|
if self._dropped_frame_counts[camera_id] % 50 == 0:
|
|
logger.warning(f"Dropped {self._dropped_frame_counts[camera_id]} frames for camera {camera_id} due to full queue")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in frame callback for camera {camera_id}: {e}")
|
|
|
|
def _process_tracking_for_camera(self, camera_id: str, frame):
|
|
"""Process tracking for all subscriptions of a camera."""
|
|
try:
|
|
with self._lock:
|
|
for subscription_id in self._camera_subscribers[camera_id]:
|
|
subscription_info = self._subscriptions[subscription_id]
|
|
|
|
# Skip if no tracking integration
|
|
if not subscription_info.tracking_integration:
|
|
continue
|
|
|
|
# Extract display_id from subscription_id
|
|
display_id = subscription_id.split(';')[0] if ';' in subscription_id else subscription_id
|
|
|
|
# Process frame through tracking asynchronously
|
|
# Note: This is synchronous for now, can be made async in future
|
|
try:
|
|
# Create a simple asyncio event loop for this frame
|
|
import asyncio
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
try:
|
|
result = loop.run_until_complete(
|
|
subscription_info.tracking_integration.process_frame(
|
|
frame, display_id, subscription_id
|
|
)
|
|
)
|
|
# Log tracking results
|
|
if result:
|
|
tracked_count = len(result.get('tracked_vehicles', []))
|
|
validated_vehicle = result.get('validated_vehicle')
|
|
pipeline_result = result.get('pipeline_result')
|
|
|
|
if tracked_count > 0:
|
|
logger.info(f"[Tracking] {camera_id}: {tracked_count} vehicles tracked")
|
|
|
|
if validated_vehicle:
|
|
logger.info(f"[Tracking] {camera_id}: Vehicle {validated_vehicle['track_id']} "
|
|
f"validated as {validated_vehicle['state']} "
|
|
f"(confidence: {validated_vehicle['confidence']:.2f})")
|
|
|
|
if pipeline_result:
|
|
logger.info(f"[Pipeline] {camera_id}: {pipeline_result.get('status', 'unknown')} - "
|
|
f"{pipeline_result.get('message', 'no message')}")
|
|
finally:
|
|
loop.close()
|
|
except Exception as track_e:
|
|
logger.error(f"Error in tracking for {subscription_id}: {track_e}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing tracking for camera {camera_id}: {e}")
|
|
|
|
def _tracking_worker_loop(self):
|
|
"""Worker thread loop for round-robin processing of camera queues."""
|
|
logger.info(f"Tracking worker {threading.current_thread().name} started")
|
|
|
|
consecutive_empty = 0
|
|
max_consecutive_empty = 10 # Sleep if all cameras empty this many times
|
|
|
|
while not self._stop_workers.is_set():
|
|
try:
|
|
# Get next camera in round-robin fashion
|
|
camera_id, item = self._get_next_camera_item()
|
|
|
|
if camera_id is None:
|
|
# No cameras have items, sleep briefly
|
|
consecutive_empty += 1
|
|
if consecutive_empty >= max_consecutive_empty:
|
|
time.sleep(0.1) # Sleep 100ms if nothing to process
|
|
consecutive_empty = 0
|
|
continue
|
|
|
|
consecutive_empty = 0 # Reset counter when we find work
|
|
|
|
frame = item['frame']
|
|
timestamp = item['timestamp']
|
|
|
|
# Check if frame is too old (drop if > 1 second old)
|
|
age = time.time() - timestamp
|
|
if age > 1.0:
|
|
logger.debug(f"Dropping old frame for {camera_id} (age: {age:.2f}s)")
|
|
continue
|
|
|
|
# Process tracking for this camera's frame
|
|
self._process_tracking_for_camera_sync(camera_id, frame)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in tracking worker: {e}", exc_info=True)
|
|
|
|
logger.info(f"Tracking worker {threading.current_thread().name} stopped")
|
|
|
|
def _get_next_camera_item(self):
|
|
"""Get next item from camera queues using round-robin scheduling."""
|
|
with self._round_robin_lock:
|
|
if not self._camera_list:
|
|
return None, None
|
|
|
|
attempts = 0
|
|
max_attempts = len(self._camera_list)
|
|
|
|
while attempts < max_attempts:
|
|
# Get current camera
|
|
if self._camera_round_robin_index >= len(self._camera_list):
|
|
self._camera_round_robin_index = 0
|
|
|
|
camera_id = self._camera_list[self._camera_round_robin_index]
|
|
|
|
# Move to next camera for next call
|
|
self._camera_round_robin_index = (self._camera_round_robin_index + 1) % len(self._camera_list)
|
|
|
|
# Try to get item from this camera's queue
|
|
if camera_id in self._tracking_queues:
|
|
try:
|
|
item = self._tracking_queues[camera_id].get_nowait()
|
|
return camera_id, item
|
|
except queue.Empty:
|
|
pass # Try next camera
|
|
|
|
attempts += 1
|
|
|
|
return None, None # All cameras empty
|
|
|
|
def _process_tracking_for_camera_sync(self, camera_id: str, frame):
|
|
"""Synchronous version of tracking processing for worker threads."""
|
|
try:
|
|
with self._lock:
|
|
subscription_ids = list(self._camera_subscribers.get(camera_id, []))
|
|
|
|
for subscription_id in subscription_ids:
|
|
subscription_info = self._subscriptions.get(subscription_id)
|
|
|
|
if not subscription_info or not subscription_info.tracking_integration:
|
|
continue
|
|
|
|
display_id = subscription_id.split(';')[0] if ';' in subscription_id else subscription_id
|
|
|
|
try:
|
|
# Run async tracking in thread's event loop
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
try:
|
|
result = loop.run_until_complete(
|
|
subscription_info.tracking_integration.process_frame(
|
|
frame, display_id, subscription_id
|
|
)
|
|
)
|
|
|
|
# Log tracking results
|
|
if result:
|
|
tracked_count = len(result.get('tracked_vehicles', []))
|
|
validated_vehicle = result.get('validated_vehicle')
|
|
pipeline_result = result.get('pipeline_result')
|
|
|
|
if tracked_count > 0:
|
|
logger.info(f"[Tracking] {camera_id}: {tracked_count} vehicles tracked")
|
|
|
|
if validated_vehicle:
|
|
logger.info(f"[Tracking] {camera_id}: Vehicle {validated_vehicle['track_id']} "
|
|
f"validated as {validated_vehicle['state']} "
|
|
f"(confidence: {validated_vehicle['confidence']:.2f})")
|
|
|
|
if pipeline_result:
|
|
logger.info(f"[Pipeline] {camera_id}: {pipeline_result.get('status', 'unknown')} - "
|
|
f"{pipeline_result.get('message', 'no message')}")
|
|
finally:
|
|
loop.close()
|
|
|
|
except Exception as track_e:
|
|
logger.error(f"Error in tracking for {subscription_id}: {track_e}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing tracking for camera {camera_id}: {e}")
|
|
|
|
def get_frame(self, camera_id: str, crop_coords: Optional[tuple] = None):
|
|
"""Get the latest frame for a camera with optional cropping."""
|
|
return shared_cache_buffer.get_frame(camera_id, crop_coords)
|
|
|
|
def get_frame_as_jpeg(self, camera_id: str, crop_coords: Optional[tuple] = None,
|
|
quality: int = 100) -> Optional[bytes]:
|
|
"""Get frame as JPEG bytes for HTTP responses with highest quality by default."""
|
|
return shared_cache_buffer.get_frame_as_jpeg(camera_id, crop_coords, quality)
|
|
|
|
def has_frame(self, camera_id: str) -> bool:
|
|
"""Check if a frame is available for the camera."""
|
|
return shared_cache_buffer.has_frame(camera_id)
|
|
|
|
def get_subscription_info(self, subscription_id: str) -> Optional[SubscriptionInfo]:
|
|
"""Get information about a subscription."""
|
|
with self._lock:
|
|
return self._subscriptions.get(subscription_id)
|
|
|
|
def get_camera_subscribers(self, camera_id: str) -> Set[str]:
|
|
"""Get all subscription IDs for a camera."""
|
|
with self._lock:
|
|
return self._camera_subscribers[camera_id].copy()
|
|
|
|
def get_active_cameras(self) -> List[str]:
|
|
"""Get list of cameras with active streams."""
|
|
with self._lock:
|
|
return list(self._streams.keys())
|
|
|
|
def get_all_subscriptions(self) -> List[SubscriptionInfo]:
|
|
"""Get all active subscriptions."""
|
|
with self._lock:
|
|
return list(self._subscriptions.values())
|
|
|
|
def reconcile_subscriptions(self, target_subscriptions: List[Dict[str, Any]]) -> Dict[str, Any]:
|
|
"""
|
|
Reconcile current subscriptions with target list.
|
|
Returns summary of changes made.
|
|
"""
|
|
with self._lock:
|
|
current_subscription_ids = set(self._subscriptions.keys())
|
|
target_subscription_ids = {sub['subscriptionIdentifier'] for sub in target_subscriptions}
|
|
|
|
# Find subscriptions to remove and add
|
|
to_remove = current_subscription_ids - target_subscription_ids
|
|
to_add = target_subscription_ids - current_subscription_ids
|
|
|
|
# Remove old subscriptions
|
|
removed_count = 0
|
|
for subscription_id in to_remove:
|
|
if self._remove_subscription_internal(subscription_id):
|
|
removed_count += 1
|
|
|
|
# Add new subscriptions
|
|
added_count = 0
|
|
failed_count = 0
|
|
for target_sub in target_subscriptions:
|
|
subscription_id = target_sub['subscriptionIdentifier']
|
|
if subscription_id in to_add:
|
|
success = self._add_subscription_from_payload(subscription_id, target_sub)
|
|
if success:
|
|
added_count += 1
|
|
else:
|
|
failed_count += 1
|
|
|
|
result = {
|
|
'removed': removed_count,
|
|
'added': added_count,
|
|
'failed': failed_count,
|
|
'total_active': len(self._subscriptions),
|
|
'active_streams': len(self._streams)
|
|
}
|
|
|
|
logger.info(f"Subscription reconciliation: {result}")
|
|
return result
|
|
|
|
def _add_subscription_from_payload(self, subscription_id: str, payload: Dict[str, Any]) -> bool:
|
|
"""Add subscription from WebSocket payload format."""
|
|
try:
|
|
# Extract camera ID from subscription identifier
|
|
# Format: "display-001;cam-001" -> camera_id = "cam-001"
|
|
camera_id = subscription_id.split(';')[-1]
|
|
|
|
# Extract crop coordinates if present
|
|
crop_coords = None
|
|
if all(key in payload for key in ['cropX1', 'cropY1', 'cropX2', 'cropY2']):
|
|
crop_coords = (
|
|
payload['cropX1'],
|
|
payload['cropY1'],
|
|
payload['cropX2'],
|
|
payload['cropY2']
|
|
)
|
|
|
|
# Create stream configuration
|
|
stream_config = StreamConfig(
|
|
camera_id=camera_id,
|
|
rtsp_url=payload.get('rtspUrl'),
|
|
snapshot_url=payload.get('snapshotUrl'),
|
|
snapshot_interval=payload.get('snapshotInterval', 5000),
|
|
max_retries=3,
|
|
)
|
|
|
|
return self.add_subscription(
|
|
subscription_id,
|
|
stream_config,
|
|
crop_coords,
|
|
model_id=payload.get('modelId'),
|
|
model_url=payload.get('modelUrl')
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error adding subscription from payload {subscription_id}: {e}")
|
|
return False
|
|
|
|
def stop_all(self):
|
|
"""Stop all streams and clear all subscriptions."""
|
|
# Signal workers to stop
|
|
self._stop_workers.set()
|
|
|
|
# Clear all camera queues
|
|
for camera_id, camera_queue in list(self._tracking_queues.items()):
|
|
while not camera_queue.empty():
|
|
try:
|
|
camera_queue.get_nowait()
|
|
except queue.Empty:
|
|
break
|
|
|
|
# Wait for workers to finish
|
|
for worker in self._tracking_workers:
|
|
worker.join(timeout=2.0)
|
|
|
|
# Clear queue management structures
|
|
self._tracking_queues.clear()
|
|
self._dropped_frame_counts.clear()
|
|
with self._round_robin_lock:
|
|
self._camera_list.clear()
|
|
self._camera_round_robin_index = 0
|
|
|
|
logger.info("Stopped all tracking worker threads")
|
|
|
|
with self._lock:
|
|
# Stop all streams
|
|
for camera_id in list(self._streams.keys()):
|
|
self._stop_stream(camera_id)
|
|
|
|
# Clear all tracking
|
|
self._subscriptions.clear()
|
|
self._camera_subscribers.clear()
|
|
shared_cache_buffer.clear_all()
|
|
|
|
logger.info("Stopped all streams and cleared all subscriptions")
|
|
|
|
def set_session_id(self, display_id: str, session_id: str):
|
|
"""Set session ID for tracking integration."""
|
|
# Ensure session_id is always a string for consistent type handling
|
|
session_id = str(session_id) if session_id is not None else None
|
|
with self._lock:
|
|
for subscription_info in self._subscriptions.values():
|
|
# Check if this subscription matches the display_id
|
|
subscription_display_id = subscription_info.subscription_id.split(';')[0]
|
|
if subscription_display_id == display_id and subscription_info.tracking_integration:
|
|
# Pass the full subscription_id (displayId;cameraId) to the tracking integration
|
|
subscription_info.tracking_integration.set_session_id(
|
|
display_id,
|
|
session_id,
|
|
subscription_id=subscription_info.subscription_id
|
|
)
|
|
logger.debug(f"Set session {session_id} for display {display_id} with subscription {subscription_info.subscription_id}")
|
|
|
|
def clear_session_id(self, session_id: str):
|
|
"""Clear session ID from the specific tracking integration handling this session."""
|
|
with self._lock:
|
|
# Find the subscription that's handling this session
|
|
session_subscription = None
|
|
for subscription_info in self._subscriptions.values():
|
|
if subscription_info.tracking_integration:
|
|
# Check if this integration is handling the given session_id
|
|
integration = subscription_info.tracking_integration
|
|
if session_id in integration.session_vehicles:
|
|
session_subscription = subscription_info
|
|
break
|
|
|
|
if session_subscription and session_subscription.tracking_integration:
|
|
session_subscription.tracking_integration.clear_session_id(session_id)
|
|
logger.debug(f"Cleared session {session_id} from subscription {session_subscription.subscription_id}")
|
|
else:
|
|
logger.warning(f"No tracking integration found for session {session_id}, broadcasting to all subscriptions")
|
|
# Fallback: broadcast to all (original behavior)
|
|
for subscription_info in self._subscriptions.values():
|
|
if subscription_info.tracking_integration:
|
|
subscription_info.tracking_integration.clear_session_id(session_id)
|
|
|
|
def set_progression_stage(self, session_id: str, stage: str):
|
|
"""Set progression stage for the specific tracking integration handling this session."""
|
|
with self._lock:
|
|
# Find the subscription that's handling this session
|
|
session_subscription = None
|
|
for subscription_info in self._subscriptions.values():
|
|
if subscription_info.tracking_integration:
|
|
# Check if this integration is handling the given session_id
|
|
# We need to check the integration's active sessions
|
|
integration = subscription_info.tracking_integration
|
|
if session_id in integration.session_vehicles:
|
|
session_subscription = subscription_info
|
|
break
|
|
|
|
if session_subscription and session_subscription.tracking_integration:
|
|
session_subscription.tracking_integration.set_progression_stage(session_id, stage)
|
|
logger.debug(f"Set progression stage for session {session_id}: {stage} on subscription {session_subscription.subscription_id}")
|
|
else:
|
|
logger.warning(f"No tracking integration found for session {session_id}, broadcasting to all subscriptions")
|
|
# Fallback: broadcast to all (original behavior)
|
|
for subscription_info in self._subscriptions.values():
|
|
if subscription_info.tracking_integration:
|
|
subscription_info.tracking_integration.set_progression_stage(session_id, stage)
|
|
|
|
def get_tracking_stats(self) -> Dict[str, Any]:
|
|
"""Get tracking statistics from all subscriptions."""
|
|
stats = {}
|
|
with self._lock:
|
|
for subscription_id, subscription_info in self._subscriptions.items():
|
|
if subscription_info.tracking_integration:
|
|
stats[subscription_id] = subscription_info.tracking_integration.get_statistics()
|
|
return stats
|
|
|
|
|
|
def get_stats(self) -> Dict[str, Any]:
|
|
"""Get comprehensive streaming statistics."""
|
|
with self._lock:
|
|
buffer_stats = shared_cache_buffer.get_stats()
|
|
tracking_stats = self.get_tracking_stats()
|
|
|
|
return {
|
|
'active_subscriptions': len(self._subscriptions),
|
|
'active_streams': len(self._streams),
|
|
'cameras_with_subscribers': len(self._camera_subscribers),
|
|
'max_streams': self.max_streams,
|
|
'subscriptions_by_camera': {
|
|
camera_id: len(subscribers)
|
|
for camera_id, subscribers in self._camera_subscribers.items()
|
|
},
|
|
'buffer_stats': buffer_stats,
|
|
'tracking_stats': tracking_stats,
|
|
'memory_usage_mb': buffer_stats.get('total_memory_mb', 0)
|
|
}
|
|
|
|
|
|
# Global shared instance for application use
|
|
# Default initialization, will be updated with config value in app.py
|
|
shared_stream_manager = StreamManager(max_streams=20)
|
|
|
|
def initialize_stream_manager(max_streams: int = 10):
|
|
"""Re-initialize the global stream manager with config value."""
|
|
global shared_stream_manager
|
|
# Release old manager if exists
|
|
if shared_stream_manager:
|
|
try:
|
|
# Stop all existing streams gracefully
|
|
shared_stream_manager.stop_all()
|
|
except Exception as e:
|
|
logger.warning(f"Error stopping previous stream manager: {e}")
|
|
shared_stream_manager = StreamManager(max_streams=max_streams)
|
|
return shared_stream_manager |