Refactor: PHASE 2: Core Module Extraction

This commit is contained in:
ziesorx 2025-09-12 14:45:11 +07:00
parent 96bedae80a
commit 4e9ae6bcc4
7 changed files with 3684 additions and 0 deletions

View file

@ -0,0 +1,345 @@
"""
Camera connection state monitoring and management.
This module provides centralized tracking of camera connection states,
error handling, and disconnection notifications.
"""
import time
import logging
from typing import Dict, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
from ..core.exceptions import CameraConnectionError
logger = logging.getLogger(__name__)
@dataclass
class CameraConnectionState:
"""Represents the connection state of a single camera."""
connected: bool = True
last_error: Optional[str] = None
last_error_time: Optional[float] = None
consecutive_failures: int = 0
disconnection_notified: bool = False
last_successful_frame: Optional[float] = None
connection_start_time: Optional[float] = field(default_factory=time.time)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary format."""
return {
"connected": self.connected,
"last_error": self.last_error,
"last_error_time": self.last_error_time,
"consecutive_failures": self.consecutive_failures,
"disconnection_notified": self.disconnection_notified,
"last_successful_frame": self.last_successful_frame,
"connection_start_time": self.connection_start_time
}
class CameraMonitor:
"""
Monitors camera connection states and handles disconnection notifications.
This class provides a centralized way to track camera connection status,
handle error states, and determine when to notify about disconnections.
"""
def __init__(self, failure_threshold: int = 3):
"""
Initialize camera monitor.
Args:
failure_threshold: Number of consecutive failures before considering disconnected
"""
self.failure_threshold = failure_threshold
self._camera_states: Dict[str, CameraConnectionState] = {}
self._lock = None # Will be initialized when needed for thread safety
def _ensure_thread_safety(self):
"""Initialize thread safety if not already done."""
if self._lock is None:
import threading
self._lock = threading.RLock()
def get_camera_state(self, camera_id: str) -> CameraConnectionState:
"""
Get or create camera connection state.
Args:
camera_id: Unique camera identifier
Returns:
Current connection state for the camera
"""
self._ensure_thread_safety()
with self._lock:
if camera_id not in self._camera_states:
self._camera_states[camera_id] = CameraConnectionState()
logger.debug(f"Initialized connection state for camera {camera_id}")
return self._camera_states[camera_id]
def set_camera_connected(self,
camera_id: str,
connected: bool = True,
error_msg: Optional[str] = None) -> None:
"""
Set camera connection state and track error information.
Args:
camera_id: Unique camera identifier
connected: Whether camera is connected
error_msg: Error message if disconnected
"""
self._ensure_thread_safety()
current_time = time.time()
with self._lock:
state = self.get_camera_state(camera_id)
if connected:
# Camera is now connected
if not state.connected:
logger.info(f"Camera {camera_id} reconnected successfully")
state.connected = True
state.last_successful_frame = current_time
state.consecutive_failures = 0
state.disconnection_notified = False
state.last_error = None
state.last_error_time = None
else:
# Camera is now disconnected
state.connected = False
state.consecutive_failures += 1
state.last_error = error_msg
state.last_error_time = current_time
if state.consecutive_failures == 1:
logger.warning(f"Camera {camera_id} connection lost: {error_msg}")
elif state.consecutive_failures >= self.failure_threshold:
logger.error(f"Camera {camera_id} has {state.consecutive_failures} consecutive failures")
logger.debug(f"Camera {camera_id} state updated - failures: {state.consecutive_failures}")
def is_camera_connected(self, camera_id: str) -> bool:
"""
Check if camera is currently connected.
Args:
camera_id: Unique camera identifier
Returns:
True if camera is connected, False otherwise
"""
self._ensure_thread_safety()
with self._lock:
state = self._camera_states.get(camera_id)
return state.connected if state else True # Default to connected for new cameras
def should_notify_disconnection(self, camera_id: str) -> bool:
"""
Check if we should notify backend about disconnection.
A disconnection notification should be sent when:
1. Camera is disconnected
2. We haven't already notified about this disconnection
3. We have enough consecutive failures
Args:
camera_id: Unique camera identifier
Returns:
True if disconnection notification should be sent
"""
self._ensure_thread_safety()
with self._lock:
state = self._camera_states.get(camera_id)
if not state:
return False
is_disconnected = not state.connected
not_yet_notified = not state.disconnection_notified
has_enough_failures = state.consecutive_failures >= self.failure_threshold
should_notify = is_disconnected and not_yet_notified and has_enough_failures
if should_notify:
logger.info(f"Camera {camera_id} qualifies for disconnection notification - "
f"failures: {state.consecutive_failures}, error: {state.last_error}")
return should_notify
def mark_disconnection_notified(self, camera_id: str) -> None:
"""
Mark that we've notified backend about this disconnection.
Args:
camera_id: Unique camera identifier
"""
self._ensure_thread_safety()
with self._lock:
if camera_id in self._camera_states:
self._camera_states[camera_id].disconnection_notified = True
logger.debug(f"Marked disconnection notification sent for camera {camera_id}")
def get_connection_stats(self, camera_id: str) -> Dict[str, Any]:
"""
Get comprehensive connection statistics for a camera.
Args:
camera_id: Unique camera identifier
Returns:
Dictionary with connection statistics
"""
self._ensure_thread_safety()
with self._lock:
state = self._camera_states.get(camera_id)
if not state:
return {"error": "Camera not found"}
current_time = time.time()
stats = state.to_dict()
# Add computed stats
if state.connection_start_time:
stats["uptime_seconds"] = current_time - state.connection_start_time
if state.last_successful_frame:
stats["seconds_since_last_frame"] = current_time - state.last_successful_frame
if state.last_error_time:
stats["seconds_since_last_error"] = current_time - state.last_error_time
return stats
def reset_camera_state(self, camera_id: str) -> None:
"""
Reset camera state to initial connected state.
Args:
camera_id: Unique camera identifier
"""
self._ensure_thread_safety()
with self._lock:
if camera_id in self._camera_states:
del self._camera_states[camera_id]
logger.info(f"Reset connection state for camera {camera_id}")
def cleanup_inactive_cameras(self, inactive_threshold_seconds: int = 3600) -> int:
"""
Remove states for cameras inactive for too long.
Args:
inactive_threshold_seconds: Seconds of inactivity before cleanup
Returns:
Number of camera states cleaned up
"""
self._ensure_thread_safety()
current_time = time.time()
cleanup_count = 0
with self._lock:
camera_ids_to_remove = []
for camera_id, state in self._camera_states.items():
last_activity = max(
state.connection_start_time or 0,
state.last_successful_frame or 0,
state.last_error_time or 0
)
if current_time - last_activity > inactive_threshold_seconds:
camera_ids_to_remove.append(camera_id)
for camera_id in camera_ids_to_remove:
del self._camera_states[camera_id]
cleanup_count += 1
logger.debug(f"Cleaned up inactive camera state for {camera_id}")
if cleanup_count > 0:
logger.info(f"Cleaned up {cleanup_count} inactive camera states")
return cleanup_count
def get_all_camera_states(self) -> Dict[str, Dict[str, Any]]:
"""
Get connection states for all monitored cameras.
Returns:
Dictionary mapping camera IDs to their connection states
"""
self._ensure_thread_safety()
with self._lock:
return {
camera_id: state.to_dict()
for camera_id, state in self._camera_states.items()
}
def get_disconnected_cameras(self) -> list[str]:
"""
Get list of currently disconnected camera IDs.
Returns:
List of camera IDs that are currently disconnected
"""
self._ensure_thread_safety()
with self._lock:
return [
camera_id for camera_id, state in self._camera_states.items()
if not state.connected
]
# Global camera monitor instance
camera_monitor = CameraMonitor()
# ===== CONVENIENCE FUNCTIONS =====
# These provide the same interface as the original functions in app.py
def set_camera_connected(camera_id: str, connected: bool = True, error_msg: Optional[str] = None) -> None:
"""Set camera connection state and track error information."""
camera_monitor.set_camera_connected(camera_id, connected, error_msg)
def is_camera_connected(camera_id: str) -> bool:
"""Check if camera is currently connected."""
return camera_monitor.is_camera_connected(camera_id)
def should_notify_disconnection(camera_id: str) -> bool:
"""Check if we should notify backend about disconnection."""
return camera_monitor.should_notify_disconnection(camera_id)
def mark_disconnection_notified(camera_id: str) -> None:
"""Mark that we've notified backend about this disconnection."""
camera_monitor.mark_disconnection_notified(camera_id)
def get_connection_stats(camera_id: str) -> Dict[str, Any]:
"""Get comprehensive connection statistics for a camera."""
return camera_monitor.get_connection_stats(camera_id)
def reset_camera_state(camera_id: str) -> None:
"""Reset camera state to initial connected state."""
camera_monitor.reset_camera_state(camera_id)

View file

@ -0,0 +1,476 @@
"""
Frame reading implementations for RTSP and HTTP snapshot streams.
This module provides thread-safe frame readers for different camera stream types.
"""
import cv2
import time
import queue
import logging
import requests
import threading
from typing import Optional, Any
import numpy as np
from ..core.constants import (
DEFAULT_RECONNECT_INTERVAL_SEC,
DEFAULT_MAX_RETRIES,
HTTP_SNAPSHOT_TIMEOUT,
SHARED_STREAM_BUFFER_SIZE
)
from ..core.exceptions import (
CameraConnectionError,
FrameReadError,
create_stream_error
)
logger = logging.getLogger(__name__)
def fetch_snapshot(url: str, timeout: int = HTTP_SNAPSHOT_TIMEOUT) -> Optional[np.ndarray]:
"""
Fetch a single snapshot from HTTP/HTTPS URL.
Args:
url: HTTP/HTTPS URL to fetch snapshot from
timeout: Request timeout in seconds
Returns:
Decoded image frame or None if fetch failed
"""
try:
logger.debug(f"Fetching snapshot from {url}")
response = requests.get(url, timeout=timeout, stream=True)
response.raise_for_status()
# Check if response has content
if not response.content:
logger.warning(f"Empty response from snapshot URL: {url}")
return None
# Decode image from response bytes
img_array = np.frombuffer(response.content, dtype=np.uint8)
frame = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
if frame is None:
logger.warning(f"Failed to decode image from snapshot URL: {url}")
return None
logger.debug(f"Successfully fetched snapshot from {url}, shape: {frame.shape}")
return frame
except requests.exceptions.Timeout:
logger.warning(f"Timeout fetching snapshot from {url}")
return None
except requests.exceptions.ConnectionError:
logger.warning(f"Connection error fetching snapshot from {url}")
return None
except requests.exceptions.HTTPError as e:
logger.warning(f"HTTP error fetching snapshot from {url}: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error fetching snapshot from {url}: {e}")
return None
class RTSPFrameReader:
"""Thread-safe RTSP frame reader."""
def __init__(self,
camera_id: str,
rtsp_url: str,
buffer: queue.Queue,
stop_event: threading.Event,
reconnect_interval: int = DEFAULT_RECONNECT_INTERVAL_SEC,
max_retries: int = DEFAULT_MAX_RETRIES,
connection_callback=None):
"""
Initialize RTSP frame reader.
Args:
camera_id: Unique camera identifier
rtsp_url: RTSP stream URL
buffer: Queue to put frames into
stop_event: Event to signal thread shutdown
reconnect_interval: Seconds between reconnection attempts
max_retries: Maximum retry attempts (-1 for unlimited)
connection_callback: Callback function for connection state changes
"""
self.camera_id = camera_id
self.rtsp_url = rtsp_url
self.buffer = buffer
self.stop_event = stop_event
self.reconnect_interval = reconnect_interval
self.max_retries = max_retries
self.connection_callback = connection_callback
self.cap: Optional[cv2.VideoCapture] = None
self.retries = 0
self.frame_count = 0
self.last_log_time = time.time()
def _set_connection_state(self, connected: bool, error_msg: Optional[str] = None):
"""Update connection state via callback."""
if self.connection_callback:
self.connection_callback(self.camera_id, connected, error_msg)
def _initialize_capture(self) -> bool:
"""Initialize video capture."""
try:
self.cap = cv2.VideoCapture(self.rtsp_url)
if self.cap.isOpened():
# Log camera properties
width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = self.cap.get(cv2.CAP_PROP_FPS)
logger.info(f"Camera {self.camera_id} opened: {width}x{height}, FPS: {fps}")
self._set_connection_state(True)
return True
else:
logger.error(f"Camera {self.camera_id} failed to open")
self._set_connection_state(False, "Failed to open camera")
return False
except Exception as e:
error_msg = f"Failed to initialize capture: {e}"
logger.error(f"Camera {self.camera_id}: {error_msg}")
self._set_connection_state(False, error_msg)
return False
def _reconnect(self) -> bool:
"""Attempt to reconnect to RTSP stream."""
if self.cap:
self.cap.release()
self.cap = None
logger.info(f"Attempting to reconnect RTSP stream for camera: {self.camera_id}")
time.sleep(self.reconnect_interval)
return self._initialize_capture()
def _read_frame(self) -> Optional[np.ndarray]:
"""Read a single frame from the stream."""
if not self.cap or not self.cap.isOpened():
return None
try:
ret, frame = self.cap.read()
if not ret:
return None
# Update statistics
self.frame_count += 1
current_time = time.time()
# Log frame stats every 5 seconds
if current_time - self.last_log_time > 5:
elapsed = current_time - self.last_log_time
logger.info(f"Camera {self.camera_id}: Read {self.frame_count} frames in {elapsed:.1f}s")
self.frame_count = 0
self.last_log_time = current_time
return frame
except cv2.error as e:
raise FrameReadError(f"OpenCV error reading frame: {e}")
def _put_frame(self, frame: np.ndarray):
"""Put frame into buffer, removing old frame if necessary."""
# Remove old frame if buffer is full
if not self.buffer.empty():
try:
self.buffer.get_nowait()
logger.debug(f"Removed old frame from buffer for camera {self.camera_id}")
except queue.Empty:
pass
self.buffer.put(frame)
logger.debug(f"Added frame to buffer for camera {self.camera_id}, buffer size: {self.buffer.qsize()}")
def run(self):
"""Main frame reading loop."""
logger.info(f"Starting RTSP frame reader for camera {self.camera_id}")
try:
# Initialize capture
if not self._initialize_capture():
return
while not self.stop_event.is_set():
try:
frame = self._read_frame()
if frame is None:
# Connection lost
self.retries += 1
error_msg = f"Connection lost, retry {self.retries}/{self.max_retries}"
logger.warning(f"Camera {self.camera_id}: {error_msg}")
self._set_connection_state(False, error_msg)
# Check max retries
if self.retries > self.max_retries and self.max_retries != -1:
logger.error(f"Camera {self.camera_id}: Max retries reached, stopping")
self._set_connection_state(False, "Max retries reached")
break
# Attempt reconnection
if not self._reconnect():
continue
# Reset retry counter on successful reconnection
self.retries = 0
continue
# Successfully read frame
logger.debug(f"Camera {self.camera_id}: Read frame, shape: {frame.shape}")
self.retries = 0
self._set_connection_state(True)
self._put_frame(frame)
# Short sleep to avoid CPU overuse
time.sleep(0.01)
except FrameReadError as e:
self.retries += 1
error_msg = f"Frame read error: {e}"
logger.error(f"Camera {self.camera_id}: {error_msg}")
self._set_connection_state(False, error_msg)
# Check max retries
if self.retries > self.max_retries and self.max_retries != -1:
logger.error(f"Camera {self.camera_id}: Max retries reached after error")
self._set_connection_state(False, "Max retries reached after error")
break
# Attempt reconnection
if not self._reconnect():
continue
except Exception as e:
error_msg = f"Unexpected error: {str(e)}"
logger.error(f"Camera {self.camera_id}: {error_msg}", exc_info=True)
self._set_connection_state(False, error_msg)
break
except Exception as e:
logger.error(f"Error in RTSP frame reader for camera {self.camera_id}: {str(e)}", exc_info=True)
finally:
logger.info(f"RTSP frame reader for camera {self.camera_id} is exiting")
if self.cap and self.cap.isOpened():
self.cap.release()
class SnapshotFrameReader:
"""Thread-safe HTTP snapshot frame reader."""
def __init__(self,
camera_id: str,
snapshot_url: str,
snapshot_interval: int,
buffer: queue.Queue,
stop_event: threading.Event,
max_retries: int = DEFAULT_MAX_RETRIES,
connection_callback=None):
"""
Initialize snapshot frame reader.
Args:
camera_id: Unique camera identifier
snapshot_url: HTTP/HTTPS snapshot URL
snapshot_interval: Interval between snapshots in milliseconds
buffer: Queue to put frames into
stop_event: Event to signal thread shutdown
max_retries: Maximum retry attempts (-1 for unlimited)
connection_callback: Callback function for connection state changes
"""
self.camera_id = camera_id
self.snapshot_url = snapshot_url
self.snapshot_interval = snapshot_interval
self.buffer = buffer
self.stop_event = stop_event
self.max_retries = max_retries
self.connection_callback = connection_callback
self.retries = 0
self.consecutive_failures = 0
self.frame_count = 0
self.last_log_time = time.time()
def _set_connection_state(self, connected: bool, error_msg: Optional[str] = None):
"""Update connection state via callback."""
if self.connection_callback:
self.connection_callback(self.camera_id, connected, error_msg)
def _calculate_backoff_delay(self) -> float:
"""Calculate exponential backoff delay based on consecutive failures."""
interval_seconds = self.snapshot_interval / 1000.0
backoff_delay = min(30, max(1, min(2 ** min(self.consecutive_failures - 1, 6), interval_seconds * 2)))
return backoff_delay
def _test_connectivity(self):
"""Test connectivity to snapshot URL."""
if self.consecutive_failures % 5 == 1: # Every 5th failure
try:
test_response = requests.get(self.snapshot_url, timeout=(2, 5), stream=False)
logger.info(f"Camera {self.camera_id}: Connectivity test result: {test_response.status_code}")
except Exception as test_error:
logger.warning(f"Camera {self.camera_id}: Connectivity test failed: {test_error}")
def _put_frame(self, frame: np.ndarray):
"""Put frame into buffer, removing old frame if necessary."""
# Remove old frame if buffer is full
if not self.buffer.empty():
try:
self.buffer.get_nowait()
logger.debug(f"Removed old snapshot from buffer for camera {self.camera_id}")
except queue.Empty:
pass
self.buffer.put(frame)
logger.debug(f"Added snapshot to buffer for camera {self.camera_id}, buffer size: {self.buffer.qsize()}")
def run(self):
"""Main snapshot reading loop."""
logger.info(f"Starting snapshot reader for camera {self.camera_id} from {self.snapshot_url}")
interval_seconds = self.snapshot_interval / 1000.0
logger.info(f"Snapshot interval for camera {self.camera_id}: {interval_seconds}s")
# Initialize connection state
self._set_connection_state(True)
try:
while not self.stop_event.is_set():
try:
start_time = time.time()
frame = fetch_snapshot(self.snapshot_url)
if frame is None:
# Failed to fetch snapshot
self.consecutive_failures += 1
self.retries += 1
error_msg = f"Failed to fetch snapshot, consecutive failures: {self.consecutive_failures}"
logger.warning(f"Camera {self.camera_id}: {error_msg}")
self._set_connection_state(False, error_msg)
# Test connectivity periodically
self._test_connectivity()
# Check max retries
if self.retries > self.max_retries and self.max_retries != -1:
logger.error(f"Camera {self.camera_id}: Max retries reached for snapshot, stopping")
self._set_connection_state(False, "Max retries reached for snapshot")
break
# Exponential backoff
backoff_delay = self._calculate_backoff_delay()
logger.debug(f"Camera {self.camera_id}: Backing off for {backoff_delay:.1f}s")
if self.stop_event.wait(backoff_delay):
break # Exit if stop event set during backoff
continue
# Successfully fetched snapshot
self.consecutive_failures = 0 # Reset on success
self.retries = 0
self.frame_count += 1
current_time = time.time()
# Log frame stats every 5 seconds
if current_time - self.last_log_time > 5:
elapsed = current_time - self.last_log_time
logger.info(f"Camera {self.camera_id}: Fetched {self.frame_count} snapshots in {elapsed:.1f}s")
self.frame_count = 0
self.last_log_time = current_time
logger.debug(f"Camera {self.camera_id}: Fetched snapshot, shape: {frame.shape}")
self._set_connection_state(True)
self._put_frame(frame)
# Wait for interval
elapsed = time.time() - start_time
sleep_time = max(interval_seconds - elapsed, 0)
if sleep_time > 0:
if self.stop_event.wait(sleep_time):
break # Exit if stop event set during sleep
except Exception as e:
self.consecutive_failures += 1
self.retries += 1
error_msg = f"Unexpected error: {str(e)}"
logger.error(f"Camera {self.camera_id}: {error_msg}", exc_info=True)
self._set_connection_state(False, error_msg)
# Check max retries
if self.retries > self.max_retries and self.max_retries != -1:
logger.error(f"Camera {self.camera_id}: Max retries reached after error")
self._set_connection_state(False, "Max retries reached after error")
break
# Exponential backoff for exceptions too
backoff_delay = self._calculate_backoff_delay()
logger.debug(f"Camera {self.camera_id}: Exception backoff for {backoff_delay:.1f}s")
if self.stop_event.wait(backoff_delay):
break # Exit if stop event set during backoff
except Exception as e:
logger.error(f"Error in snapshot reader for camera {self.camera_id}: {str(e)}", exc_info=True)
finally:
logger.info(f"Snapshot reader for camera {self.camera_id} is exiting")
def create_frame_reader_thread(camera_id: str,
rtsp_url: Optional[str] = None,
snapshot_url: Optional[str] = None,
snapshot_interval: Optional[int] = None,
buffer: Optional[queue.Queue] = None,
stop_event: Optional[threading.Event] = None,
connection_callback=None) -> Optional[threading.Thread]:
"""
Create appropriate frame reader thread based on stream type.
Args:
camera_id: Unique camera identifier
rtsp_url: RTSP stream URL (for RTSP streams)
snapshot_url: HTTP snapshot URL (for snapshot streams)
snapshot_interval: Snapshot interval in milliseconds
buffer: Frame buffer queue
stop_event: Thread stop event
connection_callback: Connection state callback
Returns:
Configured thread ready to start, or None if invalid parameters
"""
if not buffer:
buffer = queue.Queue(maxsize=SHARED_STREAM_BUFFER_SIZE)
if not stop_event:
stop_event = threading.Event()
if snapshot_url and snapshot_interval:
# Create snapshot reader
reader = SnapshotFrameReader(
camera_id=camera_id,
snapshot_url=snapshot_url,
snapshot_interval=snapshot_interval,
buffer=buffer,
stop_event=stop_event,
connection_callback=connection_callback
)
thread = threading.Thread(target=reader.run, name=f"snapshot-{camera_id}")
elif rtsp_url:
# Create RTSP reader
reader = RTSPFrameReader(
camera_id=camera_id,
rtsp_url=rtsp_url,
buffer=buffer,
stop_event=stop_event,
connection_callback=connection_callback
)
thread = threading.Thread(target=reader.run, name=f"rtsp-{camera_id}")
else:
logger.error(f"No valid URL provided for camera {camera_id}")
return None
thread.daemon = True
return thread

View file

@ -0,0 +1,572 @@
"""
Stream lifecycle management and coordination.
This module provides centralized management of camera streams including
lifecycle management, resource allocation, and stream coordination.
"""
import time
import queue
import logging
import threading
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass, field
from datetime import datetime
from ..core.constants import (
DEFAULT_MAX_STREAMS,
SHARED_STREAM_BUFFER_SIZE,
DEFAULT_RECONNECT_INTERVAL_SEC,
DEFAULT_MAX_RETRIES
)
from ..core.exceptions import StreamError, create_stream_error
from ..streams.frame_reader import create_frame_reader_thread
from ..streams.camera_monitor import set_camera_connected
logger = logging.getLogger(__name__)
@dataclass
class StreamInfo:
"""Information about a single camera stream."""
camera_id: str
stream_url: str
stream_type: str # "rtsp" or "snapshot"
snapshot_interval: Optional[int] = None
buffer: Optional[queue.Queue] = None
stop_event: Optional[threading.Event] = None
thread: Optional[threading.Thread] = None
subscribers: Set[str] = field(default_factory=set)
created_at: float = field(default_factory=time.time)
last_frame_time: Optional[float] = None
frame_count: int = 0
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary format."""
return {
"camera_id": self.camera_id,
"stream_url": self.stream_url,
"stream_type": self.stream_type,
"snapshot_interval": self.snapshot_interval,
"subscriber_count": len(self.subscribers),
"subscribers": list(self.subscribers),
"created_at": self.created_at,
"last_frame_time": self.last_frame_time,
"frame_count": self.frame_count,
"is_active": self.thread is not None and self.thread.is_alive()
}
@dataclass
class StreamSubscription:
"""Information about a stream subscription."""
subscription_id: str
camera_id: str
subscriber_id: str
created_at: float = field(default_factory=time.time)
last_access: float = field(default_factory=time.time)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary format."""
return {
"subscription_id": self.subscription_id,
"camera_id": self.camera_id,
"subscriber_id": self.subscriber_id,
"created_at": self.created_at,
"last_access": self.last_access
}
class StreamManager:
"""
Manages camera stream lifecycle and resource allocation.
This class provides centralized management of camera streams including:
- Stream lifecycle management (start/stop/restart)
- Resource allocation and sharing
- Subscriber management
- Connection state monitoring
"""
def __init__(self, max_streams: int = DEFAULT_MAX_STREAMS):
"""
Initialize stream manager.
Args:
max_streams: Maximum number of concurrent streams
"""
self.max_streams = max_streams
self._streams: Dict[str, StreamInfo] = {}
self._subscriptions: Dict[str, StreamSubscription] = {}
self._lock = None
def _ensure_thread_safety(self):
"""Initialize thread safety if not already done."""
if self._lock is None:
import threading
self._lock = threading.RLock()
def _connection_state_callback(self, camera_id: str, connected: bool, error_msg: Optional[str] = None):
"""Callback for connection state changes."""
set_camera_connected(camera_id, connected, error_msg)
def _create_stream_info(self,
camera_id: str,
rtsp_url: Optional[str] = None,
snapshot_url: Optional[str] = None,
snapshot_interval: Optional[int] = None) -> StreamInfo:
"""Create StreamInfo object based on stream type."""
if snapshot_url and snapshot_interval:
return StreamInfo(
camera_id=camera_id,
stream_url=snapshot_url,
stream_type="snapshot",
snapshot_interval=snapshot_interval,
buffer=queue.Queue(maxsize=SHARED_STREAM_BUFFER_SIZE),
stop_event=threading.Event()
)
elif rtsp_url:
return StreamInfo(
camera_id=camera_id,
stream_url=rtsp_url,
stream_type="rtsp",
buffer=queue.Queue(maxsize=SHARED_STREAM_BUFFER_SIZE),
stop_event=threading.Event()
)
else:
raise ValueError("Must provide either RTSP URL or snapshot URL with interval")
def create_subscription(self,
subscription_id: str,
camera_id: str,
subscriber_id: str,
rtsp_url: Optional[str] = None,
snapshot_url: Optional[str] = None,
snapshot_interval: Optional[int] = None) -> bool:
"""
Create a stream subscription.
Args:
subscription_id: Unique subscription identifier
camera_id: Camera identifier
subscriber_id: Subscriber identifier
rtsp_url: RTSP stream URL (for RTSP streams)
snapshot_url: HTTP snapshot URL (for snapshot streams)
snapshot_interval: Snapshot interval in milliseconds
Returns:
True if subscription was created successfully
"""
self._ensure_thread_safety()
with self._lock:
try:
# Check if subscription already exists
if subscription_id in self._subscriptions:
logger.warning(f"Subscription {subscription_id} already exists")
return False
# Check stream limit
if len(self._streams) >= self.max_streams and camera_id not in self._streams:
logger.error(f"Maximum streams ({self.max_streams}) reached, cannot create new stream for camera {camera_id}")
return False
# Create or get existing stream
if camera_id not in self._streams:
stream_info = self._create_stream_info(
camera_id, rtsp_url, snapshot_url, snapshot_interval
)
self._streams[camera_id] = stream_info
# Create and start frame reader thread
thread = create_frame_reader_thread(
camera_id=camera_id,
rtsp_url=rtsp_url,
snapshot_url=snapshot_url,
snapshot_interval=snapshot_interval,
buffer=stream_info.buffer,
stop_event=stream_info.stop_event,
connection_callback=self._connection_state_callback
)
if thread:
stream_info.thread = thread
thread.start()
logger.info(f"Created new {stream_info.stream_type} stream for camera {camera_id}")
else:
# Clean up failed stream
del self._streams[camera_id]
return False
# Add subscriber to stream
stream_info = self._streams[camera_id]
stream_info.subscribers.add(subscription_id)
# Create subscription record
subscription = StreamSubscription(
subscription_id=subscription_id,
camera_id=camera_id,
subscriber_id=subscriber_id
)
self._subscriptions[subscription_id] = subscription
logger.info(f"Created subscription {subscription_id} for camera {camera_id}, subscribers: {len(stream_info.subscribers)}")
return True
except Exception as e:
logger.error(f"Error creating subscription {subscription_id}: {e}")
return False
def remove_subscription(self, subscription_id: str) -> bool:
"""
Remove a stream subscription.
Args:
subscription_id: Unique subscription identifier
Returns:
True if subscription was removed successfully
"""
self._ensure_thread_safety()
with self._lock:
if subscription_id not in self._subscriptions:
logger.warning(f"Subscription {subscription_id} not found")
return False
subscription = self._subscriptions[subscription_id]
camera_id = subscription.camera_id
# Remove subscription
del self._subscriptions[subscription_id]
# Remove subscriber from stream if stream exists
if camera_id in self._streams:
stream_info = self._streams[camera_id]
stream_info.subscribers.discard(subscription_id)
logger.info(f"Removed subscription {subscription_id} for camera {camera_id}, remaining subscribers: {len(stream_info.subscribers)}")
# Stop stream if no more subscribers
if not stream_info.subscribers:
self._stop_stream(camera_id)
return True
def _stop_stream(self, camera_id: str) -> None:
"""Stop a stream and clean up resources."""
if camera_id not in self._streams:
return
stream_info = self._streams[camera_id]
# Signal thread to stop
if stream_info.stop_event:
stream_info.stop_event.set()
# Wait for thread to finish
if stream_info.thread and stream_info.thread.is_alive():
stream_info.thread.join(timeout=5)
if stream_info.thread.is_alive():
logger.warning(f"Stream thread for camera {camera_id} did not stop gracefully")
# Clean up
del self._streams[camera_id]
logger.info(f"Stopped {stream_info.stream_type} stream for camera {camera_id}")
def get_frame(self, subscription_id: str, timeout: float = 0.1) -> Optional[Any]:
"""
Get the latest frame for a subscription.
Args:
subscription_id: Unique subscription identifier
timeout: Timeout for frame retrieval in seconds
Returns:
Latest frame or None if not available
"""
self._ensure_thread_safety()
with self._lock:
if subscription_id not in self._subscriptions:
return None
subscription = self._subscriptions[subscription_id]
camera_id = subscription.camera_id
if camera_id not in self._streams:
return None
stream_info = self._streams[camera_id]
subscription.last_access = time.time()
try:
frame = stream_info.buffer.get(timeout=timeout)
stream_info.last_frame_time = time.time()
stream_info.frame_count += 1
return frame
except queue.Empty:
return None
except Exception as e:
logger.error(f"Error getting frame for subscription {subscription_id}: {e}")
return None
def is_stream_active(self, camera_id: str) -> bool:
"""
Check if a stream is active.
Args:
camera_id: Camera identifier
Returns:
True if stream is active
"""
self._ensure_thread_safety()
with self._lock:
if camera_id not in self._streams:
return False
stream_info = self._streams[camera_id]
return stream_info.thread is not None and stream_info.thread.is_alive()
def get_stream_stats(self, camera_id: str) -> Optional[Dict[str, Any]]:
"""
Get statistics for a stream.
Args:
camera_id: Camera identifier
Returns:
Stream statistics or None if stream not found
"""
self._ensure_thread_safety()
with self._lock:
if camera_id not in self._streams:
return None
stream_info = self._streams[camera_id]
current_time = time.time()
stats = stream_info.to_dict()
stats["uptime_seconds"] = current_time - stream_info.created_at
if stream_info.last_frame_time:
stats["seconds_since_last_frame"] = current_time - stream_info.last_frame_time
return stats
def get_subscription_info(self, subscription_id: str) -> Optional[Dict[str, Any]]:
"""
Get information about a subscription.
Args:
subscription_id: Unique subscription identifier
Returns:
Subscription information or None if not found
"""
self._ensure_thread_safety()
with self._lock:
if subscription_id not in self._subscriptions:
return None
return self._subscriptions[subscription_id].to_dict()
def get_all_streams(self) -> Dict[str, Dict[str, Any]]:
"""
Get information about all active streams.
Returns:
Dictionary mapping camera IDs to stream information
"""
self._ensure_thread_safety()
with self._lock:
return {
camera_id: stream_info.to_dict()
for camera_id, stream_info in self._streams.items()
}
def get_all_subscriptions(self) -> Dict[str, Dict[str, Any]]:
"""
Get information about all active subscriptions.
Returns:
Dictionary mapping subscription IDs to subscription information
"""
self._ensure_thread_safety()
with self._lock:
return {
sub_id: subscription.to_dict()
for sub_id, subscription in self._subscriptions.items()
}
def cleanup_inactive_streams(self, inactive_threshold_seconds: int = 3600) -> int:
"""
Clean up streams that have been inactive for too long.
Args:
inactive_threshold_seconds: Seconds of inactivity before cleanup
Returns:
Number of streams cleaned up
"""
self._ensure_thread_safety()
current_time = time.time()
cleanup_count = 0
with self._lock:
streams_to_remove = []
for camera_id, stream_info in self._streams.items():
# Check if stream has subscribers
if stream_info.subscribers:
continue
# Check if stream has been inactive
last_activity = max(
stream_info.created_at,
stream_info.last_frame_time or 0
)
if current_time - last_activity > inactive_threshold_seconds:
streams_to_remove.append(camera_id)
for camera_id in streams_to_remove:
self._stop_stream(camera_id)
cleanup_count += 1
logger.info(f"Cleaned up inactive stream for camera {camera_id}")
if cleanup_count > 0:
logger.info(f"Cleaned up {cleanup_count} inactive streams")
return cleanup_count
def restart_stream(self, camera_id: str) -> bool:
"""
Restart a stream.
Args:
camera_id: Camera identifier
Returns:
True if stream was restarted successfully
"""
self._ensure_thread_safety()
with self._lock:
if camera_id not in self._streams:
logger.warning(f"Cannot restart stream for camera {camera_id}: stream not found")
return False
stream_info = self._streams[camera_id]
subscribers = stream_info.subscribers.copy()
stream_url = stream_info.stream_url
stream_type = stream_info.stream_type
snapshot_interval = stream_info.snapshot_interval
# Stop current stream
self._stop_stream(camera_id)
# Recreate stream
try:
new_stream_info = self._create_stream_info(
camera_id,
rtsp_url=stream_url if stream_type == "rtsp" else None,
snapshot_url=stream_url if stream_type == "snapshot" else None,
snapshot_interval=snapshot_interval
)
new_stream_info.subscribers = subscribers
self._streams[camera_id] = new_stream_info
# Create and start new frame reader thread
thread = create_frame_reader_thread(
camera_id=camera_id,
rtsp_url=stream_url if stream_type == "rtsp" else None,
snapshot_url=stream_url if stream_type == "snapshot" else None,
snapshot_interval=snapshot_interval,
buffer=new_stream_info.buffer,
stop_event=new_stream_info.stop_event,
connection_callback=self._connection_state_callback
)
if thread:
new_stream_info.thread = thread
thread.start()
logger.info(f"Restarted {stream_type} stream for camera {camera_id}")
return True
else:
# Clean up failed restart
del self._streams[camera_id]
return False
except Exception as e:
logger.error(f"Error restarting stream for camera {camera_id}: {e}")
return False
def shutdown_all(self) -> None:
"""Shutdown all streams and clean up resources."""
self._ensure_thread_safety()
with self._lock:
logger.info("Shutting down all streams...")
# Stop all streams
camera_ids = list(self._streams.keys())
for camera_id in camera_ids:
self._stop_stream(camera_id)
# Clear all subscriptions
self._subscriptions.clear()
logger.info("All streams shut down successfully")
# Global stream manager instance
stream_manager = StreamManager()
# ===== CONVENIENCE FUNCTIONS =====
# These provide a simplified interface for common operations
def create_stream_subscription(subscription_id: str,
camera_id: str,
subscriber_id: str,
rtsp_url: Optional[str] = None,
snapshot_url: Optional[str] = None,
snapshot_interval: Optional[int] = None) -> bool:
"""Create a stream subscription using global stream manager."""
return stream_manager.create_subscription(
subscription_id, camera_id, subscriber_id, rtsp_url, snapshot_url, snapshot_interval
)
def remove_stream_subscription(subscription_id: str) -> bool:
"""Remove a stream subscription using global stream manager."""
return stream_manager.remove_subscription(subscription_id)
def get_stream_frame(subscription_id: str, timeout: float = 0.1) -> Optional[Any]:
"""Get the latest frame for a subscription using global stream manager."""
return stream_manager.get_frame(subscription_id, timeout)
def is_stream_active(camera_id: str) -> bool:
"""Check if a stream is active using global stream manager."""
return stream_manager.is_stream_active(camera_id)
def get_stream_statistics() -> Dict[str, Any]:
"""Get comprehensive stream statistics."""
return {
"streams": stream_manager.get_all_streams(),
"subscriptions": stream_manager.get_all_subscriptions(),
"total_streams": len(stream_manager._streams),
"total_subscriptions": len(stream_manager._subscriptions),
"max_streams": stream_manager.max_streams
}