circuitpython/tools/mpremote/mpremote/commands.py
Jim Mussared f13134e403 tools/mpremote: Fix argument handling for follow and add help strings.
Fixes in this commit are:
- Make --follow the default for "run" (accidentally changed in 68d094358).
- Add help strings for "repl": --capture --inject-file --inject-code
- Update help strings for "run".
- Fix encoding for --inject-code (accidentally broken in 68d094358).
- Remove ability to --no-follow for "eval". It was there previously because
  it shared the same code path with "exec" and "run", but makes no sense
  for "eval", so might as well remove.

This work was funded through GitHub Sponsors.

Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
2022-10-06 00:58:31 +11:00

237 lines
7.4 KiB
Python

import os
import sys
import tempfile
import serial.tools.list_ports
from . import pyboardextended as pyboard
class CommandError(Exception):
pass
def do_connect(state, args=None):
dev = args.device[0] if args else "auto"
do_disconnect(state)
try:
if dev == "list":
# List attached devices.
for p in sorted(serial.tools.list_ports.comports()):
print(
"{} {} {:04x}:{:04x} {} {}".format(
p.device,
p.serial_number,
p.vid if isinstance(p.vid, int) else 0,
p.pid if isinstance(p.pid, int) else 0,
p.manufacturer,
p.product,
)
)
# Don't do implicit REPL command.
state.did_action()
elif dev == "auto":
# Auto-detect and auto-connect to the first available device.
for p in sorted(serial.tools.list_ports.comports()):
try:
state.pyb = pyboard.PyboardExtended(p.device, baudrate=115200)
return
except pyboard.PyboardError as er:
if not er.args[0].startswith("failed to access"):
raise er
raise pyboard.PyboardError("no device found")
elif dev.startswith("id:"):
# Search for a device with the given serial number.
serial_number = dev[len("id:") :]
dev = None
for p in serial.tools.list_ports.comports():
if p.serial_number == serial_number:
state.pyb = pyboard.PyboardExtended(p.device, baudrate=115200)
return
raise pyboard.PyboardError("no device with serial number {}".format(serial_number))
else:
# Connect to the given device.
if dev.startswith("port:"):
dev = dev[len("port:") :]
state.pyb = pyboard.PyboardExtended(dev, baudrate=115200)
return
except pyboard.PyboardError as er:
msg = er.args[0]
if msg.startswith("failed to access"):
msg += " (it may be in use by another program)"
print(msg)
sys.exit(1)
def do_disconnect(state, _args=None):
if not state.pyb:
return
try:
if state.pyb.mounted:
if not state.pyb.in_raw_repl:
state.pyb.enter_raw_repl(soft_reset=False)
state.pyb.umount_local()
if state.pyb.in_raw_repl:
state.pyb.exit_raw_repl()
except OSError:
# Ignore any OSError exceptions when shutting down, eg:
# - pyboard.filesystem_command will close the connecton if it had an error
# - umounting will fail if serial port disappeared
pass
state.pyb.close()
state.pyb = None
state._auto_soft_reset = True
def show_progress_bar(size, total_size, op="copying"):
if not sys.stdout.isatty():
return
verbose_size = 2048
bar_length = 20
if total_size < verbose_size:
return
elif size >= total_size:
# Clear progress bar when copy completes
print("\r" + " " * (13 + len(op) + bar_length) + "\r", end="")
else:
bar = size * bar_length // total_size
progress = size * 100 // total_size
print(
"\r ... {} {:3d}% [{}{}]".format(op, progress, "#" * bar, "-" * (bar_length - bar)),
end="",
)
def do_filesystem(state, args):
state.ensure_raw_repl()
state.did_action()
def _list_recursive(files, path):
if os.path.isdir(path):
for entry in os.listdir(path):
_list_recursive(files, "/".join((path, entry)))
else:
files.append(os.path.split(path))
command = args.command[0]
paths = args.path
if command == "cat":
# Don't be verbose by default when using cat, so output can be
# redirected to something.
verbose = args.verbose == True
else:
verbose = args.verbose != False
if command == "cp" and args.recursive:
if paths[-1] != ":":
raise CommandError("'cp -r' destination must be ':'")
paths.pop()
src_files = []
for path in paths:
if path.startswith(":"):
raise CommandError("'cp -r' source files must be local")
_list_recursive(src_files, path)
known_dirs = {""}
state.pyb.exec_("import uos")
for dir, file in src_files:
dir_parts = dir.split("/")
for i in range(len(dir_parts)):
d = "/".join(dir_parts[: i + 1])
if d not in known_dirs:
state.pyb.exec_("try:\n uos.mkdir('%s')\nexcept OSError as e:\n print(e)" % d)
known_dirs.add(d)
pyboard.filesystem_command(
state.pyb,
["cp", "/".join((dir, file)), ":" + dir + "/"],
progress_callback=show_progress_bar,
verbose=verbose,
)
else:
if args.recursive:
raise CommandError("'-r' only supported for 'cp'")
try:
pyboard.filesystem_command(
state.pyb, [command] + paths, progress_callback=show_progress_bar, verbose=verbose
)
except OSError as er:
raise CommandError(er)
def do_edit(state, args):
state.ensure_raw_repl()
state.did_action()
if not os.getenv("EDITOR"):
raise pyboard.PyboardError("edit: $EDITOR not set")
for src in args.files:
src = src.lstrip(":")
dest_fd, dest = tempfile.mkstemp(suffix=os.path.basename(src))
try:
print("edit :%s" % (src,))
os.close(dest_fd)
state.pyb.fs_touch(src)
state.pyb.fs_get(src, dest, progress_callback=show_progress_bar)
if os.system("$EDITOR '%s'" % (dest,)) == 0:
state.pyb.fs_put(dest, src, progress_callback=show_progress_bar)
finally:
os.unlink(dest)
def _do_execbuffer(state, buf, follow):
state.ensure_raw_repl()
state.did_action()
try:
state.pyb.exec_raw_no_follow(buf)
if follow:
ret, ret_err = state.pyb.follow(timeout=None, data_consumer=pyboard.stdout_write_bytes)
if ret_err:
pyboard.stdout_write_bytes(ret_err)
sys.exit(1)
except pyboard.PyboardError as er:
print(er)
sys.exit(1)
except KeyboardInterrupt:
sys.exit(1)
def do_exec(state, args):
_do_execbuffer(state, args.expr[0], args.follow)
def do_eval(state, args):
buf = "print(" + args.expr[0] + ")"
_do_execbuffer(state, buf, True)
def do_run(state, args):
filename = args.path[0]
try:
with open(filename, "rb") as f:
buf = f.read()
except OSError:
raise CommandError(f"could not read file '{filename}'")
_do_execbuffer(state, buf, args.follow)
def do_mount(state, args):
state.ensure_raw_repl()
path = args.path[0]
state.pyb.mount_local(path, unsafe_links=args.unsafe_links)
print(f"Local directory {path} is mounted at /remote")
def do_umount(state, path):
state.ensure_raw_repl()
state.pyb.umount_local()
def do_resume(state, _args=None):
state._auto_soft_reset = False
def do_soft_reset(state, _args=None):
state.ensure_raw_repl(soft_reset=True)