Skip to content

Commit

Permalink
[INLONG-9476][Sort] Add custom function for data time transform (#9483)
Browse files Browse the repository at this point in the history
* [InLong-9476][Sort] Add custom function for data time transform

* [InLong-9476][Sort] Add custom function for data time transform

* change return type to string

* fix comment

* fix standalone ut test

* make zoneid default

* make zoneid default

* add support for different format and test for it

* format check

* make formatters private
  • Loading branch information
EMsnap authored Dec 18, 2023
1 parent 7fe1edd commit 850b922
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ public void testStandaloneCluster() {

ClusterInfo info = clusterService.get(id, GLOBAL_OPERATOR);
Assertions.assertInstanceOf(SortClsClusterInfo.class, info);
this.deleteCluster(id);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.function;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

/**
* Round timestamp and output formatted timestamp.
*/
public class RoundTimestampFunction extends ScalarFunction {

private static final long serialVersionUID = 1L;

public static final Logger LOG = LoggerFactory.getLogger(RoundTimestampFunction.class);
public static final ZoneId DEFAULT_ZONE = ZoneId.systemDefault();
private transient Map<String, DateTimeFormatter> formatters;

public void open(FunctionContext context) throws Exception {
super.open(context);
formatters = new HashMap<>();
}

/**
* Round timestamp and output formatted timestamp.
* For example, if the input timestamp is 1702610371(s), the roundTime is 600(s), and the format is "yyyyMMddHHmm",
* the formatted timestamp is "2023121510".
*
* @param timestamp The input timestamp in seconds.
* @param roundTime The round time in seconds.
* @param format The format of the output timestamp.
* @return The formatted timestamp.
*/
public String eval(Long timestamp, Long roundTime, String format) {
try {
LocalDateTime dateTime = LocalDateTime.ofInstant(
Instant.ofEpochSecond(timestamp - timestamp % roundTime),
DEFAULT_ZONE);
DateTimeFormatter formatter = formatters.get(format);
if (formatter == null) {
formatter = DateTimeFormatter.ofPattern(format);
formatters.put(format, formatter);
}
return dateTime.format(formatter);
} catch (Exception e) {
LOG.error("get formatted timestamp error, timestamp: {}, roundTime: {},format: {}",
timestamp, roundTime, format, e);
return null;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.inlong.sort.function.JsonGetterFunction;
import org.apache.inlong.sort.function.RegexpReplaceFirstFunction;
import org.apache.inlong.sort.function.RegexpReplaceFunction;
import org.apache.inlong.sort.function.RoundTimestampFunction;
import org.apache.inlong.sort.parser.Parser;
import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
import org.apache.inlong.sort.parser.result.ParseResult;
Expand Down Expand Up @@ -125,6 +126,7 @@ private void registerUDF() {
tableEnv.createTemporarySystemFunction("ENCRYPT", EncryptFunction.class);
tableEnv.createTemporarySystemFunction("JSON_GETTER", JsonGetterFunction.class);
tableEnv.createTemporarySystemFunction(DEFAULT_EMBEDDING_FUNCTION_NAME, EmbeddingFunction.class);
tableEnv.createTemporarySystemFunction("ROUND_TIMESTAMP", RoundTimestampFunction.class);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.inlong.sort.function.JsonGetterFunction;
import org.apache.inlong.sort.function.RegexpReplaceFirstFunction;
import org.apache.inlong.sort.function.RegexpReplaceFunction;
import org.apache.inlong.sort.function.RoundTimestampFunction;
import org.apache.inlong.sort.parser.Parser;
import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
import org.apache.inlong.sort.parser.result.ParseResult;
Expand Down Expand Up @@ -74,6 +75,7 @@ private void registerUDF() {
tableEnv.createTemporarySystemFunction("ENCRYPT", EncryptFunction.class);
tableEnv.createTemporarySystemFunction("JSON_GETTER", JsonGetterFunction.class);
tableEnv.createTemporarySystemFunction(DEFAULT_EMBEDDING_FUNCTION_NAME, EmbeddingFunction.class);
tableEnv.createTemporarySystemFunction("ROUND_TIMESTAMP", RoundTimestampFunction.class);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class JsonGetterFunctionTest extends AbstractTestBase {
/**
* Test for JsonGetter function
*
* @throws Exception The exception may throw when test Encrypt function
* @throws Exception The exception may throw when test JsonGetter function
*/
@Test
public void testJsonGetterFunction() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.function;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

/**
 * Test for {@link RoundTimestampFunction}
 */
public class RoundTimestampFunctionTest extends AbstractTestBase {

    public static final long TEST_TIMESTAMP = 1702610371L;

    /**
     * Test for round timestamp function
     *
     * @throws Exception The exception may throw when test round timestamp function
     */
    @Test
    public void testRoundTimestampFunction() throws Exception {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(10000);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // step 1. Register custom function of ROUND_TIMESTAMP
        tableEnv.createTemporaryFunction("ROUND_TIMESTAMP", RoundTimestampFunction.class);

        // step 2. Generate test data and convert to DataStream
        List<Row> data = new ArrayList<>();
        data.add(Row.of(TEST_TIMESTAMP));
        TypeInformation<?>[] types = {BasicTypeInfo.LONG_TYPE_INFO};

        String[] names = {"f1"};
        RowTypeInfo typeInfo = new RowTypeInfo(types, names);
        DataStream<Row> dataStream = env.fromCollection(data).returns(typeInfo);

        // The patterns below omit the hour on purpose. NOTE(review): the expected value still assumes
        // the default zone puts the rounded instant (2023-12-15T03:10:00Z) on 2023-12-15 at minute 10,
        // which does not hold for every zone offset - confirm the CI time zone.
        String formattedTimestamp = "2023121510";

        // step 3. Convert from DataStream to Table and execute the ROUND_TIMESTAMP function
        Table tempView = tableEnv.fromDataStream(dataStream).as("f1");
        tableEnv.createTemporaryView("temp_view", tempView);

        // step 4. Get function execution result and check it
        List<String> result = collectFirstColumn(tableEnv,
                "SELECT ROUND_TIMESTAMP(f1, 600, 'yyyyMMddmm') " +
                        "from temp_view");
        Assert.assertEquals(1, result.size());
        Assert.assertEquals(formattedTimestamp, result.get(0));

        // step 5. provide a different format and check the result
        List<String> result2 = collectFirstColumn(tableEnv,
                "SELECT ROUND_TIMESTAMP(f1, 600, 'yyyyMMddmmss') " +
                        "from temp_view");
        Assert.assertEquals(1, result2.size());
        Assert.assertEquals(formattedTimestamp + "00", result2.get(0));
    }

    /**
     * Runs the query and collects the first column of every non-null result row as a string.
     * The result iterator is closed once the rows are consumed.
     *
     * @param tableEnv The table environment that holds the registered function and view.
     * @param sql The query to execute.
     * @return The string form of the first field of each returned row.
     * @throws Exception The exception may throw when the job execution fails
     */
    private List<String> collectFirstColumn(StreamTableEnvironment tableEnv, String sql) throws Exception {
        Table outputTable = tableEnv.sqlQuery(sql);
        DataStream<Row> resultSet = tableEnv.toAppendStream(outputTable, Row.class);
        List<String> values = new ArrayList<>();
        try (CloseableIterator<Row> it = resultSet.executeAndCollect()) {
            while (it.hasNext()) {
                Row row = it.next();
                if (row != null) {
                    // String.valueOf keeps a null UDF result as "null" so the assertion fails with a
                    // readable message instead of a NullPointerException here.
                    values.add(String.valueOf(row.getField(0)));
                }
            }
        }
        return values;
    }

}

0 comments on commit 850b922

Please sign in to comment.