diff --git a/HeterogeneousCore/MPICore/BuildFile.xml b/HeterogeneousCore/MPICore/BuildFile.xml index 2b7959989737b..f8ececf282946 100644 --- a/HeterogeneousCore/MPICore/BuildFile.xml +++ b/HeterogeneousCore/MPICore/BuildFile.xml @@ -1,5 +1,5 @@ - + diff --git a/HeterogeneousCore/MPICore/interface/MPIOrigin.h b/HeterogeneousCore/MPICore/interface/MPIOrigin.h index bc912c9c180a4..51428073c4cd3 100644 --- a/HeterogeneousCore/MPICore/interface/MPIOrigin.h +++ b/HeterogeneousCore/MPICore/interface/MPIOrigin.h @@ -22,8 +22,8 @@ class MPIOrigin : public edm::DoNotRecordParents { int process() const { return process_; } // rank of the original MPI process int rank() const { return process_; } // alias for process() - int stream() const { return stream_; } // EDM stream id within the original process - int tag() const { return stream_; } // alias for stream() + int stream() const { return stream_; } // EDM stream id within the original process + int tag() const { return stream_; } // alias for stream() bool valid() const { return (-1 != process_ and -1 != stream_); } diff --git a/HeterogeneousCore/MPICore/plugins/BuildFile.xml b/HeterogeneousCore/MPICore/plugins/BuildFile.xml index a7a5611a9ef30..5cfe88a0d0c46 100644 --- a/HeterogeneousCore/MPICore/plugins/BuildFile.xml +++ b/HeterogeneousCore/MPICore/plugins/BuildFile.xml @@ -4,6 +4,8 @@ + + diff --git a/HeterogeneousCore/MPICore/plugins/MPIDriver.cc b/HeterogeneousCore/MPICore/plugins/MPIDriver.cc index 6560f6182b2b3..cbf4881bc0d2f 100644 --- a/HeterogeneousCore/MPICore/plugins/MPIDriver.cc +++ b/HeterogeneousCore/MPICore/plugins/MPIDriver.cc @@ -4,9 +4,12 @@ #include #include +#include +#include "DataFormats/Provenance/interface/BranchKey.h" #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h" #include "FWCore/Framework/interface/Event.h" +#include "FWCore/Framework/interface/GenericHandle.h" #include "FWCore/Framework/interface/LuminosityBlock.h" #include "FWCore/Framework/interface/MakerMacros.h" #include "FWCore/Framework/interface/Run.h" @@ -16,11 +19,13 @@ #include "FWCore/ParameterSet/interface/ParameterDescriptionNode.h" #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/Reflection/interface/ObjectWithDict.h" +#include "FWCore/Reflection/interface/TypeWithDict.h" #include "FWCore/ServiceRegistry/interface/Service.h" #include "FWCore/Utilities/src/Guid.h" #include "HeterogeneousCore/MPIServices/interface/MPIService.h" -#include "conversion.h" +#include "api.h" #include "messages.h" /* MPIDriver class @@ -59,14 +64,19 @@ class MPIDriver : public edm::stream::EDAnalyzer<> { private: edm::StreamID sid_ = edm::StreamID::invalidStreamID(); + MPISender sender_; MPI_Comm comm_ = MPI_COMM_NULL; std::vector eventLabels_; std::vector branches_; + std::vector tokens_; }; MPIDriver::MPIDriver(edm::ParameterSet const& config) : eventLabels_(config.getUntrackedParameter>("eventProducts")) { + // sort the labels, so they can be used with binary_search + std::sort(eventLabels_.begin(), eventLabels_.end()); + // make sure that MPI is initialised MPIService::required(); @@ -76,18 +86,20 @@ MPIDriver::MPIDriver(edm::ParameterSet const& config) // look up the "server" port char port[MPI_MAX_PORT_NAME]; MPI_Lookup_name("server", MPI_INFO_NULL, port); - edm::LogAbsolute("MPIDriver") << "Try to connect to the MPI servers at port " << port; + edm::LogAbsolute("MPI") << "Trying to connect to the MPI server on port " << port; // connect to the server int size; MPI_Comm_connect(port, MPI_INFO_NULL, 0, MPI_COMM_SELF, &comm_); MPI_Comm_remote_size(comm_, &size); - edm::LogAbsolute("MPIDriver") << "Client connected to " << size << (size == 1 ? " server" : " servers"); + edm::LogAbsolute("MPI") << "Client connected to " << size << (size == 1 ? " server" : " servers"); if (size > 1) { throw cms::Exception("UnsupportedFeature") << "MPIDriver supports only a single follower, but it was connected to " << size << " followers"; } + sender_ = MPISender(comm_, 0); + // register a dependency on all the event products described by the "eventProducts" callWhenNewProductsRegistered([this](edm::BranchDescription const& branch) { static const std::string kWildcard("*"); static const std::string kPathStatus("edm::PathStatus"); @@ -98,15 +110,16 @@ MPIDriver::MPIDriver(edm::ParameterSet const& config) if (std::binary_search(eventLabels_.begin(), eventLabels_.end(), branch.moduleLabel()) or (std::binary_search(eventLabels_.begin(), eventLabels_.end(), kWildcard) and branch.className() != kPathStatus and branch.className() != kEndPathStatus)) { - this->consumes(edm::TypeToGet{branch.unwrappedTypeID(), edm::PRODUCT_TYPE}, - edm::InputTag{branch.moduleLabel(), branch.productInstanceName(), branch.processName()}); + tokens_.push_back( + this->consumes(edm::TypeToGet{branch.unwrappedTypeID(), edm::PRODUCT_TYPE}, + edm::InputTag{branch.moduleLabel(), branch.productInstanceName(), branch.processName()})); branches_.push_back(branch); } break; default: - // ignore the other product types - ; + // ignore the other product types + ; } }); } @@ -117,133 +130,113 @@ MPIDriver::~MPIDriver() { } void MPIDriver::beginStream(edm::StreamID sid) { + // store this stream's id sid_ = sid; - { - edm::LogAbsolute log("MPI"); - log << "stream " << sid_ << ": begin stream"; - } - - EDM_MPI_Empty_t buffer; - buffer.messageTag = EDM_MPI_Connect; - buffer.stream = sid_; - MPI_Send(&buffer, 1, EDM_MPI_Empty, 0, EDM_MPI_Connect, comm_); + // signal the connection + sender_.sendConnect(sid_); - /* + /* is there a way to access all known process histories ? edm::ProcessHistoryRegistry const& registry = * edm::ProcessHistoryRegistry::instance(); - edm::LogAbsolute("MPIDriver") << "ProcessHistoryRegistry:"; + edm::LogAbsolute("MPI") << "ProcessHistoryRegistry:"; for (auto const& keyval: registry) { - edm::LogAbsolute("MPIDriver") << keyval.first << ": " << keyval.second; + edm::LogAbsolute("MPI") << keyval.first << ": " << keyval.second; } */ // send the branch descriptions for all event products for (auto& branch : branches_) { - TBufferFile blob{TBuffer::kWrite}; - TClass::GetClass(typeid(edm::BranchDescription))->WriteBuffer(blob, &branch); - MPI_Send(blob.Buffer(), blob.Length(), MPI_BYTE, 0, EDM_MPI_SendSerializedProduct, comm_); + sender_.sendSerializedProduct(sid_, branch); } // indicate that all branches have been sent - buffer.messageTag = EDM_MPI_SendComplete; - buffer.stream = sid_; - MPI_Send(&buffer, 1, EDM_MPI_Empty, 0, EDM_MPI_SendComplete, comm_); - - buffer.messageTag = EDM_MPI_BeginStream; - buffer.stream = sid_; - MPI_Send(&buffer, 1, EDM_MPI_Empty, 0, EDM_MPI_BeginStream, comm_); + sender_.sendComplete(sid_); + // signal the begin stream + sender_.sendBeginStream(sid_); } void MPIDriver::endStream() { - { - edm::LogAbsolute log("MPI"); - log << "stream " << sid_ << ": end stream"; - } - - EDM_MPI_Empty_t buffer; - buffer.messageTag = EDM_MPI_EndStream; - buffer.stream = sid_; - MPI_Send(&buffer, 1, EDM_MPI_Empty, 0, EDM_MPI_EndStream, comm_); - - buffer.messageTag = EDM_MPI_Disconnect; - buffer.stream = sid_; - MPI_Send(&buffer, 1, EDM_MPI_Empty, 0, EDM_MPI_Disconnect, comm_); + // signal the end stream + sender_.sendEndStream(sid_); + // signal the disconnection + sender_.sendDisconnect(sid_); } void MPIDriver::beginRun(edm::Run const& run, edm::EventSetup const& setup) { - { - edm::LogAbsolute log("MPI"); - log << "stream " << sid_ << ": begin run " << run.run(); - log << "\nprocess history: " << run.processHistory(); - log << "\nprocess history id: " << run.processHistory().id(); - log << "\nprocess history id: " << run.runAuxiliary().processHistoryID() << " (from runAuxiliary)"; - } - EDM_MPI_RunAuxiliary_t buffer; - buffer.messageTag = EDM_MPI_BeginRun; - buffer.stream = sid_; - /* FIXME: set the ProcessHistoryID in the RunAuxiliary to the correct value ------------------------------- */ - //edmToBuffer(buffer, run.runAuxiliary()); + // signal a new run, and transmit the RunAuxiliary + /* FIXME + * Ideally the ProcessHistoryID stored in the run.runAuxiliary() should be the correct one, and + * we could simply do + + sender_.sendBeginRun(sid_, run.runAuxiliary()); + + * Instead, it looks like the ProcessHistoryID stored in the run.runAuxiliary() is that of the + * _parent_ process. + * So, we make a copy of the RunAuxiliary, set the ProcessHistoryID to the correct value, and + * transmit the modified RunAuxiliary. + */ auto aux = run.runAuxiliary(); aux.setProcessHistoryID(run.processHistory().id()); - edmToBuffer(buffer, aux); - /* FIXME -------------------------------------------------------------------------------------------------- */ - MPI_Send(&buffer, 1, EDM_MPI_RunAuxiliary, 0, EDM_MPI_BeginRun, comm_); - - // send the ProcessHistory - TBufferFile blob{TBuffer::kWrite}; - TClass::GetClass(typeid(edm::ProcessHistory)) - ->WriteBuffer(blob, const_cast(&run.processHistory())); - MPI_Send(blob.Buffer(), blob.Length(), MPI_BYTE, 0, EDM_MPI_SendSerializedProduct, comm_); + sender_.sendBeginRun(sid_, aux); + // transmit the ProcessHistory + sender_.sendSerializedProduct(sid_, run.processHistory()); } void MPIDriver::endRun(edm::Run const& run, edm::EventSetup const& setup) { - { - edm::LogInfo log("MPI"); - log << "stream " << sid_ << ": end run " << run.run(); - } - - EDM_MPI_RunAuxiliary_t buffer; - buffer.messageTag = EDM_MPI_EndRun; - buffer.stream = sid_; - edmToBuffer(buffer, run.runAuxiliary()); - MPI_Send(&buffer, 1, EDM_MPI_RunAuxiliary, 0, EDM_MPI_EndRun, comm_); + // signal the end of run + /* FIXME + * Ideally the ProcessHistoryID stored in the run.runAuxiliary() should be the correct one, and + * we could simply do + + sender_.sendEndRun(sid_, run.runAuxiliary()); + + * Instead, it looks like the ProcessHistoryID stored in the run.runAuxiliary() is that of the + * _parent_ process. + * So, we make a copy of the RunAuxiliary, set the ProcessHistoryID to the correct value, and + * transmit the modified RunAuxiliary. + */ + auto aux = run.runAuxiliary(); + aux.setProcessHistoryID(run.processHistory().id()); + sender_.sendEndRun(sid_, aux); } void MPIDriver::beginLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& setup) { - { - edm::LogAbsolute log("MPI"); - log << "stream " << sid_ << ": begin run " << lumi.run() << ", lumi " << lumi.luminosityBlock(); - log << "\nprocess history: " << lumi.processHistory(); - log << "\nprocess history id: " << lumi.processHistory().id(); - log << "\nprocess history id: " << lumi.luminosityBlockAuxiliary().processHistoryID() - << " (from luminosityBlockAuxiliary)"; - } - - EDM_MPI_LuminosityBlockAuxiliary_t buffer; - buffer.messageTag = EDM_MPI_BeginLuminosityBlock; - buffer.stream = sid_; - /* FIXME: set the ProcessHistoryID in the RunAuxiliary to the correct value ------------------------------- */ - //edmToBuffer(buffer, lumi.luminosityBlockAuxiliary()); + // signal a new luminosity block, and transmit the LuminosityBlockAuxiliary + /* FIXME + * Ideally the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() should be the + * correct one, and we could simply do + + sender_.sendBeginLuminosityBlock(sid_, lumi.luminosityBlockAuxiliary()); + + * Instead, it looks like the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() is + * that of the _parent_ process. + * So, we make a copy of the LuminosityBlockAuxiliary, set the ProcessHistoryID to the correct + * value, and transmit the modified LuminosityBlockAuxiliary. + */ auto aux = lumi.luminosityBlockAuxiliary(); aux.setProcessHistoryID(lumi.processHistory().id()); - edmToBuffer(buffer, aux); - /* FIXME -------------------------------------------------------------------------------------------------- */ - MPI_Send(&buffer, 1, EDM_MPI_LuminosityBlockAuxiliary, 0, EDM_MPI_BeginLuminosityBlock, comm_); + sender_.sendBeginLuminosityBlock(sid_, aux); } void MPIDriver::endLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& setup) { - { - edm::LogInfo log("MPI"); - log << "stream " << sid_ << ": end run " << lumi.run() << ", lumi " << lumi.luminosityBlock(); - } - - EDM_MPI_LuminosityBlockAuxiliary_t buffer; - buffer.messageTag = EDM_MPI_EndLuminosityBlock; - buffer.stream = sid_; - edmToBuffer(buffer, lumi.luminosityBlockAuxiliary()); - MPI_Send(&buffer, 1, EDM_MPI_LuminosityBlockAuxiliary, 0, EDM_MPI_EndLuminosityBlock, comm_); + // signal the end of luminosity block + /* FIXME + * Ideally the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() should be the + * correct one, and we could simply do + + sender_.sendEndLuminosityBlock(sid_, lumi.luminosityBlockAuxiliary()); + + * Instead, it looks like the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() is + * that of the _parent_ process. + * So, we make a copy of the LuminosityBlockAuxiliary, set the ProcessHistoryID to the correct + * value, and transmit the modified LuminosityBlockAuxiliary. + */ + auto aux = lumi.luminosityBlockAuxiliary(); + aux.setProcessHistoryID(lumi.processHistory().id()); + sender_.sendEndLuminosityBlock(sid_, aux); } void MPIDriver::analyze(edm::Event const& event, edm::EventSetup const& setup) { + /* { edm::LogAbsolute log("MPI"); log << "stream " << sid_ << ": processing run " << event.run() << ", lumi " << event.luminosityBlock() << ", event " @@ -259,12 +252,34 @@ void MPIDriver::analyze(edm::Event const& event, edm::EventSetup const& setup) { log << "\nprocessHistoryID " << event.eventAuxiliary().processHistoryID(); log << "\nprocessGUID " << edm::Guid(event.eventAuxiliary().processGUID(), true).toString(); } + */ + + // signal a new event, and transmit the EventAuxiliary + sender_.sendEvent(sid_, event.eventAuxiliary()); + + // transmit the event data products + unsigned int size = tokens_.size(); + assert(branches_.size() == size); + for (unsigned int i = 0; i < size; ++i) { + auto const& token = tokens_[i]; + auto const& branch = branches_[i]; + auto const& type = branch.unwrappedType(); + edm::GenericHandle handle(type); + event.getByToken(token, handle); + if (handle.isValid()) { + // transmit the BranchKey in order to reconstruct the BranchDescription on the receiving side + sender_.sendSerializedProduct(sid_, edm::BranchKey(branch)); + // transmit the ProductProvenance + sender_.sendSerializedProduct(sid_, *handle.provenance()->productProvenance()); + // transmit the ProductID + sender_.sendSerializedProduct(sid_, handle.id()); + // transmit the wrapped product + sender_.sendSerializedProduct(sid_, *handle.product()); + } + } - EDM_MPI_EventAuxiliary_t buffer; - buffer.messageTag = EDM_MPI_ProcessEvent; - buffer.stream = sid_; - edmToBuffer(buffer, event.eventAuxiliary()); - MPI_Send(&buffer, 1, EDM_MPI_EventAuxiliary, 0, EDM_MPI_ProcessEvent, comm_); + // indicate that all products have been sent + sender_.sendComplete(sid_); } void MPIDriver::fillDescriptions(edm::ConfigurationDescriptions& descriptions) { diff --git a/HeterogeneousCore/MPICore/plugins/MPIReporter.cc b/HeterogeneousCore/MPICore/plugins/MPIReporter.cc index 1d9e1ba88ff34..a1ab7387f701a 100644 --- a/HeterogeneousCore/MPICore/plugins/MPIReporter.cc +++ b/HeterogeneousCore/MPICore/plugins/MPIReporter.cc @@ -24,15 +24,13 @@ class MPIReporter : public edm::stream::EDAnalyzer<> { edm::EDGetTokenT origin_; }; -MPIReporter::MPIReporter(edm::ParameterSet const& config) : - origin_(consumes(edm::InputTag("source"))) -{ } +MPIReporter::MPIReporter(edm::ParameterSet const& config) : origin_(consumes(edm::InputTag("source"))) {} void MPIReporter::analyze(edm::Event const& event, edm::EventSetup const& setup) { { - edm::LogAbsolute log("MPIReporter"); - log << "stream " << event.streamID() << ": processing run " << event.run() << ", lumi " << event.luminosityBlock() << ", event " - << event.id().event(); + edm::LogAbsolute log("MPI"); + log << "stream " << event.streamID() << ": processing run " << event.run() << ", lumi " << event.luminosityBlock() + << ", event " << event.id().event(); log << "\nprocess history: " << event.processHistory(); log << "\nprocess history id: " << event.processHistory().id(); log << "\nprocess history id: " << event.eventAuxiliary().processHistoryID() << " (from eventAuxiliary)"; @@ -47,9 +45,9 @@ void MPIReporter::analyze(edm::Event const& event, edm::EventSetup const& setup) auto const& origin = event.get(origin_); { - edm::LogAbsolute log("MPIReporter"); + edm::LogAbsolute log("MPI"); log << "original process rank: " << origin.rank(); - log << "\noriginal process stream: " << origin.stream(); + log << "\noriginal process stream: " << origin.stream(); } } diff --git a/HeterogeneousCore/MPICore/plugins/MPISource.cc b/HeterogeneousCore/MPICore/plugins/MPISource.cc index e0b600b868509..f3a742ff8255a 100644 --- a/HeterogeneousCore/MPICore/plugins/MPISource.cc +++ b/HeterogeneousCore/MPICore/plugins/MPISource.cc @@ -4,6 +4,7 @@ #include "DataFormats/Common/interface/Wrapper.h" #include "DataFormats/Provenance/interface/BranchDescription.h" +#include "DataFormats/Provenance/interface/BranchKey.h" #include "DataFormats/Provenance/interface/ProcessHistory.h" #include "DataFormats/Provenance/interface/ProductRegistry.h" #include "FWCore/Framework/interface/Frameworkfwd.h" @@ -12,10 +13,13 @@ #include "FWCore/MessageLogger/interface/MessageLogger.h" #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" #include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/PluginManager/interface/PluginCapabilities.h" +#include "FWCore/Reflection/interface/DictionaryTools.h" #include "FWCore/Sources/interface/ProducerSourceBase.h" #include "HeterogeneousCore/MPICore/interface/MPIOrigin.h" #include "HeterogeneousCore/MPIServices/interface/MPIService.h" +#include "api.h" #include "conversion.h" #include "messages.h" @@ -53,9 +57,11 @@ class MPISource : public edm::InputSource { char port_[MPI_MAX_PORT_NAME]; MPI_Comm comm_ = MPI_COMM_NULL; + MPISender link; + edm::ProcessHistory history_; - std::shared_ptr originBranchDescription_; - std::shared_ptr originProvenance_; + edm::BranchDescription originBranchDescription_; + edm::ProductProvenance originProvenance_; std::shared_ptr runAuxiliary_; std::shared_ptr luminosityBlockAuxiliary_; @@ -64,14 +70,13 @@ class MPISource : public edm::InputSource { struct DataProduct { DataProduct() = default; DataProduct(std::unique_ptr product, - std::shared_ptr branchDescription, - std::shared_ptr provenance) - : product(std::move(product)), - branchDescription(std::move(branchDescription)), - provenance(std::move(provenance)) {} - std::unique_ptr product; - std::shared_ptr branchDescription; - std::shared_ptr provenance; + edm::BranchDescription const* branchDescription, + edm::ProductProvenance provenance) + : product(std::move(product)), branchDescription(branchDescription), provenance(std::move(provenance)) {} + + std::unique_ptr product; // owns the wrapped product until it is put into the event + edm::BranchDescription const* branchDescription; // non-owning pointer + edm::ProductProvenance provenance; // cheap enough to be stored by value }; struct EventData { @@ -84,11 +89,10 @@ class MPISource : public edm::InputSource { MPISource::MPISource(edm::ParameterSet const& config, edm::InputSourceDescription const& desc) : edm::InputSource(config, desc), - originBranchDescription_( - std::make_shared(makeOriginBranchDescription(desc.moduleDescription_))), - originProvenance_(std::make_shared(originBranchDescription_->branchID())) { + originBranchDescription_(makeOriginBranchDescription(desc.moduleDescription_)), + originProvenance_(originBranchDescription_.branchID()) { // register the MPIOrigin branch - productRegistryUpdate().addProduct(*originBranchDescription_, false); + productRegistryUpdate().addProduct(originBranchDescription_); // make sure that MPI is initialised MPIService::required(); @@ -107,8 +111,9 @@ MPISource::MPISource(edm::ParameterSet const& config, edm::InputSourceDescriptio MPI_Publish_name("server", port_info, port_); // create an intercommunicator and accept a client connection - edm::LogAbsolute("MPISource") << "waiting for a connection to the MPI server at port " << port_; + edm::LogAbsolute("MPI") << "waiting for a connection to the MPI server at port " << port_; MPI_Comm_accept(port_, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &comm_); + link = MPISender(comm_, 0); // wait for a client to connect MPI_Status status; @@ -121,23 +126,32 @@ MPISource::MPISource(edm::ParameterSet const& config, edm::InputSourceDescriptio while (true) { MPI_Mprobe(source, MPI_ANY_TAG, comm_, &message, &status); if (status.MPI_TAG == EDM_MPI_SendComplete) { + // all branches have been received MPI_Mrecv(&buffer, 1, EDM_MPI_Empty, &message, &status); - edm::LogAbsolute("MPISource") << "all BranchDescription received"; + edm::LogAbsolute("MPI") << "all BranchDescription received"; break; } else { - assert(status.MPI_TAG == EDM_MPI_SendSerializedProduct); + // receive the branch description for the next event product + assert(EDM_MPI_SendSerializedProduct == status.MPI_TAG); int size; MPI_Get_count(&status, MPI_BYTE, &size); TBufferFile blob{TBuffer::kRead, size}; MPI_Mrecv(blob.Buffer(), size, MPI_BYTE, &message, &status); edm::BranchDescription bd; TClass::GetClass(typeid(edm::BranchDescription))->ReadBuffer(blob, &bd); + bd.setDropped(false); bd.setProduced(false); - // TODO check if the branch description is already present ? - edm::LogAbsolute("MPISource") << "new BranchDescription registered: " << bd; + bd.setOnDemand(false); + bd.setIsProvenanceSetOnRead(true); + bd.init(); productRegistryUpdate().copyProduct(bd); } } + edm::LogAbsolute("MPI") << "registered branchess:\n"; + for (auto& keyval : productRegistry()->productList()) { + edm::LogAbsolute("MPI") << " - " << keyval.first; + } + edm::LogAbsolute("MPI") << '\n'; } MPISource::~MPISource() { @@ -216,7 +230,7 @@ MPISource::ItemType MPISource::getNextItemType() { TClass::GetClass(typeid(edm::ProcessHistory))->ReadBuffer(blob, &history_); history_.initializeTransients(); if (processHistoryRegistryForUpdate().registerProcessHistory(history_)) { - edm::LogAbsolute("MPISource") << "new ProcessHistory registered: " << history_; + edm::LogAbsolute("MPI") << "new ProcessHistory registered: " << history_; } // signal a new run @@ -259,43 +273,88 @@ MPISource::ItemType MPISource::getNextItemType() { case EDM_MPI_ProcessEvent: { // allocate a new event auto& event = events_.emplace_back(); + event.eventProducts.reserve(productRegistryUpdate().size()); // receive the EventAuxiliary - EDM_MPI_EventAuxiliary_t buffer; - MPI_Mrecv(&buffer, 1, EDM_MPI_EventAuxiliary, &message, &status); - edmFromBuffer(buffer, event.eventAuxiliary); + auto [status, stream] = link.receiveEvent(event.eventAuxiliary, message); + int source = status.MPI_SOURCE; // store the MPI origin - auto origin = - std::make_unique>(edm::WrapperBase::Emplace{}, status.MPI_SOURCE, buffer.stream); - event.eventProducts.emplace_back(std::move(origin), originBranchDescription_, originProvenance_); - -#if WHATAVER - // from RootFile::fillEventHistory - EventSelectionIDVector* pESV = &eventSelectionIDs_; - TBranch* eventSelectionIDBranch = eventTree_.tree()->GetBranch(poolNames::eventSelectionsBranchName().c_str()); - eventTree_.fillBranchEntry(eventSelectionIDBranch, pESV); - - BranchListIndexes* pBLI = &branchListIndexes_; - TBranch* branchListIndexesBranch = eventTree_.tree()->GetBranch(poolNames::branchListIndexesBranchName().c_str()); - eventTree_.fillBranchEntry(branchListIndexesBranch, pBLI); - - // from RootFile::readCurrentEvent - runHelper_->overrideRunNumber(eventAux_.id(), eventAux().isRealData()); - - eventTree_.insertEntryForIndex(principal.transitionIndex()); - auto history = processHistoryRegistry_->getMapped(eventAux().processHistoryID()); - principal.fillEventPrincipal(eventAux(), - history, - std::move(eventSelectionIDs_), - std::move(branchListIndexes_), - *(makeProductProvenanceRetriever(principal.streamID().value())), - eventTree_.resetAndGetRootDelayedReader()); - - // from RootFile::readEvent - runHelper_->checkRunConsistency(eventAux().run(), indexIntoFileIter_.run()); - runHelper_->checkLumiConsistency(eventAux().luminosityBlock(), indexIntoFileIter_.lumi()); -#endif // WHATEVER + auto origin = std::make_unique>(edm::WrapperBase::Emplace{}, source, stream); + event.eventProducts.emplace_back(std::move(origin), &originBranchDescription_, originProvenance_); + + // + MPI_Message message; + while (true) { + MPI_Mprobe(source, MPI_ANY_TAG, comm_, &message, &status); + if (EDM_MPI_SendComplete == status.MPI_TAG) { + // all products have been received + EDM_MPI_Empty_t buffer; + MPI_Mrecv(&buffer, 1, EDM_MPI_Empty, &message, &status); + edm::LogAbsolute("MPI") << "all Products received"; + break; + } else { + edm::BranchKey key; + edm::ProductProvenance provenance; + edm::ProductID pid; + edm::WrapperBase* wrapper; + { + // receive the BranchKey + assert(EDM_MPI_SendSerializedProduct == status.MPI_TAG); + int size; + MPI_Get_count(&status, MPI_BYTE, &size); + TBufferFile buffer{TBuffer::kRead, size}; + MPI_Mrecv(buffer.Buffer(), size, MPI_BYTE, &message, &status); + TClass::GetClass(typeid(edm::BranchKey))->ReadBuffer(buffer, &key); + } + + edm::BranchDescription const& branch = productRegistry()->productList().at(key); + + MPI_Mprobe(source, MPI_ANY_TAG, comm_, &message, &status); + { + // receive the ProductProvenance + assert(EDM_MPI_SendSerializedProduct == status.MPI_TAG); + int size; + MPI_Get_count(&status, MPI_BYTE, &size); + TBufferFile buffer{TBuffer::kRead, size}; + MPI_Mrecv(buffer.Buffer(), size, MPI_BYTE, &message, &status); + TClass::GetClass(typeid(edm::ProductProvenance))->ReadBuffer(buffer, &provenance); + } + MPI_Mprobe(source, MPI_ANY_TAG, comm_, &message, &status); + { + // receive the ProductID + assert(EDM_MPI_SendSerializedProduct == status.MPI_TAG); + int size; + MPI_Get_count(&status, MPI_BYTE, &size); + TBufferFile buffer{TBuffer::kRead, size}; + MPI_Mrecv(buffer.Buffer(), size, MPI_BYTE, &message, &status); + TClass::GetClass(typeid(edm::ProductID))->ReadBuffer(buffer, &pid); + } + MPI_Mprobe(source, MPI_ANY_TAG, comm_, &message, &status); + { + // receive the product + assert(EDM_MPI_SendSerializedProduct == status.MPI_TAG); + int size; + MPI_Get_count(&status, MPI_BYTE, &size); + TBufferFile buffer{TBuffer::kRead, size}; + MPI_Mrecv(buffer.Buffer(), size, MPI_BYTE, &message, &status); + // construct an edm::Wrapper and fill it with the received product + // TODO this would be much simpler if the MPIDriver could sent the Wrapper instead of T + edm::TypeWithDict const& type = branch.wrappedType(); + edm::ObjectWithDict object = type.construct(); + *reinterpret_cast(reinterpret_cast(object.address()) + + type.dataMemberByName("present").offset()) = true; + branch.unwrappedType().getClass()->ReadBuffer( + buffer, reinterpret_cast(object.address()) + type.dataMemberByName("obj").offset()); + wrapper = reinterpret_cast(object.address()); + } + edm::LogAbsolute("MPI") << "received object for branch " << key; + //edm::LogAbsolute("MPI") << "received object of type " << branch.unwrappedType(); + + // store the received product + event.eventProducts.emplace_back(std::unique_ptr(wrapper), &branch, provenance); + } + } // signal a new event return IsEvent; @@ -315,12 +374,15 @@ std::shared_ptr MPISource::readLuminosityBlockAux } void MPISource::readEvent_(edm::EventPrincipal& eventPrincipal) { + edm::LogAbsolute("MPI") << "number of buffered events: " << events_.size(); auto& event = events_.front(); - eventPrincipal.fillEventPrincipal(event.eventAuxiliary, &history_); + edm::ProductProvenanceRetriever prov(eventPrincipal.transitionIndex(), *productRegistry()); + eventPrincipal.fillEventPrincipal(event.eventAuxiliary, &history_, edm::EventSelectionIDVector{}, edm::BranchListIndexes{}, prov, nullptr, false); + for (auto& product : event.eventProducts) { - eventPrincipal.put(*product.branchDescription, std::move(product.product), *product.provenance); + //edm::LogAbsolute("MPI") << "putting object for branch " << *product.branchDescription; + eventPrincipal.put(*product.branchDescription, std::move(product.product), product.provenance); } - //eventPrincipal.readAllFromSourceAndMergeImmediately(); events_.pop_front(); } diff --git a/HeterogeneousCore/MPICore/plugins/api.cc b/HeterogeneousCore/MPICore/plugins/api.cc new file mode 100644 index 0000000000000..27b85537cf94f --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/api.cc @@ -0,0 +1,154 @@ +// C++ standard library headers +#include +#include +#include + +// ROOT headers +#include +#include + +// CMSSW headers +#include "DataFormats/Provenance/interface/EventAuxiliary.h" +#include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h" +#include "DataFormats/Provenance/interface/RunAuxiliary.h" + +// local headers +#include "api.h" +#include "conversion.h" +#include "messages.h" + +namespace { + // copy the content of an std::string-like object to an N-sized char buffer: + // if the string is larger than the buffer, copy only the first N bytes; + // if the string is smaller than the buffer, fill the rest of the buffer with NUL characters. + template + void copy_and_fill(char (&dest)[N], S const& src) { + if (std::size(src) < N) { + memset(dest, 0x00, N); + memcpy(dest, src.data(), std::size(src)); + } else { + memcpy(dest, src.data(), N); + } + } +} // namespace + +// fill an edm::RunAuxiliary object from an EDM_MPI_RunAuxiliary buffer +void MPISender::edmFromBuffer_(EDM_MPI_RunAuxiliary_t const& buffer, edm::RunAuxiliary& aux) { + aux = edm::RunAuxiliary(buffer.run, edm::Timestamp(buffer.beginTime), edm::Timestamp(buffer.endTime)); + aux.setProcessHistoryID( + edm::ProcessHistoryID(std::string(buffer.processHistoryID, std::size(buffer.processHistoryID)))); +} + +// fill an EDM_MPI_RunAuxiliary buffer from an edm::RunAuxiliary object +void MPISender::edmToBuffer_(EDM_MPI_RunAuxiliary_t& buffer, edm::RunAuxiliary const& aux) { + copy_and_fill(buffer.processHistoryID, aux.processHistoryID().compactForm()); + buffer.beginTime = aux.beginTime().value(); + buffer.endTime = aux.endTime().value(); + buffer.run = aux.id().run(); +} + +// fill an edm::LuminosityBlockAuxiliary object from an EDM_MPI_LuminosityBlockAuxiliary buffer +void MPISender::edmFromBuffer_(EDM_MPI_LuminosityBlockAuxiliary_t const& buffer, edm::LuminosityBlockAuxiliary& aux) { + aux = edm::LuminosityBlockAuxiliary( + buffer.run, buffer.lumi, edm::Timestamp(buffer.beginTime), edm::Timestamp(buffer.endTime)); + aux.setProcessHistoryID( + edm::ProcessHistoryID(std::string(buffer.processHistoryID, std::size(buffer.processHistoryID)))); +} + +// fill an EDM_MPI_LuminosityBlockAuxiliary buffer from an edm::LuminosityBlockAuxiliary object +void MPISender::edmToBuffer_(EDM_MPI_LuminosityBlockAuxiliary_t& buffer, edm::LuminosityBlockAuxiliary const& aux) { + copy_and_fill(buffer.processHistoryID, aux.processHistoryID().compactForm()); + buffer.beginTime = aux.beginTime().value(); + buffer.endTime = aux.endTime().value(); + buffer.run = aux.id().run(); + buffer.lumi = aux.id().luminosityBlock(); +} + +// fill an edm::EventAuxiliary object from an EDM_MPI_EventAuxiliary buffer +void MPISender::edmFromBuffer_(EDM_MPI_EventAuxiliary_t const& buffer, edm::EventAuxiliary& aux) { + aux = edm::EventAuxiliary({buffer.run, buffer.lumi, buffer.event}, + std::string(buffer.processGuid, std::size(buffer.processGuid)), + edm::Timestamp(buffer.time), + buffer.realData, + static_cast(buffer.experimentType), + buffer.bunchCrossing, + buffer.storeNumber, + buffer.orbitNumber); + aux.setProcessHistoryID( + edm::ProcessHistoryID(std::string(buffer.processHistoryID, std::size(buffer.processHistoryID)))); +} + +// fill an EDM_MPI_EventAuxiliary buffer from an edm::EventAuxiliary object +void MPISender::edmToBuffer_(EDM_MPI_EventAuxiliary_t& buffer, edm::EventAuxiliary const& aux) { + copy_and_fill(buffer.processHistoryID, aux.processHistoryID().compactForm()); + copy_and_fill(buffer.processGuid, aux.processGUID()); + buffer.time = aux.time().value(); + buffer.realData = aux.isRealData(); + buffer.experimentType = aux.experimentType(); + buffer.bunchCrossing = aux.bunchCrossing(); + buffer.orbitNumber = aux.orbitNumber(); + buffer.storeNumber = aux.storeNumber(); + buffer.run = aux.id().run(); + buffer.lumi = aux.id().luminosityBlock(); + buffer.event = aux.id().event(); +} + +// fill and send an EDM_MPI_Empty_t buffer +void MPISender::sendEmpty_(int tag, int stream) { + EDM_MPI_Empty_t buffer; + buffer.messageTag = tag; + buffer.stream = stream; + MPI_Send(&buffer, 1, EDM_MPI_Empty, dest_, tag, comm_); +} + +// fill and send an EDM_MPI_RunAuxiliary_t buffer +void MPISender::sendRunAuxiliary_(int tag, int stream, edm::RunAuxiliary const& aux) { + EDM_MPI_RunAuxiliary_t buffer; + buffer.messageTag = tag; + buffer.stream = stream; + edmToBuffer_(buffer, aux); + MPI_Send(&buffer, 1, EDM_MPI_RunAuxiliary, dest_, tag, comm_); +} + +// fill and send an EDM_MPI_RunAuxiliary_t buffer +void MPISender::sendLuminosityBlockAuxiliary_(int tag, int stream, edm::LuminosityBlockAuxiliary const& aux) { + EDM_MPI_LuminosityBlockAuxiliary_t buffer; + buffer.messageTag = tag; + buffer.stream = stream; + edmToBuffer_(buffer, aux); + MPI_Send(&buffer, 1, EDM_MPI_LuminosityBlockAuxiliary, dest_, tag, comm_); +} + +// fill and send an EDM_MPI_EventAuxiliary_t buffer +void MPISender::sendEventAuxiliary_(int stream, edm::EventAuxiliary const& aux) { + EDM_MPI_EventAuxiliary_t buffer; + buffer.messageTag = EDM_MPI_ProcessEvent; + buffer.stream = stream; + edmToBuffer_(buffer, aux); + MPI_Send(&buffer, 1, EDM_MPI_EventAuxiliary, dest_, EDM_MPI_ProcessEvent, comm_); +} + +// receive an EDM_MPI_EventAuxiliary_t buffer and populate an edm::EventAuxiliary +std::tuple MPISender::receiveEventAuxiliary_(edm::EventAuxiliary& aux, int source, int tag) { + MPI_Status status; + EDM_MPI_EventAuxiliary_t buffer; + MPI_Recv(&buffer, 1, EDM_MPI_EventAuxiliary, source, tag, comm_, &status); + edmFromBuffer_(buffer, aux); + return std::make_tuple(status, buffer.stream); +} + +// receive an EDM_MPI_EventAuxiliary_t buffer and populate an edm::EventAuxiliary +std::tuple MPISender::receiveEventAuxiliary_(edm::EventAuxiliary& aux, MPI_Message& message) { + MPI_Status status; + EDM_MPI_EventAuxiliary_t buffer; + MPI_Mrecv(&buffer, 1, EDM_MPI_EventAuxiliary, &message, &status); + edmFromBuffer_(buffer, aux); + return std::make_tuple(status, buffer.stream); +} + +// serialize an object of generic type using its ROOT dictionary, and send the binary blob +void MPISender::sendSerializedProduct_(int stream, TClass const* type, void const* product) { + TBufferFile buffer{TBuffer::kWrite}; + buffer.WriteClassBuffer(type, const_cast(product)); + MPI_Send(buffer.Buffer(), buffer.Length(), MPI_BYTE, dest_, EDM_MPI_SendSerializedProduct, comm_); +} diff --git a/HeterogeneousCore/MPICore/plugins/api.h b/HeterogeneousCore/MPICore/plugins/api.h new file mode 100644 index 0000000000000..12d7f6cc3091b --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/api.h @@ -0,0 +1,138 @@ +#ifndef HeterogeneousCore_MPICore_plugins_api_h +#define HeterogeneousCore_MPICore_plugins_api_h + +// externals headers +#include + +// ROOT headers +#include + +// CMSSW headers +#include "DataFormats/Provenance/interface/ProvenanceFwd.h" +#include "FWCore/Reflection/interface/ObjectWithDict.h" + +// local headers +#include "messages.h" + + +class MPISender { +public: + MPISender() = default; + MPISender(MPI_Comm comm, int destination) : comm_(comm), dest_(destination) {} + + // announce that a client has just connected + void sendConnect(int stream) { + sendEmpty_(EDM_MPI_Connect, stream); + } + + // announce that the client will disconnect + void sendDisconnect(int stream) { + sendEmpty_(EDM_MPI_Disconnect, stream); + } + + // signal the begin of stream + void sendBeginStream(int stream) { + sendEmpty_(EDM_MPI_BeginStream, stream); + } + + // signal the end of stream + void sendEndStream(int stream) { + sendEmpty_(EDM_MPI_EndStream, stream); + } + + // signal a new run, and transmit the RunAuxiliary + void sendBeginRun(int stream, edm::RunAuxiliary const& aux) { + sendRunAuxiliary_(EDM_MPI_BeginRun, stream, aux); + } + + // signal the end of run, and re-transmit the RunAuxiliary + void sendEndRun(int stream, edm::RunAuxiliary const& aux) { + sendRunAuxiliary_(EDM_MPI_EndRun, stream, aux); + } + + // signal a new luminosity block, and transmit the LuminosityBlockAuxiliary + void sendBeginLuminosityBlock(int stream, edm::LuminosityBlockAuxiliary const& aux) { + sendLuminosityBlockAuxiliary_(EDM_MPI_BeginLuminosityBlock, stream, aux); + } + + // signal the end of luminosity block, and re-transmit the LuminosityBlockAuxiliary + void sendEndLuminosityBlock(int stream, edm::LuminosityBlockAuxiliary const& aux) { + sendLuminosityBlockAuxiliary_(EDM_MPI_EndLuminosityBlock, stream, aux); + } + + // signal a new event, and transmit the EventAuxiliary + void sendEvent(int stream, edm::EventAuxiliary const& aux) { + sendEventAuxiliary_(stream, aux); + } + + // start processing a new event, and receive the EventAuxiliary + std::tuple receiveEvent(edm::EventAuxiliary& aux, int source) { + return receiveEventAuxiliary_(aux, source, EDM_MPI_ProcessEvent); + } + + std::tuple receiveEvent(edm::EventAuxiliary& aux, MPI_Message& message) { + return receiveEventAuxiliary_(aux, message); + } + + // serialize an object of type T using its ROOT dictionary, and transmit it + template + void sendSerializedProduct(int stream, T const& product) { + static const TClass* type = TClass::GetClass(); + sendSerializedProduct_(stream, type, &product); + } + + // serialize an object of generic type using its ROOT dictionary, and transmit it + void sendSerializedProduct(int stream, edm::ObjectWithDict const& product) { + // the expected use case is that the product type corresponds to the actual type, + // so we access the type with typeOf() instead of dynamicType() + sendSerializedProduct_(stream, product.typeOf().getClass(), product.address()); + } + + // signal that an expected product will not be transmitted + void sendSkipProduct(int stream) { + sendEmpty_(EDM_MPI_SkipProduct, stream); + } + + // signal that tre transmission of multiple products is complete + void sendComplete(int stream) { + sendEmpty_(EDM_MPI_SendComplete, stream); + } + +private: + // serialize an EDM object to a simplified representation that can be transmitted as an MPI message + void edmToBuffer_(EDM_MPI_RunAuxiliary_t& buffer, edm::RunAuxiliary const& aux); + void edmToBuffer_(EDM_MPI_LuminosityBlockAuxiliary_t& buffer, edm::LuminosityBlockAuxiliary const& aux); + void edmToBuffer_(EDM_MPI_EventAuxiliary_t& buffer, edm::EventAuxiliary const& aux); + + // dwserialize an EDM object from a simplified representation transmitted as an MPI message + void edmFromBuffer_(EDM_MPI_RunAuxiliary_t const& buffer, edm::RunAuxiliary& aux); + void edmFromBuffer_(EDM_MPI_LuminosityBlockAuxiliary_t const& buffer, edm::LuminosityBlockAuxiliary& aux); + void edmFromBuffer_(EDM_MPI_EventAuxiliary_t const& buffer, edm::EventAuxiliary& aux); + + // fill and send an EDM_MPI_Empty_t buffer + void sendEmpty_(int tag, int stream); + + // fill and send an EDM_MPI_RunAuxiliary_t buffer + void sendRunAuxiliary_(int tag, int stream, edm::RunAuxiliary const& aux); + + // fill and send an EDM_MPI_LuminosityBlock_t buffer + void sendLuminosityBlockAuxiliary_(int tag, int stream, edm::LuminosityBlockAuxiliary const& aux); + + // fill and send an EDM_MPI_EventAuxiliary_t buffer + void sendEventAuxiliary_(int stream, edm::EventAuxiliary const& aux); + + // receive an EDM_MPI_EventAuxiliary_t buffer and populate an edm::EventAuxiliary + std::tuple receiveEventAuxiliary_(edm::EventAuxiliary& aux, int source, int tag); + std::tuple receiveEventAuxiliary_(edm::EventAuxiliary& aux, MPI_Message& message); + + // serialize a generic object using its ROOT dictionary, and send the binary blob + void sendSerializedProduct_(int stream, TClass const* type, void const* product); + + // MPI intercommunicator + MPI_Comm comm_ = MPI_COMM_NULL; + + // MPI destination + int dest_ = 0; +}; + +#endif // HeterogeneousCore_MPICore_plugins_api_h diff --git a/HeterogeneousCore/MPICore/plugins/messages.cc b/HeterogeneousCore/MPICore/plugins/messages.cc index 7a44e75ede1d5..ede2226f604c7 100644 --- a/HeterogeneousCore/MPICore/plugins/messages.cc +++ b/HeterogeneousCore/MPICore/plugins/messages.cc @@ -70,6 +70,7 @@ void EDM_MPI_build_types_() { EDM_MPI_MessageType[EDM_MPI_ProcessEvent] = EDM_MPI_EventAuxiliary; // EDM_MPI_MessageType[EDM_MPI_SendSerializedProduct] = MPI_BYTE; // variable-length binary blob EDM_MPI_MessageType[EDM_MPI_SendTrivialProduct] = MPI_BYTE; // variable-length binary blob + EDM_MPI_MessageType[EDM_MPI_SkipProduct] = EDM_MPI_Empty; // EDM_MPI_MessageType[EDM_MPI_SendComplete] = EDM_MPI_Empty; // } diff --git a/HeterogeneousCore/MPICore/plugins/messages.h b/HeterogeneousCore/MPICore/plugins/messages.h index e376f992dc1ab..9f739628926a1 100644 --- a/HeterogeneousCore/MPICore/plugins/messages.h +++ b/HeterogeneousCore/MPICore/plugins/messages.h @@ -5,7 +5,6 @@ #include - /* register the MPI message types forthe EDM communication */ void EDM_MPI_build_types(); @@ -25,6 +24,7 @@ enum EDM_MPI_MessageTag { EDM_MPI_ProcessEvent, EDM_MPI_SendSerializedProduct, EDM_MPI_SendTrivialProduct, + EDM_MPI_SkipProduct, EDM_MPI_SendComplete, EDM_MPI_MessageTagCount_ }; diff --git a/HeterogeneousCore/MPICore/test/testMPIDriver.py b/HeterogeneousCore/MPICore/test/testMPIDriver.py index b420801fe1e0c..11f652779facb 100644 --- a/HeterogeneousCore/MPICore/test/testMPIDriver.py +++ b/HeterogeneousCore/MPICore/test/testMPIDriver.py @@ -2,6 +2,7 @@ process = cms.Process("MPIServer") +#process.load("FWCore.Services.Tracer_cfi") process.load("HeterogeneousCore.MPIServices.MPIService_cfi") process.MPIService.pmix_server_uri = 'file:server.uri' diff --git a/HeterogeneousCore/MPICore/test/testMPISource.py b/HeterogeneousCore/MPICore/test/testMPISource.py index 2b607c3c6ccbb..5ec3a883595f2 100644 --- a/HeterogeneousCore/MPICore/test/testMPISource.py +++ b/HeterogeneousCore/MPICore/test/testMPISource.py @@ -2,6 +2,7 @@ process = cms.Process("MPIClient") +#process.load("FWCore.Services.Tracer_cfi") process.load("HeterogeneousCore.MPIServices.MPIService_cfi") process.MPIService.pmix_server_uri = 'file:server.uri'