Skip to content

Commit

Permalink
refactor: RaftFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
areyouok committed Jun 14, 2024
1 parent 50a36cf commit 9e670ef
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.github.dtprj.dongting.dtkv.server.DtKV;
import com.github.dtprj.dongting.dtkv.server.KvServerUtil;
import com.github.dtprj.dongting.fiber.Dispatcher;
import com.github.dtprj.dongting.fiber.FiberGroup;
import com.github.dtprj.dongting.log.DtLog;
import com.github.dtprj.dongting.log.DtLogs;
import com.github.dtprj.dongting.net.HostPort;
Expand Down Expand Up @@ -116,10 +115,9 @@ public StateMachine createStateMachine(RaftGroupConfigEx groupConfig) {
}

@Override
public FiberGroup createFiberGroup(RaftGroupConfig groupConfig) {
public Dispatcher createDispatcher(RaftGroupConfig groupConfig) {
// we start multi nodes in same jvm, so use node id as part of dispatcher name
Dispatcher dispatcher = new Dispatcher("node-" + nodeId, groupConfig.getPerfCallback());
return new FiberGroup("group-" + GROUP_ID + "-node-" + nodeId, dispatcher);
return new Dispatcher("node-" + nodeId + "-dispatcher", groupConfig.getPerfCallback());
}
};
raftFactory.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.github.dtprj.dongting.common.AbstractLifeCircle;
import com.github.dtprj.dongting.common.DtTime;
import com.github.dtprj.dongting.fiber.Dispatcher;
import com.github.dtprj.dongting.fiber.FiberGroup;
import com.github.dtprj.dongting.raft.sm.DefaultSnapshotManager;
import com.github.dtprj.dongting.raft.sm.RaftCodecFactory;
import com.github.dtprj.dongting.raft.sm.SnapshotManager;
Expand All @@ -27,7 +26,6 @@
import com.github.dtprj.dongting.raft.store.RaftLog;
import com.github.dtprj.dongting.raft.store.StatusManager;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -74,20 +72,17 @@ public SnapshotManager createSnapshotManager(RaftGroupConfigEx groupConfig, Stat
}

@Override
public FiberGroup createFiberGroup(RaftGroupConfig groupConfig) {
Dispatcher dispatcher = new Dispatcher("raft-dispatcher-" + groupConfig.getGroupId(),
groupConfig.getPerfCallback());
return new FiberGroup("group-" + groupConfig.getGroupId(), dispatcher);
public Dispatcher createDispatcher(RaftGroupConfig groupConfig) {
return new Dispatcher("raft-dispatcher-" + groupConfig.getGroupId(), groupConfig.getPerfCallback());
}

@Override
public CompletableFuture<Void> startFiberGroup(FiberGroup group) {
group.getDispatcher().start();
return group.getDispatcher().startGroup(group);
public void startDispatcher(Dispatcher dispatcher) {
dispatcher.start();
}

@Override
public void afterGroupShutdown(FiberGroup group, DtTime timeout) {
group.getDispatcher().stop(timeout);
public void stopDispatcher(Dispatcher dispatcher, DtTime timeout) {
dispatcher.stop(timeout);
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package com.github.dtprj.dongting.raft.server;

import com.github.dtprj.dongting.common.DtTime;
import com.github.dtprj.dongting.fiber.FiberGroup;
import com.github.dtprj.dongting.fiber.Dispatcher;
import com.github.dtprj.dongting.raft.sm.RaftCodecFactory;
import com.github.dtprj.dongting.raft.sm.SnapshotManager;
import com.github.dtprj.dongting.raft.sm.StateMachine;
import com.github.dtprj.dongting.raft.store.RaftLog;
import com.github.dtprj.dongting.raft.store.StatusManager;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

public interface RaftFactory {
Expand All @@ -20,9 +19,9 @@ public interface RaftFactory {

SnapshotManager createSnapshotManager(RaftGroupConfigEx groupConfig, StateMachine stateMachine);

FiberGroup createFiberGroup(RaftGroupConfig groupConfig);
Dispatcher createDispatcher(RaftGroupConfig groupConfig);

CompletableFuture<Void> startFiberGroup(FiberGroup group);
void startDispatcher(Dispatcher dispatcher);

void afterGroupShutdown(FiberGroup group, DtTime timeout);
void stopDispatcher(Dispatcher dispatcher, DtTime timeout);
}
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ private RaftGroupImpl createRaftGroup(RaftServerConfig serverConfig, Set<Integer
throw new IllegalArgumentException("self id not found in group members/observers list: " + serverConfig.getNodeId());
}

FiberGroup fiberGroup = raftFactory.createFiberGroup(rgc);
Dispatcher dispatcher = raftFactory.createDispatcher(rgc);
FiberGroup fiberGroup = new FiberGroup("group-" + rgc.getGroupId(), dispatcher);
RaftStatusImpl raftStatus = new RaftStatusImpl(fiberGroup.getDispatcher().getTs());
raftStatus.setTailCache(new TailCache(rgc, raftStatus));
raftStatus.setNodeIdOfMembers(nodeIdOfMembers);
Expand Down Expand Up @@ -340,7 +341,10 @@ protected void doStart() {
GroupComponents gc = g.getGroupComponents();
// nodeManager.getAllNodesEx() is not thread safe
gc.getMemberManager().init(nodeManager.getAllNodesEx());
CompletableFuture<Void> f = raftFactory.startFiberGroup(gc.getFiberGroup());

FiberGroup fg = gc.getFiberGroup();
raftFactory.startDispatcher(fg.getDispatcher());
CompletableFuture<Void> f = fg.getDispatcher().startGroup(fg);
futures.add(f);
});
// should complete soon, so we wait here
Expand Down Expand Up @@ -517,7 +521,7 @@ private FrameCallResult afterRaftLogClose(Void unused) {
});

// the group shutdown is not finished, but it's ok to call afterGroupShutdown(to shutdown dispatcher)
raftFactory.afterGroupShutdown(fiberGroup, timeout);
raftFactory.stopDispatcher(fiberGroup.getDispatcher(), timeout);

return g.getShutdownFuture();
}
Expand Down Expand Up @@ -610,10 +614,11 @@ public CompletableFuture<Void> addGroup(RaftGroupConfig groupConfig, long acquir
}
});

RaftGroupImpl g = f.get();
RaftGroupImpl g = f.get(60, TimeUnit.SECONDS);
GroupComponents gc = g.getGroupComponents();

raftFactory.startFiberGroup(gc.getFiberGroup()).get();
FiberGroup fg = gc.getFiberGroup();
raftFactory.startDispatcher(fg.getDispatcher());
fg.getDispatcher().startGroup(fg).get(60, TimeUnit.SECONDS);

raftGroups.put(groupConfig.getGroupId(), g);
initRaftGroup(g);
Expand Down

0 comments on commit 9e670ef

Please sign in to comment.