From 69671bbc1a693d9f8ffd7b866220fa0f8e931429 Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Sun, 19 Oct 2025 15:45:32 +0700 Subject: [PATCH 1/6] fix: docker compose file --- .gitea/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitea/workflows/build.yml b/.gitea/workflows/build.yml index 316c4dc..d44c87b 100644 --- a/.gitea/workflows/build.yml +++ b/.gitea/workflows/build.yml @@ -105,7 +105,7 @@ jobs: echo "Pulling and starting containers on server..." if [ "${{ github.ref_name }}" = "main" ]; then echo "Deploying production stack..." - ssh -i ~/.ssh/id_rsa ${{ vars.DEPLOY_USER_CMS }}@${{ vars.DEPLOY_HOST_CMS }} "cd ~/cms-system-k8s && docker compose -f docker-compose.production.yml pull && docker compose -f docker-compose.production.yml up -d" + ssh -i ~/.ssh/id_rsa ${{ vars.DEPLOY_USER_CMS }}@${{ vars.DEPLOY_HOST_CMS }} "cd ~/cms-system-k8s && docker compose -f docker-compose.staging.yml -f docker-compose.production.yml pull && docker compose -f docker-compose.staging.yml -f docker-compose.production.yml up -d" else echo "Deploying staging stack..." ssh -i ~/.ssh/id_rsa ${{ vars.DEPLOY_USER_CMS }}@${{ vars.DEPLOY_HOST_CMS }} "cd ~/cms-system-k8s && docker compose -f docker-compose.staging.yml pull && docker compose -f docker-compose.staging.yml up -d" -- 2.47.2 From 498b285e8032904b31cf73446211b4d19066c3a5 Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Sun, 19 Oct 2025 15:48:00 +0700 Subject: [PATCH 2/6] fix: staging deployment --- .gitea/workflows/build.yml | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/.gitea/workflows/build.yml b/.gitea/workflows/build.yml index d44c87b..dc4f18d 100644 --- a/.gitea/workflows/build.yml +++ b/.gitea/workflows/build.yml @@ -103,10 +103,4 @@ jobs: - name: Deploy stack run: | echo "Pulling and starting containers on server..." - if [ "${{ github.ref_name }}" = "main" ]; then - echo "Deploying production stack..." - ssh -i ~/.ssh/id_rsa ${{ vars.DEPLOY_USER_CMS }}@${{ vars.DEPLOY_HOST_CMS }} "cd ~/cms-system-k8s && docker compose -f docker-compose.staging.yml -f docker-compose.production.yml pull && docker compose -f docker-compose.staging.yml -f docker-compose.production.yml up -d" - else - echo "Deploying staging stack..." - ssh -i ~/.ssh/id_rsa ${{ vars.DEPLOY_USER_CMS }}@${{ vars.DEPLOY_HOST_CMS }} "cd ~/cms-system-k8s && docker compose -f docker-compose.staging.yml pull && docker compose -f docker-compose.staging.yml up -d" - fi + ssh -i ~/.ssh/id_rsa ${{ vars.DEPLOY_USER_CMS }}@${{ vars.DEPLOY_HOST_CMS }} "cd ~/cms-system-k8s && docker compose -f docker-compose.staging.yml -f docker-compose.production.yml pull && docker compose -f docker-compose.staging.yml -f docker-compose.production.yml up -d" -- 2.47.2 From 5e59e00c553c1b3bb18b2845cb280c3281a04ce3 Mon Sep 17 00:00:00 2001 From: ziesorx Date: Mon, 20 Oct 2025 14:52:59 +0700 Subject: [PATCH 3/6] fix: classification top-1, dynamic result field, removed crop filter --- core/detection/branches.py | 113 ++++++++++++++++++++------ core/streaming/readers/ffmpeg_rtsp.py | 4 +- 2 files changed, 88 insertions(+), 29 deletions(-) diff --git a/core/detection/branches.py b/core/detection/branches.py index 247c5f8..9359ea8 100644 --- a/core/detection/branches.py +++ b/core/detection/branches.py @@ -35,6 +35,9 @@ class BranchProcessor: # Branch models cache self.branch_models: Dict[str, YOLOWrapper] = {} + # Dynamic field mapping: branch_id → output_field_name (e.g., {"car_brand_cls_v3": "brand"}) + self.branch_output_fields: Dict[str, str] = {} + # Thread pool for parallel execution self.executor = ThreadPoolExecutor(max_workers=4) @@ -68,6 +71,9 @@ class BranchProcessor: self.redis_manager = redis_manager self.db_manager = db_manager + # Parse field mappings from parallelActions to enable dynamic field extraction + self._parse_branch_output_fields(pipeline_config) + # Pre-load branch models if they exist branches = getattr(pipeline_config, 'branches', []) if branches: @@ -141,6 +147,46 @@ class BranchProcessor: logger.error(f"Error loading branch model {getattr(branch_config, 'model_id', 'unknown')}: {e}") return None + def _parse_branch_output_fields(self, pipeline_config: Any) -> None: + """ + Parse parallelActions.fields to determine what output field each branch produces. + Creates dynamic mapping from branch_id to output field name. + + Example: + Input: parallelActions.fields = {"car_brand": "{car_brand_cls_v3.brand}"} + Output: self.branch_output_fields = {"car_brand_cls_v3": "brand"} + + Args: + pipeline_config: Pipeline configuration object + """ + try: + if not pipeline_config or not hasattr(pipeline_config, 'parallel_actions'): + logger.debug("[FIELD MAPPING] No parallelActions found in pipeline config") + return + + for action in pipeline_config.parallel_actions: + if action.type.value == 'postgresql_update_combined': + fields = action.params.get('fields', {}) + + # Parse each field template to extract branch_id and field_name + for db_field_name, template in fields.items(): + # Template format: "{branch_id.field_name}" + if template.startswith('{') and template.endswith('}'): + var_name = template[1:-1] # Remove { } + + if '.' in var_name: + branch_id, field_name = var_name.split('.', 1) + + # Store the mapping + self.branch_output_fields[branch_id] = field_name + + logger.info(f"[FIELD MAPPING] Branch '{branch_id}' → outputs field '{field_name}'") + + logger.info(f"[FIELD MAPPING] Parsed {len(self.branch_output_fields)} branch output field mappings") + + except Exception as e: + logger.error(f"[FIELD MAPPING] Error parsing branch output fields: {e}", exc_info=True) + async def execute_branches(self, frame: np.ndarray, branches: List[Any], @@ -350,10 +396,11 @@ class BranchProcessor: logger.debug(f"[REGION DATA] {branch_id}: '{region_name}' -> bbox={region_data.get('bbox')}, conf={region_data.get('confidence')}") if trigger_classes: - # Check if any parent detection matches our trigger classes + # Check if any parent detection matches our trigger classes (case-insensitive) should_execute = False for trigger_class in trigger_classes: - if trigger_class in detected_regions: + # Case-insensitive comparison for robustness + if trigger_class.lower() in [k.lower() for k in detected_regions.keys()]: should_execute = True logger.info(f"[TRIGGER CHECK] {branch_id}: Found '{trigger_class}' in parent detections - branch will execute") break @@ -410,16 +457,15 @@ class BranchProcessor: region = detected_regions[crop_class] confidence = region.get('confidence', 0.0) - # Only use detections above min_confidence - if confidence >= min_confidence: - bbox = region['bbox'] - area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]) # width * height + # Select largest bbox (no confidence filtering - parent already validated it) + bbox = region['bbox'] + area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]) # width * height - # Choose biggest bbox among valid detections - if area > best_area: - best_region = region - best_class = crop_class - best_area = area + # Choose biggest bbox among available detections + if area > best_area: + best_region = region + best_class = crop_class + best_area = area if best_region: bbox = best_region['bbox'] @@ -478,17 +524,25 @@ class BranchProcessor: top_indices = probs.top5 # Get top 5 predictions top_conf = probs.top5conf.cpu().numpy() - for idx, conf in zip(top_indices, top_conf): - if conf >= min_confidence: - class_name = model.model.names[int(idx)] - logger.debug(f"[CLASSIFICATION RESULT {len(branch_detections)+1}] {branch_id}: '{class_name}', conf={conf:.3f}") + # For classification: take only TOP-1 prediction (not all top-5) + # This prevents empty results when all top-5 predictions are below threshold + if len(top_indices) > 0 and len(top_conf) > 0: + top_idx = top_indices[0] + top_confidence = float(top_conf[0]) + + # Apply minConfidence threshold to top-1 only + if top_confidence >= min_confidence: + class_name = model.model.names[int(top_idx)] + logger.info(f"[CLASSIFICATION TOP-1] {branch_id}: '{class_name}', conf={top_confidence:.3f}") # For classification, use full input frame dimensions as bbox branch_detections.append({ 'class_name': class_name, - 'confidence': float(conf), + 'confidence': top_confidence, 'bbox': [0, 0, input_frame.shape[1], input_frame.shape[0]] }) + else: + logger.warning(f"[CLASSIFICATION FILTERED] {branch_id}: Top prediction conf={top_confidence:.3f} < threshold={min_confidence}") else: logger.warning(f"[UNKNOWN MODEL] {branch_id}: Model results have no .boxes or .probs") @@ -499,22 +553,27 @@ class BranchProcessor: logger.info(f"[FINAL RESULTS] {branch_id}: {len(branch_detections)} detections processed") - # Extract best result for classification models + # Determine output field name from dynamic mapping (parsed from parallelActions.fields) + output_field = self.branch_output_fields.get(branch_id) + + # Always initialize the field (even if None) to ensure it exists for database update + if output_field: + result['result'][output_field] = None + logger.debug(f"[FIELD INIT] {branch_id}: Initialized field '{output_field}' = None") + + # Extract best detection if available if branch_detections: best_detection = max(branch_detections, key=lambda x: x['confidence']) logger.info(f"[BEST DETECTION] {branch_id}: '{best_detection['class_name']}' with confidence {best_detection['confidence']:.3f}") - # Add classification-style results for database operations - if 'brand' in branch_id.lower(): - result['result']['brand'] = best_detection['class_name'] - elif 'body' in branch_id.lower() or 'bodytype' in branch_id.lower(): - result['result']['body_type'] = best_detection['class_name'] - elif 'front_rear' in branch_id.lower(): - result['result']['front_rear'] = best_detection['confidence'] - - logger.info(f"[CLASSIFICATION RESULT] {branch_id}: Extracted classification fields") + # Set the output field value using dynamic mapping + if output_field: + result['result'][output_field] = best_detection['class_name'] + logger.info(f"[FIELD SET] {branch_id}: Set field '{output_field}' = '{best_detection['class_name']}'") + else: + logger.warning(f"[NO MAPPING] {branch_id}: No output field defined in parallelActions.fields") else: - logger.warning(f"[NO RESULTS] {branch_id}: No detections found") + logger.warning(f"[NO RESULTS] {branch_id}: No detections found, field '{output_field}' remains None") # Execute branch actions if this branch found valid detections actions_executed = [] diff --git a/core/streaming/readers/ffmpeg_rtsp.py b/core/streaming/readers/ffmpeg_rtsp.py index 88f45ae..e469c9e 100644 --- a/core/streaming/readers/ffmpeg_rtsp.py +++ b/core/streaming/readers/ffmpeg_rtsp.py @@ -113,8 +113,8 @@ class FFmpegRTSPReader(VideoReader): cmd = [ 'ffmpeg', # DO NOT REMOVE - '-hwaccel', 'cuda', - '-hwaccel_device', '0', + # '-hwaccel', 'cuda', + # '-hwaccel_device', '0', # Real-time input flags '-fflags', 'nobuffer+genpts', '-flags', 'low_delay', -- 2.47.2 From a4cfb264b9bddd305c47e4f42afe517a05a434d3 Mon Sep 17 00:00:00 2001 From: ziesorx Date: Mon, 20 Oct 2025 16:54:27 +0700 Subject: [PATCH 4/6] fix: car detection use wrong image source --- core/detection/branches.py | 54 ++++++++++++++++++++++++++---------- core/detection/pipeline.py | 34 ++++++++++++++++------- core/tracking/integration.py | 44 ++++++++++++++++++++++------- 3 files changed, 98 insertions(+), 34 deletions(-) diff --git a/core/detection/branches.py b/core/detection/branches.py index 9359ea8..97c44ff 100644 --- a/core/detection/branches.py +++ b/core/detection/branches.py @@ -393,7 +393,12 @@ class BranchProcessor: trigger_classes = getattr(branch_config, 'trigger_classes', []) logger.info(f"[DETECTED REGIONS] {branch_id}: Available parent detections: {list(detected_regions.keys())}") for region_name, region_data in detected_regions.items(): - logger.debug(f"[REGION DATA] {branch_id}: '{region_name}' -> bbox={region_data.get('bbox')}, conf={region_data.get('confidence')}") + # Handle both list (new) and single dict (backward compat) + if isinstance(region_data, list): + for i, region in enumerate(region_data): + logger.debug(f"[REGION DATA] {branch_id}: '{region_name}[{i}]' -> bbox={region.get('bbox')}, conf={region.get('confidence')}") + else: + logger.debug(f"[REGION DATA] {branch_id}: '{region_name}' -> bbox={region_data.get('bbox')}, conf={region_data.get('confidence')}") if trigger_classes: # Check if any parent detection matches our trigger classes (case-insensitive) @@ -454,18 +459,24 @@ class BranchProcessor: for crop_class in crop_classes: if crop_class in detected_regions: - region = detected_regions[crop_class] - confidence = region.get('confidence', 0.0) + regions = detected_regions[crop_class] - # Select largest bbox (no confidence filtering - parent already validated it) - bbox = region['bbox'] - area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]) # width * height + # Handle both list (new) and single dict (backward compat) + if not isinstance(regions, list): + regions = [regions] - # Choose biggest bbox among available detections - if area > best_area: - best_region = region - best_class = crop_class - best_area = area + # Find largest bbox from all detections of this class + for region in regions: + confidence = region.get('confidence', 0.0) + bbox = region['bbox'] + area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]) # width * height + + # Choose biggest bbox among all available detections + if area > best_area: + best_region = region + best_class = crop_class + best_area = area + logger.debug(f"[CROP] Selected larger bbox for '{crop_class}': area={area:.0f}px², conf={confidence:.3f}") if best_region: bbox = best_region['bbox'] @@ -483,7 +494,6 @@ class BranchProcessor: logger.info(f"[INFERENCE START] {branch_id}: Running inference on {'cropped' if input_frame is not frame else 'full'} frame " f"({input_frame.shape[1]}x{input_frame.shape[0]}) with confidence={min_confidence}") - # Use .predict() method for both detection and classification models inference_start = time.time() detection_results = model.model.predict(input_frame, conf=min_confidence, verbose=False) @@ -690,10 +700,26 @@ class BranchProcessor: bbox = None if region_name and region_name in detected_regions: # Crop the specified region - bbox = detected_regions[region_name]['bbox'] + # Handle both list (new) and single dict (backward compat) + regions = detected_regions[region_name] + if isinstance(regions, list): + # Multiple detections - select largest bbox + if regions: + best_region = max(regions, key=lambda r: (r['bbox'][2] - r['bbox'][0]) * (r['bbox'][3] - r['bbox'][1])) + bbox = best_region['bbox'] + else: + bbox = regions['bbox'] elif region_name and region_name.lower() == 'frontal' and 'front_rear' in detected_regions: # Special case: "frontal" region maps to "front_rear" detection - bbox = detected_regions['front_rear']['bbox'] + # Handle both list (new) and single dict (backward compat) + regions = detected_regions['front_rear'] + if isinstance(regions, list): + # Multiple detections - select largest bbox + if regions: + best_region = max(regions, key=lambda r: (r['bbox'][2] - r['bbox'][0]) * (r['bbox'][3] - r['bbox'][1])) + bbox = best_region['bbox'] + else: + bbox = regions['bbox'] if bbox is not None: x1, y1, x2, y2 = [int(coord) for coord in bbox] diff --git a/core/detection/pipeline.py b/core/detection/pipeline.py index d395f3a..ba9ac9a 100644 --- a/core/detection/pipeline.py +++ b/core/detection/pipeline.py @@ -495,11 +495,13 @@ class DetectionPipeline: } valid_detections.append(detection_info) - # Store region for processing phase - detected_regions[class_name] = { + # Store region for processing phase (support multiple detections per class) + if class_name not in detected_regions: + detected_regions[class_name] = [] + detected_regions[class_name].append({ 'bbox': bbox, 'confidence': confidence - } + }) else: logger.warning("[DETECTION PHASE] No boxes found in detection results") @@ -951,14 +953,26 @@ class DetectionPipeline: if region_name and region_name in detected_regions: # Crop the specified region - bbox = detected_regions[region_name]['bbox'] - x1, y1, x2, y2 = [int(coord) for coord in bbox] - cropped = frame[y1:y2, x1:x2] - if cropped.size > 0: - image_to_save = cropped - logger.debug(f"Cropped region '{region_name}' for redis_save_image") + # Handle both list (new) and single dict (backward compat) + regions = detected_regions[region_name] + if isinstance(regions, list): + # Multiple detections - select largest bbox + if regions: + best_region = max(regions, key=lambda r: (r['bbox'][2] - r['bbox'][0]) * (r['bbox'][3] - r['bbox'][1])) + bbox = best_region['bbox'] + else: + bbox = None else: - logger.warning(f"Empty crop for region '{region_name}', using full frame") + bbox = regions['bbox'] + + if bbox: + x1, y1, x2, y2 = [int(coord) for coord in bbox] + cropped = frame[y1:y2, x1:x2] + if cropped.size > 0: + image_to_save = cropped + logger.debug(f"Cropped region '{region_name}' for redis_save_image") + else: + logger.warning(f"Empty crop for region '{region_name}', using full frame") # Format key with context key = action.params['key'].format(**context) diff --git a/core/tracking/integration.py b/core/tracking/integration.py index 2fba002..1e3fc97 100644 --- a/core/tracking/integration.py +++ b/core/tracking/integration.py @@ -350,10 +350,21 @@ class TrackingPipelineIntegration: 'session_id': session_id } + # Fetch high-quality 2K snapshot for detection phase (not RTSP frame) + # This ensures bbox coordinates match the frame used in processing phase + logger.info(f"[DETECTION PHASE] Fetching 2K snapshot for vehicle {vehicle.track_id}") + snapshot_frame = self._fetch_snapshot() + + if snapshot_frame is None: + logger.warning(f"[DETECTION PHASE] Failed to fetch snapshot, falling back to RTSP frame") + snapshot_frame = frame # Fallback to RTSP if snapshot fails + else: + logger.info(f"[DETECTION PHASE] Using {snapshot_frame.shape[1]}x{snapshot_frame.shape[0]} snapshot for detection") + # Execute only the detection phase (first phase) # This will run detection and send imageDetection message to backend detection_result = await self.detection_pipeline.execute_detection_phase( - frame=frame, + frame=snapshot_frame, # Use 2K snapshot instead of RTSP frame display_id=display_id, subscription_id=subscription_id ) @@ -373,13 +384,13 @@ class TrackingPipelineIntegration: if detection_result['message_sent']: # Store for later processing when sessionId is received self.pending_processing_data[display_id] = { - 'frame': frame.copy(), # Store copy of frame for processing phase + 'frame': snapshot_frame.copy(), # Store copy of 2K snapshot (not RTSP frame!) 'vehicle': vehicle, 'subscription_id': subscription_id, 'detection_result': detection_result, 'timestamp': time.time() } - logger.info(f"Stored processing data for {display_id}, waiting for sessionId from backend") + logger.info(f"Stored processing data ({snapshot_frame.shape[1]}x{snapshot_frame.shape[0]} frame) for {display_id}, waiting for sessionId from backend") return detection_result @@ -413,14 +424,27 @@ class TrackingPipelineIntegration: logger.info(f"Executing processing phase for session {session_id}, vehicle {vehicle.track_id}") - # Capture high-quality snapshot for pipeline processing - logger.info(f"[PROCESSING PHASE] Fetching 2K snapshot for session {session_id}") - frame = self._fetch_snapshot() + # Reuse the snapshot from detection phase OR fetch fresh one if detection used RTSP fallback + detection_frame = processing_data['frame'] + frame_height = detection_frame.shape[0] - if frame is None: - logger.warning(f"[PROCESSING PHASE] Failed to capture snapshot, falling back to RTSP frame") - # Fall back to RTSP frame if snapshot fails - frame = processing_data['frame'] + # Check if detection phase used 2K snapshot (height > 1000) or RTSP fallback (height = 720) + if frame_height >= 1000: + # Detection used 2K snapshot - reuse it for consistent coordinates + logger.info(f"[PROCESSING PHASE] Reusing 2K snapshot from detection phase ({detection_frame.shape[1]}x{detection_frame.shape[0]})") + frame = detection_frame + else: + # Detection used RTSP fallback - need to fetch fresh 2K snapshot + logger.warning(f"[PROCESSING PHASE] Detection used RTSP fallback ({detection_frame.shape[1]}x{detection_frame.shape[0]}), fetching fresh 2K snapshot") + frame = self._fetch_snapshot() + + if frame is None: + logger.error(f"[PROCESSING PHASE] Failed to fetch snapshot and detection used RTSP - coordinate mismatch will occur!") + logger.error(f"[PROCESSING PHASE] Cannot proceed with mismatched coordinates. Aborting processing phase.") + return # Cannot process safely - bbox coordinates won't match frame resolution + else: + logger.warning(f"[PROCESSING PHASE] Fetched fresh 2K snapshot ({frame.shape[1]}x{frame.shape[0]}), but coordinates may not match exactly") + logger.warning(f"[PROCESSING PHASE] Re-running detection on fresh snapshot is recommended but not implemented yet") # Extract detected regions from detection phase result if available detected_regions = detection_result.get('detected_regions', {}) -- 2.47.2 From d102f1c4de34390ada31abd8e19498e5fcf023e0 Mon Sep 17 00:00:00 2001 From: ziesorx Date: Mon, 20 Oct 2025 17:05:05 +0700 Subject: [PATCH 5/6] fix: send partial results --- core/detection/pipeline.py | 48 ++++++++++++++++++++++++++++++++------ 1 file changed, 41 insertions(+), 7 deletions(-) diff --git a/core/detection/pipeline.py b/core/detection/pipeline.py index ba9ac9a..78001da 100644 --- a/core/detection/pipeline.py +++ b/core/detection/pipeline.py @@ -199,6 +199,8 @@ class DetectionPipeline: Dictionary with extracted field values (e.g., {"car_brand": "Honda", "body_type": "Sedan"}) """ extracted = {} + missing_fields = [] + available_fields = [] try: for db_field_name, template in self.field_mappings.items(): @@ -215,12 +217,21 @@ class DetectionPipeline: result_data = branch_data['result'] if isinstance(result_data, dict) and field_name in result_data: extracted[field_name] = result_data[field_name] + available_fields.append(f"{field_name}={result_data[field_name]}") logger.debug(f"[DYNAMIC EXTRACT] {field_name}={result_data[field_name]} from branch {branch_id}") else: + missing_fields.append(f"{field_name} (field not in branch {branch_id})") logger.debug(f"[DYNAMIC EXTRACT] Field '{field_name}' not found in branch {branch_id}") else: + missing_fields.append(f"{field_name} (branch {branch_id} missing)") logger.debug(f"[DYNAMIC EXTRACT] Branch '{branch_id}' not in results") + # Log summary of extraction + if available_fields: + logger.info(f"[FIELD EXTRACTION] Available fields: {', '.join(available_fields)}") + if missing_fields: + logger.warning(f"[FIELD EXTRACTION] Missing fields (will be null): {', '.join(missing_fields)}") + except Exception as e: logger.error(f"Error extracting fields from branches: {e}", exc_info=True) @@ -338,7 +349,17 @@ class DetectionPipeline: car_brand = extracted_fields.get('brand') body_type = extracted_fields.get('body_type') - logger.info(f"[LICENSE PLATE] Extracted fields: brand={car_brand}, body_type={body_type}") + # Log extraction results + fields_status = [] + if car_brand is not None: + fields_status.append(f"brand={car_brand}") + else: + fields_status.append("brand=null") + if body_type is not None: + fields_status.append(f"bodyType={body_type}") + else: + fields_status.append("bodyType=null") + logger.info(f"[LICENSE PLATE] Extracted fields: {', '.join(fields_status)}") # Clean up stored results after use del self.session_processing_results[session_id_for_lookup] @@ -367,7 +388,18 @@ class DetectionPipeline: # Send message await self.message_sender(detection_message) - logger.info(f"[COMBINED MESSAGE] Sent imageDetection with brand='{car_brand}', bodyType='{body_type}', license='{license_text}' to '{subscription_id}'") + + # Log with indication of partial results + null_fields = [] + if car_brand is None: + null_fields.append('brand') + if body_type is None: + null_fields.append('bodyType') + + if null_fields: + logger.info(f"[COMBINED MESSAGE] Sent imageDetection with PARTIAL results (null: {', '.join(null_fields)}) - brand='{car_brand}', bodyType='{body_type}', license='{license_text}' to '{subscription_id}'") + else: + logger.info(f"[COMBINED MESSAGE] Sent imageDetection with brand='{car_brand}', bodyType='{body_type}', license='{license_text}' to '{subscription_id}'") except Exception as e: logger.error(f"Error sending license plate imageDetection message: {e}", exc_info=True) @@ -1033,11 +1065,13 @@ class DetectionPipeline: wait_for_branches = action.params.get('waitForBranches', []) branch_results = context.get('branch_results', {}) - # Check if all required branches have completed - for branch_id in wait_for_branches: - if branch_id not in branch_results: - logger.warning(f"Branch {branch_id} result not available for database update") - return {'status': 'error', 'message': f'Missing branch result: {branch_id}'} + # Log missing branches but don't block the update (allow partial results) + missing_branches = [b for b in wait_for_branches if b not in branch_results] + if missing_branches: + logger.warning(f"Some branches missing from results (will use null): {missing_branches}") + available_branches = [b for b in wait_for_branches if b in branch_results] + if available_branches: + logger.info(f"Available branches for database update: {available_branches}") # Prepare fields for database update table = action.params.get('table', 'car_frontal_info') -- 2.47.2 From 10c54bc6e01dde612c382a5fc5d7e47f05f68fd1 Mon Sep 17 00:00:00 2001 From: ziesorx Date: Mon, 20 Oct 2025 17:05:20 +0700 Subject: [PATCH 6/6] chore: bring back cuda --- core/streaming/readers/ffmpeg_rtsp.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/streaming/readers/ffmpeg_rtsp.py b/core/streaming/readers/ffmpeg_rtsp.py index e469c9e..88f45ae 100644 --- a/core/streaming/readers/ffmpeg_rtsp.py +++ b/core/streaming/readers/ffmpeg_rtsp.py @@ -113,8 +113,8 @@ class FFmpegRTSPReader(VideoReader): cmd = [ 'ffmpeg', # DO NOT REMOVE - # '-hwaccel', 'cuda', - # '-hwaccel_device', '0', + '-hwaccel', 'cuda', + '-hwaccel_device', '0', # Real-time input flags '-fflags', 'nobuffer+genpts', '-flags', 'low_delay', -- 2.47.2