diff --git a/server/src/main/java/com/github/dtprj/dongting/raft/store/ChainWriter.java b/server/src/main/java/com/github/dtprj/dongting/raft/store/ChainWriter.java index c8e5db33..e3bd44fb 100644 --- a/server/src/main/java/com/github/dtprj/dongting/raft/store/ChainWriter.java +++ b/server/src/main/java/com/github/dtprj/dongting/raft/store/ChainWriter.java @@ -32,16 +32,19 @@ import java.nio.ByteBuffer; import java.util.LinkedList; +import java.util.function.Consumer; import java.util.function.Supplier; /** * @author huangli */ -public abstract class ChainWriter { +public class ChainWriter { private static final DtLog log = DtLogs.getLogger(ChainWriter.class); private final PerfCallback perfCallback; private final RaftGroupConfigEx config; + private final Consumer writeCallback; + private final Consumer forceCallback; private int writePerfType1; private int writePerfType2; @@ -62,9 +65,13 @@ public abstract class ChainWriter { private boolean close; - public ChainWriter(String fiberNamePrefix, RaftGroupConfigEx config) { + public ChainWriter(String fiberNamePrefix, RaftGroupConfigEx config, Consumer writeCallback, + Consumer forceCallback) { this.config = config; this.perfCallback = config.getPerfCallback(); + this.writeCallback = writeCallback; + this.forceCallback = forceCallback; + DispatcherThread t = config.getFiberGroup().getThread(); this.directPool = t.getDirectPool(); this.needForceCondition = config.getFiberGroup().newCondition("needForceCond"); @@ -72,10 +79,6 @@ public ChainWriter(String fiberNamePrefix, RaftGroupConfigEx config) { new ForceLoopFrame()); } - protected abstract void writeFinish(WriteTask writeTask); - - protected abstract void forceFinish(WriteTask writeTask); - public void start() { forceFiber.start(); } @@ -187,7 +190,9 @@ private void afterWrite(Throwable ioEx, WriteTask task, long startTime) { } if (lastTaskNeedCallback != null) { needForceCondition.signal(); - writeFinish(lastTaskNeedCallback); + if (writeCallback != null) { + writeCallback.accept(lastTaskNeedCallback); + } } } @@ -256,7 +261,7 @@ private FrameCallResult afterForce(WriteTask task, long perfStartTime) { return Fiber.frameReturn(); } - forceFinish(task); + forceCallback.accept(task); return Fiber.resume(null, this); } } diff --git a/server/src/main/java/com/github/dtprj/dongting/raft/store/IdxFileQueue.java b/server/src/main/java/com/github/dtprj/dongting/raft/store/IdxFileQueue.java index 7d7f91f6..4f5b3ee9 100644 --- a/server/src/main/java/com/github/dtprj/dongting/raft/store/IdxFileQueue.java +++ b/server/src/main/java/com/github/dtprj/dongting/raft/store/IdxFileQueue.java @@ -75,7 +75,7 @@ class IdxFileQueue extends FileQueue implements IdxOps { private final FiberCondition needFlushCondition; final FiberCondition flushDoneCondition; - final IdxChainWriter chainWriter; + final ChainWriter chainWriter; private boolean closed; @@ -99,7 +99,7 @@ public IdxFileQueue(File dir, StatusManager statusManager, RaftGroupConfigEx gro this.needFlushCondition = groupConfig.getFiberGroup().newCondition("IdxNeedFlush-" + groupConfig.getGroupId()); this.flushDoneCondition = groupConfig.getFiberGroup().newCondition("IdxFlushDone-" + groupConfig.getGroupId()); - this.chainWriter = new IdxChainWriter(groupConfig); + this.chainWriter = new ChainWriter("IdxForce", groupConfig, null, this::forceFinish); chainWriter.setWritePerfType1(0); chainWriter.setWritePerfType2(PerfConsts.RAFT_D_IDX_WRITE); chainWriter.setForcePerfType(PerfConsts.RAFT_D_IDX_FORCE); @@ -171,28 +171,15 @@ protected FrameCallResult handle(Throwable ex) throws Throwable { } } - class IdxChainWriter extends ChainWriter { - - public IdxChainWriter(RaftGroupConfigEx config) { - super("IdxForce", config); - } - - @Override - protected void writeFinish(WriteTask writeTask) { - // nothing to do - } - - @Override - protected void forceFinish(WriteTask writeTask) { - // if we set syncForce to false, lastRaftIndex(committed) may less than lastForceLogIndex - long idx = Math.min(writeTask.getLastRaftIndex(), raftStatus.getLastForceLogIndex()); - if (idx > persistedIndexInStatusFile && !raftStatus.isInstallSnapshot()) { - statusManager.getProperties().put(KEY_PERSIST_IDX_INDEX, String.valueOf(idx)); - statusManager.persistAsync(true); - } - persistedIndex = writeTask.getLastRaftIndex(); - flushDoneCondition.signalAll(); + private void forceFinish(ChainWriter.WriteTask writeTask) { + // if we set syncForce to false, lastRaftIndex(committed) may less than lastForceLogIndex + long idx = Math.min(writeTask.getLastRaftIndex(), raftStatus.getLastForceLogIndex()); + if (idx > persistedIndexInStatusFile && !raftStatus.isInstallSnapshot()) { + statusManager.getProperties().put(KEY_PERSIST_IDX_INDEX, String.valueOf(idx)); + statusManager.persistAsync(true); } + persistedIndex = writeTask.getLastRaftIndex(); + flushDoneCondition.signalAll(); } public long indexToPos(long index) { diff --git a/server/src/main/java/com/github/dtprj/dongting/raft/store/LogFileQueue.java b/server/src/main/java/com/github/dtprj/dongting/raft/store/LogFileQueue.java index 8b19debd..e7a732f9 100644 --- a/server/src/main/java/com/github/dtprj/dongting/raft/store/LogFileQueue.java +++ b/server/src/main/java/com/github/dtprj/dongting/raft/store/LogFileQueue.java @@ -73,33 +73,24 @@ public LogFileQueue(File dir, RaftGroupConfigEx groupConfig, IdxOps idxOps, long this.heapPool = t.getHeapPool(); this.directPool = t.getDirectPool(); - LogChainWriter chainWriter = new LogChainWriter(groupConfig); + ChainWriter chainWriter = new ChainWriter("LogForce", groupConfig, this::writeFinish, this::forceFinish); chainWriter.setWritePerfType1(PerfConsts.RAFT_D_LOG_WRITE1); chainWriter.setWritePerfType2(PerfConsts.RAFT_D_LOG_WRITE2); chainWriter.setForcePerfType(PerfConsts.RAFT_D_LOG_SYNC); this.logAppender = new LogAppender(idxOps, this, groupConfig, chainWriter); } - private class LogChainWriter extends ChainWriter { - - public LogChainWriter(RaftGroupConfigEx config) { - super("LogForce", config); - } - - @Override - protected void writeFinish(WriteTask writeTask) { - if (writeTask.getLastRaftIndex() > 0) { - raftStatus.getLogWriteFinishCondition().signalAll(); - raftStatus.setLastWriteLogIndex(writeTask.getLastRaftIndex()); - } + private void writeFinish(ChainWriter.WriteTask writeTask) { + if (writeTask.getLastRaftIndex() > 0) { + raftStatus.getLogWriteFinishCondition().signalAll(); + raftStatus.setLastWriteLogIndex(writeTask.getLastRaftIndex()); } + } - @Override - protected void forceFinish(WriteTask writeTask) { - // assert lastRaftIndex > 0 - raftStatus.setLastForceLogIndex(writeTask.getLastRaftIndex()); - raftStatus.getLogForceFinishCondition().signalAll(); - } + private void forceFinish(ChainWriter.WriteTask writeTask) { + // assert lastRaftIndex > 0 + raftStatus.setLastForceLogIndex(writeTask.getLastRaftIndex()); + raftStatus.getLogForceFinishCondition().signalAll(); } public long fileLength() {