checkpoint: neopixel example works

This commit is contained in:
Jeff Epler 2024-12-10 10:08:24 -06:00
parent 1fb9ad654d
commit b8c92b3b9c
4 changed files with 208 additions and 94 deletions

View file

@ -6,4 +6,4 @@ Contents:
.. toctree::
:maxdepth: 2
neopixel_write_pi5
adafruit_rp1pio

View file

@ -1,22 +1,67 @@
# SPDX-FileCopyrightText: 2024 Jeff Epler, written for Adafruit Industries
# SPDX-FileCopyrightText: 2021 Scott Shawcroft, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT
import time
import adafruit_pioasm
import adafruit_pixelbuf
import adafruit_rp1pio
import board
import neopixel_write_pi5
from adafruit_led_animation.animation.rainbow import Rainbow
from adafruit_led_animation.animation.rainbowchase import RainbowChase
from adafruit_led_animation.animation.rainbowcomet import RainbowComet
from adafruit_led_animation.animation.rainbowsparkle import RainbowSparkle
from adafruit_led_animation.sequence import AnimationSequence
# chosen because it's on a connector on the braincraft hat
NEOPIXEL = board.D13
# NeoPixels are 800khz bit streams. We are choosing zeros as <312ns hi, 936 lo>
# and ones as <700 ns hi, 556 ns lo>.
# The first two instructions always run while only one of the two final
# instructions run per bit. We start with the low period because it can be
# longer while waiting for more data.
program = """
.program ws2812
.side_set 1
.wrap_target
bitloop:
out x 1 side 0 [6]; Drive low. Side-set still takes place before instruction stalls.
jmp !x do_zero side 1 [3]; Branch on the bit we shifted out previous delay. Drive high.
do_one:
jmp bitloop side 1 [4]; Continue driving high, for a one (long pulse)
do_zero:
nop side 0 [4]; Or drive low, for a zero (short pulse)
.wrap
"""
assembled = adafruit_pioasm.assemble(program)
class Pi5Pixelbuf(adafruit_pixelbuf.PixelBuf):
def __init__(self, pin, size, **kwargs):
self._pin = pin
self.sm = adafruit_rp1pio.StateMachine(
assembled,
frequency=12_800_000, # to get appropriate sub-bit times in PIO program
first_sideset_pin=pin,
auto_pull=True,
out_shift_right=False,
pull_threshold=8,
)
#print("real frequency", self.sm.frequency)
super().__init__(size=size, **kwargs)
def _transmit(self, buf):
neopixel_write_pi5.neopixel_write(self._pin, buf)
self.sm.write(buf)
pixels = Pi5Pixelbuf(board.D13, 120, auto_write=True, byteorder="WBGR")
pixels = Pi5Pixelbuf(NEOPIXEL, 120, auto_write=True, byteorder="BGR")
pixels.fill(0x10102)
pixels.show()
time.sleep(2)
rainbow = Rainbow(pixels, speed=0.02, period=2)
rainbow_chase = RainbowChase(pixels, speed=0.02, size=5, spacing=3)
@ -37,5 +82,6 @@ try:
while True:
animations.animate()
finally:
time.sleep(.01)
pixels.fill(0)
pixels.show()

View file

@ -11,12 +11,12 @@ __version__ = get_version()
# say from a submodule.
ext_modules = [
Pybind11Extension("rp1pio._piolib",
Pybind11Extension("adafruit_rp1pio",
["src/main.cpp", "src/utils/piolib/piolib.c", "src/utils/piolib/pio_rp1.c"],
define_macros = [('VERSION_INFO', __version__)],
include_dirs = ['./src/utils/piolib/include'],
cxx_std = "20",
extra_compile_args = ["-g3"],
extra_compile_args = ["-g3", "-Og"],
),
]

View file

@ -9,120 +9,187 @@
namespace py = pybind11;
static PIO pio{};
static int sm{-1};
static int offset{-1};
static int last_gpio{-1};
namespace {
static void neopixel_write(py::object gpio_obj, py::buffer buf) {
int gpio = py::getattr(gpio_obj, "_pin", gpio_obj).attr("id").cast<int>();
py::buffer_info info = buf.request();
if (!pio || sm < 0) {
// (is safe to call twice)
if (pio_init()) {
throw std::runtime_error("pio_init() failed");
}
// can't use `pio0` macro as it will call exit() on failure!
pio = pio_open(0);
if(PIO_IS_ERR(pio)) {
throw std::runtime_error(
py::str("Failed to open PIO device (error {})").attr("format")(PIO_ERR_VAL(pio)).cast<std::string>());
}
sm = pio_claim_unused_sm(pio, true);
offset = pio_add_program(pio, &ws2812_program);
pio_sm_clear_fifos(pio, sm);
pio_sm_set_clkdiv(pio, sm, 1.0);
printf("Loaded program at %d, using sm %d\n", offset, sm, gpio);
last_gpio = -1;
PIO pio_open_check() {
if (pio_init()) {
throw std::runtime_error("PIO not available");
}
if (gpio != last_gpio) {
ws2812_program_init(pio, sm, offset, gpio, 800000.0, false);
printf("Initialized program at %d, using sm %d, gpio %d\n", offset, sm, gpio);
last_gpio = gpio;
PIO pio = pio_open(0);
if(PIO_IS_ERR(pio)) {
throw std::runtime_error(
py::str("Failed to open PIO device (error {})").attr("format")(PIO_ERR_VAL(pio)).cast<std::string>());
}
size_t size = info.size * info.itemsize;
if(size > UINT16_MAX) {
throw py::value_error("Too much data");
}
if (pio_sm_config_xfer(pio, sm, PIO_DIR_TO_SM, size, 1)) {
throw std::runtime_error("pio_sm_config_xfer() failed");
}
if (pio_sm_xfer_data(pio, sm, PIO_DIR_TO_SM, size, info.ptr)) {
throw std::runtime_error("pio_sm_xfer_data() failed");
}
return pio;
}
static void free_pio(void) {
if (!pio) { return; }
if (offset <= 0) { pio_remove_program(pio, &ws2812_program, offset); };
offset = -1;
if (sm >= 0) { pio_sm_unclaim(pio, sm); }
sm = -1;
pio_close(pio);
pio = nullptr;
int pio_sm_claim_unused_sm_check(PIO pio) {
int sm = pio_claim_unused_sm(pio, false);
if (sm < 0) {
throw std::runtime_error("No state machine available");
}
return sm;
}
int pio_add_program_check(PIO pio, const struct pio_program *program) {
int offset = pio_add_program(pio, program);
if (offset < 0) {
throw std::runtime_error("Could not load program");
}
return offset;
}
int get_pin_number(py::object gpio_obj) {
return py::getattr(gpio_obj, "_pin", gpio_obj).attr("id").cast<int>();
}
template<class T>
int get_default(py::object o, T default_value) {
if (o.is_none()) { return default_value; }
return o.cast<T>();
}
class StateMachine {
StateMachine(py::buffer program);
~StateMachine();
PIO pio{};
int sm{-1};
int offset{-1};
void clear_fifos();
public:
StateMachine(py::buffer assembled,
double frequency,
py::object first_sideset_pin,
int sideset_pin_count,
bool auto_pull,
bool out_shift_right,
int pull_threshold) {
pio = pio_open_check();
sm = pio_sm_claim_unused_sm_check(pio);
py::buffer_info info = assembled.request();
if (info.itemsize != 2) {
throw py::value_error("assembled: Expected array of type `h`");
}
if (info.size >= 32) {
throw py::value_error("assembled: Exceeds maximum program length (32)");
}
void set_frequency(double frequency);
void get_frequency() const;
struct pio_program program = {
.instructions = reinterpret_cast<uint16_t*>(info.ptr),
.length = static_cast<uint8_t>(info.size),
.origin = -1,
};
offset = pio_add_program_check(pio, &program);
void write(py::buffer buffer);
void read(py::buffer buffer);
pio_sm_config c = {0, 0, 0};
sm_config_set_wrap(&c, offset, offset + info.size - 1);
void gpio_init(int pin_no, int count=1) {
for(int i=0; i<count; i++) {
pio_gpio_init(pin_no + i);
if (!first_sideset_pin.is_none()) {
if (sideset_pin_count < 1 || sideset_pin_count > 5) {
throw py::value_error("sideset_pin_count out of range (must be 1 to 5)");
}
auto first_sideset_pin_number = get_pin_number(first_sideset_pin);
for(int i=0; i<sideset_pin_count; i++) {
pio_gpio_init(pio, first_sideset_pin_number + i);
}
sm_config_set_sideset(&c, sideset_pin_count, /* optional */ false, /* pindirs */ false);
sm_config_set_sideset_pins(&c, first_sideset_pin_number);
}
printf("pull threshold=%d\n", pull_threshold);
sm_config_set_out_shift(&c, out_shift_right, auto_pull, pull_threshold);
double div = clock_get_hz(clk_sys) / frequency;
printf("frequency=%f div=%f\n", frequency, div);
sm_config_set_clkdiv(&c, div);
pio_sm_init(pio, sm, offset, &c);
pio_sm_set_enabled(pio, sm, true);
}
~StateMachine() {
if(!PIO_IS_ERR(pio)) pio_close(pio);
}
void write(py::buffer b) {
py::buffer_info info = b.request();
uint32_t *ptr = reinterpret_cast<uint32_t*>(info.ptr);
std::vector<uint32_t> vec;
printf("itemsize=%zd size=%zd\n", info.itemsize, info.size);
// the DMA controller doesn't replicate 8- and 16-bit values like on rp2, so we have to do it ourselves
if (info.itemsize != 4) {
vec.reserve(info.size);
switch(info.itemsize) {
case 1:
{
printf("replicating 8-bit values to 32 bits\n");
auto *buf = reinterpret_cast<uint8_t*>(info.ptr);
for(pybind11::ssize_t i=0; i<info.size; i++) {
vec.push_back(buf[i] * 0x01010101);
}
break;
}
case 2:
{
printf("replicating 16-bit values to 32 bits\n");
auto *buf = reinterpret_cast<uint16_t*>(info.ptr);
for(pybind11::ssize_t i=0; i<info.size; i++) {
vec.push_back(buf[i] * 0x00010001);
}
}
break;
default:
throw py::value_error("buffer must contain items of 1, 2, or 4 bytes");
}
printf("setting ptr to vec[0]\n");
ptr = &vec[0];
} else {
printf("using items as-is\n");
}
size_t size = info.size * sizeof(uint32_t);
printf("transfer size %zd\n", size);
printf("ptr @ %p info.ptr @ %p\n", ptr, info.ptr);
printf("%08x %08x %08x %08x\n", ptr[0], ptr[1], ptr[2], ptr[3]);
if (pio_sm_config_xfer(pio, sm, PIO_DIR_TO_SM, size, 1)) {
throw std::runtime_error("pio_sm_config_xfer() failed");
}
if (pio_sm_xfer_data(pio, sm, PIO_DIR_TO_SM, size, ptr)) {
throw std::runtime_error("pio_sm_xfer_data() failed");
}
}
void set_consecutive_pindirs(int pin_no, int count, bool is_output) {
pio_sm_set_consecutive_pindirs(pio, sm, pin_no, count, is_output);
}
void set_sideset_pins(int pin_no, int count) {
pio_sm_set_sideset_pins(pio, pin_no);
}
void set_out_shift(bool shift_right, bool autopull, int pull_threshold);
void set_in_shift(bool shift_right, bool autopush, int push_threshold);
void set_fifo_join(pio_fifo_join join);
void set_wrap(size_t wrap_target, size_t wrap);
};
PYBIND11_MODULE(neopixel_write_pi5, m) {
PYBIND11_MODULE(adafruit_rp1pio, m) {
m.doc() = R"pbdoc(
low level wrapper for piolib
----------------------------
Control the RP1 I/O coprocessor
-------------------------------
.. currentmodule:: rp1pio._piolib
.. currentmodule:: adafruit_rp1pio
.. autosummary::
:toctree: _generate
_StateMachine
StateMachine
)pbdoc";
m.def("neopixel_write", &neopixel_write,
py::arg("gpio"),
py::arg("buf"),
R"pbdoc(NeoPixel writing function)pbdoc");
m.def("_free_pio", &free_pio,
R"pbdoc(Release any held PIO resource)pbdoc");
py::class_<StateMachine>(m, "StateMachine")
.def(py::init<py::buffer /* assembled */,
double /* frequency */,
py::object /* first_sideset_pin */,
int /* sideset_pin_count */,
bool /* auto_pull */,
bool /* out_shift_right */,
int /* pull_threshold */>(),
py::arg("assembled"),
py::arg("frequency"),
py::arg("first_sideset_pin") = py::none(),
py::arg("sideset_pin_count") = 1,
py::arg("auto_pull") = false,
py::arg("out_shift_right") = true,
py::arg("pull_threshold") = 32
)
.def("write", &StateMachine::write);
#ifdef VERSION_INFO
m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO);
@ -130,3 +197,4 @@ PYBIND11_MODULE(neopixel_write_pi5, m) {
m.attr("__version__") = "dev";
#endif
}
}