Skip to content

Commit

Permalink
Merge pull request #47117 from Dr15Jones/droppedResolver
Browse files Browse the repository at this point in the history
Created DroppedDataProductResolver
  • Loading branch information
cmsbuild authored Jan 20, 2025
2 parents a696d9d + 7b12dde commit 3885803
Show file tree
Hide file tree
Showing 13 changed files with 389 additions and 31 deletions.
3 changes: 1 addition & 2 deletions FWCore/Framework/interface/Principal.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,7 @@ namespace edm {
virtual void changedIndexes_() {}

//called by adjustIndexesAfterProductRegistryAddition
void addDelayedReaderInputProduct(std::shared_ptr<BranchDescription const> bd);
void addPutOnReadInputProduct(std::shared_ptr<BranchDescription const> bd);
void addDroppedProduct(BranchDescription const& bd);

WrapperBase const* getIt(ProductID const&) const override;
std::optional<std::tuple<WrapperBase const*, unsigned int>> getThinnedProduct(ProductID const&,
Expand Down
29 changes: 29 additions & 0 deletions FWCore/Framework/src/DroppedDataProductResolver.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*----------------------------------------------------------------------
----------------------------------------------------------------------*/
#include "DroppedDataProductResolver.h"
#include "FWCore/Framework/interface/ProductProvenanceRetriever.h"

namespace edm {

DroppedDataProductResolver::Resolution DroppedDataProductResolver::resolveProduct_(
Principal const& principal,
bool skipCurrentProcess,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const {
return Resolution(nullptr);
}
void DroppedDataProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
Principal const& principal,
bool skipCurrentProcess,
ServiceToken const& token,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const noexcept {}

void DroppedDataProductResolver::retrieveAndMerge_(
Principal const& principal, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {}

void DroppedDataProductResolver::setProductProvenanceRetriever_(ProductProvenanceRetriever const* provRetriever) {
m_provenance.setStore(provRetriever);
}

} // namespace edm
56 changes: 56 additions & 0 deletions FWCore/Framework/src/DroppedDataProductResolver.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#ifndef FWCore_Framework_DroppedDataProductResolver_h
#define FWCore_Framework_DroppedDataProductResolver_h

/*----------------------------------------------------------------------
DroppedDataProductResolver: Handles case of a DataProduct which was dropped on output
----------------------------------------------------------------------*/

#include "FWCore/Framework/interface/ProductResolverBase.h"

namespace edm {
class DroppedDataProductResolver : public ProductResolverBase {
public:
DroppedDataProductResolver(std::shared_ptr<BranchDescription const> bd)
: ProductResolverBase(), m_provenance(std::move(bd), {}) {}

void connectTo(ProductResolverBase const&, Principal const*) final {}

private:
Resolution resolveProduct_(Principal const& principal,
bool skipCurrentProcess,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const final;
void prefetchAsync_(WaitingTaskHolder waitTask,
Principal const& principal,
bool skipCurrentProcess,
ServiceToken const& token,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const noexcept final;

void retrieveAndMerge_(Principal const& principal,
MergeableRunProductMetadata const* mergeableRunProductMetadata) const final;
bool productUnavailable_() const final { return true; }
bool productResolved_() const final { return true; }
bool productWasDeleted_() const final { return false; }
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const final { return false; }
bool unscheduledWasNotRun_() const final { return false; }
void resetProductData_(bool deleteEarly) final {}
BranchDescription const& branchDescription_() const final { return m_provenance.branchDescription(); }
void resetBranchDescription_(std::shared_ptr<BranchDescription const> bd) final {
m_provenance.setBranchDescription(bd);
}
Provenance const* provenance_() const final { return &m_provenance; }

std::string const& resolvedModuleLabel_() const final { return moduleLabel(); }
void setProductProvenanceRetriever_(ProductProvenanceRetriever const* provRetriever) final;
void setProductID_(ProductID const& pid) final { m_provenance.setProductID(pid); }
ProductProvenance const* productProvenancePtr_() const final { return m_provenance.productProvenance(); }
bool singleProduct_() const final { return true; }

Provenance m_provenance;
};
} // namespace edm

#endif
10 changes: 9 additions & 1 deletion FWCore/Framework/src/EventPrincipal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,15 @@ namespace edm {

unsigned int EventPrincipal::transitionIndex_() const { return streamID_.value(); }

void EventPrincipal::changedIndexes_() { provRetrieverPtr_->update(productRegistry()); }
void EventPrincipal::changedIndexes_() {
provRetrieverPtr_->update(productRegistry());
//If new Retrievers were added, we need to pass the provenance retriever
for (auto& prod : *this) {
if (prod->singleProduct()) {
prod->setProductProvenanceRetriever(productProvenanceRetrieverPtr());
}
}
}

static void throwProductDeletedException(ProductID const& pid,
edm::EventPrincipal::ConstProductResolverPtr const phb) {
Expand Down
20 changes: 7 additions & 13 deletions FWCore/Framework/src/Principal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include "FWCore/Framework/src/ProductDeletedException.h"
#include "FWCore/Framework/interface/ProductPutterBase.h"
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "ProductResolvers.h"
#include "DroppedDataProductResolver.h"
#include "FWCore/Utilities/interface/EDMException.h"
#include "FWCore/Utilities/interface/ProductResolverIndex.h"
#include "FWCore/Utilities/interface/TypeID.h"
Expand Down Expand Up @@ -166,12 +166,8 @@ namespace edm {
return true;
}

void Principal::addDelayedReaderInputProduct(std::shared_ptr<BranchDescription const> bd) {
addProductOrThrow(std::make_unique<DelayedReaderInputProductResolver>(std::move(bd)));
}

void Principal::addPutOnReadInputProduct(std::shared_ptr<BranchDescription const> bd) {
addProductOrThrow(std::make_unique<PutOnReadInputProductResolver>(std::move(bd)));
void Principal::addDroppedProduct(BranchDescription const& bd) {
addProductOrThrow(std::make_unique<DroppedDataProductResolver>(std::make_shared<BranchDescription const>(bd)));
}

// "Zero" the principal so it can be reused for another Event.
Expand Down Expand Up @@ -357,6 +353,7 @@ namespace edm {

Principal::ConstProductResolverPtr Principal::getProductResolverByIndex(
ProductResolverIndex const& index) const noexcept {
assert(index < productResolvers_.size());
ConstProductResolverPtr const phb = productResolvers_[index].get();
return phb;
}
Expand Down Expand Up @@ -678,12 +675,9 @@ namespace edm {
if (!productResolvers_[index]) {
// no product holder. Must add one. The new entry must be an input product holder.
assert(!bd.produced());
auto cbd = std::make_shared<BranchDescription const>(bd);
if (bd.onDemand()) {
addDelayedReaderInputProduct(cbd);
} else {
addPutOnReadInputProduct(cbd);
}
assert(bd.dropped());
//adding the resolver allows access to the provenance for the data product
addDroppedProduct(bd);
changed = true;
}
}
Expand Down
9 changes: 9 additions & 0 deletions FWCore/Framework/src/ProductResolversFactory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "FWCore/Framework/interface/ProductResolverBase.h"
#include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
#include "ProductResolvers.h"
#include "DroppedDataProductResolver.h"

#include <memory>

Expand All @@ -25,6 +26,10 @@ namespace edm::productResolversFactory {
std::shared_ptr<ProductResolverBase> makeTransformProduct(std::shared_ptr<BranchDescription const> bd) {
return std::make_shared<TransformingProductResolver>(std::move(bd));
}
std::shared_ptr<ProductResolverBase> makeDroppedProduct(std::shared_ptr<BranchDescription const> bd) {
return std::make_shared<DroppedDataProductResolver>(std::move(bd));
}

std::shared_ptr<ProductResolverBase> makeAliasedProduct(
std::shared_ptr<BranchDescription const> bd,
ProductRegistry const& iReg,
Expand Down Expand Up @@ -98,6 +103,10 @@ namespace edm::productResolversFactory {
return makeScheduledProduct(cbd);
}
/* not produced so comes from source */
if (bd.dropped()) {
//this allows access to provenance for the dropped product
return makeDroppedProduct(cbd);
}
if (bd.onDemand()) {
return makeDelayedReaderInputProduct(cbd);
}
Expand Down
24 changes: 17 additions & 7 deletions FWCore/Integration/plugins/OtherThingProducer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,18 @@ namespace edmtest {

private:
OtherThingAlgorithm alg_;
edm::EDGetToken thingToken_;
edm::EDPutToken putToken_;
edm::EDGetTokenT<ThingCollection> thingToken_;
edm::EDPutTokenT<OtherThingCollection> putToken_;
bool useRefs_;
bool refsAreTransient_;
bool thingMissing_;
};

OtherThingProducer::OtherThingProducer(edm::ParameterSet const& pset) : alg_(), refsAreTransient_(false) {
putToken_ = produces<OtherThingCollection>("testUserTag");
useRefs_ = pset.getUntrackedParameter<bool>("useRefs");
if (useRefs_) {
thingMissing_ = pset.getUntrackedParameter<bool>("thingMissing");
if (useRefs_ or thingMissing_) {
thingToken_ = consumes<ThingCollection>(pset.getParameter<edm::InputTag>("thingTag"));
}
refsAreTransient_ = pset.getUntrackedParameter<bool>("transient");
Expand All @@ -51,10 +53,17 @@ namespace edmtest {

// Step C: Get data for algorithm
edm::Handle<ThingCollection> parentHandle;
if (useRefs_) {
bool succeeded = e.getByToken(thingToken_, parentHandle);
assert(succeeded);
assert(parentHandle.isValid());
if (useRefs_ or thingMissing_) {
parentHandle = e.getHandle(thingToken_);
//If not here, throw exception
if (thingMissing_) {
if (parentHandle.isValid()) {
throw cms::Exception("TestFailure")
<< "The ThingCollection is available when it was expected to not be available";
}
} else {
*parentHandle;
}
}

// Step D: Invoke the algorithm, passing in inputs (NONE) and getting back outputs.
Expand All @@ -71,6 +80,7 @@ namespace edmtest {
->setComment("Actually get the ThingCollection and build edm::Refs to the contained items.");
desc.addUntracked<bool>("transient", false)
->setComment("If true, then the Refs constructed by the ThingCollection can not be persisted");
desc.addUntracked<bool>("thingMissing", false)->setComment("If true, expect that thing collection is missing");
descriptions.add("otherThingProd", desc);
}

Expand Down
34 changes: 34 additions & 0 deletions FWCore/Integration/plugins/TestFindProduct.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ namespace edmtest {
void endProcessBlock(edm::ProcessBlock const&) override;
void endJob() override;

static void fillDescriptions(edm::ConfigurationDescriptions& iDesc);

private:
std::vector<edm::InputTag> inputTags_;
int expectedSum_;
Expand Down Expand Up @@ -191,6 +193,38 @@ namespace edmtest {
}
}

void TestFindProduct::fillDescriptions(edm::ConfigurationDescriptions& iDesc) {
iDesc.setComment("Tests state of IntProduct, UInt64Product, and/or View<int> data products in the job.");
edm::ParameterSetDescription ps;

const std::vector<edm::InputTag> emptyTagVector;

ps.addUntracked<std::vector<edm::InputTag>>("inputTags", emptyTagVector)
->setComment("Get these IntProduct data products");
ps.addUntracked<int>("expectedSum", 0)
->setComment("The sum of all values from data products obtained from entire job.");
ps.addUntracked<int>("expectedCache", 0)->setComment("Check value of ProcessBlock caches.");
ps.addUntracked<bool>("getByTokenFirst", false)->setComment("Call getByToken before calling getByLabel");
ps.addUntracked<bool>("runProducerParameterCheck", false);
ps.addUntracked<bool>("testGetterOfProducts", false);
ps.addUntracked<std::vector<edm::InputTag>>("inputTagsNotFound", emptyTagVector)
->setComment("Data products which should be missing from the job.");
ps.addUntracked<std::vector<edm::InputTag>>("inputTagsView", emptyTagVector)
->setComment("Data products to get via View<int>.");
ps.addUntracked<std::vector<edm::InputTag>>("inputTagsUInt64", emptyTagVector)
->setComment("Get these UInt64Product data products.");
ps.addUntracked<std::vector<edm::InputTag>>("inputTagsEndLumi", emptyTagVector);
ps.addUntracked<std::vector<edm::InputTag>>("inputTagsEndRun", emptyTagVector);
ps.addUntracked<std::vector<edm::InputTag>>("inputTagsBeginProcessBlock", emptyTagVector);
ps.addUntracked<std::vector<edm::InputTag>>("inputTagsInputProcessBlock", emptyTagVector);
ps.addUntracked<std::vector<edm::InputTag>>("inputTagsEndProcessBlock", emptyTagVector);
ps.addUntracked<std::vector<edm::InputTag>>("inputTagsEndProcessBlock2", emptyTagVector);
ps.addUntracked<std::vector<edm::InputTag>>("inputTagsEndProcessBlock3", emptyTagVector);
ps.addUntracked<std::vector<edm::InputTag>>("inputTagsEndProcessBlock4", emptyTagVector);

iDesc.addDefault(ps);
}

TestFindProduct::~TestFindProduct() {}

void TestFindProduct::analyze(edm::Event const& event, edm::EventSetup const&) {
Expand Down
48 changes: 40 additions & 8 deletions FWCore/Integration/plugins/TestParentage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ namespace edmtest {

void analyze(edm::StreamID, edm::Event const& e, edm::EventSetup const& es) const override;

static void fillDescriptions(edm::ConfigurationDescriptions& iDesc);

private:
edm::EDGetTokenT<IntProduct> token_;
std::vector<std::string> expectedAncestors_;
Expand All @@ -70,14 +72,31 @@ namespace edmtest {
expectedAncestors_(pset.getParameter<std::vector<std::string> >("expectedAncestors")),
callGetProvenance_(pset.getUntrackedParameter<bool>("callGetProvenance", true)) {}

void TestParentage::fillDescriptions(edm::ConfigurationDescriptions& iDesc) {
edm::ParameterSetDescription ps;
ps.add<edm::InputTag>("inputTag");
ps.add<std::vector<std::string> >("expectedAncestors")
->setComment(
"Module labels for data products directly/indirectly obtained to make data product retrieved using "
"'inputTag'.");
ps.addUntracked<bool>("callGetProvenance", true)
->setComment("Use Event::getProvenance to get ancestor provenance.");

iDesc.addDefault(ps);
}

void TestParentage::analyze(edm::StreamID, edm::Event const& e, edm::EventSetup const&) const {
edm::Handle<IntProduct> h = e.getHandle(token_);

*h;
edm::Provenance const* prov = h.provenance();

if (not prov) {
throw cms::Exception("MissingProvenance") << "Failed to get provenance for 'inputTag'";
}
if (prov->originalBranchID() != prov->branchDescription().originalBranchID()) {
std::cerr << "TestParentage::analyze: test of Provenance::originalBranchID function failed" << std::endl;
abort();
throw cms::Exception("InconsistentBranchID")
<< " test of Provenance::originalBranchID function failed. Expected "
<< prov->branchDescription().originalBranchID() << " but see " << prov->originalBranchID();
}

std::set<std::string> expectedAncestors(expectedAncestors_.begin(), expectedAncestors_.end());
Expand All @@ -91,6 +110,12 @@ namespace edmtest {
// Currently we need to turn off this part of the test of when calling
// from a SubProcess and the parentage includes a product not kept
// in the SubProcess. This might get fixed someday ...
auto toException = [](auto& ex, auto const& ancestors) {
for (auto const& a : ancestors) {
ex << a << ", ";
}
};

if (callGetProvenance_) {
std::set<edm::BranchID> ancestors;
getAncestors(e, prov->branchID(), ancestors);
Expand All @@ -100,8 +125,12 @@ namespace edmtest {
ancestorLabels.insert(branchIDToLabel[ancestor]);
}
if (ancestorLabels != expectedAncestors) {
std::cerr << "TestParentage::analyze: ancestors do not match expected ancestors" << std::endl;
abort();
cms::Exception ex("WrongAncestors");
ex << "ancestors from Event::getProvenance\n";
toException(ex, ancestorLabels);
ex << "\n do not match expected ancestors\n";
toException(ex, expectedAncestors);
throw ex;
}
}

Expand All @@ -114,9 +143,12 @@ namespace edmtest {
ancestorLabels2.insert(branchIDToLabel[ancestor]);
}
if (ancestorLabels2 != expectedAncestors) {
std::cerr << "TestParentage::analyze: ancestors do not match expected ancestors (parentage from retriever)"
<< std::endl;
abort();
cms::Exception ex("WrongAncestors");
ex << "ancestors from ParentageRetriever\n";
toException(ex, ancestorLabels2);
ex << "\n do not match expected ancestors\n";
toException(ex, expectedAncestors);
throw ex;
}
}
} // namespace edmtest
Expand Down
2 changes: 2 additions & 0 deletions FWCore/Integration/test/BuildFile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@
<test name="TestFWCoreIntegrationTryToContinueESProducer" command="cmsRun ${LOCALTOP}/src/FWCore/Integration/test/test_TryToContinue_ESProducer_cfg.py"/>
<test name="TestFWCoreIntegrationTryToContinueESProducerContinue" command="cmsRun ${LOCALTOP}/src/FWCore/Integration/test/test_TryToContinue_ESProducer_cfg.py --continueAnalyzer"/>

<test name="TestFWCoreIntegrationInconsistentProducts" command="run_inconsistent_products.sh"/>

<test name="TestIntegrationProcessBlock1" command="run_TestProcessBlock.sh 1"/>
<test name="TestIntegrationProcessBlock2" command="run_TestProcessBlock.sh 2"/>
<test name="TestIntegrationProcessBlock3" command="run_TestProcessBlock.sh 3"/>
Expand Down
Loading

0 comments on commit 3885803

Please sign in to comment.