tools/mpremote: Make filesystem commands use transport API.

This introduces a Python filesystem API on `Transport` that is implemented
entirely with eval/exec provided by the underlying transport subclass.

Updates existing mpremote filesystem commands (and `edit) to use this API.

Also re-implements recursive `cp` to allow arbitrary source / destination.

This work was funded through GitHub Sponsors.

Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
Signed-off-by: Damien George <damien@micropython.org>
This commit is contained in:
Jim Mussared 2023-06-09 14:05:44 +10:00 committed by Damien George
parent 1091021995
commit db59e55fe7
5 changed files with 379 additions and 305 deletions

View file

@ -4,8 +4,8 @@ import tempfile
import serial.tools.list_ports
from .transport import TransportError
from .transport_serial import SerialTransport, stdout_write_bytes
from .transport import TransportError, stdout_write_bytes
from .transport_serial import SerialTransport
class CommandError(Exception):
@ -106,61 +106,238 @@ def show_progress_bar(size, total_size, op="copying"):
)
def _remote_path_join(a, *b):
if not a:
a = "./"
result = a.rstrip("/")
for x in b:
result += "/" + x.strip("/")
return result
def _remote_path_dirname(a):
a = a.rsplit("/", 1)
if len(a) == 1:
return ""
else:
return a[0]
def _remote_path_basename(a):
return a.rsplit("/", 1)[-1]
def do_filesystem_cp(state, src, dest, multiple):
if dest.startswith(":"):
dest_exists = state.transport.fs_exists(dest[1:])
dest_isdir = dest_exists and state.transport.fs_isdir(dest[1:])
else:
dest_exists = os.path.exists(dest)
dest_isdir = dest_exists and os.path.isdir(dest)
if multiple:
if not dest_exists:
raise CommandError("cp: destination does not exist")
if not dest_isdir:
raise CommandError("cp: destination is not a directory")
# Download the contents of source.
try:
if src.startswith(":"):
data = state.transport.fs_readfile(src[1:], progress_callback=show_progress_bar)
filename = _remote_path_basename(src[1:])
else:
with open(src, "rb") as f:
data = f.read()
filename = os.path.basename(src)
except IsADirectoryError:
raise CommandError("cp: -r not specified; omitting directory")
# Write back to dest.
if dest.startswith(":"):
# If the destination path is just the directory, then add the source filename.
if dest_isdir:
dest = ":" + _remote_path_join(dest[1:], filename)
# Write to remote.
state.transport.fs_writefile(dest[1:], data, progress_callback=show_progress_bar)
else:
# If the destination path is just the directory, then add the source filename.
if dest_isdir:
dest = os.path.join(dest, filename)
# Write to local file.
with open(dest, "wb") as f:
f.write(data)
def do_filesystem_recursive_cp(state, src, dest, multiple):
# Ignore trailing / on both src and dest. (Unix cp ignores them too)
src = src.rstrip("/" + os.path.sep + (os.path.altsep if os.path.altsep else ""))
dest = dest.rstrip("/" + os.path.sep + (os.path.altsep if os.path.altsep else ""))
# If the destination directory exists, then we copy into it. Otherwise we
# use the destination as the target.
if dest.startswith(":"):
dest_exists = state.transport.fs_exists(dest[1:])
else:
dest_exists = os.path.exists(dest)
# Recursively find all files to copy from a directory.
# `dirs` will be a list of dest split paths.
# `files` will be a list of `(dest split path, src joined path)`.
dirs = []
files = []
# For example, if src=/tmp/foo, with /tmp/foo/x.py and /tmp/foo/a/b/c.py,
# and if the destination directory exists, then we will have:
# dirs = [['foo'], ['foo', 'a'], ['foo', 'a', 'b']]
# files = [(['foo', 'x.py'], '/tmp/foo/x.py'), (['foo', 'a', 'b', 'c.py'], '/tmp/foo/a/b/c.py')]
# If the destination doesn't exist, then we will have:
# dirs = [['a'], ['a', 'b']]
# files = [(['x.py'], '/tmp/foo/x.py'), (['a', 'b', 'c.py'], '/tmp/foo/a/b/c.py')]
def _list_recursive(base, src_path, dest_path, src_join_fun, src_isdir_fun, src_listdir_fun):
src_path_joined = src_join_fun(base, *src_path)
if src_isdir_fun(src_path_joined):
if dest_path:
dirs.append(dest_path)
for entry in src_listdir_fun(src_path_joined):
_list_recursive(
base,
src_path + [entry],
dest_path + [entry],
src_join_fun,
src_isdir_fun,
src_listdir_fun,
)
else:
files.append(
(
dest_path,
src_path_joined,
)
)
if src.startswith(":"):
src_dirname = [_remote_path_basename(src[1:])]
dest_dirname = src_dirname if dest_exists else []
_list_recursive(
_remote_path_dirname(src[1:]),
src_dirname,
dest_dirname,
src_join_fun=_remote_path_join,
src_isdir_fun=state.transport.fs_isdir,
src_listdir_fun=lambda p: [x.name for x in state.transport.fs_listdir(p)],
)
else:
src_dirname = [os.path.basename(src)]
dest_dirname = src_dirname if dest_exists else []
_list_recursive(
os.path.dirname(src),
src_dirname,
dest_dirname,
src_join_fun=os.path.join,
src_isdir_fun=os.path.isdir,
src_listdir_fun=os.listdir,
)
# If no directories were encountered then we must have just had a file.
if not dirs:
return do_filesystem_cp(state, src, dest, multiple)
def _mkdir(a, *b):
try:
if a.startswith(":"):
state.transport.fs_mkdir(_remote_path_join(a[1:], *b))
else:
os.mkdir(os.path.join(a, *b))
except FileExistsError:
pass
# Create the destination if necessary.
if not dest_exists:
_mkdir(dest)
# Create all sub-directories relative to the destination.
for d in dirs:
_mkdir(dest, *d)
# Copy all files, in sorted order to help it be deterministic.
files.sort()
for dest_path_split, src_path_joined in files:
if src.startswith(":"):
src_path_joined = ":" + src_path_joined
if dest.startswith(":"):
dest_path_joined = ":" + _remote_path_join(dest[1:], *dest_path_split)
else:
dest_path_joined = os.path.join(dest, *dest_path_split)
do_filesystem_cp(state, src_path_joined, dest_path_joined, multiple=False)
def do_filesystem(state, args):
state.ensure_raw_repl()
state.did_action()
def _list_recursive(files, path):
if os.path.isdir(path):
for entry in os.listdir(path):
_list_recursive(files, "/".join((path, entry)))
else:
files.append(os.path.split(path))
command = args.command[0]
paths = args.path
if command == "cat":
# Don't be verbose by default when using cat, so output can be
# redirected to something.
# Don't do verbose output for `cat` unless explicitly requested.
verbose = args.verbose is True
else:
verbose = args.verbose is not False
if command == "cp" and args.recursive:
if paths[-1] != ":":
raise CommandError("'cp -r' destination must be ':'")
paths.pop()
src_files = []
for path in paths:
if path.startswith(":"):
raise CommandError("'cp -r' source files must be local")
_list_recursive(src_files, path)
known_dirs = {""}
state.transport.exec("import os")
for dir, file in src_files:
dir_parts = dir.split("/")
for i in range(len(dir_parts)):
d = "/".join(dir_parts[: i + 1])
if d not in known_dirs:
state.transport.exec(
"try:\n os.mkdir('%s')\nexcept OSError as e:\n print(e)" % d
)
known_dirs.add(d)
state.transport.filesystem_command(
["cp", "/".join((dir, file)), ":" + dir + "/"],
progress_callback=show_progress_bar,
verbose=verbose,
)
if command == "cp":
# Note: cp requires the user to specify local/remote explicitly via
# leading ':'.
# The last argument must be the destination.
if len(paths) <= 1:
raise CommandError("cp: missing destination path")
cp_dest = paths[-1]
paths = paths[:-1]
else:
if args.recursive:
raise CommandError("'-r' only supported for 'cp'")
try:
state.transport.filesystem_command(
[command] + paths, progress_callback=show_progress_bar, verbose=verbose
)
except OSError as er:
raise CommandError(er)
# All other commands implicitly use remote paths. Strip the
# leading ':' if the user included them.
paths = [path[1:] if path.startswith(":") else path for path in paths]
# ls implicitly lists the cwd.
if command == "ls" and not paths:
paths = [""]
# Handle each path sequentially.
for path in paths:
if verbose:
if command == "cp":
print("{} {} {}".format(command, path, cp_dest))
else:
print("{} :{}".format(command, path))
if command == "cat":
state.transport.fs_printfile(path)
elif command == "ls":
for result in state.transport.fs_listdir(path):
print(
"{:12} {}{}".format(
result.st_size, result.name, "/" if result.st_mode & 0x4000 else ""
)
)
elif command == "mkdir":
state.transport.fs_mkdir(path)
elif command == "rm":
state.transport.fs_rmfile(path)
elif command == "rmdir":
state.transport.fs_rmdir(path)
elif command == "touch":
state.transport.fs_touchfile(path)
elif command == "cp":
if args.recursive:
do_filesystem_recursive_cp(state, path, cp_dest, len(paths) > 1)
else:
do_filesystem_cp(state, path, cp_dest, len(paths) > 1)
def do_edit(state, args):
@ -174,11 +351,15 @@ def do_edit(state, args):
dest_fd, dest = tempfile.mkstemp(suffix=os.path.basename(src))
try:
print("edit :%s" % (src,))
os.close(dest_fd)
state.transport.fs_touch(src)
state.transport.fs_get(src, dest, progress_callback=show_progress_bar)
state.transport.fs_touchfile(src)
data = state.transport.fs_readfile(src, progress_callback=show_progress_bar)
with open(dest_fd, "wb") as f:
f.write(data)
if os.system('%s "%s"' % (os.getenv("EDITOR"), dest)) == 0:
state.transport.fs_put(dest, src, progress_callback=show_progress_bar)
with open(dest, "rb") as f:
state.transport.fs_writefile(
src, f.read(), progress_callback=show_progress_bar
)
finally:
os.unlink(dest)

View file

@ -190,7 +190,7 @@ def argparse_filesystem():
"enable verbose output (defaults to True for all commands except cat)",
)
cmd_parser.add_argument(
"command", nargs=1, help="filesystem command (e.g. cat, cp, ls, rm, touch)"
"command", nargs=1, help="filesystem command (e.g. cat, cp, ls, rm, rmdir, touch)"
)
cmd_parser.add_argument("path", nargs="+", help="local and remote paths")
return cmd_parser

View file

@ -12,13 +12,10 @@ from .commands import CommandError, show_progress_bar
_PACKAGE_INDEX = "https://micropython.org/pi/v2"
_CHUNK_SIZE = 128
# This implements os.makedirs(os.dirname(path))
def _ensure_path_exists(transport, path):
import os
split = path.split("/")
# Handle paths starting with "/".
@ -34,22 +31,6 @@ def _ensure_path_exists(transport, path):
prefix += "/"
# Copy from src (stream) to dest (function-taking-bytes)
def _chunk(src, dest, length=None, op="downloading"):
buf = memoryview(bytearray(_CHUNK_SIZE))
total = 0
if length:
show_progress_bar(0, length, op)
while True:
n = src.readinto(buf)
if n == 0:
break
dest(buf if n == _CHUNK_SIZE else buf[:n])
total += n
if length:
show_progress_bar(total, length, op)
def _rewrite_url(url, branch=None):
if not branch:
branch = "HEAD"
@ -83,15 +64,10 @@ def _rewrite_url(url, branch=None):
def _download_file(transport, url, dest):
try:
with urllib.request.urlopen(url) as src:
fd, path = tempfile.mkstemp()
try:
print("Installing:", dest)
with os.fdopen(fd, "wb") as f:
_chunk(src, f.write, src.length)
_ensure_path_exists(transport, dest)
transport.fs_put(path, dest, progress_callback=show_progress_bar)
finally:
os.unlink(path)
data = src.read()
print("Installing:", dest)
_ensure_path_exists(transport, dest)
transport.fs_writefile(dest, data, progress_callback=show_progress_bar)
except urllib.error.HTTPError as e:
if e.status == 404:
raise CommandError(f"File not found: {url}")

View file

@ -24,10 +24,153 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import ast, os, sys
from collections import namedtuple
def stdout_write_bytes(b):
b = b.replace(b"\x04", b"")
if hasattr(sys.stdout, "buffer"):
sys.stdout.buffer.write(b)
sys.stdout.buffer.flush()
else:
text = b.decode(sys.stdout.encoding, "strict")
sys.stdout.write(text)
class TransportError(Exception):
pass
listdir_result = namedtuple("dir_result", ["name", "st_mode", "st_ino", "st_size"])
# Takes a Transport error (containing the text of an OSError traceback) and
# raises it as the corresponding OSError-derived exception.
def _convert_filesystem_error(e, info):
if len(e.args) >= 3:
if b"OSError" in e.args[2] and b"ENOENT" in e.args[2]:
return FileNotFoundError(info)
if b"OSError" in e.args[2] and b"EISDIR" in e.args[2]:
return IsADirectoryError(info)
if b"OSError" in e.args[2] and b"EEXIST" in e.args[2]:
return FileExistsError(info)
return e
class Transport:
pass
def fs_listdir(self, src=""):
buf = bytearray()
def repr_consumer(b):
buf.extend(b.replace(b"\x04", b""))
cmd = "import os\nfor f in os.ilistdir(%s):\n" " print(repr(f), end=',')" % (
("'%s'" % src) if src else ""
)
try:
buf.extend(b"[")
self.exec(cmd, data_consumer=repr_consumer)
buf.extend(b"]")
except TransportError as e:
raise _convert_filesystem_error(e, src) from None
return [
listdir_result(*f) if len(f) == 4 else listdir_result(*(f + (0,)))
for f in ast.literal_eval(buf.decode())
]
def fs_stat(self, src):
try:
self.exec("import os")
return os.stat_result(self.eval("os.stat(%s)" % ("'%s'" % src)))
except TransportError as e:
raise _convert_filesystem_error(e, src) from None
def fs_exists(self, src):
try:
self.fs_stat(src)
return True
except OSError:
return False
def fs_isdir(self, src):
try:
mode = self.fs_stat(src).st_mode
return (mode & 0x4000) != 0
except OSError:
# Match CPython, a non-existent path is not a directory.
return False
def fs_printfile(self, src, chunk_size=256):
cmd = (
"with open('%s') as f:\n while 1:\n"
" b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size)
)
try:
self.exec(cmd, data_consumer=stdout_write_bytes)
except TransportError as e:
raise _convert_filesystem_error(e, src) from None
def fs_readfile(self, src, chunk_size=256, progress_callback=None):
if progress_callback:
src_size = self.fs_stat(src).st_size
contents = bytearray()
try:
self.exec("f=open('%s','rb')\nr=f.read" % src)
while True:
chunk = self.eval("r({})".format(chunk_size))
if not chunk:
break
contents.extend(chunk)
if progress_callback:
progress_callback(len(contents), src_size)
self.exec("f.close()")
except TransportError as e:
raise _convert_filesystem_error(e, src) from None
return contents
def fs_writefile(self, dest, data, chunk_size=256, progress_callback=None):
if progress_callback:
src_size = len(data)
written = 0
try:
self.exec("f=open('%s','wb')\nw=f.write" % dest)
while data:
chunk = data[:chunk_size]
self.exec("w(" + repr(chunk) + ")")
written += len(chunk)
data = data[len(chunk) :]
if progress_callback:
progress_callback(written, src_size)
self.exec("f.close()")
except TransportError as e:
raise _convert_filesystem_error(e, dest) from None
def fs_mkdir(self, path):
try:
self.exec("import os\nos.mkdir('%s')" % path)
except TransportError as e:
raise _convert_filesystem_error(e, path) from None
def fs_rmdir(self, path):
try:
self.exec("import os\nos.rmdir('%s')" % path)
except TransportError as e:
raise _convert_filesystem_error(e, path) from None
def fs_rmfile(self, path):
try:
self.exec("import os\nos.remove('%s')" % path)
except TransportError as e:
raise _convert_filesystem_error(e, path) from None
def fs_touchfile(self, path):
try:
self.exec("f=open('%s','a')\nf.close()" % path)
except TransportError as e:
raise _convert_filesystem_error(e, path) from None

View file

@ -35,29 +35,12 @@
# Once the API is stabilised, the idea is that mpremote can be used both
# as a command line tool and a library for interacting with devices.
import ast, io, errno, os, re, struct, sys, time
from collections import namedtuple
import ast, io, os, re, struct, sys, time
from errno import EPERM
from .console import VT_ENABLED
from .transport import TransportError, Transport
def stdout_write_bytes(b):
b = b.replace(b"\x04", b"")
sys.stdout.buffer.write(b)
sys.stdout.buffer.flush()
listdir_result = namedtuple("dir_result", ["name", "st_mode", "st_ino", "st_size"])
def reraise_filesystem_error(e, info):
if len(e.args) >= 3:
if b"OSError" in e.args[2] and b"ENOENT" in e.args[2]:
raise FileNotFoundError(info)
raise
class SerialTransport(Transport):
def __init__(self, device, baudrate=115200, wait=0, exclusive=True):
self.in_raw_repl = False
@ -292,215 +275,6 @@ class SerialTransport(Transport):
pyfile = f.read()
return self.exec(pyfile)
def fs_exists(self, src):
try:
self.exec("import os\nos.stat(%s)" % (("'%s'" % src) if src else ""))
return True
except TransportError:
return False
def fs_ls(self, src):
cmd = (
"import os\nfor f in os.ilistdir(%s):\n"
" print('{:12} {}{}'.format(f[3]if len(f)>3 else 0,f[0],'/'if f[1]&0x4000 else ''))"
% (("'%s'" % src) if src else "")
)
self.exec(cmd, data_consumer=stdout_write_bytes)
def fs_listdir(self, src=""):
buf = bytearray()
def repr_consumer(b):
buf.extend(b.replace(b"\x04", b""))
cmd = "import os\nfor f in os.ilistdir(%s):\n" " print(repr(f), end=',')" % (
("'%s'" % src) if src else ""
)
try:
buf.extend(b"[")
self.exec(cmd, data_consumer=repr_consumer)
buf.extend(b"]")
except TransportError as e:
reraise_filesystem_error(e, src)
return [
listdir_result(*f) if len(f) == 4 else listdir_result(*(f + (0,)))
for f in ast.literal_eval(buf.decode())
]
def fs_stat(self, src):
try:
self.exec("import os")
return os.stat_result(self.eval("os.stat(%s)" % ("'%s'" % src)))
except TransportError as e:
reraise_filesystem_error(e, src)
def fs_cat(self, src, chunk_size=256):
cmd = (
"with open('%s') as f:\n while 1:\n"
" b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size)
)
self.exec(cmd, data_consumer=stdout_write_bytes)
def fs_readfile(self, src, chunk_size=256):
buf = bytearray()
def repr_consumer(b):
buf.extend(b.replace(b"\x04", b""))
cmd = (
"with open('%s', 'rb') as f:\n while 1:\n"
" b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size)
)
try:
self.exec(cmd, data_consumer=repr_consumer)
except TransportError as e:
reraise_filesystem_error(e, src)
return ast.literal_eval(buf.decode())
def fs_writefile(self, dest, data, chunk_size=256):
self.exec("f=open('%s','wb')\nw=f.write" % dest)
while data:
chunk = data[:chunk_size]
self.exec("w(" + repr(chunk) + ")")
data = data[len(chunk) :]
self.exec("f.close()")
def fs_cp(self, src, dest, chunk_size=256, progress_callback=None):
if progress_callback:
src_size = self.fs_stat(src).st_size
written = 0
self.exec("fr=open('%s','rb')\nr=fr.read\nfw=open('%s','wb')\nw=fw.write" % (src, dest))
while True:
data_len = int(self.exec("d=r(%u)\nw(d)\nprint(len(d))" % chunk_size))
if not data_len:
break
if progress_callback:
written += data_len
progress_callback(written, src_size)
self.exec("fr.close()\nfw.close()")
def fs_get(self, src, dest, chunk_size=256, progress_callback=None):
if progress_callback:
src_size = self.fs_stat(src).st_size
written = 0
self.exec("f=open('%s','rb')\nr=f.read" % src)
with open(dest, "wb") as f:
while True:
data = bytearray()
self.exec("print(r(%u))" % chunk_size, data_consumer=lambda d: data.extend(d))
assert data.endswith(b"\r\n\x04")
try:
data = ast.literal_eval(str(data[:-3], "ascii"))
if not isinstance(data, bytes):
raise ValueError("Not bytes")
except (UnicodeError, ValueError) as e:
raise TransportError("fs_get: Could not interpret received data: %s" % str(e))
if not data:
break
f.write(data)
if progress_callback:
written += len(data)
progress_callback(written, src_size)
self.exec("f.close()")
def fs_put(self, src, dest, chunk_size=256, progress_callback=None):
if progress_callback:
src_size = os.path.getsize(src)
written = 0
self.exec("f=open('%s','wb')\nw=f.write" % dest)
with open(src, "rb") as f:
while True:
data = f.read(chunk_size)
if not data:
break
if sys.version_info < (3,):
self.exec("w(b" + repr(data) + ")")
else:
self.exec("w(" + repr(data) + ")")
if progress_callback:
written += len(data)
progress_callback(written, src_size)
self.exec("f.close()")
def fs_mkdir(self, dir):
self.exec("import os\nos.mkdir('%s')" % dir)
def fs_rmdir(self, dir):
self.exec("import os\nos.rmdir('%s')" % dir)
def fs_rm(self, src):
self.exec("import os\nos.remove('%s')" % src)
def fs_touch(self, src):
self.exec("f=open('%s','a')\nf.close()" % src)
def filesystem_command(self, args, progress_callback=None, verbose=False):
def fname_remote(src):
if src.startswith(":"):
src = src[1:]
# Convert all path separators to "/", because that's what a remote device uses.
return src.replace(os.path.sep, "/")
def fname_cp_dest(src, dest):
_, src = os.path.split(src)
if dest is None or dest == "":
dest = src
elif dest == ".":
dest = "./" + src
elif dest.endswith("/"):
dest += src
return dest
cmd = args[0]
args = args[1:]
try:
if cmd == "cp":
srcs = args[:-1]
dest = args[-1]
if dest.startswith(":"):
op_remote_src = self.fs_cp
op_local_src = self.fs_put
else:
op_remote_src = self.fs_get
op_local_src = lambda src, dest, **_: __import__("shutil").copy(src, dest)
for src in srcs:
if verbose:
print("cp %s %s" % (src, dest))
if src.startswith(":"):
op = op_remote_src
else:
op = op_local_src
src2 = fname_remote(src)
dest2 = fname_cp_dest(src2, fname_remote(dest))
op(src2, dest2, progress_callback=progress_callback)
else:
ops = {
"cat": self.fs_cat,
"ls": self.fs_ls,
"mkdir": self.fs_mkdir,
"rm": self.fs_rm,
"rmdir": self.fs_rmdir,
"touch": self.fs_touch,
}
if cmd not in ops:
raise TransportError("'{}' is not a filesystem command".format(cmd))
if cmd == "ls" and not args:
args = [""]
for src in args:
src = fname_remote(src)
if verbose:
print("%s :%s" % (cmd, src))
ops[cmd](src)
except TransportError as er:
if len(er.args) > 1:
print(str(er.args[2], "ascii"))
else:
print(er)
self.exit_raw_repl()
self.close()
sys.exit(1)
def mount_local(self, path, unsafe_links=False):
fout = self.serial
if not self.eval('"RemoteFS" in globals()'):