Skip to content

Commit

Permalink
capacity-limiter-api: make AimdCapacityLimiter use ReentrantLock (#3025)
Browse files Browse the repository at this point in the history
Motivation:

Right now AimdCapacityLimiter uses a custom spin-lock whereas
most everywhere else we use a ReentrantLock if locking is necessary.

Modifications:

Convert it to a ReentrantLock.

Result:

More uniformity.
  • Loading branch information
bryce-anderson authored Jul 31, 2024
1 parent 21c0a74 commit 4e7d64e
Showing 1 changed file with 42 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongSupplier;
import javax.annotation.Nullable;

Expand All @@ -59,12 +59,7 @@ final class AimdCapacityLimiter implements CapacityLimiter {

private static final Logger LOGGER = LoggerFactory.getLogger(AimdCapacityLimiter.class);

private static final AtomicIntegerFieldUpdater<AimdCapacityLimiter> stateUpdater =
AtomicIntegerFieldUpdater.newUpdater(AimdCapacityLimiter.class, "state");

private static final int UNLOCKED = 0;
private static final int LOCKED = 1;

private final ReentrantLock lock = new ReentrantLock();
private final String name;
private final int min;
private final int max;
Expand All @@ -78,7 +73,6 @@ final class AimdCapacityLimiter implements CapacityLimiter {
private int pending;
private double limit;
private long lastIncreaseTimestampNs;
private volatile int state;
AimdCapacityLimiter(final String name, final int min, final int max, final int initial, final float increment,
final float backoffRatioOnLimit, final float backoffRatioOnLoss,
final Duration cooldown, @Nullable final StateObserver observer,
Expand All @@ -89,7 +83,6 @@ final class AimdCapacityLimiter implements CapacityLimiter {
this.increment = increment;
this.limit = initial;
this.pending = 0;
this.state = UNLOCKED;
this.backoffRatioOnLimit = backoffRatioOnLimit;
this.backoffRatioOnLoss = backoffRatioOnLoss;
this.coolDownPeriodNs = cooldown.toNanos();
Expand All @@ -108,19 +101,18 @@ public Ticket tryAcquire(final Classification classification, @Nullable final Co
Ticket ticket;
double l;
int p;
for (;;) {
if (stateUpdater.compareAndSet(this, UNLOCKED, LOCKED)) {
if (pending >= limit || pending == max) { // prevent pending going above max if limit is fractional
ticket = null;
} else {
ticket = new DefaultTicket(this, (int) limit - pending, pending);
pending++;
}
l = limit;
p = pending;
stateUpdater.set(this, UNLOCKED);
break;
lock.lock();
try {
if (pending >= limit || pending == max) { // prevent pending going above max if limit is fractional
ticket = null;
} else {
ticket = new DefaultTicket(this, (int) limit - pending, pending);
pending++;
}
l = limit;
p = pending;
} finally {
lock.unlock();
}
notifyObserver(l, p);
return ticket;
Expand All @@ -135,54 +127,51 @@ private void notifyObserver(double limit, int pending) {
private void onSuccess() {
double l;
int p;
for (;;) {
if (stateUpdater.compareAndSet(this, UNLOCKED, LOCKED)) {
if (coolDownPeriodNs == 0 || (timeSource.getAsLong() - lastIncreaseTimestampNs) >= coolDownPeriodNs) {
limit += increment;
if (limit > max || limit < 0) { // prevent limit going above max or overflow
limit = max;
}
if (coolDownPeriodNs != 0) {
lastIncreaseTimestampNs = timeSource.getAsLong();
}
lock.lock();
try {
if (coolDownPeriodNs == 0 || (timeSource.getAsLong() - lastIncreaseTimestampNs) >= coolDownPeriodNs) {
limit += increment;
if (limit > max || limit < 0) { // prevent limit going above max or overflow
limit = max;
}
if (coolDownPeriodNs != 0) {
lastIncreaseTimestampNs = timeSource.getAsLong();
}
pending--;
l = limit;
p = pending;
stateUpdater.set(this, UNLOCKED);
break;
}
pending--;
l = limit;
p = pending;
} finally {
lock.unlock();
}
notifyObserver(l, p);
}

private void onLoss() {
double l;
int p;
for (;;) {
if (stateUpdater.compareAndSet(this, UNLOCKED, LOCKED)) {
limit = max(min, (int) (limit * (limit >= max ? backoffRatioOnLimit : backoffRatioOnLoss)));
pending--;
l = limit;
p = pending;
stateUpdater.set(this, UNLOCKED);
break;
}
lock.lock();
try {
limit = max(min, (int) (limit * (limit >= max ? backoffRatioOnLimit : backoffRatioOnLoss)));
pending--;
l = limit;
p = pending;
} finally {
lock.unlock();
}
notifyObserver(l, p);
}

private void onIgnore() {
double l;
int p;
for (;;) {
if (stateUpdater.compareAndSet(this, UNLOCKED, LOCKED)) {
pending--;
l = limit;
p = pending;
stateUpdater.set(this, UNLOCKED);
break;
}
lock.lock();
try {
pending--;
l = limit;
p = pending;
} finally {
lock.unlock();
}
notifyObserver(l, p);
}
Expand All @@ -200,7 +189,6 @@ public String toString() {
", pending=" + pending +
", limit=" + limit +
", lastIncreaseTimestampNs=" + lastIncreaseTimestampNs +
", state=" + state +
'}';
}

Expand Down

0 comments on commit 4e7d64e

Please sign in to comment.