From c2c635b31fc29a8aff7009c7213722a5ec034f79 Mon Sep 17 00:00:00 2001 From: Andrea Bocci Date: Mon, 18 Nov 2024 17:08:31 +0100 Subject: [PATCH 1/6] Implement a generic product type Implement a generic product type that can be constructed based on the object's ObjectWithDict and on the wrapper's TypeWithDict. --- FWCore/Framework/interface/GenericProduct.h | 72 +++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 FWCore/Framework/interface/GenericProduct.h diff --git a/FWCore/Framework/interface/GenericProduct.h b/FWCore/Framework/interface/GenericProduct.h new file mode 100644 index 0000000000000..3751abab5644f --- /dev/null +++ b/FWCore/Framework/interface/GenericProduct.h @@ -0,0 +1,72 @@ +#include +#include +#include +#include + +#include "DataFormats/Common/interface/OrphanHandle.h" +#include "FWCore/Framework/interface/Event.h" +#include "FWCore/Reflection/interface/ObjectWithDict.h" +#include "FWCore/Reflection/interface/TypeWithDict.h" + +namespace edm { + + class GenericProduct { + public: + // TODO make private and add accessors and a constructor + TypeWithDict wrappedType_; + ObjectWithDict object_; + }; + + template <> + class OrphanHandle : public OrphanHandleBase { + public: + OrphanHandle(ObjectWithDict prod, ProductID const& id) : OrphanHandleBase(prod.address(), id) {} + }; + + // specialise Event::putImpl for GenericProduct + template <> + OrphanHandle Event::putImpl(EDPutToken::value_type index, std::unique_ptr product) { + /* TODO implement for ObjectWithDict + // The following will call post_insert if T has such a function, + // and do nothing if T has no such function. 
+ if constexpr (not std::derived_from and requires(T& p) { p.post_insert(); }) { + iProduct.post_insert(); + } + */ + + assert(index < putProducts().size()); + ObjectWithDict wrapper = product->wrappedType_.construct(); + + // memcpy the object representation of product->object_ into the wrapper + std::memcpy(wrapper.get("obj").address(), product->object_.address(), product->object_.typeOf().size()); + ObjectWithDict prod(product->object_.typeOf(), wrapper.get("obj").address()); + + // mark the object as present + *reinterpret_cast(wrapper.get("present").address()) = true; + + std::unique_ptr wp(reinterpret_cast(wrapper.address())); + putProducts()[index] = std::move(wp); + auto const& prodID = provRecorder_.getProductID(index); + return OrphanHandle(prod, prodID); + } + + // specialise Event::put for GenericProduct + template <> + OrphanHandle Event::put(EDPutToken token, std::unique_ptr product) { + if (UNLIKELY(product.get() == nullptr)) { // null pointer is illegal + TypeID typeID(typeid(GenericProduct)); + principal_get_adapter_detail::throwOnPutOfNullProduct("Event", typeID, provRecorder_.productInstanceLabel(token)); + } + std::type_info const& type = product->object_.typeOf().typeInfo(); + if (UNLIKELY(token.isUninitialized())) { + principal_get_adapter_detail::throwOnPutOfUninitializedToken("Event", type); + } + TypeID const& expected = provRecorder_.getTypeIDForPutTokenIndex(token.index()); + if (UNLIKELY(expected != TypeID{type})) { + principal_get_adapter_detail::throwOnPutOfWrongType(type, expected); + } + + return putImpl(token.index(), std::move(product)); + } + +} // namespace edm From b67443ccd6afbe1fad0c45242ac688eafcf9fbd1 Mon Sep 17 00:00:00 2001 From: Andrea Bocci Date: Tue, 26 Nov 2024 06:57:43 +0100 Subject: [PATCH 2/6] Implement edmtest::GenericCloner This EDProducer will clone all the event products declared by its configuration, using their ROOT dictionaries. Refactor common functionality with GenericConsumer. 
--- .../Provenance/interface/BranchPattern.h | 124 ++++++++++++ FWCore/Modules/src/GenericConsumer.cc | 92 +-------- FWCore/TestModules/README.md | 11 ++ FWCore/TestModules/plugins/GenericCloner.cc | 181 ++++++++++++++++++ FWCore/TestModules/test/BuildFile.xml | 2 + .../TestModules/test/testGenericCloner_cfg.py | 36 ++++ 6 files changed, 364 insertions(+), 82 deletions(-) create mode 100644 DataFormats/Provenance/interface/BranchPattern.h create mode 100644 FWCore/TestModules/plugins/GenericCloner.cc create mode 100644 FWCore/TestModules/test/testGenericCloner_cfg.py diff --git a/DataFormats/Provenance/interface/BranchPattern.h b/DataFormats/Provenance/interface/BranchPattern.h new file mode 100644 index 0000000000000..a73f316979e41 --- /dev/null +++ b/DataFormats/Provenance/interface/BranchPattern.h @@ -0,0 +1,124 @@ +#ifndef DataFormats_Provenance_interface_BranchPattern_h +#define DataFormats_Provenance_interface_BranchPattern_h + +#include +#include +#include +#include +#include + +#include + +#include "DataFormats/Provenance/interface/BranchDescription.h" +#include "FWCore/Utilities/interface/EDMException.h" + +namespace edm { + + /* BranchPattern + * + * A BranchPattern is constructed from a string representing either a module label (e.g. "") or a + * a branch name (e.g. "___"). + * + * A BranchPattern object can be compared with a BranchDescription object using the match() method: + * + * branchPattern.match(branch) + * + * . + * Glob expressions ("?" and "*") are supported in module labels and within the individual fields of branch names, + * similar to an OutputModule's "keep" statements. + * Use "*" to match all products of a given category. 
* + * If a module label is used, it must not contain any underscores ("_"); the resulting BranchPattern will match all + * the branches produced by a module with the given label, including those with non-empty instance names, and those + * produced by the Transformer functionality (such as the implicitly copied-to-host products in case of Alpaka-based + * modules). + * If a branch name is used, all four fields must be present, separated by underscores; the resulting BranchPattern + * will match the branches matching all four fields. + * + * For example, in the case of products from an Alpaka-based producer running on a device + * + * BranchPattern("module") + * + * would match all branches produced by "module", including the automatic host copy of its device products. + * While + * + * BranchPattern( "*DeviceProduct_module_*_*" ) + * + * would match only the branches corresponding to the device products. + */ + + class BranchPattern { + public: + explicit BranchPattern(std::string const& label) { + static const char kSeparator = '_'; + static const std::string_view kWildcard{"*"}; + static const std::regex kAny{".*"}; + + // wildcard + if (label == kWildcard) { + type_ = kAny; + moduleLabel_ = kAny; + productInstanceName_ = kAny; + processName_ = kAny; + return; + } + + int fields = std::count(label.begin(), label.end(), kSeparator) + 1; + if (fields == 1) { + // convert the module label into a regular expression + type_ = kAny; + moduleLabel_ = glob_to_regex(label); + productInstanceName_ = kAny; + processName_ = kAny; + } else if (fields == 4) { + // split the branch name into ___ + // and convert the glob expressions into regular expressions + size_t first = 0, last = 0; + last = label.find(kSeparator, first); + type_ = glob_to_regex(label.substr(first, last - first)); + first = last + 1; + last = label.find(kSeparator, first); + moduleLabel_ = glob_to_regex(label.substr(first, last - first)); + first = last + 1; + last = label.find(kSeparator, first); + 
productInstanceName_ = glob_to_regex(label.substr(first, last - first)); + first = last + 1; + last = label.find(kSeparator, first); + processName_ = glob_to_regex(label.substr(first, last - first)); + } else { + // invalid input + throw edm::Exception(edm::errors::Configuration) << "Invalid module label or branch name: \"" << label << "\""; + } + } + + bool match(edm::BranchDescription const& branch) const { + return (std::regex_match(branch.friendlyClassName(), type_) and + std::regex_match(branch.moduleLabel(), moduleLabel_) and + std::regex_match(branch.productInstanceName(), productInstanceName_) and + std::regex_match(branch.processName(), processName_)); + } + + private: + static std::regex glob_to_regex(std::string pattern) { + boost::replace_all(pattern, "*", ".*"); + boost::replace_all(pattern, "?", "."); + return std::regex(pattern); + } + + std::regex type_; + std::regex moduleLabel_; + std::regex productInstanceName_; + std::regex processName_; + }; + + inline std::vector branchPatterns(std::vector const& labels) { + std::vector patterns; + patterns.reserve(labels.size()); + for (auto const& label : labels) + patterns.emplace_back(label); + return patterns; + } + +} // namespace edm + +#endif // DataFormats_Provenance_interface_BranchPattern_h diff --git a/FWCore/Modules/src/GenericConsumer.cc b/FWCore/Modules/src/GenericConsumer.cc index 4cceb6f5113ca..ab1ae45a793cb 100644 --- a/FWCore/Modules/src/GenericConsumer.cc +++ b/FWCore/Modules/src/GenericConsumer.cc @@ -34,6 +34,7 @@ #include #include "DataFormats/Provenance/interface/BranchDescription.h" +#include "DataFormats/Provenance/interface/BranchPattern.h" #include "FWCore/Framework/interface/global/EDAnalyzer.h" #include "FWCore/MessageLogger/interface/MessageLogger.h" #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" @@ -41,80 +42,6 @@ #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/ParameterSet/interface/ParameterSetDescription.h" -namespace { - struct 
ProductBranch { - public: - ProductBranch(std::string const& label) { - static const char kSeparator = '_'; - static const char kWildcard = '*'; - static const std::regex kAny{".*"}; - - // wildcard - if (label == kWildcard) { - type_ = kAny; - moduleLabel_ = kAny; - productInstanceName_ = kAny; - processName_ = kAny; - return; - } - - int fields = std::count(label.begin(), label.end(), kSeparator) + 1; - if (fields == 1) { - // convert the module label into a regular expression - type_ = kAny; - moduleLabel_ = glob_to_regex(label); - productInstanceName_ = kAny; - processName_ = kAny; - } else if (fields == 4) { - // split the branch name into ___ - // and convert the glob expressions into regular expressions - size_t first = 0, last = 0; - last = label.find(kSeparator, first); - type_ = glob_to_regex(label.substr(first, last - first)); - first = last + 1; - last = label.find(kSeparator, first); - moduleLabel_ = glob_to_regex(label.substr(first, last - first)); - first = last + 1; - last = label.find(kSeparator, first); - productInstanceName_ = glob_to_regex(label.substr(first, last - first)); - first = last + 1; - last = label.find(kSeparator, first); - processName_ = glob_to_regex(label.substr(first, last - first)); - } else { - // invalid input - throw edm::Exception(edm::errors::Configuration) << "Invalid module label or branch name: \"" << label << "\""; - } - } - - bool match(edm::BranchDescription const& branch) const { - return (std::regex_match(branch.friendlyClassName(), type_) and - std::regex_match(branch.moduleLabel(), moduleLabel_) and - std::regex_match(branch.productInstanceName(), productInstanceName_) and - std::regex_match(branch.processName(), processName_)); - } - - private: - static std::regex glob_to_regex(std::string pattern) { - boost::replace_all(pattern, "*", ".*"); - boost::replace_all(pattern, "?", "."); - return std::regex(pattern); - } - - std::regex type_; - std::regex moduleLabel_; - std::regex productInstanceName_; - std::regex 
processName_; - }; - - std::vector make_patterns(std::vector const& labels) { - std::vector patterns; - patterns.reserve(labels.size()); - for (auto const& label : labels) - patterns.emplace_back(label); - return patterns; - } -} // namespace - namespace edm { class GenericConsumer : public edm::global::EDAnalyzer<> { public: @@ -126,19 +53,20 @@ namespace edm { static void fillDescriptions(ConfigurationDescriptions& descriptions); private: - std::vector eventProducts_; - std::vector lumiProducts_; - std::vector runProducts_; - std::vector processProducts_; + std::vector eventProducts_; + std::vector lumiProducts_; + std::vector runProducts_; + std::vector processProducts_; std::string label_; bool verbose_; }; GenericConsumer::GenericConsumer(ParameterSet const& config) - : eventProducts_(make_patterns(config.getUntrackedParameter>("eventProducts"))), - lumiProducts_(make_patterns(config.getUntrackedParameter>("lumiProducts"))), - runProducts_(make_patterns(config.getUntrackedParameter>("runProducts"))), - processProducts_(make_patterns(config.getUntrackedParameter>("processProducts"))), + : eventProducts_(edm::branchPatterns(config.getUntrackedParameter>("eventProducts"))), + lumiProducts_(edm::branchPatterns(config.getUntrackedParameter>("lumiProducts"))), + runProducts_(edm::branchPatterns(config.getUntrackedParameter>("runProducts"))), + processProducts_( + edm::branchPatterns(config.getUntrackedParameter>("processProducts"))), label_(config.getParameter("@module_label")), verbose_(config.getUntrackedParameter("verbose")) { callWhenNewProductsRegistered([this](edm::BranchDescription const& branch) { diff --git a/FWCore/TestModules/README.md b/FWCore/TestModules/README.md index bd4cadf13823d..7e247dcc4b317 100644 --- a/FWCore/TestModules/README.md +++ b/FWCore/TestModules/README.md @@ -24,3 +24,14 @@ product read from the `Event`. 
Together `edmtest::EventIDProducer` and `edmtest::EventIDValidator` can be used to validate that an object produced in a given event is being read back in the same event. + + +## `edmtest::GenericCloner` + +This module will clone all the event products declared by its configuration, +using their ROOT dictionaries. +The products can be specified either as module labels (_e.g._ ``) +or as branch names (_e.g._ `___`). +Glob expressions (`?` and `*`) are supported in module labels and within the +individual fields of branch names, similar to an `OutputModule`'s `keep` +statements. diff --git a/FWCore/TestModules/plugins/GenericCloner.cc b/FWCore/TestModules/plugins/GenericCloner.cc new file mode 100644 index 0000000000000..64ccc7b9ad268 --- /dev/null +++ b/FWCore/TestModules/plugins/GenericCloner.cc @@ -0,0 +1,181 @@ +/* + * This EDProducer will clone all the event products declared by its configuration, using their ROOT dictionaries. + * + * The products can be specified either as module labels (e.g. "") or as branch names (e.g. + * "___"). + * + * If a module label is used, no underscore ("_") must be present; this module will clone all the products produced by + * that module, including those produced by the Transformer functionality (such as the implicitly copied-to-host + * products in case of Alpaka-based modules). + * If a branch name is used, all four fields must be present, separated by underscores; this module will clone only on + * the matching product(s). + * + * Glob expressions ("?" and "*") are supported in module labels and within the individual fields of branch names, + * similar to an OutputModule's "keep" statements. + * Use "*" to clone all products. + * + * For example, in the case of Alpaka-based modules running on a device, using + * + * eventProducts = cms.untracked.vstring( "module" ) + * + * will cause "module" to run, along with automatic copy of its device products to the host, and will attempt to clone + * all device and host products. 
+ * To clone only the host product, the branch can be specified explicitly with + * + * eventProducts = cms.untracked.vstring( "HostProductType_module_*_*" ) + * + * . + */ + +#include +#include +#include +#include +#include +#include + +#include + +#include "DataFormats/Provenance/interface/BranchDescription.h" +#include "DataFormats/Provenance/interface/BranchPattern.h" +#include "FWCore/Framework/interface/Event.h" +#include "FWCore/Framework/interface/GenericHandle.h" +#include "FWCore/Framework/interface/GenericProduct.h" +#include "FWCore/Framework/interface/global/EDProducer.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/ParameterSet/interface/ParameterDescriptionNode.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/Reflection/interface/ObjectWithDict.h" + +namespace edmtest { + + class GenericCloner : public edm::global::EDProducer<> { + public: + explicit GenericCloner(edm::ParameterSet const&); + ~GenericCloner() override = default; + + void produce(edm::StreamID, edm::Event&, edm::EventSetup const&) const override; + + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions); + + private: + struct Entry { + edm::TypeWithDict objectType_; + edm::TypeWithDict wrappedType_; + edm::EDGetToken getToken_; + edm::EDPutToken putToken_; + }; + + std::vector eventPatterns_; + std::vector eventProducts_; + std::string label_; + bool verbose_; + }; + + GenericCloner::GenericCloner(edm::ParameterSet const& config) + : eventPatterns_(edm::branchPatterns(config.getParameter>("eventProducts"))), + label_(config.getParameter("@module_label")), + verbose_(config.getUntrackedParameter("verbose")) { + eventProducts_.reserve(eventPatterns_.size()); + + callWhenNewProductsRegistered([this](edm::BranchDescription const& branch) { + static const std::string_view 
kPathStatus("edm::PathStatus"); + static const std::string_view kEndPathStatus("edm::EndPathStatus"); + + switch (branch.branchType()) { + case edm::InEvent: + if (branch.className() == kPathStatus or branch.className() == kEndPathStatus) + return; + for (auto& pattern : eventPatterns_) + if (pattern.match(branch)) { + Entry product; + product.objectType_ = branch.unwrappedType(); + product.wrappedType_ = branch.wrappedType(); + // TODO move this to EDConsumerBase::consumes() ? + product.getToken_ = this->consumes( + edm::TypeToGet{branch.unwrappedTypeID(), edm::PRODUCT_TYPE}, + edm::InputTag{branch.moduleLabel(), branch.productInstanceName(), branch.processName()}); + product.putToken_ = this->produces(branch.unwrappedTypeID(), branch.productInstanceName()); + eventProducts_.push_back(product); + + if (verbose_) { + edm::LogInfo("GenericCloner") + << label_ << " will clone Event product " << branch.friendlyClassName() << '_' + << branch.moduleLabel() << '_' << branch.productInstanceName() << '_' << branch.processName(); + } + break; + } + break; + + case edm::InLumi: + case edm::InRun: + case edm::InProcess: + // lumi, run and process products are not supported + break; + + default: + throw edm::Exception(edm::errors::LogicError) + << "Unexpected branch type " << branch.branchType() << "\nPlease contact a Framework developer\n"; + } + }); + } + + void GenericCloner::produce(edm::StreamID /*unused*/, edm::Event& event, edm::EventSetup const& /*unused*/) const { + for (auto& product : eventProducts_) { + edm::GenericHandle handle(product.objectType_); + event.getByToken(product.getToken_, handle); + edm::ObjectWithDict const* object = handle.product(); + + TBufferFile send_buffer(TBuffer::kWrite); + send_buffer.WriteObjectAny(object->address(), product.objectType_.getClass(), false); + int size = send_buffer.Length(); + + TBufferFile recv_buffer(TBuffer::kRead, size); + std::memcpy(recv_buffer.Buffer(), send_buffer.Buffer(), size); + + void* clone_ptr = 
reinterpret_cast(recv_buffer.ReadObjectAny(product.objectType_.getClass())); + auto clone = std::make_unique(); + clone->object_ = edm::ObjectWithDict(product.objectType_, clone_ptr); + clone->wrappedType_ = product.wrappedType_; + + // specialise Event::put for GenericProduct + event.put(product.putToken_, std::move(clone)); + } + } + + void GenericCloner::fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + descriptions.setComment( + R"(This EDProducer will clone all the event products declared by its configuration, using their ROOT dictionaries. + +The products can be specified either as module labels (e.g. "") or as branch names (e.g. "___"). +If a module label is used, no underscore ("_") must be present; this module will clone all the products produced by that module, including those produced by the Transformer functionality (such as the implicitly copied-to-host products in case of Alpaka-based modules). +If a branch name is used, all four fields must be present, separated by underscores; this module will clone only on the matching product(s). + +Glob expressions ("?" and "*") are supported in module labels and within the individual fields of branch names, similar to an OutputModule's "keep" statements. +Use "*" to clone all products. + +For example, in the case of Alpaka-based modules running on a device, using + + eventProducts = cms.untracked.vstring( "module" ) + +will cause "module" to run, along with automatic copy of its device products to the host, and will attempt to clone all device and host products. 
+To clone only the host product, the branch can be specified explicitly with + + eventProducts = cms.untracked.vstring( "HostProductType_module_*_*" ) + +.)"); + + edm::ParameterSetDescription desc; + desc.add>("eventProducts", {}) + ->setComment("List of modules or branches whose event products will be cloned."); + desc.addUntracked("verbose", false) + ->setComment("Print the branch names of the products that will be cloned."); + descriptions.addWithDefaultLabel(desc); + } + +} // namespace edmtest + +#include "FWCore/Framework/interface/MakerMacros.h" +DEFINE_FWK_MODULE(edmtest::GenericCloner); diff --git a/FWCore/TestModules/test/BuildFile.xml b/FWCore/TestModules/test/BuildFile.xml index 386d18c359309..d16951110027d 100644 --- a/FWCore/TestModules/test/BuildFile.xml +++ b/FWCore/TestModules/test/BuildFile.xml @@ -1 +1,3 @@ + + diff --git a/FWCore/TestModules/test/testGenericCloner_cfg.py b/FWCore/TestModules/test/testGenericCloner_cfg.py new file mode 100644 index 0000000000000..d8eaa8c9efc59 --- /dev/null +++ b/FWCore/TestModules/test/testGenericCloner_cfg.py @@ -0,0 +1,36 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("TEST") + +process.load("FWCore.MessageService.MessageLogger_cfi") +process.MessageLogger.cerr.INFO.limit = 10000000 + +process.options.numberOfThreads = 4 +process.options.numberOfStreams = 4 + +process.source = cms.Source("EmptySource") +process.maxEvents.input = 10 + +process.eventIds = cms.EDProducer("edmtest::EventIDProducer") + +process.cloneByLabel = cms.EDProducer("edmtest::GenericCloner", + eventProducts = cms.vstring("eventIds"), + verbose = cms.untracked.bool(True) +) + +process.cloneByBranch = cms.EDProducer("edmtest::GenericCloner", + eventProducts = cms.vstring("*_eventIds__TEST"), + verbose = cms.untracked.bool(True) +) + +process.validateByLabel = cms.EDAnalyzer("edmtest::EventIDValidator", + source = cms.untracked.InputTag('cloneByLabel') +) + +process.validateByBranch = 
cms.EDAnalyzer("edmtest::EventIDValidator", + source = cms.untracked.InputTag('cloneByBranch') +) + +process.task = cms.Task(process.eventIds, process.cloneByLabel, process.cloneByBranch) + +process.path = cms.Path(process.validateByLabel + process.validateByBranch, process.task) From 57ecb13eb385001193ca0307a54dcd1bab7b318f Mon Sep 17 00:00:00 2001 From: Andrea Bocci Date: Sat, 30 Nov 2024 10:43:27 +0100 Subject: [PATCH 3/6] Fix PFClusterSoAProducer to read a device collection Note: this is a quick workaround to let the device code use the device collection, while being able to access the actual number of pf rechits on the host side. It should replaced with a better and more general implementation, and the use of the host collection should be removed. --- .../plugins/alpaka/PFClusterECLCC.h | 18 +++++----- .../plugins/alpaka/PFClusterSoAProducer.cc | 33 +++++++++++++------ .../alpaka/PFClusterSoAProducerKernel.dev.cc | 30 ++++++++--------- .../alpaka/PFClusterSoAProducerKernel.h | 15 +++++---- 4 files changed, 53 insertions(+), 43 deletions(-) diff --git a/RecoParticleFlow/PFClusterProducer/plugins/alpaka/PFClusterECLCC.h b/RecoParticleFlow/PFClusterProducer/plugins/alpaka/PFClusterECLCC.h index abf63c01e9531..2b6fba2c69d5c 100644 --- a/RecoParticleFlow/PFClusterProducer/plugins/alpaka/PFClusterECLCC.h +++ b/RecoParticleFlow/PFClusterProducer/plugins/alpaka/PFClusterECLCC.h @@ -1,10 +1,11 @@ #ifndef RecoParticleFlow_PFClusterProducer_plugins_alpaka_PFClusterECLCC_h #define RecoParticleFlow_PFClusterProducer_plugins_alpaka_PFClusterECLCC_h +#include "DataFormats/ParticleFlowReco/interface/alpaka/PFRecHitDeviceCollection.h" #include "HeterogeneousCore/AlpakaInterface/interface/config.h" #include "HeterogeneousCore/AlpakaInterface/interface/workdivision.h" -#include "RecoParticleFlow/PFClusterProducer/interface/alpaka/PFClusteringVarsDeviceCollection.h" #include "RecoParticleFlow/PFClusterProducer/interface/alpaka/PFClusteringEdgeVarsDeviceCollection.h" +#include 
"RecoParticleFlow/PFClusterProducer/interface/alpaka/PFClusteringVarsDeviceCollection.h" // The following comment block is required in using the ECL-CC algorithm for topological clustering @@ -79,9 +80,8 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { // Initial step of ECL-CC. Uses ID of first neighbour in edgeList with a smaller ID class ECLCCInit { public: - template >> - ALPAKA_FN_ACC void operator()(const TAcc& acc, - reco::PFRecHitHostCollection::ConstView pfRecHits, + ALPAKA_FN_ACC void operator()(Acc1D const& acc, + reco::PFRecHitDeviceCollection::ConstView pfRecHits, reco::PFClusteringVarsDeviceCollection::View pfClusteringVars, reco::PFClusteringEdgeVarsDeviceCollection::View pfClusteringEdgeVars) const { const int nRH = pfRecHits.size(); @@ -103,9 +103,8 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { // Processes vertices class ECLCCCompute1 { public: - template >> - ALPAKA_FN_ACC void operator()(const TAcc& acc, - reco::PFRecHitHostCollection::ConstView pfRecHits, + ALPAKA_FN_ACC void operator()(Acc1D const& acc, + reco::PFRecHitDeviceCollection::ConstView pfRecHits, reco::PFClusteringVarsDeviceCollection::View pfClusteringVars, reco::PFClusteringEdgeVarsDeviceCollection::View pfClusteringEdgeVars) const { const int nRH = pfRecHits.size(); @@ -148,9 +147,8 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { /* link all vertices to sink */ class ECLCCFlatten { public: - template >> - ALPAKA_FN_ACC void operator()(const TAcc& acc, - reco::PFRecHitHostCollection::ConstView pfRecHits, + ALPAKA_FN_ACC void operator()(Acc1D const& acc, + reco::PFRecHitDeviceCollection::ConstView pfRecHits, reco::PFClusteringVarsDeviceCollection::View pfClusteringVars, reco::PFClusteringEdgeVarsDeviceCollection::View pfClusteringEdgeVars) const { const int nRH = pfRecHits.size(); diff --git a/RecoParticleFlow/PFClusterProducer/plugins/alpaka/PFClusterSoAProducer.cc b/RecoParticleFlow/PFClusterProducer/plugins/alpaka/PFClusterSoAProducer.cc index 65c0e4f5c33f3..5ecbdb2c34522 100644 --- 
a/RecoParticleFlow/PFClusterProducer/plugins/alpaka/PFClusterSoAProducer.cc +++ b/RecoParticleFlow/PFClusterProducer/plugins/alpaka/PFClusterSoAProducer.cc @@ -1,7 +1,7 @@ #include #include -#include "DataFormats/ParticleFlowReco/interface/PFRecHitHostCollection.h" +#include "DataFormats/ParticleFlowReco/interface/alpaka/PFRecHitDeviceCollection.h" #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/ParameterSet/interface/ParameterSetDescription.h" @@ -16,11 +16,13 @@ #include "RecoParticleFlow/PFRecHitProducer/interface/PFRecHitTopologyRecord.h" namespace ALPAKA_ACCELERATOR_NAMESPACE { + class PFClusterSoAProducer : public stream::SynchronizingEDProducer<> { public: PFClusterSoAProducer(edm::ParameterSet const& config) : pfClusParamsToken(esConsumes(config.getParameter("pfClusterParams"))), topologyToken_(esConsumes(config.getParameter("topology"))), + inputPFRecHitHostSoA_Token_{consumes(config.getParameter("pfRecHits"))}, inputPFRecHitSoA_Token_{consumes(config.getParameter("pfRecHits"))}, outputPFClusterSoA_Token_{produces()}, outputPFRHFractionSoA_Token_{produces()}, @@ -30,10 +32,16 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { void acquire(device::Event const& event, device::EventSetup const& setup) override { const reco::PFClusterParamsDeviceCollection& params = setup.getData(pfClusParamsToken); const reco::PFRecHitHCALTopologyDeviceCollection& topology = setup.getData(topologyToken_); - const reco::PFRecHitHostCollection& pfRecHits = event.get(inputPFRecHitSoA_Token_); + const reco::PFRecHitDeviceCollection& pfRecHits = event.get(inputPFRecHitSoA_Token_); int nRH = 0; - if (pfRecHits->metadata().size() != 0) - nRH = pfRecHits->size(); + if (pfRecHits->metadata().size() != 0) { + // FIXME this is a quick workaround to let the device code use the device collection, + // while being able to access the actual number of pf rechits on the host side. 
+ // It should be replaced with a better and more general implementation, and the use of the + // host collection should be removed. + reco::PFRecHitHostCollection const& pfRecHitsHost = event.get(inputPFRecHitHostSoA_Token_); + nRH = pfRecHitsHost->size(); + } pfClusteringVars_.emplace(nRH, event.queue()); pfClusteringEdgeVars_.emplace(nRH * 8, event.queue()); @@ -42,13 +50,14 @@ *numRHF_ = 0; if (nRH != 0) { - PFClusterProducerKernel kernel(event.queue(), pfRecHits); + PFClusterProducerKernel kernel(event.queue()); kernel.seedTopoAndContract(event.queue(), params, topology, *pfClusteringVars_, *pfClusteringEdgeVars_, pfRecHits, + nRH, *pfClusters_, numRHF_.data()); } @@ -57,23 +66,26 @@ void produce(device::Event& event, device::EventSetup const& setup) override { const reco::PFClusterParamsDeviceCollection& params = setup.getData(pfClusParamsToken); const reco::PFRecHitHCALTopologyDeviceCollection& topology = setup.getData(topologyToken_); - const reco::PFRecHitHostCollection& pfRecHits = event.get(inputPFRecHitSoA_Token_); + const reco::PFRecHitDeviceCollection& pfRecHits = event.get(inputPFRecHitSoA_Token_); int nRH = 0; std::optional pfrhFractions; - if (pfRecHits->metadata().size() != 0) - nRH = pfRecHits->size(); + if (pfRecHits->metadata().size() != 0) { + reco::PFRecHitHostCollection const& pfRecHitsHost = event.get(inputPFRecHitHostSoA_Token_); + nRH = pfRecHitsHost->size(); + } if (nRH != 0) { pfrhFractions.emplace(*numRHF_.data(), event.queue()); - PFClusterProducerKernel kernel(event.queue(), pfRecHits); + PFClusterProducerKernel kernel(event.queue()); kernel.cluster(event.queue(), params, topology, *pfClusteringVars_, *pfClusteringEdgeVars_, pfRecHits, + nRH, *pfClusters_, *pfrhFractions); } else { @@ -99,7 +111,8 @@ private: const device::ESGetToken pfClusParamsToken; const device::ESGetToken topologyToken_; - const 
edm::EDGetTokenT inputPFRecHitSoA_Token_; + const edm::EDGetTokenT inputPFRecHitHostSoA_Token_; + const device::EDGetToken inputPFRecHitSoA_Token_; const device::EDPutToken outputPFClusterSoA_Token_; const device::EDPutToken outputPFRHFractionSoA_Token_; cms::alpakatools::host_buffer numRHF_; diff --git a/RecoParticleFlow/PFClusterProducer/plugins/alpaka/PFClusterSoAProducerKernel.dev.cc b/RecoParticleFlow/PFClusterProducer/plugins/alpaka/PFClusterSoAProducerKernel.dev.cc index e0c5e8d5a24fa..b9ec4e557009c 100644 --- a/RecoParticleFlow/PFClusterProducer/plugins/alpaka/PFClusterSoAProducerKernel.dev.cc +++ b/RecoParticleFlow/PFClusterProducer/plugins/alpaka/PFClusterSoAProducerKernel.dev.cc @@ -1,13 +1,12 @@ #include +#include "DataFormats/ParticleFlowReco/interface/PFLayer.h" #include "FWCore/Utilities/interface/bit_cast.h" +#include "HeterogeneousCore/AlpakaInterface/interface/atomicMaxF.h" #include "HeterogeneousCore/AlpakaInterface/interface/config.h" #include "HeterogeneousCore/AlpakaInterface/interface/workdivision.h" -#include "HeterogeneousCore/AlpakaInterface/interface/atomicMaxF.h" - -#include "DataFormats/ParticleFlowReco/interface/PFLayer.h" -#include "RecoParticleFlow/PFClusterProducer/plugins/alpaka/PFClusterSoAProducerKernel.h" #include "RecoParticleFlow/PFClusterProducer/plugins/alpaka/PFClusterECLCC.h" +#include "RecoParticleFlow/PFClusterProducer/plugins/alpaka/PFClusterSoAProducerKernel.h" namespace ALPAKA_ACCELERATOR_NAMESPACE { @@ -1088,7 +1087,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { reco::PFClusteringVarsDeviceCollection::View pfClusteringVars, const reco::PFClusterParamsDeviceCollection::ConstView pfClusParams, const reco::PFRecHitHCALTopologyDeviceCollection::ConstView topology, - const reco::PFRecHitHostCollection::ConstView pfRecHits, + const reco::PFRecHitDeviceCollection::ConstView pfRecHits, reco::PFClusterDeviceCollection::View clusterView, uint32_t* __restrict__ nSeeds) const { const int nRH = pfRecHits.size(); @@ -1165,7 +1164,7 
@@ namespace ALPAKA_ACCELERATOR_NAMESPACE { public: template >> ALPAKA_FN_ACC void operator()(const TAcc& acc, - const reco::PFRecHitHostCollection::ConstView pfRecHits, + const reco::PFRecHitDeviceCollection::ConstView pfRecHits, reco::PFClusteringVarsDeviceCollection::View pfClusteringVars, reco::PFClusteringEdgeVarsDeviceCollection::View pfClusteringEdgeVars, uint32_t* __restrict__ nSeeds) const { @@ -1195,7 +1194,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { public: template >> ALPAKA_FN_ACC void operator()(const TAcc& acc, - const reco::PFRecHitHostCollection::ConstView pfRecHits, + const reco::PFRecHitDeviceCollection::ConstView pfRecHits, reco::PFClusteringVarsDeviceCollection::View pfClusteringVars, reco::PFClusterDeviceCollection::View clusterView, uint32_t* __restrict__ nSeeds, @@ -1319,7 +1318,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { public: template >> ALPAKA_FN_ACC void operator()(const TAcc& acc, - const reco::PFRecHitHostCollection::ConstView pfRecHits, + const reco::PFRecHitDeviceCollection::ConstView pfRecHits, reco::PFClusteringVarsDeviceCollection::View pfClusteringVars, reco::PFRecHitFractionDeviceCollection::View fracView) const { const int nRH = pfRecHits.size(); @@ -1350,7 +1349,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { public: template >> ALPAKA_FN_ACC void operator()(const TAcc& acc, - const reco::PFRecHitHostCollection::ConstView pfRecHits, + const reco::PFRecHitDeviceCollection::ConstView pfRecHits, const reco::PFClusterParamsDeviceCollection::ConstView pfClusParams, const reco::PFRecHitHCALTopologyDeviceCollection::ConstView topology, reco::PFClusteringVarsDeviceCollection::View pfClusteringVars, @@ -1412,7 +1411,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { public: template >> ALPAKA_FN_ACC void operator()(const TAcc& acc, - const reco::PFRecHitHostCollection::ConstView pfRecHits, + const reco::PFRecHitDeviceCollection::ConstView pfRecHits, const reco::PFClusterParamsDeviceCollection::ConstView pfClusParams, const 
reco::PFRecHitHCALTopologyDeviceCollection::ConstView topology, reco::PFClusteringVarsDeviceCollection::View pfClusteringVars, @@ -1454,7 +1453,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { } }; - PFClusterProducerKernel::PFClusterProducerKernel(Queue& queue, const reco::PFRecHitHostCollection& pfRecHits) + PFClusterProducerKernel::PFClusterProducerKernel(Queue& queue) : nSeeds(cms::alpakatools::make_device_buffer(queue)), globalClusterPos( cms::alpakatools::make_device_buffer(queue, blocksForExoticClusters * maxTopoInput)), @@ -1473,10 +1472,10 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { const reco::PFRecHitHCALTopologyDeviceCollection& topology, reco::PFClusteringVarsDeviceCollection& pfClusteringVars, reco::PFClusteringEdgeVarsDeviceCollection& pfClusteringEdgeVars, - const reco::PFRecHitHostCollection& pfRecHits, + const reco::PFRecHitDeviceCollection& pfRecHits, + int nRH, reco::PFClusterDeviceCollection& pfClusters, uint32_t* __restrict__ nRHF) { - const int nRH = pfRecHits->size(); const int threadsPerBlock = 256; const int blocks = divide_up_by(nRH, threadsPerBlock); @@ -1533,11 +1532,10 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { const reco::PFRecHitHCALTopologyDeviceCollection& topology, reco::PFClusteringVarsDeviceCollection& pfClusteringVars, reco::PFClusteringEdgeVarsDeviceCollection& pfClusteringEdgeVars, - const reco::PFRecHitHostCollection& pfRecHits, + const reco::PFRecHitDeviceCollection& pfRecHits, + int nRH, reco::PFClusterDeviceCollection& pfClusters, reco::PFRecHitFractionDeviceCollection& pfrhFractions) { - const int nRH = pfRecHits->size(); - // fillRhfIndex alpaka::exec(queue, make_workdiv({divide_up_by(nRH, 32), divide_up_by(nRH, 32)}, {32, 32}), diff --git a/RecoParticleFlow/PFClusterProducer/plugins/alpaka/PFClusterSoAProducerKernel.h b/RecoParticleFlow/PFClusterProducer/plugins/alpaka/PFClusterSoAProducerKernel.h index 32c68a64f4b24..cfc79b8d7ad69 100644 --- 
a/RecoParticleFlow/PFClusterProducer/plugins/alpaka/PFClusterSoAProducerKernel.h +++ b/RecoParticleFlow/PFClusterProducer/plugins/alpaka/PFClusterSoAProducerKernel.h @@ -1,15 +1,14 @@ #ifndef RecoParticleFlow_PFClusterProducer_PFClusterProducerAlpakaKernel_h #define RecoParticleFlow_PFClusterProducer_PFClusterProducerAlpakaKernel_h -#include "DataFormats/ParticleFlowReco/interface/alpaka/PFRecHitDeviceCollection.h" -#include "DataFormats/ParticleFlowReco/interface/PFRecHitHostCollection.h" #include "DataFormats/ParticleFlowReco/interface/alpaka/PFClusterDeviceCollection.h" +#include "DataFormats/ParticleFlowReco/interface/alpaka/PFRecHitDeviceCollection.h" #include "DataFormats/ParticleFlowReco/interface/alpaka/PFRecHitFractionDeviceCollection.h" +#include "HeterogeneousCore/AlpakaInterface/interface/config.h" #include "RecoParticleFlow/PFClusterProducer/interface/alpaka/PFClusterParamsDeviceCollection.h" -#include "RecoParticleFlow/PFClusterProducer/interface/alpaka/PFClusteringVarsDeviceCollection.h" #include "RecoParticleFlow/PFClusterProducer/interface/alpaka/PFClusteringEdgeVarsDeviceCollection.h" +#include "RecoParticleFlow/PFClusterProducer/interface/alpaka/PFClusteringVarsDeviceCollection.h" #include "RecoParticleFlow/PFRecHitProducer/interface/alpaka/PFRecHitTopologyDeviceCollection.h" -#include "HeterogeneousCore/AlpakaInterface/interface/config.h" namespace ALPAKA_ACCELERATOR_NAMESPACE { @@ -37,14 +36,15 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { class PFClusterProducerKernel { public: - PFClusterProducerKernel(Queue& queue, const reco::PFRecHitHostCollection& pfRecHits); + explicit PFClusterProducerKernel(Queue& queue); void seedTopoAndContract(Queue& queue, const reco::PFClusterParamsDeviceCollection& params, const reco::PFRecHitHCALTopologyDeviceCollection& topology, reco::PFClusteringVarsDeviceCollection& pfClusteringVars, reco::PFClusteringEdgeVarsDeviceCollection& pfClusteringEdgeVars, - const reco::PFRecHitHostCollection& pfRecHits, + const 
reco::PFRecHitDeviceCollection& pfRecHits, + int nRH, reco::PFClusterDeviceCollection& pfClusters, uint32_t* __restrict__ nRHF); @@ -53,7 +53,8 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { const reco::PFRecHitHCALTopologyDeviceCollection& topology, reco::PFClusteringVarsDeviceCollection& pfClusteringVars, reco::PFClusteringEdgeVarsDeviceCollection& pfClusteringEdgeVars, - const reco::PFRecHitHostCollection& pfRecHits, + const reco::PFRecHitDeviceCollection& pfRecHits, + int nRH, reco::PFClusterDeviceCollection& pfClusters, reco::PFRecHitFractionDeviceCollection& pfrhFractions); From d3e2b2a3a88ce24032e4254be1725332aec32916 Mon Sep 17 00:00:00 2001 From: Andrea Bocci Date: Thu, 30 Mar 2023 15:01:26 +0200 Subject: [PATCH 4/6] Extend CMSSW to a fully distributed application Let multiple CMSSW processes on the same or different machines coordinate event processing and transfer data products over MPI. The implementation is based on four CMSSW modules. Two are responsible for setting up the communication channels and coordinate the event processing: - the MPIController - the MPISource and two are responsible for the transfer of data products: - the MPISender - the MPIReceiver . The MPIController is an EDProducer running in a regular CMSSW process. After setting up the communication with an MPISource, it transmits to it all EDM run, lumi and event transitions, and instructs the MPISource to replicate them in the second process. The MPISource is a Source controlling the execution of a second CMSSW process. After setting up the communication with an MPIController, it listens for EDM run, lumi and event transitions, and replicates them in its process. Both MPIController and MPISource produce an MPIToken, a special data product that encapsulates the information about the MPI communication channel. The MPISender is an EDProducer that can read a collection of a predefined type from the Event, serialise it using its ROOT dictionary, and send it over the MPI communication channel. 
The MPIReceiver is an EDProducer that can receive a collection of a predefined type over the MPI communication channel, deserialise it using its ROOT dictionary, and put it in the Event. Both MPISender and MPIReceiver are templated on the type to be transmitted and de/serialised. Each MPISender and MPIReceiver is configured with an instance value that is used to match one MPISender in one process to one MPIReceiver in another process. Using different instance values allows the use of multiple MPISenders/MPIReceivers in a process. Both MPISender and MPIReceiver obtain the MPI communication channel reading an MPIToken from the event. They also produce a copy of the MPIToken, so other modules can consume it to declare a dependency on the previous modules. An automated test is available in the test/ directory. --- HeterogeneousCore/MPICore/BuildFile.xml | 5 + HeterogeneousCore/MPICore/README.md | 84 ++++++ .../MPICore/interface/MPIToken.h | 25 ++ .../MPICore/plugins/BuildFile.xml | 14 + .../MPICore/plugins/MPIController.cc | 230 +++++++++++++++ .../MPICore/plugins/MPIReceiver.cc | 66 +++++ .../MPICore/plugins/MPIReporter.cc | 59 ++++ .../MPICore/plugins/MPISender.cc | 61 ++++ .../MPICore/plugins/MPISource.cc | 266 ++++++++++++++++++ HeterogeneousCore/MPICore/plugins/api.cc | 177 ++++++++++++ HeterogeneousCore/MPICore/plugins/api.h | 142 ++++++++++ .../MPICore/plugins/conversion.cc | 85 ++++++ .../MPICore/plugins/conversion.h | 26 ++ HeterogeneousCore/MPICore/plugins/macros.h | 144 ++++++++++ HeterogeneousCore/MPICore/plugins/messages.cc | 76 +++++ HeterogeneousCore/MPICore/plugins/messages.h | 117 ++++++++ HeterogeneousCore/MPICore/src/classes.h | 2 + HeterogeneousCore/MPICore/src/classes_def.xml | 4 + HeterogeneousCore/MPICore/test/BuildFile.xml | 3 + .../MPICore/test/eventlist_cff.py | 39 +++ HeterogeneousCore/MPICore/test/testMPI.sh | 58 ++++ .../MPICore/test/testMPIController.py | 52 ++++ .../MPICore/test/testMPIFollower.py | 42 +++ 23 files changed, 1777 
insertions(+) create mode 100644 HeterogeneousCore/MPICore/BuildFile.xml create mode 100644 HeterogeneousCore/MPICore/README.md create mode 100644 HeterogeneousCore/MPICore/interface/MPIToken.h create mode 100644 HeterogeneousCore/MPICore/plugins/BuildFile.xml create mode 100644 HeterogeneousCore/MPICore/plugins/MPIController.cc create mode 100644 HeterogeneousCore/MPICore/plugins/MPIReceiver.cc create mode 100644 HeterogeneousCore/MPICore/plugins/MPIReporter.cc create mode 100644 HeterogeneousCore/MPICore/plugins/MPISender.cc create mode 100644 HeterogeneousCore/MPICore/plugins/MPISource.cc create mode 100644 HeterogeneousCore/MPICore/plugins/api.cc create mode 100644 HeterogeneousCore/MPICore/plugins/api.h create mode 100644 HeterogeneousCore/MPICore/plugins/conversion.cc create mode 100644 HeterogeneousCore/MPICore/plugins/conversion.h create mode 100644 HeterogeneousCore/MPICore/plugins/macros.h create mode 100644 HeterogeneousCore/MPICore/plugins/messages.cc create mode 100644 HeterogeneousCore/MPICore/plugins/messages.h create mode 100644 HeterogeneousCore/MPICore/src/classes.h create mode 100644 HeterogeneousCore/MPICore/src/classes_def.xml create mode 100644 HeterogeneousCore/MPICore/test/eventlist_cff.py create mode 100755 HeterogeneousCore/MPICore/test/testMPI.sh create mode 100644 HeterogeneousCore/MPICore/test/testMPIController.py create mode 100644 HeterogeneousCore/MPICore/test/testMPIFollower.py diff --git a/HeterogeneousCore/MPICore/BuildFile.xml b/HeterogeneousCore/MPICore/BuildFile.xml new file mode 100644 index 0000000000000..f8ececf282946 --- /dev/null +++ b/HeterogeneousCore/MPICore/BuildFile.xml @@ -0,0 +1,5 @@ + + + + + diff --git a/HeterogeneousCore/MPICore/README.md b/HeterogeneousCore/MPICore/README.md new file mode 100644 index 0000000000000..b1303c37e20f2 --- /dev/null +++ b/HeterogeneousCore/MPICore/README.md @@ -0,0 +1,84 @@ +# Extend CMSSW to a fully distributed application + +Let multiple CMSSW processes on the same or different 
machines coordinate event processing and transfer data products +over MPI. + +The implementation is based on four CMSSW modules. +Two are responsible for setting up the communication channels and coordinate the event processing: + - the `MPIController` + - the `MPISource` + +and two are responsible for the transfer of data products: + - the `MPISender` + - the `MPIReceiver` + +. + +## `MPIController` class + +The `MPIController` is an `EDProducer` running in a regular CMSSW process. After setting up the communication with an +`MPISource`, it transmits to it all EDM run, lumi and event transitions, and instructs the `MPISource` to replicate them +in the second process. + + +## `MPISource` class + +The `MPISource` is a `Source` controlling the execution of a second CMSSW process. After setting up the communication +with an `MPIController`, it listens for EDM run, lumi and event transitions, and replicates them in its process. + + +## `MPISender` class + +The `MPISender` is an `EDProducer` that can read a collection of a predefined type from the `Event`, serialise it using +its ROOT dictionary, and send it over the MPI communication channel. +`MPISender` is templated on the type to be serialised and transmitted. + + +## `MPIReceiver` class + +The `MPIReceiver` is an `EDProducer` that can receive a collection of a predefined type over the MPI communication +channel, deserialise it using its ROOT dictionary, and put it in the `Event`. +`MPIReceiver` is templated on the type to be received and deserialised. + +Both `MPISender` and `MPIReceiver` are configured with an instance value that is used to match one `MPISender` in one +process to one `MPIReceiver` in another process. Using different instance values allows the use of multiple pairs of +`MPISender`/`MPIReceiver` modules in a process. 
+ + +## MPI communication channel + +The `MPIController` and `MPISource` produce an MPIToken, a special data product that encapsulates the information about +the MPI communication channel. + +Both `MPISender` and `MPIReceiver` obtain the MPI communication channel reading an MPIToken from the event. They also +produce a copy of the MPIToken, so other modules can consume it to declare a dependency on the previous modules. + + +## Testing + +An automated test is available in the test/ directory. + + +## Current limitations + + - all communication is blocking, and there is no acknowledgment or feedback from one module to the other; + - `MPIDriver` is a "one" module that supports only a single luminosity block at a time; + - `MPISender` and `MPIReceiver` support a single compile-time type; + - there is no check that the type sent by the `MPISender` matches the type expected by the `MPIReceiver`. + + +## Notes for future developments + + - implement efficient serialisation for standard layout types; + - implement efficient serialisation for `PortableCollection` types; + - check that the collection sent by the `MPISender` and the one expected by the `MPIReceiver` match; + - extend the `MPISender` and `MPIReceiver` to send and receive multiple collections; + - rewrite the `MPISender` and `MPIReceiver` to send and receive arbitrary run-time collections; + - improve the `MPIController` to be a `global` module rather than a `one` module; + - let an `MPISource` accept connections and events from multiple `MPIController` modules in different jobs; + - let an `MPIController` connect and send events to multiple `MPISource` modules in different jobs; + - support multiple concurrent runs and luminosity blocks, up to a given maximum; + - transfer the `ProcessingHistory` from the `MPIController` to the `MPISource` ? and vice-versa ? + - transfer other provenance information from the `MPIController` to the `MPISource` ? and vice-versa ? 
+ - when a run, luminosity block or event is received, check that they belong to the same `ProcessingHistory` as the + ongoing run ? diff --git a/HeterogeneousCore/MPICore/interface/MPIToken.h b/HeterogeneousCore/MPICore/interface/MPIToken.h new file mode 100644 index 0000000000000..6796243a1527b --- /dev/null +++ b/HeterogeneousCore/MPICore/interface/MPIToken.h @@ -0,0 +1,25 @@ +#ifndef HeterogeneousCore_MPICore_MPIToken_h +#define HeterogeneousCore_MPICore_MPIToken_h + +#include + +// forward declaration +class MPIChannel; + +class MPIToken { +public: + // default constructor, needed to write the type's dictionary + MPIToken() = default; + + // user-defined constructor + explicit MPIToken(std::shared_ptr channel) : channel_(channel) {} + + // access the data member + MPIChannel* channel() const { return channel_.get(); } + +private: + // wrap the MPI communicator and destination + std::shared_ptr channel_; +}; + +#endif // HeterogeneousCore_MPICore_MPIToken_h diff --git a/HeterogeneousCore/MPICore/plugins/BuildFile.xml b/HeterogeneousCore/MPICore/plugins/BuildFile.xml new file mode 100644 index 0000000000000..f094803caf49b --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/BuildFile.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + + + + diff --git a/HeterogeneousCore/MPICore/plugins/MPIController.cc b/HeterogeneousCore/MPICore/plugins/MPIController.cc new file mode 100644 index 0000000000000..fe18d2fcd366b --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/MPIController.cc @@ -0,0 +1,230 @@ +#include +#include +#include + +#include + +#include +#include + +#include "DataFormats/Provenance/interface/BranchKey.h" +#include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h" +#include "FWCore/Framework/interface/Event.h" +#include "FWCore/Framework/interface/GenericHandle.h" +#include "FWCore/Framework/interface/LuminosityBlock.h" +#include "FWCore/Framework/interface/MakerMacros.h" +#include "FWCore/Framework/interface/Run.h" +#include 
"FWCore/Framework/interface/one/EDProducer.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/ParameterSet/interface/ParameterDescriptionNode.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/Reflection/interface/ObjectWithDict.h" +#include "FWCore/Reflection/interface/TypeWithDict.h" +#include "FWCore/ServiceRegistry/interface/Service.h" +#include "FWCore/Utilities/interface/Guid.h" +#include "HeterogeneousCore/MPICore/interface/MPIToken.h" +#include "HeterogeneousCore/MPIServices/interface/MPIService.h" + +#include "api.h" +#include "messages.h" + +/* MPIController class + * + * This module runs inside a CMSSW job (the "controller") and connects to an "MPISource" in a separate CMSSW job (the "follower"). + * The follower is informed of all transitions seen by the controller, and can replicate them in its own process. 
+ * + * Current limitations: + * - support a single "follower" + * + * Future work: + * - support multiple "followers" + */ + +class MPIController : public edm::one::EDProducer { +public: + explicit MPIController(edm::ParameterSet const& config); + ~MPIController() override; + + void beginJob() override; + void endJob() override; + + void beginRun(edm::Run const& run, edm::EventSetup const& setup) override; + void endRun(edm::Run const& run, edm::EventSetup const& setup) override; + + void beginLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& setup) override; + void endLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& setup) override; + + void produce(edm::Event& event, edm::EventSetup const& setup) override; + + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions); + +private: + MPI_Comm comm_ = MPI_COMM_NULL; + MPIChannel channel_; + edm::EDPutTokenT token_; +}; + +MPIController::MPIController(edm::ParameterSet const& config) + : token_(produces()) // +{ + // make sure that MPI is initialised + MPIService::required(); + + // FIXME move into the MPIService ? + // make sure the EDM MPI types are available + EDM_MPI_build_types(); + + // look up the "server" port + char port[MPI_MAX_PORT_NAME]; + MPI_Lookup_name("server", MPI_INFO_NULL, port); + edm::LogAbsolute("MPI") << "Trying to connect to the MPI server on port " << port; + + // connect to the server + int size; + MPI_Comm_connect(port, MPI_INFO_NULL, 0, MPI_COMM_SELF, &comm_); + MPI_Comm_remote_size(comm_, &size); + edm::LogAbsolute("MPI") << "Client connected to " << size << (size == 1 ? 
" server" : " servers"); + if (size > 1) { + throw cms::Exception("UnsupportedFeature") + << "MPIController supports only a single follower, but it was connected to " << size << " followers"; + } + channel_ = MPIChannel(comm_, 0); +} + +MPIController::~MPIController() { + // close the intercommunicator + MPI_Comm_disconnect(&comm_); +} + +void MPIController::beginJob() { + // signal the connection + channel_.sendConnect(); + + /* is there a way to access all known process histories ? + edm::ProcessHistoryRegistry const& registry = * edm::ProcessHistoryRegistry::instance(); + edm::LogAbsolute("MPI") << "ProcessHistoryRegistry:"; + for (auto const& keyval: registry) { + edm::LogAbsolute("MPI") << keyval.first << ": " << keyval.second; + } + */ +} + +void MPIController::endJob() { + // signal the disconnection + channel_.sendDisconnect(); +} + +void MPIController::beginRun(edm::Run const& run, edm::EventSetup const& setup) { + // signal a new run, and transmit the RunAuxiliary + /* FIXME + * Ideally the ProcessHistoryID stored in the run.runAuxiliary() should be the correct one, and + * we could simply do + + channel_.sendBeginRun(run.runAuxiliary()); + + * Instead, it looks like the ProcessHistoryID stored in the run.runAuxiliary() is that of the + * _parent_ process. + * So, we make a copy of the RunAuxiliary, set the ProcessHistoryID to the correct value, and + * transmit the modified RunAuxiliary. 
+ */ + auto aux = run.runAuxiliary(); + aux.setProcessHistoryID(run.processHistory().id()); + channel_.sendBeginRun(aux); + + // transmit the ProcessHistory + channel_.sendSerializedProduct(0, run.processHistory()); +} + +void MPIController::endRun(edm::Run const& run, edm::EventSetup const& setup) { + // signal the end of run + /* FIXME + * Ideally the ProcessHistoryID stored in the run.runAuxiliary() should be the correct one, and + * we could simply do + + channel_.sendEndRun(run.runAuxiliary()); + + * Instead, it looks like the ProcessHistoryID stored in the run.runAuxiliary() is that of the + * _parent_ process. + * So, we make a copy of the RunAuxiliary, set the ProcessHistoryID to the correct value, and + * transmit the modified RunAuxiliary. + */ + auto aux = run.runAuxiliary(); + aux.setProcessHistoryID(run.processHistory().id()); + channel_.sendEndRun(aux); +} + +void MPIController::beginLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& setup) { + // signal a new luminosity block, and transmit the LuminosityBlockAuxiliary + /* FIXME + * Ideally the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() should be the + * correct one, and we could simply do + + channel_.sendBeginLuminosityBlock(lumi.luminosityBlockAuxiliary()); + + * Instead, it looks like the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() is + * that of the _parent_ process. + * So, we make a copy of the LuminosityBlockAuxiliary, set the ProcessHistoryID to the correct + * value, and transmit the modified LuminosityBlockAuxiliary. 
+ */ + auto aux = lumi.luminosityBlockAuxiliary(); + aux.setProcessHistoryID(lumi.processHistory().id()); + channel_.sendBeginLuminosityBlock(aux); +} + +void MPIController::endLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& setup) { + // signal the end of luminosity block + /* FIXME + * Ideally the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() should be the + * correct one, and we could simply do + + channel_.sendEndLuminosityBlock(lumi.luminosityBlockAuxiliary()); + + * Instead, it looks like the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() is + * that of the _parent_ process. + * So, we make a copy of the LuminosityBlockAuxiliary, set the ProcessHistoryID to the correct + * value, and transmit the modified LuminosityBlockAuxiliary. + */ + auto aux = lumi.luminosityBlockAuxiliary(); + aux.setProcessHistoryID(lumi.processHistory().id()); + channel_.sendEndLuminosityBlock(aux); +} + +void MPIController::produce(edm::Event& event, edm::EventSetup const& setup) { + { + edm::LogInfo log("MPI"); + log << "processing run " << event.run() << ", lumi " << event.luminosityBlock() << ", event " << event.id().event(); + log << "\nprocess history: " << event.processHistory(); + log << "\nprocess history id: " << event.processHistory().id(); + log << "\nprocess history id: " << event.eventAuxiliary().processHistoryID() << " (from eventAuxiliary)"; + log << "\nisRealData " << event.eventAuxiliary().isRealData(); + log << "\nexperimentType " << event.eventAuxiliary().experimentType(); + log << "\nbunchCrossing " << event.eventAuxiliary().bunchCrossing(); + log << "\norbitNumber " << event.eventAuxiliary().orbitNumber(); + log << "\nstoreNumber " << event.eventAuxiliary().storeNumber(); + log << "\nprocessHistoryID " << event.eventAuxiliary().processHistoryID(); + log << "\nprocessGUID " << edm::Guid(event.eventAuxiliary().processGUID(), true).toString(); + } + + // signal a new event, and transmit the EventAuxiliary + 
channel_.sendEvent(event.eventAuxiliary()); + + // duplicate the MPIChannel and put the copy into the Event + std::shared_ptr link(new MPIChannel(channel_.duplicate()), [](MPIChannel* ptr) { + ptr->reset(); + delete ptr; + }); + event.emplace(token_, std::move(link)); +} + +void MPIController::fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + descriptions.setComment( + "This module connects to an \"MPISource\" in a separate CMSSW job, and transmits all " + "Runs, LuminosityBlocks and Events from the current process to the remote one."); + + edm::ParameterSetDescription desc; + descriptions.addWithDefaultLabel(desc); +} + +DEFINE_FWK_MODULE(MPIController); diff --git a/HeterogeneousCore/MPICore/plugins/MPIReceiver.cc b/HeterogeneousCore/MPICore/plugins/MPIReceiver.cc new file mode 100644 index 0000000000000..1afc2d616eadd --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/MPIReceiver.cc @@ -0,0 +1,66 @@ +// C++ include files +#include + +// CMSSW include files +#include "FWCore/Framework/interface/Event.h" +#include "FWCore/Framework/interface/global/EDProducer.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/Utilities/interface/Exception.h" +#include "HeterogeneousCore/MPICore/interface/MPIToken.h" + +// local include files +#include "api.h" + +template +class MPIReceiver : public edm::global::EDProducer<> { + using CollectionType = T; + +public: + MPIReceiver(edm::ParameterSet const& config) + : mpiPrev_(consumes(config.getParameter("channel"))), + mpiNext_(produces()), + data_(produces()), + instance_(config.getParameter("instance")) // + { + // instance 0 is reserved for the MPIController / MPISource pair + // instance values greater than 255 may not fit in the MPI tag + if (instance_ < 1 or instance_ > 255) { + throw cms::Exception("InvalidValue") + << "Invalid MPIReceiver instance value, please use a value between 1 and 255"; + } + } + + void 
produce(edm::StreamID, edm::Event& event, edm::EventSetup const&) const override { + // read the MPIToken used to establish the communication channel + MPIToken token = event.get(mpiPrev_); + + // receive the data sent over the MPI channel + // note: currently this uses a blocking probe/recv + CollectionType data; + token.channel()->receiveSerializedProduct(instance_, data); + + // put the data into the Event + event.emplace(data_, std::move(data)); + + // write a shallow copy of the channel to the output, so other modules can consume it + // to indicate that they should run after this + event.emplace(mpiNext_, token); + } + +private: + edm::EDGetTokenT const mpiPrev_; // MPIToken used to establish the communication channel + edm::EDPutTokenT const mpiNext_; // copy of the MPIToken that may be used to implement an ordering relation + edm::EDPutTokenT const data_; // data to be read over the channel and put into the Event + int32_t const instance_; // instance used to identify the source-destination pair +}; + +#include "FWCore/Framework/interface/MakerMacros.h" + +#include "DataFormats/Provenance/interface/EventID.h" +using MPIReceiverEventID = MPIReceiver; +DEFINE_FWK_MODULE(MPIReceiverEventID); + +#include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h" +using MPIReceiverFEDRawDataCollection = MPIReceiver; +DEFINE_FWK_MODULE(MPIReceiverFEDRawDataCollection); diff --git a/HeterogeneousCore/MPICore/plugins/MPIReporter.cc b/HeterogeneousCore/MPICore/plugins/MPIReporter.cc new file mode 100644 index 0000000000000..49ec608e0a0f5 --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/MPIReporter.cc @@ -0,0 +1,59 @@ +#include "FWCore/Framework/interface/Event.h" +#include "FWCore/Framework/interface/stream/EDAnalyzer.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include 
"FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/Utilities/interface/Guid.h" +#include "HeterogeneousCore/MPICore/interface/MPIToken.h" + +/* MPIReporter class + * + */ + +class MPIReporter : public edm::stream::EDAnalyzer<> { +public: + explicit MPIReporter(edm::ParameterSet const& config); + ~MPIReporter() override = default; + + void analyze(edm::Event const& event, edm::EventSetup const& setup) override; + + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions); + +private: + edm::EDGetTokenT token_; +}; + +MPIReporter::MPIReporter(edm::ParameterSet const& config) : token_(consumes(edm::InputTag("source"))) {} + +void MPIReporter::analyze(edm::Event const& event, edm::EventSetup const& setup) { + { + edm::LogAbsolute log("MPI"); + log << "stream " << event.streamID() << ": processing run " << event.run() << ", lumi " << event.luminosityBlock() + << ", event " << event.id().event(); + log << "\nprocess history: " << event.processHistory(); + log << "\nprocess history id: " << event.processHistory().id(); + log << "\nprocess history id: " << event.eventAuxiliary().processHistoryID() << " (from eventAuxiliary)"; + log << "\nisRealData " << event.eventAuxiliary().isRealData(); + log << "\nexperimentType " << event.eventAuxiliary().experimentType(); + log << "\nbunchCrossing " << event.eventAuxiliary().bunchCrossing(); + log << "\norbitNumber " << event.eventAuxiliary().orbitNumber(); + log << "\nstoreNumber " << event.eventAuxiliary().storeNumber(); + log << "\nprocessHistoryID " << event.eventAuxiliary().processHistoryID(); + log << "\nprocessGUID " << edm::Guid(event.eventAuxiliary().processGUID(), true).toString(); + } + + auto const& token = event.get(token_); + { + edm::LogAbsolute log("MPI"); + log << "got the MPIToken opaque wrapper around the MPIChannel at " << &token; + } +} + +void MPIReporter::fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + edm::ParameterSetDescription desc; + 
descriptions.addWithDefaultLabel(desc); +} + +#include "FWCore/Framework/interface/MakerMacros.h" +DEFINE_FWK_MODULE(MPIReporter); diff --git a/HeterogeneousCore/MPICore/plugins/MPISender.cc b/HeterogeneousCore/MPICore/plugins/MPISender.cc new file mode 100644 index 0000000000000..9cfd3936bbf46 --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/MPISender.cc @@ -0,0 +1,61 @@ +// CMSSW include files +#include "FWCore/Framework/interface/Event.h" +#include "FWCore/Framework/interface/global/EDProducer.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/Utilities/interface/Exception.h" +#include "HeterogeneousCore/MPICore/interface/MPIToken.h" + +// local include files +#include "api.h" + +template +class MPISender : public edm::global::EDProducer<> { + using CollectionType = T; + +public: + MPISender(edm::ParameterSet const& config) + : mpiPrev_(consumes(config.getParameter("channel"))), + mpiNext_(produces()), + data_(consumes(config.getParameter("data"))), + instance_(config.getParameter("instance")) // + { + // instance 0 is reserved for the MPIController / MPISource pair + // instance values greater than 255 may not fit in the MPI tag + if (instance_ < 1 or instance_ > 255) { + throw cms::Exception("InvalidValue") << "Invalid MPISender instance value, please use a value between 1 and 255"; + } + } + + void produce(edm::StreamID, edm::Event& event, edm::EventSetup const&) const override { + // read the MPIToken used to establish the communication channel + MPIToken token = event.get(mpiPrev_); + + // read the data to be sent over the MPI channel + auto data = event.get(data_); + + // send the data over MPI + // note: currently this uses a blocking send + token.channel()->sendSerializedProduct(instance_, data); + + // write a shallow copy of the channel to the output, so other modules can consume it + // to indicate that they should run after this + 
event.emplace(mpiNext_, token); + } + +private: + edm::EDGetTokenT const mpiPrev_; // MPIToken used to establish the communication channel + edm::EDPutTokenT const mpiNext_; // copy of the MPIToken that may be used to implement an ordering relation + edm::EDGetTokenT const data_; // data to be read from the Event and sent over the channel + int32_t const instance_; // instance used to identify the source-destination pair +}; + +#include "FWCore/Framework/interface/MakerMacros.h" + +#include "DataFormats/Provenance/interface/EventID.h" +using MPISenderEventID = MPISender; +DEFINE_FWK_MODULE(MPISenderEventID); + +#include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h" +using MPISenderFEDRawDataCollection = MPISender; +DEFINE_FWK_MODULE(MPISenderFEDRawDataCollection); diff --git a/HeterogeneousCore/MPICore/plugins/MPISource.cc b/HeterogeneousCore/MPICore/plugins/MPISource.cc new file mode 100644 index 0000000000000..07e132d3cb37e --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/MPISource.cc @@ -0,0 +1,266 @@ +// C++ headers +#include +#include +#include + +// ROOT headers +#include +#include +#include + +// MPI headers +#include + +// CMSSW headers +#include "DataFormats/Provenance/interface/BranchListIndex.h" +#include "DataFormats/Provenance/interface/EventAuxiliary.h" +#include "DataFormats/Provenance/interface/EventSelectionID.h" +#include "DataFormats/Provenance/interface/EventToProcessBlockIndexes.h" +#include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h" +#include "DataFormats/Provenance/interface/ProcessHistory.h" +#include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h" +#include "DataFormats/Provenance/interface/RunAuxiliary.h" +#include "FWCore/Framework/interface/Event.h" +#include "FWCore/Framework/interface/EventPrincipal.h" +#include "FWCore/Framework/interface/InputSource.h" +#include "FWCore/Framework/interface/InputSourceDescription.h" +#include "FWCore/Framework/interface/InputSourceMacros.h" +#include 
"FWCore/Framework/interface/ProductProvenanceRetriever.h" +#include "FWCore/MessageLogger/interface/ErrorObj.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescriptionFiller.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/Sources/interface/ProducerSourceBase.h" +#include "HeterogeneousCore/MPICore/interface/MPIToken.h" +#include "HeterogeneousCore/MPIServices/interface/MPIService.h" + +// local headers +#include "api.h" +#include "conversion.h" +#include "messages.h" + +class MPISource : public edm::ProducerSourceBase { +public: + explicit MPISource(edm::ParameterSet const& config, edm::InputSourceDescription const& desc); + ~MPISource() override; + using InputSource::processHistoryRegistryForUpdate; + using InputSource::productRegistryUpdate; + + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions); + +private: + bool setRunAndEventInfo(edm::EventID& id, edm::TimeValue_t& time, edm::EventAuxiliary::ExperimentType&) override; + void produce(edm::Event&) override; + + char port_[MPI_MAX_PORT_NAME]; + MPI_Comm comm_ = MPI_COMM_NULL; + MPIChannel channel_; + edm::EDPutTokenT token_; + + edm::ProcessHistory history_; +}; + +MPISource::MPISource(edm::ParameterSet const& config, edm::InputSourceDescription const& desc) + : // note that almost all configuration parameters passed to IDGeneratorSourceBase via ProducerSourceBase will + // effectively be ignored, because this ConfigurableSource will explicitly set the run, lumi, and event + // numbers, the timestamp, and the event type + edm::ProducerSourceBase(config, desc, false), + token_(produces()) // +{ + // make sure that MPI is initialised + MPIService::required(); + + // FIXME move into the MPIService ? 
+ // make sure the EDM MPI types are available + EDM_MPI_build_types(); + + // open a server-side port + MPI_Open_port(MPI_INFO_NULL, port_); + + // publish the port under the name "server" + MPI_Info port_info; + MPI_Info_create(&port_info); + MPI_Info_set(port_info, "ompi_global_scope", "true"); + MPI_Info_set(port_info, "ompi_unique", "true"); + MPI_Publish_name("server", port_info, port_); + + // create an intercommunicator and accept a client connection + edm::LogAbsolute("MPI") << "waiting for a connection to the MPI server at port " << port_; + MPI_Comm_accept(port_, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &comm_); + channel_ = MPIChannel(comm_, 0); + + // wait for a client to connect + MPI_Status status; + EDM_MPI_Empty_t buffer; + MPI_Recv(&buffer, 1, EDM_MPI_Empty, MPI_ANY_SOURCE, EDM_MPI_Connect, comm_, &status); + edm::LogAbsolute("MPI") << "connected from " << status.MPI_SOURCE; +} + +MPISource::~MPISource() { + // close the intercommunicator + MPI_Comm_disconnect(&comm_); + + // unpublish and close the port + MPI_Info port_info; + MPI_Info_create(&port_info); + MPI_Info_set(port_info, "ompi_global_scope", "true"); + MPI_Info_set(port_info, "ompi_unique", "true"); + MPI_Unpublish_name("server", port_info, port_); + MPI_Close_port(port_); +} + +//MPISource::ItemTypeInfo MPISource::getNextItemType() { +bool MPISource::setRunAndEventInfo(edm::EventID& event, + edm::TimeValue_t& time, + edm::EventAuxiliary::ExperimentType& type) { + while (true) { + MPI_Status status; + MPI_Message message; + MPI_Mprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm_, &message, &status); + switch (status.MPI_TAG) { + // Connect message + case EDM_MPI_Connect: { + // receive the message header + EDM_MPI_Empty_t buffer; + MPI_Mrecv(&buffer, 1, EDM_MPI_Empty, &message, &status); + + // the Connect message is unexpected here (see above) + throw cms::Exception("InvalidValue") + << "The MPISource has received an EDM_MPI_Connect message after the initial connection"; + return false; + } + + // 
Disconnect message + case EDM_MPI_Disconnect: { + // receive the message header + EDM_MPI_Empty_t buffer; + MPI_Mrecv(&buffer, 1, EDM_MPI_Empty, &message, &status); + + // signal the end of the input data + return false; + } + + // BeginStream message + case EDM_MPI_BeginStream: { + // receive the message header + EDM_MPI_Empty_t buffer; + MPI_Mrecv(&buffer, 1, EDM_MPI_Empty, &message, &status); + + // receive the next message + break; + } + + // EndStream message + case EDM_MPI_EndStream: { + // receive the message header + EDM_MPI_Empty_t buffer; + MPI_Mrecv(&buffer, 1, EDM_MPI_Empty, &message, &status); + + // receive the next message + break; + } + + // BeginRun message + case EDM_MPI_BeginRun: { + // receive the RunAuxiliary + EDM_MPI_RunAuxiliary_t buffer; + MPI_Mrecv(&buffer, 1, EDM_MPI_RunAuxiliary, &message, &status); + // TODO this is currently not used + edm::RunAuxiliary runAuxiliary; + edmFromBuffer(buffer, runAuxiliary); + + // receive the ProcessHistory + history_.clear(); + channel_.receiveSerializedProduct(0, history_); + history_.initializeTransients(); + if (processHistoryRegistryForUpdate().registerProcessHistory(history_)) { + edm::LogAbsolute("MPI") << "new ProcessHistory registered: " << history_; + } + + // receive the next message + break; + } + + // EndRun message + case EDM_MPI_EndRun: { + // receive the RunAuxiliary message + EDM_MPI_RunAuxiliary_t buffer; + MPI_Mrecv(&buffer, 1, EDM_MPI_RunAuxiliary, &message, &status); + + // receive the next message + break; + } + + // BeginLuminosityBlock message + case EDM_MPI_BeginLuminosityBlock: { + // receive the LuminosityBlockAuxiliary + EDM_MPI_LuminosityBlockAuxiliary_t buffer; + MPI_Mrecv(&buffer, 1, EDM_MPI_LuminosityBlockAuxiliary, &message, &status); + // TODO this is currently not used + edm::LuminosityBlockAuxiliary luminosityBlockAuxiliary; + edmFromBuffer(buffer, luminosityBlockAuxiliary); + + // receive the next message + break; + } + + // EndLuminosityBlock message + case 
EDM_MPI_EndLuminosityBlock: { + // receive the LuminosityBlockAuxiliary + EDM_MPI_LuminosityBlockAuxiliary_t buffer; + MPI_Mrecv(&buffer, 1, EDM_MPI_LuminosityBlockAuxiliary, &message, &status); + + // receive the next message + break; + } + + // ProcessEvent message + case EDM_MPI_ProcessEvent: { + // receive the EventAuxiliary + edm::EventAuxiliary aux; + status = channel_.receiveEvent(aux, message); + + // extract the rank of the other process (currently unused) + int source = status.MPI_SOURCE; + (void)source; + + // fill the event details + event = aux.id(); + time = aux.time().value(); + type = aux.experimentType(); + + // signal a new event + return true; + } + + // unexpected message + default: { + throw cms::Exception("InvalidValue") + << "The MPISource has received an unknown message with tag " << status.MPI_TAG; + return false; + } + } + } +} + +void MPISource::produce(edm::Event& event) { + // duplicate the MPIChannel and put the copy into the Event + std::shared_ptr channel(new MPIChannel(channel_.duplicate()), [](MPIChannel* ptr) { + ptr->reset(); + delete ptr; + }); + event.emplace(token_, std::move(channel)); +} + +void MPISource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + edm::ParameterSetDescription desc; + desc.setComment("Comunicate with another cmsRun process over MPI."); + edm::ProducerSourceBase::fillDescription(desc); + + descriptions.add("source", desc); +} + +#include "FWCore/Framework/interface/InputSourceMacros.h" +DEFINE_FWK_INPUT_SOURCE(MPISource); diff --git a/HeterogeneousCore/MPICore/plugins/api.cc b/HeterogeneousCore/MPICore/plugins/api.cc new file mode 100644 index 0000000000000..2a75e92ab8a83 --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/api.cc @@ -0,0 +1,177 @@ +// C++ standard library headers +#include +#include +#include + +// ROOT headers +#include +#include + +// CMSSW headers +#include "DataFormats/Provenance/interface/EventAuxiliary.h" +#include 
"DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h" +#include "DataFormats/Provenance/interface/RunAuxiliary.h" + +// local headers +#include "api.h" +#include "conversion.h" +#include "messages.h" + +namespace { + // copy the content of an std::string-like object to an N-sized char buffer: + // if the string is larger than the buffer, copy only the first N bytes; + // if the string is smaller than the buffer, fill the rest of the buffer with NUL characters. + template + void copy_and_fill(char (&dest)[N], S const& src) { + if (std::size(src) < N) { + memset(dest, 0x00, N); + memcpy(dest, src.data(), std::size(src)); + } else { + memcpy(dest, src.data(), N); + } + } +} // namespace + +// build a new MPIChannel that uses a duplicate of the underlying communicator and the same destination +MPIChannel MPIChannel::duplicate() const { + MPI_Comm newcomm; + MPI_Comm_dup(comm_, &newcomm); + return MPIChannel(newcomm, dest_); +} + +// close the underlying communicator and reset the MPIChannel to an invalid state +void MPIChannel::reset() { + MPI_Comm_disconnect(&comm_); + dest_ = MPI_UNDEFINED; +} + +// fill an edm::RunAuxiliary object from an EDM_MPI_RunAuxiliary buffer +void MPIChannel::edmFromBuffer_(EDM_MPI_RunAuxiliary_t const& buffer, edm::RunAuxiliary& aux) { + aux = edm::RunAuxiliary(buffer.run, edm::Timestamp(buffer.beginTime), edm::Timestamp(buffer.endTime)); + aux.setProcessHistoryID( + edm::ProcessHistoryID(std::string(buffer.processHistoryID, std::size(buffer.processHistoryID)))); +} + +// fill an EDM_MPI_RunAuxiliary buffer from an edm::RunAuxiliary object +void MPIChannel::edmToBuffer_(EDM_MPI_RunAuxiliary_t& buffer, edm::RunAuxiliary const& aux) { + copy_and_fill(buffer.processHistoryID, aux.processHistoryID().compactForm()); + buffer.beginTime = aux.beginTime().value(); + buffer.endTime = aux.endTime().value(); + buffer.run = aux.id().run(); +} + +// fill an edm::LuminosityBlockAuxiliary object from an EDM_MPI_LuminosityBlockAuxiliary buffer 
+void MPIChannel::edmFromBuffer_(EDM_MPI_LuminosityBlockAuxiliary_t const& buffer, edm::LuminosityBlockAuxiliary& aux) { + aux = edm::LuminosityBlockAuxiliary( + buffer.run, buffer.lumi, edm::Timestamp(buffer.beginTime), edm::Timestamp(buffer.endTime)); + aux.setProcessHistoryID( + edm::ProcessHistoryID(std::string(buffer.processHistoryID, std::size(buffer.processHistoryID)))); +} + +// fill an EDM_MPI_LuminosityBlockAuxiliary buffer from an edm::LuminosityBlockAuxiliary object +void MPIChannel::edmToBuffer_(EDM_MPI_LuminosityBlockAuxiliary_t& buffer, edm::LuminosityBlockAuxiliary const& aux) { + copy_and_fill(buffer.processHistoryID, aux.processHistoryID().compactForm()); + buffer.beginTime = aux.beginTime().value(); + buffer.endTime = aux.endTime().value(); + buffer.run = aux.id().run(); + buffer.lumi = aux.id().luminosityBlock(); +} + +// fill an edm::EventAuxiliary object from an EDM_MPI_EventAuxiliary buffer +void MPIChannel::edmFromBuffer_(EDM_MPI_EventAuxiliary_t const& buffer, edm::EventAuxiliary& aux) { + aux = edm::EventAuxiliary({buffer.run, buffer.lumi, buffer.event}, + std::string(buffer.processGuid, std::size(buffer.processGuid)), + edm::Timestamp(buffer.time), + buffer.realData, + static_cast(buffer.experimentType), + buffer.bunchCrossing, + buffer.storeNumber, + buffer.orbitNumber); + aux.setProcessHistoryID( + edm::ProcessHistoryID(std::string(buffer.processHistoryID, std::size(buffer.processHistoryID)))); +} + +// fill an EDM_MPI_EventAuxiliary buffer from an edm::EventAuxiliary object +void MPIChannel::edmToBuffer_(EDM_MPI_EventAuxiliary_t& buffer, edm::EventAuxiliary const& aux) { + copy_and_fill(buffer.processHistoryID, aux.processHistoryID().compactForm()); + copy_and_fill(buffer.processGuid, aux.processGUID()); + buffer.time = aux.time().value(); + buffer.realData = aux.isRealData(); + buffer.experimentType = aux.experimentType(); + buffer.bunchCrossing = aux.bunchCrossing(); + buffer.orbitNumber = aux.orbitNumber(); + buffer.storeNumber = 
aux.storeNumber(); + buffer.run = aux.id().run(); + buffer.lumi = aux.id().luminosityBlock(); + buffer.event = aux.id().event(); +} + +// fill and send an EDM_MPI_Empty_t buffer +void MPIChannel::sendEmpty_(int tag) { + EDM_MPI_Empty_t buffer; + buffer.messageTag = tag; + MPI_Send(&buffer, 1, EDM_MPI_Empty, dest_, tag, comm_); +} + +// fill and send an EDM_MPI_RunAuxiliary_t buffer +void MPIChannel::sendRunAuxiliary_(int tag, edm::RunAuxiliary const& aux) { + EDM_MPI_RunAuxiliary_t buffer; + buffer.messageTag = tag; + edmToBuffer_(buffer, aux); + MPI_Send(&buffer, 1, EDM_MPI_RunAuxiliary, dest_, tag, comm_); +} + +// fill and send an EDM_MPI_RunAuxiliary_t buffer +void MPIChannel::sendLuminosityBlockAuxiliary_(int tag, edm::LuminosityBlockAuxiliary const& aux) { + EDM_MPI_LuminosityBlockAuxiliary_t buffer; + buffer.messageTag = tag; + edmToBuffer_(buffer, aux); + MPI_Send(&buffer, 1, EDM_MPI_LuminosityBlockAuxiliary, dest_, tag, comm_); +} + +// fill and send an EDM_MPI_EventAuxiliary_t buffer +void MPIChannel::sendEventAuxiliary_(edm::EventAuxiliary const& aux) { + EDM_MPI_EventAuxiliary_t buffer; + buffer.messageTag = EDM_MPI_ProcessEvent; + edmToBuffer_(buffer, aux); + MPI_Send(&buffer, 1, EDM_MPI_EventAuxiliary, dest_, EDM_MPI_ProcessEvent, comm_); +} + +// receive an EDM_MPI_EventAuxiliary_t buffer and populate an edm::EventAuxiliary +MPI_Status MPIChannel::receiveEventAuxiliary_(edm::EventAuxiliary& aux, int source, int tag) { + MPI_Status status; + EDM_MPI_EventAuxiliary_t buffer; + MPI_Recv(&buffer, 1, EDM_MPI_EventAuxiliary, source, tag, comm_, &status); + edmFromBuffer_(buffer, aux); + return status; +} + +// receive an EDM_MPI_EventAuxiliary_t buffer and populate an edm::EventAuxiliary +MPI_Status MPIChannel::receiveEventAuxiliary_(edm::EventAuxiliary& aux, MPI_Message& message) { + MPI_Status status; + EDM_MPI_EventAuxiliary_t buffer; + MPI_Mrecv(&buffer, 1, EDM_MPI_EventAuxiliary, &message, &status); + edmFromBuffer_(buffer, aux); + return status; +} + 
+// serialize an object of generic type using its ROOT dictionary, and send the binary blob +void MPIChannel::sendSerializedProduct_(int instance, TClass const* type, void const* product) { + TBufferFile buffer{TBuffer::kWrite}; + buffer.WriteClassBuffer(type, const_cast(product)); + int tag = EDM_MPI_SendSerializedProduct | instance * EDM_MPI_MessageTagWidth_; + MPI_Send(buffer.Buffer(), buffer.Length(), MPI_BYTE, dest_, tag, comm_); +} + +// receive a binary blob, and deserialize an object of generic type using its ROOT dictionary +void MPIChannel::receiveSerializedProduct_(int instance, TClass const* type, void* product) { + int tag = EDM_MPI_SendSerializedProduct | instance * EDM_MPI_MessageTagWidth_; + MPI_Message message; + MPI_Status status; + MPI_Mprobe(dest_, tag, comm_, &message, &status); + int size; + MPI_Get_count(&status, MPI_BYTE, &size); + TBufferFile buffer{TBuffer::kRead, size}; + MPI_Mrecv(buffer.Buffer(), size, MPI_BYTE, &message, &status); + const_cast(type)->ReadBuffer(buffer, product); +} diff --git a/HeterogeneousCore/MPICore/plugins/api.h b/HeterogeneousCore/MPICore/plugins/api.h new file mode 100644 index 0000000000000..4540ceff934cc --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/api.h @@ -0,0 +1,142 @@ +#ifndef HeterogeneousCore_MPICore_plugins_api_h +#define HeterogeneousCore_MPICore_plugins_api_h + +// externals headers +#include + +// ROOT headers +#include + +// CMSSW headers +#include "DataFormats/Provenance/interface/ProvenanceFwd.h" +#include "FWCore/Reflection/interface/ObjectWithDict.h" + +// local headers +#include "messages.h" + +class MPIChannel { +public: + MPIChannel() = default; + MPIChannel(MPI_Comm comm, int destination) : comm_(comm), dest_(destination) {} + + // build a new MPIChannel that uses a duplicate of the underlying communicator and the same destination + MPIChannel duplicate() const; + + // close the underlying communicator and reset the MPIChannel to an invalid state + void reset(); + + // announce that 
a client has just connected + void sendConnect() { sendEmpty_(EDM_MPI_Connect); } + + // announce that the client will disconnect + void sendDisconnect() { sendEmpty_(EDM_MPI_Disconnect); } + + // signal the begin of stream + void sendBeginStream() { sendEmpty_(EDM_MPI_BeginStream); } + + // signal the end of stream + void sendEndStream() { sendEmpty_(EDM_MPI_EndStream); } + + // signal a new run, and transmit the RunAuxiliary + void sendBeginRun(edm::RunAuxiliary const& aux) { sendRunAuxiliary_(EDM_MPI_BeginRun, aux); } + + // signal the end of run, and re-transmit the RunAuxiliary + void sendEndRun(edm::RunAuxiliary const& aux) { sendRunAuxiliary_(EDM_MPI_EndRun, aux); } + + // signal a new luminosity block, and transmit the LuminosityBlockAuxiliary + void sendBeginLuminosityBlock(edm::LuminosityBlockAuxiliary const& aux) { + sendLuminosityBlockAuxiliary_(EDM_MPI_BeginLuminosityBlock, aux); + } + + // signal the end of luminosity block, and re-transmit the LuminosityBlockAuxiliary + void sendEndLuminosityBlock(edm::LuminosityBlockAuxiliary const& aux) { + sendLuminosityBlockAuxiliary_(EDM_MPI_EndLuminosityBlock, aux); + } + + // signal a new event, and transmit the EventAuxiliary + void sendEvent(edm::EventAuxiliary const& aux) { sendEventAuxiliary_(aux); } + + // start processing a new event, and receive the EventAuxiliary + MPI_Status receiveEvent(edm::EventAuxiliary& aux, int source) { + return receiveEventAuxiliary_(aux, source, EDM_MPI_ProcessEvent); + } + + MPI_Status receiveEvent(edm::EventAuxiliary& aux, MPI_Message& message) { + return receiveEventAuxiliary_(aux, message); + } + + // serialize an object of type T using its ROOT dictionary, and transmit it + template + void sendSerializedProduct(int instance, T const& product) { + static const TClass* type = TClass::GetClass(); + sendSerializedProduct_(instance, type, &product); + } + + // serialize an object of generic type using its ROOT dictionary, and transmit it + void sendSerializedProduct(int 
instance, edm::ObjectWithDict const& product) { + // the expected use case is that the product type corresponds to the actual type, + // so we access the type with typeOf() instead of dynamicType() + sendSerializedProduct_(instance, product.typeOf().getClass(), product.address()); + } + + // signal that an expected product will not be transmitted + void sendSkipProduct() { sendEmpty_(EDM_MPI_SkipProduct); } + + // signal that the transmission of multiple products is complete + void sendComplete() { sendEmpty_(EDM_MPI_SendComplete); } + + // receive an object of type T, and deserialize it using its ROOT dictionary + template + void receiveSerializedProduct(int instance, T& product) { + static const TClass* type = TClass::GetClass(); + receiveSerializedProduct_(instance, type, &product); + } + + // receive an object of generic type, and deserialize it using its ROOT dictionary + void receiveSerializedProduct(int instance, edm::ObjectWithDict& product) { + // the expected use case is that the product type corresponds to the actual type, + // so we access the type with typeOf() instead of dynamicType() + receiveSerializedProduct_(instance, product.typeOf().getClass(), product.address()); + } + +private: + // serialize an EDM object to a simplified representation that can be transmitted as an MPI message + void edmToBuffer_(EDM_MPI_RunAuxiliary_t& buffer, edm::RunAuxiliary const& aux); + void edmToBuffer_(EDM_MPI_LuminosityBlockAuxiliary_t& buffer, edm::LuminosityBlockAuxiliary const& aux); + void edmToBuffer_(EDM_MPI_EventAuxiliary_t& buffer, edm::EventAuxiliary const& aux); + + // deserialize an EDM object from a simplified representation transmitted as an MPI message + void edmFromBuffer_(EDM_MPI_RunAuxiliary_t const& buffer, edm::RunAuxiliary& aux); + void edmFromBuffer_(EDM_MPI_LuminosityBlockAuxiliary_t const& buffer, edm::LuminosityBlockAuxiliary& aux); + void edmFromBuffer_(EDM_MPI_EventAuxiliary_t const& buffer, edm::EventAuxiliary& aux); + + // fill and send 
an EDM_MPI_Empty_t buffer + void sendEmpty_(int tag); + + // fill and send an EDM_MPI_RunAuxiliary_t buffer + void sendRunAuxiliary_(int tag, edm::RunAuxiliary const& aux); + + // fill and send an EDM_MPI_LuminosityBlock_t buffer + void sendLuminosityBlockAuxiliary_(int tag, edm::LuminosityBlockAuxiliary const& aux); + + // fill and send an EDM_MPI_EventAuxiliary_t buffer + void sendEventAuxiliary_(edm::EventAuxiliary const& aux); + + // receive an EDM_MPI_EventAuxiliary_t buffer and populate an edm::EventAuxiliary + MPI_Status receiveEventAuxiliary_(edm::EventAuxiliary& aux, int source, int tag); + MPI_Status receiveEventAuxiliary_(edm::EventAuxiliary& aux, MPI_Message& message); + + // serialize a generic object using its ROOT dictionary, and send the binary blob + void sendSerializedProduct_(int instance, TClass const* type, void const* product); + + // receive a binary blob, and deserialize an object of generic type using its ROOT dictionary + void receiveSerializedProduct_(int instance, TClass const* type, void* product); + + // MPI intercommunicator + MPI_Comm comm_ = MPI_COMM_NULL; + + // MPI destination + int dest_ = MPI_UNDEFINED; +}; + +#endif // HeterogeneousCore_MPICore_plugins_api_h diff --git a/HeterogeneousCore/MPICore/plugins/conversion.cc b/HeterogeneousCore/MPICore/plugins/conversion.cc new file mode 100644 index 0000000000000..313363300b47f --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/conversion.cc @@ -0,0 +1,85 @@ +#include +#include + +#include "DataFormats/Provenance/interface/EventAuxiliary.h" +#include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h" +#include "DataFormats/Provenance/interface/RunAuxiliary.h" + +#include "conversion.h" +#include "messages.h" + +namespace { + // copy the content of an std::string-like object to an N-sized char buffer: + // if the string is larger than the buffer, copy only the first N bytes; + // if the string is smaller than the buffer, fill the rest of the buffer with NUL characters. 
+ template + void copy_and_fill(char (&dest)[N], S const& src) { + if (std::size(src) < N) { + memset(dest, 0x00, N); + memcpy(dest, src.data(), std::size(src)); + } else { + memcpy(dest, src.data(), N); + } + } +} // namespace + +// fill an edm::RunAuxiliary object from an EDM_MPI_RunAuxiliary buffer +void edmFromBuffer(EDM_MPI_RunAuxiliary_t const& buffer, edm::RunAuxiliary& aux) { + aux = edm::RunAuxiliary(buffer.run, edm::Timestamp(buffer.beginTime), edm::Timestamp(buffer.endTime)); + aux.setProcessHistoryID( + edm::ProcessHistoryID(std::string(buffer.processHistoryID, std::size(buffer.processHistoryID)))); +} + +// fill an EDM_MPI_RunAuxiliary buffer from an edm::RunAuxiliary object +void edmToBuffer(EDM_MPI_RunAuxiliary_t& buffer, edm::RunAuxiliary const& aux) { + copy_and_fill(buffer.processHistoryID, aux.processHistoryID().compactForm()); + buffer.beginTime = aux.beginTime().value(); + buffer.endTime = aux.endTime().value(); + buffer.run = aux.id().run(); +} + +// fill an edm::LuminosityBlockAuxiliary object from an EDM_MPI_LuminosityBlockAuxiliary buffer +void edmFromBuffer(EDM_MPI_LuminosityBlockAuxiliary_t const& buffer, edm::LuminosityBlockAuxiliary& aux) { + aux = edm::LuminosityBlockAuxiliary( + buffer.run, buffer.lumi, edm::Timestamp(buffer.beginTime), edm::Timestamp(buffer.endTime)); + aux.setProcessHistoryID( + edm::ProcessHistoryID(std::string(buffer.processHistoryID, std::size(buffer.processHistoryID)))); +} + +// fill an EDM_MPI_LuminosityBlockAuxiliary buffer from an edm::LuminosityBlockAuxiliary object +void edmToBuffer(EDM_MPI_LuminosityBlockAuxiliary_t& buffer, edm::LuminosityBlockAuxiliary const& aux) { + copy_and_fill(buffer.processHistoryID, aux.processHistoryID().compactForm()); + buffer.beginTime = aux.beginTime().value(); + buffer.endTime = aux.endTime().value(); + buffer.run = aux.id().run(); + buffer.lumi = aux.id().luminosityBlock(); +} + +// fill an edm::EventAuxiliary object from an EDM_MPI_EventAuxiliary buffer +void 
edmFromBuffer(EDM_MPI_EventAuxiliary_t const& buffer, edm::EventAuxiliary& aux) { + aux = edm::EventAuxiliary({buffer.run, buffer.lumi, buffer.event}, + std::string(buffer.processGuid, std::size(buffer.processGuid)), + edm::Timestamp(buffer.time), + buffer.realData, + static_cast(buffer.experimentType), + buffer.bunchCrossing, + buffer.storeNumber, + buffer.orbitNumber); + aux.setProcessHistoryID( + edm::ProcessHistoryID(std::string(buffer.processHistoryID, std::size(buffer.processHistoryID)))); +} + +// fill an EDM_MPI_EventAuxiliary buffer from an edm::EventAuxiliary object +void edmToBuffer(EDM_MPI_EventAuxiliary_t& buffer, edm::EventAuxiliary const& aux) { + copy_and_fill(buffer.processHistoryID, aux.processHistoryID().compactForm()); + copy_and_fill(buffer.processGuid, aux.processGUID()); + buffer.time = aux.time().value(); + buffer.realData = aux.isRealData(); + buffer.experimentType = aux.experimentType(); + buffer.bunchCrossing = aux.bunchCrossing(); + buffer.orbitNumber = aux.orbitNumber(); + buffer.storeNumber = aux.storeNumber(); + buffer.run = aux.id().run(); + buffer.lumi = aux.id().luminosityBlock(); + buffer.event = aux.id().event(); +} diff --git a/HeterogeneousCore/MPICore/plugins/conversion.h b/HeterogeneousCore/MPICore/plugins/conversion.h new file mode 100644 index 0000000000000..1c7f8be2ad6e0 --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/conversion.h @@ -0,0 +1,26 @@ +#ifndef HeterogeneousCore_MPICore_plugins_conversion_h +#define HeterogeneousCore_MPICore_plugins_conversion_h + +#include "DataFormats/Provenance/interface/ProvenanceFwd.h" + +#include "messages.h" + +// fill an edm::RunAuxiliary object from an EDM_MPI_RunAuxiliary_t buffer +void edmFromBuffer(EDM_MPI_RunAuxiliary_t const &, edm::RunAuxiliary &); + +// fill an EDM_MPI_RunAuxiliary_t buffer from an edm::RunAuxiliary object +void edmToBuffer(EDM_MPI_RunAuxiliary_t &, edm::RunAuxiliary const &); + +// fill an edm::LuminosityBlockAuxiliary object from an 
EDM_MPI_LuminosityBlockAuxiliary_t buffer +void edmFromBuffer(EDM_MPI_LuminosityBlockAuxiliary_t const &, edm::LuminosityBlockAuxiliary &); + +// fill an EDM_MPI_LuminosityBlockAuxiliary_t buffer from an edm::LuminosityBlockAuxiliary object +void edmToBuffer(EDM_MPI_LuminosityBlockAuxiliary_t &, edm::LuminosityBlockAuxiliary const &); + +// fill an edm::EventAuxiliary object from an EDM_MPI_EventAuxiliary_t buffer +void edmFromBuffer(EDM_MPI_EventAuxiliary_t const &, edm::EventAuxiliary &); + +// fill an EDM_MPI_EventAuxiliary_t buffer from an edm::EventAuxiliary object +void edmToBuffer(EDM_MPI_EventAuxiliary_t &, edm::EventAuxiliary const &); + +#endif // HeterogeneousCore_MPICore_plugins_conversion_h diff --git a/HeterogeneousCore/MPICore/plugins/macros.h b/HeterogeneousCore/MPICore/plugins/macros.h new file mode 100644 index 0000000000000..0fc5f36b6273e --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/macros.h @@ -0,0 +1,144 @@ +#ifndef HeterogeneousCore_MPICore_plugins_macros_h +#define HeterogeneousCore_MPICore_plugins_macros_h + +#include + +#include + +namespace mpi_traits { + template + constexpr inline size_t mpi_length = 1; + + template + constexpr inline size_t mpi_length = N; + + template + struct mpi_type { + inline static const MPI_Datatype value = MPI_DATATYPE_NULL; + }; + + template + struct mpi_type { + inline static const MPI_Datatype value = mpi_type::value; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_CHAR; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_UNSIGNED_CHAR; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_WCHAR; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_SHORT; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_UNSIGNED_SHORT; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_INT; + }; + + 
template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_UNSIGNED; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_LONG; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_UNSIGNED_LONG; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_LONG_LONG; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_UNSIGNED_LONG_LONG; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_FLOAT; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_DOUBLE; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_LONG_DOUBLE; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_BYTE; + }; +} // namespace mpi_traits + +// clang-format off + +#define _GET_MPI_TYPE_LENGTH_IMPL(STRUCT, FIELD) \ + mpi_traits::mpi_length + +#define _GET_MPI_TYPE_LENGTH(R, STRUCT, FIELD) \ + _GET_MPI_TYPE_LENGTH_IMPL(STRUCT, FIELD), + +#define _GET_MPI_TYPE_LENGTHS(STRUCT, ...) \ + BOOST_PP_SEQ_FOR_EACH(_GET_MPI_TYPE_LENGTH, STRUCT, BOOST_PP_VARIADIC_TO_SEQ(__VA_ARGS__)) + +#define _GET_MPI_TYPE_OFFSET_IMPL(STRUCT, FIELD) \ + offsetof(STRUCT, FIELD) + +#define _GET_MPI_TYPE_OFFSET(R, STRUCT, FIELD) \ + _GET_MPI_TYPE_OFFSET_IMPL(STRUCT, FIELD), + +#define _GET_MPI_TYPE_OFFSETS(STRUCT, ...) \ + BOOST_PP_SEQ_FOR_EACH(_GET_MPI_TYPE_OFFSET, STRUCT, BOOST_PP_VARIADIC_TO_SEQ(__VA_ARGS__)) + +#define _GET_MPI_TYPE_TYPEID_IMPL(STRUCT, FIELD) \ + mpi_traits::mpi_type::value + +#define _GET_MPI_TYPE_TYPEID(R, STRUCT, FIELD) \ + _GET_MPI_TYPE_TYPEID_IMPL(STRUCT, FIELD), + +#define _GET_MPI_TYPE_TYPEIDS(STRUCT, ...) \ + BOOST_PP_SEQ_FOR_EACH(_GET_MPI_TYPE_TYPEID, STRUCT, BOOST_PP_VARIADIC_TO_SEQ(__VA_ARGS__)) + +#define DECLARE_MPI_TYPE(TYPE, STRUCT, ...) 
\ + _Pragma("GCC diagnostic push"); \ + _Pragma("GCC diagnostic ignored \"-Winvalid-offsetof\""); \ + { \ + constexpr int lenghts[] = {_GET_MPI_TYPE_LENGTHS(STRUCT, __VA_ARGS__)}; \ + constexpr MPI_Aint displacements[] = {_GET_MPI_TYPE_OFFSETS(STRUCT, __VA_ARGS__)}; \ + const MPI_Datatype types[] = {_GET_MPI_TYPE_TYPEIDS(STRUCT, __VA_ARGS__)}; \ + MPI_Type_create_struct(std::size(lenghts), lenghts, displacements, types, &TYPE); \ + MPI_Type_commit(&TYPE); \ + } \ + _Pragma("GCC diagnostic pop") + +// clang-format on + +#endif // HeterogeneousCore_MPICore_plugins_macros_h diff --git a/HeterogeneousCore/MPICore/plugins/messages.cc b/HeterogeneousCore/MPICore/plugins/messages.cc new file mode 100644 index 0000000000000..8305a2541c39f --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/messages.cc @@ -0,0 +1,76 @@ +#include + +#include + +#include + +#include "macros.h" +#include "messages.h" + +MPI_Datatype EDM_MPI_Empty; +MPI_Datatype EDM_MPI_RunAuxiliary; +MPI_Datatype EDM_MPI_LuminosityBlockAuxiliary; +MPI_Datatype EDM_MPI_EventAuxiliary; + +MPI_Datatype EDM_MPI_MessageType[EDM_MPI_MessageTagCount_]; + +void EDM_MPI_build_types_() { + // EDM_MPI_Empty + DECLARE_MPI_TYPE(EDM_MPI_Empty, // MPI_Datatype + EDM_MPI_Empty_t, // C++ struct + messageTag); // EDM_MPI_MessageTag + + // EDM_MPI_RunAuxiliary + DECLARE_MPI_TYPE(EDM_MPI_RunAuxiliary, // MPI_Datatype + EDM_MPI_RunAuxiliary_t, // C++ struct + messageTag, // EDM_MPI_MessageTag + processHistoryID, // edm::ProcessHistoryID::compactForm() + beginTime, // edm::TimeValue_t + endTime, // edm::TimeValue_t + run); // edm::RunNumber_t + + // EDM_MPI_LuminosityBlockAuxiliary + DECLARE_MPI_TYPE(EDM_MPI_LuminosityBlockAuxiliary, // MPI_Datatype + EDM_MPI_LuminosityBlockAuxiliary_t, // C++ struct + messageTag, // EDM_MPI_MessageTag + processHistoryID, // edm::ProcessHistoryID::compactForm() + beginTime, // edm::TimeValue_t + endTime, // edm::TimeValue_t + run, // edm::RunNumber_t + lumi); // edm::LuminosityBlockNumber_t + + 
// EDM_MPI_EventAuxiliary + DECLARE_MPI_TYPE(EDM_MPI_EventAuxiliary, // MPI_Datatype + EDM_MPI_EventAuxiliary_t, // C++ struct + messageTag, // EDM_MPI_MessageTag + processHistoryID, // edm::ProcessHistoryID::compactForm() + processGuid, // process GUID + time, // edm::TimeValue_t + realData, // real data (true) vs simulation (false) + experimentType, // edm::EventAuxiliary::ExperimentType + bunchCrossing, // LHC bunch crossing + orbitNumber, // LHC orbit number + storeNumber, // LHC fill number ? + run, // edm::RunNumber_t + lumi, // edm::LuminosityBlockNumber_t + event); // edm::EventNumber_t + + EDM_MPI_MessageType[EDM_MPI_Connect] = EDM_MPI_Empty; // + EDM_MPI_MessageType[EDM_MPI_Disconnect] = EDM_MPI_Empty; // + EDM_MPI_MessageType[EDM_MPI_BeginStream] = EDM_MPI_Empty; // + EDM_MPI_MessageType[EDM_MPI_EndStream] = EDM_MPI_Empty; // + EDM_MPI_MessageType[EDM_MPI_BeginRun] = EDM_MPI_RunAuxiliary; // + EDM_MPI_MessageType[EDM_MPI_EndRun] = EDM_MPI_RunAuxiliary; // + EDM_MPI_MessageType[EDM_MPI_BeginLuminosityBlock] = EDM_MPI_LuminosityBlockAuxiliary; // + EDM_MPI_MessageType[EDM_MPI_EndLuminosityBlock] = EDM_MPI_LuminosityBlockAuxiliary; // + EDM_MPI_MessageType[EDM_MPI_ProcessEvent] = EDM_MPI_EventAuxiliary; // + EDM_MPI_MessageType[EDM_MPI_SendSerializedProduct] = MPI_BYTE; // variable-length binary blob + EDM_MPI_MessageType[EDM_MPI_SendTrivialProduct] = MPI_BYTE; // variable-length binary blob + EDM_MPI_MessageType[EDM_MPI_SkipProduct] = EDM_MPI_Empty; // + EDM_MPI_MessageType[EDM_MPI_SendComplete] = EDM_MPI_Empty; // +} + +void EDM_MPI_build_types() { + static std::once_flag flag; + std::call_once(flag, EDM_MPI_build_types_); +} diff --git a/HeterogeneousCore/MPICore/plugins/messages.h b/HeterogeneousCore/MPICore/plugins/messages.h new file mode 100644 index 0000000000000..394346c8d7c5e --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/messages.h @@ -0,0 +1,117 @@ +#ifndef HeterogeneousCore_MPICore_plugins_messages_h +#define 
HeterogeneousCore_MPICore_plugins_messages_h
+
+#include
+
+#include
+
+/* register the MPI message types for the EDM communication
+ */
+void EDM_MPI_build_types();
+
+/* MPI message tags corresponding to EDM transitions
+ */
+enum EDM_MPI_MessageTag {
+  EDM_MPI_Connect,
+  EDM_MPI_Disconnect,
+  EDM_MPI_BeginStream,
+  EDM_MPI_EndStream,
+  EDM_MPI_BeginRun,
+  EDM_MPI_EndRun,
+  EDM_MPI_BeginLuminosityBlock,
+  EDM_MPI_EndLuminosityBlock,
+  EDM_MPI_ProcessEvent,
+  EDM_MPI_SendSerializedProduct,
+  EDM_MPI_SendTrivialProduct,
+  EDM_MPI_SkipProduct,
+  EDM_MPI_SendComplete,
+  EDM_MPI_MessageTagCount_
+};
+
+/* Ensure that the MPI message tags can fit in a single byte
+ */
+inline constexpr int EDM_MPI_MessageTagWidth_ = 256;
+static_assert(EDM_MPI_MessageTagCount_ <= EDM_MPI_MessageTagWidth_);
+
+extern MPI_Datatype EDM_MPI_MessageType[EDM_MPI_MessageTagCount_];
+
+/* Common header for EDM MPI messages, containing
+ * - the message type (to allow decoding the message further)
+ */
+struct __attribute__((aligned(8))) EDM_MPI_Header_t {
+  uint32_t messageTag;  // EDM_MPI_MessageTag
+};
+
+/* Empty EDM MPI message, used when only the header is needed
+ */
+struct EDM_MPI_Empty_t : public EDM_MPI_Header_t {};
+
+// corresponding MPI type
+extern MPI_Datatype EDM_MPI_Empty;
+
+/* Run information stored in edm::RunAuxiliary,
+ * augmented with the MPI message id
+ *
+ * See DataFormats/Provenance/interface/RunAuxiliary.h
+ */
+struct EDM_MPI_RunAuxiliary_t : public EDM_MPI_Header_t {
+  // from DataFormats/Provenance/interface/RunAuxiliary.h
+  char processHistoryID[16];  // edm::ProcessHistoryID::compactForm()
+  uint64_t beginTime;         // edm::TimeValue_t
+  uint64_t endTime;           // edm::TimeValue_t
+  uint32_t run;               // edm::RunNumber_t
+};
+
+// corresponding MPI type
+extern MPI_Datatype EDM_MPI_RunAuxiliary;
+
+/* LuminosityBlock information stored in edm::LuminosityBlockAuxiliary,
+ * augmented with the MPI message id
+ *
+ * See DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h
+ */ +struct EDM_MPI_LuminosityBlockAuxiliary_t : public EDM_MPI_Header_t { + // from DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h + char processHistoryID[16]; // edm::ProcessHistoryID::compactForm() + uint64_t beginTime; // edm::TimeValue_t + uint64_t endTime; // edm::TimeValue_t + uint32_t run; // edm::RunNumber_t + uint32_t lumi; // edm::LuminosityBlockNumber_t +}; + +// corresponding MPI type +extern MPI_Datatype EDM_MPI_LuminosityBlockAuxiliary; + +/* Event information stored in edm::EventAuxiliary, + * augmented with the MPI message id + * + * See DataFormats/Provenance/interface/EventAuxiliary.h + */ +struct EDM_MPI_EventAuxiliary_t : public EDM_MPI_Header_t { + // from DataFormats/Provenance/interface/EventAuxiliary.h + char processHistoryID[16]; // edm::ProcessHistoryID::compactForm() + char processGuid[16]; // process GUID + uint64_t time; // edm::TimeValue_t + int32_t realData; // real data (true) vs simulation (false) + int32_t experimentType; // edm::EventAuxiliary::ExperimentType + int32_t bunchCrossing; // LHC bunch crossing + int32_t orbitNumber; // LHC orbit number + int32_t storeNumber; // LHC fill number ? 
+ uint32_t run; // edm::RunNumber_t + uint32_t lumi; // edm::LuminosityBlockNumber_t + uint32_t event; // edm::EventNumber_t +}; + +// corresponding MPI type +extern MPI_Datatype EDM_MPI_EventAuxiliary; + +// union of all possible messages +union EDM_MPI_Any_t { + EDM_MPI_Header_t header; + EDM_MPI_Empty_t empty; + EDM_MPI_RunAuxiliary_t runAuxiliary; + EDM_MPI_LuminosityBlockAuxiliary_t luminosityBlockAuxiliary; + EDM_MPI_EventAuxiliary_t eventAuxiliary; +}; + +#endif // HeterogeneousCore_MPICore_plugins_messages_h diff --git a/HeterogeneousCore/MPICore/src/classes.h b/HeterogeneousCore/MPICore/src/classes.h new file mode 100644 index 0000000000000..6909ea88bb3c9 --- /dev/null +++ b/HeterogeneousCore/MPICore/src/classes.h @@ -0,0 +1,2 @@ +#include "DataFormats/Common/interface/Wrapper.h" +#include "HeterogeneousCore/MPICore/interface/MPIToken.h" diff --git a/HeterogeneousCore/MPICore/src/classes_def.xml b/HeterogeneousCore/MPICore/src/classes_def.xml new file mode 100644 index 0000000000000..5f4cdb8fcbbf5 --- /dev/null +++ b/HeterogeneousCore/MPICore/src/classes_def.xml @@ -0,0 +1,4 @@ + + + + diff --git a/HeterogeneousCore/MPICore/test/BuildFile.xml b/HeterogeneousCore/MPICore/test/BuildFile.xml index 620466f88c768..2ee6d6058fd40 100644 --- a/HeterogeneousCore/MPICore/test/BuildFile.xml +++ b/HeterogeneousCore/MPICore/test/BuildFile.xml @@ -3,6 +3,9 @@ + + + diff --git a/HeterogeneousCore/MPICore/test/eventlist_cff.py b/HeterogeneousCore/MPICore/test/eventlist_cff.py new file mode 100644 index 0000000000000..abeb05bf21d7e --- /dev/null +++ b/HeterogeneousCore/MPICore/test/eventlist_cff.py @@ -0,0 +1,39 @@ +import FWCore.ParameterSet.Config as cms + +eventlist = cms.VEventID( + # Run 100 + cms.EventID(100, 1, 1), + cms.EventID(100, 1, 4), + cms.EventID(100, 3, 13), + cms.EventID(100, 3, 17), + cms.EventID(100, 5, 18), + cms.EventID(100, 5, 28), + # Run 101 + cms.EventID(101, 1, 2), + cms.EventID(101, 1, 9), + cms.EventID(101, 2, 10), + cms.EventID(101, 2, 14), + 
cms.EventID(101, 4, 15), + cms.EventID(101, 4, 16), + # Run 102 + cms.EventID(102, 1, 1), + cms.EventID(102, 1, 18), + cms.EventID(102, 1, 43), + cms.EventID(102, 5, 59), + cms.EventID(102, 8, 85), + cms.EventID(102, 8, 89), + # Run 103 + cms.EventID(103, 1, 13), + cms.EventID(103, 4, 42), + cms.EventID(103, 4, 43), + cms.EventID(103, 4, 44), + cms.EventID(103, 9, 95), + cms.EventID(103, 9, 99), + # Run 104 + cms.EventID(104, 3, 31), + cms.EventID(104, 5, 52), + cms.EventID(104, 8, 83), + cms.EventID(104, 8, 84), + cms.EventID(104, 8, 85), + cms.EventID(104, 8, 89), +) diff --git a/HeterogeneousCore/MPICore/test/testMPI.sh b/HeterogeneousCore/MPICore/test/testMPI.sh new file mode 100755 index 0000000000000..4b93c713004a8 --- /dev/null +++ b/HeterogeneousCore/MPICore/test/testMPI.sh @@ -0,0 +1,58 @@ +#! /bin/bash +# Shell script for testing CMSSW over MPI + +mkdir -p $CMSSW_BASE/tmp/$SCRAM_ARCH/test +DIR=$(mktemp -d -p $CMSSW_BASE/tmp/$SCRAM_ARCH/test) +echo "Running MPI tests at $DIR/" +pushd $DIR > /dev/null + +# start an MPI server to let independent CMSSW processes find each other +echo "Starting the Open RTE data server" +ompi-server -r server.uri -d >& ompi-server.log & +SERVER_PID=$! +disown +# wait until the ORTE server logs 'up and running' +while ! grep -q 'up and running' ompi-server.log; do + sleep 1s +done + +# Note: "mpirun --mca pmix_server_uri file:server.uri" is required to make the +# tests work inside a singularity/apptainer container. Without a container the +# cmsRun commands can be used directly. + +# start the "follower" CMSSW job(s) +{ + mpirun --mca pmix_server_uri file:server.uri -n 1 -- cmsRun $CMSSW_BASE/src/HeterogeneousCore/MPICore/test/testMPIFollower.py >& mpifollower.log + echo $? > mpifollower.status +} & +FOLLOWER_PID=$! 
+ +# wait to make sure the MPISource has established the connection to the ORTE server +sleep 3s + +# start the "controller" CMSSW job(s) +{ + mpirun --mca pmix_server_uri file:server.uri -n 1 -- cmsRun $CMSSW_BASE/src/HeterogeneousCore/MPICore/test/testMPIController.py >& mpicontroller.log + echo $? > mpicontroller.status +} & +CONTROLLER_PID=$! + +# wait for the CMSSW jobs to finish +wait $CONTROLLER_PID $FOLLOWER_PID + +# print the jobs' output and check the jobs' exit status +echo '========== testMPIController ===========' +cat mpicontroller.log +MPICONTROLLER_STATUS=$(< mpicontroller.status) +echo '========================================' +echo +echo '=========== testMPIFollower ============' +cat mpifollower.log +MPISOURCE_STATUS=$(< mpifollower.status) +echo '========================================' + +# stop the MPI server and cleanup the URI file +kill $SERVER_PID + +popd > /dev/null +exit $((MPISOURCE_STATUS > MPICONTROLLER_STATUS ? MPISOURCE_STATUS : MPICONTROLLER_STATUS)) diff --git a/HeterogeneousCore/MPICore/test/testMPIController.py b/HeterogeneousCore/MPICore/test/testMPIController.py new file mode 100644 index 0000000000000..99427aaed4267 --- /dev/null +++ b/HeterogeneousCore/MPICore/test/testMPIController.py @@ -0,0 +1,52 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("MPIServer") + +process.options.numberOfThreads = 4 +process.options.numberOfStreams = 4 +# MPIController supports a single concurrent LuminosityBlock +process.options.numberOfConcurrentLuminosityBlocks = 1 +process.options.numberOfConcurrentRuns = 1 +process.options.wantSummary = False + +process.load("HeterogeneousCore.MPIServices.MPIService_cfi") +process.MPIService.pmix_server_uri = 'file:server.uri' + +from eventlist_cff import eventlist +process.source = cms.Source("EmptySourceFromEventIDs", + events = cms.untracked(eventlist) +) + +process.maxEvents.input = 100 + +from HeterogeneousCore.MPICore.mpiController_cfi import mpiController as mpiController_ 
+process.mpiController = mpiController_.clone() + +process.ids = cms.EDProducer("edmtest::EventIDProducer") + +process.initialcheck = cms.EDAnalyzer("edmtest::EventIDValidator", + source = cms.untracked.InputTag('ids') +) + +process.sender = cms.EDProducer("MPISenderEventID", + channel = cms.InputTag("mpiController"), + instance = cms.int32(42), + data = cms.InputTag("ids") +) + +process.othersender = cms.EDProducer("MPISenderEventID", + channel = cms.InputTag("mpiController"), + instance = cms.int32(19), + data = cms.InputTag("ids") +) + +process.receiver = cms.EDProducer("MPIReceiverEventID", + channel = cms.InputTag("othersender"), # guarantees that this module will only run after othersender has run + instance = cms.int32(99) +) + +process.finalcheck = cms.EDAnalyzer("edmtest::EventIDValidator", + source = cms.untracked.InputTag('receiver') +) + +process.path = cms.Path(process.mpiController + process.ids + process.initialcheck + process.sender + process.othersender + process.receiver + process.finalcheck) diff --git a/HeterogeneousCore/MPICore/test/testMPIFollower.py b/HeterogeneousCore/MPICore/test/testMPIFollower.py new file mode 100644 index 0000000000000..58ed0ed536194 --- /dev/null +++ b/HeterogeneousCore/MPICore/test/testMPIFollower.py @@ -0,0 +1,42 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("MPIClient") + +process.options.numberOfThreads = 8 +process.options.numberOfStreams = 8 +process.options.numberOfConcurrentLuminosityBlocks = 2 +process.options.numberOfConcurrentRuns = 2 +process.options.wantSummary = False + +process.load("HeterogeneousCore.MPIServices.MPIService_cfi") +process.MPIService.pmix_server_uri = "file:server.uri" + +process.source = cms.Source("MPISource") + +process.maxEvents.input = -1 + +# very verbose +#from HeterogeneousCore.MPICore.mpiReporter_cfi import mpiReporter as mpiReporter_ +#process.reporter = mpiReporter_.clone() + +process.receiver = cms.EDProducer("MPIReceiverEventID", + channel = 
cms.InputTag("source"), + instance = cms.int32(42) +) + +process.otherreceiver = cms.EDProducer("MPIReceiverEventID", + channel = cms.InputTag("source"), + instance = cms.int32(19) +) + +process.sender = cms.EDProducer("MPISenderEventID", + channel = cms.InputTag("otherreceiver"), # guarantees that this module will only run after otherreceiver has run + instance = cms.int32(99), + data = cms.InputTag("otherreceiver") +) + +process.analyzer = cms.EDAnalyzer("edmtest::EventIDValidator", + source = cms.untracked.InputTag("receiver") +) + +process.path = cms.Path(process.receiver + process.analyzer + process.otherreceiver + process.sender) From 166ebc4f90894a8721fdc7bcd05593de15dd21f6 Mon Sep 17 00:00:00 2001 From: Andrea Bocci Date: Tue, 26 Nov 2024 11:10:13 +0100 Subject: [PATCH 5/6] Support generic types in MPISender/MPIReceiver Let MPISender and MPIReceiver consume, send/receive and produce collections of arbitrary types, as long as they have a ROOT dictionary and can be persisted. Note that any transient information is lost during the transfer, and needs to be recreated by the receiving side. The documentation and tests are updated accordingly. Warning: this approach is a work in progress! TODO: - improve framework integration - add checks between send/receive types --- HeterogeneousCore/MPICore/README.md | 47 +++++--- .../MPICore/plugins/BuildFile.xml | 1 - .../MPICore/plugins/MPIReceiver.cc | 72 +++++++----- .../MPICore/plugins/MPISender.cc | 104 +++++++++++++----- .../MPICore/test/testMPIController.py | 22 ++-- .../MPICore/test/testMPIFollower.py | 26 +++-- 6 files changed, 184 insertions(+), 88 deletions(-) diff --git a/HeterogeneousCore/MPICore/README.md b/HeterogeneousCore/MPICore/README.md index b1303c37e20f2..4520a2ebc2d05 100644 --- a/HeterogeneousCore/MPICore/README.md +++ b/HeterogeneousCore/MPICore/README.md @@ -24,21 +24,34 @@ in the second process. ## `MPISource` class The `MPISource` is a `Source` controlling the execution of a second CMSSW process. 
After setting up the communication
-with an `MPIController`, it listens for EDM run, lumi and event transitions, and replicates them in its process.
+with an `MPIController`, it listens for EDM run, lumi and event transitions, and replicates them in its own process.
 
 
 ## `MPISender` class
 
-The `MPISender` is an `EDProducer` that can read a collection of a predefined type from the `Event`, serialise it using
-its ROOT dictionary, and send it over the MPI communication channel.
-`MPISender` is templated on the type to be serialised and transmitted.
+The `MPISender` is an `EDProducer` that can read any number of collections or arbitrary types from the `Event`,
+serialise them using their ROOT dictionaries, and send them over the MPI communication channel.
+The number and types of the collections to be read from the `Event` are determined by the module configuration.
+
+The configuration can specify a list of module labels, branch names, or a mix of the two:
+  - a module label selects all collections produced by that module, irrespective of the type and instance;
+  - a branch name selects only the collections that match all the branch fields (type, label, instance, process name),
+    similar to an `OutputModule`'s `"keep ..."` statement.
+
+Wildcards (`?` and `*`) are allowed in a module label or in each field of a branch name.
 
 
 ## `MPIReceiver` class
 
-The `MPIReceiver` is an `EDProducer` that can receive a collection of a predefined type over the MPI communication
-channel, deserialise is using its ROOT dictionary, and put it in the `Event`.
-`MPIReceiver` is templated on the type to be received and deserialised.
+The `MPIReceiver` is an `EDProducer` that can receive any number of collections of arbitrary types over the MPI
+communication channel, deserialise them using their ROOT dictionaries, and produce them in the `Event`.
+The number, type and label of the collections to be produced are determined by the module configuration.
+
+For each collection, the `type` indicates the C++ type as understood by the ROOT dictionary, and the `label` indicates
+the module instance label to be used for producing that collection into the `Event`.
+
+
+## `MPISender` and `MPIReceiver` instances
 
 Both `MPISender` and `MPIReceiver` are configured with an instance value that is used to match one `MPISender` in one
 process to one `MPIReceiver` in another process. Using different instance values allows the use of multiple pairs of
@@ -47,24 +60,26 @@ process to one `MPIReceiver` in another process. Using different instance values
 
 ## MPI communication channel
 
-The `MPIController` and `MPISource` produce an MPIToken, a special data product that encapsulates the information about
-the MPI communication channel.
+The `MPIController` and `MPISource` produce an `MPIToken`, a special data product that encapsulates the information
+about the MPI communication channel.
 
-Both `MPISender` and `MPIReceiver` obtain the MPI communication channel reading an MPIToken from the event. They also
-produce a copy of the MPIToken, so other modules can consume it to declare a dependency on the previous modules.
+Both `MPISender` and `MPIReceiver` obtain the MPI communication channel reading an `MPIToken` from the event, identified
+by the `upstream` parameter.
+They also produce a copy of the `MPIToken`, so other modules can consume it to declare a dependency on those modules.
 
 
 ## Testing
 
-An automated test is available in the test/ directory.
+An automated test is available in the `test/` directory.
 
 
 ## Current limitations
 
-  - all communication is blocking, and there is no acknowledgment or feedback from one module to the other;
   - `MPIDriver` is a "one" module that supports only a single luminosity block at a time;
-  - `MPISender` and `MPIReceiver` support a single compile-time type;
-  - there is no check that the type sent by the `MPISender` matches the type expected by the `MPIReceiver`.
+ - all communication is blocking, and there is no acknowledgment or feedback from one module to the other; this may + lead to a dead lock if a complex sender/receiver topology is used; + - there is no check that the number, type and order of collections sent by the `MPISender` matches those expected by + the `MPIReceiver`. ## Notes for future developments @@ -72,8 +87,6 @@ An automated test is available in the test/ directory. - implement efficient serialisation for standard layout types; - implement efficient serialisation for `PortableCollection` types; - check the the collection sent by the `MPISender` and the one expected by the `MPIReceiver` match; - - extend the `MPISender` and `MPIReceiver` to send and receive multiple collections; - - rewrite the `MPISender` and `MPIReceiver` to send and receive arbitrary run-time collections; - improve the `MPIController` to be a `global` module rather than a `one` module; - let an `MPISource` accept connections and events from multiple `MPIController` modules in different jobs; - let an `MPIController` connect and sent events to multiple `MPISource` modules in different jobs; diff --git a/HeterogeneousCore/MPICore/plugins/BuildFile.xml b/HeterogeneousCore/MPICore/plugins/BuildFile.xml index f094803caf49b..9a9c390c2d27b 100644 --- a/HeterogeneousCore/MPICore/plugins/BuildFile.xml +++ b/HeterogeneousCore/MPICore/plugins/BuildFile.xml @@ -1,5 +1,4 @@ - diff --git a/HeterogeneousCore/MPICore/plugins/MPIReceiver.cc b/HeterogeneousCore/MPICore/plugins/MPIReceiver.cc index 1afc2d616eadd..fc2e268eac104 100644 --- a/HeterogeneousCore/MPICore/plugins/MPIReceiver.cc +++ b/HeterogeneousCore/MPICore/plugins/MPIReceiver.cc @@ -3,7 +3,9 @@ // CMSSW include files #include "FWCore/Framework/interface/Event.h" +#include "FWCore/Framework/interface/GenericProduct.h" #include "FWCore/Framework/interface/global/EDProducer.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" #include "FWCore/ParameterSet/interface/ParameterSet.h" 
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" #include "FWCore/Utilities/interface/Exception.h" @@ -12,15 +14,11 @@ // local include files #include "api.h" -template class MPIReceiver : public edm::global::EDProducer<> { - using CollectionType = T; - public: MPIReceiver(edm::ParameterSet const& config) - : mpiPrev_(consumes(config.getParameter("channel"))), - mpiNext_(produces()), - data_(produces()), + : upstream_(consumes(config.getParameter("upstream"))), + token_(produces()), instance_(config.getParameter("instance")) // { // instance 0 is reserved for the MPIController / MPISource pair @@ -29,38 +27,60 @@ class MPIReceiver : public edm::global::EDProducer<> { throw cms::Exception("InvalidValue") << "Invalid MPIReceiver instance value, please use a value between 1 and 255"; } + + auto const& products = config.getParameter>("products"); + products_.reserve(products.size()); + for (auto const& product : products) { + auto const& type = product.getParameter("type"); + auto const& label = product.getParameter("label"); + + Entry entry; + entry.type = edm::TypeWithDict::byName(type); + entry.wrappedType = edm::TypeWithDict::byName("edm::Wrapper<" + type + ">"); + entry.token = produces(edm::TypeID{entry.type.typeInfo()}, label); + + edm::LogAbsolute("MPIReceiver") << "receive type \"" << entry.type.name() << "\" for label \"" << label + << "\" over MPI channel instance " << this->instance_; + + products_.push_back(entry); + } } void produce(edm::StreamID, edm::Event& event, edm::EventSetup const&) const override { // read the MPIToken used to establish the communication channel - MPIToken token = event.get(mpiPrev_); + MPIToken token = event.get(upstream_); - // receive the data sent over the MPI channel - // note: currently this uses a blocking probe/recv - CollectionType data; - token.channel()->receiveSerializedProduct(instance_, data); + for (auto const& entry : products_) { + auto product = std::make_unique(); + product->object_ = 
entry.type.construct(); + product->wrappedType_ = entry.wrappedType; - // put the data into the Event - event.emplace(data_, std::move(data)); + // receive the data sent over the MPI channel + // note: currently this uses a blocking probe/recv + token.channel()->receiveSerializedProduct(instance_, product->object_); + + // put the data into the Event + event.put(entry.token, std::move(product)); + } // write a shallow copy of the channel to the output, so other modules can consume it // to indicate that they should run after this - event.emplace(mpiNext_, token); + event.emplace(token_, token); } private: - edm::EDGetTokenT const mpiPrev_; // MPIToken used to establish the communication channel - edm::EDPutTokenT const mpiNext_; // copy of the MPIToken that may be used to implement an ordering relation - edm::EDPutTokenT const data_; // data to be read over the channel and put into the Event - int32_t const instance_; // instance used to identify the source-destination pair + struct Entry { + edm::TypeWithDict type; + edm::TypeWithDict wrappedType; + edm::EDPutToken token; + }; + + // TODO consider if upstream_ should be a vector instead of a single token ? 
+ edm::EDGetTokenT const upstream_; // MPIToken used to establish the communication channel + edm::EDPutTokenT const token_; // copy of the MPIToken that may be used to implement an ordering relation + std::vector products_; // data to be read over the channel and put into the Event + int32_t const instance_; // instance used to identify the source-destination pair }; #include "FWCore/Framework/interface/MakerMacros.h" - -#include "DataFormats/Provenance/interface/EventID.h" -using MPIReceiverEventID = MPIReceiver; -DEFINE_FWK_MODULE(MPIReceiverEventID); - -#include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h" -using MPIReceiverFEDRawDataCollection = MPIReceiver; -DEFINE_FWK_MODULE(MPIReceiverFEDRawDataCollection); +DEFINE_FWK_MODULE(MPIReceiver); diff --git a/HeterogeneousCore/MPICore/plugins/MPISender.cc b/HeterogeneousCore/MPICore/plugins/MPISender.cc index 9cfd3936bbf46..e6af5a696ba12 100644 --- a/HeterogeneousCore/MPICore/plugins/MPISender.cc +++ b/HeterogeneousCore/MPICore/plugins/MPISender.cc @@ -1,23 +1,29 @@ +#include +#include +#include + // CMSSW include files +#include "DataFormats/Provenance/interface/BranchDescription.h" +#include "DataFormats/Provenance/interface/BranchPattern.h" #include "FWCore/Framework/interface/Event.h" +#include "FWCore/Framework/interface/GenericHandle.h" #include "FWCore/Framework/interface/global/EDProducer.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/Reflection/interface/TypeWithDict.h" #include "FWCore/Utilities/interface/Exception.h" #include "HeterogeneousCore/MPICore/interface/MPIToken.h" // local include files #include "api.h" -template class MPISender : public edm::global::EDProducer<> { - using CollectionType = T; - public: MPISender(edm::ParameterSet const& config) - : mpiPrev_(consumes(config.getParameter("channel"))), - mpiNext_(produces()), - 
data_(consumes(config.getParameter("data"))), + : upstream_(consumes(config.getParameter("upstream"))), + token_(produces()), + patterns_(edm::branchPatterns(config.getParameter>("products"))), instance_(config.getParameter("instance")) // { // instance 0 is reserved for the MPIController / MPISource pair @@ -25,37 +31,83 @@ class MPISender : public edm::global::EDProducer<> { if (instance_ < 1 or instance_ > 255) { throw cms::Exception("InvalidValue") << "Invalid MPISender instance value, please use a value between 1 and 255"; } + + products_.reserve(patterns_.size()); + + callWhenNewProductsRegistered([this](edm::BranchDescription const& branch) { + static const std::string_view kPathStatus("edm::PathStatus"); + static const std::string_view kEndPathStatus("edm::EndPathStatus"); + + switch (branch.branchType()) { + case edm::InEvent: + if (branch.className() == kPathStatus or branch.className() == kEndPathStatus) + return; + for (auto const& pattern : patterns_) { + if (pattern.match(branch)) { + Entry entry; + entry.type = branch.unwrappedType(); + // TODO move this to EDConsumerBase::consumes() ? 
+ entry.token = this->consumes( + edm::TypeToGet{branch.unwrappedTypeID(), edm::PRODUCT_TYPE}, + edm::InputTag{branch.moduleLabel(), branch.productInstanceName(), branch.processName()}); + products_.push_back(entry); + // + edm::LogAbsolute("MPISender") + << "send branch \"" << branch.friendlyClassName() << '_' << branch.moduleLabel() << '_' + << branch.productInstanceName() << '_' << branch.processName() << "\" of type \"" << entry.type.name() + << "\" over MPI channel instance " << instance_; + break; + } + } + break; + + case edm::InLumi: + case edm::InRun: + case edm::InProcess: + // lumi, run and process products are not supported + break; + + default: + throw edm::Exception(edm::errors::LogicError) + << "Unexpected branch type " << branch.branchType() << "\nPlease contact a Framework developer\n"; + } + }); + + // TODO add an error if a pattern does not match any branches? how? } void produce(edm::StreamID, edm::Event& event, edm::EventSetup const&) const override { // read the MPIToken used to establish the communication channel - MPIToken token = event.get(mpiPrev_); - - // read the data to be sent over the MPI channel - auto data = event.get(data_); + MPIToken token = event.get(upstream_); - // send the data over MPI - // note: currently this uses a blocking send - token.channel()->sendSerializedProduct(instance_, data); + for (auto const& product : products_) { + // read the products to be sent over the MPI channel + edm::GenericHandle handle(product.type); + event.getByToken(product.token, handle); + edm::ObjectWithDict const* object = handle.product(); + // send the products over MPI + // note: currently this uses a blocking send + token.channel()->sendSerializedProduct(instance_, *object); + } // write a shallow copy of the channel to the output, so other modules can consume it // to indicate that they should run after this - event.emplace(mpiNext_, token); + event.emplace(token_, token); } private: - edm::EDGetTokenT const mpiPrev_; // MPIToken used 
to establish the communication channel - edm::EDPutTokenT const mpiNext_; // copy of the MPIToken that may be used to implement an ordering relation - edm::EDGetTokenT const data_; // data to be read from the Event and sent over the channel - int32_t const instance_; // instance used to identify the source-destination pair + struct Entry { + edm::TypeWithDict type; + edm::EDGetToken token; + }; + + // TODO consider if upstream_ should be a vector instead of a single token ? + edm::EDGetTokenT const upstream_; // MPIToken used to establish the communication channel + edm::EDPutTokenT const token_; // copy of the MPIToken that may be used to implement an ordering relation + std::vector patterns_; // branches to read from the Event and send over the MPI channel + std::vector products_; // types and tokens corresponding to the branches + int32_t const instance_; // instance used to identify the source-destination pair }; #include "FWCore/Framework/interface/MakerMacros.h" - -#include "DataFormats/Provenance/interface/EventID.h" -using MPISenderEventID = MPISender; -DEFINE_FWK_MODULE(MPISenderEventID); - -#include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h" -using MPISenderFEDRawDataCollection = MPISender; -DEFINE_FWK_MODULE(MPISenderFEDRawDataCollection); +DEFINE_FWK_MODULE(MPISender); diff --git a/HeterogeneousCore/MPICore/test/testMPIController.py b/HeterogeneousCore/MPICore/test/testMPIController.py index 99427aaed4267..15ce2265cdc1a 100644 --- a/HeterogeneousCore/MPICore/test/testMPIController.py +++ b/HeterogeneousCore/MPICore/test/testMPIController.py @@ -28,21 +28,25 @@ source = cms.untracked.InputTag('ids') ) -process.sender = cms.EDProducer("MPISenderEventID", - channel = cms.InputTag("mpiController"), +process.sender = cms.EDProducer("MPISender", + upstream = cms.InputTag("mpiController"), instance = cms.int32(42), - data = cms.InputTag("ids") + products = cms.vstring("edmEventID_ids__*") ) -process.othersender = 
cms.EDProducer("MPISenderEventID", - channel = cms.InputTag("mpiController"), +process.othersender = cms.EDProducer("MPISender", + upstream = cms.InputTag("mpiController"), instance = cms.int32(19), - data = cms.InputTag("ids") + products = cms.vstring("edmEventID_ids__*") ) -process.receiver = cms.EDProducer("MPIReceiverEventID", - channel = cms.InputTag("othersender"), # guarantees that this module will only run after othersender has run - instance = cms.int32(99) +process.receiver = cms.EDProducer("MPIReceiver", + upstream = cms.InputTag("othersender"), # guarantees that this module will only run after othersender has run + instance = cms.int32(99), + products = cms.VPSet(cms.PSet( + type = cms.string("edm::EventID"), + label = cms.string("") + )) ) process.finalcheck = cms.EDAnalyzer("edmtest::EventIDValidator", diff --git a/HeterogeneousCore/MPICore/test/testMPIFollower.py b/HeterogeneousCore/MPICore/test/testMPIFollower.py index 58ed0ed536194..eb42f26e59022 100644 --- a/HeterogeneousCore/MPICore/test/testMPIFollower.py +++ b/HeterogeneousCore/MPICore/test/testMPIFollower.py @@ -19,20 +19,28 @@ #from HeterogeneousCore.MPICore.mpiReporter_cfi import mpiReporter as mpiReporter_ #process.reporter = mpiReporter_.clone() -process.receiver = cms.EDProducer("MPIReceiverEventID", - channel = cms.InputTag("source"), - instance = cms.int32(42) +process.receiver = cms.EDProducer("MPIReceiver", + upstream = cms.InputTag("source"), + instance = cms.int32(42), + products = cms.VPSet(cms.PSet( + type = cms.string("edm::EventID"), + label = cms.string("") + )) ) -process.otherreceiver = cms.EDProducer("MPIReceiverEventID", - channel = cms.InputTag("source"), - instance = cms.int32(19) +process.otherreceiver = cms.EDProducer("MPIReceiver", + upstream = cms.InputTag("source"), + instance = cms.int32(19), + products = cms.VPSet(cms.PSet( + type = cms.string("edm::EventID"), + label = cms.string("") + )) ) -process.sender = cms.EDProducer("MPISenderEventID", - channel = 
cms.InputTag("otherreceiver"), # guarantees that this module will only run after otherreceiver has run +process.sender = cms.EDProducer("MPISender", + upstream = cms.InputTag("otherreceiver"), # guarantees that this module will only run after otherreceiver has run instance = cms.int32(99), - data = cms.InputTag("otherreceiver") + products = cms.vstring("edmEventID_otherreceiver__*") ) process.analyzer = cms.EDAnalyzer("edmtest::EventIDValidator", From 4a91403c14035e4b13834f40f684e72a59f7f135 Mon Sep 17 00:00:00 2001 From: Andrea Bocci Date: Sun, 1 Dec 2024 15:04:39 +0100 Subject: [PATCH 6/6] Improve MPI tests --- HeterogeneousCore/MPICore/test/BuildFile.xml | 2 +- ...testMPIController.py => controller_cfg.py} | 0 .../{testMPIFollower.py => follower_cfg.py} | 0 .../{testMPI.sh => testMPIApplication.sh} | 29 ++++++++++++------- 4 files changed, 20 insertions(+), 11 deletions(-) rename HeterogeneousCore/MPICore/test/{testMPIController.py => controller_cfg.py} (100%) rename HeterogeneousCore/MPICore/test/{testMPIFollower.py => follower_cfg.py} (100%) rename HeterogeneousCore/MPICore/test/{testMPI.sh => testMPIApplication.sh} (69%) diff --git a/HeterogeneousCore/MPICore/test/BuildFile.xml b/HeterogeneousCore/MPICore/test/BuildFile.xml index 2ee6d6058fd40..e79e8d27877c3 100644 --- a/HeterogeneousCore/MPICore/test/BuildFile.xml +++ b/HeterogeneousCore/MPICore/test/BuildFile.xml @@ -3,7 +3,7 @@ - + diff --git a/HeterogeneousCore/MPICore/test/testMPIController.py b/HeterogeneousCore/MPICore/test/controller_cfg.py similarity index 100% rename from HeterogeneousCore/MPICore/test/testMPIController.py rename to HeterogeneousCore/MPICore/test/controller_cfg.py diff --git a/HeterogeneousCore/MPICore/test/testMPIFollower.py b/HeterogeneousCore/MPICore/test/follower_cfg.py similarity index 100% rename from HeterogeneousCore/MPICore/test/testMPIFollower.py rename to HeterogeneousCore/MPICore/test/follower_cfg.py diff --git a/HeterogeneousCore/MPICore/test/testMPI.sh 
b/HeterogeneousCore/MPICore/test/testMPIApplication.sh similarity index 69% rename from HeterogeneousCore/MPICore/test/testMPI.sh rename to HeterogeneousCore/MPICore/test/testMPIApplication.sh index 4b93c713004a8..dab4869e282dd 100755 --- a/HeterogeneousCore/MPICore/test/testMPI.sh +++ b/HeterogeneousCore/MPICore/test/testMPIApplication.sh @@ -1,5 +1,12 @@ #! /bin/bash # Shell script for testing CMSSW over MPI +CONTROLLER=$(realpath $1) +FOLLOWER=$(realpath $2) + +# make sure the CMSSW environment has been loaded +if [ -z "$CMSSW_BASE" ]; then + eval `scram runtime -sh` +fi mkdir -p $CMSSW_BASE/tmp/$SCRAM_ARCH/test DIR=$(mktemp -d -p $CMSSW_BASE/tmp/$SCRAM_ARCH/test) @@ -22,18 +29,20 @@ done # start the "follower" CMSSW job(s) { - mpirun --mca pmix_server_uri file:server.uri -n 1 -- cmsRun $CMSSW_BASE/src/HeterogeneousCore/MPICore/test/testMPIFollower.py >& mpifollower.log - echo $? > mpifollower.status + mpirun --mca pmix_server_uri file:server.uri -n 1 -- cmsRun $FOLLOWER >& follower.log + echo $? > follower.status } & FOLLOWER_PID=$! -# wait to make sure the MPISource has established the connection to the ORTE server -sleep 3s +# wait until the MPISource has established the connection to the ORTE server +while ! grep -q 'waiting for a connection to the MPI server' follower.log; do + sleep 1s +done # start the "controller" CMSSW job(s) { - mpirun --mca pmix_server_uri file:server.uri -n 1 -- cmsRun $CMSSW_BASE/src/HeterogeneousCore/MPICore/test/testMPIController.py >& mpicontroller.log - echo $? > mpicontroller.status + mpirun --mca pmix_server_uri file:server.uri -n 1 -- cmsRun $CONTROLLER >& controller.log + echo $? > controller.status } & CONTROLLER_PID=$! 
@@ -42,13 +51,13 @@ wait $CONTROLLER_PID $FOLLOWER_PID # print the jobs' output and check the jobs' exit status echo '========== testMPIController ===========' -cat mpicontroller.log -MPICONTROLLER_STATUS=$(< mpicontroller.status) +cat controller.log +MPICONTROLLER_STATUS=$(< controller.status) echo '========================================' echo echo '=========== testMPIFollower ============' -cat mpifollower.log -MPISOURCE_STATUS=$(< mpifollower.status) +cat follower.log +MPISOURCE_STATUS=$(< follower.status) echo '========================================' # stop the MPI server and cleanup the URI file