Skip to content

Commit

Permalink
perf: yield ApplyManager
Browse files Browse the repository at this point in the history
  • Loading branch information
areyouok committed Jun 14, 2024
1 parent 33c5a3f commit 649ce7d
Showing 1 changed file with 18 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class ApplyManager {
private long initCommitIndex;
private boolean initFutureComplete = false;

private int execCount = 0;

private final PerfCallback perfCallback;

public ApplyManager(GroupComponents gc) {
Expand Down Expand Up @@ -108,6 +110,7 @@ public void execRead(long index, RaftTask rt) {
}
long t = perfCallback.takeTime(PerfConsts.RAFT_D_STATE_MACHINE_EXEC);
Object r = stateMachine.exec(index, rt.getItem().getTerm(), input);
execCount++;
perfCallback.fireTime(PerfConsts.RAFT_D_STATE_MACHINE_EXEC, t);
future.complete(new RaftOutput(index, r));
} catch (Throwable e) {
Expand All @@ -124,6 +127,7 @@ private void execNormalWrite(long index, RaftTask rt) {
RaftInput input = rt.getInput();
long t = perfCallback.takeTime(PerfConsts.RAFT_D_STATE_MACHINE_EXEC);
Object r = stateMachine.exec(index, rt.getItem().getTerm(), input);
execCount++;
perfCallback.fireTime(PerfConsts.RAFT_D_STATE_MACHINE_EXEC, t);
CompletableFuture<RaftOutput> future = rt.getFuture();
if (future != null) {
Expand Down Expand Up @@ -245,11 +249,19 @@ public FrameCallResult execute(Void input) {
if (raftStatus.isInstallSnapshot()) {
return Fiber.sleepUntilShouldStop(10, this);
}
execCount = 1;
return execLoop(null);
}

private FrameCallResult execLoop(Void v) {
if (execCount >= 100) {
return Fiber.yield(this);
}
RaftStatusImpl raftStatus = ApplyManager.this.raftStatus;
long diff = raftStatus.getCommitIndex() - raftStatus.getLastApplied();
if (diff == 0) {
return condition.await(this);
}

long index = raftStatus.getLastApplied() + 1;
RaftTask rt = tailCache.get(index);
if (rt == null || rt.getInput().isReadOnly()) {
Expand All @@ -266,13 +278,13 @@ public FrameCallResult execute(Void input) {
return Fiber.call(ff, this::afterLoad);
} else {
closeIterator();
return exec(rt, index, this);
return exec(rt, index, this::execLoop);
}
}

private FrameCallResult afterLoad(List<LogItem> items) {
ExecLoadResultFrame ff = new ExecLoadResultFrame(items);
return Fiber.call(ff, this);
return Fiber.call(ff, this::execLoop);
}

public void closeIterator() {
Expand Down Expand Up @@ -300,6 +312,9 @@ protected FrameCallResult doFinally() {

@Override
public FrameCallResult execute(Void input) {
if (isGroupShouldStopPlain()) {
return Fiber.frameReturn();
}
if (raftStatus.isInstallSnapshot()) {
log.warn("install snapshot, ignore load result");
return Fiber.frameReturn();
Expand Down

0 comments on commit 649ce7d

Please sign in to comment.