From c9cc04fb73411d2b914914909f06ed618d9f8834 Mon Sep 17 00:00:00 2001 From: vernedeng Date: Fri, 15 Dec 2023 17:15:48 +0800 Subject: [PATCH] [INLONG-9491][Sort] Csv format support ignore trailing unmappable fields --- .../format-inlongmsg-base/pom.xml | 2 +- .../inlongmsg/InLongMsgDecodingFormat.java | 44 ++++++++++++++++++- .../inlongmsg/InLongMsgFormatFactory.java | 5 ++- .../formats/inlongmsg/InLongMsgOptions.java | 7 +++ 4 files changed, 53 insertions(+), 5 deletions(-) diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml index 8a3f7207211..8846aa0446e 100644 --- a/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml +++ b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml @@ -94,7 +94,7 @@ org.apache.flink flink-csv - test + provided diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java index 0f67bbc0726..b8eddb5ad58 100644 --- a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java +++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java @@ -17,6 +17,13 @@ package org.apache.inlong.sort.formats.inlongmsg; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.formats.csv.CsvRowDataDeserializationSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser; +import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; import org.apache.inlong.sort.formats.inlongmsg.InLongMsgDeserializationSchema.MetadataConverter; import org.apache.flink.api.common.serialization.DeserializationSchema; @@ -31,6 +38,7 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.utils.DataTypeUtils; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -40,6 +48,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +@Slf4j public class InLongMsgDecodingFormat implements DecodingFormat> { private final String innerFormatMetaPrefix; @@ -50,14 +59,18 @@ public class InLongMsgDecodingFormat implements DecodingFormat> innerDecodingFormat, String innerFormatMetaPrefix, - boolean ignoreErrors) { + boolean ignoreErrors, + boolean ignoreTrailingUnmappable) { this.innerDecodingFormat = innerDecodingFormat; this.innerFormatMetaPrefix = innerFormatMetaPrefix; this.metadataKeys = Collections.emptyList(); this.ignoreErrors = ignoreErrors; + this.ignoreTrailingUnmappable = ignoreTrailingUnmappable; } @Override @@ -83,8 +96,14 @@ public DeserializationSchema createRuntimeDecoder(Context context, Data final TypeInformation producedTypeInfo = context.createTypeInformation(producedDataType); + DeserializationSchema innerSchema = + innerDecodingFormat.createRuntimeDecoder(context, physicalDataType); + if (innerSchema instanceof CsvRowDataDeserializationSchema && ignoreTrailingUnmappable) { + this.makeCsvInnerFormatIgnoreTrailingUnmappable(innerSchema); + } + return new InLongMsgDeserializationSchema( - innerDecodingFormat.createRuntimeDecoder(context, physicalDataType), + innerSchema, metadataConverters, producedTypeInfo, ignoreErrors); @@ -190,4 +209,25 @@ public Object read(InLongMsgHead head) { this.converter = converter; } } + + + private void makeCsvInnerFormatIgnoreTrailingUnmappable(DeserializationSchema innerSchema) { + try 
{ + Field readerField = CsvRowDataDeserializationSchema.class.getDeclaredField("objectReader"); + readerField.setAccessible(true); + ObjectReader oldReader = (ObjectReader) readerField.get(innerSchema); + + Field schemaField = ObjectReader.class.getDeclaredField("_schema"); + schemaField.setAccessible(true); + CsvSchema oldSchema = (CsvSchema) schemaField.get(oldReader); + + ObjectReader newReader = new CsvMapper() + .enable(CsvParser.Feature.IGNORE_TRAILING_UNMAPPABLE) + .readerFor(JsonNode.class) + .with(oldSchema); + readerField.set(innerSchema, newReader); + } catch (Throwable t) { + log.error("failed to make csv inner format to ignore trailing unmappable, ex is ", t); + } + } } diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java index c9b368a3645..4703d29803f 100644 --- a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java +++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java @@ -35,6 +35,7 @@ import java.util.Set; import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_PARSE_ERRORS; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_TRAILING_UNMAPPABLE; import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.INNER_FORMAT; import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.validateDecodingFormatOptions; @@ -65,8 +66,8 @@ public DecodingFormat> createDecodingFormat(Conte DecodingFormat> innerFormat = innerFactory.createDecodingFormat(context, new DelegatingConfiguration(allOptions, innerFormatPrefix)); boolean ignoreErrors = formatOptions.get(IGNORE_PARSE_ERRORS); - - return new 
InLongMsgDecodingFormat(innerFormat, innerFormatMetaPrefix, ignoreErrors); + boolean ignoreTrailingUnmappable = formatOptions.get(IGNORE_TRAILING_UNMAPPABLE); + return new InLongMsgDecodingFormat(innerFormat, innerFormatMetaPrefix, ignoreErrors, ignoreTrailingUnmappable); } @Override diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java index 92a2ebc0b78..7e1d33b3fa0 100644 --- a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java +++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java @@ -41,6 +41,13 @@ private InLongMsgOptions() { .withDescription("Optional flag to skip fields and rows with parse errors instead of failing;\n" + "fields are set to null in case of errors"); + public static final ConfigOption IGNORE_TRAILING_UNMAPPABLE = + ConfigOptions.key("ignore-trailing-unmappable") + .booleanType() + .defaultValue(true) + .withDescription("Allows the case that real size exceeds the expected size.\n " + + "The extra column will be skipped"); + public static void validateDecodingFormatOptions(ReadableConfig config) { String innerFormat = config.get(INNER_FORMAT); if (innerFormat == null) {