Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce the advanced publisher and subscriber #368

Open
wants to merge 8 commits into
base: rolling
Choose a base branch
from

Conversation

YuanYuYuan
Copy link
Contributor

With this PR, we replace the QueryingSubscriber and PublicationCache (+ a normal publisher)
with the AdvancedSubscriber and AdvancedPublisher.

NOTE: A deadlock is found when undeclaring advanced subscribers. It will be fixed once eclipse-zenoh/zenoh#1685 is merged and synced with zenoh-c.

@clalancette clalancette marked this pull request as draft January 2, 2025 16:09
@YuanYuYuan YuanYuYuan force-pushed the feat/advanced-pub-sub branch from e0c490f to 1f9357f Compare January 7, 2025 07:57
@YuanYuYuan
Copy link
Contributor Author

Update: The deadlock fix has been merged. Mark this PR as ready for review.

@YuanYuYuan YuanYuYuan marked this pull request as ready for review January 7, 2025 07:58
@YuanYuYuan
Copy link
Contributor Author

YuanYuYuan commented Jan 8, 2025

Test Failure

ros2.repos

  • rclcpp_action
    • 1 - test_client
  • rclc_parameter
    • 1 - rclc_parameter_test
  • test_rclcpp.log
    • 74 - test_n_nodes__rmw_zenoh_cpp (Failed) <--- Flaky

No new failure is found.

@YuanYuYuan
Copy link
Contributor Author

Update: bump up zenoh-cpp version to include the memory leak fix eclipse-zenoh/zenoh-cpp#363.

@YuanYuYuan
Copy link
Contributor Author

Will rebase once #405 is merged.

@YuanYuYuan YuanYuYuan force-pushed the feat/advanced-pub-sub branch from 08b0906 to b76ee4c Compare January 11, 2025 17:05
Copy link
Member

@Yadunund Yadunund left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see feedback inline.
I have the following additional questions

  1. It looks like this PR does not fix [Test Failure] rclcpp_action / test_client #326. But from my experience, it changes the behavior from deterministically failing to failing sometimes. ie, the test is now flaky. Do you observe the same?
  2. With AdvancedSubscribers do we no longer need to call Get() when we detect an AdvancedPublisher with publication cache? This was needed to fix Subscribers sometimes not getting messages from topics using durability transient_local #263.

zenoh_cpp_vendor/CMakeLists.txt Outdated Show resolved Hide resolved
rmw_zenoh_cpp/src/detail/graph_cache.hpp Show resolved Hide resolved
rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp Outdated Show resolved Hide resolved
adv_sub_opts.history->detect_late_publishers = true;
adv_sub_opts.history->max_samples = entity_->topic_info()->qos_.depth;
adv_sub_opts.recovery = zenoh::ext::SessionExt::AdvancedSubscriberOptions::RecoveryOptions{};
adv_sub_opts.recovery->periodic_queries_period_ms = 1000;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again looking at the documentation, it sounds like recovery options should only be enabled if reliability is RELIABLE. Could you clarify on the behavior?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, answered in #368 (comment).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add some comments on why a value of 1000 is appropriate for periodic_queries_period_ms?

@YuanYuYuan YuanYuYuan force-pushed the feat/advanced-pub-sub branch from b76ee4c to 9cc776b Compare January 13, 2025 08:28
@YuanYuYuan
Copy link
Contributor Author

Please see feedback inline. I have the following additional questions

1. It looks like this PR does not fix [[Test Failure] rclcpp_action / test_client #326](https://github.com/ros2/rmw_zenoh/issues/326). But from my experience, it changes the behavior from deterministically failing to failing sometimes. ie, the test is now flaky. Do you observe the same?

2. With `AdvancedSubscribers` do we no longer need to call `Get()`  when we detect an `AdvancedPublisher` with publication cache? This was needed to fix [Subscribers sometimes not getting messages from topics using durability transient_local #263](https://github.com/ros2/rmw_zenoh/issues/263).

Here’s an improved version of your paragraph with clearer phrasing and a more professional tone:

  1. This PR does not resolve the issue reported in [Test Failure] rclcpp_action / test_client #326. Based on the investigation, the root cause lies in the inappropriate synchronization assumptions in the test. We have also observed similar failures (slightly more often) when using the previous QueryingSubscriber implementation. To address this issue, we can either make the local publisher-to-subscriber action a blocking call or modify the test itself.

  2. We expect the issue will be resolved. So no longer need the hack 😄

@YuanYuYuan
Copy link
Contributor Author

YuanYuYuan commented Jan 13, 2025

Redid the test with the repos. No new failures were found.

  • rclcpp_action/test_client
  • rclc_parameter/rclc_parameter_test

Comment on lines -185 to -197
sub_options.query_keyexpr = std::move(selector_ke);
// Tell the PublicationCache's Queryable that the query accepts any key expression as a reply.
// By default a query accepts only replies that matches its query selector.
// This allows us to selectively query certain PublicationCaches when defining the
// set_querying_subscriber_callback below.
sub_options.query_accept_replies = ZC_REPLY_KEYEXPR_ANY;
// As this initial query is now using a "*", the query target is not COMPLETE.
sub_options.query_target = Z_QUERY_TARGET_ALL;
// We set consolidation to none as we need to receive transient local messages
// from a number of publishers. Eg: To receive TF data published over /tf_static
// by various publishers.
sub_options.query_consolidation =
zenoh::QueryConsolidation(zenoh::ConsolidationMode::Z_CONSOLIDATION_MODE_NONE);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And it does not look like there is a way to set these parameters anymore. Can you confirm that the behavior with the AdvancedSubscriber will match the expectations of the old implementation?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants