Skip to content

Commit

Permalink
fix: transfer leader request send as one way
Browse files Browse the repository at this point in the history
  • Loading branch information
areyouok committed Jan 8, 2025
1 parent 0914803 commit 272ce19
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
public interface DecoderCallbackCreator<T> {
DecoderCallback<T> apply(DecodeContext ctx);

DecoderCallbackCreator<Void> VOID_DECODE_CALLBACK_CREATOR = fixed(DecoderCallback.VOID_DECODE_CALLBACK);
DecoderCallbackCreator<Void> VOID_DECODE_CALLBACK_CREATOR = ctx -> DecoderCallback.VOID_DECODE_CALLBACK;

static <T> DecoderCallbackCreator<T> fixed(DecoderCallback<T> c) {
return ctx -> c;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,13 @@ public void waitStart() {

public <T> CompletableFuture<ReadPacket<T>> sendRequest(WritePacket request,
DecoderCallbackCreator<T> decoder, DtTime timeout) {
Objects.requireNonNull(decoder);
return sendRequest(null, request, decoder, timeout);
}

public <T> CompletableFuture<ReadPacket<T>> sendRequest(Peer peer, WritePacket request,
DecoderCallbackCreator<T> decoder, DtTime timeout) {
Objects.requireNonNull(decoder);
CompletableFuture<ReadPacket<T>> f = new CompletableFuture<>();
send(worker, peer, request, decoder, timeout, new RpcCallback<T>() {
@Override
Expand All @@ -157,11 +159,13 @@ public void fail(Throwable ex) {

public <T> void sendRequest(WritePacket request, DecoderCallbackCreator<T> decoder,
DtTime timeout, RpcCallback<T> callback) {
Objects.requireNonNull(decoder);
send(worker, null, request, decoder, timeout, callback);
}

public <T> void sendRequest(Peer peer, WritePacket request, DecoderCallbackCreator<T> decoder,
DtTime timeout, RpcCallback<T> callback) {
Objects.requireNonNull(decoder);
send(worker, peer, request, decoder, timeout, callback);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,8 +701,10 @@ public static boolean validCandidate(RaftStatusImpl raftStatus, int nodeId) {
}

public void transferLeadership(int nodeId, CompletableFuture<Void> f, DtTime deadline) {
groupConfig.getFiberGroup().fireFiber("transfer-leader",
new TranferLeaderFiberFrame(nodeId, f, deadline));
if (!groupConfig.getFiberGroup().fireFiber("transfer-leader",
new TranferLeaderFiberFrame(nodeId, f, deadline))) {
f.completeExceptionally(new RaftException("fire transfer leader fiber failed"));
}
}

private class TranferLeaderFiberFrame extends FiberFrame<Void> {
Expand All @@ -717,6 +719,13 @@ private class TranferLeaderFiberFrame extends FiberFrame<Void> {
this.deadline = deadline;
}

@Override
protected FrameCallResult handle(Throwable ex) {
RaftUtil.clearTransferLeaderCondition(raftStatus);
f.completeExceptionally(ex);
return Fiber.frameReturn();
}

@Override
public FrameCallResult execute(Void input) {
if (raftStatus.getTransferLeaderCondition() != null) {
Expand Down Expand Up @@ -777,7 +786,7 @@ private FrameCallResult checkBeforeTransferLeader(Void v) {
req.groupId = groupId;
TransferLeaderReq.TransferLeaderReqWritePacket frame = new TransferLeaderReq.TransferLeaderReqWritePacket(req);
client.sendRequest(newLeader.getNode().getPeer(), frame,
null, new DtTime(5, TimeUnit.SECONDS))
DecoderCallbackCreator.VOID_DECODE_CALLBACK_CREATOR, new DtTime(5, TimeUnit.SECONDS))
.whenComplete((rf, ex) -> {
if (ex != null) {
log.error("transfer leader failed, groupId={}", groupId, ex);
Expand Down

0 comments on commit 272ce19

Please sign in to comment.