From 8b413ffe1604c6dcd808c1fdbd5f2240bf0a0dc5 Mon Sep 17 00:00:00 2001 From: Vladimir Kotal Date: Wed, 22 Feb 2023 20:25:56 +0100 Subject: [PATCH] add support for web workflow fixes #156 --- .isort.cfg | 5 + circup/__init__.py | 497 +++++++++++++++++++++++++++++++++++-------- tests/test_circup.py | 4 +- 3 files changed, 420 insertions(+), 86 deletions(-) create mode 100644 .isort.cfg diff --git a/.isort.cfg b/.isort.cfg new file mode 100644 index 0000000..4c0cb09 --- /dev/null +++ b/.isort.cfg @@ -0,0 +1,5 @@ +# SPDX-FileCopyrightText: 2023 VladimĂ­r Kotal +# +# SPDX-License-Identifier: Unlicense +[settings] +profile = black diff --git a/circup/__init__.py b/circup/__init__.py index 2854983..441ebd8 100644 --- a/circup/__init__.py +++ b/circup/__init__.py @@ -13,18 +13,21 @@ import logging import os import re import shutil -from subprocess import check_output +import socket import sys +import tempfile import zipfile +from subprocess import check_output +from urllib.parse import urlparse import appdirs import click import findimports import pkg_resources import requests -from semver import VersionInfo import update_checker - +from requests.auth import HTTPBasicAuth +from semver import VersionInfo # Useful constants. #: Flag to indicate if the command is being run in verbose mode. @@ -237,7 +240,7 @@ class Module: resulting self.file value will be None, and the name will be the basename of the directory path. - :param str path: The path to the module on the connected + :param str path: The path or URL to the module on the connected CIRCUITPYTHON device. :param str repo: The URL of the Git repository for this module. :param str device_version: The semver value for the version on device. @@ -247,14 +250,25 @@ class Module: :param (str,str) compatibility: Min and max versions of CP compatible with the mpy. """ self.path = path - if os.path.isfile(self.path): - # Single file module. - self.file = os.path.basename(path) - self.name = self.file.replace(".py", "").replace(".mpy", "") + url = urlparse(path) + if url.scheme == "http": + if url.path.endswith(".py") or url.path.endswith(".mpy"): + self.file = os.path.basename(url.path) + self.name = ( + os.path.basename(url.path).replace(".py", "").replace(".mpy", "") + ) + else: + self.file = None + self.name = os.path.basename(url.path[:-1]) else: - # Directory based module. - self.file = None - self.name = os.path.basename(os.path.dirname(self.path)) + if os.path.isfile(self.path): + # Single file module. + self.file = os.path.basename(path) + self.name = self.file.replace(".py", "").replace(".mpy", "") + else: + # Directory based module. + self.file = None + self.name = os.path.basename(os.path.dirname(self.path)) self.repo = repo self.device_version = device_version self.bundle_version = bundle_version @@ -377,6 +391,32 @@ class Module: The caller is expected to handle any exceptions raised. """ + url = urlparse(self.path) + if url.scheme == "http": + self._update_http() + else: + self._update_file() + + def _update_http(self): + """ + Update the module using web workflow. + """ + if self.file: + # Copy the file (will overwrite). + install_file_http(self.bundle_path, self.path) + else: + # Delete the directory (recursive) first. + url = urlparse(self.path) + auth = HTTPBasicAuth("", url.password) + r = requests.delete(self.path, auth=auth) + r.raise_for_status() + + install_dir_http(self.bundle_path, self.path) + + def _update_file(self): + """ + Update the module using file system. + """ if os.path.isdir(self.path): # Delete and copy the directory. shutil.rmtree(self.path, ignore_errors=True) @@ -408,6 +448,45 @@ class Module: ) +def install_file_http(source, target): + """ + Install file to device using web workflow. + :param source source file. + :param target destination URL. Should have password embedded. + """ + url = urlparse(target) + auth = HTTPBasicAuth("", url.password) + + with open(source, "rb") as fp: + r = requests.put(target, fp.read(), auth=auth) + r.raise_for_status() + + +def install_dir_http(source, target): + """ + Install directory to device using web workflow. + :param source source directory. + :param target destination URL. Should have password embedded. + """ + url = urlparse(target) + auth = HTTPBasicAuth("", url.password) + + # Create the top level directory. + r = requests.put(target, auth=auth) + r.raise_for_status() + + # Traverse the directory structure and create the directories/files. + for root, dirs, files in os.walk(source): + rel_path = os.path.relpath(root, source) + for name in files: + with open(os.path.join(root, name), "rb") as fp: + r = requests.put(target + rel_path + "/" + name, fp.read(), auth=auth) + r.raise_for_status() + for name in dirs: + r = requests.put(target + rel_path + "/" + name, auth=auth) + r.raise_for_status() + + def clean_library_name(assumed_library_name): """ Most CP repos and library names are look like this: @@ -524,8 +603,6 @@ def extract_metadata(path): dunder_key_val = r"""(__\w+__)(?:\s*:\s*\w+)?\s*=\s*(?:['"]|\(\s)(.+)['"]""" for match in re.findall(dunder_key_val, content): result[match[0]] = str(match[1]) - if result: - logger.info("Extracted metadata: %s", result) elif path.endswith(".mpy"): result["mpy"] = True with open(path, "rb") as mpy_file: @@ -564,6 +641,9 @@ def extract_metadata(path): else: # not a valid MPY file result["__version__"] = BAD_FILE_FORMAT + + if result: + logger.info("Extracted metadata: %s", result) return result @@ -635,20 +715,20 @@ def find_device(): return device_dir -def find_modules(device_path, bundles_list): +def find_modules(device_url, bundles_list): """ Extracts metadata from the connected device and available bundles and returns this as a list of Module instances representing the modules on the device. - :param str device_path: The path to the connected board. + :param str device_url: The URL to the board. :param Bundle bundles_list: List of supported bundles as Bundle objects. :return: A list of Module instances describing the current state of the modules on the connected device. """ # pylint: disable=broad-except,too-many-locals try: - device_modules = get_device_versions(device_path) + device_modules = get_device_versions(device_url) bundle_modules = get_bundle_versions(bundles_list) result = [] for name, device_metadata in device_modules.items(): @@ -661,17 +741,16 @@ def find_modules(device_path, bundles_list): bundle_version = bundle_metadata.get("__version__") mpy = device_metadata["mpy"] compatibility = device_metadata.get("compatibility", (None, None)) - result.append( - Module( - path, - repo, - device_version, - bundle_version, - mpy, - bundle, - compatibility, - ) + m = Module( + path, + repo, + device_version, + bundle_version, + mpy, + bundle, + compatibility, ) + result.append(m) return result except Exception as ex: # If it's not possible to get the device and bundle metadata, bail out @@ -734,7 +813,7 @@ def get_bundle_versions(bundles_list, avoid_download=False): if not avoid_download or not os.path.isdir(bundle.lib_dir("py")): ensure_latest_bundle(bundle) path = bundle.lib_dir("py") - path_modules = get_modules(path) + path_modules = _get_modules_file(path) for name, module in path_modules.items(): module["bundle"] = bundle if name not in all_the_modules: # here we decide the order of priority @@ -792,7 +871,44 @@ def get_bundles_list(): return bundles_list -def get_circuitpython_version(device_path): +def get_circuitpython_version(device_url): + """ + Returns the version number of CircuitPython running on the board connected + via ``device_path``, along with the board ID. + + :param str device_url: device URL. Can be either file or http based. + :return: A tuple with the version string for CircuitPython and the board ID string. + """ + url = urlparse(device_url) + if url.scheme == "http": + return _get_circuitpython_version_http(device_url) + if url.scheme == "": + return _get_circuitpython_version_file(url.path) + + click.secho(f"Not supported URL scheme: {url.scheme}", fg="red") + sys.exit(1) + + +def _get_circuitpython_version_http(url): + """ + Returns the version number of CircuitPython running on the board connected + via ``device_path``, along with the board ID. This is obtained using + RESTful API from the /cp/version.json URL. + + :param str url: board URL. + :return: A tuple with the version string for CircuitPython and the board ID string. + """ + r = requests.get(url + "/cp/version.json") + # pylint: disable=no-member + if r.status_code != requests.codes.ok: + click.secho(f" Unable to get version from {url}: {r.status_code}", fg="red") + sys.exit(1) + # pylint: enable=no-member + ver_json = r.json() + return ver_json.get("version"), ver_json.get("board_id") + + +def _get_circuitpython_version_file(device_path): """ Returns the version number of CircuitPython running on the board connected via ``device_path``, along with the board ID. This is obtained from the @@ -808,6 +924,7 @@ def get_circuitpython_version(device_path): :param str device_path: The path to the connected board. :return: A tuple with the version string for CircuitPython and the board ID string. """ + try: with open( os.path.join(device_path, "boot_out.txt"), "r", encoding="utf-8" @@ -826,7 +943,8 @@ def get_circuitpython_version(device_path): ) logger.error("boot_out.txt not found.") sys.exit(1) - return (circuit_python, board_id) + + return circuit_python, board_id def get_circup_version(): @@ -907,15 +1025,34 @@ def get_dependencies(*requested_libraries, mod_names, to_install=()): ) -def get_device_versions(device_path): +def get_device_versions(device_url): """ Returns a dictionary of metadata from modules on the connected device. - :param str device_path: Path to the device volume. + :param str device_url: URL for the device. :return: A dictionary of metadata about the modules available on the connected device. """ - return get_modules(os.path.join(device_path, "lib")) + url = urlparse(device_url) + if url.scheme == "http": + return get_modules(device_url + "/fs/lib/") + + return get_modules(os.path.join(url.path, "lib")) + + +def get_modules(device_url): + """ + Get a dictionary containing metadata about all the Python modules found in + the referenced path. + + :param str device_url: URL to be used to find modules. + :return: A dictionary containing metadata about the found modules. + """ + url = urlparse(device_url) + if url.scheme == "http": + return _get_modules_http(device_url) + + return _get_modules_file(device_url) def get_latest_release_from_url(url): @@ -936,10 +1073,103 @@ def get_latest_release_from_url(url): return tag -def get_modules(path): +def _get_modules_http(url): + """ + Get a dictionary containing metadata about all the Python modules found using + the referenced URL. + + :param str url: URL for the modules. + :return: A dictionary containing metadata about the found modules. + """ + result = {} + u = urlparse(url) + auth = HTTPBasicAuth("", u.password) + r = requests.get(url, auth=auth, headers={"Accept": "application/json"}) + r.raise_for_status() + + directory_mods = [] + single_file_mods = [] + for entry in r.json(): + entry_name = entry.get("name") + if entry.get("directory"): + directory_mods.append(entry_name) + else: + if entry_name.endswith(".py") or entry_name.endswith(".mpy"): + single_file_mods.append(entry_name) + + _get_modules_http_single_mods(auth, result, single_file_mods, url) + _get_modules_http_dir_mods(auth, directory_mods, result, url) + + return result + + +def _get_modules_http_dir_mods(auth, directory_mods, result, url): + """ + :param auth HTTP authentication. + :param directory_mods list of modules. + :param result dictionary for the result. + :param url: URL of the device. + """ + for dm in directory_mods: + dm_url = url + dm + "/" + r = requests.get(dm_url, auth=auth, headers={"Accept": "application/json"}) + r.raise_for_status() + mpy = False + for entry in r.json(): + entry_name = entry.get("name") + if not entry.get("directory") and ( + entry_name.endswith(".py") or entry_name.endswith(".mpy") + ): + if entry_name.endswith(".mpy"): + mpy = True + r = requests.get(dm_url + entry_name, auth=auth) + r.raise_for_status() + idx = entry_name.rfind(".") + with tempfile.NamedTemporaryFile( + prefix=entry_name[:idx] + "-", suffix=entry_name[idx:], delete=False + ) as fp: + fp.write(r.content) + tmp_name = fp.name + metadata = extract_metadata(tmp_name) + os.remove(tmp_name) + if "__version__" in metadata: + metadata["path"] = dm_url + result[dm] = metadata + # break now if any of the submodules has a bad format + if metadata["__version__"] == BAD_FILE_FORMAT: + break + + if result.get(dm) is None: + result[dm] = {"path": dm_url, "mpy": mpy} + + +def _get_modules_http_single_mods(auth, result, single_file_mods, url): + """ + :param auth HTTP authentication. + :param single_file_mods list of modules. + :param result dictionary for the result. + :param url: URL of the device. + """ + for sfm in single_file_mods: + sfm_url = url + sfm + r = requests.get(sfm_url, auth=auth) + r.raise_for_status() + idx = sfm.rfind(".") + with tempfile.NamedTemporaryFile( + prefix=sfm[:idx] + "-", suffix=sfm[idx:], delete=False + ) as fp: + fp.write(r.content) + tmp_name = fp.name + metadata = extract_metadata(tmp_name) + os.remove(tmp_name) + metadata["path"] = sfm_url + result[sfm[:idx]] = metadata + + +def _get_modules_file(path): """ Get a dictionary containing metadata about all the Python modules found in - the referenced path. + the referenced file system path. :param str path: The directory in which to find modules. :return: A dictionary containing metadata about the found modules. @@ -999,53 +1229,94 @@ def install_module( if not name: click.echo("No module name(s) provided.") elif name in mod_names: - library_path = os.path.join(device_path, "lib") - if not os.path.exists(library_path): # pragma: no cover - os.makedirs(library_path) - metadata = mod_names[name] - bundle = metadata["bundle"] # Grab device modules to check if module already installed if name in device_modules: click.echo("'{}' is already installed.".format(name)) return + + # Create the library directory first. + url = urlparse(device_path) + if url.scheme == "http": + library_path = device_path + "/fs/lib/" + auth = HTTPBasicAuth("", url.password) + r = requests.put(library_path, auth=auth) + r.raise_for_status() + else: + library_path = os.path.join(device_path, "lib") + if not os.path.exists(library_path): # pragma: no cover + os.makedirs(library_path) + + metadata = mod_names[name] + bundle = metadata["bundle"] if pyext: # Use Python source for module. - source_path = metadata["path"] # Path to Python source version. - if os.path.isdir(source_path): - target = os.path.basename(os.path.dirname(source_path)) - target_path = os.path.join(library_path, target) - # Copy the directory. - shutil.copytree(source_path, target_path) - else: - target = os.path.basename(source_path) - target_path = os.path.join(library_path, target) - # Copy file. - shutil.copyfile(source_path, target_path) + _install_module_py(library_path, metadata) else: # Use pre-compiled mpy modules. - module_name = os.path.basename(metadata["path"]).replace(".py", ".mpy") - if not module_name: - # Must be a directory based module. - module_name = os.path.basename(os.path.dirname(metadata["path"])) - major_version = CPY_VERSION.split(".")[0] - bundle_platform = "{}mpy".format(major_version) - bundle_path = os.path.join(bundle.lib_dir(bundle_platform), module_name) - if os.path.isdir(bundle_path): - target_path = os.path.join(library_path, module_name) - # Copy the directory. - shutil.copytree(bundle_path, target_path) - elif os.path.isfile(bundle_path): - target = os.path.basename(bundle_path) - target_path = os.path.join(library_path, target) - # Copy file. - shutil.copyfile(bundle_path, target_path) - else: - raise IOError("Cannot find compiled version of module.") + _install_module_mpy(bundle, library_path, metadata) click.echo("Installed '{}'.".format(name)) else: click.echo("Unknown module named, '{}'.".format(name)) +def _install_module_mpy(bundle, library_path, metadata): + """ + :param bundle library bundle. + :param library_path library path + :param metadata dictionary. + """ + url = urlparse(library_path) + module_name = os.path.basename(metadata["path"]).replace(".py", ".mpy") + if not module_name: + # Must be a directory based module. + module_name = os.path.basename(os.path.dirname(metadata["path"])) + major_version = CPY_VERSION.split(".")[0] + bundle_platform = "{}mpy".format(major_version) + bundle_path = os.path.join(bundle.lib_dir(bundle_platform), module_name) + if os.path.isdir(bundle_path): + if url.scheme == "http": + install_dir_http(bundle_path, library_path + module_name) + else: + target_path = os.path.join(library_path, module_name) + # Copy the directory. + shutil.copytree(bundle_path, target_path) + elif os.path.isfile(bundle_path): + target = os.path.basename(bundle_path) + if url.scheme == "http": + install_file_http(bundle_path, library_path + target) + else: + target_path = os.path.join(library_path, target) + # Copy file. + shutil.copyfile(bundle_path, target_path) + else: + raise IOError("Cannot find compiled version of module.") + + +def _install_module_py(library_path, metadata): + """ + :param library_path library path + :param metadata dictionary. + """ + url = urlparse(library_path) + source_path = metadata["path"] # Path to Python source version. + if os.path.isdir(source_path): + target = os.path.basename(os.path.dirname(source_path)) + if url.scheme == "http": + install_dir_http(source_path, library_path + target) + else: + target_path = os.path.join(library_path, target) + # Copy the directory. + shutil.copytree(source_path, target_path) + else: + target = os.path.basename(source_path) + if url.scheme == "http": + install_file_http(source_path, library_path + target) + else: + target_path = os.path.join(library_path, target) + # Copy file. + shutil.copyfile(source_path, target_path) + + # pylint: enable=too-many-locals,too-many-branches @@ -1148,12 +1419,19 @@ def tags_data_save_tag(key, tag): type=click.Path(exists=True, file_okay=False), help="Path to CircuitPython directory. Overrides automatic path detection.", ) +@click.option( + "--host", + help="Hostname or IP address of a device. Overrides automatic path detection.", +) +@click.option( + "--password", help="Password to use for authentication when --host is used." +) @click.version_option( prog_name="CircUp", message="%(prog)s, A CircuitPython module updater. Version %(version)s", ) @click.pass_context -def main(ctx, verbose, path): # pragma: no cover +def main(ctx, verbose, path, host, password): # pragma: no cover """ A tool to manage and update libraries on a CircuitPython device. """ @@ -1178,10 +1456,8 @@ def main(ctx, verbose, path): # pragma: no cover # stop early if the command is boardless if ctx.invoked_subcommand in BOARDLESS_COMMANDS: return - if path: - device_path = path - else: - device_path = find_device() + + device_path = get_device_path(host, password, path) ctx.obj["DEVICE_PATH"] = device_path latest_version = get_latest_release_from_url( "https://github.com/adafruit/circuitpython/releases/latest" @@ -1215,6 +1491,37 @@ def main(ctx, verbose, path): # pragma: no cover logger.warning(ex) +def get_device_path(host, password, path): + """ + :param host Hostname or IP address. + :param password REST API password. + :param path File system path. + :return device URL or None if the device cannot be found. + """ + if path: + device_path = "file:///" + path + elif host: + if password is None: + click.secho("--host needs --password", fg="red") + sys.exit(1) + + # pylint: disable=no-member + # verify hostname/address + try: + socket.getaddrinfo(host, 80, proto=socket.IPPROTO_TCP) + except socket.gaierror: + click.secho("Invalid host: {}".format(host), fg="red") + sys.exit(1) + # pylint: enable=no-member + device_path = f"http://:{password}@" + host + else: + device_path = find_device() + if device_path is not None: + device_path = "file:///" + device_path + + return device_path + + @main.command() @click.option("-r", "--requirement", is_flag=True) @click.pass_context @@ -1368,31 +1675,53 @@ def uninstall(ctx, module): # pragma: no cover can be uninstalled at once by providing more than one module name, each separated by a space. """ + device_path = ctx.obj["DEVICE_PATH"] for name in module: - device_modules = get_device_versions(ctx.obj["DEVICE_PATH"]) + device_modules = get_device_versions(device_path) name = name.lower() mod_names = {} for module_item, metadata in device_modules.items(): mod_names[module_item.replace(".py", "").lower()] = metadata if name in mod_names: - library_path = os.path.join(ctx.obj["DEVICE_PATH"], "lib") metadata = mod_names[name] module_path = metadata["path"] - if os.path.isdir(module_path): - target = os.path.basename(os.path.dirname(module_path)) - target_path = os.path.join(library_path, target) - # Remove the directory. - shutil.rmtree(target_path) + url = urlparse(device_path) + if url.scheme == "http": + _uninstall_http(device_path, module_path) else: - target = os.path.basename(module_path) - target_path = os.path.join(library_path, target) - # Remove file - os.remove(target_path) + _uninstall_file(device_path, module_path) click.echo("Uninstalled '{}'.".format(name)) else: click.echo("Module '{}' not found on device.".format(name)) +def _uninstall_http(device_path, module_path): + """ + Uninstall given module on device using REST API. + """ + url = urlparse(device_path) + auth = HTTPBasicAuth("", url.password) + r = requests.delete(module_path, auth=auth) + r.raise_for_status() + + +def _uninstall_file(device_path, module_path): + """ + Uninstall module using local file system. + """ + library_path = os.path.join(device_path, "lib") + if os.path.isdir(module_path): + target = os.path.basename(os.path.dirname(module_path)) + target_path = os.path.join(library_path, target) + # Remove the directory. + shutil.rmtree(target_path) + else: + target = os.path.basename(module_path) + target_path = os.path.join(library_path, target) + # Remove file + os.remove(target_path) + + # pylint: disable=too-many-branches diff --git a/tests/test_circup.py b/tests/test_circup.py index 1faa69b..64331f3 100644 --- a/tests/test_circup.py +++ b/tests/test_circup.py @@ -647,7 +647,7 @@ def test_get_bundle_versions(): Ensure ensure_latest_bundle is called even if lib_dir exists. """ with mock.patch("circup.ensure_latest_bundle") as mock_elb, mock.patch( - "circup.get_modules", return_value={"ok": {"name": "ok"}} + "circup._get_modules_file", return_value={"ok": {"name": "ok"}} ) as mock_gm, mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch( "circup.Bundle.lib_dir", return_value="foo/bar/lib" ), mock.patch( @@ -668,7 +668,7 @@ def test_get_bundle_versions_avoid_download(): Testing both cases: lib_dir exists and lib_dir doesn't exists. """ with mock.patch("circup.ensure_latest_bundle") as mock_elb, mock.patch( - "circup.get_modules", return_value={"ok": {"name": "ok"}} + "circup._get_modules_file", return_value={"ok": {"name": "ok"}} ) as mock_gm, mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch( "circup.Bundle.lib_dir", return_value="foo/bar/lib" ):