Session-Isolated Multiprocessing Architecture - Implementation Plan
🎯 Objective
Eliminate shared state issues causing identical results across different sessions by implementing Process-Per-Session architecture with per-camera logging.
🔍 Root Cause Analysis
Current Shared State Issues:
- Shared Model Cache (core/models/inference.py:40): All sessions share the same cached YOLO model instances
- Single Pipeline Instance (core/detection/pipeline.py): One pipeline handles all sessions with shared mappings
- Global Session Mappings: session_to_subscription and session_processing_results dictionaries
- Shared Thread Pool: Single ThreadPoolExecutor for all sessions
- Global Frame Cache (app.py:39): latest_frames shared across endpoints
- Single Log File: All cameras write to detector_worker.log
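To make the first issue concrete, here is a minimal sketch of how a class-level cache lets one session's state show up in another. `SharedCacheWrapper` and its dict-based "model" are hypothetical stand-ins, not the actual code in core/models/inference.py.

```python
from typing import Any, Dict


class SharedCacheWrapper:
    """Hypothetical stand-in for a model wrapper with a class-level cache."""

    # Class attribute: one dict shared by every instance in the process.
    _model_cache: Dict[str, Any] = {}

    def load(self, model_path: str) -> Any:
        if model_path not in self._model_cache:
            # Stand-in for real model loading; a dict plays the model object.
            self._model_cache[model_path] = {"path": model_path, "state": []}
        return self._model_cache[model_path]


session_a = SharedCacheWrapper()
session_b = SharedCacheWrapper()
model_a = session_a.load("car_frontal_detection_v1.pt")
model_b = session_b.load("car_frontal_detection_v1.pt")

# Session A mutates "its" model; session B observes the change
# because both names point at the same cached object.
model_a["state"].append("tracks from session A")
leaked_into_b = model_b["state"]
```

Both wrappers return the same object, so any per-model state written by one session is visible to the other. This is the kind of sharing the process-per-session design is meant to rule out.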
🏗️ New Architecture: Process-Per-Session
FastAPI Main Process (Port 8001)
├── WebSocket Handler (manages connections)
├── SessionProcessManager (spawns/manages session processes)
├── Main Process Logger → detector_worker_main.log
│
├── Session Process 1 (Camera/Display 1)
│   ├── Dedicated Model Pipeline
│   ├── Own Model Cache & Memory
│   ├── Session Logger → detector_worker_camera_display-001_cam-001.log
│   └── Redis/DB connections
│
├── Session Process 2 (Camera/Display 2)
│   ├── Dedicated Model Pipeline
│   ├── Own Model Cache & Memory
│   ├── Session Logger → detector_worker_camera_display-002_cam-001.log
│   └── Redis/DB connections
│
└── Session Process N...
📋 Implementation Tasks
Phase 1: Core Infrastructure ✅ COMPLETED
- Create SessionProcessManager class ✅
  - Manages lifecycle of session processes
  - Handles process spawning, monitoring, and cleanup
  - Maintains process registry and health checks
- Implement SessionWorkerProcess ✅
  - Individual process class that handles one session completely
  - Loads its own models and pipeline, and maintains its own state
  - Communicates with the main process via queues
- Design Inter-Process Communication ✅
  - Command queue: Main → Session (frames, commands, config)
  - Result queue: Session → Main (detections, status, errors)
  - Use multiprocessing.Queue for thread- and process-safe communication
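The command/result protocol above can be sketched as follows. The `Command` and `Result` schemas, the `kind` values, and `session_worker_loop` are illustrative assumptions, not the project's actual message definitions. The demo runs in-process with `queue.Queue` so that it is easy to execute anywhere.

```python
import queue
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class Command:
    """Main -> Session message (hypothetical schema)."""
    kind: str  # "frame", "config" or "stop"
    payload: Dict[str, Any] = field(default_factory=dict)


@dataclass
class Result:
    """Session -> Main message (hypothetical schema)."""
    session_id: str
    kind: str  # "detection", "status" or "error"
    payload: Dict[str, Any] = field(default_factory=dict)


def session_worker_loop(session_id: str, command_queue, result_queue) -> None:
    """Consume commands until a "stop" command arrives."""
    result_queue.put(Result(session_id, "status", {"state": "started"}))
    while True:
        command = command_queue.get()
        if command.kind == "stop":
            result_queue.put(Result(session_id, "status", {"state": "stopped"}))
            break
        try:
            if command.kind != "frame":
                raise ValueError(f"unsupported command: {command.kind}")
            # Placeholder for real inference: echo the frame id back.
            detection = {"frame_id": command.payload["frame_id"], "objects": []}
            result_queue.put(Result(session_id, "detection", detection))
        except Exception as exc:  # report errors instead of dying silently
            result_queue.put(Result(session_id, "error", {"message": str(exc)}))


def run_in_process_demo() -> List[Result]:
    """Drive the loop with plain queues; no child process is started."""
    commands, results = queue.Queue(), queue.Queue()
    commands.put(Command("frame", {"frame_id": 7}))
    commands.put(Command("frame", {}))  # malformed: no frame_id
    commands.put(Command("stop"))
    session_worker_loop("session_1", commands, results)
    return [results.get_nowait() for _ in range(results.qsize())]


demo_results = run_in_process_demo()
```

In the real system the same loop would be given two `multiprocessing.Queue` objects and started with `multiprocessing.Process(target=session_worker_loop, args=(...))`. Module-level dataclasses like these are picklable, which is what the queues require.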
Phase 1 Testing Results:
- ✅ Server starts successfully on port 8001
- ✅ WebSocket connections established (10.100.1.3:57488)
- ✅ SessionProcessManager initializes (max_sessions=20)
- ✅ Multiple session processes created (9 camera subscriptions)
- ✅ Individual session processes spawn with unique PIDs (e.g., PID: 16380)
- ✅ Session logging shows isolated process names (SessionWorker-session_xxx)
- ✅ IPC communication framework functioning
What to Look For When Testing:
- Check logs for "SessionProcessManager initialized"
- Verify individual session processes: "Session process created: session_xxx (PID: xxxx)"
- Monitor process isolation: Each session has unique process name "SessionWorker-session_xxx"
- Confirm WebSocket integration: "Session WebSocket integration started"
Phase 2: Per-Session Logging ✅ COMPLETED
- Implement PerSessionLogger ✅
  - Each session process creates its own log file
  - Format: detector_worker_camera_{subscription_id}.log
  - Include session context in all log messages
  - Implement log rotation (daily/size-based)
- Update Main Process Logging ✅
  - Main process logs to detector_worker_main.log
  - Log session process lifecycle events
  - Track active sessions and resource usage
Phase 2 Testing Results:
- ✅ Main process logs to dedicated file: logs/detector_worker_main.log
- ✅ Session-specific logger initialization working
- ✅ Each camera spawns with unique session worker name: "SessionWorker-session_{unique_id}_{camera_name}"
- ✅ Per-session logger ready for file creation (will create files when sessions fully initialize)
- ✅ Structured logging includes session context in the log format
- ✅ Log rotation capability implemented (100MB max, 5 backups)
What to Look For When Testing:
- Check for main process log: logs/detector_worker_main.log
- Monitor per-session process names in logs: "SessionWorker-session_xxx"
- Once sessions initialize fully, look for per-camera log files: detector_worker_camera_{camera_name}.log
- Verify session start/end events are logged with timestamps
- Check log rotation when files exceed 100MB
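A per-session logger along the lines of Phase 2 could look like the sketch below. It uses the standard library's `RotatingFileHandler` with the 100MB / 5 backups limits quoted above. The function name `create_session_logger`, the logger naming scheme, and the exact message format are assumptions for illustration.

```python
import logging
import os
import tempfile
from logging.handlers import RotatingFileHandler


def create_session_logger(subscription_id: str, log_dir: str = "logs",
                          max_bytes: int = 100 * 1024 * 1024,
                          backup_count: int = 5) -> logging.Logger:
    """Return a logger that writes to its own size-rotated file."""
    os.makedirs(log_dir, exist_ok=True)
    logger = logging.getLogger(f"session.{subscription_id}")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep session lines out of the main log
    if not logger.handlers:  # avoid duplicate handlers on repeated calls
        path = os.path.join(
            log_dir, f"detector_worker_camera_{subscription_id}.log")
        handler = RotatingFileHandler(
            path, maxBytes=max_bytes, backupCount=backup_count,
            encoding="utf-8")
        # Escape "%" so the id cannot be misread as a format directive.
        context = subscription_id.replace("%", "%%")
        handler.setFormatter(logging.Formatter(
            "%(asctime)s [%(levelname)s] %(processName)s "
            f"[{context}] %(message)s"))
        logger.addHandler(handler)
    return logger


# Demo: write one line into a temporary directory.
demo_dir = tempfile.mkdtemp()
demo_logger = create_session_logger("display-001_cam-001", log_dir=demo_dir)
demo_logger.info("session started")
for demo_handler in demo_logger.handlers:
    demo_handler.flush()
demo_log_path = os.path.join(
    demo_dir, "detector_worker_camera_display-001_cam-001.log")
```

Calling this once inside each session process gives every camera its own file, and `%(processName)s` puts the worker name on each line.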
Phase 3: Model & Pipeline Isolation ✅ COMPLETED
- Remove Shared Model Cache ✅
  - Eliminated YOLOWrapper._model_cache class variable
  - Each process loads models independently
  - Memory isolation prevents cross-session contamination
- Create Per-Process Pipeline Instances ✅
  - Each session process instantiates its own DetectionPipeline
  - Removed global pipeline singleton pattern
  - Session-local session_to_subscription mapping
- Isolate Session State ✅
  - Each process maintains its own session_processing_results
  - Session mappings are process-local
  - Complete state isolation per session
Phase 3 Testing Results:
- ✅ Zero Shared Cache: Models log "(ISOLATED)" and "no shared cache!"
- ✅ Individual Model Loading: Each session loads the complete model set independently
  - car_frontal_detection_v1.pt per session
  - car_brand_cls_v1.pt per session
  - car_bodytype_cls_v1.pt per session
- ✅ Pipeline Isolation: Each session has unique pipeline instance ID
- ✅ Memory Isolation: Different sessions cannot share model instances
- ✅ State Isolation: Session mappings are process-local (ISOLATED comments added)
What to Look For When Testing:
- Check logs for "(ISOLATED)" on model loading
- Verify each session loads models independently: "Loading YOLO model ... (ISOLATED)"
- Monitor unique pipeline instance IDs per session
- Confirm no shared state between sessions
- Look for "Successfully loaded model ... in isolation - no shared cache!"
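The isolation described in Phase 3 amounts to keeping every cache and mapping on the instance rather than on the class or module. The sketch below illustrates that idea. `IsolatedDetectionPipeline`, its `instance_id` scheme, and `fake_loader` are hypothetical and only mirror the names used in this plan.

```python
import os
import uuid
from typing import Any, Callable, Dict


class IsolatedDetectionPipeline:
    """Hypothetical pipeline whose state lives only on the instance."""

    def __init__(self, session_id: str,
                 model_loader: Callable[[str], Any]) -> None:
        self.session_id = session_id
        self.instance_id = uuid.uuid4().hex[:8]  # unique ID for log lines
        self.pid = os.getpid()
        self._model_loader = model_loader
        # Instance attributes: nothing here is shared between pipelines.
        self._models: Dict[str, Any] = {}
        self.session_to_subscription: Dict[str, str] = {}
        self.session_processing_results: Dict[str, Any] = {}

    def get_model(self, model_path: str) -> Any:
        """Load a model once per pipeline instance."""
        if model_path not in self._models:
            self._models[model_path] = self._model_loader(model_path)
        return self._models[model_path]


def fake_loader(model_path: str) -> dict:
    # Stand-in for real model loading; returns a fresh object per call.
    return {"path": model_path, "state": []}


pipeline_a = IsolatedDetectionPipeline("session_a", fake_loader)
pipeline_b = IsolatedDetectionPipeline("session_b", fake_loader)
model_a = pipeline_a.get_model("car_brand_cls_v1.pt")
model_b = pipeline_b.get_model("car_brand_cls_v1.pt")
model_a["state"].append("tracks from session A")
pipeline_a.session_to_subscription["session_a"] = "display-001_cam-001"
```

Within one pipeline the model is still loaded only once, but a second pipeline gets its own copy and its own mappings. Running each pipeline in a separate process then adds memory isolation on top.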
Phase 4: Integrated Stream-Session Architecture 🚧 IN PROGRESS
Problem Identified: The frame processing pipeline is not working because two stream systems run side by side, leaving a communication gap between them.
Root Cause:
- Old RTSP Process Manager capturing frames but not forwarding to session workers
- New Session Workers ready for processing but receiving no frames
- Architecture mismatch preventing detection despite successful initialization
Solution: Complete integration of stream reading INTO session worker processes.
- Integrate RTSP Stream Reading into Session Workers
  - Move RTSP stream capture from separate processes into each session worker
  - Each session worker handles: RTSP connection + frame processing + model inference
  - Eliminate communication gap between stream capture and detection
- Remove Duplicate Stream Management Systems
  - Delete old RTSP Process Manager (core/streaming/process_manager.py)
  - Remove conflicting stream management from main process
  - Consolidate to single session-worker-only architecture
- Enhanced Session Worker with Stream Integration
  - Add RTSP stream reader to SessionWorkerProcess
  - Implement frame buffer queue management per worker
  - Add connection recovery and stream health monitoring per session
- Complete End-to-End Isolation per Camera

Session Worker Process N:
├── RTSP Stream Reader (rtsp://cameraN)
├── Frame Buffer Queue
├── YOLO Detection Pipeline
├── Model Cache (isolated)
├── Database/Redis connections
└── Per-camera Logger
Benefits for 20+ Cameras:
- Python GIL Bypass: True parallelism with multiprocessing
- Resource Isolation: Process crashes don't affect other cameras
- Memory Distribution: Each process has own memory space
- Independent Recovery: Per-camera reconnection logic
- Scalable Architecture: Linear scaling with available CPU cores
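The per-worker frame buffer and connection recovery planned for Phase 4 might be structured like the sketch below. `FrameBuffer`, `read_with_recovery`, the drop-oldest policy, and the exponential backoff values are all assumptions. The stream is injected as a callable so the example runs without a camera or an RTSP library.

```python
import time
from collections import deque
from typing import Callable, Deque, Iterator, List, Optional


class FrameBuffer:
    """Bounded buffer that drops the oldest frame when full."""

    def __init__(self, max_frames: int = 5) -> None:
        self._frames: Deque[object] = deque(maxlen=max_frames)
        self.dropped = 0

    def push(self, frame: object) -> None:
        if len(self._frames) == self._frames.maxlen:
            self.dropped += 1  # deque discards the oldest entry on append
        self._frames.append(frame)

    def pop_latest(self) -> Optional[object]:
        """Return the newest frame and discard older ones."""
        if not self._frames:
            return None
        frame = self._frames.pop()
        self._frames.clear()
        return frame


def read_with_recovery(open_stream: Callable[[], Iterator[object]],
                       buffer: FrameBuffer, max_retries: int = 3,
                       base_delay: float = 0.5,
                       sleep: Callable[[float], None] = time.sleep) -> int:
    """Read until the stream ends, reconnecting with exponential backoff.

    Returns the number of reconnects; re-raises after max_retries
    consecutive failures.
    """
    reconnects = 0
    failures = 0
    while True:
        try:
            for frame in open_stream():
                buffer.push(frame)
                failures = 0  # a good frame resets the failure streak
            return reconnects
        except ConnectionError:
            failures += 1
            if failures > max_retries:
                raise
            sleep(base_delay * 2 ** (failures - 1))
            reconnects += 1


# Demo: a fake stream that drops once, then delivers three more frames.
attempts: List[int] = []


def fake_stream() -> Iterator[str]:
    attempts.append(1)
    if len(attempts) == 1:
        yield "f1"
        yield "f2"
        raise ConnectionError("camera dropped")
    yield "f3"
    yield "f4"
    yield "f5"


delays: List[float] = []
demo_buffer = FrameBuffer(max_frames=3)
demo_reconnects = read_with_recovery(fake_stream, demo_buffer,
                                     sleep=delays.append)
demo_dropped = demo_buffer.dropped
newest_frame = demo_buffer.pop_latest()
```

Dropping old frames favors low latency over completeness. A worker that must process every frame would need a different policy.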
Phase 5: Resource Management & Cleanup
- Process Lifecycle Management
  - Automatic process cleanup on WebSocket disconnect
  - Graceful shutdown handling
  - Resource deallocation on process termination
- Memory & GPU Management
  - Monitor per-process memory usage
  - GPU memory isolation between sessions
  - Prevent memory leaks in long-running processes
- Health Monitoring
  - Process health checks and restart capability
  - Performance metrics per session process
  - Resource usage monitoring and alerting
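Graceful shutdown and health checks from Phase 5 can be expressed against the small part of the `multiprocessing.Process` interface they need: `is_alive()`, `join(timeout)` and `terminate()`. The helpers `stop_session` and `find_dead_sessions` below are hypothetical, and `FakeProcess` exists only so the sketch runs without spawning anything.

```python
from typing import Callable, Dict, List, Optional


def stop_session(process, send_stop: Callable[[], None],
                 timeout: float = 30.0) -> str:
    """Ask a session process to stop; terminate it if it does not exit.

    Returns "graceful" or "terminated".
    """
    send_stop()  # e.g. put a stop command on the session's command queue
    process.join(timeout)
    if process.is_alive():
        process.terminate()
        process.join(timeout)
        return "terminated"
    return "graceful"


def find_dead_sessions(registry: Dict[str, object]) -> List[str]:
    """Return the IDs of sessions whose process is no longer alive."""
    return [sid for sid, proc in registry.items() if not proc.is_alive()]


class FakeProcess:
    """Test double exposing the Process methods used above."""

    def __init__(self, exits_on_stop: bool) -> None:
        self.alive = True
        self.exits_on_stop = exits_on_stop
        self.stop_requested = False

    def request_stop(self) -> None:
        self.stop_requested = True

    def is_alive(self) -> bool:
        return self.alive

    def join(self, timeout: Optional[float] = None) -> None:
        if self.stop_requested and self.exits_on_stop:
            self.alive = False

    def terminate(self) -> None:
        self.alive = False


cooperative = FakeProcess(exits_on_stop=True)
stuck = FakeProcess(exits_on_stop=False)
healthy = FakeProcess(exits_on_stop=True)

cooperative_outcome = stop_session(cooperative, cooperative.request_stop, 0.01)
stuck_outcome = stop_session(stuck, stuck.request_stop, 0.01)
dead_sessions = find_dead_sessions(
    {"cam-001": cooperative, "cam-002": stuck, "cam-003": healthy})
```

A periodic task in the main process could call `find_dead_sessions` at the configured health check interval and respawn any session that is still subscribed.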
🔄 What Will Be Replaced
Files to Modify:
- app.py
  - Replace direct pipeline execution with process management
  - Remove global latest_frames cache
  - Add SessionProcessManager integration
- core/models/inference.py
  - Remove shared _model_cache class variable
  - Make model loading process-specific
  - Eliminate cross-session model sharing
- core/detection/pipeline.py
  - Remove global session mappings
  - Make pipeline instance session-specific
  - Isolate processing state per session
- core/communication/websocket.py
  - Replace direct pipeline calls with IPC
  - Add process spawn/cleanup on subscribe/unsubscribe
  - Implement queue-based communication
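The subscribe/unsubscribe handling in the WebSocket layer could delegate to a small registry like the one sketched here. `SessionRegistry`, `SessionLimitError`, and the injected `start_worker` / `stop_worker` callables are assumptions for illustration. The real SessionProcessManager may be organized differently.

```python
from typing import Callable, Dict, List


class SessionLimitError(RuntimeError):
    """Raised when the configured session limit is reached."""


class SessionRegistry:
    """Maps subscription IDs to workers and enforces a session limit."""

    def __init__(self, start_worker: Callable[[str], object],
                 stop_worker: Callable[[object], None],
                 max_sessions: int = 20) -> None:
        self._start_worker = start_worker
        self._stop_worker = stop_worker
        self._max_sessions = max_sessions
        self._workers: Dict[str, object] = {}

    def on_subscribe(self, subscription_id: str) -> object:
        """Spawn a worker for a new subscription (idempotent)."""
        if subscription_id in self._workers:
            return self._workers[subscription_id]
        if len(self._workers) >= self._max_sessions:
            raise SessionLimitError(
                f"limit of {self._max_sessions} sessions reached")
        worker = self._start_worker(subscription_id)
        self._workers[subscription_id] = worker
        return worker

    def on_unsubscribe(self, subscription_id: str) -> bool:
        """Stop and forget the worker; False if it was not registered."""
        worker = self._workers.pop(subscription_id, None)
        if worker is None:
            return False
        self._stop_worker(worker)
        return True

    def active_sessions(self) -> List[str]:
        return sorted(self._workers)


# Demo with plain strings standing in for worker processes.
started: List[str] = []
stopped: List[object] = []


def start_fake_worker(subscription_id: str) -> str:
    started.append(subscription_id)
    return f"worker-{subscription_id}"


registry = SessionRegistry(start_fake_worker, stopped.append, max_sessions=2)
registry.on_subscribe("cam-001")
registry.on_subscribe("cam-001")  # repeated subscribe spawns nothing new
registry.on_subscribe("cam-002")
try:
    registry.on_subscribe("cam-003")
    limit_hit = False
except SessionLimitError:
    limit_hit = True
first_unsubscribe = registry.on_unsubscribe("cam-001")
second_unsubscribe = registry.on_unsubscribe("cam-001")
```

Injecting the start and stop callables keeps the registry testable without real processes. In production `start_worker` would create and start a session process and `stop_worker` would shut it down.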
New Files to Create:
- core/processes/session_manager.py
  - SessionProcessManager class
  - Process lifecycle management
  - Health monitoring and cleanup
- core/processes/session_worker.py
  - SessionWorkerProcess class
  - Individual session process implementation
  - Model loading and pipeline execution
- core/processes/communication.py
  - IPC message definitions and handlers
  - Queue management utilities
  - Protocol for main ↔ session communication
- core/logging/session_logger.py
  - Per-session logging configuration
  - Log file management and rotation
  - Structured logging with session context
❌ What Will Be Removed
Code to Remove:
- Shared State Variables

  # From core/models/inference.py
  _model_cache: Dict[str, Any] = {}

  # From core/detection/pipeline.py
  self.session_to_subscription = {}
  self.session_processing_results = {}

  # From app.py
  latest_frames = {}

- Global Singleton Patterns
  - Single pipeline instance handling all sessions
  - Shared ThreadPoolExecutor across sessions
  - Global model manager for all subscriptions
- Cross-Session Dependencies
  - Session mapping lookups across different subscriptions
  - Shared processing state between unrelated sessions
  - Global frame caching across all cameras
🔧 Configuration Changes
New Configuration Options:
{
"session_processes": {
"max_concurrent_sessions": 20,
"process_cleanup_timeout": 30,
"health_check_interval": 10,
"log_rotation": {
"max_size_mb": 100,
"backup_count": 5
}
},
"resource_limits": {
"memory_per_process_mb": 2048,
"gpu_memory_fraction": 0.3
}
}
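One way to load and sanity-check these options is to parse them into small typed objects, as in the sketch below. The dataclass names, the `parse_config` function, and the validation rules are assumptions. The units of the timeout and interval fields are not stated in this plan, so the code does not interpret them.

```python
import json
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class LogRotation:
    max_size_mb: int
    backup_count: int

    @property
    def max_bytes(self) -> int:
        return self.max_size_mb * 1024 * 1024


@dataclass(frozen=True)
class SessionProcessConfig:
    max_concurrent_sessions: int
    process_cleanup_timeout: float  # unit not specified in the plan
    health_check_interval: float    # unit not specified in the plan
    log_rotation: LogRotation


@dataclass(frozen=True)
class ResourceLimits:
    memory_per_process_mb: int
    gpu_memory_fraction: float


def parse_config(raw_json: str) -> Tuple[SessionProcessConfig, ResourceLimits]:
    """Parse the JSON document and reject obviously invalid values."""
    data = json.loads(raw_json)
    sessions = dict(data["session_processes"])
    rotation = LogRotation(**sessions.pop("log_rotation"))
    session_config = SessionProcessConfig(log_rotation=rotation, **sessions)
    limits = ResourceLimits(**data["resource_limits"])
    if session_config.max_concurrent_sessions < 1:
        raise ValueError("max_concurrent_sessions must be at least 1")
    if not 0.0 < limits.gpu_memory_fraction <= 1.0:
        raise ValueError("gpu_memory_fraction must be in (0, 1]")
    return session_config, limits


EXAMPLE_CONFIG = """
{
  "session_processes": {
    "max_concurrent_sessions": 20,
    "process_cleanup_timeout": 30,
    "health_check_interval": 10,
    "log_rotation": {"max_size_mb": 100, "backup_count": 5}
  },
  "resource_limits": {
    "memory_per_process_mb": 2048,
    "gpu_memory_fraction": 0.3
  }
}
"""

session_config, limits = parse_config(EXAMPLE_CONFIG)
```

If `gpu_memory_fraction` is applied as a hard per-process cap on a single GPU, 0.3 leaves room for only three sessions per GPU. That value may need to be reconciled with `max_concurrent_sessions` when the limits are actually enforced.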
📊 Benefits of New Architecture
🛡️ Complete Isolation:
- Memory Isolation: Each session runs in separate process memory space
- Model Isolation: No shared model cache between sessions
- State Isolation: Session mappings and processing state are process-local
- Error Isolation: Process crashes don't affect other sessions
📈 Performance Improvements:
- True Parallelism: Bypass Python GIL limitations
- Resource Optimization: Each process uses only required resources
- Scalability: Linear scaling with available CPU cores
- Memory Efficiency: Automatic cleanup on session termination
🔍 Enhanced Monitoring:
- Per-Camera Logs: Dedicated log file for each session
- Resource Tracking: Monitor CPU/memory per session process
- Debugging: Isolated logs make issue diagnosis easier
- Audit Trail: Complete processing history per camera
🚀 Operational Benefits:
- Zero Cross-Session Contamination: Sessions share no in-process state, so one session's results cannot leak into another's
- Hot Restart: Individual session restart without affecting others
- Resource Control: Fine-grained resource allocation per session
- Development: Easier testing and debugging of individual sessions
🎬 Implementation Order
- Phase 1: Core infrastructure (SessionProcessManager, IPC)
- Phase 2: Per-session logging system
- Phase 3: Model and pipeline isolation
- Phase 4: Integrated stream-session architecture (RTSP reading inside session workers)
- Phase 5: Resource management and monitoring
🧪 Testing Strategy
- Unit Tests: Test individual session processes in isolation
- Integration Tests: Test main ↔ session process communication
- Load Tests: Multiple concurrent sessions with different models
- Memory Tests: Verify no cross-session memory leaks
- Logging Tests: Verify correct log file creation and rotation
📝 Migration Checklist
- Backup current working version
- Implement Phase 1 (core infrastructure)
- Test with single session process
- Implement Phase 2 (logging)
- Test with multiple concurrent sessions
- Implement Phase 3 (isolation)
- Verify complete elimination of shared state
- Implement Phase 4 (stream integration)
- Implement Phase 5 (resource management)
- Performance testing and optimization
- Documentation updates
Expected Outcome: Complete elimination of cross-session result contamination with enhanced monitoring capabilities and true session isolation.