Merge pull request #1961 from dhalbert/asyncio-1

move Cooperative Multitasking code to repo
This commit is contained in:
Anne Barela 2022-01-06 12:32:41 -05:00 committed by GitHub
commit a8cd5c8d6a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 357 additions and 0 deletions

View file

@ -0,0 +1,70 @@
# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries
#
# SPDX-License-Identifier: MIT
import asyncio
import board
import keypad
import neopixel
from rainbowio import colorwheel
pixel_pin = board.A0
num_pixels = 24
pixels = neopixel.NeoPixel(pixel_pin, num_pixels, brightness=0.03, auto_write=False)
class Controls:
def __init__(self):
self.reverse = False
self.wait = 0.0
async def rainbow_cycle(controls):
while True:
# Increment by 2 instead of 1 to speed the cycle up a bit.
for j in range(255, -1, -2) if controls.reverse else range(0, 256, 2):
for i in range(num_pixels):
rc_index = (i * 256 // num_pixels) + j
pixels[i] = colorwheel(rc_index & 255)
pixels.show()
await asyncio.sleep(controls.wait)
async def monitor_buttons(reverse_pin, slower_pin, faster_pin, controls):
"""Monitor buttons that reverse direction and change animation speed.
Assume buttons are active low.
"""
with keypad.Keys(
(reverse_pin, slower_pin, faster_pin), value_when_pressed=False, pull=True
) as keys:
while True:
key_event = keys.events.get()
if key_event and key_event.pressed:
key_number = key_event.key_number
if key_number == 0:
controls.reverse = not controls.reverse
elif key_number == 1:
# Lengthen the interval.
controls.wait = controls.wait + 0.001
elif key_number == 2:
# Shorten the interval.
controls.wait = max(0.0, controls.wait - 0.001)
# Let another task run.
await asyncio.sleep(0)
async def main():
controls = Controls()
buttons_task = asyncio.create_task(
monitor_buttons(board.A1, board.A2, board.A3, controls)
)
animation_task = asyncio.create_task(rainbow_cycle(controls))
# This will run forever, because no tasks ever finish.
await asyncio.gather(buttons_task, animation_task)
asyncio.run(main())

View file

@ -0,0 +1,26 @@
# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries
#
# SPDX-License-Identifier: MIT
import time
import board
import digitalio
def blink(pin, interval, count):
with digitalio.DigitalInOut(pin) as led:
led.switch_to_output(value=False)
for _ in range(count):
led.value = True
time.sleep(interval)
led.value = False
time.sleep(interval)
def main():
blink(board.D1, 0.25, 10)
print("done")
main()

View file

@ -0,0 +1,29 @@
# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries
#
# SPDX-License-Identifier: MIT
# DOESN'T WORK
import time
import board
import digitalio
def blink(pin, interval, count):
with digitalio.DigitalInOut(pin) as led:
led.switch_to_output(value=False)
for _ in range(count):
led.value = True
time.sleep(interval)
led.value = False
time.sleep(interval)
def main():
blink(board.D1, 0.25, 10)
# DOESN'T WORK
# Second LED blinks only after the first one is finished.
blink(board.D2, 0.1, 20)
main()

View file

@ -0,0 +1,48 @@
# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries
#
# SPDX-License-Identifier: MIT
import time
import board
import digitalio
class Blinker:
def __init__(self, led, interval, count):
self.led = led
self.interval = interval
# Count both on and off.
self.count2 = count * 2
self.last_transition = 0
def blink(self):
"""Return False when blinking is finished."""
if self.count2 <= 0:
return False
now = time.monotonic()
if now > self.last_transition + self.interval:
self.led.value = not self.led.value
self.last_transition = now
self.count2 -= 1
return True
def main():
with digitalio.DigitalInOut(board.D1) as led1, digitalio.DigitalInOut(
board.D2
) as led2:
led1.switch_to_output(value=False)
led2.switch_to_output(value=False)
blinker1 = Blinker(led1, 0.25, 10)
blinker2 = Blinker(led2, 0.1, 20)
running1 = True
running2 = True
while running1 or running2:
running1 = blinker1.blink()
running2 = blinker2.blink()
print("done")
main()

View file

@ -0,0 +1,63 @@
# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries
#
# SPDX-License-Identifier: MIT
import asyncio
import board
import digitalio
import keypad
class Interval:
"""Simple class to hold an interval value. Use .value to to read or write."""
def __init__(self, initial_interval):
self.value = initial_interval
async def monitor_interval_buttons(pin_slower, pin_faster, interval):
"""Monitor two buttons: one lengthens the interval, the other shortens it.
Change interval.value as appropriate.
"""
# Assume buttons are active low.
with keypad.Keys(
(pin_slower, pin_faster), value_when_pressed=False, pull=True
) as keys:
while True:
key_event = keys.events.get()
if key_event and key_event.pressed:
if key_event.key_number == 0:
# Lengthen the interval.
interval.value += 0.1
else:
# Shorten the interval.
interval.value = max(0.1, interval.value - 0.1)
print("interval is now", interval.value)
# Let another task run.
await asyncio.sleep(0)
async def blink(pin, interval):
"""Blink the given pin forever.
The blinking rate is controlled by the supplied Interval object.
"""
with digitalio.DigitalInOut(pin) as led:
led.switch_to_output()
while True:
led.value = not led.value
await asyncio.sleep(interval.value)
async def main():
# Start blinking 0.5 sec on, 0.5 sec off.
interval = Interval(0.5)
led_task = asyncio.create_task(blink(board.D1, interval))
interval_task = asyncio.create_task(
monitor_interval_buttons(board.D3, board.D4, interval)
)
# This will run forever, because neither task ever exits.
await asyncio.gather(led_task, interval_task)
asyncio.run(main())

View file

@ -0,0 +1,26 @@
# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries
#
# SPDX-License-Identifier: MIT
import asyncio
import board
import digitalio
async def blink(pin, interval, count): # Don't forget the async!
with digitalio.DigitalInOut(pin) as led:
led.switch_to_output(value=False)
for _ in range(count):
led.value = True
await asyncio.sleep(interval) # Don't forget the await!
led.value = False
await asyncio.sleep(interval) # Don't forget the await!
async def main(): # Don't forget the async!
led_task = asyncio.create_task(blink(board.D1, 0.25, 10))
await asyncio.gather(led_task) # Don't forget the await!
print("done")
asyncio.run(main())

View file

@ -0,0 +1,67 @@
# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries
#
# SPDX-License-Identifier: MIT
import asyncio
import board
import digitalio
import keypad
class Interval:
"""Simple class to hold an interval value. Use .value to to read or write."""
def __init__(self, initial_interval):
self.value = initial_interval
async def monitor_interval_buttons(pin_slower, pin_faster, interval):
"""Monitor two buttons: one lengthens the interval, the other shortens it.
Change interval.value as appropriate.
"""
# Assume buttons are active low.
with keypad.Keys(
(pin_slower, pin_faster), value_when_pressed=False, pull=True
) as keys:
while True:
key_event = keys.events.get()
if key_event and key_event.pressed:
if key_event.key_number == 0:
# Lengthen the interval.
interval.value += 0.1
else:
# Shorten the interval.
interval.value = max(0.1, interval.value - 0.1)
print("interval is now", interval.value)
# Let another task run.
await asyncio.sleep(0)
async def blink(pin, interval):
"""Blink the given pin forever.
The blinking rate is controlled by the supplied Interval object.
"""
with digitalio.DigitalInOut(pin) as led:
led.switch_to_output()
while True:
led.value = not led.value
await asyncio.sleep(interval.value)
async def main():
interval1 = Interval(0.5)
interval2 = Interval(1.0)
led1_task = asyncio.create_task(blink(board.D1, interval1))
led2_task = asyncio.create_task(blink(board.D2, interval2))
interval1_task = asyncio.create_task(
monitor_interval_buttons(board.D3, board.D4, interval1)
)
interval2_task = asyncio.create_task(
monitor_interval_buttons(board.D5, board.D6, interval2)
)
await asyncio.gather(led1_task, led2_task, interval1_task, interval2_task)
asyncio.run(main())

View file

@ -0,0 +1,28 @@
# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries
#
# SPDX-License-Identifier: MIT
import asyncio
import board
import digitalio
async def blink(pin, interval, count):
with digitalio.DigitalInOut(pin) as led:
led.switch_to_output(value=False)
for _ in range(count):
led.value = True
await asyncio.sleep(interval) # Don't forget the "await"!
led.value = False
await asyncio.sleep(interval) # Don't forget the "await"!
async def main():
led1_task = asyncio.create_task(blink(board.D1, 0.25, 10))
led2_task = asyncio.create_task(blink(board.D2, 0.1, 20))
await asyncio.gather(led1_task, led2_task) # Don't forget "await"!
print("done")
asyncio.run(main())