-
Notifications
You must be signed in to change notification settings - Fork 3
Testing guidelines
Unit testing ROS 2 code is no different from unit testing any other software, but some care must be exercised.
- Data transport in ROS 2 is non-deterministic. So is callback execution when multi-threaded executors are in place. This is true for ROS 2 code in general, and for process-wide APIs in particular, for which non-determinism is the price to pay for a synchronous programming model. As such, time sensitive and execution order dependent tests are bound to fail, even if only sporadically. Synchronization is necessary to avoid these issues, and fortunately the very same process-wide APIs enable safe use of synchronization primitives (e.g. via a multi-threaded executor spinning in a background thread, as provided by
bdai_ros_wrappers.scope
functionality). - ROS 2 middlewares perform peer discovery by default. This allows distributed architectures in production but leads to cross-talk during parallelized testing.
domain_coordinator
functionality simplifies ROS domain ID assignment enforcing host-wide uniqueness and with it, middleware isolation.
Therefore, as rules of thumb consider:
- Using
bdai_ros2_wrappers.scope.top
to setup ROS 2 in your test fixtures.- Isolate it by passing a unique domain ID, as provided by
domain_coordinator.domain_id
.
- Isolate it by passing a unique domain ID, as provided by
- Using synchronization primitives to wait with timeouts.
- Note timeouts make the test time sensitive. Pick timeouts an order of magnitude above the expected test timing.
pytest
is a testing framework for Python software, the most common in ROS 2 Python codebases.
import domain_coordinator
import pytest
from typing import Iterator
import bdai_ros2_wrappers.scope as ros_scope
from bdai_ros2_wrappers.action_client import ActionClientWrapper
from bdai_ros2_wrappers.futures import wait_for_future
from bdai_ros2_wrappers.scope import ROSAwareScope
from bdai_ros2_wrappers.single_goal_action_server import SingleGoalActionServer
from bdai_ros2_wrappers.subscription import wait_for_message
from std_msgs.msg import String
from std_srvs.srv import Trigger
from example_interfaces.action import Fibonacci
from rclpy.action.server import ServerGoalHandle
from rclpy.qos import QoSProfile, HistoryPolicy, DurabilityPolicy
@pytest.fixture
def ros() -> Iterator[ROSAwareScope]:
"""
A pytest fixture that will set up and yield a ROS 2 aware global scope to each test that requests it.
See https://docs.pytest.org/en/7.4.x/fixture.html for a primer on pytest fixtures.
"""
with domain_coordinator.domain_id() as domain_id: # to ensure node isolation
with ros_scope.top(global_=True, namespace="fixture", domain_id=domain_id) as top:
yield top
def test_topic_pub_sub(ros: ROSAwareScope) -> None:
"""Asserts that a published message can be received on the other end."""
qos_profile = QoSProfile(
depth=100,
history=HistoryPolicy.KEEP_LAST,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
)
pub = ros.node.create_publisher(String, "test", qos_profile)
pub.publish(String(data="test"))
# Message will arrive at an unspecified point in the future, thus
assert wait_for_message(String, "test", qos_profile=qos_profile, timeout_sec=5.0)
def test_service_server_client(ros: ROSAwareScope) -> None:
"""Asserts that a service server replies to client requests."""
def callback(_: Trigger.Request, response: Trigger.Response) -> Trigger.Response:
response.success = True
return response
server = ros.node.create_service(Trigger, "trigger", callback)
client = ros.node.create_client(Trigger, "trigger")
assert client.wait_for_service(timeout_sec=5.0)
future = client.call_async(Trigger.Request())
assert wait_for_future(future, timeout_sec=5.0)
response = future.result()
assert response and response.success
def test_action_server_client(ros: ROSAwareScope) -> None:
"""Asserts that an action server reacts to action client requests."""
def callback(goal_handle: ServerGoalHandle) -> Fibonacci.Result:
result = Fibonacci.Result()
result.sequence = [0, 1]
for i in range(1, goal_handle.request.order):
result.sequence.append(result.sequence[i] + result.sequence[i-1])
goal_handle.succeed()
return result
server = SingleGoalActionServer(ros.node, Fibonacci, "compute", callback)
client = ActionClientWrapper(Fibonacci, "compute", ros.node)
assert client.wait_for_server(timeout_sec=5.0)
goal = Fibonacci.Goal(order=3)
result = client.send_goal_and_wait("compute", goal, timeout_sec=5.0)
assert result and list(result.sequence) == [0, 1, 1, 2]
unittest
is the testing framework in Python's standard library.
import contextlib
import domain_coordinator
import unittest
import bdai_ros2_wrappers.scope as ros_scope
from bdai_ros2_wrappers.action_client import ActionClientWrapper
from bdai_ros2_wrappers.futures import wait_for_future
from bdai_ros2_wrappers.scope import ROSAwareScope
from bdai_ros2_wrappers.single_goal_action_server import SingleGoalActionServer
from bdai_ros2_wrappers.subscription import wait_for_message
from std_msgs.msg import String
from std_srvs.srv import Trigger
from example_interfaces.action import Fibonacci
from rclpy.action.server import ServerGoalHandle
from rclpy.qos import QoSProfile, HistoryPolicy, DurabilityPolicy
class TestCase(unittest.TestCase):
def setUp(self) -> None:
"""Sets up an isolated ROS 2 aware scope for all tests in the test case."""
self.fixture = contextlib.ExitStack()
domain_id = self.fixture.enter_context(domain_coordinator.domain_id())
self.ros = self.fixture.enter_context(ros_scope.top(
global_=True, namespace="fixture", domain_id=domain_id
))
def tearDown(self) -> None:
self.fixture.close() # exits all contexts
def test_topic_pub_sub(self) -> None:
"""Asserts that a published message can be received on the other end."""
qos_profile = QoSProfile(
depth=100,
history=HistoryPolicy.KEEP_LAST,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
)
pub = self.ros.node.create_publisher(String, "test", qos_profile)
pub.publish(String(data="test"))
# Message will arrive at an unspecified point in the future, thus
self.assertIsNotNone(wait_for_message(
String, "test", qos_profile=qos_profile, timeout_sec=5.0))
def test_service_server_client(self) -> None:
"""Asserts that a service server replies to client requests."""
def callback(_: Trigger.Request, response: Trigger.Response) -> Trigger.Response:
response.success = True
return response
server = self.ros.node.create_service(Trigger, "trigger", callback)
client = self.ros.node.create_client(Trigger, "trigger")
self.assertTrue(client.wait_for_service(timeout_sec=5.0))
future = client.call_async(Trigger.Request())
self.assertTrue(wait_for_future(future, timeout_sec=5.0))
response = future.result()
self.assertIsNotNone(response)
self.assertTrue(response.success)
def test_action_server_client(self) -> None:
"""Asserts that an action server reacts to action client requests."""
def callback(goal_handle: ServerGoalHandle) -> Fibonacci.Result:
result = Fibonacci.Result()
result.sequence = [0, 1]
for i in range(1, goal_handle.request.order):
result.sequence.append(result.sequence[i] + result.sequence[i-1])
goal_handle.succeed()
return result
server = SingleGoalActionServer(self.ros.node, Fibonacci, "compute", callback)
client = ActionClientWrapper(Fibonacci, "compute", self.ros.node)
self.assertTrue(client.wait_for_server(timeout_sec=5.0))
goal = Fibonacci.Goal(order=3)
result = client.send_goal_and_wait("compute", goal, timeout_sec=5.0)
self.assertIsNotNone(result)
self.assertEqual(list(result.sequence), [0, 1, 1, 2])
A package's type and build system dictate how unit tests are to be added. Unit tests for ROS 2 packages are typically hosted under the test
subdirectory, so the following assumes this convention is observed.
For ament_cmake
packages, the CMakeLists.txt
file should have:
if(BUILD_TESTING)
find_package(ament_cmake_pytest REQUIRED)
# Define an arbitrary target for your tests such as:
ament_add_pytest_test(unit_tests test)
endif()
For ament_python
packages, the setup.py
file should have:
setup(
# ...
tests_require=['pytest'],
)
Note that pytest
is the testing tool of choice regardless of package type.
Note: test discovery mechanisms in pytest
do not require __init__.py
files under test
directories.