Skip to content

Commit

Permalink
Introductory level support for managed nodes (#7)
Browse files Browse the repository at this point in the history
* enhance composable node support & code refactor

* introduce support for managed nodes
- statically configure and activate the lifecycle state

* added support for managed nodes
  • Loading branch information
ibrahimsel authored Mar 4, 2024
1 parent 8207868 commit 55a5489
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 15 deletions.
5 changes: 4 additions & 1 deletion composer/introspection/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
import asyncio
import multiprocessing
from launch import LaunchDescription, LaunchService
from launch.actions import RegisterEventHandler
from launch.actions import RegisterEventHandler, ExecuteProcess, TimerAction
from launch.event_handlers import OnProcessStart, OnProcessExit
from launch.substitutions import FindExecutable

class Ros2LaunchParent:
"""
Expand Down Expand Up @@ -84,6 +85,8 @@ def _run_process(self, stop_event, launch_description):
launch_service.include_launch_description(launch_description)
launch_task = loop.create_task(launch_service.run_async())



async def wait_for_stop_event():
while not stop_event.is_set():
await asyncio.sleep(0.1)
Expand Down
55 changes: 55 additions & 0 deletions composer/model/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import os
import composer.model.param as param
from lifecycle_msgs.msg import Transition, State
from lifecycle_msgs.srv import GetState, GetAvailableTransitions, GetAvailableStates, ChangeState
import rclpy

class Node:
def __init__(self, stack, manifest=None, container=None):
Expand All @@ -32,6 +35,7 @@ def __init__(self, stack, manifest=None, container=None):
self.pkg = manifest.get('pkg', '')
self.exec = manifest.get('exec', '')
self.plugin = manifest.get('plugin', '')
self.lifecycle = manifest.get('lifecycle', '')
self.name = manifest.get('name', '')
self.ros_args = manifest.get('ros_args', '')
self.args = stack.resolve_expression(manifest.get('args', ''))
Expand All @@ -51,6 +55,7 @@ def toManifest(self):
"param": [p.toManifest() for p in self.param],
"remap": [{"from": rm[0], "to": rm[1]} for rm in self.remap_args],
"pkg": self.pkg,
"lifecycle": self.lifecycle,
"exec": self.exec,
"plugin": self.plugin,
"name": self.name,
Expand All @@ -63,6 +68,56 @@ def toManifest(self):
"unless": self.unless,
"action": self.action
}

def change_state(self, verbs=[]):
if self.lifecycle:
temporary_node = rclpy.create_node('change_state_node')
state_cli = temporary_node.create_client(ChangeState, f'/{self.namespace}/{self.name}/change_state')
while not state_cli.wait_for_service(timeout_sec=1.0):
temporary_node.get_logger().warn('Lifecycle change state service not available. Waiting...')

for verb in verbs:
request = ChangeState.Request()
t = Transition()
t.label = verb
request.transition = t
future = state_cli.call_async(request)
rclpy.spin_until_future_complete(temporary_node, future, timeout_sec=3.0)
temporary_node.destroy_node()
else:
print(f"{self.name} is Not a managed node")


def get_state(self):
if self.lifecycle:
temporary_node = rclpy.create_node('get_state_node')
state_cli = temporary_node.create_client(GetState, f'/{self.namespace}/{self.name}/get_state')
while not state_cli.wait_for_service(timeout_sec=1.0):
temporary_node.get_logger().warn('Lifecycle get state service not available. Waiting...')
request = GetState.Request()
future = state_cli.call_async(request)
rclpy.spin_until_future_complete(temporary_node, future, timeout_sec=3.0)
temporary_node.destroy_node()
return future.result()
else:
print(f"{self.name} is Not a managed node")

def get_available_states(self):
if self.lifecycle:
temporary_node = rclpy.create_node('get_available_states_node')
state_cli = temporary_node.create_client(GetAvailableStates, f'/{self.namespace}/{self.name}/get_available_states')
while not state_cli.wait_for_service(timeout_sec=1.0):
temporary_node.get_logger().warn('Lifecycle get_available_states service not available. Waiting...')
request = GetAvailableStates.Request()
response = GetAvailableStates.Response()
future = state_cli.call_async(request)
rclpy.spin_until_future_complete(temporary_node, future, timeout_sec=3.0)
response = future.result()
temporary_node.destroy_node()
return response.available_states
else:
print(f"{self.name} is Not a managed node")


def __eq__(self, other):
"""Checks if two Node objects are equal based on their attributes."""
Expand Down
50 changes: 36 additions & 14 deletions composer/model/stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,14 @@ def flatten_nodes(self, list):
Returns:
list: The flattened list of nodes.
"""
for n in self.node:
list.append(n)
for s in self.stack:
s.flatten_nodes(list)
return list
try:
for n in self.node:
list.append(n)
for s in self.stack:
s.flatten_nodes(list)
return list
except Exception as e:
print(f'Exception occured in flatten_nodes: {e}')

def flatten_composable(self, list):
"""Flatten the nested structure of composable nodes in the stack.
Expand All @@ -151,11 +154,14 @@ def flatten_composable(self, list):
list: The flattened list of composable nodes.
"""

for c in self.composable:
list.append(c)
for s in self.stack:
s.flatten_composable(list)
return list
try:
for c in self.composable:
list.append(c)
for s in self.stack:
s.flatten_composable(list)
return list
except Exception as e:
print(f'Exception occured in flatten_composable: {e}')

def calculate_ros_params_differences(self, current, other):
"""Calculate differences in ROS parameters between nodes of the current stack and another stack.
Expand Down Expand Up @@ -444,7 +450,7 @@ def should_node_run(self, node_name, node_namespace):
'/' + active[0] for active in self.get_active_nodes()]
return f'/{node_namespace}/{node_name}' not in active_nodes

def handle_composable_nodes(self, composable_nodes, launch_description):
def handle_composable_nodes(self, composable_nodes, launch_description, launcher):
"""Handle composable nodes during stack launching.
Args:
Expand All @@ -466,7 +472,7 @@ def handle_composable_nodes(self, composable_nodes, launch_description):
)
launch_description.add_action(container)

def handle_regular_nodes(self, nodes, launch_description):
def handle_regular_nodes(self, nodes, launch_description, launcher):
"""Handle regular nodes during stack launching.
Args:
Expand All @@ -485,6 +491,19 @@ def handle_regular_nodes(self, nodes, launch_description):
arguments=n.args.split(),
remappings=self.process_remaps(n.remap)
))


def handle_managed_nodes(self, nodes, verb):
"""Handle regular nodes during stack launching.
Args:
nodes (list): List of lifecycle nodes.
launch_description (object): The launch description object.
"""
for n in nodes:
if n.lifecycle:
verbs = n.lifecycle.get(verb, [])
n.change_state(verbs=verbs)

def launch(self, launcher):
"""Launch the stack.
Expand All @@ -495,13 +514,16 @@ def launch(self, launcher):
launch_description = LaunchDescription()

try:
self.handle_composable_nodes(self.composable, launch_description)
self.handle_regular_nodes(self.node, launch_description)
self.handle_composable_nodes(self.composable, launch_description, launcher)
self.handle_regular_nodes(self.node, launch_description, launcher)

except Exception as e:
print(f'Stack launching ended with exception: {e}')

launcher.start(launch_description)
all_nodes = self.node + [cn for c in self.composable for cn in c.nodes]
self.handle_managed_nodes(all_nodes, verb='start')


def apply(self, launcher):
"""Apply the stack.
Expand Down

0 comments on commit 55a5489

Please sign in to comment.