python-detector-worker/IMPLEMENTATION_PLAN.md
ziesorx 34d1982e9e
refactor: half way to process per session
2025-09-25 20:52:26 +07:00


Session-Isolated Multiprocessing Architecture - Implementation Plan

🎯 Objective

Eliminate the shared-state issues that cause identical results across different sessions by implementing a Process-Per-Session architecture with per-camera logging.

🔍 Root Cause Analysis

Current Shared State Issues:

  1. Shared Model Cache (core/models/inference.py:40): All sessions share the same cached YOLO model instances
  2. Single Pipeline Instance (core/detection/pipeline.py): One pipeline handles all sessions with shared mappings
  3. Global Session Mappings: session_to_subscription and session_processing_results dictionaries
  4. Shared Thread Pool: Single ThreadPoolExecutor for all sessions
  5. Global Frame Cache (app.py:39): latest_frames shared across endpoints
  6. Single Log File: All cameras write to detector_worker.log

🏗️ New Architecture: Process-Per-Session

FastAPI Main Process (Port 8001)
├── WebSocket Handler (manages connections)
├── SessionProcessManager (spawns/manages session processes)
├── Main Process Logger → detector_worker_main.log
│
├── Session Process 1 (Camera/Display 1)
│   ├── Dedicated Model Pipeline
│   ├── Own Model Cache & Memory
│   ├── Session Logger → detector_worker_camera_display-001_cam-001.log
│   └── Redis/DB connections
│
├── Session Process 2 (Camera/Display 2)
│   ├── Dedicated Model Pipeline
│   ├── Own Model Cache & Memory
│   ├── Session Logger → detector_worker_camera_display-002_cam-001.log
│   └── Redis/DB connections
│
└── Session Process N ...

📋 Implementation Tasks

Phase 1: Core Infrastructure COMPLETED

  • Create SessionProcessManager class

    • Manages lifecycle of session processes
    • Handles process spawning, monitoring, and cleanup
    • Maintains process registry and health checks
  • Implement SessionWorkerProcess

    • Individual process class that handles one session completely
    • Loads its own models and pipeline, and maintains its own state
    • Communicates with the main process via queues
  • Design Inter-Process Communication

    • Command queue: Main → Session (frames, commands, config)
    • Result queue: Session → Main (detections, status, errors)
    • Use multiprocessing.Queue, which is safe to share between processes (and threads)
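A minimal sketch of the command/result protocol described above. The message types (`CommandType`, `ResultType`, `Command`, `Result`) are hypothetical names, and `process_frame` is a stand-in for the session's own `DetectionPipeline`. The loop only needs objects with `get()`/`put()`, so it runs unchanged on `multiprocessing.Queue` in a child process or on `queue.Queue` in a unit test.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List


class CommandType(str, Enum):
    """Main -> Session messages (hypothetical names)."""
    PROCESS_FRAME = "process_frame"
    UPDATE_CONFIG = "update_config"
    SHUTDOWN = "shutdown"


class ResultType(str, Enum):
    """Session -> Main messages (hypothetical names)."""
    DETECTION = "detection"
    STATUS = "status"
    ERROR = "error"


@dataclass
class Command:
    type: CommandType
    session_id: str
    payload: Dict[str, Any] = field(default_factory=dict)


@dataclass
class Result:
    type: ResultType
    session_id: str
    payload: Dict[str, Any] = field(default_factory=dict)


def session_worker_loop(
    session_id: str,
    command_queue,  # anything with a blocking get(), e.g. multiprocessing.Queue
    result_queue,   # anything with put()
    process_frame: Callable[[Any], List[Any]],
) -> None:
    """Consume commands until SHUTDOWN; every outcome is reported as a Result."""
    config: Dict[str, Any] = {}  # process-local, never shared with other sessions
    result_queue.put(Result(ResultType.STATUS, session_id, {"state": "ready"}))
    while True:
        cmd = command_queue.get()
        if cmd.type == CommandType.SHUTDOWN:
            result_queue.put(Result(ResultType.STATUS, session_id, {"state": "stopped"}))
            return
        try:
            if cmd.type == CommandType.PROCESS_FRAME:
                detections = process_frame(cmd.payload["frame"])
                result_queue.put(Result(ResultType.DETECTION, session_id, {"detections": detections}))
            elif cmd.type == CommandType.UPDATE_CONFIG:
                config.update(cmd.payload)
                result_queue.put(Result(ResultType.STATUS, session_id, {"state": "config_updated"}))
            else:
                raise ValueError(f"unknown command type: {cmd.type!r}")
        except Exception as exc:  # report the error instead of killing the worker
            result_queue.put(Result(ResultType.ERROR, session_id, {"error": repr(exc)}))
```

In a real worker the loop would be the target of something like `multiprocessing.get_context("spawn").Process(...)`, and the child would build its own pipeline instead of receiving `process_frame` as an argument, because lambdas cannot be pickled for a spawned child. PyTorch's documentation also states that CUDA in subprocesses requires the spawn or forkserver start method rather than fork.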

Phase 1 Testing Results:

  • Server starts successfully on port 8001
  • WebSocket connections established (10.100.1.3:57488)
  • SessionProcessManager initializes (max_sessions=20)
  • Multiple session processes created (9 camera subscriptions)
  • Individual session processes spawn with unique PIDs (e.g., PID: 16380)
  • Session logging shows isolated process names (SessionWorker-session_xxx)
  • IPC communication framework functioning

What to Look For When Testing:

  • Check logs for "SessionProcessManager initialized"
  • Verify individual session processes: "Session process created: session_xxx (PID: xxxx)"
  • Monitor process isolation: Each session has unique process name "SessionWorker-session_xxx"
  • Confirm WebSocket integration: "Session WebSocket integration started"
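The PID check above can be automated with a small log scan. This sketch assumes the "Session process created: session_xxx (PID: xxxx)" lines appear verbatim in the main process log; the helper names are hypothetical. An empty result from `shared_pids` means no two sessions reported the same process.

```python
import re
from collections import Counter
from typing import Dict, Iterable, Set

# Matches the documented line: "Session process created: session_xxx (PID: xxxx)"
CREATED_RE = re.compile(r"Session process created: (?P<session>\S+) \(PID: (?P<pid>\d+)\)")


def session_pids(lines: Iterable[str]) -> Dict[str, int]:
    """Map each session id to the last PID logged for it."""
    pids: Dict[str, int] = {}
    for line in lines:
        match = CREATED_RE.search(line)
        if match:
            pids[match.group("session")] = int(match.group("pid"))
    return pids


def shared_pids(pids: Dict[str, int]) -> Set[int]:
    """PIDs claimed by more than one session; should be empty if isolation holds."""
    counts = Counter(pids.values())
    return {pid for pid, n in counts.items() if n > 1}
```

For example, `with open("logs/detector_worker_main.log") as f: print(shared_pids(session_pids(f)))`. Run it against a single server run's log, since the OS may recycle PIDs after processes exit.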

Phase 2: Per-Session Logging COMPLETED

  • Implement PerSessionLogger

    • Each session process creates its own log file
    • Format: detector_worker_camera_{subscription_id}.log
    • Include session context in all log messages
    • Implement log rotation (daily/size-based)
  • Update Main Process Logging

    • Main process logs to detector_worker_main.log
    • Log session process lifecycle events
    • Track active sessions and resource usage
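A sketch of how a session process could build its per-camera logger with the standard library's `RotatingFileHandler` (100MB, 5 backups, matching the testing notes below). The function name is hypothetical, and so is the sanitizing step: it assumes subscription IDs like `display-001;cam-001` (inferred from the file names in the architecture diagram) contain characters that should not go into a file name. `LoggerAdapter` injects the subscription ID into every record so the formatter can print it.

```python
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path

LOG_FORMAT = "%(asctime)s [%(levelname)s] [%(processName)s] [%(subscription_id)s] %(message)s"


def create_session_logger(
    subscription_id: str,
    log_dir: str = "logs",
    max_bytes: int = 100 * 1024 * 1024,  # 100MB
    backup_count: int = 5,
) -> logging.LoggerAdapter:
    """Build a logger that writes only to this session's file, tagging every record."""
    # Assumption: ';' and '/' are not safe in file names, so replace them.
    safe_id = subscription_id.replace(";", "_").replace("/", "_")
    path = Path(log_dir) / f"detector_worker_camera_{safe_id}.log"
    path.parent.mkdir(parents=True, exist_ok=True)

    logger = logging.getLogger(f"session.{safe_id}")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # do not also emit through root/main-process handlers
    if not logger.handlers:  # avoid duplicate handlers if called twice in one process
        handler = RotatingFileHandler(
            path, maxBytes=max_bytes, backupCount=backup_count, encoding="utf-8"
        )
        handler.setFormatter(logging.Formatter(LOG_FORMAT))
        logger.addHandler(handler)
    # The adapter adds subscription_id to each record for LOG_FORMAT.
    return logging.LoggerAdapter(logger, {"subscription_id": subscription_id})
```

Because `%(processName)s` is in the format, each line also carries the worker's process name (for example "SessionWorker-session_xxx"), which ties the file back to the process-isolation checks.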

Phase 2 Testing Results:

  • Main process logs to dedicated file: logs/detector_worker_main.log
  • Session-specific logger initialization working
  • Each camera spawns with unique session worker name: "SessionWorker-session_{unique_id}_{camera_name}"
  • Per-session logger ready for file creation (will create files when sessions fully initialize)
  • Structured logging with session context included in the log format
  • Log rotation capability implemented (100MB max, 5 backups)

What to Look For When Testing:

  • Check for main process log: logs/detector_worker_main.log
  • Monitor per-session process names in logs: "SessionWorker-session_xxx"
  • Once sessions initialize fully, look for per-camera log files: detector_worker_camera_{subscription_id}.log
  • Verify session start/end events are logged with timestamps
  • Check log rotation when files exceed 100MB

Phase 3: Model & Pipeline Isolation COMPLETED

  • Remove Shared Model Cache

    • Eliminated YOLOWrapper._model_cache class variable
    • Each process loads models independently
    • Memory isolation prevents cross-session contamination
  • Create Per-Process Pipeline Instances

    • Each session process instantiates its own DetectionPipeline
    • Removed global pipeline singleton pattern
    • Session-local session_to_subscription mapping
  • Isolate Session State

    • Each process maintains its own session_processing_results
    • Session mappings are process-local
    • Complete state isolation per session
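The difference between the removed `_model_cache` class variable and per-instance loading can be shown without YOLO at all. In this sketch `load` is a hypothetical stand-in for the real model loader, and the two class names are illustrative rather than the project's `YOLOWrapper`.

```python
from typing import Any, Callable, Dict

Loader = Callable[[str], Any]


class SharedCacheWrapper:
    """BEFORE: a class-level dict is shared by every instance in the same process."""
    _model_cache: Dict[str, Any] = {}

    def __init__(self, model_path: str, load: Loader) -> None:
        if model_path not in SharedCacheWrapper._model_cache:
            SharedCacheWrapper._model_cache[model_path] = load(model_path)
        self.model = SharedCacheWrapper._model_cache[model_path]


class IsolatedWrapper:
    """AFTER: each wrapper owns the model it loaded; nothing is stored on the class."""

    def __init__(self, model_path: str, load: Loader) -> None:
        self.model = load(model_path)
```

With one process per session, each process already gets its own copy of any class-level dict; the per-instance form additionally guarantees that two pipelines running in the same process never hand out the same model object.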

Phase 3 Testing Results:

  • Zero Shared Cache: Models log "(ISOLATED)" and "no shared cache!"
  • Individual Model Loading: Each session loads complete model set independently
    • car_frontal_detection_v1.pt per session
    • car_brand_cls_v1.pt per session
    • car_bodytype_cls_v1.pt per session
  • Pipeline Isolation: Each session has unique pipeline instance ID
  • Memory Isolation: Different sessions cannot share model instances
  • State Isolation: Session mappings are process-local (ISOLATED comments added)

What to Look For When Testing:

  • Check logs for "(ISOLATED)" on model loading
  • Verify each session loads models independently: "Loading YOLO model ... (ISOLATED)"
  • Monitor unique pipeline instance IDs per session
  • Confirm no shared state between sessions
  • Look for "Successfully loaded model ... in isolation - no shared cache!"

Phase 4: Integrated Stream-Session Architecture 🚧 IN PROGRESS

Problem Identified: The frame processing pipeline produces no detections because two separate stream systems run side by side, leaving a communication gap between frame capture and the session workers.

Root Cause:

  • Old RTSP Process Manager capturing frames but not forwarding to session workers
  • New Session Workers ready for processing but receiving no frames
  • Architecture mismatch preventing detection despite successful initialization

Solution: Complete integration of stream reading INTO session worker processes.

  • Integrate RTSP Stream Reading into Session Workers

    • Move RTSP stream capture from separate processes into each session worker
    • Each session worker handles: RTSP connection + frame processing + model inference
    • Eliminate communication gap between stream capture and detection
  • Remove Duplicate Stream Management Systems

    • Delete old RTSP Process Manager (core/streaming/process_manager.py)
    • Remove conflicting stream management from main process
    • Consolidate to single session-worker-only architecture
  • Enhanced Session Worker with Stream Integration

    • Add RTSP stream reader to SessionWorkerProcess
    • Implement frame buffer queue management per worker
    • Add connection recovery and stream health monitoring per session
  • Complete End-to-End Isolation per Camera

    Session Worker Process N:
    ├── RTSP Stream Reader (rtsp://cameraN)
    ├── Frame Buffer Queue
    ├── YOLO Detection Pipeline
    ├── Model Cache (isolated)
    ├── Database/Redis connections
    └── Per-camera Logger
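A sketch of the stream side of such a worker: a bounded frame buffer that drops the oldest frame when full, and a reader loop that reconnects with exponential backoff. `open_stream` is a hypothetical callable returning an iterator of frames (in the real worker it might wrap an RTSP capture), and `sleep` is injectable so the backoff can be tested. Both names are assumptions, not the project's API.

```python
import logging
import queue
import threading
import time
from typing import Any, Callable, Iterable, Optional

logger = logging.getLogger(__name__)


class LatestFrameBuffer:
    """Bounded frame queue that drops the oldest frame when full, so inference
    works on recent frames instead of an ever-growing backlog."""

    def __init__(self, maxsize: int = 2) -> None:
        self._queue: "queue.Queue[Any]" = queue.Queue(maxsize=maxsize)

    def put(self, frame: Any) -> None:
        while True:
            try:
                self._queue.put_nowait(frame)
                return
            except queue.Full:
                try:
                    self._queue.get_nowait()  # discard the oldest frame
                except queue.Empty:
                    pass  # the consumer emptied it in the meantime; retry

    def get(self, timeout: Optional[float] = None) -> Any:
        return self._queue.get(timeout=timeout)


def run_stream_reader(
    open_stream: Callable[[], Iterable[Any]],
    buffer: LatestFrameBuffer,
    stop: threading.Event,
    max_backoff: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Feed frames into `buffer`, reconnecting with exponential backoff
    (1s, 2s, 4s, ... capped at max_backoff) until `stop` is set."""
    backoff = 1.0
    while not stop.is_set():
        try:
            for frame in open_stream():
                if stop.is_set():
                    return
                buffer.put(frame)
                backoff = 1.0  # a frame arrived, so the connection is healthy again
        except Exception as exc:  # e.g. the RTSP connection dropped
            logger.warning("stream error, reconnecting in %.1fs: %r", backoff, exc)
        if stop.is_set():
            return
        sleep(backoff)
        backoff = min(backoff * 2, max_backoff)
```

Inside a session worker the reader could run on a daemon thread while the process's main loop calls `buffer.get()` and feeds frames to its own pipeline, so capture, inference, and recovery all stay within one camera's process.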
    

Benefits for 20+ Cameras:

  • Python GIL Bypass: True parallelism with multiprocessing
  • Resource Isolation: Process crashes don't affect other cameras
  • Memory Distribution: Each process has its own memory space
  • Independent Recovery: Per-camera reconnection logic
  • Scalable Architecture: Near-linear scaling with available CPU cores, bounded by RAM and GPU memory because every process loads its own copy of the models

Phase 5: Resource Management & Cleanup

  • Process Lifecycle Management

    • Automatic process cleanup on WebSocket disconnect
    • Graceful shutdown handling
    • Resource deallocation on process termination
  • Memory & GPU Management

    • Monitor per-process memory usage
    • GPU memory isolation between sessions
    • Prevent memory leaks in long-running processes
  • Health Monitoring

    • Process health checks and restart capability
    • Performance metrics per session process
    • Resource usage monitoring and alerting
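A sketch of two lifecycle pieces listed above: escalating shutdown and a restart-on-death health check. They are written against the subset of `multiprocessing.Process` methods they use (`start`, `is_alive`, `join`, `terminate`, `kill`, `exitcode`); the function names are hypothetical, and the 30-second default mirrors `process_cleanup_timeout` under the assumption that it is in seconds.

```python
from typing import Callable, Dict, List, Optional, Protocol, Tuple


class ProcessLike(Protocol):
    """The subset of multiprocessing.Process used here."""
    @property
    def exitcode(self) -> Optional[int]: ...
    def start(self) -> None: ...
    def is_alive(self) -> bool: ...
    def join(self, timeout: Optional[float] = None) -> None: ...
    def terminate(self) -> None: ...
    def kill(self) -> None: ...


def stop_process(proc: ProcessLike, timeout: float = 30.0, grace: float = 5.0) -> None:
    """Escalating shutdown, called after the worker was sent its shutdown command:
    wait up to `timeout`, then terminate (SIGTERM on POSIX), then kill (SIGKILL)."""
    proc.join(timeout)
    if proc.is_alive():
        proc.terminate()
        proc.join(grace)
    if proc.is_alive():
        proc.kill()
        proc.join()


def restart_dead_sessions(
    registry: Dict[str, ProcessLike],
    spawn: Callable[[str], ProcessLike],
) -> List[Tuple[str, Optional[int]]]:
    """Health check: replace every dead process with a freshly started one.
    Returns (session_id, old exitcode) for each restart, e.g. for alerting."""
    restarted: List[Tuple[str, Optional[int]]] = []
    for session_id, proc in list(registry.items()):
        if not proc.is_alive():
            replacement = spawn(session_id)
            replacement.start()
            registry[session_id] = replacement
            restarted.append((session_id, proc.exitcode))
    return restarted
```

Calling `restart_dead_sessions` every `health_check_interval` restarts only the crashed camera and leaves the others running, which is the per-session hot restart this phase aims for. A negative `exitcode` means the process was ended by a signal.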

🔄 What Will Be Replaced

Files to Modify:

  1. app.py

    • Replace direct pipeline execution with process management
    • Remove global latest_frames cache
    • Add SessionProcessManager integration
  2. core/models/inference.py

    • Remove shared _model_cache class variable
    • Make model loading process-specific
    • Eliminate cross-session model sharing
  3. core/detection/pipeline.py

    • Remove global session mappings
    • Make pipeline instance session-specific
    • Isolate processing state per session
  4. core/communication/websocket.py

    • Replace direct pipeline calls with IPC
    • Add process spawn/cleanup on subscribe/unsubscribe
    • Implement queue-based communication

New Files to Create:

  1. core/processes/session_manager.py

    • SessionProcessManager class
    • Process lifecycle management
    • Health monitoring and cleanup
  2. core/processes/session_worker.py

    • SessionWorkerProcess class
    • Individual session process implementation
    • Model loading and pipeline execution
  3. core/processes/communication.py

    • IPC message definitions and handlers
    • Queue management utilities
    • Protocol for main ↔ session communication
  4. core/logging/session_logger.py

    • Per-session logging configuration
    • Log file management and rotation
    • Structured logging with session context
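A hypothetical skeleton of the registry at the core of SessionProcessManager, showing how WebSocket subscribe/unsubscribe could map to process spawn/cleanup with the `max_sessions=20` limit seen in the Phase 1 results. `SessionRegistry`, `spawn`, and `stop` are illustrative names; `spawn` would build (but not start) a process for one subscription, and `stop` could be an escalating shutdown helper.

```python
from typing import Callable, Dict, List, Protocol


class Startable(Protocol):
    def start(self) -> None: ...


class SessionLimitError(RuntimeError):
    """Raised when a subscribe would exceed max_sessions."""


class SessionRegistry:
    """One process per subscription, capped at max_sessions."""

    def __init__(self, spawn: Callable[[str], Startable], max_sessions: int = 20) -> None:
        self._spawn = spawn
        self._max_sessions = max_sessions
        self._sessions: Dict[str, Startable] = {}

    def on_subscribe(self, subscription_id: str) -> Startable:
        # Idempotent: a repeated subscribe keeps the existing process.
        if subscription_id in self._sessions:
            return self._sessions[subscription_id]
        if len(self._sessions) >= self._max_sessions:
            raise SessionLimitError(f"max_sessions={self._max_sessions} reached")
        proc = self._spawn(subscription_id)
        proc.start()
        self._sessions[subscription_id] = proc
        return proc

    def on_unsubscribe(self, subscription_id: str, stop: Callable[[Startable], None]) -> bool:
        # Remove from the registry first so no new work is routed to it, then stop it.
        proc = self._sessions.pop(subscription_id, None)
        if proc is None:
            return False
        stop(proc)
        return True

    def active_sessions(self) -> List[str]:
        return sorted(self._sessions)
```

Keeping the registry free of any pipeline or model objects means the main process holds only process handles, so no detection state can be shared across sessions through it.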

What Will Be Removed

Code to Remove:

  1. Shared State Variables

    # From core/models/inference.py
    _model_cache: Dict[str, Any] = {}
    
    # From core/detection/pipeline.py
    self.session_to_subscription = {}
    self.session_processing_results = {}
    
    # From app.py
    latest_frames = {}
    
  2. Global Singleton Patterns

    • Single pipeline instance handling all sessions
    • Shared ThreadPoolExecutor across sessions
    • Global model manager for all subscriptions
  3. Cross-Session Dependencies

    • Session mapping lookups across different subscriptions
    • Shared processing state between unrelated sessions
    • Global frame caching across all cameras

🔧 Configuration Changes

New Configuration Options:

{
  "session_processes": {
    "max_concurrent_sessions": 20,
    "process_cleanup_timeout": 30,
    "health_check_interval": 10,
    "log_rotation": {
      "max_size_mb": 100,
      "backup_count": 5
    }
  },
  "resource_limits": {
    "memory_per_process_mb": 2048,
    "gpu_memory_fraction": 0.3
  }
}
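A sketch of loading this configuration into typed objects with defaults, using only `json` and `dataclasses`. The class names are hypothetical, and the timeout fields are assumed to be in seconds. Unknown keys raise `TypeError`, so a misspelled option fails at startup instead of being silently ignored.

```python
import json
from dataclasses import dataclass, field


@dataclass
class LogRotationConfig:
    max_size_mb: int = 100
    backup_count: int = 5


@dataclass
class SessionProcessConfig:
    max_concurrent_sessions: int = 20
    process_cleanup_timeout: float = 30  # assumed seconds
    health_check_interval: float = 10    # assumed seconds
    log_rotation: LogRotationConfig = field(default_factory=LogRotationConfig)


@dataclass
class ResourceLimits:
    memory_per_process_mb: int = 2048
    gpu_memory_fraction: float = 0.3

    def __post_init__(self) -> None:
        if not 0 < self.gpu_memory_fraction <= 1:
            raise ValueError("gpu_memory_fraction must be in (0, 1]")


@dataclass
class WorkerConfig:
    session_processes: SessionProcessConfig = field(default_factory=SessionProcessConfig)
    resource_limits: ResourceLimits = field(default_factory=ResourceLimits)


def load_worker_config(text: str) -> WorkerConfig:
    raw = json.loads(text)
    sp = dict(raw.get("session_processes", {}))
    rotation = LogRotationConfig(**sp.pop("log_rotation", {}))
    return WorkerConfig(
        session_processes=SessionProcessConfig(log_rotation=rotation, **sp),
        resource_limits=ResourceLimits(**raw.get("resource_limits", {})),
    )
```

The values above are worth checking together: at the session limit, 20 × 2048 MB is about 40 GB of RAM, and 20 × 0.3 adds up to six GPUs' worth of memory fractions, so on a single GPU only three sessions could each reserve 30%. If the pipeline runs on PyTorch, `torch.cuda.set_per_process_memory_fraction` is one way a worker could apply `gpu_memory_fraction`.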

📊 Benefits of New Architecture

🛡️ Complete Isolation:

  • Memory Isolation: Each session runs in separate process memory space
  • Model Isolation: No shared model cache between sessions
  • State Isolation: Session mappings and processing state are process-local
  • Error Isolation: Process crashes don't affect other sessions

📈 Performance Improvements:

  • True Parallelism: Bypass Python GIL limitations
  • Resource Optimization: Each process uses only required resources
  • Scalability: Near-linear scaling with available CPU cores, until RAM or GPU memory (one model set per process) becomes the limit
  • Memory Efficiency: Automatic cleanup on session termination

🔍 Enhanced Monitoring:

  • Per-Camera Logs: Dedicated log file for each session
  • Resource Tracking: Monitor CPU/memory per session process
  • Debugging: Isolated logs make issue diagnosis easier
  • Audit Trail: Complete processing history per camera

🚀 Operational Benefits:

  • Zero Cross-Session Contamination: Sessions share no in-process state, so one session's results cannot leak into another's
  • Hot Restart: Individual session restart without affecting others
  • Resource Control: Fine-grained resource allocation per session
  • Development: Easier testing and debugging of individual sessions

🎬 Implementation Order

  1. Phase 1: Core infrastructure (SessionProcessManager, IPC)
  2. Phase 2: Per-session logging system
  3. Phase 3: Model and pipeline isolation
  4. Phase 4: Integrated stream-session architecture (RTSP reading inside session workers)
  5. Phase 5: Resource management and monitoring

🧪 Testing Strategy

  1. Unit Tests: Test individual session processes in isolation
  2. Integration Tests: Test main ↔ session process communication
  3. Load Tests: Multiple concurrent sessions with different models
  4. Memory Tests: Verify no cross-session memory leaks
  5. Logging Tests: Verify correct log file creation and rotation
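For the logging tests, a sketch of a rotation check using `tempfile` and a deliberately tiny `maxBytes` so rotation triggers after a few messages. The 200-byte limit, the backup count of 2, and the file name are arbitrary test values, not the production settings.

```python
import logging
import tempfile
from logging.handlers import RotatingFileHandler
from pathlib import Path


def test_log_rotation_keeps_backup_count() -> None:
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "detector_worker_camera_test.log"
        handler = RotatingFileHandler(path, maxBytes=200, backupCount=2, encoding="utf-8")
        logger = logging.getLogger("rotation-test")
        logger.setLevel(logging.INFO)
        logger.propagate = False
        logger.addHandler(handler)
        try:
            for i in range(100):  # about 2 KB in total, far above maxBytes
                logger.info("frame %03d processed", i)
        finally:
            logger.removeHandler(handler)
            handler.close()  # release the file before the directory is deleted

        names = sorted(p.name for p in Path(tmp).iterdir())
        # Only the live file plus backupCount backups may remain.
        assert names == [path.name, path.name + ".1", path.name + ".2"]
        assert all(p.stat().st_size <= 200 for p in Path(tmp).iterdir())
```

The same test with `backupCount=5` and a scaled-down `maxBytes` exercises the production rotation policy without writing 100MB files.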

📝 Migration Checklist

  • Back up the current working version
  • Implement Phase 1 (core infrastructure)
  • Test with single session process
  • Implement Phase 2 (logging)
  • Test with multiple concurrent sessions
  • Implement Phase 3 (isolation)
  • Verify complete elimination of shared state
  • Implement Phase 4 (stream integration into session workers)
  • Implement Phase 5 (resource management)
  • Performance testing and optimization
  • Documentation updates

Expected Outcome: Complete elimination of cross-session result contamination with enhanced monitoring capabilities and true session isolation.