From fa0f865319753d30c499899450117d4094293009 Mon Sep 17 00:00:00 2001 From: ziesorx Date: Tue, 30 Sep 2025 00:53:27 +0700 Subject: [PATCH 1/2] feat: add fallback when cant initially detect but backend start session --- core/tracking/integration.py | 136 +++++++++++++++++++++++++++++------ 1 file changed, 116 insertions(+), 20 deletions(-) diff --git a/core/tracking/integration.py b/core/tracking/integration.py index d1401ef..7d5f3f8 100644 --- a/core/tracking/integration.py +++ b/core/tracking/integration.py @@ -411,27 +411,12 @@ class TrackingPipelineIntegration: logger.info(f"Executing processing phase for session {session_id}, vehicle {vehicle.track_id}") # Capture high-quality snapshot for pipeline processing - frame = None - if self.subscription_info and self.subscription_info.stream_config.snapshot_url: - from ..streaming.readers import HTTPSnapshotReader + logger.info(f"[PROCESSING PHASE] Fetching 2K snapshot for session {session_id}") + frame = self._fetch_snapshot() - logger.info(f"[PROCESSING PHASE] Fetching 2K snapshot for session {session_id}") - snapshot_reader = HTTPSnapshotReader( - camera_id=self.subscription_info.camera_id, - snapshot_url=self.subscription_info.stream_config.snapshot_url, - max_retries=3 - ) - - frame = snapshot_reader.fetch_single_snapshot() - - if frame is not None: - logger.info(f"[PROCESSING PHASE] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot for pipeline") - else: - logger.warning(f"[PROCESSING PHASE] Failed to capture snapshot, falling back to RTSP frame") - # Fall back to RTSP frame if snapshot fails - frame = processing_data['frame'] - else: - logger.warning(f"[PROCESSING PHASE] No snapshot URL available, using RTSP frame") + if frame is None: + logger.warning(f"[PROCESSING PHASE] Failed to capture snapshot, falling back to RTSP frame") + # Fall back to RTSP frame if snapshot fails frame = processing_data['frame'] # Extract detected regions from detection phase result if available @@ -527,6 +512,19 @@ class TrackingPipelineIntegration: else: logger.warning(f"No pending processing data found for display {display_id} when setting session {session_id}") + # FALLBACK: Execute pipeline for POS-initiated sessions + logger.info(f"[FALLBACK] Triggering fallback pipeline for session {session_id} on display {display_id}") + + # Create subscription_id for fallback (needed for pipeline execution) + fallback_subscription_id = f"{display_id};fallback" + + # Trigger the fallback pipeline asynchronously + asyncio.create_task(self._execute_fallback_pipeline( + display_id=display_id, + session_id=session_id, + subscription_id=fallback_subscription_id + )) + def clear_session_id(self, session_id: str): """ Clear session ID (post-fueling). @@ -676,6 +674,104 @@ class TrackingPipelineIntegration: if stage == "car_wait_staff": logger.info(f"Started monitoring session {session_id} for car abandonment") + def _fetch_snapshot(self) -> Optional[np.ndarray]: + """ + Fetch high-quality snapshot from camera's snapshot URL. + Reusable method for both processing phase and fallback pipeline. + + Returns: + Snapshot frame or None if unavailable + """ + if not (self.subscription_info and self.subscription_info.stream_config.snapshot_url): + logger.warning("[SNAPSHOT] No subscription info or snapshot URL available") + return None + + try: + from ..streaming.readers import HTTPSnapshotReader + + logger.info(f"[SNAPSHOT] Fetching snapshot for {self.subscription_info.camera_id}") + snapshot_reader = HTTPSnapshotReader( + camera_id=self.subscription_info.camera_id, + snapshot_url=self.subscription_info.stream_config.snapshot_url, + max_retries=3 + ) + + frame = snapshot_reader.fetch_single_snapshot() + + if frame is not None: + logger.info(f"[SNAPSHOT] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot") + return frame + else: + logger.warning("[SNAPSHOT] Failed to fetch snapshot") + return None + + except Exception as e: + logger.error(f"[SNAPSHOT] Error fetching snapshot: {e}", exc_info=True) + return None + + async def _execute_fallback_pipeline(self, display_id: str, session_id: str, subscription_id: str): + """ + Execute fallback pipeline when sessionId is received without prior detection. + This handles POS-initiated sessions where backend starts transaction before car detection. + + Args: + display_id: Display identifier + session_id: Session ID from backend + subscription_id: Subscription identifier for pipeline execution + """ + try: + logger.info(f"[FALLBACK PIPELINE] Executing for session {session_id}, display {display_id}") + + # Fetch fresh snapshot from camera + frame = self._fetch_snapshot() + + if frame is None: + logger.error(f"[FALLBACK] Failed to fetch snapshot for session {session_id}, cannot execute pipeline") + return + + logger.info(f"[FALLBACK] Using snapshot frame {frame.shape[1]}x{frame.shape[0]} for session {session_id}") + + # Check if detection pipeline is available + if not self.detection_pipeline: + logger.error(f"[FALLBACK] Detection pipeline not available for session {session_id}") + return + + # Execute detection phase to get detected regions + detection_result = await self.detection_pipeline.execute_detection_phase( + frame=frame, + display_id=display_id, + subscription_id=subscription_id + ) + + logger.info(f"[FALLBACK] Detection phase completed for session {session_id}: " + f"status={detection_result.get('status', 'unknown')}, " + f"regions={list(detection_result.get('detected_regions', {}).keys())}") + + # If detection found regions, execute processing phase + detected_regions = detection_result.get('detected_regions', {}) + if detected_regions: + processing_result = await self.detection_pipeline.execute_processing_phase( + frame=frame, + display_id=display_id, + session_id=session_id, + subscription_id=subscription_id, + detected_regions=detected_regions + ) + + logger.info(f"[FALLBACK] Processing phase completed for session {session_id}: " + f"status={processing_result.get('status', 'unknown')}, " + f"branches={len(processing_result.get('branch_results', {}))}, " + f"actions={len(processing_result.get('actions_executed', []))}") + + # Update statistics + self.stats['pipelines_executed'] += 1 + + else: + logger.warning(f"[FALLBACK] No detections found in snapshot for session {session_id}") + + except Exception as e: + logger.error(f"[FALLBACK] Error executing fallback pipeline for session {session_id}: {e}", exc_info=True) + def _filter_small_frontals(self, tracking_results, frame): """ Filter out frontal detections that are smaller than minimum bbox area percentage. From 31bc91d57ba03d0cd2e4d6f8b936ad18d9adfaae Mon Sep 17 00:00:00 2001 From: ziesorx Date: Tue, 30 Sep 2025 12:06:03 +0700 Subject: [PATCH 2/2] fix: add ffmpeg flags fix frame delay --- core/streaming/readers/ffmpeg_rtsp.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/core/streaming/readers/ffmpeg_rtsp.py b/core/streaming/readers/ffmpeg_rtsp.py index 7c453f3..352c28e 100644 --- a/core/streaming/readers/ffmpeg_rtsp.py +++ b/core/streaming/readers/ffmpeg_rtsp.py @@ -115,10 +115,17 @@ class FFmpegRTSPReader(VideoReader): # DO NOT REMOVE '-hwaccel', 'cuda', '-hwaccel_device', '0', + # Real-time input flags + '-fflags', 'nobuffer+genpts+discardcorrupt', + '-flags', 'low_delay', + '-max_delay', '0', # No reordering delay + # RTSP configuration '-rtsp_transport', 'tcp', '-i', self.rtsp_url, + # Output configuration (keeping BMP) '-f', 'image2pipe', # Output images to pipe '-vcodec', 'bmp', # BMP format with header containing dimensions + '-vsync', 'passthrough', # Pass frames as-is # Use native stream resolution and framerate '-an', # No audio '-' # Output to stdout