293 lines
No EOL
11 KiB
Python
293 lines
No EOL
11 KiB
Python
"""
|
|
Vehicle Tracking Module - BoT-SORT based tracking with camera isolation
|
|
Implements vehicle identification, persistence, and motion analysis using external tracker.
|
|
"""
|
|
import logging
|
|
import time
|
|
import uuid
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
from dataclasses import dataclass, field
|
|
import numpy as np
|
|
from threading import Lock
|
|
|
|
from .bot_sort_tracker import MultiCameraBoTSORT
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class TrackedVehicle:
    """Represents a tracked vehicle with all its state information.

    Instances are plain mutable records: the owning tracker creates one per
    track and then calls :meth:`update_position` each time the track is seen
    again.
    """

    track_id: int
    camera_id: str
    first_seen: float  # timestamp supplied by the creator
    last_seen: float  # refreshed with time.time() on every update_position()
    session_id: Optional[str] = None
    display_id: Optional[str] = None
    confidence: float = 0.0  # confidence of the most recent update
    bbox: Tuple[int, int, int, int] = (0, 0, 0, 0)  # x1, y1, x2, y2
    center: Tuple[float, float] = (0.0, 0.0)  # midpoint of bbox
    stable_frames: int = 0
    total_frames: int = 0  # number of update_position() calls (plus any initial value)
    is_stable: bool = False
    processed_pipeline: bool = False
    # Most recent centers, oldest first; capped at 10 entries by update_position().
    last_position_history: List[Tuple[float, float]] = field(default_factory=list)
    avg_confidence: float = 0.0  # running mean of confidence over total_frames
    hit_streak: int = 0
    age: int = 0

    def update_position(self, bbox: Tuple[int, int, int, int], confidence: float) -> None:
        """Update vehicle position and confidence.

        Args:
            bbox: Bounding box as (x1, y1, x2, y2). Any 4-element sequence is
                accepted; it is stored as a tuple so the attribute type does
                not depend on what the caller happened to pass.
            confidence: Detection confidence for this observation.
        """
        self.bbox = tuple(bbox)
        self.center = ((bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2)
        self.last_seen = time.time()
        self.confidence = confidence
        self.total_frames += 1

        # Incremental mean: previous mean weighted by the previous sample count.
        previous_total = self.avg_confidence * (self.total_frames - 1)
        self.avg_confidence = (previous_total + confidence) / self.total_frames

        # Maintain position history (last 10 positions). Trimming in place
        # keeps any external reference to the list valid.
        self.last_position_history.append(self.center)
        del self.last_position_history[:-10]

    def calculate_stability(self) -> float:
        """Calculate stability score based on position history.

        Returns:
            A plain ``float`` in [0.0, 1.0]. 1.0 means the recorded centers
            did not move at all; 0.0 is returned when fewer than two
            positions are recorded or when the combined x/y standard
            deviation reaches 100 pixels or more.
        """
        if len(self.last_position_history) < 2:
            return 0.0

        positions = np.array(self.last_position_history)
        std_x = np.std(positions[:, 0])
        std_y = np.std(positions[:, 1])

        # Lower variance means more stable (inverse relationship).
        # Normalize to 0-1 range (assuming max reasonable std is 50 pixels
        # per axis, hence the divisor of 100 for the sum of both axes).
        # float() guarantees the annotated return type instead of a mix of
        # int 0 and numpy scalars.
        return float(max(0.0, 1.0 - (std_x + std_y) / 100))

    def is_expired(self, timeout_seconds: float = 2.0) -> bool:
        """Check if vehicle tracking has expired.

        Args:
            timeout_seconds: Maximum allowed time since ``last_seen``.

        Returns:
            True when more than ``timeout_seconds`` have elapsed (by
            ``time.time()``) since the vehicle was last updated.
        """
        return (time.time() - self.last_seen) > timeout_seconds
|
|
|
|
|
|
class VehicleTracker:
    """
    Main vehicle tracking implementation using BoT-SORT with camera isolation.

    Manages continuous tracking, vehicle identification, and state persistence.
    All access to ``tracked_vehicles`` goes through ``self.lock``.
    """

    def __init__(self, tracking_config: Optional[Dict] = None):
        """
        Initialize the vehicle tracker.

        Args:
            tracking_config: Configuration from pipeline.json tracking section.
                Recognized keys: ``trigger_classes`` (or ``triggerClasses``)
                and ``minConfidence``.
        """
        self.config = tracking_config or {}
        # Snake-case key wins; the camelCase key is the fallback.
        self.trigger_classes = self.config.get(
            'trigger_classes', self.config.get('triggerClasses', ['frontal']))
        self.min_confidence = self.config.get('minConfidence', 0.6)

        # BoT-SORT multi-camera tracker
        self.bot_sort = MultiCameraBoTSORT(self.trigger_classes, self.min_confidence)

        # Tracking state - maintain compatibility with existing code
        # camera_id -> {track_id: vehicle}
        self.tracked_vehicles: Dict[str, Dict[int, TrackedVehicle]] = {}
        self.lock = Lock()

        # Tracking parameters
        self.stability_threshold = 0.7  # not referenced by the methods of this class
        self.min_stable_frames = 5  # hit_streak needed before a vehicle is flagged stable
        self.timeout_seconds = 2.0  # max age of last_seen for get_stable_vehicles()

        logger.info(f"VehicleTracker initialized with BoT-SORT: trigger_classes={self.trigger_classes}, "
                    f"min_confidence={self.min_confidence}")

    def process_detections(self,
                           results: Any,
                           display_id: str,
                           frame: np.ndarray) -> List[TrackedVehicle]:
        """
        Process detection results using BoT-SORT tracking.

        The per-camera vehicle map is replaced on every call by the set of
        tracks BoT-SORT returned for this call, so a track that is absent
        from the results is dropped from ``tracked_vehicles``.

        Args:
            results: Detection results (InferenceResult)
            display_id: Display identifier for this stream
            frame: Current frame being processed (currently unused here)

        Returns:
            List of currently tracked vehicles
        """
        current_time = time.time()

        # Using display_id as camera_id for tracking isolation
        camera_id = display_id

        with self.lock:
            # Update BoT-SORT tracker
            track_results = self.bot_sort.update(camera_id, results)

            # Ensure camera tracking dict exists
            previous_tracks = self.tracked_vehicles.setdefault(camera_id, {})

            current_tracks: Dict[int, TrackedVehicle] = {}
            active_tracks: List[TrackedVehicle] = []

            for track_result in track_results:
                track_id = track_result['track_id']
                vehicle = previous_tracks.get(track_id)

                if vehicle is not None:
                    # Update existing vehicle
                    vehicle.update_position(track_result['bbox'], track_result['confidence'])
                    vehicle.hit_streak = track_result['hit_streak']
                    vehicle.age = track_result['age']

                    # Update stability based on hit_streak. The flag is only
                    # ever raised here, never lowered.
                    if vehicle.hit_streak >= self.min_stable_frames:
                        vehicle.is_stable = True
                        vehicle.stable_frames = vehicle.hit_streak

                    # Lazy %-formatting: this runs per track per frame, so
                    # skip building the string when DEBUG is disabled.
                    logger.debug("Updated track %s: conf=%.2f, stable=%s, hit_streak=%s",
                                 track_id, vehicle.confidence, vehicle.is_stable,
                                 vehicle.hit_streak)
                else:
                    vehicle = self._create_vehicle(track_result, camera_id, display_id, current_time)
                    logger.info(f"New vehicle tracked: ID={track_id}, camera={camera_id}, display={display_id}")

                current_tracks[track_id] = vehicle
                active_tracks.append(vehicle)

            # Update the camera's tracked vehicles
            self.tracked_vehicles[camera_id] = current_tracks

        return active_tracks

    @staticmethod
    def _create_vehicle(track_result: Dict[str, Any],
                        camera_id: str,
                        display_id: str,
                        timestamp: float) -> TrackedVehicle:
        """Build a TrackedVehicle for a track seen for the first time."""
        x1, y1, x2, y2 = track_result['bbox']
        confidence = track_result['confidence']
        vehicle = TrackedVehicle(
            track_id=track_result['track_id'],
            camera_id=camera_id,
            first_seen=timestamp,
            last_seen=timestamp,
            display_id=display_id,
            confidence=confidence,
            bbox=tuple(track_result['bbox']),
            center=((x1 + x2) / 2, (y1 + y2) / 2),
            total_frames=1,
            # total_frames starts at 1, so the running mean must start at the
            # first sample; leaving it at 0.0 would bias every later average.
            avg_confidence=confidence,
            hit_streak=track_result['hit_streak'],
            age=track_result['age'],
        )
        vehicle.last_position_history.append(vehicle.center)
        return vehicle

    def get_stable_vehicles(self, display_id: Optional[str] = None) -> List[TrackedVehicle]:
        """
        Get all stable vehicles, optionally filtered by display.

        Args:
            display_id: Optional display ID to filter by. When None, stable
                vehicles from every camera are returned.

        Returns:
            List of stable tracked vehicles that have not expired
        """
        with self.lock:
            if display_id is None:
                # No filter: look at every camera instead of looking up the
                # key None, which never exists.
                camera_groups = list(self.tracked_vehicles.values())
            else:
                # Using display_id as camera_id
                group = self.tracked_vehicles.get(display_id)
                camera_groups = [] if group is None else [group]

            stable = []
            for camera_vehicles in camera_groups:
                for vehicle in camera_vehicles.values():
                    if not vehicle.is_stable or vehicle.is_expired(self.timeout_seconds):
                        continue
                    if display_id is None or vehicle.display_id == display_id:
                        stable.append(vehicle)

            return stable

    def get_vehicle_by_session(self, session_id: str) -> Optional[TrackedVehicle]:
        """
        Get a tracked vehicle by its session ID.

        Args:
            session_id: Session ID to look up

        Returns:
            Tracked vehicle if found, None otherwise
        """
        with self.lock:
            # Search across all cameras
            for camera_vehicles in self.tracked_vehicles.values():
                for vehicle in camera_vehicles.values():
                    if vehicle.session_id == session_id:
                        return vehicle
            return None

    def mark_processed(self, track_id: int, session_id: str):
        """
        Mark a vehicle as processed through the pipeline.

        Only the first camera (in dict order) holding ``track_id`` is updated.
        NOTE(review): if track IDs can repeat across cameras this may pick the
        wrong vehicle - confirm against MultiCameraBoTSORT's ID allocation.

        Args:
            track_id: Track ID of the vehicle
            session_id: Session ID assigned to this vehicle
        """
        with self.lock:
            # Search across all cameras for the track_id
            for camera_vehicles in self.tracked_vehicles.values():
                vehicle = camera_vehicles.get(track_id)
                if vehicle is None:
                    continue
                vehicle.processed_pipeline = True
                vehicle.session_id = session_id
                logger.info(f"Marked vehicle {track_id} as processed with session {session_id}")
                return

            # Previously a silent no-op; surface it so a lost track is visible.
            logger.warning("Cannot mark vehicle %s as processed (session %s): track not found",
                           track_id, session_id)

    def clear_session(self, session_id: str):
        """
        Clear session ID from a tracked vehicle (post-fueling).

        Every vehicle carrying ``session_id`` is cleared, on any camera.

        Args:
            session_id: Session ID to clear
        """
        with self.lock:
            # Search across all cameras
            for camera_vehicles in self.tracked_vehicles.values():
                for vehicle in camera_vehicles.values():
                    if vehicle.session_id == session_id:
                        logger.info(f"Clearing session {session_id} from vehicle {vehicle.track_id}")
                        vehicle.session_id = None
                        # Keep processed_pipeline=True to prevent re-processing

    def reset_tracking(self):
        """Reset all tracking state, including the underlying BoT-SORT trackers."""
        with self.lock:
            self.tracked_vehicles.clear()
            self.bot_sort.reset_all()
            logger.info("Vehicle tracking state reset")

    def get_statistics(self) -> Dict:
        """Get tracking statistics.

        Returns:
            Dict with ``total_tracked``, ``stable_vehicles``,
            ``processed_vehicles``, ``avg_confidence`` (mean of the vehicles'
            running averages, 0.0 when nothing is tracked) and
            ``bot_sort_stats`` (whatever the BoT-SORT wrapper reports).
        """
        with self.lock:
            total = 0
            stable = 0
            processed = 0
            all_confidences = []

            # Aggregate stats across all cameras
            for camera_vehicles in self.tracked_vehicles.values():
                total += len(camera_vehicles)
                for vehicle in camera_vehicles.values():
                    if vehicle.is_stable:
                        stable += 1
                    if vehicle.processed_pipeline:
                        processed += 1
                    all_confidences.append(vehicle.avg_confidence)

            return {
                'total_tracked': total,
                'stable_vehicles': stable,
                'processed_vehicles': processed,
                # float() so the value is a plain Python float in both branches.
                'avg_confidence': float(np.mean(all_confidences)) if all_confidences else 0.0,
                'bot_sort_stats': self.bot_sort.get_statistics()
            }