Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Created DroppedDataProductResolver #47117

Merged
merged 4 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions FWCore/Framework/interface/Principal.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,7 @@ namespace edm {
virtual void changedIndexes_() {}

//called by adjustIndexesAfterProductRegistryAddition
void addDelayedReaderInputProduct(std::shared_ptr<BranchDescription const> bd);
void addPutOnReadInputProduct(std::shared_ptr<BranchDescription const> bd);
void addDroppedProduct(BranchDescription const& bd);

WrapperBase const* getIt(ProductID const&) const override;
std::optional<std::tuple<WrapperBase const*, unsigned int>> getThinnedProduct(ProductID const&,
Expand Down
29 changes: 29 additions & 0 deletions FWCore/Framework/src/DroppedDataProductResolver.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*----------------------------------------------------------------------
----------------------------------------------------------------------*/
#include "DroppedDataProductResolver.h"
#include "FWCore/Framework/interface/ProductProvenanceRetriever.h"

namespace edm {

DroppedDataProductResolver::Resolution DroppedDataProductResolver::resolveProduct_(
Principal const& principal,
bool skipCurrentProcess,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const {
return Resolution(nullptr);
}
void DroppedDataProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
Principal const& principal,
bool skipCurrentProcess,
ServiceToken const& token,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const noexcept {}

void DroppedDataProductResolver::retrieveAndMerge_(
Principal const& principal, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {}

void DroppedDataProductResolver::setProductProvenanceRetriever_(ProductProvenanceRetriever const* provRetriever) {
m_provenance.setStore(provRetriever);
}

} // namespace edm
56 changes: 56 additions & 0 deletions FWCore/Framework/src/DroppedDataProductResolver.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#ifndef FWCore_Framework_DroppedDataProductResolver_h
#define FWCore_Framework_DroppedDataProductResolver_h

/*----------------------------------------------------------------------

DroppedDataProductResolver: Handles case of a DataProduct which was dropped on output

----------------------------------------------------------------------*/

#include "FWCore/Framework/interface/ProductResolverBase.h"

namespace edm {
class DroppedDataProductResolver : public ProductResolverBase {
public:
DroppedDataProductResolver(std::shared_ptr<BranchDescription const> bd)
: ProductResolverBase(), m_provenance(std::move(bd), {}) {}

void connectTo(ProductResolverBase const&, Principal const*) final {}

private:
Resolution resolveProduct_(Principal const& principal,
bool skipCurrentProcess,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const final;
void prefetchAsync_(WaitingTaskHolder waitTask,
Principal const& principal,
bool skipCurrentProcess,
ServiceToken const& token,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const noexcept final;

void retrieveAndMerge_(Principal const& principal,
MergeableRunProductMetadata const* mergeableRunProductMetadata) const final;
bool productUnavailable_() const final { return true; }
bool productResolved_() const final { return true; }
bool productWasDeleted_() const final { return false; }
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const final { return false; }
bool unscheduledWasNotRun_() const final { return false; }
void resetProductData_(bool deleteEarly) final {}
BranchDescription const& branchDescription_() const final { return m_provenance.branchDescription(); }
void resetBranchDescription_(std::shared_ptr<BranchDescription const> bd) final {
m_provenance.setBranchDescription(bd);
}
Provenance const* provenance_() const final { return &m_provenance; }

std::string const& resolvedModuleLabel_() const final { return moduleLabel(); }
void setProductProvenanceRetriever_(ProductProvenanceRetriever const* provRetriever) final;
void setProductID_(ProductID const& pid) final { m_provenance.setProductID(pid); }
ProductProvenance const* productProvenancePtr_() const final { return m_provenance.productProvenance(); }
bool singleProduct_() const final { return true; }

Provenance m_provenance;
};
} // namespace edm

#endif
10 changes: 9 additions & 1 deletion FWCore/Framework/src/EventPrincipal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,15 @@ namespace edm {

unsigned int EventPrincipal::transitionIndex_() const { return streamID_.value(); }

void EventPrincipal::changedIndexes_() { provRetrieverPtr_->update(productRegistry()); }
void EventPrincipal::changedIndexes_() {
provRetrieverPtr_->update(productRegistry());
//If new Retrievers were added, we need to pass the provenance retriever
for (auto& prod : *this) {
if (prod->singleProduct()) {
prod->setProductProvenanceRetriever(productProvenanceRetrieverPtr());
}
}
}

static void throwProductDeletedException(ProductID const& pid,
edm::EventPrincipal::ConstProductResolverPtr const phb) {
Expand Down
20 changes: 7 additions & 13 deletions FWCore/Framework/src/Principal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include "FWCore/Framework/src/ProductDeletedException.h"
#include "FWCore/Framework/interface/ProductPutterBase.h"
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "ProductResolvers.h"
#include "DroppedDataProductResolver.h"
#include "FWCore/Utilities/interface/EDMException.h"
#include "FWCore/Utilities/interface/ProductResolverIndex.h"
#include "FWCore/Utilities/interface/TypeID.h"
Expand Down Expand Up @@ -166,12 +166,8 @@ namespace edm {
return true;
}

void Principal::addDelayedReaderInputProduct(std::shared_ptr<BranchDescription const> bd) {
addProductOrThrow(std::make_unique<DelayedReaderInputProductResolver>(std::move(bd)));
}

void Principal::addPutOnReadInputProduct(std::shared_ptr<BranchDescription const> bd) {
addProductOrThrow(std::make_unique<PutOnReadInputProductResolver>(std::move(bd)));
void Principal::addDroppedProduct(BranchDescription const& bd) {
addProductOrThrow(std::make_unique<DroppedDataProductResolver>(std::make_shared<BranchDescription const>(bd)));
}

// "Zero" the principal so it can be reused for another Event.
Expand Down Expand Up @@ -357,6 +353,7 @@ namespace edm {

Principal::ConstProductResolverPtr Principal::getProductResolverByIndex(
ProductResolverIndex const& index) const noexcept {
assert(index < productResolvers_.size());
ConstProductResolverPtr const phb = productResolvers_[index].get();
return phb;
}
Expand Down Expand Up @@ -678,12 +675,9 @@ namespace edm {
if (!productResolvers_[index]) {
// no product holder. Must add one. The new entry must be an input product holder.
assert(!bd.produced());
auto cbd = std::make_shared<BranchDescription const>(bd);
if (bd.onDemand()) {
addDelayedReaderInputProduct(cbd);
} else {
addPutOnReadInputProduct(cbd);
}
assert(bd.dropped());
//adding the resolver allows access to the provenance for the data product
addDroppedProduct(bd);
changed = true;
}
}
Expand Down
9 changes: 9 additions & 0 deletions FWCore/Framework/src/ProductResolversFactory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "FWCore/Framework/interface/ProductResolverBase.h"
#include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
#include "ProductResolvers.h"
#include "DroppedDataProductResolver.h"

#include <memory>

Expand All @@ -25,6 +26,10 @@ namespace edm::productResolversFactory {
std::shared_ptr<ProductResolverBase> makeTransformProduct(std::shared_ptr<BranchDescription const> bd) {
return std::make_shared<TransformingProductResolver>(std::move(bd));
}
std::shared_ptr<ProductResolverBase> makeDroppedProduct(std::shared_ptr<BranchDescription const> bd) {
return std::make_shared<DroppedDataProductResolver>(std::move(bd));
}

std::shared_ptr<ProductResolverBase> makeAliasedProduct(
std::shared_ptr<BranchDescription const> bd,
ProductRegistry const& iReg,
Expand Down Expand Up @@ -98,6 +103,10 @@ namespace edm::productResolversFactory {
return makeScheduledProduct(cbd);
}
/* not produced so comes from source */
if (bd.dropped()) {
//this allows access to provenance for the dropped product
return makeDroppedProduct(cbd);
}
if (bd.onDemand()) {
return makeDelayedReaderInputProduct(cbd);
}
Expand Down
24 changes: 17 additions & 7 deletions FWCore/Integration/plugins/OtherThingProducer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,18 @@ namespace edmtest {

private:
OtherThingAlgorithm alg_;
edm::EDGetToken thingToken_;
edm::EDPutToken putToken_;
edm::EDGetTokenT<ThingCollection> thingToken_;
edm::EDPutTokenT<OtherThingCollection> putToken_;
bool useRefs_;
bool refsAreTransient_;
bool thingMissing_;
};

OtherThingProducer::OtherThingProducer(edm::ParameterSet const& pset) : alg_(), refsAreTransient_(false) {
putToken_ = produces<OtherThingCollection>("testUserTag");
useRefs_ = pset.getUntrackedParameter<bool>("useRefs");
if (useRefs_) {
thingMissing_ = pset.getUntrackedParameter<bool>("thingMissing");
if (useRefs_ or thingMissing_) {
thingToken_ = consumes<ThingCollection>(pset.getParameter<edm::InputTag>("thingTag"));
}
refsAreTransient_ = pset.getUntrackedParameter<bool>("transient");
Expand All @@ -51,10 +53,17 @@ namespace edmtest {

// Step C: Get data for algorithm
edm::Handle<ThingCollection> parentHandle;
if (useRefs_) {
bool succeeded = e.getByToken(thingToken_, parentHandle);
assert(succeeded);
assert(parentHandle.isValid());
if (useRefs_ or thingMissing_) {
parentHandle = e.getHandle(thingToken_);
//If not here, throw exception
if (thingMissing_) {
if (parentHandle.isValid()) {
throw cms::Exception("TestFailure")
<< "The ThingCollection is available when it was expected to not be available";
}
} else {
*parentHandle;
}
}

// Step D: Invoke the algorithm, passing in inputs (NONE) and getting back outputs.
Expand All @@ -71,6 +80,7 @@ namespace edmtest {
->setComment("Actually get the ThingCollection and build edm::Refs to the contained items.");
desc.addUntracked<bool>("transient", false)
->setComment("If true, then the Refs constructed by the ThingCollection can not be persisted");
desc.addUntracked<bool>("thingMissing", false)->setComment("If true, expect that thing collection is missing");
descriptions.add("otherThingProd", desc);
}

Expand Down
34 changes: 34 additions & 0 deletions FWCore/Integration/plugins/TestFindProduct.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ namespace edmtest {
void endProcessBlock(edm::ProcessBlock const&) override;
void endJob() override;

static void fillDescriptions(edm::ConfigurationDescriptions& iDesc);

private:
std::vector<edm::InputTag> inputTags_;
int expectedSum_;
Expand Down Expand Up @@ -191,6 +193,38 @@ namespace edmtest {
}
}

void TestFindProduct::fillDescriptions(edm::ConfigurationDescriptions& iDesc) {
iDesc.setComment("Tests state of IntProduct, UInt64Product, and/or View<int> data products in the job.");
edm::ParameterSetDescription ps;

const std::vector<edm::InputTag> emptyTagVector;

ps.addUntracked<std::vector<edm::InputTag>>("inputTags", emptyTagVector)
->setComment("Get these IntProduct data products");
ps.addUntracked<int>("expectedSum", 0)
->setComment("The sum of all values from data products obtained from entire job.");
ps.addUntracked<int>("expectedCache", 0)->setComment("Check value of ProcessBlock caches.");
ps.addUntracked<bool>("getByTokenFirst", false)->setComment("Call getByToken before calling getByLabel");
ps.addUntracked<bool>("runProducerParameterCheck", false);
ps.addUntracked<bool>("testGetterOfProducts", false);
ps.addUntracked<std::vector<edm::InputTag>>("inputTagsNotFound", emptyTagVector)
->setComment("Data products which should be missing from the job.");
ps.addUntracked<std::vector<edm::InputTag>>("inputTagsView", emptyTagVector)
->setComment("Data products to get via View<int>.");
ps.addUntracked<std::vector<edm::InputTag>>("inputTagsUInt64", emptyTagVector)
->setComment("Get these UInt64Product data products.");
ps.addUntracked<std::vector<edm::InputTag>>("inputTagsEndLumi", emptyTagVector);
ps.addUntracked<std::vector<edm::InputTag>>("inputTagsEndRun", emptyTagVector);
ps.addUntracked<std::vector<edm::InputTag>>("inputTagsBeginProcessBlock", emptyTagVector);
ps.addUntracked<std::vector<edm::InputTag>>("inputTagsInputProcessBlock", emptyTagVector);
ps.addUntracked<std::vector<edm::InputTag>>("inputTagsEndProcessBlock", emptyTagVector);
ps.addUntracked<std::vector<edm::InputTag>>("inputTagsEndProcessBlock2", emptyTagVector);
ps.addUntracked<std::vector<edm::InputTag>>("inputTagsEndProcessBlock3", emptyTagVector);
ps.addUntracked<std::vector<edm::InputTag>>("inputTagsEndProcessBlock4", emptyTagVector);

iDesc.addDefault(ps);
}

TestFindProduct::~TestFindProduct() {}

void TestFindProduct::analyze(edm::Event const& event, edm::EventSetup const&) {
Expand Down
48 changes: 40 additions & 8 deletions FWCore/Integration/plugins/TestParentage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ namespace edmtest {

void analyze(edm::StreamID, edm::Event const& e, edm::EventSetup const& es) const override;

static void fillDescriptions(edm::ConfigurationDescriptions& iDesc);

private:
edm::EDGetTokenT<IntProduct> token_;
std::vector<std::string> expectedAncestors_;
Expand All @@ -70,14 +72,31 @@ namespace edmtest {
expectedAncestors_(pset.getParameter<std::vector<std::string> >("expectedAncestors")),
callGetProvenance_(pset.getUntrackedParameter<bool>("callGetProvenance", true)) {}

void TestParentage::fillDescriptions(edm::ConfigurationDescriptions& iDesc) {
edm::ParameterSetDescription ps;
ps.add<edm::InputTag>("inputTag");
ps.add<std::vector<std::string> >("expectedAncestors")
->setComment(
"Module labels for data products directly/indirectly obtained to make data product retrieved using "
"'inputTag'.");
ps.addUntracked<bool>("callGetProvenance", true)
->setComment("Use Event::getProvenance to get ancestor provenance.");

iDesc.addDefault(ps);
}

void TestParentage::analyze(edm::StreamID, edm::Event const& e, edm::EventSetup const&) const {
edm::Handle<IntProduct> h = e.getHandle(token_);

*h;
edm::Provenance const* prov = h.provenance();

if (not prov) {
throw cms::Exception("MissingProvenance") << "Failed to get provenance for 'inputTag'";
}
if (prov->originalBranchID() != prov->branchDescription().originalBranchID()) {
std::cerr << "TestParentage::analyze: test of Provenance::originalBranchID function failed" << std::endl;
abort();
throw cms::Exception("InconsistentBranchID")
<< " test of Provenance::originalBranchID function failed. Expected "
<< prov->branchDescription().originalBranchID() << " but see " << prov->originalBranchID();
}

std::set<std::string> expectedAncestors(expectedAncestors_.begin(), expectedAncestors_.end());
Expand All @@ -91,6 +110,12 @@ namespace edmtest {
// Currently we need to turn off this part of the test of when calling
// from a SubProcess and the parentage includes a product not kept
// in the SubProcess. This might get fixed someday ...
auto toException = [](auto& ex, auto const& ancestors) {
for (auto const& a : ancestors) {
ex << a << ", ";
}
};

if (callGetProvenance_) {
std::set<edm::BranchID> ancestors;
getAncestors(e, prov->branchID(), ancestors);
Expand All @@ -100,8 +125,12 @@ namespace edmtest {
ancestorLabels.insert(branchIDToLabel[ancestor]);
}
if (ancestorLabels != expectedAncestors) {
std::cerr << "TestParentage::analyze: ancestors do not match expected ancestors" << std::endl;
abort();
cms::Exception ex("WrongAncestors");
ex << "ancestors from Event::getProvenance\n";
toException(ex, ancestorLabels);
ex << "\n do not match expected ancestors\n";
toException(ex, expectedAncestors);
throw ex;
}
}

Expand All @@ -114,9 +143,12 @@ namespace edmtest {
ancestorLabels2.insert(branchIDToLabel[ancestor]);
}
if (ancestorLabels2 != expectedAncestors) {
std::cerr << "TestParentage::analyze: ancestors do not match expected ancestors (parentage from retriever)"
<< std::endl;
abort();
cms::Exception ex("WrongAncestors");
ex << "ancestors from ParentageRetriever\n";
toException(ex, ancestorLabels2);
ex << "\n do not match expected ancestors\n";
toException(ex, expectedAncestors);
throw ex;
}
}
} // namespace edmtest
Expand Down
2 changes: 2 additions & 0 deletions FWCore/Integration/test/BuildFile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@
<test name="TestFWCoreIntegrationTryToContinueESProducer" command="cmsRun ${LOCALTOP}/src/FWCore/Integration/test/test_TryToContinue_ESProducer_cfg.py"/>
<test name="TestFWCoreIntegrationTryToContinueESProducerContinue" command="cmsRun ${LOCALTOP}/src/FWCore/Integration/test/test_TryToContinue_ESProducer_cfg.py --continueAnalyzer"/>

<test name="TestFWCoreIntegrationInconsistentProducts" command="run_inconsistent_products.sh"/>

<test name="TestIntegrationProcessBlock1" command="run_TestProcessBlock.sh 1"/>
<test name="TestIntegrationProcessBlock2" command="run_TestProcessBlock.sh 2"/>
<test name="TestIntegrationProcessBlock3" command="run_TestProcessBlock.sh 3"/>
Expand Down
Loading