Merge pull request #163 from vladak/web_workflow

add support for web workflow
This commit is contained in:
Scott Shawcroft 2024-03-05 10:45:52 -08:00 committed by GitHub
commit 8f1accc568
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 1447 additions and 479 deletions

5
.isort.cfg Normal file
View file

@ -0,0 +1,5 @@
# SPDX-FileCopyrightText: 2023 Vladimír Kotal
#
# SPDX-License-Identifier: Unlicense
[settings]
profile = black

View file

@ -3,6 +3,11 @@
# SPDX-License-Identifier: Unlicense
repos:
- repo: https://github.com/python/black
rev: 22.3.0
hooks:
- id: black
exclude: "^tests/bad_python.py$"
- repo: https://github.com/pycqa/pylint
rev: v2.15.5
hooks:
@ -16,11 +21,6 @@ repos:
name: lint (code)
types: [python]
exclude: "^(docs/|examples/|setup.py$|tests/bad_python.py$)"
- repo: https://github.com/python/black
rev: 22.3.0
hooks:
- id: black
exclude: "^tests/bad_python.py$"
- repo: https://github.com/fsfe/reuse-tool
rev: v0.14.0
hooks:

View file

@ -89,24 +89,32 @@ To get help, just type the command::
A tool to manage and update libraries on a CircuitPython device.
Options:
--verbose Comprehensive logging is sent to stdout.
--version Show the version and exit.
--path DIRECTORY Path to CircuitPython directory. Overrides automatic
path detection.
--help Show this message and exit.
-r --requirement Supports requirements.txt tracking of library
requirements with freeze and install commands.
--verbose Comprehensive logging is sent to stdout.
--path DIRECTORY Path to CircuitPython directory. Overrides automatic
path detection.
--host TEXT Hostname or IP address of a device. Overrides automatic
path detection.
--password TEXT Password to use for authentication when --host is used.
--board-id TEXT Manual Board ID of the CircuitPython device. If provided
in combination with --cpy-version, it overrides the
detected board ID.
--cpy-version TEXT Manual CircuitPython version. If provided in combination
with --board-id, it overrides the detected CPy version.
--version Show the version and exit.
--help Show this message and exit.
Commands:
freeze Output details of all the modules found on the connected...
install Installs .mpy version of named module(s) onto the device.
install --py Installs .py version of named module(s).
list Lists all out of date modules found on the connected...
show Show the long list of all available modules in the bundle.
show <query> Search the names in the modules in the bundle for a match.
uninstall Uninstall a named module(s) from the connected device.
update Update modules on the device. Use --all to automatically update
all modules.
bundle-add Add bundles to the local bundles list, by "user/repo"...
bundle-remove Remove one or more bundles from the local bundles list.
bundle-show Show the list of bundles, default and local, with URL,...
freeze Output details of all the modules found on the connected...
install Install a named module(s) onto the device.
list Lists all out of date modules found on the connected...
show Show a list of available modules in the bundle.
uninstall Uninstall a named module(s) from the connected device.
update Update modules on the device. Use --all to automatically
update all modules without Major Version warnings.
To automatically install all modules imported by ``code.py``,
@ -221,6 +229,12 @@ The ``--version`` flag will tell you the current version of the
$ circup --version
CircUp, A CircuitPython module updater. Version 0.0.1
To use circup via the `Web Workflow <https://learn.adafruit.com/getting-started-with-web-workflow-using-the-code-editor>`_. on devices that support it. Use the ``--host`` and ``--password`` arguments before your circup command.::
$ circup --host 192.168.1.119 --password s3cr3t install adafruit_hid
$ circup --host cpy-9573b2.local --password s3cr3t install adafruit_hid
That's it!

View file

@ -5,17 +5,21 @@
CircUp -- a utility to manage and update libraries on a CircuitPython device.
"""
import ctypes
import glob
import json
import logging
import time
from logging.handlers import RotatingFileHandler
import os
import re
import shutil
from subprocess import check_output
import socket
import sys
import tempfile
import zipfile
from subprocess import check_output
from urllib.parse import urlparse, urljoin
import appdirs
import click
@ -23,15 +27,20 @@ import findimports
import pkg_resources
import requests
import toml
from semver import VersionInfo
import update_checker
from requests.auth import HTTPBasicAuth
from semver import VersionInfo
from circup.shared import DATA_DIR, BAD_FILE_FORMAT, extract_metadata, _get_modules_file
from circup.backends import WebBackend, DiskBackend
#: The version of CircuitPython found on the connected device.
CPY_VERSION = ""
# Useful constants.
#: Flag to indicate if the command is being run in verbose mode.
VERBOSE = False
#: The location of data files used by circup (following OS conventions).
DATA_DIR = appdirs.user_data_dir(appname="circup", appauthor="adafruit")
#: The path to the JSON file containing the metadata about the bundles.
BUNDLE_CONFIG_FILE = pkg_resources.resource_filename(
"circup", "config/bundle_config.json"
@ -46,6 +55,7 @@ BUNDLE_DATA = os.path.join(DATA_DIR, "circup.json")
LOG_DIR = appdirs.user_log_dir(appname="circup", appauthor="adafruit")
#: The location of the log file for the utility.
LOGFILE = os.path.join(LOG_DIR, "circup.log")
#: The libraries (and blank lines) which don't go on devices
NOT_MCU_LIBRARIES = [
"",
@ -56,14 +66,12 @@ NOT_MCU_LIBRARIES = [
"circuitpython_typing",
"pyserial",
]
#: The version of CircuitPython found on the connected device.
CPY_VERSION = ""
#: Module formats list (and the other form used in github files)
PLATFORMS = {"py": "py", "8mpy": "8.x-mpy", "9mpy": "9.x-mpy"}
#: Commands that do not require an attached board
BOARDLESS_COMMANDS = ["show", "bundle-add", "bundle-remove", "bundle-show"]
#: Version identifier for a bad MPY file format
BAD_FILE_FORMAT = "Invalid"
#: Timeout for requests calls like get()
REQUESTS_TIMEOUT = 30
@ -77,7 +85,7 @@ if not os.path.exists(LOG_DIR): # pragma: no cover
# Setup logging.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logfile_handler = logging.FileHandler(LOGFILE)
logfile_handler = RotatingFileHandler(LOGFILE, maxBytes=10_000_000, backupCount=0)
log_formatter = logging.Formatter(
"%(asctime)s %(levelname)s: %(message)s", datefmt="%m/%d/%Y %H:%M:%S"
)
@ -230,7 +238,15 @@ class Module:
# pylint: disable=too-many-arguments
def __init__(
self, path, repo, device_version, bundle_version, mpy, bundle, compatibility
self,
name,
backend,
repo,
device_version,
bundle_version,
mpy,
bundle,
compatibility,
):
"""
The ``self.file`` and ``self.name`` attributes are constructed from
@ -238,8 +254,8 @@ class Module:
resulting self.file value will be None, and the name will be the
basename of the directory path.
:param str path: The path to the module on the connected
CIRCUITPYTHON device.
:param str name: The file name of the module.
:param Backend backend: The backend that the module is on.
:param str repo: The URL of the Git repository for this module.
:param str device_version: The semver value for the version on device.
:param str bundle_version: The semver value for the version in bundle.
@ -247,15 +263,31 @@ class Module:
:param Bundle bundle: Bundle object where the module is located.
:param (str,str) compatibility: Min and max versions of CP compatible with the mpy.
"""
self.path = path
if os.path.isfile(self.path):
# Single file module.
self.file = os.path.basename(path)
self.name = self.file.replace(".py", "").replace(".mpy", "")
else:
# Directory based module.
self.name = name
self.backend = backend
self.path = (
urljoin(backend.library_path, name, allow_fragments=False)
if isinstance(backend, WebBackend)
else os.path.join(backend.library_path, name)
)
url = urlparse(self.path, allow_fragments=False)
if (
url.path.endswith("/")
if isinstance(backend, WebBackend)
else self.path.endswith(os.sep)
):
self.file = None
self.name = os.path.basename(os.path.dirname(self.path))
self.name = self.path.split(
"/" if isinstance(backend, WebBackend) else os.sep
)[-2]
else:
self.file = os.path.basename(url.path)
self.name = (
os.path.basename(url.path).replace(".py", "").replace(".mpy", "")
)
self.repo = repo
self.device_version = device_version
self.bundle_version = bundle_version
@ -371,22 +403,6 @@ class Module:
update_reason = "Minor Version"
return (self.name, loc, rem, update_reason)
def update(self):
"""
Delete the module on the device, then copy the module from the bundle
back onto the device.
The caller is expected to handle any exceptions raised.
"""
if os.path.isdir(self.path):
# Delete and copy the directory.
shutil.rmtree(self.path, ignore_errors=True)
shutil.copytree(self.bundle_path, self.path)
else:
# Delete and copy file.
os.remove(self.path)
shutil.copyfile(self.bundle_path, self.path)
def __repr__(self):
"""
Helps with log files.
@ -498,89 +514,6 @@ def ensure_latest_bundle(bundle):
logger.info("Current bundle up to date %s.", tag)
def extract_metadata(path):
# pylint: disable=too-many-locals,too-many-branches
"""
Given an file path, return a dictionary containing metadata extracted from
dunder attributes found therein. Works with both .py and .mpy files.
For Python source files, such metadata assignments should be simple and
single-line. For example::
__version__ = "1.1.4"
__repo__ = "https://github.com/adafruit/SomeLibrary.git"
For byte compiled .mpy files, a brute force / backtrack approach is used
to find the __version__ number in the file -- see comments in the
code for the implementation details.
:param str path: The path to the file containing the metadata.
:return: The dunder based metadata found in the file, as a dictionary.
"""
result = {}
logger.info("%s", path)
if path.endswith(".py"):
result["mpy"] = False
with open(path, "r", encoding="utf-8") as source_file:
content = source_file.read()
#: The regex used to extract ``__version__`` and ``__repo__`` assignments.
dunder_key_val = r"""(__\w+__)(?:\s*:\s*\w+)?\s*=\s*(?:['"]|\(\s)(.+)['"]"""
for match in re.findall(dunder_key_val, content):
result[match[0]] = str(match[1])
if result:
logger.info("Extracted metadata: %s", result)
elif path.endswith(".mpy"):
find_by_regexp_match = False
result["mpy"] = True
with open(path, "rb") as mpy_file:
content = mpy_file.read()
# Track the MPY version number
mpy_version = content[0:2]
compatibility = None
loc = -1
# Find the start location of the __version__
if mpy_version == b"M\x03":
# One byte for the length of "__version__"
loc = content.find(b"__version__") - 1
compatibility = (None, "7.0.0-alpha.1")
elif mpy_version == b"C\x05":
# Two bytes for the length of "__version__" in mpy version 5
loc = content.find(b"__version__") - 2
compatibility = ("7.0.0-alpha.1", "8.99.99")
elif mpy_version == b"C\x06":
# Two bytes in mpy version 6
find_by_regexp_match = True
compatibility = ("9.0.0-alpha.1", None)
if find_by_regexp_match:
# Too hard to find the version positionally.
# Find the first thing that looks like an x.y.z version number.
match = re.search(rb"([\d]+\.[\d]+\.[\d]+)\x00", content)
if match:
result["__version__"] = match.group(1).decode("utf-8")
elif loc > -1:
# Backtrack until a byte value of the offset is reached.
offset = 1
while offset < loc:
val = int(content[loc - offset])
if mpy_version == b"C\x05":
val = val // 2
if val == offset - 1: # Off by one..!
# Found version, extract the number given boundaries.
start = loc - offset + 1 # No need for prepended length.
end = loc # Up to the start of the __version__.
version = content[start:end] # Slice the version number.
# Create a string version as metadata in the result.
result["__version__"] = version.decode("utf-8")
break # Nothing more to do.
offset += 1 # ...and again but backtrack by one.
if compatibility:
result["compatibility"] = compatibility
else:
# not a valid MPY file
result["__version__"] = BAD_FILE_FORMAT
return result
def find_device():
"""
Return the location on the filesystem for the connected CircuitPython device.
@ -649,43 +582,50 @@ def find_device():
return device_dir
def find_modules(device_path, bundles_list):
def find_modules(backend, bundles_list):
"""
Extracts metadata from the connected device and available bundles and
returns this as a list of Module instances representing the modules on the
device.
:param str device_path: The path to the connected board.
:param Bundle bundles_list: List of supported bundles as Bundle objects.
:param Backend backend: Backend with the device connection.
:param List[Bundle] bundles_list: List of supported bundles as Bundle objects.
:return: A list of Module instances describing the current state of the
modules on the connected device.
"""
# pylint: disable=broad-except,too-many-locals
try:
device_modules = get_device_versions(device_path)
device_modules = backend.get_device_versions()
bundle_modules = get_bundle_versions(bundles_list)
result = []
for name, device_metadata in device_modules.items():
if name in bundle_modules:
for key, device_metadata in device_modules.items():
if key in bundle_modules:
path = device_metadata["path"]
bundle_metadata = bundle_modules[name]
bundle_metadata = bundle_modules[key]
repo = bundle_metadata.get("__repo__")
bundle = bundle_metadata.get("bundle")
device_version = device_metadata.get("__version__")
bundle_version = bundle_metadata.get("__version__")
mpy = device_metadata["mpy"]
compatibility = device_metadata.get("compatibility", (None, None))
result.append(
Module(
path,
repo,
device_version,
bundle_version,
mpy,
bundle,
compatibility,
)
module_name = (
path.split(os.sep)[-1]
if not path.endswith(os.sep)
else path[:-1].split(os.sep)[-1] + os.sep
)
m = Module(
module_name,
backend,
repo,
device_version,
bundle_version,
mpy,
bundle,
compatibility,
)
result.append(m)
return result
except Exception as ex:
# If it's not possible to get the device and bundle metadata, bail out
@ -740,7 +680,7 @@ def get_bundle_versions(bundles_list, avoid_download=False):
of the library bundle. Uses the Python version (rather than the compiled
version) of the library modules.
:param Bundle bundles_list: List of supported bundles as Bundle objects.
:param List[Bundle] bundles_list: List of supported bundles as Bundle objects.
:param bool avoid_download: if True, download the bundle only if missing.
:return: A dictionary of metadata about the modules available in the
library bundle.
@ -750,7 +690,7 @@ def get_bundle_versions(bundles_list, avoid_download=False):
if not avoid_download or not os.path.isdir(bundle.lib_dir("py")):
ensure_latest_bundle(bundle)
path = bundle.lib_dir("py")
path_modules = get_modules(path)
path_modules = _get_modules_file(path, logger)
for name, module in path_modules.items():
module["bundle"] = bundle
if name not in all_the_modules: # here we decide the order of priority
@ -808,43 +748,6 @@ def get_bundles_list():
return bundles_list
def get_circuitpython_version(device_path):
"""
Returns the version number of CircuitPython running on the board connected
via ``device_path``, along with the board ID. This is obtained from the
``boot_out.txt`` file on the device, whose first line will start with
something like this::
Adafruit CircuitPython 4.1.0 on 2019-08-02;
While the second line is::
Board ID:raspberry_pi_pico
:param str device_path: The path to the connected board.
:return: A tuple with the version string for CircuitPython and the board ID string.
"""
try:
with open(
os.path.join(device_path, "boot_out.txt"), "r", encoding="utf-8"
) as boot:
version_line = boot.readline()
circuit_python = version_line.split(";")[0].split(" ")[-3]
board_line = boot.readline()
if board_line.startswith("Board ID:"):
board_id = board_line[9:].strip()
else:
board_id = ""
except FileNotFoundError:
click.secho(
"Missing file boot_out.txt on the device: wrong path or drive corrupted.",
fg="red",
)
logger.error("boot_out.txt not found.")
sys.exit(1)
return (circuit_python, board_id)
def get_circup_version():
"""Return the version of circup that is running. If not available, return None.
@ -865,7 +768,8 @@ def get_circup_version():
def get_dependencies(*requested_libraries, mod_names, to_install=()):
"""
Return a list of other CircuitPython libraries
Return a list of other CircuitPython libraries required by the given list
of libraries
:param tuple requested_libraries: The libraries to search for dependencies
:param object mod_names: All the modules metadata from bundle
@ -958,17 +862,6 @@ def get_circup_dependencies(bundle, library):
return tuple()
def get_device_versions(device_path):
"""
Returns a dictionary of metadata from modules on the connected device.
:param str device_path: Path to the device volume.
:return: A dictionary of metadata about the modules available on the
connected device.
"""
return get_modules(os.path.join(device_path, "lib"))
def get_latest_release_from_url(url):
"""
Find the tag name of the latest release by using HTTP HEAD and decoding the redirect.
@ -987,138 +880,6 @@ def get_latest_release_from_url(url):
return tag
def get_modules(path):
"""
Get a dictionary containing metadata about all the Python modules found in
the referenced path.
:param str path: The directory in which to find modules.
:return: A dictionary containing metadata about the found modules.
"""
result = {}
if not path:
return result
single_file_py_mods = glob.glob(os.path.join(path, "*.py"))
single_file_mpy_mods = glob.glob(os.path.join(path, "*.mpy"))
package_dir_mods = [
d
for d in glob.glob(os.path.join(path, "*", ""))
if not os.path.basename(os.path.normpath(d)).startswith(".")
]
single_file_mods = single_file_py_mods + single_file_mpy_mods
for sfm in [f for f in single_file_mods if not os.path.basename(f).startswith(".")]:
metadata = extract_metadata(sfm)
metadata["path"] = sfm
result[os.path.basename(sfm).replace(".py", "").replace(".mpy", "")] = metadata
for package_path in package_dir_mods:
name = os.path.basename(os.path.dirname(package_path))
py_files = glob.glob(os.path.join(package_path, "**/*.py"), recursive=True)
mpy_files = glob.glob(os.path.join(package_path, "**/*.mpy"), recursive=True)
all_files = py_files + mpy_files
# default value
result[name] = {"path": package_path, "mpy": bool(mpy_files)}
# explore all the submodules to detect bad ones
for source in [f for f in all_files if not os.path.basename(f).startswith(".")]:
metadata = extract_metadata(source)
if "__version__" in metadata:
metadata["path"] = package_path
result[name] = metadata
# break now if any of the submodules has a bad format
if metadata["__version__"] == BAD_FILE_FORMAT:
break
return result
# pylint: disable=too-many-locals,too-many-branches
def install_module(
device_path, device_modules, name, pyext, mod_names
): # pragma: no cover
"""
Finds a connected device and installs a given module name if it
is available in the current module bundle and is not already
installed on the device.
TODO: There is currently no check for the version.
:param str device_path: The path to the connected board.
:param list(dict) device_modules: List of module metadata from the device.
:param str name: Name of module to install
:param bool pyext: Boolean to specify if the module should be installed from
source or from a pre-compiled module
:param mod_names: Dictionary of metadata from modules that can be generated
with get_bundle_versions()
"""
if not name:
click.echo("No module name(s) provided.")
elif name in mod_names:
library_path = os.path.join(device_path, "lib")
if not os.path.exists(library_path): # pragma: no cover
os.makedirs(library_path)
metadata = mod_names[name]
bundle = metadata["bundle"]
# Grab device modules to check if module already installed
if name in device_modules:
click.echo("'{}' is already installed.".format(name))
return
if pyext:
# Use Python source for module.
source_path = metadata["path"] # Path to Python source version.
if os.path.isdir(source_path):
target = os.path.basename(os.path.dirname(source_path))
target_path = os.path.join(library_path, target)
# Copy the directory.
shutil.copytree(source_path, target_path)
else:
target = os.path.basename(source_path)
target_path = os.path.join(library_path, target)
# Copy file.
shutil.copyfile(source_path, target_path)
else:
# Use pre-compiled mpy modules.
module_name = os.path.basename(metadata["path"]).replace(".py", ".mpy")
if not module_name:
# Must be a directory based module.
module_name = os.path.basename(os.path.dirname(metadata["path"]))
major_version = CPY_VERSION.split(".")[0]
bundle_platform = "{}mpy".format(major_version)
bundle_path = os.path.join(bundle.lib_dir(bundle_platform), module_name)
if os.path.isdir(bundle_path):
target_path = os.path.join(library_path, module_name)
# Copy the directory.
shutil.copytree(bundle_path, target_path)
elif os.path.isfile(bundle_path):
target = os.path.basename(bundle_path)
target_path = os.path.join(library_path, target)
# Copy file.
shutil.copyfile(bundle_path, target_path)
else:
raise IOError("Cannot find compiled version of module.")
click.echo("Installed '{}'.".format(name))
else:
click.echo("Unknown module named, '{}'.".format(name))
# pylint: enable=too-many-locals,too-many-branches
def libraries_from_imports(code_py, mod_names):
"""
Parse the given code.py file and return the imported libraries
:param str code_py: Full path of the code.py file
:return: sequence of library names
"""
# pylint: disable=broad-except
try:
found_imports = findimports.find_imports(code_py)
except Exception as ex: # broad exception because anything could go wrong
logger.exception(ex)
click.secho('Unable to read the auto file: "{}"'.format(str(ex)), fg="red")
sys.exit(2)
# pylint: enable=broad-except
imports = [info.name.split(".", 1)[0] for info in found_imports]
return [r for r in imports if r in mod_names]
def libraries_from_requirements(requirements):
"""
Clean up supplied requirements.txt and turn into tuple of CP libraries
@ -1189,6 +950,25 @@ def tags_data_save_tag(key, tag):
json.dump(tags_data, data)
def libraries_from_code_py(code_py, mod_names):
"""
Parse the given code.py file and return the imported libraries
:param str code_py: Full path of the code.py file
:return: sequence of library names
"""
# pylint: disable=broad-except
try:
found_imports = findimports.find_imports(code_py)
except Exception as ex: # broad exception because anything could go wrong
logger.exception(ex)
click.secho('Unable to read the auto file: "{}"'.format(str(ex)), fg="red")
sys.exit(2)
# pylint: enable=broad-except
imports = [info.name.split(".", 1)[0] for info in found_imports]
return [r for r in imports if r in mod_names]
# ----------- CLI command definitions ----------- #
# The following functions have IO side effects (for instance they emit to
@ -1207,6 +987,18 @@ def tags_data_save_tag(key, tag):
type=click.Path(exists=True, file_okay=False),
help="Path to CircuitPython directory. Overrides automatic path detection.",
)
@click.option(
"--host",
help="Hostname or IP address of a device. Overrides automatic path detection.",
)
@click.option(
"--password", help="Password to use for authentication when --host is used."
)
@click.option(
"--timeout",
default=30,
help="Specify the timeout in seconds for any network operations.",
)
@click.option(
"--board-id",
default=None,
@ -1224,11 +1016,46 @@ def tags_data_save_tag(key, tag):
message="%(prog)s, A CircuitPython module updater. Version %(version)s",
)
@click.pass_context
def main(ctx, verbose, path, board_id, cpy_version): # pragma: no cover
def main( # pylint: disable=too-many-locals
ctx, verbose, path, host, password, timeout, board_id, cpy_version
): # pragma: no cover
"""
A tool to manage and update libraries on a CircuitPython device.
"""
# pylint: disable=too-many-arguments,too-many-branches,too-many-statements,too-many-locals
ctx.ensure_object(dict)
global REQUESTS_TIMEOUT
ctx.obj["TIMEOUT"] = REQUESTS_TIMEOUT = timeout
device_path = get_device_path(host, password, path)
using_webworkflow = "host" in ctx.params.keys() and ctx.params["host"] is not None
if using_webworkflow:
if host == "circuitpython.local":
click.echo("Checking versions.json on circuitpython.local to find hostname")
versions_resp = requests.get(
"http://circuitpython.local/cp/version.json", timeout=timeout
)
host = f'{versions_resp.json()["hostname"]}.local'
click.echo(f"Using hostname: {host}")
device_path = device_path.replace("circuitpython.local", host)
try:
ctx.obj["backend"] = WebBackend(
host=host, password=password, logger=logger, timeout=timeout
)
except ValueError as e:
click.secho(e, fg="red")
time.sleep(0.3)
sys.exit(1)
except RuntimeError as e:
click.secho(e, fg="red")
sys.exit(1)
else:
try:
ctx.obj["backend"] = DiskBackend(device_path, logger)
except ValueError as e:
print(e)
if verbose:
# Configure additional logging to stdout.
global VERBOSE
@ -1249,21 +1076,18 @@ def main(ctx, verbose, path, board_id, cpy_version): # pragma: no cover
# stop early if the command is boardless
if ctx.invoked_subcommand in BOARDLESS_COMMANDS:
return
if path:
device_path = path
else:
device_path = find_device()
ctx.obj["DEVICE_PATH"] = device_path
latest_version = get_latest_release_from_url(
"https://github.com/adafruit/circuitpython/releases/latest"
)
global CPY_VERSION
if device_path is None:
if device_path is None or not ctx.obj["backend"].is_device_present():
click.secho("Could not find a connected CircuitPython device.", fg="red")
sys.exit(1)
else:
CPY_VERSION, board_id = (
get_circuitpython_version(device_path)
ctx.obj["backend"].get_circuitpython_version()
if board_id is None or cpy_version is None
else (cpy_version, board_id)
)
@ -1290,6 +1114,23 @@ def main(ctx, verbose, path, board_id, cpy_version): # pragma: no cover
logger.warning(ex)
def get_device_path(host, password, path):
"""
:param host Hostname or IP address.
:param password REST API password.
:param path File system path.
:return device URL or None if the device cannot be found.
"""
if path:
device_path = path
elif host:
# pylint: enable=no-member
device_path = f"http://:{password}@" + host
else:
device_path = find_device()
return device_path
@main.command()
@click.option("-r", "--requirement", is_flag=True)
@click.pass_context
@ -1299,7 +1140,7 @@ def freeze(ctx, requirement): # pragma: no cover
device. Option -r saves output to requirements.txt file
"""
logger.info("Freeze")
modules = find_modules(ctx.obj["DEVICE_PATH"], get_bundles_list())
modules = find_modules(ctx.obj["backend"], get_bundles_list())
if modules:
output = []
for module in modules:
@ -1332,7 +1173,7 @@ def list_cli(ctx): # pragma: no cover
modules = [
m.row
for m in find_modules(ctx.obj["DEVICE_PATH"], get_bundles_list())
for m in find_modules(ctx.obj["backend"], get_bundles_list())
if m.outofdate
]
if modules:
@ -1394,6 +1235,7 @@ def install(ctx, modules, pyext, requirement, auto, auto_file): # pragma: no co
can be installed at once by providing more than one module name, each
separated by a space.
"""
# TODO: Ensure there's enough space on the device
available_modules = get_bundle_versions(get_bundles_list())
mod_names = {}
@ -1406,25 +1248,39 @@ def install(ctx, modules, pyext, requirement, auto, auto_file): # pragma: no co
elif auto or auto_file:
if auto_file is None:
auto_file = "code.py"
print(f"Auto file: {auto_file}")
# pass a local file with "./" or "../"
is_relative = auto_file.split(os.sep)[0] in [os.path.curdir, os.path.pardir]
is_relative = not isinstance(ctx.obj["backend"], WebBackend) or auto_file.split(
os.sep
)[0] in [os.path.curdir, os.path.pardir]
if not os.path.isabs(auto_file) and not is_relative:
auto_file = os.path.join(ctx.obj["DEVICE_PATH"], auto_file or "code.py")
if not os.path.isfile(auto_file):
click.secho(f"Auto file not found: {auto_file}", fg="red")
sys.exit(1)
requested_installs = libraries_from_imports(auto_file, mod_names)
auto_file = ctx.obj["backend"].get_file_path(auto_file or "code.py")
auto_file_path = ctx.obj["backend"].get_auto_file_path(auto_file)
print(f"Auto file path: {auto_file_path}")
if not os.path.isfile(auto_file_path):
# fell through to here when run from random folder on windows - ask backend.
new_auto_file = ctx.obj["backend"].get_file_path(auto_file)
if os.path.isfile(new_auto_file):
auto_file = new_auto_file
auto_file_path = ctx.obj["backend"].get_auto_file_path(auto_file)
print(f"Auto file path: {auto_file_path}")
else:
click.secho(f"Auto file not found: {auto_file}", fg="red")
sys.exit(1)
requested_installs = libraries_from_code_py(auto_file_path, mod_names)
else:
requested_installs = modules
requested_installs = sorted(set(requested_installs))
click.echo(f"Searching for dependencies for: {requested_installs}")
to_install = get_dependencies(requested_installs, mod_names=mod_names)
device_modules = get_device_versions(ctx.obj["DEVICE_PATH"])
device_modules = ctx.obj["backend"].get_device_versions()
if to_install is not None:
to_install = sorted(to_install)
click.echo(f"Ready to install: {to_install}\n")
for library in to_install:
install_module(
ctx.obj["backend"].install_module(
ctx.obj["DEVICE_PATH"], device_modules, library, pyext, mod_names
)
@ -1462,29 +1318,22 @@ def uninstall(ctx, module): # pragma: no cover
can be uninstalled at once by providing more than one module name, each
separated by a space.
"""
device_path = ctx.obj["DEVICE_PATH"]
print(f"Uninstalling {module} from {device_path}")
for name in module:
device_modules = get_device_versions(ctx.obj["DEVICE_PATH"])
device_modules = ctx.obj["backend"].get_device_versions()
name = name.lower()
mod_names = {}
for module_item, metadata in device_modules.items():
mod_names[module_item.replace(".py", "").lower()] = metadata
if name in mod_names:
library_path = os.path.join(ctx.obj["DEVICE_PATH"], "lib")
metadata = mod_names[name]
module_path = metadata["path"]
if os.path.isdir(module_path):
target = os.path.basename(os.path.dirname(module_path))
target_path = os.path.join(library_path, target)
# Remove the directory.
shutil.rmtree(target_path)
else:
target = os.path.basename(module_path)
target_path = os.path.join(library_path, target)
# Remove file
os.remove(target_path)
ctx.obj["backend"].uninstall(device_path, module_path)
click.echo("Uninstalled '{}'.".format(name))
else:
click.echo("Module '{}' not found on device.".format(name))
continue
# pylint: disable=too-many-branches
@ -1511,9 +1360,7 @@ def update(ctx, update_all): # pragma: no cover
logger.info("Update")
# Grab out of date modules.
modules = [
m
for m in find_modules(ctx.obj["DEVICE_PATH"], get_bundles_list())
if m.outofdate
m for m in find_modules(ctx.obj["backend"], get_bundles_list()) if m.outofdate
]
if modules:
click.echo("Found {} module[s] needing update.".format(len(modules)))
@ -1565,7 +1412,7 @@ def update(ctx, update_all): # pragma: no cover
if update_flag:
# pylint: disable=broad-except
try:
module.update()
ctx.obj["backend"].update(module)
click.echo("Updated {}".format(module.name))
except Exception as ex:
logger.exception(ex)

829
circup/backends.py Normal file
View file

@ -0,0 +1,829 @@
# SPDX-FileCopyrightText: 2019 Nicholas Tollervey, written for Adafruit Industries
# SPDX-FileCopyrightText: 2023 Tim Cocks, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
Backend classes that represent interfaces to physical devices.
"""
import os
import shutil
import sys
import socket
import tempfile
from urllib.parse import urlparse, urljoin
import click
import requests
from requests.adapters import HTTPAdapter
from requests.auth import HTTPBasicAuth
from circup.shared import DATA_DIR, BAD_FILE_FORMAT, extract_metadata, _get_modules_file
#: The location to store a local copy of code.py for use with --auto and
# web workflow
LOCAL_CODE_PY_COPY = os.path.join(DATA_DIR, "code.tmp.py")
class Backend:
"""
Backend parent class to be extended for workflow specific
implementations
"""
def __init__(self, logger):
self.device_location = None
self.LIB_DIR_PATH = None
self.logger = logger
def get_circuitpython_version(self):
"""
Must be overridden by subclass for implementation!
Returns the version number of CircuitPython running on the board connected
via ``device_url``, along with the board ID.
:param str device_location: http based device URL or local file path.
:return: A tuple with the version string for CircuitPython and the board ID string.
"""
raise NotImplementedError
def _get_modules(self, device_lib_path):
"""
To be overridden by subclass
"""
raise NotImplementedError
def get_modules(self, device_url):
"""
Get a dictionary containing metadata about all the Python modules found in
the referenced path.
:param str device_url: URL to be used to find modules.
:return: A dictionary containing metadata about the found modules.
"""
return self._get_modules(device_url)
def get_device_versions(self):
"""
Returns a dictionary of metadata from modules on the connected device.
:param str device_url: URL for the device.
:return: A dictionary of metadata about the modules available on the
connected device.
"""
return self.get_modules(os.path.join(self.device_location, self.LIB_DIR_PATH))
def _create_library_directory(self, device_path, library_path):
"""
To be overridden by subclass
"""
raise NotImplementedError
def _install_module_py(self, metadata):
"""
To be overridden by subclass
"""
raise NotImplementedError
def _install_module_mpy(self, bundle, metadata):
"""
To be overridden by subclass
"""
raise NotImplementedError
# pylint: disable=too-many-locals,too-many-branches,too-many-arguments,too-many-nested-blocks
def install_module(
self, device_path, device_modules, name, pyext, mod_names
): # pragma: no cover
"""
Finds a connected device and installs a given module name if it
is available in the current module bundle and is not already
installed on the device.
TODO: There is currently no check for the version.
:param str device_path: The path to the connected board.
:param list(dict) device_modules: List of module metadata from the device.
:param str name: Name of module to install
:param bool pyext: Boolean to specify if the module should be installed from
source or from a pre-compiled module
:param mod_names: Dictionary of metadata from modules that can be generated
with get_bundle_versions()
"""
if not name:
click.echo("No module name(s) provided.")
elif name in mod_names:
# Grab device modules to check if module already installed
if name in device_modules:
click.echo("'{}' is already installed.".format(name))
return
library_path = (
os.path.join(device_path, self.LIB_DIR_PATH)
if not isinstance(self, WebBackend)
else urljoin(device_path, self.LIB_DIR_PATH)
)
metadata = mod_names[name]
bundle = metadata["bundle"]
bundle.size = os.path.getsize(metadata["path"])
if os.path.isdir(metadata["path"]):
# pylint: disable=unused-variable
for dirpath, dirnames, filenames in os.walk(metadata["path"]):
for f in filenames:
fp = os.path.join(dirpath, f)
try:
if not os.path.islink(fp): # Ignore symbolic links
bundle.size += os.path.getsize(fp)
else:
self.logger.warning(
f"Skipping symbolic link in space calculation: {fp}"
)
except OSError as e:
self.logger.error(
f"Error: {e} - Skipping file in space calculation: {fp}"
)
if self.get_free_space() < bundle.size:
self.logger.error(
f"Aborted installing module {name} - "
f"not enough free space ({bundle.size} < {self.get_free_space()})"
)
click.secho(
f"Aborted installing module {name} - "
f"not enough free space ({bundle.size} < {self.get_free_space()})",
fg="red",
)
return
# Create the library directory first.
self._create_library_directory(device_path, library_path)
if pyext:
# Use Python source for module.
self._install_module_py(metadata)
else:
# Use pre-compiled mpy modules.
self._install_module_mpy(bundle, metadata)
click.echo("Installed '{}'.".format(name))
else:
click.echo("Unknown module named, '{}'.".format(name))
# def libraries_from_imports(self, code_py, mod_names):
# """
# To be overridden by subclass
# """
# raise NotImplementedError
def uninstall(self, device_path, module_path):
"""
To be overridden by subclass
"""
raise NotImplementedError
def update(self, module):
"""
To be overridden by subclass
"""
raise NotImplementedError
def get_file_path(self, filename):
"""
To be overridden by subclass
"""
raise NotImplementedError
def get_free_space(self):
"""
To be overridden by subclass
"""
raise NotImplementedError
def is_device_present(self):
"""
To be overriden by subclass
"""
raise NotImplementedError
@staticmethod
def parse_boot_out_file(boot_out_contents):
"""
Parse the contents of boot_out.txt
Returns: circuitpython version and board id
"""
lines = boot_out_contents.split("\n")
version_line = lines[0]
circuit_python = version_line.split(";")[0].split(" ")[-3]
board_line = lines[1]
if board_line.startswith("Board ID:"):
board_id = board_line[9:].strip()
else:
board_id = ""
return circuit_python, board_id
def _writeable_error():
click.secho(
"CircuitPython Web Workflow Device not writable\n - "
"Remount storage as writable to device (not PC)",
fg="red",
)
sys.exit(1)
class WebBackend(Backend):
"""
Backend for interacting with a device via Web Workflow
"""
def __init__(self, host, password, logger, timeout=10):
super().__init__(logger)
if password is None:
raise ValueError("--host needs --password")
# pylint: disable=no-member
# verify hostname/address
try:
socket.getaddrinfo(host, 80, proto=socket.IPPROTO_TCP)
except socket.gaierror as exc:
raise RuntimeError(
"Invalid host: {}.".format(host) + " You should remove the 'http://'"
if "http://" in host or "https://" in host
else "Could not find or connect to specified device"
) from exc
self.LIB_DIR_PATH = "fs/lib/"
self.host = host
self.password = password
self.device_location = f"http://:{self.password}@{self.host}"
self.session = requests.Session()
self.session.mount(self.device_location, HTTPAdapter(max_retries=5))
self.library_path = self.device_location + "/" + self.LIB_DIR_PATH
self.timeout = timeout
def install_file_http(self, source):
"""
Install file to device using web workflow.
:param source source file.
"""
file_name = source.split(os.path.sep)
file_name = file_name[-2] if file_name[-1] == "" else file_name[-1]
target = self.device_location + "/" + self.LIB_DIR_PATH + file_name
auth = HTTPBasicAuth("", self.password)
with open(source, "rb") as fp:
r = self.session.put(target, fp.read(), auth=auth, timeout=self.timeout)
if r.status_code == 409:
_writeable_error()
r.raise_for_status()
def install_dir_http(self, source):
"""
Install directory to device using web workflow.
:param source source directory.
"""
mod_name = source.split(os.path.sep)
mod_name = mod_name[-2] if mod_name[-1] == "" else mod_name[-1]
target = self.device_location + "/" + self.LIB_DIR_PATH + mod_name
target = target + "/" if target[:-1] != "/" else target
url = urlparse(target)
auth = HTTPBasicAuth("", url.password)
# Create the top level directory.
with self.session.put(target, auth=auth, timeout=self.timeout) as r:
if r.status_code == 409:
_writeable_error()
r.raise_for_status()
# Traverse the directory structure and create the directories/files.
for root, dirs, files in os.walk(source):
rel_path = os.path.relpath(root, source)
if rel_path == ".":
rel_path = ""
for name in dirs:
path_to_create = (
urljoin(
urljoin(target, rel_path + "/", allow_fragments=False),
name,
allow_fragments=False,
)
if rel_path != ""
else urljoin(target, name, allow_fragments=False)
)
path_to_create = (
path_to_create + "/"
if path_to_create[:-1] != "/"
else path_to_create
)
with self.session.put(
path_to_create, auth=auth, timeout=self.timeout
) as r:
if r.status_code == 409:
_writeable_error()
r.raise_for_status()
for name in files:
with open(os.path.join(root, name), "rb") as fp:
path_to_create = (
urljoin(
urljoin(target, rel_path + "/", allow_fragments=False),
name,
allow_fragments=False,
)
if rel_path != ""
else urljoin(target, name, allow_fragments=False)
)
with self.session.put(
path_to_create, fp.read(), auth=auth, timeout=self.timeout
) as r:
if r.status_code == 409:
_writeable_error()
r.raise_for_status()
def get_circuitpython_version(self):
"""
Returns the version number of CircuitPython running on the board connected
via ``device_path``, along with the board ID. This is obtained using
RESTful API from the /cp/version.json URL.
:return: A tuple with the version string for CircuitPython and the board ID string.
"""
# pylint: disable=arguments-renamed
with self.session.get(
self.device_location + "/cp/version.json", timeout=self.timeout
) as r:
# pylint: disable=no-member
if r.status_code != requests.codes.ok:
click.secho(
f" Unable to get version from {self.device_location}: {r.status_code}",
fg="red",
)
sys.exit(1)
# pylint: enable=no-member
ver_json = r.json()
return ver_json.get("version"), ver_json.get("board_id")
def _get_modules(self, device_lib_path):
return self._get_modules_http(device_lib_path)
def _get_modules_http(self, url):
"""
Get a dictionary containing metadata about all the Python modules found using
the referenced URL.
:param str url: URL for the modules.
:return: A dictionary containing metadata about the found modules.
"""
result = {}
u = urlparse(url)
auth = HTTPBasicAuth("", u.password)
with self.session.get(
url, auth=auth, headers={"Accept": "application/json"}, timeout=self.timeout
) as r:
r.raise_for_status()
directory_mods = []
single_file_mods = []
for entry in r.json()["files"]:
entry_name = entry.get("name")
if entry.get("directory"):
directory_mods.append(entry_name)
else:
if entry_name.endswith(".py") or entry_name.endswith(".mpy"):
single_file_mods.append(entry_name)
self._get_modules_http_single_mods(auth, result, single_file_mods, url)
self._get_modules_http_dir_mods(auth, directory_mods, result, url)
return result
def _get_modules_http_dir_mods(self, auth, directory_mods, result, url):
# pylint: disable=too-many-locals
"""
Builds result dictionary with keys containing module names and values containing a
dictionary with metadata bout the module like version, compatibility, mpy or not etc.
:param auth HTTP authentication.
:param directory_mods list of modules.
:param result dictionary for the result.
:param url: URL of the device.
"""
for dm in directory_mods:
if str(urlparse(dm).scheme).lower() not in ("http", "https"):
dm_url = url + dm + "/"
else:
dm_url = dm
with self.session.get(
dm_url,
auth=auth,
headers={"Accept": "application/json"},
timeout=self.timeout,
) as r:
r.raise_for_status()
mpy = False
for entry in r.json()["files"]:
entry_name = entry.get("name")
if not entry.get("directory") and (
entry_name.endswith(".py") or entry_name.endswith(".mpy")
):
if entry_name.endswith(".mpy"):
mpy = True
with self.session.get(
dm_url + entry_name, auth=auth, timeout=self.timeout
) as rr:
rr.raise_for_status()
idx = entry_name.rfind(".")
with tempfile.NamedTemporaryFile(
prefix=entry_name[:idx] + "-",
suffix=entry_name[idx:],
delete=False,
) as fp:
fp.write(rr.content)
tmp_name = fp.name
metadata = extract_metadata(tmp_name, self.logger)
os.remove(tmp_name)
if "__version__" in metadata:
metadata["path"] = dm_url
result[dm] = metadata
# break now if any of the submodules has a bad format
if metadata["__version__"] == BAD_FILE_FORMAT:
break
if result.get(dm) is None:
result[dm] = {"path": dm_url, "mpy": mpy}
def _get_modules_http_single_mods(self, auth, result, single_file_mods, url):
"""
:param auth HTTP authentication.
:param single_file_mods list of modules.
:param result dictionary for the result.
:param url: URL of the device.
"""
for sfm in single_file_mods:
if str(urlparse(sfm).scheme).lower() not in ("http", "https"):
sfm_url = url + sfm
else:
sfm_url = sfm
with self.session.get(sfm_url, auth=auth, timeout=self.timeout) as r:
r.raise_for_status()
idx = sfm.rfind(".")
with tempfile.NamedTemporaryFile(
prefix=sfm[:idx] + "-", suffix=sfm[idx:], delete=False
) as fp:
fp.write(r.content)
tmp_name = fp.name
metadata = extract_metadata(tmp_name, self.logger)
os.remove(tmp_name)
metadata["path"] = sfm_url
result[sfm[:idx]] = metadata
def _create_library_directory(self, device_path, library_path):
url = urlparse(device_path)
auth = HTTPBasicAuth("", url.password)
with self.session.put(library_path, auth=auth, timeout=self.timeout) as r:
if r.status_code == 409:
_writeable_error()
r.raise_for_status()
def _install_module_mpy(self, bundle, metadata):
"""
:param bundle library bundle.
:param library_path library path
:param metadata dictionary.
"""
module_name = os.path.basename(metadata["path"]).replace(".py", ".mpy")
if not module_name:
# Must be a directory based module.
module_name = os.path.basename(os.path.dirname(metadata["path"]))
major_version = self.get_circuitpython_version()[0].split(".")[0]
bundle_platform = "{}mpy".format(major_version)
bundle_path = os.path.join(bundle.lib_dir(bundle_platform), module_name)
if os.path.isdir(bundle_path):
self.install_dir_http(bundle_path)
elif os.path.isfile(bundle_path):
self.install_file_http(bundle_path)
else:
raise IOError("Cannot find compiled version of module.")
# pylint: enable=too-many-locals,too-many-branches
def _install_module_py(self, metadata):
"""
:param library_path library path
:param metadata dictionary.
"""
source_path = metadata["path"] # Path to Python source version.
if os.path.isdir(source_path):
self.install_dir_http(source_path)
else:
self.install_file_http(source_path)
def get_auto_file_path(self, auto_file_path):
"""
Make a local temp copy of the --auto file from the device.
Returns the path to the local copy.
"""
url = auto_file_path
auth = HTTPBasicAuth("", self.password)
with self.session.get(url, auth=auth, timeout=self.timeout) as r:
r.raise_for_status()
with open(LOCAL_CODE_PY_COPY, "w", encoding="utf-8") as f:
f.write(r.text)
return LOCAL_CODE_PY_COPY
def uninstall(self, device_path, module_path):
"""
Uninstall given module on device using REST API.
"""
url = urlparse(device_path)
auth = HTTPBasicAuth("", url.password)
with self.session.delete(module_path, auth=auth, timeout=self.timeout) as r:
if r.status_code == 409:
_writeable_error()
r.raise_for_status()
def update(self, module):
"""
Delete the module on the device, then copy the module from the bundle
back onto the device.
The caller is expected to handle any exceptions raised.
"""
self._update_http(module)
def _update_http(self, module):
"""
Update the module using web workflow.
"""
if module.file:
# Copy the file (will overwrite).
self.install_file_http(module.bundle_path)
else:
# Delete the directory (recursive) first.
url = urlparse(module.path)
auth = HTTPBasicAuth("", url.password)
with self.session.delete(module.path, auth=auth, timeout=self.timeout) as r:
if r.status_code == 409:
_writeable_error()
r.raise_for_status()
self.install_dir_http(module.bundle_path)
def get_file_path(self, filename):
"""
retuns the full path on the device to a given file name.
"""
return urljoin(
urljoin(self.device_location, "fs/", allow_fragments=False),
filename,
allow_fragments=False,
)
def is_device_present(self):
"""
returns True if the device is currently connected
"""
try:
_ = self.session.get(f"{self.device_location}/cp/version.json")
return True
except requests.exceptions.ConnectionError:
return False
def get_device_versions(self):
"""
Returns a dictionary of metadata from modules on the connected device.
:param str device_url: URL for the device.
:return: A dictionary of metadata about the modules available on the
connected device.
"""
return self.get_modules(urljoin(self.device_location, self.LIB_DIR_PATH))
def get_free_space(self):
"""
Returns the free space on the device in bytes.
"""
auth = HTTPBasicAuth("", self.password)
with self.session.get(
urljoin(self.device_location, "fs/"),
auth=auth,
headers={"Accept": "application/json"},
timeout=self.timeout,
) as r:
r.raise_for_status()
if r.json().get("free") is None:
self.logger.error("Unable to get free block count from device.")
click.secho("Unable to get free block count from device.", fg="red")
elif r.json().get("block_size") is None:
self.logger.error("Unable to get block size from device.")
click.secho("Unable to get block size from device.", fg="red")
elif r.json().get("writable") is None or r.json().get("writable") is False:
self.logger.error(
"CircuitPython Web Workflow Device not writable\n - "
"Remount storage as writable to device (not PC)"
)
click.secho(
"CircuitPython Web Workflow Device not writable\n - "
"Remount storage as writable to device (not PC)",
fg="red",
)
else:
return r.json()["free"] * r.json()["block_size"] # bytes
sys.exit(1)
class DiskBackend(Backend):
"""
Backend for interacting with a device via USB Workflow
:param String device_location: Path to the device
:param logger: logger to use for outputting messages
:param String boot_out: Optional mock contents of a boot_out.txt file
to use for version information.
"""
def __init__(self, device_location, logger, boot_out=None):
if device_location is None:
raise ValueError(
"Auto locating USB Disk based device failed. "
"Please specify --path argument or ensure your device "
"is connected and mounted under the name CIRCUITPY."
)
super().__init__(logger)
self.LIB_DIR_PATH = "lib"
self.device_location = device_location
self.library_path = os.path.join(self.device_location, self.LIB_DIR_PATH)
self.version_info = None
if boot_out is not None:
self.version_info = self.parse_boot_out_file(boot_out)
def get_circuitpython_version(self):
"""
Returns the version number of CircuitPython running on the board connected
via ``device_path``, along with the board ID. This is obtained from the
``boot_out.txt`` file on the device, whose first line will start with
something like this::
Adafruit CircuitPython 4.1.0 on 2019-08-02;
While the second line is::
Board ID:raspberry_pi_pico
:return: A tuple with the version string for CircuitPython and the board ID string.
"""
if not self.version_info:
try:
with open(
os.path.join(self.device_location, "boot_out.txt"),
"r",
encoding="utf-8",
) as boot:
boot_out_contents = boot.read()
circuit_python, board_id = self.parse_boot_out_file(
boot_out_contents
)
except FileNotFoundError:
click.secho(
"Missing file boot_out.txt on the device: wrong path or drive corrupted.",
fg="red",
)
self.logger.error("boot_out.txt not found.")
sys.exit(1)
return circuit_python, board_id
return self.version_info
def _get_modules(self, device_lib_path):
"""
Get a dictionary containing metadata about all the Python modules found in
the referenced path.
:param str device_lib_path: URL to be used to find modules.
:return: A dictionary containing metadata about the found modules.
"""
return _get_modules_file(device_lib_path, self.logger)
def _create_library_directory(self, device_path, library_path):
if not os.path.exists(library_path): # pragma: no cover
os.makedirs(library_path)
def _install_module_mpy(self, bundle, metadata):
"""
:param bundle library bundle.
:param library_path library path
:param metadata dictionary.
"""
module_name = os.path.basename(metadata["path"]).replace(".py", ".mpy")
if not module_name:
# Must be a directory based module.
module_name = os.path.basename(os.path.dirname(metadata["path"]))
major_version = self.get_circuitpython_version()[0].split(".")[0]
bundle_platform = "{}mpy".format(major_version)
bundle_path = os.path.join(bundle.lib_dir(bundle_platform), module_name)
if os.path.isdir(bundle_path):
target_path = os.path.join(self.library_path, module_name)
# Copy the directory.
shutil.copytree(bundle_path, target_path)
elif os.path.isfile(bundle_path):
target = os.path.basename(bundle_path)
target_path = os.path.join(self.library_path, target)
# Copy file.
shutil.copyfile(bundle_path, target_path)
else:
raise IOError("Cannot find compiled version of module.")
# pylint: enable=too-many-locals,too-many-branches
def _install_module_py(self, metadata):
"""
:param library_path library path
:param metadata dictionary.
"""
source_path = metadata["path"] # Path to Python source version.
if os.path.isdir(source_path):
target = os.path.basename(os.path.dirname(source_path))
target_path = os.path.join(self.library_path, target)
# Copy the directory.
shutil.copytree(source_path, target_path)
else:
target = os.path.basename(source_path)
target_path = os.path.join(self.library_path, target)
# Copy file.
shutil.copyfile(source_path, target_path)
def get_auto_file_path(self, auto_file_path):
"""
Returns the path on the device to the file to be read for --auto.
"""
return auto_file_path
def uninstall(self, device_path, module_path):
"""
Uninstall module using local file system.
"""
library_path = os.path.join(device_path, "lib")
if os.path.isdir(module_path):
target = os.path.basename(os.path.dirname(module_path))
target_path = os.path.join(library_path, target)
# Remove the directory.
shutil.rmtree(target_path)
else:
target = os.path.basename(module_path)
target_path = os.path.join(library_path, target)
# Remove file
os.remove(target_path)
def update(self, module):
"""
Delete the module on the device, then copy the module from the bundle
back onto the device.
The caller is expected to handle any exceptions raised.
"""
self._update_file(module)
def _update_file(self, module):
"""
Update the module using file system.
"""
if os.path.isdir(module.path):
# Delete and copy the directory.
shutil.rmtree(module.path, ignore_errors=True)
shutil.copytree(module.bundle_path, module.path)
else:
# Delete and copy file.
os.remove(module.path)
shutil.copyfile(module.bundle_path, module.path)
def get_file_path(self, filename):
"""
returns the full path on the device to a given file name.
"""
return os.path.join(self.device_location, filename)
def is_device_present(self):
"""
returns True if the device is currently connected
"""
return os.path.exists(self.device_location)
def get_free_space(self):
"""
Returns the free space on the device in bytes.
"""
# pylint: disable=unused-variable
_, total, free = shutil.disk_usage(self.device_location)
return free

144
circup/shared.py Normal file
View file

@ -0,0 +1,144 @@
# SPDX-FileCopyrightText: 2019 Nicholas Tollervey, written for Adafruit Industries
# SPDX-FileCopyrightText: 2023 Tim Cocks, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
Utilities that are shared and used by both click CLI command functions
and Backend class functions.
"""
import glob
import os
import re
import appdirs
#: Version identifier for a bad MPY file format
BAD_FILE_FORMAT = "Invalid"
#: The location of data files used by circup (following OS conventions).
DATA_DIR = appdirs.user_data_dir(appname="circup", appauthor="adafruit")
def _get_modules_file(path, logger):
"""
Get a dictionary containing metadata about all the Python modules found in
the referenced file system path.
:param str path: The directory in which to find modules.
:return: A dictionary containing metadata about the found modules.
"""
result = {}
if not path:
return result
single_file_py_mods = glob.glob(os.path.join(path, "*.py"))
single_file_mpy_mods = glob.glob(os.path.join(path, "*.mpy"))
package_dir_mods = [
d
for d in glob.glob(os.path.join(path, "*", ""))
if not os.path.basename(os.path.normpath(d)).startswith(".")
]
single_file_mods = single_file_py_mods + single_file_mpy_mods
for sfm in [f for f in single_file_mods if not os.path.basename(f).startswith(".")]:
metadata = extract_metadata(sfm, logger)
metadata["path"] = sfm
result[os.path.basename(sfm).replace(".py", "").replace(".mpy", "")] = metadata
for package_path in package_dir_mods:
name = os.path.basename(os.path.dirname(package_path))
py_files = glob.glob(os.path.join(package_path, "**/*.py"), recursive=True)
mpy_files = glob.glob(os.path.join(package_path, "**/*.mpy"), recursive=True)
all_files = py_files + mpy_files
# default value
result[name] = {"path": package_path, "mpy": bool(mpy_files)}
# explore all the submodules to detect bad ones
for source in [f for f in all_files if not os.path.basename(f).startswith(".")]:
metadata = extract_metadata(source, logger)
if "__version__" in metadata:
metadata["path"] = package_path
result[name] = metadata
# break now if any of the submodules has a bad format
if metadata["__version__"] == BAD_FILE_FORMAT:
break
return result
def extract_metadata(path, logger):
# pylint: disable=too-many-locals,too-many-branches
"""
Given a file path, return a dictionary containing metadata extracted from
dunder attributes found therein. Works with both .py and .mpy files.
For Python source files, such metadata assignments should be simple and
single-line. For example::
__version__ = "1.1.4"
__repo__ = "https://github.com/adafruit/SomeLibrary.git"
For byte compiled .mpy files, a brute force / backtrack approach is used
to find the __version__ number in the file -- see comments in the
code for the implementation details.
:param str path: The path to the file containing the metadata.
:return: The dunder based metadata found in the file, as a dictionary.
"""
result = {}
logger.info("%s", path)
if path.endswith(".py"):
result["mpy"] = False
with open(path, "r", encoding="utf-8") as source_file:
content = source_file.read()
#: The regex used to extract ``__version__`` and ``__repo__`` assignments.
dunder_key_val = r"""(__\w+__)(?:\s*:\s*\w+)?\s*=\s*(?:['"]|\(\s)(.+)['"]"""
for match in re.findall(dunder_key_val, content):
result[match[0]] = str(match[1])
if result:
logger.info("Extracted metadata: %s", result)
elif path.endswith(".mpy"):
find_by_regexp_match = False
result["mpy"] = True
with open(path, "rb") as mpy_file:
content = mpy_file.read()
# Track the MPY version number
mpy_version = content[0:2]
compatibility = None
loc = -1
# Find the start location of the __version__
if mpy_version == b"M\x03":
# One byte for the length of "__version__"
loc = content.find(b"__version__") - 1
compatibility = (None, "7.0.0-alpha.1")
elif mpy_version == b"C\x05":
# Two bytes for the length of "__version__" in mpy version 5
loc = content.find(b"__version__") - 2
compatibility = ("7.0.0-alpha.1", "8.99.99")
elif mpy_version == b"C\x06":
# Two bytes in mpy version 6
find_by_regexp_match = True
compatibility = ("9.0.0-alpha.1", None)
if find_by_regexp_match:
# Too hard to find the version positionally.
# Find the first thing that looks like an x.y.z version number.
match = re.search(rb"([\d]+\.[\d]+\.[\d]+)\x00", content)
if match:
result["__version__"] = match.group(1).decode("utf-8")
elif loc > -1:
# Backtrack until a byte value of the offset is reached.
offset = 1
while offset < loc:
val = int(content[loc - offset])
if mpy_version == b"C\x05":
val = val // 2
if val == offset - 1: # Off by one..!
# Found version, extract the number given boundaries.
start = loc - offset + 1 # No need for prepended length.
end = loc # Up to the start of the __version__.
version = content[start:end] # Slice the version number.
# Create a string version as metadata in the result.
result["__version__"] = version.decode("utf-8")
break # Nothing more to do.
offset += 1 # ...and again but backtrack by one.
if compatibility:
result["compatibility"] = compatibility
else:
# not a valid MPY file
result["__version__"] = BAD_FILE_FORMAT
return result

View file

@ -0,0 +1,3 @@
Adafruit CircuitPython 4.1.0 on 2019-08-02; Adafruit CircuitPlayground Express with samd21g18
Board ID:this_is_a_board
UID:AAAABBBBCCCC

View file

@ -0,0 +1,3 @@
# SPDX-FileCopyrightText: 2023 Tim Cocks, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT

View file

@ -31,14 +31,12 @@ import ctypes
import json
import pathlib
from unittest import mock
from click.testing import CliRunner
import pytest
import requests
import circup
from circup import DiskBackend
TEST_BUNDLE_CONFIG_JSON = "tests/test_bundle_config.json"
with open(TEST_BUNDLE_CONFIG_JSON, "rb") as tbc:
@ -211,18 +209,29 @@ def test_Module_init_file_module():
Ensure the Module instance is set up as expected and logged, as if for a
single file Python module.
"""
path = os.path.join("foo", "bar", "baz", "local_module.py")
name = "local_module.py"
path = os.path.join("mock_device", "lib", name)
repo = "https://github.com/adafruit/SomeLibrary.git"
device_version = "1.2.3"
bundle_version = "3.2.1"
with mock.patch("circup.logger.info") as mock_logger, mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch(
"circup.Bundle.lib_dir", return_value="tests"
"circup.Bundle.lib_dir",
return_value="tests",
):
backend = DiskBackend("mock_device", mock_logger)
bundle = circup.Bundle(TEST_BUNDLE_NAME)
m = circup.Module(
path, repo, device_version, bundle_version, False, bundle, (None, None)
name,
backend,
repo,
device_version,
bundle_version,
False,
bundle,
(None, None),
)
mock_logger.assert_called_once_with(m)
assert m.path == path
@ -240,7 +249,8 @@ def test_Module_init_directory_module():
Ensure the Module instance is set up as expected and logged, as if for a
directory based Python module.
"""
path = os.path.join("foo", "bar", "dir_module", "")
name = "dir_module/"
path = os.path.join("mock_device", "lib", f"{name}", "")
repo = "https://github.com/adafruit/SomeLibrary.git"
device_version = "1.2.3"
bundle_version = "3.2.1"
@ -252,9 +262,17 @@ def test_Module_init_directory_module():
), mock.patch(
"circup.Bundle.lib_dir", return_value="tests"
):
backend = DiskBackend("mock_device", mock_logger)
bundle = circup.Bundle(TEST_BUNDLE_NAME)
m = circup.Module(
path, repo, device_version, bundle_version, mpy, bundle, (None, None)
name,
backend,
repo,
device_version,
bundle_version,
mpy,
bundle,
(None, None),
)
mock_logger.assert_called_once_with(m)
assert m.path == path
@ -274,15 +292,23 @@ def test_Module_outofdate():
out of date.
"""
bundle = circup.Bundle(TEST_BUNDLE_NAME)
path = os.path.join("foo", "bar", "baz", "module.py")
name = "module.py"
repo = "https://github.com/adafruit/SomeLibrary.git"
m1 = circup.Module(path, repo, "1.2.3", "3.2.1", False, bundle, (None, None))
m2 = circup.Module(path, repo, "1.2.3", "1.2.3", False, bundle, (None, None))
# shouldn't happen!
m3 = circup.Module(path, repo, "3.2.1", "1.2.3", False, bundle, (None, None))
assert m1.outofdate is True
assert m2.outofdate is False
assert m3.outofdate is False
with mock.patch("circup.logger.info") as mock_logger:
backend = DiskBackend("mock_device", mock_logger)
m1 = circup.Module(
name, backend, repo, "1.2.3", "3.2.1", False, bundle, (None, None)
)
m2 = circup.Module(
name, backend, repo, "1.2.3", "1.2.3", False, bundle, (None, None)
)
# shouldn't happen!
m3 = circup.Module(
name, backend, repo, "3.2.1", "1.2.3", False, bundle, (None, None)
)
assert m1.outofdate is True
assert m2.outofdate is False
assert m3.outofdate is False
def test_Module_outofdate_bad_versions():
@ -292,14 +318,25 @@ def test_Module_outofdate_bad_versions():
this problem). Such a problem should be logged.
"""
bundle = circup.Bundle(TEST_BUNDLE_NAME)
path = os.path.join("foo", "bar", "baz", "module.py")
name = "module.py"
repo = "https://github.com/adafruit/SomeLibrary.git"
device_version = "hello"
bundle_version = "3.2.1"
m = circup.Module(
path, repo, device_version, bundle_version, False, bundle, (None, None)
)
with mock.patch("circup.logger.warning") as mock_logger:
backend = DiskBackend("mock_device", mock_logger)
m = circup.Module(
name,
backend,
repo,
device_version,
bundle_version,
False,
bundle,
(None, None),
)
assert m.outofdate is True
assert mock_logger.call_count == 2
@ -310,16 +347,28 @@ def test_Module_mpy_mismatch():
boolean value to correctly indicate if the referenced module is, in fact,
out of date.
"""
path = os.path.join("foo", "bar", "baz", "module.mpy")
name = "module.py"
repo = "https://github.com/adafruit/SomeLibrary.git"
with mock.patch("circup.CPY_VERSION", "8.0.0"):
with mock.patch("circup.CPY_VERSION", "8.0.0"), mock.patch(
"circup.logger.warning"
) as mock_logger:
backend = DiskBackend("mock_device", mock_logger)
bundle = circup.Bundle(TEST_BUNDLE_NAME)
m1 = circup.Module(path, repo, "1.2.3", "1.2.3", True, bundle, (None, None))
m1 = circup.Module(
name, backend, repo, "1.2.3", "1.2.3", True, bundle, (None, None)
)
m2 = circup.Module(
path, repo, "1.2.3", "1.2.3", True, bundle, ("7.0.0-alpha.1", "8.99.99")
name,
backend,
repo,
"1.2.3",
"1.2.3",
True,
bundle,
("7.0.0-alpha.1", "8.99.99"),
)
m3 = circup.Module(
path, repo, "1.2.3", "1.2.3", True, bundle, (None, "7.0.0-alpha.1")
name, backend, repo, "1.2.3", "1.2.3", True, bundle, (None, "7.0.0-alpha.1")
)
with mock.patch("circup.CPY_VERSION", "6.2.0"):
assert m1.mpy_mismatch is False
@ -345,14 +394,23 @@ def test_Module_major_update_bad_versions():
Such a problem should be logged.
"""
bundle = circup.Bundle(TEST_BUNDLE_NAME)
path = os.path.join("foo", "bar", "baz", "module.py")
name = "module.py"
repo = "https://github.com/adafruit/SomeLibrary.git"
device_version = "1.2.3"
bundle_version = "version-3"
m = circup.Module(
path, repo, device_version, bundle_version, False, bundle, (None, None)
)
with mock.patch("circup.logger.warning") as mock_logger:
backend = DiskBackend("mock_device", mock_logger)
m = circup.Module(
name,
backend,
repo,
device_version,
bundle_version,
False,
bundle,
(None, None),
)
assert m.major_update is True
assert mock_logger.call_count == 2
@ -363,16 +421,23 @@ def test_Module_row():
a table of version-related results.
"""
bundle = circup.Bundle(TEST_BUNDLE_NAME)
path = os.path.join("foo", "bar", "baz", "module.py")
name = "module.py"
repo = "https://github.com/adafruit/SomeLibrary.git"
with mock.patch("circup.os.path.isfile", return_value=True), mock.patch(
"circup.CPY_VERSION", "8.0.0"
):
m = circup.Module(path, repo, "1.2.3", None, False, bundle, (None, None))
), mock.patch("circup.logger.warning") as mock_logger:
backend = DiskBackend("mock_device", mock_logger)
m = circup.Module(
name, backend, repo, "1.2.3", None, False, bundle, (None, None)
)
assert m.row == ("module", "1.2.3", "unknown", "Major Version")
m = circup.Module(path, repo, "1.2.3", "1.3.4", False, bundle, (None, None))
m = circup.Module(
name, backend, repo, "1.2.3", "1.3.4", False, bundle, (None, None)
)
assert m.row == ("module", "1.2.3", "1.3.4", "Minor Version")
m = circup.Module(path, repo, "1.2.3", "1.2.3", True, bundle, ("9.0.0", None))
m = circup.Module(
name, backend, repo, "1.2.3", "1.2.3", True, bundle, ("9.0.0", None)
)
assert m.row == ("module", "1.2.3", "1.2.3", "MPY Format")
@ -382,17 +447,25 @@ def test_Module_update_dir():
update the module on the connected device.
"""
bundle = circup.Bundle(TEST_BUNDLE_NAME)
path = os.path.join("foo", "bar", "baz", "module.py")
name = "adafruit_waveform"
repo = "https://github.com/adafruit/SomeLibrary.git"
device_version = "1.2.3"
bundle_version = None
m = circup.Module(
path, repo, device_version, bundle_version, False, bundle, (None, None)
)
with mock.patch("circup.shutil") as mock_shutil, mock.patch(
with mock.patch("circup.backends.shutil") as mock_shutil, mock.patch(
"circup.os.path.isdir", return_value=True
):
m.update()
), mock.patch("circup.logger.warning") as mock_logger:
backend = DiskBackend("mock_device", mock_logger)
m = circup.Module(
name,
backend,
repo,
device_version,
bundle_version,
False,
bundle,
(None, None),
)
backend.update(m)
mock_shutil.rmtree.assert_called_once_with(m.path, ignore_errors=True)
mock_shutil.copytree.assert_called_once_with(m.bundle_path, m.path)
@ -403,17 +476,31 @@ def test_Module_update_file():
update the module on the connected device.
"""
bundle = circup.Bundle(TEST_BUNDLE_NAME)
path = os.path.join("foo", "bar", "baz", "module.py")
name = "colorsys.py"
# path = os.path.join("foo", "bar", "baz", "module.py")
repo = "https://github.com/adafruit/SomeLibrary.git"
device_version = "1.2.3"
bundle_version = None
m = circup.Module(
path, repo, device_version, bundle_version, False, bundle, (None, None)
)
with mock.patch("circup.shutil") as mock_shutil, mock.patch(
with mock.patch("circup.backends.shutil") as mock_shutil, mock.patch(
"circup.os.remove"
) as mock_remove, mock.patch("circup.os.path.isdir", return_value=False):
m.update()
) as mock_remove, mock.patch(
"circup.os.path.isdir", return_value=False
), mock.patch(
"circup.logger.warning"
) as mock_logger:
backend = circup.DiskBackend("mock_device", mock_logger)
m = circup.Module(
name,
backend,
repo,
device_version,
bundle_version,
False,
bundle,
(None, None),
)
backend.update(m)
mock_remove.assert_called_once_with(m.path)
mock_shutil.copyfile.assert_called_once_with(m.bundle_path, m.path)
@ -422,16 +509,27 @@ def test_Module_repr():
"""
Ensure the repr(dict) is returned (helps when logging).
"""
path = os.path.join("foo", "bar", "baz", "local_module.py")
name = "local_module.py"
path = os.path.join("mock_device", "lib", f"{name}")
repo = "https://github.com/adafruit/SomeLibrary.git"
device_version = "1.2.3"
bundle_version = "3.2.1"
with mock.patch("circup.os.path.isfile", return_value=True), mock.patch(
"circup.CPY_VERSION", "4.1.2"
), mock.patch("circup.Bundle.lib_dir", return_value="tests"):
), mock.patch("circup.Bundle.lib_dir", return_value="tests"), mock.patch(
"circup.logger.warning"
) as mock_logger:
bundle = circup.Bundle(TEST_BUNDLE_NAME)
backend = circup.DiskBackend("mock_device", mock_logger)
m = circup.Module(
path, repo, device_version, bundle_version, False, bundle, (None, None)
name,
backend,
repo,
device_version,
bundle_version,
False,
bundle,
(None, None),
)
assert repr(m) == repr(
{
@ -563,8 +661,10 @@ def test_extract_metadata_python():
'print("Hello, world!")\n'
)
path = "foo.py"
with mock.patch("builtins.open", mock.mock_open(read_data=code)) as mock_open:
result = circup.extract_metadata(path)
with mock.patch(
"builtins.open", mock.mock_open(read_data=code)
) as mock_open, mock.patch("circup.logger.warning") as mock_logger:
result = circup.extract_metadata(path, mock_logger)
mock_open.assert_called_once_with(path, "r", encoding="utf-8")
assert len(result) == 3
assert result["__version__"] == "1.1.4"
@ -578,10 +678,11 @@ def test_extract_metadata_byte_code_v6():
Ensure the __version__ is correctly extracted from the bytecode ".mpy"
file generated from Circuitpython < 7. Version in test_module is 0.9.2
"""
result = circup.extract_metadata("tests/test_module.mpy")
assert result["__version__"] == "0.9.2"
assert result["mpy"] is True
assert result["compatibility"] == (None, "7.0.0-alpha.1")
with mock.patch("circup.logger.warning") as mock_logger:
result = circup.extract_metadata("tests/test_module.mpy", mock_logger)
assert result["__version__"] == "0.9.2"
assert result["mpy"] is True
assert result["compatibility"] == (None, "7.0.0-alpha.1")
def test_extract_metadata_byte_code_v7():
@ -589,10 +690,11 @@ def test_extract_metadata_byte_code_v7():
Ensure the __version__ is correctly extracted from the bytecode ".mpy"
file generated from Circuitpython >= 7. Version in local_module_cp7 is 1.2.3
"""
result = circup.extract_metadata("tests/local_module_cp7.mpy")
assert result["__version__"] == "1.2.3"
assert result["mpy"] is True
assert result["compatibility"] == ("7.0.0-alpha.1", "8.99.99")
with mock.patch("circup.logger.warning") as mock_logger:
result = circup.extract_metadata("tests/local_module_cp7.mpy", mock_logger)
assert result["__version__"] == "1.2.3"
assert result["mpy"] is True
assert result["compatibility"] == ("7.0.0-alpha.1", "8.99.99")
def test_find_modules():
@ -604,18 +706,23 @@ def test_find_modules():
device_modules = json.load(f)
with open("tests/bundle.json", "rb") as f:
bundle_modules = json.load(f)
with mock.patch(
"circup.get_device_versions", return_value=device_modules
"circup.DiskBackend.get_device_versions", return_value=device_modules
), mock.patch(
"circup.get_bundle_versions", return_value=bundle_modules
), mock.patch(
"circup.os.path.isfile", return_value=True
):
), mock.patch(
"circup.logger.warning"
) as mock_logger:
backend = DiskBackend("mock_device", mock_logger)
bundle = circup.Bundle(TEST_BUNDLE_NAME)
bundles_list = [bundle]
for module in bundle_modules:
bundle_modules[module]["bundle"] = bundle
result = circup.find_modules("", bundles_list)
result = circup.find_modules(backend, bundles_list)
assert len(result) == 1
assert result[0].name == "adafruit_74hc595"
assert (
@ -630,13 +737,16 @@ def test_find_modules_goes_bang():
and the utility exists with an error code of 1.
"""
with mock.patch(
"circup.get_device_versions", side_effect=Exception("BANG!")
"circup.DiskBackend.get_device_versions", side_effect=Exception("BANG!")
), mock.patch("circup.click") as mock_click, mock.patch(
"circup.sys.exit"
) as mock_exit:
) as mock_exit, mock.patch(
"circup.logger.warning"
) as mock_logger:
bundle = circup.Bundle(TEST_BUNDLE_NAME)
bundles_list = [bundle]
circup.find_modules("", bundles_list)
backend = DiskBackend("mock_devcie", mock_logger)
circup.find_modules(backend, bundles_list)
assert mock_click.echo.call_count == 1
mock_exit.assert_called_once_with(1)
@ -647,19 +757,21 @@ def test_get_bundle_versions():
Ensure ensure_latest_bundle is called even if lib_dir exists.
"""
with mock.patch("circup.ensure_latest_bundle") as mock_elb, mock.patch(
"circup.get_modules", return_value={"ok": {"name": "ok"}}
"circup._get_modules_file", return_value={"ok": {"name": "ok"}}
) as mock_gm, mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch(
"circup.Bundle.lib_dir", return_value="foo/bar/lib"
), mock.patch(
"circup.os.path.isdir", return_value=True
):
), mock.patch(
"circup.logger"
) as mock_logger:
bundle = circup.Bundle(TEST_BUNDLE_NAME)
bundles_list = [bundle]
assert circup.get_bundle_versions(bundles_list) == {
"ok": {"name": "ok", "bundle": bundle}
}
mock_elb.assert_called_once_with(bundle)
mock_gm.assert_called_once_with("foo/bar/lib")
mock_gm.assert_called_once_with("foo/bar/lib", mock_logger)
def test_get_bundle_versions_avoid_download():
@ -668,10 +780,12 @@ def test_get_bundle_versions_avoid_download():
Testing both cases: lib_dir exists and lib_dir doesn't exists.
"""
with mock.patch("circup.ensure_latest_bundle") as mock_elb, mock.patch(
"circup.get_modules", return_value={"ok": {"name": "ok"}}
"circup._get_modules_file", return_value={"ok": {"name": "ok"}}
) as mock_gm, mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch(
"circup.Bundle.lib_dir", return_value="foo/bar/lib"
):
), mock.patch(
"circup.logger"
) as mock_logger:
bundle = circup.Bundle(TEST_BUNDLE_NAME)
bundles_list = [bundle]
with mock.patch("circup.os.path.isdir", return_value=True):
@ -679,13 +793,13 @@ def test_get_bundle_versions_avoid_download():
"ok": {"name": "ok", "bundle": bundle}
}
assert mock_elb.call_count == 0
mock_gm.assert_called_once_with("foo/bar/lib")
mock_gm.assert_called_once_with("foo/bar/lib", mock_logger)
with mock.patch("circup.os.path.isdir", return_value=False):
assert circup.get_bundle_versions(bundles_list, avoid_download=True) == {
"ok": {"name": "ok", "bundle": bundle}
}
mock_elb.assert_called_once_with(bundle)
mock_gm.assert_called_with("foo/bar/lib")
mock_gm.assert_called_with("foo/bar/lib", mock_logger)
def test_get_circuitpython_version():
@ -693,36 +807,24 @@ def test_get_circuitpython_version():
Given valid content of a boot_out.txt file on a connected device, return
the version number of CircuitPython running on the board.
"""
device_path = "device"
data_no_id = (
"Adafruit CircuitPython 4.1.0 on 2019-08-02; "
"Adafruit CircuitPlayground Express with samd21g18"
)
with mock.patch("builtins.open", mock.mock_open(read_data=data_no_id)) as mock_open:
assert circup.get_circuitpython_version(device_path) == ("4.1.0", "")
mock_open.assert_called_once_with(
os.path.join(device_path, "boot_out.txt"), "r", encoding="utf-8"
)
data_with_id = data_no_id + "\r\n" "Board ID:this_is_a_board"
with mock.patch(
"builtins.open", mock.mock_open(read_data=data_with_id)
) as mock_open:
assert circup.get_circuitpython_version(device_path) == (
with mock.patch("circup.logger.warning") as mock_logger:
backend = DiskBackend("tests/mock_device", mock_logger)
assert backend.get_circuitpython_version() == (
"4.1.0",
"this_is_a_board",
)
mock_open.assert_called_once_with(
os.path.join(device_path, "boot_out.txt"), "r", encoding="utf-8"
)
def test_get_device_versions():
"""
Ensure get_modules is called with the path for the attached device.
"""
with mock.patch("circup.get_modules", return_value="ok") as mock_gm:
assert circup.get_device_versions("TESTDIR") == "ok"
mock_gm.assert_called_once_with(os.path.join("TESTDIR", "lib"))
with mock.patch(
"circup.DiskBackend.get_modules", return_value="ok"
) as mock_gm, mock.patch("circup.logger.warning") as mock_logger:
backend = circup.DiskBackend("mock_device", mock_logger)
assert backend.get_device_versions() == "ok"
mock_gm.assert_called_once_with(os.path.join("mock_device", "lib"))
def test_get_modules_empty_path():
@ -730,7 +832,9 @@ def test_get_modules_empty_path():
Sometimes a path to a device or bundle may be empty. Ensure, if this is the
case, an empty dictionary is returned.
"""
assert circup.get_modules("") == {}
with mock.patch("circup.logger.warning") as mock_logger:
backend = circup.DiskBackend("tests/mock_device", mock_logger)
assert backend.get_modules("") == {}
def test_get_modules_that_are_files():
@ -743,8 +847,11 @@ def test_get_modules_that_are_files():
os.path.join("tests", "local_module.py"),
os.path.join("tests", ".hidden_module.py"),
]
with mock.patch("circup.glob.glob", side_effect=[mods, [], []]):
result = circup.get_modules(path)
with mock.patch("circup.glob.glob", side_effect=[mods, [], []]), mock.patch(
"circup.logger.warning"
) as mock_logger:
backend = circup.DiskBackend("mock_device", mock_logger)
result = backend.get_modules(path)
assert len(result) == 1 # Hidden files are ignored.
assert "local_module" in result
assert result["local_module"]["path"] == os.path.join(
@ -766,8 +873,11 @@ def test_get_modules_that_are_directories():
os.path.join("tests", ".hidden_dir", ""),
]
mod_files = ["tests/dir_module/my_module.py", "tests/dir_module/__init__.py"]
with mock.patch("circup.glob.glob", side_effect=[[], [], mods, mod_files, []]):
result = circup.get_modules(path)
with mock.patch(
"circup.glob.glob", side_effect=[[], [], mods, mod_files, []]
), mock.patch("circup.logger.warning") as mock_logger:
backend = circup.DiskBackend("mock_device", mock_logger)
result = backend.get_modules(path)
assert len(result) == 1
assert "dir_module" in result
assert result["dir_module"]["path"] == os.path.join("tests", "dir_module", "")
@ -784,8 +894,11 @@ def test_get_modules_that_are_directories_with_no_metadata():
path = "tests" # mocked away in function.
mods = [os.path.join("tests", "bad_module", "")]
mod_files = ["tests/bad_module/my_module.py", "tests/bad_module/__init__.py"]
with mock.patch("circup.glob.glob", side_effect=[[], [], mods, mod_files, []]):
result = circup.get_modules(path)
with mock.patch(
"circup.glob.glob", side_effect=[[], [], mods, mod_files, []]
), mock.patch("circup.logger.warning") as mock_logger:
backend = circup.DiskBackend("mock_device", mock_logger)
result = backend.get_modules(path)
assert len(result) == 1
assert "bad_module" in result
assert result["bad_module"]["path"] == os.path.join("tests", "bad_module", "")
@ -1019,8 +1132,8 @@ def test_libraries_from_imports():
"adafruit_touchscreen",
]
test_file = str(pathlib.Path(__file__).parent / "import_styles.py")
result = circup.libraries_from_imports(test_file, mod_names)
print(result)
result = circup.libraries_from_code_py(test_file, mod_names)
assert result == [
"adafruit_bus_device",
"adafruit_button",
@ -1033,6 +1146,16 @@ def test_libraries_from_imports_bad():
"""Ensure that we catch an import error"""
TEST_BUNDLE_MODULES = {"one.py": {}, "two.py": {}, "three.py": {}}
runner = CliRunner()
with mock.patch("circup.get_bundle_versions", return_value=TEST_BUNDLE_MODULES):
result = runner.invoke(circup.install, ["--auto-file", "./tests/bad_python.py"])
result = runner.invoke(
circup.main,
[
"--path",
"./tests/mock_device/",
"install",
"--auto-file",
"./tests/bad_python.py",
],
)
assert result.exit_code == 2