diff --git a/Examples/Framework/CMakeLists.txt b/Examples/Framework/CMakeLists.txt index 032627e0b89..26e280b4dd3 100644 --- a/Examples/Framework/CMakeLists.txt +++ b/Examples/Framework/CMakeLists.txt @@ -15,6 +15,7 @@ add_library( src/Framework/RandomNumbers.cpp src/Framework/Sequencer.cpp src/Framework/DataHandle.cpp + src/Framework/BufferedReader.cpp src/Utilities/EventDataTransforms.cpp src/Utilities/Paths.cpp src/Utilities/Options.cpp diff --git a/Examples/Framework/include/ActsExamples/Framework/BufferedReader.hpp b/Examples/Framework/include/ActsExamples/Framework/BufferedReader.hpp new file mode 100644 index 00000000000..5f12c5df30a --- /dev/null +++ b/Examples/Framework/include/ActsExamples/Framework/BufferedReader.hpp @@ -0,0 +1,73 @@ +// This file is part of the ACTS project. +// +// Copyright (C) 2016 CERN for the benefit of the ACTS project +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +#pragma once + +#include "Acts/Utilities/Logger.hpp" +#include "ActsExamples/Framework/AlgorithmContext.hpp" +#include "ActsExamples/Framework/IReader.hpp" +#include "ActsExamples/Framework/ProcessCode.hpp" + +#include + +namespace ActsExamples { + +class WhiteBoard; + +/// Event data reader that takes a concrete reader instance, reads a number of +/// events in a buffer, and selects events from that buffer instead of directly +/// reading them from disk. +/// The purpose is to avoid IO bottlenecks in timing measurements +class BufferedReader final : public IReader { + public: + struct Config { + /// The upstream reader that should be used + std::shared_ptr upstreamReader; + + /// The seed for sampling events from the buffer + std::size_t selectionSeed = 123456; + + /// Buffer size. 
The reader will throw an exception if the upstream + /// reader does not provide enough events + std::size_t bufferSize = 1; + }; + + /// Construct the reader + BufferedReader(const Config& config, Acts::Logging::Level level); + + /// Return the config + const Config& config() const { return m_cfg; } + + /// Give the reader an understandable name + std::string name() const override { + return "Buffered" + m_cfg.upstreamReader->name(); + } + + /// The buffered reader provides the maximum available event range + std::pair availableEvents() const override { + return {0, std::numeric_limits::max()}; + } + + /// Return an event from the buffer + ProcessCode read(const AlgorithmContext& ctx) override; + + /// Fulfill the algorithm interface + ProcessCode initialize() override { return ProcessCode::SUCCESS; } + + /// Fulfill the algorithm interface + ProcessCode finalize() override { return ProcessCode::SUCCESS; } + + private: + Config m_cfg; + std::unique_ptr m_logger; + std::vector> m_buffer; + + const Acts::Logger& logger() const { return *m_logger; } +}; + +} // namespace ActsExamples diff --git a/Examples/Framework/include/ActsExamples/Framework/SequenceElement.hpp b/Examples/Framework/include/ActsExamples/Framework/SequenceElement.hpp index 4db3b8d1af6..591579af2cd 100644 --- a/Examples/Framework/include/ActsExamples/Framework/SequenceElement.hpp +++ b/Examples/Framework/include/ActsExamples/Framework/SequenceElement.hpp @@ -51,6 +51,8 @@ class SequenceElement { template friend class ReadDataHandle; + friend class BufferedReader; + std::vector m_writeHandles; std::vector m_readHandles; }; diff --git a/Examples/Framework/include/ActsExamples/Framework/WhiteBoard.hpp b/Examples/Framework/include/ActsExamples/Framework/WhiteBoard.hpp index 777d27fe898..44cc22ee712 100644 --- a/Examples/Framework/include/ActsExamples/Framework/WhiteBoard.hpp +++ b/Examples/Framework/include/ActsExamples/Framework/WhiteBoard.hpp @@ -18,7 +18,6 @@ #include #include #include -#include 
#include #include #include @@ -38,28 +37,20 @@ class WhiteBoard { Acts::getDefaultLogger("WhiteBoard", Acts::Logging::INFO), std::unordered_map objectAliases = {}); - // A WhiteBoard holds unique elements and can not be copied WhiteBoard(const WhiteBoard& other) = delete; WhiteBoard& operator=(const WhiteBoard&) = delete; + WhiteBoard(WhiteBoard&& other) = default; + WhiteBoard& operator=(WhiteBoard&& other) = default; + bool exists(const std::string& name) const; - private: - /// Store an object on the white board and transfer ownership. - /// - /// @param name Non-empty identifier to store it under - /// @param object Movable reference to the transferable object - /// @throws std::invalid_argument on empty or duplicate name - template - void add(const std::string& name, T&& object); - - /// Get access to a stored object. - /// - /// @param[in] name Identifier for the object - /// @return reference to the stored object - /// @throws std::out_of_range if no object is stored under the requested name - template - const T& get(const std::string& name) const; + /// Copies all keys from another whiteboard to this whiteboard. + /// This is a low overhead operation, since the data holders are + /// shared pointers. + /// Throws an exception if this whiteboard already contains one of + /// the keys in the other whiteboard. + void copyFrom(const WhiteBoard& other); private: /// Find similar names for suggestions with levenshtein-distance @@ -80,6 +71,30 @@ class WhiteBoard { const std::type_info& type() const override { return typeid(T); } }; + /// Store a holder on the white board. + /// + /// @param name Non-empty identifier to store it under + /// @param holder The holder to store + /// @throws std::invalid_argument on empty or duplicate name + void addHolder(const std::string& name, std::shared_ptr holder); + + /// Store an object on the white board and transfer ownership. 
+ /// + /// @param name Non-empty identifier to store it under + /// @param object Movable reference to the transferable object + template + void add(const std::string& name, T&& object) { + addHolder(name, std::make_shared>(std::forward(object))); + } + + /// Get access to a stored object. + /// + /// @param[in] name Identifier for the object + /// @return reference to the stored object + /// @throws std::out_of_range if no object is stored under the requested name + template + const T& get(const std::string& name) const; + std::unique_ptr m_logger; std::unordered_map> m_store; std::unordered_map m_objectAliases; @@ -103,23 +118,6 @@ inline ActsExamples::WhiteBoard::WhiteBoard( std::unordered_map objectAliases) : m_logger(std::move(logger)), m_objectAliases(std::move(objectAliases)) {} -template -inline void ActsExamples::WhiteBoard::add(const std::string& name, T&& object) { - if (name.empty()) { - throw std::invalid_argument("Object can not have an empty name"); - } - if (m_store.contains(name)) { - throw std::invalid_argument("Object '" + name + "' already exists"); - } - auto holder = std::make_shared>(std::forward(object)); - m_store.emplace(name, holder); - ACTS_VERBOSE("Added object '" << name << "' of type " << typeid(T).name()); - if (auto it = m_objectAliases.find(name); it != m_objectAliases.end()) { - m_store[it->second] = holder; - ACTS_VERBOSE("Added alias object '" << it->second << "'"); - } -} - template inline const T& ActsExamples::WhiteBoard::get(const std::string& name) const { ACTS_VERBOSE("Attempt to get object '" << name << "' of type " diff --git a/Examples/Framework/src/Framework/BufferedReader.cpp b/Examples/Framework/src/Framework/BufferedReader.cpp new file mode 100644 index 00000000000..4431fd99f5d --- /dev/null +++ b/Examples/Framework/src/Framework/BufferedReader.cpp @@ -0,0 +1,75 @@ +// This file is part of the ACTS project. 
+// +// Copyright (C) 2016 CERN for the benefit of the ACTS project +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +#include "ActsExamples/Framework/BufferedReader.hpp" + +#include "Acts/Utilities/Logger.hpp" +#include "ActsExamples/Framework/AlgorithmContext.hpp" +#include "ActsExamples/Framework/WhiteBoard.hpp" + +#include +#include + +namespace ActsExamples { + +BufferedReader::BufferedReader(const Config &config, Acts::Logging::Level level) + : m_cfg(config), m_logger(Acts::getDefaultLogger(name(), level)) { + if (!m_cfg.upstreamReader) { + throw std::invalid_argument("No upstream reader provided!"); + } + + // Register write and read handles of the upstream reader + for (auto rh : m_cfg.upstreamReader->readHandles()) { + registerReadHandle(*rh); + } + + for (auto wh : m_cfg.upstreamReader->writeHandles()) { + registerWriteHandle(*wh); + } + + // Read the events + auto [ebegin, eend] = m_cfg.upstreamReader->availableEvents(); + if (eend - ebegin < m_cfg.bufferSize) { + throw std::runtime_error("Reader does not provide enough events"); + } + + ACTS_INFO("Start reading events into buffer..."); + + m_buffer.reserve(eend - ebegin); + for (auto i = ebegin; i < ebegin + m_cfg.bufferSize; ++i) { + auto board = std::make_unique(m_logger->clone()); + ActsExamples::AlgorithmContext ctx(0, i, *board); + + ACTS_DEBUG("Read event " << i << " into buffer"); + m_cfg.upstreamReader->read(ctx); + m_buffer.emplace_back(std::move(board)); + } + + ACTS_INFO("Filled " << m_buffer.size() << " events into the buffer"); +} + +ProcessCode BufferedReader::read(const AlgorithmContext &ctx) { + // Set up a random event selection that is consistent if multiple + // BufferedReaders are used within a workflow. The linear congruential engine + // is chosen since it is cheap to instantiate. 
For each eventNumber, it is put + // in a reproducible state. + std::minstd_rand rng(m_cfg.selectionSeed); + rng.discard(ctx.eventNumber); + + /// Sample from the buffer and transfer the content + std::uniform_int_distribution dist(0, m_cfg.bufferSize - 1); + + const auto entry = dist(rng); + ctx.eventStore.copyFrom(*m_buffer.at(entry)); + + ACTS_DEBUG("Use buffer entry " << entry << " for event " << ctx.eventNumber); + + return ProcessCode::SUCCESS; +} + +} // namespace ActsExamples diff --git a/Examples/Framework/src/Framework/WhiteBoard.cpp b/Examples/Framework/src/Framework/WhiteBoard.cpp index 758767f8429..eb74d5e7d46 100644 --- a/Examples/Framework/src/Framework/WhiteBoard.cpp +++ b/Examples/Framework/src/Framework/WhiteBoard.cpp @@ -67,6 +67,11 @@ std::vector ActsExamples::WhiteBoard::similarNames( names.push_back({d, n}); } } + for (const auto &[from, to] : m_objectAliases) { + if (const auto d = levenshteinDistance(from, name); d < distThreshold) { + names.push_back({d, from}); + } + } std::ranges::sort(names, {}, [](const auto &n) { return n.first; }); @@ -84,3 +89,36 @@ std::string ActsExamples::WhiteBoard::typeMismatchMessage( boost::core::demangle(req) + " but actually " + boost::core::demangle(act)}; } + +void ActsExamples::WhiteBoard::copyFrom(const WhiteBoard &other) { + for (auto &[key, val] : other.m_store) { + addHolder(key, val); + ACTS_VERBOSE("Copied key '" << key << "' to whiteboard"); + } +} + +void ActsExamples::WhiteBoard::addHolder(const std::string &name, + std::shared_ptr holder) { + if (name.empty()) { + throw std::invalid_argument("Object can not have an empty name"); + } + + if (holder == nullptr) { + throw std::invalid_argument("Object '" + name + "' is nullptr"); + } + + auto [storeIt, success] = m_store.insert({name, std::move(holder)}); + + if (!success) { + throw std::invalid_argument("Object '" + name + "' already exists"); + } + ACTS_VERBOSE("Added object '" << name << "' of type " + << storeIt->second->type().name()); + + if 
(success) { + if (auto it = m_objectAliases.find(name); it != m_objectAliases.end()) { + m_store[it->second] = storeIt->second; + ACTS_VERBOSE("Added alias object '" << it->second << "'"); + } + } +} diff --git a/Examples/Python/src/Input.cpp b/Examples/Python/src/Input.cpp index b3ad9e1f657..60e0424b208 100644 --- a/Examples/Python/src/Input.cpp +++ b/Examples/Python/src/Input.cpp @@ -8,6 +8,7 @@ #include "Acts/Plugins/Python/Utilities.hpp" #include "ActsExamples/EventData/Cluster.hpp" +#include "ActsExamples/Framework/BufferedReader.hpp" #include "ActsExamples/Io/Csv/CsvDriftCircleReader.hpp" #include "ActsExamples/Io/Csv/CsvExaTrkXGraphReader.hpp" #include "ActsExamples/Io/Csv/CsvMeasurementReader.hpp" @@ -39,6 +40,11 @@ namespace Acts::Python { void addInput(Context& ctx) { auto mex = ctx.get("examples"); + // Buffered reader + ACTS_PYTHON_DECLARE_READER(ActsExamples::BufferedReader, mex, + "BufferedReader", upstreamReader, selectionSeed, + bufferSize); + // ROOT READERS ACTS_PYTHON_DECLARE_READER(ActsExamples::RootParticleReader, mex, "RootParticleReader", outputParticles, treeName, diff --git a/Examples/Python/tests/test_reader.py b/Examples/Python/tests/test_reader.py index 46b1c9c229f..5faf358efd3 100644 --- a/Examples/Python/tests/test_reader.py +++ b/Examples/Python/tests/test_reader.py @@ -435,3 +435,52 @@ def test_edm4hep_tracks_reader(tmp_path): ) s.run() + + +@pytest.mark.root +def test_buffered_reader(tmp_path, conf_const, ptcl_gun): + # Test the buffered reader with the ROOT particle reader + # need to write out some particles first + eventsInBuffer = 5 + eventsToProcess = 10 + + s = Sequencer(numThreads=1, events=eventsInBuffer, logLevel=acts.logging.WARNING) + evGen = ptcl_gun(s) + + file = tmp_path / "particles.root" + s.addWriter( + conf_const( + RootParticleWriter, + acts.logging.WARNING, + inputParticles=evGen.config.outputParticles, + filePath=str(file), + ) + ) + + s.run() + + # reset sequencer for reading + s2 = 
Sequencer(events=eventsToProcess, numThreads=1, logLevel=acts.logging.WARNING) + + reader = acts.examples.RootParticleReader( + level=acts.logging.WARNING, + outputParticles="particles_input", + filePath=str(file), + ) + + s2.addReader( + acts.examples.BufferedReader( + level=acts.logging.WARNING, + upstreamReader=reader, + bufferSize=eventsInBuffer, + ) + ) + + alg = AssertCollectionExistsAlg( + "particles_input", "check_alg", acts.logging.WARNING + ) + s2.addAlgorithm(alg) + + s2.run() + + assert alg.events_seen == eventsToProcess