Merge pull request #941 from makermelissa/main

Add PIO implementation of rotaryio for the Pi 5
This commit is contained in:
Jeff Epler 2025-02-18 13:16:40 -06:00 committed by GitHub
commit c23dc24bae
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 368 additions and 124 deletions

24
LICENSES/BSD-3-Clause.txt Normal file
View file

@ -0,0 +1,24 @@
Copyright (c) 2023, Raspberry Pi Ltd.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of the copyright holder nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View file

@ -0,0 +1,192 @@
# SPDX-FileCopyrightText: 2025 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
# SPDX-License-Identifier: BSD-3-Clause
"""
`rotaryio` - Support for reading rotation sensors
===========================================================
See `CircuitPython:rotaryio` in CircuitPython for more details.
Raspberry Pi PIO implementation
* Author(s): Melissa LeBlanc-Williams
"""
from __future__ import annotations
import array
import microcontroller
try:
import adafruit_pioasm
from adafruit_rp1pio import StateMachine
except ImportError as exc:
raise ImportError(
"adafruit_pioasm and adafruit_rp1pio are required for this module"
) from exc
_n_read = 17
_program = adafruit_pioasm.Program(
"""
;
; Copyright (c) 2023 Raspberry Pi (Trading) Ltd.
;
; SPDX-License-Identifier: BSD-3-Clause
;
.pio_version 0 // only requires PIO version 0
.program quadrature_encoder
; the code must be loaded at address 0, because it uses computed jumps
.origin 0
; the code works by running a loop that continuously shifts the 2 phase pins into
; ISR and looks at the lower 4 bits to do a computed jump to an instruction that
; does the proper "do nothing" | "increment" | "decrement" action for that pin
; state change (or no change)
; ISR holds the last state of the 2 pins during most of the code. The Y register
; keeps the current encoder count and is incremented / decremented according to
; the steps sampled
; the program keeps trying to write the current count to the RX FIFO without
; blocking. To read the current count, the user code must drain the FIFO first
; and wait for a fresh sample (takes ~4 SM cycles on average). The worst case
; sampling loop takes 10 cycles, so this program is able to read step rates up
; to sysclk / 10 (e.g., sysclk 125MHz, max step rate = 12.5 Msteps/sec)
; 00 state
jmp update ; read 00
jmp decrement ; read 01
jmp increment ; read 10
jmp update ; read 11
; 01 state
jmp increment ; read 00
jmp update ; read 01
jmp update ; read 10
jmp decrement ; read 11
; 10 state
jmp decrement ; read 00
jmp update ; read 01
jmp update ; read 10
jmp increment ; read 11
; to reduce code size, the last 2 states are implemented in place and become the
; target for the other jumps
; 11 state
jmp update ; read 00
jmp increment ; read 01
decrement:
; note: the target of this instruction must be the next address, so that
; the effect of the instruction does not depend on the value of Y. The
; same is true for the "jmp y--" below. Basically "jmp y--, <next addr>"
; is just a pure "decrement y" instruction, with no other side effects
jmp y--, update ; read 10
; this is where the main loop starts
.wrap_target
update:
mov isr, y ; read 11
push noblock
sample_pins:
; we shift into ISR the last state of the 2 input pins (now in OSR) and
; the new state of the 2 pins, thus producing the 4 bit target for the
; computed jump into the correct action for this state. Both the PUSH
; above and the OUT below zero out the other bits in ISR
out isr, 2
in pins, 2
; save the state in the OSR, so that we can use ISR for other purposes
mov osr, isr
; jump to the correct state machine action
mov pc, isr
; the PIO does not have a increment instruction, so to do that we do a
; negate, decrement, negate sequence
increment:
mov y, ~y
jmp y--, increment_cont
increment_cont:
mov y, ~y
.wrap ; the .wrap here avoids one jump instruction and saves a cycle too
"""
)
_zero_y = adafruit_pioasm.assemble("set y 0")
class IncrementalEncoder:
"""
IncrementalEncoder determines the relative rotational position based on two series of
pulses. It assumes that the encoders common pin(s) are connected to ground,and enables
pull-ups on pin_a and pin_b.
Create an IncrementalEncoder object associated with the given pins. It tracks the
positional state of an incremental rotary encoder (also known as a quadrature encoder.)
Position is relative to the position when the object is constructed.
"""
def __init__(
self, pin_a: microcontroller.Pin, pin_b: microcontroller.Pin, divisor: int = 4
):
"""Create an incremental encoder on pin_a and the next higher pin
Always operates in "x4" mode (one count per quadrature edge)
Assumes but does not check that pin_b is one above pin_a."""
if pin_b is not None and pin_b.id != pin_a.id + 1:
raise ValueError("pin_b must be None or one higher than pin_a")
try:
self._sm = StateMachine(
_program.assembled,
frequency=0,
init=_zero_y,
first_in_pin=pin_a,
in_pin_count=2,
pull_in_pin_up=0x3,
auto_push=True,
push_threshold=32,
in_shift_right=False,
**_program.pio_kwargs,
)
except RuntimeError as e:
if "(error -13)" in e.args[0]:
raise RuntimeError(
"This feature requires a rules file to allow access to PIO. See "
"https://learn.adafruit.com/circuitpython-on-raspberrypi-linux/"
"using-neopixels-on-the-pi-5#updating-permissions-3189429"
) from e
raise
self._buffer = array.array("i", [0] * _n_read)
self.divisor = divisor
self._position = 0
def deinit(self):
"""Deinitializes the IncrementalEncoder and releases any hardware resources for reuse."""
self._sm.deinit()
def __enter__(self) -> IncrementalEncoder:
"""No-op used by Context Managers."""
return self
def __exit__(self, _type, _value, _traceback):
"""
Automatically deinitializes when exiting a context. See
:ref:`lifetime-and-contextmanagers` for more info.
"""
self.deinit()
@property
def position(self):
"""The current position in terms of pulses. The number of pulses per rotation is defined
by the specific hardware and by the divisor."""
self._sm.readinto(self._buffer) # read N stale values + 1 fresh value
raw_position = self._buffer[-1]
delta = int((raw_position - self._position * self.divisor) / self.divisor)
self._position += delta
return self._position

View file

@ -0,0 +1,139 @@
# SPDX-FileCopyrightText: 2025 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
`rotaryio` - Support for reading rotation sensors
===========================================================
See `CircuitPython:rotaryio` in CircuitPython for more details.
Generic Threading/DigitalIO implementation for Linux
* Author(s): Melissa LeBlanc-Williams
"""
from __future__ import annotations
import threading
import microcontroller
import digitalio
# Define the state transition table for the quadrature encoder
transitions = [
0, # 00 -> 00 no movement
-1, # 00 -> 01 3/4 ccw (11 detent) or 1/4 ccw (00 at detent)
+1, # 00 -> 10 3/4 cw or 1/4 cw
0, # 00 -> 11 non-Gray-code transition
+1, # 01 -> 00 2/4 or 4/4 cw
0, # 01 -> 01 no movement
0, # 01 -> 10 non-Gray-code transition
-1, # 01 -> 11 4/4 or 2/4 ccw
-1, # 10 -> 00 2/4 or 4/4 ccw
0, # 10 -> 01 non-Gray-code transition
0, # 10 -> 10 no movement
+1, # 10 -> 11 4/4 or 2/4 cw
0, # 11 -> 00 non-Gray-code transition
+1, # 11 -> 01 1/4 or 3/4 cw
-1, # 11 -> 10 1/4 or 3/4 ccw
0, # 11 -> 11 no movement
]
class IncrementalEncoder:
"""
IncrementalEncoder determines the relative rotational position based on two series of
pulses. It assumes that the encoders common pin(s) are connected to ground,and enables
pull-ups on pin_a and pin_b.
Create an IncrementalEncoder object associated with the given pins. It tracks the
positional state of an incremental rotary encoder (also known as a quadrature encoder.)
Position is relative to the position when the object is constructed.
"""
def __init__(
self, pin_a: microcontroller.Pin, pin_b: microcontroller.Pin, divisor: int = 4
):
"""
Create an IncrementalEncoder object associated with the given pins. It tracks the
positional state of an incremental rotary encoder (also known as a quadrature encoder.)
Position is relative to the position when the object is constructed.
:param microcontroller.Pin pin_a: The first pin connected to the encoder.
:param microcontroller.Pin pin_b: The second pin connected to the encoder.
:param int divisor: The number of pulses per encoder step. Default is 4.
"""
self._pin_a = digitalio.DigitalInOut(pin_a)
self._pin_a.switch_to_input(pull=digitalio.Pull.UP)
self._pin_b = digitalio.DigitalInOut(pin_b)
self._pin_b.switch_to_input(pull=digitalio.Pull.UP)
self._position = 0
self._last_state = 0
self._divisor = divisor
self._sub_count = 0
self._poll_thread = threading.Thread(target=self._polling_loop, daemon=True)
self._poll_thread.start()
def deinit(self):
"""Deinitializes the IncrementalEncoder and releases any hardware resources for reuse."""
self._pin_a.deinit()
self._pin_b.deinit()
if self._poll_thread.is_alive():
self._poll_thread.join()
def __enter__(self) -> IncrementalEncoder:
"""No-op used by Context Managers."""
return self
def __exit__(self, _type, _value, _traceback):
"""
Automatically deinitializes when exiting a context. See
:ref:`lifetime-and-contextmanagers` for more info.
"""
self.deinit()
@property
def divisor(self) -> int:
"""The divisor of the quadrature signal. Use 1 for encoders without detents, or encoders
with 4 detents per cycle. Use 2 for encoders with 2 detents per cycle. Use 4 for encoders
with 1 detent per cycle."""
return self._divisor
@divisor.setter
def divisor(self, value: int):
self._divisor = value
@property
def position(self) -> int:
"""The current position in terms of pulses. The number of pulses per rotation is defined
by the specific hardware and by the divisor."""
return self._position
@position.setter
def position(self, value: int):
self._position = value
def _get_pin_state(self) -> int:
"""Returns the current state of the pins."""
return self._pin_a.value << 1 | self._pin_b.value
def _polling_loop(self):
while True:
self._poll_encoder()
def _poll_encoder(self):
# Check the state of the pins
# if either pin has changed, update the state
new_state = self._get_pin_state()
if new_state != self._last_state:
self._state_update(new_state)
self._last_state = new_state
def _state_update(self, new_state: int):
new_state &= 3
index = self._last_state << 2 | new_state
sub_increment = transitions[index]
self._sub_count += sub_increment
if self._sub_count >= self._divisor:
self._position += 1
self._sub_count = 0
elif self._sub_count <= -self._divisor:
self._position -= 1
self._sub_count = 0

View file

@ -9,129 +9,18 @@ See `CircuitPython:rotaryio` in CircuitPython for more details.
* Author(s): Melissa LeBlanc-Williams
"""
from __future__ import annotations
import threading
import microcontroller
import digitalio
from adafruit_blinka.agnostic import detector
# Define the state transition table for the quadrature encoder
transitions = [
0, # 00 -> 00 no movement
-1, # 00 -> 01 3/4 ccw (11 detent) or 1/4 ccw (00 at detent)
+1, # 00 -> 10 3/4 cw or 1/4 cw
0, # 00 -> 11 non-Gray-code transition
+1, # 01 -> 00 2/4 or 4/4 cw
0, # 01 -> 01 no movement
0, # 01 -> 10 non-Gray-code transition
-1, # 01 -> 11 4/4 or 2/4 ccw
-1, # 10 -> 00 2/4 or 4/4 ccw
0, # 10 -> 01 non-Gray-code transition
0, # 10 -> 10 no movement
+1, # 10 -> 11 4/4 or 2/4 cw
0, # 11 -> 00 non-Gray-code transition
+1, # 11 -> 01 1/4 or 3/4 cw
-1, # 11 -> 10 1/4 or 3/4 ccw
0, # 11 -> 11 no movement
]
# pylint: disable=unused-import
class IncrementalEncoder:
"""
IncrementalEncoder determines the relative rotational position based on two series of
pulses. It assumes that the encoders common pin(s) are connected to ground,and enables
pull-ups on pin_a and pin_b.
Create an IncrementalEncoder object associated with the given pins. It tracks the
positional state of an incremental rotary encoder (also known as a quadrature encoder.)
Position is relative to the position when the object is constructed.
"""
def __init__(
self, pin_a: microcontroller.Pin, pin_b: microcontroller.Pin, divisor: int = 4
):
"""
Create an IncrementalEncoder object associated with the given pins. It tracks the
positional state of an incremental rotary encoder (also known as a quadrature encoder.)
Position is relative to the position when the object is constructed.
:param microcontroller.Pin pin_a: The first pin connected to the encoder.
:param microcontroller.Pin pin_b: The second pin connected to the encoder.
:param int divisor: The number of pulses per encoder step. Default is 4.
"""
self._pin_a = digitalio.DigitalInOut(pin_a)
self._pin_a.switch_to_input(pull=digitalio.Pull.UP)
self._pin_b = digitalio.DigitalInOut(pin_b)
self._pin_b.switch_to_input(pull=digitalio.Pull.UP)
self._position = 0
self._last_state = 0
self._divisor = divisor
self._sub_count = 0
self._poll_thread = threading.Thread(target=self._polling_loop, daemon=True)
self._poll_thread.start()
def deinit(self):
"""Deinitializes the IncrementalEncoder and releases any hardware resources for reuse."""
self._pin_a.deinit()
self._pin_b.deinit()
if self._poll_thread.is_alive():
self._poll_thread.join()
def __enter__(self) -> IncrementalEncoder:
"""No-op used by Context Managers."""
return self
def __exit__(self, _type, _value, _traceback):
"""
Automatically deinitializes when exiting a context. See
:ref:`lifetime-and-contextmanagers` for more info.
"""
self.deinit()
@property
def divisor(self) -> int:
"""The divisor of the quadrature signal. Use 1 for encoders without detents, or encoders
with 4 detents per cycle. Use 2 for encoders with 2 detents per cycle. Use 4 for encoders
with 1 detent per cycle."""
return self._divisor
@divisor.setter
def divisor(self, value: int):
self._divisor = value
@property
def position(self) -> int:
"""The current position in terms of pulses. The number of pulses per rotation is defined
by the specific hardware and by the divisor."""
return self._position
@position.setter
def position(self, value: int):
self._position = value
def _get_pin_state(self) -> int:
"""Returns the current state of the pins."""
return self._pin_a.value << 1 | self._pin_b.value
def _polling_loop(self):
while True:
self._poll_encoder()
def _poll_encoder(self):
# Check the state of the pins
# if either pin has changed, update the state
new_state = self._get_pin_state()
if new_state != self._last_state:
self._state_update(new_state)
self._last_state = new_state
def _state_update(self, new_state: int):
new_state &= 3
index = self._last_state << 2 | new_state
sub_increment = transitions[index]
self._sub_count += sub_increment
if self._sub_count >= self._divisor:
self._position += 1
self._sub_count = 0
elif self._sub_count <= -self._divisor:
self._position -= 1
self._sub_count = 0
# Import any board specific modules here
if detector.board.any_raspberry_pi_5_board:
from adafruit_blinka.microcontroller.bcm283x.rotaryio import IncrementalEncoder
elif detector.board.any_embedded_linux:
# fall back to the generic linux implementation
from adafruit_blinka.microcontroller.generic_linux.rotaryio import (
IncrementalEncoder,
)
else:
# For non-Linux Boards, threading likely will work in the same way
raise NotImplementedError("Board not supported")