Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WaitForTopics: get content of messages for each topic #353

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
8656009
Get content of topic from launch_testing_ros::WaitForTopics
LastStarDust Feb 13, 2023
e6d2af3
Use deque instead of list and added unit tests
LastStarDust Feb 22, 2023
3ad48ec
return a list to not expose implementation details (deque)
LastStarDust Feb 28, 2023
41643b6
Fixed flake8 errors
LastStarDust Mar 1, 2023
9f214fc
WaitForTopic: better naming of variables and methods
LastStarDust Mar 2, 2023
5b015fc
WaitForTopic: better naming of variables and methods (bugfix)
LastStarDust Mar 2, 2023
de7266c
Update launch_testing_ros/launch_testing_ros/wait_for_topics.py
LastStarDust Mar 10, 2023
595a1c7
Update launch_testing_ros/launch_testing_ros/wait_for_topics.py
LastStarDust Mar 10, 2023
64a4dcf
Update launch_testing_ros/test/examples/wait_for_topic_launch_test.py
LastStarDust Mar 10, 2023
a9a56bd
Update launch_testing_ros/launch_testing_ros/wait_for_topics.py
LastStarDust Mar 10, 2023
e1a7013
Bugfix: save all incoming messages and not only the first
LastStarDust Mar 13, 2023
a84047c
check the buffer length
LastStarDust Mar 13, 2023
1cd227b
Do not recreate the same subscribers when recalling the wait method
LastStarDust Mar 15, 2023
ad4557e
Clear msg_event_object too
LastStarDust Mar 15, 2023
9d3c509
Forgot to rename received_messages into received_messages_buffer
LastStarDust May 25, 2023
6228855
Renamed max_number_of_messages into messages_received_buffer_length
LastStarDust May 26, 2023
e1564ba
Trying to fix flake8 warnings
LastStarDust Jun 9, 2023
1144f61
Fixed import ordering
LastStarDust Jun 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 51 additions & 19 deletions launch_testing_ros/launch_testing_ros/wait_for_topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import deque
import random
import string
from threading import Event
Expand Down Expand Up @@ -47,12 +48,14 @@ def method_2():
print('Given topics are receiving messages !')
print(wait_for_topics.topics_not_received()) # Should be an empty set
print(wait_for_topics.topics_received()) # Should be {'topic_1', 'topic_2'}
print(wait_for_topics.messages_received('topic_1')) # Should be [message_1, ...]
wait_for_topics.shutdown()
"""

def __init__(self, topic_tuples, timeout=5.0):
def __init__(self, topic_tuples, timeout=5.0, messages_received_buffer_length=10):
self.topic_tuples = topic_tuples
self.timeout = timeout
self.messages_received_buffer_length = messages_received_buffer_length
self.__ros_context = rclpy.Context()
rclpy.init(context=self.__ros_context)
self.__ros_executor = SingleThreadedExecutor(context=self.__ros_context)
Expand All @@ -64,9 +67,14 @@ def __init__(self, topic_tuples, timeout=5.0):
self.__ros_spin_thread.start()

def _prepare_ros_node(self):
node_name = '_test_node_' +\
''.join(random.choices(string.ascii_uppercase + string.digits, k=10))
self.__ros_node = _WaitForTopicsNode(name=node_name, node_context=self.__ros_context)
node_name = '_test_node_' + ''.join(
random.choices(string.ascii_uppercase + string.digits, k=10)
)
self.__ros_node = _WaitForTopicsNode(
name=node_name,
node_context=self.__ros_context,
messages_received_buffer_length=self.messages_received_buffer_length,
)
self.__ros_executor.add_node(self.__ros_node)

def wait(self):
Expand All @@ -87,6 +95,12 @@ def topics_not_received(self):
"""Topics that did not receive any messages."""
return self.__ros_node.expected_topics - self.__ros_node.received_topics

def received_messages(self, topic_name):
"""List of received messages of a specific topic."""
if topic_name not in self.__ros_node.received_messages_buffer:
raise KeyError('No messages received for topic: ' + topic_name)
return list(self.__ros_node.received_messages_buffer[topic_name])

def __enter__(self):
if not self.wait():
raise RuntimeError('Did not receive messages on these topics: ',
Expand All @@ -100,31 +114,49 @@ def __exit__(self, exep_type, exep_value, trace):
class _WaitForTopicsNode(Node):
"""Internal node used for subscribing to a set of topics."""

def __init__(self, name='test_node', node_context=None):
super().__init__(node_name=name, context=node_context)
def __init__(
self, name='test_node', node_context=None, messages_received_buffer_length=None
):
super().__init__(node_name=name, context=node_context) # type: ignore
self.msg_event_object = Event()

def start_subscribers(self, topic_tuples):
self.messages_received_buffer_length = messages_received_buffer_length
self.subscriber_list = []
self.expected_topics = {name for name, _ in topic_tuples}
self.topic_tuples = []
self.expected_topics = set()
self.received_topics = set()
self.received_messages_buffer = {}

def _reset(self):
self.msg_event_object.clear()
self.received_topics = set()
for buffer in self.received_messages_buffer.values():
buffer.clear()

def start_subscribers(self, topic_tuples):
self._reset()
for topic_name, topic_type in topic_tuples:
# Create a subscriber
self.subscriber_list.append(
self.create_subscription(
topic_type,
topic_name,
self.callback_template(topic_name),
10
if (topic_name, topic_type) not in self.topic_tuples:
self.topic_tuples.append((topic_name, topic_type))
self.expected_topics.add(topic_name)
# Initialize ring buffer of received messages
self.received_messages_buffer[topic_name] = deque(
maxlen=self.messages_received_buffer_length
)
# Create a subscriber
self.subscriber_list.append(
self.create_subscription(
topic_type,
topic_name,
self.callback_template(topic_name),
10
)
)
)

def callback_template(self, topic_name):

def topic_callback(data):
self.get_logger().debug('Message received for ' + topic_name)
self.received_messages_buffer[topic_name].append(data)
if topic_name not in self.received_topics:
self.get_logger().debug('Message received for ' + topic_name)
self.received_topics.add(topic_name)
if self.received_topics == self.expected_topics:
self.msg_event_object.set()
Expand Down
16 changes: 15 additions & 1 deletion launch_testing_ros/test/examples/wait_for_topic_launch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import os
import re
import sys
import unittest

Expand All @@ -22,7 +23,9 @@
import launch_testing.actions
import launch_testing.markers
from launch_testing_ros import WaitForTopics

import pytest

from std_msgs.msg import String


Expand Down Expand Up @@ -57,11 +60,18 @@ def test_topics_successful(self, count):
"""All the supplied topics should be read successfully."""
topic_list = [('chatter_' + str(i), String) for i in range(count)]
expected_topics = {'chatter_' + str(i) for i in range(count)}
message_pattern = re.compile(r'Hello World: \d+')

# Method 1 : Using the magic methods and 'with' keyword
with WaitForTopics(topic_list, timeout=10.0) as wait_for_node_object_1:
with WaitForTopics(
topic_list, timeout=2.0, messages_received_buffer_length=10
) as wait_for_node_object_1:
assert wait_for_node_object_1.topics_received() == expected_topics
assert wait_for_node_object_1.topics_not_received() == set()
for topic_name, _ in topic_list:
assert len(wait_for_node_object_1.received_messages(topic_name)) >= 1
message = wait_for_node_object_1.received_messages(topic_name).pop().data
assert message_pattern.match(message)

# Multiple instances of WaitForNode() can be created safely as
# their internal nodes spin in separate contexts
Expand All @@ -70,6 +80,10 @@ def test_topics_successful(self, count):
assert wait_for_node_object_2.wait()
assert wait_for_node_object_2.topics_received() == expected_topics
assert wait_for_node_object_2.topics_not_received() == set()
for topic_name, _ in topic_list:
assert len(wait_for_node_object_1.received_messages(topic_name)) >= 1
message = wait_for_node_object_2.received_messages(topic_name).pop().data
LastStarDust marked this conversation as resolved.
Show resolved Hide resolved
assert message_pattern.match(message)
wait_for_node_object_2.shutdown()

def test_topics_unsuccessful(self, count):
Expand Down