-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_integration.py
83 lines (70 loc) · 2.26 KB
/
test_integration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import os
import sys
import time
import unittest
import rclpy
from turtlesim.msg import Pose
import launch
import launch_ros
import launch_testing.actions
def generate_test_description():
    """Assemble the launch description and context for this test module.

    Returns:
        A 2-tuple ``(launch_description, context)``: the launch description
        holds the turtlesim node under test plus a timer that emits
        ``ReadyToTest``; the context is an empty dict.
    """
    # Node under test.  A launch file in `app/launch` could be referenced
    # here instead to avoid duplication.  This is just a simple node, yet
    # anything can be launched, e.g. Gazebo.
    node_under_test = launch_ros.actions.Node(
        package='turtlesim',
        namespace='',
        executable='turtlesim_node',
        name='turtle1',
    )

    # Launch tests 0.5 s later.
    ready_signal = launch_testing.actions.ReadyToTest()
    delayed_start = launch.actions.TimerAction(
        period=0.5,
        actions=[ready_signal],
    )

    description = launch.LaunchDescription([node_under_test, delayed_start])
    context = {}
    return description, context
# Active tests
class TestTurtleSim(unittest.TestCase):
    """Active tests for the launched turtlesim node."""

    # Number of pose messages that must be exceeded for the test to pass.
    # The listening loop's early exit and the final assertion both use this
    # value, so the loop can never stop before the assertion is satisfiable.
    MIN_POSE_MSGS = 300

    # Upper bound, in seconds, on how long to listen for pose messages.
    POSE_LISTEN_TIMEOUT_SEC = 10

    @classmethod
    def setUpClass(cls):
        """Initialise rclpy once before any test in this class runs."""
        rclpy.init()

    @classmethod
    def tearDownClass(cls):
        """Shut rclpy down after the last test in this class."""
        rclpy.shutdown()

    def setUp(self):
        """Create a fresh node for each test."""
        self.node = rclpy.create_node('test_turtlesim')

    def tearDown(self):
        """Destroy the per-test node."""
        self.node.destroy_node()

    def test_publishes_pose(self, proc_output):
        """Check that more than ``MIN_POSE_MSGS`` pose messages are received.

        Subscribes to ``turtle1/pose`` and spins the test node until either
        enough messages have arrived or ``POSE_LISTEN_TIMEOUT_SEC`` elapses.

        Args:
            proc_output: Supplied by the test runner; unused by this test.

        Raises:
            AssertionError: If too few messages arrive before the deadline.
        """
        msgs_rx = []
        sub = self.node.create_subscription(
            Pose, 'turtle1/pose',
            lambda msg: msgs_rx.append(msg), 10)
        try:
            # Monotonic clock: the deadline must not move if the wall clock
            # is adjusted while the test is running.
            deadline = time.monotonic() + self.POSE_LISTEN_TIMEOUT_SEC
            while time.monotonic() < deadline:
                # Spin so the subscription callback gets executed.
                rclpy.spin_once(self.node, timeout_sec=1)
                # Stop early only once the asserted threshold is exceeded.
                if len(msgs_rx) > self.MIN_POSE_MSGS:
                    break
            self.assertGreater(
                len(msgs_rx), self.MIN_POSE_MSGS,
                'received too few pose messages within '
                f'{self.POSE_LISTEN_TIMEOUT_SEC} s')
        finally:
            # Always release the subscription, even when the check fails.
            self.node.destroy_subscription(sub)

    def test_logs_spawning(self, proc_output):
        """Check that the spawn message shows up on stderr within 5 s.

        Args:
            proc_output: Process-output handle supplied by the test runner.
        """
        proc_output.assertWaitFor(
            'Spawning turtle [turtle1] at x=',
            timeout=5, stream='stderr')
# Post-shutdown tests
@launch_testing.post_shutdown_test()
class TestTurtleSimShutdown(unittest.TestCase):
    """Post-shutdown checks for the launched processes."""

    def test_exit_codes(self, proc_info):
        """Verify the exit codes recorded in ``proc_info``.

        Delegates to ``launch_testing.asserts.assertExitCodes`` with its
        default arguments.
        """
        launch_testing.asserts.assertExitCodes(proc_info)