Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
klion26 authored Dec 20, 2024
2 parents 76d7d78 + be8ecf5 commit 3fcc6ac
Showing 1 changed file with 23 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,52 +24,36 @@
import org.apache.amoro.AmoroTable;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.formats.AmoroCatalogTestHelper;
import org.apache.amoro.formats.IcebergHadoopCatalogTestHelper;
import org.apache.amoro.formats.iceberg.IcebergTable;
import org.apache.amoro.hive.formats.IcebergHiveCatalogTestHelper;
import org.apache.amoro.optimizing.MetricsSummary;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.server.persistence.DataSourceFactory;
import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
import org.apache.amoro.server.table.DerbyPersistence;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.table.TableIdentifier;
import org.apache.amoro.table.descriptor.FormatTableDescriptor;
import org.apache.amoro.table.descriptor.OptimizingProcessInfo;
import org.apache.amoro.table.descriptor.TestServerTableDescriptor;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.ibatis.session.SqlSession;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;

import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.List;
import java.util.Map;

@RunWith(Parameterized.class)
public class TestIcebergServerTableDescriptor extends TestServerTableDescriptor {
Persistency persistency = null;
@Rule public DerbyPersistence derbyPersistence = new DerbyPersistence();

public TestIcebergServerTableDescriptor(AmoroCatalogTestHelper<?> amoroCatalogTestHelper) {
super(amoroCatalogTestHelper);
Expand All @@ -91,32 +75,24 @@ protected void tableOperationsAddColumns() {
.commit();
}

@After
public void after() throws IOException {
if (persistency != null) {
persistency.truncateAllTables();
}
super.after();
}

@Test
public void testOptimizingPorcess() {
Persistency persistency = new Persistency();
public void testOptimizingProcess() {
TestMixedAndIcebergTableDescriptor descriptor = new TestMixedAndIcebergTableDescriptor();

String catalogName = "catalog1";
String dbName = "db1";
String tableName = "table1";

ServerTableIdentifier identifier =
ServerTableIdentifier.of(1L, catalogName, dbName, tableName, TableFormat.ICEBERG);
persistency.insertTable(identifier);
descriptor.insertTable(identifier);
MetricsSummary dummySummery = new MetricsSummary();
dummySummery.setNewDeleteFileCnt(1);
dummySummery.setNewFileCnt(1);
dummySummery.setNewFileSize(1);
dummySummery.setNewDeleteFileCnt(1);
dummySummery.setNewDeleteSize(1);
persistency.insertOptimizingProcess(
descriptor.insertOptimizingProcess(
identifier,
1L,
1,
Expand All @@ -128,7 +104,7 @@ public void testOptimizingPorcess() {
Collections.emptyMap(),
Collections.emptyMap());

persistency.insertOptimizingProcess(
descriptor.insertOptimizingProcess(
identifier,
2L,
2L,
Expand All @@ -140,7 +116,7 @@ public void testOptimizingPorcess() {
Collections.emptyMap(),
Collections.emptyMap());

persistency.insertOptimizingProcess(
descriptor.insertOptimizingProcess(
identifier,
3L,
3L,
Expand All @@ -152,7 +128,7 @@ public void testOptimizingPorcess() {
Collections.emptyMap(),
Collections.emptyMap());

persistency.insertOptimizingProcess(
descriptor.insertOptimizingProcess(
identifier,
4L,
4L,
Expand All @@ -164,7 +140,7 @@ public void testOptimizingPorcess() {
Collections.emptyMap(),
Collections.emptyMap());

persistency.insertOptimizingProcess(
descriptor.insertOptimizingProcess(
identifier,
5L,
5L,
Expand All @@ -176,7 +152,7 @@ public void testOptimizingPorcess() {
Collections.emptyMap(),
Collections.emptyMap());

persistency.insertOptimizingProcess(
descriptor.insertOptimizingProcess(
identifier,
6L,
6L,
Expand All @@ -188,7 +164,7 @@ public void testOptimizingPorcess() {
Collections.emptyMap(),
Collections.emptyMap());

persistency.insertOptimizingProcess(
descriptor.insertOptimizingProcess(
identifier,
7L,
7L,
Expand All @@ -200,7 +176,7 @@ public void testOptimizingPorcess() {
Collections.emptyMap(),
Collections.emptyMap());

persistency.insertOptimizingProcess(
descriptor.insertOptimizingProcess(
identifier,
8L,
8L,
Expand All @@ -212,7 +188,7 @@ public void testOptimizingPorcess() {
Collections.emptyMap(),
Collections.emptyMap());

persistency.insertOptimizingProcess(
descriptor.insertOptimizingProcess(
identifier,
9L,
9L,
Expand All @@ -224,7 +200,7 @@ public void testOptimizingPorcess() {
Collections.emptyMap(),
Collections.emptyMap());

persistency.insertOptimizingProcess(
descriptor.insertOptimizingProcess(
identifier,
10L,
10L,
Expand All @@ -243,29 +219,29 @@ public void testOptimizingPorcess() {
doReturn(tableIdentifier).when(table).id();

Pair<List<OptimizingProcessInfo>, Integer> res =
persistency.getOptimizingProcessesInfo(table, null, null, 4, 4);
Integer expectResturnItemSizeForNoTypeNoStatusOffset0Limit5 = 4;
descriptor.getOptimizingProcessesInfo(table, null, null, 4, 4);
Integer expectReturnItemSizeForNoTypeNoStatusOffset0Limit5 = 4;
Integer expectTotalForNoTypeNoStatusOffset0Limit5 = 10;
Assert.assertEquals(
expectResturnItemSizeForNoTypeNoStatusOffset0Limit5, (Integer) res.getLeft().size());
expectReturnItemSizeForNoTypeNoStatusOffset0Limit5, (Integer) res.getLeft().size());
Assert.assertEquals(expectTotalForNoTypeNoStatusOffset0Limit5, res.getRight());

res = persistency.getOptimizingProcessesInfo(table, null, ProcessStatus.SUCCESS, 5, 0);
res = descriptor.getOptimizingProcessesInfo(table, null, ProcessStatus.SUCCESS, 5, 0);
Integer expectReturnItemSizeForOnlyStatusOffset0limit5 = 5;
Integer expectedTotalForOnlyStatusOffset0Limit5 = 7;
Assert.assertEquals(
expectReturnItemSizeForOnlyStatusOffset0limit5, (Integer) res.getLeft().size());
Assert.assertEquals(expectedTotalForOnlyStatusOffset0Limit5, res.getRight());

res = persistency.getOptimizingProcessesInfo(table, OptimizingType.MINOR.name(), null, 5, 0);
res = descriptor.getOptimizingProcessesInfo(table, OptimizingType.MINOR.name(), null, 5, 0);
Integer expectedRetItemsSizeForOnlyTypeOffset0Limit5 = 4;
Integer expectedRetTotalForOnlyTypeOffset0Limit5 = 4;
Assert.assertEquals(
expectedRetItemsSizeForOnlyTypeOffset0Limit5, (Integer) res.getLeft().size());
Assert.assertEquals(expectedRetTotalForOnlyTypeOffset0Limit5, res.getRight());

res =
persistency.getOptimizingProcessesInfo(
descriptor.getOptimizingProcessesInfo(
table, OptimizingType.MINOR.name(), ProcessStatus.SUCCESS, 2, 2);
Integer expectedRetItemSizeForBothTypeAndStatusOffset2Limit2 = 2;
Integer expectedRetTotalForBothTypeAndStatusOffset2Limit2 = 4;
Expand Down Expand Up @@ -311,63 +287,8 @@ private Table getTable() {
return (Table) getAmoroCatalog().loadTable(TEST_DB, TEST_TABLE).originalTable();
}

/**
* Test persistence class used to test MixedAndIcebergTableDescriptor, it will use derby as the
* underly db.
*/
private static class Persistency extends MixedAndIcebergTableDescriptor {
private static final Logger LOG = LoggerFactory.getLogger(DerbyPersistence.class);

private static TemporaryFolder SINGLETON_FOLDER;

Persistency() {
try {
SINGLETON_FOLDER = new TemporaryFolder();
SINGLETON_FOLDER.create();
String derbyFilePath = SINGLETON_FOLDER.newFolder("derby").getPath();
String derbyUrl = String.format("jdbc:derby:%s/derby;create=true", derbyFilePath);
Configurations configurations = new Configurations();
configurations.set(AmoroManagementConf.DB_CONNECTION_URL, derbyUrl);
configurations.set(AmoroManagementConf.DB_TYPE, AmoroManagementConf.DB_TYPE_DERBY);
configurations.set(
AmoroManagementConf.DB_DRIVER_CLASS_NAME, "org.apache.derby.jdbc.EmbeddedDriver");
DataSource ds = DataSourceFactory.createDataSource(configurations);
SqlSessionFactoryProvider.getInstance().init(ds);
LOG.info("Initialized derby persistent with url: {}", derbyUrl);
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
SINGLETON_FOLDER.delete();
LOG.info("Deleted resources in derby persistent.");
}));
truncateAllTables();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private void truncateAllTables() {
try (SqlSession sqlSession =
SqlSessionFactoryProvider.getInstance().get().openSession(true)) {
try (Connection connection = sqlSession.getConnection()) {
try (Statement statement = connection.createStatement()) {
String query = "SELECT TABLENAME FROM SYS.SYSTABLES WHERE TABLETYPE='T'";
List<String> tableList = Lists.newArrayList();
try (ResultSet rs = statement.executeQuery(query)) {
while (rs.next()) {
tableList.add(rs.getString(1));
}
}
for (String table : tableList) {
statement.execute("TRUNCATE TABLE " + table);
}
}
}
} catch (SQLException e) {
throw new RuntimeException("Clear table failed", e);
}
}
/** Test descriptor class, add insert table/optimizing process methods for test. */
private static class TestMixedAndIcebergTableDescriptor extends MixedAndIcebergTableDescriptor {

public void insertTable(ServerTableIdentifier identifier) {
doAs(TableMetaMapper.class, mapper -> mapper.insertTable(identifier));
Expand Down

0 comments on commit 3fcc6ac

Please sign in to comment.