"""
FPS Benchmark Test for Single vs Multi-Camera Tracking

This script benchmarks the FPS performance of:
1. Single camera tracking
2. Multi-camera tracking (2+ cameras)

Camera URLs are read from the environment (``CAMERA_URL_1``,
``CAMERA_URL_2``, ...), which is populated from a ``.env`` file via
``load_dotenv()``.

Usage:
    python test_fps_benchmark.py
"""

import time
import os
from dotenv import load_dotenv
from services import (
    StreamDecoderFactory,
    TensorRTModelRepository,
    TrackingFactory,
    YOLOv8Utils,
    COCO_CLASSES,
)

load_dotenv()


def _discover_camera_urls():
    """Return CAMERA_URL_1, CAMERA_URL_2, ... up to the first unset/empty one."""
    urls = []
    index = 1
    while True:
        url = os.getenv(f'CAMERA_URL_{index}')
        if not url:
            return urls
        urls.append(url)
        index += 1


def _rate(count, elapsed):
    """Return ``count / elapsed``, or 0 when no time has elapsed."""
    return count / elapsed if elapsed > 0 else 0


def benchmark_single_camera(duration=30):
    """
    Benchmark single camera tracking performance.

    Reads the stream URL from ``CAMERA_URL_1`` (falling back to
    ``rtsp://localhost:8554/test``), runs ``controller.track`` on every
    frame the decoder hands back for ``duration`` seconds, and prints a
    per-second FPS sample plus a final summary.

    Args:
        duration: Test duration in seconds

    Returns:
        Dictionary with FPS statistics: ``total_frames``, ``total_time``,
        ``avg_fps``, ``min_fps``, ``max_fps`` and ``samples`` (the list of
        per-second FPS samples).
    """
    print("\n" + "=" * 80)
    print("SINGLE CAMERA BENCHMARK")
    print("=" * 80)

    GPU_ID = 0
    MODEL_PATH = "models/yolov8n.trt"
    RTSP_URL = os.getenv('CAMERA_URL_1', 'rtsp://localhost:8554/test')

    # Initialize components
    print("\nInitializing...")
    model_repo = TensorRTModelRepository(gpu_id=GPU_ID, default_num_contexts=4)
    model_repo.load_model("detector", MODEL_PATH, num_contexts=4)

    tracking_factory = TrackingFactory(gpu_id=GPU_ID)
    controller = tracking_factory.create_controller(
        model_repository=model_repo,
        model_id="detector",
        tracker_type="iou",
        max_age=30,
        min_confidence=0.5,
        iou_threshold=0.3,
        class_names=COCO_CLASSES,
    )

    stream_factory = StreamDecoderFactory(gpu_id=GPU_ID)
    decoder = stream_factory.create_decoder(RTSP_URL, buffer_size=30)
    decoder.start()

    frame_count = 0
    fps_samples = []

    # Everything after start() runs under try/finally so the decoder is
    # stopped even if tracking raises or the user interrupts the wait.
    try:
        print("Waiting for stream connection...")
        time.sleep(5)

        if not decoder.is_connected():
            print("⚠ Stream not connected, results may be inaccurate")

        # Benchmark
        print(f"\nRunning benchmark for {duration} seconds...")

        # perf_counter() is monotonic, so measured intervals cannot be
        # distorted by wall-clock adjustments during the run.
        start_time = time.perf_counter()
        sample_start = start_time
        sample_frames = 0

        try:
            while time.perf_counter() - start_time < duration:
                frame_gpu = decoder.get_latest_frame(rgb=True)

                if frame_gpu is None:
                    # No frame available yet; yield briefly instead of spinning.
                    time.sleep(0.001)
                    continue

                # Run tracking (result is not needed for the FPS measurement)
                controller.track(
                    frame_gpu,
                    preprocess_fn=YOLOv8Utils.preprocess,
                    postprocess_fn=YOLOv8Utils.postprocess,
                )

                frame_count += 1
                sample_frames += 1

                # Sample FPS every second; read the clock once so the
                # threshold check and the division use the same window.
                now = time.perf_counter()
                window = now - sample_start
                if window >= 1.0:
                    fps = sample_frames / window
                    fps_samples.append(fps)
                    sample_frames = 0
                    sample_start = now
                    print(f" Current FPS: {fps:.2f}")

        except KeyboardInterrupt:
            print("\nBenchmark interrupted")

        # Calculate statistics
        total_time = time.perf_counter() - start_time
    finally:
        # Cleanup
        decoder.stop()

    stats = {
        'total_frames': frame_count,
        'total_time': total_time,
        # Guarded: total_time can be ~0 (e.g. duration=0).
        'avg_fps': _rate(frame_count, total_time),
        'min_fps': min(fps_samples) if fps_samples else 0,
        'max_fps': max(fps_samples) if fps_samples else 0,
        'samples': fps_samples,
    }

    print("\n" + "-" * 80)
    print(f"Total Frames: {stats['total_frames']}")
    print(f"Total Time: {stats['total_time']:.2f} seconds")
    print(f"Average FPS: {stats['avg_fps']:.2f}")
    print(f"Min FPS: {stats['min_fps']:.2f}")
    print(f"Max FPS: {stats['max_fps']:.2f}")
    print("-" * 80)

    return stats


def benchmark_multi_camera(duration=30):
    """
    Benchmark multi-camera tracking performance.

    Collects every consecutive ``CAMERA_URL_<n>`` environment variable
    starting at 1, creates one decoder and one tracking controller per
    camera (all sharing a single model repository), and polls the cameras
    round-robin in one loop for ``duration`` seconds.

    Args:
        duration: Test duration in seconds

    Returns:
        Dictionary with FPS statistics per camera (keys ``camera_1``,
        ``camera_2``, ...) plus a ``combined`` entry, or ``None`` when
        fewer than two camera URLs are configured.
    """
    print("\n" + "=" * 80)
    print("MULTI-CAMERA BENCHMARK")
    print("=" * 80)

    GPU_ID = 0
    MODEL_PATH = "models/yolov8n.trt"

    # Load camera URLs
    camera_urls = _discover_camera_urls()

    if len(camera_urls) < 2:
        print("⚠ Need at least 2 cameras for multi-camera test")
        print(f" Found only {len(camera_urls)} camera(s) in .env")
        return None

    print(f"\nTesting with {len(camera_urls)} cameras")

    # Initialize components
    print("\nInitializing...")
    model_repo = TensorRTModelRepository(gpu_id=GPU_ID, default_num_contexts=8)
    model_repo.load_model("detector", MODEL_PATH, num_contexts=8)

    tracking_factory = TrackingFactory(gpu_id=GPU_ID)
    stream_factory = StreamDecoderFactory(gpu_id=GPU_ID)

    num_cameras = len(camera_urls)
    decoders = []
    controllers = []
    frame_counts = [0] * num_cameras
    fps_samples = [[] for _ in camera_urls]

    # Setup and the benchmark loop share one try/finally so that every
    # decoder that was started is stopped, even if a later camera fails
    # to initialize or tracking raises mid-run.
    try:
        for index, url in enumerate(camera_urls):
            # Create decoder
            decoder = stream_factory.create_decoder(url, buffer_size=30)
            decoder.start()
            decoders.append(decoder)

            # Create controller
            controller = tracking_factory.create_controller(
                model_repository=model_repo,
                model_id="detector",
                tracker_type="iou",
                max_age=30,
                min_confidence=0.5,
                iou_threshold=0.3,
                class_names=COCO_CLASSES,
            )
            controllers.append(controller)

            print(f" Camera {index+1}: {url}")

        print("\nWaiting for streams to connect...")
        time.sleep(10)

        # Same connectivity warning the single-camera benchmark gives.
        for index, decoder in enumerate(decoders):
            if not decoder.is_connected():
                print(f"⚠ Camera {index+1} stream not connected, results may be inaccurate")

        # Benchmark
        print(f"\nRunning benchmark for {duration} seconds...")

        sample_frames = [0] * num_cameras
        # perf_counter() is monotonic, so measured intervals cannot be
        # distorted by wall-clock adjustments during the run.
        start_time = time.perf_counter()
        sample_starts = [start_time] * num_cameras

        try:
            while time.perf_counter() - start_time < duration:
                got_frame = False

                for index, (decoder, controller) in enumerate(zip(decoders, controllers)):
                    frame_gpu = decoder.get_latest_frame(rgb=True)

                    if frame_gpu is None:
                        continue

                    got_frame = True

                    # Run tracking (result is not needed for the FPS measurement)
                    controller.track(
                        frame_gpu,
                        preprocess_fn=YOLOv8Utils.preprocess,
                        postprocess_fn=YOLOv8Utils.postprocess,
                    )

                    frame_counts[index] += 1
                    sample_frames[index] += 1

                    # Sample FPS every second, per camera.
                    now = time.perf_counter()
                    window = now - sample_starts[index]
                    if window >= 1.0:
                        fps_samples[index].append(sample_frames[index] / window)
                        sample_frames[index] = 0
                        sample_starts[index] = now

                if not got_frame:
                    # No camera had a frame this pass; back off briefly
                    # (as the single-camera loop does) instead of spinning.
                    time.sleep(0.001)

        except KeyboardInterrupt:
            print("\nBenchmark interrupted")

        # Calculate statistics
        total_time = time.perf_counter() - start_time
    finally:
        # Cleanup
        for decoder in decoders:
            decoder.stop()

    # Compile results
    results = {}
    total_frames = 0

    print("\n" + "-" * 80)
    for index, (count, samples) in enumerate(zip(frame_counts, fps_samples)):
        total_frames += count

        cam_stats = {
            'total_frames': count,
            'avg_fps': _rate(count, total_time),
            'min_fps': min(samples) if samples else 0,
            'max_fps': max(samples) if samples else 0,
        }

        results[f'camera_{index+1}'] = cam_stats

        print(f"Camera {index+1}:")
        print(f" Total Frames: {cam_stats['total_frames']}")
        print(f" Average FPS: {cam_stats['avg_fps']:.2f}")
        print(f" Min FPS: {cam_stats['min_fps']:.2f}")
        print(f" Max FPS: {cam_stats['max_fps']:.2f}")
        print()

    # Combined stats
    combined_avg_fps = _rate(total_frames, total_time)
    # num_cameras >= 2 here (checked above), so this division is safe.
    per_camera_avg = combined_avg_fps / num_cameras

    print("-" * 80)
    print("COMBINED:")
    print(f" Total Frames (all cameras): {total_frames}")
    print(f" Total Time: {total_time:.2f} seconds")
    print(f" Combined Throughput: {combined_avg_fps:.2f} FPS")
    print(f" Per-Camera Average: {per_camera_avg:.2f} FPS")
    print("-" * 80)

    results['combined'] = {
        'total_frames': total_frames,
        'total_time': total_time,
        'combined_fps': combined_avg_fps,
        'per_camera_avg': per_camera_avg,
    }

    return results


def main():
    """Run both benchmarks and compare."""
    print("=" * 80)
    print("FPS BENCHMARK: Single vs Multi-Camera Tracking")
    print("=" * 80)

    # Run single camera benchmark
    single_stats = benchmark_single_camera(duration=30)

    # Run multi-camera benchmark
    multi_stats = benchmark_multi_camera(duration=30)

    # Comparison (skipped when the multi-camera benchmark could not run)
    if not multi_stats:
        return

    single_fps = single_stats['avg_fps']
    combined = multi_stats['combined']

    print("\n" + "=" * 80)
    print("COMPARISON")
    print("=" * 80)

    print("\nSingle Camera Performance:")
    print(f" Average FPS: {single_fps:.2f}")

    print("\nMulti-Camera Performance:")
    print(f" Per-Camera Average: {combined['per_camera_avg']:.2f} FPS")
    print(f" Combined Throughput: {combined['combined_fps']:.2f} FPS")

    print("\nPerformance Analysis:")
    if single_fps <= 0:
        # The single-camera run processed no frames (e.g. stream never
        # connected), so a relative drop is undefined.
        print(" ⚠ Single camera FPS is zero - cannot compute FPS drop")
    else:
        # Calculate performance drop
        fps_drop = (single_fps - combined['per_camera_avg']) / single_fps * 100
        print(f" FPS Drop per Camera: {fps_drop:.1f}%")

        if fps_drop < 10:
            print(" ✓ Excellent - Minimal performance impact")
        elif fps_drop < 25:
            print(" ✓ Good - Acceptable performance scaling")
        elif fps_drop < 50:
            print(" ⚠ Moderate - Some performance degradation")
        else:
            print(" ⚠ Significant - Consider optimizations")

    print("=" * 80)


if __name__ == "__main__":
    main()