kernel/pipe: implement direct-to-pending-readers data copy

If there are pending readers, it is best to perform a single data copy
directly into their final destination buffer rather than doing one copy
into the ring buffer just to immediately copy the same data out of it.

Incidentally, this allows for supporting pipes with no ring buffer at all.

The pipe implementation being deprecated has a similar capability so better
have it here too.

Signed-off-by: Nicolas Pitre <npitre@baylibre.com>
This commit is contained in:
Nicolas Pitre 2025-01-14 13:41:10 -05:00 committed by Benjamin Cabé
parent 4af80d72bc
commit 29ae9e3435
3 changed files with 93 additions and 15 deletions

View file

@ -19,7 +19,8 @@ is referenced by its memory address.
A pipe has the following key property: A pipe has the following key property:
* A **size** that indicates the capacity of the pipe's ring buffer. * A **size** that indicates the capacity of the pipe's ring buffer. Note that a
size of zero defines a pipe with no ring buffer.
A pipe must be initialized before it can be used. When initialized, the pipe A pipe must be initialized before it can be used. When initialized, the pipe
is empty. is empty.
@ -27,12 +28,16 @@ is empty.
Threads interact with the pipe as follows: Threads interact with the pipe as follows:
- **Writing**: Data is synchronously written, either in whole or in part, to - **Writing**: Data is synchronously written, either in whole or in part, to
a pipe by a thread. If the pipe's ring buffer is full, the operation blocks a pipe by a thread. Accepted data is either copied directly to the waiting
until sufficient space becomes available or the specified timeout expires. reader(s) or to the pipe's ring buffer. If the ring buffer is full or simply
absent, the operation blocks until sufficient space becomes available or
the specified timeout expires.
- **Reading**: Data is synchronously read, either in whole or in part, from a - **Reading**: Data is synchronously read, either in whole or in part, from a
pipe by a thread. If the pipe's ring buffer is empty, the operation blocks pipe by a thread. Accepted data is either copied from the pipe's ring buffer
until data becomes available or the specified timeout expires. or directly from the waiting sender(s). If the ring buffer is empty or simply
absent, the operation blocks until data becomes available or the specified
timeout expires.
- **Resetting**: A thread can reset a pipe, which resets its internal state and - **Resetting**: A thread can reset a pipe, which resets its internal state and
ends all pending read and write operations with an error code. ends all pending read and write operations with an error code.
@ -66,6 +71,9 @@ ring buffer:
This has the same effect as the code above. This has the same effect as the code above.
When no ring buffer is used, the buffer pointer argument should be NULL and
the size argument should be 0.
Writing to a Pipe Writing to a Pipe
================= =================

View file

@ -4998,8 +4998,8 @@ void k_mbox_data_get(struct k_mbox_msg *rx_msg, void *buffer);
* This routine initializes a pipe object, prior to its first use. * This routine initializes a pipe object, prior to its first use.
* *
* @param pipe Address of the pipe. * @param pipe Address of the pipe.
* @param buffer Address of the pipe's buffer. * @param buffer Address of the pipe's buffer, or NULL if no ring buffer is used.
* @param buffer_size Size of the pipe's buffer. * @param buffer_size Size of the pipe's buffer, or zero if no ring buffer is used.
*/ */
__syscall void k_pipe_init(struct k_pipe *pipe, uint8_t *buffer, size_t buffer_size); __syscall void k_pipe_init(struct k_pipe *pipe, uint8_t *buffer, size_t buffer_size);
@ -5247,7 +5247,8 @@ struct k_pipe {
* @code extern struct k_pipe <name>; @endcode * @code extern struct k_pipe <name>; @endcode
* *
* @param name Name of the pipe. * @param name Name of the pipe.
* @param pipe_buffer_size Size of the pipe's ring buffer (in bytes). * @param pipe_buffer_size Size of the pipe's ring buffer (in bytes)
* or zero if no ring buffer is used.
* @param pipe_align Alignment of the pipe's ring buffer (power of 2). * @param pipe_align Alignment of the pipe's ring buffer (power of 2).
* *
*/ */

View file

@ -79,6 +79,65 @@ void z_impl_k_pipe_init(struct k_pipe *pipe, uint8_t *buffer, size_t buffer_size
SYS_PORT_TRACING_OBJ_INIT(k_pipe, pipe, buffer, buffer_size); SYS_PORT_TRACING_OBJ_INIT(k_pipe, pipe, buffer, buffer_size);
} }
struct pipe_buf_spec {
uint8_t * const data;
const size_t len;
size_t used;
};
static size_t copy_to_pending_readers(struct k_pipe *pipe,
const uint8_t *data, size_t len)
{
struct k_thread *reader;
struct pipe_buf_spec *reader_buf;
size_t copy_size, written = 0;
/*
* Attempt a direct data copy to waiting readers if any.
* The copy has to be done under the scheduler lock to ensure all the
* needed data is copied to the target thread whose buffer spec lives
* on that thread's stack, and then the thread unpended only if it
* received all the data it wanted, without racing with a potential
* thread timeout/cancellation event.
*/
do {
LOCK_SCHED_SPINLOCK {
reader = _priq_wait_best(&pipe->data.waitq);
if (reader == NULL) {
K_SPINLOCK_BREAK;
}
reader_buf = reader->base.swap_data;
copy_size = MIN(len - written,
reader_buf->len - reader_buf->used);
memcpy(&reader_buf->data[reader_buf->used],
&data[written], copy_size);
written += copy_size;
reader_buf->used += copy_size;
if (reader_buf->used < reader_buf->len) {
/* This reader wants more: don't unpend. */
reader = NULL;
} else {
/*
* This reader has received all the data
* it was waiting for: wake it up with
* the scheduler lock still held.
*/
unpend_thread_no_timeout(reader);
z_abort_thread_timeout(reader);
}
}
if (reader != NULL) {
/* rest of thread wake-up outside the scheduler lock */
z_thread_return_value_set_with_data(reader, 0, NULL);
z_ready_thread(reader);
}
} while (reader != NULL && written < len);
return written;
}
int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_timeout_t timeout) int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_timeout_t timeout)
{ {
int rc; int rc;
@ -100,7 +159,14 @@ int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_
} }
if (pipe_empty(pipe)) { if (pipe_empty(pipe)) {
z_sched_wake(&pipe->data, 0, NULL); if (pipe->waiting != 0) {
written += copy_to_pending_readers(pipe, &data[written],
len - written);
if (written >= len) {
rc = written;
break;
}
}
#ifdef CONFIG_POLL #ifdef CONFIG_POLL
z_handle_obj_poll_events(&pipe->poll_events, z_handle_obj_poll_events(&pipe->poll_events,
K_POLL_STATE_PIPE_DATA_AVAILABLE); K_POLL_STATE_PIPE_DATA_AVAILABLE);
@ -129,8 +195,8 @@ exit:
int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout_t timeout) int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout_t timeout)
{ {
struct pipe_buf_spec buf = { data, len, 0 };
int rc; int rc;
size_t read = 0;
k_timepoint_t end = sys_timepoint_calc(timeout); k_timepoint_t end = sys_timepoint_calc(timeout);
k_spinlock_key_t key = k_spin_lock(&pipe->lock); k_spinlock_key_t key = k_spin_lock(&pipe->lock);
@ -146,21 +212,24 @@ int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout
z_sched_wake(&pipe->space, 0, NULL); z_sched_wake(&pipe->space, 0, NULL);
} }
read += ring_buf_get(&pipe->buf, &data[read], len - read); buf.used += ring_buf_get(&pipe->buf, &data[buf.used], len - buf.used);
if (likely(read == len)) { if (likely(buf.used == len)) {
rc = read; rc = buf.used;
break; break;
} }
if (unlikely(pipe_closed(pipe))) { if (unlikely(pipe_closed(pipe))) {
rc = read ? read : -EPIPE; rc = buf.used ? buf.used : -EPIPE;
break; break;
} }
/* provide our "direct copy" info to potential writers */
_current->base.swap_data = &buf;
rc = wait_for(&pipe->data, pipe, &key, end); rc = wait_for(&pipe->data, pipe, &key, end);
if (rc != 0) { if (rc != 0) {
if (rc == -EAGAIN) { if (rc == -EAGAIN) {
rc = read ? read : -EAGAIN; rc = buf.used ? buf.used : -EAGAIN;
} }
break; break;
} }