Skip to content

Commit

Permalink
[INLONG-11382][SDK] Optimize all columns select of Transform SDK
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Oct 21, 2024
1 parent dd37f0a commit 22f3247
Show file tree
Hide file tree
Showing 20 changed files with 96 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import java.util.ArrayList;
import java.util.List;

public class AvroSourceDecoder implements SourceDecoder<byte[]> {
public class AvroSourceDecoder extends SourceDecoder<byte[]> {

private static final Logger LOG = LoggerFactory.getLogger(AvroSourceDecoder.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* BsonSourceDecoder
*/
@Slf4j
public class BsonSourceDecoder implements SourceDecoder<byte[]> {
public class BsonSourceDecoder extends SourceDecoder<byte[]> {

private final JsonSourceDecoder decoder;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,20 @@
import org.apache.commons.lang3.StringUtils;

import java.nio.charset.Charset;
import java.util.List;

/**
* CsvSourceDecoder
*
*/
public class CsvSourceDecoder implements SourceDecoder<String> {
public class CsvSourceDecoder extends SourceDecoder<String> {

protected CsvSourceInfo sourceInfo;
private Charset srcCharset = Charset.defaultCharset();
private Character delimiter = '|';
private Character escapeChar = null;
private List<FieldInfo> fields;

public CsvSourceDecoder(CsvSourceInfo sourceInfo) {
super(sourceInfo.getFields());
this.sourceInfo = sourceInfo;
if (sourceInfo.getDelimiter() != null) {
this.delimiter = sourceInfo.getDelimiter();
Expand All @@ -50,7 +49,6 @@ public CsvSourceDecoder(CsvSourceInfo sourceInfo) {
if (!StringUtils.isBlank(sourceInfo.getCharset())) {
this.srcCharset = Charset.forName(sourceInfo.getCharset());
}
this.fields = sourceInfo.getFields();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
* JsonSourceDecoder
*
*/
public class JsonSourceDecoder implements SourceDecoder<String> {
public class JsonSourceDecoder extends SourceDecoder<String> {

protected JsonSourceInfo sourceInfo;
private Charset srcCharset = Charset.defaultCharset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* KvSourceDecoder
*
*/
public class KvSourceDecoder implements SourceDecoder<String> {
public class KvSourceDecoder extends SourceDecoder<String> {

protected KvSourceInfo sourceInfo;
private Character entryDelimiter = '&';
Expand All @@ -41,9 +41,9 @@ public class KvSourceDecoder implements SourceDecoder<String> {
private Character quoteChar = '\"';
private Character lineDelimiter = '\n';
private Charset srcCharset = Charset.defaultCharset();
private List<FieldInfo> fields;

public KvSourceDecoder(KvSourceInfo sourceInfo) {
super(sourceInfo.getFields());
this.sourceInfo = sourceInfo;
if (!StringUtils.isBlank(sourceInfo.getCharset())) {
this.srcCharset = Charset.forName(sourceInfo.getCharset());
Expand All @@ -63,8 +63,6 @@ public KvSourceDecoder(KvSourceInfo sourceInfo) {
if (sourceInfo.getLineDelimiter() != null) {
this.lineDelimiter = sourceInfo.getLineDelimiter();
}

this.fields = sourceInfo.getFields();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
/**
* PbSourceDecoder
*/
public class ParquetSourceDecoder implements SourceDecoder<byte[]> {
public class ParquetSourceDecoder extends SourceDecoder<byte[]> {

private static final Logger LOG = LoggerFactory.getLogger(ParquetSourceDecoder.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* PbSourceDecoder
*
*/
public class PbSourceDecoder implements SourceDecoder<byte[]> {
public class PbSourceDecoder extends SourceDecoder<byte[]> {

private static final Logger LOG = LoggerFactory.getLogger(PbSourceDecoder.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,32 @@

package org.apache.inlong.sdk.transform.decode;

import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.process.Context;

import com.google.common.collect.ImmutableList;
import lombok.Getter;

import java.util.List;

/**
* SourceDecoder
*/
public interface SourceDecoder<Input> {
@Getter
public abstract class SourceDecoder<Input> {

protected final List<FieldInfo> fields;

public SourceDecoder() {
this(ImmutableList.of());
}

public SourceDecoder(List<FieldInfo> fields) {
this.fields = fields;
}

SourceData decode(byte[] srcBytes, Context context);
public abstract SourceData decode(byte[] srcBytes, Context context);

SourceData decode(Input input, Context context);
public abstract SourceData decode(Input input, Context context);

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
* XmlSourceDecoder
*/
@Slf4j
public class XmlSourceDecoder implements SourceDecoder<String> {
public class XmlSourceDecoder extends SourceDecoder<String> {

protected XmlSourceInfo sourceInfo;
private Charset srcCharset = Charset.defaultCharset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.Map;

@Slf4j
public class YamlSourceDecoder implements SourceDecoder<String> {
public class YamlSourceDecoder extends SourceDecoder<String> {

protected YamlSourceInfo sourceInfo;
private Charset srcCharset = Charset.defaultCharset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,20 @@
import org.apache.commons.lang3.StringUtils;

import java.nio.charset.Charset;
import java.util.List;

/**
* CsvSinkEncoder
*/
public class CsvSinkEncoder implements SinkEncoder<String> {
public class CsvSinkEncoder extends SinkEncoder<String> {

protected CsvSinkInfo sinkInfo;
protected Charset sinkCharset = Charset.defaultCharset();
private Character delimiter = '|';
private Character escapeChar = null;
private List<FieldInfo> fields;
private StringBuilder builder = new StringBuilder();

public CsvSinkEncoder(CsvSinkInfo sinkInfo) {
super(sinkInfo.getFields());
this.sinkInfo = sinkInfo;
if (sinkInfo.getDelimiter() != null) {
this.delimiter = sinkInfo.getDelimiter();
Expand All @@ -49,7 +48,6 @@ public CsvSinkEncoder(CsvSinkInfo sinkInfo) {
if (!StringUtils.isBlank(sinkInfo.getCharset())) {
this.sinkCharset = Charset.forName(sinkInfo.getCharset());
}
this.fields = sinkInfo.getFields();
}

/**
Expand Down Expand Up @@ -89,11 +87,4 @@ public String encode(SinkData sinkData, Context context) {
return builder.substring(0, builder.length() - 1);
}

/**
* get fields
* @return the fields
*/
public List<FieldInfo> getFields() {
return fields;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,20 @@
import org.apache.commons.lang3.StringUtils;

import java.nio.charset.Charset;
import java.util.List;

/**
* KvSinkEncoder
*/
public class KvSinkEncoder implements SinkEncoder<String> {
public class KvSinkEncoder extends SinkEncoder<String> {

protected KvSinkInfo sinkInfo;
protected Charset sinkCharset = Charset.defaultCharset();
private Character entryDelimiter = '&';
private Character kvDelimiter = '=';
private List<FieldInfo> fields;
private StringBuilder builder = new StringBuilder();

public KvSinkEncoder(KvSinkInfo sinkInfo) {
super(sinkInfo.getFields());
this.sinkInfo = sinkInfo;
if (!StringUtils.isBlank(sinkInfo.getCharset())) {
this.sinkCharset = Charset.forName(sinkInfo.getCharset());
Expand All @@ -49,7 +48,6 @@ public KvSinkEncoder(KvSinkInfo sinkInfo) {
if (sinkInfo.getKvDelimiter() != null) {
this.kvDelimiter = sinkInfo.getKvDelimiter();
}
this.fields = sinkInfo.getFields();
}

/**
Expand Down Expand Up @@ -78,12 +76,4 @@ public String encode(SinkData sinkData, Context context) {
}
return builder.substring(0, builder.length() - 1);
}

/**
* get fields
* @return the fields
*/
public List<FieldInfo> getFields() {
return fields;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Slf4j
public class MapSinkEncoder implements SinkEncoder<Map<String, Object>> {
public class MapSinkEncoder extends SinkEncoder<Map<String, Object>> {

private final MapSinkInfo sinkInfo;
private final Map<String, TypeConverter> converters;

public MapSinkEncoder(MapSinkInfo sinkInfo) {
super(sinkInfo.getFields());
this.sinkInfo = sinkInfo;
this.converters = sinkInfo.getFields()
.stream()
Expand All @@ -47,7 +47,7 @@ public MapSinkEncoder(MapSinkInfo sinkInfo) {
@Override
public Map<String, Object> encode(SinkData sinkData, Context context) {
Map<String, Object> esMap = new HashMap<>();
for (FieldInfo fieldInfo : sinkInfo.getFields()) {
for (FieldInfo fieldInfo : fields) {
String fieldName = fieldInfo.getName();
String strValue = sinkData.getField(fieldName);
TypeConverter converter = converters.get(fieldName);
Expand All @@ -65,9 +65,4 @@ public Map<String, Object> encode(SinkData sinkData, Context context) {

return esMap;
}

@Override
public List<FieldInfo> getFields() {
return sinkInfo.getFields();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -38,17 +37,14 @@
/**
* ParquetSinkEncoder
*/
public class ParquetSinkEncoder implements SinkEncoder<ByteArrayOutputStream> {
public class ParquetSinkEncoder extends SinkEncoder<ByteArrayOutputStream> {

protected ParquetSinkInfo sinkInfo;
protected Charset sinkCharset = Charset.defaultCharset();

private final List<FieldInfo> fields;
private ParquetByteArrayWriter<Object[]> writer;

public ParquetSinkEncoder(ParquetSinkInfo sinkInfo) {
super(sinkInfo.getFields());
this.sinkInfo = sinkInfo;
this.fields = sinkInfo.getFields();
ArrayList<Type> typesList = new ArrayList<>();
for (FieldInfo fieldInfo : this.fields) {
typesList.add(Types.required(BINARY)
Expand Down Expand Up @@ -88,10 +84,6 @@ public ByteArrayOutputStream encode(SinkData sinkData, Context context) {
return writer.getByteArrayOutputStream();
}

@Override
public List<FieldInfo> getFields() {
return this.fields;
}
public byte[] mergeByteArray(List<ByteArrayOutputStream> list) {
if (list.isEmpty()) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@

import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PbSinkEncoder implements SinkEncoder<byte[]> {
public class PbSinkEncoder extends SinkEncoder<byte[]> {

protected PbSinkInfo sinkInfo;

Expand All @@ -40,9 +39,10 @@ public class PbSinkEncoder implements SinkEncoder<byte[]> {
private final Map<String, Descriptors.FieldDescriptor.Type> fieldTypes;

public PbSinkEncoder(PbSinkInfo pbSinkInfo) {
super(pbSinkInfo.getFields());
this.sinkInfo = pbSinkInfo;
this.fieldTypes = new HashMap<>();
for (FieldInfo field : pbSinkInfo.getFields()) {
for (FieldInfo field : this.fields) {
fieldTypes.put(field.getName(), Descriptors.FieldDescriptor.Type.STRING);
}
// decode protoDescription
Expand Down Expand Up @@ -108,8 +108,4 @@ private Object convertValue(Descriptors.FieldDescriptor fieldDescriptor, Object
}
}

@Override
public List<FieldInfo> getFields() {
return sinkInfo.getFields();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,28 @@
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.process.Context;

import com.google.common.collect.ImmutableList;
import lombok.Getter;

import java.util.List;

/**
* SinkEncoder
*/
public interface SinkEncoder<Output> {
@Getter
public abstract class SinkEncoder<Output> {

public static final String ALL_SOURCE_FIELD_SIGN = "*";

Output encode(SinkData sinkData, Context context);
protected final List<FieldInfo> fields;

public SinkEncoder() {
this(ImmutableList.of());
}

public SinkEncoder(List<FieldInfo> fields) {
this.fields = fields;
}

List<FieldInfo> getFields();
public abstract Output encode(SinkData sinkData, Context context);
}
Loading

0 comments on commit 22f3247

Please sign in to comment.