# SPDX-FileCopyrightText: 2020 Brent Rubell for Adafruit Industries # # SPDX-License-Identifier: MIT import time import board import busio from digitalio import DigitalInOut import neopixel from adafruit_esp32spi import adafruit_esp32spi, adafruit_esp32spi_wifimanager from adafruit_io.adafruit_io import IO_HTTP from simpleio import map_range from adafruit_pm25.uart import PM25_UART # Uncomment below for PMSA003I Air Quality Breakout # from adafruit_pm25.i2c import PM25_I2C import adafruit_bme280 ### Configure Sensor ### # Return environmental sensor readings in degrees Celsius USE_CELSIUS = False # Interval the sensor publishes to Adafruit IO, in minutes PUBLISH_INTERVAL = 10 ### WiFi ### # Get wifi details and more from a secrets.py file try: from secrets import secrets except ImportError: print("WiFi secrets are kept in secrets.py, please add them there!") raise # AirLift FeatherWing esp32_cs = DigitalInOut(board.D13) esp32_reset = DigitalInOut(board.D12) esp32_ready = DigitalInOut(board.D11) spi = busio.SPI(board.SCK, board.MOSI, board.MISO) esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset) status_light = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2) wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light) # Connect to a PM2.5 sensor over UART reset_pin = None uart = busio.UART(board.TX, board.RX, baudrate=9600) pm25 = PM25_UART(uart, reset_pin) # Create i2c object i2c = busio.I2C(board.SCL, board.SDA, frequency=100000) # Connect to a BME280 over I2C bme_sensor = adafruit_bme280.Adafruit_BME280_I2C(i2c) # Uncomment below for PMSA003I Air Quality Breakout # pm25 = PM25_I2C(i2c, reset_pin) # Uncomment below for BME680 # import adafruit_bme680 # bme_sensor = adafruit_bme680.Adafruit_BME680_I2C(i2c) ### Sensor Functions ### def calculate_aqi(pm_sensor_reading): """Returns a calculated air quality index (AQI) and category as a tuple. NOTE: The AQI returned by this function should ideally be measured using the 24-hour concentration average. Calculating a AQI without averaging will result in higher AQI values than expected. :param float pm_sensor_reading: Particulate matter sensor value. """ # Check sensor reading using EPA breakpoint (Clow-Chigh) if 0.0 <= pm_sensor_reading <= 12.0: # AQI calculation using EPA breakpoints (Ilow-IHigh) aqi_val = map_range(int(pm_sensor_reading), 0, 12, 0, 50) aqi_cat = "Good" elif 12.1 <= pm_sensor_reading <= 35.4: aqi_val = map_range(int(pm_sensor_reading), 12, 35, 51, 100) aqi_cat = "Moderate" elif 35.5 <= pm_sensor_reading <= 55.4: aqi_val = map_range(int(pm_sensor_reading), 36, 55, 101, 150) aqi_cat = "Unhealthy for Sensitive Groups" elif 55.5 <= pm_sensor_reading <= 150.4: aqi_val = map_range(int(pm_sensor_reading), 56, 150, 151, 200) aqi_cat = "Unhealthy" elif 150.5 <= pm_sensor_reading <= 250.4: aqi_val = map_range(int(pm_sensor_reading), 151, 250, 201, 300) aqi_cat = "Very Unhealthy" elif 250.5 <= pm_sensor_reading <= 350.4: aqi_val = map_range(int(pm_sensor_reading), 251, 350, 301, 400) aqi_cat = "Hazardous" elif 350.5 <= pm_sensor_reading <= 500.4: aqi_val = map_range(int(pm_sensor_reading), 351, 500, 401, 500) aqi_cat = "Hazardous" else: print("Invalid PM2.5 concentration") aqi_val = -1 aqi_cat = None return aqi_val, aqi_cat def sample_aq_sensor(): """Samples PM2.5 sensor over a 2.3 second sample rate. """ aq_reading = 0 aq_samples = [] # initial timestamp time_start = time.monotonic() # sample pm2.5 sensor over 2.3 sec sample rate while time.monotonic() - time_start <= 2.3: try: aqdata = pm25.read() aq_samples.append(aqdata["pm25 env"]) except RuntimeError: print("Unable to read from sensor, retrying...") continue # pm sensor output rate of 1s time.sleep(1) # average sample reading / # samples for sample in range(len(aq_samples)): aq_reading += aq_samples[sample] aq_reading = aq_reading / len(aq_samples) aq_samples.clear() return aq_reading def read_bme(is_celsius=False): """Returns temperature and humidity from BME280/BME680 environmental sensor, as a tuple. :param bool is_celsius: Returns temperature in degrees celsius if True, otherwise fahrenheit. """ humid = bme_sensor.humidity temp = bme_sensor.temperature if not is_celsius: temp = temp * 1.8 + 32 return temp, humid # Create an instance of the Adafruit IO HTTP client io = IO_HTTP(secrets["aio_user"], secrets["aio_key"], wifi) # Describes feeds used to hold Adafruit IO data feed_aqi = io.get_feed("air-quality-sensor.aqi") feed_aqi_category = io.get_feed("air-quality-sensor.category") feed_humidity = io.get_feed("air-quality-sensor.humidity") feed_temperature = io.get_feed("air-quality-sensor.temperature") # Set up location metadata from secrets.py file location_metadata = { "lat": secrets["latitude"], "lon": secrets["longitude"], "ele": secrets["elevation"], } elapsed_minutes = 0 prv_mins = 0 while True: try: print("Fetching time...") cur_time = io.receive_time() print("Time fetched OK!") # Hourly reset if cur_time.tm_min == 0: prv_mins = 0 except (ValueError, RuntimeError, ConnectionError, OSError) as e: print("Failed to fetch time, retrying\n", e) wifi.reset() wifi.connect() continue if cur_time.tm_min >= prv_mins: print("%d min elapsed.." % elapsed_minutes) prv_mins = cur_time.tm_min elapsed_minutes += 1 if elapsed_minutes >= PUBLISH_INTERVAL: print("Sampling AQI...") aqi_reading = sample_aq_sensor() aqi, aqi_category = calculate_aqi(aqi_reading) print("AQI: %d" % aqi) print("Category: %s" % aqi_category) # temp and humidity print("Sampling environmental sensor...") temperature, humidity = read_bme(USE_CELSIUS) print("Temperature: %0.1f F" % temperature) print("Humidity: %0.1f %%" % humidity) # Publish all values to Adafruit IO print("Publishing to Adafruit IO...") try: io.send_data(feed_aqi["key"], str(aqi), location_metadata) io.send_data(feed_aqi_category["key"], aqi_category) io.send_data(feed_temperature["key"], str(temperature)) io.send_data(feed_humidity["key"], str(humidity)) print("Published!") except (ValueError, RuntimeError, ConnectionError, OSError) as e: print("Failed to send data to IO, retrying\n", e) wifi.reset() wifi.connect() continue # Reset timer elapsed_minutes = 0 time.sleep(30)