From e6e9e49c9099968672b956f21872ab8af5a6d6d7 Mon Sep 17 00:00:00 2001 From: Zkplo <87751516+Zkplo@users.noreply.github.com> Date: Tue, 15 Oct 2024 17:43:34 +0800 Subject: [PATCH] [INLONG-11304][SDK] Transform SQL supports "JSON_REPLACE" function (#11345) --- .../function/json/JsonReplaceFunction.java | 113 ++++++++++++++++++ .../json/TestJsonReplaceFunction.java | 88 ++++++++++++++ 2 files changed, 201 insertions(+) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonReplaceFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonReplaceFunction.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonReplaceFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonReplaceFunction.java new file mode 100644 index 0000000000..ce12538f3b --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonReplaceFunction.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.inlong.sdk.transform.process.function.json;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.function.TransformFunction;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONPath;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * JsonReplaceFunction -> JSON_REPLACE(json_doc, path1, val1[, path2, val2, ...] ):
+ * description:
+ * - Return NULL if any argument is NULL or if the path/value arguments do not come in pairs;
+ * - Return the result of replacing existing values in 'json_doc'. Pairs are applied from left to right,
+ *   the path '$' replaces the whole document, and a path that does not exist in the document is ignored.
+ * Note: An IllegalArgumentException is thrown if a path argument contains a * or ** wildcard; a 'json_doc'
+ *       that cannot be parsed makes the underlying JSON parser fail.
+ */
+@TransformFunction(names = {
+        "json_replace"}, parameter = "(String json_doc, String path1, String val1[, String path2, String val2, ...] )", descriptions = {
+                "- Return \"\" if any argument is NULL or the 'json_doc' argument is not a valid JSON document or any "
+                        +
+                        "path argument is not a valid path expression or contains a * or ** wildcard;",
+                "- Return the result of replacing existing values in 'json_doc'."
+        }, examples = {
+                "json_replace(\"{ \\\"a\\\": 1, \\\"b\\\": [2, 3]}\", \"$.a\", 10, \"$.c\", \"[true, false]\") = "
+                        + "{\"a\": 10, \"b\": [2, 3]}"
+        })
+public class JsonReplaceFunction implements ValueParser {
+
+    /** Path that addresses the whole document. */
+    private static final String ROOT_PATH = "$";
+    /** Any path containing this character is rejected; it also covers the "**" wildcard. */
+    private static final String WILDCARD = "*";
+
+    private final ValueParser jsonDocParser;
+    /** Parsers for the flattened argument tail: path1, val1, path2, val2, ... */
+    private final List<ValueParser> pathValuePairsParser;
+
+    /**
+     * Builds one parser for the JSON document (first argument) and one for each remaining argument.
+     *
+     * @param expr the parsed JSON_REPLACE call
+     */
+    public JsonReplaceFunction(Function expr) {
+        List<Expression> expressions = expr.getParameters().getExpressions();
+        jsonDocParser = OperatorTools.buildParser(expressions.get(0));
+        pathValuePairsParser = new ArrayList<>();
+        for (int i = 1; i < expressions.size(); i++) {
+            pathValuePairsParser.add(OperatorTools.buildParser(expressions.get(i)));
+        }
+    }
+
+    /**
+     * Evaluates every argument for the given row and applies the replacements.
+     *
+     * @return the rewritten document as a string, or null if the document, any path or any value is null
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object jsonDocObj = jsonDocParser.parse(sourceData, rowIndex, context);
+        if (jsonDocObj == null) {
+            return null;
+        }
+        List<Object> pathValuePairs = new ArrayList<>(pathValuePairsParser.size());
+        for (ValueParser valueParser : pathValuePairsParser) {
+            pathValuePairs.add(valueParser.parse(sourceData, rowIndex, context));
+        }
+        return jsonReplace(jsonDocObj.toString(), pathValuePairs);
+    }
+
+    /**
+     * Applies the (path, value) pairs to the document in order.
+     *
+     * @param jsonDoc        the JSON document text
+     * @param pathValuePairs flattened list path1, val1, path2, val2, ...
+     * @return the resulting document as a string; null if an argument is null, the list is not made of pairs,
+     *         or the document ends up being null
+     * @throws IllegalArgumentException if a path contains a wildcard
+     */
+    private String jsonReplace(String jsonDoc, List<Object> pathValuePairs) {
+        if (jsonDoc == null || pathValuePairs == null || pathValuePairs.size() % 2 != 0) {
+            return null;
+        }
+        // Documented contract: NULL in, NULL out. Checked up front so a null path or value can never
+        // surface later as a NullPointerException.
+        for (Object argument : pathValuePairs) {
+            if (argument == null) {
+                return null;
+            }
+        }
+
+        Object json = JSON.parse(jsonDoc);
+
+        for (int i = 0; i < pathValuePairs.size(); i += 2) {
+            // toString() instead of a cast: the parser may hand back a non-String object
+            String path = pathValuePairs.get(i).toString();
+            Object value = pathValuePairs.get(i + 1);
+
+            if (ROOT_PATH.equals(path)) {
+                // '$' replaces the entire document
+                json = parseOrRaw(value);
+            } else if (path.contains(WILDCARD)) {
+                throw new IllegalArgumentException("Invalid path expression: " + path);
+            } else if (JSONPath.contains(json, path)) {
+                // Only existing paths are replaced; missing ones are ignored
+                JSONPath.set(json, path, parseOrRaw(value));
+            }
+        }
+        return json == null ? null : json.toString();
+    }
+
+    /**
+     * Interprets the value's text as JSON so that numbers, objects and arrays keep their JSON type;
+     * text that cannot be parsed is used unchanged.
+     */
+    private static Object parseOrRaw(Object value) {
+        try {
+            return JSON.parse(value.toString());
+        } catch (Exception ignored) {
+            // Not JSON text (for example a bare word): fall back to the raw value
+            return value;
+        }
+    }
+
+}
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonReplaceFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonReplaceFunction.java
new file mode 100644
index 0000000000..b41b9c57eb
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonReplaceFunction.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.json;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Exercises JSON_REPLACE through a processor that decodes '|'-separated CSV rows and encodes
+ * the result as "result=..." key/value text.
+ */
+public class TestJsonReplaceFunction extends AbstractFunctionStringTestBase {
+
+    @Test
+    public void testJsonReplaceFunction() throws Exception {
+        // One (path, value) pair taken from string2/string3
+        TransformProcessor onePair = newProcessor("select json_replace(string1,string2,string3) from source");
+
+        // case1: json_replace(json_doc, $.age, 35)
+        String ageDoc = "{\\\"age\\\":30}";
+        assertSingleResult(onePair, ageDoc + "|$.age|35|", "result={\"age\":35}");
+
+        // case2: json_replace(json_doc, $.address.zip, 99999)
+        String addressDoc = "{\\\"address\\\":{\\\"zip\\\":\\\"12345\\\"}}";
+        assertSingleResult(onePair, addressDoc + "|$.address.zip|99999|", "result={\"address\":{\"zip\":99999}}");
+
+        // case3: json_replace(json_doc, $.hobbies[1], hiking)
+        String hobbiesDoc = "{\\\"hobbies\\\":[\\\"swimming\\\",\\\"reading\\\",\\\"coding\\\"]}";
+        assertSingleResult(onePair, hobbiesDoc + "|$.hobbies[1]|hiking|",
+                "result={\"hobbies\":[\"swimming\",\"hiking\",\"coding\"]}");
+
+        // case4: json_replace(json_doc, $, root_value) -- same document as case3, root replaced by plain text
+        assertSingleResult(onePair, hobbiesDoc + "|$|root_value|", "result=root_value");
+
+        // Two (path, value) pairs: string2/string3 followed by numeric1/numeric2
+        TransformProcessor twoPairs =
+                newProcessor("select json_replace(string1,string2,string3,numeric1,numeric2) from source");
+
+        // case5: json_replace(json_doc, $, {\"name\":\"inlong\"},"$.name","apache.inlong")
+        assertSingleResult(twoPairs, hobbiesDoc + "|$|{\\\"name\\\":\\\"inlong\\\"}|$.name|apache.inlong",
+                "result={\"name\":\"apache.inlong\"}");
+    }
+
+    /**
+     * Creates a processor for the given SQL using the csvSource and kvSink definitions, which are not
+     * declared in this class (presumably inherited from the test base).
+     */
+    private TransformProcessor newProcessor(String transformSql) throws Exception {
+        TransformConfig config = new TransformConfig(transformSql);
+        return TransformProcessor.create(config,
+                SourceDecoderFactory.createCsvDecoder(csvSource),
+                SinkEncoderFactory.createKvEncoder(kvSink));
+    }
+
+    /** Transforms one input row and checks that exactly one output row equal to {@code expected} comes back. */
+    private void assertSingleResult(TransformProcessor processor, String row, String expected) throws Exception {
+        List output = processor.transform(row, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals(expected, output.get(0));
+    }
+}