sercat usb

This commit is contained in:
Siwat Sirichai 2025-06-06 15:57:10 +07:00
parent b0790f0068
commit 2e6594d3d4
2 changed files with 310 additions and 36 deletions

View file

@ -1,11 +1,12 @@
"""
A comprehensive Python library for controlling BenQ Smart Board via RS232-over-LAN.
A comprehensive Python library for controlling BenQ Smart Board via RS232-over-LAN or USB Serial.
Implements all protocol commands from the official specification.
"""
import socket
import struct
import logging # New import for logging
from typing import Optional, Dict, Any
import logging
import serial # New import for USB Serial support
from typing import Optional, Dict, Any, Union
from enum import Enum, unique
@ -166,57 +167,146 @@ def _log_packet(direction: str, data: bytes):
class BenQSmartBoard:
"""
A client for controlling BenQ Smart Boards via LAN (RS232-over-TCP).
A client for controlling BenQ Smart Boards via LAN (RS232-over-TCP) or USB Serial.
Implements all commands from the official protocol specification.
"""
DEFAULT_PORT = 4660
DEFAULT_BAUDRATE = 115200
TV_ID = b'01'
CR = b'\x0D'
def __init__(self, ip: str, port: int = DEFAULT_PORT, timeout: float = 5.0):
def __init__(self,
ip: str = None,
port: int = DEFAULT_PORT,
timeout: float = 5.0,
connection_type: ConnectionType = ConnectionType.TCP,
serial_port: str = None,
baudrate: int = DEFAULT_BAUDRATE):
"""
:param ip: IP address of the BenQ Smart Board
:param ip: IP address of the BenQ Smart Board (required for TCP)
:param port: TCP port, default 4660
:param timeout: socket timeout in seconds
:param timeout: socket/serial timeout in seconds
:param connection_type: ConnectionType.TCP or ConnectionType.SERIAL
:param serial_port: Serial port path (required for SERIAL, e.g., 'COM3', '/dev/ttyUSB0')
:param baudrate: Serial baudrate, default 115200
"""
self.connection_type = connection_type
self.timeout = timeout
# TCP parameters
self.ip = ip
self.port = port
self.timeout = timeout
self.sock: Optional[socket.socket] = None
# Serial parameters
self.serial_port = serial_port
self.baudrate = baudrate
self.ser: Optional[serial.Serial] = None
# -- Connection Management -------------------------------------------------
def connect(self):
"""Establish connection to the Smart Board (TCP or Serial)."""
if self.connection_type == ConnectionType.TCP:
self._connect_tcp()
elif self.connection_type == ConnectionType.SERIAL:
self._connect_serial()
else:
raise ValueError(f"Unsupported connection type: {self.connection_type}")
def _connect_tcp(self):
"""Establish a TCP connection to the Smart Board."""
if self.sock is not None:
return
if not self.ip:
raise ValueError("IP address is required for TCP connection")
try:
logging.debug(f"Connecting to Smart Board at {self.ip}:{self.port}") # New logging line
logging.debug(f"Connecting to Smart Board at {self.ip}:{self.port}")
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(self.timeout)
self.sock.connect((self.ip, self.port))
logging.info("Connection established successfully.") # New logging line
logging.info("TCP connection established successfully.")
except socket.error as e:
self.sock = None
logging.exception("Failed to connect to Smart Board.") # New logging line
raise ConnectionError(f"Failed to connect: {e}")
logging.exception("Failed to connect to Smart Board via TCP.")
raise ConnectionError(f"Failed to connect via TCP: {e}")
def _connect_serial(self):
"""Establish a Serial connection to the Smart Board."""
if self.ser is not None and self.ser.is_open:
return
if not self.serial_port:
raise ValueError("Serial port is required for Serial connection")
try:
logging.debug(f"Connecting to Smart Board at {self.serial_port}:{self.baudrate}")
self.ser = serial.Serial(
port=self.serial_port,
baudrate=self.baudrate,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=self.timeout
)
logging.info("Serial connection established successfully.")
except serial.SerialException as e:
self.ser = None
logging.exception("Failed to connect to Smart Board via Serial.")
raise ConnectionError(f"Failed to connect via Serial: {e}")
def disconnect(self):
"""Close the connection."""
if self.connection_type == ConnectionType.TCP:
self._disconnect_tcp()
elif self.connection_type == ConnectionType.SERIAL:
self._disconnect_serial()
def _disconnect_tcp(self):
"""Close the TCP connection."""
if self.sock:
try:
logging.debug("Closing connection to Smart Board.") # New logging line
logging.debug("Closing TCP connection to Smart Board.")
self.sock.close()
except socket.error:
pass
self.sock = None
logging.info("Disconnected from Smart Board.") # New logging line
logging.info("Disconnected from Smart Board via TCP.")
def _disconnect_serial(self):
"""Close the Serial connection."""
if self.ser and self.ser.is_open:
try:
logging.debug("Closing Serial connection to Smart Board.")
self.ser.close()
except serial.SerialException:
pass
self.ser = None
logging.info("Disconnected from Smart Board via Serial.")
def _ensure_connection(self):
"""Helper to ensure we are connected; reconnect if needed."""
if not self.sock:
if self.connection_type == ConnectionType.TCP and not self.sock:
self.connect()
elif self.connection_type == ConnectionType.SERIAL and (not self.ser or not self.ser.is_open):
self.connect()
def _send_data(self, data: bytes):
"""Send data via the active connection."""
if self.connection_type == ConnectionType.TCP:
self.sock.sendall(data)
elif self.connection_type == ConnectionType.SERIAL:
self.ser.write(data)
else:
raise ValueError(f"Unsupported connection type: {self.connection_type}")
def _receive_data(self, size: int) -> bytes:
"""Receive data via the active connection."""
if self.connection_type == ConnectionType.TCP:
return self.sock.recv(size)
elif self.connection_type == ConnectionType.SERIAL:
return self.ser.read(size)
else:
raise ValueError(f"Unsupported connection type: {self.connection_type}")
# -- Core Send/Receive -----------------------------------------------------
@ -246,11 +336,11 @@ class BenQSmartBoard:
value.encode(),
self.CR
)
_log_packet("Sent", packet) # Log sent packet
self.sock.sendall(packet)
_log_packet("Sent", packet)
self._send_data(packet)
response = self._receive_response()
logging.debug(f"Received response: {response}") # New logging line
logging.debug(f"Received response: {response}")
if response == b'+':
return True
elif response == b'-':
@ -258,9 +348,9 @@ class BenQSmartBoard:
else:
# Unknown response type
raise CommandError(f"Unknown response type: {response}")
except socket.error as e:
except (socket.error, serial.SerialException) as e:
self.disconnect()
logging.exception("Socket error during send_command.") # New logging line
logging.exception("Communication error during send_command.")
raise ConnectionError(f"Failed to send command: {e}")
def get_command(self, command_code: str) -> str:
@ -278,7 +368,7 @@ class BenQSmartBoard:
length = 6
try:
logging.debug(f"Sending get command: code={command_code}") # New logging line
logging.debug(f"Sending get command: code={command_code}")
packet = struct.pack(
">B2s1s2s1s",
length,
@ -287,17 +377,17 @@ class BenQSmartBoard:
command_code.encode(),
self.CR
)
_log_packet("Sent", packet) # Log sent packet
self.sock.sendall(packet)
_log_packet("Sent", packet)
self._send_data(packet)
response, value = self._receive_get_response()
logging.debug(f"Received get response: {response}, value: {value}") # New logging line
logging.debug(f"Received get response: {response}, value: {value}")
if response == b'r':
return value.decode()
raise CommandError(f"Unexpected get response type: {response}")
except socket.error as e:
except (socket.error, serial.SerialException) as e:
self.disconnect()
logging.exception("Socket error during get_command.") # New logging line
logging.exception("Communication error during get_command.")
raise ConnectionError(f"Failed to send get command: {e}")
def _receive_response(self):
@ -305,20 +395,23 @@ class BenQSmartBoard:
Receive the standard 5-byte response for a Set command:
[ length(1) | ID(2) | command_type(1) | CR(1) ]
command_type should be '+' or '-'.
We interpret the protocol to have an additional 3 bytes for the value if command_type='r'.
Return (command_type, value_bytes).
"""
try:
resp = self.sock.recv(5)
_log_packet("Received", resp) # Log received data
resp = self._receive_data(5)
_log_packet("Received", resp)
if len(resp) < 5:
raise CommandError("Incomplete response from Smart Board (set).")
logging.debug(f"Raw set response data: {resp}") # New logging line
logging.debug(f"Raw set response data: {resp}")
# parse
# >B2s1s1s
# But effectively, we only need the command_type at index 3
length = resp[0]
command_type = resp[3:4]
return command_type
except socket.timeout:
except (socket.timeout, serial.SerialTimeoutException):
raise ConnectionError("Timed out waiting for response (set).")
def _receive_get_response(self):
@ -330,25 +423,25 @@ class BenQSmartBoard:
Return (command_type, value_bytes).
"""
try:
header = self.sock.recv(5)
_log_packet("Received", header) # Log header
header = self._receive_data(5)
_log_packet("Received", header)
if len(header) < 5:
raise CommandError("Incomplete response from Smart Board (get header).")
logging.debug(f"Raw get response header: {header}") # New logging line
logging.debug(f"Raw get response header: {header}")
length = header[0]
command_type = header[3:4] # e.g. b'r'
# Now read next 3 bytes if it's a valid get reply
if command_type == b'r':
value = self.sock.recv(3)
_log_packet("Received", value) # Log value bytes
value = self._receive_data(3)
_log_packet("Received", value)
if len(value) < 3:
raise CommandError("Incomplete 3-byte value in get response.")
return command_type, value
else:
# No further data expected if it's not 'r'
return command_type, b''
except socket.timeout:
except (socket.timeout, serial.SerialTimeoutException):
raise ConnectionError("Timed out waiting for response (get).")
# -- Full Set of Commands (from your Protocol) -----------------------------

View file

@ -0,0 +1,181 @@
#!/usr/bin/env python3
"""
Test script for BenQ Smart Board library using USB Serial connection.
"""
import logging
import time
from benq_smartboard_lib import (
BenQSmartBoard,
ConnectionType,
PowerState,
VideoSource,
BenQSmartBoardError
)
# Configure logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def test_serial_connection():
"""Test the serial connection functionality."""
# Configure your serial port here
SERIAL_PORT = 'COM3' # Windows example, use '/dev/ttyUSB0' for Linux
BAUDRATE = 115200
print(f"Testing BenQ Smart Board via Serial: {SERIAL_PORT} @ {BAUDRATE}")
# Create Smart Board instance with Serial connection
board = BenQSmartBoard(
connection_type=ConnectionType.SERIAL,
serial_port=SERIAL_PORT,
baudrate=BAUDRATE,
timeout=5.0
)
try:
# Connect to the Smart Board
print("Connecting to Smart Board...")
board.connect()
print("Connected successfully!")
# Test basic commands
print("\n=== Testing Basic Commands ===")
# Get current power state
try:
power_state = board.get_power()
print(f"Current power state: {power_state}")
except Exception as e:
print(f"Failed to get power state: {e}")
# Get current video source
try:
video_source = board.get_video_source()
print(f"Current video source: {video_source}")
except Exception as e:
print(f"Failed to get video source: {e}")
# Get current volume
try:
volume = board.get_volume()
print(f"Current volume: {volume}")
except Exception as e:
print(f"Failed to get volume: {e}")
# Get current brightness
try:
brightness = board.get_brightness()
print(f"Current brightness: {brightness}")
except Exception as e:
print(f"Failed to get brightness: {e}")
print("\n=== Testing Set Commands ===")
# Test setting volume
try:
print("Setting volume to 50...")
success = board.set_volume(50)
print(f"Set volume result: {'Success' if success else 'Failed'}")
time.sleep(1)
# Verify the change
new_volume = board.get_volume()
print(f"New volume: {new_volume}")
except Exception as e:
print(f"Failed to set volume: {e}")
# Test setting brightness
try:
print("Setting brightness to 80...")
success = board.set_brightness(80)
print(f"Set brightness result: {'Success' if success else 'Failed'}")
time.sleep(1)
# Verify the change
new_brightness = board.get_brightness()
print(f"New brightness: {new_brightness}")
except Exception as e:
print(f"Failed to set brightness: {e}")
# Test video source switching
try:
print("Switching to HDMI1...")
success = board.set_video_source(VideoSource.HDMI1)
print(f"Set video source result: {'Success' if success else 'Failed'}")
time.sleep(2)
# Verify the change
new_source = board.get_video_source()
print(f"New video source: {new_source}")
except Exception as e:
print(f"Failed to set video source: {e}")
print("\n=== Testing Advanced Commands ===")
# Test RTC reading
try:
rtc_info = board.get_rtc()
print(f"RTC Info: {rtc_info}")
except Exception as e:
print(f"Failed to get RTC info: {e}")
# Test mute functionality
try:
print("Testing mute...")
success = board.set_mute(True)
print(f"Mute ON result: {'Success' if success else 'Failed'}")
time.sleep(2)
success = board.set_mute(False)
print(f"Mute OFF result: {'Success' if success else 'Failed'}")
except Exception as e:
print(f"Failed to test mute: {e}")
print("\n=== Test Completed Successfully ===")
except BenQSmartBoardError as e:
print(f"Smart Board Error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
logging.exception("Unexpected error occurred")
finally:
# Always disconnect
print("Disconnecting...")
board.disconnect()
print("Disconnected.")
def list_serial_ports():
"""List available serial ports."""
try:
import serial.tools.list_ports
ports = serial.tools.list_ports.comports()
print("Available serial ports:")
for port in ports:
print(f" {port.device} - {port.description}")
return [port.device for port in ports]
except ImportError:
print("pyserial tools not available. Install with: pip install pyserial")
return []
if __name__ == "__main__":
print("BenQ Smart Board Serial Test")
print("=" * 40)
# List available ports
available_ports = list_serial_ports()
if not available_ports:
print("No serial ports found or pyserial not installed.")
print("Make sure to:")
print("1. Install pyserial: pip install pyserial")
print("2. Connect your BenQ Smart Board via USB")
print("3. Update the SERIAL_PORT variable in this script")
else:
print(f"\nFound {len(available_ports)} serial port(s)")
print("\nStarting test...")
test_serial_connection()