#!/usr/bin/env python3
"""
Test script for JPEG encoding with nvImageCodec

Tests GPU-accelerated JPEG encoding from RTSP stream frames
"""

import argparse
import sys
import time
import os
import traceback
from pathlib import Path

from dotenv import load_dotenv

from services import StreamDecoderFactory

# Load environment variables from .env file
load_dotenv()


def _parse_args(argv=None):
    """Parse and validate the command-line options.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        The populated ``argparse.Namespace``.

    Out-of-range values are reported through ``parser.error``, which prints
    the usage text and exits with status 2.
    """
    parser = argparse.ArgumentParser(
        description='Test JPEG encoding from RTSP stream'
    )
    parser.add_argument(
        '--rtsp-url',
        type=str,
        default=None,
        help='RTSP stream URL (defaults to CAMERA_URL_1 from .env)'
    )
    parser.add_argument(
        '--output-dir',
        type=str,
        default='./snapshots',
        help='Output directory for JPEG files'
    )
    parser.add_argument(
        '--num-frames',
        type=int,
        default=10,
        help='Number of frames to capture'
    )
    parser.add_argument(
        '--interval',
        type=float,
        default=1.0,
        help='Interval between captures in seconds'
    )
    parser.add_argument(
        '--quality',
        type=int,
        default=95,
        help='JPEG quality (0-100)'
    )
    parser.add_argument(
        '--gpu-id',
        type=int,
        default=0,
        help='GPU device ID'
    )
    args = parser.parse_args(argv)

    # Reject values that would otherwise only fail (or silently do nothing)
    # after the stream has already been opened.
    if not 0 <= args.quality <= 100:
        parser.error('--quality must be between 0 and 100')
    if args.num_frames < 1:
        parser.error('--num-frames must be at least 1')
    if args.interval < 0:
        parser.error('--interval must not be negative')

    return args


def _wait_for_connection(decoder, max_wait=10):
    """Poll ``decoder.is_connected()`` once per second for up to *max_wait* seconds.

    Returns:
        True as soon as the decoder reports a connection, False on timeout.
    """
    for elapsed in range(1, max_wait + 1):
        if decoder.is_connected():
            return True
        time.sleep(1)
        print(f" Waiting... {elapsed}/{max_wait}s")

    # One last check so a stream that connected during the final sleep is
    # not reported as a failure.
    return bool(decoder.is_connected())


def _capture_frames(decoder, output_dir, num_frames, interval, quality):
    """Grab *num_frames* JPEG snapshots from *decoder* and write them to *output_dir*.

    A falsy result from ``decoder.get_frame_as_jpeg`` is reported as a failed
    capture and does not stop the loop.

    Returns:
        The number of frames that were written to disk.
    """
    captured = 0
    for index in range(num_frames):
        # Time only the encode call; perf_counter is monotonic, so the
        # measurement is not affected by wall-clock adjustments.
        started = time.perf_counter()
        jpeg_bytes = decoder.get_frame_as_jpeg(quality=quality)
        encode_ms = (time.perf_counter() - started) * 1000

        if jpeg_bytes:
            target = output_dir / f"frame_{index:04d}.jpg"
            target.write_bytes(jpeg_bytes)

            size_kb = len(jpeg_bytes) / 1024
            print(f"[{index+1}/{num_frames}] Saved {target.name} "
                  f"({size_kb:.1f} KB, encoded in {encode_ms:.2f}ms)")
            captured += 1
        else:
            print(f"[{index+1}/{num_frames}] Failed to get frame")

        # Wait before next capture (except for last frame)
        if index < num_frames - 1:
            time.sleep(interval)

    return captured


def main():
    """Capture JPEG snapshots from an RTSP stream and save them to disk.

    The RTSP URL comes from ``--rtsp-url`` or, failing that, from the
    ``CAMERA_URL_1`` environment variable. The process always terminates via
    ``sys.exit``: status 0 when at least one frame was captured, status 1 on
    a missing URL, connection timeout, interruption, unexpected error, or
    when no frame could be captured at all.
    """
    args = _parse_args()

    # Get RTSP URL from command line or environment
    rtsp_url = args.rtsp_url or os.getenv('CAMERA_URL_1')
    if not rtsp_url:
        print("Error: No RTSP URL provided")
        print("Please either:")
        print(" 1. Use --rtsp-url argument, or")
        print(" 2. Add CAMERA_URL_1 to your .env file")
        sys.exit(1)

    # Create output directory
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    print("=" * 80)
    print("RTSP Stream JPEG Encoding Test")
    print("=" * 80)
    print(f"RTSP URL: {rtsp_url}")
    print(f"Output Directory: {output_dir}")
    print(f"Number of Frames: {args.num_frames}")
    print(f"Capture Interval: {args.interval}s")
    print(f"JPEG Quality: {args.quality}")
    print(f"GPU ID: {args.gpu_id}")
    print("=" * 80)
    print()

    # Sentinels so the cleanup and exit-status logic below never depends on
    # how far the try body got.
    decoder = None
    captured = 0

    try:
        # Initialize factory and decoder
        print("[1/3] Initializing StreamDecoderFactory...")
        factory = StreamDecoderFactory(gpu_id=args.gpu_id)
        print("✓ Factory initialized\n")

        print("[2/3] Creating and starting decoder...")
        decoder = factory.create_decoder(
            rtsp_url=rtsp_url,
            buffer_size=30
        )
        decoder.start()
        print("✓ Decoder started\n")

        # Wait for connection
        print("[3/3] Waiting for stream to connect...")
        if not _wait_for_connection(decoder, max_wait=10):
            print("✗ Failed to connect to stream")
            sys.exit(1)
        print("✓ Stream connected\n")

        # Capture frames
        print(f"Capturing {args.num_frames} frames...")
        print("-" * 80)
        captured = _capture_frames(
            decoder,
            output_dir,
            num_frames=args.num_frames,
            interval=args.interval,
            quality=args.quality,
        )
        print("-" * 80)

        # Summary
        print("\n" + "=" * 80)
        print("Capture Complete")
        print("=" * 80)
        print(f"Successfully captured: {captured}/{args.num_frames} frames")
        print(f"Output directory: {output_dir.absolute()}")
        print("=" * 80)

    except KeyboardInterrupt:
        print("\n\n✗ Interrupted by user")
        sys.exit(1)

    except Exception as e:
        # Top-level boundary of a diagnostic script: report everything.
        print(f"\n\n✗ Error: {e}")
        traceback.print_exc()
        sys.exit(1)

    finally:
        # Cleanup runs on every path, including the sys.exit() calls above.
        if decoder is not None:
            print("\nCleaning up...")
            decoder.stop()
            print("✓ Decoder stopped")

    # Do not report success when every single capture attempt failed.
    if captured == 0:
        print("\n✗ Test failed: no frames were captured")
        sys.exit(1)

    print("\n✓ Test completed successfully")
    sys.exit(0)


if __name__ == '__main__':
    main()