Skip to content

Commit

Permalink
refactor: make ChainWriter concrete
Browse files Browse the repository at this point in the history
  • Loading branch information
areyouok committed Dec 22, 2024
1 parent 2534abf commit 964dcc3
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,19 @@

import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* @author huangli
*/
public abstract class ChainWriter {
public class ChainWriter {
private static final DtLog log = DtLogs.getLogger(ChainWriter.class);

private final PerfCallback perfCallback;
private final RaftGroupConfigEx config;
private final Consumer<WriteTask> writeCallback;
private final Consumer<WriteTask> forceCallback;

private int writePerfType1;
private int writePerfType2;
Expand All @@ -62,20 +65,20 @@ public abstract class ChainWriter {

private boolean close;

public ChainWriter(String fiberNamePrefix, RaftGroupConfigEx config) {
public ChainWriter(String fiberNamePrefix, RaftGroupConfigEx config, Consumer<WriteTask> writeCallback,
Consumer<WriteTask> forceCallback) {
this.config = config;
this.perfCallback = config.getPerfCallback();
this.writeCallback = writeCallback;
this.forceCallback = forceCallback;

DispatcherThread t = config.getFiberGroup().getThread();
this.directPool = t.getDirectPool();
this.needForceCondition = config.getFiberGroup().newCondition("needForceCond");
this.forceFiber = new Fiber(fiberNamePrefix + "-" + config.getGroupId(), config.getFiberGroup(),
new ForceLoopFrame());
}

protected abstract void writeFinish(WriteTask writeTask);

protected abstract void forceFinish(WriteTask writeTask);

public void start() {
forceFiber.start();
}
Expand Down Expand Up @@ -187,7 +190,9 @@ private void afterWrite(Throwable ioEx, WriteTask task, long startTime) {
}
if (lastTaskNeedCallback != null) {
needForceCondition.signal();
writeFinish(lastTaskNeedCallback);
if (writeCallback != null) {
writeCallback.accept(lastTaskNeedCallback);
}
}
}

Expand Down Expand Up @@ -256,7 +261,7 @@ private FrameCallResult afterForce(WriteTask task, long perfStartTime) {
return Fiber.frameReturn();
}

forceFinish(task);
forceCallback.accept(task);
return Fiber.resume(null, this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class IdxFileQueue extends FileQueue implements IdxOps {
private final FiberCondition needFlushCondition;
final FiberCondition flushDoneCondition;

final IdxChainWriter chainWriter;
final ChainWriter chainWriter;

private boolean closed;

Expand All @@ -99,7 +99,7 @@ public IdxFileQueue(File dir, StatusManager statusManager, RaftGroupConfigEx gro
this.needFlushCondition = groupConfig.getFiberGroup().newCondition("IdxNeedFlush-" + groupConfig.getGroupId());
this.flushDoneCondition = groupConfig.getFiberGroup().newCondition("IdxFlushDone-" + groupConfig.getGroupId());

this.chainWriter = new IdxChainWriter(groupConfig);
this.chainWriter = new ChainWriter("IdxForce", groupConfig, null, this::forceFinish);
chainWriter.setWritePerfType1(0);
chainWriter.setWritePerfType2(PerfConsts.RAFT_D_IDX_WRITE);
chainWriter.setForcePerfType(PerfConsts.RAFT_D_IDX_FORCE);
Expand Down Expand Up @@ -171,28 +171,15 @@ protected FrameCallResult handle(Throwable ex) throws Throwable {
}
}

class IdxChainWriter extends ChainWriter {

public IdxChainWriter(RaftGroupConfigEx config) {
super("IdxForce", config);
}

@Override
protected void writeFinish(WriteTask writeTask) {
// nothing to do
}

@Override
protected void forceFinish(WriteTask writeTask) {
// if we set syncForce to false, lastRaftIndex(committed) may less than lastForceLogIndex
long idx = Math.min(writeTask.getLastRaftIndex(), raftStatus.getLastForceLogIndex());
if (idx > persistedIndexInStatusFile && !raftStatus.isInstallSnapshot()) {
statusManager.getProperties().put(KEY_PERSIST_IDX_INDEX, String.valueOf(idx));
statusManager.persistAsync(true);
}
persistedIndex = writeTask.getLastRaftIndex();
flushDoneCondition.signalAll();
private void forceFinish(ChainWriter.WriteTask writeTask) {
// if we set syncForce to false, lastRaftIndex(committed) may less than lastForceLogIndex
long idx = Math.min(writeTask.getLastRaftIndex(), raftStatus.getLastForceLogIndex());
if (idx > persistedIndexInStatusFile && !raftStatus.isInstallSnapshot()) {
statusManager.getProperties().put(KEY_PERSIST_IDX_INDEX, String.valueOf(idx));
statusManager.persistAsync(true);
}
persistedIndex = writeTask.getLastRaftIndex();
flushDoneCondition.signalAll();
}

public long indexToPos(long index) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,33 +73,24 @@ public LogFileQueue(File dir, RaftGroupConfigEx groupConfig, IdxOps idxOps, long
this.heapPool = t.getHeapPool();
this.directPool = t.getDirectPool();

LogChainWriter chainWriter = new LogChainWriter(groupConfig);
ChainWriter chainWriter = new ChainWriter("LogForce", groupConfig, this::writeFinish, this::forceFinish);
chainWriter.setWritePerfType1(PerfConsts.RAFT_D_LOG_WRITE1);
chainWriter.setWritePerfType2(PerfConsts.RAFT_D_LOG_WRITE2);
chainWriter.setForcePerfType(PerfConsts.RAFT_D_LOG_SYNC);
this.logAppender = new LogAppender(idxOps, this, groupConfig, chainWriter);
}

private class LogChainWriter extends ChainWriter {

public LogChainWriter(RaftGroupConfigEx config) {
super("LogForce", config);
}

@Override
protected void writeFinish(WriteTask writeTask) {
if (writeTask.getLastRaftIndex() > 0) {
raftStatus.getLogWriteFinishCondition().signalAll();
raftStatus.setLastWriteLogIndex(writeTask.getLastRaftIndex());
}
private void writeFinish(ChainWriter.WriteTask writeTask) {
if (writeTask.getLastRaftIndex() > 0) {
raftStatus.getLogWriteFinishCondition().signalAll();
raftStatus.setLastWriteLogIndex(writeTask.getLastRaftIndex());
}
}

@Override
protected void forceFinish(WriteTask writeTask) {
// assert lastRaftIndex > 0
raftStatus.setLastForceLogIndex(writeTask.getLastRaftIndex());
raftStatus.getLogForceFinishCondition().signalAll();
}
private void forceFinish(ChainWriter.WriteTask writeTask) {
// assert lastRaftIndex > 0
raftStatus.setLastForceLogIndex(writeTask.getLastRaftIndex());
raftStatus.getLogForceFinishCondition().signalAll();
}

public long fileLength() {
Expand Down

0 comments on commit 964dcc3

Please sign in to comment.