From b0790f0068909b936a0d8c15268b25a636fefd2a Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Sat, 1 Mar 2025 01:25:35 +0700 Subject: [PATCH] socket testing --- .../benq_smartboard/nolib_test.py | 42 ++--- .../benq_smartboard/socket_connect.sh | 13 ++ .../benq_smartboard/socket_test.py | 148 ++++++++++++++++++ 3 files changed, 177 insertions(+), 26 deletions(-) create mode 100755 custom_components/benq_smartboard/socket_connect.sh create mode 100644 custom_components/benq_smartboard/socket_test.py diff --git a/custom_components/benq_smartboard/nolib_test.py b/custom_components/benq_smartboard/nolib_test.py index e0d4ff3..3d19177 100644 --- a/custom_components/benq_smartboard/nolib_test.py +++ b/custom_components/benq_smartboard/nolib_test.py @@ -6,7 +6,7 @@ import logging logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') # Configuration for the Smart Board: -IP_ADDRESS = "10.10.4.173" # Replace with your board's IP +IP_ADDRESS = "10.10.4.58" # Replace with your board's IP PORT = 4660 TIMEOUT = 5.0 @@ -24,39 +24,29 @@ def main(): logging.debug(f"Connecting to {IP_ADDRESS}:{PORT}") sock.connect((IP_ADDRESS, PORT)) logging.info("Connected.") + sock.settimeout(None) # disable timeout for indefinite receive # Build "get_power" command # Protocol: length=5, TV_ID, 'g' (get command), command_code (1 byte) and CR. - command_code = b'\x6C' # Power get command + command_code = b'\x67' # Power get command length = 5 packet = struct.pack(">B2s1s1s1s", length, TV_ID, b'g', command_code, CR) log_packet("Sent", packet) sock.sendall(packet) - time.sleep(1) # Wait a bit for response - - # Receive header first (expected 5 bytes): - header = sock.recv(5) - log_packet("Received", header) - if len(header) < 5: - logging.error("Incomplete header received.") - return - - command_type = header[3:4] - if command_type != b'r': - logging.error(f"Unexpected response type: {command_type}") - return - - # Wait a bit before reading the value bytes. - time.sleep(0.1) - value = sock.recv(3) - log_packet("Received", value) - if len(value) < 3: - logging.error("Incomplete value bytes received.") - return - - power_state = value.decode() - logging.info(f"Power state: {power_state}") + time.sleep(1) # Increased delay to allow board time to respond + + # Loop to receive data indefinitely: + while True: + try: + chunk = sock.recv(1024) + if chunk: + log_packet("Received", chunk) + # Do not break on empty chunk; keep the connection open + except Exception as ex: + logging.exception(f"Error receiving data: {ex}") + # Optionally break on critical errors: + # break except Exception as ex: logging.exception(f"Error during manual test: {ex}") finally: diff --git a/custom_components/benq_smartboard/socket_connect.sh b/custom_components/benq_smartboard/socket_connect.sh new file mode 100755 index 0000000..df033c4 --- /dev/null +++ b/custom_components/benq_smartboard/socket_connect.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +server_address="$1" +port_number="$2" + +if [[ -z "$server_address" || -z "$port_number" ]]; then + echo "Usage: $0 " + exit 1 +fi + +while read -r hex_input; do + echo "$hex_input" | xxd -r -p | socat -u - "TCP:$server_address:$port_number,crnl" | xxd -p +done \ No newline at end of file diff --git a/custom_components/benq_smartboard/socket_test.py b/custom_components/benq_smartboard/socket_test.py new file mode 100644 index 0000000..576fed7 --- /dev/null +++ b/custom_components/benq_smartboard/socket_test.py @@ -0,0 +1,148 @@ +import socket +import logging +import time +import binascii + +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') + +# Configuration for the Smart Board: +IP_ADDRESS = "10.10.4.58" # Default IP, can be changed via command +PORT = 4660 # Default port, can be changed via command +TIMEOUT = 5.0 + +def log_packet(direction: str, data: bytes): + hex_data = data.hex().upper() + logging.debug(f"{direction} [{len(data)} bytes]: {hex_data}") + +def connect_to_device(ip, port): + """Create and return a connected socket""" + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(TIMEOUT) + + try: + logging.debug(f"Connecting to {ip}:{port}") + sock.connect((ip, port)) + logging.info("Connected.") + return sock + except Exception as ex: + logging.exception(f"Connection failed: {ex}") + return None + +def send_hex_command(sock, hex_string): + """Send a hex command and return any response""" + try: + # Convert hex string to bytes + hex_string = hex_string.replace(" ", "") # Remove any spaces + data = binascii.unhexlify(hex_string) + + print(f"Client Send: {hex_string}") + log_packet("Sent", data) + + sock.sendall(data) + + # Wait for response with timeout + sock.settimeout(TIMEOUT) + response = bytearray() + + # Try to read data until timeout + start_time = time.time() + while time.time() - start_time < TIMEOUT: + try: + chunk = sock.recv(1024) + if not chunk: + break + + response.extend(chunk) + hex_response = chunk.hex().upper() + print(f"Server Recv: {hex_response}") + log_packet("Received", chunk) + + # Small delay to allow more data to arrive if any + time.sleep(0.1) + except socket.timeout: + break + + return response + except binascii.Error: + print("Error: Invalid hexadecimal string") + return None + except Exception as ex: + logging.exception(f"Error sending command: {ex}") + return None + +def print_help(): + """Print available commands""" + print("\nSmart Board Hex Debug Tool") + print("------------------------") + print("Available commands:") + print(" connect - Connect to device (default: 10.10.4.58:4660)") + print(" send - Send hexadecimal data") + print(" help - Show this help") + print(" exit - Exit the program") + print("\nExample:") + print(" connect 10.10.4.58 4660") + print(" send A5F329AA0D") + +def main(): + print_help() + + ip = IP_ADDRESS + port = PORT + sock = None + + while True: + try: + command = input("\n> ").strip() + + if not command: + continue + + parts = command.split() + cmd = parts[0].lower() + + if cmd == "exit": + break + + elif cmd == "help": + print_help() + + elif cmd == "connect": + if sock: + sock.close() + + if len(parts) >= 3: + ip = parts[1] + port = int(parts[2]) + + sock = connect_to_device(ip, port) + if not sock: + print(f"Failed to connect to {ip}:{port}") + + elif cmd == "send": + if not sock: + print("Not connected. Use 'connect' first.") + continue + + if len(parts) < 2: + print("Error: No hex data provided") + continue + + hex_data = parts[1] + send_hex_command(sock, hex_data) + + else: + print(f"Unknown command: {cmd}") + print("Type 'help' for available commands") + + except KeyboardInterrupt: + print("\nExiting...") + break + except Exception as ex: + logging.exception(f"Error: {ex}") + + if sock: + sock.close() + logging.info("Connection closed.") + +if __name__ == '__main__': + main() \ No newline at end of file