tons of refactoring (squashed)

This commit is contained in:
siddacious 2020-02-07 11:21:09 -08:00
parent 1d8ad0f2cf
commit 7b2ea7e8e2
9 changed files with 384 additions and 347 deletions

View file

@ -0,0 +1,23 @@
import puff_detector
detector = puff_detector.PuffDetector()
@detector.on_sip
def on_sip(strength, duration):
if strength == puff_detector.STRONG:
print("GOT STRONG SIP")
if strength == puff_detector.SOFT:
print("GOT SOFT SIP")
print("%.2f long" % duration)
@detector.on_puff
def on_puff(strength, duration):
if strength == puff_detector.STRONG:
print("GOT STRONG PUFF")
if strength == puff_detector.SOFT:
print("GOT SOFT PUFF")
print("%.2f long" % duration)
detector.run()

View file

@ -0,0 +1,288 @@
import time
import os
import json
import board
import terminalio
from adafruit_display_text import label
from displayio import Group
import displayio
import adafruit_displayio_ssd1306
import adafruit_lps35hw
CONSOLE = False
DEBUG = True
MIN_PRESSURE = 8
HIGH_PRESSURE = 40
WAITING = 0
STARTED = 1
DETECTED = 2
SOFT_SIP = 0
HARD_SIP = 1
SOFT_PUFF = 2
HARD_PUFF = 3
SOFT = 1
STRONG = 2
COLOR = 0xFFFFFF
FONT = terminalio.FONT
DISPLAY_WIDTH = 128
DISPLAY_HEIGHT = 64
Y_OFFSET = 3
TEXT_HEIGHT = 8
BOTTOM_ROW = DISPLAY_HEIGHT - TEXT_HEIGHT
BANNER_STRING = "PUFF-O-TRON-9000"
pressure_string = " "
input_type_string = " "
# pylint:disable=too-many-locals,exec-used,eval-used
class PuffDetector:
def __init__(
self,
min_pressure=MIN_PRESSURE,
high_pressure=HIGH_PRESSURE,
config_filename="settings.json",
display_timeout=1,
):
# misc detection state
self.current_pressure = 0
self.current_polarity = 0
self.current_time = time.monotonic()
self.start_polarity = 0
self.peak_level = 0
self.puff_start = 0
self.duration = 0
self.state = WAITING
self.prev_state = self.state
# settings
self.settings_dict = {}
self.high_pressure = high_pressure
self.min_pressure = min_pressure
self._config_filename = config_filename
self._load_config()
# callbacks
self._on_sip_callbacks = []
self._on_puff_callbacks = []
# display and display state
self.display = None
self.state_display_start = self.current_time
self.detection_result_str = " "
self.duration_str = " "
self.min_press_str = " "
self.high_press_str = " "
self.state_str = " "
self.press_str = " "
self.display_timeout = display_timeout
self._init_stuff()
def _init_stuff(self):
# decouple display
self.state_display_timeout = 1.0
self.state_display_start = 0
displayio.release_displays()
i2c = board.I2C()
display_bus = displayio.I2CDisplay(i2c, device_address=0x3D)
self.display = adafruit_displayio_ssd1306.SSD1306(
display_bus, width=DISPLAY_WIDTH, height=DISPLAY_HEIGHT
)
self.min_press_str = "min: %d" % self.min_pressure
self.high_press_str = "hi: %d" % self.high_pressure
self.pressure_sensor = adafruit_lps35hw.LPS35HW(i2c)
self.pressure_sensor.zero_pressure()
self.pressure_sensor.data_rate = adafruit_lps35hw.DataRate.RATE_75_HZ
self.pressure_sensor.filter_enabled = True
self.pressure_sensor.filter_config = True
def _load_config(self):
if not self._config_filename in os.listdir("/"):
return
try:
with open(self._config_filename, "r") as file:
self.settings_dict = json.load(file)
except (ValueError, OSError) as error:
print("Error loading config file")
print(type(error))
if self.settings_dict:
if "MIN_PRESSURE" in self.settings_dict.keys():
self.min_pressure = self.settings_dict["MIN_PRESSURE"]
if "HIGH_PRESSURE" in self.settings_dict.keys():
self.high_pressure = self.settings_dict["HIGH_PRESSURE"]
if "DISPLAY_TIMEOUT" in self.settings_dict.keys():
self.display_timeout = self.settings_dict["DISPLAY_TIMEOUT"]
def check_for_events(self):
self.current_time = time.monotonic()
self.current_pressure = self.pressure_sensor.pressure
self._update_state()
self._notify_callbacks()
self._update_display()
def run(self):
while True:
self.check_for_events()
def _catagorize_pressure(self, pressure):
"""determine the strength and polarity of the pressure reading"""
level = 0
polarity = 0
abs_pressure = abs(pressure)
if abs_pressure > self.min_pressure:
level = 1
if abs_pressure > self.high_pressure:
level = 2
if level != 0:
if pressure > 0:
polarity = 1
else:
polarity = -1
return (polarity, level)
def on_sip(self, func):
self.add_on_sip(func)
return func
def on_puff(self, func):
self.add_on_puff(func)
return func
def add_on_sip(self, new_callback):
self._on_sip_callbacks.append(new_callback)
def add_on_puff(self, new_callback):
self._on_puff_callbacks.append(new_callback)
def _update_state(self):
"""Updates the internal state to detect if a sip/puff has been started or stopped"""
self.current_polarity, level = self._catagorize_pressure(self.current_pressure)
if self.state == DETECTED:
self.state = WAITING
self.start_polarity = 0
self.peak_level = 0
self.duration = 0
if (self.state == WAITING) and level != 0 and (self.start_polarity == 0):
self.state = STARTED
self.start_polarity = self.current_polarity
self.puff_start = time.monotonic()
if self.state == STARTED:
if level > self.peak_level:
self.peak_level = level
if level == 0:
self.state = DETECTED
self.duration = time.monotonic() - self.puff_start
def _notify_callbacks(self):
state_changed = self.prev_state != self.state
self.prev_state = self.state
if not state_changed:
return
if self.state == DETECTED:
# if this is a sip
if self.start_polarity == -1:
for on_sip_callback in self._on_sip_callbacks:
on_sip_callback(self.peak_level, self.duration)
# if this is a sip
if self.start_polarity == 1:
for on_puff_callback in self._on_puff_callbacks:
on_puff_callback(self.peak_level, self.duration)
def _update_display_strings(self):
self.press_str = "Press: %0.3f" % self.current_pressure
if self.state == DETECTED:
self.duration_str = "Duration: %0.2f" % self.duration
self.state_str = "DETECTED:"
if self.start_polarity == -1:
if self.peak_level == STRONG:
self.detection_result_str = "STRONG SIP"
if self.peak_level == SOFT:
self.detection_result_str = "SOFT SIP"
if self.start_polarity == 1:
if self.peak_level == STRONG:
self.detection_result_str = "STRONG PUFF"
if self.peak_level == SOFT:
self.detection_result_str = "SOFT PUFF"
self.state_display_start = self.current_time
elif self.state == WAITING:
display_elapsed = self.current_time - self.state_display_start
if display_elapsed > self.display_timeout:
self.detection_result_str = " "
self.duration_str = " "
self.detection_result_str = " "
self.state_str = "WAITING FOR INPUT"
elif self.state == STARTED:
if self.start_polarity == -1:
self.state_str = "SIP STARTED..."
if self.start_polarity == 1:
self.state_str = "PUFF STARTED..."
def _update_display(self):
self._update_display_strings()
banner = label.Label(FONT, text=BANNER_STRING, color=COLOR)
state = label.Label(FONT, text=self.state_str, color=COLOR)
detector_result = label.Label(FONT, text=self.detection_result_str, color=COLOR)
duration = label.Label(FONT, text=self.duration_str, color=COLOR)
min_pressure_label = label.Label(FONT, text=self.min_press_str, color=COLOR)
high_pressure_label = label.Label(FONT, text=self.high_press_str, color=COLOR)
pressure_label = label.Label(FONT, text=self.press_str, color=COLOR)
banner.x = 0
banner.y = 0 + Y_OFFSET
state.x = 10
state.y = 10 + Y_OFFSET
detector_result.x = 10
detector_result.y = 20 + Y_OFFSET
duration.x = 10
duration.y = 30 + Y_OFFSET
min_pressure_label.x = 0
min_pressure_label.y = BOTTOM_ROW - 10
pressure_label.x = DISPLAY_WIDTH - pressure_label.bounding_box[2]
pressure_label.y = BOTTOM_ROW
high_pressure_label.x = 0
high_pressure_label.y = BOTTOM_ROW
splash = Group(max_size=10)
splash.append(banner)
splash.append(state)
splash.append(detector_result)
splash.append(duration)
splash.append(min_pressure_label)
splash.append(high_pressure_label)
splash.append(pressure_label)
self.display.show(splash)

View file

@ -0,0 +1,45 @@
class SerialLogger:
# def __init__(self):
# print("MIN_PRESSURE::", detector.min_pressure)
# print("HIGH_PRESSURE_THRESHOLD::", detector.high_pressure)
@classmethod
def run(cls, detector, state_map, puff_stat):
polarity, peak_level, duration = puff_stat
state_str = state_map[detector.state][polarity][0]
state_str = state_str.replace(" ", "_").upper()
input_type_str = state_map[detector.state][polarity][1][peak_level]
if state_map[detector.state]["name"] == "WAITING":
print(state_str)
if state_map[detector.state]["name"] == "STARTED":
print(state_str.replace(" ", "_").upper())
if state_map[detector.state]["name"] == "DETECTED":
type_detected = input_type_str[0].replace(" ", "_").upper()
log_str = "%s::%s::DURATION:%0.3f" % (state_str, type_detected, duration)
print(log_str)
#################################
# def log_state_change(self, puff_stat):
# state_changed = self.prev_state == self.state
# self.prev_state = self.state
# if state_changed:
# return
# polarity, peak_level, duration = puff_stat
# state_str = STATE_MAP[self.state][polarity][0]
# input_type_str = STATE_MAP[self.state][polarity][1][peak_level]
# state_str = state_str.replace(" ", "_").upper()
# if self.state is WAITING:
# print(state_str)
# if self.state is STARTED:
# print(state_str.replace(" ", "_").upper())
# if self.state is DETECTED:
# type_detected = input_type_str[0]
# log_str = "%s::%s::DURATION:%0.3f" % (state_str, type_detected, duration)
# print(log_str)

View file

@ -0,0 +1,5 @@
{
"MIN_PRESSURE": 10,
"HIGH_PRESSURE": 60,
"DISPLAY_TIMEOUT": 1
}

View file

@ -0,0 +1,23 @@
from adafruit_hid.keyboard import Keyboard
from adafruit_hid.keycode import Keycode
def log(message):
print("\t\tHIDDEMO::%s" % message)
class SimpleHid:
def __init__(self):
self.kbd = Keyboard()
print("SimpleHid inited")
def run(self, detector, state_map, puff_stat): # pylint:disable=unused-argument
if state_map[detector.state]["name"] == "WAITING":
self.kbd.send(Keycode.ONE)
if state_map[detector.state]["name"] == "STARTED":
self.kbd.send(Keycode.TWO)
if state_map[detector.state]["name"] == "DETECTED":
self.kbd.send(Keycode.SIX)

View file

@ -1,167 +0,0 @@
import time
import board
import displayio
import terminalio
from adafruit_display_text import label
import adafruit_displayio_ssd1306
import adafruit_lps35hw
from puff_detector import PuffDetector, STARTED, DETECTED
from adafruit_hid.keyboard import Keyboard
from adafruit_hid.keycode import Keycode
# The keycode sent for each button, will be paired with a control key
buttonkeys = [Keycode.A, Keycode.B, Keycode.C, Keycode.D, Keycode.E, Keycode.F]
controlkey = Keycode.LEFT_CONTROL
# the keyboard object!
kbd = Keyboard()
displayio.release_displays()
DISPLAY_WIDTH = 128
DISPLAY_HEIGHT = 64
Y_OFFSET = 3
TEXT_HEIGHT = 8
BOTTOM_ROW = DISPLAY_HEIGHT - TEXT_HEIGHT
SOFT_SIP = 0
HARD_SIP = 1
SOFT_PUFF = 2
HARD_PUFF = 3
i2c = board.I2C()
display_bus = displayio.I2CDisplay(i2c, device_address=0x3D)
display = adafruit_displayio_ssd1306.SSD1306(
display_bus, width=DISPLAY_WIDTH, height=DISPLAY_HEIGHT
)
lps = adafruit_lps35hw.LPS35HW(i2c, 0x5C)
lps.zero_pressure()
lps.data_rate = adafruit_lps35hw.DataRate.RATE_75_HZ
lps.filter_enabled = True
lps.filter_config = True
detector = PuffDetector()
time.sleep(1)
color = 0xFFFFFF
font = terminalio.FONT
banner_string = "PUFF-O-TRON-9000"
state_string = " "
pressure_string = " "
input_type_string = " "
duration_string = " "
state_display_timeout = 1.0
state_display_start = 0
while True:
detected_puff = None
curr_time = time.monotonic()
# Set text, font, and color
current_pressure = lps.pressure
pressure_string = "Press: %0.3f" % current_pressure
puff_polarity, puff_peak_level, puff_duration = detector.check_for_puff(
current_pressure
)
# if puff_duration:
if detector.state == DETECTED:
state = DETECTED
duration_string = (
"Duration: %0.2f" % puff_duration
) # puff duration can be none? after detect?
state_string = "DETECTED:"
if puff_polarity == 1:
if puff_peak_level == 1:
input_type_string = "SOFT PUFF"
detected_puff = SOFT_PUFF
if puff_peak_level == 2:
input_type_string = "HARD PUFF"
detected_puff = HARD_PUFF
if puff_polarity == -1:
if puff_peak_level == 1:
input_type_string = "SOFT SIP"
detected_puff = SOFT_SIP
if puff_peak_level == 2:
input_type_string = "HARD SIP"
detected_puff = HARD_SIP
state_display_start = curr_time
elif detector.state == STARTED:
# elif puff_duration is None and puff_polarity:
dir_string = ""
if puff_polarity == 1:
dir_string = "PUFF"
if puff_polarity == -1:
dir_string = "SIP"
state_string = "%s START" % dir_string
else: # WAITING
if (curr_time - state_display_start) > detector.display_timeout:
state_string = "Waiting for Input"
input_type_string = " "
duration_string = " "
# if it's been >timeout since we started displaying puff result
min_press_str = "min: %d" % detector.min_pressure
high_press_str = "hi: %d" % detector.high_pressure
banner = label.Label(font, text=banner_string, color=color)
state = label.Label(font, text=state_string, color=color)
detector_result = label.Label(font, text=input_type_string, color=color)
duration = label.Label(font, text=duration_string, color=color)
min_pressure_label = label.Label(font, text=min_press_str, color=color)
high_pressure_label = label.Label(font, text=high_press_str, color=color)
pressure_label = label.Label(font, text=pressure_string, color=color)
banner.x = 0
banner.y = 0 + Y_OFFSET
state.x = 20
state.y = 10 + Y_OFFSET
detector_result.x = 20
detector_result.y = 20 + Y_OFFSET
duration.x = 10
duration.y = 30 + Y_OFFSET
min_pressure_label.x = 0
min_pressure_label.y = BOTTOM_ROW - 10
x, y, w, h = pressure_label.bounding_box
pressure_label.x = DISPLAY_WIDTH - w
pressure_label.y = BOTTOM_ROW
high_pressure_label.x = 0
high_pressure_label.y = BOTTOM_ROW
splash = displayio.Group(max_size=10)
splash.append(banner)
splash.append(state)
splash.append(detector_result)
splash.append(duration)
splash.append(min_pressure_label)
splash.append(high_pressure_label)
splash.append(pressure_label)
# Show it
display.show(splash)
# press some buttons
if detected_puff == SOFT_PUFF:
kbd.press(Keycode.LEFT_ARROW)
if detected_puff == HARD_PUFF:
kbd.press(Keycode.DOWN_ARROW)
if detected_puff == SOFT_SIP:
kbd.press(Keycode.RIGHT_ARROW)
if detected_puff == HARD_SIP:
kbd.press(Keycode.UP_ARROW)
kbd.release_all()

View file

@ -1,123 +0,0 @@
import time
import os
import json
CONSOLE = False
DEBUG = True
MIN_PRESSURE = 8
HIGH_PRESSURE = 40
WAITING = 0
STARTED = 1
DETECTED = 2
class PuffDetector:
def __init__(
self,
min_pressure=MIN_PRESSURE,
high_pressure=HIGH_PRESSURE,
config_filename="settings.json",
display_timeout=1,
):
self.high_pressure = high_pressure
self.min_pressure = min_pressure
self.start_polarity = 0
self.peak_level = 0
self.counter = 0
self.duration = 0
self.puff_start = 0
self.state = WAITING
self.settings_dict = {}
self.display_timeout = display_timeout
self._config_filename = config_filename
self._load_config()
if self.settings_dict:
self.min_pressure = self.settings_dict["min_pressure"]
self.high_pressure = self.settings_dict["high_pressure"]
if "display_timeout" in self.settings_dict.keys():
self.display_timeout = self.settings_dict["display_timeout"]
def _load_config(self):
if not self._config_filename in os.listdir("/"):
return
try:
with open(self._config_filename, "r") as file:
self.settings_dict = json.load(file)
except (ValueError, OSError) as error:
print("Error loading config file")
print(type(error))
def catagorize_pressure(self, pressure):
"""determine the strength and polarity of the pressure reading"""
level = 0
polarity = 0
abs_pressure = abs(pressure)
if abs_pressure > self.min_pressure:
level = 1
if abs_pressure > self.high_pressure:
level = 2
if level != 0:
if pressure > 0:
polarity = 1
else:
polarity = -1
return (polarity, level)
@staticmethod
def pressure_string(pressure_type):
polarity, level = pressure_type # pylint:disable=unused-variable
pressure_str = "HIGH"
if level == 0 or polarity == 0:
return ""
# print("pressure level:", level)
if level == 1:
pressure_str = "LOW"
elif level == 2:
pressure_str = "HIGH"
if polarity == 1:
pressure_str += "PUFF"
elif polarity == -1:
pressure_str += "SIP"
return pressure_str
def check_for_puff(self, current_pressure):
"""Updates the internal state to detect if a sip/puff has been started or stopped"""
puff_peak_level = None
puff_duration = None
polarity, level = self.catagorize_pressure(current_pressure)
if self.state == DETECTED:
# if polarity == 0 and level == 0:
self.state = WAITING
self.start_polarity = 0
self.peak_level = 0
self.duration = 0
if level != 0 and self.start_polarity == 0:
self.state = STARTED
self.start_polarity = polarity
self.puff_start = time.monotonic()
if self.state == STARTED:
# if self.start_polarity != 0:
if level > self.peak_level:
self.peak_level = level
# if (level == 0) and (self.start_polarity != 0):
if (level == 0) and (self.state == STARTED):
self.state = DETECTED
self.duration = time.monotonic() - self.puff_start
puff_peak_level = self.peak_level
puff_duration = self.duration
self.counter += 1
return (self.start_polarity, puff_peak_level, puff_duration)

View file

@ -1,57 +0,0 @@
import unittest
from puff_detector import PuffDetector
class DirectionChangeTest(unittest.TestCase):
def test_negative_direction(self):
reading_list = [-1, -2, -3, -4, -5]
self.assertEqual(PuffDetector.direction(reading_list), -1)
def test_positive_direction(self):
reading_list = [1, 2, 3, 4, 5, 6]
self.assertEqual(PuffDetector.direction(reading_list), 1)
def test_flat_direction(self):
reading_list = [1, 1, 1, 1, 1, 1]
self.assertEqual(PuffDetector.direction(reading_list), 0)
def test_decreasing_positive(self):
reading_list = [6, 5, 4, 3, 2, 1]
self.assertEqual(PuffDetector.direction(reading_list), -1)
def test_negative_to_positive(self):
reading_list = [-1, -2, -3, -4, -4, -3, -2, -1]
self.assertEqual(PuffDetector.direction_changed(reading_list, -1), True)
def test_negative_to_negative_er(self):
reading_list = [-1, -2, -3, -4, -5.0 - 6, -7, -8]
self.assertEqual(PuffDetector.direction_changed(reading_list, -1), False)
def test_positive_to_negative(self):
reading_list = [1, 2, 3, 4, 4, 3, 2, 1]
self.assertEqual(PuffDetector.direction_changed(reading_list, 1), True)
def test_positive_to_positive_er(self):
reading_list = [1, 2, 3, 4, 5, 5, 6, 7]
print("positive-er")
self.assertEqual(PuffDetector.direction_changed(reading_list, 1), False)
class RollingAverageTest(unittest.TestCase):
def test_uniform_measurements(self):
reading_list = [10, 10, 10, 10]
self.assertEqual(PuffDetector.rolling_average(reading_list), 10)
def test_real_average(self):
reading_list = [10, 5, 5]
self.assertEqual(PuffDetector.rolling_average(reading_list), 20 / 3)
def test_rolling_average(self):
reading_list = [0, 0, 0, 10, 5, 5]
self.assertEqual(PuffDetector.rolling_average(reading_list), 20 / 3)
def test_flat_average(self):
reading_list = [1, 1, 1, 1, 1, 1]
self.assertEqual(PuffDetector.rolling_average(reading_list), 1)