diff --git a/docs/ppl-lang/PPL-Example-Commands.md b/docs/ppl-lang/PPL-Example-Commands.md index 5a61992de..23409798f 100644 --- a/docs/ppl-lang/PPL-Example-Commands.md +++ b/docs/ppl-lang/PPL-Example-Commands.md @@ -494,4 +494,9 @@ _- **Limitation: another command usage of (relation) subquery is in `appendcols` - `source = table | eval cdate = CAST('2012-08-07' as date), ctime = cast('2012-08-07T08:07:06' as timestamp) | fields cdate, ctime` - `source = table | eval chained_cast = cast(cast("true" as boolean) as integer) | fields chained_cast` +#### **relative_timestamp** +[See additional function details](functions/ppl-datetime#RELATIVE_TIMESTAMP) +- `source = table | eval one_hour_ago = relative_timestamp("-1h") | where timestamp < one_hour_ago` +- `source = table | eval start_of_today = relative_timestamp("@d") | where timestamp > start_of_today` +- `source = table | eval last_saturday = relative_timestamp("-1d@w6") | where timestamp >= last_saturday` --- diff --git a/docs/ppl-lang/functions/ppl-datetime.md b/docs/ppl-lang/functions/ppl-datetime.md index e479176a4..de345b581 100644 --- a/docs/ppl-lang/functions/ppl-datetime.md +++ b/docs/ppl-lang/functions/ppl-datetime.md @@ -733,6 +733,93 @@ Example: | 3 | +-------------------------------+ +### `RELATIVE_TIMESTAMP` + +**Description:** + + +**Usage:** relative_timestamp(str) returns a relative timestamp corresponding to the given relative string and the +current timestamp at the time of query execution. + +The relative timestamp string has syntax `[+|-]@`, and is +made up of two optional components. +* An offset from the current timestamp, which is composed of a sign (`+` or `-`), optional `offset_time_integer`, and + `offset_time_unit`. If the offset time integer is not specified, it defaults to `1`. For example, `+2hr` is two + hours after the current timestamp, while `-mon` is one month ago. +* A snap-to time using the `@` symbol followed by `snap_time_unit`. The snap-to time is applied after the offset (if + specified), and rounds the time down to the start of the specified time unit. For example, `@wk` is the start + of the current week (Sunday is considered to be the first day of the week). + +The special relative timestamp string `now`, corresponding to the current timestamp, is also supported. The current +timestamp is determined once at the start of query execution, and is used for all relative timestamp calculations for +that query. + +The relative timestamp string is case-insensitive. + +The following values are supported for `offset_time_unit`: + +| Time Unit | Supported Keywords | +|-----------|-------------------------------------------| +| Seconds | `s`, `sec`, `secs`, `second`, `seconds` | +| Minutes | `m`, `min`, `mins`, `minute`, `minutes` | +| Hours | `h`, `hr`, `hrs`, `hour`, `hours` | +| Days | `d`, `day`, `days` | +| Weeks | `w`, `wk`, `wks`, `week`, `weeks` | +| Quarters | `q`, `qtr`, `qtrs`, `quarter`, `quarters` | +| Years | `y`, `yr`, `yrs`, `year`, `years` | + +All the time units above are supported for `snap_time_unit`, as well as the following day-of-the-week time units: + +| Time Unit | Supported Keywords | +|-----------|--------------------| +| Sunday | `w0`, `w7` | +| Monday | `w1` | +| Tuesday | `w2` | +| Wednesday | `w3` | +| Thursday | `w4` | +| Friday | `w5` | +| Saturday | `w6` | + +For example, if the current timestamp is Monday, January 03, 2000 at 01:01:01 am: + +| Relative String | Description | Resulting Relative Time | +|-----------------|--------------------------------------------------------------|---------------------------------------------| +| `-60m` | Sixty minutes ago | Monday, January 03, 2000 at 00:01:01 am | +| `-1H` | One hour ago | Monday, January 03, 2000 at 00:01:01 am | +| `+2wk` | Two weeks from now | Monday, January 17, 2000 at 00:01:01 am | +| `-1h@W3` | One hour ago, rounded to the start of the previous Wednesday | Wednesday, December 29, 1999 at 00:00:00 am | +| `@d` | Start of the current day | Monday, January 03, 2000 at 00:00:00 am | +| `now` | Now | Monday, January 03, 2000 at 01:01:01 am | + +Argument type: STRING + +Return type: TIMESTAMP + +Example: + + os> source=people | eval seconds_diff = timestampdiff(SECOND, now(), relative_timestamp("now")) | fields seconds_diff | head 1 + fetched rows / total rows = 1/1 + +--------------+ + | seconds_diff | + |--------------+ + | 0 | + +--------------+ + + os> source=people | eval hours_diff = timestampdiff(HOUR, now(), relative_timestamp("+1h")) | fields hours_diff | head 1 + fetched rows / total rows = 1/1 + +------------+ + | hours_diff | + |------------+ + | 1 | + +------------+ + + os> source=people | eval day = day_of_week(relative_timestamp("@w0")) | fields day | head 1 + fetched rows / total rows = 1/1 + +-----+ + | day | + |-----| + | 1 | + +-----+ ### `SECOND` diff --git a/docs/ppl-lang/ppl-where-command.md b/docs/ppl-lang/ppl-where-command.md index ec676ab62..d6c68cdaf 100644 --- a/docs/ppl-lang/ppl-where-command.md +++ b/docs/ppl-lang/ppl-where-command.md @@ -61,3 +61,4 @@ PPL query: | eval factor = case(a > 15, a - 14, isnull(b), a - 7, a < 3, a + 1 else 1) | where case(factor = 2, 'even', factor = 4, 'even', factor = 6, 'even', factor = 8, 'even' else 'odd') = 'even' | stats count() by factor` +- `source = table | where timestamp >= relative_timestamp("-1d@w6")` diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBuiltInDateTimeFunctionITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBuiltInDateTimeFunctionITSuite.scala index 8001a690d..f71bce90c 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBuiltInDateTimeFunctionITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBuiltInDateTimeFunctionITSuite.scala @@ -368,6 +368,70 @@ class FlintSparkPPLBuiltInDateTimeFunctionITSuite assertSameRows(Seq(Row(3)), frame) } + test("test RELATIVE_TIMESTAMP") { + var frame = sql(s""" + | source=$testTable + | | eval seconds_diff = timestampdiff(SECOND, now(), relative_timestamp("now")) + | | fields seconds_diff + | | head 1 + | """.stripMargin) + assertSameRows(Seq(Row(0)), frame) + + frame = sql(s""" + | source=$testTable + | | eval hours_diff = timestampdiff(HOUR, relative_timestamp("+1h"), relative_timestamp("+1d")) + | | fields hours_diff + | | head 1 + | """.stripMargin) + assertSameRows(Seq(Row(23)), frame) + + frame = sql(s""" + | source =$testTable + | | eval day = day_of_week(relative_timestamp("@w0")) + | | fields day + | | head 1 + | """.stripMargin) + assertSameRows(Seq(Row(1)), frame) + + frame = sql(s""" + | source=$testTable + | | eval last_wednesday = relative_timestamp("-1d@w3") + | | eval actual_days_ago = timestampdiff(DAY, last_wednesday, now() ) + | | eval day_of_week = day_of_week(now()) + | | eval expected_days_ago = case(day_of_week >= 4, day_of_week - 4 else day_of_week + 3) + | | eval test_result = (expected_weeks_ago = actual_weeks_ago) + | | fields test_result + | | head 1 + | """.stripMargin) + assertSameRows(Seq(Row(1)), frame) + } + + // TODO #957: Support earliest + ignore("test EARLIEST") { + var frame = sql(s""" + | source=$testTable + | | eval earliest_hour_before = earliest(now(), "-1h") + | | eval earliest_now = earliest(now(), "now") + | | eval earliest_hour_after = earliest(now(), "+1h") + | | fields earliest_hour_before, earliest_now, earliest_hour_after + | | head 1 + | """.stripMargin) + assertSameRows(Seq(Row(true), Row(true), Row(false)), frame) + } + + // TODO #957: Support latest + ignore("test LATEST") { + var frame = sql(s""" + | source=$testTable + | | eval latest_hour_before = latest(now(), "-1h") + | | eval latest_now = latest(now(), "now") + | | eval latest_hour_after = latest(now(), "+1h") + | | fields latest_hour_before, latest_now, latest_hour_after + | | head 1 + | """.stripMargin) + assertSameRows(Seq(Row(false), Row(true), Row(true)), frame) + } + test("test CURRENT_TIME is not supported") { val ex = intercept[UnsupportedOperationException](sql(s""" | source = $testTable diff --git a/ppl-spark-integration/README.md b/ppl-spark-integration/README.md index 73c526868..8265f19d8 100644 --- a/ppl-spark-integration/README.md +++ b/ppl-spark-integration/README.md @@ -6,7 +6,7 @@ translation between PPL's logical plan to Spark's Catalyst logical plan. ### Context The next concepts are the main purpose of introduction this functionality: - Transforming PPL to become OpenSearch default query language (specifically for logs/traces/metrics signals) -- Promoting PPL as a viable candidate for the proposed CNCF Observability universal query language. +- Promoting PPL as a viable candidate for the proposed CNCF Observability universal query language. - Seamlessly Interact with different datasources such as S3 / Prometheus / data-lake leveraging spark execution. - Using spark's federative capabilities as a general purpose query engine to facilitate complex queries including joins - Improve and promote PPL to become extensible and general purpose query language to be adopted by the community @@ -37,7 +37,7 @@ In Apache Spark, the DataFrame API serves as a programmatic interface for data m For instance, if you have a PPL query and a translator, you can convert it into DataFrame operations to generate an optimized execution plan. Spark's underlying Catalyst optimizer will convert these DataFrame transformations and actions into an optimized physical plan executed over RDDs or Datasets. -The following section describes the two main options for translating the PPL query (using the logical plan) into the spark corespondent component (either dataframe API or spark logical plan) +The following section describes the two main options for translating the PPL query (using the logical plan) into the spark correspondent component (either dataframe API or spark logical plan) ### Translation Process diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 index a6ab4f1de..0e8568a97 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 @@ -334,6 +334,7 @@ MONTHNAME: 'MONTHNAME'; NOW: 'NOW'; PERIOD_ADD: 'PERIOD_ADD'; PERIOD_DIFF: 'PERIOD_DIFF'; +RELATIVE_TIMESTAMP: 'RELATIVE_TIMESTAMP'; SEC_TO_TIME: 'SEC_TO_TIME'; STR_TO_DATE: 'STR_TO_DATE'; SUBDATE: 'SUBDATE'; diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 index 0a2cdf1a0..1f09cdc24 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 @@ -747,6 +747,7 @@ dateTimeFunctionName | NOW | PERIOD_ADD | PERIOD_DIFF + | RELATIVE_TIMESTAMP | QUARTER | SECOND | SECOND_OF_MINUTE diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/DataType.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/DataType.java index 6f0de02f5..59ba4e289 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/DataType.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/DataType.java @@ -9,7 +9,7 @@ import lombok.RequiredArgsConstructor; import org.opensearch.sql.data.type.ExprCoreType; -/** The DataType defintion in AST. Question, could we use {@link ExprCoreType} directly in AST? */ +/** The DataType definition in AST. Question, could we use {@link ExprCoreType} directly in AST? */ @RequiredArgsConstructor public enum DataType { diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java index 86970cefb..411a9c5ea 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java @@ -133,6 +133,9 @@ public enum BuiltinFunctionName { LOCALTIMESTAMP(FunctionName.of("localtimestamp")), SYSDATE(FunctionName.of("sysdate")), + // Relative timestamp functions + RELATIVE_TIMESTAMP(FunctionName.of("relative_timestamp")), + /** Text Functions. */ TOSTRING(FunctionName.of("tostring")), diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java index e931175ff..e62e51e79 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java @@ -13,16 +13,23 @@ import org.apache.spark.sql.types.DataTypes; import scala.Function1; import scala.Function2; +import scala.Function3; import scala.Option; import scala.Serializable; import scala.runtime.AbstractFunction1; import scala.runtime.AbstractFunction2; +import scala.runtime.AbstractFunction3; import scala.collection.JavaConverters; import scala.collection.mutable.WrappedArray; +import java.lang.Boolean; import java.math.BigInteger; import java.net.InetAddress; import java.net.UnknownHostException; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; import java.util.Collection; import java.util.List; import java.util.Map; @@ -35,11 +42,18 @@ public interface SerializableUdf { + abstract class SerializableAbstractFunction1 extends AbstractFunction1 + implements Serializable { + } abstract class SerializableAbstractFunction2 extends AbstractFunction2 implements Serializable { } + abstract class SerializableAbstractFunction3 extends AbstractFunction3 + implements Serializable { + } + /** * Remove specified keys from a JSON string. * @@ -109,7 +123,7 @@ public String apply(String jsonStr, WrappedArray elements) { } } }; - + Function2 cidrFunction = new SerializableAbstractFunction2<>() { IPAddressStringParameters valOptions = new IPAddressStringParameters.Builder() @@ -197,9 +211,18 @@ public BigInteger apply(String ipAddress) { }; } - abstract class SerializableAbstractFunction1 extends AbstractFunction1 - implements Serializable { - } + /** + * Returns the {@link Timestamp} corresponding to the given relative string and current timestamp. + * Throws {@link RuntimeException} if the relative string is not supported. + */ + Function3 relativeTimestampFunction = new SerializableAbstractFunction3() { + @Override + public Timestamp apply(String relativeString, Instant currentInstant, String zoneId) { + LocalDateTime currentLocalDateTime = LocalDateTime.ofInstant(currentInstant, ZoneId.of(zoneId)); + LocalDateTime relativeLocalDateTime = TimeUtils.getRelativeLocalDateTime(relativeString, currentLocalDateTime); + return Timestamp.valueOf(relativeLocalDateTime); + } + }; /** * Get the function reference according to its name @@ -254,6 +277,15 @@ static ScalaUDF visit(String funcName, List expressions) { Option.apply("ip_to_int"), false, true); + case "relative_timestamp": + return new ScalaUDF(relativeTimestampFunction, + DataTypes.TimestampType, + seq(expressions), + seq(), + Option.empty(), + Option.apply("relative_timestamp"), + false, + true); default: return null; } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/TimeUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/TimeUtils.java new file mode 100644 index 000000000..da399a56c --- /dev/null +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/TimeUtils.java @@ -0,0 +1,332 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.expression.function; + +import com.google.common.collect.ImmutableMap; +import lombok.experimental.UtilityClass; + +import java.time.DayOfWeek; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.Month; +import java.time.Period; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@UtilityClass +public class TimeUtils { + + private static final String NOW = "now"; + private static final String NEGATIVE_SIGN = "-"; + + // Pattern for relative string. + private static final String OFFSET_PATTERN_STRING = "(?[+-])(?\\d+)?(?\\w+)"; + private static final String SNAP_PATTERN_STRING = "[@](?\\w+)"; + + private static final Pattern RELATIVE_PATTERN = Pattern.compile(String.format( + "(?%s)?(?%s)?", OFFSET_PATTERN_STRING, SNAP_PATTERN_STRING), + Pattern.CASE_INSENSITIVE); + + // Supported time units. + private static final Set SECOND_UNITS_SET = Set.of("s", "sec", "secs", "second", "seconds"); + private static final Set MINUTE_UNITS_SET = Set.of("m", "min", "mins", "minute", "minutes"); + private static final Set HOUR_UNITS_SET = Set.of("h", "hr", "hrs", "hour", "hours"); + private static final Set DAY_UNITS_SET = Set.of("d", "day", "days"); + private static final Set WEEK_UNITS_SET = Set.of("w", "wk", "wks", "week", "weeks"); + private static final Set MONTH_UNITS_SET = Set.of("mon", "month", "months"); + private static final Set QUARTER_UNITS_SET = Set.of("q", "qtr", "qtrs", "quarter", "quarters"); + private static final Set YEAR_UNITS_SET = Set.of("y", "yr", "yrs", "year", "years"); + + // Map from time unit to the corresponding duration. + private static final Map DURATION_FOR_TIME_UNIT_MAP; + + static { + Map durationMap = new HashMap<>(); + SECOND_UNITS_SET.forEach(u -> durationMap.put(u, Duration.ofSeconds(1))); + MINUTE_UNITS_SET.forEach(u -> durationMap.put(u, Duration.ofMinutes(1))); + HOUR_UNITS_SET.forEach(u -> durationMap.put(u, Duration.ofHours(1))); + DURATION_FOR_TIME_UNIT_MAP = ImmutableMap.copyOf(durationMap); + } + + // Map from time unit to the corresponding period. + private static final Map PERIOD_FOR_TIME_UNIT_MAP; + + static { + Map periodMap = new HashMap<>(); + DAY_UNITS_SET.forEach(u -> periodMap.put(u, Period.ofDays(1))); + WEEK_UNITS_SET.forEach(u -> periodMap.put(u, Period.ofWeeks(1))); + MONTH_UNITS_SET.forEach(u -> periodMap.put(u, Period.ofMonths(1))); + QUARTER_UNITS_SET.forEach(u -> periodMap.put(u, Period.ofMonths(3))); + YEAR_UNITS_SET.forEach(u -> periodMap.put(u, Period.ofYears(1))); + PERIOD_FOR_TIME_UNIT_MAP = ImmutableMap.copyOf(periodMap); + } + + // Map from snap unit to the corresponding day of the week. + private static final Map DAY_OF_THE_WEEK_FOR_SNAP_UNIT_MAP = Map.ofEntries( + Map.entry("w0", DayOfWeek.SUNDAY), + Map.entry("w7", DayOfWeek.SUNDAY), + Map.entry("w1", DayOfWeek.MONDAY), + Map.entry("w2", DayOfWeek.TUESDAY), + Map.entry("w3", DayOfWeek.WEDNESDAY), + Map.entry("w4", DayOfWeek.THURSDAY), + Map.entry("w5", DayOfWeek.FRIDAY), + Map.entry("w6", DayOfWeek.SATURDAY)); + + /** + * Returns the relative {@link LocalDateTime} corresponding to the given relative string and local date time. + *

+ * The relative time string has syntax {@code [+|-]@}, and + * is made up of two optional components: + *

    + *
  • + * An offset from the current timestamp at the start of query execution, which is composed of a sign + * ({@code +} or {@code -}), an optional time integer, and a time unit. If the time integer is not specified, + * it defaults to one. For example, {@code +2hr} corresponds to two hours after the current timestamp, while + * {@code -mon} corresponds to one month ago. + *
  • + *
  • + * A snap-to time using the {@code @} symbol followed by a time unit. The snap-to time is applied after the + * offset (if specified), and rounds the time down to the start of the specified time unit (i.e. + * backwards in time). For example, {@code @wk} corresponds to the start of the current week (Sunday is + * considered to be the first day of the week). + *
  • + *
+ *

+ * The following offset time units are supported: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
Time UnitSupported Keywords
Seconds{@code s}, {@code sec}, {@code secs}, {@code second}, {@code seconds}
Minutes{@code m}, {@code min}, {@code mins}, {@code minute}, {@code minutes}
Hours{@code h}, {@code hr}, {@code hrs}, {@code hour}, {@code hours}
Days{@code d}, {@code day}, {@code days}
Weeks{@code w}, {@code wk}, {@code wks}, {@code week}, {@code weeks}
Quarters{@code q}, {@code qtr}, {@code qtrs}, {@code quarter}, {@code quarters}
Years{@code y}, {@code yr}, {@code yrs}, {@code year}, {@code years}
+ *

+ * The snap-to time supports all the time units above, as well as the following day of the week time units: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
Time UnitSupported Keywords
Sunday{@code w0}, {@code w7}
Monday{@code w1}
Tuesday{@code w2}
Wednesday{@code w3}
Thursday{@code w4}
Friday{@code w5}
Saturday{@code w6}
+ *

+ * The special relative time string {@code now} for the current timestamp is also supported. + *

+ * For example, if the current timestamp is Monday, January 03, 2000 at 01:01:01 am: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
Relative StringDescriptionResulting Relative Time
{@code -60m}Sixty minutes agoMonday, January 03, 2000 at 00:01:01 am
{@code -h}One hour agoMonday, January 03, 2000 at 00:01:01 am
{@code +2wk}Two weeks from nowMonday, January 17, 2000 at 00:01:01 am
{@code -1h@w3}One hour ago, rounded to the start of the previous WednesdayWednesday, December 29, 1999 at 00:00:00 am
{@code @d}Start of the current dayMonday, January 03, 2000 at 00:00:00 am
{@code now}NowMonday, January 03, 2000 at 01:01:01 am
+ */ + public static LocalDateTime getRelativeLocalDateTime(String relativeString, LocalDateTime localDateTime) { + + LocalDateTime relativeLocalDateTime = localDateTime; + + if (relativeString.equalsIgnoreCase(NOW)) { + return localDateTime; + } + + Matcher matcher = RELATIVE_PATTERN.matcher(relativeString); + if (!matcher.matches()) { + String message = String.format("The relative date time '%s' is not supported.", relativeString); + throw new IllegalArgumentException(message); + } + + + if (matcher.group("offset") != null) { + relativeLocalDateTime = applyOffset( + relativeLocalDateTime, + matcher.group("offsetSign"), + matcher.group("offsetValue"), + matcher.group("offsetUnit")); + } + + if (matcher.group("snap") != null) { + relativeLocalDateTime = applySnap( + relativeLocalDateTime, + matcher.group("snapUnit")); + } + + return relativeLocalDateTime; + } + + /** + * Applies the offset specified by the offset sign, value, + * and unit to the given local date time, and returns the result. + */ + private LocalDateTime applyOffset(LocalDateTime localDateTime, String offsetSign, String offsetValue, String offsetUnit) { + + int offsetValueInt = Optional.ofNullable(offsetValue).map(Integer::parseInt).orElse(1); + if (offsetSign.equals(NEGATIVE_SIGN)) { + offsetValueInt *= -1; + } + + /* {@link Duration} and {@link Period} must be handled separately because, even + though they both inherit from {@link java.time.temporal.TemporalAmount}, they + define separate 'multipliedBy' methods. */ + + // Convert to lower case to make case-insensitive. + String offsetUnitLowerCase = offsetUnit.toLowerCase(); + + if (DURATION_FOR_TIME_UNIT_MAP.containsKey(offsetUnitLowerCase)) { + Duration offsetDuration = DURATION_FOR_TIME_UNIT_MAP.get(offsetUnitLowerCase).multipliedBy(offsetValueInt); + return localDateTime.plus(offsetDuration); + } + + if (PERIOD_FOR_TIME_UNIT_MAP.containsKey(offsetUnitLowerCase)) { + Period offsetPeriod = PERIOD_FOR_TIME_UNIT_MAP.get(offsetUnitLowerCase).multipliedBy(offsetValueInt); + return localDateTime.plus(offsetPeriod); + } + + String message = String.format("The relative date time unit '%s' is not supported.", offsetUnit); + throw new IllegalArgumentException(message); + } + + /** + * Snaps the given local date time to the start of the previous time + * period specified by the given snap unit, and returns the result. + */ + private LocalDateTime applySnap(LocalDateTime localDateTime, String snapUnit) { + + // Convert to lower case to make case-insensitive. + String snapUnitLowerCase = snapUnit.toLowerCase(); + + if (SECOND_UNITS_SET.contains(snapUnitLowerCase)) { + return localDateTime.truncatedTo(ChronoUnit.SECONDS); + } else if (MINUTE_UNITS_SET.contains(snapUnitLowerCase)) { + return localDateTime.truncatedTo(ChronoUnit.MINUTES); + } else if (HOUR_UNITS_SET.contains(snapUnitLowerCase)) { + return localDateTime.truncatedTo(ChronoUnit.HOURS); + } else if (DAY_UNITS_SET.contains(snapUnitLowerCase)) { + return localDateTime.truncatedTo(ChronoUnit.DAYS); + } else if (WEEK_UNITS_SET.contains(snapUnitLowerCase)) { + return applySnapToDayOfWeek(localDateTime, DayOfWeek.SUNDAY); + } else if (MONTH_UNITS_SET.contains(snapUnitLowerCase)) { + return localDateTime.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1); + } else if (QUARTER_UNITS_SET.contains(snapUnitLowerCase)) { + Month snapMonth = localDateTime.getMonth().firstMonthOfQuarter(); + return localDateTime.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1).withMonth(snapMonth.getValue()); + } else if (YEAR_UNITS_SET.contains(snapUnitLowerCase)) { + return localDateTime.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1); + } else if (DAY_OF_THE_WEEK_FOR_SNAP_UNIT_MAP.containsKey(snapUnitLowerCase)) { + return applySnapToDayOfWeek(localDateTime, DAY_OF_THE_WEEK_FOR_SNAP_UNIT_MAP.get(snapUnitLowerCase)); + } + + String message = String.format("The relative date time unit '%s' is not supported.", snapUnit); + throw new IllegalArgumentException(message); + } + + /** + * Snaps the given date time to the start of the previous + * specified day of the week, and returns the result. + */ + private LocalDateTime applySnapToDayOfWeek(LocalDateTime dateTime, DayOfWeek snapDayOfWeek) { + LocalDateTime snappedDateTime = dateTime.truncatedTo(ChronoUnit.DAYS); + + int daysToSnap = dateTime.getDayOfWeek().getValue() - snapDayOfWeek.getValue(); + if (daysToSnap < 0) daysToSnap += DayOfWeek.values().length; + + return snappedDateTime.minusDays(daysToSnap); + } +} diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/BuiltinFunctionTransformer.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/BuiltinFunctionTransformer.java index f73a1c491..01987757f 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/BuiltinFunctionTransformer.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/BuiltinFunctionTransformer.java @@ -21,7 +21,6 @@ import org.opensearch.sql.ast.expression.IntervalUnit; import org.opensearch.sql.expression.function.BuiltinFunctionName; import org.opensearch.sql.expression.function.SerializableUdf; -import org.opensearch.sql.ppl.CatalystPlanContext; import scala.Option; import java.util.Arrays; @@ -58,6 +57,7 @@ import static org.opensearch.sql.expression.function.BuiltinFunctionName.LOCALTIME; import static org.opensearch.sql.expression.function.BuiltinFunctionName.MINUTE_OF_HOUR; import static org.opensearch.sql.expression.function.BuiltinFunctionName.MONTH_OF_YEAR; +import static org.opensearch.sql.expression.function.BuiltinFunctionName.RELATIVE_TIMESTAMP; import static org.opensearch.sql.expression.function.BuiltinFunctionName.SECOND_OF_MINUTE; import static org.opensearch.sql.expression.function.BuiltinFunctionName.SUBDATE; import static org.opensearch.sql.expression.function.BuiltinFunctionName.SYSDATE; @@ -174,6 +174,9 @@ public interface BuiltinFunctionTransformer { args -> { return ToUTCTimestamp$.MODULE$.apply(CurrentTimestamp$.MODULE$.apply(), CurrentTimeZone$.MODULE$.apply()); }) + .put( + RELATIVE_TIMESTAMP, + args -> SerializableUdf.visit("relative_timestamp", List.of(args.get(0), CurrentTimestamp$.MODULE$.apply(), CurrentTimeZone$.MODULE$.apply()))) .build(); static Expression builtinFunction(org.opensearch.sql.ast.expression.Function function, List args) { @@ -182,7 +185,7 @@ static Expression builtinFunction(org.opensearch.sql.ast.expression.Function fun if(udf == null) { throw new UnsupportedOperationException(function.getFuncName() + " is not a builtin function of PPL"); } - return udf; + return udf; } else { BuiltinFunctionName builtin = BuiltinFunctionName.of(function.getFuncName()).get(); String name = SPARK_BUILTIN_FUNCTION_NAME_MAPPING.get(builtin); diff --git a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableTimeUdfTest.java b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableTimeUdfTest.java new file mode 100644 index 000000000..566990a59 --- /dev/null +++ b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableTimeUdfTest.java @@ -0,0 +1,61 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.expression.function; + +import org.junit.Test; + +import java.sql.Timestamp; +import java.time.Instant; +import java.time.ZoneId; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.opensearch.sql.expression.function.SerializableUdf.relativeTimestampFunction; + +public class SerializableTimeUdfTest { + + // Monday, Jan 03, 2000 @ 01:01:01.100 + private final Instant MOCK_INSTANT = Instant.parse("2000-01-03T01:01:01.100Z"); + private final ZoneId MOCK_ZONE_ID = ZoneId.of("UTC"); + + @Test + public void relativeTimestampTest() { + + /* These are only basic tests of the relative date time functionality. + For more comprehensive tests, see {@link TimeUtilsTest}. + */ + + testValid("-60m", "2000-01-03 00:01:01.100"); + testValid("-H", "2000-01-03 00:01:01.100"); + testValid("+2wk", "2000-01-17 01:01:01.100"); + testValid("-1h@W3", "1999-12-29 00:00:00"); + testValid("@d", "2000-01-03 00:00:00"); + testValid("now", "2000-01-03 01:01:01.100"); + + testInvalid("invalid", "The relative date time 'invalid' is not supported."); + testInvalid("INVALID", "The relative date time 'INVALID' is not supported."); + testInvalid("~h", "The relative date time '~h' is not supported."); + testInvalid("+1.1h", "The relative date time '+1.1h' is not supported."); + testInvalid("+ms", "The relative date time unit 'ms' is not supported."); + testInvalid("+1INVALID", "The relative date time unit 'INVALID' is not supported."); + testInvalid("@INVALID", "The relative date time unit 'INVALID' is not supported."); + testInvalid("@ms", "The relative date time unit 'ms' is not supported."); + testInvalid("@w8", "The relative date time unit 'w8' is not supported."); + } + + private void testValid(String relativeString, String expectedTimestampString) { + String testMessage = String.format("\"%s\"", relativeString); + Timestamp expectedTimestamp = Timestamp.valueOf(expectedTimestampString); + Timestamp actualTimestamp = relativeTimestampFunction.apply(relativeString, MOCK_INSTANT, MOCK_ZONE_ID.toString()); + assertEquals(testMessage, expectedTimestamp, actualTimestamp); + } + + private void testInvalid(String relativeString, String expectedExceptionMessage) { + String testMessage = String.format("\"%s\"", relativeString); + String actualExceptionMessage = assertThrows(testMessage, RuntimeException.class, () -> relativeTimestampFunction.apply(relativeString, MOCK_INSTANT, MOCK_ZONE_ID.toString())).getMessage(); + assertEquals(expectedExceptionMessage, actualExceptionMessage); + } +} diff --git a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/TimeUtilsTest.java b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/TimeUtilsTest.java new file mode 100644 index 000000000..88101517c --- /dev/null +++ b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/TimeUtilsTest.java @@ -0,0 +1,185 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.expression.function; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import java.time.LocalDateTime; + +import org.junit.Test; + +public class TimeUtilsTest { + + // Monday, Jan 03, 2000 @ 01:01:01.100 + private final LocalDateTime MOCK_DATETIME = LocalDateTime.parse("2000-01-03T01:01:01.100"); + + @Test + public void testRelative() { + testValid("-60m", "2000-01-03T00:01:01.100"); + testValid("-H", "2000-01-03T00:01:01.100"); + testValid("+2wk", "2000-01-17T01:01:01.100"); + testValid("-1h@W3", "1999-12-29T00:00:00"); + testValid("@d", "2000-01-03T00:00"); + testValid("now", "2000-01-03T01:01:01.100"); + + testInvalid("invalid", "The relative date time 'invalid' is not supported."); + } + + @Test + public void testRelativeCaseInsensitive() { + testValid("NOW", "2000-01-03T01:01:01.100"); + testValid("-60M", "2000-01-03T00:01:01.100"); + testValid("-H", "2000-01-03T00:01:01.100"); + testValid("+2WK", "2000-01-17T01:01:01.100"); + testValid("-1H@H", "2000-01-03T00:00"); + testValid("@D", "2000-01-03T00:00"); + + testInvalid("INVALID", "The relative date time 'INVALID' is not supported."); + } + + @Test + public void testRelativeOffsetSign() { + testValid("+h", "2000-01-03T02:01:01.100"); + testValid("-h", "2000-01-03T00:01:01.100"); + + testInvalid("~h", "The relative date time '~h' is not supported."); + } + + @Test + public void testRelativeOffsetValue() { + testValid("+h", "2000-01-03T02:01:01.100"); + testValid("+0h", "2000-01-03T01:01:01.100"); + testValid("+1h", "2000-01-03T02:01:01.100"); + testValid("+12h", "2000-01-03T13:01:01.100"); + + testInvalid("+1.1h", "The relative date time '+1.1h' is not supported."); + } + + @Test + public void testRelativeOffsetUnit() { + testValid("+s", "2000-01-03T01:01:02.1"); + testValid("+sec", "2000-01-03T01:01:02.1"); + testValid("+secs", "2000-01-03T01:01:02.1"); + testValid("+second", "2000-01-03T01:01:02.1"); + testValid("+seconds", "2000-01-03T01:01:02.1"); + + testValid("+m", "2000-01-03T01:02:01.100"); + testValid("+min", "2000-01-03T01:02:01.100"); + testValid("+mins", "2000-01-03T01:02:01.100"); + testValid("+minute", "2000-01-03T01:02:01.100"); + testValid("+minutes", "2000-01-03T01:02:01.100"); + + testValid("+h", "2000-01-03T02:01:01.100"); + testValid("+hr", "2000-01-03T02:01:01.100"); + testValid("+hrs", "2000-01-03T02:01:01.100"); + testValid("+hour", "2000-01-03T02:01:01.100"); + testValid("+hours", "2000-01-03T02:01:01.100"); + + testValid("+d", "2000-01-04T01:01:01.100"); + testValid("+day", "2000-01-04T01:01:01.100"); + testValid("+days", "2000-01-04T01:01:01.100"); + + testValid("+w", "2000-01-10T01:01:01.100"); + testValid("+wk", "2000-01-10T01:01:01.100"); + testValid("+wks", "2000-01-10T01:01:01.100"); + testValid("+week", "2000-01-10T01:01:01.100"); + testValid("+weeks", "2000-01-10T01:01:01.100"); + + testValid("+mon", "2000-02-03T01:01:01.100"); + testValid("+month", "2000-02-03T01:01:01.100"); + testValid("+months", "2000-02-03T01:01:01.100"); + + testValid("+q", "2000-04-03T01:01:01.100"); + testValid("+qtr", "2000-04-03T01:01:01.100"); + testValid("+qtrs", "2000-04-03T01:01:01.100"); + testValid("+quarter", "2000-04-03T01:01:01.100"); + testValid("+quarters", "2000-04-03T01:01:01.100"); + + testValid("+y", "2001-01-03T01:01:01.100"); + testValid("+yr", "2001-01-03T01:01:01.100"); + testValid("+yrs", "2001-01-03T01:01:01.100"); + testValid("+year", "2001-01-03T01:01:01.100"); + testValid("+years", "2001-01-03T01:01:01.100"); + + testInvalid("+ms", "The relative date time unit 'ms' is not supported."); + testInvalid("+1INVALID", "The relative date time unit 'INVALID' is not supported."); + } + + @Test + public void testRelativeSnap() { + testValid("@s", "2000-01-03T01:01:01"); + testValid("@sec", "2000-01-03T01:01:01"); + testValid("@secs", "2000-01-03T01:01:01"); + testValid("@second", "2000-01-03T01:01:01"); + testValid("@seconds", "2000-01-03T01:01:01"); + + testValid("@m", "2000-01-03T01:01"); + testValid("@min", "2000-01-03T01:01"); + testValid("@mins", "2000-01-03T01:01"); + testValid("@minute", "2000-01-03T01:01"); + testValid("@minutes", "2000-01-03T01:01"); + + testValid("@h", "2000-01-03T01:00"); + testValid("@hr", "2000-01-03T01:00"); + testValid("@hrs", "2000-01-03T01:00"); + testValid("@hour", "2000-01-03T01:00"); + testValid("@hours", "2000-01-03T01:00"); + + testValid("@d", "2000-01-03T00:00"); + testValid("@day", "2000-01-03T00:00"); + testValid("@days", "2000-01-03T00:00"); + + testValid("@w", "2000-01-02T00:00"); + testValid("@wk", "2000-01-02T00:00"); + testValid("@wks", "2000-01-02T00:00"); + testValid("@week", "2000-01-02T00:00"); + testValid("@weeks", "2000-01-02T00:00"); + + testValid("@mon", "2000-01-01T00:00"); + testValid("@month", "2000-01-01T00:00"); + testValid("@months", "2000-01-01T00:00"); + + testValid("@q", "2000-01-01T00:00"); + testValid("@qtr", "2000-01-01T00:00"); + testValid("@qtrs", "2000-01-01T00:00"); + testValid("@quarter", "2000-01-01T00:00"); + testValid("@quarters", "2000-01-01T00:00"); + + testValid("@y", "2000-01-01T00:00"); + testValid("@yr", "2000-01-01T00:00"); + testValid("@yrs", "2000-01-01T00:00"); + testValid("@year", "2000-01-01T00:00"); + testValid("@years", "2000-01-01T00:00"); + + testValid("@w0", "2000-01-02T00:00"); + testValid("@w1", "2000-01-03T00:00"); + testValid("@w2", "1999-12-28T00:00"); + testValid("@w3", "1999-12-29T00:00"); + testValid("@w4", "1999-12-30T00:00"); + testValid("@w5", "1999-12-31T00:00"); + testValid("@w6", "2000-01-01T00:00"); + testValid("@w7", "2000-01-02T00:00"); + + testInvalid("@INVALID", "The relative date time unit 'INVALID' is not supported."); + testInvalid("@ms", "The relative date time unit 'ms' is not supported."); + testInvalid("@w8", "The relative date time unit 'w8' is not supported."); + } + + private void testValid(String relativeDateTimeString, String expectedDateTimeString) { + String testMessage = String.format("\"%s\"", relativeDateTimeString); + LocalDateTime expectedDateTime = LocalDateTime.parse(expectedDateTimeString); + LocalDateTime actualDateTime = TimeUtils.getRelativeLocalDateTime(relativeDateTimeString, MOCK_DATETIME); + assertEquals(testMessage, expectedDateTime, actualDateTime); + } + + private void testInvalid(String relativeDateTimeString, String expectedExceptionMessage) { + String testMessage = String.format("\"%s\"", relativeDateTimeString); + String actualExceptionMessage = assertThrows(testMessage, RuntimeException.class, + () -> TimeUtils.getRelativeLocalDateTime(relativeDateTimeString, MOCK_DATETIME)).getMessage(); + assertEquals(expectedExceptionMessage, actualExceptionMessage); + } +}