diff --git a/REFACTOR_PLAN.md b/REFACTOR_PLAN.md new file mode 100644 index 0000000..7b0f738 --- /dev/null +++ b/REFACTOR_PLAN.md @@ -0,0 +1,365 @@ +# Detector Worker Refactoring Plan + +## Project Overview + +Transform the current monolithic structure (~4000 lines across `app.py` and `siwatsystem/pympta.py`) into a modular, maintainable system with clear separation of concerns. The goal is to make the sophisticated computer vision pipeline easily understandable for other engineers while maintaining all existing functionality. + +## Current System Flow Understanding + +### Validated System Flow +1. **WebSocket Connection** → Backend connects and sends `setSubscriptionList` +2. **Model Management** → Download unique `.mpta` files to `models/` and extract +3. **Tracking Phase** → Continuous tracking with `front_rear_detection_v1.pt` +4. **Validation Phase** → Validate stable car (not just passing by) +5. **Pipeline Execution** → + - Detect car with `yolo11m.pt` + - **Branch 1**: Front/rear detection → crop frontal → save to Redis + brand classification + - **Branch 2**: Body type classification from car crop +6. **Communication** → Send `imageDetection` → Backend generates `sessionId` → Fueling starts +7. **Post-Fueling** → Backend clears `sessionId` → Continue tracking same car to avoid re-pipeline + +### Core Responsibilities Identified +1. **WebSocket Communication** - Message handling and protocol compliance +2. **Stream Management** - RTSP/HTTP frame processing and buffering +3. **Model Management** - MPTA download, extraction, and loading +4. **Pipeline Configuration** - Parse `pipeline.json` and setup execution flow +5. **Vehicle Tracking** - Continuous tracking and car identification +6. **Validation Logic** - Stable car detection vs. passing-by cars +7. **Detection Pipeline** - Main ML pipeline with parallel branches +8. **Data Persistence** - Redis/PostgreSQL operations +9. **Session Management** - Handle session IDs and lifecycle + +## Proposed Directory Structure + +``` +core/ +├── communication/ +│ ├── __init__.py +│ ├── websocket.py # WebSocket message handling & protocol +│ ├── messages.py # Message types and validation +│ ├── models.py # Message data structures +│ └── state.py # Worker state management +├── streaming/ +│ ├── __init__.py +│ ├── manager.py # Stream coordination and lifecycle +│ ├── readers.py # RTSP/HTTP frame readers +│ └── buffers.py # Frame buffering and caching +├── models/ +│ ├── __init__.py +│ ├── manager.py # MPTA download and model loading +│ ├── pipeline.py # Pipeline.json parser and config +│ └── inference.py # YOLO model wrapper and optimization +├── tracking/ +│ ├── __init__.py +│ ├── tracker.py # Vehicle tracking with front_rear_detection_v1 +│ ├── validator.py # Stable car validation logic +│ └── integration.py # Tracking-pipeline integration +├── detection/ +│ ├── __init__.py +│ ├── pipeline.py # Main detection pipeline orchestration +│ └── branches.py # Parallel branch processing (brand/bodytype) +└── storage/ + ├── __init__.py + ├── redis.py # Redis operations and image storage + └── database.py # PostgreSQL operations (existing - will be moved) +``` + +## Implementation Strategy (Feature-by-Feature Testing) + +### Phase 1: Communication Layer +- WebSocket message handling (setSubscriptionList, sessionId management) +- HTTP API endpoints (camera image retrieval) +- Worker state reporting + +### Phase 2: Pipeline Configuration Reader +- Parse `pipeline.json` +- Model dependency resolution +- Branch configuration setup + +### Phase 3: Tracking System +- Continuous vehicle tracking +- Car identification and persistence + +### Phase 4: Tracking Validator +- Stable car detection logic +- Passing-by vs. fueling car differentiation + +### Phase 5: Model Pipeline Execution +- Main detection pipeline +- Parallel branch processing +- Redis/DB integration + +### Phase 6: Post-Session Tracking Validation +- Same car validation after sessionId cleared +- Prevent duplicate pipeline execution + +## Key Preservation Requirements +- **HTTP Endpoint**: `/camera/{camera_id}/image` must remain unchanged +- **WebSocket Protocol**: Full compliance with `worker.md` specification +- **MPTA Format**: Maintain compatibility with existing model archives +- **Database Schema**: Keep existing PostgreSQL structure +- **Redis Integration**: Preserve image storage and pub/sub functionality +- **Configuration**: Maintain `config.json` compatibility +- **Logging**: Preserve structured logging format + +## Expected Benefits +- **Maintainability**: Single responsibility modules (~200-400 lines each) +- **Testability**: Independent testing of each component +- **Readability**: Clear separation of concerns +- **Scalability**: Easy to extend and modify individual components +- **Documentation**: Self-documenting code structure + +--- + +# Comprehensive TODO List + +## 📋 Phase 1: Project Setup & Communication Layer + +### 1.1 Project Structure Setup +- [ ] Create `core/` directory structure +- [ ] Create all module directories and `__init__.py` files +- [ ] Set up logging configuration for new modules +- [ ] Update imports in existing files to prepare for migration + +### 1.2 Communication Module (`core/communication/`) +- [ ] **Create `models.py`** - Message data structures + - [ ] Define WebSocket message models (SubscriptionList, StateReport, etc.) + - [ ] Add validation schemas for incoming messages + - [ ] Create response models for outgoing messages + +- [ ] **Create `messages.py`** - Message types and validation + - [ ] Implement message type constants + - [ ] Add message validation functions + - [ ] Create message builders for common responses + +- [ ] **Create `websocket.py`** - WebSocket message handling + - [ ] Extract WebSocket connection management from `app.py` + - [ ] Implement message routing and dispatching + - [ ] Add connection lifecycle management (connect, disconnect, reconnect) + - [ ] Handle `setSubscriptionList` message processing + - [ ] Handle `setSessionId` and `setProgressionStage` messages + - [ ] Handle `requestState` and `patchSessionResult` messages + +- [ ] **Create `state.py`** - Worker state management + - [ ] Extract state reporting logic from `app.py` + - [ ] Implement system metrics collection (CPU, memory, GPU) + - [ ] Manage active subscriptions state + - [ ] Handle session ID mapping and storage + +### 1.3 HTTP API Preservation +- [ ] **Preserve `/camera/{camera_id}/image` endpoint** + - [ ] Extract REST API logic from `app.py` + - [ ] Ensure frame caching mechanism works with new structure + - [ ] Maintain exact same response format and error handling + +### 1.4 Testing Phase 1 +- [ ] Test WebSocket connection and message handling +- [ ] Test HTTP API endpoint functionality +- [ ] Verify state reporting works correctly +- [ ] Test session management functionality + +## 📋 Phase 2: Pipeline Configuration & Model Management + +### 2.1 Models Module (`core/models/`) +- [ ] **Create `pipeline.py`** - Pipeline.json parser + - [ ] Extract pipeline configuration parsing from `pympta.py` + - [ ] Implement pipeline validation + - [ ] Add configuration schema validation + - [ ] Handle Redis and PostgreSQL configuration parsing + +- [ ] **Create `manager.py`** - MPTA download and model loading + - [ ] Extract MPTA download logic from `pympta.py` + - [ ] Implement ZIP extraction and validation + - [ ] Add model file management and caching + - [ ] Handle model loading with GPU optimization + - [ ] Implement model dependency resolution + +- [ ] **Create `inference.py`** - YOLO model wrapper + - [ ] Create unified YOLO model interface + - [ ] Add inference optimization and caching + - [ ] Implement batch processing capabilities + - [ ] Handle model switching and memory management + +### 2.2 Testing Phase 2 +- [ ] Test MPTA file download and extraction +- [ ] Test pipeline.json parsing and validation +- [ ] Test model loading with different configurations +- [ ] Verify GPU optimization works correctly + +## 📋 Phase 3: Streaming System + +### 3.1 Streaming Module (`core/streaming/`) +- [ ] **Create `readers.py`** - RTSP/HTTP frame readers + - [ ] Extract `frame_reader` function from `app.py` + - [ ] Extract `snapshot_reader` function from `app.py` + - [ ] Add connection management and retry logic + - [ ] Implement frame rate control and optimization + +- [ ] **Create `buffers.py`** - Frame buffering and caching + - [ ] Extract frame buffer management from `app.py` + - [ ] Implement efficient frame caching for REST API + - [ ] Add buffer size management and memory optimization + +- [ ] **Create `manager.py`** - Stream coordination + - [ ] Extract stream lifecycle management from `app.py` + - [ ] Implement shared stream optimization + - [ ] Add subscription reconciliation logic + - [ ] Handle stream sharing across multiple subscriptions + +### 3.2 Testing Phase 3 +- [ ] Test RTSP stream reading and buffering +- [ ] Test HTTP snapshot capture functionality +- [ ] Test shared stream optimization +- [ ] Verify frame caching for REST API access + +## 📋 Phase 4: Vehicle Tracking System + +### 4.1 Tracking Module (`core/tracking/`) +- [ ] **Create `tracker.py`** - Vehicle tracking implementation + - [ ] Implement continuous tracking with `front_rear_detection_v1.pt` + - [ ] Add vehicle identification and persistence + - [ ] Implement tracking state management + - [ ] Add bounding box tracking and motion analysis + +- [ ] **Create `validator.py`** - Stable car validation + - [ ] Implement stable car detection algorithm + - [ ] Add passing-by vs. fueling car differentiation + - [ ] Implement validation thresholds and timing + - [ ] Add confidence scoring for validation decisions + +- [ ] **Create `integration.py`** - Tracking-pipeline integration + - [ ] Connect tracking system with main pipeline + - [ ] Handle tracking state transitions + - [ ] Implement post-session tracking validation + - [ ] Add same-car validation after sessionId cleared + +### 4.2 Testing Phase 4 +- [ ] Test continuous vehicle tracking functionality +- [ ] Test stable car validation logic +- [ ] Test integration with existing pipeline +- [ ] Verify tracking performance and accuracy + +## 📋 Phase 5: Detection Pipeline System + +### 5.1 Detection Module (`core/detection/`) +- [ ] **Create `pipeline.py`** - Main detection orchestration + - [ ] Extract main pipeline execution from `pympta.py` + - [ ] Implement detection flow coordination + - [ ] Add pipeline state management + - [ ] Handle pipeline result aggregation + +- [ ] **Create `branches.py`** - Parallel branch processing + - [ ] Extract parallel branch execution from `pympta.py` + - [ ] Implement brand classification branch + - [ ] Implement body type classification branch + - [ ] Add branch synchronization and result collection + - [ ] Handle branch failure and retry logic + +### 5.2 Storage Module (`core/storage/`) +- [ ] **Create `redis.py`** - Redis operations + - [ ] Extract Redis action execution from `pympta.py` + - [ ] Implement image storage with region cropping + - [ ] Add pub/sub messaging functionality + - [ ] Handle Redis connection management and retry logic + +- [ ] **Move `database.py`** - PostgreSQL operations + - [ ] Move existing `siwatsystem/database.py` to `core/storage/` + - [ ] Update imports and integration points + - [ ] Ensure compatibility with new module structure + +### 5.3 Testing Phase 5 +- [ ] Test main detection pipeline execution +- [ ] Test parallel branch processing (brand/bodytype) +- [ ] Test Redis image storage and messaging +- [ ] Test PostgreSQL database operations +- [ ] Verify complete pipeline integration + +## 📋 Phase 6: Integration & Final Testing + +### 6.1 Main Application Refactoring +- [ ] **Refactor `app.py`** + - [ ] Remove extracted functionality + - [ ] Update to use new modular structure + - [ ] Maintain FastAPI application structure + - [ ] Update imports and dependencies + +- [ ] **Clean up `siwatsystem/pympta.py`** + - [ ] Remove extracted functionality + - [ ] Keep only necessary legacy compatibility code + - [ ] Update imports to use new modules + +### 6.2 Post-Session Tracking Validation +- [ ] Implement same-car validation after sessionId cleared +- [ ] Add logic to prevent duplicate pipeline execution +- [ ] Test tracking persistence through session lifecycle +- [ ] Verify correct behavior during edge cases + +### 6.3 Configuration & Documentation +- [ ] Update configuration handling for new structure +- [ ] Ensure `config.json` compatibility maintained +- [ ] Update logging configuration for all modules +- [ ] Add module-level documentation + +### 6.4 Comprehensive Testing +- [ ] **Integration Testing** + - [ ] Test complete system flow end-to-end + - [ ] Test all WebSocket message types + - [ ] Test HTTP API endpoints + - [ ] Test error handling and recovery + +- [ ] **Performance Testing** + - [ ] Verify system performance is maintained + - [ ] Test memory usage optimization + - [ ] Test GPU utilization efficiency + - [ ] Benchmark against original implementation + +- [ ] **Edge Case Testing** + - [ ] Test connection failures and reconnection + - [ ] Test model loading failures + - [ ] Test stream interruption handling + - [ ] Test concurrent subscription management + +### 6.5 Final Cleanup +- [ ] Remove any remaining duplicate code +- [ ] Optimize imports across all modules +- [ ] Clean up temporary files and debugging code +- [ ] Update project documentation + +## 📋 Post-Refactoring Tasks + +### Documentation Updates +- [ ] Update `CLAUDE.md` with new architecture +- [ ] Create module-specific documentation +- [ ] Update installation and deployment guides +- [ ] Add troubleshooting guide for new structure + +### Code Quality +- [ ] Add type hints to all new modules +- [ ] Implement proper error handling patterns +- [ ] Add logging consistency across modules +- [ ] Ensure proper resource cleanup + +### Future Enhancements (Optional) +- [ ] Add unit tests for each module +- [ ] Implement monitoring and metrics collection +- [ ] Add configuration validation +- [ ] Consider adding dependency injection container + +--- + +## Success Criteria + +✅ **Modularity**: Each module has a single, clear responsibility +✅ **Testability**: Each phase can be tested independently +✅ **Maintainability**: Code is easy to understand and modify +✅ **Compatibility**: All existing functionality preserved +✅ **Performance**: System performance is maintained or improved +✅ **Documentation**: Clear documentation for new architecture + +## Risk Mitigation + +- **Feature-by-feature testing** ensures functionality is preserved at each step +- **Gradual migration** minimizes risk of breaking existing functionality +- **Preserve critical interfaces** (WebSocket protocol, HTTP endpoints) +- **Maintain backward compatibility** with existing configurations +- **Comprehensive testing** at each phase before proceeding \ No newline at end of file