diff --git a/server/src/main/java/com/github/dtprj/dongting/dtkv/server/DtKV.java b/server/src/main/java/com/github/dtprj/dongting/dtkv/server/DtKV.java index 0290d359d..2368f97a4 100644 --- a/server/src/main/java/com/github/dtprj/dongting/dtkv/server/DtKV.java +++ b/server/src/main/java/com/github/dtprj/dongting/dtkv/server/DtKV.java @@ -19,6 +19,7 @@ import com.github.dtprj.dongting.codec.Decoder; import com.github.dtprj.dongting.codec.Encodable; import com.github.dtprj.dongting.codec.StrEncoder; +import com.github.dtprj.dongting.common.AbstractLifeCircle; import com.github.dtprj.dongting.common.DtTime; import com.github.dtprj.dongting.fiber.FiberFuture; import com.github.dtprj.dongting.fiber.FiberGroup; @@ -36,7 +37,7 @@ /** * @author huangli */ -public class DtKV implements StateMachine { +public class DtKV extends AbstractLifeCircle implements StateMachine { public static final int BIZ_TYPE_GET = 0; public static final int BIZ_TYPE_PUT = 1; public static final int BIZ_TYPE_REMOVE = 2; @@ -172,9 +173,12 @@ private static void ensureRunning(KvStatus kvStatus) { } } + @Override + protected void doStart() { + } @Override - public void close() throws Exception { + protected void doStop(DtTime timeout, boolean force) { newStatus(KvStatus.CLOSED, null); } diff --git a/server/src/main/java/com/github/dtprj/dongting/raft/impl/ApplyManager.java b/server/src/main/java/com/github/dtprj/dongting/raft/impl/ApplyManager.java index ef43e29af..93176b80b 100644 --- a/server/src/main/java/com/github/dtprj/dongting/raft/impl/ApplyManager.java +++ b/server/src/main/java/com/github/dtprj/dongting/raft/impl/ApplyManager.java @@ -16,6 +16,7 @@ package com.github.dtprj.dongting.raft.impl; import com.github.dtprj.dongting.codec.ByteArrayEncoder; +import com.github.dtprj.dongting.common.DtTime; import com.github.dtprj.dongting.common.DtUtil; import com.github.dtprj.dongting.common.PerfCallback; import com.github.dtprj.dongting.common.Timestamp; @@ -147,8 +148,13 @@ public void apply() { condition.signal(); } - public void close() { + public void shutdown(DtTime timeout) { condition.signal(); + try { + stateMachine.stop(timeout); + } catch (Throwable e) { + log.error("state machine stop failed", e); + } } private FrameCallResult exec(RaftTask rt, long index, FrameCall resumePoint) { diff --git a/server/src/main/java/com/github/dtprj/dongting/raft/server/RaftServer.java b/server/src/main/java/com/github/dtprj/dongting/raft/server/RaftServer.java index 35e900c0c..e7ba70e91 100644 --- a/server/src/main/java/com/github/dtprj/dongting/raft/server/RaftServer.java +++ b/server/src/main/java/com/github/dtprj/dongting/raft/server/RaftServer.java @@ -489,7 +489,7 @@ private CompletableFuture stopGroup(RaftGroupImpl g, DtTime timeout) { @Override public FrameCallResult execute(Void input) { GroupComponents gc = g.getGroupComponents(); - gc.getApplyManager().close(); + gc.getApplyManager().shutdown(timeout); return gc.getRaftLog().close().await(this::afterRaftLogClose); } diff --git a/server/src/main/java/com/github/dtprj/dongting/raft/sm/StateMachine.java b/server/src/main/java/com/github/dtprj/dongting/raft/sm/StateMachine.java index b6ed47958..4e2b32e65 100644 --- a/server/src/main/java/com/github/dtprj/dongting/raft/sm/StateMachine.java +++ b/server/src/main/java/com/github/dtprj/dongting/raft/sm/StateMachine.java @@ -15,6 +15,7 @@ */ package com.github.dtprj.dongting.raft.sm; +import com.github.dtprj.dongting.common.LifeCircle; import com.github.dtprj.dongting.fiber.FiberFuture; import com.github.dtprj.dongting.raft.server.RaftInput; @@ -25,7 +26,7 @@ * * @author huangli */ -public interface StateMachine extends AutoCloseable, RaftCodecFactory { +public interface StateMachine extends LifeCircle, RaftCodecFactory { /** * this method is called in raft thread.