fix: stability fix
All checks were successful
Build Worker Base and Application Images / check-base-changes (push) Successful in 10s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 2m53s
Build Worker Base and Application Images / deploy-stack (push) Successful in 8s

This commit is contained in:
ziesorx 2025-09-25 13:28:56 +07:00
parent bfab574058
commit 0cf0bc8b91
3 changed files with 215 additions and 60 deletions

View file

@ -1,7 +1,7 @@
{
"poll_interval_ms": 100,
"max_streams": 20,
"target_fps": 2,
"target_fps": 4,
"reconnect_interval_sec": 10,
"max_retries": -1,
"rtsp_buffer_size": 3,

View file

@ -31,40 +31,125 @@ class TrackedVehicle:
last_position_history: List[Tuple[float, float]] = field(default_factory=list)
avg_confidence: float = 0.0
def update_position(self, bbox: Tuple[int, int, int, int], confidence: float):
# Hybrid validation fields
track_id_changes: int = 0 # Number of times track ID changed for same position
position_stability_score: float = 0.0 # Independent position-based stability
continuous_stable_duration: float = 0.0 # Time continuously stable (ignoring track ID changes)
last_track_id_change: Optional[float] = None # When track ID last changed
original_track_id: int = None # First track ID seen at this position
def update_position(self, bbox: Tuple[int, int, int, int], confidence: float, new_track_id: Optional[int] = None):
"""Update vehicle position and confidence."""
self.bbox = bbox
self.center = ((bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2)
self.last_seen = time.time()
current_time = time.time()
self.last_seen = current_time
self.confidence = confidence
self.total_frames += 1
# Track ID change detection
if new_track_id is not None and new_track_id != self.track_id:
self.track_id_changes += 1
self.last_track_id_change = current_time
logger.debug(f"Track ID changed from {self.track_id} to {new_track_id} for same vehicle")
self.track_id = new_track_id
# Set original track ID if not set
if self.original_track_id is None:
self.original_track_id = self.track_id
# Update confidence average
self.avg_confidence = ((self.avg_confidence * (self.total_frames - 1)) + confidence) / self.total_frames
# Maintain position history (last 10 positions)
# Maintain position history (last 15 positions for better stability analysis)
self.last_position_history.append(self.center)
if len(self.last_position_history) > 10:
if len(self.last_position_history) > 15:
self.last_position_history.pop(0)
def calculate_stability(self) -> float:
"""Calculate stability score based on position history."""
if len(self.last_position_history) < 2:
return 0.0
# Update position-based stability
self._update_position_stability()
def _update_position_stability(self):
"""Update position-based stability score independent of track ID."""
if len(self.last_position_history) < 5:
self.position_stability_score = 0.0
return
# Calculate movement variance
positions = np.array(self.last_position_history)
if len(positions) < 2:
return 0.0
# Calculate standard deviation of positions
# Calculate position variance (lower = more stable)
std_x = np.std(positions[:, 0])
std_y = np.std(positions[:, 1])
# Lower variance means more stable (inverse relationship)
# Normalize to 0-1 range (assuming max reasonable std is 50 pixels)
stability = max(0, 1 - (std_x + std_y) / 100)
return stability
# Calculate movement velocity
if len(positions) >= 3:
recent_movement = np.mean([
np.sqrt((positions[i][0] - positions[i-1][0])**2 +
(positions[i][1] - positions[i-1][1])**2)
for i in range(-3, 0)
])
else:
recent_movement = 0
# Position-based stability (0-1 where 1 = perfectly stable)
max_reasonable_std = 150 # For HD resolution
variance_score = max(0, 1 - (std_x + std_y) / max_reasonable_std)
velocity_score = max(0, 1 - recent_movement / 20) # 20 pixels max reasonable movement
self.position_stability_score = (variance_score * 0.7 + velocity_score * 0.3)
# Update continuous stable duration
if self.position_stability_score > 0.7:
if self.continuous_stable_duration == 0:
# Start tracking stable duration
self.continuous_stable_duration = 0.1 # Small initial value
else:
# Continue tracking
self.continuous_stable_duration = time.time() - self.first_seen
else:
# Reset if not stable
self.continuous_stable_duration = 0.0
def calculate_stability(self) -> float:
"""Calculate stability score based on position history."""
return self.position_stability_score
def calculate_hybrid_stability(self) -> Tuple[float, str]:
"""
Calculate hybrid stability considering both track ID continuity and position stability.
Returns:
Tuple of (stability_score, reasoning)
"""
if len(self.last_position_history) < 5:
return 0.0, "Insufficient position history"
position_stable = self.position_stability_score > 0.7
has_stable_duration = self.continuous_stable_duration > 2.0 # 2+ seconds stable
recent_track_change = (self.last_track_id_change is not None and
(time.time() - self.last_track_id_change) < 1.0)
# Base stability from position
base_score = self.position_stability_score
# Penalties and bonuses
if self.track_id_changes > 3:
# Too many track ID changes - likely tracking issues
base_score *= 0.8
reason = f"Multiple track ID changes ({self.track_id_changes})"
elif recent_track_change:
# Recent track change - be cautious
base_score *= 0.9
reason = "Recent track ID change"
else:
reason = "Position-based stability"
# Bonus for long continuous stability regardless of track ID changes
if has_stable_duration:
base_score = min(1.0, base_score + 0.1)
reason += f" + {self.continuous_stable_duration:.1f}s continuous"
return base_score, reason
def is_expired(self, timeout_seconds: float = 2.0) -> bool:
"""Check if vehicle tracking has expired."""
@ -90,14 +175,15 @@ class VehicleTracker:
# Tracking state
self.tracked_vehicles: Dict[int, TrackedVehicle] = {}
self.position_registry: Dict[str, TrackedVehicle] = {} # Position-based vehicle registry
self.next_track_id = 1
self.lock = Lock()
# Tracking parameters
self.stability_threshold = 0.7
self.min_stable_frames = 5
self.position_tolerance = 50 # pixels
self.timeout_seconds = 2.0
self.stability_threshold = 0.65 # Lowered for gas station scenarios
self.min_stable_frames = 8 # Increased for 4fps processing
self.position_tolerance = 80 # pixels - increased for gas station scenarios
self.timeout_seconds = 8.0 # Increased for gas station scenarios
logger.info(f"VehicleTracker initialized with trigger_classes={self.trigger_classes}, "
f"min_confidence={self.min_confidence}")
@ -127,6 +213,11 @@ class VehicleTracker:
if vehicle.is_expired(self.timeout_seconds)
]
for track_id in expired_ids:
vehicle = self.tracked_vehicles[track_id]
# Remove from position registry too
position_key = self._get_position_key(vehicle.center)
if position_key in self.position_registry and self.position_registry[position_key] == vehicle:
del self.position_registry[position_key]
logger.debug(f"Removing expired track {track_id}")
del self.tracked_vehicles[track_id]
@ -142,56 +233,115 @@ class VehicleTracker:
if detection.class_name not in self.trigger_classes:
continue
# Use track_id if available, otherwise generate one
track_id = detection.track_id if detection.track_id is not None else self.next_track_id
if detection.track_id is None:
self.next_track_id += 1
# Get bounding box from Detection object
# Get bounding box and center from Detection object
x1, y1, x2, y2 = detection.bbox
bbox = (int(x1), int(y1), int(x2), int(y2))
# Update or create tracked vehicle
center = ((x1 + x2) / 2, (y1 + y2) / 2)
confidence = detection.confidence
if track_id in self.tracked_vehicles:
# Update existing track
vehicle = self.tracked_vehicles[track_id]
vehicle.update_position(bbox, confidence)
vehicle.display_id = display_id
# Check stability
stability = vehicle.calculate_stability()
if stability > self.stability_threshold:
vehicle.stable_frames += 1
if vehicle.stable_frames >= self.min_stable_frames:
vehicle.is_stable = True
# Hybrid approach: Try position-based association first, then track ID
track_id = detection.track_id
existing_vehicle = None
position_key = self._get_position_key(center)
# 1. Check position registry first (same physical location)
if position_key in self.position_registry:
existing_vehicle = self.position_registry[position_key]
if track_id is not None and track_id != existing_vehicle.track_id:
# Track ID changed for same position - update vehicle
existing_vehicle.update_position(bbox, confidence, track_id)
logger.debug(f"Track ID changed {existing_vehicle.track_id}->{track_id} at same position")
# Update tracking dict
if existing_vehicle.track_id in self.tracked_vehicles:
del self.tracked_vehicles[existing_vehicle.track_id]
self.tracked_vehicles[track_id] = existing_vehicle
else:
vehicle.stable_frames = max(0, vehicle.stable_frames - 1)
if vehicle.stable_frames < self.min_stable_frames:
vehicle.is_stable = False
# Same position, same/no track ID
existing_vehicle.update_position(bbox, confidence)
track_id = existing_vehicle.track_id
logger.debug(f"Updated track {track_id}: conf={confidence:.2f}, "
f"stable={vehicle.is_stable}, stability={stability:.2f}")
else:
# Create new track
vehicle = TrackedVehicle(
# 2. If no position match, try track ID approach
elif track_id is not None and track_id in self.tracked_vehicles:
# Existing track ID, check if position moved significantly
existing_vehicle = self.tracked_vehicles[track_id]
old_position_key = self._get_position_key(existing_vehicle.center)
# If position moved significantly, update position registry
if old_position_key != position_key:
if old_position_key in self.position_registry:
del self.position_registry[old_position_key]
self.position_registry[position_key] = existing_vehicle
existing_vehicle.update_position(bbox, confidence)
# 3. Try closest track association (fallback)
elif track_id is None:
closest_track = self._find_closest_track(center)
if closest_track:
existing_vehicle = closest_track
track_id = closest_track.track_id
existing_vehicle.update_position(bbox, confidence)
# Update position registry
self.position_registry[position_key] = existing_vehicle
logger.debug(f"Associated detection with existing track {track_id} based on proximity")
# 4. Create new vehicle if no associations found
if existing_vehicle is None:
track_id = track_id if track_id is not None else self.next_track_id
if track_id == self.next_track_id:
self.next_track_id += 1
existing_vehicle = TrackedVehicle(
track_id=track_id,
first_seen=current_time,
last_seen=current_time,
display_id=display_id,
confidence=confidence,
bbox=bbox,
center=((x1 + x2) / 2, (y1 + y2) / 2),
total_frames=1
center=center,
total_frames=1,
original_track_id=track_id
)
vehicle.last_position_history.append(vehicle.center)
self.tracked_vehicles[track_id] = vehicle
existing_vehicle.last_position_history.append(center)
self.tracked_vehicles[track_id] = existing_vehicle
self.position_registry[position_key] = existing_vehicle
logger.info(f"New vehicle tracked: ID={track_id}, display={display_id}")
active_tracks.append(self.tracked_vehicles[track_id])
# Check stability using hybrid approach
stability_score, reason = existing_vehicle.calculate_hybrid_stability()
if stability_score > self.stability_threshold:
existing_vehicle.stable_frames += 1
if existing_vehicle.stable_frames >= self.min_stable_frames:
existing_vehicle.is_stable = True
else:
existing_vehicle.stable_frames = max(0, existing_vehicle.stable_frames - 1)
if existing_vehicle.stable_frames < self.min_stable_frames:
existing_vehicle.is_stable = False
logger.debug(f"Updated track {track_id}: conf={confidence:.2f}, "
f"stable={existing_vehicle.is_stable}, hybrid_stability={stability_score:.2f} ({reason})")
active_tracks.append(existing_vehicle)
return active_tracks
def _get_position_key(self, center: Tuple[float, float]) -> str:
"""
Generate a position-based key for vehicle registry.
Groups nearby positions into the same key for association.
Args:
center: Center position (x, y)
Returns:
Position key string
"""
# Grid-based quantization - 60 pixel grid for gas station scenarios
grid_size = 60
grid_x = int(center[0] // grid_size)
grid_y = int(center[1] // grid_size)
return f"{grid_x}_{grid_y}"
def _find_closest_track(self, center: Tuple[float, float]) -> Optional[TrackedVehicle]:
"""
Find the closest existing track to a given position.
@ -206,7 +356,7 @@ class VehicleTracker:
closest_track = None
for vehicle in self.tracked_vehicles.values():
if vehicle.is_expired(0.5): # Shorter timeout for matching
if vehicle.is_expired(1.0): # Allow slightly older tracks for matching
continue
distance = np.sqrt(
@ -287,6 +437,7 @@ class VehicleTracker:
"""Reset all tracking state."""
with self.lock:
self.tracked_vehicles.clear()
self.position_registry.clear()
self.next_track_id = 1
logger.info("Vehicle tracking state reset")

View file

@ -51,8 +51,8 @@ class StableCarValidator:
# Validation thresholds
self.min_stable_duration = self.config.get('min_stable_duration', 3.0) # seconds
self.min_stable_frames = self.config.get('min_stable_frames', 10)
self.position_variance_threshold = self.config.get('position_variance_threshold', 25.0) # pixels
self.min_stable_frames = self.config.get('min_stable_frames', 8)
self.position_variance_threshold = self.config.get('position_variance_threshold', 40.0) # pixels - adjusted for HD
self.min_confidence = self.config.get('min_confidence', 0.7)
self.velocity_threshold = self.config.get('velocity_threshold', 5.0) # pixels/frame
self.entering_zone_ratio = self.config.get('entering_zone_ratio', 0.3) # 30% of frame
@ -188,9 +188,9 @@ class StableCarValidator:
x_position = vehicle.center[0] / self.frame_width
y_position = vehicle.center[1] / self.frame_height
# Check if vehicle is stable
stability = vehicle.calculate_stability()
if stability > 0.7 and velocity < self.velocity_threshold:
# Check if vehicle is stable using hybrid approach
stability_score, stability_reason = vehicle.calculate_hybrid_stability()
if stability_score > 0.65 and velocity < self.velocity_threshold:
# Check if it's been stable long enough
duration = time.time() - vehicle.first_seen
if duration > self.min_stable_duration and vehicle.stable_frames >= self.min_stable_frames:
@ -294,11 +294,15 @@ class StableCarValidator:
# All checks passed - vehicle is valid for processing
self.last_processed_vehicles[vehicle.track_id] = time.time()
# Get hybrid stability info for detailed reasoning
hybrid_stability, hybrid_reason = vehicle.calculate_hybrid_stability()
processing_reason = f"Vehicle is stable and ready for processing (hybrid: {hybrid_reason})"
return ValidationResult(
is_valid=True,
state=VehicleState.STABLE,
confidence=vehicle.avg_confidence,
reason="Vehicle is stable and ready for processing",
reason=processing_reason,
should_process=True,
track_id=vehicle.track_id
)