Some core functionality with tests.
This commit is contained in:
parent
016f147cd1
commit
a87a9d0ae7
5 changed files with 240 additions and 46 deletions
100
circup.py
100
circup.py
|
|
@ -21,9 +21,12 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
"""
|
||||
import github
|
||||
import os
|
||||
import ctypes
|
||||
import glob
|
||||
import re
|
||||
from serial.tools.list_ports import comports as list_serial_ports
|
||||
import github
|
||||
from subprocess import check_output
|
||||
|
||||
|
||||
# IMPORTANT
|
||||
|
|
@ -47,15 +50,72 @@ DUNDER_ASSIGN_RE = re.compile(r"""^__\w+__\s*=\s*['"].+['"]$""")
|
|||
|
||||
def find_device():
|
||||
"""
|
||||
Returns a tuple containing the port's device and description for a
|
||||
connected Adafruit device. If no device is connected, the tuple will be
|
||||
(None, None).
|
||||
Return the location on the filesystem for the connected Adafruit device.
|
||||
This is based upon how Mu discovers this information.
|
||||
|
||||
:return: The path to the device on the local filesystem.
|
||||
"""
|
||||
ports = list_serial_ports()
|
||||
for port in ports:
|
||||
if port.vid == VENDOR_ID:
|
||||
return (port.device, port.description)
|
||||
return (None, None)
|
||||
device_dir = None
|
||||
# Attempt to find the path on the filesystem that represents the plugged in
|
||||
# CIRCUITPY board.
|
||||
if os.name == "posix":
|
||||
# Linux / OSX
|
||||
for mount_command in ["mount", "/sbin/mount"]:
|
||||
try:
|
||||
mount_output = check_output(mount_command).splitlines()
|
||||
mounted_volumes = [x.split()[2] for x in mount_output]
|
||||
for volume in mounted_volumes:
|
||||
if volume.endswith(b"CIRCUITPY"):
|
||||
device_dir = volume.decode("utf-8")
|
||||
except FileNotFoundError:
|
||||
next
|
||||
elif os.name == "nt":
|
||||
# Windows
|
||||
|
||||
def get_volume_name(disk_name):
|
||||
"""
|
||||
Each disk or external device connected to windows has an attribute
|
||||
called "volume name". This function returns the volume name for the
|
||||
given disk/device.
|
||||
|
||||
Based upon answer given here: http://stackoverflow.com/a/12056414
|
||||
"""
|
||||
vol_name_buf = ctypes.create_unicode_buffer(1024)
|
||||
ctypes.windll.kernel32.GetVolumeInformationW(
|
||||
ctypes.c_wchar_p(disk_name),
|
||||
vol_name_buf,
|
||||
ctypes.sizeof(vol_name_buf),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
0,
|
||||
)
|
||||
return vol_name_buf.value
|
||||
|
||||
#
|
||||
# In certain circumstances, volumes are allocated to USB
|
||||
# storage devices which cause a Windows popup to raise if their
|
||||
# volume contains no media. Wrapping the check in SetErrorMode
|
||||
# with SEM_FAILCRITICALERRORS (1) prevents this popup.
|
||||
#
|
||||
old_mode = ctypes.windll.kernel32.SetErrorMode(1)
|
||||
try:
|
||||
for disk in "ABCDEFGHIJKLMNOPQRSTUVWXYZ":
|
||||
path = "{}:\\".format(disk)
|
||||
if (
|
||||
os.path.exists(path)
|
||||
and get_volume_name(path) == "CIRCUITPY"
|
||||
):
|
||||
device_dir = path
|
||||
# Report only the FIRST device found.
|
||||
break
|
||||
finally:
|
||||
ctypes.windll.kernel32.SetErrorMode(old_mode)
|
||||
else:
|
||||
# No support for unknown operating systems.
|
||||
raise NotImplementedError('OS "{}" not supported.'.format(os.name))
|
||||
return device_dir
|
||||
|
||||
|
||||
def get_repos_file(repository, filename):
|
||||
|
|
@ -73,12 +133,10 @@ def get_repos_file(repository, filename):
|
|||
repos_path = "{}/{}".format(owner, repos_name.replace(".git", ""))
|
||||
# Reference the remote repository.
|
||||
gh = github.Github()
|
||||
try:
|
||||
repos = gh.get_repo(repos_path)
|
||||
except github.GithubException.UnknownObjectException:
|
||||
raise ValueError("Unknown repository.")
|
||||
repos = gh.get_repo(repos_path)
|
||||
# Return the content of filename.
|
||||
source = repos.get_contents(filename)
|
||||
return source.decoded_content
|
||||
return source.decoded_content.decode("utf-8")
|
||||
|
||||
|
||||
def extract_metadata(code):
|
||||
|
|
@ -95,17 +153,23 @@ def extract_metadata(code):
|
|||
:return: The dunder based metadata found in the code as a dictionary.
|
||||
"""
|
||||
result = {}
|
||||
lines = code.split()
|
||||
lines = code.split("\n")
|
||||
for line in lines:
|
||||
if DUNDER_ASSIGN_RE.search(line):
|
||||
exec(line, result)
|
||||
del result["__builtins__"] # Side effect of using exec, but not needed.
|
||||
return result
|
||||
|
||||
|
||||
def check_version(path):
|
||||
def find_modules():
|
||||
"""
|
||||
TODO: Finish this...
|
||||
Returns a list of paths to ``.py`` modules in the ``lib`` directory on a
|
||||
connected Adafruit device.
|
||||
"""
|
||||
device_path = find_device()
|
||||
if device_path is None:
|
||||
raise IOError("Could not find a connected Adafruit device.")
|
||||
return glob.glob(os.path.join(device_path, "lib", "*.py"))
|
||||
|
||||
|
||||
def main(): # pragma: no cover
|
||||
|
|
|
|||
2
setup.py
2
setup.py
|
|
@ -22,7 +22,7 @@ with open(os.path.join(base_dir, "CHANGES.rst"), encoding="utf8") as f:
|
|||
changes = f.read()
|
||||
|
||||
|
||||
install_requires = ["pyserial>=3.0,<4.0", "PyGithub>=1.43.8"]
|
||||
install_requires = ["PyGithub>=1.43.8", "semver>=2.8.1"]
|
||||
|
||||
extras_require = {
|
||||
"tests": [
|
||||
|
|
|
|||
7
tests/mount_exists.txt
Normal file
7
tests/mount_exists.txt
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
sysfs on /sys type sysfs (rw,nosuid,nodev,noexec,relatime)
|
||||
proc on /proc type proc (rw,nosuid,nodev,noexec,relatime)
|
||||
udev on /dev type devtmpfs (rw,relatime,size=10240k,nr_inodes=489849,mode=755)
|
||||
devpts on /dev/pts type devpts (rw,nosuid,noexec,relatime,gid=5,mode=620,ptmxmode=000)
|
||||
tmpfs on /run type tmpfs (rw,nosuid,relatime,size=787732k,mode=755)
|
||||
/dev/mapper/heraclitus--vg-root on / type ext4 (rw,relatime,errors=remount-ro,data=ordered)
|
||||
/dev/sdb on /media/ntoll/CIRCUITPY type vfat (rw,nosuid,nodev,relatime,uid=1000,gid=1000,fmask=0022,dmask=0077,codepage=437,iocharset=utf8,shortname=mixed,showexec,utf8,flush,errors=remount-ro,uhelper=udisks2)
|
||||
6
tests/mount_missing.txt
Normal file
6
tests/mount_missing.txt
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
sysfs on /sys type sysfs (rw,nosuid,nodev,noexec,relatime)
|
||||
proc on /proc type proc (rw,nosuid,nodev,noexec,relatime)
|
||||
udev on /dev type devtmpfs (rw,relatime,size=10240k,nr_inodes=489849,mode=755)
|
||||
devpts on /dev/pts type devpts (rw,nosuid,noexec,relatime,gid=5,mode=620,ptmxmode=000)
|
||||
tmpfs on /run type tmpfs (rw,nosuid,relatime,size=787732k,mode=755)
|
||||
/dev/mapper/heraclitus--vg-root on / type ext4 (rw,relatime,errors=remount-ro,data=ordered)
|
||||
|
|
@ -21,40 +21,157 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
"""
|
||||
import os
|
||||
import circup
|
||||
import ctypes
|
||||
import pytest
|
||||
from unittest import mock
|
||||
|
||||
|
||||
def test_find_device():
|
||||
def test_find_device_posix_exists():
|
||||
"""
|
||||
Ensure the find_device function returns the expected information about
|
||||
any Adafruit devices connected to the user's computer.
|
||||
Simulate being on os.name == 'posix' and a call to "mount" returns a
|
||||
record indicating a connected device.
|
||||
"""
|
||||
|
||||
class FakePort:
|
||||
"""
|
||||
Pretends to be a representation of a port in PySerial.
|
||||
"""
|
||||
|
||||
def __init__(self, vid, device, description):
|
||||
self.vid = vid
|
||||
self.device = device
|
||||
self.description = description
|
||||
|
||||
device = "/dev/ttyACM3"
|
||||
description = "CircuitPlayground Express - CircuitPython CDC control"
|
||||
port = FakePort(circup.VENDOR_ID, device, description)
|
||||
ports = [port]
|
||||
with mock.patch("circup.list_serial_ports", return_value=ports):
|
||||
result = circup.find_device()
|
||||
assert result == (device, description)
|
||||
with open("tests/mount_exists.txt", "rb") as fixture_file:
|
||||
fixture = fixture_file.read()
|
||||
with mock.patch("os.name", "posix"):
|
||||
with mock.patch("circup.check_output", return_value=fixture):
|
||||
assert circup.find_device() == "/media/ntoll/CIRCUITPY"
|
||||
|
||||
|
||||
def test_find_device_not_connected():
|
||||
def test_find_device_posix_no_mount_command():
|
||||
"""
|
||||
If no Adafruit device is connected to the user's computer, ensure the
|
||||
result is (None, None)
|
||||
When the user doesn't have administrative privileges on OSX then the mount
|
||||
command isn't on their path. In which case, check circup uses the more
|
||||
explicit /sbin/mount instead.
|
||||
"""
|
||||
with mock.patch("circup.list_serial_ports", return_value=[]):
|
||||
result = circup.find_device()
|
||||
assert result == (None, None)
|
||||
with open("tests/mount_exists.txt", "rb") as fixture_file:
|
||||
fixture = fixture_file.read()
|
||||
mock_check = mock.MagicMock(side_effect=[FileNotFoundError, fixture])
|
||||
with mock.patch("os.name", "posix"), mock.patch(
|
||||
"circup.check_output", mock_check
|
||||
):
|
||||
assert circup.find_device() == "/media/ntoll/CIRCUITPY"
|
||||
assert mock_check.call_count == 2
|
||||
assert mock_check.call_args_list[0][0][0] == "mount"
|
||||
assert mock_check.call_args_list[1][0][0] == "/sbin/mount"
|
||||
|
||||
|
||||
def test_find_device_posix_missing():
|
||||
"""
|
||||
Simulate being on os.name == 'posix' and a call to "mount" returns no
|
||||
records associated with an Adafruit device.
|
||||
"""
|
||||
with open("tests/mount_missing.txt", "rb") as fixture_file:
|
||||
fixture = fixture_file.read()
|
||||
with mock.patch("os.name", "posix"), mock.patch(
|
||||
"circup.check_output", return_value=fixture
|
||||
):
|
||||
assert circup.find_device() is None
|
||||
|
||||
|
||||
def test_find_device_nt_exists():
|
||||
"""
|
||||
Simulate being on os.name == 'nt' and a disk with a volume name 'CIRCUITPY'
|
||||
exists indicating a connected device.
|
||||
"""
|
||||
mock_windll = mock.MagicMock()
|
||||
mock_windll.kernel32 = mock.MagicMock()
|
||||
mock_windll.kernel32.GetVolumeInformationW = mock.MagicMock()
|
||||
mock_windll.kernel32.GetVolumeInformationW.return_value = None
|
||||
fake_buffer = ctypes.create_unicode_buffer("CIRCUITPY")
|
||||
with mock.patch("os.name", "nt"), mock.patch(
|
||||
"os.path.exists", return_value=True
|
||||
), mock.patch("ctypes.create_unicode_buffer", return_value=fake_buffer):
|
||||
ctypes.windll = mock_windll
|
||||
assert circup.find_device() == "A:\\"
|
||||
|
||||
|
||||
def test_find_device_nt_missing():
|
||||
"""
|
||||
Simulate being on os.name == 'nt' and a disk with a volume name 'CIRCUITPY'
|
||||
does not exist for a device.
|
||||
"""
|
||||
mock_windll = mock.MagicMock()
|
||||
mock_windll.kernel32 = mock.MagicMock()
|
||||
mock_windll.kernel32.GetVolumeInformationW = mock.MagicMock()
|
||||
mock_windll.kernel32.GetVolumeInformationW.return_value = None
|
||||
fake_buffer = ctypes.create_unicode_buffer(1024)
|
||||
with mock.patch("os.name", "nt"), mock.patch(
|
||||
"os.path.exists", return_value=True
|
||||
), mock.patch("ctypes.create_unicode_buffer", return_value=fake_buffer):
|
||||
ctypes.windll = mock_windll
|
||||
assert circup.find_device() is None
|
||||
|
||||
|
||||
def test_find_device_unknown_os():
|
||||
"""
|
||||
Raises a NotImplementedError if the host OS is not supported.
|
||||
"""
|
||||
with mock.patch("os.name", "foo"):
|
||||
with pytest.raises(NotImplementedError) as ex:
|
||||
circup.find_device()
|
||||
assert ex.value.args[0] == 'OS "foo" not supported.'
|
||||
|
||||
|
||||
def test_get_repos_file():
|
||||
"""
|
||||
Ensure the repository path and filename are handled in such a way to create
|
||||
the expected and correct calls to the GitHub API.
|
||||
"""
|
||||
repository = "https://github.com/adafruit/SomeLibrary.git"
|
||||
filename = "somelibrary.py"
|
||||
mock_github = mock.MagicMock() # Mock away the API shim.
|
||||
mock_repos = mock.MagicMock() # Mock repository object.
|
||||
mock_source = mock.MagicMock() # Mock source file.
|
||||
mock_github.get_repo.return_value = mock_repos
|
||||
mock_repos.get_contents.return_value = mock_source
|
||||
mock_source.decoded_content = b"# Python content of the file\n"
|
||||
with mock.patch("circup.github.Github", return_value=mock_github):
|
||||
result = circup.get_repos_file(repository, filename)
|
||||
assert result == mock_source.decoded_content.decode("utf-8")
|
||||
mock_github.get_repo.assert_called_once_with("adafruit/SomeLibrary")
|
||||
mock_repos.get_contents.assert_called_once_with(filename)
|
||||
|
||||
|
||||
def test_extract_metadata():
|
||||
"""
|
||||
Ensure the dunder objects assigned in code are extracted into a Python
|
||||
dictionary representing such metadata.
|
||||
"""
|
||||
code = (
|
||||
"# A comment\n"
|
||||
'__version__ = "1.1.4"\n'
|
||||
'__repo__ = "https://github.com/adafruit/SomeLibrary.git"\n'
|
||||
'print("Hello, world!")\n'
|
||||
)
|
||||
result = circup.extract_metadata(code)
|
||||
assert len(result) == 2
|
||||
assert result["__version__"] == "1.1.4"
|
||||
assert result["__repo__"] == "https://github.com/adafruit/SomeLibrary.git"
|
||||
|
||||
|
||||
def test_find_modules():
|
||||
"""
|
||||
Ensure the result of the glob.glob call is returned, and the call is made
|
||||
with the expected path.
|
||||
"""
|
||||
glob_result = ["module1.py", "module2.py"]
|
||||
with mock.patch("circup.find_device", return_value="foo"), mock.patch(
|
||||
"circup.glob.glob", return_value=glob_result
|
||||
) as mock_glob:
|
||||
circup.find_modules()
|
||||
mock_glob.assert_called_once_with(os.path.join("foo", "lib", "*.py"))
|
||||
|
||||
|
||||
def test_find_modules_no_device_connected():
|
||||
"""
|
||||
Ensure an IOError is raised if there's no connected device which can be
|
||||
checked.
|
||||
"""
|
||||
with mock.patch("circup.find_device", return_value=None), pytest.raises(
|
||||
IOError
|
||||
) as ex:
|
||||
circup.find_modules()
|
||||
assert ex.value.args[0] == "Could find a connected Adafruit device."
|
||||
|
|
|
|||
Loading…
Reference in a new issue