Merge pull request 'merge main' (#21) from dev into main
All checks were successful
Build Worker Base and Application Images / check-base-changes (push) Successful in 8s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 2m48s
Build Worker Base and Application Images / deploy-stack (push) Successful in 19s
All checks were successful
Build Worker Base and Application Images / check-base-changes (push) Successful in 8s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 2m48s
Build Worker Base and Application Images / deploy-stack (push) Successful in 19s
Reviewed-on: #21
This commit is contained in:
commit
4a8b107297
2 changed files with 123 additions and 20 deletions
|
@ -115,10 +115,17 @@ class FFmpegRTSPReader(VideoReader):
|
|||
# DO NOT REMOVE
|
||||
'-hwaccel', 'cuda',
|
||||
'-hwaccel_device', '0',
|
||||
# Real-time input flags
|
||||
'-fflags', 'nobuffer+genpts+discardcorrupt',
|
||||
'-flags', 'low_delay',
|
||||
'-max_delay', '0', # No reordering delay
|
||||
# RTSP configuration
|
||||
'-rtsp_transport', 'tcp',
|
||||
'-i', self.rtsp_url,
|
||||
# Output configuration (keeping BMP)
|
||||
'-f', 'image2pipe', # Output images to pipe
|
||||
'-vcodec', 'bmp', # BMP format with header containing dimensions
|
||||
'-vsync', 'passthrough', # Pass frames as-is
|
||||
# Use native stream resolution and framerate
|
||||
'-an', # No audio
|
||||
'-' # Output to stdout
|
||||
|
|
|
@ -411,27 +411,12 @@ class TrackingPipelineIntegration:
|
|||
logger.info(f"Executing processing phase for session {session_id}, vehicle {vehicle.track_id}")
|
||||
|
||||
# Capture high-quality snapshot for pipeline processing
|
||||
frame = None
|
||||
if self.subscription_info and self.subscription_info.stream_config.snapshot_url:
|
||||
from ..streaming.readers import HTTPSnapshotReader
|
||||
logger.info(f"[PROCESSING PHASE] Fetching 2K snapshot for session {session_id}")
|
||||
frame = self._fetch_snapshot()
|
||||
|
||||
logger.info(f"[PROCESSING PHASE] Fetching 2K snapshot for session {session_id}")
|
||||
snapshot_reader = HTTPSnapshotReader(
|
||||
camera_id=self.subscription_info.camera_id,
|
||||
snapshot_url=self.subscription_info.stream_config.snapshot_url,
|
||||
max_retries=3
|
||||
)
|
||||
|
||||
frame = snapshot_reader.fetch_single_snapshot()
|
||||
|
||||
if frame is not None:
|
||||
logger.info(f"[PROCESSING PHASE] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot for pipeline")
|
||||
else:
|
||||
logger.warning(f"[PROCESSING PHASE] Failed to capture snapshot, falling back to RTSP frame")
|
||||
# Fall back to RTSP frame if snapshot fails
|
||||
frame = processing_data['frame']
|
||||
else:
|
||||
logger.warning(f"[PROCESSING PHASE] No snapshot URL available, using RTSP frame")
|
||||
if frame is None:
|
||||
logger.warning(f"[PROCESSING PHASE] Failed to capture snapshot, falling back to RTSP frame")
|
||||
# Fall back to RTSP frame if snapshot fails
|
||||
frame = processing_data['frame']
|
||||
|
||||
# Extract detected regions from detection phase result if available
|
||||
|
@ -527,6 +512,19 @@ class TrackingPipelineIntegration:
|
|||
else:
|
||||
logger.warning(f"No pending processing data found for display {display_id} when setting session {session_id}")
|
||||
|
||||
# FALLBACK: Execute pipeline for POS-initiated sessions
|
||||
logger.info(f"[FALLBACK] Triggering fallback pipeline for session {session_id} on display {display_id}")
|
||||
|
||||
# Create subscription_id for fallback (needed for pipeline execution)
|
||||
fallback_subscription_id = f"{display_id};fallback"
|
||||
|
||||
# Trigger the fallback pipeline asynchronously
|
||||
asyncio.create_task(self._execute_fallback_pipeline(
|
||||
display_id=display_id,
|
||||
session_id=session_id,
|
||||
subscription_id=fallback_subscription_id
|
||||
))
|
||||
|
||||
def clear_session_id(self, session_id: str):
|
||||
"""
|
||||
Clear session ID (post-fueling).
|
||||
|
@ -676,6 +674,104 @@ class TrackingPipelineIntegration:
|
|||
if stage == "car_wait_staff":
|
||||
logger.info(f"Started monitoring session {session_id} for car abandonment")
|
||||
|
||||
def _fetch_snapshot(self) -> Optional[np.ndarray]:
|
||||
"""
|
||||
Fetch high-quality snapshot from camera's snapshot URL.
|
||||
Reusable method for both processing phase and fallback pipeline.
|
||||
|
||||
Returns:
|
||||
Snapshot frame or None if unavailable
|
||||
"""
|
||||
if not (self.subscription_info and self.subscription_info.stream_config.snapshot_url):
|
||||
logger.warning("[SNAPSHOT] No subscription info or snapshot URL available")
|
||||
return None
|
||||
|
||||
try:
|
||||
from ..streaming.readers import HTTPSnapshotReader
|
||||
|
||||
logger.info(f"[SNAPSHOT] Fetching snapshot for {self.subscription_info.camera_id}")
|
||||
snapshot_reader = HTTPSnapshotReader(
|
||||
camera_id=self.subscription_info.camera_id,
|
||||
snapshot_url=self.subscription_info.stream_config.snapshot_url,
|
||||
max_retries=3
|
||||
)
|
||||
|
||||
frame = snapshot_reader.fetch_single_snapshot()
|
||||
|
||||
if frame is not None:
|
||||
logger.info(f"[SNAPSHOT] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot")
|
||||
return frame
|
||||
else:
|
||||
logger.warning("[SNAPSHOT] Failed to fetch snapshot")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[SNAPSHOT] Error fetching snapshot: {e}", exc_info=True)
|
||||
return None
|
||||
|
||||
async def _execute_fallback_pipeline(self, display_id: str, session_id: str, subscription_id: str):
|
||||
"""
|
||||
Execute fallback pipeline when sessionId is received without prior detection.
|
||||
This handles POS-initiated sessions where backend starts transaction before car detection.
|
||||
|
||||
Args:
|
||||
display_id: Display identifier
|
||||
session_id: Session ID from backend
|
||||
subscription_id: Subscription identifier for pipeline execution
|
||||
"""
|
||||
try:
|
||||
logger.info(f"[FALLBACK PIPELINE] Executing for session {session_id}, display {display_id}")
|
||||
|
||||
# Fetch fresh snapshot from camera
|
||||
frame = self._fetch_snapshot()
|
||||
|
||||
if frame is None:
|
||||
logger.error(f"[FALLBACK] Failed to fetch snapshot for session {session_id}, cannot execute pipeline")
|
||||
return
|
||||
|
||||
logger.info(f"[FALLBACK] Using snapshot frame {frame.shape[1]}x{frame.shape[0]} for session {session_id}")
|
||||
|
||||
# Check if detection pipeline is available
|
||||
if not self.detection_pipeline:
|
||||
logger.error(f"[FALLBACK] Detection pipeline not available for session {session_id}")
|
||||
return
|
||||
|
||||
# Execute detection phase to get detected regions
|
||||
detection_result = await self.detection_pipeline.execute_detection_phase(
|
||||
frame=frame,
|
||||
display_id=display_id,
|
||||
subscription_id=subscription_id
|
||||
)
|
||||
|
||||
logger.info(f"[FALLBACK] Detection phase completed for session {session_id}: "
|
||||
f"status={detection_result.get('status', 'unknown')}, "
|
||||
f"regions={list(detection_result.get('detected_regions', {}).keys())}")
|
||||
|
||||
# If detection found regions, execute processing phase
|
||||
detected_regions = detection_result.get('detected_regions', {})
|
||||
if detected_regions:
|
||||
processing_result = await self.detection_pipeline.execute_processing_phase(
|
||||
frame=frame,
|
||||
display_id=display_id,
|
||||
session_id=session_id,
|
||||
subscription_id=subscription_id,
|
||||
detected_regions=detected_regions
|
||||
)
|
||||
|
||||
logger.info(f"[FALLBACK] Processing phase completed for session {session_id}: "
|
||||
f"status={processing_result.get('status', 'unknown')}, "
|
||||
f"branches={len(processing_result.get('branch_results', {}))}, "
|
||||
f"actions={len(processing_result.get('actions_executed', []))}")
|
||||
|
||||
# Update statistics
|
||||
self.stats['pipelines_executed'] += 1
|
||||
|
||||
else:
|
||||
logger.warning(f"[FALLBACK] No detections found in snapshot for session {session_id}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[FALLBACK] Error executing fallback pipeline for session {session_id}: {e}", exc_info=True)
|
||||
|
||||
def _filter_small_frontals(self, tracking_results, frame):
|
||||
"""
|
||||
Filter out frontal detections that are smaller than minimum bbox area percentage.
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue