Add test suite using python-can for testing Controller Area Network (CAN) communication between a host PC and a device under test running Zephyr. Signed-off-by: Henrik Brix Andersen <hebad@vestas.com>
94 lines
3 KiB
Python
94 lines
3 KiB
Python
# Copyright (c) 2024 Vestas Wind Systems A/S
|
|
#
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
|
|
"""
|
|
Test suites for testing Zephyr CAN <=> host CAN.
|
|
"""
|
|
|
|
import logging
|
|
import pytest
|
|
|
|
import can
|
|
from can import BusABC, CanProtocol
|
|
|
|
# RX/TX timeout in seconds
|
|
TIMEOUT = 1.0
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
@pytest.mark.parametrize('msg', [
|
|
pytest.param(
|
|
can.Message(arbitration_id=0x10,
|
|
is_extended_id=False),
|
|
id='std_id_dlc_0'
|
|
),
|
|
pytest.param(
|
|
can.Message(arbitration_id=0x20,
|
|
data=[0xaa, 0xbb, 0xcc, 0xdd],
|
|
is_extended_id=False),
|
|
id='std_id_dlc_4'
|
|
),
|
|
pytest.param(
|
|
can.Message(arbitration_id=0x30,
|
|
data=[0xee, 0xff, 0xee, 0xff, 0xee, 0xff, 0xee, 0xff],
|
|
is_extended_id=True),
|
|
id='ext_id_dlc_8'
|
|
),
|
|
pytest.param(
|
|
can.Message(arbitration_id=0x40,
|
|
data=[0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09,
|
|
0x10, 0x11],
|
|
is_fd=True, is_extended_id=False),
|
|
id='std_id_fdf_dlc_9'
|
|
),
|
|
pytest.param(
|
|
can.Message(arbitration_id=0x50,
|
|
data=[0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09,
|
|
0x10, 0x11],
|
|
is_fd=True, bitrate_switch=True, is_extended_id=False),
|
|
id='std_id_fdf_brs_dlc_9'
|
|
),
|
|
])
|
|
class TestCanRxTx():
|
|
"""
|
|
Class for testing CAN RX/TX between Zephyr DUT and host.
|
|
"""
|
|
|
|
@staticmethod
|
|
def check_rx(tx: can.Message, rx: can.Message) -> None:
|
|
"""Check if received message matches transmitted message."""
|
|
# pylint: disable-next=unused-variable
|
|
__tracebackhide__ = True
|
|
|
|
if rx is None:
|
|
pytest.fail('no message received')
|
|
|
|
if not tx.equals(rx, timestamp_delta=None, check_channel=False,
|
|
check_direction=False):
|
|
pytest.fail(f'rx message "{rx}" not equal to tx message "{tx}"')
|
|
|
|
@staticmethod
|
|
def skip_if_unsupported(can_dut: BusABC, can_host: BusABC, msg: can.Message) -> None:
|
|
"""Skip test if message format is not supported by both DUT and host."""
|
|
if msg.is_fd:
|
|
if can_dut.protocol == CanProtocol.CAN_20:
|
|
pytest.skip('CAN FD not supported by DUT')
|
|
if can_host.protocol == CanProtocol.CAN_20:
|
|
pytest.skip('CAN FD not supported by host')
|
|
|
|
def test_dut_to_host(self, can_dut: BusABC, can_host: BusABC, msg: can.Message) -> None:
|
|
"""Test DUT to host communication."""
|
|
self.skip_if_unsupported(can_dut, can_host, msg)
|
|
|
|
can_dut.send(msg, timeout=TIMEOUT)
|
|
rx = can_host.recv(timeout=TIMEOUT)
|
|
self.check_rx(msg, rx)
|
|
|
|
def test_host_to_dut(self, can_dut: BusABC, can_host: BusABC, msg: can.Message) -> None:
|
|
"""Test host to DUT communication."""
|
|
self.skip_if_unsupported(can_dut, can_host, msg)
|
|
|
|
can_host.send(msg, timeout=TIMEOUT)
|
|
rx = can_dut.recv(timeout=TIMEOUT)
|
|
self.check_rx(msg, rx)
|