diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/GreatestFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/GreatestFunction.java new file mode 100644 index 0000000000..96218baa72 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/GreatestFunction.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +/** + * GreatestFunction + * description: GREATEST(value1[, value2]*)--Returns the greatest value of the list of arguments. + * Returns NULL if any argument is NULL. + */ +@TransformFunction(names = {"greatest"}) +public class GreatestFunction implements ValueParser { + + private List parserList; + + public GreatestFunction(Function expr) { + if (expr.getParameters() == null) { + this.parserList = new ArrayList<>(); + } else { + List params = expr.getParameters().getExpressions(); + parserList = new ArrayList<>(params.size()); + for (Expression param : params) { + ValueParser node = OperatorTools.buildParser(param); + parserList.add(node); + } + } + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + BigDecimal maxValue = null; + for (ValueParser valueParser : parserList) { + Object valueObj = valueParser.parse(sourceData, rowIndex, context); + if (valueObj == null) { + return null; + } + + BigDecimal value = OperatorTools.parseBigDecimal(valueObj); + if (maxValue == null || value.compareTo(maxValue) > 0) { + maxValue = value; + } + } + return maxValue; + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LeastFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LeastFunction.java new file mode 100644 index 0000000000..08fe5ca626 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LeastFunction.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +/** + * LeastFunction + * description: LEAST(value1[, value2]*)--Returns the least value of the list of arguments. + * Returns NULL if any argument is NULL. + */ +@TransformFunction(names = {"least"}) +public class LeastFunction implements ValueParser { + + private List parserList; + + public LeastFunction(Function expr) { + if (expr.getParameters() == null) { + this.parserList = new ArrayList<>(); + } else { + List params = expr.getParameters().getExpressions(); + parserList = new ArrayList<>(params.size()); + for (Expression param : params) { + ValueParser node = OperatorTools.buildParser(param); + parserList.add(node); + } + } + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + BigDecimal minValue = null; + for (ValueParser valueParser : parserList) { + Object valueObj = valueParser.parse(sourceData, rowIndex, context); + if (valueObj == null) { + return null; + } + + BigDecimal value = OperatorTools.parseBigDecimal(valueObj); + if (minValue == null || value.compareTo(minValue) < 0) { + minValue = value; + } + } + return minValue; + } + +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/AbstractFunctionArithmeticTestBase.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/AbstractFunctionArithmeticTestBase.java index 94e4912a7f..c37acb466d 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/AbstractFunctionArithmeticTestBase.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/AbstractFunctionArithmeticTestBase.java @@ -37,7 +37,7 @@ public abstract class AbstractFunctionArithmeticTestBase { protected static final KvSinkInfo kvSink; static { - for (int i = 1; i < 5; i++) { + for (int i = 1; i <= 5; i++) { FieldInfo field = new FieldInfo(); field.setName("numeric" + i); srcFields.add(field); diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestGreatestFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestGreatestFunction.java new file mode 100644 index 0000000000..1dafa463c8 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestGreatestFunction.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.arithmetic; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestGreatestFunction extends AbstractFunctionArithmeticTestBase { + + @Test + public void testGreatestFunction() throws Exception { + String transformSql1 = "select greatest(numeric1, greatest(numeric2, numeric3, numeric4)) from source"; + TransformConfig config1 = new TransformConfig(transformSql1); + TransformProcessor processor1 = TransformProcessor + .create(config1, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case: greatest(1, greatest(2, 3, 4)) + List output1 = processor1.transform("1|2|3|4", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=4"); + + // case: greatest(3.14, greatest(7, 2, 1)) + List output2 = processor1.transform("3.14|7|2|1", new HashMap<>()); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result=7"); + + String transformSql2 = "select greatest(numeric1, numeric2, greatest(numeric3, numeric4)) from source"; + TransformConfig config2 = new TransformConfig(transformSql2); + TransformProcessor processor2 = TransformProcessor + .create(config2, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case: greatest(3.141592653589793, 3, greatest(4, 1)) + List output3 = processor2.transform("3.141592653589793|3|4|1", new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "result=4"); + + // case: greatest(-9223372036854775808, 1, greatest(-2, 3)) + List output4 = processor2.transform("-9223372036854775808|1|-2|3", new HashMap<>()); + Assert.assertEquals(1, output4.size()); + Assert.assertEquals(output4.get(0), "result=3"); + + String transformSql3 = + "select greatest(numeric1, greatest(numeric2, numeric3), greatest(numeric4, numeric5)) from source"; + TransformConfig config3 = new TransformConfig(transformSql3); + TransformProcessor processor3 = TransformProcessor + .create(config3, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case: greatest(1, greatest(-2, -5), greatest(3.14836, 8)) + List output5 = processor3.transform("1|-2|-5|3.14836|8", new HashMap<>()); + Assert.assertEquals(1, output5.size()); + Assert.assertEquals(output5.get(0), "result=8"); + + String transformSql4 = + "select greatest(numeric1, least(numeric2, numeric3), greatest(numeric4, numeric5)) from source"; + TransformConfig config4 = new TransformConfig(transformSql4); + TransformProcessor processor4 = TransformProcessor + .create(config4, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case: greatest(1, least(89, 10), greatest(3.14836, 8)) + List output6 = processor4.transform("-1|89|10|3.14836|8", new HashMap<>()); + Assert.assertEquals(1, output6.size()); + Assert.assertEquals(output6.get(0), "result=10"); + + // case: greatest(1, least(-2, ), greatest(3.14836, 8)) + List output7 = processor4.transform("1|-2||3.14836|8", new HashMap<>()); + Assert.assertEquals(1, output7.size()); + Assert.assertEquals(output7.get(0), "result="); + } + +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestLeastFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestLeastFunction.java new file mode 100644 index 0000000000..9fe2698bc6 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestLeastFunction.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.arithmetic; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestLeastFunction extends AbstractFunctionArithmeticTestBase { + + @Test + public void testLeastFunction() throws Exception { + String transformSql1 = "select least(numeric1, least(numeric2, numeric3, numeric4)) from source"; + TransformConfig config1 = new TransformConfig(transformSql1); + TransformProcessor processor1 = TransformProcessor + .create(config1, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case: least(3.14, least(7, 2, 1)) + List output2 = processor1.transform("3.14|7|2|1", new HashMap<>()); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result=1"); + + String transformSql2 = "select least(numeric1, numeric2, least(numeric3, numeric4)) from source"; + TransformConfig config2 = new TransformConfig(transformSql2); + TransformProcessor processor2 = TransformProcessor + .create(config2, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case: least(3.141592653589793, 4, least(3.33, 3.4)) + List output3 = processor2.transform("3.141592653589793|4|3.33|3.4", new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "result=3.141592653589793"); + + // case: least(-9223372036854775808, 1, least(-2, 3)) + List output4 = processor2.transform("-9223372036854775808|1|-2|3", new HashMap<>()); + Assert.assertEquals(1, output4.size()); + Assert.assertEquals(output4.get(0), "result=-9223372036854775808"); + + String transformSql3 = + "select least(numeric1, least(numeric2, numeric3), least(numeric4, numeric5)) from source"; + TransformConfig config3 = new TransformConfig(transformSql3); + TransformProcessor processor3 = TransformProcessor + .create(config3, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case: least(1, least(-2, -5), least(3.14836, 8)) + List output5 = processor3.transform("1|-2|-5|3.14836|8", new HashMap<>()); + Assert.assertEquals(1, output5.size()); + Assert.assertEquals(output5.get(0), "result=-5"); + + // case: least(1, least(-2, -5), least(, 8)) + List output6 = processor3.transform("1|-2|-5||8", new HashMap<>()); + Assert.assertEquals(1, output6.size()); + Assert.assertEquals(output6.get(0), "result="); + } + +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/operator/TestIsBooleanOperator.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/operator/TestIsBooleanOperator.java index 8aec150567..934618f5e2 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/operator/TestIsBooleanOperator.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/operator/TestIsBooleanOperator.java @@ -19,18 +19,42 @@ import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo; +import org.apache.inlong.sdk.transform.pojo.FieldInfo; +import org.apache.inlong.sdk.transform.pojo.KvSinkInfo; import org.apache.inlong.sdk.transform.pojo.TransformConfig; import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.converter.BooleanConverter; import org.apache.inlong.sdk.transform.process.function.arithmetic.AbstractFunctionArithmeticTestBase; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; public class TestIsBooleanOperator extends AbstractFunctionArithmeticTestBase { + private static final List srcFields = new ArrayList<>(); + private static final List dstFields = new ArrayList<>(); + private static final CsvSourceInfo csvSource; + private static final KvSinkInfo kvSink; + + static { + for (int i = 1; i < 5; i++) { + FieldInfo field = new FieldInfo(); + field.setName("numeric" + i); + srcFields.add(field); + } + srcFields.add(new FieldInfo("booleanVal", new BooleanConverter())); + FieldInfo field = new FieldInfo(); + field.setName("result"); + dstFields.add(field); + csvSource = new CsvSourceInfo("UTF-8", '|', '\\', srcFields); + kvSink = new KvSinkInfo("UTF-8", dstFields); + } + @Test public void testIsBooleanOperator() throws Exception { String transformSql = null, data = null;