From 535059e25d7f3d5630f2f592c557f80a8917e8a4 Mon Sep 17 00:00:00 2001 From: vernedeng Date: Thu, 19 Dec 2024 22:02:25 +0800 Subject: [PATCH] [INLONG-11616][SDK] Use self-defined Field and RowData conversion utils (#11617) --- .../transform/decode/RowDataSourceData.java | 6 +- .../decode/RowDataSourceDecoder.java | 8 +- .../transform/encode/RowDataSinkEncoder.java | 23 +- .../transform/utils/FieldToRowDataUtils.java | 135 +++++++++ .../transform/utils/RowToFieldDataUtils.java | 258 ++++++++++++++++++ .../base/FieldToRowDataConverters.java | 13 +- 6 files changed, 414 insertions(+), 29 deletions(-) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/RowToFieldDataUtils.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java index 3e6ee9fc397..fbfd4234974 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java @@ -17,7 +17,7 @@ package org.apache.inlong.sdk.transform.decode; -import org.apache.inlong.sort.formats.base.RowDataToFieldConverters; +import org.apache.inlong.sdk.transform.utils.RowToFieldDataUtils; import lombok.extern.slf4j.Slf4j; import org.apache.flink.table.data.RowData; @@ -29,12 +29,12 @@ public class RowDataSourceData implements SourceData { private final RowData rowData; private final Map fieldPositionMap; - private final RowDataToFieldConverters.RowFieldConverter[] converters; + private final RowToFieldDataUtils.RowFieldConverter[] converters; public RowDataSourceData( RowData rowData, Map fieldPositionMap, - RowDataToFieldConverters.RowFieldConverter[] converters) { + RowToFieldDataUtils.RowFieldConverter[] converters) { this.rowData = rowData; this.fieldPositionMap = fieldPositionMap; this.converters = converters; diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java index fdd6c4ce088..a82fc2ac6b9 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java @@ -20,7 +20,7 @@ import org.apache.inlong.sdk.transform.pojo.FieldInfo; import org.apache.inlong.sdk.transform.pojo.RowDataSourceInfo; import org.apache.inlong.sdk.transform.process.Context; -import org.apache.inlong.sort.formats.base.RowDataToFieldConverters; +import org.apache.inlong.sdk.transform.utils.RowToFieldDataUtils; import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils; import org.apache.flink.table.data.RowData; @@ -32,16 +32,16 @@ public class RowDataSourceDecoder extends SourceDecoder { private final Map fieldPositionMap; - private final RowDataToFieldConverters.RowFieldConverter[] rowFieldConverters; + private final RowToFieldDataUtils.RowFieldConverter[] rowFieldConverters; public RowDataSourceDecoder(RowDataSourceInfo sourceInfo) { super(sourceInfo.getFields()); List fields = sourceInfo.getFields(); this.fieldPositionMap = parseFieldPositionMap(fields); - rowFieldConverters = new RowDataToFieldConverters.RowFieldConverter[fields.size()]; + rowFieldConverters = new RowToFieldDataUtils.RowFieldConverter[fields.size()]; for (int i = 0; i < rowFieldConverters.length; i++) { - rowFieldConverters[i] = RowDataToFieldConverters.createNullableRowFieldConverter( + rowFieldConverters[i] = RowToFieldDataUtils.createNullableRowFieldConverter( TableFormatForRowDataUtils.deriveLogicalType(fields.get(i).getFormatInfo())); } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java index f2203cb3cd0..1cab9f6fe3c 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java @@ -17,43 +17,28 @@ package org.apache.inlong.sdk.transform.encode; -import org.apache.inlong.sdk.transform.pojo.FieldInfo; import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo; import org.apache.inlong.sdk.transform.process.Context; -import org.apache.inlong.sort.formats.base.FieldToRowDataConverters; +import org.apache.inlong.sdk.transform.utils.FieldToRowDataUtils; import org.apache.inlong.sort.formats.base.TableFormatUtils; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - public class RowDataSinkEncoder extends SinkEncoder { - private final FieldToRowDataConverters.FieldToRowDataConverter[] fieldToRowDataConverters; - private final Map fieldPositionMap; + private final FieldToRowDataUtils.FieldToRowDataConverter[] fieldToRowDataConverters; public RowDataSinkEncoder(RowDataSinkInfo sinkInfo) { super(sinkInfo.getFields()); - this.fieldPositionMap = parseFieldPositionMap(fields); - fieldToRowDataConverters = new FieldToRowDataConverters.FieldToRowDataConverter[fields.size()]; + fieldToRowDataConverters = new FieldToRowDataUtils.FieldToRowDataConverter[fields.size()]; for (int i = 0; i < fields.size(); i++) { - fieldToRowDataConverters[i] = FieldToRowDataConverters.createConverter( + fieldToRowDataConverters[i] = FieldToRowDataUtils.createConverter( TableFormatUtils.deriveLogicalType(fields.get(i).getFormatInfo())); } } - private Map parseFieldPositionMap(List fields) { - Map map = new HashMap<>(); - for (int i = 0; i < fields.size(); i++) { - map.put(fields.get(i).getName(), i); - } - return map; - } - @Override public RowData encode(SinkData sinkData, Context context) { GenericRowData rowData = new GenericRowData(fieldToRowDataConverters.length); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java new file mode 100644 index 00000000000..e27c0da21fe --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.utils; + +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +public class FieldToRowDataUtils { + + private static final long serialVersionUID = 1L; + + /** + * Base class of Field To RowData Converters. + */ + public interface FieldToRowDataConverter extends Serializable { + + Object convert(Object obj); + } + + public static FieldToRowDataConverter createConverter(LogicalType logicalType) { + return wrapIntoNullableConverter(createFieldRowConverter(logicalType)); + } + + private static FieldToRowDataConverter wrapIntoNullableConverter( + FieldToRowDataConverter converter) { + return obj -> { + if (obj == null) { + return null; + } + return converter.convert(obj); + }; + } + + private static FieldToRowDataConverter createFieldRowConverter(LogicalType fieldType) { + switch (fieldType.getTypeRoot()) { + case NULL: + return (obj) -> null; + case BOOLEAN: + return obj -> Boolean.parseBoolean(obj.toString()); + case TINYINT: + return obj -> Byte.parseByte(obj.toString()); + case SMALLINT: + return obj -> Short.parseShort(obj.toString()); + case INTERVAL_YEAR_MONTH: + case INTEGER: + return obj -> Integer.parseInt(obj.toString()); + case INTERVAL_DAY_TIME: + case BIGINT: + return obj -> Long.parseLong(obj.toString()); + case FLOAT: + return obj -> Float.parseFloat(obj.toString()); + case DOUBLE: + return obj -> Double.parseDouble(obj.toString()); + case BINARY: + case VARBINARY: + return obj -> obj.toString().getBytes(); + case CHAR: + case VARCHAR: + return (obj -> StringData.fromString((String) obj)); + case DATE: + return (obj -> ((Date) obj).toLocalDate().toEpochDay()); + case TIME_WITHOUT_TIME_ZONE: + return (obj -> ((Time) obj).toLocalTime().toSecondOfDay() * 1000); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_TIME_ZONE: + return obj -> TimestampData.fromTimestamp((Timestamp) obj); + case DECIMAL: + return obj -> DecimalData.fromBigDecimal( + (BigDecimal) obj, + DecimalType.DEFAULT_PRECISION, + DecimalType.DEFAULT_SCALE); + case ARRAY: + return obj -> { + final Object[] array = (Object[]) obj; + FieldToRowDataConverter elementConverter = + createFieldRowConverter(((ArrayType) fieldType).getElementType()); + Object[] converted = Arrays.stream(array) + .map(elementConverter::convert) + .toArray(); + return new GenericArrayData(converted); + }; + case MAP: + return obj -> { + FieldToRowDataConverter keyConverter = + createFieldRowConverter(((MapType) fieldType).getKeyType()); + FieldToRowDataConverter valueConverter = + createFieldRowConverter(((MapType) fieldType).getValueType()); + Map map = (Map) obj; + Map internalMap = new HashMap<>(); + for (Object k : map.keySet()) { + internalMap.put(keyConverter.convert(k), + valueConverter.convert(map.get(k))); + } + return new GenericMapData(internalMap); + }; + case ROW: + case MULTISET: + case RAW: + default: + throw new UnsupportedOperationException("Unsupported type:" + fieldType); + } + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/RowToFieldDataUtils.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/RowToFieldDataUtils.java new file mode 100644 index 00000000000..878c87ced04 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/RowToFieldDataUtils.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.utils; + +import org.apache.inlong.sort.formats.base.RowDataToFieldConverters; + +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.types.Row; + +import java.io.Serializable; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalTime; +import java.util.Arrays; + +public class RowToFieldDataUtils { + + private static final long serialVersionUID = 1L; + + /** + * Runtime converter that converts objects of Flink Table & SQL internal data structures to + * corresponding {@link Object}s. + */ + public interface RowFieldConverter extends Serializable { + + Object convert(RowData row, int pos); + } + + private interface ArrayElementConverter extends Serializable { + + Object convert(ArrayData array, int pos); + } + + public static RowFieldConverter createNullableRowFieldConverter(LogicalType fieldType) { + final RowFieldConverter fieldConverter = createRowFieldConverter(fieldType); + return (row, pos) -> { + if (row.isNullAt(pos)) { + return null; + } + return fieldConverter.convert(row, pos); + }; + } + + private static RowFieldConverter createRowFieldConverter(LogicalType fieldType) { + switch (fieldType.getTypeRoot()) { + case NULL: + return (row, pos) -> null; + case BOOLEAN: + return RowData::getBoolean; + case TINYINT: + return RowData::getByte; + case SMALLINT: + return RowData::getShort; + case INTEGER: + case INTERVAL_YEAR_MONTH: + return RowData::getInt; + case BIGINT: + case INTERVAL_DAY_TIME: + return RowData::getLong; + case FLOAT: + return RowData::getFloat; + case DOUBLE: + return RowData::getDouble; + case CHAR: + case VARCHAR: + return (row, pos) -> row.getString(pos).toString(); + case BINARY: + case VARBINARY: + return RowData::getBinary; + case DATE: + return (row, pos) -> convertDate(row.getLong(pos)); + case TIME_WITHOUT_TIME_ZONE: + return (row, pos) -> convertTime(row.getInt(pos)); + case TIMESTAMP_WITHOUT_TIME_ZONE: + final int timestampPrecision = ((TimestampType) fieldType).getPrecision(); + return (row, pos) -> convertTimestamp( + row.getTimestamp(pos, timestampPrecision)); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int zonedTimestampPrecision = + ((LocalZonedTimestampType) fieldType).getPrecision(); + return (row, pos) -> convertTimestamp( + row.getTimestamp(pos, zonedTimestampPrecision)); + case DECIMAL: + return createDecimalRowFieldConverter((DecimalType) fieldType); + case ARRAY: + return createArrayRowFieldConverter((ArrayType) fieldType); + case ROW: + return createRowRowFieldConverter((RowType) fieldType); + case MAP: + case MULTISET: + case RAW: + default: + throw new UnsupportedOperationException("Unsupported type: " + fieldType); + } + } + + private static ArrayElementConverter createNullableArrayElementConverter( + LogicalType fieldType) { + final ArrayElementConverter elementConverter = createArrayElementConverter(fieldType); + return (array, pos) -> { + if (array.isNullAt(pos)) { + return null; + } + return elementConverter.convert(array, pos); + }; + } + + private static ArrayElementConverter createArrayElementConverter(LogicalType fieldType) { + switch (fieldType.getTypeRoot()) { + case NULL: + return (array, pos) -> null; + case BOOLEAN: + return ArrayData::getBoolean; + case TINYINT: + return ArrayData::getByte; + case SMALLINT: + return ArrayData::getShort; + case INTEGER: + case INTERVAL_YEAR_MONTH: + return ArrayData::getInt; + case BIGINT: + case INTERVAL_DAY_TIME: + return ArrayData::getLong; + case FLOAT: + return ArrayData::getFloat; + case DOUBLE: + return ArrayData::getDouble; + case CHAR: + case VARCHAR: + return (array, pos) -> array.getString(pos).toString(); + case BINARY: + case VARBINARY: + return ArrayData::getBinary; + case DATE: + return (array, pos) -> convertDate(array.getLong(pos)); + case TIME_WITHOUT_TIME_ZONE: + return (array, pos) -> convertTime(array.getInt(pos)); + case TIMESTAMP_WITHOUT_TIME_ZONE: + final int timestampPrecision = ((TimestampType) fieldType).getPrecision(); + return (array, pos) -> convertTimestamp( + array.getTimestamp(pos, timestampPrecision)); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int localZonedTimestampPrecision = + ((LocalZonedTimestampType) fieldType).getPrecision(); + return (array, pos) -> convertTimestamp( + array.getTimestamp(pos, localZonedTimestampPrecision)); + case DECIMAL: + return createDecimalArrayElementConverter((DecimalType) fieldType); + // we don't support ARRAY and ROW in an ARRAY, see + // CsvRowSchemaConverter#validateNestedField + case ARRAY: + case ROW: + case MAP: + case MULTISET: + case RAW: + default: + throw new UnsupportedOperationException("Unsupported type: " + fieldType); + } + } + + // ------------------------------------------------------------------------------------------ + // Field/Element Converters + // ------------------------------------------------------------------------------------------ + + private static RowFieldConverter createDecimalRowFieldConverter(DecimalType decimalType) { + final int precision = decimalType.getPrecision(); + final int scale = decimalType.getScale(); + return (row, pos) -> { + DecimalData decimal = row.getDecimal(pos, precision, scale); + return decimal.toBigDecimal(); + }; + } + + private static ArrayElementConverter createDecimalArrayElementConverter( + DecimalType decimalType) { + final int precision = decimalType.getPrecision(); + final int scale = decimalType.getScale(); + return (array, pos) -> { + DecimalData decimal = array.getDecimal(pos, precision, scale); + return decimal.toBigDecimal(); + }; + } + + private static Date convertDate(long days) { + LocalDate localDate = LocalDate.ofEpochDay(days); + return Date.valueOf(localDate); + } + + private static Time convertTime(int millisecond) { + LocalTime time = LocalTime.ofNanoOfDay(millisecond * 1000_000L); + return Time.valueOf(time); + } + + private static Timestamp convertTimestamp(TimestampData timestamp) { + return timestamp.toTimestamp(); + } + + private static RowFieldConverter createArrayRowFieldConverter(ArrayType type) { + LogicalType elementType = type.getElementType(); + final ArrayElementConverter elementConverter = + createNullableArrayElementConverter(elementType); + return (row, pos) -> { + ArrayData arrayData = row.getArray(pos); + int numElements = arrayData.size(); + Object[] result = new Object[numElements]; + for (int i = 0; i < numElements; i++) { + result[i] = elementConverter.convert(arrayData, i); + } + return result; + }; + } + + private static RowFieldConverter createRowRowFieldConverter(RowType type) { + LogicalType[] fieldTypes = + type.getFields().stream() + .map(RowType.RowField::getType) + .toArray(LogicalType[]::new); + final RowFieldConverter[] fieldConverters = + Arrays.stream(fieldTypes) + .map(RowDataToFieldConverters::createNullableRowFieldConverter) + .toArray(RowFieldConverter[]::new); + final int rowArity = type.getFieldCount(); + return (row, pos) -> { + final RowData value = row.getRow(pos, rowArity); + Row result = new Row(rowArity); + for (int i = 0; i < rowArity; i++) { + result.setField(i, fieldConverters[i].convert(value, i)); + } + return result; + }; + } +} diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/FieldToRowDataConverters.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/FieldToRowDataConverters.java index 8d6a5473f77..adeee86fa63 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/FieldToRowDataConverters.java +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/FieldToRowDataConverters.java @@ -70,17 +70,24 @@ private static FieldToRowDataConverter createFieldRowConverter(LogicalType field case NULL: return (obj) -> null; case BOOLEAN: + return obj -> Boolean.parseBoolean(obj.toString()); case TINYINT: + return obj -> Byte.parseByte(obj.toString()); case SMALLINT: - case INTEGER: + return obj -> Short.parseShort(obj.toString()); case INTERVAL_YEAR_MONTH: - case BIGINT: + case INTEGER: + return obj -> Integer.parseInt(obj.toString()); case INTERVAL_DAY_TIME: + case BIGINT: + return obj -> Long.parseLong(obj.toString()); case FLOAT: + return obj -> Float.parseFloat(obj.toString()); case DOUBLE: + return obj -> Double.parseDouble(obj.toString()); case BINARY: case VARBINARY: - return (obj) -> obj; + return obj -> obj.toString().getBytes(); case CHAR: case VARCHAR: return (obj -> StringData.fromString((String) obj));