Skip to content

Commit

Permalink
[CALCITE-6652] RelDecorrelator can't decorrelate query with limit 1
Browse files Browse the repository at this point in the history
  • Loading branch information
suibianwanwank committed Dec 8, 2024
1 parent c8a513b commit fb1b07f
Show file tree
Hide file tree
Showing 5 changed files with 777 additions and 20 deletions.
127 changes: 117 additions & 10 deletions core/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.calcite.plan.hep.HepRelVertex;
import org.apache.calcite.rel.BiRel;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelHomogeneousShuttle;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
Expand Down Expand Up @@ -73,6 +74,7 @@
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.runtime.PairList;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlExplainFormat;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlFunction;
Expand All @@ -94,6 +96,7 @@
import org.apache.calcite.util.mapping.Mappings;
import org.apache.calcite.util.trace.CalciteTrace;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
Expand Down Expand Up @@ -523,6 +526,19 @@ protected RexNode removeCorrelationExpr(
return null;
}

if (isCorVarDefined && (rel.fetch != null || rel.offset != null)) {
if (rel.fetch != null
&& rel.offset == null
&& RexLiteral.intValue(rel.fetch) == 1) {
return decorrelateFetchOne(rel, frame);
}
// Can not decorrelate if the sort has per-correlate-key attributes like
// offset or fetch limit, because these attributes scope would change to
// global after decorrelation. They should take effect within the scope
// of the correlation key actually.
return null;
}

final RelNode newInput = frame.r;

Mappings.TargetMapping mapping =
Expand Down Expand Up @@ -767,16 +783,6 @@ private static void shiftMapping(Map<Integer, Integer> mapping, int startIndex,
public @Nullable Frame getInvoke(RelNode r, boolean isCorVarDefined, @Nullable RelNode parent) {
final Frame frame = dispatcher.invoke(r, isCorVarDefined);
currentRel = parent;
if (frame != null && isCorVarDefined && r instanceof Sort) {
final Sort sort = (Sort) r;
// Can not decorrelate if the sort has per-correlate-key attributes like
// offset or fetch limit, because these attributes scope would change to
// global after decorrelation. They should take effect within the scope
// of the correlation key actually.
if (sort.offset != null || sort.fetch != null) {
return null;
}
}
if (frame != null) {
map.put(r, frame);
}
Expand All @@ -795,6 +801,107 @@ private static void shiftMapping(Map<Integer, Integer> mapping, int startIndex,
return null;
}

/**
 * Decorrelates a {@link Sort} that has {@code fetch = 1} and no offset by
 * rewriting it so that "first row" is evaluated per correlation key instead
 * of globally.
 *
 * <p>Two rewrites are attempted:
 * <ol>
 *   <li>A single-column sort on a single key becomes {@code MIN}/{@code MAX}
 *   over the original sort input, which is then decorrelated through
 *   {@link #getInvoke}. This is only done when NULL values cannot be the
 *   first row of the sort, because {@code MIN}/{@code MAX} ignore NULLs.
 *   <li>Otherwise every output field becomes
 *   {@code FIRST_VALUE(field) OVER (PARTITION BY corVars ORDER BY collation)}
 *   and an {@code Aggregate} grouped on the correlation variables collapses
 *   each partition to one row using {@code ANY_VALUE}.
 * </ol>
 *
 * @param sort  the sort being decorrelated; the caller guarantees
 *              {@code fetch = 1} and {@code offset == null}
 * @param frame the decorrelated frame of {@code sort}'s input
 * @return the frame registered for the rewritten sort, or {@code null} if a
 *         sort direction other than (strictly) ascending/descending is found
 */
private @Nullable Frame decorrelateFetchOne(Sort sort, Frame frame) {
  final List<RelFieldCollation> fieldCollations =
      sort.getCollation().getFieldCollations();

  // For sorting with only one field, we specifically optimize for max/min.
  //
  // Note that max/min is not equivalent to order by ... limit 1
  // if the number of rows is 0.
  // So we have to make sure that corDefOutputs is not empty.
  if (sort.getRowType().getFieldCount() == 1
      && fieldCollations.size() == 1
      && !frame.corDefOutputs.isEmpty()) {
    final RelFieldCollation collation = fieldCollations.get(0);

    // MIN/MAX skip NULLs, whereas "ORDER BY ... LIMIT 1" returns NULL when
    // NULLs sort first. The shortcut is therefore only sound when the key
    // cannot be NULL or NULLs are explicitly placed last.
    final boolean nullsMaySortFirst =
        collation.nullDirection != RelFieldCollation.NullDirection.LAST
            && sort.getRowType().getFieldList()
                .get(collation.getFieldIndex()).getType().isNullable();

    if (!nullsMaySortFirst) {
      final SqlAggFunction aggFunction;
      switch (collation.getDirection()) {
      case ASCENDING:
      case STRICTLY_ASCENDING:
        aggFunction = SqlStdOperatorTable.MIN;
        break;
      case DESCENDING:
      case STRICTLY_DESCENDING:
        aggFunction = SqlStdOperatorTable.MAX;
        break;
      default:
        return null;
      }
      relBuilder.push(sort.getInput());
      RelBuilder.AggCall aggCall =
          relBuilder.aggregateCall(aggFunction,
              relBuilder.fields(ImmutableList.of(collation.getFieldIndex())));

      RelNode aggregate = relBuilder
          .aggregate(relBuilder.groupKey(), aggCall).build();
      return getInvoke(aggregate, true, null);
    }
  }

  //
  // Rewrite logic:
  //
  // If sorted without offset and fetch = 1, rewrite the sort to be
  //   Aggregate(group=(corVar), agg=[any_value(field)])
  //     project(first_value(field) over (partition by corVar order by (sort collation)))
  //       input

  // Build the window ORDER BY keys from the sort collation. The keys must
  // (a) reference the decorrelated input (frame.r), hence the remapping via
  // oldToNewOutputs, and (b) carry the direction and null ordering, which
  // Sort.getSortExps() does not convey.
  final List<RexNode> orderKeys = new ArrayList<>();
  for (RelFieldCollation fieldCollation : fieldCollations) {
    final int newKeyIndex =
        requireNonNull(frame.oldToNewOutputs.get(fieldCollation.getFieldIndex()));
    RexNode orderKey = RexInputRef.of(newKeyIndex, frame.r.getRowType());
    // Null ordering only matters (and is only emitted) for nullable keys.
    final boolean nullableKey = orderKey.getType().isNullable();

    switch (fieldCollation.getDirection()) {
    case ASCENDING:
    case STRICTLY_ASCENDING:
      break;
    case DESCENDING:
    case STRICTLY_DESCENDING:
      orderKey = relBuilder.desc(orderKey);
      break;
    default:
      // Same policy as the MIN/MAX branch: give up on other directions.
      return null;
    }

    if (nullableKey) {
      switch (fieldCollation.nullDirection) {
      case FIRST:
        orderKey = relBuilder.nullsFirst(orderKey);
        break;
      case LAST:
        orderKey = relBuilder.nullsLast(orderKey);
        break;
      default:
        break;
      }
    }
    orderKeys.add(orderKey);
  }

  // Correlation variables: projected after the window columns and used both
  // as PARTITION BY keys and, later, as the Aggregate's group keys.
  final List<CorDef> corDefList = new ArrayList<>();
  final PairList<RexNode, String> corVarProjects = PairList.of();
  for (Map.Entry<CorDef, Integer> entry : frame.corDefOutputs.entrySet()) {
    RexInputRef.add2(corVarProjects, entry.getValue(),
        frame.r.getRowType().getFieldList());
    corDefList.add(entry.getKey());
  }

  // One FIRST_VALUE window call per original output field.
  final PairList<RexNode, String> windowProjects = PairList.of();

  for (RelDataTypeField field : sort.getRowType().getFieldList()) {
    final int newIndex =
        requireNonNull(frame.oldToNewOutputs.get(field.getIndex()));

    RelBuilder.AggCall aggCall =
        relBuilder.aggregateCall(SqlStdOperatorTable.FIRST_VALUE,
            RexInputRef.of(newIndex, frame.r.getRowType()));

    RexNode winCall = aggCall.over()
        .orderBy(orderKeys)
        .partitionBy(corVarProjects.leftList())
        .toRex();
    windowProjects.add(winCall, field.getName());
  }

  // Project layout: [window columns..., correlation columns...].
  final PairList<RexNode, String> projects = PairList.of();
  projects.addAll(windowProjects);
  projects.addAll(corVarProjects);

  RelNode newProject = relBuilder.push(frame.r)
      .project(projects.leftList(), projects.rightList())
      .build();

  // Aggregate layout: [group keys (correlation columns)..., any_value...],
  // so old output i ends up at position i + (number of correlation columns).
  List<RelBuilder.AggCall> anyValues = new ArrayList<>();
  final Map<Integer, Integer> mapOldToNewOutputs = new HashMap<>();
  int originOutputSize = windowProjects.size();
  for (int i = 0; i < originOutputSize; i++) {
    RelBuilder.AggCall anyValue =
        relBuilder.aggregateCall(SqlStdOperatorTable.ANY_VALUE,
            RexInputRef.of(i, newProject.getRowType()));
    anyValues.add(anyValue);
    mapOldToNewOutputs.put(i, i + corDefList.size());
  }

  final NavigableMap<CorDef, Integer> corDefOutputs = new TreeMap<>();
  List<RexNode> groupKeys = new ArrayList<>();
  for (int i = 0; i < corDefList.size(); i++) {
    groupKeys.add(RexInputRef.of(i + originOutputSize, newProject.getRowType()));
    corDefOutputs.put(corDefList.get(i), i);
  }

  RelBuilder.GroupKey groupKey = relBuilder.groupKey(groupKeys);
  RelNode aggregate = relBuilder
      .push(newProject)
      .aggregate(groupKey, anyValues)
      .build();

  return register(sort, aggregate, mapOldToNewOutputs, corDefOutputs);
}

/**
 * Decorrelates a {@link LogicalProject} by forwarding it, viewed as a plain
 * {@link Project}, to the {@code Project} overload of this method.
 *
 * @param rel             the logical project to decorrelate
 * @param isCorVarDefined passed through unchanged to the {@code Project} overload
 * @return whatever the {@code Project} overload returns, possibly {@code null}
 */
public @Nullable Frame decorrelateRel(LogicalProject rel, boolean isCorVarDefined) {
  // Widen the static type so overload resolution picks the Project variant.
  final Project project = rel;
  return decorrelateRel(project, isCorVarDefined);
}
Expand Down
56 changes: 56 additions & 0 deletions core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8534,6 +8534,62 @@ private void checkSemiJoinRuleOnAntiJoin(RelOptRule rule) {
.check();
}

/**
 * Test case for
 * <a href="https://issues.apache.org/jira/browse/CALCITE-6652">[CALCITE-6652]
 * RelDecorrelator can't decorrelate query with limit 1</a>.
 *
 * <p>The correlated scalar sub-query sits in the SELECT list and is ordered
 * ascending on a single column with {@code LIMIT 1}.
 */
@Test void testDecorrelateProjectWithFetchOne() {
  sql("SELECT name, "
      + "(SELECT sal FROM emp where dept.deptno = emp.deptno order by sal limit 1) "
      + "FROM dept")
      .withRule(CoreRules.PROJECT_SUB_QUERY_TO_CORRELATE)
      .withLateDecorrelate(true)
      .check();
}

/**
 * Test case for
 * <a href="https://issues.apache.org/jira/browse/CALCITE-6652">[CALCITE-6652]
 * RelDecorrelator can't decorrelate query with limit 1</a>.
 *
 * <p>Same shape as the ascending SELECT-list case, but the sub-query is
 * ordered descending before {@code LIMIT 1}.
 */
@Test void testDecorrelateProjectWithFetchOneDesc() {
  sql("SELECT name, "
      + "(SELECT emp.sal FROM emp WHERE dept.deptno = emp.deptno ORDER BY emp.sal desc LIMIT 1) "
      + "FROM dept")
      .withRule(CoreRules.PROJECT_SUB_QUERY_TO_CORRELATE)
      .withLateDecorrelate(true)
      .check();
}

/**
 * Test case for
 * <a href="https://issues.apache.org/jira/browse/CALCITE-6652">[CALCITE-6652]
 * RelDecorrelator can't decorrelate query with limit 1</a>.
 *
 * <p>The correlated scalar sub-query appears in a WHERE comparison and is
 * ordered descending on a single column with {@code limit 1}.
 */
@Test void testDecorrelateFilterWithFetchOne() {
  sql("SELECT name FROM dept "
      + "WHERE 10 > (SELECT emp.sal FROM emp where dept.deptno = emp.deptno "
      + "ORDER BY emp.sal desc limit 1)")
      .withRule(CoreRules.FILTER_SUB_QUERY_TO_CORRELATE)
      .withLateDecorrelate(true)
      .check();
}

/**
 * Test case for
 * <a href="https://issues.apache.org/jira/browse/CALCITE-6652">[CALCITE-6652]
 * RelDecorrelator can't decorrelate query with limit 1</a>.
 *
 * <p>The correlated scalar sub-query in the WHERE clause is ordered on two
 * keys ({@code year(hiredate)} and {@code emp.sal}) before {@code limit 1}.
 */
@Test void testDecorrelateFilterWithMultiKeyAndFetchOne() {
  sql("SELECT name FROM dept "
      + "WHERE 10 > (SELECT emp.sal FROM emp where dept.deptno = emp.deptno "
      + "order by year(hiredate), emp.sal limit 1)")
      .withRule(CoreRules.FILTER_SUB_QUERY_TO_CORRELATE)
      .withLateDecorrelate(true)
      .check();
}

/** Test case for
* <a href="https://issues.apache.org/jira/browse/CALCITE-434">[CALCITE-434]
* Converting predicates on date dimension columns into date ranges</a>,
Expand Down
Loading

0 comments on commit fb1b07f

Please sign in to comment.