
Commit

NIFI-13784: Fixed error handling in several situations. Cleaned up leaky abstraction. Some code cleanup; fixed default values for time-based properties to use 'sec' instead of 's' to adhere to typical conventions.
markap14 committed Sep 21, 2024
1 parent 7292479 commit 3bfbe61
Showing 13 changed files with 295 additions and 187 deletions.
@@ -45,8 +45,7 @@ public void testProduceRouteToFailure() throws InitializationException, IOException {
// attempt to send a non-json FlowFile to Kafka using record strategy;
// this will fail on the record parsing step prior to send; triggering the failure strategy logic
final Map<String, String> attributes = new HashMap<>();
final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_RESOURCE)));
final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull(getClass().getClassLoader().getResource(TEST_RESOURCE)));
runner.enqueue(bytesFlowFile, attributes);
runner.run(1);
runner.assertAllFlowFilesTransferred(PublishKafka.REL_FAILURE, 1);
@@ -65,10 +64,10 @@ public void testProduceRollback() throws InitializationException, IOException {
// attempt to send a non-json FlowFile to Kafka using record strategy;
// this will fail on the record parsing step prior to send; triggering the failure strategy logic
final Map<String, String> attributes = new HashMap<>();
final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_RESOURCE)));
final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull(getClass().getClassLoader().getResource(TEST_RESOURCE)));
runner.enqueue(bytesFlowFile, attributes);
runner.run(1);

// on rollback, FlowFile is returned to source queue
runner.assertTransferCount(PublishKafka.REL_SUCCESS, 0);
runner.assertTransferCount(PublishKafka.REL_FAILURE, 0);
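For the rollback case above, one further assertion in the same TestRunner style would confirm the FlowFile really remained in the incoming queue; assertQueueNotEmpty() is a standard mock-framework assertion, but its use here is an illustrative sketch rather than part of this commit:

```java
// Illustrative only: after a rollback the FlowFile should still be queued for the processor.
runner.assertTransferCount(PublishKafka.REL_SUCCESS, 0);
runner.assertTransferCount(PublishKafka.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
```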
@@ -44,8 +44,8 @@
import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;
import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService;
import org.apache.nifi.kafka.service.producer.Kafka3ProducerService;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
import org.apache.nifi.kafka.shared.property.IsolationLevel;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
@@ -175,7 +175,7 @@ public class Kafka3ConnectionService extends AbstractControllerService implement
.name("default.api.timeout.ms")
.displayName("Client Timeout")
.description("Default timeout for Kafka client operations. Mapped to Kafka default.api.timeout.ms. The Kafka request.timeout.ms property is derived from half of the configured timeout")
.defaultValue("60 s")
.defaultValue("60 sec")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
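The Client Timeout description above notes that Kafka's request.timeout.ms is derived from half of the configured value. A minimal sketch of that derivation, assuming the descriptor is held in a field named CLIENT_TIMEOUT and read during onEnabled(); only asTimePeriod() and ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG are standard API here, the rest is illustrative:

```java
// Sketch only: "60 sec" parses to 60000 ms, so request.timeout.ms becomes 30000 ms.
// Assumes java.util.concurrent.TimeUnit is imported and CLIENT_TIMEOUT is the descriptor above.
final long clientTimeoutMs = configurationContext.getProperty(CLIENT_TIMEOUT)
        .asTimePeriod(TimeUnit.MILLISECONDS);
kafkaProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(clientTimeoutMs / 2));
```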
@@ -191,7 +191,7 @@ public class Kafka3ConnectionService extends AbstractControllerService implement
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.defaultValue("5 s")
.defaultValue("5 sec")
.build();

public static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder()
@@ -204,7 +204,7 @@ public class Kafka3ConnectionService extends AbstractControllerService implement
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.defaultValue("5 s")
.defaultValue("5 sec")
.build();

private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
@@ -244,7 +244,7 @@ public void onEnabled(final ConfigurationContext configurationContext) {
@OnDisabled
public void onDisabled() {
if (consumerService == null) {
getLogger().warn("Consumer Service not configured");
getLogger().debug("Consumer Service not configured");
} else {
consumerService.close();
}
@@ -278,6 +278,7 @@ public KafkaProducerService getProducerService(final ProducerConfiguration producerConfiguration) {
if (partitionClass != null && partitionClass.startsWith("org.apache.kafka")) {
propertiesProducer.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, partitionClass);
}

return new Kafka3ProducerService(propertiesProducer, serviceConfiguration, producerConfiguration);
}

@@ -34,6 +34,8 @@
import org.apache.nifi.kafka.service.consumer.pool.Subscription;
import org.apache.nifi.logging.ComponentLog;

import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -43,14 +45,13 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* Kafka 3 Consumer Service implementation with Object Pooling for subscribed Kafka Consumers
*/
public class Kafka3ConsumerService implements KafkaConsumerService {
public class Kafka3ConsumerService implements KafkaConsumerService, Closeable {
private final ComponentLog componentLog;

private final ConsumerObjectPool consumerObjectPool;
@@ -68,10 +69,14 @@ public void commit(final PollingSummary pollingSummary) {
final Map<TopicPartition, OffsetAndMetadata> offsets = getOffsets(pollingSummary);

final long started = System.currentTimeMillis();
final long elapsed = runConsumerFunction(subscription, (consumer) -> {
final Consumer<byte[], byte[]> consumer = borrowConsumer(subscription);
final long elapsed;
try {
consumer.commitSync(offsets);
return started - System.currentTimeMillis();
});
elapsed = started - System.currentTimeMillis();
} finally {
returnConsumer(subscription, consumer);
}

componentLog.debug("Committed Records in [{} ms] for {}", elapsed, pollingSummary);
}
@@ -81,10 +86,13 @@ public Iterable<ByteRecord> poll(final PollingContext pollingContext) {
Objects.requireNonNull(pollingContext, "Polling Context required");
final Subscription subscription = getSubscription(pollingContext);

return runConsumerFunction(subscription, (consumer) -> {
final Consumer<byte[], byte[]> consumer = borrowConsumer(subscription);
try {
final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(pollingContext.getMaxUncommittedTime());
return new RecordIterable(consumerRecords);
});
} finally {
returnConsumer(subscription, consumer);
}
}

@Override
@@ -96,12 +104,16 @@ public List<PartitionState> getPartitionStates(final PollingContext pollingContext) {

if (topics.hasNext()) {
final String topic = topics.next();
partitionStates = runConsumerFunction(subscription, (consumer) ->
consumer.partitionsFor(topic)
.stream()
.map(partitionInfo -> new PartitionState(partitionInfo.topic(), partitionInfo.partition()))
.collect(Collectors.toList())
);

final Consumer<byte[], byte[]> consumer = borrowConsumer(subscription);
try {
partitionStates = consumer.partitionsFor(topic)
.stream()
.map(partitionInfo -> new PartitionState(partitionInfo.topic(), partitionInfo.partition()))
.collect(Collectors.toList());
} finally {
returnConsumer(subscription, consumer);
}
} else {
partitionStates = Collections.emptyList();
}
@@ -140,21 +152,26 @@ private Map<TopicPartition, OffsetAndMetadata> getOffsets(final PollingSummary pollingSummary) {
return offsets;
}

private <T> T runConsumerFunction(final Subscription subscription, final Function<Consumer<byte[], byte[]>, T> consumerFunction) {
Consumer<byte[], byte[]> consumer = null;
private Consumer<byte[], byte[]> borrowConsumer(final Subscription subscription) {
try {
consumer = consumerObjectPool.borrowObject(subscription);
return consumerFunction.apply(consumer);
return consumerObjectPool.borrowObject(subscription);
} catch (final Exception e) {
throw new ConsumerException("Borrow Consumer failed", e);
} finally {
if (consumer != null) {
try {
consumerObjectPool.returnObject(subscription, consumer);
} catch (final Exception e) {
componentLog.warn("Return Consumer failed", e);
}
}
}

private void returnConsumer(final Subscription subscription, final Consumer<byte[], byte[]> consumer) {
try {
consumerObjectPool.returnObject(subscription, consumer);
} catch (final Exception e) {
try {
consumerObjectPool.invalidateObject(subscription, consumer);
} catch (final Exception e2) {
componentLog.debug("Failed to invalidate Kafka Consumer", e2);
}

consumer.close(Duration.ofSeconds(30));
componentLog.warn("Failed to return Kafka Consumer to pool", e);
}
}
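The return-or-invalidate logic above follows the usual keyed object pool contract. ConsumerObjectPool's internals are not shown in this diff, so the sketch below uses an Apache Commons Pool 2 keyed pool purely as a stand-in to illustrate why a failed returnObject() is followed by invalidateObject(): the object must leave the pool's accounting rather than leak.

```java
// Hedged sketch (not NiFi code): the borrow / return / invalidate contract the helpers above rely on,
// shown with a trivial Commons Pool 2 keyed pool of StringBuilders standing in for pooled consumers.
import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;

class KeyedPoolContractSketch {
    public static void main(final String[] args) throws Exception {
        final GenericKeyedObjectPool<String, StringBuilder> pool = new GenericKeyedObjectPool<String, StringBuilder>(
                new BaseKeyedPooledObjectFactory<String, StringBuilder>() {
                    @Override
                    public StringBuilder create(final String key) {
                        return new StringBuilder(key);
                    }

                    @Override
                    public PooledObject<StringBuilder> wrap(final StringBuilder value) {
                        return new DefaultPooledObject<>(value);
                    }
                });

        final StringBuilder borrowed = pool.borrowObject("subscription-a");   // may block or throw
        try {
            borrowed.append(" in use");                                       // work with the pooled object
        } finally {
            try {
                pool.returnObject("subscription-a", borrowed);                // normal path
            } catch (final Exception e) {
                pool.invalidateObject("subscription-a", borrowed);            // remove it from the pool instead of leaking it
            }
        }
        pool.close();
    }
}
```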

@@ -191,6 +208,7 @@ public ByteRecord next() {
final RecordHeader recordHeader = new RecordHeader(header.key(), header.value());
recordHeaders.add(recordHeader);
});

return new ByteRecord(
consumerRecord.topic(),
consumerRecord.partition(),
@@ -33,22 +33,22 @@
import org.apache.nifi.kafka.service.producer.transaction.KafkaTransactionalProducerWrapper;

import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;

public class Kafka3ProducerService implements KafkaProducerService {
private final Producer<byte[], byte[]> producer;

private final Producer<byte[], byte[]> producer;
private final List<ProducerCallback> callbacks;

private final ServiceConfiguration serviceConfiguration;

private final KafkaProducerWrapper wrapper;

private volatile boolean closed = false;

public Kafka3ProducerService(final Properties properties,
final ServiceConfiguration serviceConfiguration,
final ProducerConfiguration producerConfiguration) {
@@ -65,50 +65,69 @@ public Kafka3ProducerService(final Properties properties,

@Override
public void close() {
producer.close();
closed = true;
producer.close(Duration.ofSeconds(30));
}

@Override
public void init() {
wrapper.init();
public boolean isClosed() {
return closed;
}

@Override
public void send(final Iterator<KafkaRecord> kafkaRecords, final PublishContext publishContext) {
final ProducerCallback callback = new ProducerCallback(publishContext.getFlowFile());
callbacks.add(callback);
Optional.ofNullable(publishContext.getException()).ifPresent(e -> callback.getExceptions().add(e));
if (callback.getExceptions().isEmpty()) {

final List<Exception> callbackExceptions = callback.getExceptions();

final Exception publishException = publishContext.getException();
if (publishException != null) {
callbackExceptions.add(publishException);
}

if (callbackExceptions.isEmpty()) {
try {
wrapper.send(kafkaRecords, publishContext, callback);
} catch (final UncheckedIOException e) {
callback.getExceptions().add(e);
// We don't throw the Exception because we will later deal with this by
// checking if there are any Exceptions.
callbackExceptions.add(e);
} catch (final Exception e) {
// We re-throw the Exception in this case because it is an unexpected Exception
callbackExceptions.add(e);
throw e;
}
}
}

@Override
public RecordSummary complete() {
final boolean shouldCommit = callbacks.stream().noneMatch(ProducerCallback::isFailure);
if (shouldCommit) {
producer.flush(); // finish Kafka processing of in-flight data
wrapper.commit(); // commit Kafka transaction (when transactions configured)
} else {
// rollback on transactions + exception
wrapper.abort();
}

final RecordSummary recordSummary = new RecordSummary(); // scrape the Kafka callbacks for disposition of in-flight data
final List<FlowFileResult> flowFileResults = recordSummary.getFlowFileResults();
for (final ProducerCallback callback : callbacks) {
// short-circuit the handling of the flowfile results here
if (callback.isFailure()) {
flowFileResults.add(callback.toFailureResult());
try {
final boolean shouldCommit = callbacks.stream().noneMatch(ProducerCallback::isFailure);
if (shouldCommit) {
producer.flush(); // finish Kafka processing of in-flight data
wrapper.commit(); // commit Kafka transaction (when transactions configured)
} else {
flowFileResults.add(callback.waitComplete(serviceConfiguration.getMaxAckWait().toMillis()));
// rollback on transactions + exception
wrapper.abort();
}

final RecordSummary recordSummary = new RecordSummary(); // scrape the Kafka callbacks for disposition of in-flight data
final List<FlowFileResult> flowFileResults = recordSummary.getFlowFileResults();
for (final ProducerCallback callback : callbacks) {
// short-circuit the handling of the flowfile results here
if (callback.isFailure()) {
flowFileResults.add(callback.toFailureResult());
} else {
flowFileResults.add(callback.waitComplete(serviceConfiguration.getMaxAckWait().toMillis()));
}
}

return recordSummary;
} finally {
callbacks.clear();
}
return recordSummary;
}

@Override
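A hedged usage sketch of the reworked producer lifecycle from a caller's point of view: send() records record-parsing failures in its callbacks while unexpected exceptions still propagate, complete() settles the callbacks and now always clears them in a finally block so the service can be reused, and isClosed() lets a caller discard a service whose underlying producer has been closed. The helper method and its parameter names are illustrative, not part of this commit:

```java
// Illustrative only: driving Kafka3ProducerService through the KafkaProducerService API shown above.
private RecordSummary publish(final KafkaProducerService producerService,
                              final Iterator<KafkaRecord> kafkaRecords,
                              final PublishContext publishContext) {
    if (producerService.isClosed()) {
        // underlying Kafka producer already closed; the caller should obtain a fresh service
        throw new IllegalStateException("Producer Service is closed");
    }

    producerService.send(kafkaRecords, publishContext);  // parse failures are captured in callbacks; unexpected exceptions propagate
    return producerService.complete();                   // flush + commit (or abort), then callbacks are cleared
}
```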
@@ -24,10 +24,6 @@ public KafkaNonTransactionalProducerWrapper(final Producer<byte[], byte[]> producer) {
super(producer);
}

@Override
public void init() {
}

@Override
public void commit() {
}
@@ -43,11 +43,6 @@ public KafkaProducerWrapper(final Producer<byte[], byte[]> producer) {
this.producer = producer;
}

/**
* Transaction-enabled publish to Kafka involves the use of special Kafka client library APIs.
*/
public abstract void init();

public void send(final Iterator<KafkaRecord> kafkaRecords, final PublishContext publishContext, final ProducerCallback callback) {
while (kafkaRecords.hasNext()) {
final KafkaRecord kafkaRecord = kafkaRecords.next();
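Removing the abstract init() shown above means callers no longer need to know whether the wrapper is transactional; the transactional variant can do its own setup internally. A hedged, self-contained sketch of that idea using only Kafka's standard transactional producer API; the class below is illustrative and is not the actual KafkaTransactionalProducerWrapper:

```java
// Sketch: transaction bookkeeping kept inside a wrapper so callers never invoke a separate init().
// Only the org.apache.kafka.clients.producer API is real; the class and method names are illustrative.
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

class TransactionalSendSketch {
    private final Producer<byte[], byte[]> producer;
    private boolean inTransaction = false;

    TransactionalSendSketch(final Producer<byte[], byte[]> producer) {
        this.producer = producer;
        producer.initTransactions();        // one-time setup, hidden from callers
    }

    void send(final ProducerRecord<byte[], byte[]> record) {
        if (!inTransaction) {
            producer.beginTransaction();    // started lazily on first send
            inTransaction = true;
        }
        producer.send(record);
    }

    void commit() {
        if (inTransaction) {
            producer.commitTransaction();
            inTransaction = false;
        }
    }

    void abort() {
        if (inTransaction) {
            producer.abortTransaction();
            inTransaction = false;
        }
    }
}
```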