add initial implementation of the Nextion instruction set
This commit is contained in:
		
							parent
							
								
									3c3010d590
								
							
						
					
					
						commit
						7d1e9f9717
					
				
					 4 changed files with 103 additions and 2 deletions
				
			
		| 
						 | 
					@ -29,3 +29,5 @@ LEDS_DATA_PIN = 3
 | 
				
			||||||
LEDS_NUM = 15
 | 
					LEDS_NUM = 15
 | 
				
			||||||
 | 
					
 | 
				
			||||||
DHT22_PIN = 5
 | 
					DHT22_PIN = 5
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					SERIAL_LCD = "/dev/serial/by-id/..."
 | 
				
			||||||
| 
						 | 
					@ -6,7 +6,6 @@ class cv:
 | 
				
			||||||
    def __init__(self, lcd: lcd, leds: leds) -> None:
 | 
					    def __init__(self, lcd: lcd, leds: leds) -> None:
 | 
				
			||||||
        self.lcd = lcd
 | 
					        self.lcd = lcd
 | 
				
			||||||
        self.leds = leds
 | 
					        self.leds = leds
 | 
				
			||||||
        self.noise = noise
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def capture_image(self):
 | 
					    def capture_image(self):
 | 
				
			||||||
        pass
 | 
					        pass
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,6 +1,7 @@
 | 
				
			||||||
 | 
					import kuukar_nextion
 | 
				
			||||||
class lcd:
 | 
					class lcd:
 | 
				
			||||||
    def __init__(self) -> None:
 | 
					    def __init__(self) -> None:
 | 
				
			||||||
        pass
 | 
					        self.nextion = kuukar_nextion.nextion()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def keke_uwu(self):
 | 
					    def keke_uwu(self):
 | 
				
			||||||
        pass
 | 
					        pass
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,99 @@
 | 
				
			||||||
 | 
					import binascii
 | 
				
			||||||
 | 
					from time import sleep
 | 
				
			||||||
 | 
					import serial
 | 
				
			||||||
 | 
					import threading
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from kuukar_config import SERIAL_LCD
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					DATATYPE_NUMBER = '71'
 | 
				
			||||||
 | 
					DATATYPE_STRING = '70'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					PRESSTYPE_PRESS = '01'
 | 
				
			||||||
 | 
					PRESSTYPE_RELEASE = '00'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					EVENT_TOUCH = '65'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class nextion:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    __commands_output_buffer__ = []
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __init__(self) -> None:
 | 
				
			||||||
 | 
					        self.device = serial.Serial(SERIAL_LCD, baudrate=19200, timeout=5)
 | 
				
			||||||
 | 
					        self.__send_stop_bit__()
 | 
				
			||||||
 | 
					        self.reset_device()
 | 
				
			||||||
 | 
					        threading.Thread(target=self.__serial_reader__).start()
 | 
				
			||||||
 | 
					        threading.Thread(target=self.__serial_writer__).start()
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					    def send_command(self, command: str):
 | 
				
			||||||
 | 
					        command = bytes(command, 'ascii')
 | 
				
			||||||
 | 
					        self.__commands_output_buffer__.append(command)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __serial_writer__(self):
 | 
				
			||||||
 | 
					        if len(self.__commands_output_buffer__) > 0:
 | 
				
			||||||
 | 
					            self.device.write(self.__commands_output_buffer__.pop(0))
 | 
				
			||||||
 | 
					            self.__send_stop_bit__()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def reset_device(self):
 | 
				
			||||||
 | 
					        self.send_command("rest")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __get_stop_bit__(self) -> bytearray:
 | 
				
			||||||
 | 
					        stop_bit = bytearray()
 | 
				
			||||||
 | 
					        stop_bit.append(0xFF)
 | 
				
			||||||
 | 
					        stop_bit.append(0xFF)
 | 
				
			||||||
 | 
					        stop_bit.append(0XFF)
 | 
				
			||||||
 | 
					        return stop_bit
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __send_stop_bit__(self):
 | 
				
			||||||
 | 
					        self.device.write(self.__get_stop_bit__())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __serial_reader__(self) -> None:
 | 
				
			||||||
 | 
					        ser = self.device
 | 
				
			||||||
 | 
					        while True:
 | 
				
			||||||
 | 
					            if self.allow_serial_read:
 | 
				
			||||||
 | 
					                buffer_length = ser.in_waiting
 | 
				
			||||||
 | 
					                if buffer_length > 0:
 | 
				
			||||||
 | 
					                    self.handle_serial(self.read_serial())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def read_serial(self) -> list:
 | 
				
			||||||
 | 
					        ser = self.device
 | 
				
			||||||
 | 
					        data = ser.read_until(self.get_stop_bit())
 | 
				
			||||||
 | 
					        data = data[0:len(data)-3]
 | 
				
			||||||
 | 
					        data: list = data.hex('/').split('/')
 | 
				
			||||||
 | 
					        return data
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def handle_serial(self, data: list):
 | 
				
			||||||
 | 
					        message_type = data.pop(0)
 | 
				
			||||||
 | 
					        if message_type == EVENT_TOUCH:
 | 
				
			||||||
 | 
					            self.handle_touch_event(data)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def handle_touch_event(self, data: list):
 | 
				
			||||||
 | 
					        page = int(data.pop(0), 16)
 | 
				
			||||||
 | 
					        component_id = int(data.pop(0), 16)
 | 
				
			||||||
 | 
					        press_type = data.pop(0)
 | 
				
			||||||
 | 
					        print("Got a touch event at page "+str(page)+" in component "+str(component_id)
 | 
				
			||||||
 | 
					              + " with type "+str(press_type))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def get_attribute(self, attribute: str):
 | 
				
			||||||
 | 
					        self.allow_serial_read = False
 | 
				
			||||||
 | 
					        self.send_command("get "+str(attribute))
 | 
				
			||||||
 | 
					        sleep(0.1)
 | 
				
			||||||
 | 
					        data = self.read_serial()
 | 
				
			||||||
 | 
					        self.allow_serial_read = True
 | 
				
			||||||
 | 
					        data_type = data.pop(0)
 | 
				
			||||||
 | 
					        if data_type == DATATYPE_STRING:
 | 
				
			||||||
 | 
					            data = nextion.hex_string_list_to_bytes(data)
 | 
				
			||||||
 | 
					            return data.decode('ascii')
 | 
				
			||||||
 | 
					        elif data_type == DATATYPE_NUMBER:
 | 
				
			||||||
 | 
					            return nextion.little_endian_to_int(data)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @staticmethod
 | 
				
			||||||
 | 
					    def hex_string_list_to_bytes(hexstring_list: list) -> bytes:
 | 
				
			||||||
 | 
					        hexstring = ''.join(hexstring_list)
 | 
				
			||||||
 | 
					        return binascii.unhexlify(hexstring)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @staticmethod
 | 
				
			||||||
 | 
					    def little_endian_to_int(data: list) -> int:
 | 
				
			||||||
 | 
					        data.reverse()
 | 
				
			||||||
 | 
					        hexstring = ''.join(data)
 | 
				
			||||||
 | 
					        value = int(hexstring, 16)
 | 
				
			||||||
 | 
					        return value
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue