From 2dd9bf7fe145280cac78269bc0900cb866ac4499 Mon Sep 17 00:00:00 2001 From: XiaoYou201 Date: Mon, 13 May 2024 10:45:58 +0800 Subject: [PATCH 1/8] [INLONG-10193][Sort] Postgres connector support audit ID. --- .../sort/postgre/PostgreSQLTableFactory.java | 17 +- .../sort/postgre/PostgreSQLTableSource.java | 258 +++++++ .../sort/postgre/PostgresValueValidator.java | 48 ++ .../RowDataDebeziumDeserializeSchema.java | 682 ++++++++++++++++++ licenses/inlong-sort-connectors/LICENSE | 8 + 5 files changed, 1009 insertions(+), 4 deletions(-) create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java index 9c659995ca2..1ec6403ffea 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java @@ -17,14 +17,14 @@ package org.apache.inlong.sort.postgre; -import com.ververica.cdc.connectors.postgres.table.PostgreSQLTableSource; +import org.apache.inlong.sort.base.metric.MetricOption; + import com.ververica.cdc.debezium.table.DebeziumChangelogMode; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.source.DynamicTableSource; -import org.apache.flink.table.factories.DynamicTableFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; @@ -116,7 +116,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory { + "\"upsert\": Encodes changes as upsert stream that describes idempotent updates on a key. It can be used for tables with primary keys when replica identity FULL is not an option."); @Override - public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) { + public DynamicTableSource createDynamicTableSource(Context context) { final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); helper.validateExcept(DEBEZIUM_OPTIONS_PREFIX); @@ -139,6 +139,14 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c physicalSchema.getPrimaryKey().isPresent(), "Primary key must be present when upsert mode is selected."); } + String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null); + String auditHostAndPorts = config.get(INLONG_AUDIT); + String auditKeys = config.get(AUDIT_KEYS); + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withAuditAddress(auditHostAndPorts) + .withAuditKeys(auditKeys) + .build(); return new PostgreSQLTableSource( physicalSchema, @@ -152,7 +160,8 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c pluginName, slotName, changelogMode, - getDebeziumProperties(context.getCatalogTable().getOptions())); + getDebeziumProperties(context.getCatalogTable().getOptions()), + metricOption); } @Override diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java new file mode 100644 index 00000000000..fbea3037b08 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.postgre; + +import org.apache.inlong.sort.base.metric.MetricOption; + +import com.ververica.cdc.connectors.postgres.PostgreSQLSource; +import com.ververica.cdc.connectors.postgres.table.PostgreSQLDeserializationConverterFactory; +import com.ververica.cdc.connectors.postgres.table.PostgreSQLReadableMetadata; +import com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import com.ververica.cdc.debezium.DebeziumSourceFunction; +import com.ververica.cdc.debezium.table.DebeziumChangelogMode; +import com.ververica.cdc.debezium.table.MetadataConverter; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link DynamicTableSource} that describes how to create a PostgreSQL source from a logical + * description. + */ +public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMetadata { + + private final ResolvedSchema physicalSchema; + private final int port; + private final String hostname; + private final String database; + private final String schemaName; + private final String tableName; + private final String username; + private final String password; + private final String pluginName; + private final String slotName; + private final DebeziumChangelogMode changelogMode; + private final Properties dbzProperties; + + // -------------------------------------------------------------------------------------------- + // Mutable attributes + // -------------------------------------------------------------------------------------------- + + /** Data type that describes the final output of the source. */ + protected DataType producedDataType; + + /** Metadata that is appended at the end of a physical source row. */ + protected List metadataKeys; + private MetricOption metricOption; + public PostgreSQLTableSource( + ResolvedSchema physicalSchema, + int port, + String hostname, + String database, + String schemaName, + String tableName, + String username, + String password, + String pluginName, + String slotName, + DebeziumChangelogMode changelogMode, + Properties dbzProperties, + MetricOption metricOption) { + this.physicalSchema = physicalSchema; + this.port = port; + this.hostname = checkNotNull(hostname); + this.database = checkNotNull(database); + this.schemaName = checkNotNull(schemaName); + this.tableName = checkNotNull(tableName); + this.username = checkNotNull(username); + this.password = checkNotNull(password); + this.pluginName = checkNotNull(pluginName); + this.slotName = slotName; + this.changelogMode = changelogMode; + this.dbzProperties = dbzProperties; + this.producedDataType = physicalSchema.toPhysicalRowDataType(); + this.metadataKeys = Collections.emptyList(); + this.metricOption = metricOption; + } + + @Override + public ChangelogMode getChangelogMode() { + switch (changelogMode) { + case UPSERT: + return ChangelogMode.upsert(); + case ALL: + return ChangelogMode.all(); + default: + throw new UnsupportedOperationException( + "Unsupported changelog mode: " + changelogMode); + } + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { + RowType physicalDataType = + (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType(); + MetadataConverter[] metadataConverters = getMetadataConverters(); + TypeInformation typeInfo = scanContext.createTypeInformation(producedDataType); + + DebeziumDeserializationSchema deserializer = + RowDataDebeziumDeserializeSchema.newBuilder() + .setPhysicalRowType(physicalDataType) + .setMetadataConverters(metadataConverters) + .setResultTypeInfo(typeInfo) + .setUserDefinedConverterFactory( + PostgreSQLDeserializationConverterFactory.instance()) + .setValueValidator(new PostgresValueValidator(schemaName, tableName)) + .setChangelogMode(changelogMode) + .build(); + DebeziumSourceFunction sourceFunction = + PostgreSQLSource.builder() + .hostname(hostname) + .port(port) + .database(database) + .schemaList(schemaName) + .tableList(schemaName + "." + tableName) + .username(username) + .password(password) + .decodingPluginName(pluginName) + .slotName(slotName) + .debeziumProperties(dbzProperties) + .deserializer(deserializer) + .build(); + return SourceFunctionProvider.of(sourceFunction, false); + } + + private MetadataConverter[] getMetadataConverters() { + if (metadataKeys.isEmpty()) { + return new MetadataConverter[0]; + } + + return metadataKeys.stream() + .map( + key -> Stream.of(PostgreSQLReadableMetadata.values()) + .filter(m -> m.getKey().equals(key)) + .findFirst() + .orElseThrow(IllegalStateException::new)) + .map(PostgreSQLReadableMetadata::getConverter) + .toArray(MetadataConverter[]::new); + } + + @Override + public DynamicTableSource copy() { + PostgreSQLTableSource source = + new PostgreSQLTableSource( + physicalSchema, + port, + hostname, + database, + schemaName, + tableName, + username, + password, + pluginName, + slotName, + changelogMode, + dbzProperties, + metricOption); + source.metadataKeys = metadataKeys; + source.producedDataType = producedDataType; + return source; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PostgreSQLTableSource that = (PostgreSQLTableSource) o; + return port == that.port + && Objects.equals(physicalSchema, that.physicalSchema) + && Objects.equals(hostname, that.hostname) + && Objects.equals(database, that.database) + && Objects.equals(schemaName, that.schemaName) + && Objects.equals(tableName, that.tableName) + && Objects.equals(username, that.username) + && Objects.equals(password, that.password) + && Objects.equals(pluginName, that.pluginName) + && Objects.equals(slotName, that.slotName) + && Objects.equals(dbzProperties, that.dbzProperties) + && Objects.equals(producedDataType, that.producedDataType) + && Objects.equals(metadataKeys, that.metadataKeys) + && Objects.equals(changelogMode, that.changelogMode) + && Objects.equals(metricOption, that.metricOption); + } + + @Override + public int hashCode() { + return Objects.hash( + physicalSchema, + port, + hostname, + database, + schemaName, + tableName, + username, + password, + pluginName, + slotName, + dbzProperties, + producedDataType, + metadataKeys, + changelogMode); + } + + @Override + public String asSummaryString() { + return "PostgreSQL-CDC"; + } + + @Override + public Map listReadableMetadata() { + return Stream.of(PostgreSQLReadableMetadata.values()) + .collect( + Collectors.toMap( + PostgreSQLReadableMetadata::getKey, + PostgreSQLReadableMetadata::getDataType)); + } + + @Override + public void applyReadableMetadata(List metadataKeys, DataType producedDataType) { + this.metadataKeys = metadataKeys; + this.producedDataType = producedDataType; + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java new file mode 100644 index 00000000000..48aafbe2a0b --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.postgre; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.RowKind; + +/** The {@link RowDataDebeziumDeserializeSchema.ValueValidator} for Postgres connector. */ +public final class PostgresValueValidator + implements + RowDataDebeziumDeserializeSchema.ValueValidator { + + private static final long serialVersionUID = -1870679469578028765L; + + private static final String REPLICA_IDENTITY_EXCEPTION = + "The \"before\" field of UPDATE/DELETE message is null, " + + "please check the Postgres table has been set REPLICA IDENTITY to FULL level. " + + "You can update the setting by running the command in Postgres 'ALTER TABLE %s REPLICA IDENTITY FULL'. " + + "Please see more in Debezium documentation: https://debezium.io/documentation/reference/1.5/connectors/postgresql.html#postgresql-replica-identity"; + + private final String schemaTable; + + public PostgresValueValidator(String schema, String table) { + this.schemaTable = schema + "." + table; + } + + @Override + public void validate(RowData rowData, RowKind rowKind) throws Exception { + if (rowData == null) { + throw new IllegalStateException(String.format(REPLICA_IDENTITY_EXCEPTION, schemaTable)); + } + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java new file mode 100644 index 00000000000..4e8dd50e301 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java @@ -0,0 +1,682 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.postgre; + +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricsCollector; +import org.apache.inlong.sort.base.metric.SourceMetricData; + +import com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import com.ververica.cdc.debezium.table.AppendMetadataCollector; +import com.ververica.cdc.debezium.table.DebeziumChangelogMode; +import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter; +import com.ververica.cdc.debezium.table.DeserializationRuntimeConverterFactory; +import com.ververica.cdc.debezium.table.MetadataConverter; +import com.ververica.cdc.debezium.utils.TemporalConversions; +import io.debezium.data.Envelope; +import io.debezium.data.SpecialValueDecimal; +import io.debezium.data.VariableScaleDecimal; +import io.debezium.time.MicroTime; +import io.debezium.time.MicroTimestamp; +import io.debezium.time.NanoTime; +import io.debezium.time.NanoTimestamp; +import io.debezium.time.Timestamp; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Collector; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Deserialization schema from Debezium object to Flink Table/SQL internal data structure {@link + * RowData}. + */ +public final class RowDataDebeziumDeserializeSchema + implements + DebeziumDeserializationSchema { + + private static final long serialVersionUID = 2L; + + /** Custom validator to validate the row value. */ + public interface ValueValidator extends Serializable { + + void validate(RowData rowData, RowKind rowKind) throws Exception; + } + + /** TypeInformation of the produced {@link RowData}. * */ + private final TypeInformation resultTypeInfo; + + /** + * Runtime converter that converts Kafka {@link SourceRecord}s into {@link RowData} consisted of + * physical column values. + */ + private final DeserializationRuntimeConverter physicalConverter; + + /** Whether the deserializer needs to handle metadata columns. */ + private final boolean hasMetadata; + + /** + * A wrapped output collector which is used to append metadata columns after physical columns. + */ + private final AppendMetadataCollector appendMetadataCollector; + + /** Validator to validate the row value. */ + private final ValueValidator validator; + + /** Changelog Mode to use for encoding changes in Flink internal data structure. */ + private final DebeziumChangelogMode changelogMode; + private SourceMetricData sourceMetricData; + + /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */ + public static Builder newBuilder() { + return new Builder(); + } + + RowDataDebeziumDeserializeSchema( + RowType physicalDataType, + MetadataConverter[] metadataConverters, + TypeInformation resultTypeInfo, + ValueValidator validator, + ZoneId serverTimeZone, + DeserializationRuntimeConverterFactory userDefinedConverterFactory, + DebeziumChangelogMode changelogMode, + SourceMetricData sourceMetricData) { + this.hasMetadata = checkNotNull(metadataConverters).length > 0; + this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters); + this.physicalConverter = + createConverter( + checkNotNull(physicalDataType), + serverTimeZone, + userDefinedConverterFactory); + this.resultTypeInfo = checkNotNull(resultTypeInfo); + this.validator = checkNotNull(validator); + this.changelogMode = checkNotNull(changelogMode); + this.sourceMetricData = sourceMetricData; + } + + @Override + public void deserialize(SourceRecord record, Collector out) throws Exception { + Envelope.Operation op = Envelope.operationFor(record); + Struct value = (Struct) record.value(); + Schema valueSchema = record.valueSchema(); + if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) { + GenericRowData insert = extractAfterRow(value, valueSchema); + validator.validate(insert, RowKind.INSERT); + insert.setRowKind(RowKind.INSERT); + out = new MetricsCollector<>(out, sourceMetricData); + emit(record, insert, out); + } else if (op == Envelope.Operation.DELETE) { + GenericRowData delete = extractBeforeRow(value, valueSchema); + validator.validate(delete, RowKind.DELETE); + delete.setRowKind(RowKind.DELETE); + emit(record, delete, out); + } else { + if (changelogMode == DebeziumChangelogMode.ALL) { + GenericRowData before = extractBeforeRow(value, valueSchema); + validator.validate(before, RowKind.UPDATE_BEFORE); + before.setRowKind(RowKind.UPDATE_BEFORE); + emit(record, before, out); + } + + GenericRowData after = extractAfterRow(value, valueSchema); + validator.validate(after, RowKind.UPDATE_AFTER); + after.setRowKind(RowKind.UPDATE_AFTER); + out = new MetricsCollector<>(out, sourceMetricData); + emit(record, after, out); + } + } + + private GenericRowData extractAfterRow(Struct value, Schema valueSchema) throws Exception { + Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema(); + Struct after = value.getStruct(Envelope.FieldName.AFTER); + return (GenericRowData) physicalConverter.convert(after, afterSchema); + } + + private GenericRowData extractBeforeRow(Struct value, Schema valueSchema) throws Exception { + Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema(); + Struct before = value.getStruct(Envelope.FieldName.BEFORE); + return (GenericRowData) physicalConverter.convert(before, beforeSchema); + } + + private void emit(SourceRecord inRecord, RowData physicalRow, Collector collector) { + if (!hasMetadata) { + collector.collect(physicalRow); + return; + } + appendMetadataCollector.inputRecord = inRecord; + appendMetadataCollector.outputCollector = collector; + appendMetadataCollector.collect(physicalRow); + } + + @Override + public TypeInformation getProducedType() { + return resultTypeInfo; + } + + // ------------------------------------------------------------------------------------- + // Builder + // ------------------------------------------------------------------------------------- + + /** Builder of {@link RowDataDebeziumDeserializeSchema}. */ + public static class Builder { + + private RowType physicalRowType; + private TypeInformation resultTypeInfo; + private MetadataConverter[] metadataConverters = new MetadataConverter[0]; + private ValueValidator validator = (rowData, rowKind) -> { + }; + private ZoneId serverTimeZone = ZoneId.of("UTC"); + private DeserializationRuntimeConverterFactory userDefinedConverterFactory = + DeserializationRuntimeConverterFactory.DEFAULT; + private DebeziumChangelogMode changelogMode = DebeziumChangelogMode.ALL; + private SourceMetricData sourceMetricData; + + public Builder setPhysicalRowType(RowType physicalRowType) { + this.physicalRowType = physicalRowType; + return this; + } + + public Builder setMetadataConverters(MetadataConverter[] metadataConverters) { + this.metadataConverters = metadataConverters; + return this; + } + + public Builder setResultTypeInfo(TypeInformation resultTypeInfo) { + this.resultTypeInfo = resultTypeInfo; + return this; + } + + public Builder setValueValidator(ValueValidator validator) { + this.validator = validator; + return this; + } + + public Builder setServerTimeZone(ZoneId serverTimeZone) { + this.serverTimeZone = serverTimeZone; + return this; + } + + public Builder setUserDefinedConverterFactory( + DeserializationRuntimeConverterFactory userDefinedConverterFactory) { + this.userDefinedConverterFactory = userDefinedConverterFactory; + return this; + } + + public Builder setChangelogMode(DebeziumChangelogMode changelogMode) { + this.changelogMode = changelogMode; + return this; + } + public Builder setSourceMetricData(MetricOption metricOption) { + if (metricOption != null) { + this.sourceMetricData = new SourceMetricData(metricOption); + } + return this; + } + + public RowDataDebeziumDeserializeSchema build() { + return new RowDataDebeziumDeserializeSchema( + physicalRowType, + metadataConverters, + resultTypeInfo, + validator, + serverTimeZone, + userDefinedConverterFactory, + changelogMode, + sourceMetricData); + } + } + + // ------------------------------------------------------------------------------------- + // Runtime Converters + // ------------------------------------------------------------------------------------- + + /** Creates a runtime converter which is null safe. */ + private static DeserializationRuntimeConverter createConverter( + LogicalType type, + ZoneId serverTimeZone, + DeserializationRuntimeConverterFactory userDefinedConverterFactory) { + return wrapIntoNullableConverter( + createNotNullConverter(type, serverTimeZone, userDefinedConverterFactory)); + } + + // -------------------------------------------------------------------------------- + // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is + // necessary because the maven shade plugin cannot relocate classes in + // SerializedLambdas (MSHADE-260). + // -------------------------------------------------------------------------------- + + /** Creates a runtime converter which assuming input object is not null. */ + public static DeserializationRuntimeConverter createNotNullConverter( + LogicalType type, + ZoneId serverTimeZone, + DeserializationRuntimeConverterFactory userDefinedConverterFactory) { + // user defined converter has a higher resolve order + Optional converter = + userDefinedConverterFactory.createUserDefinedConverter(type, serverTimeZone); + if (converter.isPresent()) { + return converter.get(); + } + + // if no matched user defined converter, fallback to the default converter + switch (type.getTypeRoot()) { + case NULL: + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + return null; + } + }; + case BOOLEAN: + return convertToBoolean(); + case TINYINT: + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + return Byte.parseByte(dbzObj.toString()); + } + }; + case SMALLINT: + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + return Short.parseShort(dbzObj.toString()); + } + }; + case INTEGER: + case INTERVAL_YEAR_MONTH: + return convertToInt(); + case BIGINT: + case INTERVAL_DAY_TIME: + return convertToLong(); + case DATE: + return convertToDate(); + case TIME_WITHOUT_TIME_ZONE: + return convertToTime(); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return convertToTimestamp(serverTimeZone); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return convertToLocalTimeZoneTimestamp(serverTimeZone); + case FLOAT: + return convertToFloat(); + case DOUBLE: + return convertToDouble(); + case CHAR: + case VARCHAR: + return convertToString(); + case BINARY: + case VARBINARY: + return convertToBinary(); + case DECIMAL: + return createDecimalConverter((DecimalType) type); + case ROW: + return createRowConverter( + (RowType) type, serverTimeZone, userDefinedConverterFactory); + case ARRAY: + case MAP: + case MULTISET: + case RAW: + default: + throw new UnsupportedOperationException("Unsupported type: " + type); + } + } + + private static DeserializationRuntimeConverter convertToBoolean() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof Boolean) { + return dbzObj; + } else if (dbzObj instanceof Byte) { + return (byte) dbzObj == 1; + } else if (dbzObj instanceof Short) { + return (short) dbzObj == 1; + } else { + return Boolean.parseBoolean(dbzObj.toString()); + } + } + }; + } + + private static DeserializationRuntimeConverter convertToInt() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof Integer) { + return dbzObj; + } else if (dbzObj instanceof Long) { + return ((Long) dbzObj).intValue(); + } else { + return Integer.parseInt(dbzObj.toString()); + } + } + }; + } + + private static DeserializationRuntimeConverter convertToLong() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof Integer) { + return ((Integer) dbzObj).longValue(); + } else if (dbzObj instanceof Long) { + return dbzObj; + } else { + return Long.parseLong(dbzObj.toString()); + } + } + }; + } + + private static DeserializationRuntimeConverter convertToDouble() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof Float) { + return ((Float) dbzObj).doubleValue(); + } else if (dbzObj instanceof Double) { + return dbzObj; + } else { + return Double.parseDouble(dbzObj.toString()); + } + } + }; + } + + private static DeserializationRuntimeConverter convertToFloat() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof Float) { + return dbzObj; + } else if (dbzObj instanceof Double) { + return ((Double) dbzObj).floatValue(); + } else { + return Float.parseFloat(dbzObj.toString()); + } + } + }; + } + + private static DeserializationRuntimeConverter convertToDate() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay(); + } + }; + } + + private static DeserializationRuntimeConverter convertToTime() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof Long) { + switch (schema.name()) { + case MicroTime.SCHEMA_NAME: + return (int) ((long) dbzObj / 1000); + case NanoTime.SCHEMA_NAME: + return (int) ((long) dbzObj / 1000_000); + } + } else if (dbzObj instanceof Integer) { + return dbzObj; + } + // get number of milliseconds of the day + return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000; + } + }; + } + + private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof Long) { + switch (schema.name()) { + case Timestamp.SCHEMA_NAME: + return TimestampData.fromEpochMillis((Long) dbzObj); + case MicroTimestamp.SCHEMA_NAME: + long micro = (long) dbzObj; + return TimestampData.fromEpochMillis( + micro / 1000, (int) (micro % 1000 * 1000)); + case NanoTimestamp.SCHEMA_NAME: + long nano = (long) dbzObj; + return TimestampData.fromEpochMillis( + nano / 1000_000, (int) (nano % 1000_000)); + } + } + LocalDateTime localDateTime = + TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone); + return TimestampData.fromLocalDateTime(localDateTime); + } + }; + } + + private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp( + ZoneId serverTimeZone) { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof String) { + String str = (String) dbzObj; + // TIMESTAMP_LTZ type is encoded in string type + Instant instant = Instant.parse(str); + return TimestampData.fromLocalDateTime( + LocalDateTime.ofInstant(instant, serverTimeZone)); + } + throw new IllegalArgumentException( + "Unable to convert to TimestampData from unexpected value '" + + dbzObj + + "' of type " + + dbzObj.getClass().getName()); + } + }; + } + + private static DeserializationRuntimeConverter convertToString() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + return StringData.fromString(dbzObj.toString()); + } + }; + } + + private static DeserializationRuntimeConverter convertToBinary() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof byte[]) { + return dbzObj; + } else if (dbzObj instanceof ByteBuffer) { + ByteBuffer byteBuffer = (ByteBuffer) dbzObj; + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + return bytes; + } else { + throw new UnsupportedOperationException( + "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName()); + } + } + }; + } + + private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) { + final int precision = decimalType.getPrecision(); + final int scale = decimalType.getScale(); + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + BigDecimal bigDecimal; + if (dbzObj instanceof byte[]) { + // decimal.handling.mode=precise + bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj); + } else if (dbzObj instanceof String) { + // decimal.handling.mode=string + bigDecimal = new BigDecimal((String) dbzObj); + } else if (dbzObj instanceof Double) { + // decimal.handling.mode=double + bigDecimal = BigDecimal.valueOf((Double) dbzObj); + } else { + if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) { + SpecialValueDecimal decimal = + VariableScaleDecimal.toLogical((Struct) dbzObj); + bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO); + } else { + // fallback to string + bigDecimal = new BigDecimal(dbzObj.toString()); + } + } + return DecimalData.fromBigDecimal(bigDecimal, precision, scale); + } + }; + } + + private static DeserializationRuntimeConverter createRowConverter( + RowType rowType, + ZoneId serverTimeZone, + DeserializationRuntimeConverterFactory userDefinedConverterFactory) { + final DeserializationRuntimeConverter[] fieldConverters = + rowType.getFields().stream() + .map(RowType.RowField::getType) + .map( + logicType -> createConverter( + logicType, + serverTimeZone, + userDefinedConverterFactory)) + .toArray(DeserializationRuntimeConverter[]::new); + final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]); + + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) throws Exception { + Struct struct = (Struct) dbzObj; + int arity = fieldNames.length; + GenericRowData row = new GenericRowData(arity); + for (int i = 0; i < arity; i++) { + String fieldName = fieldNames[i]; + Field field = schema.field(fieldName); + if (field == null) { + row.setField(i, null); + } else { + Object fieldValue = struct.getWithoutDefault(fieldName); + Schema fieldSchema = schema.field(fieldName).schema(); + Object convertedField = + convertField(fieldConverters[i], fieldValue, fieldSchema); + row.setField(i, convertedField); + } + } + return row; + } + }; + } + + private static Object convertField( + DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema) + throws Exception { + if (fieldValue == null) { + return null; + } else { + return fieldConverter.convert(fieldValue, fieldSchema); + } + } + + private static DeserializationRuntimeConverter wrapIntoNullableConverter( + DeserializationRuntimeConverter converter) { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) throws Exception { + if (dbzObj == null) { + return null; + } + return converter.convert(dbzObj, schema); + } + }; + } +} diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE index 4c8eca64913..46dcf33184f 100644 --- a/licenses/inlong-sort-connectors/LICENSE +++ b/licenses/inlong-sort-connectors/LICENSE @@ -850,6 +850,14 @@ Source : org.apache.flink:flink-connector-pulsar:4.1.0-1.18 (Please note that the software have been modified.) License : https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE +1.3.24 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java +Source : com.ververica:flink-connector-mongodb-cdc:2.3.0 (Please note that the software have been modified.) +License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE + + ======================================================================= Apache InLong Subcomponents: From eae60a6c75c5320e55cf085d0aad0b75d56ad5d0 Mon Sep 17 00:00:00 2001 From: XiaoYou201 Date: Mon, 13 May 2024 10:52:59 +0800 Subject: [PATCH 2/8] [INLONG-10193][Sort] Add reference. --- .../apache/inlong/sort/postgre/PostgreSQLTableFactory.java | 5 ++++- .../apache/inlong/sort/postgre/PostgreSQLTableSource.java | 2 ++ .../apache/inlong/sort/postgre/PostgresValueValidator.java | 5 ++++- .../sort/postgre/RowDataDebeziumDeserializeSchema.java | 2 ++ 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java index 1ec6403ffea..7016f631e68 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java @@ -39,7 +39,10 @@ import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; -/** Factory for creating configured instance of {@link PostgreSQLTableSource}. */ +/** Factory for creating configured instance of {@link PostgreSQLTableSource}. + * + * Copy from com.ververica:flink-connector-postgres-cdc-2.3.0 + * */ public class PostgreSQLTableFactory implements DynamicTableSourceFactory { private static final String IDENTIFIER = "postgres-cdc-inlong"; diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java index fbea3037b08..c2f9bb02d39 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java @@ -50,6 +50,8 @@ /** * A {@link DynamicTableSource} that describes how to create a PostgreSQL source from a logical * description. + * + * Copy from com.ververica:flink-connector-postgres-cdc-2.3.0 */ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMetadata { diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java index 48aafbe2a0b..f50dafd27c2 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java @@ -20,7 +20,10 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.types.RowKind; -/** The {@link RowDataDebeziumDeserializeSchema.ValueValidator} for Postgres connector. */ +/** The {@link RowDataDebeziumDeserializeSchema.ValueValidator} for Postgres connector. + * + * Copy from com.ververica:flink-connector-postgres-cdc-2.3.0 + * */ public final class PostgresValueValidator implements RowDataDebeziumDeserializeSchema.ValueValidator { diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java index 4e8dd50e301..544fc8d1ffa 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java @@ -66,6 +66,8 @@ /** * Deserialization schema from Debezium object to Flink Table/SQL internal data structure {@link * RowData}. + * + * Copy from com.ververica:flink-connector-postgres-cdc-2.3.0 */ public final class RowDataDebeziumDeserializeSchema implements From 8e4272b781286055db49ebc6cc4312f8b2d327af Mon Sep 17 00:00:00 2001 From: XiaoYou201 Date: Mon, 13 May 2024 17:02:25 +0800 Subject: [PATCH 3/8] [INLONG-10193][Sort] fix bug. --- .../org/apache/inlong/sort/postgre/PostgreSQLTableSource.java | 1 + 1 file changed, 1 insertion(+) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java index c2f9bb02d39..413f3243c79 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java @@ -138,6 +138,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { PostgreSQLDeserializationConverterFactory.instance()) .setValueValidator(new PostgresValueValidator(schemaName, tableName)) .setChangelogMode(changelogMode) + .setSourceMetricData(metricOption) .build(); DebeziumSourceFunction sourceFunction = PostgreSQLSource.builder() From 5f99602505481c038d9e597e319ee810d90bc67c Mon Sep 17 00:00:00 2001 From: XiaoYou201 Date: Tue, 14 May 2024 11:21:12 +0800 Subject: [PATCH 4/8] [INLONG-10193][Sort] fix defect. --- .../apache/inlong/sort/postgre/PostgreSQLTableSource.java | 3 ++- .../sort/postgre/RowDataDebeziumDeserializeSchema.java | 6 ++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java index 413f3243c79..1eb93953739 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java @@ -36,6 +36,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; +import org.apache.inlong.sort.base.metric.SourceMetricData; import java.util.Collections; import java.util.List; @@ -138,7 +139,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { PostgreSQLDeserializationConverterFactory.instance()) .setValueValidator(new PostgresValueValidator(schemaName, tableName)) .setChangelogMode(changelogMode) - .setSourceMetricData(metricOption) + .setSourceMetricData(new SourceMetricData(metricOption)) .build(); DebeziumSourceFunction sourceFunction = PostgreSQLSource.builder() diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java index 544fc8d1ffa..f7d9cb99f6a 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java @@ -244,10 +244,8 @@ public Builder setChangelogMode(DebeziumChangelogMode changelogMode) { this.changelogMode = changelogMode; return this; } - public Builder setSourceMetricData(MetricOption metricOption) { - if (metricOption != null) { - this.sourceMetricData = new SourceMetricData(metricOption); - } + public Builder setSourceMetricData(SourceMetricData sourceMetricData) { + this.sourceMetricData = sourceMetricData; return this; } From 643c7fb958f57546c4db44187ba67fbe9b5453c5 Mon Sep 17 00:00:00 2001 From: XiaoYou201 Date: Tue, 14 May 2024 11:49:55 +0800 Subject: [PATCH 5/8] [INLONG-10193][Sort] fix format. --- .../org/apache/inlong/sort/postgre/PostgreSQLTableSource.java | 2 +- .../inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java index 1eb93953739..5db39e48450 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java @@ -18,6 +18,7 @@ package org.apache.inlong.sort.postgre; import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.SourceMetricData; import com.ververica.cdc.connectors.postgres.PostgreSQLSource; import com.ververica.cdc.connectors.postgres.table.PostgreSQLDeserializationConverterFactory; @@ -36,7 +37,6 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; -import org.apache.inlong.sort.base.metric.SourceMetricData; import java.util.Collections; import java.util.List; diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java index f7d9cb99f6a..ce244c067c0 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java @@ -17,7 +17,6 @@ package org.apache.inlong.sort.postgre; -import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricsCollector; import org.apache.inlong.sort.base.metric.SourceMetricData; From 4e636f4534215dd1aa0c9c9c5553348a825c90be Mon Sep 17 00:00:00 2001 From: XiaoYou201 Date: Tue, 14 May 2024 14:54:04 +0800 Subject: [PATCH 6/8] [INLONG-10193][Sort] fix bug. --- .../org/apache/inlong/sort/postgre/PostgreSQLTableSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java index 5db39e48450..d69a9d7795e 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java @@ -139,7 +139,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { PostgreSQLDeserializationConverterFactory.instance()) .setValueValidator(new PostgresValueValidator(schemaName, tableName)) .setChangelogMode(changelogMode) - .setSourceMetricData(new SourceMetricData(metricOption)) + .setSourceMetricData(metricOption == null ? null : new SourceMetricData(metricOption)) .build(); DebeziumSourceFunction sourceFunction = PostgreSQLSource.builder() From 9098c22297e8eb91c4b6a36efe5105891d4f23e4 Mon Sep 17 00:00:00 2001 From: XiaoYou201 Date: Tue, 14 May 2024 16:02:37 +0800 Subject: [PATCH 7/8] [INLONG-10193][Sort] fix format and bug. --- .../sort/postgre/PostgreSQLTableFactory.java | 2 +- .../sort/postgre/PostgreSQLTableSource.java | 4 ++-- .../sort/postgre/PostgresValueValidator.java | 2 +- .../RowDataDebeziumDeserializeSchema.java | 23 ++++++++----------- 4 files changed, 14 insertions(+), 17 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java index 7016f631e68..67ac3e01d9e 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java @@ -40,7 +40,7 @@ import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; /** Factory for creating configured instance of {@link PostgreSQLTableSource}. - * + *

* Copy from com.ververica:flink-connector-postgres-cdc-2.3.0 * */ public class PostgreSQLTableFactory implements DynamicTableSourceFactory { diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java index d69a9d7795e..22fb76a4e10 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java @@ -51,7 +51,7 @@ /** * A {@link DynamicTableSource} that describes how to create a PostgreSQL source from a logical * description. - * + *

* Copy from com.ververica:flink-connector-postgres-cdc-2.3.0 */ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMetadata { @@ -78,7 +78,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe /** Metadata that is appended at the end of a physical source row. */ protected List metadataKeys; - private MetricOption metricOption; + private final MetricOption metricOption; public PostgreSQLTableSource( ResolvedSchema physicalSchema, int port, diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java index f50dafd27c2..a32d5f9a061 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java @@ -21,7 +21,7 @@ import org.apache.flink.types.RowKind; /** The {@link RowDataDebeziumDeserializeSchema.ValueValidator} for Postgres connector. - * + *

* Copy from com.ververica:flink-connector-postgres-cdc-2.3.0 * */ public final class PostgresValueValidator diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java index ce244c067c0..9a2beb2fc90 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java @@ -65,12 +65,10 @@ /** * Deserialization schema from Debezium object to Flink Table/SQL internal data structure {@link * RowData}. - * + *

* Copy from com.ververica:flink-connector-postgres-cdc-2.3.0 */ -public final class RowDataDebeziumDeserializeSchema - implements - DebeziumDeserializationSchema { +public final class RowDataDebeziumDeserializeSchema implements DebeziumDeserializationSchema { private static final long serialVersionUID = 2L; @@ -102,7 +100,7 @@ public interface ValueValidator extends Serializable { /** Changelog Mode to use for encoding changes in Flink internal data structure. */ private final DebeziumChangelogMode changelogMode; - private SourceMetricData sourceMetricData; + private final SourceMetricData sourceMetricData; /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */ public static Builder newBuilder() { @@ -140,7 +138,9 @@ public void deserialize(SourceRecord record, Collector out) throws Exce GenericRowData insert = extractAfterRow(value, valueSchema); validator.validate(insert, RowKind.INSERT); insert.setRowKind(RowKind.INSERT); - out = new MetricsCollector<>(out, sourceMetricData); + if (sourceMetricData != null){ + out = new MetricsCollector<>(out, sourceMetricData); + } emit(record, insert, out); } else if (op == Envelope.Operation.DELETE) { GenericRowData delete = extractBeforeRow(value, valueSchema); @@ -158,7 +158,9 @@ public void deserialize(SourceRecord record, Collector out) throws Exce GenericRowData after = extractAfterRow(value, valueSchema); validator.validate(after, RowKind.UPDATE_AFTER); after.setRowKind(RowKind.UPDATE_AFTER); - out = new MetricsCollector<>(out, sourceMetricData); + if (sourceMetricData != null){ + out = new MetricsCollector<>(out, sourceMetricData); + } emit(record, after, out); } } @@ -202,7 +204,7 @@ public static class Builder { private MetadataConverter[] metadataConverters = new MetadataConverter[0]; private ValueValidator validator = (rowData, rowKind) -> { }; - private ZoneId serverTimeZone = ZoneId.of("UTC"); + private final ZoneId serverTimeZone = ZoneId.of("UTC"); private DeserializationRuntimeConverterFactory userDefinedConverterFactory = DeserializationRuntimeConverterFactory.DEFAULT; private DebeziumChangelogMode changelogMode = DebeziumChangelogMode.ALL; @@ -228,11 +230,6 @@ public Builder setValueValidator(ValueValidator validator) { return this; } - public Builder setServerTimeZone(ZoneId serverTimeZone) { - this.serverTimeZone = serverTimeZone; - return this; - } - public Builder setUserDefinedConverterFactory( DeserializationRuntimeConverterFactory userDefinedConverterFactory) { this.userDefinedConverterFactory = userDefinedConverterFactory; From 9f32ac5fe08c4e8d8321ec396150610a080262fc Mon Sep 17 00:00:00 2001 From: XiaoYou201 Date: Tue, 14 May 2024 16:28:06 +0800 Subject: [PATCH 8/8] [INLONG-10193][Sort] fix format and bug. --- .../inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java index 9a2beb2fc90..c6cf4e0d545 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java @@ -138,7 +138,7 @@ public void deserialize(SourceRecord record, Collector out) throws Exce GenericRowData insert = extractAfterRow(value, valueSchema); validator.validate(insert, RowKind.INSERT); insert.setRowKind(RowKind.INSERT); - if (sourceMetricData != null){ + if (sourceMetricData != null) { out = new MetricsCollector<>(out, sourceMetricData); } emit(record, insert, out); @@ -158,7 +158,7 @@ public void deserialize(SourceRecord record, Collector out) throws Exce GenericRowData after = extractAfterRow(value, valueSchema); validator.validate(after, RowKind.UPDATE_AFTER); after.setRowKind(RowKind.UPDATE_AFTER); - if (sourceMetricData != null){ + if (sourceMetricData != null) { out = new MetricsCollector<>(out, sourceMetricData); } emit(record, after, out);