99 lines
3.0 KiB
Python
99 lines
3.0 KiB
Python
import binascii
|
|
from time import sleep
|
|
import serial
|
|
import threading
|
|
|
|
from kuukar_config import SERIAL_LCD
|
|
|
|
DATATYPE_NUMBER = '71'
|
|
DATATYPE_STRING = '70'
|
|
|
|
PRESSTYPE_PRESS = '01'
|
|
PRESSTYPE_RELEASE = '00'
|
|
|
|
EVENT_TOUCH = '65'
|
|
|
|
class nextion:
|
|
|
|
__commands_output_buffer__ = []
|
|
|
|
def __init__(self) -> None:
|
|
self.device = serial.Serial(SERIAL_LCD, baudrate=19200, timeout=5)
|
|
self.__send_stop_bit()
|
|
self.reset_device()
|
|
threading.Thread(target=self.__serial_reader).start()
|
|
threading.Thread(target=self.__serial_writer__).start()
|
|
|
|
def send_command(self, command: str):
|
|
command = bytes(command, 'ascii')
|
|
self.__commands_output_buffer__.append(command)
|
|
|
|
def __serial_writer__(self):
|
|
if len(self.__commands_output_buffer__) > 0:
|
|
self.device.write(self.__commands_output_buffer__.pop(0))
|
|
self.__send_stop_bit()
|
|
|
|
def reset_device(self):
|
|
self.send_command("rest")
|
|
|
|
def __get_stop_bit(self) -> bytearray:
|
|
stop_bit = bytearray()
|
|
stop_bit.append(0xFF)
|
|
stop_bit.append(0xFF)
|
|
stop_bit.append(0XFF)
|
|
return stop_bit
|
|
|
|
def __send_stop_bit(self):
|
|
self.device.write(self.__get_stop_bit())
|
|
|
|
def __serial_reader(self) -> None:
|
|
ser = self.device
|
|
while True:
|
|
if self.allow_serial_read:
|
|
buffer_length = ser.in_waiting
|
|
if buffer_length > 0:
|
|
self.handle_serial(self.read_serial())
|
|
|
|
def read_serial(self) -> list:
|
|
ser = self.device
|
|
data = ser.read_until(self.get_stop_bit())
|
|
data = data[0:len(data)-3]
|
|
data: list = data.hex('/').split('/')
|
|
return data
|
|
|
|
def handle_serial(self, data: list):
|
|
message_type = data.pop(0)
|
|
if message_type == EVENT_TOUCH:
|
|
self.handle_touch_event(data)
|
|
|
|
def handle_touch_event(self, data: list):
|
|
page = int(data.pop(0), 16)
|
|
component_id = int(data.pop(0), 16)
|
|
press_type = data.pop(0)
|
|
print("Got a touch event at page "+str(page)+" in component "+str(component_id)
|
|
+ " with type "+str(press_type))
|
|
|
|
def get_attribute(self, attribute: str):
|
|
self.allow_serial_read = False
|
|
self.send_command("get "+str(attribute))
|
|
sleep(0.1)
|
|
data = self.read_serial()
|
|
self.allow_serial_read = True
|
|
data_type = data.pop(0)
|
|
if data_type == DATATYPE_STRING:
|
|
data = nextion.hex_string_list_to_bytes(data)
|
|
return data.decode('ascii')
|
|
elif data_type == DATATYPE_NUMBER:
|
|
return nextion.little_endian_to_int(data)
|
|
|
|
@staticmethod
|
|
def hex_string_list_to_bytes(hexstring_list: list) -> bytes:
|
|
hexstring = ''.join(hexstring_list)
|
|
return binascii.unhexlify(hexstring)
|
|
|
|
@staticmethod
|
|
def little_endian_to_int(data: list) -> int:
|
|
data.reverse()
|
|
hexstring = ''.join(data)
|
|
value = int(hexstring, 16)
|
|
return value |