diff --git a/core/communication/websocket.py b/core/communication/websocket.py index 077c6dc..7394280 100644 --- a/core/communication/websocket.py +++ b/core/communication/websocket.py @@ -297,31 +297,31 @@ class WebSocketHandler: async def _reconcile_subscriptions_with_tracking(self, target_subscriptions) -> dict: """Reconcile subscriptions with tracking integration.""" try: - # First, we need to create tracking integrations for each unique model + # Create separate tracking integrations for each subscription (camera isolation) tracking_integrations = {} for subscription_payload in target_subscriptions: + subscription_id = subscription_payload['subscriptionIdentifier'] model_id = subscription_payload['modelId'] - # Create tracking integration if not already created - if model_id not in tracking_integrations: - # Get pipeline configuration for this model - pipeline_parser = model_manager.get_pipeline_config(model_id) - if pipeline_parser: - # Create tracking integration with message sender - tracking_integration = TrackingPipelineIntegration( - pipeline_parser, model_manager, model_id, self._send_message - ) + # Create separate tracking integration per subscription for camera isolation + # Get pipeline configuration for this model + pipeline_parser = model_manager.get_pipeline_config(model_id) + if pipeline_parser: + # Create tracking integration with message sender (separate instance per camera) + tracking_integration = TrackingPipelineIntegration( + pipeline_parser, model_manager, model_id, self._send_message + ) - # Initialize tracking model - success = await tracking_integration.initialize_tracking_model() - if success: - tracking_integrations[model_id] = tracking_integration - logger.info(f"[Tracking] Created tracking integration for model {model_id}") - else: - logger.warning(f"[Tracking] Failed to initialize tracking for model {model_id}") + # Initialize tracking model + success = await tracking_integration.initialize_tracking_model() + if success: + tracking_integrations[subscription_id] = tracking_integration + logger.info(f"[Tracking] Created isolated tracking integration for subscription {subscription_id} (model {model_id})") else: - logger.warning(f"[Tracking] No pipeline config found for model {model_id}") + logger.warning(f"[Tracking] Failed to initialize tracking for subscription {subscription_id} (model {model_id})") + else: + logger.warning(f"[Tracking] No pipeline config found for model {model_id} in subscription {subscription_id}") # Now reconcile with StreamManager, adding tracking integrations current_subscription_ids = set() @@ -379,8 +379,8 @@ class WebSocketHandler: logger.info(f"[SUBSCRIPTION_MAPPING] subscription_id='{subscription_id}' → camera_id='{camera_id}'") - # Get tracking integration for this model - tracking_integration = tracking_integrations.get(model_id) + # Get tracking integration for this subscription (camera-isolated) + tracking_integration = tracking_integrations.get(subscription_id) # Extract crop coordinates if present crop_coords = None @@ -412,7 +412,7 @@ class WebSocketHandler: ) if success and tracking_integration: - logger.info(f"[Tracking] Subscription {subscription_id} configured with tracking for model {model_id}") + logger.info(f"[Tracking] Subscription {subscription_id} configured with isolated tracking for model {model_id}") return success diff --git a/core/streaming/manager.py b/core/streaming/manager.py index f6cfbda..0c026e7 100644 --- a/core/streaming/manager.py +++ b/core/streaming/manager.py @@ -389,20 +389,51 @@ class StreamManager: logger.debug(f"Set session {session_id} for display {display_id}") def clear_session_id(self, session_id: str): - """Clear session ID from tracking integrations.""" + """Clear session ID from the specific tracking integration handling this session.""" with self._lock: + # Find the subscription that's handling this session + session_subscription = None for subscription_info in self._subscriptions.values(): if subscription_info.tracking_integration: - subscription_info.tracking_integration.clear_session_id(session_id) - logger.debug(f"Cleared session {session_id}") + # Check if this integration is handling the given session_id + integration = subscription_info.tracking_integration + if session_id in integration.session_vehicles: + session_subscription = subscription_info + break + + if session_subscription and session_subscription.tracking_integration: + session_subscription.tracking_integration.clear_session_id(session_id) + logger.debug(f"Cleared session {session_id} from subscription {session_subscription.subscription_id}") + else: + logger.warning(f"No tracking integration found for session {session_id}, broadcasting to all subscriptions") + # Fallback: broadcast to all (original behavior) + for subscription_info in self._subscriptions.values(): + if subscription_info.tracking_integration: + subscription_info.tracking_integration.clear_session_id(session_id) def set_progression_stage(self, session_id: str, stage: str): - """Set progression stage for tracking integrations.""" + """Set progression stage for the specific tracking integration handling this session.""" with self._lock: + # Find the subscription that's handling this session + session_subscription = None for subscription_info in self._subscriptions.values(): if subscription_info.tracking_integration: - subscription_info.tracking_integration.set_progression_stage(session_id, stage) - logger.debug(f"Set progression stage for session {session_id}: {stage}") + # Check if this integration is handling the given session_id + # We need to check the integration's active sessions + integration = subscription_info.tracking_integration + if session_id in integration.session_vehicles: + session_subscription = subscription_info + break + + if session_subscription and session_subscription.tracking_integration: + session_subscription.tracking_integration.set_progression_stage(session_id, stage) + logger.debug(f"Set progression stage for session {session_id}: {stage} on subscription {session_subscription.subscription_id}") + else: + logger.warning(f"No tracking integration found for session {session_id}, broadcasting to all subscriptions") + # Fallback: broadcast to all (original behavior) + for subscription_info in self._subscriptions.values(): + if subscription_info.tracking_integration: + subscription_info.tracking_integration.set_progression_stage(session_id, stage) def get_tracking_stats(self) -> Dict[str, Any]: """Get tracking statistics from all subscriptions.""" diff --git a/core/tracking/validator.py b/core/tracking/validator.py index c20987f..d86a3f6 100644 --- a/core/tracking/validator.py +++ b/core/tracking/validator.py @@ -36,8 +36,14 @@ class ValidationResult: class StableCarValidator: """ - Validates whether a tracked vehicle is stable (fueling) or just passing by. - Uses multiple criteria including position stability, duration, and movement patterns. + Validates whether a tracked vehicle should be processed through the pipeline. + + Updated for BoT-SORT integration: Trusts the sophisticated BoT-SORT tracking algorithm + for stability determination and focuses on business logic validation: + - Duration requirements for processing + - Confidence thresholds + - Session management and cooldowns + - Camera isolation with composite keys """ def __init__(self, config: Optional[Dict] = None): @@ -169,7 +175,10 @@ class StableCarValidator: def _determine_vehicle_state(self, vehicle: TrackedVehicle) -> VehicleState: """ - Determine the current state of the vehicle based on movement patterns. + Determine the current state of the vehicle based on BoT-SORT tracking results. + + BoT-SORT provides sophisticated tracking, so we trust its stability determination + and focus on business logic validation. Args: vehicle: The tracked vehicle @@ -177,53 +186,44 @@ class StableCarValidator: Returns: Current vehicle state """ - # Not enough data - if len(vehicle.last_position_history) < 3: - return VehicleState.UNKNOWN - - # Calculate velocity - velocity = self._calculate_velocity(vehicle) - - # Get position zones - x_position = vehicle.center[0] / self.frame_width - y_position = vehicle.center[1] / self.frame_height - - # Check if vehicle is stable - stability = vehicle.calculate_stability() - if stability > 0.7 and velocity < self.velocity_threshold: - # Check if it's been stable long enough + # Trust BoT-SORT's stability determination + if vehicle.is_stable: + # Check if it's been stable long enough for processing duration = time.time() - vehicle.first_seen - if duration > self.min_stable_duration and vehicle.stable_frames >= self.min_stable_frames: + if duration >= self.min_stable_duration: return VehicleState.STABLE else: return VehicleState.ENTERING - # Check if vehicle is entering or leaving + # For non-stable vehicles, use simplified state determination + if len(vehicle.last_position_history) < 2: + return VehicleState.UNKNOWN + + # Calculate velocity for movement classification + velocity = self._calculate_velocity(vehicle) + + # Basic movement classification if velocity > self.velocity_threshold: - # Determine direction based on position history - positions = np.array(vehicle.last_position_history) - if len(positions) >= 2: - direction = positions[-1] - positions[0] + # Vehicle is moving - classify as passing by or entering/leaving + x_position = vehicle.center[0] / self.frame_width - # Entering: moving towards center - if x_position < self.entering_zone_ratio or x_position > (1 - self.entering_zone_ratio): - if abs(direction[0]) > abs(direction[1]): # Horizontal movement - if (x_position < 0.5 and direction[0] > 0) or (x_position > 0.5 and direction[0] < 0): - return VehicleState.ENTERING + # Simple heuristic: vehicles near edges are entering/leaving, center vehicles are passing + if x_position < 0.2 or x_position > 0.8: + return VehicleState.ENTERING + else: + return VehicleState.PASSING_BY - # Leaving: moving away from center - if 0.3 < x_position < 0.7: # In center zone - if abs(direction[0]) > abs(direction[1]): # Horizontal movement - if abs(direction[0]) > 10: # Significant movement - return VehicleState.LEAVING - - return VehicleState.PASSING_BY - - return VehicleState.UNKNOWN + # Low velocity but not marked stable by tracker - likely entering + return VehicleState.ENTERING def _validate_stable_vehicle(self, vehicle: TrackedVehicle) -> ValidationResult: """ - Perform detailed validation of a stable vehicle. + Perform business logic validation of a stable vehicle. + + Since BoT-SORT already determined the vehicle is stable, we focus on: + - Duration requirements for processing + - Confidence thresholds + - Business logic constraints Args: vehicle: The stable vehicle to validate @@ -231,7 +231,7 @@ class StableCarValidator: Returns: Detailed validation result """ - # Check duration + # Check duration (business requirement) duration = time.time() - vehicle.first_seen if duration < self.min_stable_duration: return ValidationResult( @@ -243,18 +243,7 @@ class StableCarValidator: track_id=vehicle.track_id ) - # Check frame count - if vehicle.stable_frames < self.min_stable_frames: - return ValidationResult( - is_valid=False, - state=VehicleState.STABLE, - confidence=0.6, - reason=f"Not enough stable frames ({vehicle.stable_frames} < {self.min_stable_frames})", - should_process=False, - track_id=vehicle.track_id - ) - - # Check confidence + # Check confidence (business requirement) if vehicle.avg_confidence < self.min_confidence: return ValidationResult( is_valid=False, @@ -265,28 +254,19 @@ class StableCarValidator: track_id=vehicle.track_id ) - # Check position variance - variance = self._calculate_position_variance(vehicle) - if variance > self.position_variance_threshold: - return ValidationResult( - is_valid=False, - state=VehicleState.STABLE, - confidence=0.7, - reason=f"Position variance too high ({variance:.1f} > {self.position_variance_threshold})", - should_process=False, - track_id=vehicle.track_id - ) + # Trust BoT-SORT's stability determination - skip position variance check + # BoT-SORT's sophisticated tracking already ensures consistent positioning - # Check state history consistency + # Simplified state history check - just ensure recent stability if vehicle.track_id in self.validation_history: - history = self.validation_history[vehicle.track_id][-5:] # Last 5 states + history = self.validation_history[vehicle.track_id][-3:] # Last 3 states stable_count = sum(1 for s in history if s == VehicleState.STABLE) - if stable_count < 3: + if len(history) >= 2 and stable_count == 0: # Only fail if clear instability return ValidationResult( is_valid=False, state=VehicleState.STABLE, confidence=0.7, - reason="Inconsistent state history", + reason="Recent state history shows instability", should_process=False, track_id=vehicle.track_id ) @@ -298,7 +278,7 @@ class StableCarValidator: is_valid=True, state=VehicleState.STABLE, confidence=vehicle.avg_confidence, - reason="Vehicle is stable and ready for processing", + reason="Vehicle is stable and ready for processing (BoT-SORT validated)", should_process=True, track_id=vehicle.track_id )