Skip to content

Commit

Permalink
[INLONG-9491]][Sort] Csv format support ignore trailing unmappable fi…
Browse files Browse the repository at this point in the history
…elds
  • Loading branch information
vernedeng committed Dec 15, 2023
1 parent 47c4fa0 commit c9cc04f
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 5 deletions.
2 changes: 1 addition & 1 deletion inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<scope>test</scope>
<scope>provided</scope>
</dependency>

</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@

package org.apache.inlong.sort.formats.inlongmsg;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.formats.csv.CsvRowDataDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgDeserializationSchema.MetadataConverter;

import org.apache.flink.api.common.serialization.DeserializationSchema;
Expand All @@ -31,6 +38,7 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -40,6 +48,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Slf4j
public class InLongMsgDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {

private final String innerFormatMetaPrefix;
Expand All @@ -50,14 +59,18 @@ public class InLongMsgDecodingFormat implements DecodingFormat<DeserializationSc

private final boolean ignoreErrors;

private final boolean ignoreTrailingUnmappable;

public InLongMsgDecodingFormat(
DecodingFormat<DeserializationSchema<RowData>> innerDecodingFormat,
String innerFormatMetaPrefix,
boolean ignoreErrors) {
boolean ignoreErrors,
boolean ignoreTrailingUnmappable) {
this.innerDecodingFormat = innerDecodingFormat;
this.innerFormatMetaPrefix = innerFormatMetaPrefix;
this.metadataKeys = Collections.emptyList();
this.ignoreErrors = ignoreErrors;
this.ignoreTrailingUnmappable = ignoreTrailingUnmappable;
}

@Override
Expand All @@ -83,8 +96,14 @@ public DeserializationSchema<RowData> createRuntimeDecoder(Context context, Data
final TypeInformation<RowData> producedTypeInfo =
context.createTypeInformation(producedDataType);

DeserializationSchema<RowData> innerSchema =
innerDecodingFormat.createRuntimeDecoder(context, physicalDataType);
if (innerSchema instanceof CsvRowDataDeserializationSchema && ignoreTrailingUnmappable) {
this.makeCsvInnerFormatIgnoreTrailingUnmappable(innerSchema);
}

return new InLongMsgDeserializationSchema(
innerDecodingFormat.createRuntimeDecoder(context, physicalDataType),
innerSchema,
metadataConverters,
producedTypeInfo,
ignoreErrors);
Expand Down Expand Up @@ -190,4 +209,25 @@ public Object read(InLongMsgHead head) {
this.converter = converter;
}
}


private void makeCsvInnerFormatIgnoreTrailingUnmappable(DeserializationSchema<RowData> innerSchema) {
try {
Field readerField = CsvRowDataDeserializationSchema.class.getDeclaredField("objectReader");
readerField.setAccessible(true);
ObjectReader oldReader = (ObjectReader) readerField.get(innerSchema);

Field schemaField = ObjectReader.class.getDeclaredField("_schema");
schemaField.setAccessible(true);
CsvSchema oldSchema = (CsvSchema) schemaField.get(oldReader);

ObjectReader newReader = new CsvMapper()
.enable(CsvParser.Feature.IGNORE_TRAILING_UNMAPPABLE)
.readerFor(JsonNode.class)
.with(oldSchema);
readerField.set(innerSchema, newReader);
} catch (Throwable t) {
log.error("failed to make csv inner format to ignore trailing unmappable, ex is ", t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Set;

import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_PARSE_ERRORS;
import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_TRAILING_UNMAPPABLE;
import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.INNER_FORMAT;
import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.validateDecodingFormatOptions;

Expand Down Expand Up @@ -65,8 +66,8 @@ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(Conte
DecodingFormat<DeserializationSchema<RowData>> innerFormat =
innerFactory.createDecodingFormat(context, new DelegatingConfiguration(allOptions, innerFormatPrefix));
boolean ignoreErrors = formatOptions.get(IGNORE_PARSE_ERRORS);

return new InLongMsgDecodingFormat(innerFormat, innerFormatMetaPrefix, ignoreErrors);
boolean ignoreTrailingUnmappable = formatOptions.get(IGNORE_TRAILING_UNMAPPABLE);
return new InLongMsgDecodingFormat(innerFormat, innerFormatMetaPrefix, ignoreErrors, ignoreTrailingUnmappable);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ private InLongMsgOptions() {
.withDescription("Optional flag to skip fields and rows with parse errors instead of failing;\n"
+ "fields are set to null in case of errors");

public static final ConfigOption<Boolean> IGNORE_TRAILING_UNMAPPABLE =
ConfigOptions.key("ignore-trailing-unmappable")
.booleanType()
.defaultValue(true)
.withDescription("Allows the case that real size exceeds the expected size.\n "
+ "The extra column will be skipped");

public static void validateDecodingFormatOptions(ReadableConfig config) {
String innerFormat = config.get(INNER_FORMAT);
if (innerFormat == null) {
Expand Down

0 comments on commit c9cc04f

Please sign in to comment.