benq-rm-homeassistant/custom_components/benq_smartboard/benq_smartboard_lib.py

670 lines
20 KiB
Python
Raw Permalink Normal View History

2025-01-25 12:42:38 +00:00
"""
A comprehensive Python library for controlling BenQ Smart Board via RS232-over-LAN.
Implements all protocol commands from the official specification.
"""
import socket
import struct
from typing import Optional, Dict, Any
from enum import Enum, unique
# -- Exceptions ----------------------------------------------------------------
class BenQSmartBoardError(Exception):
    """Root of the exception hierarchy raised by this library.

    Catch this type to handle every BenQ Smart Board specific failure
    with a single ``except`` clause.
    """
# NOTE(review): this name shadows the builtin ``ConnectionError`` inside this
# module.  It derives from BenQSmartBoardError (not OSError), so it is NOT
# caught by ``except socket.error`` / ``except OSError`` clauses.
class ConnectionError(BenQSmartBoardError):
    """Raised when the link to the board cannot be opened or used."""
class CommandError(BenQSmartBoardError):
    """Raised when a command is invalid or the board's reply is unusable."""
# -- Enums for Commands --------------------------------------------------------
@unique
class PowerState(Enum):
    """3-character payloads used with the power command (``set_power``)."""

    OFF = '000'
    ON = '001'
    STANDBY = '002'
    REBOOT = '003'
@unique
class VideoSource(Enum):
    """Payloads selecting the board's input source (``set_video_source``).

    Every member is a 3-character code except ``DEFAULT``.
    """

    VGA = '000'
    HDMI = '001'
    HDMI1 = '002'
    HDMI2 = '021'
    DISPLAY_PORT = '007'
    TYPE_C = '051'
    ANDROID = '101'
    OPS = '102'
    # Used in the E0 timer to restore the last signal, etc.  Only 2 characters
    # long, unlike the other members.
    DEFAULT = 'FF'
@unique
class AspectRatio(Enum):
    """Payloads for the aspect-ratio command (``set_aspect_ratio``)."""

    SIXTEEN_NINE = '000'
    PTP = '002'
@unique
class Language(Enum):
    """Payloads for the OSD language command (``set_language``).

    Member names (including unusual spellings) are part of the public API
    and are kept exactly as they were.
    """

    ENGLISH = '000'
    FRANCAIS = '001'
    ESPANOL = '002'
    FANTZHONG = '003'   # presumably Traditional Chinese -- TODO confirm
    JIANZHONG = '004'   # presumably Simplified Chinese -- TODO confirm
    PORTUGUES = '005'
    GERMAN = '006'
    DUTCH = '007'
    POLISH = '008'
    RUSSIA = '009'
    CZECH = '010'
    DANISH = '011'
    SWEDISH = '012'
    ITALIAN = '013'
    ROMANIAN = '014'
    NORWEGIAN = '015'
    FINNISH = '016'
    GREEK = '017'
    TURKISH = '018'
    ARABIC = '019'
    JAPANSE = '020'     # (sic) spelling preserved for compatibility
    THAILAND = '021'
    KOREAN = '022'
    HUNGARIAN = '023'
    PERSIAN = '024'
    VIETNAMESE = '025'
    INDONESIA = '026'
    HEBREW = '027'
@unique
class SoundMode(Enum):
    """Payloads for the sound-mode command (``set_sound_mode``)."""

    CINEMA = '000'
    STANDARD = '001'
    MUSIC = '003'
    NEWS = '005'
    SPORTS = '006'
@unique
class RemoteCommand(Enum):
    """Payloads emulating remote-control keys (``send_remote_command``)."""

    # Volume keys
    VOL_PLUS = '000'
    VOL_MINUS = '001'
    # Navigation keys
    REMOTE_UP = '010'
    REMOTE_DOWN = '011'
    REMOTE_LEFT = '012'
    REMOTE_RIGHT = '013'
    REMOTE_OK = '014'
    REMOTE_MENU_KEY = '020'
    REMOTE_EXIT = '022'
    # Display keys
    BLANK = '031'
    FREEZE = '032'
@unique
class IRControl(Enum):
    """Payloads for the IR control command (``set_ir_control``)."""

    DISABLE = '000'
    ENABLE = '001'
@unique
class ButtonIRControl(Enum):
    """Payloads for the combined button & IR control command (``set_button_ir_control``)."""

    DISABLE = '000'
    ENABLE = '001'
@unique
class ButtonControl(Enum):
    """Payloads for the button control command (``set_button_control``)."""

    DISABLE = '000'
    ENABLE = '001'
@unique
class PixelShift(Enum):
    """Payloads for the pixel-shift command (``set_pixel_shift``)."""

    OFF = '000'
    ON = '001'
@unique
class PictureMode(Enum):
    """Payloads for the picture-mode command (``set_picture_mode``)."""

    # Built-in presets
    STANDARD = '000'
    BRIGHT = '001'
    SOFT = '002'
    ECO = '003'
    # User-defined slots
    CUSTOM_1 = '005'
    CUSTOM_2 = '006'
    CUSTOM_3 = '007'
@unique
class DCR(Enum):
    """Payloads for the DCR command (``set_dcr``)."""

    OFF = '000'
    ON = '001'
@unique
class ColorTemp(Enum):
    """Payloads for the colour-temperature command (``set_color_temp``)."""

    COOL = '000'
    NORMAL = '001'
    WARM = '002'
@unique
class PowerSaveMode(Enum):
    """Payloads for the power-save command (``set_power_save``)."""

    OFF = '000'
    LOW = '001'
    HIGH = '002'
@unique
class SwitchOnStatus(Enum):
    """Payloads for the switch-on-status command (``set_switch_on_status``)."""

    POWER_OFF = '000'
    FORCE_ON = '001'
    LAST_STATUS = '002'
# -- The Main BenQSmartBoard Class --------------------------------------------
class BenQSmartBoard:
    """
    Client that drives a BenQ Smart Board over LAN (RS232-over-TCP).

    The instance only stores connection parameters on construction; no
    socket is opened until a connection is requested.
    """

    DEFAULT_PORT = 4660   # TCP port used when none is given
    TV_ID = b'01'         # 2-byte board ID placed in every packet
    CR = b'\x0D'          # carriage return terminating every packet

    def __init__(self, ip: str, port: int = DEFAULT_PORT, timeout: float = 5.0):
        """
        Remember where the board lives; does not connect.

        :param ip: IP address of the BenQ Smart Board
        :param port: TCP port, default 4660
        :param timeout: socket timeout in seconds
        """
        # No socket yet -- it is created lazily on connect.
        self.sock: Optional[socket.socket] = None
        self.ip, self.port, self.timeout = ip, port, timeout
# -- Connection Management -------------------------------------------------
def connect(self):
    """
    Establish a TCP connection to the Smart Board.

    Does nothing when ``self.sock`` is already set.  On success the
    connected socket (with ``self.timeout`` applied) is stored in
    ``self.sock``.

    :raises ConnectionError: if the socket cannot be created or connected;
        ``self.sock`` is ``None`` afterwards and the failed socket is closed.
    """
    if self.sock is not None:
        # Already connected or trying
        return
    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)
        sock.connect((self.ip, self.port))
    except socket.error as e:
        # Close the half-open socket explicitly so a failed attempt does not
        # leak a file descriptor until garbage collection.
        if sock is not None:
            try:
                sock.close()
            except socket.error:
                pass
        self.sock = None
        raise ConnectionError(f"Failed to connect: {e}") from e
    # Publish the socket only once it is actually connected.
    self.sock = sock
def disconnect(self):
    """
    Close the TCP connection, if one is open.

    Socket errors raised while closing are ignored (best effort); afterwards
    ``self.sock`` is ``None`` so the next command reconnects.
    """
    active = self.sock
    if not active:
        return
    try:
        active.close()
    except socket.error:
        # Nothing useful can be done if close itself fails.
        pass
    self.sock = None
def _ensure_connection(self):
    """Make sure a socket exists, calling ``connect()`` when there is none."""
    if self.sock:
        return
    self.connect()
# -- Core Send/Receive -----------------------------------------------------
def send_command(self, command_code: str, value: str) -> bool:
    """
    Send a set command to the Smart Board.

    The packet written to the socket is: one length byte, ``TV_ID``, the
    command type ``s`` (0x73), ``command_code``, ``value`` and a trailing CR.

    :param command_code: 2 ASCII characters identifying the command, e.g. ``'21'``
    :param value: 3 ASCII characters carrying the value, e.g. ``'001'``
    :return: True if the board answered valid (``+``), False if invalid (``-``)
    :raises ValueError: if an argument has the wrong length or is not ASCII
    :raises ConnectionError: on network errors or when the reply times out
    :raises CommandError: on an incomplete or unknown response
    """
    # Validate before touching the network so bad arguments never open a
    # connection or put a malformed packet on the wire.
    if len(command_code) != 2:
        raise ValueError("command_code must be 2 ASCII chars (hex).")
    if len(value) != 3:
        raise ValueError("value must be 3 ASCII chars.")
    # Strict ASCII: a non-ASCII character would encode to several bytes and be
    # silently truncated by struct.  UnicodeEncodeError is a ValueError.
    code_bytes = command_code.encode('ascii')
    value_bytes = value.encode('ascii')

    self._ensure_connection()

    # NOTE(review): the length byte is fixed at 8 although this packet holds
    # 9 bytes before the CR (1 + 2 + 1 + 2 + 3) -- confirm against the
    # protocol specification whether the command is meant to be one byte.
    length = 8
    packet = struct.pack(
        ">B2s1s2s3s1s",
        length,
        self.TV_ID,
        b's',  # Set command
        code_bytes,
        value_bytes,
        self.CR
    )
    try:
        self.sock.sendall(packet)
        response = self._receive_response()
    except socket.error as e:
        self.disconnect()
        raise ConnectionError(f"Failed to send command: {e}") from e
    except BenQSmartBoardError:
        # A timed-out or incomplete reply leaves the stream in an unknown
        # state: a late answer would be read as the reply to the NEXT
        # command.  Drop the socket so the next call reconnects cleanly.
        self.disconnect()
        raise

    if response == b'+':
        return True
    if response == b'-':
        return False
    # Unknown response type
    raise CommandError(f"Unknown response type: {response}")
def get_command(self, command_code: str) -> str:
    """
    Send a get command to the Smart Board and return the reported value.

    The packet written to the socket is: one length byte (6), ``TV_ID``, the
    command type ``g`` (0x67), ``command_code`` and a trailing CR.
    NOTE(review): this request layout and the 3-byte reply value are
    assumptions -- the get protocol was not fully documented; confirm
    against the official specification.

    :param command_code: 2 ASCII characters identifying the command, e.g. ``'21'``
    :return: the value bytes of an ``r`` reply, decoded to a string
    :raises ValueError: if ``command_code`` is not 2 ASCII characters
    :raises ConnectionError: on network errors or when the reply times out
    :raises CommandError: on an incomplete reply or a reply type other than ``r``
    """
    # Same validation as send_command: struct would otherwise pad or truncate
    # a wrong-sized code silently.  UnicodeEncodeError is a ValueError.
    if len(command_code) != 2:
        raise ValueError("command_code must be 2 ASCII chars (hex).")
    code_bytes = command_code.encode('ascii')

    self._ensure_connection()

    length = 6
    packet = struct.pack(
        ">B2s1s2s1s",
        length,
        self.TV_ID,
        b'g',
        code_bytes,
        self.CR
    )
    try:
        self.sock.sendall(packet)
        response, value = self._receive_get_response()
    except socket.error as e:
        self.disconnect()
        raise ConnectionError(f"Failed to send get command: {e}") from e
    except BenQSmartBoardError:
        # After a timeout or incomplete reply the stream may be out of sync;
        # drop the socket so the next call starts from a clean connection.
        self.disconnect()
        raise

    if response == b'r':
        return value.decode()
    raise CommandError(f"Unexpected get response type: {response}")
def _receive_response(self):
    """
    Receive the standard 5-byte response for a Set command:
    [ length(1) | ID(2) | command_type(1) | CR(1) ]

    TCP is a byte stream, so a single ``recv`` may legally return fewer
    bytes than requested; this keeps reading until all 5 bytes arrived.

    :return: the command_type byte as ``bytes`` (expected ``b'+'`` or ``b'-'``)
    :raises CommandError: if the peer closes the connection before 5 bytes arrive
    :raises ConnectionError: if the socket times out while waiting
    """
    expected = 5
    buf = bytearray()
    try:
        while len(buf) < expected:
            part = self.sock.recv(expected - len(buf))
            if not part:
                # Empty read: the peer closed the connection mid-reply.
                raise CommandError("Incomplete response from Smart Board (set).")
            buf += part
    except socket.timeout as exc:
        raise ConnectionError("Timed out waiting for response (set).") from exc
    # Only the command type at index 3 matters to callers.
    return bytes(buf[3:4])
def _receive_get_response(self):
    """
    Receive the reply to a Get command.

    Reads a 5-byte header and takes the command type from index 3.  When
    the type is ``r`` a further 3 bytes are read as the value.
    NOTE(review): the meaning of the 5th header byte and whether a CR
    follows the value are not established here -- confirm against the
    protocol specification.

    TCP is a byte stream, so each read loops until the requested number of
    bytes has arrived instead of trusting a single ``recv``.

    :return: ``(command_type, value_bytes)``; ``value_bytes`` is ``b''``
        when the command type is not ``b'r'``
    :raises CommandError: if the peer closes the connection mid-reply
    :raises ConnectionError: if the socket times out while waiting
    """
    def read_exact(count, incomplete_message):
        # Accumulate until `count` bytes arrived; an empty read means EOF.
        buf = bytearray()
        while len(buf) < count:
            part = self.sock.recv(count - len(buf))
            if not part:
                raise CommandError(incomplete_message)
            buf += part
        return bytes(buf)

    try:
        header = read_exact(5, "Incomplete response from Smart Board (get header).")
        command_type = header[3:4]  # e.g. b'r'
        if command_type != b'r':
            # No further data expected if it's not 'r'
            return command_type, b''
        value = read_exact(3, "Incomplete 3-byte value in get response.")
        return command_type, value
    except socket.timeout as exc:
        raise ConnectionError("Timed out waiting for response (get).") from exc
# -- Full Set of Commands (from your Protocol) -----------------------------
# Power commands
def set_power(self, state: PowerState) -> bool:
    """Send the power command (code ``21``) with ``state.value`` as payload.

    :return: the result of ``send_command``
    """
    payload = state.value
    return self.send_command('21', payload)
def get_power(self) -> str:
    """Query the power state (code ``21``) and return ``get_command``'s value string."""
    reply = self.get_command('21')
    return reply
# Video Source
def set_video_source(self, source: VideoSource) -> bool:
    """Send the video-source command (code ``22``) with ``source.value`` as payload.

    NOTE(review): ``VideoSource.DEFAULT`` ('FF') is only 2 characters long,
    so ``send_command`` rejects it with ValueError.

    :return: the result of ``send_command``
    """
    payload = source.value
    return self.send_command('22', payload)
def get_video_source(self) -> str:
    """Query the video source (code ``22``) and return ``get_command``'s value string."""
    reply = self.get_command('22')
    return reply
# Contrast
def set_contrast(self, value: int) -> bool:
    """Set the contrast (code ``23``).

    :param value: level from 0 to 100 inclusive, sent zero-padded to 3 digits
    :return: the result of ``send_command``
    :raises ValueError: if ``value`` is outside 0..100
    """
    if 0 <= value <= 100:
        return self.send_command('23', format(value, '03'))
    raise ValueError("Contrast must be 0..100")
def get_contrast(self) -> str:
    """Query the contrast (code ``23``) and return ``get_command``'s value string."""
    reply = self.get_command('23')
    return reply
# Brightness
def set_brightness(self, value: int) -> bool:
    """Set the brightness (code ``24``).

    :param value: level from 0 to 100 inclusive, sent zero-padded to 3 digits
    :return: the result of ``send_command``
    :raises ValueError: if ``value`` is outside 0..100
    """
    if 0 <= value <= 100:
        return self.send_command('24', format(value, '03'))
    raise ValueError("Brightness must be 0..100")
def get_brightness(self) -> str:
    """Query the brightness (code ``24``) and return ``get_command``'s value string."""
    reply = self.get_command('24')
    return reply
# Sharpness
def set_sharpness(self, value: int) -> bool:
    """Set the sharpness (code ``25``).

    :param value: level from 0 to 100 inclusive, sent zero-padded to 3 digits
    :return: the result of ``send_command``
    :raises ValueError: if ``value`` is outside 0..100
    """
    if 0 <= value <= 100:
        return self.send_command('25', format(value, '03'))
    raise ValueError("Sharpness must be 0..100")
def get_sharpness(self) -> str:
    """Query the sharpness (code ``25``) and return ``get_command``'s value string."""
    reply = self.get_command('25')
    return reply
# Picture reset
def reset_picture(self) -> bool:
    """Send the picture-reset command (code ``26``, payload ``000``).

    :return: the result of ``send_command``
    """
    code, payload = '26', '000'
    return self.send_command(code, payload)
# Aspect ratio
def set_aspect_ratio(self, ratio: AspectRatio) -> bool:
    """Send the aspect-ratio command (code ``31``) with ``ratio.value`` as payload.

    :return: the result of ``send_command``
    """
    payload = ratio.value
    return self.send_command('31', payload)
def get_aspect_ratio(self) -> str:
    """Query the aspect ratio (code ``31``) and return ``get_command``'s value string."""
    reply = self.get_command('31')
    return reply
# Language
def set_language(self, lang: Language) -> bool:
    """Send the language command (code ``32``) with ``lang.value`` as payload.

    :return: the result of ``send_command``
    """
    payload = lang.value
    return self.send_command('32', payload)
def get_language(self) -> str:
    """Query the language (code ``32``) and return ``get_command``'s value string."""
    reply = self.get_command('32')
    return reply
# Sound mode
def set_sound_mode(self, mode: SoundMode) -> bool:
    """Send the sound-mode command (code ``33``) with ``mode.value`` as payload.

    :return: the result of ``send_command``
    """
    payload = mode.value
    return self.send_command('33', payload)
def get_sound_mode(self) -> str:
    """Query the sound mode (code ``33``) and return ``get_command``'s value string."""
    reply = self.get_command('33')
    return reply
# Volume
def set_volume(self, value: int) -> bool:
    """Set the volume (code ``35``).

    :param value: level from 0 to 100 inclusive, sent zero-padded to 3 digits
    :return: the result of ``send_command``
    :raises ValueError: if ``value`` is outside 0..100
    """
    if 0 <= value <= 100:
        return self.send_command('35', format(value, '03'))
    raise ValueError("Volume must be 0..100")
def get_volume(self) -> str:
    """Query the volume (code ``35``) and return ``get_command``'s value string."""
    reply = self.get_command('35')
    return reply
# Mute
def set_mute(self, mute: bool) -> bool:
    """Send the mute command (code ``36``).

    :param mute: truthy sends payload ``001``, falsy sends ``000``
    :return: the result of ``send_command``
    """
    if mute:
        payload = '001'
    else:
        payload = '000'
    return self.send_command('36', payload)
def get_mute(self) -> str:
    """Query the mute state (code ``36``) and return ``get_command``'s value string."""
    reply = self.get_command('36')
    return reply
# Balance
def set_balance(self, value: int) -> bool:
    """Set the audio balance (code ``39``).

    :param value: level from 0 to 100 inclusive, sent zero-padded to 3 digits
    :return: the result of ``send_command``
    :raises ValueError: if ``value`` is outside 0..100
    """
    if 0 <= value <= 100:
        return self.send_command('39', format(value, '03'))
    raise ValueError("Balance must be 0..100")
def get_balance(self) -> str:
    """Query the audio balance (code ``39``) and return ``get_command``'s value string."""
    reply = self.get_command('39')
    return reply
# Sound reset
def reset_sound(self) -> bool:
    """Send the sound-reset command (code ``3B``, payload ``000``).

    :return: the result of ``send_command``
    """
    code, payload = '3B', '000'
    return self.send_command(code, payload)
# Remote command
def send_remote_command(self, command: RemoteCommand) -> bool:
    """Send a remote-control key (code ``40``) with ``command.value`` as payload.

    :return: the result of ``send_command``
    """
    payload = command.value
    return self.send_command('40', payload)
# IR control
def set_ir_control(self, control: IRControl) -> bool:
    """Send the IR-control command (code ``42``) with ``control.value`` as payload.

    :return: the result of ``send_command``
    """
    payload = control.value
    return self.send_command('42', payload)
def get_ir_control(self) -> str:
    """Query the IR-control setting (code ``42``) and return ``get_command``'s value string."""
    reply = self.get_command('42')
    return reply
# Button & IR control
def set_button_ir_control(self, control: ButtonIRControl) -> bool:
    """Send the button & IR control command (code ``43``) with ``control.value`` as payload.

    :return: the result of ``send_command``
    """
    payload = control.value
    return self.send_command('43', payload)
def get_button_ir_control(self) -> str:
    """Query the button & IR control setting (code ``43``) and return ``get_command``'s value string."""
    reply = self.get_command('43')
    return reply
# Button control
def set_button_control(self, control: ButtonControl) -> bool:
    """Send the button-control command (code ``45``) with ``control.value`` as payload.

    :return: the result of ``send_command``
    """
    payload = control.value
    return self.send_command('45', payload)
def get_button_control(self) -> str:
    """Query the button-control setting (code ``45``) and return ``get_command``'s value string."""
    reply = self.get_command('45')
    return reply
# Pixel shift
def set_pixel_shift(self, shift: PixelShift) -> bool:
    """Send the pixel-shift command (code ``47``) with ``shift.value`` as payload.

    :return: the result of ``send_command``
    """
    payload = shift.value
    return self.send_command('47', payload)
def get_pixel_shift(self) -> str:
    """Query the pixel-shift setting (code ``47``) and return ``get_command``'s value string."""
    reply = self.get_command('47')
    return reply
# Screen reset
def reset_screen(self) -> bool:
    """Send the screen-reset command (code ``7F``, payload ``000``).

    :return: the result of ``send_command``
    """
    code, payload = '7F', '000'
    return self.send_command(code, payload)
# All reset
def reset_all(self) -> bool:
    """Send the all-reset command (code ``7E``, payload ``000``).

    :return: the result of ``send_command``
    """
    code, payload = '7E', '000'
    return self.send_command(code, payload)
# Picture mode
def set_picture_mode(self, mode: PictureMode) -> bool:
    """Send the picture-mode command (code ``81``) with ``mode.value`` as payload.

    :return: the result of ``send_command``
    """
    payload = mode.value
    return self.send_command('81', payload)
def get_picture_mode(self) -> str:
    """Query the picture mode (code ``81``) and return ``get_command``'s value string."""
    reply = self.get_command('81')
    return reply
# Saturation
def set_saturation(self, value: int) -> bool:
    """Set the saturation (code ``82``).

    :param value: level from 0 to 100 inclusive, sent zero-padded to 3 digits
    :return: the result of ``send_command``
    :raises ValueError: if ``value`` is outside 0..100
    """
    if 0 <= value <= 100:
        return self.send_command('82', format(value, '03'))
    raise ValueError("Saturation must be 0..100")
def get_saturation(self) -> str:
    """Query the saturation (code ``82``) and return ``get_command``'s value string."""
    reply = self.get_command('82')
    return reply
# Hue
def set_hue(self, value: int) -> bool:
    """Set the hue (code ``83``).

    :param value: level from 0 to 100 inclusive, sent zero-padded to 3 digits
    :return: the result of ``send_command``
    :raises ValueError: if ``value`` is outside 0..100
    """
    if 0 <= value <= 100:
        return self.send_command('83', format(value, '03'))
    raise ValueError("Hue must be 0..100")
def get_hue(self) -> str:
    """Query the hue (code ``83``) and return ``get_command``'s value string."""
    reply = self.get_command('83')
    return reply
# Backlight
def set_backlight(self, value: int) -> bool:
    """Set the backlight (code ``84``).

    :param value: level from 0 to 100 inclusive, sent zero-padded to 3 digits
    :return: the result of ``send_command``
    :raises ValueError: if ``value`` is outside 0..100
    """
    if 0 <= value <= 100:
        return self.send_command('84', format(value, '03'))
    raise ValueError("Backlight must be 0..100")
def get_backlight(self) -> str:
    """Query the backlight (code ``84``) and return ``get_command``'s value string."""
    reply = self.get_command('84')
    return reply
# DCR
def set_dcr(self, mode: DCR) -> bool:
    """Send the DCR command (code ``85``) with ``mode.value`` as payload.

    :return: the result of ``send_command``
    """
    payload = mode.value
    return self.send_command('85', payload)
def get_dcr(self) -> str:
    """Query the DCR setting (code ``85``) and return ``get_command``'s value string."""
    reply = self.get_command('85')
    return reply
# Color temperature
def set_color_temp(self, temp: ColorTemp) -> bool:
    """Send the colour-temperature command (code ``86``) with ``temp.value`` as payload.

    :return: the result of ``send_command``
    """
    payload = temp.value
    return self.send_command('86', payload)
def get_color_temp(self) -> str:
    """Query the colour temperature (code ``86``) and return ``get_command``'s value string."""
    reply = self.get_command('86')
    return reply
# RTC
def set_rtc(self, year: int, month: int, day: int, hour: int, minute: int) -> bool:
    """Set the board's real-time clock, one field per command.

    Each field is range-checked first; then year, month, day, hour and
    minute are sent (codes ``98``, ``99``, ``9A``, ``9B``, ``9C``) as
    zero-padded 3-digit values.  All five commands are sent even when an
    earlier one is reported as invalid.

    :param year: 0..99
    :param month: 1..12
    :param day: 1..31 (not checked against the month)
    :param hour: 0..23
    :param minute: 0..59
    :return: True only if every one of the five commands was accepted
    :raises ValueError: for the first field found out of range
    """
    bounds = (
        (year, 0, 99, "Year must be 0..99"),
        (month, 1, 12, "Month must be 1..12"),
        (day, 1, 31, "Day must be 1..31"),
        (hour, 0, 23, "Hour must be 0..23"),
        (minute, 0, 59, "Minute must be 0..59"),
    )
    for field, low, high, message in bounds:
        if not (low <= field <= high):
            raise ValueError(message)

    fields = (('98', year), ('99', month), ('9A', day), ('9B', hour), ('9C', minute))
    # Build the full list first so no command is skipped by short-circuiting.
    outcomes = [self.send_command(code, f"{field:03}") for code, field in fields]
    return all(outcomes)
def get_rtc(self) -> Dict[str, str]:
    """Read the real-time clock, one field per get command.

    Queries codes ``98``, ``99``, ``9A``, ``9B`` and ``9C`` in that order.

    :return: dict with keys ``year``, ``month``, ``day``, ``hour`` and
        ``minute`` mapped to the value strings from ``get_command``
    """
    field_codes = (
        ("year", '98'),
        ("month", '99'),
        ("day", '9A'),
        ("hour", '9B'),
        ("minute", '9C'),
    )
    return {name: self.get_command(code) for name, code in field_codes}
# Power save
def set_power_save(self, mode: PowerSaveMode) -> bool:
    """Send the power-save command (code ``A9``) with ``mode.value`` as payload.

    :return: the result of ``send_command``
    """
    payload = mode.value
    return self.send_command('A9', payload)
def get_power_save(self) -> str:
    """Query the power-save mode (code ``A9``) and return ``get_command``'s value string."""
    reply = self.get_command('A9')
    return reply
# Switch on status
def set_switch_on_status(self, status: SwitchOnStatus) -> bool:
    """Send the switch-on-status command (code ``AB``) with ``status.value`` as payload.

    :return: the result of ``send_command``
    """
    payload = status.value
    return self.send_command('AB', payload)
def get_switch_on_status(self) -> str:
    """Query the switch-on status (code ``AB``) and return ``get_command``'s value string."""
    reply = self.get_command('AB')
    return reply
# On/Off Timer
def set_on_off_timer(
    self,
    timer_number: int,
    enable: bool,
    on_enable: bool,
    off_enable: bool,
    days: int,
    on_hour: int,
    on_minute: int,
    off_hour: int,
    off_minute: int,
    video_source: VideoSource
) -> bool:
    """
    Program one on/off timer (command code ``E0``).

    Nine bytes are assembled and passed to ``send_command`` as an
    18-character upper-case hex string:

    * byte 1: timer number in bits 0-2, ``enable`` in bit 6, ``on_enable``
      in bit 5, ``off_enable`` in bit 4
    * byte 2: ``days`` bitmask
    * bytes 3-6: on hour, on minute, off hour, off minute
    * byte 7: ``video_source.value`` parsed as hexadecimal
    * bytes 8-9: always 0x00

    NOTE(review): ``send_command`` currently rejects any value that is not
    exactly 3 characters, so this 18-character payload cannot be sent until
    the sender supports it -- confirm the E0 packet layout against the
    protocol specification.

    :param timer_number: 1..7
    :param days: 0..255 bitmask
    :param on_hour: 0..23
    :param on_minute: 0..59
    :param off_hour: 0..23
    :param off_minute: 0..59
    :param video_source: source whose hex-parsed value must fit in one byte
    :return: the result of ``send_command``
    :raises ValueError: if any argument is out of range, or the video source
        code does not fit in a single byte
    """
    # Per specification
    if not (1 <= timer_number <= 7):
        raise ValueError("Timer number must be 1..7")
    if not (0 <= days <= 255):
        raise ValueError("Days must be 0..255 (bitmask).")
    if not (0 <= on_hour <= 23):
        raise ValueError("On hour must be 0..23.")
    if not (0 <= on_minute <= 59):
        raise ValueError("On minute must be 0..59.")
    if not (0 <= off_hour <= 23):
        raise ValueError("Off hour must be 0..23.")
    if not (0 <= off_minute <= 59):
        raise ValueError("Off minute must be 0..59.")

    flags = timer_number & 0x07
    if enable:
        flags |= (1 << 6)
    if on_enable:
        flags |= (1 << 5)
    if off_enable:
        flags |= (1 << 4)

    # For E0 the source code is parsed as hex ('000' => 0x00, 'FF' => 0xFF).
    # Codes such as '101' or '102' parse to values above 0xFF; masking them
    # would silently turn them into 0x01 / 0x02 (other sources), so refuse
    # them instead of scheduling the wrong input.
    source_byte = int(video_source.value, 16)
    if not (0 <= source_byte <= 0xFF):
        raise ValueError(
            f"Video source {video_source.value!r} does not fit in one byte "
            "for the timer command."
        )

    # Every field was range-checked above, so each fits in one byte.
    raw = bytes([
        flags, days, on_hour, on_minute, off_hour, off_minute,
        source_byte, 0x00, 0x00,
    ])
    value_str = raw.hex().upper()  # E.g. "050018000019000000"
    return self.send_command('E0', value_str)
def get_on_off_timer(self, timer_number: int) -> Dict[str, Any]:
    """
    Example implementation for reading a timer.
    Protocol details are not fully specified, so this demonstrates a
    potential approach: send a get packet for ``E0``, read a 5-byte header
    whose byte 3 must be ``r``, then read 9 data bytes and decode them.

    NOTE(review): the request packet does not carry ``timer_number`` -- it
    is only range-checked here, so which timer the board reports is decided
    by the board.  Confirm against the protocol specification how a specific
    timer is selected.

    :param timer_number: 1..7
    :return: dict with keys ``timer_number``, ``enabled``, ``on_enabled``,
        ``off_enabled``, ``days``, ``on_hour``, ``on_minute``, ``off_hour``,
        ``off_minute``, ``video_source`` (2-digit hex string) and
        ``reserved`` (tuple of the last two bytes)
    :raises ValueError: if ``timer_number`` is outside 1..7
    :raises CommandError: on an incomplete reply or a reply type other than ``r``
    :raises ConnectionError: on socket errors (including timeouts)
    """
    if not (1 <= timer_number <= 7):
        raise ValueError("Timer number must be 1..7")
    self._ensure_connection()

    def read_exact(count, incomplete_message):
        # TCP may deliver a reply in pieces; keep reading until `count`
        # bytes arrived.  An empty read means the peer closed the stream.
        buf = bytearray()
        while len(buf) < count:
            part = self.sock.recv(count - len(buf))
            if not part:
                raise CommandError(incomplete_message)
            buf += part
        return bytes(buf)

    # Attempt a custom get packet for 'E0'
    length = 6
    packet = struct.pack(
        ">B2s1s2s1s",
        length,
        self.TV_ID,
        b'g',
        b'E0',
        self.CR
    )
    try:
        self.sock.sendall(packet)
        response_header = read_exact(5, "Incomplete timer response header.")
        cmd_type = response_header[3:4]
        if cmd_type != b'r':
            raise CommandError(f"Unexpected response type for timer: {cmd_type}")
        # Read next 9 bytes for the timer data
        value_bytes = read_exact(9, "Incomplete timer data.")
    except socket.error as e:
        self.disconnect()
        raise ConnectionError(f"Failed to get On/Off Timer: {e}") from e

    (flags, days, on_hour, on_minute, off_hour, off_minute,
     source, reserved_a, reserved_b) = value_bytes
    return {
        "timer_number": flags & 0x07,
        "enabled": bool((flags >> 6) & 0x01),
        "on_enabled": bool((flags >> 5) & 0x01),
        "off_enabled": bool((flags >> 4) & 0x01),
        "days": days,
        "on_hour": on_hour,
        "on_minute": on_minute,
        "off_hour": off_hour,
        "off_minute": off_minute,
        "video_source": f"{source:02X}",
        "reserved": (reserved_a, reserved_b),
    }