"""Manual TCP test for a smart board: send a get-power command and log all traffic."""
import socket
|
|
import struct
|
|
import time
|
|
import logging
|
|
|
|
# Emit everything from DEBUG upwards so each sent/received packet is visible.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

# Connection settings for the smart board.
IP_ADDRESS = "10.10.4.58"  # Replace with your board's IP address.
PORT = 4660                # TCP port the script connects to.
TIMEOUT = 5.0              # Seconds allowed for the initial connect.

# Fixed fields of the command frame.
TV_ID = b"01"   # Two-byte display identifier placed after the length byte.
CR = b"\x0D"    # Carriage return that terminates every frame.
|
|
|
|
def log_packet(direction: str, data: bytes):
    """Write a DEBUG log line describing one packet.

    The line contains ``direction`` (a free-form label such as "Sent" or
    "Received"), the number of bytes in ``data``, and the payload rendered
    as upper-case hexadecimal.
    """
    # Render the payload first so non-bytes input fails on .hex(), as before.
    payload_hex = data.hex().upper()
    byte_count = len(data)
    message = f"{direction} [{byte_count} bytes]: {payload_hex}"
    logging.debug(message)
|
|
|
|
def main():
    """Run a manual get-power exchange with the board and log all traffic.

    Opens a TCP connection to ``IP_ADDRESS:PORT`` (the connect attempt is
    bounded by ``TIMEOUT`` seconds), sends one get-power frame, then logs
    every chunk received until the peer closes the connection or a socket
    error occurs. The socket is always closed before returning.

    To stop a long-running session press Ctrl+C: ``KeyboardInterrupt`` is
    not caught here, but the ``finally`` clause still closes the socket.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(TIMEOUT)
    try:
        logging.debug(f"Connecting to {IP_ADDRESS}:{PORT}")
        sock.connect((IP_ADDRESS, PORT))
        logging.info("Connected.")
        # The timeout was only meant for connect(); block indefinitely on recv().
        sock.settimeout(None)

        # Build the "get_power" command.
        # Frame as packed below: length byte, TV_ID (2 bytes), 'g' (get
        # command), command code (1 byte), CR. The length value 5 equals the
        # number of bytes that follow the length byte.
        # NOTE(review): command code and framing are taken as-is from the
        # original script; confirm against the board's protocol document.
        command_code = b'\x67'  # Power get command
        length = 5
        packet = struct.pack(">B2s1s1s1s", length, TV_ID, b'g', command_code, CR)
        log_packet("Sent", packet)

        sock.sendall(packet)
        time.sleep(1)  # Delay to allow the board time to respond

        # Receive until the peer disconnects or the socket fails.
        while True:
            try:
                chunk = sock.recv(1024)
            except OSError as ex:
                # A failed socket will not recover; retrying would only
                # repeat the same error forever.
                logging.exception(f"Error receiving data: {ex}")
                break
            if not chunk:
                # recv() returns b'' only when the peer has closed the
                # connection; looping again would spin without blocking.
                logging.info("Connection closed by peer.")
                break
            log_packet("Received", chunk)
    except Exception as ex:
        logging.exception(f"Error during manual test: {ex}")
    finally:
        logging.debug("Closing socket.")
        sock.close()
        logging.info("Connection closed.")
|
|
|
|
# Script entry point: run the manual test only when executed directly.
if __name__ == "__main__":
    main()
|