Skip to content

Commit

Permalink
NIFI-13324: In case a Python Processor routes a FlowFile to failure a…
Browse files Browse the repository at this point in the history
…nd returns attributes, add those attributes to the 'original' FlowFile before routing to 'failure'
  • Loading branch information
markap14 committed Jun 10, 2024
1 parent a56572d commit 3d27aab
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,18 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
try {
final String relationshipName = result.getRelationship();
final Relationship relationship = new Relationship.Builder().name(relationshipName).build();
final Map<String, String> attributes = result.getAttributes();

if (REL_FAILURE.getName().equals(relationshipName)) {
session.remove(transformed);
if (attributes != null) {
original = session.putAllAttributes(original, attributes);
}

session.transfer(original, REL_FAILURE);
return;
}

final Map<String, String> attributes = result.getAttributes();
if (attributes != null) {
transformed = session.putAllAttributes(transformed, attributes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,22 @@ private void writeResult(final RecordTransformResult result, final Map<RecordGro
final FlowFile destinationFlowFile = session.create(originalFlowFile);

final RecordSetWriter writer;
OutputStream out = null;
try {
final OutputStream out = session.write(destinationFlowFile);
out = session.write(destinationFlowFile);
final Map<String, String> originalAttributes = originalFlowFile.getAttributes();
final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, transformed.getSchema());
writer = writerFactory.createWriter(getLogger(), writeSchema, out, originalAttributes);
writer.beginRecordSet();
} catch (final Exception e) {
// If we failed to create the RecordSetWriter, ensure that we close the Output Stream
if (out != null) {
try {
out.close();
} catch (final IOException ignore) {
}
}

session.remove(destinationFlowFile);
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@
import org.apache.nifi.python.PythonProcessConfig;
import org.apache.nifi.python.PythonProcessorDetails;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
Expand All @@ -52,7 +48,6 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -559,16 +554,18 @@ public void testProcessRestarted() {
runner.assertTransferCount("success", 7);
}

private RecordSchema createSimpleRecordSchema(final List<String> fieldNames) {
final List<RecordField> recordFields = new ArrayList<>();
for (final String fieldName : fieldNames) {
recordFields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType(), true));
}

final RecordSchema schema = new SimpleRecordSchema(recordFields);
return schema;
}
@Test
public void testRouteToFailureWithAttributes() {
final TestRunner runner = createFlowFileTransform("FailWithAttributes");
runner.enqueue("Hello, World");
runner.run();

runner.assertAllFlowFilesTransferred("failure", 1);
final MockFlowFile out = runner.getFlowFilesForRelationship("failure").getFirst();
out.assertAttributeEquals("number", "1");
out.assertAttributeEquals("failureReason", "Intentional failure of unit test");
}

public interface StringLookupService extends ControllerService {
Optional<String> lookup(Map<String, String> coordinates);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult

class FailWithAttributes(FlowFileTransform):
class Java:
implements = ['org.apache.nifi.python.processor.FlowFileTransform']
class ProcessorDetails:
version = '0.0.1-SNAPSHOT'
description = 'Routes a FlowFile to failure and adds attributes to it.'

def __init__(self, **kwargs):
pass

def transform(self, context, flowFile):
return FlowFileTransformResult(relationship="failure", attributes={"number": "1", "failureReason": "Intentional failure of unit test"})

0 comments on commit 3d27aab

Please sign in to comment.