fix: improve couple things #29
					 5 changed files with 183 additions and 31 deletions
				
			
		
							
								
								
									
										26
									
								
								app.py
									
										
									
									
									
								
							
							
						
						
									
										26
									
								
								app.py
									
										
									
									
									
								
							| 
						 | 
				
			
			@ -319,7 +319,6 @@ async def get_session_image(session_id: int):
 | 
			
		|||
    """
 | 
			
		||||
    try:
 | 
			
		||||
        from pathlib import Path
 | 
			
		||||
        import glob
 | 
			
		||||
 | 
			
		||||
        # Images directory
 | 
			
		||||
        images_dir = Path("images")
 | 
			
		||||
| 
						 | 
				
			
			@ -331,23 +330,34 @@ async def get_session_image(session_id: int):
 | 
			
		|||
                detail=f"No images directory found"
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        # Search for files matching session ID pattern: {session_id}_*
 | 
			
		||||
        pattern = str(images_dir / f"{session_id}_*.jpg")
 | 
			
		||||
        matching_files = glob.glob(pattern)
 | 
			
		||||
        # Use os.scandir() for efficient file searching (3-5x faster than glob.glob)
 | 
			
		||||
        # Filter files matching session ID pattern: {session_id}_*.jpg
 | 
			
		||||
        prefix = f"{session_id}_"
 | 
			
		||||
        most_recent_file = None
 | 
			
		||||
        most_recent_mtime = 0
 | 
			
		||||
 | 
			
		||||
        if not matching_files:
 | 
			
		||||
        with os.scandir(images_dir) as entries:
 | 
			
		||||
            for entry in entries:
 | 
			
		||||
                # Filter: must be a file, start with session_id prefix, and end with .jpg
 | 
			
		||||
                if entry.is_file() and entry.name.startswith(prefix) and entry.name.endswith('.jpg'):
 | 
			
		||||
                    # Use cached stat info from DirEntry (much faster than separate stat calls)
 | 
			
		||||
                    entry_stat = entry.stat()
 | 
			
		||||
                    if entry_stat.st_mtime > most_recent_mtime:
 | 
			
		||||
                        most_recent_mtime = entry_stat.st_mtime
 | 
			
		||||
                        most_recent_file = entry.path
 | 
			
		||||
 | 
			
		||||
        if not most_recent_file:
 | 
			
		||||
            logger.warning(f"No image found for session {session_id}")
 | 
			
		||||
            raise HTTPException(
 | 
			
		||||
                status_code=404,
 | 
			
		||||
                detail=f"No image found for session {session_id}"
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        # Get the most recent file if multiple exist
 | 
			
		||||
        most_recent_file = max(matching_files, key=os.path.getmtime)
 | 
			
		||||
        logger.info(f"Found session image for session {session_id}: {most_recent_file}")
 | 
			
		||||
 | 
			
		||||
        # Read the image file
 | 
			
		||||
        image_data = open(most_recent_file, 'rb').read()
 | 
			
		||||
        with open(most_recent_file, 'rb') as f:
 | 
			
		||||
            image_data = f.read()
 | 
			
		||||
 | 
			
		||||
        # Return image as binary response
 | 
			
		||||
        return Response(content=image_data, media_type="image/jpeg")
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -45,12 +45,17 @@ class BranchProcessor:
 | 
			
		|||
        self.redis_manager = None
 | 
			
		||||
        self.db_manager = None
 | 
			
		||||
 | 
			
		||||
        # Branch execution timeout (seconds)
 | 
			
		||||
        self.branch_timeout = 30.0
 | 
			
		||||
 | 
			
		||||
        # Statistics
 | 
			
		||||
        self.stats = {
 | 
			
		||||
            'branches_processed': 0,
 | 
			
		||||
            'parallel_executions': 0,
 | 
			
		||||
            'total_processing_time': 0.0,
 | 
			
		||||
            'models_loaded': 0
 | 
			
		||||
            'models_loaded': 0,
 | 
			
		||||
            'branches_timed_out': 0,
 | 
			
		||||
            'branches_failed': 0
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        logger.info("BranchProcessor initialized")
 | 
			
		||||
| 
						 | 
				
			
			@ -279,22 +284,46 @@ class BranchProcessor:
 | 
			
		|||
            )
 | 
			
		||||
            future_to_branch[future] = branch
 | 
			
		||||
 | 
			
		||||
        # Collect results as they complete
 | 
			
		||||
        for future in as_completed(future_to_branch):
 | 
			
		||||
            branch = future_to_branch[future]
 | 
			
		||||
            branch_id = getattr(branch, 'model_id', 'unknown')
 | 
			
		||||
        # Collect results as they complete with timeout
 | 
			
		||||
        try:
 | 
			
		||||
            for future in as_completed(future_to_branch, timeout=self.branch_timeout):
 | 
			
		||||
                branch = future_to_branch[future]
 | 
			
		||||
                branch_id = getattr(branch, 'model_id', 'unknown')
 | 
			
		||||
 | 
			
		||||
            try:
 | 
			
		||||
                result = future.result()
 | 
			
		||||
                results[branch_id] = result
 | 
			
		||||
                logger.info(f"[PARALLEL COMPLETE] {branch_id}: Branch completed successfully")
 | 
			
		||||
            except Exception as e:
 | 
			
		||||
                logger.error(f"Error in parallel branch {branch_id}: {e}")
 | 
			
		||||
                results[branch_id] = {
 | 
			
		||||
                    'status': 'error',
 | 
			
		||||
                    'message': str(e),
 | 
			
		||||
                    'processing_time': 0.0
 | 
			
		||||
                }
 | 
			
		||||
                try:
 | 
			
		||||
                    # Get result with timeout to prevent indefinite hanging
 | 
			
		||||
                    result = future.result(timeout=self.branch_timeout)
 | 
			
		||||
                    results[branch_id] = result
 | 
			
		||||
                    logger.info(f"[PARALLEL COMPLETE] {branch_id}: Branch completed successfully")
 | 
			
		||||
                except TimeoutError:
 | 
			
		||||
                    logger.error(f"[TIMEOUT] Branch {branch_id} exceeded timeout of {self.branch_timeout}s")
 | 
			
		||||
                    self.stats['branches_timed_out'] += 1
 | 
			
		||||
                    results[branch_id] = {
 | 
			
		||||
                        'status': 'timeout',
 | 
			
		||||
                        'message': f'Branch execution timeout after {self.branch_timeout}s',
 | 
			
		||||
                        'processing_time': self.branch_timeout
 | 
			
		||||
                    }
 | 
			
		||||
                except Exception as e:
 | 
			
		||||
                    logger.error(f"[ERROR] Error in parallel branch {branch_id}: {e}", exc_info=True)
 | 
			
		||||
                    self.stats['branches_failed'] += 1
 | 
			
		||||
                    results[branch_id] = {
 | 
			
		||||
                        'status': 'error',
 | 
			
		||||
                        'message': str(e),
 | 
			
		||||
                        'processing_time': 0.0
 | 
			
		||||
                    }
 | 
			
		||||
        except TimeoutError:
 | 
			
		||||
            # as_completed iterator timed out - mark remaining futures as timed out
 | 
			
		||||
            logger.error(f"[TIMEOUT] Branch execution timeout after {self.branch_timeout}s - some branches did not complete")
 | 
			
		||||
            for future, branch in future_to_branch.items():
 | 
			
		||||
                branch_id = getattr(branch, 'model_id', 'unknown')
 | 
			
		||||
                if branch_id not in results:
 | 
			
		||||
                    logger.error(f"[TIMEOUT] Branch {branch_id} did not complete within timeout")
 | 
			
		||||
                    self.stats['branches_timed_out'] += 1
 | 
			
		||||
                    results[branch_id] = {
 | 
			
		||||
                        'status': 'timeout',
 | 
			
		||||
                        'message': f'Branch did not complete within {self.branch_timeout}s timeout',
 | 
			
		||||
                        'processing_time': self.branch_timeout
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
        # Flatten nested branch results to top level for database access
 | 
			
		||||
        flattened_results = {}
 | 
			
		||||
| 
						 | 
				
			
			@ -309,6 +338,24 @@ class BranchProcessor:
 | 
			
		|||
                    flattened_results[nested_branch_id] = nested_result
 | 
			
		||||
                    logger.info(f"[FLATTEN] Added nested branch {nested_branch_id} to top-level results")
 | 
			
		||||
 | 
			
		||||
        # Log summary of branch execution results
 | 
			
		||||
        succeeded = [bid for bid, res in results.items() if res.get('status') == 'success']
 | 
			
		||||
        failed = [bid for bid, res in results.items() if res.get('status') == 'error']
 | 
			
		||||
        timed_out = [bid for bid, res in results.items() if res.get('status') == 'timeout']
 | 
			
		||||
        skipped = [bid for bid, res in results.items() if res.get('status') == 'skipped']
 | 
			
		||||
 | 
			
		||||
        summary_parts = []
 | 
			
		||||
        if succeeded:
 | 
			
		||||
            summary_parts.append(f"{len(succeeded)} succeeded: {', '.join(succeeded)}")
 | 
			
		||||
        if failed:
 | 
			
		||||
            summary_parts.append(f"{len(failed)} FAILED: {', '.join(failed)}")
 | 
			
		||||
        if timed_out:
 | 
			
		||||
            summary_parts.append(f"{len(timed_out)} TIMED OUT: {', '.join(timed_out)}")
 | 
			
		||||
        if skipped:
 | 
			
		||||
            summary_parts.append(f"{len(skipped)} skipped: {', '.join(skipped)}")
 | 
			
		||||
 | 
			
		||||
        logger.info(f"[PARALLEL SUMMARY] Branch execution completed: {' | '.join(summary_parts) if summary_parts else 'no branches'}")
 | 
			
		||||
 | 
			
		||||
        return flattened_results
 | 
			
		||||
 | 
			
		||||
    async def _execute_sequential_branches(self,
 | 
			
		||||
| 
						 | 
				
			
			@ -496,9 +543,19 @@ class BranchProcessor:
 | 
			
		|||
 | 
			
		||||
            # Use .predict() method for both detection and classification models
 | 
			
		||||
            inference_start = time.time()
 | 
			
		||||
            detection_results = model.model.predict(input_frame, conf=min_confidence, verbose=False)
 | 
			
		||||
            inference_time = time.time() - inference_start
 | 
			
		||||
            logger.info(f"[INFERENCE DONE] {branch_id}: Predict completed in {inference_time:.3f}s using .predict() method")
 | 
			
		||||
            try:
 | 
			
		||||
                detection_results = model.model.predict(input_frame, conf=min_confidence, verbose=False)
 | 
			
		||||
                inference_time = time.time() - inference_start
 | 
			
		||||
                logger.info(f"[INFERENCE DONE] {branch_id}: Predict completed in {inference_time:.3f}s using .predict() method")
 | 
			
		||||
            except Exception as inference_error:
 | 
			
		||||
                inference_time = time.time() - inference_start
 | 
			
		||||
                logger.error(f"[INFERENCE ERROR] {branch_id}: Model inference failed after {inference_time:.3f}s: {inference_error}", exc_info=True)
 | 
			
		||||
                return {
 | 
			
		||||
                    'status': 'error',
 | 
			
		||||
                    'branch_id': branch_id,
 | 
			
		||||
                    'message': f'Model inference failed: {str(inference_error)}',
 | 
			
		||||
                    'processing_time': time.time() - start_time
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
            # Initialize branch_detections outside the conditional
 | 
			
		||||
            branch_detections = []
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -445,6 +445,78 @@ class DetectionPipeline:
 | 
			
		|||
        except Exception as e:
 | 
			
		||||
            logger.error(f"Error sending initial detection imageDetection message: {e}", exc_info=True)
 | 
			
		||||
 | 
			
		||||
    async def _send_classification_results(self, subscription_id: str, session_id: str, branch_results: Dict[str, Any]):
 | 
			
		||||
        """
 | 
			
		||||
        Send imageDetection message with classification results (without license plate).
 | 
			
		||||
        Called after processing phase completes to send partial results immediately.
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
            subscription_id: Subscription identifier to send message to
 | 
			
		||||
            session_id: Session identifier
 | 
			
		||||
            branch_results: Dictionary of branch execution results
 | 
			
		||||
        """
 | 
			
		||||
        try:
 | 
			
		||||
            if not self.message_sender:
 | 
			
		||||
                logger.warning("No message sender configured, cannot send imageDetection")
 | 
			
		||||
                return
 | 
			
		||||
 | 
			
		||||
            # Import here to avoid circular imports
 | 
			
		||||
            from ..communication.models import ImageDetectionMessage, DetectionData
 | 
			
		||||
 | 
			
		||||
            # Extract classification fields from branch results
 | 
			
		||||
            extracted_fields = self._extract_fields_from_branches(branch_results)
 | 
			
		||||
            car_brand = extracted_fields.get('brand')
 | 
			
		||||
            body_type = extracted_fields.get('body_type')
 | 
			
		||||
 | 
			
		||||
            # Log what we're sending
 | 
			
		||||
            fields_status = []
 | 
			
		||||
            if car_brand is not None:
 | 
			
		||||
                fields_status.append(f"brand={car_brand}")
 | 
			
		||||
            else:
 | 
			
		||||
                fields_status.append("brand=null")
 | 
			
		||||
            if body_type is not None:
 | 
			
		||||
                fields_status.append(f"bodyType={body_type}")
 | 
			
		||||
            else:
 | 
			
		||||
                fields_status.append("bodyType=null")
 | 
			
		||||
            logger.info(f"[CLASSIFICATION] Sending partial results for session {session_id}: {', '.join(fields_status)}")
 | 
			
		||||
 | 
			
		||||
            # Create detection data with classification results (license plate still pending)
 | 
			
		||||
            detection_data_obj = DetectionData(
 | 
			
		||||
                detection={
 | 
			
		||||
                    "carBrand": car_brand,
 | 
			
		||||
                    "carModel": None,  # Not implemented yet
 | 
			
		||||
                    "bodyType": body_type,
 | 
			
		||||
                    "licensePlateText": None,  # Will be sent later via license plate callback
 | 
			
		||||
                    "licensePlateConfidence": None
 | 
			
		||||
                },
 | 
			
		||||
                modelId=self.model_id,
 | 
			
		||||
                modelName=self.pipeline_parser.pipeline_config.model_id if self.pipeline_parser.pipeline_config else "detection_model"
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            # Create imageDetection message
 | 
			
		||||
            detection_message = ImageDetectionMessage(
 | 
			
		||||
                subscriptionIdentifier=subscription_id,
 | 
			
		||||
                data=detection_data_obj
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            # Send message
 | 
			
		||||
            await self.message_sender(detection_message)
 | 
			
		||||
 | 
			
		||||
            # Log with indication of partial results
 | 
			
		||||
            null_fields = []
 | 
			
		||||
            if car_brand is None:
 | 
			
		||||
                null_fields.append('brand')
 | 
			
		||||
            if body_type is None:
 | 
			
		||||
                null_fields.append('bodyType')
 | 
			
		||||
 | 
			
		||||
            if null_fields:
 | 
			
		||||
                logger.info(f"[PARTIAL RESULTS] Sent imageDetection with PARTIAL results (null: {', '.join(null_fields)}) - brand='{car_brand}', bodyType='{body_type}' to '{subscription_id}'")
 | 
			
		||||
            else:
 | 
			
		||||
                logger.info(f"[CLASSIFICATION COMPLETE] Sent imageDetection with brand='{car_brand}', bodyType='{body_type}' to '{subscription_id}'")
 | 
			
		||||
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.error(f"Error sending classification results imageDetection message: {e}", exc_info=True)
 | 
			
		||||
 | 
			
		||||
    async def execute_detection_phase(self,
 | 
			
		||||
                                    frame: np.ndarray,
 | 
			
		||||
                                    display_id: str,
 | 
			
		||||
| 
						 | 
				
			
			@ -693,6 +765,13 @@ class DetectionPipeline:
 | 
			
		|||
                self.session_processing_results[session_id] = result['branch_results']
 | 
			
		||||
                logger.info(f"[PROCESSING RESULTS] Stored results for session {session_id} for later combination")
 | 
			
		||||
 | 
			
		||||
                # Send classification results immediately (license plate will come later via callback)
 | 
			
		||||
                await self._send_classification_results(
 | 
			
		||||
                    subscription_id=subscription_id,
 | 
			
		||||
                    session_id=session_id,
 | 
			
		||||
                    branch_results=result['branch_results']
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
            logger.info(f"Processing phase completed for session {session_id}: "
 | 
			
		||||
                       f"{len(result['branch_results'])} branches, {len(result['actions_executed'])} actions")
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -70,7 +70,8 @@ class TrackingPipelineIntegration:
 | 
			
		|||
        self.abandonment_timeout = 3.0  # seconds to wait before declaring car abandoned
 | 
			
		||||
 | 
			
		||||
        # Thread pool for pipeline execution
 | 
			
		||||
        self.executor = ThreadPoolExecutor(max_workers=2)
 | 
			
		||||
        # Increased to 8 workers to handle 8 concurrent cameras without queuing
 | 
			
		||||
        self.executor = ThreadPoolExecutor(max_workers=8)
 | 
			
		||||
 | 
			
		||||
        # Min bbox filtering configuration
 | 
			
		||||
        # TODO: Make this configurable via pipeline.json in the future
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -56,10 +56,15 @@ class StableCarValidator:
 | 
			
		|||
        self.config = config or {}
 | 
			
		||||
 | 
			
		||||
        # Validation thresholds
 | 
			
		||||
        self.min_stable_duration = self.config.get('min_stable_duration', 3.0)  # seconds
 | 
			
		||||
        self.min_stable_frames = self.config.get('min_stable_frames', 10)
 | 
			
		||||
        # Optimized for 6 FPS RTSP source with 8 concurrent cameras on GPU
 | 
			
		||||
        # GPU contention reduces effective FPS to ~3-5 per camera
 | 
			
		||||
        # Reduced from 3.0s to 1.5s to achieve ~2.75s total validation time (was ~4.25s)
 | 
			
		||||
        self.min_stable_duration = self.config.get('min_stable_duration', 1.5)  # seconds
 | 
			
		||||
        # Reduced from 10 to 5 to align with tracker requirement and reduce validation time
 | 
			
		||||
        self.min_stable_frames = self.config.get('min_stable_frames', 5)
 | 
			
		||||
        self.position_variance_threshold = self.config.get('position_variance_threshold', 25.0)  # pixels
 | 
			
		||||
        self.min_confidence = self.config.get('min_confidence', 0.7)
 | 
			
		||||
        # Reduced from 0.7 to 0.45 to be more permissive under GPU load
 | 
			
		||||
        self.min_confidence = self.config.get('min_confidence', 0.45)
 | 
			
		||||
        self.velocity_threshold = self.config.get('velocity_threshold', 5.0)  # pixels/frame
 | 
			
		||||
        self.entering_zone_ratio = self.config.get('entering_zone_ratio', 0.3)  # 30% of frame
 | 
			
		||||
        self.leaving_zone_ratio = self.config.get('leaving_zone_ratio', 0.3)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue