python-detector-worker/core/streaming/readers.py
Siwat Sirichai 33d738b31b
Some checks failed
Build Worker Base and Application Images / check-base-changes (push) Successful in 7s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 2m55s
Build Worker Base and Application Images / deploy-stack (push) Has been cancelled
fix: remove unused watchdog logging configuration and FrameFileHandler
2025-09-26 19:42:57 +07:00

557 lines
No EOL
21 KiB
Python

"""
Frame readers for RTSP streams and HTTP snapshots.
Optimized for 1280x720@6fps RTSP and 2560x1440 HTTP snapshots.
"""
import cv2
import logging
import time
import threading
import requests
import numpy as np
import subprocess
from typing import Optional, Callable
# Module-level logger named after this module; the log_* helpers and the
# reader classes below all write through it.
logger = logging.getLogger(__name__)
# ANSI escape sequences used to colorize log lines
class Colors:
    """Namespace of ANSI escape codes used by the log_* helpers."""

    # Foreground colors
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BLUE = '\033[94m'
    PURPLE = '\033[95m'
    CYAN = '\033[96m'
    WHITE = '\033[97m'

    # Text attributes
    BOLD = '\033[1m'
    END = '\033[0m'  # appended after a message to reset formatting
def log_success(camera_id: str, message: str) -> None:
    """Write ``message`` at INFO level, tagged with ``camera_id`` and colored green."""
    colored = f"{Colors.GREEN}[{camera_id}] {message}{Colors.END}"
    logger.info(colored)
def log_warning(camera_id: str, message: str) -> None:
    """Write ``message`` at WARNING level, tagged with ``camera_id`` and colored yellow."""
    colored = f"{Colors.YELLOW}[{camera_id}] {message}{Colors.END}"
    logger.warning(colored)
def log_error(camera_id: str, message: str) -> None:
    """Write ``message`` at ERROR level, tagged with ``camera_id`` and colored red."""
    colored = f"{Colors.RED}[{camera_id}] {message}{Colors.END}"
    logger.error(colored)
def log_info(camera_id: str, message: str) -> None:
    """Write ``message`` at INFO level, tagged with ``camera_id`` and colored cyan."""
    colored = f"{Colors.CYAN}[{camera_id}] {message}{Colors.END}"
    logger.info(colored)
class FFmpegRTSPReader:
    """RTSP stream reader that pipes BMP frames out of an FFmpeg subprocess.

    A background thread launches FFmpeg, reads one BMP image at a time from
    its stdout, decodes it with OpenCV and hands the frame to the registered
    callback. A second thread drains FFmpeg's stderr and forwards lines that
    look like errors or warnings to the log. A watchdog restarts FFmpeg when
    no frame has been received within the configured timeout.
    """

    # 14-byte BMP file header + 40-byte info header.
    _BMP_HEADER_SIZE = 54
    # Upper bound for a single frame. Guards against allocating gigabytes
    # when the size field in a (corrupt) header is garbage.
    _MAX_BMP_BYTES = 256 * 1024 * 1024

    def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3):
        """Store the stream configuration; nothing is started here.

        Args:
            camera_id: Identifier used in log lines and passed to the callback.
            rtsp_url: RTSP URL handed to FFmpeg as its input.
            max_retries: Stored on the instance; not consulted by the methods
                of this class.
        """
        self.camera_id = camera_id
        self.rtsp_url = rtsp_url
        self.max_retries = max_retries
        self.process = None
        self.stop_event = threading.Event()
        self.thread = None
        self.stderr_thread = None
        self.frame_callback: Optional[Callable] = None
        # Expected stream specs (for reference only; the actual dimensions
        # come from each BMP frame as decoded by OpenCV)
        self.width = 1280
        self.height = 720
        # Watchdog timers for stream reliability
        self.process_start_time = None
        self.last_frame_time = None
        self.is_restart = False  # True from a watchdog restart until the next frame
        self.first_start_timeout = 30.0  # seconds without frames in normal operation
        self.restart_timeout = 15.0  # seconds without frames right after a restart

    def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
        """Register the function called as ``callback(camera_id, frame)`` per frame."""
        self.frame_callback = callback

    def start(self):
        """Start the background reader thread (no-op if it is already running)."""
        if self.thread and self.thread.is_alive():
            logger.warning(f"FFmpeg reader for {self.camera_id} already running")
            return
        self.stop_event.clear()
        self.thread = threading.Thread(target=self._read_frames, daemon=True)
        self.thread.start()
        log_success(self.camera_id, "Stream started")

    def stop(self):
        """Signal the reader to stop, terminate FFmpeg and join the threads."""
        self.stop_event.set()
        # Snapshot the handle: the reader thread may set self.process to None
        # at any moment, so check-then-use on the attribute would race.
        process = self.process
        if process:
            try:
                process.terminate()
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
            except OSError as e:
                logger.debug(f"[{self.camera_id}] Error terminating FFmpeg: {e}")
        current = threading.current_thread()
        # Joining the current thread raises RuntimeError, so skip it when
        # stop() is invoked from the reader thread (e.g. inside the callback).
        if self.thread and self.thread is not current:
            self.thread.join(timeout=5.0)
        if self.stderr_thread and self.stderr_thread is not current:
            self.stderr_thread.join(timeout=2.0)
        log_info(self.camera_id, "Stream stopped")

    # Removed _probe_stream_info - BMP headers contain dimensions
    def _start_ffmpeg_process(self):
        """Launch FFmpeg writing BMP frames to a stdout pipe.

        Also starts the stderr-draining thread and resets the watchdog clock.

        Returns:
            True when the process and stderr thread were started, False when
            anything raised (the error is logged).
        """
        cmd = [
            'ffmpeg',
            # DO NOT REMOVE
            '-hwaccel', 'cuda',
            '-hwaccel_device', '0',
            '-rtsp_transport', 'tcp',
            '-i', self.rtsp_url,
            '-f', 'image2pipe',  # Output images to pipe
            '-vcodec', 'bmp',  # BMP format with header containing dimensions
            # Use native stream resolution and framerate
            '-an',  # No audio
            '-'  # Output to stdout
        ]
        try:
            # Start FFmpeg with stdout pipe to read frames directly
            self.process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,  # Capture stdout for frame data
                stderr=subprocess.PIPE,  # Capture stderr for error logging
                bufsize=0  # Unbuffered for real-time processing
            )
            # Give a stderr thread left over from the previous process a
            # moment to finish before replacing it.
            previous = self.stderr_thread
            if previous and previous.is_alive():
                previous.join(timeout=1.0)
            # Bind the thread to THIS process so it can never end up reading
            # the stderr of a later replacement process.
            self.stderr_thread = threading.Thread(
                target=self._read_stderr, args=(self.process,), daemon=True
            )
            self.stderr_thread.start()
            # Set process start time for watchdog
            self.process_start_time = time.time()
            self.last_frame_time = None  # Reset frame time
            # is_restart is deliberately NOT cleared here: it stays set until
            # the first frame arrives so the watchdog applies restart_timeout.
            return True
        except Exception as e:
            log_error(self.camera_id, f"FFmpeg startup failed: {e}")
            return False

    @staticmethod
    def _fill_buffer(pipe, view) -> bool:
        """Read from ``pipe`` until ``view`` is full.

        Args:
            pipe: Binary file-like object exposing ``read(n)``.
            view: Writable memoryview to fill.

        Returns:
            True when the view was filled, False when the pipe returned no
            data before that (end of stream).
        """
        filled = 0
        total = len(view)
        while filled < total:
            chunk = pipe.read(total - filled)
            if not chunk:
                return False
            view[filled:filled + len(chunk)] = chunk
            filled += len(chunk)
        return True

    def _read_bmp_frame(self, pipe):
        """Read and decode one BMP image from ``pipe``.

        The 54-byte header is read first; its little-endian size field
        (bytes 2-5) tells how many bytes the whole image occupies.

        Returns:
            The decoded frame, or None on end of stream, a header that does
            not start with ``BM``, an implausible size, a decode failure or
            any exception.
        """
        import struct
        header_size = self._BMP_HEADER_SIZE
        try:
            header = bytearray(header_size)
            if not self._fill_buffer(pipe, memoryview(header)):
                return None  # Silent end of stream
            if header[:2] != b'BM':
                return None  # Invalid format, skip frame silently
            file_size = struct.unpack_from('<L', header, 2)[0]
            if not header_size < file_size <= self._MAX_BMP_BYTES:
                return None  # No pixel data, or corrupt size field
            # Assemble the image in one pre-sized buffer; repeated `bytes +=`
            # would re-copy the growing frame on every pipe read.
            bmp_data = bytearray(file_size)
            bmp_data[:header_size] = header
            if not self._fill_buffer(pipe, memoryview(bmp_data)[header_size:]):
                return None  # Stream ended silently
            # Use OpenCV to decode BMP directly from memory (None on failure)
            frame_array = np.frombuffer(bmp_data, dtype=np.uint8)
            return cv2.imdecode(frame_array, cv2.IMREAD_COLOR)
        except Exception as e:
            logger.debug(f"[{self.camera_id}] Error reading BMP frame: {e}")
            return None

    def _read_stderr(self, process=None):
        """Forward FFmpeg stderr lines that look like errors/warnings to the log.

        Args:
            process: The Popen object whose stderr is drained. Defaults to
                ``self.process`` as it is when the thread starts.
        """
        if process is None:
            process = self.process
        if not process or not process.stderr:
            return
        stderr = process.stderr
        error_keywords = ('error', 'failed', 'cannot', 'invalid')
        try:
            # Read until EOF so the last lines FFmpeg prints before exiting
            # (usually the interesting ones) are not dropped.
            for raw_line in iter(stderr.readline, b''):
                error_msg = raw_line.decode('utf-8', errors='ignore').strip()
                if not error_msg or self.stop_event.is_set():
                    continue
                lowered = error_msg.lower()
                # Filter out common noise but log actual errors
                if any(keyword in lowered for keyword in error_keywords):
                    log_error(self.camera_id, f"FFmpeg: {error_msg}")
                elif 'warning' in lowered:
                    log_warning(self.camera_id, f"FFmpeg: {error_msg}")
        except Exception as e:
            logger.debug(f"[{self.camera_id}] Stopped reading FFmpeg stderr: {e}")
        finally:
            # This thread is the only reader of the pipe, so it closes it.
            try:
                stderr.close()
            except Exception:
                pass

    def _check_watchdog_timeout(self) -> bool:
        """Return True when no frame has arrived within the active timeout.

        The idle time is measured from the last frame, or from process start
        when no frame has been received yet. ``restart_timeout`` applies while
        ``is_restart`` is set, ``first_start_timeout`` otherwise.
        """
        if not self.process_start_time:
            return False
        timeout = self.restart_timeout if self.is_restart else self.first_start_timeout
        reference = self.last_frame_time or self.process_start_time
        idle = time.time() - reference
        if idle > timeout:
            log_warning(self.camera_id, f"Watchdog timeout: No frames for {idle:.1f}s (limit: {timeout}s)")
            return True
        return False

    def _shutdown_process(self, wait_timeout: float):
        """Terminate the current FFmpeg process, reap it and close its stdout.

        Must only be called from the reader thread, which is the sole reader
        of the stdout pipe. Best effort: failures are logged, never raised.
        """
        process, self.process = self.process, None
        if not process:
            return
        try:
            process.terminate()
            try:
                process.wait(timeout=wait_timeout)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait(timeout=wait_timeout)
        except Exception as e:
            logger.debug(f"[{self.camera_id}] Error shutting down FFmpeg: {e}")
        finally:
            if process.stdout:
                try:
                    process.stdout.close()
                except Exception:
                    pass

    def _restart_ffmpeg_process(self):
        """Tear down FFmpeg after a watchdog timeout; the read loop relaunches it."""
        log_warning(self.camera_id, "Watchdog triggered FFmpeg restart")
        self._shutdown_process(wait_timeout=3)
        # Mark as restart so the watchdog uses the shorter timeout
        self.is_restart = True
        # Small delay before restart; returns early if stop() is called
        self.stop_event.wait(1.0)

    def _read_frames(self):
        """Reader-thread main loop: keep FFmpeg running and dispatch frames.

        NOTE(review): the watchdog is only evaluated between reads. While
        ``_read_bmp_frame`` is blocked waiting on the pipe (FFmpeg alive but
        producing nothing) the check cannot run.
        """
        frame_count = 0
        last_log_time = time.time()
        while not self.stop_event.is_set():
            try:
                process = self.process
                if process and process.poll() is None:
                    # Process is running: check watchdog timeout
                    if self._check_watchdog_timeout():
                        self._restart_ffmpeg_process()
                        continue
                else:
                    # Start FFmpeg if not running
                    if process:
                        log_warning(self.camera_id, "Stream disconnected, reconnecting...")
                        # Reap the dead process and release its stdout pipe
                        self._shutdown_process(wait_timeout=1)
                    if self.stop_event.is_set():
                        break
                    if not self._start_ffmpeg_process():
                        self.stop_event.wait(5.0)
                        continue

                process = self.process
                stdout = process.stdout if process else None
                if stdout is None:
                    continue
                try:
                    frame = self._read_bmp_frame(stdout)
                    if frame is None:
                        # Avoid a hot spin while a dying process is at EOF
                        self.stop_event.wait(0.01)
                        continue
                    # Update watchdog - we got a frame
                    self.last_frame_time = time.time()
                    if self.is_restart:
                        # First frame after a watchdog restart: back to the normal timeout
                        log_info(self.camera_id, f"FFmpeg restarted successfully, next timeout: {self.first_start_timeout}s")
                        self.is_restart = False
                    callback = self.frame_callback
                    if callback:
                        # A failing consumer must not take the stream down
                        try:
                            callback(self.camera_id, frame)
                        except Exception as e:
                            log_error(self.camera_id, f"Frame callback error: {e}")
                    frame_count += 1
                    # Log progress every 60 seconds (quieter)
                    current_time = time.time()
                    if current_time - last_log_time >= 60:
                        log_success(self.camera_id, f"{frame_count} frames captured ({frame.shape[1]}x{frame.shape[0]})")
                        last_log_time = current_time
                except Exception as e:
                    # Process might have died, let it restart on next iteration
                    logger.debug(f"[{self.camera_id}] Frame read failed: {e}")
                    self._shutdown_process(wait_timeout=1)
                    self.stop_event.wait(1.0)
            except Exception as e:
                logger.debug(f"[{self.camera_id}] Reader loop error: {e}")
                self.stop_event.wait(1.0)
        # Cleanup
        self._shutdown_process(wait_timeout=2)
# NOTE(review): this repeats the logging.getLogger(__name__) assignment made
# near the top of the file; getLogger returns the same logger object for the
# same name, so this line appears to be redundant.
logger = logging.getLogger(__name__)
class HTTPSnapshotReader:
    """HTTP snapshot reader optimized for 2560x1440 (2K) high quality images.

    A background thread fetches the snapshot URL at a fixed interval, decodes
    the image with OpenCV and passes it to the registered callback.
    ``fetch_single_snapshot`` performs the same fetch once, on demand.
    """

    def __init__(self, camera_id: str, snapshot_url: str, interval_ms: int = 5000, max_retries: int = 3):
        """Store the polling configuration; nothing is started here.

        Args:
            camera_id: Identifier used in log lines and passed to the callback.
            snapshot_url: Snapshot URL; may embed ``user:password@`` credentials.
            interval_ms: Target time between snapshots, in milliseconds.
            max_retries: Consecutive failures tolerated by the polling loop
                (-1 disables the limit there), and the number of attempts
                made by ``fetch_single_snapshot``.
        """
        self.camera_id = camera_id
        self.snapshot_url = snapshot_url
        self.interval_ms = interval_ms
        self.max_retries = max_retries
        self.stop_event = threading.Event()
        self.thread = None
        self.frame_callback: Optional[Callable] = None
        # Expected snapshot specifications
        self.expected_width = 2560
        self.expected_height = 1440
        self.max_file_size = 10 * 1024 * 1024  # 10MB max for 2K image

    def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
        """Register the function called as ``callback(camera_id, frame)`` per snapshot."""
        self.frame_callback = callback

    def start(self):
        """Start the snapshot polling thread (no-op if it is already running)."""
        if self.thread and self.thread.is_alive():
            logger.warning(f"Snapshot reader for {self.camera_id} already running")
            return
        self.stop_event.clear()
        self.thread = threading.Thread(target=self._read_snapshots, daemon=True)
        self.thread.start()
        logger.info(f"Started snapshot reader for camera {self.camera_id}")

    def stop(self):
        """Signal the polling thread to stop and wait up to 5s for it."""
        self.stop_event.set()
        # Joining the current thread raises RuntimeError, so skip it when
        # stop() is invoked from the polling thread (e.g. inside the callback).
        if self.thread and self.thread is not threading.current_thread():
            self.thread.join(timeout=5.0)
        logger.info(f"Stopped snapshot reader for camera {self.camera_id}")

    def _read_snapshots(self):
        """Polling-thread main loop: fetch, dispatch, then wait for the next interval.

        The loop ends when ``stop_event`` is set or when more than
        ``max_retries`` consecutive failures occur (unless it is -1).
        """
        retries = 0
        frame_count = 0
        last_log_time = time.time()
        interval_seconds = self.interval_ms / 1000.0
        # Back-off between failed attempts; waits on stop_event so that
        # stop() is not delayed by a sleeping thread.
        retry_delay = min(2.0, interval_seconds)
        logger.info(f"Snapshot interval for camera {self.camera_id}: {interval_seconds}s")
        while not self.stop_event.is_set():
            try:
                start_time = time.time()
                frame = self._fetch_snapshot()
                if frame is None:
                    retries += 1
                    logger.warning(f"Failed to fetch snapshot for camera {self.camera_id}, retry {retries}/{self.max_retries}")
                    if self.max_retries != -1 and retries > self.max_retries:
                        logger.error(f"Max retries reached for snapshot camera {self.camera_id}")
                        break
                    self.stop_event.wait(retry_delay)
                    continue
                # Accept any valid image dimensions - don't force specific resolution
                if frame.shape[1] <= 0 or frame.shape[0] <= 0:
                    logger.warning(f"Camera {self.camera_id}: Invalid frame dimensions {frame.shape[1]}x{frame.shape[0]}")
                    self.stop_event.wait(retry_delay)
                    continue
                # Reset retry counter on successful fetch
                retries = 0
                frame_count += 1
                # Call frame callback
                if self.frame_callback:
                    try:
                        self.frame_callback(self.camera_id, frame)
                    except Exception as e:
                        logger.error(f"Camera {self.camera_id}: Frame callback error: {e}")
                # Log progress every 30 seconds
                current_time = time.time()
                if current_time - last_log_time >= 30:
                    logger.info(f"Camera {self.camera_id}: {frame_count} snapshots processed")
                    last_log_time = current_time
                # Wait for next interval
                elapsed = time.time() - start_time
                sleep_time = max(0, interval_seconds - elapsed)
                if sleep_time > 0:
                    self.stop_event.wait(sleep_time)
            except Exception as e:
                logger.error(f"Error in snapshot loop for camera {self.camera_id}: {e}")
                retries += 1
                if self.max_retries != -1 and retries > self.max_retries:
                    break
                self.stop_event.wait(retry_delay)
        logger.info(f"Snapshot reader thread ended for camera {self.camera_id}")

    def _split_credentials(self):
        """Split ``snapshot_url`` into a credential-free URL and its credentials.

        Returns:
            ``(clean_url, username, password)``. Username and password are
            percent-decoded, or None when absent. When the URL carries no
            credentials it is returned unchanged.

        Raises:
            ValueError: If the URL cannot be parsed (e.g. an invalid port).
        """
        from urllib.parse import unquote, urlparse
        parsed = urlparse(self.snapshot_url)
        # urlparse leaves userinfo percent-encoded ("p%40ss"); the auth
        # handlers need the real characters ("p@ss").
        username = unquote(parsed.username) if parsed.username else None
        password = unquote(parsed.password) if parsed.password else None
        host = parsed.hostname
        if not host or (username is None and password is None):
            return self.snapshot_url, username, password
        if ':' in host:
            # .hostname strips the brackets of an IPv6 literal; restore them
            host = f"[{host}]"
        port = parsed.port
        netloc = f"{host}:{port}" if port else host
        return parsed._replace(netloc=netloc).geturl(), username, password

    def _fetch_snapshot(self) -> Optional[np.ndarray]:
        """Fetch and decode one snapshot from ``snapshot_url``.

        With embedded credentials, Basic auth is tried first and Digest auth
        on a 401 response.

        Returns:
            The decoded image, or None on a non-200 status, an oversized
            body, a decode failure or any request error (all are logged).
        """
        headers = {
            'User-Agent': 'Python-Detector-Worker/1.0',
            'Accept': 'image/jpeg, image/png, image/*'
        }
        # SECURITY NOTE(review): verify=False disables TLS certificate
        # verification. Left unchanged; confirm this is intentional.
        request_options = {'timeout': 15, 'headers': headers, 'stream': True, 'verify': False}
        response = None
        try:
            clean_url, username, password = self._split_credentials()
            if username and password:
                from requests.auth import HTTPBasicAuth, HTTPDigestAuth
                # Try Basic Auth first
                response = requests.get(clean_url, auth=HTTPBasicAuth(username, password), **request_options)
                # If Basic Auth fails, try Digest Auth
                if response.status_code == 401:
                    response.close()  # release the connection of the rejected attempt
                    response = requests.get(clean_url, auth=HTTPDigestAuth(username, password), **request_options)
            else:
                response = requests.get(self.snapshot_url, **request_options)

            if response.status_code != 200:
                logger.warning(f"HTTP {response.status_code} from {self.camera_id}")
                return None

            # Check the advertised size first (header may be absent or malformed)
            try:
                content_length = int(response.headers.get('content-length', 0))
            except (TypeError, ValueError):
                content_length = 0
            if content_length > self.max_file_size:
                logger.warning(f"Snapshot too large for camera {self.camera_id}: {content_length} bytes")
                return None

            # Enforce the limit on the real body too, since Content-Length
            # can be missing or wrong.
            content = bytearray()
            for chunk in response.iter_content(chunk_size=64 * 1024):
                content += chunk
                if len(content) > self.max_file_size:
                    logger.warning(f"Snapshot too large for camera {self.camera_id}: {len(content)} bytes")
                    return None

            # Convert to numpy array and decode
            image_array = np.frombuffer(content, np.uint8)
            frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
            if frame is None:
                logger.error(f"Failed to decode snapshot for camera {self.camera_id}")
                return None
            logger.debug(f"Fetched snapshot for camera {self.camera_id}: {frame.shape[1]}x{frame.shape[0]}")
            return frame
        except requests.RequestException as e:
            logger.error(f"Request error fetching snapshot for {self.camera_id}: {e}")
            return None
        except Exception as e:
            logger.error(f"Error decoding snapshot for {self.camera_id}: {e}")
            return None
        finally:
            # stream=True keeps the connection checked out until the response
            # is fully read or closed; close it on every exit path.
            if response is not None:
                response.close()

    def fetch_single_snapshot(self) -> Optional[np.ndarray]:
        """
        Fetch a single high-quality snapshot on demand for pipeline processing.
        This method is for one-time fetch from HTTP URL, not continuous streaming.

        At least one attempt is always made, even when ``max_retries`` is 0
        or the -1 "unlimited" value used by the polling loop.

        Returns:
            High quality 2K snapshot frame or None if failed
        """
        # Never write embedded credentials to the log
        try:
            safe_url = self._split_credentials()[0]
        except ValueError:
            safe_url = "<unparseable URL>"
        logger.info(f"[SNAPSHOT] Fetching snapshot for {self.camera_id} from {safe_url}")
        attempts = max(1, self.max_retries)
        # Try to fetch snapshot with retries
        for attempt in range(attempts):
            frame = self._fetch_snapshot()
            if frame is not None:
                logger.info(f"[SNAPSHOT] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot for {self.camera_id}")
                return frame
            if attempt < attempts - 1:
                logger.warning(f"[SNAPSHOT] Attempt {attempt + 1}/{attempts} failed for {self.camera_id}, retrying...")
                time.sleep(0.5)
        logger.error(f"[SNAPSHOT] Failed to fetch snapshot for {self.camera_id} after {attempts} attempts")
        return None

    def _resize_maintain_aspect(self, frame: np.ndarray, target_width: int, target_height: int) -> np.ndarray:
        """Resize ``frame`` to fit the target size, keeping its aspect ratio.

        The image is scaled to fit inside ``target_width`` x ``target_height``
        and then padded with black borders, centered, up to the target size.
        """
        h, w = frame.shape[:2]
        aspect = w / h
        target_aspect = target_width / target_height
        if aspect > target_aspect:
            # Image is wider
            new_width = target_width
            # max(1, ...): an extreme aspect ratio must not yield a 0-pixel
            # dimension, which cv2.resize rejects
            new_height = max(1, int(target_width / aspect))
        else:
            # Image is taller
            new_height = target_height
            new_width = max(1, int(target_height * aspect))
        # Use INTER_LANCZOS4 for high quality downsampling
        resized = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4)
        # Pad to target size if needed
        if new_width < target_width or new_height < target_height:
            top = (target_height - new_height) // 2
            bottom = target_height - new_height - top
            left = (target_width - new_width) // 2
            right = target_width - new_width - left
            resized = cv2.copyMakeBorder(resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=[0, 0, 0])
        return resized