benq-rm-homeassistant/custom_components/benq_smartboard/benq_smartboard_lib.py
2025-06-06 17:05:15 +07:00

799 lines
26 KiB
Python

"""
A comprehensive Python library for controlling BenQ Smart Board via RS232-over-LAN or USB Serial.
Implements all protocol commands from the official specification.
"""
import socket
import struct
import logging
import serial # New import for USB Serial support
from typing import Optional, Dict, Any, Union
from enum import Enum, unique
# -- Exceptions ----------------------------------------------------------------
class BenQSmartBoardError(Exception):
"""Base exception for BenQ Smart Board errors."""
pass
class ConnectionError(BenQSmartBoardError):
"""Exception for connection-related errors."""
pass
class CommandError(BenQSmartBoardError):
"""Exception for invalid or failed commands."""
pass
# -- Enums for Commands --------------------------------------------------------
@unique
class PowerState(Enum):
OFF = '000'
ON = '001'
STANDBY = '002'
REBOOT = '003'
@unique
class VideoSource(Enum):
VGA = '000'
HDMI = '001'
HDMI1 = '002'
HDMI2 = '021'
DISPLAY_PORT = '007'
TYPE_C = '051'
ANDROID = '101'
OPS = '102'
DEFAULT = 'FF' # used in E0 timer to restore last signal, etc.
@unique
class AspectRatio(Enum):
SIXTEEN_NINE = '000'
PTP = '002'
@unique
class Language(Enum):
ENGLISH = '000'
FRANCAIS = '001'
ESPANOL = '002'
FANTZHONG = '003'
JIANZHONG = '004'
PORTUGUES = '005'
GERMAN = '006'
DUTCH = '007'
POLISH = '008'
RUSSIA = '009'
CZECH = '010'
DANISH = '011'
SWEDISH = '012'
ITALIAN = '013'
ROMANIAN = '014'
NORWEGIAN = '015'
FINNISH = '016'
GREEK = '017'
TURKISH = '018'
ARABIC = '019'
JAPANSE = '020'
THAILAND = '021'
KOREAN = '022'
HUNGARIAN = '023'
PERSIAN = '024'
VIETNAMESE = '025'
INDONESIA = '026'
HEBREW = '027'
@unique
class SoundMode(Enum):
CINEMA = '000'
STANDARD = '001'
MUSIC = '003'
NEWS = '005'
SPORTS = '006'
@unique
class RemoteCommand(Enum):
VOL_PLUS = '000'
VOL_MINUS = '001'
REMOTE_UP = '010'
REMOTE_DOWN = '011'
REMOTE_LEFT = '012'
REMOTE_RIGHT = '013'
REMOTE_OK = '014'
REMOTE_MENU_KEY = '020'
REMOTE_EXIT = '022'
BLANK = '031'
FREEZE = '032'
@unique
class IRControl(Enum):
DISABLE = '000'
ENABLE = '001'
@unique
class ButtonIRControl(Enum):
DISABLE = '000'
ENABLE = '001'
@unique
class ButtonControl(Enum):
DISABLE = '000'
ENABLE = '001'
@unique
class PixelShift(Enum):
OFF = '000'
ON = '001'
@unique
class PictureMode(Enum):
STANDARD = '000'
BRIGHT = '001'
SOFT = '002'
ECO = '003'
CUSTOM_1 = '005'
CUSTOM_2 = '006'
CUSTOM_3 = '007'
@unique
class DCR(Enum):
OFF = '000'
ON = '001'
@unique
class ColorTemp(Enum):
COOL = '000'
NORMAL = '001'
WARM = '002'
@unique
class PowerSaveMode(Enum):
OFF = '000'
LOW = '001'
HIGH = '002'
@unique
class SwitchOnStatus(Enum):
POWER_OFF = '000'
FORCE_ON = '001'
LAST_STATUS = '002'
@unique
class ConnectionType(Enum):
TCP = 'tcp'
SERIAL = 'serial'
# -- The Main BenQSmartBoard Class --------------------------------------------
def _log_packet(direction: str, data: bytes):
"""Log packet data in hex format."""
hex_data = data.hex().upper()
logging.debug(f"{direction} [{len(data)} bytes]: {hex_data}")
class BenQSmartBoard:
"""
A client for controlling BenQ Smart Boards via LAN (RS232-over-TCP) or USB Serial.
Implements all commands from the official protocol specification.
"""
DEFAULT_PORT = 4660
DEFAULT_BAUDRATE = 115200
TV_ID = b'01'
CR = b'\x0D'
def __init__(self,
ip: str = None,
port: int = DEFAULT_PORT,
timeout: float = 5.0,
connection_type: ConnectionType = ConnectionType.TCP,
serial_port: str = None,
baudrate: int = DEFAULT_BAUDRATE):
"""
:param ip: IP address of the BenQ Smart Board (required for TCP)
:param port: TCP port, default 4660
:param timeout: socket/serial timeout in seconds
:param connection_type: ConnectionType.TCP or ConnectionType.SERIAL
:param serial_port: Serial port path (required for SERIAL, e.g., 'COM3', '/dev/ttyUSB0')
:param baudrate: Serial baudrate, default 115200
"""
self.connection_type = connection_type
self.timeout = timeout
# TCP parameters
self.ip = ip
self.port = port
self.sock: Optional[socket.socket] = None
# Serial parameters
self.serial_port = serial_port
self.baudrate = baudrate
self.ser: Optional[serial.Serial] = None
# -- Connection Management -------------------------------------------------
def connect(self):
"""Establish connection to the Smart Board (TCP or Serial)."""
if self.connection_type == ConnectionType.TCP:
self._connect_tcp()
elif self.connection_type == ConnectionType.SERIAL:
self._connect_serial()
else:
raise ValueError(f"Unsupported connection type: {self.connection_type}")
def _connect_tcp(self):
"""Establish a TCP connection to the Smart Board."""
if self.sock is not None:
return
if not self.ip:
raise ValueError("IP address is required for TCP connection")
try:
logging.debug(f"Connecting to Smart Board at {self.ip}:{self.port}")
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(self.timeout)
self.sock.connect((self.ip, self.port))
logging.info("TCP connection established successfully.")
except socket.error as e:
self.sock = None
logging.exception("Failed to connect to Smart Board via TCP.")
raise ConnectionError(f"Failed to connect via TCP: {e}")
def _connect_serial(self):
"""Establish a Serial connection to the Smart Board."""
if self.ser is not None and self.ser.is_open:
return
if not self.serial_port:
raise ValueError("Serial port is required for Serial connection")
try:
logging.debug(f"Connecting to Smart Board at {self.serial_port}:{self.baudrate}")
self.ser = serial.Serial(
port=self.serial_port,
baudrate=self.baudrate,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=self.timeout
)
logging.info("Serial connection established successfully.")
except serial.SerialException as e:
self.ser = None
logging.exception("Failed to connect to Smart Board via Serial.")
raise ConnectionError(f"Failed to connect via Serial: {e}")
def disconnect(self):
"""Close the connection."""
if self.connection_type == ConnectionType.TCP:
self._disconnect_tcp()
elif self.connection_type == ConnectionType.SERIAL:
self._disconnect_serial()
def _disconnect_tcp(self):
"""Close the TCP connection."""
if self.sock:
try:
logging.debug("Closing TCP connection to Smart Board.")
self.sock.close()
except socket.error:
pass
self.sock = None
logging.info("Disconnected from Smart Board via TCP.")
def _disconnect_serial(self):
"""Close the Serial connection."""
if self.ser and self.ser.is_open:
try:
logging.debug("Closing Serial connection to Smart Board.")
self.ser.close()
except serial.SerialException:
pass
self.ser = None
logging.info("Disconnected from Smart Board via Serial.")
def _ensure_connection(self):
"""Helper to ensure we are connected; reconnect if needed."""
if self.connection_type == ConnectionType.TCP and not self.sock:
self.connect()
elif self.connection_type == ConnectionType.SERIAL and (not self.ser or not self.ser.is_open):
self.connect()
def _send_data(self, data: bytes):
"""Send data via the active connection."""
if self.connection_type == ConnectionType.TCP:
self.sock.sendall(data)
elif self.connection_type == ConnectionType.SERIAL:
self.ser.write(data)
else:
raise ValueError(f"Unsupported connection type: {self.connection_type}")
def _receive_data(self, size: int) -> bytes:
"""Receive data via the active connection."""
if self.connection_type == ConnectionType.TCP:
return self.sock.recv(size)
elif self.connection_type == ConnectionType.SERIAL:
return self.ser.read(size)
else:
raise ValueError(f"Unsupported connection type: {self.connection_type}")
# -- Core Send/Receive -----------------------------------------------------
def send_command(self, command_code: str, value: str) -> bool:
"""
Send a set command to the Smart Board.
Returns True if valid (+), False if invalid (-).
Raises ConnectionError on network issues, CommandError on unknown response.
"""
self._ensure_connection()
# Protocol format: [length][tv_id][command_type][command_code][value][CR]
# Example: 801s!000\r where command_code 21(hex) = !(ASCII)
# Length includes the length byte itself: 1 + tv_id(2) + command_type(1) + command_code(1) + value(3) = 8
if len(command_code) != 2:
raise ValueError("command_code must be 2 ASCII chars (hex).")
if len(value) != 3:
raise ValueError("value must be 3 ASCII chars.")
# Convert hex command code to ASCII character
try:
command_code_int = int(command_code, 16)
command_code_ascii = bytes([command_code_int])
except ValueError:
raise ValueError(f"Invalid hex command code: {command_code}")
# Length = length_byte(1) + tv_id(2) + command_type(1) + command_code(1) + value(3) = 8 bytes total
length_ascii = b'8'
tv_id_ascii = b'01'
command_type_ascii = b's' # Set command is 's'
value_ascii = value.encode('ascii')
try:
packet = length_ascii + tv_id_ascii + command_type_ascii + command_code_ascii + value_ascii + self.CR
_log_packet("Sent", packet)
self._send_data(packet)
response = self._receive_response()
logging.debug(f"Received response: {response}")
if response == b'+':
return True
elif response == b'-':
return False
else:
# Unknown response type
raise CommandError(f"Unknown response type: {response}")
except (socket.error, serial.SerialException) as e:
self.disconnect()
logging.exception("Communication error during send_command.")
raise ConnectionError(f"Failed to send command: {e}")
def get_command(self, command_code: str) -> str:
"""
Send a get command to the Smart Board.
Returns the 3-byte value as a string if the device replies with data,
or raises CommandError/ConnectionError on issues.
"""
self._ensure_connection()
# Convert hex command code to ASCII character
try:
command_code_int = int(command_code, 16)
command_code_ascii = bytes([command_code_int])
except ValueError:
raise ValueError(f"Invalid hex command code: {command_code}")
# For get command: length + tv_id + command_type + command_code + CR
# Length = length_byte(1) + tv_id(2) + command_type(1) + command_code(1) = 5 bytes total
length_ascii = b'5'
tv_id_ascii = b'01'
command_type_ascii = b'g' # Get command is 'g'
try:
logging.debug(f"Sending get command: code={command_code}")
packet = length_ascii + tv_id_ascii + command_type_ascii + command_code_ascii + self.CR
_log_packet("Sent", packet)
self._send_data(packet)
response, value = self._receive_get_response()
logging.debug(f"Received get response: {response}, value: {value}")
if response == b'r':
return value.decode('ascii')
raise CommandError(f"Unexpected get response type: {response}")
except (socket.error, serial.SerialException) as e:
self.disconnect()
logging.exception("Communication error during get_command.")
raise ConnectionError(f"Failed to send get command: {e}")
def _receive_response(self):
"""
Receive response for a Set command.
Expected format: [length_ascii][tv_id_ascii][response_type_ascii][CR]
"""
try:
# Read until we get CR (0x0D) or timeout
response_data = b''
while True:
byte = self._receive_data(1)
if not byte:
raise CommandError("No response from Smart Board.")
response_data += byte
if byte == self.CR:
break
if len(response_data) > 10: # Prevent infinite loop
raise CommandError("Response too long from Smart Board.")
_log_packet("Received", response_data)
if len(response_data) < 4: # minimum: length + tv_id + response_type + CR
raise CommandError("Incomplete response from Smart Board (set).")
# Parse ASCII response: skip length and tv_id, get response type
response_type = response_data[3:4] # Should be '+' or '-'
return response_type
except (socket.timeout, serial.SerialTimeoutException):
raise ConnectionError("Timed out waiting for response (set).")
def _receive_get_response(self):
"""
Receive response for a Get command.
Expected format: [length_ascii][tv_id_ascii][response_type_ascii][value_ascii][CR]
"""
try:
# Read until we get CR (0x0D) or timeout
response_data = b''
while True:
byte = self._receive_data(1)
if not byte:
raise CommandError("No response from Smart Board.")
response_data += byte
if byte == self.CR:
break
if len(response_data) > 20: # Prevent infinite loop
raise CommandError("Response too long from Smart Board.")
_log_packet("Received", response_data)
if len(response_data) < 4: # minimum: length + tv_id + response_type + CR
raise CommandError("Incomplete response from Smart Board (get).")
# Parse ASCII response
response_type = response_data[3:4] # Should be 'r' for get response
if response_type == b'r' and len(response_data) >= 8: # length + tv_id + 'r' + 3-byte value + CR
value = response_data[4:7] # 3-byte ASCII value
return response_type, value
else:
return response_type, b''
except (socket.timeout, serial.SerialTimeoutException):
raise ConnectionError("Timed out waiting for response (get).")
# -- Full Set of Commands (from your Protocol) -----------------------------
# Power commands
def set_power(self, state: PowerState) -> bool:
return self.send_command('21', state.value)
def get_power(self) -> str:
return self.get_command('21')
# Video Source
def set_video_source(self, source: VideoSource) -> bool:
return self.send_command('22', source.value)
def get_video_source(self) -> str:
return self.get_command('22')
# Contrast
def set_contrast(self, value: int) -> bool:
if not (0 <= value <= 100):
raise ValueError("Contrast must be 0..100")
return self.send_command('23', f"{value:03}")
def get_contrast(self) -> str:
return self.get_command('23')
# Brightness
def set_brightness(self, value: int) -> bool:
if not (0 <= value <= 100):
raise ValueError("Brightness must be 0..100")
return self.send_command('24', f"{value:03}")
def get_brightness(self) -> str:
return self.get_command('24')
# Sharpness
def set_sharpness(self, value: int) -> bool:
if not (0 <= value <= 100):
raise ValueError("Sharpness must be 0..100")
return self.send_command('25', f"{value:03}")
def get_sharpness(self) -> str:
return self.get_command('25')
# Picture reset
def reset_picture(self) -> bool:
return self.send_command('26', '000')
# Aspect ratio
def set_aspect_ratio(self, ratio: AspectRatio) -> bool:
return self.send_command('31', ratio.value)
def get_aspect_ratio(self) -> str:
return self.get_command('31')
# Language
def set_language(self, lang: Language) -> bool:
return self.send_command('32', lang.value)
def get_language(self) -> str:
return self.get_command('32')
# Sound mode
def set_sound_mode(self, mode: SoundMode) -> bool:
return self.send_command('33', mode.value)
def get_sound_mode(self) -> str:
return self.get_command('33')
# Volume
def set_volume(self, value: int) -> bool:
if not (0 <= value <= 100):
raise ValueError("Volume must be 0..100")
return self.send_command('35', f"{value:03}")
def get_volume(self) -> str:
return self.get_command('35')
# Mute
def set_mute(self, mute: bool) -> bool:
return self.send_command('36', '001' if mute else '000')
def get_mute(self) -> str:
return self.get_command('36')
# Balance
def set_balance(self, value: int) -> bool:
if not (0 <= value <= 100):
raise ValueError("Balance must be 0..100")
return self.send_command('39', f"{value:03}")
def get_balance(self) -> str:
return self.get_command('39')
# Sound reset
def reset_sound(self) -> bool:
return self.send_command('3B', '000')
# Remote command
def send_remote_command(self, command: RemoteCommand) -> bool:
return self.send_command('40', command.value)
# IR control
def set_ir_control(self, control: IRControl) -> bool:
return self.send_command('42', control.value)
def get_ir_control(self) -> str:
return self.get_command('42')
# Button & IR control
def set_button_ir_control(self, control: ButtonIRControl) -> bool:
return self.send_command('43', control.value)
def get_button_ir_control(self) -> str:
return self.get_command('43')
# Button control
def set_button_control(self, control: ButtonControl) -> bool:
return self.send_command('45', control.value)
def get_button_control(self) -> str:
return self.get_command('45')
# Pixel shift
def set_pixel_shift(self, shift: PixelShift) -> bool:
return self.send_command('47', shift.value)
def get_pixel_shift(self) -> str:
return self.get_command('47')
# Screen reset
def reset_screen(self) -> bool:
return self.send_command('7F', '000')
# All reset
def reset_all(self) -> bool:
return self.send_command('7E', '000')
# Picture mode
def set_picture_mode(self, mode: PictureMode) -> bool:
return self.send_command('81', mode.value)
def get_picture_mode(self) -> str:
return self.get_command('81')
# Saturation
def set_saturation(self, value: int) -> bool:
if not (0 <= value <= 100):
raise ValueError("Saturation must be 0..100")
return self.send_command('82', f"{value:03}")
def get_saturation(self) -> str:
return self.get_command('82')
# Hue
def set_hue(self, value: int) -> bool:
if not (0 <= value <= 100):
raise ValueError("Hue must be 0..100")
return self.send_command('83', f"{value:03}")
def get_hue(self) -> str:
return self.get_command('83')
# Backlight
def set_backlight(self, value: int) -> bool:
if not (0 <= value <= 100):
raise ValueError("Backlight must be 0..100")
return self.send_command('84', f"{value:03}")
def get_backlight(self) -> str:
return self.get_command('84')
# DCR
def set_dcr(self, mode: DCR) -> bool:
return self.send_command('85', mode.value)
def get_dcr(self) -> str:
return self.get_command('85')
# Color temperature
def set_color_temp(self, temp: ColorTemp) -> bool:
return self.send_command('86', temp.value)
def get_color_temp(self) -> str:
return self.get_command('86')
# RTC
def set_rtc(self, year: int, month: int, day: int, hour: int, minute: int) -> bool:
if not (0 <= year <= 99):
raise ValueError("Year must be 0..99")
if not (1 <= month <= 12):
raise ValueError("Month must be 1..12")
if not (1 <= day <= 31):
raise ValueError("Day must be 1..31")
if not (0 <= hour <= 23):
raise ValueError("Hour must be 0..23")
if not (0 <= minute <= 59):
raise ValueError("Minute must be 0..59")
ok_y = self.send_command('98', f"{year:03}")
ok_m = self.send_command('99', f"{month:03}")
ok_d = self.send_command('9A', f"{day:03}")
ok_h = self.send_command('9B', f"{hour:03}")
ok_min = self.send_command('9C', f"{minute:03}")
return all([ok_y, ok_m, ok_d, ok_h, ok_min])
def get_rtc(self) -> Dict[str, str]:
return {
"year": self.get_command('98'),
"month": self.get_command('99'),
"day": self.get_command('9A'),
"hour": self.get_command('9B'),
"minute": self.get_command('9C'),
}
# Power save
def set_power_save(self, mode: PowerSaveMode) -> bool:
return self.send_command('A9', mode.value)
def get_power_save(self) -> str:
return self.get_command('A9')
# Switch on status
def set_switch_on_status(self, status: SwitchOnStatus) -> bool:
return self.send_command('AB', status.value)
def get_switch_on_status(self) -> str:
return self.get_command('AB')
# On/Off Timer
def set_on_off_timer(
self,
timer_number: int,
enable: bool,
on_enable: bool,
off_enable: bool,
days: int,
on_hour: int,
on_minute: int,
off_hour: int,
off_minute: int,
video_source: VideoSource
) -> bool:
# Per specification
if not (1 <= timer_number <= 7):
raise ValueError("Timer number must be 1..7")
if not (0 <= days <= 255):
raise ValueError("Days must be 0..255 (bitmask).")
if not (0 <= on_hour <= 23):
raise ValueError("On hour must be 0..23.")
if not (0 <= on_minute <= 59):
raise ValueError("On minute must be 0..59.")
if not (0 <= off_hour <= 23):
raise ValueError("Off hour must be 0..23.")
if not (0 <= off_minute <= 59):
raise ValueError("Off minute must be 0..59.")
byte1 = (timer_number & 0x07)
if enable:
byte1 |= (1 << 6)
if on_enable:
byte1 |= (1 << 5)
if off_enable:
byte1 |= (1 << 4)
byte2 = days & 0xFF
byte3 = on_hour & 0xFF
byte4 = on_minute & 0xFF
byte5 = off_hour & 0xFF
byte6 = off_minute & 0xFF
# video_source is a string hex in the standard commands, e.g. '000' => 0x00, '001' => 0x01
# But for E0, we parse the hex:
vs_hex = int(video_source.value, 16)
byte7 = vs_hex & 0xFF
byte8 = 0x00
byte9 = 0x00
# Convert 9 bytes to hex string
raw = bytes([byte1, byte2, byte3, byte4, byte5, byte6, byte7, byte8, byte9])
value_str = "".join(f"{b:02X}" for b in raw) # E.g. "050018000019000000"
return self.send_command('E0', value_str)
def get_on_off_timer(self, timer_number: int) -> Dict[str, Any]:
"""
Example implementation for reading a timer.
Protocol details are not fully specified, so we demonstrate a potential approach.
"""
if not (1 <= timer_number <= 7):
raise ValueError("Timer number must be 1..7")
self._ensure_connection()
try:
# Attempt a custom get packet for 'E0' with the selected timer in Byte1
length = 6
byte1 = timer_number & 0x07
packet = struct.pack(
">B2s1s2s1s",
length,
self.TV_ID,
b'g',
b'E0',
self.CR
)
self.sock.sendall(packet)
response_header = self.sock.recv(5)
if len(response_header) < 5:
raise CommandError("Incomplete timer response header.")
cmd_type = response_header[3:4]
if cmd_type != b'r':
raise CommandError(f"Unexpected response type for timer: {cmd_type}")
# Read next 9 bytes for the timer data
value_bytes = self.sock.recv(9)
if len(value_bytes) < 9:
raise CommandError("Incomplete timer data.")
byte1, byte2, byte3, byte4, byte5, byte6, byte7, byte8, byte9 = value_bytes
timer_info = {
"timer_number": byte1 & 0x07,
"enabled": bool((byte1 >> 6) & 0x01),
"on_enabled": bool((byte1 >> 5) & 0x01),
"off_enabled": bool((byte1 >> 4) & 0x01),
"days": byte2,
"on_hour": byte3,
"on_minute": byte4,
"off_hour": byte5,
"off_minute": byte6,
"video_source": f"{byte7:02X}",
"reserved": (byte8, byte9),
}
return timer_info
except socket.error as e:
self.disconnect()
raise ConnectionError(f"Failed to get On/Off Timer: {e}")