From 8222e82dd7a8608ddd63b308b09a99b0a65d67f5 Mon Sep 17 00:00:00 2001 From: ziesorx Date: Tue, 23 Sep 2025 15:44:09 +0700 Subject: [PATCH] feat: update rxtx log --- app.py | 4 ++-- core/communication/state.py | 14 ++++++------ core/communication/websocket.py | 38 +++++++++++++++------------------ test_protocol.py | 4 ++-- 4 files changed, 28 insertions(+), 32 deletions(-) diff --git a/app.py b/app.py index ce979d2..8c8a194 100644 --- a/app.py +++ b/app.py @@ -16,7 +16,7 @@ from core.communication.state import worker_state # Configure logging logging.basicConfig( - level=logging.INFO, + level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[ logging.FileHandler("detector_worker.log"), @@ -101,7 +101,7 @@ async def websocket_handler(websocket: WebSocket): Handles all protocol messages according to worker.md specification. """ client_info = f"{websocket.client.host}:{websocket.client.port}" if websocket.client else "unknown" - logger.info(f"New WebSocket connection request from {client_info}") + logger.info(f"[RX ← Backend] New WebSocket connection request from {client_info}") try: await websocket_endpoint(websocket) diff --git a/core/communication/state.py b/core/communication/state.py index 4992b42..b60f341 100644 --- a/core/communication/state.py +++ b/core/communication/state.py @@ -58,11 +58,11 @@ class WorkerState: updated = current_ids & new_ids if added: - logger.info(f"Adding subscriptions: {added}") + logger.info(f"[State Update] Adding subscriptions: {added}") if removed: - logger.info(f"Removing subscriptions: {removed}") + logger.info(f"[State Update] Removing subscriptions: {removed}") if updated: - logger.info(f"Updating subscriptions: {updated}") + logger.info(f"[State Update] Updating subscriptions: {updated}") # Replace entire subscription dict self.subscriptions = new_sub_dict @@ -91,10 +91,10 @@ class WorkerState: with self._lock: if session_id is None: self.session_ids.pop(display_identifier, None) - logger.info(f"Cleared session ID for display {display_identifier}") + logger.info(f"[State Update] Cleared session ID for display {display_identifier}") else: self.session_ids[display_identifier] = session_id - logger.info(f"Set session ID {session_id} for display {display_identifier}") + logger.info(f"[State Update] Set session ID {session_id} for display {display_identifier}") def get_session_id(self, display_identifier: str) -> Optional[int]: """Get session ID for display identifier.""" @@ -121,10 +121,10 @@ class WorkerState: with self._lock: if stage is None: self.progression_stages.pop(display_identifier, None) - logger.info(f"Cleared progression stage for display {display_identifier}") + logger.info(f"[State Update] Cleared progression stage for display {display_identifier}") else: self.progression_stages[display_identifier] = stage - logger.info(f"Set progression stage '{stage}' for display {display_identifier}") + logger.info(f"[State Update] Set progression stage '{stage}' for display {display_identifier}") def get_progression_stage(self, display_identifier: str) -> Optional[str]: """Get progression stage for display identifier.""" diff --git a/core/communication/websocket.py b/core/communication/websocket.py index c7e14c7..1ac80d9 100644 --- a/core/communication/websocket.py +++ b/core/communication/websocket.py @@ -95,7 +95,7 @@ class WebSocketHandler: ) await self._send_message(state_report) - logger.info(f"Sent immediate stateReport: CPU {cpu_usage:.1f}%, Memory {memory_usage:.1f}%, " + logger.info(f"[TX → Backend] stateReport: CPU {cpu_usage:.1f}%, Memory {memory_usage:.1f}%, " f"GPU {gpu_usage or 'N/A'}, {len(camera_connections)} cameras") except Exception as e: @@ -122,8 +122,8 @@ class WebSocketHandler: ) await self._send_message(state_report) - logger.debug(f"Sent heartbeat: CPU {cpu_usage:.1f}%, Memory {memory_usage:.1f}%, " - f"GPU {gpu_usage or 'N/A'}, {len(camera_connections)} cameras") + logger.info(f"[TX → Backend] Heartbeat: CPU {cpu_usage:.1f}%, Memory {memory_usage:.1f}%, " + f"GPU {gpu_usage or 'N/A'}, {len(camera_connections)} cameras") await asyncio.sleep(HEARTBEAT_INTERVAL) @@ -136,7 +136,7 @@ class WebSocketHandler: while self.connected: try: raw_message = await self.websocket.receive_text() - logger.info(f"Received message: {raw_message}") + logger.info(f"[RX ← Backend] {raw_message}") # Parse incoming message message = parse_incoming_message(raw_message) @@ -179,7 +179,7 @@ class WebSocketHandler: async def _handle_set_subscription_list(self, message: SetSubscriptionListMessage) -> None: """Handle setSubscriptionList message for declarative subscription management.""" - logger.info(f"Processing setSubscriptionList with {len(message.subscriptions)} subscriptions") + logger.info(f"[RX Processing] setSubscriptionList with {len(message.subscriptions)} subscriptions") # Update worker state with new subscriptions worker_state.set_subscriptions(message.subscriptions) @@ -203,7 +203,7 @@ class WebSocketHandler: display_identifier = message.payload.displayIdentifier session_id = message.payload.sessionId - logger.info(f"Setting session ID for display {display_identifier}: {session_id}") + logger.info(f"[RX Processing] setSessionId for display {display_identifier}: {session_id}") # Update worker state worker_state.set_session_id(display_identifier, session_id) @@ -213,14 +213,14 @@ class WebSocketHandler: display_identifier = message.payload.displayIdentifier stage = message.payload.progressionStage - logger.info(f"Setting progression stage for display {display_identifier}: {stage}") + logger.info(f"[RX Processing] setProgressionStage for display {display_identifier}: {stage}") # Update worker state worker_state.set_progression_stage(display_identifier, stage) async def _handle_request_state(self, message: RequestStateMessage) -> None: """Handle requestState message by sending immediate state report.""" - logger.debug("Received requestState, sending immediate state report") + logger.debug("[RX Processing] requestState - sending immediate state report") # Collect metrics and send state report cpu_usage = SystemMetrics.get_cpu_usage() @@ -242,7 +242,7 @@ class WebSocketHandler: async def _handle_patch_session_result(self, message: PatchSessionResultMessage) -> None: """Handle patchSessionResult message.""" payload = message.payload - logger.info(f"Received patch session result for session {payload.sessionId}: " + logger.info(f"[RX Processing] patchSessionResult for session {payload.sessionId}: " f"success={payload.success}, message='{payload.message}'") # TODO: Handle patch session result if needed @@ -257,11 +257,11 @@ class WebSocketHandler: try: json_message = serialize_outgoing_message(message) await self.websocket.send_text(json_message) - # Don't log full message for heartbeats to avoid spam, just type + # Log heartbeats at INFO level with simplified format if hasattr(message, 'type') and message.type == 'stateReport': - logger.debug(f"Sent message: {message.type}") + logger.info(f"[TX → Backend] {message.type}") else: - logger.debug(f"Sent message: {json_message}") + logger.info(f"[TX → Backend] {json_message}") except Exception as e: logger.error(f"Failed to send WebSocket message: {e}") raise @@ -277,15 +277,11 @@ class WebSocketHandler: # Get current subscriptions subscriptions = worker_state.get_all_subscriptions() - if subscriptions: - logger.debug(f"Stream processor running with {len(subscriptions)} active subscriptions") - # TODO: Phase 2 - Add actual frame processing logic here - # This will include: - # - Frame reading from RTSP/HTTP streams - # - Model inference using loaded pipelines - # - Detection result sending via WebSocket - else: - logger.debug("Stream processor running with no active subscriptions") + # TODO: Phase 2 - Add actual frame processing logic here + # This will include: + # - Frame reading from RTSP/HTTP streams + # - Model inference using loaded pipelines + # - Detection result sending via WebSocket # Sleep to prevent excessive CPU usage (similar to old poll_interval) await asyncio.sleep(0.1) # 100ms polling interval diff --git a/test_protocol.py b/test_protocol.py index 74af7d8..6b32fd8 100644 --- a/test_protocol.py +++ b/test_protocol.py @@ -9,7 +9,7 @@ import time async def test_protocol(): """Test the worker protocol implementation""" - uri = "ws://localhost:8000" + uri = "ws://localhost:8001" try: async with websockets.connect(uri) as websocket: @@ -119,7 +119,7 @@ async def test_protocol(): except Exception as e: print(f"✗ Connection failed: {e}") - print("Make sure the worker is running on localhost:8000") + print("Make sure the worker is running on localhost:8001") if __name__ == "__main__": asyncio.run(test_protocol()) \ No newline at end of file