diff --git a/CMakeLists.txt b/CMakeLists.txt
index 57e3213e7..bc28ef9cc 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -598,6 +598,10 @@ if(WITH_TESTS)
target_link_libraries(dwarfs_utils_test gtest gtest_main)
list(APPEND TEST_TARGETS dwarfs_utils_test)
+ add_executable(block_merger_test test/block_merger_test.cpp)
+ target_link_libraries(block_merger_test gtest gtest_main)
+ list(APPEND TEST_TARGETS block_merger_test)
+
add_executable(dwarfs_pcm_sample_transformer_test test/pcm_sample_transformer_test.cpp)
target_link_libraries(dwarfs_pcm_sample_transformer_test gtest gtest_main)
list(APPEND TEST_TARGETS dwarfs_pcm_sample_transformer_test)
diff --git a/include/dwarfs/block_merger.h b/include/dwarfs/block_merger.h
new file mode 100644
index 000000000..27c92b0c9
--- /dev/null
+++ b/include/dwarfs/block_merger.h
@@ -0,0 +1,37 @@
+/* vim:set ts=2 sw=2 sts=2 et: */
+/**
+ * \author Marcus Holland-Moritz (github@mhxnet.de)
+ * \copyright Copyright (c) Marcus Holland-Moritz
+ *
+ * This file is part of dwarfs.
+ *
+ * dwarfs is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dwarfs is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dwarfs. If not, see .
+ */
+
+#pragma once
+
+namespace dwarfs {
+
+template
+class block_merger {
+ public:
+ using source_type = SourceT;
+ using block_type = BlockT;
+
+ virtual ~block_merger() = default;
+
+ virtual void add(source_type src, block_type blk, bool is_last) = 0;
+};
+
+} // namespace dwarfs
diff --git a/include/dwarfs/multi_queue_block_merger.h b/include/dwarfs/multi_queue_block_merger.h
new file mode 100644
index 000000000..0f8e7ab65
--- /dev/null
+++ b/include/dwarfs/multi_queue_block_merger.h
@@ -0,0 +1,143 @@
+/* vim:set ts=2 sw=2 sts=2 et: */
+/**
+ * \author Marcus Holland-Moritz (github@mhxnet.de)
+ * \copyright Copyright (c) Marcus Holland-Moritz
+ *
+ * This file is part of dwarfs.
+ *
+ * dwarfs is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dwarfs is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dwarfs. If not, see .
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "dwarfs/block_merger.h"
+
+namespace dwarfs {
+
+template
+class multi_queue_block_merger : public block_merger {
+ public:
+ using source_type = SourceT;
+ using block_type = BlockT;
+
+ multi_queue_block_merger(size_t num_active_slots, size_t max_queued_blocks,
+ std::vector const& sources,
+ std::function on_block_merged)
+ : num_queueable_{max_queued_blocks}
+ , sources_{sources.begin(), sources.end()}
+ , active_(num_active_slots)
+ , on_block_merged_{on_block_merged} {
+ for (size_t i = 0; i < active_.size() && !sources_.empty(); ++i) {
+ active_[i] = sources_.front();
+ sources_.pop_front();
+ }
+ }
+
+ void add(source_type src, block_type blk, bool is_last) override {
+ std::unique_lock lock{mx_};
+
+ cv_.wait(lock,
+ [this, &src] { return source_distance(src) < num_queueable_; });
+
+ --num_queueable_;
+
+ queues_[src].emplace(std::move(blk), is_last);
+
+ while (try_merge_block()) {
+ }
+
+ cv_.notify_all();
+ }
+
+ private:
+ size_t source_distance(source_type src) const {
+ auto ix = active_index_;
+ size_t distance{0};
+
+ while (active_[ix] && active_[ix].value() != src) {
+ ++distance;
+ ix = (ix + 1) % active_.size();
+
+ if (ix == active_index_) {
+ auto it = std::find(begin(sources_), end(sources_), src);
+ distance += std::distance(begin(sources_), it);
+ break;
+ }
+ }
+
+ return distance;
+ }
+
+ bool try_merge_block() {
+ auto const ix = active_index_;
+
+ assert(active_[ix]);
+
+ auto src = active_[ix].value();
+ auto it = queues_.find(src);
+
+ if (it == queues_.end() || it->second.empty()) {
+ return false;
+ }
+
+ auto [blk, is_last] = std::move(it->second.front());
+ it->second.pop();
+
+ on_block_merged_(std::move(blk));
+
+ ++num_queueable_;
+
+ if (is_last) {
+ queues_.erase(it);
+ update_active(ix);
+ }
+
+ do {
+ active_index_ = (active_index_ + 1) % active_.size();
+ } while (active_index_ != ix && !active_[active_index_]);
+
+ return active_index_ != ix || active_[active_index_];
+ }
+
+ void update_active(size_t ix) {
+ if (!sources_.empty()) {
+ active_[ix] = sources_.front();
+ sources_.pop_front();
+ } else {
+ active_[ix].reset();
+ }
+ }
+
+ std::mutex mx_;
+ std::condition_variable cv_;
+ size_t active_index_{0};
+ size_t num_queueable_;
+ std::unordered_map>>
+ queues_;
+ std::deque sources_;
+ std::vector> active_;
+ std::function on_block_merged_;
+};
+
+} // namespace dwarfs
diff --git a/test/block_merger_test.cpp b/test/block_merger_test.cpp
new file mode 100644
index 000000000..27ea44843
--- /dev/null
+++ b/test/block_merger_test.cpp
@@ -0,0 +1,267 @@
+/* vim:set ts=2 sw=2 sts=2 et: */
+/**
+ * \author Marcus Holland-Moritz (github@mhxnet.de)
+ * \copyright Copyright (c) Marcus Holland-Moritz
+ *
+ * This file is part of dwarfs.
+ *
+ * dwarfs is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dwarfs is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dwarfs. If not, see .
+ */
+
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include "dwarfs/multi_queue_block_merger.h"
+
+using namespace dwarfs;
+
+namespace {
+
+constexpr int const debuglevel{0};
+
+constexpr size_t const max_runs{250};
+constexpr size_t const num_runner_threads{16};
+constexpr size_t const num_repetitions{4};
+
+using block = std::pair;
+
+// Use std::shared_mutex because folly::SharedMutex might trigger TSAN
+template
+using synchronized = folly::Synchronized;
+
+template
+using sync_queue = synchronized>;
+
+class source {
+ public:
+ source(size_t id, std::mt19937& delay_rng, std::mt19937& rng,
+ size_t max_blocks = 20, double ips = 5000.0)
+ : id_{id}
+ , blocks_{init_blocks(delay_rng, rng, max_blocks, ips)} {}
+
+ std::tuple next() {
+ auto idx = idx_++;
+ return {std::make_pair(id_, idx), idx_ >= blocks_.size(), blocks_[idx]};
+ }
+
+ size_t id() const { return id_; }
+
+ size_t num_blocks() const { return blocks_.size(); }
+
+ std::chrono::nanoseconds total_time() const {
+ auto seconds = std::accumulate(begin(blocks_), end(blocks_), 0.0);
+ return std::chrono::duration_cast(
+ std::chrono::duration(seconds));
+ }
+
+ private:
+ static std::vector
+ init_blocks(std::mt19937& delay_rng, std::mt19937& rng, size_t max_blocks,
+ double ips) {
+ std::uniform_int_distribution<> idist(1, max_blocks);
+ std::exponential_distribution<> edist(ips);
+ std::vector blocks;
+ blocks.resize(idist(rng));
+ std::generate(begin(blocks), end(blocks), [&] { return edist(delay_rng); });
+ return blocks;
+ }
+
+ size_t idx_{0};
+ size_t id_;
+ std::vector blocks_;
+};
+
+void emitter(sync_queue