diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java
index 9c659995ca2..67ac3e01d9e 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java
@@ -17,14 +17,14 @@
package org.apache.inlong.sort.postgre;
-import com.ververica.cdc.connectors.postgres.table.PostgreSQLTableSource;
+import org.apache.inlong.sort.base.metric.MetricOption;
+
import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
@@ -39,7 +39,10 @@
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
-/** Factory for creating configured instance of {@link PostgreSQLTableSource}. */
+/** Factory for creating configured instance of {@link PostgreSQLTableSource}.
+ *
+ * Copy from com.ververica:flink-connector-postgres-cdc-2.3.0
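+ *
+ * Compared with the upstream factory, this copy is registered under the identifier
+ * {@code postgres-cdc-inlong} and additionally wires InLong metric and audit reporting
+ * into the created {@link PostgreSQLTableSource}.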
+ * */
public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
private static final String IDENTIFIER = "postgres-cdc-inlong";
@@ -116,7 +119,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
+ "\"upsert\": Encodes changes as upsert stream that describes idempotent updates on a key. It can be used for tables with primary keys when replica identity FULL is not an option.");
@Override
- public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
+ public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
helper.validateExcept(DEBEZIUM_OPTIONS_PREFIX);
@@ -139,6 +142,14 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
physicalSchema.getPrimaryKey().isPresent(),
"Primary key must be present when upsert mode is selected.");
}
+ String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
+ String auditHostAndPorts = config.get(INLONG_AUDIT);
+ String auditKeys = config.get(AUDIT_KEYS);
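+            // Bundle the InLong metric labels and audit settings into a MetricOption;
+            // the table source forwards it so emitted records can be counted and audited.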
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withAuditAddress(auditHostAndPorts)
+ .withAuditKeys(auditKeys)
+ .build();
return new PostgreSQLTableSource(
physicalSchema,
@@ -152,7 +163,8 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
pluginName,
slotName,
changelogMode,
- getDebeziumProperties(context.getCatalogTable().getOptions()));
+ getDebeziumProperties(context.getCatalogTable().getOptions()),
+ metricOption);
}
@Override
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java
new file mode 100644
index 00000000000..22fb76a4e10
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.postgre;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
+import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
+import com.ververica.cdc.connectors.postgres.table.PostgreSQLDeserializationConverterFactory;
+import com.ververica.cdc.connectors.postgres.table.PostgreSQLReadableMetadata;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.DebeziumSourceFunction;
+import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
+import com.ververica.cdc.debezium.table.MetadataConverter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DynamicTableSource} that describes how to create a PostgreSQL source from a logical
+ * description.
+ *
+ * Copy from com.ververica:flink-connector-postgres-cdc-2.3.0
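+ *
+ * Compared with the upstream source, this copy additionally carries a {@link MetricOption}
+ * that is handed to the deserialization schema for InLong metric and audit reporting.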
+ */
+public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMetadata {
+
+ private final ResolvedSchema physicalSchema;
+ private final int port;
+ private final String hostname;
+ private final String database;
+ private final String schemaName;
+ private final String tableName;
+ private final String username;
+ private final String password;
+ private final String pluginName;
+ private final String slotName;
+ private final DebeziumChangelogMode changelogMode;
+ private final Properties dbzProperties;
+
+ // --------------------------------------------------------------------------------------------
+ // Mutable attributes
+ // --------------------------------------------------------------------------------------------
+
+ /** Data type that describes the final output of the source. */
+ protected DataType producedDataType;
+
+    /** Metadata that is appended at the end of a physical source row. */
+    protected List<String> metadataKeys;
+
+    private final MetricOption metricOption;
+
+    public PostgreSQLTableSource(
+ ResolvedSchema physicalSchema,
+ int port,
+ String hostname,
+ String database,
+ String schemaName,
+ String tableName,
+ String username,
+ String password,
+ String pluginName,
+ String slotName,
+ DebeziumChangelogMode changelogMode,
+ Properties dbzProperties,
+ MetricOption metricOption) {
+ this.physicalSchema = physicalSchema;
+ this.port = port;
+ this.hostname = checkNotNull(hostname);
+ this.database = checkNotNull(database);
+ this.schemaName = checkNotNull(schemaName);
+ this.tableName = checkNotNull(tableName);
+ this.username = checkNotNull(username);
+ this.password = checkNotNull(password);
+ this.pluginName = checkNotNull(pluginName);
+ this.slotName = slotName;
+ this.changelogMode = changelogMode;
+ this.dbzProperties = dbzProperties;
+ this.producedDataType = physicalSchema.toPhysicalRowDataType();
+ this.metadataKeys = Collections.emptyList();
+ this.metricOption = metricOption;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ switch (changelogMode) {
+ case UPSERT:
+ return ChangelogMode.upsert();
+ case ALL:
+ return ChangelogMode.all();
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported changelog mode: " + changelogMode);
+ }
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+ RowType physicalDataType =
+ (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
+ MetadataConverter[] metadataConverters = getMetadataConverters();
+        TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);
+
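+        // If metric reporting is configured, hand a SourceMetricData to the deserialization
+        // schema so it can wrap its output collector with a metrics-reporting collector.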
+        DebeziumDeserializationSchema<RowData> deserializer =
+ RowDataDebeziumDeserializeSchema.newBuilder()
+ .setPhysicalRowType(physicalDataType)
+ .setMetadataConverters(metadataConverters)
+ .setResultTypeInfo(typeInfo)
+ .setUserDefinedConverterFactory(
+ PostgreSQLDeserializationConverterFactory.instance())
+ .setValueValidator(new PostgresValueValidator(schemaName, tableName))
+ .setChangelogMode(changelogMode)
+ .setSourceMetricData(metricOption == null ? null : new SourceMetricData(metricOption))
+ .build();
+        DebeziumSourceFunction<RowData> sourceFunction =
+                PostgreSQLSource.<RowData>builder()
+ .hostname(hostname)
+ .port(port)
+ .database(database)
+ .schemaList(schemaName)
+ .tableList(schemaName + "." + tableName)
+ .username(username)
+ .password(password)
+ .decodingPluginName(pluginName)
+ .slotName(slotName)
+ .debeziumProperties(dbzProperties)
+ .deserializer(deserializer)
+ .build();
+ return SourceFunctionProvider.of(sourceFunction, false);
+ }
+
+ private MetadataConverter[] getMetadataConverters() {
+ if (metadataKeys.isEmpty()) {
+ return new MetadataConverter[0];
+ }
+
+ return metadataKeys.stream()
+ .map(
+ key -> Stream.of(PostgreSQLReadableMetadata.values())
+ .filter(m -> m.getKey().equals(key))
+ .findFirst()
+ .orElseThrow(IllegalStateException::new))
+ .map(PostgreSQLReadableMetadata::getConverter)
+ .toArray(MetadataConverter[]::new);
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ PostgreSQLTableSource source =
+ new PostgreSQLTableSource(
+ physicalSchema,
+ port,
+ hostname,
+ database,
+ schemaName,
+ tableName,
+ username,
+ password,
+ pluginName,
+ slotName,
+ changelogMode,
+ dbzProperties,
+ metricOption);
+ source.metadataKeys = metadataKeys;
+ source.producedDataType = producedDataType;
+ return source;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PostgreSQLTableSource that = (PostgreSQLTableSource) o;
+ return port == that.port
+ && Objects.equals(physicalSchema, that.physicalSchema)
+ && Objects.equals(hostname, that.hostname)
+ && Objects.equals(database, that.database)
+ && Objects.equals(schemaName, that.schemaName)
+ && Objects.equals(tableName, that.tableName)
+ && Objects.equals(username, that.username)
+ && Objects.equals(password, that.password)
+ && Objects.equals(pluginName, that.pluginName)
+ && Objects.equals(slotName, that.slotName)
+ && Objects.equals(dbzProperties, that.dbzProperties)
+ && Objects.equals(producedDataType, that.producedDataType)
+ && Objects.equals(metadataKeys, that.metadataKeys)
+ && Objects.equals(changelogMode, that.changelogMode)
+ && Objects.equals(metricOption, that.metricOption);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ physicalSchema,
+ port,
+ hostname,
+ database,
+ schemaName,
+ tableName,
+ username,
+ password,
+ pluginName,
+ slotName,
+ dbzProperties,
+ producedDataType,
+ metadataKeys,
+                changelogMode,
+                metricOption);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "PostgreSQL-CDC";
+ }
+
+ @Override
+    public Map<String, DataType> listReadableMetadata() {
+ return Stream.of(PostgreSQLReadableMetadata.values())
+ .collect(
+ Collectors.toMap(
+ PostgreSQLReadableMetadata::getKey,
+ PostgreSQLReadableMetadata::getDataType));
+ }
+
+ @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+ this.metadataKeys = metadataKeys;
+ this.producedDataType = producedDataType;
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java
new file mode 100644
index 00000000000..a32d5f9a061
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.postgre;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+
+/** The {@link RowDataDebeziumDeserializeSchema.ValueValidator} for Postgres connector.
+ *
+ * Copy from com.ververica:flink-connector-postgres-cdc-2.3.0
+ * */
+public final class PostgresValueValidator
+ implements
+ RowDataDebeziumDeserializeSchema.ValueValidator {
+
+ private static final long serialVersionUID = -1870679469578028765L;
+
+ private static final String REPLICA_IDENTITY_EXCEPTION =
+ "The \"before\" field of UPDATE/DELETE message is null, "
+ + "please check the Postgres table has been set REPLICA IDENTITY to FULL level. "
+ + "You can update the setting by running the command in Postgres 'ALTER TABLE %s REPLICA IDENTITY FULL'. "
+ + "Please see more in Debezium documentation: https://debezium.io/documentation/reference/1.5/connectors/postgresql.html#postgresql-replica-identity";
+
+ private final String schemaTable;
+
+ public PostgresValueValidator(String schema, String table) {
+ this.schemaTable = schema + "." + table;
+ }
+
+ @Override
+ public void validate(RowData rowData, RowKind rowKind) throws Exception {
+ if (rowData == null) {
+ throw new IllegalStateException(String.format(REPLICA_IDENTITY_EXCEPTION, schemaTable));
+ }
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java
new file mode 100644
index 00000000000..c6cf4e0d545
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java
@@ -0,0 +1,678 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.postgre;
+
+import org.apache.inlong.sort.base.metric.MetricsCollector;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.AppendMetadataCollector;
+import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
+import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;
+import com.ververica.cdc.debezium.table.DeserializationRuntimeConverterFactory;
+import com.ververica.cdc.debezium.table.MetadataConverter;
+import com.ververica.cdc.debezium.utils.TemporalConversions;
+import io.debezium.data.Envelope;
+import io.debezium.data.SpecialValueDecimal;
+import io.debezium.data.VariableScaleDecimal;
+import io.debezium.time.MicroTime;
+import io.debezium.time.MicroTimestamp;
+import io.debezium.time.NanoTime;
+import io.debezium.time.NanoTimestamp;
+import io.debezium.time.Timestamp;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema from Debezium object to Flink Table/SQL internal data structure {@link
+ * RowData}.
+ *
+ * Copy from com.ververica:flink-connector-postgres-cdc-2.3.0
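+ *
+ * Compared with the upstream schema, this copy can optionally wrap the output collector in a
+ * {@link MetricsCollector} so that emitted records are reported to the InLong source metrics.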
+ */
+public final class RowDataDebeziumDeserializeSchema implements DebeziumDeserializationSchema<RowData> {
+
+ private static final long serialVersionUID = 2L;
+
+ /** Custom validator to validate the row value. */
+ public interface ValueValidator extends Serializable {
+
+ void validate(RowData rowData, RowKind rowKind) throws Exception;
+ }
+
+ /** TypeInformation of the produced {@link RowData}. * */
+    private final TypeInformation<RowData> resultTypeInfo;
+
+ /**
+     * Runtime converter that converts Kafka {@link SourceRecord}s into {@link RowData} consisting
+     * of physical column values.
+ */
+ private final DeserializationRuntimeConverter physicalConverter;
+
+ /** Whether the deserializer needs to handle metadata columns. */
+ private final boolean hasMetadata;
+
+ /**
+ * A wrapped output collector which is used to append metadata columns after physical columns.
+ */
+ private final AppendMetadataCollector appendMetadataCollector;
+
+ /** Validator to validate the row value. */
+ private final ValueValidator validator;
+
+ /** Changelog Mode to use for encoding changes in Flink internal data structure. */
+ private final DebeziumChangelogMode changelogMode;
+
+    private final SourceMetricData sourceMetricData;
+
+ /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ RowDataDebeziumDeserializeSchema(
+ RowType physicalDataType,
+ MetadataConverter[] metadataConverters,
+            TypeInformation<RowData> resultTypeInfo,
+ ValueValidator validator,
+ ZoneId serverTimeZone,
+ DeserializationRuntimeConverterFactory userDefinedConverterFactory,
+ DebeziumChangelogMode changelogMode,
+ SourceMetricData sourceMetricData) {
+ this.hasMetadata = checkNotNull(metadataConverters).length > 0;
+ this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters);
+ this.physicalConverter =
+ createConverter(
+ checkNotNull(physicalDataType),
+ serverTimeZone,
+ userDefinedConverterFactory);
+ this.resultTypeInfo = checkNotNull(resultTypeInfo);
+ this.validator = checkNotNull(validator);
+ this.changelogMode = checkNotNull(changelogMode);
+ this.sourceMetricData = sourceMetricData;
+ }
+
+ @Override
+    public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
+ Envelope.Operation op = Envelope.operationFor(record);
+ Struct value = (Struct) record.value();
+ Schema valueSchema = record.valueSchema();
+ if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
+ GenericRowData insert = extractAfterRow(value, valueSchema);
+ validator.validate(insert, RowKind.INSERT);
+ insert.setRowKind(RowKind.INSERT);
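+            // Wrap the collector so the emitted insert/read record is counted by the source metrics.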
+ if (sourceMetricData != null) {
+ out = new MetricsCollector<>(out, sourceMetricData);
+ }
+ emit(record, insert, out);
+ } else if (op == Envelope.Operation.DELETE) {
+ GenericRowData delete = extractBeforeRow(value, valueSchema);
+ validator.validate(delete, RowKind.DELETE);
+ delete.setRowKind(RowKind.DELETE);
+ emit(record, delete, out);
+ } else {
+ if (changelogMode == DebeziumChangelogMode.ALL) {
+ GenericRowData before = extractBeforeRow(value, valueSchema);
+ validator.validate(before, RowKind.UPDATE_BEFORE);
+ before.setRowKind(RowKind.UPDATE_BEFORE);
+ emit(record, before, out);
+ }
+
+ GenericRowData after = extractAfterRow(value, valueSchema);
+ validator.validate(after, RowKind.UPDATE_AFTER);
+ after.setRowKind(RowKind.UPDATE_AFTER);
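+            // As in the insert path, only the UPDATE_AFTER image is counted by the source metrics;
+            // DELETE and UPDATE_BEFORE rows are emitted through the unwrapped collector.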
+ if (sourceMetricData != null) {
+ out = new MetricsCollector<>(out, sourceMetricData);
+ }
+ emit(record, after, out);
+ }
+ }
+
+ private GenericRowData extractAfterRow(Struct value, Schema valueSchema) throws Exception {
+ Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
+ Struct after = value.getStruct(Envelope.FieldName.AFTER);
+ return (GenericRowData) physicalConverter.convert(after, afterSchema);
+ }
+
+ private GenericRowData extractBeforeRow(Struct value, Schema valueSchema) throws Exception {
+ Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
+ Struct before = value.getStruct(Envelope.FieldName.BEFORE);
+ return (GenericRowData) physicalConverter.convert(before, beforeSchema);
+ }
+
+    private void emit(SourceRecord inRecord, RowData physicalRow, Collector<RowData> collector) {
+ if (!hasMetadata) {
+ collector.collect(physicalRow);
+ return;
+ }
+ appendMetadataCollector.inputRecord = inRecord;
+ appendMetadataCollector.outputCollector = collector;
+ appendMetadataCollector.collect(physicalRow);
+ }
+
+ @Override
+    public TypeInformation<RowData> getProducedType() {
+ return resultTypeInfo;
+ }
+
+ // -------------------------------------------------------------------------------------
+ // Builder
+ // -------------------------------------------------------------------------------------
+
+ /** Builder of {@link RowDataDebeziumDeserializeSchema}. */
+ public static class Builder {
+
+ private RowType physicalRowType;
+        private TypeInformation<RowData> resultTypeInfo;
+ private MetadataConverter[] metadataConverters = new MetadataConverter[0];
+ private ValueValidator validator = (rowData, rowKind) -> {
+ };
+ private final ZoneId serverTimeZone = ZoneId.of("UTC");
+ private DeserializationRuntimeConverterFactory userDefinedConverterFactory =
+ DeserializationRuntimeConverterFactory.DEFAULT;
+ private DebeziumChangelogMode changelogMode = DebeziumChangelogMode.ALL;
+ private SourceMetricData sourceMetricData;
+
+ public Builder setPhysicalRowType(RowType physicalRowType) {
+ this.physicalRowType = physicalRowType;
+ return this;
+ }
+
+ public Builder setMetadataConverters(MetadataConverter[] metadataConverters) {
+ this.metadataConverters = metadataConverters;
+ return this;
+ }
+
+        public Builder setResultTypeInfo(TypeInformation<RowData> resultTypeInfo) {
+ this.resultTypeInfo = resultTypeInfo;
+ return this;
+ }
+
+ public Builder setValueValidator(ValueValidator validator) {
+ this.validator = validator;
+ return this;
+ }
+
+ public Builder setUserDefinedConverterFactory(
+ DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+ this.userDefinedConverterFactory = userDefinedConverterFactory;
+ return this;
+ }
+
+ public Builder setChangelogMode(DebeziumChangelogMode changelogMode) {
+ this.changelogMode = changelogMode;
+ return this;
+ }
+
+        public Builder setSourceMetricData(SourceMetricData sourceMetricData) {
+ this.sourceMetricData = sourceMetricData;
+ return this;
+ }
+
+ public RowDataDebeziumDeserializeSchema build() {
+ return new RowDataDebeziumDeserializeSchema(
+ physicalRowType,
+ metadataConverters,
+ resultTypeInfo,
+ validator,
+ serverTimeZone,
+ userDefinedConverterFactory,
+ changelogMode,
+ sourceMetricData);
+ }
+ }
+
+ // -------------------------------------------------------------------------------------
+ // Runtime Converters
+ // -------------------------------------------------------------------------------------
+
+ /** Creates a runtime converter which is null safe. */
+ private static DeserializationRuntimeConverter createConverter(
+ LogicalType type,
+ ZoneId serverTimeZone,
+ DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+ return wrapIntoNullableConverter(
+ createNotNullConverter(type, serverTimeZone, userDefinedConverterFactory));
+ }
+
+ // --------------------------------------------------------------------------------
+ // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is
+ // necessary because the maven shade plugin cannot relocate classes in
+ // SerializedLambdas (MSHADE-260).
+ // --------------------------------------------------------------------------------
+
+ /** Creates a runtime converter which assuming input object is not null. */
+ public static DeserializationRuntimeConverter createNotNullConverter(
+ LogicalType type,
+ ZoneId serverTimeZone,
+ DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+ // user defined converter has a higher resolve order
+        Optional<DeserializationRuntimeConverter> converter =
+ userDefinedConverterFactory.createUserDefinedConverter(type, serverTimeZone);
+ if (converter.isPresent()) {
+ return converter.get();
+ }
+
+ // if no matched user defined converter, fallback to the default converter
+ switch (type.getTypeRoot()) {
+ case NULL:
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ return null;
+ }
+ };
+ case BOOLEAN:
+ return convertToBoolean();
+ case TINYINT:
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ return Byte.parseByte(dbzObj.toString());
+ }
+ };
+ case SMALLINT:
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ return Short.parseShort(dbzObj.toString());
+ }
+ };
+ case INTEGER:
+ case INTERVAL_YEAR_MONTH:
+ return convertToInt();
+ case BIGINT:
+ case INTERVAL_DAY_TIME:
+ return convertToLong();
+ case DATE:
+ return convertToDate();
+ case TIME_WITHOUT_TIME_ZONE:
+ return convertToTime();
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return convertToTimestamp(serverTimeZone);
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return convertToLocalTimeZoneTimestamp(serverTimeZone);
+ case FLOAT:
+ return convertToFloat();
+ case DOUBLE:
+ return convertToDouble();
+ case CHAR:
+ case VARCHAR:
+ return convertToString();
+ case BINARY:
+ case VARBINARY:
+ return convertToBinary();
+ case DECIMAL:
+ return createDecimalConverter((DecimalType) type);
+ case ROW:
+ return createRowConverter(
+ (RowType) type, serverTimeZone, userDefinedConverterFactory);
+ case ARRAY:
+ case MAP:
+ case MULTISET:
+ case RAW:
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ }
+ }
+
+ private static DeserializationRuntimeConverter convertToBoolean() {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ if (dbzObj instanceof Boolean) {
+ return dbzObj;
+ } else if (dbzObj instanceof Byte) {
+ return (byte) dbzObj == 1;
+ } else if (dbzObj instanceof Short) {
+ return (short) dbzObj == 1;
+ } else {
+ return Boolean.parseBoolean(dbzObj.toString());
+ }
+ }
+ };
+ }
+
+ private static DeserializationRuntimeConverter convertToInt() {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ if (dbzObj instanceof Integer) {
+ return dbzObj;
+ } else if (dbzObj instanceof Long) {
+ return ((Long) dbzObj).intValue();
+ } else {
+ return Integer.parseInt(dbzObj.toString());
+ }
+ }
+ };
+ }
+
+ private static DeserializationRuntimeConverter convertToLong() {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ if (dbzObj instanceof Integer) {
+ return ((Integer) dbzObj).longValue();
+ } else if (dbzObj instanceof Long) {
+ return dbzObj;
+ } else {
+ return Long.parseLong(dbzObj.toString());
+ }
+ }
+ };
+ }
+
+ private static DeserializationRuntimeConverter convertToDouble() {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ if (dbzObj instanceof Float) {
+ return ((Float) dbzObj).doubleValue();
+ } else if (dbzObj instanceof Double) {
+ return dbzObj;
+ } else {
+ return Double.parseDouble(dbzObj.toString());
+ }
+ }
+ };
+ }
+
+ private static DeserializationRuntimeConverter convertToFloat() {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ if (dbzObj instanceof Float) {
+ return dbzObj;
+ } else if (dbzObj instanceof Double) {
+ return ((Double) dbzObj).floatValue();
+ } else {
+ return Float.parseFloat(dbzObj.toString());
+ }
+ }
+ };
+ }
+
+ private static DeserializationRuntimeConverter convertToDate() {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay();
+ }
+ };
+ }
+
+ private static DeserializationRuntimeConverter convertToTime() {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ if (dbzObj instanceof Long) {
+ switch (schema.name()) {
+ case MicroTime.SCHEMA_NAME:
+ return (int) ((long) dbzObj / 1000);
+ case NanoTime.SCHEMA_NAME:
+ return (int) ((long) dbzObj / 1000_000);
+ }
+ } else if (dbzObj instanceof Integer) {
+ return dbzObj;
+ }
+ // get number of milliseconds of the day
+ return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000;
+ }
+ };
+ }
+
+ private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ if (dbzObj instanceof Long) {
+ switch (schema.name()) {
+ case Timestamp.SCHEMA_NAME:
+ return TimestampData.fromEpochMillis((Long) dbzObj);
+ case MicroTimestamp.SCHEMA_NAME:
+ long micro = (long) dbzObj;
+ return TimestampData.fromEpochMillis(
+ micro / 1000, (int) (micro % 1000 * 1000));
+ case NanoTimestamp.SCHEMA_NAME:
+ long nano = (long) dbzObj;
+ return TimestampData.fromEpochMillis(
+ nano / 1000_000, (int) (nano % 1000_000));
+ }
+ }
+ LocalDateTime localDateTime =
+ TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
+ return TimestampData.fromLocalDateTime(localDateTime);
+ }
+ };
+ }
+
+ private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(
+ ZoneId serverTimeZone) {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ if (dbzObj instanceof String) {
+ String str = (String) dbzObj;
+ // TIMESTAMP_LTZ type is encoded in string type
+ Instant instant = Instant.parse(str);
+ return TimestampData.fromLocalDateTime(
+ LocalDateTime.ofInstant(instant, serverTimeZone));
+ }
+ throw new IllegalArgumentException(
+ "Unable to convert to TimestampData from unexpected value '"
+ + dbzObj
+ + "' of type "
+ + dbzObj.getClass().getName());
+ }
+ };
+ }
+
+ private static DeserializationRuntimeConverter convertToString() {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ return StringData.fromString(dbzObj.toString());
+ }
+ };
+ }
+
+ private static DeserializationRuntimeConverter convertToBinary() {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ if (dbzObj instanceof byte[]) {
+ return dbzObj;
+ } else if (dbzObj instanceof ByteBuffer) {
+ ByteBuffer byteBuffer = (ByteBuffer) dbzObj;
+ byte[] bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ return bytes;
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
+ }
+ }
+ };
+ }
+
+ private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
+ final int precision = decimalType.getPrecision();
+ final int scale = decimalType.getScale();
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ BigDecimal bigDecimal;
+ if (dbzObj instanceof byte[]) {
+ // decimal.handling.mode=precise
+ bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);
+ } else if (dbzObj instanceof String) {
+ // decimal.handling.mode=string
+ bigDecimal = new BigDecimal((String) dbzObj);
+ } else if (dbzObj instanceof Double) {
+ // decimal.handling.mode=double
+ bigDecimal = BigDecimal.valueOf((Double) dbzObj);
+ } else {
+ if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
+ SpecialValueDecimal decimal =
+ VariableScaleDecimal.toLogical((Struct) dbzObj);
+ bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);
+ } else {
+ // fallback to string
+ bigDecimal = new BigDecimal(dbzObj.toString());
+ }
+ }
+ return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
+ }
+ };
+ }
+
+ private static DeserializationRuntimeConverter createRowConverter(
+ RowType rowType,
+ ZoneId serverTimeZone,
+ DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+ final DeserializationRuntimeConverter[] fieldConverters =
+ rowType.getFields().stream()
+ .map(RowType.RowField::getType)
+ .map(
+ logicType -> createConverter(
+ logicType,
+ serverTimeZone,
+ userDefinedConverterFactory))
+ .toArray(DeserializationRuntimeConverter[]::new);
+ final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) throws Exception {
+ Struct struct = (Struct) dbzObj;
+ int arity = fieldNames.length;
+ GenericRowData row = new GenericRowData(arity);
+ for (int i = 0; i < arity; i++) {
+ String fieldName = fieldNames[i];
+ Field field = schema.field(fieldName);
+ if (field == null) {
+ row.setField(i, null);
+ } else {
+ Object fieldValue = struct.getWithoutDefault(fieldName);
+ Schema fieldSchema = schema.field(fieldName).schema();
+ Object convertedField =
+ convertField(fieldConverters[i], fieldValue, fieldSchema);
+ row.setField(i, convertedField);
+ }
+ }
+ return row;
+ }
+ };
+ }
+
+ private static Object convertField(
+ DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema)
+ throws Exception {
+ if (fieldValue == null) {
+ return null;
+ } else {
+ return fieldConverter.convert(fieldValue, fieldSchema);
+ }
+ }
+
+ private static DeserializationRuntimeConverter wrapIntoNullableConverter(
+ DeserializationRuntimeConverter converter) {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) throws Exception {
+ if (dbzObj == null) {
+ return null;
+ }
+ return converter.convert(dbzObj, schema);
+ }
+ };
+ }
+}
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 970ef97ce6b..4bf18576907 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -850,11 +850,22 @@
Source : org.apache.flink:flink-connector-pulsar:4.1.0-1.18 (Please note that the software have been modified.)
License : https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE
+
+1.3.23 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java
+Source : com.ververica:flink-connector-postgres-cdc:2.3.0 (Please note that the software have been modified.)
+License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
+
1.3.24 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java
Source : com.ververica:flink-connector-mongodb-cdc:2.3.0 (Please note that the software have been modified.)
License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
+
=======================================================================
Apache InLong Subcomponents: