Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add options to conditionally stop the recording #1896

Draft
wants to merge 1 commit into
base: rolling
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,18 @@ Messages written to the bag will use the latest received value of `/clock` for t
Note: Until the first `/clock` message is received, the recorder will not write any messages.
Before that message is received, the time is 0, which leads to a significant time jump once simulation time begins, making the bag essentially unplayable if messages are written first with time 0 and then time N from `/clock`.

#### Recording with termination conditions

rosbag2 can be configured to stop recording after a certain size, duration or number of messages is reached (considering **all** the bag files of the current recording). By default the recording will continue until an external signal is stopping it, but this can be changed using the CLI options.

_Stopping by size_: `ros2 bag record -a --max-recording-size 100000` will stop the recording after it becomes greater than 100 kilobytes. This option defaults to `0`, which means the recording will not stop based on size.

_Stopping by duration_: `ros2 bag record -a --max-recording-duration 9000` will stop the recording after `9000` seconds. This option defaults to `0`, which means the recording will not stop based on duration.

_Stopping by number of messages_: `ros2 bag record -a --max-recording-messages 1000` will stop the recording after `1000` messages have been recorded. This option defaults to `0`, which means the recording will not stop based on number of messages.

If multiple stopping conditions are enabled, the recording will stop at whichever threshold is reached first.

#### Splitting files during recording

rosbag2 offers the capability to split bag files when they reach a maximum size or after a specified duration. By default rosbag2 will record all data into a single bag file, but this can be changed using the CLI options.
Expand Down Expand Up @@ -253,6 +265,9 @@ output_bags:
storage_id: "" # will use the default storage plugin, if unspecified
max_bagfile_size: 0
max_bagfile_duration: 0
max_recording_size: 0
max_recording_duration: 0
max_recording_messages: 0
storage_preset_profile: ""
storage_config_uri: ""
all_topics: false
Expand Down
17 changes: 16 additions & 1 deletion ros2bag/ros2bag/verb/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,18 @@ def add_recorder_arguments(parser: ArgumentParser) -> None:
'Default: %(default)d, recording written in single bagfile and splitting '
'is disabled. If both splitting by size and duration are enabled, '
'the bag will split at whichever threshold is reached first.')
parser.add_argument(
'--max-recording-size', type=int, default=0,
help='Maximum size in bytes before the recording will be stopped. '
'Default: %(default)d, recording will not stop based on size.')
parser.add_argument(
'--max-recording-duration', type=int, default=0,
help='Maximum duration in seconds before the recording will be stopped. '
'Default: %(default)d, recording will not stop based on duration.')
parser.add_argument(
'--max-recording-messages', type=int, default=0,
help='Maximum number of messages before the recording will be stopped. '
'Default: %(default)d, recording will not stop based on number of messages.')
parser.add_argument(
'--max-cache-size', type=int, default=100 * 1024 * 1024,
help='Maximum size (in bytes) of messages to hold in each buffer of cache. '
Expand Down Expand Up @@ -325,7 +337,10 @@ def main(self, *, args): # noqa: D102
storage_preset_profile=args.storage_preset_profile,
storage_config_uri=storage_config_file,
snapshot_mode=args.snapshot_mode,
custom_data=custom_data
custom_data=custom_data,
max_recording_size=args.max_recording_size,
max_recording_duration=args.max_recording_duration,
max_recording_messages=args.max_recording_messages,
)
record_options = RecordOptions()
record_options.all_topics = args.all_topics or args.all
Expand Down
16 changes: 16 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/bag_events.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ enum class BagEvent
WRITE_SPLIT,
/// Reading of the input bag file has gone over a split, opening the next file.
READ_SPLIT,
/// The recording has reached a predetermined stopping condition.
STOP_RECORDING
};

/**
Expand All @@ -51,7 +53,19 @@ struct BagSplitInfo
std::string opened_file;
};

/**
* \brief The information structure passed to callbacks for the STOP_RECORDING event.
*/
struct StopRecordingInfo
{
/// The base folder where the recording was taking place.
std::string base_folder;
/// The reason for stopping the recording.
std::string reason;
};

using BagSplitCallbackType = std::function<void (BagSplitInfo &)>;
using BagStopRecordingCallbackType = std::function<void (StopRecordingInfo &)>;

/**
* \brief Use this structure to register callbacks with Writers.
Expand All @@ -60,6 +74,8 @@ struct WriterEventCallbacks
{
/// The callback to call for the WRITE_SPLIT event.
BagSplitCallbackType write_split_callback;
/// The callback to call for the STOP_RECORDING event.
BagStopRecordingCallbackType stop_recording_callback;
};

/**
Expand Down
32 changes: 32 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/stop_recording_options.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2025 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_CPP__STOP_RECORDING_OPTIONS_HPP_
#define ROSBAG2_CPP__STOP_RECORDING_OPTIONS_HPP_

#include <cstdint>

namespace rosbag2_cpp
{

struct StopRecordingOptions
{
uint64_t max_duration;
uint64_t max_size;
uint64_t max_messages;
};

} // namespace rosbag2_cpp

#endif // ROSBAG2_CPP__STOP_RECORDING_OPTIONS_HPP_
1 change: 1 addition & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "rosbag2_cpp/bag_events.hpp"
#include "rosbag2_cpp/converter_options.hpp"
#include "rosbag2_cpp/stop_recording_options.hpp"
#include "rosbag2_cpp/visibility_control.hpp"
#include "rosbag2_cpp/writers/sequential_writer.hpp"

Expand Down
23 changes: 23 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
void execute_bag_split_callbacks(
const std::string & closed_file, const std::string & opened_file);

void signal_stop_recording(const std::string & out_stop_reason);

void execute_signal_stop_recording_callbacks(const std::string & out_stop_reason);

void switch_to_next_storage();

std::string format_storage_uri(
Expand All @@ -183,6 +187,16 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
bool should_split_bagfile(
const std::chrono::time_point<std::chrono::high_resolution_clock> & current_time) const;

// Checks if the recording needs to be stopped due to predefined conditions
// before writing the current message.
bool should_stop_recording_pre_write(
const std::chrono::time_point<std::chrono::high_resolution_clock> & current_time,
std::string & stop_reason) const;

// Checks if the recording needs to be stopped due to predefined conditions
// after writing the current message.
bool should_stop_recording_post_write(std::string & stop_reason) const;

// Checks if the message to be written is within accepted time range
bool message_within_accepted_time_range(
const rcutils_time_point_value_t current_time) const;
Expand All @@ -200,6 +214,15 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
get_writeable_message(
std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message);

// Used to track the approximate size of the previously splitted bags.
uint64_t previous_bags_size_ {0};

// Used to track the number of messages in the previously splitted bags.
uint64_t previous_bags_num_messages_ {0};

// Used to track if the stop recording signal has been already sent.
bool signaled_stop_recording_ {false};

private:
/// Helper method to write messages while also updating tracked metadata.
void write_messages(
Expand Down
87 changes: 87 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,9 @@ void SequentialWriter::switch_to_next_storage()
message_cache_->log_dropped();
}

previous_bags_size_ += storage_->get_bagfile_size();
previous_bags_num_messages_ += metadata_.files.back().message_count;

storage_->update_metadata(metadata_);
storage_options_.uri = format_storage_uri(
base_folder_,
Expand Down Expand Up @@ -385,8 +388,30 @@ void SequentialWriter::split_bagfile()
(void)split_bagfile_local();
}

void SequentialWriter::signal_stop_recording(const std::string & reason)
{
if (signaled_stop_recording_) {
return;
}
execute_signal_stop_recording_callbacks(reason);
signaled_stop_recording_ = true;
}

void SequentialWriter::execute_signal_stop_recording_callbacks(
const std::string & reason)
{
auto info = std::make_shared<bag_events::StopRecordingInfo>();
info->base_folder = base_folder_;
info->reason = reason;
callback_manager_.execute_callbacks(bag_events::BagEvent::STOP_RECORDING, info);
}

void SequentialWriter::write(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message)
{
if (signaled_stop_recording_) {
return;
}

if (!is_open_) {
throw std::runtime_error("Bag is not open. Call open() before writing.");
}
Expand Down Expand Up @@ -415,6 +440,12 @@ void SequentialWriter::write(std::shared_ptr<const rosbag2_storage::SerializedBa
is_first_message_ = false;
}

std::string out_stop_reason;
if (should_stop_recording_pre_write(message_timestamp, out_stop_reason)) {
signal_stop_recording(out_stop_reason);
return;
}

if (!storage_options_.snapshot_mode && should_split_bagfile(message_timestamp)) {
split_bagfile();
metadata_.files.back().starting_time = message_timestamp;
Expand Down Expand Up @@ -442,6 +473,11 @@ void SequentialWriter::write(std::shared_ptr<const rosbag2_storage::SerializedBa
// Otherwise, use cache buffer
message_cache_->push(converted_msg);
}

if (should_stop_recording_post_write(out_stop_reason)) {
signal_stop_recording(out_stop_reason);
return;
}
}

bool SequentialWriter::take_snapshot()
Expand Down Expand Up @@ -508,6 +544,52 @@ bool SequentialWriter::message_within_accepted_time_range(
return true;
}

bool SequentialWriter::should_stop_recording_pre_write(
const std::chrono::time_point<std::chrono::high_resolution_clock> & current_time,
std::string & out_stop_reason) const
{
// Stopping by time
if (storage_options_.max_recording_duration !=
rosbag2_storage::storage_interfaces::MAX_RECORDING_DURATION_NO_STOP)
{
auto max_duration_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::seconds(storage_options_.max_recording_duration));
if ((current_time - metadata_.starting_time) > max_duration_ns) {
out_stop_reason = "max duration";
return true;
}
}

return false;
}

bool SequentialWriter::should_stop_recording_post_write(std::string & out_stop_reason) const
{
// Stopping by size
if (storage_options_.max_recording_size !=
rosbag2_storage::storage_interfaces::MAX_RECORDING_SIZE_NO_STOP)
{
if (previous_bags_size_ + storage_->get_bagfile_size() >= storage_options_.max_recording_size) {
out_stop_reason = "max size";
return true;
}
}

// Stopping by number of messages
if (storage_options_.max_recording_messages !=
rosbag2_storage::storage_interfaces::MAX_RECORDING_MESSAGES_NO_STOP)
{
if (previous_bags_num_messages_ + metadata_.files.back().message_count >=
storage_options_.max_recording_messages)
{
out_stop_reason = "max messages";
return true;
}
}

return false;
}

void SequentialWriter::finalize_metadata()
{
metadata_.bag_size = 0;
Expand Down Expand Up @@ -563,6 +645,11 @@ void SequentialWriter::add_event_callbacks(const bag_events::WriterEventCallback
callbacks.write_split_callback,
bag_events::BagEvent::WRITE_SPLIT);
}
if (callbacks.stop_recording_callback) {
callback_manager_.add_event_callback(
callbacks.stop_recording_callback,
bag_events::BagEvent::STOP_RECORDING);
}
}

} // namespace writers
Expand Down
1 change: 1 addition & 0 deletions rosbag2_interfaces/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
"msg/ReadSplitEvent.msg"
"msg/StopRecordingEvent.msg"
"msg/WriteSplitEvent.msg"
"srv/Burst.srv"
"srv/GetRate.srv"
Expand Down
8 changes: 8 additions & 0 deletions rosbag2_interfaces/msg/StopRecordingEvent.msg
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# The full path of the base folder where the recording was taking place.
string base_folder

# The reason for stopping the recording.
string reason

# The fully qualified node name of the event sender
string node_name
5 changes: 4 additions & 1 deletion rosbag2_py/rosbag2_py/_storage.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,16 @@ class StorageOptions:
max_bagfile_duration: int
max_bagfile_size: int
max_cache_size: int
max_recording_duration: int
max_recording_messages: int
max_recording_size: int
snapshot_mode: bool
start_time_ns: int
storage_config_uri: str
storage_id: str
storage_preset_profile: str
uri: str
def __init__(self, uri: str, storage_id: str = ..., max_bagfile_size: int = ..., max_bagfile_duration: int = ..., max_cache_size: int = ..., storage_preset_profile: str = ..., storage_config_uri: str = ..., snapshot_mode: bool = ..., start_time_ns: int = ..., end_time_ns: int = ..., custom_data: Dict[str, str] = ...) -> None: ...
def __init__(self, uri: str, storage_id: str = ..., max_bagfile_size: int = ..., max_bagfile_duration: int = ..., max_cache_size: int = ..., storage_preset_profile: str = ..., storage_config_uri: str = ..., snapshot_mode: bool = ..., start_time_ns: int = ..., end_time_ns: int = ..., custom_data: Dict[str, str] = ..., max_recording_size: int = ..., max_recording_duration: int = ..., max_recording_messages: int = ...) -> None: ...

class TopicInformation:
message_count: int
Expand Down
18 changes: 15 additions & 3 deletions rosbag2_py/src/rosbag2_py/_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ PYBIND11_MODULE(_storage, m) {
.def(
pybind11::init<
std::string, std::string, uint64_t, uint64_t, uint64_t, std::string, std::string, bool,
int64_t, int64_t, KEY_VALUE_MAP>(),
int64_t, int64_t, KEY_VALUE_MAP, uint64_t, uint64_t, uint64_t>(),
pybind11::arg("uri"),
pybind11::arg("storage_id") = "",
pybind11::arg("max_bagfile_size") = 0,
Expand All @@ -94,7 +94,10 @@ PYBIND11_MODULE(_storage, m) {
pybind11::arg("snapshot_mode") = false,
pybind11::arg("start_time_ns") = -1,
pybind11::arg("end_time_ns") = -1,
pybind11::arg("custom_data") = KEY_VALUE_MAP{})
pybind11::arg("custom_data") = KEY_VALUE_MAP{},
pybind11::arg("max_recording_size") = 0,
pybind11::arg("max_recording_duration") = 0,
pybind11::arg("max_recording_messages") = 0)
.def_readwrite("uri", &rosbag2_storage::StorageOptions::uri)
.def_readwrite("storage_id", &rosbag2_storage::StorageOptions::storage_id)
.def_readwrite(
Expand Down Expand Up @@ -123,7 +126,16 @@ PYBIND11_MODULE(_storage, m) {
&rosbag2_storage::StorageOptions::end_time_ns)
.def_readwrite(
"custom_data",
&rosbag2_storage::StorageOptions::custom_data);
&rosbag2_storage::StorageOptions::custom_data)
.def_readwrite(
"max_recording_size",
&rosbag2_storage::StorageOptions::max_recording_size)
.def_readwrite(
"max_recording_duration",
&rosbag2_storage::StorageOptions::max_recording_duration)
.def_readwrite(
"max_recording_messages",
&rosbag2_storage::StorageOptions::max_recording_messages);

pybind11::class_<rosbag2_storage::StorageFilter>(m, "StorageFilter")
.def(
Expand Down
Loading
Loading