diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp index 0777dcd788..f002183632 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "rcpputils/thread_safety_annotations.hpp" @@ -28,6 +29,7 @@ #include "rosbag2_cpp/cache/cache_buffer_interface.hpp" #include "rosbag2_cpp/visibility_control.hpp" +#include "rosbag2_storage/rosbag2_storage/bag_metadata.hpp" #include "rosbag2_storage/serialized_bag_message.hpp" // This is necessary because of using stl types here. It is completely safe, because @@ -52,7 +54,9 @@ class ROSBAG2_CPP_PUBLIC CircularMessageCache : public MessageCacheInterface { public: - explicit CircularMessageCache(size_t max_buffer_size); + explicit CircularMessageCache( + size_t max_buffer_size, const std::unordered_map & topics_names_to_info); ~CircularMessageCache() override; diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_circular_buffer.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_circular_buffer.hpp index bd6a022f2e..6fcbe6feba 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_circular_buffer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_circular_buffer.hpp @@ -18,9 +18,12 @@ #include #include #include +#include +#include #include "rosbag2_cpp/visibility_control.hpp" #include "rosbag2_cpp/cache/cache_buffer_interface.hpp" +#include "rosbag2_storage/rosbag2_storage/bag_metadata.hpp" #include "rosbag2_storage/serialized_bag_message.hpp" // This is necessary because of using stl types here. It is completely safe, because @@ -51,7 +54,9 @@ class ROSBAG2_CPP_PUBLIC MessageCacheCircularBuffer public: // Delete default constructor since max_cache_size is required MessageCacheCircularBuffer() = delete; - explicit MessageCacheCircularBuffer(size_t max_cache_size); + explicit MessageCacheCircularBuffer( + size_t max_cache_size, const std::unordered_map & topics_names_to_info); /** * If buffer size has some space left, we push the message regardless of its size, @@ -73,6 +78,7 @@ class ROSBAG2_CPP_PUBLIC MessageCacheCircularBuffer std::vector msg_vector_; size_t buffer_bytes_size_ {0u}; const size_t max_bytes_size_; + const std::unordered_map & topics_names_to_info_; }; } // namespace cache diff --git a/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp b/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp index d26d04ea61..bb10bc809c 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp @@ -27,10 +27,16 @@ namespace rosbag2_cpp namespace cache { -CircularMessageCache::CircularMessageCache(size_t max_buffer_size) +CircularMessageCache::CircularMessageCache( + size_t max_buffer_size, + const std::unordered_map & topics_names_to_info) { - producer_buffer_ = std::make_shared(max_buffer_size); - consumer_buffer_ = std::make_shared(max_buffer_size); + producer_buffer_ = std::make_shared( + max_buffer_size, + topics_names_to_info); + consumer_buffer_ = std::make_shared( + max_buffer_size, + topics_names_to_info); } CircularMessageCache::~CircularMessageCache() diff --git a/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp b/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp index 8a1b212db6..ea0430e103 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -25,8 +26,11 @@ namespace rosbag2_cpp namespace cache { -MessageCacheCircularBuffer::MessageCacheCircularBuffer(size_t max_cache_size) -: max_bytes_size_(max_cache_size) +MessageCacheCircularBuffer::MessageCacheCircularBuffer( + size_t max_cache_size, + const std::unordered_map & topics_names_to_info) +: max_bytes_size_(max_cache_size), topics_names_to_info_(topics_names_to_info) { } @@ -38,10 +42,37 @@ bool MessageCacheCircularBuffer::push(CacheBufferInterface::buffer_element_t msg return false; } - // Remove any old items until there is room for new message + // Remove any old items that is no transient local until there is room for new message while (buffer_bytes_size_ > (max_bytes_size_ - msg->serialized_data->buffer_length)) { - buffer_bytes_size_ -= buffer_.front()->serialized_data->buffer_length; - buffer_.pop_front(); + auto is_not_transient_local = [this](buffer_element_t buffer_element) + { + auto it_matching_topic_name = topics_names_to_info_.find(buffer_element->topic_name); + if (it_matching_topic_name != topics_names_to_info_.end()) { + return it_matching_topic_name->second.topic_metadata.offered_qos_profiles.find( + "durability: 1") == std::string::npos; + } + return true; + }; + + // Find the first element which is non transient local + auto it_first_not_transient = std::find_if( + buffer_.begin(), + buffer_.end(), is_not_transient_local); + + size_t position_first_not_transient = std::distance(buffer_.begin(), it_first_not_transient); + + // Remove the first non transient msg if found and if older transient messages account for less + // than 10% of the total number of messages in the buffer + // else pop_front + if (it_first_not_transient != buffer_.end() && + (position_first_not_transient + 1) < buffer_.size() / 10) + { + buffer_bytes_size_ -= it_first_not_transient->get()->serialized_data->buffer_length; + buffer_.erase(it_first_not_transient); + } else { + buffer_.pop_front(); + buffer_bytes_size_ -= buffer_.front()->serialized_data->buffer_length; + } } // Add new message to end of buffer buffer_bytes_size_ += msg->serialized_data->buffer_length; diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index ea6d2fa579..6da5ebfde3 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -160,7 +160,7 @@ void SequentialWriter::open( if (use_cache_) { if (storage_options.snapshot_mode) { message_cache_ = std::make_shared( - storage_options.max_cache_size); + storage_options.max_cache_size, topics_names_to_info_); } else { message_cache_ = std::make_shared( storage_options.max_cache_size); diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_circular_message_cache.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_circular_message_cache.cpp index 802799bfe4..aa2d61c576 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_circular_message_cache.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_circular_message_cache.cpp @@ -72,8 +72,9 @@ class CircularMessageCacheTest : public Test TEST_F(CircularMessageCacheTest, circular_message_cache_overwrites_old) { const unsigned message_count = 100; + std::unordered_map topics_names_to_info; auto circular_message_cache = std::make_shared( - cache_size_); + cache_size_, topics_names_to_info); for (unsigned i = 0; i < message_count; ++i) { auto msg = make_test_msg(); @@ -109,8 +110,9 @@ TEST_F(CircularMessageCacheTest, circular_message_cache_overwrites_old) { TEST_F(CircularMessageCacheTest, circular_message_cache_ensure_empty) { const unsigned message_count = 100; + std::unordered_map topics_names_to_info; auto circular_message_cache = std::make_shared( - cache_size_); + cache_size_, topics_names_to_info); for (unsigned i = 0; i < message_count; ++i) { auto msg = make_test_msg();