From 272ce198c2c9db5b8868cd93a751e848d3bee1d2 Mon Sep 17 00:00:00 2001 From: huangli Date: Wed, 8 Jan 2025 22:30:37 +0800 Subject: [PATCH] fix: transfer leader request send as one way --- .../dongting/codec/DecoderCallbackCreator.java | 5 +---- .../com/github/dtprj/dongting/net/NioClient.java | 4 ++++ .../dtprj/dongting/raft/impl/MemberManager.java | 15 ++++++++++++--- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/client/src/main/java/com/github/dtprj/dongting/codec/DecoderCallbackCreator.java b/client/src/main/java/com/github/dtprj/dongting/codec/DecoderCallbackCreator.java index 406727cd..369a1b85 100644 --- a/client/src/main/java/com/github/dtprj/dongting/codec/DecoderCallbackCreator.java +++ b/client/src/main/java/com/github/dtprj/dongting/codec/DecoderCallbackCreator.java @@ -22,9 +22,6 @@ public interface DecoderCallbackCreator { DecoderCallback apply(DecodeContext ctx); - DecoderCallbackCreator VOID_DECODE_CALLBACK_CREATOR = fixed(DecoderCallback.VOID_DECODE_CALLBACK); + DecoderCallbackCreator VOID_DECODE_CALLBACK_CREATOR = ctx -> DecoderCallback.VOID_DECODE_CALLBACK; - static DecoderCallbackCreator fixed(DecoderCallback c) { - return ctx -> c; - } } diff --git a/client/src/main/java/com/github/dtprj/dongting/net/NioClient.java b/client/src/main/java/com/github/dtprj/dongting/net/NioClient.java index 8ab61ccf..8c38e4de 100644 --- a/client/src/main/java/com/github/dtprj/dongting/net/NioClient.java +++ b/client/src/main/java/com/github/dtprj/dongting/net/NioClient.java @@ -135,11 +135,13 @@ public void waitStart() { public CompletableFuture> sendRequest(WritePacket request, DecoderCallbackCreator decoder, DtTime timeout) { + Objects.requireNonNull(decoder); return sendRequest(null, request, decoder, timeout); } public CompletableFuture> sendRequest(Peer peer, WritePacket request, DecoderCallbackCreator decoder, DtTime timeout) { + Objects.requireNonNull(decoder); CompletableFuture> f = new CompletableFuture<>(); send(worker, peer, request, decoder, timeout, new RpcCallback() { @Override @@ -157,11 +159,13 @@ public void fail(Throwable ex) { public void sendRequest(WritePacket request, DecoderCallbackCreator decoder, DtTime timeout, RpcCallback callback) { + Objects.requireNonNull(decoder); send(worker, null, request, decoder, timeout, callback); } public void sendRequest(Peer peer, WritePacket request, DecoderCallbackCreator decoder, DtTime timeout, RpcCallback callback) { + Objects.requireNonNull(decoder); send(worker, peer, request, decoder, timeout, callback); } diff --git a/server/src/main/java/com/github/dtprj/dongting/raft/impl/MemberManager.java b/server/src/main/java/com/github/dtprj/dongting/raft/impl/MemberManager.java index 68f6f7f5..e1470411 100644 --- a/server/src/main/java/com/github/dtprj/dongting/raft/impl/MemberManager.java +++ b/server/src/main/java/com/github/dtprj/dongting/raft/impl/MemberManager.java @@ -701,8 +701,10 @@ public static boolean validCandidate(RaftStatusImpl raftStatus, int nodeId) { } public void transferLeadership(int nodeId, CompletableFuture f, DtTime deadline) { - groupConfig.getFiberGroup().fireFiber("transfer-leader", - new TranferLeaderFiberFrame(nodeId, f, deadline)); + if (!groupConfig.getFiberGroup().fireFiber("transfer-leader", + new TranferLeaderFiberFrame(nodeId, f, deadline))) { + f.completeExceptionally(new RaftException("fire transfer leader fiber failed")); + } } private class TranferLeaderFiberFrame extends FiberFrame { @@ -717,6 +719,13 @@ private class TranferLeaderFiberFrame extends FiberFrame { this.deadline = deadline; } + @Override + protected FrameCallResult handle(Throwable ex) { + RaftUtil.clearTransferLeaderCondition(raftStatus); + f.completeExceptionally(ex); + return Fiber.frameReturn(); + } + @Override public FrameCallResult execute(Void input) { if (raftStatus.getTransferLeaderCondition() != null) { @@ -777,7 +786,7 @@ private FrameCallResult checkBeforeTransferLeader(Void v) { req.groupId = groupId; TransferLeaderReq.TransferLeaderReqWritePacket frame = new TransferLeaderReq.TransferLeaderReqWritePacket(req); client.sendRequest(newLeader.getNode().getPeer(), frame, - null, new DtTime(5, TimeUnit.SECONDS)) + DecoderCallbackCreator.VOID_DECODE_CALLBACK_CREATOR, new DtTime(5, TimeUnit.SECONDS)) .whenComplete((rf, ex) -> { if (ex != null) { log.error("transfer leader failed, groupId={}", groupId, ex);