Skip to content

Commit

Permalink
speeds up fate lock acquisition (#5262)
Browse files Browse the repository at this point in the history
Stores the lock data for fate locks in the ZooKeeper node name instead
of in the node's ZooKeeper data.  Ran some local performance tests
with hundreds of fate operations and saw lock times go from 750ms to
15ms.

fixes #5181


Co-authored-by: Christopher Tubbs <[email protected]>
  • Loading branch information
keith-turner and ctubbsii authored Feb 1, 2025
1 parent e37cae1 commit 3213646
Show file tree
Hide file tree
Showing 7 changed files with 284 additions and 117 deletions.
24 changes: 12 additions & 12 deletions core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.stream.Stream;

import org.apache.accumulo.core.fate.FateStore.FateTxStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.ReadOnlyFateTxStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType;
import org.apache.accumulo.core.fate.zookeeper.FateLock;
import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockPath;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
Expand Down Expand Up @@ -275,38 +277,36 @@ private void findLocks(ZooSession zk, final ServiceLockPath lockPath,
List<String> lockedIds = zr.getChildren(lockPath.toString());

for (String id : lockedIds) {

try {

FateLockPath fLockPath = FateLock.path(lockPath + "/" + id);
List<String> lockNodes =
FateLock.validateAndSort(fLockPath, zr.getChildren(fLockPath.toString()));
SortedSet<FateLock.NodeName> lockNodes =
FateLock.validateAndWarn(fLockPath, zr.getChildren(fLockPath.toString()));

int pos = 0;
boolean sawWriteLock = false;

for (String node : lockNodes) {
for (FateLock.NodeName node : lockNodes) {
try {
byte[] data = zr.getData(lockPath + "/" + id + "/" + node);
// Example data: "READ:<FateId>". FateId contains ':' hence the limit of 2
String[] lda = new String(data, UTF_8).split(":", 2);
FateId fateId = FateId.from(lda[1]);
FateLock.FateLockEntry fateLockEntry = node.fateLockEntry.get();
var fateId = fateLockEntry.getFateId();
var lockType = fateLockEntry.getLockType();

if (lda[0].charAt(0) == 'W') {
if (lockType == LockType.WRITE) {
sawWriteLock = true;
}

Map<FateId,List<String>> locks;

if (pos == 0) {
locks = heldLocks;
} else if (lda[0].charAt(0) == 'R' && !sawWriteLock) {
} else if (lockType == LockType.READ && !sawWriteLock) {
locks = heldLocks;
} else {
locks = waitingLocks;
}

locks.computeIfAbsent(fateId, k -> new ArrayList<>()).add(lda[0].charAt(0) + ":" + id);
locks.computeIfAbsent(fateId, k -> new ArrayList<>())
.add(lockType.name().charAt(0) + ":" + id);

} catch (Exception e) {
log.error("{}", e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.function.BiPredicate;
import java.util.function.Supplier;

import org.apache.accumulo.core.fate.FateId;
Expand All @@ -50,9 +51,10 @@ public enum LockType {
// them,
// a writer only runs when they are at the top of the queue.
// Queue of lock entries that the read/write locks in this file use to add, list and remove
// their own entries.
// NOTE(review): this span is rendered from a diff with the +/- markers stripped, so the
// superseded signatures (getEarlierEntries, removeEntry(long)) appear next to their
// replacements - confirm against the committed file which ones are live.
public interface QueueLock {
// Superseded lookup. Callers replace getEarlierEntries(entry) with
// getEntries((seq, lockData) -> seq <= entry), so presumably this returned the entries whose
// sequence number is at most `entry` - TODO confirm.
SortedMap<Long,Supplier<FateLockEntry>> getEarlierEntries(long entry);
// Returns the entries selected by the predicate as a sorted map. Callers name the predicate
// arguments (seq, lockData): the Long key and the supplier of that entry's lock data.
SortedMap<Long,Supplier<FateLockEntry>>
getEntries(BiPredicate<Long,Supplier<FateLockEntry>> predicate);

// Superseded removal that identified the entry by its long value only.
void removeEntry(long entry);
// Removes an entry identified by both its lock data and its sequence number.
void removeEntry(FateLockEntry data, long seq);

// Adds an entry and returns a long that callers keep and later pass back to removeEntry;
// presumably the sequence number assigned to the new entry - TODO confirm.
long addEntry(FateLockEntry entry);
}
Expand Down Expand Up @@ -115,7 +117,9 @@ public boolean tryLock() {
entry = qlock.addEntry(FateLockEntry.from(this.getType(), this.fateId));
log.info("Added lock entry {} fateId {} lockType {}", entry, fateId, getType());
}
SortedMap<Long,Supplier<FateLockEntry>> entries = qlock.getEarlierEntries(entry);

SortedMap<Long,Supplier<FateLockEntry>> entries =
qlock.getEntries((seq, lockData) -> seq <= entry);
for (Entry<Long,Supplier<FateLockEntry>> entry : entries.entrySet()) {
if (entry.getKey().equals(this.entry)) {
return true;
Expand Down Expand Up @@ -150,7 +154,7 @@ public void unlock() {
return;
}
log.debug("Removing lock entry {} fateId {} lockType {}", entry, this.fateId, getType());
qlock.removeEntry(entry);
qlock.removeEntry(FateLockEntry.from(this.getType(), this.fateId), entry);
entry = -1;
}

Expand Down Expand Up @@ -181,7 +185,8 @@ public boolean tryLock() {
entry = qlock.addEntry(FateLockEntry.from(this.getType(), this.fateId));
log.info("Added lock entry {} fateId {} lockType {}", entry, this.fateId, getType());
}
SortedMap<Long,Supplier<FateLockEntry>> entries = qlock.getEarlierEntries(entry);
SortedMap<Long,Supplier<FateLockEntry>> entries =
qlock.getEntries((seq, locData) -> seq <= entry);
Iterator<Entry<Long,Supplier<FateLockEntry>>> iterator = entries.entrySet().iterator();
if (!iterator.hasNext()) {
throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry
Expand All @@ -200,19 +205,26 @@ public DistributedReadWriteLock(QueueLock qlock, FateId fateId) {
}

public static DistributedLock recoverLock(QueueLock qlock, FateId fateId) {
SortedMap<Long,Supplier<FateLockEntry>> entries = qlock.getEarlierEntries(Long.MAX_VALUE);
for (Entry<Long,Supplier<FateLockEntry>> entry : entries.entrySet()) {
FateLockEntry lockEntry = entry.getValue().get();
if (fateId.equals(lockEntry.getFateId())) {
SortedMap<Long,Supplier<FateLockEntry>> entries =
qlock.getEntries((seq, lockData) -> lockData.get().fateId.equals(fateId));

switch (entries.size()) {
case 0:
return null;
case 1:
var entry = entries.entrySet().iterator().next();
FateLockEntry lockEntry = entry.getValue().get();
switch (lockEntry.getLockType()) {
case READ:
return new ReadLock(qlock, lockEntry.getFateId(), entry.getKey());
case WRITE:
return new WriteLock(qlock, lockEntry.getFateId(), entry.getKey());
default:
throw new IllegalStateException("Unknown lock type " + lockEntry.getLockType());
}
}
default:
throw new IllegalStateException("Found more than one lock node " + entries);
}
return null;
}

@Override
Expand Down
Loading

0 comments on commit 3213646

Please sign in to comment.