From cb8614587413ff309c0c18d889b75a0393b2f820 Mon Sep 17 00:00:00 2001 From: John Park Date: Fri, 25 Sep 2020 09:26:04 -0700 Subject: [PATCH] first commit --- Purple_Air_Display/purple_air_display.py | 120 +++++++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 Purple_Air_Display/purple_air_display.py diff --git a/Purple_Air_Display/purple_air_display.py b/Purple_Air_Display/purple_air_display.py new file mode 100644 index 000000000..91ba84d6f --- /dev/null +++ b/Purple_Air_Display/purple_air_display.py @@ -0,0 +1,120 @@ +# Purple Air AQI Display +# for Metro M4 Airlift with RGB Matrix Shield +# or Matrix Portal +# and 64 x 32 RGB LED Matrix + +import time +import board +import terminalio +from adafruit_matrixportal.matrixportal import MatrixPortal + +def aqi_transform(val): + aqi = pm_to_aqi(val) # derive Air Quality Index from Particulate Matter 2.5 value + return "AQI: %d" % aqi + +def message_transform(val): # picks message based on thresholds + index = aqi_to_list_index(pm_to_aqi(val)) + messages = ( + "Hazardous", + "Very Unhealthy", + "Unhealthy", + "Unhealthy for Sensitive Groups", + "Moderate", + "Good", + ) + + if index is not None: + return messages[index] + return "Unknown" + +SENSOR_ID = 3085 # Poughkeepsie # 30183 LA outdoor / 37823 oregon / 21441 NYC +SENSOR_REFRESH_PERIOD = 30 # seconds +DATA_SOURCE = "https://www.purpleair.com/json?show=" + str(SENSOR_ID) +SCROLL_DELAY = 0.02 +DATA_LOCATION = ["results", 0, "PM2_5Value"] # navigate the JSON response + +# --- Display setup --- +matrixportal = MatrixPortal( + status_neopixel=board.NEOPIXEL, + debug=True, + url=DATA_SOURCE, + json_path=(DATA_LOCATION, DATA_LOCATION), +) + +# Create a static label to show AQI +matrixportal.add_text( + text_font=terminalio.FONT, + text_position=(8, 7), + text_transform=aqi_transform, +) + +# Create a scrolling label to show level message +matrixportal.add_text( + text_font=terminalio.FONT, + text_position=(0, 23), + scrolling=True, + text_transform=message_transform, +) +# pylint: disable=too-many-return-statements +def aqi_to_list_index(aqi): + aqi_groups = (301, 201, 151, 101, 51, 0) + for index, group in enumerate(aqi_groups): + if aqi >= group: + return index + return None + +def calculate_aqi(Cp, Ih, Il, BPh, BPl): # wikipedia.org/wiki/Air_quality_index#Computing_the_AQI + return round(((Ih - Il)/(BPh - BPl)) * (Cp - BPl) + Il) + +def pm_to_aqi(pm): + pm = float(pm) + if pm < 0: + return pm + if pm > 1000: + return 1000 + if pm > 350.5: + return calculate_aqi(pm, 500, 401, 500, 350.5) + elif pm > 250.5: + return calculate_aqi(pm, 400, 301, 350.4, 250.5) + elif pm > 150.5: + return calculate_aqi(pm, 300, 201, 250.4, 150.5) + elif pm > 55.5: + return calculate_aqi(pm, 200, 151, 150.4, 55.5) + elif pm > 35.5: + return calculate_aqi(pm, 150, 101, 55.4, 35.5) + elif pm > 12.1: + return calculate_aqi(pm, 100, 51, 35.4, 12.1) + elif pm >= 0: + return calculate_aqi(pm, 50, 0, 12, 0) + else: + return None + +def get_color(aqi): + index = aqi_to_list_index(aqi) + colors = ( + (115, 20, 37), + (140, 26, 75), + (234, 51, 36), + (239, 133, 51), + (255, 255, 85), + (104, 225, 67), + ) + if index is not None: + return colors[index] + return (150, 150, 150) + +sensor_refresh = None +while True: + # only query the weather every 10 minutes (and on first run) + if (not sensor_refresh) or (time.monotonic() - sensor_refresh) > SENSOR_REFRESH_PERIOD: + try: + value = matrixportal.fetch() + print("Response is", value) + matrixportal.set_text_color(get_color(pm_to_aqi(value[0]))) + sensor_refresh = time.monotonic() + except RuntimeError as e: + print("Some error occured, retrying! -", e) + continue + + # Scroll it + matrixportal.scroll_text(SCROLL_DELAY)