Skip to content

Commit

Permalink
[INLONG-9231][Sort] Find no audit time field when the field is in upp…
Browse files Browse the repository at this point in the history
…er case (apache#9232)
  • Loading branch information
vernedeng authored Nov 8, 2023
1 parent 33df4f5 commit d116f1a
Showing 1 changed file with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
Expand All @@ -49,13 +50,14 @@ public class SinkMetadataUtils implements Serializable {
public SinkMetadataUtils(List<String> metadataKeys, DataType consumedDataType) {
Set<String> metadataKeySet = ImmutableSet.copyOf(metadataKeys);
List<RowType.RowField> metaFields = ((RowType) consumedDataType.getLogicalType()).getFields();

List<String> names = metaFields.stream().map(RowType.RowField::getName).collect(Collectors.toList());
log.info("start to config SinkMetadataUtils, metaKeys={}, consume fields={}", metadataKeys, names);
// get related converters by real keys
// the pos of physical column will be replaced by IcebergWritableMetadata.NULL
this.converters = metaFields.stream()
.map(RowType.RowField::getName)
.map(key -> Stream.of(IcebergWritableMetadata.values())
.filter(m -> m.getKey().equals(key))
.filter(m -> m.getKey().equalsIgnoreCase(key))
.findFirst()
.orElse(IcebergWritableMetadata.NULL))
.map(IcebergWritableMetadata::getConverter)
Expand All @@ -65,15 +67,16 @@ public SinkMetadataUtils(List<String> metadataKeys, DataType consumedDataType) {
ImmutableBiMap.Builder<String, Integer> builder = ImmutableBiMap.builder();
for (int i = 0; i < metaFields.size(); i++) {
String name = metaFields.get(i).getName();
if (metadataKeySet.contains(name)) {
builder.put(name, i);
if (metadataKeySet.contains(name.toLowerCase())) {
builder.put(name.toLowerCase(), i);
}
}
this.field2posMap = builder.build();
this.pos2Field = field2posMap.inverse();

// for audit time
DATA_TIME_INDEX = field2posMap.getOrDefault(Constants.META_AUDIT_DATA_TIME, -1);
log.info("find data time index={}, filed2posMap={}", DATA_TIME_INDEX, field2posMap);
}

public Integer getMetadataPosByName(String name) {
Expand Down

0 comments on commit d116f1a

Please sign in to comment.