diff --git a/CMakeLists.txt b/CMakeLists.txt index 9da231a4..4b0f9606 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -35,6 +35,7 @@ set_target_properties(LCT PROPERTIES OUTPUT_NAME lct) set(CMAKE_THREAD_PREFER_PTHREAD TRUE) set(THREADS_PREFER_PTHREAD_FLAG TRUE) find_package(Threads REQUIRED) +target_include_directories(LCT PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}) target_link_libraries(LCT PUBLIC Threads::Threads) add_subdirectory(lct) diff --git a/dependency/CMakeLists.txt b/dependency/CMakeLists.txt index 79dd80bd..9c973f20 100644 --- a/dependency/CMakeLists.txt +++ b/dependency/CMakeLists.txt @@ -3,3 +3,6 @@ if(LCI_COMPILE_DREG) target_include_directories(LCI PRIVATE ucx) target_link_libraries(LCI PRIVATE lci-ucx) endif() + +add_subdirectory(ConcurrencyFreaks) +add_subdirectory(lprq) diff --git a/dependency/ConcurrencyFreaks/BitNextLazyHeadQueue.hpp b/dependency/ConcurrencyFreaks/BitNextLazyHeadQueue.hpp new file mode 100644 index 00000000..9245ccbb --- /dev/null +++ b/dependency/ConcurrencyFreaks/BitNextLazyHeadQueue.hpp @@ -0,0 +1,230 @@ +/****************************************************************************** + * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Concurrency Freaks nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + *AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + *IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + ****************************************************************************** + */ + +#ifndef _BIT_NEXT_LAZY_HEAD_HP_H_ +#define _BIT_NEXT_LAZY_HEAD_HP_H_ + +#include +#include +#include "HazardPointers.hpp" + +/** + *
Bit Next Lazy Head Queue
 + * + * enqueue algorithm: bit-next, based on the bit-on-the-next trick used in the Maged-Harris list + * dequeue algorithm: bit-next, based on the bit-on-the-next trick used in the Maged-Harris list + * Consistency: Linearizable + * enqueue() progress: lock-free + * dequeue() progress: lock-free + * Memory Reclamation: Hazard Pointers + * Uncontended enqueue: 2 CAS + 1 HP + * Uncontended dequeue: 2 CAS + 1 HP + * + * TODO: + * Although this is like the Maged-Harris list, there are no sentinel head or sentinel tail nodes. + * Nodes that have been dequeued may have their next pointing to (nullptr|0x1). + * + *
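A minimal usage sketch (illustrative only, not part of this header): every thread must pass its own unique id in the range 0..maxThreads-1, since the hazard-pointer slots are indexed by that id. The demo function name and thread setup below are hypothetical.

#include <thread>
#include <vector>

static void bitNextLazyHeadQueueDemo() {
  const int numThreads = 4;
  BitNextLazyHeadQueue<int> q(numThreads);
  std::vector<std::thread> workers;
  for (int tid = 0; tid < numThreads; tid++) {
    workers.emplace_back([&q, tid] {
      q.enqueue(new int(tid), tid);   // item must be a non-null pointer
      int* got = q.dequeue(tid);      // may be another thread's item, or nullptr
      delete got;
    });
  }
  for (auto& t : workers) t.join();
  while (int* leftover = q.dequeue(0)) delete leftover;  // drain anything left behind
}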

+ * The paper on Hazard Pointers is named "Hazard Pointers: Safe Memory + * Reclamation for Lock-Free objects" and it is available here: + * http://web.cecs.pdx.edu/~walpole/class/cs510/papers/11.pdf + * + */ +template +class BitNextLazyHeadQueue +{ + private: + struct Node { + T* item; + std::atomic next; + Node(T* userItem) : item{userItem}, next{nullptr} {} + }; + + bool casTail(Node* cmp, Node* val) + { + return tail.compare_exchange_strong(cmp, val); + } + + bool casHead(Node* cmp, Node* val) + { + return head.compare_exchange_strong(cmp, val); + } + + struct HPGuard { + HazardPointers& hp; + const int tid; + HPGuard(HazardPointers& hp, const int tid) : hp{hp}, tid{tid} {} + ~HPGuard() { hp.clear(tid); } + }; + + // Pointers to head and tail of the list + alignas(128) std::atomic head; + alignas(128) std::atomic tail; + + static const int MAX_THREADS = 128; + const int maxThreads; + + // We need three hazard pointers for dequeue() + HazardPointers hp{3, maxThreads}; + const int kHpTail = 0; + const int kHpHead = 0; + const int kHpNext = 1; // and 2 (kHpNext+1) as well + + /* + * Bit-related functions + */ + bool isMarked(Node* node) { return ((size_t)node & 0x1); } + + Node* getMarked(Node* node) { return (Node*)((size_t)node | 0x1); } + + Node* getUnmarked(Node* node) { return (Node*)((size_t)node & (~0x1)); } + + void retireSubList(Node* start, Node* end, const int tid) + { + for (Node* node = start; node != end;) { + Node* lnext = getUnmarked(node->next.load()); + hp.retire(node, tid); + node = lnext; + } + } + + public: + BitNextLazyHeadQueue(int maxThreads = MAX_THREADS) : maxThreads{maxThreads} + { + Node* sentinelNode = new Node(nullptr); + // The sentinel is already "logically removed" + sentinelNode->next.store(getMarked(nullptr), std::memory_order_relaxed); + head.store(sentinelNode, std::memory_order_relaxed); + tail.store(sentinelNode, std::memory_order_relaxed); + } + + ~BitNextLazyHeadQueue() + { + while (dequeue(0) != nullptr) + ; // Drain the queue + delete head.load(); // Delete the last node + } + + std::string className() { return "BitNextLazyHeadQueue"; } + + /* + * Progress condition: lock-free + * + * If we don't know maxThreads, we can replace the for() loop with a + * while(true) and it will still be correct. + */ + void enqueue(T* item, const int tid) + { + if (item == nullptr) throw std::invalid_argument("item can not be nullptr"); + HPGuard hpguard{hp, tid}; // RAII to call hp.clear(tid) when returning + Node* newNode = new Node(item); + while (true) { + Node* ltail = hp.protectPtr(kHpTail, tail.load(), tid); + if (ltail != tail.load()) continue; + Node* lnext = ltail->next.load(); + if (getUnmarked(lnext) != nullptr) { // Advance the tail first + casTail(ltail, getUnmarked(lnext)); // "tail" is always unmarked + } else { + for (int i = 0; i < 2; i++) { + Node* newNodeMark = + isMarked(lnext) + ? getMarked(newNode) + : newNode; // lnext here is either nullptr or nullptr|0x1 + newNode->next.store(nullptr, std::memory_order_relaxed); + if (ltail->next.compare_exchange_strong(lnext, newNodeMark)) { + casTail(ltail, newNode); + return; + } + lnext = ltail->next.load(); + if (getUnmarked(lnext) != nullptr) { + casTail(ltail, getUnmarked(lnext)); // "tail" is always unmarked + break; + } + } + } + for (int i = 0; i < maxThreads - 1; + i++) { // This loop will run at most maxThreads because the CAS can + // fail at most maxThreads + lnext = ltail->next.load(); + if (isMarked(lnext)) + break; // This node has been dequeued, must re-read tail. 
It's ok to + // be marked as long as it's the first and therefore, nullptr + newNode->next.store(lnext, std::memory_order_relaxed); + if (ltail->next.compare_exchange_strong(lnext, newNode)) return; + } + } + } + + /* + * Progress condition: lock-free + * + * The dequeue() marks the node that has the item as "logically removed" + * by setting the "marked" bit in node.next + * By default, the "head" is pointing to the first node that has not been + * "logically removed", but if it's the last node (node.next is nullptr), + * then the head will be pointing to the last "logically removed" node. + */ + T* dequeue(const int tid) + { + HPGuard hpguard{hp, tid}; // RAII to call hp.clear(tid) when returning + while (true) { + Node* lhead = hp.protectPtr(kHpHead, head.load(), tid); + if (lhead != head.load()) continue; + Node* lcurr = lhead; + for (int i = 0;;) { + Node* lnext = lcurr->next.load(); + if (lnext == getMarked(nullptr)) { + if (lhead != lcurr && casHead(lhead, lcurr)) + retireSubList(lhead, lcurr, tid); + return nullptr; // Queue is empty + } + if (isMarked(lnext)) { + hp.protectPtr(kHpNext + (i & 0x1), getUnmarked(lnext), + tid); // Alternate hps during traversal + if (lhead != head.load()) break; + lcurr = getUnmarked(lnext); + i++; + continue; + } + if (!lcurr->next.compare_exchange_strong(lnext, getMarked(lnext))) + continue; + T* item = lcurr->item; + if (lcurr != lhead && casHead(lhead, lcurr)) + retireSubList(lhead, lcurr, tid); + return item; + } + } + } +}; + +#endif /* _BIT_NEXT_HP_H_ */ diff --git a/dependency/ConcurrencyFreaks/BitNextQueue.hpp b/dependency/ConcurrencyFreaks/BitNextQueue.hpp new file mode 100644 index 00000000..cab18db1 --- /dev/null +++ b/dependency/ConcurrencyFreaks/BitNextQueue.hpp @@ -0,0 +1,209 @@ +/****************************************************************************** + * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Concurrency Freaks nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + *AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + *IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ ****************************************************************************** + */ + +#ifndef _BIT_NEXT_HP_H_ +#define _BIT_NEXT_HP_H_ + +#include +#include +#include "HazardPointers.hpp" + +/** + *
Bit Next Queue
 + * + * A Lock-Free queue which uses a trick similar to the Tim Harris lock-free list: setting a bit on the "next" pointer of the node. + * Based on the paper "BitNext - A Lock-Free Queue" + * https://github.com/pramalhe/ConcurrencyFreaks/tree/master/papers/bitnext-2016.pdf + * + * enqueue algorithm: bit-next, based on the bit-on-the-next trick used in the Maged-Harris list + * dequeue algorithm: bit-next, based on the bit-on-the-next trick used in the Maged-Harris list + * Consistency: Linearizable + * enqueue() progress: lock-free + * dequeue() progress: lock-free + * Memory Reclamation: Hazard Pointers + * Uncontended enqueue: 2 CAS + 1 HP + * Uncontended dequeue: 2 CAS + 1 HP + *
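To make the bit-on-the-next trick concrete, here is a small sketch (illustrative only; DemoNode and the demo* helpers are hypothetical stand-ins for the private Node type and the isMarked/getMarked/getUnmarked helpers defined inside the class below).

#include <atomic>
#include <cstddef>

struct DemoNode {
  void* item;
  std::atomic<DemoNode*> next;
};

// Bit 0 of the "next" pointer marks the node as logically removed.
inline bool demoIsMarked(DemoNode* p) { return ((size_t)p & 0x1) != 0; }
inline DemoNode* demoGetMarked(DemoNode* p) { return (DemoNode*)((size_t)p | 0x1); }
inline DemoNode* demoGetUnmarked(DemoNode* p) { return (DemoNode*)((size_t)p & ~(size_t)0x1); }

// dequeue() claims a node by CASing next from its unmarked value to the marked
// value; only the stolen low bit changes, the successor pointer stays the same.
inline bool demoLogicallyRemove(DemoNode* node) {
  DemoNode* lnext = demoGetUnmarked(node->next.load());
  return node->next.compare_exchange_strong(lnext, demoGetMarked(lnext));
}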

+ * The paper on Hazard Pointers is named "Hazard Pointers: Safe Memory + * Reclamation for Lock-Free objects" and it is available here: + * http://web.cecs.pdx.edu/~walpole/class/cs510/papers/11.pdf + * + * @author Andreia Correia + * @author Pedro Ramalhete + */ +template +class BitNextQueue +{ + private: + struct Node { + T* item; + std::atomic next; + Node(T* item) : item{item}, next{nullptr} {} + }; + + bool casTail(Node* cmp, Node* val) + { + return tail.compare_exchange_strong(cmp, val); + } + + bool casHead(Node* cmp, Node* val) + { + return head.compare_exchange_strong(cmp, val); + } + + struct HPGuard { + HazardPointers& hp; + const int tid; + HPGuard(HazardPointers& hp, const int tid) : hp{hp}, tid{tid} {} + ~HPGuard() { hp.clear(tid); } + }; + + // Pointers to head and tail of the list + alignas(128) std::atomic head; + alignas(128) std::atomic tail; + + static const int MAX_THREADS = 128; + const int maxThreads; + + HazardPointers hp{ + 1, maxThreads}; // We don't traverse the list, so we use just one hp + const int kHpTail = 0; + const int kHpHead = 0; + + /* + * Bit-related functions + */ + bool isMarked(Node* node) { return ((size_t)node & 0x1); } + + Node* getMarked(Node* node) { return (Node*)((size_t)node | 0x1); } + + Node* getUnmarked(Node* node) { return (Node*)((size_t)node & (~0x1)); } + + public: + BitNextQueue(int maxThreads = MAX_THREADS) : maxThreads{maxThreads} + { + Node* sentinelNode = new Node(nullptr); + // The sentinel is already "logically removed" + sentinelNode->next.store(getMarked(nullptr), std::memory_order_relaxed); + head.store(sentinelNode, std::memory_order_relaxed); + tail.store(sentinelNode, std::memory_order_relaxed); + } + + ~BitNextQueue() + { + while (dequeue(0) != nullptr) + ; // Drain the queue + delete head.load(); // Delete the last node + } + + std::string className() { return "BitNextQueue"; } + + /* + * Progress condition: lock-free + * + * If we don't know maxThreads, we can replace the for() loop with a + * while(true) and it will still be correct. + */ + void enqueue(T* item, const int tid) + { + if (item == nullptr) throw std::invalid_argument("item can not be nullptr"); + HPGuard hpguard{hp, tid}; // RAII to call hp.clear(tid) when returning + Node* newNode = new Node(item); + while (true) { + Node* ltail = hp.protectPtr(kHpTail, tail.load(), tid); + if (ltail != tail.load()) continue; + Node* lnext = ltail->next.load(); + if (getUnmarked(lnext) != nullptr) { // Advance the tail first + casTail(ltail, getUnmarked(lnext)); // "tail" is always unmarked + } else { + for (int i = 0; i < 2; i++) { + Node* newNodeMark = + isMarked(lnext) + ? getMarked(newNode) + : newNode; // lnext here is either nullptr or nullptr|0x1 + newNode->next.store(nullptr, std::memory_order_relaxed); + if (ltail->next.compare_exchange_strong(lnext, newNodeMark)) { + casTail(ltail, newNode); + return; + } + lnext = ltail->next.load(); + if (getUnmarked(lnext) != nullptr) { + casTail(ltail, getUnmarked(lnext)); // "tail" is always unmarked + break; + } + } + } + for (int i = 0; i < maxThreads - 1; + i++) { // This loop will run at most maxThreads because the CAS can + // fail at most maxThreads + lnext = ltail->next.load(); + if (isMarked(lnext)) + break; // This node has been dequeued, must re-read tail. 
It's ok to + // be marked as long as it's the first and therefore, nullptr + newNode->next.store(lnext, std::memory_order_relaxed); + if (ltail->next.compare_exchange_strong(lnext, newNode)) return; + } + } + } + + /* + * Progress condition: lock-free + * + * The dequeue() marks the node that has the item as "logically removed" + * by setting the "marked" bit in node.next + * By default, the "head" is pointing to the first node that has not been + * "logically removed", but if it's the last node (node.next is nullptr), + * then the head will be pointing to the last "logically removed" node. + */ + T* dequeue(const int tid) + { + HPGuard hpguard{hp, tid}; // RAII to call hp.clear(tid) when returning + while (true) { + Node* lhead = hp.protectPtr(kHpHead, head.load(), tid); + if (lhead != head.load()) continue; + Node* lnext = lhead->next.load(); + if (isMarked(lnext)) { // This one is marked, help advance the head and + // go for the next node. + if (getUnmarked(lnext) == nullptr) + return nullptr; // Don't advance head if this is already the last + // node + if (casHead(lhead, getUnmarked(lnext))) hp.retire(lhead, tid); + continue; + } + // By now we are certain lnext is not marked + if (lhead->next.compare_exchange_strong(lnext, getMarked(lnext))) + return lhead->item; + } + } +}; + +#endif /* _BIT_NEXT_HP_H_ */ diff --git a/dependency/ConcurrencyFreaks/CMakeLists.txt b/dependency/ConcurrencyFreaks/CMakeLists.txt new file mode 100644 index 00000000..7c272b60 --- /dev/null +++ b/dependency/ConcurrencyFreaks/CMakeLists.txt @@ -0,0 +1 @@ +target_include_directories(LCT PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}) diff --git a/dependency/ConcurrencyFreaks/CRDoubleLinkQueue.hpp b/dependency/ConcurrencyFreaks/CRDoubleLinkQueue.hpp new file mode 100644 index 00000000..d09b1b06 --- /dev/null +++ b/dependency/ConcurrencyFreaks/CRDoubleLinkQueue.hpp @@ -0,0 +1,151 @@ +/****************************************************************************** + * Copyright (c) 2014-2017, Pedro Ramalhete, Andreia Correia + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Concurrency Freaks nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + *AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + *IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ ****************************************************************************** + */ + +#ifndef _CR_DOUBLE_LINK_QUEUE_HPDL_H_ +#define _CR_DOUBLE_LINK_QUEUE_HPDL_H_ + +#include +#include +#include "HazardPointersDL.hpp" // We need to use a special version of hazard pointers + +/** + *
Double Linked Queue with special Hazard Pointers
+ * + * Double Linked Queue based on the short paper by Andreia Correia and Pedro + * Ramalhete + * https://github.com/pramalhe/ConcurrencyFreaks/blob/master/papers/doublelink-2016.pdf + * + *

+ * When running uncontended, this algorithm does one CAS and one store to + * enqueue and one CAS to dequeue, thus improving over Michael-Scott which does + * two CAS to enqueue. The dequeue mechanism is pretty much the same as the one + * on Michael-Scott, with one CAS to dequeue.

enqueue algorithm: Double-linked enqueue with relaxed store + * dequeue algorithm: Michael-Scott + * Consistency: Linearizable + * enqueue() progress: lock-free + * dequeue() progress: lock-free + * Memory Reclamation: Hazard Pointers with scan for next and prev + * Uncontended enqueue: 1 CAS + 1 HP + * Uncontended dequeue: 1 CAS + 1 HP + *
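A simplified sketch of the single-CAS enqueue described above (illustrative only; hazard-pointer protection and the help-the-previous-enqueue step of the real enqueue() below are omitted, and NodeT stands in for the private Node type):

#include <atomic>

template <typename NodeT>
void doubleLinkEnqueueSketch(std::atomic<NodeT*>& tail, NodeT* newNode) {
  for (;;) {
    NodeT* ltail = tail.load();
    newNode->prev = ltail;                                // plain store, no CAS
    if (tail.compare_exchange_strong(ltail, newNode)) {   // the single CAS
      // Lazily publish the forward link; a later enqueuer can also recover it
      // through newNode->prev if this store has not been done yet.
      ltail->next.store(newNode, std::memory_order_release);
      return;
    }
  }
}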

+ * + * @author Pedro Ramalhete + * @author Andreia Correia + */ +template +class CRDoubleLinkQueue +{ + private: + struct Node { + T* item; + Node* prev; + std::atomic next; + Node(T* item) : item{item}, prev{nullptr}, next{nullptr} {} + }; + + bool casTail(Node* cmp, Node* val) + { + return tail.compare_exchange_strong(cmp, val); + } + + bool casHead(Node* cmp, Node* val) + { + return head.compare_exchange_strong(cmp, val); + } + + // Pointers to head and tail of the list + alignas(128) std::atomic head; + alignas(128) std::atomic tail; + + static const int MAX_THREADS = 128; + const int maxThreads; + // We need one hazard pointer for this algorithm (1 for enqueue and 1 for + // dequeue) + HazardPointersDL hp{maxThreads}; + + public: + CRDoubleLinkQueue(int maxThreads = MAX_THREADS) : maxThreads{maxThreads} + { + Node* sentinelNode = new Node(nullptr); + // The sentinel node starts self-linked in prev + sentinelNode->prev = sentinelNode; + head.store(sentinelNode); + tail.store(sentinelNode); + } + + ~CRDoubleLinkQueue() + { + while (dequeue(0) != nullptr) + ; // Drain the queue + delete head.load(); // Delete the last node + } + + std::string className() { return "CRDoubleLinkQueue"; } + + void enqueue(T* item, const int tid) + { + if (item == nullptr) throw std::invalid_argument("item can not be nullptr"); + Node* newNode = new Node(item); + while (true) { + Node* ltail = hp.protectPtr(tail.load(), tid); + if (ltail != tail.load()) continue; + Node* lprev = ltail->prev; // lprev is protected by the hp + newNode->prev = ltail; + // Try to help the previous enqueue to complete + if (lprev->next.load() == nullptr) + lprev->next.store(ltail, std::memory_order_relaxed); + if (casTail(ltail, newNode)) { + ltail->next.store(newNode, std::memory_order_release); + hp.clear(tid); // There is a release store here + return; + } + } + } + + T* dequeue(const int tid) + { + while (true) { + Node* lhead = hp.protectPtr(head.load(), tid); + if (lhead != head.load()) continue; + Node* lnext = lhead->next.load(); // lnext is protected by the hp + if (lnext == nullptr) { // Check if queue is empty + hp.clear(tid); + return nullptr; + } + if (casHead(lhead, lnext)) { + // lnext may be deleted after we do hp.clear() + T* item = lnext->item; + hp.clear(tid); + hp.retire(lhead, tail.load(), tid); + return item; + } + } + } +}; + +#endif /* _CR_DOUBLE_LINK_QUEUE_HPDL_H_ */ diff --git a/dependency/ConcurrencyFreaks/CRTurnQueue.hpp b/dependency/ConcurrencyFreaks/CRTurnQueue.hpp new file mode 100644 index 00000000..d977ef0d --- /dev/null +++ b/dependency/ConcurrencyFreaks/CRTurnQueue.hpp @@ -0,0 +1,276 @@ +/****************************************************************************** + * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Concurrency Freaks nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. 
+ * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + *AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + *IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + ****************************************************************************** + */ + +#ifndef _CR_TURN_QUEUE_HP_H_ +#define _CR_TURN_QUEUE_HP_H_ + +#include +#include +#include "HazardPointers.hpp" + +/** + *
CR Turn Queue
+ * + * A concurrent wait-free queue that is Multi-Producer-Multi-Consumer and does + * its own wait-free memory reclamation. + * Based on the paper "A Wait-Free Queue with Wait-Free Memory Reclamation" + * https://github.com/pramalhe/ConcurrencyFreaks/tree/master/papers/crturnqueue-2016.pdf + * + *

+ * Enqueue algorithm: CR Turn enqueue + * Dequeue algorithm: CR Turn dequeue + * Consistency: Linearizable + * enqueue() progress: wait-free bounded O(N_threads) + * dequeue() progress: wait-free bounded O(N_threads) + * Memory Reclamation: Hazard Pointers (wait-free) + * + *
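The "turn" that gives dequeue() its wait-free bound works roughly as sketched below (simplified from searchNext() further down; pickNextDequeuerSketch and hasOpenRequest are illustrative names, not part of the class): starting just after the previous winner, threads are scanned in round-robin order, so an open dequeue request cannot be bypassed indefinitely.

#include <functional>

// Simplified turn-based selection: in the real code an "open request" means
// deqself[tid] == deqhelp[tid], and the winner is recorded with a CAS on the
// node's deqTid field rather than returned.
inline int pickNextDequeuerSketch(int previousTurn, int maxThreads,
                                  const std::function<bool(int)>& hasOpenRequest) {
  for (int idx = previousTurn + 1; idx < previousTurn + maxThreads + 1; idx++) {
    const int tid = idx % maxThreads;
    if (hasOpenRequest(tid)) return tid;
  }
  return -1;  // IDX_NONE: no thread is currently asking to dequeue
}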

+ * The paper on Hazard Pointers is named "Hazard Pointers: Safe Memory + * Reclamation for Lock-Free objects" and it is available here: + * http://web.cecs.pdx.edu/~walpole/class/cs510/papers/11.pdf + * + * @author Andreia Correia + * @author Pedro Ramalhete + */ +template +class CRTurnQueue +{ + private: + struct Node { + T* item; + const int enqTid; + std::atomic deqTid; + std::atomic next; + + Node(T* item, int tid) + : item{item}, enqTid{tid}, deqTid{IDX_NONE}, next{nullptr} + { + } + + bool casDeqTid(int cmp, int val) + { + return deqTid.compare_exchange_strong(cmp, val); + } + }; + + static const int IDX_NONE = -1; + static const int MAX_THREADS = 128; + const int maxThreads; + + // Pointers to head and tail of the list + alignas(128) std::atomic head; + alignas(128) std::atomic tail; + // Enqueue requests + alignas(128) std::atomic enqueuers[MAX_THREADS]; + // Dequeue requests + alignas(128) std::atomic deqself[MAX_THREADS]; + alignas(128) std::atomic deqhelp[MAX_THREADS]; + + HazardPointers hp{3, maxThreads}; // We need three hazard pointers + const int kHpTail = 0; + const int kHpHead = 0; + const int kHpNext = 1; + const int kHpDeq = 2; + + Node* sentinelNode = new Node(nullptr, 0); + + /** + * Called only from dequeue() + * + * Search for the next request to dequeue and assign it to lnext.deqTid + * It is only a request to dequeue if deqself[i] equals deqhelp[i]. + */ + int searchNext(Node* lhead, Node* lnext) + { + const int turn = lhead->deqTid.load(); + for (int idx = turn + 1; idx < turn + maxThreads + 1; idx++) { + const int idDeq = idx % maxThreads; + if (deqself[idDeq].load() != deqhelp[idDeq].load()) continue; + if (lnext->deqTid.load() == IDX_NONE) lnext->casDeqTid(IDX_NONE, idDeq); + break; + } + return lnext->deqTid.load(); + } + + /** + * Called only from dequeue() + * + * If the ldeqTid is not our own, we must use an HP to protect against + * deqhelp[ldeqTid] being retired-deleted-newed-reenqueued. 
+ */ + void casDeqAndHead(Node* lhead, Node* lnext, const int tid) + { + const int ldeqTid = lnext->deqTid.load(); + if (ldeqTid == tid) { + deqhelp[ldeqTid].store(lnext, std::memory_order_release); + } else { + Node* ldeqhelp = hp.protectPtr(kHpDeq, deqhelp[ldeqTid].load(), tid); + if (ldeqhelp != lnext && lhead == head.load()) { + deqhelp[ldeqTid].compare_exchange_strong( + ldeqhelp, lnext); // Assign next to request + } + } + head.compare_exchange_strong(lhead, lnext); + } + + /** + * Called only from dequeue() + * + * Giveup procedure, for when there are no nodes left to dequeue + */ + void giveUp(Node* myReq, const int tid) + { + Node* lhead = head.load(); + if (deqhelp[tid].load() != myReq || lhead == tail.load()) return; + hp.protectPtr(kHpHead, lhead, tid); + if (lhead != head.load()) return; + Node* lnext = hp.protectPtr(kHpNext, lhead->next.load(), tid); + if (lhead != head.load()) return; + if (searchNext(lhead, lnext) == IDX_NONE) lnext->casDeqTid(IDX_NONE, tid); + casDeqAndHead(lhead, lnext, tid); + } + + public: + CRTurnQueue(int maxThreads = MAX_THREADS) : maxThreads(maxThreads) + { + head.store(sentinelNode, std::memory_order_relaxed); + tail.store(sentinelNode, std::memory_order_relaxed); + for (int i = 0; i < maxThreads; i++) { + enqueuers[i].store(nullptr, std::memory_order_relaxed); + // deqself[i] != deqhelp[i] means that isRequest=false + deqself[i].store(new Node(nullptr, 0), std::memory_order_relaxed); + deqhelp[i].store(new Node(nullptr, 0), std::memory_order_relaxed); + } + } + + ~CRTurnQueue() + { + delete sentinelNode; + while (dequeue(0) != nullptr) + ; // Drain the queue + for (int i = 0; i < maxThreads; i++) delete deqself[i].load(); + for (int i = 0; i < maxThreads; i++) delete deqhelp[i].load(); + } + + std::string className() { return "CRTurnQueue"; } + + /** + * Steps when uncontended: + * 1. Add node to enqueuers[] + * 2. Insert node in tail.next using a CAS + * 3. Advance tail to tail.next + * 4. Remove node from enqueuers[] + * + * @param tid The tid must be a UNIQUE index for each thread, in the range 0 + * to maxThreads-1 + */ + void enqueue(T* item, const int tid) + { + if (item == nullptr) throw std::invalid_argument("item can not be nullptr"); + Node* myNode = new Node(item, tid); + enqueuers[tid].store(myNode); + for (int i = 0; i < maxThreads; i++) { + if (enqueuers[tid].load() == nullptr) { + hp.clear(tid); + return; // Some thread did all the steps + } + Node* ltail = hp.protectPtr(kHpTail, tail.load(), tid); + if (ltail != tail.load()) + continue; // If the tail advanced maxThreads times, then my node has + // been enqueued + if (enqueuers[ltail->enqTid].load() == + ltail) { // Help a thread do step 4 + Node* tmp = ltail; + enqueuers[ltail->enqTid].compare_exchange_strong(tmp, nullptr); + } + for (int j = 1; j < maxThreads + 1; j++) { // Help a thread do step 2 + Node* nodeToHelp = enqueuers[(j + ltail->enqTid) % maxThreads].load(); + if (nodeToHelp == nullptr) continue; + Node* nodenull = nullptr; + ltail->next.compare_exchange_strong(nodenull, nodeToHelp); + break; + } + Node* lnext = ltail->next.load(); + if (lnext != nullptr) + tail.compare_exchange_strong(ltail, lnext); // Help a thread do step 3 + } + enqueuers[tid].store( + nullptr, + std::memory_order_release); // Do step 4, just in case it's not done + hp.clear(tid); + } + + /** + * Steps when uncontended: + * 1. Publish request to dequeue in dequeuers[tid]; + * 2. CAS node->deqTid from IDX_START to tid; + * 3. Set dequeuers[tid] to the newly owned node; + * 4. 
Advance the head with casHead(); + * + * We must protect either head or tail with HP before doing the check for + * empty queue, otherwise we may get into retired-deleted-newed-reenqueued. + * + * @param tid: The tid must be a UNIQUE index for each thread, in the range 0 + * to maxThreads-1 + */ + T* dequeue(const int tid) + { + Node* prReq = deqself[tid].load(); // Previous request + Node* myReq = deqhelp[tid].load(); + deqself[tid].store(myReq); // Step 1 + for (int i = 0; i < maxThreads; i++) { + if (deqhelp[tid].load() != myReq) break; // No need for HP + Node* lhead = hp.protectPtr(kHpHead, head.load(), tid); + if (lhead != head.load()) continue; + if (lhead == tail.load()) { // Give up + deqself[tid].store(prReq); // Rollback request to dequeue + giveUp(myReq, tid); + if (deqhelp[tid].load() != myReq) { + deqself[tid].store(myReq, std::memory_order_relaxed); + break; + } + hp.clear(tid); + return nullptr; + } + Node* lnext = hp.protectPtr(kHpNext, lhead->next.load(), tid); + if (lhead != head.load()) continue; + if (searchNext(lhead, lnext) != IDX_NONE) + casDeqAndHead(lhead, lnext, tid); + } + Node* myNode = deqhelp[tid].load(); + Node* lhead = + hp.protectPtr(kHpHead, head.load(), tid); // Do step 4 if needed + if (lhead == head.load() && myNode == lhead->next.load()) + head.compare_exchange_strong(lhead, myNode); + hp.clear(tid); + hp.retire(prReq, tid); + return myNode->item; + } +}; + +#endif /* _CR_TURN_QUEUE_HP_H_ */ diff --git a/dependency/ConcurrencyFreaks/HazardPointers.hpp b/dependency/ConcurrencyFreaks/HazardPointers.hpp new file mode 100644 index 00000000..4c84f65d --- /dev/null +++ b/dependency/ConcurrencyFreaks/HazardPointers.hpp @@ -0,0 +1,167 @@ +/****************************************************************************** + * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Concurrency Freaks nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ ****************************************************************************** + */ + +#ifndef _HAZARD_POINTERS_H_ +#define _HAZARD_POINTERS_H_ + +#include +#include +#include + +template +class HazardPointers +{ + public: + static const int HP_MAX_THREADS = 512; + + private: + static const int HP_MAX_HPS = 4; // This is named 'K' in the HP paper + static const int CLPAD = 128 / sizeof(std::atomic); + static const int HP_THRESHOLD_R = 0; // This is named 'R' in the HP paper + static const int MAX_RETIRED = + HP_MAX_THREADS * + HP_MAX_HPS; // Maximum number of retired objects per thread + + const int maxHPs; + const int maxThreads; + + std::atomic* hp[HP_MAX_THREADS]; + // It's not nice that we have a lot of empty vectors, but we need padding to + // avoid false sharing + std::vector retiredList[HP_MAX_THREADS * CLPAD]; + + public: + HazardPointers(int maxHPs = HP_MAX_HPS, int maxThreads = HP_MAX_THREADS) + : maxHPs{maxHPs}, maxThreads{maxThreads} + { + for (int ithread = 0; ithread < HP_MAX_THREADS; ithread++) { + hp[ithread] = + new std::atomic[CLPAD * + 2]; // We allocate four cache lines to allow for + // many hps and without false sharing + for (int ihp = 0; ihp < HP_MAX_HPS; ihp++) { + hp[ithread][ihp].store(nullptr, std::memory_order_relaxed); + } + } + } + + ~HazardPointers() + { + for (int ithread = 0; ithread < HP_MAX_THREADS; ithread++) { + delete[] hp[ithread]; + // Clear the current retired nodes + for (unsigned iret = 0; iret < retiredList[ithread * CLPAD].size(); + iret++) { + delete retiredList[ithread * CLPAD][iret]; + } + } + } + + /** + * Progress Condition: wait-free bounded (by maxHPs) + */ + void clear(const int tid) + { + for (int ihp = 0; ihp < maxHPs; ihp++) { + hp[tid][ihp].store(nullptr, std::memory_order_release); + } + } + + /** + * Progress Condition: wait-free population oblivious + */ + void clearOne(int ihp, const int tid) + { + hp[tid][ihp].store(nullptr, std::memory_order_release); + } + + /** + * Progress Condition: lock-free + */ + T* protect(int index, const std::atomic& atom, const int tid) + { + T* n = nullptr; + T* ret; + while ((ret = atom.load()) != n) { + hp[tid][index].store(ret); + n = ret; + } + return ret; + } + + /** + * This returns the same value that is passed as ptr, which is sometimes + * useful Progress Condition: wait-free population oblivious + */ + T* protectPtr(int index, T* ptr, const int tid) + { + hp[tid][index].store(ptr); + return ptr; + } + + /** + * This returns the same value that is passed as ptr, which is sometimes + * useful Progress Condition: wait-free population oblivious + */ + T* protectRelease(int index, T* ptr, const int tid) + { + hp[tid][index].store(ptr, std::memory_order_release); + return ptr; + } + + /** + * Progress Condition: wait-free bounded (by the number of threads squared) + */ + void retire(T* ptr, const int tid) + { + retiredList[tid * CLPAD].push_back(ptr); + if (retiredList[tid * CLPAD].size() < HP_THRESHOLD_R) return; + for (unsigned iret = 0; iret < retiredList[tid * CLPAD].size();) { + auto obj = retiredList[tid * CLPAD][iret]; + bool canDelete = true; + for (int tid = 0; tid < maxThreads && canDelete; tid++) { + for (int ihp = maxHPs - 1; ihp >= 0; ihp--) { + if (hp[tid][ihp].load() == obj) { + canDelete = false; + break; + } + } + } + if (canDelete) { + retiredList[tid * CLPAD].erase(retiredList[tid * CLPAD].begin() + iret); + delete obj; + continue; + } + iret++; + } + } +}; + +#endif /* _HAZARD_POINTERS_H_ */ diff --git a/dependency/ConcurrencyFreaks/HazardPointersConditional.hpp 
b/dependency/ConcurrencyFreaks/HazardPointersConditional.hpp new file mode 100644 index 00000000..db38c311 --- /dev/null +++ b/dependency/ConcurrencyFreaks/HazardPointersConditional.hpp @@ -0,0 +1,137 @@ +/****************************************************************************** + * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Concurrency Freaks nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ ****************************************************************************** + */ + +#ifndef _HAZARD_POINTERS_COND_H_ +#define _HAZARD_POINTERS_COND_H_ + +#include +#include + +template +class HazardPointersConditional +{ + private: + static const int HP_MAX_THREADS = 128; + static const int HP_MAX_HPS = 4; // This is named 'K' in the HP paper + static const int CLPAD = 128 / sizeof(std::atomic); + static const int HP_THRESHOLD_R = 0; // This is named 'R' in the HP paper + static const int MAX_RETIRED = + HP_MAX_THREADS * + HP_MAX_HPS; // Maximum number of retired objects per thread + + const int maxHPs; + const int maxThreads; + std::atomic hp[HP_MAX_THREADS * CLPAD][HP_MAX_HPS]; + // It's not nice that we have a lot of empty vectors, but we need padding to + // avoid false sharing + std::vector retiredList[HP_MAX_THREADS * CLPAD]; + + public: + HazardPointersConditional(int maxHPs = HP_MAX_HPS, + int maxThreads = HP_MAX_THREADS) + : maxHPs{maxHPs}, maxThreads{maxThreads} + { + for (int ithread = 0; ithread < HP_MAX_THREADS; ithread++) { + for (int ihp = 0; ihp < HP_MAX_HPS; ihp++) { + hp[ithread * CLPAD][ihp].store(nullptr, std::memory_order_relaxed); + } + } + } + + ~HazardPointersConditional() + { + for (int ithread = 0; ithread < HP_MAX_THREADS; ithread++) { + // Clear the current retired nodes + for (unsigned iret = 0; iret < retiredList[ithread * CLPAD].size(); + iret++) { + delete retiredList[ithread * CLPAD][iret]; + } + } + } + + /** + * Progress Condition: wait-free bounded (by maxHPs) + */ + void clear(const int tid) + { + for (int ihp = 0; ihp < maxHPs; ihp++) { + hp[tid * CLPAD][ihp].store(nullptr, std::memory_order_release); + } + } + + T* protect(int index, const std::atomic& atom, const int tid) + { + T* n = nullptr; + T* ret; + while ((ret = atom.load()) != n) { + hp[tid * CLPAD][index].store(ret); + n = ret; + } + return ret; + } + + // This returns the same value that is passed as ptr, which is sometimes + // usefull + T* protectPtr(int index, T* ptr, const int tid) + { + hp[tid * CLPAD][index].store(ptr); + return ptr; + } + + void retire(T* ptr, const int tid) + { + retiredList[tid * CLPAD].push_back(ptr); + if (retiredList[tid * CLPAD].size() < HP_THRESHOLD_R) return; + for (unsigned iret = 0; iret < retiredList[tid * CLPAD].size();) { + auto obj = retiredList[tid * CLPAD][iret]; + if (obj->item.load() != nullptr) { + iret++; + continue; // Delete only if Node.item == nullptr + } + bool canDelete = true; + for (int tid = 0; tid < maxThreads && canDelete; tid++) { + for (int ihp = maxHPs - 1; ihp >= 0; ihp--) { + if (hp[tid * CLPAD][ihp].load() == obj) { + canDelete = false; + break; + } + } + } + if (canDelete) { + retiredList[tid * CLPAD].erase(retiredList[tid * CLPAD].begin() + iret); + delete obj; + continue; + } + iret++; + } + } +}; + +#endif /* _HAZARD_POINTERS_CONDITIONAL_H_ */ diff --git a/dependency/ConcurrencyFreaks/HazardPointersDL.hpp b/dependency/ConcurrencyFreaks/HazardPointersDL.hpp new file mode 100644 index 00000000..d7101338 --- /dev/null +++ b/dependency/ConcurrencyFreaks/HazardPointersDL.hpp @@ -0,0 +1,129 @@ +/****************************************************************************** + * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia + * All rights reserved. 
+ * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Concurrency Freaks nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + ****************************************************************************** + */ + +#ifndef _HAZARD_POINTERS_DL_H_ +#define _HAZARD_POINTERS_DL_H_ + +#include +#include + +template +class HazardPointersDL +{ + private: + static const int HP_MAX_THREADS = 128; + static const int CLPAD = 128 / sizeof(std::atomic); + static const int HP_THRESHOLD_R = 0; // This is named 'R' in the HP paper + static const int MAX_RETIRED = + HP_MAX_THREADS; // Maximum number of retired objects per thread + + const int maxThreads; + + std::atomic hp[HP_MAX_THREADS * CLPAD]; + // It's not nice that we have a lot of empty vectors, but we need padding to + // avoid false sharing + std::vector retiredList[HP_MAX_THREADS * CLPAD]; + + public: + HazardPointersDL(int maxThreads = HP_MAX_THREADS) : maxThreads{maxThreads} + { + for (int ithread = 0; ithread < HP_MAX_THREADS; ithread++) { + hp[ithread * CLPAD].store(nullptr, std::memory_order_relaxed); + } + } + + ~HazardPointersDL() + { + for (int ithread = 0; ithread < HP_MAX_THREADS; ithread++) { + // Clear the current retired nodes + for (unsigned iret = 0; iret < retiredList[ithread * CLPAD].size(); + iret++) { + delete retiredList[ithread * CLPAD][iret]; + } + } + } + + /** + * Progress Condition: wait-free + */ + void clear(const int tid) + { + hp[tid * CLPAD].store(nullptr, std::memory_order_release); + } + + /** + * This returns the same value that is passed as ptr, which is sometimes + * useful Progress Condition: wait-free population oblivious + */ + T* protectPtr(T* ptr, const int tid) + { + hp[tid * CLPAD].store(ptr); + return ptr; + } + + /** + * Progress Condition: wait-free bounded (by the number of threads squared) + * + * We can not delete a node if: + * - There is an hazard pointer for it; + * - There is an hazard pointer for its "next": It might dereference + * node->prev in enqueue() + * - There is an hazard pointer for its "prev": It might dereference + * node->next in dequeue() + */ + void retire(T* ptr, T* ltail, const int tid) + { + retiredList[tid * CLPAD].push_back(ptr); + if (retiredList[tid * 
CLPAD].size() < HP_THRESHOLD_R) return; + for (unsigned iret = 0; iret < retiredList[tid * CLPAD].size();) { + auto obj = retiredList[tid * CLPAD][iret]; + if (obj->next.load() == ltail) { + iret++; + continue; // Can't delete the node previous to the current tail + } + bool canDelete = true; + for (int tid = 0; tid < maxThreads; tid++) { + T* curhp = hp[tid * CLPAD].load(); + if (obj == curhp || obj->next.load() == curhp || obj->prev == curhp) { + canDelete = false; + break; + } + } + if (canDelete) { + retiredList[tid * CLPAD].erase(retiredList[tid * CLPAD].begin() + iret); + delete obj; + continue; + } + iret++; + } + } +}; + +#endif /* _HAZARD_POINTERS_DL_H_ */ diff --git a/dependency/ConcurrencyFreaks/KoganPetrankQueueCHP.hpp b/dependency/ConcurrencyFreaks/KoganPetrankQueueCHP.hpp new file mode 100644 index 00000000..91dd3d92 --- /dev/null +++ b/dependency/ConcurrencyFreaks/KoganPetrankQueueCHP.hpp @@ -0,0 +1,384 @@ +/****************************************************************************** + * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Concurrency Freaks nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + *AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + *IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + ****************************************************************************** + */ + +#ifndef _KOGAN_PETRANK_QUEUE_CHP_H_ +#define _KOGAN_PETRANK_QUEUE_CHP_H_ + +#include +#include +#include "HazardPointers.hpp" +#include "HazardPointersConditional.hpp" + +/** + *
Kogan-Petrank Queue with Conditional Hazard Pointers
+ * + * http://www.cs.technion.ac.il/~erez/Papers/wfquque-ppopp.pdf + * + * enqueue algorithm: Kogan-Petrank, based on the consensus of Lamport's bakery + * dequeue algorithm: Kogan-Petrank, based on the consensus of Lamport's bakery + * Consistency: Linearizable + * enqueue() progress: wait-free bounded O(N_threads) + * dequeue() progress: wait-free bounded O(N_threads) + * Memory Reclamation: Hazard Pointers + Hazard Pointers Conditional + * + * The paper on Hazard Pointers is named "Hazard Pointers: Safe Memory + * Reclamation for Lock-Free objects" and it is available here: + * http://web.cecs.pdx.edu/~walpole/class/cs510/papers/11.pdf + * + */ +template +class KoganPetrankQueueCHP +{ + private: + struct Node { + std::atomic item; + const int enqTid; + std::atomic deqTid{IDX_NONE}; + std::atomic next{nullptr}; + + Node(T* userItem, int enqTid) : item{userItem}, enqTid{enqTid} {} + + bool casNext(Node* cmp, Node* val) + { + // Use a tmp variable because this CAS "replaces" the value of the first + // argument + Node* tmp = cmp; + return next.compare_exchange_strong(tmp, val); + } + }; + + struct OpDesc { + const long long phase; + const bool pending; + const bool enqueue; + Node* node; // This is immutable once assigned + OpDesc(long long ph, bool pend, bool enq, Node* n) + : phase{ph}, pending{pend}, enqueue{enq}, node{n} + { + } + }; + + bool casTail(Node* cmp, Node* val) + { + return tail.compare_exchange_strong(cmp, val); + } + + bool casHead(Node* cmp, Node* val) + { + return head.compare_exchange_strong(cmp, val); + } + + // Member variables + static const int MAX_THREADS = 128; + + // Pointers to head and tail of the list + alignas(128) std::atomic head; + alignas(128) std::atomic tail; + // Array of enque and dequeue requests + alignas(128) std::atomic state[MAX_THREADS]; + + const static int IDX_NONE = -1; + OpDesc* OPDESC_END = new OpDesc(IDX_NONE, false, true, nullptr); + const int maxThreads; + + const static int HP_CRT_REQ = 3; + + // Hazard Pointers and HPC + HazardPointers hpOpDesc{ + 2, maxThreads}; // We only need two HPs for OpDesc instances + const int kHpODCurr = 0; + const int kHpODNext = 1; + HazardPointersConditional hpNode{ + 3, maxThreads}; // This will delete only if Node.item == nullptr + const int kHpCurr = 0; + const int kHpNext = 1; + const int kHpPrev = 2; + + public: + KoganPetrankQueueCHP(int maxThreads = MAX_THREADS) : maxThreads(maxThreads) + { + Node* sentinelNode = new Node(nullptr, IDX_NONE); + head.store(sentinelNode); + tail.store(sentinelNode); + for (int i = 0; i < maxThreads; i++) { + state[i].store(OPDESC_END); + } + } + + ~KoganPetrankQueueCHP() + { + while (dequeue(0) != nullptr) + ; // Drain the queue + delete head.load(); // Delete the last node + delete OPDESC_END; + } + + std::string className() { return "KoganPetrankQueueCHP"; } + + void help(long long phase, const int TID) + { + for (int i = 0; i < maxThreads; i++) { + // Try to validate the HP for OpDesc at most MAX_OPDESC_TRANS times + OpDesc* desc = hpOpDesc.protectPtr(kHpODCurr, state[i].load(), TID); + int it = 0; + for (; it < maxThreads + 1; it++) { + if (desc == state[i].load()) break; + desc = hpOpDesc.protectPtr(kHpODCurr, state[i].load(), TID); + } + if (it == maxThreads + 1 && desc != state[i].load()) continue; + if (desc->pending && desc->phase <= phase) { + if (desc->enqueue) { + help_enq(i, phase, TID); + } else { + help_deq(i, phase, TID); + } + } + } + } + + /** + * Progress Condition: wait-free bounded by maxThreads + */ + long long maxPhase(const int TID) + { + long 
long maxPhase = -1; + for (int i = 0; i < maxThreads; i++) { + // Try to validate the HP for OpDesc at most MAX_OPDESC_TRANS times + OpDesc* desc = hpOpDesc.protectPtr(kHpODCurr, state[i].load(), TID); + int it = 0; + for (; it < maxThreads + 1; it++) { + if (desc == state[i].load()) break; + desc = hpOpDesc.protectPtr(kHpODCurr, state[i].load(), TID); + } + if (it == maxThreads + 1 && desc != state[i].load()) continue; + long long phase = desc->phase; + if (phase > maxPhase) { + maxPhase = phase; + } + } + return maxPhase; + } + + bool isStillPending(int tid, long long ph, const int TID) + { + OpDesc* desc = hpOpDesc.protectPtr(kHpODNext, state[tid].load(), TID); + int it = 0; + for (; it < maxThreads + 1; it++) { + if (desc == state[tid].load()) break; + desc = hpOpDesc.protectPtr(kHpODNext, state[tid].load(), TID); + } + if (it == maxThreads + 1 && desc != state[tid].load()) return false; + return desc->pending && desc->phase <= ph; + } + + void enqueue(T* item, const int TID) + { + // We better have consecutive thread ids, otherwise this will blow up + long long phase = maxPhase(TID) + 1; + state[TID].store(new OpDesc(phase, true, true, new Node(item, TID))); + help(phase, TID); + help_finish_enq(TID); + hpOpDesc.clear(TID); + hpNode.clear(TID); + OpDesc* desc = state[TID].load(); + for (int i = 0; i < maxThreads * 2; i++) { // Is maxThreads+1 enough? + if (desc == OPDESC_END) break; + if (state[TID].compare_exchange_strong(desc, OPDESC_END)) break; + desc = state[TID].load(); + } + hpOpDesc.retire(desc, TID); + } + + void help_enq(int tid, long long phase, const int TID) + { + while (isStillPending(tid, phase, TID)) { + Node* last = hpNode.protectPtr(kHpCurr, tail.load(), TID); + if (last != tail.load()) continue; + Node* next = last->next.load(); + if (last == tail) { + if (next == nullptr) { + if (isStillPending(tid, phase, TID)) { + OpDesc* curDesc = + hpOpDesc.protectPtr(kHpODCurr, state[tid].load(), TID); + if (curDesc != state[tid].load()) continue; + if (last->casNext(next, curDesc->node)) { + help_finish_enq(TID); + return; + } + } + } else { + help_finish_enq(TID); + } + } + } + } + + void help_finish_enq(const int TID) + { + Node* last = hpNode.protectPtr(kHpCurr, tail.load(), TID); + if (last != tail.load()) return; + // The inner loop will run at most twice, because last->next is immutable + // when non-null + Node* next = hpNode.protect(kHpNext, last->next, TID); + // Check "last" equals "tail" to prevent ABA on "last->next" + if (last == tail && next != nullptr) { + int tid = next->enqTid; + OpDesc* curDesc = hpOpDesc.protectPtr(kHpODCurr, state[tid], TID); + if (curDesc != state[tid].load()) return; + if (last == tail && curDesc->node == next) { + OpDesc* newDesc = new OpDesc(curDesc->phase, false, true, next); + OpDesc* tmp = curDesc; + if (state[tid].compare_exchange_strong(tmp, newDesc)) { + hpOpDesc.retire(curDesc, TID); + } else { + delete newDesc; + } + casTail(last, next); + } + } + } + + T* dequeue(const int TID) + { + // We better have consecutive thread ids, otherwise this will blow up + long long phase = maxPhase(TID) + 1; + state[TID].store(new OpDesc(phase, true, false, nullptr)); + help(phase, TID); + help_finish_deq(TID); + OpDesc* curDesc = hpOpDesc.protect(kHpODCurr, state[TID], TID); + Node* node = curDesc->node; // No need for hp because this thread will be + // the one to retire "node" + if (node == nullptr) { + hpOpDesc.clear(TID); + hpNode.clear(TID); + OpDesc* desc = state[TID].load(); + for (int i = 0; i < MAX_THREADS; i++) { + if 
(state[TID].compare_exchange_strong(desc, OPDESC_END)) break; + desc = state[TID].load(); + if (desc == OPDESC_END) break; + } + hpOpDesc.retire(desc, TID); + return nullptr; // We return null instead of throwing an exception + } + Node* next = node->next; // No need for chp because "next" can only be + // deleted when item set to nullptr + T* value = next->item.load(); + next->item.store(nullptr); // "next" can be deleted now + hpOpDesc.clear(TID); + hpNode.clear(TID); + hpNode.retire( + node, TID); // "node" will be deleted only when node.item == nullptr + OpDesc* desc = state[TID].load(); + for (int i = 0; i < maxThreads * 2; i++) { // Is maxThreads+1 enough? + if (desc == OPDESC_END) break; + if (state[TID].compare_exchange_strong(desc, OPDESC_END)) break; + desc = state[TID].load(); + } + hpOpDesc.retire(desc, TID); + return value; + } + + void help_deq(int tid, long long phase, const int TID) + { + while (isStillPending(tid, phase, TID)) { + Node* first = hpNode.protectPtr(kHpPrev, head, TID); + Node* last = hpNode.protectPtr(kHpCurr, tail, TID); + if (first != head.load() || last != tail.load()) continue; + Node* next = first->next.load(); + if (first == head) { + if (first == last) { + if (next == nullptr) { + OpDesc* curDesc = hpOpDesc.protectPtr(kHpODCurr, state[tid], TID); + if (curDesc != state[tid].load()) continue; + if (last == tail && isStillPending(tid, phase, TID)) { + OpDesc* newDesc = + new OpDesc(curDesc->phase, false, false, nullptr); + OpDesc* tmp = curDesc; + if (state[tid].compare_exchange_strong(tmp, newDesc)) { + hpOpDesc.retire(curDesc, TID); + } else { + delete newDesc; + } + } + } else { + help_finish_enq(TID); + } + } else { + OpDesc* curDesc = hpOpDesc.protectPtr(kHpODCurr, state[tid], TID); + if (curDesc != state[tid].load()) continue; + Node* node = curDesc->node; + if (!isStillPending(tid, phase, TID)) break; + if (first == head && node != first) { + OpDesc* newDesc = new OpDesc(curDesc->phase, true, false, first); + OpDesc* tmp = curDesc; + if (state[tid].compare_exchange_strong(tmp, newDesc)) { + hpOpDesc.retire(curDesc, TID); + } else { + delete newDesc; + continue; + } + } + int tmp = -1; + first->deqTid.compare_exchange_strong(tmp, tid); + help_finish_deq(TID); + } + } + } + } + + void help_finish_deq(const int TID) + { + Node* first = hpNode.protectPtr(kHpPrev, head, TID); + if (first != head.load()) return; + Node* next = first->next.load(); + int tid = first->deqTid.load(); + if (tid != -1) { + OpDesc* curDesc = nullptr; + for (int i = 0; i < MAX_THREADS; i++) { + curDesc = hpOpDesc.protectPtr(kHpODCurr, state[tid], TID); + if (curDesc == state[tid].load()) break; + if (i == MAX_THREADS - 1) + return; // If the opdesc has changed these many times, the operation + // must be complete + } + if (first == head && next != nullptr) { + OpDesc* newDesc = + new OpDesc(curDesc->phase, false, false, curDesc->node); + OpDesc* tmp = curDesc; + if (state[tid].compare_exchange_strong(tmp, newDesc)) { + hpOpDesc.retire(curDesc, TID); + } else { + delete newDesc; + } + casHead(first, next); + } + } + } +}; + +#endif /* _KOGAN_PETRANK_QUEUE_HP_H_ */ diff --git a/dependency/ConcurrencyFreaks/LCRQueue.hpp b/dependency/ConcurrencyFreaks/LCRQueue.hpp new file mode 100644 index 00000000..df4c9910 --- /dev/null +++ b/dependency/ConcurrencyFreaks/LCRQueue.hpp @@ -0,0 +1,307 @@ +/****************************************************************************** + * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia + * All rights reserved. 
+ * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Concurrency Freaks nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + *AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + *IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + ****************************************************************************** + */ + +#ifndef _LCRQ_QUEUE_HP_H_ +#define _LCRQ_QUEUE_HP_H_ + +#include +#include "HazardPointers.hpp" + +// CAS2 macro + +#define __CAS2(ptr, o1, o2, n1, n2) \ + ({ \ + char __ret; \ + __typeof__(o2) __junk; \ + __typeof__(*(ptr)) __old1 = (o1); \ + __typeof__(o2) __old2 = (o2); \ + __typeof__(*(ptr)) __new1 = (n1); \ + __typeof__(o2) __new2 = (n2); \ + asm volatile("lock cmpxchg16b %2;setz %1" \ + : "=d"(__junk), "=a"(__ret), "+m"(*ptr) \ + : "b"(__new1), "c"(__new2), "a"(__old1), "d"(__old2)); \ + __ret; \ + }) + +#define CAS2(ptr, o1, o2, n1, n2) __CAS2(ptr, o1, o2, n1, n2) + +#define BIT_TEST_AND_SET(ptr, b) \ + ({ \ + char __ret; \ + asm volatile("lock btsq $63, %0; setnc %1" \ + : "+m"(*ptr), "=a"(__ret) \ + : \ + : "cc"); \ + __ret; \ + }) + +/** + *
<h1>LCRQ Queue</h1>
+ * + * This is LCRQ by Adam Morrison and Yehuda Afek + * http://www.cs.tau.ac.il/~mad/publications/ppopp2013-x86queues.pdf + * + * This implementation does NOT obey the C++ memory model rules AND it is x86 + * specific. No guarantees are given on the correctness or consistency of the + * results if you use this queue. + * + * Bugs fixed: + * tt was not initialized in dequeue(); + * + *

+ * enqueue algorithm: MS enqueue + LCRQ with re-usage + * dequeue algorithm: MS dequeue + LCRQ with re-usage + * Consistency: Linearizable + * enqueue() progress: lock-free + * dequeue() progress: lock-free + * Memory Reclamation: Hazard Pointers (lock-free) + * + *

+ * The paper on Hazard Pointers is named "Hazard Pointers: Safe Memory + * Reclamation for Lock-Free objects" and it is available here: + * http://web.cecs.pdx.edu/~walpole/class/cs510/papers/11.pdf + * + * @author Pedro Ramalhete + * @author Andreia Correia + */ +template +class LCRQueue +{ + private: + static const int RING_POW = 10; + static const uint64_t RING_SIZE = 1ull << RING_POW; + + struct Cell { + std::atomic val; + std::atomic idx; + uint64_t pad[14]; + } __attribute__((aligned(128))); + + struct Node { + std::atomic head __attribute__((aligned(128))); + std::atomic tail __attribute__((aligned(128))); + std::atomic next __attribute__((aligned(128))); + Cell array[RING_SIZE]; + + Node() + { + for (unsigned i = 0; i < RING_SIZE; i++) { + array[i].val.store(nullptr, std::memory_order_relaxed); + array[i].idx.store(i, std::memory_order_relaxed); + } + head.store(0, std::memory_order_relaxed); + tail.store(0, std::memory_order_relaxed); + next.store(nullptr, std::memory_order_relaxed); + } + }; + + alignas(128) std::atomic head; + alignas(128) std::atomic tail; + + static const int MAX_THREADS = 128; + const int maxThreads; + + HazardPointers hp{1, maxThreads}; + const int kHpTail = 0; + const int kHpHead = 0; + + /* + * Private methods + */ + int is_empty(T* v) { return (v == nullptr); } + + uint64_t node_index(uint64_t i) { return (i & ~(1ull << 63)); } + + uint64_t set_unsafe(uint64_t i) { return (i | (1ull << 63)); } + + uint64_t node_unsafe(uint64_t i) { return (i & (1ull << 63)); } + + inline uint64_t tail_index(uint64_t t) { return (t & ~(1ull << 63)); } + + int crq_is_closed(uint64_t t) { return (t & (1ull << 63)) != 0; } + + void fixState(Node* lhead) + { + while (1) { + uint64_t t = lhead->tail.fetch_add(0); + uint64_t h = lhead->head.fetch_add(0); + // TODO: is it ok or not to cast "t" to int64_t ? + if (lhead->tail.load() != (int64_t)t) continue; + if (h > t) { + int64_t tmp = t; + if (lhead->tail.compare_exchange_strong(tmp, h)) break; + continue; + } + break; + } + } + + int close_crq(Node* rq, const uint64_t tailticket, const int tries) + { + if (tries < 10) { + int64_t tmp = tailticket + 1; + return rq->tail.compare_exchange_strong(tmp, + (tailticket + 1) | (1ull << 63)); + } else { + return BIT_TEST_AND_SET(&rq->tail, 63); + } + } + + public: + LCRQueue(int maxThreads = MAX_THREADS) : maxThreads{maxThreads} + { + // Shared object init + Node* sentinel = new Node; + head.store(sentinel, std::memory_order_relaxed); + tail.store(sentinel, std::memory_order_relaxed); + } + + ~LCRQueue() + { + while (dequeue(0) != nullptr) + ; // Drain the queue + delete head.load(); // Delete the last node + } + + std::string className() { return "LCRQueue"; } + + void enqueue(T* item, const int tid) + { + int try_close = 0; + while (true) { + Node* ltail = hp.protectPtr(kHpTail, tail.load(), tid); + if (ltail != tail.load()) continue; + Node* lnext = ltail->next.load(); + if (lnext != nullptr) { // Help advance the tail + tail.compare_exchange_strong(ltail, lnext); + continue; + } + + uint64_t tailticket = ltail->tail.fetch_add(1); + if (crq_is_closed(tailticket)) { + Node* newNode = new Node(); + // Solo enqueue (superfluous?) 
+ newNode->tail.store(1, std::memory_order_relaxed); + newNode->array[0].val.store(item, std::memory_order_relaxed); + newNode->array[0].idx.store(0, std::memory_order_relaxed); + Node* nullnode = nullptr; + if (ltail->next.compare_exchange_strong(nullnode, + newNode)) { // Insert new ring + tail.compare_exchange_strong(ltail, newNode); // Advance the tail + hp.clear(tid); + return; + } + delete newNode; + continue; + } + Cell* cell = <ail->array[tailticket & (RING_SIZE - 1)]; + uint64_t idx = cell->idx.load(); + if (cell->val.load() == nullptr) { + if (node_index(idx) <= tailticket) { + // TODO: is the missing cast before "t" ok or not to add? + if ((!node_unsafe(idx) || ltail->head.load() < (int64_t)tailticket)) { + if (CAS2((void**)cell, nullptr, idx, item, tailticket)) { + hp.clear(tid); + return; + } + } + } + } + if (((int64_t)(tailticket - ltail->head.load()) >= (int64_t)RING_SIZE) && + close_crq(ltail, tailticket, ++try_close)) + continue; + } + } + + T* dequeue(const int tid) + { + while (true) { + Node* lhead = hp.protectPtr(kHpHead, head.load(), tid); + if (lhead != head.load()) continue; + uint64_t headticket = lhead->head.fetch_add(1); + Cell* cell = &lhead->array[headticket & (RING_SIZE - 1)]; + + int r = 0; + uint64_t tt = 0; + + while (true) { + uint64_t cell_idx = cell->idx.load(); + uint64_t unsafe = node_unsafe(cell_idx); + uint64_t idx = node_index(cell_idx); + T* val = cell->val.load(); + + if (idx > headticket) break; + + if (val != nullptr) { + if (idx == headticket) { + if (CAS2((void**)cell, val, cell_idx, nullptr, + unsafe | (headticket + RING_SIZE))) { + hp.clear(tid); + return val; + } + } else { + if (CAS2((void**)cell, val, cell_idx, val, set_unsafe(idx))) break; + } + } else { + if ((r & ((1ull << 10) - 1)) == 0) tt = lhead->tail.load(); + // Optimization: try to bail quickly if queue is closed. 
+ int crq_closed = crq_is_closed(tt); + uint64_t t = tail_index(tt); + if (unsafe) { // Nothing to do, move along + if (CAS2((void**)cell, val, cell_idx, val, + unsafe | (headticket + RING_SIZE))) + break; + } else if (t < headticket + 1 || r > 200000 || crq_closed) { + if (CAS2((void**)cell, val, idx, val, headticket + RING_SIZE)) { + if (r > 200000 && tt > RING_SIZE) + BIT_TEST_AND_SET(&lhead->tail, 63); + break; + } + } else { + ++r; + } + } + } + + if (tail_index(lhead->tail.load()) <= headticket + 1) { + fixState(lhead); + // try to return empty + Node* lnext = lhead->next.load(); + if (lnext == nullptr) { + hp.clear(tid); + return nullptr; // Queue is empty + } + if (tail_index(lhead->tail) <= headticket + 1) { + if (head.compare_exchange_strong(lhead, lnext)) hp.retire(lhead, tid); + } + } + } + } +}; + +#endif /* _LCRQ_QUEUE_HP_H_ */ diff --git a/dependency/ConcurrencyFreaks/LICENSE b/dependency/ConcurrencyFreaks/LICENSE new file mode 100644 index 00000000..b9222789 --- /dev/null +++ b/dependency/ConcurrencyFreaks/LICENSE @@ -0,0 +1,23 @@ +MIT License + +Copyright (c) 2014-2019 +Andreia Correia +Pedro Ramalhete + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/dependency/ConcurrencyFreaks/MichaelScottQueue.hpp b/dependency/ConcurrencyFreaks/MichaelScottQueue.hpp new file mode 100644 index 00000000..996599b8 --- /dev/null +++ b/dependency/ConcurrencyFreaks/MichaelScottQueue.hpp @@ -0,0 +1,157 @@ +/****************************************************************************** + * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Concurrency Freaks nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. 
+ * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + *AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + *IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + ****************************************************************************** + */ + +#ifndef _MICHAEL_SCOTT_QUEUE_HP_H_ +#define _MICHAEL_SCOTT_QUEUE_HP_H_ + +#include +#include +#include "HazardPointers.hpp" + +/** + *
<h1>Michael-Scott Queue</h1>
+ * + * enqueue algorithm: MS enqueue + * dequeue algorithm: MS dequeue + * Consistency: Linearizable + * enqueue() progress: lock-free + * dequeue() progress: lock-free + * Memory Reclamation: Hazard Pointers (lock-free) + * + * + * Maged Michael and Michael Scott's Queue with Hazard Pointers + *

+ * Lock-Free Linked List as described in Maged Michael and Michael Scott's + * paper: + * {@link http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf} + * + * Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue + * Algorithms

The paper on Hazard Pointers is named "Hazard Pointers: + * Safe Memory Reclamation for Lock-Free objects" and it is available here: + * http://web.cecs.pdx.edu/~walpole/class/cs510/papers/11.pdf + * + */ +template +class MichaelScottQueue +{ + private: + struct Node { + T* item; + std::atomic next; + + Node(T* userItem) : item{userItem}, next{nullptr} {} + + bool casNext(Node* cmp, Node* val) + { + return next.compare_exchange_strong(cmp, val); + } + }; + + bool casTail(Node* cmp, Node* val) + { + return tail.compare_exchange_strong(cmp, val); + } + + bool casHead(Node* cmp, Node* val) + { + return head.compare_exchange_strong(cmp, val); + } + + // Pointers to head and tail of the list + alignas(128) std::atomic head; + alignas(128) std::atomic tail; + + static const int MAX_THREADS = 128; + const int maxThreads; + + // We need two hazard pointers for dequeue() + HazardPointers hp{2, maxThreads}; + const int kHpTail = 0; + const int kHpHead = 0; + const int kHpNext = 1; + + public: + MichaelScottQueue(int maxThreads = MAX_THREADS) : maxThreads{maxThreads} + { + Node* sentinelNode = new Node(nullptr); + head.store(sentinelNode, std::memory_order_relaxed); + tail.store(sentinelNode, std::memory_order_relaxed); + } + + ~MichaelScottQueue() + { + while (dequeue(0) != nullptr) + ; // Drain the queue + delete head.load(); // Delete the last node + } + + std::string className() { return "MichaelScottQueue"; } + + void enqueue(T* item, const int tid) + { + if (item == nullptr) throw std::invalid_argument("item can not be nullptr"); + Node* newNode = new Node(item); + while (true) { + Node* ltail = hp.protectPtr(kHpTail, tail, tid); + if (ltail == tail.load()) { + Node* lnext = ltail->next.load(); + if (lnext == nullptr) { + // It seems this is the last node, so add the newNode here + // and try to move the tail to the newNode + if (ltail->casNext(nullptr, newNode)) { + casTail(ltail, newNode); + hp.clear(tid); + return; + } + } else { + casTail(ltail, lnext); + } + } + } + } + + T* dequeue(const int tid) + { + Node* node = hp.protect(kHpHead, head, tid); + while (node != tail.load()) { + Node* lnext = hp.protect(kHpNext, node->next, tid); + if (casHead(node, lnext)) { + T* item = lnext->item; // Another thread may clean up lnext after we do + // hp.clear() + hp.clear(tid); + hp.retire(node, tid); + return item; + } + node = hp.protect(kHpHead, head, tid); + } + hp.clear(tid); + return nullptr; // Queue is empty + } +}; + +#endif /* _MICHAEL_SCOTT_QUEUE_HP_H_ */ diff --git a/dependency/ConcurrencyFreaks/README.txt b/dependency/ConcurrencyFreaks/README.txt new file mode 100644 index 00000000..cfd5e9f6 --- /dev/null +++ b/dependency/ConcurrencyFreaks/README.txt @@ -0,0 +1,11 @@ +ConcurrencyFreaks +================= + +A library of concurrent data structures and synchronization mechanisms. +More info on http://concurrencyfreaks.blogspot.com/ + +C11 - Some locks implemented in the C11 language +CPP - Some locks and data structures implemented in C++1x, mostly lock-free and wait-free queues. You should use a compiler that supports C++14 (like gcc 4.9.1) because some classes use std:shared_timed_mutex +D - Some data structures implemented in the D programming language +Java - Synchronization mechanisms and data structures (mostly lock-free and wait-free queues) implemented in Java. Some of them need Java 8 or above. 
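For reviewers who want to poke at the vendored ConcurrencyFreaks queues directly, they all share the same minimal interface: a constructor taking the maximum number of threads, plus enqueue(item, tid) and dequeue(tid) keyed by a dense thread id in [0, maxThreads). The standalone sketch below is illustrative only and is not part of this patch; it merely exercises that interface through MichaelScottQueue, assuming the header's directory is on the include path.

// Illustrative sketch only -- not part of this patch.
// Assumes dependency/ConcurrencyFreaks is on the include path.
#include <cassert>
#include "MichaelScottQueue.hpp"

int main()
{
  MichaelScottQueue<int> q(/*maxThreads=*/4);
  int x = 42;
  q.enqueue(&x, /*tid=*/0);            // tid must be dense and < maxThreads
  assert(q.dequeue(/*tid=*/0) == &x);  // items are passed by pointer
  assert(q.dequeue(0) == nullptr);     // empty queue yields nullptr
  return 0;
}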
+ diff --git a/dependency/ConcurrencyFreaks/array/FAAArrayQueue.hpp b/dependency/ConcurrencyFreaks/array/FAAArrayQueue.hpp new file mode 100644 index 00000000..240a374d --- /dev/null +++ b/dependency/ConcurrencyFreaks/array/FAAArrayQueue.hpp @@ -0,0 +1,206 @@ +/****************************************************************************** + * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Concurrency Freaks nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + *AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + *IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + ****************************************************************************** + */ + +#ifndef _FAA_ARRAY_QUEUE_HP_H_ +#define _FAA_ARRAY_QUEUE_HP_H_ + +#include +#include +#include "HazardPointers.hpp" + +/** + *
<h1>Fetch-And-Add Array Queue</h1>
+ * + * Each node has one array but we don't search for a vacant entry. Instead, we + * use FAA to obtain an index in the array, for enqueueing or dequeuing. + * + * There are some similarities between this queue and the basic queue in YMC: + * http://chaoran.me/assets/pdf/wfq-ppopp16.pdf + * but it's not the same because the queue in listing 1 is obstruction-free, + * while our algorithm is lock-free. In FAAArrayQueue eventually a new node will + * be inserted (using Michael-Scott's algorithm) and it will have an item + * pre-filled in the first position, which means that at most, after BUFFER_SIZE + * steps, one item will be enqueued (and it can then be dequeued). This kind of + * progress is lock-free. + * + * Each entry in the array may contain one of three possible values: + * - A valid item that has been enqueued; + * - nullptr, which means no item has yet been enqueued in that position; + * - taken, a special value that means there was an item but it has been + * dequeued; + * + * Enqueue algorithm: FAA + CAS(null,item) + * Dequeue algorithm: FAA + CAS(item,taken) + * Consistency: Linearizable + * enqueue() progress: lock-free + * dequeue() progress: lock-free + * Memory Reclamation: Hazard Pointers (lock-free) + * Uncontended enqueue: 1 FAA + 1 CAS + 1 HP + * Uncontended dequeue: 1 FAA + 1 CAS + 1 HP + * + * + *

+ * Lock-Free Linked List as described in Maged Michael and Michael Scott's + * paper: + * {@link http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf} + * + * Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue + * Algorithms

The paper on Hazard Pointers is named "Hazard Pointers: + * Safe Memory Reclamation for Lock-Free objects" and it is available here: + * http://web.cecs.pdx.edu/~walpole/class/cs510/papers/11.pdf + * + * @author Pedro Ramalhete + * @author Andreia Correia + */ +template +class FAAArrayQueue +{ + static const long BUFFER_SIZE = 1024; // 1024 + + private: + struct Node { + std::atomic deqidx; + std::atomic items[BUFFER_SIZE]; + std::atomic enqidx; + std::atomic next; + + // Start with the first entry pre-filled and enqidx at 1 + Node(T* item) : deqidx{0}, enqidx{1}, next{nullptr} + { + items[0].store(item, std::memory_order_relaxed); + for (long i = 1; i < BUFFER_SIZE; i++) { + items[i].store(nullptr, std::memory_order_relaxed); + } + } + + bool casNext(Node* cmp, Node* val) + { + return next.compare_exchange_strong(cmp, val); + } + }; + + bool casTail(Node* cmp, Node* val) + { + return tail.compare_exchange_strong(cmp, val); + } + + bool casHead(Node* cmp, Node* val) + { + return head.compare_exchange_strong(cmp, val); + } + + // Pointers to head and tail of the list + alignas(128) std::atomic head; + alignas(128) std::atomic tail; + + static const int MAX_THREADS = 128; + const int maxThreads; + + T* taken = (T*)new int(); // Muuuahahah ! + + // We need just one hazard pointer + HazardPointers hp{1, maxThreads}; + const int kHpTail = 0; + const int kHpHead = 0; + + public: + FAAArrayQueue(int maxThreads = MAX_THREADS) : maxThreads{maxThreads} + { + Node* sentinelNode = new Node(nullptr); + sentinelNode->enqidx.store(0, std::memory_order_relaxed); + head.store(sentinelNode, std::memory_order_relaxed); + tail.store(sentinelNode, std::memory_order_relaxed); + } + + ~FAAArrayQueue() + { + while (dequeue(0) != nullptr) + ; // Drain the queue + delete head.load(); // Delete the last node + delete (int*)taken; + } + + std::string className() { return "FAAArrayQueue"; } + + void enqueue(T* item, const int tid) + { + if (item == nullptr) throw std::invalid_argument("item can not be nullptr"); + while (true) { + Node* ltail = hp.protect(kHpTail, tail, tid); + const int idx = ltail->enqidx.fetch_add(1); + if (idx > BUFFER_SIZE - 1) { // This node is full + if (ltail != tail.load()) continue; + Node* lnext = ltail->next.load(); + if (lnext == nullptr) { + Node* newNode = new Node(item); + if (ltail->casNext(nullptr, newNode)) { + casTail(ltail, newNode); + hp.clear(tid); + return; + } + delete newNode; + } else { + casTail(ltail, lnext); + } + continue; + } + T* itemnull = nullptr; + if (ltail->items[idx].compare_exchange_strong(itemnull, item)) { + hp.clear(tid); + return; + } + } + } + + T* dequeue(const int tid) + { + while (true) { + Node* lhead = hp.protect(kHpHead, head, tid); + if (lhead->deqidx.load() >= lhead->enqidx.load() && + lhead->next.load() == nullptr) + break; + const int idx = lhead->deqidx.fetch_add(1); + if (idx > + BUFFER_SIZE - + 1) { // This node has been drained, check if there is another one + Node* lnext = lhead->next.load(); + if (lnext == nullptr) break; // No more nodes in the queue + if (casHead(lhead, lnext)) hp.retire(lhead, tid); + continue; + } + T* item = lhead->items[idx].exchange(taken); + if (item == nullptr) continue; + hp.clear(tid); + return item; + } + hp.clear(tid); + return nullptr; + } +}; + +#endif /* _FAA_ARRAY_QUEUE_HP_H_ */ diff --git a/dependency/ConcurrencyFreaks/array/LazyIndexArrayQueue.hpp b/dependency/ConcurrencyFreaks/array/LazyIndexArrayQueue.hpp new file mode 100644 index 00000000..1c2596b8 --- /dev/null +++ 
b/dependency/ConcurrencyFreaks/array/LazyIndexArrayQueue.hpp @@ -0,0 +1,210 @@ +/****************************************************************************** + * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Concurrency Freaks nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + *AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + *IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + ****************************************************************************** + */ + +#ifndef _LAZY_INDEX_ARRAY_QUEUE_HP_H_ +#define _LAZY_INDEX_ARRAY_QUEUE_HP_H_ + +#include +#include +#include "HazardPointers.hpp" + +/** + *
<h1>Lazy Index Array Queue</h1>
+ * + * Same as Linear Array Queue but with lazy indexes for both enqueuers and + * dequeuers. + * + * This is a lock-free queue where each node contains an array of items. + * Each entry in the array may contain on of three possible values: + * - A valid item that has been enqueued; + * - nullptr, which means no item has yet been enqueued in that position; + * - taken, a special value that means there was an item but it has been + * dequeued; The enqueue() searches for the first nullptr entry in the array and + * tries to CAS from nullptr to its item. The dequeue() searches for the first + * valid item in the array and tries to CAS from item to "taken". + * + * Enqueue algorithm: Linear array search starting at lazy index with + * CAS(nullptr,item) Dequeue algorithm: Linear array search starting at lazy + * index with CAS(item,taken) Consistency: Linearizable enqueue() progress: + * lock-free dequeue() progress: lock-free Memory Reclamation: Hazard Pointers + * (lock-free) Uncontended enqueue: 1 CAS + 1 HP Uncontended dequeue: 1 CAS + 1 + * HP + * + * + *

+ * Lock-Free Linked List as described in Maged Michael and Michael Scott's + * paper: + * {@link http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf} + * + * Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue + * Algorithms

The paper on Hazard Pointers is named "Hazard Pointers: + * Safe Memory Reclamation for Lock-Free objects" and it is available here: + * http://web.cecs.pdx.edu/~walpole/class/cs510/papers/11.pdf + * + * @author Pedro Ramalhete + * @author Andreia Correia + */ +template +class LazyIndexArrayQueue +{ + static const long BUFFER_SIZE = 1024; + + private: + struct Node { + std::atomic deqidx; + std::atomic items[BUFFER_SIZE]; + std::atomic enqidx; + std::atomic next; + + Node(T* item) : deqidx{0}, enqidx{0}, next{nullptr} + { + items[0].store(item, std::memory_order_relaxed); + for (long i = 1; i < BUFFER_SIZE; i++) { + items[i].store(nullptr, std::memory_order_relaxed); + } + } + + bool casNext(Node* cmp, Node* val) + { + return next.compare_exchange_strong(cmp, val); + } + }; + + bool casTail(Node* cmp, Node* val) + { + return tail.compare_exchange_strong(cmp, val); + } + + bool casHead(Node* cmp, Node* val) + { + return head.compare_exchange_strong(cmp, val); + } + + // Pointers to head and tail of the list + alignas(128) std::atomic head; + alignas(128) std::atomic tail; + + static const int MAX_THREADS = 128; + const int maxThreads; + + T* taken = (T*)new int(); // Muuuahahah ! + + // We need just one hazard pointer + HazardPointers hp{1, maxThreads}; + const int kHpTail = 0; + const int kHpHead = 0; + + public: + LazyIndexArrayQueue(int maxThreads = MAX_THREADS) : maxThreads{maxThreads} + { + Node* sentinelNode = new Node(nullptr); + head.store(sentinelNode, std::memory_order_relaxed); + tail.store(sentinelNode, std::memory_order_relaxed); + } + + ~LazyIndexArrayQueue() + { + while (dequeue(0) != nullptr) + ; // Drain the queue + delete head.load(); // Delete the last node + delete (int*)taken; + } + + std::string className() { return "LazyIndexArrayQueue"; } + + void enqueue(T* item, const int tid) + { + if (item == nullptr) throw std::invalid_argument("item can not be nullptr"); + while (true) { + Node* ltail = hp.protect(kHpTail, tail, tid); + if (ltail->items[BUFFER_SIZE - 1].load() != + nullptr) { // This node is full + if (ltail != tail.load()) continue; + Node* lnext = ltail->next.load(); + if (lnext == nullptr) { + Node* newNode = new Node(item); + if (ltail->casNext(nullptr, newNode)) { + casTail(ltail, newNode); + hp.clear(tid); + return; + } + delete newNode; + } else { + casTail(ltail, lnext); + } + continue; + } + // Find the first null entry in items[] and try to CAS from null to item + for (long i = ltail->enqidx.load(); i < BUFFER_SIZE; i++) { + if (ltail->items[i].load() != nullptr) continue; + T* itemnull = nullptr; + if (ltail->items[i].compare_exchange_strong(itemnull, item)) { + ltail->enqidx.store(i + 1, std::memory_order_release); + hp.clear(tid); + return; + } + if (ltail != tail.load()) break; + } + } + } + + T* dequeue(const int tid) + { + while (true) { + Node* lhead = hp.protect(kHpHead, head, tid); + if (lhead->items[BUFFER_SIZE - 1].load() == + taken) { // This node has been drained, check if there is another one + Node* lnext = lhead->next.load(); + if (lnext == nullptr) { // No more nodes in the queue + hp.clear(tid); + return nullptr; + } + if (casHead(lhead, lnext)) hp.retire(lhead, tid); + continue; + } + // Find the first non taken entry in items[] and try to CAS from item to + // taken + for (long i = lhead->deqidx.load(); i < BUFFER_SIZE; i++) { + T* item = lhead->items[i].load(); + if (item == nullptr) { + hp.clear(tid); + return nullptr; // This node is empty + } + if (item == taken) continue; + if (lhead->items[i].compare_exchange_strong(item, 
taken)) { + lhead->deqidx.store(i + 1, std::memory_order_release); + hp.clear(tid); + return item; + } + if (lhead != head.load()) break; + } + } + } +}; + +#endif /* _LAZY_INDEX_ARRAY_QUEUE_HP_H_ */ diff --git a/dependency/ConcurrencyFreaks/array/LinearArrayQueue.hpp b/dependency/ConcurrencyFreaks/array/LinearArrayQueue.hpp new file mode 100644 index 00000000..c6c4dba3 --- /dev/null +++ b/dependency/ConcurrencyFreaks/array/LinearArrayQueue.hpp @@ -0,0 +1,207 @@ +/****************************************************************************** + * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Concurrency Freaks nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + *AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + *IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + ****************************************************************************** + */ + +#ifndef _LINEAR_ARRAY_QUEUE_HP_H_ +#define _LINEAR_ARRAY_QUEUE_HP_H_ + +#include +#include +#include "HazardPointers.hpp" + +/** + *
<h1>Linear Array Queue</h1>
+ * + * This is a lock-free queue where each node contains an array of items. + * Each entry in the array may contain on of three possible values: + * - A valid item that has been enqueued; + * - nullptr, which means no item has yet been enqueued in that position; + * - taken, a special value that means there was an item but it has been + * dequeued; The enqueue() searches for the first nullptr entry in the array and + * tries to CAS from nullptr to its item. The dequeue() searches for the first + * valid item in the array and tries to CAS from item to "taken". The search is + * done sequentially, seeen that arrays are fast at doing that, as long as + * they're small. + * + * Enqueue algorithm: Linear array search with CAS(nullptr,item) + * Dequeue algorithm: Linear array search with CAS(item,taken) + * Consistency: Linearizable + * enqueue() progress: lock-free + * dequeue() progress: lock-free + * Memory Reclamation: Hazard Pointers (lock-free) + * Uncontended enqueue: 1 CAS + 1 HP + * Uncontended dequeue: 1 CAS + 1 HP + * + * + *

+ * Lock-Free Linked List as described in Maged Michael and Michael Scott's + * paper: + * {@link http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf} + * + * Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue + * Algorithms

The paper on Hazard Pointers is named "Hazard Pointers: + * Safe Memory Reclamation for Lock-Free objects" and it is available here: + * http://web.cecs.pdx.edu/~walpole/class/cs510/papers/11.pdf + * + * @author Pedro Ramalhete + * @author Andreia Correia + */ +template +class LinearArrayQueue +{ + static const long BUFFER_SIZE = 1024; + + private: + struct Node { + std::atomic items[BUFFER_SIZE]; + std::atomic next; + + Node(T* item) : next{nullptr} + { + items[0].store(item, std::memory_order_relaxed); + for (long i = 1; i < BUFFER_SIZE; i++) { + items[i].store(nullptr, std::memory_order_relaxed); + } + } + + bool casNext(Node* cmp, Node* val) + { + return next.compare_exchange_strong(cmp, val); + } + }; + + bool casTail(Node* cmp, Node* val) + { + return tail.compare_exchange_strong(cmp, val); + } + + bool casHead(Node* cmp, Node* val) + { + return head.compare_exchange_strong(cmp, val); + } + + // Pointers to head and tail of the list + alignas(128) std::atomic head; + alignas(128) std::atomic tail; + + static const int MAX_THREADS = 128; + const int maxThreads; + + T* taken = (T*)new int(); // Muuuahahah ! + + // We need just one hazard pointer + HazardPointers hp{1, maxThreads}; + const int kHpTail = 0; + const int kHpHead = 0; + + public: + LinearArrayQueue(int maxThreads = MAX_THREADS) : maxThreads{maxThreads} + { + Node* sentinelNode = new Node(nullptr); + head.store(sentinelNode, std::memory_order_relaxed); + tail.store(sentinelNode, std::memory_order_relaxed); + } + + ~LinearArrayQueue() + { + while (dequeue(0) != nullptr) + ; // Drain the queue + delete head.load(); // Delete the last node + delete (int*)taken; + } + + std::string className() { return "LinearArrayQueue"; } + + void enqueue(T* item, const int tid) + { + if (item == nullptr) throw std::invalid_argument("item can not be nullptr"); + while (true) { + Node* ltail = hp.protect(kHpTail, tail, tid); + if (ltail->items[BUFFER_SIZE - 1].load() != + nullptr) { // This node is full + if (ltail != tail.load()) continue; + Node* lnext = ltail->next.load(); + if (lnext == nullptr) { + Node* newNode = new Node(item); + if (ltail->casNext(nullptr, newNode)) { + casTail(ltail, newNode); + hp.clear(tid); + return; + } + delete newNode; + } else { + casTail(ltail, lnext); + } + continue; + } + // Find the first null entry in items[] and try to CAS from null to item + for (long i = 0; i < BUFFER_SIZE; i++) { + if (ltail->items[i].load() != nullptr) continue; + T* itemnull = nullptr; + if (ltail->items[i].compare_exchange_strong(itemnull, item)) { + hp.clear(tid); + return; + } + if (ltail != tail.load()) break; + } + } + } + + T* dequeue(const int tid) + { + while (true) { + Node* lhead = hp.protect(kHpHead, head, tid); + if (lhead->items[BUFFER_SIZE - 1].load() == + taken) { // This node has been drained, check if there is another one + Node* lnext = lhead->next.load(); + if (lnext == nullptr) { // No more nodes in the queue + hp.clear(tid); + return nullptr; + } + if (casHead(lhead, lnext)) hp.retire(lhead, tid); + continue; + } + // Find the first non taken entry in items[] and try to CAS from item to + // taken + for (long i = 0; i < BUFFER_SIZE; i++) { + T* item = lhead->items[i].load(); + if (item == nullptr) { + hp.clear(tid); + return nullptr; // This node is empty + } + if (item == taken) continue; + if (lhead->items[i].compare_exchange_strong(item, taken)) { + hp.clear(tid); + return item; + } + if (lhead != head.load()) break; + } + } + } +}; + +#endif /* _LINEAR_ARRAY_QUEUE_HP_H_ */ diff --git 
a/dependency/ConcurrencyFreaks/array/Log2ArrayQueue.hpp b/dependency/ConcurrencyFreaks/array/Log2ArrayQueue.hpp new file mode 100644 index 00000000..a155c357 --- /dev/null +++ b/dependency/ConcurrencyFreaks/array/Log2ArrayQueue.hpp @@ -0,0 +1,259 @@ +/****************************************************************************** + * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Concurrency Freaks nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + *AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + *IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + ****************************************************************************** + */ + +#ifndef _LOG2_ARRAY_QUEUE_HP_H_ +#define _LOG2_ARRAY_QUEUE_HP_H_ + +#include +#include +#include "HazardPointers.hpp" + +/** + *
<h1>Log 2 Array Queue</h1>
+ * + * Same as Linear Array Queue but does a binary (Log2) search on the array. + * + * This is a lock-free queue where each node contains an array of items. + * Each entry in the array may contain on of three possible values: + * - A valid item that has been enqueued; + * - nullptr, which means no item has yet been enqueued in that position; + * - taken, a special value that means there was an item but it has been + * dequeued; The enqueue() searches for the first nullptr entry in the array and + * tries to CAS from nullptr to its item. The dequeue() searches for the first + * valid item in the array and tries to CAS from item to "taken". The search is + * done in a binary search which takes at most log2 steps. + * + * Enqueue algorithm: Log2 array search with CAS(nullptr,item) + * Dequeue algorithm: Log2 array search with CAS(item,taken) + * Consistency: Linearizable + * enqueue() progress: lock-free + * dequeue() progress: lock-free + * Memory Reclamation: Hazard Pointers (lock-free) + * Uncontended enqueue: 1 CAS + 1 HP + * Uncontended dequeue: 1 CAS + 1 HP + * + *

+ * Possible improvements: + * - We tried to make the indexing write on one cache line at a time + * (see getIndex()) but it didn't seem to make a difference. More research + * into this may be interesting; + * - Create "nextEnq" and "nextDeq" atomic variables per node where each + * enqueuer and dequeuer write in the end of their operation the index + * of the next nullptr/item. This may be overwritten by an older thread + * but remember that this is per-thread so it may pay off. + * The store can be memory_order_release. + * + * + *

+ * Lock-Free Linked List as described in Maged Michael and Michael Scott's + * paper: + * {@link http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf} + * + * Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue + * Algorithms

The paper on Hazard Pointers is named "Hazard Pointers: + * Safe Memory Reclamation for Lock-Free objects" and it is available here: + * http://web.cecs.pdx.edu/~walpole/class/cs510/papers/11.pdf + * + * @author Pedro Ramalhete + * @author Andreia Correia + */ +template +class Log2ArrayQueue +{ + static const long BUFFER_SIZE = 1024; // 16, 32, 128, 1024, or 2^17 for LCRQ + + private: + struct Node { + std::atomic items[BUFFER_SIZE]; + std::atomic next; + + Node(T* item) : next{nullptr} + { + items[0].store(item, std::memory_order_relaxed); + for (long i = 1; i < BUFFER_SIZE; i++) { + items[i].store(nullptr, std::memory_order_relaxed); + } + } + + bool casNext(Node* cmp, Node* val) + { + return next.compare_exchange_strong(cmp, val); + } + }; + + bool casTail(Node* cmp, Node* val) + { + return tail.compare_exchange_strong(cmp, val); + } + + bool casHead(Node* cmp, Node* val) + { + return head.compare_exchange_strong(cmp, val); + } + + // Pointers to head and tail of the list + std::atomic head alignas(128); + std::atomic tail alignas(128); + + static const int MAX_THREADS = 128; + const int maxThreads; + + T* taken = (T*)new int(); // Muuuahahah ! + + // We need just one hazard pointer + HazardPointers hp{1, maxThreads}; + const int kHpTail = 0; + const int kHpHead = 0; + + long getIndex(long i) + { + return i; + // return ((i << 4) % BUFFER_SIZE) + (i >> (10-4)); // 4 is for cache + // padding + } + + long findFirstNull(Node* node) + { + if (node->items[0].load() == nullptr) return 0; + long minPos = 0; + long maxPos = BUFFER_SIZE - 1; + while (true) { + long pos = (maxPos - minPos) / 2 + minPos; + if (node->items[getIndex(pos)].load() == nullptr) { + maxPos = pos; + } else { + minPos = pos; + } + if (maxPos - minPos <= 3) return minPos; + } + } + + long findLastTaken(Node* node) + { + if (node->items[BUFFER_SIZE - 1].load() == taken) return BUFFER_SIZE - 1; + long minPos = 0; + long maxPos = BUFFER_SIZE - 1; + while (true) { + long pos = (maxPos - minPos) / 2 + minPos; + if (node->items[getIndex(pos)].load() == taken) { + minPos = pos; + } else { + maxPos = pos; + } + if (maxPos - minPos <= 3) return minPos; + } + } + + public: + Log2ArrayQueue(int maxThreads = MAX_THREADS) : maxThreads{maxThreads} + { + Node* sentinelNode = new Node(nullptr); + head.store(sentinelNode, std::memory_order_relaxed); + tail.store(sentinelNode, std::memory_order_relaxed); + } + + ~Log2ArrayQueue() + { + while (dequeue(0) != nullptr) + ; // Drain the queue + delete head.load(); // Delete the last node + delete (int*)taken; + } + + std::string className() { return "Log2ArrayQueue"; } + + void enqueue(T* item, const int tid) + { + if (item == nullptr) throw std::invalid_argument("item can not be nullptr"); + while (true) { + Node* ltail = hp.protect(kHpTail, tail, tid); + if (ltail->items[getIndex(BUFFER_SIZE - 1)].load() != + nullptr) { // This node is full + if (ltail != tail.load()) continue; + Node* lnext = ltail->next.load(); + if (lnext == nullptr) { + Node* newNode = new Node(item); + if (ltail->casNext(nullptr, newNode)) { + casTail(ltail, newNode); + hp.clear(tid); + return; + } + delete newNode; + } else { + casTail(ltail, lnext); + } + continue; + } + // Find the first null entry in items[] and try to CAS from null to item + for (long i = findFirstNull(ltail); i < BUFFER_SIZE; i++) { + if (ltail->items[getIndex(i)].load() != nullptr) continue; + T* itemnull = nullptr; + if (ltail->items[getIndex(i)].compare_exchange_strong(itemnull, item)) { + hp.clear(tid); + return; + } + if (ltail != tail.load()) 
break; + } + } + } + + T* dequeue(const int tid) + { + while (true) { + Node* lhead = hp.protect(kHpHead, head, tid); + if (lhead->items[getIndex(BUFFER_SIZE - 1)].load() == + taken) { // This node has been drained, check if there is another one + Node* lnext = lhead->next.load(); + if (lnext == nullptr) { // No more nodes in the queue + hp.clear(tid); + return nullptr; + } + if (casHead(lhead, lnext)) hp.retire(lhead, tid); + continue; + } + // Find the first non taken entry in items[] and try to CAS from item to + // taken + long i = findLastTaken(lhead); + for (; i < BUFFER_SIZE; i++) { + T* item = lhead->items[getIndex(i)].load(); + if (item == nullptr) { + hp.clear(tid); + return nullptr; // This node is empty + } + if (item == taken) continue; + if (lhead->items[getIndex(i)].compare_exchange_strong(item, taken)) { + hp.clear(tid); + return item; + } + if (lhead != head.load()) break; + } + } + } +}; + +#endif /* _LOG2_ARRAY_QUEUE_HP_H_ */ diff --git a/dependency/lprq/CMakeLists.txt b/dependency/lprq/CMakeLists.txt new file mode 100644 index 00000000..7c272b60 --- /dev/null +++ b/dependency/lprq/CMakeLists.txt @@ -0,0 +1 @@ +target_include_directories(LCT PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}) diff --git a/dependency/lprq/CacheRemap.hpp b/dependency/lprq/CacheRemap.hpp new file mode 100644 index 00000000..94f1f612 --- /dev/null +++ b/dependency/lprq/CacheRemap.hpp @@ -0,0 +1,43 @@ +#pragma once + +#include +#include + +template +class CacheRemap +{ + private: + static_assert(cache_line_size % cell_size == 0); + static_assert(size * cell_size % cache_line_size == 0); + + static constexpr size_t cellsPerCacheLine = cache_line_size / cell_size; + static constexpr size_t numCacheLines = size * cell_size / cache_line_size; + + public: + constexpr static bool REMAP = true; + + constexpr inline size_t operator[](size_t i) const noexcept + __attribute__((always_inline)) + { + return (i % numCacheLines) * cellsPerCacheLine + i / numCacheLines; + } +}; + +class IdentityRemap +{ + public: + constexpr static bool REMAP = false; + + constexpr inline size_t operator[](size_t i) const noexcept + __attribute__((always_inline)) + { + return i; + } +}; + +template +using ConditionalCacheRemap = + std::conditional_t= cache_line_size), + CacheRemap, + IdentityRemap>; diff --git a/dependency/lprq/LICENSE b/dependency/lprq/LICENSE new file mode 100644 index 00000000..1323fa05 --- /dev/null +++ b/dependency/lprq/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2022 Raed Romanov + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
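Before the LPRQueue diff below, a note on the CacheRemap helper introduced above: its operator[] permutes cell indices so that logically consecutive ring cells land on different cache lines. The standalone sketch below is illustrative only and not part of the patch; it reproduces the same formula for a deliberately tiny, hypothetical configuration of 8 cells of 16 bytes each on 64-byte cache lines.

// Illustrative sketch only -- not part of this patch.
#include <cstddef>
#include <cstdio>

int main()
{
  const std::size_t cellsPerCacheLine = 64 / 16;  // 4 cells per cache line
  const std::size_t numCacheLines = 8 * 16 / 64;  // 2 cache lines in the ring
  for (std::size_t i = 0; i < 8; i++) {
    // Same formula as CacheRemap::operator[] above.
    std::size_t remapped =
        (i % numCacheLines) * cellsPerCacheLine + i / numCacheLines;
    std::printf("%zu -> %zu\n", i, remapped);
  }
  // Prints 0->0, 1->4, 2->1, 3->5, 4->2, 5->6, 6->3, 7->7: consecutive
  // tickets alternate between cache lines, which reduces false sharing
  // between threads working on neighbouring cells.
  return 0;
}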
diff --git a/dependency/lprq/LPRQueue.hpp b/dependency/lprq/LPRQueue.hpp new file mode 100644 index 00000000..218291ab --- /dev/null +++ b/dependency/lprq/LPRQueue.hpp @@ -0,0 +1,154 @@ +#pragma once + +#include +#include "LinkedRingQueue.hpp" +#include "RQCell.hpp" +#include "CacheRemap.hpp" + +template +class PRQueue + : public QueueSegmentBase> +{ + private: + using Base = + QueueSegmentBase>; + using Cell = detail::CRQCell; + + Cell array[ring_size]; + + [[no_unique_address]] ConditionalCacheRemap + remap{}; + + inline uint64_t nodeIndex(uint64_t i) const { return (i & ~(1ull << 63)); } + + inline uint64_t setUnsafe(uint64_t i) const { return (i | (1ull << 63)); } + + inline uint64_t nodeUnsafe(uint64_t i) const { return (i & (1ull << 63)); } + + inline bool isBottom(void* const value) const + { + return (reinterpret_cast(value) & 1) != 0; + } + + inline void* threadLocalBottom(const int tid) const + { + return reinterpret_cast(static_cast((tid << 1) | 1)); + } + + public: + static constexpr size_t RING_SIZE = ring_size; + + PRQueue(uint64_t start) : Base() + { + for (uint64_t i = start; i < start + RING_SIZE; i++) { + uint64_t j = i % RING_SIZE; + array[remap[j]].val.store(nullptr, std::memory_order_relaxed); + array[remap[j]].idx.store(i, std::memory_order_relaxed); + } + Base::head.store(start, std::memory_order_relaxed); + Base::tail.store(start, std::memory_order_relaxed); + } + + static std::string className() + { + using namespace std::string_literals; + return "PRQueue"s + (padded_cells ? "/ca"s : ""s) + + (cache_remap ? "/remap" : ""); + } + + bool enqueue(T* item, [[maybe_unused]] const int tid) + { + int try_close = 0; + + while (true) { + uint64_t tailticket = Base::tail.fetch_add(1); + if (Base::isClosed(tailticket)) { + return false; + } + + Cell& cell = array[remap[tailticket % RING_SIZE]]; + uint64_t idx = cell.idx.load(); + void* val = cell.val.load(); + if (val == nullptr && nodeIndex(idx) <= tailticket && + (!nodeUnsafe(idx) || Base::head.load() <= tailticket)) { + void* bottom = threadLocalBottom(tid); + if (cell.val.compare_exchange_strong(val, bottom)) { + if (cell.idx.compare_exchange_strong(idx, tailticket + RING_SIZE)) { + if (cell.val.compare_exchange_strong(bottom, item)) { + return true; + } + } else { + cell.val.compare_exchange_strong(bottom, nullptr); + } + } + } + if (tailticket >= Base::head.load() + RING_SIZE) { + if (Base::closeSegment(tailticket, ++try_close > 10)) return false; + } + } + } + + T* dequeue([[maybe_unused]] const int tid) + { +#ifdef CAUTIOUS_DEQUEUE + if (Base::isEmpty()) return nullptr; +#endif + + while (true) { + uint64_t headticket = Base::head.fetch_add(1); + Cell& cell = array[remap[headticket % RING_SIZE]]; + + int r = 0; + uint64_t tt = 0; + + while (true) { + uint64_t cell_idx = cell.idx.load(); + uint64_t unsafe = nodeUnsafe(cell_idx); + uint64_t idx = nodeIndex(cell_idx); + void* val = cell.val.load(); + + if (idx > headticket + RING_SIZE) break; + + if (val != nullptr && !isBottom(val)) { + if (idx == headticket + RING_SIZE) { + cell.val.store(nullptr); + return static_cast(val); + } else { + if (unsafe) { + if (cell.idx.load() == cell_idx) break; + } else { + if (cell.idx.compare_exchange_strong(cell_idx, setUnsafe(idx))) + break; + } + } + } else { + if ((r & ((1ull << 8) - 1)) == 0) tt = Base::tail.load(); + + int crq_closed = Base::isClosed(tt); + uint64_t t = Base::tailIndex(tt); + if (unsafe || t < headticket + 1 || crq_closed || r > 4 * 1024) { + if (isBottom(val) && + !cell.val.compare_exchange_strong(val, 
nullptr)) + continue; + if (cell.idx.compare_exchange_strong( + cell_idx, unsafe | (headticket + RING_SIZE))) + break; + } + ++r; + } + } + + if (Base::tailIndex(Base::tail.load()) <= headticket + 1) { + Base::fixState(); + return nullptr; + } + } + } +}; + +template +using LPRQueue = + LinkedRingQueue>; diff --git a/dependency/lprq/LinkedRingQueue.hpp b/dependency/lprq/LinkedRingQueue.hpp new file mode 100644 index 00000000..ce30c319 --- /dev/null +++ b/dependency/lprq/LinkedRingQueue.hpp @@ -0,0 +1,200 @@ +#pragma once + +#include +#include "x86AtomicOps.hpp" +#include "HazardPointers.hpp" +#include "Metrics.hpp" + +template +class LinkedRingQueue : public MetricsAwareBase +{ + private: + static constexpr int MAX_THREADS = 128; + static constexpr int kHpTail = 0; + static constexpr int kHpHead = 1; + const int maxThreads; + + alignas(128) std::atomic head; + alignas(128) std::atomic tail; + + HazardPointers hp{2, maxThreads}; + + MetricsCollector::Accessor mAppendNode = accessor("appendNode"); + MetricsCollector::Accessor mWasteNode = accessor("wasteNode"); + + inline T* dequeueAfterNextLinked(Segment* lhead, int tid) + { + // This is a hack for LSCQ. + // See SCQ::prepareDequeueAfterNextLinked for details. + // if constexpr(requires(Segment s) { + // s.prepareDequeueAfterNextLinked(); + // }) { + // lhead->prepareDequeueAfterNextLinked(); + // } + return lhead->dequeue(tid); + } + + public: + static constexpr size_t RING_SIZE = Segment::RING_SIZE; + + explicit LinkedRingQueue(int maxThreads = MAX_THREADS) + : MetricsAwareBase(maxThreads), maxThreads{maxThreads} + { + // Shared object init + Segment* sentinel = new Segment(0); + head.store(sentinel, std::memory_order_relaxed); + tail.store(sentinel, std::memory_order_relaxed); + mAppendNode.inc(1, 0); + } + + ~LinkedRingQueue() + { + while (dequeue(0) != nullptr) + ; // Drain the queue + delete head.load(); // Delete the last segment + } + + static std::string className() { return "L" + Segment::className(); } + + void enqueue(T* item, int tid) + { + Segment* ltail = hp.protectPtr(kHpTail, tail.load(), tid); + while (true) { +#ifndef DISABLE_HP + Segment* ltail2 = tail.load(); + if (ltail2 != ltail) { + ltail = hp.protectPtr(kHpTail, ltail2, tid); + continue; + } +#endif + + Segment* lnext = ltail->next.load(); + if (lnext != nullptr) { // Help advance the tail + if (tail.compare_exchange_strong(ltail, lnext)) { + ltail = hp.protectPtr(kHpTail, lnext, tid); + } else { + ltail = hp.protectPtr(kHpTail, tail.load(), tid); + } + continue; + } + + if (ltail->enqueue(item, tid)) { + hp.clearOne(kHpTail, tid); + break; + } + + Segment* newTail = new Segment(ltail->getNextSegmentStartIndex()); + newTail->enqueue(item, tid); + + Segment* nullNode = nullptr; + if (ltail->next.compare_exchange_strong(nullNode, newTail)) { + tail.compare_exchange_strong(ltail, newTail); + hp.clearOne(kHpTail, tid); + mAppendNode.inc(1, tid); + break; + } else { + delete newTail; + mWasteNode.inc(1, tid); + } + + ltail = hp.protectPtr(kHpTail, nullNode, tid); + } + } + + T* dequeue(int tid) + { + Segment* lhead = hp.protectPtr(kHpHead, head.load(), tid); + while (true) { +#ifndef DISABLE_HP + Segment* lhead2 = head.load(); + if (lhead2 != lhead) { + lhead = hp.protectPtr(kHpHead, lhead2, tid); + continue; + } +#endif + + T* item = lhead->dequeue(tid); + if (item == nullptr) { + Segment* lnext = lhead->next.load(); + if (lnext != nullptr) { + item = dequeueAfterNextLinked(lhead, tid); + if (item == nullptr) { + if (head.compare_exchange_strong(lhead, lnext)) { + 
hp.retire(lhead, tid); + lhead = hp.protectPtr(kHpHead, lnext, tid); + } else { + lhead = hp.protectPtr(kHpHead, lhead, tid); + } + continue; + } + } + } + + hp.clearOne(kHpHead, tid); + return item; + } + } + + size_t estimateSize(int tid) + { + Segment* lhead = hp.protect(kHpHead, head, tid); + Segment* ltail = hp.protect(kHpTail, tail, tid); + uint64_t t = ltail->getTailIndex(); + uint64_t h = lhead->getHeadIndex(); + hp.clear(tid); + return t > h ? t - h : 0; + } +}; + +template +struct QueueSegmentBase { + protected: + alignas(128) std::atomic head{0}; + alignas(128) std::atomic tail{0}; + alignas(128) std::atomic next{nullptr}; + + inline uint64_t tailIndex(uint64_t t) const { return (t & ~(1ull << 63)); } + + inline bool isClosed(uint64_t t) const { return (t & (1ull << 63)) != 0; } + + void fixState() + { + while (true) { + uint64_t t = tail.load(); + uint64_t h = head.load(); + if (tail.load() != t) continue; + if (h > t) { // h would be less than t if queue is closed + uint64_t tmp = t; + if (tail.compare_exchange_strong(tmp, h)) break; + continue; + } + break; + } + } + + bool closeSegment(const uint64_t tailticket, bool force) + { + if (!force) { + uint64_t tmp = tailticket + 1; + return tail.compare_exchange_strong(tmp, (tailticket + 1) | (1ull << 63)); + } else { + return BIT_TEST_AND_SET63(&tail); + } + } + + inline bool isEmpty() const + { + uint64_t h = head.load(); + uint64_t t = tailIndex(tail.load()); + return h >= t; + } + + uint64_t getHeadIndex() { return head.load(); } + + uint64_t getTailIndex() { return tailIndex(tail.load()); } + + uint64_t getNextSegmentStartIndex() { return getTailIndex() - 1; } + + public: + friend class LinkedRingQueue; +}; diff --git a/dependency/lprq/Metrics.hpp b/dependency/lprq/Metrics.hpp new file mode 100644 index 00000000..555079db --- /dev/null +++ b/dependency/lprq/Metrics.hpp @@ -0,0 +1,155 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include "Stats.hpp" + +class Metrics +{ + private: + std::unordered_map metrics{}; + + public: + size_t& operator[](const std::string& metric) { return metrics[metric]; } + + size_t operator[](const std::string& metric) const + { + auto it = metrics.find(metric); + if (it == metrics.end()) return 0; + return it->second; + } + + const std::unordered_map& data() const + { + return metrics; + } + + void reset() { metrics.clear(); } + + Metrics& operator+=(const Metrics& other) + { + for (auto [key, value] : other.metrics) { + metrics[key] += value; + } + return *this; + } + + friend std::ostream& operator<<(std::ostream& stream, const Metrics& metrics); +}; + +Metrics operator+(const Metrics& a, const Metrics& b) +{ + Metrics res; + res += a; + res += b; + return res; +} + +std::ostream& operator<<(std::ostream& stream, const Metrics& metrics) +{ + for (auto [key, value] : metrics.metrics) { + stream << key << ": " << value << '\n'; + } + return stream; +} + +template +std::unordered_map> metricStats(const It begin, + const It end) +{ + std::unordered_map> data; + for (It it = begin; it != end; ++it) { + for (auto [key, value] : it->data()) { + data[key].push_back(static_cast(value)); + } + } + + std::unordered_map> res; + for (const auto& [key, values] : data) { + res[key] = stats(values.begin(), values.end()); + } + return res; +} + +class MetricsCollector +{ + private: + std::unordered_map> allMetrics; + std::mutex mutex; + size_t numThreads; + + public: + class Accessor + { + private: + size_t* tlMetrics; + + public: + explicit Accessor(size_t* tlMetrics) : 
tlMetrics(tlMetrics) {} + + void inc(const size_t value, int tid) { tlMetrics[tid] += value; } + }; + + explicit MetricsCollector(size_t numThreads) : numThreads(numThreads) {} + MetricsCollector(const Metrics&) = delete; + MetricsCollector(Metrics&&) = delete; + MetricsCollector& operator=(const Metrics&) = delete; + MetricsCollector& operator=(Metrics&&) = delete; + + Accessor accessor(std::string metric) + { + std::lock_guard lock(mutex); + std::vector& tlMetrics = allMetrics[std::move(metric)]; + if (tlMetrics.size() < numThreads) tlMetrics.resize(numThreads, 0); + return Accessor(tlMetrics.data()); + } + + Metrics combine() + { + Metrics res; + std::lock_guard lock(mutex); + for (const auto& [key, tlMetrics] : allMetrics) { + res[key] = std::reduce(tlMetrics.begin(), tlMetrics.end()); + } + return res; + } + + void reset(int tid) + { + std::lock_guard lock(mutex); + for (auto& [key, tlMetrics] : allMetrics) { + tlMetrics[tid] = 0; + } + } + + void reset() + { + std::lock_guard lock(mutex); + allMetrics.clear(); + } +}; + +class MetricsAwareBase +{ + private: + MetricsCollector collector; + + protected: + MetricsCollector::Accessor accessor(std::string metric) + { + return collector.accessor(std::move(metric)); + } + + public: + explicit MetricsAwareBase(size_t numThreads) : collector(numThreads) {} + + Metrics collectMetrics() { return collector.combine(); } + + void resetMetrics(int tid) { collector.reset(tid); } + + void resetMetrics() { collector.reset(); } +}; diff --git a/dependency/lprq/RQCell.hpp b/dependency/lprq/RQCell.hpp new file mode 100644 index 00000000..980aa562 --- /dev/null +++ b/dependency/lprq/RQCell.hpp @@ -0,0 +1,37 @@ +#pragma once + +#include + +namespace detail +{ +template +struct CRQCell; + +template +struct CRQCell { + std::atomic val; + std::atomic idx; + uint64_t pad[14]; +} __attribute__((aligned(128))); + +template +struct CRQCell { + std::atomic val; + std::atomic idx; +} __attribute__((aligned(16))); + +template +struct PlainCell; + +template +struct alignas(128) PlainCell { + std::atomic val; + uint64_t pad[15]; +}; + +template +struct PlainCell { + std::atomic val; +}; + +} // namespace detail diff --git a/dependency/lprq/Stats.hpp b/dependency/lprq/Stats.hpp new file mode 100644 index 00000000..9c099206 --- /dev/null +++ b/dependency/lprq/Stats.hpp @@ -0,0 +1,76 @@ +#pragma once + +#include +#include + +template +struct Stats { + V mean; + V stddev; + + Stats& operator/=(V x) + { + mean /= x; + stddev /= x; + return *this; + } + + Stats& operator*=(V x) + { + mean *= x; + stddev *= x; + return *this; + } + + Stats& operator+=(const Stats& other) + { + mean += other.mean; + + // Assume normal distribution! + stddev = std::sqrt(stddev * stddev + other.stddev * other.stddev); + + return *this; + } + + Stats operator-() const { return {-mean, stddev}; } +}; + +template +Stats operator+(Stats a, const Stats& b) +{ + a += b; + return a; +} + +template +Stats operator*(Stats a, V x) +{ + a *= x; + return a; +} + +template +Stats operator/(Stats a, V x) +{ + a /= x; + return a; +} + +template ::value_type> +Stats stats(const It begin, const It end) +{ + V sum{}; + size_t n = 0; + for (It i = begin; i != end; ++i) { + sum += *i; + ++n; + } + V mean = sum / n; + V sqSum{}; + for (It i = begin; i != end; ++i) { + V x = *i - mean; + sqSum += x * x; + } + V stddev = n == 1 ? 
V{} : std::sqrt(sqSum / (n - 1));
  return {mean, stddev};
}
diff --git a/dependency/lprq/x86AtomicOps.hpp b/dependency/lprq/x86AtomicOps.hpp
new file mode 100644
index 00000000..ba39a53e
--- /dev/null
+++ b/dependency/lprq/x86AtomicOps.hpp
@@ -0,0 +1,11 @@
+#pragma once
+
+#define BIT_TEST_AND_SET63(ptr)                \
+  ({                                           \
+    char __ret;                                \
+    asm volatile("lock btsq $63, %0; setnc %1" \
+                 : "+m"(*ptr), "=a"(__ret)     \
+                 :                             \
+                 : "cc");                      \
+    __ret;                                     \
+  })
diff --git a/lci/runtime/completion/cq.c b/lci/runtime/completion/cq.c
index d90cee2a..438a04bc 100644
--- a/lci/runtime/completion/cq.c
+++ b/lci/runtime/completion/cq.c
@@ -12,6 +12,11 @@ void LCII_env_init_cq_type()
       {"array_atomic_basic", LCT_QUEUE_ARRAY_ATOMIC_BASIC},
       {"array_mutex", LCT_QUEUE_ARRAY_MUTEX},
       {"std_mutex", LCT_QUEUE_STD_MUTEX},
+      {"ms", LCT_QUEUE_MS},
+      {"lcrq", LCT_QUEUE_LCRQ},
+      {"lprq", LCT_QUEUE_LPRQ},
+      {"faaarray", LCT_QUEUE_FAAARRAY},
+      {"lazy_index", LCT_QUEUE_LAZY_INDEX},
   };
   bool succeed = LCT_str_int_search(dict, sizeof(dict) / sizeof(dict[0]),
                                     getenv("LCI_CQ_TYPE"), cq_type_default,
diff --git a/lct/api/lct.h b/lct/api/lct.h
index d9d00c27..6383e64d 100644
--- a/lct/api/lct.h
+++ b/lct/api/lct.h
@@ -163,6 +163,11 @@ typedef enum {
   LCT_QUEUE_ARRAY_MUTEX,
   LCT_QUEUE_STD,
   LCT_QUEUE_STD_MUTEX,
+  LCT_QUEUE_MS,
+  LCT_QUEUE_LCRQ,
+  LCT_QUEUE_LPRQ,
+  LCT_QUEUE_FAAARRAY,
+  LCT_QUEUE_LAZY_INDEX,
 } LCT_queue_type_t;
 struct LCT_queue_opaque_t;
 typedef struct LCT_queue_opaque_t* LCT_queue_t;
diff --git a/lct/data_structure/queue/queue.cpp b/lct/data_structure/queue/queue.cpp
index a6da279b..3c49933f 100644
--- a/lct/data_structure/queue/queue.cpp
+++ b/lct/data_structure/queue/queue.cpp
@@ -6,6 +6,9 @@
 #include "data_structure/queue/queue_array_atomic_basic.hpp"
 #include "data_structure/queue/queue_array.hpp"
 #include "data_structure/queue/queue_std.hpp"
+#include "data_structure/queue/queue_concurrency_freaks.hpp"
+// #include "data_structure/queue/queue_faaarray.hpp"
+// #include "data_structure/queue/queue_lazy_index.hpp"
 
 LCT_queue_t LCT_queue_alloc(LCT_queue_type_t type, size_t length)
 {
@@ -32,6 +35,21 @@ LCT_queue_t LCT_queue_alloc(LCT_queue_type_t type, size_t length)
     case LCT_QUEUE_STD_MUTEX:
       q = new lct::queue_std_t();
       break;
+    case LCT_QUEUE_MS:
+      q = new lct::queue_concurrency_freaks_t<MichaelScottQueue<void>>();
+      break;
+    case LCT_QUEUE_LCRQ:
+      q = new lct::queue_concurrency_freaks_t<LCRQueue<void>>();
+      break;
+    case LCT_QUEUE_LPRQ:
+      q = new lct::queue_concurrency_freaks_t<LPRQueue<void>>();
+      break;
+    case LCT_QUEUE_FAAARRAY:
+      q = new lct::queue_concurrency_freaks_t<FAAArrayQueue<void>>();
+      break;
+    case LCT_QUEUE_LAZY_INDEX:
+      q = new lct::queue_concurrency_freaks_t<LazyIndexArrayQueue<void>>();
+      break;
     default:
       throw std::runtime_error("unknown queue type " + std::to_string(type));
   }
diff --git a/lct/data_structure/queue/queue_concurrency_freaks.hpp b/lct/data_structure/queue/queue_concurrency_freaks.hpp
new file mode 100644
index 00000000..ef8ca7d0
--- /dev/null
+++ b/lct/data_structure/queue/queue_concurrency_freaks.hpp
@@ -0,0 +1,47 @@
+#ifndef LCI_QUEUE_CONCURRENCY_FREAKS_HPP
+#define LCI_QUEUE_CONCURRENCY_FREAKS_HPP
+
+#include "dependency/ConcurrencyFreaks/MichaelScottQueue.hpp"
+#include "dependency/ConcurrencyFreaks/LCRQueue.hpp"
+#include "dependency/ConcurrencyFreaks/array/FAAArrayQueue.hpp"
+#include "dependency/ConcurrencyFreaks/array/LazyIndexArrayQueue.hpp"
+#include "dependency/lprq/LPRQueue.hpp"
+
+namespace lct
+{
+template <typename T>
+struct queue_concurrency_freaks_t : public queue_base_t {
+  queue_concurrency_freaks_t();
+  void push(void* val) override;
+  void* pop() override;
+
+ private:
+  // TODO: Figure out the best metaparameter.
+  T queue;
+};
+
+template <typename T>
+queue_concurrency_freaks_t<T>::queue_concurrency_freaks_t() : queue()
+{
+}
+
+template <typename T>
+void queue_concurrency_freaks_t<T>::push(void* val)
+{
+  int tid = LCT_get_thread_id();
+  LCT_Assert(LCT_log_ctx_default, tid < HazardPointers<void>::HP_MAX_THREADS,
+             "Too many threads for this queue.");
+  queue.enqueue(val, tid);
+}
+
+template <typename T>
+void* queue_concurrency_freaks_t<T>::pop()
+{
+  int tid = LCT_get_thread_id();
+  LCT_Assert(LCT_log_ctx_default, tid < HazardPointers<void>::HP_MAX_THREADS,
+             "Too many threads for this queue.");
+  return queue.dequeue(tid);
+}
+}  // namespace lct
+
+#endif  // LCI_QUEUE_CONCURRENCY_FREAKS_HPP
diff --git a/lct/data_structure/queue/queue_faaarray.hpp b/lct/data_structure/queue/queue_faaarray.hpp
new file mode 100644
index 00000000..3545d7a3
--- /dev/null
+++ b/lct/data_structure/queue/queue_faaarray.hpp
@@ -0,0 +1,53 @@
+#ifndef LCI_QUEUE_FAAARRAY_HPP
+#define LCI_QUEUE_FAAARRAY_HPP
+
+#include "dependency/ConcurrencyFreaks/array/FAAArrayQueue.hpp"
+
+namespace lct
+{
+struct queue_faaarray_t : public queue_base_t {
+  queue_faaarray_t();
+  void push(void* val) override;
+  void* pop() override;
+
+ private:
+  // TODO: Figure out the best metaparameter.
+  FAAArrayQueue<void> queue;
+  spinlock_t lock;
+};
+
+queue_faaarray_t::queue_faaarray_t() : queue(512 /*Max Threads*/)
+{
+  fprintf(stderr, "%d: create queue %p\n", LCT_rank, this);
+}
+
+void queue_faaarray_t::push(void* val)
+{
+  lock.lock();
+  int tid = LCT_get_thread_id();
+  if (tid >= 512)
+    throw std::runtime_error("thread id " + std::to_string(tid) +
+                             " is too large");
+  fprintf(stderr, "%d: queue %p tid %d push %p\n", LCT_rank, this, tid, val);
+  std::atomic_thread_fence(std::memory_order_seq_cst);
+  queue.enqueue(val, tid);
+  lock.unlock();
+}
+
+void* queue_faaarray_t::pop()
+{
+  if (!lock.try_lock()) return nullptr;
+  int tid = LCT_get_thread_id();
+  if (tid >= 512)
+    throw std::runtime_error("thread id " + std::to_string(tid) +
+                             " is too large");
+  std::atomic_thread_fence(std::memory_order_seq_cst);
+  void* ret = queue.dequeue(tid);
+  if (ret)
+    fprintf(stderr, "%d: queue %p tid %d pop %p\n", LCT_rank, this, tid, ret);
+  lock.unlock();
+  return ret;
+}
+}  // namespace lct
+
+#endif  // LCI_QUEUE_FAAARRAY_HPP
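For context, the change can be exercised in two ways: by pointing the LCI completion queue at one of the new backends through the LCI_CQ_TYPE environment variable parsed in lci/runtime/completion/cq.c, or by allocating a queue directly through LCT. The sketch below assumes the pre-existing LCT entry points LCT_queue_push(), LCT_queue_pop(), and LCT_queue_free(); only LCT_queue_alloc() and the new LCT_QUEUE_* values appear in this diff, so those three names are assumptions rather than part of this change.

/* Sketch only; LCT_queue_push/LCT_queue_pop/LCT_queue_free are assumed
 * entry points that are not shown in this diff. */
#include <assert.h>
#include <stdlib.h>
#include "lct.h"

int main(void)
{
  /* Option 1: let the LCI runtime pick the completion-queue backend.
   * Accepted values now include "ms", "lcrq", "lprq", "faaarray", and
   * "lazy_index"; the variable must be set before LCI initializes. */
  setenv("LCI_CQ_TYPE", "lprq", 1);

  /* Option 2: allocate one of the new queues directly through LCT.
   * The length argument is ignored by the new cases in LCT_queue_alloc. */
  LCT_queue_t q = LCT_queue_alloc(LCT_QUEUE_LCRQ, 0);
  int payload = 42;
  LCT_queue_push(q, &payload);  /* assumed entry point */
  void* out = LCT_queue_pop(q); /* assumed entry point */
  assert(out == &payload);
  LCT_queue_free(&q);           /* assumed entry point */
  return 0;
}

Note that push()/pop() in queue_concurrency_freaks.hpp index hazard-pointer slots by LCT_get_thread_id(), so every thread that touches one of these queues must have a thread id below HazardPointers::HP_MAX_THREADS; the LCT_Assert in the wrappers guards this.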