Merge pull request #208 from FoamyGuy/refactoring_and_issue_fixes

Refactoring and issue fixes
This commit is contained in:
foamyguy 2024-04-09 17:54:45 -05:00 committed by GitHub
commit fe79ea8d3b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 1910 additions and 1818 deletions

2
.gitignore vendored
View file

@ -19,6 +19,7 @@ downloads/
eggs/
.eggs/
lib/
!tests/mock_device/lib/
lib64/
parts/
sdist/
@ -93,6 +94,7 @@ venv/
ENV/
env.bak/
venv.bak/
*_venv/
# Spyder project settings
.spyderproject

File diff suppressed because it is too large Load diff

156
circup/bundle.py Normal file
View file

@ -0,0 +1,156 @@
# SPDX-FileCopyrightText: 2019 Nicholas Tollervey, 2024 Tim Cocks, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
Class that represents a specific release of a Bundle.
"""
import os
import sys
import click
import requests
from circup.shared import (
DATA_DIR,
PLATFORMS,
REQUESTS_TIMEOUT,
tags_data_load,
get_latest_release_from_url,
)
from circup.logging import logger
class Bundle:
"""
All the links and file names for a bundle
"""
def __init__(self, repo):
"""
Initialise a Bundle created from its github info.
Construct all the strings in one place.
:param str repo: Repository string for github: "user/repository"
"""
vendor, bundle_id = repo.split("/")
bundle_id = bundle_id.lower().replace("_", "-")
self.key = repo
#
self.url = "https://github.com/" + repo
self.basename = bundle_id + "-{platform}-{tag}"
self.urlzip = self.basename + ".zip"
self.dir = os.path.join(DATA_DIR, vendor, bundle_id + "-{platform}")
self.zip = os.path.join(DATA_DIR, bundle_id + "-{platform}.zip")
self.url_format = self.url + "/releases/download/{tag}/" + self.urlzip
# tag
self._current = None
self._latest = None
def lib_dir(self, platform):
"""
This bundle's lib directory for the platform.
:param str platform: The platform identifier (py/6mpy/...).
:return: The path to the lib directory for the platform.
"""
tag = self.current_tag
return os.path.join(
self.dir.format(platform=platform),
self.basename.format(platform=PLATFORMS[platform], tag=tag),
"lib",
)
def requirements_for(self, library_name, toml_file=False):
"""
The requirements file for this library.
:param str library_name: The name of the library.
:return: The path to the requirements.txt file.
"""
platform = "py"
tag = self.current_tag
found_file = os.path.join(
self.dir.format(platform=platform),
self.basename.format(platform=PLATFORMS[platform], tag=tag),
"requirements",
library_name,
"requirements.txt" if not toml_file else "pyproject.toml",
)
if os.path.isfile(found_file):
with open(found_file, "r", encoding="utf-8") as read_this:
return read_this.read()
return None
@property
def current_tag(self):
"""
Lazy load current cached tag from the BUNDLE_DATA json file.
:return: The current cached tag value for the project.
"""
if self._current is None:
self._current = tags_data_load(logger).get(self.key, "0")
return self._current
@current_tag.setter
def current_tag(self, tag):
"""
Set the current cached tag (after updating).
:param str tag: The new value for the current tag.
:return: The current cached tag value for the project.
"""
self._current = tag
@property
def latest_tag(self):
"""
Lazy find the value of the latest tag for the bundle.
:return: The most recent tag value for the project.
"""
if self._latest is None:
self._latest = get_latest_release_from_url(
self.url + "/releases/latest", logger
)
return self._latest
def validate(self):
"""
Test the existence of the expected URLs (not their content)
"""
tag = self.latest_tag
if not tag or tag == "releases":
if "--verbose" in sys.argv:
click.secho(f' Invalid tag "{tag}"', fg="red")
return False
for platform in PLATFORMS.values():
url = self.url_format.format(platform=platform, tag=tag)
r = requests.get(url, stream=True, timeout=REQUESTS_TIMEOUT)
# pylint: disable=no-member
if r.status_code != requests.codes.ok:
if "--verbose" in sys.argv:
click.secho(f" Unable to find {os.path.split(url)[1]}", fg="red")
return False
# pylint: enable=no-member
return True
def __repr__(self):
"""
Helps with log files.
:return: A repr of a dictionary containing the Bundles's metadata.
"""
return repr(
{
"key": self.key,
"url": self.url,
"urlzip": self.urlzip,
"dir": self.dir,
"zip": self.zip,
"url_format": self.url_format,
"current": self._current,
"latest": self._latest,
}
)

562
circup/command_utils.py Normal file
View file

@ -0,0 +1,562 @@
# SPDX-FileCopyrightText: 2019 Nicholas Tollervey, 2024 Tim Cocks, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
Functions called from commands in order to provide behaviors and return information.
"""
import ctypes
import os
from subprocess import check_output
import sys
import shutil
import zipfile
import json
import re
import toml
import findimports
import requests
import click
from circup.shared import (
PLATFORMS,
REQUESTS_TIMEOUT,
_get_modules_file,
BUNDLE_CONFIG_OVERWRITE,
BUNDLE_CONFIG_FILE,
BUNDLE_CONFIG_LOCAL,
BUNDLE_DATA,
NOT_MCU_LIBRARIES,
tags_data_load,
)
from circup.logging import logger
from circup.module import Module
from circup.bundle import Bundle
WARNING_IGNORE_MODULES = (
"typing-extensions",
"pyasn1",
"circuitpython-typing",
)
def clean_library_name(assumed_library_name):
"""
Most CP repos and library names are look like this:
repo: Adafruit_CircuitPython_LC709203F
library: adafruit_lc709203f
But some do not and this handles cleaning that up.
Also cleans up if the pypi or reponame is passed in instead of the
CP library name.
:param str assumed_library_name: An assumed name of a library from user
or requirements.txt entry
:return: str proper library name
"""
not_standard_names = {
# Assumed Name : Actual Name
"adafruit_adafruitio": "adafruit_io",
"adafruit_asyncio": "asyncio",
"adafruit_busdevice": "adafruit_bus_device",
"adafruit_connectionmanager": "adafruit_connection_manager",
"adafruit_display_button": "adafruit_button",
"adafruit_neopixel": "neopixel",
"adafruit_sd": "adafruit_sdcard",
"adafruit_simpleio": "simpleio",
"pimoroni_ltr559": "pimoroni_circuitpython_ltr559",
}
if "circuitpython" in assumed_library_name:
# convert repo or pypi name to common library name
assumed_library_name = (
assumed_library_name.replace("-circuitpython-", "_")
.replace("_circuitpython_", "_")
.replace("-", "_")
)
if assumed_library_name in not_standard_names:
return not_standard_names[assumed_library_name]
return assumed_library_name
def completion_for_install(ctx, param, incomplete):
"""
Returns the list of available modules for the command line tab-completion
with the ``circup install`` command.
"""
# pylint: disable=unused-argument
available_modules = get_bundle_versions(get_bundles_list(), avoid_download=True)
module_names = {m.replace(".py", "") for m in available_modules}
if incomplete:
module_names = [name for name in module_names if name.startswith(incomplete)]
return sorted(module_names)
def ensure_latest_bundle(bundle):
"""
Ensure that there's a copy of the latest library bundle available so circup
can check the metadata contained therein.
:param Bundle bundle: the target Bundle object.
"""
logger.info("Checking library updates for %s.", bundle.key)
tag = bundle.latest_tag
do_update = False
if tag == bundle.current_tag:
for platform in PLATFORMS:
# missing directories (new platform added on an existing install
# or side effect of pytest or network errors)
do_update = do_update or not os.path.isdir(bundle.lib_dir(platform))
else:
do_update = True
if do_update:
logger.info("New version available (%s).", tag)
try:
get_bundle(bundle, tag)
tags_data_save_tag(bundle.key, tag)
except requests.exceptions.HTTPError as ex:
# See #20 for reason for this
click.secho(
(
"There was a problem downloading that platform bundle. "
"Skipping and using existing download if available."
),
fg="red",
)
logger.exception(ex)
else:
logger.info("Current bundle up to date %s.", tag)
def find_device():
"""
Return the location on the filesystem for the connected CircuitPython device.
This is based upon how Mu discovers this information.
:return: The path to the device on the local filesystem.
"""
device_dir = None
# Attempt to find the path on the filesystem that represents the plugged in
# CIRCUITPY board.
if os.name == "posix":
# Linux / OSX
for mount_command in ["mount", "/sbin/mount"]:
try:
mount_output = check_output(mount_command).splitlines()
mounted_volumes = [x.split()[2] for x in mount_output]
for volume in mounted_volumes:
if volume.endswith(b"CIRCUITPY"):
device_dir = volume.decode("utf-8")
except FileNotFoundError:
continue
elif os.name == "nt":
# Windows
def get_volume_name(disk_name):
"""
Each disk or external device connected to windows has an attribute
called "volume name". This function returns the volume name for the
given disk/device.
Based upon answer given here: http://stackoverflow.com/a/12056414
"""
vol_name_buf = ctypes.create_unicode_buffer(1024)
ctypes.windll.kernel32.GetVolumeInformationW(
ctypes.c_wchar_p(disk_name),
vol_name_buf,
ctypes.sizeof(vol_name_buf),
None,
None,
None,
None,
0,
)
return vol_name_buf.value
#
# In certain circumstances, volumes are allocated to USB
# storage devices which cause a Windows popup to raise if their
# volume contains no media. Wrapping the check in SetErrorMode
# with SEM_FAILCRITICALERRORS (1) prevents this popup.
#
old_mode = ctypes.windll.kernel32.SetErrorMode(1)
try:
for disk in "ABCDEFGHIJKLMNOPQRSTUVWXYZ":
path = "{}:\\".format(disk)
if os.path.exists(path) and get_volume_name(path) == "CIRCUITPY":
device_dir = path
# Report only the FIRST device found.
break
finally:
ctypes.windll.kernel32.SetErrorMode(old_mode)
else:
# No support for unknown operating systems.
raise NotImplementedError('OS "{}" not supported.'.format(os.name))
logger.info("Found device: %s", device_dir)
return device_dir
def find_modules(backend, bundles_list):
"""
Extracts metadata from the connected device and available bundles and
returns this as a list of Module instances representing the modules on the
device.
:param Backend backend: Backend with the device connection.
:param List[Bundle] bundles_list: List of supported bundles as Bundle objects.
:return: A list of Module instances describing the current state of the
modules on the connected device.
"""
# pylint: disable=broad-except,too-many-locals
try:
device_modules = backend.get_device_versions()
bundle_modules = get_bundle_versions(bundles_list)
result = []
for key, device_metadata in device_modules.items():
if key in bundle_modules:
path = device_metadata["path"]
bundle_metadata = bundle_modules[key]
repo = bundle_metadata.get("__repo__")
bundle = bundle_metadata.get("bundle")
device_version = device_metadata.get("__version__")
bundle_version = bundle_metadata.get("__version__")
mpy = device_metadata["mpy"]
compatibility = device_metadata.get("compatibility", (None, None))
module_name = (
path.split(os.sep)[-1]
if not path.endswith(os.sep)
else path[:-1].split(os.sep)[-1] + os.sep
)
m = Module(
module_name,
backend,
repo,
device_version,
bundle_version,
mpy,
bundle,
compatibility,
)
result.append(m)
return result
except Exception as ex:
# If it's not possible to get the device and bundle metadata, bail out
# with a friendly message and indication of what's gone wrong.
logger.exception(ex)
click.echo("There was a problem: {}".format(ex))
sys.exit(1)
# pylint: enable=broad-except,too-many-locals
def get_bundle(bundle, tag):
"""
Downloads and extracts the version of the bundle with the referenced tag.
The resulting zip file is saved on the local filesystem.
:param Bundle bundle: the target Bundle object.
:param str tag: The GIT tag to use to download the bundle.
"""
click.echo(f"Downloading latest bundles for {bundle.key} ({tag}).")
for platform, github_string in PLATFORMS.items():
# Report the platform: "8.x-mpy", etc.
click.echo(f"{github_string}:")
url = bundle.url_format.format(platform=github_string, tag=tag)
logger.info("Downloading bundle: %s", url)
r = requests.get(url, stream=True, timeout=REQUESTS_TIMEOUT)
# pylint: disable=no-member
if r.status_code != requests.codes.ok:
logger.warning("Unable to connect to %s", url)
r.raise_for_status()
# pylint: enable=no-member
total_size = int(r.headers.get("Content-Length"))
temp_zip = bundle.zip.format(platform=platform)
with click.progressbar(
r.iter_content(1024), label="Extracting:", length=total_size
) as pbar, open(temp_zip, "wb") as zip_fp:
for chunk in pbar:
zip_fp.write(chunk)
pbar.update(len(chunk))
logger.info("Saved to %s", temp_zip)
temp_dir = bundle.dir.format(platform=platform)
if os.path.isdir(temp_dir):
shutil.rmtree(temp_dir)
with zipfile.ZipFile(temp_zip, "r") as zfile:
zfile.extractall(temp_dir)
bundle.current_tag = tag
click.echo("\nOK\n")
def get_bundle_versions(bundles_list, avoid_download=False):
"""
Returns a dictionary of metadata from modules in the latest known release
of the library bundle. Uses the Python version (rather than the compiled
version) of the library modules.
:param List[Bundle] bundles_list: List of supported bundles as Bundle objects.
:param bool avoid_download: if True, download the bundle only if missing.
:return: A dictionary of metadata about the modules available in the
library bundle.
"""
all_the_modules = dict()
for bundle in bundles_list:
if not avoid_download or not os.path.isdir(bundle.lib_dir("py")):
ensure_latest_bundle(bundle)
path = bundle.lib_dir("py")
path_modules = _get_modules_file(path, logger)
for name, module in path_modules.items():
module["bundle"] = bundle
if name not in all_the_modules: # here we decide the order of priority
all_the_modules[name] = module
return all_the_modules
def get_bundles_dict():
"""
Retrieve the dictionary from BUNDLE_CONFIG_FILE (JSON).
Put the local dictionary in front, so it gets priority.
It's a dictionary of bundle string identifiers.
:return: Combined dictionaries from the config files.
"""
bundle_dict = get_bundles_local_dict()
try:
with open(BUNDLE_CONFIG_OVERWRITE, "rb") as bundle_config_json:
bundle_config = json.load(bundle_config_json)
except (FileNotFoundError, json.decoder.JSONDecodeError):
with open(BUNDLE_CONFIG_FILE, "rb") as bundle_config_json:
bundle_config = json.load(bundle_config_json)
for name, bundle in bundle_config.items():
if bundle not in bundle_dict.values():
bundle_dict[name] = bundle
return bundle_dict
def get_bundles_local_dict():
"""
Retrieve the local bundles from BUNDLE_CONFIG_LOCAL (JSON).
:return: Raw dictionary from the config file(s).
"""
try:
with open(BUNDLE_CONFIG_LOCAL, "rb") as bundle_config_json:
bundle_config = json.load(bundle_config_json)
if not isinstance(bundle_config, dict) or not bundle_config:
logger.error("Local bundle list invalid. Skipped.")
raise FileNotFoundError("Bad local bundle list")
return bundle_config
except (FileNotFoundError, json.decoder.JSONDecodeError):
return dict()
def get_bundles_list():
"""
Retrieve the list of bundles from the config dictionary.
:return: List of supported bundles as Bundle objects.
"""
bundle_config = get_bundles_dict()
bundles_list = [Bundle(bundle_config[b]) for b in bundle_config]
logger.info("Using bundles: %s", ", ".join(b.key for b in bundles_list))
return bundles_list
def get_circup_version():
"""Return the version of circup that is running. If not available, return None.
:return: Current version of circup, or None.
"""
try:
from importlib import metadata # pylint: disable=import-outside-toplevel
except ImportError:
try:
import importlib_metadata as metadata # pylint: disable=import-outside-toplevel
except ImportError:
return None
try:
return metadata.version("circup")
except metadata.PackageNotFoundError:
return None
def get_dependencies(*requested_libraries, mod_names, to_install=()):
"""
Return a list of other CircuitPython libraries required by the given list
of libraries
:param tuple requested_libraries: The libraries to search for dependencies
:param object mod_names: All the modules metadata from bundle
:param list(str) to_install: Modules already selected for installation.
:return: tuple of module names to install which we build
"""
# Internal variables
_to_install = to_install
_requested_libraries = []
_rl = requested_libraries[0]
if not requested_libraries[0]:
# If nothing is requested, we're done
return _to_install
for lib_name in _rl:
lower_lib_name = lib_name.lower()
if lower_lib_name in NOT_MCU_LIBRARIES:
logger.info(
"Skipping %s. It is not for microcontroller installs.", lib_name
)
else:
# Canonicalize, with some exceptions:
# adafruit-circuitpython-something => adafruit_something
canonical_lib_name = clean_library_name(lower_lib_name)
try:
# Don't process any names we can't find in mod_names
mod_names[canonical_lib_name] # pylint: disable=pointless-statement
_requested_libraries.append(canonical_lib_name)
except KeyError:
if canonical_lib_name not in WARNING_IGNORE_MODULES:
click.secho(
f"WARNING:\n\t{canonical_lib_name} is not a known CircuitPython library.",
fg="yellow",
)
if not _requested_libraries:
# If nothing is requested, we're done
return _to_install
for library in list(_requested_libraries):
if library not in _to_install:
_to_install = _to_install + (library,)
# get the requirements.txt from bundle
bundle = mod_names[library]["bundle"]
requirements_txt = bundle.requirements_for(library)
if requirements_txt:
_requested_libraries.extend(
libraries_from_requirements(requirements_txt)
)
circup_dependencies = get_circup_dependencies(bundle, library)
for circup_dependency in circup_dependencies:
_requested_libraries.append(circup_dependency)
# we've processed this library, remove it from the list
_requested_libraries.remove(library)
return get_dependencies(
tuple(_requested_libraries), mod_names=mod_names, to_install=_to_install
)
def get_circup_dependencies(bundle, library):
"""
Get the list of circup dependencies from pyproject.toml
e.g.
[circup]
circup_dependencies = ["dependency_name_here"]
:param bundle: The Bundle to look within
:param library: The Library to find pyproject.toml for and get dependencies from
:return: The list of dependency libraries that were found
"""
try:
pyproj_toml = bundle.requirements_for(library, toml_file=True)
if pyproj_toml:
pyproj_toml_data = toml.loads(pyproj_toml)
dependencies = pyproj_toml_data["circup"]["circup_dependencies"]
if isinstance(dependencies, list):
return dependencies
if isinstance(dependencies, str):
return (dependencies,)
return tuple()
except KeyError:
# no circup_dependencies in pyproject.toml
return tuple()
def libraries_from_requirements(requirements):
"""
Clean up supplied requirements.txt and turn into tuple of CP libraries
:param str requirements: A string version of a requirements.txt
:return: tuple of library names
"""
libraries = ()
for line in requirements.split("\n"):
line = line.lower().strip()
if line.startswith("#") or line == "":
# skip comments
pass
else:
# Remove everything after any pip style version specifiers
line = re.split("[<>=~[;]", line)[0].strip()
libraries = libraries + (line,)
return libraries
def save_local_bundles(bundles_data):
"""
Save the list of local bundles to the settings.
:param str key: The bundle's identifier/key.
"""
if len(bundles_data) > 0:
with open(BUNDLE_CONFIG_LOCAL, "w", encoding="utf-8") as data:
json.dump(bundles_data, data)
else:
if os.path.isfile(BUNDLE_CONFIG_LOCAL):
os.unlink(BUNDLE_CONFIG_LOCAL)
def tags_data_save_tag(key, tag):
"""
Add or change the saved tag value for a bundle.
:param str key: The bundle's identifier/key.
:param str tag: The new tag for the bundle.
"""
tags_data = tags_data_load(logger)
tags_data[key] = tag
with open(BUNDLE_DATA, "w", encoding="utf-8") as data:
json.dump(tags_data, data)
def libraries_from_code_py(code_py, mod_names):
"""
Parse the given code.py file and return the imported libraries
:param str code_py: Full path of the code.py file
:return: sequence of library names
"""
# pylint: disable=broad-except
try:
found_imports = findimports.find_imports(code_py)
except Exception as ex: # broad exception because anything could go wrong
logger.exception(ex)
click.secho('Unable to read the auto file: "{}"'.format(str(ex)), fg="red")
sys.exit(2)
# pylint: enable=broad-except
imports = [info.name.split(".", 1)[0] for info in found_imports]
return [r for r in imports if r in mod_names]
def get_device_path(host, password, path):
"""
:param host Hostname or IP address.
:param password REST API password.
:param path File system path.
:return device URL or None if the device cannot be found.
"""
if path:
device_path = path
elif host:
# pylint: enable=no-member
device_path = f"http://:{password}@" + host
else:
device_path = find_device()
return device_path

657
circup/commands.py Normal file
View file

@ -0,0 +1,657 @@
# SPDX-FileCopyrightText: 2019 Nicholas Tollervey, 2024 Tim Cocks, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
# ----------- CLI command definitions ----------- #
The following functions have IO side effects (for instance they emit to
stdout). Ergo, these are not checked with unit tests. Most of the
functionality they provide is provided by the functions from util_functions.py,
and the respective Backends which *are* tested. Most of the logic of the following
functions is to prepare things for presentation to / interaction with the user.
"""
import os
import time
import sys
import re
import logging
import update_checker
from semver import VersionInfo
import click
import requests
from circup.backends import WebBackend, DiskBackend
from circup.logging import logger, log_formatter, LOGFILE
from circup.shared import BOARDLESS_COMMANDS, get_latest_release_from_url
from circup.bundle import Bundle
from circup.command_utils import (
get_device_path,
get_circup_version,
find_modules,
get_bundles_list,
completion_for_install,
get_bundle_versions,
libraries_from_requirements,
libraries_from_code_py,
get_dependencies,
get_bundles_local_dict,
save_local_bundles,
get_bundles_dict,
)
@click.group()
@click.option(
"--verbose", is_flag=True, help="Comprehensive logging is sent to stdout."
)
@click.option(
"--path",
type=click.Path(exists=True, file_okay=False),
help="Path to CircuitPython directory. Overrides automatic path detection.",
)
@click.option(
"--host",
help="Hostname or IP address of a device. Overrides automatic path detection.",
)
@click.option(
"--password", help="Password to use for authentication when --host is used."
)
@click.option(
"--timeout",
default=30,
help="Specify the timeout in seconds for any network operations.",
)
@click.option(
"--board-id",
default=None,
help="Manual Board ID of the CircuitPython device. If provided in combination "
"with --cpy-version, it overrides the detected board ID.",
)
@click.option(
"--cpy-version",
default=None,
help="Manual CircuitPython version. If provided in combination "
"with --board-id, it overrides the detected CPy version.",
)
@click.version_option(
prog_name="CircUp",
message="%(prog)s, A CircuitPython module updater. Version %(version)s",
)
@click.pass_context
def main( # pylint: disable=too-many-locals
ctx, verbose, path, host, password, timeout, board_id, cpy_version
): # pragma: no cover
"""
A tool to manage and update libraries on a CircuitPython device.
"""
# pylint: disable=too-many-arguments,too-many-branches,too-many-statements,too-many-locals
ctx.ensure_object(dict)
ctx.obj["TIMEOUT"] = timeout
device_path = get_device_path(host, password, path)
using_webworkflow = "host" in ctx.params.keys() and ctx.params["host"] is not None
if using_webworkflow:
if host == "circuitpython.local":
click.echo("Checking versions.json on circuitpython.local to find hostname")
versions_resp = requests.get(
"http://circuitpython.local/cp/version.json", timeout=timeout
)
host = f'{versions_resp.json()["hostname"]}.local'
click.echo(f"Using hostname: {host}")
device_path = device_path.replace("circuitpython.local", host)
try:
ctx.obj["backend"] = WebBackend(
host=host, password=password, logger=logger, timeout=timeout
)
except ValueError as e:
click.secho(e, fg="red")
time.sleep(0.3)
sys.exit(1)
except RuntimeError as e:
click.secho(e, fg="red")
sys.exit(1)
else:
try:
ctx.obj["backend"] = DiskBackend(device_path, logger)
except ValueError as e:
print(e)
if verbose:
# Configure additional logging to stdout.
ctx.obj["verbose"] = True
verbose_handler = logging.StreamHandler(sys.stdout)
verbose_handler.setLevel(logging.INFO)
verbose_handler.setFormatter(log_formatter)
logger.addHandler(verbose_handler)
click.echo("Logging to {}\n".format(LOGFILE))
else:
ctx.obj["verbose"] = False
logger.info("### Started Circup ###")
# If a newer version of circup is available, print a message.
logger.info("Checking for a newer version of circup")
version = get_circup_version()
if version:
update_checker.update_check("circup", version)
# stop early if the command is boardless
if ctx.invoked_subcommand in BOARDLESS_COMMANDS or "--help" in sys.argv:
return
ctx.obj["DEVICE_PATH"] = device_path
latest_version = get_latest_release_from_url(
"https://github.com/adafruit/circuitpython/releases/latest", logger
)
if device_path is None or not ctx.obj["backend"].is_device_present():
click.secho("Could not find a connected CircuitPython device.", fg="red")
sys.exit(1)
else:
cpy_version, board_id = (
ctx.obj["backend"].get_circuitpython_version()
if board_id is None or cpy_version is None
else (cpy_version, board_id)
)
click.echo(
"Found device at {}, running CircuitPython {}.".format(
device_path, cpy_version
)
)
try:
if VersionInfo.parse(cpy_version) < VersionInfo.parse(latest_version):
click.secho(
"A newer version of CircuitPython ({}) is available.".format(
latest_version
),
fg="green",
)
if board_id:
url_download = f"https://circuitpython.org/board/{board_id}"
else:
url_download = "https://circuitpython.org/downloads"
click.secho("Get it here: {}".format(url_download), fg="green")
except ValueError as ex:
logger.warning("CircuitPython has incorrect semver value.")
logger.warning(ex)
@main.command()
@click.option("-r", "--requirement", is_flag=True)
@click.pass_context
def freeze(ctx, requirement): # pragma: no cover
"""
Output details of all the modules found on the connected CIRCUITPYTHON
device. Option -r saves output to requirements.txt file
"""
logger.info("Freeze")
modules = find_modules(ctx.obj["backend"], get_bundles_list())
if modules:
output = []
for module in modules:
output.append("{}=={}".format(module.name, module.device_version))
for module in output:
click.echo(module)
logger.info(module)
if requirement:
cwd = os.path.abspath(os.getcwd())
for i, module in enumerate(output):
output[i] += "\n"
overwrite = None
if os.path.exists(os.path.join(cwd, "requirements.txt")):
overwrite = click.confirm(
click.style(
"\nrequirements.txt file already exists in this location.\n"
"Do you want to overwrite it?",
fg="red",
),
default=False,
)
else:
overwrite = True
if overwrite:
with open(
cwd + "/" + "requirements.txt", "w", newline="\n", encoding="utf-8"
) as file:
file.truncate(0)
file.writelines(output)
else:
click.echo("No modules found on the device.")
@main.command("list")
@click.pass_context
def list_cli(ctx): # pragma: no cover
"""
Lists all out of date modules found on the connected CIRCUITPYTHON device.
"""
logger.info("List")
# Grab out of date modules.
data = [("Module", "Version", "Latest", "Update Reason")]
modules = [
m.row
for m in find_modules(ctx.obj["backend"], get_bundles_list())
if m.outofdate
]
if modules:
data += modules
# Nice tabular display.
col_width = [0, 0, 0, 0]
for row in data:
for i, word in enumerate(row):
col_width[i] = max(len(word) + 2, col_width[i])
dashes = tuple(("-" * (width - 1) for width in col_width))
data.insert(1, dashes)
click.echo(
"The following modules are out of date or probably need an update.\n"
"Major Updates may include breaking changes. Review before updating.\n"
"MPY Format changes from Circuitpython 8 to 9 require an update.\n"
)
for row in data:
output = ""
for index, cell in enumerate(row):
output += cell.ljust(col_width[index])
if "--verbose" not in sys.argv:
click.echo(output)
logger.info(output)
else:
click.echo("All modules found on the device are up to date.")
# pylint: disable=too-many-arguments,too-many-locals
@main.command()
@click.argument(
"modules", required=False, nargs=-1, shell_complete=completion_for_install
)
@click.option(
"pyext",
"--py",
is_flag=True,
help="Install the .py version of the module(s) instead of the mpy version.",
)
@click.option(
"-r",
"--requirement",
type=click.Path(exists=True, dir_okay=False),
help="specify a text file to install all modules listed in the text file."
" Typically requirements.txt.",
)
@click.option(
"--auto", "-a", is_flag=True, help="Install the modules imported in code.py."
)
@click.option(
"--auto-file",
default=None,
help="Specify the name of a file on the board to read for auto install."
" Also accepts an absolute path or a local ./ path.",
)
@click.pass_context
def install(ctx, modules, pyext, requirement, auto, auto_file): # pragma: no cover
"""
Install a named module(s) onto the device. Multiple modules
can be installed at once by providing more than one module name, each
separated by a space.
"""
# TODO: Ensure there's enough space on the device
available_modules = get_bundle_versions(get_bundles_list())
mod_names = {}
for module, metadata in available_modules.items():
mod_names[module.replace(".py", "").lower()] = metadata
if requirement:
with open(requirement, "r", encoding="utf-8") as rfile:
requirements_txt = rfile.read()
requested_installs = libraries_from_requirements(requirements_txt)
elif auto or auto_file:
if auto_file is None:
auto_file = "code.py"
print(f"Auto file: {auto_file}")
# pass a local file with "./" or "../"
is_relative = not isinstance(ctx.obj["backend"], WebBackend) or auto_file.split(
os.sep
)[0] in [os.path.curdir, os.path.pardir]
if not os.path.isabs(auto_file) and not is_relative:
auto_file = ctx.obj["backend"].get_file_path(auto_file or "code.py")
auto_file_path = ctx.obj["backend"].get_auto_file_path(auto_file)
print(f"Auto file path: {auto_file_path}")
if not os.path.isfile(auto_file_path):
# fell through to here when run from random folder on windows - ask backend.
new_auto_file = ctx.obj["backend"].get_file_path(auto_file)
if os.path.isfile(new_auto_file):
auto_file = new_auto_file
auto_file_path = ctx.obj["backend"].get_auto_file_path(auto_file)
print(f"Auto file path: {auto_file_path}")
else:
click.secho(f"Auto file not found: {auto_file}", fg="red")
sys.exit(1)
requested_installs = libraries_from_code_py(auto_file_path, mod_names)
else:
requested_installs = modules
requested_installs = sorted(set(requested_installs))
click.echo(f"Searching for dependencies for: {requested_installs}")
to_install = get_dependencies(requested_installs, mod_names=mod_names)
device_modules = ctx.obj["backend"].get_device_versions()
if to_install is not None:
to_install = sorted(to_install)
click.echo(f"Ready to install: {to_install}\n")
for library in to_install:
ctx.obj["backend"].install_module(
ctx.obj["DEVICE_PATH"], device_modules, library, pyext, mod_names
)
# pylint: enable=too-many-arguments,too-many-locals
@main.command()
@click.argument("match", required=False, nargs=1)
def show(match): # pragma: no cover
"""
Show a list of available modules in the bundle. These are modules which
*could* be installed on the device.
If MATCH is specified only matching modules will be listed.
"""
available_modules = get_bundle_versions(get_bundles_list())
module_names = sorted([m.replace(".py", "") for m in available_modules])
if match is not None:
match = match.lower()
module_names = [m for m in module_names if match in m]
click.echo("\n".join(module_names))
click.echo(
"{} shown of {} packages.".format(len(module_names), len(available_modules))
)
@main.command()
@click.argument("module", nargs=-1)
@click.pass_context
def uninstall(ctx, module): # pragma: no cover
"""
Uninstall a named module(s) from the connected device. Multiple modules
can be uninstalled at once by providing more than one module name, each
separated by a space.
"""
device_path = ctx.obj["DEVICE_PATH"]
print(f"Uninstalling {module} from {device_path}")
for name in module:
device_modules = ctx.obj["backend"].get_device_versions()
name = name.lower()
mod_names = {}
for module_item, metadata in device_modules.items():
mod_names[module_item.replace(".py", "").lower()] = metadata
if name in mod_names:
metadata = mod_names[name]
module_path = metadata["path"]
ctx.obj["backend"].uninstall(device_path, module_path)
click.echo("Uninstalled '{}'.".format(name))
else:
click.echo("Module '{}' not found on device.".format(name))
continue
# pylint: disable=too-many-branches
@main.command(
short_help=(
"Update modules on the device. "
"Use --all to automatically update all modules without Major Version warnings."
)
)
@click.option(
"update_all",
"--all",
is_flag=True,
help="Update all modules without Major Version warnings.",
)
@click.pass_context
# pylint: disable=too-many-locals
def update(ctx, update_all): # pragma: no cover
"""
Checks for out-of-date modules on the connected CIRCUITPYTHON device, and
prompts the user to confirm updating such modules.
"""
logger.info("Update")
# Grab current modules.
bundles_list = get_bundles_list()
installed_modules = find_modules(ctx.obj["backend"], bundles_list)
modules_to_update = [m for m in installed_modules if m.outofdate]
if not modules_to_update:
click.echo("None of the module[s] found on the device need an update.")
return
# Process out of date modules
updated_modules = []
click.echo("Found {} module[s] needing update.".format(len(modules_to_update)))
if not update_all:
click.echo("Please indicate which module[s] you wish to update:\n")
for module in modules_to_update:
update_flag = update_all
if "--verbose" in sys.argv:
click.echo(
"Device version: {}, Bundle version: {}".format(
module.device_version, module.bundle_version
)
)
if isinstance(module.bundle_version, str) and not VersionInfo.is_valid(
module.bundle_version
):
click.secho(
f"WARNING: Library {module.name} repo has incorrect __version__"
"\n\tmetadata. Circup will assume it needs updating."
"\n\tPlease file an issue in the library repo.",
fg="yellow",
)
if module.repo:
click.secho(f"\t{module.repo}", fg="yellow")
if not update_flag:
if module.bad_format:
click.secho(
f"WARNING: '{module.name}': module corrupted or in an"
" unknown mpy format. Updating is required.",
fg="yellow",
)
update_flag = click.confirm("Do you want to update?")
elif module.mpy_mismatch:
click.secho(
f"WARNING: '{module.name}': mpy format doesn't match the"
" device's Circuitpython version. Updating is required.",
fg="yellow",
)
update_flag = click.confirm("Do you want to update?")
elif module.major_update:
update_flag = click.confirm(
(
"'{}' is a Major Version update and may contain breaking "
"changes. Do you want to update?".format(module.name)
)
)
else:
update_flag = click.confirm("Update '{}'?".format(module.name))
if update_flag:
# pylint: disable=broad-except
try:
ctx.obj["backend"].update(module)
updated_modules.append(module.name)
click.echo("Updated {}".format(module.name))
except Exception as ex:
logger.exception(ex)
click.echo("Something went wrong, {} (check the logs)".format(str(ex)))
# pylint: enable=broad-except
if not updated_modules:
return
# We updated modules, look to see if any requirements are missing
click.echo(
"Checking {} updated module[s] for missing requirements.".format(
len(updated_modules)
)
)
available_modules = get_bundle_versions(bundles_list)
mod_names = {}
for module, metadata in available_modules.items():
mod_names[module.replace(".py", "").lower()] = metadata
missing_modules = get_dependencies(updated_modules, mod_names=mod_names)
device_modules = ctx.obj["backend"].get_device_versions()
# Process newly needed modules
if missing_modules is not None:
installed_module_names = [m.name for m in installed_modules]
missing_modules = set(missing_modules) - set(installed_module_names)
missing_modules = sorted(list(missing_modules))
click.echo(f"Ready to install: {missing_modules}\n")
for library in missing_modules:
ctx.obj["backend"].install_module(
ctx.obj["DEVICE_PATH"], device_modules, library, False, mod_names
)
# pylint: enable=too-many-branches
@main.command("bundle-show")
@click.option("--modules", is_flag=True, help="List all the modules per bundle.")
def bundle_show(modules):
"""
Show the list of bundles, default and local, with URL, current version
and latest version retrieved from the web.
"""
local_bundles = get_bundles_local_dict().values()
bundles = get_bundles_list()
available_modules = get_bundle_versions(bundles)
for bundle in bundles:
if bundle.key in local_bundles:
click.secho(bundle.key, fg="yellow")
else:
click.secho(bundle.key, fg="green")
click.echo(" " + bundle.url)
click.echo(" version = " + bundle.current_tag)
if modules:
click.echo("Modules:")
for name, mod in sorted(available_modules.items()):
if mod["bundle"] == bundle:
click.echo(f" {name} ({mod.get('__version__', '-')})")
@main.command("bundle-add")
@click.argument("bundle", nargs=-1)
@click.pass_context
def bundle_add(ctx, bundle):
"""
Add bundles to the local bundles list, by "user/repo" github string.
A series of tests to validate that the bundle exists and at least looks
like a bundle are done before validating it. There might still be errors
when the bundle is downloaded for the first time.
"""
if len(bundle) == 0:
click.secho(
"Must pass bundle argument, expecting github URL or `user/repository` string.",
fg="red",
)
return
bundles_dict = get_bundles_local_dict()
modified = False
for bundle_repo in bundle:
# cleanup in case seombody pastes the URL to the repo/releases
bundle_repo = re.sub(
r"https?://github.com/([^/]+/[^/]+)(/.*)?", r"\1", bundle_repo
)
if bundle_repo in bundles_dict.values():
click.secho("Bundle already in list.", fg="yellow")
click.secho(" " + bundle_repo, fg="yellow")
continue
try:
bundle_added = Bundle(bundle_repo)
except ValueError:
click.secho(
"Bundle string invalid, expecting github URL or `user/repository` string.",
fg="red",
)
click.secho(" " + bundle_repo, fg="red")
continue
result = requests.get(
"https://github.com/" + bundle_repo, timeout=ctx.obj["TIMEOUT"]
)
# pylint: disable=no-member
if result.status_code == requests.codes.NOT_FOUND:
click.secho("Bundle invalid, the repository doesn't exist (404).", fg="red")
click.secho(" " + bundle_repo, fg="red")
continue
# pylint: enable=no-member
if not bundle_added.validate():
click.secho(
"Bundle invalid, is the repository a valid circup bundle ?", fg="red"
)
click.secho(" " + bundle_repo, fg="red")
continue
# note: use bun as the dictionary key for uniqueness
bundles_dict[bundle_repo] = bundle_repo
modified = True
click.echo("Added " + bundle_repo)
click.echo(" " + bundle_added.url)
if modified:
# save the bundles list
save_local_bundles(bundles_dict)
# update and get the new bundles for the first time
get_bundle_versions(get_bundles_list())
@main.command("bundle-remove")
@click.argument("bundle", nargs=-1)
@click.option("--reset", is_flag=True, help="Remove all local bundles.")
def bundle_remove(bundle, reset):
"""
Remove one or more bundles from the local bundles list.
"""
if reset:
save_local_bundles({})
return
if len(bundle) == 0:
click.secho(
"Must pass bundle argument or --reset, expecting github URL or "
"`user/repository` string. Run circup bundle-show to see a list of bundles.",
fg="red",
)
return
bundle_config = list(get_bundles_dict().values())
bundles_local_dict = get_bundles_local_dict()
modified = False
for bun in bundle:
# cleanup in case seombody pastes the URL to the repo/releases
bun = re.sub(r"https?://github.com/([^/]+/[^/]+)(/.*)?", r"\1", bun)
found = False
for name, repo in list(bundles_local_dict.items()):
if bun in (name, repo):
found = True
click.secho(f"Bundle {repo}")
do_it = click.confirm("Do you want to remove that bundle ?")
if do_it:
click.secho("Removing the bundle from the local list", fg="yellow")
click.secho(f" {bun}", fg="yellow")
modified = True
del bundles_local_dict[name]
if not found:
if bun in bundle_config:
click.secho("Cannot remove built-in module:" "\n " + bun, fg="red")
else:
click.secho(
"Bundle not found in the local list, nothing removed:"
"\n " + bun,
fg="red",
)
if modified:
save_local_bundles(bundles_local_dict)

33
circup/logging.py Normal file
View file

@ -0,0 +1,33 @@
# SPDX-FileCopyrightText: 2019 Nicholas Tollervey, 2024 Tim Cocks, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
Logging utilities and configuration used by circup
"""
import os
import logging
from logging.handlers import RotatingFileHandler
import appdirs
from circup.shared import DATA_DIR
#: The directory containing the utility's log file.
LOG_DIR = appdirs.user_log_dir(appname="circup", appauthor="adafruit")
#: The location of the log file for the utility.
LOGFILE = os.path.join(LOG_DIR, "circup.log")
# Ensure DATA_DIR / LOG_DIR related directories and files exist.
if not os.path.exists(DATA_DIR): # pragma: no cover
os.makedirs(DATA_DIR)
if not os.path.exists(LOG_DIR): # pragma: no cover
os.makedirs(LOG_DIR)
# Setup logging.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logfile_handler = RotatingFileHandler(LOGFILE, maxBytes=10_000_000, backupCount=0)
log_formatter = logging.Formatter(
"%(asctime)s %(levelname)s: %(message)s", datefmt="%m/%d/%Y %H:%M:%S"
)
logfile_handler.setFormatter(log_formatter)
logger.addHandler(logfile_handler)

209
circup/module.py Normal file
View file

@ -0,0 +1,209 @@
# SPDX-FileCopyrightText: 2019 Nicholas Tollervey, 2024 Tim Cocks, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
Class that represents a specific CircuitPython module on a device or in a Bundle.
"""
import os
from urllib.parse import urljoin, urlparse
from semver import VersionInfo
from circup.shared import BAD_FILE_FORMAT
from circup.backends import WebBackend
from circup.logging import logger
class Module:
"""
Represents a CircuitPython module.
"""
# pylint: disable=too-many-arguments
def __init__(
self,
name,
backend,
repo,
device_version,
bundle_version,
mpy,
bundle,
compatibility,
):
"""
The ``self.file`` and ``self.name`` attributes are constructed from
the ``path`` value. If the path is to a directory based module, the
resulting self.file value will be None, and the name will be the
basename of the directory path.
:param str name: The file name of the module.
:param Backend backend: The backend that the module is on.
:param str repo: The URL of the Git repository for this module.
:param str device_version: The semver value for the version on device.
:param str bundle_version: The semver value for the version in bundle.
:param bool mpy: Flag to indicate if the module is byte-code compiled.
:param Bundle bundle: Bundle object where the module is located.
:param (str,str) compatibility: Min and max versions of CP compatible with the mpy.
"""
self.name = name
self.backend = backend
self.path = (
urljoin(backend.library_path, name, allow_fragments=False)
if isinstance(backend, WebBackend)
else os.path.join(backend.library_path, name)
)
url = urlparse(self.path, allow_fragments=False)
if (
url.path.endswith("/")
if isinstance(backend, WebBackend)
else self.path.endswith(os.sep)
):
self.file = None
self.name = self.path.split(
"/" if isinstance(backend, WebBackend) else os.sep
)[-2]
else:
self.file = os.path.basename(url.path)
self.name = (
os.path.basename(url.path).replace(".py", "").replace(".mpy", "")
)
self.repo = repo
self.device_version = device_version
self.bundle_version = bundle_version
self.mpy = mpy
self.min_version = compatibility[0]
self.max_version = compatibility[1]
# Figure out the bundle path.
self.bundle_path = None
if self.mpy:
# Byte compiled, now check CircuitPython version.
major_version = self.backend.get_circuitpython_version()[0].split(".")[0]
bundle_platform = "{}mpy".format(major_version)
else:
# Regular Python
bundle_platform = "py"
# module path in the bundle
search_path = bundle.lib_dir(bundle_platform)
if self.file:
self.bundle_path = os.path.join(search_path, self.file)
else:
self.bundle_path = os.path.join(search_path, self.name)
logger.info(self)
# pylint: enable=too-many-arguments
@property
def outofdate(self):
"""
Returns a boolean to indicate if this module is out of date.
Treat mismatched MPY versions as out of date.
:return: Truthy indication if the module is out of date.
"""
if self.mpy_mismatch:
return True
if self.device_version and self.bundle_version:
try:
return VersionInfo.parse(self.device_version) < VersionInfo.parse(
self.bundle_version
)
except ValueError as ex:
logger.warning("Module '%s' has incorrect semver value.", self.name)
logger.warning(ex)
return True # Assume out of date to try to update.
@property
def bad_format(self):
"""A boolean indicating that the mpy file format could not be identified"""
return self.mpy and self.device_version == BAD_FILE_FORMAT
@property
def mpy_mismatch(self):
"""
Returns a boolean to indicate if this module's MPY version is compatible
with the board's current version of Circuitpython. A min or max version
that evals to False means no limit.
:return: Boolean indicating if the MPY versions don't match.
"""
if not self.mpy:
return False
try:
cpv = VersionInfo.parse(self.backend.get_circuitpython_version()[0])
except ValueError as ex:
logger.warning("CircuitPython has incorrect semver value.")
logger.warning(ex)
try:
if self.min_version and cpv < VersionInfo.parse(self.min_version):
return True # CP version too old
if self.max_version and cpv >= VersionInfo.parse(self.max_version):
return True # MPY version too old
except (TypeError, ValueError) as ex:
logger.warning(
"Module '%s' has incorrect MPY compatibility information.", self.name
)
logger.warning(ex)
return False
@property
def major_update(self):
"""
Returns a boolean to indicate if this is a major version update.
:return: Boolean indicating if this is a major version upgrade
"""
try:
if (
VersionInfo.parse(self.device_version).major
== VersionInfo.parse(self.bundle_version).major
):
return False
except (TypeError, ValueError) as ex:
logger.warning("Module '%s' has incorrect semver value.", self.name)
logger.warning(ex)
return True # Assume Major Version udpate.
@property
def row(self):
"""
Returns a tuple of items to display in a table row to show the module's
name, local version and remote version, and reason to update.
:return: A tuple containing the module's name, version on the connected
device, version in the latest bundle and reason to update.
"""
loc = self.device_version if self.device_version else "unknown"
rem = self.bundle_version if self.bundle_version else "unknown"
if self.mpy_mismatch:
update_reason = "MPY Format"
elif self.major_update:
update_reason = "Major Version"
else:
update_reason = "Minor Version"
return (self.name, loc, rem, update_reason)
def __repr__(self):
"""
Helps with log files.
:return: A repr of a dictionary containing the module's metadata.
"""
return repr(
{
"path": self.path,
"file": self.file,
"name": self.name,
"repo": self.repo,
"device_version": self.device_version,
"bundle_version": self.bundle_version,
"bundle_path": self.bundle_path,
"mpy": self.mpy,
"min_version": self.min_version,
"max_version": self.max_version,
}
)

View file

@ -9,8 +9,10 @@ and Backend class functions.
import glob
import os
import re
import json
import appdirs
import pkg_resources
import requests
#: Version identifier for a bad MPY file format
BAD_FILE_FORMAT = "Invalid"
@ -18,6 +20,37 @@ BAD_FILE_FORMAT = "Invalid"
#: The location of data files used by circup (following OS conventions).
DATA_DIR = appdirs.user_data_dir(appname="circup", appauthor="adafruit")
#: Module formats list (and the other form used in github files)
PLATFORMS = {"py": "py", "8mpy": "8.x-mpy", "9mpy": "9.x-mpy"}
#: Timeout for requests calls like get()
REQUESTS_TIMEOUT = 30
#: The path to the JSON file containing the metadata about the bundles.
BUNDLE_CONFIG_FILE = pkg_resources.resource_filename(
"circup", "config/bundle_config.json"
)
#: Overwrite the bundles list with this file (only done manually)
BUNDLE_CONFIG_OVERWRITE = os.path.join(DATA_DIR, "bundle_config.json")
#: The path to the JSON file containing the local list of bundles.
BUNDLE_CONFIG_LOCAL = os.path.join(DATA_DIR, "bundle_config_local.json")
#: The path to the JSON file containing the metadata about the bundles.
BUNDLE_DATA = os.path.join(DATA_DIR, "circup.json")
#: The libraries (and blank lines) which don't go on devices
NOT_MCU_LIBRARIES = [
"",
"adafruit-blinka",
"adafruit-blinka-bleio",
"adafruit-blinka-displayio",
"adafruit-circuitpython-typing",
"circuitpython_typing",
"pyserial",
]
#: Commands that do not require an attached board
BOARDLESS_COMMANDS = ["show", "bundle-add", "bundle-remove", "bundle-show"]
def _get_modules_file(path, logger):
"""
@ -142,3 +175,44 @@ def extract_metadata(path, logger):
# not a valid MPY file
result["__version__"] = BAD_FILE_FORMAT
return result
def tags_data_load(logger):
"""
Load the list of the version tags of the bundles on disk.
:return: a dict() of tags indexed by Bundle identifiers/keys.
"""
tags_data = None
try:
with open(BUNDLE_DATA, encoding="utf-8") as data:
try:
tags_data = json.load(data)
except json.decoder.JSONDecodeError as ex:
# Sometimes (why?) the JSON file becomes corrupt. In which case
# log it and carry on as if setting up for first time.
logger.error("Could not parse %s", BUNDLE_DATA)
logger.exception(ex)
except FileNotFoundError:
pass
if not isinstance(tags_data, dict):
tags_data = {}
return tags_data
def get_latest_release_from_url(url, logger):
"""
Find the tag name of the latest release by using HTTP HEAD and decoding the redirect.
:param str url: URL to the latest release page on a git repository.
:return: The most recent tag value for the release.
"""
logger.info("Requesting redirect information: %s", url)
response = requests.head(url, timeout=REQUESTS_TIMEOUT)
responseurl = response.url
if response.is_redirect:
responseurl = response.headers["Location"]
tag = responseurl.rsplit("/", 1)[-1]
logger.info("Tag: '%s'", tag)
return tag

View file

@ -64,7 +64,7 @@ release = "1.0"
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
language = None
language = "en"
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.

View file

@ -0,0 +1,4 @@
pytest
pytest-cov
pytest-faulthandler
pytest-random-order

View file

@ -0,0 +1,3 @@
SPDX-FileCopyrightText: 2024 Autogenerated by 'pip freeze'
SPDX-License-Identifier: MIT

View file

@ -1,58 +1,7 @@
alabaster==0.7.12
appdirs==1.4.3
astroid==2.6.6
atomicwrites==1.3.0
attrs==19.1.0
Babel==2.9.1
black==19.3b0
bleach==3.3.0
certifi==2023.7.22
chardet==3.0.4
charset-normalizer==2.0.4
click==8.0.1
coverage==4.5.4
docutils==0.15.2
findimports==2.1.0
idna==2.8
imagesize==1.1.0
importlib-metadata==4.12.0
isort==5.9.3
Jinja2==2.11.3
lazy-object-proxy==1.6.0
MarkupSafe==1.1.1
mccabe==0.6.1
more-itertools==7.2.0
packaging==19.1
pkginfo==1.5.0.1
pluggy==0.13.1
py==1.10.0
Pygments==2.15.0
pylint==2.9.6
pyparsing==2.4.2
pytest==5.1.2
pytest-cov==2.7.1
pytest-faulthandler==2.0.1
pytest-random-order==1.0.4
pytz==2019.2
readme-renderer==24.0
requests==2.31.0
requests-toolbelt==0.9.1
semver==3.0.1
six==1.12.0
snowballstemmer==1.9.0
Sphinx==2.2.0
sphinxcontrib-applehelp==1.0.1
sphinxcontrib-devhelp==1.0.1
sphinxcontrib-htmlhelp==1.0.2
sphinxcontrib-jsmath==1.0.1
sphinxcontrib-qthelp==1.0.2
sphinxcontrib-serializinghtml==1.1.3
toml==0.10.0
tqdm==4.35.0
twine==1.13.0
update-checker==0.18.0
urllib3==1.26.18
wcwidth==0.1.7
webencodings==0.5.1
wrapt==1.12.1
zipp==0.6.0
appdirs
Click
findimports
requests
semver
toml
update_checker

View file

@ -1,3 +1,3 @@
Adafruit CircuitPython 4.1.0 on 2019-08-02; Adafruit CircuitPlayground Express with samd21g18
Adafruit CircuitPython 8.1.0 on 2019-08-02; Adafruit CircuitPlayground Express with samd21g18
Board ID:this_is_a_board
UID:AAAABBBBCCCC

View file

@ -37,6 +37,15 @@ import requests
import circup
from circup import DiskBackend
from circup.command_utils import (
find_device,
ensure_latest_bundle,
get_bundle,
get_bundles_dict,
)
from circup.shared import PLATFORMS
from circup.module import Module
from circup.logging import logger
TEST_BUNDLE_CONFIG_JSON = "tests/test_bundle_config.json"
with open(TEST_BUNDLE_CONFIG_JSON, "rb") as tbc:
@ -52,21 +61,16 @@ def test_Bundle_init():
"""
Create a Bundle and check all the strings are set as expected.
"""
with mock.patch("circup.logger.info"), mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch(
"circup.tags_data_load", return_value=dict()
), mock.patch(
"circup.DATA_DIR", "DATA_DIR"
):
bundle = circup.Bundle(TEST_BUNDLE_NAME)
bundle = circup.Bundle(TEST_BUNDLE_NAME)
assert repr(bundle) == repr(
{
"key": TEST_BUNDLE_NAME,
"url": "https://github.com/" + TEST_BUNDLE_NAME,
"urlzip": "adafruit-circuitpython-bundle-{platform}-{tag}.zip",
"dir": "DATA_DIR/adafruit/adafruit-circuitpython-bundle-{platform}",
"zip": "DATA_DIR/adafruit-circuitpython-bundle-{platform}.zip",
"dir": circup.shared.DATA_DIR
+ "/adafruit/adafruit-circuitpython-bundle-{platform}",
"zip": circup.shared.DATA_DIR
+ "/adafruit-circuitpython-bundle-{platform}.zip",
"url_format": "https://github.com/"
+ TEST_BUNDLE_NAME
+ "/releases/download/{tag}/"
@ -82,20 +86,16 @@ def test_Bundle_lib_dir():
Check the return of Bundle.lib_dir with a test tag.
"""
bundle_data = {TEST_BUNDLE_NAME: "TESTTAG"}
with mock.patch("circup.logger.info"), mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch("circup.tags_data_load", return_value=bundle_data), mock.patch(
"circup.DATA_DIR", "DATA_DIR"
):
with mock.patch("circup.bundle.tags_data_load", return_value=bundle_data):
bundle = circup.Bundle(TEST_BUNDLE_NAME)
assert bundle.current_tag == "TESTTAG"
assert bundle.lib_dir("py") == (
"DATA_DIR/"
circup.shared.DATA_DIR + "/"
"adafruit/adafruit-circuitpython-bundle-py/"
"adafruit-circuitpython-bundle-py-TESTTAG/lib"
)
assert bundle.lib_dir("8mpy") == (
"DATA_DIR/"
circup.shared.DATA_DIR + "/"
"adafruit/adafruit-circuitpython-bundle-8mpy/"
"adafruit-circuitpython-bundle-8.x-mpy-TESTTAG/lib"
)
@ -106,15 +106,9 @@ def test_Bundle_latest_tag():
Check the latest tag gets through Bundle.latest_tag.
"""
bundle_data = {TEST_BUNDLE_NAME: "TESTTAG"}
with mock.patch("circup.logger.info"), mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch(
"circup.get_latest_release_from_url", return_value="BESTESTTAG"
), mock.patch(
"circup.tags_data_load", return_value=bundle_data
), mock.patch(
"circup.DATA_DIR", "DATA_DIR"
):
with mock.patch(
"circup.bundle.get_latest_release_from_url", return_value="BESTESTTAG"
), mock.patch("circup.bundle.tags_data_load", return_value=bundle_data):
bundle = circup.Bundle(TEST_BUNDLE_NAME)
assert bundle.latest_tag == "BESTESTTAG"
@ -123,16 +117,18 @@ def test_get_bundles_dict():
"""
Check we are getting the bundles list from BUNDLE_CONFIG_FILE.
"""
with mock.patch("circup.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON), mock.patch(
"circup.BUNDLE_CONFIG_LOCAL", ""
):
bundles_dict = circup.get_bundles_dict()
with mock.patch(
"circup.command_utils.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON
), mock.patch("circup.shared.BUNDLE_CONFIG_LOCAL", ""):
bundles_dict = get_bundles_dict()
assert bundles_dict == TEST_BUNDLE_DATA
with mock.patch("circup.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON), mock.patch(
"circup.BUNDLE_CONFIG_LOCAL", TEST_BUNDLE_CONFIG_LOCAL_JSON
with mock.patch(
"circup.command_utils.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON
), mock.patch(
"circup.command_utils.BUNDLE_CONFIG_LOCAL", TEST_BUNDLE_CONFIG_LOCAL_JSON
):
bundles_dict = circup.get_bundles_dict()
bundles_dict = get_bundles_dict()
expected_dict = {**TEST_BUNDLE_LOCAL_DATA, **TEST_BUNDLE_DATA}
assert bundles_dict == expected_dict
@ -141,16 +137,18 @@ def test_get_bundles_local_dict():
"""
Check we are getting the bundles list from BUNDLE_CONFIG_LOCAL.
"""
with mock.patch("circup.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON), mock.patch(
"circup.BUNDLE_CONFIG_LOCAL", ""
):
bundles_dict = circup.get_bundles_dict()
with mock.patch(
"circup.command_utils.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON
), mock.patch("circup.command_utils.BUNDLE_CONFIG_LOCAL", ""):
bundles_dict = get_bundles_dict()
assert bundles_dict == TEST_BUNDLE_DATA
with mock.patch("circup.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON), mock.patch(
"circup.BUNDLE_CONFIG_LOCAL", TEST_BUNDLE_CONFIG_LOCAL_JSON
with mock.patch(
"circup.command_utils.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON
), mock.patch(
"circup.command_utils.BUNDLE_CONFIG_LOCAL", TEST_BUNDLE_CONFIG_LOCAL_JSON
):
bundles_dict = circup.get_bundles_dict()
bundles_dict = get_bundles_dict()
expected_dict = {**TEST_BUNDLE_LOCAL_DATA, **TEST_BUNDLE_DATA}
assert bundles_dict == expected_dict
@ -159,9 +157,9 @@ def test_get_bundles_list():
"""
Check we are getting the bundles list from BUNDLE_CONFIG_FILE.
"""
with mock.patch("circup.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON), mock.patch(
"circup.BUNDLE_CONFIG_LOCAL", ""
):
with mock.patch(
"circup.command_utils.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON
), mock.patch("circup.command_utils.BUNDLE_CONFIG_LOCAL", ""):
bundles_list = circup.get_bundles_list()
bundle = circup.Bundle(TEST_BUNDLE_NAME)
assert repr(bundles_list) == repr([bundle])
@ -171,14 +169,14 @@ def test_save_local_bundles():
"""
Pretend to save local bundles.
"""
with mock.patch("circup.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON), mock.patch(
"circup.BUNDLE_CONFIG_LOCAL", ""
), mock.patch("circup.os.unlink") as mock_unlink, mock.patch(
"circup.json.dump"
with mock.patch(
"circup.command_utils.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON
), mock.patch("circup.command_utils.BUNDLE_CONFIG_LOCAL", ""), mock.patch(
"circup.os.unlink"
) as mock_unlink, mock.patch(
"circup.command_utils.json.dump"
) as mock_dump, mock.patch(
"circup.json.load", return_value=TEST_BUNDLE_DATA
), mock.patch(
"circup.open", mock.mock_open()
"circup.command_utils.open", mock.mock_open()
) as mock_open:
final_data = {**TEST_BUNDLE_DATA, **TEST_BUNDLE_LOCAL_DATA}
circup.save_local_bundles(final_data)
@ -190,14 +188,18 @@ def test_save_local_bundles_reset():
"""
Pretend to reset the local bundles.
"""
with mock.patch("circup.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON), mock.patch(
"circup.BUNDLE_CONFIG_LOCAL", "test/NOTEXISTS"
), mock.patch("circup.os.path.isfile", return_value=True), mock.patch(
with mock.patch(
"circup.command_utils.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON
), mock.patch(
"circup.command_utils.BUNDLE_CONFIG_LOCAL", "test/NOTEXISTS"
), mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch(
"circup.os.unlink"
) as mock_unlink, mock.patch(
"circup.json.load", return_value=TEST_BUNDLE_DATA
"circup.command_utils.json.load", return_value=TEST_BUNDLE_DATA
), mock.patch(
"circup.open", mock.mock_open()
"circup.command_utils.open", mock.mock_open()
) as mock_open:
circup.save_local_bundles({})
mock_open().write.assert_not_called()
@ -217,13 +219,13 @@ def test_Module_init_file_module():
with mock.patch("circup.logger.info") as mock_logger, mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch(
"circup.Bundle.lib_dir",
), mock.patch(
"circup.bundle.Bundle.lib_dir",
return_value="tests",
):
backend = DiskBackend("mock_device", mock_logger)
bundle = circup.Bundle(TEST_BUNDLE_NAME)
m = circup.Module(
m = Module(
name,
backend,
repo,
@ -250,21 +252,17 @@ def test_Module_init_directory_module():
directory based Python module.
"""
name = "dir_module/"
path = os.path.join("mock_device", "lib", f"{name}", "")
path = os.path.join("tests", "mock_device", "lib", f"{name}", "")
repo = "https://github.com/adafruit/SomeLibrary.git"
device_version = "1.2.3"
bundle_version = "3.2.1"
mpy = True
with mock.patch("circup.logger.info") as mock_logger, mock.patch(
"circup.os.path.isfile", return_value=False
), mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch(
"circup.DATA_DIR", "/tests/DATA_DIR"
), mock.patch(
"circup.Bundle.lib_dir", return_value="tests"
"circup.bundle.Bundle.lib_dir", return_value="tests"
):
backend = DiskBackend("mock_device", mock_logger)
backend = DiskBackend("tests/mock_device", mock_logger)
bundle = circup.Bundle(TEST_BUNDLE_NAME)
m = circup.Module(
m = Module(
name,
backend,
repo,
@ -295,17 +293,11 @@ def test_Module_outofdate():
name = "module.py"
repo = "https://github.com/adafruit/SomeLibrary.git"
with mock.patch("circup.logger.info") as mock_logger:
backend = DiskBackend("mock_device", mock_logger)
m1 = circup.Module(
name, backend, repo, "1.2.3", "3.2.1", False, bundle, (None, None)
)
m2 = circup.Module(
name, backend, repo, "1.2.3", "1.2.3", False, bundle, (None, None)
)
backend = DiskBackend("tests/mock_device", mock_logger)
m1 = Module(name, backend, repo, "1.2.3", "3.2.1", False, bundle, (None, None))
m2 = Module(name, backend, repo, "1.2.3", "1.2.3", False, bundle, (None, None))
# shouldn't happen!
m3 = circup.Module(
name, backend, repo, "3.2.1", "1.2.3", False, bundle, (None, None)
)
m3 = Module(name, backend, repo, "3.2.1", "1.2.3", False, bundle, (None, None))
assert m1.outofdate is True
assert m2.outofdate is False
assert m3.outofdate is False
@ -325,8 +317,8 @@ def test_Module_outofdate_bad_versions():
bundle_version = "3.2.1"
with mock.patch("circup.logger.warning") as mock_logger:
backend = DiskBackend("mock_device", mock_logger)
m = circup.Module(
backend = DiskBackend("tests/mock_device", mock_logger)
m = Module(
name,
backend,
repo,
@ -349,15 +341,11 @@ def test_Module_mpy_mismatch():
"""
name = "module.py"
repo = "https://github.com/adafruit/SomeLibrary.git"
with mock.patch("circup.CPY_VERSION", "8.0.0"), mock.patch(
"circup.logger.warning"
) as mock_logger:
backend = DiskBackend("mock_device", mock_logger)
with mock.patch("circup.logger.warning") as mock_logger:
backend = DiskBackend("tests/mock_device", mock_logger)
bundle = circup.Bundle(TEST_BUNDLE_NAME)
m1 = circup.Module(
name, backend, repo, "1.2.3", "1.2.3", True, bundle, (None, None)
)
m2 = circup.Module(
m1 = Module(name, backend, repo, "1.2.3", "1.2.3", True, bundle, (None, None))
m2 = Module(
name,
backend,
repo,
@ -367,17 +355,23 @@ def test_Module_mpy_mismatch():
bundle,
("7.0.0-alpha.1", "8.99.99"),
)
m3 = circup.Module(
m3 = Module(
name, backend, repo, "1.2.3", "1.2.3", True, bundle, (None, "7.0.0-alpha.1")
)
with mock.patch("circup.CPY_VERSION", "6.2.0"):
with mock.patch(
"circup.backends.DiskBackend.get_circuitpython_version",
return_value=("6.2.0", ""),
):
assert m1.mpy_mismatch is False
assert m1.outofdate is False
assert m2.mpy_mismatch is True
assert m2.outofdate is True
assert m3.mpy_mismatch is False
assert m3.outofdate is False
with mock.patch("circup.CPY_VERSION", "8.0.0"):
with mock.patch(
"circup.backends.DiskBackend.get_circuitpython_version",
return_value=("8.0.0", ""),
):
assert m1.mpy_mismatch is False
assert m1.outofdate is False
assert m2.mpy_mismatch is False
@ -401,7 +395,7 @@ def test_Module_major_update_bad_versions():
with mock.patch("circup.logger.warning") as mock_logger:
backend = DiskBackend("mock_device", mock_logger)
m = circup.Module(
m = Module(
name,
backend,
repo,
@ -424,20 +418,15 @@ def test_Module_row():
name = "module.py"
repo = "https://github.com/adafruit/SomeLibrary.git"
with mock.patch("circup.os.path.isfile", return_value=True), mock.patch(
"circup.CPY_VERSION", "8.0.0"
"circup.backends.DiskBackend.get_circuitpython_version",
return_value=("8.0.0", ""),
), mock.patch("circup.logger.warning") as mock_logger:
backend = DiskBackend("mock_device", mock_logger)
m = circup.Module(
name, backend, repo, "1.2.3", None, False, bundle, (None, None)
)
m = Module(name, backend, repo, "1.2.3", None, False, bundle, (None, None))
assert m.row == ("module", "1.2.3", "unknown", "Major Version")
m = circup.Module(
name, backend, repo, "1.2.3", "1.3.4", False, bundle, (None, None)
)
m = Module(name, backend, repo, "1.2.3", "1.3.4", False, bundle, (None, None))
assert m.row == ("module", "1.2.3", "1.3.4", "Minor Version")
m = circup.Module(
name, backend, repo, "1.2.3", "1.2.3", True, bundle, ("9.0.0", None)
)
m = Module(name, backend, repo, "1.2.3", "1.2.3", True, bundle, ("9.0.0", None))
assert m.row == ("module", "1.2.3", "1.2.3", "MPY Format")
@ -452,10 +441,10 @@ def test_Module_update_dir():
device_version = "1.2.3"
bundle_version = None
with mock.patch("circup.backends.shutil") as mock_shutil, mock.patch(
"circup.os.path.isdir", return_value=True
), mock.patch("circup.logger.warning") as mock_logger:
backend = DiskBackend("mock_device", mock_logger)
m = circup.Module(
"circup.logger.warning"
) as mock_logger:
backend = DiskBackend("tests/mock_device", mock_logger)
m = Module(
name,
backend,
repo,
@ -484,13 +473,9 @@ def test_Module_update_file():
with mock.patch("circup.backends.shutil") as mock_shutil, mock.patch(
"circup.os.remove"
) as mock_remove, mock.patch(
"circup.os.path.isdir", return_value=False
), mock.patch(
"circup.logger.warning"
) as mock_logger:
backend = circup.DiskBackend("mock_device", mock_logger)
m = circup.Module(
) as mock_remove, mock.patch("circup.logger.warning") as mock_logger:
backend = circup.DiskBackend("tests/mock_device", mock_logger)
m = Module(
name,
backend,
repo,
@ -515,13 +500,14 @@ def test_Module_repr():
device_version = "1.2.3"
bundle_version = "3.2.1"
with mock.patch("circup.os.path.isfile", return_value=True), mock.patch(
"circup.CPY_VERSION", "4.1.2"
"circup.backends.DiskBackend.get_circuitpython_version",
return_value=("4.1.2", ""),
), mock.patch("circup.Bundle.lib_dir", return_value="tests"), mock.patch(
"circup.logger.warning"
) as mock_logger:
bundle = circup.Bundle(TEST_BUNDLE_NAME)
backend = circup.DiskBackend("mock_device", mock_logger)
m = circup.Module(
m = Module(
name,
backend,
repo,
@ -555,8 +541,8 @@ def test_find_device_posix_exists():
with open("tests/mount_exists.txt", "rb") as fixture_file:
fixture = fixture_file.read()
with mock.patch("os.name", "posix"):
with mock.patch("circup.check_output", return_value=fixture):
assert circup.find_device() == "/media/ntoll/CIRCUITPY"
with mock.patch("circup.command_utils.check_output", return_value=fixture):
assert find_device() == "/media/ntoll/CIRCUITPY"
def test_find_device_posix_no_mount_command():
@ -568,8 +554,10 @@ def test_find_device_posix_no_mount_command():
with open("tests/mount_exists.txt", "rb") as fixture_file:
fixture = fixture_file.read()
mock_check = mock.MagicMock(side_effect=[FileNotFoundError, fixture])
with mock.patch("os.name", "posix"), mock.patch("circup.check_output", mock_check):
assert circup.find_device() == "/media/ntoll/CIRCUITPY"
with mock.patch("os.name", "posix"), mock.patch(
"circup.command_utils.check_output", mock_check
):
assert find_device() == "/media/ntoll/CIRCUITPY"
assert mock_check.call_count == 2
assert mock_check.call_args_list[0][0][0] == "mount"
assert mock_check.call_args_list[1][0][0] == "/sbin/mount"
@ -583,9 +571,9 @@ def test_find_device_posix_missing():
with open("tests/mount_missing.txt", "rb") as fixture_file:
fixture = fixture_file.read()
with mock.patch("os.name", "posix"), mock.patch(
"circup.check_output", return_value=fixture
"circup.command_utils.check_output", return_value=fixture
):
assert circup.find_device() is None
assert find_device() is None
def test_find_device_nt_exists():
@ -602,7 +590,7 @@ def test_find_device_nt_exists():
"os.path.exists", return_value=True
), mock.patch("ctypes.create_unicode_buffer", return_value=fake_buffer):
ctypes.windll = mock_windll
assert circup.find_device() == "A:\\"
assert find_device() == "A:\\"
def test_find_device_nt_missing():
@ -619,7 +607,7 @@ def test_find_device_nt_missing():
"os.path.exists", return_value=True
), mock.patch("ctypes.create_unicode_buffer", return_value=fake_buffer):
ctypes.windll = mock_windll
assert circup.find_device() is None
assert find_device() is None
def test_find_device_unknown_os():
@ -628,7 +616,7 @@ def test_find_device_unknown_os():
"""
with mock.patch("os.name", "foo"):
with pytest.raises(NotImplementedError) as ex:
circup.find_device()
find_device()
assert ex.value.args[0] == 'OS "foo" not supported.'
@ -643,8 +631,8 @@ def test_get_latest_release_from_url():
"/Adafruit_CircuitPython_Bundle/releases/tag/20190903"
}
expected_url = "https://github.com/" + TEST_BUNDLE_NAME + "/releases/latest"
with mock.patch("circup.requests.head", return_value=response) as mock_get:
result = circup.get_latest_release_from_url(expected_url)
with mock.patch("circup.shared.requests.head", return_value=response) as mock_get:
result = circup.get_latest_release_from_url(expected_url, logger)
assert result == "20190903"
mock_get.assert_called_once_with(expected_url, timeout=mock.ANY)
@ -710,9 +698,7 @@ def test_find_modules():
with mock.patch(
"circup.DiskBackend.get_device_versions", return_value=device_modules
), mock.patch(
"circup.get_bundle_versions", return_value=bundle_modules
), mock.patch(
"circup.os.path.isfile", return_value=True
"circup.command_utils.get_bundle_versions", return_value=bundle_modules
), mock.patch(
"circup.logger.warning"
) as mock_logger:
@ -738,7 +724,7 @@ def test_find_modules_goes_bang():
"""
with mock.patch(
"circup.DiskBackend.get_device_versions", side_effect=Exception("BANG!")
), mock.patch("circup.click") as mock_click, mock.patch(
), mock.patch("circup.command_utils.click") as mock_click, mock.patch(
"circup.sys.exit"
) as mock_exit, mock.patch(
"circup.logger.warning"
@ -756,14 +742,19 @@ def test_get_bundle_versions():
Ensure get_modules is called with the path for the library bundle.
Ensure ensure_latest_bundle is called even if lib_dir exists.
"""
with mock.patch("circup.ensure_latest_bundle") as mock_elb, mock.patch(
"circup._get_modules_file", return_value={"ok": {"name": "ok"}}
) as mock_gm, mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch(
"circup.Bundle.lib_dir", return_value="foo/bar/lib"
with mock.patch(
"circup.command_utils.ensure_latest_bundle"
) as mock_elb, mock.patch(
"circup.command_utils._get_modules_file", return_value={"ok": {"name": "ok"}}
) as mock_gm, mock.patch(
"circup.backends.DiskBackend.get_circuitpython_version",
return_value=("4.1.2", ""),
), mock.patch(
"circup.bundle.Bundle.lib_dir", return_value="foo/bar/lib"
), mock.patch(
"circup.os.path.isdir", return_value=True
), mock.patch(
"circup.logger"
"circup.command_utils.logger"
) as mock_logger:
bundle = circup.Bundle(TEST_BUNDLE_NAME)
bundles_list = [bundle]
@ -779,12 +770,17 @@ def test_get_bundle_versions_avoid_download():
When avoid_download is True and lib_dir exists, don't ensure_latest_bundle.
Testing both cases: lib_dir exists and lib_dir doesn't exists.
"""
with mock.patch("circup.ensure_latest_bundle") as mock_elb, mock.patch(
"circup._get_modules_file", return_value={"ok": {"name": "ok"}}
) as mock_gm, mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch(
with mock.patch(
"circup.command_utils.ensure_latest_bundle"
) as mock_elb, mock.patch(
"circup.command_utils._get_modules_file", return_value={"ok": {"name": "ok"}}
) as mock_gm, mock.patch(
"circup.backends.DiskBackend.get_circuitpython_version",
return_value=("4.1.2", ""),
), mock.patch(
"circup.Bundle.lib_dir", return_value="foo/bar/lib"
), mock.patch(
"circup.logger"
"circup.command_utils.logger"
) as mock_logger:
bundle = circup.Bundle(TEST_BUNDLE_NAME)
bundles_list = [bundle]
@ -810,7 +806,7 @@ def test_get_circuitpython_version():
with mock.patch("circup.logger.warning") as mock_logger:
backend = DiskBackend("tests/mock_device", mock_logger)
assert backend.get_circuitpython_version() == (
"4.1.0",
"8.1.0",
"this_is_a_board",
)
@ -822,9 +818,9 @@ def test_get_device_versions():
with mock.patch(
"circup.DiskBackend.get_modules", return_value="ok"
) as mock_gm, mock.patch("circup.logger.warning") as mock_logger:
backend = circup.DiskBackend("mock_device", mock_logger)
backend = circup.DiskBackend("tests/mock_device", mock_logger)
assert backend.get_device_versions() == "ok"
mock_gm.assert_called_once_with(os.path.join("mock_device", "lib"))
mock_gm.assert_called_once_with(os.path.join("tests", "mock_device", "lib"))
def test_get_modules_empty_path():
@ -847,7 +843,7 @@ def test_get_modules_that_are_files():
os.path.join("tests", "local_module.py"),
os.path.join("tests", ".hidden_module.py"),
]
with mock.patch("circup.glob.glob", side_effect=[mods, [], []]), mock.patch(
with mock.patch("circup.shared.glob.glob", side_effect=[mods, [], []]), mock.patch(
"circup.logger.warning"
) as mock_logger:
backend = circup.DiskBackend("mock_device", mock_logger)
@ -874,7 +870,7 @@ def test_get_modules_that_are_directories():
]
mod_files = ["tests/dir_module/my_module.py", "tests/dir_module/__init__.py"]
with mock.patch(
"circup.glob.glob", side_effect=[[], [], mods, mod_files, []]
"circup.shared.glob.glob", side_effect=[[], [], mods, mod_files, []]
), mock.patch("circup.logger.warning") as mock_logger:
backend = circup.DiskBackend("mock_device", mock_logger)
result = backend.get_modules(path)
@ -895,7 +891,7 @@ def test_get_modules_that_are_directories_with_no_metadata():
mods = [os.path.join("tests", "bad_module", "")]
mod_files = ["tests/bad_module/my_module.py", "tests/bad_module/__init__.py"]
with mock.patch(
"circup.glob.glob", side_effect=[[], [], mods, mod_files, []]
"circup.shared.glob.glob", side_effect=[[], [], mods, mod_files, []]
), mock.patch("circup.logger.warning") as mock_logger:
backend = circup.DiskBackend("mock_device", mock_logger)
result = backend.get_modules(path)
@ -911,15 +907,15 @@ def test_ensure_latest_bundle_no_bundle_data():
If there's no BUNDLE_DATA file (containing previous current version of the
bundle) then default to update.
"""
with mock.patch("circup.Bundle.latest_tag", "12345"), mock.patch(
with mock.patch("circup.bundle.Bundle.latest_tag", "12345"), mock.patch(
"circup.os.path.isfile", return_value=False
), mock.patch("circup.get_bundle") as mock_gb, mock.patch(
"circup.json"
), mock.patch("circup.command_utils.get_bundle") as mock_gb, mock.patch(
"circup.command_utils.json"
) as mock_json, mock.patch(
"circup.open"
"circup.command_utils.open"
):
bundle = circup.Bundle(TEST_BUNDLE_NAME)
circup.ensure_latest_bundle(bundle)
ensure_latest_bundle(bundle)
mock_gb.assert_called_once_with(bundle, "12345")
assert mock_json.dump.call_count == 1 # Current version saved to file.
@ -930,23 +926,21 @@ def test_ensure_latest_bundle_bad_bundle_data():
bundle) but it has been corrupted (which has sometimes happened during
manual testing) then default to update.
"""
with mock.patch("circup.Bundle.latest_tag", "12345"), mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch("circup.open"), mock.patch(
"circup.get_bundle"
) as mock_gb, mock.patch(
"circup.json.load", side_effect=json.decoder.JSONDecodeError("BANG!", "doc", 1)
with mock.patch("circup.bundle.Bundle.latest_tag", "12345"), mock.patch(
"circup.command_utils.open"
), mock.patch("circup.command_utils.get_bundle") as mock_gb, mock.patch(
"builtins.open", mock.mock_open(read_data="}{INVALID_JSON")
), mock.patch(
"circup.json.dump"
"circup.command_utils.json.dump"
), mock.patch(
"circup.logger"
"circup.bundle.logger"
) as mock_logger:
bundle = circup.Bundle(TEST_BUNDLE_NAME)
circup.ensure_latest_bundle(bundle)
ensure_latest_bundle(bundle)
mock_gb.assert_called_once_with(bundle, "12345")
# wrong file is opened twice (one at __init__, one at save())
assert mock_logger.error.call_count == 2
assert mock_logger.exception.call_count == 2
assert mock_logger.error.call_count == 1
assert mock_logger.exception.call_count == 1
def test_ensure_latest_bundle_to_update():
@ -954,16 +948,14 @@ def test_ensure_latest_bundle_to_update():
If the version found in the BUNDLE_DATA is out of date, then cause an
update to the bundle.
"""
with mock.patch("circup.Bundle.latest_tag", "54321"), mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch("circup.open"), mock.patch(
"circup.get_bundle"
) as mock_gb, mock.patch(
"circup.json"
with mock.patch("circup.bundle.Bundle.latest_tag", "54321"), mock.patch(
"circup.command_utils.open"
), mock.patch("circup.command_utils.get_bundle") as mock_gb, mock.patch(
"circup.command_utils.json"
) as mock_json:
mock_json.load.return_value = {TEST_BUNDLE_NAME: "12345"}
bundle = circup.Bundle(TEST_BUNDLE_NAME)
circup.ensure_latest_bundle(bundle)
ensure_latest_bundle(bundle)
mock_gb.assert_called_once_with(bundle, "54321")
assert mock_json.dump.call_count == 1 # Current version saved to file.
@ -979,17 +971,18 @@ def test_ensure_latest_bundle_to_update_http_error():
# ), mock.patch(
"circup.os.path.isfile",
return_value=True,
), mock.patch("circup.open"), mock.patch(
"circup.get_bundle", side_effect=requests.exceptions.HTTPError("404")
), mock.patch("circup.command_utils.open"), mock.patch(
"circup.command_utils.get_bundle",
side_effect=requests.exceptions.HTTPError("404"),
) as mock_gb, mock.patch(
"circup.json"
"circup.command_utils.json"
) as mock_json, mock.patch(
"circup.click.secho"
) as mock_click:
circup.Bundle.tags_data = dict()
mock_json.load.return_value = tags_data
bundle = circup.Bundle(TEST_BUNDLE_NAME)
circup.ensure_latest_bundle(bundle)
ensure_latest_bundle(bundle)
mock_gb.assert_called_once_with(bundle, "54321")
assert mock_json.dump.call_count == 0 # not saved.
assert mock_click.call_count == 1 # friendly message.
@ -1000,20 +993,20 @@ def test_ensure_latest_bundle_no_update():
If the version found in the BUNDLE_DATA is NOT out of date, just log the
fact and don't update.
"""
with mock.patch("circup.Bundle.latest_tag", "12345"), mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch("circup.os.path.isdir", return_value=True), mock.patch(
"circup.open"
), mock.patch(
"circup.get_bundle"
with mock.patch("circup.bundle.Bundle.latest_tag", "12345"), mock.patch(
"circup.command_utils.os.path.isdir", return_value=True
), mock.patch("circup.command_utils.open"), mock.patch(
"circup.command_utils.get_bundle"
) as mock_gb, mock.patch(
"circup.json"
) as mock_json, mock.patch(
"circup.logger"
"circup.command_utils.os.path.isfile", return_value=True
), mock.patch(
"circup.bundle.Bundle.current_tag", "12345"
), mock.patch(
"circup.command_utils.logger"
) as mock_logger:
mock_json.load.return_value = {TEST_BUNDLE_NAME: "12345"}
bundle = circup.Bundle(TEST_BUNDLE_NAME)
circup.ensure_latest_bundle(bundle)
ensure_latest_bundle(bundle)
assert mock_gb.call_count == 0
assert mock_logger.info.call_count == 2
@ -1029,25 +1022,25 @@ def test_get_bundle():
mock_progress = mock.MagicMock()
mock_progress().__enter__ = mock.MagicMock(return_value=["a", "b", "c"])
mock_progress().__exit__ = mock.MagicMock()
with mock.patch("circup.requests") as mock_requests, mock.patch(
with mock.patch("circup.command_utils.requests") as mock_requests, mock.patch(
"circup.click"
) as mock_click, mock.patch(
"circup.open", mock.mock_open()
"circup.command_utils.open", mock.mock_open()
) as mock_open, mock.patch(
"circup.os.path.isdir", return_value=True
), mock.patch(
"circup.shutil"
"circup.command_utils.shutil"
) as mock_shutil, mock.patch(
"circup.zipfile"
"circup.command_utils.zipfile"
) as mock_zipfile:
mock_click.progressbar = mock_progress
mock_requests.get().status_code = mock_requests.codes.ok
mock_requests.get.reset_mock()
tag = "12345"
bundle = circup.Bundle(TEST_BUNDLE_NAME)
circup.get_bundle(bundle, tag)
get_bundle(bundle, tag)
# how many bundles currently supported. i.e. 6x.mpy, 7x.mpy, py = 3 bundles
_bundle_count = len(circup.PLATFORMS)
_bundle_count = len(PLATFORMS)
assert mock_requests.get.call_count == _bundle_count
assert mock_open.call_count == _bundle_count
assert mock_shutil.rmtree.call_count == _bundle_count
@ -1060,9 +1053,9 @@ def test_get_bundle_network_error():
Ensure that if there is a network related error when grabbing the bundle
then the error is logged and re-raised for the HTTP status code.
"""
with mock.patch("circup.requests") as mock_requests, mock.patch(
"circup.tags_data_load", return_value=dict()
), mock.patch("circup.logger") as mock_logger:
with mock.patch("circup.command_utils.requests") as mock_requests, mock.patch(
"circup.shared.tags_data_load", return_value=dict()
), mock.patch("circup.command_utils.logger") as mock_logger:
# Force failure with != requests.codes.ok
mock_requests.get().status_code = mock_requests.codes.BANG
# Ensure raise_for_status actually raises an exception.
@ -1071,7 +1064,7 @@ def test_get_bundle_network_error():
tag = "12345"
with pytest.raises(Exception) as ex:
bundle = circup.Bundle(TEST_BUNDLE_NAME)
circup.get_bundle(bundle, tag)
get_bundle(bundle, tag)
assert ex.value.args[0] == "Bang!"
url = (
"https://github.com/" + TEST_BUNDLE_NAME + "/releases/download"
@ -1088,7 +1081,9 @@ def test_show_command():
"""
runner = CliRunner()
test_bundle_modules = ["one.py", "two.py", "three.py"]
with mock.patch("circup.get_bundle_versions", return_value=test_bundle_modules):
with mock.patch(
"circup.commands.get_bundle_versions", return_value=test_bundle_modules
):
result = runner.invoke(circup.show)
assert result.exit_code == 0
assert all(m.replace(".py", "") in result.output for m in test_bundle_modules)
@ -1100,7 +1095,9 @@ def test_show_match_command():
"""
runner = CliRunner()
test_bundle_modules = ["one.py", "two.py", "three.py"]
with mock.patch("circup.get_bundle_versions", return_value=test_bundle_modules):
with mock.patch(
"circup.commands.get_bundle_versions", return_value=test_bundle_modules
):
result = runner.invoke(circup.show, ["t"])
assert result.exit_code == 0
assert "one" not in result.output
@ -1108,11 +1105,13 @@ def test_show_match_command():
def test_show_match_py_command():
"""
Check that py does not match the .py extention in the module names
Check that py does not match the .py extension in the module names
"""
runner = CliRunner()
test_bundle_modules = ["one.py", "two.py", "three.py"]
with mock.patch("circup.get_bundle_versions", return_value=test_bundle_modules):
with mock.patch(
"circup.commands.get_bundle_versions", return_value=test_bundle_modules
):
result = runner.invoke(circup.show, ["py"])
assert result.exit_code == 0
assert "0 shown" in result.output
@ -1147,7 +1146,9 @@ def test_libraries_from_imports_bad():
TEST_BUNDLE_MODULES = {"one.py": {}, "two.py": {}, "three.py": {}}
runner = CliRunner()
with mock.patch("circup.get_bundle_versions", return_value=TEST_BUNDLE_MODULES):
with mock.patch(
"circup.commands.get_bundle_versions", return_value=TEST_BUNDLE_MODULES
):
result = runner.invoke(
circup.main,
[