diff --git a/test_fps_benchmark.py b/test_fps_benchmark.py deleted file mode 100644 index 3e35d6f..0000000 --- a/test_fps_benchmark.py +++ /dev/null @@ -1,340 +0,0 @@ -""" -FPS Benchmark Test for Single vs Multi-Camera Tracking - -This script benchmarks the FPS performance of: -1. Single camera tracking -2. Multi-camera tracking (2+ cameras) - -Usage: - python test_fps_benchmark.py -""" - -import time -import os -from dotenv import load_dotenv -from services import ( - StreamDecoderFactory, - TensorRTModelRepository, - TrackingFactory, - YOLOv8Utils, - COCO_CLASSES, -) - -load_dotenv() - - -def benchmark_single_camera(duration=30): - """ - Benchmark single camera tracking performance. - - Args: - duration: Test duration in seconds - - Returns: - Dictionary with FPS statistics - """ - print("\n" + "=" * 80) - print("SINGLE CAMERA BENCHMARK") - print("=" * 80) - - GPU_ID = 0 - MODEL_PATH = "models/yolov8n.trt" - RTSP_URL = os.getenv('CAMERA_URL_1', 'rtsp://localhost:8554/test') - - # Initialize components - print("\nInitializing...") - model_repo = TensorRTModelRepository(gpu_id=GPU_ID, default_num_contexts=4) - model_repo.load_model("detector", MODEL_PATH, num_contexts=4) - - tracking_factory = TrackingFactory(gpu_id=GPU_ID) - controller = tracking_factory.create_controller( - model_repository=model_repo, - model_id="detector", - tracker_type="iou", - max_age=30, - min_confidence=0.5, - iou_threshold=0.3, - class_names=COCO_CLASSES - ) - - stream_factory = StreamDecoderFactory(gpu_id=GPU_ID) - decoder = stream_factory.create_decoder(RTSP_URL, buffer_size=30) - decoder.start() - - print("Waiting for stream connection...") - time.sleep(5) - - if not decoder.is_connected(): - print("⚠ Stream not connected, results may be inaccurate") - - # Benchmark - print(f"\nRunning benchmark for {duration} seconds...") - frame_count = 0 - start_time = time.time() - - fps_samples = [] - sample_start = time.time() - sample_frames = 0 - - try: - while time.time() - start_time < duration: - frame_gpu = decoder.get_latest_frame(rgb=True) - - if frame_gpu is None: - time.sleep(0.001) - continue - - # Run tracking - tracked_objects = controller.track( - frame_gpu, - preprocess_fn=YOLOv8Utils.preprocess, - postprocess_fn=YOLOv8Utils.postprocess - ) - - frame_count += 1 - sample_frames += 1 - - # Sample FPS every second - if time.time() - sample_start >= 1.0: - fps = sample_frames / (time.time() - sample_start) - fps_samples.append(fps) - sample_frames = 0 - sample_start = time.time() - print(f" Current FPS: {fps:.2f}") - - except KeyboardInterrupt: - print("\nBenchmark interrupted") - - # Calculate statistics - total_time = time.time() - start_time - avg_fps = frame_count / total_time - - # Cleanup - decoder.stop() - - stats = { - 'total_frames': frame_count, - 'total_time': total_time, - 'avg_fps': avg_fps, - 'min_fps': min(fps_samples) if fps_samples else 0, - 'max_fps': max(fps_samples) if fps_samples else 0, - 'samples': fps_samples - } - - print("\n" + "-" * 80) - print(f"Total Frames: {stats['total_frames']}") - print(f"Total Time: {stats['total_time']:.2f} seconds") - print(f"Average FPS: {stats['avg_fps']:.2f}") - print(f"Min FPS: {stats['min_fps']:.2f}") - print(f"Max FPS: {stats['max_fps']:.2f}") - print("-" * 80) - - return stats - - -def benchmark_multi_camera(duration=30): - """ - Benchmark multi-camera tracking performance. 
- - Args: - duration: Test duration in seconds - - Returns: - Dictionary with FPS statistics per camera - """ - print("\n" + "=" * 80) - print("MULTI-CAMERA BENCHMARK") - print("=" * 80) - - GPU_ID = 0 - MODEL_PATH = "models/yolov8n.trt" - - # Load camera URLs - camera_urls = [] - i = 1 - while True: - url = os.getenv(f'CAMERA_URL_{i}') - if url: - camera_urls.append(url) - i += 1 - else: - break - - if len(camera_urls) < 2: - print("⚠ Need at least 2 cameras for multi-camera test") - print(f" Found only {len(camera_urls)} camera(s) in .env") - return None - - print(f"\nTesting with {len(camera_urls)} cameras") - - # Initialize components - print("\nInitializing...") - model_repo = TensorRTModelRepository(gpu_id=GPU_ID, default_num_contexts=8) - model_repo.load_model("detector", MODEL_PATH, num_contexts=8) - - tracking_factory = TrackingFactory(gpu_id=GPU_ID) - stream_factory = StreamDecoderFactory(gpu_id=GPU_ID) - - decoders = [] - controllers = [] - - for i, url in enumerate(camera_urls): - # Create decoder - decoder = stream_factory.create_decoder(url, buffer_size=30) - decoder.start() - decoders.append(decoder) - - # Create controller - controller = tracking_factory.create_controller( - model_repository=model_repo, - model_id="detector", - tracker_type="iou", - max_age=30, - min_confidence=0.5, - iou_threshold=0.3, - class_names=COCO_CLASSES - ) - controllers.append(controller) - - print(f" Camera {i+1}: {url}") - - print("\nWaiting for streams to connect...") - time.sleep(10) - - # Benchmark - print(f"\nRunning benchmark for {duration} seconds...") - - frame_counts = [0] * len(camera_urls) - fps_samples = [[] for _ in camera_urls] - sample_starts = [time.time()] * len(camera_urls) - sample_frames = [0] * len(camera_urls) - - start_time = time.time() - - try: - while time.time() - start_time < duration: - for i, (decoder, controller) in enumerate(zip(decoders, controllers)): - frame_gpu = decoder.get_latest_frame(rgb=True) - - if frame_gpu is None: - continue - - # Run tracking - tracked_objects = controller.track( - frame_gpu, - preprocess_fn=YOLOv8Utils.preprocess, - postprocess_fn=YOLOv8Utils.postprocess - ) - - frame_counts[i] += 1 - sample_frames[i] += 1 - - # Sample FPS every second - if time.time() - sample_starts[i] >= 1.0: - fps = sample_frames[i] / (time.time() - sample_starts[i]) - fps_samples[i].append(fps) - sample_frames[i] = 0 - sample_starts[i] = time.time() - - except KeyboardInterrupt: - print("\nBenchmark interrupted") - - # Calculate statistics - total_time = time.time() - start_time - - # Cleanup - for decoder in decoders: - decoder.stop() - - # Compile results - results = {} - total_frames = 0 - - print("\n" + "-" * 80) - for i in range(len(camera_urls)): - avg_fps = frame_counts[i] / total_time if total_time > 0 else 0 - total_frames += frame_counts[i] - - cam_stats = { - 'total_frames': frame_counts[i], - 'avg_fps': avg_fps, - 'min_fps': min(fps_samples[i]) if fps_samples[i] else 0, - 'max_fps': max(fps_samples[i]) if fps_samples[i] else 0, - } - - results[f'camera_{i+1}'] = cam_stats - - print(f"Camera {i+1}:") - print(f" Total Frames: {cam_stats['total_frames']}") - print(f" Average FPS: {cam_stats['avg_fps']:.2f}") - print(f" Min FPS: {cam_stats['min_fps']:.2f}") - print(f" Max FPS: {cam_stats['max_fps']:.2f}") - print() - - # Combined stats - combined_avg_fps = total_frames / total_time if total_time > 0 else 0 - - print("-" * 80) - print(f"COMBINED:") - print(f" Total Frames (all cameras): {total_frames}") - print(f" Total Time: {total_time:.2f} seconds") - 
print(f" Combined Throughput: {combined_avg_fps:.2f} FPS") - print(f" Per-Camera Average: {combined_avg_fps / len(camera_urls):.2f} FPS") - print("-" * 80) - - results['combined'] = { - 'total_frames': total_frames, - 'total_time': total_time, - 'combined_fps': combined_avg_fps, - 'per_camera_avg': combined_avg_fps / len(camera_urls) - } - - return results - - -def main(): - """Run both benchmarks and compare.""" - print("=" * 80) - print("FPS BENCHMARK: Single vs Multi-Camera Tracking") - print("=" * 80) - - # Run single camera benchmark - single_stats = benchmark_single_camera(duration=30) - - # Run multi-camera benchmark - multi_stats = benchmark_multi_camera(duration=30) - - # Comparison - if multi_stats: - print("\n" + "=" * 80) - print("COMPARISON") - print("=" * 80) - - print(f"\nSingle Camera Performance:") - print(f" Average FPS: {single_stats['avg_fps']:.2f}") - - print(f"\nMulti-Camera Performance:") - print(f" Per-Camera Average: {multi_stats['combined']['per_camera_avg']:.2f} FPS") - print(f" Combined Throughput: {multi_stats['combined']['combined_fps']:.2f} FPS") - - # Calculate performance drop - fps_drop = ((single_stats['avg_fps'] - multi_stats['combined']['per_camera_avg']) - / single_stats['avg_fps'] * 100) - - print(f"\nPerformance Analysis:") - print(f" FPS Drop per Camera: {fps_drop:.1f}%") - - if fps_drop < 10: - print(" ✓ Excellent - Minimal performance impact") - elif fps_drop < 25: - print(" ✓ Good - Acceptable performance scaling") - elif fps_drop < 50: - print(" ⚠ Moderate - Some performance degradation") - else: - print(" ⚠ Significant - Consider optimizations") - - print("=" * 80) - - -if __name__ == "__main__": - main() diff --git a/test_inference.py b/test_inference.py deleted file mode 100644 index b54c2fc..0000000 --- a/test_inference.py +++ /dev/null @@ -1,189 +0,0 @@ - -import time -import torch -import os -from dotenv import load_dotenv -from services.model_repository import TensorRTModelRepository -from services.stream_decoder import StreamDecoderFactory -import numpy as np - -# COCO class names for YOLOv8 -COCO_CLASSES = [ - 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', 'boat', 'traffic light', - 'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow', - 'elephant', 'bear', 'zebra', 'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee', - 'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard', - 'tennis racket', 'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple', - 'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch', - 'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', - 'cell phone', 'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase', - 'scissors', 'teddy bear', 'hair drier', 'toothbrush' -] - -def postprocess(output, confidence_threshold=0.25, iou_threshold=0.45): - """ - Post-processes the output of a YOLOv8 model to extract bounding boxes, scores, and class IDs. 
- """ - # output shape: (batch_size, 84, 8400) - # 84 = 4 (bbox) + 80 (classes) - - # Transpose the output to (batch_size, 8400, 84) - output = output.transpose(1, 2) - - boxes = [] - scores = [] - class_ids = [] - - for detection in output[0]: - # First 4 values are bbox (cx, cy, w, h) - # The rest are class scores - - class_scores = detection[4:] - max_score, max_class_id = torch.max(class_scores, 0) - - if max_score > confidence_threshold: - - cx, cy, w, h = detection[:4] - - # Convert from center-width-height to x1-y1-x2-y2 - x1 = cx - w / 2 - y1 = cy - h / 2 - x2 = cx + w / 2 - y2 = cy + h / 2 - - boxes.append([x1.item(), y1.item(), x2.item(), y2.item()]) - scores.append(max_score.item()) - class_ids.append(max_class_id.item()) - - if not boxes: - return [], [], [] - - # Perform Non-Maximum Suppression (NMS) - # This is a simplified version. For production, use a library like torchvision.ops.nms - indices = [] - boxes_np = np.array(boxes) - scores_np = np.array(scores) - - order = scores_np.argsort()[::-1] - - while order.size > 0: - i = order[0] - indices.append(i) - - xx1 = np.maximum(boxes_np[i, 0], boxes_np[order[1:], 0]) - yy1 = np.maximum(boxes_np[i, 1], boxes_np[order[1:], 1]) - xx2 = np.minimum(boxes_np[i, 2], boxes_np[order[1:], 2]) - yy2 = np.minimum(boxes_np[i, 3], boxes_np[order[1:], 3]) - - w = np.maximum(0.0, xx2 - xx1 + 1) - h = np.maximum(0.0, yy2 - yy1 + 1) - inter = w * h - - ovr = inter / ((boxes_np[i, 2] - boxes_np[i, 0] + 1) * (boxes_np[i, 3] - boxes_np[i, 1] + 1) + \ - (boxes_np[order[1:], 2] - boxes_np[order[1:], 0] + 1) * \ - (boxes_np[order[1:], 3] - boxes_np[order[1:], 1] + 1) - inter) - - inds = np.where(ovr <= iou_threshold)[0] - order = order[inds + 1] - - final_boxes = [boxes[i] for i in indices] - final_scores = [scores[i] for i in indices] - final_class_ids = [class_ids[i] for i in indices] - - return final_boxes, final_scores, final_class_ids - - -def test_rtsp_stream_with_inference(): - """ - Decodes an RTSP stream and runs inference, printing bounding boxes and class names. 
- """ - load_dotenv() - rtsp_url = os.getenv("CAMERA_URL_1") - if not rtsp_url: - print("Error: CAMERA_URL_1 not found in .env file.") - return - - print("=" * 80) - print("RTSP Stream + TensorRT Inference") - print("=" * 80) - - # Initialize components - decoder_factory = StreamDecoderFactory(gpu_id=0) - model_repo = TensorRTModelRepository(gpu_id=0, default_num_contexts=1) - - # Setup camera stream - decoder = decoder_factory.create_decoder(rtsp_url, buffer_size=1) - decoder.start() - - # Load inference model - model_path = "models/yolov8n.trt" - try: - model_repo.load_model( - model_id="camera_main", - file_path=model_path - ) - except Exception as e: - print(f"Error loading model: {e}") - print(f"Please ensure '{model_path}' exists.") - decoder.stop() - return - - print("\nWaiting for stream to buffer frames...") - time.sleep(3) - - try: - while True: - frame_gpu = decoder.get_latest_frame(rgb=True) - - if frame_gpu is None: - time.sleep(0.1) - continue - - # Preprocess frame for YOLOv8 - # Resize to 640x640, normalize, and add batch dimension - frame_float = frame_gpu.unsqueeze(0).float() # Convert to float here - frame_resized = torch.nn.functional.interpolate( - frame_float, size=(640, 640), mode='bilinear', align_corners=False - ) - frame_normalized = frame_resized.float() / 255.0 - - # Run inference - try: - outputs = model_repo.infer( - model_id="camera_main", - inputs={"images": frame_normalized}, - synchronize=True - ) - - # Post-process the output - output_tensor = outputs['output0'] - boxes, scores, class_ids = postprocess(output_tensor) - - # Print results - print(f"\n--- Frame at {time.time():.2f} ---") - if boxes: - for box, score, class_id in zip(boxes, scores, class_ids): - class_name = COCO_CLASSES[class_id] - print( - f" Detected: {class_name} " - f"(confidence: {score:.2f}) at " - f"bbox: [{box[0]:.0f}, {box[1]:.0f}, {box[2]:.0f}, {box[3]:.0f}]" - ) - else: - print(" No objects detected.") - - except Exception as e: - print(f"Inference failed: {e}") - - time.sleep(0.03) # ~30 FPS - - except KeyboardInterrupt: - print("\nStopping...") - finally: - # Cleanup - decoder.stop() - model_repo.unload_model("camera_main") - print("Stream and model unloaded.") - -if __name__ == "__main__": - test_rtsp_stream_with_inference() diff --git a/test_jpeg_encode.py b/test_jpeg_encode.py deleted file mode 100755 index 8e35145..0000000 --- a/test_jpeg_encode.py +++ /dev/null @@ -1,174 +0,0 @@ -#!/usr/bin/env python3 -""" -Test script for JPEG encoding with nvImageCodec -Tests GPU-accelerated JPEG encoding from RTSP stream frames -""" - -import argparse -import sys -import time -import os -from pathlib import Path -from dotenv import load_dotenv -from services import StreamDecoderFactory - -# Load environment variables from .env file -load_dotenv() - - -def main(): - parser = argparse.ArgumentParser(description='Test JPEG encoding from RTSP stream') - parser.add_argument( - '--rtsp-url', - type=str, - default=None, - help='RTSP stream URL (defaults to CAMERA_URL_1 from .env)' - ) - parser.add_argument( - '--output-dir', - type=str, - default='./snapshots', - help='Output directory for JPEG files' - ) - parser.add_argument( - '--num-frames', - type=int, - default=10, - help='Number of frames to capture' - ) - parser.add_argument( - '--interval', - type=float, - default=1.0, - help='Interval between captures in seconds' - ) - parser.add_argument( - '--quality', - type=int, - default=95, - help='JPEG quality (0-100)' - ) - parser.add_argument( - '--gpu-id', - type=int, - default=0, - 
help='GPU device ID' - ) - - args = parser.parse_args() - - # Get RTSP URL from command line or environment - rtsp_url = args.rtsp_url - if not rtsp_url: - rtsp_url = os.getenv('CAMERA_URL_1') - if not rtsp_url: - print("Error: No RTSP URL provided") - print("Please either:") - print(" 1. Use --rtsp-url argument, or") - print(" 2. Add CAMERA_URL_1 to your .env file") - sys.exit(1) - - # Create output directory - output_dir = Path(args.output_dir) - output_dir.mkdir(parents=True, exist_ok=True) - - print("=" * 80) - print("RTSP Stream JPEG Encoding Test") - print("=" * 80) - print(f"RTSP URL: {rtsp_url}") - print(f"Output Directory: {output_dir}") - print(f"Number of Frames: {args.num_frames}") - print(f"Capture Interval: {args.interval}s") - print(f"JPEG Quality: {args.quality}") - print(f"GPU ID: {args.gpu_id}") - print("=" * 80) - print() - - try: - # Initialize factory and decoder - print("[1/3] Initializing StreamDecoderFactory...") - factory = StreamDecoderFactory(gpu_id=args.gpu_id) - print("✓ Factory initialized\n") - - print("[2/3] Creating and starting decoder...") - decoder = factory.create_decoder( - rtsp_url=rtsp_url, - buffer_size=30 - ) - decoder.start() - print("✓ Decoder started\n") - - # Wait for connection - print("[3/3] Waiting for stream to connect...") - max_wait = 10 - for i in range(max_wait): - if decoder.is_connected(): - print("✓ Stream connected\n") - break - time.sleep(1) - print(f" Waiting... {i+1}/{max_wait}s") - else: - print("✗ Failed to connect to stream") - sys.exit(1) - - # Capture frames - print(f"Capturing {args.num_frames} frames...") - print("-" * 80) - - captured = 0 - for i in range(args.num_frames): - # Get frame as JPEG - start_time = time.time() - jpeg_bytes = decoder.get_frame_as_jpeg(quality=args.quality) - encode_time = (time.time() - start_time) * 1000 # ms - - if jpeg_bytes: - # Save to file - filename = output_dir / f"frame_{i:04d}.jpg" - with open(filename, 'wb') as f: - f.write(jpeg_bytes) - - size_kb = len(jpeg_bytes) / 1024 - print(f"[{i+1}/{args.num_frames}] Saved {filename.name} " - f"({size_kb:.1f} KB, encoded in {encode_time:.2f}ms)") - captured += 1 - else: - print(f"[{i+1}/{args.num_frames}] Failed to get frame") - - # Wait before next capture (except for last frame) - if i < args.num_frames - 1: - time.sleep(args.interval) - - print("-" * 80) - - # Summary - print("\n" + "=" * 80) - print("Capture Complete") - print("=" * 80) - print(f"Successfully captured: {captured}/{args.num_frames} frames") - print(f"Output directory: {output_dir.absolute()}") - print("=" * 80) - - except KeyboardInterrupt: - print("\n\n✗ Interrupted by user") - sys.exit(1) - - except Exception as e: - print(f"\n\n✗ Error: {e}") - import traceback - traceback.print_exc() - sys.exit(1) - - finally: - # Cleanup - if 'decoder' in locals(): - print("\nCleaning up...") - decoder.stop() - print("✓ Decoder stopped") - - print("\n✓ Test completed successfully") - sys.exit(0) - - -if __name__ == '__main__': - main() diff --git a/test_model_inference.py b/test_model_inference.py deleted file mode 100644 index ea6d075..0000000 --- a/test_model_inference.py +++ /dev/null @@ -1,310 +0,0 @@ -""" -Test script for TensorRT Model Repository with multi-camera inference. - -This demonstrates: -1. Loading the same model for multiple cameras (deduplication) -2. Context pool load balancing -3. GPU-to-GPU inference from RTSP streams -4. 
Memory efficiency with shared engines -""" - -import time -import torch -from services.model_repository import TensorRTModelRepository -from services.stream_decoder import StreamDecoderFactory - - -def test_multi_camera_inference(): - """ - Simulate multi-camera inference scenario. - - Example: 100 cameras, all using the same YOLOv8 model - - Without pooling: 100 engines + 100 contexts in VRAM - - With pooling: 1 engine + 4 contexts in VRAM (huge savings!) - """ - - # Initialize model repository with context pooling - repo = TensorRTModelRepository(gpu_id=0, default_num_contexts=4) - - # Camera configurations (simulated) - camera_configs = [ - {"id": "camera_1", "rtsp_url": "rtsp://camera1.local/stream"}, - {"id": "camera_2", "rtsp_url": "rtsp://camera2.local/stream"}, - {"id": "camera_3", "rtsp_url": "rtsp://camera3.local/stream"}, - # ... imagine 100 cameras here - ] - - # Load the same model for all cameras - model_file = "models/yolov8n.trt" # Same file for all cameras - - print("=" * 80) - print("LOADING MODELS FOR MULTIPLE CAMERAS") - print("=" * 80) - - for config in camera_configs: - try: - # Each camera gets its own model_id, but shares the same engine! - metadata = repo.load_model( - model_id=config["id"], - file_path=model_file, - num_contexts=4 # 4 contexts shared across all cameras - ) - print(f"\n✓ Loaded model for {config['id']}") - except Exception as e: - print(f"\n✗ Failed to load model for {config['id']}: {e}") - - # Show repository stats - print("\n" + "=" * 80) - print("REPOSITORY STATISTICS") - print("=" * 80) - stats = repo.get_stats() - print(f"Total model IDs: {stats['total_model_ids']}") - print(f"Unique engines in VRAM: {stats['unique_engines']}") - print(f"Total contexts: {stats['total_contexts']}") - print(f"Memory efficiency: {stats['memory_efficiency']}") - - # Get detailed info for one camera - print("\n" + "=" * 80) - print("DETAILED MODEL INFO (camera_1)") - print("=" * 80) - info = repo.get_model_info("camera_1") - if info: - print(f"Model ID: {info['model_id']}") - print(f"File: {info['file_path']}") - print(f"File hash: {info['file_hash']}") - print(f"Engine references: {info['engine_references']}") - print(f"Context pool size: {info['context_pool_size']}") - print(f"Shared with: {info['shared_with_model_ids']}") - print(f"\nInputs:") - for name, spec in info['inputs'].items(): - print(f" {name}: {spec['shape']} ({spec['dtype']})") - print(f"\nOutputs:") - for name, spec in info['outputs'].items(): - print(f" {name}: {spec['shape']} ({spec['dtype']})") - - # Simulate inference from multiple cameras - print("\n" + "=" * 80) - print("RUNNING INFERENCE (GPU-to-GPU)") - print("=" * 80) - - # Create dummy input tensors (simulating frames from cameras) - # In real scenario, these come from StreamDecoder.get_frame() - batch_size = 1 - channels = 3 - height = 640 - width = 640 - - for config in camera_configs: - try: - # Simulate getting frame from camera (already on GPU) - input_tensor = torch.rand( - batch_size, channels, height, width, - dtype=torch.float32, - device='cuda:0' - ) - - # Run inference (stays in GPU) - start = time.time() - outputs = repo.infer( - model_id=config["id"], - inputs={"images": input_tensor}, # Adjust input name based on your model - synchronize=True, - timeout=5.0 - ) - elapsed = (time.time() - start) * 1000 # Convert to ms - - print(f"\n{config['id']}: Inference completed in {elapsed:.2f}ms") - for name, tensor in outputs.items(): - print(f" Output '{name}': {tensor.shape} on {tensor.device}") - - except Exception as e: - 
print(f"\n{config['id']}: Inference failed: {e}") - - # Cleanup - print("\n" + "=" * 80) - print("CLEANUP") - print("=" * 80) - - for config in camera_configs: - repo.unload_model(config["id"]) - - print("\nAll models unloaded.") - - -def test_rtsp_stream_with_inference(): - """ - Real-world example: Decode RTSP stream and run inference. - Everything stays in GPU memory (zero CPU transfers). - """ - - print("=" * 80) - print("RTSP STREAM + TENSORRT INFERENCE (GPU-to-GPU)") - print("=" * 80) - - # Initialize components - decoder_factory = StreamDecoderFactory(gpu_id=0) - model_repo = TensorRTModelRepository(gpu_id=0, default_num_contexts=4) - - # Setup camera stream - rtsp_url = "rtsp://your-camera-ip/stream" - decoder = decoder_factory.create_decoder(rtsp_url, buffer_size=30) - decoder.start() - - # Load inference model - try: - model_repo.load_model( - model_id="camera_main", - file_path="models/yolov8n.trt" - ) - except FileNotFoundError: - print("\n⚠ Model file not found. Please export your model to TensorRT:") - print(" Example: yolo export model=yolov8n.pt format=engine device=0") - return - - print("\nWaiting for stream to buffer frames...") - time.sleep(3) - - # Process frames - for i in range(10): - # Get frame from decoder (already on GPU) - frame_gpu = decoder.get_latest_frame(rgb=True) # Returns torch.Tensor on CUDA - - if frame_gpu is None: - print(f"Frame {i}: No frame available") - continue - - # Preprocess if needed (stays on GPU) - # For YOLOv8: normalize, resize, etc. - # Example preprocessing (adjust for your model): - frame_gpu = frame_gpu.float() / 255.0 # Normalize to [0, 1] - frame_gpu = frame_gpu.unsqueeze(0) # Add batch dimension: (1, 3, H, W) - - # Run inference (GPU-to-GPU, zero copy) - try: - outputs = model_repo.infer( - model_id="camera_main", - inputs={"images": frame_gpu}, - synchronize=True - ) - - print(f"\nFrame {i}: Inference successful") - for name, tensor in outputs.items(): - print(f" {name}: {tensor.shape} on {tensor.device}") - - # Post-process results (can stay on GPU or move to CPU as needed) - # Example: NMS, bounding box extraction, etc. - - except Exception as e: - print(f"\nFrame {i}: Inference failed: {e}") - - time.sleep(0.1) # Simulate processing interval - - # Cleanup - decoder.stop() - model_repo.unload_model("camera_main") - print("\n✓ Test completed successfully") - - -def test_concurrent_inference(): - """ - Test concurrent inference from multiple threads. - Demonstrates context pool load balancing. - """ - import threading - - print("=" * 80) - print("CONCURRENT INFERENCE TEST (Context Pool Load Balancing)") - print("=" * 80) - - repo = TensorRTModelRepository(gpu_id=0, default_num_contexts=4) - - # Load model - try: - repo.load_model("shared_model", "models/yolov8n.trt", num_contexts=4) - except Exception as e: - print(f"Failed to load model: {e}") - return - - def worker(worker_id: int, num_inferences: int): - """Worker thread performing inference""" - for i in range(num_inferences): - try: - # Create dummy input - input_tensor = torch.rand(1, 3, 640, 640, device='cuda:0', dtype=torch.float32) - - # Acquire context from pool, run inference, release context - outputs = repo.infer( - model_id="shared_model", - inputs={"images": input_tensor}, - timeout=10.0 - ) - - print(f"Worker {worker_id}, Inference {i}: SUCCESS") - - except Exception as e: - print(f"Worker {worker_id}, Inference {i}: FAILED - {e}") - - time.sleep(0.01) # Small delay - - # Launch multiple worker threads (more workers than contexts!) 
- threads = [] - num_workers = 10 # 10 workers sharing 4 contexts - inferences_per_worker = 5 - - print(f"\nLaunching {num_workers} workers (only 4 contexts available)") - print("Contexts will be borrowed/returned automatically\n") - - start_time = time.time() - - for worker_id in range(num_workers): - t = threading.Thread(target=worker, args=(worker_id, inferences_per_worker)) - threads.append(t) - t.start() - - # Wait for all workers - for t in threads: - t.join() - - elapsed = time.time() - start_time - total_inferences = num_workers * inferences_per_worker - - print(f"\n✓ Completed {total_inferences} inferences in {elapsed:.2f}s") - print(f" Throughput: {total_inferences / elapsed:.2f} inferences/sec") - print(f" With only 4 contexts for {num_workers} workers!") - - repo.unload_model("shared_model") - - -if __name__ == "__main__": - print("\n" + "=" * 80) - print("TENSORRT MODEL REPOSITORY - TEST SUITE") - print("=" * 80) - - # Test 1: Multi-camera model loading - print("\n\nTEST 1: Multi-Camera Model Loading with Deduplication") - print("-" * 80) - try: - test_multi_camera_inference() - except Exception as e: - print(f"Test 1 failed: {e}") - - # Test 2: RTSP stream + inference (commented out by default) - # Uncomment if you have a real RTSP stream - # print("\n\nTEST 2: RTSP Stream + Inference") - # print("-" * 80) - # try: - # test_rtsp_stream_with_inference() - # except Exception as e: - # print(f"Test 2 failed: {e}") - - # Test 3: Concurrent inference - print("\n\nTEST 3: Concurrent Inference with Context Pooling") - print("-" * 80) - try: - test_concurrent_inference() - except Exception as e: - print(f"Test 3 failed: {e}") - - print("\n" + "=" * 80) - print("ALL TESTS COMPLETED") - print("=" * 80) diff --git a/test_multi_stream.py b/test_multi_stream.py deleted file mode 100755 index 71bc6b4..0000000 --- a/test_multi_stream.py +++ /dev/null @@ -1,255 +0,0 @@ -#!/usr/bin/env python3 -""" -Multi-stream test script to verify CUDA context sharing efficiency. -Tests multiple RTSP streams simultaneously and monitors VRAM usage. 
-""" - -import argparse -import time -import sys -import subprocess -import os -from pathlib import Path -from dotenv import load_dotenv -from services import StreamDecoderFactory, ConnectionStatus - -# Load environment variables from .env file -load_dotenv() - - -def get_gpu_memory_usage(gpu_id: int = 0) -> int: - """Get current GPU memory usage in MB using nvidia-smi""" - try: - result = subprocess.run( - ['nvidia-smi', '--query-gpu=memory.used', '--format=csv,noheader,nounits', f'--id={gpu_id}'], - capture_output=True, - text=True, - check=True - ) - return int(result.stdout.strip()) - except Exception as e: - print(f"Warning: Could not get GPU memory usage: {e}") - return 0 - - -def main(): - parser = argparse.ArgumentParser(description='Test multi-stream decoding with context sharing') - parser.add_argument( - '--gpu-id', - type=int, - default=0, - help='GPU device ID' - ) - parser.add_argument( - '--duration', - type=int, - default=20, - help='Test duration in seconds' - ) - parser.add_argument( - '--capture-snapshots', - action='store_true', - help='Capture JPEG snapshots during test' - ) - parser.add_argument( - '--output-dir', - type=str, - default='./multi_stream_snapshots', - help='Output directory for snapshots' - ) - - args = parser.parse_args() - - # Load camera URLs from environment - camera_urls = [] - i = 1 - while True: - url = os.getenv(f'CAMERA_URL_{i}') - if url: - camera_urls.append(url) - i += 1 - else: - break - - if not camera_urls: - print("Error: No camera URLs found in .env file") - print("Please add CAMERA_URL_1, CAMERA_URL_2, etc. to your .env file") - sys.exit(1) - - # Create output directory if capturing snapshots - if args.capture_snapshots: - output_dir = Path(args.output_dir) - output_dir.mkdir(parents=True, exist_ok=True) - - print("=" * 80) - print("Multi-Stream RTSP Decoder Test - Context Sharing Verification") - print("=" * 80) - print(f"Number of Streams: {len(camera_urls)}") - print(f"GPU ID: {args.gpu_id}") - print(f"Test Duration: {args.duration} seconds") - print(f"Capture Snapshots: {args.capture_snapshots}") - print("=" * 80) - print() - - try: - # Get baseline GPU memory - print("[Baseline] Measuring initial GPU memory usage...") - baseline_memory = get_gpu_memory_usage(args.gpu_id) - print(f"✓ Baseline VRAM: {baseline_memory} MB\n") - - # Initialize factory (shared CUDA context) - print("[1/4] Initializing StreamDecoderFactory with shared CUDA context...") - factory = StreamDecoderFactory(gpu_id=args.gpu_id) - - factory_memory = get_gpu_memory_usage(args.gpu_id) - factory_overhead = factory_memory - baseline_memory - print(f"✓ Factory initialized") - print(f" VRAM after factory: {factory_memory} MB (+{factory_overhead} MB)\n") - - # Create all decoders - print(f"[2/4] Creating {len(camera_urls)} StreamDecoder instances...") - decoders = [] - for i, url in enumerate(camera_urls): - decoder = factory.create_decoder( - rtsp_url=url, - buffer_size=30, - codec='h264' - ) - decoders.append(decoder) - print(f" ✓ Decoder {i+1} created for camera {url.split('@')[1].split('/')[0]}") - - decoders_memory = get_gpu_memory_usage(args.gpu_id) - decoders_overhead = decoders_memory - factory_memory - print(f"\n VRAM after creating {len(decoders)} decoders: {decoders_memory} MB (+{decoders_overhead} MB)") - print(f" Average per decoder: {decoders_overhead / len(decoders):.1f} MB\n") - - # Start all decoders - print(f"[3/4] Starting all {len(decoders)} decoders...") - for i, decoder in enumerate(decoders): - decoder.start() - print(f" ✓ Decoder {i+1} started") 
- - started_memory = get_gpu_memory_usage(args.gpu_id) - started_overhead = started_memory - decoders_memory - print(f"\n VRAM after starting decoders: {started_memory} MB (+{started_overhead} MB)") - print(f" Average per running decoder: {started_overhead / len(decoders):.1f} MB\n") - - # Wait for all streams to connect - print("[4/4] Waiting for all streams to connect...") - max_wait = 15 - for wait_time in range(max_wait): - connected = sum(1 for d in decoders if d.is_connected()) - print(f" Connected: {connected}/{len(decoders)} streams", end='\r') - - if connected == len(decoders): - print(f"\n✓ All {len(decoders)} streams connected!\n") - break - - time.sleep(1) - else: - connected = sum(1 for d in decoders if d.is_connected()) - print(f"\n⚠ Only {connected}/{len(decoders)} streams connected after {max_wait}s\n") - - connected_memory = get_gpu_memory_usage(args.gpu_id) - connected_overhead = connected_memory - started_memory - print(f" VRAM after connection: {connected_memory} MB (+{connected_overhead} MB)\n") - - # Monitor streams - print(f"Monitoring streams for {args.duration} seconds...") - print("=" * 80) - print(f"{'Time':<8} {'VRAM':<10} {'Stream 1':<12} {'Stream 2':<12} {'Stream 3':<12} {'Stream 4':<12}") - print("-" * 80) - - start_time = time.time() - snapshot_interval = args.duration // 3 if args.capture_snapshots else 0 - last_snapshot = 0 - - while time.time() - start_time < args.duration: - elapsed = time.time() - start_time - current_memory = get_gpu_memory_usage(args.gpu_id) - - # Get stats for each decoder - stats = [] - for decoder in decoders: - status = decoder.get_status().value[:8] - buffer = decoder.get_buffer_size() - frames = decoder.frame_count - stats.append(f"{status:8s} {buffer:2d}/30 {frames:4d}") - - print(f"{elapsed:6.1f}s {current_memory:6d}MB {stats[0]:<12} {stats[1]:<12} {stats[2]:<12} {stats[3]:<12}") - - # Capture snapshots - if args.capture_snapshots and snapshot_interval > 0: - if elapsed - last_snapshot >= snapshot_interval: - print("\n → Capturing snapshots from all streams...") - for i, decoder in enumerate(decoders): - jpeg_bytes = decoder.get_frame_as_jpeg(quality=85) - if jpeg_bytes: - filename = output_dir / f"camera_{i+1}_t{int(elapsed)}s.jpg" - with open(filename, 'wb') as f: - f.write(jpeg_bytes) - print(f" Saved {filename.name} ({len(jpeg_bytes)/1024:.1f} KB)") - print() - last_snapshot = elapsed - - time.sleep(1) - - print("=" * 80) - - # Final memory analysis - final_memory = get_gpu_memory_usage(args.gpu_id) - total_overhead = final_memory - baseline_memory - - print("\n" + "=" * 80) - print("Memory Usage Analysis") - print("=" * 80) - print(f"Baseline VRAM: {baseline_memory:6d} MB") - print(f"After Factory Init: {factory_memory:6d} MB (+{factory_overhead:4d} MB)") - print(f"After Creating {len(decoders)} Decoders: {decoders_memory:6d} MB (+{decoders_overhead:4d} MB)") - print(f"After Starting Decoders: {started_memory:6d} MB (+{started_overhead:4d} MB)") - print(f"After Connection: {connected_memory:6d} MB (+{connected_overhead:4d} MB)") - print(f"Final (after {args.duration}s): {final_memory:6d} MB (+{total_overhead:4d} MB total)") - print("-" * 80) - print(f"Average VRAM per stream: {total_overhead / len(decoders):6.1f} MB") - print(f"Context sharing efficiency: {'EXCELLENT' if total_overhead < 500 else 'GOOD' if total_overhead < 800 else 'POOR'}") - print("=" * 80) - - # Final stats - print("\nFinal Stream Statistics:") - print("-" * 80) - for i, decoder in enumerate(decoders): - status = decoder.get_status().value - buffer = 
decoder.get_buffer_size() - frames = decoder.frame_count - fps = frames / args.duration if args.duration > 0 else 0 - print(f"Stream {i+1}: {status:12s} | Buffer: {buffer:2d}/{decoder.buffer_size} | " - f"Frames: {frames:5d} | Avg FPS: {fps:5.2f}") - print("=" * 80) - - except KeyboardInterrupt: - print("\n\n✗ Interrupted by user") - sys.exit(1) - - except Exception as e: - print(f"\n\n✗ Error: {e}") - import traceback - traceback.print_exc() - sys.exit(1) - - finally: - # Cleanup - if 'decoders' in locals(): - print("\nCleaning up...") - for i, decoder in enumerate(decoders): - decoder.stop() - print(f" ✓ Decoder {i+1} stopped") - - cleanup_memory = get_gpu_memory_usage(args.gpu_id) - print(f"\nVRAM after cleanup: {cleanup_memory} MB") - - print("\n✓ Multi-stream test completed successfully") - sys.exit(0) - - -if __name__ == '__main__': - main() diff --git a/test_stream.py b/test_stream.py deleted file mode 100755 index 9fc9b30..0000000 --- a/test_stream.py +++ /dev/null @@ -1,152 +0,0 @@ -#!/usr/bin/env python3 -""" -CLI test script for StreamDecoder -Tests RTSP stream decoding with NVDEC hardware acceleration -""" - -import argparse -import time -import sys -from services.stream_decoder import StreamDecoderFactory, ConnectionStatus - - -def main(): - parser = argparse.ArgumentParser(description='Test RTSP stream decoder with NVDEC') - parser.add_argument( - '--rtsp-url', - type=str, - required=True, - help='RTSP stream URL (e.g., rtsp://user:pass@host/path)' - ) - parser.add_argument( - '--gpu-id', - type=int, - default=0, - help='GPU device ID' - ) - parser.add_argument( - '--buffer-size', - type=int, - default=30, - help='Frame buffer size' - ) - parser.add_argument( - '--duration', - type=int, - default=30, - help='Test duration in seconds' - ) - parser.add_argument( - '--check-interval', - type=float, - default=1.0, - help='Status check interval in seconds' - ) - - args = parser.parse_args() - - print("=" * 80) - print("RTSP Stream Decoder Test") - print("=" * 80) - print(f"RTSP URL: {args.rtsp_url}") - print(f"GPU ID: {args.gpu_id}") - print(f"Buffer Size: {args.buffer_size} frames") - print(f"Test Duration: {args.duration} seconds") - print("=" * 80) - print() - - try: - # Create factory with shared CUDA context - print("[1/4] Initializing StreamDecoderFactory...") - factory = StreamDecoderFactory(gpu_id=args.gpu_id) - print("✓ Factory initialized with shared CUDA context\n") - - # Create decoder - print("[2/4] Creating StreamDecoder...") - decoder = factory.create_decoder( - rtsp_url=args.rtsp_url, - buffer_size=args.buffer_size, - codec='h264' - ) - print(f"✓ Decoder created: {decoder}\n") - - # Start decoding - print("[3/4] Starting decoder thread...") - decoder.start() - print("✓ Decoder thread started\n") - - # Monitor for specified duration - print(f"[4/4] Monitoring stream for {args.duration} seconds...") - print("-" * 80) - - start_time = time.time() - last_frame_count = 0 - - while time.time() - start_time < args.duration: - time.sleep(args.check_interval) - - # Get status - status = decoder.get_status() - buffer_size = decoder.get_buffer_size() - frame_count = decoder.frame_count - fps = (frame_count - last_frame_count) / args.check_interval - last_frame_count = frame_count - - # Print status - elapsed = time.time() - start_time - print(f"[{elapsed:6.1f}s] Status: {status.value:12s} | " - f"Buffer: {buffer_size:2d}/{args.buffer_size:2d} | " - f"Frames: {frame_count:5d} | " - f"FPS: {fps:5.1f}") - - # Try to get latest frame - if status == ConnectionStatus.CONNECTED: - 
frame = decoder.get_latest_frame() - if frame is not None: - print(f" Frame shape: {frame.shape}, dtype: {frame.dtype}, " - f"device: {frame.device}") - - # Check for errors - if status == ConnectionStatus.ERROR: - print("\n✗ ERROR: Stream connection failed!") - break - - print("-" * 80) - - # Final statistics - print("\n" + "=" * 80) - print("Test Complete - Final Statistics") - print("=" * 80) - print(f"Total Frames Decoded: {decoder.frame_count}") - print(f"Average FPS: {decoder.frame_count / args.duration:.2f}") - print(f"Final Status: {decoder.get_status().value}") - print(f"Buffer Utilization: {decoder.get_buffer_size()}/{args.buffer_size}") - - if decoder.frame_width and decoder.frame_height: - print(f"Frame Resolution: {decoder.frame_width}x{decoder.frame_height}") - - print("=" * 80) - - except KeyboardInterrupt: - print("\n\n✗ Interrupted by user") - sys.exit(1) - - except Exception as e: - print(f"\n\n✗ Error: {e}") - import traceback - traceback.print_exc() - sys.exit(1) - - finally: - # Cleanup - if 'decoder' in locals(): - print("\nCleaning up...") - decoder.stop() - print("✓ Decoder stopped") - - print("\n✓ Test completed successfully") - sys.exit(0) - - -if __name__ == '__main__': - main() diff --git a/test_vram_process.py b/test_vram_process.py deleted file mode 100644 index 7cb8761..0000000 --- a/test_vram_process.py +++ /dev/null @@ -1,143 +0,0 @@ -#!/usr/bin/env python3 -""" -VRAM scaling test - measures Python process memory usage for 1, 2, 3, and 4 streams. -""" - -import os -import time -import subprocess -from dotenv import load_dotenv -from services import StreamDecoderFactory - -# Load environment variables from .env file -load_dotenv() - -# Load camera URLs from environment -camera_urls = [] -i = 1 -while True: - url = os.getenv(f'CAMERA_URL_{i}') - if url: - camera_urls.append(url) - i += 1 - else: - break - -if not camera_urls: - print("Error: No camera URLs found in .env file") - print("Please add CAMERA_URL_1, CAMERA_URL_2, etc. 
to your .env file") - exit(1) - -def get_python_gpu_memory(): - """Get Python process GPU memory usage in MB""" - try: - pid = os.getpid() - result = subprocess.run( - ['nvidia-smi', '--query-compute-apps=pid,used_memory', '--format=csv,noheader,nounits'], - capture_output=True, text=True, check=True - ) - for line in result.stdout.strip().split('\n'): - if line: - parts = line.split(',') - if len(parts) >= 2 and int(parts[0].strip()) == pid: - return int(parts[1].strip()) - return 0 - except: - return 0 - -def test_n_streams(n, wait_time=15): - """Test with n streams""" - print(f"\n{'='*80}") - print(f"Testing with {n} stream(s)") - print('='*80) - - mem_before = get_python_gpu_memory() - print(f"Python process VRAM before: {mem_before} MB") - - # Create factory - factory = StreamDecoderFactory(gpu_id=0) - time.sleep(1) - mem_after_factory = get_python_gpu_memory() - print(f"After factory: {mem_after_factory} MB (+{mem_after_factory - mem_before} MB)") - - # Create decoders - decoders = [] - for i in range(n): - decoder = factory.create_decoder(camera_urls[i], buffer_size=30) - decoders.append(decoder) - - time.sleep(1) - mem_after_create = get_python_gpu_memory() - print(f"After creating {n} decoder(s): {mem_after_create} MB (+{mem_after_create - mem_after_factory} MB)") - - # Start decoders - for decoder in decoders: - decoder.start() - - time.sleep(2) - mem_after_start = get_python_gpu_memory() - print(f"After starting {n} decoder(s): {mem_after_start} MB (+{mem_after_start - mem_after_create} MB)") - - # Wait for connection - print(f"Waiting {wait_time}s for streams to connect and stabilize...") - time.sleep(wait_time) - - # Check connection status - connected = sum(1 for d in decoders if d.is_connected()) - mem_stable = get_python_gpu_memory() - - print(f"Connected: {connected}/{n} streams") - print(f"Python process VRAM (stable): {mem_stable} MB") - - # Get frame stats - for i, decoder in enumerate(decoders): - print(f" Stream {i+1}: {decoder.get_status().value:10s} " - f"Buffer: {decoder.get_buffer_size()}/30 " - f"Frames: {decoder.frame_count}") - - # Cleanup - for decoder in decoders: - decoder.stop() - - time.sleep(2) - mem_after_cleanup = get_python_gpu_memory() - print(f"After cleanup: {mem_after_cleanup} MB") - - return mem_stable - -if __name__ == '__main__': - print("Python VRAM Scaling Test") - print(f"PID: {os.getpid()}") - - baseline = get_python_gpu_memory() - print(f"Baseline Python process VRAM: {baseline} MB\n") - - results = {} - for n in [1, 2, 3, 4]: - mem = test_n_streams(n, wait_time=15) - results[n] = mem - print(f"\n→ {n} stream(s): {mem} MB (process total)") - - # Give time between tests - if n < 4: - print("\nWaiting 5s before next test...") - time.sleep(5) - - # Summary - print("\n" + "="*80) - print("Python Process VRAM Scaling Summary") - print("="*80) - print(f"Baseline: {baseline:4d} MB") - for n in [1, 2, 3, 4]: - total = results[n] - overhead = total - baseline - per_stream = overhead / n if n > 0 else 0 - print(f"{n} stream(s): {total:4d} MB (+{overhead:3d} MB total, {per_stream:5.1f} MB per stream)") - - # Calculate marginal cost - print("\nMarginal cost per additional stream:") - for n in [2, 3, 4]: - marginal = results[n] - results[n-1] - print(f" Stream {n}: +{marginal} MB") - - print("="*80) diff --git a/verify_tensorrt_model.py b/verify_tensorrt_model.py deleted file mode 100644 index f04042f..0000000 --- a/verify_tensorrt_model.py +++ /dev/null @@ -1,85 +0,0 @@ -#!/usr/bin/env python3 -""" -Quick verification script for TensorRT model -""" - 
-import torch -from services.model_repository import TensorRTModelRepository - -def verify_model(): - print("=" * 80) - print("TensorRT Model Verification") - print("=" * 80) - - # Initialize repository - repo = TensorRTModelRepository(gpu_id=0, default_num_contexts=2) - - # Load the model - print("\nLoading YOLOv8n TensorRT engine...") - try: - metadata = repo.load_model( - model_id="yolov8n_test", - file_path="models/yolov8n.trt", - num_contexts=2 - ) - print("✓ Model loaded successfully!") - except Exception as e: - print(f"✗ Failed to load model: {e}") - return - - # Get model info - print("\n" + "=" * 80) - print("Model Information") - print("=" * 80) - info = repo.get_model_info("yolov8n_test") - if info: - print(f"Model ID: {info['model_id']}") - print(f"File: {info['file_path']}") - print(f"File hash: {info['file_hash']}") - print(f"\nInputs:") - for name, spec in info['inputs'].items(): - print(f" {name}: {spec['shape']} ({spec['dtype']})") - print(f"\nOutputs:") - for name, spec in info['outputs'].items(): - print(f" {name}: {spec['shape']} ({spec['dtype']})") - - # Run test inference - print("\n" + "=" * 80) - print("Running Test Inference") - print("=" * 80) - - try: - # Create dummy input (simulating a 640x640 image) - input_tensor = torch.rand(1, 3, 640, 640, dtype=torch.float32, device='cuda:0') - print(f"Input tensor: {input_tensor.shape} on {input_tensor.device}") - - # Run inference - outputs = repo.infer( - model_id="yolov8n_test", - inputs={"images": input_tensor}, - synchronize=True - ) - - print("\n✓ Inference successful!") - print("\nOutputs:") - for name, tensor in outputs.items(): - print(f" {name}: {tensor.shape} on {tensor.device} ({tensor.dtype})") - - except Exception as e: - print(f"\n✗ Inference failed: {e}") - import traceback - traceback.print_exc() - - # Cleanup - print("\n" + "=" * 80) - print("Cleanup") - print("=" * 80) - repo.unload_model("yolov8n_test") - print("✓ Model unloaded") - - print("\n" + "=" * 80) - print("Verification Complete!") - print("=" * 80) - -if __name__ == "__main__": - verify_model()
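
Note on the removed post-processing: the hand-written NMS in test_inference.py's postprocess() carried its own caveat ("This is a simplified version. For production, use a library like torchvision.ops.nms"). For anyone reviving these tests, a minimal sketch of that step built on torchvision is below. It assumes the same (1, 84, 8400) YOLOv8 output layout and default thresholds as the deleted code; the function name postprocess_nms is illustrative and was not part of the original scripts.

    import torch
    import torchvision

    def postprocess_nms(output: torch.Tensor,
                        confidence_threshold: float = 0.25,
                        iou_threshold: float = 0.45):
        """Decode a YOLOv8 head of shape (1, 84, 8400) with torchvision NMS.

        84 = 4 bbox values (cx, cy, w, h) + 80 class scores, matching the
        assumptions of the removed postprocess() in test_inference.py.
        """
        preds = output[0].transpose(0, 1)              # (8400, 84)
        boxes_cxcywh = preds[:, :4]
        scores, class_ids = preds[:, 4:].max(dim=1)    # best class per prediction

        keep = scores > confidence_threshold
        boxes_cxcywh, scores, class_ids = boxes_cxcywh[keep], scores[keep], class_ids[keep]
        if boxes_cxcywh.numel() == 0:
            return [], [], []

        # Convert (cx, cy, w, h) -> (x1, y1, x2, y2) corners for NMS
        cx, cy, w, h = boxes_cxcywh.unbind(dim=1)
        boxes_xyxy = torch.stack((cx - w / 2, cy - h / 2, cx + w / 2, cy + h / 2), dim=1)

        keep_idx = torchvision.ops.nms(boxes_xyxy, scores, iou_threshold)
        return (boxes_xyxy[keep_idx].tolist(),
                scores[keep_idx].tolist(),
                class_ids[keep_idx].tolist())

Drop-in usage in the removed inference loop would be: boxes, scores, class_ids = postprocess_nms(outputs['output0']).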