From fd470b3765c3507e02ecd10b05253dc7cd420595 Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Sun, 9 Nov 2025 19:54:41 +0700 Subject: [PATCH] remove old tests --- test_batch_inference.py | 310 ------------------------------------ test_event_driven_quick.py | 117 -------------- test_profiling.py | 218 ------------------------- test_tracking.py | 318 ------------------------------------- 4 files changed, 963 deletions(-) delete mode 100644 test_batch_inference.py delete mode 100755 test_event_driven_quick.py delete mode 100644 test_profiling.py delete mode 100644 test_tracking.py diff --git a/test_batch_inference.py b/test_batch_inference.py deleted file mode 100644 index f900914..0000000 --- a/test_batch_inference.py +++ /dev/null @@ -1,310 +0,0 @@ -""" -Batch Inference Test - Process Multiple Cameras in Single Batch - -This script demonstrates batch inference to eliminate sequential processing bottleneck. -Instead of processing 4 cameras one-by-one, we process all 4 in a single batched inference. - -Requirements: -- TensorRT model with dynamic batching support -- Rebuild model: python scripts/convert_pt_to_tensorrt.py --model yolov8n.pt - --output models/yolov8n_batch4.trt --dynamic-batch --max-batch 4 --fp16 - -Performance Comparison: -- Sequential: Process each camera separately (current bottleneck) -- Batched: Stack all frames → single inference → split results -""" - -import time -import os -import torch -from dotenv import load_dotenv -from services import ( - StreamDecoderFactory, - TensorRTModelRepository, - YOLOv8Utils, - COCO_CLASSES, -) - -load_dotenv() - - -def preprocess_batch(frames: list[torch.Tensor], input_size: int = 640) -> torch.Tensor: - """ - Preprocess multiple frames for batched inference. - - Args: - frames: List of GPU tensors, each (3, H, W) uint8 - input_size: Model input size (default: 640) - - Returns: - Batched tensor (B, 3, 640, 640) float32 - """ - # Preprocess each frame individually - preprocessed = [YOLOv8Utils.preprocess(frame, input_size) for frame in frames] - - # Stack into batch: (B, 3, 640, 640) - return torch.cat(preprocessed, dim=0) - - -def postprocess_batch(outputs: dict, conf_threshold: float = 0.25, - nms_threshold: float = 0.45) -> list[torch.Tensor]: - """ - Postprocess batched YOLOv8 output to per-image detections. - - YOLOv8 batched output: (B, 84, 8400) - - Args: - outputs: Dictionary of model outputs from TensorRT inference - conf_threshold: Confidence threshold - nms_threshold: IoU threshold for NMS - - Returns: - List of detection tensors, each (N, 6): [x1, y1, x2, y2, conf, class_id] - """ - from torchvision.ops import nms - - # Get output tensor - output_name = list(outputs.keys())[0] - output = outputs[output_name] # (B, 84, 8400) - - batch_size = output.shape[0] - results = [] - - for b in range(batch_size): - # Extract single image from batch - single_output = output[b:b+1] # (1, 84, 8400) - - # Reuse existing postprocessing logic - detections = YOLOv8Utils.postprocess( - {output_name: single_output}, - conf_threshold=conf_threshold, - nms_threshold=nms_threshold - ) - - results.append(detections) - - return results - - -def benchmark_sequential_vs_batch(duration: int = 30): - """ - Benchmark sequential vs batched inference. - - Args: - duration: Test duration in seconds - """ - print("=" * 80) - print("BATCH INFERENCE BENCHMARK") - print("=" * 80) - - GPU_ID = 0 - MODEL_PATH_BATCH = "models/yolov8n_batch4.trt" # Dynamic batch model - MODEL_PATH_SINGLE = "models/yolov8n.trt" # Original single-batch model - - # Check if batch model exists - if not os.path.exists(MODEL_PATH_BATCH): - print(f"\n⚠ Batch model not found: {MODEL_PATH_BATCH}") - print("\nTo create it, run:") - print(" python scripts/convert_pt_to_tensorrt.py \\") - print(" --model yolov8n.pt \\") - print(" --output models/yolov8n_batch4.trt \\") - print(" --dynamic-batch --max-batch 4 --fp16") - print("\nFalling back to simulated batch processing...") - use_true_batching = False - MODEL_PATH = MODEL_PATH_SINGLE - else: - use_true_batching = True - MODEL_PATH = MODEL_PATH_BATCH - print(f"\n✓ Using batch model: {MODEL_PATH_BATCH}") - - # Load camera URLs - camera_urls = [] - for i in range(1, 5): - url = os.getenv(f'CAMERA_URL_{i}') - if url: - camera_urls.append(url) - - if len(camera_urls) < 2: - print(f"⚠ Need at least 2 cameras, found {len(camera_urls)}") - return - - print(f"\nTesting with {len(camera_urls)} cameras") - - # Initialize components - print("\nInitializing...") - model_repo = TensorRTModelRepository(gpu_id=GPU_ID, default_num_contexts=4) - model_repo.load_model("detector", MODEL_PATH, num_contexts=4) - - stream_factory = StreamDecoderFactory(gpu_id=GPU_ID) - decoders = [] - - for i, url in enumerate(camera_urls): - decoder = stream_factory.create_decoder(url, buffer_size=30) - decoder.start() - decoders.append(decoder) - print(f" Camera {i+1}: {url}") - - print("\nWaiting for streams to connect...") - time.sleep(10) - - # ==================== SEQUENTIAL BENCHMARK ==================== - print("\n" + "=" * 80) - print("1. SEQUENTIAL INFERENCE (Current Method)") - print("=" * 80) - - frame_count_seq = 0 - start_time = time.time() - - print(f"\nRunning for {duration} seconds...") - - try: - while time.time() - start_time < duration: - for decoder in decoders: - frame_gpu = decoder.get_latest_frame(rgb=True) - if frame_gpu is None: - continue - - # Preprocess - preprocessed = YOLOv8Utils.preprocess(frame_gpu) - - # Inference (single frame) - outputs = model_repo.infer( - model_id="detector", - inputs={"images": preprocessed}, - synchronize=True - ) - - # Postprocess - detections = YOLOv8Utils.postprocess(outputs) - - frame_count_seq += 1 - - except KeyboardInterrupt: - pass - - seq_time = time.time() - start_time - seq_fps = frame_count_seq / seq_time - - print(f"\nSequential Results:") - print(f" Total frames: {frame_count_seq}") - print(f" Total time: {seq_time:.2f}s") - print(f" Combined FPS: {seq_fps:.2f}") - print(f" Per-camera FPS: {seq_fps / len(camera_urls):.2f}") - - # ==================== BATCHED BENCHMARK ==================== - print("\n" + "=" * 80) - print("2. BATCHED INFERENCE (Optimized Method)") - print("=" * 80) - - if not use_true_batching: - print("\n⚠ Skipping true batch inference (model not available)") - print(" Results would be identical without dynamic batch model") - else: - frame_count_batch = 0 - start_time = time.time() - - print(f"\nRunning for {duration} seconds...") - - try: - while time.time() - start_time < duration: - # Collect frames from all cameras - frames = [] - for decoder in decoders: - frame_gpu = decoder.get_latest_frame(rgb=True) - if frame_gpu is not None: - frames.append(frame_gpu) - - if len(frames) == 0: - continue - - # Batch preprocess - batch_input = preprocess_batch(frames) - - # Single batched inference - outputs = model_repo.infer( - model_id="detector", - inputs={"images": batch_input}, - synchronize=True - ) - - # Batch postprocess - batch_detections = postprocess_batch(outputs) - - frame_count_batch += len(frames) - - except KeyboardInterrupt: - pass - - batch_time = time.time() - start_time - batch_fps = frame_count_batch / batch_time - - print(f"\nBatched Results:") - print(f" Total frames: {frame_count_batch}") - print(f" Total time: {batch_time:.2f}s") - print(f" Combined FPS: {batch_fps:.2f}") - print(f" Per-camera FPS: {batch_fps / len(camera_urls):.2f}") - - # ==================== COMPARISON ==================== - print("\n" + "=" * 80) - print("COMPARISON") - print("=" * 80) - - improvement = ((batch_fps - seq_fps) / seq_fps) * 100 - - print(f"\nSequential: {seq_fps:.2f} FPS combined ({seq_fps / len(camera_urls):.2f} per camera)") - print(f"Batched: {batch_fps:.2f} FPS combined ({batch_fps / len(camera_urls):.2f} per camera)") - print(f"\nImprovement: {improvement:+.1f}%") - - if improvement > 10: - print("✓ Significant improvement with batch inference!") - elif improvement > 0: - print("✓ Moderate improvement with batch inference") - else: - print("⚠ No improvement - check batch model configuration") - - # Cleanup - print("\n" + "=" * 80) - print("Cleanup") - print("=" * 80) - - for i, decoder in enumerate(decoders): - decoder.stop() - print(f" Stopped camera {i+1}") - - print("\n✓ Benchmark complete!") - - -def test_batch_preprocessing(): - """Test that batch preprocessing works correctly""" - print("\n" + "=" * 80) - print("BATCH PREPROCESSING TEST") - print("=" * 80) - - # Create dummy frames - device = torch.device('cuda:0') - frames = [ - torch.randint(0, 256, (3, 720, 1280), dtype=torch.uint8, device=device) - for _ in range(4) - ] - - print(f"\nInput: {len(frames)} frames, each {frames[0].shape}") - - # Test batch preprocessing - batch = preprocess_batch(frames) - print(f"Output: {batch.shape} (expected: [4, 3, 640, 640])") - print(f"dtype: {batch.dtype} (expected: torch.float32)") - print(f"range: [{batch.min():.3f}, {batch.max():.3f}] (expected: [0.0, 1.0])") - - assert batch.shape == (4, 3, 640, 640), "Batch shape mismatch" - assert batch.dtype == torch.float32, "Dtype mismatch" - assert 0.0 <= batch.min() and batch.max() <= 1.0, "Value range incorrect" - - print("\n✓ Batch preprocessing test passed!") - - -if __name__ == "__main__": - # Test batch preprocessing - test_batch_preprocessing() - - # Run benchmark - benchmark_sequential_vs_batch(duration=30) diff --git a/test_event_driven_quick.py b/test_event_driven_quick.py deleted file mode 100755 index ca90494..0000000 --- a/test_event_driven_quick.py +++ /dev/null @@ -1,117 +0,0 @@ -#!/usr/bin/env python3 -""" -Quick test for event-driven stream processing - runs for 20 seconds. -""" - -import asyncio -import os -import logging -from dotenv import load_dotenv - -from services import StreamConnectionManager, YOLOv8Utils, COCO_CLASSES - -# Setup logging -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' -) -logger = logging.getLogger(__name__) - - -async def main(): - """Quick test with callback pattern""" - logger.info("=== Quick Event-Driven Test (20 seconds) ===") - - # Load environment variables - load_dotenv() - camera_url = os.getenv('CAMERA_URL_1') - if not camera_url: - logger.error("CAMERA_URL_1 not found in .env file") - return - - # Create manager - manager = StreamConnectionManager( - gpu_id=0, - batch_size=16, - force_timeout=0.05, # 50ms - poll_interval=0.01, # 100 FPS - ) - - # Initialize with YOLOv8 model - model_path = "models/yolov8n.trt" - logger.info(f"Initializing with model: {model_path}") - - await manager.initialize( - model_path=model_path, - model_id="yolo", - preprocess_fn=YOLOv8Utils.preprocess, - postprocess_fn=YOLOv8Utils.postprocess, - ) - - result_count = 0 - - # Define callback for tracking results - def on_tracking_result(result): - nonlocal result_count - result_count += 1 - - if result_count % 5 == 0: # Log every 5th result - logger.info(f"[{result.stream_id}] Frame {result.metadata.get('frame_number', 0)}") - logger.info(f" Tracked objects: {len(result.tracked_objects)}") - - for obj in result.tracked_objects[:3]: # Show first 3 - class_name = COCO_CLASSES.get(obj.class_id, f"Class {obj.class_id}") - logger.info( - f" Track ID {obj.track_id}: {class_name}, " - f"conf={obj.confidence:.2f}" - ) - - def on_error(error): - logger.error(f"Stream error: {error}") - - # Connect to stream - logger.info(f"Connecting to stream...") - connection = await manager.connect_stream( - rtsp_url=camera_url, - stream_id="test_camera", - on_tracking_result=on_tracking_result, - on_error=on_error, - ) - - # Monitor for 20 seconds with stats updates - for i in range(4): # 4 x 5 seconds = 20 seconds - await asyncio.sleep(5) - - stats = manager.get_stats() - model_stats = stats['model_controller'] - - logger.info(f"\n=== Stats Update {i+1}/4 ===") - logger.info(f"Results received: {result_count}") - logger.info(f"Buffer A: {model_stats['buffer_a_size']} ({model_stats['buffer_a_state']})") - logger.info(f"Buffer B: {model_stats['buffer_b_size']} ({model_stats['buffer_b_state']})") - logger.info(f"Active buffer: {model_stats['active_buffer']}") - logger.info(f"Total frames processed: {model_stats['total_frames_processed']}") - logger.info(f"Total batches: {model_stats['total_batches_processed']}") - logger.info(f"Avg batch size: {model_stats['avg_batch_size']:.2f}") - - # Final statistics - stats = manager.get_stats() - logger.info("\n=== Final Statistics ===") - logger.info(f"Total results received: {result_count}") - logger.info(f"Manager: {stats['manager']}") - logger.info(f"Model Controller: {stats['model_controller']}") - logger.info(f"Connection: {stats['connections']['test_camera']}") - - # Cleanup - logger.info("\nShutting down...") - await manager.shutdown() - logger.info("Test complete!") - - -if __name__ == "__main__": - try: - asyncio.run(main()) - except KeyboardInterrupt: - logger.info("\nInterrupted by user") - except Exception as e: - logger.error(f"Error: {e}", exc_info=True) diff --git a/test_profiling.py b/test_profiling.py deleted file mode 100644 index 0214748..0000000 --- a/test_profiling.py +++ /dev/null @@ -1,218 +0,0 @@ -""" -Detailed Profiling Script to Identify Performance Bottlenecks - -This script profiles each component separately: -1. Video decoding (NVDEC) -2. Preprocessing -3. TensorRT inference -4. Postprocessing (including NMS) -5. Tracking (IOU matching) -""" - -import time -import os -import torch -from dotenv import load_dotenv -from services import ( - StreamDecoderFactory, - TensorRTModelRepository, - TrackingFactory, - YOLOv8Utils, - COCO_CLASSES, -) - -load_dotenv() - - -def profile_component(name, iterations=100): - """Decorator for profiling a component.""" - def decorator(func): - def wrapper(*args, **kwargs): - times = [] - for _ in range(iterations): - start = time.time() - result = func(*args, **kwargs) - elapsed = time.time() - start - times.append(elapsed * 1000) # Convert to ms - - avg_time = sum(times) / len(times) - min_time = min(times) - max_time = max(times) - - print(f"\n{name}:") - print(f" Iterations: {iterations}") - print(f" Average: {avg_time:.2f} ms") - print(f" Min: {min_time:.2f} ms") - print(f" Max: {max_time:.2f} ms") - print(f" Throughput: {1000/avg_time:.2f} FPS") - - return result - return wrapper - return decorator - - -def main(): - print("=" * 80) - print("PERFORMANCE PROFILING - Component Breakdown") - print("=" * 80) - - GPU_ID = 0 - MODEL_PATH = "models/yolov8n.trt" - RTSP_URL = os.getenv('CAMERA_URL_1') - - # Initialize components - print("\nInitializing components...") - model_repo = TensorRTModelRepository(gpu_id=GPU_ID, default_num_contexts=4) - model_repo.load_model("detector", MODEL_PATH, num_contexts=4) - - tracking_factory = TrackingFactory(gpu_id=GPU_ID) - controller = tracking_factory.create_controller( - model_repository=model_repo, - model_id="detector", - tracker_type="iou", - max_age=30, - min_confidence=0.5, - iou_threshold=0.3, - class_names=COCO_CLASSES - ) - - stream_factory = StreamDecoderFactory(gpu_id=GPU_ID) - decoder = stream_factory.create_decoder(RTSP_URL, buffer_size=30) - decoder.start() - - print("Waiting for stream connection...") - connected = False - for i in range(30): - time.sleep(1) - if decoder.is_connected(): - connected = True - print(f"✓ Stream connected after {i+1} seconds") - break - if i % 5 == 0: - print(f" Waiting... {i+1}/30 seconds") - - if not connected: - print("⚠ Stream not connected after 30 seconds") - return - - print("✓ Stream connected\n") - print("=" * 80) - print("PROFILING RESULTS") - print("=" * 80) - - # Wait for frames to buffer - time.sleep(2) - - # Get a sample frame for testing - frame_gpu = decoder.get_latest_frame(rgb=True) - if frame_gpu is None: - print("⚠ No frames available") - return - - print(f"\nFrame shape: {frame_gpu.shape}") - print(f"Frame device: {frame_gpu.device}") - print(f"Frame dtype: {frame_gpu.dtype}") - - # Profile 1: Video Decoding - @profile_component("1. Video Decoding (NVDEC)", iterations=100) - def profile_decoding(): - return decoder.get_latest_frame(rgb=True) - - profile_decoding() - - # Profile 2: Preprocessing - @profile_component("2. Preprocessing (Resize + Normalize)", iterations=100) - def profile_preprocessing(): - return YOLOv8Utils.preprocess(frame_gpu) - - preprocessed = profile_preprocessing() - - # Profile 3: TensorRT Inference - @profile_component("3. TensorRT Inference", iterations=100) - def profile_inference(): - return model_repo.infer( - model_id="detector", - inputs={"images": preprocessed}, - synchronize=True - ) - - outputs = profile_inference() - - # Profile 4: Postprocessing (including NMS) - @profile_component("4. Postprocessing (NMS + Format Conversion)", iterations=100) - def profile_postprocessing(): - return YOLOv8Utils.postprocess(outputs) - - detections = profile_postprocessing() - - print(f"\nDetections shape: {detections.shape}") - print(f"Number of detections: {len(detections)}") - - # Profile 5: Full Pipeline (Tracking) - @profile_component("5. Full Tracking Pipeline", iterations=50) - def profile_full_pipeline(): - frame = decoder.get_latest_frame(rgb=True) - if frame is None: - return [] - return controller.track( - frame, - preprocess_fn=YOLOv8Utils.preprocess, - postprocess_fn=YOLOv8Utils.postprocess - ) - - profile_full_pipeline() - - # Profile 6: Parallel inference (simulate multi-camera) - print("\n" + "=" * 80) - print("MULTI-CAMERA SIMULATION") - print("=" * 80) - - num_cameras = 4 - print(f"\nSimulating {num_cameras} cameras processing sequentially...") - - @profile_component(f"Sequential Processing ({num_cameras} cameras)", iterations=20) - def profile_sequential(): - for _ in range(num_cameras): - frame = decoder.get_latest_frame(rgb=True) - if frame is not None: - controller.track( - frame, - preprocess_fn=YOLOv8Utils.preprocess, - postprocess_fn=YOLOv8Utils.postprocess - ) - - profile_sequential() - - # Cleanup - decoder.stop() - - # Summary - print("\n" + "=" * 80) - print("BOTTLENECK ANALYSIS") - print("=" * 80) - - print(""" -Based on the profiling results above, identify the bottleneck: - -1. If "TensorRT Inference" is the slowest: - → GPU compute is the bottleneck - → Solutions: Lower resolution, smaller model, batch processing - -2. If "Postprocessing (NMS)" is slow: - → CPU/GPU synchronization or NMS is slow - → Solutions: Optimize NMS, reduce detections threshold - -3. If "Video Decoding" is slow: - → NVDEC is the bottleneck - → Solutions: Lower resolution streams, fewer cameras per decoder - -4. If "Sequential Processing" time ≈ (single pipeline time × num_cameras): - → No parallelization, processing is sequential - → Solutions: Async processing, CUDA streams, batching - -Expected bottleneck: TensorRT Inference (most compute-intensive) - """) - - -if __name__ == "__main__": - main() diff --git a/test_tracking.py b/test_tracking.py deleted file mode 100644 index 5910c3c..0000000 --- a/test_tracking.py +++ /dev/null @@ -1,318 +0,0 @@ -""" -Test script for TrackingController and TrackingFactory. - -This script demonstrates how to use the tracking system with: -- TensorRT model repository (dependency injection) -- TrackingFactory for controller creation -- GPU-accelerated object tracking on RTSP streams -- Persistent track IDs and history management -""" - -import time -import os -from dotenv import load_dotenv -from services import ( - StreamDecoderFactory, - TensorRTModelRepository, - TrackingFactory, - TrackedObject -) - -# Load environment variables -load_dotenv() - - -def main(): - """ - Main test function demonstrating tracking workflow. - """ - # Configuration - GPU_ID = 0 - MODEL_PATH = "models/yolov8n.trt" # Update with your model path - RTSP_URL = os.getenv('CAMERA_URL_1', 'rtsp://localhost:8554/test') - BUFFER_SIZE = 30 - - # COCO class names (example for YOLOv8) - COCO_CLASSES = { - 0: 'person', 1: 'bicycle', 2: 'car', 3: 'motorcycle', 4: 'airplane', - 5: 'bus', 6: 'train', 7: 'truck', 8: 'boat', 9: 'traffic light', - # Add more as needed... - } - - print("=" * 80) - print("GPU-Accelerated Object Tracking Test") - print("=" * 80) - - # Step 1: Create model repository - print("\n[1/5] Initializing TensorRT Model Repository...") - model_repo = TensorRTModelRepository(gpu_id=GPU_ID, default_num_contexts=4) - - # Load detection model (if file exists) - model_id = "yolov8_detector" - if os.path.exists(MODEL_PATH): - try: - metadata = model_repo.load_model( - model_id=model_id, - file_path=MODEL_PATH, - num_contexts=4 - ) - print(f"✓ Model loaded successfully") - print(f" Input shape: {metadata.input_shapes}") - print(f" Output shape: {metadata.output_shapes}") - except Exception as e: - print(f"✗ Failed to load model: {e}") - print(f" Please ensure {MODEL_PATH} exists") - print(f" Continuing with demo (will use mock detections)...") - model_id = None - else: - print(f"✗ Model file not found: {MODEL_PATH}") - print(f" Continuing with demo (will use mock detections)...") - model_id = None - - # Step 2: Create tracking factory - print("\n[2/5] Creating TrackingFactory...") - tracking_factory = TrackingFactory(gpu_id=GPU_ID) - print(f"✓ Factory created: {tracking_factory}") - - # Step 3: Create tracking controller (only if model loaded) - tracking_controller = None - if model_id is not None: - print("\n[3/5] Creating TrackingController...") - try: - tracking_controller = tracking_factory.create_controller( - model_repository=model_repo, - model_id=model_id, - tracker_type="iou", - max_age=30, - min_confidence=0.5, - iou_threshold=0.3, - class_names=COCO_CLASSES - ) - print(f"✓ Controller created: {tracking_controller}") - except Exception as e: - print(f"✗ Failed to create controller: {e}") - tracking_controller = None - else: - print("\n[3/5] Skipping TrackingController creation (no model loaded)") - - # Step 4: Create stream decoder - print("\n[4/5] Creating RTSP Stream Decoder...") - stream_factory = StreamDecoderFactory(gpu_id=GPU_ID) - decoder = stream_factory.create_decoder( - rtsp_url=RTSP_URL, - buffer_size=BUFFER_SIZE - ) - decoder.start() - print(f"✓ Decoder started for: {RTSP_URL}") - print(f" Waiting for connection...") - - # Wait for stream connection - time.sleep(5) - - if decoder.is_connected(): - print(f"✓ Stream connected!") - else: - print(f"✗ Stream not connected (status: {decoder.get_status().value})") - print(f" Note: This is expected if RTSP URL is not available") - print(f" The tracking system will still work with valid streams") - - # Step 5: Run tracking loop (demo) - print("\n[5/5] Running Tracking Loop...") - print(f" Processing frames for 30 seconds...") - print(f" Press Ctrl+C to stop early\n") - - try: - frame_count = 0 - start_time = time.time() - - while time.time() - start_time < 30: - # Get latest frame from decoder (GPU tensor) - frame = decoder.get_latest_frame(rgb=True) - - if frame is None: - time.sleep(0.1) - continue - - frame_count += 1 - - # Run tracking (if controller available) - if tracking_controller is not None: - try: - # Track objects in frame - tracked_objects = tracking_controller.track(frame) - - # Display tracking results every 10 frames - if frame_count % 10 == 0: - print(f"\n--- Frame {frame_count} ---") - print(f"Active tracks: {len(tracked_objects)}") - - for obj in tracked_objects: - print(f" Track #{obj.track_id}: {obj.class_name} " - f"(conf={obj.confidence:.2f}, " - f"bbox={[f'{x:.1f}' for x in obj.bbox]}, " - f"age={obj.age(tracking_controller._frame_count)} frames)") - - # Print statistics - stats = tracking_controller.get_statistics() - print(f"\nStatistics:") - print(f" Total frames processed: {stats['frame_count']}") - print(f" Total tracks created: {stats['total_tracks_created']}") - print(f" Total detections: {stats['total_detections']}") - print(f" Avg detections/frame: {stats['avg_detections_per_frame']:.2f}") - print(f" Class counts: {stats['class_counts']}") - - except Exception as e: - print(f"✗ Tracking error on frame {frame_count}: {e}") - - # Small delay to avoid overwhelming output - time.sleep(0.1) - - except KeyboardInterrupt: - print("\n\n✓ Interrupted by user") - - # Cleanup - print("\n" + "=" * 80) - print("Cleanup") - print("=" * 80) - - if tracking_controller is not None: - print("\nTracking final statistics:") - stats = tracking_controller.get_statistics() - for key, value in stats.items(): - print(f" {key}: {value}") - - print("\nExporting tracks to JSON...") - try: - tracks_json = tracking_controller.export_tracks(format="json") - with open("tracked_objects.json", "w") as f: - f.write(tracks_json) - print(f"✓ Tracks exported to tracked_objects.json") - except Exception as e: - print(f"✗ Export failed: {e}") - - print("\nStopping decoder...") - decoder.stop() - print("✓ Decoder stopped") - - print("\n" + "=" * 80) - print("Test completed successfully!") - print("=" * 80) - - -def test_multi_camera_tracking(): - """ - Example: Track objects across multiple camera streams. - - This demonstrates: - - Shared model repository across multiple streams - - Multiple tracking controllers (one per camera) - - Efficient GPU resource usage - """ - GPU_ID = 0 - MODEL_PATH = "models/yolov8n.trt" - - # Load multiple camera URLs - camera_urls = [] - i = 1 - while True: - url = os.getenv(f'CAMERA_URL_{i}') - if url: - camera_urls.append(url) - i += 1 - else: - break - - if not camera_urls: - print("No camera URLs found in .env file") - return - - print(f"Testing multi-camera tracking with {len(camera_urls)} cameras") - - # Create shared model repository - model_repo = TensorRTModelRepository(gpu_id=GPU_ID, default_num_contexts=8) - - if os.path.exists(MODEL_PATH): - model_repo.load_model("detector", MODEL_PATH, num_contexts=8) - else: - print(f"Model not found: {MODEL_PATH}") - return - - # Create tracking factory - tracking_factory = TrackingFactory(gpu_id=GPU_ID) - - # Create stream decoders and tracking controllers - stream_factory = StreamDecoderFactory(gpu_id=GPU_ID) - decoders = [] - controllers = [] - - for i, url in enumerate(camera_urls): - # Create decoder - decoder = stream_factory.create_decoder(url, buffer_size=30) - decoder.start() - decoders.append(decoder) - - # Create tracking controller - controller = tracking_factory.create_controller( - model_repository=model_repo, - model_id="detector", - tracker_type="iou", - max_age=30, - min_confidence=0.5 - ) - controllers.append(controller) - - print(f"Camera {i+1}: {url}") - - print(f"\nWaiting for streams to connect...") - time.sleep(10) - - # Track objects for 30 seconds - print(f"\nTracking objects across {len(camera_urls)} cameras...") - start_time = time.time() - - try: - while time.time() - start_time < 30: - for i, (decoder, controller) in enumerate(zip(decoders, controllers)): - frame = decoder.get_latest_frame(rgb=True) - - if frame is not None: - tracked_objects = controller.track(frame) - - # Print stats every 10 seconds - if int(time.time() - start_time) % 10 == 0: - stats = controller.get_statistics() - print(f"Camera {i+1}: {stats['active_tracks']} tracks, " - f"{stats['frame_count']} frames") - - time.sleep(0.1) - - except KeyboardInterrupt: - print("\nInterrupted by user") - - # Cleanup - print("\nCleaning up...") - for decoder in decoders: - decoder.stop() - - # Print final stats - print("\nFinal Statistics:") - for i, controller in enumerate(controllers): - stats = controller.get_statistics() - print(f"\nCamera {i+1}:") - print(f" Frames: {stats['frame_count']}") - print(f" Tracks created: {stats['total_tracks_created']}") - print(f" Active tracks: {stats['active_tracks']}") - - # Print model repository stats - print("\nModel Repository Stats:") - repo_stats = model_repo.get_stats() - for key, value in repo_stats.items(): - print(f" {key}: {value}") - - -if __name__ == "__main__": - # Run single camera test - main() - - # Uncomment to test multi-camera tracking - # test_multi_camera_tracking()