main cpy
|
@ -1,20 +1,41 @@
|
||||||
from espmega_mqtt import ESPMegaMQTT
|
from espmega.espmega_r3 import ESPMega_standalone as ESPMega
|
||||||
from time import sleep
|
from time import sleep
|
||||||
|
|
||||||
class access_control:
|
class access_control:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.plc = ESPMegaMQTT()
|
self.plc = ESPMega("/facescan","192.168.0.239",1883)
|
||||||
pass
|
|
||||||
|
|
||||||
|
# def light_on(self):
|
||||||
|
# self.plc.analog_write(0,1,4095)
|
||||||
|
# def light_off(self):
|
||||||
|
# self.plc.analog_write(0,0,4095)
|
||||||
|
# def get_door_state(self) -> bool:
|
||||||
|
# return self.plc.digital_read(0)
|
||||||
|
# def get_scan_state(self) -> bool:
|
||||||
|
# return self.plc.digital_read(1)
|
||||||
|
# def lock_door(self):
|
||||||
|
# if(self.get_door_state()):
|
||||||
|
# self.plc.analog_write(1,1,4095)
|
||||||
|
|
||||||
|
# def unlock_door(self):
|
||||||
|
# self.plc.analog_write(1,0,4095)
|
||||||
|
|
||||||
|
|
||||||
def light_on(self):
|
|
||||||
self.plc.write_pwm(0,1,4095)
|
|
||||||
def light_off(self):
|
|
||||||
self.plc.write_pwm(0,0,4095)
|
|
||||||
def get_door_state(self) -> bool:
|
|
||||||
return self.plc.read_digital(0)
|
|
||||||
def get_scan_state(self) -> bool:
|
def get_scan_state(self) -> bool:
|
||||||
return self.plc.read_digital(1)
|
return self.plc.digital_read(1)
|
||||||
def lock_door(self):
|
|
||||||
self.plc.write_pwm(1,1,4095)
|
def lock_door(self): #if door close, lock the door.
|
||||||
def unlock_door(self):
|
if(self.plc.digital_read(0)):
|
||||||
self.plc.write_pwm(1,0,4095)
|
self.plc.analog_write(0,1,4095)
|
||||||
|
|
||||||
|
|
||||||
|
def unlock_door(self): #if door close, unlock the door.
|
||||||
|
if(self.plc.digital_read(0)):
|
||||||
|
self.plc.analog_write(0,0,4095)
|
||||||
|
|
||||||
|
def get_door_state(self) -> bool:
|
||||||
|
return self.plc.digital_read(0)
|
||||||
|
|
||||||
|
def activate_alarm():
|
||||||
|
self.plc.analog_write(1,1,4095)
|
|
@ -31,7 +31,7 @@ class Database:
|
||||||
statement = (f'SELECT * FROM students WHERE imagefile=\"{filename}\"')
|
statement = (f'SELECT * FROM students WHERE imagefile=\"{filename}\"')
|
||||||
self.cursor.execute(statement)
|
self.cursor.execute(statement)
|
||||||
result = self.cursor.fetchone()
|
result = self.cursor.fetchone()
|
||||||
if len(result) == 0:
|
if result == None:
|
||||||
return None
|
return None
|
||||||
student = Student(result[0],result[1],result[2],result[3])
|
student = Student(result[0],result[1],result[2],result[3])
|
||||||
return student
|
return student
|
||||||
|
@ -39,7 +39,7 @@ class Database:
|
||||||
statement = (f'SELECT * FROM parents WHERE imagefile=\"{filename}\"')
|
statement = (f'SELECT * FROM parents WHERE imagefile=\"{filename}\"')
|
||||||
self.cursor.execute(statement)
|
self.cursor.execute(statement)
|
||||||
result = self.cursor.fetchone()
|
result = self.cursor.fetchone()
|
||||||
if len(result) == 0:
|
if result == None:
|
||||||
return None
|
return None
|
||||||
parent = Parent(result[0],result[1],result[2],result[3])
|
parent = Parent(result[0],result[1],result[2],result[3])
|
||||||
return parent
|
return parent
|
||||||
|
|
|
@ -1,16 +1,32 @@
|
||||||
import serial
|
import serial
|
||||||
class Display:
|
class Display:
|
||||||
def __init__(self, serial_port: str) -> None:
|
def __init__(self, serial_port: str) -> None:
|
||||||
self.serial_adapter = serial.Serial(serial_port,baudrate=115200)
|
self.serial_adapter = serial.Serial(
|
||||||
|
port=serial_port,
|
||||||
|
baudrate =115200,
|
||||||
|
parity=serial.PARITY_NONE,
|
||||||
|
stopbits=serial.STOPBITS_ONE,
|
||||||
|
bytesize=serial.EIGHTBITS,
|
||||||
|
timeout=1)
|
||||||
|
self.send_stop_bit()
|
||||||
|
self.reset_device()
|
||||||
def set_page(self, page):
|
def set_page(self, page):
|
||||||
command = f'page {page}'.encode("ascii")
|
command = f'page {page}'
|
||||||
self.serial_adapter.write(command)
|
self.send_command(command)
|
||||||
self.send_stop_bit()
|
|
||||||
def set_string(self, field, text):
|
def set_string(self, field, text):
|
||||||
command = f'{field}="{text}"'.encode("ascii")
|
command = f'{field}="{text}"'
|
||||||
|
self.send_command(command)
|
||||||
|
def send_stop_bit(self):
|
||||||
|
self.serial_adapter.write(self.get_stop_bit())
|
||||||
|
def send_command(self, command: str):
|
||||||
|
command = bytes(command, 'ascii')
|
||||||
self.serial_adapter.write(command)
|
self.serial_adapter.write(command)
|
||||||
self.send_stop_bit()
|
self.send_stop_bit()
|
||||||
def send_stop_bit(self):
|
def get_stop_bit(self) -> bytearray:
|
||||||
self.serial_adapter.write(0xFF)
|
stop_bit = bytearray()
|
||||||
self.serial_adapter.write(0xFF)
|
stop_bit.append(0xFF)
|
||||||
self.serial_adapter.write(0xFF)
|
stop_bit.append(0xFF)
|
||||||
|
stop_bit.append(0XFF)
|
||||||
|
return stop_bit
|
||||||
|
def reset_device(self):
|
||||||
|
self.send_command("rest")
|
|
@ -1,25 +0,0 @@
|
||||||
import paho.mqtt.client as pahomqtt
|
|
||||||
BROKER = "192.168.0.239"
|
|
||||||
BASE_TOPIC = "/facescan"
|
|
||||||
class ESPMegaMQTT:
|
|
||||||
input_buffer = [0]*16
|
|
||||||
mqtt: pahomqtt.Client
|
|
||||||
def __init__(self) -> None:
|
|
||||||
self.mqtt = pahomqtt.Client(client_id="pyfacescan")
|
|
||||||
self.mqtt.connect(host=BROKER,port=1883,keepalive=60)
|
|
||||||
self.mqtt.subscribe(f'{BASE_TOPIC}/input/#')
|
|
||||||
self.mqtt.on_message=self.handle_message
|
|
||||||
self.request_state()
|
|
||||||
self.mqtt.loop_start()
|
|
||||||
def write_pwm(self, id: int, state: bool, value: int = 4095):
|
|
||||||
self.mqtt.publish(f'{BASE_TOPIC}/pwm/{"%02d"}/set/state'%id,"on" if state else "off")
|
|
||||||
self.mqtt.publish(f'{BASE_TOPIC}/pwm/{"%02d"}/set/value'%id, str(value))
|
|
||||||
def read_digital(self, id: int):
|
|
||||||
return self.input_buffer[id]
|
|
||||||
def request_state(self):
|
|
||||||
self.mqtt.publish(f'{BASE_TOPIC}/requeststate',"req")
|
|
||||||
def handle_message(self, client: pahomqtt.Client, data, message: pahomqtt.MQTTMessage):
|
|
||||||
if (message.topic.startswith(BASE_TOPIC+"/input/")):
|
|
||||||
id = int(message.topic[len(BASE_TOPIC)+7:len(message.topic)])
|
|
||||||
state = int(message.payload)
|
|
||||||
self.input_buffer[id] = state
|
|
|
@ -0,0 +1,162 @@
|
||||||
|
import cv2
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
import face_processing as fp
|
||||||
|
from access_control_mqtt import access_control
|
||||||
|
from database import Database, Student, Parent
|
||||||
|
from display import Display
|
||||||
|
|
||||||
|
SERIAL_PORT_DISPLAY = "COM11"
|
||||||
|
CAMERA_INDEX = 0
|
||||||
|
|
||||||
|
display = Display(SERIAL_PORT_DISPLAY)
|
||||||
|
cam = cv2.VideoCapture(CAMERA_INDEX)
|
||||||
|
actrl = access_control()
|
||||||
|
db = Database()
|
||||||
|
|
||||||
|
global img
|
||||||
|
global frame_ready
|
||||||
|
frame_ready = False
|
||||||
|
|
||||||
|
def read_webcam():
|
||||||
|
global img
|
||||||
|
global frame_ready
|
||||||
|
while True:
|
||||||
|
ret, img = cam.read()
|
||||||
|
frame_ready = True
|
||||||
|
|
||||||
|
'''
|
||||||
|
display.set_page("student")
|
||||||
|
display.set_string("msg.txt","Door Open")
|
||||||
|
'''
|
||||||
|
|
||||||
|
student_info = None
|
||||||
|
parent_info = None
|
||||||
|
faces = None
|
||||||
|
|
||||||
|
|
||||||
|
threading.Thread(target=read_webcam).start()
|
||||||
|
|
||||||
|
while True:
|
||||||
|
while not frame_ready:
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
should_restart = True
|
||||||
|
|
||||||
|
|
||||||
|
while should_restart:
|
||||||
|
#Try to identify face
|
||||||
|
should_restart = False
|
||||||
|
#if actrl.get_scan_state():
|
||||||
|
if(True):
|
||||||
|
#print("SCAN ACTIVE!")
|
||||||
|
display.set_string("msg.txt","Scanning")
|
||||||
|
faces = fp.identify_face(img, target_condidence=0.6) ## Scan student face
|
||||||
|
if(len(faces)==1):
|
||||||
|
#print("faces == 1")
|
||||||
|
facefile = faces[0]['name'] ## File name
|
||||||
|
print(facefile)
|
||||||
|
student_info = db.get_student_info(facefile)## Check is it student?
|
||||||
|
if(student_info!=None): #is student, request another scan for parent
|
||||||
|
print("Find student info")
|
||||||
|
#db.log_access_student_file()
|
||||||
|
display.set_page("student")
|
||||||
|
display.set_string("msg.txt","Scan parent face")
|
||||||
|
display.set_string("name_std.txt", f'Name: {student_info.name}')
|
||||||
|
display.set_string("surname_std.txt",f'Surname: {student_info.surname}')
|
||||||
|
display.set_string("id_std.txt", f'ID: {str(student_info.id)}')
|
||||||
|
"""
|
||||||
|
parent_face = fp.identify_face(img, target_condidence=0.6)
|
||||||
|
parent_info = db.get_parent_info(parent_face)"""
|
||||||
|
#DONE TODO Fill in student info Resolve
|
||||||
|
|
||||||
|
time.sleep(1)
|
||||||
|
print(student_info)
|
||||||
|
|
||||||
|
i = 0
|
||||||
|
while(i <= 10):
|
||||||
|
parent_face = fp.identify_face(img, target_condidence=0.6)
|
||||||
|
|
||||||
|
if (len(parent_face)!= 0):
|
||||||
|
parent_facefile = parent_face[0]['name']
|
||||||
|
parent_info = db.get_parent_info(parent_facefile)
|
||||||
|
print(i)
|
||||||
|
i += 1
|
||||||
|
if(parent_info != None): # Identified parent
|
||||||
|
if(db.check_relationship(student_info,parent_info)): # Check if the detected parent is right for the detected student.
|
||||||
|
print("Relation is OKAY")
|
||||||
|
#DONE TODO Log Access STDw/P
|
||||||
|
db.log_access_student_with_parent(student_info, student_info)
|
||||||
|
#DONE TODO write parrent info to display
|
||||||
|
|
||||||
|
display.set_string("name_pt.txt", f'Name: {parent_info.name}')
|
||||||
|
display.set_string("surname_pt.txt", f'Surname: {parent_info.surname}')
|
||||||
|
display.set_string("id_pt.txt", f'ID: {parent_info.id}')
|
||||||
|
actrl.unlock_door() # Door open
|
||||||
|
display.set_string("msg.txt","Get your kid")
|
||||||
|
#DONE TODO Send timer to display
|
||||||
|
for i in range(10):
|
||||||
|
display.set_string("msg.txt",f'Door will close in {10-i}')
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
i = 10
|
||||||
|
#TODO Check if door close, lock
|
||||||
|
while(i>=0):
|
||||||
|
if (not actrl.get_door_state()): # False door is left open
|
||||||
|
display.set_string("msg.txt","Door is left open, Alram in {} sec".format(i))
|
||||||
|
time.sleep(1)
|
||||||
|
i -= 1
|
||||||
|
if i == 0:
|
||||||
|
actrl.activate_alarm()
|
||||||
|
else:
|
||||||
|
actrl.lock_door()
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
display.set_page("scan")
|
||||||
|
display.set_string("msg.txt","Wrong parent. \t Try Again")
|
||||||
|
should_restart = True
|
||||||
|
break
|
||||||
|
#TODO Try again does not try again
|
||||||
|
|
||||||
|
else:
|
||||||
|
display.set_page("student")
|
||||||
|
display.set_string("msg.txt","Cannot find parent data. \t Try Again")
|
||||||
|
should_restart = True
|
||||||
|
time.sleep(1)
|
||||||
|
else:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
#TODO is this a parent?
|
||||||
|
display.set_page("student")
|
||||||
|
display.set_string("msg.txt","Cannot find student data. \t Try Again")
|
||||||
|
pass
|
||||||
|
|
||||||
|
elif (len(faces)>1):
|
||||||
|
#More than one people, error.
|
||||||
|
print("more than one people. \t try again")
|
||||||
|
display.set_string("msg.txt","More than one people. \t try again")
|
||||||
|
time.sleep(2)
|
||||||
|
pass
|
||||||
|
print("DONE")
|
||||||
|
|
||||||
|
else:
|
||||||
|
display.set_string("msg.txt","Move your face closer to the scanner")
|
||||||
|
print("Scan inactivate!!")
|
||||||
|
|
||||||
|
|
||||||
|
'''
|
||||||
|
#is student, request another scan for parent
|
||||||
|
parent_face = fp.identify_face(img, target_condidence=0.6)
|
||||||
|
if(len(faces)==1):
|
||||||
|
parent_info = db.get_parent_info(faces[0]['name'])
|
||||||
|
if(parent_info!=None):
|
||||||
|
if(db.check_relationship(student_info,parent_info)):
|
||||||
|
#Student is under parent, open door.
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
#Wrong Parent, retry.
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
#Not a parent, retry
|
||||||
|
pass
|
||||||
|
'''
|
|
@ -6,7 +6,7 @@ from access_control_mqtt import access_control
|
||||||
from database import Database, Student, Parent
|
from database import Database, Student, Parent
|
||||||
from display import Display
|
from display import Display
|
||||||
|
|
||||||
SERIAL_PORT_DISPLAY = "COM26"
|
SERIAL_PORT_DISPLAY = "COM11"
|
||||||
CAMERA_INDEX = 0
|
CAMERA_INDEX = 0
|
||||||
|
|
||||||
cam = cv2.VideoCapture(CAMERA_INDEX)
|
cam = cv2.VideoCapture(CAMERA_INDEX)
|
||||||
|
@ -43,7 +43,6 @@ while True:
|
||||||
if(len(faces)==1):
|
if(len(faces)==1):
|
||||||
facefile = faces[0]['name'] ## File name
|
facefile = faces[0]['name'] ## File name
|
||||||
student_info = db.get_student_info(facefile)## Check is it student?
|
student_info = db.get_student_info(facefile)## Check is it student?
|
||||||
db.log_access_student(student_info) # Log student data
|
|
||||||
if(student_info!=None): #is student, request another scan for parent
|
if(student_info!=None): #is student, request another scan for parent
|
||||||
#db.log_access_student_file()
|
#db.log_access_student_file()
|
||||||
time.sleep(2)
|
time.sleep(2)
|
||||||
|
@ -51,24 +50,52 @@ while True:
|
||||||
display.set_string("msg.txt","Scan parent face")
|
display.set_string("msg.txt","Scan parent face")
|
||||||
parent_face = fp.identify_face(img, target_condidence=0.6)
|
parent_face = fp.identify_face(img, target_condidence=0.6)
|
||||||
parent_info = db.get_parent_info(parent_face)
|
parent_info = db.get_parent_info(parent_face)
|
||||||
#TODO Fill in student info
|
#DONE TODO Fill in student info Resolve
|
||||||
|
display.set_page("student")
|
||||||
|
display.set_string("name_std.txt", student_info.name)
|
||||||
|
display.set_string("surname_std.txt", student_info.surname)
|
||||||
|
display.set_string("id_std.txt", student_info.id)
|
||||||
|
|
||||||
if(parent_info != None): # Identified parent
|
if(parent_info != None): # Identified parent
|
||||||
if(db.check_relationship(student_info,parent_info)): # Check if the detected parent is right for the detected student.
|
if(db.check_relationship(student_info,parent_info)): # Check if the detected parent is right for the detected student.
|
||||||
#TODO Log Access STDw/P
|
#DONE TODO Log Access STDw/P
|
||||||
#TODO write parrent info to display
|
db.log_access_student_with_parent(student_info, student_info)
|
||||||
|
#DONE TODO write parrent info to display
|
||||||
|
display.set_page("student")
|
||||||
|
display.set_string("name_std.txt", parent_info.name)
|
||||||
|
display.set_string("surname_std.txt", parent_info.surname)
|
||||||
|
display.set_string("id_std.txt", parent_info.id)
|
||||||
access_control.unlock_door() # Door open
|
access_control.unlock_door() # Door open
|
||||||
display.set_string("msg.txt","Get your kid")
|
display.set_string("msg.txt","Get your kid")
|
||||||
#TODO Send timer to display
|
#TODO Send timer to display
|
||||||
time.sleep(10)
|
#display.set_page("student")
|
||||||
|
i = 10
|
||||||
|
while(i>=1):
|
||||||
|
display.set_string("msg.txt","Door will close in {}".format(i))
|
||||||
|
time.sleep(1)
|
||||||
|
i -= 1
|
||||||
|
actrl.lock_door()
|
||||||
|
|
||||||
#TODO Check if door can be locked
|
i = 10
|
||||||
# if can lock, lock
|
#TODO Check if door close, lock
|
||||||
|
while(i>=0):
|
||||||
|
|
||||||
|
if (not actrl.get_door_state()): # False door is left open
|
||||||
|
display.set_string("msg.txt","Door is left open, Alram in {} sec".format(i))
|
||||||
|
time.sleep(1)
|
||||||
|
i -= 1
|
||||||
|
if i == 0:
|
||||||
|
actrl.activate_alarm()
|
||||||
|
else:
|
||||||
|
actrl.lock_door()
|
||||||
|
break
|
||||||
|
|
||||||
|
|
||||||
|
#
|
||||||
#TODO If can't lock, say left open
|
#TODO If can't lock, say left open
|
||||||
#Countdown
|
#Countdown
|
||||||
# Activate Alarm
|
# Activate Alarm
|
||||||
else:
|
else:
|
||||||
|
|
||||||
display.set_page("scan")
|
display.set_page("scan")
|
||||||
display.set_string("msg.txt","Wrong parent. \t Try Again")
|
display.set_string("msg.txt","Wrong parent. \t Try Again")
|
||||||
#TODO Try again does not try again
|
#TODO Try again does not try again
|
||||||
|
|
|
@ -16,10 +16,10 @@ plc = ESPMegaMQTT()
|
||||||
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
for i in range(8,16):
|
for i in range(0,16):
|
||||||
plc.write_pwm(i, 0, 4095)
|
plc.write_pwm(i, 0, 2000)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
for i in range(8,16):
|
for i in range(0,16):
|
||||||
plc.write_pwm(i, 1, 4095)
|
plc.write_pwm(i, 1, 2000)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,33 @@
|
||||||
|
import time
|
||||||
|
import serial
|
||||||
|
|
||||||
|
device = serial.Serial(
|
||||||
|
port="COM11",
|
||||||
|
baudrate =115200,
|
||||||
|
parity=serial.PARITY_NONE,
|
||||||
|
stopbits=serial.STOPBITS_ONE,
|
||||||
|
bytesize=serial.EIGHTBITS,
|
||||||
|
timeout=1)
|
||||||
|
|
||||||
|
def get_stop_bit() -> bytearray:
|
||||||
|
stop_bit = bytearray()
|
||||||
|
stop_bit.append(0xFF)
|
||||||
|
stop_bit.append(0xFF)
|
||||||
|
stop_bit.append(0XFF)
|
||||||
|
return stop_bit
|
||||||
|
def send_stop_bit():
|
||||||
|
device.write(get_stop_bit())
|
||||||
|
def reset_device():
|
||||||
|
send_command("rest")
|
||||||
|
def send_command(command: str):
|
||||||
|
command = bytes(command, 'ascii')
|
||||||
|
device.write(command)
|
||||||
|
send_stop_bit()
|
||||||
|
def send_command(command: str):
|
||||||
|
command = bytes(command, 'ascii')
|
||||||
|
device.write(command)
|
||||||
|
send_stop_bit()
|
||||||
|
|
||||||
|
send_command("rest")
|
||||||
|
time.sleep(1)
|
||||||
|
send_command("page parent")
|
Before Width: | Height: | Size: 148 KiB After Width: | Height: | Size: 148 KiB |
Before Width: | Height: | Size: 124 KiB After Width: | Height: | Size: 124 KiB |
After Width: | Height: | Size: 1.2 MiB |
After Width: | Height: | Size: 1.2 MiB |
After Width: | Height: | Size: 1.2 MiB |
After Width: | Height: | Size: 1.2 MiB |
After Width: | Height: | Size: 1.2 MiB |
After Width: | Height: | Size: 1.2 MiB |
After Width: | Height: | Size: 1.2 MiB |
After Width: | Height: | Size: 1.2 MiB |
After Width: | Height: | Size: 1.2 MiB |
|
@ -0,0 +1,11 @@
|
||||||
|
from deepface import DeepFace
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
image_path = 'D:\Git\stm32-fmt-code\\face_recognition\img_1.jpg'
|
||||||
|
database_path = 'D:\Git\stm32-fmt-code\\face_recognition\\faces_data\Sukatat'
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
DeepFace.stream(db_path = database_path)
|
||||||
|
|
||||||
|
#result.head()
|