From c54ac4b13bfe84359d0ff9749011558300a33ae7 Mon Sep 17 00:00:00 2001 From: Dmytro Korenkov Date: Fri, 15 Mar 2024 16:15:08 +0100 Subject: [PATCH] new storage for snapshot --- rosbag2_storage_mcap/plugin_description.xml | 7 + rosbag2_storage_mcap/src/mcap_storage.cpp | 233 ++++++++++++++++++++ 2 files changed, 240 insertions(+) diff --git a/rosbag2_storage_mcap/plugin_description.xml b/rosbag2_storage_mcap/plugin_description.xml index dd7d45ec0d..90228ea5ea 100644 --- a/rosbag2_storage_mcap/plugin_description.xml +++ b/rosbag2_storage_mcap/plugin_description.xml @@ -6,4 +6,11 @@ > rosbag2 storage plugin using the MCAP file format + + rosbag2 snapshot storage plugin using the MCAP file format + diff --git a/rosbag2_storage_mcap/src/mcap_storage.cpp b/rosbag2_storage_mcap/src/mcap_storage.cpp index 8408220909..7397365130 100644 --- a/rosbag2_storage_mcap/src/mcap_storage.cpp +++ b/rosbag2_storage_mcap/src/mcap_storage.cpp @@ -841,12 +841,245 @@ void MCAPStorage::ensure_rosdistro_metadata_added() has_added_ros_distro_metadata_ = true; } +/** + * A storage implementation for the MCAP file format of snapshots. + */ +class ROSBAG2_STORAGE_MCAP_PUBLIC MCAPStorageSnapshot +: public rosbag2_storage::storage_interfaces::ReadWriteInterface +{ +public: + MCAPStorageSnapshot(); + ~MCAPStorageSnapshot() override; + /** BaseIOInterface **/ + #ifdef ROSBAG2_STORAGE_MCAP_HAS_STORAGE_OPTIONS + void open(const rosbag2_storage::StorageOptions & storage_options, + rosbag2_storage::storage_interfaces::IOFlag io_flag = + rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE) override; + void open(const std::string & uri, rosbag2_storage::storage_interfaces::IOFlag io_flag = + rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE); + #else + void open(const std::string & uri, + rosbag2_storage::storage_interfaces::IOFlag io_flag = + rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE) override; + #endif + + + /** BaseInfoInterface **/ + rosbag2_storage::BagMetadata get_metadata() override; + std::string get_relative_file_path() const override; + uint64_t get_bagfile_size() const override; + std::string get_storage_identifier() const override; + + /** BaseReadInterface **/ + #ifdef ROSBAG2_STORAGE_MCAP_HAS_SET_READ_ORDER + bool set_read_order(const rosbag2_storage::ReadOrder &) override; + #endif + bool has_next() override; + std::shared_ptr read_next() override; + std::vector get_all_topics_and_types() override; + void get_all_message_definitions( + std::vector & definitions) override; + + /** ReadOnlyInterface **/ + void set_filter(const rosbag2_storage::StorageFilter & storage_filter) override; + void reset_filter() override; + #ifdef ROSBAG2_STORAGE_MCAP_OVERRIDE_SEEK_METHOD + void seek(const rcutils_time_point_value_t & time_stamp) override; + #else + void seek(const rcutils_time_point_value_t & time_stamp); + #endif + + /** ReadWriteInterface **/ + uint64_t get_minimum_split_file_size() const override; + + /** BaseWriteInterface **/ + void write(std::shared_ptr msg) override; + void write( + const std::vector> & msg) override; + void create_topic(const rosbag2_storage::TopicMetadata & topic, + const rosbag2_storage::MessageDefinition & message_definition) override; + void remove_topic(const rosbag2_storage::TopicMetadata & topic) override; + #ifdef ROSBAG2_STORAGE_MCAP_HAS_UPDATE_METADATA + void update_metadata(const rosbag2_storage::BagMetadata &) override; + #endif + +private: + std::shared_ptr storage_; + std::string uri_; + rosbag2_storage::storage_interfaces::IOFlag io_flag_ = rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE; + rosbag2_storage::BagMetadata metadata_; + std::vector> topics_; + rosbag2_storage::StorageOptions storage_options_; + uint64_t current_storage_size_; +}; + +MCAPStorageSnapshot::MCAPStorageSnapshot():storage_(nullptr),current_storage_size_(0) +{ +} + +MCAPStorageSnapshot::~MCAPStorageSnapshot() +{ +} + +/** BaseIOInterface **/ +#ifdef ROSBAG2_STORAGE_MCAP_HAS_STORAGE_OPTIONS +void MCAPStorageSnapshot::open(const rosbag2_storage::StorageOptions & storage_options, + rosbag2_storage::storage_interfaces::IOFlag io_flag) +{ + (void)io_flag; + storage_options_ = storage_options; + uri_ = storage_options.uri; +} +#endif + +void MCAPStorageSnapshot::open(const std::string & uri, rosbag2_storage::storage_interfaces::IOFlag io_flag) +{ + uri_ = uri; + //this class is used only for READ_WRITE + (void)io_flag; +} + +/** BaseInfoInterface **/ +rosbag2_storage::BagMetadata MCAPStorageSnapshot::get_metadata() +{ + return metadata_; +} + +std::string MCAPStorageSnapshot::get_relative_file_path() const +{ + return uri_; +} + +uint64_t MCAPStorageSnapshot::get_bagfile_size() const +{ + return current_storage_size_; +} + +std::string MCAPStorageSnapshot::get_storage_identifier() const +{ + return "mcap_snapshot"; +} + +/** BaseReadInterface **/ +#ifdef ROSBAG2_STORAGE_MCAP_HAS_SET_READ_ORDER +bool MCAPStorageSnapshot::set_read_order(const rosbag2_storage::ReadOrder & read_order) +{ + (void)read_order; + return false; +} +#endif + +bool MCAPStorageSnapshot::has_next() +{ + return false; +} + +std::shared_ptr MCAPStorageSnapshot::read_next() +{ + return nullptr; +} + +std::vector MCAPStorageSnapshot::get_all_topics_and_types() +{ + return std::vector(); +} + +void MCAPStorageSnapshot::get_all_message_definitions( + std::vector & definitions) +{ + (void)definitions; +} + +/** ReadOnlyInterface **/ +void MCAPStorageSnapshot::set_filter(const rosbag2_storage::StorageFilter & storage_filter) +{ + (void)storage_filter; +} + +void MCAPStorageSnapshot::reset_filter() +{ +} + +#ifdef ROSBAG2_STORAGE_MCAP_OVERRIDE_SEEK_METHOD +void MCAPStorageSnapshot::seek(const rcutils_time_point_value_t & time_stamp) +{ + (void)time_stamp; +} + +#else +void MCAPStorageSnapshot::seek(const rcutils_time_point_value_t & time_stamp) +{ + (void)time_stamp; +} +#endif + +/** ReadWriteInterface **/ +uint64_t MCAPStorageSnapshot::get_minimum_split_file_size() const +{ + return 1024; +} + +/** BaseWriteInterface **/ +void MCAPStorageSnapshot::write(std::shared_ptr msg) +{ + (void)msg; +} + +void MCAPStorageSnapshot::write(const std::vector> & msgs) +{ + storage_ = std::make_shared(); + if (!storage_) { + RCUTILS_LOG_WARN_NAMED(LOG_NAME, "Was not able to allocate memory for MCAPStorage"); + return; + } + storage_->open(storage_options_, io_flag_); + storage_->update_metadata(metadata_); + for(auto cur_pair : topics_) + { + storage_->create_topic(cur_pair.first, cur_pair.second); + } + for(auto cur_msg : msgs) + { + storage_->write(cur_msg); + } + storage_.reset(); + current_storage_size_ = storage_options_.max_bagfile_size + 1; +} + +void MCAPStorageSnapshot::create_topic(const rosbag2_storage::TopicMetadata & topic, + const rosbag2_storage::MessageDefinition & message_definition) +{ + topics_.push_back(std::make_pair(topic,message_definition)); +} + +void MCAPStorageSnapshot::remove_topic(const rosbag2_storage::TopicMetadata & topic) +{ + auto iter = std::remove_if( + topics_.begin(), + topics_.end(), + [topic](std::pair const& data) { + if(data.first == topic) return true; + else return false; + }); + topics_.erase(iter, topics_.end()); +} + +#ifdef ROSBAG2_STORAGE_MCAP_HAS_UPDATE_METADATA +void MCAPStorageSnapshot::update_metadata(const rosbag2_storage::BagMetadata & metadata) +{ + metadata_ = metadata; +} +#endif + } // namespace rosbag2_storage_plugins #include "pluginlib/class_list_macros.hpp" // NOLINT PLUGINLIB_EXPORT_CLASS(rosbag2_storage_plugins::MCAPStorage, rosbag2_storage::storage_interfaces::ReadWriteInterface) +PLUGINLIB_EXPORT_CLASS(rosbag2_storage_plugins::MCAPStorageSnapshot, + rosbag2_storage::storage_interfaces::ReadWriteInterface) + #ifdef _WIN32 #pragma warning(pop) #endif