diff --git a/client/src/main/java/com/github/dtprj/dongting/codec/DecoderCallbackCreator.java b/client/src/main/java/com/github/dtprj/dongting/codec/DecoderCallbackCreator.java index 406727cd..369a1b85 100644 --- a/client/src/main/java/com/github/dtprj/dongting/codec/DecoderCallbackCreator.java +++ b/client/src/main/java/com/github/dtprj/dongting/codec/DecoderCallbackCreator.java @@ -22,9 +22,6 @@ public interface DecoderCallbackCreator { DecoderCallback apply(DecodeContext ctx); - DecoderCallbackCreator VOID_DECODE_CALLBACK_CREATOR = fixed(DecoderCallback.VOID_DECODE_CALLBACK); + DecoderCallbackCreator VOID_DECODE_CALLBACK_CREATOR = ctx -> DecoderCallback.VOID_DECODE_CALLBACK; - static DecoderCallbackCreator fixed(DecoderCallback c) { - return ctx -> c; - } } diff --git a/client/src/main/java/com/github/dtprj/dongting/net/NioClient.java b/client/src/main/java/com/github/dtprj/dongting/net/NioClient.java index 8ab61ccf..8c38e4de 100644 --- a/client/src/main/java/com/github/dtprj/dongting/net/NioClient.java +++ b/client/src/main/java/com/github/dtprj/dongting/net/NioClient.java @@ -135,11 +135,13 @@ public void waitStart() { public CompletableFuture> sendRequest(WritePacket request, DecoderCallbackCreator decoder, DtTime timeout) { + Objects.requireNonNull(decoder); return sendRequest(null, request, decoder, timeout); } public CompletableFuture> sendRequest(Peer peer, WritePacket request, DecoderCallbackCreator decoder, DtTime timeout) { + Objects.requireNonNull(decoder); CompletableFuture> f = new CompletableFuture<>(); send(worker, peer, request, decoder, timeout, new RpcCallback() { @Override @@ -157,11 +159,13 @@ public void fail(Throwable ex) { public void sendRequest(WritePacket request, DecoderCallbackCreator decoder, DtTime timeout, RpcCallback callback) { + Objects.requireNonNull(decoder); send(worker, null, request, decoder, timeout, callback); } public void sendRequest(Peer peer, WritePacket request, DecoderCallbackCreator decoder, DtTime timeout, RpcCallback callback) { + Objects.requireNonNull(decoder); send(worker, peer, request, decoder, timeout, callback); } diff --git a/server/src/main/java/com/github/dtprj/dongting/raft/impl/MemberManager.java b/server/src/main/java/com/github/dtprj/dongting/raft/impl/MemberManager.java index 68f6f7f5..e1470411 100644 --- a/server/src/main/java/com/github/dtprj/dongting/raft/impl/MemberManager.java +++ b/server/src/main/java/com/github/dtprj/dongting/raft/impl/MemberManager.java @@ -701,8 +701,10 @@ public static boolean validCandidate(RaftStatusImpl raftStatus, int nodeId) { } public void transferLeadership(int nodeId, CompletableFuture f, DtTime deadline) { - groupConfig.getFiberGroup().fireFiber("transfer-leader", - new TranferLeaderFiberFrame(nodeId, f, deadline)); + if (!groupConfig.getFiberGroup().fireFiber("transfer-leader", + new TranferLeaderFiberFrame(nodeId, f, deadline))) { + f.completeExceptionally(new RaftException("fire transfer leader fiber failed")); + } } private class TranferLeaderFiberFrame extends FiberFrame { @@ -717,6 +719,13 @@ private class TranferLeaderFiberFrame extends FiberFrame { this.deadline = deadline; } + @Override + protected FrameCallResult handle(Throwable ex) { + RaftUtil.clearTransferLeaderCondition(raftStatus); + f.completeExceptionally(ex); + return Fiber.frameReturn(); + } + @Override public FrameCallResult execute(Void input) { if (raftStatus.getTransferLeaderCondition() != null) { @@ -777,7 +786,7 @@ private FrameCallResult checkBeforeTransferLeader(Void v) { req.groupId = groupId; TransferLeaderReq.TransferLeaderReqWritePacket frame = new TransferLeaderReq.TransferLeaderReqWritePacket(req); client.sendRequest(newLeader.getNode().getPeer(), frame, - null, new DtTime(5, TimeUnit.SECONDS)) + DecoderCallbackCreator.VOID_DECODE_CALLBACK_CREATOR, new DtTime(5, TimeUnit.SECONDS)) .whenComplete((rf, ex) -> { if (ex != null) { log.error("transfer leader failed, groupId={}", groupId, ex);