Added NeoPixel and Sleep code
This commit is contained in:
parent
23b2a4f779
commit
f3e1d6b81a
4 changed files with 195 additions and 56 deletions
3
Magic_AI_Storybook/bookprompt.txt
Normal file
3
Magic_AI_Storybook/bookprompt.txt
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
Write a complete story with a title and a body of approximately
|
||||
{STORY_WORD_LENGTH} words long and a happy ending. The specific
|
||||
story request is "{STORY_REQUEST}".
|
||||
2
Magic_AI_Storybook/keys.txt
Normal file
2
Magic_AI_Storybook/keys.txt
Normal file
|
|
@ -0,0 +1,2 @@
|
|||
[openai]
|
||||
OPENAI_API_KEY = sk-...
|
||||
|
|
@ -24,14 +24,42 @@ class Listener:
|
|||
self.data_queue = Queue()
|
||||
self.mic_dev_index = None
|
||||
|
||||
def listen(self):
|
||||
def listen(self, ready_callback=None):
|
||||
self.phrase_complete = False
|
||||
start = datetime.utcnow()
|
||||
self.start_listening()
|
||||
if ready_callback:
|
||||
ready_callback()
|
||||
while (
|
||||
self.listener_handle
|
||||
and not self.speech_waiting()
|
||||
or not self.phrase_complete
|
||||
):
|
||||
if self.phrase_time and start - self.phrase_time > timedelta(
|
||||
seconds=self.phrase_timeout
|
||||
):
|
||||
self.last_sample = bytes()
|
||||
self.phrase_complete = True
|
||||
self.phrase_time = start
|
||||
self.stop_listening()
|
||||
|
||||
def start_listening(self):
|
||||
if not self.listener_handle:
|
||||
with sr.Microphone() as source:
|
||||
print(source.stream)
|
||||
self.recognizer.adjust_for_ambient_noise(source)
|
||||
audio = self.recognizer.listen(source, timeout=self.record_timeout)
|
||||
data = audio.get_raw_data()
|
||||
self.data_queue.put(data)
|
||||
self.listener_handle = self.recognizer.listen_in_background(
|
||||
sr.Microphone(),
|
||||
self.record_callback,
|
||||
phrase_time_limit=self.record_timeout,
|
||||
)
|
||||
|
||||
def stop_listening(self, wait_for_stop=False):
|
||||
if self.listener_handle:
|
||||
self.listener_handle(wait_for_stop=wait_for_stop)
|
||||
self.listener_handle = None
|
||||
|
||||
def is_listening(self):
|
||||
return self.listener_handle is not None
|
||||
|
||||
def record_callback(self, _, audio: sr.AudioData) -> None:
|
||||
# Grab the raw bytes and push it into the thread safe queue.
|
||||
|
|
|
|||
|
|
@ -9,19 +9,34 @@ import time
|
|||
import argparse
|
||||
import math
|
||||
import pickle
|
||||
import configparser
|
||||
from enum import Enum
|
||||
from tempfile import NamedTemporaryFile
|
||||
|
||||
import board
|
||||
import digitalio
|
||||
import neopixel
|
||||
import openai
|
||||
import pygame
|
||||
from rpi_backlight import Backlight
|
||||
from adafruit_led_animation.animation.pulse import Pulse
|
||||
|
||||
from listener import Listener
|
||||
|
||||
STORY_WORD_LENGTH = 800
|
||||
REED_SWITCH_PIN = board.D17
|
||||
NEOPIXEL_PIN = board.D18
|
||||
API_KEYS_FILE = "/home/pi/keys.txt"
|
||||
PROMPT_FILE = "/boot/bookprompt.txt"
|
||||
|
||||
# Neopixel Settings
|
||||
NEOPIXEL_COUNT = 10
|
||||
NEOPIXEL_BRIGHTNESS = 0.2
|
||||
NEOPIXEL_ORDER = neopixel.GRBW
|
||||
NEOPIXEL_SLEEP_COLOR = (0, 0, 255, 0)
|
||||
NEOPIXEL_WAITING_COLOR = (255, 255, 0, 0)
|
||||
NEOPIXEL_READY_COLOR = (0, 255, 0, 0)
|
||||
NEOPIXEL_PULSE_SPEED = 0.1
|
||||
|
||||
# Image Names
|
||||
WELCOME_IMAGE = "welcome.png"
|
||||
|
|
@ -66,11 +81,25 @@ ENERGY_THRESHOLD = 1000 # Energy level for mic to detect
|
|||
PHRASE_TIMEOUT = 3.0 # Space between recordings for sepating phrases
|
||||
RECORD_TIMEOUT = 30
|
||||
|
||||
# Import keys from environment variables
|
||||
openai.api_key = os.environ.get("OPENAI_API_KEY")
|
||||
# Do some checks and Import API keys from API_KEYS_FILE
|
||||
config = configparser.ConfigParser()
|
||||
config.read(os.path.expanduser(API_KEYS_FILE))
|
||||
if not config.has_section("openai"):
|
||||
print("Please make sure API_KEYS_FILE points to a valid file.")
|
||||
sys.exit(1)
|
||||
if "OPENAI_API_KEY" not in config["openai"]:
|
||||
print(
|
||||
"Please make sure your API keys file contains an OPENAI_API_KEY under the openai section."
|
||||
)
|
||||
sys.exit(1)
|
||||
if len(config["openai"]["OPENAI_API_KEY"]) < 10:
|
||||
print("Please set OPENAI_API_KEY in your API keys file with a valid key.")
|
||||
sys.exit(1)
|
||||
openai.api_key = config["openai"]["OPENAI_API_KEY"]
|
||||
|
||||
if openai.api_key is None:
|
||||
print("Please set the OPENAI_API_KEY environment variable first.")
|
||||
# Check that the prompt file exists and load it
|
||||
if not os.path.isfile(PROMPT_FILE):
|
||||
print("Please make sure PROMPT_FILE points to a valid file.")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
|
|
@ -145,28 +174,46 @@ class Book:
|
|||
self.textarea = None
|
||||
self.screen = None
|
||||
self.saved_screen = None
|
||||
self.sleeping = False
|
||||
self._sleeping = False
|
||||
self.sleep_check_delay = 0.1
|
||||
self._sleep_check_thread = None
|
||||
self.running = True
|
||||
self._sleep_request = False
|
||||
self._running = True
|
||||
self._busy = False
|
||||
# Use a cursor to keep track of where we are in the text area
|
||||
self.cursor = {"x": 0, "y": 0}
|
||||
self.listener = Listener(ENERGY_THRESHOLD, PHRASE_TIMEOUT, RECORD_TIMEOUT)
|
||||
self.backlight = Backlight()
|
||||
self.pixels = neopixel.NeoPixel(
|
||||
NEOPIXEL_PIN,
|
||||
NEOPIXEL_COUNT,
|
||||
brightness=NEOPIXEL_BRIGHTNESS,
|
||||
pixel_order=NEOPIXEL_ORDER,
|
||||
auto_write=False,
|
||||
)
|
||||
self._prompt = ""
|
||||
# Load the prompt file
|
||||
with open(PROMPT_FILE, "r") as f:
|
||||
self._prompt = f.read()
|
||||
|
||||
def init(self):
|
||||
def start(self):
|
||||
# Output to the LCD instead of the console
|
||||
os.putenv("DISPLAY", ":0")
|
||||
|
||||
# Initialize the display
|
||||
pygame.init()
|
||||
self.screen = pygame.display.set_mode((0, 0), pygame.FULLSCREEN)
|
||||
pygame.display.set_allow_screensaver(False)
|
||||
pygame.mouse.set_visible(False)
|
||||
self.screen.fill((255, 255, 255))
|
||||
self.width = self.screen.get_height()
|
||||
self.height = self.screen.get_width()
|
||||
|
||||
# Preload images
|
||||
# Preload welcome image and display it
|
||||
self._load_image("welcome", WELCOME_IMAGE)
|
||||
self.display_welcome()
|
||||
start_time = time.monotonic()
|
||||
|
||||
# Preload remaining images
|
||||
self._load_image("background", BACKGROUND_IMAGE)
|
||||
self._load_image("loading", LOADING_IMAGE)
|
||||
|
||||
|
|
@ -208,13 +255,21 @@ class Book:
|
|||
self.height - PAGE_NAV_HEIGHT - PAGE_TOP_MARGIN - PAGE_BOTTOM_MARGIN,
|
||||
)
|
||||
|
||||
pygame.mouse.set_visible(False)
|
||||
self.screen.fill((255, 255, 255))
|
||||
self.load_settings()
|
||||
|
||||
# Start the sleep check thread after everything is initialized
|
||||
self._sleep_check_thread = threading.Thread(target=self._handle_sleep)
|
||||
self._sleep_check_thread.start()
|
||||
|
||||
# Light the neopixels to indicate the book is ready
|
||||
self.pixels.fill(NEOPIXEL_READY_COLOR)
|
||||
self.pixels.show()
|
||||
|
||||
# Continue showing the image until the minimum amount of time has passed
|
||||
time.sleep(max(0, WELCOME_IMAGE_DELAY - (time.monotonic() - start_time)))
|
||||
|
||||
def deinit(self):
|
||||
self.running = False
|
||||
self._running = False
|
||||
self._sleep_check_thread.join()
|
||||
self.backlight.power = True
|
||||
|
||||
|
|
@ -222,21 +277,34 @@ class Book:
|
|||
reed_switch = digitalio.DigitalInOut(REED_SWITCH_PIN)
|
||||
reed_switch.direction = digitalio.Direction.INPUT
|
||||
reed_switch.pull = digitalio.Pull.UP
|
||||
pulse = Pulse(
|
||||
self.pixels,
|
||||
speed=NEOPIXEL_PULSE_SPEED,
|
||||
color=NEOPIXEL_SLEEP_COLOR,
|
||||
period=3,
|
||||
)
|
||||
|
||||
while self.running:
|
||||
if self.sleeping and reed_switch.value: # Book Open
|
||||
self.wake()
|
||||
elif not self.sleeping and not reed_switch.value: # Book Closed
|
||||
self.sleep()
|
||||
while self._running:
|
||||
print(self._busy, self._sleeping, reed_switch.value)
|
||||
if self._sleeping and reed_switch.value: # Book Open
|
||||
self._wake()
|
||||
elif (
|
||||
not self._busy and not self._sleeping and not reed_switch.value
|
||||
): # Book Closed
|
||||
self._sleep()
|
||||
|
||||
if self._sleeping:
|
||||
pulse.animate()
|
||||
time.sleep(self.sleep_check_delay)
|
||||
|
||||
def handle_events(self):
|
||||
if not self.sleeping:
|
||||
if not self._sleeping:
|
||||
for event in pygame.event.get():
|
||||
if event.type == pygame.QUIT:
|
||||
raise SystemExit
|
||||
if event.type == pygame.MOUSEBUTTONDOWN:
|
||||
self._handle_mousedown_event(event)
|
||||
time.sleep(0.1)
|
||||
|
||||
def _handle_mousedown_event(self, event):
|
||||
if event.button == 1:
|
||||
|
|
@ -298,6 +366,7 @@ class Book:
|
|||
pygame.time.wait(fade_delay)
|
||||
|
||||
def display_current_page(self):
|
||||
self._busy = True
|
||||
self._display_surface(self.images["background"], 0, 0)
|
||||
pygame.display.update()
|
||||
|
||||
|
|
@ -320,6 +389,7 @@ class Book:
|
|||
self.back_button.show()
|
||||
self.next_button.show()
|
||||
pygame.display.update()
|
||||
self._busy = False
|
||||
|
||||
@staticmethod
|
||||
def _create_transparent_buffer(size):
|
||||
|
|
@ -406,12 +476,15 @@ class Book:
|
|||
pygame.display.update()
|
||||
|
||||
def display_message(self, message):
|
||||
self._busy = True
|
||||
self._display_surface(self.images["background"], 0, 0)
|
||||
height = self._title_text_height(message)
|
||||
self._display_title_text(message, self.height // 2 - height // 2)
|
||||
self._busy = False
|
||||
|
||||
def load_story(self, story):
|
||||
# Parse out the title and story and render into pages
|
||||
self._busy = True
|
||||
self.pages = []
|
||||
title = story.split("Title: ")[1].split("\n\n")[0]
|
||||
page = self._add_page(title)
|
||||
|
|
@ -436,6 +509,7 @@ class Book:
|
|||
if self.cursor["y"] > 0:
|
||||
self.cursor["y"] += PARAGRAPH_SPACING
|
||||
print(f"Loaded story at index {self.story} with {len(self.pages)} pages")
|
||||
self._busy = False
|
||||
|
||||
def _add_page(self, title=None):
|
||||
page = {
|
||||
|
|
@ -490,36 +564,67 @@ class Book:
|
|||
pickle.dump(storydata, f)
|
||||
|
||||
def new_story(self):
|
||||
self._busy = True
|
||||
self.display_message("What story would you like to hear today?")
|
||||
|
||||
while not self.listener.speech_waiting():
|
||||
self.listener.listen()
|
||||
if self._sleep_request:
|
||||
self._busy = False
|
||||
return
|
||||
|
||||
def show_waiting():
|
||||
# Pause for a beat because the listener doesn't
|
||||
# immediately start listening sometimes
|
||||
time.sleep(2)
|
||||
self.pixels.fill(NEOPIXEL_WAITING_COLOR)
|
||||
self.pixels.show()
|
||||
|
||||
self.listener.listen(ready_callback=show_waiting)
|
||||
self.pixels.fill(NEOPIXEL_READY_COLOR)
|
||||
self.pixels.show()
|
||||
if self._sleep_request:
|
||||
self._busy = False
|
||||
return
|
||||
|
||||
if not self.listener.speech_waiting():
|
||||
# No response from user, so return
|
||||
return
|
||||
|
||||
audio_data = self.listener.get_audio_data()
|
||||
|
||||
story_request = self._transcribe(audio_data.get_wav_data())
|
||||
|
||||
story_request = self._transcribe(self.listener.get_speech())
|
||||
story_prompt = self._make_story_prompt(story_request)
|
||||
|
||||
print("Getting new response. This may take a minute or two...")
|
||||
self.display_loading()
|
||||
response = self._sendchat(story_prompt)
|
||||
with open(os.path.dirname(sys.argv[0]) + "response.txt", "w") as f:
|
||||
f.write(response)
|
||||
if self._sleep_request:
|
||||
self._busy = False
|
||||
return
|
||||
print(response)
|
||||
|
||||
self._busy = True
|
||||
self.stories.append(response)
|
||||
self.story = len(self.stories) - 1
|
||||
self.page = 0
|
||||
self.save_settings()
|
||||
|
||||
self.load_story(response)
|
||||
self._busy = False
|
||||
|
||||
def sleep(self):
|
||||
self.sleeping = True
|
||||
self.sleep_check_delay = 1
|
||||
def _sleep(self):
|
||||
# Set a sleep request flag so that any busy threads know to finish up
|
||||
self._sleep_request = True
|
||||
self.listener.stop_listening()
|
||||
while self._busy:
|
||||
time.sleep(0.1)
|
||||
self._sleep_request = False
|
||||
self._sleeping = True
|
||||
self.sleep_check_delay = 0
|
||||
self.saved_screen = self.screen.copy()
|
||||
self.screen.fill((0, 0, 0))
|
||||
pygame.display.update()
|
||||
self.backlight.power = False
|
||||
|
||||
def wake(self):
|
||||
def _wake(self):
|
||||
# Turn on the screen
|
||||
self.backlight.power = True
|
||||
if self.saved_screen:
|
||||
|
|
@ -527,14 +632,13 @@ class Book:
|
|||
pygame.display.update()
|
||||
self.saved_screen = None
|
||||
self.sleep_check_delay = 0.1
|
||||
self.sleeping = False
|
||||
self.pixels.fill(NEOPIXEL_READY_COLOR)
|
||||
self.pixels.show()
|
||||
self._sleeping = False
|
||||
|
||||
@staticmethod
|
||||
def _make_story_prompt(request):
|
||||
return (
|
||||
f"Write a complete story with a title and a body of approximately "
|
||||
f"{STORY_WORD_LENGTH} words long and a happy ending. The specific "
|
||||
f'story request is "{request}". '
|
||||
def _make_story_prompt(self, request):
|
||||
return self._prompt.format(
|
||||
STORY_WORD_LENGTH=STORY_WORD_LENGTH, STORY_REQUEST=request
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
|
|
@ -554,8 +658,9 @@ class Book:
|
|||
attempts += 1
|
||||
return "I wasn't able to understand you. Please repeat that."
|
||||
|
||||
@staticmethod
|
||||
def _sendchat(prompt):
|
||||
def _sendchat(self, prompt):
|
||||
response = ""
|
||||
print("Sending to chatGPT")
|
||||
# Package up the text to send to ChatGPT
|
||||
completion = openai.ChatCompletion.create(
|
||||
model=CHATGPT_MODEL,
|
||||
|
|
@ -563,9 +668,17 @@ class Book:
|
|||
{"role": "system", "content": SYSTEM_ROLE},
|
||||
{"role": "user", "content": prompt},
|
||||
],
|
||||
stream=True,
|
||||
)
|
||||
|
||||
for chunk in completion:
|
||||
if "delta" in chunk.choices[0] and "content" in chunk.choices[0]["delta"]:
|
||||
response += chunk.choices[0]["delta"]["content"]
|
||||
if self._sleep_request:
|
||||
return None
|
||||
|
||||
# Send the heard text to ChatGPT and return the result
|
||||
return completion.choices[0].message.content
|
||||
return response # completion.choices[0].message.content
|
||||
|
||||
|
||||
def parse_args():
|
||||
|
|
@ -585,35 +698,28 @@ def parse_args():
|
|||
|
||||
def main(args):
|
||||
book = Book(args.rotation)
|
||||
book.init()
|
||||
|
||||
try:
|
||||
# Center and display the image
|
||||
book.display_welcome()
|
||||
start_time = time.monotonic()
|
||||
book.load_settings()
|
||||
|
||||
# Continue showing the image until the minimum amount of time has passed
|
||||
time.sleep(max(0, WELCOME_IMAGE_DELAY - (time.monotonic() - start_time)))
|
||||
book.start()
|
||||
|
||||
# If no stories, start a new one
|
||||
if not book.stories:
|
||||
book.new_story()
|
||||
|
||||
book.display_current_page()
|
||||
|
||||
while True:
|
||||
if not book.sleeping:
|
||||
book.handle_events()
|
||||
time.sleep(0.1)
|
||||
book.handle_events()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
book.save_settings()
|
||||
book.deinit()
|
||||
pygame.quit()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main(parse_args())
|
||||
|
||||
# TODO:
|
||||
# * Figure out how to get the script to start on boot
|
||||
# * Play with prompt parameters
|
||||
|
|
|
|||
Loading…
Reference in a new issue