iqair-apiserver/app.py

191 lines
6.6 KiB
Python

from smb.SMBConnection import SMBConnection
from flask import Flask, jsonify, request
from threading import Thread
from time import sleep
import time
import requests
import json
global last_indoor_data
last_indoor_data = {
"offline": True
}
def get_indoor_data() -> list:
# SMB server details
server_name = "192.168.0.116"
share_name = "airvisual"
username = "airvisual"
password = "6scprjag"
# File details
file_path = "202401_AirVisual_values.txt"
# Connect to the SMB server
conn = SMBConnection(username, password, "", "")
conn.connect(server_name, 139)
# Read the file contents
file_obj = open(file_path, "wb")
conn.retrieveFile(share_name, file_path, file_obj)
conn.close()
# Open the local cached file
file_obj = open(file_path, "r")
# The first line of the file contains the header
# The header contains the column names separated by a semicolon (;)
# The rest of the file contains the data separated by a semicolon (;)
# Extract the column names and the data from the file
file_obj.seek(0)
header = file_obj.readline().strip().split(";")
data = file_obj.readlines()
# Split all the data into a list of lists
data = [row.strip().split(";") for row in data]
file_obj.close()
# Remap the header names
headers_map = {
"PM2_5(ug/m3)": "pm25",
"PM10(ug/m3)": "pm10",
"PM1(ug/m3)": "pm1",
"CO2(ppm)": "co2",
"AQI(US)_indoor": "aqi",
"Temperature(C)": "temperature",
"Humidity(%RH)": "humidity",
"Timestamp": "time"
}
# Remove rows with header names that are not in the header map
# First, get the indices of the header names that are in the header map
headers_indices = []
for index, name in enumerate(header):
if name in headers_map:
headers_indices.append(index)
# Construct the new header with the header names that are in the header map
header = [header[index] for index in headers_indices]
# Construct the new data with only the columns indicated by the header indices
data = [[row[index] for index in headers_indices] for row in data]
# Remap the header names
headers = [headers_map[name] for name in header]
# Convert unix timestamp to human readable time
for row in data:
row[headers.index("time")] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(row[headers.index("time")])))
# Create a list of dictionaries representing the data
# Each dictionary represents a row of data
data_list = []
for row in data:
data_dict = {}
for header in headers:
data_dict[header] = row[headers.index(header)]
data_list.append(data_dict)
return data_list
def get_outdoor_data_current() -> dict:
# Fetch the data from the AirVisual API
# Note that API call is rate limited to 5 calls per minute
# If this function is called within 1 minute of the previous call, return the cached data
# Check if the cache file exists
# If it does not exist, create a new cache file
try:
data = json.loads(open("outdoor_data_cache.txt", "r").read())
except:
default_data = {
"pm25": 0,
"pm10": 0,
"pm1": 0,
"aqi": 0,
"temperature": 0,
"humidity": 0,
"pressure": 0,
"time": 0,
"last_updated": 0 # Unix timestamp
}
open("outdoor_data_cache.txt", "w").write(json.dumps(default_data))
data = default_data
# Is the last_updated time more than 1 minute ago?
# If it is, fetch the data from the API
# If it is not, return the cached data
# Note that the cache file is a JSON object
data["last_updated"] = int(data["last_updated"])
if data["last_updated"] + 60 < int(time.time()):
url = "https://device.iqair.com/v2/64b63cdf45eeae29464b590d"
response = requests.get(url)
try:
print("Fetching data from API!" )
data = response.json()
# Create a dictionary of the data
data = {
"pm25": data["current"]["pm25"]["conc"],
"pm10": data["current"]["pm10"]["conc"],
"pm1": data["current"]["pm1"]["conc"],
"aqi": data["current"]["aqius"],
"temperature": data["current"]["tp"],
"humidity": data["current"]["hm"],
"pressure": data["current"]["pr"],
"time": data["current"]["ts"]
}
# Time is in 2024-01-03T16:08:32.000Z
# Convert to GMT+7 in the format YYYY-MM-DD HH:MM:SS
# First parse the time string to a datetime object
# Then format the datetime object to YYYY-MM-DD HH:MM:SS
# The time string is in UTC time, we need to convert it to GMT+7
data["time"] = time.strptime(data["time"], "%Y-%m-%dT%H:%M:%S.000Z")
data["time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.mktime(data["time"]) + 7 * 3600))
# Update the cache file
data["last_updated"] = int(time.time())
open("outdoor_data_cache.txt", "w").write(json.dumps(data))
# Remove the last_updated key
del data["last_updated"]
return data
except:
# Oops, we got rate limited
# Return the cached data
print("Rate limited!")
return data
else:
print("Using cached data!")
return data
def merge_data(indoor_data_current: dict, outdoor_data: dict) -> dict:
# Indoor data dict's key are to be appended with "_indoor"
# Outdoor data dict's key are to be appended with "_outdoor"
# Merge the two dictionaries
merged_data = {}
for key, value in indoor_data_current.items():
merged_data[key + "_indoor"] = value
for key, value in outdoor_data.items():
merged_data[key + "_outdoor"] = value
return merged_data
app = Flask(__name__)
# Refresh the indoor data every 30 seconds
def refresh_data():
while True:
print("Fetching indoor data!")
indoor_data = get_indoor_data()
global last_indoor_data
# last_indoor_data the last dictionary in the list
last_indoor_data = indoor_data[-1]
sleep(30)
# Start the thread to refresh the data
Thread(target=refresh_data).start()
# Return the latest data
@app.route("/get_data", methods=["GET"])
def get_data_route():
global last_indoor_data
indoor_data = last_indoor_data
outdoor_data = get_outdoor_data_current()
merged_data = merge_data(indoor_data, outdoor_data)
return jsonify(merged_data)