diff --git a/Pedal_For_Youtube/main.py b/Pedal_For_Youtube/main.py new file mode 100755 index 000000000..805de3bec --- /dev/null +++ b/Pedal_For_Youtube/main.py @@ -0,0 +1,229 @@ +#!/usr/bin/env python3 +from enum import Enum +import time +import tkinter + +import adafruit_ble +from adafruit_ble.advertising.standard import ProvideServicesAdvertisement +from adafruit_ble.services.standard.device_info import DeviceInfoService +from adafruit_ble_cycling_speed_and_cadence import CyclingSpeedAndCadenceService + +import Xlib.X as X +import Xlib.XK as XK +import Xlib.display as display +import Xlib.ext.xtest as xtest + +# Customize me! Set the minimum RPM to start the video, target RPM, and the +# grace time (number of seconds) you can be below the minimum RPM before it +# stops. +MINIMUM_RPM = 60 +TARGET_RPM = 72 +GRACE_TIME = 2 + +class Keystroke: + """Use a connection to the X Server (linux display server) to send # fake + keystrokes to the Chromium browser window.""" + def __init__(self): + self.display = display.Display() + self.root = self.display.screen().root + self._keycodes = {} + + def _keycode(self, sym): + if isinstance(sym, str): + sym = XK.string_to_keysym(sym) + result = self._keycodes.get(sym, None) + if result is None: + self._keycodes[sym] = result = self.display.keysym_to_keycode(sym) + return result + + def send_keysym(self, keysym): + keycode = self._keycode(keysym) + print("sending", keycode, keysym) + xtest.fake_input(self.root, X.KeyPress, keycode) + self.display.sync() + time.sleep(.01) + xtest.fake_input(self.root, X.KeyRelease, keycode) + self.display.sync() + + @property + def current_window_class(self): + window = self.display.get_input_focus().focus + while window: + class_ = window.get_wm_class() + if class_: + return class_[1] + window = window.query_tree().parent + return '' + +class OSD: + """Use Tkinter to display a simple OSD window on top of all regular windows""" + def __init__(self, width=12, text='', geometry='-0+48', font=('Arial', 36)): + self.app = tkinter.Tk() + self.app.wm_geometry(geometry) + self.app.wm_overrideredirect(1) + self._label = tkinter.Label(self.app, width=width, text=text, font=font) + self._label.pack() + + @property + def label(self): + return self._label['text'] + + @label.setter + def label(self, text): + self._label['text'] = text + self.update() + + def mainloop(self): + self.app.mainloop() + + def destroy(self): + self.app.destroy() + + def update(self): + self.app.update() + + @property + def background(self): + return self.label['background'] + + @background.setter + def background(self, color): + self._label['background'] = color + +def send_pause(): + """Send the key 'p', to send a video to the paused state""" + if keystroke.current_window_class != 'Chromium-browser': + return + print('actually send play') + keystroke.send_keysym('p') + +def send_play(): + """Send the keys 'pk', to send a video into the playing state""" + if keystroke.current_window_class != 'Chromium-browser': + return + print('actually send play') + keystroke.send_keysym('p') + keystroke.send_keysym('k') + +def delta16(v1, v2): + """Return the delta (difference) between two increasing 16-bit counters, + accounting for the wraparound from 65535 back to 0""" + diff = v2 - v1 + if diff < 0: + diff += (1<<16) + return diff + +# PyLint can't find BLERadio for some reason so special case it here. +ble = adafruit_ble.BLERadio() # pylint: disable=no-member +keystroke = Keystroke() +osd = OSD() +class VideoState(Enum): + PAUSED = 0 + PLAYING = 1 + +while True: + state = VideoState.PAUSED + + osd.label = "Scanning" + osd.background = '#ffffff' + print("Scanning...") + # Save advertisements, indexed by address + advs = {} + for adv in ble.start_scan(ProvideServicesAdvertisement, timeout=5): + if CyclingSpeedAndCadenceService in adv.services: + print("found a CyclingSpeedAndCadenceService advertisement") + # Save advertisement. Overwrite duplicates from same address (device). + advs[adv.address] = adv + + ble.stop_scan() + print("Stopped scanning") + if not advs: + # Nothing found. Go back and keep looking. + continue + + osd.label = "Connecting" + # Connect to all available CSC sensors. + cyc_connections = [] + for adv in advs.values(): + cyc_connections.append(ble.connect(adv)) + print("Connected", len(cyc_connections)) + + # Print out info about each sensors. + for conn in cyc_connections: + if conn.connected: + if DeviceInfoService in conn: + dis = conn[DeviceInfoService] + try: + manufacturer = dis.manufacturer + except AttributeError: + manufacturer = "(Manufacturer Not specified)" + print("Device:", manufacturer) + else: + print("No device information") + + osd.label = "Polling" + print("Waiting for data... (could be 10-20 seconds or more)") + # Get CSC Service from each sensor. + cyc_services = [] + for conn in cyc_connections: + cyc_services.append(conn[CyclingSpeedAndCadenceService]) + # Read data from each sensor once a second. + # Stop if we lose connection to all sensors. + + last_crank_time_ms = 0 + last_crank_revs = 0 + grace_period_end = 0 + est_rpm = 0 + + while True: + still_connected = False + crank_revs = None + crank_time_ms = None + for conn, svc in zip(cyc_connections, cyc_services): + if conn.connected: + still_connected = True + values = svc.measurement_values + if values is not None: + if values.cumulative_crank_revolutions: + crank_revs = values.cumulative_crank_revolutions + crank_time_ms = values.last_crank_event_time + if not still_connected: + break + + if crank_revs is None: + continue + + if crank_time_ms == last_crank_time_ms: + est_rpm = 0 + else: + # If we were stopped prior to this, jump to MINIMUM_RPM + # it gives a faster restart after paused + if est_rpm == 0 and state == VideoState.PAUSED: + est_rpm = MINIMUM_RPM + else: + delta_revs = delta16(last_crank_revs, crank_revs) + delta_t = delta16(last_crank_time_ms, crank_time_ms) / 1000 + est_rpm = 60 * delta_revs / delta_t + if est_rpm >= MINIMUM_RPM: + grace_period_end = time.monotonic() + GRACE_TIME + if state == VideoState.PAUSED: + send_play() + state = VideoState.PLAYING + elif time.monotonic() > grace_period_end: + if state == VideoState.PLAYING: + send_pause() + state = VideoState.PAUSED + + last_crank_revs = crank_revs + last_crank_time_ms = crank_time_ms + print(f"Crank: {crank_revs}") + print(f"Crank RPM: {est_rpm:.1f}") + + if est_rpm < MINIMUM_RPM: + osd.background = '#ff0000' + elif est_rpm < TARGET_RPM: + osd.background = '#ffff00' + else: + osd.background = '#00ff00' + osd.label = f"RPM: {est_rpm:.1f}" + time.sleep(0.1) diff --git a/Pedal_For_Youtube/youtube_stop_key.user.js b/Pedal_For_Youtube/youtube_stop_key.user.js new file mode 100644 index 000000000..bce01b410 --- /dev/null +++ b/Pedal_For_Youtube/youtube_stop_key.user.js @@ -0,0 +1,42 @@ +// ==UserScript== +// @name Youtube Pause Key +// @version 1 +// @namespace http://unpythonic.net.com/youtube-pause-key +// @include http://youtube.com/* +// @include https://youtube.com/* +// @include http://www.youtube.com/* +// @include https://www.youtube.com/* +// @grant none +// ==/UserScript== + +(function () { + var inject = function() { + var is_press_key = function (e, key) { + document.last_target = e.target; + document.last_event = e; + var keyCode = e.keyCode ? e.keyCode : e.which; + return (! (e.altKey || e.ctrlKey || e.metaKey) + && keyCode == key); + } + + var gr_in_bg_event = function(e) { + var pause_key = 112; // 'p' key + var videos = e.target.getElementsByTagName('video'); + if(videos.length == 0) { return true; } + if (is_press_key(e, pause_key)) { + videos[0].pause(); + e.stopPropagation(); + e.preventDefault(); + return; + } + return true; + } + + document.addEventListener("keypress", gr_in_bg_event, true); + } + + var newel = document.createElement("script"); + var newcontent = document.createTextNode("(" + inject + ")()") + newel.appendChild(newcontent); + document.body.appendChild(newel); +})()