Skip to content

Commit

Permalink
[InLong-9476][Sort] Add custom function for data time transform
Browse files Browse the repository at this point in the history
  • Loading branch information
EMsnap committed Dec 15, 2023
1 parent 941923e commit 0192ed6
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.function;

import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/**
 * Scalar function that rounds an epoch-second timestamp down to a multiple of a window size
 * and returns the rounded instant rendered with a date-time pattern and parsed as a number.
 *
 * <p>The rounded instant is rendered in the JVM default time zone
 * ({@link ZoneId#systemDefault()}), so the same input can yield different results on machines
 * configured with different time zones.
 */
public class RoundTimestampFunction extends ScalarFunction {

    private static final long serialVersionUID = 1L;

    public static final Logger LOG = LoggerFactory.getLogger(RoundTimestampFunction.class);

    /**
     * Round the timestamp down to a multiple of {@code roundTime} and output it formatted.
     *
     * <p>For example, if the input timestamp is 1702610371 (2023-12-15T03:19:31Z), the roundTime
     * is 600 and the format is "yyyyMMddmm", the rounded instant is 2023-12-15T03:10:00Z and,
     * with a JVM default time zone of UTC, the result is 2023121510. With the format
     * "yyyyMMddHHmm" the result would be 202312150310.
     *
     * <p>The formatted text is parsed with {@link Long#parseLong(String)}, so the pattern must
     * produce digits only (no separators or letters) and must fit into a {@code long}.
     *
     * @param timestamp The input timestamp in seconds since the epoch.
     * @param roundTime The round time (window size) in seconds; must not be zero.
     * @param format The {@link DateTimeFormatter} pattern of the output timestamp.
     * @return The formatted timestamp as a number, or {@code null} when any argument is
     *         {@code null} or when rounding, formatting or parsing fails (failures are logged).
     */
    public static Long eval(Long timestamp, Long roundTime, String format) {
        // SQL NULL in, SQL NULL out: avoid driving the NPE/catch path (and an ERROR-level
        // stack trace per row) for a perfectly ordinary null column value.
        if (timestamp == null || roundTime == null || format == null) {
            return null;
        }
        try {
            // Java's % takes the sign of the dividend, so a pre-epoch (negative) timestamp
            // would be rounded towards zero, i.e. up. Shift a negative remainder into
            // [0, |roundTime|) so every timestamp is rounded down to its window start.
            // Non-negative timestamps are unaffected. roundTime == 0 throws
            // ArithmeticException here and is reported through the catch block below.
            long remainder = timestamp % roundTime;
            if (remainder < 0) {
                remainder += Math.abs(roundTime);
            }
            long windowStart = timestamp - remainder;

            LocalDateTime dateTime = LocalDateTime.ofInstant(
                    Instant.ofEpochSecond(windowStart),
                    ZoneId.systemDefault());
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format);
            String formattedDateTime = dateTime.format(formatter);
            return Long.parseLong(formattedDateTime);
        } catch (Exception e) {
            // Best-effort UDF: a bad row must not fail the job, so log it and emit NULL.
            LOG.error("get formatted timestamp error, timestamp: {}, roundTime: {},format: {}",
                    timestamp, roundTime, format, e);
            return null;
        }
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.inlong.sort.function.JsonGetterFunction;
import org.apache.inlong.sort.function.RegexpReplaceFirstFunction;
import org.apache.inlong.sort.function.RegexpReplaceFunction;
import org.apache.inlong.sort.function.RoundTimestampFunction;
import org.apache.inlong.sort.parser.Parser;
import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
import org.apache.inlong.sort.parser.result.ParseResult;
Expand Down Expand Up @@ -125,6 +126,7 @@ private void registerUDF() {
tableEnv.createTemporarySystemFunction("ENCRYPT", EncryptFunction.class);
tableEnv.createTemporarySystemFunction("JSON_GETTER", JsonGetterFunction.class);
tableEnv.createTemporarySystemFunction(DEFAULT_EMBEDDING_FUNCTION_NAME, EmbeddingFunction.class);
tableEnv.createTemporarySystemFunction("ROUND_TIMESTAMP", RoundTimestampFunction.class);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.inlong.sort.function.JsonGetterFunction;
import org.apache.inlong.sort.function.RegexpReplaceFirstFunction;
import org.apache.inlong.sort.function.RegexpReplaceFunction;
import org.apache.inlong.sort.function.RoundTimestampFunction;
import org.apache.inlong.sort.parser.Parser;
import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
import org.apache.inlong.sort.parser.result.ParseResult;
Expand Down Expand Up @@ -74,6 +75,7 @@ private void registerUDF() {
tableEnv.createTemporarySystemFunction("ENCRYPT", EncryptFunction.class);
tableEnv.createTemporarySystemFunction("JSON_GETTER", JsonGetterFunction.class);
tableEnv.createTemporarySystemFunction(DEFAULT_EMBEDDING_FUNCTION_NAME, EmbeddingFunction.class);
tableEnv.createTemporarySystemFunction("ROUND_TIMESTAMP", RoundTimestampFunction.class);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class JsonGetterFunctionTest extends AbstractTestBase {
/**
* Test for JsonGetter function
*
* @throws Exception The exception may throw when test Encrypt function
* @throws Exception The exception may throw when test JsonGetter function
*/
@Test
public void testJsonGetterFunction() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package org.apache.inlong.sort.function;

import java.util.ArrayList;
import java.util.List;
import java.util.TimeZone;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.Assert;
import org.junit.Test;

/**
 * Test for {@link RoundTimestampFunction}
 */
public class RoundTimestampFunctionTest extends AbstractTestBase {

    /** 2023-12-15T03:19:31Z, expressed in epoch seconds. */
    public static final long TEST_TIMESTAMP = 1702610371L;

    /**
     * Test for round timestamp function.
     *
     * <p>The function renders the rounded instant in the JVM default time zone, so the default
     * zone is pinned to UTC for the duration of the test and restored afterwards; otherwise the
     * expected value would depend on where the test is run.
     *
     * @throws Exception The exception may throw when test round timestamp function
     */
    @Test
    public void testRoundTimestampFunction() throws Exception {
        TimeZone originalZone = TimeZone.getDefault();
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        try {
            EnvironmentSettings settings = EnvironmentSettings
                    .newInstance()
                    .inStreamingMode()
                    .build();
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.enableCheckpointing(10000);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

            // step 1. Register custom function of ROUND_TIMESTAMP
            tableEnv.createTemporaryFunction("ROUND_TIMESTAMP", RoundTimestampFunction.class);

            // step 2. Generate test data and convert to DataStream
            List<Row> data = new ArrayList<>();
            data.add(Row.of(TEST_TIMESTAMP));
            TypeInformation<?>[] types = {BasicTypeInfo.LONG_TYPE_INFO};

            String[] names = {"f1"};
            RowTypeInfo typeInfo = new RowTypeInfo(types, names);
            DataStream<Row> dataStream = env.fromCollection(data).returns(typeInfo);

            // 03:19:31Z rounded down to a 600s window is 03:10:00Z; "yyyyMMddmm" renders
            // year, month, day and minute (no hour), hence 2023 12 15 10.
            String formattedTimestamp = "2023121510";

            // step 3. Convert from DataStream to Table and execute the ROUND_TIMESTAMP function
            Table tempView = tableEnv.fromDataStream(dataStream).as("f1");
            tableEnv.createTemporaryView("temp_view", tempView);
            Table outputTable = tableEnv.sqlQuery(
                    "SELECT ROUND_TIMESTAMP(f1, 600, 'yyyyMMddmm') " +
                            "from temp_view");

            // step 4. Get function execution result and parse it
            DataStream<Row> resultSet = tableEnv.toAppendStream(outputTable, Row.class);
            List<String> result = new ArrayList<>();
            // The iterator holds job resources; close it even if an assertion-free failure occurs.
            try (CloseableIterator<Row> it = resultSet.executeAndCollect()) {
                while (it.hasNext()) {
                    Row row = it.next();
                    if (row != null) {
                        result.add(row.getField(0).toString());
                    }
                }
            }

            // JUnit's signature is assertEquals(expected, actual).
            Assert.assertEquals(1, result.size());
            Assert.assertEquals(formattedTimestamp, result.get(0));
        } finally {
            TimeZone.setDefault(originalZone);
        }
    }

}

0 comments on commit 0192ed6

Please sign in to comment.