Skip to content

Commit

Permalink
[INLONG-11030][SDK] Add AVRO formatted data source for Transform (#11082
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ying-hua authored Sep 14, 2024
1 parent af9acab commit 9df3d56
Show file tree
Hide file tree
Showing 7 changed files with 493 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.decode;

import lombok.Data;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.List;

/**
* AvroNode
*/
@Data
public class AvroNode {

private String name;
private boolean isArray = false;
private List<Integer> arrayIndices = new ArrayList<>();

public AvroNode(String nodeString) {
int beginIndex = nodeString.indexOf('(');
if (beginIndex < 0) {
this.name = nodeString;
} else {
this.name = StringUtils.trim(nodeString.substring(0, beginIndex));
int endIndex = nodeString.lastIndexOf(')');
if (endIndex >= 0) {
this.isArray = true;
String indicesString = nodeString.substring(beginIndex + 1, endIndex);
String[] indices = indicesString.split(",");
for (String index : indices) {
int arrayIndex = NumberUtils.toInt(StringUtils.trim(index), -1);
if (arrayIndex < 0) {
arrayIndex = 0;
}
this.arrayIndices.add(arrayIndex);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.decode;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.StringUtils;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class AvroSourceData implements SourceData {

public static final String ROOT_KEY = "$root";

public static final String CHILD_KEY = "$child";

private GenericRecord root;

private List<GenericRecord> childRoot;

private Charset srcCharset;

public AvroSourceData(GenericRecord root, List<GenericRecord> childRoot, Charset srcCharset) {
this.root = root;
this.childRoot = childRoot;
this.srcCharset = srcCharset;
}

@Override
public int getRowCount() {
if (this.childRoot == null) {
return 1;
} else {
return this.childRoot.size();
}
}

@Override
public String getField(int rowNum, String fieldName) {
try {
List<AvroNode> childNodes = new ArrayList<>();
String[] nodeStrings = fieldName.split("\\.");
for (String nodeString : nodeStrings) {
childNodes.add(new AvroNode(nodeString));
}
// parse
if (childNodes.size() == 0) {
return "";
}
// first node
AvroNode firstNode = childNodes.get(0);
Object current = root;
Schema curSchema = root.getSchema();
if (StringUtils.equals(ROOT_KEY, firstNode.getName())) {
current = root;
curSchema = root.getSchema();
} else if (StringUtils.equals(CHILD_KEY, firstNode.getName())) {
if (rowNum < childRoot.size()) {
current = childRoot.get(rowNum);
curSchema = childRoot.get(rowNum).getSchema();
} else {
return "";
}
} else {
// error data
return "";
}
if (current == null) {
// error data
return "";
}
// parse other node
for (int i = 1; i < childNodes.size(); i++) {
AvroNode node = childNodes.get(i);
if (curSchema.getType() != Type.RECORD) {
// error data
return "";
}
Object newElement = ((GenericRecord) current).get(node.getName());
if (newElement == null) {
// error data
return "";
}
// node is not array
if (!node.isArray()) {
curSchema = curSchema.getField(node.getName()).schema();
current = newElement;
continue;
}
// node is an array
current = getElementFromArray(node, newElement, curSchema);
if (current == null) {
// error data
return "";
}
}
return getNodeAsString(current, curSchema);
} catch (Exception e) {
return "";
}
}

private Object getElementFromArray(AvroNode node, Object curElement, Schema curSchema) {
if (node.getArrayIndices().isEmpty()) {
// error data
return null;
}
for (int index : node.getArrayIndices()) {
if (curSchema.getType() != Type.ARRAY) {
// error data
return null;
}
List<?> newArray = (List<?>) curElement;
if (index >= newArray.size()) {
// error data
return null;
}
curSchema = curSchema.getElementType();
curElement = newArray.get(index);
}
return curElement;
}

private String getNodeAsString(Object node, Schema schema) {
String fieldValue = "";
Type fieldType = schema.getType();
switch (fieldType) {
case INT:
case LONG:
case FLOAT:
case DOUBLE:
case STRING:
case BOOLEAN:
case ENUM:
fieldValue = String.valueOf(node);
break;
case BYTES:
ByteBuffer byteBuffer = (ByteBuffer) node;
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
fieldValue = new String(bytes, srcCharset);
break;
case FIXED:
byteBuffer = (ByteBuffer) node;
bytes = new byte[schema.getFixedSize()];
fieldValue = new String(bytes, srcCharset);
}
return fieldValue;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.decode;

import org.apache.inlong.sdk.transform.pojo.AvroSourceInfo;
import org.apache.inlong.sdk.transform.process.Context;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class AvroSourceDecoder implements SourceDecoder<byte[]> {

private static final Logger LOG = LoggerFactory.getLogger(AvroSourceDecoder.class);

protected AvroSourceInfo sourceInfo;
private Charset srcCharset = Charset.defaultCharset();
private String rowsNodePath;
private List<AvroNode> childNodes;

public AvroSourceDecoder(AvroSourceInfo sourceInfo) {
try {
this.sourceInfo = sourceInfo;
if (!StringUtils.isBlank(sourceInfo.getCharset())) {
this.srcCharset = Charset.forName(sourceInfo.getCharset());
}
this.rowsNodePath = sourceInfo.getRowsNodePath();
if (!StringUtils.isBlank(rowsNodePath)) {
this.childNodes = new ArrayList<>();
String[] nodeStrings = this.rowsNodePath.split("\\.");
for (String nodeString : nodeStrings) {
this.childNodes.add(new AvroNode(nodeString));
}
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new TransformException(e.getMessage(), e);
}
}

@Override
public SourceData decode(byte[] srcBytes, Context context) {
try {
InputStream inputStream = new ByteArrayInputStream(srcBytes);
DataFileStream<GenericRecord> dataFileStream =
new DataFileStream<>(inputStream, new GenericDatumReader<>());
GenericRecord root = dataFileStream.next();
List<GenericRecord> childRoot = null;
if (CollectionUtils.isEmpty(childNodes)) {
return new AvroSourceData(root, null, srcCharset);
}

Object current = root;
Schema curSchema = root.getSchema();

for (AvroNode node : childNodes) {
if (curSchema.getType() != Type.RECORD) {
// error data
return new AvroSourceData(root, null, srcCharset);
}
Object newElement = ((GenericRecord) current).get(node.getName());
if (newElement == null) {
// error data
return new AvroSourceData(root, null, srcCharset);
}
// node is not array
if (!node.isArray()) {
curSchema = curSchema.getField(node.getName()).schema();
current = newElement;
continue;
}
// node is an array
current = getElementFromArray(node, newElement, curSchema);
if (current == null) {
// error data
return new AvroSourceData(root, null, srcCharset);
}
}
if (curSchema.getType() != Type.ARRAY) {
// error data
return new AvroSourceData(root, null, srcCharset);
}
childRoot = (List<GenericRecord>) current;
return new AvroSourceData(root, childRoot, srcCharset);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
return null;
}
}

private Object getElementFromArray(AvroNode node, Object curElement, Schema curSchema) {
if (node.getArrayIndices().isEmpty()) {
// error data
return null;
}
for (int index : node.getArrayIndices()) {
if (curSchema.getType() != Type.ARRAY) {
// error data
return null;
}
List<?> newArray = (List<?>) curElement;
if (index >= newArray.size()) {
// error data
return null;
}
curSchema = curSchema.getElementType();
curElement = newArray.get(index);
}
return curElement;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.sdk.transform.decode;

import org.apache.inlong.sdk.transform.pojo.AvroSourceInfo;
import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
Expand All @@ -39,4 +40,8 @@ public static JsonSourceDecoder createJsonDecoder(JsonSourceInfo sourceInfo) {
public static PbSourceDecoder createPbDecoder(PbSourceInfo sourceInfo) {
return new PbSourceDecoder(sourceInfo);
}

public static AvroSourceDecoder createAvroDecoder(AvroSourceInfo sourceInfo) {
return new AvroSourceDecoder(sourceInfo);
}
}
Loading

0 comments on commit 9df3d56

Please sign in to comment.