Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-7732 Add connector specific annotation to SnapshotLock and SnapshotQuery implementations #85

Merged
merged 3 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions debezium-server-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,12 @@
</dependency>

<!-- Testing -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-testing-system</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit-pioneer</groupId>
<artifactId>junit-pioneer</artifactId>
Expand Down
183 changes: 183 additions & 0 deletions debezium-system-tests/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-server</artifactId>
<version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-system-tests</artifactId>
<name>Debezium System Integration test-suite</name>
<packaging>jar</packaging>

<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<!-- Testing -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-testing-testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-testing-system</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-oracle</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-sqlserver</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-db2</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.version.runtime}</version>
<executions>
<execution>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jboss.jandex</groupId>
<artifactId>jandex-maven-plugin</artifactId>
<executions>
<execution>
<id>make-index</id>
<goals>
<goal>jandex</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<executions>
<execution>
<id>integration-test</id>
<goals>
<goal>integration-test</goal>
</goals>
</execution>
<execution>
<id>verify</id>
<goals>
<goal>verify</goal>
</goals>
</execution>
</executions>
<configuration>
<skipTests>${skipITs}</skipTests>
<enableAssertions>true</enableAssertions>
<systemProperties>
<test.type>IT</test.type>
<test.apicurio>false</test.apicurio>
</systemProperties>
<runOrder>${runOrder}</runOrder>
</configuration>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>assembly</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>${version.failsafe.plugin}</version>
<configuration>
<skipTests>${skipITs}</skipTests>
<enableAssertions>true</enableAssertions>
<runOrder>${runOrder}</runOrder>
</configuration>
<executions>
<execution>
<id>integration-test</id>
<goals>
<goal>integration-test</goal>
</goals>
</execution>
<execution>
<id>verify</id>
<goals>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>quick</id>
<activation>
<activeByDefault>false</activeByDefault>
<property>
<name>quick</name>
</property>
</activation>
<properties>
<skipITs>true</skipITs>
</properties>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server;

import static io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager.JDBC_POSTGRESQL_URL_FORMAT;
import static io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager.POSTGRES_HOST;
import static io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager.POSTGRES_PORT;
import static org.assertj.core.api.Assertions.assertThat;

import java.sql.SQLException;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;

import jakarta.inject.Inject;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.testing.system.tools.databases.SqlDatabaseClient;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;

/**
* Integration test that verifies basic reading from PostgreSQL database.
*
* @author Fiore Mario Vitale
*/
@QuarkusTest
@QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class DebeziumServerPostgresIT {

private static final int MESSAGE_COUNT = 4;
@Inject
DebeziumServer server;

@Inject
DebeziumMetrics metrics;

{
Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
}

@Test
@Order(1)
public void shouldSnapshot() {

Testing.Print.enable();

final TestConsumer testConsumer = (TestConsumer) server.getConsumer();

waitSnapshotCompletion();

assertThat(testConsumer.getValues().size()).isEqualTo(MESSAGE_COUNT);

List<String> values = testConsumer.getValues().stream().map(Object::toString).collect(Collectors.toList());

assertThat(values.get(0)).contains("\"after\":{\"id\":1001,\"first_name\":\"Sally\",\"last_name\":\"Thomas\",\"email\":\"[email protected]\"}");
assertThat(values.get(1)).contains("\"after\":{\"id\":1002,\"first_name\":\"George\",\"last_name\":\"Bailey\",\"email\":\"[email protected]\"}");
assertThat(values.get(2)).contains("\"after\":{\"id\":1003,\"first_name\":\"Edward\",\"last_name\":\"Walker\",\"email\":\"[email protected]\"}");
assertThat(values.get(3)).contains("\"after\":{\"id\":1004,\"first_name\":\"Anne\",\"last_name\":\"Kretchmar\",\"email\":\"[email protected]\"}");
}

@Test
@Order(2)
public void shouldStream() throws SQLException {
Testing.Print.enable();

final TestConsumer testConsumer = (TestConsumer) server.getConsumer();

waitSnapshotCompletion();

insertNewRow();

Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds()))
.until(() -> (testConsumer.getValues().size() >= MESSAGE_COUNT + 1));

List<String> values = testConsumer.getValues().stream().map(Object::toString).collect(Collectors.toList());

assertThat(values.get(0)).contains("\"after\":{\"id\":1001,\"first_name\":\"Sally\",\"last_name\":\"Thomas\",\"email\":\"[email protected]\"}");
assertThat(values.get(1)).contains("\"after\":{\"id\":1002,\"first_name\":\"George\",\"last_name\":\"Bailey\",\"email\":\"[email protected]\"}");
assertThat(values.get(2)).contains("\"after\":{\"id\":1003,\"first_name\":\"Edward\",\"last_name\":\"Walker\",\"email\":\"[email protected]\"}");
assertThat(values.get(3)).contains("\"after\":{\"id\":1004,\"first_name\":\"Anne\",\"last_name\":\"Kretchmar\",\"email\":\"[email protected]\"}");
assertThat(values.get(4)).contains("\"after\":{\"id\":1005,\"first_name\":\"Jon\",\"last_name\":\"Snow\",\"email\":\"[email protected]\"}");

}

private void waitSnapshotCompletion() {
Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> {
try {
// snapshot process finished
// and consuming events finished!
return metrics.snapshotCompleted()
&& metrics.streamingQueueCurrentSize() == 0
&& metrics.maxQueueSize() == CommonConnectorConfig.DEFAULT_MAX_QUEUE_SIZE;
}
catch (Exception e) {
return false;
}
});
}

private static void insertNewRow() throws SQLException {

SqlDatabaseClient sqlDatabaseClient = new SqlDatabaseClient(getJdbcUrl(),
PostgresTestResourceLifecycleManager.POSTGRES_USER,
PostgresTestResourceLifecycleManager.POSTGRES_PASSWORD);

String sql = "INSERT INTO inventory.customers VALUES (default, 'Jon', 'Snow', '[email protected]')";
sqlDatabaseClient.execute("inventory", sql);
}

public static String getJdbcUrl() {

return String.format(JDBC_POSTGRESQL_URL_FORMAT, POSTGRES_HOST, PostgresTestResourceLifecycleManager.getContainer().getMappedPort(POSTGRES_PORT).toString());
}
}
Loading