feature: add mvp code.
lintingbin committed Jan 24, 2025
1 parent 49989ba commit 6d254a9
Showing 5 changed files with 144 additions and 1 deletion.
@@ -294,6 +294,11 @@ public static OptimizingConfig parseOptimizingConfig(Map<String, String> properties)
                properties,
                TableProperties.SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES,
                TableProperties.SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES_DEFAULT))
        .setPartitionFilter(
            CompatiblePropertyUtil.propertyAsString(
                properties,
                TableProperties.SELF_OPTIMIZING_PARTITION_FILTER,
                TableProperties.SELF_OPTIMIZING_PARTITION_FILTER_DEFAULT))
        .setBaseHashBucket(
            CompatiblePropertyUtil.propertyAsInt(
                properties,
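
For illustration only (not part of this commit), a minimal sketch of how the new key flows through the parser above; it assumes the static parseOptimizingConfig entry point from this hunk and the TableProperties constants added later in this commit:

import java.util.HashMap;
import java.util.Map;

// Hypothetical usage sketch: the new filter is parsed alongside the other
// self-optimizing properties.
Map<String, String> properties = new HashMap<>();
properties.put(TableProperties.SELF_OPTIMIZING_PARTITION_FILTER, "dt = '2025-01-24'");
OptimizingConfig config = parseOptimizingConfig(properties);
// config.getPartitionFilter() returns "dt = '2025-01-24'"; when the key is
// absent it falls back to SELF_OPTIMIZING_PARTITION_FILTER_DEFAULT (null),
// which disables partition filtering.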
@@ -74,6 +74,9 @@ public class OptimizingConfig {
  // self-optimizing.full.rewrite-all-files
  private boolean fullRewriteAllFiles;

  // self-optimizing.partition-filter
  private String partitionFilter;

  // base.file-index.hash-bucket
  private int baseHashBucket;

@@ -240,6 +243,15 @@ public OptimizingConfig setFullRewriteAllFiles(boolean fullRewriteAllFiles) {
    return this;
  }

  public OptimizingConfig setPartitionFilter(String partitionFilter) {
    this.partitionFilter = partitionFilter;
    return this;
  }

  public String getPartitionFilter() {
    return partitionFilter;
  }

  public int getBaseHashBucket() {
    return baseHashBucket;
  }
@@ -291,6 +303,7 @@ public boolean equals(Object o) {
        && Double.compare(that.majorDuplicateRatio, majorDuplicateRatio) == 0
        && fullTriggerInterval == that.fullTriggerInterval
        && fullRewriteAllFiles == that.fullRewriteAllFiles
        && Objects.equal(partitionFilter, that.partitionFilter)
        && baseHashBucket == that.baseHashBucket
        && baseRefreshInterval == that.baseRefreshInterval
        && hiveRefreshInterval == that.hiveRefreshInterval
@@ -317,6 +330,7 @@ public int hashCode() {
        majorDuplicateRatio,
        fullTriggerInterval,
        fullRewriteAllFiles,
        partitionFilter,
        baseHashBucket,
        baseRefreshInterval,
        hiveRefreshInterval,
@@ -341,6 +355,7 @@ public String toString() {
        .add("majorDuplicateRatio", majorDuplicateRatio)
        .add("fullTriggerInterval", fullTriggerInterval)
        .add("fullRewriteAllFiles", fullRewriteAllFiles)
        .add("partitionFilter", partitionFilter)
        .add("baseHashBucket", baseHashBucket)
        .add("baseRefreshInterval", baseRefreshInterval)
        .add("hiveRefreshInterval", hiveRefreshInterval)
33 changes: 33 additions & 0 deletions amoro-format-iceberg/pom.xml
@@ -154,6 +154,39 @@
      <artifactId>paimon-bundle</artifactId>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${terminal.spark.version}</version>
      <scope>compile</scope>
      <exclusions>
        <exclusion>
          <artifactId>hive-storage-api</artifactId>
          <groupId>org.apache.hive</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jackson-databind</artifactId>
          <groupId>com.fasterxml.jackson.core</groupId>
        </exclusion>
        <exclusion>
          <artifactId>orc-core</artifactId>
          <groupId>org.apache.orc</groupId>
        </exclusion>
        <exclusion>
          <artifactId>orc-mapreduce</artifactId>
          <groupId>org.apache.orc</groupId>
        </exclusion>
        <exclusion>
          <artifactId>parquet-column</artifactId>
          <groupId>org.apache.parquet</groupId>
        </exclusion>
        <exclusion>
          <artifactId>parquet-hadoop</artifactId>
          <groupId>org.apache.parquet</groupId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.roaringbitmap</groupId>
      <artifactId>RoaringBitmap</artifactId>
@@ -41,6 +41,20 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute;
import org.apache.spark.sql.catalyst.expressions.Alias;
import org.apache.spark.sql.catalyst.expressions.And;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
import org.apache.spark.sql.catalyst.expressions.EqualTo;
import org.apache.spark.sql.catalyst.expressions.GreaterThan;
import org.apache.spark.sql.catalyst.expressions.LessThan;
import org.apache.spark.sql.catalyst.expressions.Literal;
import org.apache.spark.sql.catalyst.expressions.Or;
import org.apache.spark.sql.catalyst.plans.logical.Filter;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.SparkSqlParser;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.types.UTF8String;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -111,7 +125,80 @@ protected void initEvaluator() {
  }

  protected Expression getPartitionFilter() {
-   return Expressions.alwaysTrue();
+   String partitionFilter = config.getPartitionFilter();
+   return partitionFilter == null
+       ? Expressions.alwaysTrue()
+       : convertSqlToIcebergExpression(partitionFilter);
  }

  protected static Expression convertSqlToIcebergExpression(String sql) {
    try {
      SparkSqlParser parser = new SparkSqlParser();
      // Wrap the bare predicate in a dummy SELECT so Spark's SQL parser accepts
      // it, then pull the parsed condition back out of the Filter node.
      LogicalPlan logicalPlan = parser.parsePlan("SELECT * FROM dummy WHERE " + sql);

      Filter filter = (Filter) logicalPlan.children().head();
      org.apache.spark.sql.catalyst.expressions.Expression sparkExpr = filter.condition();
      return convertSparkExpressionToIceberg(sparkExpr);
    } catch (Exception e) {
      throw new IllegalArgumentException("Failed to parse where condition: " + sql, e);
    }
  }

  // Supports =, >, and < plus AND/OR combinations; any other operator (e.g.
  // >=, <=, !=, IN, NOT) falls through to UnsupportedOperationException.
  private static Expression convertSparkExpressionToIceberg(
      org.apache.spark.sql.catalyst.expressions.Expression sparkExpr) {
    if (sparkExpr instanceof EqualTo) {
      EqualTo eq = (EqualTo) sparkExpr;
      return Expressions.equal(getColumnName(eq.left()), getValue(eq.right()));
    } else if (sparkExpr instanceof GreaterThan) {
      GreaterThan gt = (GreaterThan) sparkExpr;
      return Expressions.greaterThan(getColumnName(gt.left()), getValue(gt.right()));
    } else if (sparkExpr instanceof LessThan) {
      LessThan lt = (LessThan) sparkExpr;
      return Expressions.lessThan(getColumnName(lt.left()), getValue(lt.right()));
    } else if (sparkExpr instanceof And) {
      And and = (And) sparkExpr;
      return Expressions.and(
          convertSparkExpressionToIceberg(and.left()),
          convertSparkExpressionToIceberg(and.right()));
    } else if (sparkExpr instanceof Or) {
      Or or = (Or) sparkExpr;
      return Expressions.or(
          convertSparkExpressionToIceberg(or.left()),
          convertSparkExpressionToIceberg(or.right()));
    }

    throw new UnsupportedOperationException("Unsupported expression: " + sparkExpr);
  }

  private static String getColumnName(org.apache.spark.sql.catalyst.expressions.Expression expr) {
    if (expr instanceof AttributeReference) {
      return ((AttributeReference) expr).name();
    }

    if (expr instanceof Alias) {
      return getColumnName(((Alias) expr).child());
    }

    // Attributes stay unresolved here because the parsed plan is never analyzed.
    if (expr instanceof UnresolvedAttribute) {
      return ((UnresolvedAttribute) expr).name();
    }
    throw new IllegalArgumentException("Expected column reference, got: " + expr);
  }

  private static Object getValue(org.apache.spark.sql.catalyst.expressions.Expression expr) {
    if (expr instanceof Literal) {
      return convertLiteral((Literal) expr);
    }

    throw new IllegalArgumentException("Expected literal value, got: " + expr);
  }

  // Unwrap Spark's internal value types into plain Java values understood by
  // the Iceberg expression API.
  private static Object convertLiteral(Literal literal) {
    if (literal.value() instanceof UTF8String) {
      return ((UTF8String) literal.value()).toString();
    } else if (literal.value() instanceof Decimal) {
      return ((Decimal) literal.value()).toJavaBigDecimal();
    }
    return literal.value();
  }

  private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) {
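
For illustration only (not part of this commit), a sketch of what the converter above produces for a compound predicate; convertSqlToIcebergExpression is the static method from this diff:

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

// Hypothetical test-style sketch for convertSqlToIcebergExpression.
Expression actual = convertSqlToIcebergExpression("dt = '2025-01-24' AND hour > 10");
// Equivalent to building the Iceberg expression directly:
Expression expected =
    Expressions.and(
        Expressions.equal("dt", "2025-01-24"),
        Expressions.greaterThan("hour", 10));
// Predicates outside =, >, <, AND, OR (for example dt >= '2025-01-24') throw
// UnsupportedOperationException.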
@@ -115,6 +115,9 @@ private TableProperties() {}
      "self-optimizing.full.rewrite-all-files";
  public static final boolean SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES_DEFAULT = true;

  public static final String SELF_OPTIMIZING_PARTITION_FILTER = "self-optimizing.partition-filter";
  public static final String SELF_OPTIMIZING_PARTITION_FILTER_DEFAULT = null;

  public static final String SELF_OPTIMIZING_MIN_PLAN_INTERVAL =
      "self-optimizing.min-plan-interval";
  public static final long SELF_OPTIMIZING_MIN_PLAN_INTERVAL_DEFAULT = 60000;
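
For illustration only (not part of this commit), a sketch of how the new property might be set on an Iceberg table, assuming a standard org.apache.iceberg.Table handle obtained from a catalog:

import org.apache.iceberg.Table;

// Hypothetical example: limit self-optimizing to partitions newer than a cutoff.
// Only =, >, and < combined with AND/OR are understood by the converter in this
// commit, so stick to those operators in the filter string.
Table table = catalog.loadTable(tableIdentifier); // assumed catalog handle
table.updateProperties()
    .set(TableProperties.SELF_OPTIMIZING_PARTITION_FILTER, "dt > '2025-01-01'")
    .commit();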
