Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Lock-free Queue #253

Open
wants to merge 35 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
561d370
add libboost-dev dependency
saikishor Dec 19, 2024
2455b16
Add LockFreeBuffer class
saikishor Dec 19, 2024
02db30d
Add LockFreeSPSCQueueBase class
saikishor Dec 28, 2024
1468fe8
Add more functionality and tests
saikishor Dec 29, 2024
2ef8020
Add tests to CMakeLists
saikishor Dec 30, 2024
268138b
Draft: functional lockfree queue (not spsc_queue)
saikishor Dec 30, 2024
5e94cc9
Add more template enabled methods
saikishor Dec 31, 2024
34d03b5
update tests
saikishor Dec 31, 2024
6b629b1
Update the LockFreeQueue to completely support SPSC and MPMC queues +…
saikishor Dec 31, 2024
9a9951d
cleanup and change license
saikishor Jan 1, 2025
8c2c974
Update the documentation and add some minor tests
saikishor Jan 1, 2025
c991609
Merge branch 'ros-controls:master' into boost/lockfree/realtime/buffer
saikishor Jan 1, 2025
860029e
Add is_lock_free method
saikishor Jan 1, 2025
fe75f45
Update documentation
saikishor Jan 1, 2025
52f53b1
Add get_latest method to the LockFreeQueue
saikishor Jan 1, 2025
356d87e
update CMakeLists.txt
saikishor Jan 2, 2025
ac7ae53
Apply suggestions from code review
saikishor Jan 2, 2025
66dcdfd
Change template arg LockFreeSPSCContainer to LockFreeContainer
saikishor Jan 2, 2025
5d990c0
Install boost
christophfroehlich Jan 2, 2025
71321b3
cleanup the commented tests
saikishor Jan 2, 2025
4f61624
Update include/realtime_tools/lock_free_queue.hpp
saikishor Jan 3, 2025
9a8acdf
Update tests
saikishor Jan 3, 2025
ddb5ca9
Update condition for bounded push and update documentation
saikishor Jan 3, 2025
494b47d
Install boost in Windows CI
christophfroehlich Jan 3, 2025
8d1bae4
Unify empty method with internal conditioning of SPSC queue or MPMC q…
saikishor Jan 3, 2025
f90d22b
Try to set CMP0167 for boost on windows
christophfroehlich Jan 3, 2025
081d2f6
Remove quiet and add Boost::boost
christophfroehlich Jan 3, 2025
2f8d285
Remove atomic link libraries
christophfroehlich Jan 3, 2025
fff6ee3
Try to fix msvc atomic
christophfroehlich Jan 3, 2025
2bb6519
Try to fix atomic.lib on msvc
christophfroehlich Jan 3, 2025
551a9fb
Don't link atomic on windows
christophfroehlich Jan 3, 2025
689f63c
Use master branch of CI repo
christophfroehlich Jan 3, 2025
b8becad
Simplify push method and also add fixed_size arg to the MPMC queue
saikishor Jan 9, 2025
bd74a53
reuse tests to test all constructable types of LockFreeQueues
saikishor Jan 9, 2025
0d3fc7e
Merge branch 'master' into boost/lockfree/realtime/buffer
saikishor Jan 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/rolling-binary-build-win.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ jobs:
ros_distro: jazzy
ref_for_scheduled_build: master
os_name: windows-2019
install_boost: true
17 changes: 17 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ foreach(Dependency IN ITEMS ${THIS_PACKAGE_INCLUDE_DEPENDS})
find_package(${Dependency} REQUIRED)
endforeach()

if(POLICY CMP0167)
cmake_policy(SET CMP0167 NEW)
endif()
find_package(Boost COMPONENTS headers)
if(NOT Boost_headers_FOUND)
find_package(Boost REQUIRED)
endif()

add_library(realtime_tools SHARED
src/realtime_clock.cpp
src/realtime_helpers.cpp
Expand Down Expand Up @@ -74,6 +82,15 @@ if(BUILD_TESTING)
ament_add_gmock(realtime_buffer_tests test/realtime_buffer_tests.cpp)
target_link_libraries(realtime_buffer_tests realtime_tools)

ament_add_gmock(lock_free_queue_tests test/lock_free_queue_tests.cpp)
if(WIN32)
# atomic is not found on Windows, but also not needed
target_link_libraries(lock_free_queue_tests realtime_tools Boost::boost)
else()
# without adding atomic, clang throws a linker error
target_link_libraries(lock_free_queue_tests realtime_tools atomic Boost::boost)
endif()

ament_add_gmock(realtime_clock_tests test/realtime_clock_tests.cpp)
target_link_libraries(realtime_clock_tests realtime_tools)

Expand Down
333 changes: 333 additions & 0 deletions include/realtime_tools/lock_free_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,333 @@
// Copyright 2025 PAL Robotics S.L.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// \author Sai Kishor Kothakota

#ifndef REALTIME_TOOLS__LOCK_FREE_QUEUE_HPP_
#define REALTIME_TOOLS__LOCK_FREE_QUEUE_HPP_

#include <chrono>
#include <mutex>
#include <type_traits>
#include <utility>

#include <boost/lockfree/queue.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/lockfree/stack.hpp>

namespace
{
// Trait to check if the capacity is set
template <typename T>
struct has_capacity : std::false_type
{
};

template <typename T, std::size_t Capacity>
struct has_capacity<boost::lockfree::spsc_queue<T, boost::lockfree::capacity<Capacity>>>
: std::true_type
{
};

template <typename T, std::size_t Capacity>
struct has_capacity<boost::lockfree::queue<T, boost::lockfree::capacity<Capacity>>> : std::true_type
{
};

template <typename T, std::size_t Capacity, bool FixedSize>
struct has_capacity<boost::lockfree::queue<
T, boost::lockfree::capacity<Capacity>, boost::lockfree::fixed_sized<FixedSize>>> : std::true_type
{
};

// Traits to check if the queue is spsc_queue
template <typename T>
struct is_spsc_queue : std::false_type
{
};

template <typename T>
struct is_spsc_queue<boost::lockfree::spsc_queue<T>> : std::true_type
{
};

template <typename T, std::size_t Capacity>
struct is_spsc_queue<boost::lockfree::spsc_queue<T, boost::lockfree::capacity<Capacity>>>
: std::true_type
{
};

// Traits to get the capacity of the queue
// Default case: no capacity
template <typename T>
struct get_boost_lockfree_queue_capacity
{
static constexpr std::size_t value = 0; // Default to 0 if capacity is not defined
};

// Specialization for queues with capacity
template <typename T, std::size_t Capacity>
struct get_boost_lockfree_queue_capacity<
boost::lockfree::spsc_queue<T, boost::lockfree::capacity<Capacity>>>
{
static constexpr std::size_t value = Capacity;
};

// Specialization for queues with capacity
template <typename T, std::size_t Capacity>
struct get_boost_lockfree_queue_capacity<
boost::lockfree::queue<T, boost::lockfree::capacity<Capacity>>>
{
static constexpr std::size_t value = Capacity;
};

// Specialization for queues with capacity
template <typename T, std::size_t Capacity, bool FixedSize>
struct get_boost_lockfree_queue_capacity<boost::lockfree::queue<
T, boost::lockfree::capacity<Capacity>, boost::lockfree::fixed_sized<FixedSize>>>
{
static constexpr std::size_t value = Capacity;
};

} // namespace

namespace realtime_tools
{
template <typename DataType, typename LockFreeContainer>
class LockFreeQueueBase
/**
* @brief A base class for lock-free queues.
*
* This class provides a base implementation for lock-free queues with various functionalities
* such as pushing, popping, and checking the state of the queue. It supports both single-producer
* single-consumer (SPSC) and multiple-producer multiple-consumer (MPMC) queues.
*
* @tparam DataType The type of data to be stored in the queue.
* @tparam LockFreeContainer The underlying lock-free container type - Typically boost::lockfree::spsc_queue or boost::lockfree::queue with their own template parameters
*/
{
public:
using T = DataType;

/**
* @brief Construct a new LockFreeQueueBase object
* @note This constructor is enabled only if the LockFreeContainer has a capacity set
*/
template <
bool HasCapacity = has_capacity<LockFreeContainer>::value,
typename std::enable_if_t<HasCapacity, int> = 0>
LockFreeQueueBase() : capacity_(get_boost_lockfree_queue_capacity<LockFreeContainer>::value)
{
}

/**
* @brief Construct a new LockFreeQueueBase object
* @param capacity Capacity of the queue
* @note This constructor is enabled only if the LockFreeContainer has no capacity set
*/
template <
bool HasCapacity = has_capacity<LockFreeContainer>::value,
typename std::enable_if_t<!HasCapacity, int> = 1>
explicit LockFreeQueueBase(std::size_t capacity) : data_queue_(capacity), capacity_(capacity)
{
}

virtual ~LockFreeQueueBase() = default;

/**
* @brief Pop the data from the queue
* @param data Data to be popped
* @return true If the data is popped successfully
* @return false If the queue is empty or the data could not be popped
*/
[[nodiscard]] bool pop(T & data) { return data_queue_.pop(data); }

/**
* @brief Pop the data from the queue
* @param data Data to be popped
* @return true If the data is popped successfully
* @return false If the queue is empty or the data could not be popped
* @note This function is enabled only if the data type is convertible to the template type of the queue
*/
template <typename U>
[[nodiscard]] std::enable_if_t<std::is_convertible_v<T, U>, bool> pop(U & data)
{
return data_queue_.pop(data);
}

/**
* @brief Push the data into the queue
* @param data Data to be pushed
* @return true If the data is pushed successfully
* @return false If the queue is full or the data could not be pushed
*/

[[nodiscard]] bool push(const T & data)
{
if constexpr (is_spsc_queue<LockFreeContainer>::value) {
return data_queue_.push(data);
} else {
return data_queue_.bounded_push(data);
}
}

/**
* @brief Push the data into the queue
* @param data Data to be pushed
* @return true If the data is pushed successfully
* @return false If the queue is full or the data could not be pushed
* @note This function is enabled only if the data type is convertible to the template type of the queue
*/
template <typename U>
[[nodiscard]] std::enable_if_t<std::is_convertible_v<T, U>, bool> push(const U & data)
{
if constexpr (is_spsc_queue<LockFreeContainer>::value) {
return data_queue_.push(data);
} else {
return data_queue_.bounded_push(data);
}
}

/**
* @brief The bounded_push function pushes the data into the queue and pops the oldest data if the queue is full
* @param data Data to be pushed
* @return true If the data is pushed successfully
* @return false If the data could not be pushed
* @note This function is enabled only if the queue is a spsc_queue and only if the data type
* is convertible to the template type of the queue
* @note For a SPSC Queue, to be used in single-threaded applications
* @warning For a SPSC Queue, this method might not work as expected when used in multi-threaded applications
* if it is used with two different threads, one doing bounded_push and the other doing pop. In this case, the
* queue is no longer a single producer single consumer queue. So, the behavior might not be as expected.
*/
template <typename U>
[[nodiscard]] std::enable_if_t<std::is_convertible_v<T, U>, bool> bounded_push(const U & data)
{
if (!push(data)) {
T dummy;
data_queue_.pop(dummy);
return push(data);
}
return true;
}

/**
* @brief Get the latest data from the queue
* @param data Data to be filled with the latest data
* @return true If the data is filled with the latest data, false otherwise
* @return false If the queue is empty
* @note This function consumes all the data in the queue until the last data and returns the last element of the queue
*/
[[nodiscard]] bool get_latest(T & data)
{
return data_queue_.consume_all([&data](const T & d) { data = d; }) > 0;
}

/**
* @brief Check if the queue is empty
* @return true If the queue is empty
* @return false If the queue is not empty
* @note The result is only accurate, if no other thread modifies the queue. Therefore, it is rarely practical to use this value in program logic.
* @note For a SPSC Queue, it should only be called from the consumer thread where pop is called. If need to be called from producer thread, use write_available() instead.
*/
bool empty() const
{
if constexpr (is_spsc_queue<LockFreeContainer>::value) {
return data_queue_.read_available() == 0;
} else {
return data_queue_.empty();
}
}

/**
* @brief Get the capacity of the queue
* @return std::size_t Capacity of the queue
*/
size_t capacity() const { return capacity_; }

/**
* @brief Get the size of the queue
* @return std::size_t Size of the queue
* @note This function is enabled only if the queue is a spsc_queue
*/
template <
bool IsSPSCQueue = is_spsc_queue<LockFreeContainer>::value,
typename std::enable_if_t<IsSPSCQueue, int> = 0>
std::size_t size() const
{
return data_queue_.read_available();
}

/**
* @brief The method to check if the queue is lock free
* @return true If the queue is lock free, false otherwise
* @warning It only checks, if the queue head and tail nodes and the freelist can
* be modified in a lock-free manner. On most platforms, the whole implementation
* is lock-free, if this is true. Using c++0x-style atomics, there is no possibility
* to provide a completely accurate implementation, because one would need to test
* every internal node, which is impossible if further nodes will be allocated from
* the operating system.
* @link https://www.boost.org/doc/libs/1_74_0/doc/html/boost/lockfree/queue.html
*/
bool is_lock_free() const
{
return (is_spsc_queue<LockFreeContainer>::value) || data_queue_.is_lock_free();
}

/**
* @brief Get the lockfree container
* @return const LockFreeContainer& Reference to the lockfree container
*/
const LockFreeContainer & get_lockfree_container() const { return data_queue_; }

/**
* @brief Get the lockfree container
* @return LockFreeContainer& Reference to the lockfree container
*/
LockFreeContainer & get_lockfree_container() { return data_queue_; }

private:
LockFreeContainer data_queue_;
std::size_t capacity_;
}; // class

/**
* @brief Lock-free Single Producer Single Consumer Queue
* @tparam DataType Type of the data to be stored in the queue
* @tparam Capacity Capacity of the queue
*/
template <class DataType, std::size_t Capacity = 0>
using LockFreeSPSCQueue = std::conditional_t<
Capacity == 0, LockFreeQueueBase<DataType, boost::lockfree::spsc_queue<DataType>>,
LockFreeQueueBase<
DataType, boost::lockfree::spsc_queue<DataType, boost::lockfree::capacity<Capacity>>>>;

/**
* @brief Lock-free Multiple Producer Multiple Consumer Queue
* @tparam DataType Type of the data to be stored in the queue
* @tparam Capacity Capacity of the queue
* @tparam FixedSize Fixed size of the queue
*/
template <class DataType, std::size_t Capacity = 0, bool FixedSize = true>
using LockFreeMPMCQueue = std::conditional_t<
Capacity == 0,
LockFreeQueueBase<
DataType, boost::lockfree::queue<DataType, boost::lockfree::fixed_sized<FixedSize>>>,
LockFreeQueueBase<
DataType,
boost::lockfree::queue<
DataType, boost::lockfree::capacity<Capacity>, boost::lockfree::fixed_sized<FixedSize>>>>;

} // namespace realtime_tools
#endif // REALTIME_TOOLS__LOCK_FREE_QUEUE_HPP_
1 change: 1 addition & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

<buildtool_export_depend>ament_cmake</buildtool_export_depend>

<depend>libboost-dev</depend>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a smaller dependency by any chance?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/ros/rosdistro/blob/master/rosdep/base.yaml#L2833-L2842

It is basic core one

If boost is included, then it pulls in everything

<depend>rclcpp</depend>
<depend>rclcpp_action</depend>
<depend>libcap-dev</depend>
Expand Down
Loading
Loading