Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#957 Implement earliest and latest functions #1018

Merged
merged 6 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,23 @@ _- **Limitation: another command usage of (relation) subquery is in `appendcols`
- `source = table | eval cdate = CAST('2012-08-07' as date), ctime = cast('2012-08-07T08:07:06' as timestamp) | fields cdate, ctime`
- `source = table | eval chained_cast = cast(cast("true" as boolean) as integer) | fields chained_cast`

### **Relative Time Functions**

#### **relative_timestamp**
[See additional function details](functions/ppl-datetime#RELATIVE_TIMESTAMP)
[See additional function details](functions/ppl-datetime#relative_timestamp)
- `source = table | eval one_hour_ago = relative_timestamp("-1h") | where timestamp < one_hour_ago`
- `source = table | eval start_of_today = relative_timestamp("@d") | where timestamp > start_of_today`
- `source = table | eval last_saturday = relative_timestamp("-1d@w6") | where timestamp >= last_saturday`

#### **earliest**
[See additional function details](functions/ppl-datetime#earliest)
- `source = table | where earliest("-1wk", timestamp)`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the case where earliest is interpreted as
timestamp >= now() - 1s, depend on when the now() is resolved, it may produce insonsistent result?

Copy link
Contributor Author

@currantw currantw Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe so. Relative timestamp are based on calls to CurrentTimestamp (see org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala). If you look at the documentation for CurrentTimestamp, it says that:

Returns the current timestamp at the start of query evaluation. All calls of current_timestamp within the same query return the same value.

So now(), relative_timestamp("now"), earliest("now", timestamp), latest("now", timestamp) and current_timestamp should all return consistent results within the same query. I tested this out manually as well to verify (i.e. repeated calls to these methods within the same query return the same now timestamp, down to the millisecond).

- `source = table | where earliest("@qtr", timestamp)`
- `source = table | where earliest("-2y@q", timestamp)`

#### **latest**
[See additional function details](functions/ppl-datetime#latest)
- `source = table | where latest("-60m", timestamp)`
- `source = table | where latest("@year", timestamp)`
- `source = table | where latest("-day@w1", timestamp)`
---
87 changes: 83 additions & 4 deletions docs/ppl-lang/functions/ppl-datetime.md
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,44 @@ Example:
+-------------------------------+


### `EARLIEST`

**Description:**

**Usage:** earliest(string, timestamp) returns whether the timestamp defined by the given relative string is earlier
than or at the same time as the specified timestamp.

Argument type: STRING, TIMESTAMP

Return type: BOOLEAN

Example:

os> source=people | eval earliest = earliest("-1s", now()) | fields earliest | head 1
fetched rows / total rows = 1/1
+----------+
| earliest |
|----------|
| True |
+----------+

os> source=people | eval earliest = earliest("now", now()) | fields earliest | head 1
fetched rows / total rows = 1/1
+----------+
| earliest |
|----------|
| True |
+----------+

os> source=people | eval earliest = earliest("+1s", now()) | fields earliest | head 1
fetched rows / total rows = 1/1
+----------+
| earliest |
|----------|
| False |
+----------+


### `FROM_UNIXTIME`

**Description:**
Expand Down Expand Up @@ -507,6 +545,44 @@ Example:
+--------------------------+


### `LATEST`

**Description:**

**Usage:** latest(string, timestamp) returns whether the timestamp defined by the given relative string is later
than or at the same time as the specified timestamp. See [relative_timestamp](#relative_timestamp) for more details.

Argument type: STRING, TIMESTAMP

Return type: BOOLEAN

Example:

os> source=people | eval latest = latest("-1s", now()) | fields latest | head 1
fetched rows / total rows = 1/1
+--------+
| latest |
|--------|
| False |
+--------+

os> source=people | eval latest = latest("now", now()) | fields latest | head 1
fetched rows / total rows = 1/1
+--------+
| latest |
|--------|
| True |
+--------+

os> source=people | eval latest = latest("+1s", now()) | fields latest | head 1
fetched rows / total rows = 1/1
+--------+
| latest |
|--------|
| True |
+--------+


### `LOCALTIMESTAMP`

**Description:**
Expand Down Expand Up @@ -738,7 +814,7 @@ Example:
**Description:**


**Usage:** relative_timestamp(str) returns a relative timestamp corresponding to the given relative string and the
**Usage:** relative_timestamp(string) returns a relative timestamp corresponding to the given relative string and the
current timestamp at the time of query execution.

The relative timestamp string has syntax `[+|-]<offset_time_integer><offset_time_unit>@<snap_time_unit>`, and is
Expand All @@ -750,9 +826,12 @@ made up of two optional components.
specified), and rounds the time <i>down</i> to the start of the specified time unit. For example, `@wk` is the start
of the current week (Sunday is considered to be the first day of the week).

The special relative timestamp string `now`, corresponding to the current timestamp, is also supported. The current
timestamp is determined once at the start of query execution, and is used for all relative timestamp calculations for
that query.
The special relative timestamp string `now`, corresponding to the current timestamp, is also supported.

The current timestamp is determined once at the start of query execution, and is used for all relative timestamp
calculations for that query. The Spark session time zone (`spark.sql.session.timeZone`) is used for determining
relative timestamps, and accounts for changes in the time zone offset (e.g. daylight savings time); as a result, adding
one day (`+1d`) is not the same as adding twenty-four hours (`+24h`).
currantw marked this conversation as resolved.
Show resolved Hide resolved

The relative timestamp string is case-insensitive.

Expand Down
1 change: 1 addition & 0 deletions docs/ppl-lang/ppl-where-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,4 @@ PPL query:
| where case(factor = 2, 'even', factor = 4, 'even', factor = 6, 'even', factor = 8, 'even' else 'odd') = 'even'
| stats count() by factor`
- `source = table | where timestamp >= relative_timestamp("-1d@w6")`
- `source = table | where earliest("-1d@w0", timestamp) and latest("now")`
Original file line number Diff line number Diff line change
Expand Up @@ -406,10 +406,34 @@ class FlintSparkPPLBuiltInDateTimeFunctionITSuite
assertSameRows(Seq(Row(true)), frame)
}

test("test EARLIEST") {
var frame = sql(s"""
| source=$testTable
| | eval earliest_second_before = earliest("-1s", now())
| | eval earliest_now = earliest("now", now())
| | eval earliest_second_after = earliest("+1s", now())
| | fields earliest_second_before, earliest_now, earliest_second_after
currantw marked this conversation as resolved.
Show resolved Hide resolved
| | head 1
| """.stripMargin)
assertSameRows(Seq(Row(true, true, false)), frame)
}

test("test LATEST") {
var frame = sql(s"""
| source=$testTable
| | eval latest_second_before = latest("-1s", now())
| | eval latest_now = latest("now", now())
| | eval latest_second_after = latest("+1s", now())
| | fields latest_second_before, latest_now, latest_second_after
| | head 1
| """.stripMargin)
assertSameRows(Seq(Row(false, true, true)), frame)
}

// TODO #957: Support earliest
ignore("test EARLIEST") {
var frame = sql(s"""
| source=$testTable
| source = $testTable
| | eval earliest_hour_before = earliest(now(), "-1h")
| | eval earliest_now = earliest(now(), "now")
| | eval earliest_hour_after = earliest(now(), "+1h")
Expand All @@ -422,7 +446,7 @@ class FlintSparkPPLBuiltInDateTimeFunctionITSuite
// TODO #957: Support latest
ignore("test LATEST") {
var frame = sql(s"""
| source=$testTable
| source = $testTable
| | eval latest_hour_before = latest(now(), "-1h")
| | eval latest_now = latest(now(), "now")
| | eval latest_hour_after = latest(now(), "+1h")
Expand Down
10 changes: 5 additions & 5 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,6 @@ FIRST: 'FIRST';
LAST: 'LAST';
LIST: 'LIST';
VALUES: 'VALUES';
EARLIEST: 'EARLIEST';
EARLIEST_TIME: 'EARLIEST_TIME';
LATEST: 'LATEST';
LATEST_TIME: 'LATEST_TIME';
Comment on lines -250 to -253
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These aggregation commands don't appear to have ever been implemented. They are not used, referenced, or documented anywhere outside the auto-generated Antlr modules, that I can find. Moreover, I think that their functionality (as best as I can guess from the name, since they aren't documented) can just be accomplished using the existing min and max functions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are there spark issues that refer to this syntax? if so should mention that they are no longer relevant

or does sql has these implemented? If so might want to create future issues to implement this as we are striving for feature parity.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are there spark issues that refer to this syntax? if so should mention that they are no longer relevant

Good idea. I searched and didn't find anything.

or does sql has these implemented? If so might want to create future issues to implement this as we are striving for feature parity.

OpenSearch SQL does not appear to have these implemented either, and I similarly wasn't able to find any GitHub issues that made mentioned of it.

@YANG-DB I assume it also makes sense to remove this syntax from OpenSearch SQL for parity? Should I raise an issue and create a (tiny) PR for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@YANG-DB I have raise a cleanup issue on OpenSearch SQL here. Would you be able to triage it and I will create a (trivial) pull request to resolve it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: here is the pull request.

Copy link
Member

@YANG-DB YANG-DB Jan 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are there spark issues that refer to this syntax? if so should mention that they are no longer relevant

Good idea. I searched and didn't find anything.

or does sql has these implemented? If so might want to create future issues to implement this as we are striving for feature parity.

OpenSearch SQL does not appear to have these implemented either, and I similarly wasn't able to find any GitHub issues that made mentioned of it.

@YANG-DB I assume it also makes sense to remove this syntax from OpenSearch SQL for parity? Should I raise an issue and create a (tiny) PR for this?

Yes I agree - thanks for creating a PR for parity in the SQL repo 🙏

PER_DAY: 'PER_DAY';
PER_HOUR: 'PER_HOUR';
PER_MINUTE: 'PER_MINUTE';
Expand Down Expand Up @@ -338,7 +334,6 @@ MONTHNAME: 'MONTHNAME';
NOW: 'NOW';
PERIOD_ADD: 'PERIOD_ADD';
PERIOD_DIFF: 'PERIOD_DIFF';
RELATIVE_TIMESTAMP: 'RELATIVE_TIMESTAMP';
SEC_TO_TIME: 'SEC_TO_TIME';
STR_TO_DATE: 'STR_TO_DATE';
SUBDATE: 'SUBDATE';
Expand All @@ -360,6 +355,11 @@ UTC_TIMESTAMP: 'UTC_TIMESTAMP';
WEEKDAY: 'WEEKDAY';
YEARWEEK: 'YEARWEEK';

// RELATIVE TIME FUNCTIONS
RELATIVE_TIMESTAMP: 'RELATIVE_TIMESTAMP';
EARLIEST: 'EARLIEST';
LATEST: 'LATEST';

// TEXT FUNCTIONS
SUBSTR: 'SUBSTR';
SUBSTRING: 'SUBSTRING';
Expand Down
12 changes: 7 additions & 5 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,6 @@ dateTimeFunctionName
| NOW
| PERIOD_ADD
| PERIOD_DIFF
| RELATIVE_TIMESTAMP
| QUARTER
| SECOND
| SECOND_OF_MINUTE
Expand All @@ -780,6 +779,13 @@ dateTimeFunctionName
| WEEK_OF_YEAR
| YEAR
| YEARWEEK
| relativeTimeFunctionName
;

relativeTimeFunctionName
: RELATIVE_TIMESTAMP
| EARLIEST
| LATEST
;

getFormatFunction
Expand Down Expand Up @@ -1171,10 +1177,6 @@ keywordsCanBeId
| LAST
| LIST
| VALUES
| EARLIEST
| EARLIEST_TIME
| LATEST
| LATEST_TIME
| PER_DAY
| PER_HOUR
| PER_MINUTE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,10 @@ public enum BuiltinFunctionName {
LOCALTIMESTAMP(FunctionName.of("localtimestamp")),
SYSDATE(FunctionName.of("sysdate")),

// Relative timestamp functions
// Relative time functions
RELATIVE_TIMESTAMP(FunctionName.of("relative_timestamp")),
EARLIEST(FunctionName.of("earliest")),
LATEST(FunctionName.of("latest")),

/** Text Functions. */
TOSTRING(FunctionName.of("tostring")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,7 @@ public Instant apply(String relativeString, Object currentTimestamp, String zone
? ((Timestamp) currentTimestamp).toInstant()
: (Instant) currentTimestamp;

/// The Spark session time zone (`spark.sql.session.timeZone`)
/// is used, which may be different from the system time zone.
ZoneId zoneId = ZoneId.of(zoneIdString);

/// Relative time calculations are performed using [ZonedDateTime] because offsets (e.g. one hour ago)
/// need to account for changes in the time zone offset (e.g. daylight savings time), while snaps (e.g.
/// start of previous Wednesday) need to account for the local date time.
currantw marked this conversation as resolved.
Show resolved Hide resolved
ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(currentInstant, zoneId);
ZonedDateTime relativeDateTime = TimeUtils.getRelativeZonedDateTime(relativeString, currentDateTime);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.apache.spark.sql.catalyst.expressions.CurrentTimestamp$;
import org.apache.spark.sql.catalyst.expressions.DateAddInterval$;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual$;
import org.apache.spark.sql.catalyst.expressions.LessThanOrEqual$;
import org.apache.spark.sql.catalyst.expressions.Literal$;
import org.apache.spark.sql.catalyst.expressions.ScalaUDF;
import org.apache.spark.sql.catalyst.expressions.TimestampAdd$;
Expand All @@ -37,13 +39,15 @@
import static org.opensearch.sql.expression.function.BuiltinFunctionName.DATE_SUB;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.DAY_OF_MONTH;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.COALESCE;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.EARLIEST;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_ARRAY;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_ARRAY_LENGTH;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_EXTRACT;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_KEYS;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_OBJECT;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_VALID;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.LATEST;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.SUBTRACT;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTIPLY;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.DIVIDE;
Expand Down Expand Up @@ -174,9 +178,23 @@ public interface BuiltinFunctionTransformer {
args -> {
return ToUTCTimestamp$.MODULE$.apply(CurrentTimestamp$.MODULE$.apply(), CurrentTimeZone$.MODULE$.apply());
})

// Relative time functions
.put(
RELATIVE_TIMESTAMP,
BuiltinFunctionTransformer::buildRelativeTimestampExpression)
.put(
EARLIEST,
args ->
LessThanOrEqual$.MODULE$.apply(
buildRelativeTimestampExpression(List.of(args.get(0))),
args.get(1)))
currantw marked this conversation as resolved.
Show resolved Hide resolved
.put(
RELATIVE_TIMESTAMP,
args -> SerializableUdf.visit("relative_timestamp", List.of(args.get(0), CurrentTimestamp$.MODULE$.apply(), CurrentTimeZone$.MODULE$.apply())))
LATEST,
args ->
GreaterThanOrEqual$.MODULE$.apply(
buildRelativeTimestampExpression(List.of(args.get(0))),
args.get(1)))
.build();

static Expression builtinFunction(org.opensearch.sql.ast.expression.Function function, List<Expression> args) {
Expand Down Expand Up @@ -218,4 +236,10 @@ static Expression[] createIntervalArgs(IntervalUnit unit, Expression value) {
}
return args;
}

private static Expression buildRelativeTimestampExpression(List<Expression> args) {
return SerializableUdf.visit(
RELATIVE_TIMESTAMP.getName().getFunctionName(),
List.of(args.get(0), CurrentTimestamp$.MODULE$.apply(), CurrentTimeZone$.MODULE$.apply()));
}
}
Loading