2023-09-17 08:27:41 +00:00
|
|
|
import serial
|
2023-09-17 09:57:40 +00:00
|
|
|
import time
|
2023-09-17 08:27:41 +00:00
|
|
|
from threading import Thread
|
|
|
|
|
|
|
|
class access_control:
|
2023-09-17 09:57:40 +00:00
|
|
|
_in_payloads = []
|
2023-09-17 08:27:41 +00:00
|
|
|
_read_buffer = []
|
2023-09-17 09:57:40 +00:00
|
|
|
_door_state: bool = 0
|
2023-09-17 08:27:41 +00:00
|
|
|
serial_adapter: serial.Serial
|
|
|
|
def __init__(self, serial_port: str):
|
|
|
|
self.serial_adapter = serial.Serial(serial_port,baudrate=115200)
|
2023-09-17 09:57:40 +00:00
|
|
|
Thread(target=self.read_serial).start()
|
|
|
|
Thread(target=self._process_payload).start()
|
2023-09-17 08:27:41 +00:00
|
|
|
def read_serial(self):
|
|
|
|
while True:
|
2023-09-17 09:57:40 +00:00
|
|
|
if self.serial_adapter.in_waiting:
|
|
|
|
in_byte = self.serial_adapter.read(1)
|
|
|
|
if(in_byte==b'\xFF'):
|
|
|
|
self._in_payloads.append(self._read_buffer)
|
|
|
|
self._read_buffer = []
|
|
|
|
else:
|
|
|
|
self._read_buffer.append(in_byte)
|
|
|
|
def _process_payload(self):
|
|
|
|
while True:
|
|
|
|
self._process_payload_once()
|
|
|
|
|
|
|
|
def _process_payload_once(self):
|
|
|
|
if(len(self._in_payloads)>0):
|
|
|
|
payload = self._in_payloads.pop(0)
|
|
|
|
if(payload[0]==b'\x01'):
|
|
|
|
if(payload[1]==b'\x00'):
|
|
|
|
self._door_state = True
|
|
|
|
elif(payload[1]==b'\01'):
|
|
|
|
self._door_state = False
|
2023-09-17 08:27:41 +00:00
|
|
|
def light_on(self):
|
|
|
|
packet = bytearray()
|
|
|
|
packet.append(0x00)
|
|
|
|
packet.append(0x01)
|
|
|
|
packet.append(0xFF)
|
|
|
|
self.serial_adapter.write(packet)
|
|
|
|
def light_off(self):
|
|
|
|
packet = bytearray()
|
|
|
|
packet.append(0x00)
|
|
|
|
packet.append(0x00)
|
|
|
|
packet.append(0xFF)
|
|
|
|
self.serial_adapter.write(packet)
|
2023-09-17 09:57:40 +00:00
|
|
|
def _request_door_state(self):
|
2023-09-17 09:18:33 +00:00
|
|
|
packet = bytearray()
|
|
|
|
packet.append(0x01)
|
|
|
|
packet.append(0xFF)
|
2023-09-17 09:57:40 +00:00
|
|
|
self.serial_adapter.write(packet)
|
|
|
|
def get_door_state(self) -> bool:
|
|
|
|
self._request_door_state()
|
|
|
|
time.sleep(0.25)
|
|
|
|
return self._door_state
|