Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[humble] Make snapshot writing into a new file each time it is triggered (backport #1842) #1850

Merged
merged 2 commits into from
Nov 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,81 @@ TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_file_com
}
}

TEST_F(SequentialCompressionWriterTest, snapshot_writes_to_new_file_with_file_compression)
{
tmp_dir_storage_options_.max_bagfile_size = 0;
tmp_dir_storage_options_.max_cache_size = 200;
tmp_dir_storage_options_.snapshot_mode = true;

initializeFakeFileStorage();
// Expect a single write call when the snapshot is triggered
EXPECT_CALL(
*storage_, write(
An<const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> &>())
).Times(1);

rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::FILE,
1,
1
};
initializeWriter(compression_options);

std::vector<std::string> closed_files;
std::vector<std::string> opened_files;
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) {
closed_files.emplace_back(info.closed_file);
opened_files.emplace_back(info.opened_file);
};
writer_->add_event_callbacks(callbacks);

std::string rmw_format = "rmw_format";

std::string msg_content = "Hello";
auto msg_length = msg_content.length();
auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";
message->serialized_data =
rosbag2_storage::make_serialized_message(msg_content.c_str(), msg_length);

writer_->open(tmp_dir_storage_options_, {rmw_format, rmw_format});
writer_->create_topic({"test_topic", "test_msgs/BasicTypes", "", ""});

for (size_t i = 0; i < 100; i++) {
writer_->write(message);
}
writer_->take_snapshot();
writer_->close();

EXPECT_THAT(closed_files.size(), 2);
EXPECT_THAT(opened_files.size(), 2);

if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == 2))) {
// Output debug info
for (size_t i = 0; i < opened_files.size(); i++) {
std::cout << "opened_file[" << i << "] = '" << opened_files[i] <<
"'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl;
}
}

ASSERT_EQ(opened_files.size(), 2);
ASSERT_EQ(closed_files.size(), 2);

for (size_t i = 0; i < 2; i++) {
auto expected_closed = fs::path(tmp_dir_storage_options_.uri) /
(bag_base_dir_ + "_" + std::to_string(i) + "." + DefaultTestCompressor);
auto expected_opened = (i == 1) ?
// The last opened file shall be empty string when we do "writer->close();"
fs::path("") : fs::path(tmp_dir_storage_options_.uri) /
(bag_base_dir_ + "_" + std::to_string(i + 1));
ASSERT_STREQ(closed_files[i].c_str(), expected_closed.string().c_str());
ASSERT_STREQ(opened_files[i].c_str(), expected_opened.string().c_str());
}
}

INSTANTIATE_TEST_SUITE_P(
SequentialCompressionWriterTestQueueSizes,
SequentialCompressionWriterTest,
Expand Down
1 change: 1 addition & 0 deletions rosbag2_cpp/src/rosbag2_cpp/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ void Writer::remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type

bool Writer::take_snapshot()
{
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
return writer_impl_->take_snapshot();
}

Expand Down
20 changes: 18 additions & 2 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,13 @@ void SequentialWriter::init_metadata()
metadata_.storage_identifier = storage_->get_storage_identifier();
metadata_.starting_time = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds::max());
metadata_.duration = std::chrono::nanoseconds(0);
metadata_.relative_file_paths = {strip_parent_path(storage_->get_relative_file_path())};
rosbag2_storage::FileInformation file_info{};
file_info.path = strip_parent_path(storage_->get_relative_file_path());
file_info.starting_time = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds::max());
file_info.duration = std::chrono::nanoseconds(0);
file_info.message_count = 0;
metadata_.files = {file_info};
}
Expand Down Expand Up @@ -353,7 +355,7 @@ void SequentialWriter::write(std::shared_ptr<rosbag2_storage::SerializedBagMessa
is_first_message_ = false;
}

if (should_split_bagfile(message_timestamp)) {
if (!storage_options_.snapshot_mode && should_split_bagfile(message_timestamp)) {
split_bagfile();
metadata_.files.back().starting_time = message_timestamp;
}
Expand Down Expand Up @@ -385,10 +387,13 @@ void SequentialWriter::write(std::shared_ptr<rosbag2_storage::SerializedBagMessa
bool SequentialWriter::take_snapshot()
{
if (!storage_options_.snapshot_mode) {
ROSBAG2_CPP_LOG_WARN("SequentialWriter take_snaphot called when snapshot mode is disabled");
ROSBAG2_CPP_LOG_WARN("SequentialWriter take_snapshot called when snapshot mode is disabled");
return false;
}
// Note: Information about start, duration and num messages for the current file in metadata_
// will be updated in the write_messages(..), when cache_consumer call it as a callback.
message_cache_->notify_data_ready();
split_bagfile();
return true;
}

Expand Down Expand Up @@ -454,6 +459,17 @@ void SequentialWriter::write_messages(
return;
}
storage_->write(messages);
if (storage_options_.snapshot_mode) {
// Update FileInformation about the last file in metadata in case of snapshot mode
const auto first_msg_timestamp = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds(messages.front()->time_stamp));
const auto last_msg_timestamp = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds(messages.back()->time_stamp));
metadata_.files.back().starting_time = first_msg_timestamp;
metadata_.files.back().duration = last_msg_timestamp - first_msg_timestamp;
metadata_.files.back().message_count = messages.size();
}
metadata_.message_count += messages.size();
std::lock_guard<std::mutex> lock(topics_info_mutex_);
for (const auto & msg : messages) {
if (topics_names_to_info_.find(msg->topic_name) != topics_names_to_info_.end()) {
Expand Down
195 changes: 195 additions & 0 deletions rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,201 @@ TEST_F(SequentialWriterTest, snapshot_mode_zero_cache_size_throws_exception)
EXPECT_THROW(writer_->open(storage_options_, {rmw_format, rmw_format}), std::runtime_error);
}

TEST_F(SequentialWriterTest, snapshot_writes_to_new_file_with_bag_split)
{
storage_options_.max_bagfile_size = 0;
storage_options_.max_cache_size = 200;
storage_options_.snapshot_mode = true;
const rcutils_time_point_value_t first_msg_timestamp = 100;
const size_t num_msgs_to_write = 100;
const std::string topic_name = "test_topic";
std::string msg_content = "Hello";
const size_t serialized_msg_buffer_length = msg_content.length();
const size_t num_expected_msgs = storage_options_.max_cache_size / serialized_msg_buffer_length;
const size_t expected_start_time = first_msg_timestamp + (num_msgs_to_write - num_expected_msgs);
const auto expected_last_msg_timestamp = (first_msg_timestamp + num_msgs_to_write - 1);
const size_t expected_duration = expected_last_msg_timestamp - expected_start_time;
// Prepare vector of messages
std::vector<rosbag2_storage::SerializedBagMessageSharedPtr> messages;
for (size_t i = 0; i < num_msgs_to_write; i++) {
auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->time_stamp = first_msg_timestamp + static_cast<rcutils_time_point_value_t>(i);
message->topic_name = topic_name;
message->serialized_data =
rosbag2_storage::make_serialized_message(msg_content.c_str(), msg_content.length());
messages.push_back(message);
}

// Expect a single write call when the snapshot is triggered
EXPECT_CALL(
*storage_, write(
An<const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> &>())
).Times(1);

ON_CALL(
*storage_,
write(An<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>>())).WillByDefault(
[this](std::shared_ptr<const rosbag2_storage::SerializedBagMessage>) {
fake_storage_size_ += 1;
});

ON_CALL(*storage_, get_bagfile_size).WillByDefault(
[this]() {
return fake_storage_size_.load();
});

ON_CALL(*metadata_io_, write_metadata).WillByDefault(
[this](const std::string &, const rosbag2_storage::BagMetadata & metadata) {
fake_metadata_ = metadata;
});

ON_CALL(*storage_, get_relative_file_path).WillByDefault(
[this]() {
return fake_storage_uri_;
});

auto sequential_writer = std::make_unique<rosbag2_cpp::writers::SequentialWriter>(
std::move(storage_factory_), converter_factory_, std::move(metadata_io_));
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));

std::vector<std::string> closed_files;
std::vector<std::string> opened_files;
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) {
closed_files.emplace_back(info.closed_file);
opened_files.emplace_back(info.opened_file);
};
writer_->add_event_callbacks(callbacks);

std::string rmw_format = "rmw_format";

writer_->open(storage_options_, {rmw_format, rmw_format});
writer_->create_topic({"test_topic", "test_msgs/BasicTypes", "", ""});

for (const auto & message : messages) {
writer_->write(message);
}
writer_->take_snapshot();
writer_->close();

EXPECT_THAT(closed_files.size(), 2);
EXPECT_THAT(opened_files.size(), 2);

if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == 2))) {
// Output debug info
for (size_t i = 0; i < opened_files.size(); i++) {
std::cout << "opened_file[" << i << "] = '" << opened_files[i] <<
"'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl;
}
}

ASSERT_EQ(opened_files.size(), 2);
ASSERT_EQ(closed_files.size(), 2);

for (size_t i = 0; i < 2; i++) {
auto expected_closed =
fs::path(storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i));
auto expected_opened = (i == 1) ?
// The last opened file shall be empty string when we do "writer->close();"
fs::path("") : fs::path(storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i + 1));
ASSERT_STREQ(closed_files[i].c_str(), expected_closed.string().c_str());
ASSERT_STREQ(opened_files[i].c_str(), expected_opened.string().c_str());
}
// Check metadata
EXPECT_EQ(fake_metadata_.message_count, num_expected_msgs);
EXPECT_EQ(
std::chrono::time_point_cast<std::chrono::nanoseconds>(
fake_metadata_.starting_time).time_since_epoch().count(), first_msg_timestamp);

ASSERT_FALSE(fake_metadata_.files.empty());

const auto & first_file_info = fake_metadata_.files[0];
EXPECT_STREQ(first_file_info.path.c_str(), std::string(bag_base_dir_ + "_0").c_str());
EXPECT_EQ(first_file_info.message_count, num_expected_msgs);
EXPECT_EQ(
std::chrono::time_point_cast<std::chrono::nanoseconds>(
first_file_info.starting_time).time_since_epoch().count(), expected_start_time);
EXPECT_EQ(first_file_info.duration.count(), expected_duration);
}

TEST_F(SequentialWriterTest, snapshot_can_be_called_twice)
{
storage_options_.max_bagfile_size = 0;
storage_options_.max_cache_size = 200;
storage_options_.snapshot_mode = true;
const size_t num_msgs_to_write = 100;

// Expect to call write method twice. Once per each snapshot.
EXPECT_CALL(
*storage_, write(
An<const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> &>())
).Times(2);

ON_CALL(*storage_, get_relative_file_path).WillByDefault(
[this]() {
return fake_storage_uri_;
});

auto sequential_writer = std::make_unique<rosbag2_cpp::writers::SequentialWriter>(
std::move(storage_factory_), converter_factory_, std::move(metadata_io_));
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));

std::vector<std::string> closed_files;
std::vector<std::string> opened_files;
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) {
closed_files.emplace_back(info.closed_file);
opened_files.emplace_back(info.opened_file);
};
writer_->add_event_callbacks(callbacks);

std::string rmw_format = "rmw_format";

writer_->open(storage_options_, {rmw_format, rmw_format});
writer_->create_topic({"test_topic", "test_msgs/BasicTypes", "", ""});

std::string msg_content = "Hello";
auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";
message->serialized_data =
rosbag2_storage::make_serialized_message(msg_content.c_str(), msg_content.length());

for (size_t i = 0; i < num_msgs_to_write / 2; i++) {
writer_->write(message);
}
writer_->take_snapshot();

for (size_t i = num_msgs_to_write / 2; i < num_msgs_to_write; i++) {
writer_->write(message);
}
writer_->take_snapshot();

EXPECT_THAT(closed_files.size(), 2);
EXPECT_THAT(opened_files.size(), 2);

if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == 2))) {
// Output debug info
for (size_t i = 0; i < opened_files.size(); i++) {
std::cout << "opened_file[" << i << "] = '" << opened_files[i] <<
"'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl;
}
}

ASSERT_EQ(opened_files.size(), 2);
ASSERT_EQ(closed_files.size(), 2);

for (size_t i = 0; i < opened_files.size(); i++) {
const auto expected_closed = fs::path(storage_options_.uri) /
(bag_base_dir_ + "_" + std::to_string(i));
const auto expected_opened = fs::path(storage_options_.uri) /
(bag_base_dir_ + "_" + std::to_string(i + 1));
ASSERT_STREQ(closed_files[i].c_str(), expected_closed.string().c_str());
ASSERT_STREQ(opened_files[i].c_str(), expected_opened.string().c_str());
}
}

TEST_F(SequentialWriterTest, split_event_calls_callback)
{
const uint64_t max_bagfile_size = 3;
Expand Down
Loading