diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/utils/SinkMetadataUtils.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/utils/SinkMetadataUtils.java index cd303c02349..dfadc073348 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/utils/SinkMetadataUtils.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/utils/SinkMetadataUtils.java @@ -32,6 +32,7 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; import java.util.stream.Stream; /** @@ -49,13 +50,14 @@ public class SinkMetadataUtils implements Serializable { public SinkMetadataUtils(List metadataKeys, DataType consumedDataType) { Set metadataKeySet = ImmutableSet.copyOf(metadataKeys); List metaFields = ((RowType) consumedDataType.getLogicalType()).getFields(); - + List names = metaFields.stream().map(RowType.RowField::getName).collect(Collectors.toList()); + log.info("start to config SinkMetadataUtils, metaKeys={}, consume fields={}", metadataKeys, names); // get related converters by real keys // the pos of physical column will be replaced by IcebergWritableMetadata.NULL this.converters = metaFields.stream() .map(RowType.RowField::getName) .map(key -> Stream.of(IcebergWritableMetadata.values()) - .filter(m -> m.getKey().equals(key)) + .filter(m -> m.getKey().equalsIgnoreCase(key)) .findFirst() .orElse(IcebergWritableMetadata.NULL)) .map(IcebergWritableMetadata::getConverter) @@ -65,8 +67,8 @@ public SinkMetadataUtils(List metadataKeys, DataType consumedDataType) { ImmutableBiMap.Builder builder = ImmutableBiMap.builder(); for (int i = 0; i < metaFields.size(); i++) { String name = metaFields.get(i).getName(); - if (metadataKeySet.contains(name)) { - builder.put(name, i); + if (metadataKeySet.contains(name.toLowerCase())) { + builder.put(name.toLowerCase(), i); } } this.field2posMap = builder.build(); @@ -74,6 +76,7 @@ public SinkMetadataUtils(List metadataKeys, DataType consumedDataType) { // for audit time DATA_TIME_INDEX = field2posMap.getOrDefault(Constants.META_AUDIT_DATA_TIME, -1); + log.info("find data time index={}, filed2posMap={}", DATA_TIME_INDEX, field2posMap); } public Integer getMetadataPosByName(String name) {