feat: custom bot-sort based tracker

This commit is contained in:
ziesorx 2025-09-26 14:22:38 +07:00
parent bd201acac1
commit 791f611f7d
8 changed files with 649 additions and 282 deletions

View file

@ -1,6 +1,6 @@
"""
Vehicle Tracking Module - Continuous tracking with front_rear_detection model
Implements vehicle identification, persistence, and motion analysis.
Vehicle Tracking Module - BoT-SORT based tracking with camera isolation
Implements vehicle identification, persistence, and motion analysis using external tracker.
"""
import logging
import time
@ -10,6 +10,8 @@ from dataclasses import dataclass, field
import numpy as np
from threading import Lock
from .bot_sort_tracker import MultiCameraBoTSORT
logger = logging.getLogger(__name__)
@ -17,6 +19,7 @@ logger = logging.getLogger(__name__)
class TrackedVehicle:
"""Represents a tracked vehicle with all its state information."""
track_id: int
camera_id: str
first_seen: float
last_seen: float
session_id: Optional[str] = None
@ -30,6 +33,8 @@ class TrackedVehicle:
processed_pipeline: bool = False
last_position_history: List[Tuple[float, float]] = field(default_factory=list)
avg_confidence: float = 0.0
hit_streak: int = 0
age: int = 0
def update_position(self, bbox: Tuple[int, int, int, int], confidence: float):
"""Update vehicle position and confidence."""
@ -73,7 +78,7 @@ class TrackedVehicle:
class VehicleTracker:
"""
Main vehicle tracking implementation using YOLO tracking capabilities.
Main vehicle tracking implementation using BoT-SORT with camera isolation.
Manages continuous tracking, vehicle identification, and state persistence.
"""
@ -88,18 +93,19 @@ class VehicleTracker:
self.trigger_classes = self.config.get('trigger_classes', self.config.get('triggerClasses', ['frontal']))
self.min_confidence = self.config.get('minConfidence', 0.6)
# Tracking state
self.tracked_vehicles: Dict[int, TrackedVehicle] = {}
self.next_track_id = 1
# BoT-SORT multi-camera tracker
self.bot_sort = MultiCameraBoTSORT(self.trigger_classes, self.min_confidence)
# Tracking state - maintain compatibility with existing code
self.tracked_vehicles: Dict[str, Dict[int, TrackedVehicle]] = {} # camera_id -> {track_id: vehicle}
self.lock = Lock()
# Tracking parameters
self.stability_threshold = 0.7
self.min_stable_frames = 5
self.position_tolerance = 50 # pixels
self.timeout_seconds = 2.0
logger.info(f"VehicleTracker initialized with trigger_classes={self.trigger_classes}, "
logger.info(f"VehicleTracker initialized with BoT-SORT: trigger_classes={self.trigger_classes}, "
f"min_confidence={self.min_confidence}")
def process_detections(self,
@ -107,10 +113,10 @@ class VehicleTracker:
display_id: str,
frame: np.ndarray) -> List[TrackedVehicle]:
"""
Process YOLO detection results and update tracking state.
Process detection results using BoT-SORT tracking.
Args:
results: YOLO detection results with tracking
results: Detection results (InferenceResult)
display_id: Display identifier for this stream
frame: Current frame being processed
@ -118,108 +124,67 @@ class VehicleTracker:
List of currently tracked vehicles
"""
current_time = time.time()
active_tracks = []
# Extract camera_id from display_id for tracking isolation
camera_id = display_id # Using display_id as camera_id for isolation
with self.lock:
# Clean up expired tracks
expired_ids = [
track_id for track_id, vehicle in self.tracked_vehicles.items()
if vehicle.is_expired(self.timeout_seconds)
]
for track_id in expired_ids:
logger.debug(f"Removing expired track {track_id}")
del self.tracked_vehicles[track_id]
# Update BoT-SORT tracker
track_results = self.bot_sort.update(camera_id, results)
# Process new detections from InferenceResult
if hasattr(results, 'detections') and results.detections:
# Process detections from InferenceResult
for detection in results.detections:
# Skip if confidence is too low
if detection.confidence < self.min_confidence:
continue
# Ensure camera tracking dict exists
if camera_id not in self.tracked_vehicles:
self.tracked_vehicles[camera_id] = {}
# Check if class is in trigger classes
if detection.class_name not in self.trigger_classes:
continue
# Update tracked vehicles based on BoT-SORT results
current_tracks = {}
active_tracks = []
# Use track_id if available, otherwise generate one
track_id = detection.track_id if detection.track_id is not None else self.next_track_id
if detection.track_id is None:
self.next_track_id += 1
for track_result in track_results:
track_id = track_result['track_id']
# Get bounding box from Detection object
x1, y1, x2, y2 = detection.bbox
bbox = (int(x1), int(y1), int(x2), int(y2))
# Create or update TrackedVehicle
if track_id in self.tracked_vehicles[camera_id]:
# Update existing vehicle
vehicle = self.tracked_vehicles[camera_id][track_id]
vehicle.update_position(track_result['bbox'], track_result['confidence'])
vehicle.hit_streak = track_result['hit_streak']
vehicle.age = track_result['age']
# Update or create tracked vehicle
confidence = detection.confidence
if track_id in self.tracked_vehicles:
# Update existing track
vehicle = self.tracked_vehicles[track_id]
vehicle.update_position(bbox, confidence)
vehicle.display_id = display_id
# Update stability based on hit_streak
if vehicle.hit_streak >= self.min_stable_frames:
vehicle.is_stable = True
vehicle.stable_frames = vehicle.hit_streak
# Check stability
stability = vehicle.calculate_stability()
if stability > self.stability_threshold:
vehicle.stable_frames += 1
if vehicle.stable_frames >= self.min_stable_frames:
vehicle.is_stable = True
else:
vehicle.stable_frames = max(0, vehicle.stable_frames - 1)
if vehicle.stable_frames < self.min_stable_frames:
vehicle.is_stable = False
logger.debug(f"Updated track {track_id}: conf={vehicle.confidence:.2f}, "
f"stable={vehicle.is_stable}, hit_streak={vehicle.hit_streak}")
else:
# Create new vehicle
x1, y1, x2, y2 = track_result['bbox']
vehicle = TrackedVehicle(
track_id=track_id,
camera_id=camera_id,
first_seen=current_time,
last_seen=current_time,
display_id=display_id,
confidence=track_result['confidence'],
bbox=tuple(track_result['bbox']),
center=((x1 + x2) / 2, (y1 + y2) / 2),
total_frames=1,
hit_streak=track_result['hit_streak'],
age=track_result['age']
)
vehicle.last_position_history.append(vehicle.center)
logger.info(f"New vehicle tracked: ID={track_id}, camera={camera_id}, display={display_id}")
logger.debug(f"Updated track {track_id}: conf={confidence:.2f}, "
f"stable={vehicle.is_stable}, stability={stability:.2f}")
else:
# Create new track
vehicle = TrackedVehicle(
track_id=track_id,
first_seen=current_time,
last_seen=current_time,
display_id=display_id,
confidence=confidence,
bbox=bbox,
center=((x1 + x2) / 2, (y1 + y2) / 2),
total_frames=1
)
vehicle.last_position_history.append(vehicle.center)
self.tracked_vehicles[track_id] = vehicle
logger.info(f"New vehicle tracked: ID={track_id}, display={display_id}")
current_tracks[track_id] = vehicle
active_tracks.append(vehicle)
active_tracks.append(self.tracked_vehicles[track_id])
# Update the camera's tracked vehicles
self.tracked_vehicles[camera_id] = current_tracks
return active_tracks
def _find_closest_track(self, center: Tuple[float, float]) -> Optional[TrackedVehicle]:
"""
Find the closest existing track to a given position.
Args:
center: Center position to match
Returns:
Closest tracked vehicle if within tolerance, None otherwise
"""
min_distance = float('inf')
closest_track = None
for vehicle in self.tracked_vehicles.values():
if vehicle.is_expired(0.5): # Shorter timeout for matching
continue
distance = np.sqrt(
(center[0] - vehicle.center[0]) ** 2 +
(center[1] - vehicle.center[1]) ** 2
)
if distance < min_distance and distance < self.position_tolerance:
min_distance = distance
closest_track = vehicle
return closest_track
def get_stable_vehicles(self, display_id: Optional[str] = None) -> List[TrackedVehicle]:
"""
Get all stable vehicles, optionally filtered by display.
@ -231,11 +196,15 @@ class VehicleTracker:
List of stable tracked vehicles
"""
with self.lock:
stable = [
v for v in self.tracked_vehicles.values()
if v.is_stable and not v.is_expired(self.timeout_seconds)
and (display_id is None or v.display_id == display_id)
]
stable = []
camera_id = display_id # Using display_id as camera_id
if camera_id in self.tracked_vehicles:
for vehicle in self.tracked_vehicles[camera_id].values():
if (vehicle.is_stable and not vehicle.is_expired(self.timeout_seconds) and
(display_id is None or vehicle.display_id == display_id)):
stable.append(vehicle)
return stable
def get_vehicle_by_session(self, session_id: str) -> Optional[TrackedVehicle]:
@ -249,9 +218,11 @@ class VehicleTracker:
Tracked vehicle if found, None otherwise
"""
with self.lock:
for vehicle in self.tracked_vehicles.values():
if vehicle.session_id == session_id:
return vehicle
# Search across all cameras
for camera_vehicles in self.tracked_vehicles.values():
for vehicle in camera_vehicles.values():
if vehicle.session_id == session_id:
return vehicle
return None
def mark_processed(self, track_id: int, session_id: str):
@ -263,11 +234,14 @@ class VehicleTracker:
session_id: Session ID assigned to this vehicle
"""
with self.lock:
if track_id in self.tracked_vehicles:
vehicle = self.tracked_vehicles[track_id]
vehicle.processed_pipeline = True
vehicle.session_id = session_id
logger.info(f"Marked vehicle {track_id} as processed with session {session_id}")
# Search across all cameras for the track_id
for camera_vehicles in self.tracked_vehicles.values():
if track_id in camera_vehicles:
vehicle = camera_vehicles[track_id]
vehicle.processed_pipeline = True
vehicle.session_id = session_id
logger.info(f"Marked vehicle {track_id} as processed with session {session_id}")
return
def clear_session(self, session_id: str):
"""
@ -277,30 +251,43 @@ class VehicleTracker:
session_id: Session ID to clear
"""
with self.lock:
for vehicle in self.tracked_vehicles.values():
if vehicle.session_id == session_id:
logger.info(f"Clearing session {session_id} from vehicle {vehicle.track_id}")
vehicle.session_id = None
# Keep processed_pipeline=True to prevent re-processing
# Search across all cameras
for camera_vehicles in self.tracked_vehicles.values():
for vehicle in camera_vehicles.values():
if vehicle.session_id == session_id:
logger.info(f"Clearing session {session_id} from vehicle {vehicle.track_id}")
vehicle.session_id = None
# Keep processed_pipeline=True to prevent re-processing
def reset_tracking(self):
"""Reset all tracking state."""
with self.lock:
self.tracked_vehicles.clear()
self.next_track_id = 1
self.bot_sort.reset_all()
logger.info("Vehicle tracking state reset")
def get_statistics(self) -> Dict:
"""Get tracking statistics."""
with self.lock:
total = len(self.tracked_vehicles)
stable = sum(1 for v in self.tracked_vehicles.values() if v.is_stable)
processed = sum(1 for v in self.tracked_vehicles.values() if v.processed_pipeline)
total = 0
stable = 0
processed = 0
all_confidences = []
# Aggregate stats across all cameras
for camera_vehicles in self.tracked_vehicles.values():
total += len(camera_vehicles)
for vehicle in camera_vehicles.values():
if vehicle.is_stable:
stable += 1
if vehicle.processed_pipeline:
processed += 1
all_confidences.append(vehicle.avg_confidence)
return {
'total_tracked': total,
'stable_vehicles': stable,
'processed_vehicles': processed,
'avg_confidence': np.mean([v.avg_confidence for v in self.tracked_vehicles.values()])
if self.tracked_vehicles else 0.0
'avg_confidence': np.mean(all_confidences) if all_confidences else 0.0,
'bot_sort_stats': self.bot_sort.get_statistics()
}