Skip to content

Commit

Permalink
Remove some uses of synchronized (#3024)
Browse files Browse the repository at this point in the history
Motivation:

With java-21 and v-threads synchronization can cause deadlocks
because carrier threads cannot release a v-thread that is in a
synchronized block.

Modifications:

Identify the cases where we're using synchronization and move
them to the equivalent v-thread safe ReentrantLock.

Result:

Better v-thread safety.
  • Loading branch information
bryce-anderson authored Jul 30, 2024
1 parent 6b3a09f commit 1c2afec
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -67,7 +68,7 @@ final class GradientCapacityLimiter implements CapacityLimiter {

private static final Logger LOGGER = LoggerFactory.getLogger(GradientCapacityLimiter.class);

private final Object lock = new Object();
private final ReentrantLock lock = new ReentrantLock();

private final String name;
private final int min;
Expand Down Expand Up @@ -128,12 +129,15 @@ public Ticket tryAcquire(final Classification classification, @Nullable final Co
int newLimit;

Ticket ticket = null;
synchronized (lock) {
lock.lock();
try {
newLimit = (int) limit;
if (pending < limit) {
newPending = ++pending;
ticket = new DefaultTicket(this, newLimit - newPending, newPending);
}
} finally {
lock.unlock();
}

if (ticket != null) {
Expand All @@ -143,9 +147,10 @@ public Ticket tryAcquire(final Classification classification, @Nullable final Co
}

/**
* Needs to be called within a synchronized block.
* Needs to be called while holding the lock.
*/
private int updateLimit(final long timestampNs, final double shortLatencyMillis, final double longLatencyMillis) {
assert lock.isHeldByCurrentThread();
if (isNaN(longLatencyMillis) || isNaN(shortLatencyMillis) || shortLatencyMillis == 0) {
return -1;
}
Expand Down Expand Up @@ -175,7 +180,8 @@ private int onSuccess(final long durationNs) {
final long rttMillis = NANOSECONDS.toMillis(durationNs);
int newPending;
int limit;
synchronized (lock) {
lock.lock();
try {
limit = (int) this.limit;
final double longLatencyMillis = longLatency.observe(nowNs, rttMillis);
final double shortLatencyMillis = shortLatency.observe(nowNs, rttMillis);
Expand All @@ -184,6 +190,8 @@ private int onSuccess(final long durationNs) {
if ((nowNs - lastSamplingNs) >= limitUpdateIntervalNs) {
limit = updateLimit(nowNs, shortLatencyMillis, longLatencyMillis);
}
} finally {
lock.unlock();
}

observer.onActiveRequestsDecr();
Expand All @@ -194,9 +202,12 @@ private int onDrop() {
int newPending;
double newLimit;

synchronized (lock) {
lock.lock();
try {
newLimit = limit = max(min, limit * (limit >= max ? backoffRatioOnLimit : backoffRatioOnLoss));
newPending = --pending;
} finally {
lock.unlock();
}

observer.onActiveRequestsDecr();
Expand All @@ -207,17 +218,21 @@ private int onIgnore() {
int newPending;
double newLimit;

synchronized (lock) {
lock.lock();
try {
newLimit = limit;
newPending = --pending;
} finally {
lock.unlock();
}
observer.onActiveRequestsDecr();
return (int) (newLimit - newPending);
}

@Override
public String toString() {
synchronized (lock) {
lock.lock();
try {
return "GradientCapacityLimiter{" +
", name='" + name + '\'' +
", min=" + min +
Expand All @@ -232,6 +247,8 @@ public String toString() {
", limit=" + limit +
", lastSamplingNs=" + lastSamplingNs +
'}';
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,17 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.ToIntFunction;
import javax.annotation.Nullable;

final class CacheConnectionFactory<ResolvedAddress, C extends ListenableAsyncCloseable>
extends DelegatingConnectionFactory<ResolvedAddress, C> {
private static final Logger LOGGER = LoggerFactory.getLogger(CacheConnectionFactory.class);

private final Lock lock = new ReentrantLock();
// access to `map` must be protected by `lock`.
private final Map<ResolvedAddress, Item<C>> map = new HashMap<>();
private final ToIntFunction<ResolvedAddress> maxConcurrencyFunc;

Expand Down Expand Up @@ -63,7 +68,8 @@ public Single<C> newConnection(final ResolvedAddress resolvedAddress, @Nullable
}

Single<C> result;
synchronized (map) {
lock.lock();
try {
final Item<C> item1 = map.get(resolvedAddress);
if (item1 == null || (result = item1.addSubscriber(maxConcurrency)) == null) {
final Item<C> item2 = new Item<>();
Expand Down Expand Up @@ -110,8 +116,11 @@ public void onError(final Throwable t) {
}

private void lockRemoveFromMap() {
synchronized (map) {
lock.lock();
try {
map.remove(resolvedAddress, item2);
} finally {
lock.unlock();
}
}
})
Expand All @@ -123,8 +132,11 @@ public void onSubscribe(final Cancellable cancellable) {
// Acquire the lock before cache operator processes cancel, so if it results
// in an upstream cancel we will be holding the lock and able to remove the
// map entry safely.
synchronized (map) {
lock.lock();
try {
cancellable.cancel();
} finally {
lock.unlock();
}
});
}
Expand Down Expand Up @@ -153,12 +165,17 @@ private void lockRemoveFromMap() {
// pending connection attempt is exceeded. When the single completes we don't need
// to cache the connection anymore because the LoadBalancer above will cache the
// connection. This also help keep memory down from the map.
synchronized (map) {
lock.lock();
try {
map.remove(resolvedAddress, item2);
} finally {
lock.unlock();
}
}
});
}
} finally {
lock.unlock();
}

return result.shareContextOnSubscribe();
Expand Down

0 comments on commit 1c2afec

Please sign in to comment.