115 lines
3.5 KiB
Python
115 lines
3.5 KiB
Python
# SPDX-FileCopyrightText: 2024 johnpark for Adafruit Industries
|
|
#
|
|
# SPDX-License-Identifier: MIT
|
|
'''
|
|
BLE BBQ Thermometer to WiFi to Adafruit IO Dashboard
|
|
Feather ESP32-S3 8MB No PSRAM
|
|
'''
|
|
import os
|
|
import time
|
|
import adafruit_connection_manager
|
|
import wifi
|
|
import adafruit_minimqtt.adafruit_minimqtt as MQTT
|
|
import adafruit_ble
|
|
from adafruit_ble.advertising.standard import ProvideServicesAdvertisement
|
|
from adafruit_ble_ibbq import IBBQService
|
|
|
|
|
|
# Adafruit IO account name and key, read via os.getenv.
aio_username = os.getenv("aio_username")
aio_key = os.getenv("aio_key")

# Join the WiFi network named by CIRCUITPY_WIFI_SSID before any MQTT work.
wifi_ssid = os.getenv("CIRCUITPY_WIFI_SSID")
print(f"Connecting to {wifi_ssid}")
wifi.radio.connect(wifi_ssid, os.getenv("CIRCUITPY_WIFI_PASSWORD"))
print(f"Connected to {wifi_ssid}")

### Feeds ###
# One feed per probe slot (bbq1 .. bbq6) plus a separate feed for the battery.
feeds = [aio_username + "/feeds/bbq" + str(probe) for probe in range(1, 7)]
battery_feed = aio_username + "/feeds/bbq_battery"
|
|
|
|
# Define callback methods which are called when events occur
|
|
# pylint: disable=unused-argument, redefined-outer-name
|
|
def connected(client, userdata, flags, rc):
    """Report a successful connection to Adafruit IO.

    Assigned as the MQTT client's ``on_connect`` callback; none of the
    arguments are used, the function only prints a status line.
    """
    print("Connected to Adafruit IO")
|
|
|
|
def disconnected(client, userdata, rc):
    """Report that the connection to Adafruit IO was dropped.

    Assigned as the MQTT client's ``on_disconnect`` callback; none of the
    arguments are used, the function only prints a status line.
    """
    print("Disconnected from Adafruit IO")
|
|
|
|
# Networking resources derived from the WiFi radio: a socket pool, an SSL
# context, and the connection manager associated with that pool.
pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio)
ssl_context = adafruit_connection_manager.get_radio_ssl_context(wifi.radio)
connection_manager = adafruit_connection_manager.get_connection_manager(pool)

# Build the MiniMQTT client that talks to Adafruit IO.
# NOTE(review): 1883 is conventionally the non-TLS MQTT port even though an
# ssl_context is supplied here -- confirm whether 8883/TLS was intended.
mqtt_client = MQTT.MQTT(
    broker="io.adafruit.com",
    port=1883,
    username=aio_username,
    password=aio_key,
    socket_pool=pool,
    ssl_context=ssl_context,
)

# Register the connect/disconnect status callbacks defined earlier in this file.
mqtt_client.on_disconnect = disconnected
mqtt_client.on_connect = connected

# Open the session with the MQTT broker.
print("Connecting to Adafruit IO...")
mqtt_client.connect()
|
|
|
|
# BLE radio used to scan for and connect to the thermometer.
# (PyLint cannot resolve BLERadio, hence the inline disable.)
ble = adafruit_ble.BLERadio()  # pylint: disable=no-member

# Initial state: battery is reported as 100 until a reading replaces it,
# and there is no thermometer connection yet.
battery_percentage = 100
ibbq_connection = None
|
|
|
|
def c_to_f(temp_c):
    """Convert a temperature from degrees Celsius to degrees Fahrenheit.

    :param temp_c: temperature in degrees Celsius.
    :return: the same temperature in degrees Fahrenheit.
    """
    scaled = temp_c * 9 / 5
    return scaled + 32
|
|
|
|
def volt_to_percent(voltage, max_voltage):
    """Express a battery voltage as a percentage of ``max_voltage``.

    The result is clamped to the 0-100 range so that a reading slightly above
    ``max_voltage`` (or a negative reading) cannot yield an out-of-range
    percentage.

    :param voltage: measured battery voltage.
    :param max_voltage: voltage that counts as 100 %; must be non-zero.
    :return: percentage as a float between 0.0 and 100.0 inclusive.
    :raises ZeroDivisionError: if ``max_voltage`` is zero.
    """
    percent = (voltage / max_voltage) * 100
    # Clamp rather than report impossible values such as 103 % or -2 %.
    return max(0.0, min(100.0, percent))
|
|
|
|
def probe_check(temp):
    """Pass a temperature reading through, or return 0 if it is implausibly high.

    A wildly high value means no probe is connected; anything above 11000 is
    treated that way.

    :param temp: temperature reading to validate.
    :return: ``temp`` when it is at most 11000, otherwise 0.
    """
    if temp <= 11000:
        return temp
    return 0
|
|
|
|
# Last known battery voltage; replaced as soon as the thermometer reports one.
battery_val = 3.3

# Main loop: scan for an iBBQ thermometer, connect, then publish its readings
# to Adafruit IO every few seconds until the BLE link drops, and start over.
while True:
    print("Scanning...")
    for adv in ble.start_scan(ProvideServicesAdvertisement, timeout=5):
        if IBBQService in adv.services:
            print("found an IBBq advertisement")
            ibbq_connection = ble.connect(adv)
            print("Connected")
            break

    # Stop scanning whether or not we are connected.
    ble.stop_scan()

    if ibbq_connection and ibbq_connection.connected:
        ibbq_service = ibbq_connection[IBBQService]
        ibbq_service.init()
        while ibbq_connection.connected:
            # Read each property exactly once per pass so the values printed
            # and the values published come from the same reading.
            temperatures = ibbq_service.temperatures
            battery_level = ibbq_service.battery_level
            print(
                "Temperatures:",
                temperatures,
                "; Battery:",
                battery_level,
            )

            # Per the adafruit_ble_ibbq documentation either property may be
            # None when the device has not delivered data yet; keep the
            # previous battery percentage in that case instead of crashing.
            if battery_level is not None:
                battery_val, _ = battery_level
                # Percentage is computed against a fixed 3.3 V full scale.
                battery_percentage = volt_to_percent(battery_val, 3.3)

            mqtt_client.loop(timeout=1)

            # Only publish probe values when a temperature packet was received.
            if temperatures is not None:
                grill_vals = [probe_check(c_to_f(temp)) for temp in temperatures]
                for feed, val in zip(feeds, grill_vals):
                    print(f"Sending grill value: {val} to {feed}...")
                    mqtt_client.publish(feed, val)

            mqtt_client.publish(battery_feed, battery_percentage)
            print("Sent")
            time.sleep(5)
|