Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Do Not Merge][POC] Calcite integration #993

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
"com.github.sbt" % "junit-interface" % "0.13.3" % "test",
"org.projectlombok" % "lombok" % "1.18.30",
"com.github.seancfoley" % "ipaddress" % "5.5.1",
"org.apache.calcite" % "calcite-core" % "1.38.0",
"org.apache.calcite" % "calcite-linq4j" % "1.38.0",
"org.apache.calcite" % "calcite-testkit" % "1.38.0" % "test",
),
libraryDependencies ++= deps(sparkVersion),
// ANTLR settings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public List<UnresolvedPlan> getChild() {
return ImmutableList.of(left);
}

/**
 * Returns both inputs of this join as an immutable two-element list: the left plan first,
 * then the right plan.
 */
public List<UnresolvedPlan> getChildren() {
return ImmutableList.of(left, right);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitJoin(this, context);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite;

import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilder.AggCall;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.AggregateFunction;
import org.opensearch.sql.ast.expression.Alias;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.calcite.utils.AggregateUtils;

/**
 * AST visitor that produces Calcite {@link AggCall}s from aggregate expressions.
 *
 * <p>Only aliases and aggregate functions are handled here; the argument of an aggregate
 * function is converted by the {@link CalciteRexNodeVisitor} supplied at construction time.
 */
public class CalciteAggCallVisitor extends AbstractNodeVisitor<AggCall, CalcitePlanContext> {
  private final CalciteRexNodeVisitor rexNodeVisitor;

  /**
   * @param rexNodeVisitor visitor used to translate the field an aggregate function is applied to
   */
  public CalciteAggCallVisitor(CalciteRexNodeVisitor rexNodeVisitor) {
    this.rexNodeVisitor = rexNodeVisitor;
  }

  /**
   * Translates {@code unresolved} by letting it accept this visitor.
   *
   * @param unresolved expression to translate
   * @param context planning context passed through to the visit methods
   * @return the aggregate call returned by the visit method that was invoked
   */
  public AggCall analyze(UnresolvedExpression unresolved, CalcitePlanContext context) {
    return unresolved.accept(this, context);
  }

  /** Translates the aliased expression and names the resulting call after the alias. */
  @Override
  public AggCall visitAlias(Alias node, CalcitePlanContext context) {
    return analyze(node.getDelegated(), context).as(node.getName());
  }

  /**
   * Converts the function's field with the rex visitor and hands the node, the converted
   * field and the context to {@link AggregateUtils#translate}.
   */
  @Override
  public AggCall visitAggregateFunction(AggregateFunction node, CalcitePlanContext context) {
    RexNode argument = rexNodeVisitor.analyze(node.getField(), context);
    return AggregateUtils.translate(node, argument, context);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite;

import lombok.Getter;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.RelBuilder;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

import java.util.function.BiFunction;

/**
 * State handed to the Calcite visitors while a plan is being translated.
 *
 * <p>Holds the {@link RelBuilder} the visitors operate on, an {@link ExtendedRexBuilder}
 * wrapping that builder's rex builder, and a flag telling whether a join condition is
 * currently being resolved.
 */
public class CalcitePlanContext {

  public final RelBuilder relBuilder;
  public final ExtendedRexBuilder rexBuilder;

  /** True only while {@link #resolveJoinCondition} is executing its transform function. */
  @Getter private boolean isResolvingJoinCondition = false;

  /**
   * @param relBuilder builder to operate on; its rex builder is wrapped into {@link #rexBuilder}
   */
  public CalcitePlanContext(RelBuilder relBuilder) {
    this.relBuilder = relBuilder;
    this.rexBuilder = new ExtendedRexBuilder(relBuilder.getRexBuilder());
  }

  /**
   * Applies {@code transformFunction} to {@code expr} with the join-condition flag raised.
   *
   * <p>The flag is restored to the value it had on entry even when the function throws, so a
   * failed translation cannot leave this context permanently in "join condition" mode, and a
   * nested invocation does not clear the flag of the invocation enclosing it.
   *
   * @param expr join condition to translate
   * @param transformFunction translation to run; receives {@code expr} and this context
   * @return the value returned by {@code transformFunction}
   */
  public RexNode resolveJoinCondition(
      UnresolvedExpression expr,
      BiFunction<UnresolvedExpression, CalcitePlanContext, RexNode> transformFunction) {
    boolean previous = isResolvingJoinCondition;
    isResolvingJoinCondition = true;
    try {
      return transformFunction.apply(expr, this);
    } finally {
      isResolvingJoinCondition = previous;
    }
  }

  /**
   * Creates a context around a new {@link RelBuilder} built from {@code config}.
   *
   * @param config framework configuration passed to {@link RelBuilder#create}
   * @return a fresh planning context
   */
  public static CalcitePlanContext create(FrameworkConfig config) {
    return new CalcitePlanContext(RelBuilder.create(config));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite;

import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilder.AggCall;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.AllFields;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.QualifiedName;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Join;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.Relation;
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.ast.tree.SubqueryAlias;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.calcite.utils.JoinAndLookupUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.calcite.sql.SqlKind.AS;
import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_FIRST;
import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_LAST;
import static org.opensearch.sql.ast.tree.Sort.SortOption.DEFAULT_DESC;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.DESC;

public class CalciteRelNodeVisitor extends AbstractNodeVisitor<Void, CalcitePlanContext> {

private final CalciteRexNodeVisitor rexVisitor;
private final CalciteAggCallVisitor aggVisitor;

/**
 * Creates the visitor together with its expression visitors; the aggregate-call visitor is
 * given the same rex visitor instance this class uses.
 */
public CalciteRelNodeVisitor() {
  CalciteRexNodeVisitor sharedRexVisitor = new CalciteRexNodeVisitor();
  this.rexVisitor = sharedRexVisitor;
  this.aggVisitor = new CalciteAggCallVisitor(sharedRexVisitor);
}

/**
 * Translates a plan node by letting it accept this visitor.
 *
 * @param unresolved plan node to translate
 * @param context planning context passed through to the visit methods
 * @return whatever the invoked visit method returns
 */
public Void analyze(UnresolvedPlan unresolved, CalcitePlanContext context) {
return unresolved.accept(this, context);
}

/**
 * Pushes one table scan per qualified name of the relation; when more than one table was
 * scanned, the scans are combined with a UNION ALL over all of them.
 */
@Override
public Void visitRelation(Relation node, CalcitePlanContext context) {
  int scanned = 0;
  for (QualifiedName tableName : node.getQualifiedNames()) {
    context.relBuilder.scan(tableName.getParts());
    scanned++;
  }
  // A single scan is already the complete plan for this node.
  if (scanned > 1) {
    context.relBuilder.union(true, scanned);
  }
  return null;
}

/** Plans the children, then adds a filter on the translated condition of the node. */
@Override
public Void visitFilter(Filter node, CalcitePlanContext context) {
  visitChildren(node, context);
  context.relBuilder.filter(rexVisitor.analyze(node.getCondition(), context));
  return null;
}

/**
 * Plans the children, then projects onto (or, for an excluding project, away from) the
 * translated project list.
 *
 * <p>{@link AllFields} entries are ignored; if nothing else remains, no projection is added.
 */
@Override
public Void visitProject(Project node, CalcitePlanContext context) {
  visitChildren(node, context);

  List<RexNode> projections = new ArrayList<>();
  for (UnresolvedExpression expr : node.getProjectList()) {
    if (expr instanceof AllFields) {
      continue;
    }
    projections.add(rexVisitor.analyze(expr, context));
  }

  if (!projections.isEmpty()) {
    if (node.isExcluded()) {
      context.relBuilder.projectExcept(projections);
    } else {
      context.relBuilder.project(projections);
    }
  }
  return null;
}

/**
 * Plans the children, then sorts by the translated sort fields.
 *
 * <p>Direction and null placement are read from the {@link Sort.SortOption} of each field.
 * The option is inspected by value rather than compared by reference with
 * {@code DEFAULT_DESC}: {@code analyzeSortOption} returns a freshly created option whenever
 * a {@code nullFirst} argument is present, and such a descending option would otherwise be
 * planned as ascending. The null order carried by the option is applied as well instead of
 * being left to the builder's default.
 */
@Override
public Void visitSort(Sort node, CalcitePlanContext context) {
  visitChildren(node, context);
  List<RexNode> sortList = node.getSortList().stream()
      .map(expr -> {
        RexNode sortField = rexVisitor.analyze(expr, context);
        Sort.SortOption sortOption = analyzeSortOption(expr.getFieldArgs());
        // NOTE(review): relies on SortOption exposing getSortOrder()/getNullOrder()
        // (e.g. Lombok-generated) - confirm against Sort.SortOption.
        RexNode withNullOrder = sortOption.getNullOrder() == NULL_FIRST
            ? context.relBuilder.nullsFirst(sortField)
            : context.relBuilder.nullsLast(sortField);
        return sortOption.getSortOrder() == DESC
            ? context.relBuilder.desc(withNullOrder)
            : withNullOrder;
      })
      .collect(Collectors.toList());
  context.relBuilder.sort(sortList);
  return null;
}

/**
 * Derives the sort option from the arguments attached to a sort field.
 *
 * <p>The first argument supplies the ascending flag. If an argument named
 * {@code "nullFirst"} exists, the first such argument decides the null order and a new
 * option is built; otherwise one of the two default options is returned.
 *
 * @param fieldArgs arguments of the sort field; the value of the first one is cast to Boolean
 * @return the option describing direction and null order
 */
private Sort.SortOption analyzeSortOption(List<Argument> fieldArgs) {
  Boolean asc = (Boolean) fieldArgs.get(0).getValue().getValue();
  for (Argument option : fieldArgs) {
    if ("nullFirst".equals(option.getName())) {
      Boolean isNullFirst = (Boolean) option.getValue().getValue();
      return new Sort.SortOption(asc ? ASC : DESC, isNullFirst ? NULL_FIRST : NULL_LAST);
    }
  }
  if (asc) {
    return Sort.SortOption.DEFAULT_ASC;
  }
  return DEFAULT_DESC;
}

/**
 * Plans the children, then applies a limit using the node's {@code from} and {@code size}
 * values, in that order (presumably offset and row count - confirm against
 * {@code RelBuilder.limit}).
 */
@Override
public Void visitHead(Head node, CalcitePlanContext context) {
visitChildren(node, context);
context.relBuilder.limit(node.getFrom(), node.getSize());
return null;
}

/**
 * Plans the children, then appends every eval expression as an additional projected column.
 *
 * <p>When an expression is an alias whose name equals a field that existed before the eval
 * (for example {@code eval field = 1}), the existing field of that name is dropped
 * afterwards so that the new value overrides it.
 */
@Override
public Void visitEval(Eval node, CalcitePlanContext context) {
  visitChildren(node, context);
  List<String> originalFieldNames = context.relBuilder.peek().getRowType().getFieldNames();

  // Explicit ArrayList: the list is narrowed with retainAll below, and Collectors.toList()
  // gives no guarantee that its result can be modified.
  List<String> overriding = new ArrayList<>();
  // Plain loop instead of Stream.map, because projectPlus changes the builder on each step.
  for (UnresolvedExpression expr : node.getExpressionList()) {
    RexNode eval = rexVisitor.analyze(expr, context);
    context.relBuilder.projectPlus(eval);
    if (eval.getKind() == AS) {
      // The second operand of an AS call is the literal holding the alias name.
      RexLiteral aliasName = (RexLiteral) ((RexCall) eval).getOperands().get(1);
      overriding.add(aliasName.getValueAs(String.class));
    }
  }

  // Only aliases that collide with a pre-existing field name need a drop.
  overriding.retainAll(originalFieldNames);
  if (!overriding.isEmpty()) {
    List<RexNode> toDrop = context.relBuilder.fields(overriding);
    context.relBuilder.projectExcept(toDrop);
  }
  return null;
}

/**
 * Plans the children, then adds an aggregate built from the node's aggregate expressions and
 * group-by expressions.
 *
 * <p>If the node has a span expression, its translation is appended as the last group key.
 */
@Override
public Void visitAggregation(Aggregation node, CalcitePlanContext context) {
  visitChildren(node, context);
  List<AggCall> aggList = node.getAggExprList().stream()
      .map(expr -> aggVisitor.analyze(expr, context))
      .collect(Collectors.toList());
  // Collected into an explicit ArrayList because the span may be appended below;
  // Collectors.toList() does not promise a modifiable result.
  List<RexNode> groupByList = node.getGroupExprList().stream()
      .map(expr -> rexVisitor.analyze(expr, context))
      .collect(Collectors.toCollection(ArrayList::new));

  UnresolvedExpression span = node.getSpan();
  if (span != null) {
    groupByList.add(rexVisitor.analyze(span, context));
  }

  context.relBuilder.aggregate(context.relBuilder.groupKey(groupByList), aggList);
  return null;
}

/**
 * Plans every child returned by {@code getChildren()} in list order, then joins the two
 * plans on top of the builder stack using the node's join type.
 *
 * <p>A missing join condition is replaced by the literal {@code true}.
 */
@Override
public Void visitJoin(Join node, CalcitePlanContext context) {
  for (UnresolvedPlan child : node.getChildren()) {
    analyze(child, context);
  }
  RexNode condition = node.getJoinCondition()
      .map(c -> rexVisitor.analyzeJoinCondition(c, context))
      .orElse(context.relBuilder.literal(true));
  JoinRelType joinType = JoinAndLookupUtils.translateJoinType(node.getJoinType());
  context.relBuilder.join(joinType, condition);
  return null;
}

/** Plans the children, then assigns the node's alias to the plan on top of the builder. */
@Override
public Void visitSubqueryAlias(SubqueryAlias node, CalcitePlanContext context) {
visitChildren(node, context);
context.relBuilder.as(node.getAlias());
return null;
}

/**
 * Plans a lookup as a LEFT join of the source with the lookup table.
 *
 * <p>When the node requests all lookup fields, the join itself is the result. Otherwise a
 * final projection is added that keeps the source fields, adds the requested output fields
 * and removes the lookup-side mapping fields as well as replaced source fields.
 */
@Override
public Void visitLookup(Lookup node, CalcitePlanContext context) {
  // Source side first, so that it ends up as the left input of the join.
  visitChildren(node, context);
  // Captured while the source is on top of the stack; reused for the final projection.
  List<RexNode> sourceFields = context.relBuilder.fields();

  // Lookup table side.
  analyze(node.getLookupRelation(), context);
  // With explicit output fields the lookup table is narrowed by a projection. The mapping
  // fields of the lookup table have to stay in that projection, otherwise the join fails;
  // they are dropped again after the join.
  List<RexNode> lookupProjection =
      JoinAndLookupUtils.buildLookupRelationProjectList(node, rexVisitor, context);
  if (!lookupProjection.isEmpty()) {
    context.relBuilder.project(lookupProjection);
  }

  // Join condition built from the mapping; the literal TRUE when there is none.
  RexNode joinCondition = JoinAndLookupUtils.buildLookupMappingCondition(node)
      .map(c -> rexVisitor.analyzeJoinCondition(c, context))
      .orElse(context.relBuilder.literal(true));

  // The join is needed either way; only the trailing projection depends on this flag.
  boolean keepAllLookupFields = node.allFieldsShouldAppliedToOutputList();
  context.relBuilder.join(JoinRelType.LEFT, joinCondition);
  if (keepAllLookupFields) {
    // No output field was specified: every lookup table field goes to the output.
    return null;
  }

  // Mapping fields of the lookup table to remove from the result. For example, in
  // "LOOKUP lookTbl Field1 AS Field2, Field3", Field1 and Field3 are projection fields and
  // join keys that are dropped again.
  // NOTE(review): the two fields are compared by reference (==), not equals - confirm
  // that identity is what is intended here.
  List<Field> lookupMappingFields = node.getLookupMappingMap().entrySet().stream()
      .map(kv -> kv.getKey().getField() == kv.getValue().getField()
          ? JoinAndLookupUtils.buildFieldWithLookupSubqueryAlias(node, kv.getKey())
          : kv.getKey())
      .collect(Collectors.toList());
  List<RexNode> toDrop = new ArrayList<>(
      JoinAndLookupUtils.buildProjectListFromFields(lookupMappingFields, rexVisitor, context));

  // Source-side fields to remove, if they exist in the source.
  List<RexNode> replacedSourceFields = node.getFieldListWithSourceSubqueryAlias().stream()
      .map(field -> {
        try {
          return rexVisitor.analyze(field, context);
        } catch (RuntimeException e) {
          // The field is not found in the source: nothing to drop for it.
          return null;
        }
      })
      .filter(Objects::nonNull)
      .collect(Collectors.toList());
  toDrop.addAll(replacedSourceFields);

  // Final output: source fields plus the new columns, minus everything marked for removal.
  // New columns depend on the strategy:
  //   Append:  coalesce($outputField, $"inputField").as(outputFieldName)
  //   Replace: $outputField.as(outputFieldName)
  List<RexNode> outputFields = new ArrayList<>(sourceFields);
  outputFields.addAll(JoinAndLookupUtils.buildOutputProjectList(node, rexVisitor, context));
  outputFields.removeAll(toDrop);

  context.relBuilder.project(outputFields);
  return null;
}
}
Loading
Loading