import serial
import time
from threading import Thread


class access_control:
    """Driver for an access-control board attached to a serial port.

    Frames in both directions are terminated by the byte ``0xFF``.
    Outgoing frames are ``<command> [<argument>] 0xFF``; incoming frames are
    collected byte by byte and decoded as ``<type> <value>``:

    * type ``0x01`` updates the door state,
    * type ``0x02`` updates the scan state.

    Two background threads are started by the constructor: one reads bytes
    from the port and splits them into frames, the other decodes queued
    frames into the cached state returned by :meth:`get_door_state` and
    :meth:`get_scan_state`.
    """

    # Per-instance containers are created in __init__; declaring them here
    # with a mutable default would share one list between all instances.
    _in_payloads: list   # complete frames waiting to be decoded
    _read_buffer: list   # bytes of the frame currently being received
    _door_state: bool = False
    serial_adapter: serial.Serial
    _scan_active: bool = False
    _read_active = True

    def __init__(self, serial_port: str):
        """Open *serial_port* at 9600 baud and start the worker threads.

        Args:
            serial_port: Name of the port handed to ``serial.Serial``.
        """
        # Must exist before the worker threads start touching them.
        self._in_payloads = []
        self._read_buffer = []
        self.serial_adapter = serial.Serial(serial_port, baudrate=9600)
        # NOTE(review): these threads are non-daemon and loop forever, so
        # they keep the interpreter alive — confirm that this is intended.
        Thread(target=self.read_serial).start()
        Thread(target=self._process_payload).start()
        # Periodic state polling is currently disabled:
        #Thread(target=self._state_requester).start()

    def _send(self, *data: int) -> None:
        """Write *data* followed by the ``0xFF`` frame terminator."""
        packet = bytearray(data)
        packet.append(0xFF)
        self.serial_adapter.write(packet)

    def _state_requester(self):
        """Once per second, request door and scan state while polling is on."""
        while True:
            if self._read_active:
                self._request_door_state()
                self._request_scan_state()
            time.sleep(1)

    def read_serial(self):
        """Read bytes from the port forever and queue each completed frame.

        A ``0xFF`` byte ends the current frame; the bytes collected so far
        (each kept as a length-1 ``bytes`` object) are appended to the
        payload queue and a new frame is started.
        """
        while True:
            # Drain everything that is already waiting before sleeping, so
            # the reader is not limited to one byte per polling interval.
            while self.serial_adapter.in_waiting > 0:
                in_byte = self.serial_adapter.read(1)
                if in_byte == b'\xFF':
                    self._in_payloads.append(self._read_buffer)
                    self._read_buffer = []
                else:
                    self._read_buffer.append(in_byte)
            time.sleep(0.01)

    def _process_payload(self):
        """Decode queued frames forever, polling the queue every 10 ms."""
        while True:
            self._process_payload_once()
            time.sleep(0.01)

    def _process_payload_once(self):
        """Pop one queued frame, if any, and apply it to the cached state.

        Frames shorter than two bytes and frames with an unknown type or
        value are dropped without changing any state.
        """
        try:
            payload = self._in_payloads.pop(0)
        except IndexError:
            return  # nothing queued
        if len(payload) < 2:
            # A truncated frame must not raise and kill the worker thread.
            return

        kind, value = payload[0], payload[1]
        if kind == b'\x01':
            # NOTE(review): 0x00 maps to True here but to False for the scan
            # state below — confirm the polarity against the firmware.
            if value == b'\x00':
                self._door_state = True
            elif value == b'\x01':
                self._door_state = False
        elif kind == b'\x02':
            if value == b'\x00':
                self._scan_active = False
            elif value == b'\x01':
                self._scan_active = True

    def light_on(self):
        """Send the light command (``0x00``) with argument ``0x01``."""
        self._send(0x00, 0x01)

    def light_off(self):
        """Send the light command (``0x00``) with argument ``0x00``."""
        self._send(0x00, 0x00)

    def _request_door_state(self):
        """Send the door-state request (``0x01``)."""
        self._send(0x01)

    def _request_scan_state(self):
        """Send the scan-state request (``0x03``)."""
        self._send(0x03)

    def get_door_state(self) -> bool:
        """Return the most recently decoded door state."""
        return self._door_state

    def get_scan_state(self) -> bool:
        """Return the most recently decoded scan state."""
        return self._scan_active

    def lock_door(self):
        """Send the lock command (``0x02``) with argument ``0x01``."""
        self._send(0x02, 0x01)

    def unlock_door(self):
        """Send the lock command (``0x02``) with argument ``0x00``."""
        self._send(0x02, 0x00)