rtio: Add cqe per each sqe in transaction

Update the policy such that every completed sqe has a parallel cqe.
This has the primary purpose of making any reads in the sqe visible
to the consumer (since they might have different buffers).

Signed-off-by: Yuval Peress <peress@google.com>
This commit is contained in:
Yuval Peress 2023-03-20 22:13:06 -06:00 committed by Anas Nashif
parent af79019146
commit 80d70b4e96
4 changed files with 53 additions and 24 deletions

View file

@ -431,6 +431,11 @@ Libraries / Subsystems
correctly, allowing other transports or other parts of the application correctly, allowing other transports or other parts of the application
code to use it. code to use it.
* RTIO
* Added policy that every ``sqe`` will generate a ``cqe`` (previously an RTIO_SQE_TRANSACTION
entry would only trigger a ``cqe`` on the last ``sqe`` in the transaction.
HALs HALs
**** ****

View file

@ -223,17 +223,28 @@ void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
exc->task_status[task_id] |= CONEX_TASK_COMPLETE; exc->task_status[task_id] |= CONEX_TASK_COMPLETE;
} }
bool transaction = sqe->flags & RTIO_SQE_TRANSACTION; bool transaction;
do {
/* Capture the sqe information */
void *userdata = sqe->userdata;
while (transaction) {
sqe = rtio_spsc_next(r->sq, sqe);
transaction = sqe->flags & RTIO_SQE_TRANSACTION; transaction = sqe->flags & RTIO_SQE_TRANSACTION;
}
conex_sweep(r, exc); /* Release the sqe */
rtio_cqe_submit(r, result, sqe->userdata); conex_sweep(r, exc);
conex_prepare(r, exc);
conex_resume(r, exc); /* Submit the completion event */
rtio_cqe_submit(r, result, userdata);
conex_prepare(r, exc);
conex_resume(r, exc);
if (transaction) {
/* sqe was a transaction, get the next one */
sqe = rtio_spsc_next(r->sq, sqe);
__ASSERT_NO_MSG(sqe != NULL);
}
} while (transaction);
k_spin_unlock(&exc->lock, key); k_spin_unlock(&exc->lock, key);
} }
@ -248,6 +259,7 @@ void rtio_concurrent_err(struct rtio_iodev_sqe *iodev_sqe, int result)
const struct rtio_sqe *sqe = iodev_sqe->sqe; const struct rtio_sqe *sqe = iodev_sqe->sqe;
struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor; struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor;
void *userdata = sqe->userdata; void *userdata = sqe->userdata;
uint32_t flags = rtio_cqe_compute_flags(iodev_sqe);
bool chained = sqe->flags & RTIO_SQE_CHAINED; bool chained = sqe->flags & RTIO_SQE_CHAINED;
bool transaction = sqe->flags & RTIO_SQE_TRANSACTION; bool transaction = sqe->flags & RTIO_SQE_TRANSACTION;
uint16_t task_id = conex_task_id(exc, iodev_sqe); uint16_t task_id = conex_task_id(exc, iodev_sqe);
@ -260,7 +272,7 @@ void rtio_concurrent_err(struct rtio_iodev_sqe *iodev_sqe, int result)
key = k_spin_lock(&exc->lock); key = k_spin_lock(&exc->lock);
if (!transaction) { if (!transaction) {
rtio_cqe_submit(r, result, userdata); rtio_cqe_submit(r, result, userdata, flags);
} }
/* While the last sqe was marked as chained or transactional, do more work */ /* While the last sqe was marked as chained or transactional, do more work */
@ -271,9 +283,9 @@ void rtio_concurrent_err(struct rtio_iodev_sqe *iodev_sqe, int result)
userdata = sqe->userdata; userdata = sqe->userdata;
if (!transaction) { if (!transaction) {
rtio_cqe_submit(r, result, userdata); rtio_cqe_submit(r, result, userdata, flags);
} else { } else {
rtio_cqe_submit(r, -ECANCELED, userdata); rtio_cqe_submit(r, -ECANCELED, userdata, flags);
} }
} }

View file

@ -74,6 +74,7 @@ void rtio_simple_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
{ {
struct rtio *r = iodev_sqe->r; struct rtio *r = iodev_sqe->r;
const struct rtio_sqe *sqe = iodev_sqe->sqe; const struct rtio_sqe *sqe = iodev_sqe->sqe;
bool transaction;
#ifdef CONFIG_ASSERT #ifdef CONFIG_ASSERT
struct rtio_simple_executor *exc = struct rtio_simple_executor *exc =
@ -82,21 +83,27 @@ void rtio_simple_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
__ASSERT_NO_MSG(iodev_sqe == &exc->task); __ASSERT_NO_MSG(iodev_sqe == &exc->task);
#endif #endif
bool transaction = sqe->flags & RTIO_SQE_TRANSACTION; do {
/* Capture the sqe information */
void *userdata = sqe->userdata;
while (transaction) {
rtio_spsc_release(r->sq);
sqe = rtio_spsc_consume(r->sq);
__ASSERT_NO_MSG(sqe != NULL);
transaction = sqe->flags & RTIO_SQE_TRANSACTION; transaction = sqe->flags & RTIO_SQE_TRANSACTION;
}
void *userdata = sqe->userdata; /* Release the sqe */
rtio_spsc_release(r->sq);
/* Submit the completion event */
rtio_cqe_submit(r, result, userdata);
if (transaction) {
/* sqe was a transaction, get the next one */
sqe = rtio_spsc_consume(r->sq);
__ASSERT_NO_MSG(sqe != NULL);
}
} while (transaction);
rtio_spsc_release(r->sq);
iodev_sqe->sqe = NULL; iodev_sqe->sqe = NULL;
rtio_cqe_submit(r, result, userdata);
rtio_simple_submit(r); rtio_simple_submit(r);
} }

View file

@ -349,16 +349,21 @@ void test_rtio_transaction_(struct rtio *r)
&userdata[1]); &userdata[1]);
TC_PRINT("submitting userdata 0 %p, userdata 1 %p\n", &userdata[0], &userdata[1]); TC_PRINT("submitting userdata 0 %p, userdata 1 %p\n", &userdata[0], &userdata[1]);
res = rtio_submit(r, 2); res = rtio_submit(r, 4);
TC_PRINT("checking cq, completions available %lu\n", rtio_spsc_consumable(r->cq)); TC_PRINT("checking cq, completions available %lu\n", rtio_spsc_consumable(r->cq));
zassert_ok(res, "Should return ok from rtio_execute"); zassert_ok(res, "Should return ok from rtio_execute");
zassert_equal(rtio_spsc_consumable(r->cq), 2, "Should have 2 pending completions"); zassert_equal(rtio_spsc_consumable(r->cq), 4, "Should have 4 pending completions");
for (int i = 0; i < 2; i++) { for (int i = 0; i < 4; i++) {
TC_PRINT("consume %d\n", i); TC_PRINT("consume %d\n", i);
cqe = rtio_spsc_consume(r->cq); cqe = rtio_spsc_consume(r->cq);
zassert_not_null(cqe, "Expected a valid cqe"); zassert_not_null(cqe, "Expected a valid cqe");
zassert_ok(cqe->result, "Result should be ok"); zassert_ok(cqe->result, "Result should be ok");
if (i % 2 == 0) {
zassert_is_null(cqe->userdata);
rtio_spsc_release(r->cq);
continue;
}
uintptr_t idx = *(uintptr_t *)cqe->userdata; uintptr_t idx = *(uintptr_t *)cqe->userdata;
TC_PRINT("userdata is %p, value %lu\n", cqe->userdata, idx); TC_PRINT("userdata is %p, value %lu\n", cqe->userdata, idx);