diff --git a/app.py b/app.py index d9a571b..f6ca225 100644 --- a/app.py +++ b/app.py @@ -18,6 +18,10 @@ from detector_worker.core.singleton_managers import ( from detector_worker.communication.websocket_handler import WebSocketHandler from detector_worker.utils.system_monitor import get_system_metrics from detector_worker.utils.error_handler import ErrorHandler, create_logger +from detector_worker.utils.websocket_debug import ( + setup_websocket_logging, enable_websocket_debug, disable_websocket_debug, + get_websocket_debug_status +) # Setup logging logger = create_logger("detector_worker.main", logging.INFO) @@ -54,6 +58,9 @@ async def lifespan(app: FastAPI): logger.info("Configuration validation passed") + # Setup WebSocket logging based on configuration + setup_websocket_logging() + # Log startup information config = config_manager.get_all() logger.info(f"Max streams: {config.get('max_streams', 5)}") @@ -211,6 +218,48 @@ async def reload_configuration(): raise HTTPException(status_code=500, detail="Internal server error") +@app.get("/debug/websocket") +async def get_websocket_debug_status(): + """Get current WebSocket debugging status.""" + try: + return get_websocket_debug_status() + except Exception as e: + logger.error(f"Error getting WebSocket debug status: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@app.post("/debug/websocket/enable") +async def enable_websocket_debugging(): + """Enable WebSocket debugging.""" + try: + enable_websocket_debug() + logger.info("WebSocket debugging enabled via API") + return { + "success": True, + "message": "WebSocket debugging enabled", + "status": get_websocket_debug_status() + } + except Exception as e: + logger.error(f"Error enabling WebSocket debugging: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@app.post("/debug/websocket/disable") +async def disable_websocket_debugging(): + """Disable WebSocket debugging.""" + try: + disable_websocket_debug() + logger.info("WebSocket debugging disabled via API") + return { + "success": True, + "message": "WebSocket debugging disabled", + "status": get_websocket_debug_status() + } + except Exception as e: + logger.error(f"Error disabling WebSocket debugging: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + if __name__ == "__main__": import uvicorn diff --git a/detector_worker/communication/message_processor.py b/detector_worker/communication/message_processor.py index a6167e6..fba0cfd 100644 --- a/detector_worker/communication/message_processor.py +++ b/detector_worker/communication/message_processor.py @@ -15,6 +15,7 @@ from ..core.exceptions import ValidationError, MessageProcessingError # Setup logging logger = logging.getLogger("detector_worker.message_processor") +msg_proc_logger = logging.getLogger("websocket.message_processor") # Detailed message processing logger class MessageType(Enum): @@ -161,24 +162,31 @@ class MessageProcessor: Raises: MessageProcessingError: If message is invalid """ + msg_proc_logger.debug(f"📨 PARSING MESSAGE: {raw_message[:100]}{'...' if len(raw_message) > 100 else ''}") + try: data = json.loads(raw_message) except json.JSONDecodeError as e: + msg_proc_logger.error(f"❌ JSON PARSE ERROR: {e}") raise MessageProcessingError(f"Invalid JSON: {e}") # Validate message structure if not isinstance(data, dict): + msg_proc_logger.error(f"❌ INVALID MESSAGE STRUCTURE: Expected dict, got {type(data)}") raise MessageProcessingError("Message must be a JSON object") # Extract message type msg_type_str = data.get("type") if not msg_type_str: + msg_proc_logger.error("❌ MISSING MESSAGE TYPE") raise MessageProcessingError("Missing 'type' field in message") # Convert to MessageType enum try: msg_type = MessageType(msg_type_str) + msg_proc_logger.debug(f"✅ PARSED MESSAGE TYPE: {msg_type.value}") except ValueError: + msg_proc_logger.error(f"❌ UNKNOWN MESSAGE TYPE: {msg_type_str}") raise MessageProcessingError(f"Unknown message type: {msg_type_str}") return msg_type, data @@ -197,13 +205,22 @@ class MessageProcessor: Raises: ValidationError: If validation fails """ + msg_proc_logger.debug(f"🔍 VALIDATING {msg_type.value} MESSAGE") + # Get validator for message type validator = self.validators.get(msg_type) if not validator: # No validation needed for some message types + msg_proc_logger.debug(f"â„šī¸ NO VALIDATION NEEDED FOR {msg_type.value}") return data.get("payload", {}) - return validator(data) + try: + result = validator(data) + msg_proc_logger.debug(f"✅ VALIDATION SUCCESS FOR {msg_type.value}") + return result + except ValidationError as e: + msg_proc_logger.error(f"❌ VALIDATION FAILED FOR {msg_type.value}: {e}") + raise def _validate_subscribe(self, data: Dict[str, Any]) -> SubscribePayload: """Validate subscribe message.""" diff --git a/detector_worker/communication/websocket_handler.py b/detector_worker/communication/websocket_handler.py index da05d6a..e86199b 100644 --- a/detector_worker/communication/websocket_handler.py +++ b/detector_worker/communication/websocket_handler.py @@ -32,6 +32,7 @@ from ..utils.system_monitor import get_system_metrics # Setup logging logger = logging.getLogger("detector_worker.websocket_handler") ws_logger = logging.getLogger("websocket") +ws_rxtx_logger = logging.getLogger("websocket.rxtx") # Dedicated RX/TX logger # Type definitions for callbacks MessageHandler = Callable[[Dict[str, Any]], asyncio.coroutine] @@ -109,7 +110,11 @@ class WebSocketHandler: self.websocket = websocket self.connected = True - logger.info("WebSocket connection accepted") + # Log connection details + client_host = getattr(websocket.client, 'host', 'unknown') + client_port = getattr(websocket.client, 'port', 'unknown') + logger.info(f"🔗 WebSocket connection accepted from {client_host}:{client_port}") + ws_rxtx_logger.info(f"CONNECT -> Client: {client_host}:{client_port}") # Create concurrent tasks stream_task = asyncio.create_task(self._process_streams()) @@ -126,6 +131,9 @@ class WebSocketHandler: finally: self.connected = False + client_host = getattr(websocket.client, 'host', 'unknown') if websocket.client else 'unknown' + client_port = getattr(websocket.client, 'port', 'unknown') if websocket.client else 'unknown' + ws_rxtx_logger.info(f"DISCONNECT -> Client: {client_host}:{client_port}") await self._cleanup() async def _cleanup(self) -> None: @@ -181,7 +189,9 @@ class WebSocketHandler: } } - ws_logger.info(f"TX -> {json.dumps(state_data, separators=(',', ':'))}") + # Compact JSON for RX/TX logging + compact_json = json.dumps(state_data, separators=(',', ':')) + ws_rxtx_logger.info(f"TX -> {compact_json}") await self.websocket.send_json(state_data) await asyncio.sleep(HEARTBEAT_INTERVAL) @@ -198,16 +208,21 @@ class WebSocketHandler: while self.connected: try: text_data = await self.websocket.receive_text() - ws_logger.info(f"RX <- {text_data}") + ws_rxtx_logger.info(f"RX <- {text_data}") data = json.loads(text_data) msg_type = data.get("type") + # Log message processing + logger.debug(f"đŸ“Ĩ Processing message type: {msg_type}") + if msg_type in self.message_handlers: handler = self.message_handlers[msg_type] await handler(data) + logger.debug(f"✅ Message {msg_type} processed successfully") else: - logger.error(f"Unknown message type: {msg_type}") + logger.error(f"❌ Unknown message type: {msg_type}") + ws_rxtx_logger.error(f"UNKNOWN_MSG_TYPE -> {msg_type}") except json.JSONDecodeError: logger.error("Received invalid JSON message") @@ -437,7 +452,7 @@ class WebSocketHandler: "sessionId": session_id } } - ws_logger.info(f"TX -> {json.dumps(response, separators=(',', ':'))}") + ws_rxtx_logger.info(f"TX -> {json.dumps(response, separators=(',', ':'))}") await self.websocket.send_json(response) logger.info(f"Set session {session_id} for display {display_id}") @@ -463,7 +478,7 @@ class WebSocketHandler: "patchData": patch_data } } - ws_logger.info(f"TX -> {json.dumps(response, separators=(',', ':'))}") + ws_rxtx_logger.info(f"TX -> {json.dumps(response, separators=(',', ':'))}") await self.websocket.send_json(response) async def _handle_set_progression_stage(self, data: Dict[str, Any]) -> None: @@ -539,7 +554,7 @@ class WebSocketHandler: } try: - ws_logger.info(f"TX -> {json.dumps(detection_data, separators=(',', ':'))}") + ws_rxtx_logger.info(f"TX -> {json.dumps(detection_data, separators=(',', ':'))}") await self.websocket.send_json(detection_data) except RuntimeError as e: if "websocket.close" in str(e): @@ -571,7 +586,7 @@ class WebSocketHandler: } try: - ws_logger.info(f"TX -> {json.dumps(detection_data, separators=(',', ':'))}") + ws_rxtx_logger.info(f"TX -> {json.dumps(detection_data, separators=(',', ':'))}") await self.websocket.send_json(detection_data) except RuntimeError as e: if "websocket.close" in str(e): diff --git a/detector_worker/core/config.py b/detector_worker/core/config.py index 991151c..c777674 100644 --- a/detector_worker/core/config.py +++ b/detector_worker/core/config.py @@ -181,6 +181,10 @@ class LoggingConfig: file_path: Optional[str] = "detector_worker.log" max_file_size_mb: int = 10 backup_count: int = 5 + # WebSocket debugging + websocket_debug: bool = False + websocket_rxtx_log_level: str = "INFO" + websocket_message_proc_log_level: str = "DEBUG" @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'LoggingConfig': @@ -244,6 +248,11 @@ class ConfigurationManager: "session_timeout": 600, "models_dir": "models", "log_level": "INFO", + "logging": { + "websocket_debug": False, + "websocket_rxtx_log_level": "INFO", + "websocket_message_proc_log_level": "DEBUG" + }, "database": { "enabled": False, "host": "localhost", diff --git a/detector_worker/utils/websocket_debug.py b/detector_worker/utils/websocket_debug.py new file mode 100644 index 0000000..f0e9db6 --- /dev/null +++ b/detector_worker/utils/websocket_debug.py @@ -0,0 +1,165 @@ +""" +WebSocket debugging utilities. + +This module provides utilities for configuring WebSocket logging and debugging +based on configuration settings. +""" +import logging +import sys +from typing import Optional +from pathlib import Path + +from ..core.config import get_config_manager + + +def setup_websocket_logging( + enable_debug: Optional[bool] = None, + rxtx_log_level: Optional[str] = None, + message_proc_log_level: Optional[str] = None, + log_file: Optional[str] = None +) -> None: + """ + Set up WebSocket logging based on configuration. + + Args: + enable_debug: Enable WebSocket debugging (overrides config) + rxtx_log_level: Log level for RX/TX messages (overrides config) + message_proc_log_level: Log level for message processing (overrides config) + log_file: Optional log file path for WebSocket messages + """ + # Get configuration + config_manager = get_config_manager() + logging_config = config_manager.get_logging_config() + + # Use provided values or fall back to config + debug_enabled = enable_debug if enable_debug is not None else logging_config.websocket_debug + rxtx_level = rxtx_log_level or logging_config.websocket_rxtx_log_level + msg_proc_level = message_proc_log_level or logging_config.websocket_message_proc_log_level + + # Configure WebSocket RX/TX logger + ws_rxtx_logger = logging.getLogger("websocket.rxtx") + ws_rxtx_logger.setLevel(getattr(logging, rxtx_level.upper(), logging.INFO)) + + # Configure WebSocket message processor logger + ws_msg_proc_logger = logging.getLogger("websocket.message_processor") + ws_msg_proc_logger.setLevel(getattr(logging, msg_proc_level.upper(), logging.DEBUG)) + + # Configure main WebSocket logger + ws_logger = logging.getLogger("websocket") + ws_logger.setLevel(logging.DEBUG if debug_enabled else logging.INFO) + + # Set up console handler for debugging if not already present + if debug_enabled and not ws_rxtx_logger.handlers: + # Create console handler + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setLevel(getattr(logging, rxtx_level.upper(), logging.INFO)) + + # Create formatter for WebSocket messages + formatter = logging.Formatter( + "%(asctime)s | WEBSOCKET | %(name)s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S" + ) + console_handler.setFormatter(formatter) + + # Add handler to both WebSocket loggers + ws_rxtx_logger.addHandler(console_handler) + ws_msg_proc_logger.addHandler(console_handler) + + # Set up file handler if log file is specified + if log_file: + log_path = Path(log_file) + log_path.parent.mkdir(parents=True, exist_ok=True) + + # Create file handler + file_handler = logging.FileHandler(log_path) + file_handler.setLevel(logging.DEBUG) + + # Create detailed formatter for file output + file_formatter = logging.Formatter( + "%(asctime)s | %(levelname)s | %(name)s | %(message)s" + ) + file_handler.setFormatter(file_formatter) + + # Add to all WebSocket loggers + ws_rxtx_logger.addHandler(file_handler) + ws_msg_proc_logger.addHandler(file_handler) + ws_logger.addHandler(file_handler) + + # Log configuration status + main_logger = logging.getLogger("detector_worker.websocket_debug") + main_logger.info(f"WebSocket debugging configured: " + f"enabled={debug_enabled}, " + f"rxtx_level={rxtx_level}, " + f"msg_proc_level={msg_proc_level}, " + f"log_file={log_file}") + + +def enable_websocket_debug() -> None: + """Enable WebSocket debugging with default settings.""" + setup_websocket_logging( + enable_debug=True, + rxtx_log_level="INFO", + message_proc_log_level="DEBUG" + ) + + +def disable_websocket_debug() -> None: + """Disable WebSocket debugging.""" + # Set all WebSocket loggers to WARNING level to reduce noise + ws_loggers = [ + "websocket", + "websocket.rxtx", + "websocket.message_processor" + ] + + for logger_name in ws_loggers: + logger = logging.getLogger(logger_name) + logger.setLevel(logging.WARNING) + + # Remove all handlers + for handler in logger.handlers[:]: + logger.removeHandler(handler) + + +def get_websocket_debug_status() -> dict: + """ + Get current WebSocket debugging status. + + Returns: + Dictionary with debugging status information + """ + config_manager = get_config_manager() + logging_config = config_manager.get_logging_config() + + # Check current logger levels + ws_rxtx_logger = logging.getLogger("websocket.rxtx") + ws_msg_proc_logger = logging.getLogger("websocket.message_processor") + ws_logger = logging.getLogger("websocket") + + return { + "config": { + "websocket_debug": logging_config.websocket_debug, + "websocket_rxtx_log_level": logging_config.websocket_rxtx_log_level, + "websocket_message_proc_log_level": logging_config.websocket_message_proc_log_level + }, + "runtime": { + "websocket_logger_level": logging.getLevelName(ws_logger.level), + "rxtx_logger_level": logging.getLevelName(ws_rxtx_logger.level), + "msg_proc_logger_level": logging.getLevelName(ws_msg_proc_logger.level), + "handler_count": { + "websocket": len(ws_logger.handlers), + "rxtx": len(ws_rxtx_logger.handlers), + "msg_proc": len(ws_msg_proc_logger.handlers) + } + } + } + + +def log_websocket_stats() -> None: + """Log WebSocket communication statistics.""" + logger = logging.getLogger("websocket.stats") + + # This could be extended to track actual message counts + # For now, just log the current status + status = get_websocket_debug_status() + logger.info(f"WebSocket Debug Status: {status}") \ No newline at end of file