diff --git a/app.py b/app.py index 6338401..605aa0b 100644 --- a/app.py +++ b/app.py @@ -6,8 +6,9 @@ import json import logging import os import time +import cv2 from contextlib import asynccontextmanager -from fastapi import FastAPI, WebSocket, HTTPException, Request +from fastapi import FastAPI, WebSocket, HTTPException from fastapi.responses import Response # Import new modular communication system @@ -27,8 +28,8 @@ logging.basicConfig( logger = logging.getLogger("detector_worker") logger.setLevel(logging.DEBUG) -# Store cached frames for REST API access (temporary storage) -latest_frames = {} +# Frames are now stored in the shared cache buffer from core.streaming.buffers +# latest_frames = {} # Deprecated - using shared_cache_buffer instead # Lifespan event handler (modern FastAPI approach) @asynccontextmanager @@ -49,7 +50,7 @@ async def lifespan(app: FastAPI): worker_state.set_subscriptions([]) worker_state.session_ids.clear() worker_state.progression_stages.clear() - latest_frames.clear() + # latest_frames.clear() # No longer needed - frames are in shared_cache_buffer logger.info("Detector Worker shutdown complete") # Create FastAPI application with detailed WebSocket logging @@ -90,8 +91,8 @@ from core.streaming import initialize_stream_manager initialize_stream_manager(max_streams=config.get('max_streams', 10)) logger.info(f"Initialized stream manager with max_streams={config.get('max_streams', 10)}") -# Store cached frames for REST API access (temporary storage) -latest_frames = {} +# Frames are now stored in the shared cache buffer from core.streaming.buffers +# latest_frames = {} # Deprecated - using shared_cache_buffer instead logger.info("Starting detector worker application (refactored)") logger.info(f"Configuration: Target FPS: {config.get('target_fps', 10)}, " @@ -150,31 +151,33 @@ async def get_camera_image(camera_id: str): detail=f"Camera {camera_id} not found or not active" ) - # Check if we have a cached frame for this camera - if camera_id not in latest_frames: - logger.warning(f"No cached frame available for camera '{camera_id}'") + # Extract actual camera_id from subscription identifier (displayId;cameraId) + # Frames are stored using just the camera_id part + actual_camera_id = camera_id.split(';')[-1] if ';' in camera_id else camera_id + + # Get frame from the shared cache buffer + from core.streaming.buffers import shared_cache_buffer + + # Only show buffer debug info if camera not found (to reduce log spam) + available_cameras = shared_cache_buffer.frame_buffer.get_camera_list() + + frame = shared_cache_buffer.get_frame(actual_camera_id) + if frame is None: + logger.warning(f"\033[93m[API] No frame for '{actual_camera_id}' - Available: {available_cameras}\033[0m") raise HTTPException( status_code=404, - detail=f"No frame available for camera {camera_id}" + detail=f"No frame available for camera {actual_camera_id}" ) - frame = latest_frames[camera_id] - logger.debug(f"Retrieved cached frame for camera '{camera_id}', shape: {frame.shape}") + # Successful frame retrieval - log only occasionally to avoid spam - # TODO: This import will be replaced in Phase 3 (Streaming System) - # For now, we need to handle the case where OpenCV is not available - try: - import cv2 - # Encode frame as JPEG - success, buffer_img = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85]) - if not success: - raise HTTPException(status_code=500, detail="Failed to encode image as JPEG") + # Encode frame as JPEG + success, buffer_img = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85]) + if not success: + raise HTTPException(status_code=500, detail="Failed to encode image as JPEG") - # Return image as binary response - return Response(content=buffer_img.tobytes(), media_type="image/jpeg") - except ImportError: - logger.error("OpenCV not available for image encoding") - raise HTTPException(status_code=500, detail="Image processing not available") + # Return image as binary response + return Response(content=buffer_img.tobytes(), media_type="image/jpeg") except HTTPException: raise diff --git a/core/communication/websocket.py b/core/communication/websocket.py index 813350e..4e40d2a 100644 --- a/core/communication/websocket.py +++ b/core/communication/websocket.py @@ -297,31 +297,31 @@ class WebSocketHandler: async def _reconcile_subscriptions_with_tracking(self, target_subscriptions) -> dict: """Reconcile subscriptions with tracking integration.""" try: - # First, we need to create tracking integrations for each unique model + # Create separate tracking integrations for each subscription (camera isolation) tracking_integrations = {} for subscription_payload in target_subscriptions: + subscription_id = subscription_payload['subscriptionIdentifier'] model_id = subscription_payload['modelId'] - # Create tracking integration if not already created - if model_id not in tracking_integrations: - # Get pipeline configuration for this model - pipeline_parser = model_manager.get_pipeline_config(model_id) - if pipeline_parser: - # Create tracking integration with message sender - tracking_integration = TrackingPipelineIntegration( - pipeline_parser, model_manager, model_id, self._send_message - ) + # Create separate tracking integration per subscription for camera isolation + # Get pipeline configuration for this model + pipeline_parser = model_manager.get_pipeline_config(model_id) + if pipeline_parser: + # Create tracking integration with message sender (separate instance per camera) + tracking_integration = TrackingPipelineIntegration( + pipeline_parser, model_manager, model_id, self._send_message + ) - # Initialize tracking model - success = await tracking_integration.initialize_tracking_model() - if success: - tracking_integrations[model_id] = tracking_integration - logger.info(f"[Tracking] Created tracking integration for model {model_id}") - else: - logger.warning(f"[Tracking] Failed to initialize tracking for model {model_id}") + # Initialize tracking model + success = await tracking_integration.initialize_tracking_model() + if success: + tracking_integrations[subscription_id] = tracking_integration + logger.info(f"[Tracking] Created isolated tracking integration for subscription {subscription_id} (model {model_id})") else: - logger.warning(f"[Tracking] No pipeline config found for model {model_id}") + logger.warning(f"[Tracking] Failed to initialize tracking for subscription {subscription_id} (model {model_id})") + else: + logger.warning(f"[Tracking] No pipeline config found for model {model_id} in subscription {subscription_id}") # Now reconcile with StreamManager, adding tracking integrations current_subscription_ids = set() @@ -377,8 +377,10 @@ class WebSocketHandler: camera_id = subscription_id.split(';')[-1] model_id = payload['modelId'] - # Get tracking integration for this model - tracking_integration = tracking_integrations.get(model_id) + logger.info(f"[SUBSCRIPTION_MAPPING] subscription_id='{subscription_id}' → camera_id='{camera_id}'") + + # Get tracking integration for this subscription (camera-isolated) + tracking_integration = tracking_integrations.get(subscription_id) # Extract crop coordinates if present crop_coords = None @@ -410,7 +412,7 @@ class WebSocketHandler: ) if success and tracking_integration: - logger.info(f"[Tracking] Subscription {subscription_id} configured with tracking for model {model_id}") + logger.info(f"[Tracking] Subscription {subscription_id} configured with isolated tracking for model {model_id}") return success @@ -547,10 +549,6 @@ class WebSocketHandler: # Update tracking integrations with session ID shared_stream_manager.set_session_id(display_identifier, session_id) - # Save snapshot image after getting sessionId - if session_id: - await self._save_snapshot(display_identifier, session_id) - async def _handle_set_progression_stage(self, message: SetProgressionStageMessage) -> None: """Handle setProgressionStage message.""" display_identifier = message.payload.displayIdentifier @@ -566,6 +564,10 @@ class WebSocketHandler: if session_id: shared_stream_manager.set_progression_stage(session_id, stage) + # Save snapshot image when progression stage is car_fueling + if stage == 'car_fueling' and session_id: + await self._save_snapshot(display_identifier, session_id) + # If stage indicates session is cleared/finished, clear from tracking if stage in ['finished', 'cleared', 'idle']: # Get session ID for this display and clear it diff --git a/core/models/inference.py b/core/models/inference.py index 826061c..f96c0e8 100644 --- a/core/models/inference.py +++ b/core/models/inference.py @@ -60,6 +60,8 @@ class YOLOWrapper: self.model = None self._class_names = [] + + self._load_model() logger.info(f"Initialized YOLO wrapper for {model_id} on {self.device}") @@ -115,6 +117,7 @@ class YOLOWrapper: logger.error(f"Failed to extract class names: {str(e)}") self._class_names = {} + def infer( self, image: np.ndarray, @@ -222,55 +225,30 @@ class YOLOWrapper: return detections + def track( self, image: np.ndarray, confidence_threshold: float = 0.5, trigger_classes: Optional[List[str]] = None, - persist: bool = True + persist: bool = True, + camera_id: Optional[str] = None ) -> InferenceResult: """ - Run tracking on an image + Run detection (tracking will be handled by external tracker) Args: image: Input image as numpy array (BGR format) confidence_threshold: Minimum confidence for detections trigger_classes: List of class names to filter - persist: Whether to persist tracks across frames + persist: Ignored - tracking handled externally + camera_id: Ignored - tracking handled externally Returns: - InferenceResult containing detections with track IDs + InferenceResult containing detections (no track IDs from YOLO) """ - if self.model is None: - raise RuntimeError(f"Model {self.model_id} not loaded") - - try: - import time - start_time = time.time() - - # Run tracking - results = self.model.track( - image, - conf=confidence_threshold, - persist=persist, - verbose=False - ) - - inference_time = time.time() - start_time - - # Parse results - detections = self._parse_results(results[0], trigger_classes) - - return InferenceResult( - detections=detections, - image_shape=(image.shape[0], image.shape[1]), - inference_time=inference_time, - model_id=self.model_id - ) - - except Exception as e: - logger.error(f"Tracking failed for model {self.model_id}: {str(e)}", exc_info=True) - raise + # Just do detection - no YOLO tracking + return self.infer(image, confidence_threshold, trigger_classes) def predict_classification( self, @@ -350,6 +328,7 @@ class YOLOWrapper: """Get the number of classes the model can detect""" return len(self._class_names) + def clear_cache(self) -> None: """Clear the model cache""" with self._cache_lock: diff --git a/core/streaming/buffers.py b/core/streaming/buffers.py index fd29fbb..f2c5787 100644 --- a/core/streaming/buffers.py +++ b/core/streaming/buffers.py @@ -46,13 +46,7 @@ class FrameBuffer: frame_data = self._frames[camera_id] - # Check if frame is too old - age = time.time() - frame_data['timestamp'] - if age > self.max_age_seconds: - logger.debug(f"Frame for camera {camera_id} is {age:.1f}s old, discarding") - del self._frames[camera_id] - return None - + # Return frame regardless of age - frames persist until replaced return frame_data['frame'].copy() def get_frame_info(self, camera_id: str) -> Optional[Dict[str, Any]]: @@ -64,10 +58,7 @@ class FrameBuffer: frame_data = self._frames[camera_id] age = time.time() - frame_data['timestamp'] - if age > self.max_age_seconds: - del self._frames[camera_id] - return None - + # Return frame info regardless of age - frames persist until replaced return { 'timestamp': frame_data['timestamp'], 'age': age, @@ -95,24 +86,10 @@ class FrameBuffer: logger.debug(f"Cleared all frames ({count} cameras)") def get_camera_list(self) -> list: - """Get list of cameras with valid frames.""" + """Get list of cameras with frames - all frames persist until replaced.""" with self._lock: - current_time = time.time() - valid_cameras = [] - expired_cameras = [] - - for camera_id, frame_data in self._frames.items(): - age = current_time - frame_data['timestamp'] - if age <= self.max_age_seconds: - valid_cameras.append(camera_id) - else: - expired_cameras.append(camera_id) - - # Clean up expired frames - for camera_id in expired_cameras: - del self._frames[camera_id] - - return valid_cameras + # Return all cameras that have frames - no age-based filtering + return list(self._frames.keys()) def get_stats(self) -> Dict[str, Any]: """Get buffer statistics.""" @@ -120,8 +97,8 @@ class FrameBuffer: current_time = time.time() stats = { 'total_cameras': len(self._frames), - 'valid_cameras': 0, - 'expired_cameras': 0, + 'recent_cameras': 0, + 'stale_cameras': 0, 'total_memory_mb': 0, 'cameras': {} } @@ -130,16 +107,17 @@ class FrameBuffer: age = current_time - frame_data['timestamp'] size_mb = frame_data.get('size_mb', 0) + # All frames are valid/available, but categorize by freshness for monitoring if age <= self.max_age_seconds: - stats['valid_cameras'] += 1 + stats['recent_cameras'] += 1 else: - stats['expired_cameras'] += 1 + stats['stale_cameras'] += 1 stats['total_memory_mb'] += size_mb stats['cameras'][camera_id] = { 'age': age, - 'valid': age <= self.max_age_seconds, + 'recent': age <= self.max_age_seconds, # Recent but all frames available 'shape': frame_data['shape'], 'dtype': frame_data['dtype'], 'size_mb': size_mb diff --git a/core/streaming/manager.py b/core/streaming/manager.py index 156daf1..0c026e7 100644 --- a/core/streaming/manager.py +++ b/core/streaming/manager.py @@ -130,6 +130,7 @@ class StreamManager: try: if stream_config.rtsp_url: # RTSP stream using FFmpeg subprocess with CUDA acceleration + logger.info(f"\033[94m[RTSP] Starting {camera_id}\033[0m") reader = FFmpegRTSPReader( camera_id=camera_id, rtsp_url=stream_config.rtsp_url, @@ -138,10 +139,11 @@ class StreamManager: reader.set_frame_callback(self._frame_callback) reader.start() self._streams[camera_id] = reader - logger.info(f"Started FFmpeg RTSP stream for camera {camera_id}") + logger.info(f"\033[92m[RTSP] {camera_id} connected\033[0m") elif stream_config.snapshot_url: # HTTP snapshot stream + logger.info(f"\033[95m[HTTP] Starting {camera_id}\033[0m") reader = HTTPSnapshotReader( camera_id=camera_id, snapshot_url=stream_config.snapshot_url, @@ -151,7 +153,7 @@ class StreamManager: reader.set_frame_callback(self._frame_callback) reader.start() self._streams[camera_id] = reader - logger.info(f"Started HTTP snapshot stream for camera {camera_id}") + logger.info(f"\033[92m[HTTP] {camera_id} connected\033[0m") else: logger.error(f"No valid URL provided for camera {camera_id}") @@ -169,8 +171,9 @@ class StreamManager: try: self._streams[camera_id].stop() del self._streams[camera_id] - shared_cache_buffer.clear_camera(camera_id) - logger.info(f"Stopped stream for camera {camera_id}") + # DON'T clear frames - they should persist until replaced + # shared_cache_buffer.clear_camera(camera_id) # REMOVED - frames should persist + logger.info(f"Stopped stream for camera {camera_id} (frames preserved in buffer)") except Exception as e: logger.error(f"Error stopping stream for camera {camera_id}: {e}") @@ -179,6 +182,16 @@ class StreamManager: try: # Store frame in shared buffer shared_cache_buffer.put_frame(camera_id, frame) + # Quieter frame callback logging - only log occasionally + if hasattr(self, '_frame_log_count'): + self._frame_log_count += 1 + else: + self._frame_log_count = 1 + + # Log every 100 frames to avoid spam + if self._frame_log_count % 100 == 0: + available_cameras = shared_cache_buffer.frame_buffer.get_camera_list() + logger.info(f"\033[96m[BUFFER] {len(available_cameras)} active cameras: {', '.join(available_cameras)}\033[0m") # Process tracking for subscriptions with tracking integration self._process_tracking_for_camera(camera_id, frame) @@ -376,20 +389,51 @@ class StreamManager: logger.debug(f"Set session {session_id} for display {display_id}") def clear_session_id(self, session_id: str): - """Clear session ID from tracking integrations.""" + """Clear session ID from the specific tracking integration handling this session.""" with self._lock: + # Find the subscription that's handling this session + session_subscription = None for subscription_info in self._subscriptions.values(): if subscription_info.tracking_integration: - subscription_info.tracking_integration.clear_session_id(session_id) - logger.debug(f"Cleared session {session_id}") + # Check if this integration is handling the given session_id + integration = subscription_info.tracking_integration + if session_id in integration.session_vehicles: + session_subscription = subscription_info + break + + if session_subscription and session_subscription.tracking_integration: + session_subscription.tracking_integration.clear_session_id(session_id) + logger.debug(f"Cleared session {session_id} from subscription {session_subscription.subscription_id}") + else: + logger.warning(f"No tracking integration found for session {session_id}, broadcasting to all subscriptions") + # Fallback: broadcast to all (original behavior) + for subscription_info in self._subscriptions.values(): + if subscription_info.tracking_integration: + subscription_info.tracking_integration.clear_session_id(session_id) def set_progression_stage(self, session_id: str, stage: str): - """Set progression stage for tracking integrations.""" + """Set progression stage for the specific tracking integration handling this session.""" with self._lock: + # Find the subscription that's handling this session + session_subscription = None for subscription_info in self._subscriptions.values(): if subscription_info.tracking_integration: - subscription_info.tracking_integration.set_progression_stage(session_id, stage) - logger.debug(f"Set progression stage for session {session_id}: {stage}") + # Check if this integration is handling the given session_id + # We need to check the integration's active sessions + integration = subscription_info.tracking_integration + if session_id in integration.session_vehicles: + session_subscription = subscription_info + break + + if session_subscription and session_subscription.tracking_integration: + session_subscription.tracking_integration.set_progression_stage(session_id, stage) + logger.debug(f"Set progression stage for session {session_id}: {stage} on subscription {session_subscription.subscription_id}") + else: + logger.warning(f"No tracking integration found for session {session_id}, broadcasting to all subscriptions") + # Fallback: broadcast to all (original behavior) + for subscription_info in self._subscriptions.values(): + if subscription_info.tracking_integration: + subscription_info.tracking_integration.set_progression_stage(session_id, stage) def get_tracking_stats(self) -> Dict[str, Any]: """Get tracking statistics from all subscriptions.""" diff --git a/core/streaming/readers.py b/core/streaming/readers.py index d8d4b4d..d5635ba 100644 --- a/core/streaming/readers.py +++ b/core/streaming/readers.py @@ -12,8 +12,7 @@ import os import subprocess # import fcntl # No longer needed with atomic file operations from typing import Optional, Callable -from watchdog.observers import Observer -from watchdog.events import FileSystemEventHandler +# Removed watchdog imports - no longer using file watching # Suppress FFMPEG/H.264 error messages if needed # Set this environment variable to reduce noise from decoder errors @@ -22,31 +21,42 @@ os.environ["OPENCV_FFMPEG_LOGLEVEL"] = "-8" # Suppress FFMPEG warnings logger = logging.getLogger(__name__) -# Suppress noisy watchdog debug logs -logging.getLogger('watchdog.observers.inotify_buffer').setLevel(logging.CRITICAL) -logging.getLogger('watchdog.observers.fsevents').setLevel(logging.CRITICAL) -logging.getLogger('fsevents').setLevel(logging.CRITICAL) +# Color codes for pretty logging +class Colors: + GREEN = '\033[92m' + YELLOW = '\033[93m' + RED = '\033[91m' + BLUE = '\033[94m' + PURPLE = '\033[95m' + CYAN = '\033[96m' + WHITE = '\033[97m' + BOLD = '\033[1m' + END = '\033[0m' + +def log_success(camera_id: str, message: str): + """Log success messages in green""" + logger.info(f"{Colors.GREEN}[{camera_id}] {message}{Colors.END}") + +def log_warning(camera_id: str, message: str): + """Log warnings in yellow""" + logger.warning(f"{Colors.YELLOW}[{camera_id}] {message}{Colors.END}") + +def log_error(camera_id: str, message: str): + """Log errors in red""" + logger.error(f"{Colors.RED}[{camera_id}] {message}{Colors.END}") + +def log_info(camera_id: str, message: str): + """Log info in cyan""" + logger.info(f"{Colors.CYAN}[{camera_id}] {message}{Colors.END}") + +# Removed watchdog logging configuration - no longer using file watching -class FrameFileHandler(FileSystemEventHandler): - """File system event handler for frame file changes.""" - - def __init__(self, callback): - self.callback = callback - self.last_modified = 0 - - def on_modified(self, event): - if event.is_directory: - return - # Debounce rapid file changes - current_time = time.time() - if current_time - self.last_modified > 0.01: # 10ms debounce - self.last_modified = current_time - self.callback() +# Removed FrameFileHandler - no longer using file watching class FFmpegRTSPReader: - """RTSP stream reader using subprocess FFmpeg with CUDA hardware acceleration and file watching.""" + """RTSP stream reader using subprocess FFmpeg piping frames directly to buffer.""" def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3): self.camera_id = camera_id @@ -56,10 +66,8 @@ class FFmpegRTSPReader: self.stop_event = threading.Event() self.thread = None self.frame_callback: Optional[Callable] = None - self.observer = None - self.frame_ready_event = threading.Event() - # Stream specs + # Expected stream specs (for reference, actual dimensions read from PPM header) self.width = 1280 self.height = 720 @@ -76,7 +84,7 @@ class FFmpegRTSPReader: self.stop_event.clear() self.thread = threading.Thread(target=self._read_frames, daemon=True) self.thread.start() - logger.info(f"Started FFmpeg reader for camera {self.camera_id}") + log_success(self.camera_id, "Stream started") def stop(self): """Stop the FFmpeg subprocess reader.""" @@ -89,171 +97,138 @@ class FFmpegRTSPReader: self.process.kill() if self.thread: self.thread.join(timeout=5.0) - logger.info(f"Stopped FFmpeg reader for camera {self.camera_id}") + log_info(self.camera_id, "Stream stopped") + + # Removed _probe_stream_info - BMP headers contain dimensions def _start_ffmpeg_process(self): - """Start FFmpeg subprocess writing timestamped frames for atomic reads.""" - # Create temp file paths for this camera - self.frame_dir = "/tmp/frame" - os.makedirs(self.frame_dir, exist_ok=True) - - # Use strftime pattern - FFmpeg writes each frame with unique timestamp - # This ensures each file is complete when written - camera_id_safe = self.camera_id.replace(' ', '_') - self.frame_prefix = f"camera_{camera_id_safe}" - # Using strftime pattern with microseconds for unique filenames - self.frame_pattern = f"{self.frame_dir}/{self.frame_prefix}_%Y%m%d_%H%M%S_%f.ppm" - + """Start FFmpeg subprocess outputting BMP frames to stdout pipe.""" cmd = [ 'ffmpeg', # DO NOT REMOVE - '-hwaccel', 'cuda', - '-hwaccel_device', '0', + # '-hwaccel', 'cuda', + # '-hwaccel_device', '0', '-rtsp_transport', 'tcp', '-i', self.rtsp_url, - '-f', 'image2', - '-strftime', '1', # Enable strftime pattern expansion - '-pix_fmt', 'rgb24', # PPM uses RGB not BGR - '-an', # No audio - '-y', # Overwrite output file - self.frame_pattern # Write timestamped frames + '-f', 'image2pipe', # Output images to pipe + '-vcodec', 'bmp', # BMP format with header containing dimensions + # Use native stream resolution and framerate + '-an', # No audio + '-' # Output to stdout ] try: - # Log the FFmpeg command for debugging - logger.info(f"Starting FFmpeg for camera {self.camera_id} with command: {' '.join(cmd)}") - - # Start FFmpeg detached - we don't need to communicate with it + # Start FFmpeg with stdout pipe to read frames directly self.process = subprocess.Popen( cmd, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL + stdout=subprocess.PIPE, # Capture stdout for frame data + stderr=subprocess.DEVNULL, + bufsize=0 # Unbuffered for real-time processing ) - logger.info(f"Started FFmpeg process PID {self.process.pid} for camera {self.camera_id} -> {self.frame_pattern}") return True except Exception as e: - logger.error(f"Failed to start FFmpeg for camera {self.camera_id}: {e}") + log_error(self.camera_id, f"FFmpeg startup failed: {e}") return False - def _setup_file_watcher(self): - """Setup file system watcher for frame directory.""" - # Setup file watcher for the frame directory - handler = FrameFileHandler(lambda: self._on_file_changed()) - self.observer = Observer() - self.observer.schedule(handler, self.frame_dir, recursive=False) - self.observer.start() - logger.info(f"Started file watcher for {self.frame_dir} with pattern {self.frame_prefix}*.ppm") + def _read_bmp_frame(self, pipe): + """Read BMP frame from pipe - BMP header contains dimensions.""" + try: + # Read BMP header (14 bytes file header + 40 bytes info header = 54 bytes minimum) + header_data = b'' + bytes_to_read = 54 - def _on_file_changed(self): - """Called when a new frame file is created.""" - # Signal that a new frame might be available - self.frame_ready_event.set() + while len(header_data) < bytes_to_read: + chunk = pipe.read(bytes_to_read - len(header_data)) + if not chunk: + return None # Silent end of stream + header_data += chunk + + # Parse BMP header + if header_data[:2] != b'BM': + return None # Invalid format, skip frame silently + + # Extract file size from header (bytes 2-5) + import struct + file_size = struct.unpack('= 30: - logger.error(f"No frame files created after 30s for {self.camera_id}") - logger.error(f"Expected pattern: {self.frame_dir}/{self.frame_prefix}*.ppm") - continue + # Call frame callback + if self.frame_callback: + self.frame_callback(self.camera_id, frame) - # Setup file watcher - self._setup_file_watcher() + frame_count += 1 - # Wait for file change event (or timeout for health check) - if self.frame_ready_event.wait(timeout=restart_check_interval): - self.frame_ready_event.clear() + # Log progress every 60 seconds (quieter) + current_time = time.time() + if current_time - last_log_time >= 60: + log_success(self.camera_id, f"{frame_count} frames captured ({frame.shape[1]}x{frame.shape[0]})") + last_log_time = current_time - # Read latest complete frame file - try: - import glob - # Find all frame files for this camera - frame_files = glob.glob(f"{self.frame_dir}/{self.frame_prefix}*.ppm") + except Exception: + # Process might have died, let it restart on next iteration + if self.process: + self.process.terminate() + self.process = None + time.sleep(1.0) - if frame_files: - # Sort by filename (which includes timestamp) and get the latest - frame_files.sort() - latest_frame = frame_files[-1] - - # Read the latest frame (it's complete since FFmpeg wrote it atomically) - frame = cv2.imread(latest_frame) - - if frame is not None and frame.shape == (self.height, self.width, 3): - # Call frame callback directly - if self.frame_callback: - self.frame_callback(self.camera_id, frame) - - frame_count += 1 - - # Log progress - current_time = time.time() - if current_time - last_log_time >= 30: - logger.info(f"Camera {self.camera_id}: {frame_count} frames processed") - last_log_time = current_time - - # Clean up old frame files to prevent disk filling - # Keep only the latest 5 frames - if len(frame_files) > 5: - for old_file in frame_files[:-5]: - try: - os.remove(old_file) - except: - pass - - except Exception as e: - logger.debug(f"Camera {self.camera_id}: Error reading frames: {e}") - pass - - except Exception as e: - logger.error(f"Camera {self.camera_id}: Error in reactive frame reading: {e}") + except Exception: time.sleep(1.0) # Cleanup - if self.observer: - self.observer.stop() - self.observer.join() if self.process: self.process.terminate() - # Clean up all frame files for this camera - try: - if hasattr(self, 'frame_prefix') and hasattr(self, 'frame_dir'): - import glob - frame_files = glob.glob(f"{self.frame_dir}/{self.frame_prefix}*.ppm") - for frame_file in frame_files: - try: - os.remove(frame_file) - except: - pass - except: - pass - logger.info(f"Reactive FFmpeg reader ended for camera {self.camera_id}") logger = logging.getLogger(__name__) diff --git a/core/tracking/bot_sort_tracker.py b/core/tracking/bot_sort_tracker.py new file mode 100644 index 0000000..f487a6a --- /dev/null +++ b/core/tracking/bot_sort_tracker.py @@ -0,0 +1,408 @@ +""" +BoT-SORT Multi-Object Tracker with Camera Isolation +Based on BoT-SORT: Robust Associations Multi-Pedestrian Tracking +""" + +import logging +import time +import numpy as np +from typing import Dict, List, Optional, Tuple, Any +from dataclasses import dataclass +from scipy.optimize import linear_sum_assignment +from filterpy.kalman import KalmanFilter +import cv2 + +logger = logging.getLogger(__name__) + + +@dataclass +class TrackState: + """Track state enumeration""" + TENTATIVE = "tentative" # New track, not confirmed yet + CONFIRMED = "confirmed" # Confirmed track + DELETED = "deleted" # Track to be deleted + + +class Track: + """ + Individual track representation with Kalman filter for motion prediction + """ + + def __init__(self, detection, track_id: int, camera_id: str): + """ + Initialize a new track + + Args: + detection: Initial detection (bbox, confidence, class) + track_id: Unique track identifier within camera + camera_id: Camera identifier + """ + self.track_id = track_id + self.camera_id = camera_id + self.state = TrackState.TENTATIVE + + # Time tracking + self.start_time = time.time() + self.last_update_time = time.time() + + # Appearance and motion + self.bbox = detection.bbox # [x1, y1, x2, y2] + self.confidence = detection.confidence + self.class_name = detection.class_name + + # Track management + self.hit_streak = 1 + self.time_since_update = 0 + self.age = 1 + + # Kalman filter for motion prediction + self.kf = self._create_kalman_filter() + self._update_kalman_filter(detection.bbox) + + # Track history + self.history = [detection.bbox] + self.max_history = 10 + + def _create_kalman_filter(self) -> KalmanFilter: + """Create Kalman filter for bbox tracking (x, y, w, h, vx, vy, vw, vh)""" + kf = KalmanFilter(dim_x=8, dim_z=4) + + # State transition matrix (constant velocity model) + kf.F = np.array([ + [1, 0, 0, 0, 1, 0, 0, 0], + [0, 1, 0, 0, 0, 1, 0, 0], + [0, 0, 1, 0, 0, 0, 1, 0], + [0, 0, 0, 1, 0, 0, 0, 1], + [0, 0, 0, 0, 1, 0, 0, 0], + [0, 0, 0, 0, 0, 1, 0, 0], + [0, 0, 0, 0, 0, 0, 1, 0], + [0, 0, 0, 0, 0, 0, 0, 1] + ]) + + # Measurement matrix (observe x, y, w, h) + kf.H = np.array([ + [1, 0, 0, 0, 0, 0, 0, 0], + [0, 1, 0, 0, 0, 0, 0, 0], + [0, 0, 1, 0, 0, 0, 0, 0], + [0, 0, 0, 1, 0, 0, 0, 0] + ]) + + # Process noise + kf.Q *= 0.01 + + # Measurement noise + kf.R *= 10 + + # Initial covariance + kf.P *= 100 + + return kf + + def _update_kalman_filter(self, bbox: List[float]): + """Update Kalman filter with new bbox""" + # Convert [x1, y1, x2, y2] to [cx, cy, w, h] + x1, y1, x2, y2 = bbox + cx = (x1 + x2) / 2 + cy = (y1 + y2) / 2 + w = x2 - x1 + h = y2 - y1 + + # Properly assign to column vector + self.kf.x[:4, 0] = [cx, cy, w, h] + + def predict(self) -> np.ndarray: + """Predict next position using Kalman filter""" + self.kf.predict() + + # Convert back to [x1, y1, x2, y2] format + cx, cy, w, h = self.kf.x[:4, 0] # Extract from column vector + x1 = cx - w/2 + y1 = cy - h/2 + x2 = cx + w/2 + y2 = cy + h/2 + + return np.array([x1, y1, x2, y2]) + + def update(self, detection): + """Update track with new detection""" + self.last_update_time = time.time() + self.time_since_update = 0 + self.hit_streak += 1 + self.age += 1 + + # Update track properties + self.bbox = detection.bbox + self.confidence = detection.confidence + + # Update Kalman filter + x1, y1, x2, y2 = detection.bbox + cx = (x1 + x2) / 2 + cy = (y1 + y2) / 2 + w = x2 - x1 + h = y2 - y1 + + self.kf.update([cx, cy, w, h]) + + # Update history + self.history.append(detection.bbox) + if len(self.history) > self.max_history: + self.history.pop(0) + + # Update state + if self.state == TrackState.TENTATIVE and self.hit_streak >= 3: + self.state = TrackState.CONFIRMED + + def mark_missed(self): + """Mark track as missed in this frame""" + self.time_since_update += 1 + self.age += 1 + + if self.time_since_update > 5: # Delete after 5 missed frames + self.state = TrackState.DELETED + + def is_confirmed(self) -> bool: + """Check if track is confirmed""" + return self.state == TrackState.CONFIRMED + + def is_deleted(self) -> bool: + """Check if track should be deleted""" + return self.state == TrackState.DELETED + + +class CameraTracker: + """ + BoT-SORT tracker for a single camera + """ + + def __init__(self, camera_id: str, max_disappeared: int = 10): + """ + Initialize camera tracker + + Args: + camera_id: Unique camera identifier + max_disappeared: Maximum frames a track can be missed before deletion + """ + self.camera_id = camera_id + self.max_disappeared = max_disappeared + + # Track management + self.tracks: Dict[int, Track] = {} + self.next_id = 1 + self.frame_count = 0 + + logger.info(f"Initialized BoT-SORT tracker for camera {camera_id}") + + def update(self, detections: List) -> List[Track]: + """ + Update tracker with new detections + + Args: + detections: List of Detection objects + + Returns: + List of active confirmed tracks + """ + self.frame_count += 1 + + # Predict all existing tracks + for track in self.tracks.values(): + track.predict() + + # Associate detections to tracks + matched_tracks, unmatched_detections, unmatched_tracks = self._associate(detections) + + # Update matched tracks + for track_id, detection in matched_tracks: + self.tracks[track_id].update(detection) + + # Mark unmatched tracks as missed + for track_id in unmatched_tracks: + self.tracks[track_id].mark_missed() + + # Create new tracks for unmatched detections + for detection in unmatched_detections: + track = Track(detection, self.next_id, self.camera_id) + self.tracks[self.next_id] = track + self.next_id += 1 + + # Remove deleted tracks + tracks_to_remove = [tid for tid, track in self.tracks.items() if track.is_deleted()] + for tid in tracks_to_remove: + del self.tracks[tid] + + # Return confirmed tracks + confirmed_tracks = [track for track in self.tracks.values() if track.is_confirmed()] + + return confirmed_tracks + + def _associate(self, detections: List) -> Tuple[List[Tuple[int, Any]], List[Any], List[int]]: + """ + Associate detections to existing tracks using IoU distance + + Returns: + (matched_tracks, unmatched_detections, unmatched_tracks) + """ + if not detections or not self.tracks: + return [], detections, list(self.tracks.keys()) + + # Calculate IoU distance matrix + track_ids = list(self.tracks.keys()) + cost_matrix = np.zeros((len(track_ids), len(detections))) + + for i, track_id in enumerate(track_ids): + track = self.tracks[track_id] + predicted_bbox = track.predict() + + for j, detection in enumerate(detections): + iou = self._calculate_iou(predicted_bbox, detection.bbox) + cost_matrix[i, j] = 1 - iou # Convert IoU to distance + + # Solve assignment problem + row_indices, col_indices = linear_sum_assignment(cost_matrix) + + # Filter matches by IoU threshold + iou_threshold = 0.3 + matched_tracks = [] + matched_detection_indices = set() + matched_track_indices = set() + + for row, col in zip(row_indices, col_indices): + if cost_matrix[row, col] <= (1 - iou_threshold): + track_id = track_ids[row] + detection = detections[col] + matched_tracks.append((track_id, detection)) + matched_detection_indices.add(col) + matched_track_indices.add(row) + + # Find unmatched detections and tracks + unmatched_detections = [detections[i] for i in range(len(detections)) + if i not in matched_detection_indices] + unmatched_tracks = [track_ids[i] for i in range(len(track_ids)) + if i not in matched_track_indices] + + return matched_tracks, unmatched_detections, unmatched_tracks + + def _calculate_iou(self, bbox1: np.ndarray, bbox2: List[float]) -> float: + """Calculate IoU between two bounding boxes""" + x1_1, y1_1, x2_1, y2_1 = bbox1 + x1_2, y1_2, x2_2, y2_2 = bbox2 + + # Calculate intersection area + x1_i = max(x1_1, x1_2) + y1_i = max(y1_1, y1_2) + x2_i = min(x2_1, x2_2) + y2_i = min(y2_1, y2_2) + + if x2_i <= x1_i or y2_i <= y1_i: + return 0.0 + + intersection = (x2_i - x1_i) * (y2_i - y1_i) + + # Calculate union area + area1 = (x2_1 - x1_1) * (y2_1 - y1_1) + area2 = (x2_2 - x1_2) * (y2_2 - y1_2) + union = area1 + area2 - intersection + + return intersection / union if union > 0 else 0.0 + + +class MultiCameraBoTSORT: + """ + Multi-camera BoT-SORT tracker with complete camera isolation + """ + + def __init__(self, trigger_classes: List[str], min_confidence: float = 0.6): + """ + Initialize multi-camera tracker + + Args: + trigger_classes: List of class names to track + min_confidence: Minimum detection confidence threshold + """ + self.trigger_classes = trigger_classes + self.min_confidence = min_confidence + + # Camera-specific trackers + self.camera_trackers: Dict[str, CameraTracker] = {} + + logger.info(f"Initialized MultiCameraBoTSORT with classes={trigger_classes}, " + f"min_confidence={min_confidence}") + + def get_or_create_tracker(self, camera_id: str) -> CameraTracker: + """Get or create tracker for specific camera""" + if camera_id not in self.camera_trackers: + self.camera_trackers[camera_id] = CameraTracker(camera_id) + logger.info(f"Created new tracker for camera {camera_id}") + + return self.camera_trackers[camera_id] + + def update(self, camera_id: str, inference_result) -> List[Dict]: + """ + Update tracker for specific camera with detections + + Args: + camera_id: Camera identifier + inference_result: InferenceResult with detections + + Returns: + List of track information dictionaries + """ + # Filter detections by confidence and trigger classes + filtered_detections = [] + + if hasattr(inference_result, 'detections') and inference_result.detections: + for detection in inference_result.detections: + if (detection.confidence >= self.min_confidence and + detection.class_name in self.trigger_classes): + filtered_detections.append(detection) + + # Get camera tracker and update + tracker = self.get_or_create_tracker(camera_id) + confirmed_tracks = tracker.update(filtered_detections) + + # Convert tracks to output format + track_results = [] + for track in confirmed_tracks: + track_results.append({ + 'track_id': track.track_id, + 'camera_id': track.camera_id, + 'bbox': track.bbox, + 'confidence': track.confidence, + 'class_name': track.class_name, + 'hit_streak': track.hit_streak, + 'age': track.age + }) + + return track_results + + def get_statistics(self) -> Dict[str, Any]: + """Get tracking statistics across all cameras""" + stats = {} + total_tracks = 0 + + for camera_id, tracker in self.camera_trackers.items(): + camera_stats = { + 'active_tracks': len([t for t in tracker.tracks.values() if t.is_confirmed()]), + 'total_tracks': len(tracker.tracks), + 'frame_count': tracker.frame_count + } + stats[camera_id] = camera_stats + total_tracks += camera_stats['active_tracks'] + + stats['summary'] = { + 'total_cameras': len(self.camera_trackers), + 'total_active_tracks': total_tracks + } + + return stats + + def reset_camera(self, camera_id: str): + """Reset tracking for specific camera""" + if camera_id in self.camera_trackers: + del self.camera_trackers[camera_id] + logger.info(f"Reset tracking for camera {camera_id}") + + def reset_all(self): + """Reset all camera trackers""" + self.camera_trackers.clear() + logger.info("Reset all camera trackers") \ No newline at end of file diff --git a/core/tracking/integration.py b/core/tracking/integration.py index a10acf8..3f1ebe0 100644 --- a/core/tracking/integration.py +++ b/core/tracking/integration.py @@ -63,7 +63,7 @@ class TrackingPipelineIntegration: self.pending_processing_data: Dict[str, Dict] = {} # display_id -> processing data (waiting for session ID) # Additional validators for enhanced flow control - self.permanently_processed: Dict[int, float] = {} # track_id -> process_time (never process again) + self.permanently_processed: Dict[str, float] = {} # "camera_id:track_id" -> process_time (never process again) self.progression_stages: Dict[str, str] = {} # session_id -> current_stage self.last_detection_time: Dict[str, float] = {} # display_id -> last_detection_timestamp self.abandonment_timeout = 3.0 # seconds to wait before declaring car abandoned @@ -183,7 +183,7 @@ class TrackingPipelineIntegration: # Run tracking model if self.tracking_model: - # Run inference with tracking + # Run detection-only (tracking handled by our own tracker) tracking_results = self.tracking_model.track( frame, confidence_threshold=self.tracker.min_confidence, @@ -486,7 +486,10 @@ class TrackingPipelineIntegration: self.session_vehicles[session_id] = track_id # Mark vehicle as permanently processed (won't process again even after session clear) - self.permanently_processed[track_id] = time.time() + # Use composite key to distinguish same track IDs across different cameras + camera_id = display_id # Using display_id as camera_id for isolation + permanent_key = f"{camera_id}:{track_id}" + self.permanently_processed[permanent_key] = time.time() # Remove from pending del self.pending_vehicles[display_id] @@ -667,6 +670,7 @@ class TrackingPipelineIntegration: self.executor.shutdown(wait=False) self.reset_tracking() + # Cleanup detection pipeline if self.detection_pipeline: self.detection_pipeline.cleanup() diff --git a/core/tracking/tracker.py b/core/tracking/tracker.py index 6fa6ed9..63d0299 100644 --- a/core/tracking/tracker.py +++ b/core/tracking/tracker.py @@ -1,6 +1,6 @@ """ -Vehicle Tracking Module - Continuous tracking with front_rear_detection model -Implements vehicle identification, persistence, and motion analysis. +Vehicle Tracking Module - BoT-SORT based tracking with camera isolation +Implements vehicle identification, persistence, and motion analysis using external tracker. """ import logging import time @@ -10,6 +10,8 @@ from dataclasses import dataclass, field import numpy as np from threading import Lock +from .bot_sort_tracker import MultiCameraBoTSORT + logger = logging.getLogger(__name__) @@ -17,6 +19,7 @@ logger = logging.getLogger(__name__) class TrackedVehicle: """Represents a tracked vehicle with all its state information.""" track_id: int + camera_id: str first_seen: float last_seen: float session_id: Optional[str] = None @@ -30,6 +33,8 @@ class TrackedVehicle: processed_pipeline: bool = False last_position_history: List[Tuple[float, float]] = field(default_factory=list) avg_confidence: float = 0.0 + hit_streak: int = 0 + age: int = 0 def update_position(self, bbox: Tuple[int, int, int, int], confidence: float): """Update vehicle position and confidence.""" @@ -73,7 +78,7 @@ class TrackedVehicle: class VehicleTracker: """ - Main vehicle tracking implementation using YOLO tracking capabilities. + Main vehicle tracking implementation using BoT-SORT with camera isolation. Manages continuous tracking, vehicle identification, and state persistence. """ @@ -88,18 +93,19 @@ class VehicleTracker: self.trigger_classes = self.config.get('trigger_classes', self.config.get('triggerClasses', ['frontal'])) self.min_confidence = self.config.get('minConfidence', 0.6) - # Tracking state - self.tracked_vehicles: Dict[int, TrackedVehicle] = {} - self.next_track_id = 1 + # BoT-SORT multi-camera tracker + self.bot_sort = MultiCameraBoTSORT(self.trigger_classes, self.min_confidence) + + # Tracking state - maintain compatibility with existing code + self.tracked_vehicles: Dict[str, Dict[int, TrackedVehicle]] = {} # camera_id -> {track_id: vehicle} self.lock = Lock() # Tracking parameters self.stability_threshold = 0.7 self.min_stable_frames = 5 - self.position_tolerance = 50 # pixels self.timeout_seconds = 2.0 - logger.info(f"VehicleTracker initialized with trigger_classes={self.trigger_classes}, " + logger.info(f"VehicleTracker initialized with BoT-SORT: trigger_classes={self.trigger_classes}, " f"min_confidence={self.min_confidence}") def process_detections(self, @@ -107,10 +113,10 @@ class VehicleTracker: display_id: str, frame: np.ndarray) -> List[TrackedVehicle]: """ - Process YOLO detection results and update tracking state. + Process detection results using BoT-SORT tracking. Args: - results: YOLO detection results with tracking + results: Detection results (InferenceResult) display_id: Display identifier for this stream frame: Current frame being processed @@ -118,108 +124,67 @@ class VehicleTracker: List of currently tracked vehicles """ current_time = time.time() - active_tracks = [] + + # Extract camera_id from display_id for tracking isolation + camera_id = display_id # Using display_id as camera_id for isolation with self.lock: - # Clean up expired tracks - expired_ids = [ - track_id for track_id, vehicle in self.tracked_vehicles.items() - if vehicle.is_expired(self.timeout_seconds) - ] - for track_id in expired_ids: - logger.debug(f"Removing expired track {track_id}") - del self.tracked_vehicles[track_id] + # Update BoT-SORT tracker + track_results = self.bot_sort.update(camera_id, results) - # Process new detections from InferenceResult - if hasattr(results, 'detections') and results.detections: - # Process detections from InferenceResult - for detection in results.detections: - # Skip if confidence is too low - if detection.confidence < self.min_confidence: - continue + # Ensure camera tracking dict exists + if camera_id not in self.tracked_vehicles: + self.tracked_vehicles[camera_id] = {} - # Check if class is in trigger classes - if detection.class_name not in self.trigger_classes: - continue + # Update tracked vehicles based on BoT-SORT results + current_tracks = {} + active_tracks = [] - # Use track_id if available, otherwise generate one - track_id = detection.track_id if detection.track_id is not None else self.next_track_id - if detection.track_id is None: - self.next_track_id += 1 + for track_result in track_results: + track_id = track_result['track_id'] - # Get bounding box from Detection object - x1, y1, x2, y2 = detection.bbox - bbox = (int(x1), int(y1), int(x2), int(y2)) + # Create or update TrackedVehicle + if track_id in self.tracked_vehicles[camera_id]: + # Update existing vehicle + vehicle = self.tracked_vehicles[camera_id][track_id] + vehicle.update_position(track_result['bbox'], track_result['confidence']) + vehicle.hit_streak = track_result['hit_streak'] + vehicle.age = track_result['age'] - # Update or create tracked vehicle - confidence = detection.confidence - if track_id in self.tracked_vehicles: - # Update existing track - vehicle = self.tracked_vehicles[track_id] - vehicle.update_position(bbox, confidence) - vehicle.display_id = display_id + # Update stability based on hit_streak + if vehicle.hit_streak >= self.min_stable_frames: + vehicle.is_stable = True + vehicle.stable_frames = vehicle.hit_streak - # Check stability - stability = vehicle.calculate_stability() - if stability > self.stability_threshold: - vehicle.stable_frames += 1 - if vehicle.stable_frames >= self.min_stable_frames: - vehicle.is_stable = True - else: - vehicle.stable_frames = max(0, vehicle.stable_frames - 1) - if vehicle.stable_frames < self.min_stable_frames: - vehicle.is_stable = False + logger.debug(f"Updated track {track_id}: conf={vehicle.confidence:.2f}, " + f"stable={vehicle.is_stable}, hit_streak={vehicle.hit_streak}") + else: + # Create new vehicle + x1, y1, x2, y2 = track_result['bbox'] + vehicle = TrackedVehicle( + track_id=track_id, + camera_id=camera_id, + first_seen=current_time, + last_seen=current_time, + display_id=display_id, + confidence=track_result['confidence'], + bbox=tuple(track_result['bbox']), + center=((x1 + x2) / 2, (y1 + y2) / 2), + total_frames=1, + hit_streak=track_result['hit_streak'], + age=track_result['age'] + ) + vehicle.last_position_history.append(vehicle.center) + logger.info(f"New vehicle tracked: ID={track_id}, camera={camera_id}, display={display_id}") - logger.debug(f"Updated track {track_id}: conf={confidence:.2f}, " - f"stable={vehicle.is_stable}, stability={stability:.2f}") - else: - # Create new track - vehicle = TrackedVehicle( - track_id=track_id, - first_seen=current_time, - last_seen=current_time, - display_id=display_id, - confidence=confidence, - bbox=bbox, - center=((x1 + x2) / 2, (y1 + y2) / 2), - total_frames=1 - ) - vehicle.last_position_history.append(vehicle.center) - self.tracked_vehicles[track_id] = vehicle - logger.info(f"New vehicle tracked: ID={track_id}, display={display_id}") + current_tracks[track_id] = vehicle + active_tracks.append(vehicle) - active_tracks.append(self.tracked_vehicles[track_id]) + # Update the camera's tracked vehicles + self.tracked_vehicles[camera_id] = current_tracks return active_tracks - def _find_closest_track(self, center: Tuple[float, float]) -> Optional[TrackedVehicle]: - """ - Find the closest existing track to a given position. - - Args: - center: Center position to match - - Returns: - Closest tracked vehicle if within tolerance, None otherwise - """ - min_distance = float('inf') - closest_track = None - - for vehicle in self.tracked_vehicles.values(): - if vehicle.is_expired(0.5): # Shorter timeout for matching - continue - - distance = np.sqrt( - (center[0] - vehicle.center[0]) ** 2 + - (center[1] - vehicle.center[1]) ** 2 - ) - - if distance < min_distance and distance < self.position_tolerance: - min_distance = distance - closest_track = vehicle - - return closest_track - def get_stable_vehicles(self, display_id: Optional[str] = None) -> List[TrackedVehicle]: """ Get all stable vehicles, optionally filtered by display. @@ -231,11 +196,15 @@ class VehicleTracker: List of stable tracked vehicles """ with self.lock: - stable = [ - v for v in self.tracked_vehicles.values() - if v.is_stable and not v.is_expired(self.timeout_seconds) - and (display_id is None or v.display_id == display_id) - ] + stable = [] + camera_id = display_id # Using display_id as camera_id + + if camera_id in self.tracked_vehicles: + for vehicle in self.tracked_vehicles[camera_id].values(): + if (vehicle.is_stable and not vehicle.is_expired(self.timeout_seconds) and + (display_id is None or vehicle.display_id == display_id)): + stable.append(vehicle) + return stable def get_vehicle_by_session(self, session_id: str) -> Optional[TrackedVehicle]: @@ -249,9 +218,11 @@ class VehicleTracker: Tracked vehicle if found, None otherwise """ with self.lock: - for vehicle in self.tracked_vehicles.values(): - if vehicle.session_id == session_id: - return vehicle + # Search across all cameras + for camera_vehicles in self.tracked_vehicles.values(): + for vehicle in camera_vehicles.values(): + if vehicle.session_id == session_id: + return vehicle return None def mark_processed(self, track_id: int, session_id: str): @@ -263,11 +234,14 @@ class VehicleTracker: session_id: Session ID assigned to this vehicle """ with self.lock: - if track_id in self.tracked_vehicles: - vehicle = self.tracked_vehicles[track_id] - vehicle.processed_pipeline = True - vehicle.session_id = session_id - logger.info(f"Marked vehicle {track_id} as processed with session {session_id}") + # Search across all cameras for the track_id + for camera_vehicles in self.tracked_vehicles.values(): + if track_id in camera_vehicles: + vehicle = camera_vehicles[track_id] + vehicle.processed_pipeline = True + vehicle.session_id = session_id + logger.info(f"Marked vehicle {track_id} as processed with session {session_id}") + return def clear_session(self, session_id: str): """ @@ -277,30 +251,43 @@ class VehicleTracker: session_id: Session ID to clear """ with self.lock: - for vehicle in self.tracked_vehicles.values(): - if vehicle.session_id == session_id: - logger.info(f"Clearing session {session_id} from vehicle {vehicle.track_id}") - vehicle.session_id = None - # Keep processed_pipeline=True to prevent re-processing + # Search across all cameras + for camera_vehicles in self.tracked_vehicles.values(): + for vehicle in camera_vehicles.values(): + if vehicle.session_id == session_id: + logger.info(f"Clearing session {session_id} from vehicle {vehicle.track_id}") + vehicle.session_id = None + # Keep processed_pipeline=True to prevent re-processing def reset_tracking(self): """Reset all tracking state.""" with self.lock: self.tracked_vehicles.clear() - self.next_track_id = 1 + self.bot_sort.reset_all() logger.info("Vehicle tracking state reset") def get_statistics(self) -> Dict: """Get tracking statistics.""" with self.lock: - total = len(self.tracked_vehicles) - stable = sum(1 for v in self.tracked_vehicles.values() if v.is_stable) - processed = sum(1 for v in self.tracked_vehicles.values() if v.processed_pipeline) + total = 0 + stable = 0 + processed = 0 + all_confidences = [] + + # Aggregate stats across all cameras + for camera_vehicles in self.tracked_vehicles.values(): + total += len(camera_vehicles) + for vehicle in camera_vehicles.values(): + if vehicle.is_stable: + stable += 1 + if vehicle.processed_pipeline: + processed += 1 + all_confidences.append(vehicle.avg_confidence) return { 'total_tracked': total, 'stable_vehicles': stable, 'processed_vehicles': processed, - 'avg_confidence': np.mean([v.avg_confidence for v in self.tracked_vehicles.values()]) - if self.tracked_vehicles else 0.0 + 'avg_confidence': np.mean(all_confidences) if all_confidences else 0.0, + 'bot_sort_stats': self.bot_sort.get_statistics() } \ No newline at end of file diff --git a/core/tracking/validator.py b/core/tracking/validator.py index d90d4ec..d86a3f6 100644 --- a/core/tracking/validator.py +++ b/core/tracking/validator.py @@ -36,8 +36,14 @@ class ValidationResult: class StableCarValidator: """ - Validates whether a tracked vehicle is stable (fueling) or just passing by. - Uses multiple criteria including position stability, duration, and movement patterns. + Validates whether a tracked vehicle should be processed through the pipeline. + + Updated for BoT-SORT integration: Trusts the sophisticated BoT-SORT tracking algorithm + for stability determination and focuses on business logic validation: + - Duration requirements for processing + - Confidence thresholds + - Session management and cooldowns + - Camera isolation with composite keys """ def __init__(self, config: Optional[Dict] = None): @@ -169,7 +175,10 @@ class StableCarValidator: def _determine_vehicle_state(self, vehicle: TrackedVehicle) -> VehicleState: """ - Determine the current state of the vehicle based on movement patterns. + Determine the current state of the vehicle based on BoT-SORT tracking results. + + BoT-SORT provides sophisticated tracking, so we trust its stability determination + and focus on business logic validation. Args: vehicle: The tracked vehicle @@ -177,53 +186,44 @@ class StableCarValidator: Returns: Current vehicle state """ - # Not enough data - if len(vehicle.last_position_history) < 3: - return VehicleState.UNKNOWN - - # Calculate velocity - velocity = self._calculate_velocity(vehicle) - - # Get position zones - x_position = vehicle.center[0] / self.frame_width - y_position = vehicle.center[1] / self.frame_height - - # Check if vehicle is stable - stability = vehicle.calculate_stability() - if stability > 0.7 and velocity < self.velocity_threshold: - # Check if it's been stable long enough + # Trust BoT-SORT's stability determination + if vehicle.is_stable: + # Check if it's been stable long enough for processing duration = time.time() - vehicle.first_seen - if duration > self.min_stable_duration and vehicle.stable_frames >= self.min_stable_frames: + if duration >= self.min_stable_duration: return VehicleState.STABLE else: return VehicleState.ENTERING - # Check if vehicle is entering or leaving + # For non-stable vehicles, use simplified state determination + if len(vehicle.last_position_history) < 2: + return VehicleState.UNKNOWN + + # Calculate velocity for movement classification + velocity = self._calculate_velocity(vehicle) + + # Basic movement classification if velocity > self.velocity_threshold: - # Determine direction based on position history - positions = np.array(vehicle.last_position_history) - if len(positions) >= 2: - direction = positions[-1] - positions[0] + # Vehicle is moving - classify as passing by or entering/leaving + x_position = vehicle.center[0] / self.frame_width - # Entering: moving towards center - if x_position < self.entering_zone_ratio or x_position > (1 - self.entering_zone_ratio): - if abs(direction[0]) > abs(direction[1]): # Horizontal movement - if (x_position < 0.5 and direction[0] > 0) or (x_position > 0.5 and direction[0] < 0): - return VehicleState.ENTERING + # Simple heuristic: vehicles near edges are entering/leaving, center vehicles are passing + if x_position < 0.2 or x_position > 0.8: + return VehicleState.ENTERING + else: + return VehicleState.PASSING_BY - # Leaving: moving away from center - if 0.3 < x_position < 0.7: # In center zone - if abs(direction[0]) > abs(direction[1]): # Horizontal movement - if abs(direction[0]) > 10: # Significant movement - return VehicleState.LEAVING - - return VehicleState.PASSING_BY - - return VehicleState.UNKNOWN + # Low velocity but not marked stable by tracker - likely entering + return VehicleState.ENTERING def _validate_stable_vehicle(self, vehicle: TrackedVehicle) -> ValidationResult: """ - Perform detailed validation of a stable vehicle. + Perform business logic validation of a stable vehicle. + + Since BoT-SORT already determined the vehicle is stable, we focus on: + - Duration requirements for processing + - Confidence thresholds + - Business logic constraints Args: vehicle: The stable vehicle to validate @@ -231,7 +231,7 @@ class StableCarValidator: Returns: Detailed validation result """ - # Check duration + # Check duration (business requirement) duration = time.time() - vehicle.first_seen if duration < self.min_stable_duration: return ValidationResult( @@ -243,18 +243,7 @@ class StableCarValidator: track_id=vehicle.track_id ) - # Check frame count - if vehicle.stable_frames < self.min_stable_frames: - return ValidationResult( - is_valid=False, - state=VehicleState.STABLE, - confidence=0.6, - reason=f"Not enough stable frames ({vehicle.stable_frames} < {self.min_stable_frames})", - should_process=False, - track_id=vehicle.track_id - ) - - # Check confidence + # Check confidence (business requirement) if vehicle.avg_confidence < self.min_confidence: return ValidationResult( is_valid=False, @@ -265,28 +254,19 @@ class StableCarValidator: track_id=vehicle.track_id ) - # Check position variance - variance = self._calculate_position_variance(vehicle) - if variance > self.position_variance_threshold: - return ValidationResult( - is_valid=False, - state=VehicleState.STABLE, - confidence=0.7, - reason=f"Position variance too high ({variance:.1f} > {self.position_variance_threshold})", - should_process=False, - track_id=vehicle.track_id - ) + # Trust BoT-SORT's stability determination - skip position variance check + # BoT-SORT's sophisticated tracking already ensures consistent positioning - # Check state history consistency + # Simplified state history check - just ensure recent stability if vehicle.track_id in self.validation_history: - history = self.validation_history[vehicle.track_id][-5:] # Last 5 states + history = self.validation_history[vehicle.track_id][-3:] # Last 3 states stable_count = sum(1 for s in history if s == VehicleState.STABLE) - if stable_count < 3: + if len(history) >= 2 and stable_count == 0: # Only fail if clear instability return ValidationResult( is_valid=False, state=VehicleState.STABLE, confidence=0.7, - reason="Inconsistent state history", + reason="Recent state history shows instability", should_process=False, track_id=vehicle.track_id ) @@ -298,7 +278,7 @@ class StableCarValidator: is_valid=True, state=VehicleState.STABLE, confidence=vehicle.avg_confidence, - reason="Vehicle is stable and ready for processing", + reason="Vehicle is stable and ready for processing (BoT-SORT validated)", should_process=True, track_id=vehicle.track_id ) @@ -354,25 +334,28 @@ class StableCarValidator: def should_skip_same_car(self, vehicle: TrackedVehicle, session_cleared: bool = False, - permanently_processed: Dict[int, float] = None) -> bool: + permanently_processed: Dict[str, float] = None) -> bool: """ Determine if we should skip processing for the same car after session clear. Args: vehicle: The tracked vehicle session_cleared: Whether the session was recently cleared - permanently_processed: Dict of permanently processed vehicles + permanently_processed: Dict of permanently processed vehicles (camera_id:track_id -> time) Returns: True if we should skip this vehicle """ # Check if this vehicle was permanently processed (never process again) - if permanently_processed and vehicle.track_id in permanently_processed: - process_time = permanently_processed[vehicle.track_id] - time_since = time.time() - process_time - logger.debug(f"Skipping permanently processed vehicle {vehicle.track_id} " - f"(processed {time_since:.1f}s ago)") - return True + if permanently_processed: + # Create composite key using camera_id and track_id + permanent_key = f"{vehicle.camera_id}:{vehicle.track_id}" + if permanent_key in permanently_processed: + process_time = permanently_processed[permanent_key] + time_since = time.time() - process_time + logger.debug(f"Skipping permanently processed vehicle {vehicle.track_id} on camera {vehicle.camera_id} " + f"(processed {time_since:.1f}s ago)") + return True # If vehicle has a session_id but it was cleared, skip for a period if vehicle.session_id is None and vehicle.processed_pipeline and session_cleared: