115 lines
3.2 KiB
Python
Executable file
115 lines
3.2 KiB
Python
Executable file
# SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries
|
|
# SPDX-FileCopyrightText: Copyright (c) 2021 Melissa LeBlanc-Williams for Adafruit Industries
|
|
#
|
|
# SPDX-License-Identifier: MIT
|
|
|
|
import os
|
|
import time
|
|
import board
|
|
import digitalio
|
|
import analogio
|
|
from displayio import CIRCUITPYTHON_TERMINAL
|
|
from adafruit_display_shapes.circle import Circle
|
|
from adafruit_funhouse import FunHouse
|
|
|
|
BOWL_STATE_TOPIC = "funhouse/catbowl/state"
|
|
LOW_VALUE = 4000
|
|
EMPTY_VALUE = 2000
|
|
UPDATE_INTERVAL = 1800 # Every 30 minutes
|
|
|
|
# Text labels for the Display
|
|
states = {
|
|
"empty": "Add Water",
|
|
"low": "Low",
|
|
"full": "Full",
|
|
}
|
|
|
|
def publish_bowl_state(state):
|
|
funhouse.peripherals.led = True
|
|
# Publish the Bowl Level State
|
|
print("Publishing to {}".format(BOWL_STATE_TOPIC))
|
|
funhouse.network.mqtt_publish(BOWL_STATE_TOPIC, state)
|
|
funhouse.peripherals.led = False
|
|
|
|
def connected(_client, _userdata, _result, _payload):
|
|
status.fill = 0x00FF00
|
|
status.outline = 0x008800
|
|
|
|
def disconnected(_client):
|
|
status.fill = 0xFF0000
|
|
status.outline = 0x880000
|
|
|
|
def get_bowl_reading():
|
|
water_enable.value = True
|
|
level = water_level_sensor.value
|
|
water_enable.value = False
|
|
return level
|
|
|
|
def get_bowl_state(level):
|
|
if level <= EMPTY_VALUE:
|
|
return "empty"
|
|
elif level <= LOW_VALUE:
|
|
return "low"
|
|
return "full"
|
|
|
|
def bowl_level_display(level):
|
|
if funhouse.peripherals.button_sel:
|
|
return level
|
|
return states[get_bowl_state(level)]
|
|
|
|
# Set Initial States
|
|
funhouse = FunHouse(default_bg=0x0F0F00)
|
|
funhouse.peripherals.dotstars.fill(0)
|
|
water_enable = digitalio.DigitalInOut(board.A0)
|
|
water_enable.switch_to_output()
|
|
water_level_sensor = analogio.AnalogIn(board.A1)
|
|
funhouse.display.root_group = CIRCUITPYTHON_TERMINAL
|
|
funhouse.add_text(
|
|
text="Bowl Level:",
|
|
text_position=(120, 60),
|
|
text_anchor_point=(0.5, 0.5),
|
|
text_color=0xFF0000,
|
|
text_font="fonts/Arial-Bold-24.pcf",
|
|
)
|
|
level_label = funhouse.add_text(
|
|
text_position=(120, 100),
|
|
text_anchor_point=(0.5, 0.5),
|
|
text_color=0xFFFF00,
|
|
text_font="fonts/Arial-Bold-24.pcf",
|
|
)
|
|
funhouse.display.root_group = funhouse.splash
|
|
|
|
status = Circle(229, 10, 10, fill=0xFF0000, outline=0x880000)
|
|
funhouse.splash.append(status)
|
|
|
|
# Initialize a new MQTT Client object
|
|
funhouse.network.init_mqtt(
|
|
os.getenv("MQTT_BROKER"),
|
|
os.getenv("MQTT_PORT"),
|
|
os.getenv("MQTT_USERNAME"),
|
|
os.getenv("MQTT_PASSWORD"),
|
|
)
|
|
funhouse.network.on_mqtt_connect = connected
|
|
funhouse.network.on_mqtt_disconnect = disconnected
|
|
|
|
print("Attempting to connect to {}".format(os.getenv("MQTT_BROKER")))
|
|
funhouse.network.mqtt_connect()
|
|
|
|
last_reading_timestamp = None
|
|
last_bowl_state = None
|
|
|
|
while True:
|
|
if (
|
|
last_reading_timestamp is None
|
|
or time.monotonic() > last_reading_timestamp + UPDATE_INTERVAL
|
|
):
|
|
# Take Reading
|
|
water_level = get_bowl_reading()
|
|
# Update Display
|
|
funhouse.set_text(bowl_level_display(water_level), level_label)
|
|
# If changed, publish new result
|
|
bowl_state = get_bowl_state(water_level)
|
|
if bowl_state != last_bowl_state:
|
|
publish_bowl_state(bowl_state)
|
|
last_bowl_state = bowl_state
|
|
last_reading_timestamp = time.monotonic()
|