micropython-ulab/docs/ulab-utils.ipynb
Zoltán Vörös c0b3262be4
Add keyword arguments to spectrogram (#657)
* re-work spectrogram method, so that RAM can be re-used

* update docs with spectrogram changes
2024-09-14 12:18:14 +02:00

620 lines
21 KiB
Text

{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-04T18:21:22.822563Z",
"start_time": "2021-03-04T18:21:18.656643Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Populating the interactive namespace from numpy and matplotlib\n"
]
}
],
"source": [
"%pylab inline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Notebook magic"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"ExecuteTime": {
"end_time": "2022-01-29T16:53:11.972661Z",
"start_time": "2022-01-29T16:53:11.965952Z"
}
},
"outputs": [],
"source": [
"from IPython.core.magic import Magics, magics_class, line_cell_magic\n",
"from IPython.core.magic import cell_magic, register_cell_magic, register_line_magic\n",
"from IPython.core.magic_arguments import argument, magic_arguments, parse_argstring\n",
"import subprocess\n",
"import os"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"ExecuteTime": {
"end_time": "2022-01-29T16:59:24.652277Z",
"start_time": "2022-01-29T16:59:24.639828Z"
}
},
"outputs": [],
"source": [
"@magics_class\n",
"class PyboardMagic(Magics):\n",
"    \"\"\"Cell magic that hands the content of a notebook cell over to micropython.\"\"\"\n",
"\n",
"    @cell_magic\n",
"    @magic_arguments()\n",
"    @argument('-skip')\n",
"    @argument('-unix')\n",
"    @argument('-pyboard')\n",
"    @argument('-file')\n",
"    @argument('-data')\n",
"    @argument('-time')\n",
"    @argument('-memory')\n",
"    def micropython(self, line='', cell=None):\n",
"        \"\"\"Process the cell according to the switches given on the magic line.\"\"\"\n",
"        args = parse_argstring(self.micropython, line)\n",
"        if args.skip: # doesn't care about the cell's content\n",
"            print('skipped execution')\n",
"            return None # do not parse the rest\n",
"        if args.unix: # tests the code on the unix port. Note that this works on unix only\n",
"            with open('/dev/shm/micropython.py', 'w') as fout:\n",
"                fout.write(cell)\n",
"            proc = subprocess.Popen([\"../micropython/ports/unix/build-2/micropython-2\", \"/dev/shm/micropython.py\"],\n",
"                                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n",
"            # communicate() drains both pipes and waits for the process, so a script\n",
"            # that writes a lot to stderr cannot block on a full pipe\n",
"            stdout, stderr = proc.communicate()\n",
"            print(stdout.decode(\"utf-8\"))\n",
"            print(stderr.decode(\"utf-8\"))\n",
"            return None\n",
"        if args.file: # can be used to copy the cell content onto the pyboard's flash\n",
"            spaces = \" \"\n",
"            try:\n",
"                with open(args.file, 'w') as fout:\n",
"                    fout.write(cell.replace('\\t', spaces))\n",
"                print('written cell to {}'.format(args.file))\n",
"            except OSError:\n",
"                # only genuine I/O problems are reported as a failed write\n",
"                print('Failed to write to disc!')\n",
"            return None # do not parse the rest\n",
"        if args.data: # can be used to load data from the pyboard directly into kernel space\n",
"            message = pyb.exec(cell)\n",
"            if len(message) == 0:\n",
"                print('pyboard >>>')\n",
"            else:\n",
"                print(message.decode('utf-8'))\n",
"                # register new variable in user namespace\n",
"                # NOTE(review): string_to_matrix is not defined in this notebook -- confirm that it is provided elsewhere\n",
"                self.shell.user_ns[args.data] = string_to_matrix(message.decode(\"utf-8\"))\n",
"\n",
"        if args.time: # measures the time of executions\n",
"            pyb.exec('import utime')\n",
"            message = pyb.exec('t = utime.ticks_us()\\n' + cell + '\\ndelta = utime.ticks_diff(utime.ticks_us(), t)' +\n",
"                               \"\\nprint('execution time: {:d} us'.format(delta))\")\n",
"            print(message.decode('utf-8'))\n",
"\n",
"        if args.memory: # prints out memory information\n",
"            message = pyb.exec('from micropython import mem_info\\nprint(mem_info())\\n')\n",
"            print(\"memory before execution:\\n========================\\n\", message.decode('utf-8'))\n",
"            message = pyb.exec(cell)\n",
"            print(\">>> \", message.decode('utf-8'))\n",
"            message = pyb.exec('print(mem_info())')\n",
"            print(\"memory after execution:\\n========================\\n\", message.decode('utf-8'))\n",
"\n",
"        if args.pyboard:\n",
"            message = pyb.exec(cell)\n",
"            print(message.decode('utf-8'))\n",
"\n",
"ip = get_ipython()\n",
"ip.register_magics(PyboardMagic)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## pyboard"
]
},
{
"cell_type": "code",
"execution_count": 57,
"metadata": {
"ExecuteTime": {
"end_time": "2020-05-07T07:35:35.126401Z",
"start_time": "2020-05-07T07:35:35.105824Z"
}
},
"outputs": [],
"source": [
"import pyboard\n",
"pyb = pyboard.Pyboard('/dev/ttyACM0')\n",
"pyb.enter_raw_repl()"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"ExecuteTime": {
"end_time": "2020-05-19T19:11:18.145548Z",
"start_time": "2020-05-19T19:11:18.137468Z"
}
},
"outputs": [],
"source": [
"pyb.exit_raw_repl()\n",
"pyb.close()"
]
},
{
"cell_type": "code",
"execution_count": 58,
"metadata": {
"ExecuteTime": {
"end_time": "2020-05-07T07:35:38.725924Z",
"start_time": "2020-05-07T07:35:38.645488Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
}
],
"source": [
"%%micropython -pyboard 1\n",
"\n",
"import utime\n",
"from ulab import numpy as np\n",
"\n",
"def timeit_n(n=1000):\n",
"    # decorator factory: runs the decorated function n times, and prints the\n",
"    # statistics of the measured run times; the result of the last run is returned\n",
"    def wrapper(f, *args, **kwargs):\n",
"        func_name = str(f).split(' ')[1]\n",
"        def new_func(*args, **kwargs):\n",
"            # uint16 storage: a single run time above 65535 us does not fit\n",
"            run_times = np.zeros(n, dtype=np.uint16)\n",
"            for i in range(n):\n",
"                t = utime.ticks_us()\n",
"                result = f(*args, **kwargs)\n",
"                run_times[i] = utime.ticks_diff(utime.ticks_us(), t)\n",
"            print('{}() execution times based on {} cycles'.format(func_name, n))\n",
"            print('\\tbest: %d us'%np.min(run_times))\n",
"            print('\\tworst: %d us'%np.max(run_times))\n",
"            print('\\taverage: %d us'%np.mean(run_times))\n",
"            print('\\tdeviation: +/-%.3f us'%np.std(run_times))\n",
"            return result\n",
"        return new_func\n",
"    return wrapper\n",
"\n",
"def timeit(f, *args, **kwargs):\n",
"    # simple decorator: prints the execution time of a single call\n",
"    func_name = str(f).split(' ')[1]\n",
"    def new_func(*args, **kwargs):\n",
"        t = utime.ticks_us()\n",
"        result = f(*args, **kwargs)\n",
"        print('execution time: ', utime.ticks_diff(utime.ticks_us(), t), ' us')\n",
"        return result\n",
"    return new_func"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"__END_OF_DEFS__"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# ulab utilities\n",
"\n",
"\n",
"There might be cases, when the format of your data does not conform to `ulab`, i.e., there is no obvious way to map the data to any of the five supported `dtype`s. A trivial example is an ADC or microphone signal with 32-bit resolution. For such cases, `ulab` defines the `utils` module, which, at the moment, has four functions that are not `numpy` compatible, but which should ease interfacing `ndarray`s to peripheral devices. \n",
"\n",
"The `utils` module can be enabled by setting the `ULAB_HAS_UTILS_MODULE` constant to 1 in [ulab.h](https://github.com/v923z/micropython-ulab/blob/master/code/ulab.h):\n",
"\n",
"```c\n",
"#ifndef ULAB_HAS_UTILS_MODULE\n",
"#define ULAB_HAS_UTILS_MODULE (1)\n",
"#endif\n",
"```\n",
"\n",
"This still does not compile any functions into the firmware. You can add a function by setting the corresponding pre-processor constant to 1. E.g., \n",
"\n",
"```c\n",
"#ifndef ULAB_UTILS_HAS_FROM_INT16_BUFFER\n",
"#define ULAB_UTILS_HAS_FROM_INT16_BUFFER (1)\n",
"#endif\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## from_int32_buffer, from_uint32_buffer\n",
"\n",
"With the help of `utils.from_int32_buffer`, and `utils.from_uint32_buffer`, it is possible to convert 32-bit integer buffers to `ndarrays` of float type. These functions have a syntax similar to `numpy.frombuffer`; they support the `count=-1`, and `offset=0` keyword arguments. However, in addition, they also accept `out=None`, and `byteswap=False`. \n",
"\n",
"Here is an example without keyword arguments"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-05T06:53:26.256516Z",
"start_time": "2021-03-05T06:53:26.007070Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"a: bytearray(b'\\x01\\x01\\x00\\x00\\x00\\x00\\x00\\xff')\n",
"\n",
"unsigned integers: array([257.0, 4278190080.000001], dtype=float64)\n",
"\n",
"b: bytearray(b'\\x01\\x01\\x00\\x00\\x00\\x00\\x00\\xff')\n",
"\n",
"signed integers: array([257.0, -16777216.0], dtype=float64)\n",
"\n",
"\n"
]
}
],
"source": [
"%%micropython -unix 1\n",
"\n",
"from ulab import numpy as np\n",
"from ulab import utils\n",
"\n",
"a = bytearray([1, 1, 0, 0, 0, 0, 0, 255])\n",
"print('a: ', a)\n",
"print()\n",
"print('unsigned integers: ', utils.from_uint32_buffer(a))\n",
"\n",
"b = bytearray([1, 1, 0, 0, 0, 0, 0, 255])\n",
"print('\\nb: ', b)\n",
"print()\n",
"print('signed integers: ', utils.from_int32_buffer(b))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The meaning of `count`, and `offset` is similar to that in `numpy.frombuffer`. `count` is the number of floats that will be converted, while `offset` would discard the first `offset` number of bytes from the buffer before the conversion.\n",
"\n",
"In the example above, repeated calls to either of the functions return a new `ndarray`. You can save RAM by supplying the `out` keyword argument with a pre-defined `ndarray` of sufficient size, in which case the results will be inserted into the `ndarray`. If the `dtype` of `out` is not `float`, a `TypeError` exception will be raised."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-05T06:53:41.551440Z",
"start_time": "2021-03-05T06:53:41.534163Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"b: bytearray(b'\\x01\\x00\\x01\\x00\\x00\\x01\\x00\\x01')\n",
"a: array([65537.0, 16777472.0], dtype=float64)\n",
"\n",
"\n"
]
}
],
"source": [
"%%micropython -unix 1\n",
"\n",
"from ulab import numpy as np\n",
"from ulab import utils\n",
"\n",
"a = np.array([1, 2], dtype=np.float)\n",
"b = bytearray([1, 0, 1, 0, 0, 1, 0, 1])\n",
"print('b: ', b)\n",
"utils.from_uint32_buffer(b, out=a)\n",
"print('a: ', a)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, since there is no guarantee that the endianness of a particular peripheral device supplying the buffer is the same as that of the microcontroller, `from_(u)int32_buffer` allows a conversion via the `byteswap` keyword argument."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-05T06:53:52.242950Z",
"start_time": "2021-03-05T06:53:52.229160Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"a: bytearray(b'\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x01')\n",
"buffer without byteswapping: array([1.0, 16777216.0], dtype=float64)\n",
"buffer with byteswapping: array([16777216.0, 1.0], dtype=float64)\n",
"\n",
"\n"
]
}
],
"source": [
"%%micropython -unix 1\n",
"\n",
"from ulab import numpy as np\n",
"from ulab import utils\n",
"\n",
"a = bytearray([1, 0, 0, 0, 0, 0, 0, 1])\n",
"print('a: ', a)\n",
"print('buffer without byteswapping: ', utils.from_uint32_buffer(a))\n",
"print('buffer with byteswapping: ', utils.from_uint32_buffer(a, byteswap=True))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## from_int16_buffer, from_uint16_buffer\n",
"\n",
"These two functions are identical to `utils.from_int32_buffer`, and `utils.from_uint32_buffer`, with the exception that they convert 16-bit integers to floating point `ndarray`s. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## spectrogram\n",
"\n",
"In addition to the Fourier transform and its inverse, `ulab` also sports a function called `spectrogram`, which returns the absolute value of the Fourier transform, also known as the power spectrum. This could be used to find the dominant spectral component in a time series. The positional arguments are treated in the same way as in `fft`, and `ifft`. This means that, if the firmware was compiled with complex support and `ULAB_FFT_IS_NUMPY_COMPATIBLE` is defined to be 1 in `ulab.h`, the input can also be a complex array. \n",
"\n",
"An easy way to find out if the FFT is `numpy`-compatible is to check the number of values `fft.fft` returns when called with a single real argument of length other than 2: "
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"FFT is numpy compatible (complex inputs/outputs)\n",
"\n",
"\n"
]
}
],
"source": [
"%%micropython -unix 1\n",
"\n",
"from ulab import numpy as np\n",
"\n",
"if len(np.fft.fft(np.zeros(4))) == 2:\n",
" print('FFT is NOT numpy compatible (real and imaginary parts are treated separately)')\n",
"else:\n",
" print('FFT is numpy compatible (complex inputs/outputs)')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Depending on the `numpy`-compatibility of the FFT, the `spectrogram` function takes one or two positional arguments, and three keyword arguments. If the FFT is `numpy`-compatible, one positional argument is allowed, and it is a 1D real or complex `ndarray`. If the FFT is not `numpy`-compatible and a single argument is supplied, it will be treated as the real part of the input; if two positional arguments are supplied, they are treated as the real and imaginary parts of the signal.\n",
"\n",
"The keyword arguments are as follows:\n",
"\n",
"1. `scratchpad = None`: must be a 1D, dense, floating point array, twice as long as the input array; the `scratchpad` will be used as a temporary internal buffer to perform the Fourier transform; the `scratchpad` can repeatedly be re-used.\n",
"1. `out = None`: must be a 1D, not necessarily dense, floating point array that will store the results\n",
"1. `log = False`: must be either `True`, or `False`; if `True`, the `spectrogram` returns the logarithm of the absolute values of the Fourier transform."
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"ExecuteTime": {
"end_time": "2022-01-29T16:59:56.400603Z",
"start_time": "2022-01-29T16:59:56.374748Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"original vector:\n",
" array([0.0, 0.009775015390171337, 0.01954909674625918, ..., -0.5275140569487312, -0.5357931822978732, -0.5440211108893697], dtype=float64)\n",
"\n",
"spectrum:\n",
" array([187.8635087634578, 315.3112063607119, 347.8814873399375, ..., 84.45888934298905, 347.8814873399374, 315.3112063607118], dtype=float64)\n",
"\n",
"\n"
]
}
],
"source": [
"%%micropython -unix 1\n",
"\n",
"from ulab import numpy as np\n",
"from ulab import utils as utils\n",
"\n",
"x = np.linspace(0, 10, num=1024)\n",
"y = np.sin(x)\n",
"\n",
"a = utils.spectrogram(y)\n",
"\n",
"print('original vector:\\n', y)\n",
"print('\\nspectrum:\\n', a)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As such, `spectrogram` is really just a shorthand for `np.abs(np.fft.fft(signal))`, if the FFT is `numpy`-compatible, or `np.sqrt(a*a + b*b)` if the FFT returns the real (`a`) and imaginary (`b`) parts separately. However, `spectrogram` saves significant amounts of RAM: the expression `a*a + b*b` has to allocate memory for `a*a`, `b*b`, and finally, their sum. Similarly, `np.abs` returns a new array. This issue is compounded even more, if `np.log()` is used on the absolute value. \n",
"\n",
"In contrast, `spectrogram` handles all calculations in the same internal arrays, and allows one to re-use previously reserved RAM. This can be especially useful in cases when `spectrogram` is called repeatedly, as in the snippet below."
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {
"ExecuteTime": {
"end_time": "2022-01-29T16:59:48.485610Z",
"start_time": "2022-01-29T16:59:48.462593Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"signal: array([-27.38260169844543, 6.237834411021073, -0.4038327279002965, ..., -0.9795967096969854, -0.4038327279002969, 6.237834411021073], dtype=float64)\n",
"out: array([-27.38260169844543, 6.237834411021073, -0.4038327279002965, ..., -0.9795967096969854, -0.4038327279002969, 6.237834411021073], dtype=float64)\n",
"\n",
"\n"
]
}
],
"source": [
"%%micropython -unix 1\n",
"\n",
"from ulab import numpy as np\n",
"from ulab import utils as utils\n",
"\n",
"n = 1024\n",
"# the signal length and the scratchpad length are both derived from n,\n",
"# because the scratchpad must be twice as long as the input\n",
"t = np.linspace(0, 2 * np.pi, num=n)\n",
"scratchpad = np.zeros(2 * n)\n",
"\n",
"for _ in range(10):\n",
"    signal = np.sin(t)\n",
"    utils.spectrogram(signal, out=signal, scratchpad=scratchpad, log=True)\n",
"\n",
"print('signal: ', signal)\n",
"\n",
"for _ in range(10):\n",
"    signal = np.sin(t)\n",
"    out = np.log(utils.spectrogram(signal))\n",
"\n",
"print('out: ', out)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that `scratchpad` is reserved only once, and then is re-used in the first loop. By assigning `signal` to the output, we save additional RAM. This approach avoids the usual problem of memory fragmentation, which would happen in the second loop, where both `spectrogram`, and `np.log` must reserve RAM in each iteration."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.7"
},
"toc": {
"base_numbering": 1,
"nav_menu": {},
"number_sections": true,
"sideBar": true,
"skip_h1_title": false,
"title_cell": "Table of Contents",
"title_sidebar": "Contents",
"toc_cell": false,
"toc_position": {
"height": "calc(100% - 180px)",
"left": "10px",
"top": "150px",
"width": "382.797px"
},
"toc_section_display": true,
"toc_window_display": true
},
"varInspector": {
"cols": {
"lenName": 16,
"lenType": 16,
"lenVar": 40
},
"kernels_config": {
"python": {
"delete_cmd_postfix": "",
"delete_cmd_prefix": "del ",
"library": "var_list.py",
"varRefreshCmd": "print(var_dic_list())"
},
"r": {
"delete_cmd_postfix": ") ",
"delete_cmd_prefix": "rm(",
"library": "var_list.r",
"varRefreshCmd": "cat(var_dic_list()) "
}
},
"types_to_exclude": [
"module",
"function",
"builtin_function_or_method",
"instance",
"_Feature"
],
"window_display": false
}
},
"nbformat": 4,
"nbformat_minor": 4
}