Detector Worker - Architecture & Workflow Documentation
Table of Contents
- Architecture Overview
- Module Structure
- System Startup Flow
- WebSocket Communication Flow
- Detection Pipeline Flow
- Data Storage Flow
- Error Handling & Recovery
- Testing Architecture
- Development Workflow
Architecture Overview
The Detector Worker has been refactored from a monolithic 4,115-line application into a modular, maintainable system with clear separation of concerns. The new architecture follows modern software engineering principles including dependency injection, thread safety, and comprehensive testing.
High-Level System Diagram
graph TB
Client[WebSocket Client] --> WS[WebSocket Handler]
WS --> MP[Message Processor]
MP --> SM[Stream Manager]
MP --> MM[Model Manager]
MP --> PE[Pipeline Executor]
SM --> FR[Frame Reader]
FR --> YD[YOLO Detector]
YD --> TM[Tracking Manager]
TM --> SV[Stability Validator]
PE --> AE[Action Executor]
PE --> FM[Field Mapper]
AE --> DM[Database Manager]
AE --> RC[Redis Client]
AE --> SC[Session Cache]
subgraph "Singleton Managers"
MSM[Model State Manager]
SSM[Stream State Manager]
SeSM[Session State Manager]
CSM[Cache State Manager]
CaSM[Camera State Manager]
PSM[Pipeline State Manager]
end
subgraph "Storage Layer"
PostgreSQL[(PostgreSQL)]
Redis[(Redis)]
LocalCache[Local Cache]
end
DM --> PostgreSQL
RC --> Redis
SC --> LocalCache
Key Architectural Principles
- Modular Design: Each module has a single responsibility
- Dependency Injection: IoC container manages object dependencies
- Thread Safety: Singleton managers with proper locking
- Async/Await: Non-blocking I/O operations throughout
- Type Safety: Comprehensive type hints and validation
- Error Resilience: Proper exception handling and recovery
Module Structure
Core Modules (detector_worker/core/)
config.py - Configuration Management
# Central configuration with multi-source loading
Configuration()
├── load_from_file(path) # JSON/YAML config files
├── load_from_env() # Environment variables
└── validate_config() # Configuration validation
ConfigurationProvider(ABC) # Abstract base for config sources
├── JSONConfigProvider
├── YAMLConfigProvider
└── EnvironmentConfigProvider
dependency_injection.py - IoC Container
ServiceContainer()
├── register_singleton() # Single instance per container
├── register_transient() # New instance per request
├── register_scoped() # Instance per scope
└── resolve() # Dependency resolution with auto-injection
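As a usage sketch, wiring the core services might look like the following. Class names and module paths follow the structure above; the two-argument registration form mirrors the startup code later in this document, but the exact factory signatures (especially for register_transient) are assumptions.
# Hypothetical wiring sketch; real registration/resolution APIs may differ slightly.
from detector_worker.core.dependency_injection import ServiceContainer
from detector_worker.core.config import Configuration
from detector_worker.streams.stream_manager import StreamManager
from detector_worker.pipeline.pipeline_executor import PipelineExecutor

container = ServiceContainer()

# One shared Configuration instance for the whole process
container.register_singleton(Configuration, lambda: Configuration())

# StreamManager is also a singleton; its dependencies are injected on resolve()
container.register_singleton(StreamManager, StreamManager)

# A fresh PipelineExecutor per resolution
container.register_transient(PipelineExecutor, PipelineExecutor)

executor = container.resolve(PipelineExecutor)  # dependencies auto-injected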
singleton_managers.py - Thread-Safe State Management
# Six singleton managers replacing global dictionaries
ModelStateManager() # Model loading states
StreamStateManager() # Active stream connections
SessionStateManager() # Client session tracking
CacheStateManager() # Cache state management
CameraStateManager() # Camera connection states
PipelineStateManager() # Pipeline execution states
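The managers follow a conventional thread-safe singleton pattern. A simplified sketch (not the project's actual implementation) of how one such manager can guard shared state:
import threading
from typing import Any, Dict, Optional

class StreamStateManager:
    """Simplified sketch of a thread-safe singleton state manager."""
    _instance: Optional["StreamStateManager"] = None
    _instance_lock = threading.Lock()

    def __new__(cls) -> "StreamStateManager":
        # Double-checked locking so every caller gets the same instance
        if cls._instance is None:
            with cls._instance_lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    instance._state_lock = threading.RLock()
                    instance._streams: Dict[str, Any] = {}
                    cls._instance = instance
        return cls._instance

    def set_stream(self, stream_id: str, info: Any) -> None:
        with self._state_lock:
            self._streams[stream_id] = info

    def get_stream(self, stream_id: str) -> Optional[Any]:
        with self._state_lock:
            return self._streams.get(stream_id)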
exceptions.py - Exception Hierarchy
DetectorWorkerError # Base exception
├── ConfigurationError # Configuration issues
├── StreamError # Stream-related errors
├── ModelError # Model loading/inference errors
├── PipelineError # Pipeline execution errors
├── DatabaseError # Database operation errors
├── RedisError # Redis operation errors
└── MessageProcessingError # WebSocket message errors
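The shared base class lets callers catch one error family or every worker error at once. An illustrative sketch (the raising function and its checks are hypothetical):
class DetectorWorkerError(Exception):
    """Base class for all detector-worker errors."""

class StreamError(DetectorWorkerError):
    """Raised for stream connection or capture failures."""

def open_stream(url: str) -> None:
    # Hypothetical check, only to show how the hierarchy is used
    if not url.startswith(("rtsp://", "http://", "https://")):
        raise StreamError(f"Unsupported stream URL: {url}")

try:
    open_stream("ftp://camera-01/feed")
except StreamError as exc:
    print(f"stream problem: {exc}")        # specific handling
except DetectorWorkerError as exc:
    print(f"worker problem: {exc}")        # catch-all for the family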
Communication Layer (detector_worker/communication/)
websocket_handler.py - WebSocket Management
WebSocketHandler()
├── handle_websocket() # Main WebSocket connection handler
├── _heartbeat_loop() # Keep-alive mechanism
└── _cleanup_connections() # Connection cleanup
ConnectionManager()
├── add_connection() # Register new client
├── remove_connection() # Cleanup disconnected client
├── broadcast() # Send to all clients
└── broadcast_to_subscription() # Send to specific subscription
WebSocketConnection() # Per-client connection wrapper
├── accept() # Accept WebSocket connection
├── send_message() # Send JSON/text message
├── receive_message() # Receive and parse message
└── ping() # Send keep-alive ping
message_processor.py - Message Processing Pipeline
MessageProcessor()
├── process_message() # Main message dispatcher
├── _handle_subscribe() # Process subscription requests
├── _handle_unsubscribe() # Process unsubscription
├── _handle_state_request() # System state requests
└── _handle_session_ops() # Session management operations
MessageType(Enum) # Supported message types
├── SUBSCRIBE
├── UNSUBSCRIBE
├── REQUEST_STATE
├── SET_SESSION_ID
└── PATCH_SESSION
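The real MessageProcessor routes each message to the dedicated _handle_* methods listed above; the sketch below shows the same dispatch idea with an explicit handler table. The wire-format strings in the enum are assumptions for illustration.
import json
from enum import Enum
from typing import Any, Awaitable, Callable, Dict

class MessageType(Enum):
    # Wire-format strings are illustrative assumptions
    SUBSCRIBE = "subscribe"
    UNSUBSCRIBE = "unsubscribe"
    REQUEST_STATE = "requestState"
    SET_SESSION_ID = "setSessionId"
    PATCH_SESSION = "patchSession"

Handler = Callable[[Dict[str, Any], str], Awaitable[Dict[str, Any]]]

async def dispatch(raw: str, client_id: str,
                   handlers: Dict[MessageType, Handler]) -> Dict[str, Any]:
    """Parse an incoming JSON message and route it to the registered handler."""
    message = json.loads(raw)
    msg_type = MessageType(message["type"])      # raises ValueError for unknown types
    handler = handlers.get(msg_type)
    if handler is None:
        return {"type": "error", "reason": f"no handler for {msg_type.value}"}
    return await handler(message.get("payload", {}), client_id)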
Stream Management (detector_worker/streams/)
stream_manager.py - Stream Lifecycle Management
StreamManager()
├── create_stream() # Create RTSP/HTTP stream
├── remove_stream() # Stop and cleanup stream
├── get_latest_frame() # Get current frame
├── reconnect_stream() # Handle connection failures
└── stop_all_streams() # Cleanup all streams
StreamConfig() # Stream configuration
├── stream_url # RTSP/HTTP URL
├── stream_type # "rtsp" or "http_snapshot"
├── target_fps # Target frame rate
└── reconnect_interval # Reconnection delay
StreamReader() # Individual stream handler
├── start() # Start frame capture
├── stop() # Stop and cleanup
├── get_latest_frame() # Get most recent frame
└── _reader_loop() # Main capture loop
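A usage sketch of creating a shared stream from a subscription. The keyword arguments mirror the fields and signatures shown in this document; the concrete values (URL, FPS, subscription identifier) are placeholders.
import asyncio
from detector_worker.streams.stream_manager import StreamManager, StreamConfig

async def start_camera_stream() -> None:
    manager = StreamManager()

    # Field names follow the StreamConfig listing above
    config = StreamConfig(
        stream_url="rtsp://camera-01.local/stream1",
        stream_type="rtsp",
        target_fps=10,
        reconnect_interval=5.0,
    )

    # Streams with the same URL are shared between subscriptions
    stream_info = await manager.create_stream(
        camera_id="camera-01",
        config=config,
        subscription_id="display-001;camera-01",
    )

    frame = manager.get_latest_frame("camera-01")   # most recent decoded frame
    print(stream_info, frame is not None)

asyncio.run(start_camera_stream())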
frame_reader.py - Frame Capture Implementation
RTSPReader() # RTSP stream handler
├── connect() # Establish RTSP connection
├── read_frame() # Capture single frame
└── handle_reconnection() # Connection recovery
HTTPSnapshotReader() # HTTP snapshot handler
├── fetch_snapshot() # HTTP GET request
├── decode_image() # Image decoding
└── schedule_next() # Schedule next capture
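For illustration, a minimal standalone sketch of the HTTP snapshot path (fetch, decode, schedule) using requests and OpenCV. The real HTTPSnapshotReader wraps the same steps with the project's error handling and state management; timeouts and FPS values here are placeholders.
import time
from typing import Optional

import cv2
import numpy as np
import requests

def fetch_snapshot(url: str, timeout: float = 5.0) -> Optional[np.ndarray]:
    """HTTP GET a JPEG snapshot and decode it into a BGR frame."""
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    buffer = np.frombuffer(response.content, dtype=np.uint8)
    return cv2.imdecode(buffer, cv2.IMREAD_COLOR)

def snapshot_loop(url: str, target_fps: float = 1.0) -> None:
    """Fetch snapshots at roughly target_fps, skipping failed requests."""
    interval = 1.0 / target_fps
    while True:
        started = time.monotonic()
        try:
            frame = fetch_snapshot(url)
            if frame is not None:
                print("frame", frame.shape)
        except requests.RequestException as exc:
            print("snapshot failed:", exc)
        # Schedule the next capture relative to when this one started
        time.sleep(max(0.0, interval - (time.monotonic() - started)))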
Detection System (detector_worker/detection/)
yolo_detector.py - Object Detection
YOLODetector()
├── load_model() # Load YOLO model
├── detect() # Run inference
├── _preprocess_frame() # Input preprocessing
├── _postprocess_results() # Output processing
└── _filter_detections() # Confidence filtering
DetectionResult() # Detection output structure
├── class_name # Detected class
├── confidence # Detection confidence
├── bounding_box # Spatial coordinates
├── track_id # Tracking identifier
└── timestamp # Detection timestamp
tracking_manager.py - Multi-Object Tracking
TrackingManager()
├── update_tracks() # Update tracker with detections
├── _associate_detections() # Data association
├── _create_new_tracks() # Initialize new tracks
├── _update_existing_tracks() # Update track states
└── _cleanup_lost_tracks() # Remove stale tracks
Track() # Individual object track
├── update() # Update with new detection
├── predict() # Predict next state
├── is_confirmed() # Track confirmation status
└── time_since_update() # Track age
stability_validator.py - Detection Validation
StabilityValidator()
├── add_detection() # Add detection to history
├── is_detection_stable() # Check stability criteria
├── _calculate_stability() # Stability metrics
└── _cleanup_old_detections() # History management
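The core idea is that a track only counts once it has been seen consistently for several consecutive frames. A simplified sketch, with the frame-count and confidence thresholds as assumed defaults:
from collections import defaultdict, deque
from typing import Deque, Dict

class StabilityValidator:
    """Sketch: a track is stable once it has enough recent high-confidence hits."""

    def __init__(self, required_frames: int = 5, min_confidence: float = 0.5) -> None:
        self.required_frames = required_frames
        self.min_confidence = min_confidence
        self._history: Dict[int, Deque[float]] = defaultdict(
            lambda: deque(maxlen=self.required_frames)
        )

    def add_detection(self, track_id: int, confidence: float) -> None:
        self._history[track_id].append(confidence)

    def is_detection_stable(self, track_id: int) -> bool:
        hits = self._history[track_id]
        return (len(hits) == self.required_frames
                and all(c >= self.min_confidence for c in hits))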
Pipeline System (detector_worker/pipeline/)
pipeline_executor.py - ML Pipeline Orchestration
PipelineExecutor()
├── execute_pipeline() # Main pipeline execution
├── _run_detection_stage() # Object detection phase
├── _run_classification_branches() # Parallel classification
├── _execute_actions() # Post-processing actions
└── _wait_for_branches() # Synchronization
PipelineContext() # Execution context
├── camera_id # Camera identifier
├── session_id # Session identifier
├── frame # Input frame
├── timestamp # Processing timestamp
└── intermediate_results # Shared results
action_executor.py - Action Processing
ActionExecutor()
├── execute_action() # Execute single action
├── _redis_save_image() # Redis image storage
├── _postgresql_create() # Database record creation
├── _postgresql_update() # Database record update
└── _publish_message() # Message publishing
ActionType(Enum) # Supported action types
├── REDIS_SAVE_IMAGE
├── POSTGRESQL_CREATE
├── POSTGRESQL_UPDATE
└── PUBLISH_MESSAGE
field_mapper.py - Dynamic Field Resolution
FieldMapper()
├── resolve_fields() # Resolve template fields
├── _substitute_variables() # Variable substitution
├── _resolve_branch_results() # Branch result mapping
└── _validate_mapping() # Mapping validation
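Field templates reference branch outputs by {modelId.field}. A simplified substitution sketch; the real resolver also validates mappings and can pull values from the pipeline context. The example mirrors the mapping shown later in this document.
import re
from typing import Any, Dict

TEMPLATE = re.compile(r"\{([^}.]+)\.([^}]+)\}")

def resolve_fields(field_templates: Dict[str, str],
                   branch_results: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Replace '{model_id.field}' templates with values from branch results."""
    resolved: Dict[str, Any] = {}
    for column, template in field_templates.items():
        match = TEMPLATE.fullmatch(template)
        if match:
            model_id, field = match.groups()
            resolved[column] = branch_results.get(model_id, {}).get(field)
        else:
            resolved[column] = template          # plain literal value
    return resolved

# Example (mirrors the field resolution example later in this document):
fields = {"car_brand": "{car_brand_cls_v1.brand}",
          "car_body_type": "{car_bodytype_cls_v1.body_type}"}
results = {"car_brand_cls_v1": {"brand": "Toyota"},
           "car_bodytype_cls_v1": {"body_type": "Sedan"}}
print(resolve_fields(fields, results))  # {'car_brand': 'Toyota', 'car_body_type': 'Sedan'}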
Storage Layer (detector_worker/storage/)
database_manager.py - PostgreSQL Operations
DatabaseManager()
├── connect() # Database connection
├── create_record() # INSERT operations
├── update_record() # UPDATE operations
├── get_record() # SELECT operations
├── execute_query() # Raw SQL execution
└── _handle_connection_error() # Error recovery
DatabaseConfig() # Database configuration
├── host, port, database # Connection params
├── user, password # Authentication
└── connection_pool_size # Pool configuration
redis_client.py - Redis Operations & Image Storage
RedisClient()
├── connect() # Redis connection
├── set/get/delete() # Basic operations
├── pipeline() # Batch operations
└── scan_keys() # Key scanning
RedisImageStorage() # Image-specific operations
├── store_image() # Store with compression
├── retrieve_image() # Retrieve and decode
├── delete_image() # Delete image
└── cleanup_expired() # Cleanup expired images
RedisPublisher/Subscriber() # Pub/Sub messaging
├── publish() # Publish message
├── subscribe() # Subscribe to channel
└── listen() # Message listening
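A minimal sketch of the image storage path (crop, JPEG-encode, SET with TTL) using redis-py and OpenCV. The key name and compression settings are illustrative; the 600-second TTL matches the Redis sequence shown later in this document.
from typing import Tuple

import cv2
import numpy as np
import redis

def store_cropped_image(client: redis.Redis, key: str, frame: np.ndarray,
                        bbox: Tuple[int, int, int, int],
                        expire_seconds: int = 600, jpeg_quality: int = 90) -> bool:
    """Crop a region from the frame, JPEG-encode it, and store it with a TTL."""
    x1, y1, x2, y2 = bbox
    crop = frame[y1:y2, x1:x2]
    ok, encoded = cv2.imencode(".jpg", crop,
                               [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_quality])
    if not ok:
        return False
    # SET with expiry in a single call
    return bool(client.set(key, encoded.tobytes(), ex=expire_seconds))

# Usage sketch (key name is hypothetical):
# r = redis.Redis(host="localhost", port=6379)
# store_cropped_image(r, "inprogress_car:camera-01:session-123", frame, (100, 50, 400, 300))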
session_cache.py - High-Performance Caching
SessionCacheManager() # Singleton cache manager
├── cache_detection() # Cache detection results
├── cache_pipeline_result() # Cache pipeline outputs
├── create_session() # Create session entry
├── update_session() # Update session data
└── cleanup_expired() # Cache maintenance
SessionCache() # LRU cache implementation
├── put/get/remove() # Basic cache operations
├── _evict_lru() # LRU eviction
├── _check_memory_limit() # Memory management
└── get_stats() # Cache statistics
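A compact LRU cache sketch using OrderedDict, assuming eviction by entry count; the real implementation also enforces memory limits and tracks statistics.
from collections import OrderedDict
from typing import Any, Optional

class SessionCache:
    """Sketch of an LRU cache: most recently used entries survive eviction."""

    def __init__(self, max_entries: int = 1024) -> None:
        self.max_entries = max_entries
        self._entries: "OrderedDict[str, Any]" = OrderedDict()

    def put(self, key: str, value: Any) -> None:
        if key in self._entries:
            self._entries.move_to_end(key)
        self._entries[key] = value
        if len(self._entries) > self.max_entries:
            self._entries.popitem(last=False)      # evict least recently used

    def get(self, key: str) -> Optional[Any]:
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)             # mark as recently used
        return self._entries[key]

    def remove(self, key: str) -> None:
        self._entries.pop(key, None)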
Model Management (detector_worker/models/)
model_manager.py - Model Loading & Caching
ModelManager()
├── load_model() # Load and cache model
├── get_model() # Retrieve cached model
├── unload_model() # Remove from cache
├── cleanup_unused() # Cache maintenance
└── get_memory_usage() # Memory tracking
ModelCache() # Model cache implementation
├── put/get/remove() # Cache operations
├── _estimate_memory() # Memory estimation
├── _evict_unused() # Memory-based eviction
└── get_cache_stats() # Cache metrics
pipeline_loader.py - MPTA Pipeline Loading
PipelineLoader()
├── load_pipeline() # Load MPTA file
├── _extract_archive() # ZIP extraction
├── _parse_config() # Configuration parsing
├── _validate_pipeline() # Pipeline validation
└── _load_models() # Load pipeline models
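An MPTA file is a ZIP archive bundling a pipeline configuration with model weights. A simplified loading sketch; the config filename (pipeline.json), extraction directory, and required keys checked here are assumptions, though modelId and expectedClasses are used elsewhere in this document.
import json
import zipfile
from pathlib import Path
from typing import Any, Dict

def load_pipeline(mpta_path: str, extract_dir: str = "/tmp/mpta") -> Dict[str, Any]:
    """Extract an MPTA archive and parse its pipeline configuration."""
    target = Path(extract_dir) / Path(mpta_path).stem
    target.mkdir(parents=True, exist_ok=True)

    with zipfile.ZipFile(mpta_path) as archive:
        archive.extractall(target)

    # Assumed config filename; the real loader may use a different archive layout
    config_path = target / "pipeline.json"
    config = json.loads(config_path.read_text())

    # Basic validation before model loading
    for required in ("modelId", "expectedClasses"):
        if required not in config:
            raise ValueError(f"pipeline config missing '{required}'")
    return config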
System Startup Flow
Application Initialization Sequence
sequenceDiagram
participant Main as app.py
participant Config as Configuration
participant Container as ServiceContainer
participant Managers as Singleton Managers
participant FastAPI as FastAPI App
Main->>Config: load_configuration()
Config->>Config: load_from_file()
Config->>Config: load_from_env()
Config->>Config: validate_config()
Main->>Container: ServiceContainer()
Main->>Container: register_services()
Container->>Managers: initialize_singletons()
Main->>FastAPI: create_app()
FastAPI->>FastAPI: setup_lifespan()
FastAPI->>FastAPI: add_websocket_routes()
FastAPI->>FastAPI: add_http_routes()
Note over Main,FastAPI: Application Ready
FastAPI->>Main: uvicorn.run()
Detailed Startup Process
1. Configuration Loading (app.py:15-25)
# Load configuration from multiple sources
config = Configuration()
config.load_from_file("config.json")  # Primary config
config.load_from_env()                # Environment overrides
config.validate_config()              # Validation
2. Dependency Injection Setup (app.py:27-45)
# Create and configure IoC container
container = ServiceContainer()

# Register core services
container.register_singleton(Configuration, lambda: config)
container.register_singleton(StreamManager, StreamManager)
container.register_singleton(ModelManager, ModelManager)
container.register_singleton(PipelineExecutor, PipelineExecutor)
3. Singleton Manager Initialization (app.py:47-55)
# Initialize thread-safe singleton managers
model_state = ModelStateManager()
stream_state = StreamStateManager()
session_state = SessionStateManager()
# ... other managers
4. FastAPI Application Creation (app.py:57-75)
# Create FastAPI app with lifespan management
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup logic
    await initialize_services()
    yield
    # Shutdown logic
    await cleanup_services()

app = FastAPI(lifespan=lifespan)
5. Route Registration (app.py:77-85)
# WebSocket endpoint
@app.websocket("/")
async def websocket_endpoint(websocket: WebSocket):
    ws_handler = container.resolve(WebSocketHandler)
    await ws_handler.handle_connection(websocket)

# HTTP endpoints
@app.get("/camera/{camera_id}/image")
async def get_camera_image(camera_id: str):
    return stream_manager.get_latest_frame(camera_id)
WebSocket Communication Flow
Client Connection Lifecycle
sequenceDiagram
participant Client as WebSocket Client
participant WS as WebSocketHandler
participant CM as ConnectionManager
participant MP as MessageProcessor
participant SM as StreamManager
Client->>WS: WebSocket Connection
WS->>WS: handle_websocket()
WS->>CM: add_connection()
CM->>CM: create WebSocketConnection
WS->>Client: Connection Accepted
loop Message Processing
Client->>WS: JSON Message
WS->>MP: process_message()
alt Subscribe Message
MP->>SM: create_stream()
SM->>SM: initialize StreamReader
MP->>Client: subscribeAck
else Unsubscribe Message
MP->>SM: remove_stream()
SM->>SM: cleanup StreamReader
MP->>Client: unsubscribeAck
else State Request
MP->>MP: collect_system_state()
MP->>Client: stateReport
end
end
Client->>WS: Disconnect
WS->>CM: remove_connection()
WS->>SM: cleanup_client_streams()
Message Processing Detail
1. Subscribe Message Flow (message_processor.py:125-185)
async def _handle_subscribe(self, payload: Dict, client_id: str) -> Dict:
"""Process subscription request"""
# 1. Extract subscription parameters
subscription_id = payload["subscriptionIdentifier"]
stream_url = payload.get("rtspUrl") or payload.get("snapshotUrl")
model_url = payload["modelUrl"]
# 2. Create stream configuration
stream_config = StreamConfig(
stream_url=stream_url,
stream_type="rtsp" if "rtsp" in stream_url else "http_snapshot",
crop_region=[payload.get("cropX1"), payload.get("cropY1"),
payload.get("cropX2"), payload.get("cropY2")]
)
# 3. Load ML pipeline
pipeline_config = await pipeline_loader.load_from_url(model_url)
# 4. Create stream (with sharing if same URL)
stream_info = await stream_manager.create_stream(
camera_id=subscription_id.split(';')[1],
config=stream_config,
subscription_id=subscription_id
)
# 5. Register client subscription
connection_manager.add_subscription(client_id, subscription_id)
return {"type": "subscribeAck", "status": "success",
"subscriptionId": subscription_id}
2. Detection Result Broadcasting (websocket_handler.py:245-265)
async def broadcast_detection_result(self, subscription_id: str,
detection_result: Dict):
"""Broadcast detection to subscribed clients"""
message = {
"type": "imageDetection",
"payload": {
"subscriptionId": subscription_id,
"detections": detection_result["detections"],
"timestamp": detection_result["timestamp"],
"modelInfo": detection_result["model_info"]
}
}
await self.connection_manager.broadcast_to_subscription(
subscription_id, message
)
Detection Pipeline Flow
Complete Detection Workflow
flowchart TD
A[Frame Captured] --> B[YOLO Detection]
B --> C{Expected Classes Found?}
C -->|No| D[Skip Processing]
C -->|Yes| E[Multi-Object Tracking]
E --> F[Stability Validation]
F --> G{Stable Detection?}
G -->|No| H[Continue Tracking]
G -->|Yes| I[Create Database Record]
I --> J[Execute Redis Actions]
J --> K[Start Classification Branches]
K --> L[Brand Classification]
K --> M[Body Type Classification]
K --> N[License Plate Recognition]
L --> O[Wait for All Branches]
M --> O
N --> O
O --> P[Field Mapping & Resolution]
P --> Q[Database Update Combined]
Q --> R[Broadcast Results]
R --> S[Update Session Cache]
Detailed Pipeline Execution (pipeline_executor.py:85-250)
1. Detection Stage
async def _run_detection_stage(self, pipeline_config: Dict,
context: PipelineContext) -> List[DetectionResult]:
"""Execute object detection stage"""
# 1. Load detection model
model = await self.model_manager.load_model(
ModelConfig.from_dict(pipeline_config)
)
# 2. Run YOLO inference
detector = YOLODetector()
raw_detections = detector.detect(
frame=context.frame,
confidence_threshold=pipeline_config["minConfidence"]
)
# 3. Filter expected classes
expected_classes = pipeline_config["expectedClasses"]
filtered_detections = [
det for det in raw_detections
if det.class_name in expected_classes
]
# 4. Update tracking
if len(filtered_detections) > 0:
tracking_manager = TrackingManager()
tracked_detections = tracking_manager.update_tracks(
filtered_detections, context.frame_id
)
# 5. Validate stability
stability_validator = StabilityValidator()
stable_detections = []
for det in tracked_detections:
if stability_validator.is_detection_stable(det):
stable_detections.append(det)
return stable_detections
return []
2. Action Execution Stage
async def _execute_actions(self, actions: List[Dict],
context: PipelineContext) -> Dict:
"""Execute pipeline actions"""
action_results = {}
action_executor = ActionExecutor()
for action in actions:
action_type = action["type"]
if action_type == "redis_save_image":
# Save cropped image to Redis
result = await action_executor.redis_save_image(
frame=context.frame,
region=action["region"], # e.g., "Frontal"
key_template=action["key"],
context=context,
expire_seconds=action.get("expire_seconds", 3600)
)
elif action_type == "postgresql_create_record":
# Create initial database record
result = await action_executor.postgresql_create(
table=action["table"],
fields=action["fields"],
context=context
)
action_results[action_type] = result
return action_results
3. Classification Branch Execution
async def _run_classification_branches(self, branches: List[Dict],
context: PipelineContext) -> Dict:
"""Execute parallel classification branches"""
if not branches:
return {}
# 1. Create parallel tasks for each branch
branch_tasks = []
for branch in branches:
if branch.get("parallel", False):
task = asyncio.create_task(
self._execute_branch(branch, context)
)
branch_tasks.append((branch["modelId"], task))
# 2. Wait for all branches to complete
branch_results = {}
for model_id, task in branch_tasks:
try:
result = await task
branch_results[model_id] = result
except Exception as e:
logging.error(f"Branch {model_id} failed: {e}")
branch_results[model_id] = {"error": str(e)}
return branch_results
async def _execute_branch(self, branch_config: Dict,
context: PipelineContext) -> Dict:
"""Execute single classification branch"""
# 1. Load classification model
model = await self.model_manager.load_model(
ModelConfig.from_dict(branch_config)
)
# 2. Prepare input (crop if specified)
input_frame = context.frame
if branch_config.get("crop", False):
crop_class = branch_config["cropClass"]
# Find detection of specified class and crop
for detection in context.detections:
if detection.class_name == crop_class:
bbox = detection.bounding_box
input_frame = context.frame[bbox.y1:bbox.y2, bbox.x1:bbox.x2]
break
# 3. Run classification
classifier = YOLODetector() # Can handle classification too
results = classifier.classify(
frame=input_frame,
confidence_threshold=branch_config["minConfidence"]
)
# 4. Format results based on model type
if "brand" in branch_config["modelId"]:
return {"brand": results.top_class, "confidence": results.confidence}
elif "bodytype" in branch_config["modelId"]:
return {"body_type": results.top_class, "confidence": results.confidence}
else:
return {"class": results.top_class, "confidence": results.confidence}
4. Field Mapping & Database Update
async def _execute_parallel_actions(self, actions: List[Dict],
context: PipelineContext,
branch_results: Dict) -> Dict:
"""Execute actions that depend on branch results"""
for action in actions:
if action["type"] == "postgresql_update_combined":
# 1. Wait for specified branches to complete
wait_for_branches = action.get("waitForBranches", [])
for branch_id in wait_for_branches:
if branch_id not in branch_results:
logging.warning(f"Branch {branch_id} not completed")
# 2. Resolve field mappings
field_mapper = FieldMapper()
resolved_fields = field_mapper.resolve_fields(
field_templates=action["fields"],
context=context,
branch_results=branch_results
)
# Example field resolution:
# "{car_brand_cls_v1.brand}" -> "Toyota"
# "{car_bodytype_cls_v1.body_type}" -> "Sedan"
# 3. Execute database update
database_manager = DatabaseManager()
await database_manager.update_record(
table=action["table"],
key_value=context.session_id,
key_field=action["key_field"],
update_data=resolved_fields
)
return {"status": "success", "fields_updated": resolved_fields}
return {}
Data Storage Flow
Database Operations Flow
sequenceDiagram
participant PE as PipelineExecutor
participant AE as ActionExecutor
participant DM as DatabaseManager
participant FM as FieldMapper
participant DB as PostgreSQL
PE->>AE: postgresql_create_record
AE->>DM: create_record()
DM->>DB: INSERT INTO car_frontal_info
DB->>DM: session_id (UUID)
DM->>AE: Record created
Note over PE: Classification branches execute...
PE->>FM: resolve_fields(templates, branch_results)
FM->>FM: substitute variables
FM->>PE: resolved_fields
PE->>AE: postgresql_update_combined
AE->>DM: update_record()
DM->>DB: UPDATE car_frontal_info SET car_brand=?, car_body_type=?
DB->>DM: Update successful
DM->>AE: Record updated
Redis Storage Operations
sequenceDiagram
participant AE as ActionExecutor
participant RC as RedisClient
participant RIS as RedisImageStorage
participant Redis as Redis Server
AE->>RC: redis_save_image
RC->>RIS: store_image()
RIS->>RIS: crop_region_from_frame()
RIS->>RIS: compress_image()
RIS->>Redis: SET key encoded_image
RIS->>Redis: EXPIRE key 600
Redis->>RIS: OK
RIS->>RC: Storage successful
RC->>AE: Image saved
Session Cache Operations
flowchart LR
A[Detection Event] --> B[Cache Detection Result]
B --> C[Create Session Entry]
C --> D[Pipeline Processing]
D --> E[Update Session with Branch Results]
E --> F[Cache Pipeline Result]
F --> G[Broadcast to Clients]
subgraph "Cache Types"
H[Detection Cache<br/>Latest detection per camera]
I[Pipeline Cache<br/>Pipeline execution results]
J[Session Cache<br/>Session tracking data]
end
B --> H
F --> I
C --> J
E --> J
Error Handling & Recovery
Exception Hierarchy & Handling
classDiagram
DetectorWorkerError <|-- ConfigurationError
DetectorWorkerError <|-- StreamError
DetectorWorkerError <|-- ModelError
DetectorWorkerError <|-- PipelineError
DetectorWorkerError <|-- DatabaseError
DetectorWorkerError <|-- RedisError
DetectorWorkerError <|-- MessageProcessingError
StreamError <|-- ConnectionError
StreamError <|-- StreamTimeoutError
ModelError <|-- ModelLoadError
ModelError <|-- ModelCacheError
PipelineError <|-- ActionExecutionError
PipelineError <|-- BranchExecutionError
Error Recovery Strategies
1. Stream Connection Recovery (stream_manager.py:245-285)
async def _handle_stream_error(self, stream_id: str, error: Exception):
"""Handle stream errors with exponential backoff retry"""
stream_info = self.get_stream_info(stream_id)
if not stream_info:
return
# Increment error count
stream_info.error_count += 1
stream_info.update_status("error", error_message=str(error))
# Exponential backoff retry
max_retries = stream_info.config.max_retries
if max_retries == -1 or stream_info.error_count <= max_retries:
# Calculate backoff delay
base_delay = stream_info.config.reconnect_interval
backoff_delay = base_delay * (2 ** min(stream_info.error_count - 1, 6))
logging.warning(f"Stream {stream_id} error: {error}. "
f"Retrying in {backoff_delay} seconds...")
await asyncio.sleep(backoff_delay)
try:
await self.reconnect_stream(stream_id)
stream_info.error_count = 0 # Reset on success
stream_info.update_status("active")
except Exception as retry_error:
logging.error(f"Stream {stream_id} retry failed: {retry_error}")
await self._handle_stream_error(stream_id, retry_error)
else:
# Max retries exceeded
logging.error(f"Stream {stream_id} exceeded max retries. Marking as failed.")
stream_info.update_status("failed")
await self._notify_stream_failure(stream_id)
2. Database Connection Recovery (database_manager.py:185-220)
async def _execute_with_retry(self, operation: Callable, *args, **kwargs):
"""Execute database operation with connection retry"""
max_retries = 3
retry_delay = 1.0
for attempt in range(max_retries + 1):
try:
return await operation(*args, **kwargs)
except psycopg2.OperationalError as e:
if attempt == max_retries:
raise DatabaseError(f"Database operation failed after {max_retries} retries: {e}")
logging.warning(f"Database operation failed (attempt {attempt + 1}): {e}")
# Try to reconnect
try:
await self.disconnect()
await asyncio.sleep(retry_delay)
await self.connect()
retry_delay *= 2 # Exponential backoff
except Exception as reconnect_error:
logging.error(f"Database reconnection failed: {reconnect_error}")
except Exception as e:
# Non-recoverable error
raise DatabaseError(f"Database operation failed: {e}")
3. Pipeline Error Isolation (pipeline_executor.py:325-365)
async def _execute_branch_with_isolation(self, branch_config: Dict,
context: PipelineContext) -> Dict:
"""Execute branch with error isolation"""
branch_id = branch_config["modelId"]
try:
# Set timeout for branch execution
timeout = branch_config.get("timeout_seconds", 30)
result = await asyncio.wait_for(
self._execute_branch(branch_config, context),
timeout=timeout
)
return result
except asyncio.TimeoutError:
error_msg = f"Branch {branch_id} timed out after {timeout} seconds"
logging.error(error_msg)
return {"error": error_msg, "type": "timeout"}
except ModelError as e:
error_msg = f"Branch {branch_id} model error: {e}"
logging.error(error_msg)
return {"error": error_msg, "type": "model_error"}
except Exception as e:
error_msg = f"Branch {branch_id} unexpected error: {e}"
logging.error(error_msg, exc_info=True)
return {"error": error_msg, "type": "unexpected_error"}
async def _handle_partial_branch_failure(self, branch_results: Dict,
required_branches: List[str]) -> bool:
"""Determine if pipeline can continue with partial branch failures"""
successful_branches = [
branch_id for branch_id, result in branch_results.items()
if not isinstance(result, dict) or "error" not in result
]
failed_branches = [
branch_id for branch_id, result in branch_results.items()
if isinstance(result, dict) and "error" in result
]
if failed_branches:
logging.warning(f"Failed branches: {failed_branches}")
logging.info(f"Successful branches: {successful_branches}")
# Continue if at least one required branch succeeded
required_successful = any(
branch_id in successful_branches
for branch_id in required_branches
)
return required_successful
Testing Architecture
Test Structure Overview
tests/
├── unit/ # Fast, isolated unit tests
│ ├── core/ # Core module tests (config, DI, singletons)
│ ├── detection/ # Detection system tests
│ ├── pipeline/ # Pipeline execution tests
│ ├── streams/ # Stream management tests
│ ├── communication/ # WebSocket & messaging tests
│ ├── storage/ # Storage layer tests
│ └── models/ # Model management tests
├── integration/ # Multi-component integration tests
│ ├── test_complete_detection_workflow.py
│ ├── test_websocket_protocol.py
│ └── test_pipeline_integration.py
├── performance/ # Performance benchmarks
│ ├── test_detection_performance.py
│ ├── test_websocket_performance.py
│ └── test_storage_performance.py
└── conftest.py # Shared fixtures and configuration
Test Execution Flows
Unit Test Example (tests/unit/detection/test_yolo_detector.py)
class TestYOLODetector:
"""Test YOLO detector functionality"""
def test_detection_basic_functionality(self, mock_frame):
"""Test basic detection pipeline"""
detector = YOLODetector()
with patch('torch.load') as mock_torch_load:
# Setup mock model
mock_model = Mock()
mock_result = self._create_mock_detection_result()
mock_model.return_value = mock_result
mock_torch_load.return_value = mock_model
# Execute detection
detections = detector.detect(mock_frame, confidence_threshold=0.5)
# Verify results
assert len(detections) == 2
assert detections[0].class_name == "car"
assert detections[0].confidence >= 0.5
assert isinstance(detections[0].bounding_box, BoundingBox)
Integration Test Example (tests/integration/test_complete_detection_workflow.py)
@pytest.mark.asyncio
async def test_complete_rtsp_detection_workflow(self, temp_config_file,
sample_mpta_file, mock_frame):
"""Test complete workflow: RTSP stream -> detection -> classification -> database"""
# 1. Initialize all components
config = Configuration()
config.load_from_file(temp_config_file)
# 2. Mock external dependencies (Redis, DB, models)
with patch('cv2.VideoCapture') as mock_video_cap, \
patch('torch.load') as mock_torch_load, \
patch('psycopg2.connect') as mock_db_connect:
# Setup mocks...
# 3. Execute complete workflow
stream_manager = StreamManager()
pipeline_executor = PipelineExecutor()
# Create stream
stream_info = await stream_manager.create_stream(camera_id, config, sub_id)
# Run pipeline
result = await pipeline_executor.execute_pipeline(pipeline_config, context)
# 4. Verify end-to-end results
assert result["status"] == "completed"
assert "detections" in result
assert "classification_results" in result
# Verify database operations occurred
assert mock_db_cursor.execute.called
# Verify Redis operations occurred
assert mock_redis.set.called
Performance Test Example (tests/performance/test_detection_performance.py)
def test_yolo_detection_speed(self, sample_frame, performance_config):
"""Benchmark YOLO detection speed"""
detector = YOLODetector()
# Warm up
for _ in range(5):
detector.detect(sample_frame, confidence_threshold=0.5)
# Benchmark
detection_times = []
for _ in range(100):
start_time = time.perf_counter()
detections = detector.detect(sample_frame, confidence_threshold=0.5)
end_time = time.perf_counter()
detection_times.append((end_time - start_time) * 1000)
# Analyze performance
avg_time = statistics.mean(detection_times)
theoretical_fps = 1000 / avg_time
# Performance assertions
assert avg_time < performance_config["max_detection_time_ms"]
assert theoretical_fps >= performance_config["target_fps"]
print(f"Average detection time: {avg_time:.2f} ms")
print(f"Theoretical FPS: {theoretical_fps:.1f}")
Development Workflow
Cross-Platform Setup
The Makefile automatically detects the correct Python command for your platform:
- macOS/Linux: Uses python3 and pip3 if available
- Windows: Falls back to python and pip
- Automatic Detection: No manual configuration needed
Step-by-Step Project Setup
1. Clone and Navigate to Project
git clone <repository-url>
cd python-detector-worker
2. Install Dependencies
# Install production dependencies
make install
# Install development dependencies (includes testing tools)
make install-dev
# Check environment information
make env-info
3. Configure Environment (Optional)
# Copy example configuration (if exists)
cp config.example.json config.json
# Set environment variables for development
export DETECTOR_WORKER_ENV=dev
export DETECTOR_WORKER_PORT=8001
4. Run the Application
# For development (staging port 8001 with auto-reload)
make run-staging
# For production (port 8000)
make run-prod
# For debugging (verbose logging on staging port)
make run-debug
5. Verify Installation
# Check system health
curl http://localhost:8001/health
# Run basic tests
make test-fast
# Check code quality
make lint
Development Commands
Environment Management
make env-info # Show environment information
make check-deps # Verify dependency integrity
make update-deps # List outdated dependencies
make freeze # Generate requirements-frozen.txt
Code Development
# Code quality
make format # Format code with black & isort
make lint # Run code linting (flake8, mypy)
make quality # Run all quality checks
# Application execution
make run # Run production mode (port 8000) with reload
make run-staging # Run staging mode (port 8001) with reload
make run-prod # Run production mode (port 8000) without reload
make run-debug # Run debug mode (port 8001) with verbose logging
Testing Framework
# Test execution
make test # Run all tests with coverage
make test-unit # Run unit tests only
make test-integration # Run integration tests
make test-performance # Run performance benchmarks
make test-fast # Run fast tests only (skip slow markers)
make test-coverage # Generate detailed coverage report
make test-failed # Rerun only failed tests
# CI/CD testing
make ci-test # Run CI-optimized tests
make ci-quality # Run CI quality checks
Docker Operations
# Container management
make docker-build # Build Docker image
make docker-run # Run container (production port 8000)
make docker-run-staging # Run container (staging port 8001)
make docker-dev # Run development container with volume mounts
Utilities
# Maintenance
make clean # Clean build artifacts and cache
make monitor # Start system resource monitor
make profile # Run performance profiling
make version # Show application version
# Database utilities (when implemented)
make db-migrate # Run database migrations
make db-reset # Reset database to initial state
Using the Test Runner
# Basic test execution
python scripts/run_tests.py --all # All tests with coverage
python scripts/run_tests.py --unit --verbose # Unit tests with verbose output
python scripts/run_tests.py --integration # Integration tests only
python scripts/run_tests.py --performance # Performance benchmarks
# Advanced options
python scripts/run_tests.py --fast # Fast tests only (no slow markers)
python scripts/run_tests.py --failed # Rerun only failed tests
python scripts/run_tests.py --specific "config" # Run tests matching pattern
python scripts/run_tests.py --coverage --open-browser # Generate and open coverage report
# Quality checks
python scripts/run_tests.py --quality # Run linting and formatting checks
CI/CD Pipeline
The GitHub Actions workflow (.github/workflows/ci.yml) runs:
- Code Quality Checks: flake8, mypy, black, isort
- Unit Tests: Fast, isolated tests with coverage
- Integration Tests: With Redis and PostgreSQL services
- Performance Tests: On main branch pushes
- Security Scans: safety and bandit
- Docker Build: Verify containerization works
Adding New Features
1. Create Feature Branch
git checkout -b feature/new-detection-algorithm
2. Implement with TDD
# 1. Write failing test
#    tests/unit/detection/test_new_algorithm.py
# 2. Implement minimal code to pass
#    detector_worker/detection/new_algorithm.py
# 3. Refactor and improve
# 4. Add integration tests if needed
3. Run Quality Checks
make format   # Auto-format code
make lint     # Check code quality
make test     # Run all tests
4. Create Pull Request
- CI/CD pipeline runs automatically
- Coverage report posted as comment
- All checks must pass before merge
Debugging & Monitoring
Logging Configuration
# detector_worker/utils/logging.py
import logging
def setup_logging(level=logging.INFO):
"""Configure structured logging"""
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
)
# Console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
# File handler
file_handler = logging.FileHandler('detector_worker.log')
file_handler.setFormatter(formatter)
# Root logger
logger = logging.getLogger()
logger.setLevel(level)
logger.addHandler(console_handler)
logger.addHandler(file_handler)
return logger
Performance Monitoring
# detector_worker/utils/monitoring.py
import psutil
import time
from typing import Dict
class SystemMonitor:
"""System resource monitoring"""
def get_system_metrics(self) -> Dict:
"""Get current system metrics"""
return {
"cpu_percent": psutil.cpu_percent(),
"memory_percent": psutil.virtual_memory().percent,
"disk_percent": psutil.disk_usage('/').percent,
"network_io": psutil.net_io_counters()._asdict(),
"timestamp": time.time()
}
def get_process_metrics(self) -> Dict:
"""Get current process metrics"""
process = psutil.Process()
return {
"pid": process.pid,
"cpu_percent": process.cpu_percent(),
"memory_mb": process.memory_info().rss / 1024 / 1024,
"threads": process.num_threads(),
"open_files": len(process.open_files()),
"connections": len(process.connections()),
"timestamp": time.time()
}
This architecture documentation provides a complete technical overview of the refactored system so that any engineer can quickly understand and contribute to the codebase. The modular design, clear separation of concerns, and extensive testing infrastructure keep the system maintainable, scalable, and reliable.