From d8051cf738f0224ac6cc53b62795a0011e04ae47 Mon Sep 17 00:00:00 2001 From: jarvis Date: Tue, 24 Dec 2024 21:59:26 +0800 Subject: [PATCH] [improve][kafka] use config to set kafka deserialize error ignore parameter --- .../kafka/source/KafkaSourceConfig.java | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java index 1093d3f2f28..0e8d71244ec 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java @@ -255,10 +255,13 @@ private DeserializationSchema createDeserializationSchema( .setCatalogTable(catalogTable) .build(); } - + boolean ignoreParseErrors = + readonlyConfig + .get(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION) + .equals(MessageFormatErrorHandleWay.SKIP); switch (format) { case JSON: - return new JsonDeserializationSchema(catalogTable, false, false); + return new JsonDeserializationSchema(catalogTable, false, ignoreParseErrors); case TEXT: String delimiter = readonlyConfig.get(FIELD_DELIMITER); return TextDeserializationSchema.builder() @@ -267,15 +270,15 @@ private DeserializationSchema createDeserializationSchema( .build(); case CANAL_JSON: return CanalJsonDeserializationSchema.builder(catalogTable) - .setIgnoreParseErrors(true) + .setIgnoreParseErrors(ignoreParseErrors) .build(); case OGG_JSON: return OggJsonDeserializationSchema.builder(catalogTable) - .setIgnoreParseErrors(true) + .setIgnoreParseErrors(ignoreParseErrors) .build(); case MAXWELL_JSON: return MaxWellJsonDeserializationSchema.builder(catalogTable) - .setIgnoreParseErrors(true) + .setIgnoreParseErrors(ignoreParseErrors) .build(); case COMPATIBLE_KAFKA_CONNECT_JSON: @@ -286,10 +289,11 @@ private DeserializationSchema createDeserializationSchema( readonlyConfig.get( KafkaConnectJsonFormatOptions.VALUE_CONVERTER_SCHEMA_ENABLED); return new CompatibleKafkaConnectDeserializationSchema( - catalogTable, keySchemaEnable, valueSchemaEnable, false, false); + catalogTable, keySchemaEnable, valueSchemaEnable, false, ignoreParseErrors); case DEBEZIUM_JSON: boolean includeSchema = readonlyConfig.get(DEBEZIUM_RECORD_INCLUDE_SCHEMA); - return new DebeziumJsonDeserializationSchema(catalogTable, true, includeSchema); + return new DebeziumJsonDeserializationSchema( + catalogTable, ignoreParseErrors, includeSchema); case AVRO: return new AvroDeserializationSchema(catalogTable); case PROTOBUF: