From 9d9ba044fd5438f1071264c07fd1443373c2755c Mon Sep 17 00:00:00 2001 From: Zkplo <87751516+Zkplo@users.noreply.github.com> Date: Tue, 15 Oct 2024 18:35:41 +0800 Subject: [PATCH] [INLONG-11303][SDK] Transform SQL supports "JSON_REMOVE" function (#11344) --- .../json/JsonArrayAppendFunction.java | 4 +- .../json/JsonArrayInsertFunction.java | 7 +- .../function/json/JsonRemoveFunction.java | 96 +++++++++++++++++++ .../json/TestJsonArrayAppendFunction.java | 14 ++- .../json/TestJsonArrayInsertFunction.java | 6 ++ .../function/json/TestJsonRemoveFunction.java | 77 +++++++++++++++ 6 files changed, 196 insertions(+), 8 deletions(-) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonRemoveFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonRemoveFunction.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonArrayAppendFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonArrayAppendFunction.java index c7d5fbb684..6df4f15820 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonArrayAppendFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonArrayAppendFunction.java @@ -91,9 +91,9 @@ public static String jsonArrayAppend(String jsonDoc, ArrayList pathValue // Attempt to append a value to the array pointed to by the specified path try { - jsonObject = appendValueToArray(jsonObject, path, value); + jsonObject = appendValueToArray(jsonObject, path, JSON.parse(value.toString())); } catch (Exception e) { - throw new IllegalArgumentException("Error processing path: " + path, e); + jsonObject = appendValueToArray(jsonObject, path, value); } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonArrayInsertFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonArrayInsertFunction.java index 580c427ed2..6bbb6b62f1 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonArrayInsertFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonArrayInsertFunction.java @@ -117,8 +117,11 @@ private void insertIntoArray(JSONArray array, String path, Object value) { // If the index exceeds the length of the array, insert at the end if (index >= array.size()) { - array.add(value); - } else { + index = array.size(); + } + try { + array.add(index, JSON.parse(value.toString())); + } catch (Exception ignored) { array.add(index, value); } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonRemoveFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonRemoveFunction.java new file mode 100644 index 0000000000..da1de78b5e --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonRemoveFunction.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.json; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONPath; +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +import java.util.List; + +/** + * JsonRemoveFunction -> JSON_REMOVE(json_doc, path[, path] ...) + * description: + * - return NULL if any argument is NULL or the 'json_doc' argument is not a valid JSON document or any path argument + * is not a valid path expression or is $ or contains a * or ** wildcard; + * - return the result of removing data from 'json_doc'. + */ +@TransformFunction(names = { + "json_remove"}, parameter = "(String json_doc, String path1[, String path2, ...])", descriptions = { + "- Return \"\" if any argument is NULL or the 'json_doc' argument is not a valid JSON document or any " + + + "path argument is not a valid path expression or is $ or contains a * or ** wildcard;", + "- Return the result of removing data from 'json_doc'." + }, examples = { + "json_remove(\"{\\\"name\\\":\\\"Charlie\\\",\\\"hobbies\\\":[[\\\"swimming1\\\",\\\"swimming2\\\"]," + + "\\\"reading\\\",\\\"coding\\\"]}\",\"$.age\") = {\"hobbies\":[[\"swimming2\"],\"coding\"]," + + "\"name\":\"Charlie\"}" + }) +public class JsonRemoveFunction implements ValueParser { + + private ValueParser jsonDocParser; + private List pathsParser; + + public JsonRemoveFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + jsonDocParser = OperatorTools.buildParser(expressions.get(0)); + pathsParser = new ArrayList<>(); + for (int i = 1; i < expressions.size(); i++) { + pathsParser.add(OperatorTools.buildParser(expressions.get(i))); + } + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object jsonDocObj = jsonDocParser.parse(sourceData, rowIndex, context); + if (jsonDocObj == null) { + return null; + } + ArrayList pathValuePairs = new ArrayList<>(); + for (ValueParser valueParser : pathsParser) { + pathValuePairs.add(valueParser.parse(sourceData, rowIndex, context).toString()); + } + return jsonRemove(jsonDocObj.toString(), pathValuePairs); + } + + private String jsonRemove(String jsonDoc, ArrayList paths) { + if (jsonDoc == null || paths == null) { + return null; + } + + Object json = JSON.parse(jsonDoc); + + for (String path : paths) { + if (path.equals("$") || path.contains("*") || path.contains("**")) { + throw new IllegalArgumentException("Invalid path expression: " + path); + } + + JSONPath.remove(json, path); + } + + return json.toString(); + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonArrayAppendFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonArrayAppendFunction.java index fc5c43e423..f6d823beb8 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonArrayAppendFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonArrayAppendFunction.java @@ -47,19 +47,19 @@ public void testJsonArrayAppendFunction() throws Exception { data = "[\\\"a\\\",[\\\"b\\\",\\\"c\\\"],\\\"d\\\"]|$[1]|1|"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=[\"a\",[\"b\",\"c\",\"1\"],\"d\"]", output.get(0)); + Assert.assertEquals("result=[\"a\",[\"b\",\"c\",1],\"d\"]", output.get(0)); // case2: json_array_append(["a", ["b", "c"], "d"], $[0],2) data = "[\\\"a\\\",[\\\"b\\\",\\\"c\\\"],\\\"d\\\"]|$[0]|2|"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=[[\"a\",\"2\"],[\"b\",\"c\"],\"d\"]", output.get(0)); + Assert.assertEquals("result=[[\"a\",2],[\"b\",\"c\"],\"d\"]", output.get(0)); // case3: json_array_append(["a", ["b", "c"], "d"],$[1][0],3) data = "[\\\"a\\\",[\\\"b\\\",\\\"c\\\"],\\\"d\\\"]|$[1][0]|3|"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=[\"a\",[[\"b\",\"3\"],\"c\"],\"d\"]", output.get(0)); + Assert.assertEquals("result=[\"a\",[[\"b\",3],\"c\"],\"d\"]", output.get(0)); // case4: json_array_append("{\"a\": 1, \"b\": [2, 3], \"c\": 4}",$.b,"x") data = "{\\\"a\\\": 1, \\\"b\\\": [2, 3], \\\"c\\\": 4}|$.b|x|"; @@ -88,6 +88,12 @@ public void testJsonArrayAppendFunction() throws Exception { data = "[\\\"a\\\", [\\\"b\\\", \\\"c\\\"], \\\"d\\\"]|$[0]|2|$[1]|3"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=[[\"a\",\"2\"],[\"b\",\"c\",\"3\"],\"d\"]", output.get(0)); + Assert.assertEquals("result=[[\"a\",2],[\"b\",\"c\",3],\"d\"]", output.get(0)); + + // case8: json_array_append(["a", ["b", "c"], "d"],$[0],"[\"inlong\"]",$[0][1],3) + data = "[\\\"a\\\", [\\\"b\\\", \\\"c\\\"], \\\"d\\\"]|$[0]|[\\\"inlong\\\"]|$[0][1]|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[[\"a\",[\"inlong\",3]],[\"b\",\"c\"],\"d\"]", output.get(0)); } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonArrayInsertFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonArrayInsertFunction.java index 27f0bf7222..ba44914604 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonArrayInsertFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonArrayInsertFunction.java @@ -86,5 +86,11 @@ public void testJsonArrayInsertFunction() throws Exception { Assert.assertEquals(1, output.size()); Assert.assertEquals("result=[\"x\",\"a\",{\"b\":[1,2]},[3,\"y\",4]]", output.get(0)); + // case7: json_array_append(["a", {"b": [1, 2]}, [3, 4]], "$[0]", "[\"inlong\"]", "$[0][0]", "apache") + data = "[\\\"a\\\", {\\\"b\\\": [1, 2]}, [3, 4]]|$[0]|[\\\"inlong\\\"]|$[0][0]|apache"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[[\"apache\",\"inlong\"],\"a\",{\"b\":[1,2]},[3,4]]", output.get(0)); + } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonRemoveFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonRemoveFunction.java new file mode 100644 index 0000000000..ae759cac81 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonRemoveFunction.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.json; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestJsonRemoveFunction extends AbstractFunctionJsonTestBase { + + @Test + public void testJsonRemoveFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + String json_doc = + "{\\\"name\\\":\\\"Alice\\\",\\\"age\\\":30,\\\"address\\\":{\\\"city\\\":\\\"Wonderland\\\",\\\"zip\\\":\\\"12345\\\"}}"; + transformSql = "select json_remove(string1,string2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case1: json_remove("{\"name\":\"Alice\",\"age\":30,\"address\":{\"city\":\"Wonderland\",\"zip\":\"12345\"}}", + // "$.age") + data = json_doc + "|$.age|"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result={\"address\":{\"zip\":\"12345\",\"city\":\"Wonderland\"},\"name\":\"Alice\"}", + output.get(0)); + + // case2: json_remove("{\"name\":\"Alice\",\"age\":30,\"address\":{\"city\":\"Wonderland\",\"zip\":\"12345\"}}", + // "$.address.zip") + data = json_doc + "|$.address.zip|"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result={\"address\":{\"city\":\"Wonderland\"},\"name\":\"Alice\",\"age\":30}", + output.get(0)); + + json_doc = + "{\\\"name\\\":\\\"Charlie\\\",\\\"hobbies\\\":[[\\\"swimming1\\\",\\\"swimming2\\\"],\\\"reading\\\",\\\"coding\\\"]}"; + transformSql = "select json_remove(string1,string2,string3) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case3: + // json_remove("{\"name\":\"Charlie\",\"hobbies\":[[\"swimming1\",\"swimming2\"],\"reading\",\"coding\"]}","$.age") + data = json_doc + "|$.hobbies[1]|$.hobbies[0][0]|"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result={\"hobbies\":[[\"swimming2\"],\"coding\"],\"name\":\"Charlie\"}", output.get(0)); + } +}