Skip to content

Commit

Permalink
refactor: move wait write finish to DefaultRaftLog.beginInstall()
Browse files Browse the repository at this point in the history
  • Loading branch information
areyouok committed Dec 13, 2024
1 parent 35d9161 commit 1b0ebd1
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ private FrameCallResult afterTruncate(long matchIndex, int matchTerm) {
class InstallFiberFrame extends AbstractAppendFrame<InstallSnapshotReq> {
private static final DtLog log = DtLogs.getLogger(InstallFiberFrame.class);
private final int groupId = gc.getRaftStatus().getGroupId();
private boolean markInstall = false;

public InstallFiberFrame(ReqInfoEx<InstallSnapshotReq> reqInfo, AppendProcessor processor) {
super("install snapshot", processor, reqInfo);
Expand Down Expand Up @@ -485,24 +486,26 @@ protected FrameCallResult process() {
}

private FrameCallResult startInstall(RaftStatusImpl raftStatus) {
if (RaftUtil.writeNotFinished(raftStatus)) {
log.info("wait write finish before install snapshot, groupId={}", groupId);
return RaftUtil.waitWriteFinish(raftStatus, this);
if (!markInstall) {
log.info("start install snapshot, groupId={}", groupId);
raftStatus.setInstallSnapshot(true);
gc.getApplyManager().wakeupApply(); // wakeup apply fiber to exit
gc.getStatusManager().persistAsync(true);
markInstall = true;
}
log.info("start install snapshot, groupId={}", groupId);
raftStatus.setInstallSnapshot(true);
gc.getApplyManager().wakeupApply(); // wakeup apply fiber to exit
gc.getStatusManager().persistAsync(true);
Fiber applyFiber = gc.getApplyManager().getApplyFiber();
if (!applyFiber.isFinished()) {
return applyFiber.join(this::afterApplyExit);
}
return afterApplyExit(null);
}

private FrameCallResult afterApplyExit(Void v) {
return gc.getStatusManager().waitUpdateFinish(this::afterStatusPersist);
}

private FrameCallResult afterStatusPersist(Void v) throws Exception {
Fiber applyFiber = gc.getApplyManager().getApplyFiber();
if (applyFiber.isFinished()) {
return Fiber.call(gc.getRaftLog().beginInstall(), this::applyConfigChange);
} else {
return applyFiber.join(this::afterStatusPersist);
}
return Fiber.call(gc.getRaftLog().beginInstall(), this::applyConfigChange);
}

private FrameCallResult applyConfigChange(Void unused) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,16 @@ public void markTruncateByTimestamp(long timestampBound, long delayMillis) {
@Override
public FiberFrame<Void> beginInstall() {
return new FiberFrame<>() {

@Override
public FrameCallResult execute(Void unused) {
if (idxFiles.chainWriter.hasTask()) {
return idxFiles.flushDoneCondition.await(100, this);
log.info("idx files wait for flush done");
return idxFiles.flushDoneCondition.await(1000, this);
}
if (logFiles.logAppender.chainWriter.hasTask()) {
log.info("log files wait for flush done");
return raftStatus.getLogForceFinishCondition().await(1000, this);
}
log.info("log files begin install snapshot");
return Fiber.call(logFiles.beginInstall(), this::afterLogBeginInstall);
Expand Down

0 comments on commit 1b0ebd1

Please sign in to comment.