Skip to content

Commit

Permalink
[INLONG-11611][SDK] Transform SDK supports RowData source and sink (#…
Browse files Browse the repository at this point in the history
…11613)

Co-authored-by: vernedeng <[email protected]>
  • Loading branch information
vernedeng and vernedeng authored Dec 19, 2024
1 parent d31c887 commit 3a297fb
Show file tree
Hide file tree
Showing 12 changed files with 382 additions and 0 deletions.
18 changes: 18 additions & 0 deletions inlong-sdk/transform-sdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
<artifactId>sdk-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
Expand Down Expand Up @@ -110,6 +116,18 @@
<groupId>org.dom4j</groupId>
<artifactId>dom4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-format-common</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-format-rowdata-base</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.decode;

import org.apache.inlong.sort.formats.base.RowDataToFieldConverters;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.table.data.RowData;

import java.util.Map;

@Slf4j
public class RowDataSourceData implements SourceData {

private final RowData rowData;
private final Map<String, Integer> fieldPositionMap;
private final RowDataToFieldConverters.RowFieldConverter[] converters;

public RowDataSourceData(
RowData rowData,
Map<String, Integer> fieldPositionMap,
RowDataToFieldConverters.RowFieldConverter[] converters) {
this.rowData = rowData;
this.fieldPositionMap = fieldPositionMap;
this.converters = converters;
}

@Override
public int getRowCount() {
return 1;
}

@Override
public Object getField(int rowNum, String fieldName) {
if (rowNum != 0) {
return null;
}
try {
int fieldPosition = fieldPositionMap.get(fieldName);
return converters[fieldPosition].convert(rowData, fieldPosition);
} catch (Throwable e) {
log.error("failed to convert field={}", fieldName, e);
return null;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.decode;

import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.RowDataSourceInfo;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sort.formats.base.RowDataToFieldConverters;
import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils;

import org.apache.flink.table.data.RowData;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RowDataSourceDecoder extends SourceDecoder<RowData> {

private final Map<String, Integer> fieldPositionMap;
private final RowDataToFieldConverters.RowFieldConverter[] rowFieldConverters;

public RowDataSourceDecoder(RowDataSourceInfo sourceInfo) {
super(sourceInfo.getFields());
List<FieldInfo> fields = sourceInfo.getFields();
this.fieldPositionMap = parseFieldPositionMap(fields);

rowFieldConverters = new RowDataToFieldConverters.RowFieldConverter[fields.size()];
for (int i = 0; i < rowFieldConverters.length; i++) {
rowFieldConverters[i] = RowDataToFieldConverters.createNullableRowFieldConverter(
TableFormatForRowDataUtils.deriveLogicalType(fields.get(i).getFormatInfo()));
}
}

private Map<String, Integer> parseFieldPositionMap(List<FieldInfo> fields) {
Map<String, Integer> map = new HashMap<>();
for (int i = 0; i < fields.size(); i++) {
map.put(fields.get(i).getName(), i);
}
return map;
}

@Override
public SourceData decode(byte[] srcBytes, Context context) {
throw new UnsupportedOperationException("do not support decoding bytes for row data decoder");
}

@Override
public SourceData decode(RowData rowData, Context context) {
return new RowDataSourceData(rowData, fieldPositionMap, rowFieldConverters);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.ParquetSourceInfo;
import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
import org.apache.inlong.sdk.transform.pojo.RowDataSourceInfo;
import org.apache.inlong.sdk.transform.pojo.XmlSourceInfo;
import org.apache.inlong.sdk.transform.pojo.YamlSourceInfo;

Expand Down Expand Up @@ -65,4 +66,8 @@ public static YamlSourceDecoder createYamlDecoder(YamlSourceInfo sourceInfo) {
return new YamlSourceDecoder(sourceInfo);
}

public static RowDataSourceDecoder createRowDecoder(RowDataSourceInfo sourceInfo) {
return new RowDataSourceDecoder(sourceInfo);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.encode;

import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
import org.apache.inlong.sort.formats.base.TableFormatUtils;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RowDataSinkEncoder extends SinkEncoder<RowData> {

private final FieldToRowDataConverters.FieldToRowDataConverter[] fieldToRowDataConverters;
private final Map<String, Integer> fieldPositionMap;

public RowDataSinkEncoder(RowDataSinkInfo sinkInfo) {
super(sinkInfo.getFields());
this.fieldPositionMap = parseFieldPositionMap(fields);

fieldToRowDataConverters = new FieldToRowDataConverters.FieldToRowDataConverter[fields.size()];
for (int i = 0; i < fields.size(); i++) {
fieldToRowDataConverters[i] = FieldToRowDataConverters.createConverter(
TableFormatUtils.deriveLogicalType(fields.get(i).getFormatInfo()));
}
}

private Map<String, Integer> parseFieldPositionMap(List<FieldInfo> fields) {
Map<String, Integer> map = new HashMap<>();
for (int i = 0; i < fields.size(); i++) {
map.put(fields.get(i).getName(), i);
}
return map;
}

@Override
public RowData encode(SinkData sinkData, Context context) {
GenericRowData rowData = new GenericRowData(fieldToRowDataConverters.length);

for (int i = 0; i < fields.size(); i++) {
String fieldName = fields.get(i).getName();
String fieldValue = sinkData.getField(fieldName);
rowData.setField(i, fieldToRowDataConverters[i].convert(fieldValue));
}

return rowData;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
import org.apache.inlong.sdk.transform.pojo.ParquetSinkInfo;
import org.apache.inlong.sdk.transform.pojo.PbSinkInfo;
import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo;

public class SinkEncoderFactory {

Expand All @@ -45,4 +46,8 @@ public static PbSinkEncoder createPbEncoder(PbSinkInfo pbSinkInfo) {
return new PbSinkEncoder(pbSinkInfo);
}

public static RowDataSinkEncoder createRowEncoder(RowDataSinkInfo rowDataSinkInfo) {
return new RowDataSinkEncoder(rowDataSinkInfo);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.sdk.transform.pojo;

import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.sdk.transform.process.converter.TypeConverter;

import lombok.Data;
Expand All @@ -28,6 +29,7 @@
public class FieldInfo {

private String name;
private FormatInfo formatInfo;
private TypeConverter converter = TypeConverter.DefaultTypeConverter();

public FieldInfo() {
Expand All @@ -42,4 +44,9 @@ public FieldInfo(String name, TypeConverter converter) {
this.name = name;
this.converter = converter;
}

public FieldInfo(String name, TypeConverter converter, FormatInfo formatInfo) {
this(name, converter);
this.formatInfo = formatInfo;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.pojo;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import lombok.experimental.SuperBuilder;

import java.util.List;

@JsonIgnoreProperties(ignoreUnknown = true)
@Data
@SuperBuilder
public class RowDataSinkInfo extends SinkInfo {

private List<FieldInfo> fields;

public RowDataSinkInfo(String charset, List<FieldInfo> fields) {
super(SinkInfo.ROWDATA, charset);
this.fields = fields;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.pojo;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Getter;
import lombok.experimental.SuperBuilder;

import java.util.List;

@JsonIgnoreProperties(ignoreUnknown = true)
@SuperBuilder
@Getter
public class RowDataSourceInfo extends SourceInfo {

private List<FieldInfo> fields;

public RowDataSourceInfo(String charset, List<FieldInfo> fields) {
super(charset);
this.fields = fields;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public abstract class SinkInfo {
public static final String ES_MAP = "es_map";
public static final String PARQUET = "parquet";
public static final String PB = "pb";
public static final String ROWDATA = "rowdata";

@JsonIgnore
private String type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.sdk.transform.process.processor;

import org.apache.inlong.common.pojo.sort.dataflow.field.format.StringFormatInfo;
import org.apache.inlong.sdk.transform.decode.ParquetInputByteArray;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;

Expand Down Expand Up @@ -48,6 +49,7 @@ protected List<FieldInfo> getTestFieldList(String... fieldNames) {
for (String fieldName : fieldNames) {
FieldInfo field = new FieldInfo();
field.setName(fieldName);
field.setFormatInfo(new StringFormatInfo());
fields.add(field);
}
return fields;
Expand Down
Loading

0 comments on commit 3a297fb

Please sign in to comment.