python-detector-worker/IMPLEMENTATION_PLAN.md
ziesorx 34d1982e9e
refactor: half way to process per session
2025-09-25 20:52:26 +07:00


# Session-Isolated Multiprocessing Architecture - Implementation Plan
## 🎯 Objective
Eliminate shared state issues causing identical results across different sessions by implementing **Process-Per-Session architecture** with **per-camera logging**.
## 🔍 Root Cause Analysis
### Current Shared State Issues:
1. **Shared Model Cache** (`core/models/inference.py:40`): All sessions share same cached YOLO model instances
2. **Single Pipeline Instance** (`core/detection/pipeline.py`): One pipeline handles all sessions with shared mappings
3. **Global Session Mappings**: `session_to_subscription` and `session_processing_results` dictionaries
4. **Shared Thread Pool**: Single `ThreadPoolExecutor` for all sessions
5. **Global Frame Cache** (`app.py:39`): `latest_frames` shared across endpoints
6. **Single Log File**: All cameras write to `detector_worker.log`
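The first issue can be illustrated with a minimal sketch. It assumes the shared cache is a class-level dictionary (as the `_model_cache` name suggests); the class names and the `fake_loader` stand-in are hypothetical and not taken from the codebase.

```python
from typing import Any, Callable, Dict


class SharedCacheWrapper:
    """Mimics the problem: the cache is a class attribute, so all instances share it."""

    _model_cache: Dict[str, Any] = {}  # one dict for the whole process

    def __init__(self, loader: Callable[[str], Any]) -> None:
        self._loader = loader

    def get(self, path: str) -> Any:
        # Attribute lookup falls through to the class, so this mutates the shared dict.
        if path not in self._model_cache:
            self._model_cache[path] = self._loader(path)
        return self._model_cache[path]


class IsolatedWrapper:
    """Per-instance cache: every wrapper owns its models."""

    def __init__(self, loader: Callable[[str], Any]) -> None:
        self._loader = loader
        self._model_cache: Dict[str, Any] = {}  # created per instance

    def get(self, path: str) -> Any:
        if path not in self._model_cache:
            self._model_cache[path] = self._loader(path)
        return self._model_cache[path]


def fake_loader(path: str) -> object:
    """Hypothetical stand-in for real model loading; returns a fresh object per call."""
    return object()


session_a, session_b = SharedCacheWrapper(fake_loader), SharedCacheWrapper(fake_loader)
shared = session_a.get("model.pt") is session_b.get("model.pt")

iso_a, iso_b = IsolatedWrapper(fake_loader), IsolatedWrapper(fake_loader)
isolated = iso_a.get("model.pt") is not iso_b.get("model.pt")
```

In the first variant both sessions receive the very same model object, so any state held by that object is visible to both. Moving each session into its own process removes this sharing even if a class-level cache were left in place, because each process has its own copy of the class.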
## 🏗️ New Architecture: Process-Per-Session
```
FastAPI Main Process (Port 8001)
├── WebSocket Handler (manages connections)
├── SessionProcessManager (spawns/manages session processes)
├── Main Process Logger → detector_worker_main.log
│
├── Session Process 1 (Camera/Display 1)
│   ├── Dedicated Model Pipeline
│   ├── Own Model Cache & Memory
│   ├── Session Logger → detector_worker_camera_display-001_cam-001.log
│   └── Redis/DB connections
│
├── Session Process 2 (Camera/Display 2)
│   ├── Dedicated Model Pipeline
│   ├── Own Model Cache & Memory
│   ├── Session Logger → detector_worker_camera_display-002_cam-001.log
│   └── Redis/DB connections
│
└── Session Process N...
```
## 📋 Implementation Tasks
### Phase 1: Core Infrastructure ✅ **COMPLETED**
- [x] **Create SessionProcessManager class**
- Manages lifecycle of session processes
- Handles process spawning, monitoring, and cleanup
- Maintains process registry and health checks
- [x] **Implement SessionWorkerProcess**
- Individual process class that handles one session completely
- Loads own models, pipeline, and maintains state
- Communicates via queues with main process
- [x] **Design Inter-Process Communication**
- Command queue: Main → Session (frames, commands, config)
- Result queue: Session → Main (detections, status, errors)
- Use `multiprocessing.Queue`, which is safe to share across processes as well as threads
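The command/result queue design above can be sketched as follows. This is a minimal illustration, not the project's actual protocol: the `Command` and `Result` dataclasses, the `kind` values, and the function names are assumptions. The worker loop only requires queue-like objects, so it can be exercised with `queue.Queue` in a unit test and with `multiprocessing.Queue` in production.

```python
import multiprocessing as mp
import queue
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Command:
    """Main -> Session message (hypothetical shape)."""
    kind: str  # "frame" or "shutdown" in this sketch
    payload: Dict[str, Any] = field(default_factory=dict)


@dataclass
class Result:
    """Session -> Main message (hypothetical shape)."""
    session_id: str
    kind: str  # "detection", "status", or "error"
    payload: Dict[str, Any] = field(default_factory=dict)


def session_worker_loop(session_id, command_queue, result_queue, poll_timeout=1.0):
    """Consume commands until a shutdown command arrives."""
    result_queue.put(Result(session_id, "status", {"state": "started"}))
    while True:
        try:
            cmd = command_queue.get(timeout=poll_timeout)
        except queue.Empty:
            continue  # nothing queued yet; a real worker could emit a heartbeat here
        if cmd.kind == "shutdown":
            result_queue.put(Result(session_id, "status", {"state": "stopped"}))
            return
        if cmd.kind == "frame":
            # Placeholder for real inference: echo the frame id back.
            frame_id = cmd.payload.get("frame_id")
            result_queue.put(Result(session_id, "detection", {"frame_id": frame_id}))
        else:
            reason = f"unknown command {cmd.kind!r}"
            result_queue.put(Result(session_id, "error", {"reason": reason}))


def start_session_process(session_id):
    """Spawn one worker process with its own pair of queues."""
    command_queue, result_queue = mp.Queue(), mp.Queue()
    proc = mp.Process(
        target=session_worker_loop,
        args=(session_id, command_queue, result_queue),
        name=f"SessionWorker-{session_id}",
        daemon=True,
    )
    proc.start()
    return proc, command_queue, result_queue
```

`start_session_process` should be called from code guarded by `if __name__ == "__main__":` (or from a running server), since start methods other than `fork` re-import the main module in the child. Messages placed on a `multiprocessing.Queue` are pickled, so large frames sent this way carry a serialization cost.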
**Phase 1 Testing Results:**
- ✅ Server starts successfully on port 8001
- ✅ WebSocket connections established (10.100.1.3:57488)
- ✅ SessionProcessManager initializes (max_sessions=20)
- ✅ Multiple session processes created (9 camera subscriptions)
- ✅ Individual session processes spawn with unique PIDs (e.g., PID: 16380)
- ✅ Session logging shows isolated process names (SessionWorker-session_xxx)
- ✅ IPC communication framework functioning
**What to Look For When Testing:**
- Check logs for "SessionProcessManager initialized"
- Verify individual session processes: "Session process created: session_xxx (PID: xxxx)"
- Monitor process isolation: Each session has unique process name "SessionWorker-session_xxx"
- Confirm WebSocket integration: "Session WebSocket integration started"
### Phase 2: Per-Session Logging ✅ **COMPLETED**
- [x] **Implement PerSessionLogger**
- Each session process creates own log file
- Format: `detector_worker_camera_{subscription_id}.log`
- Include session context in all log messages
- Implement log rotation (daily/size-based)
- [x] **Update Main Process Logging**
- Main process logs to `detector_worker_main.log`
- Log session process lifecycle events
- Track active sessions and resource usage
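A per-session logger along these lines could be built with the standard library's `RotatingFileHandler`. This is a sketch under assumptions: the function name, the logger naming scheme, the message format, and the sanitization of the subscription ID into a file name are all hypothetical. The 100 MB / 5 backups defaults mirror the rotation settings reported in this plan.

```python
import logging
import re
from logging.handlers import RotatingFileHandler
from pathlib import Path


def build_session_logger(
    subscription_id: str,
    log_dir: str = "logs",
    max_bytes: int = 100 * 1024 * 1024,  # 100 MB per file
    backup_count: int = 5,
) -> logging.Logger:
    """Return a logger that writes to detector_worker_camera_<id>.log."""
    # Assumption: IDs may contain characters that are unsafe in file names.
    safe_id = re.sub(r"[^A-Za-z0-9_.-]", "_", subscription_id)
    Path(log_dir).mkdir(parents=True, exist_ok=True)

    logger = logging.getLogger(f"detector_worker.session.{safe_id}")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep session lines out of the main process log

    if not logger.handlers:  # avoid duplicate handlers on repeated calls
        handler = RotatingFileHandler(
            Path(log_dir) / f"detector_worker_camera_{safe_id}.log",
            maxBytes=max_bytes,
            backupCount=backup_count,
            encoding="utf-8",
        )
        # safe_id cannot contain '%', so embedding it in the format string is safe.
        handler.setFormatter(
            logging.Formatter(
                f"%(asctime)s [%(levelname)s] [%(processName)s] [{safe_id}] %(message)s"
            )
        )
        logger.addHandler(handler)
    return logger
```

Size-based rotation is shown here; daily rotation would use `TimedRotatingFileHandler` instead. Because each session process opens its own file, no cross-process locking on the log file is needed.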
**Phase 2 Testing Results:**
- ✅ Main process logs to dedicated file: `logs/detector_worker_main.log`
- ✅ Session-specific logger initialization working
- ✅ Each camera spawns with unique session worker name: "SessionWorker-session_{unique_id}_{camera_name}"
- ✅ Per-session logger ready for file creation (will create files when sessions fully initialize)
- ✅ Log format is structured and includes session context
- ✅ Log rotation capability implemented (100MB max, 5 backups)
**What to Look For When Testing:**
- Check for main process log: `logs/detector_worker_main.log`
- Monitor per-session process names in logs: "SessionWorker-session_xxx"
- Once sessions initialize fully, look for per-camera log files: `detector_worker_camera_{subscription_id}.log`
- Verify session start/end events are logged with timestamps
- Check log rotation when files exceed 100MB
### Phase 3: Model & Pipeline Isolation ✅ **COMPLETED**
- [x] **Remove Shared Model Cache**
- Eliminated `YOLOWrapper._model_cache` class variable
- Each process loads models independently
- Memory isolation prevents cross-session contamination
- [x] **Create Per-Process Pipeline Instances**
- Each session process instantiates own `DetectionPipeline`
- Removed global pipeline singleton pattern
- Session-local `session_to_subscription` mapping
- [x] **Isolate Session State**
- Each process maintains own `session_processing_results`
- Session mappings are process-local
- Complete state isolation per session
**Phase 3 Testing Results:**
- ✅ **Zero Shared Cache**: Models log "(ISOLATED)" and "no shared cache!"
- ✅ **Individual Model Loading**: Each session loads complete model set independently
- `car_frontal_detection_v1.pt` per session
- `car_brand_cls_v1.pt` per session
- `car_bodytype_cls_v1.pt` per session
- ✅ **Pipeline Isolation**: Each session has unique pipeline instance ID
- ✅ **Memory Isolation**: Different sessions cannot share model instances
- ✅ **State Isolation**: Session mappings are process-local (ISOLATED comments added)
**What to Look For When Testing:**
- Check logs for "(ISOLATED)" on model loading
- Verify each session loads models independently: "Loading YOLO model ... (ISOLATED)"
- Monitor unique pipeline instance IDs per session
- Confirm no shared state between sessions
- Look for "Successfully loaded model ... in isolation - no shared cache!"
### Phase 4: Integrated Stream-Session Architecture 🚧 **IN PROGRESS**
**Problem Identified:** The frame processing pipeline is not working because two stream systems run side by side, leaving a communication gap between them.
**Root Cause:**
- Old RTSP Process Manager capturing frames but not forwarding to session workers
- New Session Workers ready for processing but receiving no frames
- Architecture mismatch preventing detection despite successful initialization
**Solution:** Complete integration of stream reading INTO session worker processes.
- [ ] **Integrate RTSP Stream Reading into Session Workers**
- Move RTSP stream capture from separate processes into each session worker
- Each session worker handles: RTSP connection + frame processing + model inference
- Eliminate communication gap between stream capture and detection
- [ ] **Remove Duplicate Stream Management Systems**
- Delete old RTSP Process Manager (`core/streaming/process_manager.py`)
- Remove conflicting stream management from main process
- Consolidate to single session-worker-only architecture
- [ ] **Enhanced Session Worker with Stream Integration**
- Add RTSP stream reader to `SessionWorkerProcess`
- Implement frame buffer queue management per worker
- Add connection recovery and stream health monitoring per session
- [ ] **Complete End-to-End Isolation per Camera**
```
Session Worker Process N:
├── RTSP Stream Reader (rtsp://cameraN)
├── Frame Buffer Queue
├── YOLO Detection Pipeline
├── Model Cache (isolated)
├── Database/Redis connections
└── Per-camera Logger
```
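Two pieces of the stream integration above, the per-worker frame buffer and the reconnection logic, can be sketched without committing to a capture library. The "drop the oldest frame when full" policy, the exponential backoff, and all names below are assumptions for illustration; the plan itself does not specify them.

```python
from collections import deque
from threading import Lock
from typing import Any, Iterator, Optional


class LatestFrameBuffer:
    """Bounded buffer that drops the oldest frame when full.

    For live detection, stale frames are usually less useful than fresh ones,
    so the reader never blocks on a slow consumer.
    """

    def __init__(self, max_frames: int = 2) -> None:
        self._frames: deque = deque(maxlen=max_frames)
        self._lock = Lock()
        self.dropped = 0  # counter for stream health monitoring

    def put(self, frame: Any) -> None:
        with self._lock:
            if len(self._frames) == self._frames.maxlen:
                self.dropped += 1  # append() below evicts the oldest frame
            self._frames.append(frame)

    def get(self) -> Optional[Any]:
        """Return the oldest buffered frame, or None if the buffer is empty."""
        with self._lock:
            return self._frames.popleft() if self._frames else None


def reconnect_delays(base: float = 1.0, cap: float = 30.0) -> Iterator[float]:
    """Yield exponentially growing wait times, capped at `cap` seconds."""
    delay = base
    while True:
        yield delay
        delay = min(delay * 2, cap)
```

Inside a session worker, a reader thread would call `put()` for every captured frame while the detection loop calls `get()`. After a failed RTSP read, the worker would sleep for `next(delays)` before reconnecting and create a fresh generator once the stream is healthy again.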
**Benefits for 20+ Cameras:**
- **Python GIL Bypass**: True parallelism with multiprocessing
- **Resource Isolation**: Process crashes don't affect other cameras
- **Memory Distribution**: Each process has own memory space
- **Independent Recovery**: Per-camera reconnection logic
- **Scalable Architecture**: Scales roughly linearly with available CPU cores until GPU or memory becomes the bottleneck
### Phase 5: Resource Management & Cleanup
- [ ] **Process Lifecycle Management**
- Automatic process cleanup on WebSocket disconnect
- Graceful shutdown handling
- Resource deallocation on process termination
- [ ] **Memory & GPU Management**
- Monitor per-process memory usage
- GPU memory isolation between sessions
- Prevent memory leaks in long-running processes
- [ ] **Health Monitoring**
- Process health checks and restart capability
- Performance metrics per session process
- Resource usage monitoring and alerting
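Health checks and graceful shutdown could take roughly the following shape. This is a sketch, not the planned implementation: it assumes workers report heartbeats to the main process and that a shutdown command has already been sent before `stop_process` is called. The class and function names are hypothetical; the 30-second default echoes `process_cleanup_timeout` from the configuration section.

```python
import time
from typing import Dict, List, Optional


class HeartbeatRegistry:
    """Tracks the last heartbeat per session and reports stale sessions."""

    def __init__(self, timeout_s: float = 30.0) -> None:
        self.timeout_s = timeout_s
        self._last_seen: Dict[str, float] = {}

    def beat(self, session_id: str, now: Optional[float] = None) -> None:
        # `now` is injectable so the logic can be tested without sleeping.
        self._last_seen[session_id] = time.monotonic() if now is None else now

    def forget(self, session_id: str) -> None:
        self._last_seen.pop(session_id, None)

    def stale_sessions(self, now: Optional[float] = None) -> List[str]:
        """Return session IDs whose last heartbeat is older than the timeout."""
        now = time.monotonic() if now is None else now
        return sorted(
            sid for sid, seen in self._last_seen.items()
            if now - seen > self.timeout_s
        )


def stop_process(proc, timeout_s: float = 30.0) -> bool:
    """Wait for a process to exit; terminate it as a last resort.

    `proc` is expected to behave like multiprocessing.Process.
    Returns True if the process exited on its own, False if it was terminated.
    """
    proc.join(timeout_s)
    if proc.is_alive():
        proc.terminate()
        proc.join()
        return False
    return True
```

A periodic task in the main process could call `stale_sessions()` at every health-check interval and restart any session it returns. Using `time.monotonic()` keeps the check immune to wall-clock adjustments.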
## 🔄 What Will Be Replaced
### Files to Modify:
1. **`app.py`**
- Replace direct pipeline execution with process management
- Remove global `latest_frames` cache
- Add SessionProcessManager integration
2. **`core/models/inference.py`**
- Remove shared `_model_cache` class variable
- Make model loading process-specific
- Eliminate cross-session model sharing
3. **`core/detection/pipeline.py`**
- Remove global session mappings
- Make pipeline instance session-specific
- Isolate processing state per session
4. **`core/communication/websocket.py`**
- Replace direct pipeline calls with IPC
- Add process spawn/cleanup on subscribe/unsubscribe
- Implement queue-based communication
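The subscribe/unsubscribe handling described for `websocket.py` can be sketched as a small registry that enforces the session limit. This is an illustrative outline only: `SessionRegistry`, `SessionLimitError`, and the injected `spawn`/`stop` callables are hypothetical names, and the real `SessionProcessManager` may be structured differently. Injecting the callables keeps the bookkeeping testable without starting real processes.

```python
from typing import Any, Callable, Dict, List


class SessionLimitError(RuntimeError):
    """Raised when the configured session limit is already reached."""


class SessionRegistry:
    """Maps subscription IDs to process handles and enforces a maximum."""

    def __init__(
        self,
        spawn: Callable[[str], Any],   # starts a session process, returns a handle
        stop: Callable[[Any], None],   # stops the process behind a handle
        max_sessions: int = 20,
    ) -> None:
        self._spawn = spawn
        self._stop = stop
        self._max_sessions = max_sessions
        self._sessions: Dict[str, Any] = {}

    def subscribe(self, subscription_id: str) -> Any:
        """Return the existing handle or spawn a new session process."""
        if subscription_id in self._sessions:
            return self._sessions[subscription_id]
        if len(self._sessions) >= self._max_sessions:
            raise SessionLimitError(f"limit of {self._max_sessions} sessions reached")
        handle = self._spawn(subscription_id)
        self._sessions[subscription_id] = handle
        return handle

    def unsubscribe(self, subscription_id: str) -> bool:
        """Stop and forget a session; return False if it was not registered."""
        handle = self._sessions.pop(subscription_id, None)
        if handle is None:
            return False
        self._stop(handle)
        return True

    def shutdown(self) -> None:
        """Stop every remaining session, e.g. on WebSocket disconnect."""
        for subscription_id in list(self._sessions):
            self.unsubscribe(subscription_id)

    @property
    def active(self) -> List[str]:
        return sorted(self._sessions)
```

Making `subscribe` idempotent means a repeated subscription message for the same camera does not spawn a second process.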
### New Files to Create:
1. **`core/processes/session_manager.py`**
- SessionProcessManager class
- Process lifecycle management
- Health monitoring and cleanup
2. **`core/processes/session_worker.py`**
- SessionWorkerProcess class
- Individual session process implementation
- Model loading and pipeline execution
3. **`core/processes/communication.py`**
- IPC message definitions and handlers
- Queue management utilities
- Protocol for main ↔ session communication
4. **`core/logging/session_logger.py`**
- Per-session logging configuration
- Log file management and rotation
- Structured logging with session context
## ❌ What Will Be Removed
### Code to Remove:
1. **Shared State Variables**
```python
# From core/models/inference.py
_model_cache: Dict[str, Any] = {}
# From core/detection/pipeline.py
self.session_to_subscription = {}
self.session_processing_results = {}
# From app.py
latest_frames = {}
```
2. **Global Singleton Patterns**
- Single pipeline instance handling all sessions
- Shared ThreadPoolExecutor across sessions
- Global model manager for all subscriptions
3. **Cross-Session Dependencies**
- Session mapping lookups across different subscriptions
- Shared processing state between unrelated sessions
- Global frame caching across all cameras
## 🔧 Configuration Changes
### New Configuration Options:
```json
{
  "session_processes": {
    "max_concurrent_sessions": 20,
    "process_cleanup_timeout": 30,
    "health_check_interval": 10,
    "log_rotation": {
      "max_size_mb": 100,
      "backup_count": 5
    }
  },
  "resource_limits": {
    "memory_per_process_mb": 2048,
    "gpu_memory_fraction": 0.3
  }
}
```
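Loading and sanity-checking these options might look like the sketch below. The `ResourcePlan` class and `load_resource_plan` function are hypothetical, and the reading of `gpu_memory_fraction` as "share of a single GPU claimed by each process" is an assumption; the plan does not define its semantics.

```python
import json
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class ResourcePlan:
    max_concurrent_sessions: int
    memory_per_process_mb: int
    gpu_memory_fraction: float

    @property
    def worst_case_memory_mb(self) -> int:
        """Host memory needed if every session hits its per-process limit."""
        return self.max_concurrent_sessions * self.memory_per_process_mb

    @property
    def sessions_per_gpu(self) -> int:
        """Sessions that fit on one GPU, assuming the fraction is per process."""
        return math.floor(1.0 / self.gpu_memory_fraction)


def load_resource_plan(raw_json: str) -> ResourcePlan:
    """Parse the configuration and reject obviously invalid values."""
    data = json.loads(raw_json)
    plan = ResourcePlan(
        max_concurrent_sessions=int(data["session_processes"]["max_concurrent_sessions"]),
        memory_per_process_mb=int(data["resource_limits"]["memory_per_process_mb"]),
        gpu_memory_fraction=float(data["resource_limits"]["gpu_memory_fraction"]),
    )
    if plan.max_concurrent_sessions < 1:
        raise ValueError("max_concurrent_sessions must be at least 1")
    if not 0.0 < plan.gpu_memory_fraction <= 1.0:
        raise ValueError("gpu_memory_fraction must be in (0, 1]")
    return plan
```

With the values shown above, the worst-case host memory is 20 × 2048 MB = 40,960 MB, and under the per-process reading a fraction of 0.3 allows three sessions per GPU. If that reading is correct, 20 concurrent sessions would need either several GPUs or a smaller fraction.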
## 📊 Benefits of New Architecture
### 🛡️ Complete Isolation:
- **Memory Isolation**: Each session runs in separate process memory space
- **Model Isolation**: No shared model cache between sessions
- **State Isolation**: Session mappings and processing state are process-local
- **Error Isolation**: Process crashes don't affect other sessions
### 📈 Performance Improvements:
- **True Parallelism**: Bypass Python GIL limitations
- **Resource Optimization**: Each process uses only required resources
- **Scalability**: Scales roughly linearly with available CPU cores until GPU or memory becomes the bottleneck
- **Memory Efficiency**: Automatic cleanup on session termination
### 🔍 Enhanced Monitoring:
- **Per-Camera Logs**: Dedicated log file for each session
- **Resource Tracking**: Monitor CPU/memory per session process
- **Debugging**: Isolated logs make issue diagnosis easier
- **Audit Trail**: Complete processing history per camera
### 🚀 Operational Benefits:
- **Zero Cross-Session Contamination**: Sessions share no in-process state, so one session's results cannot leak into another
- **Hot Restart**: Individual session restart without affecting others
- **Resource Control**: Fine-grained resource allocation per session
- **Development**: Easier testing and debugging of individual sessions
## 🎬 Implementation Order
1. **Phase 1**: Core infrastructure (SessionProcessManager, IPC)
2. **Phase 2**: Per-session logging system
3. **Phase 3**: Model and pipeline isolation
4. **Phase 4**: Integrated stream-session architecture
5. **Phase 5**: Resource management and monitoring
## 🧪 Testing Strategy
1. **Unit Tests**: Test individual session processes in isolation
2. **Integration Tests**: Test main ↔ session process communication
3. **Load Tests**: Multiple concurrent sessions with different models
4. **Memory Tests**: Verify no cross-session memory leaks
5. **Logging Tests**: Verify correct log file creation and rotation
## 📝 Migration Checklist
- [ ] Backup current working version
- [ ] Implement Phase 1 (core infrastructure)
- [ ] Test with single session process
- [ ] Implement Phase 2 (logging)
- [ ] Test with multiple concurrent sessions
- [ ] Implement Phase 3 (isolation)
- [ ] Verify complete elimination of shared state
- [ ] Implement Phase 4 (stream integration into session workers)
- [ ] Implement Phase 5 (resource management)
- [ ] Performance testing and optimization
- [ ] Documentation updates
---
**Expected Outcome**: Complete elimination of cross-session result contamination with enhanced monitoring capabilities and true session isolation.