west: runners: Implement the west rtt command for openocd

This was non-trivial, as openocd is a bit weird to work with. Using only
commands passed with '-c' arguments, I couldn't get it to reliably resume
(or just not halt) the target when started. I tried using the 'sleep'
command, and various 'configure -event XX { resume }' events, but nothing
panned out, as it seems to always halt after all `-c` commands have been
run.

To avoid that, this waits for the TCL RPC port to be up, and sends a
resume command there. This works reliably.

Signed-off-by: Tobias Pisani <mail@topisani.dev>
This commit is contained in:
Tobias Pisani 2024-07-07 09:56:03 +02:00 committed by Anas Nashif
parent b3b8360f39
commit 18f45b5f06

View file

@ -6,8 +6,13 @@
'''Runner for openocd.'''
import subprocess
import re
import selectors
import shutil
import socket
import subprocess
import sys
import time
from os import path
from pathlib import Path
@ -23,6 +28,7 @@ from runners.core import ZephyrBinaryRunner, RunnerCaps
DEFAULT_OPENOCD_TCL_PORT = 6333
DEFAULT_OPENOCD_TELNET_PORT = 4444
DEFAULT_OPENOCD_GDB_PORT = 3333
DEFAULT_OPENOCD_RTT_PORT = 5555
DEFAULT_OPENOCD_RESET_HALT_CMD = 'reset init'
DEFAULT_OPENOCD_TARGET_HANDLE = "_TARGETNAME"
@ -39,7 +45,8 @@ class OpenOcdBinaryRunner(ZephyrBinaryRunner):
gdb_port=DEFAULT_OPENOCD_GDB_PORT,
gdb_client_port=DEFAULT_OPENOCD_GDB_PORT,
gdb_init=None, no_load=False,
target_handle=DEFAULT_OPENOCD_TARGET_HANDLE):
target_handle=DEFAULT_OPENOCD_TARGET_HANDLE,
rtt_port=DEFAULT_OPENOCD_RTT_PORT):
super().__init__(cfg)
if not path.exists(cfg.board_dir):
@ -97,6 +104,7 @@ class OpenOcdBinaryRunner(ZephyrBinaryRunner):
self.gdb_init = gdb_init
self.load_arg = [] if no_load else ['-ex', 'load']
self.target_handle = target_handle
self.rtt_port = rtt_port
@classmethod
def name(cls):
@ -104,7 +112,7 @@ class OpenOcdBinaryRunner(ZephyrBinaryRunner):
@classmethod
def capabilities(cls):
return RunnerCaps(commands={'flash', 'debug', 'debugserver', 'attach'})
return RunnerCaps(commands={'flash', 'debug', 'debugserver', 'attach', 'rtt'}, rtt=True)
@classmethod
def do_add_parser(cls, parser):
@ -165,6 +173,8 @@ class OpenOcdBinaryRunner(ZephyrBinaryRunner):
help=f'''Internal handle used in openocd targets cfg
files, defaults to "{DEFAULT_OPENOCD_TARGET_HANDLE}".
''')
parser.add_argument('--rtt-port', default=DEFAULT_OPENOCD_RTT_PORT,
help='openocd rtt port, defaults to 5555')
@classmethod
@ -180,7 +190,8 @@ class OpenOcdBinaryRunner(ZephyrBinaryRunner):
no_targets=args.no_targets, tcl_port=args.tcl_port,
telnet_port=args.telnet_port, gdb_port=args.gdb_port,
gdb_client_port=args.gdb_client_port, gdb_init=args.gdb_init,
no_load=args.no_load, target_handle=args.target_handle)
no_load=args.no_load, target_handle=args.target_handle,
rtt_port=args.rtt_port)
def print_gdbserver_message(self):
if not self.thread_info_enabled:
@ -242,6 +253,8 @@ class OpenOcdBinaryRunner(ZephyrBinaryRunner):
self.do_attach_debug(command, **kwargs)
elif command == 'load':
self.do_load(**kwargs)
elif command == 'rtt':
self.do_rtt(**kwargs)
else:
self.do_debugserver(**kwargs)
@ -256,7 +269,7 @@ class OpenOcdBinaryRunner(ZephyrBinaryRunner):
# them to POSIX style just to be sure.
hex_name = Path(self.cfg.hex_file).as_posix()
self.logger.info('Flashing file: {}'.format(hex_name))
self.logger.info(f'Flashing file: {hex_name}')
pre_init_cmd = []
pre_load_cmd = []
@ -347,17 +360,17 @@ class OpenOcdBinaryRunner(ZephyrBinaryRunner):
if self.thread_info_enabled and self.supports_thread_info():
pre_init_cmd.append("-c")
rtos_command = '${} configure -rtos Zephyr'.format(self.target_handle)
rtos_command = f'${self.target_handle} configure -rtos Zephyr'
pre_init_cmd.append(rtos_command)
server_cmd = (self.openocd_cmd + self.serial + self.cfg_cmd +
['-c', 'tcl_port {}'.format(self.tcl_port),
'-c', 'telnet_port {}'.format(self.telnet_port),
'-c', 'gdb_port {}'.format(self.gdb_port)] +
['-c', f'tcl_port {self.tcl_port}',
'-c', f'telnet_port {self.telnet_port}',
'-c', f'gdb_port {self.gdb_port}'] +
pre_init_cmd + self.init_arg + self.targets_arg +
self.halt_arg)
gdb_cmd = (self.gdb_cmd + self.tui_arg +
['-ex', 'target extended-remote :{}'.format(self.gdb_client_port),
['-ex', f'target extended-remote :{self.gdb_client_port}',
self.elf_name])
if command == 'debug':
gdb_cmd.extend(self.load_arg)
@ -378,14 +391,100 @@ class OpenOcdBinaryRunner(ZephyrBinaryRunner):
if self.thread_info_enabled and self.supports_thread_info():
pre_init_cmd.append("-c")
rtos_command = '${} configure -rtos Zephyr'.format(self.target_handle)
rtos_command = f'${self.target_handle} configure -rtos Zephyr'
pre_init_cmd.append(rtos_command)
cmd = (self.openocd_cmd + self.cfg_cmd +
['-c', 'tcl_port {}'.format(self.tcl_port),
'-c', 'telnet_port {}'.format(self.telnet_port),
'-c', 'gdb_port {}'.format(self.gdb_port)] +
['-c', f'tcl_port {self.tcl_port}',
'-c', f'telnet_port {self.telnet_port}',
'-c', f'gdb_port {self.gdb_port}'] +
pre_init_cmd + self.init_arg + self.targets_arg +
['-c', self.reset_halt_cmd])
self.print_gdbserver_message()
self.check_call(cmd)
def do_rtt(self, **kwargs):
pre_init_cmd = []
for i in self.pre_init:
pre_init_cmd.append("-c")
pre_init_cmd.append(i)
if self.thread_info_enabled and self.supports_thread_info():
pre_init_cmd.append("-c")
rtos_command = f'${self.target_handle} configure -rtos Zephyr'
pre_init_cmd.append(rtos_command)
rtt_address = self.get_rtt_address()
if rtt_address is None:
raise ValueError("RTT Control block not be found")
rtt_cmds = [
'-c', f'rtt setup 0x{rtt_address:x} 0x10 "SEGGER RTT"',
'-c', f'rtt server start {self.rtt_port} 0',
'-c', 'rtt start',
]
server_cmd = (self.openocd_cmd + self.cfg_cmd +
['-c', f'tcl_port {self.tcl_port}',
'-c', f'telnet_port {self.telnet_port}',
'-c', f'gdb_port {self.gdb_port}'] +
pre_init_cmd + self.init_arg + self.targets_arg +
['-c', self.reset_halt_cmd] +
rtt_cmds
)
self.print_gdbserver_message()
server_proc = self.popen_ignore_int(server_cmd)
# The target gets halted after all commands passed on the commandline are run.
# The only way to run resume here, to not have to connect a GDB, is to connect
# to the tcl port and run the command. When the TCL port comes up, initialization
# is done.
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# As long as the server process is still running, keep retrying the connection
while server_proc.poll() is None:
try:
sock.connect(('localhost', self.tcl_port))
break
except ConnectionRefusedError:
time.sleep(0.1)
# \x1a is the command terminator for the openocd tcl rpc
sock.send(b'resume\x1a')
sock.shutdown(socket.SHUT_RDWR)
# Run the client. Since rtt is initialized before the tcl rpc comes up,
# the port is open now.
self.logger.info("Opening RTT")
time.sleep(0.1) # Give the server a moment to output log messages first
self._run_rtt_client()
except Exception as e:
self.logger.error(e)
finally:
server_proc.terminate()
server_proc.wait()
def _run_rtt_client(self):
# If a `nc` command is available, run it, as it will provide the best support for
# CONFIG_SHELL_VT100_COMMANDS etc.
if shutil.which('nc') is not None:
client_cmd = ['nc', 'localhost', str(self.rtt_port)]
self.run_client(client_cmd)
return
# Otherwise, use a pure python implementation. This will work well for logging,
# but input is line based only.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', self.rtt_port))
sel = selectors.DefaultSelector()
sel.register(sys.stdin, selectors.EVENT_READ)
sel.register(sock, selectors.EVENT_READ)
while True:
events = sel.select()
for key, _ in events:
if key.fileobj == sys.stdin:
text = sys.stdin.readline()
if text:
sock.send(text.encode())
elif key.fileobj == sock:
resp = sock.recv(2048)
if resp:
print(resp.decode())