diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
index 8a3f7207211..8846aa0446e 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
@@ -94,7 +94,7 @@
org.apache.flink
flink-csv
- test
+ provided
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
index 0f67bbc0726..b8eddb5ad58 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
@@ -17,6 +17,13 @@
package org.apache.inlong.sort.formats.inlongmsg;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.formats.csv.CsvRowDataDeserializationSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgDeserializationSchema.MetadataConverter;
import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -31,6 +38,7 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -40,6 +48,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
+@Slf4j
public class InLongMsgDecodingFormat implements DecodingFormat> {
private final String innerFormatMetaPrefix;
@@ -50,14 +59,18 @@ public class InLongMsgDecodingFormat implements DecodingFormat> innerDecodingFormat,
String innerFormatMetaPrefix,
- boolean ignoreErrors) {
+ boolean ignoreErrors,
+ boolean ignoreTrailingUnmappable) {
this.innerDecodingFormat = innerDecodingFormat;
this.innerFormatMetaPrefix = innerFormatMetaPrefix;
this.metadataKeys = Collections.emptyList();
this.ignoreErrors = ignoreErrors;
+ this.ignoreTrailingUnmappable = ignoreTrailingUnmappable;
}
@Override
@@ -83,8 +96,14 @@ public DeserializationSchema createRuntimeDecoder(Context context, Data
final TypeInformation producedTypeInfo =
context.createTypeInformation(producedDataType);
+ DeserializationSchema innerSchema =
+ innerDecodingFormat.createRuntimeDecoder(context, physicalDataType);
+ if (innerSchema instanceof CsvRowDataDeserializationSchema && ignoreTrailingUnmappable) {
+ this.makeCsvInnerFormatIgnoreTrailingUnmappable(innerSchema);
+ }
+
return new InLongMsgDeserializationSchema(
- innerDecodingFormat.createRuntimeDecoder(context, physicalDataType),
+ innerSchema,
metadataConverters,
producedTypeInfo,
ignoreErrors);
@@ -190,4 +209,25 @@ public Object read(InLongMsgHead head) {
this.converter = converter;
}
}
+
+
+ private void makeCsvInnerFormatIgnoreTrailingUnmappable(DeserializationSchema innerSchema) {
+ try {
+ Field readerField = CsvRowDataDeserializationSchema.class.getDeclaredField("objectReader");
+ readerField.setAccessible(true);
+ ObjectReader oldReader = (ObjectReader) readerField.get(innerSchema);
+
+ Field schemaField = ObjectReader.class.getDeclaredField("_schema");
+ schemaField.setAccessible(true);
+ CsvSchema oldSchema = (CsvSchema) schemaField.get(oldReader);
+
+ ObjectReader newReader = new CsvMapper()
+ .enable(CsvParser.Feature.IGNORE_TRAILING_UNMAPPABLE)
+ .readerFor(JsonNode.class)
+ .with(oldSchema);
+ readerField.set(innerSchema, newReader);
+ } catch (Throwable t) {
+ log.error("failed to make csv inner format to ignore trailing unmappable, ex is ", t);
+ }
+ }
}
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java
index c9b368a3645..4703d29803f 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java
@@ -35,6 +35,7 @@
import java.util.Set;
import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_PARSE_ERRORS;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_TRAILING_UNMAPPABLE;
import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.INNER_FORMAT;
import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.validateDecodingFormatOptions;
@@ -65,8 +66,8 @@ public DecodingFormat> createDecodingFormat(Conte
DecodingFormat> innerFormat =
innerFactory.createDecodingFormat(context, new DelegatingConfiguration(allOptions, innerFormatPrefix));
boolean ignoreErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
-
- return new InLongMsgDecodingFormat(innerFormat, innerFormatMetaPrefix, ignoreErrors);
+ boolean ignoreTrailingUnmappable = formatOptions.get(IGNORE_TRAILING_UNMAPPABLE);
+ return new InLongMsgDecodingFormat(innerFormat, innerFormatMetaPrefix, ignoreErrors, ignoreTrailingUnmappable);
}
@Override
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java
index 92a2ebc0b78..7e1d33b3fa0 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java
@@ -41,6 +41,13 @@ private InLongMsgOptions() {
.withDescription("Optional flag to skip fields and rows with parse errors instead of failing;\n"
+ "fields are set to null in case of errors");
+ public static final ConfigOption IGNORE_TRAILING_UNMAPPABLE =
+ ConfigOptions.key("ignore-trailing-unmappable")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Allows the case that real size exceeds the expected size.\n "
+ + "The extra column will be skipped");
+
public static void validateDecodingFormatOptions(ReadableConfig config) {
String innerFormat = config.get(INNER_FORMAT);
if (innerFormat == null) {