diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonNode.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonNode.java index adf6cc10976..6a446164f1e 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonNode.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonNode.java @@ -21,16 +21,26 @@ import org.apache.commons.lang.math.NumberUtils; import org.apache.commons.lang3.StringUtils; +import java.util.ArrayList; +import java.util.List; + /** * JsonNode - * + * + * This class represents a node in a JSON structure. It can handle both simple + * nodes and nodes that represent arrays with indices. + * + * Example: + * For a JSON path like "arr(0, 1, 2)", the `JsonNode` will parse it as: + * - name: "arr" + * - arrayIndices: [0, 1, 2] */ @Data public class JsonNode { private String name; private boolean isArray = false; - private int arrayIndex = -1; + private List arrayIndices = new ArrayList<>(); public JsonNode(String nodeString) { int beginIndex = nodeString.indexOf('('); @@ -41,9 +51,14 @@ public JsonNode(String nodeString) { int endIndex = nodeString.lastIndexOf(')'); if (endIndex >= 0) { this.isArray = true; - this.arrayIndex = NumberUtils.toInt(nodeString.substring(beginIndex + 1, endIndex), -1); - if (this.arrayIndex < 0) { - this.arrayIndex = 0; + String indicesString = nodeString.substring(beginIndex + 1, endIndex); + String[] indices = indicesString.split(","); + for (String index : indices) { + int arrayIndex = NumberUtils.toInt(StringUtils.trim(index), -1); + if (arrayIndex < 0) { + arrayIndex = 0; + } + this.arrayIndices.add(arrayIndex); } } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java index 539f4b06dc5..ea9d296a972 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java @@ -117,20 +117,35 @@ public String getField(int rowNum, String fieldName) { continue; } // node is an array - if (!newElement.isJsonArray()) { + current = getElementFromArray(node, newElement); + if (current == null) { // error data return ""; } - JsonArray newArray = newElement.getAsJsonArray(); - if (node.getArrayIndex() >= newArray.size()) { - // error data - return ""; - } - current = newArray.get(node.getArrayIndex()); } return current.getAsString(); } catch (Exception e) { return ""; } } + + private JsonElement getElementFromArray(JsonNode node, JsonElement curElement) { + if (node.getArrayIndices().isEmpty()) { + // error data + return null; + } + for (int index : node.getArrayIndices()) { + if (!curElement.isJsonArray()) { + // error data + return null; + } + JsonArray newArray = curElement.getAsJsonArray(); + if (index >= newArray.size()) { + // error data + return null; + } + curElement = newArray.get(index); + } + return curElement; + } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java index 426ae167a28..ec67d95d9ea 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java @@ -99,19 +99,16 @@ public SourceData decode(String srcString, Context context) { // error data return new JsonSourceData(root, null); } + // node is not array if (!node.isArray()) { current = newElement; - } else { - if (!newElement.isJsonArray()) { - // error data - return new JsonSourceData(root, null); - } - JsonArray newArray = newElement.getAsJsonArray(); - if (node.getArrayIndex() >= newArray.size()) { - // error data - return new JsonSourceData(root, null); - } - current = newArray.get(node.getArrayIndex()); + continue; + } + // node is an array + current = getElementFromArray(node, newElement); + if (current == null) { + // error data + return new JsonSourceData(root, null); } } if (!current.isJsonArray()) { @@ -121,4 +118,24 @@ public SourceData decode(String srcString, Context context) { childRoot = current.getAsJsonArray(); return new JsonSourceData(root, childRoot); } + + private JsonElement getElementFromArray(JsonNode node, JsonElement curElement) { + if (node.getArrayIndices().isEmpty()) { + // error data + return null; + } + for (int index : node.getArrayIndices()) { + if (!curElement.isJsonArray()) { + // error data + return null; + } + JsonArray newArray = curElement.getAsJsonArray(); + if (index >= newArray.size()) { + // error data + return null; + } + curElement = newArray.get(index); + } + return curElement; + } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestJson2CsvProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestJson2CsvProcessor.java index 704f2441e91..f649c5a8f7c 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestJson2CsvProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestJson2CsvProcessor.java @@ -88,6 +88,55 @@ public void testJson2Csv() throws Exception { Assert.assertEquals(2, output2.size()); Assert.assertEquals(output2.get(0), "value1|item1|1001|1002msg"); Assert.assertEquals(output2.get(1), "value1|item2|2001|2002msg"); + // case 3 + List fields3 = this.getTestFieldList("matrix(0,0)", "matrix(1,1)", "matrix(2,2)"); + JsonSourceInfo jsonSource3 = new JsonSourceInfo("UTF-8", ""); + CsvSinkInfo csvSink3 = new CsvSinkInfo("UTF-8", '|', '\\', fields3); + String transformSql3 = "select $root.matrix(0, 0), $root.matrix(1, 1), $root.matrix(2, 2) from source"; + TransformConfig config3 = new TransformConfig(transformSql3); + TransformProcessor processor3 = TransformProcessor + .create(config3, SourceDecoderFactory.createJsonDecoder(jsonSource3), + SinkEncoderFactory.createCsvEncoder(csvSink3)); + String srcString3 = "{\n" + + " \"matrix\": [\n" + + " [1, 2, 3],\n" + + " [4, 5, 6],\n" + + " [7, 8, 9]\n" + + " ]\n" + + "}"; + List output3 = processor3.transform(srcString3, new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "1|5|9"); + // case 4 + List fields4 = this.getTestFieldList("department_name", "course_id", "num"); + JsonSourceInfo jsonSource4 = new JsonSourceInfo("UTF-8", ""); + CsvSinkInfo csvSink4 = new CsvSinkInfo("UTF-8", '|', '\\', fields4); + String transformSql4 = + "select $root.departments(0).name, $root.departments(0).courses(0,1).courseId, sqrt($root.departments(0).courses(0,1).courseId - 2) from source"; + TransformConfig config4 = new TransformConfig(transformSql4); + TransformProcessor processor4 = TransformProcessor + .create(config4, SourceDecoderFactory.createJsonDecoder(jsonSource4), + SinkEncoderFactory.createCsvEncoder(csvSink4)); + String srcString4 = "{\n" + + " \"departments\": [\n" + + " {\n" + + " \"name\": \"Mathematics\",\n" + + " \"courses\": [\n" + + " [\n" + + " {\"courseId\": \"101\", \"title\": \"Calculus I\"},\n" + + " {\"courseId\": \"102\", \"title\": \"Linear Algebra\"}\n" + + " ],\n" + + " [\n" + + " {\"courseId\": \"201\", \"title\": \"Calculus II\"},\n" + + " {\"courseId\": \"202\", \"title\": \"Abstract Algebra\"}\n" + + " ]\n" + + " ]\n" + + " }\n" + + " ]\n" + + "}"; + List output4 = processor4.transform(srcString4, new HashMap<>()); + Assert.assertEquals(1, output4.size()); + Assert.assertEquals(output4.get(0), "Mathematics|102|10.0"); } @Test