Some checks failed
Build Worker Base and Application Images / check-base-changes (push) Successful in 7s
Build Worker Base and Application Images / build-base (push) Successful in 10m17s
Build Worker Base and Application Images / build-docker (push) Successful in 3m54s
Build Worker Base and Application Images / deploy-stack (push) Has been cancelled
- Removed the existing `readers.py` file and created separate modules for `FFmpegRTSPReader`, `HTTPSnapshotReader`, and utility functions. - Introduced an abstract base class `VideoReader` to standardize the interface for video stream readers. - Updated `FFmpegRTSPReader` and `HTTPSnapshotReader` to inherit from `VideoReader` and implement required methods. - Enhanced logging utilities for better readability and maintainability. - Removed `pycuda` from requirements as it is no longer needed.
65 lines
No EOL
1.7 KiB
Python
65 lines
No EOL
1.7 KiB
Python
"""
|
|
Abstract base class for video stream readers.
|
|
"""
|
|
from abc import ABC, abstractmethod
|
|
from typing import Optional, Callable
|
|
import numpy as np
|
|
|
|
|
|
class VideoReader(ABC):
|
|
"""Abstract base class for video stream readers."""
|
|
|
|
def __init__(self, camera_id: str, source_url: str, max_retries: int = 3):
|
|
"""
|
|
Initialize the video reader.
|
|
|
|
Args:
|
|
camera_id: Unique identifier for the camera
|
|
source_url: URL or path to the video source
|
|
max_retries: Maximum number of retry attempts
|
|
"""
|
|
self.camera_id = camera_id
|
|
self.source_url = source_url
|
|
self.max_retries = max_retries
|
|
self.frame_callback: Optional[Callable[[str, np.ndarray], None]] = None
|
|
|
|
@abstractmethod
|
|
def start(self) -> None:
|
|
"""Start the video reader."""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def stop(self) -> None:
|
|
"""Stop the video reader."""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]) -> None:
|
|
"""
|
|
Set callback function to handle captured frames.
|
|
|
|
Args:
|
|
callback: Function that takes (camera_id, frame) as arguments
|
|
"""
|
|
pass
|
|
|
|
@property
|
|
@abstractmethod
|
|
def is_running(self) -> bool:
|
|
"""Check if the reader is currently running."""
|
|
pass
|
|
|
|
@property
|
|
@abstractmethod
|
|
def reader_type(self) -> str:
|
|
"""Get the type of reader (e.g., 'rtsp', 'http_snapshot')."""
|
|
pass
|
|
|
|
def __enter__(self):
|
|
"""Context manager entry."""
|
|
self.start()
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
"""Context manager exit."""
|
|
self.stop() |