From f3e1d6b81a6fd05217312136fc063ebd5edba0d4 Mon Sep 17 00:00:00 2001 From: Melissa LeBlanc-Williams Date: Wed, 19 Apr 2023 14:43:58 -0700 Subject: [PATCH] Added NeoPixel and Sleep code --- Magic_AI_Storybook/bookprompt.txt | 3 + Magic_AI_Storybook/keys.txt | 2 + Magic_AI_Storybook/listener.py | 38 +++++- Magic_AI_Storybook/story.py | 208 ++++++++++++++++++++++-------- 4 files changed, 195 insertions(+), 56 deletions(-) create mode 100644 Magic_AI_Storybook/bookprompt.txt create mode 100644 Magic_AI_Storybook/keys.txt diff --git a/Magic_AI_Storybook/bookprompt.txt b/Magic_AI_Storybook/bookprompt.txt new file mode 100644 index 000000000..c40ed5213 --- /dev/null +++ b/Magic_AI_Storybook/bookprompt.txt @@ -0,0 +1,3 @@ +Write a complete story with a title and a body of approximately +{STORY_WORD_LENGTH} words long and a happy ending. The specific +story request is "{STORY_REQUEST}". \ No newline at end of file diff --git a/Magic_AI_Storybook/keys.txt b/Magic_AI_Storybook/keys.txt new file mode 100644 index 000000000..41a609c13 --- /dev/null +++ b/Magic_AI_Storybook/keys.txt @@ -0,0 +1,2 @@ +[openai] +OPENAI_API_KEY = sk-... diff --git a/Magic_AI_Storybook/listener.py b/Magic_AI_Storybook/listener.py index f287d0314..40b9546db 100644 --- a/Magic_AI_Storybook/listener.py +++ b/Magic_AI_Storybook/listener.py @@ -24,14 +24,42 @@ class Listener: self.data_queue = Queue() self.mic_dev_index = None - def listen(self): + def listen(self, ready_callback=None): + self.phrase_complete = False + start = datetime.utcnow() + self.start_listening() + if ready_callback: + ready_callback() + while ( + self.listener_handle + and not self.speech_waiting() + or not self.phrase_complete + ): + if self.phrase_time and start - self.phrase_time > timedelta( + seconds=self.phrase_timeout + ): + self.last_sample = bytes() + self.phrase_complete = True + self.phrase_time = start + self.stop_listening() + + def start_listening(self): if not self.listener_handle: with sr.Microphone() as source: - print(source.stream) self.recognizer.adjust_for_ambient_noise(source) - audio = self.recognizer.listen(source, timeout=self.record_timeout) - data = audio.get_raw_data() - self.data_queue.put(data) + self.listener_handle = self.recognizer.listen_in_background( + sr.Microphone(), + self.record_callback, + phrase_time_limit=self.record_timeout, + ) + + def stop_listening(self, wait_for_stop=False): + if self.listener_handle: + self.listener_handle(wait_for_stop=wait_for_stop) + self.listener_handle = None + + def is_listening(self): + return self.listener_handle is not None def record_callback(self, _, audio: sr.AudioData) -> None: # Grab the raw bytes and push it into the thread safe queue. diff --git a/Magic_AI_Storybook/story.py b/Magic_AI_Storybook/story.py index 492ea8f0a..c4f932bec 100644 --- a/Magic_AI_Storybook/story.py +++ b/Magic_AI_Storybook/story.py @@ -9,19 +9,34 @@ import time import argparse import math import pickle +import configparser from enum import Enum from tempfile import NamedTemporaryFile import board import digitalio +import neopixel import openai import pygame from rpi_backlight import Backlight +from adafruit_led_animation.animation.pulse import Pulse from listener import Listener STORY_WORD_LENGTH = 800 REED_SWITCH_PIN = board.D17 +NEOPIXEL_PIN = board.D18 +API_KEYS_FILE = "/home/pi/keys.txt" +PROMPT_FILE = "/boot/bookprompt.txt" + +# Neopixel Settings +NEOPIXEL_COUNT = 10 +NEOPIXEL_BRIGHTNESS = 0.2 +NEOPIXEL_ORDER = neopixel.GRBW +NEOPIXEL_SLEEP_COLOR = (0, 0, 255, 0) +NEOPIXEL_WAITING_COLOR = (255, 255, 0, 0) +NEOPIXEL_READY_COLOR = (0, 255, 0, 0) +NEOPIXEL_PULSE_SPEED = 0.1 # Image Names WELCOME_IMAGE = "welcome.png" @@ -66,11 +81,25 @@ ENERGY_THRESHOLD = 1000 # Energy level for mic to detect PHRASE_TIMEOUT = 3.0 # Space between recordings for sepating phrases RECORD_TIMEOUT = 30 -# Import keys from environment variables -openai.api_key = os.environ.get("OPENAI_API_KEY") +# Do some checks and Import API keys from API_KEYS_FILE +config = configparser.ConfigParser() +config.read(os.path.expanduser(API_KEYS_FILE)) +if not config.has_section("openai"): + print("Please make sure API_KEYS_FILE points to a valid file.") + sys.exit(1) +if "OPENAI_API_KEY" not in config["openai"]: + print( + "Please make sure your API keys file contains an OPENAI_API_KEY under the openai section." + ) + sys.exit(1) +if len(config["openai"]["OPENAI_API_KEY"]) < 10: + print("Please set OPENAI_API_KEY in your API keys file with a valid key.") + sys.exit(1) +openai.api_key = config["openai"]["OPENAI_API_KEY"] -if openai.api_key is None: - print("Please set the OPENAI_API_KEY environment variable first.") +# Check that the prompt file exists and load it +if not os.path.isfile(PROMPT_FILE): + print("Please make sure PROMPT_FILE points to a valid file.") sys.exit(1) @@ -145,28 +174,46 @@ class Book: self.textarea = None self.screen = None self.saved_screen = None - self.sleeping = False + self._sleeping = False self.sleep_check_delay = 0.1 self._sleep_check_thread = None - self.running = True + self._sleep_request = False + self._running = True + self._busy = False # Use a cursor to keep track of where we are in the text area self.cursor = {"x": 0, "y": 0} self.listener = Listener(ENERGY_THRESHOLD, PHRASE_TIMEOUT, RECORD_TIMEOUT) self.backlight = Backlight() + self.pixels = neopixel.NeoPixel( + NEOPIXEL_PIN, + NEOPIXEL_COUNT, + brightness=NEOPIXEL_BRIGHTNESS, + pixel_order=NEOPIXEL_ORDER, + auto_write=False, + ) + self._prompt = "" + # Load the prompt file + with open(PROMPT_FILE, "r") as f: + self._prompt = f.read() - def init(self): + def start(self): # Output to the LCD instead of the console os.putenv("DISPLAY", ":0") # Initialize the display pygame.init() self.screen = pygame.display.set_mode((0, 0), pygame.FULLSCREEN) - pygame.display.set_allow_screensaver(False) + pygame.mouse.set_visible(False) + self.screen.fill((255, 255, 255)) self.width = self.screen.get_height() self.height = self.screen.get_width() - # Preload images + # Preload welcome image and display it self._load_image("welcome", WELCOME_IMAGE) + self.display_welcome() + start_time = time.monotonic() + + # Preload remaining images self._load_image("background", BACKGROUND_IMAGE) self._load_image("loading", LOADING_IMAGE) @@ -208,13 +255,21 @@ class Book: self.height - PAGE_NAV_HEIGHT - PAGE_TOP_MARGIN - PAGE_BOTTOM_MARGIN, ) - pygame.mouse.set_visible(False) - self.screen.fill((255, 255, 255)) + self.load_settings() + + # Start the sleep check thread after everything is initialized self._sleep_check_thread = threading.Thread(target=self._handle_sleep) self._sleep_check_thread.start() + # Light the neopixels to indicate the book is ready + self.pixels.fill(NEOPIXEL_READY_COLOR) + self.pixels.show() + + # Continue showing the image until the minimum amount of time has passed + time.sleep(max(0, WELCOME_IMAGE_DELAY - (time.monotonic() - start_time))) + def deinit(self): - self.running = False + self._running = False self._sleep_check_thread.join() self.backlight.power = True @@ -222,21 +277,34 @@ class Book: reed_switch = digitalio.DigitalInOut(REED_SWITCH_PIN) reed_switch.direction = digitalio.Direction.INPUT reed_switch.pull = digitalio.Pull.UP + pulse = Pulse( + self.pixels, + speed=NEOPIXEL_PULSE_SPEED, + color=NEOPIXEL_SLEEP_COLOR, + period=3, + ) - while self.running: - if self.sleeping and reed_switch.value: # Book Open - self.wake() - elif not self.sleeping and not reed_switch.value: # Book Closed - self.sleep() + while self._running: + print(self._busy, self._sleeping, reed_switch.value) + if self._sleeping and reed_switch.value: # Book Open + self._wake() + elif ( + not self._busy and not self._sleeping and not reed_switch.value + ): # Book Closed + self._sleep() + + if self._sleeping: + pulse.animate() time.sleep(self.sleep_check_delay) def handle_events(self): - if not self.sleeping: + if not self._sleeping: for event in pygame.event.get(): if event.type == pygame.QUIT: raise SystemExit if event.type == pygame.MOUSEBUTTONDOWN: self._handle_mousedown_event(event) + time.sleep(0.1) def _handle_mousedown_event(self, event): if event.button == 1: @@ -298,6 +366,7 @@ class Book: pygame.time.wait(fade_delay) def display_current_page(self): + self._busy = True self._display_surface(self.images["background"], 0, 0) pygame.display.update() @@ -320,6 +389,7 @@ class Book: self.back_button.show() self.next_button.show() pygame.display.update() + self._busy = False @staticmethod def _create_transparent_buffer(size): @@ -406,12 +476,15 @@ class Book: pygame.display.update() def display_message(self, message): + self._busy = True self._display_surface(self.images["background"], 0, 0) height = self._title_text_height(message) self._display_title_text(message, self.height // 2 - height // 2) + self._busy = False def load_story(self, story): # Parse out the title and story and render into pages + self._busy = True self.pages = [] title = story.split("Title: ")[1].split("\n\n")[0] page = self._add_page(title) @@ -436,6 +509,7 @@ class Book: if self.cursor["y"] > 0: self.cursor["y"] += PARAGRAPH_SPACING print(f"Loaded story at index {self.story} with {len(self.pages)} pages") + self._busy = False def _add_page(self, title=None): page = { @@ -490,36 +564,67 @@ class Book: pickle.dump(storydata, f) def new_story(self): + self._busy = True self.display_message("What story would you like to hear today?") - while not self.listener.speech_waiting(): - self.listener.listen() + if self._sleep_request: + self._busy = False + return + + def show_waiting(): + # Pause for a beat because the listener doesn't + # immediately start listening sometimes + time.sleep(2) + self.pixels.fill(NEOPIXEL_WAITING_COLOR) + self.pixels.show() + + self.listener.listen(ready_callback=show_waiting) + self.pixels.fill(NEOPIXEL_READY_COLOR) + self.pixels.show() + if self._sleep_request: + self._busy = False + return + + if not self.listener.speech_waiting(): + # No response from user, so return + return + + audio_data = self.listener.get_audio_data() + + story_request = self._transcribe(audio_data.get_wav_data()) - story_request = self._transcribe(self.listener.get_speech()) story_prompt = self._make_story_prompt(story_request) - - print("Getting new response. This may take a minute or two...") self.display_loading() response = self._sendchat(story_prompt) - with open(os.path.dirname(sys.argv[0]) + "response.txt", "w") as f: - f.write(response) + if self._sleep_request: + self._busy = False + return print(response) + self._busy = True self.stories.append(response) self.story = len(self.stories) - 1 self.page = 0 + self.save_settings() self.load_story(response) + self._busy = False - def sleep(self): - self.sleeping = True - self.sleep_check_delay = 1 + def _sleep(self): + # Set a sleep request flag so that any busy threads know to finish up + self._sleep_request = True + self.listener.stop_listening() + while self._busy: + time.sleep(0.1) + self._sleep_request = False + self._sleeping = True + self.sleep_check_delay = 0 self.saved_screen = self.screen.copy() self.screen.fill((0, 0, 0)) pygame.display.update() self.backlight.power = False - def wake(self): + def _wake(self): # Turn on the screen self.backlight.power = True if self.saved_screen: @@ -527,14 +632,13 @@ class Book: pygame.display.update() self.saved_screen = None self.sleep_check_delay = 0.1 - self.sleeping = False + self.pixels.fill(NEOPIXEL_READY_COLOR) + self.pixels.show() + self._sleeping = False - @staticmethod - def _make_story_prompt(request): - return ( - f"Write a complete story with a title and a body of approximately " - f"{STORY_WORD_LENGTH} words long and a happy ending. The specific " - f'story request is "{request}". ' + def _make_story_prompt(self, request): + return self._prompt.format( + STORY_WORD_LENGTH=STORY_WORD_LENGTH, STORY_REQUEST=request ) @staticmethod @@ -554,8 +658,9 @@ class Book: attempts += 1 return "I wasn't able to understand you. Please repeat that." - @staticmethod - def _sendchat(prompt): + def _sendchat(self, prompt): + response = "" + print("Sending to chatGPT") # Package up the text to send to ChatGPT completion = openai.ChatCompletion.create( model=CHATGPT_MODEL, @@ -563,9 +668,17 @@ class Book: {"role": "system", "content": SYSTEM_ROLE}, {"role": "user", "content": prompt}, ], + stream=True, ) + + for chunk in completion: + if "delta" in chunk.choices[0] and "content" in chunk.choices[0]["delta"]: + response += chunk.choices[0]["delta"]["content"] + if self._sleep_request: + return None + # Send the heard text to ChatGPT and return the result - return completion.choices[0].message.content + return response # completion.choices[0].message.content def parse_args(): @@ -585,35 +698,28 @@ def parse_args(): def main(args): book = Book(args.rotation) - book.init() - try: - # Center and display the image - book.display_welcome() - start_time = time.monotonic() - book.load_settings() - - # Continue showing the image until the minimum amount of time has passed - time.sleep(max(0, WELCOME_IMAGE_DELAY - (time.monotonic() - start_time))) + book.start() + # If no stories, start a new one if not book.stories: book.new_story() book.display_current_page() while True: - if not book.sleeping: - book.handle_events() - time.sleep(0.1) + book.handle_events() except KeyboardInterrupt: pass finally: book.save_settings() book.deinit() + pygame.quit() if __name__ == "__main__": main(parse_args()) # TODO: +# * Figure out how to get the script to start on boot # * Play with prompt parameters