317 lines
9.8 KiB
Python
Executable file
317 lines
9.8 KiB
Python
Executable file
import time
|
|
|
|
import board
|
|
import busio
|
|
from digitalio import DigitalInOut
|
|
import adafruit_esp32spi.adafruit_esp32spi_socket as socket
|
|
from adafruit_esp32spi import adafruit_esp32spi, adafruit_esp32spi_wifimanager
|
|
import adafruit_imageload
|
|
import displayio
|
|
import neopixel
|
|
from adafruit_bitmap_font import bitmap_font
|
|
from adafruit_display_text.label import Label
|
|
from adafruit_io.adafruit_io import IO_MQTT
|
|
import adafruit_minimqtt.adafruit_minimqtt as MQTT
|
|
from adafruit_pyportal import PyPortal
|
|
from adafruit_seesaw.seesaw import Seesaw
|
|
from simpleio import map_range
|
|
|
|
#---| User Config |---------------
|
|
|
|
# How often to poll the soil sensor, in seconds
|
|
DELAY_SENSOR = 30
|
|
|
|
# How often to send data to adafruit.io, in minutes
|
|
DELAY_PUBLISH = 5
|
|
|
|
# Maximum soil moisture measurement
|
|
SOIL_LEVEL_MAX = 500.0
|
|
|
|
# Minimum soil moisture measurement
|
|
SOIL_LEVEL_MIN= 350.0
|
|
|
|
#---| End User Config |---------------
|
|
|
|
# Background image
|
|
BACKGROUND = "/images/roots.bmp"
|
|
# Icons for water level and temperature
|
|
ICON_LEVEL = "/images/icon-wetness.bmp"
|
|
ICON_TEMP = "/images/icon-temp.bmp"
|
|
WATER_COLOR = 0x16549E
|
|
|
|
# Audio files
|
|
wav_water_high = "/sounds/water-high.wav"
|
|
wav_water_low = "/sounds/water-low.wav"
|
|
|
|
# the current working directory (where this file is)
|
|
cwd = ("/"+__file__).rsplit('/', 1)[0]
|
|
|
|
# Get wifi details and more from a secrets.py file
|
|
try:
|
|
from secrets import secrets
|
|
except ImportError:
|
|
print("WiFi secrets are kept in secrets.py, please add them there!")
|
|
raise
|
|
|
|
# Set up i2c bus
|
|
i2c_bus = busio.I2C(board.SCL, board.SDA)
|
|
|
|
# Initialize soil sensor (s.s)
|
|
ss = Seesaw(i2c_bus, addr=0x36)
|
|
|
|
# PyPortal ESP32 AirLift Pins
|
|
esp32_cs = DigitalInOut(board.ESP_CS)
|
|
esp32_ready = DigitalInOut(board.ESP_BUSY)
|
|
esp32_reset = DigitalInOut(board.ESP_RESET)
|
|
|
|
spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
|
|
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
|
|
status_light = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2)
|
|
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
|
|
|
|
# Initialize PyPortal Display
|
|
display = board.DISPLAY
|
|
|
|
WIDTH = board.DISPLAY.width
|
|
HEIGHT = board.DISPLAY.height
|
|
|
|
# Initialize new PyPortal object
|
|
pyportal = PyPortal(esp=esp,
|
|
external_spi=spi)
|
|
|
|
# Set backlight level
|
|
pyportal.set_backlight(0.5)
|
|
|
|
# Create a new DisplayIO group
|
|
splash = displayio.Group(max_size=15)
|
|
|
|
# show splash group
|
|
display.show(splash)
|
|
|
|
# Palette for water bitmap
|
|
palette = displayio.Palette(2)
|
|
palette[0] = 0x000000
|
|
palette[1] = WATER_COLOR
|
|
palette.make_transparent(0)
|
|
|
|
# Create water bitmap
|
|
water_bmp = displayio.Bitmap(display.width, display.height, len(palette))
|
|
water = displayio.TileGrid(water_bmp, pixel_shader=palette)
|
|
splash.append(water)
|
|
|
|
print("drawing background..")
|
|
# Load background image
|
|
try:
|
|
bg_bitmap, bg_palette = adafruit_imageload.load(BACKGROUND,
|
|
bitmap=displayio.Bitmap,
|
|
palette=displayio.Palette)
|
|
# Or just use solid color
|
|
except (OSError, TypeError):
|
|
BACKGROUND = BACKGROUND if isinstance(BACKGROUND, int) else 0x000000
|
|
bg_bitmap = displayio.Bitmap(display.width, display.height, 1)
|
|
bg_palette = displayio.Palette(1)
|
|
bg_palette[0] = BACKGROUND
|
|
bg_palette.make_transparent(0)
|
|
background = displayio.TileGrid(bg_bitmap, pixel_shader=bg_palette)
|
|
|
|
# Add background to display
|
|
splash.append(background)
|
|
|
|
print('loading fonts...')
|
|
# Fonts within /fonts/ folder
|
|
font = cwd+"/fonts/GothamBlack-50.bdf"
|
|
font_small = cwd+"/fonts/GothamBlack-25.bdf"
|
|
|
|
# pylint: disable=syntax-error
|
|
data_glyphs = b'0123456789FC-* '
|
|
font = bitmap_font.load_font(font)
|
|
font.load_glyphs(data_glyphs)
|
|
|
|
font_small = bitmap_font.load_font(font_small)
|
|
full_glyphs = b'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-,.: '
|
|
font_small.load_glyphs(full_glyphs)
|
|
|
|
# Label to display Adafruit IO status
|
|
label_status = Label(font_small, max_glyphs=20)
|
|
label_status.x = 305
|
|
label_status.y = 10
|
|
splash.append(label_status)
|
|
|
|
# Create a label to display the temperature
|
|
label_temp = Label(font, max_glyphs=4)
|
|
label_temp.x = 35
|
|
label_temp.y = 300
|
|
splash.append(label_temp)
|
|
|
|
# Create a label to display the water level
|
|
label_level = Label(font, max_glyphs=4)
|
|
label_level.x = display.width - 130
|
|
label_level.y = 300
|
|
splash.append(label_level)
|
|
|
|
print('loading icons...')
|
|
# Load temperature icon
|
|
icon_tmp_bitmap, icon_palette = adafruit_imageload.load(ICON_TEMP,
|
|
bitmap=displayio.Bitmap,
|
|
palette=displayio.Palette)
|
|
icon_palette.make_transparent(0)
|
|
icon_tmp_bitmap = displayio.TileGrid(icon_tmp_bitmap,
|
|
pixel_shader=icon_palette,
|
|
x=0, y=280)
|
|
splash.append(icon_tmp_bitmap)
|
|
|
|
# Load level icon
|
|
icon_lvl_bitmap, icon_palette = adafruit_imageload.load(ICON_LEVEL,
|
|
bitmap=displayio.Bitmap,
|
|
palette=displayio.Palette)
|
|
icon_palette.make_transparent(0)
|
|
icon_lvl_bitmap = displayio.TileGrid(icon_lvl_bitmap,
|
|
pixel_shader=icon_palette,
|
|
x=315, y=280)
|
|
splash.append(icon_lvl_bitmap)
|
|
|
|
# Connect to WiFi
|
|
label_status.text = "Connecting..."
|
|
while not esp.is_connected:
|
|
try:
|
|
wifi.connect()
|
|
except RuntimeError as e:
|
|
print("could not connect to AP, retrying: ",e)
|
|
wifi.reset()
|
|
continue
|
|
print("Connected to WiFi!")
|
|
|
|
# Initialize MQTT interface with the esp interface
|
|
MQTT.set_socket(socket, esp)
|
|
|
|
# Initialize a new MQTT Client object
|
|
mqtt_client = MQTT.MQTT(broker="io.adafruit.com",
|
|
username=secrets["aio_user"],
|
|
password=secrets["aio_key"])
|
|
|
|
# Adafruit IO Callback Methods
|
|
# pylint: disable=unused-argument
|
|
def connected(client):
|
|
# Connected function will be called when the client is connected to Adafruit IO.
|
|
print('Connected to Adafruit IO!')
|
|
|
|
def subscribe(client, userdata, topic, granted_qos):
|
|
# This method is called when the client subscribes to a new feed.
|
|
print('Subscribed to {0} with QOS level {1}'.format(topic, granted_qos))
|
|
|
|
# pylint: disable=unused-argument
|
|
def disconnected(client):
|
|
# Disconnected function will be called if the client disconnects
|
|
# from the Adafruit IO MQTT broker.
|
|
print("Disconnected from Adafruit IO!")
|
|
|
|
# Initialize an Adafruit IO MQTT Client
|
|
io = IO_MQTT(mqtt_client)
|
|
|
|
# Connect the callback methods defined above to the Adafruit IO MQTT Client
|
|
io.on_connect = connected
|
|
io.on_subscribe = subscribe
|
|
io.on_disconnect = disconnected
|
|
|
|
# Connect to Adafruit IO
|
|
print("Connecting to Adafruit IO...")
|
|
io.connect()
|
|
label_status.text = " "
|
|
print("Connected!")
|
|
|
|
fill_val = 0.0
|
|
def fill_water(fill_percent):
|
|
"""Fills the background water.
|
|
:param float fill_percent: Percentage of the display to fill.
|
|
|
|
"""
|
|
assert fill_percent <= 1.0, "Water fill value may not be > 100%"
|
|
# pylint: disable=global-statement
|
|
global fill_val
|
|
|
|
if fill_val > fill_percent:
|
|
for _y in range(int((board.DISPLAY.height-1) - ((board.DISPLAY.height-1)*fill_val)),
|
|
int((board.DISPLAY.height-1) - ((board.DISPLAY.height-1)*fill_percent))):
|
|
for _x in range(1, board.DISPLAY.width-1):
|
|
water_bmp[_x, _y] = 0
|
|
else:
|
|
for _y in range(board.DISPLAY.height-1,
|
|
(board.DISPLAY.height-1) - ((board.DISPLAY.height-1)*fill_percent), -1):
|
|
for _x in range(1, board.DISPLAY.width-1):
|
|
water_bmp[_x, _y] = 1
|
|
fill_val = fill_percent
|
|
|
|
def display_temperature(temp_val, is_celsius=False):
|
|
"""Displays the temperature from the STEMMA soil sensor
|
|
on the PyPortal Titano.
|
|
:param float temp: Temperature value.
|
|
:param bool is_celsius:
|
|
|
|
"""
|
|
if not is_celsius:
|
|
temp_val = (temp_val * 9 / 5) + 32 - 15
|
|
print('Temperature: %0.0fF'%temp_val)
|
|
label_temp.text = '%0.0fF'%temp_val
|
|
return int(temp_val)
|
|
else:
|
|
print('Temperature: %0.0fC'%temp_val)
|
|
label_temp.text = '%0.0fC'%temp_val
|
|
return int(temp_val)
|
|
|
|
# initial reference time
|
|
initial = time.monotonic()
|
|
while True:
|
|
# Explicitly pump the message loop
|
|
# to keep the connection active
|
|
try:
|
|
io.loop()
|
|
except (ValueError, RuntimeError) as e:
|
|
print("Failed to get data, retrying...\n", e)
|
|
wifi.reset()
|
|
continue
|
|
now = time.monotonic()
|
|
|
|
print("reading soil sensor...")
|
|
# Read capactive
|
|
moisture = ss.moisture_read()
|
|
label_level.text = str(moisture)
|
|
|
|
# Convert into percentage for filling the screen
|
|
moisture_percentage = map_range(float(moisture), SOIL_LEVEL_MIN, SOIL_LEVEL_MAX, 0.0, 1.0)
|
|
|
|
# Read temperature
|
|
temp = ss.get_temp()
|
|
temp = display_temperature(temp)
|
|
|
|
# fill display
|
|
print("filling disp..")
|
|
fill_water(moisture_percentage)
|
|
print("disp filled..")
|
|
|
|
print("temp: " + str(temp) + " moisture: " + str(moisture))
|
|
|
|
# Play water level alarms
|
|
if moisture <= SOIL_LEVEL_MIN:
|
|
print("Playing low water level warning...")
|
|
pyportal.play_file(wav_water_low)
|
|
elif moisture >= SOIL_LEVEL_MAX:
|
|
print("Playing high water level warning...")
|
|
pyportal.play_file(wav_water_high)
|
|
|
|
|
|
if now - initial > (DELAY_PUBLISH * 60):
|
|
try:
|
|
print("Publishing data to Adafruit IO...")
|
|
label_status.text = "Sending to IO..."
|
|
io.publish("moisture", moisture)
|
|
io.publish("temperature", temp)
|
|
print("Published")
|
|
label_status.text = "Data Sent!"
|
|
|
|
# reset timer
|
|
initial = now
|
|
except (ValueError, RuntimeError) as e:
|
|
label_status.text = "ERROR!"
|
|
print("Failed to get data, retrying...\n", e)
|
|
wifi.reset()
|
|
time.sleep(DELAY_SENSOR)
|