sensors: Add streaming APIs

Introduce a streaming API that uses the same data path as the async API.

This includes features to the decoder:
* Checking if triggers are present

Adding streaming features built ontop of existing triggers:
* Adding 3 operations to be done on a trigger
  * include - include the data with the trigger information
  * nop - do nothing
  * drop - drop the data (flush)
* Add a new sensor_stream() API to mirror sensor_read() but add an
optional handler to be able to cancel the stream.

Signed-off-by: Yuval Peress <peress@google.com>
topic#sensor_stream
This commit is contained in:
Yuval Peress 2023-06-05 20:26:19 -06:00 committed by Maureen Helm
parent 1088846e9a
commit 94dc05b3f2
9 changed files with 550 additions and 71 deletions

View file

@ -166,5 +166,6 @@ zephyr_library_property(ALLOW_EMPTY TRUE)
zephyr_library_sources_ifdef(CONFIG_USERSPACE sensor_handlers.c)
zephyr_library_sources_ifdef(CONFIG_SENSOR_SHELL sensor_shell.c)
zephyr_library_sources_ifdef(CONFIG_SENSOR_SHELL_STREAM sensor_shell_stream.c)
zephyr_library_sources_ifdef(CONFIG_SENSOR_SHELL_BATTERY shell_battery.c)
zephyr_library_sources_ifdef(CONFIG_SENSOR_ASYNC_API sensor_decoders_init.c default_rtio_sensor.c)

View file

@ -36,6 +36,23 @@ config SENSOR_SHELL
help
This shell provides access to basic sensor data.
config SENSOR_SHELL_STREAM
bool "Sensor shell 'stream' command"
depends on SENSOR_SHELL
help
Add the 'stream' subcommand to the sensor shell. When run on drivers that
support streaming (usually hardware FIFO backed), the shell will continue
to print new values as they come until the stream is closed.
config SENSOR_SHELL_THREAD_STACK_SIZE
int "Stack size for the sensor shell data processing thread"
depends on SENSOR_SHELL_STREAM
default 1024
help
The sensor shell uses a dedicated thread to process data coming from the
sensors in either one-shot or streaming mode. Use this config to control
the size of that thread's stack.
config SENSOR_SHELL_BATTERY
bool "Sensor shell 'battery' command"
depends on SHELL

View file

@ -28,8 +28,10 @@ static void sensor_iodev_submit(struct rtio_iodev_sqe *iodev_sqe)
if (api->submit != NULL) {
api->submit(dev, iodev_sqe);
} else {
} else if (!cfg->is_streaming) {
sensor_submit_fallback(dev, iodev_sqe);
} else {
rtio_iodev_sqe_err(iodev_sqe, -ENOTSUP);
}
}
@ -235,7 +237,7 @@ static void sensor_submit_fallback(const struct device *dev, struct rtio_iodev_s
}
sample_idx += num_samples;
}
LOG_DBG("Total channels in header: %u", header->num_channels);
LOG_DBG("Total channels in header: %" PRIu32, header->num_channels);
rtio_iodev_sqe_ok(iodev_sqe, 0);
}

View file

@ -16,6 +16,8 @@
#include <zephyr/sys/iterable_sections.h>
#include <zephyr/sys/util.h>
#include "sensor_shell.h"
LOG_MODULE_REGISTER(sensor_shell);
#define SENSOR_GET_HELP \
@ -23,6 +25,11 @@ LOG_MODULE_REGISTER(sensor_shell);
"when no channels are provided. Syntax:\n" \
"<device_name> <channel name 0> .. <channel name N>"
#define SENSOR_STREAM_HELP \
"Start/stop streaming sensor data. Data ready trigger will be used if no triggers " \
"are provided. Syntax:\n" \
"<device_name> on|off <trigger name> incl|drop|nop"
#define SENSOR_ATTR_GET_HELP \
"Get the sensor's channel attribute. Syntax:\n" \
"<device_name> [<channel_name 0> <attribute_name 0> .. " \
@ -38,7 +45,7 @@ LOG_MODULE_REGISTER(sensor_shell);
"Get or set the trigger type on a sensor. Currently only supports `data_ready`.\n" \
"<device_name> <on/off> <trigger_name>"
const char *sensor_channel_name[SENSOR_CHAN_ALL] = {
const char *sensor_channel_name[SENSOR_CHAN_COMMON_COUNT] = {
[SENSOR_CHAN_ACCEL_X] = "accel_x",
[SENSOR_CHAN_ACCEL_Y] = "accel_y",
[SENSOR_CHAN_ACCEL_Z] = "accel_z",
@ -96,6 +103,7 @@ const char *sensor_channel_name[SENSOR_CHAN_ALL] = {
[SENSOR_CHAN_GAUGE_DESIGN_VOLTAGE] = "gauge_design_voltage",
[SENSOR_CHAN_GAUGE_DESIRED_VOLTAGE] = "gauge_desired_voltage",
[SENSOR_CHAN_GAUGE_DESIRED_CHARGING_CURRENT] = "gauge_desired_charging_current",
[SENSOR_CHAN_ALL] = "all",
};
static const char *sensor_attribute_name[SENSOR_ATTR_COMMON_COUNT] = {
@ -114,6 +122,7 @@ static const char *sensor_attribute_name[SENSOR_ATTR_COMMON_COUNT] = {
[SENSOR_ATTR_FEATURE_MASK] = "feature_mask",
[SENSOR_ATTR_ALERT] = "alert",
[SENSOR_ATTR_FF_DUR] = "ff_dur",
[SENSOR_ATTR_FIFO_WATERMARK] = "fifo_wm",
};
/* Forward declaration */
@ -146,12 +155,32 @@ static const struct {
TRIGGER_DATA_ENTRY(SENSOR_TRIG_FREEFALL, freefall, NULL),
TRIGGER_DATA_ENTRY(SENSOR_TRIG_MOTION, motion, NULL),
TRIGGER_DATA_ENTRY(SENSOR_TRIG_STATIONARY, stationary, NULL),
TRIGGER_DATA_ENTRY(SENSOR_TRIG_FIFO_WATERMARK, fifo_wm, NULL),
TRIGGER_DATA_ENTRY(SENSOR_TRIG_FIFO_FULL, fifo_full, NULL),
};
/**
* Lookup the sensor trigger data by name
*
* @param name The name of the trigger
* @return < 0 on error
* @return >= 0 if found
*/
static int sensor_trigger_name_lookup(const char *name)
{
for (int i = 0; i < ARRAY_SIZE(sensor_trigger_table); ++i) {
if (strcmp(name, sensor_trigger_table[i].name) == 0) {
return i;
}
}
return -1;
}
enum dynamic_command_context {
NONE,
CTX_GET,
CTX_ATTR_GET_SET,
CTX_STREAM_ON_OFF,
};
static enum dynamic_command_context current_cmd_ctx = NONE;
@ -163,6 +192,7 @@ K_MUTEX_DEFINE(cmd_get_mutex);
static enum sensor_channel iodev_sensor_shell_channels[SENSOR_CHAN_ALL];
static struct sensor_read_config iodev_sensor_shell_read_config = {
.sensor = NULL,
.is_streaming = false,
.channels = iodev_sensor_shell_channels,
.count = 0,
.max = ARRAY_SIZE(iodev_sensor_shell_channels),
@ -234,17 +264,18 @@ static int parse_sensor_value(const char *val_str, struct sensor_value *out)
return 0;
}
struct sensor_shell_processing_context {
const struct device *dev;
const struct shell *sh;
};
static void sensor_shell_processing_callback(int result, uint8_t *buf, uint32_t buf_len,
void *userdata)
void sensor_shell_processing_callback(int result, uint8_t *buf, uint32_t buf_len, void *userdata)
{
struct sensor_shell_processing_context *ctx = userdata;
const struct sensor_decoder_api *decoder;
uint8_t decoded_buffer[128];
struct {
uint64_t base_timestamp_ns;
int count;
uint64_t timestamp_delta;
int64_t values[3];
int8_t shift;
} accumulator_buffer;
int rc;
ARG_UNUSED(buf_len);
@ -260,6 +291,17 @@ static void sensor_shell_processing_callback(int result, uint8_t *buf, uint32_t
return;
}
for (int trigger = 0; decoder->has_trigger != NULL && trigger < SENSOR_TRIG_COMMON_COUNT;
++trigger) {
if (!decoder->has_trigger(buf, trigger)) {
continue;
}
shell_info(ctx->sh, "Trigger (%d / %s) detected", trigger,
(sensor_trigger_table[trigger].name == NULL
? "UNKNOWN"
: sensor_trigger_table[trigger].name));
}
for (int channel = 0; channel < SENSOR_CHAN_ALL; ++channel) {
uint32_t fit = 0;
size_t base_size;
@ -292,6 +334,7 @@ static void sensor_shell_processing_callback(int result, uint8_t *buf, uint32_t
while (decoder->get_frame_count(buf, channel, channel_idx, &frame_count) == 0) {
fit = 0;
memset(&accumulator_buffer, 0, sizeof(accumulator_buffer));
while (decoder->decode(buf, channel, channel_idx, &fit, 1, decoded_buffer) >
0) {
@ -303,41 +346,127 @@ static void sensor_shell_processing_callback(int result, uint8_t *buf, uint32_t
struct sensor_three_axis_data *data =
(struct sensor_three_axis_data *)decoded_buffer;
shell_info(ctx->sh,
"channel idx=%d %s shift=%d "
"value=%" PRIsensor_three_axis_data,
channel, sensor_channel_name[channel],
data->shift,
PRIsensor_three_axis_data_arg(*data, 0));
if (accumulator_buffer.count == 0) {
accumulator_buffer.base_timestamp_ns =
data->header.base_timestamp_ns;
}
accumulator_buffer.count++;
accumulator_buffer.shift = data->shift;
accumulator_buffer.timestamp_delta +=
data->readings[0].timestamp_delta;
accumulator_buffer.values[0] += data->readings[0].values[0];
accumulator_buffer.values[1] += data->readings[0].values[1];
accumulator_buffer.values[2] += data->readings[0].values[2];
break;
}
case SENSOR_CHAN_PROX: {
struct sensor_byte_data *data =
(struct sensor_byte_data *)decoded_buffer;
shell_info(ctx->sh,
"channel idx=%d %s value=%" PRIsensor_byte_data(
is_near),
channel, sensor_channel_name[channel],
PRIsensor_byte_data_arg(*data, 0, is_near));
if (accumulator_buffer.count == 0) {
accumulator_buffer.base_timestamp_ns =
data->header.base_timestamp_ns;
}
accumulator_buffer.count++;
accumulator_buffer.timestamp_delta +=
data->readings[0].timestamp_delta;
accumulator_buffer.values[0] += data->readings[0].is_near;
break;
}
default: {
struct sensor_q31_data *data =
(struct sensor_q31_data *)decoded_buffer;
shell_info(ctx->sh,
"channel idx=%d %s shift=%d "
"value=%" PRIsensor_q31_data,
channel,
(channel >= ARRAY_SIZE(sensor_channel_name))
? ""
: sensor_channel_name[channel],
data->shift, PRIsensor_q31_data_arg(*data, 0));
if (accumulator_buffer.count == 0) {
accumulator_buffer.base_timestamp_ns =
data->header.base_timestamp_ns;
}
accumulator_buffer.count++;
accumulator_buffer.shift = data->shift;
accumulator_buffer.timestamp_delta +=
data->readings[0].timestamp_delta;
accumulator_buffer.values[0] += data->readings[0].value;
break;
}
}
}
/* Print the accumulated value average */
switch (channel) {
case SENSOR_CHAN_ACCEL_XYZ:
case SENSOR_CHAN_GYRO_XYZ:
case SENSOR_CHAN_MAGN_XYZ:
case SENSOR_CHAN_POS_DX: {
struct sensor_three_axis_data *data =
(struct sensor_three_axis_data *)decoded_buffer;
data->header.base_timestamp_ns =
accumulator_buffer.base_timestamp_ns;
data->header.reading_count = 1;
data->shift = accumulator_buffer.shift;
data->readings[0].timestamp_delta =
(uint32_t)(accumulator_buffer.timestamp_delta /
accumulator_buffer.count);
data->readings[0].values[0] = (q31_t)(accumulator_buffer.values[0] /
accumulator_buffer.count);
data->readings[0].values[1] = (q31_t)(accumulator_buffer.values[1] /
accumulator_buffer.count);
data->readings[0].values[2] = (q31_t)(accumulator_buffer.values[2] /
accumulator_buffer.count);
shell_info(ctx->sh,
"channel idx=%d %s shift=%d num_samples=%d "
"value=%" PRIsensor_three_axis_data,
channel, sensor_channel_name[channel],
data->shift, accumulator_buffer.count,
PRIsensor_three_axis_data_arg(*data, 0));
break;
}
case SENSOR_CHAN_PROX: {
struct sensor_byte_data *data =
(struct sensor_byte_data *)decoded_buffer;
data->header.base_timestamp_ns =
accumulator_buffer.base_timestamp_ns;
data->header.reading_count = 1;
data->readings[0].timestamp_delta =
(uint32_t)(accumulator_buffer.timestamp_delta /
accumulator_buffer.count);
data->readings[0].is_near =
accumulator_buffer.values[0] / accumulator_buffer.count;
shell_info(ctx->sh,
"channel idx=%d %s num_samples=%d "
"value=%" PRIsensor_byte_data(is_near),
channel, sensor_channel_name[channel],
accumulator_buffer.count,
PRIsensor_byte_data_arg(*data, 0, is_near));
break;
}
default: {
struct sensor_q31_data *data =
(struct sensor_q31_data *)decoded_buffer;
data->header.base_timestamp_ns =
accumulator_buffer.base_timestamp_ns;
data->header.reading_count = 1;
data->shift = accumulator_buffer.shift;
data->readings[0].timestamp_delta =
(uint32_t)(accumulator_buffer.timestamp_delta /
accumulator_buffer.count);
data->readings[0].value = (q31_t)(accumulator_buffer.values[0] /
accumulator_buffer.count);
shell_info(ctx->sh,
"channel idx=%d %s shift=%d num_samples=%d "
"value=%" PRIsensor_q31_data,
channel,
(channel >= ARRAY_SIZE(sensor_channel_name))
? ""
: sensor_channel_name[channel],
data->shift, accumulator_buffer.count,
PRIsensor_q31_data_arg(*data, 0));
}
}
++channel_idx;
}
}
@ -345,6 +474,7 @@ static void sensor_shell_processing_callback(int result, uint8_t *buf, uint32_t
static int cmd_get_sensor(const struct shell *sh, size_t argc, char *argv[])
{
static struct sensor_shell_processing_context ctx;
const struct device *dev;
int count = 0;
int err;
@ -392,21 +522,27 @@ static int cmd_get_sensor(const struct shell *sh, size_t argc, char *argv[])
iodev_sensor_shell_read_config.sensor = dev;
iodev_sensor_shell_read_config.count = count;
struct sensor_shell_processing_context ctx = {
.dev = dev,
.sh = sh,
};
ctx.dev = dev;
ctx.sh = sh;
err = sensor_read(&iodev_sensor_shell_read, &sensor_read_rtio, &ctx);
if (err < 0) {
shell_error(sh, "Failed to read sensor: %d", err);
}
sensor_processing_with_callback(&sensor_read_rtio, sensor_shell_processing_callback);
if (!IS_ENABLED(CONFIG_SENSOR_SHELL_STREAM)) {
/*
* Streaming enables a thread that polls the RTIO context, so if it's enabled, we
* don't need a blocking read here.
*/
sensor_processing_with_callback(&sensor_read_rtio,
sensor_shell_processing_callback);
}
k_mutex_unlock(&cmd_get_mutex);
return 0;
}
static int cmd_sensor_attr_set(const struct shell *shell_ptr, size_t argc, char *argv[])
{
const struct device *dev;
@ -517,28 +653,9 @@ static int cmd_sensor_attr_get(const struct shell *shell_ptr, size_t argc, char
}
static void channel_name_get(size_t idx, struct shell_static_entry *entry);
SHELL_DYNAMIC_CMD_CREATE(dsub_channel_name, channel_name_get);
static void attribute_name_get(size_t idx, struct shell_static_entry *entry)
{
int cnt = 0;
entry->syntax = NULL;
entry->handler = NULL;
entry->help = NULL;
entry->subcmd = &dsub_channel_name;
for (int i = 0; i < SENSOR_ATTR_COMMON_COUNT; i++) {
if (sensor_attribute_name[i] != NULL) {
if (cnt == idx) {
entry->syntax = sensor_attribute_name[i];
break;
}
cnt++;
}
}
}
static void attribute_name_get(size_t idx, struct shell_static_entry *entry);
SHELL_DYNAMIC_CMD_CREATE(dsub_attribute_name, attribute_name_get);
static void channel_name_get(size_t idx, struct shell_static_entry *entry)
@ -556,7 +673,7 @@ static void channel_name_get(size_t idx, struct shell_static_entry *entry)
entry->subcmd = NULL;
}
for (int i = 0; i < SENSOR_CHAN_ALL; i++) {
for (int i = 0; i < ARRAY_SIZE(sensor_channel_name); i++) {
if (sensor_channel_name[i] != NULL) {
if (cnt == idx) {
entry->syntax = sensor_channel_name[i];
@ -567,6 +684,88 @@ static void channel_name_get(size_t idx, struct shell_static_entry *entry)
}
}
static void attribute_name_get(size_t idx, struct shell_static_entry *entry)
{
int cnt = 0;
entry->syntax = NULL;
entry->handler = NULL;
entry->help = NULL;
entry->subcmd = &dsub_channel_name;
for (int i = 0; i < ARRAY_SIZE(sensor_attribute_name); i++) {
if (sensor_attribute_name[i] != NULL) {
if (cnt == idx) {
entry->syntax = sensor_attribute_name[i];
break;
}
cnt++;
}
}
}
static void trigger_opt_get_for_stream(size_t idx, struct shell_static_entry *entry);
SHELL_DYNAMIC_CMD_CREATE(dsub_trigger_opt_get_for_stream, trigger_opt_get_for_stream);
static void trigger_opt_get_for_stream(size_t idx, struct shell_static_entry *entry)
{
entry->syntax = NULL;
entry->handler = NULL;
entry->help = NULL;
entry->subcmd = NULL;
switch (idx) {
case SENSOR_STREAM_DATA_INCLUDE:
entry->syntax = "incl";
break;
case SENSOR_STREAM_DATA_DROP:
entry->syntax = "drop";
break;
case SENSOR_STREAM_DATA_NOP:
entry->syntax = "nop";
break;
}
}
static void trigger_name_get_for_stream(size_t idx, struct shell_static_entry *entry);
SHELL_DYNAMIC_CMD_CREATE(dsub_trigger_name_for_stream, trigger_name_get_for_stream);
static void trigger_name_get_for_stream(size_t idx, struct shell_static_entry *entry)
{
int cnt = 0;
entry->syntax = NULL;
entry->handler = NULL;
entry->help = NULL;
entry->subcmd = &dsub_trigger_opt_get_for_stream;
for (int i = 0; i < ARRAY_SIZE(sensor_trigger_table); i++) {
if (sensor_trigger_table[i].name != NULL) {
if (cnt == idx) {
entry->syntax = sensor_trigger_table[i].name;
break;
}
cnt++;
}
}
}
static void stream_on_off(size_t idx, struct shell_static_entry *entry)
{
entry->syntax = NULL;
entry->handler = NULL;
entry->help = NULL;
if (idx == 0) {
entry->syntax = "on";
entry->subcmd = &dsub_trigger_name_for_stream;
} else if (idx == 1) {
entry->syntax = "off";
entry->subcmd = NULL;
}
}
SHELL_DYNAMIC_CMD_CREATE(dsub_stream_on_off, stream_on_off);
static void device_name_get(size_t idx, struct shell_static_entry *entry);
SHELL_DYNAMIC_CMD_CREATE(dsub_device_name, device_name_get);
@ -603,7 +802,7 @@ static void trigger_name_get(size_t idx, struct shell_static_entry *entry)
entry->help = NULL;
entry->subcmd = NULL;
for (int i = 0; i < SENSOR_TRIG_COMMON_COUNT; i++) {
for (int i = 0; i < ARRAY_SIZE(sensor_trigger_table); i++) {
if (sensor_trigger_table[i].name != NULL) {
if (cnt == idx) {
entry->syntax = sensor_trigger_table[i].name;
@ -649,6 +848,18 @@ static void device_name_get_for_trigger(size_t idx, struct shell_static_entry *e
SHELL_DYNAMIC_CMD_CREATE(dsub_trigger, device_name_get_for_trigger);
static void device_name_get_for_stream(size_t idx, struct shell_static_entry *entry)
{
const struct device *dev = shell_device_lookup(idx, NULL);
current_cmd_ctx = CTX_STREAM_ON_OFF;
entry->syntax = (dev != NULL) ? dev->name : NULL;
entry->handler = NULL;
entry->help = NULL;
entry->subcmd = &dsub_stream_on_off;
}
SHELL_DYNAMIC_CMD_CREATE(dsub_device_name_for_stream, device_name_get_for_stream);
static int cmd_get_sensor_info(const struct shell *sh, size_t argc, char **argv)
{
ARG_UNUSED(argc);
@ -738,7 +949,7 @@ static void data_ready_trigger_handler(const struct device *sensor,
static int cmd_trig_sensor(const struct shell *sh, size_t argc, char **argv)
{
const struct device *dev;
enum sensor_trigger_type trigger;
int trigger;
int err;
if (argc < 4) {
@ -754,12 +965,8 @@ static int cmd_trig_sensor(const struct shell *sh, size_t argc, char **argv)
}
/* Map the trigger string to an enum value */
for (trigger = 0; trigger < ARRAY_SIZE(sensor_trigger_table); trigger++) {
if (strcmp(argv[3], sensor_trigger_table[trigger].name) == 0) {
break;
}
}
if (trigger >= SENSOR_TRIG_COMMON_COUNT || sensor_trigger_table[trigger].handler == NULL) {
trigger = sensor_trigger_name_lookup(argv[3]);
if (trigger < 0 || sensor_trigger_table[trigger].handler == NULL) {
shell_error(sh, "Unsupported trigger type (%s)", argv[3]);
return -ENOTSUP;
}
@ -792,6 +999,8 @@ SHELL_STATIC_SUBCMD_SET_CREATE(sub_sensor,
cmd_sensor_attr_set, 2, 255),
SHELL_CMD_ARG(attr_get, &dsub_device_name_for_attr, SENSOR_ATTR_GET_HELP,
cmd_sensor_attr_get, 2, 255),
SHELL_COND_CMD(CONFIG_SENSOR_SHELL_STREAM, stream, &dsub_device_name_for_stream,
SENSOR_STREAM_HELP, cmd_sensor_stream),
SHELL_COND_CMD(CONFIG_SENSOR_INFO, info, NULL, SENSOR_INFO_HELP,
cmd_get_sensor_info),
SHELL_CMD_ARG(trig, &dsub_trigger, SENSOR_TRIG_HELP, cmd_trig_sensor,

View file

@ -0,0 +1,26 @@
/*
* Copyright (c) 2023 Google LLC
*
* SPDX-License-Identifier: Apache-2.0
*/
#ifndef ZEPHYR_DRIVERS_SENSOR_SENSOR_SHELL_H
#define ZEPHYR_DRIVERS_SENSOR_SENSOR_SHELL_H
#include <zephyr/device.h>
#include <zephyr/drivers/sensor.h>
#include <zephyr/rtio/rtio.h>
#include <zephyr/shell/shell.h>
struct sensor_shell_processing_context {
const struct device *dev;
const struct shell *sh;
};
extern struct rtio sensor_read_rtio;
int cmd_sensor_stream(const struct shell *shell_ptr, size_t argc, char *argv[]);
void sensor_shell_processing_callback(int result, uint8_t *buf, uint32_t buf_len, void *userdata);
#endif /* ZEPHYR_DRIVERS_SENSOR_SENSOR_SHELL_H */

View file

@ -0,0 +1,124 @@
/*
* Copyright (c) 2023 Google LLC
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <stdlib.h>
#include <string.h>
#include <zephyr/drivers/sensor.h>
#include <zephyr/kernel.h>
#include "sensor_shell.h"
/* Create a single common config for streaming */
static struct sensor_stream_trigger iodev_sensor_shell_trigger;
static struct sensor_read_config iodev_sensor_shell_stream_config = {
.sensor = NULL,
.is_streaming = true,
.triggers = &iodev_sensor_shell_trigger,
.count = 0,
.max = 1,
};
RTIO_IODEV_DEFINE(iodev_sensor_shell_stream, &__sensor_iodev_api,
&iodev_sensor_shell_stream_config);
static void sensor_shell_processing_entry_point(void *a, void *b, void *c)
{
ARG_UNUSED(a);
ARG_UNUSED(b);
ARG_UNUSED(c);
while (true) {
sensor_processing_with_callback(&sensor_read_rtio,
sensor_shell_processing_callback);
}
}
K_THREAD_DEFINE(sensor_shell_processing_tid, CONFIG_SENSOR_SHELL_THREAD_STACK_SIZE,
sensor_shell_processing_entry_point, NULL, NULL, NULL, 0, 0, 0);
int cmd_sensor_stream(const struct shell *shell_ptr, size_t argc, char *argv[])
{
static struct rtio_sqe *current_streaming_handle;
static struct sensor_shell_processing_context ctx;
const struct device *dev = device_get_binding(argv[1]);
if (argc != 5 && argc != 3) {
shell_error(shell_ptr, "Wrong number of arguments (%zu)", argc);
return -EINVAL;
}
if (dev == NULL) {
shell_error(shell_ptr, "Device unknown (%s)", argv[1]);
return -ENODEV;
}
if (current_streaming_handle != NULL) {
shell_info(shell_ptr, "Disabling existing stream");
rtio_sqe_cancel(current_streaming_handle);
}
if (strcmp("off", argv[2]) == 0) {
return 0;
}
if (strcmp("on", argv[2]) != 0) {
shell_error(shell_ptr, "Unknown streaming operation (%s)", argv[2]);
return -EINVAL;
}
if (strcmp("double_tap", argv[3]) == 0) {
iodev_sensor_shell_trigger.trigger = SENSOR_TRIG_DOUBLE_TAP;
} else if (strcmp("data_ready", argv[3]) == 0) {
iodev_sensor_shell_trigger.trigger = SENSOR_TRIG_DATA_READY;
} else if (strcmp("delta", argv[3]) == 0) {
iodev_sensor_shell_trigger.trigger = SENSOR_TRIG_DELTA;
} else if (strcmp("freefall", argv[3]) == 0) {
iodev_sensor_shell_trigger.trigger = SENSOR_TRIG_FREEFALL;
} else if (strcmp("motion", argv[3]) == 0) {
iodev_sensor_shell_trigger.trigger = SENSOR_TRIG_MOTION;
} else if (strcmp("near_far", argv[3]) == 0) {
iodev_sensor_shell_trigger.trigger = SENSOR_TRIG_NEAR_FAR;
} else if (strcmp("stationary", argv[3]) == 0) {
iodev_sensor_shell_trigger.trigger = SENSOR_TRIG_STATIONARY;
} else if (strcmp("threshold", argv[3]) == 0) {
iodev_sensor_shell_trigger.trigger = SENSOR_TRIG_THRESHOLD;
} else if (strcmp("fifo_wm", argv[3]) == 0) {
iodev_sensor_shell_trigger.trigger = SENSOR_TRIG_FIFO_WATERMARK;
} else if (strcmp("fifo_full", argv[3]) == 0) {
iodev_sensor_shell_trigger.trigger = SENSOR_TRIG_FIFO_FULL;
} else if (strcmp("tap", argv[3]) == 0) {
iodev_sensor_shell_trigger.trigger = SENSOR_TRIG_TAP;
} else {
shell_error(shell_ptr, "Invalid trigger (%s)", argv[3]);
return -EINVAL;
}
if (strcmp("incl", argv[4]) == 0) {
iodev_sensor_shell_trigger.opt = SENSOR_STREAM_DATA_INCLUDE;
} else if (strcmp("drop", argv[4]) == 0) {
iodev_sensor_shell_trigger.opt = SENSOR_STREAM_DATA_DROP;
} else if (strcmp("nop", argv[4]) == 0) {
iodev_sensor_shell_trigger.opt = SENSOR_STREAM_DATA_NOP;
} else {
shell_error(shell_ptr, "Unknown trigger op (%s)", argv[4]);
return -EINVAL;
}
shell_print(shell_ptr, "Enabling stream...");
iodev_sensor_shell_stream_config.sensor = dev;
iodev_sensor_shell_stream_config.count = 1;
ctx.dev = dev;
ctx.sh = shell_ptr;
int rc = sensor_stream(&iodev_sensor_shell_stream, &sensor_read_rtio, &ctx,
&current_streaming_handle);
if (rc != 0) {
shell_error(shell_ptr, "Failed to start stream");
}
return rc;
}

View file

@ -251,6 +251,12 @@ enum sensor_trigger_type {
/** Trigger fires when no motion has been detected for a while. */
SENSOR_TRIG_STATIONARY,
/** Trigger fires when the FIFO watermark has been reached. */
SENSOR_TRIG_FIFO_WATERMARK,
/** Trigger fires when the FIFO becomes full. */
SENSOR_TRIG_FIFO_FULL,
/**
* Number of all common sensor triggers.
*/
@ -328,6 +334,8 @@ enum sensor_attribute {
* to the new sampling frequency.
*/
SENSOR_ATTR_FF_DUR,
/** Watermark % for the hardware fifo interrupt */
SENSOR_ATTR_FIFO_WATERMARK,
/**
* Number of all common sensor attributes.
*/
@ -466,6 +474,15 @@ struct sensor_decoder_api {
*/
int (*decode)(const uint8_t *buffer, enum sensor_channel channel, size_t channel_idx,
uint32_t *fit, uint16_t max_count, void *data_out);
/**
* @brief Check if the given trigger type is present
*
* @param[in] buffer The buffer provided on the @ref rtio context
* @param[in] trigger The trigger type in question
* @return Whether the trigger is present in the buffer
*/
bool (*has_trigger)(const uint8_t *buffer, enum sensor_trigger_type trigger);
};
/**
@ -538,13 +555,38 @@ int sensor_natively_supported_channel_size_info(enum sensor_channel channel, siz
typedef int (*sensor_get_decoder_t)(const struct device *dev,
const struct sensor_decoder_api **api);
/**
* @brief Options for what to do with the associated data when a trigger is consumed
*/
enum sensor_stream_data_opt {
/** @brief Include whatever data is associated with the trigger */
SENSOR_STREAM_DATA_INCLUDE = 0,
/** @brief Do nothing with the associated trigger data, it may be consumed later */
SENSOR_STREAM_DATA_NOP = 1,
/** @brief Flush/clear whatever data is associated with the trigger */
SENSOR_STREAM_DATA_DROP = 2,
};
struct sensor_stream_trigger {
enum sensor_trigger_type trigger;
enum sensor_stream_data_opt opt;
};
#define SENSOR_STREAM_TRIGGER_PREP(_trigger, _opt) \
{ \
.trigger = (_trigger), .opt = (_opt), \
}
/*
* Internal data structure used to store information about the IODevice for async reading and
* streaming sensor data.
*/
struct sensor_read_config {
const struct device *sensor;
enum sensor_channel *const channels;
const bool is_streaming;
union {
enum sensor_channel *const channels;
struct sensor_stream_trigger *const triggers;
};
size_t count;
const size_t max;
};
@ -564,14 +606,45 @@ struct sensor_read_config {
* @endcode
*/
#define SENSOR_DT_READ_IODEV(name, dt_node, ...) \
static enum sensor_channel __channel_array_##name[] = {__VA_ARGS__}; \
static struct sensor_read_config __sensor_read_config_##name = { \
static enum sensor_channel _CONCAT(__channel_array_, name)[] = {__VA_ARGS__}; \
static struct sensor_read_config _CONCAT(__sensor_read_config_, name) = { \
.sensor = DEVICE_DT_GET(dt_node), \
.channels = __channel_array_##name, \
.count = ARRAY_SIZE(__channel_array_##name), \
.max = ARRAY_SIZE(__channel_array_##name), \
.is_streaming = false, \
.channels = _CONCAT(__channel_array_, name), \
.count = ARRAY_SIZE(_CONCAT(__channel_array_, name)), \
.max = ARRAY_SIZE(_CONCAT(__channel_array_, name)), \
}; \
RTIO_IODEV_DEFINE(name, &__sensor_iodev_api, &__sensor_read_config_##name)
RTIO_IODEV_DEFINE(name, &__sensor_iodev_api, _CONCAT(&__sensor_read_config_, name))
/**
* @brief Define a stream instance of a sensor
*
* Use this macro to generate a @ref rtio_iodev for starting a stream that's triggered by specific
* interrupts. Example:
*
* @code(.c)
* SENSOR_DT_STREAM_IODEV(imu_stream, DT_ALIAS(imu),
* {SENSOR_TRIG_FIFO_WATERMARK, SENSOR_STREAM_DATA_INCLUDE},
* {SENSOR_TRIG_FIFO_FULL, SENSOR_STREAM_DATA_NOP});
*
* int main(void) {
* struct rtio_sqe *handle;
* sensor_stream(&imu_stream, &rtio, NULL, &handle);
* k_msleep(1000);
* rtio_sqe_cancel(handle);
* }
* @endcode
*/
#define SENSOR_DT_STREAM_IODEV(name, dt_node, ...) \
static struct sensor_stream_trigger _CONCAT(__trigger_array_, name)[] = {__VA_ARGS__}; \
static struct sensor_read_config _CONCAT(__sensor_read_config_, name) = { \
.sensor = DEVICE_DT_GET(dt_node), \
.is_streaming = true, \
.triggers = _CONCAT(__trigger_array_, name), \
.count = ARRAY_SIZE(_CONCAT(__trigger_array_, name)), \
.max = ARRAY_SIZE(_CONCAT(__trigger_array_, name)), \
}; \
RTIO_IODEV_DEFINE(name, &__sensor_iodev_api, &_CONCAT(__sensor_read_config_, name))
/* Used to submit an RTIO sqe to the sensor's iodev */
typedef int (*sensor_submit_t)(const struct device *sensor, struct rtio_iodev_sqe *sqe);
@ -880,7 +953,7 @@ static inline int z_impl_sensor_reconfigure_read_iodev(struct rtio_iodev *iodev,
{
struct sensor_read_config *cfg = (struct sensor_read_config *)iodev->data;
if (cfg->max < num_channels) {
if (cfg->max < num_channels || cfg->is_streaming) {
return -ENOMEM;
}
@ -888,6 +961,28 @@ static inline int z_impl_sensor_reconfigure_read_iodev(struct rtio_iodev *iodev,
memcpy(cfg->channels, channels, num_channels * sizeof(enum sensor_channel));
cfg->count = num_channels;
return 0;
}
static inline int sensor_stream(struct rtio_iodev *iodev, struct rtio *ctx, void *userdata,
struct rtio_sqe **handle)
{
if (IS_ENABLED(CONFIG_USERSPACE)) {
struct rtio_sqe sqe;
rtio_sqe_prep_read_multishot(&sqe, iodev, RTIO_PRIO_NORM, userdata);
rtio_sqe_copy_in_get_handles(ctx, &sqe, handle, 1);
} else {
struct rtio_sqe *sqe = rtio_sqe_acquire(ctx);
if (sqe == NULL) {
return -ENOMEM;
}
if (handle != NULL) {
*handle = sqe;
}
rtio_sqe_prep_read_multishot(sqe, iodev, RTIO_PRIO_NORM, userdata);
}
rtio_submit(ctx, 0);
return 0;
}

View file

@ -0,0 +1,3 @@
# Copyright (c) 2023 Google LLC
# SPDX-License-Identifier: Apache-2.0
CONFIG_SPI_RTIO=y

View file

@ -4,6 +4,8 @@ tests:
sample.sensor.shell:
integration_platforms:
- frdm_k64f
# TODO Remove once #63414 is resolved
platform_exclude: gd32l233r_eval
filter: ( CONFIG_UART_CONSOLE and CONFIG_SERIAL_SUPPORT_INTERRUPT )
tags: shell
harness: keyboard