Skip to content

Commit

Permalink
[IcebergIO] cleanups, improvements, and test tweaks (#33592)
Browse files Browse the repository at this point in the history
* small iceberg improvements

* remove duplicate cache query
  • Loading branch information
ahmedabu98 authored Jan 22, 2025
1 parent c939a73 commit 0d3cfa9
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.dataflow.qual.Pure;

@AutoValue
public abstract class IcebergCatalogConfig implements Serializable {
private transient @MonotonicNonNull Catalog cachedCatalog;

@Pure
@Nullable
public abstract String getCatalogName();
Expand All @@ -47,6 +51,9 @@ public static Builder builder() {
}

public org.apache.iceberg.catalog.Catalog catalog() {
if (cachedCatalog != null) {
return cachedCatalog;
}
String catalogName = getCatalogName();
if (catalogName == null) {
catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion();
Expand All @@ -63,7 +70,8 @@ public org.apache.iceberg.catalog.Catalog catalog() {
for (Map.Entry<String, String> prop : confProps.entrySet()) {
config.set(prop.getKey(), prop.getValue());
}
return CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
cachedCatalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
return cachedCatalog;
}

@AutoValue.Builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,11 @@ class DestinationState {
boolean write(Record record) {
routingPartitionKey.partition(getPartitionableRecord(record));

if (!writers.asMap().containsKey(routingPartitionKey) && openWriters >= maxNumWriters) {
@Nullable RecordWriter writer = writers.getIfPresent(routingPartitionKey);
if (writer == null && openWriters >= maxNumWriters) {
return false;
}
RecordWriter writer = fetchWriterForPartition(routingPartitionKey);
writer = fetchWriterForPartition(routingPartitionKey, writer);
writer.write(record);
return true;
}
Expand All @@ -171,9 +172,8 @@ boolean write(Record record) {
* no {@link RecordWriter} exists or if it has reached the maximum limit of bytes written, a new
* one is created and returned.
*/
private RecordWriter fetchWriterForPartition(PartitionKey partitionKey) {
RecordWriter recordWriter = writers.getIfPresent(partitionKey);

private RecordWriter fetchWriterForPartition(
PartitionKey partitionKey, @Nullable RecordWriter recordWriter) {
if (recordWriter == null || recordWriter.bytesWritten() > maxFileSize) {
// each writer must have its own PartitionKey object
PartitionKey copy = partitionKey.copy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io.iceberg.catalog;

import java.io.IOException;
import java.util.Map;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -50,7 +49,7 @@ public Catalog createCatalog() {
}

@Override
public void catalogCleanup() throws IOException {
public void catalogCleanup() {
for (TableIdentifier tableIdentifier : catalog.listTables(Namespace.of(DATASET))) {
// only delete tables that were created in this test run
if (tableIdentifier.name().contains(String.valueOf(SALT))) {
Expand All @@ -70,6 +69,7 @@ public Map<String, Object> managedIcebergConfig(String tableId) {
.put("gcp_location", "us-central1")
.put("warehouse", warehouse)
.put("catalog-impl", BQMS_CATALOG)
.put("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO")
.build())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
package org.apache.beam.sdk.io.iceberg.catalog;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class HadoopCatalogIT extends IcebergCatalogBaseIT {
Expand All @@ -46,7 +49,12 @@ public Catalog createCatalog() {

@Override
public void catalogCleanup() throws IOException {
((HadoopCatalog) catalog).close();
HadoopCatalog hadoopCatalog = (HadoopCatalog) catalog;
List<TableIdentifier> tables = hadoopCatalog.listTables(Namespace.of(testName.getMethodName()));
for (TableIdentifier identifier : tables) {
hadoopCatalog.dropTable(identifier);
}
hadoopCatalog.close();
}

@Override
Expand All @@ -58,6 +66,7 @@ public Map<String, Object> managedIcebergConfig(String tableId) {
ImmutableMap.<String, String>builder()
.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
.put("warehouse", warehouse)
.put("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO")
.build())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@
*/
package org.apache.beam.sdk.io.iceberg.catalog;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.io.iceberg.catalog.hiveutils.HiveMetastoreExtension;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hive.HiveCatalog;
import org.junit.AfterClass;
import org.junit.BeforeClass;

/**
* Read and write tests using {@link HiveCatalog}.
Expand All @@ -37,18 +41,33 @@
*/
public class HiveCatalogIT extends IcebergCatalogBaseIT {
private static HiveMetastoreExtension hiveMetastoreExtension;
private static final String TEST_DB = "test_db";

/**
 * Returns a Hive database name scoped to the currently executing test method,
 * so concurrent/repeated test runs do not collide on a shared database.
 */
private String testDb() {
  final String method = testName.getMethodName();
  return String.format("test_db_%s", method);
}

@Override
public String tableId() {
return String.format("%s.%s", TEST_DB, testName.getMethodName());
return String.format("%s.%s", testDb(), "test_table");
}

/**
 * Starts a single embedded Hive metastore, shared by every test in this class.
 *
 * @throws MetaException if the metastore fails to initialize
 */
@BeforeClass
public static void setUpClass() throws MetaException {
  // One metastore per class keeps test startup cost low; the warehouse path is
  // derived from the test class name.
  hiveMetastoreExtension = new HiveMetastoreExtension(warehouse(HiveCatalogIT.class));
}

/**
 * Shuts down the shared embedded Hive metastore after all tests in this class
 * have finished. Safe to call when setup never ran (extension may be null).
 *
 * @throws Exception if metastore cleanup fails
 */
@AfterClass
public static void tearDown() throws Exception {
  // Guard clause: nothing to release if the metastore was never created.
  if (hiveMetastoreExtension == null) {
    return;
  }
  hiveMetastoreExtension.cleanup();
}

@Override
public void catalogSetup() throws Exception {
hiveMetastoreExtension = new HiveMetastoreExtension(warehouse);
String dbPath = hiveMetastoreExtension.metastore().getDatabasePath(TEST_DB);
Database db = new Database(TEST_DB, "description", dbPath, Maps.newHashMap());
String dbPath = hiveMetastoreExtension.metastore().getDatabasePath(testDb());
Database db = new Database(testDb(), "description", dbPath, Maps.newHashMap());
hiveMetastoreExtension.metastoreClient().createDatabase(db);
}

Expand All @@ -66,7 +85,11 @@ public Catalog createCatalog() {
@Override
public void catalogCleanup() throws Exception {
if (hiveMetastoreExtension != null) {
hiveMetastoreExtension.cleanup();
List<String> tables = hiveMetastoreExtension.metastoreClient().getAllTables(testDb());
for (String table : tables) {
hiveMetastoreExtension.metastoreClient().dropTable(testDb(), table, true, false);
}
hiveMetastoreExtension.metastoreClient().dropDatabase(testDb());
}
}

Expand All @@ -81,7 +104,10 @@ public Map<String, Object> managedIcebergConfig(String tableId) {

return ImmutableMap.<String, Object>builder()
.put("table", tableId)
.put("name", "hive_" + catalogName)
.put("catalog_name", "hive_" + catalogName)
.put(
"catalog_properties",
ImmutableMap.of("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO"))
.put("config_properties", confProperties)
.build();
}
Expand Down
Loading

0 comments on commit 0d3cfa9

Please sign in to comment.