233 lines
7.3 KiB
Python
Executable file
233 lines
7.3 KiB
Python
Executable file
# SPDX-FileCopyrightText: 2020 Jeff Epler for Adafruit Industries
|
|
#
|
|
# SPDX-License-Identifier: MIT
|
|
|
|
#!/usr/bin/env python3
|
|
from enum import Enum
|
|
import time
|
|
import tkinter
|
|
|
|
import adafruit_ble
|
|
from adafruit_ble.advertising.standard import ProvideServicesAdvertisement
|
|
from adafruit_ble.services.standard.device_info import DeviceInfoService
|
|
from adafruit_ble_cycling_speed_and_cadence import CyclingSpeedAndCadenceService
|
|
|
|
import Xlib.X as X
|
|
import Xlib.XK as XK
|
|
import Xlib.display as display
|
|
import Xlib.ext.xtest as xtest
|
|
|
|
# Customize me! Set the minimum RPM to start the video, target RPM, and the
# grace time (number of seconds) you can be below the minimum RPM before it
# stops.

# Cadence at or above this starts/keeps the video playing; below it the OSD
# background turns red.
MINIMUM_RPM = 60
# Cadence at or above this shows a green OSD background; between MINIMUM_RPM
# and this value the background is yellow.
TARGET_RPM = 72
# Seconds the cadence may stay below MINIMUM_RPM before the video is paused.
GRACE_TIME = 2
|
|
|
|
class Keystroke:
    """Use a connection to the X Server (linux display server) to send fake
    keystrokes and to inspect which window currently has keyboard focus.

    Keystrokes are injected with the XTEST extension, so they go to whatever
    window has focus; callers use ``current_window_class`` to decide whether
    sending is appropriate.
    """

    def __init__(self):
        self.display = display.Display()
        self.root = self.display.screen().root
        # Lazily filled cache of keysym -> keycode lookups.
        self._keycodes = {}

    def _keycode(self, sym):
        """Return the keycode for ``sym``.

        ``sym`` may be a numeric keysym or a keysym name (``str``), which is
        first converted with ``XK.string_to_keysym``. Results are cached.
        """
        if isinstance(sym, str):
            sym = XK.string_to_keysym(sym)
        result = self._keycodes.get(sym)
        if result is None:
            result = self.display.keysym_to_keycode(sym)
            self._keycodes[sym] = result
        return result

    def send_keysym(self, keysym):
        """Send a key press followed by a key release for ``keysym``."""
        keycode = self._keycode(keysym)
        print("sending", keycode, keysym)
        xtest.fake_input(self.root, X.KeyPress, keycode)
        self.display.sync()
        # Hold the key briefly so the press and release are distinct events.
        time.sleep(.01)
        xtest.fake_input(self.root, X.KeyRelease, keycode)
        self.display.sync()

    @property
    def current_window_class(self):
        """Return the second element of WM_CLASS for the focused window.

        Walks from the focused window up through its parents until one
        reports a WM_CLASS. Returns ``''`` when none is found.
        """
        window = self.display.get_input_focus().focus
        # The focus (and a root window's parent) may be reported as a plain
        # integer (e.g. None=0 or PointerRoot=1) rather than a window object;
        # an integer has no get_wm_class(), so treat it as "no window".
        while window and not isinstance(window, int):
            class_ = window.get_wm_class()
            if class_:
                return class_[1]
            window = window.query_tree().parent
        return ''
|
|
|
|
class OSD:
    """Use Tkinter to display a simple OSD window on top of all regular windows.

    The window contains a single text label whose text and background colour
    are exposed through the ``label`` and ``background`` properties.
    """

    def __init__(self, width=12, text='', geometry='-0+48', font=('Arial', 36)):
        """Create the Tk root window and its label.

        ``width``, ``text`` and ``font`` are passed to the ``tkinter.Label``;
        ``geometry`` is passed to ``wm_geometry`` to position the window.
        """
        self.app = tkinter.Tk()
        self.app.wm_geometry(geometry)
        # Override-redirect asks the window manager to leave the window alone.
        self.app.wm_overrideredirect(1)
        self._label = tkinter.Label(self.app, width=width, text=text, font=font)
        self._label.pack()

    @property
    def label(self):
        """The text currently shown in the OSD."""
        return self._label['text']

    @label.setter
    def label(self, text):
        self._label['text'] = text
        # No mainloop is running in the polling script, so redraw right away.
        self.update()

    def mainloop(self):
        """Run the Tk main loop."""
        self.app.mainloop()

    def destroy(self):
        """Destroy the OSD window."""
        self.app.destroy()

    def update(self):
        """Process pending Tk events so the window is redrawn."""
        self.app.update()

    @property
    def background(self):
        """The background colour of the OSD label."""
        # Read from the widget (_label); `self.label` is the text string and
        # cannot be indexed by option name.
        return self._label['background']

    @background.setter
    def background(self, color):
        self._label['background'] = color
|
|
|
|
def send_pause():
    """Send the key 'p', to send a video to the paused state.

    Does nothing unless the focused window's class is 'Chromium-browser'.
    """
    if keystroke.current_window_class != 'Chromium-browser':
        return
    # The message used to say "play" (copy-paste from send_play), which made
    # the log misleading when diagnosing pause/play transitions.
    print('actually send pause')
    keystroke.send_keysym('p')
|
|
|
|
def send_play():
    """Send the keys 'p' then 'k', to send a video into the playing state.

    Nothing is sent unless the focused window's class is 'Chromium-browser'.
    """
    if keystroke.current_window_class == 'Chromium-browser':
        print('actually send play')
        for key in ('p', 'k'):
            keystroke.send_keysym(key)
|
|
|
|
def delta16(v1, v2):
    """Return how far a 16-bit counter advanced going from ``v1`` to ``v2``.

    When ``v2`` is smaller than ``v1`` the counter is assumed to have wrapped
    from 65535 back to 0 once, so 65536 is added to the raw difference.
    """
    advance = v2 - v1
    return advance if advance >= 0 else advance + 0x10000
|
|
|
|
# Module-level singletons used by the helper functions and the main loop below.
# PyLint can't find BLERadio for some reason so special case it here.
ble = adafruit_ble.BLERadio() # pylint: disable=no-member
# Opens the connection to the X server used for sending fake keystrokes.
keystroke = Keystroke()
# Creates the on-screen display window with its default size and position.
osd = OSD()
|
|
class VideoState(Enum):
    """The playback state this script believes the video is in."""

    # Assumed state whenever a new scan/connect cycle begins.
    PAUSED = 0
    # Set after send_play() has been called.
    PLAYING = 1
|
|
|
|
# Main loop: scan for CSC sensors, connect to them, then poll crank data and
# play/pause the video according to the estimated cadence. When every sensor
# has disconnected, start over with a new scan.
while True:
    state = VideoState.PAUSED

    osd.label = "Scanning"
    osd.background = '#ffffff'
    print("Scanning...")
    # Save advertisements, indexed by address
    advs = {}
    for adv in ble.start_scan(ProvideServicesAdvertisement, timeout=5):
        if CyclingSpeedAndCadenceService in adv.services:
            print("found a CyclingSpeedAndCadenceService advertisement")
            # Save advertisement. Overwrite duplicates from same address (device).
            advs[adv.address] = adv

    ble.stop_scan()
    print("Stopped scanning")
    if not advs:
        # Nothing found. Go back and keep looking.
        continue

    osd.label = "Connecting"
    # Connect to all available CSC sensors.
    cyc_connections = []
    for adv in advs.values():
        cyc_connections.append(ble.connect(adv))
        print("Connected", len(cyc_connections))

    # Print out info about each sensor.
    for conn in cyc_connections:
        if conn.connected:
            if DeviceInfoService in conn:
                dis = conn[DeviceInfoService]
                try:
                    manufacturer = dis.manufacturer
                except AttributeError:
                    manufacturer = "(Manufacturer Not specified)"
                print("Device:", manufacturer)
            else:
                print("No device information")

    osd.label = "Polling"
    print("Waiting for data... (could be 10-20 seconds or more)")
    # Get CSC Service from each sensor.
    cyc_services = [conn[CyclingSpeedAndCadenceService] for conn in cyc_connections]

    # Poll the sensors roughly ten times a second.
    # Stop if we lose connection to all sensors.

    # NOTE: despite the "_ms" suffix, the crank event time is kept in the
    # sensor's raw units (1/1024 s per the CSC Measurement characteristic).
    last_crank_time_ms = 0
    last_crank_revs = 0
    grace_period_end = 0
    est_rpm = 0

    while True:
        still_connected = False
        crank_revs = None
        crank_time_ms = None
        for conn, svc in zip(cyc_connections, cyc_services):
            if conn.connected:
                still_connected = True
                values = svc.measurement_values
                # Compare against None rather than truthiness so that a
                # legitimate crank count of 0 (fresh sensor or 16-bit
                # wraparound) is not discarded.
                if (values is not None
                        and values.cumulative_crank_revolutions is not None):
                    crank_revs = values.cumulative_crank_revolutions
                    crank_time_ms = values.last_crank_event_time
        if not still_connected:
            break

        if crank_revs is None:
            # No new measurement yet: wait a little instead of spinning the
            # CPU at 100% until the next notification arrives.
            time.sleep(0.1)
            continue

        if crank_time_ms == last_crank_time_ms:
            # No new crank event since the previous sample: not pedalling.
            est_rpm = 0
        else:
            # If we were stopped prior to this, jump to MINIMUM_RPM
            # it gives a faster restart after paused
            if est_rpm == 0 and state == VideoState.PAUSED:
                est_rpm = MINIMUM_RPM
            else:
                delta_revs = delta16(last_crank_revs, crank_revs)
                # Event time ticks are 1/1024 s, so divide by 1024 (not 1000)
                # to get seconds.
                delta_t = delta16(last_crank_time_ms, crank_time_ms) / 1024
                est_rpm = 60 * delta_revs / delta_t

        if est_rpm >= MINIMUM_RPM:
            # Fast enough: push the pause deadline out and make sure we play.
            grace_period_end = time.monotonic() + GRACE_TIME
            if state == VideoState.PAUSED:
                send_play()
                state = VideoState.PLAYING
        elif time.monotonic() > grace_period_end:
            # Too slow for longer than the grace time: pause.
            if state == VideoState.PLAYING:
                send_pause()
                state = VideoState.PAUSED

        last_crank_revs = crank_revs
        last_crank_time_ms = crank_time_ms
        print(f"Crank: {crank_revs}")
        print(f"Crank RPM: {est_rpm:.1f}")

        # Colour-code the OSD: red below minimum, yellow below target,
        # green at or above target.
        if est_rpm < MINIMUM_RPM:
            osd.background = '#ff0000'
        elif est_rpm < TARGET_RPM:
            osd.background = '#ffff00'
        else:
            osd.background = '#00ff00'
        osd.label = f"RPM: {est_rpm:.1f}"
        time.sleep(0.1)
|