Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Fix LOOKUP JOIN with limit #120411

Merged
merged 32 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
6d1b913
Move test so all _non_unique_key tests are bundled
alex-spies Jan 17, 2025
6e1ff52
Add failing test
alex-spies Jan 17, 2025
bacc841
Make progress, add another failing test
alex-spies Jan 17, 2025
dcfedd1
Fix the test, expand constructor, make compile
alex-spies Jan 17, 2025
c61dfa4
Fix tests
alex-spies Jan 17, 2025
e4c1d58
Fix more tests
alex-spies Jan 17, 2025
5c05982
Merge remote-tracking branch 'upstream/main' into fix-lookup-with-limit
alex-spies Jan 21, 2025
44d5c16
Fix serialization tests
alex-spies Jan 21, 2025
a47a580
Align MV_EXPAND optimization
alex-spies Jan 21, 2025
9c67e41
Remove MvExpand.limit
alex-spies Jan 21, 2025
0407b40
Update tests
alex-spies Jan 21, 2025
4afb559
Avoid serializing the push down marker boolean
alex-spies Jan 21, 2025
96d4285
Merge branch 'main' into fix-lookup-with-limit
alex-spies Jan 21, 2025
19d8e2a
Fix unrelated tests
alex-spies Jan 22, 2025
4d2bd6f
Slightly stronger assertions
alex-spies Jan 22, 2025
4b613c3
Merge remote-tracking branch 'upstream/main' into fix-lookup-with-limit
alex-spies Jan 27, 2025
7cfebda
Address Costin's feedback
alex-spies Jan 27, 2025
bae6947
Simplify Limit.equals
alex-spies Jan 27, 2025
90b9baf
Add capability
alex-spies Jan 27, 2025
46764e4
Minor comment fixes etc.
alex-spies Jan 27, 2025
13e84a6
Improve comments and asserts in tests
alex-spies Jan 27, 2025
356e98c
Skip new spec tests in CsvTests
alex-spies Jan 27, 2025
9566b80
Fix limit serialization tests
alex-spies Jan 27, 2025
7705e48
Streamline LocalLogicalPlanOptimizerTests changes
alex-spies Jan 27, 2025
842034a
More LOOKUP JOIN optimizer tests
alex-spies Jan 27, 2025
1b16513
More tests
alex-spies Jan 27, 2025
9717067
Merge remote-tracking branch 'upstream/main' into fix-lookup-with-limit
alex-spies Jan 28, 2025
a736d58
Copy all relevant MV_EXPAND tests for LOOKUP JOIN
alex-spies Jan 28, 2025
5ac310e
Add edge case tests for descendant limits
alex-spies Jan 28, 2025
44249dd
Refactor based on Bogdan's feedback
alex-spies Jan 28, 2025
75a7b19
Merge branch 'main' into fix-lookup-with-limit
alex-spies Jan 28, 2025
ac32bfd
Checkstyle
alex-spies Jan 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.elasticsearch.xpack.esql.parser.QueryParam;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Limit;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
Expand Down Expand Up @@ -111,6 +112,7 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.test.ESTestCase.assertEquals;
import static org.elasticsearch.test.ESTestCase.between;
import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength;
import static org.elasticsearch.test.ESTestCase.randomBoolean;
Expand Down Expand Up @@ -403,6 +405,21 @@ public static <T> T as(Object node, Class<T> type) {
return type.cast(node);
}

/**
 * Convenience overload of {@link #asLimit(Object, Integer, Boolean)} that does not check
 * {@link Limit#duplicated()}.
 *
 * @param node the plan node expected to be a {@link Limit}
 * @param limitLiteral if non-null, the expected literal value of the limit expression
 * @return the node as a {@link Limit}
 */
public static Limit asLimit(Object node, Integer limitLiteral) {
    // A null third argument tells the full overload to skip the duplicated() assertion.
    final Boolean skipDuplicatedCheck = null;
    return asLimit(node, limitLiteral, skipDuplicatedCheck);
}

/**
 * Converts {@code node} to a {@link Limit} via {@code as(...)} and optionally asserts on its properties.
 *
 * @param node the plan node expected to be a {@link Limit}
 * @param limitLiteral if non-null, the limit expression must be a {@link Literal} whose value equals this
 * @param duplicated if non-null, {@link Limit#duplicated()} must equal this
 * @return the node as a {@link Limit}
 */
public static Limit asLimit(Object node, Integer limitLiteral, Boolean duplicated) {
    Limit limit = as(node, Limit.class);
    if (limitLiteral != null) {
        // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
        assertEquals(limitLiteral, as(limit.limit(), Literal.class).value());
    }
    if (duplicated != null) {
        assertEquals(duplicated, limit.duplicated());
    }
    return limit;
}

public static Map<String, EsField> loadMapping(String name) {
return LoadMapping.loadMapping(name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,58 @@ emp_no:integer
10001
;


lookupIndexInFromRepeatedRowBug
// Test for https://github.com/elastic/elasticsearch/issues/118852
required_capability: join_lookup_v12
FROM languages_lookup_non_unique_key
| WHERE language_code == 1
| LOOKUP JOIN languages_lookup ON language_code
| KEEP language_code, language_name, country
| SORT language_code, language_name, country
;

language_code:integer | language_name:keyword | country:text
1 | English | Canada
1 | English | United Kingdom
1 | English | United States of America
1 | English | null
;

nonUniqueRightKeyOnTheCoordinatorLateLimit
required_capability: join_lookup_v12
required_capability: join_lookup_fix_limit_pushdown

FROM employees
| SORT emp_no
| EVAL language_code = emp_no % 10
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
| KEEP emp_no, language_code, language_name, country
| LIMIT 4
| SORT country
;

emp_no:integer | language_code:integer | language_name:keyword | country:text
10001 | 1 | English | Canada
10001 | 1 | null | United Kingdom
10001 | 1 | English | United States of America
10001 | 1 | English | null
;

nonUniqueRightKeyLateLimitWithEmptyRelation
required_capability: join_lookup_v12
required_capability: join_lookup_fix_limit_pushdown

ROW language_code = 1
| WHERE language_code != 1
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
| LIMIT 1
| KEEP language_code, language_name
;

language_code:integer | language_name:keyword
;

###########################################################################
# null and multi-value behavior with languages_lookup_non_unique_key index
###########################################################################
Expand Down Expand Up @@ -1278,23 +1330,6 @@ ignoreOrder:true
2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233 | QA | null
;

lookupIndexInFromRepeatedRowBug
// Test for https://github.com/elastic/elasticsearch/issues/118852
required_capability: join_lookup_v12
FROM languages_lookup_non_unique_key
| WHERE language_code == 1
| LOOKUP JOIN languages_lookup ON language_code
| KEEP language_code, language_name, country
| SORT language_code, language_name, country
;

language_code:integer | language_name:keyword | country:text
1 | English | Canada
1 | English | United Kingdom
1 | English | United States of America
1 | English | null
;

lookupIndexQuoting
required_capability: join_lookup_v12
FROM languages_lookup_non_unique_key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,11 @@ public enum Cap {
*/
JOIN_LOOKUP_SKIP_MV_ON_LOOKUP_KEY(JOIN_LOOKUP_V12.isEnabled()),

/**
* Fix pushing down LIMIT past LOOKUP JOIN in case of multiple matching join keys.
*/
JOIN_LOOKUP_FIX_LIMIT_PUSHDOWN(JOIN_LOOKUP_V12.isEnabled()),

/**
* Fix for https://github.com/elastic/elasticsearch/issues/117054
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,7 @@ private LogicalPlan resolveMvExpand(MvExpand p, List<Attribute> childrenOutput)
resolved,
resolved.resolved()
? new ReferenceAttribute(resolved.source(), resolved.name(), resolved.dataType(), resolved.nullable(), null, false)
: resolved,
p.limit()
: resolved
);
}
return p;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* | sort first_name
* | limit 15
* <p>
* PushDownAndCombineLimits rule will copy the "limit 15" after "sort emp_no" if there is no filter on the expanded values
* {@link PushDownAndCombineLimits} will copy the "limit 15" after "sort emp_no" if there is no filter on the expanded values
* OR if there is no sort between "limit" and "mv_expand".
* But, since this type of query has such a filter, the "sort emp_no" will have no limit when it reaches the current rule.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.xpack.esql.optimizer.rules.logical;

import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
Expand All @@ -21,6 +20,9 @@
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;

import java.util.ArrayList;
import java.util.List;

public final class PushDownAndCombineLimits extends OptimizerRules.ParameterizedOptimizerRule<Limit, LogicalOptimizerContext> {

public PushDownAndCombineLimits() {
Expand All @@ -31,27 +33,18 @@ public PushDownAndCombineLimits() {
public LogicalPlan rule(Limit limit, LogicalOptimizerContext ctx) {
if (limit.child() instanceof Limit childLimit) {
var limitSource = limit.limit();
var l1 = (int) limitSource.fold(ctx.foldCtx());
var l2 = (int) childLimit.limit().fold(ctx.foldCtx());
return new Limit(limit.source(), Literal.of(limitSource, Math.min(l1, l2)), childLimit.child());
var parentLimitValue = (int) limitSource.fold(ctx.foldCtx());
var childLimitValue = (int) childLimit.limit().fold(ctx.foldCtx());
// We want to preserve the duplicated() value of the smaller limit, so we'll use replaceChild.
return parentLimitValue < childLimitValue ? limit.replaceChild(childLimit.child()) : childLimit;
} else if (limit.child() instanceof UnaryPlan unary) {
if (unary instanceof Eval || unary instanceof Project || unary instanceof RegexExtract || unary instanceof Enrich) {
return unary.replaceChild(limit.replaceChild(unary.child()));
} else if (unary instanceof MvExpand mvx) {
} else if (unary instanceof MvExpand) {
// MV_EXPAND can increase the number of rows, so we cannot just push the limit down
// (we also have to preserve the LIMIT afterwards)
//
// To avoid infinite loops, ie.
// | MV_EXPAND | LIMIT -> | LIMIT | MV_EXPAND | LIMIT -> ... | MV_EXPAND | LIMIT
// we add an inner limit to MvExpand and just push down the existing limit, ie.
// | MV_EXPAND | LIMIT N -> | LIMIT N | MV_EXPAND (with limit N)
var limitSource = limit.limit();
var limitVal = (int) limitSource.fold(ctx.foldCtx());
Integer mvxLimit = mvx.limit();
if (mvxLimit == null || mvxLimit > limitVal) {
mvx = new MvExpand(mvx.source(), mvx.child(), mvx.target(), mvx.expanded(), limitVal);
}
return mvx.replaceChild(limit.replaceChild(mvx.child()));
// To avoid repeating this infinitely, we have to set duplicated = true.
return duplicateLimitAsFirstGrandchild(limit);
}
// check if there's a 'visible' descendant limit lower than the current one
// and if so, align the current limit since it adds no value
Expand All @@ -62,17 +55,15 @@ public LogicalPlan rule(Limit limit, LogicalOptimizerContext ctx) {
var l1 = (int) limit.limit().fold(ctx.foldCtx());
var l2 = (int) descendantLimit.limit().fold(ctx.foldCtx());
if (l2 <= l1) {
return new Limit(limit.source(), Literal.of(limit.limit(), l2), limit.child());
return limit.withLimit(descendantLimit.limit());
}
}
}
} else if (limit.child() instanceof Join join) {
if (join.config().type() == JoinTypes.LEFT) {
// NOTE! This is only correct because our LEFT JOINs preserve the number of rows from the left hand side.
// This deviates from SQL semantics. In SQL, multiple matches on the right hand side lead to multiple rows in the output.
// For us, multiple matches on the right hand side are collected into multi-values.
return join.replaceChildren(limit.replaceChild(join.left()), join.right());
}
} else if (limit.child() instanceof Join join && join.config().type() == JoinTypes.LEFT) {
// Left joins increase the number of rows if any join key has multiple matches from the right hand side.
// Therefore, we cannot simply push down the limit - but we can add another limit before the join.
// To avoid repeating this infinitely, we have to set duplicated = true.
return duplicateLimitAsFirstGrandchild(limit);
}
return limit;
}
Expand Down Expand Up @@ -100,4 +91,27 @@ private static Limit descendantLimit(UnaryPlan unary) {
}
return null;
}

/**
 * Copies the given limit below its child, unless that has already been done.
 * <p>
 * The copy is inserted directly above the child's first (leftmost) child; all other children of the child are kept as they are.
 * The limit that stays on top is returned with {@link Limit#duplicated()} set to {@code true}, so calling this method again on
 * the result returns it unchanged.
 */
private static Limit duplicateLimitAsFirstGrandchild(Limit limit) {
    if (limit.duplicated()) {
        // Already duplicated once; doing it again would add limits forever.
        return limit;
    }

    List<LogicalPlan> originalGrandChildren = limit.child().children();
    // Wrap the leftmost grandchild in a copy of this limit.
    LogicalPlan limitedFirstGrandChild = limit.replaceChild(originalGrandChildren.getFirst());

    // Same grandchildren as before, except that the first one is now the limited one.
    List<LogicalPlan> updatedGrandChildren = new ArrayList<>(originalGrandChildren);
    updatedGrandChildren.set(0, limitedFirstGrandChild);

    LogicalPlan rebuiltChild = limit.child().replaceChildren(updatedGrandChildren);
    Limit topLimit = limit.replaceChild(rebuiltChild);
    return topLimit.withDuplicated(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,52 @@ public class Limit extends UnaryPlan {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Limit", Limit::new);

private final Expression limit;

/**
* Important for optimizations. This should be {@code false} in most cases, which allows this instance to be duplicated past a child
* plan node that increases the number of rows, like for LOOKUP JOIN and MV_EXPAND.
* Needs to be set to {@code true} in {@link org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownAndCombineLimits} to avoid
* infinite loops from adding a duplicate of the limit past the child over and over again.
*/
private final transient boolean duplicated;

/**
* Default way to create a new instance. Do not use this to copy an existing instance, as this sets {@link Limit#duplicated} to
* {@code false}.
*/
public Limit(Source source, Expression limit, LogicalPlan child) {
this(source, limit, child, false);
}

public Limit(Source source, Expression limit, LogicalPlan child, boolean duplicated) {
super(source, child);
this.limit = limit;
this.duplicated = duplicated;
}

/**
* Omits reading {@link Limit#duplicated}, c.f. {@link Limit#writeTo}.
*/
private Limit(StreamInput in) throws IOException {
this(Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(Expression.class), in.readNamedWriteable(LogicalPlan.class));
this(
Source.readFrom((PlanStreamInput) in),
in.readNamedWriteable(Expression.class),
in.readNamedWriteable(LogicalPlan.class),
false
);
}

/**
* Omits serializing {@link Limit#duplicated} because when sent to a data node, this should always be {@code false}.
* That's because if it's true, this means a copy of this limit was pushed down below an MvExpand or Join, and thus there's
* another pipeline breaker further upstream - we're already on the coordinator node.
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
Source.EMPTY.writeTo(out);
out.writeNamedWriteable(limit());
out.writeNamedWriteable(child());
// Let's make sure we notice during tests if we ever serialize a duplicated Limit.
assert duplicated == false;
}

@Override
Expand All @@ -45,18 +76,30 @@ public String getWriteableName() {

@Override
protected NodeInfo<Limit> info() {
return NodeInfo.create(this, Limit::new, limit, child());
return NodeInfo.create(this, Limit::new, limit, child(), duplicated);
}

@Override
public Limit replaceChild(LogicalPlan newChild) {
return new Limit(source(), limit, newChild);
return new Limit(source(), limit, newChild, duplicated);
}

public Expression limit() {
return limit;
}

/**
 * Creates a copy of this node that uses the given limit expression.
 * The source, the child and the {@code duplicated} flag are taken over from this instance.
 */
public Limit withLimit(Expression limit) {
return new Limit(source(), limit, child(), duplicated);
}

public boolean duplicated() {
return duplicated;
}

/**
 * Creates a copy of this node with the given {@code duplicated} flag.
 * The source, the limit expression and the child are taken over from this instance.
 */
public Limit withDuplicated(boolean duplicated) {
return new Limit(source(), limit, child(), duplicated);
}

@Override
public String commandName() {
return "LIMIT";
Expand All @@ -69,7 +112,7 @@ public boolean expressionsResolved() {

@Override
public int hashCode() {
return Objects.hash(limit, child());
return Objects.hash(limit, child(), duplicated);
}

@Override
Expand All @@ -83,6 +126,6 @@ public boolean equals(Object obj) {

Limit other = (Limit) obj;

return Objects.equals(limit, other.limit) && Objects.equals(child(), other.child());
return Objects.equals(limit, other.limit) && Objects.equals(child(), other.child()) && (duplicated == other.duplicated);
}
}
Loading