Merge pull request #223 from FoamyGuy/wwshell

wwshell file management CLI
This commit is contained in:
Dan Halbert 2025-01-02 10:47:28 -05:00 committed by GitHub
commit a03d50463f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 466 additions and 16 deletions

View file

@ -73,7 +73,7 @@ class Backend:
"""
return self.get_modules(os.path.join(self.device_location, self.LIB_DIR_PATH))
def _create_library_directory(self, device_path, library_path):
def create_directory(self, device_path, directory):
"""
To be overridden by subclass
"""
@ -97,6 +97,12 @@ class Backend:
"""
raise NotImplementedError
def upload_file(self, target_file, location_to_paste):
"""Paste a copy of the specified file at the location given
To be overridden by subclass
"""
raise NotImplementedError
# pylint: disable=too-many-locals,too-many-branches,too-many-arguments,too-many-nested-blocks,too-many-statements
def install_module(
self, device_path, device_modules, name, pyext, mod_names, upgrade=False
@ -189,7 +195,7 @@ class Backend:
return
# Create the library directory first.
self._create_library_directory(device_path, library_path)
self.create_directory(device_path, library_path)
if local_path is None:
if pyext:
# Use Python source for module.
@ -281,7 +287,9 @@ class WebBackend(Backend):
):
super().__init__(logger)
if password is None:
raise ValueError("--host needs --password")
raise ValueError(
"Must pass --password or set CIRCUP_WEBWORKFLOW_PASSWORD environment variable"
)
# pylint: disable=no-member
# verify hostname/address
@ -306,6 +314,7 @@ class WebBackend(Backend):
self.library_path = self.device_location + "/" + self.LIB_DIR_PATH
self.timeout = timeout
self.version_override = version_override
self.FS_URL = urljoin(self.device_location, self.FS_PATH)
def __repr__(self):
return f"<WebBackend @{self.device_location}>"
@ -320,6 +329,7 @@ class WebBackend(Backend):
file_name = source.split(os.path.sep)
file_name = file_name[-2] if file_name[-1] == "" else file_name[-1]
print(f"inside install_file_http location: '{location}'")
if location is None:
target = self.device_location + "/" + self.LIB_DIR_PATH + file_name
else:
@ -328,7 +338,10 @@ class WebBackend(Backend):
auth = HTTPBasicAuth("", self.password)
with open(source, "rb") as fp:
print(f"upload file PUT URL: {target}")
r = self.session.put(target, fp.read(), auth=auth, timeout=self.timeout)
print(f"install_file_http response status: {r.status_code}")
print(r.content)
if r.status_code == 409:
_writeable_error()
r.raise_for_status()
@ -546,10 +559,9 @@ class WebBackend(Backend):
metadata["path"] = sfm_url
result[sfm[:idx]] = metadata
def _create_library_directory(self, device_path, library_path):
url = urlparse(device_path)
auth = HTTPBasicAuth("", url.password)
with self.session.put(library_path, auth=auth, timeout=self.timeout) as r:
def create_directory(self, device_path, directory):
auth = HTTPBasicAuth("", self.password)
with self.session.put(directory, auth=auth, timeout=self.timeout) as r:
if r.status_code == 409:
_writeable_error()
r.raise_for_status()
@ -560,11 +572,57 @@ class WebBackend(Backend):
self.device_location,
"/".join(("fs", location_to_paste, target_file, "")),
)
self._create_library_directory(self.device_location, create_directory_url)
self.create_directory(self.device_location, create_directory_url)
self.install_dir_http(target_file)
else:
self.install_file_http(target_file)
def upload_file(self, target_file, location_to_paste):
"""
copy a file from the host PC to the microcontroller
:param target_file: file on the host PC to copy
:param location_to_paste: Location on the microcontroller to paste it.
:return:
"""
print(f"inside upload_file location_to_paste: '{location_to_paste}'")
if os.path.isdir(target_file):
create_directory_url = urljoin(
self.device_location,
"/".join(("fs", location_to_paste, target_file, "")),
)
self.create_directory(self.device_location, create_directory_url)
self.install_dir_http(target_file, location_to_paste)
else:
self.install_file_http(target_file, location_to_paste)
def download_file(self, target_file, location_to_paste):
"""
Download a file from the MCU device to the local host PC
:param target_file: The file on the MCU to download
:param location_to_paste: The location on the host PC to put the downloaded copy.
:return:
"""
auth = HTTPBasicAuth("", self.password)
with self.session.get(
self.FS_URL + target_file, timeout=self.timeout, auth=auth
) as r:
if r.status_code == 404:
click.secho(f"{target_file} was not found on the device", "red")
file_name = target_file.split("/")[-1]
if location_to_paste is None:
with open(file_name, "wb") as f:
f.write(r.content)
click.echo(f"Downloaded File: {file_name}")
else:
with open(os.path.join(location_to_paste, file_name), "wb") as f:
f.write(r.content)
click.echo(
f"Downloaded File: {os.path.join(location_to_paste, file_name)}"
)
def install_module_mpy(self, bundle, metadata):
"""
:param bundle library bundle.
@ -640,6 +698,7 @@ class WebBackend(Backend):
return True if the file exists, otherwise False.
"""
auth = HTTPBasicAuth("", self.password)
print(f"URL: {self.get_file_path(filepath)}")
resp = requests.get(
self.get_file_path(filepath), auth=auth, timeout=self.timeout
)
@ -668,11 +727,7 @@ class WebBackend(Backend):
"""
retuns the full path on the device to a given file name.
"""
return urljoin(
urljoin(self.device_location, "fs/", allow_fragments=False),
filename,
allow_fragments=False,
)
return "/".join((self.device_location, "fs", filename))
def is_device_present(self):
"""
@ -743,6 +798,20 @@ class WebBackend(Backend):
return r.json()["free"] * r.json()["block_size"] # bytes
sys.exit(1)
def list_dir(self, dirpath):
"""
Returns the list of files located in the given dirpath.
"""
auth = HTTPBasicAuth("", self.password)
with self.session.get(
urljoin(self.device_location, f"fs/{dirpath if dirpath else ''}"),
auth=auth,
headers={"Accept": "application/json"},
timeout=self.timeout,
) as r:
print(r.content)
return r.json()["files"]
class DiskBackend(Backend):
"""
@ -821,9 +890,9 @@ class DiskBackend(Backend):
"""
return _get_modules_file(device_lib_path, self.logger)
def _create_library_directory(self, device_path, library_path):
if not os.path.exists(library_path): # pragma: no cover
os.makedirs(library_path)
def create_directory(self, device_path, directory):
if not os.path.exists(directory): # pragma: no cover
os.makedirs(directory)
def copy_file(self, target_file, location_to_paste):
target_filename = target_file.split(os.path.sep)[-1]
@ -838,6 +907,9 @@ class DiskBackend(Backend):
os.path.join(self.device_location, location_to_paste, target_filename),
)
def upload_file(self, target_file, location_to_paste):
self.copy_file(target_file, location_to_paste)
def install_module_mpy(self, bundle, metadata):
"""
:param bundle library bundle.

View file

@ -625,3 +625,29 @@ def get_device_path(host, port, password, path):
else:
device_path = find_device()
return device_path
def sorted_by_directory_then_alpha(list_of_files):
"""
Sort the list of files into alphabetical seperated
with directories grouped together before files.
"""
dirs = {}
files = {}
for cur_file in list_of_files:
if cur_file["directory"]:
dirs[cur_file["name"]] = cur_file
else:
files[cur_file["name"]] = cur_file
sorted_dir_names = sorted(dirs.keys())
sorted_file_names = sorted(files.keys())
sorted_full_list = []
for cur_name in sorted_dir_names:
sorted_full_list.append(dirs[cur_name])
for cur_name in sorted_file_names:
sorted_full_list.append(files[cur_name])
return sorted_full_list

105
circup/wwshell/README.rst Normal file
View file

@ -0,0 +1,105 @@
wwshell
=======
.. image:: https://readthedocs.org/projects/circup/badge/?version=latest
:target: https://circuitpython.readthedocs.io/projects/circup/en/latest/
:alt: Documentation Status
.. image:: https://img.shields.io/discord/327254708534116352.svg
:target: https://adafru.it/discord
:alt: Discord
.. image:: https://github.com/adafruit/circup/workflows/Build%20CI/badge.svg
:target: https://github.com/adafruit/circup/actions
:alt: Build Status
.. image:: https://img.shields.io/badge/code%20style-black-000000.svg
:target: https://github.com/psf/black
:alt: Code Style: Black
A tool to manage files on a CircuitPython device via wireless workflows.
Currently supports Web Workflow.
.. contents::
Installation
------------
wwshell is bundled along with Circup. When you install Circup you'll get wwshell automatically.
Circup requires Python 3.5 or higher.
In a `virtualenv <https://virtualenv.pypa.io/en/latest/>`_,
``pip install circup`` should do the trick. This is the simplest way to make it
work.
If you have no idea what a virtualenv is, try the following command,
``pip3 install --user circup``.
.. note::
If you use the ``pip3`` command to install CircUp you must make sure that
your path contains the directory into which the script will be installed.
To discover this path,
* On Unix-like systems, type ``python3 -m site --user-base`` and append
``bin`` to the resulting path.
* On Windows, type the same command, but append ``Scripts`` to the
resulting path.
What does wwshell do?
---------------------
It lets you view, delete, upload, and download files from your Circuitpython device
via wireless workflows. Similar to ampy, but operates over wireless workflow rather
than USB serial.
Usage
-----
To use web workflow you need to enable it by putting WIFI credentials and a web workflow
password into your settings.toml file. `See here <https://learn.adafruit.com/getting-started-with-web-workflow-using-the-code-editor/device-setup>`_,
To get help, just type the command::
$ wwshell
Usage: wwshell [OPTIONS] COMMAND [ARGS]...
A tool to manage files CircuitPython device over web workflow.
Options:
--verbose Comprehensive logging is sent to stdout.
--path DIRECTORY Path to CircuitPython directory. Overrides automatic path
detection.
--host TEXT Hostname or IP address of a device. Overrides automatic
path detection.
--password TEXT Password to use for authentication when --host is used.
You can optionally set an environment variable
CIRCUP_WEBWORKFLOW_PASSWORD instead of passing this
argument. If both exist the CLI arg takes precedent.
--timeout INTEGER Specify the timeout in seconds for any network
operations.
--version Show the version and exit.
--help Show this message and exit.
Commands:
get Download a copy of a file or directory from the device to the...
ls Lists the contents of a directory.
put Upload a copy of a file or directory from the local computer to...
rm Delete a file on the device.
.. note::
If you find a bug, or you want to suggest an enhancement or new feature
feel free to create an issue or submit a pull request here:
https://github.com/adafruit/circup
Discussion of this tool happens on the Adafruit CircuitPython
`Discord channel <https://discord.gg/rqrKDjU>`_.

View file

@ -0,0 +1,3 @@
# SPDX-FileCopyrightText: 2024 Tim Cocks, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT

View file

@ -0,0 +1,14 @@
# SPDX-FileCopyrightText: 2024 Tim Cocks, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
wwshell is a CLI utility for managing files on CircuitPython devices via wireless workflows.
It currently supports Web Workflow.
"""
from .commands import main
# Allows execution via `python -m circup ...`
# pylint: disable=no-value-for-parameter
if __name__ == "__main__":
main()

227
circup/wwshell/commands.py Normal file
View file

@ -0,0 +1,227 @@
# SPDX-FileCopyrightText: 2019 Nicholas Tollervey, 2024 Tim Cocks, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
# ----------- CLI command definitions ----------- #
The following functions have IO side effects (for instance they emit to
stdout). Ergo, these are not checked with unit tests. Most of the
functionality they provide is provided by the functions from util_functions.py,
and the respective Backends which *are* tested. Most of the logic of the following
functions is to prepare things for presentation to / interaction with the user.
"""
import os
import time
import sys
import logging
import update_checker
import click
import requests
from circup.backends import WebBackend
from circup.logging import logger, log_formatter, LOGFILE
from circup.shared import BOARDLESS_COMMANDS
from circup.command_utils import (
get_device_path,
get_circup_version,
sorted_by_directory_then_alpha,
)
@click.group()
@click.option(
"--verbose", is_flag=True, help="Comprehensive logging is sent to stdout."
)
@click.option(
"--path",
type=click.Path(exists=True, file_okay=False),
help="Path to CircuitPython directory. Overrides automatic path detection.",
)
@click.option(
"--host",
help="Hostname or IP address of a device. Overrides automatic path detection.",
default="circuitpython.local",
)
@click.option(
"--password",
help="Password to use for authentication when --host is used."
" You can optionally set an environment variable CIRCUP_WEBWORKFLOW_PASSWORD"
" instead of passing this argument. If both exist the CLI arg takes precedent.",
)
@click.option(
"--timeout",
default=30,
help="Specify the timeout in seconds for any network operations.",
)
@click.version_option(
prog_name="CircFile",
message="%(prog)s, A CircuitPython web workflow file managemenr. Version %(version)s",
)
@click.pass_context
def main( # pylint: disable=too-many-locals
ctx,
verbose,
path,
host,
password,
timeout,
): # pragma: no cover
"""
A tool to manage files CircuitPython device over web workflow.
"""
# pylint: disable=too-many-arguments,too-many-branches,too-many-statements,too-many-locals
ctx.ensure_object(dict)
ctx.obj["TIMEOUT"] = timeout
if password is None:
password = os.getenv("CIRCUP_WEBWORKFLOW_PASSWORD")
device_path = get_device_path(host, password, path)
using_webworkflow = "host" in ctx.params.keys() and ctx.params["host"] is not None
print(f"host: {ctx.params['host']}")
print(f"using webworkflow: {using_webworkflow}")
if using_webworkflow:
if host == "circuitpython.local":
click.echo("Checking versions.json on circuitpython.local to find hostname")
versions_resp = requests.get(
"http://circuitpython.local/cp/version.json", timeout=timeout
)
host = f'{versions_resp.json()["hostname"]}.local'
click.echo(f"Using hostname: {host}")
device_path = device_path.replace("circuitpython.local", host)
try:
ctx.obj["backend"] = WebBackend(
host=host, password=password, logger=logger, timeout=timeout
)
except ValueError as e:
click.secho(e, fg="red")
time.sleep(0.3)
sys.exit(1)
except RuntimeError as e:
click.secho(e, fg="red")
sys.exit(1)
if verbose:
# Configure additional logging to stdout.
ctx.obj["verbose"] = True
verbose_handler = logging.StreamHandler(sys.stdout)
verbose_handler.setLevel(logging.INFO)
verbose_handler.setFormatter(log_formatter)
logger.addHandler(verbose_handler)
click.echo("Logging to {}\n".format(LOGFILE))
else:
ctx.obj["verbose"] = False
logger.info("### Started Circfile ###")
# If a newer version of circfile is available, print a message.
logger.info("Checking for a newer version of circfile")
version = get_circup_version()
if version:
update_checker.update_check("circfile", version)
# stop early if the command is boardless
if ctx.invoked_subcommand in BOARDLESS_COMMANDS or "--help" in sys.argv:
return
ctx.obj["DEVICE_PATH"] = device_path
if device_path is None or not ctx.obj["backend"].is_device_present():
click.secho("Could not find a connected CircuitPython device.", fg="red")
sys.exit(1)
else:
click.echo("Found device at {}.".format(device_path))
@main.command("ls")
@click.argument("file", required=True, nargs=1, default="/")
@click.pass_context
def ls_cli(ctx, file): # pragma: no cover
"""
Lists the contents of a directory. Defaults to root directory
if not supplied.
"""
logger.info("ls")
if not file.endswith("/"):
file += "/"
click.echo(f"running: ls {file}")
files = ctx.obj["backend"].list_dir(file)
click.echo("Size\tName")
for cur_file in sorted_by_directory_then_alpha(files):
click.echo(
f"{cur_file['file_size']}\t{cur_file['name']}{'/' if cur_file['directory'] else ''}"
)
@main.command("put")
@click.argument("file", required=True, nargs=1)
@click.argument("location", required=False, nargs=1, default="")
@click.option("--overwrite", is_flag=True, help="Overwrite the file if it exists.")
@click.pass_context
def put_cli(ctx, file, location, overwrite):
"""
Upload a copy of a file or directory from the local computer
to the device
"""
click.echo(f"Attempting PUT: {file} at {location} overwrite? {overwrite}")
if not ctx.obj["backend"].file_exists(f"{location}{file}"):
ctx.obj["backend"].upload_file(file, location)
click.echo(f"Successfully PUT {location}{file}")
else:
if overwrite:
click.secho(
f"{location}{file} already exists. Overwriting it.", fg="yellow"
)
ctx.obj["backend"].upload_file(file, location)
click.echo(f"Successfully PUT {location}{file}")
else:
click.secho(
f"{location}{file} already exists. Pass --overwrite if you wish to replace it.",
fg="red",
)
# pylint: enable=too-many-arguments,too-many-locals
@main.command("get")
@click.argument("file", required=True, nargs=1)
@click.argument("location", required=False, nargs=1)
@click.pass_context
def get_cli(ctx, file, location): # pragma: no cover
"""
Download a copy of a file or directory from the device to the local computer.
"""
click.echo(f"running: get {file} {location}")
ctx.obj["backend"].download_file(file, location)
@main.command("rm")
@click.argument("file", nargs=1)
@click.pass_context
def rm_cli(ctx, file): # pragma: no cover
"""
Delete a file on the device.
"""
click.echo(f"running: rm {file}")
ctx.obj["backend"].uninstall(
ctx.obj["backend"].device_location, ctx.obj["backend"].get_file_path(file)
)
@main.command("mkdir")
@click.argument("directory", nargs=1)
@click.pass_context
def mkdir_cli(ctx, directory): # pragma: no cover
"""
Create
"""
click.echo(f"running: mkdir {directory}")
ctx.obj["backend"].create_directory(
ctx.obj["backend"].device_location, ctx.obj["backend"].get_file_path(directory)
)

View file

@ -5,6 +5,8 @@
.. include:: ../README.rst
.. include:: ../circup/wwshell/README.rst
.. include:: ../CONTRIBUTING.rst
API

View file

@ -42,6 +42,7 @@ optional-dependencies = {optional = {file = ["optional_requirements.txt"]}}
[project.scripts]
circup = "circup:main"
wwshell = "circup.wwshell:main"
[project.urls]
homepage = "https://github.com/adafruit/circup"