Commit b6bc6bf

NIFI-14084 Replaced anonymous classes with lambdas part 2 (#9592)
Signed-off-by: Lucas Ottersbach <[email protected]>
dan-s1 authored Dec 25, 2024
1 parent 8c2ceac commit b6bc6bf
Showing 66 changed files with 1,199 additions and 1,674 deletions.
@@ -75,12 +75,7 @@ public Thread newThread(final Runnable r) {
             }
         });
 
-        taskExecutor.scheduleWithFixedDelay(new Runnable() {
-            @Override
-            public void run() {
-                peerSelector.refresh();
-            }
-        }, 0, 5, TimeUnit.SECONDS);
+        taskExecutor.scheduleWithFixedDelay(peerSelector::refresh, 0, 5, TimeUnit.SECONDS);
 
     }
 

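The change above collapses a Runnable whose run() body is a single no-argument call into a method reference. A minimal standalone sketch of the same idiom (not NiFi code; the class name and refresh method are invented for illustration):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    class RefreshExample {
        private static void refresh() {
            System.out.println("refreshed");
        }

        public static void main(String[] args) throws InterruptedException {
            ScheduledExecutorService taskExecutor = Executors.newSingleThreadScheduledExecutor();
            // Same shape as the diff: the method reference stands in for
            // new Runnable() { public void run() { refresh(); } }
            taskExecutor.scheduleWithFixedDelay(RefreshExample::refresh, 0, 5, TimeUnit.SECONDS);
            Thread.sleep(100);    // let the initial run fire
            taskExecutor.shutdown();
        }
    }
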
@@ -332,19 +332,16 @@ public void testUpdatePerformance() throws IOException, InterruptedException {
 
         for (int j = 0; j < 2; j++) {
             for (int i = 0; i < numThreads; i++) {
-                final Thread t = new Thread(new Runnable() {
-                    @Override
-                    public void run() {
-                        final List<DummyRecord> batch = new ArrayList<>();
-                        for (int i = 0; i < updateCountPerThread / batchSize; i++) {
-                            batch.clear();
-                            for (int j = 0; j < batchSize; j++) {
-                                final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
-                                batch.add(record);
-                            }
-
-                            assertThrows(Throwable.class, () -> repo.update(batch, false));
-                        }
-                    }
-                });
+                final Thread t = new Thread(() -> {
+                    final List<DummyRecord> batch = new ArrayList<>();
+                    for (int i1 = 0; i1 < updateCountPerThread / batchSize; i1++) {
+                        batch.clear();
+                        for (int j1 = 0; j1 < batchSize; j1++) {
+                            final DummyRecord record = new DummyRecord(String.valueOf(i1), UpdateType.CREATE);
+                            batch.add(record);
+                        }
+
+                        assertThrows(Throwable.class, () -> repo.update(batch, false));
+                    }
+                });
 
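The rename of the inner loop variables (i to i1, j to j1) is forced by the conversion: an anonymous class body opens a new scope and may shadow locals of the enclosing method, but a lambda body shares the enclosing scope, so redeclaring i or j there is a compile error. A minimal sketch of the difference (not NiFi code):

    class ShadowExample {
        public static void main(String[] args) {
            for (int i = 0; i < 2; i++) {
                Runnable anon = new Runnable() {
                    @Override
                    public void run() {
                        for (int i = 0; i < 3; i++) { }   // legal: the class body opens a new scope that shadows the outer i
                    }
                };
                Runnable lambda = () -> {
                    // for (int i = 0; i < 3; i++) { }    // would not compile: variable i is already defined
                    for (int i1 = 0; i1 < 3; i1++) { }    // hence the i1/j1 renames in this commit
                };
                anon.run();
                lambda.run();
            }
        }
    }
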
@@ -148,7 +148,7 @@ public void close() throws IOException, TimeoutException {
     }
 
     @Override
-    public void close(int closeCode, String closeMessage) throws IOException, TimeoutException {
+    public void close(int closeCode, String closeMessage) {
         throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
 
     }
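Dropping the throws clause here is safe because an overriding method may declare fewer checked exceptions than the method it overrides; callers that already handle IOException and TimeoutException still compile. A minimal sketch (not NiFi or RabbitMQ code; Closer is an invented interface):

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    interface Closer {
        void close(int closeCode, String closeMessage) throws IOException, TimeoutException;
    }

    class StubCloser implements Closer {
        @Override
        public void close(int closeCode, String closeMessage) {            // narrowed: no checked exceptions
            throw new UnsupportedOperationException("not supported");      // unchecked, so no throws needed
        }
    }
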
@@ -285,14 +285,11 @@ private void discard(final String exchange, final String routingKey, boolean man
                          final byte[] body) {
         // NO ROUTE. Invoke return listener async
         for (final ReturnListener listener : returnListeners) {
-            this.executorService.execute(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        listener.handleReturn(-9, "Rejecting", exchange, routingKey, props, body);
-                    } catch (Exception e) {
-                        throw new IllegalStateException("Failed to send return message", e);
-                    }
-                }
-            });
+            this.executorService.execute(() -> {
+                try {
+                    listener.handleReturn(-9, "Rejecting", exchange, routingKey, props, body);
+                } catch (Exception e) {
+                    throw new IllegalStateException("Failed to send return message", e);
+                }
+            });
         }
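As with the anonymous Runnable it replaces, the lambda captures the enclosing method's parameters (exchange, routingKey, props, body), which works because they are final or effectively final. A minimal standalone sketch of the same capture (not NiFi code; the names are invented):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class CaptureExample {
        public static void main(String[] args) {
            final ExecutorService executorService = Executors.newSingleThreadExecutor();
            final String routingKey = "orders";   // effectively final, so the lambda may capture it
            executorService.execute(() -> {
                try {
                    System.out.println("returning message for " + routingKey);
                } catch (Exception e) {
                    throw new IllegalStateException("Failed to send return message", e);
                }
            });
            executorService.shutdown();
        }
    }
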
@@ -509,7 +506,7 @@ public void basicReject(long deliveryTag, boolean requeue) throws IOException {
     }
 
     @Override
-    public String basicConsume(String queue, Consumer callback) throws IOException {
+    public String basicConsume(String queue, Consumer callback) {
         throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
     }
 
@@ -17,7 +17,6 @@
 package org.apache.nifi.processors.avro;
 
 import java.io.BufferedInputStream;
-import java.io.IOException;
 import java.io.InputStream;
 import java.security.NoSuchAlgorithmException;
 import java.util.HashMap;
@@ -51,7 +50,6 @@
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 
 @SideEffectFree
@@ -163,44 +161,41 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         }
 
         try {
-            session.read(flowFile, new InputStreamCallback() {
-                @Override
-                public void process(InputStream rawIn) throws IOException {
-                    try (final InputStream in = new BufferedInputStream(rawIn);
-                         final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<>())) {
-
-                        final Schema schema = reader.getSchema();
-                        if (schema == null) {
-                            throw new ProcessException("Avro schema was null");
-                        }
-
-                        for (String key : reader.getMetaKeys()) {
-                            if (requestedMetadataKeys.contains(key)) {
-                                avroMetadata.put(key, reader.getMetaString(key));
-                            }
-                        }
-
-                        try {
-                            final byte[] rawFingerprint = SchemaNormalization.parsingFingerprint(fingerprintAlgorithm, schema);
-                            avroMetadata.put(SCHEMA_FINGERPRINT_ATTR, Hex.encodeHexString(rawFingerprint));
-                            avroMetadata.put(SCHEMA_TYPE_ATTR, schema.getType().getName());
-                            avroMetadata.put(SCHEMA_NAME_ATTR, schema.getName());
-                        } catch (NoSuchAlgorithmException e) {
-                            // shouldn't happen since allowable values are valid algorithms
-                            throw new ProcessException(e);
-                        }
-
-                        if (countRecords) {
-                            long recordCount = reader.getBlockCount();
-                            try {
-                                while (reader.nextBlock() != null) {
-                                    recordCount += reader.getBlockCount();
-                                }
-                            } catch (NoSuchElementException e) {
-                                // happens at end of file
-                            }
-                            avroMetadata.put(ITEM_COUNT_ATTR, String.valueOf(recordCount));
-                        }
-                    }
-                }
-            });
+            session.read(flowFile, rawIn -> {
+                try (final InputStream in = new BufferedInputStream(rawIn);
+                     final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<>())) {
+
+                    final Schema schema = reader.getSchema();
+                    if (schema == null) {
+                        throw new ProcessException("Avro schema was null");
+                    }
+
+                    for (String key : reader.getMetaKeys()) {
+                        if (requestedMetadataKeys.contains(key)) {
+                            avroMetadata.put(key, reader.getMetaString(key));
+                        }
+                    }
+
+                    try {
+                        final byte[] rawFingerprint = SchemaNormalization.parsingFingerprint(fingerprintAlgorithm, schema);
+                        avroMetadata.put(SCHEMA_FINGERPRINT_ATTR, Hex.encodeHexString(rawFingerprint));
+                        avroMetadata.put(SCHEMA_TYPE_ATTR, schema.getType().getName());
+                        avroMetadata.put(SCHEMA_NAME_ATTR, schema.getName());
+                    } catch (NoSuchAlgorithmException e) {
+                        // shouldn't happen since allowable values are valid algorithms
+                        throw new ProcessException(e);
+                    }
+
+                    if (countRecords) {
+                        long recordCount = reader.getBlockCount();
+                        try {
+                            while (reader.nextBlock() != null) {
+                                recordCount += reader.getBlockCount();
+                            }
+                        } catch (NoSuchElementException e) {
+                            // happens at end of file
+                        }
+                        avroMetadata.put(ITEM_COUNT_ATTR, String.valueOf(recordCount));
+                    }
+                }
+            });
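The conversion works because InputStreamCallback has a single abstract method, process(InputStream) throws IOException: any such interface is a lambda target even without @FunctionalInterface, and the lambda body may throw whatever that method declares, so the try-with-resources and exception logic move over unchanged. A minimal sketch with a stand-in interface (not the real NiFi API):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    interface StreamCallback {                     // stand-in for InputStreamCallback
        void process(InputStream rawIn) throws IOException;
    }

    class LambdaTargetExample {
        static void read(InputStream in, StreamCallback callback) throws IOException {
            callback.process(in);
        }

        public static void main(String[] args) throws IOException {
            // rawIn.read() may throw IOException; that is fine because process() declares it
            read(new ByteArrayInputStream(new byte[] {1, 2, 3}),
                    rawIn -> System.out.println("first byte: " + rawIn.read()));
        }
    }
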
@@ -64,8 +64,6 @@
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 
 @SideEffectFree
@@ -247,55 +245,49 @@ public List<FlowFile> split(final ProcessSession session, final FlowFile origina
         final List<FlowFile> childFlowFiles = new ArrayList<>();
         final AtomicReference<GenericRecord> recordHolder = new AtomicReference<>(null);
 
-        session.read(originalFlowFile, new InputStreamCallback() {
-            @Override
-            public void process(InputStream rawIn) throws IOException {
-                try (final InputStream in = new BufferedInputStream(rawIn);
-                     final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<>())) {
-
-                    final AtomicReference<String> codec = new AtomicReference<>(reader.getMetaString(DataFileConstants.CODEC));
-                    if (codec.get() == null) {
-                        codec.set(DataFileConstants.NULL_CODEC);
-                    }
-
-                    // while records are left, start a new split by spawning a FlowFile
-                    final AtomicReference<Boolean> hasNextHolder = new AtomicReference<>(reader.hasNext());
-                    while (hasNextHolder.get()) {
-                        FlowFile childFlowFile = session.create(originalFlowFile);
-                        childFlowFile = session.write(childFlowFile, new OutputStreamCallback() {
-                            @Override
-                            public void process(OutputStream rawOut) throws IOException {
-                                try (final BufferedOutputStream out = new BufferedOutputStream(rawOut)) {
-                                    splitWriter.init(reader, codec.get(), out);
-
-                                    // append to the current FlowFile until no more records, or splitSize is reached
-                                    int recordCount = 0;
-                                    while (hasNextHolder.get() && recordCount < splitSize) {
-                                        recordHolder.set(reader.next(recordHolder.get()));
-                                        splitWriter.write(recordHolder.get());
-                                        recordCount++;
-                                        hasNextHolder.set(reader.hasNext());
-                                    }
-
-                                    splitWriter.flush();
-                                } finally {
-                                    splitWriter.close();
-                                }
-                            }
-                        });
-
-                        // would prefer this to be part of the SplitWriter, but putting the metadata in FlowFile attributes
-                        // can't be done inside of an OutputStream callback which is where the splitWriter is used
-                        if (splitWriter instanceof BareRecordSplitWriter && transferMetadata) {
-                            final Map<String, String> metadata = new HashMap<>();
-                            for (String metaKey : reader.getMetaKeys()) {
-                                metadata.put(metaKey, reader.getMetaString(metaKey));
-                            }
-                            childFlowFile = session.putAllAttributes(childFlowFile, metadata);
-                        }
-
-                        childFlowFiles.add(childFlowFile);
-                    }
-                }
-            }
-        });
+        session.read(originalFlowFile, rawIn -> {
+            try (final InputStream in = new BufferedInputStream(rawIn);
+                 final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<>())) {
+
+                final AtomicReference<String> codec = new AtomicReference<>(reader.getMetaString(DataFileConstants.CODEC));
+                if (codec.get() == null) {
+                    codec.set(DataFileConstants.NULL_CODEC);
+                }
+
+                // while records are left, start a new split by spawning a FlowFile
+                final AtomicReference<Boolean> hasNextHolder = new AtomicReference<>(reader.hasNext());
+                while (hasNextHolder.get()) {
+                    FlowFile childFlowFile = session.create(originalFlowFile);
+                    childFlowFile = session.write(childFlowFile, rawOut -> {
+                        try (final BufferedOutputStream out = new BufferedOutputStream(rawOut)) {
+                            splitWriter.init(reader, codec.get(), out);
+
+                            // append to the current FlowFile until no more records, or splitSize is reached
+                            int recordCount = 0;
+                            while (hasNextHolder.get() && recordCount < splitSize) {
+                                recordHolder.set(reader.next(recordHolder.get()));
+                                splitWriter.write(recordHolder.get());
+                                recordCount++;
+                                hasNextHolder.set(reader.hasNext());
+                            }
+
+                            splitWriter.flush();
+                        } finally {
+                            splitWriter.close();
+                        }
+                    });
+
+                    // would prefer this to be part of the SplitWriter, but putting the metadata in FlowFile attributes
+                    // can't be done inside of an OutputStream callback which is where the splitWriter is used
+                    if (splitWriter instanceof BareRecordSplitWriter && transferMetadata) {
+                        final Map<String, String> metadata = new HashMap<>();
+                        for (String metaKey : reader.getMetaKeys()) {
+                            metadata.put(metaKey, reader.getMetaString(metaKey));
+                        }
+                        childFlowFile = session.putAllAttributes(childFlowFile, metadata);
+                    }
+
+                    childFlowFiles.add(childFlowFile);
+                }
+            }
+        });
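Note that recordHolder, codec, and hasNextHolder stay wrapped in AtomicReference even after the rewrite: a lambda, like an anonymous class, can only capture locals that are effectively final, so mutable state crossing the callback boundary lives in a holder object whose reference never changes. A minimal sketch of the pattern (not NiFi code):

    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicReference;

    class HolderExample {
        public static void main(String[] args) {
            final Iterator<String> records = List.of("a", "b", "c").iterator();
            final AtomicReference<Boolean> hasNextHolder = new AtomicReference<>(records.hasNext());
            while (hasNextHolder.get()) {
                Runnable writeOne = () -> {
                    System.out.println(records.next());      // the reference itself is effectively final
                    hasNextHolder.set(records.hasNext());    // mutation goes through the holder
                };
                writeOne.run();
            }
        }
    }
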
@@ -24,11 +24,9 @@
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.listen.event.Event;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -185,15 +183,12 @@ protected Map<String, FlowFileEventBatch> getBatches(final ProcessSession sessio
                 final boolean writeDemarcator = (i > 0);
                 try {
                     final byte[] rawMessage = event.getData();
-                    FlowFile appendedFlowFile = session.append(batch.getFlowFile(), new OutputStreamCallback() {
-                        @Override
-                        public void process(final OutputStream out) throws IOException {
-                            if (writeDemarcator) {
-                                out.write(messageDemarcatorBytes);
-                            }
-
-                            out.write(rawMessage);
-                        }
-                    });
+                    FlowFile appendedFlowFile = session.append(batch.getFlowFile(), out -> {
+                        if (writeDemarcator) {
+                            out.write(messageDemarcatorBytes);
+                        }
+
+                        out.write(rawMessage);
+                    });
 
                     // update the FlowFile reference in the batch object
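writeDemarcator and rawMessage can be captured here because each loop iteration declares fresh final locals; hoisting them out of the loop and reassigning them would break the lambda. A runnable sketch of the same demarcator logic against an in-memory stream (not NiFi code; OutCallback stands in for OutputStreamCallback):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    interface OutCallback {
        void process(OutputStream out) throws IOException;
    }

    class DemarcatorExample {
        public static void main(String[] args) throws IOException {
            final byte[] messageDemarcatorBytes = "\n".getBytes();
            final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            for (int i = 0; i < 3; i++) {
                final boolean writeDemarcator = (i > 0);     // fresh effectively-final local each iteration
                final byte[] rawMessage = ("message-" + i).getBytes();
                OutCallback append = out -> {
                    if (writeDemarcator) {
                        out.write(messageDemarcatorBytes);
                    }
                    out.write(rawMessage);
                };
                append.process(buffer);
            }
            System.out.println(buffer);   // message-0, message-1, message-2 on separate lines
        }
    }
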
@@ -17,9 +17,7 @@
 package org.apache.nifi.processor.util.listen;
 
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
 import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.util.StandardValidators;
@@ -51,37 +49,34 @@ public class ListenerProperties {
     public static final PropertyDescriptor NETWORK_INTF_NAME = new PropertyDescriptor.Builder()
             .name("Local Network Interface")
             .description("The name of a local network interface to be used to restrict listening to a specific LAN.")
-            .addValidator(new Validator() {
-                @Override
-                public ValidationResult validate(String subject, String input, ValidationContext context) {
-                    ValidationResult result = new ValidationResult.Builder()
-                            .subject("Local Network Interface").valid(true).input(input).build();
-                    if (interfaceSet.contains(input.toLowerCase())) {
-                        return result;
-                    }
-
-                    String message;
-                    String realValue = input;
-                    try {
-                        if (context.isExpressionLanguagePresent(input)) {
-                            AttributeExpression ae = context.newExpressionLanguageCompiler().compile(input);
-                            realValue = ae.evaluate();
-                        }
-
-                        if (interfaceSet.contains(realValue.toLowerCase())) {
-                            return result;
-                        }
-
-                        message = realValue + " is not a valid network name. Valid names are " + interfaceSet.toString();
-
-                    } catch (IllegalArgumentException e) {
-                        message = "Not a valid AttributeExpression: " + e.getMessage();
-                    }
-                    result = new ValidationResult.Builder().subject("Local Network Interface")
-                            .valid(false).input(input).explanation(message).build();
-
-                    return result;
-                }
-            })
+            .addValidator((subject, input, context) -> {
+                ValidationResult result = new ValidationResult.Builder()
+                        .subject("Local Network Interface").valid(true).input(input).build();
+                if (interfaceSet.contains(input.toLowerCase())) {
+                    return result;
+                }
+
+                String message;
+                String realValue = input;
+                try {
+                    if (context.isExpressionLanguagePresent(input)) {
+                        AttributeExpression ae = context.newExpressionLanguageCompiler().compile(input);
+                        realValue = ae.evaluate();
+                    }
+
+                    if (interfaceSet.contains(realValue.toLowerCase())) {
+                        return result;
+                    }
+
+                    message = realValue + " is not a valid network name. Valid names are " + interfaceSet;
+
+                } catch (IllegalArgumentException e) {
+                    message = "Not a valid AttributeExpression: " + e.getMessage();
+                }
+                result = new ValidationResult.Builder().subject("Local Network Interface")
+                        .valid(false).input(input).explanation(message).build();
+
+                return result;
+            })
             .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
             .build();
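Since Validator declares a single abstract method, validate(String, String, ValidationContext), it can be written as a three-parameter lambda with all types inferred; the Validator and ValidationContext imports then become unnecessary, which is what the import hunk above removes. The rewrite also drops a redundant interfaceSet.toString(), since string concatenation calls toString() implicitly. A minimal sketch with an invented stand-in interface (not the NiFi Validator API):

    import java.util.Set;

    interface TriValidator {                      // stand-in for NiFi's Validator
        String validate(String subject, String input, Set<String> interfaces);
    }

    class ValidatorLambdaExample {
        public static void main(String[] args) {
            // Parameter types are inferred from the single abstract method
            TriValidator v = (subject, input, interfaces) ->
                    interfaces.contains(input.toLowerCase())
                            ? subject + ": valid"
                            : input + " is not a valid network name. Valid names are " + interfaces;
            System.out.println(v.validate("Local Network Interface", "eth0", Set.of("eth0", "lo")));
        }
    }
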