From ee484b4655c0d5e89fa7a351187d4331ff647973 Mon Sep 17 00:00:00 2001 From: ziesorx Date: Mon, 29 Sep 2025 23:45:20 +0700 Subject: [PATCH] feat: add min bbox for frontal tracking --- core/tracking/integration.py | 60 +++++++++++++++++++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/core/tracking/integration.py b/core/tracking/integration.py index 8c96750..d1401ef 100644 --- a/core/tracking/integration.py +++ b/core/tracking/integration.py @@ -71,12 +71,17 @@ class TrackingPipelineIntegration: # Thread pool for pipeline execution self.executor = ThreadPoolExecutor(max_workers=2) + # Min bbox filtering configuration + # TODO: Make this configurable via pipeline.json in the future + self.min_bbox_area_percentage = 4.5 # 4.5% of frame area minimum + # Statistics self.stats = { 'frames_processed': 0, 'vehicles_detected': 0, 'vehicles_validated': 0, - 'pipelines_executed': 0 + 'pipelines_executed': 0, + 'frontals_filtered_small': 0 # Track filtered detections } @@ -202,6 +207,10 @@ class TrackingPipelineIntegration: else: logger.debug(f"No tracking results or detections attribute") + # Filter out small frontal detections (neighboring pumps/distant cars) + if tracking_results and hasattr(tracking_results, 'detections'): + tracking_results = self._filter_small_frontals(tracking_results, frame) + # Process tracking results tracked_vehicles = self.tracker.process_detections( tracking_results, @@ -667,6 +676,55 @@ class TrackingPipelineIntegration: if stage == "car_wait_staff": logger.info(f"Started monitoring session {session_id} for car abandonment") + def _filter_small_frontals(self, tracking_results, frame): + """ + Filter out frontal detections that are smaller than minimum bbox area percentage. + This prevents processing of cars from neighboring pumps that appear in camera view. + + Args: + tracking_results: YOLO tracking results with detections + frame: Input frame for calculating frame area + + Returns: + Modified tracking_results with small frontals removed + """ + if not hasattr(tracking_results, 'detections') or not tracking_results.detections: + return tracking_results + + # Calculate frame area and minimum bbox area threshold + frame_area = frame.shape[0] * frame.shape[1] # height * width + min_bbox_area = frame_area * (self.min_bbox_area_percentage / 100.0) + + # Filter detections + filtered_detections = [] + filtered_count = 0 + + for detection in tracking_results.detections: + # Calculate detection bbox area + bbox = detection.bbox # Assuming bbox is [x1, y1, x2, y2] + bbox_area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]) + + if bbox_area >= min_bbox_area: + # Keep detection - bbox is large enough + filtered_detections.append(detection) + else: + # Filter out small detection + filtered_count += 1 + area_percentage = (bbox_area / frame_area) * 100 + logger.debug(f"Filtered small frontal: area={bbox_area:.0f}px² ({area_percentage:.1f}% of frame, " + f"min required: {self.min_bbox_area_percentage}%)") + + # Update tracking results with filtered detections + tracking_results.detections = filtered_detections + + # Update statistics + if filtered_count > 0: + self.stats['frontals_filtered_small'] += filtered_count + logger.info(f"Filtered {filtered_count} small frontal detections, " + f"{len(filtered_detections)} remaining (total filtered: {self.stats['frontals_filtered_small']})") + + return tracking_results + def cleanup(self): """Cleanup resources.""" self.executor.shutdown(wait=False)