bringing in wwshell

This commit is contained in:
foamyguy 2024-05-25 12:14:52 -05:00
parent 74b07bee0a
commit e75a7dbf3a
5 changed files with 342 additions and 7 deletions

View file

@ -97,6 +97,12 @@ class Backend:
"""
raise NotImplementedError
def upload_file(self, target_file, location_to_paste):
"""Paste a copy of the specified file at the location given
To be overridden by subclass
"""
raise NotImplementedError
# pylint: disable=too-many-locals,too-many-branches,too-many-arguments,too-many-nested-blocks,too-many-statements
def install_module(
self, device_path, device_modules, name, pyext, mod_names, upgrade=False
@ -281,7 +287,9 @@ class WebBackend(Backend):
):
super().__init__(logger)
if password is None:
raise ValueError("--host needs --password")
raise ValueError(
"Must pass --password or set CIRCUP_WEBWORKFLOW_PASSWORD environment variable"
)
# pylint: disable=no-member
# verify hostname/address
@ -305,6 +313,7 @@ class WebBackend(Backend):
self.library_path = self.device_location + "/" + self.LIB_DIR_PATH
self.timeout = timeout
self.version_override = version_override
self.FS_URL = urljoin(self.device_location, self.FS_PATH)
def install_file_http(self, source, location=None):
"""
@ -316,6 +325,7 @@ class WebBackend(Backend):
file_name = source.split(os.path.sep)
file_name = file_name[-2] if file_name[-1] == "" else file_name[-1]
print(f"inside install_file_http location: '{location}'")
if location is None:
target = self.device_location + "/" + self.LIB_DIR_PATH + file_name
else:
@ -324,7 +334,10 @@ class WebBackend(Backend):
auth = HTTPBasicAuth("", self.password)
with open(source, "rb") as fp:
print(f"upload file PUT URL: {target}")
r = self.session.put(target, fp.read(), auth=auth, timeout=self.timeout)
print(f"install_file_http response status: {r.status_code}")
print(r.content)
if r.status_code == 409:
_writeable_error()
r.raise_for_status()
@ -561,6 +574,52 @@ class WebBackend(Backend):
else:
self.install_file_http(target_file)
def upload_file(self, target_file, location_to_paste):
"""
copy a file from the host PC to the microcontroller
:param target_file: file on the host PC to copy
:param location_to_paste: Location on the microcontroller to paste it.
:return:
"""
print(f"inside upload_file location_to_paste: '{location_to_paste}'")
if os.path.isdir(target_file):
create_directory_url = urljoin(
self.device_location,
"/".join(("fs", location_to_paste, target_file, "")),
)
self._create_library_directory(self.device_location, create_directory_url)
self.install_dir_http(target_file, location_to_paste)
else:
self.install_file_http(target_file, location_to_paste)
def download_file(self, target_file, location_to_paste):
"""
Download a file from the MCU device to the local host PC
:param target_file: The file on the MCU to download
:param location_to_paste: The location on the host PC to put the downloaded copy.
:return:
"""
auth = HTTPBasicAuth("", self.password)
with self.session.get(
self.FS_URL + target_file, timeout=self.timeout, auth=auth
) as r:
if r.status_code == 404:
click.secho(f"{target_file} was not found on the device", "red")
file_name = target_file.split("/")[-1]
if location_to_paste is None:
with open(file_name, "wb") as f:
f.write(r.content)
click.echo(f"Downloaded File: {file_name}")
else:
with open(os.path.join(location_to_paste, file_name), "wb") as f:
f.write(r.content)
click.echo(
f"Downloaded File: {os.path.join(location_to_paste, file_name)}"
)
def install_module_mpy(self, bundle, metadata):
"""
:param bundle library bundle.
@ -636,6 +695,7 @@ class WebBackend(Backend):
return True if the file exists, otherwise False.
"""
auth = HTTPBasicAuth("", self.password)
print(f"URL: {self.get_file_path(filepath)}")
resp = requests.get(
self.get_file_path(filepath), auth=auth, timeout=self.timeout
)
@ -664,11 +724,8 @@ class WebBackend(Backend):
"""
retuns the full path on the device to a given file name.
"""
return urljoin(
urljoin(self.device_location, "fs/", allow_fragments=False),
filename,
allow_fragments=False,
)
return "/".join((self.device_location, "fs", filename))
def is_device_present(self):
"""
@ -739,6 +796,17 @@ class WebBackend(Backend):
return r.json()["free"] * r.json()["block_size"] # bytes
sys.exit(1)
def list_dir(self, dirpath):
auth = HTTPBasicAuth("", self.password)
with self.session.get(
urljoin(self.device_location, f"fs/{dirpath if dirpath else ''}"),
auth=auth,
headers={"Accept": "application/json"},
timeout=self.timeout,
) as r:
print(r.content)
return r.json()["files"]
class DiskBackend(Backend):
"""

View file

@ -625,3 +625,29 @@ def get_device_path(host, password, path):
else:
device_path = find_device()
return device_path
def sorted_by_directory_then_alpha(list_of_files):
"""
Sort the list of files into alphabetical seperated
with directories grouped together before files.
"""
dirs = {}
files = {}
for cur_file in list_of_files:
if cur_file["directory"]:
dirs[cur_file["name"]] = cur_file
else:
files[cur_file["name"]] = cur_file
sorted_dir_names = sorted(dirs.keys())
sorted_file_names = sorted(files.keys())
sorted_full_list = []
for cur_name in sorted_dir_names:
sorted_full_list.append(dirs[cur_name])
for cur_name in sorted_file_names:
sorted_full_list.append(files[cur_name])
return sorted_full_list

View file

@ -0,0 +1,4 @@
from .commands import main
if __name__ == "__main__":
main()

235
circup/wwshell/commands.py Normal file
View file

@ -0,0 +1,235 @@
# SPDX-FileCopyrightText: 2019 Nicholas Tollervey, 2024 Tim Cocks, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
# ----------- CLI command definitions ----------- #
The following functions have IO side effects (for instance they emit to
stdout). Ergo, these are not checked with unit tests. Most of the
functionality they provide is provided by the functions from util_functions.py,
and the respective Backends which *are* tested. Most of the logic of the following
functions is to prepare things for presentation to / interaction with the user.
"""
import os
import time
import sys
import re
import logging
import update_checker
from semver import VersionInfo
import click
import requests
from circup.backends import WebBackend
from circup.logging import logger, log_formatter, LOGFILE
from circup.shared import BOARDLESS_COMMANDS, get_latest_release_from_url
from circup.command_utils import (
get_device_path,
get_circup_version,
completion_for_install,
completion_for_example,
sorted_by_directory_then_alpha,
)
@click.group()
@click.option(
"--verbose", is_flag=True, help="Comprehensive logging is sent to stdout."
)
@click.option(
"--path",
type=click.Path(exists=True, file_okay=False),
help="Path to CircuitPython directory. Overrides automatic path detection.",
)
@click.option(
"--host",
help="Hostname or IP address of a device. Overrides automatic path detection.",
default="circuitpython.local",
)
@click.option(
"--password",
help="Password to use for authentication when --host is used."
" You can optionally set an environment variable CIRCUP_WEBWORKFLOW_PASSWORD"
" instead of passing this argument. If both exist the CLI arg takes precedent.",
)
@click.option(
"--timeout",
default=30,
help="Specify the timeout in seconds for any network operations.",
)
@click.version_option(
prog_name="CircFile",
message="%(prog)s, A CircuitPython web workflow file managemenr. Version %(version)s",
)
@click.pass_context
def main( # pylint: disable=too-many-locals
ctx,
verbose,
path,
host,
password,
timeout,
): # pragma: no cover
"""
A tool to manage files CircuitPython device over web workflow.
"""
# pylint: disable=too-many-arguments,too-many-branches,too-many-statements,too-many-locals
ctx.ensure_object(dict)
ctx.obj["TIMEOUT"] = timeout
if password is None:
password = os.getenv("CIRCUP_WEBWORKFLOW_PASSWORD")
device_path = get_device_path(host, password, path)
using_webworkflow = "host" in ctx.params.keys() and ctx.params["host"] is not None
print(f"host: {ctx.params['host']}")
print(f"using webworkflow: {using_webworkflow}")
if using_webworkflow:
if host == "circuitpython.local":
click.echo("Checking versions.json on circuitpython.local to find hostname")
versions_resp = requests.get(
"http://circuitpython.local/cp/version.json", timeout=timeout
)
host = f'{versions_resp.json()["hostname"]}.local'
click.echo(f"Using hostname: {host}")
device_path = device_path.replace("circuitpython.local", host)
try:
ctx.obj["backend"] = WebBackend(
host=host, password=password, logger=logger, timeout=timeout
)
except ValueError as e:
click.secho(e, fg="red")
time.sleep(0.3)
sys.exit(1)
except RuntimeError as e:
click.secho(e, fg="red")
sys.exit(1)
if verbose:
# Configure additional logging to stdout.
ctx.obj["verbose"] = True
verbose_handler = logging.StreamHandler(sys.stdout)
verbose_handler.setLevel(logging.INFO)
verbose_handler.setFormatter(log_formatter)
logger.addHandler(verbose_handler)
click.echo("Logging to {}\n".format(LOGFILE))
else:
ctx.obj["verbose"] = False
logger.info("### Started Circfile ###")
# If a newer version of circfile is available, print a message.
logger.info("Checking for a newer version of circfile")
version = get_circup_version()
if version:
update_checker.update_check("circfile", version)
# stop early if the command is boardless
if ctx.invoked_subcommand in BOARDLESS_COMMANDS or "--help" in sys.argv:
return
ctx.obj["DEVICE_PATH"] = device_path
latest_version = get_latest_release_from_url(
"https://github.com/adafruit/circuitpython/releases/latest", logger
)
if device_path is None or not ctx.obj["backend"].is_device_present():
click.secho("Could not find a connected CircuitPython device.", fg="red")
sys.exit(1)
else:
click.echo("Found device at {}.".format(device_path))
@main.command("ls")
@click.argument("file", required=True, nargs=1, default="/")
@click.pass_context
def ls_cli(ctx, file): # pragma: no cover
"""
Lists the contents of a directory. Defaults to root directory
if not supplied.
"""
logger.info("ls")
if not file.endswith("/"):
file += "/"
click.echo(f"running: ls {file}")
files = ctx.obj["backend"].list_dir(file)
click.echo(f"Size\tName")
for cur_file in sorted_by_directory_then_alpha(files):
click.echo(
f"{cur_file['file_size']}\t{cur_file['name']}{'/' if cur_file['directory'] else ''}"
)
@main.command("put")
@click.argument("file", required=True, nargs=1)
@click.argument("location", required=False, nargs=1, default="")
@click.option("--overwrite", is_flag=True, help="Overwrite the file if it exists.")
@click.pass_context
def put_cli(ctx, file, location, overwrite):
"""
Upload a copy of a file or directory from the local computer
to the device
"""
click.echo(f"Attempting PUT: {file} at {location} overwrite? {overwrite}")
if not ctx.obj["backend"].file_exists(f"{location}{file}"):
ctx.obj["backend"].upload_file(file, location)
click.echo(f"Successfully PUT {location}{file}")
else:
if overwrite:
click.secho(
f"{location}{file} already exists. Overwriting it.", fg="yellow"
)
ctx.obj["backend"].upload_file(file, location)
click.echo(f"Successfully PUT {location}{file}")
else:
click.secho(
f"{location}{file} already exists. Pass --overwrite if you wish to replace it.",
fg="red",
)
# pylint: enable=too-many-arguments,too-many-locals
@main.command("get")
@click.argument("file", required=True, nargs=1)
@click.argument("location", required=False, nargs=1)
@click.pass_context
def get_cli(ctx, file, location): # pragma: no cover
"""
Download a copy of a file or directory from the device to the local computer.
"""
# click.echo(f"file: {file}")
# click.echo(f"location: {location}")
click.echo(f"running: get {file} {location}")
ctx.obj["backend"].download_file(file, location)
pass
@main.command("rm")
@click.argument("file", nargs=-1)
@click.pass_context
def rm_cli(ctx, module): # pragma: no cover
"""
Delete a file on the device.
"""
# device_path = ctx.obj["DEVICE_PATH"]
# print(f"Uninstalling {module} from {device_path}")
# for name in module:
# device_modules = ctx.obj["backend"].get_device_versions()
# name = name.lower()
# mod_names = {}
# for module_item, metadata in device_modules.items():
# mod_names[module_item.replace(".py", "").lower()] = metadata
# if name in mod_names:
# metadata = mod_names[name]
# module_path = metadata["path"]
# ctx.obj["backend"].uninstall(device_path, module_path)
# click.echo("Uninstalled '{}'.".format(name))
# else:
# click.echo("Module '{}' not found on device.".format(name))
# continue

View file

@ -94,7 +94,9 @@ setup(
"Topic :: Software Development :: Embedded Systems",
"Topic :: System :: Software Distribution",
],
entry_points={"console_scripts": ["circup=circup:main"]},
entry_points={
"console_scripts": ["circup=circup:main", "wwshell=circup.wwshell:main"]
},
# What does your project relate to?
keywords="adafruit, blinka, circuitpython, micropython, libraries",
# You can just specify the packages manually here if your project is