add support for web workflow

fixes #156
This commit is contained in:
Vladimir Kotal 2023-02-22 20:25:56 +01:00
parent aa3c57ff22
commit 8b413ffe16
3 changed files with 420 additions and 86 deletions

5
.isort.cfg Normal file
View file

@ -0,0 +1,5 @@
# SPDX-FileCopyrightText: 2023 Vladimír Kotal
#
# SPDX-License-Identifier: Unlicense
[settings]
profile = black

View file

@ -13,18 +13,21 @@ import logging
import os
import re
import shutil
from subprocess import check_output
import socket
import sys
import tempfile
import zipfile
from subprocess import check_output
from urllib.parse import urlparse
import appdirs
import click
import findimports
import pkg_resources
import requests
from semver import VersionInfo
import update_checker
from requests.auth import HTTPBasicAuth
from semver import VersionInfo
# Useful constants.
#: Flag to indicate if the command is being run in verbose mode.
@ -237,7 +240,7 @@ class Module:
resulting self.file value will be None, and the name will be the
basename of the directory path.
:param str path: The path to the module on the connected
:param str path: The path or URL to the module on the connected
CIRCUITPYTHON device.
:param str repo: The URL of the Git repository for this module.
:param str device_version: The semver value for the version on device.
@ -247,14 +250,25 @@ class Module:
:param (str,str) compatibility: Min and max versions of CP compatible with the mpy.
"""
self.path = path
if os.path.isfile(self.path):
# Single file module.
self.file = os.path.basename(path)
self.name = self.file.replace(".py", "").replace(".mpy", "")
url = urlparse(path)
if url.scheme == "http":
if url.path.endswith(".py") or url.path.endswith(".mpy"):
self.file = os.path.basename(url.path)
self.name = (
os.path.basename(url.path).replace(".py", "").replace(".mpy", "")
)
else:
self.file = None
self.name = os.path.basename(url.path[:-1])
else:
# Directory based module.
self.file = None
self.name = os.path.basename(os.path.dirname(self.path))
if os.path.isfile(self.path):
# Single file module.
self.file = os.path.basename(path)
self.name = self.file.replace(".py", "").replace(".mpy", "")
else:
# Directory based module.
self.file = None
self.name = os.path.basename(os.path.dirname(self.path))
self.repo = repo
self.device_version = device_version
self.bundle_version = bundle_version
@ -377,6 +391,32 @@ class Module:
The caller is expected to handle any exceptions raised.
"""
url = urlparse(self.path)
if url.scheme == "http":
self._update_http()
else:
self._update_file()
def _update_http(self):
"""
Update the module using web workflow.
"""
if self.file:
# Copy the file (will overwrite).
install_file_http(self.bundle_path, self.path)
else:
# Delete the directory (recursive) first.
url = urlparse(self.path)
auth = HTTPBasicAuth("", url.password)
r = requests.delete(self.path, auth=auth)
r.raise_for_status()
install_dir_http(self.bundle_path, self.path)
def _update_file(self):
"""
Update the module using file system.
"""
if os.path.isdir(self.path):
# Delete and copy the directory.
shutil.rmtree(self.path, ignore_errors=True)
@ -408,6 +448,45 @@ class Module:
)
def install_file_http(source, target):
"""
Install file to device using web workflow.
:param source source file.
:param target destination URL. Should have password embedded.
"""
url = urlparse(target)
auth = HTTPBasicAuth("", url.password)
with open(source, "rb") as fp:
r = requests.put(target, fp.read(), auth=auth)
r.raise_for_status()
def install_dir_http(source, target):
"""
Install directory to device using web workflow.
:param source source directory.
:param target destination URL. Should have password embedded.
"""
url = urlparse(target)
auth = HTTPBasicAuth("", url.password)
# Create the top level directory.
r = requests.put(target, auth=auth)
r.raise_for_status()
# Traverse the directory structure and create the directories/files.
for root, dirs, files in os.walk(source):
rel_path = os.path.relpath(root, source)
for name in files:
with open(os.path.join(root, name), "rb") as fp:
r = requests.put(target + rel_path + "/" + name, fp.read(), auth=auth)
r.raise_for_status()
for name in dirs:
r = requests.put(target + rel_path + "/" + name, auth=auth)
r.raise_for_status()
def clean_library_name(assumed_library_name):
"""
Most CP repos and library names are look like this:
@ -524,8 +603,6 @@ def extract_metadata(path):
dunder_key_val = r"""(__\w+__)(?:\s*:\s*\w+)?\s*=\s*(?:['"]|\(\s)(.+)['"]"""
for match in re.findall(dunder_key_val, content):
result[match[0]] = str(match[1])
if result:
logger.info("Extracted metadata: %s", result)
elif path.endswith(".mpy"):
result["mpy"] = True
with open(path, "rb") as mpy_file:
@ -564,6 +641,9 @@ def extract_metadata(path):
else:
# not a valid MPY file
result["__version__"] = BAD_FILE_FORMAT
if result:
logger.info("Extracted metadata: %s", result)
return result
@ -635,20 +715,20 @@ def find_device():
return device_dir
def find_modules(device_path, bundles_list):
def find_modules(device_url, bundles_list):
"""
Extracts metadata from the connected device and available bundles and
returns this as a list of Module instances representing the modules on the
device.
:param str device_path: The path to the connected board.
:param str device_url: The URL to the board.
:param Bundle bundles_list: List of supported bundles as Bundle objects.
:return: A list of Module instances describing the current state of the
modules on the connected device.
"""
# pylint: disable=broad-except,too-many-locals
try:
device_modules = get_device_versions(device_path)
device_modules = get_device_versions(device_url)
bundle_modules = get_bundle_versions(bundles_list)
result = []
for name, device_metadata in device_modules.items():
@ -661,17 +741,16 @@ def find_modules(device_path, bundles_list):
bundle_version = bundle_metadata.get("__version__")
mpy = device_metadata["mpy"]
compatibility = device_metadata.get("compatibility", (None, None))
result.append(
Module(
path,
repo,
device_version,
bundle_version,
mpy,
bundle,
compatibility,
)
m = Module(
path,
repo,
device_version,
bundle_version,
mpy,
bundle,
compatibility,
)
result.append(m)
return result
except Exception as ex:
# If it's not possible to get the device and bundle metadata, bail out
@ -734,7 +813,7 @@ def get_bundle_versions(bundles_list, avoid_download=False):
if not avoid_download or not os.path.isdir(bundle.lib_dir("py")):
ensure_latest_bundle(bundle)
path = bundle.lib_dir("py")
path_modules = get_modules(path)
path_modules = _get_modules_file(path)
for name, module in path_modules.items():
module["bundle"] = bundle
if name not in all_the_modules: # here we decide the order of priority
@ -792,7 +871,44 @@ def get_bundles_list():
return bundles_list
def get_circuitpython_version(device_path):
def get_circuitpython_version(device_url):
"""
Returns the version number of CircuitPython running on the board connected
via ``device_path``, along with the board ID.
:param str device_url: device URL. Can be either file or http based.
:return: A tuple with the version string for CircuitPython and the board ID string.
"""
url = urlparse(device_url)
if url.scheme == "http":
return _get_circuitpython_version_http(device_url)
if url.scheme == "":
return _get_circuitpython_version_file(url.path)
click.secho(f"Not supported URL scheme: {url.scheme}", fg="red")
sys.exit(1)
def _get_circuitpython_version_http(url):
"""
Returns the version number of CircuitPython running on the board connected
via ``device_path``, along with the board ID. This is obtained using
RESTful API from the /cp/version.json URL.
:param str url: board URL.
:return: A tuple with the version string for CircuitPython and the board ID string.
"""
r = requests.get(url + "/cp/version.json")
# pylint: disable=no-member
if r.status_code != requests.codes.ok:
click.secho(f" Unable to get version from {url}: {r.status_code}", fg="red")
sys.exit(1)
# pylint: enable=no-member
ver_json = r.json()
return ver_json.get("version"), ver_json.get("board_id")
def _get_circuitpython_version_file(device_path):
"""
Returns the version number of CircuitPython running on the board connected
via ``device_path``, along with the board ID. This is obtained from the
@ -808,6 +924,7 @@ def get_circuitpython_version(device_path):
:param str device_path: The path to the connected board.
:return: A tuple with the version string for CircuitPython and the board ID string.
"""
try:
with open(
os.path.join(device_path, "boot_out.txt"), "r", encoding="utf-8"
@ -826,7 +943,8 @@ def get_circuitpython_version(device_path):
)
logger.error("boot_out.txt not found.")
sys.exit(1)
return (circuit_python, board_id)
return circuit_python, board_id
def get_circup_version():
@ -907,15 +1025,34 @@ def get_dependencies(*requested_libraries, mod_names, to_install=()):
)
def get_device_versions(device_path):
def get_device_versions(device_url):
"""
Returns a dictionary of metadata from modules on the connected device.
:param str device_path: Path to the device volume.
:param str device_url: URL for the device.
:return: A dictionary of metadata about the modules available on the
connected device.
"""
return get_modules(os.path.join(device_path, "lib"))
url = urlparse(device_url)
if url.scheme == "http":
return get_modules(device_url + "/fs/lib/")
return get_modules(os.path.join(url.path, "lib"))
def get_modules(device_url):
"""
Get a dictionary containing metadata about all the Python modules found in
the referenced path.
:param str device_url: URL to be used to find modules.
:return: A dictionary containing metadata about the found modules.
"""
url = urlparse(device_url)
if url.scheme == "http":
return _get_modules_http(device_url)
return _get_modules_file(device_url)
def get_latest_release_from_url(url):
@ -936,10 +1073,103 @@ def get_latest_release_from_url(url):
return tag
def get_modules(path):
def _get_modules_http(url):
"""
Get a dictionary containing metadata about all the Python modules found using
the referenced URL.
:param str url: URL for the modules.
:return: A dictionary containing metadata about the found modules.
"""
result = {}
u = urlparse(url)
auth = HTTPBasicAuth("", u.password)
r = requests.get(url, auth=auth, headers={"Accept": "application/json"})
r.raise_for_status()
directory_mods = []
single_file_mods = []
for entry in r.json():
entry_name = entry.get("name")
if entry.get("directory"):
directory_mods.append(entry_name)
else:
if entry_name.endswith(".py") or entry_name.endswith(".mpy"):
single_file_mods.append(entry_name)
_get_modules_http_single_mods(auth, result, single_file_mods, url)
_get_modules_http_dir_mods(auth, directory_mods, result, url)
return result
def _get_modules_http_dir_mods(auth, directory_mods, result, url):
"""
:param auth HTTP authentication.
:param directory_mods list of modules.
:param result dictionary for the result.
:param url: URL of the device.
"""
for dm in directory_mods:
dm_url = url + dm + "/"
r = requests.get(dm_url, auth=auth, headers={"Accept": "application/json"})
r.raise_for_status()
mpy = False
for entry in r.json():
entry_name = entry.get("name")
if not entry.get("directory") and (
entry_name.endswith(".py") or entry_name.endswith(".mpy")
):
if entry_name.endswith(".mpy"):
mpy = True
r = requests.get(dm_url + entry_name, auth=auth)
r.raise_for_status()
idx = entry_name.rfind(".")
with tempfile.NamedTemporaryFile(
prefix=entry_name[:idx] + "-", suffix=entry_name[idx:], delete=False
) as fp:
fp.write(r.content)
tmp_name = fp.name
metadata = extract_metadata(tmp_name)
os.remove(tmp_name)
if "__version__" in metadata:
metadata["path"] = dm_url
result[dm] = metadata
# break now if any of the submodules has a bad format
if metadata["__version__"] == BAD_FILE_FORMAT:
break
if result.get(dm) is None:
result[dm] = {"path": dm_url, "mpy": mpy}
def _get_modules_http_single_mods(auth, result, single_file_mods, url):
"""
:param auth HTTP authentication.
:param single_file_mods list of modules.
:param result dictionary for the result.
:param url: URL of the device.
"""
for sfm in single_file_mods:
sfm_url = url + sfm
r = requests.get(sfm_url, auth=auth)
r.raise_for_status()
idx = sfm.rfind(".")
with tempfile.NamedTemporaryFile(
prefix=sfm[:idx] + "-", suffix=sfm[idx:], delete=False
) as fp:
fp.write(r.content)
tmp_name = fp.name
metadata = extract_metadata(tmp_name)
os.remove(tmp_name)
metadata["path"] = sfm_url
result[sfm[:idx]] = metadata
def _get_modules_file(path):
"""
Get a dictionary containing metadata about all the Python modules found in
the referenced path.
the referenced file system path.
:param str path: The directory in which to find modules.
:return: A dictionary containing metadata about the found modules.
@ -999,53 +1229,94 @@ def install_module(
if not name:
click.echo("No module name(s) provided.")
elif name in mod_names:
library_path = os.path.join(device_path, "lib")
if not os.path.exists(library_path): # pragma: no cover
os.makedirs(library_path)
metadata = mod_names[name]
bundle = metadata["bundle"]
# Grab device modules to check if module already installed
if name in device_modules:
click.echo("'{}' is already installed.".format(name))
return
# Create the library directory first.
url = urlparse(device_path)
if url.scheme == "http":
library_path = device_path + "/fs/lib/"
auth = HTTPBasicAuth("", url.password)
r = requests.put(library_path, auth=auth)
r.raise_for_status()
else:
library_path = os.path.join(device_path, "lib")
if not os.path.exists(library_path): # pragma: no cover
os.makedirs(library_path)
metadata = mod_names[name]
bundle = metadata["bundle"]
if pyext:
# Use Python source for module.
source_path = metadata["path"] # Path to Python source version.
if os.path.isdir(source_path):
target = os.path.basename(os.path.dirname(source_path))
target_path = os.path.join(library_path, target)
# Copy the directory.
shutil.copytree(source_path, target_path)
else:
target = os.path.basename(source_path)
target_path = os.path.join(library_path, target)
# Copy file.
shutil.copyfile(source_path, target_path)
_install_module_py(library_path, metadata)
else:
# Use pre-compiled mpy modules.
module_name = os.path.basename(metadata["path"]).replace(".py", ".mpy")
if not module_name:
# Must be a directory based module.
module_name = os.path.basename(os.path.dirname(metadata["path"]))
major_version = CPY_VERSION.split(".")[0]
bundle_platform = "{}mpy".format(major_version)
bundle_path = os.path.join(bundle.lib_dir(bundle_platform), module_name)
if os.path.isdir(bundle_path):
target_path = os.path.join(library_path, module_name)
# Copy the directory.
shutil.copytree(bundle_path, target_path)
elif os.path.isfile(bundle_path):
target = os.path.basename(bundle_path)
target_path = os.path.join(library_path, target)
# Copy file.
shutil.copyfile(bundle_path, target_path)
else:
raise IOError("Cannot find compiled version of module.")
_install_module_mpy(bundle, library_path, metadata)
click.echo("Installed '{}'.".format(name))
else:
click.echo("Unknown module named, '{}'.".format(name))
def _install_module_mpy(bundle, library_path, metadata):
"""
:param bundle library bundle.
:param library_path library path
:param metadata dictionary.
"""
url = urlparse(library_path)
module_name = os.path.basename(metadata["path"]).replace(".py", ".mpy")
if not module_name:
# Must be a directory based module.
module_name = os.path.basename(os.path.dirname(metadata["path"]))
major_version = CPY_VERSION.split(".")[0]
bundle_platform = "{}mpy".format(major_version)
bundle_path = os.path.join(bundle.lib_dir(bundle_platform), module_name)
if os.path.isdir(bundle_path):
if url.scheme == "http":
install_dir_http(bundle_path, library_path + module_name)
else:
target_path = os.path.join(library_path, module_name)
# Copy the directory.
shutil.copytree(bundle_path, target_path)
elif os.path.isfile(bundle_path):
target = os.path.basename(bundle_path)
if url.scheme == "http":
install_file_http(bundle_path, library_path + target)
else:
target_path = os.path.join(library_path, target)
# Copy file.
shutil.copyfile(bundle_path, target_path)
else:
raise IOError("Cannot find compiled version of module.")
def _install_module_py(library_path, metadata):
"""
:param library_path library path
:param metadata dictionary.
"""
url = urlparse(library_path)
source_path = metadata["path"] # Path to Python source version.
if os.path.isdir(source_path):
target = os.path.basename(os.path.dirname(source_path))
if url.scheme == "http":
install_dir_http(source_path, library_path + target)
else:
target_path = os.path.join(library_path, target)
# Copy the directory.
shutil.copytree(source_path, target_path)
else:
target = os.path.basename(source_path)
if url.scheme == "http":
install_file_http(source_path, library_path + target)
else:
target_path = os.path.join(library_path, target)
# Copy file.
shutil.copyfile(source_path, target_path)
# pylint: enable=too-many-locals,too-many-branches
@ -1148,12 +1419,19 @@ def tags_data_save_tag(key, tag):
type=click.Path(exists=True, file_okay=False),
help="Path to CircuitPython directory. Overrides automatic path detection.",
)
@click.option(
"--host",
help="Hostname or IP address of a device. Overrides automatic path detection.",
)
@click.option(
"--password", help="Password to use for authentication when --host is used."
)
@click.version_option(
prog_name="CircUp",
message="%(prog)s, A CircuitPython module updater. Version %(version)s",
)
@click.pass_context
def main(ctx, verbose, path): # pragma: no cover
def main(ctx, verbose, path, host, password): # pragma: no cover
"""
A tool to manage and update libraries on a CircuitPython device.
"""
@ -1178,10 +1456,8 @@ def main(ctx, verbose, path): # pragma: no cover
# stop early if the command is boardless
if ctx.invoked_subcommand in BOARDLESS_COMMANDS:
return
if path:
device_path = path
else:
device_path = find_device()
device_path = get_device_path(host, password, path)
ctx.obj["DEVICE_PATH"] = device_path
latest_version = get_latest_release_from_url(
"https://github.com/adafruit/circuitpython/releases/latest"
@ -1215,6 +1491,37 @@ def main(ctx, verbose, path): # pragma: no cover
logger.warning(ex)
def get_device_path(host, password, path):
"""
:param host Hostname or IP address.
:param password REST API password.
:param path File system path.
:return device URL or None if the device cannot be found.
"""
if path:
device_path = "file:///" + path
elif host:
if password is None:
click.secho("--host needs --password", fg="red")
sys.exit(1)
# pylint: disable=no-member
# verify hostname/address
try:
socket.getaddrinfo(host, 80, proto=socket.IPPROTO_TCP)
except socket.gaierror:
click.secho("Invalid host: {}".format(host), fg="red")
sys.exit(1)
# pylint: enable=no-member
device_path = f"http://:{password}@" + host
else:
device_path = find_device()
if device_path is not None:
device_path = "file:///" + device_path
return device_path
@main.command()
@click.option("-r", "--requirement", is_flag=True)
@click.pass_context
@ -1368,31 +1675,53 @@ def uninstall(ctx, module): # pragma: no cover
can be uninstalled at once by providing more than one module name, each
separated by a space.
"""
device_path = ctx.obj["DEVICE_PATH"]
for name in module:
device_modules = get_device_versions(ctx.obj["DEVICE_PATH"])
device_modules = get_device_versions(device_path)
name = name.lower()
mod_names = {}
for module_item, metadata in device_modules.items():
mod_names[module_item.replace(".py", "").lower()] = metadata
if name in mod_names:
library_path = os.path.join(ctx.obj["DEVICE_PATH"], "lib")
metadata = mod_names[name]
module_path = metadata["path"]
if os.path.isdir(module_path):
target = os.path.basename(os.path.dirname(module_path))
target_path = os.path.join(library_path, target)
# Remove the directory.
shutil.rmtree(target_path)
url = urlparse(device_path)
if url.scheme == "http":
_uninstall_http(device_path, module_path)
else:
target = os.path.basename(module_path)
target_path = os.path.join(library_path, target)
# Remove file
os.remove(target_path)
_uninstall_file(device_path, module_path)
click.echo("Uninstalled '{}'.".format(name))
else:
click.echo("Module '{}' not found on device.".format(name))
def _uninstall_http(device_path, module_path):
"""
Uninstall given module on device using REST API.
"""
url = urlparse(device_path)
auth = HTTPBasicAuth("", url.password)
r = requests.delete(module_path, auth=auth)
r.raise_for_status()
def _uninstall_file(device_path, module_path):
"""
Uninstall module using local file system.
"""
library_path = os.path.join(device_path, "lib")
if os.path.isdir(module_path):
target = os.path.basename(os.path.dirname(module_path))
target_path = os.path.join(library_path, target)
# Remove the directory.
shutil.rmtree(target_path)
else:
target = os.path.basename(module_path)
target_path = os.path.join(library_path, target)
# Remove file
os.remove(target_path)
# pylint: disable=too-many-branches

View file

@ -647,7 +647,7 @@ def test_get_bundle_versions():
Ensure ensure_latest_bundle is called even if lib_dir exists.
"""
with mock.patch("circup.ensure_latest_bundle") as mock_elb, mock.patch(
"circup.get_modules", return_value={"ok": {"name": "ok"}}
"circup._get_modules_file", return_value={"ok": {"name": "ok"}}
) as mock_gm, mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch(
"circup.Bundle.lib_dir", return_value="foo/bar/lib"
), mock.patch(
@ -668,7 +668,7 @@ def test_get_bundle_versions_avoid_download():
Testing both cases: lib_dir exists and lib_dir doesn't exists.
"""
with mock.patch("circup.ensure_latest_bundle") as mock_elb, mock.patch(
"circup.get_modules", return_value={"ok": {"name": "ok"}}
"circup._get_modules_file", return_value={"ok": {"name": "ok"}}
) as mock_gm, mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch(
"circup.Bundle.lib_dir", return_value="foo/bar/lib"
):