Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-9491][Sort] CSV format support ignore trailing unmappable fields #9492

Merged
merged 5 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<scope>test</scope>
<scope>provided</scope>
</dependency>

</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,17 @@

import org.apache.inlong.sort.formats.inlongmsg.InLongMsgDeserializationSchema.MetadataConverter;

import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.csv.CsvRowDataDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
Expand All @@ -31,6 +40,7 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -40,6 +50,12 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.EMPTY_STRING_AS_NULL;
import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_PARSE_ERRORS;
import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_TRAILING_UNMAPPABLE;
import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.INSERT_NULLS_FOR_MISSING_COLUMNS;

@Slf4j
public class InLongMsgDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {

private final String innerFormatMetaPrefix;
Expand All @@ -50,14 +66,23 @@ public class InLongMsgDecodingFormat implements DecodingFormat<DeserializationSc

private final boolean ignoreErrors;

private final boolean ignoreTrailingUnmappable;

private final boolean insertNullsForMissingColumns;

private final boolean emptyStringAsNull;

public InLongMsgDecodingFormat(
DecodingFormat<DeserializationSchema<RowData>> innerDecodingFormat,
String innerFormatMetaPrefix,
boolean ignoreErrors) {
ReadableConfig formatOptions) {
this.innerDecodingFormat = innerDecodingFormat;
this.innerFormatMetaPrefix = innerFormatMetaPrefix;
this.metadataKeys = Collections.emptyList();
this.ignoreErrors = ignoreErrors;
this.ignoreErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
this.ignoreTrailingUnmappable = formatOptions.get(IGNORE_TRAILING_UNMAPPABLE);
this.insertNullsForMissingColumns = formatOptions.get(INSERT_NULLS_FOR_MISSING_COLUMNS);
this.emptyStringAsNull = formatOptions.get(EMPTY_STRING_AS_NULL);
}

@Override
Expand All @@ -83,8 +108,15 @@ public DeserializationSchema<RowData> createRuntimeDecoder(Context context, Data
final TypeInformation<RowData> producedTypeInfo =
context.createTypeInformation(producedDataType);

DeserializationSchema<RowData> innerSchema =
innerDecodingFormat.createRuntimeDecoder(context, physicalDataType);
if (innerSchema instanceof CsvRowDataDeserializationSchema) {
configCsvInnerFormat(innerSchema, ignoreTrailingUnmappable,
insertNullsForMissingColumns, emptyStringAsNull);
}

return new InLongMsgDeserializationSchema(
innerDecodingFormat.createRuntimeDecoder(context, physicalDataType),
innerSchema,
metadataConverters,
producedTypeInfo,
ignoreErrors);
Expand Down Expand Up @@ -190,4 +222,30 @@ public Object read(InLongMsgHead head) {
this.converter = converter;
}
}

/**
 * Replaces the Jackson {@link ObjectReader} held by a CSV inner deserialization schema with one
 * that has the requested {@link CsvParser.Feature} switches applied.
 *
 * <p>The reader and its CSV schema are private to the Flink/Jackson classes, so both are read
 * (and the reader written back) through reflection. The original {@link CsvSchema} is reused so
 * that column definitions and delimiters stay exactly as the inner format configured them.
 *
 * <p>This is a best-effort adjustment: if the reflective access fails, the failure is logged and
 * the inner schema keeps its original reader.
 *
 * @param innerSchema the inner schema; callers pass a {@link CsvRowDataDeserializationSchema}
 * @param ignoreTrailingUnmappable value for {@code CsvParser.Feature.IGNORE_TRAILING_UNMAPPABLE}
 * @param insertNullsForMissingColumns value for
 *        {@code CsvParser.Feature.INSERT_NULLS_FOR_MISSING_COLUMNS}
 * @param emptyStringAsNull value for {@code CsvParser.Feature.EMPTY_STRING_AS_NULL}
 */
@VisibleForTesting
static void configCsvInnerFormat(
        DeserializationSchema<RowData> innerSchema,
        boolean ignoreTrailingUnmappable,
        boolean insertNullsForMissingColumns,
        boolean emptyStringAsNull) {
    // Nothing was requested: leave the inner format's own reader untouched instead of
    // reflectively rewriting private state (and possibly logging a spurious failure).
    if (!ignoreTrailingUnmappable && !insertNullsForMissingColumns && !emptyStringAsNull) {
        return;
    }
    try {
        Field readerField = CsvRowDataDeserializationSchema.class.getDeclaredField("objectReader");
        readerField.setAccessible(true);
        ObjectReader oldReader = (ObjectReader) readerField.get(innerSchema);

        // ObjectReader keeps its schema in the non-public "_schema" field.
        Field schemaField = ObjectReader.class.getDeclaredField("_schema");
        schemaField.setAccessible(true);
        CsvSchema oldSchema = (CsvSchema) schemaField.get(oldReader);

        ObjectReader newReader = new CsvMapper()
                .configure(CsvParser.Feature.IGNORE_TRAILING_UNMAPPABLE, ignoreTrailingUnmappable)
                .configure(CsvParser.Feature.INSERT_NULLS_FOR_MISSING_COLUMNS, insertNullsForMissingColumns)
                .configure(CsvParser.Feature.EMPTY_STRING_AS_NULL, emptyStringAsNull)
                .readerFor(JsonNode.class)
                .with(oldSchema);
        readerField.set(innerSchema, newReader);
    } catch (ReflectiveOperationException | RuntimeException e) {
        // Deliberately not rethrown: the job can still run with the unmodified reader.
        // JVM Errors are no longer swallowed here.
        log.error("failed to configure csv inner format (ignoreTrailingUnmappable={}, "
                + "insertNullsForMissingColumns={}, emptyStringAsNull={}), ex is ",
                ignoreTrailingUnmappable, insertNullsForMissingColumns, emptyStringAsNull, e);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@
import java.util.HashSet;
import java.util.Set;

import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.EMPTY_STRING_AS_NULL;
import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_PARSE_ERRORS;
import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_TRAILING_UNMAPPABLE;
import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.INNER_FORMAT;
import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.INSERT_NULLS_FOR_MISSING_COLUMNS;
import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.validateDecodingFormatOptions;

/**
Expand Down Expand Up @@ -64,9 +67,7 @@ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(Conte
String innerFormatPrefix = INLONG_PREFIX + innerFormatMetaPrefix;
DecodingFormat<DeserializationSchema<RowData>> innerFormat =
innerFactory.createDecodingFormat(context, new DelegatingConfiguration(allOptions, innerFormatPrefix));
boolean ignoreErrors = formatOptions.get(IGNORE_PARSE_ERRORS);

return new InLongMsgDecodingFormat(innerFormat, innerFormatMetaPrefix, ignoreErrors);
return new InLongMsgDecodingFormat(innerFormat, innerFormatMetaPrefix, formatOptions);
}

@Override
Expand All @@ -91,6 +92,9 @@ public Set<ConfigOption<?>> requiredOptions() {
public Set<ConfigOption<?>> optionalOptions() {
    // Optional switches of this format: parse-error tolerance plus the three CSV parser
    // toggles. The set is freshly allocated on every call and is mutable.
    final Set<ConfigOption<?>> optional = new HashSet<>();
    java.util.Collections.addAll(
            optional,
            IGNORE_PARSE_ERRORS,
            IGNORE_TRAILING_UNMAPPABLE,
            INSERT_NULLS_FOR_MISSING_COLUMNS,
            EMPTY_STRING_AS_NULL);
    return optional;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,25 @@ private InLongMsgOptions() {
.withDescription("Optional flag to skip fields and rows with parse errors instead of failing;\n"
+ "fields are set to null in case of errors");

/**
 * Boolean option {@code csv.ignore-trailing-unmappable}, disabled by default.
 *
 * <p>When enabled, a record that carries more columns than expected is accepted and the extra
 * trailing column is skipped.
 */
public static final ConfigOption<Boolean> IGNORE_TRAILING_UNMAPPABLE =
ConfigOptions.key("csv.ignore-trailing-unmappable")
.booleanType()
.defaultValue(false)
.withDescription("Allows the case that real size exceeds the expected size.\n "
+ "The extra column will be skipped");

/**
 * Boolean option {@code csv.insert-nulls-for-missing-columns}, disabled by default.
 *
 * <p>When enabled, columns that are missing from a record are filled with null.
 */
public static final ConfigOption<Boolean> INSERT_NULLS_FOR_MISSING_COLUMNS =
ConfigOptions.key("csv.insert-nulls-for-missing-columns")
.booleanType()
.defaultValue(false)
.withDescription("For missing columns, insert null.");

/**
 * Boolean option {@code csv.empty-string-as-null}, disabled by default.
 *
 * <p>When enabled, an empty string value is turned into null.
 */
public static final ConfigOption<Boolean> EMPTY_STRING_AS_NULL =
ConfigOptions.key("csv.empty-string-as-null")
.booleanType()
.defaultValue(false)
.withDescription("if the string value is empty, make it as null");

public static void validateDecodingFormatOptions(ReadableConfig config) {
String innerFormat = config.get(INNER_FORMAT);
if (innerFormat == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.inlong.common.msg.InLongMsg;

import com.google.common.collect.ImmutableList;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
Expand All @@ -34,6 +35,7 @@
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.ArrayList;
Expand Down Expand Up @@ -90,6 +92,97 @@ public void testDeserializeInLongMsg() throws Exception {
assertEquals(exceptedOutput, deData);
}

/**
 * With {@code csv.ignore-trailing-unmappable} switched on, a four-field CSV body decoded against
 * a three-column schema yields a single row built from the first three fields.
 */
@Test
public void testIgnoreTrailing() throws IOException {
    // table definition: three physical columns, csv inner format, trailing fields ignored
    ResolvedSchema schema = ResolvedSchema.of(
            Column.physical("id", DataTypes.BIGINT()),
            Column.physical("f1", DataTypes.STRING()),
            Column.physical("f2", DataTypes.STRING()));
    final Map<String, String> tableOptions =
            InLongMsgFormatFactoryTest.getModifiedOptions(opts -> {
                opts.put("inlong-msg.inner.format", "csv");
                opts.put("inlong-msg.csv.ignore-trailing-unmappable", "true");
            });
    DeserializationSchema<RowData> decoder =
            InLongMsgFormatFactoryTest.createDeserializationSchema(tableOptions, schema);

    // one message whose body carries an extra fourth field
    InLongMsg message = InLongMsg.newInLongMsg();
    message.addMsg("streamId=HAHA&t=202201011112",
            "1,asdqw,heihei,2".getBytes(StandardCharsets.UTF_8));
    byte[] payload = message.buildArray();

    List<RowData> actualRows = new ArrayList<>();
    decoder.deserialize(payload, new ListCollector<>(actualRows));

    RowData expectedRow = GenericRowData.of(
            1L,
            BinaryStringData.fromString("asdqw"),
            BinaryStringData.fromString("heihei"));
    List<RowData> expectedRows = ImmutableList.of(expectedRow);
    assertEquals(expectedRows, actualRows);
}

/**
 * With {@code csv.empty-string-as-null} switched on, the empty third field of the CSV body is
 * expected to come back as null in the decoded row.
 */
@Test
public void testEmptyFieldValueAsNull() throws IOException {
    // table definition: four physical columns, csv inner format, empty strings read as null
    ResolvedSchema schema = ResolvedSchema.of(
            Column.physical("id", DataTypes.BIGINT()),
            Column.physical("f1", DataTypes.STRING()),
            Column.physical("f2", DataTypes.STRING()),
            Column.physical("f3", DataTypes.BIGINT()));
    final Map<String, String> tableOptions =
            InLongMsgFormatFactoryTest.getModifiedOptions(opts -> {
                opts.put("inlong-msg.inner.format", "csv");
                opts.put("inlong-msg.csv.empty-string-as-null", "true");
            });
    DeserializationSchema<RowData> decoder =
            InLongMsgFormatFactoryTest.createDeserializationSchema(tableOptions, schema);

    // one message whose third field is empty
    InLongMsg message = InLongMsg.newInLongMsg();
    message.addMsg("streamId=HAHA&t=202201011112",
            "1,asdqw,,2".getBytes(StandardCharsets.UTF_8));
    byte[] payload = message.buildArray();

    List<RowData> actualRows = new ArrayList<>();
    decoder.deserialize(payload, new ListCollector<>(actualRows));

    List<RowData> expectedRows = ImmutableList.of(
            GenericRowData.of(1L, BinaryStringData.fromString("asdqw"), null, 2L));
    assertEquals(expectedRows, actualRows);
}

/**
 * With {@code csv.insert-nulls-for-missing-columns} switched on, a four-field CSV body decoded
 * against a five-column schema is expected to produce a row whose last column is null.
 */
@Test
public void testInserNullForMissingColumn() throws IOException {
    // table definition: five physical columns, csv inner format, missing columns become null
    ResolvedSchema schema = ResolvedSchema.of(
            Column.physical("id", DataTypes.BIGINT()),
            Column.physical("f1", DataTypes.STRING()),
            Column.physical("f2", DataTypes.STRING()),
            Column.physical("f3", DataTypes.BIGINT()),
            Column.physical("f4", DataTypes.BIGINT()));
    final Map<String, String> tableOptions =
            InLongMsgFormatFactoryTest.getModifiedOptions(opts -> {
                opts.put("inlong-msg.inner.format", "csv");
                opts.put("inlong-msg.csv.insert-nulls-for-missing-columns", "true");
            });
    DeserializationSchema<RowData> decoder =
            InLongMsgFormatFactoryTest.createDeserializationSchema(tableOptions, schema);

    // one message that carries only four of the five columns
    InLongMsg message = InLongMsg.newInLongMsg();
    message.addMsg("streamId=HAHA&t=202201011112",
            "1,asdqw,123,2".getBytes(StandardCharsets.UTF_8));
    byte[] payload = message.buildArray();

    List<RowData> actualRows = new ArrayList<>();
    decoder.deserialize(payload, new ListCollector<>(actualRows));

    RowData expectedRow = GenericRowData.of(
            1L,
            BinaryStringData.fromString("asdqw"),
            BinaryStringData.fromString("123"),
            2L,
            null);
    List<RowData> expectedRows = ImmutableList.of(expectedRow);
    assertEquals(expectedRows, actualRows);
}

@Test
public void testDeserializeInLongMsgWithError() throws Exception {
// mock data
Expand Down