From 9df3d56a0387f07375f805793e7e08ee38a681f4 Mon Sep 17 00:00:00 2001 From: Xincheng Huang <60057611+ying-hua@users.noreply.github.com> Date: Sat, 14 Sep 2024 10:32:43 +0800 Subject: [PATCH] [INLONG-11030][SDK] Add AVRO formatted data source for Transform (#11082) --- .../inlong/sdk/transform/decode/AvroNode.java | 58 ++++++ .../sdk/transform/decode/AvroSourceData.java | 170 ++++++++++++++++++ .../transform/decode/AvroSourceDecoder.java | 139 ++++++++++++++ .../decode/SourceDecoderFactory.java | 5 + .../sdk/transform/pojo/AvroSourceInfo.java | 56 ++++++ .../processor/AbstractProcessorTestBase.java | 13 ++ .../processor/TestAvro2CsvProcessor.java | 52 ++++++ 7 files changed, 493 insertions(+) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroNode.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/AvroSourceInfo.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAvro2CsvProcessor.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroNode.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroNode.java new file mode 100644 index 00000000000..856d164fb7e --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroNode.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.decode; + +import lombok.Data; +import org.apache.commons.lang.math.NumberUtils; +import org.apache.commons.lang3.StringUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * AvroNode + */ +@Data +public class AvroNode { + + private String name; + private boolean isArray = false; + private List arrayIndices = new ArrayList<>(); + + public AvroNode(String nodeString) { + int beginIndex = nodeString.indexOf('('); + if (beginIndex < 0) { + this.name = nodeString; + } else { + this.name = StringUtils.trim(nodeString.substring(0, beginIndex)); + int endIndex = nodeString.lastIndexOf(')'); + if (endIndex >= 0) { + this.isArray = true; + String indicesString = nodeString.substring(beginIndex + 1, endIndex); + String[] indices = indicesString.split(","); + for (String index : indices) { + int arrayIndex = NumberUtils.toInt(StringUtils.trim(index), -1); + if (arrayIndex < 0) { + arrayIndex = 0; + } + this.arrayIndices.add(arrayIndex); + } + } + } + } +} \ No newline at end of file diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java new file mode 100644 index 00000000000..c060c89af43 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.decode; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Type; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.StringUtils; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + +public class AvroSourceData implements SourceData { + + public static final String ROOT_KEY = "$root"; + + public static final String CHILD_KEY = "$child"; + + private GenericRecord root; + + private List childRoot; + + private Charset srcCharset; + + public AvroSourceData(GenericRecord root, List childRoot, Charset srcCharset) { + this.root = root; + this.childRoot = childRoot; + this.srcCharset = srcCharset; + } + + @Override + public int getRowCount() { + if (this.childRoot == null) { + return 1; + } else { + return this.childRoot.size(); + } + } + + @Override + public String getField(int rowNum, String fieldName) { + try { + List childNodes = new ArrayList<>(); + String[] nodeStrings = fieldName.split("\\."); + for (String nodeString : nodeStrings) { + childNodes.add(new AvroNode(nodeString)); + } + // parse + if (childNodes.size() == 0) { + return ""; + } + // first node + AvroNode firstNode = childNodes.get(0); + Object current = root; + Schema curSchema = root.getSchema(); + if (StringUtils.equals(ROOT_KEY, firstNode.getName())) { + current = root; + curSchema = root.getSchema(); + } else if (StringUtils.equals(CHILD_KEY, firstNode.getName())) { + if (rowNum < childRoot.size()) { + current = childRoot.get(rowNum); + curSchema = childRoot.get(rowNum).getSchema(); + } else { + return ""; + } + } else { + // error data + return ""; + } + if (current == null) { + // error data + return ""; + } + // parse other node + for (int i = 1; i < childNodes.size(); i++) { + AvroNode node = childNodes.get(i); + if (curSchema.getType() != Type.RECORD) { + // error data + return ""; + } + Object newElement = ((GenericRecord) current).get(node.getName()); + if (newElement == null) { + // error data + return ""; + } + // node is not array + if (!node.isArray()) { + curSchema = curSchema.getField(node.getName()).schema(); + current = newElement; + continue; + } + // node is an array + current = getElementFromArray(node, newElement, curSchema); + if (current == null) { + // error data + return ""; + } + } + return getNodeAsString(current, curSchema); + } catch (Exception e) { + return ""; + } + } + + private Object getElementFromArray(AvroNode node, Object curElement, Schema curSchema) { + if (node.getArrayIndices().isEmpty()) { + // error data + return null; + } + for (int index : node.getArrayIndices()) { + if (curSchema.getType() != Type.ARRAY) { + // error data + return null; + } + List newArray = (List) curElement; + if (index >= newArray.size()) { + // error data + return null; + } + curSchema = curSchema.getElementType(); + curElement = newArray.get(index); + } + return curElement; + } + + private String getNodeAsString(Object node, Schema schema) { + String fieldValue = ""; + Type fieldType = schema.getType(); + switch (fieldType) { + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case STRING: + case BOOLEAN: + case ENUM: + fieldValue = String.valueOf(node); + break; + case BYTES: + ByteBuffer byteBuffer = (ByteBuffer) node; + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + fieldValue = new String(bytes, srcCharset); + break; + case FIXED: + byteBuffer = (ByteBuffer) node; + bytes = new byte[schema.getFixedSize()]; + fieldValue = new String(bytes, srcCharset); + } + return fieldValue; + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java new file mode 100644 index 00000000000..0f71f282093 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.decode; + +import org.apache.inlong.sdk.transform.pojo.AvroSourceInfo; +import org.apache.inlong.sdk.transform.process.Context; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Type; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + +public class AvroSourceDecoder implements SourceDecoder { + + private static final Logger LOG = LoggerFactory.getLogger(AvroSourceDecoder.class); + + protected AvroSourceInfo sourceInfo; + private Charset srcCharset = Charset.defaultCharset(); + private String rowsNodePath; + private List childNodes; + + public AvroSourceDecoder(AvroSourceInfo sourceInfo) { + try { + this.sourceInfo = sourceInfo; + if (!StringUtils.isBlank(sourceInfo.getCharset())) { + this.srcCharset = Charset.forName(sourceInfo.getCharset()); + } + this.rowsNodePath = sourceInfo.getRowsNodePath(); + if (!StringUtils.isBlank(rowsNodePath)) { + this.childNodes = new ArrayList<>(); + String[] nodeStrings = this.rowsNodePath.split("\\."); + for (String nodeString : nodeStrings) { + this.childNodes.add(new AvroNode(nodeString)); + } + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new TransformException(e.getMessage(), e); + } + } + + @Override + public SourceData decode(byte[] srcBytes, Context context) { + try { + InputStream inputStream = new ByteArrayInputStream(srcBytes); + DataFileStream dataFileStream = + new DataFileStream<>(inputStream, new GenericDatumReader<>()); + GenericRecord root = dataFileStream.next(); + List childRoot = null; + if (CollectionUtils.isEmpty(childNodes)) { + return new AvroSourceData(root, null, srcCharset); + } + + Object current = root; + Schema curSchema = root.getSchema(); + + for (AvroNode node : childNodes) { + if (curSchema.getType() != Type.RECORD) { + // error data + return new AvroSourceData(root, null, srcCharset); + } + Object newElement = ((GenericRecord) current).get(node.getName()); + if (newElement == null) { + // error data + return new AvroSourceData(root, null, srcCharset); + } + // node is not array + if (!node.isArray()) { + curSchema = curSchema.getField(node.getName()).schema(); + current = newElement; + continue; + } + // node is an array + current = getElementFromArray(node, newElement, curSchema); + if (current == null) { + // error data + return new AvroSourceData(root, null, srcCharset); + } + } + if (curSchema.getType() != Type.ARRAY) { + // error data + return new AvroSourceData(root, null, srcCharset); + } + childRoot = (List) current; + return new AvroSourceData(root, childRoot, srcCharset); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + return null; + } + } + + private Object getElementFromArray(AvroNode node, Object curElement, Schema curSchema) { + if (node.getArrayIndices().isEmpty()) { + // error data + return null; + } + for (int index : node.getArrayIndices()) { + if (curSchema.getType() != Type.ARRAY) { + // error data + return null; + } + List newArray = (List) curElement; + if (index >= newArray.size()) { + // error data + return null; + } + curSchema = curSchema.getElementType(); + curElement = newArray.get(index); + } + return curElement; + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java index b29f6f093c8..06c3dd128b7 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java @@ -17,6 +17,7 @@ package org.apache.inlong.sdk.transform.decode; +import org.apache.inlong.sdk.transform.pojo.AvroSourceInfo; import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo; import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo; import org.apache.inlong.sdk.transform.pojo.KvSourceInfo; @@ -39,4 +40,8 @@ public static JsonSourceDecoder createJsonDecoder(JsonSourceInfo sourceInfo) { public static PbSourceDecoder createPbDecoder(PbSourceInfo sourceInfo) { return new PbSourceDecoder(sourceInfo); } + + public static AvroSourceDecoder createAvroDecoder(AvroSourceInfo sourceInfo) { + return new AvroSourceDecoder(sourceInfo); + } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/AvroSourceInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/AvroSourceInfo.java new file mode 100644 index 00000000000..6c54f8a3ec3 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/AvroSourceInfo.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.pojo; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * AvroSourceInfo + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class AvroSourceInfo extends SourceInfo { + + private String rowsNodePath; + + @JsonCreator + public AvroSourceInfo( + @JsonProperty("charset") String charset, + @JsonProperty("rowsNodePath") String rowsNodePath) { + super(charset); + this.rowsNodePath = rowsNodePath; + } + + /** + * get rowsNodePath + * @return the rowsNodePath + */ + @JsonProperty("rowsNodePath") + public String getRowsNodePath() { + return rowsNodePath; + } + + /** + * set rowsNodePath + * @param rowsNodePath the rowsNodePath to set + */ + public void setRowsNodePath(String rowsNodePath) { + this.rowsNodePath = rowsNodePath; + } +} \ No newline at end of file diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java index 58c2393631d..9f053966678 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java @@ -65,4 +65,17 @@ protected String getPbTestDescription() { + "AudGVzdC5TZGtNZXNzYWdlUgRtc2dzEhwKCXBhY2thZ2VJRBgDIAEoBFIJcGFja2FnZUlEYgZwcm90bzM="; return transformBase64; } + + protected byte[] getAvroTestData() { + String srcString = "T2JqAQIWYXZyby5zY2hlbWHIBXsidHlwZSI6InJlY29yZCIsIm5hbWUiOiJTZGtEYXRhUmVxdWVzdCIs" + + "Im5hbWVzcGFjZSI6InRlc3QiLCJmaWVsZHMiOlt7Im5hbWUiOiJzaWQiLCJ0eXBlIjoic3RyaW5nIn0seyJuYW1lIj" + + "oibXNncyIsInR5cGUiOnsidHlwZSI6ImFycmF5IiwiaXRlbXMiOnsidHlwZSI6InJlY29yZCIsIm5hbWUiOiJTZGtNZ" + + "XNzYWdlIiwiZmllbGRzIjpbeyJuYW1lIjoibXNnIiwidHlwZSI6ImJ5dGVzIn0seyJuYW1lIjoibXNnVGltZSIsInR5" + + "cGUiOiJsb25nIn0seyJuYW1lIjoiZXh0aW5mbyIsInR5cGUiOnsidHlwZSI6Im1hcCIsInZhbHVlcyI6InN0cmluZyJ" + + "9fV19fX0seyJuYW1lIjoicGFja2FnZUlEIiwidHlwZSI6ImxvbmcifV19AI7h/J8SaFCGp012msD3lKMCngEIc2lkMQ" + + "QKQXBwbGXyhcYJBAhrZXkxCGtleTEIa2V5Mgx2YWx1ZTEADEJhbmFuYeSLjBMECGtleTEIa2V5MghrZXkyDHZhbHVlM" + + "gAAgIkPjuH8nxJoUIanTXaawPeUow=="; + byte[] srcBytes = Base64.getDecoder().decode(srcString); + return srcBytes; + } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAvro2CsvProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAvro2CsvProcessor.java new file mode 100644 index 00000000000..fa0a3611128 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAvro2CsvProcessor.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.processor; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.AvroSourceInfo; +import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo; +import org.apache.inlong.sdk.transform.pojo.FieldInfo; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class TestAvro2CsvProcessor extends AbstractProcessorTestBase { + + @Test + public void testAvro2Csv() throws Exception { + List fields = this.getTestFieldList("sid", "packageID", "msgTime", "msg"); + AvroSourceInfo avroSource = new AvroSourceInfo("UTF-8", "msgs"); + CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields); + String transformSql = "select $root.sid,$root.packageID,$child.msgTime,$child.msg from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1 + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createAvroDecoder(avroSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); + byte[] srcBytes = this.getAvroTestData(); + List output = processor.transform(srcBytes); + Assert.assertEquals(2, output.size()); + Assert.assertEquals(output.get(0), "sid1|123456|10011001|Apple"); + Assert.assertEquals(output.get(1), "sid1|123456|20022002|Banana"); + } +}