Skip to content

Commit

Permalink
It works !
Browse files Browse the repository at this point in the history
  • Loading branch information
fwyzard committed Jan 13, 2021
1 parent 50bb09e commit 7f49394
Show file tree
Hide file tree
Showing 12 changed files with 545 additions and 173 deletions.
2 changes: 1 addition & 1 deletion HeterogeneousCore/MPICore/BuildFile.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<use name="openmpi"/>
<use name="DataFormats/Common"/>
<use name="FWCore/Framework"/>
<export>
<lib name="1"/>
</export>
4 changes: 2 additions & 2 deletions HeterogeneousCore/MPICore/interface/MPIOrigin.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ class MPIOrigin : public edm::DoNotRecordParents {
// Accessors for the stored process_ value; rank() returns exactly what process() returns.
int process() const { return process_; } // rank of the original MPI process
int rank() const { return process_; } // alias for process()

// Accessors for the stored stream_ value; tag() returns exactly what stream() returns.
// Each accessor is declared once: repeating the in-class definitions is a redefinition error.
int stream() const { return stream_; }  // EDM stream id within the original process
int tag() const { return stream_; }     // alias for stream()

// An origin is valid only when neither the process nor the stream still holds the -1 sentinel.
bool valid() const { return not(process_ == -1 or stream_ == -1); }

Expand Down
2 changes: 2 additions & 0 deletions HeterogeneousCore/MPICore/plugins/BuildFile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
<use name="FWCore/Framework"/>
<use name="FWCore/MessageLogger"/>
<use name="FWCore/ParameterSet"/>
<use name="FWCore/PluginManager"/>
<use name="FWCore/Reflection"/>
<use name="FWCore/ServiceRegistry"/>
<use name="FWCore/Sources"/>
<use name="FWCore/Utilities"/>
Expand Down
225 changes: 120 additions & 105 deletions HeterogeneousCore/MPICore/plugins/MPIDriver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
#include <mpi.h>

#include <TBufferFile.h>
#include <TClass.h>

#include "DataFormats/Provenance/interface/BranchKey.h"
#include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
#include "FWCore/Framework/interface/Event.h"
#include "FWCore/Framework/interface/GenericHandle.h"
#include "FWCore/Framework/interface/LuminosityBlock.h"
#include "FWCore/Framework/interface/MakerMacros.h"
#include "FWCore/Framework/interface/Run.h"
Expand All @@ -16,11 +19,13 @@
#include "FWCore/ParameterSet/interface/ParameterDescriptionNode.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/Reflection/interface/ObjectWithDict.h"
#include "FWCore/Reflection/interface/TypeWithDict.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/Utilities/src/Guid.h"
#include "HeterogeneousCore/MPIServices/interface/MPIService.h"

#include "conversion.h"
#include "api.h"
#include "messages.h"

/* MPIDriver class
Expand Down Expand Up @@ -59,14 +64,19 @@ class MPIDriver : public edm::stream::EDAnalyzer<> {
private:
edm::StreamID sid_ = edm::StreamID::invalidStreamID();

MPISender sender_;
MPI_Comm comm_ = MPI_COMM_NULL;

std::vector<std::string> eventLabels_;
std::vector<edm::BranchDescription> branches_;
std::vector<edm::EDGetToken> tokens_;
};

MPIDriver::MPIDriver(edm::ParameterSet const& config)
: eventLabels_(config.getUntrackedParameter<std::vector<std::string>>("eventProducts")) {
// sort the labels, so they can be used with binary_search
std::sort(eventLabels_.begin(), eventLabels_.end());

// make sure that MPI is initialised
MPIService::required();

Expand All @@ -76,18 +86,20 @@ MPIDriver::MPIDriver(edm::ParameterSet const& config)
// look up the "server" port
char port[MPI_MAX_PORT_NAME];
MPI_Lookup_name("server", MPI_INFO_NULL, port);
edm::LogAbsolute("MPIDriver") << "Try to connect to the MPI servers at port " << port;
edm::LogAbsolute("MPI") << "Trying to connect to the MPI server on port " << port;

// connect to the server
int size;
MPI_Comm_connect(port, MPI_INFO_NULL, 0, MPI_COMM_SELF, &comm_);
MPI_Comm_remote_size(comm_, &size);
edm::LogAbsolute("MPIDriver") << "Client connected to " << size << (size == 1 ? " server" : " servers");
edm::LogAbsolute("MPI") << "Client connected to " << size << (size == 1 ? " server" : " servers");
if (size > 1) {
throw cms::Exception("UnsupportedFeature")
<< "MPIDriver supports only a single follower, but it was connected to " << size << " followers";
}
sender_ = MPISender(comm_, 0);

// register a dependency on all the event products described by the "eventProducts"
callWhenNewProductsRegistered([this](edm::BranchDescription const& branch) {
static const std::string kWildcard("*");
static const std::string kPathStatus("edm::PathStatus");
Expand All @@ -98,15 +110,16 @@ MPIDriver::MPIDriver(edm::ParameterSet const& config)
if (std::binary_search(eventLabels_.begin(), eventLabels_.end(), branch.moduleLabel()) or
(std::binary_search(eventLabels_.begin(), eventLabels_.end(), kWildcard) and
branch.className() != kPathStatus and branch.className() != kEndPathStatus)) {
this->consumes(edm::TypeToGet{branch.unwrappedTypeID(), edm::PRODUCT_TYPE},
edm::InputTag{branch.moduleLabel(), branch.productInstanceName(), branch.processName()});
tokens_.push_back(
this->consumes(edm::TypeToGet{branch.unwrappedTypeID(), edm::PRODUCT_TYPE},
edm::InputTag{branch.moduleLabel(), branch.productInstanceName(), branch.processName()}));
branches_.push_back(branch);
}
break;

default:
// ignore the other product types
;
// ignore the other product types
;
}
});
}
Expand All @@ -117,133 +130,113 @@ MPIDriver::~MPIDriver() {
}

/// Begin-of-stream transition.
///
/// Stores the stream id in `sid_`, logs the transition, and then sends, in order:
///   1. a connect message,
///   2. one serialized edm::BranchDescription for every entry in `branches_`,
///   3. a "complete" marker closing the list of branch descriptions,
///   4. a begin-stream message.
///
/// Every message is sent exactly once, through `sender_`.
void MPIDriver::beginStream(edm::StreamID sid) {
  // store this stream's id
  sid_ = sid;

  {
    edm::LogAbsolute log("MPI");
    log << "stream " << sid_ << ": begin stream";
  }

  // signal the connection
  sender_.sendConnect(sid_);

  /* is there a way to access all known process histories ?
  edm::ProcessHistoryRegistry const& registry = * edm::ProcessHistoryRegistry::instance();
  edm::LogAbsolute("MPI") << "ProcessHistoryRegistry:";
  for (auto const& keyval: registry) {
    edm::LogAbsolute("MPI") << keyval.first << ": " << keyval.second;
  }
  */

  // send the branch descriptions for all event products
  for (auto& branch : branches_) {
    sender_.sendSerializedProduct(sid_, branch);
  }
  // indicate that all branches have been sent
  sender_.sendComplete(sid_);
  // signal the begin stream
  sender_.sendBeginStream(sid_);
}

/// End-of-stream transition.
///
/// Logs the transition, then sends an end-stream message followed by a
/// disconnect message for the stream stored in `sid_`. Each message is sent
/// exactly once, through `sender_`.
void MPIDriver::endStream() {
  {
    edm::LogAbsolute log("MPI");
    log << "stream " << sid_ << ": end stream";
  }

  // signal the end stream
  sender_.sendEndStream(sid_);
  // signal the disconnection
  sender_.sendDisconnect(sid_);
}

/// Begin-of-run transition.
///
/// Logs the run number and process history, then sends a begin-run message
/// carrying a copy of the RunAuxiliary whose ProcessHistoryID has been replaced
/// by `run.processHistory().id()`, followed by the serialized ProcessHistory.
/// Each message is sent exactly once, through `sender_`.
///
/// @param run    the run being started
/// @param setup  not used
void MPIDriver::beginRun(edm::Run const& run, edm::EventSetup const& setup) {
  {
    edm::LogAbsolute log("MPI");
    log << "stream " << sid_ << ": begin run " << run.run();
    log << "\nprocess history: " << run.processHistory();
    log << "\nprocess history id: " << run.processHistory().id();
    log << "\nprocess history id: " << run.runAuxiliary().processHistoryID() << " (from runAuxiliary)";
  }

  // signal a new run, and transmit the RunAuxiliary
  /* FIXME
   * Ideally the ProcessHistoryID stored in the run.runAuxiliary() should be the correct one, and
   * we could simply do

     sender_.sendBeginRun(sid_, run.runAuxiliary());

   * Instead, it looks like the ProcessHistoryID stored in the run.runAuxiliary() is that of the
   * _parent_ process.
   * So, we make a copy of the RunAuxiliary, set the ProcessHistoryID to the correct value, and
   * transmit the modified RunAuxiliary.
   */
  auto aux = run.runAuxiliary();
  aux.setProcessHistoryID(run.processHistory().id());
  sender_.sendBeginRun(sid_, aux);

  // transmit the ProcessHistory
  sender_.sendSerializedProduct(sid_, run.processHistory());
}

/// End-of-run transition.
///
/// Logs the run number, then sends a single end-run message carrying a copy of
/// the RunAuxiliary whose ProcessHistoryID has been replaced by
/// `run.processHistory().id()`. The unmodified `run.runAuxiliary()` is never
/// transmitted.
///
/// @param run    the run being closed
/// @param setup  not used
void MPIDriver::endRun(edm::Run const& run, edm::EventSetup const& setup) {
  {
    edm::LogInfo log("MPI");
    log << "stream " << sid_ << ": end run " << run.run();
  }

  // signal the end of run
  /* FIXME
   * Ideally the ProcessHistoryID stored in the run.runAuxiliary() should be the correct one, and
   * we could simply do

     sender_.sendEndRun(sid_, run.runAuxiliary());

   * Instead, it looks like the ProcessHistoryID stored in the run.runAuxiliary() is that of the
   * _parent_ process.
   * So, we make a copy of the RunAuxiliary, set the ProcessHistoryID to the correct value, and
   * transmit the modified RunAuxiliary.
   */
  auto aux = run.runAuxiliary();
  aux.setProcessHistoryID(run.processHistory().id());
  sender_.sendEndRun(sid_, aux);
}

/// Begin-of-luminosity-block transition.
///
/// Logs the run, luminosity block and process history, then sends a single
/// begin-luminosity-block message carrying a copy of the
/// LuminosityBlockAuxiliary whose ProcessHistoryID has been replaced by
/// `lumi.processHistory().id()`.
///
/// @param lumi   the luminosity block being started
/// @param setup  not used
void MPIDriver::beginLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& setup) {
  {
    edm::LogAbsolute log("MPI");
    log << "stream " << sid_ << ": begin run " << lumi.run() << ", lumi " << lumi.luminosityBlock();
    log << "\nprocess history: " << lumi.processHistory();
    log << "\nprocess history id: " << lumi.processHistory().id();
    log << "\nprocess history id: " << lumi.luminosityBlockAuxiliary().processHistoryID()
        << " (from luminosityBlockAuxiliary)";
  }

  // signal a new luminosity block, and transmit the LuminosityBlockAuxiliary
  /* FIXME
   * Ideally the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() should be the
   * correct one, and we could simply do

     sender_.sendBeginLuminosityBlock(sid_, lumi.luminosityBlockAuxiliary());

   * Instead, it looks like the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() is
   * that of the _parent_ process.
   * So, we make a copy of the LuminosityBlockAuxiliary, set the ProcessHistoryID to the correct
   * value, and transmit the modified LuminosityBlockAuxiliary.
   */
  auto aux = lumi.luminosityBlockAuxiliary();
  aux.setProcessHistoryID(lumi.processHistory().id());
  sender_.sendBeginLuminosityBlock(sid_, aux);
}

/// End-of-luminosity-block transition.
///
/// Logs the run and luminosity block, then sends a single end-luminosity-block
/// message carrying a copy of the LuminosityBlockAuxiliary whose
/// ProcessHistoryID has been replaced by `lumi.processHistory().id()`. The
/// unmodified `lumi.luminosityBlockAuxiliary()` is never transmitted.
///
/// @param lumi   the luminosity block being closed
/// @param setup  not used
void MPIDriver::endLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& setup) {
  {
    edm::LogInfo log("MPI");
    log << "stream " << sid_ << ": end run " << lumi.run() << ", lumi " << lumi.luminosityBlock();
  }

  // signal the end of luminosity block
  /* FIXME
   * Ideally the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() should be the
   * correct one, and we could simply do

     sender_.sendEndLuminosityBlock(sid_, lumi.luminosityBlockAuxiliary());

   * Instead, it looks like the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() is
   * that of the _parent_ process.
   * So, we make a copy of the LuminosityBlockAuxiliary, set the ProcessHistoryID to the correct
   * value, and transmit the modified LuminosityBlockAuxiliary.
   */
  auto aux = lumi.luminosityBlockAuxiliary();
  aux.setProcessHistoryID(lumi.processHistory().id());
  sender_.sendEndLuminosityBlock(sid_, aux);
}

void MPIDriver::analyze(edm::Event const& event, edm::EventSetup const& setup) {
/*
{
edm::LogAbsolute log("MPI");
log << "stream " << sid_ << ": processing run " << event.run() << ", lumi " << event.luminosityBlock() << ", event "
Expand All @@ -259,12 +252,34 @@ void MPIDriver::analyze(edm::Event const& event, edm::EventSetup const& setup) {
log << "\nprocessHistoryID " << event.eventAuxiliary().processHistoryID();
log << "\nprocessGUID " << edm::Guid(event.eventAuxiliary().processGUID(), true).toString();
}
*/

// signal a new event, and transmit the EventAuxiliary
sender_.sendEvent(sid_, event.eventAuxiliary());

// transmit the event data products
unsigned int size = tokens_.size();
assert(branches_.size() == size);
for (unsigned int i = 0; i < size; ++i) {
auto const& token = tokens_[i];
auto const& branch = branches_[i];
auto const& type = branch.unwrappedType();
edm::GenericHandle handle(type);
event.getByToken(token, handle);
if (handle.isValid()) {
// transmit the BranchKey in order to reconstruct the BranchDescription on the receiving side
sender_.sendSerializedProduct(sid_, edm::BranchKey(branch));
// transmit the ProductProvenance
sender_.sendSerializedProduct(sid_, *handle.provenance()->productProvenance());
// transmit the ProductID
sender_.sendSerializedProduct(sid_, handle.id());
// transmit the wrapped product
sender_.sendSerializedProduct(sid_, *handle.product());
}
}

EDM_MPI_EventAuxiliary_t buffer;
buffer.messageTag = EDM_MPI_ProcessEvent;
buffer.stream = sid_;
edmToBuffer(buffer, event.eventAuxiliary());
MPI_Send(&buffer, 1, EDM_MPI_EventAuxiliary, 0, EDM_MPI_ProcessEvent, comm_);
// indicate that all products have been sent
sender_.sendComplete(sid_);
}

void MPIDriver::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
Expand Down
14 changes: 6 additions & 8 deletions HeterogeneousCore/MPICore/plugins/MPIReporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@ class MPIReporter : public edm::stream::EDAnalyzer<> {
edm::EDGetTokenT<MPIOrigin> origin_;
};

/// Registers consumption of the MPIOrigin product with input tag "source" and keeps the
/// resulting token in `origin_`. The `config` parameter set is not read.
/// The constructor is defined exactly once; a second definition is a redefinition error.
MPIReporter::MPIReporter(edm::ParameterSet const& config) : origin_(consumes<MPIOrigin>(edm::InputTag("source"))) {}

void MPIReporter::analyze(edm::Event const& event, edm::EventSetup const& setup) {
{
edm::LogAbsolute log("MPIReporter");
log << "stream " << event.streamID() << ": processing run " << event.run() << ", lumi " << event.luminosityBlock() << ", event "
<< event.id().event();
edm::LogAbsolute log("MPI");
log << "stream " << event.streamID() << ": processing run " << event.run() << ", lumi " << event.luminosityBlock()
<< ", event " << event.id().event();
log << "\nprocess history: " << event.processHistory();
log << "\nprocess history id: " << event.processHistory().id();
log << "\nprocess history id: " << event.eventAuxiliary().processHistoryID() << " (from eventAuxiliary)";
Expand All @@ -47,9 +45,9 @@ void MPIReporter::analyze(edm::Event const& event, edm::EventSetup const& setup)

auto const& origin = event.get(origin_);
{
edm::LogAbsolute log("MPIReporter");
edm::LogAbsolute log("MPI");
log << "original process rank: " << origin.rank();
log << "\noriginal process stream: " << origin.stream();
log << "\noriginal process stream: " << origin.stream();
}
}

Expand Down
Loading

0 comments on commit 7f49394

Please sign in to comment.