diff --git a/CMakeLists.txt b/CMakeLists.txt index c5c46326..aacc2126 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -331,6 +331,7 @@ if (CODE_COVERAGE GREATER 0) strfmt_test stat_mgr_test logger_test + new_joiner_test ) # lcov diff --git a/include/libnuraft/peer.hxx b/include/libnuraft/peer.hxx index 62d67739..7473af25 100644 --- a/include/libnuraft/peer.hxx +++ b/include/libnuraft/peer.hxx @@ -99,6 +99,10 @@ public: return config_->is_learner(); } + bool is_new_joiner() const { + return config_->is_new_joiner(); + } + const srv_config& get_config() { return *config_; } diff --git a/include/libnuraft/raft_params.hxx b/include/libnuraft/raft_params.hxx index d11bfaf4..45aacc7a 100644 --- a/include/libnuraft/raft_params.hxx +++ b/include/libnuraft/raft_params.hxx @@ -94,6 +94,7 @@ struct raft_params { , return_method_(blocking) , auto_forwarding_req_timeout_(0) , grace_period_of_lagging_state_machine_(0) + , use_new_joiner_type_(false) , use_bg_thread_for_snapshot_io_(false) , use_full_consensus_among_healthy_members_(false) , parallel_log_appending_(false) @@ -553,6 +554,20 @@ public: */ int32 grace_period_of_lagging_state_machine_; + /** + * If `true`, the new joiner will be added to cluster config as a `new_joiner` + * even before syncing all data. The new joiner will not initiate a vote or + * participate in leader election. + * + * Once the log gap becomes smaller than `log_sync_stop_gap_`, the new joiner + * will be a regular member. + * + * The purpose of this featuer is to preserve the new joiner information + * even after leader re-election, in order to let the new leader continue + * the sync process without calling `add_srv` again. + */ + bool use_new_joiner_type_; + /** * (Experimental) * If `true`, reading snapshot objects will be done by a background thread diff --git a/include/libnuraft/srv_config.hxx b/include/libnuraft/srv_config.hxx index 6dd80ddc..03b33a1f 100644 --- a/include/libnuraft/srv_config.hxx +++ b/include/libnuraft/srv_config.hxx @@ -41,6 +41,7 @@ public: , dc_id_(0) , endpoint_(endpoint) , learner_(false) + , new_joiner_(false) , priority_(INIT_PRIORITY) {} @@ -55,6 +56,7 @@ public: , endpoint_(endpoint) , aux_(aux) , learner_(learner) + , new_joiner_(false) , priority_(priority) {} @@ -75,6 +77,10 @@ public: bool is_learner() const { return learner_; } + bool is_new_joiner() const { return new_joiner_; } + + void set_new_joiner(bool to) { new_joiner_ = to; } + int32 get_priority() const { return priority_; } void set_priority(const int32 new_val) { priority_ = new_val; } @@ -111,6 +117,12 @@ private: */ bool learner_; + /** + * `true` if this node is a new joiner, but not yet fully synced. + * New joiner will not initiate or participate in leader election. + */ + bool new_joiner_; + /** * Priority of this node. * 0 will never be a leader. diff --git a/scripts/test/runtests.sh b/scripts/test/runtests.sh index ddf60d2c..8c497526 100755 --- a/scripts/test/runtests.sh +++ b/scripts/test/runtests.sh @@ -7,5 +7,6 @@ set -e ./tests/strfmt_test --abort-on-failure ./tests/stat_mgr_test --abort-on-failure ./tests/raft_server_test --abort-on-failure +./tests/new_joiner_test --abort-on-failure ./tests/failure_test --abort-on-failure ./tests/asio_service_test --abort-on-failure diff --git a/src/handle_append_entries.cxx b/src/handle_append_entries.cxx index a07d5b88..fb886724 100644 --- a/src/handle_append_entries.cxx +++ b/src/handle_append_entries.cxx @@ -564,7 +564,7 @@ ptr raft_server::handle_append_entries(req_msg& req) // may cause stepping down of this node. ptr cur_config = get_config(); ptr my_config = cur_config->get_server(id_); - if (my_config) { + if (my_config && !my_config->is_new_joiner()) { p_in("catch-up process is done, clearing the flag"); catching_up_ = false; } @@ -1081,6 +1081,45 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) { resp.get_next_idx(), p->get_next_log_idx() ); } + if (!config_changing_ && p->get_config().is_new_joiner()) { + auto params = ctx_->get_params(); + uint64_t log_sync_stop_gap = + params->log_sync_stop_gap_ ? params->log_sync_stop_gap_ : 1; + uint64_t matched_idx = p->get_matched_idx(); + uint64_t next_slot = log_store_->next_slot(); + if (matched_idx + log_sync_stop_gap >= next_slot) { + p_in("peer %d is no longer a new joiner, matched index: %" PRIu64 ", " + "next slot: %" PRIu64 ", sync stop gap: %" PRIu64 + ", set new joiner flag to false", + p->get_id(), matched_idx, next_slot, log_sync_stop_gap); + + // Clone the current cluster config. + ptr cur_conf = get_config(); + ptr enc_conf_buf = cur_conf->serialize(); + ptr new_conf = cluster_config::deserialize(*enc_conf_buf); + new_conf->set_log_idx(log_store_->next_slot()); + + // Remove new joiner flag. + for (auto& ss: new_conf->get_servers()) { + if (ss->get_id() == p->get_id()) { + ss->set_new_joiner(false); + break; + } + } + + ptr new_conf_buf(new_conf->serialize()); + ptr entry( cs_new( state_->get_term(), + new_conf_buf, + log_val_type::conf, + timer_helper::get_timeofday_us() ) ); + store_log_entry(entry); + config_changing_ = true; + uncommitted_config_ = new_conf; + request_append_entries(); + return; + } + } + // NOTE: // If all other followers are not responding, we may not make // below condition true. In that case, we check the timeout of diff --git a/src/handle_commit.cxx b/src/handle_commit.cxx index 1c05ec08..3eff13a7 100644 --- a/src/handle_commit.cxx +++ b/src/handle_commit.cxx @@ -757,10 +757,17 @@ void raft_server::reconfigure(const ptr& new_config) { if (id_ == (*it)->get_id()) { my_priority_ = (*it)->get_priority(); steps_to_down_ = 0; - if (role_ == srv_role::follower && + if (!(*it)->is_new_joiner() && + role_ == srv_role::follower && catching_up_) { - // If this node is newly added, start election timer - // without waiting for the next append_entries message. + // Except for new joiner type, if this server is added + // to the cluster config, that means the sync is done. + // Start election timer without waiting for + // the next append_entries message. + // + // If this server is a new joiner, `catching_up_` flag + // will be cleared when it becomes a regular member, + // that is also notified by a new cluster config. p_in("now this node is the part of cluster, " "catch-up process is done, clearing the flag"); catching_up_ = false; @@ -799,6 +806,7 @@ void raft_server::reconfigure(const ptr& new_config) { str_buf << "add peer " << srv_added->get_id() << ", " << srv_added->get_endpoint() << ", " << (srv_added->is_learner() ? "learner" : "voting member") + << ", " << (srv_added->is_new_joiner() ? "new joiner" : "regular") << std::endl; peers_.insert(std::make_pair(srv_added->get_id(), p)); @@ -940,6 +948,7 @@ void raft_server::reconfigure(const ptr& new_config) { << ", DC ID " << s_conf->get_dc_id() << ", " << s_conf->get_endpoint() << ", " << (s_conf->is_learner() ? "learner" : "voting member") + << ", " << (s_conf->is_new_joiner() ? "new joiner" : "regular member") << ", " << s_conf->get_priority() << std::endl; } diff --git a/src/handle_join_leave.cxx b/src/handle_join_leave.cxx index 2121f532..30e45bab 100644 --- a/src/handle_join_leave.cxx +++ b/src/handle_join_leave.cxx @@ -58,8 +58,13 @@ ptr raft_server::handle_add_srv_req(req_msg& req) { // Before checking duplicate ID, confirm srv_to_leave_ is gone. check_srv_to_leave_timeout(); - ptr srv_conf = - srv_config::deserialize( entries[0]->get_buf() ); + ptr srv_conf = srv_config::deserialize( entries[0]->get_buf() ); + + ptr params = ctx_->get_params(); + if (params->use_new_joiner_type_) { + srv_conf->set_new_joiner(true); + } + if ( peers_.find( srv_conf->get_id() ) != peers_.end() || id_ == srv_conf->get_id() ) { p_wn( "the server to be added has a duplicated " @@ -232,7 +237,8 @@ void raft_server::sync_log_to_new_srv(ulong start_idx) { ptr params = ctx_->get_params(); if ( ( params->log_sync_stop_gap_ > 0 && gap < (ulong)params->log_sync_stop_gap_ ) || - params->log_sync_stop_gap_ == 0 ) { + params->log_sync_stop_gap_ == 0 || + params->use_new_joiner_type_ ) { p_in( "[SYNC LOG] LogSync is done for server %d " "with log gap %" PRIu64 " (%" PRIu64 " - %" PRIu64 ", limit %d), " "now put the server into cluster", diff --git a/src/handle_timeout.cxx b/src/handle_timeout.cxx index 530db127..f9eb611a 100644 --- a/src/handle_timeout.cxx +++ b/src/handle_timeout.cxx @@ -195,6 +195,7 @@ void raft_server::stop_election_timer() { return; } + p_tr("stop election timer"); cancel_task(election_task_); } diff --git a/src/handle_vote.cxx b/src/handle_vote.cxx index e6e23f27..b002cbae 100644 --- a/src/handle_vote.cxx +++ b/src/handle_vote.cxx @@ -254,7 +254,7 @@ void raft_server::request_vote(bool force_vote) { for (peer_itor it = peers_.begin(); it != peers_.end(); ++it) { ptr pp = it->second; if (!is_regular_member(pp)) { - // Do not send voting request to learner. + // Do not send voting request to learner or new joiner. continue; } ptr req = cs_new diff --git a/src/raft_server.cxx b/src/raft_server.cxx index 15dacb4b..3d0d7d0f 100644 --- a/src/raft_server.cxx +++ b/src/raft_server.cxx @@ -393,6 +393,7 @@ void raft_server::apply_and_log_current_params() { "max batch %d, backoff %d, snapshot distance %d, " "enable randomized snapshot creation %s, " "log sync stop gap %d, " + "use new joiner type %s, " "reserved logs %d, client timeout %d, " "auto forwarding %s, API call type %s, " "custom commit quorum size %d, " @@ -411,6 +412,7 @@ void raft_server::apply_and_log_current_params() { params->snapshot_distance_, params->enable_randomized_snapshot_creation_ ? "YES" : "NO", params->log_sync_stop_gap_, + params->use_new_joiner_type_ ? "YES" : "NO", params->reserved_log_items_, params->client_req_timeout_, ( params->auto_forwarding_ ? "ON" : "OFF" ), @@ -546,6 +548,9 @@ bool raft_server::is_regular_member(const ptr& p) { // Skip learner. if (p->is_learner()) return false; + // Skip new joiner. + if (p->is_new_joiner()) return false; + return true; } diff --git a/src/srv_config.cxx b/src/srv_config.cxx index 69c6743d..25dadbde 100644 --- a/src/srv_config.cxx +++ b/src/srv_config.cxx @@ -22,6 +22,9 @@ limitations under the License. namespace nuraft { +static const uint8_t LEARNER_FLAG = 0x1; +static const uint8_t NEW_JOINER_FLAG = 0x2; + ptr srv_config::deserialize(buffer& buf) { buffer_serializer bs(buf); return deserialize(bs); @@ -34,9 +37,17 @@ ptr srv_config::deserialize(buffer_serializer& bs) { const char* aux_char = bs.get_cstr(); std::string endpoint( (endpoint_char) ? endpoint_char : std::string() ); std::string aux( (aux_char) ? aux_char : std::string() ); - byte is_learner = bs.get_u8(); + + uint8_t srv_type = bs.get_u8(); + bool is_learner = (srv_type & LEARNER_FLAG) ? true : false; + bool new_joiner = (srv_type & NEW_JOINER_FLAG) ? true : false; + int32 priority = bs.get_i32(); - return cs_new(id, dc_id, endpoint, aux, is_learner, priority); + + ptr ret = + cs_new(id, dc_id, endpoint, aux, is_learner, priority); + ret->set_new_joiner(new_joiner); + return ret; } ptr srv_config::serialize() const{ @@ -52,7 +63,12 @@ ptr srv_config::serialize() const{ buf->put(dc_id_); buf->put(endpoint_); buf->put(aux_); - buf->put((byte)(learner_?(1):(0))); + + uint8_t srv_type = 0x0; + srv_type |= (learner_ ? LEARNER_FLAG : 0x0); + srv_type |= (new_joiner_ ? NEW_JOINER_FLAG : 0x0); + buf->put((byte)srv_type); + buf->put(priority_); buf->pos(0); return buf; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 39a44a85..59146b03 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -1,4 +1,4 @@ -# === Basic Raft server functionality test === +# === Basic Raft server functionality tests without real network stack === add_executable(raft_server_test unit/raft_server_test.cxx unit/fake_network.cxx @@ -9,6 +9,17 @@ add_dependencies(raft_server_test target_link_libraries(raft_server_test ${BUILD_DIR}/${LIBRARY_OUTPUT_NAME}) +add_executable(new_joiner_test + unit/new_joiner_test.cxx + unit/fake_network.cxx + ${EXAMPLES_SRC}/logger.cc + ${EXAMPLES_SRC}/in_memory_log_store.cxx) +add_dependencies(new_joiner_test + static_lib) +target_link_libraries(new_joiner_test + ${BUILD_DIR}/${LIBRARY_OUTPUT_NAME}) + + # === Failure recovery & conflict resolution test === add_executable(failure_test unit/failure_test.cxx diff --git a/tests/unit/new_joiner_test.cxx b/tests/unit/new_joiner_test.cxx new file mode 100644 index 00000000..2f27c25e --- /dev/null +++ b/tests/unit/new_joiner_test.cxx @@ -0,0 +1,379 @@ +/************************************************************************ +Copyright 2017-present eBay Inc. +Author/Developer(s): Jung-Sang Ahn + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +**************************************************************************/ + +#include "debugging_options.hxx" +#include "fake_network.hxx" +#include "raft_package_fake.hxx" + +#include "raft_params.hxx" +#include "test_common.h" + +#include + +using namespace nuraft; +using namespace raft_functional_common; + +using raft_result = cmd_result< ptr >; + +namespace new_joiner_test { + +int append_logs(size_t num_appends, + RaftPkg& leader, + const std::vector& pkgs) +{ + // Append messages asynchronously. + std::list< ptr< cmd_result< ptr > > > handlers; + for (size_t ii = 0; ii < num_appends; ++ii) { + std::string test_msg = "test" + std::to_string(ii); + ptr msg = buffer::alloc(test_msg.size() + 1); + msg->put(test_msg); + ptr< cmd_result< ptr > > ret = + leader.raftServer->append_entries( {msg} ); + CHK_TRUE( ret->get_accepted() ); + handlers.push_back(ret); + } + + // Packet for pre-commit. + leader.fNet->execReqResp(); + // Packet for commit. + leader.fNet->execReqResp(); + // Wait for bg commit. + CHK_Z( wait_for_sm_exec(pkgs, COMMIT_TIMEOUT_SEC) ); + + // One more time to make sure. + leader.fNet->execReqResp(); + leader.fNet->execReqResp(); + CHK_Z( wait_for_sm_exec(pkgs, COMMIT_TIMEOUT_SEC) ); + + return 0; +} + +int add_new_joiner(RaftPkg& leader, + RaftPkg& new_joiner, + const std::vector& pkgs_old, + const std::vector& pkgs_new) +{ + // Now add a new member. + leader.raftServer->add_srv( *(new_joiner.getTestMgr()->get_srv_config()) ); + + // Join req/resp. + leader.fNet->execReqResp(); + // Add new server, notify existing peers. + // After getting response, it will make configuration commit. + leader.fNet->execReqResp(); + // Notify new commit. + leader.fNet->execReqResp(); + // Wait for bg commit for configuration change. + CHK_Z( wait_for_sm_exec(pkgs_new, COMMIT_TIMEOUT_SEC) ); + + // The original members should see S3 as a new joiner. + for (auto m: pkgs_old) { + CHK_TRUE( m->raftServer->get_srv_config(new_joiner.myId)->is_new_joiner() ); + } + + return 0; +} + +int basic_test() { + reset_log_files(); + ptr f_base = cs_new(); + + std::string s1_addr = "S1"; + std::string s2_addr = "S2"; + std::string s3_addr = "S3"; + + RaftPkg s1(f_base, 1, s1_addr); + RaftPkg s2(f_base, 2, s2_addr); + RaftPkg s3(f_base, 3, s3_addr); + + // Exclude s3 at first. + std::vector pkgs_old = {&s1, &s2}; + std::vector pkgs_new = {&s1, &s2, &s3}; + + CHK_Z( launch_servers( pkgs_new ) ); + CHK_Z( make_group( pkgs_old ) ); + + for (auto& entry: pkgs_new) { + RaftPkg* pp = entry; + raft_params param = pp->raftServer->get_current_params(); + param.return_method_ = raft_params::async_handler; + param.use_new_joiner_type_ = true; + pp->raftServer->update_params(param); + } + + const size_t NUM = 10; + CHK_Z( append_logs(NUM, s1, pkgs_old) ); + CHK_Z( add_new_joiner(s1, s3, pkgs_old, pkgs_new) ); + + // Send snapshot. + s1.fTimer->invoke( timer_task_type::heartbeat_timer ); + for (size_t ii = 0; ii < NUM + 5; ++ii) { + s1.fNet->execReqResp(); + } + // Wait for bg commit for configuration change. + CHK_Z( wait_for_sm_exec(pkgs_new, COMMIT_TIMEOUT_SEC) ); + // After getting response, it will make configuration commit. + s1.fNet->execReqResp(); + // Notify new commit. + s1.fNet->execReqResp(); + + // Now all of them see S3 as a normal member. + CHK_FALSE( s1.raftServer->get_srv_config(3)->is_new_joiner() ); + CHK_FALSE( s2.raftServer->get_srv_config(3)->is_new_joiner() ); + CHK_FALSE( s3.raftServer->get_srv_config(3)->is_new_joiner() ); + + print_stats(pkgs_new); + + s1.raftServer->shutdown(); + s2.raftServer->shutdown(); + s3.raftServer->shutdown(); + + f_base->destroy(); + + return 0; +} + +int initiate_vote_test() { + reset_log_files(); + ptr f_base = cs_new(); + + std::string s1_addr = "S1"; + std::string s2_addr = "S2"; + std::string s3_addr = "S3"; + std::string s4_addr = "S4"; + + RaftPkg s1(f_base, 1, s1_addr); + RaftPkg s2(f_base, 2, s2_addr); + RaftPkg s3(f_base, 3, s3_addr); + RaftPkg s4(f_base, 4, s4_addr); + + // Exclude s3 at first. + std::vector pkgs_quorum = {&s1, &s2}; + std::vector pkgs_old = {&s1, &s2, &s3}; + std::vector pkgs_new = {&s1, &s2, &s3, &s4}; + + CHK_Z( launch_servers( pkgs_new ) ); + CHK_Z( make_group( pkgs_old ) ); + + for (auto& entry: pkgs_new) { + RaftPkg* pp = entry; + raft_params param = pp->raftServer->get_current_params(); + param.return_method_ = raft_params::async_handler; + param.use_new_joiner_type_ = true; + + // Avoid snapshot. + param.reserved_log_items_ = 1000; + param.snapshot_distance_ = 1000; + pp->raftServer->update_params(param); + } + + const size_t NUM = 10; + CHK_Z( append_logs(NUM, s1, pkgs_old) ); + CHK_Z( add_new_joiner(s1, s4, pkgs_old, pkgs_new) ); + + // Append more logs, but replicate to S2 only. + // It should be able to commit even with 2 servers, as S4 is still new joiner. + std::list< ptr< cmd_result< ptr > > > handlers; + for (size_t ii = 0; ii < NUM; ++ii) { + std::string test_msg = "test" + std::to_string(ii); + ptr msg = buffer::alloc(test_msg.size() + 1); + msg->put(test_msg); + ptr< cmd_result< ptr > > ret = + s1.raftServer->append_entries( {msg} ); + CHK_TRUE( ret->get_accepted() ); + handlers.push_back(ret); + } + + s1.fNet->execReqResp(s2_addr); + s1.fNet->execReqResp(s2_addr); + CHK_Z( wait_for_sm_exec(pkgs_quorum, COMMIT_TIMEOUT_SEC) ); + + s1.fNet->execReqResp(s2_addr); + s1.fNet->execReqResp(s2_addr); + CHK_Z( wait_for_sm_exec(pkgs_quorum, COMMIT_TIMEOUT_SEC) ); + + // Even with 2 servers, all logs should be committed. + CHK_EQ( s1.raftServer->get_last_log_idx(), + s1.raftServer->get_committed_log_idx() ); + + // Replicate logs to S4, up to the first config, but not all. + // As a result, S4 will recognize itself as a new joiner, + // but still there is a log gap. + for (auto& entry: pkgs_new) { + RaftPkg* pp = entry; + raft_params param = pp->raftServer->get_current_params(); + param.max_append_size_ = 1; + pp->raftServer->update_params(param); + } + + s1.fTimer->invoke( timer_task_type::heartbeat_timer ); + for (size_t ii = 0; ii < NUM + 5; ++ii) { + s1.fNet->execReqResp(); + } + CHK_Z( wait_for_sm_exec(pkgs_new, COMMIT_TIMEOUT_SEC) ); + + // Initiate election timer for S4, it should do nothing. + s4.fTimer->invoke( timer_task_type::election_timer ); + for (const auto& endpoint: {s1_addr, s2_addr, s3_addr}) { + CHK_Z( s4.fNet->getNumPendingReqs(endpoint) ); + } + CHK_Z( s4.fTimer->getNumPendingTasks() ); + + // Initiate election timer for S3, it should start election, + // but it should not send vote request to S4. + s3.fTimer->invoke( timer_task_type::election_timer ); + CHK_Z( s3.fNet->getNumPendingReqs(s4_addr) ); + + s2.fTimer->invoke( timer_task_type::election_timer ); + CHK_Z( s2.fNet->getNumPendingReqs(s4_addr) ); + + // Leader election should succeed by only vote from S2. + s3.fNet->execReqResp(s2_addr); + s3.fNet->execReqResp(s2_addr); + CHK_Z( wait_for_sm_exec(pkgs_new, COMMIT_TIMEOUT_SEC) ); + + s3.fNet->execReqResp(s2_addr); + s3.fNet->execReqResp(s2_addr); + CHK_Z( wait_for_sm_exec(pkgs_new, COMMIT_TIMEOUT_SEC) ); + + CHK_TRUE( s3.raftServer->is_leader() ); + CHK_EQ(3, s2.raftServer->get_leader() ); + + print_stats(pkgs_new); + + s1.raftServer->shutdown(); + s2.raftServer->shutdown(); + s3.raftServer->shutdown(); + + f_base->destroy(); + + return 0; +} + +int new_joiner_take_over_test() { + reset_log_files(); + ptr f_base = cs_new(); + + std::string s1_addr = "S1"; + std::string s2_addr = "S2"; + std::string s3_addr = "S3"; + std::string s4_addr = "S4"; + + RaftPkg s1(f_base, 1, s1_addr); + RaftPkg s2(f_base, 2, s2_addr); + RaftPkg s3(f_base, 3, s3_addr); + RaftPkg s4(f_base, 4, s4_addr); + + // Exclude s3 at first. + std::vector pkgs_quorum = {&s1, &s2}; + std::vector pkgs_old = {&s1, &s2, &s3}; + std::vector pkgs_new = {&s1, &s2, &s3, &s4}; + + CHK_Z( launch_servers( pkgs_new ) ); + CHK_Z( make_group( pkgs_old ) ); + + for (auto& entry: pkgs_new) { + RaftPkg* pp = entry; + raft_params param = pp->raftServer->get_current_params(); + param.return_method_ = raft_params::async_handler; + param.use_new_joiner_type_ = true; + pp->raftServer->update_params(param); + } + + // Setting it to 8, as 10 makes the last config index 15. + const size_t NUM = 8; + CHK_Z( append_logs(NUM, s1, pkgs_old) ); + CHK_Z( add_new_joiner(s1, s4, pkgs_old, pkgs_new) ); + + // Now S3 takes over the leadership. + s2.fTimer->invoke( timer_task_type::election_timer ); + s3.fTimer->invoke( timer_task_type::election_timer ); + + s3.fNet->execReqResp(); + s3.fNet->execReqResp(); + CHK_Z( wait_for_sm_exec(pkgs_new, COMMIT_TIMEOUT_SEC) ); + + s3.fNet->execReqResp(); + s3.fNet->execReqResp(); + CHK_Z( wait_for_sm_exec(pkgs_new, COMMIT_TIMEOUT_SEC) ); + + // S3 takes over the new joiner and send snapshot. + s3.fTimer->invoke( timer_task_type::heartbeat_timer ); + for (size_t ii = 0; ii < NUM + 5; ++ii) { + s3.fNet->execReqResp(); + } + // Wait for bg commit for configuration change. + CHK_Z( wait_for_sm_exec(pkgs_new, COMMIT_TIMEOUT_SEC) ); + // After getting response, it will make configuration commit. + s3.fNet->execReqResp(); + // Notify new commit. + s3.fNet->execReqResp(); + + // Now all of them see S4 as a normal member. + for (auto& ss: pkgs_new) { + TestSuite::setInfo("server id %d", ss->myId); + CHK_FALSE( ss->raftServer->get_srv_config(4)->is_new_joiner() ); + } + + print_stats(pkgs_new); + + s1.raftServer->shutdown(); + s2.raftServer->shutdown(); + s3.raftServer->shutdown(); + + f_base->destroy(); + + return 0; +} + +} // namespace new_joiner_test; +using namespace new_joiner_test; + +int main(int argc, char** argv) { + TestSuite ts(argc, argv); + + ts.options.printTestMessage = true; + + // Disable reconnection timer for deterministic test. + debugging_options::get_instance().disable_reconn_backoff_ = true; + + ts.doTest( "basic test", + basic_test ); + + ts.doTest( "initiate vote test", + initiate_vote_test ); + + ts.doTest( "new joiner take over test", + new_joiner_take_over_test ); + +#ifdef ENABLE_RAFT_STATS + _msg("raft stats: ENABLED\n"); +#else + _msg("raft stats: DISABLED\n"); +#endif + _msg("num allocs: %zu\n" + "amount of allocs: %zu bytes\n" + "num active buffers: %zu\n" + "amount of active buffers: %zu bytes\n", + raft_server::get_stat_counter("num_buffer_allocs"), + raft_server::get_stat_counter("amount_buffer_allocs"), + raft_server::get_stat_counter("num_active_buffers"), + raft_server::get_stat_counter("amount_active_buffers")); + + return 0; +} +