rtio: Add a managed memory pool for reads

- Introduce a new Kconfig to enable mempool in RTIO
- Introduce a new RTIO_DEFINE_WITH_MEMPOOL to allocate an RTIO context
  with an associated memory pool.
- Add a new sqe prep function rtio_sqe_prep_read_with_pool() for memory
  pool enabled RTIO contexts (see the usage sketch below)
- Allow IODevs to allocate only the memory they need via rtio_sqe_rx_buf()
- Allow the consumer to get the allocated buffer via
  rtio_cqe_get_mempool_buffer()
- Consumers need to release the buffer via rtio_release_buffer() when
  processing is complete.
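
For reference, a minimal consumer-side sketch of the intended flow. The context name, queue and pool
sizes, the iodev, and the executor header path are placeholders/assumptions, and most error handling
is trimmed:

#include <zephyr/rtio/rtio.h>
#include <zephyr/rtio/rtio_executor_simple.h>

RTIO_EXECUTOR_SIMPLE_DEFINE(my_exec);
RTIO_DEFINE_WITH_MEMPOOL(my_rtio, (struct rtio_executor *)&my_exec,
    4 /* sq_sz */, 4 /* cq_sz */,
    4 /* num_blks */, 16 /* blk_size */, 4 /* balign */);

int read_once(const struct rtio_iodev *iodev)
{
    struct rtio_sqe sqe;
    struct rtio_cqe cqe;
    uint8_t *buf = NULL;
    uint32_t buf_len = 0;

    /* Queue a read whose buffer is borrowed from the context's mempool */
    rtio_sqe_prep_read_with_pool(&sqe, iodev, 0, NULL);
    rtio_sqe_copy_in(&my_rtio, &sqe, 1);
    rtio_submit(&my_rtio, 1);

    /* Fetch the completion and the buffer the iodev filled in */
    rtio_cqe_copy_out(&my_rtio, &cqe, 1, K_FOREVER);
    if (rtio_cqe_get_mempool_buffer(&my_rtio, &cqe, &buf, &buf_len) != 0) {
        return -EIO;
    }

    /* ... consume buf[0..buf_len) ... */

    /* Return the blocks to the pool once processing is complete */
    rtio_release_buffer(&my_rtio, buf);
    return 0;
}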

Signed-off-by: Yuval Peress <peress@google.com>
Author: Yuval Peress <peress@google.com>, 2023-04-06 22:49:25 -06:00
Committed by: Anas Nashif
Parent: 80d70b4e96
Commit: dbb470ea7a
10 changed files with 544 additions and 40 deletions


@@ -36,6 +36,7 @@
#include <zephyr/rtio/rtio_mpsc.h>
#include <zephyr/sys/__assert.h>
#include <zephyr/sys/atomic.h>
#include <zephyr/sys/mem_blocks.h>
#include <zephyr/device.h>
#include <zephyr/kernel.h>
#include <string.h>
@@ -114,6 +115,52 @@ extern "C" {
*/
#define RTIO_SQE_TRANSACTION BIT(1)
/**
* @brief The buffer should be allocated by the RTIO mempool
*
* This flag can only exist if the CONFIG_RTIO_SYS_MEM_BLOCKS Kconfig was
* enabled and the RTIO context was created via the RTIO_DEFINE_WITH_MEMPOOL()
* macro. If set, the buffer associated with the entry was allocated by the
* internal memory pool and should be released as soon as it is no longer
* needed via a call to rtio_release_buffer().
*/
#define RTIO_SQE_MEMPOOL_BUFFER BIT(2)
/**
* @}
*/
/**
* @brief RTIO CQE Flags
* @defgroup rtio_cqe_flags RTIO CQE Flags
* @ingroup rtio_api
* @{
*/
/**
* @brief The entry's buffer was allocated from the RTIO's mempool
*
* If this bit is set, the buffer was allocated from the memory pool and should be recycled as
* soon as the application is done with it.
*/
#define RTIO_CQE_FLAG_MEMPOOL_BUFFER BIT(0)
/**
* @brief Get the value portion of the CQE flags
*
* @param flags The CQE flags value
* @return The value portion of the flags field.
*/
#define RTIO_CQE_FLAG_GET_VALUE(flags) FIELD_GET(GENMASK(31, 16), (flags))
/**
* @brief Prepare a value to be added to the CQE flags field.
*
* @param value The value to prepare
* @return A shifted and masked value that can be added to the flags field with an OR operator.
*/
#define RTIO_CQE_FLAG_PREP_VALUE(value) FIELD_PREP(GENMASK(31, 16), (value))
/**
* @}
*/
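
As a quick illustration of the packing above (not part of the diff): bit 0 flags a mempool-backed
completion while bits 16..31 carry a small value, the sqe index in this patch, which the consumer
side recovers later:

uint32_t flags = RTIO_CQE_FLAG_MEMPOOL_BUFFER | RTIO_CQE_FLAG_PREP_VALUE(5);

/* Bit 0 is set and the value 5 sits in bits 16..31 */
__ASSERT_NO_MSG(flags & RTIO_CQE_FLAG_MEMPOOL_BUFFER);
__ASSERT_NO_MSG(RTIO_CQE_FLAG_GET_VALUE(flags) == 5);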
@@ -209,6 +256,7 @@ struct rtio_sq {
struct rtio_cqe {
int32_t result; /**< Result from operation */
void *userdata; /**< Associated userdata with operation */
uint32_t flags; /**< Flags associated with the operation */
};
/**
@@ -269,6 +317,34 @@ struct rtio_executor {
const struct rtio_executor_api *api;
};
/**
* Internal state of the mempool sqe map entry.
*/
enum rtio_mempool_entry_state {
/** The SQE has no mempool buffer allocated */
RTIO_MEMPOOL_ENTRY_STATE_FREE = 0,
/** The SQE has an active mempool buffer allocated */
RTIO_MEMPOOL_ENTRY_STATE_ALLOCATED,
/** The SQE has a mempool buffer allocated that is currently owned by a CQE */
RTIO_MEMPOOL_ENTRY_STATE_ZOMBIE,
RTIO_MEMPOOL_ENTRY_STATE_COUNT,
};
/* Check that we can always fit the state in 2 bits */
BUILD_ASSERT(RTIO_MEMPOOL_ENTRY_STATE_COUNT < 4);
/**
* @brief An entry mapping a mempool entry to its sqe.
*/
struct rtio_mempool_map_entry {
/** The state of the sqe map entry */
enum rtio_mempool_entry_state state : 2;
/** The starting block index into the mempool buffer */
uint16_t block_idx : 15;
/** Number of blocks allocated starting at block_idx */
uint16_t block_count : 15;
};
/**
* @brief An RTIO queue pair that both the kernel and application work with
*
@@ -313,8 +389,40 @@ struct rtio {
/* Completion queue */
struct rtio_cq *cq;
#ifdef CONFIG_RTIO_SYS_MEM_BLOCKS
/* Memory pool associated with this RTIO context. */
struct sys_mem_blocks *mempool;
/* The size (in bytes) of a single block in the mempool */
uint32_t mempool_blk_size;
/* Map of allocated starting blocks */
struct rtio_mempool_map_entry *mempool_map;
#endif /* CONFIG_RTIO_SYS_MEM_BLOCKS */
};
/** The memory partition associated with all RTIO context information */
extern struct k_mem_partition rtio_partition;
/**
* @brief Compute the mempool block index for a given pointer
*
* @param[in] r RTIO context
* @param[in] ptr Memory pointer in the mempool
* @return Index of the mempool block associated with the pointer, or UINT16_MAX if the pointer
* does not fall within the mempool buffer.
*/
#ifdef CONFIG_RTIO_SYS_MEM_BLOCKS
static inline uint16_t __rtio_compute_mempool_block_index(const struct rtio *r, const void *ptr)
{
uintptr_t addr = (uintptr_t)ptr;
uintptr_t buff = (uintptr_t)r->mempool->buffer;
uint32_t buff_size = r->mempool->num_blocks * r->mempool_blk_size;
if (addr < buff || addr >= buff + buff_size) {
return UINT16_MAX;
}
return (addr - buff) / r->mempool_blk_size;
}
#endif
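
As a worked example with assumed numbers: with 16-byte blocks, a pointer 40 bytes past the start of
the pool's buffer maps to block index 2 (40 / 16, rounded down), while any pointer outside the
buffer yields UINT16_MAX.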
/**
* @brief IO device submission queue entry
@@ -408,6 +516,19 @@ static inline void rtio_sqe_prep_read(struct rtio_sqe *sqe,
sqe->userdata = userdata;
}
/**
* @brief Prepare a read op submission with context's mempool
*
* @see rtio_sqe_prep_read()
*/
static inline void rtio_sqe_prep_read_with_pool(struct rtio_sqe *sqe,
const struct rtio_iodev *iodev, int8_t prio,
void *userdata)
{
rtio_sqe_prep_read(sqe, iodev, prio, NULL, 0, userdata);
sqe->flags = RTIO_SQE_MEMPOOL_BUFFER;
}
/**
* @brief Prepare a write op submission
*/
@@ -530,6 +651,24 @@ static inline void rtio_sqe_prep_transceive(struct rtio_sqe *sqe,
.data = (iodev_data), \
}
/* clang-format off */
#define _RTIO_DEFINE(name, exec, sq_sz, cq_sz) \
IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM, \
(static K_SEM_DEFINE(_submit_sem_##name, 0, K_SEM_MAX_LIMIT))) \
IF_ENABLED(CONFIG_RTIO_CONSUME_SEM, \
(static K_SEM_DEFINE(_consume_sem_##name, 0, K_SEM_MAX_LIMIT))) \
RTIO_SQ_DEFINE(_sq_##name, sq_sz); \
RTIO_CQ_DEFINE(_cq_##name, cq_sz); \
STRUCT_SECTION_ITERABLE(rtio, name) = { \
.executor = (exec), \
.xcqcnt = ATOMIC_INIT(0), \
IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM, (.submit_sem = &_submit_sem_##name,)) \
IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM, (.submit_count = 0,)) \
IF_ENABLED(CONFIG_RTIO_CONSUME_SEM, (.consume_sem = &_consume_sem_##name,)) \
.sq = (struct rtio_sq *const)&_sq_##name, \
.cq = (struct rtio_cq *const)&_cq_##name,
/* clang-format on */
/**
* @brief Statically define and initialize an RTIO context
*
@@ -538,22 +677,58 @@ static inline void rtio_sqe_prep_transceive(struct rtio_sqe *sqe,
* @param sq_sz Size of the submission queue, must be power of 2
* @param cq_sz Size of the completion queue, must be power of 2
*/
#define RTIO_DEFINE(name, exec, sq_sz, cq_sz) \
IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM, \
(static K_SEM_DEFINE(_submit_sem_##name, 0, K_SEM_MAX_LIMIT))) \
IF_ENABLED(CONFIG_RTIO_CONSUME_SEM, \
(static K_SEM_DEFINE(_consume_sem_##name, 0, K_SEM_MAX_LIMIT))) \
RTIO_SQ_DEFINE(_sq_##name, sq_sz); \
RTIO_CQ_DEFINE(_cq_##name, cq_sz); \
STRUCT_SECTION_ITERABLE(rtio, name) = { \
.executor = (exec), \
.xcqcnt = ATOMIC_INIT(0), \
IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM, (.submit_sem = &_submit_sem_##name,)) \
IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM, (.submit_count = 0,)) \
IF_ENABLED(CONFIG_RTIO_CONSUME_SEM, (.consume_sem = &_consume_sem_##name,)) \
.sq = (struct rtio_sq *const)&_sq_##name, \
.cq = (struct rtio_cq *const)&_cq_##name, \
};
/* clang-format off */
#define RTIO_DEFINE(name, exec, sq_sz, cq_sz) \
_RTIO_DEFINE(name, exec, sq_sz, cq_sz) \
}
/* clang-format on */
/**
* @brief Allocate to bss if available
*
* If CONFIG_USERSPACE is selected, allocate to the rtio_partition bss. Maps to:
* K_APP_BMEM(rtio_partition) static
*
* If CONFIG_USERSPACE is disabled, allocate as plain static:
* static
*/
#define RTIO_BMEM COND_CODE_1(CONFIG_USERSPACE, (K_APP_BMEM(rtio_partition) static), (static))
/**
* @brief Allocate as initialized memory if available
*
* If CONFIG_USERSPACE is selected, allocate to the rtio_partition init. Maps to:
* K_APP_DMEM(rtio_partition) static
*
* If CONFIG_USERSPACE is disabled, allocate as plain static:
* static
*/
#define RTIO_DMEM COND_CODE_1(CONFIG_USERSPACE, (K_APP_DMEM(rtio_partition) static), (static))
/**
* @brief Statically define and initialize an RTIO context
*
* @param name Name of the RTIO
* @param exec Symbol for rtio_executor (pointer)
* @param sq_sz Size of the submission queue, must be power of 2
* @param cq_sz Size of the completion queue, must be power of 2
* @param num_blks Number of blocks in the memory pool
* @param blk_size The number of bytes in each block
* @param balign The block alignment
*/
/* clang-format off */
#define RTIO_DEFINE_WITH_MEMPOOL(name, exec, sq_sz, cq_sz, num_blks, blk_size, balign) \
RTIO_BMEM uint8_t __aligned(WB_UP(balign)) \
_mempool_buf_##name[num_blks*WB_UP(blk_size)]; \
_SYS_MEM_BLOCKS_DEFINE_WITH_EXT_BUF(_mempool_##name, WB_UP(blk_size), num_blks, \
_mempool_buf_##name, RTIO_DMEM); \
RTIO_BMEM struct rtio_mempool_map_entry _mempool_map_##name[sq_sz]; \
_RTIO_DEFINE(name, exec, sq_sz, cq_sz) \
.mempool = &_mempool_##name, \
.mempool_blk_size = WB_UP(blk_size), \
.mempool_map = _mempool_map_##name, \
}
/* clang-format on */
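
For instance (names and sizes are arbitrary, mirroring the test suite below), the following creates
a context whose mempool map has one slot per submission queue entry, so each in-flight sqe can hold
at most one allocation:

RTIO_EXECUTOR_SIMPLE_DEFINE(sensor_exec);
RTIO_DEFINE_WITH_MEMPOOL(sensor_rtio, (struct rtio_executor *)&sensor_exec,
    4,   /* sq_sz: submission queue entries */
    4,   /* cq_sz: completion queue entries */
    4,   /* num_blks: blocks in the pool */
    16,  /* blk_size: bytes per block, rounded up by WB_UP() */
    4);  /* balign: block alignment */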
/**
* @brief Set the executor of the rtio context
@@ -697,6 +872,85 @@ static inline void rtio_cqe_release_all(struct rtio *r)
rtio_spsc_release_all(r->cq);
}
/**
* @brief Compute the CQE flags from the rtio_iodev_sqe entry
*
* @param iodev_sqe The SQE entry in question.
* @return The value that should be set for the CQE's flags field.
*/
static inline uint32_t rtio_cqe_compute_flags(struct rtio_iodev_sqe *iodev_sqe)
{
uint32_t flags = 0;
ARG_UNUSED(iodev_sqe);
#ifdef CONFIG_RTIO_SYS_MEM_BLOCKS
if (iodev_sqe->sqe->op == RTIO_OP_RX && iodev_sqe->sqe->flags & RTIO_SQE_MEMPOOL_BUFFER) {
struct rtio *r = iodev_sqe->r;
int sqe_index = iodev_sqe->sqe - r->sq->buffer;
struct rtio_mempool_map_entry *map_entry = &r->mempool_map[sqe_index];
if (map_entry->state != RTIO_MEMPOOL_ENTRY_STATE_ALLOCATED) {
/* Not allocated to this sqe */
return flags;
}
flags |= RTIO_CQE_FLAG_MEMPOOL_BUFFER;
flags |= RTIO_CQE_FLAG_PREP_VALUE(sqe_index);
map_entry->state = RTIO_MEMPOOL_ENTRY_STATE_ZOMBIE;
}
#endif
return flags;
}
/**
* @brief Retrieve the mempool buffer that was allocated for the CQE.
*
* If the RTIO context contains a memory pool, and the SQE was created by calling
* rtio_sqe_prep_read_with_pool(), this function can be used to retrieve the memory associated with the
* read. Once processing is done, it should be released by calling rtio_release_buffer().
*
* @param[in] r RTIO context
* @param[in] cqe The CQE handling the event.
* @param[out] buff Pointer to the mempool buffer
* @param[out] buff_len Length of the allocated buffer
* @return 0 on success
* @return -EINVAL if the buffer wasn't allocated for this cqe
* @return -ENOTSUP if memory blocks are disabled
*/
__syscall int rtio_cqe_get_mempool_buffer(const struct rtio *r, struct rtio_cqe *cqe,
uint8_t **buff, uint32_t *buff_len);
static inline int z_impl_rtio_cqe_get_mempool_buffer(const struct rtio *r, struct rtio_cqe *cqe,
uint8_t **buff, uint32_t *buff_len)
{
#ifdef CONFIG_RTIO_SYS_MEM_BLOCKS
if (cqe->flags & RTIO_CQE_FLAG_MEMPOOL_BUFFER) {
int map_idx = RTIO_CQE_FLAG_GET_VALUE(cqe->flags);
struct rtio_mempool_map_entry *entry = &r->mempool_map[map_idx];
if (entry->state != RTIO_MEMPOOL_ENTRY_STATE_ZOMBIE) {
return -EINVAL;
}
*buff = r->mempool->buffer + entry->block_idx * r->mempool_blk_size;
*buff_len = entry->block_count * r->mempool_blk_size;
__ASSERT_NO_MSG(*buff >= r->mempool->buffer);
__ASSERT_NO_MSG(*buff <
r->mempool->buffer + r->mempool_blk_size * r->mempool->num_blocks);
return 0;
}
return -EINVAL;
#else
ARG_UNUSED(r);
ARG_UNUSED(cqe);
ARG_UNUSED(buff);
ARG_UNUSED(buff_len);
return -ENOTSUP;
#endif
}
/**
* @brief Inform the executor of a submission completion with success
@@ -751,8 +1005,9 @@ static inline void rtio_iodev_cancel_all(struct rtio_iodev *iodev)
* @param r RTIO context
* @param result Integer result code (could be -errno)
* @param userdata Userdata to pass along to completion
* @param flags Flags to use for the CQE, see RTIO_CQE_FLAG_*
*/
static inline void rtio_cqe_submit(struct rtio *r, int result, void *userdata)
static inline void rtio_cqe_submit(struct rtio *r, int result, void *userdata, uint32_t flags)
{
struct rtio_cqe *cqe = rtio_spsc_acquire(r->cq);
@@ -761,6 +1016,7 @@ static inline void rtio_cqe_submit(struct rtio *r, int result, void *userdata)
} else {
cqe->result = result;
cqe->userdata = userdata;
cqe->flags = flags;
rtio_spsc_produce(r->cq);
}
#ifdef CONFIG_RTIO_SUBMIT_SEM
@@ -776,6 +1032,115 @@ static inline void rtio_cqe_submit(struct rtio *r, int result, void *userdata)
#endif
}
#define __RTIO_MEMPOOL_GET_NUM_BLKS(num_bytes, blk_size) (((num_bytes) + (blk_size)-1) / (blk_size))
/**
* @brief Get the buffer associated with the RX submission
*
* @param[in] iodev_sqe The submission to probe
* @param[in] min_buf_len The minimum number of bytes needed for the operation
* @param[in] max_buf_len The maximum number of bytes the operation can make use of
* @param[out] buf Where to store the pointer to the buffer
* @param[out] buf_len Where to store the size of the buffer
*
* @return 0 if @p buf and @p buf_len were successfully filled
* @return -ENOMEM Not enough memory for @p min_buf_len
*/
static inline int rtio_sqe_rx_buf(const struct rtio_iodev_sqe *iodev_sqe, uint32_t min_buf_len,
uint32_t max_buf_len, uint8_t **buf, uint32_t *buf_len)
{
const struct rtio_sqe *sqe = iodev_sqe->sqe;
#ifdef CONFIG_RTIO_SYS_MEM_BLOCKS
if (sqe->op == RTIO_OP_RX && sqe->flags & RTIO_SQE_MEMPOOL_BUFFER) {
struct rtio *r = iodev_sqe->r;
uint32_t blk_size = r->mempool_blk_size;
struct sys_mem_blocks *pool = r->mempool;
uint32_t bytes = max_buf_len;
int sqe_index = sqe - r->sq->buffer;
struct rtio_mempool_map_entry *map_entry = &r->mempool_map[sqe_index];
if (map_entry->state == RTIO_MEMPOOL_ENTRY_STATE_ALLOCATED) {
if (map_entry->block_count * blk_size < min_buf_len) {
return -ENOMEM;
}
*buf = &r->mempool->buffer[map_entry->block_idx * blk_size];
*buf_len = map_entry->block_count * blk_size;
return 0;
}
do {
size_t num_blks = ceiling_fraction(bytes, blk_size);
int rc = sys_mem_blocks_alloc_contiguous(pool, num_blks, (void **)buf);
if (rc == 0) {
*buf_len = num_blks * blk_size;
map_entry->state = RTIO_MEMPOOL_ENTRY_STATE_ALLOCATED;
map_entry->block_idx = __rtio_compute_mempool_block_index(r, *buf);
map_entry->block_count = num_blks;
return 0;
}
if (bytes == min_buf_len) {
break;
}
bytes = (bytes + min_buf_len) / 2;
} while (bytes >= min_buf_len);
return -ENOMEM;
}
#endif
if (sqe->buf_len < min_buf_len) {
return -ENOMEM;
}
*buf = sqe->buf;
*buf_len = sqe->buf_len;
return 0;
}
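
From the iodev author's side, a sketch of how an RX completion path might use this helper. The fixed
16-byte transfer length and the handler name are assumptions for the example:

static void my_iodev_handle_rx(struct rtio_iodev_sqe *iodev_sqe)
{
    uint8_t *buf;
    uint32_t buf_len;
    /* With a mempool-backed sqe this allocates blocks; otherwise it hands
     * back the caller-supplied buffer.
     */
    int rc = rtio_sqe_rx_buf(iodev_sqe, 16, 16, &buf, &buf_len);

    if (rc != 0) {
        rtio_iodev_sqe_err(iodev_sqe, rc);
        return;
    }

    /* ... fill buf with up to buf_len bytes from the device ... */

    rtio_iodev_sqe_ok(iodev_sqe, 0);
}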
/**
* @brief Release memory that was allocated by the RTIO's memory pool
*
* If the RTIO context was created by a call to RTIO_DEFINE_WITH_MEMPOOL(), then the cqe data might
* contain a buffer that's owned by the RTIO context. In those cases (if the read request was
* configured via rtio_sqe_prep_read_with_pool()) the buffer must be returned to the pool.
*
* Call this function when processing is complete. This function will validate that the memory
* actually belongs to the RTIO context and will ignore invalid arguments.
*
* @param r RTIO context
* @param buff Pointer to the buffer to be released.
*/
__syscall void rtio_release_buffer(struct rtio *r, void *buff);
static inline void z_impl_rtio_release_buffer(struct rtio *r, void *buff)
{
#ifdef CONFIG_RTIO_SYS_MEM_BLOCKS
if (r == NULL || buff == NULL || r->mempool == NULL) {
return;
}
uint16_t blk_index = __rtio_compute_mempool_block_index(r, buff);
if (blk_index == UINT16_MAX) {
return;
}
for (int i = 0; i < r->sq->_spsc.mask + 1; ++i) {
struct rtio_mempool_map_entry *entry = &r->mempool_map[i];
if (entry->block_idx == blk_index && entry->state != RTIO_MEMPOOL_ENTRY_STATE_FREE) {
/* Free every block of the allocation, not just the first one */
sys_mem_blocks_free_contiguous(r->mempool, buff, entry->block_count);
entry->state = RTIO_MEMPOOL_ENTRY_STATE_FREE;
break;
}
}
#endif
}
/**
* Grant access to an RTIO context to a user thread
*/


@@ -21,4 +21,9 @@ if(CONFIG_RTIO)
rtio_executor_concurrent.c
)
zephyr_library_sources_ifdef(
CONFIG_USERSPACE
rtio_userspace_init.c
)
endif()


@@ -38,6 +38,14 @@ config RTIO_CONSUME_SEM
will use polling on the completion queue with a k_yield() in between
iterations.
config RTIO_SYS_MEM_BLOCKS
bool "Include system memory blocks as an optional backing read memory pool"
select SYS_MEM_BLOCKS
help
Enable the RTIO_DEFINE_WITH_MEMPOOL macro which allows queueing reads
without a pre-allocated memory buffer. Instead the buffer will be taken
from the allocated memory pool associated with the RTIO context.
module = RTIO
module-str = RTIO
module-help = Sets log level for RTIO support


@@ -228,6 +228,7 @@ void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
do {
/* Capture the sqe information */
void *userdata = sqe->userdata;
uint32_t flags = rtio_cqe_compute_flags(iodev_sqe);
transaction = sqe->flags & RTIO_SQE_TRANSACTION;
@@ -235,9 +236,7 @@ void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
conex_sweep(r, exc);
/* Submit the completion event */
rtio_cqe_submit(r, result, userdata);
conex_prepare(r, exc);
conex_resume(r, exc);
rtio_cqe_submit(r, result, userdata, flags);
if (transaction) {
/* sqe was a transaction, get the next one */
@@ -245,6 +244,8 @@ void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
__ASSERT_NO_MSG(sqe != NULL);
}
} while (transaction);
conex_prepare(r, exc);
conex_resume(r, exc);
k_spin_unlock(&exc->lock, key);
}


@@ -86,6 +86,7 @@ void rtio_simple_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
do {
/* Capture the sqe information */
void *userdata = sqe->userdata;
uint32_t flags = rtio_cqe_compute_flags(iodev_sqe);
transaction = sqe->flags & RTIO_SQE_TRANSACTION;
@@ -93,7 +94,7 @@ void rtio_simple_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
rtio_spsc_release(r->sq);
/* Submit the completion event */
rtio_cqe_submit(r, result, userdata);
rtio_cqe_submit(r, result, userdata, flags);
if (transaction) {
/* sqe was a transaction, get the next one */
@@ -119,6 +120,7 @@ void rtio_simple_err(struct rtio_iodev_sqe *iodev_sqe, int result)
const struct rtio_sqe *sqe = iodev_sqe->sqe;
struct rtio *r = iodev_sqe->r;
void *userdata = sqe->userdata;
uint32_t flags = rtio_cqe_compute_flags(iodev_sqe);
bool chained = sqe->flags & RTIO_SQE_CHAINED;
bool transaction = sqe->flags & RTIO_SQE_TRANSACTION;
@@ -132,7 +134,7 @@ void rtio_simple_err(struct rtio_iodev_sqe *iodev_sqe, int result)
rtio_spsc_release(r->sq);
iodev_sqe->sqe = NULL;
if (!transaction) {
rtio_cqe_submit(r, result, userdata);
rtio_cqe_submit(r, result, userdata, flags);
}
while (chained | transaction) {
sqe = rtio_spsc_consume(r->sq);
@@ -142,9 +144,9 @@ void rtio_simple_err(struct rtio_iodev_sqe *iodev_sqe, int result)
rtio_spsc_release(r->sq);
if (!transaction) {
rtio_cqe_submit(r, result, userdata);
rtio_cqe_submit(r, result, userdata, flags);
} else {
rtio_cqe_submit(r, -ECANCELED, userdata);
rtio_cqe_submit(r, -ECANCELED, userdata, flags);
}
}


@@ -31,7 +31,9 @@ static inline bool rtio_vrfy_sqe(struct rtio_sqe *sqe)
valid_sqe &= Z_SYSCALL_MEMORY(sqe->buf, sqe->buf_len, false);
break;
case RTIO_OP_RX:
valid_sqe &= Z_SYSCALL_MEMORY(sqe->buf, sqe->buf_len, true);
if ((sqe->flags & RTIO_SQE_MEMPOOL_BUFFER) == 0) {
valid_sqe &= Z_SYSCALL_MEMORY(sqe->buf, sqe->buf_len, true);
}
break;
case RTIO_OP_TINY_TX:
break;
@@ -49,6 +51,24 @@ static inline bool rtio_vrfy_sqe(struct rtio_sqe *sqe)
return valid_sqe;
}
static inline void z_vrfy_rtio_release_buffer(struct rtio *r, void *buff)
{
Z_OOPS(Z_SYSCALL_OBJ(r, K_OBJ_RTIO));
z_impl_rtio_release_buffer(r, buff);
}
#include <syscalls/rtio_release_buffer_mrsh.c>
static inline int z_vrfy_rtio_cqe_get_mempool_buffer(const struct rtio *r, struct rtio_cqe *cqe,
uint8_t **buff, uint32_t *buff_len)
{
Z_OOPS(Z_SYSCALL_OBJ(r, K_OBJ_RTIO));
Z_OOPS(Z_SYSCALL_MEMORY_READ(cqe, sizeof(struct rtio_cqe)));
Z_OOPS(Z_SYSCALL_MEMORY_WRITE(buff, sizeof(void *)));
Z_OOPS(Z_SYSCALL_MEMORY_WRITE(buff_len, sizeof(uint32_t)));
return z_impl_rtio_cqe_get_mempool_buffer(r, cqe, buff, buff_len);
}
#include <syscalls/rtio_cqe_get_mempool_buffer_mrsh.c>
static inline int z_vrfy_rtio_sqe_copy_in(struct rtio *r,
const struct rtio_sqe *sqes,
size_t sqe_count)


@@ -0,0 +1,8 @@
/*
* Copyright (c) 2023 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
#include <zephyr/app_memory/app_memdomain.h>
K_APPMEM_PARTITION_DEFINE(rtio_partition);


@@ -2,3 +2,4 @@ CONFIG_ZTEST=y
CONFIG_ZTEST_NEW_API=y
CONFIG_LOG=y
CONFIG_RTIO=y
CONFIG_RTIO_SYS_MEM_BLOCKS=y


@@ -58,6 +58,22 @@ static void rtio_iodev_timer_fn(struct k_timer *tm)
struct rtio_iodev_sqe *iodev_sqe = data->iodev_sqe;
struct rtio_iodev *iodev = (struct rtio_iodev *)iodev_sqe->sqe->iodev;
if (data->sqe->op == RTIO_OP_RX) {
uint8_t *buf;
uint32_t buf_len;
int rc = rtio_sqe_rx_buf(iodev_sqe, 16, 16, &buf, &buf_len);
if (rc != 0) {
rtio_iodev_sqe_err(iodev_sqe, rc);
return;
}
for (int i = 0; i < 16; ++i) {
buf[i] = ((uint8_t *)iodev_sqe->sqe->userdata)[i];
}
}
if (data->sqe->flags & RTIO_SQE_TRANSACTION) {
data->sqe = rtio_spsc_next(data->iodev_sqe->r->sq, data->sqe);
k_timer_start(&data->timer, K_MSEC(10), K_NO_WAIT);


@@ -24,11 +24,17 @@
/* Repeat tests to ensure they are repeatable */
#define TEST_REPEATS 4
#define MEM_BLK_COUNT 4
#define MEM_BLK_SIZE 16
#define MEM_BLK_ALIGN 4
RTIO_EXECUTOR_SIMPLE_DEFINE(simple_exec_simp);
RTIO_DEFINE(r_simple_simp, (struct rtio_executor *)&simple_exec_simp, 4, 4);
RTIO_DEFINE_WITH_MEMPOOL(r_simple_simp, (struct rtio_executor *)&simple_exec_simp, 4, 4,
MEM_BLK_COUNT, MEM_BLK_SIZE, MEM_BLK_ALIGN);
RTIO_EXECUTOR_CONCURRENT_DEFINE(simple_exec_con, 1);
RTIO_DEFINE(r_simple_con, (struct rtio_executor *)&simple_exec_con, 4, 4);
RTIO_DEFINE_WITH_MEMPOOL(r_simple_con, (struct rtio_executor *)&simple_exec_con, 4, 4,
MEM_BLK_COUNT, MEM_BLK_SIZE, MEM_BLK_ALIGN);
RTIO_IODEV_TEST_DEFINE(iodev_test_simple);
@@ -224,13 +230,11 @@ ZTEST(rtio_api, test_rtio_multiple_chains)
}
#ifdef CONFIG_USERSPACE
K_APPMEM_PARTITION_DEFINE(rtio_partition);
K_APP_BMEM(rtio_partition) uint8_t syscall_bufs[4];
struct k_mem_domain rtio_domain;
#else
uint8_t syscall_bufs[4];
#endif
RTIO_BMEM uint8_t syscall_bufs[4];
RTIO_EXECUTOR_SIMPLE_DEFINE(syscall_simple);
RTIO_DEFINE(r_syscall, (struct rtio_executor *)&syscall_simple, 4, 4);
RTIO_IODEV_TEST_DEFINE(iodev_test_syscall);
@@ -268,18 +272,10 @@ void rtio_syscall_test(void *p1, void *p2, void *p3)
#ifdef CONFIG_USERSPACE
ZTEST(rtio_api, test_rtio_syscalls_usermode)
{
struct k_mem_partition *parts[] = {
#if Z_LIBC_PARTITION_EXISTS
&z_libc_partition,
#endif
&rtio_partition
};
TC_PRINT("syscalls from user mode test\n");
TC_PRINT("test iodev init\n");
rtio_iodev_test_init(&iodev_test_syscall);
TC_PRINT("mem domain init\n");
k_mem_domain_init(&rtio_domain, ARRAY_SIZE(parts), parts);
TC_PRINT("mem domain add current\n");
k_mem_domain_add_thread(&rtio_domain, k_current_get());
TC_PRINT("rtio access grant\n");
@@ -291,6 +287,77 @@ ZTEST(rtio_api, test_rtio_syscalls_usermode)
}
#endif /* CONFIG_USERSPACE */
RTIO_BMEM uint8_t mempool_data[MEM_BLK_SIZE];
static void test_rtio_simple_mempool_(struct rtio *r, int run_count)
{
int res;
struct rtio_sqe sqe;
struct rtio_cqe cqe;
for (int i = 0; i < MEM_BLK_SIZE; ++i) {
mempool_data[i] = i + run_count;
}
TC_PRINT("setting up single mempool read %p\n", r);
rtio_sqe_prep_read_with_pool(&sqe, (struct rtio_iodev *)&iodev_test_simple, 0,
mempool_data);
TC_PRINT("Calling rtio_sqe_copy_in()\n");
res = rtio_sqe_copy_in(r, &sqe, 1);
zassert_ok(res);
TC_PRINT("submit with wait\n");
res = rtio_submit(r, 1);
zassert_ok(res, "Should return ok from rtio_execute");
TC_PRINT("Calling rtio_cqe_copy_out\n");
zassert_equal(1, rtio_cqe_copy_out(r, &cqe, 1, K_FOREVER));
zassert_ok(cqe.result, "Result should be ok");
zassert_equal_ptr(cqe.userdata, mempool_data, "Expected userdata back");
uint8_t *buffer = NULL;
uint32_t buffer_len = 0;
TC_PRINT("Calling rtio_cqe_get_mempool_buffer\n");
zassert_ok(rtio_cqe_get_mempool_buffer(r, &cqe, &buffer, &buffer_len));
zassert_not_null(buffer, "Expected an allocated mempool buffer");
zassert_equal(buffer_len, MEM_BLK_SIZE);
zassert_mem_equal(buffer, mempool_data, MEM_BLK_SIZE, "Data expected to be the same");
TC_PRINT("Calling rtio_cqe_get_mempool_buffer\n");
rtio_release_buffer(r, buffer);
}
static void rtio_simple_mempool_test(void *a, void *b, void *c)
{
ARG_UNUSED(a);
ARG_UNUSED(b);
ARG_UNUSED(c);
TC_PRINT("rtio simple mempool simple\n");
for (int i = 0; i < TEST_REPEATS * 2; i++) {
test_rtio_simple_mempool_(&r_simple_simp, i);
}
TC_PRINT("rtio simple mempool concurrent\n");
for (int i = 0; i < TEST_REPEATS * 2; i++) {
test_rtio_simple_mempool_(&r_simple_con, i);
}
}
ZTEST(rtio_api, test_rtio_simple_mempool)
{
rtio_iodev_test_init(&iodev_test_simple);
#ifdef CONFIG_USERSPACE
k_mem_domain_add_thread(&rtio_domain, k_current_get());
rtio_access_grant(&r_simple_simp, k_current_get());
rtio_access_grant(&r_simple_con, k_current_get());
k_object_access_grant(&iodev_test_simple, k_current_get());
k_thread_user_mode_enter(rtio_simple_mempool_test, NULL, NULL, NULL);
#else
rtio_simple_mempool_test(NULL, NULL, NULL);
#endif
}
ZTEST(rtio_api, test_rtio_syscalls)
{
@@ -394,6 +461,17 @@ ZTEST(rtio_api, test_rtio_transaction)
}
}
static void *rtio_api_setup(void)
{
#ifdef CONFIG_USERSPACE
k_mem_domain_init(&rtio_domain, 0, NULL);
k_mem_domain_add_partition(&rtio_domain, &rtio_partition);
#if Z_LIBC_PARTITION_EXISTS
k_mem_domain_add_partition(&rtio_domain, &z_libc_partition);
#endif /* Z_LIBC_PARTITION_EXISTS */
#endif /* CONFIG_USERSPACE */
return NULL;
}
ZTEST_SUITE(rtio_api, NULL, NULL, NULL, NULL, NULL);
ZTEST_SUITE(rtio_api, NULL, rtio_api_setup, NULL, NULL, NULL);