logging & nolib test
This commit is contained in:
parent
5f500e3da2
commit
11c4d4acd9
|
@ -0,0 +1,2 @@
|
|||
.DS_Store
|
||||
*.pyc
|
|
@ -4,6 +4,7 @@ Implements all protocol commands from the official specification.
|
|||
"""
|
||||
import socket
|
||||
import struct
|
||||
import logging # New import for logging
|
||||
from typing import Optional, Dict, Any
|
||||
from enum import Enum, unique
|
||||
|
||||
|
@ -158,6 +159,11 @@ class SwitchOnStatus(Enum):
|
|||
|
||||
# -- The Main BenQSmartBoard Class --------------------------------------------
|
||||
|
||||
def _log_packet(direction: str, data: bytes):
|
||||
"""Log packet data in hex format."""
|
||||
hex_data = data.hex().upper()
|
||||
logging.debug(f"{direction} [{len(data)} bytes]: {hex_data}")
|
||||
|
||||
class BenQSmartBoard:
|
||||
"""
|
||||
A client for controlling BenQ Smart Boards via LAN (RS232-over-TCP).
|
||||
|
@ -184,25 +190,28 @@ class BenQSmartBoard:
|
|||
def connect(self):
|
||||
"""Establish a TCP connection to the Smart Board."""
|
||||
if self.sock is not None:
|
||||
# Already connected or trying
|
||||
return
|
||||
|
||||
try:
|
||||
logging.debug(f"Connecting to Smart Board at {self.ip}:{self.port}") # New logging line
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.sock.settimeout(self.timeout)
|
||||
self.sock.connect((self.ip, self.port))
|
||||
logging.info("Connection established successfully.") # New logging line
|
||||
except socket.error as e:
|
||||
self.sock = None
|
||||
logging.exception("Failed to connect to Smart Board.") # New logging line
|
||||
raise ConnectionError(f"Failed to connect: {e}")
|
||||
|
||||
def disconnect(self):
|
||||
"""Close the TCP connection."""
|
||||
if self.sock:
|
||||
try:
|
||||
logging.debug("Closing connection to Smart Board.") # New logging line
|
||||
self.sock.close()
|
||||
except socket.error:
|
||||
pass
|
||||
self.sock = None
|
||||
logging.info("Disconnected from Smart Board.") # New logging line
|
||||
|
||||
def _ensure_connection(self):
|
||||
"""Helper to ensure we are connected; reconnect if needed."""
|
||||
|
@ -237,9 +246,11 @@ class BenQSmartBoard:
|
|||
value.encode(),
|
||||
self.CR
|
||||
)
|
||||
_log_packet("Sent", packet) # Log sent packet
|
||||
self.sock.sendall(packet)
|
||||
|
||||
response = self._receive_response()
|
||||
logging.debug(f"Received response: {response}") # New logging line
|
||||
if response == b'+':
|
||||
return True
|
||||
elif response == b'-':
|
||||
|
@ -249,6 +260,7 @@ class BenQSmartBoard:
|
|||
raise CommandError(f"Unknown response type: {response}")
|
||||
except socket.error as e:
|
||||
self.disconnect()
|
||||
logging.exception("Socket error during send_command.") # New logging line
|
||||
raise ConnectionError(f"Failed to send command: {e}")
|
||||
|
||||
def get_command(self, command_code: str) -> str:
|
||||
|
@ -266,6 +278,7 @@ class BenQSmartBoard:
|
|||
|
||||
length = 6
|
||||
try:
|
||||
logging.debug(f"Sending get command: code={command_code}") # New logging line
|
||||
packet = struct.pack(
|
||||
">B2s1s2s1s",
|
||||
length,
|
||||
|
@ -274,14 +287,17 @@ class BenQSmartBoard:
|
|||
command_code.encode(),
|
||||
self.CR
|
||||
)
|
||||
_log_packet("Sent", packet) # Log sent packet
|
||||
self.sock.sendall(packet)
|
||||
|
||||
response, value = self._receive_get_response()
|
||||
logging.debug(f"Received get response: {response}, value: {value}") # New logging line
|
||||
if response == b'r':
|
||||
return value.decode()
|
||||
raise CommandError(f"Unexpected get response type: {response}")
|
||||
except socket.error as e:
|
||||
self.disconnect()
|
||||
logging.exception("Socket error during get_command.") # New logging line
|
||||
raise ConnectionError(f"Failed to send get command: {e}")
|
||||
|
||||
def _receive_response(self):
|
||||
|
@ -292,8 +308,10 @@ class BenQSmartBoard:
|
|||
"""
|
||||
try:
|
||||
resp = self.sock.recv(5)
|
||||
_log_packet("Received", resp) # Log received data
|
||||
if len(resp) < 5:
|
||||
raise CommandError("Incomplete response from Smart Board (set).")
|
||||
logging.debug(f"Raw set response data: {resp}") # New logging line
|
||||
# parse
|
||||
# >B2s1s1s
|
||||
# But effectively, we only need the command_type at index 3
|
||||
|
@ -313,14 +331,17 @@ class BenQSmartBoard:
|
|||
"""
|
||||
try:
|
||||
header = self.sock.recv(5)
|
||||
_log_packet("Received", header) # Log header
|
||||
if len(header) < 5:
|
||||
raise CommandError("Incomplete response from Smart Board (get header).")
|
||||
logging.debug(f"Raw get response header: {header}") # New logging line
|
||||
|
||||
length = header[0]
|
||||
command_type = header[3:4] # e.g. b'r'
|
||||
# Now read next 3 bytes if it's a valid get reply
|
||||
if command_type == b'r':
|
||||
value = self.sock.recv(3)
|
||||
_log_packet("Received", value) # Log value bytes
|
||||
if len(value) < 3:
|
||||
raise CommandError("Incomplete 3-byte value in get response.")
|
||||
return command_type, value
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
import socket
|
||||
import struct
|
||||
import time
|
||||
import logging
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
# Configuration for the Smart Board:
|
||||
IP_ADDRESS = "10.10.4.173" # Replace with your board's IP
|
||||
PORT = 4660
|
||||
TIMEOUT = 5.0
|
||||
|
||||
TV_ID = b'01'
|
||||
CR = b'\x0D'
|
||||
|
||||
def log_packet(direction: str, data: bytes):
|
||||
hex_data = data.hex().upper()
|
||||
logging.debug(f"{direction} [{len(data)} bytes]: {hex_data}")
|
||||
|
||||
def main():
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.settimeout(TIMEOUT)
|
||||
try:
|
||||
logging.debug(f"Connecting to {IP_ADDRESS}:{PORT}")
|
||||
sock.connect((IP_ADDRESS, PORT))
|
||||
logging.info("Connected.")
|
||||
|
||||
# Build "get_power" command
|
||||
# Protocol: length=5, TV_ID, 'g' (get command), command_code (1 byte) and CR.
|
||||
command_code = b'\x6C' # Power get command
|
||||
length = 5
|
||||
packet = struct.pack(">B2s1s1s1s", length, TV_ID, b'g', command_code, CR)
|
||||
log_packet("Sent", packet)
|
||||
|
||||
sock.sendall(packet)
|
||||
time.sleep(1) # Wait a bit for response
|
||||
|
||||
# Receive header first (expected 5 bytes):
|
||||
header = sock.recv(5)
|
||||
log_packet("Received", header)
|
||||
if len(header) < 5:
|
||||
logging.error("Incomplete header received.")
|
||||
return
|
||||
|
||||
command_type = header[3:4]
|
||||
if command_type != b'r':
|
||||
logging.error(f"Unexpected response type: {command_type}")
|
||||
return
|
||||
|
||||
# Wait a bit before reading the value bytes.
|
||||
time.sleep(0.1)
|
||||
value = sock.recv(3)
|
||||
log_packet("Received", value)
|
||||
if len(value) < 3:
|
||||
logging.error("Incomplete value bytes received.")
|
||||
return
|
||||
|
||||
power_state = value.decode()
|
||||
logging.info(f"Power state: {power_state}")
|
||||
except Exception as ex:
|
||||
logging.exception(f"Error during manual test: {ex}")
|
||||
finally:
|
||||
logging.debug("Closing socket.")
|
||||
sock.close()
|
||||
logging.info("Connection closed.")
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
|
@ -0,0 +1,41 @@
|
|||
import logging
|
||||
from benq_smartboard_lib import BenQSmartBoard, ConnectionError, CommandError, PowerState
|
||||
|
||||
# Configure detailed debug logging.
|
||||
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
# Configurable parameters; update these variables with your Smart Board details.
|
||||
IP_ADDRESS = "10.10.4.173" # Replace with your Smart Board IP.
|
||||
PORT = 4660
|
||||
|
||||
def run_test():
|
||||
board = BenQSmartBoard(ip=IP_ADDRESS, port=PORT)
|
||||
try:
|
||||
logging.debug('Attempting to connect to the Smart Board.')
|
||||
board.connect()
|
||||
logging.info('Connected successfully.')
|
||||
|
||||
# Get the current power state.
|
||||
logging.debug('Sending get_power command.')
|
||||
current_power = board.get_power()
|
||||
logging.info(f'Current power state: {current_power}')
|
||||
|
||||
# Toggle power state for test purposes.
|
||||
logging.debug('Toggling power state.')
|
||||
if current_power.strip() == PowerState.OFF.value:
|
||||
logging.info("Setting power state to ON.")
|
||||
success = board.set_power(PowerState.ON)
|
||||
else:
|
||||
logging.info("Setting power state to OFF.")
|
||||
success = board.set_power(PowerState.OFF)
|
||||
logging.info(f'set_power success: {success}')
|
||||
|
||||
except (ConnectionError, CommandError, Exception) as ex:
|
||||
logging.exception(f'Exception during Smart Board interaction: {ex}')
|
||||
finally:
|
||||
logging.debug('Disconnecting from Smart Board.')
|
||||
board.disconnect()
|
||||
logging.info('Disconnected.')
|
||||
|
||||
# Execute the test.
|
||||
run_test()
|
Loading…
Reference in New Issue