557 lines
23 KiB
Python
557 lines
23 KiB
Python
# SPDX-FileCopyrightText: 2024 Jerry Needell for Adafruit Industries
|
|
#
|
|
# SPDX-License-Identifier: MIT
|
|
|
|
"""
|
|
|
|
* Author(s): Jerry Needell
|
|
"""
|
|
|
|
import asyncio
|
|
import random
|
|
import time
|
|
|
|
from adafruit_bus_device import spi_device
|
|
|
|
try:
|
|
from typing import Callable, Optional, Type
|
|
|
|
import busio
|
|
import digitalio
|
|
from circuitpython_typing import ReadableBuffer, WriteableBuffer
|
|
|
|
except ImportError:
|
|
pass
|
|
|
|
from micropython import const
|
|
|
|
HAS_SUPERVISOR = False
|
|
|
|
try:
|
|
import supervisor
|
|
|
|
if hasattr(supervisor, "ticks_ms"):
|
|
HAS_SUPERVISOR = True
|
|
except ImportError:
|
|
pass
|
|
|
|
|
|
__version__ = "0.0.0+auto.0"
|
|
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_RFM.git"
|
|
|
|
|
|
# RadioHead specific compatibility constants.
|
|
_RH_BROADCAST_ADDRESS = const(0xFF)
|
|
|
|
# The acknowledgement bit in the FLAGS
|
|
# The top 4 bits of the flags are reserved for RadioHead. The lower 4 bits are reserved
|
|
# for application layer use.
|
|
_RH_FLAGS_ACK = const(0x80)
|
|
_RH_FLAGS_RETRY = const(0x40)
|
|
|
|
|
|
# supervisor.ticks_ms() contants
|
|
_TICKS_PERIOD = const(1 << 29)
|
|
_TICKS_MAX = const(_TICKS_PERIOD - 1)
|
|
_TICKS_HALFPERIOD = const(_TICKS_PERIOD // 2)
|
|
|
|
|
|
def ticks_diff(ticks1: int, ticks2: int) -> int:
|
|
"""Compute the signed difference between two ticks values
|
|
assuming that they are within 2**28 ticks
|
|
"""
|
|
diff = (ticks1 - ticks2) & _TICKS_MAX
|
|
diff = ((diff + _TICKS_HALFPERIOD) & _TICKS_MAX) - _TICKS_HALFPERIOD
|
|
return diff
|
|
|
|
|
|
def asyncio_to_blocking(function):
|
|
"""run async function as normal blocking function"""
|
|
|
|
def blocking_function(self, *args, **kwargs):
|
|
return asyncio.run(function(self, *args, **kwargs))
|
|
|
|
return blocking_function
|
|
|
|
|
|
async def asyncio_check_timeout(flag: Callable, limit: float, timeout_poll: float) -> bool:
|
|
"""test for timeout waiting for specified flag"""
|
|
timed_out = False
|
|
if HAS_SUPERVISOR:
|
|
start = supervisor.ticks_ms()
|
|
while not timed_out and not flag():
|
|
if ticks_diff(supervisor.ticks_ms(), start) >= limit * 1000:
|
|
timed_out = True
|
|
await asyncio.sleep(timeout_poll)
|
|
else:
|
|
start = time.monotonic()
|
|
while not timed_out and not flag():
|
|
if time.monotonic() - start >= limit:
|
|
timed_out = True
|
|
await asyncio.sleep(timeout_poll)
|
|
|
|
return timed_out
|
|
|
|
|
|
# pylint: disable=too-many-instance-attributes
|
|
# pylint: disable=too-many-nested-blocks
|
|
class RFMSPI:
|
|
"""Base class for SPI type devices"""
|
|
|
|
class RegisterBits:
|
|
"""Simplify register access"""
|
|
|
|
# Class to simplify access to the many configuration bits avaialable
|
|
# on the chip's registers. This is a subclass here instead of using
|
|
# a higher level module to increase the efficiency of memory usage
|
|
# (all of the instances of this bit class will share the same buffer
|
|
# used by the parent RFM69 class instance vs. each having their own
|
|
# buffer and taking too much memory).
|
|
|
|
# Quirk of pylint that it requires public methods for a class. This
|
|
# is a decorator class in Python and by design it has no public methods.
|
|
# Instead it uses dunder accessors like get and set below. For some
|
|
# reason pylint can't figure this out so disable the check.
|
|
# pylint: disable=too-few-public-methods
|
|
|
|
# Again pylint fails to see the true intent of this code and warns
|
|
# against private access by calling the write and read functions below.
|
|
# This is by design as this is an internally used class. Disable the
|
|
# check from pylint.
|
|
# pylint: disable=protected-access
|
|
|
|
def __init__(self, address: int, *, offset: int = 0, bits: int = 1) -> None:
|
|
assert 0 <= offset <= 7
|
|
assert 1 <= bits <= 8
|
|
assert (offset + bits) <= 8
|
|
self._address = address
|
|
self._mask = 0
|
|
for _ in range(bits):
|
|
self._mask <<= 1
|
|
self._mask |= 1
|
|
self._mask <<= offset
|
|
self._offset = offset
|
|
|
|
def __get__(self, obj: Optional["RFM"], objtype: Type["RFM"]) -> int:
|
|
reg_value = obj.read_u8(self._address)
|
|
return (reg_value & self._mask) >> self._offset
|
|
|
|
def __set__(self, obj: Optional["RFM"], val: int) -> None:
|
|
reg_value = obj.read_u8(self._address)
|
|
reg_value &= ~self._mask
|
|
reg_value |= (val & 0xFF) << self._offset
|
|
obj.write_u8(self._address, reg_value)
|
|
|
|
# pylint: disable-msg=too-many-arguments
|
|
def __init__( # noqa: PLR0913
|
|
self,
|
|
spi: busio.SPI,
|
|
cs_pin: digitalio.DigitalInOut,
|
|
baudrate: int = 5000000,
|
|
polarity: int = 0,
|
|
phase: int = 0,
|
|
):
|
|
self.spi_device = spi_device.SPIDevice(
|
|
spi, cs_pin, baudrate=baudrate, polarity=polarity, phase=phase
|
|
)
|
|
# initialize last RSSI reading
|
|
self.last_rssi = 0.0
|
|
"""The RSSI of the last received packet. Stored when the packet was received.
|
|
The instantaneous RSSI value may not be accurate once the
|
|
operating mode has been changed.
|
|
"""
|
|
self.last_snr = 0.0
|
|
"""The SNR of the last received packet. Stored when the packet was received.
|
|
The instantaneous SNR value may not be accurate once the
|
|
operating mode has been changed.
|
|
"""
|
|
# initialize timeouts and delays delays
|
|
self.ack_wait = 0.1
|
|
"""The delay time before attempting a retry after not receiving an ACK"""
|
|
self.receive_timeout = 0.5
|
|
"""The amount of time to poll for a received packet.
|
|
If no packet is received, the returned packet will be None
|
|
"""
|
|
self.xmit_timeout = 2.0
|
|
"""The amount of time to wait for the HW to transmit the packet.
|
|
This is mainly used to prevent a hang due to a HW issue
|
|
"""
|
|
self.ack_retries = 5
|
|
"""The number of ACK retries before reporting a failure."""
|
|
self.ack_delay: float = None
|
|
"""The delay time before attemting to send an ACK.
|
|
If ACKs are being missed try setting this to .1 or .2.
|
|
"""
|
|
# initialize sequence number counter for reliabe datagram mode
|
|
self.sequence_number = 0
|
|
# create seen Ids list
|
|
self.seen_ids = bytearray(256)
|
|
# initialize packet header
|
|
# node address - default is broadcast
|
|
self.node = _RH_BROADCAST_ADDRESS
|
|
"""The default address of this Node. (0-255).
|
|
If not 255 (0xff) then only packets address to this node will be accepted.
|
|
First byte of the RadioHead header.
|
|
"""
|
|
# destination address - default is broadcast
|
|
self.destination = _RH_BROADCAST_ADDRESS
|
|
"""The default destination address for packet transmissions. (0-255).
|
|
If 255 (0xff) then any receiving node should accept the packet.
|
|
Second byte of the RadioHead header.
|
|
"""
|
|
# ID - contains seq count for reliable datagram mode
|
|
self.identifier = 0
|
|
"""Automatically set to the sequence number when send_with_ack() used.
|
|
Third byte of the RadioHead header.
|
|
"""
|
|
# flags - identifies ack/reetry packet for reliable datagram mode
|
|
self.flags = 0
|
|
"""Upper 4 bits reserved for use by Reliable Datagram Mode.
|
|
Lower 4 bits may be used to pass information.
|
|
Fourth byte of the RadioHead header.
|
|
"""
|
|
self.radiohead = True
|
|
"""Enable RadioHead compatibility"""
|
|
|
|
self.crc_error_count = 0
|
|
self.timeout_poll = 0.001
|
|
|
|
# pylint: enable-msg=too-many-arguments
|
|
|
|
# Global buffer for SPI commands
|
|
_BUFFER = bytearray(4)
|
|
|
|
# pylint: disable=no-member
|
|
# Reconsider pylint: disable when this can be tested
|
|
def read_into(self, address: int, buf: WriteableBuffer, length: Optional[int] = None) -> None:
|
|
"""Read a number of bytes from the specified address into the provided
|
|
buffer. If length is not specified (the default) the entire buffer
|
|
will be filled."""
|
|
if length is None:
|
|
length = len(buf)
|
|
with self.spi_device as device:
|
|
self._BUFFER[0] = address & 0x7F # Strip out top bit to set 0
|
|
# value (read).
|
|
device.write(self._BUFFER, end=1)
|
|
device.readinto(buf, end=length)
|
|
|
|
def read_u8(self, address: int) -> int:
|
|
"""Read a single byte from the provided address and return it."""
|
|
self.read_into(address, self._BUFFER, length=1)
|
|
return self._BUFFER[0]
|
|
|
|
def write_from(self, address: int, buf: ReadableBuffer, length: Optional[int] = None) -> None:
|
|
"""Write a number of bytes to the provided address and taken from the
|
|
provided buffer. If no length is specified (the default) the entire
|
|
buffer is written."""
|
|
if length is None:
|
|
length = len(buf)
|
|
with self.spi_device as device:
|
|
self._BUFFER[0] = (address | 0x80) & 0xFF # Set top bit to 1 to
|
|
# indicate a write.
|
|
device.write(self._BUFFER, end=1)
|
|
device.write(buf, end=length)
|
|
|
|
def write_u8(self, address: int, val: int) -> None:
|
|
"""Write a byte register to the chip. Specify the 7-bit address and the
|
|
8-bit value to write to that address."""
|
|
with self.spi_device as device:
|
|
self._BUFFER[0] = (address | 0x80) & 0xFF # Set top bit to 1 to indicate a write.
|
|
self._BUFFER[1] = val & 0xFF
|
|
device.write(self._BUFFER, end=2)
|
|
|
|
# pylint: disable=too-many-branches
|
|
|
|
async def asyncio_send( # noqa: PLR0912 PLR0913
|
|
self,
|
|
data: ReadableBuffer,
|
|
*,
|
|
keep_listening: bool = False,
|
|
destination: Optional[int] = None,
|
|
node: Optional[int] = None,
|
|
identifier: Optional[int] = None,
|
|
flags: Optional[int] = None,
|
|
) -> bool:
|
|
"""Send a string of data using the transmitter.
|
|
You can only send 252 bytes at a time
|
|
(limited by chip's FIFO size and appended headers).
|
|
if the propert radiohead is True then this appends a 4 byte header
|
|
to be compatible with the RadioHead library.
|
|
The header defaults to using the initialized attributes:
|
|
(destination,node,identifier,flags)
|
|
It may be temporarily overidden via the kwargs - destination,node,identifier,flags.
|
|
Values passed via kwargs do not alter the attribute settings.
|
|
The keep_listening argument should be set to True if you want to start listening
|
|
automatically after the packet is sent. The default setting is False.
|
|
|
|
Returns: True if success or False if the send timed out.
|
|
"""
|
|
self.idle() # Stop receiving to clear FIFO and keep it clear.
|
|
# Combine header and data to form payload
|
|
if self.radiohead:
|
|
payload = bytearray(4)
|
|
if destination is None: # use attribute
|
|
payload[0] = self.destination
|
|
else: # use kwarg
|
|
payload[0] = destination
|
|
if node is None: # use attribute
|
|
payload[1] = self.node
|
|
else: # use kwarg
|
|
payload[1] = node
|
|
if identifier is None: # use attribute
|
|
payload[2] = self.identifier
|
|
else: # use kwarg
|
|
payload[2] = identifier
|
|
if flags is None: # use attribute
|
|
payload[3] = self.flags
|
|
else: # use kwarg
|
|
payload[3] = flags
|
|
payload = payload + data
|
|
elif destination is not None: # prepend destination for non RH packets
|
|
payload = destination.to_bytes(1, "big") + data
|
|
else:
|
|
payload = data
|
|
# Disable pylint warning to not use length as a check for zero.
|
|
# This is a puzzling warning as the below code is clearly the most
|
|
# efficient and proper way to ensure a precondition that the provided
|
|
# buffer be within an expected range of bounds. Disable this check.
|
|
# pylint: disable=len-as-condition
|
|
assert 0 < len(payload) <= self.max_packet_length
|
|
# pylint: enable=len-as-condition
|
|
self.fill_fifo(payload)
|
|
# Turn on transmit mode to send out the packet.
|
|
self.transmit()
|
|
# Wait for packet_sent interrupt with explicit polling (not ideal but
|
|
# best that can be done right now without interrupts).
|
|
timed_out = await asyncio_check_timeout(
|
|
self.packet_sent, self.xmit_timeout, self.timeout_poll
|
|
)
|
|
# Listen again if necessary and return the result packet.
|
|
if keep_listening:
|
|
self.listen()
|
|
else:
|
|
# Enter idle mode to stop receiving other packets.
|
|
self.idle()
|
|
self.clear_interrupt()
|
|
return not timed_out
|
|
|
|
send = asyncio_to_blocking(asyncio_send)
|
|
"""Non-asyncio wrapper to Send a string of data using the transmitter
|
|
using the same arguments and keywords as asyncio_send()
|
|
"""
|
|
|
|
async def asyncio_send_with_ack(self, data: ReadableBuffer) -> bool:
|
|
"""Reliable Datagram mode:
|
|
Send a packet with data and wait for an ACK response.
|
|
The packet header is automatically generated.
|
|
If enabled, the packet transmission will be retried on failure
|
|
"""
|
|
if not self.radiohead:
|
|
raise RuntimeError("send_with_ack onl suppoted in RadioHead mode")
|
|
if self.ack_retries:
|
|
retries_remaining = self.ack_retries
|
|
else:
|
|
retries_remaining = 1
|
|
got_ack = False
|
|
self.sequence_number = (self.sequence_number + 1) & 0xFF
|
|
while not got_ack and retries_remaining:
|
|
self.identifier = self.sequence_number
|
|
await self.asyncio_send(data, keep_listening=True)
|
|
# Don't look for ACK from Broadcast message
|
|
if self.destination == _RH_BROADCAST_ADDRESS:
|
|
got_ack = True
|
|
else:
|
|
# wait for a packet from our destination
|
|
ack_packet = await self.asyncio_receive(timeout=self.ack_wait, with_header=True)
|
|
if ack_packet is not None:
|
|
if ack_packet[3] & _RH_FLAGS_ACK:
|
|
# check the ID
|
|
if ack_packet[2] == self.identifier:
|
|
got_ack = True
|
|
break
|
|
# pause before next retry -- random delay
|
|
if not got_ack:
|
|
# delay by random amount before next try
|
|
await asyncio.sleep(self.ack_wait + self.ack_wait * random.random())
|
|
retries_remaining = retries_remaining - 1
|
|
# set retry flag in packet header
|
|
self.flags |= _RH_FLAGS_RETRY
|
|
self.flags = 0 # clear flags
|
|
return got_ack
|
|
|
|
send_with_ack = asyncio_to_blocking(asyncio_send_with_ack)
|
|
"""Non-asyncio wrapper to Send a string of data using the transmitter
|
|
using the same arguments and keywords as asyncio_send_with_ack()
|
|
"""
|
|
|
|
async def asyncio_receive( # noqa: PLR0912
|
|
self,
|
|
*,
|
|
keep_listening: bool = True,
|
|
with_header: bool = False,
|
|
timeout: Optional[float] = None,
|
|
) -> Optional[bytearray]:
|
|
"""Wait to receive a packet from the receiver. If a packet is found the payload bytes
|
|
are returned, otherwise None is returned (which indicates the timeout elapsed with no
|
|
reception).
|
|
If keep_listening is True (the default) the chip will immediately enter listening mode
|
|
after reception of a packet, otherwise it will fall back to idle mode and ignore any
|
|
future reception.
|
|
Packets may have a 4-byte header for compatibility with the
|
|
RadioHead library.
|
|
The header consists of 4 bytes (To,From,ID,Flags). The default setting will strip
|
|
the header before returning the packet to the caller.
|
|
If with_header is True then the 4 byte header will be returned with the packet.
|
|
The payload then begins at packet[4].
|
|
"""
|
|
if not self.radiohead and with_header:
|
|
raise RuntimeError("with_header only supported for RadioHead mode")
|
|
timed_out = False
|
|
if timeout is None:
|
|
timeout = self.receive_timeout
|
|
if timeout is not None:
|
|
# Wait for the payloadready signal. This is not ideal and will
|
|
# surely miss or overflow the FIFO when packets aren't read fast
|
|
# enough, however it's the best that can be done from Python without
|
|
# interrupt supports.
|
|
# Make sure we are listening for packets.
|
|
self.listen()
|
|
timed_out = await asyncio_check_timeout(self.payload_ready, timeout, self.timeout_poll)
|
|
# Payload ready is set, a packet is in the FIFO.
|
|
packet = None
|
|
# save last RSSI reading
|
|
self.last_rssi = self.rssi
|
|
self.last_snr = self.snr
|
|
|
|
# Enter idle mode to stop receiving other packets.
|
|
self.idle()
|
|
if not timed_out:
|
|
if self.enable_crc and self.crc_error:
|
|
self.crc_error_count += 1
|
|
else:
|
|
packet = self.read_fifo()
|
|
if (packet is not None) and self.radiohead:
|
|
if len(packet) < 5:
|
|
# reject the packet if it is too small to contain the RAdioHead Header
|
|
packet = None
|
|
if packet is not None:
|
|
if (
|
|
self.node != _RH_BROADCAST_ADDRESS # noqa: PLR1714
|
|
and packet[0] != _RH_BROADCAST_ADDRESS
|
|
and packet[0] != self.node
|
|
):
|
|
packet = None
|
|
if not with_header and packet is not None: # skip the header if not wanted
|
|
packet = packet[4:]
|
|
# Listen again if necessary and return the result packet.
|
|
if keep_listening:
|
|
self.listen()
|
|
else:
|
|
# Enter idle mode to stop receiving other packets.
|
|
self.idle()
|
|
self.clear_interrupt()
|
|
return packet
|
|
|
|
receive = asyncio_to_blocking(asyncio_receive)
|
|
"""Non-asyncio wrapper to Receive a packet
|
|
using the same arguments and keywords as asyncio_receive()
|
|
"""
|
|
|
|
async def asyncio_receive_with_ack( # noqa: PLR0912
|
|
self,
|
|
*,
|
|
keep_listening: bool = True,
|
|
with_header: bool = False,
|
|
timeout: Optional[float] = None,
|
|
) -> Optional[bytearray]:
|
|
"""Wait to receive a RadioHead packet from the receiver then send an ACK packet in response.
|
|
AKA Reliable Datagram mode.
|
|
If a packet is found the payload bytes are returned, otherwise None is returned
|
|
(which indicates the timeout elapsed with no reception).
|
|
If keep_listening is True (the default) the chip will immediately enter listening mode
|
|
after receipt of a packet, otherwise it will fall back to idle mode and ignore
|
|
any incomming packets until it is called again.
|
|
All packets must have a 4-byte header for compatibility with the RadioHead library.
|
|
The header consists of 4 bytes (To,From,ID,Flags). The default setting will strip
|
|
the header before returning the packet to the caller.
|
|
If with_header is True then the 4 byte header will be returned with the packet.
|
|
The payload then begins at packet[4].
|
|
"""
|
|
if not self.radiohead:
|
|
raise RuntimeError("receive_with_ack only supported for RadioHead mode")
|
|
timed_out = False
|
|
if timeout is None:
|
|
timeout = self.receive_timeout
|
|
if timeout is not None:
|
|
# Wait for the payloadready signal. This is not ideal and will
|
|
# surely miss or overflow the FIFO when packets aren't read fast
|
|
# enough, however it's the best that can be done from Python without
|
|
# interrupt supports.
|
|
# Make sure we are listening for packets.
|
|
self.listen()
|
|
timed_out = await asyncio_check_timeout(self.payload_ready, timeout, self.timeout_poll)
|
|
# Payload ready is set, a packet is in the FIFO.
|
|
packet = None
|
|
# save last RSSI reading
|
|
self.last_rssi = self.rssi
|
|
self.last_snr = self.snr
|
|
|
|
# Enter idle mode to stop receiving other packets.
|
|
self.idle()
|
|
if not timed_out:
|
|
if self.enable_crc and self.crc_error:
|
|
self.crc_error_count += 1
|
|
else:
|
|
packet = self.read_fifo()
|
|
if (packet is not None) and self.radiohead:
|
|
if len(packet) < 5:
|
|
# reject the packet if it is too small to contain the RAdioHead Header
|
|
packet = None
|
|
if packet is not None:
|
|
if (
|
|
self.node != _RH_BROADCAST_ADDRESS # noqa: PLR1714
|
|
and packet[0] != _RH_BROADCAST_ADDRESS
|
|
and packet[0] != self.node
|
|
):
|
|
packet = None
|
|
# send ACK unless this was an ACK or a broadcast
|
|
elif ((packet[3] & _RH_FLAGS_ACK) == 0) and (
|
|
packet[0] != _RH_BROADCAST_ADDRESS
|
|
):
|
|
# delay before sending Ack to give receiver a chance to get ready
|
|
if self.ack_delay is not None:
|
|
await asyncio.sleep(self.ack_delay)
|
|
# send ACK packet to sender (data is b'!')
|
|
await self.asyncio_send(
|
|
b"!",
|
|
destination=packet[1],
|
|
node=packet[0],
|
|
identifier=packet[2],
|
|
flags=(packet[3] | _RH_FLAGS_ACK),
|
|
)
|
|
# reject Retries if we have seen this idetifier from this source before
|
|
if (self.seen_ids[packet[1]] == packet[2]) and (
|
|
packet[3] & _RH_FLAGS_RETRY
|
|
):
|
|
packet = None
|
|
else: # save the packet identifier for this source
|
|
self.seen_ids[packet[1]] = packet[2]
|
|
if (
|
|
packet is not None and (packet[3] & _RH_FLAGS_ACK) != 0
|
|
): # Ignore it if it was an ACK packet
|
|
packet = None
|
|
if not with_header and packet is not None: # skip the header if not wanted
|
|
packet = packet[4:]
|
|
# Listen again if necessary and return the result packet.
|
|
if keep_listening:
|
|
self.listen()
|
|
else:
|
|
# Enter idle mode to stop receiving other packets.
|
|
self.idle()
|
|
self.clear_interrupt()
|
|
return packet
|
|
|
|
receive_with_ack = asyncio_to_blocking(asyncio_receive_with_ack)
|
|
"""Non-asyncio wrapper to Receive a packet
|
|
using the same arguments and keywords as asyncio_receive_with_ack()
|
|
"""
|