cleanup and code format

This commit is contained in:
foamyguy 2024-04-16 19:40:14 -05:00
parent ffda566d7f
commit fdd464d61f
3 changed files with 60 additions and 49 deletions

View file

@ -16,7 +16,6 @@ import requests
from requests.adapters import HTTPAdapter
from requests.auth import HTTPBasicAuth
from circup.shared import DATA_DIR, BAD_FILE_FORMAT, extract_metadata, _get_modules_file
#: The location to store a local copy of code.py for use with --auto and
@ -79,13 +78,13 @@ class Backend:
"""
raise NotImplementedError
def _install_module_py(self, metadata, location=None):
def install_module_py(self, metadata, location=None):
"""
To be overridden by subclass
"""
raise NotImplementedError
def _install_module_mpy(self, bundle, metadata):
def install_module_mpy(self, bundle, metadata):
"""
To be overridden by subclass
"""
@ -159,10 +158,10 @@ class Backend:
if pyext:
# Use Python source for module.
self._install_module_py(metadata)
self.install_module_py(metadata)
else:
# Use pre-compiled mpy modules.
self._install_module_mpy(bundle, metadata)
self.install_module_mpy(bundle, metadata)
click.echo("Installed '{}'.".format(name))
else:
click.echo("Unknown module named, '{}'.".format(name))
@ -219,6 +218,12 @@ class Backend:
board_id = ""
return circuit_python, board_id
def file_exists(self, filepath):
"""
To be overriden by subclass
"""
raise NotImplementedError
def _writeable_error():
click.secho(
@ -282,7 +287,7 @@ class WebBackend(Backend):
_writeable_error()
r.raise_for_status()
def install_dir_http(self, source, location=None ):
def install_dir_http(self, source, location=None):
"""
Install directory to device using web workflow.
:param source source directory.
@ -498,7 +503,7 @@ class WebBackend(Backend):
_writeable_error()
r.raise_for_status()
def _install_module_mpy(self, bundle, metadata):
def install_module_mpy(self, bundle, metadata):
"""
:param bundle library bundle.
:param library_path library path
@ -522,7 +527,7 @@ class WebBackend(Backend):
raise IOError("Cannot find compiled version of module.")
# pylint: enable=too-many-locals,too-many-branches
def _install_module_py(self, metadata, location=None):
def install_module_py(self, metadata, location=None):
"""
:param library_path library path
:param metadata dictionary.
@ -568,6 +573,18 @@ class WebBackend(Backend):
"""
self._update_http(module)
def file_exists(self, filepath):
"""
return True if the file exists, otherwise False.
"""
auth = HTTPBasicAuth("", self.password)
resp = requests.get(
self.get_file_path(filepath), auth=auth, timeout=self.timeout
)
if resp.status_code == 200:
return True
return False
def _update_http(self, module):
"""
Update the module using web workflow.
@ -741,7 +758,7 @@ class DiskBackend(Backend):
if not os.path.exists(library_path): # pragma: no cover
os.makedirs(library_path)
def _install_module_mpy(self, bundle, metadata):
def install_module_mpy(self, bundle, metadata):
"""
:param bundle library bundle.
:param library_path library path
@ -771,7 +788,7 @@ class DiskBackend(Backend):
raise IOError("Cannot find compiled version of module.")
# pylint: enable=too-many-locals,too-many-branches
def _install_module_py(self, metadata, location=None):
def install_module_py(self, metadata, location=None):
"""
:param library_path library path
:param metadata dictionary.
@ -837,6 +854,12 @@ class DiskBackend(Backend):
os.remove(module.path)
shutil.copyfile(module.bundle_path, module.path)
def file_exists(self, filepath):
"""
return True if the file exists, otherwise False.
"""
return os.path.exists(os.path.join(self.device_location, filepath))
def get_file_path(self, filename):
"""
returns the full path on the device to a given file name.

View file

@ -98,16 +98,8 @@ def completion_for_example(ctx, param, incomplete):
Returns the list of available modules for the command line tab-completion
with the ``circup example`` command.
"""
# pylint: disable=unused-argument
# available_modules = get_bundle_versions(get_bundles_list(), avoid_download=True)
# module_names = {m.replace(".py", "") for m in available_modules}
# if incomplete:
# module_names = [name for name in module_names if name.startswith(incomplete)]
# return sorted(module_names)
# pylint: disable=unused-argument, consider-iterating-dictionary
available_examples = get_bundle_examples(get_bundles_list(), avoid_download=True)
# with open("tmp1.py", "w") as f:
# f.write(f"data = {str(available_examples)}")
matching_examples = [
example_path
@ -325,6 +317,7 @@ def get_bundle_examples(bundles_list, avoid_download=False):
:return: A dictionary of metadata about the examples available in the
library bundle.
"""
# pylint: disable=too-many-nested-blocks
all_the_examples = dict()
try:
@ -332,24 +325,8 @@ def get_bundle_examples(bundles_list, avoid_download=False):
if not avoid_download or not os.path.isdir(bundle.lib_dir("py")):
ensure_latest_bundle(bundle)
path = bundle.examples_dir("py")
# with open("path_debug.txt", "a") as f:
# f.write(path + "\n")
path_examples = _get_modules_file(path, logger)
# with open("path_modules_debug.txt", "a") as f:
# f.write(str(path_examples) + "\n----------\n")
for lib_name, lib_metadata in path_examples.items():
walk_val = os.walk(lib_metadata["path"])
# with open("path_walk_debug.txt", "a") as f:
# for thing in walk_val:
# f.write(f"{lib_name}: " + str(thing) + "\n----\n")
# f.write("\n=========================\n")
# for _example_file in os.listdir(lib_metadata["path"]):
# all_the_examples[f"{lib_name}/{_example_file.replace('.py', '')}"] = os.path.join(lib_metadata["path"], f"{_example_file}")
for _dir_level in os.walk(lib_metadata["path"]):
for _file in _dir_level[2]:
_parts = _dir_level[0].split(os.path.sep)
@ -357,14 +334,9 @@ def get_bundle_examples(bundles_list, avoid_download=False):
_dirs = _parts[_lib_name_index:]
if _dirs[-1] == "":
_dirs.pop(-1)
with open("dirs_parts_debug.txt", "a") as f:
f.write(f"{lib_name}: " + str(_dirs) + "\n----\n")
slug = f"{os.path.sep}".join(_dirs + [_file.replace(".py", "")])
all_the_examples[slug] = os.path.join(_dir_level[0], _file)
# all_the_examples[f"{lib_name}/{_example_file.replace('.py', '')}"] = os.path.join(
# lib_metadata["path"], f"{_file}")
except NotADirectoryError:
# Bundle does not have new style examples directory
# so we cannot include its examples.

View file

@ -350,24 +350,40 @@ def install(ctx, modules, pyext, requirement, auto, auto_file): # pragma: no co
@main.command()
@click.option("--overwrite", is_flag=True, help="Overwrite the file if it exists.")
@click.argument(
"examples", required=True, nargs=-1, shell_complete=completion_for_example
)
@click.pass_context
def example(ctx, examples):
print(f"context: {ctx}")
for example in examples:
def example(ctx, examples, overwrite):
"""
Copy named example(s) from a bundle onto the device. Multiple examples
can be installed at once by providing more than one example name, each
separated by a space.
"""
for example_arg in examples:
available_examples = get_bundle_examples(
get_bundles_list(), avoid_download=True
)
if example in available_examples:
click.echo(available_examples[example])
ctx.obj["backend"]._install_module_py(
{"path": available_examples[example]}, location=""
)
if example_arg in available_examples:
filename = available_examples[example_arg].split(os.path.sep)[-1]
if overwrite or not ctx.obj["backend"].file_exists(filename):
click.echo(
f"{'Copying' if not overwrite else 'Overwriting'}: {filename}"
)
ctx.obj["backend"].install_module_py(
{"path": available_examples[example_arg]}, location=""
)
else:
click.secho(
f"File: {filename} already exists. Use --overwrite if you wish to replace it.",
fg="red",
)
else:
click.secho(
f"Error: {example} was not found in any local bundle examples.",
f"Error: {example_arg} was not found in any local bundle examples.",
fg="red",
)