refactor: enhance FFmpegRTSPReader to implement persistent file locking for PPM frame reading
All checks were successful
Build Worker Base and Application Images / check-base-changes (push) Successful in 9s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 3m8s
Build Worker Base and Application Images / deploy-stack (push) Successful in 14s
All checks were successful
Build Worker Base and Application Images / check-base-changes (push) Successful in 9s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 3m8s
Build Worker Base and Application Images / deploy-stack (push) Successful in 14s
This commit is contained in:
parent
fe0da18d0f
commit
a12e3efa12
1 changed files with 41 additions and 20 deletions
|
@ -10,6 +10,7 @@ import requests
|
|||
import numpy as np
|
||||
import os
|
||||
import subprocess
|
||||
import fcntl
|
||||
from typing import Optional, Callable
|
||||
from watchdog.observers import Observer
|
||||
from watchdog.events import FileSystemEventHandler
|
||||
|
@ -94,7 +95,7 @@ class FFmpegRTSPReader:
|
|||
self.temp_file = f"/tmp/claude/camera_{self.camera_id.replace(' ', '_')}.raw"
|
||||
os.makedirs("/tmp/claude", exist_ok=True)
|
||||
|
||||
# Use PPM format - uncompressed with header, supports -update 1
|
||||
# Use PPM format with single file (will use file locking for concurrency)
|
||||
self.temp_file = f"/tmp/claude/camera_{self.camera_id.replace(' ', '_')}.ppm"
|
||||
|
||||
cmd = [
|
||||
|
@ -176,31 +177,51 @@ class FFmpegRTSPReader:
|
|||
if self.frame_ready_event.wait(timeout=restart_check_interval):
|
||||
self.frame_ready_event.clear()
|
||||
|
||||
# Read PPM frame (uncompressed with header)
|
||||
# Read PPM frame with persistent lock attempts until new inotify
|
||||
try:
|
||||
if os.path.exists(self.temp_file):
|
||||
# Read PPM with OpenCV (handles RGB->BGR conversion automatically)
|
||||
frame = cv2.imread(self.temp_file)
|
||||
# Keep trying to acquire lock until new inotify event or success
|
||||
max_attempts = 50 # ~500ms worth of attempts
|
||||
for attempt in range(max_attempts):
|
||||
# Check if new inotify event arrived (cancel current attempt)
|
||||
if self.frame_ready_event.is_set():
|
||||
break
|
||||
|
||||
if frame is not None and frame.shape == (self.height, self.width, 3):
|
||||
# Call frame callback directly
|
||||
if self.frame_callback:
|
||||
self.frame_callback(self.camera_id, frame)
|
||||
try:
|
||||
with open(self.temp_file, 'rb') as f:
|
||||
# Try to acquire shared lock (non-blocking)
|
||||
fcntl.flock(f.fileno(), fcntl.LOCK_SH | fcntl.LOCK_NB)
|
||||
|
||||
frame_count += 1
|
||||
# Success! File is locked, safe to read
|
||||
frame = cv2.imread(self.temp_file)
|
||||
|
||||
# Log progress
|
||||
current_time = time.time()
|
||||
if current_time - last_log_time >= 30:
|
||||
logger.info(f"Camera {self.camera_id}: {frame_count} PPM frames processed reactively")
|
||||
last_log_time = current_time
|
||||
else:
|
||||
logger.debug(f"Camera {self.camera_id}: Invalid PPM frame")
|
||||
else:
|
||||
logger.debug(f"Camera {self.camera_id}: PPM file not found yet")
|
||||
if frame is not None and frame.shape == (self.height, self.width, 3):
|
||||
# Call frame callback directly
|
||||
if self.frame_callback:
|
||||
self.frame_callback(self.camera_id, frame)
|
||||
|
||||
except (IOError, OSError) as e:
|
||||
logger.debug(f"Camera {self.camera_id}: File read error: {e}")
|
||||
frame_count += 1
|
||||
|
||||
# Log progress
|
||||
current_time = time.time()
|
||||
if current_time - last_log_time >= 30:
|
||||
logger.info(f"Camera {self.camera_id}: {frame_count} PPM frames processed with persistent locking")
|
||||
last_log_time = current_time
|
||||
# Invalid frame - just skip, no logging needed
|
||||
|
||||
# Successfully processed frame
|
||||
break
|
||||
|
||||
except (OSError, IOError):
|
||||
# File is still locked, wait a bit and try again
|
||||
time.sleep(0.01) # 10ms wait between attempts
|
||||
continue
|
||||
|
||||
# If we get here, exhausted attempts or file not ready - just continue
|
||||
|
||||
except (IOError, OSError):
|
||||
# File errors are routine, just continue
|
||||
pass
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Camera {self.camera_id}: Error in reactive frame reading: {e}")
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue