Skip to content

Commit

Permalink
[INLONG-11227][SDK] Add Parquet formatted data sink for Transform (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
Zkplo authored Oct 9, 2024
1 parent e806ccf commit f489c51
Show file tree
Hide file tree
Showing 10 changed files with 721 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.encode;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

/**
 * Writes records of type {@code T} in Parquet format into an in-memory byte buffer
 * instead of a file.
 *
 * <p>The underlying Parquet writer is configured with SNAPPY compression and the
 * PARQUET_2_0 writer version. How a record is broken down into column values is
 * delegated to the {@link ParquetWriteRunner} supplied at construction time.
 *
 * @param <T> type of the records accepted by {@link #write(Object)}
 */
public final class ParquetByteArrayWriter<T> implements Closeable {

    private final org.apache.parquet.hadoop.ParquetWriter<T> writer;
    private final ParquetOutputByteArray outputByteArray;

    /**
     * Creates a writer backed by a fresh {@link ParquetOutputByteArray}.
     *
     * @param schema Parquet message schema describing the columns to write
     * @param writeRunner callback that emits the field values of one record
     * @param <T> record type
     * @return a new writer instance
     * @throws IOException if the underlying Parquet writer cannot be built
     */
    public static <T> ParquetByteArrayWriter<T> buildWriter(MessageType schema, ParquetWriteRunner<T> writeRunner)
            throws IOException {
        return new ParquetByteArrayWriter<>(new ParquetOutputByteArray(), schema, writeRunner);
    }

    private ParquetByteArrayWriter(ParquetOutputByteArray outputFile, MessageType schema,
            ParquetWriteRunner<T> writeRunner)
            throws IOException {
        this.writer = new Builder<T>(outputFile)
                .withType(schema)
                .withWriteRunner(writeRunner)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
                .build();
        outputByteArray = outputFile;
    }

    /**
     * Closes the underlying Parquet writer.
     *
     * @throws IOException if closing the Parquet writer fails
     */
    @Override
    public void close() throws IOException {
        this.writer.close();
    }

    /**
     * Writes one record through the underlying Parquet writer.
     *
     * @param record record to write
     * @throws IOException if the Parquet writer fails
     */
    public void write(T record) throws IOException {
        this.writer.write(record);
    }

    /**
     * Returns the in-memory stream that receives the Parquet bytes.
     * NOTE(review): the stream presumably holds a complete Parquet file only after
     * {@link #close()} has been called - confirm against the Parquet writer contract.
     *
     * @return the backing byte stream (the same instance on every call)
     */
    public ByteArrayOutputStream getByteArrayOutputStream() {
        return outputByteArray.getByteArrayOutputStream();
    }

    /**
     * Parquet writer builder that carries the schema and the write runner through to
     * {@link SimpleWriteSupport}.
     */
    private static final class Builder<T>
            extends
                org.apache.parquet.hadoop.ParquetWriter.Builder<T, ParquetByteArrayWriter.Builder<T>> {

        private MessageType schema;
        private ParquetWriteRunner<T> writeRunner;

        private Builder(OutputFile file) {
            super(file);
        }

        public Builder<T> withType(MessageType schema) {
            this.schema = schema;
            return this;
        }

        public Builder<T> withWriteRunner(ParquetWriteRunner<T> writeRunner) {
            this.writeRunner = writeRunner;
            return this;
        }

        @Override
        protected Builder<T> self() {
            return this;
        }

        @Override
        protected WriteSupport<T> getWriteSupport(Configuration conf) {
            return new ParquetByteArrayWriter.SimpleWriteSupport<>(schema, writeRunner);
        }
    }

    /**
     * Write support that wraps every record in a start/end message pair and lets the
     * write runner emit the individual fields through {@link #write(String, Object)}.
     */
    private static class SimpleWriteSupport<T> extends WriteSupport<T> {

        private final MessageType schema;
        private final ParquetWriteRunner<T> writeRunner;
        private final ParquetValueWriter valueWriter;

        // Assigned by prepareForWrite before any record is written.
        private RecordConsumer recordConsumer;

        SimpleWriteSupport(MessageType schema, ParquetWriteRunner<T> writeRunner) {
            this.schema = schema;
            this.writeRunner = writeRunner;
            // Binds to write(String, Object); that overload matches ParquetValueWriter.
            this.valueWriter = this::write;
        }

        /**
         * Writes a single field of the current record.
         *
         * <p>The value is cast to the Java type matching the column's primitive type, so a
         * value of any other runtime type fails with a {@link ClassCastException}.
         *
         * @param name column name as declared in the schema
         * @param value value to write
         * @throws UnsupportedOperationException for primitive types other than INT32, INT64,
         *         DOUBLE, BOOLEAN, FLOAT and BINARY, and for BINARY columns that are not
         *         annotated as strings
         */
        public void write(String name, Object value) {
            int fieldIndex = schema.getFieldIndex(name);
            PrimitiveType type = schema.getType(fieldIndex).asPrimitiveType();
            recordConsumer.startField(name, fieldIndex);

            switch (type.getPrimitiveTypeName()) {
                case INT32:
                    recordConsumer.addInteger((int) value);
                    break;
                case INT64:
                    recordConsumer.addLong((long) value);
                    break;
                case DOUBLE:
                    recordConsumer.addDouble((double) value);
                    break;
                case BOOLEAN:
                    recordConsumer.addBoolean((boolean) value);
                    break;
                case FLOAT:
                    recordConsumer.addFloat((float) value);
                    break;
                case BINARY:
                    // Compare by value, not by reference: equals() does not rely on the
                    // annotation being a shared singleton, and this form is null-safe for
                    // BINARY columns that carry no logical type at all.
                    if (LogicalTypeAnnotation.stringType().equals(type.getLogicalTypeAnnotation())) {
                        recordConsumer.addBinary(Binary.fromString((String) value));
                    } else {
                        throw new UnsupportedOperationException(
                                "Don't support writing " + type.getLogicalTypeAnnotation());
                    }
                    break;
                default:
                    throw new UnsupportedOperationException("Don't support writing " + type.getPrimitiveTypeName());
            }
            recordConsumer.endField(name, fieldIndex);
        }

        @Override
        public WriteContext init(Configuration configuration) {
            // No extra key/value metadata is attached to the file.
            return new WriteContext(schema, Collections.emptyMap());
        }

        @Override
        public void prepareForWrite(RecordConsumer recordConsumer) {
            this.recordConsumer = recordConsumer;
        }

        @Override
        public void write(T record) {
            recordConsumer.startMessage();
            writeRunner.doWrite(record, valueWriter);
            recordConsumer.endMessage();
        }

        @Override
        public String getName() {
            // No object-model name is reported for this write support.
            return null;
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.encode;

import org.apache.parquet.io.DelegatingPositionOutputStream;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
 * {@link OutputFile} implementation that collects everything written to it in a single
 * in-memory {@link ByteArrayOutputStream}.
 */
public class ParquetOutputByteArray implements OutputFile {

    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    /**
     * @return the in-memory stream that backs this output file (same instance on every call)
     */
    public ByteArrayOutputStream getByteArrayOutputStream() {
        return buffer;
    }

    /**
     * Delegates to {@link #createOrOverwrite(long)}.
     *
     * @param blockSizeHint passed through unchanged
     */
    @Override
    public PositionOutputStream create(long blockSizeHint) throws IOException {
        return createOrOverwrite(blockSizeHint);
    }

    /**
     * Wraps the backing buffer in a position-aware stream.
     *
     * @param blockSizeHint not used
     * @return a stream whose position is the number of bytes currently held by the buffer
     */
    @Override
    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
        return new BufferPositionOutputStream(buffer);
    }

    @Override
    public boolean supportsBlockSize() {
        return false;
    }

    @Override
    public long defaultBlockSize() {
        return 1024L;
    }

    /**
     * Position-aware view over a byte buffer; the position is the buffer's current size.
     */
    private static final class BufferPositionOutputStream extends DelegatingPositionOutputStream {

        private final ByteArrayOutputStream target;

        BufferPositionOutputStream(ByteArrayOutputStream target) {
            super(target);
            this.target = target;
        }

        @Override
        public long getPos() throws IOException {
            return target.size();
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.encode;

import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.ParquetSinkInfo;
import org.apache.inlong.sdk.transform.process.Context;

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;

/**
 * Sink encoder that writes every {@link SinkData} row into a shared in-memory Parquet buffer.
 *
 * <p>Every configured field becomes a required BINARY column annotated as a string.
 * All rows go through one {@link ParquetByteArrayWriter}; {@link #mergeByteArray(List)}
 * closes that writer and returns the collected bytes.
 */
public class ParquetSinkEncoder implements SinkEncoder<ByteArrayOutputStream> {

    protected ParquetSinkInfo sinkInfo;
    protected Charset sinkCharset = Charset.defaultCharset();

    private final List<FieldInfo> fields;
    private ParquetByteArrayWriter<Object[]> writer;

    /**
     * Builds the Parquet schema from the sink's field list and opens the in-memory writer.
     *
     * @param sinkInfo sink definition supplying the field list
     * @throws RuntimeException if the Parquet writer cannot be created
     */
    public ParquetSinkEncoder(ParquetSinkInfo sinkInfo) {
        this.sinkInfo = sinkInfo;
        this.fields = sinkInfo.getFields();
        List<Type> typesList = new ArrayList<>();
        for (FieldInfo fieldInfo : this.fields) {
            typesList.add(Types.required(BINARY)
                    .as(LogicalTypeAnnotation.stringType())
                    .named(fieldInfo.getName()));
        }
        MessageType schema = new MessageType("Output", typesList);
        // A record is an Object[] whose i-th element belongs to the i-th configured field.
        ParquetWriteRunner<Object[]> writeRunner = (record, valueWriter) -> {
            for (int i = 0; i < record.length; i++) {
                valueWriter.write(this.fields.get(i).getName(), record[i]);
            }
        };
        try {
            writer = ParquetByteArrayWriter.buildWriter(schema, writeRunner);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Appends one row to the Parquet buffer.
     *
     * <p>Fields for which {@code sinkData} returns {@code null} are written as empty strings.
     *
     * @param sinkData row to encode
     * @param context not used by this encoder
     * @return the shared stream backing the writer (the same instance on every call)
     * @throws RuntimeException if the row cannot be written
     */
    @Override
    public ByteArrayOutputStream encode(SinkData sinkData, Context context) {
        int size = this.fields.size();
        Object[] rowsInfo = new Object[size];
        Arrays.fill(rowsInfo, "");
        for (int i = 0; i < size; i++) {
            String fieldData = sinkData.getField(this.fields.get(i).getName());
            if (fieldData == null) {
                continue;
            }
            rowsInfo[i] = fieldData;
        }
        try {
            writer.write(rowsInfo);
        } catch (IOException e) {
            // Surface the failure instead of silently dropping the row; this matches how
            // the constructor and mergeByteArray report writer errors.
            throw new RuntimeException(e);
        }
        return writer.getByteArrayOutputStream();
    }

    @Override
    public List<FieldInfo> getFields() {
        return this.fields;
    }

    /**
     * Closes the writer and returns the bytes of the first stream in {@code list}.
     *
     * <p>Only the first element is read; {@link #encode(SinkData, Context)} returns the
     * same stream instance on every call. When {@code list} is empty the writer is left
     * open and {@code null} is returned.
     *
     * @param list streams previously returned by {@link #encode(SinkData, Context)}
     * @return the content of the first stream, or {@code null} if {@code list} is empty
     * @throws RuntimeException if closing the writer fails
     */
    public byte[] mergeByteArray(List<ByteArrayOutputStream> list) {
        if (list.isEmpty()) {
            return null;
        }
        try {
            this.writer.close(); // need firstly close
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return list.get(0).toByteArray();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.encode;

/**
 * Callback that writes a single named field value.
 *
 * <p>This is a single-method interface, so it can be implemented with a lambda or a
 * method reference.
 */
@FunctionalInterface
public interface ParquetValueWriter {

    /**
     * Writes one field value.
     *
     * @param name name of the field to write
     * @param value value to write for that field
     */
    void write(String name, Object value);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.encode;

/**
 * Strategy that breaks a record down into individual field values and hands each of them
 * to a {@link ParquetValueWriter}.
 *
 * <p>This is a single-method interface, so it can be implemented with a lambda.
 *
 * @param <T> type of the records this runner can write
 */
@FunctionalInterface
public interface ParquetWriteRunner<T> {

    /**
     * Write the specified record into the Parquet row by the supplied writer.
     *
     * @param record data that needs to be written
     * @param valueWriter parquet data writer
     */
    void doWrite(T record, ParquetValueWriter valueWriter);
}
Loading

0 comments on commit f489c51

Please sign in to comment.