Skip to content

Commit

Permalink
Notify user code about follower loss (#517)
Browse files Browse the repository at this point in the history
* Notify user code about follower loss

* PR review fixes

* Fixed a warning "comparison of integer expressions of different signedness"

* [Update PR] Add comments and put more set_recovered() calls

---------

Co-authored-by: Pavel Yurin <[email protected]>
Co-authored-by: Jung-Sang Ahn <[email protected]>
  • Loading branch information
3 people authored Jul 18, 2024
1 parent 35083c6 commit 0e28077
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 16 deletions.
7 changes: 7 additions & 0 deletions include/libnuraft/callback.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,13 @@ public:
*/
ResignationFromLeader = 27,

/**
* When a peer's RPC error count reaches raft_server::raft_limits_.warning_limit_, or
* a peer doesn't respond for a long time (raft_params::leadership_expiry_),
* the peer is considered lost.
* ctx: null.
*/
FollowerLost = 28,
};

struct Param {
Expand Down
11 changes: 11 additions & 0 deletions include/libnuraft/peer.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public:
, reconn_backoff_(0)
, suppress_following_error_(false)
, abandoned_(false)
, lost_by_leader_(false)
, rsv_msg_(nullptr)
, rsv_msg_handler_(nullptr)
, l_(logger)
Expand Down Expand Up @@ -302,6 +303,10 @@ public:
ptr<req_msg> get_rsv_msg() const { return rsv_msg_; }
rpc_handler get_rsv_msg_handler() const { return rsv_msg_handler_; }

/**
 * Returns the current value of the `lost_by_leader_` flag,
 * i.e. whether this peer is currently marked as lost.
 */
bool is_lost() const { return lost_by_leader_; }
/**
 * Marks this peer as lost by setting `lost_by_leader_` to `true`.
 */
void set_lost() { lost_by_leader_ = true; }
/**
 * Clears the lost mark by setting `lost_by_leader_` back to `false`.
 */
void set_recovered() { lost_by_leader_ = false; }

private:
void handle_rpc_result(ptr<peer> myself,
ptr<rpc_client> my_rpc_client,
Expand Down Expand Up @@ -498,6 +503,12 @@ private:
*/
std::atomic<bool> abandoned_;

/**
* If `true`, this peer is considered unresponsive
* and treated as if it has been lost.
*/
std::atomic<bool> lost_by_leader_;

/**
* Reserved message that should be sent next time.
*/
Expand Down
5 changes: 4 additions & 1 deletion include/libnuraft/raft_server.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -886,9 +886,12 @@ protected:
int32 get_quorum_for_election();
int32 get_quorum_for_commit();
int32 get_leadership_expiry();
size_t get_not_responding_peers();
std::list<ptr<peer>> get_not_responding_peers();
size_t get_not_responding_peers_count();
size_t get_num_stale_peers();

void apply_to_not_responding_peers(const std::function<void(const ptr<peer>&)>&);

ptr<resp_msg> handle_append_entries(req_msg& req);
ptr<resp_msg> handle_prevote_req(req_msg& req);
ptr<resp_msg> handle_vote_req(req_msg& req);
Expand Down
4 changes: 2 additions & 2 deletions src/handle_append_entries.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ bool raft_server::request_append_entries(ptr<peer> p) {
chk_timer.timeout_and_reset() ) {
// If auto adjust mode is on for 2-node cluster, and
// the follower is not responding, adjust the quorum.
size_t num_not_responding_peers = get_not_responding_peers();
size_t num_not_responding_peers = get_not_responding_peers_count();
size_t cur_quorum_size = get_quorum_for_commit();
size_t num_stale_peers = get_num_stale_peers();
if (cur_quorum_size >= 1) {
Expand Down Expand Up @@ -1191,7 +1191,7 @@ ulong raft_server::get_expected_committed_log_idx() {

size_t quorum_idx = get_quorum_for_commit();
if (ctx_->get_params()->use_full_consensus_among_healthy_members_) {
size_t not_responding_peers = get_not_responding_peers();
size_t not_responding_peers = get_not_responding_peers_count();
if (not_responding_peers < voting_members - quorum_idx) {
// If full consensus option is on, commit should be
// agreed by all healthy members, and the number of
Expand Down
64 changes: 51 additions & 13 deletions src/raft_server.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -608,27 +608,43 @@ int32 raft_server::get_leadership_expiry() {
return expiry;
}

size_t raft_server::get_not_responding_peers() {
// Check if quorum nodes are not responding
// (i.e., don't respond 20x heartbeat time long).
/**
 * Gather the peers that `apply_to_not_responding_peers()` reports.
 *
 * @return List holding each reported peer, in the order they were visited.
 */
std::list<ptr<peer>> raft_server::get_not_responding_peers() {
    std::list<ptr<peer>> not_responding;
    // The visitor hands us one peer at a time; append each to the result.
    apply_to_not_responding_peers(
        [&not_responding](const ptr<peer>& silent_peer) {
            not_responding.push_back(silent_peer);
        });
    return not_responding;
}

/**
 * Count the peers that `apply_to_not_responding_peers()` reports.
 *
 * @return Number of times the visitor was invoked.
 */
size_t raft_server::get_not_responding_peers_count() {
    size_t silent_count = 0;
    // Only the number of visits matters here, so the peer itself is ignored.
    apply_to_not_responding_peers(
        [&silent_count](const ptr<peer>&) { silent_count += 1; });
    return silent_count;
}

/**
 * Invoke `callback` for each entry of `peers_` that passes
 * `is_regular_member()` and whose response timer, converted from
 * microseconds to milliseconds, is greater than
 * `heart_beat_interval_ * raft_limits_.response_limit_`.
 *
 * NOTE(review): this span looks like a rendered diff in which both the
 * previous and the current form of several statements are present
 * (two `int expiry` declarations, the `p` / `peer_ptr` pairs, and a
 * `return` inside a `void` function) -- confirm against the real source
 * file which lines are live before relying on this text.
 *
 * @param callback Function called with each peer that satisfies the
 *                 conditions above.
 */
void raft_server::apply_to_not_responding_peers(
const std::function<void(const ptr<peer>&)>& callback) {
// Check if quorum nodes are not responding
// (i.e., don't respond 20x heartbeat time long).
ptr<raft_params> params = ctx_->get_params();
int expiry = params->heart_beat_interval_ *
raft_server::raft_limits_.response_limit_;
int expiry = params->heart_beat_interval_ * raft_server::raft_limits_.response_limit_;

// Check the number of not responding peers.
// Check not responding peers.
for (auto& entry: peers_) {
ptr<peer> p = entry.second;
const auto& peer_ptr = entry.second;

if (!is_regular_member(p)) continue;
if (!is_regular_member(peer_ptr)) continue;

int32 resp_elapsed_ms = (int32)(p->get_resp_timer_us() / 1000);
if ( resp_elapsed_ms > expiry ) {
num_not_resp_nodes++;
const auto resp_elapsed_ms =
static_cast<int32>(peer_ptr->get_resp_timer_us() / 1000);
if (resp_elapsed_ms > expiry) {
callback(peer_ptr);
}
}
return num_not_resp_nodes;
}

size_t raft_server::get_num_stale_peers() {
Expand Down Expand Up @@ -804,6 +820,14 @@ void raft_server::handle_peer_resp(ptr<resp_msg>& resp, ptr<rpc_exception>& err)
} else if (rpc_errs == raft_server::raft_limits_.warning_limit_) {
p_wn("too verbose RPC error on peer (%d), "
"will suppress it from now", peer_id);
if (!pp || !pp->is_lost()) {
if (pp) {
pp->set_lost();
}
cb_func::Param param(id_, leader_, peer_id);
const auto rc = ctx_->cb_func_.call(cb_func::FollowerLost, &param);
assert(rc == cb_func::ReturnCode::Ok);
}
}

if (pp && pp->is_leave_flag_set()) {
Expand Down Expand Up @@ -840,6 +864,7 @@ void raft_server::handle_peer_resp(ptr<resp_msg>& resp, ptr<rpc_exception>& err)
p_wn("recovered from RPC failure from peer %d, %d errors",
resp->get_src(), rpc_errs);
}
pp->set_recovered();
pp->reset_rpc_errs();
pp->reset_resp_timer();
}
Expand Down Expand Up @@ -1019,6 +1044,7 @@ void raft_server::become_leader() {

pp->set_next_log_idx(log_store_->next_slot());
enable_hb_for_peer(*pp);
pp->set_recovered();
}

// If there are uncommitted logs, search if conf log exists.
Expand Down Expand Up @@ -1085,7 +1111,7 @@ bool raft_server::check_leadership_validity() {
int32 num_voting_members = get_num_voting_members();

int leadership_expiry = get_leadership_expiry();
int32 nr_peers = (int32)get_not_responding_peers();
int32 nr_peers = (int32)get_not_responding_peers_count();
if (leadership_expiry < 0) {
// Negative expiry: leadership will never expire.
nr_peers = 0;
Expand All @@ -1102,6 +1128,18 @@ bool raft_server::check_leadership_validity() {
get_leadership_expiry(),
min_quorum_size);

const auto nr_peers_list = get_not_responding_peers();
assert(nr_peers_list.size() == static_cast<std::size_t>(nr_peers));
for (auto& peer : nr_peers_list) {
if (peer->is_lost()) {
continue;
}
peer->set_lost();
cb_func::Param param(id_, leader_, peer->get_id());
const auto rc = ctx_->cb_func_.call(cb_func::FollowerLost, &param);
assert(rc == cb_func::ReturnCode::Ok);
}

// NOTE:
// For a cluster where the number of members is the same
// as the size of quorum, we should not expire leadership,
Expand Down

0 comments on commit 0e28077

Please sign in to comment.