Merge branch 'dev'
Some checks failed
Build Worker Base and Application Images / check-base-changes (push) Successful in 9s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 3m8s
Build Worker Base and Application Images / deploy-stack (push) Failing after 38s
Some checks failed
Build Worker Base and Application Images / check-base-changes (push) Successful in 9s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 3m8s
Build Worker Base and Application Images / deploy-stack (push) Failing after 38s
This commit is contained in:
commit
d43a971d5b
1 changed files with 74 additions and 39 deletions
|
@ -10,6 +10,7 @@ import requests
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import os
|
import os
|
||||||
import subprocess
|
import subprocess
|
||||||
|
# import fcntl # No longer needed with atomic file operations
|
||||||
from typing import Optional, Callable
|
from typing import Optional, Callable
|
||||||
from watchdog.observers import Observer
|
from watchdog.observers import Observer
|
||||||
from watchdog.events import FileSystemEventHandler
|
from watchdog.events import FileSystemEventHandler
|
||||||
|
@ -23,6 +24,8 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# Suppress noisy watchdog debug logs
|
# Suppress noisy watchdog debug logs
|
||||||
logging.getLogger('watchdog.observers.inotify_buffer').setLevel(logging.CRITICAL)
|
logging.getLogger('watchdog.observers.inotify_buffer').setLevel(logging.CRITICAL)
|
||||||
|
logging.getLogger('watchdog.observers.fsevents').setLevel(logging.CRITICAL)
|
||||||
|
logging.getLogger('fsevents').setLevel(logging.CRITICAL)
|
||||||
|
|
||||||
|
|
||||||
class FrameFileHandler(FileSystemEventHandler):
|
class FrameFileHandler(FileSystemEventHandler):
|
||||||
|
@ -89,63 +92,68 @@ class FFmpegRTSPReader:
|
||||||
logger.info(f"Stopped FFmpeg reader for camera {self.camera_id}")
|
logger.info(f"Stopped FFmpeg reader for camera {self.camera_id}")
|
||||||
|
|
||||||
def _start_ffmpeg_process(self):
|
def _start_ffmpeg_process(self):
|
||||||
"""Start FFmpeg subprocess with CUDA hardware acceleration writing to temp file."""
|
"""Start FFmpeg subprocess writing timestamped frames for atomic reads."""
|
||||||
# Create temp file path for this camera
|
# Create temp file paths for this camera
|
||||||
self.temp_file = f"/tmp/claude/camera_{self.camera_id.replace(' ', '_')}.raw"
|
self.frame_dir = "/tmp/frame"
|
||||||
os.makedirs("/tmp/claude", exist_ok=True)
|
os.makedirs(self.frame_dir, exist_ok=True)
|
||||||
|
|
||||||
# Use PPM format - uncompressed with header, supports -update 1
|
# Use strftime pattern - FFmpeg writes each frame with unique timestamp
|
||||||
self.temp_file = f"/tmp/claude/camera_{self.camera_id.replace(' ', '_')}.ppm"
|
# This ensures each file is complete when written
|
||||||
|
camera_id_safe = self.camera_id.replace(' ', '_')
|
||||||
|
self.frame_prefix = f"camera_{camera_id_safe}"
|
||||||
|
# Using strftime pattern with microseconds for unique filenames
|
||||||
|
self.frame_pattern = f"{self.frame_dir}/{self.frame_prefix}_%Y%m%d_%H%M%S_%f.ppm"
|
||||||
|
|
||||||
cmd = [
|
cmd = [
|
||||||
'ffmpeg',
|
'ffmpeg',
|
||||||
|
# DO NOT REMOVE
|
||||||
'-hwaccel', 'cuda',
|
'-hwaccel', 'cuda',
|
||||||
'-hwaccel_device', '0',
|
'-hwaccel_device', '0',
|
||||||
'-rtsp_transport', 'tcp',
|
'-rtsp_transport', 'tcp',
|
||||||
'-i', self.rtsp_url,
|
'-i', self.rtsp_url,
|
||||||
'-f', 'image2',
|
'-f', 'image2',
|
||||||
'-update', '1', # Works with image2 format
|
'-strftime', '1', # Enable strftime pattern expansion
|
||||||
'-pix_fmt', 'rgb24', # PPM uses RGB not BGR
|
'-pix_fmt', 'rgb24', # PPM uses RGB not BGR
|
||||||
'-an', # No audio
|
'-an', # No audio
|
||||||
'-y', # Overwrite output file
|
'-y', # Overwrite output file
|
||||||
self.temp_file
|
self.frame_pattern # Write timestamped frames
|
||||||
]
|
]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# Log the FFmpeg command for debugging
|
||||||
|
logger.info(f"Starting FFmpeg for camera {self.camera_id} with command: {' '.join(cmd)}")
|
||||||
|
|
||||||
# Start FFmpeg detached - we don't need to communicate with it
|
# Start FFmpeg detached - we don't need to communicate with it
|
||||||
self.process = subprocess.Popen(
|
self.process = subprocess.Popen(
|
||||||
cmd,
|
cmd,
|
||||||
stdout=subprocess.DEVNULL,
|
stdout=subprocess.DEVNULL,
|
||||||
stderr=subprocess.DEVNULL
|
stderr=subprocess.DEVNULL
|
||||||
)
|
)
|
||||||
logger.info(f"Started FFmpeg process PID {self.process.pid} for camera {self.camera_id} -> {self.temp_file}")
|
logger.info(f"Started FFmpeg process PID {self.process.pid} for camera {self.camera_id} -> {self.frame_pattern}")
|
||||||
return True
|
return True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to start FFmpeg for camera {self.camera_id}: {e}")
|
logger.error(f"Failed to start FFmpeg for camera {self.camera_id}: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def _setup_file_watcher(self):
|
def _setup_file_watcher(self):
|
||||||
"""Setup file system watcher for temp file."""
|
"""Setup file system watcher for frame directory."""
|
||||||
if not os.path.exists(self.temp_file):
|
# Setup file watcher for the frame directory
|
||||||
return
|
handler = FrameFileHandler(lambda: self._on_file_changed())
|
||||||
|
|
||||||
# Setup file watcher
|
|
||||||
handler = FrameFileHandler(self._on_file_changed)
|
|
||||||
self.observer = Observer()
|
self.observer = Observer()
|
||||||
self.observer.schedule(handler, os.path.dirname(self.temp_file), recursive=False)
|
self.observer.schedule(handler, self.frame_dir, recursive=False)
|
||||||
self.observer.start()
|
self.observer.start()
|
||||||
logger.info(f"Started file watcher for {self.temp_file}")
|
logger.info(f"Started file watcher for {self.frame_dir} with pattern {self.frame_prefix}*.ppm")
|
||||||
|
|
||||||
def _on_file_changed(self):
|
def _on_file_changed(self):
|
||||||
"""Called when temp file is modified."""
|
"""Called when a new frame file is created."""
|
||||||
if os.path.basename(self.temp_file) in str(self.temp_file):
|
# Signal that a new frame might be available
|
||||||
self.frame_ready_event.set()
|
self.frame_ready_event.set()
|
||||||
|
|
||||||
def _read_frames(self):
|
def _read_frames(self):
|
||||||
"""Reactively read frames when file changes."""
|
"""Reactively read frames when file changes."""
|
||||||
frame_count = 0
|
frame_count = 0
|
||||||
last_log_time = time.time()
|
last_log_time = time.time()
|
||||||
bytes_per_frame = self.width * self.height * 3
|
# Remove unused variable: bytes_per_frame = self.width * self.height * 3
|
||||||
restart_check_interval = 10 # Check FFmpeg status every 10 seconds
|
restart_check_interval = 10 # Check FFmpeg status every 10 seconds
|
||||||
|
|
||||||
while not self.stop_event.is_set():
|
while not self.stop_event.is_set():
|
||||||
|
@ -159,14 +167,21 @@ class FFmpegRTSPReader:
|
||||||
time.sleep(5.0)
|
time.sleep(5.0)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Wait for temp file to be created
|
# Wait for FFmpeg to start writing frame files
|
||||||
wait_count = 0
|
wait_count = 0
|
||||||
while not os.path.exists(self.temp_file) and wait_count < 30:
|
while wait_count < 30:
|
||||||
|
# Check if any frame files exist
|
||||||
|
import glob
|
||||||
|
frame_files = glob.glob(f"{self.frame_dir}/{self.frame_prefix}*.ppm")
|
||||||
|
if frame_files:
|
||||||
|
logger.info(f"Found {len(frame_files)} initial frame files for {self.camera_id}")
|
||||||
|
break
|
||||||
time.sleep(1.0)
|
time.sleep(1.0)
|
||||||
wait_count += 1
|
wait_count += 1
|
||||||
|
|
||||||
if not os.path.exists(self.temp_file):
|
if wait_count >= 30:
|
||||||
logger.error(f"Temp file not created after 30s for {self.camera_id}")
|
logger.error(f"No frame files created after 30s for {self.camera_id}")
|
||||||
|
logger.error(f"Expected pattern: {self.frame_dir}/{self.frame_prefix}*.ppm")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Setup file watcher
|
# Setup file watcher
|
||||||
|
@ -176,11 +191,19 @@ class FFmpegRTSPReader:
|
||||||
if self.frame_ready_event.wait(timeout=restart_check_interval):
|
if self.frame_ready_event.wait(timeout=restart_check_interval):
|
||||||
self.frame_ready_event.clear()
|
self.frame_ready_event.clear()
|
||||||
|
|
||||||
# Read PPM frame (uncompressed with header)
|
# Read latest complete frame file
|
||||||
try:
|
try:
|
||||||
if os.path.exists(self.temp_file):
|
import glob
|
||||||
# Read PPM with OpenCV (handles RGB->BGR conversion automatically)
|
# Find all frame files for this camera
|
||||||
frame = cv2.imread(self.temp_file)
|
frame_files = glob.glob(f"{self.frame_dir}/{self.frame_prefix}*.ppm")
|
||||||
|
|
||||||
|
if frame_files:
|
||||||
|
# Sort by filename (which includes timestamp) and get the latest
|
||||||
|
frame_files.sort()
|
||||||
|
latest_frame = frame_files[-1]
|
||||||
|
|
||||||
|
# Read the latest frame (it's complete since FFmpeg wrote it atomically)
|
||||||
|
frame = cv2.imread(latest_frame)
|
||||||
|
|
||||||
if frame is not None and frame.shape == (self.height, self.width, 3):
|
if frame is not None and frame.shape == (self.height, self.width, 3):
|
||||||
# Call frame callback directly
|
# Call frame callback directly
|
||||||
|
@ -192,15 +215,21 @@ class FFmpegRTSPReader:
|
||||||
# Log progress
|
# Log progress
|
||||||
current_time = time.time()
|
current_time = time.time()
|
||||||
if current_time - last_log_time >= 30:
|
if current_time - last_log_time >= 30:
|
||||||
logger.info(f"Camera {self.camera_id}: {frame_count} PPM frames processed reactively")
|
logger.info(f"Camera {self.camera_id}: {frame_count} frames processed")
|
||||||
last_log_time = current_time
|
last_log_time = current_time
|
||||||
else:
|
|
||||||
logger.debug(f"Camera {self.camera_id}: Invalid PPM frame")
|
|
||||||
else:
|
|
||||||
logger.debug(f"Camera {self.camera_id}: PPM file not found yet")
|
|
||||||
|
|
||||||
except (IOError, OSError) as e:
|
# Clean up old frame files to prevent disk filling
|
||||||
logger.debug(f"Camera {self.camera_id}: File read error: {e}")
|
# Keep only the latest 5 frames
|
||||||
|
if len(frame_files) > 5:
|
||||||
|
for old_file in frame_files[:-5]:
|
||||||
|
try:
|
||||||
|
os.remove(old_file)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Camera {self.camera_id}: Error reading frames: {e}")
|
||||||
|
pass
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Camera {self.camera_id}: Error in reactive frame reading: {e}")
|
logger.error(f"Camera {self.camera_id}: Error in reactive frame reading: {e}")
|
||||||
|
@ -212,10 +241,16 @@ class FFmpegRTSPReader:
|
||||||
self.observer.join()
|
self.observer.join()
|
||||||
if self.process:
|
if self.process:
|
||||||
self.process.terminate()
|
self.process.terminate()
|
||||||
# Clean up temp file
|
# Clean up all frame files for this camera
|
||||||
try:
|
try:
|
||||||
if hasattr(self, 'temp_file') and os.path.exists(self.temp_file):
|
if hasattr(self, 'frame_prefix') and hasattr(self, 'frame_dir'):
|
||||||
os.remove(self.temp_file)
|
import glob
|
||||||
|
frame_files = glob.glob(f"{self.frame_dir}/{self.frame_prefix}*.ppm")
|
||||||
|
for frame_file in frame_files:
|
||||||
|
try:
|
||||||
|
os.remove(frame_file)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
logger.info(f"Reactive FFmpeg reader ended for camera {self.camera_id}")
|
logger.info(f"Reactive FFmpeg reader ended for camera {self.camera_id}")
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue