Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jazzy] Add cli option compression-threads-priority (backport #1768) #1778

Merged
merged 1 commit into from
Aug 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions ros2bag/ros2bag/verb/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,17 @@ def add_recorder_arguments(parser: ArgumentParser) -> None:
'--compression-threads', type=int, default=0,
help='Number of files or messages that may be compressed in parallel. '
'Default is %(default)d, which will be interpreted as the number of CPU cores.')
parser.add_argument(
'--compression-threads-priority', type=int, default=0,
help='Compression threads scheduling priority. \nFor Windows the valid values are:'
' THREAD_PRIORITY_LOWEST=-2, THREAD_PRIORITY_BELOW_NORMAL=-1 and'
' THREAD_PRIORITY_NORMAL=0. Please refer to'
' https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-setthreadpriority' # noqa E501
' for details.\n'
'For POSIX compatible OSes this is the "nice" value. The nice value range is'
' -20 to +19 where -20 is highest, 0 default and +19 is lowest.'
' Please refer to https://man7.org/linux/man-pages/man2/nice.2.html for details.\n'
'Default is %(default)d.')
parser.add_argument(
'--compression-mode', type=str, default='none',
choices=['none', 'file', 'message'],
Expand Down Expand Up @@ -336,6 +347,7 @@ def main(self, *, args): # noqa: D102
record_options.compression_format = args.compression_format
record_options.compression_queue_size = args.compression_queue_size
record_options.compression_threads = args.compression_threads
record_options.compression_threads_priority = args.compression_threads_priority
record_options.topic_qos_profile_overrides = qos_profile_overrides
record_options.include_hidden_topics = args.include_hidden_topics
record_options.include_unpublished_topics = args.include_unpublished_topics
Expand Down
55 changes: 55 additions & 0 deletions ros2bag/test/test_record_with_compression_thread_priority.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright 2024 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path
import tempfile

import unittest

from launch import LaunchDescription
from launch.actions import ExecuteProcess

import launch_testing
import launch_testing.actions

import pytest


@pytest.mark.launch_test
def generate_test_description():
tmp_dir_name = tempfile.mkdtemp()
output_path = Path(tmp_dir_name) / 'ros2bag_test_record_with_compression_thread_priority'
record_process = ExecuteProcess(
cmd=['ros2', 'bag', 'record', '-a', '--output', output_path.as_posix(),
'--log-level', 'debug', '--compression-threads-priority', '1',
'--compression-mode', 'file', '--compression-format', 'zstd'],
name='ros2bag-cli',
output='screen',
)

return LaunchDescription([
record_process,
launch_testing.actions.ReadyToTest()
]), locals()


class TestRecordWithCompressionThreadPriority(unittest.TestCase):

def test_priority_propagated_into_compression_thread(
self, record_process, proc_output):
proc_output.assertWaitFor(
'Setting compression thread priority to 1',
timeout=45,
process=record_process
)
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ struct CompressionOptions
/// \brief // The number of compression threads
uint64_t compression_threads{0};
/// \brief If set, the compression thread(s) will try to set the given priority for itself
/// For Windows the valid values are: THREAD_PRIORITY_LOWEST, THREAD_PRIORITY_BELOW_NORMAL and
/// THREAD_PRIORITY_NORMAL. For POSIX compatible OSes this is the "nice" value.
/// For Windows the valid values are: THREAD_PRIORITY_LOWEST=-2, THREAD_PRIORITY_BELOW_NORMAL=-1
/// and THREAD_PRIORITY_NORMAL=0. For POSIX compatible OSes this is the "nice" value.
/// The nice value range is -20 to +19 where -20 is highest, 0 default and +19 is lowest.
std::optional<int32_t> thread_priority;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ SequentialCompressionWriter::~SequentialCompressionWriter()
void SequentialCompressionWriter::compression_thread_fn()
{
if (compression_options_.thread_priority) {
ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM(
"Setting compression thread priority to "
<< *compression_options_.thread_priority);
#ifdef _WIN32
// This must match THREAD_PRIORITY_IDLE, THREAD_PRIORITY_LOWEST...
int wanted_thread_priority = *compression_options_.thread_priority;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>

const uint64_t kDefaultCompressionQueueSize = 1;
const uint64_t kDefaultCompressionQueueThreads = 4;
const std::optional<int32_t> kDefaultCompressionQueueThreadsPriority = std::nullopt;
const int32_t kDefaultCompressionQueueThreadsPriority = 0;
};

TEST_F(SequentialCompressionWriterTest, open_throws_on_empty_storage_options_uri)
Expand Down
1 change: 1 addition & 0 deletions rosbag2_py/rosbag2_py/_transport.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class RecordOptions:
compression_mode: str
compression_queue_size: int
compression_threads: int
compression_threads_priority: int
disable_keyboard_controls: bool
exclude_regex: str
exclude_service_events: List[str]
Expand Down
1 change: 1 addition & 0 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ PYBIND11_MODULE(_transport, m) {
.def_readwrite("compression_format", &RecordOptions::compression_format)
.def_readwrite("compression_queue_size", &RecordOptions::compression_queue_size)
.def_readwrite("compression_threads", &RecordOptions::compression_threads)
.def_readwrite("compression_threads_priority", &RecordOptions::compression_threads_priority)
.def_property(
"topic_qos_profile_overrides",
&RecordOptions::getTopicQoSProfileOverrides,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ struct RecordOptions
std::string compression_mode = "";
std::string compression_format = "";
uint64_t compression_queue_size = 1;
/// \brief // The number of compression threads
uint64_t compression_threads = 0;
/// \brief Compression threads scheduling priority.
/// For Windows the valid values are: THREAD_PRIORITY_LOWEST=-2, THREAD_PRIORITY_BELOW_NORMAL=-1
/// and THREAD_PRIORITY_NORMAL=0. For POSIX compatible OSes this is the "nice" value.
/// The nice value range is -20 to +19 where -20 is highest, 0 default and +19 is lowest.
int32_t compression_threads_priority = 0;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides{};
bool include_hidden_topics = false;
Expand Down
Loading