Skip to content

Commit

Permalink
NIFI-13800: When inheriting Registry Clients from a cluster's flow, a…
Browse files Browse the repository at this point in the history
…ny missing registry clients are removed at the end instead of the beginning of the synchronization logic. This allows those registry clients to still be referenced while performing synchronization. Also found that if a Processor is missing from cluster's flow but is running in local flow, on startup we get an error indicating that the processor does not belong to the associated process group so fixed that in tandem. Finally, noticed while verifying the fix that we check if the proposed flow is empty with a null check instead of using the isFlowEmpty() method - this could result in inheriting an empty flow from cluster even when a flow is loaded locally.
  • Loading branch information
markap14 committed Sep 24, 2024
1 parent 150cba8 commit 3e239d0
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,13 @@ public void trigger(final ComponentNode component) {
}

try {
// During flow synchronization, when inheriting a flow from cluster, it's possible that the component was removed.
final Connectable existingConnectable = connectable.getProcessGroup().getConnectable(connectable.getIdentifier());
if (existingConnectable == null) {
LOG.debug("Will not start {} because it no longer exists", connectable);
continue;
}

if (connectable instanceof ProcessorNode) {
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ public synchronized void sync(final FlowController controller, final DataFlow pr
final ProcessGroup root = flowManager.getRootGroup();

// handle corner cases involving no proposed flow
if (proposedFlow == null) {
if (isFlowEmpty(proposedFlow)) {
if (root.isEmpty()) {
return; // no sync to perform
} else {
throw new UninheritableFlowException("Proposed configuration is empty, but the controller contains a data flow.");
throw new UninheritableFlowException("Attempted to inherit an empty flow, but this NiFi instance already has a flow loaded.");
}
}

Expand Down Expand Up @@ -455,6 +455,7 @@ private void synchronizeFlow(final FlowController controller, final DataFlow exi
inheritAuthorizations(existingFlow, proposedFlow, controller);

removeMissingParameterContexts(controller, versionedFlow);
removeMissingRegistryClients(controller.getFlowManager(), versionedFlow);
} catch (final Exception ex) {
throw new FlowSynchronizationException(ex);
}
Expand Down Expand Up @@ -539,9 +540,7 @@ private AffectedComponentSet determineAffectedComponents(final FlowComparison fl
private void inheritRegistryClients(final FlowController controller, final VersionedDataflow dataflow, final AffectedComponentSet affectedComponentSet) {
final FlowManager flowManager = controller.getFlowManager();

final Set<String> versionedClientIds = new HashSet<>();
for (final VersionedFlowRegistryClient versionedFlowRegistryClient : dataflow.getRegistries()) {
versionedClientIds.add(versionedFlowRegistryClient.getInstanceIdentifier());
final FlowRegistryClientNode existing = flowManager.getFlowRegistryClient(versionedFlowRegistryClient.getIdentifier());

if (existing == null) {
Expand All @@ -550,6 +549,14 @@ private void inheritRegistryClients(final FlowController controller, final Versi
updateRegistry(existing, versionedFlowRegistryClient, controller);
}
}
}

private void removeMissingRegistryClients(final FlowManager flowManager, final VersionedDataflow dataflow) {
final List<VersionedFlowRegistryClient> versionedRegistryClients = dataflow == null ? null : dataflow.getRegistries();
final Set<String> versionedClientIds = versionedRegistryClients == null ? Set.of() :
versionedRegistryClients.stream()
.map(VersionedFlowRegistryClient::getInstanceIdentifier)
.collect(Collectors.toSet());

for (final FlowRegistryClientNode clientNode : flowManager.getAllFlowRegistryClients()) {
if (!versionedClientIds.contains(clientNode.getIdentifier())) {
Expand Down

0 comments on commit 3e239d0

Please sign in to comment.