[INLONG-10193][Sort] Postgres connector support audit ID (#10198)
XiaoYou201 authored May 15, 2024
1 parent d770484 commit 540fd51
Showing 5 changed files with 1,019 additions and 5 deletions.
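For context, this change lets a Flink SQL job enable InLong metric and audit reporting directly on the Postgres CDC source. A minimal usage sketch follows; the connector identifier 'postgres-cdc-inlong' comes from the factory below, while the option keys 'inlong.metric.labels', 'metrics.audit.proxy.hosts', and 'metrics.audit.key' are assumed names for the INLONG_METRIC, INLONG_AUDIT, and AUDIT_KEYS constants and should be checked against org.apache.inlong.sort.base.Constants.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PostgresCdcAuditExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Hypothetical table and option values; only the connector name is
        // taken from this commit, the metric/audit keys are assumptions.
        tEnv.executeSql(
                "CREATE TABLE pg_orders (\n"
                        + "  id INT,\n"
                        + "  name STRING,\n"
                        + "  PRIMARY KEY (id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'postgres-cdc-inlong',\n"
                        + "  'hostname' = 'localhost',\n"
                        + "  'port' = '5432',\n"
                        + "  'username' = 'postgres',\n"
                        + "  'password' = 'postgres',\n"
                        + "  'database-name' = 'test_db',\n"
                        + "  'schema-name' = 'public',\n"
                        + "  'table-name' = 'orders',\n"
                        + "  'inlong.metric.labels' = 'groupId=g1&streamId=s1&nodeId=n1',\n"
                        + "  'metrics.audit.proxy.hosts' = '127.0.0.1:10081',\n"
                        + "  'metrics.audit.key' = '9'\n"
                        + ")");

        tEnv.executeSql("SELECT * FROM pg_orders").print();
    }
}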
PostgreSQLTableFactory.java
@@ -17,14 +17,14 @@

package org.apache.inlong.sort.postgre;

-import com.ververica.cdc.connectors.postgres.table.PostgreSQLTableSource;
+import org.apache.inlong.sort.base.metric.MetricOption;

import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

@@ -39,7 +39,10 @@
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;

-/** Factory for creating configured instance of {@link PostgreSQLTableSource}. */
+/** Factory for creating configured instance of {@link PostgreSQLTableSource}.
+ * <p>
+ * Copy from com.ververica:flink-connector-postgres-cdc-2.3.0
+ */
public class PostgreSQLTableFactory implements DynamicTableSourceFactory {

    private static final String IDENTIFIER = "postgres-cdc-inlong";
@@ -116,7 +119,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
                            + "\"upsert\": Encodes changes as upsert stream that describes idempotent updates on a key. It can be used for tables with primary keys when replica identity FULL is not an option.");

    @Override
-    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
+    public DynamicTableSource createDynamicTableSource(Context context) {
        final FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(this, context);
        helper.validateExcept(DEBEZIUM_OPTIONS_PREFIX);
@@ -139,6 +142,14 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
                    physicalSchema.getPrimaryKey().isPresent(),
                    "Primary key must be present when upsert mode is selected.");
        }
+        String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
+        String auditHostAndPorts = config.get(INLONG_AUDIT);
+        String auditKeys = config.get(AUDIT_KEYS);
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withAuditAddress(auditHostAndPorts)
+                .withAuditKeys(auditKeys)
+                .build();

        return new PostgreSQLTableSource(
                physicalSchema,
@@ -152,7 +163,8 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
                pluginName,
                slotName,
                changelogMode,
-                getDebeziumProperties(context.getCatalogTable().getOptions()));
+                getDebeziumProperties(context.getCatalogTable().getOptions()),
+                metricOption);
    }

    @Override
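Note the wiring above: the factory builds a MetricOption unconditionally, while PostgreSQLTableSource (next file) checks metricOption == null before creating a SourceMetricData. A small sketch of that contract, under the assumption that MetricOption.builder().build() yields null when no metric labels are configured:

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.SourceMetricData;

public class MetricOptionContractSketch {
    public static void main(String[] args) {
        // Assumption: build() returns null when no inlong labels are set,
        // which the table source below treats as "reporting disabled".
        MetricOption metricOption = MetricOption.builder()
                .withInlongLabels(null)
                .withAuditAddress(null)
                .withAuditKeys(null)
                .build();

        SourceMetricData metricData =
                metricOption == null ? null : new SourceMetricData(metricOption);
        System.out.println("metric reporting enabled: " + (metricData != null));
    }
}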
PostgreSQLTableSource.java
@@ -0,0 +1,262 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.postgre;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.SourceMetricData;

import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.connectors.postgres.table.PostgreSQLDeserializationConverterFactory;
import com.ververica.cdc.connectors.postgres.table.PostgreSQLReadableMetadata;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
import com.ververica.cdc.debezium.table.MetadataConverter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A {@link DynamicTableSource} that describes how to create a PostgreSQL source from a logical
* description.
* <p>
* Copy from com.ververica:flink-connector-postgres-cdc-2.3.0
*/
public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMetadata {

    private final ResolvedSchema physicalSchema;
    private final int port;
    private final String hostname;
    private final String database;
    private final String schemaName;
    private final String tableName;
    private final String username;
    private final String password;
    private final String pluginName;
    private final String slotName;
    private final DebeziumChangelogMode changelogMode;
    private final Properties dbzProperties;
    private final MetricOption metricOption;

    // --------------------------------------------------------------------------------------------
    // Mutable attributes
    // --------------------------------------------------------------------------------------------

    /** Data type that describes the final output of the source. */
    protected DataType producedDataType;

    /** Metadata that is appended at the end of a physical source row. */
    protected List<String> metadataKeys;

    public PostgreSQLTableSource(
            ResolvedSchema physicalSchema,
            int port,
            String hostname,
            String database,
            String schemaName,
            String tableName,
            String username,
            String password,
            String pluginName,
            String slotName,
            DebeziumChangelogMode changelogMode,
            Properties dbzProperties,
            MetricOption metricOption) {
        this.physicalSchema = physicalSchema;
        this.port = port;
        this.hostname = checkNotNull(hostname);
        this.database = checkNotNull(database);
        this.schemaName = checkNotNull(schemaName);
        this.tableName = checkNotNull(tableName);
        this.username = checkNotNull(username);
        this.password = checkNotNull(password);
        this.pluginName = checkNotNull(pluginName);
        this.slotName = slotName;
        this.changelogMode = changelogMode;
        this.dbzProperties = dbzProperties;
        this.producedDataType = physicalSchema.toPhysicalRowDataType();
        this.metadataKeys = Collections.emptyList();
        this.metricOption = metricOption;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        switch (changelogMode) {
            case UPSERT:
                return ChangelogMode.upsert();
            case ALL:
                return ChangelogMode.all();
            default:
                throw new UnsupportedOperationException(
                        "Unsupported changelog mode: " + changelogMode);
        }
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
        RowType physicalDataType =
                (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
        MetadataConverter[] metadataConverters = getMetadataConverters();
        TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);

        // Wire the metric/audit reporter into the deserializer so records
        // emitted by the Debezium source are counted; a null MetricOption
        // leaves reporting disabled.
        DebeziumDeserializationSchema<RowData> deserializer =
                RowDataDebeziumDeserializeSchema.newBuilder()
                        .setPhysicalRowType(physicalDataType)
                        .setMetadataConverters(metadataConverters)
                        .setResultTypeInfo(typeInfo)
                        .setUserDefinedConverterFactory(
                                PostgreSQLDeserializationConverterFactory.instance())
                        .setValueValidator(new PostgresValueValidator(schemaName, tableName))
                        .setChangelogMode(changelogMode)
                        .setSourceMetricData(metricOption == null ? null : new SourceMetricData(metricOption))
                        .build();
        DebeziumSourceFunction<RowData> sourceFunction =
                PostgreSQLSource.<RowData>builder()
                        .hostname(hostname)
                        .port(port)
                        .database(database)
                        .schemaList(schemaName)
                        .tableList(schemaName + "." + tableName)
                        .username(username)
                        .password(password)
                        .decodingPluginName(pluginName)
                        .slotName(slotName)
                        .debeziumProperties(dbzProperties)
                        .deserializer(deserializer)
                        .build();
        return SourceFunctionProvider.of(sourceFunction, false);
    }

    private MetadataConverter[] getMetadataConverters() {
        if (metadataKeys.isEmpty()) {
            return new MetadataConverter[0];
        }

        return metadataKeys.stream()
                .map(
                        key -> Stream.of(PostgreSQLReadableMetadata.values())
                                .filter(m -> m.getKey().equals(key))
                                .findFirst()
                                .orElseThrow(IllegalStateException::new))
                .map(PostgreSQLReadableMetadata::getConverter)
                .toArray(MetadataConverter[]::new);
    }

    @Override
    public DynamicTableSource copy() {
        PostgreSQLTableSource source =
                new PostgreSQLTableSource(
                        physicalSchema,
                        port,
                        hostname,
                        database,
                        schemaName,
                        tableName,
                        username,
                        password,
                        pluginName,
                        slotName,
                        changelogMode,
                        dbzProperties,
                        metricOption);
        source.metadataKeys = metadataKeys;
        source.producedDataType = producedDataType;
        return source;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        PostgreSQLTableSource that = (PostgreSQLTableSource) o;
        return port == that.port
                && Objects.equals(physicalSchema, that.physicalSchema)
                && Objects.equals(hostname, that.hostname)
                && Objects.equals(database, that.database)
                && Objects.equals(schemaName, that.schemaName)
                && Objects.equals(tableName, that.tableName)
                && Objects.equals(username, that.username)
                && Objects.equals(password, that.password)
                && Objects.equals(pluginName, that.pluginName)
                && Objects.equals(slotName, that.slotName)
                && Objects.equals(dbzProperties, that.dbzProperties)
                && Objects.equals(producedDataType, that.producedDataType)
                && Objects.equals(metadataKeys, that.metadataKeys)
                && Objects.equals(changelogMode, that.changelogMode)
                && Objects.equals(metricOption, that.metricOption);
    }

    @Override
    public int hashCode() {
        return Objects.hash(
                physicalSchema,
                port,
                hostname,
                database,
                schemaName,
                tableName,
                username,
                password,
                pluginName,
                slotName,
                dbzProperties,
                producedDataType,
                metadataKeys,
                changelogMode);
    }

    @Override
    public String asSummaryString() {
        return "PostgreSQL-CDC";
    }

    @Override
    public Map<String, DataType> listReadableMetadata() {
        return Stream.of(PostgreSQLReadableMetadata.values())
                .collect(
                        Collectors.toMap(
                                PostgreSQLReadableMetadata::getKey,
                                PostgreSQLReadableMetadata::getDataType));
    }

    @Override
    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
        this.metadataKeys = metadataKeys;
        this.producedDataType = producedDataType;
    }
}
PostgresValueValidator.java
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.postgre;

import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

/** The {@link RowDataDebeziumDeserializeSchema.ValueValidator} for Postgres connector.
 * <p>
 * Copy from com.ververica:flink-connector-postgres-cdc-2.3.0
 */
public final class PostgresValueValidator
        implements
        RowDataDebeziumDeserializeSchema.ValueValidator {

    private static final long serialVersionUID = -1870679469578028765L;

    private static final String REPLICA_IDENTITY_EXCEPTION =
            "The \"before\" field of UPDATE/DELETE message is null, "
                    + "please check the Postgres table has been set REPLICA IDENTITY to FULL level. "
                    + "You can update the setting by running the command in Postgres 'ALTER TABLE %s REPLICA IDENTITY FULL'. "
                    + "Please see more in Debezium documentation: https://debezium.io/documentation/reference/1.5/connectors/postgresql.html#postgresql-replica-identity";

    private final String schemaTable;

    public PostgresValueValidator(String schema, String table) {
        this.schemaTable = schema + "." + table;
    }

    @Override
    public void validate(RowData rowData, RowKind rowKind) throws Exception {
        // A null row means Debezium could not provide the "before" image of an
        // UPDATE/DELETE, which happens when REPLICA IDENTITY is not FULL.
        if (rowData == null) {
            throw new IllegalStateException(String.format(REPLICA_IDENTITY_EXCEPTION, schemaTable));
        }
    }
}
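The validator's message already names the usual fix. For completeness, a minimal sketch of applying it over JDBC; the connection URL, credentials, and the table public.orders are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class EnableFullReplicaIdentity {
    public static void main(String[] args) throws Exception {
        // REPLICA IDENTITY FULL makes Postgres log the complete old row for
        // UPDATE/DELETE, so Debezium can populate the "before" field.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/test_db", "postgres", "postgres");
                Statement stmt = conn.createStatement()) {
            stmt.execute("ALTER TABLE public.orders REPLICA IDENTITY FULL");
        }
    }
}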