Skip to content

Commit

Permalink
Merge pull request #358 from alex268/master
Browse files Browse the repository at this point in the history
Added simple integration test to topics module
  • Loading branch information
alex268 authored Jan 23, 2025
2 parents 5f7959f + bc11969 commit 4271d4a
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 8 deletions.
4 changes: 4 additions & 0 deletions topic/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>tech.ydb.test</groupId>
<artifactId>ydb-junit4-support</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import org.junit.Assert;
import org.junit.Test;
import org.junit.function.ThrowingRunnable;

import tech.ydb.topic.description.OffsetsRange;
import tech.ydb.topic.read.impl.DisjointOffsetRangeSet;
Expand All @@ -28,19 +29,26 @@ public void testRangesSimple() {
Assert.assertEquals(4, rangesResult.get(1).getEnd());
}

private void assertException(ThrowingRunnable runnable, String addedRange, String existingRange) {
RuntimeException ex = Assert.assertThrows(RuntimeException.class, runnable);
String message = "Error adding new offset range. Added range " + addedRange +
" clashes with existing range " + existingRange;
Assert.assertEquals(message, ex.getMessage());
}

@Test
public void testReuseRangeSet() {
DisjointOffsetRangeSet ranges = new DisjointOffsetRangeSet();

ranges.add(new OffsetsRangeImpl(30, 40));
ranges.add(new OffsetsRangeImpl(10, 20));
ranges.add(new OffsetsRangeImpl(0, 9));
Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(8, 11)));
Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(8, 10)));
Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(9, 11)));
Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(25, 31)));
Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(31, 100)));
Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(25, 100)));
assertException(() -> ranges.add(new OffsetsRangeImpl(8, 11)), "[8,11)", "[0,9)");
assertException(() -> ranges.add(new OffsetsRangeImpl(8, 10)), "[8,10)", "[0,9)");
assertException(() -> ranges.add(new OffsetsRangeImpl(9, 11)), "[9,11)", "[10,20)");
assertException(() -> ranges.add(new OffsetsRangeImpl(25, 31)), "[25,31)", "[30,40)");
assertException(() -> ranges.add(new OffsetsRangeImpl(31, 100)), "[31,100)", "[30,40)");
assertException(() -> ranges.add(new OffsetsRangeImpl(25, 100)), "[25,100)", "[30,40)");
ranges.add(new OffsetsRangeImpl(9, 10));
List<OffsetsRange> firstResult = ranges.getRangesAndClear();
Assert.assertEquals(2, firstResult.size());
Expand Down
230 changes: 230 additions & 0 deletions topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package tech.ydb.topic.impl;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tech.ydb.core.Status;
import tech.ydb.test.junit4.GrpcTransportRule;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.description.Consumer;
import tech.ydb.topic.description.TopicDescription;
import tech.ydb.topic.read.AsyncReader;
import tech.ydb.topic.read.DeferredCommitter;
import tech.ydb.topic.read.SyncReader;
import tech.ydb.topic.read.events.AbstractReadEventHandler;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.settings.CreateTopicSettings;
import tech.ydb.topic.settings.ReadEventHandlersSettings;
import tech.ydb.topic.settings.ReaderSettings;
import tech.ydb.topic.settings.TopicReadSettings;
import tech.ydb.topic.settings.WriterSettings;
import tech.ydb.topic.write.Message;
import tech.ydb.topic.write.SyncWriter;

/**
*
* @author Aleksandr Gorshenin
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class YdbTopicsIntegrationTest {
private final static Logger logger = LoggerFactory.getLogger(YdbTopicsIntegrationTest.class);

@ClassRule
public final static GrpcTransportRule ydbTransport = new GrpcTransportRule();

private final static String TEST_TOPIC = "integration_test_topic";
private final static String TEST_CONSUMER1 = "consumer";
private final static String TEST_CONSUMER2 = "other_consumer";

private static TopicClient client;

private final static byte[][] TEST_MESSAGES = new byte[][] {
"Test message".getBytes(),
"".getBytes(),
" ".getBytes(),
"Other message".getBytes(),
"Last message".getBytes(),
};

@BeforeClass
public static void initTopic() {
logger.info("Create test topic {} ...", TEST_TOPIC);

client = TopicClient.newClient(ydbTransport).build();
client.createTopic(TEST_TOPIC, CreateTopicSettings.newBuilder()
.addConsumer(Consumer.newBuilder().setName(TEST_CONSUMER1).build())
.addConsumer(Consumer.newBuilder().setName(TEST_CONSUMER2).build())
.build()
).join().expectSuccess("can't create a new topic");
}

@AfterClass
public static void dropTopic() {
logger.info("Drop test topic {} ...", TEST_TOPIC);
Status dropStatus = client.dropTopic(TEST_TOPIC).join();
client.close();
dropStatus.expectSuccess("can't drop test topic");
}

@Test
public void step01_writeWithoutDeduplication() throws InterruptedException, ExecutionException, TimeoutException {
WriterSettings settings = WriterSettings.newBuilder()
.setTopicPath(TEST_TOPIC)
.build();
SyncWriter writer = client.createSyncWriter(settings);
writer.init();

for (int idx = 0; idx < TEST_MESSAGES.length; idx += 1) {
writer.send(Message.newBuilder().setData(TEST_MESSAGES[idx]).build());
}

for (int idx = TEST_MESSAGES.length - 1; idx >= 0; idx -= 1) {
writer.send(Message.newBuilder().setData(TEST_MESSAGES[idx]).build());
}

writer.flush();
writer.shutdown(1, TimeUnit.MINUTES);
}

@Test
public void step02_readHalfWithoutCommit() throws InterruptedException {
ReaderSettings settings = ReaderSettings.newBuilder()
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())
.setConsumerName(TEST_CONSUMER1)
.build();

SyncReader reader = client.createSyncReader(settings);
reader.initAndWait();

for (byte[] bytes: TEST_MESSAGES) {
tech.ydb.topic.read.Message msg = reader.receive();
Assert.assertArrayEquals(bytes, msg.getData());
}

reader.shutdown();
}

@Test
public void step03_readHalfWithCommit() throws InterruptedException {
ReaderSettings settings = ReaderSettings.newBuilder()
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())
.setConsumerName(TEST_CONSUMER1)
.build();

SyncReader reader = client.createSyncReader(settings);
reader.initAndWait();

for (byte[] bytes: TEST_MESSAGES) {
tech.ydb.topic.read.Message msg = reader.receive();
Assert.assertArrayEquals(bytes, msg.getData());
msg.commit();
}

reader.shutdown();
}

@Test
public void step03_readNextHalfWithoutCommit() throws InterruptedException {
ReaderSettings settings = ReaderSettings.newBuilder()
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())
.setConsumerName(TEST_CONSUMER1)
.build();

SyncReader reader = client.createSyncReader(settings);
reader.initAndWait();

for (int idx = TEST_MESSAGES.length - 1; idx >= 0; idx -= 1) {
tech.ydb.topic.read.Message msg = reader.receive();
Assert.assertArrayEquals(TEST_MESSAGES[idx], msg.getData());
}

reader.shutdown();
}

@Test
public void step04_readNextHalfWithCommit() throws InterruptedException {
ReaderSettings settings = ReaderSettings.newBuilder()
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())
.setConsumerName(TEST_CONSUMER1)
.build();

SyncReader reader = client.createSyncReader(settings);
reader.initAndWait();

DeferredCommitter committer = DeferredCommitter.newInstance();
for (int idx = TEST_MESSAGES.length - 1; idx >= 0; idx -= 1) {
tech.ydb.topic.read.Message msg = reader.receive();
Assert.assertArrayEquals(TEST_MESSAGES[idx], msg.getData());
committer.add(msg);
}

committer.commit();
reader.shutdown();
}

@Test
public void step05_describeTopic() throws InterruptedException {
TopicDescription description = client.describeTopic(TEST_TOPIC).join().getValue();

Assert.assertNull(description.getTopicStats());
List<Consumer> consumers = description.getConsumers();
Assert.assertEquals(2, consumers.size());

Assert.assertEquals(TEST_CONSUMER1, consumers.get(0).getName());
Assert.assertEquals(TEST_CONSUMER2, consumers.get(1).getName());
}

@Test
public void step06_readAllByAsyncReader() throws InterruptedException {
ReaderSettings settings = ReaderSettings.newBuilder()
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())
.setConsumerName(TEST_CONSUMER2)
.build();

final AtomicInteger atomicIdx = new AtomicInteger(0);
final CompletableFuture<Void> wait = new CompletableFuture<>();
final byte[][] results = new byte[TEST_MESSAGES.length * 2][];

AsyncReader reader = client.createAsyncReader(settings, ReadEventHandlersSettings.newBuilder()
.setEventHandler(new AbstractReadEventHandler() {
@Override
public void onMessages(DataReceivedEvent dre) {
for (tech.ydb.topic.read.Message msg: dre.getMessages()) {
int idx = atomicIdx.getAndIncrement();
if (idx < results.length) {
results[idx] = msg.getData();
}
if (idx >= results.length - 1) {
wait.complete(null);
return;
}
}
}
})
.build());

reader.init();
wait.join();

reader.shutdown().join();

for (int idx = 0; idx < TEST_MESSAGES.length; idx += 1) {
Assert.assertArrayEquals(TEST_MESSAGES[idx], results[idx]);
Assert.assertArrayEquals(TEST_MESSAGES[idx], results[results.length - idx - 1]);
}
}
}
12 changes: 10 additions & 2 deletions topic/src/test/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,20 @@
</Appenders>

<Loggers>
<Logger name="io.netty" level="warn" additivity="false">
<Logger name="io.grpc" level="warn" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="tech.ydb.core.grpc" level="error" additivity="false">
<Logger name="tech.ydb" level="debug" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="tech.ydb.topic" level="debug" additivity="false">
<AppenderRef ref="Console"/>
</Logger>

<!-- https://www.testcontainers.org/supported_docker_environment/logging_config/ -->
<Logger name="org.testcontainers" level="warn" />
<Logger name="com.github.dockerjava" level="warn"/>
<Logger name="com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.wire" level="off"/>

<Root level="debug" >
<AppenderRef ref="Console"/>
Expand Down

0 comments on commit 4271d4a

Please sign in to comment.