python-detector-worker/core/streaming/manager.py
ziesorx f467cb005d
Some checks failed
Build Worker Base and Application Images / deploy-stack (push) Blocked by required conditions
Build Worker Base and Application Images / check-base-changes (push) Successful in 7s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Has been cancelled
feat: update max_streams
2025-09-25 03:11:22 +07:00

468 lines
No EOL
20 KiB
Python

"""
Stream coordination and lifecycle management.
Optimized for 1280x720@6fps RTSP and 2560x1440 HTTP snapshots.
"""
import logging
import threading
import time
from typing import Dict, Set, Optional, List, Any
from dataclasses import dataclass
from collections import defaultdict
from .readers import RTSPReader, HTTPSnapshotReader
from .buffers import shared_cache_buffer, StreamType
from ..tracking.integration import TrackingPipelineIntegration
logger = logging.getLogger(__name__)
@dataclass
class StreamConfig:
"""Configuration for a stream."""
camera_id: str
rtsp_url: Optional[str] = None
snapshot_url: Optional[str] = None
snapshot_interval: int = 5000 # milliseconds
max_retries: int = 3
@dataclass
class SubscriptionInfo:
"""Information about a subscription."""
subscription_id: str
camera_id: str
stream_config: StreamConfig
created_at: float
crop_coords: Optional[tuple] = None
model_id: Optional[str] = None
model_url: Optional[str] = None
tracking_integration: Optional[TrackingPipelineIntegration] = None
class StreamManager:
"""Manages multiple camera streams with shared optimization."""
def __init__(self, max_streams: int = 10):
self.max_streams = max_streams
self._streams: Dict[str, Any] = {} # camera_id -> reader instance
self._subscriptions: Dict[str, SubscriptionInfo] = {} # subscription_id -> info
self._camera_subscribers: Dict[str, Set[str]] = defaultdict(set) # camera_id -> set of subscription_ids
self._lock = threading.RLock()
def add_subscription(self, subscription_id: str, stream_config: StreamConfig,
crop_coords: Optional[tuple] = None,
model_id: Optional[str] = None,
model_url: Optional[str] = None,
tracking_integration: Optional[TrackingPipelineIntegration] = None) -> bool:
"""Add a new subscription. Returns True if successful."""
with self._lock:
if subscription_id in self._subscriptions:
logger.warning(f"Subscription {subscription_id} already exists")
return False
camera_id = stream_config.camera_id
# Create subscription info
subscription_info = SubscriptionInfo(
subscription_id=subscription_id,
camera_id=camera_id,
stream_config=stream_config,
created_at=time.time(),
crop_coords=crop_coords,
model_id=model_id,
model_url=model_url,
tracking_integration=tracking_integration
)
# Pass subscription info to tracking integration for snapshot access
if tracking_integration:
tracking_integration.set_subscription_info(subscription_info)
self._subscriptions[subscription_id] = subscription_info
self._camera_subscribers[camera_id].add(subscription_id)
# Start stream if not already running
if camera_id not in self._streams:
if len(self._streams) >= self.max_streams:
logger.error(f"Maximum streams ({self.max_streams}) reached, cannot add {camera_id}")
self._remove_subscription_internal(subscription_id)
return False
success = self._start_stream(camera_id, stream_config)
if not success:
self._remove_subscription_internal(subscription_id)
return False
logger.info(f"Added subscription {subscription_id} for camera {camera_id} "
f"({len(self._camera_subscribers[camera_id])} total subscribers)")
return True
def remove_subscription(self, subscription_id: str) -> bool:
"""Remove a subscription. Returns True if found and removed."""
with self._lock:
return self._remove_subscription_internal(subscription_id)
def _remove_subscription_internal(self, subscription_id: str) -> bool:
"""Internal method to remove subscription (assumes lock is held)."""
if subscription_id not in self._subscriptions:
logger.warning(f"Subscription {subscription_id} not found")
return False
subscription_info = self._subscriptions[subscription_id]
camera_id = subscription_info.camera_id
# Remove from tracking
del self._subscriptions[subscription_id]
self._camera_subscribers[camera_id].discard(subscription_id)
# Stop stream if no more subscribers
if not self._camera_subscribers[camera_id]:
self._stop_stream(camera_id)
del self._camera_subscribers[camera_id]
logger.info(f"Removed subscription {subscription_id} for camera {camera_id} "
f"({len(self._camera_subscribers[camera_id])} remaining subscribers)")
return True
def _start_stream(self, camera_id: str, stream_config: StreamConfig) -> bool:
"""Start a stream for the given camera."""
try:
if stream_config.rtsp_url:
# RTSP stream
reader = RTSPReader(
camera_id=camera_id,
rtsp_url=stream_config.rtsp_url,
max_retries=stream_config.max_retries
)
reader.set_frame_callback(self._frame_callback)
reader.start()
self._streams[camera_id] = reader
logger.info(f"Started RTSP stream for camera {camera_id}")
elif stream_config.snapshot_url:
# HTTP snapshot stream
reader = HTTPSnapshotReader(
camera_id=camera_id,
snapshot_url=stream_config.snapshot_url,
interval_ms=stream_config.snapshot_interval,
max_retries=stream_config.max_retries
)
reader.set_frame_callback(self._frame_callback)
reader.start()
self._streams[camera_id] = reader
logger.info(f"Started HTTP snapshot stream for camera {camera_id}")
else:
logger.error(f"No valid URL provided for camera {camera_id}")
return False
return True
except Exception as e:
logger.error(f"Error starting stream for camera {camera_id}: {e}")
return False
def _stop_stream(self, camera_id: str):
"""Stop a stream for the given camera."""
if camera_id in self._streams:
try:
self._streams[camera_id].stop()
del self._streams[camera_id]
shared_cache_buffer.clear_camera(camera_id)
logger.info(f"Stopped stream for camera {camera_id}")
except Exception as e:
logger.error(f"Error stopping stream for camera {camera_id}: {e}")
def _frame_callback(self, camera_id: str, frame):
"""Callback for when a new frame is available."""
try:
# Detect stream type based on frame dimensions
stream_type = self._detect_stream_type(frame)
# Store frame in shared buffer with stream type
shared_cache_buffer.put_frame(camera_id, frame, stream_type)
# Process tracking for subscriptions with tracking integration
self._process_tracking_for_camera(camera_id, frame)
except Exception as e:
logger.error(f"Error in frame callback for camera {camera_id}: {e}")
def _process_tracking_for_camera(self, camera_id: str, frame):
"""Process tracking for all subscriptions of a camera."""
try:
with self._lock:
for subscription_id in self._camera_subscribers[camera_id]:
subscription_info = self._subscriptions[subscription_id]
# Skip if no tracking integration
if not subscription_info.tracking_integration:
continue
# Extract display_id from subscription_id
display_id = subscription_id.split(';')[0] if ';' in subscription_id else subscription_id
# Process frame through tracking asynchronously
# Note: This is synchronous for now, can be made async in future
try:
# Create a simple asyncio event loop for this frame
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(
subscription_info.tracking_integration.process_frame(
frame, display_id, subscription_id
)
)
# Log tracking results
if result:
tracked_count = len(result.get('tracked_vehicles', []))
validated_vehicle = result.get('validated_vehicle')
pipeline_result = result.get('pipeline_result')
if tracked_count > 0:
logger.info(f"[Tracking] {camera_id}: {tracked_count} vehicles tracked")
if validated_vehicle:
logger.info(f"[Tracking] {camera_id}: Vehicle {validated_vehicle['track_id']} "
f"validated as {validated_vehicle['state']} "
f"(confidence: {validated_vehicle['confidence']:.2f})")
if pipeline_result:
logger.info(f"[Pipeline] {camera_id}: {pipeline_result.get('status', 'unknown')} - "
f"{pipeline_result.get('message', 'no message')}")
finally:
loop.close()
except Exception as track_e:
logger.error(f"Error in tracking for {subscription_id}: {track_e}")
except Exception as e:
logger.error(f"Error processing tracking for camera {camera_id}: {e}")
def get_frame(self, camera_id: str, crop_coords: Optional[tuple] = None):
"""Get the latest frame for a camera with optional cropping."""
return shared_cache_buffer.get_frame(camera_id, crop_coords)
def get_frame_as_jpeg(self, camera_id: str, crop_coords: Optional[tuple] = None,
quality: int = 100) -> Optional[bytes]:
"""Get frame as JPEG bytes for HTTP responses with highest quality by default."""
return shared_cache_buffer.get_frame_as_jpeg(camera_id, crop_coords, quality)
def has_frame(self, camera_id: str) -> bool:
"""Check if a frame is available for the camera."""
return shared_cache_buffer.has_frame(camera_id)
def get_subscription_info(self, subscription_id: str) -> Optional[SubscriptionInfo]:
"""Get information about a subscription."""
with self._lock:
return self._subscriptions.get(subscription_id)
def get_camera_subscribers(self, camera_id: str) -> Set[str]:
"""Get all subscription IDs for a camera."""
with self._lock:
return self._camera_subscribers[camera_id].copy()
def get_active_cameras(self) -> List[str]:
"""Get list of cameras with active streams."""
with self._lock:
return list(self._streams.keys())
def get_all_subscriptions(self) -> List[SubscriptionInfo]:
"""Get all active subscriptions."""
with self._lock:
return list(self._subscriptions.values())
def reconcile_subscriptions(self, target_subscriptions: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Reconcile current subscriptions with target list.
Returns summary of changes made.
"""
with self._lock:
current_subscription_ids = set(self._subscriptions.keys())
target_subscription_ids = {sub['subscriptionIdentifier'] for sub in target_subscriptions}
# Find subscriptions to remove and add
to_remove = current_subscription_ids - target_subscription_ids
to_add = target_subscription_ids - current_subscription_ids
# Remove old subscriptions
removed_count = 0
for subscription_id in to_remove:
if self._remove_subscription_internal(subscription_id):
removed_count += 1
# Add new subscriptions
added_count = 0
failed_count = 0
for target_sub in target_subscriptions:
subscription_id = target_sub['subscriptionIdentifier']
if subscription_id in to_add:
success = self._add_subscription_from_payload(subscription_id, target_sub)
if success:
added_count += 1
else:
failed_count += 1
result = {
'removed': removed_count,
'added': added_count,
'failed': failed_count,
'total_active': len(self._subscriptions),
'active_streams': len(self._streams)
}
logger.info(f"Subscription reconciliation: {result}")
return result
def _add_subscription_from_payload(self, subscription_id: str, payload: Dict[str, Any]) -> bool:
"""Add subscription from WebSocket payload format."""
try:
# Extract camera ID from subscription identifier
# Format: "display-001;cam-001" -> camera_id = "cam-001"
camera_id = subscription_id.split(';')[-1]
# Extract crop coordinates if present
crop_coords = None
if all(key in payload for key in ['cropX1', 'cropY1', 'cropX2', 'cropY2']):
crop_coords = (
payload['cropX1'],
payload['cropY1'],
payload['cropX2'],
payload['cropY2']
)
# Create stream configuration
stream_config = StreamConfig(
camera_id=camera_id,
rtsp_url=payload.get('rtspUrl'),
snapshot_url=payload.get('snapshotUrl'),
snapshot_interval=payload.get('snapshotInterval', 5000),
max_retries=3,
)
return self.add_subscription(
subscription_id,
stream_config,
crop_coords,
model_id=payload.get('modelId'),
model_url=payload.get('modelUrl')
)
except Exception as e:
logger.error(f"Error adding subscription from payload {subscription_id}: {e}")
return False
def stop_all(self):
"""Stop all streams and clear all subscriptions."""
with self._lock:
# Stop all streams
for camera_id in list(self._streams.keys()):
self._stop_stream(camera_id)
# Clear all tracking
self._subscriptions.clear()
self._camera_subscribers.clear()
shared_cache_buffer.clear_all()
logger.info("Stopped all streams and cleared all subscriptions")
def set_session_id(self, display_id: str, session_id: str):
"""Set session ID for tracking integration."""
with self._lock:
for subscription_info in self._subscriptions.values():
# Check if this subscription matches the display_id
subscription_display_id = subscription_info.subscription_id.split(';')[0]
if subscription_display_id == display_id and subscription_info.tracking_integration:
subscription_info.tracking_integration.set_session_id(display_id, session_id)
logger.debug(f"Set session {session_id} for display {display_id}")
def clear_session_id(self, session_id: str):
"""Clear session ID from tracking integrations."""
with self._lock:
for subscription_info in self._subscriptions.values():
if subscription_info.tracking_integration:
subscription_info.tracking_integration.clear_session_id(session_id)
logger.debug(f"Cleared session {session_id}")
def set_progression_stage(self, session_id: str, stage: str):
"""Set progression stage for tracking integrations."""
with self._lock:
for subscription_info in self._subscriptions.values():
if subscription_info.tracking_integration:
subscription_info.tracking_integration.set_progression_stage(session_id, stage)
logger.debug(f"Set progression stage for session {session_id}: {stage}")
def get_tracking_stats(self) -> Dict[str, Any]:
"""Get tracking statistics from all subscriptions."""
stats = {}
with self._lock:
for subscription_id, subscription_info in self._subscriptions.items():
if subscription_info.tracking_integration:
stats[subscription_id] = subscription_info.tracking_integration.get_statistics()
return stats
def _detect_stream_type(self, frame) -> StreamType:
"""Detect stream type based on frame dimensions."""
if frame is None:
return StreamType.RTSP # Default
h, w = frame.shape[:2]
# RTSP: 1280x720
if w == 1280 and h == 720:
return StreamType.RTSP
# HTTP: 2560x1440 or larger
if w >= 2000 and h >= 1000:
return StreamType.HTTP
# Default based on size
if w <= 1920 and h <= 1080:
return StreamType.RTSP
else:
return StreamType.HTTP
def get_stats(self) -> Dict[str, Any]:
"""Get comprehensive streaming statistics."""
with self._lock:
buffer_stats = shared_cache_buffer.get_stats()
tracking_stats = self.get_tracking_stats()
# Add stream type information
stream_types = {}
for camera_id in self._streams.keys():
if isinstance(self._streams[camera_id], RTSPReader):
stream_types[camera_id] = 'rtsp'
elif isinstance(self._streams[camera_id], HTTPSnapshotReader):
stream_types[camera_id] = 'http'
else:
stream_types[camera_id] = 'unknown'
return {
'active_subscriptions': len(self._subscriptions),
'active_streams': len(self._streams),
'cameras_with_subscribers': len(self._camera_subscribers),
'max_streams': self.max_streams,
'stream_types': stream_types,
'subscriptions_by_camera': {
camera_id: len(subscribers)
for camera_id, subscribers in self._camera_subscribers.items()
},
'buffer_stats': buffer_stats,
'tracking_stats': tracking_stats,
'memory_usage_mb': buffer_stats.get('total_memory_mb', 0)
}
# Global shared instance for application use
# Will be initialized with config value in app.py
shared_stream_manager = None
def initialize_stream_manager(max_streams: int = 10):
"""Initialize the global stream manager with config value."""
global shared_stream_manager
shared_stream_manager = StreamManager(max_streams=max_streams)
return shared_stream_manager