Refactor: done phase 3
This commit is contained in:
parent
6ec10682c0
commit
7e8034c6e5
6 changed files with 967 additions and 21 deletions
322
core/streaming/manager.py
Normal file
322
core/streaming/manager.py
Normal file
|
@ -0,0 +1,322 @@
|
|||
"""
|
||||
Stream coordination and lifecycle management.
|
||||
Handles shared streams, subscription reconciliation, and resource optimization.
|
||||
"""
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from typing import Dict, Set, Optional, List, Any
|
||||
from dataclasses import dataclass
|
||||
from collections import defaultdict
|
||||
|
||||
from .readers import RTSPReader, HTTPSnapshotReader
|
||||
from .buffers import shared_cache_buffer, save_frame_for_testing
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class StreamConfig:
|
||||
"""Configuration for a stream."""
|
||||
camera_id: str
|
||||
rtsp_url: Optional[str] = None
|
||||
snapshot_url: Optional[str] = None
|
||||
snapshot_interval: int = 5000 # milliseconds
|
||||
max_retries: int = 3
|
||||
save_test_frames: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class SubscriptionInfo:
|
||||
"""Information about a subscription."""
|
||||
subscription_id: str
|
||||
camera_id: str
|
||||
stream_config: StreamConfig
|
||||
created_at: float
|
||||
crop_coords: Optional[tuple] = None
|
||||
|
||||
|
||||
class StreamManager:
|
||||
"""Manages multiple camera streams with shared optimization."""
|
||||
|
||||
def __init__(self, max_streams: int = 10):
|
||||
self.max_streams = max_streams
|
||||
self._streams: Dict[str, Any] = {} # camera_id -> reader instance
|
||||
self._subscriptions: Dict[str, SubscriptionInfo] = {} # subscription_id -> info
|
||||
self._camera_subscribers: Dict[str, Set[str]] = defaultdict(set) # camera_id -> set of subscription_ids
|
||||
self._lock = threading.RLock()
|
||||
|
||||
def add_subscription(self, subscription_id: str, stream_config: StreamConfig,
|
||||
crop_coords: Optional[tuple] = None) -> bool:
|
||||
"""Add a new subscription. Returns True if successful."""
|
||||
with self._lock:
|
||||
if subscription_id in self._subscriptions:
|
||||
logger.warning(f"Subscription {subscription_id} already exists")
|
||||
return False
|
||||
|
||||
camera_id = stream_config.camera_id
|
||||
|
||||
# Create subscription info
|
||||
subscription_info = SubscriptionInfo(
|
||||
subscription_id=subscription_id,
|
||||
camera_id=camera_id,
|
||||
stream_config=stream_config,
|
||||
created_at=time.time(),
|
||||
crop_coords=crop_coords
|
||||
)
|
||||
|
||||
self._subscriptions[subscription_id] = subscription_info
|
||||
self._camera_subscribers[camera_id].add(subscription_id)
|
||||
|
||||
# Start stream if not already running
|
||||
if camera_id not in self._streams:
|
||||
if len(self._streams) >= self.max_streams:
|
||||
logger.error(f"Maximum streams ({self.max_streams}) reached, cannot add {camera_id}")
|
||||
self._remove_subscription_internal(subscription_id)
|
||||
return False
|
||||
|
||||
success = self._start_stream(camera_id, stream_config)
|
||||
if not success:
|
||||
self._remove_subscription_internal(subscription_id)
|
||||
return False
|
||||
|
||||
logger.info(f"Added subscription {subscription_id} for camera {camera_id} "
|
||||
f"({len(self._camera_subscribers[camera_id])} total subscribers)")
|
||||
return True
|
||||
|
||||
def remove_subscription(self, subscription_id: str) -> bool:
|
||||
"""Remove a subscription. Returns True if found and removed."""
|
||||
with self._lock:
|
||||
return self._remove_subscription_internal(subscription_id)
|
||||
|
||||
def _remove_subscription_internal(self, subscription_id: str) -> bool:
|
||||
"""Internal method to remove subscription (assumes lock is held)."""
|
||||
if subscription_id not in self._subscriptions:
|
||||
logger.warning(f"Subscription {subscription_id} not found")
|
||||
return False
|
||||
|
||||
subscription_info = self._subscriptions[subscription_id]
|
||||
camera_id = subscription_info.camera_id
|
||||
|
||||
# Remove from tracking
|
||||
del self._subscriptions[subscription_id]
|
||||
self._camera_subscribers[camera_id].discard(subscription_id)
|
||||
|
||||
# Stop stream if no more subscribers
|
||||
if not self._camera_subscribers[camera_id]:
|
||||
self._stop_stream(camera_id)
|
||||
del self._camera_subscribers[camera_id]
|
||||
|
||||
logger.info(f"Removed subscription {subscription_id} for camera {camera_id} "
|
||||
f"({len(self._camera_subscribers[camera_id])} remaining subscribers)")
|
||||
return True
|
||||
|
||||
def _start_stream(self, camera_id: str, stream_config: StreamConfig) -> bool:
|
||||
"""Start a stream for the given camera."""
|
||||
try:
|
||||
if stream_config.rtsp_url:
|
||||
# RTSP stream
|
||||
reader = RTSPReader(
|
||||
camera_id=camera_id,
|
||||
rtsp_url=stream_config.rtsp_url,
|
||||
max_retries=stream_config.max_retries
|
||||
)
|
||||
reader.set_frame_callback(self._frame_callback)
|
||||
reader.start()
|
||||
self._streams[camera_id] = reader
|
||||
logger.info(f"Started RTSP stream for camera {camera_id}")
|
||||
|
||||
elif stream_config.snapshot_url:
|
||||
# HTTP snapshot stream
|
||||
reader = HTTPSnapshotReader(
|
||||
camera_id=camera_id,
|
||||
snapshot_url=stream_config.snapshot_url,
|
||||
interval_ms=stream_config.snapshot_interval,
|
||||
max_retries=stream_config.max_retries
|
||||
)
|
||||
reader.set_frame_callback(self._frame_callback)
|
||||
reader.start()
|
||||
self._streams[camera_id] = reader
|
||||
logger.info(f"Started HTTP snapshot stream for camera {camera_id}")
|
||||
|
||||
else:
|
||||
logger.error(f"No valid URL provided for camera {camera_id}")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error starting stream for camera {camera_id}: {e}")
|
||||
return False
|
||||
|
||||
def _stop_stream(self, camera_id: str):
|
||||
"""Stop a stream for the given camera."""
|
||||
if camera_id in self._streams:
|
||||
try:
|
||||
self._streams[camera_id].stop()
|
||||
del self._streams[camera_id]
|
||||
shared_cache_buffer.clear_camera(camera_id)
|
||||
logger.info(f"Stopped stream for camera {camera_id}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error stopping stream for camera {camera_id}: {e}")
|
||||
|
||||
def _frame_callback(self, camera_id: str, frame):
|
||||
"""Callback for when a new frame is available."""
|
||||
try:
|
||||
# Store frame in shared buffer
|
||||
shared_cache_buffer.put_frame(camera_id, frame)
|
||||
|
||||
# Save test frames if enabled for any subscription
|
||||
with self._lock:
|
||||
for subscription_id in self._camera_subscribers[camera_id]:
|
||||
subscription_info = self._subscriptions[subscription_id]
|
||||
if subscription_info.stream_config.save_test_frames:
|
||||
save_frame_for_testing(camera_id, frame)
|
||||
break # Only save once per frame
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in frame callback for camera {camera_id}: {e}")
|
||||
|
||||
def get_frame(self, camera_id: str, crop_coords: Optional[tuple] = None):
|
||||
"""Get the latest frame for a camera with optional cropping."""
|
||||
return shared_cache_buffer.get_frame(camera_id, crop_coords)
|
||||
|
||||
def get_frame_as_jpeg(self, camera_id: str, crop_coords: Optional[tuple] = None,
|
||||
quality: int = 100) -> Optional[bytes]:
|
||||
"""Get frame as JPEG bytes for HTTP responses with highest quality by default."""
|
||||
return shared_cache_buffer.get_frame_as_jpeg(camera_id, crop_coords, quality)
|
||||
|
||||
def has_frame(self, camera_id: str) -> bool:
|
||||
"""Check if a frame is available for the camera."""
|
||||
return shared_cache_buffer.has_frame(camera_id)
|
||||
|
||||
def get_subscription_info(self, subscription_id: str) -> Optional[SubscriptionInfo]:
|
||||
"""Get information about a subscription."""
|
||||
with self._lock:
|
||||
return self._subscriptions.get(subscription_id)
|
||||
|
||||
def get_camera_subscribers(self, camera_id: str) -> Set[str]:
|
||||
"""Get all subscription IDs for a camera."""
|
||||
with self._lock:
|
||||
return self._camera_subscribers[camera_id].copy()
|
||||
|
||||
def get_active_cameras(self) -> List[str]:
|
||||
"""Get list of cameras with active streams."""
|
||||
with self._lock:
|
||||
return list(self._streams.keys())
|
||||
|
||||
def get_all_subscriptions(self) -> List[SubscriptionInfo]:
|
||||
"""Get all active subscriptions."""
|
||||
with self._lock:
|
||||
return list(self._subscriptions.values())
|
||||
|
||||
def reconcile_subscriptions(self, target_subscriptions: List[Dict[str, Any]]) -> Dict[str, Any]:
|
||||
"""
|
||||
Reconcile current subscriptions with target list.
|
||||
Returns summary of changes made.
|
||||
"""
|
||||
with self._lock:
|
||||
current_subscription_ids = set(self._subscriptions.keys())
|
||||
target_subscription_ids = {sub['subscriptionIdentifier'] for sub in target_subscriptions}
|
||||
|
||||
# Find subscriptions to remove and add
|
||||
to_remove = current_subscription_ids - target_subscription_ids
|
||||
to_add = target_subscription_ids - current_subscription_ids
|
||||
|
||||
# Remove old subscriptions
|
||||
removed_count = 0
|
||||
for subscription_id in to_remove:
|
||||
if self._remove_subscription_internal(subscription_id):
|
||||
removed_count += 1
|
||||
|
||||
# Add new subscriptions
|
||||
added_count = 0
|
||||
failed_count = 0
|
||||
for target_sub in target_subscriptions:
|
||||
subscription_id = target_sub['subscriptionIdentifier']
|
||||
if subscription_id in to_add:
|
||||
success = self._add_subscription_from_payload(subscription_id, target_sub)
|
||||
if success:
|
||||
added_count += 1
|
||||
else:
|
||||
failed_count += 1
|
||||
|
||||
result = {
|
||||
'removed': removed_count,
|
||||
'added': added_count,
|
||||
'failed': failed_count,
|
||||
'total_active': len(self._subscriptions),
|
||||
'active_streams': len(self._streams)
|
||||
}
|
||||
|
||||
logger.info(f"Subscription reconciliation: {result}")
|
||||
return result
|
||||
|
||||
def _add_subscription_from_payload(self, subscription_id: str, payload: Dict[str, Any]) -> bool:
|
||||
"""Add subscription from WebSocket payload format."""
|
||||
try:
|
||||
# Extract camera ID from subscription identifier
|
||||
# Format: "display-001;cam-001" -> camera_id = "cam-001"
|
||||
camera_id = subscription_id.split(';')[-1]
|
||||
|
||||
# Extract crop coordinates if present
|
||||
crop_coords = None
|
||||
if all(key in payload for key in ['cropX1', 'cropY1', 'cropX2', 'cropY2']):
|
||||
crop_coords = (
|
||||
payload['cropX1'],
|
||||
payload['cropY1'],
|
||||
payload['cropX2'],
|
||||
payload['cropY2']
|
||||
)
|
||||
|
||||
# Create stream configuration
|
||||
stream_config = StreamConfig(
|
||||
camera_id=camera_id,
|
||||
rtsp_url=payload.get('rtspUrl'),
|
||||
snapshot_url=payload.get('snapshotUrl'),
|
||||
snapshot_interval=payload.get('snapshotInterval', 5000),
|
||||
max_retries=3,
|
||||
save_test_frames=True # Enable for testing
|
||||
)
|
||||
|
||||
return self.add_subscription(subscription_id, stream_config, crop_coords)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error adding subscription from payload {subscription_id}: {e}")
|
||||
return False
|
||||
|
||||
def stop_all(self):
|
||||
"""Stop all streams and clear all subscriptions."""
|
||||
with self._lock:
|
||||
# Stop all streams
|
||||
for camera_id in list(self._streams.keys()):
|
||||
self._stop_stream(camera_id)
|
||||
|
||||
# Clear all tracking
|
||||
self._subscriptions.clear()
|
||||
self._camera_subscribers.clear()
|
||||
shared_cache_buffer.clear_all()
|
||||
|
||||
logger.info("Stopped all streams and cleared all subscriptions")
|
||||
|
||||
def get_stats(self) -> Dict[str, Any]:
|
||||
"""Get comprehensive streaming statistics."""
|
||||
with self._lock:
|
||||
buffer_stats = shared_cache_buffer.get_stats()
|
||||
|
||||
return {
|
||||
'active_subscriptions': len(self._subscriptions),
|
||||
'active_streams': len(self._streams),
|
||||
'cameras_with_subscribers': len(self._camera_subscribers),
|
||||
'max_streams': self.max_streams,
|
||||
'subscriptions_by_camera': {
|
||||
camera_id: len(subscribers)
|
||||
for camera_id, subscribers in self._camera_subscribers.items()
|
||||
},
|
||||
'buffer_stats': buffer_stats
|
||||
}
|
||||
|
||||
|
||||
# Global shared instance for application use
|
||||
shared_stream_manager = StreamManager(max_streams=10)
|
Loading…
Add table
Add a link
Reference in a new issue