Compare commits

..

1 commit

Author SHA1 Message Date
dependabot[bot]
140623fefb
Bump black from 19.3b0 to 24.3.0
Bumps [black](https://github.com/psf/black) from 19.3b0 to 24.3.0.
- [Release notes](https://github.com/psf/black/releases)
- [Changelog](https://github.com/psf/black/blob/main/CHANGES.md)
- [Commits](https://github.com/psf/black/commits/24.3.0)

---
updated-dependencies:
- dependency-name: black
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-03-20 15:48:51 +00:00
42 changed files with 2052 additions and 3289 deletions

View file

@ -16,11 +16,11 @@ jobs:
run: echo "$GITHUB_CONTEXT"
- name: Translate Repo Name For Build Tools filename_prefix
id: repo-name
run: echo "repo-name=circup" >> $GITHUB_OUTPUT
- name: Set up Python 3.11
run: echo ::set-output name=repo-name::circup
- name: Set up Python 3.7
uses: actions/setup-python@v1
with:
python-version: 3.11
python-version: 3.7
- name: Pip install Sphinx & pre-commit
run: |
pip install --force-reinstall Sphinx sphinx-rtd-theme pre-commit

View file

@ -1,6 +1,5 @@
# SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries
# SPDX-FileCopyrightText: 2021 James Carr
#
# SPDX-FileCopyrightText: 2019 Nicholas Tollervey, written for Adafruit Industries
# SPDX-License-Identifier: MIT
name: Release Actions
@ -15,20 +14,26 @@ jobs:
steps:
- uses: actions/checkout@v4
with:
filter: 'blob:none'
depth: 0
show-progress: false
- name: Check For setup.py
id: need-pypi
run: |
echo ::set-output name=setup-py::$( find . -wholename './setup.py' )
- name: Set up Python
uses: actions/setup-python@v5
if: contains(steps.need-pypi.outputs.setup-py, 'setup.py')
uses: actions/setup-python@v1
with:
python-version: '3.11'
python-version: '3.x'
- name: Install dependencies
if: contains(steps.need-pypi.outputs.setup-py, 'setup.py')
run: |
python -m pip install --upgrade pip
pip install build setuptools wheel twine
pip install setuptools wheel twine
- name: Build and publish
if: contains(steps.need-pypi.outputs.setup-py, 'setup.py')
env:
TWINE_USERNAME: ${{ secrets.pypi_username }}
TWINE_PASSWORD: ${{ secrets.pypi_password }}
run: |
python -m build
python setup.py sdist
twine upload dist/*

2
.gitignore vendored
View file

@ -19,7 +19,6 @@ downloads/
eggs/
.eggs/
lib/
!tests/mock_device/lib/
lib64/
parts/
sdist/
@ -94,7 +93,6 @@ venv/
ENV/
env.bak/
venv.bak/
*_venv/
# Spyder project settings
.spyderproject

View file

@ -9,7 +9,7 @@ repos:
- id: black
exclude: "^tests/bad_python.py$"
- repo: https://github.com/pycqa/pylint
rev: v3.1.0
rev: v2.15.5
hooks:
- id: pylint
name: lint (examples)

View file

@ -246,7 +246,7 @@ ignore-docstrings=yes
ignore-imports=yes
# Minimum lines number of a similarity.
min-similarity-lines=8
min-similarity-lines=4
[BASIC]

View file

@ -25,24 +25,15 @@ Developer Setup
.. note::
Please try to use Python 3.9+ while developing Circup. This is so we can
Please try to use Python 3.6+ while developing CircUp. This is so we can
use the
`Black code formatter <https://black.readthedocs.io/en/stable/index.html>`_
and so that we're supporting versions which still receive security updates.
(which only works with Python 3.6+).
Clone the repository and from the root of the project,
install the requirements::
If you'd like you can setup a virtual environment and activate it.::
python3 -m venv .env
source .env/bin/activate
install the development requirements::
pip install -r optional_requirements.txt
pip install -e ".[dev]"
Run the test suite::
@ -100,7 +91,7 @@ subsequently used to facilitate the various commands the tool makes available.
These commands are defined at the very end of the ``circup.py`` code.
Unit tests can be found in the ``tests`` directory. Circup uses
Unit tests can be found in the ``tests`` directory. CircUp uses
`pytest <http://www.pytest.org/en/latest/>`_ style testing conventions. Test
functions should include a comment to describe its *intention*. We currently
have 100% unit test coverage for all the core functionality (excluding
@ -124,7 +115,7 @@ available options to help you work with the code base.
Before submitting a PR, please remember to ``pre-commit run --all-files``.
But if you forget the CI process in Github will run it for you. ;-)
Circup uses the `Click <https://click.palletsprojects.com>`_ module to
CircUp uses the `Click <https://click.palletsprojects.com/en/7.x/>`_ module to
run command-line interaction. The
`AppDirs <https://pypi.org/project/appdirs/>`_ module is used to determine
where to store user-specific assets created by the tool in such a way that

View file

@ -1,5 +1,5 @@
Circup
CircUp
======
.. image:: https://readthedocs.org/projects/circup/badge/?version=latest
@ -28,7 +28,7 @@ A tool to manage and update libraries (modules) on a CircuitPython device.
Installation
------------
Circup requires Python 3.9 or higher.
Circup requires Python 3.5 or higher.
In a `virtualenv <https://virtualenv.pypa.io/en/latest/>`_,
``pip install circup`` should do the trick. This is the simplest way to make it
@ -39,7 +39,7 @@ If you have no idea what a virtualenv is, try the following command,
.. note::
If you use the ``pip3`` command to install Circup you must make sure that
If you use the ``pip3`` command to install CircUp you must make sure that
your path contains the directory into which the script will be installed.
To discover this path,
@ -76,7 +76,7 @@ Usage
-----
If you need more detailed help using Circup see the Learn Guide article
`"Use Circup to easily keep your CircuitPython libraries up to date" <https://learn.adafruit.com/keep-your-circuitpython-libraries-on-devices-up-to-date-with-circup/>`_.
`"Use CircUp to easily keep your CircuitPython libraries up to date" <https://learn.adafruit.com/keep-your-circuitpython-libraries-on-devices-up-to-date-with-circup/>`_.
First, plug in a device running CircuiPython. This should appear as a mounted
storage device called ``CIRCUITPY``.
@ -95,8 +95,6 @@ To get help, just type the command::
--host TEXT Hostname or IP address of a device. Overrides automatic
path detection.
--password TEXT Password to use for authentication when --host is used.
--timeout INTEGER Specify the timeout in seconds for any network
operations.
--board-id TEXT Manual Board ID of the CircuitPython device. If provided
in combination with --cpy-version, it overrides the
detected board ID.
@ -109,7 +107,6 @@ To get help, just type the command::
bundle-add Add bundles to the local bundles list, by "user/repo"...
bundle-remove Remove one or more bundles from the local bundles list.
bundle-show Show the list of bundles, default and local, with URL,...
example Copy named example(s) from a bundle onto the device.
freeze Output details of all the modules found on the connected...
install Install a named module(s) onto the device.
list Lists all out of date modules found on the connected...
@ -230,7 +227,7 @@ The ``--version`` flag will tell you the current version of the
``circup`` command itself::
$ circup --version
Circup, A CircuitPython module updater. Version 0.0.1
CircUp, A CircuitPython module updater. Version 0.0.1
To use circup via the `Web Workflow <https://learn.adafruit.com/getting-started-with-web-workflow-using-the-code-editor>`_. on devices that support it. Use the ``--host`` and ``--password`` arguments before your circup command.::
@ -263,7 +260,6 @@ For Bash, add this to ~/.bashrc::
For Zsh, add this to ~/.zshrc::
autoload -U compinit; compinit
eval "$(_CIRCUP_COMPLETE=zsh_source circup)"
For Fish, add this to ~/.config/fish/completions/foo-bar.fish::

File diff suppressed because it is too large Load diff

View file

@ -16,6 +16,7 @@ import requests
from requests.adapters import HTTPAdapter
from requests.auth import HTTPBasicAuth
from circup.shared import DATA_DIR, BAD_FILE_FORMAT, extract_metadata, _get_modules_file
#: The location to store a local copy of code.py for use with --auto and
@ -29,10 +30,9 @@ class Backend:
implementations
"""
def __init__(self, logger, version_override=None):
def __init__(self, logger):
self.device_location = None
self.LIB_DIR_PATH = None
self.version_override = version_override
self.logger = logger
def get_circuitpython_version(self):
@ -73,39 +73,27 @@ class Backend:
"""
return self.get_modules(os.path.join(self.device_location, self.LIB_DIR_PATH))
def create_directory(self, device_path, directory):
def _create_library_directory(self, device_path, library_path):
"""
To be overridden by subclass
"""
raise NotImplementedError
def install_module_py(self, metadata, location=None):
def _install_module_py(self, metadata):
"""
To be overridden by subclass
"""
raise NotImplementedError
def install_module_mpy(self, bundle, metadata):
def _install_module_mpy(self, bundle, metadata):
"""
To be overridden by subclass
"""
raise NotImplementedError
def copy_file(self, target_file, location_to_paste):
"""Paste a copy of the specified file at the location given
To be overridden by subclass
"""
raise NotImplementedError
def upload_file(self, target_file, location_to_paste):
"""Paste a copy of the specified file at the location given
To be overridden by subclass
"""
raise NotImplementedError
# pylint: disable=too-many-locals,too-many-branches,too-many-arguments,too-many-nested-blocks,too-many-statements
# pylint: disable=too-many-locals,too-many-branches,too-many-arguments,too-many-nested-blocks
def install_module(
self, device_path, device_modules, name, pyext, mod_names, upgrade=False
self, device_path, device_modules, name, pyext, mod_names
): # pragma: no cover
"""
Finds a connected device and installs a given module name if it
@ -120,51 +108,23 @@ class Backend:
source or from a pre-compiled module
:param mod_names: Dictionary of metadata from modules that can be generated
with get_bundle_versions()
:param bool upgrade: Upgrade the specified modules if they're already installed.
"""
local_path = None
if os.path.exists(name):
# local file exists use that.
local_path = name
name = local_path.split(os.path.sep)[-1]
name = name.replace(".py", "").replace(".mpy", "")
click.echo(f"Installing from local path: {local_path}")
if not name:
click.echo("No module name(s) provided.")
return
if name in mod_names or local_path is not None:
elif name in mod_names:
# Grab device modules to check if module already installed
if name in device_modules:
if not upgrade:
# skip already installed modules if no -upgrade flag
click.echo("'{}' is already installed.".format(name))
return
click.echo("'{}' is already installed.".format(name))
return
# uninstall the module before installing
name = name.lower()
_mod_names = {}
for module_item, _metadata in device_modules.items():
_mod_names[module_item.replace(".py", "").lower()] = _metadata
if name in _mod_names:
_metadata = _mod_names[name]
module_path = _metadata["path"]
self.uninstall(device_path, module_path)
new_module_size = 0
library_path = (
os.path.join(device_path, self.LIB_DIR_PATH)
if not isinstance(self, WebBackend)
else urljoin(device_path, self.LIB_DIR_PATH)
)
if local_path is None:
metadata = mod_names[name]
bundle = metadata["bundle"]
else:
metadata = {"path": local_path}
new_module_size = os.path.getsize(metadata["path"])
metadata = mod_names[name]
bundle = metadata["bundle"]
bundle.size = os.path.getsize(metadata["path"])
if os.path.isdir(metadata["path"]):
# pylint: disable=unused-variable
for dirpath, dirnames, filenames in os.walk(metadata["path"]):
@ -172,7 +132,7 @@ class Backend:
fp = os.path.join(dirpath, f)
try:
if not os.path.islink(fp): # Ignore symbolic links
new_module_size += os.path.getsize(fp)
bundle.size += os.path.getsize(fp)
else:
self.logger.warning(
f"Skipping symbolic link in space calculation: {fp}"
@ -182,29 +142,27 @@ class Backend:
f"Error: {e} - Skipping file in space calculation: {fp}"
)
if self.get_free_space() < new_module_size:
if self.get_free_space() < bundle.size:
self.logger.error(
f"Aborted installing module {name} - "
f"not enough free space ({new_module_size} < {self.get_free_space()})"
f"not enough free space ({bundle.size} < {self.get_free_space()})"
)
click.secho(
f"Aborted installing module {name} - "
f"not enough free space ({new_module_size} < {self.get_free_space()})",
f"not enough free space ({bundle.size} < {self.get_free_space()})",
fg="red",
)
return
# Create the library directory first.
self.create_directory(device_path, library_path)
if local_path is None:
if pyext:
# Use Python source for module.
self.install_module_py(metadata)
else:
# Use pre-compiled mpy modules.
self.install_module_mpy(bundle, metadata)
self._create_library_directory(device_path, library_path)
if pyext:
# Use Python source for module.
self._install_module_py(metadata)
else:
self.copy_file(metadata["path"], "lib")
# Use pre-compiled mpy modules.
self._install_module_mpy(bundle, metadata)
click.echo("Installed '{}'.".format(name))
else:
click.echo("Unknown module named, '{}'.".format(name))
@ -233,12 +191,6 @@ class Backend:
"""
raise NotImplementedError
def get_file_content(self, target_file):
"""
To be overridden by subclass
"""
raise NotImplementedError
def get_free_space(self):
"""
To be overridden by subclass
@ -267,12 +219,6 @@ class Backend:
board_id = ""
return circuit_python, board_id
def file_exists(self, filepath):
"""
To be overriden by subclass
"""
raise NotImplementedError
def _writeable_error():
click.secho(
@ -288,14 +234,10 @@ class WebBackend(Backend):
Backend for interacting with a device via Web Workflow
"""
def __init__( # pylint: disable=too-many-arguments
self, host, port, password, logger, timeout=10, version_override=None
):
def __init__(self, host, password, logger, timeout=10):
super().__init__(logger)
if password is None:
raise ValueError(
"Must pass --password or set CIRCUP_WEBWORKFLOW_PASSWORD environment variable"
)
raise ValueError("--host needs --password")
# pylint: disable=no-member
# verify hostname/address
@ -308,37 +250,24 @@ class WebBackend(Backend):
else "Could not find or connect to specified device"
) from exc
self.FS_PATH = "fs/"
self.LIB_DIR_PATH = f"{self.FS_PATH}lib/"
self.LIB_DIR_PATH = "fs/lib/"
self.host = host
self.port = port
self.password = password
self.device_location = f"http://:{self.password}@{self.host}:{self.port}"
self.device_location = f"http://:{self.password}@{self.host}"
self.session = requests.Session()
self.session.mount(self.device_location, HTTPAdapter(max_retries=5))
self.library_path = self.device_location + "/" + self.LIB_DIR_PATH
self.timeout = timeout
self.version_override = version_override
self.FS_URL = urljoin(self.device_location, self.FS_PATH)
def __repr__(self):
return f"<WebBackend @{self.device_location}>"
def install_file_http(self, source, location=None):
def install_file_http(self, source):
"""
Install file to device using web workflow.
:param source source file.
:param location the location on the device to copy the source
directory in to. If omitted is CIRCUITPY/lib/ used.
"""
file_name = source.split(os.path.sep)
file_name = file_name[-2] if file_name[-1] == "" else file_name[-1]
if location is None:
target = self.device_location + "/" + self.LIB_DIR_PATH + file_name
else:
target = self.device_location + "/" + self.FS_PATH + location + file_name
target = self.device_location + "/" + self.LIB_DIR_PATH + file_name
auth = HTTPBasicAuth("", self.password)
@ -348,19 +277,14 @@ class WebBackend(Backend):
_writeable_error()
r.raise_for_status()
def install_dir_http(self, source, location=None):
def install_dir_http(self, source):
"""
Install directory to device using web workflow.
:param source source directory.
:param location the location on the device to copy the source
directory in to. If omitted is CIRCUITPY/lib/ used.
"""
mod_name = source.split(os.path.sep)
mod_name = mod_name[-2] if mod_name[-1] == "" else mod_name[-1]
if location is None:
target = self.device_location + "/" + self.LIB_DIR_PATH + mod_name
else:
target = self.device_location + "/" + self.FS_PATH + location + mod_name
target = self.device_location + "/" + self.LIB_DIR_PATH + mod_name
target = target + "/" if target[:-1] != "/" else target
url = urlparse(target)
auth = HTTPBasicAuth("", url.password)
@ -424,9 +348,6 @@ class WebBackend(Backend):
:return: A tuple with the version string for CircuitPython and the board ID string.
"""
if self.version_override is not None:
return self.version_override
# pylint: disable=arguments-renamed
with self.session.get(
self.device_location + "/cp/version.json", timeout=self.timeout
@ -561,84 +482,15 @@ class WebBackend(Backend):
metadata["path"] = sfm_url
result[sfm[:idx]] = metadata
def create_directory(self, device_path, directory):
auth = HTTPBasicAuth("", self.password)
with self.session.put(directory, auth=auth, timeout=self.timeout) as r:
def _create_library_directory(self, device_path, library_path):
url = urlparse(device_path)
auth = HTTPBasicAuth("", url.password)
with self.session.put(library_path, auth=auth, timeout=self.timeout) as r:
if r.status_code == 409:
_writeable_error()
r.raise_for_status()
def copy_file(self, target_file, location_to_paste):
if os.path.isdir(target_file):
create_directory_url = urljoin(
self.device_location,
"/".join(("fs", location_to_paste, target_file, "")),
)
self.create_directory(self.device_location, create_directory_url)
self.install_dir_http(target_file)
else:
self.install_file_http(target_file)
def upload_file(self, target_file, location_to_paste):
"""
copy a file from the host PC to the microcontroller
:param target_file: file on the host PC to copy
:param location_to_paste: Location on the microcontroller to paste it.
:return:
"""
if os.path.isdir(target_file):
create_directory_url = urljoin(
self.device_location,
"/".join(("fs", location_to_paste, target_file, "")),
)
self.create_directory(self.device_location, create_directory_url)
self.install_dir_http(target_file, location_to_paste)
else:
self.install_file_http(target_file, location_to_paste)
def download_file(self, target_file, location_to_paste):
"""
Download a file from the MCU device to the local host PC
:param target_file: The file on the MCU to download
:param location_to_paste: The location on the host PC to put the downloaded copy.
:return:
"""
auth = HTTPBasicAuth("", self.password)
with self.session.get(
self.FS_URL + target_file, timeout=self.timeout, auth=auth
) as r:
if r.status_code == 404:
click.secho(f"{target_file} was not found on the device", "red")
file_name = target_file.split("/")[-1]
if location_to_paste is None:
with open(file_name, "wb") as f:
f.write(r.content)
click.echo(f"Downloaded File: {file_name}")
else:
with open(os.path.join(location_to_paste, file_name), "wb") as f:
f.write(r.content)
click.echo(
f"Downloaded File: {os.path.join(location_to_paste, file_name)}"
)
def get_file_content(self, target_file):
"""
Get the content of a file from the MCU drive
:param target_file: The file on the MCU to download
:return:
"""
auth = HTTPBasicAuth("", self.password)
with self.session.get(
self.FS_URL + target_file, timeout=self.timeout, auth=auth
) as r:
if r.status_code == 404:
return None
return r.content # .decode("utf8")
def install_module_mpy(self, bundle, metadata):
def _install_module_mpy(self, bundle, metadata):
"""
:param bundle library bundle.
:param library_path library path
@ -662,7 +514,7 @@ class WebBackend(Backend):
raise IOError("Cannot find compiled version of module.")
# pylint: enable=too-many-locals,too-many-branches
def install_module_py(self, metadata, location=None):
def _install_module_py(self, metadata):
"""
:param library_path library path
:param metadata dictionary.
@ -670,10 +522,23 @@ class WebBackend(Backend):
source_path = metadata["path"] # Path to Python source version.
if os.path.isdir(source_path):
self.install_dir_http(source_path, location=location)
self.install_dir_http(source_path)
else:
self.install_file_http(source_path, location=location)
self.install_file_http(source_path)
def get_auto_file_path(self, auto_file_path):
"""
Make a local temp copy of the --auto file from the device.
Returns the path to the local copy.
"""
url = auto_file_path
auth = HTTPBasicAuth("", self.password)
with self.session.get(url, auth=auth, timeout=self.timeout) as r:
r.raise_for_status()
with open(LOCAL_CODE_PY_COPY, "w", encoding="utf-8") as f:
f.write(r.text)
return LOCAL_CODE_PY_COPY
def uninstall(self, device_path, module_path):
"""
@ -695,18 +560,6 @@ class WebBackend(Backend):
"""
self._update_http(module)
def file_exists(self, filepath):
"""
return True if the file exists, otherwise False.
"""
auth = HTTPBasicAuth("", self.password)
resp = requests.get(
self.get_file_path(filepath), auth=auth, timeout=self.timeout
)
if resp.status_code == 200:
return True
return False
def _update_http(self, module):
"""
Update the module using web workflow.
@ -728,7 +581,11 @@ class WebBackend(Backend):
"""
retuns the full path on the device to a given file name.
"""
return "/".join((self.device_location, "fs", filename))
return urljoin(
urljoin(self.device_location, "fs/", allow_fragments=False),
filename,
allow_fragments=False,
)
def is_device_present(self):
"""
@ -799,19 +656,6 @@ class WebBackend(Backend):
return r.json()["free"] * r.json()["block_size"] # bytes
sys.exit(1)
def list_dir(self, dirpath):
"""
Returns the list of files located in the given dirpath.
"""
auth = HTTPBasicAuth("", self.password)
with self.session.get(
urljoin(self.device_location, f"fs/{dirpath if dirpath else ''}"),
auth=auth,
headers={"Accept": "application/json"},
timeout=self.timeout,
) as r:
return r.json()["files"]
class DiskBackend(Backend):
"""
@ -821,10 +665,9 @@ class DiskBackend(Backend):
:param logger: logger to use for outputting messages
:param String boot_out: Optional mock contents of a boot_out.txt file
to use for version information.
:param String version_override: Optional mock version to use.
"""
def __init__(self, device_location, logger, boot_out=None, version_override=None):
def __init__(self, device_location, logger, boot_out=None):
if device_location is None:
raise ValueError(
"Auto locating USB Disk based device failed. "
@ -838,7 +681,6 @@ class DiskBackend(Backend):
self.version_info = None
if boot_out is not None:
self.version_info = self.parse_boot_out_file(boot_out)
self.version_override = version_override
def get_circuitpython_version(self):
"""
@ -855,9 +697,6 @@ class DiskBackend(Backend):
:return: A tuple with the version string for CircuitPython and the board ID string.
"""
if self.version_override is not None:
return self.version_override
if not self.version_info:
try:
with open(
@ -890,29 +729,14 @@ class DiskBackend(Backend):
"""
return _get_modules_file(device_lib_path, self.logger)
def create_directory(self, device_path, directory):
if not os.path.exists(directory): # pragma: no cover
os.makedirs(directory)
def _create_library_directory(self, device_path, library_path):
if not os.path.exists(library_path): # pragma: no cover
os.makedirs(library_path)
def copy_file(self, target_file, location_to_paste):
target_filename = target_file.split(os.path.sep)[-1]
if os.path.isdir(target_file):
shutil.copytree(
target_file,
os.path.join(self.device_location, location_to_paste, target_filename),
)
else:
shutil.copyfile(
target_file,
os.path.join(self.device_location, location_to_paste, target_filename),
)
def upload_file(self, target_file, location_to_paste):
self.copy_file(target_file, location_to_paste)
def install_module_mpy(self, bundle, metadata):
def _install_module_mpy(self, bundle, metadata):
"""
:param bundle library bundle.
:param library_path library path
:param metadata dictionary.
"""
module_name = os.path.basename(metadata["path"]).replace(".py", ".mpy")
@ -939,32 +763,30 @@ class DiskBackend(Backend):
raise IOError("Cannot find compiled version of module.")
# pylint: enable=too-many-locals,too-many-branches
def install_module_py(self, metadata, location=None):
def _install_module_py(self, metadata):
"""
:param library_path library path
:param metadata dictionary.
:param location the location on the device to copy the py module to.
If omitted is CIRCUITPY/lib/ used.
"""
if location is None:
location = self.library_path
else:
location = os.path.join(self.device_location, location)
source_path = metadata["path"] # Path to Python source version.
if os.path.isdir(source_path):
target = os.path.basename(os.path.dirname(source_path))
target_path = os.path.join(location, target)
target_path = os.path.join(self.library_path, target)
# Copy the directory.
shutil.copytree(source_path, target_path)
else:
if "target_name" in metadata:
target = metadata["target_name"]
else:
target = os.path.basename(source_path)
target_path = os.path.join(location, target)
target = os.path.basename(source_path)
target_path = os.path.join(self.library_path, target)
# Copy file.
shutil.copyfile(source_path, target_path)
def get_auto_file_path(self, auto_file_path):
"""
Returns the path on the device to the file to be read for --auto.
"""
return auto_file_path
def uninstall(self, device_path, module_path):
"""
Uninstall module using local file system.
@ -1003,30 +825,12 @@ class DiskBackend(Backend):
os.remove(module.path)
shutil.copyfile(module.bundle_path, module.path)
def file_exists(self, filepath):
"""
return True if the file exists, otherwise False.
"""
return os.path.exists(os.path.join(self.device_location, filepath))
def get_file_path(self, filename):
"""
returns the full path on the device to a given file name.
"""
return os.path.join(self.device_location, filename)
def get_file_content(self, target_file):
"""
Get the content of a file from the MCU drive
:param target_file: The file on the MCU to download
:return:
"""
file_path = self.get_file_path(target_file)
if os.path.exists(file_path):
with open(file_path, "rb") as file:
return file.read()
return None
def is_device_present(self):
"""
returns True if the device is currently connected
@ -1040,22 +844,3 @@ class DiskBackend(Backend):
# pylint: disable=unused-variable
_, total, free = shutil.disk_usage(self.device_location)
return free
def list_dir(self, dirpath):
"""
Returns the list of files located in the given dirpath.
"""
files_list = []
files = os.listdir(os.path.join(self.device_location, dirpath))
for file_name in files:
file = os.path.join(self.device_location, dirpath, file_name)
stat = os.stat(file)
files_list.append(
{
"name": file_name,
"directory": os.path.isdir(file),
"modified_ns": stat.st_mtime_ns,
"file_size": stat.st_size,
}
)
return files_list

View file

@ -1,170 +0,0 @@
# SPDX-FileCopyrightText: 2019 Nicholas Tollervey, 2024 Tim Cocks, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
Class that represents a specific release of a Bundle.
"""
import os
import sys
import click
import requests
from circup.shared import (
DATA_DIR,
PLATFORMS,
REQUESTS_TIMEOUT,
tags_data_load,
get_latest_release_from_url,
)
from circup.logging import logger
class Bundle:
"""
All the links and file names for a bundle
"""
def __init__(self, repo):
"""
Initialise a Bundle created from its github info.
Construct all the strings in one place.
:param str repo: Repository string for github: "user/repository"
"""
vendor, bundle_id = repo.split("/")
bundle_id = bundle_id.lower().replace("_", "-")
self.key = repo
#
self.url = "https://github.com/" + repo
self.basename = bundle_id + "-{platform}-{tag}"
self.urlzip = self.basename + ".zip"
self.dir = os.path.join(DATA_DIR, vendor, bundle_id + "-{platform}")
self.zip = os.path.join(DATA_DIR, bundle_id + "-{platform}.zip")
self.url_format = self.url + "/releases/download/{tag}/" + self.urlzip
# tag
self._current = None
self._latest = None
def lib_dir(self, platform):
"""
This bundle's lib directory for the platform.
:param str platform: The platform identifier (py/6mpy/...).
:return: The path to the lib directory for the platform.
"""
tag = self.current_tag
return os.path.join(
self.dir.format(platform=platform),
self.basename.format(platform=PLATFORMS[platform], tag=tag),
"lib",
)
def examples_dir(self, platform):
"""
This bundle's examples directory for the platform.
:param str platform: The platform identifier (py/6mpy/...).
:return: The path to the examples directory for the platform.
"""
tag = self.current_tag
return os.path.join(
self.dir.format(platform=platform),
self.basename.format(platform=PLATFORMS[platform], tag=tag),
"examples",
)
def requirements_for(self, library_name, toml_file=False):
"""
The requirements file for this library.
:param str library_name: The name of the library.
:return: The path to the requirements.txt file.
"""
platform = "py"
tag = self.current_tag
found_file = os.path.join(
self.dir.format(platform=platform),
self.basename.format(platform=PLATFORMS[platform], tag=tag),
"requirements",
library_name,
"requirements.txt" if not toml_file else "pyproject.toml",
)
if os.path.isfile(found_file):
with open(found_file, "r", encoding="utf-8") as read_this:
return read_this.read()
return None
@property
def current_tag(self):
"""
Lazy load current cached tag from the BUNDLE_DATA json file.
:return: The current cached tag value for the project.
"""
if self._current is None:
self._current = tags_data_load(logger).get(self.key, "0")
return self._current
@current_tag.setter
def current_tag(self, tag):
"""
Set the current cached tag (after updating).
:param str tag: The new value for the current tag.
:return: The current cached tag value for the project.
"""
self._current = tag
@property
def latest_tag(self):
"""
Lazy find the value of the latest tag for the bundle.
:return: The most recent tag value for the project.
"""
if self._latest is None:
self._latest = get_latest_release_from_url(
self.url + "/releases/latest", logger
)
return self._latest
def validate(self):
"""
Test the existence of the expected URLs (not their content)
"""
tag = self.latest_tag
if not tag or tag == "releases":
if "--verbose" in sys.argv:
click.secho(f' Invalid tag "{tag}"', fg="red")
return False
for platform in PLATFORMS.values():
url = self.url_format.format(platform=platform, tag=tag)
r = requests.get(url, stream=True, timeout=REQUESTS_TIMEOUT)
# pylint: disable=no-member
if r.status_code != requests.codes.ok:
if "--verbose" in sys.argv:
click.secho(f" Unable to find {os.path.split(url)[1]}", fg="red")
return False
# pylint: enable=no-member
return True
def __repr__(self):
"""
Helps with log files.
:return: A repr of a dictionary containing the Bundles's metadata.
"""
return repr(
{
"key": self.key,
"url": self.url,
"urlzip": self.urlzip,
"dir": self.dir,
"zip": self.zip,
"url_format": self.url_format,
"current": self._current,
"latest": self._latest,
}
)

View file

@ -1,843 +0,0 @@
# SPDX-FileCopyrightText: 2019 Nicholas Tollervey, 2024 Tim Cocks, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
Functions called from commands in order to provide behaviors and return information.
"""
import ast
import ctypes
import glob
import os
from subprocess import check_output
import sys
import shutil
import zipfile
import json
import re
import toml
import requests
import click
from circup.shared import (
PLATFORMS,
REQUESTS_TIMEOUT,
_get_modules_file,
BUNDLE_CONFIG_OVERWRITE,
BUNDLE_CONFIG_FILE,
BUNDLE_CONFIG_LOCAL,
BUNDLE_DATA,
NOT_MCU_LIBRARIES,
tags_data_load,
)
from circup.logging import logger
from circup.module import Module
from circup.bundle import Bundle
WARNING_IGNORE_MODULES = (
"typing-extensions",
"pyasn1",
"circuitpython-typing",
)
CODE_FILES = [
"code.txt",
"code.py",
"main.py",
"main.txt",
"code.txt.py",
"code.py.txt",
"code.txt.txt",
"code.py.py",
"main.txt.py",
"main.py.txt",
"main.txt.txt",
"main.py.py",
]
class CodeParsingException(Exception):
"""Exception thrown when parsing code with ast fails"""
def clean_library_name(assumed_library_name):
"""
Most CP repos and library names are look like this:
repo: Adafruit_CircuitPython_LC709203F
library: adafruit_lc709203f
But some do not and this handles cleaning that up.
Also cleans up if the pypi or reponame is passed in instead of the
CP library name.
:param str assumed_library_name: An assumed name of a library from user
or requirements.txt entry
:return: str proper library name
"""
not_standard_names = {
# Assumed Name : Actual Name
"adafruit_adafruitio": "adafruit_io",
"adafruit_asyncio": "asyncio",
"adafruit_busdevice": "adafruit_bus_device",
"adafruit_connectionmanager": "adafruit_connection_manager",
"adafruit_display_button": "adafruit_button",
"adafruit_neopixel": "neopixel",
"adafruit_sd": "adafruit_sdcard",
"adafruit_simpleio": "simpleio",
"pimoroni_ltr559": "pimoroni_circuitpython_ltr559",
}
if "circuitpython" in assumed_library_name:
# convert repo or pypi name to common library name
assumed_library_name = (
assumed_library_name.replace("-circuitpython-", "_")
.replace("_circuitpython_", "_")
.replace("-", "_")
)
if assumed_library_name in not_standard_names:
return not_standard_names[assumed_library_name]
return assumed_library_name
def completion_for_install(ctx, param, incomplete):
"""
Returns the list of available modules for the command line tab-completion
with the ``circup install`` command.
"""
# pylint: disable=unused-argument
available_modules = get_bundle_versions(get_bundles_list(), avoid_download=True)
module_names = {m.replace(".py", "") for m in available_modules}
if incomplete:
module_names = [name for name in module_names if name.startswith(incomplete)]
module_names.extend(glob.glob(f"{incomplete}*"))
return sorted(module_names)
def completion_for_example(ctx, param, incomplete):
"""
Returns the list of available modules for the command line tab-completion
with the ``circup example`` command.
"""
# pylint: disable=unused-argument, consider-iterating-dictionary
available_examples = get_bundle_examples(get_bundles_list(), avoid_download=True)
matching_examples = [
example_path
for example_path in available_examples.keys()
if example_path.startswith(incomplete)
]
return sorted(matching_examples)
def ensure_latest_bundle(bundle):
"""
Ensure that there's a copy of the latest library bundle available so circup
can check the metadata contained therein.
:param Bundle bundle: the target Bundle object.
"""
logger.info("Checking library updates for %s.", bundle.key)
tag = bundle.latest_tag
do_update = False
if tag == bundle.current_tag:
for platform in PLATFORMS:
# missing directories (new platform added on an existing install
# or side effect of pytest or network errors)
do_update = do_update or not os.path.isdir(bundle.lib_dir(platform))
else:
do_update = True
if do_update:
logger.info("New version available (%s).", tag)
try:
get_bundle(bundle, tag)
tags_data_save_tag(bundle.key, tag)
except requests.exceptions.HTTPError as ex:
# See #20 for reason for this
click.secho(
(
"There was a problem downloading that platform bundle. "
"Skipping and using existing download if available."
),
fg="red",
)
logger.exception(ex)
else:
logger.info("Current bundle up to date %s.", tag)
def find_device():
"""
Return the location on the filesystem for the connected CircuitPython device.
This is based upon how Mu discovers this information.
:return: The path to the device on the local filesystem.
"""
device_dir = None
# Attempt to find the path on the filesystem that represents the plugged in
# CIRCUITPY board.
if os.name == "posix":
# Linux / OSX
for mount_command in ["mount", "/sbin/mount"]:
try:
mount_output = check_output(mount_command).splitlines()
mounted_volumes = [x.split()[2] for x in mount_output]
for volume in mounted_volumes:
if volume.endswith(b"CIRCUITPY"):
device_dir = volume.decode("utf-8")
except FileNotFoundError:
continue
elif os.name == "nt":
# Windows
def get_volume_name(disk_name):
"""
Each disk or external device connected to windows has an attribute
called "volume name". This function returns the volume name for the
given disk/device.
Based upon answer given here: http://stackoverflow.com/a/12056414
"""
vol_name_buf = ctypes.create_unicode_buffer(1024)
ctypes.windll.kernel32.GetVolumeInformationW(
ctypes.c_wchar_p(disk_name),
vol_name_buf,
ctypes.sizeof(vol_name_buf),
None,
None,
None,
None,
0,
)
return vol_name_buf.value
#
# In certain circumstances, volumes are allocated to USB
# storage devices which cause a Windows popup to raise if their
# volume contains no media. Wrapping the check in SetErrorMode
# with SEM_FAILCRITICALERRORS (1) prevents this popup.
#
old_mode = ctypes.windll.kernel32.SetErrorMode(1)
try:
for disk in "ABCDEFGHIJKLMNOPQRSTUVWXYZ":
path = "{}:\\".format(disk)
if os.path.exists(path) and get_volume_name(path) == "CIRCUITPY":
device_dir = path
# Report only the FIRST device found.
break
finally:
ctypes.windll.kernel32.SetErrorMode(old_mode)
else:
# No support for unknown operating systems.
raise NotImplementedError('OS "{}" not supported.'.format(os.name))
logger.info("Found device: %s", device_dir)
return device_dir
def find_modules(backend, bundles_list):
"""
Extracts metadata from the connected device and available bundles and
returns this as a list of Module instances representing the modules on the
device.
:param Backend backend: Backend with the device connection.
:param List[Bundle] bundles_list: List of supported bundles as Bundle objects.
:return: A list of Module instances describing the current state of the
modules on the connected device.
"""
# pylint: disable=broad-except,too-many-locals
try:
device_modules = backend.get_device_versions()
bundle_modules = get_bundle_versions(bundles_list)
result = []
for key, device_metadata in device_modules.items():
if key in bundle_modules:
path = device_metadata["path"]
bundle_metadata = bundle_modules[key]
repo = bundle_metadata.get("__repo__")
bundle = bundle_metadata.get("bundle")
device_version = device_metadata.get("__version__")
bundle_version = bundle_metadata.get("__version__")
mpy = device_metadata["mpy"]
compatibility = device_metadata.get("compatibility", (None, None))
module_name = (
path.split(os.sep)[-1]
if not path.endswith(os.sep)
else path[:-1].split(os.sep)[-1] + os.sep
)
m = Module(
module_name,
backend,
repo,
device_version,
bundle_version,
mpy,
bundle,
compatibility,
)
result.append(m)
return result
except Exception as ex:
# If it's not possible to get the device and bundle metadata, bail out
# with a friendly message and indication of what's gone wrong.
logger.exception(ex)
click.echo("There was a problem: {}".format(ex))
sys.exit(1)
# pylint: enable=broad-except,too-many-locals
def get_bundle(bundle, tag):
"""
Downloads and extracts the version of the bundle with the referenced tag.
The resulting zip file is saved on the local filesystem.
:param Bundle bundle: the target Bundle object.
:param str tag: The GIT tag to use to download the bundle.
"""
click.echo(f"Downloading latest bundles for {bundle.key} ({tag}).")
for platform, github_string in PLATFORMS.items():
# Report the platform: "8.x-mpy", etc.
click.echo(f"{github_string}:")
url = bundle.url_format.format(platform=github_string, tag=tag)
logger.info("Downloading bundle: %s", url)
r = requests.get(url, stream=True, timeout=REQUESTS_TIMEOUT)
# pylint: disable=no-member
if r.status_code != requests.codes.ok:
logger.warning("Unable to connect to %s", url)
r.raise_for_status()
# pylint: enable=no-member
total_size = int(r.headers.get("Content-Length"))
temp_zip = bundle.zip.format(platform=platform)
with click.progressbar(
r.iter_content(1024), label="Extracting:", length=total_size
) as pbar, open(temp_zip, "wb") as zip_fp:
for chunk in pbar:
zip_fp.write(chunk)
pbar.update(len(chunk))
logger.info("Saved to %s", temp_zip)
temp_dir = bundle.dir.format(platform=platform)
if os.path.isdir(temp_dir):
shutil.rmtree(temp_dir)
with zipfile.ZipFile(temp_zip, "r") as zfile:
zfile.extractall(temp_dir)
bundle.current_tag = tag
click.echo("\nOK\n")
def get_bundle_examples(bundles_list, avoid_download=False):
"""
Return a dictionary of metadata from examples in the all of the bundles
specified by bundles_list argument.
:param List[Bundle] bundles_list: List of supported bundles as Bundle objects.
:param bool avoid_download: if True, download the bundle only if missing.
:return: A dictionary of metadata about the examples available in the
library bundle.
"""
# pylint: disable=too-many-nested-blocks,too-many-locals
all_the_examples = dict()
bundle_examples = dict()
try:
for bundle in bundles_list:
if not avoid_download or not os.path.isdir(bundle.lib_dir("py")):
ensure_latest_bundle(bundle)
path = bundle.examples_dir("py")
meta_saved = os.path.join(path, "../bundle_examples.json")
if os.path.exists(meta_saved):
with open(meta_saved, "r", encoding="utf-8") as f:
bundle_examples = json.load(f)
all_the_examples.update(bundle_examples)
bundle_examples.clear()
continue
path_examples = _get_modules_file(path, logger)
for lib_name, lib_metadata in path_examples.items():
for _dir_level in os.walk(lib_metadata["path"]):
for _file in _dir_level[2]:
_parts = _dir_level[0].split(os.path.sep)
_lib_name_index = _parts.index(lib_name)
_dirs = _parts[_lib_name_index:]
if _dirs[-1] == "":
_dirs.pop(-1)
slug = f"{os.path.sep}".join(_dirs + [_file.replace(".py", "")])
bundle_examples[slug] = os.path.join(_dir_level[0], _file)
all_the_examples[slug] = os.path.join(_dir_level[0], _file)
with open(meta_saved, "w", encoding="utf-8") as f:
json.dump(bundle_examples, f)
bundle_examples.clear()
except NotADirectoryError:
# Bundle does not have new style examples directory
# so we cannot include its examples.
pass
return all_the_examples
def get_bundle_versions(bundles_list, avoid_download=False):
"""
Returns a dictionary of metadata from modules in the latest known release
of the library bundle. Uses the Python version (rather than the compiled
version) of the library modules.
:param List[Bundle] bundles_list: List of supported bundles as Bundle objects.
:param bool avoid_download: if True, download the bundle only if missing.
:return: A dictionary of metadata about the modules available in the
library bundle.
"""
all_the_modules = dict()
for bundle in bundles_list:
if not avoid_download or not os.path.isdir(bundle.lib_dir("py")):
ensure_latest_bundle(bundle)
path = bundle.lib_dir("py")
path_modules = _get_modules_file(path, logger)
for name, module in path_modules.items():
module["bundle"] = bundle
if name not in all_the_modules: # here we decide the order of priority
all_the_modules[name] = module
return all_the_modules
def get_bundles_dict():
"""
Retrieve the dictionary from BUNDLE_CONFIG_FILE (JSON).
Put the local dictionary in front, so it gets priority.
It's a dictionary of bundle string identifiers.
:return: Combined dictionaries from the config files.
"""
bundle_dict = get_bundles_local_dict()
try:
with open(BUNDLE_CONFIG_OVERWRITE, "rb") as bundle_config_json:
bundle_config = json.load(bundle_config_json)
except (FileNotFoundError, json.decoder.JSONDecodeError):
with open(BUNDLE_CONFIG_FILE, "rb") as bundle_config_json:
bundle_config = json.load(bundle_config_json)
for name, bundle in bundle_config.items():
if bundle not in bundle_dict.values():
bundle_dict[name] = bundle
return bundle_dict
def get_bundles_local_dict():
"""
Retrieve the local bundles from BUNDLE_CONFIG_LOCAL (JSON).
:return: Raw dictionary from the config file(s).
"""
try:
with open(BUNDLE_CONFIG_LOCAL, "rb") as bundle_config_json:
bundle_config = json.load(bundle_config_json)
if not isinstance(bundle_config, dict) or not bundle_config:
logger.error("Local bundle list invalid. Skipped.")
raise FileNotFoundError("Bad local bundle list")
return bundle_config
except (FileNotFoundError, json.decoder.JSONDecodeError):
return dict()
def get_bundles_list():
"""
Retrieve the list of bundles from the config dictionary.
:return: List of supported bundles as Bundle objects.
"""
bundle_config = get_bundles_dict()
bundles_list = [Bundle(bundle_config[b]) for b in bundle_config]
logger.info("Using bundles: %s", ", ".join(b.key for b in bundles_list))
return bundles_list
def get_circup_version():
"""Return the version of circup that is running. If not available, return None.
:return: Current version of circup, or None.
"""
try:
from importlib import metadata # pylint: disable=import-outside-toplevel
except ImportError:
try:
import importlib_metadata as metadata # pylint: disable=import-outside-toplevel
except ImportError:
return None
try:
return metadata.version("circup")
except metadata.PackageNotFoundError:
return None
def get_dependencies(*requested_libraries, mod_names, to_install=()):
"""
Return a list of other CircuitPython libraries required by the given list
of libraries
:param tuple requested_libraries: The libraries to search for dependencies
:param object mod_names: All the modules metadata from bundle
:param list(str) to_install: Modules already selected for installation.
:return: tuple of module names to install which we build
"""
# pylint: disable=too-many-branches
# Internal variables
_to_install = to_install
_requested_libraries = []
_rl = requested_libraries[0]
if not requested_libraries[0]:
# If nothing is requested, we're done
return _to_install
for lib_name in _rl:
lower_lib_name = lib_name.lower()
if lower_lib_name in NOT_MCU_LIBRARIES:
logger.info(
"Skipping %s. It is not for microcontroller installs.", lib_name
)
else:
# Canonicalize, with some exceptions:
# adafruit-circuitpython-something => adafruit_something
canonical_lib_name = clean_library_name(lower_lib_name)
try:
# Don't process any names we can't find in mod_names
mod_names[canonical_lib_name] # pylint: disable=pointless-statement
_requested_libraries.append(canonical_lib_name)
except KeyError:
if canonical_lib_name not in WARNING_IGNORE_MODULES:
if os.path.exists(canonical_lib_name):
_requested_libraries.append(canonical_lib_name)
else:
click.secho(
f"WARNING:\n\t{canonical_lib_name} "
f"is not a known CircuitPython library.",
fg="yellow",
)
if not _requested_libraries:
# If nothing is requested, we're done
return _to_install
for library in list(_requested_libraries):
if library not in _to_install:
_to_install = _to_install + (library,)
# get the requirements.txt from bundle
try:
bundle = mod_names[library]["bundle"]
requirements_txt = bundle.requirements_for(library)
if requirements_txt:
_requested_libraries.extend(
libraries_from_requirements(requirements_txt)
)
circup_dependencies = get_circup_dependencies(bundle, library)
for circup_dependency in circup_dependencies:
_requested_libraries.append(circup_dependency)
except KeyError:
# don't check local file for further dependencies
pass
# we've processed this library, remove it from the list
_requested_libraries.remove(library)
return get_dependencies(
tuple(_requested_libraries), mod_names=mod_names, to_install=_to_install
)
def get_circup_dependencies(bundle, library):
"""
Get the list of circup dependencies from pyproject.toml
e.g.
[circup]
circup_dependencies = ["dependency_name_here"]
:param bundle: The Bundle to look within
:param library: The Library to find pyproject.toml for and get dependencies from
:return: The list of dependency libraries that were found
"""
try:
pyproj_toml = bundle.requirements_for(library, toml_file=True)
if pyproj_toml:
pyproj_toml_data = toml.loads(pyproj_toml)
dependencies = pyproj_toml_data["circup"]["circup_dependencies"]
if isinstance(dependencies, list):
return dependencies
if isinstance(dependencies, str):
return (dependencies,)
return tuple()
except KeyError:
# no circup_dependencies in pyproject.toml
return tuple()
def libraries_from_requirements(requirements):
"""
Clean up supplied requirements.txt and turn into tuple of CP libraries
:param str requirements: A string version of a requirements.txt
:return: tuple of library names
"""
libraries = ()
for line in requirements.split("\n"):
line = line.lower().strip()
if line.startswith("#") or line == "":
# skip comments
pass
else:
# Remove everything after any pip style version specifiers
line = re.split("[<>=~[;]", line)[0].strip()
libraries = libraries + (line,)
return libraries
def save_local_bundles(bundles_data):
"""
Save the list of local bundles to the settings.
:param str key: The bundle's identifier/key.
"""
if len(bundles_data) > 0:
with open(BUNDLE_CONFIG_LOCAL, "w", encoding="utf-8") as data:
json.dump(bundles_data, data)
else:
if os.path.isfile(BUNDLE_CONFIG_LOCAL):
os.unlink(BUNDLE_CONFIG_LOCAL)
def tags_data_save_tag(key, tag):
"""
Add or change the saved tag value for a bundle.
:param str key: The bundle's identifier/key.
:param str tag: The new tag for the bundle.
"""
tags_data = tags_data_load(logger)
tags_data[key] = tag
with open(BUNDLE_DATA, "w", encoding="utf-8") as data:
json.dump(tags_data, data)
def imports_from_code(full_content):
"""
Parse the given code.py file and return the imported libraries
Note that it's impossible at that level to differentiate between
import module.property and import module.submodule, so we try both
:param str full_content: Code to read imports from
:param str module_name: Name of the module the code is from
:return: sequence of library names
"""
# pylint: disable=too-many-branches
try:
par = ast.parse(full_content)
except (SyntaxError, ValueError) as err:
raise CodeParsingException(err) from err
imports = set()
for thing in ast.walk(par):
# import module and import module.submodule
if isinstance(thing, ast.Import):
for alias in thing.names:
imports.add(alias.name)
# from x import y
if isinstance(thing, ast.ImportFrom):
if thing.module:
# from [.][.]module import names
module = ("." * thing.level) + thing.module
imports.add(module)
for alias in thing.names:
imports.add(".".join([module, alias.name]))
else:
# from . import names
for alias in thing.names:
imports.add(alias.name)
# import parent modules (in practice it's the __init__.py)
for name in list(imports):
if "*" in name:
imports.remove(name)
continue
names = name.split(".")
for i in range(len(names)):
module = ".".join(names[: i + 1])
if module:
imports.add(module)
return sorted(imports)
def get_all_imports( # pylint: disable=too-many-arguments,too-many-locals, too-many-branches
backend, auto_file_content, auto_file_path, mod_names, current_module, visited=None
):
"""
Recursively retrieve imports from files on the backend
:param Backend backend: The current backend object
:param str auto_file_content: Content of the python file to analyse
:param str auto_file_path: Path to the python file to analyse
:param list mod_names: Lits of supported bundle mod names
:param str current_module: Name of the call context module if recursive call
:param set visited: Modules previously visited
:return: sequence of library names
"""
if visited is None:
visited = set()
visited.add(current_module)
requested_installs = []
try:
imports = imports_from_code(auto_file_content)
except CodeParsingException as err:
click.secho(f"Error parsing {current_module}:\n {err}", fg="red")
sys.exit(2)
for install in imports:
if install in visited:
continue
if install in mod_names:
requested_installs.append(install)
else:
# relative module paths
if install.startswith(".."):
install_module = ".".join(current_module.split(".")[:-2])
install_module = install_module + "." + install[2:]
elif install.startswith("."):
install_module = ".".join(current_module.split(".")[:-1])
install_module = install_module + "." + install[1:]
else:
install_module = install
# possible files for the module: .py or __init__.py (if directory)
file_name = os.path.join(*install_module.split(".")) + ".py"
try:
file_location = os.path.join(
*auto_file_path.replace(str(backend.device_location), "").split(
"/"
)[:-1]
)
full_location = os.path.join(file_location, file_name)
except TypeError:
# file is in root of CIRCUITPY
full_location = file_name
exists = backend.file_exists(full_location)
if not exists:
file_name = os.path.join(*install_module.split("."), "__init__.py")
full_location = file_name
exists = backend.file_exists(full_location)
if not exists:
continue
install_module += ".__init__"
# get the content and parse it recursively
auto_file_content = backend.get_file_content(full_location)
if auto_file_content:
sub_imports = get_all_imports(
backend,
auto_file_content,
auto_file_path,
mod_names,
install_module,
visited,
)
requested_installs.extend(sub_imports)
return sorted(requested_installs)
# [r for r in requested_installs if r in mod_names]
def libraries_from_auto_file(backend, auto_file, mod_names):
"""
Parse the input auto_file path and/or use the workflow to find the most
appropriate code.py script. Then return the list of imports
:param Backend backend: The current backend object
:param str auto_file: Path of the candidate auto file or None
:return: sequence of library names
"""
# find the current main file based on Circuitpython's rules
if auto_file is None:
root_files = [
file["name"] for file in backend.list_dir("") if not file["directory"]
]
for main_file in CODE_FILES:
if main_file in root_files:
auto_file = main_file
break
# still no code file found
if auto_file is None:
click.secho(
"No default code file found. See valid names:\n"
"https://docs.circuitpython.org/en/latest/README.html#behavior",
fg="red",
)
sys.exit(1)
# pass a local file with "./" or "../"
is_relative = auto_file.split(os.sep)[0] in [os.path.curdir, os.path.pardir]
if os.path.isabs(auto_file) or is_relative:
with open(auto_file, "r", encoding="UTF8") as fp:
auto_file_content = fp.read()
else:
auto_file_content = backend.get_file_content(auto_file)
if auto_file_content is None:
click.secho(f"Auto file not found: {auto_file}", fg="red")
sys.exit(1)
# from file name to module name (in case it's in a subpackage)
click.secho(f"Finding imports from: {auto_file}", fg="green")
current_module = auto_file.rstrip(".py").replace(os.path.sep, ".")
return get_all_imports(
backend, auto_file_content, auto_file, mod_names, current_module
)
def get_device_path(host, port, password, path):
"""
:param host Hostname or IP address.
:param password REST API password.
:param path File system path.
:return device URL or None if the device cannot be found.
"""
if path:
device_path = path
elif host:
# pylint: enable=no-member
device_path = f"http://:{password}@{host}:{port}"
else:
device_path = find_device()
return device_path
def sorted_by_directory_then_alpha(list_of_files):
"""
Sort the list of files into alphabetical seperated
with directories grouped together before files.
"""
dirs = {}
files = {}
for cur_file in list_of_files:
if cur_file["directory"]:
dirs[cur_file["name"]] = cur_file
else:
files[cur_file["name"]] = cur_file
sorted_dir_names = sorted(dirs.keys())
sorted_file_names = sorted(files.keys())
sorted_full_list = []
for cur_name in sorted_dir_names:
sorted_full_list.append(dirs[cur_name])
for cur_name in sorted_file_names:
sorted_full_list.append(files[cur_name])
return sorted_full_list

View file

@ -1,755 +0,0 @@
# SPDX-FileCopyrightText: 2019 Nicholas Tollervey, 2024 Tim Cocks, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
# ----------- CLI command definitions ----------- #
The following functions have IO side effects (for instance they emit to
stdout). Ergo, these are not checked with unit tests. Most of the
functionality they provide is provided by the functions from util_functions.py,
and the respective Backends which *are* tested. Most of the logic of the following
functions is to prepare things for presentation to / interaction with the user.
"""
import os
import subprocess
import time
import sys
import re
import logging
import update_checker
from semver import VersionInfo
import click
import requests
from circup.backends import WebBackend, DiskBackend
from circup.logging import logger, log_formatter, LOGFILE
from circup.shared import BOARDLESS_COMMANDS, get_latest_release_from_url
from circup.bundle import Bundle
from circup.command_utils import (
get_device_path,
get_circup_version,
find_modules,
get_bundles_list,
completion_for_install,
get_bundle_versions,
libraries_from_requirements,
libraries_from_auto_file,
get_dependencies,
get_bundles_local_dict,
save_local_bundles,
get_bundles_dict,
completion_for_example,
get_bundle_examples,
)
@click.group()
@click.option(
"--verbose", is_flag=True, help="Comprehensive logging is sent to stdout."
)
@click.option(
"--path",
type=click.Path(exists=True, file_okay=False),
help="Path to CircuitPython directory. Overrides automatic path detection.",
)
@click.option(
"--host",
help="Hostname or IP address of a device. Overrides automatic path detection.",
)
@click.option(
"--port", help="Port to contact. Overrides automatic path detection.", default=80
)
@click.option(
"--password",
help="Password to use for authentication when --host is used."
" You can optionally set an environment variable CIRCUP_WEBWORKFLOW_PASSWORD"
" instead of passing this argument. If both exist the CLI arg takes precedent.",
)
@click.option(
"--timeout",
default=30,
help="Specify the timeout in seconds for any network operations.",
)
@click.option(
"--board-id",
default=None,
help="Manual Board ID of the CircuitPython device. If provided in combination "
"with --cpy-version, it overrides the detected board ID.",
)
@click.option(
"--cpy-version",
default=None,
help="Manual CircuitPython version. If provided in combination "
"with --board-id, it overrides the detected CPy version.",
)
@click.version_option(
prog_name="Circup",
message="%(prog)s, A CircuitPython module updater. Version %(version)s",
)
@click.pass_context
def main( # pylint: disable=too-many-locals
ctx, verbose, path, host, port, password, timeout, board_id, cpy_version
): # pragma: no cover
"""
A tool to manage and update libraries on a CircuitPython device.
"""
# pylint: disable=too-many-arguments,too-many-branches,too-many-statements,too-many-locals, R0801
ctx.ensure_object(dict)
ctx.obj["TIMEOUT"] = timeout
if password is None:
password = os.getenv("CIRCUP_WEBWORKFLOW_PASSWORD")
device_path = get_device_path(host, port, password, path)
using_webworkflow = "host" in ctx.params.keys() and ctx.params["host"] is not None
if using_webworkflow:
if host == "circuitpython.local":
click.echo("Checking versions.json on circuitpython.local to find hostname")
versions_resp = requests.get(
"http://circuitpython.local/cp/version.json", timeout=timeout
)
host = f'{versions_resp.json()["hostname"]}.local'
click.echo(f"Using hostname: {host}")
device_path = device_path.replace("circuitpython.local", host)
try:
ctx.obj["backend"] = WebBackend(
host=host,
port=port,
password=password,
logger=logger,
timeout=timeout,
version_override=cpy_version,
)
except ValueError as e:
click.secho(e, fg="red")
time.sleep(0.3)
sys.exit(1)
except RuntimeError as e:
click.secho(e, fg="red")
sys.exit(1)
else:
try:
ctx.obj["backend"] = DiskBackend(
device_path,
logger,
version_override=cpy_version,
)
except ValueError as e:
print(e)
if verbose:
# Configure additional logging to stdout.
ctx.obj["verbose"] = True
verbose_handler = logging.StreamHandler(sys.stdout)
verbose_handler.setLevel(logging.INFO)
verbose_handler.setFormatter(log_formatter)
logger.addHandler(verbose_handler)
click.echo("Logging to {}\n".format(LOGFILE))
else:
ctx.obj["verbose"] = False
logger.info("### Started Circup ###")
# If a newer version of circup is available, print a message.
logger.info("Checking for a newer version of circup")
version = get_circup_version()
if version:
update_checker.update_check("circup", version)
# stop early if the command is boardless
if ctx.invoked_subcommand in BOARDLESS_COMMANDS or "--help" in sys.argv:
return
ctx.obj["DEVICE_PATH"] = device_path
latest_version = get_latest_release_from_url(
"https://github.com/adafruit/circuitpython/releases/latest", logger
)
if device_path is None or not ctx.obj["backend"].is_device_present():
click.secho("Could not find a connected CircuitPython device.", fg="red")
sys.exit(1)
else:
cpy_version, board_id = (
ctx.obj["backend"].get_circuitpython_version()
if board_id is None or cpy_version is None
else (cpy_version, board_id)
)
click.echo(
"Found device {} at {}, running CircuitPython {}.".format(
board_id, device_path, cpy_version
)
)
try:
if VersionInfo.parse(cpy_version) < VersionInfo.parse(latest_version):
click.secho(
"A newer version of CircuitPython ({}) is available.".format(
latest_version
),
fg="green",
)
if board_id:
url_download = f"https://circuitpython.org/board/{board_id}"
else:
url_download = "https://circuitpython.org/downloads"
click.secho("Get it here: {}".format(url_download), fg="green")
except ValueError as ex:
logger.warning("CircuitPython has incorrect semver value.")
logger.warning(ex)
@main.command()
@click.option("-r", "--requirement", is_flag=True)
@click.pass_context
def freeze(ctx, requirement): # pragma: no cover
"""
Output details of all the modules found on the connected CIRCUITPYTHON
device. Option -r saves output to requirements.txt file
"""
logger.info("Freeze")
modules = find_modules(ctx.obj["backend"], get_bundles_list())
if modules:
output = []
for module in modules:
output.append("{}=={}".format(module.name, module.device_version))
for module in output:
click.echo(module)
logger.info(module)
if requirement:
cwd = os.path.abspath(os.getcwd())
for i, module in enumerate(output):
output[i] += "\n"
overwrite = None
if os.path.exists(os.path.join(cwd, "requirements.txt")):
overwrite = click.confirm(
click.style(
"\nrequirements.txt file already exists in this location.\n"
"Do you want to overwrite it?",
fg="red",
),
default=False,
)
else:
overwrite = True
if overwrite:
with open(
cwd + "/" + "requirements.txt", "w", newline="\n", encoding="utf-8"
) as file:
file.truncate(0)
file.writelines(output)
else:
click.echo("No modules found on the device.")
@main.command("list")
@click.pass_context
def list_cli(ctx): # pragma: no cover
"""
Lists all out of date modules found on the connected CIRCUITPYTHON device.
"""
logger.info("List")
# Grab out of date modules.
data = [("Module", "Version", "Latest", "Update Reason")]
modules = [
m.row
for m in find_modules(ctx.obj["backend"], get_bundles_list())
if m.outofdate
]
if modules:
data += modules
# Nice tabular display.
col_width = [0, 0, 0, 0]
for row in data:
for i, word in enumerate(row):
col_width[i] = max(len(word) + 2, col_width[i])
dashes = tuple(("-" * (width - 1) for width in col_width))
data.insert(1, dashes)
click.echo(
"The following modules are out of date or probably need an update.\n"
"Major Updates may include breaking changes. Review before updating.\n"
"MPY Format changes from Circuitpython 8 to 9 require an update.\n"
)
for row in data:
output = ""
for index, cell in enumerate(row):
output += cell.ljust(col_width[index])
if "--verbose" not in sys.argv:
click.echo(output)
logger.info(output)
else:
click.echo("All modules found on the device are up to date.")
# pylint: disable=too-many-arguments,too-many-locals
@main.command()
@click.argument(
"modules", required=False, nargs=-1, shell_complete=completion_for_install
)
@click.option(
"pyext",
"--py",
is_flag=True,
help="Install the .py version of the module(s) instead of the mpy version.",
)
@click.option(
"-r",
"--requirement",
type=click.Path(exists=True, dir_okay=False),
help="specify a text file to install all modules listed in the text file."
" Typically requirements.txt.",
)
@click.option(
"--auto", "-a", is_flag=True, help="Install the modules imported in code.py."
)
@click.option(
"--upgrade", "-U", is_flag=True, help="Upgrade modules that are already installed."
)
@click.option(
"--stubs",
"-s",
is_flag=True,
help="Install stubs module from PyPi for context in IDE.",
)
@click.option(
"--auto-file",
default=None,
help="Specify the name of a file on the board to read for auto install."
" Also accepts an absolute path or a local ./ path.",
)
@click.pass_context
def install(
ctx, modules, pyext, requirement, auto, auto_file, upgrade=False, stubs=False
): # pragma: no cover
"""
Install a named module(s) onto the device. Multiple modules
can be installed at once by providing more than one module name, each
separated by a space. Modules can be from a Bundle or local filepaths.
"""
# pylint: disable=too-many-branches
# TODO: Ensure there's enough space on the device
available_modules = get_bundle_versions(get_bundles_list())
mod_names = {}
for module, metadata in available_modules.items():
mod_names[module.replace(".py", "").lower()] = metadata
if requirement:
with open(requirement, "r", encoding="utf-8") as rfile:
requirements_txt = rfile.read()
requested_installs = libraries_from_requirements(requirements_txt)
elif auto or auto_file:
requested_installs = libraries_from_auto_file(
ctx.obj["backend"], auto_file, mod_names
)
else:
requested_installs = modules
requested_installs = sorted(set(requested_installs))
click.echo(f"Searching for dependencies for: {requested_installs}")
to_install = get_dependencies(requested_installs, mod_names=mod_names)
device_modules = ctx.obj["backend"].get_device_versions()
if to_install is not None:
to_install = sorted(to_install)
click.echo(f"Ready to install: {to_install}\n")
for library in to_install:
ctx.obj["backend"].install_module(
ctx.obj["DEVICE_PATH"],
device_modules,
library,
pyext,
mod_names,
upgrade,
)
if stubs:
library_stubs = "adafruit-circuitpython-{}".format(
library.replace("adafruit_", "")
)
try:
output = subprocess.check_output(["pip", "install", library_stubs])
if (
f"Requirement already satisfied: {library_stubs}"
in output.decode()
):
click.echo(f"'{library}' stubs already installed.")
else:
click.echo(f"Installed '{library}' stubs.")
except subprocess.CalledProcessError:
click.secho(
f"Could not install stubs module {library_stubs}", fg="yellow"
)
@main.command()
@click.option("--overwrite", is_flag=True, help="Overwrite the file if it exists.")
@click.option("--list", "-ls", "op_list", is_flag=True, help="List available examples.")
@click.option("--rename", is_flag=True, help="Install the example as code.py.")
@click.argument(
"examples", required=False, nargs=-1, shell_complete=completion_for_example
)
@click.pass_context
def example(ctx, examples, op_list, rename, overwrite):
"""
Copy named example(s) from a bundle onto the device. Multiple examples
can be installed at once by providing more than one example name, each
separated by a space.
"""
if op_list:
if examples:
click.echo("\n".join(completion_for_example(ctx, "", examples)))
else:
click.echo("Available example libraries:")
available_examples = get_bundle_examples(
get_bundles_list(), avoid_download=True
)
lib_names = {
str(key.split(os.path.sep)[0]): value
for key, value in available_examples.items()
}
click.echo("\n".join(sorted(lib_names.keys())))
return
for example_arg in examples:
available_examples = get_bundle_examples(
get_bundles_list(), avoid_download=True
)
if example_arg in available_examples:
filename = available_examples[example_arg].split(os.path.sep)[-1]
install_metadata = {"path": available_examples[example_arg]}
filename = available_examples[example_arg].split(os.path.sep)[-1]
if rename:
if os.path.isfile(available_examples[example_arg]):
filename = "code.py"
install_metadata["target_name"] = filename
if overwrite or not ctx.obj["backend"].file_exists(filename):
click.echo(
f"{'Copying' if not overwrite else 'Overwriting'}: {filename}"
)
ctx.obj["backend"].install_module_py(install_metadata, location="")
else:
click.secho(
f"File: {filename} already exists. Use --overwrite if you wish to replace it.",
fg="red",
)
else:
click.secho(
f"Error: {example_arg} was not found in any local bundle examples.",
fg="red",
)
# pylint: enable=too-many-arguments,too-many-locals
@main.command()
@click.argument("match", required=False, nargs=1)
def show(match): # pragma: no cover
"""
Show a list of available modules in the bundle. These are modules which
*could* be installed on the device.
If MATCH is specified only matching modules will be listed.
"""
available_modules = get_bundle_versions(get_bundles_list())
module_names = sorted([m.replace(".py", "") for m in available_modules])
if match is not None:
match = match.lower()
module_names = [m for m in module_names if match in m]
click.echo("\n".join(module_names))
click.echo(
"{} shown of {} packages.".format(len(module_names), len(available_modules))
)
@main.command()
@click.argument("module", nargs=-1)
@click.pass_context
def uninstall(ctx, module): # pragma: no cover
"""
Uninstall a named module(s) from the connected device. Multiple modules
can be uninstalled at once by providing more than one module name, each
separated by a space.
"""
device_path = ctx.obj["DEVICE_PATH"]
print(f"Uninstalling {module} from {device_path}")
for name in module:
device_modules = ctx.obj["backend"].get_device_versions()
name = name.lower()
mod_names = {}
for module_item, metadata in device_modules.items():
mod_names[module_item.replace(".py", "").lower()] = metadata
if name in mod_names:
metadata = mod_names[name]
module_path = metadata["path"]
ctx.obj["backend"].uninstall(device_path, module_path)
click.echo("Uninstalled '{}'.".format(name))
else:
click.echo("Module '{}' not found on device.".format(name))
continue
# pylint: disable=too-many-branches
@main.command(
short_help=(
"Update modules on the device. "
"Use --all to automatically update all modules without Major Version warnings."
)
)
@click.option(
"update_all",
"--all",
is_flag=True,
help="Update all modules without Major Version warnings.",
)
@click.pass_context
# pylint: disable=too-many-locals
def update(ctx, update_all): # pragma: no cover
"""
Checks for out-of-date modules on the connected CIRCUITPYTHON device, and
prompts the user to confirm updating such modules.
"""
logger.info("Update")
# Grab current modules.
bundles_list = get_bundles_list()
installed_modules = find_modules(ctx.obj["backend"], bundles_list)
modules_to_update = [m for m in installed_modules if m.outofdate]
if not modules_to_update:
click.echo("None of the module[s] found on the device need an update.")
return
# Process out of date modules
updated_modules = []
click.echo("Found {} module[s] needing update.".format(len(modules_to_update)))
if not update_all:
click.echo("Please indicate which module[s] you wish to update:\n")
for module in modules_to_update:
update_flag = update_all
if "--verbose" in sys.argv:
click.echo(
"Device version: {}, Bundle version: {}".format(
module.device_version, module.bundle_version
)
)
if isinstance(module.bundle_version, str) and not VersionInfo.is_valid(
module.bundle_version
):
click.secho(
f"WARNING: Library {module.name} repo has incorrect __version__"
"\n\tmetadata. Circup will assume it needs updating."
"\n\tPlease file an issue in the library repo.",
fg="yellow",
)
if module.repo:
click.secho(f"\t{module.repo}", fg="yellow")
if not update_flag:
if module.bad_format:
click.secho(
f"WARNING: '{module.name}': module corrupted or in an"
" unknown mpy format. Updating is required.",
fg="yellow",
)
update_flag = click.confirm("Do you want to update?")
elif module.mpy_mismatch:
click.secho(
f"WARNING: '{module.name}': mpy format doesn't match the"
" device's Circuitpython version. Updating is required.",
fg="yellow",
)
update_flag = click.confirm("Do you want to update?")
elif module.major_update:
update_flag = click.confirm(
(
"'{}' is a Major Version update and may contain breaking "
"changes. Do you want to update?".format(module.name)
)
)
else:
update_flag = click.confirm("Update '{}'?".format(module.name))
if update_flag:
# pylint: disable=broad-except
try:
ctx.obj["backend"].update(module)
updated_modules.append(module.name)
click.echo("Updated {}".format(module.name))
except Exception as ex:
logger.exception(ex)
click.echo("Something went wrong, {} (check the logs)".format(str(ex)))
# pylint: enable=broad-except
if not updated_modules:
return
# We updated modules, look to see if any requirements are missing
click.echo(
"Checking {} updated module[s] for missing requirements.".format(
len(updated_modules)
)
)
available_modules = get_bundle_versions(bundles_list)
mod_names = {}
for module, metadata in available_modules.items():
mod_names[module.replace(".py", "").lower()] = metadata
missing_modules = get_dependencies(updated_modules, mod_names=mod_names)
device_modules = ctx.obj["backend"].get_device_versions()
# Process newly needed modules
if missing_modules is not None:
installed_module_names = [m.name for m in installed_modules]
missing_modules = set(missing_modules) - set(installed_module_names)
missing_modules = sorted(list(missing_modules))
click.echo(f"Ready to install: {missing_modules}\n")
for library in missing_modules:
ctx.obj["backend"].install_module(
ctx.obj["DEVICE_PATH"], device_modules, library, False, mod_names
)
# pylint: enable=too-many-branches
@main.command("bundle-show")
@click.option("--modules", is_flag=True, help="List all the modules per bundle.")
def bundle_show(modules):
"""
Show the list of bundles, default and local, with URL, current version
and latest version retrieved from the web.
"""
local_bundles = get_bundles_local_dict().values()
bundles = get_bundles_list()
available_modules = get_bundle_versions(bundles)
for bundle in bundles:
if bundle.key in local_bundles:
click.secho(bundle.key, fg="yellow")
else:
click.secho(bundle.key, fg="green")
click.echo(" " + bundle.url)
click.echo(" version = " + bundle.current_tag)
if modules:
click.echo("Modules:")
for name, mod in sorted(available_modules.items()):
if mod["bundle"] == bundle:
click.echo(f" {name} ({mod.get('__version__', '-')})")
@main.command("bundle-add")
@click.argument("bundle", nargs=-1)
@click.pass_context
def bundle_add(ctx, bundle):
"""
Add bundles to the local bundles list, by "user/repo" github string.
A series of tests to validate that the bundle exists and at least looks
like a bundle are done before validating it. There might still be errors
when the bundle is downloaded for the first time.
"""
if len(bundle) == 0:
click.secho(
"Must pass bundle argument, expecting github URL or `user/repository` string.",
fg="red",
)
return
bundles_dict = get_bundles_local_dict()
modified = False
for bundle_repo in bundle:
# cleanup in case seombody pastes the URL to the repo/releases
bundle_repo = re.sub(
r"https?://github.com/([^/]+/[^/]+)(/.*)?", r"\1", bundle_repo
)
if bundle_repo in bundles_dict.values():
click.secho("Bundle already in list.", fg="yellow")
click.secho(" " + bundle_repo, fg="yellow")
continue
try:
bundle_added = Bundle(bundle_repo)
except ValueError:
click.secho(
"Bundle string invalid, expecting github URL or `user/repository` string.",
fg="red",
)
click.secho(" " + bundle_repo, fg="red")
continue
result = requests.get(
"https://github.com/" + bundle_repo, timeout=ctx.obj["TIMEOUT"]
)
# pylint: disable=no-member
if result.status_code == requests.codes.NOT_FOUND:
click.secho("Bundle invalid, the repository doesn't exist (404).", fg="red")
click.secho(" " + bundle_repo, fg="red")
continue
# pylint: enable=no-member
if not bundle_added.validate():
click.secho(
"Bundle invalid, is the repository a valid circup bundle ?", fg="red"
)
click.secho(" " + bundle_repo, fg="red")
continue
# note: use bun as the dictionary key for uniqueness
bundles_dict[bundle_repo] = bundle_repo
modified = True
click.echo("Added " + bundle_repo)
click.echo(" " + bundle_added.url)
if modified:
# save the bundles list
save_local_bundles(bundles_dict)
# update and get the new bundles for the first time
get_bundle_versions(get_bundles_list())
@main.command("bundle-remove")
@click.argument("bundle", nargs=-1)
@click.option("--reset", is_flag=True, help="Remove all local bundles.")
def bundle_remove(bundle, reset):
"""
Remove one or more bundles from the local bundles list.
"""
if reset:
save_local_bundles({})
return
if len(bundle) == 0:
click.secho(
"Must pass bundle argument or --reset, expecting github URL or "
"`user/repository` string. Run circup bundle-show to see a list of bundles.",
fg="red",
)
return
bundle_config = list(get_bundles_dict().values())
bundles_local_dict = get_bundles_local_dict()
modified = False
for bun in bundle:
# cleanup in case somebody pastes the URL to the repo/releases
bun = re.sub(r"https?://github.com/([^/]+/[^/]+)(/.*)?", r"\1", bun)
found = False
for name, repo in list(bundles_local_dict.items()):
if bun in (name, repo):
found = True
click.secho(f"Bundle {repo}")
do_it = click.confirm("Do you want to remove that bundle ?")
if do_it:
click.secho("Removing the bundle from the local list", fg="yellow")
click.secho(f" {bun}", fg="yellow")
modified = True
del bundles_local_dict[name]
if not found:
if bun in bundle_config:
click.secho("Cannot remove built-in module:" "\n " + bun, fg="red")
else:
click.secho(
"Bundle not found in the local list, nothing removed:"
"\n " + bun,
fg="red",
)
if modified:
save_local_bundles(bundles_local_dict)

View file

@ -1,4 +1,5 @@
{
"adafruit": "adafruit/Adafruit_CircuitPython_Bundle",
"circuitpython_community": "adafruit/CircuitPython_Community_Bundle"
"circuitpython_community": "adafruit/CircuitPython_Community_Bundle",
"circuitpython_org": "circuitpython/CircuitPython_Org_Bundle"
}

View file

@ -1,33 +0,0 @@
# SPDX-FileCopyrightText: 2019 Nicholas Tollervey, 2024 Tim Cocks, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
Logging utilities and configuration used by circup
"""
import os
import logging
from logging.handlers import RotatingFileHandler
import appdirs
from circup.shared import DATA_DIR
#: The directory containing the utility's log file.
LOG_DIR = appdirs.user_log_dir(appname="circup", appauthor="adafruit")
#: The location of the log file for the utility.
LOGFILE = os.path.join(LOG_DIR, "circup.log")
# Ensure DATA_DIR / LOG_DIR related directories and files exist.
if not os.path.exists(DATA_DIR): # pragma: no cover
os.makedirs(DATA_DIR)
if not os.path.exists(LOG_DIR): # pragma: no cover
os.makedirs(LOG_DIR)
# Setup logging.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logfile_handler = RotatingFileHandler(LOGFILE, maxBytes=10_000_000, backupCount=0)
log_formatter = logging.Formatter(
"%(asctime)s %(levelname)s: %(message)s", datefmt="%m/%d/%Y %H:%M:%S"
)
logfile_handler.setFormatter(log_formatter)
logger.addHandler(logfile_handler)

View file

@ -1,209 +0,0 @@
# SPDX-FileCopyrightText: 2019 Nicholas Tollervey, 2024 Tim Cocks, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
Class that represents a specific CircuitPython module on a device or in a Bundle.
"""
import os
from urllib.parse import urljoin, urlparse
from semver import VersionInfo
from circup.shared import BAD_FILE_FORMAT
from circup.backends import WebBackend
from circup.logging import logger
class Module:
"""
Represents a CircuitPython module.
"""
# pylint: disable=too-many-arguments
def __init__(
self,
name,
backend,
repo,
device_version,
bundle_version,
mpy,
bundle,
compatibility,
):
"""
The ``self.file`` and ``self.name`` attributes are constructed from
the ``path`` value. If the path is to a directory based module, the
resulting self.file value will be None, and the name will be the
basename of the directory path.
:param str name: The file name of the module.
:param Backend backend: The backend that the module is on.
:param str repo: The URL of the Git repository for this module.
:param str device_version: The semver value for the version on device.
:param str bundle_version: The semver value for the version in bundle.
:param bool mpy: Flag to indicate if the module is byte-code compiled.
:param Bundle bundle: Bundle object where the module is located.
:param (str,str) compatibility: Min and max versions of CP compatible with the mpy.
"""
self.name = name
self.backend = backend
self.path = (
urljoin(backend.library_path, name, allow_fragments=False)
if isinstance(backend, WebBackend)
else os.path.join(backend.library_path, name)
)
url = urlparse(self.path, allow_fragments=False)
if (
url.path.endswith("/")
if isinstance(backend, WebBackend)
else self.path.endswith(os.sep)
):
self.file = None
self.name = self.path.split(
"/" if isinstance(backend, WebBackend) else os.sep
)[-2]
else:
self.file = os.path.basename(url.path)
self.name = (
os.path.basename(url.path).replace(".py", "").replace(".mpy", "")
)
self.repo = repo
self.device_version = device_version
self.bundle_version = bundle_version
self.mpy = mpy
self.min_version = compatibility[0]
self.max_version = compatibility[1]
# Figure out the bundle path.
self.bundle_path = None
if self.mpy:
# Byte compiled, now check CircuitPython version.
major_version = self.backend.get_circuitpython_version()[0].split(".")[0]
bundle_platform = "{}mpy".format(major_version)
else:
# Regular Python
bundle_platform = "py"
# module path in the bundle
search_path = bundle.lib_dir(bundle_platform)
if self.file:
self.bundle_path = os.path.join(search_path, self.file)
else:
self.bundle_path = os.path.join(search_path, self.name)
logger.info(self)
# pylint: enable=too-many-arguments
@property
def outofdate(self):
"""
Returns a boolean to indicate if this module is out of date.
Treat mismatched MPY versions as out of date.
:return: Truthy indication if the module is out of date.
"""
if self.mpy_mismatch:
return True
if self.device_version and self.bundle_version:
try:
return VersionInfo.parse(self.device_version) < VersionInfo.parse(
self.bundle_version
)
except ValueError as ex:
logger.warning("Module '%s' has incorrect semver value.", self.name)
logger.warning(ex)
return True # Assume out of date to try to update.
@property
def bad_format(self):
"""A boolean indicating that the mpy file format could not be identified"""
return self.mpy and self.device_version == BAD_FILE_FORMAT
@property
def mpy_mismatch(self):
"""
Returns a boolean to indicate if this module's MPY version is compatible
with the board's current version of Circuitpython. A min or max version
that evals to False means no limit.
:return: Boolean indicating if the MPY versions don't match.
"""
if not self.mpy:
return False
try:
cpv = VersionInfo.parse(self.backend.get_circuitpython_version()[0])
except ValueError as ex:
logger.warning("CircuitPython has incorrect semver value.")
logger.warning(ex)
try:
if self.min_version and cpv < VersionInfo.parse(self.min_version):
return True # CP version too old
if self.max_version and cpv >= VersionInfo.parse(self.max_version):
return True # MPY version too old
except (TypeError, ValueError) as ex:
logger.warning(
"Module '%s' has incorrect MPY compatibility information.", self.name
)
logger.warning(ex)
return False
@property
def major_update(self):
"""
Returns a boolean to indicate if this is a major version update.
:return: Boolean indicating if this is a major version upgrade
"""
try:
if (
VersionInfo.parse(self.device_version).major
== VersionInfo.parse(self.bundle_version).major
):
return False
except (TypeError, ValueError) as ex:
logger.warning("Module '%s' has incorrect semver value.", self.name)
logger.warning(ex)
return True # Assume Major Version udpate.
@property
def row(self):
"""
Returns a tuple of items to display in a table row to show the module's
name, local version and remote version, and reason to update.
:return: A tuple containing the module's name, version on the connected
device, version in the latest bundle and reason to update.
"""
loc = self.device_version if self.device_version else "unknown"
rem = self.bundle_version if self.bundle_version else "unknown"
if self.mpy_mismatch:
update_reason = "MPY Format"
elif self.major_update:
update_reason = "Major Version"
else:
update_reason = "Minor Version"
return (self.name, loc, rem, update_reason)
def __repr__(self):
"""
Helps with log files.
:return: A repr of a dictionary containing the module's metadata.
"""
return repr(
{
"path": self.path,
"file": self.file,
"name": self.name,
"repo": self.repo,
"device_version": self.device_version,
"bundle_version": self.bundle_version,
"bundle_path": self.bundle_path,
"mpy": self.mpy,
"min_version": self.min_version,
"max_version": self.max_version,
}
)

View file

@ -9,10 +9,8 @@ and Backend class functions.
import glob
import os
import re
import json
import importlib.resources
import appdirs
import requests
#: Version identifier for a bad MPY file format
BAD_FILE_FORMAT = "Invalid"
@ -20,36 +18,6 @@ BAD_FILE_FORMAT = "Invalid"
#: The location of data files used by circup (following OS conventions).
DATA_DIR = appdirs.user_data_dir(appname="circup", appauthor="adafruit")
#: Module formats list (and the other form used in github files)
PLATFORMS = {"py": "py", "9mpy": "9.x-mpy", "10mpy": "10.x-mpy"}
#: Timeout for requests calls like get()
REQUESTS_TIMEOUT = 30
#: The path to the JSON file containing the metadata about the bundles.
BUNDLE_CONFIG_FILE = importlib.resources.files("circup") / "config/bundle_config.json"
#: Overwrite the bundles list with this file (only done manually)
BUNDLE_CONFIG_OVERWRITE = os.path.join(DATA_DIR, "bundle_config.json")
#: The path to the JSON file containing the local list of bundles.
BUNDLE_CONFIG_LOCAL = os.path.join(DATA_DIR, "bundle_config_local.json")
#: The path to the JSON file containing the metadata about the bundles.
BUNDLE_DATA = os.path.join(DATA_DIR, "circup.json")
#: The libraries (and blank lines) which don't go on devices
NOT_MCU_LIBRARIES = [
"",
"adafruit-blinka",
"adafruit-blinka-bleio",
"adafruit-blinka-displayio",
"adafruit-circuitpython-typing",
"circuitpython_typing",
"pyserial",
]
#: Commands that do not require an attached board
BOARDLESS_COMMANDS = ["show", "bundle-add", "bundle-remove", "bundle-show"]
def _get_modules_file(path, logger):
"""
@ -79,18 +47,14 @@ def _get_modules_file(path, logger):
py_files = glob.glob(os.path.join(package_path, "**/*.py"), recursive=True)
mpy_files = glob.glob(os.path.join(package_path, "**/*.mpy"), recursive=True)
all_files = py_files + mpy_files
# put __init__ first if any, assumed to have the version number
all_files.sort()
# default value
result[name] = {"path": package_path, "mpy": bool(mpy_files)}
# explore all the submodules to detect bad ones
for source in [f for f in all_files if not os.path.basename(f).startswith(".")]:
metadata = extract_metadata(source, logger)
if "__version__" in metadata:
# don't replace metadata if already found
if "__version__" not in result[name]:
metadata["path"] = package_path
result[name] = metadata
metadata["path"] = package_path
result[name] = metadata
# break now if any of the submodules has a bad format
if metadata["__version__"] == BAD_FILE_FORMAT:
break
@ -178,44 +142,3 @@ def extract_metadata(path, logger):
# not a valid MPY file
result["__version__"] = BAD_FILE_FORMAT
return result
def tags_data_load(logger):
"""
Load the list of the version tags of the bundles on disk.
:return: a dict() of tags indexed by Bundle identifiers/keys.
"""
tags_data = None
try:
with open(BUNDLE_DATA, encoding="utf-8") as data:
try:
tags_data = json.load(data)
except json.decoder.JSONDecodeError as ex:
# Sometimes (why?) the JSON file becomes corrupt. In which case
# log it and carry on as if setting up for first time.
logger.error("Could not parse %s", BUNDLE_DATA)
logger.exception(ex)
except FileNotFoundError:
pass
if not isinstance(tags_data, dict):
tags_data = {}
return tags_data
def get_latest_release_from_url(url, logger):
"""
Find the tag name of the latest release by using HTTP HEAD and decoding the redirect.
:param str url: URL to the latest release page on a git repository.
:return: The most recent tag value for the release.
"""
logger.info("Requesting redirect information: %s", url)
response = requests.head(url, timeout=REQUESTS_TIMEOUT)
responseurl = response.url
if response.is_redirect:
responseurl = response.headers["Location"]
tag = responseurl.rsplit("/", 1)[-1]
logger.info("Tag: '%s'", tag)
return tag

View file

@ -1,105 +0,0 @@
wwshell
=======
.. image:: https://readthedocs.org/projects/circup/badge/?version=latest
:target: https://circuitpython.readthedocs.io/projects/circup/en/latest/
:alt: Documentation Status
.. image:: https://img.shields.io/discord/327254708534116352.svg
:target: https://adafru.it/discord
:alt: Discord
.. image:: https://github.com/adafruit/circup/workflows/Build%20CI/badge.svg
:target: https://github.com/adafruit/circup/actions
:alt: Build Status
.. image:: https://img.shields.io/badge/code%20style-black-000000.svg
:target: https://github.com/psf/black
:alt: Code Style: Black
A tool to manage files on a CircuitPython device via wireless workflows.
Currently supports Web Workflow.
.. contents::
Installation
------------
wwshell is bundled along with Circup. When you install Circup you'll get wwshell automatically.
Circup requires Python 3.5 or higher.
In a `virtualenv <https://virtualenv.pypa.io/en/latest/>`_,
``pip install circup`` should do the trick. This is the simplest way to make it
work.
If you have no idea what a virtualenv is, try the following command,
``pip3 install --user circup``.
.. note::
If you use the ``pip3`` command to install CircUp you must make sure that
your path contains the directory into which the script will be installed.
To discover this path,
* On Unix-like systems, type ``python3 -m site --user-base`` and append
``bin`` to the resulting path.
* On Windows, type the same command, but append ``Scripts`` to the
resulting path.
What does wwshell do?
---------------------
It lets you view, delete, upload, and download files from your Circuitpython device
via wireless workflows. Similar to ampy, but operates over wireless workflow rather
than USB serial.
Usage
-----
To use web workflow you need to enable it by putting WIFI credentials and a web workflow
password into your settings.toml file. `See here <https://learn.adafruit.com/getting-started-with-web-workflow-using-the-code-editor/device-setup>`_,
To get help, just type the command::
$ wwshell
Usage: wwshell [OPTIONS] COMMAND [ARGS]...
A tool to manage files CircuitPython device over web workflow.
Options:
--verbose Comprehensive logging is sent to stdout.
--path DIRECTORY Path to CircuitPython directory. Overrides automatic path
detection.
--host TEXT Hostname or IP address of a device. Overrides automatic
path detection.
--password TEXT Password to use for authentication when --host is used.
You can optionally set an environment variable
CIRCUP_WEBWORKFLOW_PASSWORD instead of passing this
argument. If both exist the CLI arg takes precedent.
--timeout INTEGER Specify the timeout in seconds for any network
operations.
--version Show the version and exit.
--help Show this message and exit.
Commands:
get Download a copy of a file or directory from the device to the...
ls Lists the contents of a directory.
put Upload a copy of a file or directory from the local computer to...
rm Delete a file on the device.
.. note::
If you find a bug, or you want to suggest an enhancement or new feature
feel free to create an issue or submit a pull request here:
https://github.com/adafruit/circup
Discussion of this tool happens on the Adafruit CircuitPython
`Discord channel <https://discord.gg/rqrKDjU>`_.

View file

@ -1,3 +0,0 @@
# SPDX-FileCopyrightText: 2024 Tim Cocks, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT

View file

@ -1,14 +0,0 @@
# SPDX-FileCopyrightText: 2024 Tim Cocks, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
wwshell is a CLI utility for managing files on CircuitPython devices via wireless workflows.
It currently supports Web Workflow.
"""
from .commands import main
# Allows execution via `python -m circup ...`
# pylint: disable=no-value-for-parameter
if __name__ == "__main__":
main()

View file

@ -1,231 +0,0 @@
# SPDX-FileCopyrightText: 2019 Nicholas Tollervey, 2024 Tim Cocks, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
# ----------- CLI command definitions ----------- #
The following functions have IO side effects (for instance they emit to
stdout). Ergo, these are not checked with unit tests. Most of the
functionality they provide is provided by the functions from util_functions.py,
and the respective Backends which *are* tested. Most of the logic of the following
functions is to prepare things for presentation to / interaction with the user.
"""
import os
import time
import sys
import logging
import update_checker
import click
import requests
from circup.backends import WebBackend
from circup.logging import logger, log_formatter, LOGFILE
from circup.shared import BOARDLESS_COMMANDS
from circup.command_utils import (
get_device_path,
get_circup_version,
sorted_by_directory_then_alpha,
)
@click.group()
@click.option(
"--verbose", is_flag=True, help="Comprehensive logging is sent to stdout."
)
@click.option(
"--path",
type=click.Path(exists=True, file_okay=False),
help="Path to CircuitPython directory. Overrides automatic path detection.",
)
@click.option(
"--host",
help="Hostname or IP address of a device. Overrides automatic path detection.",
default="circuitpython.local",
)
@click.option(
"--port",
help="HTTP port that the web workflow is listening on.",
default=80,
)
@click.option(
"--password",
help="Password to use for authentication when --host is used."
" You can optionally set an environment variable CIRCUP_WEBWORKFLOW_PASSWORD"
" instead of passing this argument. If both exist the CLI arg takes precedent.",
)
@click.option(
"--timeout",
default=30,
help="Specify the timeout in seconds for any network operations.",
)
@click.version_option(
prog_name="CircFile",
message="%(prog)s, A CircuitPython web workflow file managemenr. Version %(version)s",
)
@click.pass_context
def main( # pylint: disable=too-many-locals
ctx,
verbose,
path,
host,
port,
password,
timeout,
): # pragma: no cover
"""
A tool to manage files CircuitPython device over web workflow.
"""
# pylint: disable=too-many-arguments,too-many-branches,too-many-statements,too-many-locals, R0801
ctx.ensure_object(dict)
ctx.obj["TIMEOUT"] = timeout
if password is None:
password = os.getenv("CIRCUP_WEBWORKFLOW_PASSWORD")
device_path = get_device_path(host, port, password, path)
using_webworkflow = "host" in ctx.params.keys() and ctx.params["host"] is not None
if using_webworkflow:
if host == "circuitpython.local":
click.echo("Checking versions.json on circuitpython.local to find hostname")
versions_resp = requests.get(
"http://circuitpython.local/cp/version.json", timeout=timeout
)
host = f'{versions_resp.json()["hostname"]}.local'
click.echo(f"Using hostname: {host}")
device_path = device_path.replace("circuitpython.local", host)
try:
ctx.obj["backend"] = WebBackend(
host=host, port=port, password=password, logger=logger, timeout=timeout
)
except ValueError as e:
click.secho(e, fg="red")
time.sleep(0.3)
sys.exit(1)
except RuntimeError as e:
click.secho(e, fg="red")
sys.exit(1)
if verbose:
# Configure additional logging to stdout.
ctx.obj["verbose"] = True
verbose_handler = logging.StreamHandler(sys.stdout)
verbose_handler.setLevel(logging.INFO)
verbose_handler.setFormatter(log_formatter)
logger.addHandler(verbose_handler)
click.echo("Logging to {}\n".format(LOGFILE))
else:
ctx.obj["verbose"] = False
logger.info("### Started Circfile ###")
# If a newer version of circfile is available, print a message.
logger.info("Checking for a newer version of circfile")
version = get_circup_version()
if version:
update_checker.update_check("circfile", version)
# stop early if the command is boardless
if ctx.invoked_subcommand in BOARDLESS_COMMANDS or "--help" in sys.argv:
return
ctx.obj["DEVICE_PATH"] = device_path
if device_path is None or not ctx.obj["backend"].is_device_present():
click.secho("Could not find a connected CircuitPython device.", fg="red")
sys.exit(1)
else:
click.echo("Found device at {}.".format(device_path))
@main.command("ls")
@click.argument("file", required=True, nargs=1, default="/")
@click.pass_context
def ls_cli(ctx, file): # pragma: no cover
"""
Lists the contents of a directory. Defaults to root directory
if not supplied.
"""
logger.info("ls")
if not file.endswith("/"):
file += "/"
click.echo(f"running: ls {file}")
files = ctx.obj["backend"].list_dir(file)
click.echo("Size\tName")
for cur_file in sorted_by_directory_then_alpha(files):
click.echo(
f"{cur_file['file_size']}\t{cur_file['name']}{'/' if cur_file['directory'] else ''}"
)
@main.command("put")
@click.argument("file", required=True, nargs=1)
@click.argument("location", required=False, nargs=1, default="")
@click.option("--overwrite", is_flag=True, help="Overwrite the file if it exists.")
@click.pass_context
def put_cli(ctx, file, location, overwrite):
"""
Upload a copy of a file or directory from the local computer
to the device
"""
click.echo(f"Attempting PUT: {file} at {location} overwrite? {overwrite}")
if not ctx.obj["backend"].file_exists(f"{location}{file}"):
ctx.obj["backend"].upload_file(file, location)
click.echo(f"Successfully PUT {location}{file}")
else:
if overwrite:
click.secho(
f"{location}{file} already exists. Overwriting it.", fg="yellow"
)
ctx.obj["backend"].upload_file(file, location)
click.echo(f"Successfully PUT {location}{file}")
else:
click.secho(
f"{location}{file} already exists. Pass --overwrite if you wish to replace it.",
fg="red",
)
# pylint: enable=too-many-arguments,too-many-locals
@main.command("get")
@click.argument("file", required=True, nargs=1)
@click.argument("location", required=False, nargs=1)
@click.pass_context
def get_cli(ctx, file, location): # pragma: no cover
"""
Download a copy of a file or directory from the device to the local computer.
"""
click.echo(f"running: get {file} {location}")
ctx.obj["backend"].download_file(file, location)
@main.command("rm")
@click.argument("file", nargs=1)
@click.pass_context
def rm_cli(ctx, file): # pragma: no cover
"""
Delete a file on the device.
"""
click.echo(f"running: rm {file}")
ctx.obj["backend"].uninstall(
ctx.obj["backend"].device_location, ctx.obj["backend"].get_file_path(file)
)
@main.command("mkdir")
@click.argument("directory", nargs=1)
@click.pass_context
def mkdir_cli(ctx, directory): # pragma: no cover
"""
Create
"""
click.echo(f"running: mkdir {directory}")
ctx.obj["backend"].create_directory(
ctx.obj["backend"].device_location, ctx.obj["backend"].get_file_path(directory)
)

View file

@ -64,7 +64,7 @@ release = "1.0"
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
language = "en"
language = None
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
@ -109,6 +109,7 @@ if not on_rtd: # only import and set the theme if we're building docs locally
import sphinx_rtd_theme
html_theme = "sphinx_rtd_theme"
html_theme_path = [sphinx_rtd_theme.get_html_theme_path(), "."]
except:
html_theme = "default"
html_theme_path = ["."]

View file

@ -1,12 +1,10 @@
.. Circup documentation master file, created by
.. CircUp documentation master file, created by
sphinx-quickstart on Mon Sep 2 10:58:36 2019.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
.. include:: ../README.rst
.. include:: ../circup/wwshell/README.rst
.. include:: ../CONTRIBUTING.rst
API

View file

@ -1,4 +0,0 @@
pytest
pytest-cov
pytest-faulthandler
pytest-random-order

View file

@ -1,3 +0,0 @@
SPDX-FileCopyrightText: 2024 Autogenerated by 'pip freeze'
SPDX-License-Identifier: MIT

View file

@ -1,52 +0,0 @@
# SPDX-FileCopyrightText: 2024 Jev Kuznetsov, ROX Automation
#
# SPDX-License-Identifier: MIT
[build-system]
requires = ["setuptools>=61.0", "setuptools-scm"]
build-backend = "setuptools.build_meta"
[project]
name = "circup"
dynamic = ["version", "dependencies", "optional-dependencies"]
description = "A tool to manage/update libraries on CircuitPython devices."
readme = "README.rst"
authors = [{ name = "Adafruit Industries", email = "circuitpython@adafruit.com" }]
license = { file = "LICENSE" }
classifiers = [
"Development Status :: 3 - Alpha",
"Environment :: Console",
"Intended Audience :: Developers",
"Intended Audience :: Education",
"License :: OSI Approved :: MIT License",
"Operating System :: POSIX",
"Operating System :: MacOS :: MacOS X",
"Operating System :: Microsoft :: Windows",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Topic :: Education",
"Topic :: Software Development :: Embedded Systems",
"Topic :: System :: Software Distribution"
]
keywords = ["adafruit", "blinka", "circuitpython", "micropython", "libraries"]
requires-python = ">=3.9"
[tool.setuptools.dynamic]
dependencies = {file = ["requirements.txt"]}
optional-dependencies = {optional = {file = ["optional_requirements.txt"]}}
[tool.setuptools_scm]
[project.scripts]
circup = "circup:main"
wwshell = "circup.wwshell:main"
[project.urls]
homepage = "https://github.com/adafruit/circup"
[tool.setuptools.packages.find]
where = ["."] # This tells setuptools to look in the project root directory
include = ["circup"] # This pattern includes your main package and any sub-packages within it

View file

@ -1,6 +1,58 @@
appdirs
Click
requests
semver
toml
update_checker
alabaster==0.7.12
appdirs==1.4.3
astroid==2.6.6
atomicwrites==1.3.0
attrs==19.1.0
Babel==2.9.1
black==24.3.0
bleach==3.3.0
certifi==2023.7.22
chardet==3.0.4
charset-normalizer==2.0.4
click==8.0.1
coverage==4.5.4
docutils==0.15.2
findimports==2.1.0
idna==2.8
imagesize==1.1.0
importlib-metadata==4.12.0
isort==5.9.3
Jinja2==2.11.3
lazy-object-proxy==1.6.0
MarkupSafe==1.1.1
mccabe==0.6.1
more-itertools==7.2.0
packaging==19.1
pkginfo==1.5.0.1
pluggy==0.13.1
py==1.10.0
Pygments==2.15.0
pylint==2.9.6
pyparsing==2.4.2
pytest==5.1.2
pytest-cov==2.7.1
pytest-faulthandler==2.0.1
pytest-random-order==1.0.4
pytz==2019.2
readme-renderer==24.0
requests==2.31.0
requests-toolbelt==0.9.1
semver==3.0.1
six==1.12.0
snowballstemmer==1.9.0
Sphinx==2.2.0
sphinxcontrib-applehelp==1.0.1
sphinxcontrib-devhelp==1.0.1
sphinxcontrib-htmlhelp==1.0.2
sphinxcontrib-jsmath==1.0.1
sphinxcontrib-qthelp==1.0.2
sphinxcontrib-serializinghtml==1.1.3
toml==0.10.0
tqdm==4.35.0
twine==1.13.0
update-checker==0.18.0
urllib3==1.26.18
wcwidth==0.1.7
webencodings==0.5.1
wrapt==1.12.1
zipp==0.6.0

104
setup.py Normal file
View file

@ -0,0 +1,104 @@
# SPDX-FileCopyrightText: 2019 Nicholas Tollervey, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""A setuptools based setup module.
See:
https://packaging.python.org/guides/distributing-packages-using-setuptools/
https://github.com/pypa/sampleproject
"""
# Always prefer setuptools over distutils
from setuptools import setup, find_packages
# To use a consistent encoding
from codecs import open
from os import path
here = path.abspath(path.dirname(__file__))
# Get the long description from the README file
with open(path.join(here, "README.rst"), encoding="utf-8") as f:
long_description = f.read()
install_requires = [
"semver~=3.0",
"Click>=8.0",
"appdirs>=1.4.3",
"requests>=2.22.0",
"findimports>=2.1.0",
"toml>=0.10.2",
# importlib_metadata is only available for 3.7, and is not needed for 3.8 and up.
"importlib_metadata; python_version == '3.7'",
"update_checker",
]
extras_require = {
"tests": [
"pytest",
"pylint",
"pytest-cov",
"pytest-random-order>=1.0.0",
"pytest-faulthandler",
"coverage",
"black",
],
"docs": ["sphinx"],
"package": [
# Wheel building and PyPI uploading
"wheel",
"twine",
],
}
extras_require["dev"] = (
extras_require["tests"] + extras_require["docs"] + extras_require["package"]
)
extras_require["all"] = list(
{req for extra, reqs in extras_require.items() for req in reqs}
)
setup(
name="circup",
use_scm_version=True,
setup_requires=["setuptools_scm"],
description="A tool to manage/update libraries on CircuitPython devices.",
long_description=long_description,
long_description_content_type="text/x-rst",
# The project's main homepage.
url="https://github.com/adafruit/circup",
# Author details
author="Adafruit Industries",
author_email="circuitpython@adafruit.com",
install_requires=install_requires,
extras_require=extras_require,
# Choose your license
license="MIT",
# See https://pypi.python.org/pypi?%3Aaction=list_classifiers
classifiers=[
"Development Status :: 3 - Alpha",
"Environment :: Console",
"Intended Audience :: Developers",
"Intended Audience :: Education",
"License :: OSI Approved :: MIT License",
"Operating System :: POSIX",
"Operating System :: MacOS :: MacOS X",
"Operating System :: Microsoft :: Windows",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Topic :: Education",
"Topic :: Software Development :: Embedded Systems",
"Topic :: System :: Software Distribution",
],
entry_points={"console_scripts": ["circup=circup:main"]},
# What does your project relate to?
keywords="adafruit, blinka, circuitpython, micropython, libraries",
# You can just specify the packages manually here if your project is
# simple. Or you can use find_packages().
packages=["circup"],
package_data={"circup": ["config/bundle_config.json"]},
)

View file

@ -2,10 +2,7 @@
#
# SPDX-License-Identifier: MIT
# pylint: disable=all
import os, sys
import adafruit_bus_device
from adafruit_button import Button
from adafruit_esp32spi import adafruit_esp32spi_socketpool
from adafruit_display_text import wrap_text_to_pixels, wrap_text_to_lines
import adafruit_esp32spi.adafruit_esp32spi_socket as socket
import adafruit_hid.consumer_control
import import_styles_sub

View file

@ -1,11 +0,0 @@
# SPDX-FileCopyrightText: 2021 Jeff Epler for Adafruit Industries
#
# SPDX-License-Identifier: MIT
# pylint: disable=all
import os, sys
import adafruit_bus_device
from adafruit_button import Button
from adafruit_esp32spi import adafruit_esp32spi_socketpool
from adafruit_display_text import wrap_text_to_pixels, wrap_text_to_lines
import adafruit_hid.consumer_control
import import_styles_sub

View file

@ -1,5 +0,0 @@
# SPDX-FileCopyrightText: 2025 Neradoc
#
# SPDX-License-Identifier: MIT
# pylint: disable=all
import adafruit_ntp

View file

@ -1,3 +1,3 @@
Adafruit CircuitPython 9.0.0 on 2019-08-02; Adafruit CircuitPlayground Express with samd21g18
Adafruit CircuitPython 4.1.0 on 2019-08-02; Adafruit CircuitPlayground Express with samd21g18
Board ID:this_is_a_board
UID:AAAABBBBCCCC

View file

@ -1,11 +0,0 @@
# SPDX-FileCopyrightText: 2021 Jeff Epler for Adafruit Industries
#
# SPDX-License-Identifier: MIT
# pylint: disable=all
import os, sys
import adafruit_bus_device
from adafruit_button import Button
from adafruit_esp32spi import adafruit_esp32spi_socketpool
from adafruit_display_text import wrap_text_to_pixels, wrap_text_to_lines
import adafruit_hid.consumer_control
import import_styles_sub

View file

@ -1,5 +0,0 @@
# SPDX-FileCopyrightText: 2025 Neradoc
#
# SPDX-License-Identifier: MIT
# pylint: disable=all
import adafruit_ntp

View file

@ -1,4 +0,0 @@
# SPDX-FileCopyrightText: 2025 Neradoc
#
# SPDX-License-Identifier: MIT
lib/*

View file

@ -1,3 +0,0 @@
Adafruit CircuitPython 9.0.0 on 2019-08-02; Adafruit CircuitPlayground Express with samd21g18
Board ID:this_is_a_board
UID:AAAABBBBCCCC

View file

@ -1,3 +0,0 @@
# SPDX-FileCopyrightText: 2023 Tim Cocks, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT

View file

@ -1,7 +0,0 @@
# SPDX-FileCopyrightText: 2021 Jeff Epler for Adafruit Industries
#
# SPDX-License-Identifier: MIT
# pylint: disable=all
import adafruit_ssd1675
import import_styles_sub
import package

View file

@ -1,5 +0,0 @@
# SPDX-FileCopyrightText: 2025 Neradoc
#
# SPDX-License-Identifier: MIT
# pylint: disable=all
import adafruit_ntp

View file

@ -1,6 +0,0 @@
# SPDX-FileCopyrightText: 2025 Neradoc
#
# SPDX-License-Identifier: MIT
# pylint: disable=all
import adafruit_spd1656
from .other import variable

View file

@ -1,5 +0,0 @@
# SPDX-FileCopyrightText: 2021 Jeff Epler for Adafruit Industries
#
# SPDX-License-Identifier: MIT
# pylint: disable=all
import adafruit_spd1608

View file

@ -37,18 +37,6 @@ import requests
import circup
from circup import DiskBackend
from circup.command_utils import (
find_device,
ensure_latest_bundle,
get_bundle,
get_bundles_dict,
imports_from_code,
get_all_imports,
libraries_from_auto_file,
)
from circup.shared import PLATFORMS
from circup.module import Module
from circup.logging import logger
TEST_BUNDLE_CONFIG_JSON = "tests/test_bundle_config.json"
with open(TEST_BUNDLE_CONFIG_JSON, "rb") as tbc:
@ -64,16 +52,21 @@ def test_Bundle_init():
"""
Create a Bundle and check all the strings are set as expected.
"""
bundle = circup.Bundle(TEST_BUNDLE_NAME)
with mock.patch("circup.logger.info"), mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch(
"circup.tags_data_load", return_value=dict()
), mock.patch(
"circup.DATA_DIR", "DATA_DIR"
):
bundle = circup.Bundle(TEST_BUNDLE_NAME)
assert repr(bundle) == repr(
{
"key": TEST_BUNDLE_NAME,
"url": "https://github.com/" + TEST_BUNDLE_NAME,
"urlzip": "adafruit-circuitpython-bundle-{platform}-{tag}.zip",
"dir": circup.shared.DATA_DIR
+ "/adafruit/adafruit-circuitpython-bundle-{platform}",
"zip": circup.shared.DATA_DIR
+ "/adafruit-circuitpython-bundle-{platform}.zip",
"dir": "DATA_DIR/adafruit/adafruit-circuitpython-bundle-{platform}",
"zip": "DATA_DIR/adafruit-circuitpython-bundle-{platform}.zip",
"url_format": "https://github.com/"
+ TEST_BUNDLE_NAME
+ "/releases/download/{tag}/"
@ -89,18 +82,22 @@ def test_Bundle_lib_dir():
Check the return of Bundle.lib_dir with a test tag.
"""
bundle_data = {TEST_BUNDLE_NAME: "TESTTAG"}
with mock.patch("circup.bundle.tags_data_load", return_value=bundle_data):
with mock.patch("circup.logger.info"), mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch("circup.tags_data_load", return_value=bundle_data), mock.patch(
"circup.DATA_DIR", "DATA_DIR"
):
bundle = circup.Bundle(TEST_BUNDLE_NAME)
assert bundle.current_tag == "TESTTAG"
assert bundle.lib_dir("py") == (
circup.shared.DATA_DIR + "/"
"DATA_DIR/"
"adafruit/adafruit-circuitpython-bundle-py/"
"adafruit-circuitpython-bundle-py-TESTTAG/lib"
)
assert bundle.lib_dir("9mpy") == (
circup.shared.DATA_DIR + "/"
"adafruit/adafruit-circuitpython-bundle-9mpy/"
"adafruit-circuitpython-bundle-9.x-mpy-TESTTAG/lib"
assert bundle.lib_dir("8mpy") == (
"DATA_DIR/"
"adafruit/adafruit-circuitpython-bundle-8mpy/"
"adafruit-circuitpython-bundle-8.x-mpy-TESTTAG/lib"
)
@ -109,9 +106,15 @@ def test_Bundle_latest_tag():
Check the latest tag gets through Bundle.latest_tag.
"""
bundle_data = {TEST_BUNDLE_NAME: "TESTTAG"}
with mock.patch(
"circup.bundle.get_latest_release_from_url", return_value="BESTESTTAG"
), mock.patch("circup.bundle.tags_data_load", return_value=bundle_data):
with mock.patch("circup.logger.info"), mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch(
"circup.get_latest_release_from_url", return_value="BESTESTTAG"
), mock.patch(
"circup.tags_data_load", return_value=bundle_data
), mock.patch(
"circup.DATA_DIR", "DATA_DIR"
):
bundle = circup.Bundle(TEST_BUNDLE_NAME)
assert bundle.latest_tag == "BESTESTTAG"
@ -120,18 +123,16 @@ def test_get_bundles_dict():
"""
Check we are getting the bundles list from BUNDLE_CONFIG_FILE.
"""
with mock.patch(
"circup.command_utils.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON
), mock.patch("circup.command_utils.BUNDLE_CONFIG_LOCAL", ""):
bundles_dict = get_bundles_dict()
with mock.patch("circup.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON), mock.patch(
"circup.BUNDLE_CONFIG_LOCAL", ""
):
bundles_dict = circup.get_bundles_dict()
assert bundles_dict == TEST_BUNDLE_DATA
with mock.patch(
"circup.command_utils.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON
), mock.patch(
"circup.command_utils.BUNDLE_CONFIG_LOCAL", TEST_BUNDLE_CONFIG_LOCAL_JSON
with mock.patch("circup.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON), mock.patch(
"circup.BUNDLE_CONFIG_LOCAL", TEST_BUNDLE_CONFIG_LOCAL_JSON
):
bundles_dict = get_bundles_dict()
bundles_dict = circup.get_bundles_dict()
expected_dict = {**TEST_BUNDLE_LOCAL_DATA, **TEST_BUNDLE_DATA}
assert bundles_dict == expected_dict
@ -140,18 +141,16 @@ def test_get_bundles_local_dict():
"""
Check we are getting the bundles list from BUNDLE_CONFIG_LOCAL.
"""
with mock.patch(
"circup.command_utils.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON
), mock.patch("circup.command_utils.BUNDLE_CONFIG_LOCAL", ""):
bundles_dict = get_bundles_dict()
with mock.patch("circup.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON), mock.patch(
"circup.BUNDLE_CONFIG_LOCAL", ""
):
bundles_dict = circup.get_bundles_dict()
assert bundles_dict == TEST_BUNDLE_DATA
with mock.patch(
"circup.command_utils.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON
), mock.patch(
"circup.command_utils.BUNDLE_CONFIG_LOCAL", TEST_BUNDLE_CONFIG_LOCAL_JSON
with mock.patch("circup.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON), mock.patch(
"circup.BUNDLE_CONFIG_LOCAL", TEST_BUNDLE_CONFIG_LOCAL_JSON
):
bundles_dict = get_bundles_dict()
bundles_dict = circup.get_bundles_dict()
expected_dict = {**TEST_BUNDLE_LOCAL_DATA, **TEST_BUNDLE_DATA}
assert bundles_dict == expected_dict
@ -160,9 +159,9 @@ def test_get_bundles_list():
"""
Check we are getting the bundles list from BUNDLE_CONFIG_FILE.
"""
with mock.patch(
"circup.command_utils.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON
), mock.patch("circup.command_utils.BUNDLE_CONFIG_LOCAL", ""):
with mock.patch("circup.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON), mock.patch(
"circup.BUNDLE_CONFIG_LOCAL", ""
):
bundles_list = circup.get_bundles_list()
bundle = circup.Bundle(TEST_BUNDLE_NAME)
assert repr(bundles_list) == repr([bundle])
@ -172,14 +171,14 @@ def test_save_local_bundles():
"""
Pretend to save local bundles.
"""
with mock.patch(
"circup.command_utils.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON
), mock.patch("circup.command_utils.BUNDLE_CONFIG_LOCAL", ""), mock.patch(
"circup.os.unlink"
) as mock_unlink, mock.patch(
"circup.command_utils.json.dump"
with mock.patch("circup.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON), mock.patch(
"circup.BUNDLE_CONFIG_LOCAL", ""
), mock.patch("circup.os.unlink") as mock_unlink, mock.patch(
"circup.json.dump"
) as mock_dump, mock.patch(
"circup.command_utils.open", mock.mock_open()
"circup.json.load", return_value=TEST_BUNDLE_DATA
), mock.patch(
"circup.open", mock.mock_open()
) as mock_open:
final_data = {**TEST_BUNDLE_DATA, **TEST_BUNDLE_LOCAL_DATA}
circup.save_local_bundles(final_data)
@ -191,18 +190,14 @@ def test_save_local_bundles_reset():
"""
Pretend to reset the local bundles.
"""
with mock.patch(
"circup.command_utils.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON
), mock.patch(
"circup.command_utils.BUNDLE_CONFIG_LOCAL", "test/NOTEXISTS"
), mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch(
with mock.patch("circup.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON), mock.patch(
"circup.BUNDLE_CONFIG_LOCAL", "test/NOTEXISTS"
), mock.patch("circup.os.path.isfile", return_value=True), mock.patch(
"circup.os.unlink"
) as mock_unlink, mock.patch(
"circup.command_utils.json.load", return_value=TEST_BUNDLE_DATA
"circup.json.load", return_value=TEST_BUNDLE_DATA
), mock.patch(
"circup.command_utils.open", mock.mock_open()
"circup.open", mock.mock_open()
) as mock_open:
circup.save_local_bundles({})
mock_open().write.assert_not_called()
@ -222,13 +217,13 @@ def test_Module_init_file_module():
with mock.patch("circup.logger.info") as mock_logger, mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch(
"circup.bundle.Bundle.lib_dir",
), mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch(
"circup.Bundle.lib_dir",
return_value="tests",
):
backend = DiskBackend("mock_device", mock_logger)
bundle = circup.Bundle(TEST_BUNDLE_NAME)
m = Module(
m = circup.Module(
name,
backend,
repo,
@ -255,17 +250,21 @@ def test_Module_init_directory_module():
directory based Python module.
"""
name = "dir_module/"
path = os.path.join("tests", "mock_device", "lib", f"{name}", "")
path = os.path.join("mock_device", "lib", f"{name}", "")
repo = "https://github.com/adafruit/SomeLibrary.git"
device_version = "1.2.3"
bundle_version = "3.2.1"
mpy = True
with mock.patch("circup.logger.info") as mock_logger, mock.patch(
"circup.bundle.Bundle.lib_dir", return_value="tests"
"circup.os.path.isfile", return_value=False
), mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch(
"circup.DATA_DIR", "/tests/DATA_DIR"
), mock.patch(
"circup.Bundle.lib_dir", return_value="tests"
):
backend = DiskBackend("tests/mock_device", mock_logger)
backend = DiskBackend("mock_device", mock_logger)
bundle = circup.Bundle(TEST_BUNDLE_NAME)
m = Module(
m = circup.Module(
name,
backend,
repo,
@ -296,11 +295,17 @@ def test_Module_outofdate():
name = "module.py"
repo = "https://github.com/adafruit/SomeLibrary.git"
with mock.patch("circup.logger.info") as mock_logger:
backend = DiskBackend("tests/mock_device", mock_logger)
m1 = Module(name, backend, repo, "1.2.3", "3.2.1", False, bundle, (None, None))
m2 = Module(name, backend, repo, "1.2.3", "1.2.3", False, bundle, (None, None))
backend = DiskBackend("mock_device", mock_logger)
m1 = circup.Module(
name, backend, repo, "1.2.3", "3.2.1", False, bundle, (None, None)
)
m2 = circup.Module(
name, backend, repo, "1.2.3", "1.2.3", False, bundle, (None, None)
)
# shouldn't happen!
m3 = Module(name, backend, repo, "3.2.1", "1.2.3", False, bundle, (None, None))
m3 = circup.Module(
name, backend, repo, "3.2.1", "1.2.3", False, bundle, (None, None)
)
assert m1.outofdate is True
assert m2.outofdate is False
assert m3.outofdate is False
@ -320,8 +325,8 @@ def test_Module_outofdate_bad_versions():
bundle_version = "3.2.1"
with mock.patch("circup.logger.warning") as mock_logger:
backend = DiskBackend("tests/mock_device", mock_logger)
m = Module(
backend = DiskBackend("mock_device", mock_logger)
m = circup.Module(
name,
backend,
repo,
@ -344,11 +349,15 @@ def test_Module_mpy_mismatch():
"""
name = "module.py"
repo = "https://github.com/adafruit/SomeLibrary.git"
with mock.patch("circup.logger.warning") as mock_logger:
backend = DiskBackend("tests/mock_device", mock_logger)
with mock.patch("circup.CPY_VERSION", "8.0.0"), mock.patch(
"circup.logger.warning"
) as mock_logger:
backend = DiskBackend("mock_device", mock_logger)
bundle = circup.Bundle(TEST_BUNDLE_NAME)
m1 = Module(name, backend, repo, "1.2.3", "1.2.3", True, bundle, (None, None))
m2 = Module(
m1 = circup.Module(
name, backend, repo, "1.2.3", "1.2.3", True, bundle, (None, None)
)
m2 = circup.Module(
name,
backend,
repo,
@ -358,39 +367,23 @@ def test_Module_mpy_mismatch():
bundle,
("7.0.0-alpha.1", "8.99.99"),
)
m3 = Module(
m3 = circup.Module(
name, backend, repo, "1.2.3", "1.2.3", True, bundle, (None, "7.0.0-alpha.1")
)
with mock.patch(
"circup.backends.DiskBackend.get_circuitpython_version",
return_value=("6.2.0", ""),
):
with mock.patch("circup.CPY_VERSION", "6.2.0"):
assert m1.mpy_mismatch is False
assert m1.outofdate is False
assert m2.mpy_mismatch is True
assert m2.outofdate is True
assert m3.mpy_mismatch is False
assert m3.outofdate is False
with mock.patch(
"circup.backends.DiskBackend.get_circuitpython_version",
return_value=("8.0.0", ""),
):
with mock.patch("circup.CPY_VERSION", "8.0.0"):
assert m1.mpy_mismatch is False
assert m1.outofdate is False
assert m2.mpy_mismatch is False
assert m2.outofdate is False
assert m3.mpy_mismatch is True
assert m3.outofdate is True
with mock.patch(
"circup.backends.DiskBackend.get_circuitpython_version",
return_value=("9.0.0", ""),
):
assert m1.mpy_mismatch is False
assert m1.outofdate is False
assert m2.mpy_mismatch is True
assert m2.outofdate is True
assert m3.mpy_mismatch is True
assert m3.outofdate is True
def test_Module_major_update_bad_versions():
@ -408,7 +401,7 @@ def test_Module_major_update_bad_versions():
with mock.patch("circup.logger.warning") as mock_logger:
backend = DiskBackend("mock_device", mock_logger)
m = Module(
m = circup.Module(
name,
backend,
repo,
@ -431,16 +424,19 @@ def test_Module_row():
name = "module.py"
repo = "https://github.com/adafruit/SomeLibrary.git"
with mock.patch("circup.os.path.isfile", return_value=True), mock.patch(
"circup.backends.DiskBackend.get_circuitpython_version",
return_value=("9.0.0", ""),
"circup.CPY_VERSION", "8.0.0"
), mock.patch("circup.logger.warning") as mock_logger:
backend = DiskBackend("mock_device", mock_logger)
m = Module(name, backend, repo, "1.2.3", None, False, bundle, (None, None))
m = circup.Module(
name, backend, repo, "1.2.3", None, False, bundle, (None, None)
)
assert m.row == ("module", "1.2.3", "unknown", "Major Version")
m = Module(name, backend, repo, "1.2.3", "1.3.4", False, bundle, (None, None))
m = circup.Module(
name, backend, repo, "1.2.3", "1.3.4", False, bundle, (None, None)
)
assert m.row == ("module", "1.2.3", "1.3.4", "Minor Version")
m = Module(
name, backend, repo, "1.2.3", "1.2.3", True, bundle, ("8.0.0", "9.0.0")
m = circup.Module(
name, backend, repo, "1.2.3", "1.2.3", True, bundle, ("9.0.0", None)
)
assert m.row == ("module", "1.2.3", "1.2.3", "MPY Format")
@ -456,10 +452,10 @@ def test_Module_update_dir():
device_version = "1.2.3"
bundle_version = None
with mock.patch("circup.backends.shutil") as mock_shutil, mock.patch(
"circup.logger.warning"
) as mock_logger:
backend = DiskBackend("tests/mock_device", mock_logger)
m = Module(
"circup.os.path.isdir", return_value=True
), mock.patch("circup.logger.warning") as mock_logger:
backend = DiskBackend("mock_device", mock_logger)
m = circup.Module(
name,
backend,
repo,
@ -488,9 +484,13 @@ def test_Module_update_file():
with mock.patch("circup.backends.shutil") as mock_shutil, mock.patch(
"circup.os.remove"
) as mock_remove, mock.patch("circup.logger.warning") as mock_logger:
backend = circup.DiskBackend("tests/mock_device", mock_logger)
m = Module(
) as mock_remove, mock.patch(
"circup.os.path.isdir", return_value=False
), mock.patch(
"circup.logger.warning"
) as mock_logger:
backend = circup.DiskBackend("mock_device", mock_logger)
m = circup.Module(
name,
backend,
repo,
@ -515,14 +515,13 @@ def test_Module_repr():
device_version = "1.2.3"
bundle_version = "3.2.1"
with mock.patch("circup.os.path.isfile", return_value=True), mock.patch(
"circup.backends.DiskBackend.get_circuitpython_version",
return_value=("4.1.2", ""),
"circup.CPY_VERSION", "4.1.2"
), mock.patch("circup.Bundle.lib_dir", return_value="tests"), mock.patch(
"circup.logger.warning"
) as mock_logger:
bundle = circup.Bundle(TEST_BUNDLE_NAME)
backend = circup.DiskBackend("mock_device", mock_logger)
m = Module(
m = circup.Module(
name,
backend,
repo,
@ -556,8 +555,8 @@ def test_find_device_posix_exists():
with open("tests/mount_exists.txt", "rb") as fixture_file:
fixture = fixture_file.read()
with mock.patch("os.name", "posix"):
with mock.patch("circup.command_utils.check_output", return_value=fixture):
assert find_device() == "/media/ntoll/CIRCUITPY"
with mock.patch("circup.check_output", return_value=fixture):
assert circup.find_device() == "/media/ntoll/CIRCUITPY"
def test_find_device_posix_no_mount_command():
@ -569,10 +568,8 @@ def test_find_device_posix_no_mount_command():
with open("tests/mount_exists.txt", "rb") as fixture_file:
fixture = fixture_file.read()
mock_check = mock.MagicMock(side_effect=[FileNotFoundError, fixture])
with mock.patch("os.name", "posix"), mock.patch(
"circup.command_utils.check_output", mock_check
):
assert find_device() == "/media/ntoll/CIRCUITPY"
with mock.patch("os.name", "posix"), mock.patch("circup.check_output", mock_check):
assert circup.find_device() == "/media/ntoll/CIRCUITPY"
assert mock_check.call_count == 2
assert mock_check.call_args_list[0][0][0] == "mount"
assert mock_check.call_args_list[1][0][0] == "/sbin/mount"
@ -586,9 +583,9 @@ def test_find_device_posix_missing():
with open("tests/mount_missing.txt", "rb") as fixture_file:
fixture = fixture_file.read()
with mock.patch("os.name", "posix"), mock.patch(
"circup.command_utils.check_output", return_value=fixture
"circup.check_output", return_value=fixture
):
assert find_device() is None
assert circup.find_device() is None
def test_find_device_nt_exists():
@ -605,7 +602,7 @@ def test_find_device_nt_exists():
"os.path.exists", return_value=True
), mock.patch("ctypes.create_unicode_buffer", return_value=fake_buffer):
ctypes.windll = mock_windll
assert find_device() == "A:\\"
assert circup.find_device() == "A:\\"
def test_find_device_nt_missing():
@ -622,7 +619,7 @@ def test_find_device_nt_missing():
"os.path.exists", return_value=True
), mock.patch("ctypes.create_unicode_buffer", return_value=fake_buffer):
ctypes.windll = mock_windll
assert find_device() is None
assert circup.find_device() is None
def test_find_device_unknown_os():
@ -631,7 +628,7 @@ def test_find_device_unknown_os():
"""
with mock.patch("os.name", "foo"):
with pytest.raises(NotImplementedError) as ex:
find_device()
circup.find_device()
assert ex.value.args[0] == 'OS "foo" not supported.'
@ -646,8 +643,8 @@ def test_get_latest_release_from_url():
"/Adafruit_CircuitPython_Bundle/releases/tag/20190903"
}
expected_url = "https://github.com/" + TEST_BUNDLE_NAME + "/releases/latest"
with mock.patch("circup.shared.requests.head", return_value=response) as mock_get:
result = circup.get_latest_release_from_url(expected_url, logger)
with mock.patch("circup.requests.head", return_value=response) as mock_get:
result = circup.get_latest_release_from_url(expected_url)
assert result == "20190903"
mock_get.assert_called_once_with(expected_url, timeout=mock.ANY)
@ -713,7 +710,9 @@ def test_find_modules():
with mock.patch(
"circup.DiskBackend.get_device_versions", return_value=device_modules
), mock.patch(
"circup.command_utils.get_bundle_versions", return_value=bundle_modules
"circup.get_bundle_versions", return_value=bundle_modules
), mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch(
"circup.logger.warning"
) as mock_logger:
@ -739,7 +738,7 @@ def test_find_modules_goes_bang():
"""
with mock.patch(
"circup.DiskBackend.get_device_versions", side_effect=Exception("BANG!")
), mock.patch("circup.command_utils.click") as mock_click, mock.patch(
), mock.patch("circup.click") as mock_click, mock.patch(
"circup.sys.exit"
) as mock_exit, mock.patch(
"circup.logger.warning"
@ -757,19 +756,14 @@ def test_get_bundle_versions():
Ensure get_modules is called with the path for the library bundle.
Ensure ensure_latest_bundle is called even if lib_dir exists.
"""
with mock.patch(
"circup.command_utils.ensure_latest_bundle"
) as mock_elb, mock.patch(
"circup.command_utils._get_modules_file", return_value={"ok": {"name": "ok"}}
) as mock_gm, mock.patch(
"circup.backends.DiskBackend.get_circuitpython_version",
return_value=("4.1.2", ""),
), mock.patch(
"circup.bundle.Bundle.lib_dir", return_value="foo/bar/lib"
with mock.patch("circup.ensure_latest_bundle") as mock_elb, mock.patch(
"circup._get_modules_file", return_value={"ok": {"name": "ok"}}
) as mock_gm, mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch(
"circup.Bundle.lib_dir", return_value="foo/bar/lib"
), mock.patch(
"circup.os.path.isdir", return_value=True
), mock.patch(
"circup.command_utils.logger"
"circup.logger"
) as mock_logger:
bundle = circup.Bundle(TEST_BUNDLE_NAME)
bundles_list = [bundle]
@ -785,17 +779,12 @@ def test_get_bundle_versions_avoid_download():
When avoid_download is True and lib_dir exists, don't ensure_latest_bundle.
Testing both cases: lib_dir exists and lib_dir doesn't exists.
"""
with mock.patch(
"circup.command_utils.ensure_latest_bundle"
) as mock_elb, mock.patch(
"circup.command_utils._get_modules_file", return_value={"ok": {"name": "ok"}}
) as mock_gm, mock.patch(
"circup.backends.DiskBackend.get_circuitpython_version",
return_value=("4.1.2", ""),
), mock.patch(
with mock.patch("circup.ensure_latest_bundle") as mock_elb, mock.patch(
"circup._get_modules_file", return_value={"ok": {"name": "ok"}}
) as mock_gm, mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch(
"circup.Bundle.lib_dir", return_value="foo/bar/lib"
), mock.patch(
"circup.command_utils.logger"
"circup.logger"
) as mock_logger:
bundle = circup.Bundle(TEST_BUNDLE_NAME)
bundles_list = [bundle]
@ -821,7 +810,7 @@ def test_get_circuitpython_version():
with mock.patch("circup.logger.warning") as mock_logger:
backend = DiskBackend("tests/mock_device", mock_logger)
assert backend.get_circuitpython_version() == (
"9.0.0",
"4.1.0",
"this_is_a_board",
)
@ -833,9 +822,9 @@ def test_get_device_versions():
with mock.patch(
"circup.DiskBackend.get_modules", return_value="ok"
) as mock_gm, mock.patch("circup.logger.warning") as mock_logger:
backend = circup.DiskBackend("tests/mock_device", mock_logger)
backend = circup.DiskBackend("mock_device", mock_logger)
assert backend.get_device_versions() == "ok"
mock_gm.assert_called_once_with(os.path.join("tests", "mock_device", "lib"))
mock_gm.assert_called_once_with(os.path.join("mock_device", "lib"))
def test_get_modules_empty_path():
@ -858,7 +847,7 @@ def test_get_modules_that_are_files():
os.path.join("tests", "local_module.py"),
os.path.join("tests", ".hidden_module.py"),
]
with mock.patch("circup.shared.glob.glob", side_effect=[mods, [], []]), mock.patch(
with mock.patch("circup.glob.glob", side_effect=[mods, [], []]), mock.patch(
"circup.logger.warning"
) as mock_logger:
backend = circup.DiskBackend("mock_device", mock_logger)
@ -885,7 +874,7 @@ def test_get_modules_that_are_directories():
]
mod_files = ["tests/dir_module/my_module.py", "tests/dir_module/__init__.py"]
with mock.patch(
"circup.shared.glob.glob", side_effect=[[], [], mods, mod_files, []]
"circup.glob.glob", side_effect=[[], [], mods, mod_files, []]
), mock.patch("circup.logger.warning") as mock_logger:
backend = circup.DiskBackend("mock_device", mock_logger)
result = backend.get_modules(path)
@ -906,7 +895,7 @@ def test_get_modules_that_are_directories_with_no_metadata():
mods = [os.path.join("tests", "bad_module", "")]
mod_files = ["tests/bad_module/my_module.py", "tests/bad_module/__init__.py"]
with mock.patch(
"circup.shared.glob.glob", side_effect=[[], [], mods, mod_files, []]
"circup.glob.glob", side_effect=[[], [], mods, mod_files, []]
), mock.patch("circup.logger.warning") as mock_logger:
backend = circup.DiskBackend("mock_device", mock_logger)
result = backend.get_modules(path)
@ -922,15 +911,15 @@ def test_ensure_latest_bundle_no_bundle_data():
If there's no BUNDLE_DATA file (containing previous current version of the
bundle) then default to update.
"""
with mock.patch("circup.bundle.Bundle.latest_tag", "12345"), mock.patch(
with mock.patch("circup.Bundle.latest_tag", "12345"), mock.patch(
"circup.os.path.isfile", return_value=False
), mock.patch("circup.command_utils.get_bundle") as mock_gb, mock.patch(
"circup.command_utils.json"
), mock.patch("circup.get_bundle") as mock_gb, mock.patch(
"circup.json"
) as mock_json, mock.patch(
"circup.command_utils.open"
"circup.open"
):
bundle = circup.Bundle(TEST_BUNDLE_NAME)
ensure_latest_bundle(bundle)
circup.ensure_latest_bundle(bundle)
mock_gb.assert_called_once_with(bundle, "12345")
assert mock_json.dump.call_count == 1 # Current version saved to file.
@ -941,21 +930,23 @@ def test_ensure_latest_bundle_bad_bundle_data():
bundle) but it has been corrupted (which has sometimes happened during
manual testing) then default to update.
"""
with mock.patch("circup.bundle.Bundle.latest_tag", "12345"), mock.patch(
"circup.command_utils.open"
), mock.patch("circup.command_utils.get_bundle") as mock_gb, mock.patch(
"builtins.open", mock.mock_open(read_data="}{INVALID_JSON")
with mock.patch("circup.Bundle.latest_tag", "12345"), mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch("circup.open"), mock.patch(
"circup.get_bundle"
) as mock_gb, mock.patch(
"circup.json.load", side_effect=json.decoder.JSONDecodeError("BANG!", "doc", 1)
), mock.patch(
"circup.command_utils.json.dump"
"circup.json.dump"
), mock.patch(
"circup.bundle.logger"
"circup.logger"
) as mock_logger:
bundle = circup.Bundle(TEST_BUNDLE_NAME)
ensure_latest_bundle(bundle)
circup.ensure_latest_bundle(bundle)
mock_gb.assert_called_once_with(bundle, "12345")
assert mock_logger.error.call_count == 1
assert mock_logger.exception.call_count == 1
# wrong file is opened twice (one at __init__, one at save())
assert mock_logger.error.call_count == 2
assert mock_logger.exception.call_count == 2
def test_ensure_latest_bundle_to_update():
@ -963,14 +954,16 @@ def test_ensure_latest_bundle_to_update():
If the version found in the BUNDLE_DATA is out of date, then cause an
update to the bundle.
"""
with mock.patch("circup.bundle.Bundle.latest_tag", "54321"), mock.patch(
"circup.command_utils.open"
), mock.patch("circup.command_utils.get_bundle") as mock_gb, mock.patch(
"circup.command_utils.json"
with mock.patch("circup.Bundle.latest_tag", "54321"), mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch("circup.open"), mock.patch(
"circup.get_bundle"
) as mock_gb, mock.patch(
"circup.json"
) as mock_json:
mock_json.load.return_value = {TEST_BUNDLE_NAME: "12345"}
bundle = circup.Bundle(TEST_BUNDLE_NAME)
ensure_latest_bundle(bundle)
circup.ensure_latest_bundle(bundle)
mock_gb.assert_called_once_with(bundle, "54321")
assert mock_json.dump.call_count == 1 # Current version saved to file.
@ -986,18 +979,17 @@ def test_ensure_latest_bundle_to_update_http_error():
# ), mock.patch(
"circup.os.path.isfile",
return_value=True,
), mock.patch("circup.command_utils.open"), mock.patch(
"circup.command_utils.get_bundle",
side_effect=requests.exceptions.HTTPError("404"),
), mock.patch("circup.open"), mock.patch(
"circup.get_bundle", side_effect=requests.exceptions.HTTPError("404")
) as mock_gb, mock.patch(
"circup.command_utils.json"
"circup.json"
) as mock_json, mock.patch(
"circup.click.secho"
) as mock_click:
circup.Bundle.tags_data = dict()
mock_json.load.return_value = tags_data
bundle = circup.Bundle(TEST_BUNDLE_NAME)
ensure_latest_bundle(bundle)
circup.ensure_latest_bundle(bundle)
mock_gb.assert_called_once_with(bundle, "54321")
assert mock_json.dump.call_count == 0 # not saved.
assert mock_click.call_count == 1 # friendly message.
@ -1008,20 +1000,20 @@ def test_ensure_latest_bundle_no_update():
If the version found in the BUNDLE_DATA is NOT out of date, just log the
fact and don't update.
"""
with mock.patch("circup.bundle.Bundle.latest_tag", "12345"), mock.patch(
"circup.command_utils.os.path.isdir", return_value=True
), mock.patch("circup.command_utils.open"), mock.patch(
"circup.command_utils.get_bundle"
with mock.patch("circup.Bundle.latest_tag", "12345"), mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch("circup.os.path.isdir", return_value=True), mock.patch(
"circup.open"
), mock.patch(
"circup.get_bundle"
) as mock_gb, mock.patch(
"circup.command_utils.os.path.isfile", return_value=True
), mock.patch(
"circup.bundle.Bundle.current_tag", "12345"
), mock.patch(
"circup.command_utils.logger"
"circup.json"
) as mock_json, mock.patch(
"circup.logger"
) as mock_logger:
mock_json.load.return_value = {TEST_BUNDLE_NAME: "12345"}
bundle = circup.Bundle(TEST_BUNDLE_NAME)
ensure_latest_bundle(bundle)
circup.ensure_latest_bundle(bundle)
assert mock_gb.call_count == 0
assert mock_logger.info.call_count == 2
@ -1037,25 +1029,25 @@ def test_get_bundle():
mock_progress = mock.MagicMock()
mock_progress().__enter__ = mock.MagicMock(return_value=["a", "b", "c"])
mock_progress().__exit__ = mock.MagicMock()
with mock.patch("circup.command_utils.requests") as mock_requests, mock.patch(
with mock.patch("circup.requests") as mock_requests, mock.patch(
"circup.click"
) as mock_click, mock.patch(
"circup.command_utils.open", mock.mock_open()
"circup.open", mock.mock_open()
) as mock_open, mock.patch(
"circup.os.path.isdir", return_value=True
), mock.patch(
"circup.command_utils.shutil"
"circup.shutil"
) as mock_shutil, mock.patch(
"circup.command_utils.zipfile"
"circup.zipfile"
) as mock_zipfile:
mock_click.progressbar = mock_progress
mock_requests.get().status_code = mock_requests.codes.ok
mock_requests.get.reset_mock()
tag = "12345"
bundle = circup.Bundle(TEST_BUNDLE_NAME)
get_bundle(bundle, tag)
circup.get_bundle(bundle, tag)
# how many bundles currently supported. i.e. 6x.mpy, 7x.mpy, py = 3 bundles
_bundle_count = len(PLATFORMS)
_bundle_count = len(circup.PLATFORMS)
assert mock_requests.get.call_count == _bundle_count
assert mock_open.call_count == _bundle_count
assert mock_shutil.rmtree.call_count == _bundle_count
@ -1068,9 +1060,9 @@ def test_get_bundle_network_error():
Ensure that if there is a network related error when grabbing the bundle
then the error is logged and re-raised for the HTTP status code.
"""
with mock.patch("circup.command_utils.requests") as mock_requests, mock.patch(
"circup.shared.tags_data_load", return_value=dict()
), mock.patch("circup.command_utils.logger") as mock_logger:
with mock.patch("circup.requests") as mock_requests, mock.patch(
"circup.tags_data_load", return_value=dict()
), mock.patch("circup.logger") as mock_logger:
# Force failure with != requests.codes.ok
mock_requests.get().status_code = mock_requests.codes.BANG
# Ensure raise_for_status actually raises an exception.
@ -1079,7 +1071,7 @@ def test_get_bundle_network_error():
tag = "12345"
with pytest.raises(Exception) as ex:
bundle = circup.Bundle(TEST_BUNDLE_NAME)
get_bundle(bundle, tag)
circup.get_bundle(bundle, tag)
assert ex.value.args[0] == "Bang!"
url = (
"https://github.com/" + TEST_BUNDLE_NAME + "/releases/download"
@ -1096,9 +1088,7 @@ def test_show_command():
"""
runner = CliRunner()
test_bundle_modules = ["one.py", "two.py", "three.py"]
with mock.patch(
"circup.commands.get_bundle_versions", return_value=test_bundle_modules
):
with mock.patch("circup.get_bundle_versions", return_value=test_bundle_modules):
result = runner.invoke(circup.show)
assert result.exit_code == 0
assert all(m.replace(".py", "") in result.output for m in test_bundle_modules)
@ -1110,9 +1100,7 @@ def test_show_match_command():
"""
runner = CliRunner()
test_bundle_modules = ["one.py", "two.py", "three.py"]
with mock.patch(
"circup.commands.get_bundle_versions", return_value=test_bundle_modules
):
with mock.patch("circup.get_bundle_versions", return_value=test_bundle_modules):
result = runner.invoke(circup.show, ["t"])
assert result.exit_code == 0
assert "one" not in result.output
@ -1120,193 +1108,46 @@ def test_show_match_command():
def test_show_match_py_command():
"""
Check that py does not match the .py extension in the module names
Check that py does not match the .py extention in the module names
"""
runner = CliRunner()
test_bundle_modules = ["one.py", "two.py", "three.py"]
with mock.patch(
"circup.commands.get_bundle_versions", return_value=test_bundle_modules
):
with mock.patch("circup.get_bundle_versions", return_value=test_bundle_modules):
result = runner.invoke(circup.show, ["py"])
assert result.exit_code == 0
assert "0 shown" in result.output
def test_imports_from_code():
def test_libraries_from_imports():
"""Ensure that various styles of import all work"""
mod_names = [
"adafruit_bus_device",
"adafruit_button",
"adafruit_display_shapes",
"adafruit_display_text",
"adafruit_esp32spi",
"adafruit_hid",
"adafruit_oauth2",
"adafruit_requests",
"adafruit_touchscreen",
]
test_file = str(pathlib.Path(__file__).parent / "import_styles.py")
with open(test_file, "r", encoding="utf8") as fp:
test_data = fp.read()
result = imports_from_code(test_data)
print(result)
result = circup.libraries_from_code_py(test_file, mod_names)
assert result == [
"adafruit_bus_device",
"adafruit_button",
"adafruit_button.Button",
"adafruit_display_text",
"adafruit_display_text.wrap_text_to_lines",
"adafruit_display_text.wrap_text_to_pixels",
"adafruit_esp32spi",
"adafruit_esp32spi.adafruit_esp32spi_socketpool",
"adafruit_hid",
"adafruit_hid.consumer_control",
"import_styles_sub",
"os",
"sys",
]
def test_get_all_imports():
"""List all libraries from auto file recursively"""
mod_names = [
"adafruit_bus_device",
"adafruit_button",
"adafruit_display_shapes",
"adafruit_display_text",
"adafruit_esp32spi",
"adafruit_hid",
"adafruit_oauth2",
"adafruit_requests",
"adafruit_touchscreen",
"adafruit_ntp",
]
with mock.patch("circup.logger.info") as mock_logger, mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch(
"circup.bundle.Bundle.lib_dir",
return_value="tests",
):
tests_dir = pathlib.Path(__file__).parent
backend = DiskBackend(tests_dir / "mock_device", mock_logger)
test_file = str(tests_dir / "import_styles.py")
with open(test_file, "r", encoding="utf8") as fp:
test_data = fp.read()
result = get_all_imports(
backend,
test_data,
os.path.join(backend.device_location, "import_styles.py"),
mod_names,
current_module="",
)
assert result == [
"adafruit_bus_device",
"adafruit_button",
"adafruit_display_text",
"adafruit_esp32spi",
"adafruit_hid",
"adafruit_ntp",
]
def test_libraries_from_auto_file_local():
"""Check that we get all libraries from auto file argument.
Testing here with a local file"""
mod_names = [
"adafruit_bus_device",
"adafruit_button",
"adafruit_display_shapes",
"adafruit_display_text",
"adafruit_esp32spi",
"adafruit_hid",
"adafruit_oauth2",
"adafruit_requests",
"adafruit_touchscreen",
"adafruit_ntp",
]
auto_file = "apps/test_app/import_styles.py"
with mock.patch("circup.logger.info") as mock_logger, mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch(
"circup.bundle.Bundle.lib_dir",
return_value="tests",
):
tests_dir = pathlib.Path(__file__).parent
backend = DiskBackend(tests_dir / "mock_device", mock_logger)
result = libraries_from_auto_file(backend, auto_file, mod_names)
assert result == [
"adafruit_bus_device",
"adafruit_button",
"adafruit_display_text",
"adafruit_esp32spi",
"adafruit_hid",
"adafruit_ntp",
]
def test_libraries_from_auto_file_board():
"""Check that we find code.py on the board if we give no auto_file argument"""
mod_names = [
"adafruit_bus_device",
"adafruit_button",
"adafruit_display_shapes",
"adafruit_display_text",
"adafruit_esp32spi",
"adafruit_ssd1675",
"adafruit_spd1656",
"adafruit_spd1608",
"adafruit_touchscreen",
"adafruit_ntp",
]
auto_file = None
with mock.patch("circup.logger.info") as mock_logger, mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch(
"circup.bundle.Bundle.lib_dir",
return_value="tests",
):
tests_dir = pathlib.Path(__file__).parent
backend = DiskBackend(tests_dir / "mock_device_2", mock_logger)
result = libraries_from_auto_file(backend, auto_file, mod_names)
assert result == [
"adafruit_ntp",
"adafruit_spd1608",
"adafruit_spd1656",
"adafruit_ssd1675",
]
def test_libraries_from_auto_file_none():
"""Check that we exit if we give no auto_file argument
and there's no default code file"""
mod_names = []
auto_file = None
with mock.patch("circup.logger.info") as mock_logger, mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch(
"circup.bundle.Bundle.lib_dir",
return_value="tests",
):
tests_dir = pathlib.Path(__file__).parent
backend = DiskBackend(tests_dir / "mock_device", mock_logger)
try:
libraries_from_auto_file(backend, auto_file, mod_names)
raise Exception("Did not call exit")
except SystemExit as ex:
assert ex.code == 1
def test_install_auto_file_bad():
"""Ensure that we catch an error when parsing auto file"""
def test_libraries_from_imports_bad():
"""Ensure that we catch an import error"""
TEST_BUNDLE_MODULES = {"one.py": {}, "two.py": {}, "three.py": {}}
runner = CliRunner()
with mock.patch(
"circup.commands.get_bundle_versions", return_value=TEST_BUNDLE_MODULES
):
with mock.patch("circup.get_bundle_versions", return_value=TEST_BUNDLE_MODULES):
result = runner.invoke(
circup.main,
[