Add test suite using python-can for testing Controller Area Network (CAN) communication between a host PC and a device under test running Zephyr. Signed-off-by: Henrik Brix Andersen <hebad@vestas.com>
197 lines
6.8 KiB
Python
197 lines
6.8 KiB
Python
# Copyright (c) 2024 Vestas Wind Systems A/S
|
|
#
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
|
|
"""
|
|
Zephyr CAN shell module support for providing a python-can bus interface for testing.
|
|
"""
|
|
|
|
import re
|
|
import logging
|
|
from typing import Optional, Tuple
|
|
|
|
from can import BusABC, CanProtocol, Message
|
|
from can.exceptions import CanInitializationError, CanOperationError
|
|
from can.typechecking import CanFilters
|
|
|
|
from twister_harness import DeviceAdapter, Shell
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class CanShellBus(BusABC): # pylint: disable=abstract-method
|
|
"""
|
|
A CAN interface using the Zephyr CAN shell module.
|
|
"""
|
|
|
|
def __init__(self, dut: DeviceAdapter, shell: Shell, channel: str,
|
|
can_filters: Optional[CanFilters] = None, **kwargs) -> None:
|
|
self._dut = dut
|
|
self._shell = shell
|
|
self._device = channel
|
|
self._is_filtered = False
|
|
self._filter_ids = []
|
|
|
|
self.channel_info = f'Zephyr CAN shell, device "{self._device}"'
|
|
|
|
mode = 'normal'
|
|
if 'fd' in self._get_capabilities():
|
|
self._can_protocol = CanProtocol.CAN_FD
|
|
mode += ' fd'
|
|
else:
|
|
self._can_protocol = CanProtocol.CAN_20
|
|
|
|
self._set_mode(mode)
|
|
self._start()
|
|
|
|
super().__init__(channel=channel, can_filters=can_filters, **kwargs)
|
|
|
|
def _retval(self):
|
|
"""Get return value of last shell command."""
|
|
return int(self._shell.get_filtered_output(self._shell.exec_command('retval'))[0])
|
|
|
|
def _get_capabilities(self) -> list[str]:
|
|
cmd = f'can show {self._device}'
|
|
|
|
lines = self._shell.get_filtered_output(self._shell.exec_command(cmd))
|
|
regex_compiled = re.compile(r'capabilities:\s+(?P<caps>.*)')
|
|
for line in lines:
|
|
m = regex_compiled.match(line)
|
|
if m:
|
|
return m.group('caps').split()
|
|
|
|
raise CanOperationError('capabilities not found')
|
|
|
|
def _set_mode(self, mode: str) -> None:
|
|
self._shell.exec_command(f'can mode {self._device} {mode}')
|
|
retval = self._retval()
|
|
if retval != 0:
|
|
raise CanOperationError(f'failed to set mode "{mode}" (err {retval})')
|
|
|
|
def _start(self):
|
|
self._shell.exec_command(f'can start {self._device}')
|
|
retval = self._retval()
|
|
if retval != 0:
|
|
raise CanInitializationError(f'failed to start (err {retval})')
|
|
|
|
def _stop(self):
|
|
self._shell.exec_command(f'can stop {self._device}')
|
|
|
|
def send(self, msg: Message, timeout: Optional[float] = None) -> None:
|
|
logger.debug('sending: %s', msg)
|
|
|
|
cmd = f'can send {self._device}'
|
|
cmd += ' -e' if msg.is_extended_id else ''
|
|
cmd += ' -r' if msg.is_remote_frame else ''
|
|
cmd += ' -f' if msg.is_fd else ''
|
|
cmd += ' -b' if msg.bitrate_switch else ''
|
|
|
|
if msg.is_extended_id:
|
|
cmd += f' {msg.arbitration_id:08x}'
|
|
else:
|
|
cmd += f' {msg.arbitration_id:03x}'
|
|
|
|
if msg.data:
|
|
cmd += ' ' + msg.data.hex(' ', 1)
|
|
|
|
lines = self._shell.exec_command(cmd)
|
|
regex_compiled = re.compile(r'enqueuing\s+CAN\s+frame\s+#(?P<id>\d+)')
|
|
frame_num = None
|
|
for line in lines:
|
|
m = regex_compiled.match(line)
|
|
if m:
|
|
frame_num = m.group('id')
|
|
break
|
|
|
|
if frame_num is None:
|
|
raise CanOperationError('frame not enqueued')
|
|
|
|
tx_regex = r'CAN\s+frame\s+#' + frame_num + r'\s+successfully\s+sent'
|
|
self._dut.readlines_until(regex=tx_regex, timeout=timeout)
|
|
|
|
def _add_filter(self, can_id: int, can_mask: int, extended: bool) -> None:
|
|
"""Add RX filter."""
|
|
cmd = f'can filter add {self._device}'
|
|
cmd += ' -e' if extended else ''
|
|
|
|
if extended:
|
|
cmd += f' {can_id:08x}'
|
|
cmd += f' {can_mask:08x}'
|
|
else:
|
|
cmd += f' {can_id:03x}'
|
|
cmd += f' {can_mask:03x}'
|
|
|
|
lines = self._shell.exec_command(cmd)
|
|
regex_compiled = re.compile(r'filter\s+ID:\s+(?P<id>\d+)')
|
|
for line in lines:
|
|
m = regex_compiled.match(line)
|
|
if m:
|
|
filter_id = int(m.group('id'))
|
|
self._filter_ids.append(filter_id)
|
|
return
|
|
|
|
raise CanOperationError('filter_id not found')
|
|
|
|
def _remove_filter(self, filter_id: int) -> None:
|
|
"""Remove RX filter."""
|
|
if filter_id in self._filter_ids:
|
|
self._filter_ids.remove(filter_id)
|
|
|
|
self._shell.exec_command(f'can filter remove {self._device} {filter_id}')
|
|
retval = self._retval()
|
|
if retval != 0:
|
|
raise CanOperationError(f'failed to remove filter ID {filter_id} (err {retval})')
|
|
|
|
def _remove_all_filters(self) -> None:
|
|
"""Remove all RX filters."""
|
|
for filter_id in self._filter_ids[:]:
|
|
self._remove_filter(filter_id)
|
|
|
|
def _apply_filters(self, filters: Optional[CanFilters]) -> None:
|
|
self._remove_all_filters()
|
|
|
|
if filters:
|
|
self._is_filtered = True
|
|
else:
|
|
# Accept all frames if no hardware filters provided
|
|
filters = [
|
|
{'can_id': 0x0, 'can_mask': 0x0},
|
|
{'can_id': 0x0, 'can_mask': 0x0, 'extended': True}
|
|
]
|
|
self._is_filtered = False
|
|
|
|
for can_filter in filters:
|
|
can_id = can_filter['can_id']
|
|
can_mask = can_filter['can_mask']
|
|
extended = can_filter['extended'] if 'extended' in can_filter else False
|
|
self._add_filter(can_id, can_mask, extended)
|
|
|
|
def _recv_internal(self, timeout: Optional[float]) -> Tuple[Optional[Message], bool]:
|
|
frame_regex = r'.*' + re.escape(self._device) + \
|
|
r'\s+(?P<brs>\S)(?P<esi>\S)\s+(?P<can_id>\d+)\s+\[(?P<dlc>\d+)\]\s*(?P<data>[a-z0-9 ]*)'
|
|
lines = self._dut.readlines_until(regex=frame_regex, timeout=timeout)
|
|
msg = None
|
|
|
|
regex_compiled = re.compile(frame_regex)
|
|
for line in lines:
|
|
m = regex_compiled.match(line)
|
|
if m:
|
|
can_id = int(m.group('can_id'), 16)
|
|
ext = len(m.group('can_id')) == 8
|
|
dlc = int(m.group('dlc'))
|
|
fd = len(m.group('dlc')) == 2
|
|
brs = m.group('brs') == 'B'
|
|
esi = m.group('esi') == 'P'
|
|
data = bytearray.fromhex(m.group('data'))
|
|
msg = Message(arbitration_id=can_id,is_extended_id=ext,
|
|
data=data, dlc=dlc,
|
|
is_fd=fd, bitrate_switch=brs, error_state_indicator=esi,
|
|
channel=self._device, check=True)
|
|
logger.debug('received: %s', msg)
|
|
|
|
return msg, self._is_filtered
|
|
|
|
def shutdown(self) -> None:
|
|
if not self._is_shutdown:
|
|
super().shutdown()
|
|
self._stop()
|
|
self._remove_all_filters()
|