430 lines
No EOL
16 KiB
Python
430 lines
No EOL
16 KiB
Python
"""
Frame buffering and caching system optimized for different stream formats.

Supports 1280x720 RTSP streams and 2560x1440 HTTP snapshots.
"""
|
|
import threading
|
|
import time
|
|
import cv2
|
|
import logging
|
|
import numpy as np
|
|
from typing import Optional, Dict, Any, Tuple
|
|
from collections import defaultdict
|
|
from enum import Enum
|
|
|
|
|
|
# Module-level logger, named after this module so handlers can filter on it.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class StreamType(Enum):
    """Kinds of camera source the buffers distinguish between."""

    # RTSP video stream: 1280x720 @ 6fps
    RTSP = "rtsp"

    # HTTP snapshot: 2560x1440 high quality
    HTTP = "http"
|
|
|
|
|
|
class FrameBuffer:
    """Thread-safe store holding the most recent frame for each camera.

    Every camera ID maps to at most one frame plus its metadata (timestamp,
    shape, dtype, stream type, size in MB). A frame older than
    ``max_age_seconds`` is treated as missing and is evicted lazily by the
    read methods. All access to the internal dictionaries is serialized by a
    re-entrant lock.
    """

    def __init__(self, max_age_seconds: int = 5):
        """Create an empty buffer.

        Args:
            max_age_seconds: Age in seconds after which a stored frame is
                considered expired.
        """
        self.max_age_seconds = max_age_seconds
        self._frames: Dict[str, Dict[str, Any]] = {}
        self._stream_types: Dict[str, StreamType] = {}
        self._lock = threading.RLock()

        # Stream-specific settings
        self.rtsp_config = {
            'width': 1280,
            'height': 720,
            'fps': 6,
            'max_size_mb': 3  # 1280x720x3 bytes = ~2.6MB
        }
        self.http_config = {
            'width': 2560,
            'height': 1440,
            # 2560x1440x3 bytes = ~10.55MB, so the limit must sit above that
            # or every nominal-size snapshot would be rejected.
            'max_size_mb': 12
        }

    @staticmethod
    def _is_image(frame: Optional[np.ndarray]) -> bool:
        """Return True when *frame* is a non-empty array with at least 2 dimensions."""
        return frame is not None and frame.size != 0 and frame.ndim >= 2

    def _evict(self, camera_id: str) -> None:
        """Drop the frame and stream type of *camera_id*; caller must hold the lock."""
        self._frames.pop(camera_id, None)
        self._stream_types.pop(camera_id, None)

    def put_frame(self, camera_id: str, frame: np.ndarray, stream_type: Optional[StreamType] = None):
        """Store a copy of *frame* as the latest frame of *camera_id*.

        Args:
            camera_id: Key under which the frame is stored.
            frame: Image array; it is copied, so the caller may reuse it.
            stream_type: Source kind. When None it is derived from the frame
                dimensions.

        A frame that fails validation is logged and ignored; whatever was
        stored for the camera before (frame and stream type) is left as is.
        """
        # Reject unusable input before anything reads frame.shape.
        if not self._is_image(frame):
            logger.warning(f"Frame validation failed for camera {camera_id} (empty frame)")
            return

        with self._lock:
            # Detect stream type if not provided
            if stream_type is None:
                stream_type = self._detect_stream_type(frame)

            # Validate frame based on stream type
            if not self._validate_frame(frame, stream_type):
                logger.warning(f"Frame validation failed for camera {camera_id} ({stream_type.value})")
                return

            size_mb = frame.nbytes / (1024 * 1024)

            # Record the stream type only together with the frame it describes.
            self._stream_types[camera_id] = stream_type
            self._frames[camera_id] = {
                'frame': frame.copy(),
                'timestamp': time.time(),
                'shape': frame.shape,
                'dtype': str(frame.dtype),
                'stream_type': stream_type.value,
                'size_mb': size_mb
            }

            logger.debug(f"Stored {stream_type.value} frame for camera {camera_id}: "
                         f"{frame.shape[1]}x{frame.shape[0]}, {size_mb:.2f}MB")

    def get_frame(self, camera_id: str) -> Optional[np.ndarray]:
        """Return a copy of the latest frame of *camera_id*.

        Returns:
            The frame copy, or None when no frame is stored or the stored
            frame is older than ``max_age_seconds`` (it is then evicted).
        """
        with self._lock:
            frame_data = self._frames.get(camera_id)
            if frame_data is None:
                return None

            # Check if frame is too old
            age = time.time() - frame_data['timestamp']
            if age > self.max_age_seconds:
                logger.debug(f"Frame for camera {camera_id} is {age:.1f}s old, discarding")
                self._evict(camera_id)
                return None

            return frame_data['frame'].copy()

    def get_frame_info(self, camera_id: str) -> Optional[Dict[str, Any]]:
        """Return metadata of the latest frame without copying pixel data.

        Returns:
            A dict with ``timestamp``, ``age``, ``shape``, ``dtype``,
            ``stream_type`` and ``size_mb``, or None when no unexpired frame
            exists (an expired frame is evicted).
        """
        with self._lock:
            frame_data = self._frames.get(camera_id)
            if frame_data is None:
                return None

            age = time.time() - frame_data['timestamp']
            if age > self.max_age_seconds:
                self._evict(camera_id)
                return None

            return {
                'timestamp': frame_data['timestamp'],
                'age': age,
                'shape': frame_data['shape'],
                'dtype': frame_data['dtype'],
                'stream_type': frame_data.get('stream_type', 'unknown'),
                'size_mb': frame_data.get('size_mb', 0)
            }

    def has_frame(self, camera_id: str) -> bool:
        """Return True when an unexpired frame exists for *camera_id*."""
        return self.get_frame_info(camera_id) is not None

    def clear_camera(self, camera_id: str):
        """Remove the stored frame and stream type of *camera_id*, if any."""
        with self._lock:
            self._evict(camera_id)
            logger.debug(f"Cleared frames for camera {camera_id}")

    def clear_all(self):
        """Remove every stored frame and stream type."""
        with self._lock:
            count = len(self._frames)
            self._frames.clear()
            self._stream_types.clear()
            logger.debug(f"Cleared all frames ({count} cameras)")

    def get_camera_list(self) -> list:
        """Return the IDs of cameras with an unexpired frame, evicting the rest."""
        with self._lock:
            current_time = time.time()
            valid_cameras = []
            expired_cameras = []

            for camera_id, frame_data in self._frames.items():
                age = current_time - frame_data['timestamp']
                if age <= self.max_age_seconds:
                    valid_cameras.append(camera_id)
                else:
                    expired_cameras.append(camera_id)

            # Clean up expired frames only after iteration; the dict must not
            # change size while it is being iterated.
            for camera_id in expired_cameras:
                self._evict(camera_id)

            return valid_cameras

    def get_stats(self) -> Dict[str, Any]:
        """Return buffer statistics.

        Expired frames that have not been evicted yet are included in
        ``total_cameras``, the per-type counts and ``total_memory_mb``; they
        are reported with ``valid`` set to False. Nothing is evicted here.
        """
        with self._lock:
            current_time = time.time()
            stats = {
                'total_cameras': len(self._frames),
                'valid_cameras': 0,
                'expired_cameras': 0,
                'rtsp_cameras': 0,
                'http_cameras': 0,
                'total_memory_mb': 0,
                'cameras': {}
            }

            for camera_id, frame_data in self._frames.items():
                age = current_time - frame_data['timestamp']
                stream_type = frame_data.get('stream_type', 'unknown')
                size_mb = frame_data.get('size_mb', 0)
                is_valid = age <= self.max_age_seconds

                if is_valid:
                    stats['valid_cameras'] += 1
                else:
                    stats['expired_cameras'] += 1

                if stream_type == StreamType.RTSP.value:
                    stats['rtsp_cameras'] += 1
                elif stream_type == StreamType.HTTP.value:
                    stats['http_cameras'] += 1

                stats['total_memory_mb'] += size_mb

                stats['cameras'][camera_id] = {
                    'age': age,
                    'valid': is_valid,
                    'shape': frame_data['shape'],
                    'dtype': frame_data['dtype'],
                    'stream_type': stream_type,
                    'size_mb': size_mb
                }

            return stats

    def _detect_stream_type(self, frame: np.ndarray) -> StreamType:
        """Guess the stream type from the frame dimensions.

        NOTE(review): frames up to 1920x1080 fall back to RTSP here, but
        ``_validate_frame`` still rejects any RTSP frame larger than
        ``rtsp_config['max_size_mb']`` - confirm that is intended.
        """
        h, w = frame.shape[:2]

        # Check if it matches RTSP dimensions (1280x720)
        if w == self.rtsp_config['width'] and h == self.rtsp_config['height']:
            return StreamType.RTSP

        # Check if it matches HTTP dimensions (2560x1440) or close to it
        if w >= 2000 and h >= 1000:
            return StreamType.HTTP

        # Default based on size
        if w <= 1920 and h <= 1080:
            return StreamType.RTSP
        return StreamType.HTTP

    def _validate_frame(self, frame: np.ndarray, stream_type: StreamType) -> bool:
        """Return True when *frame* is acceptable for *stream_type*.

        A frame is rejected when it is None, empty, has fewer than two
        dimensions, or exceeds the size limit of its stream type. An RTSP
        dimension mismatch is only logged; it does not reject the frame.
        """
        if not self._is_image(frame):
            return False

        h, w = frame.shape[:2]
        size_mb = frame.nbytes / (1024 * 1024)

        if stream_type == StreamType.RTSP:
            config = self.rtsp_config
            # Allow some tolerance for RTSP streams
            if abs(w - config['width']) > 100 or abs(h - config['height']) > 100:
                logger.warning(f"RTSP frame size mismatch: {w}x{h} (expected {config['width']}x{config['height']})")
            if size_mb > config['max_size_mb']:
                logger.warning(f"RTSP frame too large: {size_mb:.2f}MB (max {config['max_size_mb']}MB)")
                return False

        elif stream_type == StreamType.HTTP:
            config = self.http_config
            # More flexible for HTTP snapshots
            if size_mb > config['max_size_mb']:
                logger.warning(f"HTTP snapshot too large: {size_mb:.2f}MB (max {config['max_size_mb']}MB)")
                return False

        return True
|
|
|
|
|
|
class CacheBuffer:
    """Frame cache layered on a FrameBuffer, adding cropping and JPEG encoding.

    Cropped regions are memoized per ``(camera_id, crop_coords)`` and are
    dropped whenever a new frame is put for that camera. The crop cache has
    its own re-entrant lock, separate from the lock inside the FrameBuffer.
    """

    # Crop-cache bounds: once more than _MAX_CROP_ENTRIES crops are held, the
    # _CROP_EVICT_COUNT oldest are removed before a new one is inserted.
    _MAX_CROP_ENTRIES = 100
    _CROP_EVICT_COUNT = 50

    def __init__(self, max_age_seconds: int = 10):
        """Create an empty cache.

        Args:
            max_age_seconds: Expiry in seconds, passed to the internal
                FrameBuffer and also applied to cached crops.
        """
        self.frame_buffer = FrameBuffer(max_age_seconds)
        # Keys are (camera_id, str(crop_coords)). A tuple key keeps camera IDs
        # apart exactly; a joined string would let "cam" match "cam_2".
        self._crop_cache: Dict[Tuple[str, str], Dict[str, Any]] = {}
        self._cache_lock = threading.RLock()

        # Quality settings for different stream types
        self.jpeg_quality = {
            StreamType.RTSP: 90,  # Good quality for 720p
            StreamType.HTTP: 95   # High quality for 2K
        }

    def _drop_crops(self, camera_id: str) -> None:
        """Remove every cached crop that belongs to exactly *camera_id*."""
        with self._cache_lock:
            stale_keys = [key for key in self._crop_cache if key[0] == camera_id]
            for key in stale_keys:
                del self._crop_cache[key]

    def put_frame(self, camera_id: str, frame: np.ndarray, stream_type: Optional[StreamType] = None):
        """Store a frame and clear any crops cached for that camera."""
        self.frame_buffer.put_frame(camera_id, frame, stream_type)

        # Clear crop cache for this camera since we have a new frame
        self._drop_crops(camera_id)

    def get_frame(self, camera_id: str, crop_coords: Optional[Tuple[int, int, int, int]] = None) -> Optional[np.ndarray]:
        """Return the latest frame of *camera_id*, optionally cropped.

        Args:
            camera_id: Camera whose frame is requested.
            crop_coords: ``(x1, y1, x2, y2)`` region; it is clamped to the
                frame bounds. None returns the full frame.

        Returns:
            The frame or cropped region, or None when no unexpired frame
            exists. If cropping raises, the error is logged and the uncropped
            frame is returned instead.
        """
        if crop_coords is None:
            return self.frame_buffer.get_frame(camera_id)

        # Check crop cache first
        crop_key = (camera_id, str(crop_coords))
        with self._cache_lock:
            cache_entry = self._crop_cache.get(crop_key)
            if cache_entry is not None:
                age = time.time() - cache_entry['timestamp']
                if age <= self.frame_buffer.max_age_seconds:
                    return cache_entry['cropped_frame'].copy()
                del self._crop_cache[crop_key]

        # Read the metadata before the pixels. If a newer frame arrives in
        # between, the crop only gets an older timestamp and expires early.
        frame_info = self.frame_buffer.get_frame_info(camera_id)
        if frame_info is None:
            return None

        # Get original frame and crop it
        original_frame = self.frame_buffer.get_frame(camera_id)
        if original_frame is None:
            return None

        try:
            x1, y1, x2, y2 = crop_coords

            # Ensure coordinates are within frame bounds
            h, w = original_frame.shape[:2]
            x1 = max(0, min(x1, w))
            y1 = max(0, min(y1, h))
            x2 = max(x1, min(x2, w))
            y2 = max(y1, min(y2, h))

            cropped_frame = original_frame[y1:y2, x1:x2]

            # Cache the cropped frame
            with self._cache_lock:
                # Limit cache size to prevent memory issues
                if len(self._crop_cache) > self._MAX_CROP_ENTRIES:
                    # Remove oldest entries
                    oldest_keys = sorted(
                        self._crop_cache,
                        key=lambda k: self._crop_cache[k]['timestamp'],
                    )[:self._CROP_EVICT_COUNT]
                    for key in oldest_keys:
                        del self._crop_cache[key]

                self._crop_cache[crop_key] = {
                    'cropped_frame': cropped_frame.copy(),
                    # Inherit the source frame's capture time so the crop can
                    # never outlive the frame it was cut from.
                    'timestamp': frame_info['timestamp'],
                    'crop_coords': (x1, y1, x2, y2)
                }

            return cropped_frame

        except Exception as e:
            # Deliberate best effort: fall back to the uncropped frame.
            logger.error(f"Error cropping frame for camera {camera_id}: {e}")
            return original_frame

    def get_frame_as_jpeg(self, camera_id: str, crop_coords: Optional[Tuple[int, int, int, int]] = None,
                          quality: Optional[int] = None) -> Optional[bytes]:
        """Return the (optionally cropped) latest frame encoded as JPEG bytes.

        Args:
            camera_id: Camera whose frame is requested.
            crop_coords: Optional ``(x1, y1, x2, y2)`` crop region.
            quality: JPEG quality. When None it is taken from
                ``jpeg_quality`` for the frame's stream type, or 90 if the
                frame metadata is no longer available.

        Returns:
            The JPEG bytes, or None when there is no frame or encoding fails.
        """
        frame = self.get_frame(camera_id, crop_coords)
        if frame is None:
            return None

        try:
            # Determine quality based on stream type if not specified
            if quality is None:
                frame_info = self.frame_buffer.get_frame_info(camera_id)
                if frame_info:
                    stream_type_str = frame_info.get('stream_type', StreamType.RTSP.value)
                    stream_type = StreamType.RTSP if stream_type_str == StreamType.RTSP.value else StreamType.HTTP
                    quality = self.jpeg_quality[stream_type]
                else:
                    quality = 90  # Default

            # Encode as JPEG with specified quality
            encode_params = [cv2.IMWRITE_JPEG_QUALITY, quality]
            success, encoded_img = cv2.imencode('.jpg', frame, encode_params)

            if success:
                jpeg_bytes = encoded_img.tobytes()
                logger.debug(f"Encoded JPEG for camera {camera_id}: quality={quality}, size={len(jpeg_bytes)} bytes")
                return jpeg_bytes

            return None

        except Exception as e:
            logger.error(f"Error encoding frame as JPEG for camera {camera_id}: {e}")
            return None

    def has_frame(self, camera_id: str) -> bool:
        """Return True when an unexpired frame exists for *camera_id*."""
        return self.frame_buffer.has_frame(camera_id)

    def clear_camera(self, camera_id: str):
        """Remove the stored frame and all cached crops of *camera_id*."""
        self.frame_buffer.clear_camera(camera_id)
        # Clear crop cache entries for this camera
        self._drop_crops(camera_id)

    def clear_all(self):
        """Remove every stored frame and every cached crop."""
        self.frame_buffer.clear_all()
        with self._cache_lock:
            self._crop_cache.clear()

    def get_stats(self) -> Dict[str, Any]:
        """Return statistics for the frame buffer and the crop cache.

        Returns:
            A dict with ``buffer`` (FrameBuffer stats), ``cache`` (entry
            count, distinct cameras, memory in MB) and the combined
            ``total_memory_mb``.
        """
        buffer_stats = self.frame_buffer.get_stats()

        with self._cache_lock:
            cache_stats = {
                'crop_cache_entries': len(self._crop_cache),
                'crop_cache_cameras': len({cam_id for cam_id, _ in self._crop_cache}),
                'crop_cache_memory_mb': sum(
                    entry['cropped_frame'].nbytes / (1024 * 1024)
                    for entry in self._crop_cache.values()
                )
            }

        return {
            'buffer': buffer_stats,
            'cache': cache_stats,
            'total_memory_mb': buffer_stats.get('total_memory_mb', 0) + cache_stats.get('crop_cache_memory_mb', 0)
        }
|
|
|
|
|
|
# Global shared instances for application use
|
|
# Module-level FrameBuffer; frames stored in it expire after 5 seconds.
shared_frame_buffer = FrameBuffer(max_age_seconds=5)
|
|
# Module-level CacheBuffer with a 10 second expiry. It creates its own internal
# FrameBuffer, so it does not share frames with shared_frame_buffer.
shared_cache_buffer = CacheBuffer(max_age_seconds=10)
|
|
|
|
|
|
def save_frame_for_testing(camera_id: str, frame: np.ndarray, test_dir: str = "test_frames"):
    """Write *frame* as a JPEG into *test_dir* so it can be inspected by hand.

    The file is named ``<camera_id>_<millisecond timestamp>.jpg`` and the
    directory is created when missing. Frames at least 2000 pixels wide are
    encoded at quality 95, all others at 90. Any failure is logged rather
    than raised; nothing is returned.
    """
    import os

    try:
        os.makedirs(test_dir, exist_ok=True)
        millis = int(time.time() * 1000)  # milliseconds
        filepath = os.path.join(test_dir, f"{camera_id}_{millis}.jpg")

        # Use appropriate quality based on frame size
        h, w = frame.shape[:2]
        quality = 95 if w >= 2000 else 90

        if not cv2.imwrite(filepath, frame, [cv2.IMWRITE_JPEG_QUALITY, quality]):
            logger.error(f"Failed to save test frame: {filepath}")
            return

        size_kb = os.path.getsize(filepath) / 1024
        logger.info(f"Saved test frame: {filepath} ({w}x{h}, {size_kb:.1f}KB)")

    except Exception as e:
        logger.error(f"Error saving test frame for camera {camera_id}: {e}")