From 80d70b4e968ce56c702d97b7059d4cb79b1392de Mon Sep 17 00:00:00 2001 From: Yuval Peress Date: Mon, 20 Mar 2023 22:13:06 -0600 Subject: [PATCH] rtio: Add cqe per each sqe in transaction Update the policy such that every completed sqe has a parallel cqe. This has the primary purpose of making any reads in the sqe visible to the consumer (since they might have different buffers). Signed-off-by: Yuval Peress --- doc/releases/release-notes-3.4.rst | 5 +++ subsys/rtio/rtio_executor_concurrent.c | 34 +++++++++++++------ subsys/rtio/rtio_executor_simple.c | 27 +++++++++------ .../subsys/rtio/rtio_api/src/test_rtio_api.c | 11 ++++-- 4 files changed, 53 insertions(+), 24 deletions(-) diff --git a/doc/releases/release-notes-3.4.rst b/doc/releases/release-notes-3.4.rst index 5e60648d904..27cec6ee088 100644 --- a/doc/releases/release-notes-3.4.rst +++ b/doc/releases/release-notes-3.4.rst @@ -431,6 +431,11 @@ Libraries / Subsystems correctly, allowing other transports or other parts of the application code to use it. +* RTIO + + * Added policy that every ``sqe`` will generate a ``cqe`` (previously an RTIO_SQE_TRANSACTION + entry would only trigger a ``cqe`` on the last ``sqe`` in the transaction. + HALs **** diff --git a/subsys/rtio/rtio_executor_concurrent.c b/subsys/rtio/rtio_executor_concurrent.c index 4adbeebdac8..ca614093ea9 100644 --- a/subsys/rtio/rtio_executor_concurrent.c +++ b/subsys/rtio/rtio_executor_concurrent.c @@ -223,17 +223,28 @@ void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result) exc->task_status[task_id] |= CONEX_TASK_COMPLETE; } - bool transaction = sqe->flags & RTIO_SQE_TRANSACTION; + bool transaction; + + do { + /* Capture the sqe information */ + void *userdata = sqe->userdata; - while (transaction) { - sqe = rtio_spsc_next(r->sq, sqe); transaction = sqe->flags & RTIO_SQE_TRANSACTION; - } - conex_sweep(r, exc); - rtio_cqe_submit(r, result, sqe->userdata); - conex_prepare(r, exc); - conex_resume(r, exc); + /* Release the sqe */ + conex_sweep(r, exc); + + /* Submit the completion event */ + rtio_cqe_submit(r, result, userdata); + conex_prepare(r, exc); + conex_resume(r, exc); + + if (transaction) { + /* sqe was a transaction, get the next one */ + sqe = rtio_spsc_next(r->sq, sqe); + __ASSERT_NO_MSG(sqe != NULL); + } + } while (transaction); k_spin_unlock(&exc->lock, key); } @@ -248,6 +259,7 @@ void rtio_concurrent_err(struct rtio_iodev_sqe *iodev_sqe, int result) const struct rtio_sqe *sqe = iodev_sqe->sqe; struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor; void *userdata = sqe->userdata; + uint32_t flags = rtio_cqe_compute_flags(iodev_sqe); bool chained = sqe->flags & RTIO_SQE_CHAINED; bool transaction = sqe->flags & RTIO_SQE_TRANSACTION; uint16_t task_id = conex_task_id(exc, iodev_sqe); @@ -260,7 +272,7 @@ void rtio_concurrent_err(struct rtio_iodev_sqe *iodev_sqe, int result) key = k_spin_lock(&exc->lock); if (!transaction) { - rtio_cqe_submit(r, result, userdata); + rtio_cqe_submit(r, result, userdata, flags); } /* While the last sqe was marked as chained or transactional, do more work */ @@ -271,9 +283,9 @@ void rtio_concurrent_err(struct rtio_iodev_sqe *iodev_sqe, int result) userdata = sqe->userdata; if (!transaction) { - rtio_cqe_submit(r, result, userdata); + rtio_cqe_submit(r, result, userdata, flags); } else { - rtio_cqe_submit(r, -ECANCELED, userdata); + rtio_cqe_submit(r, -ECANCELED, userdata, flags); } } diff --git a/subsys/rtio/rtio_executor_simple.c b/subsys/rtio/rtio_executor_simple.c index bf2c6c49014..8c9bd02ecea 100644 --- a/subsys/rtio/rtio_executor_simple.c +++ b/subsys/rtio/rtio_executor_simple.c @@ -74,6 +74,7 @@ void rtio_simple_ok(struct rtio_iodev_sqe *iodev_sqe, int result) { struct rtio *r = iodev_sqe->r; const struct rtio_sqe *sqe = iodev_sqe->sqe; + bool transaction; #ifdef CONFIG_ASSERT struct rtio_simple_executor *exc = @@ -82,21 +83,27 @@ void rtio_simple_ok(struct rtio_iodev_sqe *iodev_sqe, int result) __ASSERT_NO_MSG(iodev_sqe == &exc->task); #endif - bool transaction = sqe->flags & RTIO_SQE_TRANSACTION; + do { + /* Capture the sqe information */ + void *userdata = sqe->userdata; - while (transaction) { - rtio_spsc_release(r->sq); - sqe = rtio_spsc_consume(r->sq); - __ASSERT_NO_MSG(sqe != NULL); transaction = sqe->flags & RTIO_SQE_TRANSACTION; - } - void *userdata = sqe->userdata; + /* Release the sqe */ + rtio_spsc_release(r->sq); + + /* Submit the completion event */ + rtio_cqe_submit(r, result, userdata); + + if (transaction) { + /* sqe was a transaction, get the next one */ + sqe = rtio_spsc_consume(r->sq); + __ASSERT_NO_MSG(sqe != NULL); + } + + } while (transaction); - rtio_spsc_release(r->sq); iodev_sqe->sqe = NULL; - - rtio_cqe_submit(r, result, userdata); rtio_simple_submit(r); } diff --git a/tests/subsys/rtio/rtio_api/src/test_rtio_api.c b/tests/subsys/rtio/rtio_api/src/test_rtio_api.c index 86b198a6589..84cbf86a8e1 100644 --- a/tests/subsys/rtio/rtio_api/src/test_rtio_api.c +++ b/tests/subsys/rtio/rtio_api/src/test_rtio_api.c @@ -349,16 +349,21 @@ void test_rtio_transaction_(struct rtio *r) &userdata[1]); TC_PRINT("submitting userdata 0 %p, userdata 1 %p\n", &userdata[0], &userdata[1]); - res = rtio_submit(r, 2); + res = rtio_submit(r, 4); TC_PRINT("checking cq, completions available %lu\n", rtio_spsc_consumable(r->cq)); zassert_ok(res, "Should return ok from rtio_execute"); - zassert_equal(rtio_spsc_consumable(r->cq), 2, "Should have 2 pending completions"); + zassert_equal(rtio_spsc_consumable(r->cq), 4, "Should have 4 pending completions"); - for (int i = 0; i < 2; i++) { + for (int i = 0; i < 4; i++) { TC_PRINT("consume %d\n", i); cqe = rtio_spsc_consume(r->cq); zassert_not_null(cqe, "Expected a valid cqe"); zassert_ok(cqe->result, "Result should be ok"); + if (i % 2 == 0) { + zassert_is_null(cqe->userdata); + rtio_spsc_release(r->cq); + continue; + } uintptr_t idx = *(uintptr_t *)cqe->userdata; TC_PRINT("userdata is %p, value %lu\n", cqe->userdata, idx);