support partial update
lvyanquan committed Dec 1, 2022
2 parents 49a0ea9 + 1eb23de commit b479d93
Showing 34 changed files with 1,256 additions and 15 deletions.
10 changes: 10 additions & 0 deletions .palantir/revapi.yml
@@ -65,6 +65,16 @@ acceptedBreaks:
- code: "java.method.removed"
old: "method void org.apache.iceberg.io.DataWriter<T>::add(T)"
justification: "Removing deprecated method"
"1.1.0":
org.apache.iceberg:iceberg-api:
- code: "java.method.addedToInterface"
new: "method java.util.List<java.lang.Integer> org.apache.iceberg.ContentFile<F>::partialFieldIds()"
justification: "Add partial update support"
org.apache.iceberg:iceberg-data:
- code: "java.method.abstractMethodAdded"
new: "method T org.apache.iceberg.data.DeleteFilter<T>::combineRecord(T, org.apache.iceberg.StructLike,\
\ org.apache.iceberg.Schema, org.apache.iceberg.Schema)"
justification: "Add partial update support"
apache-iceberg-0.14.0:
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
2 changes: 2 additions & 0 deletions api/src/main/java/org/apache/iceberg/ContentFile.java
@@ -104,6 +104,8 @@ public interface ContentFile<F> {
*/
List<Integer> equalityFieldIds();

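  /** Returns the partial update field IDs for this file, or null if not applicable. */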
List<Integer> partialFieldIds();

/**
* Returns the sort order id of this file, which describes how the file is ordered. This
* information will be useful for merging data and equality delete files more efficiently when
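The new accessor mirrors equalityFieldIds(), and the default implementations added further down return null, so callers should treat a missing list as "no partial update data". A minimal null-safe sketch (the helper class and method are illustrative, not part of Iceberg):

import java.util.List;
import org.apache.iceberg.ContentFile;

class PartialFieldIdsExample {
  // Illustrative helper: true only when the file declares partial update field IDs.
  static boolean hasPartialFieldIds(ContentFile<?> file) {
    List<Integer> partialIds = file.partialFieldIds();
    return partialIds != null && !partialIds.isEmpty();
  }
}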
17 changes: 15 additions & 2 deletions api/src/main/java/org/apache/iceberg/DataFile.java
@@ -102,7 +102,14 @@ public interface DataFile extends ContentFile<DataFile> {
int PARTITION_ID = 102;
String PARTITION_NAME = "partition";
String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";
// NEXT ID TO ASSIGN: 142

Types.NestedField PARTIAL_IDS =
optional(
142,
"partial_ids",
ListType.ofRequired(143, IntegerType.get()),
"partial comparison field IDs");
// NEXT ID TO ASSIGN: 144

static StructType getType(StructType partitionType) {
// IDs start at 100 to leave room for changes to ManifestEntry
@@ -123,7 +130,8 @@ static StructType getType(StructType partitionType) {
KEY_METADATA,
SPLIT_OFFSETS,
EQUALITY_IDS,
SORT_ORDER_ID);
SORT_ORDER_ID,
PARTIAL_IDS);
}

/** @return the content stored in the file; one of DATA, POSITION_DELETES, or EQUALITY_DELETES */
@@ -136,4 +144,9 @@ default FileContent content() {
default List<Integer> equalityFieldIds() {
return null;
}

@Override
default List<Integer> partialFieldIds() {
return null;
}
}
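The new PARTIAL_IDS column (field ID 142, element ID 143) is appended to the manifest entry struct by getType. A small sketch to confirm the column is registered, assuming an unpartitioned table so the partition struct is empty:

import org.apache.iceberg.DataFile;
import org.apache.iceberg.types.Types;

class PartialIdsSchemaCheck {
  public static void main(String[] args) {
    // An empty partition struct stands in for an unpartitioned spec.
    Types.StructType fileType = DataFile.getType(Types.StructType.of());
    // Expected: optional list<int> named "partial_ids" with field ID 142.
    System.out.println(fileType.field("partial_ids"));
  }
}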
3 changes: 2 additions & 1 deletion api/src/main/java/org/apache/iceberg/FileContent.java
@@ -22,7 +22,8 @@
public enum FileContent {
DATA(0),
POSITION_DELETES(1),
EQUALITY_DELETES(2);
EQUALITY_DELETES(2),
PARTIAL_UPDATE(3);

private final int id;

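Code that switches exhaustively over FileContent now has to account for the fourth value; the SnapshotSummary, DeleteFileIndex, and FileMetadata changes in this commit follow this pattern. A minimal sketch (the describe helper is illustrative only):

import org.apache.iceberg.FileContent;

class FileContentExample {
  // Illustrative dispatch over the enum, including the new PARTIAL_UPDATE value.
  static String describe(FileContent content) {
    switch (content) {
      case DATA:
        return "data file";
      case POSITION_DELETES:
        return "position delete file";
      case EQUALITY_DELETES:
        return "equality delete file";
      case PARTIAL_UPDATE:
        return "partial update file";
      default:
        throw new UnsupportedOperationException("Unknown file content: " + content);
    }
  }
}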
18 changes: 18 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -73,6 +73,7 @@ public PartitionData copy() {
private Map<Integer, ByteBuffer> upperBounds = null;
private long[] splitOffsets = null;
private int[] equalityIds = null;
private int[] partialIds = null;
private byte[] keyMetadata = null;
private Integer sortOrderId;

@@ -132,6 +133,7 @@ public PartitionData copy() {
Map<Integer, ByteBuffer> upperBounds,
List<Long> splitOffsets,
int[] equalityFieldIds,
int[] partialFieldIds,
Integer sortOrderId,
ByteBuffer keyMetadata) {
this.partitionSpecId = specId;
@@ -159,6 +161,7 @@ public PartitionData copy() {
this.upperBounds = SerializableByteBufferMap.wrap(upperBounds);
this.splitOffsets = ArrayUtil.toLongArray(splitOffsets);
this.equalityIds = equalityFieldIds;
this.partialIds = partialFieldIds;
this.sortOrderId = sortOrderId;
this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
}
@@ -207,6 +210,10 @@ public PartitionData copy() {
toCopy.equalityIds != null
? Arrays.copyOf(toCopy.equalityIds, toCopy.equalityIds.length)
: null;
this.partialIds =
toCopy.partialIds != null
? Arrays.copyOf(toCopy.partialIds, toCopy.partialIds.length)
: null;
this.sortOrderId = toCopy.sortOrderId;
}

@@ -294,6 +301,9 @@ public void put(int i, Object value) {
this.sortOrderId = (Integer) value;
return;
case 17:
this.partialIds = ArrayUtil.toIntArray((List<Integer>) value);
return;
case 18:
this.fileOrdinal = (long) value;
return;
default:
@@ -349,6 +359,8 @@ public Object get(int i) {
case 16:
return sortOrderId;
case 17:
return partialFieldIds();
case 18:
return fileOrdinal;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
@@ -445,6 +457,11 @@ public List<Integer> equalityFieldIds() {
return ArrayUtil.toIntList(equalityIds);
}

@Override
public List<Integer> partialFieldIds() {
return ArrayUtil.toIntList(partialIds);
}

@Override
public Integer sortOrderId() {
return sortOrderId;
@@ -478,6 +495,7 @@ public String toString() {
.add("split_offsets", splitOffsets == null ? "null" : splitOffsets())
.add("equality_ids", equalityIds == null ? "null" : equalityFieldIds())
.add("sort_order_id", sortOrderId)
.add("partial_ids", partialIds == null ? "null" : partialFieldIds())
.toString();
}
}
47 changes: 47 additions & 0 deletions core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -156,6 +156,37 @@ private static boolean canContainDeletesForFile(

case EQUALITY_DELETES:
return canContainEqDeletesForFile(dataFile, deleteFile, schema);

case PARTIAL_UPDATE:
return canContainPartialDeletesForFile(dataFile, deleteFile, schema);
}

return true;
}

// TODO: add actual implementation
private static boolean canContainPartialDeletesForFile(
DataFile dataFile, DeleteFile deleteFile, Schema schema) {
// check that the delete file can contain the data file's file_path
Map<Integer, ByteBuffer> lowers = deleteFile.lowerBounds();
Map<Integer, ByteBuffer> uppers = deleteFile.upperBounds();
if (lowers == null || uppers == null) {
return true;
}

Type pathType = MetadataColumns.DELETE_FILE_PATH.type();
int pathId = MetadataColumns.DELETE_FILE_PATH.fieldId();
Comparator<CharSequence> comparator = Comparators.charSequences();
ByteBuffer lower = lowers.get(pathId);
if (lower != null
&& comparator.compare(dataFile.path(), Conversions.fromByteBuffer(pathType, lower)) < 0) {
return false;
}

ByteBuffer upper = uppers.get(pathId);
if (upper != null
&& comparator.compare(dataFile.path(), Conversions.fromByteBuffer(pathType, upper)) > 0) {
return false;
}

return true;
@@ -474,6 +505,22 @@ DeleteFileIndex build() {
globalApplySeqs = eqFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
globalDeletes = eqFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);

// FIXME: this overwrites the global equality delete index built above
List<Pair<Long, DeleteFile>> partialDeleteSortedBySeq =
deleteFilesByPartition.get(partition).stream()
.filter(entry -> entry.file().content() == FileContent.PARTIAL_UPDATE)
.map(
entry ->
// a delete file is indexed by the sequence number it should be applied to
Pair.of(entry.dataSequenceNumber(), entry.file()))
.sorted(Comparator.comparingLong(Pair::first))
.collect(Collectors.toList());
if (!partialDeleteSortedBySeq.isEmpty()) {
globalApplySeqs = partialDeleteSortedBySeq.stream().mapToLong(Pair::first).toArray();
globalDeletes =
partialDeleteSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);
}

List<Pair<Long, DeleteFile>> posFilesSortedBySeq =
deleteFilesByPartition.get(partition).stream()
.filter(entry -> entry.file().content() == FileContent.POSITION_DELETES)
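Partial update files are indexed by data sequence number in the same way as equality deletes, so the usual application rule should carry over (an assumption based on the code above, not a documented guarantee): an equality delete or partial update applies only to data files with a strictly smaller sequence number, while position deletes also apply to data files written at the same sequence number. Sketched standalone:

class SequenceNumberRuleSketch {
  // Assumed rule: partial update files follow the equality-delete comparison.
  static boolean applies(long deleteSeq, long dataSeq, boolean isPositionDelete) {
    return isPositionDelete ? deleteSeq >= dataSeq : deleteSeq > dataSeq;
  }
}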
11 changes: 11 additions & 0 deletions core/src/main/java/org/apache/iceberg/FileMetadata.java
@@ -41,6 +41,7 @@ public static class Builder {
private final int specId;
private FileContent content = null;
private int[] equalityFieldIds = null;
private int[] partialFieldIds = null;
private PartitionData partitionData;
private String filePath = null;
private FileFormat format = null;
@@ -116,6 +117,13 @@ public Builder ofEqualityDeletes(int... fieldIds) {
return this;
}

public Builder ofPartialDeletes(int[] newEqualityFieldIds, int[] newPartialFieldIds) {
this.content = FileContent.PARTIAL_UPDATE;
this.equalityFieldIds = newEqualityFieldIds;
this.partialFieldIds = newPartialFieldIds;
return this;
}

public Builder withStatus(FileStatus stat) {
this.filePath = stat.getPath().toString();
this.fileSizeInBytes = stat.getLen();
@@ -222,6 +230,8 @@ public DeleteFile build() {
sortOrderId == null, "Position delete file should not have sort order");
break;
case EQUALITY_DELETES:
case PARTIAL_UPDATE:
if (sortOrderId == null) {
sortOrderId = SortOrder.unsorted().orderId();
}
@@ -246,6 +256,7 @@ public DeleteFile build() {
lowerBounds,
upperBounds),
equalityFieldIds,
partialFieldIds,
sortOrderId,
keyMetadata);
}
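A hedged sketch of constructing a partial update DeleteFile through the new builder hook. The field IDs, path, and sizes are placeholders, the remaining calls are the existing FileMetadata.Builder API, and an unpartitioned spec keeps the example short; build() may impose further validation not shown here.

import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.PartitionSpec;

class PartialUpdateFileBuilderExample {
  static DeleteFile buildPartialUpdateFile() {
    PartitionSpec spec = PartitionSpec.unpartitioned();
    return FileMetadata.deleteFileBuilder(spec)
        // key columns (equality IDs) and updated columns (partial IDs) are placeholders
        .ofPartialDeletes(new int[] {1}, new int[] {2})
        .withPath("s3://bucket/path/partial-00000.parquet")
        .withFormat(FileFormat.PARQUET)
        .withFileSizeInBytes(4096L)
        .withRecordCount(100L)
        .build();
  }
}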
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -57,6 +57,7 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
metrics.upperBounds(),
splitOffsets,
null,
null,
sortOrderId,
keyMetadata);
}
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
@@ -39,6 +39,7 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
long fileSizeInBytes,
Metrics metrics,
int[] equalityFieldIds,
int[] partialFieldIds,
Integer sortOrderId,
ByteBuffer keyMetadata) {
super(
@@ -57,6 +58,7 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
metrics.upperBounds(),
null,
equalityFieldIds,
partialFieldIds,
sortOrderId,
keyMetadata);
}
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/iceberg/SnapshotSummary.java
@@ -221,12 +221,14 @@ private static class UpdateMetrics {
private int addedPosDeleteFiles = 0;
private int removedPosDeleteFiles = 0;
private int addedDeleteFiles = 0;
private int addedPartialFiles = 0;
private int removedDeleteFiles = 0;
private long addedRecords = 0L;
private long deletedRecords = 0L;
private long addedPosDeletes = 0L;
private long removedPosDeletes = 0L;
private long addedEqDeletes = 0L;
private long addedPartialUpdates = 0L;
private long removedEqDeletes = 0L;
private boolean trustSizeAndDeleteCounts = true;

@@ -290,6 +292,12 @@ void addedFile(ContentFile<?> file) {
this.addedEqDeleteFiles += 1;
this.addedEqDeletes += file.recordCount();
break;
case PARTIAL_UPDATE:
this.addedDeleteFiles += 1;
this.addedPartialFiles += 1;
this.addedPartialUpdates += file.recordCount();
break;

default:
throw new UnsupportedOperationException(
"Unsupported file content type: " + file.content());
10 changes: 9 additions & 1 deletion core/src/main/java/org/apache/iceberg/V2Metadata.java
@@ -272,7 +272,8 @@ static Types.StructType fileType(Types.StructType partitionType) {
DataFile.KEY_METADATA,
DataFile.SPLIT_OFFSETS,
DataFile.EQUALITY_IDS,
DataFile.SORT_ORDER_ID);
DataFile.SORT_ORDER_ID,
DataFile.PARTIAL_IDS);
}

static class IndexedManifestEntry<F extends ContentFile<F>>
@@ -456,6 +457,8 @@ public Object get(int pos) {
return wrapped.equalityFieldIds();
case 15:
return wrapped.sortOrderId();
case 16:
return wrapped.partialFieldIds();
}
throw new IllegalArgumentException("Unknown field ordinal: " + pos);
}
@@ -550,6 +553,11 @@ public List<Integer> equalityFieldIds() {
return wrapped.equalityFieldIds();
}

@Override
public List<Integer> partialFieldIds() {
return wrapped.partialFieldIds();
}

@Override
public Integer sortOrderId() {
return wrapped.sortOrderId();
