From 72e985e81b64c8ea6982e680de19f051f090cca6 Mon Sep 17 00:00:00 2001 From: Roman Sokolkov Date: Wed, 31 Jul 2024 13:55:25 +0300 Subject: [PATCH] Add cli option compression-threads-priority Signed-off-by: Roman Sokolkov --- ros2bag/ros2bag/verb/record.py | 5 ++ ...record_with_compression_thread_priority.py | 54 +++++++++++++++++++ .../sequential_compression_writer.cpp | 2 + rosbag2_py/src/rosbag2_py/_transport.cpp | 1 + 4 files changed, 62 insertions(+) create mode 100644 ros2bag/test/test_record_with_compression_thread_priority.py diff --git a/ros2bag/ros2bag/verb/record.py b/ros2bag/ros2bag/verb/record.py index 733ff360a..20afc89f7 100644 --- a/ros2bag/ros2bag/verb/record.py +++ b/ros2bag/ros2bag/verb/record.py @@ -194,6 +194,10 @@ def add_recorder_arguments(parser: ArgumentParser) -> None: '--compression-threads', type=int, default=0, help='Number of files or messages that may be compressed in parallel. ' 'Default is %(default)d, which will be interpreted as the number of CPU cores.') + parser.add_argument( + '--compression-threads-priority', type=int, default=0, + help='Thread scheduling priority. Use nice value for Linux and ' + 'priority level for Windows. Default is %(default)d.') parser.add_argument( '--compression-mode', type=str, default='none', choices=['none', 'file', 'message'], @@ -338,6 +342,7 @@ def main(self, *, args): # noqa: D102 record_options.compression_format = args.compression_format record_options.compression_queue_size = args.compression_queue_size record_options.compression_threads = args.compression_threads + record_options.compression_threads_priority = args.compression_threads_priority record_options.topic_qos_profile_overrides = qos_profile_overrides record_options.include_hidden_topics = args.include_hidden_topics record_options.include_unpublished_topics = args.include_unpublished_topics diff --git a/ros2bag/test/test_record_with_compression_thread_priority.py b/ros2bag/test/test_record_with_compression_thread_priority.py new file mode 100644 index 000000000..31f59fd0d --- /dev/null +++ b/ros2bag/test/test_record_with_compression_thread_priority.py @@ -0,0 +1,54 @@ +# Copyright 2020 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path +import tempfile + +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess + +import launch_testing +import launch_testing.actions + +import pytest + + +@pytest.mark.launch_test +def generate_test_description(): + tmp_dir_name = tempfile.mkdtemp() + output_path = Path(tmp_dir_name) / 'ros2bag_test_record_with_compression_thread_priority' + record_process = ExecuteProcess( + cmd=['ros2', 'bag', 'record', '-a', '--output', output_path.as_posix(), + '--log-level', 'debug', '--compression-threads-priority', '1', + '--compression-mode', 'file', '--compression-format', 'zstd'], + name='ros2bag-cli', + output='screen', + ) + + return LaunchDescription([ + record_process, + launch_testing.actions.ReadyToTest() + ]), locals() + + +class TestRecordWithCompressionThreadPriority(unittest.TestCase): + + def test_priority_propagated_into_compression_thread( + self, record_process, proc_output): + proc_output.assertWaitFor( + 'Setting compression thread priority to 1', + process=record_process + ) diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index 8f5a89c61..d1d7f2e23 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -71,6 +71,8 @@ SequentialCompressionWriter::~SequentialCompressionWriter() void SequentialCompressionWriter::compression_thread_fn() { if (compression_options_.thread_priority) { + ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM("Setting compression thread priority to " + << *compression_options_.thread_priority); #ifdef _WIN32 // This must match THREAD_PRIORITY_IDLE, THREAD_PRIORITY_LOWEST... int wanted_thread_priority = *compression_options_.thread_priority; diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index a92dd3684..9d15e82b1 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -573,6 +573,7 @@ PYBIND11_MODULE(_transport, m) { .def_readwrite("compression_format", &RecordOptions::compression_format) .def_readwrite("compression_queue_size", &RecordOptions::compression_queue_size) .def_readwrite("compression_threads", &RecordOptions::compression_threads) + .def_readwrite("compression_threads_priority", &RecordOptions::compression_threads_priority) .def_property( "topic_qos_profile_overrides", &RecordOptions::getTopicQoSProfileOverrides,