From 58fd7ddd85467658120f995a82afc68f5583fac6 Mon Sep 17 00:00:00 2001
From: Zkplo <87751516+Zkplo@users.noreply.github.com>
Date: Thu, 10 Oct 2024 11:34:00 +0800
Subject: [PATCH] [INLONG-11300][SDK] Transform SQL supports "JSON_ARRAY_APPEND" function (#11310)

Co-authored-by: ZKpLo <14148880+zkplo@user.noreply.gitee.com>
---
 .../function/JsonArrayAppendFunction.java     | 157 ++++++++++++++++++
 .../string/TestJsonArrayAppendFunction.java   |  93 +++++++++++
 2 files changed, 250 insertions(+)
 create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/JsonArrayAppendFunction.java
 create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestJsonArrayAppendFunction.java

diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/JsonArrayAppendFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/JsonArrayAppendFunction.java
new file mode 100644
index 0000000000..58c144c283
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/JsonArrayAppendFunction.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONPath;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * JsonArrayAppendFunction -> JSON_ARRAY_APPEND(json_doc, path, val[, path, val] ...)
+ * description:
+ * - return NULL if json_doc or any path is NULL, or if the path/val arguments are not complete pairs.
+ * - return the result of appending values to the end of the indicated arrays within a JSON document;
+ *   a target that is not an array is first wrapped into an array holding it.
+ * - a NULL val is appended as a JSON null element.
+ */
+@TransformFunction(names = {"json_array_append"})
+public class JsonArrayAppendFunction implements ValueParser {
+
+    private final ValueParser jsonDocParser;
+    private final List<ValueParser> pathValuePairsParser;
+
+    /**
+     * Builds one parser for the JSON document argument and one for every following path/val argument.
+     *
+     * @param expr the parsed JSON_ARRAY_APPEND call
+     * @throws IllegalArgumentException if the call has no arguments
+     */
+    public JsonArrayAppendFunction(Function expr) {
+        List<Expression> expressions =
+                expr.getParameters() == null ? null : expr.getParameters().getExpressions();
+        if (expressions == null || expressions.isEmpty()) {
+            throw new IllegalArgumentException("JSON_ARRAY_APPEND requires at least the json_doc argument");
+        }
+        jsonDocParser = OperatorTools.buildParser(expressions.get(0));
+        pathValuePairsParser = new ArrayList<>(expressions.size() - 1);
+        for (int i = 1; i < expressions.size(); i++) {
+            pathValuePairsParser.add(OperatorTools.buildParser(expressions.get(i)));
+        }
+    }
+
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object jsonDocObj = jsonDocParser.parse(sourceData, rowIndex, context);
+        if (jsonDocObj == null) {
+            return null;
+        }
+        ArrayList<Object> pathValuePairs = new ArrayList<>(pathValuePairsParser.size());
+        for (ValueParser valueParser : pathValuePairsParser) {
+            pathValuePairs.add(valueParser.parse(sourceData, rowIndex, context));
+        }
+        return jsonArrayAppend(jsonDocObj.toString(), pathValuePairs);
+    }
+
+    /**
+     * Appends each val to the array located by its path, processing the pairs from left to right.
+     *
+     * @param jsonDoc the JSON document text
+     * @param pathValuePairs alternating JSONPath expressions and values: path, val, path, val, ...
+     * @return the updated document serialized as JSON, or null if jsonDoc, the list or any path is null,
+     *         or if the list does not hold complete path/val pairs
+     * @throws IllegalArgumentException if jsonDoc is not valid JSON, or a path cannot be evaluated or does
+     *         not locate a value
+     */
+    public static String jsonArrayAppend(String jsonDoc, ArrayList<Object> pathValuePairs) {
+        if (jsonDoc == null || pathValuePairs == null || pathValuePairs.size() % 2 != 0) {
+            return null;
+        }
+        // A NULL path makes the whole result NULL; check up front so no pair is applied to a discarded result
+        for (int i = 0; i < pathValuePairs.size(); i += 2) {
+            if (pathValuePairs.get(i) == null) {
+                return null;
+            }
+        }
+
+        Object jsonObject;
+        try {
+            jsonObject = JSON.parse(jsonDoc);
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Invalid JSON document", e);
+        }
+
+        // Process each pair of path and val
+        for (int i = 0; i < pathValuePairs.size(); i += 2) {
+            // toString() instead of a cast: a path parser may yield a non-String object
+            String path = pathValuePairs.get(i).toString();
+            Object value = pathValuePairs.get(i + 1);
+
+            // Attempt to append a value to the array pointed to by the specified path
+            try {
+                jsonObject = appendValueToArray(jsonObject, path, value);
+            } catch (Exception e) {
+                throw new IllegalArgumentException("Error processing path: " + path, e);
+            }
+        }
+
+        return JSON.toJSONString(jsonObject);
+    }
+
+    /**
+     * Append values to an array at a specified path
+     *
+     * @param jsonObject The object parsed by jsonDoc
+     * @param path path
+     * @param value value
+     * @return the document root, which is a new array when the root itself had to be wrapped
+     */
+    private static Object appendValueToArray(Object jsonObject, String path, Object value) {
+        Object targetNode = JSONPath.eval(jsonObject, path);
+
+        if (targetNode == null) {
+            throw new IllegalArgumentException("Target path does not exist.");
+        }
+
+        // If it is already an array type, simply append it
+        if (targetNode instanceof JSONArray) {
+            ((JSONArray) targetNode).add(value);
+            return jsonObject;
+        }
+
+        // If it is a non array type, convert it to an array and append it
+        JSONArray newArray = new JSONArray();
+        newArray.add(targetNode);
+        newArray.add(value);
+        if ("$".equals(path)) {
+            // The root itself is replaced, so there is no parent node to set the new array on
+            return newArray;
+        }
+        JSONPath.set(jsonObject, path, newArray);
+        return jsonObject;
+    }
+}
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestJsonArrayAppendFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestJsonArrayAppendFunction.java
new file mode 100644
index 0000000000..84d3e72882
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestJsonArrayAppendFunction.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.string;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+public class TestJsonArrayAppendFunction extends AbstractFunctionStringTestBase {
+
+    @Test
+    public void testJsonArrayAppendFunction() throws Exception {
+        String transformSql = null, data = null;
+        TransformConfig config = null;
+        TransformProcessor<String, String> processor = null;
+        List<String> output = null;
+
+        transformSql = "select json_array_append(string1,string2,string3) from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+
+        // case1: json_array_append(["a", ["b", "c"], "d"], $[1],1)
+        data = "[\\\"a\\\",[\\\"b\\\",\\\"c\\\"],\\\"d\\\"]|$[1]|1|";
+        output = processor.transform(data, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=[\"a\",[\"b\",\"c\",\"1\"],\"d\"]", output.get(0));
+
+        // case2: json_array_append(["a", ["b", "c"], "d"], $[0],2)
+        data = "[\\\"a\\\",[\\\"b\\\",\\\"c\\\"],\\\"d\\\"]|$[0]|2|";
+        output = processor.transform(data, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=[[\"a\",\"2\"],[\"b\",\"c\"],\"d\"]", output.get(0));
+
+        // case3: json_array_append(["a", ["b", "c"], "d"],$[1][0],3)
+        data = "[\\\"a\\\",[\\\"b\\\",\\\"c\\\"],\\\"d\\\"]|$[1][0]|3|";
+        output = processor.transform(data, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=[\"a\",[[\"b\",\"3\"],\"c\"],\"d\"]", output.get(0));
+
+        // case4: json_array_append("{\"a\": 1, \"b\": [2, 3], \"c\": 4}",$.b,"x")
+        data = "{\\\"a\\\": 1, \\\"b\\\": [2, 3], \\\"c\\\": 4}|$.b|x|";
+        output = processor.transform(data, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result={\"a\":1,\"b\":[2,3,\"x\"],\"c\":4}", output.get(0));
+
+        // case5: json_array_append("{\"a\": 1, \"b\": [2, 3], \"c\": 4}",$.c,"y")
+        data = "{\\\"a\\\": 1, \\\"b\\\": [2, 3], \\\"c\\\": 4}|$.c|y|";
+        output = processor.transform(data, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result={\"a\":1,\"b\":[2,3],\"c\":[4,\"y\"]}", output.get(0));
+
+        // case6: json_array_append("{"a": 1}",$,"z")
+        data = "{\\\"a\\\": 1}|$|z|";
+        output = processor.transform(data, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=[{\"a\":1},\"z\"]", output.get(0));
+
+        transformSql = "select json_array_append(string1,string2,string3,numeric1,numeric2) from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case7: json_array_append(["a", ["b", "c"], "d"],$[0],2,$[1],3)
+        data = "[\\\"a\\\", [\\\"b\\\", \\\"c\\\"], \\\"d\\\"]|$[0]|2|$[1]|3";
+        output = processor.transform(data, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=[[\"a\",\"2\"],[\"b\",\"c\",\"3\"],\"d\"]", output.get(0));
+    }
+}