diff --git a/services/stream_decoder.py b/services/stream_decoder.py index a30148f..5b30585 100644 --- a/services/stream_decoder.py +++ b/services/stream_decoder.py @@ -1,12 +1,14 @@ import threading -from typing import Optional, Callable from collections import deque from enum import Enum -import torch -import PyNvVideoCodec as nvc +from typing import Callable, Optional + import av import numpy as np +import PyNvVideoCodec as nvc +import torch from cuda.bindings import driver as cuda_driver + from .jpeg_encoder import encode_frame_to_jpeg @@ -18,6 +20,7 @@ class FrameReference: cloned frame, and tracks when all references are released so the decoder knows when buffer slots can be reused. """ + def __init__(self, rgb_tensor: torch.Tensor, buffer_index: int, decoder): self.rgb_tensor = rgb_tensor # Cloned RGB tensor (one clone per frame) self.buffer_index = buffer_index @@ -75,19 +78,27 @@ def nv12_to_rgb_gpu(nv12_tensor: torch.Tensor, height: int, width: int) -> torch v_plane = uv_plane[:, :, 1] # (H/2, W/2) # Upsample U and V to full resolution using bilinear interpolation - u_upsampled = torch.nn.functional.interpolate( - u_plane.unsqueeze(0).unsqueeze(0), # (1, 1, H/2, W/2) - size=(height, width), - mode='bilinear', - align_corners=False - ).squeeze(0).squeeze(0) # (H, W) + u_upsampled = ( + torch.nn.functional.interpolate( + u_plane.unsqueeze(0).unsqueeze(0), # (1, 1, H/2, W/2) + size=(height, width), + mode="bilinear", + align_corners=False, + ) + .squeeze(0) + .squeeze(0) + ) # (H, W) - v_upsampled = torch.nn.functional.interpolate( - v_plane.unsqueeze(0).unsqueeze(0), # (1, 1, H/2, W/2) - size=(height, width), - mode='bilinear', - align_corners=False - ).squeeze(0).squeeze(0) # (H, W) + v_upsampled = ( + torch.nn.functional.interpolate( + v_plane.unsqueeze(0).unsqueeze(0), # (1, 1, H/2, W/2) + size=(height, width), + mode="bilinear", + align_corners=False, + ) + .squeeze(0) + .squeeze(0) + ) # (H, W) # YUV to RGB conversion using BT.601 standard # R = Y + 1.402 * (V - 128) @@ -145,7 +156,7 @@ class StreamDecoderFactory: self.gpu_id = gpu_id # Initialize CUDA and get device - err, = cuda_driver.cuInit(0) + (err,) = cuda_driver.cuInit(0) if err != cuda_driver.CUresult.CUDA_SUCCESS: raise RuntimeError(f"Failed to initialize CUDA: {err}") @@ -160,10 +171,13 @@ class StreamDecoderFactory: raise RuntimeError(f"Failed to retain CUDA primary context: {err}") self._initialized = True - print(f"StreamDecoderFactory initialized with shared CUDA context on GPU {gpu_id}") + print( + f"StreamDecoderFactory initialized with shared CUDA context on GPU {gpu_id}" + ) - def create_decoder(self, rtsp_url: str, buffer_size: int = 30, - codec: str = "h264") -> 'StreamDecoder': + def create_decoder( + self, rtsp_url: str, buffer_size: int = 30, codec: str = "h264" + ) -> "StreamDecoder": """ Create a new StreamDecoder instance with shared CUDA context. @@ -180,12 +194,12 @@ class StreamDecoderFactory: cuda_context=self.cuda_context, gpu_id=self.gpu_id, buffer_size=buffer_size, - codec=codec + codec=codec, ) def __del__(self): """Cleanup shared CUDA context on factory destruction""" - if hasattr(self, 'cuda_device') and hasattr(self, 'gpu_id'): + if hasattr(self, "cuda_device") and hasattr(self, "gpu_id"): cuda_driver.cuDevicePrimaryCtxRelease(self.cuda_device) @@ -195,8 +209,14 @@ class StreamDecoder: Thread-safe for concurrent read/write operations. """ - def __init__(self, rtsp_url: str, cuda_context, gpu_id: int, - buffer_size: int = 30, codec: str = "h264"): + def __init__( + self, + rtsp_url: str, + cuda_context, + gpu_id: int, + buffer_size: int = 30, + codec: str = "h264", + ): """ Initialize StreamDecoder. @@ -275,7 +295,9 @@ class StreamDecoder: """ with self._buffer_lock: # Remove from in-use tracking - self._in_use_frames = [f for f in self._in_use_frames if f.buffer_index != buffer_index] + self._in_use_frames = [ + f for f in self._in_use_frames if f.buffer_index != buffer_index + ] def start(self): """Start the RTSP stream decoding in background thread""" @@ -313,10 +335,10 @@ class StreamDecoder: # Open RTSP stream with PyAV options = { - 'rtsp_transport': 'tcp', - 'max_delay': '500000', # 500ms - 'rtsp_flags': 'prefer_tcp', - 'timeout': '5000000', # 5 seconds + "rtsp_transport": "tcp", + "max_delay": "500000", # 500ms + "rtsp_flags": "prefer_tcp", + "timeout": "5000000", # 5 seconds } self.container = av.open(self.rtsp_url, options=options) @@ -330,9 +352,9 @@ class StreamDecoder: # Map codec name to PyNvVideoCodec codec enum codec_map = { - 'h264': nvc.cudaVideoCodec.H264, - 'hevc': nvc.cudaVideoCodec.HEVC, - 'h265': nvc.cudaVideoCodec.HEVC, + "h264": nvc.cudaVideoCodec.H264, + "hevc": nvc.cudaVideoCodec.HEVC, + "h265": nvc.cudaVideoCodec.HEVC, } codec_id = codec_map.get(self.codec.lower(), nvc.cudaVideoCodec.H264) @@ -342,7 +364,7 @@ class StreamDecoder: gpuid=self.gpu_id, codec=codec_id, cudacontext=self.cuda_context, - usedevicememory=True + usedevicememory=True, ) self._set_status(ConnectionStatus.CONNECTED) @@ -408,8 +430,13 @@ class StreamDecoder: nv12_tensor = torch.from_dlpack(frame) # Convert NV12 to RGB on GPU - if self.frame_height is not None and self.frame_width is not None: - rgb_tensor = nv12_to_rgb_gpu(nv12_tensor, self.frame_height, self.frame_width) + if ( + self.frame_height is not None + and self.frame_width is not None + ): + rgb_tensor = nv12_to_rgb_gpu( + nv12_tensor, self.frame_height, self.frame_width + ) # CLONE ONCE into our post-decode buffer # This breaks the dependency on PyNvVideoCodec's DecodedFrame @@ -420,7 +447,7 @@ class StreamDecoder: frame_ref = FrameReference( rgb_tensor=rgb_cloned, buffer_index=self._frame_index_counter, - decoder=self + decoder=self, ) self._frame_index_counter += 1 @@ -480,7 +507,9 @@ class StreamDecoder: return None if not rgb: - print("Warning: NV12 format not supported with FrameReference, only RGB") + print( + "Warning: NV12 format not supported with FrameReference, only RGB" + ) return None try: @@ -620,6 +649,8 @@ class StreamDecoder: return encode_frame_to_jpeg(rgb_frame, quality=quality) def __repr__(self): - return (f"StreamDecoder(url={self.rtsp_url}, status={self.status.value}, " - f"buffer={self.get_buffer_size()}/{self.buffer_size}, " - f"frames_decoded={self.frame_count})") + return ( + f"StreamDecoder(url={self.rtsp_url}, status={self.status.value}, " + f"buffer={self.get_buffer_size()}/{self.buffer_size}, " + f"frames_decoded={self.frame_count})" + ) diff --git a/stream_decoder_test.py b/stream_decoder_test.py new file mode 100644 index 0000000..817197b --- /dev/null +++ b/stream_decoder_test.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 +""" +Simple example: Decode 4 RTSP streams and display with OpenCV using callbacks + +Usage: + python stream_decoder_test.py +""" + +import os + +import cv2 +from dotenv import load_dotenv + +from services import StreamDecoderFactory + +load_dotenv() + +# Frame storage for each camera +frames = {1: None, 2: None, 3: None, 4: None} + + +def make_callback(cam_id): + """Create callback for specific camera""" + + def callback(frame_ref): + # Transfer to CPU and convert RGB to BGR + frame = frame_ref.rgb_tensor.cpu().permute(1, 2, 0).numpy() + frames[cam_id] = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) + frame_ref.free() + + return callback + + +# Initialize factory and decoders +factory = StreamDecoderFactory(gpu_id=0) +decoders = [] + +for i in range(1, 5): + url = os.getenv(f"CAMERA_URL_{i}") + decoder = factory.create_decoder(url, buffer_size=5) + decoder.register_frame_callback(make_callback(i)) + decoder.start() + decoders.append(decoder) + print(f"Camera {i} started") + +# Display loop +print("Press 'q' to quit") +while True: + # Show each camera in separate window + for cam_id, frame in frames.items(): + if frame is not None: + cv2.imshow(f"Camera {cam_id}", frame) + + if cv2.waitKey(1) & 0xFF == ord("q"): + break + +# Cleanup +for decoder in decoders: + decoder.stop() +cv2.destroyAllWindows()