diff --git a/debezium-server-bom/pom.xml b/debezium-server-bom/pom.xml
index b9eed7dc..de532a2d 100644
--- a/debezium-server-bom/pom.xml
+++ b/debezium-server-bom/pom.xml
@@ -251,6 +251,12 @@
+
+ io.debezium
+ debezium-testing-system
+ ${project.version}
+ test
+
org.junit-pioneer
junit-pioneer
diff --git a/debezium-system-tests/pom.xml b/debezium-system-tests/pom.xml
new file mode 100644
index 00000000..cb078bdb
--- /dev/null
+++ b/debezium-system-tests/pom.xml
@@ -0,0 +1,183 @@
+
+
+ io.debezium
+ debezium-server
+ 2.7.0-SNAPSHOT
+ ../pom.xml
+
+
+ 4.0.0
+ debezium-system-tests
+ Debezium System Integration test-suite
+ jar
+
+ http://maven.apache.org
+
+
+ UTF-8
+
+
+
+
+
+ io.quarkus
+ quarkus-junit5
+ test
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+ io.debezium
+ debezium-testing-testcontainers
+ test
+
+
+ io.debezium
+ debezium-testing-system
+ test
+
+
+ io.debezium
+ debezium-server-core
+ test
+
+
+ io.debezium
+ debezium-connector-oracle
+ test
+
+
+ io.debezium
+ debezium-connector-postgres
+ test
+
+
+ io.debezium
+ debezium-connector-mysql
+ test
+
+
+ io.debezium
+ debezium-connector-sqlserver
+ test
+
+
+ io.debezium
+ debezium-connector-mongodb
+ test
+
+
+ io.debezium
+ debezium-connector-db2
+ test
+
+
+
+
+
+
+ io.quarkus
+ quarkus-maven-plugin
+ ${quarkus.version.runtime}
+
+
+
+ build
+
+
+
+
+
+ org.jboss.jandex
+ jandex-maven-plugin
+
+
+ make-index
+
+ jandex
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-failsafe-plugin
+
+
+ integration-test
+
+ integration-test
+
+
+
+ verify
+
+ verify
+
+
+
+
+ ${skipITs}
+ true
+
+ IT
+ false
+
+ ${runOrder}
+
+
+
+
+
+
+ assembly
+
+ false
+
+
+
+
+ org.apache.maven.plugins
+ maven-failsafe-plugin
+ ${version.failsafe.plugin}
+
+ ${skipITs}
+ true
+ ${runOrder}
+
+
+
+ integration-test
+
+ integration-test
+
+
+
+ verify
+
+ verify
+
+
+
+
+
+
+
+
+ quick
+
+ false
+
+ quick
+
+
+
+ true
+
+
+
+
diff --git a/debezium-system-tests/src/test/java/io/debezium/server/DebeziumServerPostgresIT.java b/debezium-system-tests/src/test/java/io/debezium/server/DebeziumServerPostgresIT.java
new file mode 100644
index 00000000..0e8f7da0
--- /dev/null
+++ b/debezium-system-tests/src/test/java/io/debezium/server/DebeziumServerPostgresIT.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.server;
+
+import static io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager.JDBC_POSTGRESQL_URL_FORMAT;
+import static io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager.POSTGRES_HOST;
+import static io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager.POSTGRES_PORT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.sql.SQLException;
+import java.time.Duration;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import jakarta.inject.Inject;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+import io.debezium.config.CommonConnectorConfig;
+import io.debezium.testing.system.tools.databases.SqlDatabaseClient;
+import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
+import io.debezium.util.Testing;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+
+/**
+ * Integration test that verifies basic reading from PostgreSQL database.
+ *
+ * @author Fiore Mario Vitale
+ */
+@QuarkusTest
+@QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class DebeziumServerPostgresIT {
+
+ private static final int MESSAGE_COUNT = 4;
+ @Inject
+ DebeziumServer server;
+
+ @Inject
+ DebeziumMetrics metrics;
+
+ {
+ Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
+ }
+
+ @Test
+ @Order(1)
+ public void shouldSnapshot() {
+
+ Testing.Print.enable();
+
+ final TestConsumer testConsumer = (TestConsumer) server.getConsumer();
+
+ waitSnapshotCompletion();
+
+ assertThat(testConsumer.getValues().size()).isEqualTo(MESSAGE_COUNT);
+
+ List values = testConsumer.getValues().stream().map(Object::toString).collect(Collectors.toList());
+
+ assertThat(values.get(0)).contains("\"after\":{\"id\":1001,\"first_name\":\"Sally\",\"last_name\":\"Thomas\",\"email\":\"sally.thomas@acme.com\"}");
+ assertThat(values.get(1)).contains("\"after\":{\"id\":1002,\"first_name\":\"George\",\"last_name\":\"Bailey\",\"email\":\"gbailey@foobar.com\"}");
+ assertThat(values.get(2)).contains("\"after\":{\"id\":1003,\"first_name\":\"Edward\",\"last_name\":\"Walker\",\"email\":\"ed@walker.com\"}");
+ assertThat(values.get(3)).contains("\"after\":{\"id\":1004,\"first_name\":\"Anne\",\"last_name\":\"Kretchmar\",\"email\":\"annek@noanswer.org\"}");
+ }
+
+ @Test
+ @Order(2)
+ public void shouldStream() throws SQLException {
+ Testing.Print.enable();
+
+ final TestConsumer testConsumer = (TestConsumer) server.getConsumer();
+
+ waitSnapshotCompletion();
+
+ insertNewRow();
+
+ Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds()))
+ .until(() -> (testConsumer.getValues().size() >= MESSAGE_COUNT + 1));
+
+ List values = testConsumer.getValues().stream().map(Object::toString).collect(Collectors.toList());
+
+ assertThat(values.get(0)).contains("\"after\":{\"id\":1001,\"first_name\":\"Sally\",\"last_name\":\"Thomas\",\"email\":\"sally.thomas@acme.com\"}");
+ assertThat(values.get(1)).contains("\"after\":{\"id\":1002,\"first_name\":\"George\",\"last_name\":\"Bailey\",\"email\":\"gbailey@foobar.com\"}");
+ assertThat(values.get(2)).contains("\"after\":{\"id\":1003,\"first_name\":\"Edward\",\"last_name\":\"Walker\",\"email\":\"ed@walker.com\"}");
+ assertThat(values.get(3)).contains("\"after\":{\"id\":1004,\"first_name\":\"Anne\",\"last_name\":\"Kretchmar\",\"email\":\"annek@noanswer.org\"}");
+ assertThat(values.get(4)).contains("\"after\":{\"id\":1005,\"first_name\":\"Jon\",\"last_name\":\"Snow\",\"email\":\"jon_snow@gameofthrones.com\"}");
+
+ }
+
+ private void waitSnapshotCompletion() {
+ Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> {
+ try {
+ // snapshot process finished
+ // and consuming events finished!
+ return metrics.snapshotCompleted()
+ && metrics.streamingQueueCurrentSize() == 0
+ && metrics.maxQueueSize() == CommonConnectorConfig.DEFAULT_MAX_QUEUE_SIZE;
+ }
+ catch (Exception e) {
+ return false;
+ }
+ });
+ }
+
+ private static void insertNewRow() throws SQLException {
+
+ SqlDatabaseClient sqlDatabaseClient = new SqlDatabaseClient(getJdbcUrl(),
+ PostgresTestResourceLifecycleManager.POSTGRES_USER,
+ PostgresTestResourceLifecycleManager.POSTGRES_PASSWORD);
+
+ String sql = "INSERT INTO inventory.customers VALUES (default, 'Jon', 'Snow', 'jon_snow@gameofthrones.com')";
+ sqlDatabaseClient.execute("inventory", sql);
+ }
+
+ public static String getJdbcUrl() {
+
+ return String.format(JDBC_POSTGRESQL_URL_FORMAT, POSTGRES_HOST, PostgresTestResourceLifecycleManager.getContainer().getMappedPort(POSTGRES_PORT).toString());
+ }
+}
diff --git a/debezium-system-tests/src/test/java/io/debezium/server/TestConfigSource.java b/debezium-system-tests/src/test/java/io/debezium/server/TestConfigSource.java
new file mode 100644
index 00000000..146f3003
--- /dev/null
+++ b/debezium-system-tests/src/test/java/io/debezium/server/TestConfigSource.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.server;
+
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.eclipse.microprofile.config.spi.ConfigSource;
+
+import io.debezium.data.Json;
+import io.debezium.util.Testing;
+
+/**
+ * A config source used during tests. Amended/overridden by values exposed from test lifecycle listeners.
+ */
+public class TestConfigSource implements ConfigSource {
+
+ public static final String OFFSETS_FILE = "file-connector-offsets.txt";
+ public static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath(OFFSETS_FILE).toAbsolutePath();
+
+ final Map config = new HashMap<>();
+
+ public TestConfigSource() {
+
+ config.put("debezium.sink.type", "test");
+ config.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
+ config.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
+ config.put("debezium.source.offset.flush.interval.ms", "0");
+ config.put("debezium.source.topic.prefix", "testc");
+ config.put("debezium.source.schema.include.list", "inventory");
+ config.put("debezium.source.table.include.list", "inventory.customers");
+
+ String format = System.getProperty("test.apicurio.converter.format");
+ String formatKey = System.getProperty("debezium.format.key");
+ String formatValue = System.getProperty("debezium.format.value");
+ String formatHeader = System.getProperty("debezium.format.header", "json");
+
+ if (format != null && format.length() != 0) {
+ config.put("debezium.format.key", format);
+ config.put("debezium.format.value", format);
+ config.put("debezium.format.header", formatHeader);
+ // TODO remove once https://github.com/Apicurio/apicurio-registry/issues/4351 is fixed
+ config.put("debezium.source.record.processing.threads", "1");
+ }
+ else {
+ formatKey = (formatKey != null) ? formatKey : Json.class.getSimpleName().toLowerCase();
+ formatValue = (formatValue != null) ? formatValue : Json.class.getSimpleName().toLowerCase();
+ formatHeader = (formatHeader != null) ? formatHeader : Json.class.getSimpleName().toLowerCase();
+ config.put("debezium.format.key", formatKey);
+ config.put("debezium.format.value", formatValue);
+ config.put("debezium.format.header", formatHeader);
+ }
+ }
+
+ @Override
+ public Map getProperties() {
+ return config;
+ }
+
+ @Override
+ public String getValue(String propertyName) {
+ return config.get(propertyName);
+ }
+
+ @Override
+ public String getName() {
+ return "test";
+ }
+
+ @Override
+ public Set getPropertyNames() {
+ return config.keySet();
+ }
+
+ public static int waitForSeconds() {
+ return 60;
+ }
+}
diff --git a/debezium-system-tests/src/test/java/io/debezium/server/TestConsumer.java b/debezium-system-tests/src/test/java/io/debezium/server/TestConsumer.java
new file mode 100644
index 00000000..917f8981
--- /dev/null
+++ b/debezium-system-tests/src/test/java/io/debezium/server/TestConsumer.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.server;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.Dependent;
+import jakarta.inject.Named;
+
+import io.debezium.DebeziumException;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.DebeziumEngine.RecordCommitter;
+import io.debezium.util.Testing;
+
+@Dependent
+@Named("test")
+public class TestConsumer implements DebeziumEngine.ChangeConsumer> {
+
+ final List