diff --git a/server/src/main/java/com/github/dtprj/dongting/raft/impl/ApplyManager.java b/server/src/main/java/com/github/dtprj/dongting/raft/impl/ApplyManager.java index ac84e28fa..ef43e29af 100644 --- a/server/src/main/java/com/github/dtprj/dongting/raft/impl/ApplyManager.java +++ b/server/src/main/java/com/github/dtprj/dongting/raft/impl/ApplyManager.java @@ -122,21 +122,15 @@ public void execRead(long index, RaftTask rt) { } } - private void execNormalWrite(long index, RaftTask rt) { + private Object execNormalWrite(long index, RaftTask rt) { try { RaftInput input = rt.getInput(); long t = perfCallback.takeTime(PerfConsts.RAFT_D_STATE_MACHINE_EXEC); Object r = stateMachine.exec(index, rt.getItem().getTerm(), input); execCount++; perfCallback.fireTime(PerfConsts.RAFT_D_STATE_MACHINE_EXEC, t); - CompletableFuture future = rt.getFuture(); - if (future != null) { - future.complete(new RaftOutput(index, r)); - } + return r; } catch (Throwable ex) { - if (rt.getFuture() != null) { - rt.getFuture().completeExceptionally(ex); - } throw Fiber.fatal(new RaftException("exec write failed.", ex)); } } @@ -158,35 +152,42 @@ public void close() { } private FrameCallResult exec(RaftTask rt, long index, FrameCall resumePoint) { - switch (rt.getType()) { - case LogItem.TYPE_PREPARE_CONFIG_CHANGE: - case LogItem.TYPE_DROP_CONFIG_CHANGE: - case LogItem.TYPE_COMMIT_CONFIG_CHANGE: - return Fiber.call(new ConfigChangeFrame(rt), unused -> afterExec(true, index, rt, resumePoint)); - case LogItem.TYPE_NORMAL: - execNormalWrite(index, rt); - // not break here - case LogItem.TYPE_HEARTBEAT: - default: - return this.afterExec(false, index, rt, resumePoint); - } + return switch (rt.getType()) { + case LogItem.TYPE_PREPARE_CONFIG_CHANGE, + LogItem.TYPE_DROP_CONFIG_CHANGE, + LogItem.TYPE_COMMIT_CONFIG_CHANGE -> + Fiber.call(new ConfigChangeFrame(rt), unused -> afterExec( + true, index, rt, null, resumePoint)); + case LogItem.TYPE_NORMAL -> { + Object r = execNormalWrite(index, rt); + yield this.afterExec(false, index, rt, r, resumePoint); + } + // heart beat, no need to exec + default -> this.afterExec(false, index, rt, null, resumePoint); + }; } - private FrameCallResult afterExec(boolean configChange, long index, RaftTask rt, FrameCall resumePoint) { + private FrameCallResult afterExec(boolean configChange, long index, RaftTask rt, Object execResult, + FrameCall resumePoint) { RaftStatusImpl raftStatus = ApplyManager.this.raftStatus; raftStatus.setLastApplied(index); raftStatus.setLastAppliedTerm(rt.getItem().getTerm()); + if (configChange) { + raftStatus.setLastConfigChangeIndex(index); + } + + // copy share status should happen before group ready future and raft task future complete if (raftStatus.getGroupReadyFuture() != null && index >= raftStatus.getGroupReadyIndex()) { - raftStatus.getGroupReadyFuture().complete(null); + CompletableFuture f = raftStatus.getGroupReadyFuture(); raftStatus.setGroupReadyFuture(null); + raftStatus.copyShareStatus(); + f.complete(null); log.info("{} mark group ready future complete: groupId={}, groupReadyIndex={}", raftStatus.getRole(), raftStatus.getGroupId(), raftStatus.getGroupReadyIndex()); - } - - if (configChange) { - postConfigChange(index, rt); + } else { + raftStatus.copyShareStatus(); } if (!initFutureComplete && index >= initCommitIndex) { @@ -194,7 +195,9 @@ private FrameCallResult afterExec(boolean configChange, long index, RaftTask rt, initFutureComplete = true; raftStatus.getInitFuture().complete(null); } - raftStatus.copyShareStatus(); + if (rt.getFuture() != null) { + rt.getFuture().complete(new RaftOutput(index, execResult)); + } execReaders(index, rt); @@ -205,13 +208,6 @@ private FrameCallResult afterExec(boolean configChange, long index, RaftTask rt, return Fiber.resume(null, resumePoint); } - private void postConfigChange(long index, RaftTask rt) { - raftStatus.setLastConfigChangeIndex(index); - if (rt.getFuture() != null) { - rt.getFuture().complete(new RaftOutput(index, null)); - } - } - private class ApplyFrame extends FiberFrame { private RaftLog.LogIterator logIterator; @@ -365,16 +361,12 @@ public FrameCallResult execute(Void input) { } private FrameCallResult afterPersist(Void unused) { - switch (rt.getType()) { - case LogItem.TYPE_PREPARE_CONFIG_CHANGE: - return doPrepare(rt); - case LogItem.TYPE_DROP_CONFIG_CHANGE: - return gc.getMemberManager().doAbort(); - case LogItem.TYPE_COMMIT_CONFIG_CHANGE: - return gc.getMemberManager().doCommit(); - default: - throw Fiber.fatal(new RaftException("unknown config change type")); - } + return switch (rt.getType()) { + case LogItem.TYPE_PREPARE_CONFIG_CHANGE -> doPrepare(rt); + case LogItem.TYPE_DROP_CONFIG_CHANGE -> gc.getMemberManager().doAbort(); + case LogItem.TYPE_COMMIT_CONFIG_CHANGE -> gc.getMemberManager().doCommit(); + default -> throw Fiber.fatal(new RaftException("unknown config change type")); + }; } private FrameCallResult doPrepare(RaftTask rt) {