[INLONG-10193][Sort] Postgres connector support audit ID #10198

Merged: 9 commits, May 15, 2024
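For context: the change threads InLong's MetricOption from the table options down into a package-local copy of the Postgres CDC table source, so records flowing through the connector can be counted for metrics and audit. Below is a minimal usage sketch from Flink SQL. All connection values are placeholders, and the three metric/audit option keys are assumptions based on org.apache.inlong.sort.base.Constants (INLONG_METRIC, INLONG_AUDIT, AUDIT_KEYS), not strings shown in this diff:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PostgresCdcAuditSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // 'postgres-cdc-inlong' is the IDENTIFIER registered by the factory in
        // this PR. The last three option keys are assumed from Constants;
        // verify them against your InLong version before use.
        tEnv.executeSql(
                "CREATE TABLE products (\n"
                        + "  id INT,\n"
                        + "  name STRING,\n"
                        + "  PRIMARY KEY (id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'postgres-cdc-inlong',\n"
                        + "  'hostname' = 'localhost',\n"
                        + "  'port' = '5432',\n"
                        + "  'username' = 'postgres',\n"
                        + "  'password' = 'postgres',\n"
                        + "  'database-name' = 'postgres',\n"
                        + "  'schema-name' = 'public',\n"
                        + "  'table-name' = 'products',\n"
                        + "  'slot.name' = 'flink',\n"
                        + "  'inlong.metric.labels' = 'groupId=g1&streamId=s1&nodeId=n1',\n"
                        + "  'metrics.audit.proxy.hosts' = '127.0.0.1:10081',\n"
                        + "  'metrics.audit.key' = '8'\n"
                        + ")");
        tEnv.executeSql("SELECT * FROM products").print();
    }
}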
PostgreSQLTableFactory.java
@@ -17,14 +17,14 @@

package org.apache.inlong.sort.postgre;

-import com.ververica.cdc.connectors.postgres.table.PostgreSQLTableSource;
+import org.apache.inlong.sort.base.metric.MetricOption;

import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

@@ -39,7 +39,10 @@
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;

-/** Factory for creating configured instance of {@link PostgreSQLTableSource}. */
+/** Factory for creating configured instance of {@link PostgreSQLTableSource}.
+ *
+ * Copy from com.ververica:flink-connector-postgres-cdc-2.3.0
+ */
public class PostgreSQLTableFactory implements DynamicTableSourceFactory {

private static final String IDENTIFIER = "postgres-cdc-inlong";
@@ -116,7 +119,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
+ "\"upsert\": Encodes changes as upsert stream that describes idempotent updates on a key. It can be used for tables with primary keys when replica identity FULL is not an option.");

@Override
-public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
+public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
helper.validateExcept(DEBEZIUM_OPTIONS_PREFIX);
@@ -139,6 +142,14 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
physicalSchema.getPrimaryKey().isPresent(),
"Primary key must be present when upsert mode is selected.");
}
+String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
+String auditHostAndPorts = config.get(INLONG_AUDIT);
+String auditKeys = config.get(AUDIT_KEYS);
+MetricOption metricOption = MetricOption.builder()
+.withInlongLabels(inlongMetric)
+.withAuditAddress(auditHostAndPorts)
+.withAuditKeys(auditKeys)
+.build();

return new PostgreSQLTableSource(
physicalSchema,
@@ -152,7 +163,8 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
pluginName,
slotName,
changelogMode,
-getDebeziumProperties(context.getCatalogTable().getOptions()));
+getDebeziumProperties(context.getCatalogTable().getOptions()),
+metricOption);
}

@Override
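Since the factory now just forwards a MetricOption, the source can equally be wired up programmatically. A minimal sketch, mirroring the factory's constructor call above; every literal (schema, host, credentials, plugin, slot, label string) is a placeholder assumption, not a value from this diff:

import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.postgre.PostgreSQLTableSource;

import java.util.Properties;

public class SourceWiringSketch {
    public static void main(String[] args) {
        // Placeholder schema with two physical columns.
        ResolvedSchema physicalSchema = ResolvedSchema.of(
                Column.physical("id", DataTypes.INT()),
                Column.physical("name", DataTypes.STRING()));

        // Built the same way createDynamicTableSource now does; the label
        // format and audit address are assumptions.
        MetricOption metricOption = MetricOption.builder()
                .withInlongLabels("groupId=g1&streamId=s1&nodeId=n1")
                .withAuditAddress("127.0.0.1:10081")
                .withAuditKeys("8")
                .build();

        // Argument order follows the thirteen-parameter constructor in the
        // new PostgreSQLTableSource below.
        PostgreSQLTableSource source = new PostgreSQLTableSource(
                physicalSchema,
                5432,
                "localhost",
                "postgres",    // database
                "public",      // schema
                "products",    // table
                "postgres",    // username
                "postgres",    // password
                "decoderbufs", // logical decoding plugin
                "flink",       // replication slot
                DebeziumChangelogMode.ALL,
                new Properties(),
                metricOption);

        System.out.println(source.asSummaryString()); // prints "PostgreSQL-CDC"
    }
}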
PostgreSQLTableSource.java (new file)
@@ -0,0 +1,261 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.postgre;

import org.apache.inlong.sort.base.metric.MetricOption;

import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.connectors.postgres.table.PostgreSQLDeserializationConverterFactory;
import com.ververica.cdc.connectors.postgres.table.PostgreSQLReadableMetadata;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
import com.ververica.cdc.debezium.table.MetadataConverter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A {@link DynamicTableSource} that describes how to create a PostgreSQL source from a logical
* description.
*
* Copy from com.ververica:flink-connector-postgres-cdc-2.3.0
*/
public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMetadata {

private final ResolvedSchema physicalSchema;
private final int port;
private final String hostname;
private final String database;
private final String schemaName;
private final String tableName;
private final String username;
private final String password;
private final String pluginName;
private final String slotName;
private final DebeziumChangelogMode changelogMode;
private final Properties dbzProperties;

// --------------------------------------------------------------------------------------------
// Mutable attributes
// --------------------------------------------------------------------------------------------

/** Data type that describes the final output of the source. */
protected DataType producedDataType;

/** Metadata that is appended at the end of a physical source row. */
protected List<String> metadataKeys;
private MetricOption metricOption;

public PostgreSQLTableSource(
ResolvedSchema physicalSchema,
int port,
String hostname,
String database,
String schemaName,
String tableName,
String username,
String password,
String pluginName,
String slotName,
DebeziumChangelogMode changelogMode,
Properties dbzProperties,
MetricOption metricOption) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
this.database = checkNotNull(database);
this.schemaName = checkNotNull(schemaName);
this.tableName = checkNotNull(tableName);
this.username = checkNotNull(username);
this.password = checkNotNull(password);
this.pluginName = checkNotNull(pluginName);
this.slotName = slotName;
this.changelogMode = changelogMode;
this.dbzProperties = dbzProperties;
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.metadataKeys = Collections.emptyList();
this.metricOption = metricOption;
}

@Override
public ChangelogMode getChangelogMode() {
switch (changelogMode) {
case UPSERT:
return ChangelogMode.upsert();
case ALL:
return ChangelogMode.all();
default:
throw new UnsupportedOperationException(
"Unsupported changelog mode: " + changelogMode);
}
}

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
RowType physicalDataType =
(RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
MetadataConverter[] metadataConverters = getMetadataConverters();
TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);

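// The package-local RowDataDebeziumDeserializeSchema converts Debezium
// records to RowData; setSourceMetricData below hands it the MetricOption
// so emitted records are counted for InLong metrics and audit.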
DebeziumDeserializationSchema<RowData> deserializer =
RowDataDebeziumDeserializeSchema.newBuilder()
.setPhysicalRowType(physicalDataType)
.setMetadataConverters(metadataConverters)
.setResultTypeInfo(typeInfo)
.setUserDefinedConverterFactory(
PostgreSQLDeserializationConverterFactory.instance())
.setValueValidator(new PostgresValueValidator(schemaName, tableName))
.setChangelogMode(changelogMode)
.setSourceMetricData(metricOption)
.build();
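// Wrap the audit-aware deserializer in ververica's SourceFunction-based
// Debezium runner; SourceFunctionProvider.of(..., false) marks it unbounded.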
DebeziumSourceFunction<RowData> sourceFunction =
PostgreSQLSource.<RowData>builder()
.hostname(hostname)
.port(port)
.database(database)
.schemaList(schemaName)
.tableList(schemaName + "." + tableName)
.username(username)
.password(password)
.decodingPluginName(pluginName)
.slotName(slotName)
.debeziumProperties(dbzProperties)
.deserializer(deserializer)
.build();
return SourceFunctionProvider.of(sourceFunction, false);
}

private MetadataConverter[] getMetadataConverters() {
if (metadataKeys.isEmpty()) {
return new MetadataConverter[0];
}

return metadataKeys.stream()
.map(
key -> Stream.of(PostgreSQLReadableMetadata.values())
.filter(m -> m.getKey().equals(key))
.findFirst()
.orElseThrow(IllegalStateException::new))
.map(PostgreSQLReadableMetadata::getConverter)
.toArray(MetadataConverter[]::new);
}

@Override
public DynamicTableSource copy() {
PostgreSQLTableSource source =
new PostgreSQLTableSource(
physicalSchema,
port,
hostname,
database,
schemaName,
tableName,
username,
password,
pluginName,
slotName,
changelogMode,
dbzProperties,
metricOption);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PostgreSQLTableSource that = (PostgreSQLTableSource) o;
return port == that.port
&& Objects.equals(physicalSchema, that.physicalSchema)
&& Objects.equals(hostname, that.hostname)
&& Objects.equals(database, that.database)
&& Objects.equals(schemaName, that.schemaName)
&& Objects.equals(tableName, that.tableName)
&& Objects.equals(username, that.username)
&& Objects.equals(password, that.password)
&& Objects.equals(pluginName, that.pluginName)
&& Objects.equals(slotName, that.slotName)
&& Objects.equals(dbzProperties, that.dbzProperties)
&& Objects.equals(producedDataType, that.producedDataType)
&& Objects.equals(metadataKeys, that.metadataKeys)
&& Objects.equals(changelogMode, that.changelogMode)
&& Objects.equals(metricOption, that.metricOption);
}

@Override
public int hashCode() {
return Objects.hash(
physicalSchema,
port,
hostname,
database,
schemaName,
tableName,
username,
password,
pluginName,
slotName,
dbzProperties,
producedDataType,
metadataKeys,
changelogMode,
metricOption);
}

@Override
public String asSummaryString() {
return "PostgreSQL-CDC";
}

@Override
public Map<String, DataType> listReadableMetadata() {
return Stream.of(PostgreSQLReadableMetadata.values())
.collect(
Collectors.toMap(
PostgreSQLReadableMetadata::getKey,
PostgreSQLReadableMetadata::getDataType));
}

@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
this.metadataKeys = metadataKeys;
this.producedDataType = producedDataType;
}
}
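Worth noting: because the copy keeps SupportsReadingMetadata, metadata columns still resolve through listReadableMetadata() above. A short sketch, continuing the tEnv from the first sketch in this PR; the keys 'table_name' and 'op_ts' come from the upstream 2.3.0 PostgreSQLReadableMetadata enum and are assumed unchanged by this copy:

tEnv.executeSql(
        "CREATE TABLE products_with_meta (\n"
                + "  id INT,\n"
                + "  name STRING,\n"
                + "  origin_table STRING METADATA FROM 'table_name' VIRTUAL,\n"
                + "  op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,\n"
                + "  PRIMARY KEY (id) NOT ENFORCED\n"
                + ") WITH (\n"
                + "  'connector' = 'postgres-cdc-inlong',\n"
                + "  'hostname' = 'localhost',\n"
                + "  'port' = '5432',\n"
                + "  'username' = 'postgres',\n"
                + "  'password' = 'postgres',\n"
                + "  'database-name' = 'postgres',\n"
                + "  'schema-name' = 'public',\n"
                + "  'table-name' = 'products',\n"
                + "  'slot.name' = 'flink'\n"
                + ")");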
PostgresValueValidator.java (new file)
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.postgre;

import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

/** The {@link RowDataDebeziumDeserializeSchema.ValueValidator} for Postgres connector.
*
* Copy from com.ververica:flink-connector-postgres-cdc-2.3.0
*/
public final class PostgresValueValidator
implements RowDataDebeziumDeserializeSchema.ValueValidator {

private static final long serialVersionUID = -1870679469578028765L;

private static final String REPLICA_IDENTITY_EXCEPTION =
"The \"before\" field of UPDATE/DELETE message is null, "
+ "please check the Postgres table has been set REPLICA IDENTITY to FULL level. "
+ "You can update the setting by running the command in Postgres 'ALTER TABLE %s REPLICA IDENTITY FULL'. "
+ "Please see more in Debezium documentation: https://debezium.io/documentation/reference/1.5/connectors/postgresql.html#postgresql-replica-identity";

private final String schemaTable;

public PostgresValueValidator(String schema, String table) {
this.schemaTable = schema + "." + table;
}

@Override
public void validate(RowData rowData, RowKind rowKind) throws Exception {
if (rowData == null) {
throw new IllegalStateException(String.format(REPLICA_IDENTITY_EXCEPTION, schemaTable));
}
}
}
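The validator only fires when Debezium delivers a null before-image on UPDATE/DELETE, which is the symptom of a table left at the default REPLICA IDENTITY. A minimal sketch of the fix the error message prescribes, over plain JDBC; the connection URL, credentials, and table name are placeholders, and the PostgreSQL JDBC driver is assumed to be on the classpath:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class ReplicaIdentityFix {
    public static void main(String[] args) throws Exception {
        // Placeholder connection settings; adjust for your environment.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres");
                Statement stmt = conn.createStatement()) {
            // The exact command the validator's error message suggests.
            stmt.execute("ALTER TABLE public.products REPLICA IDENTITY FULL");
        }
    }
}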