From 70786734bd9a70efac955c22d888c51d6dea5523 Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Fri, 6 Jun 2025 17:05:15 +0700 Subject: [PATCH] working on off --- .../benq_smartboard/benq_smartboard_lib.py | 139 ++++++++++-------- .../benq_smartboard/serial_test.py | 93 +----------- .../benq_smartboard/serial_test_nolib.py | 0 3 files changed, 81 insertions(+), 151 deletions(-) create mode 100644 custom_components/benq_smartboard/serial_test_nolib.py diff --git a/custom_components/benq_smartboard/benq_smartboard_lib.py b/custom_components/benq_smartboard/benq_smartboard_lib.py index bf18d6b..faab2e9 100644 --- a/custom_components/benq_smartboard/benq_smartboard_lib.py +++ b/custom_components/benq_smartboard/benq_smartboard_lib.py @@ -323,24 +323,29 @@ class BenQSmartBoard: """ self._ensure_connection() - # Protocol requires length=8 (excluding CR), "TV ID=01", command type='s' (0x73) - # Then command_code (1 byte ASCII) + value (3 bytes ASCII) + # Protocol format: [length][tv_id][command_type][command_code][value][CR] + # Example: 801s!000\r where command_code 21(hex) = !(ASCII) + # Length includes the length byte itself: 1 + tv_id(2) + command_type(1) + command_code(1) + value(3) = 8 if len(command_code) != 2: raise ValueError("command_code must be 2 ASCII chars (hex).") if len(value) != 3: raise ValueError("value must be 3 ASCII chars.") - length = 8 # as per spec + # Convert hex command code to ASCII character try: - packet = struct.pack( - ">B2s1s2s3s1s", - length, - self.TV_ID, - b's', # Set command - command_code.encode(), - value.encode(), - self.CR - ) + command_code_int = int(command_code, 16) + command_code_ascii = bytes([command_code_int]) + except ValueError: + raise ValueError(f"Invalid hex command code: {command_code}") + + # Length = length_byte(1) + tv_id(2) + command_type(1) + command_code(1) + value(3) = 8 bytes total + length_ascii = b'8' + tv_id_ascii = b'01' + command_type_ascii = b's' # Set command is 's' + value_ascii = value.encode('ascii') + + try: + packet = length_ascii + tv_id_ascii + command_type_ascii + command_code_ascii + value_ascii + self.CR _log_packet("Sent", packet) self._send_data(packet) @@ -366,29 +371,29 @@ class BenQSmartBoard: """ self._ensure_connection() - # For get command, protocol is less documented in your specification, - # but we assume length=6, "TV ID=01", command type='g' (0x67), - # then command_code (2 ASCII) + some placeholder for value? - # We'll assume a 3-byte response for the value after the response header. + # Convert hex command code to ASCII character + try: + command_code_int = int(command_code, 16) + command_code_ascii = bytes([command_code_int]) + except ValueError: + raise ValueError(f"Invalid hex command code: {command_code}") - length = 6 + # For get command: length + tv_id + command_type + command_code + CR + # Length = length_byte(1) + tv_id(2) + command_type(1) + command_code(1) = 5 bytes total + length_ascii = b'5' + tv_id_ascii = b'01' + command_type_ascii = b'g' # Get command is 'g' + try: logging.debug(f"Sending get command: code={command_code}") - packet = struct.pack( - ">B2s1s2s1s", - length, - self.TV_ID, - b'g', - command_code.encode(), - self.CR - ) + packet = length_ascii + tv_id_ascii + command_type_ascii + command_code_ascii + self.CR _log_packet("Sent", packet) self._send_data(packet) response, value = self._receive_get_response() logging.debug(f"Received get response: {response}, value: {value}") if response == b'r': - return value.decode() + return value.decode('ascii') raise CommandError(f"Unexpected get response type: {response}") except (socket.error, serial.SerialException) as e: self.disconnect() @@ -397,55 +402,61 @@ class BenQSmartBoard: def _receive_response(self): """ - Receive the standard 5-byte response for a Set command: - [ length(1) | ID(2) | command_type(1) | CR(1) ] - command_type should be '+' or '-'. - - We interpret the protocol to have an additional 3 bytes for the value if command_type='r'. - Return (command_type, value_bytes). + Receive response for a Set command. + Expected format: [length_ascii][tv_id_ascii][response_type_ascii][CR] """ try: - resp = self._receive_data(5) - _log_packet("Received", resp) - if len(resp) < 5: + # Read until we get CR (0x0D) or timeout + response_data = b'' + while True: + byte = self._receive_data(1) + if not byte: + raise CommandError("No response from Smart Board.") + response_data += byte + if byte == self.CR: + break + if len(response_data) > 10: # Prevent infinite loop + raise CommandError("Response too long from Smart Board.") + + _log_packet("Received", response_data) + if len(response_data) < 4: # minimum: length + tv_id + response_type + CR raise CommandError("Incomplete response from Smart Board (set).") - logging.debug(f"Raw set response data: {resp}") - # parse - # >B2s1s1s - # But effectively, we only need the command_type at index 3 - length = resp[0] - command_type = resp[3:4] - return command_type + + # Parse ASCII response: skip length and tv_id, get response type + response_type = response_data[3:4] # Should be '+' or '-' + return response_type except (socket.timeout, serial.SerialTimeoutException): raise ConnectionError("Timed out waiting for response (set).") def _receive_get_response(self): """ - Receive the standard 5-byte response header, then a 3-byte value if command_type='r'. - Header structure: [ length(1) | ID(2) | command_type(1) | ???(1)??? Actually length might be 5 bytes total ] - - We interpret the protocol to have an additional 3 bytes for the value if command_type='r'. - Return (command_type, value_bytes). + Receive response for a Get command. + Expected format: [length_ascii][tv_id_ascii][response_type_ascii][value_ascii][CR] """ try: - header = self._receive_data(5) - _log_packet("Received", header) - if len(header) < 5: - raise CommandError("Incomplete response from Smart Board (get header).") - logging.debug(f"Raw get response header: {header}") - - length = header[0] - command_type = header[3:4] # e.g. b'r' - # Now read next 3 bytes if it's a valid get reply - if command_type == b'r': - value = self._receive_data(3) - _log_packet("Received", value) - if len(value) < 3: - raise CommandError("Incomplete 3-byte value in get response.") - return command_type, value + # Read until we get CR (0x0D) or timeout + response_data = b'' + while True: + byte = self._receive_data(1) + if not byte: + raise CommandError("No response from Smart Board.") + response_data += byte + if byte == self.CR: + break + if len(response_data) > 20: # Prevent infinite loop + raise CommandError("Response too long from Smart Board.") + + _log_packet("Received", response_data) + if len(response_data) < 4: # minimum: length + tv_id + response_type + CR + raise CommandError("Incomplete response from Smart Board (get).") + + # Parse ASCII response + response_type = response_data[3:4] # Should be 'r' for get response + if response_type == b'r' and len(response_data) >= 8: # length + tv_id + 'r' + 3-byte value + CR + value = response_data[4:7] # 3-byte ASCII value + return response_type, value else: - # No further data expected if it's not 'r' - return command_type, b'' + return response_type, b'' except (socket.timeout, serial.SerialTimeoutException): raise ConnectionError("Timed out waiting for response (get).") diff --git a/custom_components/benq_smartboard/serial_test.py b/custom_components/benq_smartboard/serial_test.py index 023d051..f355493 100644 --- a/custom_components/benq_smartboard/serial_test.py +++ b/custom_components/benq_smartboard/serial_test.py @@ -8,6 +8,7 @@ import time from benq_smartboard_lib import ( BenQSmartBoard, ConnectionType, + PowerState, PowerState, VideoSource, BenQSmartBoardError @@ -23,7 +24,7 @@ def test_serial_connection(): """Test the serial connection functionality.""" # Configure your serial port here - SERIAL_PORT = 'COM3' # Windows example, use '/dev/ttyUSB0' for Linux + SERIAL_PORT = 'COM11' # Windows example, use '/dev/ttyUSB0' for Linux BAUDRATE = 115200 print(f"Testing BenQ Smart Board via Serial: {SERIAL_PORT} @ {BAUDRATE}") @@ -47,93 +48,11 @@ def test_serial_connection(): # Get current power state try: - power_state = board.get_power() - print(f"Current power state: {power_state}") + print("Expected packet: 801s!000\\r (where 21 hex = ! ASCII, length=8 includes length byte)") + board.set_power(PowerState.ON) + print("Power OFF command sent successfully!") except Exception as e: - print(f"Failed to get power state: {e}") - - # Get current video source - try: - video_source = board.get_video_source() - print(f"Current video source: {video_source}") - except Exception as e: - print(f"Failed to get video source: {e}") - - # Get current volume - try: - volume = board.get_volume() - print(f"Current volume: {volume}") - except Exception as e: - print(f"Failed to get volume: {e}") - - # Get current brightness - try: - brightness = board.get_brightness() - print(f"Current brightness: {brightness}") - except Exception as e: - print(f"Failed to get brightness: {e}") - - print("\n=== Testing Set Commands ===") - - # Test setting volume - try: - print("Setting volume to 50...") - success = board.set_volume(50) - print(f"Set volume result: {'Success' if success else 'Failed'}") - time.sleep(1) - - # Verify the change - new_volume = board.get_volume() - print(f"New volume: {new_volume}") - except Exception as e: - print(f"Failed to set volume: {e}") - - # Test setting brightness - try: - print("Setting brightness to 80...") - success = board.set_brightness(80) - print(f"Set brightness result: {'Success' if success else 'Failed'}") - time.sleep(1) - - # Verify the change - new_brightness = board.get_brightness() - print(f"New brightness: {new_brightness}") - except Exception as e: - print(f"Failed to set brightness: {e}") - - # Test video source switching - try: - print("Switching to HDMI1...") - success = board.set_video_source(VideoSource.HDMI1) - print(f"Set video source result: {'Success' if success else 'Failed'}") - time.sleep(2) - - # Verify the change - new_source = board.get_video_source() - print(f"New video source: {new_source}") - except Exception as e: - print(f"Failed to set video source: {e}") - - print("\n=== Testing Advanced Commands ===") - - # Test RTC reading - try: - rtc_info = board.get_rtc() - print(f"RTC Info: {rtc_info}") - except Exception as e: - print(f"Failed to get RTC info: {e}") - - # Test mute functionality - try: - print("Testing mute...") - success = board.set_mute(True) - print(f"Mute ON result: {'Success' if success else 'Failed'}") - time.sleep(2) - - success = board.set_mute(False) - print(f"Mute OFF result: {'Success' if success else 'Failed'}") - except Exception as e: - print(f"Failed to test mute: {e}") + print(f"Failed to set power OFF: {e}") print("\n=== Test Completed Successfully ===") diff --git a/custom_components/benq_smartboard/serial_test_nolib.py b/custom_components/benq_smartboard/serial_test_nolib.py new file mode 100644 index 0000000..e69de29