From bb8d2a5fa4da087307761eeb9f5536a00382792a Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Fri, 16 Jun 2023 18:37:43 -0700 Subject: [PATCH] Fix for rosbag2_transport::Recorder failures due to the unhandled exceptions (#1382) * Fix for rosbag2_transport::Recorder failures due to unhandled exceptions Signed-off-by: Michael Orlov * Revert "Fix for rosbag2_transport::Recorder failures due to unhandled exceptions" This reverts commit 767d440d637f5170bf490142bf55e6f84bfeee6b. Signed-off-by: Michael Orlov * Handle exceptions in event publisher and discovery threads - Exceptions potentially may come from the underlying node operations when we are invalidating context via rclcpp::shutdown(). We need to have possibility to correct finish recording in this case. Signed-off-by: Michael Orlov --------- Signed-off-by: Michael Orlov --- .../src/rosbag2_transport/recorder.cpp | 48 +++++++++++++------ 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index 5b50f97274..69c280e136 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -292,11 +292,12 @@ void RecorderImpl::record() response->paused = is_paused(); }); + split_event_pub_ = + node->create_publisher("events/write_split", 1); + // Start the thread that will publish events event_publisher_thread_ = std::thread(&RecorderImpl::event_publisher_thread_main, this); - split_event_pub_ = - node->create_publisher("events/write_split", 1); rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; callbacks.write_split_callback = [this](rosbag2_cpp::bag_events::BagSplitInfo & info) { @@ -336,7 +337,17 @@ void RecorderImpl::event_publisher_thread_main() auto message = rosbag2_interfaces::msg::WriteSplitEvent(); message.closed_file = bag_split_info_.closed_file; message.opened_file = bag_split_info_.opened_file; - split_event_pub_->publish(message); + try { + split_event_pub_->publish(message); + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + node->get_logger(), + "Failed to publish message on '/events/write_split' topic. \nError: " << e.what()); + } catch (...) { + RCLCPP_ERROR_STREAM( + node->get_logger(), + "Failed to publish message on '/events/write_split' topic."); + } } } RCLCPP_INFO(node->get_logger(), "Event publisher thread: Exiting"); @@ -395,19 +406,26 @@ void RecorderImpl::topics_discovery() } } while (rclcpp::ok() && stop_discovery_ == false) { - auto topics_to_subscribe = - get_requested_or_available_topics(); - for (const auto & topic_and_type : topics_to_subscribe) { - warn_if_new_qos_for_subscribed_topic(topic_and_type.first); - } - auto missing_topics = get_missing_topics(topics_to_subscribe); - subscribe_topics(missing_topics); + try { + auto topics_to_subscribe = get_requested_or_available_topics(); + for (const auto & topic_and_type : topics_to_subscribe) { + warn_if_new_qos_for_subscribed_topic(topic_and_type.first); + } + auto missing_topics = get_missing_topics(topics_to_subscribe); + subscribe_topics(missing_topics); - if (!record_options_.topics.empty() && subscriptions_.size() == record_options_.topics.size()) { - RCLCPP_INFO( - node->get_logger(), - "All requested topics are subscribed. Stopping discovery..."); - return; + if (!record_options_.topics.empty() && + subscriptions_.size() == record_options_.topics.size()) + { + RCLCPP_INFO( + node->get_logger(), + "All requested topics are subscribed. Stopping discovery..."); + return; + } + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM(node->get_logger(), "Failure in topics discovery.\nError: " << e.what()); + } catch (...) { + RCLCPP_ERROR_STREAM(node->get_logger(), "Failure in topics discovery."); } std::this_thread::sleep_for(record_options_.topic_polling_interval); }