From e87ed4c05663876e5b8dbba2262679ab1cd027b1 Mon Sep 17 00:00:00 2001
From: ziesorx
Date: Thu, 25 Sep 2025 12:01:32 +0700
Subject: [PATCH] feat: update rtsp scaling plan

---
 RTSP_SCALING_SOLUTION.md | 382 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 382 insertions(+)
 create mode 100644 RTSP_SCALING_SOLUTION.md

diff --git a/RTSP_SCALING_SOLUTION.md b/RTSP_SCALING_SOLUTION.md
new file mode 100644
index 0000000..3fc2fd8
--- /dev/null
+++ b/RTSP_SCALING_SOLUTION.md
@@ -0,0 +1,382 @@

# RTSP Stream Scaling Solution Plan

## Problem Statement
The current implementation fails with 8+ concurrent RTSP streams (1280x720 @ 6 fps) because of:
- The Python GIL, which prevents true parallelism across reader threads
- OpenCV/FFmpeg resource contention
- Thread starvation causing frame read failures
- Socket buffer exhaustion dropping UDP packets

## Selected Solution: Phased Approach

### Phase 1: Quick Fix - Multiprocessing (8-20 cameras)
**Timeline:** 1-2 days
**Goal:** Immediate fix for the current 8-camera deployment

### Phase 2: Long-term - go2rtc or GStreamer/FFmpeg Proxy (20+ cameras)
**Timeline:** 1-2 weeks
**Goal:** Scalable architecture for future growth

---

## Implementation Checklist

### Phase 1: Multiprocessing Solution

#### Core Architecture Changes
- [ ] Create an `RTSPProcessManager` class to manage camera processes
- [ ] Implement shared memory for frame passing (using `multiprocessing.shared_memory`)
- [ ] Create a `CameraProcess` worker class for individual camera handling
- [ ] Add a process pool executor with a configurable worker count
- [ ] Implement process health monitoring and auto-restart

#### Frame Pipeline
- [ ] Replace `threading.Thread` with `multiprocessing.Process` for frame readers
- [ ] Implement zero-copy frame transfer using shared memory buffers
- [ ] Add a frame queue with backpressure handling
- [ ] Create frame-skipping logic for when processing falls behind
- [ ] Add timestamp-based frame dropping (keep only recent frames)

#### Thread Safety & Synchronization (CRITICAL)
- [ ] Use `multiprocessing.Lock()` for all shared memory write operations
- [ ] Use `multiprocessing.Queue()` instead of shared lists (process-safe by design)
- [ ] Replace plain counters with `multiprocessing.Value()`, updating under `get_lock()` for atomicity
- [ ] Implement a lock-free ring buffer for frames using `multiprocessing.Array()` with `lock=False`
- [ ] Use `multiprocessing.Manager()` for complex shared objects (dicts, lists)
- [ ] Create read-write locks for frame buffers (multiple readers, single writer)
- [ ] Use semaphores to limit concurrent RTSP connections
- [ ] Add process-safe logging with `QueueHandler` and `QueueListener`
- [ ] Use `multiprocessing.Condition()` for frame-ready notifications
- [ ] Implement deadlock detection and a recovery mechanism
- [ ] Add a timeout to every lock acquisition to prevent hanging
- [ ] Document the lock hierarchy to prevent deadlocks
- [ ] Prefer lock-free data structures where possible (e.g. SPSC queues)
- [ ] Account for memory visibility and CPU cache coherency in shared memory access patterns

#### Resource Management
- [ ] Set process CPU affinity for better cache utilization
- [ ] Implement a memory pool for frame buffers (to avoid allocation overhead)
- [ ] Add configurable process limits based on CPU core count
- [ ] Create a graceful shutdown mechanism for all processes
- [ ] Add resource monitoring (CPU and memory per process)

#### Configuration Updates
- [ ] Add a `max_processes` config parameter (default: CPU cores - 2)
- [ ] Add a `frames_per_second_limit` parameter for frame skipping
- [ ] Add a `frame_queue_size` parameter
- [ ] Add a `process_restart_threshold` parameter for failure recovery
- [ ] Update the Docker container to handle multiprocessing

#### Error Handling
- [ ] Implement process crash detection and recovery
- [ ] Add exponential backoff for process restarts
- [ ] Create a dead-process cleanup mechanism
- [ ] Add log aggregation from multiple processes
- [ ] Implement 
shared error counter with thresholds

#### Testing
- [ ] Test with 8 cameras simultaneously
- [ ] Verify frame rate stability under load
- [ ] Test process crash recovery
- [ ] Measure CPU and memory usage
- [ ] Load test with 15-20 cameras

---

### Phase 2: go2rtc or GStreamer/FFmpeg Proxy Solution

#### Option A: go2rtc Integration (Recommended)
- [ ] Deploy go2rtc as a separate service container
- [ ] Configure go2rtc's `streams.yaml` for all cameras
- [ ] Implement a Python client to consume go2rtc WebRTC/HLS streams
- [ ] Add automatic camera discovery and registration
- [ ] Create health monitoring for the go2rtc service

#### Option B: Custom Proxy Service
- [ ] Create a standalone RTSP proxy service
- [ ] Implement a GStreamer pipeline for multiple RTSP inputs
- [ ] Add hardware-acceleration detection (NVDEC, VAAPI)
- [ ] Create shared memory or socket output for frames
- [ ] Implement a dynamic stream add/remove API

#### Integration Layer
- [ ] Create a Python client for the proxy service
- [ ] Implement a frame receiver for proxy output
- [ ] Add stream control commands (start/stop/restart)
- [ ] Create a fallback to multiprocessing if the proxy fails
- [ ] Add proxy health monitoring

#### Performance Optimization
- [ ] Implement hardware decoder auto-detection
- [ ] Add adaptive bitrate handling
- [ ] Create intelligent frame dropping at the source
- [ ] Add network buffer tuning
- [ ] Implement a zero-copy frame pipeline

#### Deployment
- [ ] Create a Docker container for the proxy service
- [ ] Add Kubernetes deployment configs
- [ ] Create a service mesh for multi-instance scaling
- [ ] Add a load balancer for camera distribution
- [ ] Implement monitoring and alerting

---

## Quick Wins (Implement Immediately)

### Network Optimizations
- [ ] Increase system socket buffer sizes:
  ```bash
  sysctl -w net.core.rmem_default=2097152
  sysctl -w net.core.rmem_max=8388608
  ```
- [ ] Increase file descriptor limits:
  ```bash
  ulimit -n 65535
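  # Assumption (not in the original plan): `ulimit` only affects the current
  # shell session. On hosts using pam_limits, the equivalent persistent
  # entries in /etc/security/limits.conf would be:
  #   *  soft  nofile  65535
  #   *  hard  nofile  65535
  ulimit -Sn  # print the soft limit now in effect, to verify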
  ```
- [ ] Add to Docker Compose:
  ```yaml
  ulimits:
    nofile:
      soft: 65535
      hard: 65535
  ```

### Code Optimizations
- [ ] Fix the RTSP TCP transport bug in readers.py
- [ ] Increase the error threshold to 30 (already done)
- [ ] Add frame timestamp checking to skip stale frames
- [ ] Implement connection pooling for RTSP streams
- [ ] Add a configurable frame-skip interval

### Monitoring
- [ ] Add metrics for frames processed/dropped per camera
- [ ] Log queue sizes and processing delays
- [ ] Track FFmpeg/OpenCV resource usage
- [ ] Create a dashboard for stream health monitoring

---

## Performance Targets

### Phase 1 (Multiprocessing)
- Support: 15-20 cameras
- Frame rate: stable 5-6 fps per camera
- CPU usage: < 80% on an 8-core system
- Memory: < 2 GB total
- Latency: < 200 ms frame-to-detection

### Phase 2 (go2rtc / GStreamer Proxy)
- Support: 50+ cameras (100+ with hardware acceleration)
- Frame rate: full 6 fps per camera
- CPU usage: < 50% on an 8-core system
- Memory: < 1 GB for proxy + workers
- Latency: < 100 ms frame-to-detection

---

## Risk Mitigation

### Known Risks
1. **Race conditions** - multiple processes writing to the same memory location
   - *Mitigation*: strict locking protocol; atomic operations only
2. **Deadlocks** - circular lock dependencies between processes
   - *Mitigation*: lock ordering, timeouts, deadlock detection
3. **Frame corruption** - partial writes to shared memory during reads
   - *Mitigation*: double buffering, memory barriers, atomic swaps
4. **Memory coherency** - CPU cache inconsistencies between cores
   - *Mitigation*: memory fencing, cache-line padding
5. **Lock contention** - too many processes waiting on the same lock
   - *Mitigation*: fine-grained locks, lock-free structures, sharding
6. **Multiprocessing overhead** - monitor shared memory performance
7. **Memory leaks** - implement proper cleanup and monitoring
8. **Network bandwidth** - add bandwidth monitoring and alerts
9. 
**Hardware limitations** - profile and set realistic limits

### Fallback Strategy
- Keep the current threading implementation as a fallback
- Implement a feature flag to switch between implementations
- Add automatic fallback on repeated failures
- Maintain backwards compatibility with the existing API

---

## Success Criteria

### Phase 1 Complete When:
- [x] All 8 cameras run simultaneously without frame read failures
- [ ] System stable for 24+ hours of continuous operation
- [ ] CPU usage remains below 80%
- [ ] No memory leaks detected
- [ ] Frame processing latency < 200 ms

### Phase 2 Complete When:
- [ ] Successfully handling 20+ cameras
- [ ] Hardware acceleration working (where available)
- [ ] Proxy service stable and monitored
- [ ] Automatic scaling implemented
- [ ] Full production deployment complete

---

## Thread Safety Implementation Details

### Critical Sections Requiring Synchronization

#### 1. Frame Buffer Access
```python
# UNSAFE - race condition: multiple writers to the same slot
shared_frames[camera_id] = new_frame

# SAFE - with proper locking
with frame_locks[camera_id]:
    # Write into the back buffer so readers never see a partial frame
    write_buffer = frame_buffers[camera_id]['write']
    write_buffer[:] = new_frame
    # Swap the read/write buffer roles while still holding the lock
    frame_buffers[camera_id]['write'], frame_buffers[camera_id]['read'] = \
        frame_buffers[camera_id]['read'], frame_buffers[camera_id]['write']
```

#### 2. Statistics/Counters
```python
# UNSAFE
frame_count += 1  # Read-modify-write is not atomic

# SAFE - multiprocessing.Value guarded by its internal lock
frame_count = multiprocessing.Value('i', 0)
with frame_count.get_lock():
    frame_count.value += 1
```

#### 3. Queue Operations
```python
import queue  # for the queue.Full exception

# SAFE - multiprocessing.Queue is process-safe by design
frame_queue = multiprocessing.Queue(maxsize=100)
# Put with a timeout to avoid blocking the reader indefinitely
try:
    frame_queue.put(frame, timeout=0.1)
except queue.Full:
    pass  # Handle backpressure, e.g. drop the frame
```

#### 4. 
Shared Memory Layout
```python
import multiprocessing

# Double-buffered shared frame slot for one camera
class FrameBuffer:
    def __init__(self, camera_id, width=1280, height=720):
        self.camera_id = camera_id
        self.lock = multiprocessing.Lock()

        # Double buffering: the writer fills one buffer while readers use the other
        buffer_size = width * height * 3  # RGB, one byte per channel
        self.buffer_a = multiprocessing.Array('B', buffer_size)
        self.buffer_b = multiprocessing.Array('B', buffer_size)

        # Index of the current read buffer (0 = a, 1 = b)
        self.read_buffer_idx = multiprocessing.Value('i', 0)

        # Metadata (guarded by each Value's internal lock)
        self.timestamp = multiprocessing.Value('d', 0.0)
        self.frame_number = multiprocessing.Value('L', 0)
```

### Lock-Free Patterns

#### Single Producer, Single Consumer (SPSC) Queue
```python
# Safe without locks only if exactly one process writes and one process reads
class SPSCQueue:
    def __init__(self, size):
        self.buffer = multiprocessing.Array('i', size, lock=False)
        self.head = multiprocessing.Value('L', 0, lock=False)  # Written only by the producer
        self.tail = multiprocessing.Value('L', 0, lock=False)  # Written only by the consumer
        self.size = size

    def put(self, item):
        next_head = (self.head.value + 1) % self.size
        if next_head == self.tail.value:
            return False  # Queue full
        self.buffer[self.head.value] = item
        self.head.value = next_head  # Publish only after the slot is written
        return True

    def get(self):
        if self.tail.value == self.head.value:
            return None  # Queue empty
        item = self.buffer[self.tail.value]
        self.tail.value = (self.tail.value + 1) % self.size
        return item
```

### Memory Barrier Considerations
```python
# CPython exposes no portable user-level memory fence. The primitives above
# (Lock, Value, Queue) already issue the required barriers on acquire/release,
# so rely on them rather than hand-rolled fences. sched_yield() merely yields
# the CPU (useful in busy-wait loops); it is not a memory barrier.
import ctypes

def cpu_yield():
    ctypes.CDLL(None).sched_yield()  # Linux/Unix only
```

### Deadlock Prevention Strategy

#### Lock Ordering Protocol
```python
# Define a strict lock-acquisition order
LOCK_ORDER = {
    'frame_buffer': 1,
    'statistics': 2,
    'queue': 3,
    'config': 4,
}

# Always acquire locks in ascending LOCK_ORDER; back out on timeout
def safe_multi_lock(locks, timeout=5.0):
    acquired = []
    for lock in sorted(locks, key=lambda x: LOCK_ORDER[x.name]):
        if lock.acquire(timeout=timeout):  # Timeout prevents hanging
            acquired.append(lock)
        else:
            for held in reversed(acquired):
                held.release()  # Back out so other processes can proceed
            return False
    return True
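
# Companion helper (illustrative sketch, not part of the original plan):
# release in descending LOCK_ORDER so releases mirror acquisitions. Like
# safe_multi_lock above, it assumes each lock object carries a .name that
# is a key in LOCK_ORDER.
def safe_multi_unlock(locks):
    for lock in sorted(locks, key=lambda x: LOCK_ORDER[x.name], reverse=True):
        lock.release()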
```

#### Monitoring & Detection
```python
import sys
import logging
import threading

logger = logging.getLogger(__name__)

# Heuristic deadlock detector: flags threads currently blocked inside an
# acquire call. Run one instance per process, since sys._current_frames()
# only sees threads of the current interpreter.
def detect_deadlocks():
    for thread in threading.enumerate():
        frame = sys._current_frames().get(thread.ident)
        if frame and 'acquire' in frame.f_code.co_name:
            logger.warning(f"Potential deadlock: {thread.name}")
```

---

## Notes

### Current Bottlenecks (Must Address)
- The Python GIL preventing parallel frame reading
- FFmpeg internal buffer management
- Thread context-switching overhead
- Socket receive buffers too small for 8 streams
- **Thread safety in shared memory access** (CRITICAL)

### Key Insights
- We don't need every frame - intelligent dropping is acceptable
- Hardware acceleration is crucial for 50+ cameras
- Process isolation prevents cascade failures
- Shared memory is faster than queues for large frames

### Dependencies to Add
```txt
# requirements.txt additions
psutil>=5.9.0              # Process monitoring
py-cpuinfo>=9.0.0          # CPU detection
shared-memory-dict>=0.7.2  # Shared memory utils
multiprocess>=0.70.14      # Better multiprocessing with dill
atomicwrites>=1.4.0        # Atomic file operations
portalocker>=2.7.0         # Cross-platform file locking
```

---

**Last Updated:** 2025-09-25
**Priority:** CRITICAL - Production deployment blocked
**Owner:** Engineering Team
\ No newline at end of file