843 lines
30 KiB
Python
843 lines
30 KiB
Python
# SPDX-FileCopyrightText: 2019 Nicholas Tollervey, 2024 Tim Cocks, written for Adafruit Industries
|
|
#
|
|
# SPDX-License-Identifier: MIT
|
|
"""
|
|
Functions called from commands in order to provide behaviors and return information.
|
|
"""
|
|
|
|
import ast
|
|
import ctypes
|
|
import glob
|
|
import os
|
|
|
|
from subprocess import check_output
|
|
import sys
|
|
import shutil
|
|
import zipfile
|
|
import json
|
|
import re
|
|
import toml
|
|
import requests
|
|
import click
|
|
|
|
from circup.shared import (
|
|
PLATFORMS,
|
|
REQUESTS_TIMEOUT,
|
|
_get_modules_file,
|
|
BUNDLE_CONFIG_OVERWRITE,
|
|
BUNDLE_CONFIG_FILE,
|
|
BUNDLE_CONFIG_LOCAL,
|
|
BUNDLE_DATA,
|
|
NOT_MCU_LIBRARIES,
|
|
tags_data_load,
|
|
)
|
|
from circup.logging import logger
|
|
from circup.module import Module
|
|
from circup.bundle import Bundle
|
|
|
|
# Requirement names that never trigger the "is not a known CircuitPython
# library" warning when they cannot be found among the bundle modules.
WARNING_IGNORE_MODULES = (
    "typing-extensions",
    "pyasn1",
    "circuitpython-typing",
)
|
|
|
|
# Candidate names for the default code file in the device root. The list is
# scanned in this order and the first name present on the device is used.
CODE_FILES = [
    # plain names
    "code.txt",
    "code.py",
    "main.py",
    "main.txt",
    # doubled extensions on "code"
    "code.txt.py",
    "code.py.txt",
    "code.txt.txt",
    "code.py.py",
    # doubled extensions on "main"
    "main.txt.py",
    "main.py.txt",
    "main.txt.txt",
    "main.py.py",
]
|
|
|
|
|
|
class CodeParsingException(Exception):
    """Raised when source code cannot be parsed with the ``ast`` module."""
|
|
|
|
|
|
def clean_library_name(assumed_library_name):
    """
    Translate a repo, PyPI or guessed name into the CircuitPython library name.

    Most CircuitPython repos and library names look like this:

        repo: Adafruit_CircuitPython_LC709203F
        library: adafruit_lc709203f

    Names containing ``circuitpython`` (case-sensitive check) are reduced to
    the library form first; a table of known irregular names is then applied
    to the result.

    :param str assumed_library_name: An assumed name of a library from user
        or requirements.txt entry
    :return: str proper library name
    """
    # Assumed name -> actual name, for libraries that break the usual pattern.
    irregular_names = {
        "adafruit_adafruitio": "adafruit_io",
        "adafruit_asyncio": "asyncio",
        "adafruit_busdevice": "adafruit_bus_device",
        "adafruit_connectionmanager": "adafruit_connection_manager",
        "adafruit_display_button": "adafruit_button",
        "adafruit_neopixel": "neopixel",
        "adafruit_sd": "adafruit_sdcard",
        "adafruit_simpleio": "simpleio",
        "pimoroni_ltr559": "pimoroni_circuitpython_ltr559",
    }
    candidate = assumed_library_name
    if "circuitpython" in candidate:
        # Convert repo or pypi name to common library name; the order of the
        # substitutions matters (infix removal before generic "-" -> "_").
        for old, new in (
            ("-circuitpython-", "_"),
            ("_circuitpython_", "_"),
            ("-", "_"),
        ):
            candidate = candidate.replace(old, new)
    return irregular_names.get(candidate, candidate)
|
|
|
|
|
|
def completion_for_install(ctx, param, incomplete):
    """
    Provide tab-completion candidates for the ``circup install`` command.

    Candidates are the module names known from the bundles (with ``.py``
    removed). When ``incomplete`` is non-empty, only names starting with it
    are kept and local paths matching ``<incomplete>*`` are added.

    :param ctx: Unused.
    :param param: Unused.
    :param str incomplete: The text typed so far.
    :return: Sorted list of candidate names.
    """
    # pylint: disable=unused-argument
    bundle_modules = get_bundle_versions(get_bundles_list(), avoid_download=True)
    candidates = {name.replace(".py", "") for name in bundle_modules}
    if not incomplete:
        return sorted(candidates)
    matches = [name for name in candidates if name.startswith(incomplete)]
    # Local files/directories can be installed too, so offer them as well.
    matches.extend(glob.glob(f"{incomplete}*"))
    return sorted(matches)
|
|
|
|
|
|
def completion_for_example(ctx, param, incomplete):
    """
    Provide tab-completion candidates for the ``circup example`` command.

    :param ctx: Unused.
    :param param: Unused.
    :param str incomplete: The text typed so far.
    :return: Sorted list of example slugs starting with ``incomplete``.
    """
    # pylint: disable=unused-argument, consider-iterating-dictionary
    examples = get_bundle_examples(get_bundles_list(), avoid_download=True)
    return sorted(slug for slug in examples.keys() if slug.startswith(incomplete))
|
|
|
|
|
|
def ensure_latest_bundle(bundle):
    """
    Make sure a local copy of the latest library bundle is available.

    The bundle is downloaded when its latest tag differs from the current
    tag, or when the library directory of any platform is missing. An HTTP
    error during the download is reported and logged, not raised.

    :param Bundle bundle: the target Bundle object.
    """
    logger.info("Checking library updates for %s.", bundle.key)
    tag = bundle.latest_tag
    if tag == bundle.current_tag:
        # Same tag: still refresh if a platform directory is missing (new
        # platform added on an existing install, or side effect of pytest
        # or network errors).
        needs_update = any(
            not os.path.isdir(bundle.lib_dir(platform)) for platform in PLATFORMS
        )
    else:
        needs_update = True

    if not needs_update:
        logger.info("Current bundle up to date %s.", tag)
        return

    logger.info("New version available (%s).", tag)
    try:
        get_bundle(bundle, tag)
        tags_data_save_tag(bundle.key, tag)
    except requests.exceptions.HTTPError as ex:
        # See #20 for reason for this
        message = (
            "There was a problem downloading that platform bundle. "
            "Skipping and using existing download if available."
        )
        click.secho(message, fg="red")
        logger.exception(ex)
|
|
|
|
|
|
def find_device():
    """
    Locate the connected CircuitPython device on the local filesystem.

    This is based upon how Mu discovers this information. On POSIX systems
    the output of ``mount`` is scanned for a mount point ending in
    ``CIRCUITPY``; on Windows the drive letters are scanned for a volume
    named ``CIRCUITPY``.

    :return: The path to the device on the local filesystem, or None when no
        device was found.
    :raises NotImplementedError: if ``os.name`` is neither "posix" nor "nt".
    """
    device_dir = None
    if os.name == "posix":
        # Linux / OSX
        for mount_command in ["mount", "/sbin/mount"]:
            try:
                mount_lines = check_output(mount_command).splitlines()
            except FileNotFoundError:
                # This spelling of the command is not available; try the next.
                continue
            # The third whitespace-separated field is taken as the mount point.
            volumes = [line.split()[2] for line in mount_lines]
            for volume in volumes:
                if volume.endswith(b"CIRCUITPY"):
                    device_dir = volume.decode("utf-8")
    elif os.name == "nt":
        # Windows

        def get_volume_name(disk_name):
            """
            Return the "volume name" attribute Windows reports for the given
            disk/device.

            Based upon answer given here: http://stackoverflow.com/a/12056414
            """
            name_buffer = ctypes.create_unicode_buffer(1024)
            ctypes.windll.kernel32.GetVolumeInformationW(
                ctypes.c_wchar_p(disk_name),
                name_buffer,
                ctypes.sizeof(name_buffer),
                None,
                None,
                None,
                None,
                0,
            )
            return name_buffer.value

        #
        # In certain circumstances, volumes are allocated to USB
        # storage devices which cause a Windows popup to raise if their
        # volume contains no media. Wrapping the check in SetErrorMode
        # with SEM_FAILCRITICALERRORS (1) prevents this popup.
        #
        previous_mode = ctypes.windll.kernel32.SetErrorMode(1)
        try:
            for letter in "ABCDEFGHIJKLMNOPQRSTUVWXYZ":
                drive = f"{letter}:\\"
                if not os.path.exists(drive):
                    continue
                if get_volume_name(drive) == "CIRCUITPY":
                    # Report only the FIRST device found.
                    device_dir = drive
                    break
        finally:
            ctypes.windll.kernel32.SetErrorMode(previous_mode)
    else:
        # No support for unknown operating systems.
        raise NotImplementedError(f'OS "{os.name}" not supported.')
    logger.info("Found device: %s", device_dir)
    return device_dir
|
|
|
|
|
|
def find_modules(backend, bundles_list):
    """
    Build Module instances for the device modules that exist in a bundle.

    Metadata is read from the connected device and from the available
    bundles. Device modules unknown to every bundle are ignored. On any
    error the problem is logged, echoed to the user and the process exits
    with status 1.

    :param Backend backend: Backend with the device connection.
    :param List[Bundle] bundles_list: List of supported bundles as Bundle objects.
    :return: A list of Module instances describing the current state of the
        modules on the connected device.
    """
    # pylint: disable=broad-except,too-many-locals
    try:
        on_device = backend.get_device_versions()
        in_bundles = get_bundle_versions(bundles_list)
        modules = []
        for key, device_metadata in on_device.items():
            if key not in in_bundles:
                continue
            bundle_metadata = in_bundles[key]
            path = device_metadata["path"]
            repo = bundle_metadata.get("__repo__")
            bundle = bundle_metadata.get("bundle")
            device_version = device_metadata.get("__version__")
            bundle_version = bundle_metadata.get("__version__")
            mpy = device_metadata["mpy"]
            compatibility = device_metadata.get("compatibility", (None, None))
            # Last path component; a trailing separator (package directory)
            # is kept on the resulting name.
            if path.endswith(os.sep):
                module_name = path[:-1].split(os.sep)[-1] + os.sep
            else:
                module_name = path.split(os.sep)[-1]
            modules.append(
                Module(
                    module_name,
                    backend,
                    repo,
                    device_version,
                    bundle_version,
                    mpy,
                    bundle,
                    compatibility,
                )
            )
        return modules
    except Exception as ex:
        # If it's not possible to get the device and bundle metadata, bail out
        # with a friendly message and indication of what's gone wrong.
        logger.exception(ex)
        click.echo("There was a problem: {}".format(ex))
        sys.exit(1)
    # pylint: enable=broad-except,too-many-locals
|
|
|
|
|
|
def get_bundle(bundle, tag):
    """
    Download and extract the version of the bundle with the referenced tag.

    For every platform in ``PLATFORMS`` the zip file is saved on the local
    filesystem and extracted, replacing any previously extracted directory.
    Once all platforms are done, ``bundle.current_tag`` is set to ``tag``.

    :param Bundle bundle: the target Bundle object.
    :param str tag: The GIT tag to use to download the bundle.
    :raises requests.exceptions.HTTPError: propagated from
        ``raise_for_status()`` when a download is answered with an HTTP error.
    """
    click.echo(f"Downloading latest bundles for {bundle.key} ({tag}).")
    for platform, github_string in PLATFORMS.items():
        # Report the platform: "8.x-mpy", etc.
        click.echo(f"{github_string}:")
        url = bundle.url_format.format(platform=github_string, tag=tag)
        logger.info("Downloading bundle: %s", url)
        r = requests.get(url, stream=True, timeout=REQUESTS_TIMEOUT)
        # pylint: disable=no-member
        if r.status_code != requests.codes.ok:
            logger.warning("Unable to connect to %s", url)
            r.raise_for_status()
        # pylint: enable=no-member
        # The Content-Length header is optional (e.g. chunked responses).
        # Without a usable value the progress bar runs with unknown length
        # instead of the download crashing on int(None).
        try:
            total_size = int(r.headers.get("Content-Length"))
        except (TypeError, ValueError):
            total_size = None
        temp_zip = bundle.zip.format(platform=platform)
        with click.progressbar(
            r.iter_content(1024), label="Extracting:", length=total_size
        ) as pbar, open(temp_zip, "wb") as zip_fp:
            for chunk in pbar:
                zip_fp.write(chunk)
                pbar.update(len(chunk))
        logger.info("Saved to %s", temp_zip)
        temp_dir = bundle.dir.format(platform=platform)
        # Start from a clean directory so stale files do not survive.
        if os.path.isdir(temp_dir):
            shutil.rmtree(temp_dir)
        with zipfile.ZipFile(temp_zip, "r") as zfile:
            zfile.extractall(temp_dir)
    bundle.current_tag = tag
    click.echo("\nOK\n")
|
|
|
|
|
|
def get_bundle_examples(bundles_list, avoid_download=False):
    """
    Collect the examples of all the bundles given in bundles_list.

    Per bundle, a ``bundle_examples.json`` file next to the examples
    directory is used when it exists; otherwise the examples directory is
    walked and the result is written to that file.

    :param List[Bundle] bundles_list: List of supported bundles as Bundle objects.
    :param bool avoid_download: if True, download the bundle only if missing.
    :return: A dictionary mapping an example slug (library directories and
        file name with ``.py`` removed) to the path of the example file.
    """
    # pylint: disable=too-many-nested-blocks,too-many-locals
    all_the_examples = {}

    try:
        for bundle in bundles_list:
            if not (avoid_download and os.path.isdir(bundle.lib_dir("py"))):
                ensure_latest_bundle(bundle)
            examples_root = bundle.examples_dir("py")
            cache_file = os.path.join(examples_root, "../bundle_examples.json")
            if os.path.exists(cache_file):
                # Cached listing available: no need to walk the directories.
                with open(cache_file, "r", encoding="utf-8") as cache:
                    all_the_examples.update(json.load(cache))
                continue

            found = {}
            libraries = _get_modules_file(examples_root, logger)
            for lib_name, lib_metadata in libraries.items():
                for dir_path, _subdirs, file_names in os.walk(lib_metadata["path"]):
                    for file_name in file_names:
                        # Keep the path components from the library name on.
                        parts = dir_path.split(os.path.sep)
                        tail = parts[parts.index(lib_name) :]
                        if tail[-1] == "":
                            tail.pop(-1)
                        slug = f"{os.path.sep}".join(
                            tail + [file_name.replace(".py", "")]
                        )
                        example_path = os.path.join(dir_path, file_name)
                        found[slug] = example_path
                        all_the_examples[slug] = example_path

            with open(cache_file, "w", encoding="utf-8") as cache:
                json.dump(found, cache)

    except NotADirectoryError:
        # Bundle does not have new style examples directory
        # so we cannot include its examples.
        pass
    return all_the_examples
|
|
|
|
|
|
def get_bundle_versions(bundles_list, avoid_download=False):
    """
    Gather the metadata of the modules in the latest known bundle releases.

    Uses the Python version (rather than the compiled version) of the
    library modules. When a module name appears in several bundles, the
    entry from the bundle listed first wins.

    :param List[Bundle] bundles_list: List of supported bundles as Bundle objects.
    :param bool avoid_download: if True, download the bundle only if missing.
    :return: A dictionary of metadata about the modules available in the
        library bundle; each entry carries its Bundle under the "bundle" key.
    """
    all_the_modules = {}
    for bundle in bundles_list:
        if not (avoid_download and os.path.isdir(bundle.lib_dir("py"))):
            ensure_latest_bundle(bundle)
        bundle_modules = _get_modules_file(bundle.lib_dir("py"), logger)
        for name, module in bundle_modules.items():
            module["bundle"] = bundle
            # Here we decide the order of priority: first bundle wins.
            all_the_modules.setdefault(name, module)
    return all_the_modules
|
|
|
|
|
|
def get_bundles_dict():
    """
    Build the dictionary of bundle string identifiers from the config files.

    The local dictionary comes first, so it gets priority. The remaining
    entries are read from BUNDLE_CONFIG_OVERWRITE, falling back to
    BUNDLE_CONFIG_FILE when that file is missing or is not valid JSON.
    Entries whose value is already present are not added again.

    :return: Combined dictionaries from the config files.
    """
    combined = get_bundles_local_dict()
    try:
        with open(BUNDLE_CONFIG_OVERWRITE, "rb") as config_fp:
            configured = json.load(config_fp)
    except (FileNotFoundError, json.decoder.JSONDecodeError):
        with open(BUNDLE_CONFIG_FILE, "rb") as config_fp:
            configured = json.load(config_fp)
    for name, bundle in configured.items():
        if bundle in combined.values():
            continue
        combined[name] = bundle
    return combined
|
|
|
|
|
|
def get_bundles_local_dict():
    """
    Read the local bundles from BUNDLE_CONFIG_LOCAL (JSON).

    A missing file, invalid JSON, or content that is not a non-empty
    dictionary all result in an empty dictionary; the last case is logged
    as an error.

    :return: Raw dictionary from the config file, or an empty dictionary.
    """
    try:
        with open(BUNDLE_CONFIG_LOCAL, "rb") as config_fp:
            local_config = json.load(config_fp)
    except (FileNotFoundError, json.decoder.JSONDecodeError):
        return dict()
    if isinstance(local_config, dict) and local_config:
        return local_config
    logger.error("Local bundle list invalid. Skipped.")
    return dict()
|
|
|
|
|
|
def get_bundles_list():
    """
    Create a Bundle object for every entry of the config dictionary.

    :return: List of supported bundles as Bundle objects.
    """
    bundles_list = [Bundle(value) for value in get_bundles_dict().values()]
    keys = ", ".join(bundle.key for bundle in bundles_list)
    logger.info("Using bundles: %s", keys)
    return bundles_list
|
|
|
|
|
|
def get_circup_version():
    """Look up the installed version of the ``circup`` distribution.

    ``importlib.metadata`` is preferred; the ``importlib_metadata`` backport
    is tried when it cannot be imported.

    :return: Current version of circup, or None when neither metadata module
        is importable or the package is not found.
    """
    # pylint: disable=import-outside-toplevel
    try:
        from importlib import metadata
    except ImportError:
        try:
            import importlib_metadata as metadata
        except ImportError:
            return None
    try:
        version = metadata.version("circup")
    except metadata.PackageNotFoundError:
        version = None
    return version
|
|
|
|
|
|
def get_dependencies(*requested_libraries, mod_names, to_install=()):
    """
    Resolve the CircuitPython libraries required by the given libraries.

    Only the first positional argument is used; it is the collection of
    requested library names. Each known library is added to the result and
    its requirements (requirements.txt and circup dependencies from
    pyproject.toml) are resolved by a recursive call.

    :param tuple requested_libraries: The libraries to search for dependencies
    :param object mod_names: All the modules metadata from bundle
    :param tuple(str) to_install: Modules already selected for installation.
    :return: tuple of module names to install which we build
    """
    # pylint: disable=too-many-branches
    selected = to_install
    requested = requested_libraries[0]

    if not requested:
        # If nothing is requested, we're done
        return selected

    # First pass: normalise the names and keep those that can be installed.
    accepted = []
    for lib_name in requested:
        lowered = lib_name.lower()
        if lowered in NOT_MCU_LIBRARIES:
            logger.info(
                "Skipping %s. It is not for microcontroller installs.", lib_name
            )
            continue
        # Canonicalize, with some exceptions:
        # adafruit-circuitpython-something => adafruit_something
        canonical = clean_library_name(lowered)
        try:
            # Don't process any names we can't find in mod_names
            mod_names[canonical]  # pylint: disable=pointless-statement
        except KeyError:
            if canonical in WARNING_IGNORE_MODULES:
                continue
            if os.path.exists(canonical):
                # A local file or directory is accepted as-is.
                accepted.append(canonical)
            else:
                click.secho(
                    f"WARNING:\n\t{canonical} "
                    f"is not a known CircuitPython library.",
                    fg="yellow",
                )
        else:
            accepted.append(canonical)

    if not accepted:
        # If nothing is requested, we're done
        return selected

    # Second pass: select the new libraries and collect their dependencies
    # for the next round of resolution.
    next_round = []
    for library in accepted:
        if library in selected:
            continue
        selected = selected + (library,)
        # get the requirements.txt from bundle
        try:
            bundle = mod_names[library]["bundle"]
            requirements_txt = bundle.requirements_for(library)
            if requirements_txt:
                next_round.extend(libraries_from_requirements(requirements_txt))
            next_round.extend(get_circup_dependencies(bundle, library))
        except KeyError:
            # don't check local file for further dependencies
            pass

    return get_dependencies(
        tuple(next_round), mod_names=mod_names, to_install=selected
    )
|
|
|
|
|
|
def get_circup_dependencies(bundle, library):
    """
    Read the circup dependencies of a library from its pyproject.toml, e.g.

        [circup]
        circup_dependencies = ["dependency_name_here"]

    :param bundle: The Bundle to look within
    :param library: The Library to find pyproject.toml for and get dependencies from

    :return: The dependencies found: the list as stored in the file, a
        one-element tuple when a single string is stored, otherwise an empty
        tuple.
    """
    try:
        pyproj_toml = bundle.requirements_for(library, toml_file=True)
        if not pyproj_toml:
            return tuple()
        dependencies = toml.loads(pyproj_toml)["circup"]["circup_dependencies"]
    except KeyError:
        # no circup_dependencies in pyproject.toml
        return tuple()

    if isinstance(dependencies, list):
        return dependencies
    if isinstance(dependencies, str):
        return (dependencies,)
    return tuple()
|
|
|
|
|
|
def libraries_from_requirements(requirements):
    """
    Clean up supplied requirements.txt and turn into tuple of CP libraries.

    Names are lower-cased. Blank lines and comment lines are skipped, and
    inline comments, version specifiers (including ``!=``), extras and
    environment markers are removed.

    :param str requirements: A string version of a requirements.txt
    :return: tuple of library names
    """
    libraries = ()
    for line in requirements.split("\n"):
        line = line.lower().strip()
        if not line or line.startswith("#"):
            # skip blank lines and comments
            continue
        # Whitespace followed by "#" starts an inline comment.
        line = re.split(r"\s+#", line, maxsplit=1)[0]
        # Remove everything after any pip style version specifier, extras
        # bracket or environment marker.
        name = re.split(r"[<>=!~\[;]", line, maxsplit=1)[0].strip()
        if name:
            libraries = libraries + (name,)
    return libraries
|
|
|
|
|
|
def save_local_bundles(bundles_data):
    """
    Save the list of local bundles to the settings.

    Non-empty data is written as JSON to BUNDLE_CONFIG_LOCAL; with empty
    data the file is removed if it exists.

    :param bundles_data: The local bundles to store (must support ``len``
        and JSON serialization).
    """
    if len(bundles_data) == 0:
        # Nothing to remember: drop the settings file instead of writing
        # an empty one.
        if os.path.isfile(BUNDLE_CONFIG_LOCAL):
            os.unlink(BUNDLE_CONFIG_LOCAL)
        return
    with open(BUNDLE_CONFIG_LOCAL, "w", encoding="utf-8") as settings_fp:
        json.dump(bundles_data, settings_fp)
|
|
|
|
|
|
def tags_data_save_tag(key, tag):
    """
    Add or change the saved tag value for a bundle.

    The stored tags are loaded, updated and written back to BUNDLE_DATA
    as JSON.

    :param str key: The bundle's identifier/key.
    :param str tag: The new tag for the bundle.
    """
    known_tags = tags_data_load(logger)
    known_tags[key] = tag
    with open(BUNDLE_DATA, "w", encoding="utf-8") as tags_fp:
        json.dump(known_tags, tags_fp)
|
|
|
|
|
|
def imports_from_code(full_content):
    """
    Parse the given code and return the names of the imported libraries.

    Note that it's impossible at that level to differentiate between
    import module.property and import module.submodule, so we try both.
    Every parent package of an imported name is included as well; star
    imports contribute only their module.

    :param str full_content: Code to read imports from
    :return: sorted list of library names
    :raises CodeParsingException: if the code cannot be parsed.
    """
    # pylint: disable=too-many-branches
    try:
        tree = ast.parse(full_content)
    except (SyntaxError, ValueError) as err:
        raise CodeParsingException(err) from err

    imports = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            # import module and import module.submodule
            imports.update(alias.name for alias in node.names)
        elif isinstance(node, ast.ImportFrom):
            if node.module:
                # from [.][.]module import names
                module = ("." * node.level) + node.module
                imports.add(module)
                imports.update(f"{module}.{alias.name}" for alias in node.names)
            else:
                # from . import names
                imports.update(alias.name for alias in node.names)

    # import parent modules (in practice it's the __init__.py)
    for name in list(imports):
        if "*" in name:
            imports.remove(name)
            continue
        parts = name.split(".")
        for end in range(1, len(parts) + 1):
            prefix = ".".join(parts[:end])
            # A leading dot yields an empty first prefix; skip it.
            if prefix:
                imports.add(prefix)

    return sorted(imports)
|
|
|
|
|
|
def get_all_imports(  # pylint: disable=too-many-arguments,too-many-locals, too-many-branches
    backend, auto_file_content, auto_file_path, mod_names, current_module, visited=None
):
    """
    Recursively retrieve imports from files on the backend.

    Imports found in ``mod_names`` are returned. Other imports are looked
    up as files on the backend (``<module>.py`` or ``<module>/__init__.py``)
    and, when found, parsed in turn. A parsing error is reported and exits
    the process with status 2.

    :param Backend backend: The current backend object
    :param str auto_file_content: Content of the python file to analyse
    :param str auto_file_path: Path to the python file to analyse
    :param list mod_names: List of supported bundle mod names
    :param str current_module: Name of the call context module if recursive call
    :param set visited: Modules previously visited
    :return: sorted list of library names
    """
    visited = set() if visited is None else visited
    visited.add(current_module)

    try:
        imported_names = imports_from_code(auto_file_content)
    except CodeParsingException as err:
        click.secho(f"Error parsing {current_module}:\n {err}", fg="red")
        sys.exit(2)

    found = []
    for name in imported_names:
        if name in visited:
            continue
        if name in mod_names:
            found.append(name)
            continue

        # relative module paths
        if name.startswith(".."):
            module_path = ".".join(current_module.split(".")[:-2]) + "." + name[2:]
        elif name.startswith("."):
            module_path = ".".join(current_module.split(".")[:-1]) + "." + name[1:]
        else:
            module_path = name

        # possible files for the module: .py or __init__.py (if directory)
        path_parts = module_path.split(".")
        file_name = os.path.join(*path_parts) + ".py"
        try:
            parent_dir = os.path.join(
                *auto_file_path.replace(str(backend.device_location), "").split("/")[
                    :-1
                ]
            )
            location = os.path.join(parent_dir, file_name)
        except TypeError:
            # file is in root of CIRCUITPY
            location = file_name

        if not backend.file_exists(location):
            location = os.path.join(*path_parts, "__init__.py")
            if not backend.file_exists(location):
                continue
            module_path += ".__init__"

        # get the content and parse it recursively
        nested_content = backend.get_file_content(location)
        if nested_content:
            found.extend(
                get_all_imports(
                    backend,
                    nested_content,
                    auto_file_path,
                    mod_names,
                    module_path,
                    visited,
                )
            )

    return sorted(found)
|
|
# [r for r in requested_installs if r in mod_names]
|
|
|
|
|
|
def libraries_from_auto_file(backend, auto_file, mod_names):
    """
    Parse the input auto_file path and/or use the workflow to find the most
    appropriate code.py script. Then return the list of imports.

    When ``auto_file`` is None, the first name from CODE_FILES present in the
    device root is used. Absolute paths and paths starting with "./" or
    "../" are read from the local filesystem; any other path is read through
    the backend. The process exits with status 1 when no file is found.

    :param Backend backend: The current backend object
    :param str auto_file: Path of the candidate auto file or None
    :param mod_names: The supported bundle module names, passed on to
        ``get_all_imports``.
    :return: sequence of library names
    """
    # find the current main file based on Circuitpython's rules
    if auto_file is None:
        root_files = [
            file["name"] for file in backend.list_dir("") if not file["directory"]
        ]
        for main_file in CODE_FILES:
            if main_file in root_files:
                auto_file = main_file
                break
    # still no code file found
    if auto_file is None:
        click.secho(
            "No default code file found. See valid names:\n"
            "https://docs.circuitpython.org/en/latest/README.html#behavior",
            fg="red",
        )
        sys.exit(1)

    # pass a local file with "./" or "../"
    is_relative = auto_file.split(os.sep)[0] in [os.path.curdir, os.path.pardir]
    if os.path.isabs(auto_file) or is_relative:
        with open(auto_file, "r", encoding="UTF8") as fp:
            auto_file_content = fp.read()
    else:
        auto_file_content = backend.get_file_content(auto_file)

    if auto_file_content is None:
        click.secho(f"Auto file not found: {auto_file}", fg="red")
        sys.exit(1)

    # from file name to module name (in case it's in a subpackage)
    click.secho(f"Finding imports from: {auto_file}", fg="green")
    # Drop only a trailing ".py". str.rstrip(".py") would strip any run of the
    # characters ".", "p" and "y", turning e.g. "happy.py" into "ha".
    module_path = auto_file[: -len(".py")] if auto_file.endswith(".py") else auto_file
    current_module = module_path.replace(os.path.sep, ".")
    return get_all_imports(
        backend, auto_file_content, auto_file, mod_names, current_module
    )
|
|
|
|
|
|
def get_device_path(host, port, password, path):
    """
    Determine where the device can be reached.

    An explicit ``path`` wins; otherwise a URL is built from ``host``,
    ``port`` and ``password``; otherwise the device is searched for on the
    local filesystem.

    :param host Hostname or IP address.
    :param port Port number used in the device URL.
    :param password REST API password.
    :param path File system path.
    :return device URL or None if the device cannot be found.
    """
    if path:
        return path
    if host:
        # pylint: enable=no-member
        return f"http://:{password}@{host}:{port}"
    return find_device()
|
|
|
|
|
|
def sorted_by_directory_then_alpha(list_of_files):
    """
    Sort the list of files alphabetically by name, with directories
    grouped together before files.

    Entries are dictionaries with at least the "name" and "directory" keys.
    Within each group, a later entry replaces an earlier one with the same
    name.

    :param list_of_files: The entries to sort.
    :return: New list with the directories first, then the files.
    """
    directories = {}
    plain_files = {}
    for entry in list_of_files:
        group = directories if entry["directory"] else plain_files
        group[entry["name"]] = entry

    ordered = [directories[name] for name in sorted(directories)]
    ordered += [plain_files[name] for name in sorted(plain_files)]
    return ordered
|