""" A comprehensive Python library for controlling BenQ Smart Board via RS232-over-LAN. Implements all protocol commands from the official specification. """ import socket import struct from typing import Optional, Dict, Any from enum import Enum, unique # -- Exceptions ---------------------------------------------------------------- class BenQSmartBoardError(Exception): """Base exception for BenQ Smart Board errors.""" pass class ConnectionError(BenQSmartBoardError): """Exception for connection-related errors.""" pass class CommandError(BenQSmartBoardError): """Exception for invalid or failed commands.""" pass # -- Enums for Commands -------------------------------------------------------- @unique class PowerState(Enum): OFF = '000' ON = '001' STANDBY = '002' REBOOT = '003' @unique class VideoSource(Enum): VGA = '000' HDMI = '001' HDMI1 = '002' HDMI2 = '021' DISPLAY_PORT = '007' TYPE_C = '051' ANDROID = '101' OPS = '102' DEFAULT = 'FF' # used in E0 timer to restore last signal, etc. @unique class AspectRatio(Enum): SIXTEEN_NINE = '000' PTP = '002' @unique class Language(Enum): ENGLISH = '000' FRANCAIS = '001' ESPANOL = '002' FANTZHONG = '003' JIANZHONG = '004' PORTUGUES = '005' GERMAN = '006' DUTCH = '007' POLISH = '008' RUSSIA = '009' CZECH = '010' DANISH = '011' SWEDISH = '012' ITALIAN = '013' ROMANIAN = '014' NORWEGIAN = '015' FINNISH = '016' GREEK = '017' TURKISH = '018' ARABIC = '019' JAPANSE = '020' THAILAND = '021' KOREAN = '022' HUNGARIAN = '023' PERSIAN = '024' VIETNAMESE = '025' INDONESIA = '026' HEBREW = '027' @unique class SoundMode(Enum): CINEMA = '000' STANDARD = '001' MUSIC = '003' NEWS = '005' SPORTS = '006' @unique class RemoteCommand(Enum): VOL_PLUS = '000' VOL_MINUS = '001' REMOTE_UP = '010' REMOTE_DOWN = '011' REMOTE_LEFT = '012' REMOTE_RIGHT = '013' REMOTE_OK = '014' REMOTE_MENU_KEY = '020' REMOTE_EXIT = '022' BLANK = '031' FREEZE = '032' @unique class IRControl(Enum): DISABLE = '000' ENABLE = '001' @unique class ButtonIRControl(Enum): DISABLE = '000' ENABLE = '001' @unique class ButtonControl(Enum): DISABLE = '000' ENABLE = '001' @unique class PixelShift(Enum): OFF = '000' ON = '001' @unique class PictureMode(Enum): STANDARD = '000' BRIGHT = '001' SOFT = '002' ECO = '003' CUSTOM_1 = '005' CUSTOM_2 = '006' CUSTOM_3 = '007' @unique class DCR(Enum): OFF = '000' ON = '001' @unique class ColorTemp(Enum): COOL = '000' NORMAL = '001' WARM = '002' @unique class PowerSaveMode(Enum): OFF = '000' LOW = '001' HIGH = '002' @unique class SwitchOnStatus(Enum): POWER_OFF = '000' FORCE_ON = '001' LAST_STATUS = '002' # -- The Main BenQSmartBoard Class -------------------------------------------- class BenQSmartBoard: """ A client for controlling BenQ Smart Boards via LAN (RS232-over-TCP). Implements all commands from the official protocol specification. """ DEFAULT_PORT = 4660 TV_ID = b'01' CR = b'\x0D' def __init__(self, ip: str, port: int = DEFAULT_PORT, timeout: float = 5.0): """ :param ip: IP address of the BenQ Smart Board :param port: TCP port, default 4660 :param timeout: socket timeout in seconds """ self.ip = ip self.port = port self.timeout = timeout self.sock: Optional[socket.socket] = None # -- Connection Management ------------------------------------------------- def connect(self): """Establish a TCP connection to the Smart Board.""" if self.sock is not None: # Already connected or trying return try: self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.settimeout(self.timeout) self.sock.connect((self.ip, self.port)) except socket.error as e: self.sock = None raise ConnectionError(f"Failed to connect: {e}") def disconnect(self): """Close the TCP connection.""" if self.sock: try: self.sock.close() except socket.error: pass self.sock = None def _ensure_connection(self): """Helper to ensure we are connected; reconnect if needed.""" if not self.sock: self.connect() # -- Core Send/Receive ----------------------------------------------------- def send_command(self, command_code: str, value: str) -> bool: """ Send a set command to the Smart Board. Returns True if valid (+), False if invalid (-). Raises ConnectionError on network issues, CommandError on unknown response. """ self._ensure_connection() # Protocol requires length=8 (excluding CR), "TV ID=01", command type='s' (0x73) # Then command_code (1 byte ASCII) + value (3 bytes ASCII) if len(command_code) != 2: raise ValueError("command_code must be 2 ASCII chars (hex).") if len(value) != 3: raise ValueError("value must be 3 ASCII chars.") length = 8 # as per spec try: packet = struct.pack( ">B2s1s2s3s1s", length, self.TV_ID, b's', # Set command command_code.encode(), value.encode(), self.CR ) self.sock.sendall(packet) response = self._receive_response() if response == b'+': return True elif response == b'-': return False else: # Unknown response type raise CommandError(f"Unknown response type: {response}") except socket.error as e: self.disconnect() raise ConnectionError(f"Failed to send command: {e}") def get_command(self, command_code: str) -> str: """ Send a get command to the Smart Board. Returns the 3-byte value as a string if the device replies with data, or raises CommandError/ConnectionError on issues. """ self._ensure_connection() # For get command, protocol is less documented in your specification, # but we assume length=6, "TV ID=01", command type='g' (0x67), # then command_code (2 ASCII) + some placeholder for value? # We'll assume a 3-byte response for the value after the response header. length = 6 try: packet = struct.pack( ">B2s1s2s1s", length, self.TV_ID, b'g', command_code.encode(), self.CR ) self.sock.sendall(packet) response, value = self._receive_get_response() if response == b'r': return value.decode() raise CommandError(f"Unexpected get response type: {response}") except socket.error as e: self.disconnect() raise ConnectionError(f"Failed to send get command: {e}") def _receive_response(self): """ Receive the standard 5-byte response for a Set command: [ length(1) | ID(2) | command_type(1) | CR(1) ] command_type should be '+' or '-'. """ try: resp = self.sock.recv(5) if len(resp) < 5: raise CommandError("Incomplete response from Smart Board (set).") # parse # >B2s1s1s # But effectively, we only need the command_type at index 3 length = resp[0] command_type = resp[3:4] return command_type except socket.timeout: raise ConnectionError("Timed out waiting for response (set).") def _receive_get_response(self): """ Receive the standard 5-byte response header, then a 3-byte value if command_type='r'. Header structure: [ length(1) | ID(2) | command_type(1) | ???(1)??? Actually length might be 5 bytes total ] We interpret the protocol to have an additional 3 bytes for the value if command_type='r'. Return (command_type, value_bytes). """ try: header = self.sock.recv(5) if len(header) < 5: raise CommandError("Incomplete response from Smart Board (get header).") length = header[0] command_type = header[3:4] # e.g. b'r' # Now read next 3 bytes if it's a valid get reply if command_type == b'r': value = self.sock.recv(3) if len(value) < 3: raise CommandError("Incomplete 3-byte value in get response.") return command_type, value else: # No further data expected if it's not 'r' return command_type, b'' except socket.timeout: raise ConnectionError("Timed out waiting for response (get).") # -- Full Set of Commands (from your Protocol) ----------------------------- # Power commands def set_power(self, state: PowerState) -> bool: return self.send_command('21', state.value) def get_power(self) -> str: return self.get_command('21') # Video Source def set_video_source(self, source: VideoSource) -> bool: return self.send_command('22', source.value) def get_video_source(self) -> str: return self.get_command('22') # Contrast def set_contrast(self, value: int) -> bool: if not (0 <= value <= 100): raise ValueError("Contrast must be 0..100") return self.send_command('23', f"{value:03}") def get_contrast(self) -> str: return self.get_command('23') # Brightness def set_brightness(self, value: int) -> bool: if not (0 <= value <= 100): raise ValueError("Brightness must be 0..100") return self.send_command('24', f"{value:03}") def get_brightness(self) -> str: return self.get_command('24') # Sharpness def set_sharpness(self, value: int) -> bool: if not (0 <= value <= 100): raise ValueError("Sharpness must be 0..100") return self.send_command('25', f"{value:03}") def get_sharpness(self) -> str: return self.get_command('25') # Picture reset def reset_picture(self) -> bool: return self.send_command('26', '000') # Aspect ratio def set_aspect_ratio(self, ratio: AspectRatio) -> bool: return self.send_command('31', ratio.value) def get_aspect_ratio(self) -> str: return self.get_command('31') # Language def set_language(self, lang: Language) -> bool: return self.send_command('32', lang.value) def get_language(self) -> str: return self.get_command('32') # Sound mode def set_sound_mode(self, mode: SoundMode) -> bool: return self.send_command('33', mode.value) def get_sound_mode(self) -> str: return self.get_command('33') # Volume def set_volume(self, value: int) -> bool: if not (0 <= value <= 100): raise ValueError("Volume must be 0..100") return self.send_command('35', f"{value:03}") def get_volume(self) -> str: return self.get_command('35') # Mute def set_mute(self, mute: bool) -> bool: return self.send_command('36', '001' if mute else '000') def get_mute(self) -> str: return self.get_command('36') # Balance def set_balance(self, value: int) -> bool: if not (0 <= value <= 100): raise ValueError("Balance must be 0..100") return self.send_command('39', f"{value:03}") def get_balance(self) -> str: return self.get_command('39') # Sound reset def reset_sound(self) -> bool: return self.send_command('3B', '000') # Remote command def send_remote_command(self, command: RemoteCommand) -> bool: return self.send_command('40', command.value) # IR control def set_ir_control(self, control: IRControl) -> bool: return self.send_command('42', control.value) def get_ir_control(self) -> str: return self.get_command('42') # Button & IR control def set_button_ir_control(self, control: ButtonIRControl) -> bool: return self.send_command('43', control.value) def get_button_ir_control(self) -> str: return self.get_command('43') # Button control def set_button_control(self, control: ButtonControl) -> bool: return self.send_command('45', control.value) def get_button_control(self) -> str: return self.get_command('45') # Pixel shift def set_pixel_shift(self, shift: PixelShift) -> bool: return self.send_command('47', shift.value) def get_pixel_shift(self) -> str: return self.get_command('47') # Screen reset def reset_screen(self) -> bool: return self.send_command('7F', '000') # All reset def reset_all(self) -> bool: return self.send_command('7E', '000') # Picture mode def set_picture_mode(self, mode: PictureMode) -> bool: return self.send_command('81', mode.value) def get_picture_mode(self) -> str: return self.get_command('81') # Saturation def set_saturation(self, value: int) -> bool: if not (0 <= value <= 100): raise ValueError("Saturation must be 0..100") return self.send_command('82', f"{value:03}") def get_saturation(self) -> str: return self.get_command('82') # Hue def set_hue(self, value: int) -> bool: if not (0 <= value <= 100): raise ValueError("Hue must be 0..100") return self.send_command('83', f"{value:03}") def get_hue(self) -> str: return self.get_command('83') # Backlight def set_backlight(self, value: int) -> bool: if not (0 <= value <= 100): raise ValueError("Backlight must be 0..100") return self.send_command('84', f"{value:03}") def get_backlight(self) -> str: return self.get_command('84') # DCR def set_dcr(self, mode: DCR) -> bool: return self.send_command('85', mode.value) def get_dcr(self) -> str: return self.get_command('85') # Color temperature def set_color_temp(self, temp: ColorTemp) -> bool: return self.send_command('86', temp.value) def get_color_temp(self) -> str: return self.get_command('86') # RTC def set_rtc(self, year: int, month: int, day: int, hour: int, minute: int) -> bool: if not (0 <= year <= 99): raise ValueError("Year must be 0..99") if not (1 <= month <= 12): raise ValueError("Month must be 1..12") if not (1 <= day <= 31): raise ValueError("Day must be 1..31") if not (0 <= hour <= 23): raise ValueError("Hour must be 0..23") if not (0 <= minute <= 59): raise ValueError("Minute must be 0..59") ok_y = self.send_command('98', f"{year:03}") ok_m = self.send_command('99', f"{month:03}") ok_d = self.send_command('9A', f"{day:03}") ok_h = self.send_command('9B', f"{hour:03}") ok_min = self.send_command('9C', f"{minute:03}") return all([ok_y, ok_m, ok_d, ok_h, ok_min]) def get_rtc(self) -> Dict[str, str]: return { "year": self.get_command('98'), "month": self.get_command('99'), "day": self.get_command('9A'), "hour": self.get_command('9B'), "minute": self.get_command('9C'), } # Power save def set_power_save(self, mode: PowerSaveMode) -> bool: return self.send_command('A9', mode.value) def get_power_save(self) -> str: return self.get_command('A9') # Switch on status def set_switch_on_status(self, status: SwitchOnStatus) -> bool: return self.send_command('AB', status.value) def get_switch_on_status(self) -> str: return self.get_command('AB') # On/Off Timer def set_on_off_timer( self, timer_number: int, enable: bool, on_enable: bool, off_enable: bool, days: int, on_hour: int, on_minute: int, off_hour: int, off_minute: int, video_source: VideoSource ) -> bool: # Per specification if not (1 <= timer_number <= 7): raise ValueError("Timer number must be 1..7") if not (0 <= days <= 255): raise ValueError("Days must be 0..255 (bitmask).") if not (0 <= on_hour <= 23): raise ValueError("On hour must be 0..23.") if not (0 <= on_minute <= 59): raise ValueError("On minute must be 0..59.") if not (0 <= off_hour <= 23): raise ValueError("Off hour must be 0..23.") if not (0 <= off_minute <= 59): raise ValueError("Off minute must be 0..59.") byte1 = (timer_number & 0x07) if enable: byte1 |= (1 << 6) if on_enable: byte1 |= (1 << 5) if off_enable: byte1 |= (1 << 4) byte2 = days & 0xFF byte3 = on_hour & 0xFF byte4 = on_minute & 0xFF byte5 = off_hour & 0xFF byte6 = off_minute & 0xFF # video_source is a string hex in the standard commands, e.g. '000' => 0x00, '001' => 0x01 # But for E0, we parse the hex: vs_hex = int(video_source.value, 16) byte7 = vs_hex & 0xFF byte8 = 0x00 byte9 = 0x00 # Convert 9 bytes to hex string raw = bytes([byte1, byte2, byte3, byte4, byte5, byte6, byte7, byte8, byte9]) value_str = "".join(f"{b:02X}" for b in raw) # E.g. "050018000019000000" return self.send_command('E0', value_str) def get_on_off_timer(self, timer_number: int) -> Dict[str, Any]: """ Example implementation for reading a timer. Protocol details are not fully specified, so we demonstrate a potential approach. """ if not (1 <= timer_number <= 7): raise ValueError("Timer number must be 1..7") self._ensure_connection() try: # Attempt a custom get packet for 'E0' with the selected timer in Byte1 length = 6 byte1 = timer_number & 0x07 packet = struct.pack( ">B2s1s2s1s", length, self.TV_ID, b'g', b'E0', self.CR ) self.sock.sendall(packet) response_header = self.sock.recv(5) if len(response_header) < 5: raise CommandError("Incomplete timer response header.") cmd_type = response_header[3:4] if cmd_type != b'r': raise CommandError(f"Unexpected response type for timer: {cmd_type}") # Read next 9 bytes for the timer data value_bytes = self.sock.recv(9) if len(value_bytes) < 9: raise CommandError("Incomplete timer data.") byte1, byte2, byte3, byte4, byte5, byte6, byte7, byte8, byte9 = value_bytes timer_info = { "timer_number": byte1 & 0x07, "enabled": bool((byte1 >> 6) & 0x01), "on_enabled": bool((byte1 >> 5) & 0x01), "off_enabled": bool((byte1 >> 4) & 0x01), "days": byte2, "on_hour": byte3, "on_minute": byte4, "off_hour": byte5, "off_minute": byte6, "video_source": f"{byte7:02X}", "reserved": (byte8, byte9), } return timer_info except socket.error as e: self.disconnect() raise ConnectionError(f"Failed to get On/Off Timer: {e}")