Adafruit_Learning_System_Gu.../Pi_In_Stock_Notifier/code.py
2025-04-01 07:07:01 -07:00

152 lines
5.5 KiB
Python

# SPDX-FileCopyrightText: 2022 Liz Clark for Adafruit Industries
#
# SPDX-License-Identifier: MIT
from os import getenv
import time
import ssl
import wifi
import terminalio
import socketpool
import displayio
import board
from adafruit_display_text import bitmap_label, wrap_text_to_lines
import adafruit_requests
from digitalio import DigitalInOut, Direction, Pull
from adafruit_debouncer import Debouncer
# Get WiFi details and Adafruit IO keys, ensure these are setup in settings.toml
# (visit io.adafruit.com if you need to create an account, or if you need your Adafruit IO key.)
ssid = getenv("CIRCUITPY_WIFI_SSID")
password = getenv("CIRCUITPY_WIFI_PASSWORD")
aio_username = getenv("ADAFRUIT_AIO_USERNAME")
aio_key = getenv("ADAFRUIT_AIO_KEY")
if None in [ssid, password, aio_username, aio_key]:
raise RuntimeError(
"WiFi and Adafruit IO settings are kept in settings.toml, "
"please add them there. The settings file must contain "
"'CIRCUITPY_WIFI_SSID', 'CIRCUITPY_WIFI_PASSWORD', "
"'ADAFRUIT_AIO_USERNAME' and 'ADAFRUIT_AIO_KEY' at a minimum."
)
alarm_out = DigitalInOut(board.A1)
alarm_out.direction = Direction.OUTPUT
alarm_out.value = False
button_in = DigitalInOut(board.BUTTON) # on-board Boot button on Feather ESP32-S2 TFT
button_in.pull = Pull.UP
button = Debouncer(button_in)
print("Adafruit Raspberry Pi In Stock Tweet Listener")
# import your bearer token
bearer_token = getenv('bearer_token')
# query URL for tweets. looking for hashtag partyparrot sent to a specific username
# disabling line-too-long because queries for tweet_query & TIME_URL cannot have line breaks
# pylint: disable=line-too-long
tweet_query = 'https://api.twitter.com/2/tweets/search/recent?query=In Stock at Adafruit from:rpilocator&tweet.fields=created_at'
headers = {'Authorization': 'Bearer ' + bearer_token}
print(f"Connecting to {ssid}")
wifi.radio.connect(ssid, password)
print(f"Connected to {ssid}!")
print(f"My IP address is {wifi.radio.ipv4_address}")
pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())
# gets and formats time from adafruit.io
location = getenv("timezone", None)
TIME_URL = "https://io.adafruit.com/api/v2/%s/integrations/time/strftime?x-aio-key=%s" % (aio_username, aio_key)
TIME_URL += "&fmt=%25Y-%25m-%25dT%25H%3A%25M%3A%25S.%25L%25j%25u%25z%25Z"
display = board.DISPLAY
group = displayio.Group()
font = terminalio.FONT
text = ''
text_area = bitmap_label.Label(font, text=text, scale=2, color=0xFFFFFF)
text_area.x = 0
text_area.y = 15
clock = ''
clock_area = bitmap_label.Label(font, text=clock, color=0xFFFFFF)
clock_area.x = 125
clock_area.y = 128
group.append(text_area)
group.append(clock_area)
display.root_group = group
last_value = 0 # checks last tweet's ID
check = 0 # time.monotonic() holder
# fetching current time
print("Fetching text from", TIME_URL)
current_time = requests.get(TIME_URL)
print("-" * 40)
print(current_time.text)
print("-" * 40)
alarm_out.value = False # this starts the alarm so you can test turning it off w the boot button
while True:
button.update()
if button.fell:
print("Alarm off")
alarm_out.value = False
# every 30 seconds...
if (check + 30) < time.monotonic():
# updates current time
current_time = requests.get(TIME_URL)
print(current_time.text)
# get tweets from rpilocator containing in stock at adafruit
the_tweet = requests.request("GET", url=tweet_query, headers=headers)
# gets data portion of json
data = the_tweet.json()['data']
# tweet ID number
value = data[0]['id']
# tweet text
stock_check = data[0]['text']
# timestamp
timestamp = data[0]['created_at']
# reset time count
check = time.monotonic()
# compare last tweet ID and current tweet ID
if last_value != value:
# compares date of tweet timestamp with current date
if timestamp.startswith(current_time.text[0 : 10]):
print("match")
# grabs the hour of the tweet timestamp
tweet_hour = int(timestamp[11:13])
print(tweet_hour)
# grabs the current hour
current_hour = int(current_time.text[11:13])
print(current_hour)
# if it's been less than an hour since the tweet...
if abs(current_hour - tweet_hour) < 1:
print("in the last hour")
# displays tweet text and time on screen
text_area.text = "\n".join(wrap_text_to_lines(stock_check, 21))
print(stock_check)
clock_area.text = timestamp
print("Raspberry Pi in stock at Adafruit!")
alarm_out.value = True
else:
# if it's not new, then the wait continues
no_tweet_text = "No stock in last hour :( Last stock: %s" % (timestamp)
text_area.text = "\n".join(wrap_text_to_lines(no_tweet_text, 21))
print("no new in stock notifications :(")
# updates tweet ID
last_value = value
# if the tweet wasn't today
else:
# if it's not new, then the wait continues
no_tweet_text = "No stock in last hour :( Last stock: %s" % (timestamp)
text_area.text = "\n".join(wrap_text_to_lines(no_tweet_text, 21))
print("no new in stock notifications :(")