From e6e677ebe3d8b5854e0e3af9902c736c83d1576b Mon Sep 17 00:00:00 2001 From: kunni Date: Thu, 24 Nov 2022 14:39:05 +0800 Subject: [PATCH 1/3] Core,Flink: add partial read and write implementation. --- .../java/org/apache/iceberg/ContentFile.java | 2 + .../java/org/apache/iceberg/DataFile.java | 17 +- .../java/org/apache/iceberg/FileContent.java | 3 +- build.gradle | 9 - .../java/org/apache/iceberg/BaseFile.java | 19 ++ .../java/org/apache/iceberg/FileMetadata.java | 11 + .../org/apache/iceberg/GenericDataFile.java | 1 + .../org/apache/iceberg/GenericDeleteFile.java | 2 + .../org/apache/iceberg/SnapshotSummary.java | 8 + .../java/org/apache/iceberg/V2Metadata.java | 10 +- .../java/org/apache/iceberg/avro/Avro.java | 52 +++++ .../org/apache/iceberg/deletes/Deletes.java | 16 ++ .../iceberg/deletes/PartialDeleteWriter.java | 105 +++++++++ .../org/apache/iceberg/io/BaseTaskWriter.java | 41 ++++ .../iceberg/io/FileAppenderFactory.java | 14 ++ .../apache/iceberg/TestManifestReader.java | 4 +- .../iceberg/TestManifestWriterVersions.java | 2 + .../org/apache/iceberg/data/DeleteFilter.java | 114 +++++++++- .../iceberg/data/GenericAppenderFactory.java | 66 ++++++ .../iceberg/data/GenericDeleteFilter.java | 18 ++ .../org/apache/iceberg/data/FileHelpers.java | 40 ++++ .../apache/iceberg/data/PartialReadTests.java | 193 +++++++++++++++++ .../apache/iceberg/flink/FlinkWriteConf.java | 8 + .../iceberg/flink/FlinkWriteOptions.java | 3 + .../flink/sink/BaseDeltaTaskWriter.java | 40 +++- .../flink/sink/FlinkAppenderFactory.java | 94 +++++++- .../apache/iceberg/flink/sink/FlinkSink.java | 35 ++- .../flink/sink/PartitionedDeltaWriter.java | 2 + .../flink/sink/RowDataTaskWriterFactory.java | 25 ++- .../flink/sink/UnpartitionedDeltaWriter.java | 2 + .../source/RowDataFileScanTaskReader.java | 33 +++ .../iceberg/flink/source/RowDataRewriter.java | 1 + .../apache/iceberg/flink/TestFlinkUpsert.java | 39 ++-- .../flink/sink/TestCompressionSettings.java | 2 +- .../flink/sink/TestDeltaTaskWriter.java | 1 + .../flink/sink/TestFlinkAppenderFactory.java | 6 +- .../iceberg/flink/sink/TestFlinkManifest.java | 4 + .../flink/sink/TestIcebergFilesCommitter.java | 4 + .../flink/sink/TestIcebergStreamWriter.java | 2 +- .../iceberg/flink/sink/TestTaskWriters.java | 1 + .../flink/source/TestProjectMetaColumn.java | 1 + .../source/RowDataFileScanTaskReader.java | 6 + .../source/RowDataFileScanTaskReader.java | 6 + gradle.properties | 2 +- .../mr/TestInputFormatReaderDeletes2.java | 112 ++++++++++ .../main/java/org/apache/iceberg/orc/ORC.java | 58 +++++ .../org/apache/iceberg/parquet/Parquet.java | 57 +++++ .../iceberg/spark/source/RowDataReader.java | 6 + .../iceberg/spark/source/RowDataReader.java | 203 ++++++++++++++++++ .../iceberg/spark/source/BatchDataReader.java | 6 + .../iceberg/spark/source/RowDataReader.java | 6 + .../iceberg/spark/source/BaseReader.java | 6 + .../iceberg/spark/source/BaseReader.java | 6 + 53 files changed, 1462 insertions(+), 62 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/deletes/PartialDeleteWriter.java create mode 100644 data/src/test/java/org/apache/iceberg/data/PartialReadTests.java create mode 100644 mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes2.java create mode 100644 spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java diff --git a/api/src/main/java/org/apache/iceberg/ContentFile.java b/api/src/main/java/org/apache/iceberg/ContentFile.java index d214ee6eb5ba..56055178846c 100644 --- 
a/api/src/main/java/org/apache/iceberg/ContentFile.java +++ b/api/src/main/java/org/apache/iceberg/ContentFile.java @@ -104,6 +104,8 @@ public interface ContentFile { */ List equalityFieldIds(); + List partialFieldIds(); + /** * Returns the sort order id of this file, which describes how the file is ordered. This * information will be useful for merging data and equality delete files more efficiently when diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java index 59b329c500c7..4a4aade91379 100644 --- a/api/src/main/java/org/apache/iceberg/DataFile.java +++ b/api/src/main/java/org/apache/iceberg/DataFile.java @@ -102,7 +102,14 @@ public interface DataFile extends ContentFile { int PARTITION_ID = 102; String PARTITION_NAME = "partition"; String PARTITION_DOC = "Partition data tuple, schema based on the partition spec"; - // NEXT ID TO ASSIGN: 142 + + Types.NestedField PARTIAL_IDS = + optional( + 142, + "partial_ids", + ListType.ofRequired(143, IntegerType.get()), + "partial comparison field IDs"); + // NEXT ID TO ASSIGN: 144 static StructType getType(StructType partitionType) { // IDs start at 100 to leave room for changes to ManifestEntry @@ -123,7 +130,8 @@ static StructType getType(StructType partitionType) { KEY_METADATA, SPLIT_OFFSETS, EQUALITY_IDS, - SORT_ORDER_ID); + SORT_ORDER_ID, + PARTIAL_IDS); } /** @return the content stored in the file; one of DATA, POSITION_DELETES, or EQUALITY_DELETES */ @@ -136,4 +144,9 @@ default FileContent content() { default List equalityFieldIds() { return null; } + + @Override + default List partialFieldIds() { + return null; + } } diff --git a/api/src/main/java/org/apache/iceberg/FileContent.java b/api/src/main/java/org/apache/iceberg/FileContent.java index 2c9a2fa51bd2..6cb1e2a5182e 100644 --- a/api/src/main/java/org/apache/iceberg/FileContent.java +++ b/api/src/main/java/org/apache/iceberg/FileContent.java @@ -22,7 +22,8 @@ public enum FileContent { DATA(0), POSITION_DELETES(1), - EQUALITY_DELETES(2); + EQUALITY_DELETES(2), + PARTIAL_UPDATE(3); private final int id; diff --git a/build.gradle b/build.gradle index 12ed7017ecbf..75dec665ba68 100644 --- a/build.gradle +++ b/build.gradle @@ -106,15 +106,6 @@ subprojects { apply plugin: 'nebula.dependency-recommender' apply plugin: 'java-library' - if (project.name in REVAPI_PROJECTS) { - apply plugin: 'com.palantir.revapi' - revapi { - oldGroup = project.group - oldName = project.name - oldVersion = "1.0.0" - } - } - configurations { testImplementation.extendsFrom compileOnly diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java index e1a10124138b..387d455ddaaf 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFile.java +++ b/core/src/main/java/org/apache/iceberg/BaseFile.java @@ -75,6 +75,7 @@ public PartitionData copy() { private int[] equalityIds = null; private byte[] keyMetadata = null; private Integer sortOrderId; + private int[] partialIds = null; // cached schema private transient Schema avroSchema = null; @@ -132,6 +133,7 @@ public PartitionData copy() { Map upperBounds, List splitOffsets, int[] equalityFieldIds, + int[] partialIds, Integer sortOrderId, ByteBuffer keyMetadata) { this.partitionSpecId = specId; @@ -159,6 +161,7 @@ public PartitionData copy() { this.upperBounds = SerializableByteBufferMap.wrap(upperBounds); this.splitOffsets = ArrayUtil.toLongArray(splitOffsets); this.equalityIds = equalityFieldIds; + this.partialIds = partialIds; this.sortOrderId = 
sortOrderId; this.keyMetadata = ByteBuffers.toByteArray(keyMetadata); } @@ -208,6 +211,11 @@ public PartitionData copy() { ? Arrays.copyOf(toCopy.equalityIds, toCopy.equalityIds.length) : null; this.sortOrderId = toCopy.sortOrderId; + + this.partialIds = + toCopy.partialIds != null + ? Arrays.copyOf(toCopy.partialIds, toCopy.partialIds.length) + : null; } /** Constructor for Java serialization. */ @@ -294,6 +302,9 @@ public void put(int i, Object value) { this.sortOrderId = (Integer) value; return; case 17: + this.partialIds = ArrayUtil.toIntArray((List) value); + return; + case 18: this.fileOrdinal = (long) value; return; default: @@ -349,6 +360,8 @@ public Object get(int i) { case 16: return sortOrderId; case 17: + return partialFieldIds(); + case 18: return fileOrdinal; default: throw new UnsupportedOperationException("Unknown field ordinal: " + pos); @@ -445,6 +458,11 @@ public List equalityFieldIds() { return ArrayUtil.toIntList(equalityIds); } + @Override + public List partialFieldIds() { + return ArrayUtil.toIntList(partialIds); + } + @Override public Integer sortOrderId() { return sortOrderId; @@ -478,6 +496,7 @@ public String toString() { .add("split_offsets", splitOffsets == null ? "null" : splitOffsets()) .add("equality_ids", equalityIds == null ? "null" : equalityFieldIds()) .add("sort_order_id", sortOrderId) + .add("partial_ids", partialIds == null ? "null" : partialFieldIds()) .toString(); } } diff --git a/core/src/main/java/org/apache/iceberg/FileMetadata.java b/core/src/main/java/org/apache/iceberg/FileMetadata.java index 7d025fa8a630..f8170791f5c0 100644 --- a/core/src/main/java/org/apache/iceberg/FileMetadata.java +++ b/core/src/main/java/org/apache/iceberg/FileMetadata.java @@ -41,6 +41,7 @@ public static class Builder { private final int specId; private FileContent content = null; private int[] equalityFieldIds = null; + private int[] partialFieldIds = null; private PartitionData partitionData; private String filePath = null; private FileFormat format = null; @@ -116,6 +117,13 @@ public Builder ofEqualityDeletes(int... 
fieldIds) { return this; } + public Builder ofPartialDeletes(int[] newEqualityFieldIds, int[] newPartialFieldIds) { + this.content = FileContent.PARTIAL_UPDATE; + this.equalityFieldIds = newEqualityFieldIds; + this.partialFieldIds = newPartialFieldIds; + return this; + } + public Builder withStatus(FileStatus stat) { this.filePath = stat.getPath().toString(); this.fileSizeInBytes = stat.getLen(); @@ -222,6 +230,8 @@ public DeleteFile build() { sortOrderId == null, "Position delete file should not have sort order"); break; case EQUALITY_DELETES: + + case PARTIAL_UPDATE: if (sortOrderId == null) { sortOrderId = SortOrder.unsorted().orderId(); } @@ -246,6 +256,7 @@ public DeleteFile build() { lowerBounds, upperBounds), equalityFieldIds, + partialFieldIds, sortOrderId, keyMetadata); } diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java index 34c65e669fb2..5d39535fc4ae 100644 --- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java @@ -57,6 +57,7 @@ class GenericDataFile extends BaseFile implements DataFile { metrics.upperBounds(), splitOffsets, null, + null, sortOrderId, keyMetadata); } diff --git a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java index 1b4effba642c..ce6b7aa440a9 100644 --- a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java @@ -39,6 +39,7 @@ class GenericDeleteFile extends BaseFile implements DeleteFile { long fileSizeInBytes, Metrics metrics, int[] equalityFieldIds, + int[] partialFieldIds, Integer sortOrderId, ByteBuffer keyMetadata) { super( @@ -57,6 +58,7 @@ class GenericDeleteFile extends BaseFile implements DeleteFile { metrics.upperBounds(), null, equalityFieldIds, + partialFieldIds, sortOrderId, keyMetadata); } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java index 460e67430b2f..6f1e317cb982 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java @@ -221,12 +221,14 @@ private static class UpdateMetrics { private int addedPosDeleteFiles = 0; private int removedPosDeleteFiles = 0; private int addedDeleteFiles = 0; + private int addedPartialFiles = 0; private int removedDeleteFiles = 0; private long addedRecords = 0L; private long deletedRecords = 0L; private long addedPosDeletes = 0L; private long removedPosDeletes = 0L; private long addedEqDeletes = 0L; + private long addedPartialUpdates = 0L; private long removedEqDeletes = 0L; private boolean trustSizeAndDeleteCounts = true; @@ -290,6 +292,12 @@ void addedFile(ContentFile file) { this.addedEqDeleteFiles += 1; this.addedEqDeletes += file.recordCount(); break; + case PARTIAL_UPDATE: + this.addedDeleteFiles += 1; + this.addedPartialFiles += 1; + this.addedPartialUpdates += file.recordCount(); + break; + default: throw new UnsupportedOperationException( "Unsupported file content type: " + file.content()); diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java index 64ab0fe94bd1..2d5fe73ac3c5 100644 --- a/core/src/main/java/org/apache/iceberg/V2Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java @@ -272,7 +272,8 @@ static Types.StructType fileType(Types.StructType 
partitionType) { DataFile.KEY_METADATA, DataFile.SPLIT_OFFSETS, DataFile.EQUALITY_IDS, - DataFile.SORT_ORDER_ID); + DataFile.SORT_ORDER_ID, + DataFile.PARTIAL_IDS); } static class IndexedManifestEntry> @@ -456,6 +457,8 @@ public Object get(int pos) { return wrapped.equalityFieldIds(); case 15: return wrapped.sortOrderId(); + case 16: + return wrapped.partialFieldIds(); } throw new IllegalArgumentException("Unknown field ordinal: " + pos); } @@ -550,6 +553,11 @@ public List equalityFieldIds() { return wrapped.equalityFieldIds(); } + @Override + public List partialFieldIds() { + return wrapped.partialFieldIds(); + } + @Override public Integer sortOrderId() { return wrapped.sortOrderId(); diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 85cc8d902026..ec3c38dcfaff 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -53,6 +53,7 @@ import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PartialDeleteWriter; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptionKeyMetadata; @@ -377,6 +378,7 @@ public static class DeleteWriteBuilder { private StructLike partition; private EncryptionKeyMetadata keyMetadata = null; private int[] equalityFieldIds = null; + private int[] partialFieldIds = null; private SortOrder sortOrder; private DeleteWriteBuilder(OutputFile file) { @@ -461,6 +463,11 @@ public DeleteWriteBuilder equalityFieldIds(int... fieldIds) { return this; } + public DeleteWriteBuilder partialFieldIds(int... 
fieldIds) { + this.partialFieldIds = fieldIds; + return this; + } + public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) { this.sortOrder = newSortOrder; return this; @@ -503,6 +510,51 @@ public EqualityDeleteWriter buildEqualityWriter() throws IOException { equalityFieldIds); } + public PartialDeleteWriter buildPartialWriter() throws IOException { + Preconditions.checkState( + rowSchema != null, "Cannot create partial file without a schema"); + Preconditions.checkState( + equalityFieldIds != null, "Cannot create partial file without delete field ids"); + Preconditions.checkState( + partialFieldIds != null, "Cannot create partial file without partial field ids"); + Preconditions.checkState( + createWriterFunc != null, "Cannot create partial file unless createWriterFunc is set"); + Preconditions.checkArgument( + spec != null, "Spec must not be null when creating partial writer"); + Preconditions.checkArgument( + spec.isUnpartitioned() || partition != null, + "Partition must not be null for partitioned writes"); + + meta("delete-type", "partial"); + meta( + "delete-field-ids", + IntStream.of(equalityFieldIds) + .mapToObj(Objects::toString) + .collect(Collectors.joining(", "))); + + meta( + "partial-field-ids", + IntStream.of(partialFieldIds) + .mapToObj(Objects::toString) + .collect(Collectors.joining(", "))); + + // the appender uses the row schema without extra columns + appenderBuilder.schema(rowSchema); + appenderBuilder.createWriterFunc(createWriterFunc); + appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext); + + return new PartialDeleteWriter<>( + appenderBuilder.build(), + FileFormat.AVRO, + location, + spec, + partition, + keyMetadata, + sortOrder, + equalityFieldIds, + partialFieldIds); + } + public PositionDeleteWriter buildPositionWriter() throws IOException { Preconditions.checkState( equalityFieldIds == null, "Cannot create position delete file using delete field ids"); diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index 7fc118d17a03..929365e4e079 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -38,7 +38,9 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Filter; import org.apache.iceberg.util.SortedMerge; +import org.apache.iceberg.util.StructLikeMap; import org.apache.iceberg.util.StructLikeSet; +import org.apache.iceberg.util.StructProjection; public class Deletes { private static final Schema POSITION_DELETE_SCHEMA = @@ -124,6 +126,20 @@ protected boolean shouldKeep(T item) { return remainingRowsFilter.filter(rows); } + public static StructLikeMap toPartialMap( + CloseableIterable eqDeletes, + Types.StructType eqType, + StructProjection projectRow) { + try (CloseableIterable deletes = eqDeletes) { + StructLikeMap objectStructLikeMap = StructLikeMap.create(eqType); + deletes.forEach(delete -> objectStructLikeMap.put(projectRow.wrap(delete), delete)); + + return objectStructLikeMap; + } catch (IOException e) { + throw new UncheckedIOException("Failed to close equality delete source", e); + } + } + public static StructLikeSet toEqualitySet( CloseableIterable eqDeletes, Types.StructType eqType) { try (CloseableIterable deletes = eqDeletes) { diff --git a/core/src/main/java/org/apache/iceberg/deletes/PartialDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/PartialDeleteWriter.java new file mode 100644 index 
000000000000..fa3e39a8f1b7 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/deletes/PartialDeleteWriter.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.deletes; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileMetadata; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.io.DeleteWriteResult; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class PartialDeleteWriter implements FileWriter { + private final FileAppender appender; + private final FileFormat format; + private final String location; + private final PartitionSpec spec; + private final StructLike partition; + private final ByteBuffer keyMetadata; + private final int[] equalityFieldIds; + private final int[] partialFieldIds; + private final SortOrder sortOrder; + private DeleteFile deleteFile = null; + + public PartialDeleteWriter( + FileAppender appender, + FileFormat format, + String location, + PartitionSpec spec, + StructLike partition, + EncryptionKeyMetadata keyMetadata, + SortOrder sortOrder, + int[] equalityFieldIds, + int[] partialFieldIds) { + this.appender = appender; + this.format = format; + this.location = location; + this.spec = spec; + this.partition = partition; + this.keyMetadata = keyMetadata != null ? 
keyMetadata.buffer() : null; + this.sortOrder = sortOrder; + this.equalityFieldIds = equalityFieldIds; + this.partialFieldIds = partialFieldIds; + } + + @Override + public void write(T row) { + appender.add(row); + } + + @Override + public long length() { + return appender.length(); + } + + @Override + public void close() throws IOException { + if (deleteFile == null) { + appender.close(); + this.deleteFile = + FileMetadata.deleteFileBuilder(spec) + .ofPartialDeletes(equalityFieldIds, partialFieldIds) + .withFormat(format) + .withPath(location) + .withPartition(partition) + .withEncryptionKeyMetadata(keyMetadata) + .withFileSizeInBytes(appender.length()) + .withMetrics(appender.metrics()) + .withSortOrder(sortOrder) + .build(); + } + } + + public DeleteFile toDeleteFile() { + Preconditions.checkState(deleteFile != null, "Cannot create delete file from unclosed writer"); + return deleteFile; + } + + @Override + public DeleteWriteResult result() { + return new DeleteWriteResult(toDeleteFile()); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java index bba2c5355a98..e3c1c2223b88 100644 --- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java @@ -30,6 +30,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PartialDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -111,6 +112,7 @@ protected abstract class BaseEqualityDeltaWriter implements Closeable { private RollingEqDeleteWriter eqDeleteWriter; private SortedPosDeleteWriter posDeleteWriter; private Map insertedRowMap; + private RollingPartialDeleteWriter partialDeleteWriter; protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema deleteSchema) { Preconditions.checkNotNull(schema, "Iceberg table schema cannot be null."); @@ -121,6 +123,7 @@ protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema de this.eqDeleteWriter = new RollingEqDeleteWriter(partition); this.posDeleteWriter = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition); + this.partialDeleteWriter = new RollingPartialDeleteWriter(partition); this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct()); } @@ -146,6 +149,10 @@ public void write(T row) throws IOException { dataWriter.write(row); } + public void update(T row) throws IOException { + partialDeleteWriter.write(row); + } + /** * Write the pos-delete if there's an existing row matching the given key. 
* @@ -208,6 +215,14 @@ public void close() throws IOException { } } + if (partialDeleteWriter != null) { + try { + partialDeleteWriter.close(); + } finally { + partialDeleteWriter = null; + } + } + if (insertedRowMap != null) { insertedRowMap.clear(); insertedRowMap = null; @@ -393,4 +408,30 @@ void complete(EqualityDeleteWriter closedWriter) { completedDeleteFiles.add(closedWriter.toDeleteFile()); } } + + protected class RollingPartialDeleteWriter extends BaseRollingWriter> { + RollingPartialDeleteWriter(StructLike partitionKey) { + super(partitionKey); + } + + @Override + PartialDeleteWriter newWriter(EncryptedOutputFile file, StructLike partitionKey) { + return appenderFactory.newPartialWriter(file, format, partitionKey); + } + + @Override + long length(PartialDeleteWriter writer) { + return writer.length(); + } + + @Override + void write(PartialDeleteWriter writer, T record) { + writer.write(record); + } + + @Override + void complete(PartialDeleteWriter closedWriter) { + completedDeleteFiles.add(closedWriter.toDeleteFile()); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java index 59b0b4b3bf6a..3b51843635d4 100644 --- a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java @@ -21,6 +21,7 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.StructLike; import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PartialDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; @@ -62,6 +63,19 @@ DataWriter newDataWriter( EqualityDeleteWriter newEqDeleteWriter( EncryptedOutputFile outputFile, FileFormat format, StructLike partition); + /** + * Create a new {@link PartialDeleteWriter}. + * + * @param outputFile an OutputFile used to create an output stream. + * @param format a file format + * @param partition a tuple of partition values + * @return a newly created {@link PartialDeleteWriter} for partial updates. + */ + default PartialDeleteWriter newPartialWriter( + EncryptedOutputFile outputFile, FileFormat format, StructLike partition) { + throw new UnsupportedOperationException("Not implemented yet."); + } + /** * Create a new {@link PositionDeleteWriter}. 
* diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java index dfc84200fdb2..c45ec15e6cac 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java @@ -131,7 +131,7 @@ public void testDataFilePositions() throws IOException { for (DataFile file : reader) { Assert.assertEquals("Position should match", (Long) expectedPos, file.pos()); Assert.assertEquals( - "Position from field index should match", expectedPos, ((BaseFile) file).get(17)); + "Position from field index should match", expectedPos, ((BaseFile) file).get(18)); expectedPos += 1; } } @@ -148,7 +148,7 @@ public void testDeleteFilePositions() throws IOException { for (DeleteFile file : reader) { Assert.assertEquals("Position should match", (Long) expectedPos, file.pos()); Assert.assertEquals( - "Position from field index should match", expectedPos, ((BaseFile) file).get(17)); + "Position from field index should match", expectedPos, ((BaseFile) file).get(18)); expectedPos += 1; } } diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java index 8f9cff01967b..f59fa4d67144 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java @@ -81,6 +81,7 @@ public class TestManifestWriterVersions { private static final List EQUALITY_IDS = ImmutableList.of(1); private static final int[] EQUALITY_ID_ARR = new int[] {1}; + private static final int[] PARTIAL_ID_ARR = new int[] {1}; private static final DeleteFile DELETE_FILE = new GenericDeleteFile( @@ -92,6 +93,7 @@ public class TestManifestWriterVersions { 22905L, METRICS, EQUALITY_ID_ARR, + PARTIAL_ID_ARR, SORT_ORDER_ID, null); diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index a7979fd2ed3e..81a5d54f08cb 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -41,6 +41,7 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.base.Function; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; @@ -51,6 +52,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.StructLikeMap; import org.apache.iceberg.util.StructLikeSet; import org.apache.iceberg.util.StructProjection; import org.slf4j.Logger; @@ -66,7 +69,8 @@ public abstract class DeleteFilter { private final String filePath; private final List posDeletes; private final List eqDeletes; - private final Schema requiredSchema; + private final List partialUpdates; + private final Schema requiredAllSchema; private final Accessor posAccessor; private final boolean hasIsDeletedColumn; private final int isDeletedColumnPosition; @@ -74,6 +78,7 @@ public abstract class DeleteFilter { private PositionDeleteIndex deleteRowPositions = null; private List> isInDeleteSets = 
null; + private Set> partialDataSet = null; private Predicate eqDeleteRows = null; protected DeleteFilter( @@ -88,6 +93,7 @@ protected DeleteFilter( ImmutableList.Builder posDeleteBuilder = ImmutableList.builder(); ImmutableList.Builder eqDeleteBuilder = ImmutableList.builder(); + ImmutableList.Builder partialBuilder = ImmutableList.builder(); for (DeleteFile delete : deletes) { switch (delete.content()) { case POSITION_DELETES: @@ -98,6 +104,10 @@ protected DeleteFilter( LOG.debug("Adding equality delete file {} to filter", delete.path()); eqDeleteBuilder.add(delete); break; + case PARTIAL_UPDATE: + LOG.debug("Adding partial file {} to filter", delete.path()); + partialBuilder.add(delete); + break; default: throw new UnsupportedOperationException( "Unknown delete file content: " + delete.content()); @@ -106,11 +116,12 @@ protected DeleteFilter( this.posDeletes = posDeleteBuilder.build(); this.eqDeletes = eqDeleteBuilder.build(); - this.requiredSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes); - this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId()); + this.partialUpdates = partialBuilder.build(); + this.requiredAllSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes); + this.posAccessor = requiredAllSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId()); this.hasIsDeletedColumn = - requiredSchema.findField(MetadataColumns.IS_DELETED.fieldId()) != null; - this.isDeletedColumnPosition = requiredSchema.columns().indexOf(MetadataColumns.IS_DELETED); + requiredAllSchema.findField(MetadataColumns.IS_DELETED.fieldId()) != null; + this.isDeletedColumnPosition = requiredAllSchema.columns().indexOf(MetadataColumns.IS_DELETED); } protected DeleteFilter( @@ -123,7 +134,7 @@ protected int columnIsDeletedPosition() { } public Schema requiredSchema() { - return requiredSchema; + return requiredAllSchema; } public boolean hasPosDeletes() { @@ -144,6 +155,9 @@ Accessor posAccessor() { protected abstract StructLike asStructLike(T record); + protected abstract T combineRecord( + T record, StructLike partialRecord, Schema partialSchema, Schema requiredSchema); + protected abstract InputFile getInputFile(String location); protected long pos(T record) { @@ -151,7 +165,7 @@ protected long pos(T record) { } public CloseableIterable filter(CloseableIterable records) { - return applyEqDeletes(applyPosDeletes(records)); + return applyPartialUpdate(applyEqDeletes(applyPosDeletes(records))); } private List> applyEqDeletes() { @@ -175,11 +189,11 @@ private List> applyEqDeletes() { Set ids = entry.getKey(); Iterable deletes = entry.getValue(); - Schema deleteSchema = TypeUtil.select(requiredSchema, ids); + Schema deleteSchema = TypeUtil.select(requiredAllSchema, ids); InternalRecordWrapper wrapper = new InternalRecordWrapper(deleteSchema.asStruct()); // a projection to select and reorder fields of the file schema to match the delete rows - StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema); + StructProjection projectRow = StructProjection.create(requiredAllSchema, deleteSchema); Iterable> deleteRecords = Iterables.transform(deletes, delete -> openDeletes(delete, deleteSchema)); @@ -200,6 +214,71 @@ record -> deleteSet.contains(projectRow.wrap(asStructLike(record))); return isInDeleteSets; } + private Set> applyPartialMap() { + if (partialDataSet != null) { + return partialDataSet; + } + + partialDataSet = Sets.newHashSet(); + if (partialUpdates.isEmpty()) { + return partialDataSet; + } + + 
Multimap, Set>, DeleteFile> filesByPartialIds = + Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList); + for (DeleteFile delete : partialUpdates) { + Set eqIds = Sets.newHashSet(delete.equalityFieldIds()); + Set partialIds = Sets.newHashSet(delete.partialFieldIds()); + filesByPartialIds.put(Pair.of(eqIds, partialIds), delete); + } + + for (Map.Entry, Set>, Collection> entry : + filesByPartialIds.asMap().entrySet()) { + Pair, Set> pairIds = entry.getKey(); + Iterable deletes = entry.getValue(); + Set eqIds = pairIds.first(); + Set partialIds = pairIds.second(); + Set partialUpdateIds = Sets.newHashSet(eqIds); + partialUpdateIds.addAll(partialIds); + + Schema partialUpdateSchema = TypeUtil.select(requiredAllSchema, partialUpdateIds); + Schema eqSchema = TypeUtil.select(requiredAllSchema, eqIds); + Schema partialSchema = TypeUtil.select(requiredAllSchema, partialIds); + + InternalRecordWrapper wrapper = new InternalRecordWrapper(partialUpdateSchema.asStruct()); + StructProjection projectRow = StructProjection.create(partialUpdateSchema, eqSchema); + + Iterable> deleteRecords = + Iterables.transform(deletes, delete -> openDeletes(delete, partialUpdateSchema)); + + CloseableIterable records = + CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy); + + StructLikeMap deleteSet = + Deletes.toPartialMap( + CloseableIterable.transform(records, wrapper::copyFor), + eqSchema.asStruct(), + projectRow); + + StructProjection eqProjectFromRequired = StructProjection.create(requiredAllSchema, eqSchema); + StructProjection partialProject = StructProjection.create(partialUpdateSchema, partialSchema); + + partialDataSet.add( + record -> { + StructProjection wrap = eqProjectFromRequired.wrap(asStructLike(record)); + StructLike structLike = deleteSet.get(wrap); + + if (structLike != null) { + StructLike partialRecord = partialProject.wrap(structLike); + return combineRecord(record, partialRecord, partialSchema, requiredAllSchema); + } + return null; + }); + } + + return partialDataSet; + } + public CloseableIterable findEqualityDeleteRows(CloseableIterable records) { // Predicate to test whether a row has been deleted by equality deletions. 
Predicate deletedRows = applyEqDeletes().stream().reduce(Predicate::or).orElse(t -> false); @@ -213,6 +292,23 @@ private CloseableIterable applyEqDeletes(CloseableIterable records) { return createDeleteIterable(records, isEqDeleted); } + private CloseableIterable applyPartialUpdate(CloseableIterable records) { + Set> partialCombines = applyPartialMap(); + + return CloseableIterable.transform( + records, + record -> { + T result = record; + for (Function partialCombine : partialCombines) { + T temp = partialCombine.apply(record); + if (temp != null) { + result = temp; + } + } + return result; + }); + } + protected void markRowDeleted(T item) { throw new UnsupportedOperationException( this.getClass().getName() + " does not implement markRowDeleted"); diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java index 23a94ebc9944..eea88e36b17a 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java @@ -31,6 +31,7 @@ import org.apache.iceberg.data.orc.GenericOrcWriter; import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PartialDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.io.FileAppender; @@ -47,8 +48,12 @@ public class GenericAppenderFactory implements FileAppenderFactory { private final Schema schema; private final PartitionSpec spec; private final int[] equalityFieldIds; + private final int[] partialFieldIds; + private final int[] partialFullFieldIds; private final Schema eqDeleteRowSchema; private final Schema posDeleteRowSchema; + private final Schema partialDataSchema; + private final Schema partialFullSchema; private final Map config = Maps.newHashMap(); public GenericAppenderFactory(Schema schema) { @@ -65,11 +70,37 @@ public GenericAppenderFactory( int[] equalityFieldIds, Schema eqDeleteRowSchema, Schema posDeleteRowSchema) { + this( + schema, + spec, + equalityFieldIds, + null, + null, + eqDeleteRowSchema, + posDeleteRowSchema, + null, + null); + } + + public GenericAppenderFactory( + Schema schema, + PartitionSpec spec, + int[] equalityFieldIds, + int[] partialFieldIds, + int[] partialFullFieldIds, + Schema eqDeleteRowSchema, + Schema posDeleteRowSchema, + Schema partialDataSchema, + Schema partialFullSchema) { this.schema = schema; this.spec = spec; this.equalityFieldIds = equalityFieldIds; + this.partialFieldIds = partialFieldIds; + this.partialFullFieldIds = partialFullFieldIds; this.eqDeleteRowSchema = eqDeleteRowSchema; this.posDeleteRowSchema = posDeleteRowSchema; + this.partialDataSchema = partialDataSchema; + this.partialFullSchema = partialFullSchema; } public GenericAppenderFactory set(String property, String value) { @@ -135,6 +166,41 @@ public org.apache.iceberg.io.DataWriter newDataWriter( file.keyMetadata()); } + @Override + public PartialDeleteWriter newPartialWriter( + EncryptedOutputFile file, FileFormat format, StructLike partition) { + Preconditions.checkState( + equalityFieldIds != null && equalityFieldIds.length > 0, + "Equality field ids shouldn't be null or empty when creating equality-delete writer"); + Preconditions.checkNotNull( + eqDeleteRowSchema, + "Equality delete row schema shouldn't be null when creating equality-delete writer"); + + MetricsConfig 
metricsConfig = MetricsConfig.fromProperties(config); + try { + switch (format) { + case AVRO: + return Avro.writeDeletes(file.encryptingOutputFile()) + .createWriterFunc(DataWriter::create) + .withPartition(partition) + .overwrite() + .setAll(config) + .rowSchema(partialFullSchema) + .withSpec(spec) + .withKeyMetadata(file.keyMetadata()) + .equalityFieldIds(equalityFieldIds) + .partialFieldIds(partialFieldIds) + .buildPartialWriter(); + + default: + throw new UnsupportedOperationException( + "Cannot write equality-deletes for unsupported file format: " + format); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + @Override public EqualityDeleteWriter newEqDeleteWriter( EncryptedOutputFile file, FileFormat format, StructLike partition) { diff --git a/data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java index 0779ed09ce1e..18d9b204d49f 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java @@ -18,11 +18,15 @@ */ package org.apache.iceberg.data; +import java.util.List; +import java.util.Map; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; public class GenericDeleteFilter extends DeleteFilter { private final FileIO io; @@ -45,6 +49,20 @@ protected StructLike asStructLike(Record record) { return asStructLike.wrap(record); } + @Override + protected Record combineRecord( + Record record, StructLike partialRecord, Schema partialSchema, Schema requiredSchema) { + Map overwriteValues = + Maps.newHashMapWithExpectedSize(partialSchema.columns().size()); + + List columns = partialSchema.columns(); + for (int i = 0; i < columns.size(); i++) { + overwriteValues.put(columns.get(i).name(), partialRecord.get(i, Object.class)); + } + + return record.copy(overwriteValues); + } + @Override protected InputFile getInputFile(String location) { return io.newInputFile(location); diff --git a/data/src/test/java/org/apache/iceberg/data/FileHelpers.java b/data/src/test/java/org/apache/iceberg/data/FileHelpers.java index 6531441fa57f..e1f872736f72 100644 --- a/data/src/test/java/org/apache/iceberg/data/FileHelpers.java +++ b/data/src/test/java/org/apache/iceberg/data/FileHelpers.java @@ -33,6 +33,7 @@ import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PartialDeleteWriter; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedFiles; @@ -100,6 +101,45 @@ public static DeleteFile writeDeleteFile( return writer.toDeleteFile(); } + public static DeleteFile writePartialFile( + Table table, + OutputFile out, + StructLike partition, + List deletes, + Schema equalitySchema, + Schema partialDataSchema, + Schema partialFullSchema) + throws IOException { + FileFormat format = defaultFormat(table.properties()); + int[] equalityFieldIds = + equalitySchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray(); + + int[] partialFieldIds = + partialDataSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray(); + + int[] partialFullFieldIds = + 
partialFullSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray(); + + FileAppenderFactory factory = + new GenericAppenderFactory( + table.schema(), + table.spec(), + equalityFieldIds, + partialFieldIds, + partialFullFieldIds, + equalitySchema, + null, + partialDataSchema, + partialFullSchema); + + PartialDeleteWriter writer = factory.newPartialWriter(encrypt(out), format, partition); + try (Closeable toClose = writer) { + writer.write(deletes); + } + + return writer.toDeleteFile(); + } + public static DataFile writeDataFile(Table table, OutputFile out, List rows) throws IOException { FileFormat format = defaultFormat(table.properties()); diff --git a/data/src/test/java/org/apache/iceberg/data/PartialReadTests.java b/data/src/test/java/org/apache/iceberg/data/PartialReadTests.java new file mode 100644 index 000000000000..ded84e6cd66d --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/data/PartialReadTests.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.data; + +import java.io.IOException; +import java.time.LocalDate; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TestHelpers.Row; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.StructLikeSet; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public abstract class PartialReadTests { + // Schema passed to create tables + public static final Schema DATE_SCHEMA = + new Schema( + Types.NestedField.required(1, "dt", Types.DateType.get()), + Types.NestedField.required(2, "data", Types.StringType.get()), + Types.NestedField.required(3, "data2", Types.StringType.get()), + Types.NestedField.required(4, "id", Types.IntegerType.get())); + + // Partition spec used to create tables + public static final PartitionSpec DATE_SPEC = + PartitionSpec.builderFor(DATE_SCHEMA).day("dt").build(); + + @Rule public TemporaryFolder temp = new TemporaryFolder(); + + protected String tableName = null; + protected String dateTableName = null; + protected Table table = null; + protected Table dateTable = null; + protected List records = null; + private List dateRecords = null; + protected DataFile dataFile = null; + + @Before + public void writeTestDataFile() throws IOException { + this.dateTableName = "test2"; + this.dateTable = createTable(dateTableName, DATE_SCHEMA, DATE_SPEC); + + GenericRecord record = GenericRecord.create(dateTable.schema()); + this.dateRecords = Lists.newArrayList(); + + Map overwriteValues = Maps.newHashMapWithExpectedSize(4); + overwriteValues.put("dt", LocalDate.parse("2021-09-01")); + overwriteValues.put("data", "a"); + overwriteValues.put("data2", "a2"); + overwriteValues.put("id", 1); + dateRecords.add(record.copy(overwriteValues)); + + overwriteValues.put("dt", LocalDate.parse("2021-09-02")); + overwriteValues.put("data", "b"); + overwriteValues.put("data2", "b2"); + overwriteValues.put("id", 2); + dateRecords.add(record.copy(overwriteValues)); + + DataFile dataFile1 = + FileHelpers.writeDataFile( + dateTable, + Files.localOutput(temp.newFile()), + Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-01"))), + dateRecords); + + dateTable.newAppend().appendFile(dataFile1).commit(); + } + + @After + public void cleanup() throws IOException { + dropTable("test"); + dropTable("test2"); + } + + protected abstract Table createTable(String name, Schema schema, PartitionSpec spec) + throws IOException; + + protected abstract void dropTable(String name) throws IOException; + + protected abstract StructLikeSet rowSet(String name, Table testTable, String... columns) + throws IOException; + + protected boolean countPartial() { + return false; + } + + /** + * This will only be called after calling rowSet(String, Table, String...), and only if + * countDeletes() is true. 
+ */ + protected long deleteCount() { + return 0L; + } + + protected void checkPartialCount(long expectedPartials) { + if (countPartial()) { + long actualPartials = deleteCount(); + Assert.assertEquals( + "Table should contain expected number of deletes", expectedPartials, actualPartials); + } + } + + // todo need more tests for partial update + + @Test + public void testPartialData() throws IOException { + Schema partialFullSchema = dateTable.schema().select("dt", "data", "id"); + Schema eqDeleteSchema = dateTable.schema().select("dt", "id"); + Schema partialDataSchema = dateTable.schema().select("data"); + + Record dataPartial = GenericRecord.create(partialFullSchema); + List dataDeletes = + Lists.newArrayList( + dataPartial.copy("dt", LocalDate.parse("2021-09-01"), "data", "a3", "id", 1)); + + DeleteFile partialFile = + FileHelpers.writePartialFile( + dateTable, + Files.localOutput(temp.newFile()), + Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-01"))), + dataDeletes, + eqDeleteSchema, + partialDataSchema, + partialFullSchema); + + dateTable.newRowDelta().addDeletes(partialFile).commit(); + + List expectedDateRecords = Lists.newArrayList(); + GenericRecord record = GenericRecord.create(dateTable.schema()); + + Map overwriteValues = Maps.newHashMapWithExpectedSize(4); + overwriteValues.put("dt", LocalDate.parse("2021-09-01")); + overwriteValues.put("data", "a3"); + overwriteValues.put("data2", "a2"); + overwriteValues.put("id", 1); + expectedDateRecords.add(record.copy(overwriteValues)); + + overwriteValues.put("dt", LocalDate.parse("2021-09-02")); + overwriteValues.put("data", "b"); + overwriteValues.put("data2", "b2"); + overwriteValues.put("id", 2); + expectedDateRecords.add(record.copy(overwriteValues)); + + StructLikeSet expected = rowSetWithoutIds(dateTable, expectedDateRecords); + + StructLikeSet actual = rowSet(dateTableName, dateTable, "*"); + + Assert.assertEquals("Table should contain expected rows", expected, actual); + } + + protected static StructLikeSet rowSetWithoutIds( + Table table, List recordList, int... 
idsToRemove) { + Set deletedIds = Sets.newHashSet(ArrayUtil.toIntList(idsToRemove)); + StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); + recordList.stream() + .filter(row -> !deletedIds.contains(row.getField("id"))) + .map(record -> new InternalRecordWrapper(table.schema().asStruct()).wrap(record)) + .forEach(set::add); + return set; + } +} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java index 448b2aa2d8ef..4fec1db536e8 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java @@ -172,4 +172,12 @@ public int workerPoolSize() { .defaultValue(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue()) .parse(); } + + public String partialIds() { + return confParser + .stringConf() + .option(FlinkWriteOptions.PARTIAL_IDS.key()) + .defaultValue("") + .parse(); + } } diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java index f3cc52972bfe..00d6d3894625 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java @@ -56,4 +56,7 @@ private FlinkWriteOptions() {} // Overrides the table's write.distribution-mode public static final ConfigOption DISTRIBUTION_MODE = ConfigOptions.key("distribution-mode").stringType().noDefaultValue(); + + public static final ConfigOption PARTIAL_IDS = + ConfigOptions.key("partial-ids").stringType().noDefaultValue(); } diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java index b8786f259a9c..f547bf4a5c72 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.util.List; +import java.util.Set; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileFormat; @@ -30,6 +32,7 @@ import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataWrapper; import org.apache.iceberg.flink.data.RowDataProjection; +import org.apache.iceberg.flink.data.RowDataUtil; import org.apache.iceberg.io.BaseTaskWriter; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.FileIO; @@ -45,6 +48,11 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter { private final RowDataWrapper keyWrapper; private final RowDataProjection keyProjection; private final boolean upsert; + private boolean partial; + private List equalityFieldIds; + private List partialFieldIds; + private Set fullFieldIds = Sets.newHashSet(); + private Schema fullSchema; BaseDeltaTaskWriter( PartitionSpec spec, @@ -56,6 +64,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter { Schema schema, RowType flinkSchema, List equalityFieldIds, + List partialFieldIds, boolean upsert) { super(spec, format, appenderFactory, fileFactory, io, targetFileSize); this.schema = schema; @@ -65,6 +74,14 @@ abstract class BaseDeltaTaskWriter 
extends BaseTaskWriter { new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct()); this.keyProjection = RowDataProjection.create(schema, deleteSchema); this.upsert = upsert; + this.equalityFieldIds = equalityFieldIds; + this.partialFieldIds = partialFieldIds; + if (partialFieldIds != null && partialFieldIds.size() > 0) { + fullFieldIds.addAll(equalityFieldIds); + fullFieldIds.addAll(partialFieldIds); + fullSchema = TypeUtil.select(schema, fullFieldIds); + partial = true; + } } abstract RowDataDeltaWriter route(RowData row); @@ -80,10 +97,27 @@ public void write(RowData row) throws IOException { switch (row.getRowKind()) { case INSERT: case UPDATE_AFTER: - if (upsert) { - writer.deleteKey(keyProjection.wrap(row)); + if (partial) { + GenericRowData ret = new GenericRowData(fullFieldIds.size()); + ret.setRowKind(row.getRowKind()); + RowDataWrapper rowDataWrapper = this.wrapper.wrap(row); + for (int i = 0; i < fullFieldIds.size(); i++) { + for (int j = 0; j < schema.columns().size(); j++) { + if (schema.columns().get(j).fieldId() == fullSchema.columns().get(i).fieldId()) { + ret.setField( + i, + RowDataUtil.convertConstant( + fullSchema.columns().get(i).type(), rowDataWrapper.get(j, Object.class))); + } + } + } + writer.update(ret); + } else { + if (upsert) { + writer.deleteKey(keyProjection.wrap(row)); + } + writer.write(row); } - writer.write(row); break; case UPDATE_BEFORE: diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java index b5d08b46be58..e49ab6f70c6e 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java @@ -32,6 +32,7 @@ import org.apache.iceberg.StructLike; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PartialDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.flink.FlinkSchemaUtil; @@ -46,6 +47,7 @@ import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; public class FlinkAppenderFactory implements FileAppenderFactory, Serializable { private final Schema schema; @@ -53,15 +55,21 @@ public class FlinkAppenderFactory implements FileAppenderFactory, Seria private final Map props; private final PartitionSpec spec; private final int[] equalityFieldIds; + private final int[] partialFieldIds; + private final int[] partialFullFieldIds; private final Schema eqDeleteRowSchema; private final Schema posDeleteRowSchema; + private final Schema partialDataSchema; + private final Schema partialFullSchema; + private final Map config = Maps.newHashMap(); private RowType eqDeleteFlinkSchema = null; private RowType posDeleteFlinkSchema = null; + private RowType partialUpdateFlinkSchema = null; public FlinkAppenderFactory( Schema schema, RowType flinkSchema, Map props, PartitionSpec spec) { - this(schema, flinkSchema, props, spec, null, null, null); + this(schema, flinkSchema, props, spec, null, null, null, null, null, null, null); } public FlinkAppenderFactory( @@ -70,15 +78,23 @@ public FlinkAppenderFactory( Map props, PartitionSpec spec, int[] 
equalityFieldIds, + int[] partialFieldIds, + int[] partialFullFieldIds, Schema eqDeleteRowSchema, - Schema posDeleteRowSchema) { + Schema posDeleteRowSchema, + Schema partialDataSchema, + Schema partialFullSchema) { this.schema = schema; this.flinkSchema = flinkSchema; this.props = props; this.spec = spec; this.equalityFieldIds = equalityFieldIds; + this.partialFieldIds = partialFieldIds; + this.partialFullFieldIds = partialFullFieldIds; this.eqDeleteRowSchema = eqDeleteRowSchema; this.posDeleteRowSchema = posDeleteRowSchema; + this.partialDataSchema = partialDataSchema; + this.partialFullSchema = partialFullSchema; } private RowType lazyEqDeleteFlinkSchema() { @@ -97,6 +113,14 @@ private RowType lazyPosDeleteFlinkSchema() { return this.posDeleteFlinkSchema; } + private RowType lazyPartialUpdateFlinkSchema() { + if (partialUpdateFlinkSchema == null) { + Preconditions.checkNotNull(partialFullSchema, "Partial-update row schema shouldn't be null"); + this.partialUpdateFlinkSchema = FlinkSchemaUtil.convert(partialFullSchema); + } + return this.partialUpdateFlinkSchema; + } + @Override public FileAppender newAppender(OutputFile outputFile, FileFormat format) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); @@ -268,4 +292,70 @@ public PositionDeleteWriter newPosDeleteWriter( throw new UncheckedIOException(e); } } + + @Override + public PartialDeleteWriter newPartialWriter( + EncryptedOutputFile file, FileFormat format, StructLike partition) { + Preconditions.checkState( + equalityFieldIds != null && equalityFieldIds.length > 0, + "Equality field ids shouldn't be null or empty when creating equality-delete writer"); + Preconditions.checkNotNull( + eqDeleteRowSchema, + "Equality delete row schema shouldn't be null when creating equality-delete writer"); + + MetricsConfig metricsConfig = MetricsConfig.fromProperties(config); + try { + switch (format) { + case AVRO: + return Avro.writeDeletes(file.encryptingOutputFile()) + .createWriterFunc(ignore -> new FlinkAvroWriter(lazyPartialUpdateFlinkSchema())) + .withPartition(partition) + .overwrite() + .setAll(config) + .rowSchema(partialFullSchema) + .withSpec(spec) + .withKeyMetadata(file.keyMetadata()) + .equalityFieldIds(equalityFieldIds) + .partialFieldIds(partialFieldIds) + .buildPartialWriter(); + case ORC: + return ORC.writeDeletes(file.encryptingOutputFile()) + .createWriterFunc( + (iSchema, typDesc) -> + FlinkOrcWriter.buildWriter(lazyPartialUpdateFlinkSchema(), iSchema)) + .withPartition(partition) + .overwrite() + .setAll(props) + .metricsConfig(metricsConfig) + .rowSchema(partialFullSchema) + .withSpec(spec) + .withKeyMetadata(file.keyMetadata()) + .transformPaths(path -> StringData.fromString(path.toString())) + .equalityFieldIds(equalityFieldIds) + .partialFieldIds(partialFieldIds) + .buildPartialWriter(); + case PARQUET: + return Parquet.writeDeletes(file.encryptingOutputFile()) + .createWriterFunc( + msgType -> + FlinkParquetWriters.buildWriter(lazyPartialUpdateFlinkSchema(), msgType)) + .withPartition(partition) + .overwrite() + .setAll(props) + .metricsConfig(metricsConfig) + .rowSchema(partialFullSchema) + .withSpec(spec) + .withKeyMetadata(file.keyMetadata()) + .transformPaths(path -> StringData.fromString(path.toString())) + .equalityFieldIds(equalityFieldIds) + .partialFieldIds(partialFieldIds) + .buildPartialWriter(); + default: + throw new UnsupportedOperationException( + "Cannot write equality-deletes for unsupported file format: " + format); + } + } catch (IOException e) { + throw new UncheckedIOException(e); 
+ } + } } diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index ead0b757e583..36f330c66c3b 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -47,6 +47,7 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.Row; +import org.apache.hadoop.util.StringUtils; import org.apache.iceberg.DataFile; import org.apache.iceberg.DistributionMode; import org.apache.iceberg.FileFormat; @@ -341,6 +342,7 @@ private DataStreamSink chainIcebergOperators() { // Find out the equality field id list based on the user-provided equality field column names. List equalityFieldIds = checkAndGetEqualityFieldIds(); + List partialFieldIds = checkAndGetPartialFieldIds(flinkWriteConf.partialIds()); // Convert the requested flink table schema to flink row type. RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema); @@ -353,7 +355,7 @@ private DataStreamSink chainIcebergOperators() { // Add parallel writers that append rows to files SingleOutputStreamOperator writerStream = - appendWriter(distributeStream, flinkRowType, equalityFieldIds); + appendWriter(distributeStream, flinkRowType, equalityFieldIds, partialFieldIds); // Add single-parallelism committer that commits files // after successful checkpoint or end of input @@ -404,6 +406,25 @@ List checkAndGetEqualityFieldIds() { return equalityFieldIds; } + List checkAndGetPartialFieldIds(String partialFieldIdString) { + List partialFieldIds = Lists.newArrayList(); + if (partialFieldIdString == null || partialFieldIdString.length() == 0) { + return partialFieldIds; + } else { + String[] columns = StringUtils.split(partialFieldIdString, ','); + for (String column : columns) { + org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column); + Preconditions.checkNotNull( + field, + "Missing required equality field column '%s' in table schema %s", + column, + table.schema()); + partialFieldIds.add(field.fieldId()); + } + return partialFieldIds; + } + } + @SuppressWarnings("unchecked") private DataStreamSink appendDummySink( SingleOutputStreamOperator committerStream) { @@ -438,7 +459,10 @@ private SingleOutputStreamOperator appendCommitter( } private SingleOutputStreamOperator appendWriter( - DataStream input, RowType flinkRowType, List equalityFieldIds) { + DataStream input, + RowType flinkRowType, + List equalityFieldIds, + List partialFieldIds) { // Validate the equality fields and partition fields if we enable the upsert mode. if (flinkWriteConf.upsertMode()) { Preconditions.checkState( @@ -459,7 +483,8 @@ private SingleOutputStreamOperator appendWriter( } IcebergStreamWriter streamWriter = - createStreamWriter(table, flinkWriteConf, flinkRowType, equalityFieldIds); + createStreamWriter( + table, flinkWriteConf, flinkRowType, equalityFieldIds, partialFieldIds); int parallelism = writeParallelism == null ? 
input.getParallelism() : writeParallelism; SingleOutputStreamOperator writerStream = @@ -570,7 +595,8 @@ static IcebergStreamWriter createStreamWriter( Table table, FlinkWriteConf flinkWriteConf, RowType flinkRowType, - List equalityFieldIds) { + List equalityFieldIds, + List partialFieldIds) { Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null"); Table serializableTable = SerializableTable.copyOf(table); @@ -583,6 +609,7 @@ static IcebergStreamWriter createStreamWriter( format, writeProperties(table, format, flinkWriteConf), equalityFieldIds, + partialFieldIds, flinkWriteConf.upsertMode()); return new IcebergStreamWriter<>(table.name(), taskWriterFactory); } diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java index 38062dd1a2c4..de9a3d690b2a 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java @@ -50,6 +50,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter { Schema schema, RowType flinkSchema, List equalityFieldIds, + List partialFieldIds, boolean upsert) { super( spec, @@ -61,6 +62,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter { schema, flinkSchema, equalityFieldIds, + partialFieldIds, upsert); this.partitionKey = new PartitionKey(spec, schema); } diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java index 634c2dfddaed..8a212c6d03ee 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileFormat; @@ -48,6 +49,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory { private final long targetFileSizeBytes; private final FileFormat format; private final List equalityFieldIds; + private final List partialFieldIds; private final boolean upsert; private final FileAppenderFactory appenderFactory; @@ -60,6 +62,7 @@ public RowDataTaskWriterFactory( FileFormat format, Map writeProperties, List equalityFieldIds, + List partialFieldIds, boolean upsert) { this.table = table; this.schema = table.schema(); @@ -69,11 +72,14 @@ public RowDataTaskWriterFactory( this.targetFileSizeBytes = targetFileSizeBytes; this.format = format; this.equalityFieldIds = equalityFieldIds; + this.partialFieldIds = partialFieldIds; this.upsert = upsert; - if (equalityFieldIds == null || equalityFieldIds.isEmpty()) { this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, writeProperties, spec); } else if (upsert) { + Set fullFieldIds = Sets.newHashSet(); + fullFieldIds.addAll(equalityFieldIds); + fullFieldIds.addAll(partialFieldIds); // In upsert mode, only the new row is emitted using INSERT row kind. 
Therefore, any column of // the inserted row // may differ from the deleted row other than the primary key fields, and the delete file must @@ -86,9 +92,16 @@ public RowDataTaskWriterFactory( writeProperties, spec, ArrayUtil.toIntArray(equalityFieldIds), + ArrayUtil.toIntArray(partialFieldIds), + null, TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), - null); + null, + TypeUtil.select(schema, Sets.newHashSet(partialFieldIds)), + TypeUtil.select(schema, fullFieldIds)); } else { + Set fullFieldIds = Sets.newHashSet(); + fullFieldIds.addAll(equalityFieldIds); + fullFieldIds.addAll(partialFieldIds); this.appenderFactory = new FlinkAppenderFactory( schema, @@ -96,8 +109,12 @@ public RowDataTaskWriterFactory( writeProperties, spec, ArrayUtil.toIntArray(equalityFieldIds), + ArrayUtil.toIntArray(partialFieldIds), + null, schema, - null); + null, + TypeUtil.select(schema, Sets.newHashSet(partialFieldIds)), + TypeUtil.select(schema, fullFieldIds)); } } @@ -142,6 +159,7 @@ public TaskWriter create() { schema, flinkSchema, equalityFieldIds, + partialFieldIds, upsert); } else { return new PartitionedDeltaWriter( @@ -154,6 +172,7 @@ public TaskWriter create() { schema, flinkSchema, equalityFieldIds, + partialFieldIds, upsert); } } diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java index 7680fb933b20..40539cb96926 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java @@ -42,6 +42,7 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter { Schema schema, RowType flinkSchema, List equalityFieldIds, + List partialFieldIds, boolean upsert) { super( spec, @@ -53,6 +54,7 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter { schema, flinkSchema, equalityFieldIds, + partialFieldIds, upsert); this.writer = new RowDataDeltaWriter(null); } diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java index 5fada27d5471..2ad477b4a64e 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java @@ -18,8 +18,10 @@ */ package org.apache.iceberg.flink.source; +import java.util.List; import java.util.Map; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileScanTask; @@ -45,6 +47,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.PartitionUtil; @Internal @@ -214,6 +217,36 @@ protected StructLike asStructLike(RowData row) { return asStructLike.wrap(row); } + @Override + protected RowData combineRecord( + RowData record, StructLike partialRecord, Schema partialSchema, Schema requiredSchema) { + GenericRowData ret; + + if (record instanceof GenericRowData) { + ret = (GenericRowData) record; + } else { + ret = new 
GenericRowData(record.getArity()); + } + ret.setRowKind(record.getRowKind()); + List columns = partialSchema.columns(); + List fullColumns = requiredSchema.columns(); + int index = 0; + int pos = 0; + while (index < columns.size()) { + Object obj = partialRecord.get(index, Object.class); + while (pos < fullColumns.size() + && fullColumns.get(pos).fieldId() != columns.get(index).fieldId()) { + pos++; + if (pos == fullColumns.size()) { + pos = 0; + } + } + ret.setField(pos, obj); + index++; + } + return ret; + } + @Override protected InputFile getInputFile(String location) { return inputFilesDecryptor.getInputFile(location); diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java index 23665b7c9f0f..1d1c48dfee40 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java @@ -83,6 +83,7 @@ public RowDataRewriter( format, table.properties(), null, + null, false); } diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java index f62bcaa49dcd..f96b4151effd 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java @@ -173,39 +173,34 @@ public void testUpsertOptions() { Map optionsUpsertProps = Maps.newHashMap(tableUpsertProps); optionsUpsertProps.remove(TableProperties.UPSERT_ENABLED); sql( - "CREATE TABLE %s(id INT NOT NULL, province STRING NOT NULL, dt DATE, PRIMARY KEY(id,province) NOT ENFORCED) " + "CREATE TABLE %s(id INT NOT NULL, province STRING NOT NULL, name STRING NOT NULL, dt DATE, PRIMARY KEY(id, province) NOT ENFORCED) " + "PARTITIONED BY (province) WITH %s", tableName, toWithClause(optionsUpsertProps)); try { sql( "INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ VALUES " - + "(1, 'a', DATE '2022-03-01')," - + "(2, 'b', DATE '2022-03-01')," - + "(1, 'b', DATE '2022-03-01')", + + "(1, 'b', 'b', DATE '2022-03-01')", tableName); sql( - "INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ VALUES " - + "(4, 'a', DATE '2022-03-02')," - + "(5, 'b', DATE '2022-03-02')," - + "(1, 'b', DATE '2022-03-02')", + "INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true', 'partial-ids'='dt')*/ VALUES " + + "(1, 'b', 'a', DATE '2022-03-02')", tableName); - List rowsOn20220301 = - Lists.newArrayList(Row.of(2, "b", dt20220301), Row.of(1, "a", dt20220301)); - TestHelpers.assertRows( - sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName), rowsOn20220301); - - List rowsOn20220302 = - Lists.newArrayList( - Row.of(1, "b", dt20220302), Row.of(4, "a", dt20220302), Row.of(5, "b", dt20220302)); - TestHelpers.assertRows( - sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName), rowsOn20220302); - - TestHelpers.assertRows( - sql("SELECT * FROM %s", tableName), - Lists.newArrayList(Iterables.concat(rowsOn20220301, rowsOn20220302))); + List rowsOn20220301 = Lists.newArrayList(Row.of(1, "b", "b", dt20220302)); + TestHelpers.assertRows(sql("SELECT * FROM %s ", tableName), rowsOn20220301); + + // List rowsOn20220302 = + // Lists.newArrayList( + // Row.of(1, "b", dt20220302), Row.of(4, "a", dt20220302), Row.of(5, "b", + // dt20220302)); + // TestHelpers.assertRows( + // sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName), 
rowsOn20220302); + // + // TestHelpers.assertRows( + // sql("SELECT * FROM %s", tableName), + // Lists.newArrayList(Iterables.concat(rowsOn20220301, rowsOn20220302))); } finally { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); } diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java index 49f472b7325e..1df92e910d8c 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java @@ -215,7 +215,7 @@ private static OneInputStreamOperatorTestHarness createIce icebergTable, override, new org.apache.flink.configuration.Configuration()); IcebergStreamWriter streamWriter = - FlinkSink.createStreamWriter(icebergTable, flinkWriteConfig, flinkRowType, null); + FlinkSink.createStreamWriter(icebergTable, flinkWriteConfig, flinkRowType, null, null); OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>(streamWriter, 1, 1, 0); diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java index 1f8cbfe19152..963c856da6f8 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java @@ -347,6 +347,7 @@ private TaskWriterFactory createTaskWriterFactory(List equalit format, table.properties(), equalityFieldIds, + null, false); } } diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java index 4c17cd7607df..3c8735eaa026 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java @@ -48,8 +48,12 @@ protected FileAppenderFactory createAppenderFactory( table.properties(), table.spec(), ArrayUtil.toIntArray(equalityFieldIds), + null, + null, eqDeleteSchema, - posDeleteRowSchema); + posDeleteRowSchema, + null, + null); } @Override diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java index 7fe4e159fc61..8381f4f46140 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java @@ -84,7 +84,11 @@ public void before() throws IOException { table.properties(), table.spec(), equalityFieldIds, + null, + null, table.schema(), + null, + null, null); } diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index e27b9093f19d..9da1fb3d907f 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -776,7 +776,11 @@ private FileAppenderFactory createDeletableAppenderFactory() { table.properties(), table.spec(), equalityFieldIds, + 
null, + null, table.schema(), + null, + null, null); } diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java index bd959bfb31c4..9d0b7019626e 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java @@ -380,7 +380,7 @@ private OneInputStreamOperatorTestHarness createIcebergStr icebergTable, Maps.newHashMap(), new org.apache.flink.configuration.Configuration()); IcebergStreamWriter streamWriter = - FlinkSink.createStreamWriter(icebergTable, flinkWriteConfig, flinkRowType, null); + FlinkSink.createStreamWriter(icebergTable, flinkWriteConfig, flinkRowType, null, null); OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>(streamWriter, 1, 1, 0); diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java index c56a348e7445..7768c919e1a1 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java @@ -239,6 +239,7 @@ private TaskWriter createTaskWriter(long targetFileSize) { format, table.properties(), null, + null, false); taskWriterFactory.initialize(1, 1); return taskWriterFactory.create(); diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java index 25ecec23d216..9dd88aedd146 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java @@ -178,6 +178,7 @@ private TaskWriter createTaskWriter( format, table.properties(), equalityFieldIds, + null, upsert); taskWriterFactory.initialize(1, 1); diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java index 5fada27d5471..b935d85d6b44 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java @@ -214,6 +214,12 @@ protected StructLike asStructLike(RowData row) { return asStructLike.wrap(row); } + @Override + protected RowData combineRecord( + RowData record, StructLike partialRecord, Schema partialSchema) { + throw new UnsupportedOperationException("Partial updates are not yet supported."); + } + @Override protected InputFile getInputFile(String location) { return inputFilesDecryptor.getInputFile(location); diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java index 5fada27d5471..b935d85d6b44 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java @@ -214,6 +214,12 @@ protected StructLike asStructLike(RowData row) { return 
asStructLike.wrap(row); } + @Override + protected RowData combineRecord( + RowData record, StructLike partialRecord, Schema partialSchema) { + throw new UnsupportedOperationException("Partial updates are not yet supported."); + } + @Override protected InputFile getInputFile(String location) { return inputFilesDecryptor.getInputFile(location); diff --git a/gradle.properties b/gradle.properties index ad3884e238b1..043a93613e73 100644 --- a/gradle.properties +++ b/gradle.properties @@ -15,7 +15,7 @@ jmhOutputPath=build/reports/jmh/human-readable-output.txt jmhIncludeRegex=.* -systemProp.defaultFlinkVersions=1.16 +systemProp.defaultFlinkVersions=1.14 systemProp.knownFlinkVersions=1.14,1.15,1.16 systemProp.defaultHiveVersions=2 systemProp.knownHiveVersions=2,3 diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes2.java b/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes2.java new file mode 100644 index 000000000000..97b9a02303ff --- /dev/null +++ b/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes2.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.mr; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.data.InternalRecordWrapper; +import org.apache.iceberg.data.PartialReadTests; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.util.StructLikeSet; +import org.junit.Assert; +import org.junit.Before; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestInputFormatReaderDeletes2 extends PartialReadTests { + private final Configuration conf = new Configuration(); + private final HadoopTables tables = new HadoopTables(conf); + private TestHelper helper; + + // parametrized variables + private final String inputFormat; + private final FileFormat fileFormat; + + @Parameterized.Parameters(name = "inputFormat = {0}, fileFormat={1}") + public static Object[][] parameters() { + return new Object[][] { + {"IcebergInputFormat", FileFormat.AVRO}, + }; + } + + @Before + @Override + public void writeTestDataFile() throws IOException { + conf.set(CatalogUtil.ICEBERG_CATALOG_TYPE, Catalogs.LOCATION); + super.writeTestDataFile(); + } + + public TestInputFormatReaderDeletes2(String inputFormat, FileFormat fileFormat) { + this.inputFormat = inputFormat; + this.fileFormat = fileFormat; + } + + @Override + protected Table createTable(String name, Schema schema, PartitionSpec spec) throws IOException { + Table table; + + File location = temp.newFolder(inputFormat, fileFormat.name()); + Assert.assertTrue(location.delete()); + helper = new TestHelper(conf, tables, location.toString(), schema, spec, fileFormat, temp); + table = helper.createTable(); + + TableOperations ops = ((BaseTable) table).operations(); + TableMetadata meta = ops.current(); + ops.commit(meta, meta.upgradeToFormatVersion(2)); + + return table; + } + + @Override + protected void dropTable(String name) { + tables.dropTable(helper.table().location()); + } + + @Override + public StructLikeSet rowSet(String name, Table table, String... 
columns) { + InputFormatConfig.ConfigBuilder builder = + new InputFormatConfig.ConfigBuilder(conf).readFrom(table.location()); + Schema projected = table.schema().select(columns); + StructLikeSet set = StructLikeSet.create(projected.asStruct()); + + set.addAll( + TestIcebergInputFormats.TESTED_INPUT_FORMATS.stream() + .filter(recordFactory -> recordFactory.name().equals(inputFormat)) + .map( + recordFactory -> + recordFactory.create(builder.project(projected).conf()).getRecords()) + .flatMap(List::stream) + .map(record -> new InternalRecordWrapper(projected.asStruct()).wrap(record)) + .collect(Collectors.toList())); + + return set; + } +} diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java index 5b2b877a97d4..7991a194ece2 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java @@ -61,6 +61,7 @@ import org.apache.iceberg.data.orc.GenericOrcWriter; import org.apache.iceberg.data.orc.GenericOrcWriters; import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PartialDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptionKeyMetadata; import org.apache.iceberg.exceptions.RuntimeIOException; @@ -499,6 +500,8 @@ public static class DeleteWriteBuilder { private StructLike partition = null; private EncryptionKeyMetadata keyMetadata = null; private int[] equalityFieldIds = null; + private int[] partialFieldIds = null; + private SortOrder sortOrder; private Function pathTransformFunc = Function.identity(); @@ -575,11 +578,21 @@ public DeleteWriteBuilder equalityFieldIds(List fieldIds) { return this; } + public DeleteWriteBuilder partialFieldIds(List fieldIds) { + this.partialFieldIds = ArrayUtil.toIntArray(fieldIds); + return this; + } + public DeleteWriteBuilder equalityFieldIds(int... fieldIds) { this.equalityFieldIds = fieldIds; return this; } + public DeleteWriteBuilder partialFieldIds(int... 
fieldIds) { + this.partialFieldIds = fieldIds; + return this; + } + public DeleteWriteBuilder transformPaths(Function newPathTransformFunc) { this.pathTransformFunc = newPathTransformFunc; return this; @@ -590,6 +603,51 @@ public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) { return this; } + public PartialDeleteWriter buildPartialWriter() throws IOException { + Preconditions.checkState( + rowSchema != null, "Cannot create equality delete file without a schema"); + Preconditions.checkState( + equalityFieldIds != null, "Cannot create partial file without delete field ids"); + Preconditions.checkState( + partialFieldIds != null, "Cannot create partial file without partial field ids"); + Preconditions.checkState( + createWriterFunc != null, "Cannot create partial file unless createWriterFunc is set"); + Preconditions.checkArgument( + spec != null, "Spec must not be null when creating partial writer"); + Preconditions.checkArgument( + spec.isUnpartitioned() || partition != null, + "Partition must not be null for partitioned writes"); + + meta("delete-type", "partial"); + meta( + "delete-field-ids", + IntStream.of(equalityFieldIds) + .mapToObj(Objects::toString) + .collect(Collectors.joining(", "))); + + meta( + "partial-field-ids", + IntStream.of(partialFieldIds) + .mapToObj(Objects::toString) + .collect(Collectors.joining(", "))); + + // the appender uses the row schema without extra columns + appenderBuilder.schema(rowSchema); + appenderBuilder.createWriterFunc(createWriterFunc); + appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext); + + return new PartialDeleteWriter<>( + appenderBuilder.build(), + FileFormat.ORC, + location, + spec, + partition, + keyMetadata, + sortOrder, + equalityFieldIds, + partialFieldIds); + } + public EqualityDeleteWriter buildEqualityWriter() { Preconditions.checkState( rowSchema != null, "Cannot create equality delete file without a schema"); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 8b1e6c056403..f24989c14f59 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -69,6 +69,7 @@ import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PartialDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptionKeyMetadata; import org.apache.iceberg.exceptions.RuntimeIOException; @@ -669,6 +670,7 @@ public static class DeleteWriteBuilder { private StructLike partition = null; private EncryptionKeyMetadata keyMetadata = null; private int[] equalityFieldIds = null; + private int[] partialFieldIds = null; private SortOrder sortOrder; private Function pathTransformFunc = Function.identity(); @@ -745,11 +747,21 @@ public DeleteWriteBuilder equalityFieldIds(List fieldIds) { return this; } + public DeleteWriteBuilder partialFieldIds(List fieldIds) { + this.partialFieldIds = ArrayUtil.toIntArray(fieldIds); + return this; + } + public DeleteWriteBuilder equalityFieldIds(int... fieldIds) { this.equalityFieldIds = fieldIds; return this; } + public DeleteWriteBuilder partialFieldIds(int... 
fieldIds) { + this.partialFieldIds = fieldIds; + return this; + } + public DeleteWriteBuilder transformPaths(Function newPathTransformFunc) { this.pathTransformFunc = newPathTransformFunc; return this; @@ -760,6 +772,51 @@ public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) { return this; } + public PartialDeleteWriter buildPartialWriter() throws IOException { + Preconditions.checkState( + rowSchema != null, "Cannot create equality delete file without a schema"); + Preconditions.checkState( + equalityFieldIds != null, "Cannot create partial file without delete field ids"); + Preconditions.checkState( + partialFieldIds != null, "Cannot create partial file without partial field ids"); + Preconditions.checkState( + createWriterFunc != null, "Cannot create partial file unless createWriterFunc is set"); + Preconditions.checkArgument( + spec != null, "Spec must not be null when creating partial writer"); + Preconditions.checkArgument( + spec.isUnpartitioned() || partition != null, + "Partition must not be null for partitioned writes"); + + meta("delete-type", "partial"); + meta( + "delete-field-ids", + IntStream.of(equalityFieldIds) + .mapToObj(Objects::toString) + .collect(Collectors.joining(", "))); + + meta( + "partial-field-ids", + IntStream.of(partialFieldIds) + .mapToObj(Objects::toString) + .collect(Collectors.joining(", "))); + + // the appender uses the row schema without extra columns + appenderBuilder.schema(rowSchema); + appenderBuilder.createWriterFunc(createWriterFunc); + appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext); + + return new PartialDeleteWriter<>( + appenderBuilder.build(), + FileFormat.PARQUET, + location, + spec, + partition, + keyMetadata, + sortOrder, + equalityFieldIds, + partialFieldIds); + } + public EqualityDeleteWriter buildEqualityWriter() throws IOException { Preconditions.checkState( rowSchema != null, "Cannot create equality delete file without a schema"); diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index f206149da30e..1507b79d7102 100644 --- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -189,6 +189,12 @@ protected StructLike asStructLike(InternalRow row) { return asStructLike.wrap(row); } + @Override + protected InternalRow combineRecord( + InternalRow record, StructLike partialRecord, Schema partialSchema) { + throw new UnsupportedOperationException("Partial updates are not yet supported."); + } + @Override protected InputFile getInputFile(String location) { return RowDataReader.this.getInputFile(location); diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java new file mode 100644 index 000000000000..1507b79d7102 --- /dev/null +++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.Map; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataTask; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.data.SparkAvroReader; +import org.apache.iceberg.spark.data.SparkOrcReader; +import org.apache.iceberg.spark.data.SparkParquetReaders; +import org.apache.iceberg.types.TypeUtil; +import org.apache.spark.rdd.InputFileBlockHolder; +import org.apache.spark.sql.catalyst.InternalRow; + +class RowDataReader extends BaseDataReader { + + private final Schema tableSchema; + private final Schema expectedSchema; + private final String nameMapping; + private final boolean caseSensitive; + + RowDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) { + super(table, task); + this.tableSchema = table.schema(); + this.expectedSchema = expectedSchema; + this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING); + this.caseSensitive = caseSensitive; + } + + @Override + CloseableIterator open(FileScanTask task) { + SparkDeleteFilter deletes = new SparkDeleteFilter(task, tableSchema, expectedSchema); + + // schema or rows returned by readers + Schema requiredSchema = deletes.requiredSchema(); + Map idToConstant = constantsMap(task, expectedSchema); + DataFile file = task.file(); + + // update the current file for Spark's filename() function + InputFileBlockHolder.set(file.path().toString(), task.start(), task.length()); + + return deletes.filter(open(task, requiredSchema, idToConstant)).iterator(); + } + + protected Schema tableSchema() { + return tableSchema; + } + + protected CloseableIterable open( + FileScanTask task, Schema readSchema, Map idToConstant) { + CloseableIterable iter; + if (task.isDataTask()) { + iter = newDataIterable(task.asDataTask(), readSchema); + } else { + InputFile location = getInputFile(task); + Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask"); + + switch (task.file().format()) { + case PARQUET: + iter = newParquetIterable(location, task, readSchema, idToConstant); + break; + + case AVRO: + iter = newAvroIterable(location, task, readSchema, idToConstant); + break; + + case ORC: + iter = newOrcIterable(location, task, readSchema, idToConstant); + break; + + default: + 
throw new UnsupportedOperationException( + "Cannot read unknown format: " + task.file().format()); + } + } + + return iter; + } + + private CloseableIterable newAvroIterable( + InputFile location, FileScanTask task, Schema projection, Map idToConstant) { + Avro.ReadBuilder builder = + Avro.read(location) + .reuseContainers() + .project(projection) + .split(task.start(), task.length()) + .createReaderFunc( + readSchema -> new SparkAvroReader(projection, readSchema, idToConstant)); + + if (nameMapping != null) { + builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); + } + + return builder.build(); + } + + private CloseableIterable newParquetIterable( + InputFile location, FileScanTask task, Schema readSchema, Map idToConstant) { + Parquet.ReadBuilder builder = + Parquet.read(location) + .reuseContainers() + .split(task.start(), task.length()) + .project(readSchema) + .createReaderFunc( + fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant)) + .filter(task.residual()) + .caseSensitive(caseSensitive); + + if (nameMapping != null) { + builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); + } + + return builder.build(); + } + + private CloseableIterable newOrcIterable( + InputFile location, FileScanTask task, Schema readSchema, Map idToConstant) { + Schema readSchemaWithoutConstantAndMetadataFields = + TypeUtil.selectNot( + readSchema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds())); + + ORC.ReadBuilder builder = + ORC.read(location) + .project(readSchemaWithoutConstantAndMetadataFields) + .split(task.start(), task.length()) + .createReaderFunc( + readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant)) + .filter(task.residual()) + .caseSensitive(caseSensitive); + + if (nameMapping != null) { + builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); + } + + return builder.build(); + } + + private CloseableIterable newDataIterable(DataTask task, Schema readSchema) { + StructInternalRow row = new StructInternalRow(readSchema.asStruct()); + CloseableIterable asSparkRows = + CloseableIterable.transform(task.asDataTask().rows(), row::setStruct); + return asSparkRows; + } + + protected class SparkDeleteFilter extends DeleteFilter { + private final InternalRowWrapper asStructLike; + + SparkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) { + super(task.file().path().toString(), task.deletes(), tableSchema, requestedSchema); + this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema())); + } + + @Override + protected StructLike asStructLike(InternalRow row) { + return asStructLike.wrap(row); + } + + @Override + protected InternalRow combineRecord( + InternalRow record, StructLike partialRecord, Schema partialSchema) { + throw new UnsupportedOperationException("Partial updates are not yet supported."); + } + + @Override + protected InputFile getInputFile(String location) { + return RowDataReader.this.getInputFile(location); + } + } +} diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java index 68e98ba913b7..46d8697c1e99 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java @@ -157,6 +157,12 @@ protected StructLike asStructLike(InternalRow row) { return asStructLike.wrap(row); } + 
@Override + protected InternalRow combineRecord( + InternalRow record, StructLike partialRecord, Schema partialSchema) { + throw new UnsupportedOperationException("Partial updates are not yet supported."); + } + @Override protected InputFile getInputFile(String location) { return BatchDataReader.this.getInputFile(location); diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index f206149da30e..1507b79d7102 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -189,6 +189,12 @@ protected StructLike asStructLike(InternalRow row) { return asStructLike.wrap(row); } + @Override + protected InternalRow combineRecord( + InternalRow record, StructLike partialRecord, Schema partialSchema) { + throw new UnsupportedOperationException("Partial updates are not yet supported."); + } + @Override protected InputFile getInputFile(String location) { return RowDataReader.this.getInputFile(location); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java index 2333cd734bbe..c87506f97f3c 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java @@ -261,6 +261,12 @@ protected StructLike asStructLike(InternalRow row) { return asStructLike.wrap(row); } + @Override + protected InternalRow combineRecord( + InternalRow record, StructLike partialRecord, Schema partialSchema) { + throw new UnsupportedOperationException("Partial updates are not yet supported."); + } + @Override protected InputFile getInputFile(String location) { return BaseReader.this.getInputFile(location); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java index 2333cd734bbe..07c41c68df05 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java @@ -261,6 +261,12 @@ protected StructLike asStructLike(InternalRow row) { return asStructLike.wrap(row); } + @Override + protected InternalRow combineRecord( + InternalRow record, StructLike partialRecord, Schema partialSchema, Schema requiredSchema) { + throw new UnsupportedOperationException("Partial updates are not yet supported."); + } + @Override protected InputFile getInputFile(String location) { return BaseReader.this.getInputFile(location); From 0e9e086970700b8cd1a27238bb5643512d3329d0 Mon Sep 17 00:00:00 2001 From: kunni Date: Fri, 25 Nov 2022 13:53:36 +0800 Subject: [PATCH 2/3] Core,Flink: add partial read and write implementation. 
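
Note for reviewers: a partial update is keyed by the table's equality (primary key) columns and carries only the partially updated columns, so the task-writer factory derives three projections from the table schema: the equality key schema, the partial-column schema, and their union (the schema the partial file is actually written with). The sketch below restates that derivation with the same TypeUtil.select calls RowDataTaskWriterFactory uses; the example table layout and the PartialSchemaWiringSketch class are illustrative only and are not part of this patch.

    import java.util.Set;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.relocated.com.google.common.collect.Sets;
    import org.apache.iceberg.types.TypeUtil;
    import org.apache.iceberg.types.Types;

    public class PartialSchemaWiringSketch {
      public static void main(String[] args) {
        // Illustrative table: (id, province) is the equality key, dt is the partially updated column.
        Schema schema =
            new Schema(
                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                Types.NestedField.required(2, "province", Types.StringType.get()),
                Types.NestedField.required(3, "name", Types.StringType.get()),
                Types.NestedField.optional(4, "dt", Types.DateType.get()));

        Set<Integer> equalityFieldIds = Sets.newHashSet(1, 2); // key columns
        Set<Integer> partialFieldIds = Sets.newHashSet(4);     // columns the partial file overwrites

        Schema eqDeleteRowSchema = TypeUtil.select(schema, equalityFieldIds); // id, province
        Schema partialDataSchema = TypeUtil.select(schema, partialFieldIds);  // dt

        Set<Integer> fullFieldIds = Sets.newHashSet(equalityFieldIds);
        fullFieldIds.addAll(partialFieldIds);
        Schema partialFullSchema = TypeUtil.select(schema, fullFieldIds);     // id, province, dt

        System.out.println(partialFullSchema.asStruct());
      }
    }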
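On the read side, DeleteFilter hands each matching partial record to combineRecord(...), which overlays it onto the projected row by field ID (see RowDataFileScanTaskReader in patch 1). The generic sketch below restates that merge on Iceberg Records, assuming every partial column also appears in the required schema; it is a reading aid, not code from this series.

    import java.util.List;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.GenericRecord;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.types.Types;

    public class PartialMergeSketch {
      // Overlay the partial record's columns onto a copy of the full row, matching by field ID
      // rather than by position, so the partial schema can be any subset of the required schema.
      static Record combine(
          Record fullRow, Record partialRow, Schema partialSchema, Schema requiredSchema) {
        Record merged = GenericRecord.create(requiredSchema);
        List<Types.NestedField> requiredColumns = requiredSchema.columns();
        for (int pos = 0; pos < requiredColumns.size(); pos++) {
          merged.set(pos, fullRow.get(pos)); // start from the row read out of the data file
        }

        List<Types.NestedField> partialColumns = partialSchema.columns();
        for (int idx = 0; idx < partialColumns.size(); idx++) {
          int fieldId = partialColumns.get(idx).fieldId();
          for (int pos = 0; pos < requiredColumns.size(); pos++) {
            if (requiredColumns.get(pos).fieldId() == fieldId) {
              merged.set(pos, partialRow.get(idx)); // overwrite only the partially updated column
              break;
            }
          }
        }
        return merged;
      }
    }
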
--- .../java/org/apache/iceberg/data/DeleteFilter.java | 12 ++++++++---- .../apache/iceberg/data/GenericAppenderFactory.java | 1 + 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index 81a5d54f08cb..f7cdb71ac65f 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -69,7 +69,7 @@ public abstract class DeleteFilter { private final String filePath; private final List posDeletes; private final List eqDeletes; - private final List partialUpdates; + private final List partialDeletes; private final Schema requiredAllSchema; private final Accessor posAccessor; private final boolean hasIsDeletedColumn; @@ -116,7 +116,7 @@ protected DeleteFilter( this.posDeletes = posDeleteBuilder.build(); this.eqDeletes = eqDeleteBuilder.build(); - this.partialUpdates = partialBuilder.build(); + this.partialDeletes = partialBuilder.build(); this.requiredAllSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes); this.posAccessor = requiredAllSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId()); this.hasIsDeletedColumn = @@ -145,6 +145,10 @@ public boolean hasEqDeletes() { return !eqDeletes.isEmpty(); } + public boolean hasPartialDeletes() { + return !partialDeletes.isEmpty(); + } + public void incrementDeleteCount() { counter.increment(); } @@ -220,13 +224,13 @@ private Set> applyPartialMap() { } partialDataSet = Sets.newHashSet(); - if (partialUpdates.isEmpty()) { + if (partialDeletes.isEmpty()) { return partialDataSet; } Multimap, Set>, DeleteFile> filesByPartialIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList); - for (DeleteFile delete : partialUpdates) { + for (DeleteFile delete : partialDeletes) { Set eqIds = Sets.newHashSet(delete.equalityFieldIds()); Set partialIds = Sets.newHashSet(delete.partialFieldIds()); filesByPartialIds.put(Pair.of(eqIds, partialIds), delete); diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java index eea88e36b17a..ad938be7245d 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java @@ -187,6 +187,7 @@ public PartialDeleteWriter newPartialWriter( .setAll(config) .rowSchema(partialFullSchema) .withSpec(spec) + .metricsConfig(metricsConfig) .withKeyMetadata(file.keyMetadata()) .equalityFieldIds(equalityFieldIds) .partialFieldIds(partialFieldIds) From 1eb23de7b095fe9b01fd4b98caa46ce534d9acec Mon Sep 17 00:00:00 2001 From: kunni Date: Thu, 1 Dec 2022 11:30:37 +0800 Subject: [PATCH 3/3] tmp --- .../org/apache/iceberg/DeleteFileIndex.java | 46 +++++++++++++++++++ .../iceberg/data/GenericAppenderFactory.java | 2 +- deploy.gradle | 15 +----- 3 files changed, 49 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java index eedde21397eb..400ee58218ae 100644 --- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java +++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java @@ -156,6 +156,37 @@ private static boolean canContainDeletesForFile( case EQUALITY_DELETES: return canContainEqDeletesForFile(dataFile, deleteFile, schema); + + case PARTIAL_UPDATE: + return 
canContainPartialUpdateDeletesForFile(dataFile, deleteFile, schema); + } + + return true; + } + + // todo: add actual implementation + private static boolean canContainPartialUpdateDeletesForFile( + DataFile dataFile, DeleteFile deleteFile, Schema schema) { + // check that the delete file can contain the data file's file_path + Map lowers = deleteFile.lowerBounds(); + Map uppers = deleteFile.upperBounds(); + if (lowers == null || uppers == null) { + return true; + } + + Type pathType = MetadataColumns.DELETE_FILE_PATH.type(); + int pathId = MetadataColumns.DELETE_FILE_PATH.fieldId(); + Comparator comparator = Comparators.charSequences(); + ByteBuffer lower = lowers.get(pathId); + if (lower != null + && comparator.compare(dataFile.path(), Conversions.fromByteBuffer(pathType, lower)) < 0) { + return false; + } + + ByteBuffer upper = uppers.get(pathId); + if (upper != null + && comparator.compare(dataFile.path(), Conversions.fromByteBuffer(pathType, upper)) > 0) { + return false; } return true; @@ -474,6 +505,21 @@ DeleteFileIndex build() { globalApplySeqs = eqFilesSortedBySeq.stream().mapToLong(Pair::first).toArray(); globalDeletes = eqFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new); + List> partialDeleteSortedBySeq = + deleteFilesByPartition.get(partition).stream() + .filter(entry -> entry.file().content() == FileContent.PARTIAL_UPDATE) + .map( + entry -> + // a delete file is indexed by the sequence number it should be applied to + Pair.of(entry.dataSequenceNumber(), entry.file())) + .sorted(Comparator.comparingLong(Pair::first)) + .collect(Collectors.toList()); + if (partialDeleteSortedBySeq.size() > 0) { + globalApplySeqs = partialDeleteSortedBySeq.stream().mapToLong(Pair::first).toArray(); + globalDeletes = + partialDeleteSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new); + } + List> posFilesSortedBySeq = deleteFilesByPartition.get(partition).stream() .filter(entry -> entry.file().content() == FileContent.POSITION_DELETES) diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java index ad938be7245d..0f1315f3b117 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java @@ -195,7 +195,7 @@ public PartialDeleteWriter newPartialWriter( default: throw new UnsupportedOperationException( - "Cannot write equality-deletes for unsupported file format: " + format); + "Cannot write equality-deletes for unsupported file format: " + format); } } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/deploy.gradle b/deploy.gradle index 8e0d43fe02e4..4273af28c57e 100644 --- a/deploy.gradle +++ b/deploy.gradle @@ -81,7 +81,7 @@ subprojects { } } - groupId = 'org.apache.iceberg' + groupId = 'org.apache.iceberg-kn' pom { name = 'Apache Iceberg' description = 'A table format for huge analytic datasets' @@ -109,18 +109,7 @@ subprojects { } repositories { - maven { - credentials { - username project.hasProperty('mavenUser') ? "$mavenUser" : "" - password project.hasProperty('mavenPassword') ? "$mavenPassword" : "" - } - // upload to the releases repository using ./gradlew -Prelease publish - def apacheSnapshotsRepoUrl = 'https://repository.apache.org/content/repositories/snapshots' - def apacheReleasesRepoUrl = 'https://repository.apache.org/service/local/staging/deploy/maven2' - def snapshotsRepoUrl = project.hasProperty('mavenSnapshotsRepo') ? 
"$mavenSnapshotsRepo" : "$apacheSnapshotsRepoUrl" - def releasesRepoUrl = project.hasProperty('mavenReleasesRepo') ? "$mavenReleasesRepo" : "$apacheReleasesRepoUrl" - url = project.hasProperty('release') ? releasesRepoUrl : snapshotsRepoUrl - } + mavenLocal() } }