From d3d9c426f89e5d24c8613949671a21f74eb47ff2 Mon Sep 17 00:00:00 2001 From: ziesorx Date: Tue, 23 Sep 2025 21:21:27 +0700 Subject: [PATCH] feat: validation with tracking works --- core/streaming/readers.py | 177 ++++++++++++++++++++++++++++++----- core/tracking/integration.py | 63 +++++++------ 2 files changed, 187 insertions(+), 53 deletions(-) diff --git a/core/streaming/readers.py b/core/streaming/readers.py index 3064886..f2da909 100644 --- a/core/streaming/readers.py +++ b/core/streaming/readers.py @@ -52,38 +52,46 @@ class RTSPReader: logger.info(f"Stopped RTSP reader for camera {self.camera_id}") def _read_frames(self): - """Main frame reading loop.""" + """Main frame reading loop with improved error handling and stream recovery.""" retries = 0 frame_count = 0 last_log_time = time.time() + consecutive_errors = 0 + last_successful_frame_time = time.time() try: - # Initialize video capture - self.cap = cv2.VideoCapture(self.rtsp_url) - - # Set buffer size to 1 to get latest frames - self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) - - if self.cap.isOpened(): - width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) - height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) - fps = self.cap.get(cv2.CAP_PROP_FPS) - logger.info(f"Camera {self.camera_id} opened: {width}x{height}, FPS: {fps}") - else: - logger.error(f"Camera {self.camera_id} failed to open initially") + # Initialize video capture with optimized parameters + self._initialize_capture() while not self.stop_event.is_set(): try: - if not self.cap.isOpened(): - logger.error(f"Camera {self.camera_id} not open, attempting to reopen") - self.cap.open(self.rtsp_url) + # Check if stream needs recovery + if not self.cap or not self.cap.isOpened(): + logger.warning(f"Camera {self.camera_id} not open, reinitializing") + self._initialize_capture() time.sleep(1) continue + # Check for stream timeout (no frames for 30 seconds) + if time.time() - last_successful_frame_time > 30: + logger.warning(f"Camera {self.camera_id} stream timeout, reinitializing") + self._initialize_capture() + last_successful_frame_time = time.time() + continue + ret, frame = self.cap.read() if not ret or frame is None: - logger.warning(f"Failed to read frame from camera {self.camera_id}") + consecutive_errors += 1 + logger.warning(f"Failed to read frame from camera {self.camera_id} (consecutive errors: {consecutive_errors})") + + # Force stream recovery after multiple consecutive errors + if consecutive_errors >= 5: + logger.warning(f"Camera {self.camera_id}: Too many consecutive errors, reinitializing stream") + self._initialize_capture() + consecutive_errors = 0 + continue + retries += 1 if retries > self.max_retries and self.max_retries != -1: logger.error(f"Max retries reached for camera {self.camera_id}") @@ -91,9 +99,21 @@ class RTSPReader: time.sleep(0.1) continue - # Reset retry counter on successful read + # Skip frame validation for now - let YOLO handle corrupted frames + # if not self._is_frame_valid(frame): + # logger.debug(f"Invalid frame detected for camera {self.camera_id}, skipping") + # consecutive_errors += 1 + # if consecutive_errors >= 10: # Reinitialize after many invalid frames + # logger.warning(f"Camera {self.camera_id}: Too many invalid frames, reinitializing") + # self._initialize_capture() + # consecutive_errors = 0 + # continue + + # Reset counters on successful read retries = 0 + consecutive_errors = 0 frame_count += 1 + last_successful_frame_time = time.time() # Call frame callback if set if self.frame_callback: @@ -102,15 +122,40 @@ class RTSPReader: # Log progress every 30 seconds current_time = time.time() if current_time - last_log_time >= 30: - logger.info(f"Camera {self.camera_id}: {frame_count} frames processed") + logger.info(f"Camera {self.camera_id}: {frame_count} frames processed, {consecutive_errors} consecutive errors") last_log_time = current_time - # Small delay to prevent CPU overload - time.sleep(0.033) # ~30 FPS + # Adaptive delay based on stream FPS and performance + if consecutive_errors == 0: + # Calculate frame delay based on actual FPS + try: + actual_fps = self.cap.get(cv2.CAP_PROP_FPS) + if actual_fps > 0 and actual_fps <= 120: # Reasonable bounds + delay = 1.0 / actual_fps + # Mock cam: 60fps -> ~16.7ms delay + # Real cam: 6fps -> ~167ms delay + else: + # Fallback for invalid FPS values + delay = 0.033 # Default 30 FPS (33ms) + except Exception as e: + logger.debug(f"Failed to get FPS for delay calculation: {e}") + delay = 0.033 # Fallback to 30 FPS + else: + delay = 0.1 # Slower when having issues (100ms) + + time.sleep(delay) except Exception as e: logger.error(f"Error reading frame from camera {self.camera_id}: {e}") + consecutive_errors += 1 retries += 1 + + # Force reinitialization on severe errors + if consecutive_errors >= 3: + logger.warning(f"Camera {self.camera_id}: Severe errors detected, reinitializing stream") + self._initialize_capture() + consecutive_errors = 0 + if retries > self.max_retries and self.max_retries != -1: break time.sleep(1) @@ -122,6 +167,94 @@ class RTSPReader: self.cap.release() logger.info(f"RTSP reader thread ended for camera {self.camera_id}") + def _initialize_capture(self): + """Initialize or reinitialize video capture with optimized settings.""" + try: + # Release previous capture if exists + if self.cap: + self.cap.release() + time.sleep(0.1) + + # Create new capture with enhanced RTSP URL parameters + enhanced_url = self._enhance_rtsp_url(self.rtsp_url) + logger.debug(f"Initializing capture for camera {self.camera_id} with URL: {enhanced_url}") + + self.cap = cv2.VideoCapture(enhanced_url) + + if not self.cap.isOpened(): + # Try again with different backend + logger.debug(f"Retrying capture initialization with different backend for camera {self.camera_id}") + self.cap = cv2.VideoCapture(enhanced_url, cv2.CAP_FFMPEG) + + if self.cap.isOpened(): + # Get actual stream properties first + width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + fps = self.cap.get(cv2.CAP_PROP_FPS) + + # Adaptive buffer settings based on FPS and resolution + # Mock cam: 1920x1080@60fps, Real cam: 1280x720@6fps + if fps > 30: + # High FPS streams (like mock cam) need larger buffer + buffer_size = 5 + elif fps > 15: + # Medium FPS streams + buffer_size = 3 + else: + # Low FPS streams (like real cam) can use smaller buffer + buffer_size = 2 + + # Apply buffer size with bounds checking + try: + self.cap.set(cv2.CAP_PROP_BUFFERSIZE, buffer_size) + actual_buffer = int(self.cap.get(cv2.CAP_PROP_BUFFERSIZE)) + logger.debug(f"Camera {self.camera_id}: Buffer size set to {buffer_size}, actual: {actual_buffer}") + except Exception as e: + logger.warning(f"Failed to set buffer size for camera {self.camera_id}: {e}") + + # Don't override FPS - let stream use its natural rate + # This works for both mock cam (60fps) and real cam (6fps) + logger.debug(f"Camera {self.camera_id}: Using native FPS {fps}") + + # Additional optimization for high resolution streams + if width * height > 1920 * 1080: + logger.info(f"Camera {self.camera_id}: High resolution stream detected, applying optimizations") + + logger.info(f"Camera {self.camera_id} initialized: {width}x{height}, FPS: {fps}, Buffer: {buffer_size}") + return True + else: + logger.error(f"Failed to initialize camera {self.camera_id}") + return False + + except Exception as e: + logger.error(f"Error initializing capture for camera {self.camera_id}: {e}") + return False + + def _enhance_rtsp_url(self, rtsp_url: str) -> str: + """Use RTSP URL exactly as provided by backend without modification.""" + return rtsp_url + + def _is_frame_valid(self, frame) -> bool: + """Validate frame integrity to detect corrupted frames.""" + if frame is None: + return False + + # Check frame dimensions + if frame.shape[0] < 10 or frame.shape[1] < 10: + return False + + # Check if frame is completely black or completely white (possible corruption) + mean_val = np.mean(frame) + if mean_val < 1 or mean_val > 254: + return False + + # Check for excessive noise/corruption (very high standard deviation) + std_val = np.std(frame) + if std_val > 100: # Threshold for detecting very noisy/corrupted frames + return False + + return True + class HTTPSnapshotReader: """HTTP snapshot reader for periodic image capture.""" diff --git a/core/tracking/integration.py b/core/tracking/integration.py index 957e8a9..ccddab7 100644 --- a/core/tracking/integration.py +++ b/core/tracking/integration.py @@ -50,6 +50,7 @@ class TrackingPipelineIntegration: self.active_sessions: Dict[str, str] = {} # display_id -> session_id self.session_vehicles: Dict[str, int] = {} # session_id -> track_id self.cleared_sessions: Dict[str, float] = {} # session_id -> clear_time + self.pending_vehicles: Dict[str, int] = {} # display_id -> track_id (waiting for session ID) # Thread pool for pipeline execution self.executor = ThreadPoolExecutor(max_workers=2) @@ -64,8 +65,6 @@ class TrackingPipelineIntegration: # Test mode for mock detection self.test_mode = True - self.test_detection_sent = False - self.start_time = time.time() logger.info("TrackingPipelineIntegration initialized") @@ -225,30 +224,26 @@ class TrackingPipelineIntegration: 'confidence': validation_result.confidence } - # Generate session ID if not provided - if not session_id: - session_id = str(uuid.uuid4()) - logger.info(f"Generated session ID: {session_id}") + # Send mock image detection message in test mode + # Note: Backend will generate and send back session ID via setSessionId + if self.test_mode: + await self._send_mock_detection(subscription_id, None) - # Mark vehicle as processed - self.tracker.mark_processed(vehicle.track_id, session_id) - self.session_vehicles[session_id] = vehicle.track_id - self.active_sessions[display_id] = session_id - - # Send mock image detection message as per worker.md specification - await self._send_mock_detection(subscription_id, session_id) + # Mark vehicle as pending session ID assignment + self.pending_vehicles[display_id] = vehicle.track_id + logger.info(f"Vehicle {vehicle.track_id} waiting for session ID from backend") # Execute detection pipeline (placeholder for Phase 5) pipeline_result = await self._execute_pipeline( frame, vehicle, display_id, - session_id, + None, # No session ID yet subscription_id ) result['pipeline_result'] = pipeline_result - result['session_id'] = session_id + # No session_id in result yet - backend will provide it self.stats['pipelines_executed'] += 1 # Only process one vehicle per frame @@ -263,12 +258,6 @@ class TrackingPipelineIntegration: except Exception as e: logger.error(f"Error in tracking pipeline: {e}", exc_info=True) - # TEST MODE: Send mock detection after 10 seconds to test WebSocket communication - if self.test_mode and not self.test_detection_sent and (time.time() - self.start_time) > 10: - self.test_detection_sent = True - test_session_id = f"test-session-{int(time.time())}" - logger.info(f"[TEST MODE] Triggering mock detection with session {test_session_id}") - await self._send_mock_detection(subscription_id, test_session_id) result['processing_time'] = time.time() - start_time return result @@ -326,12 +315,12 @@ class TrackingPipelineIntegration: # Create flat detection data as required by the model detection_data = { - "carModel": "Civic", - "carBrand": "Honda", - "carYear": 2023, - "bodyType": "Sedan", - "licensePlateText": "MOCK123", - "licensePlateConfidence": 0.95 + "carModel": None, + "carBrand": None, + "carYear": None, + "bodyType": None, + "licensePlateText": None, + "licensePlateConfidence": None } # Get model info @@ -364,6 +353,7 @@ class TrackingPipelineIntegration: def set_session_id(self, display_id: str, session_id: str): """ Set session ID for a display (from backend). + This is called when backend sends setSessionId after receiving imageDetection. Args: display_id: Display identifier @@ -372,10 +362,20 @@ class TrackingPipelineIntegration: self.active_sessions[display_id] = session_id logger.info(f"Set session {session_id} for display {display_id}") - # Find vehicle with this session - vehicle = self.tracker.get_vehicle_by_session(session_id) - if vehicle: - self.session_vehicles[session_id] = vehicle.track_id + # Check if we have a pending vehicle for this display + if display_id in self.pending_vehicles: + track_id = self.pending_vehicles[display_id] + + # Mark vehicle as processed with the session ID + self.tracker.mark_processed(track_id, session_id) + self.session_vehicles[session_id] = track_id + + # Remove from pending + del self.pending_vehicles[display_id] + + logger.info(f"Assigned session {session_id} to vehicle {track_id}, marked as processed") + else: + logger.warning(f"No pending vehicle found for display {display_id} when setting session {session_id}") def clear_session_id(self, session_id: str): """ @@ -424,6 +424,7 @@ class TrackingPipelineIntegration: self.active_sessions.clear() self.session_vehicles.clear() self.cleared_sessions.clear() + self.pending_vehicles.clear() logger.info("Tracking pipeline integration reset") def get_statistics(self) -> Dict[str, Any]: