Adafruit_Learning_System_Gu.../Adafruit_IO_Air_Quality/code.py
2025-03-07 13:19:44 -08:00

219 lines
7.4 KiB
Python
Executable file

# SPDX-FileCopyrightText: 2020 Brent Rubell for Adafruit Industries
#
# SPDX-License-Identifier: MIT
from os import getenv
import time
import board
import busio
from digitalio import DigitalInOut
import neopixel
from adafruit_esp32spi import adafruit_esp32spi, adafruit_esp32spi_wifimanager
from adafruit_io.adafruit_io import IO_HTTP
from simpleio import map_range
from adafruit_pm25.uart import PM25_UART
# Uncomment below for PMSA003I Air Quality Breakout
# from adafruit_pm25.i2c import PM25_I2C
import adafruit_bme280
### Configure Sensor ###
# Return environmental sensor readings in degrees Celsius
USE_CELSIUS = False
# Interval the sensor publishes to Adafruit IO, in minutes
PUBLISH_INTERVAL = 10
# Get WiFi details and Adafruit IO keys, ensure these are setup in settings.toml
# (visit io.adafruit.com if you need to create an account, or if you need your Adafruit IO key.)
ssid = getenv("CIRCUITPY_WIFI_SSID")
password = getenv("CIRCUITPY_WIFI_PASSWORD")
aio_username = getenv("ADAFRUIT_AIO_USERNAME")
aio_key = getenv("ADAFRUIT_AIO_KEY")
if None in [ssid, password, aio_username, aio_key]:
raise RuntimeError(
"WiFi and Adafruit IO settings are kept in settings.toml, "
"please add them there. The settings file must contain "
"'CIRCUITPY_WIFI_SSID', 'CIRCUITPY_WIFI_PASSWORD', "
"'ADAFRUIT_AIO_USERNAME' and 'ADAFRUIT_AIO_KEY' at a minimum."
)
### WiFi ###
# AirLift FeatherWing
esp32_cs = DigitalInOut(board.D13)
esp32_reset = DigitalInOut(board.D12)
esp32_ready = DigitalInOut(board.D11)
spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
status_pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2)
wifi = adafruit_esp32spi_wifimanager.WiFiManager(esp, ssid, password, status_pixel=status_pixel)
# Connect to a PM2.5 sensor over UART
reset_pin = None
uart = busio.UART(board.TX, board.RX, baudrate=9600)
pm25 = PM25_UART(uart, reset_pin)
# Create i2c object
i2c = busio.I2C(board.SCL, board.SDA, frequency=100000)
# Connect to a BME280 over I2C
bme_sensor = adafruit_bme280.Adafruit_BME280_I2C(i2c)
# Uncomment below for PMSA003I Air Quality Breakout
# pm25 = PM25_I2C(i2c, reset_pin)
# Uncomment below for BME680
# import adafruit_bme680
# bme_sensor = adafruit_bme680.Adafruit_BME680_I2C(i2c)
### Sensor Functions ###
def calculate_aqi(pm_sensor_reading):
"""Returns a calculated air quality index (AQI)
and category as a tuple.
NOTE: The AQI returned by this function should ideally be measured
using the 24-hour concentration average. Calculating a AQI without
averaging will result in higher AQI values than expected.
:param float pm_sensor_reading: Particulate matter sensor value.
"""
# Check sensor reading using EPA breakpoint (Clow-Chigh)
if 0.0 <= pm_sensor_reading <= 12.0:
# AQI calculation using EPA breakpoints (Ilow-IHigh)
aqi_val = map_range(int(pm_sensor_reading), 0, 12, 0, 50)
aqi_cat = "Good"
elif 12.1 <= pm_sensor_reading <= 35.4:
aqi_val = map_range(int(pm_sensor_reading), 12, 35, 51, 100)
aqi_cat = "Moderate"
elif 35.5 <= pm_sensor_reading <= 55.4:
aqi_val = map_range(int(pm_sensor_reading), 36, 55, 101, 150)
aqi_cat = "Unhealthy for Sensitive Groups"
elif 55.5 <= pm_sensor_reading <= 150.4:
aqi_val = map_range(int(pm_sensor_reading), 56, 150, 151, 200)
aqi_cat = "Unhealthy"
elif 150.5 <= pm_sensor_reading <= 250.4:
aqi_val = map_range(int(pm_sensor_reading), 151, 250, 201, 300)
aqi_cat = "Very Unhealthy"
elif 250.5 <= pm_sensor_reading <= 350.4:
aqi_val = map_range(int(pm_sensor_reading), 251, 350, 301, 400)
aqi_cat = "Hazardous"
elif 350.5 <= pm_sensor_reading <= 500.4:
aqi_val = map_range(int(pm_sensor_reading), 351, 500, 401, 500)
aqi_cat = "Hazardous"
else:
print("Invalid PM2.5 concentration")
aqi_val = -1
aqi_cat = None
return aqi_val, aqi_cat
def sample_aq_sensor():
"""Samples PM2.5 sensor
over a 2.3 second sample rate.
"""
aq_reading = 0
aq_samples = []
# initial timestamp
time_start = time.monotonic()
# sample pm2.5 sensor over 2.3 sec sample rate
while time.monotonic() - time_start <= 2.3:
try:
aqdata = pm25.read()
aq_samples.append(aqdata["pm25 env"])
except RuntimeError:
print("Unable to read from sensor, retrying...")
continue
# pm sensor output rate of 1s
time.sleep(1)
# average sample reading / # samples
for sample in range(len(aq_samples)):
aq_reading += aq_samples[sample]
aq_reading = aq_reading / len(aq_samples)
aq_samples.clear()
return aq_reading
def read_bme(is_celsius=False):
"""Returns temperature and humidity
from BME280/BME680 environmental sensor, as a tuple.
:param bool is_celsius: Returns temperature in degrees celsius
if True, otherwise fahrenheit.
"""
humid = bme_sensor.humidity
temp = bme_sensor.temperature
if not is_celsius:
temp = temp * 1.8 + 32
return temp, humid
# Create an instance of the Adafruit IO HTTP client
io = IO_HTTP(aio_username, aio_key, wifi)
# Describes feeds used to hold Adafruit IO data
feed_aqi = io.get_feed("air-quality-sensor.aqi")
feed_aqi_category = io.get_feed("air-quality-sensor.category")
feed_humidity = io.get_feed("air-quality-sensor.humidity")
feed_temperature = io.get_feed("air-quality-sensor.temperature")
# Set up location metadata from settings.toml file
location_metadata = {
"lat": getenv("latitude"),
"lon": getenv("longitude"),
"ele": getenv("elevation"),
}
elapsed_minutes = 0
prv_mins = 0
while True:
try:
print("Fetching time...")
cur_time = io.receive_time()
print("Time fetched OK!")
# Hourly reset
if cur_time.tm_min == 0:
prv_mins = 0
except (ValueError, RuntimeError, ConnectionError, OSError) as e:
print("Failed to fetch time, retrying\n", e)
wifi.reset()
wifi.connect()
continue
if cur_time.tm_min >= prv_mins:
print("%d min elapsed.." % elapsed_minutes)
prv_mins = cur_time.tm_min
elapsed_minutes += 1
if elapsed_minutes >= PUBLISH_INTERVAL:
print("Sampling AQI...")
aqi_reading = sample_aq_sensor()
aqi, aqi_category = calculate_aqi(aqi_reading)
print("AQI: %d" % aqi)
print("Category: %s" % aqi_category)
# temp and humidity
print("Sampling environmental sensor...")
temperature, humidity = read_bme(USE_CELSIUS)
print("Temperature: %0.1f F" % temperature)
print("Humidity: %0.1f %%" % humidity)
# Publish all values to Adafruit IO
print("Publishing to Adafruit IO...")
try:
io.send_data(feed_aqi["key"], str(aqi), location_metadata)
io.send_data(feed_aqi_category["key"], aqi_category)
io.send_data(feed_temperature["key"], str(temperature))
io.send_data(feed_humidity["key"], str(humidity))
print("Published!")
except (ValueError, RuntimeError, ConnectionError, OSError) as e:
print("Failed to send data to IO, retrying\n", e)
wifi.reset()
wifi.connect()
continue
# Reset timer
elapsed_minutes = 0
time.sleep(30)