From 0dd1b9f5c2682964581fe2b027eb639651b59831 Mon Sep 17 00:00:00 2001
From: ziesorx
Date: Mon, 20 Oct 2025 17:54:50 +0700
Subject: [PATCH 1/3] fix: send partial results

---
 core/detection/branches.py | 95 ++++++++++++++++++++++++++++++--------
 core/detection/pipeline.py | 79 +++++++++++++++++++++++++++++++
 2 files changed, 155 insertions(+), 19 deletions(-)

diff --git a/core/detection/branches.py b/core/detection/branches.py
index 97c44ff..61b6dbb 100644
--- a/core/detection/branches.py
+++ b/core/detection/branches.py
@@ -45,12 +45,17 @@ class BranchProcessor:
         self.redis_manager = None
         self.db_manager = None
 
+        # Branch execution timeout (seconds)
+        self.branch_timeout = 30.0
+
         # Statistics
         self.stats = {
             'branches_processed': 0,
             'parallel_executions': 0,
             'total_processing_time': 0.0,
-            'models_loaded': 0
+            'models_loaded': 0,
+            'branches_timed_out': 0,
+            'branches_failed': 0
         }
 
         logger.info("BranchProcessor initialized")
@@ -279,22 +284,46 @@
                 )
                 future_to_branch[future] = branch
 
-            # Collect results as they complete
-            for future in as_completed(future_to_branch):
-                branch = future_to_branch[future]
-                branch_id = getattr(branch, 'model_id', 'unknown')
+            # Collect results as they complete with timeout
+            try:
+                for future in as_completed(future_to_branch, timeout=self.branch_timeout):
+                    branch = future_to_branch[future]
+                    branch_id = getattr(branch, 'model_id', 'unknown')
 
-                try:
-                    result = future.result()
-                    results[branch_id] = result
-                    logger.info(f"[PARALLEL COMPLETE] {branch_id}: Branch completed successfully")
-                except Exception as e:
-                    logger.error(f"Error in parallel branch {branch_id}: {e}")
-                    results[branch_id] = {
-                        'status': 'error',
-                        'message': str(e),
-                        'processing_time': 0.0
-                    }
+                    try:
+                        # Get result with timeout to prevent indefinite hanging
+                        result = future.result(timeout=self.branch_timeout)
+                        results[branch_id] = result
+                        logger.info(f"[PARALLEL COMPLETE] {branch_id}: Branch completed successfully")
+                    except TimeoutError:
+                        logger.error(f"[TIMEOUT] Branch {branch_id} exceeded timeout of {self.branch_timeout}s")
+                        self.stats['branches_timed_out'] += 1
+                        results[branch_id] = {
+                            'status': 'timeout',
+                            'message': f'Branch execution timeout after {self.branch_timeout}s',
+                            'processing_time': self.branch_timeout
+                        }
+                    except Exception as e:
+                        logger.error(f"[ERROR] Error in parallel branch {branch_id}: {e}", exc_info=True)
+                        self.stats['branches_failed'] += 1
+                        results[branch_id] = {
+                            'status': 'error',
+                            'message': str(e),
+                            'processing_time': 0.0
+                        }
+            except TimeoutError:
+                # as_completed iterator timed out - mark remaining futures as timed out
+                logger.error(f"[TIMEOUT] Branch execution timeout after {self.branch_timeout}s - some branches did not complete")
+                for future, branch in future_to_branch.items():
+                    branch_id = getattr(branch, 'model_id', 'unknown')
+                    if branch_id not in results:
+                        logger.error(f"[TIMEOUT] Branch {branch_id} did not complete within timeout")
+                        self.stats['branches_timed_out'] += 1
+                        results[branch_id] = {
+                            'status': 'timeout',
+                            'message': f'Branch did not complete within {self.branch_timeout}s timeout',
+                            'processing_time': self.branch_timeout
+                        }
 
         # Flatten nested branch results to top level for database access
         flattened_results = {}
@@ -309,6 +338,24 @@
                         flattened_results[nested_branch_id] = nested_result
                         logger.info(f"[FLATTEN] Added nested branch {nested_branch_id} to top-level results")
 
+        # Log summary of branch execution results
+        succeeded = [bid for bid, res in results.items() if res.get('status') == 'success']
+        failed = [bid for bid, res in results.items() if res.get('status') == 'error']
+        timed_out = [bid for bid, res in results.items() if res.get('status') == 'timeout']
+        skipped = [bid for bid, res in results.items() if res.get('status') == 'skipped']
+
+        summary_parts = []
+        if succeeded:
+            summary_parts.append(f"{len(succeeded)} succeeded: {', '.join(succeeded)}")
+        if failed:
+            summary_parts.append(f"{len(failed)} FAILED: {', '.join(failed)}")
+        if timed_out:
+            summary_parts.append(f"{len(timed_out)} TIMED OUT: {', '.join(timed_out)}")
+        if skipped:
+            summary_parts.append(f"{len(skipped)} skipped: {', '.join(skipped)}")
+
+        logger.info(f"[PARALLEL SUMMARY] Branch execution completed: {' | '.join(summary_parts) if summary_parts else 'no branches'}")
+
         return flattened_results
 
     async def _execute_sequential_branches(self,
@@ -496,9 +543,19 @@
 
             # Use .predict() method for both detection and classification models
             inference_start = time.time()
-            detection_results = model.model.predict(input_frame, conf=min_confidence, verbose=False)
-            inference_time = time.time() - inference_start
-            logger.info(f"[INFERENCE DONE] {branch_id}: Predict completed in {inference_time:.3f}s using .predict() method")
+            try:
+                detection_results = model.model.predict(input_frame, conf=min_confidence, verbose=False)
+                inference_time = time.time() - inference_start
+                logger.info(f"[INFERENCE DONE] {branch_id}: Predict completed in {inference_time:.3f}s using .predict() method")
+            except Exception as inference_error:
+                inference_time = time.time() - inference_start
+                logger.error(f"[INFERENCE ERROR] {branch_id}: Model inference failed after {inference_time:.3f}s: {inference_error}", exc_info=True)
+                return {
+                    'status': 'error',
+                    'branch_id': branch_id,
+                    'message': f'Model inference failed: {str(inference_error)}',
+                    'processing_time': time.time() - start_time
+                }
 
             # Initialize branch_detections outside the conditional
             branch_detections = []

diff --git a/core/detection/pipeline.py b/core/detection/pipeline.py
index 78001da..d71f525 100644
--- a/core/detection/pipeline.py
+++ b/core/detection/pipeline.py
@@ -445,6 +445,78 @@
         except Exception as e:
             logger.error(f"Error sending initial detection imageDetection message: {e}", exc_info=True)
 
+    async def _send_classification_results(self, subscription_id: str, session_id: str, branch_results: Dict[str, Any]):
+        """
+        Send imageDetection message with classification results (without license plate).
+        Called after processing phase completes to send partial results immediately.
+
+        Args:
+            subscription_id: Subscription identifier to send message to
+            session_id: Session identifier
+            branch_results: Dictionary of branch execution results
+        """
+        try:
+            if not self.message_sender:
+                logger.warning("No message sender configured, cannot send imageDetection")
+                return
+
+            # Import here to avoid circular imports
+            from ..communication.models import ImageDetectionMessage, DetectionData
+
+            # Extract classification fields from branch results
+            extracted_fields = self._extract_fields_from_branches(branch_results)
+            car_brand = extracted_fields.get('brand')
+            body_type = extracted_fields.get('body_type')
+
+            # Log what we're sending
+            fields_status = []
+            if car_brand is not None:
+                fields_status.append(f"brand={car_brand}")
+            else:
+                fields_status.append("brand=null")
+            if body_type is not None:
+                fields_status.append(f"bodyType={body_type}")
+            else:
+                fields_status.append("bodyType=null")
+            logger.info(f"[CLASSIFICATION] Sending partial results for session {session_id}: {', '.join(fields_status)}")
+
+            # Create detection data with classification results (license plate still pending)
+            detection_data_obj = DetectionData(
+                detection={
+                    "carBrand": car_brand,
+                    "carModel": None,  # Not implemented yet
+                    "bodyType": body_type,
+                    "licensePlateText": None,  # Will be sent later via license plate callback
+                    "licensePlateConfidence": None
+                },
+                modelId=self.model_id,
+                modelName=self.pipeline_parser.pipeline_config.model_id if self.pipeline_parser.pipeline_config else "detection_model"
+            )
+
+            # Create imageDetection message
+            detection_message = ImageDetectionMessage(
+                subscriptionIdentifier=subscription_id,
+                data=detection_data_obj
+            )
+
+            # Send message
+            await self.message_sender(detection_message)
+
+            # Log with indication of partial results
+            null_fields = []
+            if car_brand is None:
+                null_fields.append('brand')
+            if body_type is None:
+                null_fields.append('bodyType')
+
+            if null_fields:
+                logger.info(f"[PARTIAL RESULTS] Sent imageDetection with PARTIAL results (null: {', '.join(null_fields)}) - brand='{car_brand}', bodyType='{body_type}' to '{subscription_id}'")
+            else:
+                logger.info(f"[CLASSIFICATION COMPLETE] Sent imageDetection with brand='{car_brand}', bodyType='{body_type}' to '{subscription_id}'")
+
+        except Exception as e:
+            logger.error(f"Error sending classification results imageDetection message: {e}", exc_info=True)
+
     async def execute_detection_phase(self,
                                       frame: np.ndarray,
                                       display_id: str,
@@ -693,6 +765,13 @@
                     self.session_processing_results[session_id] = result['branch_results']
                     logger.info(f"[PROCESSING RESULTS] Stored results for session {session_id} for later combination")
 
+                # Send classification results immediately (license plate will come later via callback)
+                await self._send_classification_results(
+                    subscription_id=subscription_id,
+                    session_id=session_id,
+                    branch_results=result['branch_results']
+                )
+
             logger.info(f"Processing phase completed for session {session_id}: "
                         f"{len(result['branch_results'])} branches, {len(result['actions_executed'])} actions")
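
[Editor's note on PATCH 1/3] One portability caveat in the timeout handling: before Python 3.11, concurrent.futures.TimeoutError is a distinct class from the builtin TimeoutError (the two only became aliases in 3.11). Both Future.result(timeout=...) and the as_completed(...) iterator raise the futures variant, so on 3.10 and earlier the bare `except TimeoutError:` clauses above will not match: the inner timeout falls through to `except Exception` and is counted as 'error' rather than 'timeout', and the outer one propagates out of the collection loop. Note also that a timed-out branch is abandoned, not cancelled; its worker thread keeps running. Below is a minimal, version-proof sketch of the same collect-with-deadline pattern; the helper name and result shape are illustrative, not taken from the codebase.

    from concurrent.futures import ThreadPoolExecutor, as_completed
    from concurrent.futures import TimeoutError as FuturesTimeoutError
    import time

    def collect_with_deadline(executor, work, timeout=30.0):
        """Submit work (dict of task_id -> callable) and classify each task
        as success / error / timeout. Timed-out tasks are not cancelled."""
        futures = {executor.submit(fn): task_id for task_id, fn in work.items()}
        results = {}
        try:
            for future in as_completed(futures, timeout=timeout):
                task_id = futures[future]
                try:
                    # as_completed only yields finished futures, so result()
                    # returns immediately and needs no timeout of its own.
                    results[task_id] = {'status': 'success', 'value': future.result()}
                except Exception as e:
                    results[task_id] = {'status': 'error', 'message': str(e)}
        except FuturesTimeoutError:  # also matches on Python 3.10 and earlier
            # Deadline hit: whatever has no result yet is marked as timed out.
            for future, task_id in futures.items():
                results.setdefault(task_id, {'status': 'timeout'})
        return results

    with ThreadPoolExecutor(max_workers=2) as pool:
        out = collect_with_deadline(
            pool,
            {'fast': lambda: 42, 'slow': lambda: time.sleep(5)},
            timeout=1.0,
        )
        # out == {'fast': {'status': 'success', 'value': 42},
        #         'slow': {'status': 'timeout'}}
        # (the with-block exit still waits for the slow worker to finish)
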
From f495b47a9611a4058746212ef577e367ac948ace Mon Sep 17 00:00:00 2001
From: ziesorx
Date: Mon, 20 Oct 2025 18:04:23 +0700
Subject: [PATCH 2/3] fix: increase tracking thread pool size and lower
 tracking min confidence

---
 core/tracking/integration.py |  3 ++-
 core/tracking/validator.py   | 11 ++++++++---
 2 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/core/tracking/integration.py b/core/tracking/integration.py
index 1e3fc97..6ff2ee7 100644
--- a/core/tracking/integration.py
+++ b/core/tracking/integration.py
@@ -70,7 +70,8 @@ class TrackingPipelineIntegration:
         self.abandonment_timeout = 3.0  # seconds to wait before declaring car abandoned
 
         # Thread pool for pipeline execution
-        self.executor = ThreadPoolExecutor(max_workers=2)
+        # Increased to 8 workers to handle 8 concurrent cameras without queuing
+        self.executor = ThreadPoolExecutor(max_workers=8)
 
         # Min bbox filtering configuration
         # TODO: Make this configurable via pipeline.json in the future
diff --git a/core/tracking/validator.py b/core/tracking/validator.py
index d86a3f6..0c1dca4 100644
--- a/core/tracking/validator.py
+++ b/core/tracking/validator.py
@@ -56,10 +56,15 @@ class StableCarValidator:
         self.config = config or {}
 
         # Validation thresholds
-        self.min_stable_duration = self.config.get('min_stable_duration', 3.0)  # seconds
-        self.min_stable_frames = self.config.get('min_stable_frames', 10)
+        # Optimized for 6 FPS RTSP source with 8 concurrent cameras on GPU
+        # GPU contention reduces effective FPS to ~3-5 per camera
+        # Reduced from 3.0s to 1.5s to achieve ~2.75s total validation time (was ~4.25s)
+        self.min_stable_duration = self.config.get('min_stable_duration', 1.5)  # seconds
+        # Reduced from 10 to 5 to align with tracker requirement and reduce validation time
+        self.min_stable_frames = self.config.get('min_stable_frames', 5)
         self.position_variance_threshold = self.config.get('position_variance_threshold', 25.0)  # pixels
-        self.min_confidence = self.config.get('min_confidence', 0.7)
+        # Reduced from 0.7 to 0.45 to be more permissive under GPU load
+        self.min_confidence = self.config.get('min_confidence', 0.45)
         self.velocity_threshold = self.config.get('velocity_threshold', 5.0)  # pixels/frame
         self.entering_zone_ratio = self.config.get('entering_zone_ratio', 0.3)  # 30% of frame
         self.leaving_zone_ratio = self.config.get('leaving_zone_ratio', 0.3)
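
[Editor's note on PATCH 2/3] All of these thresholds are read through `self.config.get(...)`, so the new, more permissive defaults can still be overridden per deployment instead of editing the source again. A sketch of such an override, assuming the validator is constructed with a plain config dict as the `config or {}` line implies; the key names mirror the lookups above, but the values are illustrative, not recommendations:

    from core.tracking.validator import StableCarValidator

    # Hypothetical site-specific tuning: a quiet camera with stable lighting
    # can afford stricter validation than the GPU-contended defaults.
    validator = StableCarValidator(config={
        'min_stable_duration': 2.0,           # seconds a track must persist
        'min_stable_frames': 8,               # qualifying frames before "stable"
        'min_confidence': 0.6,                # per-frame detection confidence floor
        'position_variance_threshold': 25.0,  # pixels
        'velocity_threshold': 5.0,            # pixels/frame
    })

Lowering min_confidence to 0.45 trades precision for recall; the stability window (duration plus frame count) is what keeps one-off false positives from validating.
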
From 0348812fcc8b6170bfd848b510b677224dd75ae5 Mon Sep 17 00:00:00 2001
From: ziesorx
Date: Mon, 20 Oct 2025 18:24:19 +0700
Subject: [PATCH 3/3] refactor: improve get_session_image file lookup

---
 app.py | 26 ++++++++++++++++++--------
 1 file changed, 18 insertions(+), 8 deletions(-)

diff --git a/app.py b/app.py
index 21d89db..8e17400 100644
--- a/app.py
+++ b/app.py
@@ -319,7 +319,6 @@ async def get_session_image(session_id: int):
     """
     try:
         from pathlib import Path
-        import glob
 
         # Images directory
         images_dir = Path("images")
@@ -327,26 +326,36 @@ async def get_session_image(session_id: int):
         if not images_dir.exists():
             raise HTTPException(
                 status_code=404,
                 detail=f"No images directory found"
             )
 
-        # Search for files matching session ID pattern: {session_id}_*
-        pattern = str(images_dir / f"{session_id}_*.jpg")
-        matching_files = glob.glob(pattern)
+        # Use os.scandir() for efficient file searching (avoids glob plus a separate stat per file)
+        # Filter files matching session ID pattern: {session_id}_*.jpg
+        prefix = f"{session_id}_"
+        most_recent_file = None
+        most_recent_mtime = 0
 
-        if not matching_files:
+        with os.scandir(images_dir) as entries:
+            for entry in entries:
+                # Filter: must be a file, start with session_id prefix, and end with .jpg
+                if entry.is_file() and entry.name.startswith(prefix) and entry.name.endswith('.jpg'):
+                    # Use cached stat info from DirEntry (much faster than separate stat calls)
+                    entry_stat = entry.stat()
+                    if entry_stat.st_mtime > most_recent_mtime:
+                        most_recent_mtime = entry_stat.st_mtime
+                        most_recent_file = entry.path
+
+        if not most_recent_file:
             logger.warning(f"No image found for session {session_id}")
             raise HTTPException(
                 status_code=404,
                 detail=f"No image found for session {session_id}"
            )
 
-        # Get the most recent file if multiple exist
-        most_recent_file = max(matching_files, key=os.path.getmtime)
         logger.info(f"Found session image for session {session_id}: {most_recent_file}")
 
         # Read the image file
-        image_data = open(most_recent_file, 'rb').read()
+        with open(most_recent_file, 'rb') as f:
+            image_data = f.read()
 
         # Return image as binary response
         return Response(content=image_data, media_type="image/jpeg")
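
[Editor's note on PATCH 3/3] The scandir-based lookup generalizes to a small self-contained helper, sketched below under the same filename convention the patch filters on ({session_id}_*.jpg); the function name is illustrative. DirEntry.is_file() can usually be answered from the directory entry itself, and DirEntry.stat() caches its result on the entry, which is where the win over glob.glob plus a per-match os.path.getmtime call comes from.

    import os
    from pathlib import Path
    from typing import Optional

    def find_latest_session_image(images_dir: Path, session_id: int) -> Optional[str]:
        """Return the path of the newest '{session_id}_*.jpg' in images_dir, or None."""
        prefix = f"{session_id}_"
        latest_path = None
        latest_mtime = float("-inf")
        with os.scandir(images_dir) as entries:
            for entry in entries:
                if entry.is_file() and entry.name.startswith(prefix) and entry.name.endswith(".jpg"):
                    mtime = entry.stat().st_mtime  # cached on the DirEntry
                    if mtime > latest_mtime:
                        latest_mtime, latest_path = mtime, entry.path
        return latest_path

    # e.g. find_latest_session_image(Path("images"), 42)
    # -> "images/42_1760951060.jpg" or None   (filename illustrative)
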
{session_id}" ) - # Get the most recent file if multiple exist - most_recent_file = max(matching_files, key=os.path.getmtime) logger.info(f"Found session image for session {session_id}: {most_recent_file}") # Read the image file - image_data = open(most_recent_file, 'rb').read() + with open(most_recent_file, 'rb') as f: + image_data = f.read() # Return image as binary response return Response(content=image_data, media_type="image/jpeg")