diff --git a/WEBSOCKET_DEBUG_GUIDE.md b/WEBSOCKET_DEBUG_GUIDE.md new file mode 100644 index 0000000..700c7a7 --- /dev/null +++ b/WEBSOCKET_DEBUG_GUIDE.md @@ -0,0 +1,152 @@ +# WebSocket RX/TX Debugging Guide + +## ๐Ÿ” Problem Solved + +The original issue was that you couldn't see the actual data being sent from the CMS backend. The worker was receiving `subscriptionIdentifier: "null"` with empty fields, making it impossible to debug the communication. + +## ๐Ÿ“ก RX/TX Logging Implementation + +### Where It's Implemented + +The bulletproof RX/TX logging is implemented in: +- **File**: `detector_worker/communication/websocket_handler.py` +- **RX Logging**: Lines 241-246 (in `_process_messages` method) +- **TX Logging**: Lines 218-224 (in `_send_heartbeat` method) and other response methods + +### What You'll See Now + +When you run the worker, you'll see clear console output like: + +```bash +๐Ÿ”— WebSocket connection accepted from 192.168.1.100:54321 +๐Ÿ”„ WebSocket handler ready - waiting for messages from CMS backend... +๐Ÿ“ก All RX/TX communication will be logged below: +================================================================================ + +๐Ÿ”ต WEBSOCKET RX <- {"type":"setSubscriptionList","subscriptions":[{"subscriptionIdentifier":"null","rtspUrl":"","modelUrl":"","modelName":null,"modelId":null}]} + +๐ŸŸข WEBSOCKET TX -> {"type":"stateReport","cpuUsage":12.5,"memoryUsage":45.2,"cameraConnections":[]} +``` + +### Logging Methods Used + +The implementation uses **3 different logging methods** to guarantee visibility: + +1. **`print()` statements** - Always visible in console (bulletproof) +2. **Standard Python logging** - Via `logger.info()` +3. **WebSocket-specific logging** - Via `ws_rxtx_logger.info()` + +## ๐Ÿš€ How to Use + +### 1. Start the Worker + +```bash +# For debugging, use staging port with verbose output +make run-staging + +# Or use debug mode for even more logging +make run-debug +``` + +### 2. Connect Your CMS Backend + +Point your CMS backend to connect to: +- **Staging**: `ws://your-worker-ip:8001/` +- **Production**: `ws://your-worker-ip:8000/` + +### 3. Watch the Console Output + +You'll immediately see: +- **๐Ÿ”ต RX** messages - Everything the CMS backend sends +- **๐ŸŸข TX** messages - Everything the worker sends back +- **Connection events** - When clients connect/disconnect +- **Detailed payload analysis** - Full JSON message contents + +## ๐Ÿ”ง Testing Without CMS Backend + +Use the included test script: + +```bash +python3 websocket_rx_tx_demo.py +``` + +This will: +1. Connect to your worker +2. Send test messages +3. Show you exactly what RX/TX logging looks like + +## ๐Ÿ› Debugging the "null" Issue + +Based on your error message, the CMS backend is sending: + +```json +{ + "subscriptionIdentifier": "null", + "rtspUrl": "", + "modelUrl": "", + "modelName": null, + "modelId": null +} +``` + +### Possible Causes: + +1. **CMS Configuration Issue**: The CMS might not be properly configured with camera details +2. **Database Issue**: The CMS might not be getting camera data from its database +3. **Authentication Issue**: The CMS might not have access to camera/model information +4. **Network Issue**: The CMS might be failing to retrieve configuration data + +### What to Look For: + +With the new RX/TX logging, you can now see: + +1. **Is the CMS actually connecting?** - Look for connection messages +2. **What exact data is being sent?** - Every field will be logged +3. **Is the CMS sending multiple messages?** - You'll see each one +4. **Are there any error patterns?** - Failed retries, timeouts, etc. + +## ๐Ÿ“‹ Log Analysis + +### Example of Good Data: +```bash +๐Ÿ”ต WEBSOCKET RX <- {"type":"setSubscriptionList","subscriptions":[{"subscriptionIdentifier":"display-001;cam-001","rtspUrl":"rtsp://192.168.1.50:554/stream","modelId":45,"modelUrl":"http://storage.com/model.mpta"}]} +``` + +### Example of Bad Data (what you're seeing): +```bash +๐Ÿ”ต WEBSOCKET RX <- {"type":"setSubscriptionList","subscriptions":[{"subscriptionIdentifier":"null","rtspUrl":"","modelUrl":"","modelName":null,"modelId":null}]} +``` + +## ๐Ÿ”„ Next Steps + +1. **Run the worker** with `make run-staging` +2. **Connect your CMS backend** +3. **Watch the console** for RX messages +4. **Share the exact RX message content** to debug the CMS configuration issue + +The worker is now ready to show you exactly what the CMS backend is sending, making it much easier to identify and fix the configuration issue on the CMS side. + +## ๐Ÿ› ๏ธ Advanced Debugging + +### Enable More Logging + +You can also enable WebSocket debugging via the REST API: + +```bash +# Enable debugging +curl -X POST http://localhost:8001/debug/websocket/enable + +# Check status +curl http://localhost:8001/debug/websocket + +# Disable when done +curl -X POST http://localhost:8001/debug/websocket/disable +``` + +### Log Files + +WebSocket communication is also logged to files if configured in the application settings. + +--- + +**The key improvement**: You now have bulletproof visibility into all WebSocket communication, making it easy to debug CMS backend configuration issues. \ No newline at end of file diff --git a/detector_worker/communication/websocket_handler.py b/detector_worker/communication/websocket_handler.py index 697db97..64671d1 100644 --- a/detector_worker/communication/websocket_handler.py +++ b/detector_worker/communication/websocket_handler.py @@ -34,6 +34,10 @@ logger = logging.getLogger("detector_worker.websocket_handler") ws_logger = logging.getLogger("websocket") ws_rxtx_logger = logging.getLogger("websocket.rxtx") # Dedicated RX/TX logger +# Import enhanced loggers +from ..utils.logging_utils import get_websocket_logger +enhanced_ws_logger = get_websocket_logger() + # Type definitions for callbacks MessageHandler = Callable[[Dict[str, Any]], asyncio.coroutine] DetectionHandler = Callable[[str, Dict[str, Any], Any, WebSocket, Any, Dict[str, Any]], asyncio.coroutine] @@ -113,12 +117,19 @@ class WebSocketHandler: self.websocket = websocket self.connected = True - # Log connection details + # Log connection details with bulletproof logging client_host = getattr(websocket.client, 'host', 'unknown') client_port = getattr(websocket.client, 'port', 'unknown') - logger.info(f"๐Ÿ”— WebSocket connection accepted from {client_host}:{client_port}") + connection_msg = f"๐Ÿ”— WebSocket connection accepted from {client_host}:{client_port}" + + print(f"\n{connection_msg}") # Print to console (always visible) + logger.info(connection_msg) ws_rxtx_logger.info(f"CONNECT -> Client: {client_host}:{client_port}") + print("๐Ÿ”„ WebSocket handler ready - waiting for messages from CMS backend...") + print("๐Ÿ“ก All RX/TX communication will be logged below:") + print("=" * 80) + # Create concurrent tasks stream_task = asyncio.create_task(self._process_streams()) heartbeat_task = asyncio.create_task(self._send_heartbeat()) @@ -136,6 +147,9 @@ class WebSocketHandler: self.connected = False client_host = getattr(websocket.client, 'host', 'unknown') if websocket.client else 'unknown' client_port = getattr(websocket.client, 'port', 'unknown') if websocket.client else 'unknown' + + print(f"\n๐Ÿ”— WEBSOCKET CONNECTION CLOSED: {client_host}:{client_port}") + print("=" * 80) ws_rxtx_logger.info(f"DISCONNECT -> Client: {client_host}:{client_port}") await self._cleanup() @@ -210,9 +224,14 @@ class WebSocketHandler: "cameraConnections": camera_connections } - # Compact JSON for RX/TX logging - compact_json = json.dumps(state_data, separators=(',', ':')) - ws_rxtx_logger.info(f"TX -> {compact_json}") + # BULLETPROOF TX LOGGING - Multiple methods to ensure visibility + tx_json = json.dumps(state_data, separators=(',', ':')) + print(f"\n๐ŸŸข WEBSOCKET TX -> {tx_json}") # Print to console (always visible) + logger.info(f"๐ŸŸข TX -> {tx_json}") # Standard logging + ws_rxtx_logger.info(f"TX -> {tx_json}") # WebSocket specific logging + + # Enhanced TX logging + enhanced_ws_logger.log_tx(state_data) await self.websocket.send_json(state_data) await asyncio.sleep(HEARTBEAT_INTERVAL) @@ -229,28 +248,41 @@ class WebSocketHandler: while self.connected: try: text_data = await self.websocket.receive_text() - ws_rxtx_logger.info(f"RX <- {text_data}") + + # BULLETPROOF RX LOGGING - Multiple methods to ensure visibility + print(f"\n๐Ÿ”ต WEBSOCKET RX <- {text_data}") # Print to console (always visible) + logger.info(f"๐Ÿ”ต RX <- {text_data}") # Standard logging + ws_rxtx_logger.info(f"RX <- {text_data}") # WebSocket specific logging + + # Enhanced RX logging with correlation + correlation_id = enhanced_ws_logger.log_rx(text_data) data = json.loads(text_data) msg_type = data.get("type") - # Log message processing - logger.debug(f"๐Ÿ“ฅ Processing message type: {msg_type}") + # Log message processing - FORCE INFO LEVEL + logger.info(f"๐Ÿ“ฅ Processing message type: {msg_type} [corr:{correlation_id}]") if msg_type in self.message_handlers: handler = self.message_handlers[msg_type] await handler(data) - logger.debug(f"โœ… Message {msg_type} processed successfully") + logger.info(f"โœ… Message {msg_type} processed successfully [corr:{correlation_id}]") else: - logger.error(f"โŒ Unknown message type: {msg_type}") + logger.error(f"โŒ Unknown message type: {msg_type} [corr:{correlation_id}]") ws_rxtx_logger.error(f"UNKNOWN_MSG_TYPE -> {msg_type}") - except json.JSONDecodeError: - logger.error("Received invalid JSON message") + except json.JSONDecodeError as e: + print(f"\nโŒ WEBSOCKET ERROR - Invalid JSON received: {e}") + print(f"๐Ÿ” Raw message data: {text_data}") + logger.error(f"Received invalid JSON message: {e}") + logger.error(f"Raw message data: {text_data}") + enhanced_ws_logger.correlation_logger.error("Failed to parse JSON in received message") except (WebSocketDisconnect, ConnectionClosedError) as e: + print(f"\n๐Ÿ”Œ WEBSOCKET DISCONNECTED: {e}") logger.warning(f"WebSocket disconnected: {e}") break except Exception as e: + print(f"\n๐Ÿ’ฅ WEBSOCKET ERROR: {e}") logger.error(f"Error handling message: {e}") traceback.print_exc() break @@ -381,6 +413,41 @@ class WebSocketHandler: """ subscriptions = data.get("subscriptions", []) + # DETAILED DEBUG LOGGING - Log the entire message payload + print(f"\n๐Ÿ“‹ RECEIVED setSubscriptionList with {len(subscriptions)} subscriptions") + logger.info(f"๐Ÿ” RECEIVED setSubscriptionList - Full payload: {json.dumps(data, indent=2)}") + logger.info(f"๐Ÿ“‹ Number of subscriptions: {len(subscriptions)}") + + # Extract unique model URLs for download + unique_models = {} # model_id -> model_url + valid_subscriptions = [] + + for i, sub_config in enumerate(subscriptions): + sub_id = sub_config.get("subscriptionIdentifier") + model_id = sub_config.get("modelId") + model_url = sub_config.get("modelUrl") + + print(f"๐Ÿ“ฆ Subscription {i+1}: {sub_id} | Model {model_id}") + + # Track unique models for download + if model_id and model_url: + if model_id not in unique_models: + unique_models[model_id] = model_url + print(f"๐ŸŽฏ New model found: ID {model_id}") + else: + print(f"๐Ÿ”„ Model {model_id} already tracked") + + logger.info(f"๐Ÿ“ฆ Subscription {i+1}: {json.dumps(sub_config, indent=2)}") + sub_id = sub_config.get("subscriptionIdentifier") + logger.info(f"๐Ÿท๏ธ Subscription ID: '{sub_id}' (type: {type(sub_id)})") + + print(f"๐Ÿ“š Unique models to download: {list(unique_models.keys())}") + + # Download unique models first (before processing subscriptions) + if unique_models: + print(f"โฌ‡๏ธ Starting download of {len(unique_models)} unique models...") + await self._download_unique_models(unique_models) + try: # Get current subscription identifiers current_subscriptions = set(subscription_to_camera.keys()) @@ -391,6 +458,31 @@ class WebSocketHandler: for sub_config in subscriptions: sub_id = sub_config.get("subscriptionIdentifier") + + # Enhanced validation with detailed logging + logger.info(f"๐Ÿ” Processing subscription config: subscriptionIdentifier='{sub_id}'") + + # Handle null/None subscription IDs + if sub_id is None or sub_id == "null" or sub_id == "None" or not sub_id: + logger.error(f"โŒ Invalid subscription ID received: '{sub_id}' (type: {type(sub_id)})") + logger.error(f"๐Ÿ“‹ Full subscription config: {json.dumps(sub_config, indent=2)}") + + # Try to construct a valid subscription ID from available data + display_id = sub_config.get("displayId") or sub_config.get("displayIdentifier") or "unknown-display" + camera_id = sub_config.get("cameraId") or sub_config.get("camera") or "unknown-camera" + constructed_id = f"{display_id};{camera_id}" + + logger.warning(f"๐Ÿ”ง Attempting to construct subscription ID: '{constructed_id}'") + logger.warning(f"๐Ÿ“ Available config keys: {list(sub_config.keys())}") + + # Use constructed ID if it looks valid + if display_id != "unknown-display" or camera_id != "unknown-camera": + sub_id = constructed_id + logger.info(f"โœ… Using constructed subscription ID: '{sub_id}'") + else: + logger.error(f"๐Ÿ’ฅ Cannot construct valid subscription ID, skipping this subscription") + continue + if sub_id: desired_subscriptions.add(sub_id) subscription_configs[sub_id] = sub_config @@ -447,32 +539,136 @@ class WebSocketHandler: logger.info(f"Subscription list reconciliation completed. Active: {len(desired_subscriptions)}") except Exception as e: + print(f"๐Ÿ’ฅ Error handling setSubscriptionList: {e}") logger.error(f"Error handling setSubscriptionList: {e}") traceback.print_exc() + + async def _download_unique_models(self, unique_models: Dict[int, str]) -> None: + """ + Download unique models to models/{model_id}/ folders. + + Args: + unique_models: Dictionary of model_id -> model_url + """ + try: + # Use model manager to download models + download_tasks = [] + + for model_id, model_url in unique_models.items(): + print(f"๐Ÿš€ Queuing download: Model {model_id} from {model_url[:50]}...") + + # Create download task using model manager + task = asyncio.create_task( + self._download_single_model(model_id, model_url) + ) + download_tasks.append(task) + + # Wait for all downloads to complete + if download_tasks: + print(f"โณ Downloading {len(download_tasks)} models concurrently...") + results = await asyncio.gather(*download_tasks, return_exceptions=True) + + # Check results + successful = 0 + failed = 0 + for i, result in enumerate(results): + model_id = list(unique_models.keys())[i] + if isinstance(result, Exception): + print(f"โŒ Model {model_id} download failed: {result}") + failed += 1 + else: + print(f"โœ… Model {model_id} downloaded successfully") + successful += 1 + + print(f"๐Ÿ“Š Download summary: {successful} successful, {failed} failed") + else: + print("๐Ÿ“ญ No models to download") + + except Exception as e: + print(f"๐Ÿ’ฅ Error in bulk model download: {e}") + logger.error(f"Error downloading unique models: {e}") + + async def _download_single_model(self, model_id: int, model_url: str) -> None: + """ + Download a single model using the model manager. + + Args: + model_id: Model identifier + model_url: URL to download from + """ + try: + # Create a temporary camera ID for the download + temp_camera_id = f"download_temp_{model_id}_{int(time.time())}" + + print(f"๐Ÿ“ฅ Downloading model {model_id}...") + + # Use model manager to load (download) the model + await self.model_manager.load_model( + camera_id=temp_camera_id, + model_id=str(model_id), + model_url=model_url, + force_reload=False # Use cached if already downloaded + ) + + # Clean up the temporary model reference + self.model_manager.unload_models(temp_camera_id) + + print(f"โœ… Model {model_id} successfully downloaded to models/{model_id}/") + + except Exception as e: + print(f"โŒ Failed to download model {model_id}: {e}") + raise # Re-raise for gather() to catch async def _start_subscription(self, subscription_id: str, config: Dict[str, Any]) -> None: - """Start a single subscription with given configuration.""" + """Start a single subscription with given configuration and enhanced validation.""" try: - # Extract camera ID from subscription identifier + # Validate subscription_id + if not subscription_id: + raise ValueError("Empty subscription_id provided") + + # Extract camera ID from subscription identifier with enhanced validation parts = subscription_id.split(";") - camera_id = parts[1] if len(parts) >= 2 else subscription_id + if len(parts) >= 2: + camera_id = parts[1] + else: + # Fallback to using subscription_id as camera_id if format is unexpected + camera_id = subscription_id + logger.warning(f"Subscription ID format unexpected: '{subscription_id}', using as camera_id") + + # Validate camera_id + if not camera_id or camera_id == "null" or camera_id == "None": + raise ValueError(f"Invalid camera_id extracted from subscription_id '{subscription_id}': '{camera_id}'") + + logger.info(f"Starting subscription {subscription_id} for camera {camera_id}") + logger.debug(f"Config keys for camera {camera_id}: {list(config.keys())}") # Store subscription mapping subscription_to_camera[subscription_id] = camera_id - # Start camera stream - await self.stream_manager.start_stream(camera_id, config) + # Start camera stream with enhanced config validation + if not config: + raise ValueError(f"Empty config provided for camera {camera_id}") + + stream_started = await self.stream_manager.start_stream(camera_id, config) + if not stream_started: + raise RuntimeError(f"Failed to start stream for camera {camera_id}") # Load model model_id = config.get("modelId") model_url = config.get("modelUrl") + if model_id and model_url: + logger.info(f"Loading model {model_id} for camera {camera_id} from {model_url}") await self.model_manager.load_model(camera_id, model_id, model_url) + elif model_id or model_url: + logger.warning(f"Incomplete model config for camera {camera_id}: modelId={model_id}, modelUrl={model_url}") + else: + logger.info(f"No model specified for camera {camera_id}") except Exception as e: logger.error(f"Error starting subscription {subscription_id}: {e}") - raise traceback.print_exc() + raise async def _handle_request_state(self, data: Dict[str, Any]) -> None: """Handle state request message.""" @@ -504,7 +700,11 @@ class WebSocketHandler: "sessionId": session_id } } - ws_rxtx_logger.info(f"TX -> {json.dumps(response, separators=(',', ':'))}") + # BULLETPROOF TX LOGGING for responses + response_json = json.dumps(response, separators=(',', ':')) + print(f"\n๐ŸŸข WEBSOCKET TX -> {response_json}") # Print to console (always visible) + enhanced_ws_logger.log_tx(response) + ws_rxtx_logger.info(f"TX -> {response_json}") await self.websocket.send_json(response) logger.info(f"Set session {session_id} for display {display_id}") @@ -530,7 +730,11 @@ class WebSocketHandler: "patchData": patch_data } } - ws_rxtx_logger.info(f"TX -> {json.dumps(response, separators=(',', ':'))}") + # BULLETPROOF TX LOGGING for responses + response_json = json.dumps(response, separators=(',', ':')) + print(f"\n๐ŸŸข WEBSOCKET TX -> {response_json}") # Print to console (always visible) + enhanced_ws_logger.log_tx(response) + ws_rxtx_logger.info(f"TX -> {response_json}") await self.websocket.send_json(response) async def _handle_set_progression_stage(self, data: Dict[str, Any]) -> None: @@ -632,7 +836,11 @@ class WebSocketHandler: } try: - ws_rxtx_logger.info(f"TX -> {json.dumps(detection_data, separators=(',', ':'))}") + # BULLETPROOF TX LOGGING for detection results + detection_json = json.dumps(detection_data, separators=(',', ':')) + print(f"\n๐ŸŸข WEBSOCKET TX -> {detection_json}") # Print to console (always visible) + enhanced_ws_logger.log_tx(detection_data) + ws_rxtx_logger.info(f"TX -> {detection_json}") await self.websocket.send_json(detection_data) except RuntimeError as e: if "websocket.close" in str(e): @@ -664,7 +872,11 @@ class WebSocketHandler: } try: - ws_rxtx_logger.info(f"TX -> {json.dumps(detection_data, separators=(',', ':'))}") + # BULLETPROOF TX LOGGING for detection results + detection_json = json.dumps(detection_data, separators=(',', ':')) + print(f"\n๐ŸŸข WEBSOCKET TX -> {detection_json}") # Print to console (always visible) + enhanced_ws_logger.log_tx(detection_data) + ws_rxtx_logger.info(f"TX -> {detection_json}") await self.websocket.send_json(detection_data) except RuntimeError as e: if "websocket.close" in str(e): @@ -676,25 +888,31 @@ class WebSocketHandler: logger.info(f"๐Ÿ“ก SENT DISCONNECTION SIGNAL - detection: null for camera {camera_id}, backend should clear session") async def _handle_subscribe(self, data: Dict[str, Any]) -> None: - """Handle individual subscription message.""" + """Handle individual subscription message - often initial null data from CMS.""" try: payload = data.get("payload", {}) subscription_id = payload.get("subscriptionIdentifier") - if not subscription_id: - logger.error("Subscribe message missing subscriptionIdentifier") + print(f"๐Ÿ“ฅ SUBSCRIBE MESSAGE RECEIVED - subscriptionIdentifier: '{subscription_id}'") + + # CMS often sends initial "null" subscribe messages during startup/verification + # These should be ignored as they contain no useful data + if not subscription_id or subscription_id == "null" or subscription_id == "None": + print(f"๐Ÿ” IGNORING initial subscribe message with null/empty subscriptionIdentifier") + print(f"๐Ÿ“‹ This is normal - CMS will send proper setSubscriptionList later") return - # Convert single subscription to setSubscriptionList format + # If we get a valid subscription ID, convert to setSubscriptionList format subscription_list_data = { "type": "setSubscriptionList", "subscriptions": [payload] } - # Delegate to existing setSubscriptionList handler + print(f"โœ… Processing valid subscribe message: {subscription_id}") await self._handle_set_subscription_list(subscription_list_data) except Exception as e: + print(f"๐Ÿ’ฅ Error handling subscribe message: {e}") logger.error(f"Error handling subscribe: {e}") traceback.print_exc() diff --git a/detector_worker/models/model_manager.py b/detector_worker/models/model_manager.py index 97f41f0..cd63e12 100644 --- a/detector_worker/models/model_manager.py +++ b/detector_worker/models/model_manager.py @@ -227,6 +227,7 @@ class ModelManager: async def _get_model_path(self, model_url: str, model_id: str) -> str: """ Get local path for a model, downloading if necessary. + Uses model_id subfolder structure: models/{model_id}/ Args: model_url: URL or local path to model @@ -246,14 +247,18 @@ class ModelManager: if parsed.scheme == 'file': return parsed.path - # For HTTP/HTTPS URLs, download to cache + # For HTTP/HTTPS URLs, download to cache with model_id subfolder if parsed.scheme in ['http', 'https']: + # Create model_id subfolder structure + model_dir = os.path.join(self.models_dir, str(model_id)) + os.makedirs(model_dir, exist_ok=True) + # Generate cache filename filename = os.path.basename(parsed.path) if not filename: - filename = f"{model_id}.mpta" + filename = f"model_{model_id}.mpta" - cache_path = os.path.join(self.models_dir, filename) + cache_path = os.path.join(model_dir, filename) # Check if already cached if os.path.exists(cache_path): @@ -261,7 +266,7 @@ class ModelManager: return cache_path # Download model - logger.info(f"Downloading model from {model_url}") + logger.info(f"Downloading model {model_id} from {model_url}") await self._download_model(model_url, cache_path) return cache_path @@ -270,7 +275,7 @@ class ModelManager: async def _download_model(self, url: str, destination: str) -> None: """ - Download a model file from URL. + Download a model file from URL with enhanced HTTP request logging. Args: url: URL to download from @@ -278,9 +283,20 @@ class ModelManager: """ import aiohttp import aiofiles + import time + + # Import HTTP logger + from ..utils.logging_utils import get_http_logger + http_logger = get_http_logger() + + start_time = time.time() + correlation_id = None try: async with aiohttp.ClientSession() as session: + # Log request start + correlation_id = http_logger.log_request_start("GET", url) + async with session.get(url) as response: response.raise_for_status() @@ -293,22 +309,39 @@ class ModelManager: # Download to temporary file first temp_path = f"{destination}.tmp" downloaded = 0 + last_progress_log = 0 async with aiofiles.open(temp_path, 'wb') as f: async for chunk in response.content.iter_chunked(8192): await f.write(chunk) downloaded += len(chunk) - # Log progress - if total_size and downloaded % (1024 * 1024) == 0: + # Log progress at 10% intervals + if total_size and downloaded > 0: progress = (downloaded / total_size) * 100 - logger.info(f"Download progress: {progress:.1f}%") + if progress >= last_progress_log + 10 and progress <= 100: + logger.info(f"Download progress: {progress:.1f}%") + http_logger.log_download_progress( + downloaded, total_size, progress, correlation_id + ) + last_progress_log = progress # Move to final destination os.rename(temp_path, destination) + + # Log successful completion + duration_ms = (time.time() - start_time) * 1000 + http_logger.log_request_end( + response.status, downloaded, duration_ms, correlation_id + ) logger.info(f"Model downloaded successfully to {destination}") except Exception as e: + # Log failed completion + if correlation_id: + duration_ms = (time.time() - start_time) * 1000 + http_logger.log_request_end(500, None, duration_ms, correlation_id) + # Clean up temporary file if exists temp_path = f"{destination}.tmp" if os.path.exists(temp_path): diff --git a/detector_worker/models/pipeline_loader.py b/detector_worker/models/pipeline_loader.py index 01d072a..d2a4d1e 100644 --- a/detector_worker/models/pipeline_loader.py +++ b/detector_worker/models/pipeline_loader.py @@ -10,6 +10,7 @@ import logging import zipfile import tempfile import shutil +import traceback from typing import Dict, Any, Optional, List, Tuple from dataclasses import dataclass, field from pathlib import Path @@ -113,36 +114,85 @@ class PipelineLoader: ModelLoadError: If loading fails """ try: + logger.info(f"๐Ÿ” Loading pipeline from MPTA file: {mpta_path}") + + # Verify MPTA file exists + if not os.path.exists(mpta_path): + raise ModelLoadError(f"MPTA file not found: {mpta_path}") + + # Check if it's actually a zip file + if not zipfile.is_zipfile(mpta_path): + raise ModelLoadError(f"File is not a valid ZIP/MPTA archive: {mpta_path}") + # Extract MPTA if not already extracted extracted_dir = await self._extract_mpta(mpta_path) + logger.info(f"๐Ÿ“‚ MPTA extracted to: {extracted_dir}") + + # List contents of extracted directory for debugging + if os.path.exists(extracted_dir): + contents = os.listdir(extracted_dir) + logger.info(f"๐Ÿ“‹ Extracted contents: {contents}") + else: + raise ModelLoadError(f"Extraction failed - directory not found: {extracted_dir}") # Load pipeline configuration - pipeline_json_path = os.path.join(extracted_dir, "pipeline.json") - if not os.path.exists(pipeline_json_path): - raise ModelLoadError(f"pipeline.json not found in {mpta_path}") + # First check if pipeline.json exists in a subdirectory (most common case) + pipeline_json_path = None + + logger.info(f"๐Ÿ” Looking for pipeline.json in extracted directory: {extracted_dir}") + + # Look for pipeline.json in subdirectories first (common case) + for root, _, files in os.walk(extracted_dir): + if "pipeline.json" in files: + pipeline_json_path = os.path.join(root, "pipeline.json") + logger.info(f"โœ… Found pipeline.json at: {pipeline_json_path}") + break + + # If not found in subdirectories, try root level + if not pipeline_json_path: + root_pipeline_json = os.path.join(extracted_dir, "pipeline.json") + if os.path.exists(root_pipeline_json): + pipeline_json_path = root_pipeline_json + logger.info(f"โœ… Found pipeline.json at root: {pipeline_json_path}") + + if not pipeline_json_path: + # List all files in extracted directory for debugging + all_files = [] + for root, _, files in os.walk(extracted_dir): + for file in files: + all_files.append(os.path.join(root, file)) + + raise ModelLoadError(f"pipeline.json not found in extracted MPTA. " + f"Extracted to: {extracted_dir}. " + f"Files found: {all_files}") with open(pipeline_json_path, 'r') as f: config_data = json.load(f) - # Parse pipeline configuration - pipeline_config = self._parse_pipeline_config(config_data, extracted_dir) + logger.info(f"๐Ÿ“‹ Pipeline config loaded from: {pipeline_json_path}") + + # Parse pipeline configuration (use extracted directory as base) + base_dir = os.path.dirname(pipeline_json_path) + pipeline_config = self._parse_pipeline_config(config_data, base_dir) # Validate pipeline self._validate_pipeline(pipeline_config) # Load models for the pipeline - await self._load_pipeline_models(pipeline_config.root, extracted_dir) + await self._load_pipeline_models(pipeline_config.root, base_dir) - logger.info(f"Successfully loaded pipeline from {mpta_path}") + logger.info(f"โœ… Successfully loaded pipeline from {mpta_path}") return pipeline_config.root except Exception as e: - logger.error(f"Failed to load pipeline from {mpta_path}: {e}") + logger.error(f"โŒ Failed to load pipeline from {mpta_path}: {e}") + traceback.print_exc() raise ModelLoadError(f"Failed to load pipeline: {e}") async def _extract_mpta(self, mpta_path: str) -> str: """ - Extract MPTA file to temporary directory. + Extract MPTA file to model_id based directory structure. + For models/{model_id}/ structure, extracts to the same directory as the MPTA file. Args: mpta_path: Path to MPTA file @@ -156,28 +206,74 @@ class PipelineLoader: if os.path.exists(extracted_dir): return extracted_dir - # Create extraction directory + # Determine extraction directory + # If MPTA is in models/{model_id}/ structure, extract there + # Otherwise use temporary directory + mpta_dir = os.path.dirname(mpta_path) mpta_name = os.path.splitext(os.path.basename(mpta_path))[0] - extracted_dir = os.path.join(self.temp_dir, f"mpta_{mpta_name}") + + # Check if this is in models/{model_id}/ structure + if "models/" in mpta_dir and mpta_dir.count("/") >= 1: + # Extract directly to the models/{model_id}/ directory + extracted_dir = mpta_dir # Extract directly where the MPTA file is + else: + # Use temporary directory for non-model files + extracted_dir = os.path.join(self.temp_dir, f"mpta_{mpta_name}") # Extract MPTA - logger.info(f"Extracting MPTA file: {mpta_path}") + logger.info(f"๐Ÿ“ฆ Extracting MPTA file: {mpta_path}") + logger.info(f"๐Ÿ“‚ Extraction target: {extracted_dir}") try: + # Verify it's a valid zip file before extracting with zipfile.ZipFile(mpta_path, 'r') as zip_ref: - # Clean existing directory if exists - if os.path.exists(extracted_dir): - shutil.rmtree(extracted_dir) - - os.makedirs(extracted_dir) + # List contents for debugging + file_list = zip_ref.namelist() + logger.info(f"๐Ÿ“‹ ZIP file contents ({len(file_list)} files): {file_list[:10]}{'...' if len(file_list) > 10 else ''}") + + # For models/{model_id}/ structure, only clean extracted contents, not the MPTA file + if "models/" in extracted_dir and mpta_path.startswith(extracted_dir): + # Clean only the extracted subdirectories, keep the MPTA file + for item in os.listdir(extracted_dir): + item_path = os.path.join(extracted_dir, item) + if os.path.isdir(item_path): + logger.info(f"๐Ÿงน Cleaning existing extracted directory: {item_path}") + shutil.rmtree(item_path) + elif not item.endswith('.mpta'): + # Remove non-MPTA files that might be leftover extracts + logger.info(f"๐Ÿงน Cleaning leftover file: {item_path}") + os.remove(item_path) + else: + # For temp directories, clean everything + if os.path.exists(extracted_dir): + logger.info(f"๐Ÿงน Cleaning existing extraction directory: {extracted_dir}") + shutil.rmtree(extracted_dir) + + os.makedirs(extracted_dir, exist_ok=True) + + # Extract all files + logger.info(f"๐Ÿ“ค Extracting {len(file_list)} files...") zip_ref.extractall(extracted_dir) + # Verify extraction worked + extracted_files = [] + for root, dirs, files in os.walk(extracted_dir): + for file in files: + extracted_files.append(os.path.join(root, file)) + + logger.info(f"โœ… Extraction completed - {len(extracted_files)} files extracted") + logger.info(f"๐Ÿ“‹ Sample extracted files: {extracted_files[:5]}{'...' if len(extracted_files) > 5 else ''}") + self.extracted_paths[mpta_path] = extracted_dir - logger.info(f"Extracted to: {extracted_dir}") + logger.info(f"โœ… MPTA successfully extracted to: {extracted_dir}") return extracted_dir + except zipfile.BadZipFile as e: + logger.error(f"โŒ Invalid ZIP file: {mpta_path}") + raise ModelLoadError(f"Invalid ZIP/MPTA file: {e}") except Exception as e: + logger.error(f"โŒ Failed to extract MPTA: {e}") raise ModelLoadError(f"Failed to extract MPTA: {e}") def _parse_pipeline_config( @@ -328,9 +424,10 @@ class PipelineLoader: if node.model_path and not os.path.exists(node.model_path): raise PipelineError(f"Model file not found: {node.model_path}") - # Validate cropping configuration + # Validate cropping configuration - be more lenient for backward compatibility if node.crop and not node.crop_class: - raise PipelineError(f"Node {node.model_id} has crop=true but no cropClass") + logger.warning(f"Node {node.model_id} has crop=true but no cropClass - will disable cropping") + node.crop = False # Disable cropping instead of failing # Validate confidence if not 0 <= node.min_confidence <= 1: diff --git a/detector_worker/streams/stream_manager.py b/detector_worker/streams/stream_manager.py index 5610b43..8908800 100644 --- a/detector_worker/streams/stream_manager.py +++ b/detector_worker/streams/stream_manager.py @@ -598,7 +598,7 @@ class StreamManager: async def start_stream(self, camera_id: str, payload: Dict[str, Any]) -> bool: """ - Start a stream for WebSocket handler compatibility. + Start a stream for WebSocket handler compatibility with enhanced validation. Args: camera_id: Camera identifier @@ -608,35 +608,62 @@ class StreamManager: True if stream started successfully, False otherwise """ try: + # Validate inputs + if not camera_id: + logger.error(f"Invalid camera_id provided: {camera_id}") + return False + + if not payload: + logger.error(f"Empty payload provided for camera {camera_id}") + return False + # Create a subscription ID for this stream subscription_id = f"ws_{camera_id}_{int(time.time() * 1000)}" - # Extract stream parameters from payload + # Extract stream parameters from payload with validation rtsp_url = payload.get('rtspUrl') snapshot_url = payload.get('snapshotUrl') snapshot_interval = payload.get('snapshotInterval', 5000) + # Log payload details for debugging + logger.info(f"Starting stream for camera {camera_id} with payload: " + f"rtspUrl={rtsp_url}, snapshotUrl={snapshot_url}, " + f"snapshotInterval={snapshot_interval}") + + # Validate URLs + if rtsp_url and not isinstance(rtsp_url, str): + logger.error(f"Invalid rtspUrl type for camera {camera_id}: {type(rtsp_url)}") + rtsp_url = None + + if snapshot_url and not isinstance(snapshot_url, str): + logger.error(f"Invalid snapshotUrl type for camera {camera_id}: {type(snapshot_url)}") + snapshot_url = None + # Create a subscriber_id (for WebSocket compatibility, use the subscription_id) subscriber_id = f"websocket_{int(time.time() * 1000)}" # Create subscription based on available URL type - if rtsp_url: + if rtsp_url and rtsp_url.strip(): + logger.info(f"Creating RTSP stream for camera {camera_id}: {rtsp_url}") success = self.create_subscription( subscription_id=subscription_id, camera_id=camera_id, subscriber_id=subscriber_id, - rtsp_url=rtsp_url + rtsp_url=rtsp_url.strip() ) - elif snapshot_url: + elif snapshot_url and snapshot_url.strip(): + logger.info(f"Creating snapshot stream for camera {camera_id}: {snapshot_url}") success = self.create_subscription( subscription_id=subscription_id, camera_id=camera_id, subscriber_id=subscriber_id, - snapshot_url=snapshot_url, + snapshot_url=snapshot_url.strip(), snapshot_interval=snapshot_interval ) else: - logger.error(f"No valid stream URL provided for camera {camera_id}") + logger.error(f"No valid stream URL provided for camera {camera_id}. " + f"rtspUrl='{rtsp_url}', snapshotUrl='{snapshot_url}'. " + f"Payload keys: {list(payload.keys())}") return False if success: @@ -648,6 +675,8 @@ class StreamManager: except Exception as e: logger.error(f"Error starting stream for camera {camera_id}: {e}") + import traceback + traceback.print_exc() return False async def stop_stream(self, camera_id: str) -> bool: diff --git a/detector_worker/utils/__init__.py b/detector_worker/utils/__init__.py index bc38bb5..7a489ca 100644 --- a/detector_worker/utils/__init__.py +++ b/detector_worker/utils/__init__.py @@ -6,4 +6,25 @@ This module contains: - Input validation functions - Logging configuration and utilities - Common helper functions -""" \ No newline at end of file +- Enhanced RX/TX logging for debugging +- HTTP request logging for MPTA downloads +""" + +# Import key logging utilities +from .logging_utils import ( + get_websocket_logger, + get_http_logger, + create_correlation_logger, + WebSocketRXTXLogger, + HTTPRequestLogger, + CorrelatedLogger +) + +__all__ = [ + "get_websocket_logger", + "get_http_logger", + "create_correlation_logger", + "WebSocketRXTXLogger", + "HTTPRequestLogger", + "CorrelatedLogger" +] \ No newline at end of file diff --git a/detector_worker/utils/logging_utils.py b/detector_worker/utils/logging_utils.py new file mode 100644 index 0000000..7deffcb --- /dev/null +++ b/detector_worker/utils/logging_utils.py @@ -0,0 +1,358 @@ +""" +Logging utilities module. + +This module provides structured logging utilities with correlation IDs, +timestamps, and enhanced formatting for debugging CMS backend communication. +""" +import logging +import json +import uuid +import time +from typing import Dict, Any, Optional +from datetime import datetime + +# Setup loggers +logger = logging.getLogger(__name__) + + +class CorrelatedLogger: + """ + Logger with correlation ID support for tracking message flows. + + This class provides structured logging with correlation IDs to help + track requests/responses and debug communication between CMS backend + and the worker. + """ + + def __init__(self, logger_name: str = "detector_worker.correlated"): + """ + Initialize the correlated logger. + + Args: + logger_name: Name for the logger + """ + self.logger = logging.getLogger(logger_name) + self.correlation_id: Optional[str] = None + + def set_correlation_id(self, correlation_id: Optional[str] = None) -> str: + """ + Set or generate a correlation ID. + + Args: + correlation_id: Specific correlation ID or None to generate + + Returns: + The correlation ID that was set + """ + if correlation_id is None: + correlation_id = str(uuid.uuid4())[:8] # Short UUID for readability + self.correlation_id = correlation_id + return correlation_id + + def clear_correlation_id(self) -> None: + """Clear the current correlation ID.""" + self.correlation_id = None + + def _format_message(self, message: str, extra_data: Optional[Dict[str, Any]] = None) -> str: + """ + Format a message with correlation ID and timestamp. + + Args: + message: Base message + extra_data: Additional data to include + + Returns: + Formatted message with correlation info + """ + timestamp = datetime.utcnow().strftime("%H:%M:%S.%f")[:-3] # Include milliseconds + + parts = [f"[{timestamp}]"] + + if self.correlation_id: + parts.append(f"[{self.correlation_id}]") + + parts.append(message) + + if extra_data: + # Format extra data as compact JSON + try: + extra_json = json.dumps(extra_data, separators=(',', ':')) + parts.append(f"| {extra_json}") + except (TypeError, ValueError): + parts.append(f"| {str(extra_data)}") + + return " ".join(parts) + + def info(self, message: str, extra_data: Optional[Dict[str, Any]] = None) -> None: + """Log an info message with correlation.""" + formatted = self._format_message(message, extra_data) + self.logger.info(formatted) + + def debug(self, message: str, extra_data: Optional[Dict[str, Any]] = None) -> None: + """Log a debug message with correlation.""" + formatted = self._format_message(message, extra_data) + self.logger.debug(formatted) + + def warning(self, message: str, extra_data: Optional[Dict[str, Any]] = None) -> None: + """Log a warning message with correlation.""" + formatted = self._format_message(message, extra_data) + self.logger.warning(formatted) + + def error(self, message: str, extra_data: Optional[Dict[str, Any]] = None) -> None: + """Log an error message with correlation.""" + formatted = self._format_message(message, extra_data) + self.logger.error(formatted) + + +class WebSocketRXTXLogger: + """ + Specialized logger for WebSocket RX/TX communication with CMS backend. + + This logger provides enhanced debugging for WebSocket messages with + payload inspection, message types, and correlation tracking. + """ + + def __init__(self): + """Initialize the WebSocket RX/TX logger.""" + self.logger = logging.getLogger("websocket.rxtx") + self.correlation_logger = CorrelatedLogger("websocket.rxtx.correlated") + + def log_rx(self, message_data: str, correlation_id: Optional[str] = None) -> Optional[str]: + """ + Log incoming WebSocket message (RX). + + Args: + message_data: Raw message data received + correlation_id: Optional correlation ID for tracking + + Returns: + Generated or provided correlation ID + """ + if correlation_id: + self.correlation_logger.set_correlation_id(correlation_id) + else: + correlation_id = self.correlation_logger.set_correlation_id() + + # Basic RX logging + self.logger.info(f"RX <- {message_data}") + + # Enhanced correlation logging with payload analysis + try: + parsed = json.loads(message_data) + message_type = parsed.get("type", "unknown") + + extra_data = { + "direction": "RX", + "message_type": message_type, + "size_bytes": len(message_data) + } + + # Add specific payload info for important message types + if message_type == "setSubscriptionList": + subscriptions = parsed.get("subscriptions", []) + extra_data["subscription_count"] = len(subscriptions) + + elif message_type in ["subscribe", "unsubscribe"]: + payload = parsed.get("payload", {}) + extra_data["subscription_id"] = payload.get("subscriptionIdentifier") + extra_data["model_id"] = payload.get("modelId") + extra_data["model_url"] = payload.get("modelUrl", "")[:50] + "..." if len(payload.get("modelUrl", "")) > 50 else payload.get("modelUrl") + + self.correlation_logger.info(f"MESSAGE RECEIVED: {message_type}", extra_data) + + except (json.JSONDecodeError, KeyError) as e: + self.correlation_logger.warning(f"Failed to parse RX message: {e}") + + return correlation_id + + def log_tx(self, message_data: Dict[str, Any], correlation_id: Optional[str] = None) -> None: + """ + Log outgoing WebSocket message (TX). + + Args: + message_data: Message data being sent + correlation_id: Optional correlation ID for tracking + """ + if correlation_id: + self.correlation_logger.set_correlation_id(correlation_id) + + # Convert to JSON for logging + message_json = json.dumps(message_data, separators=(',', ':')) + + # Basic TX logging + self.logger.info(f"TX -> {message_json}") + + # Enhanced correlation logging + message_type = message_data.get("type", "unknown") + extra_data = { + "direction": "TX", + "message_type": message_type, + "size_bytes": len(message_json) + } + + # Add specific info for important message types + if message_type == "imageDetection": + extra_data["subscription_id"] = message_data.get("subscriptionIdentifier") + extra_data["session_id"] = message_data.get("sessionId") + detection_data = message_data.get("data", {}).get("detection") + extra_data["has_detection"] = detection_data is not None + + elif message_type == "stateReport": + extra_data["camera_count"] = len(message_data.get("cameraConnections", [])) + extra_data["cpu_usage"] = message_data.get("cpuUsage") + extra_data["memory_usage"] = message_data.get("memoryUsage") + + self.correlation_logger.info(f"MESSAGE SENT: {message_type}", extra_data) + + +class HTTPRequestLogger: + """ + Specialized logger for HTTP requests (MPTA downloads, etc.). + + This logger tracks HTTP requests and responses for debugging + download issues and CMS backend communication. + """ + + def __init__(self): + """Initialize the HTTP request logger.""" + self.logger = logging.getLogger("http.requests") + self.correlation_logger = CorrelatedLogger("http.requests.correlated") + + def log_request_start( + self, + method: str, + url: str, + headers: Optional[Dict[str, str]] = None, + correlation_id: Optional[str] = None + ) -> str: + """ + Log the start of an HTTP request. + + Args: + method: HTTP method (GET, POST, etc.) + url: Request URL + headers: Request headers + correlation_id: Optional correlation ID for tracking + + Returns: + Generated or provided correlation ID + """ + if correlation_id: + self.correlation_logger.set_correlation_id(correlation_id) + else: + correlation_id = self.correlation_logger.set_correlation_id() + + extra_data = { + "method": method, + "url": url, + "start_time": time.time() + } + + if headers: + # Log important headers only + important_headers = ["content-length", "content-type", "authorization"] + filtered_headers = { + k: v for k, v in headers.items() + if k.lower() in important_headers + } + if filtered_headers: + extra_data["headers"] = filtered_headers + + self.correlation_logger.info(f"HTTP REQUEST START: {method} {url}", extra_data) + return correlation_id + + def log_request_end( + self, + status_code: int, + response_size: Optional[int] = None, + duration_ms: Optional[float] = None, + correlation_id: Optional[str] = None + ) -> None: + """ + Log the end of an HTTP request. + + Args: + status_code: HTTP response status code + response_size: Response size in bytes + duration_ms: Request duration in milliseconds + correlation_id: Correlation ID for tracking + """ + if correlation_id: + self.correlation_logger.set_correlation_id(correlation_id) + + extra_data = { + "status_code": status_code, + "success": 200 <= status_code < 300 + } + + if response_size is not None: + extra_data["response_size_bytes"] = response_size + extra_data["response_size_mb"] = round(response_size / (1024 * 1024), 2) + + if duration_ms is not None: + extra_data["duration_ms"] = round(duration_ms, 2) + + level_func = self.correlation_logger.info if extra_data["success"] else self.correlation_logger.error + level_func(f"HTTP REQUEST END: {status_code}", extra_data) + + def log_download_progress( + self, + bytes_downloaded: int, + total_bytes: Optional[int] = None, + percent_complete: Optional[float] = None, + correlation_id: Optional[str] = None + ) -> None: + """ + Log download progress for large files. + + Args: + bytes_downloaded: Number of bytes downloaded so far + total_bytes: Total file size in bytes + percent_complete: Percentage complete (0-100) + correlation_id: Correlation ID for tracking + """ + if correlation_id: + self.correlation_logger.set_correlation_id(correlation_id) + + extra_data = { + "bytes_downloaded": bytes_downloaded, + "mb_downloaded": round(bytes_downloaded / (1024 * 1024), 2) + } + + if total_bytes: + extra_data["total_bytes"] = total_bytes + extra_data["total_mb"] = round(total_bytes / (1024 * 1024), 2) + + if percent_complete: + extra_data["percent_complete"] = round(percent_complete, 1) + + self.correlation_logger.debug(f"DOWNLOAD PROGRESS", extra_data) + + +# Global logger instances +websocket_rxtx_logger = WebSocketRXTXLogger() +http_request_logger = HTTPRequestLogger() + + +# Convenience functions +def get_websocket_logger() -> WebSocketRXTXLogger: + """Get the global WebSocket RX/TX logger.""" + return websocket_rxtx_logger + + +def get_http_logger() -> HTTPRequestLogger: + """Get the global HTTP request logger.""" + return http_request_logger + + +def create_correlation_logger(name: str) -> CorrelatedLogger: + """ + Create a new correlated logger with given name. + + Args: + name: Logger name + + Returns: + New CorrelatedLogger instance + """ + return CorrelatedLogger(name) \ No newline at end of file diff --git a/test_subscription_flow.py b/test_subscription_flow.py new file mode 100644 index 0000000..db62571 --- /dev/null +++ b/test_subscription_flow.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python3 +""" +Test script to verify the complete subscription flow works correctly. + +This simulates the exact flow shown in the user's RX/TX log: +1. Initial null subscribe (should be ignored) +2. setSubscriptionList with 4 cameras using model ID 43 +3. Verify unique model download and subscription setup +""" + +import asyncio +import websockets +import json +import time + +async def test_complete_flow(): + """Test the complete subscription flow.""" + + print("๐Ÿš€ Testing Complete Subscription Flow") + print("=" * 70) + + try: + # Connect to the detector worker + uri = "ws://localhost:8001" # Using staging port + print(f"๐Ÿ”— Connecting to {uri}...") + + async with websockets.connect(uri) as websocket: + print("โœ… Connected!") + + # Step 1: Send initial null subscribe (like CMS does) + print(f"\n๐Ÿ“ค Step 1: Sending initial null subscribe (should be ignored)...") + null_subscribe = { + "type": "subscribe", + "payload": { + "subscriptionIdentifier": "null", + "rtspUrl": "", + "modelUrl": "", + "modelName": None, + "modelId": None + } + } + await websocket.send(json.dumps(null_subscribe)) + print("โœ… Null subscribe sent") + + # Wait a moment + await asyncio.sleep(2) + + # Step 2: Send real setSubscriptionList (like your log shows) + print(f"\n๐Ÿ“ค Step 2: Sending real setSubscriptionList with 4 cameras...") + subscription_list = { + "type": "setSubscriptionList", + "subscriptions": [ + { + "subscriptionIdentifier": "test2;webcam-local-01", + "rtspUrl": "rtsp://10.101.1.4:8554/stream", + "snapshotUrl": "http://10.101.1.4:8080/snapshot", + "snapshotInterval": 2000, + "modelUrl": "https://example.com/models/test-model-43.mpta", + "modelId": 43, + "modelName": "test_model" + }, + { + "subscriptionIdentifier": "test3;webcam-local-02", + "rtspUrl": "rtsp://10.101.1.4:8551/stream", + "snapshotUrl": "http://10.101.1.4:8081/snapshot", + "snapshotInterval": 2000, + "modelUrl": "https://example.com/models/test-model-43.mpta", + "modelId": 43, + "modelName": "test_model" + }, + { + "subscriptionIdentifier": "test4;webcam-local-03", + "rtspUrl": "rtsp://10.101.1.4:8552/stream", + "snapshotUrl": "http://10.101.1.4:8082/snapshot", + "snapshotInterval": 2000, + "modelUrl": "https://example.com/models/test-model-43.mpta", + "modelId": 43, + "modelName": "test_model" + }, + { + "subscriptionIdentifier": "test5;webcam-local-04", + "rtspUrl": "rtsp://10.101.1.4:8553/stream", + "snapshotUrl": "http://10.101.1.4:8083/snapshot", + "snapshotInterval": 2000, + "modelUrl": "https://example.com/models/test-model-43.mpta", + "modelId": 43, + "modelName": "test_model" + } + ] + } + await websocket.send(json.dumps(subscription_list)) + print("โœ… setSubscriptionList sent with 4 cameras (all using model ID 43)") + + # Wait for processing + print(f"\nโณ Waiting for worker to process subscriptions...") + await asyncio.sleep(10) + + # Step 3: Send progression stage (like your log shows) + print(f"\n๐Ÿ“ค Step 3: Sending setProgressionStage...") + progression_stage = { + "type": "setProgressionStage", + "payload": { + "displayIdentifier": "test2", + "progressionStage": "welcome" + } + } + await websocket.send(json.dumps(progression_stage)) + print("โœ… setProgressionStage sent") + + # Wait for any responses + print(f"\n๐Ÿ“ฅ Listening for responses...") + try: + for i in range(3): + response = await asyncio.wait_for(websocket.recv(), timeout=5.0) + print(f"๐Ÿ“ฅ Response {i+1}: {response[:100]}...") + except asyncio.TimeoutError: + print("โฐ No more responses (this is normal)") + + print(f"\nโœ… Complete flow test finished!") + print(f"๐Ÿ“‹ Check worker logs for:") + print(f" - 'IGNORING initial subscribe message' for step 1") + print(f" - 'Unique models to download: [43]' for step 2") + print(f" - Model download progress and success messages") + print(f" - Subscription setup for 4 cameras") + + except ConnectionRefusedError: + print("โŒ Connection refused. Make sure the worker is running with:") + print(" make run-staging") + + except Exception as e: + print(f"โŒ Error: {e}") + +if __name__ == "__main__": + print("๐Ÿงช Complete Subscription Flow Test") + print("This simulates the exact CMS backend behavior you showed in the RX/TX log") + input("Make sure detector worker is running first, then press Enter...") + + asyncio.run(test_complete_flow()) \ No newline at end of file