socket testing

This commit is contained in:
Siwat Sirichai 2025-03-01 01:25:35 +07:00
parent 11c4d4acd9
commit b0790f0068
3 changed files with 177 additions and 26 deletions

View File

@ -6,7 +6,7 @@ import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
# Configuration for the Smart Board:
IP_ADDRESS = "10.10.4.173" # Replace with your board's IP
IP_ADDRESS = "10.10.4.58" # Replace with your board's IP
PORT = 4660
TIMEOUT = 5.0
@ -24,39 +24,29 @@ def main():
logging.debug(f"Connecting to {IP_ADDRESS}:{PORT}")
sock.connect((IP_ADDRESS, PORT))
logging.info("Connected.")
sock.settimeout(None) # disable timeout for indefinite receive
# Build "get_power" command
# Protocol: length=5, TV_ID, 'g' (get command), command_code (1 byte) and CR.
command_code = b'\x6C' # Power get command
command_code = b'\x67' # Power get command
length = 5
packet = struct.pack(">B2s1s1s1s", length, TV_ID, b'g', command_code, CR)
log_packet("Sent", packet)
sock.sendall(packet)
time.sleep(1) # Wait a bit for response
# Receive header first (expected 5 bytes):
header = sock.recv(5)
log_packet("Received", header)
if len(header) < 5:
logging.error("Incomplete header received.")
return
command_type = header[3:4]
if command_type != b'r':
logging.error(f"Unexpected response type: {command_type}")
return
# Wait a bit before reading the value bytes.
time.sleep(0.1)
value = sock.recv(3)
log_packet("Received", value)
if len(value) < 3:
logging.error("Incomplete value bytes received.")
return
power_state = value.decode()
logging.info(f"Power state: {power_state}")
time.sleep(1) # Increased delay to allow board time to respond
# Loop to receive data indefinitely:
while True:
try:
chunk = sock.recv(1024)
if chunk:
log_packet("Received", chunk)
# Do not break on empty chunk; keep the connection open
except Exception as ex:
logging.exception(f"Error receiving data: {ex}")
# Optionally break on critical errors:
# break
except Exception as ex:
logging.exception(f"Error during manual test: {ex}")
finally:

View File

@ -0,0 +1,13 @@
#!/bin/bash
server_address="$1"
port_number="$2"
if [[ -z "$server_address" || -z "$port_number" ]]; then
echo "Usage: $0 <server_address> <port_number>"
exit 1
fi
while read -r hex_input; do
echo "$hex_input" | xxd -r -p | socat -u - "TCP:$server_address:$port_number,crnl" | xxd -p
done

View File

@ -0,0 +1,148 @@
import socket
import logging
import time
import binascii
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
# Configuration for the Smart Board:
IP_ADDRESS = "10.10.4.58" # Default IP, can be changed via command
PORT = 4660 # Default port, can be changed via command
TIMEOUT = 5.0
def log_packet(direction: str, data: bytes):
hex_data = data.hex().upper()
logging.debug(f"{direction} [{len(data)} bytes]: {hex_data}")
def connect_to_device(ip, port):
"""Create and return a connected socket"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(TIMEOUT)
try:
logging.debug(f"Connecting to {ip}:{port}")
sock.connect((ip, port))
logging.info("Connected.")
return sock
except Exception as ex:
logging.exception(f"Connection failed: {ex}")
return None
def send_hex_command(sock, hex_string):
"""Send a hex command and return any response"""
try:
# Convert hex string to bytes
hex_string = hex_string.replace(" ", "") # Remove any spaces
data = binascii.unhexlify(hex_string)
print(f"Client Send: {hex_string}")
log_packet("Sent", data)
sock.sendall(data)
# Wait for response with timeout
sock.settimeout(TIMEOUT)
response = bytearray()
# Try to read data until timeout
start_time = time.time()
while time.time() - start_time < TIMEOUT:
try:
chunk = sock.recv(1024)
if not chunk:
break
response.extend(chunk)
hex_response = chunk.hex().upper()
print(f"Server Recv: {hex_response}")
log_packet("Received", chunk)
# Small delay to allow more data to arrive if any
time.sleep(0.1)
except socket.timeout:
break
return response
except binascii.Error:
print("Error: Invalid hexadecimal string")
return None
except Exception as ex:
logging.exception(f"Error sending command: {ex}")
return None
def print_help():
"""Print available commands"""
print("\nSmart Board Hex Debug Tool")
print("------------------------")
print("Available commands:")
print(" connect <ip> <port> - Connect to device (default: 10.10.4.58:4660)")
print(" send <hex> - Send hexadecimal data")
print(" help - Show this help")
print(" exit - Exit the program")
print("\nExample:")
print(" connect 10.10.4.58 4660")
print(" send A5F329AA0D")
def main():
print_help()
ip = IP_ADDRESS
port = PORT
sock = None
while True:
try:
command = input("\n> ").strip()
if not command:
continue
parts = command.split()
cmd = parts[0].lower()
if cmd == "exit":
break
elif cmd == "help":
print_help()
elif cmd == "connect":
if sock:
sock.close()
if len(parts) >= 3:
ip = parts[1]
port = int(parts[2])
sock = connect_to_device(ip, port)
if not sock:
print(f"Failed to connect to {ip}:{port}")
elif cmd == "send":
if not sock:
print("Not connected. Use 'connect' first.")
continue
if len(parts) < 2:
print("Error: No hex data provided")
continue
hex_data = parts[1]
send_hex_command(sock, hex_data)
else:
print(f"Unknown command: {cmd}")
print("Type 'help' for available commands")
except KeyboardInterrupt:
print("\nExiting...")
break
except Exception as ex:
logging.exception(f"Error: {ex}")
if sock:
sock.close()
logging.info("Connection closed.")
if __name__ == '__main__':
main()