diff --git a/docs/en/connector-v2/source/MySQL-CDC.md b/docs/en/connector-v2/source/MySQL-CDC.md index 0114d5c1d5b..42d3db09c91 100644 --- a/docs/en/connector-v2/source/MySQL-CDC.md +++ b/docs/en/connector-v2/source/MySQL-CDC.md @@ -175,7 +175,9 @@ When an initial consistent snapshot is made for large databases, your establishe | username | String | Yes | - | Name of the database to use when connecting to the database server. | | password | String | Yes | - | Password to use when connecting to the database server. | | database-names | List | No | - | Database name of the database to monitor. | +| database-pattern | String | No | .* | Regular expression matching the names of the databases to capture, for example: `database_prefix.*`. | | table-names | List | Yes | - | Table name of the database to monitor. The table name needs to include the database name, for example: `database_name.table_name` | +| table-pattern | String | Yes | - | Regular expression matching the names of the tables to capture. The pattern must include the database name, for example: `database.*\\.table_.*` | | table-names-config | List | No | - | Table config list. for example: [{"table": "db1.schema1.table1","primaryKeys": ["key1"],"snapshotSplitColumn": "key2"}] | | startup.mode | Enum | No | INITIAL | Optional startup mode for MySQL CDC consumer, valid enumerations are `initial`, `earliest`, `latest` and `specific`.
`initial`: Synchronize historical data at startup, and then synchronize incremental data.
`earliest`: Startup from the earliest offset possible.
`latest`: Startup from the latest offset.
`specific`: Startup from user-supplied specific offsets. | | startup.specific-offset.file | String | No | - | Start from the specified binlog file name. **Note, This option is required when the `startup.mode` option used `specific`.** | @@ -303,6 +305,34 @@ sink { } ``` +### Support table-pattern for multi-table reading +> `table-pattern` and `table-names` are mutually exclusive +```hocon +env { + # You can set engine configuration here + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 + read_limit.bytes_per_second=7000000 + read_limit.rows_per_second=400 +} + +source { + MySQL-CDC { + server-id = 5652 + username = "st_user_source" + password = "mysqlpw" + database-pattern = "source.*" + table-pattern = "source.*\\..*" + base-url = "jdbc:mysql://mysql_cdc_e2e:3306" + } +} + +sink { + Console { + } +} +``` ## Changelog diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java index 87dd7d3a8f7..4f58172a6e2 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java @@ -42,6 +42,8 @@ public abstract class JdbcSourceConfigFactory implements SourceConfig.Factory databaseList; protected List tableList; + protected String databasePattern; + protected String tablePattern; protected StartupConfig startupConfig; protected StopConfig stopConfig; protected double distributionFactorUpper = @@ -243,6 +245,8 @@ public JdbcSourceConfigFactory fromReadonlyConfig(ReadonlyConfig config) { this.password = config.get(JdbcSourceOptions.PASSWORD); this.databaseList = 
config.get(JdbcSourceOptions.DATABASE_NAMES); this.tableList = config.get(CatalogOptions.TABLE_NAMES); + this.databasePattern = config.get(CatalogOptions.DATABASE_PATTERN); + this.tablePattern = config.get(CatalogOptions.TABLE_PATTERN); this.distributionFactorUpper = config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); this.distributionFactorLower = diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java index db63e4e4dce..5cc1b51d0f9 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java @@ -95,9 +95,13 @@ public MySqlSourceConfig create(int subtaskId) { } if (databaseList != null) { props.setProperty("database.include.list", String.join(",", databaseList)); + } else if (databasePattern != null) { + props.setProperty("database.include.list", databasePattern); } if (tableList != null) { props.setProperty("table.include.list", String.join(",", tableList)); + } else if (tablePattern != null) { + props.setProperty("table.include.list", tablePattern); } if (serverTimeZone != null) { props.setProperty("database.serverTimezone", serverTimeZone); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java 
index c11f9e72d4d..5c93aa64547 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceSplit; @@ -38,12 +39,14 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions; import com.google.auto.service.AutoService; +import lombok.extern.slf4j.Slf4j; import java.io.Serializable; import java.util.List; import java.util.Optional; @AutoService(Factory.class) +@Slf4j public class MySqlIncrementalSourceFactory extends BaseChangeStreamTableSourceFactory { @Override public String factoryIdentifier() { @@ -99,9 +102,9 @@ public Class getSourceClass() { TableSource restoreSource( TableSourceFactoryContext context, List restoreTables) { return () -> { + ReadonlyConfig config = context.getOptions(); List catalogTables = - CatalogTableUtil.getCatalogTables( - context.getOptions(), context.getClassLoader()); + CatalogTableUtil.getCatalogTables(config, context.getClassLoader()); boolean enableSchemaChange = context.getOptions() .getOptional(SourceOptions.SCHEMA_CHANGES_ENABLED) @@ -137,7 +140,7 @@ TableSource restoreSource( text -> TablePath.of(text, false)); } return (SeaTunnelSource) - new MySqlIncrementalSource<>(context.getOptions(), catalogTables); + new MySqlIncrementalSource<>(config, catalogTables); }; } } diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java index 35a86a4e266..f422b53d13d 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java @@ -53,6 +53,7 @@ import java.util.stream.Stream; import static org.awaitility.Awaitility.await; +import static org.testcontainers.shaded.org.awaitility.Awaitility.given; @Slf4j @DisabledOnContainer( @@ -72,6 +73,7 @@ public class MysqlCDCIT extends TestSuiteBase implements TestResource { private final UniqueDatabase inventoryDatabase = new UniqueDatabase( MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw", MYSQL_DATABASE); + private final String QUERY_SQL = "select * from %s.%s"; // mysql source table query sql private static final String SOURCE_SQL_TEMPLATE = @@ -539,6 +541,59 @@ public void testMysqlCdcMultiTableWithCustomPrimaryKey(TestContainer container) SOURCE_TABLE_2_CUSTOM_PRIMARY_KEY))))); } + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK}, + disabledReason = "Currently SPARK do not support cdc") + public void testMysqlCdcByWildcardsConfig(TestContainer container) + throws IOException, InterruptedException { + inventoryDatabase.setTemplateName("wildcards").createAndInitialize(); + CompletableFuture.runAsync( + () -> { + try { + container.executeJob("/mysqlcdc_wildcards_to_mysql.conf"); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + }); + TimeUnit.SECONDS.sleep(5); + 
inventoryDatabase.setTemplateName("wildcards_dml").createAndInitialize(); + given().pollDelay(20, TimeUnit.SECONDS) + .pollInterval(2000, TimeUnit.MILLISECONDS) + .await() + .atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertAll( + () -> { + log.info( + query(getQuerySQL("sink", "source_products")) + .toString()); + Assertions.assertIterableEquals( + query(getQuerySQL("source", "products")), + query(getQuerySQL("sink", "source_products"))); + }, + () -> { + log.info( + query(getQuerySQL("sink", "source_customers")) + .toString()); + Assertions.assertIterableEquals( + query(getQuerySQL("source", "customers")), + query(getQuerySQL("sink", "source_customers"))); + }, + () -> { + log.info( + query(getQuerySQL("sink", "source1_orders")) + .toString()); + Assertions.assertIterableEquals( + query(getQuerySQL("source1", "orders")), + query(getQuerySQL("sink", "source1_orders"))); + }); + }); + } + private Connection getJdbcConnection() throws SQLException { return DriverManager.getConnection( MYSQL_CONTAINER.getJdbcUrl(), @@ -703,4 +758,8 @@ private String getSourceQuerySQL(String database, String tableName) { private String getSinkQuerySQL(String database, String tableName) { return String.format(SINK_SQL_TEMPLATE, database, tableName); } + + private String getQuerySQL(String database, String tableName) { + return String.format(QUERY_SQL, database, tableName); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/wildcards.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/wildcards.sql new file mode 100644 index 00000000000..2e35c8c6906 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/wildcards.sql @@ -0,0 +1,93 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. 
See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: source +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE IF NOT EXISTS `source`; +use `source`; + +drop table if exists `source`.`products`; +-- Create and populate our products using a single insert with many rows +CREATE TABLE products ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel', + description VARCHAR(512), + weight FLOAT +); + +ALTER TABLE `source`.`products` AUTO_INCREMENT = 101; + +INSERT INTO `source`.`products` +VALUES (101,"scooter","Small 2-wheel scooter",3.14), + (102,"car battery","12V car battery",8.1), + (103,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8), + (104,"hammer","12oz carpenter's hammer",0.75), + (105,"hammer","14oz carpenter's hammer",0.875), + (106,"hammer","16oz carpenter's hammer",1.0), + (107,"rocks","box of assorted rocks",5.3), + (108,"jacket","water resistent black wind breaker",0.1), + (109,"spare tire","24 inch spare tire",22.2); + + +DROP TABLE IF EXISTS `source`.`customers`; +CREATE TABLE `source`.`customers` ( + id INTEGER NOT NULL 
AUTO_INCREMENT PRIMARY KEY, + first_name VARCHAR(255) NOT NULL, + last_name VARCHAR(255) NOT NULL, + email VARCHAR(255) NOT NULL UNIQUE KEY +) AUTO_INCREMENT=1001; + + +INSERT INTO `source`.`customers` +VALUES (1001,"Sally","Thomas","sally.thomas@acme.com"), + (1002,"George","Bailey","gbailey@foobar.com"), + (1003,"Edward","Walker","ed@walker.com"), + (1004,"Anne","Kretchmar","annek@noanswer.org"); + + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: source1 +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE IF NOT EXISTS `source1`; +use `source1`; + +DROP TABLE IF EXISTS `source1`.`orders`; +CREATE TABLE `source1`.`orders` ( + order_number INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + order_date DATE NOT NULL, + purchaser INTEGER NOT NULL, + quantity INTEGER NOT NULL, + product_id INTEGER NOT NULL +) AUTO_INCREMENT = 10001; + + +INSERT INTO `source1`.`orders` +VALUES (10001, '2016-01-16', 1001, 1, 102), + (10002, '2016-01-17', 1002, 2, 105), + (10003, '2016-02-18', 1004, 3, 109), + (10004, '2016-02-19', 1002, 2, 106), + (10005, '16-02-21', 1003, 1, 107); + +CREATE DATABASE IF NOT EXISTS `sink`; + +use `sink`; + +DROP TABLE IF EXISTS `source_products`; +DROP TABLE IF EXISTS `source_customers`; +DROP TABLE IF EXISTS `source1_orders`; + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/wildcards_dml.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/wildcards_dml.sql new file mode 100644 index 00000000000..ce3deef6388 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/wildcards_dml.sql @@ -0,0 +1,34 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. 
See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: source +-- ---------------------------------------------------------------------------------------------------------------- + +use `source`; + +UPDATE `source`.`products` SET name = 'Illustrated new quality productivity' WHERE id = 102; +INSERT INTO `source`.`customers` VALUES (1005,"Zhangdonghao","","hawk9821@xxx.com"); + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: source1 +-- ---------------------------------------------------------------------------------------------------------------- + +use `source1`; +DELETE FROM `source1`.`orders` where order_number < 10004; + + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_wildcards_to_mysql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_wildcards_to_mysql.conf new file mode 100644 index 00000000000..105063d04ab --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_wildcards_to_mysql.conf @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation 
(ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 + read_limit.bytes_per_second=7000000 + read_limit.rows_per_second=400 +} + +source { + MySQL-CDC { + server-id = 5652 + username = "st_user_source" + password = "mysqlpw" + table-pattern = "source.*\\..*" + base-url = "jdbc:mysql://mysql_cdc_e2e:3306" + } +} + +sink { + jdbc { + url = "jdbc:mysql://mysql_cdc_e2e:3306/sink" + driver = "com.mysql.cj.jdbc.Driver" + user = "st_user_sink" + password = "mysqlpw" + + generate_sink_sql = true + # You need to configure both database and table + database = sink + table = "${database_name}_${table_name}" + } +} \ No newline at end of file