dev #7

Merged
chawanwit.p merged 25 commits from dev into main on 2025-09-09 12:33:37 +00:00
17 changed files with 4268 additions and 465 deletions

.gitignore vendored (4 changes)

@@ -13,3 +13,7 @@ no_frame_debug.log
feeder/
.venv/
.vscode/
dist/
websocket_comm.log
temp_debug/


@@ -1,8 +1,17 @@
# Base image with all ML dependencies
FROM python:3.13-bookworm
FROM pytorch/pytorch:2.8.0-cuda12.6-cudnn9-runtime
# Install system dependencies
RUN apt update && apt install -y libgl1 && rm -rf /var/lib/apt/lists/*
RUN apt update && apt install -y \
libgl1 \
libglib2.0-0 \
libgstreamer1.0-0 \
libgtk-3-0 \
libavcodec58 \
libavformat58 \
libswscale5 \
libgomp1 \
&& rm -rf /var/lib/apt/lists/*
# Copy and install base requirements (ML dependencies that rarely change)
COPY requirements.base.txt .

app.py (1800 changes)

File diff suppressed because it is too large.


@@ -0,0 +1,142 @@
#!/usr/bin/env python3
"""
Test script to check available camera indices
"""
import cv2
import logging
import sys
import subprocess
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
logger = logging.getLogger("camera_index_test")
def test_camera_index(index):
"""Test if a camera index is available"""
try:
cap = cv2.VideoCapture(index)
if cap.isOpened():
ret, frame = cap.read()
if ret and frame is not None:
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
cap.release()
return True, f"{width}x{height} @ {fps}fps"
else:
cap.release()
return False, "Can open but cannot read frames"
else:
cap.release()
return False, "Cannot open camera"
except Exception as e:
return False, f"Error: {str(e)}"
def get_windows_cameras_ffmpeg():
"""Get available cameras on Windows using FFmpeg"""
try:
result = subprocess.run(['ffmpeg', '-f', 'dshow', '-list_devices', 'true', '-i', 'dummy'],
capture_output=True, text=True, timeout=10, encoding='utf-8', errors='ignore')
output = result.stderr
lines = output.split('\n')
video_devices = []
# Parse the output - look for lines with (video) that contain device names in quotes
for line in lines:
if '[dshow @' in line and '(video)' in line and '"' in line:
# Extract device name between first pair of quotes
start = line.find('"') + 1
end = line.find('"', start)
if start > 0 and end > start:
device_name = line[start:end]
video_devices.append(device_name)
logger.info(f"FFmpeg detected video devices: {video_devices}")
return video_devices
except Exception as e:
logger.error(f"Failed to get Windows camera names: {e}")
return []
def main():
logger.info("=== Camera Index Test ===")
# Check FFmpeg availability for Windows device detection
ffmpeg_available = False
try:
result = subprocess.run(['ffmpeg', '-version'], capture_output=True, text=True, timeout=5)
if result.returncode == 0:
ffmpeg_available = True
logger.info("FFmpeg is available")
except (subprocess.TimeoutExpired, FileNotFoundError):
logger.info("FFmpeg not available")
# Get Windows camera names if possible
if sys.platform.startswith('win') and ffmpeg_available:
logger.info("\n=== Windows Camera Devices (FFmpeg) ===")
cameras = get_windows_cameras_ffmpeg()
if cameras:
for i, camera in enumerate(cameras):
logger.info(f"Device {i}: {camera}")
else:
logger.info("No cameras detected via FFmpeg")
# Test camera indices 0-9
logger.info("\n=== Testing Camera Indices ===")
available_cameras = []
for index in range(10):
logger.info(f"Testing camera index {index}...")
is_available, info = test_camera_index(index)
if is_available:
logger.info(f"✓ Camera {index}: AVAILABLE - {info}")
available_cameras.append(index)
else:
logger.info(f"✗ Camera {index}: NOT AVAILABLE - {info}")
# Summary
logger.info("\n=== Summary ===")
if available_cameras:
logger.info(f"Available camera indices: {available_cameras}")
logger.info(f"Default camera index to use: {available_cameras[0]}")
# Test the first available camera more thoroughly
logger.info(f"\n=== Detailed Test for Camera {available_cameras[0]} ===")
cap = cv2.VideoCapture(available_cameras[0])
if cap.isOpened():
# Get properties
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
backend = cap.getBackendName()
logger.info(f"Resolution: {width}x{height}")
logger.info(f"FPS: {fps}")
logger.info(f"Backend: {backend}")
# Test frame capture
ret, frame = cap.read()
if ret and frame is not None:
logger.info(f"Frame capture: SUCCESS")
logger.info(f"Frame shape: {frame.shape}")
logger.info(f"Frame dtype: {frame.dtype}")
else:
logger.info(f"Frame capture: FAILED")
cap.release()
else:
logger.error("No cameras available!")
logger.info("Possible solutions:")
logger.info("1. Check if camera is connected and not used by another application")
logger.info("2. Check camera permissions")
logger.info("3. Try different camera indices")
logger.info("4. Install camera drivers")
if __name__ == "__main__":
main()

View file

@@ -2,7 +2,7 @@
## Overview
The Camera Module implements a pure VMware DRS-like declarative architecture for managing connections to Python ML workers. This system uses the database as the single source of truth for desired subscription state, with automatic regeneration and reconciliation providing intelligent camera management, real-time object detection, and AI-powered content selection with automatic load balancing capabilities.
The Camera Module implements a pure declarative architecture for managing connections to Python ML workers. This system uses the database as the single source of truth for desired subscription state, with automatic regeneration and reconciliation providing intelligent camera management, real-time object detection, and AI-powered content selection with automatic load balancing capabilities.
**Key Architectural Principle**: Database mutations trigger complete state regeneration rather than incremental updates, ensuring consistency and eliminating complex state synchronization issues.
@@ -44,7 +44,7 @@ Core distributed cluster implementation that handles declarative state management
**Master Mode Responsibilities:**
- Maintains WebSocket connections to all Python workers
- Manages desired vs actual subscription state separation
- Implements VMware DRS-like global rebalancing algorithm
- Implements intelligent global rebalancing algorithm
- Processes automatic reconciliation every 30 seconds
- Responds to slave join/leave events from MasterElection
- Generates fresh pre-signed model URLs for worker assignments
@@ -201,10 +201,10 @@ All Redis data uses **manual cleanup only** (no TTL) to ensure:
- Predictable cleanup during planned maintenance
- Debug visibility into system state history
## Pure Declarative Architecture (VMware DRS-like)
## Pure Declarative Architecture
### Concept Overview
The system implements a pure declarative approach similar to VMware Distributed Resource Scheduler (DRS), where:
The system implements a pure declarative approach where:
- **Database**: Single source of truth for desired state (Display+Camera+Playlist combinations)
- **Actual State**: What subscriptions are currently running on workers (stored in `worker:actual_subscriptions`)
- **Regeneration**: Master regenerates complete desired state from database on every change notification
@@ -261,8 +261,8 @@ async handleDatabaseChange(changeType: string, entityId: string) {
}
}
// VMware DRS-like worker selection (unchanged)
function findBestWorkerVMwareDRS(workers, currentLoads) {
// Intelligent worker selection (unchanged)
function findBestWorkerForLoad(workers, currentLoads) {
return workers
.map(worker => ({
worker,
@@ -280,7 +280,7 @@ function findBestWorkerVMwareDRS(workers, currentLoads) {
3. **Complete Regeneration**: Master queries database for all active Display+Camera+Playlist combinations
4. **Desired State Creation**: Master generates fresh desired subscriptions from database query results
5. **Diff Analysis**: Master compares fresh desired state vs current actual state on workers
6. **Global Reconciliation**: Master applies VMware DRS algorithm to reconcile differences
6. **Global Reconciliation**: Master applies intelligent load balancing algorithm to reconcile differences
7. **Worker Commands**: Master sends subscription/unsubscription commands to workers
8. **State Update**: Master updates actual subscription state in Redis
@@ -1483,7 +1483,7 @@ This interface specification provides external services with a clear understanding
- **Service Layer Simplicity**: Services just update database + trigger regeneration - no subscription logic
- **Operational Resilience**: System is self-healing and predictable - any database change triggers complete reconciliation
### VMware DRS-like Benefits
### Declarative Architecture Benefits
- **Global Optimization**: Every regeneration considers all subscriptions globally for optimal load balancing
- **Automatic Recovery**: System automatically heals from any inconsistent state by regenerating from database
- **Resource Efficiency**: Workers assigned based on real-time CPU/memory metrics with load balancing
@@ -1495,4 +1495,4 @@ This interface specification provides external services with a clear understanding
- **Memory Efficiency**: No persistent state storage outside of database and current worker assignments
- **Network Efficiency**: Minimal command protocol reduces Redis pub/sub overhead
This pure declarative architecture provides the reliability and simplicity of Kubernetes-style declarative resource management while maintaining the performance and scalability needed for real-time camera processing systems.
This pure declarative architecture provides the reliability and simplicity of container orchestration-style declarative resource management while maintaining the performance and scalability needed for real-time camera processing systems.
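The regeneration-and-reconcile loop this document describes reduces to a small pattern. The sketch below is illustrative only, assuming hypothetical helpers (query_active_combinations, get_actual_subscriptions, send_command) rather than the module's real API:

# Minimal sketch of the declarative reconcile loop, under assumed helper names.
def reconcile(db, redis_state, workers):
    # Regenerate the complete desired state from the database (source of truth)
    desired = {(c.display_id, c.camera_id): c
               for c in db.query_active_combinations()}
    # Read what the workers are actually running right now
    actual = redis_state.get_actual_subscriptions()
    # Missing subscriptions: assign to the least-loaded worker and subscribe
    for key in desired.keys() - actual.keys():
        worker = min(workers, key=lambda w: w.cpu_percent + w.memory_percent)
        worker.send_command("subscribe", desired[key])
    # Stale subscriptions: tell the owning worker to unsubscribe
    for key in actual.keys() - desired.keys():
        actual[key].worker.send_command("unsubscribe", key)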


@@ -1,7 +1,12 @@
torch
torchvision
ultralytics
opencv-python
scipy
filterpy
psycopg2-binary
ultralytics>=8.3.0
opencv-python>=4.6.0
scipy>=1.9.0
filterpy>=1.4.0
psycopg2-binary>=2.9.0
easydict
loguru
pyzmq
gitpython
gdown
lap
pynvml


@@ -1,6 +1,5 @@
fastapi
fastapi[standard]
uvicorn
websockets
fastapi[standard]
redis
urllib3<2.0.0


@@ -80,37 +80,50 @@ class DatabaseManager:
try:
cur = self.connection.cursor()
# Build the UPDATE query dynamically
# Build the INSERT and UPDATE query dynamically
insert_placeholders = []
insert_values = [key_value] # Start with key_value
set_clauses = []
values = []
update_values = []
for field, value in fields.items():
if value == "NOW()":
# Special handling for NOW()
insert_placeholders.append("NOW()")
set_clauses.append(f"{field} = NOW()")
else:
insert_placeholders.append("%s")
insert_values.append(value)
set_clauses.append(f"{field} = %s")
values.append(value)
update_values.append(value)
# Add schema prefix if table doesn't already have it
full_table_name = table if '.' in table else f"gas_station_1.{table}"
# Build the complete query
query = f"""
INSERT INTO {full_table_name} ({key_field}, {', '.join(fields.keys())})
VALUES (%s, {', '.join(['%s'] * len(fields))})
VALUES (%s, {', '.join(insert_placeholders)})
ON CONFLICT ({key_field})
DO UPDATE SET {', '.join(set_clauses)}
"""
# Add key_value to the beginning of values list
all_values = [key_value] + list(fields.values()) + values
# Combine values for the query: insert_values + update_values
all_values = insert_values + update_values
logger.debug(f"SQL Query: {query}")
logger.debug(f"Values: {all_values}")
cur.execute(query, all_values)
self.connection.commit()
cur.close()
logger.info(f"Updated {table} for {key_field}={key_value}")
logger.info(f"Updated {table} for {key_field}={key_value} with fields: {fields}")
return True
except Exception as e:
logger.error(f"Failed to execute update on {table}: {e}")
logger.error(f"❌ Failed to execute update on {table}: {e}")
logger.debug(f"Query: {query if 'query' in locals() else 'Query not built'}")
logger.debug(f"Values: {all_values if 'all_values' in locals() else 'Values not prepared'}")
if self.connection:
self.connection.rollback()
return False
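To make the fix concrete, here is what the corrected builder produces for a hypothetical call (the method, table, key, and field names are made up for illustration); note that NOW() is now inlined into both the INSERT values and the UPDATE clause instead of being bound as a string parameter:

# Hypothetical call against the fixed upsert builder shown above.
db.update_record("car_info", "session_id", "abc-123",
                 {"car_brand": "Toyota", "updated_at": "NOW()"})
# Generated SQL (bound parameters: ['abc-123', 'Toyota', 'Toyota']):
#   INSERT INTO gas_station_1.car_info (session_id, car_brand, updated_at)
#   VALUES (%s, %s, NOW())
#   ON CONFLICT (session_id)
#   DO UPDATE SET car_brand = %s, updated_at = NOW()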


@@ -0,0 +1,242 @@
"""
Shared Model Registry for Memory Optimization
This module implements a global shared model registry to prevent duplicate model loading
in memory when multiple cameras use the same model. This significantly reduces RAM and
GPU VRAM usage by ensuring only one instance of each unique model is loaded.
Key Features:
- Thread-safe model loading and access
- Reference counting for proper cleanup
- Automatic model lifecycle management
- Maintains compatibility with existing pipeline system
"""
import os
import threading
import logging
from typing import Dict, Any, Optional, Set
import torch
from ultralytics import YOLO
# Create a logger for this module
logger = logging.getLogger("detector_worker.model_registry")
class ModelRegistry:
"""
Singleton class for managing shared YOLO models across multiple cameras.
This registry ensures that each unique model is loaded only once in memory,
dramatically reducing RAM and GPU VRAM usage when multiple cameras use the
same model.
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super(ModelRegistry, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._initialized = True
# Thread-safe storage for loaded models
self._models: Dict[str, YOLO] = {} # modelId -> YOLO model instance
self._model_files: Dict[str, str] = {} # modelId -> file path
self._reference_counts: Dict[str, int] = {} # modelId -> reference count
self._model_lock = threading.RLock() # Reentrant lock for nested calls
logger.info("🏭 Shared Model Registry initialized - ready for memory-optimized model loading")
def get_model(self, model_id: str, model_file_path: str) -> YOLO:
"""
Get or load a YOLO model. Returns shared instance if already loaded.
Args:
model_id: Unique identifier for the model
model_file_path: Path to the model file
Returns:
YOLO model instance (shared across all callers)
"""
with self._model_lock:
if model_id in self._models:
# Model already loaded - increment reference count and return
self._reference_counts[model_id] += 1
logger.info(f"📖 Model '{model_id}' reused (ref_count: {self._reference_counts[model_id]}) - SAVED MEMORY!")
return self._models[model_id]
# Model not loaded yet - load it
logger.info(f"🔄 Loading NEW model '{model_id}' from {model_file_path}")
if not os.path.exists(model_file_path):
raise FileNotFoundError(f"Model file {model_file_path} not found")
try:
# Load the YOLO model
model = YOLO(model_file_path)
# Move to GPU if available
if torch.cuda.is_available():
logger.info(f"🚀 CUDA available. Moving model '{model_id}' to GPU VRAM")
model.to("cuda")
else:
logger.info(f"💻 CUDA not available. Using CPU for model '{model_id}'")
# Store in registry
self._models[model_id] = model
self._model_files[model_id] = model_file_path
self._reference_counts[model_id] = 1
logger.info(f"✅ Model '{model_id}' loaded and registered (ref_count: 1)")
self._log_registry_status()
return model
except Exception as e:
logger.error(f"❌ Failed to load model '{model_id}' from {model_file_path}: {e}")
raise
def release_model(self, model_id: str) -> None:
"""
Release a reference to a model. If reference count reaches zero,
the model may be unloaded to free memory.
Args:
model_id: Unique identifier for the model to release
"""
with self._model_lock:
if model_id not in self._reference_counts:
logger.warning(f"⚠️ Attempted to release unknown model '{model_id}'")
return
self._reference_counts[model_id] -= 1
logger.info(f"📉 Model '{model_id}' reference count decreased to {self._reference_counts[model_id]}")
# For now, keep models in memory even when ref count reaches 0
# This prevents reload overhead if the same model is needed again soon
# In the future, we could implement LRU eviction policy
# if self._reference_counts[model_id] <= 0:
# logger.info(f"💤 Model '{model_id}' has 0 references but keeping in memory for reuse")
# Optionally: self._unload_model(model_id)
def _unload_model(self, model_id: str) -> None:
"""
Internal method to unload a model from memory.
Currently not used to prevent reload overhead.
"""
with self._model_lock:
if model_id in self._models:
logger.info(f"🗑️ Unloading model '{model_id}' from memory")
# Clear GPU memory if model was on GPU
model = self._models[model_id]
if hasattr(model, 'model') and hasattr(model.model, 'cuda'):
try:
# Move model to CPU before deletion to free GPU memory
model.to('cpu')
except Exception as e:
logger.warning(f"⚠️ Failed to move model '{model_id}' to CPU: {e}")
# Remove from registry
del self._models[model_id]
del self._model_files[model_id]
del self._reference_counts[model_id]
# Force garbage collection
import gc
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
logger.info(f"✅ Model '{model_id}' unloaded and memory freed")
self._log_registry_status()
def get_registry_status(self) -> Dict[str, Any]:
"""
Get current status of the model registry.
Returns:
Dictionary with registry statistics
"""
with self._model_lock:
return {
"total_models": len(self._models),
"models": {
model_id: {
"file_path": self._model_files[model_id],
"reference_count": self._reference_counts[model_id]
}
for model_id in self._models
},
"total_references": sum(self._reference_counts.values())
}
def _log_registry_status(self) -> None:
"""Log current registry status for debugging."""
status = self.get_registry_status()
logger.info(f"📊 Model Registry Status: {status['total_models']} unique models, {status['total_references']} total references")
for model_id, info in status['models'].items():
logger.debug(f" 📋 '{model_id}': refs={info['reference_count']}, file={os.path.basename(info['file_path'])}")
def cleanup_all(self) -> None:
"""
Clean up all models from the registry. Used during shutdown.
"""
with self._model_lock:
model_ids = list(self._models.keys())
logger.info(f"🧹 Cleaning up {len(model_ids)} models from registry")
for model_id in model_ids:
self._unload_model(model_id)
logger.info("✅ Model registry cleanup complete")
# Global singleton instance
_registry = ModelRegistry()
def get_shared_model(model_id: str, model_file_path: str) -> YOLO:
"""
Convenience function to get a shared model instance.
Args:
model_id: Unique identifier for the model
model_file_path: Path to the model file
Returns:
YOLO model instance (shared across all callers)
"""
return _registry.get_model(model_id, model_file_path)
def release_shared_model(model_id: str) -> None:
"""
Convenience function to release a shared model reference.
Args:
model_id: Unique identifier for the model to release
"""
_registry.release_model(model_id)
def get_registry_status() -> Dict[str, Any]:
"""
Convenience function to get registry status.
Returns:
Dictionary with registry statistics
"""
return _registry.get_registry_status()
def cleanup_registry() -> None:
"""
Convenience function to cleanup the entire registry.
"""
_registry.cleanup_all()
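A minimal usage sketch for the registry (the module path, model ID, and file path below are assumptions for illustration; get_shared_model, release_shared_model, and get_registry_status are the convenience functions defined above):

# Module path is assumed; the model ID and file path are illustrative.
from siwatsystem.model_registry import (
    get_shared_model, release_shared_model, get_registry_status
)

m1 = get_shared_model("yolo11m-cars", "models/52/yolo11m.pt")  # loads, ref_count=1
m2 = get_shared_model("yolo11m-cars", "models/52/yolo11m.pt")  # reused, ref_count=2
assert m1 is m2                       # both cameras share one YOLO instance
print(get_registry_status())          # 1 unique model, 2 total references
release_shared_model("yolo11m-cars")  # ref_count drops to 1; model stays loaded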

siwatsystem/mpta_manager.py (new file, 375 lines)

@@ -0,0 +1,375 @@
"""
Shared MPTA Manager for Disk Space Optimization
This module implements shared MPTA file management to prevent duplicate downloads
and extractions when multiple cameras use the same model. MPTA files are stored
in modelId-based directories and shared across all cameras using that model.
Key Features:
- Thread-safe MPTA downloading and extraction
- ModelId-based directory structure: models/{modelId}/
- Reference counting for proper cleanup
- Eliminates duplicate MPTA downloads
- Maintains compatibility with existing pipeline system
"""
import os
import threading
import logging
import shutil
import requests
from typing import Dict, Set, Optional
from urllib.parse import urlparse
from .pympta import load_pipeline_from_zip
# Create a logger for this module
logger = logging.getLogger("detector_worker.mpta_manager")
class MPTAManager:
"""
Singleton class for managing shared MPTA files across multiple cameras.
This manager ensures that each unique modelId is downloaded and extracted
only once, dramatically reducing disk usage and download time when multiple
cameras use the same model.
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super(MPTAManager, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._initialized = True
# Thread-safe storage for MPTA management
self._model_paths: Dict[int, str] = {} # modelId -> shared_extraction_path
self._mpta_file_paths: Dict[int, str] = {} # modelId -> local_mpta_file_path
self._reference_counts: Dict[int, int] = {} # modelId -> reference count
self._download_locks: Dict[int, threading.Lock] = {} # modelId -> download lock
self._cameras_using_model: Dict[int, Set[str]] = {} # modelId -> set of camera_ids
self._manager_lock = threading.RLock() # Reentrant lock for nested calls
logger.info("🏭 Shared MPTA Manager initialized - ready for disk-optimized MPTA management")
def get_or_download_mpta(self, model_id: int, model_url: str, camera_id: str) -> Optional[tuple[str, str]]:
"""
Get or download an MPTA file. Returns (extraction_path, mpta_file_path) if successful.
Args:
model_id: Unique identifier for the model
model_url: URL to download the MPTA file from
camera_id: Identifier for the requesting camera
Returns:
Tuple of (extraction_path, mpta_file_path), or None if failed
"""
with self._manager_lock:
# Track camera usage
if model_id not in self._cameras_using_model:
self._cameras_using_model[model_id] = set()
self._cameras_using_model[model_id].add(camera_id)
# Check if model directory already exists on disk (from previous sessions)
if model_id not in self._model_paths:
potential_path = f"models/{model_id}"
if os.path.exists(potential_path) and os.path.isdir(potential_path):
# Directory exists from previous session, find the MPTA file
mpta_files = [f for f in os.listdir(potential_path) if f.endswith('.mpta')]
if mpta_files:
# Use the first .mpta file found
mpta_file_path = os.path.join(potential_path, mpta_files[0])
self._model_paths[model_id] = potential_path
self._mpta_file_paths[model_id] = mpta_file_path
self._reference_counts[model_id] = 0 # Will be incremented below
logger.info(f"📂 Found existing MPTA modelId {model_id} from previous session")
# Check if already available
if model_id in self._model_paths:
shared_path = self._model_paths[model_id]
mpta_file_path = self._mpta_file_paths.get(model_id)
if os.path.exists(shared_path) and mpta_file_path and os.path.exists(mpta_file_path):
self._reference_counts[model_id] += 1
logger.info(f"📂 MPTA modelId {model_id} reused for camera {camera_id} (ref_count: {self._reference_counts[model_id]}) - SAVED DOWNLOAD!")
return (shared_path, mpta_file_path)
else:
# Path was deleted externally, clean up our records
logger.warning(f"⚠️ MPTA path for modelId {model_id} was deleted externally, will re-download")
del self._model_paths[model_id]
self._mpta_file_paths.pop(model_id, None)
self._reference_counts.pop(model_id, 0)
# Need to download - get or create download lock for this modelId
if model_id not in self._download_locks:
self._download_locks[model_id] = threading.Lock()
# Download with model-specific lock (released _manager_lock to allow other models)
download_lock = self._download_locks[model_id]
with download_lock:
# Double-check after acquiring download lock
with self._manager_lock:
if model_id in self._model_paths and os.path.exists(self._model_paths[model_id]):
mpta_file_path = self._mpta_file_paths.get(model_id)
if mpta_file_path and os.path.exists(mpta_file_path):
self._reference_counts[model_id] += 1
logger.info(f"📂 MPTA modelId {model_id} became available during wait (ref_count: {self._reference_counts[model_id]})")
return (self._model_paths[model_id], mpta_file_path)
# Actually download and extract
shared_path = f"models/{model_id}"
logger.info(f"🔄 Downloading NEW MPTA for modelId {model_id} from {model_url}")
try:
# Ensure directory exists
os.makedirs(shared_path, exist_ok=True)
# Download MPTA file
mpta_filename = self._extract_filename_from_url(model_url) or f"model_{model_id}.mpta"
local_mpta_path = os.path.join(shared_path, mpta_filename)
if not self._download_file(model_url, local_mpta_path):
logger.error(f"❌ Failed to download MPTA for modelId {model_id}")
return None
# Extract MPTA
pipeline_tree = load_pipeline_from_zip(local_mpta_path, shared_path)
if pipeline_tree is None:
logger.error(f"❌ Failed to extract MPTA for modelId {model_id}")
return None
# Success - register in manager
with self._manager_lock:
self._model_paths[model_id] = shared_path
self._mpta_file_paths[model_id] = local_mpta_path
self._reference_counts[model_id] = 1
logger.info(f"✅ MPTA modelId {model_id} downloaded and registered (ref_count: 1)")
self._log_manager_status()
return (shared_path, local_mpta_path)
except Exception as e:
logger.error(f"❌ Error downloading/extracting MPTA for modelId {model_id}: {e}")
# Clean up partial download
if os.path.exists(shared_path):
shutil.rmtree(shared_path, ignore_errors=True)
return None
def release_mpta(self, model_id: int, camera_id: str) -> None:
"""
Release a reference to an MPTA. If reference count reaches zero,
the MPTA directory may be cleaned up to free disk space.
Args:
model_id: Unique identifier for the model to release
camera_id: Identifier for the camera releasing the reference
"""
with self._manager_lock:
if model_id not in self._reference_counts:
logger.warning(f"⚠️ Attempted to release unknown MPTA modelId {model_id} for camera {camera_id}")
return
# Remove camera from usage tracking
if model_id in self._cameras_using_model:
self._cameras_using_model[model_id].discard(camera_id)
self._reference_counts[model_id] -= 1
logger.info(f"📉 MPTA modelId {model_id} reference count decreased to {self._reference_counts[model_id]} (released by {camera_id})")
# Clean up if no more references
# if self._reference_counts[model_id] <= 0:
# self._cleanup_mpta(model_id)
def _cleanup_mpta(self, model_id: int) -> None:
"""
Internal method to clean up an MPTA directory and free disk space.
"""
if model_id in self._model_paths:
shared_path = self._model_paths[model_id]
try:
if os.path.exists(shared_path):
shutil.rmtree(shared_path)
logger.info(f"🗑️ Cleaned up MPTA directory: {shared_path}")
# Remove from tracking
del self._model_paths[model_id]
self._mpta_file_paths.pop(model_id, None)
del self._reference_counts[model_id]
self._cameras_using_model.pop(model_id, None)
# Clean up download lock (optional, could keep for future use)
self._download_locks.pop(model_id, None)
logger.info(f"✅ MPTA modelId {model_id} fully cleaned up and disk space freed")
self._log_manager_status()
except Exception as e:
logger.error(f"❌ Error cleaning up MPTA modelId {model_id}: {e}")
def get_shared_path(self, model_id: int) -> Optional[str]:
"""
Get the shared extraction path for a modelId without downloading.
Args:
model_id: Model identifier to look up
Returns:
Shared path if available, None otherwise
"""
with self._manager_lock:
return self._model_paths.get(model_id)
def get_manager_status(self) -> Dict:
"""
Get current status of the MPTA manager.
Returns:
Dictionary with manager statistics
"""
with self._manager_lock:
return {
"total_mpta_models": len(self._model_paths),
"models": {
str(model_id): {
"shared_path": path,
"reference_count": self._reference_counts.get(model_id, 0),
"cameras_using": list(self._cameras_using_model.get(model_id, set()))
}
for model_id, path in self._model_paths.items()
},
"total_references": sum(self._reference_counts.values()),
"active_downloads": len(self._download_locks)
}
def _log_manager_status(self) -> None:
"""Log current manager status for debugging."""
status = self.get_manager_status()
logger.info(f"📊 MPTA Manager Status: {status['total_mpta_models']} unique models, {status['total_references']} total references")
for model_id, info in status['models'].items():
cameras_str = ','.join(info['cameras_using'][:3]) # Show first 3 cameras
if len(info['cameras_using']) > 3:
cameras_str += f"+{len(info['cameras_using'])-3} more"
logger.debug(f" 📋 ModelId {model_id}: refs={info['reference_count']}, cameras=[{cameras_str}]")
def cleanup_all(self) -> None:
"""
Clean up all MPTA directories. Used during shutdown.
"""
with self._manager_lock:
model_ids = list(self._model_paths.keys())
logger.info(f"🧹 Cleaning up {len(model_ids)} MPTA directories")
for model_id in model_ids:
self._cleanup_mpta(model_id)
# Clear all tracking data
self._download_locks.clear()
logger.info("✅ MPTA manager cleanup complete")
def _download_file(self, url: str, local_path: str) -> bool:
"""
Download a file from URL to local path with progress logging.
Args:
url: URL to download from
local_path: Local path to save to
Returns:
True if successful, False otherwise
"""
try:
logger.info(f"⬇️ Starting download from {url}")
response = requests.get(url, stream=True)
response.raise_for_status()
total_size = int(response.headers.get('content-length', 0))
if total_size > 0:
logger.info(f"📦 File size: {total_size / 1024 / 1024:.2f} MB")
downloaded = 0
last_logged_progress = 0
with open(local_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if total_size > 0:
progress = int((downloaded / total_size) * 100)
# Log at 10% intervals (10%, 20%, 30%, etc.)
if progress >= last_logged_progress + 10 and progress <= 100:
logger.debug(f"Download progress: {progress}%")
last_logged_progress = progress
logger.info(f"✅ Successfully downloaded to {local_path}")
return True
except Exception as e:
logger.error(f"❌ Download failed: {e}")
# Clean up partial file
if os.path.exists(local_path):
os.remove(local_path)
return False
def _extract_filename_from_url(self, url: str) -> Optional[str]:
"""Extract filename from URL."""
try:
parsed = urlparse(url)
filename = os.path.basename(parsed.path)
return filename if filename else None
except Exception:
return None
# Global singleton instance
_mpta_manager = MPTAManager()
def get_or_download_mpta(model_id: int, model_url: str, camera_id: str) -> Optional[tuple[str, str]]:
"""
Convenience function to get or download a shared MPTA.
Args:
model_id: Unique identifier for the model
model_url: URL to download the MPTA file from
camera_id: Identifier for the requesting camera
Returns:
Tuple of (extraction_path, mpta_file_path), or None if failed
"""
return _mpta_manager.get_or_download_mpta(model_id, model_url, camera_id)
def release_mpta(model_id: int, camera_id: str) -> None:
"""
Convenience function to release a shared MPTA reference.
Args:
model_id: Unique identifier for the model to release
camera_id: Identifier for the camera releasing the reference
"""
_mpta_manager.release_mpta(model_id, camera_id)
def get_mpta_manager_status() -> Dict:
"""
Convenience function to get MPTA manager status.
Returns:
Dictionary with manager statistics
"""
return _mpta_manager.get_manager_status()
def cleanup_mpta_manager() -> None:
"""
Convenience function to cleanup the entire MPTA manager.
"""
_mpta_manager.cleanup_all()
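A short usage sketch for the manager (the model ID, URL, and camera IDs are illustrative; the functions are the module's convenience API defined above). The second camera resolves to the same models/{modelId}/ directory without downloading again:

# IDs and URL are illustrative; functions are the module's convenience API.
from siwatsystem.mpta_manager import get_or_download_mpta, release_mpta

result = get_or_download_mpta(52, "https://example.com/models/52.mpta", "cam-001")
if result:
    extraction_path, mpta_file = result   # e.g. ("models/52", "models/52/52.mpta")
# Same model for a second camera: reuses the shared directory, no re-download.
get_or_download_mpta(52, "https://example.com/models/52.mpta", "cam-002")
release_mpta(52, "cam-001")
release_mpta(52, "cam-002")  # refs reach 0, but cleanup is deliberately disabled above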

File diff suppressed because it is too large.

test/sample.png (new binary file, 2.8 MiB; not shown)

test/sample2.png (new binary file, 3.1 MiB; not shown)

test/test.py (new file, 60 lines)

@@ -0,0 +1,60 @@
from ultralytics import YOLO
import cv2
import os
# Load the model
# model = YOLO('../models/webcam-local-01/4/bangchak_poc/yolo11n.pt')
model = YOLO('yolo11m.pt')
def test_image(image_path):
"""Test a single image with YOLO model"""
if not os.path.exists(image_path):
print(f"Image not found: {image_path}")
return
# Run inference - filter for vehicle classes only (car, bus, truck)
results = model(image_path, classes=[2, 5, 7]) # 2, 5, 7 = car, bus, truck in COCO dataset
# Display results
for r in results:
im_array = r.plot() # plot a BGR numpy array of predictions
# Resize image for display (max width/height 800px)
height, width = im_array.shape[:2]
max_dimension = 800
if width > max_dimension or height > max_dimension:
if width > height:
new_width = max_dimension
new_height = int(height * (max_dimension / width))
else:
new_height = max_dimension
new_width = int(width * (max_dimension / height))
im_array = cv2.resize(im_array, (new_width, new_height))
# Show image with predictions
cv2.imshow('YOLO Test - Car Detection Only', im_array)
cv2.waitKey(0)
cv2.destroyAllWindows()
# Print detection info
print(f"\nDetections for {image_path}:")
if r.boxes is not None and len(r.boxes) > 0:
for i, box in enumerate(r.boxes):
cls = int(box.cls[0])
conf = float(box.conf[0])
original_class = model.names[cls] # Original class name (car/bus/truck)
# Get bounding box coordinates
x1, y1, x2, y2 = box.xyxy[0].tolist()
# Rename all vehicle types to "car"
print(f"Detection {i+1}: car (was: {original_class}) - Confidence: {conf:.3f} - BBox: ({x1:.0f}, {y1:.0f}, {x2:.0f}, {y2:.0f})")
print(f"Total cars detected: {len(r.boxes)}")
else:
print("No cars detected in the image")
if __name__ == "__main__":
# Test with an image file
image_path = input("Enter image path (or press Enter for default test): ")
if not image_path:
image_path = "sample.png" # Default test image
test_image(image_path)


@@ -0,0 +1,352 @@
import cv2
import torch
import numpy as np
import time
from collections import defaultdict
from ultralytics import YOLO
def point_in_polygon(point, polygon):
"""Check if a point is inside a polygon using ray casting algorithm"""
x, y = point
n = len(polygon)
inside = False
p1x, p1y = polygon[0]
for i in range(1, n + 1):
p2x, p2y = polygon[i % n]
if y > min(p1y, p2y):
if y <= max(p1y, p2y):
if x <= max(p1x, p2x):
if p1y != p2y:
xinters = (y - p1y) * (p2x - p1x) / (p2y - p1y) + p1x
if p1x == p2x or x <= xinters:
inside = not inside
p1x, p1y = p2x, p2y
return inside
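# Quick sanity check (illustrative): with the tracking_zone defined in main()
# below, a point on the station floor is inside and a far corner is not:
#   point_in_polygon((1200, 1100), tracking_zone)  -> True
#   point_in_polygon((100, 100), tracking_zone)    -> False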
def draw_zone(frame, zone_polygon, color=(255, 0, 0), thickness=3):
"""Draw tracking zone on frame"""
pts = np.array(zone_polygon, np.int32)
pts = pts.reshape((-1, 1, 2))
cv2.polylines(frame, [pts], True, color, thickness)
# Add semi-transparent fill
overlay = frame.copy()
cv2.fillPoly(overlay, [pts], color)
cv2.addWeighted(overlay, 0.2, frame, 0.8, 0, frame)
def setup_video_writer(output_path, fps, width, height):
"""Setup video writer for output"""
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
return cv2.VideoWriter(output_path, fourcc, fps, (width, height))
def write_frame_to_video(video_writer, frame, repeat_count):
"""Write frame to video with specified repeat count"""
for _ in range(repeat_count):
video_writer.write(frame)
def finalize_video(video_writer):
"""Release video writer"""
video_writer.release()
def main():
video_path = "sample2.mp4"
yolo_model = "bangchakv2/yolov8n.pt"
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
print("Loading YOLO model...")
model = YOLO(yolo_model)
print("Opening video...")
cap = cv2.VideoCapture(video_path)
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print(f"Video info: {width}x{height}, {fps} FPS, {total_frames} frames")
# Define tracking zone - Gas station floor area (trapezoidal shape)
# Based on the perspective of the gas station floor from your image
# width 2560, height 1440
tracking_zone = [
(423, 974), # Point 1
(1540, 1407), # Point 2
(1976, 806), # Point 3
(1364, 749) # Point 4
]
print(f"🎯 Tracking zone defined: {tracking_zone}")
# CONTINUOUS TRACKING: Process every 118 frames (~2.0s intervals)
frame_skip = 118
print(f"🎯 CONTINUOUS MODE: Processing every {frame_skip} frames ({frame_skip/fps:.2f}s intervals)")
print(f"🎬 Output video will have same duration as input (each processed frame shown for 2 seconds)")
print("🔥 ZONE-FIRST TRACKING: Only cars entering the zone will be tracked!")
print("Requires 5 consecutive detections IN ZONE for verification")
print("🕐 24/7 MODE: Memory reset every hour to prevent overflow")
print("Press 'q' to quit")
# Setup video writer for output (same fps as input for normal playback speed)
output_path = "tracking_output_botsort_zone_track.mp4"
output_fps = fps # Use same fps as input video
out = setup_video_writer(output_path, output_fps, width, height)
# Track car IDs and their consecutive detections
car_id_counts = defaultdict(int)
successful_cars = set()
last_positions = {}
processed_count = 0
# ID remapping for clean sequential zone IDs
tracker_to_zone_id = {} # Maps tracker IDs to clean zone IDs
next_zone_id = 1 # Next clean zone ID to assign
# Store previous frame detections to filter tracking inputs
previous_zone_cars = set()
# 24/7 operation: Reset every hour (1800 snapshots at 2-sec intervals = 1 hour)
RESET_INTERVAL = 1800 # Reset every 1800 processed frames (1 hour)
frame_idx = 0
while True:
# Skip frames to maintain interval
for _ in range(frame_skip):
ret, frame = cap.read()
if not ret:
print("\nNo more frames to read")
cap.release()
cv2.destroyAllWindows()
return
frame_idx += 1
processed_count += 1
current_time = frame_idx / fps
print(f"\n🎬 Frame {frame_idx} at {current_time:.2f}s (processed #{processed_count})")
# 24/7 Memory Management: Reset every hour
if processed_count % RESET_INTERVAL == 0:
print(f"🕐 HOURLY RESET: Clearing all tracking data (processed {processed_count} frames)")
print(f" 📊 Before reset: {len(tracker_to_zone_id)} tracked cars, next Zone ID was {next_zone_id}")
# Clear all tracking data
tracker_to_zone_id.clear()
car_id_counts.clear()
successful_cars.clear()
last_positions.clear()
next_zone_id = 1 # Reset to 1
# Reset BoT-SORT tracker state
try:
model.reset()
print(f" ✅ BoT-SORT tracker reset successfully")
except:
print(f" ⚠️ BoT-SORT reset not available (continuing without reset)")
print(f" 🆕 Zone IDs will start from 1 again")
# Draw tracking zone on frame
draw_zone(frame, tracking_zone, color=(0, 255, 255), thickness=3) # Yellow zone
# First run YOLO detection (without tracking) to find cars in zone
detection_results = model(frame, verbose=False, conf=0.7, classes=[2])
# Find cars currently in the tracking zone
current_zone_cars = []
total_detections = 0
if detection_results[0].boxes is not None:
boxes = detection_results[0].boxes.xyxy.cpu()
scores = detection_results[0].boxes.conf.cpu()
total_detections = len(boxes)
print(f" 🔍 Total car detections: {total_detections}")
for i in range(len(boxes)):
x1, y1, x2, y2 = boxes[i]
conf = float(scores[i])
# Check if detection is in zone (using bottom center)
box_bottom = ((x1 + x2) / 2, y2)
if point_in_polygon(box_bottom, tracking_zone):
current_zone_cars.append({
'bbox': [float(x1), float(y1), float(x2), float(y2)],
'conf': conf,
'center': ((x1 + x2) / 2, (y1 + y2) / 2),
'bottom': box_bottom
})
print(f" 🎯 Cars in zone: {len(current_zone_cars)}")
# Only run tracking if there are cars in the zone
detected_car_ids = set()
if current_zone_cars:
# Run tracking on the full frame (let tracker handle associations)
# But we'll filter results to only zone cars afterward
results = model.track(
frame,
persist=True,
verbose=False,
conf=0.7,
classes=[2],
tracker="botsort_reid.yaml"
)
if results[0].boxes is not None and results[0].boxes.id is not None:
boxes = results[0].boxes.xyxy.cpu()
scores = results[0].boxes.conf.cpu()
track_ids = results[0].boxes.id.cpu().int()
print(f" 📊 Total tracked objects: {len(track_ids)}")
# Filter tracked objects to only those in zone
zone_tracks = []
for i, track_id in enumerate(track_ids):
x1, y1, x2, y2 = boxes[i]
conf = float(scores[i])
# Check if this tracked object is in our zone
box_bottom = ((x1 + x2) / 2, y2)
if point_in_polygon(box_bottom, tracking_zone):
zone_tracks.append({
'id': int(track_id),
'bbox': [int(x1), int(y1), int(x2), int(y2)],
'conf': conf,
'center': ((x1 + x2) / 2, (y1 + y2) / 2),
'bottom': box_bottom
})
print(f" ✅ Zone tracks: {len(zone_tracks)}")
# Process each zone track
for track in zone_tracks:
tracker_id = track['id'] # Original tracker ID
x1, y1, x2, y2 = track['bbox']
conf = track['conf']
box_center = track['center']
# Map tracker ID to clean zone ID
if tracker_id not in tracker_to_zone_id:
tracker_to_zone_id[tracker_id] = next_zone_id
print(f" 🆕 New car: Tracker ID {tracker_id} → Zone ID {next_zone_id}")
next_zone_id += 1
zone_id = tracker_to_zone_id[tracker_id] # Clean sequential ID
# Validate track continuity (use tracker_id for internal logic)
is_valid = True
# Check for suspicious jumps
if tracker_id in last_positions:
last_center = last_positions[tracker_id]
distance = np.sqrt((box_center[0] - last_center[0])**2 +
(box_center[1] - last_center[1])**2)
if distance > 400: # pixels in ~2.0s
is_valid = False
print(f" ⚠️ Zone ID {zone_id} (Tracker {tracker_id}): suspicious jump {distance:.0f}px")
# Skip already successful cars (use zone_id for user logic)
if zone_id in successful_cars:
is_valid = False
print(f" ✅ Zone ID {zone_id}: already successful, skipping")
# Only process valid, high-confidence zone tracks
if is_valid and conf > 0.7:
detected_car_ids.add(zone_id) # Use zone_id for display
car_id_counts[zone_id] += 1
last_positions[tracker_id] = box_center # Track by tracker_id internally
# Draw tracking results with clean zone ID
zone_color = (0, 255, 0) # Green for zone cars
cv2.rectangle(frame, (x1, y1), (x2, y2), zone_color, 2)
cv2.putText(frame, f'ZONE ID:{zone_id}',
(x1, y1-30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, zone_color, 2)
cv2.putText(frame, f'#{car_id_counts[zone_id]} {conf:.2f}',
(x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, zone_color, 2)
# Draw center point
cv2.circle(frame, (int(track['bottom'][0]), int(track['bottom'][1])), 5, zone_color, -1)
print(f" ✅ Zone ID {zone_id} (Tracker {tracker_id}): ZONE detection #{car_id_counts[zone_id]} (conf: {conf:.2f})")
# Check for success (5 consecutive detections IN ZONE)
if car_id_counts[zone_id] == 5:
print(f"🏆 SUCCESS: Zone ID {zone_id} achieved 5 continuous ZONE detections - TRIGGER NEXT MODEL!")
successful_cars.add(zone_id)
# Add success indicator to frame
cv2.putText(frame, f"SUCCESS: Zone Car {zone_id}!",
(50, height-50), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 0), 3)
else:
print(" 📋 No cars in zone - no tracking performed")
# Draw any cars outside the zone in red (for reference)
if detection_results[0].boxes is not None:
boxes = detection_results[0].boxes.xyxy.cpu()
scores = detection_results[0].boxes.conf.cpu()
for i in range(len(boxes)):
x1, y1, x2, y2 = boxes[i]
conf = float(scores[i])
box_bottom = ((x1 + x2) / 2, y2)
if not point_in_polygon(box_bottom, tracking_zone):
# Draw cars outside zone in red (not tracked)
x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 1)
cv2.putText(frame, f'OUT {conf:.2f}',
(x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
# Display results
if detected_car_ids:
print(f" 📋 Active Zone IDs: {sorted(detected_car_ids)} (Clean sequential IDs)")
# Show ID mapping for debugging
if tracker_to_zone_id:
mapping_str = ", ".join([f"Tracker{k}→Zone{v}" for k, v in tracker_to_zone_id.items()])
print(f" 🔄 ID Mapping: {mapping_str}")
# Add annotations to frame
cv2.putText(frame, f"BoT-SORT Zone-First Tracking | Frame: {frame_idx} | {current_time:.2f}s",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
cv2.putText(frame, f"Zone Cars: {len(current_zone_cars)} | Active Tracks: {len(detected_car_ids)}",
(10, 65), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
cv2.putText(frame, f"Successful Cars: {len(successful_cars)}",
(10, 100), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
cv2.putText(frame, "TRACKING ZONE",
(tracking_zone[0][0], tracking_zone[0][1]-10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
# Write annotated frame to output video (repeat for 2 seconds duration)
write_frame_to_video(out, frame, frame_skip)
# Show video with zone tracking info
display_frame = cv2.resize(frame, (960, 540))
cv2.imshow('BoT-SORT Zone-First Tracking', display_frame)
# Quick check for quit
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
# Small delay to see results
time.sleep(0.1)
cap.release()
finalize_video(out)
cv2.destroyAllWindows()
print(f"\n🎯 BoT-SORT zone-first tracking completed!")
print(f"📊 Processed {processed_count} frames with {frame_skip/fps:.2f}s intervals")
print(f"🏆 Successfully tracked {len(successful_cars)} unique cars IN ZONE")
print(f"💾 Annotated video saved to: {output_path}")
if __name__ == "__main__":
main()

view_redis_images.py (new file, 162 lines)

@@ -0,0 +1,162 @@
#!/usr/bin/env python3
"""
Script to view frontal images saved in Redis
"""
import redis
import cv2
import numpy as np
import sys
import json
from datetime import datetime
# Redis connection config (from pipeline.json)
REDIS_CONFIG = {
"host": "10.100.1.3",
"port": 6379,
"password": "FBQgi0i5RevAAMO5Hh66",
"db": 0
}
def connect_redis():
"""Connect to Redis server."""
try:
client = redis.Redis(
host=REDIS_CONFIG["host"],
port=REDIS_CONFIG["port"],
password=REDIS_CONFIG["password"],
db=REDIS_CONFIG["db"],
decode_responses=False # Keep bytes for images
)
client.ping()
print(f"✅ Connected to Redis at {REDIS_CONFIG['host']}:{REDIS_CONFIG['port']}")
return client
except redis.exceptions.ConnectionError as e:
print(f"❌ Failed to connect to Redis: {e}")
return None
def list_image_keys(client):
"""List all image keys in Redis."""
try:
# Look for keys matching the inference pattern
keys = client.keys("inference:*")
print(f"\n📋 Found {len(keys)} image keys:")
for i, key in enumerate(keys):
key_str = key.decode() if isinstance(key, bytes) else key
print(f"{i+1}. {key_str}")
return keys
except Exception as e:
print(f"❌ Error listing keys: {e}")
return []
def view_image(client, key):
"""View a specific image from Redis."""
try:
# Get image data from Redis
image_data = client.get(key)
if image_data is None:
print(f"❌ No data found for key: {key}")
return
print(f"📸 Image size: {len(image_data)} bytes")
# Convert bytes to numpy array
nparr = np.frombuffer(image_data, np.uint8)
# Decode image
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img is None:
print("❌ Failed to decode image data")
return
print(f"🖼️ Image dimensions: {img.shape[1]}x{img.shape[0]} pixels")
# Display image
key_str = key.decode() if isinstance(key, bytes) else key
cv2.imshow(f'Redis Image: {key_str}', img)
print("👁️ Image displayed. Press any key to close...")
cv2.waitKey(0)
cv2.destroyAllWindows()
# Ask if user wants to save the image
save = input("💾 Save image to file? (y/n): ").lower().strip()
if save == 'y':
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"redis_image_{timestamp}.jpg"
cv2.imwrite(filename, img)
print(f"💾 Image saved as: {filename}")
except Exception as e:
print(f"❌ Error viewing image: {e}")
def monitor_new_images(client):
"""Monitor for new images being added to Redis."""
print("👀 Monitoring for new images... (Press Ctrl+C to stop)")
try:
# Subscribe to Redis pub/sub for car detections
pubsub = client.pubsub()
pubsub.subscribe('car_detections')
for message in pubsub.listen():
if message['type'] == 'message':
data = message['data'].decode()
print(f"🚨 New detection: {data}")
# Try to extract image key from message
try:
detection_data = json.loads(data)
image_key = detection_data.get('image_key')
if image_key:
print(f"🖼️ New image available: {image_key}")
view_choice = input("View this image now? (y/n): ").lower().strip()
if view_choice == 'y':
view_image(client, image_key)
except json.JSONDecodeError:
pass
except KeyboardInterrupt:
print("\n👋 Stopping monitor...")
except Exception as e:
print(f"❌ Monitor error: {e}")
def main():
"""Main function."""
print("🔍 Redis Image Viewer")
print("=" * 50)
# Connect to Redis
client = connect_redis()
if not client:
return
while True:
print("\n📋 Options:")
print("1. List all image keys")
print("2. View specific image")
print("3. Monitor for new images")
print("4. Exit")
choice = input("\nEnter choice (1-4): ").strip()
if choice == '1':
keys = list_image_keys(client)
elif choice == '2':
keys = list_image_keys(client)
if keys:
try:
idx = int(input(f"\nEnter image number (1-{len(keys)}): ")) - 1
if 0 <= idx < len(keys):
view_image(client, keys[idx])
else:
print("❌ Invalid selection")
except ValueError:
print("❌ Please enter a valid number")
elif choice == '3':
monitor_new_images(client)
elif choice == '4':
print("👋 Goodbye!")
break
else:
print("❌ Invalid choice")
if __name__ == "__main__":
main()

webcam_rtsp_server.py (new file, 325 lines)

@@ -0,0 +1,325 @@
#!/usr/bin/env python3
"""
Enhanced webcam server that provides both RTSP streaming and HTTP snapshot endpoints
Compatible with CMS UI requirements for camera configuration
"""
import cv2
import threading
import time
import logging
import socket
from http.server import BaseHTTPRequestHandler, HTTPServer
import subprocess
import sys
import os
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
logger = logging.getLogger("webcam_rtsp_server")
# Global webcam capture object
webcam_cap = None
rtsp_process = None
class WebcamHTTPHandler(BaseHTTPRequestHandler):
"""HTTP handler for snapshot requests"""
def do_GET(self):
if self.path == '/snapshot' or self.path == '/snapshot.jpg':
try:
# Capture fresh frame from webcam for each request
ret, frame = webcam_cap.read()
if ret and frame is not None:
# Encode as JPEG
success, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
if success:
self.send_response(200)
self.send_header('Content-Type', 'image/jpeg')
self.send_header('Content-Length', str(len(buffer)))
self.send_header('Cache-Control', 'no-cache, no-store, must-revalidate')
self.send_header('Pragma', 'no-cache')
self.send_header('Expires', '0')
self.end_headers()
self.wfile.write(buffer.tobytes())
logger.debug(f"Served webcam snapshot, size: {len(buffer)} bytes")
return
else:
logger.error("Failed to encode frame as JPEG")
else:
logger.error("Failed to capture frame from webcam")
# Send error response
self.send_response(500)
self.send_header('Content-Type', 'text/plain')
self.end_headers()
self.wfile.write(b'Failed to capture webcam frame')
except Exception as e:
logger.error(f"Error serving snapshot: {e}")
self.send_response(500)
self.send_header('Content-Type', 'text/plain')
self.end_headers()
self.wfile.write(f'Error: {str(e)}'.encode())
elif self.path == '/status':
# Status endpoint for health checking
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
width = int(webcam_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(webcam_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = webcam_cap.get(cv2.CAP_PROP_FPS)
status = f'{{"status": "online", "width": {width}, "height": {height}, "fps": {fps}}}'
self.wfile.write(status.encode())
else:
# 404 for other paths
self.send_response(404)
self.send_header('Content-Type', 'text/plain')
self.end_headers()
self.wfile.write(b'Not Found - Available endpoints: /snapshot, /snapshot.jpg, /status')
def log_message(self, format, *args):
# Suppress default HTTP server logging to avoid spam
pass
def check_ffmpeg():
"""Check if FFmpeg is available for RTSP streaming"""
try:
result = subprocess.run(['ffmpeg', '-version'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
logger.info("FFmpeg found and working")
return True
except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError):
pass
logger.warning("FFmpeg not found. RTSP streaming will not be available.")
logger.info("To enable RTSP streaming, install FFmpeg:")
logger.info(" Windows: Download from https://ffmpeg.org/download.html")
logger.info(" Linux: sudo apt install ffmpeg")
logger.info(" macOS: brew install ffmpeg")
return False
def get_windows_camera_name():
"""Get the actual camera device name on Windows"""
try:
# List video devices using FFmpeg with proper encoding handling
result = subprocess.run(['ffmpeg', '-f', 'dshow', '-list_devices', 'true', '-i', 'dummy'],
capture_output=True, text=True, timeout=10, encoding='utf-8', errors='ignore')
output = result.stderr # FFmpeg outputs device list to stderr
# Look for video devices in the output
lines = output.split('\n')
video_devices = []
# Parse the output - look for lines with (video) that contain device names in quotes
for line in lines:
if '[dshow @' in line and '(video)' in line and '"' in line:
# Extract device name between first pair of quotes
start = line.find('"') + 1
end = line.find('"', start)
if start > 0 and end > start:
device_name = line[start:end]
video_devices.append(device_name)
logger.info(f"Found Windows video devices: {video_devices}")
if video_devices:
# Force use the first device (index 0) which is the Logitech HD webcam
return video_devices[0] # This will be "罗技高清网络摄像机 C930c" (Logitech HD Webcam C930c)
else:
logger.info("No devices found via FFmpeg detection, using fallback")
# Fall through to fallback names
except Exception as e:
logger.debug(f"Failed to get Windows camera name: {e}")
# Try common camera device names as fallback
# Prioritize Integrated Camera since that's what's working now
common_names = [
"Integrated Camera", # This is working for the current setup
"USB Video Device", # Common name for USB cameras
"USB2.0 Camera",
"C930c", # Direct model name
"HD Pro Webcam C930c", # Full Logitech name
"Logitech", # Brand name
"USB Camera",
"Webcam"
]
logger.info(f"Using fallback camera names: {common_names}")
return common_names[0] # Return "Integrated Camera" first
def start_rtsp_stream(webcam_index=0, rtsp_port=8554):
"""Start RTSP streaming using FFmpeg"""
global rtsp_process
if not check_ffmpeg():
return None
try:
# Get the actual camera device name for Windows
if sys.platform.startswith('win'):
camera_name = get_windows_camera_name()
logger.info(f"Using Windows camera device: {camera_name}")
# FFmpeg command to stream webcam via RTSP
if sys.platform.startswith('win'):
cmd = [
'ffmpeg',
'-f', 'dshow',
'-i', f'video={camera_name}', # Use detected camera name
'-c:v', 'libx264',
'-preset', 'veryfast',
'-tune', 'zerolatency',
'-r', '30',
'-s', '1280x720',
'-f', 'rtsp',
f'rtsp://localhost:{rtsp_port}/stream'
]
elif sys.platform.startswith('linux'):
cmd = [
'ffmpeg',
'-f', 'v4l2',
'-i', f'/dev/video{webcam_index}',
'-c:v', 'libx264',
'-preset', 'veryfast',
'-tune', 'zerolatency',
'-r', '30',
'-s', '1280x720',
'-f', 'rtsp',
f'rtsp://localhost:{rtsp_port}/stream'
]
else: # macOS
cmd = [
'ffmpeg',
'-f', 'avfoundation',
'-i', f'{webcam_index}:',
'-c:v', 'libx264',
'-preset', 'veryfast',
'-tune', 'zerolatency',
'-r', '30',
'-s', '1280x720',
'-f', 'rtsp',
f'rtsp://localhost:{rtsp_port}/stream'
]
logger.info(f"Starting RTSP stream on rtsp://localhost:{rtsp_port}/stream")
logger.info(f"FFmpeg command: {' '.join(cmd)}")
rtsp_process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# Give FFmpeg a moment to start
time.sleep(2)
# Check if process is still running
if rtsp_process.poll() is None:
logger.info("RTSP streaming started successfully")
return rtsp_process
else:
# Get error output if process failed
stdout, stderr = rtsp_process.communicate(timeout=2)
logger.error("RTSP streaming failed to start")
logger.error(f"FFmpeg stdout: {stdout}")
logger.error(f"FFmpeg stderr: {stderr}")
return None
except Exception as e:
logger.error(f"Failed to start RTSP stream: {e}")
return None
def get_local_ip():
"""Get the Wireguard IP address for external access"""
# Use Wireguard IP for external access
return "10.101.1.4"
def main():
global webcam_cap, rtsp_process
# Configuration - Force use index 0 for Logitech HD webcam
webcam_index = 0 # Logitech HD webcam C930c (1920x1080@30fps)
http_port = 8080
rtsp_port = 8554
logger.info("=== Webcam RTSP & HTTP Server ===")
# Initialize webcam
logger.info("Initializing webcam...")
webcam_cap = cv2.VideoCapture(webcam_index)
if not webcam_cap.isOpened():
logger.error(f"Failed to open webcam at index {webcam_index}")
logger.info("Try different webcam indices (0, 1, 2, etc.)")
return
# Set webcam properties - Use high resolution for Logitech HD webcam
webcam_cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
webcam_cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
webcam_cap.set(cv2.CAP_PROP_FPS, 30)
width = int(webcam_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(webcam_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = webcam_cap.get(cv2.CAP_PROP_FPS)
logger.info(f"Webcam initialized: {width}x{height} @ {fps}fps")
# Get local IP for CMS configuration
local_ip = get_local_ip()
# Start RTSP streaming (optional, requires FFmpeg)
rtsp_process = start_rtsp_stream(webcam_index, rtsp_port)
# Start HTTP server for snapshots
server_address = ('0.0.0.0', http_port) # Bind to all interfaces
http_server = HTTPServer(server_address, WebcamHTTPHandler)
logger.info("\n=== Server URLs for CMS Configuration ===")
logger.info(f"HTTP Snapshot URL: http://{local_ip}:{http_port}/snapshot")
if rtsp_process:
logger.info(f"RTSP Stream URL: rtsp://{local_ip}:{rtsp_port}/stream")
else:
logger.info("RTSP Stream: Not available (FFmpeg not found)")
logger.info("HTTP-only mode: Use Snapshot URL for camera input")
logger.info(f"Status URL: http://{local_ip}:{http_port}/status")
logger.info("\n=== CMS Configuration Suggestions ===")
logger.info(f"Camera Identifier: webcam-local-01")
logger.info(f"RTSP Stream URL: rtsp://{local_ip}:{rtsp_port}/stream")
logger.info(f"Snapshot URL: http://{local_ip}:{http_port}/snapshot")
logger.info(f"Snapshot Interval: 2000 (ms)")
logger.info("\nPress Ctrl+C to stop all servers")
try:
# Start HTTP server
http_server.serve_forever()
except KeyboardInterrupt:
logger.info("Shutting down servers...")
finally:
# Clean up
if webcam_cap:
webcam_cap.release()
if rtsp_process:
logger.info("Stopping RTSP stream...")
rtsp_process.terminate()
try:
rtsp_process.wait(timeout=5)
except subprocess.TimeoutExpired:
rtsp_process.kill()
http_server.server_close()
logger.info("All servers stopped")
if __name__ == "__main__":
main()