From f489c51f0e36f23cd9672437fcff3f9330248096 Mon Sep 17 00:00:00 2001
From: Zkplo <87751516+Zkplo@users.noreply.github.com>
Date: Wed, 9 Oct 2024 12:49:54 +0800
Subject: [PATCH] [INLONG-11227][SDK] Add Parquet formatted data sink for
 Transform (#11245)

---
 .../encode/ParquetByteArrayWriter.java        | 177 +++++++++++++++
 .../encode/ParquetOutputByteArray.java        |  60 +++++
 .../transform/encode/ParquetSinkEncoder.java  | 106 +++++++++
 .../transform/encode/ParquetValueWriter.java  |  23 ++
 .../transform/encode/ParquetWriteRunner.java  |  29 +++
 .../transform/encode/SinkEncoderFactory.java  |   5 +
 .../sdk/transform/pojo/ParquetSinkInfo.java   |  64 ++++++
 .../inlong/sdk/transform/pojo/SinkInfo.java   |   1 +
 .../processor/AbstractProcessorTestBase.java  |  50 +++++
 .../processor/TestJson2ParquetProcessor.java  | 208 ++++++++++++++++++
 10 files changed, 723 insertions(+)
 create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetByteArrayWriter.java
 create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetOutputByteArray.java
 create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
 create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetValueWriter.java
 create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetWriteRunner.java
 create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ParquetSinkInfo.java
 create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestJson2ParquetProcessor.java

diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetByteArrayWriter.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetByteArrayWriter.java
new file mode 100644
index 00000000000..bfb072ea668
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetByteArrayWriter.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+
+public final class ParquetByteArrayWriter<T> implements Closeable {
+
+    private final org.apache.parquet.hadoop.ParquetWriter<T> writer;
+    private final ParquetOutputByteArray outputByteArray;
+
+    public static <T> ParquetByteArrayWriter<T> buildWriter(MessageType schema, ParquetWriteRunner<T> writeRunner)
+            throws IOException {
+        return new ParquetByteArrayWriter<>(new ParquetOutputByteArray(), schema, writeRunner);
+    }
+
+    private ParquetByteArrayWriter(ParquetOutputByteArray outputFile, MessageType schema,
+            ParquetWriteRunner<T> writeRunner) throws IOException {
+        this.writer = new Builder<T>(outputFile)
+                .withType(schema)
+                .withWriteRunner(writeRunner)
+                .withCompressionCodec(CompressionCodecName.SNAPPY)
+                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
+                .build();
+        outputByteArray = outputFile;
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.writer.close();
+    }
+
+    public void write(T record) throws IOException {
+        this.writer.write(record);
+    }
+
+    public ByteArrayOutputStream getByteArrayOutputStream() {
+        return outputByteArray.getByteArrayOutputStream();
+    }
+
+    private static final class Builder<T>
+            extends
+                org.apache.parquet.hadoop.ParquetWriter.Builder<T, Builder<T>> {
+
+        private MessageType schema;
+        private ParquetWriteRunner<T> writeRunner;
+
+        private Builder(OutputFile file) {
+            super(file);
+        }
+
+        public Builder<T> withType(MessageType schema) {
+            this.schema = schema;
+            return this;
+        }
+
+        public Builder<T> withWriteRunner(ParquetWriteRunner<T> writeRunner) {
+            this.writeRunner = writeRunner;
+            return this;
+        }
+
+        @Override
+        protected Builder<T> self() {
+            return this;
+        }
+
+        @Override
+        protected WriteSupport<T> getWriteSupport(Configuration conf) {
+            return new ParquetByteArrayWriter.SimpleWriteSupport<>(schema, writeRunner);
+        }
+    }
+
+    private static class SimpleWriteSupport<T> extends WriteSupport<T> {
+
+        private final MessageType schema;
+        private final ParquetWriteRunner<T> writeRunner;
+        private final ParquetValueWriter valueWriter;
+
+        private RecordConsumer recordConsumer;
+
+        SimpleWriteSupport(MessageType schema, ParquetWriteRunner<T> writeRunner) {
+            this.schema = schema;
+            this.writeRunner = writeRunner;
+            this.valueWriter = this::write;
+        }
+
+        public void write(String name, Object value) {
+            int fieldIndex = schema.getFieldIndex(name);
+            PrimitiveType type = schema.getType(fieldIndex).asPrimitiveType();
+            recordConsumer.startField(name, fieldIndex);
+
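+            // dispatch on the field's declared primitive type; ParquetSinkEncoder currently
+            // declares every sink field as a UTF-8 string (BINARY), so that is the common path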
+            switch (type.getPrimitiveTypeName()) {
+                case INT32:
+                    recordConsumer.addInteger((int) value);
+                    break;
+                case INT64:
+                    recordConsumer.addLong((long) value);
+                    break;
+                case DOUBLE:
+                    recordConsumer.addDouble((double) value);
+                    break;
+                case BOOLEAN:
+                    recordConsumer.addBoolean((boolean) value);
+                    break;
+                case FLOAT:
+                    recordConsumer.addFloat((float) value);
+                    break;
+                case BINARY:
+                    if (type.getLogicalTypeAnnotation() == LogicalTypeAnnotation.stringType()) {
+                        recordConsumer.addBinary(Binary.fromString((String) value));
+                    } else {
+                        throw new UnsupportedOperationException(
+                                "Don't support writing " + type.getLogicalTypeAnnotation());
+                    }
+                    break;
+                default:
+                    throw new UnsupportedOperationException("Don't support writing " + type.getPrimitiveTypeName());
+            }
+            recordConsumer.endField(name, fieldIndex);
+        }
+
+        @Override
+        public WriteContext init(Configuration configuration) {
+            return new WriteContext(schema, Collections.emptyMap());
+        }
+
+        @Override
+        public void prepareForWrite(RecordConsumer recordConsumer) {
+            this.recordConsumer = recordConsumer;
+        }
+
+        @Override
+        public void write(T record) {
+            recordConsumer.startMessage();
+            writeRunner.doWrite(record, valueWriter);
+            recordConsumer.endMessage();
+        }
+
+        @Override
+        public String getName() {
+            return null; // no WriteSupport name is needed for this in-memory writer
+        }
+    }
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetOutputByteArray.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetOutputByteArray.java
new file mode 100644
index 00000000000..bf60301a104
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetOutputByteArray.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import org.apache.parquet.io.DelegatingPositionOutputStream;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public class ParquetOutputByteArray implements OutputFile {
+
+    private final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+
+    public ByteArrayOutputStream getByteArrayOutputStream() {
+        return byteArrayOutputStream;
+    }
+
+    @Override
+    public PositionOutputStream create(long blockSizeHint) throws IOException {
+        return createOrOverwrite(blockSizeHint);
+    }
+
+    @Override
+    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
+        return new DelegatingPositionOutputStream(byteArrayOutputStream) {
+
+            @Override
+            public long getPos() throws IOException {
+                return byteArrayOutputStream.size();
+            }
+        };
+    }
+
+    @Override
+    public boolean supportsBlockSize() {
+        return false;
+    }
+
+    @Override
+    public long defaultBlockSize() {
+        return 1024L;
+    }
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
new file mode 100644
index 00000000000..168d7d0c442
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.ParquetSinkInfo;
+import org.apache.inlong.sdk.transform.process.Context;
+
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+
+/**
+ * ParquetSinkEncoder
+ */
+public class ParquetSinkEncoder implements SinkEncoder<ByteArrayOutputStream> {
+
+    protected ParquetSinkInfo sinkInfo;
+    protected Charset sinkCharset = Charset.defaultCharset();
+
+    private final List<FieldInfo> fields;
+    private ParquetByteArrayWriter<Object[]> writer;
+
+    public ParquetSinkEncoder(ParquetSinkInfo sinkInfo) {
+        this.sinkInfo = sinkInfo;
+        this.fields = sinkInfo.getFields();
+        ArrayList<Type> typesList = new ArrayList<>();
+        for (FieldInfo fieldInfo : this.fields) {
+            typesList.add(Types.required(BINARY)
+                    .as(LogicalTypeAnnotation.stringType())
+                    .named(fieldInfo.getName()));
+        }
+        MessageType schema = new MessageType("Output", typesList);
+        ParquetWriteRunner<Object[]> writeRunner = (record, valueWriter) -> {
+            for (int i = 0; i < record.length; i++) {
+                valueWriter.write(this.fields.get(i).getName(), record[i]);
+            }
+        };
+        try {
+            writer = ParquetByteArrayWriter.buildWriter(schema, writeRunner);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public ByteArrayOutputStream encode(SinkData sinkData, Context context) {
+        int size = this.fields.size();
+        Object[] rowsInfo = new Object[size];
+        Arrays.fill(rowsInfo, "");
+        for (int i = 0; i < size; i++) {
+            String fieldData = sinkData.getField(this.fields.get(i).getName());
+            if (fieldData == null) {
+                continue;
+            }
+            rowsInfo[i] = fieldData;
+        }
+        try {
+            writer.write(rowsInfo);
+        } catch (Exception ignored) {
+            // a row that fails to serialize is skipped; rows already written stay in the stream
+        }
+        return writer.getByteArrayOutputStream();
+    }
+
+    @Override
+    public List<FieldInfo> getFields() {
+        return this.fields;
+    }
+    public byte[] mergeByteArray(List<ByteArrayOutputStream> list) {
+        if (list.isEmpty()) {
+            return null;
+        }
+        try {
+            this.writer.close(); // the writer must be closed first so the Parquet footer is flushed
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return list.get(0).toByteArray(); // every element shares the writer's single stream, so the first holds the whole file
+    }
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetValueWriter.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetValueWriter.java
new file mode 100644
index 00000000000..0e3e57b9991
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetValueWriter.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+public interface ParquetValueWriter {
+
+    void write(String name, Object value);
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetWriteRunner.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetWriteRunner.java
new file mode 100644
index 00000000000..91c6fa590a3
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetWriteRunner.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+public interface ParquetWriteRunner<T> {
+
+    /**
+     * Write the specified record into the Parquet row by the supplied writer.
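+     * A typical implementation iterates the record's fields in schema order and
+     * forwards each (name, value) pair to the given value writer.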
+     * @param record data that needs to be written
+     * @param valueWriter parquet data writer
+     */
+    void doWrite(T record, ParquetValueWriter valueWriter);
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
index 30619078ac0..0fa308162b7 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
@@ -20,6 +20,7 @@
 import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
 import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
 import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.ParquetSinkInfo;
 
 public class SinkEncoderFactory {
 
@@ -34,4 +35,8 @@ public static KvSinkEncoder createKvEncoder(KvSinkInfo kvSinkInfo) {
     public static MapSinkEncoder createMapEncoder(MapSinkInfo mapSinkInfo) {
         return new MapSinkEncoder(mapSinkInfo);
     }
+
+    public static ParquetSinkEncoder createParquetEncoder(ParquetSinkInfo parquetSinkInfo) {
+        return new ParquetSinkEncoder(parquetSinkInfo);
+    }
 }
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ParquetSinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ParquetSinkInfo.java
new file mode 100644
index 00000000000..c54670e44a8
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ParquetSinkInfo.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * ParquetSinkInfo
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ParquetSinkInfo extends SinkInfo {
+
+    private List<FieldInfo> fields;
+
+    @JsonCreator
+    public ParquetSinkInfo(
+            @JsonProperty("charset") String charset,
+            @JsonProperty("fields") List<FieldInfo> fields) {
+        super(SinkInfo.PARQUET, charset);
+        if (fields != null) {
+            this.fields = fields;
+        } else {
+            this.fields = new ArrayList<>();
+        }
+    }
+
+    /**
+     * get fields
+     * @return the fields
+     */
+    @JsonProperty("fields")
+    public List<FieldInfo> getFields() {
+        return fields;
+    }
+
+    /**
+     * set fields
+     * @param fields the fields to set
+     */
+    public void setFields(List<FieldInfo> fields) {
+        this.fields = fields;
+    }
+
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
index 9c61c6b46c1..3c976c1b4c5 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
@@ -44,6 +44,7 @@ public abstract class SinkInfo {
     public static final String CSV = "csv";
     public static final String KV = "kv";
     public static final String ES_MAP = "es_map";
+    public static final String PARQUET = "parquet";
 
     @JsonIgnore
     private String type;
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
index e99a3c83c4b..3322d831992 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
@@ -17,11 +17,26 @@
 
 package org.apache.inlong.sdk.transform.process.processor;
 
+import org.apache.inlong.sdk.transform.decode.ParquetInputByteArray;
 import org.apache.inlong.sdk.transform.pojo.FieldInfo;
 
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.List;
 
+
 /**
  * AbstractProcessorTestBase
  * description: define static parameters for Processor tests
 */
@@ -123,4 +138,39 @@ protected byte[] getAvroTestData() {
         byte[] srcBytes = Base64.getDecoder().decode(srcString);
         return srcBytes;
     }
+
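+    // decodes a Parquet byte array produced by ParquetSinkEncoder and renders each row
+    // as pipe-delimited text for the assertions in the processor tests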
+    public static List<String> ParquetByteArray2CsvStr(byte[] parquetBytes) throws IOException {
+        InputFile inputFile = new ParquetInputByteArray(parquetBytes);
+        List<String> strRows = new ArrayList<>();
+        try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) {
+            ParquetMetadata footer = reader.getFooter();
+            MessageType schema = footer.getFileMetaData().getSchema();
+            int fieldSize = schema.getFields().size();
+            PageReadStore pages;
+
+            while ((pages = reader.readNextRowGroup()) != null) {
+                long rows = pages.getRowCount();
+
+                ColumnIOFactory factory = new ColumnIOFactory();
+                MessageColumnIO columnIO = factory.getColumnIO(schema);
+
+                RecordMaterializer<Group> recordMaterializer = new GroupRecordConverter(schema);
+
+                RecordReader<Group> recordReader = columnIO.getRecordReader(pages, recordMaterializer);
+
+                for (int i = 0; i < rows; i++) {
+                    Group group = recordReader.read();
+                    if (group != null) {
+                        StringBuilder builder = new StringBuilder();
+                        for (int j = 0; j < fieldSize; j++) {
+                            builder.append(group.getValueToString(j, 0)).append('|');
+                        }
+                        strRows.add(builder.substring(0, builder.length() - 1));
+                    }
+                }
+            }
+        }
+        return strRows;
+    }
 }
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestJson2ParquetProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestJson2ParquetProcessor.java
new file mode 100644
index 00000000000..30e2d4f9e5a
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestJson2ParquetProcessor.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.processor;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.ParquetSinkEncoder;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.ParquetSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.List;
+
+public class TestJson2ParquetProcessor extends AbstractProcessorTestBase {
+
+    @Test
+    public void testJson2Parquet() throws Exception {
+        List<FieldInfo> fields;
+        JsonSourceInfo jsonSource;
+        ParquetSinkInfo parquetSinkInfo;
+        ParquetSinkEncoder parquetEncoder;
+        String transformSql;
+        TransformConfig config;
+        TransformProcessor<String, ByteArrayOutputStream> processor;
+        String srcString;
+        List<ByteArrayOutputStream> output;
+        List<String> result;
+        byte[] bytes;
+
+        fields = this.getTestFieldList("sid", "packageID", "msgTime", "msg");
+        jsonSource = new JsonSourceInfo("UTF-8", "msgs");
+        parquetSinkInfo = new ParquetSinkInfo("UTF-8", fields);
+        parquetEncoder = SinkEncoderFactory.createParquetEncoder(parquetSinkInfo);
+        transformSql = "select $root.sid,$root.packageID,$child.msgTime,$child.msg from source";
+        config = new TransformConfig(transformSql);
+        // case1
+        processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createJsonDecoder(jsonSource),
+                        parquetEncoder);
+        srcString = "{\n"
+                + "  \"sid\":\"value1\",\n"
+                + "  \"packageID\":\"value2\",\n"
+                + "  \"msgs\":[\n"
+                + "      {\"msg\":\"value4\",\"msgTime\":1713243918000},\n"
+                + "      {\"msg\":\"v4\",\"msgTime\":1713243918000}\n"
+                + "  ]\n"
+                + "}";
+        output = processor.transform(srcString, new HashMap<>());
+        bytes = parquetEncoder.mergeByteArray(output);
+        result = ParquetByteArray2CsvStr(bytes);
+        Assert.assertEquals(2, result.size());
+        Assert.assertEquals("value1|value2|1713243918000|value4", result.get(0));
+        Assert.assertEquals("value1|value2|1713243918000|v4", result.get(1));
+
+        fields = this.getTestFieldList("id", "itemId", "subItemId", "msg");
+        jsonSource = new JsonSourceInfo("UTF-8", "items");
+        parquetSinkInfo = new ParquetSinkInfo("UTF-8", fields);
+        parquetEncoder = SinkEncoderFactory.createParquetEncoder(parquetSinkInfo);
+        transformSql = "select $root.id,$child.itemId,$child.subItems(0).subItemId,$child.subItems(1).msg from source";
+        config = new TransformConfig(transformSql);
+        // case2
+        processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createJsonDecoder(jsonSource),
+                        parquetEncoder);
+        srcString = "{\n"
+                + "  \"id\":\"value1\",\n"
+                + "  \"name\":\"value2\",\n"
+                + "  \"items\":[\n"
+                + "      {\"itemId\":\"item1\",\n"
+                + "       \"subItems\":[\n"
+                + "          {\"subItemId\":\"1001\", \"msg\":\"1001msg\"},\n"
+                + "          {\"subItemId\":\"1002\", \"msg\":\"1002msg\"}\n"
+                + "       ]\n"
+                + "      },\n"
+                + "      {\"itemId\":\"item2\",\n"
+                + "       \"subItems\":[\n"
+                + "          {\"subItemId\":\"2001\", \"msg\":\"2001msg\"},\n"
+                + "          {\"subItemId\":\"2002\", \"msg\":\"2002msg\"}\n"
+                + "       ]\n"
+                + "      }\n"
+                + "  ]\n"
+                + "}";
+        output = processor.transform(srcString, new HashMap<>());
+        bytes = parquetEncoder.mergeByteArray(output);
+        result = ParquetByteArray2CsvStr(bytes);
+        Assert.assertEquals(2, result.size());
+        Assert.assertEquals("value1|item1|1001|1002msg", result.get(0));
+        Assert.assertEquals("value1|item2|2001|2002msg", result.get(1));
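+
+        // case3: select individual matrix cells through two-dimensional index paths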
Assert.assertEquals("value1|item2|2001|2002msg", result.get(1)); + + fields = this.getTestFieldList("matrix(0,0)", "matrix(1,1)", "matrix(2,2)"); + jsonSource = new JsonSourceInfo("UTF-8", ""); + parquetSinkInfo = new ParquetSinkInfo("UTF-8", fields); + parquetEncoder = SinkEncoderFactory.createParquetEncoder(parquetSinkInfo); + transformSql = "select $root.matrix(0, 0), $root.matrix(1, 1), $root.matrix(2, 2) from source"; + config = new TransformConfig(transformSql); + // case3 + processor = TransformProcessor + .create(config, SourceDecoderFactory.createJsonDecoder(jsonSource), + parquetEncoder); + srcString = "{\n" + + " \"matrix\": [\n" + + " [1, 2, 3],\n" + + " [4, 5, 6],\n" + + " [7, 8, 9]\n" + + " ]\n" + + "}"; + output = processor.transform(srcString, new HashMap<>()); + bytes = parquetEncoder.mergeByteArray(output); + result = ParquetByteArray2CsvStr(bytes); + Assert.assertEquals(1, result.size()); + Assert.assertEquals("1|5|9", result.get(0)); + + fields = this.getTestFieldList("department_name", "course_id", "num"); + jsonSource = new JsonSourceInfo("UTF-8", ""); + parquetSinkInfo = new ParquetSinkInfo("UTF-8", fields); + parquetEncoder = SinkEncoderFactory.createParquetEncoder(parquetSinkInfo); + transformSql = + "select $root.departments(0).name, $root.departments(0).courses(0,1).courseId, sqrt($root.departments(0).courses(0,1).courseId - 2) from source"; + config = new TransformConfig(transformSql); + // case4 + processor = TransformProcessor + .create(config, SourceDecoderFactory.createJsonDecoder(jsonSource), + parquetEncoder); + srcString = "{\n" + + " \"departments\": [\n" + + " {\n" + + " \"name\": \"Mathematics\",\n" + + " \"courses\": [\n" + + " [\n" + + " {\"courseId\": \"101\", \"title\": \"Calculus I\"},\n" + + " {\"courseId\": \"102\", \"title\": \"Linear Algebra\"}\n" + + " ],\n" + + " [\n" + + " {\"courseId\": \"201\", \"title\": \"Calculus II\"},\n" + + " {\"courseId\": \"202\", \"title\": \"Abstract Algebra\"}\n" + + " ]\n" + + " ]\n" + + " }\n" + + " ]\n" + + "}"; + output = processor.transform(srcString, new HashMap<>()); + bytes = parquetEncoder.mergeByteArray(output); + result = ParquetByteArray2CsvStr(bytes); + Assert.assertEquals(1, result.size()); + Assert.assertEquals("Mathematics|102|10.0", result.get(0)); + } + + @Test + public void testJson2ParquetForOne() throws Exception { + List fields; + JsonSourceInfo jsonSource; + ParquetSinkInfo parquetSinkInfo; + ParquetSinkEncoder parquetEncoder; + String transformSql; + TransformConfig config; + TransformProcessor processor; + String srcString; + List output; + List result; + byte[] bytes; + + fields = this.getTestFieldList("sid", "packageID", "msgTime", "msg"); + jsonSource = new JsonSourceInfo("UTF-8", ""); + parquetSinkInfo = new ParquetSinkInfo("UTF-8", fields); + parquetEncoder = SinkEncoderFactory.createParquetEncoder(parquetSinkInfo); + transformSql = "select $root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source"; + config = new TransformConfig(transformSql); + // case1 + processor = TransformProcessor + .create(config, SourceDecoderFactory.createJsonDecoder(jsonSource), + parquetEncoder); + srcString = "{\n" + + " \"sid\":\"value1\",\n" + + " \"packageID\":\"value2\",\n" + + " \"msgs\":[\n" + + " {\"msg\":\"value4\",\"msgTime\":1713243918000},\n" + + " {\"msg\":\"v4\",\"msgTime\":1713243918000}\n" + + " ]\n" + + "}"; + output = processor.transform(srcString, new HashMap<>()); + bytes = parquetEncoder.mergeByteArray(output); + result = ParquetByteArray2CsvStr(bytes); + 
+        Assert.assertEquals(1, result.size());
+        Assert.assertEquals("value1|value2|1713243918000|value4", result.get(0));
+    }
+}
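
Usage note (editor's sketch, not part of the patch): the snippet below wires the new Parquet sink up end to end, mirroring the calls exercised in TestJson2ParquetProcessor above. The input JSON is illustrative, and building FieldInfo via a no-arg constructor plus a name setter is an assumption about that POJO; everything else uses APIs introduced or exercised in this patch.

    // assemble the sink schema: one UTF-8 string column per field
    List<FieldInfo> fields = new ArrayList<>();
    for (String name : new String[]{"sid", "packageID", "msgTime", "msg"}) {
        FieldInfo field = new FieldInfo(); // assumed: POJO with a name setter
        field.setName(name);
        fields.add(field);
    }
    ParquetSinkEncoder encoder = SinkEncoderFactory
            .createParquetEncoder(new ParquetSinkInfo("UTF-8", fields));

    // decode JSON, apply the transform SQL, and encode rows into the shared Parquet stream
    TransformConfig config = new TransformConfig(
            "select $root.sid,$root.packageID,$child.msgTime,$child.msg from source");
    TransformProcessor<String, ByteArrayOutputStream> processor = TransformProcessor
            .create(config, SourceDecoderFactory.createJsonDecoder(new JsonSourceInfo("UTF-8", "msgs")), encoder);
    String jsonMessage = "{\"sid\":\"s1\",\"packageID\":\"p1\","
            + "\"msgs\":[{\"msg\":\"m1\",\"msgTime\":1713243918000}]}";
    List<ByteArrayOutputStream> output = processor.transform(jsonMessage, new HashMap<>());

    // closing the writer (inside mergeByteArray) flushes the footer, yielding a complete Parquet file
    byte[] parquetFile = encoder.mergeByteArray(output);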