
Commit

NIFI-13811: Fix CalculateRecordStats so that it can properly handle arrays of nested records; clarified documentation / code cleanup
markap14 committed Sep 27, 2024
1 parent 0c4d14f commit f4c6b38
Showing 2 changed files with 157 additions and 84 deletions.
CalculateRecordStats.java
@@ -33,31 +33,39 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;

@Tags({ "record", "stats", "metrics" })
@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
"user-defined criteria on subsets of the record set.")
@CapabilityDescription("Counts the number of Records in a record set, optionally counting the number of elements per category, where the categories are " +
"defined by user-defined properties.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@DynamicProperty(name = "Record Path property", value = "The Record Path value",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description = "A Record Path value, pointing to a field to be counted")
@DynamicProperty(
name = "The name of the category. For example, sport",
value = "The RecordPath that points to the value of the category. For example /sport",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description = "Specifies a category that should be counted. For example, if the property name is 'sport' and the value is '/sport', " +
"the processor will count how many records have a value of 'soccer' for the /sport field, how many have a value of 'baseball' for the /sport, " +
"and so on. These counts be added as attributes named recordStats.sport.soccer, recordStats.sport.baseball.")

@WritesAttributes({
@WritesAttribute(attribute = CalculateRecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the FlowFile."),
@@ -93,11 +101,11 @@ public class CalculateRecordStats {

static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("If a flowfile is successfully processed, it goes here.")
.description("All FlowFiles that are successfully processed, are routed to this Relationship.")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("If a flowfile fails to be processed, it goes here.")
.description("If a FlowFile cannot be processed for any reason, it is routed to this Relationship.")
.build();

static final Set<Relationship> RELATIONSHIPS = Set.of(
@@ -112,7 +120,7 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String
.name(propertyDescriptorName)
.displayName(propertyDescriptorName)
.dynamic(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
}
@@ -133,104 +141,106 @@ public Set<Relationship> getRelationships() {

@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile input = session.get();
if (input == null) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}

try {
Map<String, RecordPath> paths = getRecordPaths(context, input);
Map<String, String> stats = getStats(input, paths, context, session);

input = session.putAllAttributes(input, stats);

session.transfer(input, REL_SUCCESS);

} catch (Exception ex) {
getLogger().error("Error processing stats.", ex);
session.transfer(input, REL_FAILURE);
final Map<String, RecordPath> recordPath = getRecordPaths(context, flowFile);
final Map<String, String> stats = calculateStats(flowFile, recordPath, context, session);

flowFile = session.putAllAttributes(flowFile, stats);
session.transfer(flowFile, REL_SUCCESS);
} catch (final Exception ex) {
getLogger().error("Failed to process stats for {}", flowFile, ex);
session.transfer(flowFile, REL_FAILURE);
}

}

protected Map<String, RecordPath> getRecordPaths(ProcessContext context, FlowFile flowFile) {
protected Map<String, RecordPath> getRecordPaths(final ProcessContext context, final FlowFile flowFile) {
return context.getProperties().keySet()
.stream().filter(PropertyDescriptor::isDynamic)
.collect(Collectors.toMap(
e -> e.getName(),
e -> {
String val = context.getProperty(e).evaluateAttributeExpressions(flowFile).getValue();
PropertyDescriptor::getName,
propertyName -> {
final String val = context.getProperty(propertyName).evaluateAttributeExpressions(flowFile).getValue();
return cache.getCompiled(val);
})
);
}

protected Map<String, String> getStats(FlowFile flowFile, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) {
try (InputStream is = session.read(flowFile)) {
RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final Integer limit = context.getProperty(LIMIT).evaluateAttributeExpressions(flowFile).asInteger();
RecordReader reader = factory.createRecordReader(flowFile, is, getLogger());
protected Map<String, String> calculateStats(final FlowFile flowFile, final Map<String, RecordPath> paths, final ProcessContext context, final ProcessSession session)
throws IOException, SchemaNotFoundException, MalformedRecordException {

Map<String, Integer> retVal = new HashMap<>();
try (final InputStream is = session.read(flowFile)) {
final RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final int limit = context.getProperty(LIMIT).evaluateAttributeExpressions(flowFile).asInteger();
final RecordReader reader = factory.createRecordReader(flowFile, is, getLogger());

final Map<String, Integer> stats = new HashMap<>();
Record record;

int recordCount = 0;
List<String> baseKeys = new ArrayList<>();
final Set<String> baseKeys = new LinkedHashSet<>();
while ((record = reader.nextRecord()) != null) {
for (Map.Entry<String, RecordPath> entry : paths.entrySet()) {
RecordPathResult result = entry.getValue().evaluate(record);
Optional<FieldValue> value = result.getSelectedFields().findFirst();
if (value.isPresent() && value.get().getValue() != null) {
String approxValue = value.get().getValue().toString();
String baseKey = String.format("recordStats.%s", entry.getKey());
String key = String.format("%s.%s", baseKey, approxValue);
Integer stat = retVal.getOrDefault(key, 0);
Integer baseStat = retVal.getOrDefault(baseKey, 0);
stat++;
baseStat++;

retVal.put(key, stat);
retVal.put(baseKey, baseStat);

if (!baseKeys.contains(baseKey)) {
baseKeys.add(baseKey);
for (final Map.Entry<String, RecordPath> entry : paths.entrySet()) {
final RecordPathResult result = entry.getValue().evaluate(record);

result.getSelectedFields().forEach(selectedField -> {
final Object selectedValue = selectedField.getValue();
final String approxValue = selectedValue == null ? "<null>" : selectedValue.toString();
final String baseKey = "recordStats." + entry.getKey();
final String key = baseKey + "." + approxValue;
final int stat = stats.getOrDefault(key, 0);
final int baseStat = stats.getOrDefault(baseKey, 0);

stats.put(key, stat + 1);
if (selectedValue != null) {
stats.put(baseKey, baseStat + 1);
}
}

baseKeys.add(baseKey);
});
}

recordCount++;
}

retVal = filterBySize(retVal, limit, baseKeys);

retVal.put(RECORD_COUNT_ATTR, recordCount);
final Map<String, Integer> limited = filterBySize(stats, limit, baseKeys);
limited.put(RECORD_COUNT_ATTR, recordCount);

return retVal.entrySet().stream()
.collect(Collectors.toMap(
e -> e.getKey(),
e -> e.getValue().toString()
));
} catch (Exception e) {
getLogger().error("Could not read flowfile", e);
throw new ProcessException(e);
return limited.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().toString()));
}
}

protected Map filterBySize(Map<String, Integer> values, Integer limit, List<String> baseKeys) {
Map<String, Integer> toFilter = values.entrySet().stream()
protected Map<String, Integer> filterBySize(final Map<String, Integer> values, final int limit, final Collection<String> baseKeys) {
if (values.size() <= limit) {
return values;
}

final Map<String, Integer> toFilter = values.entrySet().stream()
.filter(e -> !baseKeys.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Map<String, Integer> retVal = values.entrySet().stream()
.filter((e -> baseKeys.contains(e.getKey())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

List<Map.Entry<String, Integer>> _flat = new ArrayList<>(toFilter.entrySet());
_flat.sort(Map.Entry.comparingByValue());
Collections.reverse(_flat);
for (int index = 0; index < _flat.size() && index < limit; index++) {
retVal.put(_flat.get(index).getKey(), _flat.get(index).getValue());
List<Map.Entry<String, Integer>> entryList = new ArrayList<>(toFilter.entrySet());
entryList.sort(Map.Entry.comparingByValue());
entryList = entryList.reversed();
final List<Map.Entry<String, Integer>> topEntries = entryList.subList(0, limit);

final Map<String, Integer> limitedValues = new HashMap<>();
// Add any element that is in the baseKeys
values.forEach((k, v) -> {
if (baseKeys.contains(k)) {
limitedValues.put(k, v);
}
});

for (final Map.Entry<String, Integer> entry : topEntries) {
limitedValues.put(entry.getKey(), entry.getValue());
}

return retVal;
return limitedValues;
}
}
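The substance of the fix above is in calculateStats(): the previous implementation took only the first field selected by each RecordPath (result.getSelectedFields().findFirst()), so an array path such as /issues[*]/severity contributed at most one count per record for that path, while the new implementation iterates every selected field. The following standalone sketch (not part of the commit) illustrates that multi-field selection. It assumes the nifi-record-path and nifi-record dependencies are available on the classpath; the class name RecordPathArrayExample is invented for illustration.

import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

import java.util.List;
import java.util.Map;

public class RecordPathArrayExample {
    public static void main(final String[] args) {
        // Schema for a single issue and for a record holding an array of issues,
        // mirroring the structures built in testWithArray below.
        final RecordSchema issueSchema = new SimpleRecordSchema(List.of(
                new RecordField("severity", RecordFieldType.STRING.getDataType())));
        final RecordSchema issuesSchema = new SimpleRecordSchema(List.of(
                new RecordField("issues", RecordFieldType.ARRAY.getArrayDataType(
                        RecordFieldType.RECORD.getRecordDataType(issueSchema)))));

        final Record[] issues = new Record[] {
                new MapRecord(issueSchema, Map.of("severity", "High")),
                new MapRecord(issueSchema, Map.of("severity", "Low")),
                new MapRecord(issueSchema, Map.of("severity", "High"))
        };
        final Record record = new MapRecord(issuesSchema, Map.of("issues", issues));

        final RecordPathResult result = RecordPath.compile("/issues[*]/severity").evaluate(record);

        // The old processor code called result.getSelectedFields().findFirst(), which would
        // see only the first "High" here. Iterating all selected fields, as the fixed
        // calculateStats() does, visits every element of the array: High, Low, High.
        result.getSelectedFields().forEach(field -> System.out.println(field.getValue()));
    }
}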
TestCalculateRecordStats.java
@@ -19,9 +19,10 @@

import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.SimpleRecordSchema;

import org.apache.nifi.serialization.record.ArrayListRecordReader;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
@@ -42,9 +43,9 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;

public class TestCalculateRecordStats {
TestRunner runner;
MockRecordParser recordParser;
RecordSchema personSchema;
private TestRunner runner;
private MockRecordParser recordParser;
private RecordSchema personSchema;

@BeforeEach
void setup() throws InitializationException {
@@ -55,7 +56,6 @@ void setup() throws InitializationException {
runner.enableControllerService(recordParser);
runner.assertValid();

recordParser.addSchemaField("id", RecordFieldType.INT);
List<RecordField> personFields = new ArrayList<>();
RecordField nameField = new RecordField("name", RecordFieldType.STRING.getDataType());
RecordField ageField = new RecordField("age", RecordFieldType.INT.getDataType());
@@ -64,9 +64,72 @@
personFields.add(ageField);
personFields.add(sportField);
personSchema = new SimpleRecordSchema(personFields);

recordParser.addSchemaField("id", RecordFieldType.INT);
recordParser.addSchemaField("person", RecordFieldType.RECORD);
}

@Test
public void testWithArray() throws InitializationException {
// Create a Record that has an array of records
final List<RecordField> issueFields = new ArrayList<>();
issueFields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
issueFields.add(new RecordField("severity", RecordFieldType.STRING.getDataType()));
issueFields.add(new RecordField("description", RecordFieldType.STRING.getDataType()));
final RecordSchema issueSchema = new SimpleRecordSchema(issueFields);

final List<RecordField> issuesFields = new ArrayList<>();
issuesFields.add(new RecordField("issues", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(issueSchema))));
final RecordSchema issuesSchema = new SimpleRecordSchema(issuesFields);

final List<Record> issueList = new ArrayList<>();
issueList.add(new MapRecord(issueSchema, Map.of(
"id", "1",
"severity", "High",
"description", "This is a high severity issue"
)));
issueList.add(new MapRecord(issueSchema, Map.of(
"id", "2",
"severity", "Medium",
"description", "This is a medium severity issue"
)));
issueList.add(new MapRecord(issueSchema, Map.of(
"id", "3",
"severity", "Low",
"description", "This is a low severity issue"
)));
issueList.add(new MapRecord(issueSchema, Map.of(
"id", "",
"severity", "High",
"description", "This is another high severity issue"
)));

final Record[] issues = issueList.toArray(new Record[0]);
final Record issuesRecord = new MapRecord(issuesSchema, Map.of(
"issues", issues
));

// Set RecordReader to one that can properly handle nested records / arrays.
final ArrayListRecordReader readerFactory = new ArrayListRecordReader(issuesSchema);
runner.addControllerService("readerFactory", readerFactory);
runner.enableControllerService(readerFactory);
runner.setProperty(CalculateRecordStats.RECORD_READER, "readerFactory");
readerFactory.addRecord(issuesRecord);

// Set the RecordPath to point to the 'severity' field of the record within the array.
runner.setProperty("severity", "/issues[*]/severity");
runner.enqueue("");
runner.run();

runner.assertTransferCount(CalculateRecordStats.REL_SUCCESS, 1);
final MockFlowFile output = runner.getFlowFilesForRelationship(CalculateRecordStats.REL_SUCCESS).getFirst();
output.assertAttributeEquals("recordStats.severity.High", "2");
output.assertAttributeEquals("recordStats.severity.Medium", "1");
output.assertAttributeEquals("recordStats.severity.Low", "1");
output.assertAttributeEquals("recordStats.severity", "4");
output.assertAttributeEquals("record.count", "1");
}

@Test
void testNoNullOrEmptyRecordFields() {
final List<String> sports = Arrays.asList("Soccer", "Soccer", "Soccer", "Football", "Football", "Basketball");
@@ -120,9 +183,9 @@ void testWithSizeLimit() {
expectedAttributes.put("recordStats.sport", String.valueOf(sports.size()));
expectedAttributes.put("record.count", String.valueOf(sports.size()));

final Map<String, String> propz = Collections.singletonMap("sport", "/person/sport");
final Map<String, String> counts = Collections.singletonMap("sport", "/person/sport");

commonTest(propz, sports, expectedAttributes);
commonTest(counts, sports, expectedAttributes);
}

private void commonTest(Map<String, String> procProperties, List<String> sports, Map<String, String> expectedAttributes) {
@@ -145,12 +208,12 @@ private void commonTest(Map<String, String> procProperties, List<String> sports,
runner.assertTransferCount(CalculateRecordStats.REL_SUCCESS, 1);

final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(CalculateRecordStats.REL_SUCCESS);
final MockFlowFile ff = flowFiles.get(0);
final MockFlowFile ff = flowFiles.getFirst();
for (final Map.Entry<String, String> expectedAttribute : expectedAttributes.entrySet()) {
final String key = expectedAttribute.getKey();
final String value = expectedAttribute.getValue();
assertNotNull(ff.getAttribute(key), String.format("Missing %s", key));
assertEquals(value, ff.getAttribute(key));
assertEquals(value, ff.getAttribute(key), "Expected " + value + " for " + key);
}
}
}
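As a usage note, going beyond what the truncated assertions above show: with a single dynamic property named 'sport' whose value is the RecordPath /person/sport, and the six records from testNoNullOrEmptyRecordFields (three Soccer, two Football, one Basketball), the fixed processor would be expected to write attributes of the form recordStats.<category>[.<value>], for example:

recordStats.sport.Soccer = 3
recordStats.sport.Football = 2
recordStats.sport.Basketball = 1
recordStats.sport = 6
record.count = 6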
