#!/usr/bin/env python3 """ Validation script to test the refactored detector worker. This script validates that: 1. All modules can be imported successfully 2. Singleton managers work correctly 3. Dependency injection container functions 4. Configuration system operates properly """ import sys import traceback import logging def test_imports(): """Test that all refactored modules can be imported.""" print("šŸ” Testing module imports...") try: # Core modules from detector_worker.core.config import get_config_manager from detector_worker.core.singleton_managers import ( ModelStateManager, StreamStateManager, SessionStateManager ) from detector_worker.core.dependency_injection import get_container from detector_worker.core.exceptions import DetectionError # Detection modules from detector_worker.detection.detection_result import DetectionResult from detector_worker.detection.yolo_detector import YOLODetector # Pipeline modules from detector_worker.pipeline.pipeline_executor import PipelineExecutor from detector_worker.pipeline.action_executor import ActionExecutor # Storage modules from detector_worker.storage.database_manager import DatabaseManager from detector_worker.storage.redis_client import RedisClientManager # Communication modules from detector_worker.communication.websocket_handler import WebSocketHandler from detector_worker.communication.message_processor import MessageProcessor # Utils from detector_worker.utils.error_handler import ErrorHandler from detector_worker.utils.system_monitor import get_system_metrics print("āœ… All imports successful!") return True except ImportError as e: print(f"āŒ Import failed: {e}") traceback.print_exc() return False except Exception as e: print(f"āŒ Unexpected error during import: {e}") traceback.print_exc() return False def test_singleton_managers(): """Test that singleton managers work correctly.""" print("\nšŸ” Testing singleton managers...") try: from detector_worker.core.singleton_managers import ( ModelStateManager, StreamStateManager, SessionStateManager, CacheStateManager, CameraStateManager, PipelineStateManager ) # Test that singletons return same instance model1 = ModelStateManager() model2 = ModelStateManager() assert model1 is model2, "ModelStateManager not singleton" stream1 = StreamStateManager() stream2 = StreamStateManager() assert stream1 is stream2, "StreamStateManager not singleton" # Test basic functionality model_manager = ModelStateManager() stats = model_manager.get_stats() assert isinstance(stats, dict), "Stats should be dict" print("āœ… Singleton managers working correctly!") return True except Exception as e: print(f"āŒ Singleton manager test failed: {e}") traceback.print_exc() return False def test_dependency_injection(): """Test dependency injection container.""" print("\nšŸ” Testing dependency injection...") try: from detector_worker.core.dependency_injection import get_container container = get_container() stats = container.get_container().get_stats() assert isinstance(stats, dict), "Container stats should be dict" assert "registered_services" in stats, "Missing registered_services" print(f"āœ… Dependency injection working! Registered services: {stats['registered_services']}") return True except Exception as e: print(f"āŒ Dependency injection test failed: {e}") traceback.print_exc() return False def test_configuration(): """Test configuration management.""" print("\nšŸ” Testing configuration system...") try: from detector_worker.core.config import get_config_manager, validate_config config_manager = get_config_manager() config = config_manager.get_all() assert isinstance(config, dict), "Config should be dict" assert "max_streams" in config, "Missing max_streams config" # Test validation errors = validate_config() assert isinstance(errors, list), "Validation errors should be list" print(f"āœ… Configuration system working! Config keys: {len(config)}") return True except Exception as e: print(f"āŒ Configuration test failed: {e}") traceback.print_exc() return False def test_error_handling(): """Test error handling system.""" print("\nšŸ” Testing error handling...") try: from detector_worker.utils.error_handler import ErrorHandler, ErrorContext, ErrorSeverity handler = ErrorHandler("test_component") context = ErrorContext(component="test", operation="test_op") # Test error stats stats = handler.get_error_stats() assert isinstance(stats, dict), "Error stats should be dict" print("āœ… Error handling system working!") return True except Exception as e: print(f"āŒ Error handling test failed: {e}") traceback.print_exc() return False def main(): """Run all validation tests.""" print("šŸš€ Starting Detector Worker Refactor Validation") print("=" * 50) tests = [ test_imports, test_singleton_managers, test_dependency_injection, test_configuration, test_error_handling ] passed = 0 total = len(tests) for test in tests: if test(): passed += 1 print("\n" + "=" * 50) print(f"šŸŽÆ Validation Results: {passed}/{total} tests passed") if passed == total: print("šŸŽ‰ All validation tests passed! Refactor is working correctly.") return 0 else: print("āŒ Some validation tests failed. Please check the errors above.") return 1 if __name__ == "__main__": sys.exit(main())