import binascii from time import sleep import serial import threading from kuukar.kuukar_config import SERIAL_LCD DATATYPE_NUMBER = '71' DATATYPE_STRING = '70' PRESSTYPE_PRESS = '01' PRESSTYPE_RELEASE = '00' EVENT_TOUCH = '65' class nextion: __commands_output_buffer__ = [] allow_serial_read = True def __init__(self) -> None: self.device = serial.Serial(SERIAL_LCD, baudrate=115200, timeout=15) self.__send_stop_bit() self.reset_device() threading.Thread(target=self.__serial_reader).start() threading.Thread(target=self.__serial_writer__).start() def send_command(self, command: str): command = bytes(command, 'ascii') self.__commands_output_buffer__.append(command) def __serial_writer__(self): while True: if len(self.__commands_output_buffer__) > 0: self.device.write(self.__commands_output_buffer__.pop(0)) self.__send_stop_bit() sleep(0.05) def reset_device(self): self.send_command("rest") def __get_stop_bit(self) -> bytearray: stop_bit = bytearray() stop_bit.append(0xFF) stop_bit.append(0xFF) stop_bit.append(0XFF) return stop_bit def __send_stop_bit(self): self.device.write(self.__get_stop_bit()) def __serial_reader(self) -> None: ser = self.device while True: while self.allow_serial_read: buffer_length = ser.in_waiting if buffer_length > 0: self.handle_serial(self.read_serial()) sleep(0.05) def read_serial(self) -> list: ser = self.device data = ser.read_until(self.__get_stop_bit()) data = data[0:len(data)-3] data: list = data.hex('/').split('/') return data def handle_serial(self, data: list): message_type = data.pop(0) if message_type == EVENT_TOUCH: self.handle_touch_event(data) def handle_touch_event(self, data: list): page = int(data.pop(0), 16) component_id = int(data.pop(0), 16) press_type = data.pop(0) self.external_touch_handler(page,component_id,press_type) #print("Got a touch event at page "+str(page)+" in component "+str(component_id) # + " with type "+str(press_type)) def external_touch_handler(self, page, component_id, press_type): # To be overridden pass def get_attribute(self, attribute: str): self.allow_serial_read = False self.send_command("get "+str(attribute)) sleep(0.01) data = self.read_serial() self.allow_serial_read = True data_type = data.pop(0) if data_type == DATATYPE_STRING: data = nextion.hex_string_list_to_bytes(data) return data.decode('ascii') elif data_type == DATATYPE_NUMBER: return nextion.little_endian_to_int(data) @staticmethod def hex_string_list_to_bytes(hexstring_list: list) -> bytes: hexstring = ''.join(hexstring_list) return binascii.unhexlify(hexstring) @staticmethod def little_endian_to_int(data: list) -> int: data.reverse() hexstring = ''.join(data) value = int(hexstring, 16) return value