# RTSP Stream Scaling Solution Plan

## Problem Statement

The current implementation fails with 8+ concurrent RTSP streams (1280x720 @ 6 fps) due to:

- Python GIL bottleneck limiting true parallelism
- OpenCV/FFMPEG resource contention
- Thread starvation causing frame read failures
- Socket buffer exhaustion dropping UDP packets

## Selected Solution: Phased Approach

### Phase 1: Quick Fix - Multiprocessing (8-20 cameras)

- **Timeline:** 1-2 days
- **Goal:** Immediate fix for the current 8-camera deployment

### Phase 2: Long-term - go2rtc or GStreamer/FFmpeg Proxy (20+ cameras)

- **Timeline:** 1-2 weeks
- **Goal:** Scalable architecture for future growth

---

## Implementation Checklist

### Phase 1: Multiprocessing Solution

#### Core Architecture Changes

- [ ] Create `RTSPProcessManager` class to manage camera processes
- [ ] Implement shared memory for frame passing (using `multiprocessing.shared_memory`)
- [ ] Create `CameraProcess` worker class for individual camera handling
- [ ] Add process pool executor with configurable worker count
- [ ] Implement process health monitoring and auto-restart

#### Frame Pipeline

- [ ] Replace `threading.Thread` with `multiprocessing.Process` for readers
- [ ] Implement zero-copy frame transfer using shared memory buffers (a reader sketch follows the Error Handling checklist below)
- [ ] Add frame queue with backpressure handling
- [ ] Create frame-skipping logic for when processing falls behind
- [ ] Add timestamp-based frame dropping (keep only recent frames)

#### Thread Safety & Synchronization (CRITICAL)

- [ ] Implement `multiprocessing.Lock()` for all shared memory write operations
- [ ] Use `multiprocessing.Queue()` instead of shared lists (process-safe by design)
- [ ] Replace plain counters with `multiprocessing.Value()` updated under `get_lock()`
- [ ] Implement lock-free ring buffer using `multiprocessing.Array()` for frames
- [ ] Use `multiprocessing.Manager()` for complex shared objects (dicts, lists)
- [ ] Add memory barriers for CPU cache coherency
- [ ] Create read-write locks for frame buffers (multiple readers, single writer)
- [ ] Implement semaphores for limiting concurrent RTSP connections
- [ ] Add process-safe logging with `QueueHandler` and `QueueListener`
- [ ] Use `multiprocessing.Condition()` for frame-ready notifications
- [ ] Implement deadlock detection and recovery mechanism
- [ ] Add timeouts on all lock acquisitions to prevent hanging
- [ ] Create lock hierarchy documentation to prevent deadlocks
- [ ] Implement lock-free data structures where possible (SPSC queues)
- [ ] Add memory fencing for shared memory access patterns

#### Resource Management

- [ ] Set process CPU affinity for better cache utilization
- [ ] Implement memory pool for frame buffers (prevents allocation overhead)
- [ ] Add configurable process limits based on CPU cores
- [ ] Create graceful shutdown mechanism for all processes
- [ ] Add resource monitoring (CPU, memory per process)

#### Configuration Updates

- [ ] Add `max_processes` config parameter (default: CPU cores - 2)
- [ ] Add `frames_per_second_limit` for frame skipping
- [ ] Add `frame_queue_size` parameter
- [ ] Add `process_restart_threshold` for failure recovery
- [ ] Update Docker container to handle multiprocessing

#### Error Handling

- [ ] Implement process crash detection and recovery
- [ ] Add exponential backoff for process restarts
- [ ] Create dead process cleanup mechanism
- [ ] Add logging aggregation from multiple processes
- [ ] Implement shared error counter with thresholds
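The pieces above (a reader process per camera, shared-memory frame passing, latest-frame-only semantics, locked writes) combine roughly as in the sketch below. This is a minimal illustration, not the final `RTSPProcessManager`/`CameraProcess` API: it assumes OpenCV built with FFMPEG support, a fixed 1280x720 BGR frame, and a placeholder RTSP URL; the function and variable names are illustrative only.

```python
import time
import multiprocessing as mp
from multiprocessing import shared_memory

import cv2
import numpy as np

FRAME_SHAPE = (720, 1280, 3)              # height, width, BGR channels
FRAME_BYTES = int(np.prod(FRAME_SHAPE))


def camera_worker(rtsp_url, shm_name, lock, timestamp, stop):
    """Child process: read RTSP frames, keep only the latest one in shared memory."""
    shm = shared_memory.SharedMemory(name=shm_name)
    view = np.ndarray(FRAME_SHAPE, dtype=np.uint8, buffer=shm.buf)
    cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
    while not stop.is_set():
        ok, frame = cap.read()
        if not ok:                         # read failure: reconnect after a pause
            cap.release()
            time.sleep(1.0)
            cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
            continue
        if frame.shape != FRAME_SHAPE:     # guard against unexpected resolutions
            frame = cv2.resize(frame, (FRAME_SHAPE[1], FRAME_SHAPE[0]))
        with lock:                         # single writer per camera slot
            view[:] = frame                # copy the frame into shared memory
            timestamp.value = time.time()  # record when it was written
    cap.release()
    shm.close()


if __name__ == "__main__":
    shm = shared_memory.SharedMemory(create=True, size=FRAME_BYTES)
    lock = mp.Lock()
    timestamp = mp.Value("d", 0.0)
    stop = mp.Event()
    worker = mp.Process(
        target=camera_worker,
        args=("rtsp://192.168.1.10:554/stream1",   # placeholder camera URL
              shm.name, lock, timestamp, stop),
        daemon=True,
    )
    worker.start()

    # Consumer side: attach a view onto the same block and read under the same lock.
    latest = np.ndarray(FRAME_SHAPE, dtype=np.uint8, buffer=shm.buf)
    time.sleep(5.0)
    with lock:
        snapshot = latest.copy()
        age = time.time() - timestamp.value
    print(f"latest frame {snapshot.shape}, {age:.2f}s old")

    stop.set()
    worker.join(timeout=5.0)
    shm.close()
    shm.unlink()
```

Because the consumer attaches to the shared-memory block by name, frames are never pickled or pushed through a queue; only the per-camera lock and a timestamp cross the process boundary.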
#### Testing

- [ ] Test with 8 cameras simultaneously
- [ ] Verify frame rate stability under load
- [ ] Test process crash recovery
- [ ] Measure CPU and memory usage
- [ ] Load test with 15-20 cameras

---

### Phase 2: go2rtc or GStreamer/FFmpeg Proxy Solution

#### Option A: go2rtc Integration (Recommended)

- [ ] Deploy go2rtc as a separate service container
- [ ] Configure go2rtc `streams.yaml` for all cameras (a configuration sketch appears after the Performance Targets section)
- [ ] Implement Python client to consume go2rtc WebRTC/HLS streams
- [ ] Add automatic camera discovery and registration
- [ ] Create health monitoring for the go2rtc service

#### Option B: Custom Proxy Service

- [ ] Create standalone RTSP proxy service
- [ ] Implement GStreamer pipeline for multiple RTSP inputs
- [ ] Add hardware acceleration detection (NVDEC, VAAPI)
- [ ] Create shared memory or socket output for frames
- [ ] Implement dynamic stream addition/removal API

#### Integration Layer

- [ ] Create Python client for the proxy service
- [ ] Implement frame receiver from proxy
- [ ] Add stream control commands (start/stop/restart)
- [ ] Create fallback to multiprocessing if the proxy fails
- [ ] Add proxy health monitoring

#### Performance Optimization

- [ ] Implement hardware decoder auto-detection
- [ ] Add adaptive bitrate handling
- [ ] Create intelligent frame dropping at the source
- [ ] Add network buffer tuning
- [ ] Implement zero-copy frame pipeline

#### Deployment

- [ ] Create Docker container for the proxy service
- [ ] Add Kubernetes deployment configs
- [ ] Create service mesh for multi-instance scaling
- [ ] Add load balancer for camera distribution
- [ ] Implement monitoring and alerting

---

## Quick Wins (Implement Immediately)

### Network Optimizations

- [ ] Increase system socket buffer sizes:

  ```bash
  sysctl -w net.core.rmem_default=2097152
  sysctl -w net.core.rmem_max=8388608
  ```

- [ ] Increase file descriptor limits:

  ```bash
  ulimit -n 65535
  ```

- [ ] Add to Docker Compose:

  ```yaml
  ulimits:
    nofile:
      soft: 65535
      hard: 65535
  ```

### Code Optimizations

- [ ] Fix RTSP TCP transport bug in readers.py
- [ ] Increase error threshold to 30 (already done)
- [ ] Add frame timestamp checking to skip old frames
- [ ] Implement connection pooling for RTSP streams
- [ ] Add configurable frame skip interval

### Monitoring

- [ ] Add metrics for frames processed/dropped per camera
- [ ] Log queue sizes and processing delays
- [ ] Track FFMPEG/OpenCV resource usage
- [ ] Create dashboard for stream health monitoring

---

## Performance Targets

### Phase 1 (Multiprocessing)

- Support: 15-20 cameras
- Frame rate: stable 5-6 fps per camera
- CPU usage: < 80% on an 8-core system
- Memory: < 2 GB total
- Latency: < 200 ms frame-to-detection

### Phase 2 (go2rtc/GStreamer Proxy)

- Support: 50+ cameras (100+ with HW acceleration)
- Frame rate: full 6 fps per camera
- CPU usage: < 50% on an 8-core system
- Memory: < 1 GB for proxy + workers
- Latency: < 100 ms frame-to-detection
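For Option A above, the go2rtc side is just one `streams.yaml` entry per camera, and the Python side can consume go2rtc's RTSP restream instead of connecting to cameras directly. A minimal sketch — camera names, credentials, and the `go2rtc` hostname are placeholders, and it assumes go2rtc's default RTSP restream port (8554):

```yaml
# go2rtc streams.yaml (one entry per camera; names and URLs are placeholders)
streams:
  loading_dock: rtsp://user:pass@192.168.1.10:554/stream1
  front_gate: rtsp://user:pass@192.168.1.11:554/stream1
```

```python
# Consumer sketch: read the go2rtc restream rather than the camera itself,
# so each camera is connected and decoded once by the proxy.
import cv2

cap = cv2.VideoCapture("rtsp://go2rtc:8554/loading_dock", cv2.CAP_FFMPEG)
ok, frame = cap.read()
if ok:
    print("got frame", frame.shape)
cap.release()
```

This keeps the existing OpenCV-based pipeline largely unchanged while moving connection handling and reconnection into the proxy.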
---

## Risk Mitigation

### Known Risks

1. **Race Conditions** - Multiple processes writing to the same memory location.
   *Mitigation*: strict locking protocol, atomic operations only.
2. **Deadlocks** - Circular lock dependencies between processes.
   *Mitigation*: lock ordering, timeouts, deadlock detection.
3. **Frame Corruption** - Partial writes to shared memory during reads.
   *Mitigation*: double buffering, memory barriers, atomic swaps.
4. **Memory Coherency** - CPU cache inconsistencies between cores.
   *Mitigation*: memory fencing, volatile markers, cache line padding.
5. **Lock Contention** - Too many processes waiting for the same lock.
   *Mitigation*: fine-grained locks, lock-free structures, sharding.
6. **Multiprocessing overhead** - Monitor shared memory performance.
7. **Memory leaks** - Implement proper cleanup and monitoring.
8. **Network bandwidth** - Add bandwidth monitoring and alerts.
9. **Hardware limitations** - Profile and set realistic limits.

### Fallback Strategy

- Keep the current threading implementation as a fallback
- Implement a feature flag to switch between implementations
- Add automatic fallback on repeated failures
- Maintain backwards compatibility with the existing API

---

## Success Criteria

### Phase 1 Complete When:

- [x] All 8 cameras run simultaneously without frame read failures
- [ ] System stable for 24+ hours of continuous operation
- [ ] CPU usage remains below 80%
- [ ] No memory leaks detected
- [ ] Frame processing latency < 200 ms

### Phase 2 Complete When:

- [ ] Successfully handling 20+ cameras
- [ ] Hardware acceleration working (if available)
- [ ] Proxy service stable and monitored
- [ ] Automatic scaling implemented
- [ ] Full production deployment complete

---

## Thread Safety Implementation Details

### Critical Sections Requiring Synchronization

#### 1. Frame Buffer Access

```python
# UNSAFE - race condition: multiple writers update shared_frames with no coordination
shared_frames[camera_id] = new_frame

# SAFE - per-camera lock with double buffering
with frame_locks[camera_id]:
    # Write into the back buffer so readers never see a partially written frame
    write_buffer = frame_buffers[camera_id]['write']
    write_buffer[:] = new_frame
    # Swap buffer roles while still holding the lock
    frame_buffers[camera_id]['write'], frame_buffers[camera_id]['read'] = \
        frame_buffers[camera_id]['read'], frame_buffers[camera_id]['write']
```

#### 2. Statistics/Counters

```python
# UNSAFE - read-modify-write is not atomic across processes
frame_count += 1

# SAFE - a multiprocessing.Value incremented under its own lock
import multiprocessing

frame_count = multiprocessing.Value('i', 0)
with frame_count.get_lock():
    frame_count.value += 1
```

#### 3. Queue Operations

```python
# SAFE - multiprocessing.Queue is process-safe by design
import queue
import multiprocessing

frame_queue = multiprocessing.Queue(maxsize=100)

# Put with a timeout so a full queue never blocks the reader indefinitely
try:
    frame_queue.put(frame, timeout=0.1)
except queue.Full:
    pass  # handle backpressure (drop or skip this frame)
```
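#### 4. Process-Safe Logging

The Phase 1 checklist calls for aggregating logs from worker processes with `QueueHandler`/`QueueListener`. A minimal standard-library sketch (handler choice and format are illustrative): workers push log records onto a shared queue, and a single listener in the parent writes them out.

```python
import logging
import logging.handlers
import multiprocessing as mp


def worker(log_queue):
    """Worker processes send log records to the queue instead of writing directly."""
    root = logging.getLogger()
    root.addHandler(logging.handlers.QueueHandler(log_queue))
    root.setLevel(logging.INFO)
    logging.info("camera worker started")


if __name__ == "__main__":
    log_queue = mp.Queue()
    # One listener in the parent drains the queue and writes to the real handler.
    console = logging.StreamHandler()
    console.setFormatter(logging.Formatter("%(asctime)s %(processName)s %(message)s"))
    listener = logging.handlers.QueueListener(log_queue, console)
    listener.start()

    procs = [mp.Process(target=worker, args=(log_queue,)) for _ in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    listener.stop()
```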
#### 5. Shared Memory Layout

```python
import multiprocessing

# Per-camera frame buffer with double buffering.
# Note: multiprocessing.Array gives no alignment guarantees; true cache-line
# alignment would need a manually laid-out multiprocessing.shared_memory block.
class FrameBuffer:
    def __init__(self, camera_id, width=1280, height=720):
        self.camera_id = camera_id
        self.lock = multiprocessing.Lock()
        # Double buffering so readers never observe a half-written frame
        buffer_size = width * height * 3  # RGB
        self.buffer_a = multiprocessing.Array('B', buffer_size)
        self.buffer_b = multiprocessing.Array('B', buffer_size)
        # Index of the buffer readers should currently use (0 = A, 1 = B)
        self.read_buffer_idx = multiprocessing.Value('i', 0)
        # Metadata; each Value is guarded by its own internal lock
        self.timestamp = multiprocessing.Value('d', 0.0)
        self.frame_number = multiprocessing.Value('L', 0)
```

### Lock-Free Patterns

#### Single Producer, Single Consumer (SPSC) Queue

```python
import multiprocessing

# Safe only under the SPSC assumption: exactly one process calls put()
# and exactly one process calls get().
class SPSCQueue:
    def __init__(self, size):
        self.buffer = multiprocessing.Array('i', size)
        self.head = multiprocessing.Value('L', 0)  # writer position
        self.tail = multiprocessing.Value('L', 0)  # reader position
        self.size = size

    def put(self, item):
        next_head = (self.head.value + 1) % self.size
        if next_head == self.tail.value:
            return False  # queue full
        self.buffer[self.head.value] = item
        self.head.value = next_head  # publish only after the slot is written
        return True

    def get(self):
        if self.tail.value == self.head.value:
            return None  # queue empty
        item = self.buffer[self.tail.value]
        self.tail.value = (self.tail.value + 1) % self.size
        return item
```

### Memory Barrier Considerations

Pure Python cannot issue CPU memory fences directly; rely instead on the barriers that `multiprocessing` primitives already provide.

```python
import multiprocessing

# Acquiring and releasing a Lock (or a Value's get_lock()) gives the
# acquire/release ordering needed for cross-process visibility, so no explicit
# fencing is required as long as shared data is only touched inside these blocks.
flag = multiprocessing.Value('i', 0)
with flag.get_lock():
    flag.value = 1  # visible to other processes once the lock is released
```

### Deadlock Prevention Strategy

#### Lock Ordering Protocol

```python
# Strict lock acquisition order (assumes locks are wrapped in objects
# exposing a .name attribute that matches these keys)
LOCK_ORDER = {
    'frame_buffer': 1,
    'statistics': 2,
    'queue': 3,
    'config': 4,
}

# Always acquire locks in ascending order; time out instead of hanging
def safe_multi_lock(locks):
    acquired = []
    for lock in sorted(locks, key=lambda l: LOCK_ORDER[l.name]):
        if not lock.acquire(timeout=5.0):
            for held in reversed(acquired):  # back off on timeout
                held.release()
            return False
        acquired.append(lock)
    return True
```

#### Monitoring & Detection

```python
import logging
import sys
import threading
import traceback

logger = logging.getLogger(__name__)

# Heuristic detector: flag threads in this process whose current stack is
# sitting inside an acquire() call
def detect_deadlocks():
    for thread in threading.enumerate():
        frame = sys._current_frames().get(thread.ident)
        if frame and 'acquire' in ''.join(traceback.format_stack(frame)):
            logger.warning("Potential deadlock: %s", thread.name)
```

---

## Notes

### Current Bottlenecks (Must Address)

- Python GIL preventing parallel frame reading
- FFMPEG internal buffer management
- Thread context switching overhead
- Socket receive buffer too small for 8 streams
- **Thread safety in shared memory access** (CRITICAL)

### Key Insights

- Not every frame is needed - intelligent dropping is acceptable
- Hardware acceleration is crucial for 50+ cameras
- Process isolation prevents cascade failures
- Shared memory is faster than queues for large frames

### Dependencies to Add

```txt
# requirements.txt additions
psutil>=5.9.0              # Process monitoring
py-cpuinfo>=9.0.0          # CPU detection
shared-memory-dict>=0.7.2  # Shared memory utils
multiprocess>=0.70.14      # Better multiprocessing with dill
atomicwrites>=1.4.0        # Atomic file operations
portalocker>=2.7.0         # Cross-platform file locking
```

---

**Last Updated:** 2025-09-25
**Priority:** CRITICAL - Production deployment blocked
**Owner:** Engineering Team