diff --git a/doc/kernel/services/data_passing/pipes.rst b/doc/kernel/services/data_passing/pipes.rst index b3130687db5..abe9ab1800f 100644 --- a/doc/kernel/services/data_passing/pipes.rst +++ b/doc/kernel/services/data_passing/pipes.rst @@ -4,8 +4,8 @@ Pipes ##### A :dfn:`pipe` is a kernel object that allows a thread to send a byte stream -to another thread. Pipes can be used to synchronously transfer chunks of data -in whole or in part. +to another thread. Pipes enable efficient inter-thread communication and can +be used to synchronously transfer chunks of data in whole or in part. .. contents:: :local: @@ -14,172 +14,171 @@ in whole or in part. Concepts ******** -The pipe can be configured with a ring buffer which holds data that has been -sent but not yet received; alternatively, the pipe may have no ring buffer. - -Any number of pipes can be defined (limited only by available RAM). Each pipe is -referenced by its memory address. +Any number of pipes can be defined, limited only by available RAM. Each pipe +is referenced by its memory address. A pipe has the following key property: -* A **size** that indicates the size of the pipe's ring buffer. Note that a - size of zero defines a pipe with no ring buffer. +* A **size** that indicates the capacity of the pipe's ring buffer. -A pipe must be initialized before it can be used. The pipe is initially empty. +A pipe must be initialized before it can be used. When initialized, the pipe +is empty. -Data is synchronously **sent** either in whole or in part to a pipe by a -thread. If the specified minimum number of bytes can not be immediately -satisfied, then the operation will either fail immediately or attempt to send -as many bytes as possible and then pend in the hope that the send can be -completed later. Accepted data is either copied to the pipe's ring buffer -or directly to the waiting reader(s). +Threads interact with the pipe as follows: -Data is synchronously **received** from a pipe by a thread. If the specified -minimum number of bytes can not be immediately satisfied, then the operation -will either fail immediately or attempt to receive as many bytes as possible -and then pend in the hope that the receive can be completed later. Accepted -data is either copied from the pipe's ring buffer or directly from the -waiting sender(s). +- **Writing**: Data is synchronously written, either in whole or in part, to + a pipe by a thread. If the pipe's ring buffer is full, the operation blocks + until sufficient space becomes available or the specified timeout expires. -Data may also be **flushed** from a pipe by a thread. Flushing can be performed -either on the entire pipe or on only its ring buffer. Flushing the entire pipe -is equivalent to reading all the information in the ring buffer **and** waiting -to be written into a giant temporary buffer which is then discarded. Flushing -the ring buffer is equivalent to reading **only** the data in the ring buffer -into a temporary buffer which is then discarded. Flushing the ring buffer does -not guarantee that the ring buffer will stay empty; flushing it may allow a -pended writer to fill the ring buffer. +- **Reading**: Data is synchronously read, either in whole or in part, from a + pipe by a thread. If the pipe's ring buffer is empty, the operation blocks + until data becomes available or the specified timeout expires. -.. note:: - Flushing does not in practice allocate or use additional buffers. +- **Resetting**: A thread can reset a pipe, which resets its internal state and + ends all pending read and write operations with an error code. -.. note:: - The kernel does allow for an ISR to flush a pipe from an ISR. It also - allows it to send/receive data to/from one provided it does not attempt - to wait for space/data. +Pipes are well-suited for scenarios like producer-consumer patterns or +streaming data between threads. Implementation ************** -A pipe is defined using a variable of type :c:struct:`k_pipe` and an -optional character buffer of type ``unsigned char``. It must then be -initialized by calling :c:func:`k_pipe_init`. - -The following code defines and initializes an empty pipe that has a ring -buffer capable of holding 100 bytes and is aligned to a 4-byte boundary. +A pipe is defined using a variable of type :c:struct:`k_pipe` and a +byte buffer. The pipe must then be initialized by calling :c:func:`k_pipe_init`. +The following code defines and initializes an empty pipe with a ring buffer +capable of holding 100 bytes, aligned to a 4-byte boundary: .. code-block:: c - unsigned char __aligned(4) my_ring_buffer[100]; + uint8_t __aligned(4) my_ring_buffer[100]; struct k_pipe my_pipe; k_pipe_init(&my_pipe, my_ring_buffer, sizeof(my_ring_buffer)); -Alternatively, a pipe can be defined and initialized at compile time by -calling :c:macro:`K_PIPE_DEFINE`. - -The following code has the same effect as the code segment above. Observe -that macro defines both the pipe and its ring buffer. +Alternatively, a pipe can be defined and initialized at compile time using +the :c:macro:`K_PIPE_DEFINE` macro, which defines both the pipe and its +ring buffer: .. code-block:: c K_PIPE_DEFINE(my_pipe, 100, 4); +This has the same effect as the code above. + Writing to a Pipe ================= -Data is added to a pipe by calling :c:func:`k_pipe_put`. +Data is added to a pipe by calling :c:func:`k_pipe_write`. -The following code builds on the example above, and uses the pipe to pass -data from a producing thread to one or more consuming threads. If the pipe's -ring buffer fills up because the consumers can't keep up, the producing thread -waits for a specified amount of time. +The following example demonstrates using a pipe to send data from a producer +thread to one or more consumer threads. If the pipe's ring buffer fills up, +the producer thread waits for a specified amount of time. .. code-block:: c - struct message_header { - ... - }; + struct message_header { + size_t num_data_bytes; /* Example field */ + ... + }; - void producer_thread(void) - { - unsigned char *data; - size_t total_size; - size_t bytes_written; - int rc; - ... + void producer_thread(void) + { + int rc; + uint8_t *data; + size_t total_size; + size_t bytes_written; - while (1) { - /* Craft message to send in the pipe */ - data = ...; - total_size = ...; + while (1) { + /* Craft message to send in the pipe */ + make_message(data, &total_size); + bytes_written = 0; - /* send data to the consumers */ - rc = k_pipe_put(&my_pipe, data, total_size, &bytes_written, - sizeof(struct message_header), K_NO_WAIT); + /* Write data to the pipe, handling partial writes */ + while (bytes_written < total_size) { + rc = k_pipe_write(&my_pipe, &data[bytes_written], total_size - bytes_written, K_NO_WAIT); - if (rc < 0) { - /* Incomplete message header sent */ - ... - } else if (bytes_written < total_size) { - /* Some of the data was sent */ - ... - } else { - /* All data sent */ - ... - } - } - } + if (rc < 0) { + /* Error occurred */ + ... + break; + } else { + /* Partial or full write succeeded; adjust for next iteration */ + bytes_written += rc; + } + } + + /* Reset bytes_written for the next message */ + bytes_written = 0; + ... + } + } Reading from a Pipe =================== -Data is read from the pipe by calling :c:func:`k_pipe_get`. +Data is retrieved from the pipe by calling :c:func:`k_pipe_read`. -The following code builds on the example above, and uses the pipe to -process data items generated by one or more producing threads. +The following example builds on the producer thread example above. It shows +a consumer thread that processes data generated by the producer. .. code-block:: c - void consumer_thread(void) - { - unsigned char buffer[120]; - size_t bytes_read; - struct message_header *header = (struct message_header *)buffer; + struct message_header { + size_t num_data_bytes; /* Example field */ + ... + }; - while (1) { - rc = k_pipe_get(&my_pipe, buffer, sizeof(buffer), &bytes_read, - sizeof(*header), K_MSEC(100)); + void consumer_thread(void) + { + int rc; + uint8_t buffer[128]; + size_t bytes_read = 0; + struct message_header *header = (struct message_header *)buffer; - if ((rc < 0) || (bytes_read < sizeof (*header))) { - /* Incomplete message header received */ - ... - } else if (header->num_data_bytes + sizeof(*header) > bytes_read) { - /* Only some data was received */ - ... - } else { - /* All data was received */ - ... - } - } - } + while (1) { + /* Step 1: Read the message header */ + bytes_read = 0; + read_header: + while (bytes_read < sizeof(*header)) { + rc = k_pipe_read(&my_pipe, &buffer[bytes_read], sizeof(*header) - bytes_read, &bytes_read, K_NO_WAIT); -Use a pipe to send streams of data between threads. + if (rc < 0) { + /* Error occurred */ + ... + goto read_header; + } -.. note:: - A pipe can be used to transfer long streams of data if desired. However - it is often preferable to send pointers to large data items to avoid - copying the data. + /* Adjust for partial reads */ + bytes_read += rc; + } -Flushing a Pipe's Buffer -======================== + /* Step 2: Read the message body */ + bytes_read = 0; + while (bytes_read < header->num_data_bytes) { + rc = k_pipe_read(&my_pipe, &buffer[sizeof(*header) + bytes_read], header->num_data_bytes - bytes_read, K_NO_WAIT); -Data is flushed from the pipe's ring buffer by calling -:c:func:`k_pipe_buffer_flush`. + if (rc < 0) { + /* Error occurred */ + ... + goto read_header; + } -The following code builds on the examples above, and flushes the pipe's -buffer. + /* Adjust for partial reads */ + bytes_read += rc; + } + /* Successfully received the complete message */ + } + } + +Resetting a Pipe +================ + +The pipe can be reset by calling :c:func:`k_pipe_reset`. Resetting a pipe +resets its internal state and ends all pending operations with an error code. + +The following example demonstrates resetting a pipe in response to a critical +error: .. code-block:: c @@ -187,51 +186,21 @@ buffer. { while (1) { ... - /* Pipe buffer contains stale data. Flush it. */ - k_pipe_buffer_flush(&my_pipe); + /* Critical error detected: reset the entire pipe to reset it. */ + k_pipe_reset(&my_pipe); ... } } -Flushing a Pipe -=============== - -All data in the pipe is flushed by calling :c:func:`k_pipe_flush`. - -The following code builds on the examples above, and flushes all the -data in the pipe. - -.. code-block:: c - - void monitor_thread(void) - { - while (1) { - ... - /* Critical error detected. Flush the entire pipe to reset it. */ - k_pipe_flush(&my_pipe); - ... - } - } - - -Suggested uses +Suggested Uses ************** -Use a pipe to send streams of data between threads. +Pipes are useful for sending streams of data between threads. Typical +applications include: -.. note:: - A pipe can be used to transfer long streams of data if desired. However it - is often preferable to send pointers to large data items to avoid copying - the data. Copying large data items will negatively impact interrupt latency - as a spinlock is held while copying that data. - - -Configuration Options -********************* - -Related configuration options: - -* :kconfig:option:`CONFIG_PIPES` +- Implementing producer-consumer patterns. +- Streaming logs or packets between threads. +- Handling variable-length message passing in real-time systems. API Reference *************