Handle .mpy based modules. Fixes #8.

This commit is contained in:
Nicholas H.Tollervey 2019-09-23 17:45:29 +01:00
parent 3df011654d
commit b2a94e1ca1
No known key found for this signature in database
GPG key ID: FD2A04F69841B6FA
5 changed files with 221 additions and 115 deletions

221
circup.py
View file

@ -50,13 +50,15 @@ DATA_DIR = appdirs.user_data_dir(appname="circup", appauthor="adafruit")
#: The path to the JSON file containing the metadata about the current bundle.
BUNDLE_DATA = os.path.join(DATA_DIR, "circup.json")
#: The path to the zip file containing the current library bundle.
BUNDLE_ZIP = os.path.join(DATA_DIR, "adafruit-circuitpython-bundle-py.zip")
BUNDLE_ZIP = os.path.join(DATA_DIR, "adafruit-circuitpython-bundle-{}.zip")
#: The path to the directory into which the current bundle is unzipped.
BUNDLE_DIR = os.path.join(DATA_DIR, "adafruit_circuitpython_bundle")
BUNDLE_DIR = os.path.join(DATA_DIR, "adafruit_circuitpython_bundle_{}")
#: The directory containing the utility's log file.
LOG_DIR = appdirs.user_log_dir(appname="circup", appauthor="adafruit")
#: The location of the log file for the utility.
LOGFILE = os.path.join(LOG_DIR, "circup.log")
#: The version of CircuitPython found on the connected device.
CPY_VERSION = ""
# Ensure DATA_DIR / LOG_DIR related directories and files exist.
@ -93,9 +95,7 @@ class Module:
Represents a CircuitPython module.
"""
def __init__(
self, path, repo, device_version, bundle_version, bundle_path
):
def __init__(self, path, repo, device_version, bundle_version, mpy):
"""
The ``self.file`` and ``self.name`` attributes are constructed from
the ``path`` value. If the path is to a directory based module, the
@ -107,13 +107,13 @@ class Module:
:param str repo: The URL of the Git repository for this module.
:param str device_version: The semver value for the version on device.
:param str bundle_version: The semver value for the version in bundle.
:param str bundle_path: The path to the bundle version of the module.
:param bool mpy: Flag to indicate if the module is byte-code compiled.
"""
self.path = path
if os.path.isfile(self.path):
# Single file module.
self.file = os.path.basename(path)
self.name = self.file[:-3]
self.name = self.file.replace(".py", "").replace(".mpy", "")
else:
# Directory based module.
self.file = None
@ -121,7 +121,24 @@ class Module:
self.repo = repo
self.device_version = device_version
self.bundle_version = bundle_version
self.bundle_path = bundle_path
self.mpy = mpy
# Figure out the bundle path.
self.bundle_path = None
if self.mpy:
# Byte compiled, now check CircuitPython version.
major_version = CPY_VERSION.split(".")[0]
bundle_platform = "{}mpy".format(major_version)
else:
# Regular Python
bundle_platform = "py"
for path, subdirs, files in os.walk(
BUNDLE_DIR.format(bundle_platform)
):
if os.path.basename(path) == "lib":
if self.file:
self.bundle_path = os.path.join(path, self.file)
else:
self.bundle_path = os.path.join(path, self.name)
logger.info(self)
@property
@ -185,6 +202,7 @@ class Module:
"device_version": self.device_version,
"bundle_version": self.bundle_version,
"bundle_path": self.bundle_path,
"mpy": self.mpy,
}
)
@ -279,28 +297,64 @@ def get_latest_tag():
return tag
def extract_metadata(code):
def extract_metadata(path):
"""
Given some Adafruit library code, return a dictionary containing metadata
extracted from dunder attributes found therein.
Given an file path, return a dictionary containing metadata extracted from
dunder attributes found therein. Works with both *.py and *.mpy files.
Such metadata assignments should be simple and single-line. For example::
For Python source files, such metadata assignments should be simple and
single-line. For example::
__version__ = "1.1.4"
__repo__ = "https://github.com/adafruit/SomeLibrary.git"
:param str code: The source code containing the metadata.
:return: The dunder based metadata found in the code as a dictionary.
For byte compiled *.mpy files, a brute force / backtrack approach is used
to find the __version__ number in the file -- see comments in the
code for the implementation details.
:param str path: The path to the file containing the metadata.
:return: The dunder based metadata found in the file, as a dictionary.
"""
result = {}
lines = code.split("\n")
for line in lines:
if DUNDER_ASSIGN_RE.search(line):
exec(line, result)
if "__builtins__" in result:
del result["__builtins__"] # Side effect of using exec, not needed.
if result:
logger.info("Extracted metadata: {}".format(result))
if path.endswith(".py"):
result["mpy"] = False
with open(path, encoding="utf-8") as source_file:
content = source_file.read()
lines = content.split("\n")
for line in lines:
if DUNDER_ASSIGN_RE.search(line):
exec(line, result)
if "__builtins__" in result:
del result[
"__builtins__"
] # Side effect of using exec, not needed.
if result:
logger.info("Extracted metadata: {}".format(result))
return result
elif path.endswith(".mpy"):
result["mpy"] = True
with open(path, "rb") as mpy_file:
content = mpy_file.read()
# Find the start location of the "__version__" (prepended with byte
# value of 11 to indicate length of "__version__").
loc = content.find(b"\x0b__version__")
if loc > -1:
# Backtrack until a byte value of the offset is reached.
offset = 1
while offset < loc:
val = int(content[loc - offset])
if val == offset - 1: # Off by one..!
# Found version, extract the number given boundaries.
start = loc - offset + 1 # No need for prepended length.
end = loc # Up to the start of the __version__.
version = content[start:end] # Slice the version number.
# Create a string version as metadata in the result.
result = {
"__version__": version.decode("utf-8"),
"mpy": True,
}
break # Nothing more to do.
offset += 1 # ...and again but backtrack by one.
return result
@ -324,11 +378,9 @@ def find_modules():
repo = device_metadata.get("__repo__")
device_version = device_metadata.get("__version__")
bundle_version = bundle_metadata.get("__version__")
bundle_path = bundle_metadata["path"]
mpy = device_metadata["mpy"]
result.append(
Module(
path, repo, device_version, bundle_version, bundle_path
)
Module(path, repo, device_version, bundle_version, mpy)
)
return result
except Exception as ex:
@ -342,14 +394,16 @@ def find_modules():
def get_bundle_versions():
"""
Returns a dictionary of metadata from modules in the latest known release
of the library bundle.
of the library bundle. Uses the Python version (rather than the compiled
version) of the library modules.
:return: A dictionary of metadata about the modules available in the
library bundle.
"""
ensure_latest_bundle()
for path, subdirs, files in os.walk(BUNDLE_DIR):
if path.endswith("lib"):
path = None
for path, subdirs, files in os.walk(BUNDLE_DIR.format("py")):
if os.path.basename(path) == "lib":
break
return get_modules(path)
@ -390,37 +444,42 @@ def get_modules(path):
:param str path: The directory in which to find modules.
:return: A dictionary containing metadata about the found modules.
"""
single_file_mods = [
f
for f in glob.glob(os.path.join(path, "*.py"))
if not os.path.basename(f).startswith(".")
]
result = {}
if not path:
return result
single_file_py_mods = glob.glob(os.path.join(path, "*.py"))
single_file_mpy_mods = glob.glob(os.path.join(path, "*.mpy"))
directory_mods = [
d
for d in glob.glob(os.path.join(path, "*", ""))
if not os.path.basename(os.path.normpath(d)).startswith(".")
]
result = {}
for sfm in single_file_mods:
with open(sfm, encoding="utf-8") as source_file:
source_code = source_file.read()
metadata = extract_metadata(source_code)
metadata["path"] = sfm
result[os.path.basename(sfm)] = metadata
single_file_mods = single_file_py_mods + single_file_mpy_mods
for sfm in [
f for f in single_file_mods if not os.path.basename(f).startswith(".")
]:
metadata = extract_metadata(sfm)
metadata["path"] = sfm
result[
os.path.basename(sfm).replace(".py", "").replace(".mpy", "")
] = metadata
for dm in directory_mods:
name = os.path.basename(os.path.dirname(dm))
metadata = {}
for source in glob.glob(os.path.join(dm, "*.py")):
with open(source, encoding="utf-8") as source_file:
source_code = source_file.read()
metadata = extract_metadata(source_code)
py_files = glob.glob(os.path.join(dm, "*.py"))
mpy_files = glob.glob(os.path.join(dm, "*.mpy"))
all_files = py_files + mpy_files
for source in [
f for f in all_files if not os.path.basename(f).startswith(".")
]:
metadata = extract_metadata(source)
if "__version__" in metadata:
metadata["path"] = dm
result[name] = metadata
break
else:
# No version metadata found.
result[name] = {"path": dm}
result[name] = {"path": dm, "mpy": bool(mpy_files)}
return result
@ -458,29 +517,48 @@ def get_bundle(tag):
:return: The location of the resulting zip file in a temporary location on
the local filesystem.
"""
url = (
"https://github.com/adafruit/Adafruit_CircuitPython_Bundle"
"/releases/download"
"/{tag}/adafruit-circuitpython-bundle-py-{tag}.zip".format(tag=tag)
)
logger.info("Downloading bundle: {}".format(url))
r = requests.get(url, stream=True)
if r.status_code != requests.codes.ok:
logger.warning("Unable to connect to {}".format(url))
r.raise_for_status()
total_size = int(r.headers.get("Content-Length"))
urls = {
"py": (
"https://github.com/adafruit/Adafruit_CircuitPython_Bundle"
"/releases/download"
"/{tag}/adafruit-circuitpython-bundle-py-{tag}.zip".format(tag=tag)
),
"4mpy": (
"https://github.com/adafruit/Adafruit_CircuitPython_Bundle"
"/releases/download"
"/{tag}/adafruit-circuitpython-bundle-4.x-mpy-{tag}.zip".format(
tag=tag
)
),
"5mpy": (
"https://github.com/adafruit/Adafruit_CircuitPython_Bundle/"
"releases/download"
"/{tag}/adafruit-circuitpython-bundle-5.x-mpy-{tag}.zip".format(
tag=tag
)
),
}
click.echo("Downloading latest version information.\n")
with click.progressbar(
r.iter_content(1024), length=total_size
) as bar, open(BUNDLE_ZIP, "wb") as f:
for chunk in bar:
f.write(chunk)
bar.update(len(chunk))
logger.info("Saved to {}".format(BUNDLE_ZIP))
if os.path.isdir(BUNDLE_DIR):
shutil.rmtree(BUNDLE_DIR)
with zipfile.ZipFile(BUNDLE_ZIP, "r") as zfile:
zfile.extractall(BUNDLE_DIR)
for platform, url in urls.items():
logger.info("Downloading bundle: {}".format(url))
r = requests.get(url, stream=True)
if r.status_code != requests.codes.ok:
logger.warning("Unable to connect to {}".format(url))
r.raise_for_status()
total_size = int(r.headers.get("Content-Length"))
temp_zip = BUNDLE_ZIP.format(platform)
with click.progressbar(
r.iter_content(1024), length=total_size
) as bar, open(temp_zip, "wb") as f:
for chunk in bar:
f.write(chunk)
bar.update(len(chunk))
logger.info("Saved to {}".format(temp_zip))
temp_dir = BUNDLE_DIR.format(platform)
if os.path.isdir(temp_dir):
shutil.rmtree(temp_dir)
with zipfile.ZipFile(temp_zip, "r") as zfile:
zfile.extractall(temp_dir)
click.echo("\nOK\n")
@ -519,10 +597,11 @@ def main(verbose): # pragma: no cover
device_path = find_device()
if device_path is None:
raise IOError("Could not find a connected Adafruit device.")
current_version = get_circuitpython_version(device_path)
global CPY_VERSION
CPY_VERSION = get_circuitpython_version(device_path)
click.echo(
"Found device at {}, running CircuitPython {}.".format(
device_path, current_version
device_path, CPY_VERSION
)
)
cp_release = requests.get(
@ -530,7 +609,7 @@ def main(verbose): # pragma: no cover
)
latest_version = cp_release.url.split("/")[-1]
try:
if compare(current_version, latest_version) < 0:
if compare(CPY_VERSION, latest_version) < 0:
click.secho(
"A newer version of CircuitPython ({}) is available.".format(
latest_version

View file

@ -2,11 +2,13 @@
"adafruit_74hc595.py": {
"__version__": "1.0.2",
"__repo__": "https://github.com/adafruit/Adafruit_CircuitPython_74HC595.git",
"path": "/home/ntoll/.local/share/circup/adafruit_circuitpython_bundle/adafruit-circuitpython-bundle-py-20190903/lib/adafruit_74hc595.py"
"path": "/home/ntoll/.local/share/circup/adafruit_circuitpython_bundle/adafruit-circuitpython-bundle-py-20190903/lib/adafruit_74hc595.py",
"mpy": false
},
"adafruit_lsm303.py": {
"__version__": "1.2.5",
"__repo__": "https://github.com/adafruit/Adafruit_CircuitPython_LSM303.git",
"path": "/home/ntoll/.local/share/circup/adafruit_circuitpython_bundle/adafruit-circuitpython-bundle-py-20190903/lib/adafruit_lsm303.py"
"path": "/home/ntoll/.local/share/circup/adafruit_circuitpython_bundle/adafruit-circuitpython-bundle-py-20190903/lib/adafruit_lsm303.py",
"mpy": false
}
}

View file

@ -2,6 +2,7 @@
"adafruit_74hc595.py": {
"__version__": "1.0.2",
"__repo__": "https://github.com/adafruit/Adafruit_CircuitPython_74HC595.git",
"path": "/media/ntoll/CIRCUITPY/lib/adafruit_74hc595.py"
"path": "/media/ntoll/CIRCUITPY/lib/adafruit_74hc595.py",
"mpy": false
}
}

View file

@ -38,13 +38,12 @@ def test_Module_init_file_module():
repo = "https://github.com/adafruit/SomeLibrary.git"
device_version = "1.2.3"
bundle_version = "3.2.1"
bundle_path = os.path.join("baz", "bar", "foo", "module.py")
with mock.patch("circup.logger.info") as mock_logger, mock.patch(
"circup.os.path.isfile", return_value=True
):
m = circup.Module(
path, repo, device_version, bundle_version, bundle_path
)
), mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch(
"circup.os.walk", return_value=[["lib", "", ""]]
) as mock_walk:
m = circup.Module(path, repo, device_version, bundle_version, False)
mock_logger.assert_called_once_with(m)
assert m.path == path
assert m.file == "module.py"
@ -52,7 +51,9 @@ def test_Module_init_file_module():
assert m.repo == repo
assert m.device_version == device_version
assert m.bundle_version == bundle_version
assert m.bundle_path == bundle_path
assert m.bundle_path == os.path.join("lib", m.file)
assert m.mpy is False
mock_walk.assert_called_once_with(circup.BUNDLE_DIR.format("py"))
def test_Module_init_directory_module():
@ -64,13 +65,13 @@ def test_Module_init_directory_module():
repo = "https://github.com/adafruit/SomeLibrary.git"
device_version = "1.2.3"
bundle_version = "3.2.1"
bundle_path = os.path.join("baz", "bar", "foo", "")
mpy = True
with mock.patch("circup.logger.info") as mock_logger, mock.patch(
"circup.os.path.isfile", return_value=False
):
m = circup.Module(
path, repo, device_version, bundle_version, bundle_path
)
), mock.patch("circup.CPY_VERSION", "4.1.2"), mock.patch(
"circup.os.walk", return_value=[["lib", "", ""]]
) as mock_walk:
m = circup.Module(path, repo, device_version, bundle_version, mpy)
mock_logger.assert_called_once_with(m)
assert m.path == path
assert m.file is None
@ -78,7 +79,9 @@ def test_Module_init_directory_module():
assert m.repo == repo
assert m.device_version == device_version
assert m.bundle_version == bundle_version
assert m.bundle_path == bundle_path
assert m.bundle_path == os.path.join("lib", m.name)
assert m.mpy is True
mock_walk.assert_called_once_with(circup.BUNDLE_DIR.format("4mpy"))
def test_Module_outofdate():
@ -179,11 +182,10 @@ def test_Module_repr():
repo = "https://github.com/adafruit/SomeLibrary.git"
device_version = "1.2.3"
bundle_version = "3.2.1"
bundle_path = os.path.join("baz", "bar", "foo", "module.py")
with mock.patch("circup.os.path.isfile", return_value=True):
m = circup.Module(
path, repo, device_version, bundle_version, bundle_path
)
with mock.patch("circup.os.path.isfile", return_value=True), mock.patch(
"circup.CPY_VERSION", "4.1.2"
), mock.patch("circup.os.walk", return_value=[["lib", "", ""]]):
m = circup.Module(path, repo, device_version, bundle_version, False)
assert repr(m) == repr(
{
"path": path,
@ -192,7 +194,8 @@ def test_Module_repr():
"repo": repo,
"device_version": device_version,
"bundle_version": bundle_version,
"bundle_path": bundle_path,
"bundle_path": os.path.join("lib", m.file),
"mpy": False,
}
)
@ -304,7 +307,7 @@ def test_get_latest_tag():
mock_get.assert_called_once_with(expected_url)
def test_extract_metadata():
def test_extract_metadata_python():
"""
Ensure the dunder objects assigned in code are extracted into a Python
dictionary representing such metadata.
@ -315,10 +318,26 @@ def test_extract_metadata():
'__repo__ = "https://github.com/adafruit/SomeLibrary.git"\n'
'print("Hello, world!")\n'
)
result = circup.extract_metadata(code)
assert len(result) == 2
path = "foo.py"
with mock.patch(
"builtins.open", mock.mock_open(read_data=code)
) as mock_open:
result = circup.extract_metadata(path)
mock_open.assert_called_once_with(path, encoding="utf-8")
assert len(result) == 3
assert result["__version__"] == "1.1.4"
assert result["__repo__"] == "https://github.com/adafruit/SomeLibrary.git"
assert result["mpy"] is False
def test_extract_metadata_byte_code():
"""
Ensure the __version__ is correctly extracted from the bytecode ".mpy"
file. Version in test_module is 0.9.2
"""
result = circup.extract_metadata("tests/test_module.mpy")
assert result["__version__"] == "0.9.2"
assert result["mpy"] is True
def test_find_modules():
@ -368,7 +387,7 @@ def test_get_bundle_versions():
"circup.get_modules", return_value="ok"
) as mock_gm:
assert circup.get_bundle_versions() == "ok"
mock_walk.assert_called_once_with(circup.BUNDLE_DIR)
mock_walk.assert_called_once_with(circup.BUNDLE_DIR.format("py"))
mock_gm.assert_called_once_with("foo/bar/lib")
@ -401,6 +420,14 @@ def test_get_device_versions():
mock_gm.assert_called_once_with(os.path.join("CIRCUITPYTHON", "lib"))
def test_get_modules_empty_path():
"""
Sometimes a path to a device or bundle may be empty. Ensure, if this is the
case, an empty dictionary is returned.
"""
assert circup.get_modules("") == {}
def test_get_modules_that_are_files():
"""
Check the expected dictionary containing metadata is returned given the
@ -411,18 +438,18 @@ def test_get_modules_that_are_files():
os.path.join("tests", "local_module.py"),
os.path.join("tests", ".hidden_module.py"),
]
with mock.patch("circup.glob.glob", side_effect=[mods, []]):
with mock.patch("circup.glob.glob", side_effect=[mods, [], []]):
result = circup.get_modules(path)
assert len(result) == 1 # Hidden files are ignored.
assert "local_module.py" in result
assert result["local_module.py"]["path"] == os.path.join(
assert "local_module" in result
assert result["local_module"]["path"] == os.path.join(
"tests", "local_module.py"
)
assert (
result["local_module.py"]["__version__"] == "1.2.3"
result["local_module"]["__version__"] == "1.2.3"
) # from fixture.
repo = "https://github.com/adafruit/SomeLibrary.git" # from fixture.
assert result["local_module.py"]["__repo__"] == repo
assert result["local_module"]["__repo__"] == repo
def test_get_modules_that_are_directories():
@ -439,7 +466,9 @@ def test_get_modules_that_are_directories():
"tests/dir_module/my_module.py",
"tests/dir_module/__init__.py",
]
with mock.patch("circup.glob.glob", side_effect=[[], mods, mod_files]):
with mock.patch(
"circup.glob.glob", side_effect=[[], [], mods, mod_files, []]
):
result = circup.get_modules(path)
assert len(result) == 1
assert "dir_module" in result
@ -462,7 +491,9 @@ def test_get_modules_that_are_directories_with_no_metadata():
"tests/bad_module/my_module.py",
"tests/bad_module/__init__.py",
]
with mock.patch("circup.glob.glob", side_effect=[[], mods, mod_files]):
with mock.patch(
"circup.glob.glob", side_effect=[[], [], mods, mod_files, []]
):
result = circup.get_modules(path)
assert len(result) == 1
assert "bad_module" in result
@ -577,18 +608,11 @@ def test_get_bundle():
mock_requests.get.reset_mock()
tag = "12345"
circup.get_bundle(tag)
url = (
"https://github.com/adafruit/Adafruit_CircuitPython_Bundle"
"/releases/download"
"/{tag}/adafruit-circuitpython-bundle-py-{tag}.zip".format(tag=tag)
)
mock_requests.get.assert_called_once_with(url, stream=True)
mock_open.assert_called_once_with(circup.BUNDLE_ZIP, "wb")
mock_shutil.rmtree.assert_called_once_with(circup.BUNDLE_DIR)
mock_zipfile.ZipFile.assert_called_once_with(circup.BUNDLE_ZIP, "r")
mock_zipfile.ZipFile().__enter__().extractall.assert_called_once_with(
circup.BUNDLE_DIR
)
assert mock_requests.get.call_count == 3
assert mock_open.call_count == 3
assert mock_shutil.rmtree.call_count == 3
assert mock_zipfile.ZipFile.call_count == 3
assert mock_zipfile.ZipFile().__enter__().extractall.call_count == 3
def test_get_bundle_network_error():

BIN
tests/test_module.mpy Normal file

Binary file not shown.