From 2e6594d3d47fc6f6776b6a49c694e2539ce1ac90 Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Fri, 6 Jun 2025 15:57:10 +0700 Subject: [PATCH] sercat usb --- .../benq_smartboard/benq_smartboard_lib.py | 165 ++++++++++++---- .../benq_smartboard/serial_test.py | 181 ++++++++++++++++++ 2 files changed, 310 insertions(+), 36 deletions(-) create mode 100644 custom_components/benq_smartboard/serial_test.py diff --git a/custom_components/benq_smartboard/benq_smartboard_lib.py b/custom_components/benq_smartboard/benq_smartboard_lib.py index 0210ae5..0be0d27 100644 --- a/custom_components/benq_smartboard/benq_smartboard_lib.py +++ b/custom_components/benq_smartboard/benq_smartboard_lib.py @@ -1,11 +1,12 @@ """ -A comprehensive Python library for controlling BenQ Smart Board via RS232-over-LAN. +A comprehensive Python library for controlling BenQ Smart Board via RS232-over-LAN or USB Serial. Implements all protocol commands from the official specification. """ import socket import struct -import logging # New import for logging -from typing import Optional, Dict, Any +import logging +import serial # New import for USB Serial support +from typing import Optional, Dict, Any, Union from enum import Enum, unique @@ -166,57 +167,146 @@ def _log_packet(direction: str, data: bytes): class BenQSmartBoard: """ - A client for controlling BenQ Smart Boards via LAN (RS232-over-TCP). + A client for controlling BenQ Smart Boards via LAN (RS232-over-TCP) or USB Serial. Implements all commands from the official protocol specification. """ DEFAULT_PORT = 4660 + DEFAULT_BAUDRATE = 115200 TV_ID = b'01' CR = b'\x0D' - def __init__(self, ip: str, port: int = DEFAULT_PORT, timeout: float = 5.0): + def __init__(self, + ip: str = None, + port: int = DEFAULT_PORT, + timeout: float = 5.0, + connection_type: ConnectionType = ConnectionType.TCP, + serial_port: str = None, + baudrate: int = DEFAULT_BAUDRATE): """ - :param ip: IP address of the BenQ Smart Board + :param ip: IP address of the BenQ Smart Board (required for TCP) :param port: TCP port, default 4660 - :param timeout: socket timeout in seconds + :param timeout: socket/serial timeout in seconds + :param connection_type: ConnectionType.TCP or ConnectionType.SERIAL + :param serial_port: Serial port path (required for SERIAL, e.g., 'COM3', '/dev/ttyUSB0') + :param baudrate: Serial baudrate, default 115200 """ + self.connection_type = connection_type + self.timeout = timeout + + # TCP parameters self.ip = ip self.port = port - self.timeout = timeout self.sock: Optional[socket.socket] = None + + # Serial parameters + self.serial_port = serial_port + self.baudrate = baudrate + self.ser: Optional[serial.Serial] = None # -- Connection Management ------------------------------------------------- def connect(self): + """Establish connection to the Smart Board (TCP or Serial).""" + if self.connection_type == ConnectionType.TCP: + self._connect_tcp() + elif self.connection_type == ConnectionType.SERIAL: + self._connect_serial() + else: + raise ValueError(f"Unsupported connection type: {self.connection_type}") + + def _connect_tcp(self): """Establish a TCP connection to the Smart Board.""" if self.sock is not None: return + if not self.ip: + raise ValueError("IP address is required for TCP connection") try: - logging.debug(f"Connecting to Smart Board at {self.ip}:{self.port}") # New logging line + logging.debug(f"Connecting to Smart Board at {self.ip}:{self.port}") self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.settimeout(self.timeout) self.sock.connect((self.ip, self.port)) - logging.info("Connection established successfully.") # New logging line + logging.info("TCP connection established successfully.") except socket.error as e: self.sock = None - logging.exception("Failed to connect to Smart Board.") # New logging line - raise ConnectionError(f"Failed to connect: {e}") + logging.exception("Failed to connect to Smart Board via TCP.") + raise ConnectionError(f"Failed to connect via TCP: {e}") + + def _connect_serial(self): + """Establish a Serial connection to the Smart Board.""" + if self.ser is not None and self.ser.is_open: + return + if not self.serial_port: + raise ValueError("Serial port is required for Serial connection") + try: + logging.debug(f"Connecting to Smart Board at {self.serial_port}:{self.baudrate}") + self.ser = serial.Serial( + port=self.serial_port, + baudrate=self.baudrate, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + timeout=self.timeout + ) + logging.info("Serial connection established successfully.") + except serial.SerialException as e: + self.ser = None + logging.exception("Failed to connect to Smart Board via Serial.") + raise ConnectionError(f"Failed to connect via Serial: {e}") def disconnect(self): + """Close the connection.""" + if self.connection_type == ConnectionType.TCP: + self._disconnect_tcp() + elif self.connection_type == ConnectionType.SERIAL: + self._disconnect_serial() + + def _disconnect_tcp(self): """Close the TCP connection.""" if self.sock: try: - logging.debug("Closing connection to Smart Board.") # New logging line + logging.debug("Closing TCP connection to Smart Board.") self.sock.close() except socket.error: pass self.sock = None - logging.info("Disconnected from Smart Board.") # New logging line + logging.info("Disconnected from Smart Board via TCP.") + + def _disconnect_serial(self): + """Close the Serial connection.""" + if self.ser and self.ser.is_open: + try: + logging.debug("Closing Serial connection to Smart Board.") + self.ser.close() + except serial.SerialException: + pass + self.ser = None + logging.info("Disconnected from Smart Board via Serial.") def _ensure_connection(self): """Helper to ensure we are connected; reconnect if needed.""" - if not self.sock: + if self.connection_type == ConnectionType.TCP and not self.sock: self.connect() + elif self.connection_type == ConnectionType.SERIAL and (not self.ser or not self.ser.is_open): + self.connect() + + def _send_data(self, data: bytes): + """Send data via the active connection.""" + if self.connection_type == ConnectionType.TCP: + self.sock.sendall(data) + elif self.connection_type == ConnectionType.SERIAL: + self.ser.write(data) + else: + raise ValueError(f"Unsupported connection type: {self.connection_type}") + + def _receive_data(self, size: int) -> bytes: + """Receive data via the active connection.""" + if self.connection_type == ConnectionType.TCP: + return self.sock.recv(size) + elif self.connection_type == ConnectionType.SERIAL: + return self.ser.read(size) + else: + raise ValueError(f"Unsupported connection type: {self.connection_type}") # -- Core Send/Receive ----------------------------------------------------- @@ -246,11 +336,11 @@ class BenQSmartBoard: value.encode(), self.CR ) - _log_packet("Sent", packet) # Log sent packet - self.sock.sendall(packet) + _log_packet("Sent", packet) + self._send_data(packet) response = self._receive_response() - logging.debug(f"Received response: {response}") # New logging line + logging.debug(f"Received response: {response}") if response == b'+': return True elif response == b'-': @@ -258,9 +348,9 @@ class BenQSmartBoard: else: # Unknown response type raise CommandError(f"Unknown response type: {response}") - except socket.error as e: + except (socket.error, serial.SerialException) as e: self.disconnect() - logging.exception("Socket error during send_command.") # New logging line + logging.exception("Communication error during send_command.") raise ConnectionError(f"Failed to send command: {e}") def get_command(self, command_code: str) -> str: @@ -278,7 +368,7 @@ class BenQSmartBoard: length = 6 try: - logging.debug(f"Sending get command: code={command_code}") # New logging line + logging.debug(f"Sending get command: code={command_code}") packet = struct.pack( ">B2s1s2s1s", length, @@ -287,17 +377,17 @@ class BenQSmartBoard: command_code.encode(), self.CR ) - _log_packet("Sent", packet) # Log sent packet - self.sock.sendall(packet) + _log_packet("Sent", packet) + self._send_data(packet) response, value = self._receive_get_response() - logging.debug(f"Received get response: {response}, value: {value}") # New logging line + logging.debug(f"Received get response: {response}, value: {value}") if response == b'r': return value.decode() raise CommandError(f"Unexpected get response type: {response}") - except socket.error as e: + except (socket.error, serial.SerialException) as e: self.disconnect() - logging.exception("Socket error during get_command.") # New logging line + logging.exception("Communication error during get_command.") raise ConnectionError(f"Failed to send get command: {e}") def _receive_response(self): @@ -305,20 +395,23 @@ class BenQSmartBoard: Receive the standard 5-byte response for a Set command: [ length(1) | ID(2) | command_type(1) | CR(1) ] command_type should be '+' or '-'. + + We interpret the protocol to have an additional 3 bytes for the value if command_type='r'. + Return (command_type, value_bytes). """ try: - resp = self.sock.recv(5) - _log_packet("Received", resp) # Log received data + resp = self._receive_data(5) + _log_packet("Received", resp) if len(resp) < 5: raise CommandError("Incomplete response from Smart Board (set).") - logging.debug(f"Raw set response data: {resp}") # New logging line + logging.debug(f"Raw set response data: {resp}") # parse # >B2s1s1s # But effectively, we only need the command_type at index 3 length = resp[0] command_type = resp[3:4] return command_type - except socket.timeout: + except (socket.timeout, serial.SerialTimeoutException): raise ConnectionError("Timed out waiting for response (set).") def _receive_get_response(self): @@ -330,25 +423,25 @@ class BenQSmartBoard: Return (command_type, value_bytes). """ try: - header = self.sock.recv(5) - _log_packet("Received", header) # Log header + header = self._receive_data(5) + _log_packet("Received", header) if len(header) < 5: raise CommandError("Incomplete response from Smart Board (get header).") - logging.debug(f"Raw get response header: {header}") # New logging line + logging.debug(f"Raw get response header: {header}") length = header[0] command_type = header[3:4] # e.g. b'r' # Now read next 3 bytes if it's a valid get reply if command_type == b'r': - value = self.sock.recv(3) - _log_packet("Received", value) # Log value bytes + value = self._receive_data(3) + _log_packet("Received", value) if len(value) < 3: raise CommandError("Incomplete 3-byte value in get response.") return command_type, value else: # No further data expected if it's not 'r' return command_type, b'' - except socket.timeout: + except (socket.timeout, serial.SerialTimeoutException): raise ConnectionError("Timed out waiting for response (get).") # -- Full Set of Commands (from your Protocol) ----------------------------- diff --git a/custom_components/benq_smartboard/serial_test.py b/custom_components/benq_smartboard/serial_test.py new file mode 100644 index 0000000..023d051 --- /dev/null +++ b/custom_components/benq_smartboard/serial_test.py @@ -0,0 +1,181 @@ +#!/usr/bin/env python3 +""" +Test script for BenQ Smart Board library using USB Serial connection. +""" + +import logging +import time +from benq_smartboard_lib import ( + BenQSmartBoard, + ConnectionType, + PowerState, + VideoSource, + BenQSmartBoardError +) + +# Configure logging +logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s - %(levelname)s - %(message)s' +) + +def test_serial_connection(): + """Test the serial connection functionality.""" + + # Configure your serial port here + SERIAL_PORT = 'COM3' # Windows example, use '/dev/ttyUSB0' for Linux + BAUDRATE = 115200 + + print(f"Testing BenQ Smart Board via Serial: {SERIAL_PORT} @ {BAUDRATE}") + + # Create Smart Board instance with Serial connection + board = BenQSmartBoard( + connection_type=ConnectionType.SERIAL, + serial_port=SERIAL_PORT, + baudrate=BAUDRATE, + timeout=5.0 + ) + + try: + # Connect to the Smart Board + print("Connecting to Smart Board...") + board.connect() + print("Connected successfully!") + + # Test basic commands + print("\n=== Testing Basic Commands ===") + + # Get current power state + try: + power_state = board.get_power() + print(f"Current power state: {power_state}") + except Exception as e: + print(f"Failed to get power state: {e}") + + # Get current video source + try: + video_source = board.get_video_source() + print(f"Current video source: {video_source}") + except Exception as e: + print(f"Failed to get video source: {e}") + + # Get current volume + try: + volume = board.get_volume() + print(f"Current volume: {volume}") + except Exception as e: + print(f"Failed to get volume: {e}") + + # Get current brightness + try: + brightness = board.get_brightness() + print(f"Current brightness: {brightness}") + except Exception as e: + print(f"Failed to get brightness: {e}") + + print("\n=== Testing Set Commands ===") + + # Test setting volume + try: + print("Setting volume to 50...") + success = board.set_volume(50) + print(f"Set volume result: {'Success' if success else 'Failed'}") + time.sleep(1) + + # Verify the change + new_volume = board.get_volume() + print(f"New volume: {new_volume}") + except Exception as e: + print(f"Failed to set volume: {e}") + + # Test setting brightness + try: + print("Setting brightness to 80...") + success = board.set_brightness(80) + print(f"Set brightness result: {'Success' if success else 'Failed'}") + time.sleep(1) + + # Verify the change + new_brightness = board.get_brightness() + print(f"New brightness: {new_brightness}") + except Exception as e: + print(f"Failed to set brightness: {e}") + + # Test video source switching + try: + print("Switching to HDMI1...") + success = board.set_video_source(VideoSource.HDMI1) + print(f"Set video source result: {'Success' if success else 'Failed'}") + time.sleep(2) + + # Verify the change + new_source = board.get_video_source() + print(f"New video source: {new_source}") + except Exception as e: + print(f"Failed to set video source: {e}") + + print("\n=== Testing Advanced Commands ===") + + # Test RTC reading + try: + rtc_info = board.get_rtc() + print(f"RTC Info: {rtc_info}") + except Exception as e: + print(f"Failed to get RTC info: {e}") + + # Test mute functionality + try: + print("Testing mute...") + success = board.set_mute(True) + print(f"Mute ON result: {'Success' if success else 'Failed'}") + time.sleep(2) + + success = board.set_mute(False) + print(f"Mute OFF result: {'Success' if success else 'Failed'}") + except Exception as e: + print(f"Failed to test mute: {e}") + + print("\n=== Test Completed Successfully ===") + + except BenQSmartBoardError as e: + print(f"Smart Board Error: {e}") + except Exception as e: + print(f"Unexpected error: {e}") + logging.exception("Unexpected error occurred") + finally: + # Always disconnect + print("Disconnecting...") + board.disconnect() + print("Disconnected.") + +def list_serial_ports(): + """List available serial ports.""" + try: + import serial.tools.list_ports + ports = serial.tools.list_ports.comports() + print("Available serial ports:") + for port in ports: + print(f" {port.device} - {port.description}") + return [port.device for port in ports] + except ImportError: + print("pyserial tools not available. Install with: pip install pyserial") + return [] + +if __name__ == "__main__": + print("BenQ Smart Board Serial Test") + print("=" * 40) + + # List available ports + available_ports = list_serial_ports() + + if not available_ports: + print("No serial ports found or pyserial not installed.") + print("Make sure to:") + print("1. Install pyserial: pip install pyserial") + print("2. Connect your BenQ Smart Board via USB") + print("3. Update the SERIAL_PORT variable in this script") + else: + print(f"\nFound {len(available_ports)} serial port(s)") + + print("\nStarting test...") + test_serial_connection()