From 0d3cfa9565c396fb75d3f55d2e49b57d27c4c33f Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com>
Date: Wed, 22 Jan 2025 20:24:10 +0000
Subject: [PATCH] [IcebergIO] cleanups, improvements, and test tweaks (#33592)

* small iceberg improvements

* remove duplicate cache query
---
 .../sdk/io/iceberg/IcebergCatalogConfig.java  | 10 ++-
 .../sdk/io/iceberg/RecordWriterManager.java   | 10 +--
 .../catalog/BigQueryMetastoreCatalogIT.java   |  4 +-
 .../io/iceberg/catalog/HadoopCatalogIT.java   | 11 ++-
 .../sdk/io/iceberg/catalog/HiveCatalogIT.java | 40 +++++++--
 .../iceberg/catalog/IcebergCatalogBaseIT.java | 89 +++++++++++--------
 .../catalog/hiveutils/TestHiveMetastore.java  |  3 +
 7 files changed, 115 insertions(+), 52 deletions(-)

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
index 5307047354b8..1d89a6a26e98 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
@@ -24,11 +24,15 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.Catalog;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.checkerframework.dataflow.qual.Pure;
 
 @AutoValue
 public abstract class IcebergCatalogConfig implements Serializable {
+  private transient @MonotonicNonNull Catalog cachedCatalog;
+
   @Pure
   @Nullable
   public abstract String getCatalogName();
@@ -47,6 +51,9 @@ public static Builder builder() {
   }
 
   public org.apache.iceberg.catalog.Catalog catalog() {
+    if (cachedCatalog != null) {
+      return cachedCatalog;
+    }
     String catalogName = getCatalogName();
     if (catalogName == null) {
       catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion();
@@ -63,7 +70,8 @@ public org.apache.iceberg.catalog.Catalog catalog() {
     for (Map.Entry prop : confProps.entrySet()) {
       config.set(prop.getKey(), prop.getValue());
     }
-    return CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
+    cachedCatalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
+    return cachedCatalog;
   }
 
   @AutoValue.Builder
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
index 63186f26fb5a..78b01b15e62c 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
@@ -158,10 +158,11 @@ class DestinationState {
 
     boolean write(Record record) {
       routingPartitionKey.partition(getPartitionableRecord(record));
-      if (!writers.asMap().containsKey(routingPartitionKey) && openWriters >= maxNumWriters) {
+      @Nullable RecordWriter writer = writers.getIfPresent(routingPartitionKey);
+      if (writer == null && openWriters >= maxNumWriters) {
         return false;
       }
-      RecordWriter writer = fetchWriterForPartition(routingPartitionKey);
+      writer = fetchWriterForPartition(routingPartitionKey, writer);
       writer.write(record);
       return true;
     }
@@ -171,9 +172,8 @@ boolean write(Record record) {
      * no {@link RecordWriter} exists or if it has reached the maximum limit of bytes written, a new
      * one is created and returned.
      */
-    private RecordWriter fetchWriterForPartition(PartitionKey partitionKey) {
-      RecordWriter recordWriter = writers.getIfPresent(partitionKey);
-
+    private RecordWriter fetchWriterForPartition(
+        PartitionKey partitionKey, @Nullable RecordWriter recordWriter) {
       if (recordWriter == null || recordWriter.bytesWritten() > maxFileSize) {
         // each writer must have its own PartitionKey object
         PartitionKey copy = partitionKey.copy();
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
index 39920e66199b..887b0cfa56ca 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.iceberg.catalog;
 
-import java.io.IOException;
 import java.util.Map;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.conf.Configuration;
@@ -50,7 +49,7 @@ public Catalog createCatalog() {
   }
 
   @Override
-  public void catalogCleanup() throws IOException {
+  public void catalogCleanup() {
     for (TableIdentifier tableIdentifier : catalog.listTables(Namespace.of(DATASET))) {
       // only delete tables that were created in this test run
       if (tableIdentifier.name().contains(String.valueOf(SALT))) {
@@ -70,6 +69,7 @@ public Map managedIcebergConfig(String tableId) {
                 .put("gcp_location", "us-central1")
                 .put("warehouse", warehouse)
                 .put("catalog-impl", BQMS_CATALOG)
+                .put("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO")
                 .build())
         .build();
   }
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java
index d33a372e5e3b..4e6766cc496d 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java
@@ -18,11 +18,14 @@
 package org.apache.beam.sdk.io.iceberg.catalog;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 
 public class HadoopCatalogIT extends IcebergCatalogBaseIT {
@@ -46,7 +49,12 @@ public Catalog createCatalog() {
 
   @Override
   public void catalogCleanup() throws IOException {
-    ((HadoopCatalog) catalog).close();
+    HadoopCatalog hadoopCatalog = (HadoopCatalog) catalog;
+    List tables = hadoopCatalog.listTables(Namespace.of(testName.getMethodName()));
+    for (TableIdentifier identifier : tables) {
+      hadoopCatalog.dropTable(identifier);
+    }
+    hadoopCatalog.close();
   }
 
   @Override
@@ -58,6 +66,7 @@ public Map managedIcebergConfig(String tableId) {
         ImmutableMap.builder()
             .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
             .put("warehouse", warehouse)
+            .put("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO")
             .build())
         .build();
   }
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java
index 076d3f4f9db8..acb0e36b4b01 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.iceberg.catalog;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.io.iceberg.catalog.hiveutils.HiveMetastoreExtension;
@@ -24,10 +25,13 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 
 /**
  * Read and write tests using {@link HiveCatalog}.
@@ -37,18 +41,33 @@
  */
 public class HiveCatalogIT extends IcebergCatalogBaseIT {
   private static HiveMetastoreExtension hiveMetastoreExtension;
-  private static final String TEST_DB = "test_db";
+
+  private String testDb() {
+    return "test_db_" + testName.getMethodName();
+  }
 
   @Override
   public String tableId() {
-    return String.format("%s.%s", TEST_DB, testName.getMethodName());
+    return String.format("%s.%s", testDb(), "test_table");
+  }
+
+  @BeforeClass
+  public static void setUpClass() throws MetaException {
+    String warehouse = warehouse(HiveCatalogIT.class);
+    hiveMetastoreExtension = new HiveMetastoreExtension(warehouse);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (hiveMetastoreExtension != null) {
+      hiveMetastoreExtension.cleanup();
+    }
   }
 
   @Override
   public void catalogSetup() throws Exception {
-    hiveMetastoreExtension = new HiveMetastoreExtension(warehouse);
-    String dbPath = hiveMetastoreExtension.metastore().getDatabasePath(TEST_DB);
-    Database db = new Database(TEST_DB, "description", dbPath, Maps.newHashMap());
+    String dbPath = hiveMetastoreExtension.metastore().getDatabasePath(testDb());
+    Database db = new Database(testDb(), "description", dbPath, Maps.newHashMap());
     hiveMetastoreExtension.metastoreClient().createDatabase(db);
   }
 
@@ -66,7 +85,11 @@ public Catalog createCatalog() {
 
   @Override
   public void catalogCleanup() throws Exception {
     if (hiveMetastoreExtension != null) {
-      hiveMetastoreExtension.cleanup();
+      List tables = hiveMetastoreExtension.metastoreClient().getAllTables(testDb());
+      for (String table : tables) {
+        hiveMetastoreExtension.metastoreClient().dropTable(testDb(), table, true, false);
+      }
+      hiveMetastoreExtension.metastoreClient().dropDatabase(testDb());
     }
   }
 
@@ -81,7 +104,10 @@ public Map managedIcebergConfig(String tableId) {
 
     return ImmutableMap.builder()
         .put("table", tableId)
-        .put("name", "hive_" + catalogName)
+        .put("catalog_name", "hive_" + catalogName)
+        .put(
+            "catalog_properties",
+            ImmutableMap.of("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO"))
         .put("config_properties", confProperties)
         .build();
   }
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
index df2ca5adb7ac..f11c77ccc68d 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
@@ -23,7 +23,7 @@
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertTrue;
 
-import com.google.api.services.storage.model.Objects;
+import com.google.api.services.storage.model.StorageObject;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -75,6 +75,7 @@
 import org.apache.iceberg.encryption.InputFilesDecryptor;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.parquet.Parquet;
@@ -136,17 +137,18 @@ public String tableId() {
     return testName.getMethodName() + ".test_table";
   }
 
+  public static String warehouse(Class testClass) {
+    return String.format(
+        "%s/%s/%s",
+        TestPipeline.testingPipelineOptions().getTempLocation(), testClass.getSimpleName(), RANDOM);
+  }
+
   public String catalogName = "test_catalog_" + System.nanoTime();
 
   @Before
   public void setUp() throws Exception {
     options = TestPipeline.testingPipelineOptions().as(GcpOptions.class);
-    warehouse =
-        String.format(
-            "%s/%s/%s",
-            TestPipeline.testingPipelineOptions().getTempLocation(),
-            getClass().getSimpleName(),
-            RANDOM);
+    warehouse = warehouse(getClass());
     catalogSetup();
     catalog = createCatalog();
   }
@@ -163,17 +165,24 @@ public void cleanUp() throws Exception {
 
       GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil();
       GcsPath path = GcsPath.fromUri(warehouse);
-      Objects objects =
-          gcsUtil.listObjects(
-              path.getBucket(),
-              getClass().getSimpleName() + "/" + path.getFileName().toString(),
-              null);
-      List filesToDelete =
-          objects.getItems().stream()
-              .map(obj -> "gs://" + path.getBucket() + "/" + obj.getName())
-              .collect(Collectors.toList());
-
-      gcsUtil.remove(filesToDelete);
+      @Nullable
+      List objects =
+          gcsUtil
+              .listObjects(
+                  path.getBucket(),
+                  getClass().getSimpleName() + "/" + path.getFileName().toString(),
+                  null)
+              .getItems();
+
+      // sometimes a catalog's cleanup will take care of all the files.
+      // If any files are left though, manually delete them with GCS utils
+      if (objects != null) {
+        List filesToDelete =
+            objects.stream()
+                .map(obj -> "gs://" + path.getBucket() + "/" + obj.getName())
+                .collect(Collectors.toList());
+        gcsUtil.remove(filesToDelete);
+      }
     } catch (Exception e) {
       LOG.warn("Failed to clean up GCS files.", e);
     }
@@ -181,7 +190,7 @@ public void cleanUp() throws Exception {
 
   protected static String warehouse;
   public Catalog catalog;
-  protected GcpOptions options;
+  protected static GcpOptions options;
   private static final String RANDOM = UUID.randomUUID().toString();
   @Rule public TestPipeline pipeline = TestPipeline.create();
   @Rule public TestName testName = new TestName();
@@ -275,7 +284,10 @@ private List populateTable(Table table) throws IOException {
     int totalRecords = 0;
     for (int shardNum = 0; shardNum < NUM_SHARDS; ++shardNum) {
       String filepath = table.location() + "/" + UUID.randomUUID();
-      OutputFile file = table.io().newOutputFile(filepath);
+      OutputFile file;
+      try (FileIO io = table.io()) {
+        file = io.newOutputFile(filepath);
+      }
       DataWriter writer =
           Parquet.writeData(file)
               .schema(ICEBERG_SCHEMA)
@@ -318,17 +330,20 @@ private List populateTable(Table table) throws IOException {
     }
   }
 
-  private List readRecords(Table table) {
+  private List readRecords(Table table) throws IOException {
     org.apache.iceberg.Schema tableSchema = table.schema();
     TableScan tableScan = table.newScan().project(tableSchema);
     List writtenRecords = new ArrayList<>();
-    for (CombinedScanTask task : tableScan.planTasks()) {
-      InputFilesDecryptor descryptor =
-          new InputFilesDecryptor(task, table.io(), table.encryption());
+    CloseableIterable tasks = tableScan.planTasks();
+    for (CombinedScanTask task : tasks) {
+      InputFilesDecryptor decryptor;
+      try (FileIO io = table.io()) {
+        decryptor = new InputFilesDecryptor(task, io, table.encryption());
+      }
       for (FileScanTask fileTask : task.files()) {
         Map idToConstants =
             constantsMap(fileTask, IdentityPartitionConverters::convertConstant, tableSchema);
-        InputFile inputFile = descryptor.getInputFile(fileTask);
+        InputFile inputFile = decryptor.getInputFile(fileTask);
         CloseableIterable iterable =
             Parquet.read(inputFile)
                 .split(fileTask.start(), fileTask.length())
@@ -342,8 +357,10 @@ private List readRecords(Table table) {
         for (Record rec : iterable) {
           writtenRecords.add(rec);
         }
+        iterable.close();
       }
     }
+    tasks.close();
     return writtenRecords;
   }
 
@@ -363,7 +380,7 @@ public void testRead() throws Exception {
   }
 
   @Test
-  public void testWrite() {
+  public void testWrite() throws IOException {
     // Write with Beam
     // Expect the sink to create the table
     Map config = managedIcebergConfig(tableId());
@@ -381,7 +398,7 @@ public void testWrite() {
   }
 
   @Test
-  public void testWriteToPartitionedTable() {
+  public void testWriteToPartitionedTable() throws IOException {
     // For an example row where bool=true, modulo_5=3, str=value_303,
    // this partition spec will create a partition like: /bool=true/modulo_5=3/str_trunc=value_3/
     PartitionSpec partitionSpec =
@@ -412,7 +429,7 @@ private PeriodicImpulse getStreamingSource() {
   }
 
   @Test
-  public void testStreamingWrite() {
+  public void testStreamingWrite() throws IOException {
     int numRecords = numRecords();
     PartitionSpec partitionSpec =
         PartitionSpec.builderFor(ICEBERG_SCHEMA).identity("bool").identity("modulo_5").build();
@@ -442,7 +459,7 @@ public void testStreamingWrite() {
   }
 
   @Test
-  public void testStreamingWriteWithPriorWindowing() {
+  public void testStreamingWriteWithPriorWindowing() throws IOException {
     int numRecords = numRecords();
     PartitionSpec partitionSpec =
         PartitionSpec.builderFor(ICEBERG_SCHEMA).identity("bool").identity("modulo_5").build();
@@ -474,7 +491,7 @@ public void testStreamingWriteWithPriorWindowing() {
         returnedRecords,
         containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray()));
   }
 
-  private void writeToDynamicDestinations(@Nullable String filterOp) {
+  private void writeToDynamicDestinations(@Nullable String filterOp) throws IOException {
     writeToDynamicDestinations(filterOp, false, false);
   }
 
@@ -484,7 +501,7 @@ private void writeToDynamicDestinations(@Nullable String filterOp) {
    * and "only"
    */
   private void writeToDynamicDestinations(
-      @Nullable String filterOp, boolean streaming, boolean partitioning) {
+      @Nullable String filterOp, boolean streaming, boolean partitioning) throws IOException {
     int numRecords = numRecords();
     String tableIdentifierTemplate = tableId() + "_{modulo_5}_{char}";
     Map writeConfig = new HashMap<>(managedIcebergConfig(tableIdentifierTemplate));
@@ -585,27 +602,27 @@ private void writeToDynamicDestinations(
   }
 
   @Test
-  public void testWriteToDynamicDestinations() {
+  public void testWriteToDynamicDestinations() throws IOException {
     writeToDynamicDestinations(null);
   }
 
   @Test
-  public void testWriteToDynamicDestinationsAndDropFields() {
+  public void testWriteToDynamicDestinationsAndDropFields() throws IOException {
     writeToDynamicDestinations("drop");
   }
 
   @Test
-  public void testWriteToDynamicDestinationsWithOnlyRecord() {
+  public void testWriteToDynamicDestinationsWithOnlyRecord() throws IOException {
     writeToDynamicDestinations("only");
   }
 
   @Test
-  public void testStreamToDynamicDestinationsAndKeepFields() {
+  public void testStreamToDynamicDestinationsAndKeepFields() throws IOException {
     writeToDynamicDestinations("keep", true, false);
   }
 
   @Test
-  public void testStreamToPartitionedDynamicDestinations() {
+  public void testStreamToPartitionedDynamicDestinations() throws IOException {
     writeToDynamicDestinations(null, true, true);
   }
 }
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/hiveutils/TestHiveMetastore.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/hiveutils/TestHiveMetastore.java
index 94f519179e9d..01fc54ed59cd 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/hiveutils/TestHiveMetastore.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/hiveutils/TestHiveMetastore.java
@@ -215,6 +215,9 @@ public void reset() throws Exception {
 
     Path warehouseRoot = new Path(hiveWarehousePath);
     FileSystem fs = Util.getFs(warehouseRoot, hiveConf);
+    if (!fs.exists(warehouseRoot)) {
+      return;
+    }
     for (FileStatus fileStatus : fs.listStatus(warehouseRoot)) {
       if (!fileStatus.getPath().getName().equals("derby.log")
           && !fileStatus.getPath().getName().equals("metastore_db")) {