more refactoring, some successful USB workflow functionality

This commit is contained in:
foamyguy 2023-11-20 10:28:29 -06:00
parent c6d430c0fc
commit 86ccabe411

View file

@ -388,48 +388,6 @@ class Module:
update_reason = "Minor Version"
return (self.name, loc, rem, update_reason)
def update(self):
"""
Delete the module on the device, then copy the module from the bundle
back onto the device.
The caller is expected to handle any exceptions raised.
"""
url = urlparse(self.path)
if url.scheme == "http":
self._update_http()
else:
self._update_file()
def _update_http(self):
"""
Update the module using web workflow.
"""
if self.file:
# Copy the file (will overwrite).
install_file_http(self.bundle_path, self.path)
else:
# Delete the directory (recursive) first.
url = urlparse(self.path)
auth = HTTPBasicAuth("", url.password)
r = requests.delete(self.path, auth=auth)
r.raise_for_status()
install_dir_http(self.bundle_path, self.path)
def _update_file(self):
"""
Update the module using file system.
"""
if os.path.isdir(self.path):
# Delete and copy the directory.
shutil.rmtree(self.path, ignore_errors=True)
shutil.copytree(self.bundle_path, self.path)
else:
# Delete and copy file.
os.remove(self.path)
shutil.copyfile(self.bundle_path, self.path)
def __repr__(self):
"""
Helps with log files.
@ -453,8 +411,13 @@ class Module:
class WebBackend:
def __init__(self):
pass
"""
Backend for interacting with a device via Web Workflow
"""
def __init__(self, host, password):
self.host = host
self.password = password
def install_file_http(self, source, target):
"""
@ -489,7 +452,9 @@ class WebBackend:
rel_path = ""
for name in files:
with open(os.path.join(root, name), "rb") as fp:
r = requests.put(target + rel_path + "/" + name, fp.read(), auth=auth)
r = requests.put(
target + rel_path + "/" + name, fp.read(), auth=auth
)
r.raise_for_status()
for name in dirs:
r = requests.put(target + rel_path + "/" + name, auth=auth)
@ -498,13 +463,12 @@ class WebBackend:
def get_circuitpython_version(self, device_url):
"""
Returns the version number of CircuitPython running on the board connected
via ``device_path``, along with the board ID.
via ``device_url``, along with the board ID.
:param str device_url: http based device URL.
:return: A tuple with the version string for CircuitPython and the board ID string.
"""
url = urlparse(device_url)
return self._get_circuitpython_version_http(device_url)
return self._get_circuitpython_version(device_url)
def _get_circuitpython_version(self, url):
"""
@ -518,7 +482,9 @@ class WebBackend:
r = requests.get(url + "/cp/version.json")
# pylint: disable=no-member
if r.status_code != requests.codes.ok:
click.secho(f" Unable to get version from {url}: {r.status_code}", fg="red")
click.secho(
f" Unable to get version from {url}: {r.status_code}", fg="red"
)
sys.exit(1)
# pylint: enable=no-member
ver_json = r.json()
@ -532,7 +498,6 @@ class WebBackend:
:return: A dictionary of metadata about the modules available on the
connected device.
"""
url = urlparse(device_url)
return self.get_modules(device_url + "/fs/lib/")
def get_modules(self, device_url):
@ -543,7 +508,6 @@ class WebBackend:
:param str device_url: URL to be used to find modules.
:return: A dictionary containing metadata about the found modules.
"""
url = urlparse(device_url)
return self._get_modules_http(device_url)
def _get_modules_http(self, url):
@ -592,7 +556,7 @@ class WebBackend:
for entry in r.json():
entry_name = entry.get("name")
if not entry.get("directory") and (
entry_name.endswith(".py") or entry_name.endswith(".mpy")
entry_name.endswith(".py") or entry_name.endswith(".mpy")
):
if entry_name.endswith(".mpy"):
mpy = True
@ -600,7 +564,9 @@ class WebBackend:
r.raise_for_status()
idx = entry_name.rfind(".")
with tempfile.NamedTemporaryFile(
prefix=entry_name[:idx] + "-", suffix=entry_name[idx:], delete=False
prefix=entry_name[:idx] + "-",
suffix=entry_name[idx:],
delete=False,
) as fp:
fp.write(r.content)
tmp_name = fp.name
@ -629,7 +595,7 @@ class WebBackend:
r.raise_for_status()
idx = sfm.rfind(".")
with tempfile.NamedTemporaryFile(
prefix=sfm[:idx] + "-", suffix=sfm[idx:], delete=False
prefix=sfm[:idx] + "-", suffix=sfm[idx:], delete=False
) as fp:
fp.write(r.content)
tmp_name = fp.name
@ -638,9 +604,9 @@ class WebBackend:
metadata["path"] = sfm_url
result[sfm[:idx]] = metadata
# pylint: disable=too-many-locals,too-many-branches
# pylint: disable=too-many-locals,too-many-branches,too-many-arguments
def install_module(
self, device_path, device_modules, name, pyext, mod_names
self, device_path, device_modules, name, pyext, mod_names
): # pragma: no cover
"""
Finds a connected device and installs a given module name if it
@ -690,7 +656,6 @@ class WebBackend:
:param library_path library path
:param metadata dictionary.
"""
url = urlparse(library_path)
module_name = os.path.basename(metadata["path"]).replace(".py", ".mpy")
if not module_name:
# Must be a directory based module.
@ -716,18 +681,16 @@ class WebBackend:
:param library_path library path
:param metadata dictionary.
"""
url = urlparse(library_path)
source_path = metadata["path"] # Path to Python source version.
if os.path.isdir(source_path):
target = os.path.basename(os.path.dirname(source_path))
self.install_dir_http(source_path, library_path + target)
else:
target = os.path.basename(source_path)
self.install_file_http(source_path, library_path + target)
def libraries_from_imports(self, ctx, code_py, mod_names):
def libraries_from_imports(self, code_py, mod_names):
"""
Parse the given code.py file and return the imported libraries
@ -736,16 +699,15 @@ class WebBackend:
"""
# pylint: disable=broad-except
url = code_py
auth = HTTPBasicAuth("", ctx.parent.params["password"])
auth = HTTPBasicAuth("", self.password)
r = requests.get(url, auth=auth)
r.raise_for_status()
with open(LOCAL_CODE_PY_COPY, "w", encoding="utf-8") as f:
f.write(r.text)
code_py = LOCAL_CODE_PY_COPY
try:
found_imports = findimports.find_imports(code_py)
except Exception as ex: # broad exception because anything could go wrong
@ -756,7 +718,7 @@ class WebBackend:
imports = [info.name.split(".", 1)[0] for info in found_imports]
return [r for r in imports if r in mod_names]
def _uninstall(self, device_path, module_path):
def uninstall(self, device_path, module_path):
"""
Uninstall given module on device using REST API.
"""
@ -765,11 +727,95 @@ class WebBackend:
r = requests.delete(module_path, auth=auth)
r.raise_for_status()
def update(self, module):
"""
Delete the module on the device, then copy the module from the bundle
back onto the device.
The caller is expected to handle any exceptions raised.
"""
url = urlparse(module.path)
if url.scheme == "http":
self._update_http(module)
def _update_http(self, module):
"""
Update the module using web workflow.
"""
if module.file:
# Copy the file (will overwrite).
self.install_file_http(module.bundle_path, module.path)
else:
# Delete the directory (recursive) first.
url = urlparse(module.path)
auth = HTTPBasicAuth("", url.password)
r = requests.delete(module.path, auth=auth)
r.raise_for_status()
self.install_dir_http(module.bundle_path, module.path)
def get_modules(device_url):
"""
Get a dictionary containing metadata about all the Python modules found in
the referenced path.
:param str device_url: URL to be used to find modules.
:return: A dictionary containing metadata about the found modules.
"""
return _get_modules_file(device_url)
def _get_modules_file(path):
"""
Get a dictionary containing metadata about all the Python modules found in
the referenced file system path.
:param str path: The directory in which to find modules.
:return: A dictionary containing metadata about the found modules.
"""
result = {}
if not path:
return result
single_file_py_mods = glob.glob(os.path.join(path, "*.py"))
single_file_mpy_mods = glob.glob(os.path.join(path, "*.mpy"))
package_dir_mods = [
d
for d in glob.glob(os.path.join(path, "*", ""))
if not os.path.basename(os.path.normpath(d)).startswith(".")
]
single_file_mods = single_file_py_mods + single_file_mpy_mods
for sfm in [f for f in single_file_mods if not os.path.basename(f).startswith(".")]:
metadata = extract_metadata(sfm)
metadata["path"] = sfm
result[os.path.basename(sfm).replace(".py", "").replace(".mpy", "")] = metadata
for package_path in package_dir_mods:
name = os.path.basename(os.path.dirname(package_path))
py_files = glob.glob(os.path.join(package_path, "**/*.py"), recursive=True)
mpy_files = glob.glob(os.path.join(package_path, "**/*.mpy"), recursive=True)
all_files = py_files + mpy_files
# default value
result[name] = {"path": package_path, "mpy": bool(mpy_files)}
# explore all the submodules to detect bad ones
for source in [f for f in all_files if not os.path.basename(f).startswith(".")]:
metadata = extract_metadata(source)
if "__version__" in metadata:
metadata["path"] = package_path
result[name] = metadata
# break now if any of the submodules has a bad format
if metadata["__version__"] == BAD_FILE_FORMAT:
break
return result
class USBBackend:
"""
Backend for interacting with a device via USB Workflow
"""
def __init__(self):
pass
def get_circuitpython_version(self, device_url):
"""
Returns the version number of CircuitPython running on the board connected
@ -800,7 +846,7 @@ class USBBackend:
try:
with open(
os.path.join(device_path, "boot_out.txt"), "r", encoding="utf-8"
os.path.join(device_path, "boot_out.txt"), "r", encoding="utf-8"
) as boot:
version_line = boot.readline()
circuit_python = version_line.split(";")[0].split(" ")[-3]
@ -828,63 +874,11 @@ class USBBackend:
connected device.
"""
url = urlparse(device_url)
return self.get_modules(os.path.join(url.path, "lib"))
return get_modules(os.path.join(url.path, "lib"))
def get_modules(self, device_url):
"""
Get a dictionary containing metadata about all the Python modules found in
the referenced path.
:param str device_url: URL to be used to find modules.
:return: A dictionary containing metadata about the found modules.
"""
url = urlparse(device_url)
return self._get_modules_file(device_url)
def _get_modules_file(self, path):
"""
Get a dictionary containing metadata about all the Python modules found in
the referenced file system path.
:param str path: The directory in which to find modules.
:return: A dictionary containing metadata about the found modules.
"""
result = {}
if not path:
return result
single_file_py_mods = glob.glob(os.path.join(path, "*.py"))
single_file_mpy_mods = glob.glob(os.path.join(path, "*.mpy"))
package_dir_mods = [
d
for d in glob.glob(os.path.join(path, "*", ""))
if not os.path.basename(os.path.normpath(d)).startswith(".")
]
single_file_mods = single_file_py_mods + single_file_mpy_mods
for sfm in [f for f in single_file_mods if not os.path.basename(f).startswith(".")]:
metadata = extract_metadata(sfm)
metadata["path"] = sfm
result[os.path.basename(sfm).replace(".py", "").replace(".mpy", "")] = metadata
for package_path in package_dir_mods:
name = os.path.basename(os.path.dirname(package_path))
py_files = glob.glob(os.path.join(package_path, "**/*.py"), recursive=True)
mpy_files = glob.glob(os.path.join(package_path, "**/*.mpy"), recursive=True)
all_files = py_files + mpy_files
# default value
result[name] = {"path": package_path, "mpy": bool(mpy_files)}
# explore all the submodules to detect bad ones
for source in [f for f in all_files if not os.path.basename(f).startswith(".")]:
metadata = extract_metadata(source)
if "__version__" in metadata:
metadata["path"] = package_path
result[name] = metadata
# break now if any of the submodules has a bad format
if metadata["__version__"] == BAD_FILE_FORMAT:
break
return result
# pylint: disable=too-many-locals,too-many-branches
# pylint: disable=too-many-locals,too-many-branches,too-many-arguments
def install_module(
self, device_path, device_modules, name, pyext, mod_names
self, device_path, device_modules, name, pyext, mod_names
): # pragma: no cover
"""
Finds a connected device and installs a given module name if it
@ -909,8 +903,6 @@ class USBBackend:
return
# Create the library directory first.
url = urlparse(device_path)
library_path = os.path.join(device_path, "lib")
if not os.path.exists(library_path): # pragma: no cover
os.makedirs(library_path)
@ -933,7 +925,6 @@ class USBBackend:
:param library_path library path
:param metadata dictionary.
"""
url = urlparse(library_path)
module_name = os.path.basename(metadata["path"]).replace(".py", ".mpy")
if not module_name:
# Must be a directory based module.
@ -959,7 +950,7 @@ class USBBackend:
:param library_path library path
:param metadata dictionary.
"""
url = urlparse(library_path)
source_path = metadata["path"] # Path to Python source version.
if os.path.isdir(source_path):
target = os.path.basename(os.path.dirname(source_path))
@ -972,7 +963,7 @@ class USBBackend:
# Copy file.
shutil.copyfile(source_path, target_path)
def libraries_from_imports(ctx, code_py, mod_names):
def libraries_from_imports(self, code_py, mod_names):
"""
Parse the given code.py file and return the imported libraries
@ -997,7 +988,7 @@ class USBBackend:
imports = [info.name.split(".", 1)[0] for info in found_imports]
return [r for r in imports if r in mod_names]
def _uninstall(device_path, module_path):
def uninstall(self, device_path, module_path):
"""
Uninstall module using local file system.
"""
@ -1012,7 +1003,30 @@ class USBBackend:
target_path = os.path.join(library_path, target)
# Remove file
os.remove(target_path)
def update(self, module):
"""
Delete the module on the device, then copy the module from the bundle
back onto the device.
The caller is expected to handle any exceptions raised.
"""
self._update_file(module)
def _update_file(self, module):
"""
Update the module using file system.
"""
if os.path.isdir(module.path):
# Delete and copy the directory.
shutil.rmtree(module.path, ignore_errors=True)
shutil.copytree(module.bundle_path, module.path)
else:
# Delete and copy file.
os.remove(module.path)
shutil.copyfile(module.bundle_path, module.path)
def clean_library_name(assumed_library_name):
"""
Most CP repos and library names are look like this:
@ -1241,14 +1255,14 @@ def find_device():
return device_dir
def find_modules(device_url, bundles_list):
def find_modules(backend, device_url, bundles_list):
"""
Extracts metadata from the connected device and available bundles and
returns this as a list of Module instances representing the modules on the
device.
:param str device_url: The URL to the board.
:param Bundle bundles_list: List of supported bundles as Bundle objects.
:param List[Bundle] bundles_list: List of supported bundles as Bundle objects.
:return: A list of Module instances describing the current state of the
modules on the connected device.
"""
@ -1649,16 +1663,16 @@ def main(ctx, verbose, path, host, password, board_id, cpy_version): # pragma:
A tool to manage and update libraries on a CircuitPython device.
"""
ctx.ensure_object(dict)
using_webworkflow = (
ctx.parent is not None
and "host" in ctx.parent.params.keys()
and ctx.parent.params["host"] is not None
)
ctx.obj["using_webworkflow"] = using_webworkflow
if using_webworkflow:
ctx.obj["backend"] = WebBackend()
ctx.obj["backend"] = WebBackend(host=host, password=password)
else:
ctx.obj["backend"] = USBBackend()
@ -1758,7 +1772,9 @@ def freeze(ctx, requirement): # pragma: no cover
device. Option -r saves output to requirements.txt file
"""
logger.info("Freeze")
modules = find_modules(ctx.obj["DEVICE_PATH"], get_bundles_list())
modules = find_modules(
ctx.obj["backend"], ctx.obj["DEVICE_PATH"], get_bundles_list()
)
if modules:
output = []
for module in modules:
@ -1791,7 +1807,9 @@ def list_cli(ctx): # pragma: no cover
modules = [
m.row
for m in find_modules(ctx.obj["DEVICE_PATH"], get_bundles_list())
for m in find_modules(
ctx.obj["backend"], ctx.obj["DEVICE_PATH"], get_bundles_list()
)
if m.outofdate
]
if modules:
@ -1882,7 +1900,9 @@ def install(ctx, modules, pyext, requirement, auto, auto_file): # pragma: no co
if not os.path.isfile(auto_file) and not ctx.obj["using_webworkflow"]:
click.secho(f"Auto file not found: {auto_file}", fg="red")
sys.exit(1)
requested_installs = ctx.obj["backend"].libraries_from_imports(ctx, auto_file, mod_names)
requested_installs = ctx.obj["backend"].libraries_from_imports(
auto_file, mod_names
)
else:
requested_installs = modules
requested_installs = sorted(set(requested_installs))
@ -1941,16 +1961,12 @@ def uninstall(ctx, module): # pragma: no cover
if name in mod_names:
metadata = mod_names[name]
module_path = metadata["path"]
url = urlparse(device_path)
ctx.obj["backend"]._uninstall(device_path, module_path)
# if url.scheme == "http":
# _uninstall_http(device_path, module_path)
# else:
# _uninstall_file(device_path, module_path)
ctx.obj["backend"].uninstall(device_path, module_path)
click.echo("Uninstalled '{}'.".format(name))
else:
click.echo("Module '{}' not found on device.".format(name))
# pylint: disable=too-many-branches
@ -1976,7 +1992,9 @@ def update(ctx, update_all): # pragma: no cover
# Grab out of date modules.
modules = [
m
for m in find_modules(ctx.obj["DEVICE_PATH"], get_bundles_list())
for m in find_modules(
ctx.obj["backend"], ctx.obj["DEVICE_PATH"], get_bundles_list()
)
if m.outofdate
]
if modules:
@ -2029,7 +2047,7 @@ def update(ctx, update_all): # pragma: no cover
if update_flag:
# pylint: disable=broad-except
try:
module.update()
ctx.obj["backend"].update(module)
click.echo("Updated {}".format(module.name))
except Exception as ex:
logger.exception(ex)