All checks were successful
Build Worker Base and Application Images / check-base-changes (push) Successful in 7s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 2m45s
Build Worker Base and Application Images / deploy-stack (push) Successful in 16s
834 lines
No EOL
35 KiB
Python
834 lines
No EOL
35 KiB
Python
"""
|
|
Tracking-Pipeline Integration Module.
|
|
Connects the tracking system with the main detection pipeline and manages the flow.
|
|
"""
|
|
import logging
|
|
import time
|
|
import uuid
|
|
from typing import Dict, Optional, Any, List, Tuple
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
import asyncio
|
|
import numpy as np
|
|
|
|
from .tracker import VehicleTracker, TrackedVehicle
|
|
from .validator import StableCarValidator
|
|
from ..models.inference import YOLOWrapper
|
|
from ..models.pipeline import PipelineParser
|
|
from ..detection.pipeline import DetectionPipeline
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TrackingPipelineIntegration:
|
|
"""
|
|
Integrates vehicle tracking with the detection pipeline.
|
|
Manages tracking state transitions and pipeline execution triggers.
|
|
"""
|
|
|
|
def __init__(self, pipeline_parser: PipelineParser, model_manager: Any, model_id: int, message_sender=None):
|
|
"""
|
|
Initialize tracking-pipeline integration.
|
|
|
|
Args:
|
|
pipeline_parser: Pipeline parser with loaded configuration
|
|
model_manager: Model manager for loading models
|
|
model_id: The model ID to use for loading models
|
|
message_sender: Optional callback function for sending WebSocket messages
|
|
"""
|
|
self.pipeline_parser = pipeline_parser
|
|
self.model_manager = model_manager
|
|
self.model_id = model_id
|
|
self.message_sender = message_sender
|
|
|
|
# Store subscription info for snapshot access
|
|
self.subscription_info = None
|
|
|
|
# Initialize tracking components
|
|
tracking_config = pipeline_parser.tracking_config.__dict__ if pipeline_parser.tracking_config else {}
|
|
self.tracker = VehicleTracker(tracking_config)
|
|
self.validator = StableCarValidator()
|
|
|
|
# Tracking model
|
|
self.tracking_model: Optional[YOLOWrapper] = None
|
|
self.tracking_model_id = None
|
|
|
|
# Detection pipeline (Phase 5)
|
|
self.detection_pipeline: Optional[DetectionPipeline] = None
|
|
|
|
# Session management
|
|
self.active_sessions: Dict[str, str] = {} # display_id -> session_id
|
|
self.session_vehicles: Dict[str, int] = {} # session_id -> track_id
|
|
self.cleared_sessions: Dict[str, float] = {} # session_id -> clear_time
|
|
self.pending_vehicles: Dict[str, int] = {} # display_id -> track_id (waiting for session ID)
|
|
self.pending_processing_data: Dict[str, Dict] = {} # display_id -> processing data (waiting for session ID)
|
|
|
|
# Additional validators for enhanced flow control
|
|
self.permanently_processed: Dict[str, float] = {} # "camera_id:track_id" -> process_time (never process again)
|
|
self.progression_stages: Dict[str, str] = {} # session_id -> current_stage
|
|
self.last_detection_time: Dict[str, float] = {} # display_id -> last_detection_timestamp
|
|
self.abandonment_timeout = 3.0 # seconds to wait before declaring car abandoned
|
|
|
|
# Thread pool for pipeline execution
|
|
self.executor = ThreadPoolExecutor(max_workers=2)
|
|
|
|
# Min bbox filtering configuration
|
|
# TODO: Make this configurable via pipeline.json in the future
|
|
self.min_bbox_area_percentage = 4.5 # 4.5% of frame area minimum
|
|
|
|
# Statistics
|
|
self.stats = {
|
|
'frames_processed': 0,
|
|
'vehicles_detected': 0,
|
|
'vehicles_validated': 0,
|
|
'pipelines_executed': 0,
|
|
'frontals_filtered_small': 0 # Track filtered detections
|
|
}
|
|
|
|
|
|
logger.info("TrackingPipelineIntegration initialized")
|
|
|
|
async def initialize_tracking_model(self) -> bool:
|
|
"""
|
|
Load and initialize the tracking model.
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
try:
|
|
if not self.pipeline_parser.tracking_config:
|
|
logger.warning("No tracking configuration found in pipeline")
|
|
return False
|
|
|
|
model_file = self.pipeline_parser.tracking_config.model_file
|
|
model_id = self.pipeline_parser.tracking_config.model_id
|
|
|
|
if not model_file:
|
|
logger.warning("No tracking model file specified")
|
|
return False
|
|
|
|
# Load tracking model
|
|
logger.info(f"Loading tracking model: {model_id} ({model_file})")
|
|
self.tracking_model = self.model_manager.get_yolo_model(self.model_id, model_file)
|
|
if not self.tracking_model:
|
|
logger.error(f"Failed to load tracking model {model_file} from model {self.model_id}")
|
|
return False
|
|
self.tracking_model_id = model_id
|
|
|
|
if self.tracking_model:
|
|
logger.info(f"Tracking model {model_id} loaded successfully")
|
|
|
|
# Initialize detection pipeline (Phase 5)
|
|
await self._initialize_detection_pipeline()
|
|
|
|
return True
|
|
else:
|
|
logger.error(f"Failed to load tracking model {model_id}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error initializing tracking model: {e}", exc_info=True)
|
|
return False
|
|
|
|
async def _initialize_detection_pipeline(self) -> bool:
|
|
"""
|
|
Initialize the detection pipeline for main detection processing.
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
try:
|
|
if not self.pipeline_parser:
|
|
logger.warning("No pipeline parser available for detection pipeline")
|
|
return False
|
|
|
|
# Create detection pipeline with message sender capability
|
|
self.detection_pipeline = DetectionPipeline(self.pipeline_parser, self.model_manager, self.model_id, self.message_sender)
|
|
|
|
# Initialize detection pipeline
|
|
if await self.detection_pipeline.initialize():
|
|
logger.info("Detection pipeline initialized successfully")
|
|
return True
|
|
else:
|
|
logger.error("Failed to initialize detection pipeline")
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error initializing detection pipeline: {e}", exc_info=True)
|
|
return False
|
|
|
|
async def process_frame(self,
|
|
frame: np.ndarray,
|
|
display_id: str,
|
|
subscription_id: str,
|
|
session_id: Optional[str] = None) -> Dict[str, Any]:
|
|
"""
|
|
Process a frame through tracking and potentially the detection pipeline.
|
|
|
|
Args:
|
|
frame: Input frame to process
|
|
display_id: Display identifier
|
|
subscription_id: Full subscription identifier
|
|
session_id: Optional session ID from backend
|
|
|
|
Returns:
|
|
Dictionary with processing results
|
|
"""
|
|
start_time = time.time()
|
|
result = {
|
|
'tracked_vehicles': [],
|
|
'validated_vehicle': None,
|
|
'pipeline_result': None,
|
|
'session_id': session_id,
|
|
'processing_time': 0.0
|
|
}
|
|
|
|
try:
|
|
# Update stats
|
|
self.stats['frames_processed'] += 1
|
|
|
|
# Run tracking model
|
|
if self.tracking_model:
|
|
# Run detection-only (tracking handled by our own tracker)
|
|
tracking_results = self.tracking_model.track(
|
|
frame,
|
|
confidence_threshold=self.tracker.min_confidence,
|
|
trigger_classes=self.tracker.trigger_classes,
|
|
persist=True
|
|
)
|
|
|
|
# Debug: Log raw detection results
|
|
if tracking_results and hasattr(tracking_results, 'detections'):
|
|
raw_detections = len(tracking_results.detections)
|
|
if raw_detections > 0:
|
|
class_names = [detection.class_name for detection in tracking_results.detections]
|
|
logger.debug(f"Raw detections: {raw_detections}, classes: {class_names}")
|
|
else:
|
|
logger.debug(f"No raw detections found")
|
|
else:
|
|
logger.debug(f"No tracking results or detections attribute")
|
|
|
|
# Filter out small frontal detections (neighboring pumps/distant cars)
|
|
if tracking_results and hasattr(tracking_results, 'detections'):
|
|
tracking_results = self._filter_small_frontals(tracking_results, frame)
|
|
|
|
# Process tracking results
|
|
tracked_vehicles = self.tracker.process_detections(
|
|
tracking_results,
|
|
display_id,
|
|
frame
|
|
)
|
|
|
|
# Update last detection time for abandonment detection
|
|
if tracked_vehicles:
|
|
self.last_detection_time[display_id] = time.time()
|
|
|
|
# Check for car abandonment (vehicle left after getting car_wait_staff stage)
|
|
await self._check_car_abandonment(display_id, subscription_id)
|
|
|
|
result['tracked_vehicles'] = [
|
|
{
|
|
'track_id': v.track_id,
|
|
'bbox': v.bbox,
|
|
'confidence': v.confidence,
|
|
'is_stable': v.is_stable,
|
|
'session_id': v.session_id
|
|
}
|
|
for v in tracked_vehicles
|
|
]
|
|
|
|
# Log tracking info periodically
|
|
if self.stats['frames_processed'] % 30 == 0: # Every 30 frames
|
|
logger.debug(f"Tracking: {len(tracked_vehicles)} vehicles, "
|
|
f"display={display_id}")
|
|
|
|
# Get stable vehicles for validation
|
|
stable_vehicles = self.tracker.get_stable_vehicles(display_id)
|
|
|
|
# Validate and potentially process stable vehicles
|
|
for vehicle in stable_vehicles:
|
|
# Check if vehicle is already processed or has session
|
|
if vehicle.processed_pipeline:
|
|
continue
|
|
|
|
# Check for session cleared (post-fueling)
|
|
if session_id and vehicle.session_id == session_id:
|
|
# Same vehicle with same session, skip
|
|
continue
|
|
|
|
# Check if this was a recently cleared session
|
|
session_cleared = False
|
|
if vehicle.session_id in self.cleared_sessions:
|
|
clear_time = self.cleared_sessions[vehicle.session_id]
|
|
if (time.time() - clear_time) < 30: # 30 second cooldown
|
|
session_cleared = True
|
|
|
|
# Skip same car after session clear or if permanently processed
|
|
if self.validator.should_skip_same_car(vehicle, session_cleared, self.permanently_processed):
|
|
continue
|
|
|
|
# Validate vehicle
|
|
validation_result = self.validator.validate_vehicle(vehicle, frame.shape)
|
|
|
|
if validation_result.is_valid and validation_result.should_process:
|
|
logger.info(f"Vehicle {vehicle.track_id} validated for processing: "
|
|
f"{validation_result.reason}")
|
|
|
|
result['validated_vehicle'] = {
|
|
'track_id': vehicle.track_id,
|
|
'state': validation_result.state.value,
|
|
'confidence': validation_result.confidence
|
|
}
|
|
|
|
# Execute detection pipeline - this will send real imageDetection when detection is found
|
|
|
|
# Mark vehicle as pending session ID assignment
|
|
self.pending_vehicles[display_id] = vehicle.track_id
|
|
logger.info(f"Vehicle {vehicle.track_id} waiting for session ID from backend")
|
|
|
|
# Execute detection pipeline (placeholder for Phase 5)
|
|
pipeline_result = await self._execute_pipeline(
|
|
frame,
|
|
vehicle,
|
|
display_id,
|
|
None, # No session ID yet
|
|
subscription_id
|
|
)
|
|
|
|
result['pipeline_result'] = pipeline_result
|
|
# No session_id in result yet - backend will provide it
|
|
self.stats['pipelines_executed'] += 1
|
|
|
|
# Only process one vehicle per frame
|
|
break
|
|
|
|
self.stats['vehicles_detected'] = len(tracked_vehicles)
|
|
self.stats['vehicles_validated'] = len(stable_vehicles)
|
|
|
|
else:
|
|
logger.warning("No tracking model available")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in tracking pipeline: {e}", exc_info=True)
|
|
|
|
|
|
result['processing_time'] = time.time() - start_time
|
|
return result
|
|
|
|
async def _execute_pipeline(self,
|
|
frame: np.ndarray,
|
|
vehicle: TrackedVehicle,
|
|
display_id: str,
|
|
session_id: str,
|
|
subscription_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Execute the main detection pipeline for a validated vehicle.
|
|
|
|
Args:
|
|
frame: Input frame
|
|
vehicle: Validated tracked vehicle
|
|
display_id: Display identifier
|
|
session_id: Session identifier
|
|
subscription_id: Full subscription identifier
|
|
|
|
Returns:
|
|
Pipeline execution results
|
|
"""
|
|
logger.info(f"Executing detection pipeline for vehicle {vehicle.track_id}, "
|
|
f"session={session_id}, display={display_id}")
|
|
|
|
try:
|
|
# Check if detection pipeline is available
|
|
if not self.detection_pipeline:
|
|
logger.warning("Detection pipeline not initialized, using fallback")
|
|
return {
|
|
'status': 'error',
|
|
'message': 'Detection pipeline not available',
|
|
'vehicle_id': vehicle.track_id,
|
|
'session_id': session_id
|
|
}
|
|
|
|
# Execute only the detection phase (first phase)
|
|
# This will run detection and send imageDetection message to backend
|
|
detection_result = await self.detection_pipeline.execute_detection_phase(
|
|
frame=frame,
|
|
display_id=display_id,
|
|
subscription_id=subscription_id
|
|
)
|
|
|
|
# Add vehicle information to result
|
|
detection_result['vehicle_id'] = vehicle.track_id
|
|
detection_result['vehicle_bbox'] = vehicle.bbox
|
|
detection_result['vehicle_confidence'] = vehicle.confidence
|
|
detection_result['phase'] = 'detection'
|
|
|
|
logger.info(f"Detection phase executed for vehicle {vehicle.track_id}: "
|
|
f"status={detection_result.get('status', 'unknown')}, "
|
|
f"message_sent={detection_result.get('message_sent', False)}, "
|
|
f"processing_time={detection_result.get('processing_time', 0):.3f}s")
|
|
|
|
# Store frame and detection results for processing phase
|
|
if detection_result['message_sent']:
|
|
# Store for later processing when sessionId is received
|
|
self.pending_processing_data[display_id] = {
|
|
'frame': frame.copy(), # Store copy of frame for processing phase
|
|
'vehicle': vehicle,
|
|
'subscription_id': subscription_id,
|
|
'detection_result': detection_result,
|
|
'timestamp': time.time()
|
|
}
|
|
logger.info(f"Stored processing data for {display_id}, waiting for sessionId from backend")
|
|
|
|
return detection_result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error executing detection pipeline: {e}", exc_info=True)
|
|
return {
|
|
'status': 'error',
|
|
'message': str(e),
|
|
'vehicle_id': vehicle.track_id,
|
|
'session_id': session_id,
|
|
'processing_time': 0.0
|
|
}
|
|
|
|
async def _execute_processing_phase(self,
|
|
processing_data: Dict[str, Any],
|
|
session_id: str,
|
|
display_id: str) -> None:
|
|
"""
|
|
Execute the processing phase after receiving sessionId from backend.
|
|
This includes branch processing and database operations.
|
|
|
|
Args:
|
|
processing_data: Stored processing data from detection phase
|
|
session_id: Session ID from backend
|
|
display_id: Display identifier
|
|
"""
|
|
try:
|
|
vehicle = processing_data['vehicle']
|
|
subscription_id = processing_data['subscription_id']
|
|
detection_result = processing_data['detection_result']
|
|
|
|
logger.info(f"Executing processing phase for session {session_id}, vehicle {vehicle.track_id}")
|
|
|
|
# Capture high-quality snapshot for pipeline processing
|
|
logger.info(f"[PROCESSING PHASE] Fetching 2K snapshot for session {session_id}")
|
|
frame = self._fetch_snapshot()
|
|
|
|
if frame is None:
|
|
logger.warning(f"[PROCESSING PHASE] Failed to capture snapshot, falling back to RTSP frame")
|
|
# Fall back to RTSP frame if snapshot fails
|
|
frame = processing_data['frame']
|
|
|
|
# Extract detected regions from detection phase result if available
|
|
detected_regions = detection_result.get('detected_regions', {})
|
|
logger.info(f"[INTEGRATION] Passing detected_regions to processing phase: {list(detected_regions.keys())}")
|
|
|
|
# Execute processing phase with detection pipeline
|
|
if self.detection_pipeline:
|
|
processing_result = await self.detection_pipeline.execute_processing_phase(
|
|
frame=frame,
|
|
display_id=display_id,
|
|
session_id=session_id,
|
|
subscription_id=subscription_id,
|
|
detected_regions=detected_regions
|
|
)
|
|
|
|
logger.info(f"Processing phase completed for session {session_id}: "
|
|
f"status={processing_result.get('status', 'unknown')}, "
|
|
f"branches={len(processing_result.get('branch_results', {}))}, "
|
|
f"actions={len(processing_result.get('actions_executed', []))}, "
|
|
f"processing_time={processing_result.get('processing_time', 0):.3f}s")
|
|
|
|
# Update stats
|
|
self.stats['pipelines_executed'] += 1
|
|
|
|
else:
|
|
logger.error("Detection pipeline not available for processing phase")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in processing phase for session {session_id}: {e}", exc_info=True)
|
|
|
|
|
|
def set_subscription_info(self, subscription_info):
|
|
"""
|
|
Set subscription info to access snapshot URL and other stream details.
|
|
|
|
Args:
|
|
subscription_info: SubscriptionInfo object containing stream config
|
|
"""
|
|
self.subscription_info = subscription_info
|
|
logger.debug(f"Set subscription info with snapshot_url: {subscription_info.stream_config.snapshot_url if subscription_info else None}")
|
|
|
|
def set_session_id(self, display_id: str, session_id: str):
|
|
"""
|
|
Set session ID for a display (from backend).
|
|
This is called when backend sends setSessionId after receiving imageDetection.
|
|
|
|
Args:
|
|
display_id: Display identifier
|
|
session_id: Session identifier
|
|
"""
|
|
# Ensure session_id is always a string for consistent type handling
|
|
session_id = str(session_id) if session_id is not None else None
|
|
self.active_sessions[display_id] = session_id
|
|
logger.info(f"Set session {session_id} for display {display_id}")
|
|
|
|
# Check if we have a pending vehicle for this display
|
|
if display_id in self.pending_vehicles:
|
|
track_id = self.pending_vehicles[display_id]
|
|
|
|
# Mark vehicle as processed with the session ID
|
|
self.tracker.mark_processed(track_id, session_id)
|
|
self.session_vehicles[session_id] = track_id
|
|
|
|
# Mark vehicle as permanently processed (won't process again even after session clear)
|
|
# Use composite key to distinguish same track IDs across different cameras
|
|
camera_id = display_id # Using display_id as camera_id for isolation
|
|
permanent_key = f"{camera_id}:{track_id}"
|
|
self.permanently_processed[permanent_key] = time.time()
|
|
|
|
# Remove from pending
|
|
del self.pending_vehicles[display_id]
|
|
|
|
logger.info(f"Assigned session {session_id} to vehicle {track_id}, marked as permanently processed")
|
|
else:
|
|
logger.warning(f"No pending vehicle found for display {display_id} when setting session {session_id}")
|
|
|
|
# Check if we have pending processing data for this display
|
|
if display_id in self.pending_processing_data:
|
|
processing_data = self.pending_processing_data[display_id]
|
|
|
|
# Trigger the processing phase asynchronously
|
|
asyncio.create_task(self._execute_processing_phase(
|
|
processing_data=processing_data,
|
|
session_id=session_id,
|
|
display_id=display_id
|
|
))
|
|
|
|
# Remove from pending processing
|
|
del self.pending_processing_data[display_id]
|
|
|
|
logger.info(f"Triggered processing phase for session {session_id} on display {display_id}")
|
|
else:
|
|
logger.warning(f"No pending processing data found for display {display_id} when setting session {session_id}")
|
|
|
|
# FALLBACK: Execute pipeline for POS-initiated sessions
|
|
logger.info(f"[FALLBACK] Triggering fallback pipeline for session {session_id} on display {display_id}")
|
|
|
|
# Create subscription_id for fallback (needed for pipeline execution)
|
|
fallback_subscription_id = f"{display_id};fallback"
|
|
|
|
# Trigger the fallback pipeline asynchronously
|
|
asyncio.create_task(self._execute_fallback_pipeline(
|
|
display_id=display_id,
|
|
session_id=session_id,
|
|
subscription_id=fallback_subscription_id
|
|
))
|
|
|
|
def clear_session_id(self, session_id: str):
|
|
"""
|
|
Clear session ID (post-fueling).
|
|
|
|
Args:
|
|
session_id: Session identifier to clear
|
|
"""
|
|
# Mark session as cleared
|
|
self.cleared_sessions[session_id] = time.time()
|
|
|
|
# Clear from tracker
|
|
self.tracker.clear_session(session_id)
|
|
|
|
# Remove from active sessions
|
|
display_to_remove = None
|
|
for display_id, sess_id in self.active_sessions.items():
|
|
if sess_id == session_id:
|
|
display_to_remove = display_id
|
|
break
|
|
|
|
if display_to_remove:
|
|
del self.active_sessions[display_to_remove]
|
|
|
|
if session_id in self.session_vehicles:
|
|
del self.session_vehicles[session_id]
|
|
|
|
logger.info(f"Cleared session {session_id}")
|
|
|
|
# Clean old cleared sessions (older than 5 minutes)
|
|
current_time = time.time()
|
|
old_sessions = [
|
|
sid for sid, clear_time in self.cleared_sessions.items()
|
|
if (current_time - clear_time) > 300
|
|
]
|
|
for sid in old_sessions:
|
|
del self.cleared_sessions[sid]
|
|
|
|
def get_session_for_display(self, display_id: str) -> Optional[str]:
|
|
"""Get active session for a display."""
|
|
return self.active_sessions.get(display_id)
|
|
|
|
def reset_tracking(self):
|
|
"""Reset all tracking state."""
|
|
self.tracker.reset_tracking()
|
|
self.active_sessions.clear()
|
|
self.session_vehicles.clear()
|
|
self.cleared_sessions.clear()
|
|
self.pending_vehicles.clear()
|
|
self.pending_processing_data.clear()
|
|
self.permanently_processed.clear()
|
|
self.progression_stages.clear()
|
|
self.last_detection_time.clear()
|
|
logger.info("Tracking pipeline integration reset")
|
|
|
|
def get_statistics(self) -> Dict[str, Any]:
|
|
"""Get comprehensive statistics."""
|
|
tracker_stats = self.tracker.get_statistics()
|
|
validator_stats = self.validator.get_statistics()
|
|
|
|
return {
|
|
'integration': self.stats,
|
|
'tracker': tracker_stats,
|
|
'validator': validator_stats,
|
|
'active_sessions': len(self.active_sessions),
|
|
'cleared_sessions': len(self.cleared_sessions)
|
|
}
|
|
|
|
async def _check_car_abandonment(self, display_id: str, subscription_id: str):
|
|
"""
|
|
Check if a car has abandoned the fueling process (left after getting car_wait_staff stage).
|
|
|
|
Args:
|
|
display_id: Display identifier
|
|
subscription_id: Subscription identifier
|
|
"""
|
|
current_time = time.time()
|
|
|
|
# Check all sessions in car_wait_staff stage
|
|
abandoned_sessions = []
|
|
for session_id, stage in self.progression_stages.items():
|
|
if stage == "car_wait_staff":
|
|
# Check if we have recent detections for this session's display
|
|
session_display = None
|
|
for disp_id, sess_id in self.active_sessions.items():
|
|
if sess_id == session_id:
|
|
session_display = disp_id
|
|
break
|
|
|
|
if session_display:
|
|
last_detection = self.last_detection_time.get(session_display, 0)
|
|
time_since_detection = current_time - last_detection
|
|
|
|
if time_since_detection > self.abandonment_timeout:
|
|
logger.info(f"Car abandonment detected: session {session_id}, "
|
|
f"no detection for {time_since_detection:.1f}s")
|
|
abandoned_sessions.append(session_id)
|
|
|
|
# Send abandonment detection for each abandoned session
|
|
for session_id in abandoned_sessions:
|
|
await self._send_abandonment_detection(subscription_id, session_id)
|
|
# Remove from progression stages to avoid repeated detection
|
|
if session_id in self.progression_stages:
|
|
del self.progression_stages[session_id]
|
|
|
|
async def _send_abandonment_detection(self, subscription_id: str, session_id: str):
|
|
"""
|
|
Send imageDetection with null detection to indicate car abandonment.
|
|
|
|
Args:
|
|
subscription_id: Subscription identifier
|
|
session_id: Session ID of the abandoned car
|
|
"""
|
|
try:
|
|
# Import here to avoid circular imports
|
|
from ..communication.messages import create_image_detection
|
|
|
|
# Create abandonment detection message with null detection
|
|
detection_message = create_image_detection(
|
|
subscription_identifier=subscription_id,
|
|
detection_data=None, # Null detection indicates abandonment
|
|
model_id=self.model_id,
|
|
model_name=self.pipeline_parser.tracking_config.model_id if self.pipeline_parser.tracking_config else "tracking_model"
|
|
)
|
|
|
|
# Send to backend via WebSocket if sender is available
|
|
if self.message_sender:
|
|
await self.message_sender(detection_message)
|
|
logger.info(f"[CAR ABANDONMENT] Sent null detection for session {session_id}")
|
|
else:
|
|
logger.info(f"[CAR ABANDONMENT] No message sender available, would send: {detection_message}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error sending abandonment detection: {e}", exc_info=True)
|
|
|
|
def set_progression_stage(self, session_id: str, stage: str):
|
|
"""
|
|
Set progression stage for a session (from backend setProgessionStage message).
|
|
|
|
Args:
|
|
session_id: Session identifier
|
|
stage: Progression stage (e.g., "car_wait_staff")
|
|
"""
|
|
self.progression_stages[session_id] = stage
|
|
logger.info(f"Set progression stage for session {session_id}: {stage}")
|
|
|
|
# If car reaches car_wait_staff, start monitoring for abandonment
|
|
if stage == "car_wait_staff":
|
|
logger.info(f"Started monitoring session {session_id} for car abandonment")
|
|
|
|
def _fetch_snapshot(self) -> Optional[np.ndarray]:
|
|
"""
|
|
Fetch high-quality snapshot from camera's snapshot URL.
|
|
Reusable method for both processing phase and fallback pipeline.
|
|
|
|
Returns:
|
|
Snapshot frame or None if unavailable
|
|
"""
|
|
if not (self.subscription_info and self.subscription_info.stream_config.snapshot_url):
|
|
logger.warning("[SNAPSHOT] No subscription info or snapshot URL available")
|
|
return None
|
|
|
|
try:
|
|
from ..streaming.readers import HTTPSnapshotReader
|
|
|
|
logger.info(f"[SNAPSHOT] Fetching snapshot for {self.subscription_info.camera_id}")
|
|
snapshot_reader = HTTPSnapshotReader(
|
|
camera_id=self.subscription_info.camera_id,
|
|
snapshot_url=self.subscription_info.stream_config.snapshot_url,
|
|
max_retries=3
|
|
)
|
|
|
|
frame = snapshot_reader.fetch_single_snapshot()
|
|
|
|
if frame is not None:
|
|
logger.info(f"[SNAPSHOT] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot")
|
|
return frame
|
|
else:
|
|
logger.warning("[SNAPSHOT] Failed to fetch snapshot")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"[SNAPSHOT] Error fetching snapshot: {e}", exc_info=True)
|
|
return None
|
|
|
|
async def _execute_fallback_pipeline(self, display_id: str, session_id: str, subscription_id: str):
|
|
"""
|
|
Execute fallback pipeline when sessionId is received without prior detection.
|
|
This handles POS-initiated sessions where backend starts transaction before car detection.
|
|
|
|
Args:
|
|
display_id: Display identifier
|
|
session_id: Session ID from backend
|
|
subscription_id: Subscription identifier for pipeline execution
|
|
"""
|
|
try:
|
|
logger.info(f"[FALLBACK PIPELINE] Executing for session {session_id}, display {display_id}")
|
|
|
|
# Fetch fresh snapshot from camera
|
|
frame = self._fetch_snapshot()
|
|
|
|
if frame is None:
|
|
logger.error(f"[FALLBACK] Failed to fetch snapshot for session {session_id}, cannot execute pipeline")
|
|
return
|
|
|
|
logger.info(f"[FALLBACK] Using snapshot frame {frame.shape[1]}x{frame.shape[0]} for session {session_id}")
|
|
|
|
# Check if detection pipeline is available
|
|
if not self.detection_pipeline:
|
|
logger.error(f"[FALLBACK] Detection pipeline not available for session {session_id}")
|
|
return
|
|
|
|
# Execute detection phase to get detected regions
|
|
detection_result = await self.detection_pipeline.execute_detection_phase(
|
|
frame=frame,
|
|
display_id=display_id,
|
|
subscription_id=subscription_id
|
|
)
|
|
|
|
logger.info(f"[FALLBACK] Detection phase completed for session {session_id}: "
|
|
f"status={detection_result.get('status', 'unknown')}, "
|
|
f"regions={list(detection_result.get('detected_regions', {}).keys())}")
|
|
|
|
# If detection found regions, execute processing phase
|
|
detected_regions = detection_result.get('detected_regions', {})
|
|
if detected_regions:
|
|
processing_result = await self.detection_pipeline.execute_processing_phase(
|
|
frame=frame,
|
|
display_id=display_id,
|
|
session_id=session_id,
|
|
subscription_id=subscription_id,
|
|
detected_regions=detected_regions
|
|
)
|
|
|
|
logger.info(f"[FALLBACK] Processing phase completed for session {session_id}: "
|
|
f"status={processing_result.get('status', 'unknown')}, "
|
|
f"branches={len(processing_result.get('branch_results', {}))}, "
|
|
f"actions={len(processing_result.get('actions_executed', []))}")
|
|
|
|
# Update statistics
|
|
self.stats['pipelines_executed'] += 1
|
|
|
|
else:
|
|
logger.warning(f"[FALLBACK] No detections found in snapshot for session {session_id}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"[FALLBACK] Error executing fallback pipeline for session {session_id}: {e}", exc_info=True)
|
|
|
|
def _filter_small_frontals(self, tracking_results, frame):
|
|
"""
|
|
Filter out frontal detections that are smaller than minimum bbox area percentage.
|
|
This prevents processing of cars from neighboring pumps that appear in camera view.
|
|
|
|
Args:
|
|
tracking_results: YOLO tracking results with detections
|
|
frame: Input frame for calculating frame area
|
|
|
|
Returns:
|
|
Modified tracking_results with small frontals removed
|
|
"""
|
|
if not hasattr(tracking_results, 'detections') or not tracking_results.detections:
|
|
return tracking_results
|
|
|
|
# Calculate frame area and minimum bbox area threshold
|
|
frame_area = frame.shape[0] * frame.shape[1] # height * width
|
|
min_bbox_area = frame_area * (self.min_bbox_area_percentage / 100.0)
|
|
|
|
# Filter detections
|
|
filtered_detections = []
|
|
filtered_count = 0
|
|
|
|
for detection in tracking_results.detections:
|
|
# Calculate detection bbox area
|
|
bbox = detection.bbox # Assuming bbox is [x1, y1, x2, y2]
|
|
bbox_area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
|
|
|
|
if bbox_area >= min_bbox_area:
|
|
# Keep detection - bbox is large enough
|
|
filtered_detections.append(detection)
|
|
else:
|
|
# Filter out small detection
|
|
filtered_count += 1
|
|
area_percentage = (bbox_area / frame_area) * 100
|
|
logger.debug(f"Filtered small frontal: area={bbox_area:.0f}px² ({area_percentage:.1f}% of frame, "
|
|
f"min required: {self.min_bbox_area_percentage}%)")
|
|
|
|
# Update tracking results with filtered detections
|
|
tracking_results.detections = filtered_detections
|
|
|
|
# Update statistics
|
|
if filtered_count > 0:
|
|
self.stats['frontals_filtered_small'] += filtered_count
|
|
logger.info(f"Filtered {filtered_count} small frontal detections, "
|
|
f"{len(filtered_detections)} remaining (total filtered: {self.stats['frontals_filtered_small']})")
|
|
|
|
return tracking_results
|
|
|
|
def cleanup(self):
|
|
"""Cleanup resources."""
|
|
self.executor.shutdown(wait=False)
|
|
self.reset_tracking()
|
|
|
|
|
|
# Cleanup detection pipeline
|
|
if self.detection_pipeline:
|
|
self.detection_pipeline.cleanup()
|
|
|
|
logger.info("Tracking pipeline integration cleaned up") |