Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ros2 node get_type_description initial implementation #846

Open
wants to merge 2 commits into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ros2node/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

<exec_depend>rclpy</exec_depend>
<exec_depend>ros2cli</exec_depend>
<exec_depend>type_description_interfaces</exec_depend>

<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
Expand Down
156 changes: 156 additions & 0 deletions ros2node/ros2node/verb/get_type_description.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
# Copyright 2023 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

import rclpy
from ros2cli.node import NODE_NAME_PREFIX
from ros2cli.node.strategy import add_arguments
from ros2node.api import (
NodeNameCompleter,
parse_node_name,
)
from ros2node.verb import VerbExtension
from type_description_interfaces.msg import FieldType
from type_description_interfaces.srv import GetTypeDescription


def print_field_type(ft):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function name print_field_type is a bit misleading since it doesn't print anything.
IMO it would be more appropriate to rename it to the get_field_type_str or something similar.

type_name = None
type_str = ''
for x, y in FieldType.__dict__.items():
if y == ft.type_id:
type_name = x
break
if not type_name:
raise RuntimeError(f'Could not match FieldType type_id {ft.type_id} to a name')

type_str = type_name
if 'NESTED_TYPE' in type_name:
type_str += f' ({ft.nested_type_name})'
if ft.string_capacity:
type_str += f' - string_capacity {ft.string_capacity}'
if ft.capacity:
type_str += f' - capacity {ft.capacity}'
Comment on lines +42 to +45

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if ft.string_capacity:
type_str += f' - string_capacity {ft.string_capacity}'
if ft.capacity:
type_str += f' - capacity {ft.capacity}'
if ft.string_capacity:
type_str += f' string_capacity({ft.string_capacity})'
if ft.capacity:
type_str += f' capacity({ft.capacity})'

IMO formatting with - capacity 255 could be a bit confusing especially if afterward will be printed out the dafult value. It is gonna look like FIELD_TYPE_INT8_ARRAY - capacity 25 = 0

return type_str


def print_individual_type_description(itd, indent=0):
def p(line):
print(' ' * indent + line)

print()
p(f'{itd.type_name}')
p('Fields:')
indent += 2
for field in itd.fields:
field_string = f'{field.name}: {print_field_type(field.type)}'
if field.default_value:
field_string += f' = {field.default_value}'
Comment on lines +59 to +60

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if field.default_value:
field_string += f' = {field.default_value}'
if field.default_value:
field_string += f' default({field.default_value})'

p(field_string)
indent -= 2


def print_type_description(td):
print_individual_type_description(td.type_description)
print()
print('Referenced Type Descriptions:')
for rtd in td.referenced_type_descriptions:
print_individual_type_description(rtd, indent=2)
Comment on lines +67 to +70

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Printing 'Referenced Type Descriptions:' make sense only if td.referenced_type_descriptions is not empty



def discover_hash_for_type(node, remote_node_name, type_name):
# TODO(emersonknapp) get hashes from names_and_types when that is implemented

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a reference to the tracking issue ros2/rmw#356 in the TODO comment.

ns = type_name.split('/')[1]
if ns != 'msg':
raise RuntimeError(
f'Currently cannot auto-discover hashes for "{ns}", only "msg". '
'Please provide type_hash value to command.')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of curiosity, how user can get type_hash for srv or action? that would be helpful to add here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately it is not exposed to user anywhere except in the source code ☹️ so you would already have the hash.

ros2/rmw#356 might help a little, but it's not the whole story. I think we need to change the signatures of Node::get_[service|topic]_names_and_types, or provide a new alternative API something like

struct InterfaceType {
std::string name;
rosidl_type_hash_t hash;
};

std::vector<std::string, std::vector<InterfaceType>> get_topic_names_and_types;

remote_node = parse_node_name(remote_node_name)
nt = node.get_publisher_names_and_types_by_node(remote_node.name, remote_node.namespace)
nt += node.get_subscriber_names_and_types_by_node(remote_node.name, remote_node.namespace)
discover_topic = None
for topic, types in nt:
if type_name in types:
discover_topic = topic
if not discover_topic:
raise RuntimeError(
f'Could not find type "{type_name}" advertised as a topic type on node '
f'"{remote_node_name}"')

endpoint_infos = node.get_publishers_info_by_topic(discover_topic)
endpoint_infos += node.get_subscriptions_info_by_topic(discover_topic)
for endpoint_info in endpoint_infos:
if (
endpoint_info.node_name == remote_node.name and
endpoint_info.node_namespace == remote_node.namespace and
endpoint_info.topic_type == type_name
):
return str(endpoint_info.topic_type_hash)
raise RuntimeError(
f'Cound not find endpoint type hash for topic "{discover_topic}"')


class GetTypeDescriptionVerb(VerbExtension):
"""Output information about a node."""

def add_arguments(self, parser, cli_name):
add_arguments(parser)
argument = parser.add_argument(
'node_name',
help='Node name to request information from')
argument.completer = NodeNameCompleter()
parser.add_argument(
'type_name',
help='Interface type to get description of')
parser.add_argument(
'type_hash',
default=None,
nargs='?',
help='Hash string of the type. If not provided, will try to determine automatically.')
parser.add_argument(
'--include-sources', action='store_true',
help='Fetch and display the raw source text as well')

def main(self, *, args):
service_name = f'{args.node_name}/get_type_description'

rclpy.init()
node = rclpy.create_node(f'{NODE_NAME_PREFIX}_td_requester_{os.getpid()}')
cli = node.create_client(GetTypeDescription, '/talker/get_type_description')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
cli = node.create_client(GetTypeDescription, '/talker/get_type_description')
cli = node.create_client(GetTypeDescription, service_name)

if not cli.service_is_ready():
print('Waiting for service to become available...')
if not cli.wait_for_service(timeout_sec=5.0):
raise RuntimeError(f'Service {service_name} not found')
Comment on lines +134 to +135
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about waiting until service is ready, unless user sends the signal to stop? i would have timeout option instead of static timeout that user cannot even see how long it waits.

Suggested change
if not cli.wait_for_service(timeout_sec=5.0):
raise RuntimeError(f'Service {service_name} not found')
cli.wait_for_service()


if args.type_hash:
type_hash = args.type_hash
else:
type_hash = discover_hash_for_type(node, args.node_name, args.type_name)
print(f'Requesting type hash {type_hash}')

request = GetTypeDescription.Request()
request.type_name = args.type_name
request.type_hash = type_hash
request.include_type_sources = args.include_sources
print('Sending request...')
future = cli.call_async(request)
rclpy.spin_until_future_complete(node, future)
response = future.result()
if not response.successful:
raise RuntimeError(
f'Failed to get type description: {response.failure_reason}')

print('Response successful:')
print_type_description(response.type_description)
1 change: 1 addition & 0 deletions ros2node/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
'ros2node.verb': [
'info = ros2node.verb.info:InfoVerb',
'list = ros2node.verb.list:ListVerb',
'get-type-description = ros2node.verb.get_type_description:GetTypeDescriptionVerb',
],
}
)