diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..dde3895 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.DS_Store +*.pyc diff --git a/custom_components/benq_smartboard/benq_smartboard_lib.py b/custom_components/benq_smartboard/benq_smartboard_lib.py index 99eaa25..0210ae5 100644 --- a/custom_components/benq_smartboard/benq_smartboard_lib.py +++ b/custom_components/benq_smartboard/benq_smartboard_lib.py @@ -4,6 +4,7 @@ Implements all protocol commands from the official specification. """ import socket import struct +import logging # New import for logging from typing import Optional, Dict, Any from enum import Enum, unique @@ -158,6 +159,11 @@ class SwitchOnStatus(Enum): # -- The Main BenQSmartBoard Class -------------------------------------------- +def _log_packet(direction: str, data: bytes): + """Log packet data in hex format.""" + hex_data = data.hex().upper() + logging.debug(f"{direction} [{len(data)} bytes]: {hex_data}") + class BenQSmartBoard: """ A client for controlling BenQ Smart Boards via LAN (RS232-over-TCP). @@ -184,25 +190,28 @@ class BenQSmartBoard: def connect(self): """Establish a TCP connection to the Smart Board.""" if self.sock is not None: - # Already connected or trying return - try: + logging.debug(f"Connecting to Smart Board at {self.ip}:{self.port}") # New logging line self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.settimeout(self.timeout) self.sock.connect((self.ip, self.port)) + logging.info("Connection established successfully.") # New logging line except socket.error as e: self.sock = None + logging.exception("Failed to connect to Smart Board.") # New logging line raise ConnectionError(f"Failed to connect: {e}") def disconnect(self): """Close the TCP connection.""" if self.sock: try: + logging.debug("Closing connection to Smart Board.") # New logging line self.sock.close() except socket.error: pass self.sock = None + logging.info("Disconnected from Smart Board.") # New logging line def _ensure_connection(self): """Helper to ensure we are connected; reconnect if needed.""" @@ -237,9 +246,11 @@ class BenQSmartBoard: value.encode(), self.CR ) + _log_packet("Sent", packet) # Log sent packet self.sock.sendall(packet) response = self._receive_response() + logging.debug(f"Received response: {response}") # New logging line if response == b'+': return True elif response == b'-': @@ -249,6 +260,7 @@ class BenQSmartBoard: raise CommandError(f"Unknown response type: {response}") except socket.error as e: self.disconnect() + logging.exception("Socket error during send_command.") # New logging line raise ConnectionError(f"Failed to send command: {e}") def get_command(self, command_code: str) -> str: @@ -266,6 +278,7 @@ class BenQSmartBoard: length = 6 try: + logging.debug(f"Sending get command: code={command_code}") # New logging line packet = struct.pack( ">B2s1s2s1s", length, @@ -274,14 +287,17 @@ class BenQSmartBoard: command_code.encode(), self.CR ) + _log_packet("Sent", packet) # Log sent packet self.sock.sendall(packet) response, value = self._receive_get_response() + logging.debug(f"Received get response: {response}, value: {value}") # New logging line if response == b'r': return value.decode() raise CommandError(f"Unexpected get response type: {response}") except socket.error as e: self.disconnect() + logging.exception("Socket error during get_command.") # New logging line raise ConnectionError(f"Failed to send get command: {e}") def _receive_response(self): @@ -292,8 +308,10 @@ class BenQSmartBoard: """ try: resp = self.sock.recv(5) + _log_packet("Received", resp) # Log received data if len(resp) < 5: raise CommandError("Incomplete response from Smart Board (set).") + logging.debug(f"Raw set response data: {resp}") # New logging line # parse # >B2s1s1s # But effectively, we only need the command_type at index 3 @@ -313,14 +331,17 @@ class BenQSmartBoard: """ try: header = self.sock.recv(5) + _log_packet("Received", header) # Log header if len(header) < 5: raise CommandError("Incomplete response from Smart Board (get header).") + logging.debug(f"Raw get response header: {header}") # New logging line length = header[0] command_type = header[3:4] # e.g. b'r' # Now read next 3 bytes if it's a valid get reply if command_type == b'r': value = self.sock.recv(3) + _log_packet("Received", value) # Log value bytes if len(value) < 3: raise CommandError("Incomplete 3-byte value in get response.") return command_type, value diff --git a/custom_components/benq_smartboard/nolib_test.py b/custom_components/benq_smartboard/nolib_test.py new file mode 100644 index 0000000..e0d4ff3 --- /dev/null +++ b/custom_components/benq_smartboard/nolib_test.py @@ -0,0 +1,68 @@ +import socket +import struct +import time +import logging + +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') + +# Configuration for the Smart Board: +IP_ADDRESS = "10.10.4.173" # Replace with your board's IP +PORT = 4660 +TIMEOUT = 5.0 + +TV_ID = b'01' +CR = b'\x0D' + +def log_packet(direction: str, data: bytes): + hex_data = data.hex().upper() + logging.debug(f"{direction} [{len(data)} bytes]: {hex_data}") + +def main(): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(TIMEOUT) + try: + logging.debug(f"Connecting to {IP_ADDRESS}:{PORT}") + sock.connect((IP_ADDRESS, PORT)) + logging.info("Connected.") + + # Build "get_power" command + # Protocol: length=5, TV_ID, 'g' (get command), command_code (1 byte) and CR. + command_code = b'\x6C' # Power get command + length = 5 + packet = struct.pack(">B2s1s1s1s", length, TV_ID, b'g', command_code, CR) + log_packet("Sent", packet) + + sock.sendall(packet) + time.sleep(1) # Wait a bit for response + + # Receive header first (expected 5 bytes): + header = sock.recv(5) + log_packet("Received", header) + if len(header) < 5: + logging.error("Incomplete header received.") + return + + command_type = header[3:4] + if command_type != b'r': + logging.error(f"Unexpected response type: {command_type}") + return + + # Wait a bit before reading the value bytes. + time.sleep(0.1) + value = sock.recv(3) + log_packet("Received", value) + if len(value) < 3: + logging.error("Incomplete value bytes received.") + return + + power_state = value.decode() + logging.info(f"Power state: {power_state}") + except Exception as ex: + logging.exception(f"Error during manual test: {ex}") + finally: + logging.debug("Closing socket.") + sock.close() + logging.info("Connection closed.") + +if __name__ == '__main__': + main() diff --git a/custom_components/benq_smartboard/test.py b/custom_components/benq_smartboard/test.py new file mode 100644 index 0000000..c6b09cd --- /dev/null +++ b/custom_components/benq_smartboard/test.py @@ -0,0 +1,41 @@ +import logging +from benq_smartboard_lib import BenQSmartBoard, ConnectionError, CommandError, PowerState + +# Configure detailed debug logging. +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') + +# Configurable parameters; update these variables with your Smart Board details. +IP_ADDRESS = "10.10.4.173" # Replace with your Smart Board IP. +PORT = 4660 + +def run_test(): + board = BenQSmartBoard(ip=IP_ADDRESS, port=PORT) + try: + logging.debug('Attempting to connect to the Smart Board.') + board.connect() + logging.info('Connected successfully.') + + # Get the current power state. + logging.debug('Sending get_power command.') + current_power = board.get_power() + logging.info(f'Current power state: {current_power}') + + # Toggle power state for test purposes. + logging.debug('Toggling power state.') + if current_power.strip() == PowerState.OFF.value: + logging.info("Setting power state to ON.") + success = board.set_power(PowerState.ON) + else: + logging.info("Setting power state to OFF.") + success = board.set_power(PowerState.OFF) + logging.info(f'set_power success: {success}') + + except (ConnectionError, CommandError, Exception) as ex: + logging.exception(f'Exception during Smart Board interaction: {ex}') + finally: + logging.debug('Disconnecting from Smart Board.') + board.disconnect() + logging.info('Disconnected.') + +# Execute the test. +run_test()