Skip to content

Commit

Permalink
feat: support StateMachine async exec
Browse files Browse the repository at this point in the history
  • Loading branch information
areyouok committed Jun 20, 2024
1 parent ed94d18 commit 9d5b814
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ public Decoder<? extends Encodable> createBodyDecoder(int bizType) {
}

@Override
public Object exec(long index, int term, RaftInput input) {
public FiberFuture<Object> exec(long index, int term, RaftInput input) {
KvStatus kvStatus = this.kvStatus;
ensureRunning(kvStatus);
StrEncoder key = (StrEncoder) input.getHeader();
ByteArrayEncoder data = (ByteArrayEncoder) input.getBody();
return switch (input.getBizType()) {
Object r = switch (input.getBizType()) {
case BIZ_TYPE_GET -> kvStatus.kvImpl.get(index, key.getStr());
case BIZ_TYPE_PUT -> {
kvStatus.kvImpl.put(index, key.getStr(), data.getData(), maxOpenSnapshotIndex);
Expand All @@ -83,6 +83,7 @@ public Object exec(long index, int term, RaftInput input) {
case BIZ_TYPE_REMOVE -> kvStatus.kvImpl.remove(index, key.getStr(), maxOpenSnapshotIndex);
default -> throw new IllegalArgumentException("unknown bizType " + input.getBizType());
};
return FiberFuture.completedFuture(FiberGroup.currentGroup(), r);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.github.dtprj.dongting.fiber.Fiber;
import com.github.dtprj.dongting.fiber.FiberCondition;
import com.github.dtprj.dongting.fiber.FiberFrame;
import com.github.dtprj.dongting.fiber.FiberFuture;
import com.github.dtprj.dongting.fiber.FiberGroup;
import com.github.dtprj.dongting.fiber.FrameCall;
import com.github.dtprj.dongting.fiber.FrameCallResult;
Expand All @@ -38,6 +39,7 @@
import com.github.dtprj.dongting.raft.store.StatusManager;

import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -57,7 +59,10 @@ public class ApplyManager {
private RaftLog raftLog;
private StateMachine stateMachine;

private FiberCondition condition;
private FiberCondition needApplyCond;
private boolean waitApply;
private FiberCondition applyFinishCond;
private final LinkedList<RaftTask> heartBeatQueue = new LinkedList<>();

private long initCommitIndex;
private boolean initFutureComplete = false;
Expand All @@ -79,7 +84,8 @@ public void postInit() {
}

public void init(FiberGroup fiberGroup) {
this.condition = fiberGroup.newCondition("needApply");
this.needApplyCond = fiberGroup.newCondition("needApply");
this.applyFinishCond = fiberGroup.newCondition("applyFinish");
this.initCommitIndex = raftStatus.getCommitIndex();
startApplyFiber(fiberGroup);
if (raftStatus.getLastApplied() >= raftStatus.getCommitIndex()) {
Expand All @@ -98,55 +104,72 @@ private void startApplyFiber(FiberGroup fiberGroup) {
f.start();
}

private FrameCallResult execNormalTask(long index, RaftTask rt, FrameCall<Void> resumePoint) {
RaftInput input = rt.getInput();
try {
if (input.isReadOnly() && (rt.getFuture() == null || rt.getFuture().isDone())) {
return afterExec(index, rt, null, null, resumePoint);
}
long t = perfCallback.takeTime(PerfConsts.RAFT_D_STATE_MACHINE_EXEC);
Object r = stateMachine.exec(index, rt.getItem().getTerm(), input);
execCount++;
perfCallback.fireTime(PerfConsts.RAFT_D_STATE_MACHINE_EXEC, t);
return afterExec(index, rt, r, null, resumePoint);
} catch (Throwable ex) {
if (input.isReadOnly()) {
return afterExec(index, rt, null, ex, resumePoint);
} else {
throw Fiber.fatal(new RaftException("exec write failed.", ex));
}
}
}

public void apply() {
condition.signal();
needApplyCond.signal();
}

public void shutdown(DtTime timeout) {
condition.signal();
needApplyCond.signal();
try {
stateMachine.stop(timeout);
} catch (Throwable e) {
log.error("state machine stop failed", e);
}
}

@SuppressWarnings("EnhancedSwitchMigration")
private FrameCallResult exec(RaftTask rt, long index, FrameCall<Void> resumePoint) {
return switch (rt.getType()) {
case LogItem.TYPE_PREPARE_CONFIG_CHANGE,
LogItem.TYPE_DROP_CONFIG_CHANGE,
LogItem.TYPE_COMMIT_CONFIG_CHANGE -> Fiber.call(new ConfigChangeFrame(rt), unused -> {
raftStatus.setLastConfigChangeIndex(index);
return afterExec(index, rt, null, null, resumePoint);
});
case LogItem.TYPE_NORMAL -> execNormalTask(index, rt, resumePoint);
// heart beat, no need to exec
default -> this.afterExec(index, rt, null, null, resumePoint);
};
raftStatus.setLastApplying(index);
switch (rt.getType()) {
case LogItem.TYPE_PREPARE_CONFIG_CHANGE:
case LogItem.TYPE_DROP_CONFIG_CHANGE:
case LogItem.TYPE_COMMIT_CONFIG_CHANGE:
// block apply fiber util config change complete
return Fiber.call(new ConfigChangeFrame(rt), unused -> {
raftStatus.setLastConfigChangeIndex(index);
afterExec(index, rt, null, null);
return Fiber.resume(null, resumePoint);
});
case LogItem.TYPE_NORMAL: {
RaftInput input = rt.getInput();
if (input.isReadOnly() && (rt.getFuture() == null || rt.getFuture().isDone())) {
// no need to execute read only task if no one wait for result
afterExec(index, rt, null, null);
} else {
long t = perfCallback.takeTime(PerfConsts.RAFT_D_STATE_MACHINE_EXEC);
FiberFuture<Object> f = null;
Throwable execEx = null;
try {
f = stateMachine.exec(index, rt.getItem().getTerm(), input);
execCount++;
} catch (Throwable e) {
execEx = e;
}
if (execEx != null) {
afterExec(index, rt, null, execEx);
} else if (f != null) {
f.registerCallback((result, ex) -> {
perfCallback.fireTime(PerfConsts.RAFT_D_STATE_MACHINE_EXEC, t);
afterExec(index, rt, result, ex);
});
} else {
throw Fiber.fatal(new RaftException("statemachine exec return null future"));
}
}
return Fiber.resume(null, resumePoint);
}
default:
// heart beat, no need to exec
heartBeatQueue.addLast(rt);
tryApplyHeartBeat(raftStatus.getLastApplied());
return Fiber.resume(null, resumePoint);
}
}

private FrameCallResult afterExec(long index, RaftTask rt, Object execResult,
Throwable execEx, FrameCall<Void> resumePoint) {
private void afterExec(long index, RaftTask rt, Object execResult, Throwable execEx) {
if (execEx != null && !rt.getInput().isReadOnly()) {
throw Fiber.fatal(execEx);
}
RaftStatusImpl raftStatus = ApplyManager.this.raftStatus;

raftStatus.setLastApplied(index);
Expand Down Expand Up @@ -174,13 +197,20 @@ private FrameCallResult afterExec(long index, RaftTask rt, Object execResult,
rt.getFuture().complete(new RaftOutput(index, execResult));
} else if (rt.getInput().isReadOnly()) {
rt.getFuture().completeExceptionally(execEx);
} else {
throw Fiber.fatal(execEx);
}
}
if (waitApply) {
applyFinishCond.signal();
}
tryApplyHeartBeat(index);
}

// resume loop
return Fiber.resume(null, resumePoint);
private void tryApplyHeartBeat(long appliedIndex) {
RaftTask t = heartBeatQueue.peekFirst();
if (t != null && t.getItem().getIndex() == appliedIndex + 1) {
heartBeatQueue.pollFirst();
afterExec(appliedIndex + 1, t, null, null);
}
}

private class ApplyFrame extends FiberFrame<Void> {
Expand Down Expand Up @@ -229,18 +259,18 @@ private FrameCallResult execLoop(Void v) {
return Fiber.yield(this);
}
RaftStatusImpl raftStatus = ApplyManager.this.raftStatus;
long diff = raftStatus.getCommitIndex() - raftStatus.getLastApplied();
long diff = raftStatus.getCommitIndex() - raftStatus.getLastApplying();
if (diff == 0) {
return condition.await(this);
return needApplyCond.await(this);
}
long index = raftStatus.getLastApplied() + 1;
long index = raftStatus.getLastApplying() + 1;
RaftTask rt = tailCache.get(index);
if (rt == null || rt.getInput().isReadOnly()) {
int limit = (int) Math.min(diff, 1024L);
if (log.isDebugEnabled()) {
log.debug("load from {}, diff={}, limit={}, cacheSize={}, cacheFirstIndex={},commitIndex={},applyIndex={}",
log.debug("load from {}, diff={}, limit={}, cacheSize={}, cacheFirstIndex={},commitIndex={},lastApplying={}",
index, diff, limit, tailCache.size(), tailCache.getFirstIndex(),
raftStatus.getCommitIndex(), raftStatus.getLastApplied());
raftStatus.getCommitIndex(), raftStatus.getLastApplying());
}
if (logIterator == null) {
logIterator = raftLog.openIterator(null);
Expand Down Expand Up @@ -322,12 +352,17 @@ private ConfigChangeFrame(RaftTask rt) {

@Override
public FrameCallResult execute(Void input) {
StatusManager statusManager = gc.getStatusManager();
statusManager.persistAsync(true);
return statusManager.waitUpdateFinish(this::afterPersist);
if (raftStatus.getLastApplied() != rt.getItem().getIndex() - 1) {
waitApply = true;
return applyFinishCond.await(this::afterApplyFinish);
}
return afterApplyFinish(null);
}

private FrameCallResult afterPersist(Void unused) {
private FrameCallResult afterApplyFinish(Void unused) {
waitApply = false;
StatusManager statusManager = gc.getStatusManager();
statusManager.persistAsync(true);
return switch (rt.getType()) {
case LogItem.TYPE_PREPARE_CONFIG_CHANGE -> doPrepare(rt);
case LogItem.TYPE_DROP_CONFIG_CHANGE -> gc.getMemberManager().doAbort();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ private FrameCallResult afterRecoverStateMachine(Pair<Integer, Long> snapshotRes
log.info("load snapshot to term={}, index={}, groupId={}", snapshotTerm, snapshotIndex, groupConfig.getGroupId());
raftStatus.setLastApplied(snapshotIndex);
raftStatus.setLastAppliedTerm(snapshotTerm);
raftStatus.setLastApplying(snapshotIndex);
if (snapshotIndex > raftStatus.getCommitIndex()) {
raftStatus.setCommitIndex(snapshotIndex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,13 @@ public class RaftStatusImpl extends RaftStatus {

private int lastAppliedTerm;

// lastApplied <= commitIndex (<= lastForceLogIndex <=) lastWriteLogIndex <= lastLogIndex
// lastApplied <= lastApplying <= commitIndex (<= lastForceLogIndex <=) lastWriteLogIndex <= lastLogIndex
// IdxFiles.nextPersistIndex may less than lastApplied, since it's update asynchronously
private long lastLogIndex;
private int lastLogTerm;
private long lastForceLogIndex;
private long lastWriteLogIndex;
private long lastApplying;

private long leaderCommit;

Expand Down Expand Up @@ -434,4 +435,12 @@ public int getLastAppliedTerm() {
public void setLastAppliedTerm(int lastAppliedTerm) {
this.lastAppliedTerm = lastAppliedTerm;
}

public long getLastApplying() {
return lastApplying;
}

public void setLastApplying(long lastApplying) {
this.lastApplying = lastApplying;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ private FrameCallResult finishInstall(InstallSnapshotReq req, RaftStatusImpl raf
// call raftStatus.copyShareStatus() in doFinally()
raftStatus.setLastApplied(req.lastIncludedIndex);
raftStatus.setLastAppliedTerm(req.lastIncludedTerm);
raftStatus.setLastApplying(req.lastIncludedIndex);

raftStatus.setCommitIndex(req.lastIncludedIndex);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface StateMachine extends LifeCircle, RaftCodecFactory {
/**
* this method is called in raft thread.
*/
Object exec(long index, int term, RaftInput input);
FiberFuture<Object> exec(long index, int term, RaftInput input);

/**
* this method is called in raft thread.
Expand Down

0 comments on commit 9d5b814

Please sign in to comment.