Make auto-install find user code recursively. Find code.py alternatives.

Auto install:
- Move finding libraries from auto-file into command utils (libraries_from_auto_file)
- Find the first possible code.py alternative in order (like main.py)
- Replace libraries_from_code_py using the ast module instead of findimports
- Get all imports to find local python imports
- Find all dependencies from user code recursively
Update backends:
- Add get_file_content in Backends subclasses
- Remove no longer used get_auto_file_path()
- Add list_dir() to DiskBackend
Update tests
- Add non-bundle imports
- Add submodule import
- Add another mock device to test finding code.py
This commit is contained in:
Neradoc 2025-02-02 19:17:48 +01:00
parent 43b31da905
commit d27ae8164c
10 changed files with 374 additions and 60 deletions

View file

@ -233,6 +233,12 @@ class Backend:
"""
raise NotImplementedError
def get_file_content(self, target_file):
"""
To be overridden by subclass
"""
raise NotImplementedError
def get_free_space(self):
"""
To be overridden by subclass
@ -618,6 +624,20 @@ class WebBackend(Backend):
f"Downloaded File: {os.path.join(location_to_paste, file_name)}"
)
def get_file_content(self, target_file):
"""
Get the content of a file from the MCU drive
:param target_file: The file on the MCU to download
:return:
"""
auth = HTTPBasicAuth("", self.password)
with self.session.get(
self.FS_URL + target_file, timeout=self.timeout, auth=auth
) as r:
if r.status_code == 404:
return None
return r.content # .decode("utf8")
def install_module_mpy(self, bundle, metadata):
"""
:param bundle library bundle.
@ -655,19 +675,6 @@ class WebBackend(Backend):
else:
self.install_file_http(source_path, location=location)
def get_auto_file_path(self, auto_file_path):
"""
Make a local temp copy of the --auto file from the device.
Returns the path to the local copy.
"""
url = auto_file_path
auth = HTTPBasicAuth("", self.password)
with self.session.get(url, auth=auth, timeout=self.timeout) as r:
r.raise_for_status()
with open(LOCAL_CODE_PY_COPY, "w", encoding="utf-8") as f:
f.write(r.text)
return LOCAL_CODE_PY_COPY
def uninstall(self, device_path, module_path):
"""
Uninstall given module on device using REST API.
@ -958,12 +965,6 @@ class DiskBackend(Backend):
# Copy file.
shutil.copyfile(source_path, target_path)
def get_auto_file_path(self, auto_file_path):
"""
Returns the path on the device to the file to be read for --auto.
"""
return auto_file_path
def uninstall(self, device_path, module_path):
"""
Uninstall module using local file system.
@ -1014,6 +1015,18 @@ class DiskBackend(Backend):
"""
return os.path.join(self.device_location, filename)
def get_file_content(self, target_file):
"""
Get the content of a file from the MCU drive
:param target_file: The file on the MCU to download
:return:
"""
file_path = self.get_file_path(target_file)
if os.path.exists(file_path):
with open(file_path, "rb") as file:
return file.read()
return None
def is_device_present(self):
"""
returns True if the device is currently connected
@ -1027,3 +1040,22 @@ class DiskBackend(Backend):
# pylint: disable=unused-variable
_, total, free = shutil.disk_usage(self.device_location)
return free
def list_dir(self, dirpath):
"""
Returns the list of files located in the given dirpath.
"""
files_list = []
files = os.listdir(os.path.join(self.device_location, dirpath))
for file_name in files:
file = os.path.join(self.device_location, dirpath, file_name)
stat = os.stat(file)
files_list.append(
{
"name": file_name,
"directory": os.path.isdir(file),
"modified_ns": stat.st_mtime_ns,
"file_size": stat.st_size,
}
)
return files_list

View file

@ -5,6 +5,7 @@
Functions called from commands in order to provide behaviors and return information.
"""
import ast
import ctypes
import glob
import os
@ -16,7 +17,6 @@ import zipfile
import json
import re
import toml
import findimports
import requests
import click
@ -41,6 +41,25 @@ WARNING_IGNORE_MODULES = (
"circuitpython-typing",
)
CODE_FILES = [
"code.txt",
"code.py",
"main.py",
"main.txt",
"code.txt.py",
"code.py.txt",
"code.txt.txt",
"code.py.py",
"main.txt.py",
"main.py.txt",
"main.txt.txt",
"main.py.py",
]
class CodeParsingException(Exception):
"""Exception thrown when parsing code with ast fails"""
def clean_library_name(assumed_library_name):
"""
@ -605,23 +624,138 @@ def tags_data_save_tag(key, tag):
json.dump(tags_data, data)
def libraries_from_code_py(code_py, mod_names):
def imports_from_code(full_content):
"""
Parse the given code.py file and return the imported libraries
Note that it's impossible at that level to differentiate between
import module.property and import module.submodule, so we try both
:param str code_py: Full path of the code.py file
:param str full_content: Code to read imports from
:param str module_name: Name of the module the code is from
:return: sequence of library names
"""
# pylint: disable=broad-except
try:
found_imports = findimports.find_imports(code_py)
except Exception as ex: # broad exception because anything could go wrong
logger.exception(ex)
click.secho('Unable to read the auto file: "{}"'.format(str(ex)), fg="red")
par = ast.parse(full_content)
except (SyntaxError, ValueError) as err:
raise CodeParsingException(err) from err
imports = set()
for thing in ast.walk(par):
if isinstance(thing, ast.Import):
for alias in thing.names:
imports.add(alias.name)
if isinstance(thing, ast.ImportFrom):
if thing.module is None:
for alias in thing.names:
imports.add(alias.name)
else:
imports.add(("." * thing.level) + thing.module)
# import parent modules (in practice it's the __init__.py)
for name in list(imports):
names = name.split(".")
for i in range(len(names)):
module = ".".join(names[: i + 1])
if module:
imports.add(module)
return sorted(imports)
def get_all_imports(
backend, auto_file_content, mod_names, current_module, visited=None
):
"""
Recursively retrieve imports from files on the backend
:param Backend backend: The current backend object
:param str auto_file_content: Content of the python file to analyse
:param list mod_names: Lits of supported bundle mod names
:param str current_module: Name of the call context module if recursive call
:param set visited: Modules previously visited
:return: sequence of library names
"""
if visited is None:
visited = set()
visited.add(current_module)
requested_installs = []
try:
imports = imports_from_code(auto_file_content)
except CodeParsingException as err:
click.secho(f"Error parsing {current_module}:\n {err}", fg="red")
sys.exit(2)
# pylint: enable=broad-except
imports = [info.name.split(".", 1)[0] for info in found_imports]
return [r for r in imports if r in mod_names]
for install in imports:
if install in visited:
continue
if install in mod_names:
requested_installs.append(install)
else:
# relative module paths
if install.startswith(".."):
install_module = "/".join(current_module.split(".")[:-1])
install_module = install_module + "." + install[2:]
elif install.startswith("."):
install_module = current_module + "." + install[1:]
else:
install_module = install
# possible files for the module: .py or __init__.py (if directory)
file_name = install_module.replace(".", "/") + ".py"
exists = backend.file_exists(file_name)
if not exists:
file_name = install_module.replace(".", "/") + "/__init__.py"
exists = backend.file_exists(file_name)
if not exists:
continue
# get the content and parse it recursively
auto_file_content = backend.get_file_content(file_name)
if auto_file_content:
sub_imports = get_all_imports(
backend, auto_file_content, mod_names, install_module, visited
)
requested_installs.extend(sub_imports)
return requested_installs
# [r for r in requested_installs if r in mod_names]
def libraries_from_auto_file(backend, auto_file, mod_names):
"""
Parse the input auto_file path and/or use the workflow to find the most
appropriate code.py script. Then return the list of imports
:param Backend backend: The current backend object
:param str auto_file: Path of the candidate auto file or None
:return: sequence of library names
"""
# find the current main file based on Circuitpython's rules
if auto_file is None:
root_files = [
file["name"] for file in backend.list_dir("") if not file["directory"]
]
for main_file in CODE_FILES:
if main_file in root_files:
auto_file = main_file
break
# still no code file found
if auto_file is None:
click.secho("No default code file found (code.py, main.py, etc.)", fg="red")
sys.exit(1)
# pass a local file with "./" or "../"
is_relative = auto_file.split(os.sep)[0] in [os.path.curdir, os.path.pardir]
if os.path.isabs(auto_file) or is_relative:
with open(auto_file, "r", encoding="UTF8") as fp:
auto_file_content = fp.read()
else:
auto_file_content = backend.get_file_content(auto_file)
if auto_file_content is None:
click.secho(f"Auto file not found: {auto_file}", fg="red")
sys.exit(1)
return get_all_imports(backend, auto_file_content, mod_names, auto_file)
def get_device_path(host, port, password, path):

View file

@ -34,7 +34,7 @@ from circup.command_utils import (
completion_for_install,
get_bundle_versions,
libraries_from_requirements,
libraries_from_code_py,
libraries_from_auto_file,
get_dependencies,
get_bundles_local_dict,
save_local_bundles,
@ -342,32 +342,12 @@ def install(
requirements_txt = rfile.read()
requested_installs = libraries_from_requirements(requirements_txt)
elif auto or auto_file:
if auto_file is None:
auto_file = "code.py"
print(f"Auto file: {auto_file}")
# pass a local file with "./" or "../"
is_relative = not isinstance(ctx.obj["backend"], WebBackend) or auto_file.split(
os.sep
)[0] in [os.path.curdir, os.path.pardir]
if not os.path.isabs(auto_file) and not is_relative:
auto_file = ctx.obj["backend"].get_file_path(auto_file or "code.py")
auto_file_path = ctx.obj["backend"].get_auto_file_path(auto_file)
print(f"Auto file path: {auto_file_path}")
if not os.path.isfile(auto_file_path):
# fell through to here when run from random folder on windows - ask backend.
new_auto_file = ctx.obj["backend"].get_file_path(auto_file)
if os.path.isfile(new_auto_file):
auto_file = new_auto_file
auto_file_path = ctx.obj["backend"].get_auto_file_path(auto_file)
print(f"Auto file path: {auto_file_path}")
else:
click.secho(f"Auto file not found: {auto_file}", fg="red")
sys.exit(1)
requested_installs = libraries_from_code_py(auto_file_path, mod_names)
requested_installs = libraries_from_auto_file(
ctx.obj["backend"], auto_file, mod_names
)
else:
requested_installs = modules
requested_installs = sorted(set(requested_installs))
click.echo(f"Searching for dependencies for: {requested_installs}")
to_install = get_dependencies(requested_installs, mod_names=mod_names)

View file

@ -2,7 +2,10 @@
#
# SPDX-License-Identifier: MIT
# pylint: disable=all
import os, sys
import adafruit_bus_device
from adafruit_button import Button
from adafruit_esp32spi import adafruit_esp32spi_socketpool
from adafruit_display_text import wrap_text_to_pixels, wrap_text_to_lines
import adafruit_hid.consumer_control
import import_styles_sub

View file

@ -0,0 +1,5 @@
# SPDX-FileCopyrightText: 2025 Neradoc
#
# SPDX-License-Identifier: MIT
# pylint: disable=all
import adafruit_ntp

4
tests/mock_device_2/.gitignore vendored Normal file
View file

@ -0,0 +1,4 @@
# SPDX-FileCopyrightText: 2025 Neradoc
#
# SPDX-License-Identifier: MIT
lib/*

View file

@ -0,0 +1,3 @@
Adafruit CircuitPython 8.1.0 on 2019-08-02; Adafruit CircuitPlayground Express with samd21g18
Board ID:this_is_a_board
UID:AAAABBBBCCCC

View file

@ -0,0 +1,3 @@
# SPDX-FileCopyrightText: 2023 Tim Cocks, written for Adafruit Industries
#
# SPDX-License-Identifier: MIT

View file

@ -0,0 +1,11 @@
# SPDX-FileCopyrightText: 2021 Jeff Epler for Adafruit Industries
#
# SPDX-License-Identifier: MIT
# pylint: disable=all
import os, sys
import adafruit_bus_device
from adafruit_button import Button
from adafruit_esp32spi import adafruit_esp32spi_socketpool
from adafruit_display_text import wrap_text_to_pixels, wrap_text_to_lines
import adafruit_hid.consumer_control
import import_styles_sub

View file

@ -42,6 +42,9 @@ from circup.command_utils import (
ensure_latest_bundle,
get_bundle,
get_bundles_dict,
imports_from_code,
get_all_imports,
libraries_from_auto_file,
)
from circup.shared import PLATFORMS
from circup.module import Module
@ -1129,8 +1132,28 @@ def test_show_match_py_command():
assert "0 shown" in result.output
def test_libraries_from_imports():
def test_imports_from_code():
"""Ensure that various styles of import all work"""
test_file = str(pathlib.Path(__file__).parent / "import_styles.py")
with open(test_file, "r", encoding="utf8") as fp:
test_data = fp.read()
result = imports_from_code(test_data)
assert result == [
"adafruit_bus_device",
"adafruit_button",
"adafruit_display_text",
"adafruit_esp32spi",
"adafruit_hid",
"adafruit_hid.consumer_control",
"import_styles_sub",
"os",
"sys",
]
def test_get_all_imports():
"""List all libraries from auto file recursively"""
mod_names = [
"adafruit_bus_device",
"adafruit_button",
@ -1141,20 +1164,133 @@ def test_libraries_from_imports():
"adafruit_oauth2",
"adafruit_requests",
"adafruit_touchscreen",
"adafruit_ntp",
]
test_file = str(pathlib.Path(__file__).parent / "import_styles.py")
result = circup.libraries_from_code_py(test_file, mod_names)
with mock.patch("circup.logger.info") as mock_logger, mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch(
"circup.bundle.Bundle.lib_dir",
return_value="tests",
):
tests_dir = pathlib.Path(__file__).parent
backend = DiskBackend(tests_dir / "mock_device", mock_logger)
test_file = str(tests_dir / "import_styles.py")
with open(test_file, "r", encoding="utf8") as fp:
test_data = fp.read()
result = get_all_imports(backend, test_data, mod_names, current_module="")
assert result == [
"adafruit_bus_device",
"adafruit_button",
"adafruit_display_text",
"adafruit_esp32spi",
"adafruit_hid",
"adafruit_ntp",
]
def test_libraries_from_auto_file_local():
"""Check that we get all libraries from auto file argument.
Testing here with a local file"""
mod_names = [
"adafruit_bus_device",
"adafruit_button",
"adafruit_display_shapes",
"adafruit_display_text",
"adafruit_esp32spi",
"adafruit_hid",
"adafruit_oauth2",
"adafruit_requests",
"adafruit_touchscreen",
"adafruit_ntp",
]
auto_file = "./tests/import_styles.py"
with mock.patch("circup.logger.info") as mock_logger, mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch(
"circup.bundle.Bundle.lib_dir",
return_value="tests",
):
tests_dir = pathlib.Path(__file__).parent
backend = DiskBackend(tests_dir / "mock_device", mock_logger)
result = libraries_from_auto_file(backend, auto_file, mod_names)
assert result == [
"adafruit_bus_device",
"adafruit_button",
"adafruit_display_text",
"adafruit_esp32spi",
"adafruit_hid",
"adafruit_ntp",
]
def test_libraries_from_auto_file_board():
"""Check that we find code.py on the board if we give no auto_file argument"""
mod_names = [
"adafruit_bus_device",
"adafruit_button",
"adafruit_display_shapes",
"adafruit_display_text",
"adafruit_esp32spi",
"adafruit_hid",
"adafruit_oauth2",
"adafruit_requests",
"adafruit_touchscreen",
"adafruit_ntp",
]
auto_file = None
with mock.patch("circup.logger.info") as mock_logger, mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch(
"circup.bundle.Bundle.lib_dir",
return_value="tests",
):
tests_dir = pathlib.Path(__file__).parent
backend = DiskBackend(tests_dir / "mock_device_2", mock_logger)
result = libraries_from_auto_file(backend, auto_file, mod_names)
assert result == [
"adafruit_bus_device",
"adafruit_button",
"adafruit_display_text",
"adafruit_esp32spi",
"adafruit_hid",
]
def test_libraries_from_imports_bad():
"""Ensure that we catch an import error"""
def test_libraries_from_auto_file_none():
"""Check that we exit if we give no auto_file argument
and there's no default code file"""
mod_names = []
auto_file = None
with mock.patch("circup.logger.info") as mock_logger, mock.patch(
"circup.os.path.isfile", return_value=True
), mock.patch(
"circup.bundle.Bundle.lib_dir",
return_value="tests",
):
tests_dir = pathlib.Path(__file__).parent
backend = DiskBackend(tests_dir / "mock_device", mock_logger)
try:
libraries_from_auto_file(backend, auto_file, mod_names)
raise Exception("Did not call exit")
except SystemExit as ex:
assert ex.code == 1
def test_install_auto_file_bad():
"""Ensure that we catch an error when parsing auto file"""
TEST_BUNDLE_MODULES = {"one.py": {}, "two.py": {}, "three.py": {}}
runner = CliRunner()
@ -1172,3 +1308,6 @@ def test_libraries_from_imports_bad():
],
)
assert result.exit_code == 2
# test_install_auto_file_good ?