esp32/modules/machine.py: Add Counter and Encoder classes.
Adds a Python override of the `machine` module, which delegates to the built-in module and adds an implementation of `Counter` and `Encoder`, based on the `esp32.PCNT` class. Original implementation by: Jonathan Hogg <me@jonathanhogg.com> Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
This commit is contained in:
parent
e54553c496
commit
327655905e
1 changed files with 192 additions and 0 deletions
192
ports/esp32/modules/machine.py
Normal file
192
ports/esp32/modules/machine.py
Normal file
|
|
@ -0,0 +1,192 @@
|
|||
import sys
|
||||
|
||||
_path = sys.path
|
||||
sys.path = ()
|
||||
try:
|
||||
import machine as _machine
|
||||
finally:
|
||||
sys.path = _path
|
||||
del _path
|
||||
del sys
|
||||
|
||||
|
||||
from micropython import const
|
||||
import esp32
|
||||
|
||||
if hasattr(esp32, "PCNT"):
|
||||
_PCNT_RANGE = const(32000)
|
||||
|
||||
class _CounterBase:
|
||||
_PCNT = esp32.PCNT
|
||||
# Singletons, keyed by PCNT unit_id (shared by both Counter & Encoder).
|
||||
_INSTANCES = {}
|
||||
|
||||
# Use __new__ to implement a singleton rather than a factory function,
|
||||
# because we need to be able to provide class attributes, e.g.
|
||||
# Counter.RISING, which is not possible if Counter was a function
|
||||
# (functions cannot have attributes in MicroPython).
|
||||
def __new__(cls, unit_id, *_args, **_kwargs):
|
||||
# Find an existing instance for this PCNT unit id.
|
||||
self = cls._INSTANCES.get(unit_id)
|
||||
|
||||
if self:
|
||||
# Verify that this PCNT is being used for the same type
|
||||
# (Encoder or Counter).
|
||||
if not isinstance(self, cls):
|
||||
raise ValueError("PCNT in use")
|
||||
else:
|
||||
# Previously unused PCNT unit.
|
||||
self = object.__new__(cls)
|
||||
cls._INSTANCES[unit_id] = self
|
||||
|
||||
# __init__ will now be called with the same args.
|
||||
return self
|
||||
|
||||
def __init__(self, unit_id, *args, filter_ns=0, **kwargs):
|
||||
self._unit_id = unit_id
|
||||
|
||||
if not hasattr(self, "_pcnt"):
|
||||
# New instance, or previously deinit-ed.
|
||||
self._pcnt = self._PCNT(unit_id, min=-_PCNT_RANGE, max=_PCNT_RANGE)
|
||||
elif not (args or kwargs):
|
||||
# Existing instance, and no args, so accessing the existing
|
||||
# singleton without reconfiguring. Note: This means that
|
||||
# Counter/Encoder cannot be partially re-initalised. Either
|
||||
# you get the existing instance as-is (by passing no arguments
|
||||
# other than the id), or you must pass all the necessary
|
||||
# arguments to additionally re-configure it.
|
||||
return
|
||||
|
||||
# Counter- or Encoder-specific configuration of self._pcnt.
|
||||
self._configure(*args, **kwargs)
|
||||
|
||||
# Common unit configuration.
|
||||
self._pcnt.init(
|
||||
filter=min(max(0, filter_ns * 80 // 1000), 1023),
|
||||
value=0,
|
||||
)
|
||||
|
||||
# Note: We track number-of-overflows rather than the actual count in
|
||||
# order to avoid the IRQ handler overflowing MicroPython's "small int"
|
||||
# range. This gives an effective range of 2**30 overflows. User code
|
||||
# should use counter.value(0) to reset the overflow count.
|
||||
# The ESP32 PCNT resets to zero on under/overflow (i.e. it does not wrap
|
||||
# around to the opposite limit), so each overflow corresponds to exactly
|
||||
# _PCNT_RANGE counts.
|
||||
|
||||
# Reset counter state.
|
||||
self._overflows = 0
|
||||
self._offset = 0
|
||||
|
||||
# Install IRQ handler to handle under/overflow.
|
||||
self._pcnt.irq(self._overflow, self._PCNT.IRQ_MIN | self._PCNT.IRQ_MAX)
|
||||
|
||||
# Start counting.
|
||||
self._pcnt.start()
|
||||
|
||||
# Handle counter under/overflow.
|
||||
def _overflow(self, pcnt):
|
||||
mask = pcnt.irq().flags()
|
||||
if mask & self._PCNT.IRQ_MIN:
|
||||
self._overflows -= 1
|
||||
elif mask & self._PCNT.IRQ_MAX:
|
||||
self._overflows += 1
|
||||
|
||||
# Public machine.Counter & machine.Encoder API.
|
||||
def init(self, *args, **kwargs):
|
||||
self.__init__(self._unit_id, *args, **kwargs)
|
||||
|
||||
# Public machine.Counter & machine.Encoder API.
|
||||
def deinit(self):
|
||||
if hasattr(self, "_pcnt"):
|
||||
self._pcnt.deinit()
|
||||
del self._pcnt
|
||||
|
||||
# Public machine.Counter & machine.Encoder API.
|
||||
def value(self, value=None):
|
||||
if not hasattr(self, "_pcnt"):
|
||||
raise RuntimeError("not initialised")
|
||||
|
||||
# This loop deals with the possibility that a PCNT overflow occurs
|
||||
# between retrieving self._overflows and self._pcnt.value().
|
||||
while True:
|
||||
overflows = self._overflows
|
||||
current = self._pcnt.value()
|
||||
# Calling PCNT.value() forces any pending interrupts to run
|
||||
# for this PCNT unit. So self._overflows must now be the the
|
||||
# value corresponding to the value we read.
|
||||
if self._overflows == overflows:
|
||||
break
|
||||
|
||||
# Compute the result including the number of times we've cycled
|
||||
# through the range, and any applied offset.
|
||||
result = overflows * _PCNT_RANGE + current + self._offset
|
||||
|
||||
# If a new value is specified, then zero out the overflows, and set
|
||||
# self._offset so that it zeros out the current PCNT value. The
|
||||
# mutation to self._overflows is atomic w.r.t. the overflow IRQ
|
||||
# handler because the scheduler only runs on branch instructions.
|
||||
if value is not None:
|
||||
self._overflows -= overflows
|
||||
self._offset = value - current
|
||||
|
||||
return result
|
||||
|
||||
class Counter(_CounterBase):
|
||||
# Public machine.Counter API.
|
||||
RISING = 1
|
||||
FALLING = 2
|
||||
UP = _CounterBase._PCNT.INCREMENT
|
||||
DOWN = _CounterBase._PCNT.DECREMENT
|
||||
|
||||
# Counter-specific configuration.
|
||||
def _configure(self, src, edge=RISING, direction=UP):
|
||||
# Only use the first channel.
|
||||
self._pcnt.init(
|
||||
channel=0,
|
||||
pin=src,
|
||||
rising=direction if edge & Counter.RISING else self._PCNT.IGNORE,
|
||||
falling=direction if edge & Counter.FALLING else self._PCNT.IGNORE,
|
||||
)
|
||||
|
||||
class Encoder(_CounterBase):
|
||||
# Encoder-specific configuration.
|
||||
def _configure(self, phase_a, phase_b, phases=1):
|
||||
if phases not in (1, 2, 4):
|
||||
raise ValueError("phases")
|
||||
# Configure the first channel.
|
||||
self._pcnt.init(
|
||||
channel=0,
|
||||
pin=phase_a,
|
||||
falling=self._PCNT.INCREMENT,
|
||||
rising=self._PCNT.DECREMENT,
|
||||
mode_pin=phase_b,
|
||||
mode_low=self._PCNT.HOLD if phases == 1 else self._PCNT.REVERSE,
|
||||
)
|
||||
if phases == 4:
|
||||
# For 4x quadrature, enable the second channel.
|
||||
self._pcnt.init(
|
||||
channel=1,
|
||||
pin=phase_b,
|
||||
falling=self._PCNT.DECREMENT,
|
||||
rising=self._PCNT.INCREMENT,
|
||||
mode_pin=phase_a,
|
||||
mode_low=self._PCNT.REVERSE,
|
||||
)
|
||||
else:
|
||||
# For 1x and 2x quadrature, disable the second channel.
|
||||
self._pcnt.init(channel=1, pin=None, rising=self._PCNT.IGNORE)
|
||||
self._phases = phases
|
||||
|
||||
def phases(self):
|
||||
return self._phases
|
||||
|
||||
del _CounterBase
|
||||
|
||||
|
||||
del esp32
|
||||
|
||||
|
||||
# Delegate to built-in machine module.
|
||||
def __getattr__(attr):
|
||||
return getattr(_machine, attr)
|
||||
Loading…
Reference in a new issue