Merge pull request #12 from ntoll/mpy

Add handling of mpy modules
This commit is contained in:
Nicholas Tollervey 2019-09-23 18:57:48 +01:00 committed by GitHub
commit 20edfab5ae
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 274 additions and 121 deletions

View file

@ -18,7 +18,6 @@ all:
clean:
rm -rf build
rm -rf dist
rm -rf circup.egg-info
rm -rf .coverage
rm -rf .eggs
rm -rf .pytest_cache

259
circup.py
View file

@ -50,13 +50,15 @@ DATA_DIR = appdirs.user_data_dir(appname="circup", appauthor="adafruit")
#: The path to the JSON file containing the metadata about the current bundle.
BUNDLE_DATA = os.path.join(DATA_DIR, "circup.json")
#: The path to the zip file containing the current library bundle.
BUNDLE_ZIP = os.path.join(DATA_DIR, "adafruit-circuitpython-bundle-py.zip")
BUNDLE_ZIP = os.path.join(DATA_DIR, "adafruit-circuitpython-bundle-{}.zip")
#: The path to the directory into which the current bundle is unzipped.
BUNDLE_DIR = os.path.join(DATA_DIR, "adafruit_circuitpython_bundle")
BUNDLE_DIR = os.path.join(DATA_DIR, "adafruit_circuitpython_bundle_{}")
#: The directory containing the utility's log file.
LOG_DIR = appdirs.user_log_dir(appname="circup", appauthor="adafruit")
#: The location of the log file for the utility.
LOGFILE = os.path.join(LOG_DIR, "circup.log")
#: The version of CircuitPython found on the connected device.
CPY_VERSION = ""
# Ensure DATA_DIR / LOG_DIR related directories and files exist.
@ -93,9 +95,7 @@ class Module:
Represents a CircuitPython module.
"""
def __init__(
self, path, repo, device_version, bundle_version, bundle_path
):
def __init__(self, path, repo, device_version, bundle_version, mpy):
"""
The ``self.file`` and ``self.name`` attributes are constructed from
the ``path`` value. If the path is to a directory based module, the
@ -107,13 +107,13 @@ class Module:
:param str repo: The URL of the Git repository for this module.
:param str device_version: The semver value for the version on device.
:param str bundle_version: The semver value for the version in bundle.
:param str bundle_path: The path to the bundle version of the module.
:param bool mpy: Flag to indicate if the module is byte-code compiled.
"""
self.path = path
if os.path.isfile(self.path):
# Single file module.
self.file = os.path.basename(path)
self.name = self.file[:-3]
self.name = self.file.replace(".py", "").replace(".mpy", "")
else:
# Directory based module.
self.file = None
@ -121,7 +121,24 @@ class Module:
self.repo = repo
self.device_version = device_version
self.bundle_version = bundle_version
self.bundle_path = bundle_path
self.mpy = mpy
# Figure out the bundle path.
self.bundle_path = None
if self.mpy:
# Byte compiled, now check CircuitPython version.
major_version = CPY_VERSION.split(".")[0]
bundle_platform = "{}mpy".format(major_version)
else:
# Regular Python
bundle_platform = "py"
for path, subdirs, files in os.walk(
BUNDLE_DIR.format(bundle_platform)
):
if os.path.basename(path) == "lib":
if self.file:
self.bundle_path = os.path.join(path, self.file)
else:
self.bundle_path = os.path.join(path, self.name)
logger.info(self)
@property
@ -185,6 +202,7 @@ class Module:
"device_version": self.device_version,
"bundle_version": self.bundle_version,
"bundle_path": self.bundle_path,
"mpy": self.mpy,
}
)
@ -279,28 +297,64 @@ def get_latest_tag():
return tag
def extract_metadata(code):
def extract_metadata(path):
"""
Given some Adafruit library code, return a dictionary containing metadata
extracted from dunder attributes found therein.
Given an file path, return a dictionary containing metadata extracted from
dunder attributes found therein. Works with both *.py and *.mpy files.
Such metadata assignments should be simple and single-line. For example::
For Python source files, such metadata assignments should be simple and
single-line. For example::
__version__ = "1.1.4"
__repo__ = "https://github.com/adafruit/SomeLibrary.git"
:param str code: The source code containing the metadata.
:return: The dunder based metadata found in the code as a dictionary.
For byte compiled *.mpy files, a brute force / backtrack approach is used
to find the __version__ number in the file -- see comments in the
code for the implementation details.
:param str path: The path to the file containing the metadata.
:return: The dunder based metadata found in the file, as a dictionary.
"""
result = {}
lines = code.split("\n")
for line in lines:
if DUNDER_ASSIGN_RE.search(line):
exec(line, result)
if "__builtins__" in result:
del result["__builtins__"] # Side effect of using exec, not needed.
if result:
logger.info("Extracted metadata: {}".format(result))
if path.endswith(".py"):
result["mpy"] = False
with open(path, encoding="utf-8") as source_file:
content = source_file.read()
lines = content.split("\n")
for line in lines:
if DUNDER_ASSIGN_RE.search(line):
exec(line, result)
if "__builtins__" in result:
del result[
"__builtins__"
] # Side effect of using exec, not needed.
if result:
logger.info("Extracted metadata: {}".format(result))
return result
elif path.endswith(".mpy"):
result["mpy"] = True
with open(path, "rb") as mpy_file:
content = mpy_file.read()
# Find the start location of the "__version__" (prepended with byte
# value of 11 to indicate length of "__version__").
loc = content.find(b"\x0b__version__")
if loc > -1:
# Backtrack until a byte value of the offset is reached.
offset = 1
while offset < loc:
val = int(content[loc - offset])
if val == offset - 1: # Off by one..!
# Found version, extract the number given boundaries.
start = loc - offset + 1 # No need for prepended length.
end = loc # Up to the start of the __version__.
version = content[start:end] # Slice the version number.
# Create a string version as metadata in the result.
result = {
"__version__": version.decode("utf-8"),
"mpy": True,
}
break # Nothing more to do.
offset += 1 # ...and again but backtrack by one.
return result
@ -324,11 +378,9 @@ def find_modules():
repo = device_metadata.get("__repo__")
device_version = device_metadata.get("__version__")
bundle_version = bundle_metadata.get("__version__")
bundle_path = bundle_metadata["path"]
mpy = device_metadata["mpy"]
result.append(
Module(
path, repo, device_version, bundle_version, bundle_path
)
Module(path, repo, device_version, bundle_version, mpy)
)
return result
except Exception as ex:
@ -342,18 +394,37 @@ def find_modules():
def get_bundle_versions():
"""
Returns a dictionary of metadata from modules in the latest known release
of the library bundle.
of the library bundle. Uses the Python version (rather than the compiled
version) of the library modules.
:return: A dictionary of metadata about the modules available in the
library bundle.
"""
ensure_latest_bundle()
for path, subdirs, files in os.walk(BUNDLE_DIR):
if path.endswith("lib"):
path = None
for path, subdirs, files in os.walk(BUNDLE_DIR.format("py")):
if os.path.basename(path) == "lib":
break
return get_modules(path)
def get_circuitpython_version(device_path):
"""
Returns the version number of CircuitPython running on the board connected
via ``device_path``. This is obtained from the ``boot_out.txt`` file on the
device, whose content will start with something like this::
Adafruit CircuitPython 4.1.0 on 2019-08-02;
:param str device_path: The path to the connected board.
:return: The version string for CircuitPython running on the connected
board.
"""
with open(os.path.join(device_path, "boot_out.txt")) as boot:
circuit_python, board = boot.read().split(";")
return circuit_python.split(" ")[-3]
def get_device_versions():
"""
Returns a dictionary of metadata from modules on the connected device.
@ -362,8 +433,6 @@ def get_device_versions():
connected device.
"""
device_path = find_device()
if device_path is None:
raise IOError("Could not find a connected Adafruit device.")
return get_modules(os.path.join(device_path, "lib"))
@ -375,37 +444,42 @@ def get_modules(path):
:param str path: The directory in which to find modules.
:return: A dictionary containing metadata about the found modules.
"""
single_file_mods = [
f
for f in glob.glob(os.path.join(path, "*.py"))
if not os.path.basename(f).startswith(".")
]
result = {}
if not path:
return result
single_file_py_mods = glob.glob(os.path.join(path, "*.py"))
single_file_mpy_mods = glob.glob(os.path.join(path, "*.mpy"))
directory_mods = [
d
for d in glob.glob(os.path.join(path, "*", ""))
if not os.path.basename(os.path.normpath(d)).startswith(".")
]
result = {}
for sfm in single_file_mods:
with open(sfm, encoding="utf-8") as source_file:
source_code = source_file.read()
metadata = extract_metadata(source_code)
metadata["path"] = sfm
result[os.path.basename(sfm)] = metadata
single_file_mods = single_file_py_mods + single_file_mpy_mods
for sfm in [
f for f in single_file_mods if not os.path.basename(f).startswith(".")
]:
metadata = extract_metadata(sfm)
metadata["path"] = sfm
result[
os.path.basename(sfm).replace(".py", "").replace(".mpy", "")
] = metadata
for dm in directory_mods:
name = os.path.basename(os.path.dirname(dm))
metadata = {}
for source in glob.glob(os.path.join(dm, "*.py")):
with open(source, encoding="utf-8") as source_file:
source_code = source_file.read()
metadata = extract_metadata(source_code)
py_files = glob.glob(os.path.join(dm, "*.py"))
mpy_files = glob.glob(os.path.join(dm, "*.mpy"))
all_files = py_files + mpy_files
for source in [
f for f in all_files if not os.path.basename(f).startswith(".")
]:
metadata = extract_metadata(source)
if "__version__" in metadata:
metadata["path"] = dm
result[name] = metadata
break
else:
# No version metadata found.
result[name] = {"path": dm}
result[name] = {"path": dm, "mpy": bool(mpy_files)}
return result
@ -443,29 +517,48 @@ def get_bundle(tag):
:return: The location of the resulting zip file in a temporary location on
the local filesystem.
"""
url = (
"https://github.com/adafruit/Adafruit_CircuitPython_Bundle"
"/releases/download"
"/{tag}/adafruit-circuitpython-bundle-py-{tag}.zip".format(tag=tag)
)
logger.info("Downloading bundle: {}".format(url))
r = requests.get(url, stream=True)
if r.status_code != requests.codes.ok:
logger.warning("Unable to connect to {}".format(url))
r.raise_for_status()
total_size = int(r.headers.get("Content-Length"))
urls = {
"py": (
"https://github.com/adafruit/Adafruit_CircuitPython_Bundle"
"/releases/download"
"/{tag}/adafruit-circuitpython-bundle-py-{tag}.zip".format(tag=tag)
),
"4mpy": (
"https://github.com/adafruit/Adafruit_CircuitPython_Bundle"
"/releases/download"
"/{tag}/adafruit-circuitpython-bundle-4.x-mpy-{tag}.zip".format(
tag=tag
)
),
"5mpy": (
"https://github.com/adafruit/Adafruit_CircuitPython_Bundle/"
"releases/download"
"/{tag}/adafruit-circuitpython-bundle-5.x-mpy-{tag}.zip".format(
tag=tag
)
),
}
click.echo("Downloading latest version information.\n")
with click.progressbar(
r.iter_content(1024), length=total_size
) as bar, open(BUNDLE_ZIP, "wb") as f:
for chunk in bar:
f.write(chunk)
bar.update(len(chunk))
logger.info("Saved to {}".format(BUNDLE_ZIP))
if os.path.isdir(BUNDLE_DIR):
shutil.rmtree(BUNDLE_DIR)
with zipfile.ZipFile(BUNDLE_ZIP, "r") as zfile:
zfile.extractall(BUNDLE_DIR)
for platform, url in urls.items():
logger.info("Downloading bundle: {}".format(url))
r = requests.get(url, stream=True)
if r.status_code != requests.codes.ok:
logger.warning("Unable to connect to {}".format(url))
r.raise_for_status()
total_size = int(r.headers.get("Content-Length"))
temp_zip = BUNDLE_ZIP.format(platform)
with click.progressbar(
r.iter_content(1024), length=total_size
) as bar, open(temp_zip, "wb") as f:
for chunk in bar:
f.write(chunk)
bar.update(len(chunk))
logger.info("Saved to {}".format(temp_zip))
temp_dir = BUNDLE_DIR.format(platform)
if os.path.isdir(temp_dir):
shutil.rmtree(temp_dir)
with zipfile.ZipFile(temp_zip, "r") as zfile:
zfile.extractall(temp_dir)
click.echo("\nOK\n")
@ -501,6 +594,32 @@ def main(verbose): # pragma: no cover
logger.addHandler(verbose_handler)
click.echo("Logging to {}\n".format(LOGFILE))
logger.info("### Started {}".format(datetime.now()))
device_path = find_device()
if device_path is None:
click.secho("Could not find a connected Adafruit device.", fg="red")
sys.exit(1)
global CPY_VERSION
CPY_VERSION = get_circuitpython_version(device_path)
click.echo(
"Found device at {}, running CircuitPython {}.".format(
device_path, CPY_VERSION
)
)
cp_release = requests.get(
"https://github.com/adafruit/circuitpython/releases/latest", timeout=2
)
latest_version = cp_release.url.split("/")[-1]
try:
if compare(CPY_VERSION, latest_version) < 0:
click.secho(
"A newer version of CircuitPython ({}) is available.".format(
latest_version
),
fg="green",
)
except ValueError as ex:
logger.warning("CircuitPython has incorrect semver value.")
logger.warning(ex)
@main.command()

View file

@ -2,11 +2,13 @@
"adafruit_74hc595.py": {
"__version__": "1.0.2",
"__repo__": "https://github.com/adafruit/Adafruit_CircuitPython_74HC595.git",
"path": "/home/ntoll/.local/share/circup/adafruit_circuitpython_bundle/adafruit-circuitpython-bundle-py-20190903/lib/adafruit_74hc595.py"
"path": "/home/ntoll/.local/share/circup/adafruit_circuitpython_bundle/adafruit-circuitpython-bundle-py-20190903/lib/adafruit_74hc595.py",
"mpy": false
},
"adafruit_lsm303.py": {
"__version__": "1.2.5",
"__repo__": "https://github.com/adafruit/Adafruit_CircuitPython_LSM303.git",
"path": "/home/ntoll/.local/share/circup/adafruit_circuitpython_bundle/adafruit-circuitpython-bundle-py-20190903/lib/adafruit_lsm303.py"
"path": "/home/ntoll/.local/share/circup/adafruit_circuitpython_bundle/adafruit-circuitpython-bundle-py-20190903/lib/adafruit_lsm303.py",
"mpy": false
}
}

View file

@ -2,6 +2,7 @@
"adafruit_74hc595.py": {
"__version__": "1.0.2",
"__repo__": "https://github.com/adafruit/Adafruit_CircuitPython_74HC595.git",
"path": "/media/ntoll/CIRCUITPY/lib/adafruit_74hc595.py"
"path": "/media/ntoll/CIRCUITPY/lib/adafruit_74hc595.py",
"mpy": false
}
}

View file

@ -38,13 +38,12 @@ def test_Module_init_file_module():
repo = "https://github.com/adafruit/SomeLibrary.git"
device_version = "1.2.3"
bundle_version = "3.2.1"
bundle_path = os.path.join("baz", "bar", "foo", "module.py")
with mock.patch("circup.logger.info") as mock_logger, mock.patch(
"circup.os.path.isfile", return_value=True
):
m = circup.Module(
path, repo, device_version, bundle_version, bundle_path
)
), mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch(
"circup.os.walk", return_value=[["lib", "", ""]]
) as mock_walk:
m = circup.Module(path, repo, device_version, bundle_version, False)
mock_logger.assert_called_once_with(m)
assert m.path == path
assert m.file == "module.py"
@ -52,7 +51,9 @@ def test_Module_init_file_module():
assert m.repo == repo
assert m.device_version == device_version
assert m.bundle_version == bundle_version
assert m.bundle_path == bundle_path
assert m.bundle_path == os.path.join("lib", m.file)
assert m.mpy is False
mock_walk.assert_called_once_with(circup.BUNDLE_DIR.format("py"))
def test_Module_init_directory_module():
@ -64,13 +65,13 @@ def test_Module_init_directory_module():
repo = "https://github.com/adafruit/SomeLibrary.git"
device_version = "1.2.3"
bundle_version = "3.2.1"
bundle_path = os.path.join("baz", "bar", "foo", "")
mpy = True
with mock.patch("circup.logger.info") as mock_logger, mock.patch(
"circup.os.path.isfile", return_value=False
):
m = circup.Module(
path, repo, device_version, bundle_version, bundle_path
)
), mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch(
"circup.os.walk", return_value=[["lib", "", ""]]
) as mock_walk:
m = circup.Module(path, repo, device_version, bundle_version, mpy)
mock_logger.assert_called_once_with(m)
assert m.path == path
assert m.file is None
@ -78,7 +79,9 @@ def test_Module_init_directory_module():
assert m.repo == repo
assert m.device_version == device_version
assert m.bundle_version == bundle_version
assert m.bundle_path == bundle_path
assert m.bundle_path == os.path.join("lib", m.name)
assert m.mpy is True
mock_walk.assert_called_once_with(circup.BUNDLE_DIR.format("4mpy"))
def test_Module_outofdate():
@ -179,11 +182,10 @@ def test_Module_repr():
repo = "https://github.com/adafruit/SomeLibrary.git"
device_version = "1.2.3"
bundle_version = "3.2.1"
bundle_path = os.path.join("baz", "bar", "foo", "module.py")
with mock.patch("circup.os.path.isfile", return_value=True):
m = circup.Module(
path, repo, device_version, bundle_version, bundle_path
)
with mock.patch("circup.os.path.isfile", return_value=True), mock.patch(
"circup.CPY_VERSION", "4.1.2"
), mock.patch("circup.os.walk", return_value=[["lib", "", ""]]):
m = circup.Module(path, repo, device_version, bundle_version, False)
assert repr(m) == repr(
{
"path": path,
@ -192,7 +194,8 @@ def test_Module_repr():
"repo": repo,
"device_version": device_version,
"bundle_version": bundle_version,
"bundle_path": bundle_path,
"bundle_path": os.path.join("lib", m.file),
"mpy": False,
}
)
@ -304,7 +307,7 @@ def test_get_latest_tag():
mock_get.assert_called_once_with(expected_url)
def test_extract_metadata():
def test_extract_metadata_python():
"""
Ensure the dunder objects assigned in code are extracted into a Python
dictionary representing such metadata.
@ -315,10 +318,26 @@ def test_extract_metadata():
'__repo__ = "https://github.com/adafruit/SomeLibrary.git"\n'
'print("Hello, world!")\n'
)
result = circup.extract_metadata(code)
assert len(result) == 2
path = "foo.py"
with mock.patch(
"builtins.open", mock.mock_open(read_data=code)
) as mock_open:
result = circup.extract_metadata(path)
mock_open.assert_called_once_with(path, encoding="utf-8")
assert len(result) == 3
assert result["__version__"] == "1.1.4"
assert result["__repo__"] == "https://github.com/adafruit/SomeLibrary.git"
assert result["mpy"] is False
def test_extract_metadata_byte_code():
"""
Ensure the __version__ is correctly extracted from the bytecode ".mpy"
file. Version in test_module is 0.9.2
"""
result = circup.extract_metadata("tests/test_module.mpy")
assert result["__version__"] == "0.9.2"
assert result["mpy"] is True
def test_find_modules():
@ -368,10 +387,28 @@ def test_get_bundle_versions():
"circup.get_modules", return_value="ok"
) as mock_gm:
assert circup.get_bundle_versions() == "ok"
mock_walk.assert_called_once_with(circup.BUNDLE_DIR)
mock_walk.assert_called_once_with(circup.BUNDLE_DIR.format("py"))
mock_gm.assert_called_once_with("foo/bar/lib")
def test_get_circuitpython_version():
"""
Given valid content of a boot_out.txt file on a connected device, return
the version number of CircuitPython running on the board.
"""
data = (
"Adafruit CircuitPython 4.1.0 on 2019-08-02; "
"Adafruit CircuitPlayground Express with samd21g18"
)
mock_open = mock.mock_open(read_data=data)
device_path = "device"
with mock.patch("builtins.open", mock_open):
assert circup.get_circuitpython_version(device_path) == "4.1.0"
mock_open.assert_called_once_with(
os.path.join(device_path, "boot_out.txt")
)
def test_get_device_versions():
"""
Ensure get_modules is called with the path for the attached device.
@ -383,14 +420,12 @@ def test_get_device_versions():
mock_gm.assert_called_once_with(os.path.join("CIRCUITPYTHON", "lib"))
def test_get_device_versions_go_bang():
def test_get_modules_empty_path():
"""
If it's not possible to find a connected device, ensure an IOError is
raised.
Sometimes a path to a device or bundle may be empty. Ensure, if this is the
case, an empty dictionary is returned.
"""
with mock.patch("circup.find_device", return_value=None):
with pytest.raises(IOError):
circup.get_device_versions()
assert circup.get_modules("") == {}
def test_get_modules_that_are_files():
@ -403,18 +438,18 @@ def test_get_modules_that_are_files():
os.path.join("tests", "local_module.py"),
os.path.join("tests", ".hidden_module.py"),
]
with mock.patch("circup.glob.glob", side_effect=[mods, []]):
with mock.patch("circup.glob.glob", side_effect=[mods, [], []]):
result = circup.get_modules(path)
assert len(result) == 1 # Hidden files are ignored.
assert "local_module.py" in result
assert result["local_module.py"]["path"] == os.path.join(
assert "local_module" in result
assert result["local_module"]["path"] == os.path.join(
"tests", "local_module.py"
)
assert (
result["local_module.py"]["__version__"] == "1.2.3"
result["local_module"]["__version__"] == "1.2.3"
) # from fixture.
repo = "https://github.com/adafruit/SomeLibrary.git" # from fixture.
assert result["local_module.py"]["__repo__"] == repo
assert result["local_module"]["__repo__"] == repo
def test_get_modules_that_are_directories():
@ -431,7 +466,9 @@ def test_get_modules_that_are_directories():
"tests/dir_module/my_module.py",
"tests/dir_module/__init__.py",
]
with mock.patch("circup.glob.glob", side_effect=[[], mods, mod_files]):
with mock.patch(
"circup.glob.glob", side_effect=[[], [], mods, mod_files, []]
):
result = circup.get_modules(path)
assert len(result) == 1
assert "dir_module" in result
@ -454,7 +491,9 @@ def test_get_modules_that_are_directories_with_no_metadata():
"tests/bad_module/my_module.py",
"tests/bad_module/__init__.py",
]
with mock.patch("circup.glob.glob", side_effect=[[], mods, mod_files]):
with mock.patch(
"circup.glob.glob", side_effect=[[], [], mods, mod_files, []]
):
result = circup.get_modules(path)
assert len(result) == 1
assert "bad_module" in result
@ -569,18 +608,11 @@ def test_get_bundle():
mock_requests.get.reset_mock()
tag = "12345"
circup.get_bundle(tag)
url = (
"https://github.com/adafruit/Adafruit_CircuitPython_Bundle"
"/releases/download"
"/{tag}/adafruit-circuitpython-bundle-py-{tag}.zip".format(tag=tag)
)
mock_requests.get.assert_called_once_with(url, stream=True)
mock_open.assert_called_once_with(circup.BUNDLE_ZIP, "wb")
mock_shutil.rmtree.assert_called_once_with(circup.BUNDLE_DIR)
mock_zipfile.ZipFile.assert_called_once_with(circup.BUNDLE_ZIP, "r")
mock_zipfile.ZipFile().__enter__().extractall.assert_called_once_with(
circup.BUNDLE_DIR
)
assert mock_requests.get.call_count == 3
assert mock_open.call_count == 3
assert mock_shutil.rmtree.call_count == 3
assert mock_zipfile.ZipFile.call_count == 3
assert mock_zipfile.ZipFile().__enter__().extractall.call_count == 3
def test_get_bundle_network_error():

BIN
tests/test_module.mpy Normal file

Binary file not shown.