Skip to content

Commit

Permalink
fix: lease read linearizability
Browse files Browse the repository at this point in the history
  • Loading branch information
areyouok committed Jun 18, 2024
1 parent a910404 commit 6388064
Showing 1 changed file with 36 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,21 +122,15 @@ public void execRead(long index, RaftTask rt) {
}
}

private void execNormalWrite(long index, RaftTask rt) {
private Object execNormalWrite(long index, RaftTask rt) {
try {
RaftInput input = rt.getInput();
long t = perfCallback.takeTime(PerfConsts.RAFT_D_STATE_MACHINE_EXEC);
Object r = stateMachine.exec(index, rt.getItem().getTerm(), input);
execCount++;
perfCallback.fireTime(PerfConsts.RAFT_D_STATE_MACHINE_EXEC, t);
CompletableFuture<RaftOutput> future = rt.getFuture();
if (future != null) {
future.complete(new RaftOutput(index, r));
}
return r;
} catch (Throwable ex) {
if (rt.getFuture() != null) {
rt.getFuture().completeExceptionally(ex);
}
throw Fiber.fatal(new RaftException("exec write failed.", ex));
}
}
Expand All @@ -158,43 +152,52 @@ public void close() {
}

private FrameCallResult exec(RaftTask rt, long index, FrameCall<Void> resumePoint) {
switch (rt.getType()) {
case LogItem.TYPE_PREPARE_CONFIG_CHANGE:
case LogItem.TYPE_DROP_CONFIG_CHANGE:
case LogItem.TYPE_COMMIT_CONFIG_CHANGE:
return Fiber.call(new ConfigChangeFrame(rt), unused -> afterExec(true, index, rt, resumePoint));
case LogItem.TYPE_NORMAL:
execNormalWrite(index, rt);
// not break here
case LogItem.TYPE_HEARTBEAT:
default:
return this.afterExec(false, index, rt, resumePoint);
}
return switch (rt.getType()) {
case LogItem.TYPE_PREPARE_CONFIG_CHANGE,
LogItem.TYPE_DROP_CONFIG_CHANGE,
LogItem.TYPE_COMMIT_CONFIG_CHANGE ->
Fiber.call(new ConfigChangeFrame(rt), unused -> afterExec(
true, index, rt, null, resumePoint));
case LogItem.TYPE_NORMAL -> {
Object r = execNormalWrite(index, rt);
yield this.afterExec(false, index, rt, r, resumePoint);
}
// heart beat, no need to exec
default -> this.afterExec(false, index, rt, null, resumePoint);
};
}

private FrameCallResult afterExec(boolean configChange, long index, RaftTask rt, FrameCall<Void> resumePoint) {
private FrameCallResult afterExec(boolean configChange, long index, RaftTask rt, Object execResult,
FrameCall<Void> resumePoint) {
RaftStatusImpl raftStatus = ApplyManager.this.raftStatus;

raftStatus.setLastApplied(index);
raftStatus.setLastAppliedTerm(rt.getItem().getTerm());

if (configChange) {
raftStatus.setLastConfigChangeIndex(index);
}

// copy share status should happen before group ready future and raft task future complete
if (raftStatus.getGroupReadyFuture() != null && index >= raftStatus.getGroupReadyIndex()) {
raftStatus.getGroupReadyFuture().complete(null);
CompletableFuture<Void> f = raftStatus.getGroupReadyFuture();
raftStatus.setGroupReadyFuture(null);
raftStatus.copyShareStatus();
f.complete(null);
log.info("{} mark group ready future complete: groupId={}, groupReadyIndex={}",
raftStatus.getRole(), raftStatus.getGroupId(), raftStatus.getGroupReadyIndex());
}

if (configChange) {
postConfigChange(index, rt);
} else {
raftStatus.copyShareStatus();
}

if (!initFutureComplete && index >= initCommitIndex) {
log.info("apply manager init complete, initCommitIndex={}", initCommitIndex);
initFutureComplete = true;
raftStatus.getInitFuture().complete(null);
}
raftStatus.copyShareStatus();
if (rt.getFuture() != null) {
rt.getFuture().complete(new RaftOutput(index, execResult));
}

execReaders(index, rt);

Expand All @@ -205,13 +208,6 @@ private FrameCallResult afterExec(boolean configChange, long index, RaftTask rt,
return Fiber.resume(null, resumePoint);
}

private void postConfigChange(long index, RaftTask rt) {
raftStatus.setLastConfigChangeIndex(index);
if (rt.getFuture() != null) {
rt.getFuture().complete(new RaftOutput(index, null));
}
}

private class ApplyFrame extends FiberFrame<Void> {

private RaftLog.LogIterator logIterator;
Expand Down Expand Up @@ -365,16 +361,12 @@ public FrameCallResult execute(Void input) {
}

private FrameCallResult afterPersist(Void unused) {
switch (rt.getType()) {
case LogItem.TYPE_PREPARE_CONFIG_CHANGE:
return doPrepare(rt);
case LogItem.TYPE_DROP_CONFIG_CHANGE:
return gc.getMemberManager().doAbort();
case LogItem.TYPE_COMMIT_CONFIG_CHANGE:
return gc.getMemberManager().doCommit();
default:
throw Fiber.fatal(new RaftException("unknown config change type"));
}
return switch (rt.getType()) {
case LogItem.TYPE_PREPARE_CONFIG_CHANGE -> doPrepare(rt);
case LogItem.TYPE_DROP_CONFIG_CHANGE -> gc.getMemberManager().doAbort();
case LogItem.TYPE_COMMIT_CONFIG_CHANGE -> gc.getMemberManager().doCommit();
default -> throw Fiber.fatal(new RaftException("unknown config change type"));
};
}

private FrameCallResult doPrepare(RaftTask rt) {
Expand Down

0 comments on commit 6388064

Please sign in to comment.