From 989a29afbe494696855922624a44ab7801ce2543 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 9 Feb 2024 16:58:09 -0500 Subject: [PATCH] NIFI-12773: Added join and anchored RecordPath function --- .../nifi/record/path/functions/Anchored.java | 80 ++++++++++++++++++ .../nifi/record/path/functions/Join.java | 84 +++++++++++++++++++ .../record/path/paths/RecordPathCompiler.java | 24 ++++++ .../nifi/record/path/TestRecordPath.java | 83 ++++++++++++++++++ .../src/main/asciidoc/record-path-guide.adoc | 42 ++++++++++ 5 files changed, 313 insertions(+) create mode 100644 nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Anchored.java create mode 100644 nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Join.java diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Anchored.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Anchored.java new file mode 100644 index 0000000000000..12cc9739f0e26 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Anchored.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.functions; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardRecordPathEvaluationContext; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.serialization.record.Record; + +import java.util.Arrays; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +public class Anchored extends RecordPathSegment { + + private final RecordPathSegment anchorPath; + private final RecordPathSegment evaluationPath; + + public Anchored(final RecordPathSegment anchorPath, final RecordPathSegment evaluationPath, final boolean absolute) { + super("anchored", null, absolute); + + this.anchorPath = anchorPath; + this.evaluationPath = evaluationPath; + } + + + @Override + public Stream evaluate(final RecordPathEvaluationContext context) { + final Stream anchoredStream = anchorPath.evaluate(context); + + return anchoredStream.flatMap(fv -> { + final Object value = fv.getValue(); + return evaluate(value); + }); + } + + private Stream evaluate(final Object value) { + if (value == null) { + return Stream.of(); + } + if (value instanceof Record) { + final RecordPathEvaluationContext recordPathEvaluateContext = new StandardRecordPathEvaluationContext((Record) value); + return evaluationPath.evaluate(recordPathEvaluateContext); + } + if (value instanceof final Record[] array) { + return Arrays.stream(array).flatMap(element -> { + final RecordPathEvaluationContext recordPathEvaluateContext = new StandardRecordPathEvaluationContext(element); + return evaluationPath.evaluate(recordPathEvaluateContext); + }); + } + if (value instanceof final Iterable iterable) { + return StreamSupport.stream(iterable.spliterator(), false).flatMap(element -> { + if (!(element instanceof Record)) { + return Stream.of(); + } + + final RecordPathEvaluationContext recordPathEvaluateContext = new StandardRecordPathEvaluationContext((Record) element); + return evaluationPath.evaluate(recordPathEvaluateContext); + }); + } + + return Stream.of(); + } +} diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Join.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Join.java new file mode 100644 index 0000000000000..da9cee488fac4 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Join.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.functions; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.record.path.util.RecordPathUtils; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Stream; + +public class Join extends RecordPathSegment { + private final RecordPathSegment delimiterPath; + private final RecordPathSegment[] valuePaths; + + public Join(final RecordPathSegment delimiterPath, final RecordPathSegment[] valuePaths, final boolean absolute) { + super("join", null, absolute); + this.delimiterPath = delimiterPath; + this.valuePaths = valuePaths; + } + + @Override + public Stream evaluate(final RecordPathEvaluationContext context) { + String delimiter = RecordPathUtils.getFirstStringValue(delimiterPath, context); + if (delimiter == null) { + delimiter = ""; + } + + final List values = new ArrayList<>(); + for (final RecordPathSegment valuePath : valuePaths) { + final Stream stream = valuePath.evaluate(context); + + stream.forEach(fv -> { + final Object value = fv.getValue(); + addStringValue(value, values); + }); + } + + final String joined = String.join(delimiter, values); + final RecordField field = new RecordField("join", RecordFieldType.STRING.getDataType()); + final FieldValue responseValue = new StandardFieldValue(joined, field, null); + return Stream.of(responseValue); + } + + private void addStringValue(final Object value, final List values) { + if (value == null) { + values.add("null"); + return; + } + + if (value instanceof final Object[] array) { + for (final Object element : array) { + addStringValue(element, values); + } + } else if (value instanceof final Iterable iterable) { + for (final Object element : iterable) { + addStringValue(element, values); + } + } else { + values.add(DataTypeUtils.toString(value, null)); + } + } +} diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java index c6f23b145ba10..c86bc2f0f527f 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java @@ -35,6 +35,7 @@ import org.apache.nifi.record.path.filter.NotFilter; import org.apache.nifi.record.path.filter.RecordPathFilter; import org.apache.nifi.record.path.filter.StartsWith; +import org.apache.nifi.record.path.functions.Anchored; import org.apache.nifi.record.path.functions.Base64Decode; import org.apache.nifi.record.path.functions.Base64Encode; import org.apache.nifi.record.path.functions.Coalesce; @@ -45,6 +46,7 @@ import org.apache.nifi.record.path.functions.FilterFunction; import org.apache.nifi.record.path.functions.Format; import org.apache.nifi.record.path.functions.Hash; +import org.apache.nifi.record.path.functions.Join; import org.apache.nifi.record.path.functions.MapOf; import org.apache.nifi.record.path.functions.PadLeft; import org.apache.nifi.record.path.functions.PadRight; @@ -129,6 +131,10 @@ public static RecordPathSegment buildPath(final Tree tree, final RecordPathSegme } case CHILD_REFERENCE: { final Tree childTree = tree.getChild(0); + if (childTree == null) { + return new RootPath(); + } + final int childTreeType = childTree.getType(); if (childTreeType == FIELD_NAME) { final String childName = childTree.getChild(0).getText(); @@ -404,6 +410,24 @@ public static RecordPathSegment buildPath(final Tree tree, final RecordPathSegme final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute); return new Count(args[0], absolute); } + case "join": { + final int numArgs = argumentListTree.getChildCount(); + if (numArgs < 2) { + throw new RecordPathException("Invalid number of arguments: " + functionName + " function takes 2 or more arguments but got " + numArgs); + } + + final RecordPathSegment[] joinPaths = new RecordPathSegment[numArgs - 1]; + for (int i = 0; i < numArgs - 1; i++) { + joinPaths[i] = buildPath(argumentListTree.getChild(i+ 1), null, absolute); + } + + final RecordPathSegment delimiterPath = buildPath(argumentListTree.getChild(0), null, absolute); + return new Join(delimiterPath, joinPaths, absolute); + } + case "anchored": { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute); + return new Anchored(args[0], args[1], absolute); + } case "not": case "contains": case "containsRegex": diff --git a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java index 2b6341124c399..f374325528cab 100644 --- a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java +++ b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java @@ -1241,6 +1241,89 @@ public void testConcat() { assertEquals("John Doe: 48", RecordPath.compile("concat(/firstName, ' ', /lastName, ': ', 48)").evaluate(record).getSelectedFields().findFirst().get().getValue()); } + @Test + public void testJoinWithTwoFields() { + final List fields = new ArrayList<>(); + fields.add(new RecordField("fullName", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("firstName", RecordFieldType.LONG.getDataType())); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map values = new HashMap<>(); + values.put("lastName", "Doe"); + values.put("firstName", "John"); + final Record record = new MapRecord(schema, values); + + assertEquals("Doe, John", RecordPath.compile("join(', ', /lastName, /firstName)").evaluate(record).getSelectedFields().findFirst().get().getValue()); + } + + @Test + public void testJoinWithArray() { + final List fields = new ArrayList<>(); + fields.add(new RecordField("names", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()))); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map values = new HashMap<>(); + values.put("names", new String[] {"John", "Jane", "Jacob", "Judy"}); + final Record record = new MapRecord(schema, values); + + assertEquals("John,Jane,Jacob,Judy", RecordPath.compile("join(',', /names)").evaluate(record).getSelectedFields().findFirst().get().getValue()); + } + + @Test + public void testJoinWithArrayAndMultipleFields() { + final List personFields = new ArrayList<>(); + personFields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType())); + personFields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType())); + personFields.add(new RecordField("friends", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()))); + final RecordSchema personSchema = new SimpleRecordSchema(personFields); + + final Map values = new HashMap<>(); + values.put("friends", new String[] {"John", "Jane", "Jacob", "Judy"}); + values.put("firstName", "John"); + values.put("lastName", "Doe"); + final Record record = new MapRecord(personSchema, values); + + assertEquals("Doe\nJohn\nJane\nJacob", RecordPath.compile("join('\\n', /lastName, /firstName, /friends[1..2])").evaluate(record).getSelectedFields().findFirst().get().getValue()); + } + + @Test + public void testAnchored() { + final List personFields = new ArrayList<>(); + personFields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType())); + personFields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType())); + final RecordSchema personSchema = new SimpleRecordSchema(personFields); + + final List employeeFields = new ArrayList<>(); + employeeFields.add(new RecordField("self", RecordFieldType.RECORD.getRecordDataType(personSchema))); + employeeFields.add(new RecordField("manager", RecordFieldType.RECORD.getRecordDataType(personSchema))); + employeeFields.add(new RecordField("directReports", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(personSchema)))); + final RecordSchema employeeSchema = new SimpleRecordSchema(employeeFields); + + final Record directReport1 = createPerson("John", "Doe", personSchema); + final Record directReport2 = createPerson("John", "Jingleheimer", personSchema); + final Record directReport3 = createPerson("John", "Jacob", personSchema); + final Record manager = createPerson("Jane", "Smith", personSchema); + final Record employee = new MapRecord(employeeSchema, Map.of( + "self", createPerson("John", "Schmidt", personSchema), + "manager", manager, + "directReports", new Record[] {directReport1, directReport2, directReport3} + )); + + assertEquals("John", RecordPath.compile("anchored(/directReports[0], /firstName)").evaluate(employee).getSelectedFields().findFirst().get().getValue()); + assertEquals(List.of("John", "John", "John"), RecordPath.compile("anchored(/directReports, /firstName)").evaluate(employee).getSelectedFields().map(FieldValue::getValue).toList()); + assertEquals(List.of(), RecordPath.compile("anchored(/self/lastName, / )").evaluate(employee).getSelectedFields().map(FieldValue::getValue).toList()); + } + + private Record createPerson(final String firstName, final String lastName, final RecordSchema schema) { + final Map values = Map.of( + "firstName", firstName, + "lastName", lastName); + return new MapRecord(schema, values); + } + + @Test public void testMapOf() { final List fields = new ArrayList<>(); diff --git a/nifi-docs/src/main/asciidoc/record-path-guide.adoc b/nifi-docs/src/main/asciidoc/record-path-guide.adoc index a4d0323c4cb30..fae0538d646f8 100644 --- a/nifi-docs/src/main/asciidoc/record-path-guide.adoc +++ b/nifi-docs/src/main/asciidoc/record-path-guide.adoc @@ -456,6 +456,48 @@ Concatenates all the arguments together. |========================================================== +=== join + +Joins together multiple values with a separator. + +|========================================================== +| RecordPath | Return value +| `join(', ', /workAddress/* )` | 123, 5th Avenue, New York, NY, 10020 +|========================================================== + + +=== anchored + +Allows evaluating a RecordPath while anchoring the root context to a child record. + +|========================================================== +| RecordPath | Return value +| `anchored(/homeAddress, /city)` | Jersey City +|========================================================== + +Additionally, this can be used in conjunction with arrays. For example, if we have the following record: +---- +{ + "id": "1234", + "elements": [{ + "name": "book", + "color": "red" + }, { + "name": "computer", + "color": "black" + }] +} +---- + +We can evaluate hte following Record paths: + +|========================================================== +| RecordPath | Return value +| `anchored(/elements, /name)` | The array containing `book` and `computer` +| `anchored(/elements, concat(/name, ': ', /color))` | The array containing `book: red` and `computer: black` +|========================================================== + + === fieldName Normally, when a path is given to a particular field in a Record, what is returned is the value of that field. It