Adafruit_Learning_System_Gu.../FunHouse_Motion_Outlet/code.py
2023-11-09 08:27:52 -06:00

116 lines
3.6 KiB
Python
Executable file

# SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries
# SPDX-FileCopyrightText: Copyright (c) 2021 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
import os
import time
import board
import digitalio
from displayio import CIRCUITPYTHON_TERMINAL
from adafruit_display_shapes.circle import Circle
from adafruit_funhouse import FunHouse
OUTLET_STATE_TOPIC = "funhouse/outlet/state"
OUTLET_COMMAND_TOPIC = "funhouse/outlet/set"
MOTION_TIMEOUT = 300 # Timeout in seconds
USE_MQTT = True
# Use dict to avoid reassigning the variable
timestamps = {
"last_pir": None
}
def set_outlet_state(value):
if value:
funhouse.peripherals.dotstars.fill(0x00FF00)
timestamps["last_pir"] = time.monotonic()
else:
funhouse.peripherals.dotstars.fill(0xFF0000)
timestamps["last_pir"] = time.monotonic() - MOTION_TIMEOUT
outlet.value = value
publish_outlet_state()
def publish_outlet_state():
if USE_MQTT:
funhouse.peripherals.led = True
output = "on" if outlet.value else "off"
# Publish the Dotstar State
print("Publishing to {}".format(OUTLET_STATE_TOPIC))
funhouse.network.mqtt_publish(OUTLET_STATE_TOPIC, output)
funhouse.peripherals.led = False
def connected(client, _userdata, _result, _payload):
status.fill = 0x00FF00
status.outline = 0x008800
print("Connected to MQTT! Subscribing...")
client.subscribe(OUTLET_COMMAND_TOPIC)
def disconnected(_client):
status.fill = 0xFF0000
status.outline = 0x880000
def message(_client, topic, payload):
print("Topic {0} received new value: {1}".format(topic, payload))
if topic == OUTLET_COMMAND_TOPIC:
set_outlet_state(payload == "on")
def timeleft():
seconds = int(timestamps["last_pir"] + MOTION_TIMEOUT - time.monotonic())
if outlet.value and seconds >= 0:
minutes = seconds // 60
seconds -= minutes * 60
return "{:01}:{:02}".format(minutes, seconds)
return "Off"
# Set Initial States
funhouse = FunHouse(default_bg=0x0F0F00)
funhouse.peripherals.dotstars.fill(0)
outlet = digitalio.DigitalInOut(board.A0)
outlet.direction = digitalio.Direction.OUTPUT
funhouse.display.root_group = CIRCUITPYTHON_TERMINAL
funhouse.add_text(
text="Timeout Left:",
text_position=(20, 60),
text_color=0xFF0000,
text_font="fonts/Arial-Bold-24.pcf",
)
countdown_label = funhouse.add_text(
text_position=(120, 100),
text_anchor_point=(0.5, 0.5),
text_color=0xFFFF00,
text_font="fonts/Arial-Bold-24.pcf",
)
funhouse.display.root_group = funhouse.splash
status = Circle(229, 10, 10, fill=0xFF0000, outline=0x880000)
funhouse.splash.append(status)
# Initialize a new MQTT Client object
if USE_MQTT:
funhouse.network.init_mqtt(
os.getenv("MQTT_BROKER"),
os.getenv("MQTT_PORT"),
os.getenv("MQTT_USERNAME"),
os.getenv("MQTT_PASSWORD"),
)
funhouse.network.on_mqtt_connect = connected
funhouse.network.on_mqtt_disconnect = disconnected
funhouse.network.on_mqtt_message = message
print("Attempting to connect to {}".format(os.getenv("MQTT_BROKER")))
funhouse.network.mqtt_connect()
set_outlet_state(False)
while True:
if funhouse.peripherals.pir_sensor:
timestamps["last_pir"] = time.monotonic()
if not outlet.value:
set_outlet_state(True)
if outlet.value and time.monotonic() >= timestamps["last_pir"] + MOTION_TIMEOUT:
set_outlet_state(False)
funhouse.set_text(timeleft(), countdown_label)
# Check any topics we are subscribed to
if USE_MQTT:
funhouse.network.mqtt_loop(0.5)