Adafruit_CircuitPython_RFM/adafruit_rfm/rfm_common.py
2025-06-23 17:45:54 -05:00

557 lines
23 KiB
Python

# SPDX-FileCopyrightText: 2024 Jerry Needell for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
* Author(s): Jerry Needell
"""
import asyncio
import random
import time
from adafruit_bus_device import spi_device
try:
from typing import Callable, Optional, Type
import busio
import digitalio
from circuitpython_typing import ReadableBuffer, WriteableBuffer
except ImportError:
pass
from micropython import const
HAS_SUPERVISOR = False
try:
import supervisor
if hasattr(supervisor, "ticks_ms"):
HAS_SUPERVISOR = True
except ImportError:
pass
__version__ = "0.0.0+auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_RFM.git"
# RadioHead specific compatibility constants.
# Broadcast address: the default value of both the ``node`` and ``destination``
# attributes of RFMSPI, and the address that is never ACKed.
_RH_BROADCAST_ADDRESS = const(0xFF)
# The acknowledgement bit in the FLAGS
# The top 4 bits of the flags are reserved for RadioHead. The lower 4 bits are reserved
# for application layer use.
_RH_FLAGS_ACK = const(0x80)
# Flag bit OR-ed into the header flags when a packet is re-sent after a missing ACK.
_RH_FLAGS_RETRY = const(0x40)
# supervisor.ticks_ms() constants
# ticks_diff() treats tick values as counters that wrap modulo _TICKS_PERIOD.
_TICKS_PERIOD = const(1 << 29)
_TICKS_MAX = const(_TICKS_PERIOD - 1)
_TICKS_HALFPERIOD = const(_TICKS_PERIOD // 2)
def ticks_diff(ticks1: int, ticks2: int) -> int:
    """Return the signed distance ``ticks1 - ticks2`` between two tick counts.

    Both arguments are treated as counters that wrap at ``_TICKS_PERIOD``.
    The result is only meaningful when the two values are within 2**28 ticks
    of each other; it is negative when ``ticks1`` is the earlier value.
    """
    # Distance going forward around the wrapping counter, always >= 0.
    wrapped = (ticks1 - ticks2) & _TICKS_MAX
    # Re-centre so that distances of half a period or more come out negative.
    return ((wrapped + _TICKS_HALFPERIOD) & _TICKS_MAX) - _TICKS_HALFPERIOD
def asyncio_to_blocking(function):
    """Turn a coroutine method into an ordinary blocking method.

    The returned callable forwards ``self`` and every positional and keyword
    argument to ``function``, runs the resulting coroutine to completion with
    ``asyncio.run`` and hands back whatever the coroutine returned.
    """

    def blocking_function(self, *args, **kwargs):
        pending = function(self, *args, **kwargs)
        return asyncio.run(pending)

    return blocking_function
async def asyncio_check_timeout(flag: Callable, limit: float, timeout_poll: float) -> bool:
    """Poll ``flag`` until it returns a true value or ``limit`` seconds pass.

    :param flag: zero-argument callable that is polled; a true result ends the wait.
    :param limit: maximum time to wait, in seconds.
    :param timeout_poll: time handed to ``asyncio.sleep`` between polls.
    :return: ``True`` if the limit was reached, ``False`` if ``flag`` became
        true first.

    Once the limit is detected the loop still sleeps one more poll interval
    before it exits, and ``flag`` is not consulted again.
    """
    expired = False
    if not HAS_SUPERVISOR:
        # No supervisor.ticks_ms(): fall back to time.monotonic(), in seconds.
        began = time.monotonic()
        while not (expired or flag()):
            if time.monotonic() - began >= limit:
                expired = True
            await asyncio.sleep(timeout_poll)
        return expired
    # Millisecond tick counter that wraps; ticks_diff() handles the wrap.
    began = supervisor.ticks_ms()
    while not (expired or flag()):
        if ticks_diff(supervisor.ticks_ms(), began) >= limit * 1000:
            expired = True
        await asyncio.sleep(timeout_poll)
    return expired
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-nested-blocks
class RFMSPI:
"""Base class for SPI type devices"""
class RegisterBits:
    """Descriptor that exposes a bit field of an 8-bit register as an int attribute.

    An instance is declared as a class attribute of the radio class.  Reading
    the attribute reads the register through the owner's ``read_u8`` and
    returns the field right-aligned; assigning to it performs a
    read-modify-write through ``read_u8``/``write_u8`` that leaves the other
    bits of the register untouched.

    :param address: register address passed to ``read_u8``/``write_u8``.
    :param offset: position of the field's least significant bit (0-7).
    :param bits: width of the field in bits (1-8); ``offset + bits`` may not
        exceed 8.
    """

    # This lives inside the radio class instead of a higher level module so
    # that every field shares the owning instance's SPI buffer rather than
    # allocating a buffer of its own.

    # The class intentionally has no public methods, only the descriptor
    # protocol, so silence the pylint check.
    # pylint: disable=too-few-public-methods

    # pylint: disable=protected-access

    def __init__(self, address: int, *, offset: int = 0, bits: int = 1) -> None:
        assert 0 <= offset <= 7
        assert 1 <= bits <= 8
        assert (offset + bits) <= 8
        self._address = address
        # ``bits`` consecutive ones, moved up to the field position.
        self._mask = ((1 << bits) - 1) << offset
        self._offset = offset

    def __get__(self, obj: Optional["RFM"], objtype: Type["RFM"]) -> int:
        if obj is None:
            # Accessed on the class rather than an instance: there is no
            # device to read, so hand back the descriptor itself.
            return self
        reg_value = obj.read_u8(self._address)
        return (reg_value & self._mask) >> self._offset

    def __set__(self, obj: Optional["RFM"], val: int) -> None:
        reg_value = obj.read_u8(self._address)
        reg_value &= ~self._mask
        # Clamp the shifted value to the field so that an oversized ``val``
        # cannot overwrite neighbouring bits of the same register.
        reg_value |= ((val & 0xFF) << self._offset) & self._mask
        obj.write_u8(self._address, reg_value)
# pylint: disable-msg=too-many-arguments
def __init__(  # noqa: PLR0913
    self,
    spi: busio.SPI,
    cs_pin: digitalio.DigitalInOut,
    baudrate: int = 5000000,
    polarity: int = 0,
    phase: int = 0,
):
    """Wrap the SPI bus in an ``SPIDevice`` and set the default protocol state.

    :param spi: SPI bus the radio is attached to.
    :param cs_pin: chip-select pin handed to ``SPIDevice``.
    :param baudrate: SPI clock rate handed to ``SPIDevice``.
    :param polarity: SPI clock polarity handed to ``SPIDevice``.
    :param phase: SPI clock phase handed to ``SPIDevice``.
    """
    self.spi_device = spi_device.SPIDevice(
        spi, cs_pin, baudrate=baudrate, polarity=polarity, phase=phase
    )
    # initialize last RSSI reading
    self.last_rssi = 0.0
    """The RSSI of the last received packet. Stored when the packet was received.
    The instantaneous RSSI value may not be accurate once the
    operating mode has been changed.
    """
    self.last_snr = 0.0
    """The SNR of the last received packet. Stored when the packet was received.
    The instantaneous SNR value may not be accurate once the
    operating mode has been changed.
    """
    # initialize timeouts and delays
    self.ack_wait = 0.1
    """The time to wait for an ACK after sending a packet. It is also the base of
    the randomized delay before the packet is sent again.
    """
    self.receive_timeout = 0.5
    """The amount of time to poll for a received packet.
    If no packet is received, the returned packet will be None
    """
    self.xmit_timeout = 2.0
    """The amount of time to wait for the HW to transmit the packet.
    This is mainly used to prevent a hang due to a HW issue
    """
    self.ack_retries = 5
    """The number of ACK retries before reporting a failure."""
    # No delay unless the user sets one, hence Optional.
    self.ack_delay: Optional[float] = None
    """The delay time before attempting to send an ACK.
    If ACKs are being missed try setting this to .1 or .2.
    """
    # initialize sequence number counter for reliable datagram mode
    self.sequence_number = 0
    # create seen Ids list: last identifier seen from each of the 256 source addresses
    self.seen_ids = bytearray(256)
    # initialize packet header
    # node address - default is broadcast
    self.node = _RH_BROADCAST_ADDRESS
    """The default address of this Node. (0-255).
    If not 255 (0xff) then only packets addressed to this node will be accepted.
    First byte of the RadioHead header.
    """
    # destination address - default is broadcast
    self.destination = _RH_BROADCAST_ADDRESS
    """The default destination address for packet transmissions. (0-255).
    If 255 (0xff) then any receiving node should accept the packet.
    Second byte of the RadioHead header.
    """
    # ID - contains seq count for reliable datagram mode
    self.identifier = 0
    """Automatically set to the sequence number when send_with_ack() used.
    Third byte of the RadioHead header.
    """
    # flags - identifies ack/retry packet for reliable datagram mode
    self.flags = 0
    """Upper 4 bits reserved for use by Reliable Datagram Mode.
    Lower 4 bits may be used to pass information.
    Fourth byte of the RadioHead header.
    """
    self.radiohead = True
    """Enable RadioHead compatibility"""
    self.crc_error_count = 0
    """Count of received packets that were dropped because of a CRC error."""
    self.timeout_poll = 0.001
    """The time, in seconds, to sleep between polls while waiting on the radio."""
# pylint: enable-msg=too-many-arguments
# Global buffer for SPI commands
_BUFFER = bytearray(4)
# pylint: disable=no-member
# Reconsider pylint: disable when this can be tested
def read_into(self, address: int, buf: WriteableBuffer, length: Optional[int] = None) -> None:
    """Fill ``buf`` with bytes read starting at register ``address``.

    :param address: register address; its top bit is cleared before sending.
    :param buf: destination buffer.
    :param length: number of bytes to read; ``None`` (the default) reads
        ``len(buf)`` bytes.
    """
    count = len(buf) if length is None else length
    with self.spi_device as device:
        # A cleared top bit in the address byte marks the transfer as a read.
        self._BUFFER[0] = address & 0x7F
        device.write(self._BUFFER, end=1)
        device.readinto(buf, end=count)
def read_u8(self, address: int) -> int:
    """Return the single byte stored at register ``address``."""
    scratch = self._BUFFER
    # The shared command buffer doubles as the one-byte destination.
    self.read_into(address, scratch, length=1)
    return scratch[0]
def write_from(self, address: int, buf: ReadableBuffer, length: Optional[int] = None) -> None:
    """Write bytes from ``buf`` starting at register ``address``.

    :param address: register address; its top bit is set before sending.
    :param buf: source buffer.
    :param length: number of bytes to write; ``None`` (the default) writes
        all of ``buf``.
    """
    count = len(buf) if length is None else length
    with self.spi_device as device:
        # A set top bit in the address byte marks the transfer as a write.
        self._BUFFER[0] = (address | 0x80) & 0xFF
        device.write(self._BUFFER, end=1)
        device.write(buf, end=count)
def write_u8(self, address: int, val: int) -> None:
    """Write one byte to a register.

    :param address: 7-bit register address.
    :param val: value to store; only its low 8 bits are sent.
    """
    with self.spi_device as device:
        command = self._BUFFER
        # Address byte with the top bit set (write), followed by the data byte.
        command[0] = (address | 0x80) & 0xFF
        command[1] = val & 0xFF
        device.write(command, end=2)
# pylint: disable=too-many-branches
async def asyncio_send(  # noqa: PLR0912 PLR0913
    self,
    data: ReadableBuffer,
    *,
    keep_listening: bool = False,
    destination: Optional[int] = None,
    node: Optional[int] = None,
    identifier: Optional[int] = None,
    flags: Optional[int] = None,
) -> bool:
    """Transmit ``data`` as a single packet.

    When the ``radiohead`` attribute is true a 4 byte RadioHead header
    (destination, node, identifier, flags) is placed in front of ``data``.
    Each header byte comes from the matching keyword argument when one is
    given and from the attribute of the same name otherwise; keyword values
    never modify the attributes.  When ``radiohead`` is false the packet is
    ``data`` itself, preceded by a single ``destination`` byte if that
    keyword was supplied.

    The complete payload must be non-empty and no longer than
    ``self.max_packet_length``; this is checked with an ``assert``.

    :param data: bytes to send.
    :param keep_listening: if true, go back to listening once the packet has
        been sent; otherwise (the default) the radio is left idle.
    :param destination: one-off override of the destination byte.
    :param node: one-off override of the sender byte.
    :param identifier: one-off override of the identifier byte.
    :param flags: one-off override of the flags byte.
    :return: ``True`` if the packet was sent, ``False`` if ``xmit_timeout``
        expired first.
    """
    # Stop receiving so the FIFO is cleared and stays clear.
    self.idle()

    if self.radiohead:
        header = bytearray(4)
        header[0] = self.destination if destination is None else destination
        header[1] = self.node if node is None else node
        header[2] = self.identifier if identifier is None else identifier
        header[3] = self.flags if flags is None else flags
        payload = header + data
    elif destination is None:
        payload = data
    else:
        # Non-RadioHead packets only carry an optional destination byte.
        payload = destination.to_bytes(1, "big") + data

    # pylint: disable=len-as-condition
    assert 0 < len(payload) <= self.max_packet_length
    # pylint: enable=len-as-condition

    self.fill_fifo(payload)
    # Switch to transmit mode; this is what puts the packet on the air.
    self.transmit()
    # No interrupts available here, so poll until the packet is reported sent.
    timed_out = await asyncio_check_timeout(
        self.packet_sent, self.xmit_timeout, self.timeout_poll
    )

    if keep_listening:
        self.listen()
    else:
        # Stay idle so that no further packets are received.
        self.idle()
    self.clear_interrupt()
    return not timed_out
# Blocking form of asyncio_send(): asyncio_to_blocking() runs the coroutine to
# completion with asyncio.run() and returns its result.
# NOTE(review): presumably intended for callers that are not already inside a
# running event loop - confirm.
send = asyncio_to_blocking(asyncio_send)
"""Non-asyncio wrapper to Send a string of data using the transmitter
using the same arguments and keywords as asyncio_send()
"""
async def asyncio_send_with_ack(self, data: ReadableBuffer) -> bool:
    """Reliable Datagram mode: send ``data`` and wait for the matching ACK.

    The sequence number is advanced once per call and used as the packet
    identifier.  The packet is sent up to ``ack_retries`` times (once if
    ``ack_retries`` is 0 or ``None``).  After each send the radio listens for
    ``ack_wait`` seconds for a packet that has the ACK flag set and carries
    the same identifier.  After a miss the method sleeps between one and two
    times ``ack_wait`` (randomized) and re-sends with the RETRY flag set.
    Packets sent to the broadcast address are never ACKed, so they are sent
    once and reported as successful.

    On return only the RETRY bit is cleared from the ``flags`` attribute;
    any other flag bits set by the caller are left as they were.

    :param data: bytes to send.
    :return: ``True`` if the packet was acknowledged (or was a broadcast),
        ``False`` if every attempt went unanswered.
    :raises RuntimeError: if RadioHead mode is disabled.
    """
    if not self.radiohead:
        raise RuntimeError("send_with_ack only supported in RadioHead mode")
    # Always make at least one attempt.
    retries_remaining = self.ack_retries if self.ack_retries else 1
    got_ack = False
    self.sequence_number = (self.sequence_number + 1) & 0xFF
    while not got_ack and retries_remaining:
        self.identifier = self.sequence_number
        await self.asyncio_send(data, keep_listening=True)
        if self.destination == _RH_BROADCAST_ADDRESS:
            # Don't look for ACK from Broadcast message
            got_ack = True
        else:
            # Wait for a reply, keeping the header so flags and ID can be checked.
            ack_packet = await self.asyncio_receive(timeout=self.ack_wait, with_header=True)
            if (
                ack_packet is not None
                and ack_packet[3] & _RH_FLAGS_ACK
                and ack_packet[2] == self.identifier
            ):
                got_ack = True
                break
        if not got_ack:
            # Random delay before the next try.
            await asyncio.sleep(self.ack_wait + self.ack_wait * random.random())
        retries_remaining -= 1
        # Any further transmission of this packet is a retry.
        self.flags |= _RH_FLAGS_RETRY
    # Drop only the RETRY bit that this method set; resetting the whole byte
    # would also erase the low bits that belong to the application.
    self.flags &= ~_RH_FLAGS_RETRY & 0xFF
    return got_ack
# Blocking form of asyncio_send_with_ack(): asyncio_to_blocking() runs the
# coroutine to completion with asyncio.run() and returns its result.
send_with_ack = asyncio_to_blocking(asyncio_send_with_ack)
"""Non-asyncio wrapper to Send a string of data using the transmitter
using the same arguments and keywords as asyncio_send_with_ack()
"""
async def asyncio_receive(  # noqa: PLR0912
    self,
    *,
    keep_listening: bool = True,
    with_header: bool = False,
    timeout: Optional[float] = None,
) -> Optional[bytearray]:
    """Wait for one packet and return its payload, or ``None``.

    ``None`` is returned when the timeout expires, when CRC checking is
    enabled and the packet fails it (``crc_error_count`` is incremented), and,
    in RadioHead mode, when the packet is shorter than 5 bytes or is addressed
    to neither this node nor the broadcast address.  A node whose own address
    is the broadcast address accepts every destination.

    In RadioHead mode the 4 byte header (To, From, ID, Flags) is removed
    unless ``with_header`` is true, in which case the payload starts at
    ``packet[4]``.

    :param keep_listening: if true (the default) the radio goes back to
        listening before returning, otherwise it is left idle.
    :param with_header: return the RadioHead header together with the payload.
    :param timeout: seconds to wait; ``None`` uses ``receive_timeout``.  If
        that is ``None`` as well, no waiting is done.
    :raises RuntimeError: if ``with_header`` is requested outside RadioHead mode.
    """
    if with_header and not self.radiohead:
        raise RuntimeError("with_header only supported for RadioHead mode")

    wait_limit = self.receive_timeout if timeout is None else timeout
    timed_out = False
    if wait_limit is not None:
        # Without interrupts the payload-ready flag has to be polled, so make
        # sure the radio is listening and then wait for it.
        self.listen()
        timed_out = await asyncio_check_timeout(
            self.payload_ready, wait_limit, self.timeout_poll
        )

    # Capture signal quality before the operating mode changes.
    self.last_rssi = self.rssi
    self.last_snr = self.snr
    # Go idle so that nothing else is received while the FIFO is handled.
    self.idle()

    packet = None
    if not timed_out:
        if self.enable_crc and self.crc_error:
            self.crc_error_count += 1
        else:
            packet = self.read_fifo()
            if packet is not None and self.radiohead:
                if len(packet) < 5:
                    # Too short to hold the RadioHead header plus any payload.
                    packet = None
                elif (
                    self.node != _RH_BROADCAST_ADDRESS  # noqa: PLR1714
                    and packet[0] != _RH_BROADCAST_ADDRESS
                    and packet[0] != self.node
                ):
                    # Addressed to some other node.
                    packet = None
                elif not with_header:
                    packet = packet[4:]

    if keep_listening:
        self.listen()
    else:
        # Stay idle so that no further packets are received.
        self.idle()
    self.clear_interrupt()
    return packet
# Blocking form of asyncio_receive(): asyncio_to_blocking() runs the coroutine
# to completion with asyncio.run() and returns its result.
receive = asyncio_to_blocking(asyncio_receive)
"""Non-asyncio wrapper to Receive a packet
using the same arguments and keywords as asyncio_receive()
"""
async def asyncio_receive_with_ack(  # noqa: PLR0912
    self,
    *,
    keep_listening: bool = True,
    with_header: bool = False,
    timeout: Optional[float] = None,
) -> Optional[bytearray]:
    """Wait to receive a RadioHead packet from the receiver then send an ACK packet in response.
    AKA Reliable Datagram mode.
    If a packet is found the payload bytes are returned, otherwise None is returned
    (which indicates the timeout elapsed with no reception).
    None is also returned for a packet that fails the CRC check, is shorter than 5 bytes,
    is addressed to another node, is itself an ACK, or is a retry whose identifier was
    already seen from the same sender (such a retry is still ACKed).
    Packets sent to the broadcast address are returned but never ACKed.
    If keep_listening is True (the default) the chip will immediately enter listening mode
    after receipt of a packet, otherwise it will fall back to idle mode and ignore
    any incoming packets until it is called again.
    All packets must have a 4-byte header for compatibility with the RadioHead library.
    The header consists of 4 bytes (To,From,ID,Flags). The default setting will strip
    the header before returning the packet to the caller.
    If with_header is True then the 4 byte header will be returned with the packet.
    The payload then begins at packet[4].
    Raises RuntimeError if RadioHead mode is disabled.
    """
    if not self.radiohead:
        raise RuntimeError("receive_with_ack only supported for RadioHead mode")
    timed_out = False
    if timeout is None:
        timeout = self.receive_timeout
    if timeout is not None:
        # Wait for the payloadready signal.  This is not ideal and will
        # surely miss or overflow the FIFO when packets aren't read fast
        # enough, however it's the best that can be done from Python without
        # interrupt supports.
        # Make sure we are listening for packets.
        self.listen()
        timed_out = await asyncio_check_timeout(self.payload_ready, timeout, self.timeout_poll)
    # Unless the wait timed out, payload ready is set and a packet is in the FIFO.
    packet = None
    # save last RSSI and SNR readings
    self.last_rssi = self.rssi
    self.last_snr = self.snr
    # Enter idle mode to stop receiving other packets.
    self.idle()
    if not timed_out:
        if self.enable_crc and self.crc_error:
            # Count the corrupted packet and drop it.
            self.crc_error_count += 1
        else:
            packet = self.read_fifo()
            if (packet is not None) and self.radiohead:
                if len(packet) < 5:
                    # reject the packet if it is too small to contain the RadioHead Header
                    packet = None
                if packet is not None:
                    # Drop packets for other nodes, unless this node uses the
                    # broadcast address and therefore accepts everything.
                    if (
                        self.node != _RH_BROADCAST_ADDRESS  # noqa: PLR1714
                        and packet[0] != _RH_BROADCAST_ADDRESS
                        and packet[0] != self.node
                    ):
                        packet = None
                    # send ACK unless this was an ACK or a broadcast
                    elif ((packet[3] & _RH_FLAGS_ACK) == 0) and (
                        packet[0] != _RH_BROADCAST_ADDRESS
                    ):
                        # delay before sending Ack to give receiver a chance to get ready
                        if self.ack_delay is not None:
                            await asyncio.sleep(self.ack_delay)
                        # send ACK packet to sender (data is b'!'); the header swaps
                        # To/From, repeats the identifier and adds the ACK flag
                        await self.asyncio_send(
                            b"!",
                            destination=packet[1],
                            node=packet[0],
                            identifier=packet[2],
                            flags=(packet[3] | _RH_FLAGS_ACK),
                        )
                        # reject Retries if we have seen this identifier from this source before
                        # (this happens after the ACK above, so duplicates are still ACKed)
                        if (self.seen_ids[packet[1]] == packet[2]) and (
                            packet[3] & _RH_FLAGS_RETRY
                        ):
                            packet = None
                        else:  # save the packet identifier for this source
                            self.seen_ids[packet[1]] = packet[2]
                    if (
                        packet is not None and (packet[3] & _RH_FLAGS_ACK) != 0
                    ):  # Ignore it if it was an ACK packet
                        packet = None
                    if not with_header and packet is not None:  # skip the header if not wanted
                        packet = packet[4:]
    # Listen again if necessary and return the result packet.
    if keep_listening:
        self.listen()
    else:
        # Enter idle mode to stop receiving other packets.
        self.idle()
    self.clear_interrupt()
    return packet
# Blocking form of asyncio_receive_with_ack(): asyncio_to_blocking() runs the
# coroutine to completion with asyncio.run() and returns its result.
receive_with_ack = asyncio_to_blocking(asyncio_receive_with_ack)
"""Non-asyncio wrapper to Receive a packet
using the same arguments and keywords as asyncio_receive_with_ack()
"""