All checks were successful
Build Worker Base and Application Images / check-base-changes (push) Successful in 7s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 2m55s
Build Worker Base and Application Images / deploy-stack (push) Successful in 12s
317 lines
11 KiB
Python
317 lines
11 KiB
Python
"""
|
|
Frame buffering and caching system optimized for different stream formats.
|
|
Supports 1280x720 RTSP streams and 2560x1440 HTTP snapshots.
|
|
"""
|
|
import threading
|
|
import time
|
|
import cv2
|
|
import logging
|
|
import numpy as np
|
|
from typing import Optional, Dict, Any, Tuple
|
|
from collections import defaultdict
|
|
|
|
|
|
# Module-level logger used by the buffer classes in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FrameBuffer:
    """Thread-safe store holding the most recent frame for each camera.

    Every camera ID maps to a single entry: a private copy of the latest
    frame plus metadata (timestamp, shape, dtype, size in MB). Entries older
    than ``max_age_seconds`` count as expired; they are removed lazily by
    ``get_frame``, ``get_frame_info`` and ``get_camera_list``. All access to
    the internal dict is guarded by a re-entrant lock.
    """

    def __init__(self, max_age_seconds: int = 5):
        """Create an empty buffer.

        Args:
            max_age_seconds: Age after which a stored frame is considered
                expired.
        """
        self.max_age_seconds = max_age_seconds
        self._frames: Dict[str, Dict[str, Any]] = {}
        self._lock = threading.RLock()

    def put_frame(self, camera_id: str, frame: np.ndarray):
        """Store a copy of ``frame`` as the latest frame for ``camera_id``.

        Frames that fail ``_validate_frame`` are dropped with a warning and
        any previously stored frame for the camera is left in place.
        """
        # Validation and the copy touch no shared state, so they are done
        # before taking the lock to keep the critical section short.
        if not self._validate_frame(frame):
            logger.warning(f"Frame validation failed for camera {camera_id}")
            return

        entry = {
            'frame': frame.copy(),
            'timestamp': 0.0,  # set under the lock, see below
            'shape': frame.shape,
            'dtype': str(frame.dtype),
            'size_mb': frame.nbytes / (1024 * 1024),
        }

        with self._lock:
            # Stamp inside the lock so timestamp order matches store order
            # when several threads publish frames for the same camera.
            entry['timestamp'] = time.time()
            self._frames[camera_id] = entry

    def get_frame(self, camera_id: str) -> Optional[np.ndarray]:
        """Return a copy of the latest frame for ``camera_id``.

        Returns ``None`` when no frame is stored or when the stored frame is
        older than ``max_age_seconds`` (the expired entry is deleted).
        """
        with self._lock:
            frame_data = self._frames.get(camera_id)
            if frame_data is None:
                return None

            age = time.time() - frame_data['timestamp']
            if age > self.max_age_seconds:
                logger.debug(f"Frame for camera {camera_id} is {age:.1f}s old, discarding")
                del self._frames[camera_id]
                return None

            # Hand out a copy so callers cannot mutate the buffered frame.
            return frame_data['frame'].copy()

    def get_frame_info(self, camera_id: str) -> Optional[Dict[str, Any]]:
        """Return metadata for the latest frame without copying pixel data.

        Returns ``None`` when no frame is stored or the stored frame has
        expired (the expired entry is deleted). Otherwise returns a dict with
        the keys ``timestamp``, ``age``, ``shape``, ``dtype`` and ``size_mb``.
        """
        with self._lock:
            frame_data = self._frames.get(camera_id)
            if frame_data is None:
                return None

            age = time.time() - frame_data['timestamp']
            if age > self.max_age_seconds:
                del self._frames[camera_id]
                return None

            return {
                'timestamp': frame_data['timestamp'],
                'age': age,
                'shape': frame_data['shape'],
                'dtype': frame_data['dtype'],
                'size_mb': frame_data.get('size_mb', 0),
            }

    def has_frame(self, camera_id: str) -> bool:
        """Return True if a non-expired frame exists for ``camera_id``."""
        return self.get_frame_info(camera_id) is not None

    def clear_camera(self, camera_id: str):
        """Remove the stored frame for ``camera_id``, if there is one."""
        with self._lock:
            if camera_id in self._frames:
                del self._frames[camera_id]
                logger.debug(f"Cleared frames for camera {camera_id}")

    def clear_all(self):
        """Remove the stored frames of every camera."""
        with self._lock:
            count = len(self._frames)
            self._frames.clear()
            logger.debug(f"Cleared all frames ({count} cameras)")

    def get_camera_list(self) -> list:
        """Return the IDs of cameras with a non-expired frame.

        Expired entries found during the scan are deleted.
        """
        with self._lock:
            current_time = time.time()
            valid_cameras = []
            expired_cameras = []

            for camera_id, frame_data in self._frames.items():
                age = current_time - frame_data['timestamp']
                if age <= self.max_age_seconds:
                    valid_cameras.append(camera_id)
                else:
                    expired_cameras.append(camera_id)

            # Delete after the scan: the dict must not change size while it
            # is being iterated.
            for camera_id in expired_cameras:
                del self._frames[camera_id]

            return valid_cameras

    def get_stats(self) -> Dict[str, Any]:
        """Return buffer statistics without removing anything.

        The result holds ``total_cameras``, ``valid_cameras``,
        ``expired_cameras``, ``total_memory_mb`` (expired entries included,
        since they still occupy memory) and a per-camera ``cameras`` dict
        with ``age``, ``valid``, ``shape``, ``dtype`` and ``size_mb``.
        """
        with self._lock:
            current_time = time.time()
            stats = {
                'total_cameras': len(self._frames),
                'valid_cameras': 0,
                'expired_cameras': 0,
                'total_memory_mb': 0,
                'cameras': {},
            }

            for camera_id, frame_data in self._frames.items():
                age = current_time - frame_data['timestamp']
                size_mb = frame_data.get('size_mb', 0)
                is_valid = age <= self.max_age_seconds

                if is_valid:
                    stats['valid_cameras'] += 1
                else:
                    stats['expired_cameras'] += 1

                stats['total_memory_mb'] += size_mb
                stats['cameras'][camera_id] = {
                    'age': age,
                    'valid': is_valid,
                    'shape': frame_data['shape'],
                    'dtype': frame_data['dtype'],
                    'size_mb': size_mb,
                }

            return stats

    def _validate_frame(self, frame: np.ndarray) -> bool:
        """Return True if ``frame`` is acceptable for storage.

        Rejected: anything that is not a NumPy array, empty arrays, arrays
        with fewer than two dimensions (no height/width to read), frames
        larger than 50 MB, and frames narrower or shorter than 100 pixels.
        """
        # The ndim check keeps the (h, w) unpacking below from raising on
        # 0-D/1-D arrays; the isinstance check also covers None.
        if not isinstance(frame, np.ndarray) or frame.size == 0 or frame.ndim < 2:
            return False

        h, w = frame.shape[:2]
        size_mb = frame.nbytes / (1024 * 1024)

        # Basic size validation - reject extremely large frames regardless of type
        max_size_mb = 50  # Generous limit for any frame type
        if size_mb > max_size_mb:
            logger.warning(f"Frame too large: {size_mb:.2f}MB (max {max_size_mb}MB) for {w}x{h}")
            return False

        # Basic dimension validation
        if w < 100 or h < 100:
            logger.warning(f"Frame too small: {w}x{h}")
            return False

        return True
|
|
|
|
|
|
class CacheBuffer:
    """Frame cache built on a ``FrameBuffer`` with cached crops and JPEG output.

    Full frames live in ``self.frame_buffer``. Cropped regions are cached in
    a separate dict keyed by ``(camera_id, str(crop_coords))``. Keying on the
    camera ID as its own tuple element (rather than a string prefix) means
    invalidating one camera can never touch another camera whose ID merely
    starts with the same characters, e.g. ``cam`` vs ``cam_1``.
    """

    # Crop-cache bound: once more than _MAX_CROP_ENTRIES crops are cached,
    # the _CROP_EVICT_COUNT oldest are dropped before the next insert.
    _MAX_CROP_ENTRIES = 100
    _CROP_EVICT_COUNT = 50

    def __init__(self, max_age_seconds: int = 10):
        """Create an empty cache.

        Args:
            max_age_seconds: Maximum age for both buffered frames and cached
                crops.
        """
        self.frame_buffer = FrameBuffer(max_age_seconds)
        self._crop_cache: Dict[Tuple[str, str], Dict[str, Any]] = {}
        self._cache_lock = threading.RLock()
        self.jpeg_quality = 95  # High quality for all frames

    @staticmethod
    def _crop_key(camera_id: str, crop_coords: Any) -> Tuple[str, str]:
        """Build the crop-cache key for a camera and a crop request.

        The coordinates are stringified so any value a caller passes (tuple,
        list, malformed input) yields a usable, hashable key.
        """
        return (camera_id, str(crop_coords))

    def _drop_crops(self, camera_id: str) -> None:
        """Delete every cached crop that belongs to exactly ``camera_id``."""
        with self._cache_lock:
            stale_keys = [key for key in self._crop_cache if key[0] == camera_id]
            for key in stale_keys:
                del self._crop_cache[key]

    def put_frame(self, camera_id: str, frame: np.ndarray):
        """Store a frame and drop the camera's cached crops.

        The crops are dropped because they were cut from the previous frame.
        """
        self.frame_buffer.put_frame(camera_id, frame)
        self._drop_crops(camera_id)

    def get_frame(self, camera_id: str, crop_coords: Optional[Tuple[int, int, int, int]] = None) -> Optional[np.ndarray]:
        """Return the latest frame, optionally cropped to ``crop_coords``.

        Args:
            camera_id: Camera whose frame is requested.
            crop_coords: ``(x1, y1, x2, y2)`` in pixels; values are clamped
                to the frame bounds. ``None`` returns the full frame.

        Returns:
            The (cropped) frame, or ``None`` if the camera has no valid
            frame. If cropping fails, the error is logged and the full frame
            is returned instead.

        Note:
            A cached crop is aged from the moment it was cropped, not from
            the capture time of the frame it was cut from.
        """
        if crop_coords is None:
            return self.frame_buffer.get_frame(camera_id)

        # Check crop cache first
        crop_key = self._crop_key(camera_id, crop_coords)
        with self._cache_lock:
            cache_entry = self._crop_cache.get(crop_key)
            if cache_entry is not None:
                age = time.time() - cache_entry['timestamp']
                if age <= self.frame_buffer.max_age_seconds:
                    return cache_entry['cropped_frame'].copy()
                del self._crop_cache[crop_key]

        # Cache miss: crop the current frame
        original_frame = self.frame_buffer.get_frame(camera_id)
        if original_frame is None:
            return None

        try:
            x1, y1, x2, y2 = crop_coords

            # Ensure coordinates are within frame bounds and x2 >= x1, y2 >= y1
            h, w = original_frame.shape[:2]
            x1 = max(0, min(x1, w))
            y1 = max(0, min(y1, h))
            x2 = max(x1, min(x2, w))
            y2 = max(y1, min(y2, h))

            cropped_frame = original_frame[y1:y2, x1:x2]

            with self._cache_lock:
                # Limit cache size to prevent memory issues
                if len(self._crop_cache) > self._MAX_CROP_ENTRIES:
                    oldest_keys = sorted(
                        self._crop_cache,
                        key=lambda k: self._crop_cache[k]['timestamp'],
                    )[:self._CROP_EVICT_COUNT]
                    for key in oldest_keys:
                        del self._crop_cache[key]

                self._crop_cache[crop_key] = {
                    'cropped_frame': cropped_frame.copy(),
                    'timestamp': time.time(),
                    'crop_coords': (x1, y1, x2, y2),
                }

            return cropped_frame

        except Exception as e:
            # Deliberate best-effort: a bad crop request degrades to the
            # full frame rather than failing the caller.
            logger.error(f"Error cropping frame for camera {camera_id}: {e}")
            return original_frame

    def get_frame_as_jpeg(self, camera_id: str, crop_coords: Optional[Tuple[int, int, int, int]] = None,
                          quality: Optional[int] = None) -> Optional[bytes]:
        """Return the (optionally cropped) frame encoded as JPEG bytes.

        Args:
            camera_id: Camera whose frame is requested.
            crop_coords: Optional crop rectangle, passed to ``get_frame``.
            quality: JPEG quality; defaults to ``self.jpeg_quality``.

        Returns:
            The JPEG bytes, or ``None`` if no frame is available or encoding
            fails (failures are logged).
        """
        frame = self.get_frame(camera_id, crop_coords)
        if frame is None:
            return None

        try:
            # Use specified quality or default
            if quality is None:
                quality = self.jpeg_quality

            encode_params = [cv2.IMWRITE_JPEG_QUALITY, quality]
            success, encoded_img = cv2.imencode('.jpg', frame, encode_params)
            if not success:
                return None

            jpeg_bytes = encoded_img.tobytes()
            logger.debug(f"Encoded JPEG for camera {camera_id}: quality={quality}, size={len(jpeg_bytes)} bytes")
            return jpeg_bytes

        except Exception as e:
            logger.error(f"Error encoding frame as JPEG for camera {camera_id}: {e}")
            return None

    def has_frame(self, camera_id: str) -> bool:
        """Return True if a non-expired frame exists for ``camera_id``."""
        return self.frame_buffer.has_frame(camera_id)

    def clear_camera(self, camera_id: str):
        """Remove the buffered frame and all cached crops for ``camera_id``."""
        self.frame_buffer.clear_camera(camera_id)
        self._drop_crops(camera_id)

    def clear_all(self):
        """Remove every buffered frame and every cached crop."""
        self.frame_buffer.clear_all()
        with self._cache_lock:
            self._crop_cache.clear()

    def _crop_cache_stats(self) -> Dict[str, Any]:
        """Summarise the crop cache: entry count, distinct cameras, memory in MB."""
        with self._cache_lock:
            return {
                'crop_cache_entries': len(self._crop_cache),
                'crop_cache_cameras': len({key[0] for key in self._crop_cache}),
                'crop_cache_memory_mb': sum(
                    entry['cropped_frame'].nbytes / (1024 * 1024)
                    for entry in self._crop_cache.values()
                ),
            }

    def get_stats(self) -> Dict[str, Any]:
        """Return combined statistics for the frame buffer and the crop cache.

        The result has the keys ``buffer`` (the ``FrameBuffer`` statistics),
        ``cache`` (crop-cache statistics) and ``total_memory_mb`` (the sum of
        both memory figures).
        """
        buffer_stats = self.frame_buffer.get_stats()
        cache_stats = self._crop_cache_stats()

        return {
            'buffer': buffer_stats,
            'cache': cache_stats,
            'total_memory_mb': buffer_stats.get('total_memory_mb', 0) + cache_stats.get('crop_cache_memory_mb', 0),
        }
|
|
|
|
|
|
# Module-level shared instances for application use.
# Note: shared_cache_buffer wraps its own internal FrameBuffer, so frames
# stored in one of these objects are not visible through the other.
shared_frame_buffer = FrameBuffer(max_age_seconds=5)
shared_cache_buffer = CacheBuffer(max_age_seconds=10)
|
|
|
|
|