diff --git a/server/src/main/java/com/github/dtprj/dongting/raft/rpc/AppendProcessor.java b/server/src/main/java/com/github/dtprj/dongting/raft/rpc/AppendProcessor.java index 4160fc4c..70eb9e76 100644 --- a/server/src/main/java/com/github/dtprj/dongting/raft/rpc/AppendProcessor.java +++ b/server/src/main/java/com/github/dtprj/dongting/raft/rpc/AppendProcessor.java @@ -444,6 +444,7 @@ private FrameCallResult afterTruncate(long matchIndex, int matchTerm) { class InstallFiberFrame extends AbstractAppendFrame { private static final DtLog log = DtLogs.getLogger(InstallFiberFrame.class); private final int groupId = gc.getRaftStatus().getGroupId(); + private boolean markInstall = false; public InstallFiberFrame(ReqInfoEx reqInfo, AppendProcessor processor) { super("install snapshot", processor, reqInfo); @@ -485,24 +486,26 @@ protected FrameCallResult process() { } private FrameCallResult startInstall(RaftStatusImpl raftStatus) { - if (RaftUtil.writeNotFinished(raftStatus)) { - log.info("wait write finish before install snapshot, groupId={}", groupId); - return RaftUtil.waitWriteFinish(raftStatus, this); + if (!markInstall) { + log.info("start install snapshot, groupId={}", groupId); + raftStatus.setInstallSnapshot(true); + gc.getApplyManager().wakeupApply(); // wakeup apply fiber to exit + gc.getStatusManager().persistAsync(true); + markInstall = true; } - log.info("start install snapshot, groupId={}", groupId); - raftStatus.setInstallSnapshot(true); - gc.getApplyManager().wakeupApply(); // wakeup apply fiber to exit - gc.getStatusManager().persistAsync(true); + Fiber applyFiber = gc.getApplyManager().getApplyFiber(); + if (!applyFiber.isFinished()) { + return applyFiber.join(this::afterApplyExit); + } + return afterApplyExit(null); + } + + private FrameCallResult afterApplyExit(Void v) { return gc.getStatusManager().waitUpdateFinish(this::afterStatusPersist); } private FrameCallResult afterStatusPersist(Void v) throws Exception { - Fiber applyFiber = gc.getApplyManager().getApplyFiber(); - if (applyFiber.isFinished()) { - return Fiber.call(gc.getRaftLog().beginInstall(), this::applyConfigChange); - } else { - return applyFiber.join(this::afterStatusPersist); - } + return Fiber.call(gc.getRaftLog().beginInstall(), this::applyConfigChange); } private FrameCallResult applyConfigChange(Void unused) { diff --git a/server/src/main/java/com/github/dtprj/dongting/raft/store/DefaultRaftLog.java b/server/src/main/java/com/github/dtprj/dongting/raft/store/DefaultRaftLog.java index c244eb5d..65fdbce6 100644 --- a/server/src/main/java/com/github/dtprj/dongting/raft/store/DefaultRaftLog.java +++ b/server/src/main/java/com/github/dtprj/dongting/raft/store/DefaultRaftLog.java @@ -186,10 +186,16 @@ public void markTruncateByTimestamp(long timestampBound, long delayMillis) { @Override public FiberFrame beginInstall() { return new FiberFrame<>() { + @Override public FrameCallResult execute(Void unused) { if (idxFiles.chainWriter.hasTask()) { - return idxFiles.flushDoneCondition.await(100, this); + log.info("idx files wait for flush done"); + return idxFiles.flushDoneCondition.await(1000, this); + } + if (logFiles.logAppender.chainWriter.hasTask()) { + log.info("log files wait for flush done"); + return raftStatus.getLogForceFinishCondition().await(1000, this); } log.info("log files begin install snapshot"); return Fiber.call(logFiles.beginInstall(), this::afterLogBeginInstall);