Refactor cycle completion notification system

This commit is contained in:
Roy Hooper 2020-05-13 14:04:01 -04:00
parent b8fe94a4d3
commit d66c9992d0
2 changed files with 307 additions and 326 deletions

View file

@ -48,22 +48,25 @@ import random
from math import ceil from math import ceil
import adafruit_led_animation.helper import adafruit_led_animation.helper
from . import NANOS_PER_SECOND, monotonic_ns from . import NANOS_PER_SECOND, monotonic_ns
from .color import BLACK, WHITE, RAINBOW, colorwheel from .color import BLACK, RAINBOW, colorwheel
__version__ = "0.0.0-auto.0" __version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_LED_Animation.git" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_LED_Animation.git"
class Animation: class Animation:
# pylint: disable=too-many-instance-attributes
""" """
Base class for animations. Base class for animations.
""" """
CYCLE_NOTIFICATIONS_SUPPORTED = False
# pylint: disable=too-many-arguments,too-many-instance-attributes # pylint: disable=too-many-arguments
def __init__(self, pixel_object, speed, color, peers=None, paused=False, name=None): def __init__(self, pixel_object, speed, color, peers=None, paused=False, name=None):
self.pixel_object = pixel_object self.pixel_object = pixel_object
self.pixel_object.auto_write = False self.pixel_object.auto_write = False
self.peers = peers if peers else [] self.peers = peers if peers else []
"""A sequence of animations to trigger .draw() on when this animation draws."""
self._speed_ns = 0 self._speed_ns = 0
self._color = None self._color = None
self._paused = paused self._paused = paused
@ -71,9 +74,11 @@ class Animation:
self._time_left_at_pause = 0 self._time_left_at_pause = 0
self.speed = speed # sets _speed_ns self.speed = speed # sets _speed_ns
self.color = color # Triggers _recompute_color self.color = color # Triggers _recompute_color
self.done_cycle_handler = None
self.name = name self.name = name
self.counter = 0 self.draw_count = 0
"""Number of animation frames drawn."""
self.cycle_count = 0
"""Number of animation cycles completed."""
def __str__(self): def __str__(self):
return "<Animation %s: %s>" % (self.__class__.__name__, self.name) return "<Animation %s: %s>" % (self.__class__.__name__, self.name)
@ -93,8 +98,9 @@ class Animation:
return False return False
self.draw() self.draw()
self.counter += 1 self.draw_count += 1
# Draw related animations together
if self.peers: if self.peers:
for peer in self.peers: for peer in self.peers:
peer.draw() peer.draw()
@ -172,13 +178,13 @@ class Animation:
Override as needed. Override as needed.
""" """
def _cycle_done(self): def cycle_complete(self):
""" """
Called by some animations when they complete an animation cycle. Called by some animations when they complete an animation cycle.
Calls done_cycle_handler if one is set. Animations that support cycle complete notifications will have X property set to False.
Override as needed.
""" """
if self.done_cycle_handler: self.cycle_count += 1
self.done_cycle_handler(self) # pylint: disable=not-callable
def reset(self): def reset(self):
""" """
@ -195,12 +201,13 @@ class ColorCycle(Animation):
:param colors: A list of colors to cycle through in ``(r, g, b)`` tuple, or ``0x000000`` hex :param colors: A list of colors to cycle through in ``(r, g, b)`` tuple, or ``0x000000`` hex
format. Defaults to a rainbow color cycle. format. Defaults to a rainbow color cycle.
""" """
def __init__(self, pixel_object, speed, colors=RAINBOW, name=None): def __init__(self, pixel_object, speed, colors=RAINBOW, name=None):
self.colors = colors self.colors = colors
super(ColorCycle, self).__init__(pixel_object, speed, colors[0], name=name) super(ColorCycle, self).__init__(pixel_object, speed, colors[0], name=name)
self._generator = self._color_generator() self._generator = self._color_generator()
cycle_complete_supported = True
def draw(self): def draw(self):
next(self._generator) next(self._generator)
self.pixel_object.fill(self.color) self.pixel_object.fill(self.color)
@ -213,7 +220,7 @@ class ColorCycle(Animation):
yield yield
index = (index + 1) % len(self.colors) index = (index + 1) % len(self.colors)
if index == len(self.colors): if index == len(self.colors):
self._cycle_done() self.cycle_complete()
def reset(self): def reset(self):
""" """
@ -252,9 +259,6 @@ class Solid(ColorCycle):
def _recompute_color(self, color): def _recompute_color(self, color):
self.colors = [color] self.colors = [color]
def _cycle_done(self):
pass
class Comet(Animation): class Comet(Animation):
""" """
@ -293,6 +297,8 @@ class Comet(Animation):
self._generator = self._comet_generator() self._generator = self._comet_generator()
super(Comet, self).__init__(pixel_object, speed, color, name=name) super(Comet, self).__init__(pixel_object, speed, color, name=name)
cycle_complete_supported = True
def _recompute_color(self, color): def _recompute_color(self, color):
pass pass
@ -338,7 +344,7 @@ class Comet(Animation):
if self.bounce: if self.bounce:
self.reverse = not self.reverse self.reverse = not self.reverse
if not self.bounce or cycle_passes == 2: if not self.bounce or cycle_passes == 2:
self._cycle_done() self.cycle_complete()
cycle_passes = 0 cycle_passes = 0
def draw(self): def draw(self):
@ -453,6 +459,8 @@ class Pulse(Animation):
self._generator = None self._generator = None
self.reset() self.reset()
cycle_complete_supported = True
def draw(self): def draw(self):
color = next(self._generator) color = next(self._generator)
self.fill(color) self.fill(color)
@ -485,6 +493,8 @@ class Rainbow(Animation):
self._period = period self._period = period
self._generator = self._color_wheel_generator() self._generator = self._color_wheel_generator()
cycle_complete_supported = True
def _color_wheel_generator(self): def _color_wheel_generator(self):
period = int(self._period * NANOS_PER_SECOND) period = int(self._period * NANOS_PER_SECOND)
@ -497,7 +507,7 @@ class Rainbow(Animation):
last_update = now last_update = now
pos = cycle_position = (cycle_position + time_since_last_draw) % period pos = cycle_position = (cycle_position + time_since_last_draw) % period
if pos < last_pos: if pos < last_pos:
self._cycle_done() self.cycle_complete()
last_pos = pos last_pos = pos
wheel_index = int((pos / period) * 256) wheel_index = int((pos / period) * 256)
self.pixel_object[:] = [ self.pixel_object[:] = [
@ -612,6 +622,8 @@ class Chase(Animation):
super(Chase, self).__init__(pixel_object, speed, color, name=name) super(Chase, self).__init__(pixel_object, speed, color, name=name)
cycle_complete_supported = True
@property @property
def reverse(self): def reverse(self):
""" """
@ -645,7 +657,7 @@ class Chase(Animation):
self.pixel_object[:] = [next(colorgen) for _ in self.pixel_object] self.pixel_object[:] = [next(colorgen) for _ in self.pixel_object]
if self._offset == 0: if self._offset == 0:
self._cycle_done() self.cycle_complete()
self._offset = (self._offset + self._direction) % self._repeat_width self._offset = (self._offset + self._direction) % self._repeat_width
def bar_color(self, n, pixel_no=0): # pylint: disable=unused-argument def bar_color(self, n, pixel_no=0): # pylint: disable=unused-argument
@ -696,311 +708,6 @@ class RainbowChase(Chase):
def bar_color(self, n, pixel_no=0): def bar_color(self, n, pixel_no=0):
return self._colors[self._color_idx - n] return self._colors[self._color_idx - n]
def _cycle_done(self): def cycle_complete(self):
self._color_idx = (self._color_idx + self._direction) % len(self._colors) self._color_idx = (self._color_idx + self._direction) % len(self._colors)
super(RainbowChase, self)._cycle_done() super(RainbowChase, self).cycle_complete()
class AnimationSequence:
"""
A sequence of Animations to run in succession, looping forever.
Advances manually or at the specified interval.
:param members: The animation objects or groups.
:param int advance_interval: Time in seconds between animations if cycling
automatically. Defaults to ``None``.
:param bool auto_clear: Clear the pixels between animations. If ``True``, the current animation
will be cleared from the pixels before the next one starts.
Defaults to ``False``.
:param bool random_order: Activate the animations in a random order. Defaults to ``False``.
:param bool auto_reset:
.. code-block:: python
import board
import neopixel
from adafruit_led_animation.animation import AnimationSequence, Blink, Comet, Sparkle
import adafruit_led_animation.color as color
strip_pixels = neopixel.NeoPixel(board.A1, 30, brightness=1, auto_write=False)
blink = Blink(strip_pixels, 0.2, color.RED)
comet = Comet(strip_pixels, 0.1, color.BLUE)
sparkle = Sparkle(strip_pixels, 0.05, color.GREEN)
animations = AnimationSequence(blink, comet, sparkle, advance_interval=1)
while True:
animations.animate()
"""
# pylint: disable=too-many-instance-attributes
def __init__(self, *members, advance_interval=None, auto_clear=False, random_order=False,
auto_reset=False):
self._members = members
self._advance_interval = (
advance_interval * NANOS_PER_SECOND if advance_interval else None
)
self._last_advance = monotonic_ns()
self._current = 0
self._auto_clear = auto_clear
self._auto_reset = auto_reset
self.clear_color = BLACK
self._paused = False
self._paused_at = 0
self._random = random_order
if random_order:
self._current = random.randint(0, len(self._members) - 1)
self._color = None
self.done_cycle_handler = None
for item in self._members:
item.done_cycle_handler = self.done_handler
def done_handler(self, animation):
"""
Called when an animation sequence is done.
"""
def _auto_advance(self):
if not self._advance_interval:
return
now = monotonic_ns()
if now - self._last_advance > self._advance_interval:
self._last_advance = now
if self._random:
self.random()
else:
self.next()
def activate(self, index):
"""
Activates a specific animation.
"""
if isinstance(index, str):
self._current = [member.name for member in self._members].index(index)
else:
self._current = index
if self._auto_clear:
self.fill(self.clear_color)
if self._color:
self.current_animation.color = self._color
def next(self):
"""
Jump to the next animation.
"""
current = self._current
self.activate((self._current + 1) % len(self._members))
if self._auto_reset:
self.current_animation.reset()
if current > self._current:
self._cycle_done()
def random(self):
"""
Jump to a random animation.
"""
self.activate(random.randint(0, len(self._members) - 1))
def animate(self):
"""
Call animate() from your code's main loop. It will draw the current animation
or go to the next animation based on the advance_interval if set.
:return: True if the animation draw cycle was triggered, otherwise False.
"""
if not self._paused:
self._auto_advance()
return self.current_animation.animate()
@property
def current_animation(self):
"""
Returns the current animation in the sequence.
"""
return self._members[self._current]
@property
def color(self):
"""
Use this property to change the color of all animations in the sequence.
"""
return self._color
@color.setter
def color(self, color):
self._color = color
self.current_animation.color = color
def fill(self, color):
"""
Fills the current animation with a color.
"""
self.current_animation.fill(color)
def freeze(self):
"""
Freeze the current animation in the sequence.
Also stops auto_advance.
"""
if self._paused:
return
self._paused = True
self._paused_at = monotonic_ns()
self.current_animation.freeze()
def resume(self):
"""
Resume the current animation in the sequence, and resumes auto advance if enabled.
"""
if not self._paused:
return
self._paused = False
now = monotonic_ns()
self._last_advance += now - self._paused_at
self._paused_at = 0
self.current_animation.resume()
def animation_done(self, animation):
"""
Called by some animations when they finish a sequence.
"""
def _cycle_done(self):
"""
Called when the (first) member animation cycles.
Calls done_cycle_handler if one is set.
"""
if self.done_cycle_handler:
self.done_cycle_handler(self) # pylint: disable=not-callable
def reset(self):
"""
Resets the current animation.
"""
self.current_animation.reset()
class NotifiedAnimationSequence(AnimationSequence):
"""Prints the current animation type (e.g. ``RainbowComet``, ``Chase``) and ``name`` (e.g. the
string from ``name=`` in the animation setup). Use for debugging when running multiple versions
of the same animation, or simply to print the names to the serial console. Used in the same
manner as ``AnimationSequence`` which is a sequence of Animations to run in succession, looping
forever. Advances manually or at the specified interval.
:param members: The animation objects or groups.
:param int advance_interval: Time in seconds between animations if cycling
automatically. Defaults to ``None``.
:param random_order: Activate the animations in a random order. Defaults to ``False``.
.. code-block:: python
import board
import neopixel
from adafruit_led_animation.animation import NotifiedAnimationSequence, Blink, Comet
import adafruit_led_animation.color as color
strip_pixels = neopixel.NeoPixel(board.A1, 30, brightness=1, auto_write=False)
blink = Blink(strip_pixels, 0.2, color.RED, name="red-blink")
comet = Comet(strip_pixels, 0.1, color.BLUE, name="blue-comet")
animations = NotifiedAnimationSequence(blink, comet, advance_interval=1)
while True:
animations.animate()
"""
def activate(self, index):
super(NotifiedAnimationSequence, self).activate(index)
print("Activating:", self.current_animation)
class AnimationGroup:
"""
A group of animations that are active together. An example would be grouping a strip of
pixels connected to a board and the onboard LED.
:param members: The animation objects or groups.
:param bool sync: Synchronises the timing of all members of the group to the settings of the
first member of the group. Defaults to ``False``.
"""
def __init__(self, *members, sync=False):
self._members = members
self._sync = sync
if sync:
main = members[0]
main.peers = members[1:]
# Register the done handler on the last animation.
self.done_cycle_handler = None
if not self._members:
return
self._members[-1].done_cycle_handler = self.done_handler
def animate(self):
"""
Call animate() from your code's main loop. It will draw all of the animations
in the group.
:return: True if any animation draw cycle was triggered, otherwise False.
"""
if self._sync:
return self._members[0].animate()
return any([item.animate() for item in self._members])
@property
def color(self):
"""
Use this property to change the color of all members of the animation group.
"""
return None
@color.setter
def color(self, color):
for item in self._members:
item.color = color
def fill(self, color):
"""
Fills all pixel objects in the group with a color.
"""
for item in self._members:
item.fill(color)
def freeze(self):
"""
Freeze all animations in the group.
"""
for item in self._members:
item.freeze()
def resume(self):
"""
Resume all animations in the group.
"""
for item in self._members:
item.resume()
def done_handler(self, animation): # pylint: disable=unused-argument
"""
Called by some animations when they complete a cycle. For an AnimationGroup this is the
first member of the group, if any.
"""
self._cycle_done()
def _cycle_done(self):
"""
Called when the (first) member animation cycles.
Calls done_cycle_handler if one is set.
"""
if self.done_cycle_handler:
self.done_cycle_handler(self) # pylint: disable=not-callable
def reset(self):
"""
Resets the animations in the group.
"""
for item in self._members:
item.reset()

View file

@ -44,7 +44,9 @@ Implementation Notes
""" """
import math import math
import random
from . import NANOS_PER_SECOND, monotonic_ns from . import NANOS_PER_SECOND, monotonic_ns
from .color import BLACK
class PixelMap: class PixelMap:
@ -271,8 +273,7 @@ def pulse_generator(period: float, animation_object, white=False):
last_update = now last_update = now
pos = cycle_position = (cycle_position + time_since_last_draw) % period pos = cycle_position = (cycle_position + time_since_last_draw) % period
if pos < last_pos: if pos < last_pos:
if animation_object.done_cycle_handler: animation_object.cycle_complete()
animation_object.done_cycle_handler(animation_object)
last_pos = pos last_pos = pos
if pos > half_period: if pos > half_period:
pos = period - pos pos = period - pos
@ -283,3 +284,276 @@ def pulse_generator(period: float, animation_object, white=False):
fill_color[1] = int(fill_color[1] * intensity) fill_color[1] = int(fill_color[1] * intensity)
fill_color[2] = int(fill_color[2] * intensity) fill_color[2] = int(fill_color[2] * intensity)
yield fill_color yield fill_color
def _wrap_cycle_complete(obj, animation):
if not animation.cycle_complete_supported:
raise NotImplementedError(animation + " does not support cycle_complete")
method = animation.cycle_complete
def wrapper():
method()
obj.cycle_complete()
return wrapper
class AnimationGroup:
"""
A group of animations that are active together. An example would be grouping a strip of
pixels connected to a board and the onboard LED.
:param members: The animation objects or groups.
:param bool sync: Synchronises the timing of all members of the group to the settings of the
first member of the group. Defaults to ``False``.
"""
def __init__(self, *members, sync=False):
if not members:
raise ValueError("At least one member required in an AnimationGroup")
self._members = members
self._sync = sync
if sync:
main = members[0]
main.peers = members[1:]
# Catch cycle_complete on the last animation.
self._members[-1].animation_cycle_done = _wrap_cycle_complete(self, self._members[-1])
self.cycle_complete_supported = self._members[-1].cycle_complete_supported
def cycle_complete(self):
"""
Called when the animation group is done the last animation in the sequence.
"""
def animate(self):
"""
Call animate() from your code's main loop. It will draw all of the animations
in the group.
:return: True if any animation draw cycle was triggered, otherwise False.
"""
if self._sync:
return self._members[0].animate()
return any([item.animate() for item in self._members])
@property
def color(self):
"""
Use this property to change the color of all members of the animation group.
"""
return None
@color.setter
def color(self, color):
for item in self._members:
item.color = color
def fill(self, color):
"""
Fills all pixel objects in the group with a color.
"""
for item in self._members:
item.fill(color)
def freeze(self):
"""
Freeze all animations in the group.
"""
for item in self._members:
item.freeze()
def resume(self):
"""
Resume all animations in the group.
"""
for item in self._members:
item.resume()
def reset(self):
"""
Resets the animations in the group.
"""
for item in self._members:
item.reset()
class AnimationSequence:
"""
A sequence of Animations to run in succession, looping forever.
Advances manually, or at the specified interval.
:param members: The animation objects or groups.
:param int advance_interval: Time in seconds between animations if cycling
automatically. Defaults to ``None``.
:param bool auto_clear: Clear the pixels between animations. If ``True``, the current animation
will be cleared from the pixels before the next one starts.
Defaults to ``False``.
:param bool random_order: Activate the animations in a random order. Defaults to ``False``.
:param bool auto_reset: Automatically call reset() on animations when changing animations.
:param bool advance_on_cycle_complete: Automatically advance when `cycle_complete` is triggered
on member animations. All Animations must support
cycle_complete to use this.
.. code-block:: python
import board
import neopixel
from adafruit_led_animation.animation import AnimationSequence, Blink, Comet, Sparkle
import adafruit_led_animation.color as color
strip_pixels = neopixel.NeoPixel(board.A1, 30, brightness=1, auto_write=False)
blink = Blink(strip_pixels, 0.2, color.RED)
comet = Comet(strip_pixels, 0.1, color.BLUE)
sparkle = Sparkle(strip_pixels, 0.05, color.GREEN)
animations = AnimationSequence(blink, comet, sparkle, advance_interval=1)
while True:
animations.animate()
"""
# pylint: disable=too-many-instance-attributes
def __init__(self, *members, advance_interval=None, auto_clear=False, random_order=False,
auto_reset=False, advance_on_cycle_complete=False):
self._members = members
self._advance_interval = (
advance_interval * NANOS_PER_SECOND if advance_interval else None
)
self._last_advance = monotonic_ns()
self._current = 0
self.auto_clear = auto_clear
self.auto_reset = auto_reset
self.advance_on_cycle_complete = advance_on_cycle_complete
self.clear_color = BLACK
self._paused = False
self._paused_at = 0
self._random = random_order
if random_order:
self._current = random.randint(0, len(self._members) - 1)
self._color = None
for item in self._members:
item.animation_cycle_done = _wrap_cycle_complete(self, item)
cycle_complete_supported = True
def cycle_complete(self):
"""
Called when the animation sequence is done all animations in the sequence.
Advances the animation if advance_on_cycle_complete is True.
"""
if self.advance_on_cycle_complete:
self._advance()
def _auto_advance(self):
if not self._advance_interval:
return
now = monotonic_ns()
if now - self._last_advance > self._advance_interval:
self._last_advance = now
self._advance()
def _advance(self):
if self._random:
self.random()
else:
self.next()
def activate(self, index):
"""
Activates a specific animation.
"""
if isinstance(index, str):
self._current = [member.name for member in self._members].index(index)
else:
self._current = index
if self.auto_clear:
self.fill(self.clear_color)
if self._color:
self.current_animation.color = self._color
def next(self):
"""
Jump to the next animation.
"""
current = self._current
self.activate((self._current + 1) % len(self._members))
if self.auto_reset:
self.current_animation.reset()
if current > self._current:
self.cycle_complete()
def random(self):
"""
Jump to a random animation.
"""
self.activate(random.randint(0, len(self._members) - 1))
def animate(self):
"""
Call animate() from your code's main loop. It will draw the current animation
or go to the next animation based on the advance_interval if set.
:return: True if the animation draw cycle was triggered, otherwise False.
"""
if not self._paused:
self._auto_advance()
return self.current_animation.animate()
@property
def current_animation(self):
"""
Returns the current animation in the sequence.
"""
return self._members[self._current]
@property
def color(self):
"""
Use this property to change the color of all animations in the sequence.
"""
return self._color
@color.setter
def color(self, color):
self._color = color
self.current_animation.color = color
def fill(self, color):
"""
Fills the current animation with a color.
"""
self.current_animation.fill(color)
def freeze(self):
"""
Freeze the current animation in the sequence.
Also stops auto_advance.
"""
if self._paused:
return
self._paused = True
self._paused_at = monotonic_ns()
self.current_animation.freeze()
def resume(self):
"""
Resume the current animation in the sequence, and resumes auto advance if enabled.
"""
if not self._paused:
return
self._paused = False
now = monotonic_ns()
self._last_advance += now - self._paused_at
self._paused_at = 0
self.current_animation.resume()
def reset(self):
"""
Resets the current animation.
"""
self.current_animation.reset()