Adafruit_Learning_System_Gu.../Adafruit_IO_Air_Quality/code.py

201 lines
6.8 KiB
Python
Executable file

import time
import board
import busio
from digitalio import DigitalInOut
import neopixel
from adafruit_esp32spi import adafruit_esp32spi, adafruit_esp32spi_wifimanager
from adafruit_io.adafruit_io import IO_HTTP
from simpleio import map_range
import adafruit_pm25
### Configure Sensor ###
# Set to True to report temperature in degrees Celsius. When False, the
# temperature reading is converted to degrees Fahrenheit before it is used.
USE_CELSIUS = False
# Interval the sensor publishes to Adafruit IO, in minutes
PUBLISH_INTERVAL = 10
# Set this to True if you are using a PMSA003I breakout, False otherwise.
# When True, the PM2.5 sensor is connected over I2C and a BME680 is used as
# the environmental sensor; when False, the PM2.5 sensor is connected over
# UART and a BME280 is used instead.
USE_PMSA003I = False
### WiFi ###
# Credentials are loaded from a separate secrets.py file; without it the
# script cannot continue, so the ImportError is re-raised after the hint.
try:
    from secrets import secrets
except ImportError:
    print("WiFi secrets are kept in secrets.py, please add them there!")
    raise

# AirLift FeatherWing control pins for the ESP32 co-processor
esp32_cs = DigitalInOut(board.D13)
esp32_ready = DigitalInOut(board.D11)
esp32_reset = DigitalInOut(board.D12)

# SPI bus used to talk to the ESP32
spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)

# Single NeoPixel handed to the WiFi manager as its status light
status_light = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2)
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
# I2C bus for the sensors, deliberately run at the 'slow' 100KHz frequency
i2c = busio.I2C(board.SCL, board.SDA, frequency=100000)
# No reset pin is passed to the PM2.5 driver
reset_pin = None

# Only the driver for the environmental sensor actually in use is imported.
if not USE_PMSA003I:
    import adafruit_bme280

    # PM2.5 sensor attached to the UART at 9600 baud
    uart = busio.UART(board.TX, board.RX, baudrate=9600)
    pm25 = adafruit_pm25.PM25_UART(uart, reset_pin)
    # BME280 environmental sensor on the I2C bus
    bme_sensor = adafruit_bme280.Adafruit_BME280_I2C(i2c)
else:
    import adafruit_bme680

    # PMSA003I breakout: PM2.5 sensor on the I2C bus
    pm25 = adafruit_pm25.PM25_I2C(i2c, reset_pin)
    # BME680 environmental sensor on the same I2C bus
    bme_sensor = adafruit_bme680.Adafruit_BME680_I2C(i2c)
### Sensor Functions ###
def calculate_aqi(pm_sensor_reading):
    """Returns a calculated air quality index (AQI)
    and category as a tuple.

    NOTE: The AQI returned by this function should ideally be measured
    using the 24-hour concentration average. Calculating an AQI without
    averaging will result in higher AQI values than expected.

    The concentration bands are contiguous: a reading that falls between two
    published breakpoints (for example 12.05, between 12.0 and 12.1) is
    assigned to the lower band, which matches truncating the reading to one
    decimal place before looking it up.

    :param float pm_sensor_reading: Particulate matter sensor value.
    :return: ``(aqi_val, aqi_cat)``. For a negative reading, or one of 500.5
        and above, ``(-1, None)`` is returned after printing a message.
    """
    # Each row: exclusive upper bound of the band, the concentration range
    # (Clow, Chigh) and index range (Ilow, Ihigh) handed to map_range, and
    # the category name.
    breakpoints = (
        (12.1, 0, 12, 0, 50, "Good"),
        (35.5, 12, 35, 51, 100, "Moderate"),
        (55.5, 36, 55, 101, 150, "Unhealthy for Sensitive Groups"),
        (150.5, 56, 150, 151, 200, "Unhealthy"),
        (250.5, 151, 250, 201, 300, "Very Unhealthy"),
        (350.5, 251, 350, 301, 400, "Hazardous"),
        (500.5, 351, 500, 401, 500, "Hazardous"),
    )
    # The >= test also rejects NaN, which compares False against everything.
    if pm_sensor_reading >= 0.0:
        for upper, c_low, c_high, i_low, i_high, aqi_cat in breakpoints:
            if pm_sensor_reading < upper:
                # The reading is truncated to a whole number before mapping
                aqi_val = map_range(
                    int(pm_sensor_reading), c_low, c_high, i_low, i_high
                )
                return aqi_val, aqi_cat
    print("Invalid PM2.5 concentration")
    return -1, None
def sample_aq_sensor():
    """Samples the PM2.5 sensor over a 2.3 second window
    and returns the average of the readings.

    :return: Mean of the "pm25 env" values collected during the window.
    :raises RuntimeError: If not a single reading could be obtained
        during the window.
    """
    aq_samples = []
    # initial timestamp
    time_start = time.monotonic()
    # sample pm2.5 sensor over the 2.3 sec window
    while time.monotonic() - time_start <= 2.3:
        try:
            aqdata = pm25.read()
            aq_samples.append(aqdata["pm25 env"])
        except RuntimeError:
            # A failed read is retried straight away, without the 1s pause
            print("Unable to read from sensor, retrying...")
            continue
        # pm sensor output rate of 1s
        time.sleep(1)
    # Every read failed: report that explicitly instead of dividing by zero
    if not aq_samples:
        raise RuntimeError("No PM2.5 samples collected from sensor")
    # average sample reading / # samples
    return sum(aq_samples) / len(aq_samples)
def read_bme280(is_celsius=False):
    """Reads the environmental sensor and returns
    temperature and humidity, as a tuple.

    The values come from the module-level ``bme_sensor`` object.

    :param bool is_celsius: When True the temperature is returned exactly
        as read from the sensor; otherwise it is converted to fahrenheit.
    """
    # Humidity is read before temperature
    humidity_reading = bme_sensor.humidity
    temperature_reading = bme_sensor.temperature
    if is_celsius:
        return temperature_reading, humidity_reading
    # celsius -> fahrenheit
    return temperature_reading * 1.8 + 32, humidity_reading
# Create an instance of the Adafruit IO HTTP client
io = IO_HTTP(secrets["aio_user"], secrets["aio_key"], wifi)

# Describes feeds used to hold Adafruit IO data
feed_aqi = io.get_feed("air-quality-sensor.aqi")
feed_aqi_category = io.get_feed("air-quality-sensor.category")
feed_humidity = io.get_feed("air-quality-sensor.humidity")
feed_temperature = io.get_feed("air-quality-sensor.temperature")

# Set up location metadata from secrets.py file
location_metadata = (secrets["latitude"], secrets["longitude"], secrets["elevation"])

# Whole minutes counted since the last successful publish
elapsed_minutes = 0
# Minute-of-the-hour seen on the previous pass; None until the first fetch
prv_mins = None

while True:
    try:
        print("Fetching time...")
        cur_time = io.receive_time()
        print("Time fetched OK!")
    except (ValueError, RuntimeError) as e:
        print("Failed to fetch time, retrying\n", e)
        wifi.reset()
        wifi.connect()
        continue

    # The loop runs roughly every 30 seconds, so a minute is only counted
    # when the minute-of-the-hour has actually changed. The modulo handles
    # the 59 -> 0 rollover and passes that took longer than one minute.
    if prv_mins is None:
        prv_mins = cur_time.tm_min
    minutes_passed = (cur_time.tm_min - prv_mins) % 60
    if minutes_passed:
        elapsed_minutes += minutes_passed
        prv_mins = cur_time.tm_min
        print("%d min elapsed.." % elapsed_minutes)

    if elapsed_minutes >= PUBLISH_INTERVAL:
        print("Sampling AQI...")
        aqi_reading = sample_aq_sensor()
        # NOTE(review): calculate_aqi() returns (-1, None) for a reading it
        # considers invalid, and those values are published as-is below.
        # Confirm that this is intended.
        aqi, aqi_category = calculate_aqi(aqi_reading)
        print("AQI: %d" % aqi)
        print("Category: %s" % aqi_category)

        # temp and humidity
        print("Sampling environmental sensor...")
        temperature, humidity = read_bme280(USE_CELSIUS)
        # Label the reading with the unit that was actually requested
        temperature_unit = "C" if USE_CELSIUS else "F"
        print("Temperature: %0.1f %s" % (temperature, temperature_unit))
        print("Humidity: %0.1f %%" % humidity)

        # Publish all values to Adafruit IO
        print("Publishing to Adafruit IO...")
        try:
            io.send_data(feed_aqi["key"], str(aqi), location_metadata)
            io.send_data(feed_aqi_category["key"], aqi_category)
            io.send_data(feed_temperature["key"], str(temperature))
            io.send_data(feed_humidity["key"], str(humidity))
            print("Published!")
        except (ValueError, RuntimeError) as e:
            print("Failed to send data to IO, retrying\n", e)
            wifi.reset()
            wifi.connect()
            # elapsed_minutes is left untouched, so the next pass samples
            # and publishes again without waiting
            continue
        # Reset timer
        elapsed_minutes = 0
    time.sleep(30)