Skip to content

Commit

Permalink
NIFI-13929: Removed Provenance Repository from the stateless Reposito…
Browse files Browse the repository at this point in the history
…ryContextFactory and added it to the DataflowTriggerContext. This was necessary because the previous design overlooked the possibility of many threads concurrently running the same dataflow. They all shared the same StatelessProvenanceRepository, but the code was designed as if only a single thread would be using the repository. As a result, the events that were registered with the stateless prov repo were being copied many times into NiFi's underlying provenance repository. This refactoring also led to the discovery of some old Java 8 syntax that could be cleaned up, and it led to the discovery of some methods that were no longer being used and could be cleaned up. Finally, in testing, I found that when a Stateless Group was scheduled, it scheduled the triggering of the stateless group before marking the state as RUNNING; as a result, the second thread could run, determine that the state is STARTING instead of RUNNING, and return without triggering the stateless group. This was addressed by ensuring that we set the state to RUNNING before triggering the stateless group to be triggered.
  • Loading branch information
markap14 committed Oct 24, 2024
1 parent 3521905 commit 7dd18a6
Show file tree
Hide file tree
Showing 19 changed files with 464 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,23 +217,15 @@ public boolean isRelationshipAvailabilitySatisfied(final int requiredNumber) {
}

protected String getProvenanceComponentDescription() {
switch (connectable.getConnectableType()) {
case PROCESSOR:
final ProcessorNode procNode = (ProcessorNode) connectable;
return procNode.getComponentType();
case INPUT_PORT:
return "Input Port";
case OUTPUT_PORT:
return "Output Port";
case REMOTE_INPUT_PORT:
return ProvenanceEventRecord.REMOTE_INPUT_PORT_TYPE;
case REMOTE_OUTPUT_PORT:
return ProvenanceEventRecord.REMOTE_OUTPUT_PORT_TYPE;
case FUNNEL:
return "Funnel";
default:
throw new AssertionError("Connectable type is " + connectable.getConnectableType());
}
return switch (connectable.getConnectableType()) {
case PROCESSOR -> ((ProcessorNode) connectable).getComponentType();
case INPUT_PORT -> "Input Port";
case OUTPUT_PORT -> "Output Port";
case REMOTE_INPUT_PORT -> ProvenanceEventRecord.REMOTE_INPUT_PORT_TYPE;
case REMOTE_OUTPUT_PORT -> ProvenanceEventRecord.REMOTE_OUTPUT_PORT_TYPE;
case FUNNEL -> "Funnel";
default -> throw new AssertionError("Connectable type is " + connectable.getConnectableType());
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.parameter.ParameterContextManager;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.registry.flow.mapping.ComponentIdLookup;
import org.apache.nifi.registry.flow.mapping.FlowMappingOptions;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
Expand Down Expand Up @@ -239,7 +238,7 @@ public Future<Set<Bundle>> fetch(final Set<BundleCoordinate> bundleCoordinates,
.extensionRepository(extensionRepository)
.flowFileEventRepository(flowFileEventRepository)
.processScheduler(statelessScheduler)
.provenanceRepository((ProvenanceRepository) statelessRepositoryContextFactory.getProvenanceRepository())
.provenanceRepository(flowController.getProvenanceRepository())
.stateManagerProvider(stateManagerProvider)
.kerberosConfiguration(kerberosConfig)
.statusTaskInterval(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.nifi.stateless.flow.FlowFileSupplier;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.apache.nifi.stateless.repository.StatelessProvenanceRepository;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -83,7 +84,6 @@ public class StatelessFlowTask {
private final boolean allowBatch;

// State that is updated during invocation - these variables are guarded by synchronized block
private Long maxProvenanceEventId;
private List<FlowFileCloneResult> cloneResults;
private List<RepositoryRecord> outputRepositoryRecords;
private List<ProvenanceEventRecord> cloneProvenanceEvents;
Expand Down Expand Up @@ -167,8 +167,7 @@ public synchronized void trigger() {
final List<Invocation> allInvocations = new ArrayList<>();
final List<Invocation> successfulInvocations = new ArrayList<>();

final ProvenanceEventRepository statelessProvRepo = flow.getProvenanceRepository();
maxProvenanceEventId = statelessProvRepo.getMaxEventId();
final ProvenanceEventRepository statelessProvRepo = new StatelessProvenanceRepository(10_000);

try {
int invocationCount = 0;
Expand All @@ -177,7 +176,7 @@ public synchronized void trigger() {

final Invocation invocation = new Invocation();
final FlowFileSupplier flowFileSupplier = new BridgingFlowFileSupplier(invocation);
final DataflowTriggerContext triggerContext = new StatelessFlowTaskTriggerContext(flowFileSupplier);
final DataflowTriggerContext triggerContext = new StatelessFlowTaskTriggerContext(flowFileSupplier, statelessProvRepo);

final TriggerResult triggerResult = triggerFlow(triggerContext);
invocation.setTriggerResult(triggerResult);
Expand All @@ -196,19 +195,19 @@ public synchronized void trigger() {
}
} else {
logger.debug("Failed to trigger", triggerResult.getFailureCause().orElse(null));
fail(invocation);
fail(invocation, statelessProvRepo);
break;
}
}

logger.debug("Finished triggering");
} finally {
try {
completeInvocations(successfulInvocations);
completeInvocations(successfulInvocations, statelessProvRepo);
} catch (final Exception e) {
logger.error("Failed to complete Stateless Flow", e);
statelessGroupNode.yield();
fail(successfulInvocations, e);
fail(successfulInvocations, statelessProvRepo, e);
}

logger.debug("Acknowledging FlowFiles from {} invocations", allInvocations.size());
Expand All @@ -221,26 +220,26 @@ public synchronized void trigger() {
}


private void fail(final List<Invocation> invocations, final Throwable cause) {
invocations.forEach(invocation -> fail(invocation, cause));
private void fail(final List<Invocation> invocations, final ProvenanceEventRepository statelessProvRepo, final Throwable cause) {
invocations.forEach(invocation -> fail(invocation, statelessProvRepo, cause));
}

private void fail(final Invocation invocation) {
private void fail(final Invocation invocation, final ProvenanceEventRepository statelessProvRepo) {
final Throwable cause;
if (invocation.getTriggerResult().isCanceled()) {
cause = new TerminatedTaskException();
} else {
cause = invocation.getTriggerResult().getFailureCause().orElse(null);
}

fail(invocation, cause);
fail(invocation, statelessProvRepo, cause);
}

private void fail(final Invocation invocation, final Throwable cause) {
private void fail(final Invocation invocation, final ProvenanceEventRepository statelessProvRepo, final Throwable cause) {
final Port destinationPort = getDestinationPort(cause);

try {
failInvocation(invocation, destinationPort, cause);
failInvocation(invocation, statelessProvRepo, destinationPort, cause);
} catch (final Exception e) {
if (cause != null) {
cause.addSuppressed(e);
Expand All @@ -251,11 +250,10 @@ private void fail(final Invocation invocation, final Throwable cause) {
}

private Port getDestinationPort(final Throwable failureCause) {
if (!(failureCause instanceof FailurePortEncounteredException)) {
if (!(failureCause instanceof final FailurePortEncounteredException fpee)) {
return null;
}

final FailurePortEncounteredException fpee = (FailurePortEncounteredException) failureCause;
final Port port = this.outputPorts.get(fpee.getPortName());
if (port == null) {
logger.error("FlowFile was routed to Failure Port {} but no such port exists in the dataflow", fpee.getPortName());
Expand Down Expand Up @@ -283,7 +281,7 @@ private TriggerResult triggerFlow(final DataflowTriggerContext triggerContext) {
}


private void completeInvocations(final List<Invocation> invocations) throws IOException {
private void completeInvocations(final List<Invocation> invocations, final ProvenanceEventRepository statelessProvRepo) throws IOException {
logger.debug("Completing transactions from {} invocations", invocations.size());
if (invocations.isEmpty()) {
return;
Expand Down Expand Up @@ -318,7 +316,7 @@ private void completeInvocations(final List<Invocation> invocations) throws IOEx
throw new IOException("Failed to update FlowFile Repository after triggering " + this, e);
}

updateProvenanceRepository(event -> true);
updateProvenanceRepository(statelessProvRepo, event -> true);

// Acknowledge the invocations so that the sessions can be committed
for (final Invocation invocation : invocations) {
Expand All @@ -337,7 +335,7 @@ void resetState() {
cloneProvenanceEvents = new ArrayList<>();
}

private void failInvocation(final Invocation invocation, final Port destinationPort, final Throwable cause) throws IOException {
private void failInvocation(final Invocation invocation, final ProvenanceEventRepository statelessProvRepo, final Port destinationPort, final Throwable cause) throws IOException {
final List<PolledFlowFile> inputFlowFiles = invocation.getPolledFlowFiles();

boolean stopped = false;
Expand Down Expand Up @@ -401,7 +399,7 @@ private void failInvocation(final Invocation invocation, final Port destinationP
throw new IOException("Failed to update FlowFile Repository after triggering " + this, e);
}

updateProvenanceRepository(event -> eventTypesToKeepOnFailure.contains(event.getEventType()));
updateProvenanceRepository(statelessProvRepo, event -> eventTypesToKeepOnFailure.contains(event.getEventType()));

// Acknowledge the invocations so that the sessions can be committed
abort(invocation, cause);
Expand Down Expand Up @@ -474,17 +472,16 @@ void createOutputRecords(final Map<String, List<FlowFile>> outputFlowFiles) {
}
}

void updateProvenanceRepository(final Predicate<ProvenanceEventRecord> eventFilter) {
long firstProvEventId = (maxProvenanceEventId == null) ? 0 : (maxProvenanceEventId + 1);
final ProvenanceEventRepository statelessProvRepo = flow.getProvenanceRepository();
void updateProvenanceRepository(final ProvenanceEventRepository statelessRepo, final Predicate<ProvenanceEventRecord> eventFilter) {
long firstProvEventId = 0;

if (!cloneProvenanceEvents.isEmpty()) {
nifiProvenanceEventRepository.registerEvents(cloneProvenanceEvents);
}

while (true) {
try {
final List<ProvenanceEventRecord> statelessProvEvents = statelessProvRepo.getEvents(firstProvEventId, 1000);
final List<ProvenanceEventRecord> statelessProvEvents = statelessRepo.getEvents(firstProvEventId, 1000);
if (statelessProvEvents.isEmpty()) {
return;
}
Expand All @@ -494,7 +491,7 @@ void updateProvenanceRepository(final Predicate<ProvenanceEventRecord> eventFilt
// copy the Event ID.
final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>();
for (final ProvenanceEventRecord eventRecord : statelessProvEvents) {
if (eventFilter.test(eventRecord) == false) {
if (!eventFilter.test(eventRecord)) {
continue;
}

Expand Down Expand Up @@ -731,7 +728,7 @@ public void setTriggerResult(final TriggerResult triggerResult) {

public List<PolledFlowFile> getPolledFlowFiles() {
if (polledFlowFiles == null) {
return Collections.emptyList();
return List.of();
}

return polledFlowFiles;
Expand Down Expand Up @@ -880,9 +877,11 @@ public Optional<FlowFile> getFlowFile(final String portName) {

private class StatelessFlowTaskTriggerContext implements DataflowTriggerContext {
private final FlowFileSupplier flowFileSupplier;
private final ProvenanceEventRepository statelessProvRepo;

public StatelessFlowTaskTriggerContext(final FlowFileSupplier flowFileSupplier) {
public StatelessFlowTaskTriggerContext(final FlowFileSupplier flowFileSupplier, final ProvenanceEventRepository statelessProvRepo) {
this.flowFileSupplier = flowFileSupplier;
this.statelessProvRepo = statelessProvRepo;
}

@Override
Expand All @@ -895,5 +894,9 @@ public FlowFileSupplier getFlowFileSupplier() {
return flowFileSupplier;
}

@Override
public ProvenanceEventRepository getProvenanceEventRepository() {
return statelessProvRepo;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,9 @@ private void initialize(final ScheduledExecutorService executor, final Schedulin
writeLock.lock();
try {
if (desiredState == ScheduledState.RUNNING) {
schedulingAgentCallback.trigger();
logger.info("{} has been started", this);
currentState = ScheduledState.RUNNING;
schedulingAgentCallback.trigger();
} else {
logger.info("{} completed setup but is no longer scheduled to run; desired state is now {}; will shutdown", this, desiredState);
shutdown = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class TestStatelessFlowTask {
private List<ProvenanceEventRecord> registeredProvenanceEvents;
private List<ProvenanceEventRecord> statelessProvenanceEvents;
private Map<String, StandardFlowFileEvent> flowFileEventsByComponentId;
private ProvenanceEventRepository statelessProvRepo;

@BeforeEach
public void setup() throws IOException {
Expand All @@ -110,7 +111,7 @@ public void setup() throws IOException {
rootGroup.addOutputPort(secondOutputPort);

statelessProvenanceEvents = new ArrayList<>();
final ProvenanceEventRepository statelessProvRepo = mock(ProvenanceEventRepository.class);
statelessProvRepo = mock(ProvenanceEventRepository.class);
doAnswer(invocation -> statelessProvenanceEvents.size() - 1).when(statelessProvRepo).getMaxEventId();
doAnswer(invocation -> {
final long startEventId = invocation.getArgument(0, Long.class);
Expand All @@ -123,7 +124,6 @@ public void setup() throws IOException {
}).when(statelessProvRepo).getEvents(anyLong(), anyInt());

final StatelessDataflow statelessFlow = mock(StatelessDataflow.class);
when(statelessFlow.getProvenanceRepository()).thenReturn(statelessProvRepo);

final StatelessGroupNode statelessGroupNode = mock(StatelessGroupNode.class);
when(statelessGroupNode.getProcessGroup()).thenReturn(rootGroup);
Expand Down Expand Up @@ -423,7 +423,7 @@ public void testProvenanceEventsCopiedFromStatelessFlow() {
statelessProvenanceEvents.add(event);
}

task.updateProvenanceRepository(event -> true);
task.updateProvenanceRepository(statelessProvRepo, event -> true);

assertEquals(statelessProvenanceEvents, registeredProvenanceEvents);
for (final ProvenanceEventRecord eventRecord : registeredProvenanceEvents) {
Expand All @@ -446,7 +446,7 @@ public void testMoreThan1000ProvenanceEventsCopied() {
statelessProvenanceEvents.add(event);
}

task.updateProvenanceRepository(event -> true);
task.updateProvenanceRepository(statelessProvRepo, event -> true);

assertEquals(statelessProvenanceEvents, registeredProvenanceEvents);
for (final ProvenanceEventRecord eventRecord : registeredProvenanceEvents) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.nifi.stateless.flow;

import org.apache.nifi.provenance.ProvenanceEventRepository;

public interface DataflowTriggerContext {

/**
* Provides a mechanism by which the triggering class can abort a dataflow
* @return <code>true</code> if the dataflow should be aborted, <code>false</code> otherwise
Expand All @@ -28,13 +31,22 @@ default FlowFileSupplier getFlowFileSupplier() {
return null;
}

ProvenanceEventRepository getProvenanceEventRepository();

/**
* The implicit context that will be used if no other context is provided when triggering a dataflow
*/
DataflowTriggerContext IMPLICIT_CONTEXT = new DataflowTriggerContext() {
private final ProvenanceEventRepository eventRepo = new NopProvenanceEventRepository();

@Override
public boolean isAbort() {
return false;
}

@Override
public ProvenanceEventRepository getProvenanceEventRepository() {
return eventRepo;
}
};
}
Loading

0 comments on commit 7dd18a6

Please sign in to comment.