diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java index c73535a2308..f2651a7b5a7 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java @@ -30,6 +30,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.serialization.ByteArraySerializer; @@ -62,6 +63,8 @@ public class KafkaProducerCluster implements LifecycleAware { private KafkaProducer producer; + private long configuredMaxPayloadSize; + public KafkaProducerCluster( String workerName, CacheClusterConfig cacheClusterConfig, @@ -125,6 +128,7 @@ private void startByNodeConfig() { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, nodeConfig.getBootstrapServers()); props.put(ProducerConfig.CLIENT_ID_CONFIG, nodeConfig.getClientId() + "-" + workerName); LOG.info("init kafka client by node config info: " + props); + configuredMaxPayloadSize = Long.parseLong(props.getProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG)); producer = new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer()); Preconditions.checkNotNull(producer); } catch (Exception e) { @@ -217,14 +221,25 @@ public boolean send(ProfileEvent profileEvent, Transaction tx) throws IOExceptio sinkContext.addSendResultMetric(profileEvent, topic, true, sendTime); profileEvent.ack(); } else { - if (ex instanceof UnknownTopicOrPartitionException + + if (ex instanceof RecordTooLargeException) { + // for the message bigger than configuredMaxPayloadSize, just discard it; + // otherwise, retry and wait for the server side changes the limitation + if (record.value().length > configuredMaxPayloadSize) { + tx.commit(); + profileEvent.ack(); + } else { + tx.rollback(); + } + } else if (ex instanceof UnknownTopicOrPartitionException || !(ex instanceof RetriableException)) { + // for non-retriable exception, just discard it tx.commit(); + profileEvent.ack(); } else { tx.rollback(); } - LOG.error(String.format("send failed, topic is %s, partition is %s", - metadata.topic(), metadata.partition()), ex); + LOG.error(String.format("send failed, topic is %s", topic), ex); sinkContext.addSendResultMetric(profileEvent, topic, false, sendTime); } tx.close();