From ea146704dae3821657df5f214dc9f4031ec8c472 Mon Sep 17 00:00:00 2001 From: vernedeng Date: Thu, 19 Dec 2024 11:11:37 +0800 Subject: [PATCH] [INLONG-11611][SDK] Transform SDK supports RowData source and sink --- .../transform/decode/AvroSourceDecoder.java | 2 +- .../transform/decode/BsonSourceDecoder.java | 2 +- .../transform/decode/CsvSourceDecoder.java | 2 +- .../transform/decode/JsonSourceDecoder.java | 2 +- .../sdk/transform/decode/KvSourceDecoder.java | 2 +- .../decode/ParquetSourceDecoder.java | 2 +- .../sdk/transform/decode/PbSourceDecoder.java | 7 +- .../decode/RowDataSourceDecoder.java | 2 +- .../sdk/transform/decode/SourceDecoder.java | 2 +- .../transform/decode/XmlSourceDecoder.java | 2 +- .../transform/decode/YamlSourceDecoder.java | 2 +- .../processor/AbstractProcessorTestBase.java | 2 + .../TestRowData2RowDataProcessor.java | 71 +++++++++++++++++++ 13 files changed, 84 insertions(+), 16 deletions(-) create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestRowData2RowDataProcessor.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java index a2ebb9cd839..992dea88c6b 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java @@ -66,7 +66,7 @@ public AvroSourceDecoder(AvroSourceInfo sourceInfo) { } @Override - public SourceData decodeBytes(byte[] srcBytes, Context context) { + public SourceData decode(byte[] srcBytes, Context context) { try { InputStream inputStream = new ByteArrayInputStream(srcBytes); DataFileStream dataFileStream = diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceDecoder.java 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceDecoder.java index c760ef8cc39..6dcde13adeb 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceDecoder.java @@ -41,7 +41,7 @@ public BsonSourceDecoder(BsonSourceInfo sourceInfo) { } @Override - public SourceData decodeBytes(byte[] srcBytes, Context context) { + public SourceData decode(byte[] srcBytes, Context context) { return decoder.decode(parse(srcBytes), context); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java index 7e389451b01..830d5c2c013 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java @@ -52,7 +52,7 @@ public CsvSourceDecoder(CsvSourceInfo sourceInfo) { } @Override - public SourceData decodeBytes(byte[] srcBytes, Context context) { + public SourceData decode(byte[] srcBytes, Context context) { String srcString = new String(srcBytes, srcCharset); return this.decode(srcString, context); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java index 8823e121768..0332094090e 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java @@ -70,7 +70,7 @@ public JsonSourceDecoder(JsonSourceInfo sourceInfo) { * @return */ @Override - public SourceData decodeBytes(byte[] 
srcBytes, Context context) { + public SourceData decode(byte[] srcBytes, Context context) { String srcString = new String(srcBytes, srcCharset); return this.decode(srcString, context); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java index c0c4600cd7e..e62fcccc375 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java @@ -66,7 +66,7 @@ public KvSourceDecoder(KvSourceInfo sourceInfo) { } @Override - public SourceData decodeBytes(byte[] srcBytes, Context context) { + public SourceData decode(byte[] srcBytes, Context context) { String srcString = new String(srcBytes, srcCharset); return this.decode(srcString, context); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java index cda3f7f6a45..85e3ba319dd 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java @@ -68,7 +68,7 @@ public ParquetSourceDecoder(ParquetSourceInfo sourceInfo) { @SuppressWarnings("unchecked") @Override - public SourceData decodeBytes(byte[] srcBytes, Context context) { + public SourceData decode(byte[] srcBytes, Context context) { try { // Create a custom InputFile InputFile inputFile = new ParquetInputByteArray(srcBytes); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java index 
cd4da6fbfe1..4ee704bd0ea 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java @@ -96,7 +96,7 @@ public PbSourceDecoder(PbSourceInfo sourceInfo) { */ @SuppressWarnings("unchecked") @Override - public SourceData decodeBytes(byte[] srcBytes, Context context) { + public SourceData decode(byte[] srcBytes, Context context) { try { // decode DynamicMessage.Builder builder = DynamicMessage.newBuilder(rootDesc); @@ -151,9 +151,4 @@ public SourceData decodeBytes(byte[] srcBytes, Context context) { return null; } } - - @Override - public SourceData decode(byte[] bytes, Context context) { - return decodeBytes(bytes, context); - } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java index 57fee682f4f..af91225ed41 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java @@ -54,7 +54,7 @@ private Map parseFieldPositionMap(List fields) { } @Override - public SourceData decodeBytes(byte[] srcBytes, Context context) { + public SourceData decode(byte[] srcBytes, Context context) { throw new UnsupportedOperationException("do not support decoding bytes for row data decoder"); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java index 620d28147bb..26bbbbbaac1 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java +++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java @@ -41,7 +41,7 @@ public SourceDecoder(List fields) { this.fields = fields; } - public abstract SourceData decodeBytes(byte[] srcBytes, Context context); + public abstract SourceData decode(byte[] srcBytes, Context context); public abstract SourceData decode(Input input, Context context); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceDecoder.java index 45e3149cf89..a9a6bb9a66c 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceDecoder.java @@ -58,7 +58,7 @@ public XmlSourceDecoder(XmlSourceInfo sourceInfo) { } @Override - public SourceData decodeBytes(byte[] srcBytes, Context context) { + public SourceData decode(byte[] srcBytes, Context context) { String srcString = new String(srcBytes, srcCharset); return this.decode(srcString, context); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java index 130b5d0bc54..5be1a0a93ff 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java @@ -52,7 +52,7 @@ public YamlSourceDecoder(YamlSourceInfo sourceInfo) { } } @Override - public SourceData decodeBytes(byte[] srcBytes, Context context) { + public SourceData decode(byte[] srcBytes, Context context) { String srcString = new String(srcBytes, srcCharset); return this.decode(srcString, context); } diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
index 3322d831992..bc0fb0371e5 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sdk.transform.process.processor;
 
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.StringFormatInfo;
 import org.apache.inlong.sdk.transform.decode.ParquetInputByteArray;
 import org.apache.inlong.sdk.transform.pojo.FieldInfo;
 
@@ -48,6 +49,7 @@ protected List<FieldInfo> getTestFieldList(String... fieldNames) {
         for (String fieldName : fieldNames) {
             FieldInfo field = new FieldInfo();
             field.setName(fieldName);
+            field.setFormatInfo(new StringFormatInfo());
             fields.add(field);
         }
         return fields;
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestRowData2RowDataProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestRowData2RowDataProcessor.java
new file mode 100644
index 00000000000..7a1b3c53ba8
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestRowData2RowDataProcessor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.processor;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.RowDataSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/** Tests transforming a Flink {@link RowData} source record into a {@link RowData} sink record. */
+public class TestRowData2RowDataProcessor extends AbstractProcessorTestBase {
+
+    /** Projects a four-field source row in a different order and checks every sink position. */
+    @Test
+    public void testRowData2RowData() throws Exception {
+        List<FieldInfo> sourceFields = this.getTestFieldList("sid", "packageID", "msgTime", "msg");
+        RowDataSourceInfo sourceInfo = new RowDataSourceInfo("utf-8", sourceFields);
+        List<FieldInfo> sinkFields = this.getTestFieldList("f1", "f2", "f3", "f4");
+        RowDataSinkInfo sinkInfo = new RowDataSinkInfo("utf-8", sinkFields);
+
+        String transformSql = "select msgTime ,msg, packageID, sid";
+        TransformConfig config = new TransformConfig(transformSql);
+        TransformProcessor<RowData, RowData> processor =
+                TransformProcessor.create(config, SourceDecoderFactory.createRowDecoder(sourceInfo),
+                        SinkEncoderFactory.createRowEncoder(sinkInfo));
+
+        List<RowData> sinkRows = processor.transform(createRowData());
+        // Check the size first so an empty result fails as an assertion, not inside get(0).
+        Assert.assertEquals("expected exactly one sink row", 1, sinkRows.size());
+        RowData actualRow = sinkRows.get(0);
+        Assert.assertEquals("2024-12-19T11:00:55.212", actualRow.getString(0).toString());
+        Assert.assertEquals("msg111", actualRow.getString(1).toString());
+        Assert.assertEquals("pack123", actualRow.getString(2).toString());
+        Assert.assertEquals("s123", actualRow.getString(3).toString());
+    }
+
+    /** Builds the source row; positions match the source fields sid, packageID, msgTime, msg. */
+    private RowData createRowData() {
+        GenericRowData rowData = new GenericRowData(4);
+        rowData.setField(0, StringData.fromString("s123"));
+        rowData.setField(1, StringData.fromString("pack123"));
+        rowData.setField(2, StringData.fromString("2024-12-19T11:00:55.212"));
+        rowData.setField(3, StringData.fromString("msg111"));
+        return rowData;
+    }
+}