Detector Worker Refactoring Plan
Project Overview
Transform the current monolithic structure (~4000 lines across app.py and siwatsystem/pympta.py) into a modular, maintainable system with clear separation of concerns. The goal is to make the sophisticated computer vision pipeline easy for other engineers to understand while maintaining all existing functionality.
Understanding the Current System Flow
Validated System Flow
- WebSocket Connection → Backend connects and sends setSubscriptionList
- Model Management → Download unique .mpta files to models/ and extract them
- Tracking Phase → Continuous tracking with front_rear_detection_v1.pt
- Validation Phase → Validate that the car is stable (not just passing by)
- Pipeline Execution →
  - Detect car with yolo11m.pt
  - Branch 1: Front/rear detection → crop frontal → save to Redis + brand classification
  - Branch 2: Body type classification from car crop
- Communication → Send imageDetection → Backend generates sessionId → Fueling starts
- Post-Fueling → Backend clears sessionId → Continue tracking the same car to avoid re-running the pipeline
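The flow above behaves like a per-camera state machine. A minimal sketch of how it could be made explicit in the new tracking/integration code; the state names and the allowed transitions are assumptions chosen to mirror the list, not taken from the existing code:

```python
from enum import Enum, auto


class VehicleState(Enum):
    """Hypothetical per-camera lifecycle states mirroring the system flow."""
    TRACKING = auto()          # front_rear_detection_v1.pt tracking vehicles
    VALIDATING = auto()        # deciding whether a car is stable or passing by
    PIPELINE_RUNNING = auto()  # yolo11m.pt detection plus parallel branches
    AWAITING_SESSION = auto()  # imageDetection sent, waiting for a sessionId
    FUELING = auto()           # backend assigned a sessionId
    POST_SESSION = auto()      # sessionId cleared, same car still being tracked


# Allowed transitions; anything not listed is rejected.
TRANSITIONS = {
    VehicleState.TRACKING: {VehicleState.VALIDATING},
    VehicleState.VALIDATING: {VehicleState.TRACKING, VehicleState.PIPELINE_RUNNING},
    VehicleState.PIPELINE_RUNNING: {VehicleState.AWAITING_SESSION, VehicleState.TRACKING},
    VehicleState.AWAITING_SESSION: {VehicleState.FUELING, VehicleState.TRACKING},
    VehicleState.FUELING: {VehicleState.POST_SESSION},
    VehicleState.POST_SESSION: {VehicleState.TRACKING},  # car has left the station
}


class CameraLifecycle:
    def __init__(self) -> None:
        self.state = VehicleState.TRACKING

    def advance(self, new_state: VehicleState) -> None:
        """Move to new_state, refusing transitions the flow does not allow."""
        if new_state not in TRANSITIONS[self.state]:
            raise ValueError(f"Illegal transition {self.state.name} -> {new_state.name}")
        self.state = new_state
```

Rejecting illegal transitions early (e.g. jumping from TRACKING straight to FUELING) surfaces ordering bugs between tracking, validation and session handling that would otherwise show up only as duplicate pipeline runs.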
Core Responsibilities Identified
- WebSocket Communication - Message handling and protocol compliance
- Stream Management - RTSP/HTTP frame processing and buffering
- Model Management - MPTA download, extraction, and loading
- Pipeline Configuration - Parse pipeline.json and set up the execution flow
- Vehicle Tracking - Continuous tracking and car identification
- Validation Logic - Stable car detection vs. passing-by cars
- Detection Pipeline - Main ML pipeline with parallel branches
- Data Persistence - Redis/PostgreSQL operations
- Session Management - Handle session IDs and lifecycle
Proposed Directory Structure
core/
├── communication/
│ ├── __init__.py
│ ├── websocket.py # WebSocket message handling & protocol
│ ├── messages.py # Message types and validation
│ ├── models.py # Message data structures
│ └── state.py # Worker state management
├── streaming/
│ ├── __init__.py
│ ├── manager.py # Stream coordination and lifecycle
│ ├── readers.py # RTSP/HTTP frame readers
│ └── buffers.py # Frame buffering and caching
├── models/
│ ├── __init__.py
│ ├── manager.py # MPTA download and model loading
│ ├── pipeline.py # Pipeline.json parser and config
│ └── inference.py # YOLO model wrapper and optimization
├── tracking/
│ ├── __init__.py
│ ├── tracker.py # Vehicle tracking with front_rear_detection_v1
│ ├── validator.py # Stable car validation logic
│ └── integration.py # Tracking-pipeline integration
├── detection/
│ ├── __init__.py
│ ├── pipeline.py # Main detection pipeline orchestration
│ └── branches.py # Parallel branch processing (brand/bodytype)
└── storage/
├── __init__.py
├── redis.py # Redis operations and image storage
└── database.py # PostgreSQL operations (existing - will be moved)
Implementation Strategy (Feature-by-Feature Testing)
Phase 1: Communication Layer
- WebSocket message handling (setSubscriptionList, sessionId management)
- HTTP API endpoints (camera image retrieval)
- Worker state reporting
Phase 2: Pipeline Configuration Reader
- Parse pipeline.json
- Model dependency resolution
- Branch configuration setup
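A minimal sketch of the Phase 2 parser and model dependency resolution, assuming a hypothetical pipeline.json layout in which each node has modelId, modelFile, an optional parallel flag and nested branches, with optional top-level redis and postgresql sections. The key names must be checked against the schema the existing .mpta archives actually use.

```python
import json
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ModelNode:
    model_id: str
    model_file: str
    parallel: bool = False
    branches: list["ModelNode"] = field(default_factory=list)


def parse_node(raw: dict[str, Any]) -> ModelNode:
    """Recursively convert one (assumed) pipeline node into a ModelNode."""
    missing = [k for k in ("modelId", "modelFile") if k not in raw]
    if missing:
        raise ValueError(f"pipeline node missing keys: {missing}")
    return ModelNode(
        model_id=raw["modelId"],
        model_file=raw["modelFile"],
        parallel=bool(raw.get("parallel", False)),
        branches=[parse_node(b) for b in raw.get("branches", [])],
    )


def required_model_files(root: ModelNode) -> list[str]:
    """Depth-first (pre-order) list of unique model files the pipeline needs."""
    seen: list[str] = []
    stack = [root]
    while stack:
        node = stack.pop()
        if node.model_file not in seen:
            seen.append(node.model_file)
        stack.extend(reversed(node.branches))  # keep declared branch order
    return seen


def load_pipeline(text: str) -> tuple[ModelNode, dict[str, Any]]:
    """Parse pipeline.json text into the model tree plus service configs."""
    doc = json.loads(text)
    if "pipeline" not in doc:
        raise ValueError("pipeline.json has no top-level 'pipeline' key")
    services = {k: doc[k] for k in ("redis", "postgresql") if k in doc}
    return parse_node(doc["pipeline"]), services
```

Resolving the full list of model files up front lets the model manager load each file once, even when several branches reference the same weights.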
Phase 3: Tracking System
- Continuous vehicle tracking
- Car identification and persistence
Phase 4: Tracking Validator
- Stable car detection logic
- Passing-by vs. fueling car differentiation
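One possible stability check, assuming the tracker supplies a track ID and an (x1, y1, x2, y2) box per frame: a car counts as stable once it has been seen for the last min_frames observations and its box centres have stayed within max_center_shift pixels of each other. StabilityValidator and both thresholds are hypothetical placeholders to be tuned on real footage.

```python
import math
from collections import deque
from dataclasses import dataclass, field

BBox = tuple[float, float, float, float]  # (x1, y1, x2, y2) in pixels


@dataclass
class StabilityValidator:
    """Hypothetical stable-car check; thresholds are illustrative, not tuned."""
    min_frames: int = 10            # observations required before deciding
    max_center_shift: float = 15.0  # allowed spread of box centres, in pixels
    history: dict[int, deque] = field(default_factory=dict)

    def update(self, track_id: int, bbox: BBox) -> bool:
        """Record one box for a track; return True once the car looks stationary."""
        x1, y1, x2, y2 = bbox
        window = self.history.setdefault(track_id, deque(maxlen=self.min_frames))
        window.append(((x1 + x2) / 2, (y1 + y2) / 2))
        if len(window) < self.min_frames:
            return False  # not enough evidence yet
        xs = [c[0] for c in window]
        ys = [c[1] for c in window]
        # A passing car sweeps across the frame, so its centres spread out widely.
        spread = math.hypot(max(xs) - min(xs), max(ys) - min(ys))
        return spread <= self.max_center_shift

    def forget(self, track_id: int) -> None:
        """Drop history when the tracker loses the car."""
        self.history.pop(track_id, None)
```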
Phase 5: Model Pipeline Execution
- Main detection pipeline
- Parallel branch processing
- Redis/DB integration
Phase 6: Post-Session Tracking Validation
- Same car validation after sessionId cleared
- Prevent duplicate pipeline execution
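A minimal sketch of the duplicate-prevention rule, assuming track IDs stay stable per camera while a car remains in view, and assuming no new pipeline should start while a sessionId is active. SessionGuard and its methods are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SessionGuard:
    """Hypothetical per-camera guard against re-running the pipeline on the same car."""
    # camera_id -> track IDs whose pipeline has already run
    processed: dict[str, set[int]] = field(default_factory=dict)
    # camera_id -> current sessionId (None once the backend clears it)
    sessions: dict[str, Optional[str]] = field(default_factory=dict)

    def should_run_pipeline(self, camera_id: str, track_id: int) -> bool:
        if self.sessions.get(camera_id):
            return False  # assumption: no new pipeline while a session is open
        return track_id not in self.processed.get(camera_id, set())

    def mark_processed(self, camera_id: str, track_id: int) -> None:
        self.processed.setdefault(camera_id, set()).add(track_id)

    def set_session(self, camera_id: str, session_id: Optional[str]) -> None:
        self.sessions[camera_id] = session_id

    def prune(self, camera_id: str, visible_ids: set[int]) -> None:
        """Forget tracks no longer visible; keeps the set bounded if IDs are reused."""
        if camera_id in self.processed:
            self.processed[camera_id] &= visible_ids
```

After the backend clears the sessionId, the car that was just fueled keeps its track ID and is skipped, while a newly arriving car (new ID) is still eligible.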
Key Preservation Requirements
- HTTP Endpoint: /camera/{camera_id}/image must remain unchanged
- WebSocket Protocol: Full compliance with the worker.md specification
- MPTA Format: Maintain compatibility with existing model archives
- Database Schema: Keep existing PostgreSQL structure
- Redis Integration: Preserve image storage and pub/sub functionality
- Configuration: Maintain config.json compatibility
- Logging: Preserve structured logging format
Expected Benefits
- Maintainability: Single-responsibility modules (~200-400 lines each)
- Testability: Independent testing of each component
- Readability: Clear separation of concerns
- Scalability: Easy to extend and modify individual components
- Documentation: Self-documenting code structure
Comprehensive TODO List
📋 Phase 1: Project Setup & Communication Layer
1.1 Project Structure Setup
- Create core/ directory structure
- Create all module directories and __init__.py files
- Set up logging configuration for new modules
- Update imports in existing files to prepare for migration
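The directory and __init__.py setup can be scripted so it is repeatable and never overwrites files that already exist. A small sketch using only pathlib; the package list mirrors the proposed directory structure, and scaffold is a hypothetical helper name.

```python
from pathlib import Path

# Sub-packages from the proposed core/ layout.
PACKAGES = ("communication", "streaming", "models", "tracking", "detection", "storage")


def scaffold(root: Path) -> list[Path]:
    """Create core/<package>/__init__.py for each package; return only new files."""
    created: list[Path] = []
    for package in PACKAGES:
        package_dir = root / "core" / package
        package_dir.mkdir(parents=True, exist_ok=True)
        init_file = package_dir / "__init__.py"
        if not init_file.exists():  # re-running the script is a no-op
            init_file.write_text(f'"""core.{package} package."""\n')
            created.append(init_file)
    return created
```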
1.2 Communication Module (core/communication/)
- Create models.py - Message data structures
  - Define WebSocket message models (SubscriptionList, StateReport, etc.)
  - Add validation schemas for incoming messages
  - Create response models for outgoing messages
- Create messages.py - Message types and validation
  - Implement message type constants
  - Add message validation functions
  - Create message builders for common responses
- Create websocket.py - WebSocket message handling
  - Extract WebSocket connection management from app.py
  - Implement message routing and dispatching
  - Add connection lifecycle management (connect, disconnect, reconnect)
  - Handle setSubscriptionList message processing
  - Handle setSessionId and setProgressionStage messages
  - Handle requestState and patchSessionResult messages
- Create state.py - Worker state management
  - Extract state reporting logic from app.py
  - Implement system metrics collection (CPU, memory, GPU)
  - Manage active subscriptions state
  - Handle session ID mapping and storage
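A minimal sketch of the routing core for websocket.py, assuming every incoming frame is a JSON object with a type field. The message type strings come from this plan; the payload field names (displayIdentifier, sessionId) and the stateReport reply shape are assumptions to verify against worker.md.

```python
import json
import logging
from typing import Any, Awaitable, Callable, Optional

logger = logging.getLogger("core.communication")

# Message types named in this plan; payload field names below are assumptions.
SET_SUBSCRIPTION_LIST = "setSubscriptionList"
SET_SESSION_ID = "setSessionId"
SET_PROGRESSION_STAGE = "setProgressionStage"
REQUEST_STATE = "requestState"
PATCH_SESSION_RESULT = "patchSessionResult"

Message = dict[str, Any]
Handler = Callable[[Message], Awaitable[Optional[Message]]]


class MessageRouter:
    """Maps the 'type' field of incoming JSON messages to async handlers."""

    def __init__(self) -> None:
        self._handlers: dict[str, Handler] = {}

    def on(self, message_type: str) -> Callable[[Handler], Handler]:
        def register(handler: Handler) -> Handler:
            self._handlers[message_type] = handler
            return handler
        return register

    async def dispatch(self, raw: str) -> Optional[Message]:
        """Parse one WebSocket text frame; return the reply to send, if any."""
        try:
            message = json.loads(raw)
        except json.JSONDecodeError:
            logger.warning("dropping non-JSON message: %.80s", raw)
            return None
        msg_type = message.get("type") if isinstance(message, dict) else None
        handler = self._handlers.get(msg_type) if isinstance(msg_type, str) else None
        if handler is None:
            logger.warning("no handler for message: %.80s", raw)
            return None
        return await handler(message)


# Usage: per-display session IDs, as a hypothetical state.py might track them.
router = MessageRouter()
session_ids: dict[str, Optional[str]] = {}


@router.on(SET_SESSION_ID)
async def handle_set_session(message: Message) -> Optional[Message]:
    payload = message.get("payload", {})
    session_ids[payload["displayIdentifier"]] = payload.get("sessionId")
    return None  # nothing to send back


@router.on(REQUEST_STATE)
async def handle_request_state(message: Message) -> Optional[Message]:
    return {"type": "stateReport", "sessionIds": dict(session_ids)}
```

Registering one small handler per message type keeps each protocol message independently testable, which fits the per-message checks in 1.4.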
1.3 HTTP API Preservation
- Preserve /camera/{camera_id}/image endpoint
  - Extract REST API logic from app.py
  - Ensure frame caching mechanism works with new structure
  - Maintain exact same response format and error handling
1.4 Testing Phase 1
- Test WebSocket connection and message handling
- Test HTTP API endpoint functionality
- Verify state reporting works correctly
- Test session management functionality
📋 Phase 2: Pipeline Configuration & Model Management
2.1 Models Module (core/models/)
- Create pipeline.py - pipeline.json parser
  - Extract pipeline configuration parsing from pympta.py
  - Implement pipeline validation
  - Add configuration schema validation
  - Handle Redis and PostgreSQL configuration parsing
- Create manager.py - MPTA download and model loading
  - Extract MPTA download logic from pympta.py
  - Implement ZIP extraction and validation
  - Add model file management and caching
  - Handle model loading with GPU optimization
  - Implement model dependency resolution
- Create inference.py - YOLO model wrapper
  - Create unified YOLO model interface
  - Add inference optimization and caching
  - Implement batch processing capabilities
  - Handle model switching and memory management
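A sketch of the extraction and validation step in manager.py, assuming an .mpta file is a ZIP archive (as the ZIP-extraction item above implies) containing exactly one pipeline.json, possibly inside a top-level folder. Download, caching and GPU loading are omitted; extract_mpta is a hypothetical name.

```python
import json
import zipfile
from pathlib import Path


def extract_mpta(archive: Path, dest_root: Path) -> Path:
    """Extract an .mpta archive and return the directory that holds pipeline.json."""
    if not zipfile.is_zipfile(archive):
        raise ValueError(f"{archive} is not a valid ZIP/MPTA archive")
    target = dest_root / archive.stem
    with zipfile.ZipFile(archive) as zf:
        # Reject entries that would escape the target directory (zip-slip).
        for name in zf.namelist():
            if not (target / name).resolve().is_relative_to(target.resolve()):
                raise ValueError(f"unsafe path in archive: {name}")
        zf.extractall(target)
    matches = list(target.rglob("pipeline.json"))
    if len(matches) != 1:
        raise ValueError(f"expected one pipeline.json, found {len(matches)}")
    json.loads(matches[0].read_text())  # fail fast on malformed JSON
    return matches[0].parent
```

Validating the archive before any model is loaded means a corrupt or incomplete download fails with a clear error instead of a partial pipeline.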
2.2 Testing Phase 2
- Test MPTA file download and extraction
- Test pipeline.json parsing and validation
- Test model loading with different configurations
- Verify GPU optimization works correctly
📋 Phase 3: Streaming System
3.1 Streaming Module (core/streaming/)
- Create readers.py - RTSP/HTTP frame readers
  - Extract frame_reader function from app.py
  - Extract snapshot_reader function from app.py
  - Add connection management and retry logic
  - Implement frame rate control and optimization
- Create buffers.py - Frame buffering and caching
  - Extract frame buffer management from app.py
  - Implement efficient frame caching for REST API
  - Add buffer size management and memory optimization
- Create manager.py - Stream coordination
  - Extract stream lifecycle management from app.py
  - Implement shared stream optimization
  - Add subscription reconciliation logic
  - Handle stream sharing across multiple subscriptions
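Shared streams and subscription reconciliation reduce to set arithmetic if each setSubscriptionList is treated as the complete desired state (an assumption based on the message name). A minimal sketch in which subscription IDs map to stream URLs and the caller opens or closes one reader per URL; StreamRegistry and the ID format are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class StreamRegistry:
    """Hypothetical shared-stream bookkeeping: one reader per URL, many subscribers."""
    subscribers: dict[str, set[str]] = field(default_factory=dict)  # url -> subscription IDs

    def reconcile(self, desired: dict[str, str]) -> tuple[set[str], set[str]]:
        """Apply a full subscription list (subscription ID -> stream URL).

        Returns (urls_to_start, urls_to_stop); URLs present before and after keep
        their existing reader even if their subscriber set changed.
        """
        new: dict[str, set[str]] = {}
        for sub_id, url in desired.items():
            new.setdefault(url, set()).add(sub_id)
        to_start = set(new) - set(self.subscribers)
        to_stop = set(self.subscribers) - set(new)
        self.subscribers = new
        return to_start, to_stop
```

Two cameras subscribed to the same RTSP URL therefore share one reader, and the reader is stopped only when the last subscription referencing it disappears.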
3.2 Testing Phase 3
- Test RTSP stream reading and buffering
- Test HTTP snapshot capture functionality
- Test shared stream optimization
- Verify frame caching for REST API access
📋 Phase 4: Vehicle Tracking System
4.1 Tracking Module (core/tracking/)
- Create tracker.py - Vehicle tracking implementation
  - Implement continuous tracking with front_rear_detection_v1.pt
  - Add vehicle identification and persistence
  - Implement tracking state management
  - Add bounding box tracking and motion analysis
- Create validator.py - Stable car validation
  - Implement stable car detection algorithm
  - Add passing-by vs. fueling car differentiation
  - Implement validation thresholds and timing
  - Add confidence scoring for validation decisions
- Create integration.py - Tracking-pipeline integration
  - Connect tracking system with main pipeline
  - Handle tracking state transitions
  - Implement post-session tracking validation
  - Add same-car validation after sessionId is cleared
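A sketch of the tracking state kept by tracker.py, assuming the detector's tracker (for example Ultralytics tracking on front_rear_detection_v1.pt) already assigns track IDs, so this layer only records per-track history and expires tracks that disappear. TrackStore, the 2-second staleness window and the history limit are hypothetical.

```python
import time
from dataclasses import dataclass, field
from typing import Optional

BBox = tuple[float, float, float, float]  # (x1, y1, x2, y2)


@dataclass
class TrackedVehicle:
    track_id: int
    first_seen: float
    last_seen: float
    bboxes: list[BBox] = field(default_factory=list)

    @property
    def dwell_time(self) -> float:
        return self.last_seen - self.first_seen


class TrackStore:
    """Hypothetical per-camera registry of tracker-assigned vehicle IDs."""

    def __init__(self, stale_after: float = 2.0, max_history: int = 50) -> None:
        self.stale_after = stale_after
        self.max_history = max_history
        self.tracks: dict[int, TrackedVehicle] = {}

    def update(self, detections: dict[int, BBox], now: Optional[float] = None) -> list[int]:
        """Record one frame of tracked boxes; return the IDs dropped as stale."""
        now = time.monotonic() if now is None else now
        for track_id, bbox in detections.items():
            vehicle = self.tracks.get(track_id)
            if vehicle is None:
                vehicle = self.tracks[track_id] = TrackedVehicle(track_id, now, now)
            vehicle.last_seen = now
            vehicle.bboxes.append(bbox)
            del vehicle.bboxes[:-self.max_history]  # keep a bounded history
        stale = [tid for tid, v in self.tracks.items() if now - v.last_seen > self.stale_after]
        for tid in stale:
            del self.tracks[tid]
        return stale
```

The returned stale IDs are the natural hook for clearing validator history and post-session bookkeeping when a car leaves.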
4.2 Testing Phase 4
- Test continuous vehicle tracking functionality
- Test stable car validation logic
- Test integration with existing pipeline
- Verify tracking performance and accuracy
📋 Phase 5: Detection Pipeline System
5.1 Detection Module (core/detection/)
- Create pipeline.py - Main detection orchestration
  - Extract main pipeline execution from pympta.py
  - Implement detection flow coordination
  - Add pipeline state management
  - Handle pipeline result aggregation
- Create branches.py - Parallel branch processing
  - Extract parallel branch execution from pympta.py
  - Implement brand classification branch
  - Implement body type classification branch
  - Add branch synchronization and result collection
  - Handle branch failure and retry logic
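A minimal sketch of the concurrency in branches.py, assuming each branch is a callable that takes the car crop and returns a dict (branch names and results here are hypothetical). A thread pool is assumed to be adequate because model inference typically releases the GIL; a branch that still fails after its retries yields None so the other branches' results are kept.

```python
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Optional

logger = logging.getLogger("core.detection.branches")

Branch = Callable[[Any], dict[str, Any]]


def run_branches(crop: Any, branches: dict[str, Branch],
                 retries: int = 1) -> dict[str, Optional[dict[str, Any]]]:
    """Run every branch on the same crop concurrently and collect results by name."""

    def attempt(name: str, fn: Branch) -> Optional[dict[str, Any]]:
        for n in range(retries + 1):
            try:
                return fn(crop)
            except Exception:
                logger.exception("branch %s failed (attempt %d/%d)", name, n + 1, retries + 1)
        return None  # give up on this branch only

    with ThreadPoolExecutor(max_workers=max(1, len(branches))) as pool:
        futures = {name: pool.submit(attempt, name, fn) for name, fn in branches.items()}
        # result() blocks until each branch finishes: this is the synchronization point.
        return {name: future.result() for name, future in futures.items()}
```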
5.2 Storage Module (core/storage/)
- Create redis.py - Redis operations
  - Extract Redis action execution from pympta.py
  - Implement image storage with region cropping
  - Add pub/sub messaging functionality
  - Handle Redis connection management and retry logic
- Move database.py - PostgreSQL operations
  - Move existing siwatsystem/database.py to core/storage/
  - Update imports and integration points
  - Ensure compatibility with new module structure
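A sketch of one Redis save-and-publish action, assuming a redis-py style client (redis.Redis provides set(name, value, ex=...) and publish(channel, message)). The key template, context fields and published JSON shape are hypothetical. Cropping itself would be array slicing (frame[y1:y2, x1:x2]) followed by image encoding, so only the box clamping that makes the slice safe is shown.

```python
import json
from typing import Any, Optional


def clamp_bbox(bbox: tuple[float, float, float, float],
               width: int, height: int) -> tuple[int, int, int, int]:
    """Round and clip an (x1, y1, x2, y2) box to the frame; reject empty regions."""
    x1, y1, x2, y2 = (int(round(v)) for v in bbox)
    x1, x2 = max(0, min(x1, width)), max(0, min(x2, width))
    y1, y2 = max(0, min(y1, height)), max(0, min(y2, height))
    if x2 <= x1 or y2 <= y1:
        raise ValueError(f"empty crop region for box {bbox}")
    return x1, y1, x2, y2


def save_image_action(
    client: Any,
    key_template: str,
    context: dict[str, str],
    image_bytes: bytes,
    expire_seconds: int = 600,
    channel: Optional[str] = None,
) -> str:
    """Store encoded image bytes under a templated key and optionally announce it."""
    key = key_template.format(**context)
    client.set(key, image_bytes, ex=expire_seconds)  # ex: expiry in seconds
    if channel:
        client.publish(channel, json.dumps({"key": key, **context}))
    return key
```

Passing the client in, rather than creating it inside the action, lets connection management and retries live in one place and lets tests substitute a fake client.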
5.3 Testing Phase 5
- Test main detection pipeline execution
- Test parallel branch processing (brand/bodytype)
- Test Redis image storage and messaging
- Test PostgreSQL database operations
- Verify complete pipeline integration
📋 Phase 6: Integration & Final Testing
6.1 Main Application Refactoring
- Refactor app.py
  - Remove extracted functionality
  - Update to use new modular structure
  - Maintain FastAPI application structure
  - Update imports and dependencies
- Clean up siwatsystem/pympta.py
  - Remove extracted functionality
  - Keep only necessary legacy compatibility code
  - Update imports to use new modules
6.2 Post-Session Tracking Validation
- Implement same-car validation after sessionId cleared
- Add logic to prevent duplicate pipeline execution
- Test tracking persistence through session lifecycle
- Verify correct behavior during edge cases
6.3 Configuration & Documentation
- Update configuration handling for new structure
- Ensure config.json compatibility is maintained
- Update logging configuration for all modules
- Add module-level documentation
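A sketch of a backward-compatible config loader, assuming config.json is a flat JSON object. The key names and default values below are illustrative assumptions; the point is that values from an existing file always override defaults and unknown keys pass through unchanged, so older deployments keep working.

```python
import json
from pathlib import Path
from typing import Any

# Assumed keys and defaults; replace with the ones app.py actually reads today.
DEFAULTS: dict[str, Any] = {
    "poll_interval_ms": 100,
    "max_streams": 5,
    "target_fps": 10,
    "reconnect_interval_sec": 5,
    "max_retries": 3,
}


def load_config(path: Path) -> dict[str, Any]:
    """Read config.json, keep unknown keys untouched, fill missing ones from DEFAULTS."""
    config = dict(DEFAULTS)
    if path.exists():
        user = json.loads(path.read_text())
        if not isinstance(user, dict):
            raise ValueError("config.json must contain a JSON object")
        config.update(user)  # existing files win, so old configs keep working
    return config
```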
6.4 Comprehensive Testing
- Integration Testing
  - Test complete system flow end-to-end
  - Test all WebSocket message types
  - Test HTTP API endpoints
  - Test error handling and recovery
- Performance Testing
  - Verify system performance is maintained
  - Test memory usage optimization
  - Test GPU utilization efficiency
  - Benchmark against original implementation
- Edge Case Testing
  - Test connection failures and reconnection
  - Test model loading failures
  - Test stream interruption handling
  - Test concurrent subscription management
6.5 Final Cleanup
- Remove any remaining duplicate code
- Optimize imports across all modules
- Clean up temporary files and debugging code
- Update project documentation
📋 Post-Refactoring Tasks
Documentation Updates
- Update CLAUDE.md with new architecture
- Create module-specific documentation
- Update installation and deployment guides
- Add troubleshooting guide for new structure
Code Quality
- Add type hints to all new modules
- Implement proper error handling patterns
- Ensure consistent logging across modules
- Ensure proper resource cleanup
Future Enhancements (Optional)
- Add unit tests for each module
- Implement monitoring and metrics collection
- Add configuration validation
- Consider adding dependency injection container
Success Criteria
✅ Modularity: Each module has a single, clear responsibility
✅ Testability: Each phase can be tested independently
✅ Maintainability: Code is easy to understand and modify
✅ Compatibility: All existing functionality preserved
✅ Performance: System performance is maintained or improved
✅ Documentation: Clear documentation for new architecture
Risk Mitigation
- Feature-by-feature testing ensures functionality is preserved at each step
- Gradual migration minimizes risk of breaking existing functionality
- Preserve critical interfaces (WebSocket protocol, HTTP endpoints)
- Maintain backward compatibility with existing configurations
- Comprehensive testing at each phase before proceeding