Skip to content

Commit

Permalink
[INLONG-11611][SDK] Transform SDK supports RowData source and sink
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng authored and vernedeng committed Dec 18, 2024
1 parent d31c887 commit 078b72d
Show file tree
Hide file tree
Showing 20 changed files with 319 additions and 10 deletions.
18 changes: 18 additions & 0 deletions inlong-sdk/transform-sdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
<artifactId>sdk-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
Expand Down Expand Up @@ -110,6 +116,18 @@
<groupId>org.dom4j</groupId>
<artifactId>dom4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-format-common</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-format-rowdata-base</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public AvroSourceDecoder(AvroSourceInfo sourceInfo) {
}

@Override
public SourceData decode(byte[] srcBytes, Context context) {
public SourceData decodeBytes(byte[] srcBytes, Context context) {
try {
InputStream inputStream = new ByteArrayInputStream(srcBytes);
DataFileStream<GenericRecord> dataFileStream =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public BsonSourceDecoder(BsonSourceInfo sourceInfo) {
}

@Override
public SourceData decode(byte[] srcBytes, Context context) {
public SourceData decodeBytes(byte[] srcBytes, Context context) {
return decoder.decode(parse(srcBytes), context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public CsvSourceDecoder(CsvSourceInfo sourceInfo) {
}

@Override
public SourceData decode(byte[] srcBytes, Context context) {
public SourceData decodeBytes(byte[] srcBytes, Context context) {
String srcString = new String(srcBytes, srcCharset);
return this.decode(srcString, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public JsonSourceDecoder(JsonSourceInfo sourceInfo) {
* @return
*/
@Override
public SourceData decode(byte[] srcBytes, Context context) {
public SourceData decodeBytes(byte[] srcBytes, Context context) {
String srcString = new String(srcBytes, srcCharset);
return this.decode(srcString, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public KvSourceDecoder(KvSourceInfo sourceInfo) {
}

@Override
public SourceData decode(byte[] srcBytes, Context context) {
public SourceData decodeBytes(byte[] srcBytes, Context context) {
String srcString = new String(srcBytes, srcCharset);
return this.decode(srcString, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public ParquetSourceDecoder(ParquetSourceInfo sourceInfo) {

@SuppressWarnings("unchecked")
@Override
public SourceData decode(byte[] srcBytes, Context context) {
public SourceData decodeBytes(byte[] srcBytes, Context context) {
try {
// Create a custom InputFile
InputFile inputFile = new ParquetInputByteArray(srcBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public PbSourceDecoder(PbSourceInfo sourceInfo) {
*/
@SuppressWarnings("unchecked")
@Override
public SourceData decode(byte[] srcBytes, Context context) {
public SourceData decodeBytes(byte[] srcBytes, Context context) {
try {
// decode
DynamicMessage.Builder builder = DynamicMessage.newBuilder(rootDesc);
Expand Down Expand Up @@ -151,4 +151,9 @@ public SourceData decode(byte[] srcBytes, Context context) {
return null;
}
}

@Override
public SourceData decode(byte[] bytes, Context context) {
return decodeBytes(bytes, context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.decode;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.table.data.RowData;
import org.apache.inlong.sort.formats.base.RowDataToFieldConverters;

import java.util.Map;

@Slf4j
public class RowDataSourceData implements SourceData {
private final RowData rowData;
private final Map<String, Integer> fieldPositionMap;
private final RowDataToFieldConverters.RowFieldConverter[] converters;

public RowDataSourceData(
RowData rowData,
Map<String, Integer> fieldPositionMap,
RowDataToFieldConverters.RowFieldConverter[] converters) {
this.rowData = rowData;
this.fieldPositionMap = fieldPositionMap;
this.converters = converters;
}

@Override
public int getRowCount() {
return 1;
}

@Override
public Object getField(int rowNum, String fieldName) {
if (rowNum != 0) {
return null;
}
try {
int fieldPosition = fieldPositionMap.get(fieldName);
return converters[fieldPosition].convert(rowData, fieldPosition);
} catch (Throwable e) {
log.error("failed to convert field={}", fieldName, e);
return null;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.decode;

import org.apache.flink.table.data.RowData;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.RowDataSourceInfo;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sort.formats.base.RowDataToFieldConverters;
import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RowDataSourceDecoder extends SourceDecoder<RowData> {

private final Map<String, Integer> fieldPositionMap;
private final RowDataToFieldConverters.RowFieldConverter[] rowFieldConverters;

public RowDataSourceDecoder(RowDataSourceInfo sourceInfo) {
super(sourceInfo.getFields());
List<FieldInfo> fields = sourceInfo.getFields();
this.fieldPositionMap = parseFieldPositionMap(fields);

rowFieldConverters = new RowDataToFieldConverters.RowFieldConverter[fields.size()];
for (int i = 0; i < rowFieldConverters.length; i++) {
rowFieldConverters[i] = RowDataToFieldConverters.createNullableRowFieldConverter(
TableFormatForRowDataUtils.deriveLogicalType(fields.get(i).getFormatInfo()));
}
}

private Map<String, Integer> parseFieldPositionMap(List<FieldInfo> fields) {
Map<String, Integer> map = new HashMap<>();
for (int i =0; i < fields.size(); i++) {
map.put(fields.get(i).getName(), i);
}
return map;
}

@Override
public SourceData decodeBytes(byte[] srcBytes, Context context) {
throw new UnsupportedOperationException("do not support decoding bytes for row data decoder");
}

@Override
public SourceData decode(RowData rowData, Context context) {
return new RowDataSourceData(rowData, fieldPositionMap, rowFieldConverters);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public SourceDecoder(List<FieldInfo> fields) {
this.fields = fields;
}

public abstract SourceData decode(byte[] srcBytes, Context context);
public abstract SourceData decodeBytes(byte[] srcBytes, Context context);

public abstract SourceData decode(Input input, Context context);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.ParquetSourceInfo;
import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
import org.apache.inlong.sdk.transform.pojo.RowDataSourceInfo;
import org.apache.inlong.sdk.transform.pojo.XmlSourceInfo;
import org.apache.inlong.sdk.transform.pojo.YamlSourceInfo;

Expand Down Expand Up @@ -65,4 +66,8 @@ public static YamlSourceDecoder createYamlDecoder(YamlSourceInfo sourceInfo) {
return new YamlSourceDecoder(sourceInfo);
}

public static RowDataSourceDecoder createRowDecoder(RowDataSourceInfo sourceInfo) {
return new RowDataSourceDecoder(sourceInfo);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public XmlSourceDecoder(XmlSourceInfo sourceInfo) {
}

@Override
public SourceData decode(byte[] srcBytes, Context context) {
public SourceData decodeBytes(byte[] srcBytes, Context context) {
String srcString = new String(srcBytes, srcCharset);
return this.decode(srcString, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public YamlSourceDecoder(YamlSourceInfo sourceInfo) {
}
}
@Override
public SourceData decode(byte[] srcBytes, Context context) {
public SourceData decodeBytes(byte[] srcBytes, Context context) {
String srcString = new String(srcBytes, srcCharset);
return this.decode(srcString, context);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.encode;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
import org.apache.inlong.sort.formats.base.TableFormatUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RowDataSinkEncoder extends SinkEncoder<RowData> {

private final FieldToRowDataConverters.FieldToRowDataConverter[] fieldToRowDataConverters;
private final Map<String, Integer> fieldPositionMap;

public RowDataSinkEncoder(RowDataSinkInfo sinkInfo) {
super(sinkInfo.getFields());
this.fieldPositionMap = parseFieldPositionMap(fields);

fieldToRowDataConverters = new FieldToRowDataConverters.FieldToRowDataConverter[fields.size()];
for (int i = 0; i < fields.size(); i++) {
fieldToRowDataConverters[i] = FieldToRowDataConverters.createConverter(
TableFormatUtils.deriveLogicalType(fields.get(i).getFormatInfo()));
}
}

private Map<String, Integer> parseFieldPositionMap(List<FieldInfo> fields) {
Map<String, Integer> map = new HashMap<>();
for (int i =0; i < fields.size(); i++) {
map.put(fields.get(i).getName(), i);
}
return map;
}

@Override
public RowData encode(SinkData sinkData, Context context) {
GenericRowData rowData = new GenericRowData(fieldToRowDataConverters.length);

for (int i = 0; i < fields.size(); i++) {
String fieldName = fields.get(i).getName();
String fieldValue = sinkData.getField(fieldName);
rowData.setField(i, fieldToRowDataConverters[i].convert(fieldValue));
}

return rowData;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
import org.apache.inlong.sdk.transform.pojo.ParquetSinkInfo;
import org.apache.inlong.sdk.transform.pojo.PbSinkInfo;
import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo;

public class SinkEncoderFactory {

Expand All @@ -45,4 +46,8 @@ public static PbSinkEncoder createPbEncoder(PbSinkInfo pbSinkInfo) {
return new PbSinkEncoder(pbSinkInfo);
}

public static RowDataSinkEncoder createRowEncoder(RowDataSinkInfo rowDataSinkInfo) {
return new RowDataSinkEncoder(rowDataSinkInfo);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.sdk.transform.pojo;

import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.sdk.transform.process.converter.TypeConverter;

import lombok.Data;
Expand All @@ -28,6 +29,7 @@
public class FieldInfo {

private String name;
private FormatInfo formatInfo;
private TypeConverter converter = TypeConverter.DefaultTypeConverter();

public FieldInfo() {
Expand All @@ -42,4 +44,9 @@ public FieldInfo(String name, TypeConverter converter) {
this.name = name;
this.converter = converter;
}

public FieldInfo(String name, TypeConverter converter, FormatInfo formatInfo) {
this(name, converter);
this.formatInfo = formatInfo;
}
}
Loading

0 comments on commit 078b72d

Please sign in to comment.