Skip to content

Commit

Permalink
fix: lease read linearizability in RaftUtil.incrTerm()
Browse files Browse the repository at this point in the history
  • Loading branch information
areyouok committed Jun 18, 2024
1 parent 6388064 commit 5f22757
Showing 1 changed file with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.github.dtprj.dongting.raft.impl;

import com.github.dtprj.dongting.common.Pair;
import com.github.dtprj.dongting.common.RefCount;
import com.github.dtprj.dongting.fiber.Fiber;
import com.github.dtprj.dongting.fiber.FiberGroup;
Expand All @@ -34,6 +35,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -123,21 +125,22 @@ public static void incrTerm(int remoteTerm, RaftStatusImpl raftStatus, int newLe
if (newLeaderId > 0) {
updateLeader(raftStatus, newLeaderId);
}
LinkedList<Pair<CompletableFuture<?>, NotLeaderException>> failList = new LinkedList<>();
if (oldRole != RaftRole.observer) {
log.info("update term from {} to {}, change to follower, oldRole={}",
raftStatus.getCurrentTerm(), remoteTerm, raftStatus.getRole());
TailCache oldPending = raftStatus.getTailCache();
raftStatus.setRole(RaftRole.follower);
if (oldRole == RaftRole.leader) {
TailCache oldPending = raftStatus.getTailCache();
NotLeaderException e = new NotLeaderException(raftStatus.getCurrentLeaderNode());
oldPending.forEach((idx, task) -> {
RaftNode leaderNode = raftStatus.getCurrentLeaderNode();
if (task.getFuture() != null) {
task.getFuture().completeExceptionally(new NotLeaderException(leaderNode));
failList.add(new Pair<>(task.getFuture(), e));
}
RaftTask reader;
while ((reader = task.getNextReader()) != null) {
if (reader.getFuture() != null) {
reader.getFuture().completeExceptionally(new NotLeaderException(leaderNode));
failList.add(new Pair<>(reader.getFuture(), e));
}
}
});
Expand All @@ -149,6 +152,10 @@ public static void incrTerm(int remoteTerm, RaftStatusImpl raftStatus, int newLe
raftStatus.setCurrentTerm(remoteTerm);
raftStatus.setVotedFor(0);
raftStatus.copyShareStatus();
// copy share status should happen before futures complete
for(Pair<CompletableFuture<?>, NotLeaderException> pair : failList) {
pair.getLeft().completeExceptionally(pair.getRight());
}
}

public static void resetStatus(RaftStatusImpl raftStatus) {
Expand Down

0 comments on commit 5f22757

Please sign in to comment.