Skip to content

Commit

Permalink
Merge pull request #338 from Myllyenko/replacing_synchronized_with_re…
Browse files Browse the repository at this point in the history
…entrant_locks

Replacing synchronized blocks with ReentrantLocks in topic-related part of SDK
  • Loading branch information
pnv1 authored Oct 28, 2024
2 parents c513c4d + f887345 commit 855fab7
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 67 deletions.
93 changes: 56 additions & 37 deletions topic/src/main/java/tech/ydb/topic/impl/SessionBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;

Expand All @@ -19,6 +20,7 @@ public abstract class SessionBase<R, W> implements Session {

protected final GrpcReadWriteStream<R, W> streamConnection;
protected final AtomicBoolean isWorking = new AtomicBoolean(true);
private final ReentrantLock lock = new ReentrantLock();
private String token;

public SessionBase(GrpcReadWriteStream<R, W> streamConnection) {
Expand All @@ -32,57 +34,74 @@ public SessionBase(GrpcReadWriteStream<R, W> streamConnection) {

protected abstract void onStop();

protected synchronized CompletableFuture<Status> start(GrpcReadStream.Observer<R> streamObserver) {
getLogger().info("Session start");
return streamConnection.start(message -> {
if (getLogger().isTraceEnabled()) {
getLogger().trace("Message received:\n{}", message);
} else {
getLogger().debug("Message received");
}
protected CompletableFuture<Status> start(GrpcReadStream.Observer<R> streamObserver) {
lock.lock();

try {
getLogger().info("Session start");
return streamConnection.start(message -> {
if (getLogger().isTraceEnabled()) {
getLogger().trace("Message received:\n{}", message);
} else {
getLogger().debug("Message received");
}

if (isWorking.get()) {
streamObserver.onNext(message);
}
});
} finally {
lock.unlock();
}
}

public void send(W request) {
lock.lock();

if (isWorking.get()) {
streamObserver.onNext(message);
try {
if (!isWorking.get()) {
if (getLogger().isTraceEnabled()) {
getLogger().trace("Session is already closed. This message is NOT sent:\n{}", request);
}
return;
}
String currentToken = streamConnection.authToken();
if (!Objects.equals(token, currentToken)) {
token = currentToken;
getLogger().info("Sending new token");
sendUpdateTokenRequest(token);
}
});
}

public synchronized void send(W request) {
if (!isWorking.get()) {
if (getLogger().isTraceEnabled()) {
getLogger().trace("Session is already closed. This message is NOT sent:\n{}", request);
getLogger().trace("Sending request:\n{}", request);
} else {
getLogger().debug("Sending request");
}
return;
streamConnection.sendNext(request);
} finally {
lock.unlock();
}
String currentToken = streamConnection.authToken();
if (!Objects.equals(token, currentToken)) {
token = currentToken;
getLogger().info("Sending new token");
sendUpdateTokenRequest(token);
}

if (getLogger().isTraceEnabled()) {
getLogger().trace("Sending request:\n{}", request);
} else {
getLogger().debug("Sending request");
}
streamConnection.sendNext(request);
}

private boolean stop() {
getLogger().info("Session stop");
return isWorking.compareAndSet(true, false);
}


@Override
public synchronized boolean shutdown() {
getLogger().info("Session shutdown");
if (stop()) {
onStop();
streamConnection.close();
return true;
public boolean shutdown() {
lock.lock();

try {
getLogger().info("Session shutdown");
if (stop()) {
onStop();
streamConnection.close();
return true;
}
return false;
} finally {
lock.unlock();
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -24,15 +25,20 @@ public class DeferredCommitterImpl implements DeferredCommitter {
private static class PartitionRanges {
private final PartitionSessionImpl partitionSession;
private final DisjointOffsetRangeSet ranges = new DisjointOffsetRangeSet();
private final ReentrantLock rangesLock = new ReentrantLock();

private PartitionRanges(PartitionSessionImpl partitionSession) {
this.partitionSession = partitionSession;
}

private void add(OffsetsRange offsetRange) {
try {
synchronized (ranges) {
rangesLock.lock();

try {
ranges.add(offsetRange);
} finally {
rangesLock.unlock();
}
} catch (RuntimeException exception) {
String errorMessage = "Error adding new offset range to DeferredCommitter for partition session " +
Expand All @@ -45,8 +51,11 @@ private void add(OffsetsRange offsetRange) {

private void commit() {
List<OffsetsRange> rangesToCommit;
synchronized (ranges) {
rangesLock.lock();
try {
rangesToCommit = ranges.getRangesAndClear();
} finally {
rangesLock.unlock();
}
partitionSession.commitOffsetRanges(rangesToCommit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -44,11 +45,13 @@ public class PartitionSessionImpl {
private final AtomicBoolean isWorking = new AtomicBoolean(true);

private final Queue<Batch> decodingBatches = new LinkedList<>();
private final ReentrantLock decodingBatchesLock = new ReentrantLock();
private final Queue<Batch> readingQueue = new ConcurrentLinkedQueue<>();
private final Function<DataReceivedEvent, CompletableFuture<Void>> dataEventCallback;
private final AtomicBoolean isReadingNow = new AtomicBoolean();
private final Consumer<List<OffsetsRange>> commitFunction;
private final NavigableMap<Long, CompletableFuture<Void>> commitFutures = new ConcurrentSkipListMap<>();
private final ReentrantLock commitFuturesLock = new ReentrantLock();
// Offset of the last read message + 1
private long lastReadOffset;
private long lastCommittedOffset;
Expand Down Expand Up @@ -149,14 +152,21 @@ public CompletableFuture<Void> addBatches(List<YdbTopic.StreamReadMessage.ReadRe
);
});
batchFutures.add(newBatch.getReadFuture());
synchronized (decodingBatches) {

decodingBatchesLock.lock();

try {
decodingBatches.add(newBatch);
} finally {
decodingBatchesLock.unlock();
}

CompletableFuture.runAsync(() -> decode(newBatch), decompressionExecutor)
.thenRun(() -> {
boolean haveNewBatchesReady = false;
synchronized (decodingBatches) {
decodingBatchesLock.lock();

try {
// Taking all encoded messages to sending queue
while (true) {
Batch decodingBatch = decodingBatches.peek();
Expand All @@ -176,7 +186,10 @@ public CompletableFuture<Void> addBatches(List<YdbTopic.StreamReadMessage.ReadRe
break;
}
}
} finally {
decodingBatchesLock.unlock();
}

if (haveNewBatchesReady) {
sendDataToReadersIfNeeded();
}
Expand All @@ -185,10 +198,12 @@ public CompletableFuture<Void> addBatches(List<YdbTopic.StreamReadMessage.ReadRe
return CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture<?>[0]));
}

// Сommit single offset range with result future
// Commit single offset range with result future
public CompletableFuture<Void> commitOffsetRange(OffsetsRange rangeToCommit) {
CompletableFuture<Void> resultFuture = new CompletableFuture<>();
synchronized (commitFutures) {
commitFuturesLock.lock();

try {
if (isWorking.get()) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Offset range [{}, {}) is requested to be committed for partition session {} " +
Expand All @@ -205,6 +220,8 @@ public CompletableFuture<Void> commitOffsetRange(OffsetsRange rangeToCommit) {
partitionId + ") for " + path + " is already closed"));
return resultFuture;
}
} finally {
commitFuturesLock.unlock();
}
List<OffsetsRange> rangeWrapper = new ArrayList<>(1);
rangeWrapper.add(rangeToCommit);
Expand Down Expand Up @@ -334,16 +351,25 @@ private void sendDataToReadersIfNeeded() {
}

public void shutdown() {
synchronized (commitFutures) {
commitFuturesLock.lock();

try {
isWorking.set(false);
logger.info("[{}] Partition session {} (partition {}) is shutting down. Failing {} commit futures...", path,
id, partitionId, commitFutures.size());
commitFutures.values().forEach(f -> f.completeExceptionally(new RuntimeException("Partition session " + id +
" (partition " + partitionId + ") for " + path + " is closed")));
} finally {
commitFuturesLock.unlock();
}
synchronized (decodingBatches) {

decodingBatchesLock.lock();

try {
decodingBatches.forEach(Batch::complete);
readingQueue.forEach(Batch::complete);
} finally {
decodingBatchesLock.unlock();
}
}

Expand Down
33 changes: 21 additions & 12 deletions topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

import javax.annotation.Nullable;
Expand All @@ -34,6 +36,8 @@ public class SyncReaderImpl extends ReaderImpl implements SyncReader {
private static final Logger logger = LoggerFactory.getLogger(SyncReaderImpl.class);
private static final int POLL_INTERVAL_SECONDS = 5;
private final Queue<MessageBatchWrapper> batchesInQueue = new LinkedList<>();
private final ReentrantLock queueLock = new ReentrantLock();
private final Condition queueIsNotEmptyCondition = queueLock.newCondition();
private int currentMessageIndex = 0;

public SyncReaderImpl(TopicRpc topicRpc, ReaderSettings settings) {
Expand Down Expand Up @@ -66,22 +70,21 @@ public Message receiveInternal(ReceiveSettings receiveSettings, long timeout, Ti
if (isStopped.get()) {
throw new RuntimeException("Reader was stopped");
}
synchronized (batchesInQueue) {

queueLock.lock();

try {
if (batchesInQueue.isEmpty()) {
long millisToWait = TimeUnit.MILLISECONDS.convert(timeout, unit);
Instant deadline = Instant.now().plusMillis(millisToWait);
while (true) {
if (!batchesInQueue.isEmpty()) {
break;
}
Instant now = Instant.now();
if (now.isAfter(deadline)) {
while (batchesInQueue.isEmpty()) {
millisToWait = Duration.between(Instant.now(), deadline).toMillis();
if (millisToWait <= 0) {
break;
}
// Using Math.max to prevent rounding duration to 0 which would lead to infinite wait
millisToWait = Math.max(1, Duration.between(now, deadline).toMillis());

logger.trace("No messages in queue. Waiting for {} ms...", millisToWait);
batchesInQueue.wait(millisToWait);
queueIsNotEmptyCondition.await(millisToWait, TimeUnit.MILLISECONDS);
}

if (batchesInQueue.isEmpty()) {
Expand Down Expand Up @@ -112,6 +115,8 @@ public Message receiveInternal(ReceiveSettings receiveSettings, long timeout, Ti
}
}
return result;
} finally {
queueLock.unlock();
}
}

Expand Down Expand Up @@ -143,10 +148,14 @@ protected CompletableFuture<Void> handleDataReceivedEvent(DataReceivedEvent even
return resultFuture;
}

synchronized (batchesInQueue) {
queueLock.lock();

try {
logger.debug("Putting a message batch into queue and notifying in case receive method is waiting");
batchesInQueue.add(new MessageBatchWrapper(event.getMessages(), resultFuture));
batchesInQueue.notify();
queueIsNotEmptyCondition.signal();
} finally {
queueLock.unlock();
}
return resultFuture;
}
Expand Down
Loading

0 comments on commit 855fab7

Please sign in to comment.