Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#963 Unimplemented PPL Sort Syntax #994

Merged
merged 12 commits into from
Jan 9, 2025
2 changes: 1 addition & 1 deletion docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ source = table | where ispresent(a) |
- `source=accounts | parse email '.+@(?<host>.+)' | stats count() by host`
- `source=accounts | parse email '.+@(?<host>.+)' | eval eval_result=1 | fields host, eval_result`
- `source=accounts | parse email '.+@(?<host>.+)' | where age > 45 | sort - age | fields age, email, host`
- `source=accounts | parse address '(?<streetNumber>\d+) (?<street>.+)' | where streetNumber > 500 | sort num(streetNumber) | fields streetNumber, street`
- `source=accounts | parse address '(?<streetNumber>\d+) (?<street>.+)' | eval streetNumberInt = cast(streetNumber as integer) | where streetNumberInt > 500 | sort streetNumberInt | fields streetNumber, street`
- Limitation: [see limitations](ppl-parse-command.md#limitations)

#### **Grok**
Expand Down
2 changes: 1 addition & 1 deletion docs/ppl-lang/ppl-parse-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ The example shows how to sort street numbers that are higher than 500 in ``addre

PPL query:

os> source=accounts | parse address '(?<streetNumber>\d+) (?<street>.+)' | where cast(streetNumber as int) > 500 | sort num(streetNumber) | fields streetNumber, street ;
os> source=accounts | parse address '(?<streetNumber>\d+) (?<street>.+)' | eval streetNumberInt = cast(streetNumber as integer) | where streetNumberInt > 500 | sort streetNumberInt | fields streetNumber, street ;
fetched rows / total rows = 3/3
+----------------+----------------+
| streetNumber | street |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@

package org.opensearch.flint.spark.ppl

import scala.reflect.internal.Reporter.Count

import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq

import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Coalesce, Descending, GreaterThan, Literal, NullsFirst, NullsLast, RegExpExtract, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Cast, Descending, GreaterThan, Literal, NullsFirst, NullsLast, RegExpExtract, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, GlobalLimit, LocalLimit, LogicalPlan, Project, Sort}
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.types.IntegerType

class FlintSparkPPLParseITSuite
extends QueryTest
Expand Down Expand Up @@ -214,10 +211,16 @@ class FlintSparkPPLParseITSuite
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("test parse email & host expressions including cast and sort commands") {
val frame = sql(s"""
| source = $testTable| parse street_address '(?<streetNumber>\\d+) (?<street>.+)' | where streetNumber > 500 | sort num(streetNumber) | fields streetNumber, street
| """.stripMargin)
test("test parse street number & street expressions including cast and sort commands") {

// TODO #963: Implement 'num', 'str', and 'ip' sort syntax
val query = s"source = $testTable | " +
"parse street_address '(?<streetNumber>\\d+) (?<street>.+)' | " +
"eval streetNumberInt = cast(streetNumber as integer) | " +
"where streetNumberInt > 500 | " +
"sort streetNumberInt | " +
"fields streetNumber, street"
val frame = sql(query)
// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
Expand All @@ -233,36 +236,36 @@ class FlintSparkPPLParseITSuite
// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical

val addressAttribute = UnresolvedAttribute("street_address")
val streetAddressAttribute = UnresolvedAttribute("street_address")
val streetNumberAttribute = UnresolvedAttribute("streetNumber")
val streetAttribute = UnresolvedAttribute("street")
val streetNumberIntAttribute = UnresolvedAttribute("streetNumberInt")

val streetNumberExpression = Alias(
RegExpExtract(
addressAttribute,
Literal("(?<streetNumber>\\d+) (?<street>.+)"),
Literal("1")),
"streetNumber")()
val regexLiteral = Literal("(?<streetNumber>\\d+) (?<street>.+)")
val streetNumberExpression =
Alias(RegExpExtract(streetAddressAttribute, regexLiteral, Literal("1")), "streetNumber")()
val streetExpression =
Alias(RegExpExtract(streetAddressAttribute, regexLiteral, Literal("2")), "street")()

val streetExpression = Alias(
RegExpExtract(
addressAttribute,
Literal("(?<streetNumber>\\d+) (?<street>.+)"),
Literal("2")),
"street")()
val castExpression = Cast(streetNumberAttribute, IntegerType)

val expectedPlan = Project(
Seq(streetNumberAttribute, streetAttribute),
Sort(
Seq(SortOrder(streetNumberAttribute, Ascending, NullsFirst, Seq.empty)),
Seq(SortOrder(streetNumberIntAttribute, Ascending, NullsFirst, Seq.empty)),
global = true,
Filter(
GreaterThan(streetNumberAttribute, Literal(500)),
GreaterThan(streetNumberIntAttribute, Literal(500)),
Project(
Seq(addressAttribute, streetNumberExpression, streetExpression, UnresolvedStar(None)),
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))))))
Seq(UnresolvedStar(None), Alias(castExpression, "streetNumberInt")()),
Project(
Seq(
streetAddressAttribute,
streetNumberExpression,
streetExpression,
UnresolvedStar(None)),
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))))))

assert(compareByString(expectedPlan) === compareByString(logicalPlan))
}

}
4 changes: 2 additions & 2 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ DATASOURCES: 'DATASOURCES';
USING: 'USING';
WITH: 'WITH';

// FIELD KEYWORDS
// SORT FIELD KEYWORDS
// TODO #963: Implement 'num', 'str', and 'ip' sort syntax
AUTO: 'AUTO';
STR: 'STR';
IP: 'IP';
NUM: 'NUM';


// FIELDSUMMARY keywords
FIELDSUMMARY: 'FIELDSUMMARY';
INCLUDEFIELDS: 'INCLUDEFIELDS';
Expand Down
11 changes: 7 additions & 4 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,8 @@ sortField

sortFieldExpression
currantw marked this conversation as resolved.
Show resolved Hide resolved
: fieldExpression

// TODO #963: Implement 'num', 'str', and 'ip' sort syntax
| AUTO LT_PRTHS fieldExpression RT_PRTHS
| STR LT_PRTHS fieldExpression RT_PRTHS
| IP LT_PRTHS fieldExpression RT_PRTHS
Expand Down Expand Up @@ -1095,10 +1097,6 @@ keywordsCanBeId
| INDEX
| DESC
| DATASOURCES
| AUTO
| STR
| IP
| NUM
| FROM
| PATTERN
| NEW_FIELD
Expand Down Expand Up @@ -1181,4 +1179,9 @@ keywordsCanBeId
| BETWEEN
| CIDRMATCH
| trendlineType
// SORT FIELD KEYWORDS
| AUTO
| STR
| IP
| NUM
;
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@
import org.opensearch.sql.ast.expression.subquery.ExistsSubquery;
import org.opensearch.sql.ast.expression.subquery.InSubquery;
import org.opensearch.sql.ast.expression.subquery.ScalarSubquery;
import org.opensearch.sql.ast.tree.Trendline;
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.ppl.utils.ArgumentFactory;

Expand Down Expand Up @@ -183,6 +181,8 @@ public UnresolvedExpression visitWcFieldExpression(OpenSearchPPLParser.WcFieldEx

@Override
public UnresolvedExpression visitSortField(OpenSearchPPLParser.SortFieldContext ctx) {

// TODO #963: Implement 'num', 'str', and 'ip' sort syntax
return new Field((QualifiedName)
visit(ctx.sortFieldExpression().fieldExpression().qualifiedName()),
ArgumentFactory.getArgumentList(ctx));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ import org.scalatest.matchers.should.Matchers
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.ScalaReflection.universe.Star
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Coalesce, Descending, GreaterThan, Literal, NamedExpression, NullsFirst, NullsLast, RegExpExtract, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Cast, Coalesce, Descending, GreaterThan, Literal, NamedExpression, NullsFirst, NullsLast, RegExpExtract, SortOrder}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, GlobalLimit, LocalLimit, Project, Sort}
import org.apache.spark.sql.types.IntegerType

class PPLLogicalPlanParseTranslatorTestSuite
extends SparkFunSuite
Expand Down Expand Up @@ -120,43 +121,49 @@ class PPLLogicalPlanParseTranslatorTestSuite
assert(compareByString(expectedPlan) === compareByString(logPlan))
}

test("test parse email & host expressions including cast and sort commands") {
test("test parse street number & street expressions including cast and sort commands") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(
plan(
pplParser,
"source=t | parse address '(?<streetNumber>\\d+) (?<street>.+)' | where streetNumber > 500 | sort num(streetNumber) | fields streetNumber, street"),
context)

// TODO #963: Implement 'num', 'str', and 'ip' sort syntax
val query =
"source=t" +
" | parse address '(?<streetNumber>\\d+) (?<street>.+)'" +
" | eval streetNumberInt = cast(streetNumber as integer)" +
" | where streetNumberInt > 500" +
" | sort streetNumberInt" +
" | fields streetNumber, street"

val logPlan = planTransformer.visit(plan(pplParser, query), context)

val addressAttribute = UnresolvedAttribute("address")
val streetNumberAttribute = UnresolvedAttribute("streetNumber")
val streetAttribute = UnresolvedAttribute("street")
val streetNumberIntAttribute = UnresolvedAttribute("streetNumberInt")

val streetNumberExpression = Alias(
RegExpExtract(
addressAttribute,
Literal("(?<streetNumber>\\d+) (?<street>.+)"),
Literal("1")),
"streetNumber")()
val regexLiteral = Literal("(?<streetNumber>\\d+) (?<street>.+)")
val streetNumberExpression =
Alias(RegExpExtract(addressAttribute, regexLiteral, Literal("1")), "streetNumber")()
val streetExpression =
Alias(RegExpExtract(addressAttribute, regexLiteral, Literal("2")), "street")()

val streetExpression = Alias(
RegExpExtract(
addressAttribute,
Literal("(?<streetNumber>\\d+) (?<street>.+)"),
Literal("2")),
"street")()
val castExpression = Cast(streetNumberAttribute, IntegerType)

val expectedPlan = Project(
Seq(streetNumberAttribute, streetAttribute),
Sort(
Seq(SortOrder(streetNumberAttribute, Ascending, NullsFirst, Seq.empty)),
Seq(SortOrder(streetNumberIntAttribute, Ascending, NullsFirst, Seq.empty)),
global = true,
Filter(
GreaterThan(streetNumberAttribute, Literal(500)),
GreaterThan(streetNumberIntAttribute, Literal(500)),
Project(
Seq(addressAttribute, streetNumberExpression, streetExpression, UnresolvedStar(None)),
UnresolvedRelation(Seq("t"))))))
Seq(UnresolvedStar(None), Alias(castExpression, "streetNumberInt")()),
Project(
Seq(
addressAttribute,
streetNumberExpression,
streetExpression,
UnresolvedStar(None)),
UnresolvedRelation(Seq("t")))))))

assert(compareByString(expectedPlan) === compareByString(logPlan))
}
Expand Down
Loading