From ab70d55919034c302eafb5a2638777c1251e61e6 Mon Sep 17 00:00:00 2001 From: mfvitale Date: Fri, 5 Apr 2024 16:23:53 +0200 Subject: [PATCH 1/3] DBZ-7732 Add test module for future connectors integration tests --- debezium-system-tests/pom.xml | 54 ++++++++ .../io/debezium/server/DebeziumServerIT.java | 91 +++++++++++++ .../io/debezium/server/TestConfigSource.java | 126 ++++++++++++++++++ .../java/io/debezium/server/TestConsumer.java | 56 ++++++++ ...lipse.microprofile.config.spi.ConfigSource | 1 + .../src/test/resources/application.properties | 1 + .../src/test/resources/logback-test.xml | 39 ++++++ pom.xml | 1 + 8 files changed, 369 insertions(+) create mode 100644 debezium-system-tests/pom.xml create mode 100644 debezium-system-tests/src/test/java/io/debezium/server/DebeziumServerIT.java create mode 100644 debezium-system-tests/src/test/java/io/debezium/server/TestConfigSource.java create mode 100644 debezium-system-tests/src/test/java/io/debezium/server/TestConsumer.java create mode 100644 debezium-system-tests/src/test/resources/META-INF/services/org.eclipse.microprofile.config.spi.ConfigSource create mode 100644 debezium-system-tests/src/test/resources/application.properties create mode 100644 debezium-system-tests/src/test/resources/logback-test.xml diff --git a/debezium-system-tests/pom.xml b/debezium-system-tests/pom.xml new file mode 100644 index 00000000..6fe0a9a7 --- /dev/null +++ b/debezium-system-tests/pom.xml @@ -0,0 +1,54 @@ + + + io.debezium + debezium-server + 2.7.0-SNAPSHOT + ../pom.xml + + + 4.0.0 + debezium-system-tests + Debezium system integration test-suite + jar + + http://maven.apache.org + + + UTF-8 + + + + + + io.quarkus + quarkus-junit5 + test + + + org.assertj + assertj-core + test + + + io.debezium + debezium-testing-testcontainers + test + + + io.debezium + debezium-server-core + test + + + io.debezium + debezium-connector-oracle + test + + + io.debezium + debezium-connector-postgres + test + + + diff --git a/debezium-system-tests/src/test/java/io/debezium/server/DebeziumServerIT.java b/debezium-system-tests/src/test/java/io/debezium/server/DebeziumServerIT.java new file mode 100644 index 00000000..517c622f --- /dev/null +++ b/debezium-system-tests/src/test/java/io/debezium/server/DebeziumServerIT.java @@ -0,0 +1,91 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.server; + +import io.debezium.config.CommonConnectorConfig; +import io.debezium.server.events.ConnectorCompletedEvent; +import io.debezium.server.events.ConnectorStartedEvent; +import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager; +import io.debezium.util.Testing; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIfSystemProperty; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; + +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration test that verifies basic reading from PostgreSQL database. + * + * @author Jiri Pechanec + */ +@QuarkusTest +@QuarkusTestResource(PostgresTestResourceLifecycleManager.class) +@EnabledIfSystemProperty(named = "test.apicurio", matches = "false", disabledReason = "DebeziumServerIT doesn't run with apicurio profile.") +@DisabledIfSystemProperty(named = "debezium.format.key", matches = "protobuf") +@DisabledIfSystemProperty(named = "debezium.format.value", matches = "protobuf") +public class DebeziumServerIT { + + private static final int MESSAGE_COUNT = 4; + @Inject + DebeziumServer server; + + @Inject + DebeziumMetrics metrics; + + { + Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH); + } + + void setupDependencies(@Observes ConnectorStartedEvent event) { + if (!TestConfigSource.isItTest()) { + return; + } + + } + + void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception { + if (!event.isSuccess()) { + throw (Exception) event.getError().get(); + } + } + + @Test + public void testPostgresWithJson() throws Exception { + + Testing.Print.enable(); + final TestConsumer testConsumer = (TestConsumer) server.getConsumer(); + Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())) + .until(() -> (testConsumer.getValues().size() >= MESSAGE_COUNT)); + assertThat(testConsumer.getValues().size()).isEqualTo(MESSAGE_COUNT); + assertThat(((String) testConsumer.getValues().get(MESSAGE_COUNT - 1))).contains( + "\"after\":{\"id\":1004,\"first_name\":\"Anne\",\"last_name\":\"Kretchmar\",\"email\":\"annek@noanswer.org\"}"); + } + + @Test + public void testDebeziumMetricsWithPostgres() { + Testing.Print.enable(); + + Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> { + try { + // snapshot process finished + // and consuming events finished! + return metrics.snapshotCompleted() + && metrics.streamingQueueCurrentSize() == 0 + && metrics.maxQueueSize() == CommonConnectorConfig.DEFAULT_MAX_QUEUE_SIZE; + } + catch (Exception e) { + return false; + } + }); + } +} diff --git a/debezium-system-tests/src/test/java/io/debezium/server/TestConfigSource.java b/debezium-system-tests/src/test/java/io/debezium/server/TestConfigSource.java new file mode 100644 index 00000000..855ab042 --- /dev/null +++ b/debezium-system-tests/src/test/java/io/debezium/server/TestConfigSource.java @@ -0,0 +1,126 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.server; + +import io.debezium.data.Json; +import io.debezium.util.Testing; +import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; +import org.eclipse.microprofile.config.spi.ConfigSource; + +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * A config source used during tests. Amended/overridden by values exposed from test lifecycle listeners. + */ +public class TestConfigSource implements ConfigSource { + + public static final String OFFSETS_FILE = "file-connector-offsets.txt"; + public static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath(OFFSETS_FILE).toAbsolutePath(); + public static final Path TEST_FILE_PATH = Testing.Files.createTestingPath("file-connector-input.txt").toAbsolutePath(); + + final Map integrationTest = new HashMap<>(); + final Map kinesisTest = new HashMap<>(); + final Map pubsubTest = new HashMap<>(); + final Map unitTest = new HashMap<>(); + protected Map config; + + public TestConfigSource() { + integrationTest.put("debezium.sink.type", "test"); + integrationTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector"); + integrationTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); + integrationTest.put("debezium.source.offset.flush.interval.ms", "0"); + integrationTest.put("debezium.source.topic.prefix", "testc"); + integrationTest.put("debezium.source.schema.include.list", "inventory"); + integrationTest.put("debezium.source.table.include.list", "inventory.customers"); + + String format = System.getProperty("test.apicurio.converter.format"); + String formatKey = System.getProperty("debezium.format.key"); + String formatValue = System.getProperty("debezium.format.value"); + String formatHeader = System.getProperty("debezium.format.header", "json"); + + if (format != null && format.length() != 0) { + integrationTest.put("debezium.format.key", format); + integrationTest.put("debezium.format.value", format); + integrationTest.put("debezium.format.header", formatHeader); + // TODO remove once https://github.com/Apicurio/apicurio-registry/issues/4351 is fixed + integrationTest.put("debezium.source.record.processing.threads", "1"); + } + else { + formatKey = (formatKey != null) ? formatKey : Json.class.getSimpleName().toLowerCase(); + formatValue = (formatValue != null) ? formatValue : Json.class.getSimpleName().toLowerCase(); + formatHeader = (formatHeader != null) ? formatHeader : Json.class.getSimpleName().toLowerCase(); + integrationTest.put("debezium.format.key", formatKey); + integrationTest.put("debezium.format.value", formatValue); + integrationTest.put("debezium.format.header", formatHeader); + } + + unitTest.put("debezium.sink.type", "test"); + unitTest.put("debezium.source.connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector"); + unitTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); + unitTest.put("debezium.source.offset.flush.interval.ms", "0"); + unitTest.put("debezium.source.file", TEST_FILE_PATH.toAbsolutePath().toString()); + unitTest.put("debezium.source.topic", "topicX"); + unitTest.put("debezium.format.header", formatHeader); + unitTest.put("debezium.format.schemas.enable", "true"); + unitTest.put("debezium.format.header.schemas.enable", "false"); + unitTest.put("debezium.format.value.schemas.enable", "false"); + unitTest.put("debezium.transforms", "hoist,addheader"); + unitTest.put("debezium.transforms.hoist.type", "org.apache.kafka.connect.transforms.HoistField$Value"); + unitTest.put("debezium.transforms.hoist.field", "line"); + unitTest.put("debezium.transforms.hoist.predicate", "topicNameMatch"); + unitTest.put("debezium.transforms.addheader.type", "org.apache.kafka.connect.transforms.InsertHeader"); + unitTest.put("debezium.transforms.addheader.header", "headerKey"); + unitTest.put("debezium.transforms.addheader.value.literal", "headerValue"); + + unitTest.put("debezium.predicates", "topicNameMatch"); + unitTest.put("debezium.predicates.topicNameMatch.type", "org.apache.kafka.connect.transforms.predicates.TopicNameMatches"); + unitTest.put("debezium.predicates.topicNameMatch.pattern", ".*"); + + // DBZ-2622 For testing properties passed via smallrye/microprofile environment variables + unitTest.put("DEBEZIUM_SOURCE_TABLE_INCLUDE_LIST", "public.table_name"); + unitTest.put("debezium_source_offset_flush_interval_ms_Test", "0"); + unitTest.put("debezium.source.snapshot.select.statement.overrides.public.table_name", "SELECT * FROM table_name WHERE 1>2"); + unitTest.put("debezium.source.database.allowPublicKeyRetrieval", "true"); + + if (isItTest()) { + config = integrationTest; + } + else { + config = unitTest; + } + } + + public static boolean isItTest() { + return "IT".equals(System.getProperty("test.type")); + } + + @Override + public Map getProperties() { + return config; + } + + @Override + public String getValue(String propertyName) { + return config.get(propertyName); + } + + @Override + public String getName() { + return "test"; + } + + @Override + public Set getPropertyNames() { + return config.keySet(); + } + + public static int waitForSeconds() { + return 60; + } +} diff --git a/debezium-system-tests/src/test/java/io/debezium/server/TestConsumer.java b/debezium-system-tests/src/test/java/io/debezium/server/TestConsumer.java new file mode 100644 index 00000000..dbc5025d --- /dev/null +++ b/debezium-system-tests/src/test/java/io/debezium/server/TestConsumer.java @@ -0,0 +1,56 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.server; + +import io.debezium.DebeziumException; +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.DebeziumEngine.RecordCommitter; +import io.debezium.util.Testing; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.Dependent; +import jakarta.inject.Named; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +@Dependent +@Named("test") +public class TestConsumer implements DebeziumEngine.ChangeConsumer> { + + final List values = Collections.synchronizedList(new ArrayList<>()); + + @PostConstruct + void init() { + Testing.print("Test consumer constructed"); + } + + @PreDestroy + void close() { + Testing.print("Test consumer destroyed"); + } + + @Override + public void handleBatch(List> records, RecordCommitter> committer) + throws InterruptedException { + records.forEach(record -> { + Testing.print(record); + values.add(record.value()); + try { + committer.markProcessed(record); + } + catch (InterruptedException e) { + throw new DebeziumException(e); + } + }); + } + + public List getValues() { + return values; + } +} diff --git a/debezium-system-tests/src/test/resources/META-INF/services/org.eclipse.microprofile.config.spi.ConfigSource b/debezium-system-tests/src/test/resources/META-INF/services/org.eclipse.microprofile.config.spi.ConfigSource new file mode 100644 index 00000000..8617037b --- /dev/null +++ b/debezium-system-tests/src/test/resources/META-INF/services/org.eclipse.microprofile.config.spi.ConfigSource @@ -0,0 +1 @@ +io.debezium.server.TestConfigSource \ No newline at end of file diff --git a/debezium-system-tests/src/test/resources/application.properties b/debezium-system-tests/src/test/resources/application.properties new file mode 100644 index 00000000..b1970efd --- /dev/null +++ b/debezium-system-tests/src/test/resources/application.properties @@ -0,0 +1 @@ +quarkus.kubernetes-client.devservices.enabled=false \ No newline at end of file diff --git a/debezium-system-tests/src/test/resources/logback-test.xml b/debezium-system-tests/src/test/resources/logback-test.xml new file mode 100644 index 00000000..4c7e2a41 --- /dev/null +++ b/debezium-system-tests/src/test/resources/logback-test.xml @@ -0,0 +1,39 @@ + + + + + %d{ISO8601} %-5p %X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.connectorContext} %m [%c]%n + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/pom.xml b/pom.xml index 8b5dba33..80624452 100644 --- a/pom.xml +++ b/pom.xml @@ -62,6 +62,7 @@ debezium-server-rabbitmq debezium-server-rocketmq debezium-server-sqs + debezium-system-tests From aa089a8cb2fcca047baf266ea493d413071ce967 Mon Sep 17 00:00:00 2001 From: mfvitale Date: Mon, 8 Apr 2024 10:40:22 +0200 Subject: [PATCH 2/3] DBZ-7732 Support multiple connector testing on Debezium Server --- debezium-server-bom/pom.xml | 6 + debezium-system-tests/pom.xml | 25 ++++ .../io/debezium/server/DebeziumServerIT.java | 91 ------------- .../server/DebeziumServerPostgresIT.java | 127 ++++++++++++++++++ .../io/debezium/server/TestConfigSource.java | 86 +++--------- .../java/io/debezium/server/TestConsumer.java | 21 +-- 6 files changed, 191 insertions(+), 165 deletions(-) delete mode 100644 debezium-system-tests/src/test/java/io/debezium/server/DebeziumServerIT.java create mode 100644 debezium-system-tests/src/test/java/io/debezium/server/DebeziumServerPostgresIT.java diff --git a/debezium-server-bom/pom.xml b/debezium-server-bom/pom.xml index b9eed7dc..de532a2d 100644 --- a/debezium-server-bom/pom.xml +++ b/debezium-server-bom/pom.xml @@ -251,6 +251,12 @@ + + io.debezium + debezium-testing-system + ${project.version} + test + org.junit-pioneer junit-pioneer diff --git a/debezium-system-tests/pom.xml b/debezium-system-tests/pom.xml index 6fe0a9a7..aecc6a42 100644 --- a/debezium-system-tests/pom.xml +++ b/debezium-system-tests/pom.xml @@ -35,6 +35,11 @@ debezium-testing-testcontainers test + + io.debezium + debezium-testing-system + test + io.debezium debezium-server-core @@ -50,5 +55,25 @@ debezium-connector-postgres test + + io.debezium + debezium-connector-mysql + test + + + io.debezium + debezium-connector-sqlserver + test + + + io.debezium + debezium-connector-mongodb + test + + + io.debezium + debezium-connector-db2 + test + diff --git a/debezium-system-tests/src/test/java/io/debezium/server/DebeziumServerIT.java b/debezium-system-tests/src/test/java/io/debezium/server/DebeziumServerIT.java deleted file mode 100644 index 517c622f..00000000 --- a/debezium-system-tests/src/test/java/io/debezium/server/DebeziumServerIT.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.server; - -import io.debezium.config.CommonConnectorConfig; -import io.debezium.server.events.ConnectorCompletedEvent; -import io.debezium.server.events.ConnectorStartedEvent; -import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager; -import io.debezium.util.Testing; -import io.quarkus.test.common.QuarkusTestResource; -import io.quarkus.test.junit.QuarkusTest; -import jakarta.enterprise.event.Observes; -import jakarta.inject.Inject; -import org.awaitility.Awaitility; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.DisabledIfSystemProperty; -import org.junit.jupiter.api.condition.EnabledIfSystemProperty; - -import java.time.Duration; - -import static org.assertj.core.api.Assertions.assertThat; - -/** - * Integration test that verifies basic reading from PostgreSQL database. - * - * @author Jiri Pechanec - */ -@QuarkusTest -@QuarkusTestResource(PostgresTestResourceLifecycleManager.class) -@EnabledIfSystemProperty(named = "test.apicurio", matches = "false", disabledReason = "DebeziumServerIT doesn't run with apicurio profile.") -@DisabledIfSystemProperty(named = "debezium.format.key", matches = "protobuf") -@DisabledIfSystemProperty(named = "debezium.format.value", matches = "protobuf") -public class DebeziumServerIT { - - private static final int MESSAGE_COUNT = 4; - @Inject - DebeziumServer server; - - @Inject - DebeziumMetrics metrics; - - { - Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH); - } - - void setupDependencies(@Observes ConnectorStartedEvent event) { - if (!TestConfigSource.isItTest()) { - return; - } - - } - - void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception { - if (!event.isSuccess()) { - throw (Exception) event.getError().get(); - } - } - - @Test - public void testPostgresWithJson() throws Exception { - - Testing.Print.enable(); - final TestConsumer testConsumer = (TestConsumer) server.getConsumer(); - Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())) - .until(() -> (testConsumer.getValues().size() >= MESSAGE_COUNT)); - assertThat(testConsumer.getValues().size()).isEqualTo(MESSAGE_COUNT); - assertThat(((String) testConsumer.getValues().get(MESSAGE_COUNT - 1))).contains( - "\"after\":{\"id\":1004,\"first_name\":\"Anne\",\"last_name\":\"Kretchmar\",\"email\":\"annek@noanswer.org\"}"); - } - - @Test - public void testDebeziumMetricsWithPostgres() { - Testing.Print.enable(); - - Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> { - try { - // snapshot process finished - // and consuming events finished! - return metrics.snapshotCompleted() - && metrics.streamingQueueCurrentSize() == 0 - && metrics.maxQueueSize() == CommonConnectorConfig.DEFAULT_MAX_QUEUE_SIZE; - } - catch (Exception e) { - return false; - } - }); - } -} diff --git a/debezium-system-tests/src/test/java/io/debezium/server/DebeziumServerPostgresIT.java b/debezium-system-tests/src/test/java/io/debezium/server/DebeziumServerPostgresIT.java new file mode 100644 index 00000000..0e8f7da0 --- /dev/null +++ b/debezium-system-tests/src/test/java/io/debezium/server/DebeziumServerPostgresIT.java @@ -0,0 +1,127 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.server; + +import static io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager.JDBC_POSTGRESQL_URL_FORMAT; +import static io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager.POSTGRES_HOST; +import static io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager.POSTGRES_PORT; +import static org.assertj.core.api.Assertions.assertThat; + +import java.sql.SQLException; +import java.time.Duration; +import java.util.List; +import java.util.stream.Collectors; + +import jakarta.inject.Inject; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +import io.debezium.config.CommonConnectorConfig; +import io.debezium.testing.system.tools.databases.SqlDatabaseClient; +import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager; +import io.debezium.util.Testing; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; + +/** + * Integration test that verifies basic reading from PostgreSQL database. + * + * @author Fiore Mario Vitale + */ +@QuarkusTest +@QuarkusTestResource(PostgresTestResourceLifecycleManager.class) +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +public class DebeziumServerPostgresIT { + + private static final int MESSAGE_COUNT = 4; + @Inject + DebeziumServer server; + + @Inject + DebeziumMetrics metrics; + + { + Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH); + } + + @Test + @Order(1) + public void shouldSnapshot() { + + Testing.Print.enable(); + + final TestConsumer testConsumer = (TestConsumer) server.getConsumer(); + + waitSnapshotCompletion(); + + assertThat(testConsumer.getValues().size()).isEqualTo(MESSAGE_COUNT); + + List values = testConsumer.getValues().stream().map(Object::toString).collect(Collectors.toList()); + + assertThat(values.get(0)).contains("\"after\":{\"id\":1001,\"first_name\":\"Sally\",\"last_name\":\"Thomas\",\"email\":\"sally.thomas@acme.com\"}"); + assertThat(values.get(1)).contains("\"after\":{\"id\":1002,\"first_name\":\"George\",\"last_name\":\"Bailey\",\"email\":\"gbailey@foobar.com\"}"); + assertThat(values.get(2)).contains("\"after\":{\"id\":1003,\"first_name\":\"Edward\",\"last_name\":\"Walker\",\"email\":\"ed@walker.com\"}"); + assertThat(values.get(3)).contains("\"after\":{\"id\":1004,\"first_name\":\"Anne\",\"last_name\":\"Kretchmar\",\"email\":\"annek@noanswer.org\"}"); + } + + @Test + @Order(2) + public void shouldStream() throws SQLException { + Testing.Print.enable(); + + final TestConsumer testConsumer = (TestConsumer) server.getConsumer(); + + waitSnapshotCompletion(); + + insertNewRow(); + + Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())) + .until(() -> (testConsumer.getValues().size() >= MESSAGE_COUNT + 1)); + + List values = testConsumer.getValues().stream().map(Object::toString).collect(Collectors.toList()); + + assertThat(values.get(0)).contains("\"after\":{\"id\":1001,\"first_name\":\"Sally\",\"last_name\":\"Thomas\",\"email\":\"sally.thomas@acme.com\"}"); + assertThat(values.get(1)).contains("\"after\":{\"id\":1002,\"first_name\":\"George\",\"last_name\":\"Bailey\",\"email\":\"gbailey@foobar.com\"}"); + assertThat(values.get(2)).contains("\"after\":{\"id\":1003,\"first_name\":\"Edward\",\"last_name\":\"Walker\",\"email\":\"ed@walker.com\"}"); + assertThat(values.get(3)).contains("\"after\":{\"id\":1004,\"first_name\":\"Anne\",\"last_name\":\"Kretchmar\",\"email\":\"annek@noanswer.org\"}"); + assertThat(values.get(4)).contains("\"after\":{\"id\":1005,\"first_name\":\"Jon\",\"last_name\":\"Snow\",\"email\":\"jon_snow@gameofthrones.com\"}"); + + } + + private void waitSnapshotCompletion() { + Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> { + try { + // snapshot process finished + // and consuming events finished! + return metrics.snapshotCompleted() + && metrics.streamingQueueCurrentSize() == 0 + && metrics.maxQueueSize() == CommonConnectorConfig.DEFAULT_MAX_QUEUE_SIZE; + } + catch (Exception e) { + return false; + } + }); + } + + private static void insertNewRow() throws SQLException { + + SqlDatabaseClient sqlDatabaseClient = new SqlDatabaseClient(getJdbcUrl(), + PostgresTestResourceLifecycleManager.POSTGRES_USER, + PostgresTestResourceLifecycleManager.POSTGRES_PASSWORD); + + String sql = "INSERT INTO inventory.customers VALUES (default, 'Jon', 'Snow', 'jon_snow@gameofthrones.com')"; + sqlDatabaseClient.execute("inventory", sql); + } + + public static String getJdbcUrl() { + + return String.format(JDBC_POSTGRESQL_URL_FORMAT, POSTGRES_HOST, PostgresTestResourceLifecycleManager.getContainer().getMappedPort(POSTGRES_PORT).toString()); + } +} diff --git a/debezium-system-tests/src/test/java/io/debezium/server/TestConfigSource.java b/debezium-system-tests/src/test/java/io/debezium/server/TestConfigSource.java index 855ab042..146f3003 100644 --- a/debezium-system-tests/src/test/java/io/debezium/server/TestConfigSource.java +++ b/debezium-system-tests/src/test/java/io/debezium/server/TestConfigSource.java @@ -5,16 +5,17 @@ */ package io.debezium.server; -import io.debezium.data.Json; -import io.debezium.util.Testing; -import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; -import org.eclipse.microprofile.config.spi.ConfigSource; - import java.nio.file.Path; import java.util.HashMap; import java.util.Map; import java.util.Set; +import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; +import org.eclipse.microprofile.config.spi.ConfigSource; + +import io.debezium.data.Json; +import io.debezium.util.Testing; + /** * A config source used during tests. Amended/overridden by values exposed from test lifecycle listeners. */ @@ -22,22 +23,18 @@ public class TestConfigSource implements ConfigSource { public static final String OFFSETS_FILE = "file-connector-offsets.txt"; public static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath(OFFSETS_FILE).toAbsolutePath(); - public static final Path TEST_FILE_PATH = Testing.Files.createTestingPath("file-connector-input.txt").toAbsolutePath(); - final Map integrationTest = new HashMap<>(); - final Map kinesisTest = new HashMap<>(); - final Map pubsubTest = new HashMap<>(); - final Map unitTest = new HashMap<>(); - protected Map config; + final Map config = new HashMap<>(); public TestConfigSource() { - integrationTest.put("debezium.sink.type", "test"); - integrationTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector"); - integrationTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); - integrationTest.put("debezium.source.offset.flush.interval.ms", "0"); - integrationTest.put("debezium.source.topic.prefix", "testc"); - integrationTest.put("debezium.source.schema.include.list", "inventory"); - integrationTest.put("debezium.source.table.include.list", "inventory.customers"); + + config.put("debezium.sink.type", "test"); + config.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector"); + config.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); + config.put("debezium.source.offset.flush.interval.ms", "0"); + config.put("debezium.source.topic.prefix", "testc"); + config.put("debezium.source.schema.include.list", "inventory"); + config.put("debezium.source.table.include.list", "inventory.customers"); String format = System.getProperty("test.apicurio.converter.format"); String formatKey = System.getProperty("debezium.format.key"); @@ -45,61 +42,22 @@ public TestConfigSource() { String formatHeader = System.getProperty("debezium.format.header", "json"); if (format != null && format.length() != 0) { - integrationTest.put("debezium.format.key", format); - integrationTest.put("debezium.format.value", format); - integrationTest.put("debezium.format.header", formatHeader); + config.put("debezium.format.key", format); + config.put("debezium.format.value", format); + config.put("debezium.format.header", formatHeader); // TODO remove once https://github.com/Apicurio/apicurio-registry/issues/4351 is fixed - integrationTest.put("debezium.source.record.processing.threads", "1"); + config.put("debezium.source.record.processing.threads", "1"); } else { formatKey = (formatKey != null) ? formatKey : Json.class.getSimpleName().toLowerCase(); formatValue = (formatValue != null) ? formatValue : Json.class.getSimpleName().toLowerCase(); formatHeader = (formatHeader != null) ? formatHeader : Json.class.getSimpleName().toLowerCase(); - integrationTest.put("debezium.format.key", formatKey); - integrationTest.put("debezium.format.value", formatValue); - integrationTest.put("debezium.format.header", formatHeader); - } - - unitTest.put("debezium.sink.type", "test"); - unitTest.put("debezium.source.connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector"); - unitTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); - unitTest.put("debezium.source.offset.flush.interval.ms", "0"); - unitTest.put("debezium.source.file", TEST_FILE_PATH.toAbsolutePath().toString()); - unitTest.put("debezium.source.topic", "topicX"); - unitTest.put("debezium.format.header", formatHeader); - unitTest.put("debezium.format.schemas.enable", "true"); - unitTest.put("debezium.format.header.schemas.enable", "false"); - unitTest.put("debezium.format.value.schemas.enable", "false"); - unitTest.put("debezium.transforms", "hoist,addheader"); - unitTest.put("debezium.transforms.hoist.type", "org.apache.kafka.connect.transforms.HoistField$Value"); - unitTest.put("debezium.transforms.hoist.field", "line"); - unitTest.put("debezium.transforms.hoist.predicate", "topicNameMatch"); - unitTest.put("debezium.transforms.addheader.type", "org.apache.kafka.connect.transforms.InsertHeader"); - unitTest.put("debezium.transforms.addheader.header", "headerKey"); - unitTest.put("debezium.transforms.addheader.value.literal", "headerValue"); - - unitTest.put("debezium.predicates", "topicNameMatch"); - unitTest.put("debezium.predicates.topicNameMatch.type", "org.apache.kafka.connect.transforms.predicates.TopicNameMatches"); - unitTest.put("debezium.predicates.topicNameMatch.pattern", ".*"); - - // DBZ-2622 For testing properties passed via smallrye/microprofile environment variables - unitTest.put("DEBEZIUM_SOURCE_TABLE_INCLUDE_LIST", "public.table_name"); - unitTest.put("debezium_source_offset_flush_interval_ms_Test", "0"); - unitTest.put("debezium.source.snapshot.select.statement.overrides.public.table_name", "SELECT * FROM table_name WHERE 1>2"); - unitTest.put("debezium.source.database.allowPublicKeyRetrieval", "true"); - - if (isItTest()) { - config = integrationTest; - } - else { - config = unitTest; + config.put("debezium.format.key", formatKey); + config.put("debezium.format.value", formatValue); + config.put("debezium.format.header", formatHeader); } } - public static boolean isItTest() { - return "IT".equals(System.getProperty("test.type")); - } - @Override public Map getProperties() { return config; diff --git a/debezium-system-tests/src/test/java/io/debezium/server/TestConsumer.java b/debezium-system-tests/src/test/java/io/debezium/server/TestConsumer.java index dbc5025d..917f8981 100644 --- a/debezium-system-tests/src/test/java/io/debezium/server/TestConsumer.java +++ b/debezium-system-tests/src/test/java/io/debezium/server/TestConsumer.java @@ -5,19 +5,20 @@ */ package io.debezium.server; -import io.debezium.DebeziumException; -import io.debezium.engine.ChangeEvent; -import io.debezium.engine.DebeziumEngine; -import io.debezium.engine.DebeziumEngine.RecordCommitter; -import io.debezium.util.Testing; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import jakarta.enterprise.context.Dependent; import jakarta.inject.Named; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import io.debezium.DebeziumException; +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.DebeziumEngine.RecordCommitter; +import io.debezium.util.Testing; @Dependent @Named("test") @@ -36,8 +37,8 @@ void close() { } @Override - public void handleBatch(List> records, RecordCommitter> committer) - throws InterruptedException { + public void handleBatch(List> records, RecordCommitter> committer) { + records.forEach(record -> { Testing.print(record); values.add(record.value()); From 0293d456bc89d4548d16c8a39e3a879b925b19dd Mon Sep 17 00:00:00 2001 From: mfvitale Date: Mon, 8 Apr 2024 11:19:08 +0200 Subject: [PATCH 3/3] DBZ-7732 Add assembly profile to debezium-system-tests --- debezium-system-tests/pom.xml | 106 +++++++++++++++++++++++++++++++++- 1 file changed, 105 insertions(+), 1 deletion(-) diff --git a/debezium-system-tests/pom.xml b/debezium-system-tests/pom.xml index aecc6a42..cb078bdb 100644 --- a/debezium-system-tests/pom.xml +++ b/debezium-system-tests/pom.xml @@ -9,7 +9,7 @@ 4.0.0 debezium-system-tests - Debezium system integration test-suite + Debezium System Integration test-suite jar http://maven.apache.org @@ -76,4 +76,108 @@ test + + + + + io.quarkus + quarkus-maven-plugin + ${quarkus.version.runtime} + + + + build + + + + + + org.jboss.jandex + jandex-maven-plugin + + + make-index + + jandex + + + + + + org.apache.maven.plugins + maven-failsafe-plugin + + + integration-test + + integration-test + + + + verify + + verify + + + + + ${skipITs} + true + + IT + false + + ${runOrder} + + + + + + + assembly + + false + + + + + org.apache.maven.plugins + maven-failsafe-plugin + ${version.failsafe.plugin} + + ${skipITs} + true + ${runOrder} + + + + integration-test + + integration-test + + + + verify + + verify + + + + + + + + + quick + + false + + quick + + + + true + + +