Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Fix LOOKUP JOIN with limit #120411

Merged
merged 32 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
6d1b913
Move test so all _non_unique_key tests are bundled
alex-spies Jan 17, 2025
6e1ff52
Add failing test
alex-spies Jan 17, 2025
bacc841
Make progress, add another failing test
alex-spies Jan 17, 2025
dcfedd1
Fix the test, expand constructor, make compile
alex-spies Jan 17, 2025
c61dfa4
Fix tests
alex-spies Jan 17, 2025
e4c1d58
Fix more tests
alex-spies Jan 17, 2025
5c05982
Merge remote-tracking branch 'upstream/main' into fix-lookup-with-limit
alex-spies Jan 21, 2025
44d5c16
Fix serialization tests
alex-spies Jan 21, 2025
a47a580
Align MV_EXPAND optimization
alex-spies Jan 21, 2025
9c67e41
Remove MvExpand.limit
alex-spies Jan 21, 2025
0407b40
Update tests
alex-spies Jan 21, 2025
4afb559
Avoid serializing the push down marker boolean
alex-spies Jan 21, 2025
96d4285
Merge branch 'main' into fix-lookup-with-limit
alex-spies Jan 21, 2025
19d8e2a
Fix unrelated tests
alex-spies Jan 22, 2025
4d2bd6f
Slightly stronger assertions
alex-spies Jan 22, 2025
4b613c3
Merge remote-tracking branch 'upstream/main' into fix-lookup-with-limit
alex-spies Jan 27, 2025
7cfebda
Address Costin's feedback
alex-spies Jan 27, 2025
bae6947
Simplify Limit.equals
alex-spies Jan 27, 2025
90b9baf
Add capability
alex-spies Jan 27, 2025
46764e4
Minor comment fixes etc.
alex-spies Jan 27, 2025
13e84a6
Improve comments and asserts in tests
alex-spies Jan 27, 2025
356e98c
Skip new spec tests in CsvTests
alex-spies Jan 27, 2025
9566b80
Fix limit serialization tests
alex-spies Jan 27, 2025
7705e48
Streamline LocalLogicalPlanOptimizerTests changes
alex-spies Jan 27, 2025
842034a
More LOOKUP JOIN optimizer tests
alex-spies Jan 27, 2025
1b16513
More tests
alex-spies Jan 27, 2025
9717067
Merge remote-tracking branch 'upstream/main' into fix-lookup-with-limit
alex-spies Jan 28, 2025
a736d58
Copy all relevant MV_EXPAND tests for LOOKUP JOIN
alex-spies Jan 28, 2025
5ac310e
Add edge case tests for descendant limits
alex-spies Jan 28, 2025
44249dd
Refactor based on Bogdan's feedback
alex-spies Jan 28, 2025
75a7b19
Merge branch 'main' into fix-lookup-with-limit
alex-spies Jan 28, 2025
ac32bfd
Checkstyle
alex-spies Jan 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,54 @@ emp_no:integer
10001
;

lookupIndexInFromRepeatedRowBug
required_capability: join_lookup_v11
FROM languages_lookup_non_unique_key
| WHERE language_code == 1
| LOOKUP JOIN languages_lookup ON language_code
| KEEP language_code, language_name, country
| SORT language_code, language_name, country
;

language_code:integer | language_name:keyword | country:text
1 | English | Canada
1 | English | United Kingdom
1 | English | United States of America
1 | English | null
;

nonUniqueRightKeyOnTheCoordinatorLateLimit
required_capability: join_lookup_v11
alex-spies marked this conversation as resolved.
Show resolved Hide resolved

FROM employees
| SORT emp_no
| EVAL language_code = emp_no % 10
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
| KEEP emp_no, language_code, language_name, country
| LIMIT 4
| SORT country
;

emp_no:integer | language_code:integer | language_name:keyword | country:text
10001 | 1 | English | Canada
10001 | 1 | null | United Kingdom
10001 | 1 | English | United States of America
10001 | 1 | English | null
;

nonUniqueRightKeyLateLimitWithEmptyRelation
required_capability: join_lookup_v11

ROW language_code = 1
| WHERE language_code != 1
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
| LIMIT 1
| KEEP language_code, language_name
;

language_code:integer | language_name:keyword
;

###############################################
# Filtering tests with languages_lookup index
###############################################
Expand Down Expand Up @@ -1200,19 +1248,3 @@ ignoreOrder:true
2023-10-23T12:27:28.948Z | 172.21.2.113 | 2764889 | QA | null
2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233 | QA | null
;

lookupIndexInFromRepeatedRowBug
required_capability: join_lookup_v11
FROM languages_lookup_non_unique_key
| WHERE language_code == 1
| LOOKUP JOIN languages_lookup ON language_code
| KEEP language_code, language_name, country
| SORT language_code, language_name, country
;

language_code:integer | language_name:keyword | country:text
1 | English | Canada
1 | English | United Kingdom
1 | English | United States of America
1 | English | null
;
Original file line number Diff line number Diff line change
Expand Up @@ -577,8 +577,7 @@ private LogicalPlan resolveMvExpand(MvExpand p, List<Attribute> childrenOutput)
resolved,
resolved.resolved()
? new ReferenceAttribute(resolved.source(), resolved.name(), resolved.dataType(), resolved.nullable(), null, false)
: resolved,
p.limit()
: resolved
);
}
return p;
Expand Down Expand Up @@ -1067,7 +1066,7 @@ public LogicalPlan apply(LogicalPlan logicalPlan, AnalyzerContext context) {
limit = context.configuration().resultTruncationMaxSize(); // user provided a limit: cap result entries to the max
}
var source = logicalPlan.source();
return new Limit(source, new Literal(source, limit, DataType.INTEGER), logicalPlan);
return new Limit(source, new Literal(source, limit, DataType.INTEGER), logicalPlan, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* | sort first_name
* | limit 15
* <p>
* PushDownAndCombineLimits rule will copy the "limit 15" after "sort emp_no" if there is no filter on the expanded values
* {@link PushDownAndCombineLimits} will copy the "limit 15" after "sort emp_no" if there is no filter on the expanded values
* OR if there is no sort between "limit" and "mv_expand".
* But, since this type of query has such a filter, the "sort emp_no" will have no limit when it reaches the current rule.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,19 @@ public PushDownAndCombineLimits() {
public LogicalPlan rule(Limit limit, LogicalOptimizerContext ctx) {
if (limit.child() instanceof Limit childLimit) {
var limitSource = limit.limit();
var l1 = (int) limitSource.fold(ctx.foldCtx());
var l2 = (int) childLimit.limit().fold(ctx.foldCtx());
return new Limit(limit.source(), Literal.of(limitSource, Math.min(l1, l2)), childLimit.child());
var parentLimitValue = (int) limitSource.fold(ctx.foldCtx());
var childLimitValue = (int) childLimit.limit().fold(ctx.foldCtx());
// We want to preserve the allowDuplicatePastExpandingNode of the smaller limit, so we'll use replaceChild.
return parentLimitValue < childLimitValue ? limit.replaceChild(childLimit.child()) : childLimit;
} else if (limit.child() instanceof UnaryPlan unary) {
if (unary instanceof Eval || unary instanceof Project || unary instanceof RegexExtract || unary instanceof Enrich) {
return unary.replaceChild(limit.replaceChild(unary.child()));
} else if (unary instanceof MvExpand mvx) {
} else if (unary instanceof MvExpand mvx && limit.allowDuplicatePastExpandingNode()) {
// MV_EXPAND can increase the number of rows, so we cannot just push the limit down
// (we also have to preserve the LIMIT afterwards)
//
// To avoid infinite loops, ie.
// | MV_EXPAND | LIMIT -> | LIMIT | MV_EXPAND | LIMIT -> ... | MV_EXPAND | LIMIT
// we add an inner limit to MvExpand and just push down the existing limit, ie.
// | MV_EXPAND | LIMIT N -> | LIMIT N | MV_EXPAND (with limit N)
var limitSource = limit.limit();
var limitVal = (int) limitSource.fold(ctx.foldCtx());
Integer mvxLimit = mvx.limit();
if (mvxLimit == null || mvxLimit > limitVal) {
mvx = new MvExpand(mvx.source(), mvx.child(), mvx.target(), mvx.expanded(), limitVal);
}
return mvx.replaceChild(limit.replaceChild(mvx.child()));
// To avoid repeating this infinitely, we have to set allowDuplicatePastExpandingNode = false.
MvExpand newChild = mvx.replaceChild(limit.replaceChild(mvx.child()));
return new Limit(limit.source(), limit.limit(), newChild, false);
}
// check if there's a 'visible' descendant limit lower than the current one
// and if so, align the current limit since it adds no value
Expand All @@ -62,16 +54,22 @@ public LogicalPlan rule(Limit limit, LogicalOptimizerContext ctx) {
var l1 = (int) limit.limit().fold(ctx.foldCtx());
var l2 = (int) descendantLimit.limit().fold(ctx.foldCtx());
if (l2 <= l1) {
return new Limit(limit.source(), Literal.of(limit.limit(), l2), limit.child());
return new Limit(
limit.source(),
Literal.of(limit.limit(), l2),
limit.child(),
limit.allowDuplicatePastExpandingNode()
);
}
}
}
} else if (limit.child() instanceof Join join) {
} else if (limit.child() instanceof Join join && limit.allowDuplicatePastExpandingNode()) {
if (join.config().type() == JoinTypes.LEFT) {
// NOTE! This is only correct because our LEFT JOINs preserve the number of rows from the left hand side.
// This deviates from SQL semantics. In SQL, multiple matches on the right hand side lead to multiple rows in the output.
// For us, multiple matches on the right hand side are collected into multi-values.
return join.replaceChildren(limit.replaceChild(join.left()), join.right());
// Left joins increase the number of rows if any join key has multiple matches from the right hand side.
// Therefore, we cannot simply push down the limit - but we can add another limit before the join.
// To avoid repeating this infinitely, we have to set allowDuplicatePastExpandingNode = false.
Join newChild = join.replaceChildren(limit.replaceChild(join.left()), join.right());
return new Limit(limit.source(), limit.limit(), newChild, false);
}
}
return limit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ public ReplaceTopNWithLimitAndSort() {

@Override
protected LogicalPlan rule(TopN plan) {
return new Limit(plan.source(), plan.limit(), new OrderBy(plan.source(), plan.child(), plan.order()));
return new Limit(plan.source(), plan.limit(), new OrderBy(plan.source(), plan.child(), plan.order()), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ public PlanFactory visitWhereCommand(EsqlBaseParser.WhereCommandContext ctx) {
public PlanFactory visitLimitCommand(EsqlBaseParser.LimitCommandContext ctx) {
Source source = source(ctx);
int limit = stringToInt(ctx.INTEGER_LITERAL().getText());
return input -> new Limit(source, new Literal(source, limit, DataType.INTEGER), input);
return input -> new Limit(source, new Literal(source, limit, DataType.INTEGER), input, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,37 @@ public class Limit extends UnaryPlan {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Limit", Limit::new);

private final Expression limit;

public Limit(Source source, Expression limit, LogicalPlan child) {
/**
* Important for optimizations. This should be {@code true} in most cases, which allows this instance to be duplicated past a child plan
* node that increases the number of rows, like for LOOKUP JOIN and MV_EXPAND.
* Needs to be set to {@code false} in {@link org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownAndCombineLimits} to avoid
* infinite loops from adding a duplicate of the limit past the child over and over again.
*/
private final boolean allowDuplicatePastExpandingNode;
alex-spies marked this conversation as resolved.
Show resolved Hide resolved

public Limit(Source source, Expression limit, LogicalPlan child, boolean allowDuplicatePastExpandingNode) {
alex-spies marked this conversation as resolved.
Show resolved Hide resolved
super(source, child);
this.limit = limit;
this.allowDuplicatePastExpandingNode = allowDuplicatePastExpandingNode;
}

private Limit(StreamInput in) throws IOException {
this(Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(Expression.class), in.readNamedWriteable(LogicalPlan.class));
this(
Source.readFrom((PlanStreamInput) in),
in.readNamedWriteable(Expression.class),
in.readNamedWriteable(LogicalPlan.class),
true
);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
Source.EMPTY.writeTo(out);
out.writeNamedWriteable(limit());
out.writeNamedWriteable(child());
// For limits sent to data nodes, allowDuplicatePastExpandingNode should always be true.
alex-spies marked this conversation as resolved.
Show resolved Hide resolved
// That's because if it's false, this means a copy of this limit was pushed down below an MvExpand or Join, and thus there's
// another pipeline breaker further upstream.
}

@Override
Expand All @@ -45,18 +61,22 @@ public String getWriteableName() {

@Override
protected NodeInfo<Limit> info() {
return NodeInfo.create(this, Limit::new, limit, child());
return NodeInfo.create(this, Limit::new, limit, child(), allowDuplicatePastExpandingNode);
}

@Override
public Limit replaceChild(LogicalPlan newChild) {
return new Limit(source(), limit, newChild);
return new Limit(source(), limit, newChild, allowDuplicatePastExpandingNode);
}

public Expression limit() {
return limit;
}

public boolean allowDuplicatePastExpandingNode() {
return allowDuplicatePastExpandingNode;
}

@Override
public String commandName() {
return "LIMIT";
Expand All @@ -69,7 +89,7 @@ public boolean expressionsResolved() {

@Override
public int hashCode() {
return Objects.hash(limit, child());
return Objects.hash(limit, child(), allowDuplicatePastExpandingNode);
}

@Override
Expand All @@ -83,6 +103,8 @@ public boolean equals(Object obj) {

Limit other = (Limit) obj;

return Objects.equals(limit, other.limit) && Objects.equals(child(), other.child());
return Objects.equals(limit, other.limit)
&& Objects.equals(child(), other.child())
&& Objects.equals(allowDuplicatePastExpandingNode, other.allowDuplicatePastExpandingNode);
alex-spies marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,21 @@ public class MvExpand extends UnaryPlan {

private final NamedExpression target;
private final Attribute expanded;
private final Integer limit;

private List<Attribute> output;

public MvExpand(Source source, LogicalPlan child, NamedExpression target, Attribute expanded) {
this(source, child, target, expanded, null);
}

public MvExpand(Source source, LogicalPlan child, NamedExpression target, Attribute expanded, Integer limit) {
super(source, child);
this.target = target;
this.expanded = expanded;
this.limit = limit;
}

private MvExpand(StreamInput in) throws IOException {
this(
Source.readFrom((PlanStreamInput) in),
in.readNamedWriteable(LogicalPlan.class),
in.readNamedWriteable(NamedExpression.class),
in.readNamedWriteable(Attribute.class),
null // we only need this on the coordinator
in.readNamedWriteable(Attribute.class)
);
}

Expand All @@ -58,7 +51,6 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(child());
out.writeNamedWriteable(target());
out.writeNamedWriteable(expanded());
assert limit == null;
}

@Override
Expand Down Expand Up @@ -86,10 +78,6 @@ public Attribute expanded() {
return expanded;
}

public Integer limit() {
return limit;
}

@Override
protected AttributeSet computeReferences() {
return target.references();
Expand All @@ -105,8 +93,8 @@ public boolean expressionsResolved() {
}

@Override
public UnaryPlan replaceChild(LogicalPlan newChild) {
return new MvExpand(source(), newChild, target, expanded, limit);
public MvExpand replaceChild(LogicalPlan newChild) {
return new MvExpand(source(), newChild, target, expanded);
}

@Override
Expand All @@ -119,12 +107,12 @@ public List<Attribute> output() {

@Override
protected NodeInfo<? extends LogicalPlan> info() {
return NodeInfo.create(this, MvExpand::new, child(), target, expanded, limit);
return NodeInfo.create(this, MvExpand::new, child(), target, expanded);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), target, expanded, limit);
return Objects.hash(super.hashCode(), target, expanded);
}

@Override
Expand All @@ -133,6 +121,6 @@ public boolean equals(Object obj) {
return false;
}
MvExpand other = ((MvExpand) obj);
return Objects.equals(target, other.target) && Objects.equals(expanded, other.expanded) && Objects.equals(limit, other.limit);
return Objects.equals(target, other.target) && Objects.equals(expanded, other.expanded);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.Dissect;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
Expand All @@ -34,7 +31,6 @@
import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
import org.elasticsearch.xpack.esql.plan.physical.GrokExec;
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
Expand Down Expand Up @@ -99,13 +95,7 @@ static PhysicalPlan mapUnary(UnaryPlan p, PhysicalPlan child) {
}

if (p instanceof MvExpand mvExpand) {
MvExpandExec result = new MvExpandExec(mvExpand.source(), child, mvExpand.target(), mvExpand.expanded());
if (mvExpand.limit() != null) {
// MvExpand could have an inner limit
// see PushDownAndCombineLimits rule
return new LimitExec(result.source(), result, new Literal(Source.EMPTY, mvExpand.limit(), DataType.INTEGER));
}
return result;
return new MvExpandExec(mvExpand.source(), child, mvExpand.target(), mvExpand.expanded());
}

return unsupported(p);
Expand Down
Loading