commit 3c83a57e4426a5023550fcacbd608205fb76e8d8 Author: Siwat Sirichai Date: Sun Nov 9 00:57:08 2025 +0700 feat: inference subsystem and optimization to decoder diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..805e8de --- /dev/null +++ b/.env.example @@ -0,0 +1,11 @@ +# RTSP Camera URLs +# Add your camera URLs here, one per line with CAMERA_URL_N format + +CAMERA_URL_1=rtsp://user:pass@host/path +CAMERA_URL_2=rtsp://user:pass@host/path +CAMERA_URL_3=rtsp://user:pass@host/path +CAMERA_URL_4=rtsp://user:pass@host/path + +# Add more cameras as needed... +# CAMERA_URL_5=rtsp://user:pass@host/path +# CAMERA_URL_6=rtsp://user:pass@host/path diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9da85c0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +fastapi +__pycache__/ +*.pyc +.env +.claude +models/ \ No newline at end of file diff --git a/app.py b/app.py new file mode 100644 index 0000000..7b3fb12 --- /dev/null +++ b/app.py @@ -0,0 +1,13 @@ +from fastapi import FastAPI + +app = FastAPI() + + +@app.get("/") +async def root(): + return {"message": "Hello World"} + + +@app.get("/health") +async def health_check(): + return {"status": "healthy"} diff --git a/claude.md b/claude.md new file mode 100644 index 0000000..75f833d --- /dev/null +++ b/claude.md @@ -0,0 +1,373 @@ +# GPU-Accelerated RTSP Stream Processing System + +## Project Overview + +A high-performance RTSP stream processing system designed to handle 1000+ concurrent camera streams using NVIDIA GPU hardware acceleration. The system implements a zero-copy GPU pipeline that minimizes VRAM usage through shared CUDA context and keeps all processing on the GPU until final JPEG compression. + +## Key Achievements + +- **Shared CUDA Context**: 70% VRAM reduction (from ~200MB to ~60MB per stream) +- **Linear VRAM Scaling**: Perfect scaling at 60 MB per additional stream +- **Zero-Copy Pipeline**: All processing stays on GPU until JPEG bytes +- **Proven Performance**: 4 streams @ 720p, 7-7.5 FPS each, 458 MB total VRAM + +## Architecture + +### Pipeline Flow +``` +RTSP Stream → PyAV (CPU) + ↓ + NVDEC Decode (GPU) → NV12 Format + ↓ + NV12 to RGB (GPU) → PyTorch Ops + ↓ + nvJPEG Encode (GPU) → JPEG Bytes + ↓ + CPU (JPEG only) +``` + +### Core Components + +#### StreamDecoderFactory +Singleton factory managing shared CUDA context across all decoder instances. + +**Key Methods:** +- `get_factory(gpu_id)`: Returns singleton instance +- `create_decoder(rtsp_url, buffer_size)`: Creates new decoder with shared context + +**CUDA Context Initialization:** +```python +err, = cuda_driver.cuInit(0) +err, self.cuda_context = cuda_driver.cuDevicePrimaryCtxRetain(self.cuda_device) +``` + +#### StreamDecoder +Individual stream decoder with NVDEC hardware acceleration and thread-safe ring buffer. + +**Key Features:** +- Thread-safe frame buffer (deque) +- Connection status tracking +- Automatic reconnection handling +- Background thread for continuous decoding + +**Key Methods:** +- `start()`: Start decoding thread +- `stop()`: Stop and cleanup +- `get_latest_frame()`: Get most recent RGB frame (GPU tensor) +- `is_connected()`: Check connection status +- `get_buffer_size()`: Current buffer size + +#### JPEGEncoderFactory +Shared JPEG encoder using nvImageCodec for GPU-accelerated encoding. + +**Key Function:** +```python +def encode_frame_to_jpeg(rgb_frame: torch.Tensor, quality: int = 95) -> Optional[bytes]: + """ + Encodes GPU RGB tensor to JPEG bytes without CPU transfer. 
+ Uses __cuda_array_interface__ for zero-copy operation. + + Performance: 1-2ms per 720p frame + """ +``` + +## Technical Implementation + +### Shared CUDA Context Pattern + +```python +# Single shared context for all decoders +factory = StreamDecoderFactory(gpu_id=0) + +# All decoders share same context +decoder1 = factory.create_decoder(url1, buffer_size=30) +decoder2 = factory.create_decoder(url2, buffer_size=30) +decoder3 = factory.create_decoder(url3, buffer_size=30) +``` + +**Benefits:** +- 70% VRAM reduction per stream +- Single decoder initialization overhead +- Efficient resource sharing + +### NV12 to RGB Conversion (GPU) + +```python +def nv12_to_rgb_gpu(nv12_tensor: torch.Tensor, height: int, width: int) -> torch.Tensor: + """ + Converts NV12 (YUV420) to RGB entirely on GPU using PyTorch ops. + Uses BT.601 color space conversion. + + Input: (height * 1.5, width) NV12 tensor + Output: (3, height, width) RGB tensor + """ +``` + +**Steps:** +1. Split Y and UV planes +2. Deinterleave UV components +3. Upsample chroma (bilinear interpolation) +4. Apply BT.601 color matrix +5. Clamp to [0, 255] + +### Zero-Copy Operations + +**DLPack for PyTorch ↔ nvImageCodec:** +```python +# GPU tensor stays on GPU +rgb_hwc = rgb_frame.permute(1, 2, 0).contiguous() +nv_image = nvimgcodec.as_image(rgb_hwc) # Uses __cuda_array_interface__ +jpeg_data = encoder.encode(nv_image, "jpeg", encode_params) +``` + +## Performance Metrics + +### VRAM Usage (Python Process) + +| Streams | Total VRAM | Overhead | Per Stream | Marginal Cost | +|---------|-----------|----------|------------|---------------| +| 0 | 216 MB | 0 MB | - | - | +| 1 | 278 MB | 62 MB | 62.0 MB | 62 MB | +| 2 | 338 MB | 122 MB | 61.0 MB | 60 MB | +| 3 | 398 MB | 182 MB | 60.7 MB | 60 MB | +| 4 | 458 MB | 242 MB | 60.5 MB | 60 MB | + +**Result:** Perfect linear scaling at ~60 MB per stream + +### Capacity Estimates + +With 60 MB per stream + 216 MB baseline: + +- **16GB GPU**: ~269 cameras (conservative: ~250) +- **24GB GPU**: ~407 cameras (conservative: ~380) +- **48GB GPU**: ~815 cameras (conservative: ~780) +- **For 1000 streams**: ~60GB VRAM required + +### Throughput + +- **Frame Rate**: 7-7.5 FPS per stream @ 720p +- **JPEG Encoding**: 1-2ms per frame +- **Connection Time**: ~15s for stream stabilization + +## Project Structure + +``` +python-rtsp-worker/ +├── app.py # FastAPI application +├── services/ +│ ├── __init__.py # Package exports +│ ├── stream_decoder.py # StreamDecoder & Factory +│ └── jpeg_encoder.py # JPEG encoding utilities +├── test_stream.py # Single stream test +├── test_multi_stream.py # 4-stream test with monitoring +├── test_vram_scaling.py # System VRAM measurement +├── test_vram_process.py # Process VRAM measurement +├── test_jpeg_encode.py # JPEG encoding test +├── requirements.txt # Python dependencies +├── .env # Camera URLs (gitignored) +├── .env.example # Template for camera URLs +└── .gitignore + +``` + +## Dependencies + +``` +fastapi # Web framework +uvicorn[standard] # ASGI server +torch # GPU tensor operations +PyNvVideoCodec # NVDEC hardware decoding +av # FFmpeg/RTSP client +cuda-python # CUDA driver bindings +nvidia-nvimgcodec-cu12 # nvJPEG encoding +python-dotenv # Environment variables +``` + +## Configuration + +### Environment Variables (.env) + +```bash +# RTSP Camera URLs +CAMERA_URL_1=rtsp://user:pass@host/path +CAMERA_URL_2=rtsp://user:pass@host/path +CAMERA_URL_3=rtsp://user:pass@host/path +CAMERA_URL_4=rtsp://user:pass@host/path +# Add more as needed... 
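+
+# Note: the loader shown in "Loading URLs in Code" below reads CAMERA_URL_1, CAMERA_URL_2, ...
+# in order and stops at the first missing index, so keep the numbering contiguous.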
+``` + +### Loading URLs in Code + +```python +from dotenv import load_dotenv +import os + +load_dotenv() + +camera_urls = [] +i = 1 +while True: + url = os.getenv(f'CAMERA_URL_{i}') + if url: + camera_urls.append(url) + i += 1 + else: + break +``` + +## Usage Examples + +### Basic Usage + +```python +from services import StreamDecoderFactory, encode_frame_to_jpeg + +# Create factory (shared CUDA context) +factory = StreamDecoderFactory(gpu_id=0) + +# Create decoder +decoder = factory.create_decoder( + rtsp_url="rtsp://user:pass@host/path", + buffer_size=30 +) + +# Start decoding +decoder.start() + +# Wait for connection +import time +time.sleep(5) + +# Get latest frame (GPU tensor) +rgb_frame = decoder.get_latest_frame() +if rgb_frame is not None: + # Encode to JPEG (on GPU) + jpeg_bytes = encode_frame_to_jpeg(rgb_frame, quality=95) + + # Save or transmit jpeg_bytes + with open("frame.jpg", "wb") as f: + f.write(jpeg_bytes) + +# Cleanup +decoder.stop() +``` + +### Multi-Stream Usage + +```python +from services import StreamDecoderFactory +import time + +factory = StreamDecoderFactory(gpu_id=0) + +# Create multiple decoders (all share context) +decoders = [] +for url in camera_urls: + decoder = factory.create_decoder(url, buffer_size=30) + decoder.start() + decoders.append(decoder) + +# Wait for connections +time.sleep(15) + +# Check status +for i, decoder in enumerate(decoders): + status = decoder.get_status() + buffer_size = decoder.get_buffer_size() + connected = decoder.is_connected() + print(f"Stream {i+1}: {status.value}, Buffer: {buffer_size}, Connected: {connected}") + +# Process frames +for decoder in decoders: + frame = decoder.get_latest_frame() + if frame is not None: + # Process frame... + pass + +# Cleanup +for decoder in decoders: + decoder.stop() +``` + +## Testing + +### Run Single Stream Test +```bash +python test_stream.py +``` + +### Run 4-Stream Test with VRAM Monitoring +```bash +python test_multi_stream.py +``` + +### Measure VRAM Scaling +```bash +python test_vram_process.py +``` + +### Test JPEG Encoding +```bash +python test_jpeg_encode.py +``` + +## Known Issues + +### Segmentation Faults on Cleanup +**Status**: Non-critical +**Impact**: Occurs during cleanup, doesn't affect core functionality +**Cause**: Likely CUDA context cleanup order issues +**Workaround**: Functionality works correctly; cleanup errors can be ignored + +## Technical Decisions + +### Why PyNvVideoCodec? +- Direct access to NVDEC hardware decoder +- Minimal overhead compared to FFmpeg/torchaudio +- Returns GPU tensors via DLPack +- Better control over decode sessions + +### Why Shared CUDA Context? +- Reduces VRAM from ~200MB to ~60MB per stream (70% savings) +- Enables 1000-stream target on 60GB GPU +- Minimal complexity overhead with singleton pattern + +### Why nvImageCodec? +- GPU-native JPEG encoding (nvJPEG) +- Zero-copy with PyTorch via `__cuda_array_interface__` +- 1-2ms encoding time per 720p frame +- Keeps data on GPU until final compression + +### Why Thread-Safe Ring Buffer? 
+- Decouples decoding from inference pipeline +- Prevents frame drops during processing spikes +- Allows async frame access +- Configurable buffer size per stream + +## Future Considerations + +### Hardware Decode Session Limits +- NVIDIA GPUs typically support 5-30 concurrent decode sessions +- May need multiple GPUs for 1000 streams +- Test with actual hardware to verify limits + +### Scaling Beyond 1000 Streams +- Multi-GPU support with context per GPU +- Load balancing across GPUs +- Network bandwidth considerations + +### TensorRT Integration +- Next step: Integrate with TensorRT inference pipeline +- GPU frames → TensorRT → Results +- Keep entire pipeline on GPU + +## References + +- [PyNvVideoCodec Documentation](https://developer.nvidia.com/pynvvideocodec) +- [NVIDIA Video Codec SDK](https://developer.nvidia.com/nvidia-video-codec-sdk) +- [nvImageCodec Documentation](https://docs.nvidia.com/cuda/nvimgcodec/) +- [CUDA Python Bindings](https://nvidia.github.io/cuda-python/) + +## License + +This project uses NVIDIA proprietary libraries (PyNvVideoCodec, nvImageCodec) which require NVIDIA GPU hardware and may have specific licensing terms. diff --git a/requirements.dev.txt b/requirements.dev.txt new file mode 100644 index 0000000..900cb8c --- /dev/null +++ b/requirements.dev.txt @@ -0,0 +1,11 @@ +# Development Dependencies +# Install with: pip install -r requirements.dev.txt + +# Model conversion tools +tensorrt +onnx +ultralytics # For YOLO models download and export + +# Optional: Additional tools for model optimization +onnxruntime-gpu # ONNX runtime for testing +onnx-simplifier # Simplify ONNX models diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..88ab797 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +fastapi +uvicorn[standard] +torch +PyNvVideoCodec +av +cuda-python +nvidia-nvimgcodec-cu12 # GPU-accelerated JPEG encoding/decoding with nvJPEG +python-dotenv # Load environment variables from .env file diff --git a/scripts/README.md b/scripts/README.md new file mode 100644 index 0000000..84cb13e --- /dev/null +++ b/scripts/README.md @@ -0,0 +1,197 @@ +# Scripts Directory + +This directory contains utility scripts for the python-rtsp-worker project. + +## convert_pt_to_tensorrt.py + +Converts PyTorch models (.pt, .pth) to TensorRT engines (.trt) for optimized GPU inference. 
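+
+Besides the command-line interface documented below, the `TensorRTConverter` class can be driven directly from Python. A minimal sketch, assuming the repository root is on `PYTHONPATH`:
+
+```python
+from scripts.convert_pt_to_tensorrt import TensorRTConverter
+
+converter = TensorRTConverter(gpu_id=0, verbose=False)
+converter.convert(
+    model_path="yolov8n.pt",
+    output_path="models/yolov8n.trt",
+    input_shape=(1, 3, 640, 640),
+    fp16=True,
+)
+```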
+ +### Features + +- **Multiple Precision Modes**: FP32, FP16, INT8 +- **Dynamic Batch Size**: Support for variable batch sizes +- **Automatic Optimization**: Creates optimization profiles for best performance +- **ONNX Intermediate**: Uses ONNX as intermediate format for compatibility +- **Easy to Use**: Simple command-line interface + +### Requirements + +Make sure you have the following dependencies installed: + +```bash +pip install torch tensorrt onnx +``` + +### Quick Start + +**Basic conversion (FP32)**: +```bash +python scripts/convert_pt_to_tensorrt.py \ + --model path/to/model.pt \ + --output models/model.trt +``` + +**FP16 precision** (recommended for most cases - 2x faster, minimal accuracy loss): +```bash +python scripts/convert_pt_to_tensorrt.py \ + --model yolov8n.pt \ + --output models/yolov8n.trt \ + --fp16 +``` + +**Custom input shape**: +```bash +python scripts/convert_pt_to_tensorrt.py \ + --model model.pt \ + --output model.trt \ + --input-shape 1,3,416,416 +``` + +**Dynamic batch size** (for variable batch inference): +```bash +python scripts/convert_pt_to_tensorrt.py \ + --model model.pt \ + --output model.trt \ + --dynamic-batch \ + --max-batch 16 +``` + +**Maximum optimization** (FP16 + INT8): +```bash +python scripts/convert_pt_to_tensorrt.py \ + --model model.pt \ + --output model.trt \ + --fp16 \ + --int8 +``` + +### Command-Line Arguments + +| Argument | Required | Default | Description | +|----------|----------|---------|-------------| +| `--model`, `-m` | Yes | - | Path to PyTorch model file (.pt or .pth) | +| `--output`, `-o` | Yes | - | Output path for TensorRT engine (.trt) | +| `--input-shape`, `-s` | No | 1,3,640,640 | Input tensor shape as B,C,H,W | +| `--fp16` | No | False | Enable FP16 precision (faster, ~same accuracy) | +| `--int8` | No | False | Enable INT8 precision (fastest, needs calibration) | +| `--dynamic-batch` | No | False | Enable dynamic batch size support | +| `--max-batch` | No | 16 | Maximum batch size for dynamic batching | +| `--workspace-size` | No | 4 | TensorRT workspace size in GB | +| `--gpu` | No | 0 | GPU device ID to use | +| `--input-names` | No | ["input"] | Custom input tensor names | +| `--output-names` | No | ["output"] | Custom output tensor names | +| `--keep-onnx` | No | False | Keep intermediate ONNX file for debugging | +| `--verbose`, `-v` | No | False | Enable verbose logging | + +### Performance Tips + +1. **Always use FP16** unless you need FP32 precision: + - 2x faster inference + - 50% less VRAM usage + - Minimal accuracy loss for most models + +2. **Use dynamic batching** for variable workloads: + - Process 1-16 images with same engine + - Automatic optimization for common batch sizes + +3. **Increase workspace size** for complex models: + - Default 4GB works for most models + - Increase to 8GB for very large models + +4. 
**INT8 quantization** for maximum speed: + - Requires calibration data (not included in basic conversion) + - 4x faster than FP32 + - Best for deployment scenarios + +### Integration with Model Repository + +Once converted, use the TensorRT engine with the model repository: + +```python +from services.model_repository import TensorRTModelRepository + +# Initialize repository +repo = TensorRTModelRepository(gpu_id=0, default_num_contexts=4) + +# Load the converted model +repo.load_model( + model_id="my_model", + file_path="models/model.trt", + num_contexts=4 +) + +# Run inference +import torch +input_tensor = torch.rand(1, 3, 640, 640, device='cuda:0') +outputs = repo.infer( + model_id="my_model", + inputs={"input": input_tensor} +) +``` + +### Troubleshooting + +**Issue**: `Failed to parse ONNX model` +- Solution: Check if your PyTorch model is compatible with ONNX export +- Try updating PyTorch and ONNX versions + +**Issue**: `FP16 not supported on this platform` +- Solution: Your GPU doesn't support FP16. Remove `--fp16` flag + +**Issue**: `Out of memory during conversion` +- Solution: Reduce `--workspace-size` or free up GPU memory + +**Issue**: `Model contains only state_dict` +- Solution: Your checkpoint only has weights. You need the full model architecture. +- Modify the script's `load_pytorch_model()` method to instantiate your model class + +### Examples for Common Models + +**YOLOv8**: +```bash +# Download model first +# yolo export model=yolov8n.pt format=engine device=0 + +# Or use this script +python scripts/convert_pt_to_tensorrt.py \ + --model yolov8n.pt \ + --output models/yolov8n.trt \ + --input-shape 1,3,640,640 \ + --fp16 +``` + +**ResNet**: +```bash +python scripts/convert_pt_to_tensorrt.py \ + --model resnet50.pt \ + --output models/resnet50.trt \ + --input-shape 1,3,224,224 \ + --fp16 \ + --dynamic-batch \ + --max-batch 32 +``` + +**Custom Model**: +```bash +python scripts/convert_pt_to_tensorrt.py \ + --model custom_model.pt \ + --output models/custom.trt \ + --input-shape 1,3,512,512 \ + --input-names image \ + --output-names predictions \ + --fp16 \ + --verbose +``` + +### Notes + +- The script uses ONNX as an intermediate format, which is the recommended approach +- TensorRT engines are hardware-specific; rebuild for different GPUs +- Conversion time varies (30 seconds to 5 minutes depending on model size) +- The first inference after loading is slower (warmup) + +### Support + +For issues or questions, please check: +- TensorRT documentation: https://docs.nvidia.com/deeplearning/tensorrt/ +- PyTorch ONNX export guide: https://pytorch.org/docs/stable/onnx.html diff --git a/scripts/convert_pt_to_tensorrt.py b/scripts/convert_pt_to_tensorrt.py new file mode 100755 index 0000000..b0286d6 --- /dev/null +++ b/scripts/convert_pt_to_tensorrt.py @@ -0,0 +1,562 @@ +#!/usr/bin/env python3 +""" +PyTorch to TensorRT Model Conversion Script + +This script converts PyTorch models (.pt, .pth) to TensorRT engines (.trt) for optimized inference. 
+ +Features: +- Automatic FP32/FP16/INT8 precision modes +- Dynamic batch size support +- Input shape validation +- Optimization profiles for dynamic shapes +- ONNX intermediate format +- GPU-accelerated conversion + +Usage: + python convert_pt_to_tensorrt.py --model path/to/model.pt --output models/model.trt + python convert_pt_to_tensorrt.py --model yolov8n.pt --input-shape 1 3 640 640 --fp16 + python convert_pt_to_tensorrt.py --model model.pt --dynamic-batch --max-batch 16 +""" + +import argparse +import sys +from pathlib import Path +from typing import Tuple, List, Optional +import torch +import tensorrt as trt +import numpy as np + + +class TensorRTConverter: + """Converts PyTorch models to TensorRT engines""" + + def __init__(self, gpu_id: int = 0, verbose: bool = True): + """ + Initialize the converter. + + Args: + gpu_id: GPU device ID to use for conversion + verbose: Enable verbose logging + """ + self.gpu_id = gpu_id + self.device = torch.device(f'cuda:{gpu_id}') + + # TensorRT logger + log_level = trt.Logger.VERBOSE if verbose else trt.Logger.WARNING + self.logger = trt.Logger(log_level) + + # Set CUDA device + torch.cuda.set_device(gpu_id) + + print(f"Initialized TensorRT Converter on GPU {gpu_id}") + print(f"PyTorch version: {torch.__version__}") + print(f"TensorRT version: {trt.__version__}") + print(f"CUDA available: {torch.cuda.is_available()}") + if torch.cuda.is_available(): + print(f"CUDA device: {torch.cuda.get_device_name(gpu_id)}") + + def load_pytorch_model(self, model_path: str) -> torch.nn.Module: + """ + Load PyTorch model from file. + + Args: + model_path: Path to .pt or .pth file + + Returns: + Loaded PyTorch model in eval mode + """ + print(f"\nLoading PyTorch model from {model_path}...") + + if not Path(model_path).exists(): + raise FileNotFoundError(f"Model file not found: {model_path}") + + # Load model (weights_only=False for models with custom classes) + checkpoint = torch.load(model_path, map_location=self.device, weights_only=False) + + # Handle different checkpoint formats + if isinstance(checkpoint, dict): + if 'model' in checkpoint: + model = checkpoint['model'] + elif 'state_dict' in checkpoint: + # Need model architecture - this is a limitation + raise ValueError( + "Checkpoint contains only state_dict. " + "Please provide the complete model or modify this script to load your architecture." + ) + else: + raise ValueError("Unknown checkpoint format") + else: + model = checkpoint + + # Set to eval mode + model.eval() + model.to(self.device) + + print(f"✓ Model loaded successfully") + return model + + def export_to_onnx(self, model: torch.nn.Module, input_shape: Tuple[int, ...], + onnx_path: str, dynamic_batch: bool = False, + input_names: List[str] = None, output_names: List[str] = None) -> str: + """ + Export PyTorch model to ONNX format (intermediate step). 
+ + Args: + model: PyTorch model + input_shape: Input tensor shape (B, C, H, W) + onnx_path: Output path for ONNX file + dynamic_batch: Enable dynamic batch dimension + input_names: List of input tensor names + output_names: List of output tensor names + + Returns: + Path to exported ONNX file + """ + print(f"\nExporting to ONNX format...") + print(f"Input shape: {input_shape}") + print(f"Dynamic batch: {dynamic_batch}") + + # Default names + if input_names is None: + input_names = ['input'] + if output_names is None: + output_names = ['output'] + + # Create dummy input + dummy_input = torch.randn(*input_shape, device=self.device) + + # Dynamic axes configuration + dynamic_axes = None + if dynamic_batch: + dynamic_axes = { + input_names[0]: {0: 'batch'}, + output_names[0]: {0: 'batch'} + } + + # Export to ONNX + torch.onnx.export( + model, + dummy_input, + onnx_path, + input_names=input_names, + output_names=output_names, + dynamic_axes=dynamic_axes, + opset_version=17, # Use recent ONNX opset + do_constant_folding=True, + verbose=False + ) + + print(f"✓ ONNX model exported to {onnx_path}") + return onnx_path + + def build_tensorrt_engine_from_onnx(self, onnx_path: str, engine_path: str, + fp16: bool = False, int8: bool = False, + max_workspace_size: int = 4, + min_batch: int = 1, opt_batch: int = 1, max_batch: int = 1) -> str: + """ + Build TensorRT engine from ONNX model. + + Args: + onnx_path: Path to ONNX model + engine_path: Output path for TensorRT engine + fp16: Enable FP16 precision + int8: Enable INT8 precision (requires calibration) + max_workspace_size: Maximum workspace size in GB + min_batch: Minimum batch size for optimization + opt_batch: Optimal batch size for optimization + max_batch: Maximum batch size for optimization + + Returns: + Path to built TensorRT engine + """ + print(f"\nBuilding TensorRT engine from ONNX...") + print(f"Precision: FP{'16' if fp16 else '32'}{' + INT8' if int8 else ''}") + print(f"Workspace size: {max_workspace_size} GB") + + # Create builder and network + builder = trt.Builder(self.logger) + network = builder.create_network( + 1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH) + ) + parser = trt.OnnxParser(network, self.logger) + + # Parse ONNX model + print(f"Loading ONNX file from {onnx_path}...") + with open(onnx_path, 'rb') as f: + if not parser.parse(f.read()): + print("ERROR: Failed to parse the ONNX file:") + for error in range(parser.num_errors): + print(f" {parser.get_error(error)}") + raise RuntimeError("Failed to parse ONNX model") + + print(f"✓ ONNX model parsed successfully") + + # Print network info + print(f"\nNetwork Information:") + print(f" Inputs: {network.num_inputs}") + for i in range(network.num_inputs): + inp = network.get_input(i) + print(f" [{i}] {inp.name}: {inp.shape} ({inp.dtype})") + + print(f" Outputs: {network.num_outputs}") + for i in range(network.num_outputs): + out = network.get_output(i) + print(f" [{i}] {out.name}: {out.shape} ({out.dtype})") + + # Create builder config + config = builder.create_builder_config() + + # Set workspace size + config.set_memory_pool_limit( + trt.MemoryPoolType.WORKSPACE, + max_workspace_size * (1 << 30) # GB to bytes + ) + + # Enable precision modes + if fp16: + if not builder.platform_has_fast_fp16: + print("Warning: FP16 not supported on this platform, using FP32") + else: + config.set_flag(trt.BuilderFlag.FP16) + print("✓ FP16 mode enabled") + + if int8: + if not builder.platform_has_fast_int8: + print("Warning: INT8 not supported on this platform, using FP32/FP16") + 
else: + config.set_flag(trt.BuilderFlag.INT8) + print("✓ INT8 mode enabled") + print("Note: INT8 calibration not implemented. Results may be suboptimal.") + + # Set optimization profile for dynamic shapes + if max_batch > 1 or min_batch != max_batch: + profile = builder.create_optimization_profile() + + for i in range(network.num_inputs): + inp = network.get_input(i) + shape = list(inp.shape) + + # Handle dynamic batch dimension + if shape[0] == -1: + # Min, opt, max shapes + min_shape = [min_batch] + shape[1:] + opt_shape = [opt_batch] + shape[1:] + max_shape = [max_batch] + shape[1:] + + profile.set_shape(inp.name, min_shape, opt_shape, max_shape) + print(f" Dynamic shape for {inp.name}:") + print(f" Min: {min_shape}") + print(f" Opt: {opt_shape}") + print(f" Max: {max_shape}") + + config.add_optimization_profile(profile) + + # Build engine + print(f"\nBuilding TensorRT engine (this may take a few minutes)...") + serialized_engine = builder.build_serialized_network(network, config) + + if serialized_engine is None: + raise RuntimeError("Failed to build TensorRT engine") + + # Save engine to file + print(f"Saving engine to {engine_path}...") + with open(engine_path, 'wb') as f: + f.write(serialized_engine) + + # Get file size + file_size_mb = Path(engine_path).stat().st_size / (1024 * 1024) + print(f"✓ TensorRT engine built successfully") + print(f" Engine size: {file_size_mb:.2f} MB") + + return engine_path + + def convert(self, model_path: str, output_path: str, + input_shape: Tuple[int, ...] = (1, 3, 640, 640), + fp16: bool = False, int8: bool = False, + dynamic_batch: bool = False, + max_batch: int = 16, + workspace_size: int = 4, + input_names: List[str] = None, + output_names: List[str] = None, + keep_onnx: bool = False) -> str: + """ + Convert PyTorch or ONNX model to TensorRT engine. 
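+
+        Example (a minimal sketch):
+            converter = TensorRTConverter(gpu_id=0)
+            converter.convert("yolov8n.pt", "models/yolov8n.trt", fp16=True)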
+ + Args: + model_path: Path to PyTorch model (.pt, .pth) or ONNX model (.onnx) + output_path: Path for output TensorRT engine (.trt) + input_shape: Input tensor shape (B, C, H, W) - required for PyTorch models + fp16: Enable FP16 precision + int8: Enable INT8 precision + dynamic_batch: Enable dynamic batch size + max_batch: Maximum batch size (for dynamic batching) + workspace_size: TensorRT workspace size in GB + input_names: Custom input names (for PyTorch export) + output_names: Custom output names (for PyTorch export) + keep_onnx: Keep intermediate ONNX file + + Returns: + Path to created TensorRT engine + """ + # Create output directory + output_dir = Path(output_path).parent + output_dir.mkdir(parents=True, exist_ok=True) + + # Check if input is already ONNX + model_path_obj = Path(model_path) + is_onnx = model_path_obj.suffix.lower() == '.onnx' + + if is_onnx: + # Direct ONNX to TensorRT conversion + print(f"Input is ONNX model, converting directly to TensorRT...") + + min_batch = 1 + opt_batch = input_shape[0] if not dynamic_batch else max(1, max_batch // 2) + max_batch_size = max_batch if dynamic_batch else input_shape[0] + + engine_path = self.build_tensorrt_engine_from_onnx( + onnx_path=model_path, + engine_path=output_path, + fp16=fp16, + int8=int8, + max_workspace_size=workspace_size, + min_batch=min_batch, + opt_batch=opt_batch, + max_batch=max_batch_size + ) + + print(f"\n{'=' * 80}") + print(f"CONVERSION COMPLETED SUCCESSFULLY") + print(f"{'=' * 80}") + print(f"Input: {model_path}") + print(f"Output: {engine_path}") + print(f"Precision: FP{'16' if fp16 else '32'}{' + INT8' if int8 else ''}") + print(f"{'=' * 80}") + + return engine_path + + # PyTorch to TensorRT conversion (via ONNX) + # Temporary ONNX path + onnx_path = str(output_dir / "temp_model.onnx") + + try: + # Step 1: Load PyTorch model + model = self.load_pytorch_model(model_path) + + # Step 2: Export to ONNX + self.export_to_onnx( + model=model, + input_shape=input_shape, + onnx_path=onnx_path, + dynamic_batch=dynamic_batch, + input_names=input_names, + output_names=output_names + ) + + # Step 3: Build TensorRT engine + min_batch = 1 + opt_batch = input_shape[0] if not dynamic_batch else max(1, max_batch // 2) + max_batch_size = max_batch if dynamic_batch else input_shape[0] + + engine_path = self.build_tensorrt_engine_from_onnx( + onnx_path=onnx_path, + engine_path=output_path, + fp16=fp16, + int8=int8, + max_workspace_size=workspace_size, + min_batch=min_batch, + opt_batch=opt_batch, + max_batch=max_batch_size + ) + + print(f"\n{'=' * 80}") + print(f"CONVERSION COMPLETED SUCCESSFULLY") + print(f"{'=' * 80}") + print(f"Input: {model_path}") + print(f"Output: {engine_path}") + print(f"Precision: FP{'16' if fp16 else '32'}{' + INT8' if int8 else ''}") + print(f"Dynamic batch: {dynamic_batch}") + if dynamic_batch: + print(f"Batch range: [1, {max_batch}]") + print(f"{'=' * 80}") + + return engine_path + + finally: + # Cleanup temporary ONNX file + if not keep_onnx and Path(onnx_path).exists(): + Path(onnx_path).unlink() + print(f"Cleaned up temporary ONNX file") + + +def parse_shape(shape_str: str) -> Tuple[int, ...]: + """Parse shape string like '1,3,640,640' to tuple""" + try: + return tuple(int(x) for x in shape_str.split(',')) + except ValueError: + raise argparse.ArgumentTypeError( + f"Invalid shape format: {shape_str}. 
Expected format: 1,3,640,640" + ) + + +def main(): + parser = argparse.ArgumentParser( + description="Convert PyTorch models to TensorRT engines", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Basic conversion (FP32) + python convert_pt_to_tensorrt.py --model yolov8n.pt --output models/yolov8n.trt + + # FP16 precision for faster inference + python convert_pt_to_tensorrt.py --model model.pt --output model.trt --fp16 + + # Custom input shape + python convert_pt_to_tensorrt.py --model model.pt --output model.trt \\ + --input-shape 1,3,416,416 + + # Dynamic batch size (1 to 16) + python convert_pt_to_tensorrt.py --model model.pt --output model.trt \\ + --dynamic-batch --max-batch 16 + + # INT8 quantization for maximum speed (requires calibration) + python convert_pt_to_tensorrt.py --model model.pt --output model.trt \\ + --fp16 --int8 + + # Keep intermediate ONNX file for debugging + python convert_pt_to_tensorrt.py --model model.pt --output model.trt \\ + --keep-onnx + """ + ) + + # Required arguments + parser.add_argument( + '--model', '-m', + type=str, + required=True, + help='Path to PyTorch model file (.pt or .pth)' + ) + + parser.add_argument( + '--output', '-o', + type=str, + required=True, + help='Output path for TensorRT engine (.trt or .engine)' + ) + + # Optional arguments + parser.add_argument( + '--input-shape', '-s', + type=parse_shape, + default=(1, 3, 640, 640), + help='Input tensor shape as B,C,H,W (default: 1,3,640,640)' + ) + + parser.add_argument( + '--fp16', + action='store_true', + help='Enable FP16 precision (faster inference, slightly lower accuracy)' + ) + + parser.add_argument( + '--int8', + action='store_true', + help='Enable INT8 precision (fastest, requires calibration)' + ) + + parser.add_argument( + '--dynamic-batch', + action='store_true', + help='Enable dynamic batch size support' + ) + + parser.add_argument( + '--max-batch', + type=int, + default=16, + help='Maximum batch size for dynamic batching (default: 16)' + ) + + parser.add_argument( + '--workspace-size', + type=int, + default=4, + help='TensorRT workspace size in GB (default: 4)' + ) + + parser.add_argument( + '--gpu', + type=int, + default=0, + help='GPU device ID (default: 0)' + ) + + parser.add_argument( + '--input-names', + type=str, + nargs='+', + default=None, + help='Custom input tensor names (default: ["input"])' + ) + + parser.add_argument( + '--output-names', + type=str, + nargs='+', + default=None, + help='Custom output tensor names (default: ["output"])' + ) + + parser.add_argument( + '--keep-onnx', + action='store_true', + help='Keep intermediate ONNX file' + ) + + parser.add_argument( + '--verbose', '-v', + action='store_true', + help='Enable verbose logging' + ) + + args = parser.parse_args() + + # Validate arguments + if not Path(args.model).exists(): + print(f"Error: Model file not found: {args.model}") + sys.exit(1) + + if args.int8 and not args.fp16: + print("Warning: INT8 mode works best with FP16 enabled. 
Adding --fp16 flag.") + args.fp16 = True + + # Run conversion + try: + converter = TensorRTConverter(gpu_id=args.gpu, verbose=args.verbose) + + converter.convert( + model_path=args.model, + output_path=args.output, + input_shape=args.input_shape, + fp16=args.fp16, + int8=args.int8, + dynamic_batch=args.dynamic_batch, + max_batch=args.max_batch, + workspace_size=args.workspace_size, + input_names=args.input_names, + output_names=args.output_names, + keep_onnx=args.keep_onnx + ) + + print("\n✓ Conversion successful!") + + except Exception as e: + print(f"\n✗ Conversion failed: {e}") + if args.verbose: + import traceback + traceback.print_exc() + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/services/README_MODEL_REPOSITORY.md b/services/README_MODEL_REPOSITORY.md new file mode 100644 index 0000000..00e1a34 --- /dev/null +++ b/services/README_MODEL_REPOSITORY.md @@ -0,0 +1,380 @@ +# TensorRT Model Repository + +Efficient TensorRT model management with context pooling, deduplication, and GPU-to-GPU inference. + +## Architecture + +### Key Features + +1. **Model Deduplication by File Hash** + - Multiple model IDs can point to the same model file + - Only one engine loaded in VRAM per unique file + - Example: 100 cameras with same model = 1 engine (not 100!) + +2. **Context Pooling for Load Balancing** + - Each unique engine has N execution contexts (configurable) + - Contexts borrowed/returned via mutex-based queue + - Enables concurrent inference without context-per-model overhead + - Example: 100 cameras sharing 4 contexts efficiently + +3. **GPU-to-GPU Inference** + - All inputs/outputs stay in VRAM (zero CPU transfers) + - Integrates seamlessly with StreamDecoder (frames already on GPU) + - Maximum performance for video inference pipelines + +4. **Thread-Safe Concurrent Inference** + - Mutex-based context acquisition (TensorRT best practice) + - No shared IExecutionContext across threads (safe) + - Multiple threads can infer concurrently (limited by pool size) + +## Design Rationale + +### Why Context Pooling? + +**Without pooling** (naive approach): +``` +100 cameras → 100 model IDs → 100 execution contexts +``` +- Problem: Each context consumes VRAM (layers, workspace, etc.) +- Problem: Context creation overhead per camera +- Problem: Doesn't scale to hundreds of cameras + +**With pooling** (our approach): +``` +100 cameras → 100 model IDs → 1 shared engine → 4 contexts (pool) +``` +- Solution: Contexts shared across all cameras using same model +- Solution: Borrow/return mechanism with mutex queue +- Solution: Scales to any number of cameras with fixed context count + +### Memory Savings Example + +YOLOv8n model (~6MB engine file): + +| Approach | Model IDs | Engines | Contexts | Approx VRAM | +|----------|-----------|---------|----------|-------------| +| Naive | 100 | 100 | 100 | ~1.5 GB | +| **Ours (pooled)** | **100** | **1** | **4** | **~30 MB** | + +**50x memory savings!** + +## Usage + +### Basic Usage + +```python +from services.model_repository import TensorRTModelRepository + +# Initialize repository +repo = TensorRTModelRepository( + gpu_id=0, + default_num_contexts=4 # 4 contexts per unique engine +) + +# Load model for camera 1 +repo.load_model( + model_id="camera_1", + file_path="models/yolov8n.trt" +) + +# Load same model for camera 2 (deduplication happens automatically) +repo.load_model( + model_id="camera_2", + file_path="models/yolov8n.trt" # Same file → shares engine and contexts! 
+) + +# Run inference (GPU-to-GPU) +import torch +input_tensor = torch.rand(1, 3, 640, 640, device='cuda:0') + +outputs = repo.infer( + model_id="camera_1", + inputs={"images": input_tensor}, + synchronize=True, + timeout=5.0 # Wait up to 5s for available context +) + +# Outputs stay on GPU +for name, tensor in outputs.items(): + print(f"{name}: {tensor.shape} on {tensor.device}") +``` + +### Multi-Camera Scenario + +```python +# Setup multiple cameras +cameras = [f"camera_{i}" for i in range(100)] + +# Load same model for all cameras +for camera_id in cameras: + repo.load_model( + model_id=camera_id, + file_path="models/yolov8n.trt" # Same file for all + ) + +# Check efficiency +stats = repo.get_stats() +print(f"Model IDs: {stats['total_model_ids']}") # 100 +print(f"Unique engines: {stats['unique_engines']}") # 1 +print(f"Total contexts: {stats['total_contexts']}") # 4 +``` + +### Integration with RTSP Decoder + +```python +from services.stream_decoder import StreamDecoderFactory +from services.model_repository import TensorRTModelRepository + +# Setup +decoder_factory = StreamDecoderFactory(gpu_id=0) +model_repo = TensorRTModelRepository(gpu_id=0) + +# Create decoder for camera +decoder = decoder_factory.create_decoder("rtsp://camera.ip/stream") +decoder.start() + +# Load inference model +model_repo.load_model("camera_main", "models/yolov8n.trt") + +# Process frames (everything on GPU) +frame_gpu = decoder.get_latest_frame(rgb=True) # torch.Tensor on CUDA + +# Preprocess (stays on GPU) +frame_gpu = frame_gpu.float() / 255.0 +frame_gpu = frame_gpu.unsqueeze(0) # Add batch dim + +# Inference (GPU-to-GPU, zero copy) +outputs = model_repo.infer( + model_id="camera_main", + inputs={"images": frame_gpu} +) + +# Post-process outputs (can stay on GPU) +# ... NMS, bounding boxes, etc. +``` + +### Concurrent Inference + +```python +import threading + +def process_camera(camera_id: str, model_id: str): + # Get frame from decoder (on GPU) + frame = decoder.get_latest_frame(rgb=True) + + # Inference automatically borrows/returns context from pool + outputs = repo.infer( + model_id=model_id, + inputs={"images": frame}, + timeout=10.0 # Wait for available context + ) + + # Process outputs... + +# Multiple threads can infer concurrently +threads = [] +for i in range(10): # 10 threads + t = threading.Thread( + target=process_camera, + args=(f"camera_{i}", f"camera_{i}") + ) + threads.append(t) + t.start() + +for t in threads: + t.join() + +# With 4 contexts: up to 4 inferences run in parallel +# Others wait in queue, contexts auto-balanced +``` + +## API Reference + +### TensorRTModelRepository + +#### `__init__(gpu_id=0, default_num_contexts=4)` +Initialize the repository. + +**Args:** +- `gpu_id`: GPU device ID +- `default_num_contexts`: Default context pool size per engine + +#### `load_model(model_id, file_path, num_contexts=None, force_reload=False)` +Load a TensorRT model. + +**Args:** +- `model_id`: Unique identifier (e.g., "camera_1") +- `file_path`: Path to .trt/.engine file +- `num_contexts`: Context pool size (None = use default) +- `force_reload`: Reload if model_id exists + +**Returns:** `ModelMetadata` + +**Deduplication:** If file hash matches existing model, reuses engine + contexts. + +#### `infer(model_id, inputs, synchronize=True, timeout=5.0)` +Run inference. 
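+
+A minimal call (sketch; the input tensor name is model-dependent, `images` here):
+
+```python
+frame_gpu = torch.rand(1, 3, 640, 640, device='cuda:0')  # stand-in for a decoded frame
+outputs = repo.infer("camera_1", {"images": frame_gpu}, synchronize=True, timeout=5.0)
+```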
+ +**Args:** +- `model_id`: Model identifier +- `inputs`: Dict mapping input names to CUDA tensors +- `synchronize`: Wait for completion +- `timeout`: Max wait time for context (seconds) + +**Returns:** Dict mapping output names to CUDA tensors + +**Thread-safe:** Borrows context from pool, returns after inference. + +#### `unload_model(model_id)` +Unload a model. + +If last reference to engine, fully unloads from VRAM. + +#### `get_metadata(model_id)` +Get model metadata. + +**Returns:** `ModelMetadata` or `None` + +#### `get_model_info(model_id)` +Get detailed model information. + +**Returns:** Dict with engine references, context pool size, shared model IDs, etc. + +#### `get_stats()` +Get repository statistics. + +**Returns:** Dict with total models, unique engines, contexts, memory efficiency. + +## Best Practices + +### 1. Set Appropriate Context Pool Size + +```python +# For 10 cameras with same model, 4 contexts is usually enough +repo = TensorRTModelRepository(default_num_contexts=4) + +# For high concurrency, increase pool size +repo = TensorRTModelRepository(default_num_contexts=8) +``` + +**Rule of thumb:** Start with 4 contexts, increase if you see timeout errors. + +### 2. Always Use GPU Tensors + +```python +# ✅ Good: Input on GPU +input_gpu = torch.rand(1, 3, 640, 640, device='cuda:0') +outputs = repo.infer(model_id, {"images": input_gpu}) + +# ❌ Bad: Input on CPU (will cause error) +input_cpu = torch.rand(1, 3, 640, 640) +outputs = repo.infer(model_id, {"images": input_cpu}) # ValueError! +``` + +### 3. Handle Timeout Gracefully + +```python +try: + outputs = repo.infer( + model_id="camera_1", + inputs=inputs, + timeout=5.0 + ) +except RuntimeError as e: + # All contexts busy, increase pool size or add backpressure + print(f"Inference timeout: {e}") +``` + +### 4. Use Same File for Deduplication + +```python +# ✅ Good: Same file path → deduplication +repo.load_model("cam1", "/models/yolo.trt") +repo.load_model("cam2", "/models/yolo.trt") # Shares engine! + +# ❌ Bad: Different paths (even if same content) → no deduplication +repo.load_model("cam1", "/models/yolo.trt") +repo.load_model("cam2", "/models/yolo_copy.trt") # Separate engine +``` + +## TensorRT Best Practices Implemented + +Based on NVIDIA documentation and web search findings: + +1. **Separate IExecutionContext per concurrent stream** ✅ + - Each context has its own CUDA stream + - Contexts never shared across threads simultaneously + +2. **Mutex-based context management** ✅ + - Queue-based borrowing with locks + - Thread-safe acquire/release pattern + +3. **GPU memory reuse** ✅ + - Engines shared by file hash + - Contexts pooled and reused + +4. **Zero-copy operations** ✅ + - All data stays in VRAM + - DLPack integration with PyTorch + +## Troubleshooting + +### "No execution context available within timeout" + +**Cause:** All contexts busy with concurrent inferences. + +**Solutions:** +1. Increase context pool size: + ```python + repo.load_model(model_id, file_path, num_contexts=8) + ``` +2. Increase timeout: + ```python + outputs = repo.infer(model_id, inputs, timeout=30.0) + ``` +3. Add backpressure/throttling to limit concurrent requests + +### Out of Memory (OOM) + +**Cause:** Too many unique engines or large context pools. + +**Solutions:** +1. Ensure deduplication working (same file paths) +2. Reduce context pool sizes +3. 
Use smaller models or quantization (INT8/FP16) + +### Import Error: "tensorrt could not be resolved" + +**Solution:** Install TensorRT: +```bash +pip install tensorrt +# Or use NVIDIA's wheel for your CUDA version +``` + +## Performance Tips + +1. **Batch Processing:** Process multiple frames before synchronizing + ```python + outputs = repo.infer(model_id, inputs, synchronize=False) + # ... more inferences ... + torch.cuda.synchronize() # Sync once at end + ``` + +2. **Async Inference:** Don't synchronize if not needed immediately + ```python + outputs = repo.infer(model_id, inputs, synchronize=False) + # GPU continues working, CPU continues + # Synchronize later when you need results + ``` + +3. **Monitor Context Utilization:** + ```python + stats = repo.get_stats() + print(f"Contexts: {stats['total_contexts']}") + + # If timeouts occur frequently, increase pool size + ``` + +## License + +Part of python-rtsp-worker project. diff --git a/services/__init__.py b/services/__init__.py new file mode 100644 index 0000000..307e411 --- /dev/null +++ b/services/__init__.py @@ -0,0 +1,14 @@ +""" +Services package for RTSP stream processing with GPU acceleration. +""" + +from .stream_decoder import StreamDecoderFactory, StreamDecoder, ConnectionStatus +from .jpeg_encoder import JPEGEncoderFactory, encode_frame_to_jpeg + +__all__ = [ + 'StreamDecoderFactory', + 'StreamDecoder', + 'ConnectionStatus', + 'JPEGEncoderFactory', + 'encode_frame_to_jpeg', +] diff --git a/services/jpeg_encoder.py b/services/jpeg_encoder.py new file mode 100644 index 0000000..e9e6df5 --- /dev/null +++ b/services/jpeg_encoder.py @@ -0,0 +1,91 @@ +""" +JPEG Encoder wrapper for GPU-accelerated JPEG encoding using nvImageCodec/nvJPEG. +Provides a shared encoder instance that can be used across multiple streams. +""" + +from typing import Optional +import torch +import nvidia.nvimgcodec as nvimgcodec + + +class JPEGEncoderFactory: + """ + Factory for creating and managing a shared JPEG encoder instance. + Thread-safe singleton pattern for efficient resource sharing. + """ + + _instance = None + _encoder = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super(JPEGEncoderFactory, cls).__new__(cls) + cls._encoder = nvimgcodec.Encoder() + print("JPEGEncoderFactory initialized with shared nvJPEG encoder") + return cls._instance + + @classmethod + def get_encoder(cls): + """Get the shared JPEG encoder instance""" + if cls._encoder is None: + cls() # Initialize if not already done + return cls._encoder + + +def encode_frame_to_jpeg(rgb_frame: torch.Tensor, quality: int = 95) -> Optional[bytes]: + """ + Encode an RGB frame to JPEG on GPU and return JPEG bytes. + + This function: + 1. Takes RGB frame from GPU (stays on GPU during encoding) + 2. Converts PyTorch tensor to nvImageCodec image via as_image() + 3. Encodes to JPEG using nvJPEG (GPU operation) + 4. Transfers only JPEG bytes to CPU + 5. 
Returns bytes for saving to disk + + Args: + rgb_frame: RGB tensor on GPU, shape (3, H, W) or (H, W, 3), dtype uint8 + quality: JPEG quality (0-100, default 95) + + Returns: + JPEG encoded bytes or None if encoding fails + """ + if rgb_frame is None: + return None + + try: + # Ensure we have (H, W, C) format and contiguous memory + if rgb_frame.dim() == 3: + if rgb_frame.shape[0] == 3: + # Convert from (C, H, W) to (H, W, C) + rgb_hwc = rgb_frame.permute(1, 2, 0).contiguous() + else: + # Already (H, W, C) + rgb_hwc = rgb_frame.contiguous() + else: + raise ValueError(f"Expected 3D tensor, got shape {rgb_frame.shape}") + + # Get shared encoder + encoder = JPEGEncoderFactory.get_encoder() + + # Create encode parameters with quality + # Quality is set via quality_value (0-100 scale) + jpeg_params = nvimgcodec.JpegEncodeParams(optimized_huffman=True) + encode_params = nvimgcodec.EncodeParams( + quality_value=float(quality), + jpeg_encode_params=jpeg_params + ) + + # Convert PyTorch GPU tensor to nvImageCodec image using __cuda_array_interface__ + # This is zero-copy - nvimgcodec reads directly from GPU memory + nv_image = nvimgcodec.as_image(rgb_hwc) + + # Encode to JPEG on GPU + # The encoding happens on GPU, only compressed JPEG bytes are transferred to CPU + jpeg_data = encoder.encode(nv_image, "jpeg", encode_params) + + return bytes(jpeg_data) + + except Exception as e: + print(f"Error encoding frame to JPEG: {e}") + return None diff --git a/services/model_repository.py b/services/model_repository.py new file mode 100644 index 0000000..ec50401 --- /dev/null +++ b/services/model_repository.py @@ -0,0 +1,631 @@ +import threading +import hashlib +from typing import Optional, Dict, Any, List, Tuple +from pathlib import Path +from queue import Queue +import torch +import tensorrt as trt +from dataclasses import dataclass + + +@dataclass +class ModelMetadata: + """Metadata for a loaded TensorRT model""" + file_path: str + file_hash: str + input_shapes: Dict[str, Tuple[int, ...]] + output_shapes: Dict[str, Tuple[int, ...]] + input_names: List[str] + output_names: List[str] + input_dtypes: Dict[str, torch.dtype] + output_dtypes: Dict[str, torch.dtype] + + +class ExecutionContext: + """ + Wrapper for TensorRT execution context with CUDA stream. + Used in context pool for load balancing. + """ + def __init__(self, context: trt.IExecutionContext, stream: torch.cuda.Stream, + context_id: int, device: torch.device): + self.context = context + self.stream = stream + self.context_id = context_id + self.device = device + self.in_use = False + self.lock = threading.Lock() + + def __repr__(self): + return f"ExecutionContext(id={self.context_id}, in_use={self.in_use})" + + +class SharedEngine: + """ + Shared TensorRT engine with context pool for load balancing. 
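+
+    Example (a minimal sketch of the borrow/return pattern used by the repository):
+        shared = SharedEngine(engine, file_hash, file_path,
+                              num_contexts=4, device=device, metadata=metadata)
+        exec_ctx = shared.acquire_context(timeout=5.0)
+        try:
+            ...  # run inference via exec_ctx.context on exec_ctx.stream
+        finally:
+            shared.release_context(exec_ctx)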
+ + Architecture: + - One engine shared across all model_ids with same file hash + - Pool of N execution contexts for concurrent inference + - Contexts are borrowed/returned using mutex locks + - Load balancing: contexts distributed across requests + """ + def __init__(self, engine: trt.ICudaEngine, file_hash: str, file_path: str, + num_contexts: int, device: torch.device, metadata: ModelMetadata): + self.engine = engine + self.file_hash = file_hash + self.file_path = file_path + self.metadata = metadata + self.device = device + self.num_contexts = num_contexts + + # Create context pool + self.context_pool: List[ExecutionContext] = [] + self.available_contexts: Queue[ExecutionContext] = Queue() + + for i in range(num_contexts): + ctx = engine.create_execution_context() + if ctx is None: + raise RuntimeError(f"Failed to create execution context {i}") + + stream = torch.cuda.Stream(device=device) + exec_ctx = ExecutionContext(ctx, stream, i, device) + self.context_pool.append(exec_ctx) + self.available_contexts.put(exec_ctx) + + # Model IDs referencing this engine + self.model_ids: set = set() + self.lock = threading.Lock() + + print(f"Created context pool with {num_contexts} contexts for engine {file_hash[:8]}...") + + def acquire_context(self, timeout: Optional[float] = None) -> Optional[ExecutionContext]: + """ + Acquire an available execution context from the pool. + Blocks if all contexts are in use. + + Args: + timeout: Max time to wait for context (None = wait forever) + + Returns: + ExecutionContext or None if timeout + """ + try: + exec_ctx = self.available_contexts.get(timeout=timeout) + with exec_ctx.lock: + exec_ctx.in_use = True + return exec_ctx + except: + return None + + def release_context(self, exec_ctx: ExecutionContext): + """ + Return a context to the pool. + + Args: + exec_ctx: Context to release + """ + with exec_ctx.lock: + exec_ctx.in_use = False + self.available_contexts.put(exec_ctx) + + def add_model_id(self, model_id: str): + """Add a model_id reference to this engine""" + with self.lock: + self.model_ids.add(model_id) + + def remove_model_id(self, model_id: str) -> int: + """ + Remove a model_id reference from this engine. + Returns the number of remaining references. + """ + with self.lock: + self.model_ids.discard(model_id) + return len(self.model_ids) + + def get_reference_count(self) -> int: + """Get number of model_ids using this engine""" + with self.lock: + return len(self.model_ids) + + def cleanup(self): + """Cleanup all contexts""" + for exec_ctx in self.context_pool: + del exec_ctx.context + self.context_pool.clear() + del self.engine + + +class TensorRTModelRepository: + """ + Thread-safe repository for TensorRT models with context pooling and deduplication. + + Architecture: + - Deduplication: Multiple model_ids with same file → share one engine + - Context Pool: Each unique engine has N execution contexts (configurable) + - Load Balancing: Contexts are borrowed/returned via mutex queue + - Scalability: Adding 100 cameras with same model = 1 engine + N contexts (not 100 contexts!) 
+ + Best Practices: + - GPU-to-GPU: All inputs/outputs stay in VRAM (zero CPU transfers) + - Thread Safety: Mutex-based context borrowing (TensorRT best practice) + - Memory Efficient: Deduplicate by file hash, share engine across model_ids + - Concurrent: N contexts allow N parallel inferences per unique model + + Example: + # 100 cameras, same model file + for i in range(100): + repo.load_model(f"camera_{i}", "yolov8.trt") + # Result: 1 engine in VRAM, N contexts (e.g., 4), not 100 contexts! + """ + + def __init__(self, gpu_id: int = 0, default_num_contexts: int = 4): + """ + Initialize the model repository. + + Args: + gpu_id: GPU device ID to use + default_num_contexts: Default number of execution contexts per unique engine + """ + self.gpu_id = gpu_id + self.device = torch.device(f'cuda:{gpu_id}') + self.default_num_contexts = default_num_contexts + + # Model ID to engine mapping: model_id -> file_hash + self._model_to_hash: Dict[str, str] = {} + + # Shared engines with context pools: file_hash -> SharedEngine + self._shared_engines: Dict[str, SharedEngine] = {} + + # Locks for thread safety + self._repo_lock = threading.RLock() + + # TensorRT logger + self.trt_logger = trt.Logger(trt.Logger.WARNING) + + print(f"TensorRT Model Repository initialized on GPU {gpu_id}") + print(f"Default context pool size: {default_num_contexts} contexts per unique model") + + @staticmethod + def compute_file_hash(file_path: str) -> str: + """ + Compute SHA256 hash of a file for deduplication. + + Args: + file_path: Path to the file + + Returns: + Hexadecimal hash string + """ + sha256_hash = hashlib.sha256() + with open(file_path, "rb") as f: + # Read in chunks to handle large files efficiently + for byte_block in iter(lambda: f.read(65536), b""): + sha256_hash.update(byte_block) + return sha256_hash.hexdigest() + + def _load_engine(self, file_path: str) -> trt.ICudaEngine: + """ + Load TensorRT engine from file. + + Args: + file_path: Path to .trt or .engine file + + Returns: + TensorRT engine + """ + runtime = trt.Runtime(self.trt_logger) + + with open(file_path, 'rb') as f: + engine_data = f.read() + + engine = runtime.deserialize_cuda_engine(engine_data) + if engine is None: + raise RuntimeError(f"Failed to load TensorRT engine from {file_path}") + + return engine + + def _extract_metadata(self, engine: trt.ICudaEngine, + file_path: str, file_hash: str) -> ModelMetadata: + """ + Extract metadata from TensorRT engine. 
+ + Args: + engine: TensorRT engine + file_path: Path to model file + file_hash: SHA256 hash of model file + + Returns: + ModelMetadata object + """ + input_shapes = {} + output_shapes = {} + input_names = [] + output_names = [] + input_dtypes = {} + output_dtypes = {} + + # TensorRT dtype to PyTorch dtype mapping + trt_to_torch_dtype = { + trt.DataType.FLOAT: torch.float32, + trt.DataType.HALF: torch.float16, + trt.DataType.INT8: torch.int8, + trt.DataType.INT32: torch.int32, + trt.DataType.BOOL: torch.bool, + } + + # Iterate through all tensors (inputs and outputs) - TensorRT 10.x API + for i in range(engine.num_io_tensors): + name = engine.get_tensor_name(i) + shape = tuple(engine.get_tensor_shape(name)) + dtype = trt_to_torch_dtype.get(engine.get_tensor_dtype(name), torch.float32) + mode = engine.get_tensor_mode(name) + + if mode == trt.TensorIOMode.INPUT: + input_names.append(name) + input_shapes[name] = shape + input_dtypes[name] = dtype + else: + output_names.append(name) + output_shapes[name] = shape + output_dtypes[name] = dtype + + return ModelMetadata( + file_path=file_path, + file_hash=file_hash, + input_shapes=input_shapes, + output_shapes=output_shapes, + input_names=input_names, + output_names=output_names, + input_dtypes=input_dtypes, + output_dtypes=output_dtypes + ) + + def load_model(self, model_id: str, file_path: str, + num_contexts: Optional[int] = None, + force_reload: bool = False) -> ModelMetadata: + """ + Load a TensorRT model with the given ID. + + Deduplication: If a model with the same file hash is already loaded, the model_id + is simply mapped to the existing SharedEngine (no new engine or contexts created). + + Args: + model_id: User-defined identifier for this model (e.g., "camera_1") + file_path: Path to TensorRT engine file (.trt or .engine) + num_contexts: Number of execution contexts in pool (None = use default) + force_reload: If True, reload even if model_id exists + + Returns: + ModelMetadata for the loaded model + + Raises: + FileNotFoundError: If model file doesn't exist + RuntimeError: If engine loading fails + ValueError: If model_id already exists and force_reload is False + """ + file_path = str(Path(file_path).resolve()) + + if not Path(file_path).exists(): + raise FileNotFoundError(f"Model file not found: {file_path}") + + if num_contexts is None: + num_contexts = self.default_num_contexts + + with self._repo_lock: + # Check if model_id already exists + if model_id in self._model_to_hash and not force_reload: + raise ValueError( + f"Model ID '{model_id}' already exists. " + f"Use force_reload=True to reload or choose a different ID." 
+ ) + + # Unload existing model if force_reload + if model_id in self._model_to_hash and force_reload: + self.unload_model(model_id) + + # Compute file hash for deduplication + print(f"Computing hash for {file_path}...") + file_hash = self.compute_file_hash(file_path) + print(f"File hash: {file_hash[:16]}...") + + # Check if this file is already loaded (deduplication) + if file_hash in self._shared_engines: + shared_engine = self._shared_engines[file_hash] + print(f"Engine already loaded (hash match), reusing engine and context pool...") + print(f" Existing model_ids using this engine: {shared_engine.model_ids}") + else: + # Load new engine + print(f"Loading TensorRT engine from {file_path}...") + engine = self._load_engine(file_path) + + # Extract metadata + metadata = self._extract_metadata(engine, file_path, file_hash) + + # Create shared engine with context pool + shared_engine = SharedEngine( + engine=engine, + file_hash=file_hash, + file_path=file_path, + num_contexts=num_contexts, + device=self.device, + metadata=metadata + ) + self._shared_engines[file_hash] = shared_engine + + # Add this model_id to the shared engine + shared_engine.add_model_id(model_id) + + # Map model_id to file_hash + self._model_to_hash[model_id] = file_hash + + print(f"Model '{model_id}' loaded successfully") + print(f" Inputs: {shared_engine.metadata.input_names}") + for name in shared_engine.metadata.input_names: + print(f" {name}: {shared_engine.metadata.input_shapes[name]} ({shared_engine.metadata.input_dtypes[name]})") + print(f" Outputs: {shared_engine.metadata.output_names}") + for name in shared_engine.metadata.output_names: + print(f" {name}: {shared_engine.metadata.output_shapes[name]} ({shared_engine.metadata.output_dtypes[name]})") + print(f" Context pool size: {num_contexts}") + print(f" Model IDs sharing this engine: {shared_engine.get_reference_count()}") + print(f" Unique engines in VRAM: {len(self._shared_engines)}") + + return shared_engine.metadata + + def infer(self, model_id: str, inputs: Dict[str, torch.Tensor], + synchronize: bool = True, timeout: Optional[float] = 5.0) -> Dict[str, torch.Tensor]: + """ + Run GPU-to-GPU inference with the specified model using context pooling. + + All inputs must be CUDA tensors and outputs will be CUDA tensors (stays in VRAM). + Thread-safe: Borrows an execution context from the pool with mutex locking. + + Args: + model_id: Model identifier + inputs: Dictionary mapping input names to CUDA tensors + synchronize: If True, wait for inference to complete. If False, async execution. + timeout: Max time to wait for available context (seconds) + + Returns: + Dictionary mapping output names to CUDA tensors (in VRAM) + + Raises: + KeyError: If model_id not found + ValueError: If inputs don't match expected shapes or are not on GPU + RuntimeError: If no context available within timeout + """ + # Get shared engine + if model_id not in self._model_to_hash: + raise KeyError(f"Model '{model_id}' not found. 
Available: {list(self._model_to_hash.keys())}") + + file_hash = self._model_to_hash[model_id] + shared_engine = self._shared_engines[file_hash] + metadata = shared_engine.metadata + + # Validate inputs + for name in metadata.input_names: + if name not in inputs: + raise ValueError(f"Missing required input: {name}") + + tensor = inputs[name] + if not tensor.is_cuda: + raise ValueError(f"Input '{name}' must be a CUDA tensor (on GPU)") + + # Check device + if tensor.device != self.device: + print(f"Warning: Input '{name}' on {tensor.device}, moving to {self.device}") + inputs[name] = tensor.to(self.device) + + # Acquire context from pool (mutex-based) + exec_ctx = shared_engine.acquire_context(timeout=timeout) + if exec_ctx is None: + raise RuntimeError( + f"No execution context available for model '{model_id}' within {timeout}s. " + f"All {shared_engine.num_contexts} contexts are busy." + ) + + try: + # Prepare output tensors + outputs = {} + + # Set input tensors - TensorRT 10.x API + for name in metadata.input_names: + input_tensor = inputs[name].contiguous() + exec_ctx.context.set_tensor_address(name, input_tensor.data_ptr()) + + # Allocate and set output tensors + for name in metadata.output_names: + output_shape = metadata.output_shapes[name] + output_dtype = metadata.output_dtypes[name] + + output_tensor = torch.empty( + output_shape, + dtype=output_dtype, + device=self.device + ) + + outputs[name] = output_tensor + exec_ctx.context.set_tensor_address(name, output_tensor.data_ptr()) + + # Execute inference on context's stream - TensorRT 10.x API + with torch.cuda.stream(exec_ctx.stream): + success = exec_ctx.context.execute_async_v3( + stream_handle=exec_ctx.stream.cuda_stream + ) + + if not success: + raise RuntimeError(f"Inference failed for model '{model_id}'") + + # Synchronize if requested + if synchronize: + exec_ctx.stream.synchronize() + + return outputs + + finally: + # Always release context back to pool + shared_engine.release_context(exec_ctx) + + def infer_batch(self, model_id: str, batch_inputs: List[Dict[str, torch.Tensor]], + synchronize: bool = True) -> List[Dict[str, torch.Tensor]]: + """ + Run inference on multiple inputs. + Contexts are borrowed/returned for each input, enabling parallel processing. + + Args: + model_id: Model identifier + batch_inputs: List of input dictionaries + synchronize: If True, wait for all inferences to complete + + Returns: + List of output dictionaries + """ + results = [] + for inputs in batch_inputs: + outputs = self.infer(model_id, inputs, synchronize=synchronize) + results.append(outputs) + + return results + + def unload_model(self, model_id: str): + """ + Unload a model from the repository. + + Removes the model_id reference from the shared engine. If this was the last + reference, the engine and all its contexts will be fully unloaded from VRAM. 
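+        Safe for the multi-camera pattern: engines are reference-counted per
+        file hash, so unloading one camera's model_id leaves an engine that
+        other model_ids still share untouched.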
+ + Args: + model_id: Model identifier to unload + """ + with self._repo_lock: + if model_id not in self._model_to_hash: + print(f"Warning: Model '{model_id}' not found") + return + + file_hash = self._model_to_hash[model_id] + + # Remove model_id from shared engine + if file_hash in self._shared_engines: + shared_engine = self._shared_engines[file_hash] + remaining_refs = shared_engine.remove_model_id(model_id) + + # If no more references, cleanup engine and contexts + if remaining_refs == 0: + shared_engine.cleanup() + del self._shared_engines[file_hash] + print(f"Model '{model_id}' unloaded, engine removed from VRAM (0 references)") + else: + print(f"Model '{model_id}' unloaded, engine kept in VRAM ({remaining_refs} references)") + + # Remove from model_id mapping + del self._model_to_hash[model_id] + + def get_metadata(self, model_id: str) -> Optional[ModelMetadata]: + """ + Get metadata for a loaded model. + + Args: + model_id: Model identifier + + Returns: + ModelMetadata or None if not found + """ + if model_id not in self._model_to_hash: + return None + + file_hash = self._model_to_hash[model_id] + if file_hash not in self._shared_engines: + return None + + return self._shared_engines[file_hash].metadata + + def list_models(self) -> Dict[str, ModelMetadata]: + """ + List all loaded models. + + Returns: + Dictionary mapping model_id to ModelMetadata + """ + with self._repo_lock: + result = {} + for model_id, file_hash in self._model_to_hash.items(): + if file_hash in self._shared_engines: + result[model_id] = self._shared_engines[file_hash].metadata + return result + + def get_model_info(self, model_id: str) -> Optional[Dict[str, Any]]: + """ + Get detailed information about a loaded model. + + Args: + model_id: Model identifier + + Returns: + Dictionary with model information or None if not found + """ + if model_id not in self._model_to_hash: + return None + + file_hash = self._model_to_hash[model_id] + if file_hash not in self._shared_engines: + return None + + shared_engine = self._shared_engines[file_hash] + metadata = shared_engine.metadata + + return { + 'model_id': model_id, + 'file_path': metadata.file_path, + 'file_hash': metadata.file_hash[:16] + '...', + 'engine_references': shared_engine.get_reference_count(), + 'context_pool_size': shared_engine.num_contexts, + 'shared_with_model_ids': list(shared_engine.model_ids), + 'inputs': { + name: { + 'shape': metadata.input_shapes[name], + 'dtype': str(metadata.input_dtypes[name]) + } + for name in metadata.input_names + }, + 'outputs': { + name: { + 'shape': metadata.output_shapes[name], + 'dtype': str(metadata.output_dtypes[name]) + } + for name in metadata.output_names + } + } + + def get_stats(self) -> Dict[str, Any]: + """ + Get repository statistics. 
+ + Returns: + Dictionary with stats about loaded models and memory usage + """ + with self._repo_lock: + total_contexts = sum( + engine.num_contexts + for engine in self._shared_engines.values() + ) + + return { + 'total_model_ids': len(self._model_to_hash), + 'unique_engines': len(self._shared_engines), + 'total_contexts': total_contexts, + 'memory_efficiency': f"{len(self._model_to_hash)} model IDs using only {len(self._shared_engines)} engines", + 'gpu_id': self.gpu_id, + 'models': list(self._model_to_hash.keys()) + } + + def __repr__(self): + with self._repo_lock: + return (f"TensorRTModelRepository(gpu={self.gpu_id}, " + f"model_ids={len(self._model_to_hash)}, " + f"unique_engines={len(self._shared_engines)})") + + def __del__(self): + """Cleanup all models on deletion""" + with self._repo_lock: + model_ids = list(self._model_to_hash.keys()) + for model_id in model_ids: + self.unload_model(model_id) diff --git a/services/stream_decoder.py b/services/stream_decoder.py new file mode 100644 index 0000000..327ab9a --- /dev/null +++ b/services/stream_decoder.py @@ -0,0 +1,481 @@ +import threading +from typing import Optional +from collections import deque +from enum import Enum +import torch +import PyNvVideoCodec as nvc +import av +import numpy as np +from cuda.bindings import driver as cuda_driver +from .jpeg_encoder import encode_frame_to_jpeg + + +def nv12_to_rgb_gpu(nv12_tensor: torch.Tensor, height: int, width: int) -> torch.Tensor: + """ + Convert NV12 format to RGB on GPU using PyTorch operations. + + NV12 format: + - Y plane: height x width (luminance) + - UV plane: (height/2) x width (interleaved U and V, subsampled by 2) + + Total tensor size: (height * 3/2) x width + + Args: + nv12_tensor: Input tensor in NV12 format, shape (H*3/2, W) + height: Original frame height + width: Original frame width + + Returns: + RGB tensor, shape (3, H, W) in range [0, 255] + """ + device = nv12_tensor.device + + # Split Y and UV planes + y_plane = nv12_tensor[:height, :].float() # (H, W) + uv_plane = nv12_tensor[height:, :].float() # (H/2, W) + + # Reshape UV plane to separate U and V channels + # UV is interleaved: U0V0U1V1... 
we need to deinterleave + uv_plane = uv_plane.reshape(height // 2, width // 2, 2) # (H/2, W/2, 2) + u_plane = uv_plane[:, :, 0] # (H/2, W/2) + v_plane = uv_plane[:, :, 1] # (H/2, W/2) + + # Upsample U and V to full resolution using bilinear interpolation + u_upsampled = torch.nn.functional.interpolate( + u_plane.unsqueeze(0).unsqueeze(0), # (1, 1, H/2, W/2) + size=(height, width), + mode='bilinear', + align_corners=False + ).squeeze(0).squeeze(0) # (H, W) + + v_upsampled = torch.nn.functional.interpolate( + v_plane.unsqueeze(0).unsqueeze(0), # (1, 1, H/2, W/2) + size=(height, width), + mode='bilinear', + align_corners=False + ).squeeze(0).squeeze(0) # (H, W) + + # YUV to RGB conversion using BT.601 standard + # R = Y + 1.402 * (V - 128) + # G = Y - 0.344136 * (U - 128) - 0.714136 * (V - 128) + # B = Y + 1.772 * (U - 128) + + y = y_plane + u = u_upsampled - 128.0 + v = v_upsampled - 128.0 + + r = y + 1.402 * v + g = y - 0.344136 * u - 0.714136 * v + b = y + 1.772 * u + + # Clamp to [0, 255] and convert to uint8 + r = torch.clamp(r, 0, 255).to(torch.uint8) + g = torch.clamp(g, 0, 255).to(torch.uint8) + b = torch.clamp(b, 0, 255).to(torch.uint8) + + # Stack to (3, H, W) + rgb = torch.stack([r, g, b], dim=0) + + return rgb + + +class ConnectionStatus(Enum): + DISCONNECTED = "disconnected" + CONNECTING = "connecting" + CONNECTED = "connected" + ERROR = "error" + RECONNECTING = "reconnecting" + + +class StreamDecoderFactory: + """ + Factory for creating StreamDecoder instances with shared CUDA context. + This minimizes VRAM overhead by sharing the CUDA context across all decoders. + """ + + _instance = None + _lock = threading.Lock() + + def __new__(cls, gpu_id: int = 0): + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = super(StreamDecoderFactory, cls).__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self, gpu_id: int = 0): + if self._initialized: + return + + self.gpu_id = gpu_id + + # Initialize CUDA and get device + err, = cuda_driver.cuInit(0) + if err != cuda_driver.CUresult.CUDA_SUCCESS: + raise RuntimeError(f"Failed to initialize CUDA: {err}") + + # Get CUDA device + err, self.cuda_device = cuda_driver.cuDeviceGet(gpu_id) + if err != cuda_driver.CUresult.CUDA_SUCCESS: + raise RuntimeError(f"Failed to get CUDA device {gpu_id}: {err}") + + # Retain primary context (shared across all decoders) + err, self.cuda_context = cuda_driver.cuDevicePrimaryCtxRetain(self.cuda_device) + if err != cuda_driver.CUresult.CUDA_SUCCESS: + raise RuntimeError(f"Failed to retain CUDA primary context: {err}") + + self._initialized = True + print(f"StreamDecoderFactory initialized with shared CUDA context on GPU {gpu_id}") + + def create_decoder(self, rtsp_url: str, buffer_size: int = 30, + codec: str = "h264") -> 'StreamDecoder': + """ + Create a new StreamDecoder instance with shared CUDA context. + + Args: + rtsp_url: RTSP stream URL + buffer_size: Number of frames to buffer in VRAM + codec: Video codec (h264, hevc, etc.) + + Returns: + StreamDecoder instance + """ + return StreamDecoder( + rtsp_url=rtsp_url, + cuda_context=self.cuda_context, + gpu_id=self.gpu_id, + buffer_size=buffer_size, + codec=codec + ) + + def __del__(self): + """Cleanup shared CUDA context on factory destruction""" + if hasattr(self, 'cuda_device') and hasattr(self, 'gpu_id'): + cuda_driver.cuDevicePrimaryCtxRelease(self.cuda_device) + + +class StreamDecoder: + """ + Decodes RTSP stream using NVDEC and maintains a ring buffer of frames in GPU VRAM. 
+ Thread-safe for concurrent read/write operations. + """ + + def __init__(self, rtsp_url: str, cuda_context, gpu_id: int, + buffer_size: int = 30, codec: str = "h264"): + """ + Initialize StreamDecoder. + + Args: + rtsp_url: RTSP stream URL + cuda_context: Shared CUDA context handle + gpu_id: GPU device ID + buffer_size: Number of frames to keep in ring buffer + codec: Video codec type + """ + self.rtsp_url = rtsp_url + self.cuda_context = cuda_context + self.gpu_id = gpu_id + self.buffer_size = buffer_size + self.codec = codec + + # Connection status + self.status = ConnectionStatus.DISCONNECTED + self._status_lock = threading.Lock() + + # Frame buffer (ring buffer) - stores CUDA device pointers + self.frame_buffer = deque(maxlen=buffer_size) + self._buffer_lock = threading.RLock() + + # Decoder and container instances + self.decoder = None + self.container = None + + # Decode thread + self._decode_thread: Optional[threading.Thread] = None + self._stop_flag = threading.Event() + + # Frame metadata + self.frame_width: Optional[int] = None + self.frame_height: Optional[int] = None + self.frame_count: int = 0 + + def start(self): + """Start the RTSP stream decoding in background thread""" + if self._decode_thread is not None and self._decode_thread.is_alive(): + print(f"Decoder already running for {self.rtsp_url}") + return + + self._stop_flag.clear() + self._decode_thread = threading.Thread(target=self._decode_loop, daemon=True) + self._decode_thread.start() + print(f"Started decoder thread for {self.rtsp_url}") + + def stop(self): + """Stop the decoding thread and cleanup resources""" + self._stop_flag.set() + if self._decode_thread is not None: + self._decode_thread.join(timeout=5.0) + self._cleanup() + print(f"Stopped decoder for {self.rtsp_url}") + + def _set_status(self, status: ConnectionStatus): + """Thread-safe status update""" + with self._status_lock: + self.status = status + + def get_status(self) -> ConnectionStatus: + """Get current connection status""" + with self._status_lock: + return self.status + + def _init_rtsp_connection(self) -> bool: + """Initialize RTSP connection using PyAV + PyNvVideoCodec""" + try: + self._set_status(ConnectionStatus.CONNECTING) + + # Open RTSP stream with PyAV + options = { + 'rtsp_transport': 'tcp', + 'max_delay': '500000', # 500ms + 'rtsp_flags': 'prefer_tcp', + 'timeout': '5000000', # 5 seconds + } + + self.container = av.open(self.rtsp_url, options=options) + + # Get video stream + video_stream = self.container.streams.video[0] + self.frame_width = video_stream.width + self.frame_height = video_stream.height + + print(f"RTSP connected: {self.frame_width}x{self.frame_height}") + + # Map codec name to PyNvVideoCodec codec enum + codec_map = { + 'h264': nvc.cudaVideoCodec.H264, + 'hevc': nvc.cudaVideoCodec.HEVC, + 'h265': nvc.cudaVideoCodec.HEVC, + } + + codec_id = codec_map.get(self.codec.lower(), nvc.cudaVideoCodec.H264) + + # Initialize NVDEC decoder with shared CUDA context + self.decoder = nvc.CreateDecoder( + gpuid=self.gpu_id, + codec=codec_id, + cudacontext=self.cuda_context, + usedevicememory=True + ) + + self._set_status(ConnectionStatus.CONNECTED) + return True + + except Exception as e: + print(f"Failed to connect to RTSP stream {self.rtsp_url}: {e}") + self._set_status(ConnectionStatus.ERROR) + return False + + def _decode_loop(self): + """Main decode loop running in background thread""" + retry_count = 0 + max_retries = 5 + + while not self._stop_flag.is_set(): + # Initialize connection + if not self._init_rtsp_connection(): + 
retry_count += 1 + if retry_count >= max_retries: + print(f"Max retries reached for {self.rtsp_url}") + self._set_status(ConnectionStatus.ERROR) + break + + self._set_status(ConnectionStatus.RECONNECTING) + self._stop_flag.wait(timeout=2.0) + continue + + retry_count = 0 # Reset on successful connection + + try: + # Decode loop - iterate through packets from PyAV + for packet in self.container.demux(video=0): + if self._stop_flag.is_set(): + break + + if packet.dts is None: + continue + + # Convert packet to numpy array + packet_data = np.frombuffer(bytes(packet), dtype=np.uint8) + + # Create PacketData and pass numpy array pointer + pkt = nvc.PacketData() + pkt.bsl_data = packet_data.ctypes.data + pkt.bsl = len(packet_data) + + # Decode using NVDEC + decoded_frames = self.decoder.Decode(pkt) + + if not decoded_frames: + continue + + # Add frames to ring buffer (thread-safe) + with self._buffer_lock: + for frame in decoded_frames: + self.frame_buffer.append(frame) + self.frame_count += 1 + + except Exception as e: + print(f"Error in decode loop for {self.rtsp_url}: {e}") + self._set_status(ConnectionStatus.RECONNECTING) + self._cleanup() + self._stop_flag.wait(timeout=2.0) + + def _cleanup(self): + """Cleanup resources""" + if self.container: + try: + self.container.close() + except: + pass + self.container = None + + self.decoder = None + + with self._buffer_lock: + self.frame_buffer.clear() + + def get_frame(self, index: int = -1, rgb: bool = True) -> Optional[torch.Tensor]: + """ + Get a frame from the buffer as a CUDA tensor (in VRAM). + + Args: + index: Frame index in buffer (-1 for latest, -2 for second latest, etc.) + rgb: If True, convert NV12 to RGB. If False, return raw NV12 format. + + Returns: + torch.Tensor in CUDA memory (device tensor) or None if buffer empty + - If rgb=True: Shape (3, H, W) in RGB format, dtype uint8 + - If rgb=False: Shape (H*3/2, W) in NV12 format, dtype uint8 + """ + with self._buffer_lock: + if len(self.frame_buffer) == 0: + return None + + try: + decoded_frame = self.frame_buffer[index] + + # Convert DecodedFrame to PyTorch tensor using DLPack (zero-copy) + # This keeps the data in GPU memory + nv12_tensor = torch.from_dlpack(decoded_frame) + + if not rgb: + # Return raw NV12 format + return nv12_tensor + + # Convert NV12 to RGB on GPU + if self.frame_height is None or self.frame_width is None: + print("Frame dimensions not available") + return None + + rgb_tensor = nv12_to_rgb_gpu(nv12_tensor, self.frame_height, self.frame_width) + return rgb_tensor + + except (IndexError, Exception) as e: + print(f"Error getting frame: {e}") + return None + + def get_latest_frame(self, rgb: bool = True) -> Optional[torch.Tensor]: + """ + Get the most recent decoded frame as CUDA tensor. + + Args: + rgb: If True, convert to RGB. If False, return raw NV12. + + Returns: + torch.Tensor on GPU in RGB (3, H, W) or NV12 (H*3/2, W) format + """ + return self.get_frame(-1, rgb=rgb) + + def get_frame_cpu(self, index: int = -1, rgb: bool = True) -> Optional[np.ndarray]: + """ + Get a frame from the buffer and copy it to CPU memory as numpy array. + + Args: + index: Frame index in buffer (-1 for latest, -2 for second latest, etc.) + rgb: If True, convert NV12 to RGB. If False, return raw NV12 format. 
+ + Returns: + numpy.ndarray in CPU memory or None if buffer empty + - If rgb=True: Shape (H, W, 3) in RGB format, dtype uint8 (HWC format for easy display) + - If rgb=False: Shape (H*3/2, W) in NV12 format, dtype uint8 + """ + # Get frame on GPU + gpu_frame = self.get_frame(index=index, rgb=rgb) + + if gpu_frame is None: + return None + + # Transfer from GPU to CPU + cpu_tensor = gpu_frame.cpu() + + # Convert to numpy array + if rgb: + # Convert from (3, H, W) to (H, W, 3) for standard image format + cpu_array = cpu_tensor.permute(1, 2, 0).numpy() + else: + # Keep NV12 format as-is + cpu_array = cpu_tensor.numpy() + + return cpu_array + + def get_latest_frame_cpu(self, rgb: bool = True) -> Optional[np.ndarray]: + """ + Get the most recent decoded frame as CPU numpy array. + + Args: + rgb: If True, convert to RGB. If False, return raw NV12. + + Returns: + numpy.ndarray in CPU memory + - If rgb=True: Shape (H, W, 3) in RGB format, dtype uint8 + - If rgb=False: Shape (H*3/2, W) in NV12 format, dtype uint8 + """ + return self.get_frame_cpu(-1, rgb=rgb) + + def get_buffer_size(self) -> int: + """Get current number of frames in buffer""" + with self._buffer_lock: + return len(self.frame_buffer) + + def is_connected(self) -> bool: + """Check if stream is actively connected""" + return self.get_status() == ConnectionStatus.CONNECTED + + def get_frame_as_jpeg(self, index: int = -1, quality: int = 95) -> Optional[bytes]: + """ + Get a frame from the buffer and encode to JPEG. + + This method: + 1. Gets RGB frame from buffer (stays on GPU) + 2. Encodes to JPEG using nvJPEG (GPU operation via shared encoder) + 3. Transfers JPEG bytes to CPU + 4. Returns bytes for saving to disk + + Args: + index: Frame index in buffer (-1 for latest) + quality: JPEG quality (0-100, default 95) + + Returns: + JPEG encoded bytes or None if frame unavailable + """ + # Get RGB frame (on GPU) + rgb_frame = self.get_frame(index=index, rgb=True) + + # Use the shared JPEG encoder from jpeg_encoder module + return encode_frame_to_jpeg(rgb_frame, quality=quality) + + def __repr__(self): + return (f"StreamDecoder(url={self.rtsp_url}, status={self.status.value}, " + f"buffer={self.get_buffer_size()}/{self.buffer_size}, " + f"frames_decoded={self.frame_count})") diff --git a/test_jpeg_encode.py b/test_jpeg_encode.py new file mode 100755 index 0000000..8e35145 --- /dev/null +++ b/test_jpeg_encode.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python3 +""" +Test script for JPEG encoding with nvImageCodec +Tests GPU-accelerated JPEG encoding from RTSP stream frames +""" + +import argparse +import sys +import time +import os +from pathlib import Path +from dotenv import load_dotenv +from services import StreamDecoderFactory + +# Load environment variables from .env file +load_dotenv() + + +def main(): + parser = argparse.ArgumentParser(description='Test JPEG encoding from RTSP stream') + parser.add_argument( + '--rtsp-url', + type=str, + default=None, + help='RTSP stream URL (defaults to CAMERA_URL_1 from .env)' + ) + parser.add_argument( + '--output-dir', + type=str, + default='./snapshots', + help='Output directory for JPEG files' + ) + parser.add_argument( + '--num-frames', + type=int, + default=10, + help='Number of frames to capture' + ) + parser.add_argument( + '--interval', + type=float, + default=1.0, + help='Interval between captures in seconds' + ) + parser.add_argument( + '--quality', + type=int, + default=95, + help='JPEG quality (0-100)' + ) + parser.add_argument( + '--gpu-id', + type=int, + default=0, + help='GPU device ID' + ) + 
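+    # Example invocation (illustrative; when --rtsp-url is omitted the script
+    # falls back to CAMERA_URL_1 from .env):
+    #   python test_jpeg_encode.py --num-frames 5 --interval 0.5 --quality 90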
+ args = parser.parse_args() + + # Get RTSP URL from command line or environment + rtsp_url = args.rtsp_url + if not rtsp_url: + rtsp_url = os.getenv('CAMERA_URL_1') + if not rtsp_url: + print("Error: No RTSP URL provided") + print("Please either:") + print(" 1. Use --rtsp-url argument, or") + print(" 2. Add CAMERA_URL_1 to your .env file") + sys.exit(1) + + # Create output directory + output_dir = Path(args.output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + + print("=" * 80) + print("RTSP Stream JPEG Encoding Test") + print("=" * 80) + print(f"RTSP URL: {rtsp_url}") + print(f"Output Directory: {output_dir}") + print(f"Number of Frames: {args.num_frames}") + print(f"Capture Interval: {args.interval}s") + print(f"JPEG Quality: {args.quality}") + print(f"GPU ID: {args.gpu_id}") + print("=" * 80) + print() + + try: + # Initialize factory and decoder + print("[1/3] Initializing StreamDecoderFactory...") + factory = StreamDecoderFactory(gpu_id=args.gpu_id) + print("✓ Factory initialized\n") + + print("[2/3] Creating and starting decoder...") + decoder = factory.create_decoder( + rtsp_url=rtsp_url, + buffer_size=30 + ) + decoder.start() + print("✓ Decoder started\n") + + # Wait for connection + print("[3/3] Waiting for stream to connect...") + max_wait = 10 + for i in range(max_wait): + if decoder.is_connected(): + print("✓ Stream connected\n") + break + time.sleep(1) + print(f" Waiting... {i+1}/{max_wait}s") + else: + print("✗ Failed to connect to stream") + sys.exit(1) + + # Capture frames + print(f"Capturing {args.num_frames} frames...") + print("-" * 80) + + captured = 0 + for i in range(args.num_frames): + # Get frame as JPEG + start_time = time.time() + jpeg_bytes = decoder.get_frame_as_jpeg(quality=args.quality) + encode_time = (time.time() - start_time) * 1000 # ms + + if jpeg_bytes: + # Save to file + filename = output_dir / f"frame_{i:04d}.jpg" + with open(filename, 'wb') as f: + f.write(jpeg_bytes) + + size_kb = len(jpeg_bytes) / 1024 + print(f"[{i+1}/{args.num_frames}] Saved {filename.name} " + f"({size_kb:.1f} KB, encoded in {encode_time:.2f}ms)") + captured += 1 + else: + print(f"[{i+1}/{args.num_frames}] Failed to get frame") + + # Wait before next capture (except for last frame) + if i < args.num_frames - 1: + time.sleep(args.interval) + + print("-" * 80) + + # Summary + print("\n" + "=" * 80) + print("Capture Complete") + print("=" * 80) + print(f"Successfully captured: {captured}/{args.num_frames} frames") + print(f"Output directory: {output_dir.absolute()}") + print("=" * 80) + + except KeyboardInterrupt: + print("\n\n✗ Interrupted by user") + sys.exit(1) + + except Exception as e: + print(f"\n\n✗ Error: {e}") + import traceback + traceback.print_exc() + sys.exit(1) + + finally: + # Cleanup + if 'decoder' in locals(): + print("\nCleaning up...") + decoder.stop() + print("✓ Decoder stopped") + + print("\n✓ Test completed successfully") + sys.exit(0) + + +if __name__ == '__main__': + main() diff --git a/test_model_inference.py b/test_model_inference.py new file mode 100644 index 0000000..ea6d075 --- /dev/null +++ b/test_model_inference.py @@ -0,0 +1,310 @@ +""" +Test script for TensorRT Model Repository with multi-camera inference. + +This demonstrates: +1. Loading the same model for multiple cameras (deduplication) +2. Context pool load balancing +3. GPU-to-GPU inference from RTSP streams +4. 
Memory efficiency with shared engines +""" + +import time +import torch +from services.model_repository import TensorRTModelRepository +from services.stream_decoder import StreamDecoderFactory + + +def test_multi_camera_inference(): + """ + Simulate multi-camera inference scenario. + + Example: 100 cameras, all using the same YOLOv8 model + - Without pooling: 100 engines + 100 contexts in VRAM + - With pooling: 1 engine + 4 contexts in VRAM (huge savings!) + """ + + # Initialize model repository with context pooling + repo = TensorRTModelRepository(gpu_id=0, default_num_contexts=4) + + # Camera configurations (simulated) + camera_configs = [ + {"id": "camera_1", "rtsp_url": "rtsp://camera1.local/stream"}, + {"id": "camera_2", "rtsp_url": "rtsp://camera2.local/stream"}, + {"id": "camera_3", "rtsp_url": "rtsp://camera3.local/stream"}, + # ... imagine 100 cameras here + ] + + # Load the same model for all cameras + model_file = "models/yolov8n.trt" # Same file for all cameras + + print("=" * 80) + print("LOADING MODELS FOR MULTIPLE CAMERAS") + print("=" * 80) + + for config in camera_configs: + try: + # Each camera gets its own model_id, but shares the same engine! + metadata = repo.load_model( + model_id=config["id"], + file_path=model_file, + num_contexts=4 # 4 contexts shared across all cameras + ) + print(f"\n✓ Loaded model for {config['id']}") + except Exception as e: + print(f"\n✗ Failed to load model for {config['id']}: {e}") + + # Show repository stats + print("\n" + "=" * 80) + print("REPOSITORY STATISTICS") + print("=" * 80) + stats = repo.get_stats() + print(f"Total model IDs: {stats['total_model_ids']}") + print(f"Unique engines in VRAM: {stats['unique_engines']}") + print(f"Total contexts: {stats['total_contexts']}") + print(f"Memory efficiency: {stats['memory_efficiency']}") + + # Get detailed info for one camera + print("\n" + "=" * 80) + print("DETAILED MODEL INFO (camera_1)") + print("=" * 80) + info = repo.get_model_info("camera_1") + if info: + print(f"Model ID: {info['model_id']}") + print(f"File: {info['file_path']}") + print(f"File hash: {info['file_hash']}") + print(f"Engine references: {info['engine_references']}") + print(f"Context pool size: {info['context_pool_size']}") + print(f"Shared with: {info['shared_with_model_ids']}") + print(f"\nInputs:") + for name, spec in info['inputs'].items(): + print(f" {name}: {spec['shape']} ({spec['dtype']})") + print(f"\nOutputs:") + for name, spec in info['outputs'].items(): + print(f" {name}: {spec['shape']} ({spec['dtype']})") + + # Simulate inference from multiple cameras + print("\n" + "=" * 80) + print("RUNNING INFERENCE (GPU-to-GPU)") + print("=" * 80) + + # Create dummy input tensors (simulating frames from cameras) + # In real scenario, these come from StreamDecoder.get_frame() + batch_size = 1 + channels = 3 + height = 640 + width = 640 + + for config in camera_configs: + try: + # Simulate getting frame from camera (already on GPU) + input_tensor = torch.rand( + batch_size, channels, height, width, + dtype=torch.float32, + device='cuda:0' + ) + + # Run inference (stays in GPU) + start = time.time() + outputs = repo.infer( + model_id=config["id"], + inputs={"images": input_tensor}, # Adjust input name based on your model + synchronize=True, + timeout=5.0 + ) + elapsed = (time.time() - start) * 1000 # Convert to ms + + print(f"\n{config['id']}: Inference completed in {elapsed:.2f}ms") + for name, tensor in outputs.items(): + print(f" Output '{name}': {tensor.shape} on {tensor.device}") + + except Exception as e: + 
print(f"\n{config['id']}: Inference failed: {e}") + + # Cleanup + print("\n" + "=" * 80) + print("CLEANUP") + print("=" * 80) + + for config in camera_configs: + repo.unload_model(config["id"]) + + print("\nAll models unloaded.") + + +def test_rtsp_stream_with_inference(): + """ + Real-world example: Decode RTSP stream and run inference. + Everything stays in GPU memory (zero CPU transfers). + """ + + print("=" * 80) + print("RTSP STREAM + TENSORRT INFERENCE (GPU-to-GPU)") + print("=" * 80) + + # Initialize components + decoder_factory = StreamDecoderFactory(gpu_id=0) + model_repo = TensorRTModelRepository(gpu_id=0, default_num_contexts=4) + + # Setup camera stream + rtsp_url = "rtsp://your-camera-ip/stream" + decoder = decoder_factory.create_decoder(rtsp_url, buffer_size=30) + decoder.start() + + # Load inference model + try: + model_repo.load_model( + model_id="camera_main", + file_path="models/yolov8n.trt" + ) + except FileNotFoundError: + print("\n⚠ Model file not found. Please export your model to TensorRT:") + print(" Example: yolo export model=yolov8n.pt format=engine device=0") + return + + print("\nWaiting for stream to buffer frames...") + time.sleep(3) + + # Process frames + for i in range(10): + # Get frame from decoder (already on GPU) + frame_gpu = decoder.get_latest_frame(rgb=True) # Returns torch.Tensor on CUDA + + if frame_gpu is None: + print(f"Frame {i}: No frame available") + continue + + # Preprocess if needed (stays on GPU) + # For YOLOv8: normalize, resize, etc. + # Example preprocessing (adjust for your model): + frame_gpu = frame_gpu.float() / 255.0 # Normalize to [0, 1] + frame_gpu = frame_gpu.unsqueeze(0) # Add batch dimension: (1, 3, H, W) + + # Run inference (GPU-to-GPU, zero copy) + try: + outputs = model_repo.infer( + model_id="camera_main", + inputs={"images": frame_gpu}, + synchronize=True + ) + + print(f"\nFrame {i}: Inference successful") + for name, tensor in outputs.items(): + print(f" {name}: {tensor.shape} on {tensor.device}") + + # Post-process results (can stay on GPU or move to CPU as needed) + # Example: NMS, bounding box extraction, etc. + + except Exception as e: + print(f"\nFrame {i}: Inference failed: {e}") + + time.sleep(0.1) # Simulate processing interval + + # Cleanup + decoder.stop() + model_repo.unload_model("camera_main") + print("\n✓ Test completed successfully") + + +def test_concurrent_inference(): + """ + Test concurrent inference from multiple threads. + Demonstrates context pool load balancing. + """ + import threading + + print("=" * 80) + print("CONCURRENT INFERENCE TEST (Context Pool Load Balancing)") + print("=" * 80) + + repo = TensorRTModelRepository(gpu_id=0, default_num_contexts=4) + + # Load model + try: + repo.load_model("shared_model", "models/yolov8n.trt", num_contexts=4) + except Exception as e: + print(f"Failed to load model: {e}") + return + + def worker(worker_id: int, num_inferences: int): + """Worker thread performing inference""" + for i in range(num_inferences): + try: + # Create dummy input + input_tensor = torch.rand(1, 3, 640, 640, device='cuda:0', dtype=torch.float32) + + # Acquire context from pool, run inference, release context + outputs = repo.infer( + model_id="shared_model", + inputs={"images": input_tensor}, + timeout=10.0 + ) + + print(f"Worker {worker_id}, Inference {i}: SUCCESS") + + except Exception as e: + print(f"Worker {worker_id}, Inference {i}: FAILED - {e}") + + time.sleep(0.01) # Small delay + + # Launch multiple worker threads (more workers than contexts!) 
+ threads = [] + num_workers = 10 # 10 workers sharing 4 contexts + inferences_per_worker = 5 + + print(f"\nLaunching {num_workers} workers (only 4 contexts available)") + print("Contexts will be borrowed/returned automatically\n") + + start_time = time.time() + + for worker_id in range(num_workers): + t = threading.Thread(target=worker, args=(worker_id, inferences_per_worker)) + threads.append(t) + t.start() + + # Wait for all workers + for t in threads: + t.join() + + elapsed = time.time() - start_time + total_inferences = num_workers * inferences_per_worker + + print(f"\n✓ Completed {total_inferences} inferences in {elapsed:.2f}s") + print(f" Throughput: {total_inferences / elapsed:.2f} inferences/sec") + print(f" With only 4 contexts for {num_workers} workers!") + + repo.unload_model("shared_model") + + +if __name__ == "__main__": + print("\n" + "=" * 80) + print("TENSORRT MODEL REPOSITORY - TEST SUITE") + print("=" * 80) + + # Test 1: Multi-camera model loading + print("\n\nTEST 1: Multi-Camera Model Loading with Deduplication") + print("-" * 80) + try: + test_multi_camera_inference() + except Exception as e: + print(f"Test 1 failed: {e}") + + # Test 2: RTSP stream + inference (commented out by default) + # Uncomment if you have a real RTSP stream + # print("\n\nTEST 2: RTSP Stream + Inference") + # print("-" * 80) + # try: + # test_rtsp_stream_with_inference() + # except Exception as e: + # print(f"Test 2 failed: {e}") + + # Test 3: Concurrent inference + print("\n\nTEST 3: Concurrent Inference with Context Pooling") + print("-" * 80) + try: + test_concurrent_inference() + except Exception as e: + print(f"Test 3 failed: {e}") + + print("\n" + "=" * 80) + print("ALL TESTS COMPLETED") + print("=" * 80) diff --git a/test_multi_stream.py b/test_multi_stream.py new file mode 100755 index 0000000..71bc6b4 --- /dev/null +++ b/test_multi_stream.py @@ -0,0 +1,255 @@ +#!/usr/bin/env python3 +""" +Multi-stream test script to verify CUDA context sharing efficiency. +Tests multiple RTSP streams simultaneously and monitors VRAM usage. 
+""" + +import argparse +import time +import sys +import subprocess +import os +from pathlib import Path +from dotenv import load_dotenv +from services import StreamDecoderFactory, ConnectionStatus + +# Load environment variables from .env file +load_dotenv() + + +def get_gpu_memory_usage(gpu_id: int = 0) -> int: + """Get current GPU memory usage in MB using nvidia-smi""" + try: + result = subprocess.run( + ['nvidia-smi', '--query-gpu=memory.used', '--format=csv,noheader,nounits', f'--id={gpu_id}'], + capture_output=True, + text=True, + check=True + ) + return int(result.stdout.strip()) + except Exception as e: + print(f"Warning: Could not get GPU memory usage: {e}") + return 0 + + +def main(): + parser = argparse.ArgumentParser(description='Test multi-stream decoding with context sharing') + parser.add_argument( + '--gpu-id', + type=int, + default=0, + help='GPU device ID' + ) + parser.add_argument( + '--duration', + type=int, + default=20, + help='Test duration in seconds' + ) + parser.add_argument( + '--capture-snapshots', + action='store_true', + help='Capture JPEG snapshots during test' + ) + parser.add_argument( + '--output-dir', + type=str, + default='./multi_stream_snapshots', + help='Output directory for snapshots' + ) + + args = parser.parse_args() + + # Load camera URLs from environment + camera_urls = [] + i = 1 + while True: + url = os.getenv(f'CAMERA_URL_{i}') + if url: + camera_urls.append(url) + i += 1 + else: + break + + if not camera_urls: + print("Error: No camera URLs found in .env file") + print("Please add CAMERA_URL_1, CAMERA_URL_2, etc. to your .env file") + sys.exit(1) + + # Create output directory if capturing snapshots + if args.capture_snapshots: + output_dir = Path(args.output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + + print("=" * 80) + print("Multi-Stream RTSP Decoder Test - Context Sharing Verification") + print("=" * 80) + print(f"Number of Streams: {len(camera_urls)}") + print(f"GPU ID: {args.gpu_id}") + print(f"Test Duration: {args.duration} seconds") + print(f"Capture Snapshots: {args.capture_snapshots}") + print("=" * 80) + print() + + try: + # Get baseline GPU memory + print("[Baseline] Measuring initial GPU memory usage...") + baseline_memory = get_gpu_memory_usage(args.gpu_id) + print(f"✓ Baseline VRAM: {baseline_memory} MB\n") + + # Initialize factory (shared CUDA context) + print("[1/4] Initializing StreamDecoderFactory with shared CUDA context...") + factory = StreamDecoderFactory(gpu_id=args.gpu_id) + + factory_memory = get_gpu_memory_usage(args.gpu_id) + factory_overhead = factory_memory - baseline_memory + print(f"✓ Factory initialized") + print(f" VRAM after factory: {factory_memory} MB (+{factory_overhead} MB)\n") + + # Create all decoders + print(f"[2/4] Creating {len(camera_urls)} StreamDecoder instances...") + decoders = [] + for i, url in enumerate(camera_urls): + decoder = factory.create_decoder( + rtsp_url=url, + buffer_size=30, + codec='h264' + ) + decoders.append(decoder) + print(f" ✓ Decoder {i+1} created for camera {url.split('@')[1].split('/')[0]}") + + decoders_memory = get_gpu_memory_usage(args.gpu_id) + decoders_overhead = decoders_memory - factory_memory + print(f"\n VRAM after creating {len(decoders)} decoders: {decoders_memory} MB (+{decoders_overhead} MB)") + print(f" Average per decoder: {decoders_overhead / len(decoders):.1f} MB\n") + + # Start all decoders + print(f"[3/4] Starting all {len(decoders)} decoders...") + for i, decoder in enumerate(decoders): + decoder.start() + print(f" ✓ Decoder {i+1} started") 
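+        # Illustrative note: decoders connect asynchronously in their
+        # background threads, so the sample below mostly reflects decoder
+        # startup cost; per-stream NVDEC session memory shows up in the
+        # "after connection" sample further down.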
+ + started_memory = get_gpu_memory_usage(args.gpu_id) + started_overhead = started_memory - decoders_memory + print(f"\n VRAM after starting decoders: {started_memory} MB (+{started_overhead} MB)") + print(f" Average per running decoder: {started_overhead / len(decoders):.1f} MB\n") + + # Wait for all streams to connect + print("[4/4] Waiting for all streams to connect...") + max_wait = 15 + for wait_time in range(max_wait): + connected = sum(1 for d in decoders if d.is_connected()) + print(f" Connected: {connected}/{len(decoders)} streams", end='\r') + + if connected == len(decoders): + print(f"\n✓ All {len(decoders)} streams connected!\n") + break + + time.sleep(1) + else: + connected = sum(1 for d in decoders if d.is_connected()) + print(f"\n⚠ Only {connected}/{len(decoders)} streams connected after {max_wait}s\n") + + connected_memory = get_gpu_memory_usage(args.gpu_id) + connected_overhead = connected_memory - started_memory + print(f" VRAM after connection: {connected_memory} MB (+{connected_overhead} MB)\n") + + # Monitor streams + print(f"Monitoring streams for {args.duration} seconds...") + print("=" * 80) + print(f"{'Time':<8} {'VRAM':<10} {'Stream 1':<12} {'Stream 2':<12} {'Stream 3':<12} {'Stream 4':<12}") + print("-" * 80) + + start_time = time.time() + snapshot_interval = args.duration // 3 if args.capture_snapshots else 0 + last_snapshot = 0 + + while time.time() - start_time < args.duration: + elapsed = time.time() - start_time + current_memory = get_gpu_memory_usage(args.gpu_id) + + # Get stats for each decoder + stats = [] + for decoder in decoders: + status = decoder.get_status().value[:8] + buffer = decoder.get_buffer_size() + frames = decoder.frame_count + stats.append(f"{status:8s} {buffer:2d}/30 {frames:4d}") + + print(f"{elapsed:6.1f}s {current_memory:6d}MB {stats[0]:<12} {stats[1]:<12} {stats[2]:<12} {stats[3]:<12}") + + # Capture snapshots + if args.capture_snapshots and snapshot_interval > 0: + if elapsed - last_snapshot >= snapshot_interval: + print("\n → Capturing snapshots from all streams...") + for i, decoder in enumerate(decoders): + jpeg_bytes = decoder.get_frame_as_jpeg(quality=85) + if jpeg_bytes: + filename = output_dir / f"camera_{i+1}_t{int(elapsed)}s.jpg" + with open(filename, 'wb') as f: + f.write(jpeg_bytes) + print(f" Saved {filename.name} ({len(jpeg_bytes)/1024:.1f} KB)") + print() + last_snapshot = elapsed + + time.sleep(1) + + print("=" * 80) + + # Final memory analysis + final_memory = get_gpu_memory_usage(args.gpu_id) + total_overhead = final_memory - baseline_memory + + print("\n" + "=" * 80) + print("Memory Usage Analysis") + print("=" * 80) + print(f"Baseline VRAM: {baseline_memory:6d} MB") + print(f"After Factory Init: {factory_memory:6d} MB (+{factory_overhead:4d} MB)") + print(f"After Creating {len(decoders)} Decoders: {decoders_memory:6d} MB (+{decoders_overhead:4d} MB)") + print(f"After Starting Decoders: {started_memory:6d} MB (+{started_overhead:4d} MB)") + print(f"After Connection: {connected_memory:6d} MB (+{connected_overhead:4d} MB)") + print(f"Final (after {args.duration}s): {final_memory:6d} MB (+{total_overhead:4d} MB total)") + print("-" * 80) + print(f"Average VRAM per stream: {total_overhead / len(decoders):6.1f} MB") + print(f"Context sharing efficiency: {'EXCELLENT' if total_overhead < 500 else 'GOOD' if total_overhead < 800 else 'POOR'}") + print("=" * 80) + + # Final stats + print("\nFinal Stream Statistics:") + print("-" * 80) + for i, decoder in enumerate(decoders): + status = decoder.get_status().value + buffer = 
decoder.get_buffer_size() + frames = decoder.frame_count + fps = frames / args.duration if args.duration > 0 else 0 + print(f"Stream {i+1}: {status:12s} | Buffer: {buffer:2d}/{decoder.buffer_size} | " + f"Frames: {frames:5d} | Avg FPS: {fps:5.2f}") + print("=" * 80) + + except KeyboardInterrupt: + print("\n\n✗ Interrupted by user") + sys.exit(1) + + except Exception as e: + print(f"\n\n✗ Error: {e}") + import traceback + traceback.print_exc() + sys.exit(1) + + finally: + # Cleanup + if 'decoders' in locals(): + print("\nCleaning up...") + for i, decoder in enumerate(decoders): + decoder.stop() + print(f" ✓ Decoder {i+1} stopped") + + cleanup_memory = get_gpu_memory_usage(args.gpu_id) + print(f"\nVRAM after cleanup: {cleanup_memory} MB") + + print("\n✓ Multi-stream test completed successfully") + sys.exit(0) + + +if __name__ == '__main__': + main() diff --git a/test_stream.py b/test_stream.py new file mode 100755 index 0000000..9fc9b30 --- /dev/null +++ b/test_stream.py @@ -0,0 +1,152 @@ +#!/usr/bin/env python3 +""" +CLI test script for StreamDecoder +Tests RTSP stream decoding with NVDEC hardware acceleration +""" + +import argparse +import time +import sys +from services.stream_decoder import StreamDecoderFactory, ConnectionStatus + + +def main(): + parser = argparse.ArgumentParser(description='Test RTSP stream decoder with NVDEC') + parser.add_argument( + '--rtsp-url', + type=str, + required=True, + help='RTSP stream URL (e.g., rtsp://user:pass@host/path)' + ) + parser.add_argument( + '--gpu-id', + type=int, + default=0, + help='GPU device ID' + ) + parser.add_argument( + '--buffer-size', + type=int, + default=30, + help='Frame buffer size' + ) + parser.add_argument( + '--duration', + type=int, + default=30, + help='Test duration in seconds' + ) + parser.add_argument( + '--check-interval', + type=float, + default=1.0, + help='Status check interval in seconds' + ) + + args = parser.parse_args() + + print("=" * 80) + print("RTSP Stream Decoder Test") + print("=" * 80) + print(f"RTSP URL: {args.rtsp_url}") + print(f"GPU ID: {args.gpu_id}") + print(f"Buffer Size: {args.buffer_size} frames") + print(f"Test Duration: {args.duration} seconds") + print("=" * 80) + print() + + try: + # Create factory with shared CUDA context + print("[1/4] Initializing StreamDecoderFactory...") + factory = StreamDecoderFactory(gpu_id=args.gpu_id) + print("✓ Factory initialized with shared CUDA context\n") + + # Create decoder + print("[2/4] Creating StreamDecoder...") + decoder = factory.create_decoder( + rtsp_url=args.rtsp_url, + buffer_size=args.buffer_size, + codec='h264' + ) + print(f"✓ Decoder created: {decoder}\n") + + # Start decoding + print("[3/4] Starting decoder thread...") + decoder.start() + print("✓ Decoder thread started\n") + + # Monitor for specified duration + print(f"[4/4] Monitoring stream for {args.duration} seconds...") + print("-" * 80) + + start_time = time.time() + last_frame_count = 0 + + while time.time() - start_time < args.duration: + time.sleep(args.check_interval) + + # Get status + status = decoder.get_status() + buffer_size = decoder.get_buffer_size() + frame_count = decoder.frame_count + fps = (frame_count - last_frame_count) / args.check_interval + last_frame_count = frame_count + + # Print status + elapsed = time.time() - start_time + print(f"[{elapsed:6.1f}s] Status: {status.value:12s} | " + f"Buffer: {buffer_size:2d}/{args.buffer_size:2d} | " + f"Frames: {frame_count:5d} | " + f"FPS: {fps:5.1f}") + + # Try to get latest frame + if status == ConnectionStatus.CONNECTED: + frame 
= decoder.get_latest_frame() + if frame is not None: + print(f" Frame shape: {frame.shape}, dtype: {frame.dtype}, " + f"device: {frame.device}") + + # Check for errors + if status == ConnectionStatus.ERROR: + print("\n✗ ERROR: Stream connection failed!") + break + + print("-" * 80) + + # Final statistics + print("\n" + "=" * 80) + print("Test Complete - Final Statistics") + print("=" * 80) + print(f"Total Frames Decoded: {decoder.frame_count}") + print(f"Average FPS: {decoder.frame_count / args.duration:.2f}") + print(f"Final Status: {decoder.get_status().value}") + print(f"Buffer Utilization: {decoder.get_buffer_size()}/{args.buffer_size}") + + if decoder.frame_width and decoder.frame_height: + print(f"Frame Resolution: {decoder.frame_width}x{decoder.frame_height}") + + print("=" * 80) + + except KeyboardInterrupt: + print("\n\n✗ Interrupted by user") + sys.exit(1) + + except Exception as e: + print(f"\n\n✗ Error: {e}") + import traceback + traceback.print_exc() + sys.exit(1) + + finally: + # Cleanup + if 'decoder' in locals(): + print("\nCleaning up...") + decoder.stop() + print("✓ Decoder stopped") + + print("\n✓ Test completed successfully") + sys.exit(0) + + +if __name__ == '__main__': + main() diff --git a/test_vram_process.py b/test_vram_process.py new file mode 100644 index 0000000..7cb8761 --- /dev/null +++ b/test_vram_process.py @@ -0,0 +1,143 @@ +#!/usr/bin/env python3 +""" +VRAM scaling test - measures Python process memory usage for 1, 2, 3, and 4 streams. +""" + +import os +import time +import subprocess +from dotenv import load_dotenv +from services import StreamDecoderFactory + +# Load environment variables from .env file +load_dotenv() + +# Load camera URLs from environment +camera_urls = [] +i = 1 +while True: + url = os.getenv(f'CAMERA_URL_{i}') + if url: + camera_urls.append(url) + i += 1 + else: + break + +if not camera_urls: + print("Error: No camera URLs found in .env file") + print("Please add CAMERA_URL_1, CAMERA_URL_2, etc. 
to your .env file") + exit(1) + +def get_python_gpu_memory(): + """Get Python process GPU memory usage in MB""" + try: + pid = os.getpid() + result = subprocess.run( + ['nvidia-smi', '--query-compute-apps=pid,used_memory', '--format=csv,noheader,nounits'], + capture_output=True, text=True, check=True + ) + for line in result.stdout.strip().split('\n'): + if line: + parts = line.split(',') + if len(parts) >= 2 and int(parts[0].strip()) == pid: + return int(parts[1].strip()) + return 0 + except: + return 0 + +def test_n_streams(n, wait_time=15): + """Test with n streams""" + print(f"\n{'='*80}") + print(f"Testing with {n} stream(s)") + print('='*80) + + mem_before = get_python_gpu_memory() + print(f"Python process VRAM before: {mem_before} MB") + + # Create factory + factory = StreamDecoderFactory(gpu_id=0) + time.sleep(1) + mem_after_factory = get_python_gpu_memory() + print(f"After factory: {mem_after_factory} MB (+{mem_after_factory - mem_before} MB)") + + # Create decoders + decoders = [] + for i in range(n): + decoder = factory.create_decoder(camera_urls[i], buffer_size=30) + decoders.append(decoder) + + time.sleep(1) + mem_after_create = get_python_gpu_memory() + print(f"After creating {n} decoder(s): {mem_after_create} MB (+{mem_after_create - mem_after_factory} MB)") + + # Start decoders + for decoder in decoders: + decoder.start() + + time.sleep(2) + mem_after_start = get_python_gpu_memory() + print(f"After starting {n} decoder(s): {mem_after_start} MB (+{mem_after_start - mem_after_create} MB)") + + # Wait for connection + print(f"Waiting {wait_time}s for streams to connect and stabilize...") + time.sleep(wait_time) + + # Check connection status + connected = sum(1 for d in decoders if d.is_connected()) + mem_stable = get_python_gpu_memory() + + print(f"Connected: {connected}/{n} streams") + print(f"Python process VRAM (stable): {mem_stable} MB") + + # Get frame stats + for i, decoder in enumerate(decoders): + print(f" Stream {i+1}: {decoder.get_status().value:10s} " + f"Buffer: {decoder.get_buffer_size()}/30 " + f"Frames: {decoder.frame_count}") + + # Cleanup + for decoder in decoders: + decoder.stop() + + time.sleep(2) + mem_after_cleanup = get_python_gpu_memory() + print(f"After cleanup: {mem_after_cleanup} MB") + + return mem_stable + +if __name__ == '__main__': + print("Python VRAM Scaling Test") + print(f"PID: {os.getpid()}") + + baseline = get_python_gpu_memory() + print(f"Baseline Python process VRAM: {baseline} MB\n") + + results = {} + for n in [1, 2, 3, 4]: + mem = test_n_streams(n, wait_time=15) + results[n] = mem + print(f"\n→ {n} stream(s): {mem} MB (process total)") + + # Give time between tests + if n < 4: + print("\nWaiting 5s before next test...") + time.sleep(5) + + # Summary + print("\n" + "="*80) + print("Python Process VRAM Scaling Summary") + print("="*80) + print(f"Baseline: {baseline:4d} MB") + for n in [1, 2, 3, 4]: + total = results[n] + overhead = total - baseline + per_stream = overhead / n if n > 0 else 0 + print(f"{n} stream(s): {total:4d} MB (+{overhead:3d} MB total, {per_stream:5.1f} MB per stream)") + + # Calculate marginal cost + print("\nMarginal cost per additional stream:") + for n in [2, 3, 4]: + marginal = results[n] - results[n-1] + print(f" Stream {n}: +{marginal} MB") + + print("="*80) diff --git a/verify_tensorrt_model.py b/verify_tensorrt_model.py new file mode 100644 index 0000000..f04042f --- /dev/null +++ b/verify_tensorrt_model.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python3 +""" +Quick verification script for TensorRT model +""" + 
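+# Prerequisite (assumed): a TensorRT engine at models/yolov8n.trt. One way to
+# produce it is `yolo export model=yolov8n.pt format=engine device=0`, as noted
+# in test_model_inference.py.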
+import torch +from services.model_repository import TensorRTModelRepository + +def verify_model(): + print("=" * 80) + print("TensorRT Model Verification") + print("=" * 80) + + # Initialize repository + repo = TensorRTModelRepository(gpu_id=0, default_num_contexts=2) + + # Load the model + print("\nLoading YOLOv8n TensorRT engine...") + try: + metadata = repo.load_model( + model_id="yolov8n_test", + file_path="models/yolov8n.trt", + num_contexts=2 + ) + print("✓ Model loaded successfully!") + except Exception as e: + print(f"✗ Failed to load model: {e}") + return + + # Get model info + print("\n" + "=" * 80) + print("Model Information") + print("=" * 80) + info = repo.get_model_info("yolov8n_test") + if info: + print(f"Model ID: {info['model_id']}") + print(f"File: {info['file_path']}") + print(f"File hash: {info['file_hash']}") + print(f"\nInputs:") + for name, spec in info['inputs'].items(): + print(f" {name}: {spec['shape']} ({spec['dtype']})") + print(f"\nOutputs:") + for name, spec in info['outputs'].items(): + print(f" {name}: {spec['shape']} ({spec['dtype']})") + + # Run test inference + print("\n" + "=" * 80) + print("Running Test Inference") + print("=" * 80) + + try: + # Create dummy input (simulating a 640x640 image) + input_tensor = torch.rand(1, 3, 640, 640, dtype=torch.float32, device='cuda:0') + print(f"Input tensor: {input_tensor.shape} on {input_tensor.device}") + + # Run inference + outputs = repo.infer( + model_id="yolov8n_test", + inputs={"images": input_tensor}, + synchronize=True + ) + + print("\n✓ Inference successful!") + print("\nOutputs:") + for name, tensor in outputs.items(): + print(f" {name}: {tensor.shape} on {tensor.device} ({tensor.dtype})") + + except Exception as e: + print(f"\n✗ Inference failed: {e}") + import traceback + traceback.print_exc() + + # Cleanup + print("\n" + "=" * 80) + print("Cleanup") + print("=" * 80) + repo.unload_model("yolov8n_test") + print("✓ Model unloaded") + + print("\n" + "=" * 80) + print("Verification Complete!") + print("=" * 80) + +if __name__ == "__main__": + verify_model()
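+
+
+# Minimal sketch (illustrative, not invoked above): the repository also exposes
+# infer_batch(), which borrows and returns an execution context for each input.
+# Assumes the same models/yolov8n.trt engine and the "images" input name used
+# in verify_model().
+def verify_batch_inference():
+    repo = TensorRTModelRepository(gpu_id=0, default_num_contexts=2)
+    repo.load_model("yolov8n_batch", "models/yolov8n.trt")
+
+    # Four dummy frames, all created directly on the GPU
+    batch_inputs = [
+        {"images": torch.rand(1, 3, 640, 640, dtype=torch.float32, device='cuda:0')}
+        for _ in range(4)
+    ]
+
+    results = repo.infer_batch("yolov8n_batch", batch_inputs, synchronize=True)
+    for i, outputs in enumerate(results):
+        for name, tensor in outputs.items():
+            print(f"Frame {i}: {name} -> {tuple(tensor.shape)} on {tensor.device}")
+
+    repo.unload_model("yolov8n_batch")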