fix: can read, track, and detect frame
All checks were successful
Build Worker Base and Application Images / check-base-changes (push) Successful in 8s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 3m5s
Build Worker Base and Application Images / deploy-stack (push) Successful in 11s

This commit is contained in:
ziesorx 2025-09-26 11:24:48 +07:00
parent f5c6da8014
commit 83aaf95f59

View file

@ -10,7 +10,7 @@ import requests
import numpy as np
import os
import subprocess
import fcntl
# import fcntl # No longer needed with atomic file operations
from typing import Optional, Callable
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
@ -24,6 +24,8 @@ logger = logging.getLogger(__name__)
# Suppress noisy watchdog debug logs
logging.getLogger('watchdog.observers.inotify_buffer').setLevel(logging.CRITICAL)
logging.getLogger('watchdog.observers.fsevents').setLevel(logging.CRITICAL)
logging.getLogger('fsevents').setLevel(logging.CRITICAL)
class FrameFileHandler(FileSystemEventHandler):
@ -90,63 +92,68 @@ class FFmpegRTSPReader:
logger.info(f"Stopped FFmpeg reader for camera {self.camera_id}")
def _start_ffmpeg_process(self):
"""Start FFmpeg subprocess with CUDA hardware acceleration writing to temp file."""
# Create temp file path for this camera
self.temp_file = f"/tmp/frame/camera_{self.camera_id.replace(' ', '_')}.raw"
os.makedirs("/tmp/frame", exist_ok=True)
"""Start FFmpeg subprocess writing timestamped frames for atomic reads."""
# Create temp file paths for this camera
self.frame_dir = "/tmp/frame"
os.makedirs(self.frame_dir, exist_ok=True)
# Use PPM format with single file (will use file locking for concurrency)
self.temp_file = f"/tmp/frame/camera_{self.camera_id.replace(' ', '_')}.ppm"
# Use strftime pattern - FFmpeg writes each frame with unique timestamp
# This ensures each file is complete when written
camera_id_safe = self.camera_id.replace(' ', '_')
self.frame_prefix = f"camera_{camera_id_safe}"
# Using strftime pattern with microseconds for unique filenames
self.frame_pattern = f"{self.frame_dir}/{self.frame_prefix}_%Y%m%d_%H%M%S_%f.ppm"
cmd = [
'ffmpeg',
# DO NOT REMOVE
'-hwaccel', 'cuda',
'-hwaccel_device', '0',
'-rtsp_transport', 'tcp',
'-i', self.rtsp_url,
'-f', 'image2',
'-update', '1', # Works with image2 format
'-strftime', '1', # Enable strftime pattern expansion
'-pix_fmt', 'rgb24', # PPM uses RGB not BGR
'-an', # No audio
'-y', # Overwrite output file
self.temp_file
self.frame_pattern # Write timestamped frames
]
try:
# Log the FFmpeg command for debugging
logger.info(f"Starting FFmpeg for camera {self.camera_id} with command: {' '.join(cmd)}")
# Start FFmpeg detached - we don't need to communicate with it
self.process = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
logger.info(f"Started FFmpeg process PID {self.process.pid} for camera {self.camera_id} -> {self.temp_file}")
logger.info(f"Started FFmpeg process PID {self.process.pid} for camera {self.camera_id} -> {self.frame_pattern}")
return True
except Exception as e:
logger.error(f"Failed to start FFmpeg for camera {self.camera_id}: {e}")
return False
def _setup_file_watcher(self):
"""Setup file system watcher for temp file."""
if not os.path.exists(self.temp_file):
return
# Setup file watcher
handler = FrameFileHandler(self._on_file_changed)
"""Setup file system watcher for frame directory."""
# Setup file watcher for the frame directory
handler = FrameFileHandler(lambda: self._on_file_changed())
self.observer = Observer()
self.observer.schedule(handler, os.path.dirname(self.temp_file), recursive=False)
self.observer.schedule(handler, self.frame_dir, recursive=False)
self.observer.start()
logger.info(f"Started file watcher for {self.temp_file}")
logger.info(f"Started file watcher for {self.frame_dir} with pattern {self.frame_prefix}*.ppm")
def _on_file_changed(self):
"""Called when temp file is modified."""
if os.path.basename(self.temp_file) in str(self.temp_file):
self.frame_ready_event.set()
"""Called when a new frame file is created."""
# Signal that a new frame might be available
self.frame_ready_event.set()
def _read_frames(self):
"""Reactively read frames when file changes."""
frame_count = 0
last_log_time = time.time()
bytes_per_frame = self.width * self.height * 3
# Remove unused variable: bytes_per_frame = self.width * self.height * 3
restart_check_interval = 10 # Check FFmpeg status every 10 seconds
while not self.stop_event.is_set():
@ -160,14 +167,21 @@ class FFmpegRTSPReader:
time.sleep(5.0)
continue
# Wait for temp file to be created
# Wait for FFmpeg to start writing frame files
wait_count = 0
while not os.path.exists(self.temp_file) and wait_count < 30:
while wait_count < 30:
# Check if any frame files exist
import glob
frame_files = glob.glob(f"{self.frame_dir}/{self.frame_prefix}*.ppm")
if frame_files:
logger.info(f"Found {len(frame_files)} initial frame files for {self.camera_id}")
break
time.sleep(1.0)
wait_count += 1
if not os.path.exists(self.temp_file):
logger.error(f"Temp file not created after 30s for {self.camera_id}")
if wait_count >= 30:
logger.error(f"No frame files created after 30s for {self.camera_id}")
logger.error(f"Expected pattern: {self.frame_dir}/{self.frame_prefix}*.ppm")
continue
# Setup file watcher
@ -177,50 +191,44 @@ class FFmpegRTSPReader:
if self.frame_ready_event.wait(timeout=restart_check_interval):
self.frame_ready_event.clear()
# Read PPM frame with persistent lock attempts until new inotify
# Read latest complete frame file
try:
if os.path.exists(self.temp_file):
# Keep trying to acquire lock until new inotify event or success
max_attempts = 50 # ~500ms worth of attempts
for attempt in range(max_attempts):
# Check if new inotify event arrived (cancel current attempt)
if self.frame_ready_event.is_set():
break
import glob
# Find all frame files for this camera
frame_files = glob.glob(f"{self.frame_dir}/{self.frame_prefix}*.ppm")
try:
with open(self.temp_file, 'rb') as f:
# Try to acquire shared lock (non-blocking)
fcntl.flock(f.fileno(), fcntl.LOCK_SH | fcntl.LOCK_NB)
if frame_files:
# Sort by filename (which includes timestamp) and get the latest
frame_files.sort()
latest_frame = frame_files[-1]
# Success! File is locked, safe to read
frame = cv2.imread(self.temp_file)
# Read the latest frame (it's complete since FFmpeg wrote it atomically)
frame = cv2.imread(latest_frame)
if frame is not None and frame.shape == (self.height, self.width, 3):
# Call frame callback directly
if self.frame_callback:
self.frame_callback(self.camera_id, frame)
if frame is not None and frame.shape == (self.height, self.width, 3):
# Call frame callback directly
if self.frame_callback:
self.frame_callback(self.camera_id, frame)
frame_count += 1
frame_count += 1
# Log progress
current_time = time.time()
if current_time - last_log_time >= 30:
logger.info(f"Camera {self.camera_id}: {frame_count} PPM frames processed with persistent locking")
last_log_time = current_time
# Invalid frame - just skip, no logging needed
# Log progress
current_time = time.time()
if current_time - last_log_time >= 30:
logger.info(f"Camera {self.camera_id}: {frame_count} frames processed")
last_log_time = current_time
# Successfully processed frame
break
# Clean up old frame files to prevent disk filling
# Keep only the latest 5 frames
if len(frame_files) > 5:
for old_file in frame_files[:-5]:
try:
os.remove(old_file)
except:
pass
except (OSError, IOError):
# File is still locked, wait a bit and try again
time.sleep(0.01) # 10ms wait between attempts
continue
# If we get here, exhausted attempts or file not ready - just continue
except (IOError, OSError):
# File errors are routine, just continue
except Exception as e:
logger.debug(f"Camera {self.camera_id}: Error reading frames: {e}")
pass
except Exception as e:
@ -233,10 +241,16 @@ class FFmpegRTSPReader:
self.observer.join()
if self.process:
self.process.terminate()
# Clean up temp file
# Clean up all frame files for this camera
try:
if hasattr(self, 'temp_file') and os.path.exists(self.temp_file):
os.remove(self.temp_file)
if hasattr(self, 'frame_prefix') and hasattr(self, 'frame_dir'):
import glob
frame_files = glob.glob(f"{self.frame_dir}/{self.frame_prefix}*.ppm")
for frame_file in frame_files:
try:
os.remove(frame_file)
except:
pass
except:
pass
logger.info(f"Reactive FFmpeg reader ended for camera {self.camera_id}")