Skip to content

Commit

Permalink
DBZ-7723 Remove dependency on debezium-connector-mysql test-jar
Browse files Browse the repository at this point in the history
  • Loading branch information
Naros committed Mar 30, 2024
1 parent ff9119d commit 8d42eeb
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 27 deletions.
6 changes: 0 additions & 6 deletions debezium-server-redis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,6 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,21 @@
*/
package io.debezium.server.redis;

import static io.debezium.jdbc.JdbcConnection.patternBasedFactory;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.List;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.strategy.AbstractConnectorConnection;
import io.debezium.doc.FixFor;
import io.debezium.relational.history.AbstractSchemaHistoryTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryMetrics;
import io.debezium.testing.testcontainers.MySqlTestResourceLifecycleManager;
Expand All @@ -40,28 +41,30 @@
@QuarkusIntegrationTest
@TestProfile(RedisSchemaHistoryTestProfile.class)
@QuarkusTestResource(RedisTestResourceLifecycleManager.class)
public class RedisSchemaHistoryIT extends AbstractSchemaHistoryTest {
public class RedisSchemaHistoryIT {

private static final String STREAM_NAME = "metadata:debezium:schema_history";
private static final int INIT_HISTORY_SIZE = 16; // Initial number of entries in the schema history stream.

protected static Jedis jedis;
protected SchemaHistory history;

@Override
@BeforeEach
public void beforeEach() {
super.beforeEach();
}

@Override
protected SchemaHistory createHistory() {
SchemaHistory history = new RedisSchemaHistory();
this.history = new RedisSchemaHistory();

history.configure(Configuration.create()
.with("schema.history.internal.redis.address", HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()))
.with("schema.history.internal.redis.address",
HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()))
.build(), null, SchemaHistoryMetrics.NOOP, true);
history.start();
return history;
}

@AfterEach
public void afterEach() {
if (this.history != null) {
this.history.stop();
}
}

@Test
Expand All @@ -77,12 +80,6 @@ public void testSchemaHistoryIsSaved() {
assertTrue(entries.stream().anyMatch(item -> item.getFields().get("schema").contains("CREATE TABLE `customers`")));
}

@Test
@FixFor("DBZ-4771")
public void shouldRecordChangesAndRecoverToVariousPoints() {
super.shouldRecordChangesAndRecoverToVariousPoints();
}

/**
* Test retry mechanism when encountering Redis connectivity issues:
* 1. Make Redis unavailable while the server is up
Expand All @@ -102,7 +99,7 @@ public void testRedisConnectionRetry() throws Exception {
Testing.print("Pausing container");
RedisTestResourceLifecycleManager.pause();

final AbstractConnectorConnection connection = getMySqlConnection();
final JdbcConnection connection = getMySqlConnection();
connection.connect();
Testing.print("Creating new redis_test table and inserting 5 records to it");
connection.execute("CREATE TABLE IF NOT EXISTS inventory.redis_test (id INT PRIMARY KEY)");
Expand All @@ -123,7 +120,7 @@ public void testRedisConnectionRetry() throws Exception {
assertTrue(entries.get(INIT_HISTORY_SIZE).getFields().get("schema").contains("redis_test"));
}

private AbstractConnectorConnection getMySqlConnection() {
private JdbcConnection getMySqlConnection() {
final Configuration config = Configuration.create()
.with("database.user", MySqlTestResourceLifecycleManager.PRIVILEGED_USER)
.with("database.password", MySqlTestResourceLifecycleManager.PRIVILEGED_PASSWORD)
Expand All @@ -132,6 +129,21 @@ private AbstractConnectorConnection getMySqlConnection() {
.with("database.port", MySqlTestResourceLifecycleManager.getContainer().getMappedPort(MySqlTestResourceLifecycleManager.PORT))
.with("driver.protocol", "tcp")
.build();
return new MySqlConnectorConfig(config).getConnectorAdapter().createConnection(config);

// Intentionally set URI protocol as "jdbc:mysql" to avoid conflict with driver.protocol specified above
final String url = "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false"
+ "&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL"
+ "&connectTimeout=30000";

// Using JdbcConnection here to avoid the need to depend on internals of the MySQL
// connector which could be refactored or changed at various points.
final MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(config);

// Creating the JdbcConnection directly
return new JdbcConnection(
connectorConfig.getJdbcConfig(),
patternBasedFactory(url, MySqlConnectorConfig.JDBC_PROTOCOL),
"`",
"`");
}
}

0 comments on commit 8d42eeb

Please sign in to comment.