Skip to content

Commit

Permalink
No peer manager is used to work with active peers
Browse files Browse the repository at this point in the history
  • Loading branch information
ErakhtinB committed Dec 26, 2024
1 parent 2aa0cf0 commit 99d72bb
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 68 deletions.
108 changes: 41 additions & 67 deletions core/network/impl/synchronizer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ namespace {

namespace kagome::network {

static constexpr auto default_max_peers_for_block_request = 5;

SynchronizerImpl::SynchronizerImpl(
const application::AppConfiguration &app_config,
application::AppStateManager &app_state_manager,
Expand Down Expand Up @@ -125,8 +123,7 @@ namespace kagome::network {
chain_sub_engine_(std::move(chain_sub_engine)),
main_pool_handler_{
poolHandlerReadyMake(app_state_manager, main_thread_pool)},
block_storage_{std::move(block_storage)},
best_block_number_{} {
block_storage_{std::move(block_storage)} {
BOOST_ASSERT(block_tree_);
BOOST_ASSERT(block_executor_);
BOOST_ASSERT(trie_node_db_);
Expand Down Expand Up @@ -357,8 +354,6 @@ namespace kagome::network {
known_blocks_.find(header.parent_hash) != known_blocks_.end()
or block_tree_->has(header.parent_hash);

best_block_number_ = std::max(best_block_number_, header.number);

if (parent_is_known) {
loadBlocks(peer_id, block_info, [wp{weak_from_this()}](auto res) {
if (auto self = wp.lock()) {
Expand Down Expand Up @@ -1163,19 +1158,32 @@ namespace kagome::network {
continue;
}

for (auto p_it = peers.begin(); p_it != peers.end();) {
auto cp_it = p_it++;

auto &peer_id = *cp_it;

if (busy_peers_.find(peer_id) != busy_peers_.end()) {
SL_TRACE(log_, "Peer {} for block {} is busy", peer_id, block_info);
continue;
std::vector<libp2p::peer::PeerId> active_peers;
for (auto it = peers.begin(); it != peers.end(); ++it) {
const auto &peer_id = *it;
if (busy_peers_.find(peer_id) == busy_peers_.end()) {
active_peers.push_back(peer_id);
}

busy_peers_.insert(peers.extract(cp_it));
SL_TRACE(log_, "Peer {} marked as busy", peer_id);

}
std::random_device rd;
std::mt19937 gen(rd());
std::ranges::shuffle(active_peers, gen);
while (active_peers.size()
> static_cast<size_t>(max_parallel_downloads_)) {
active_peers.pop_back();
}
SL_INFO(log_,
"Active peers number is {} for block {}",
active_peers.size(),
block_info);
for (auto it = active_peers.begin(); it != active_peers.end(); ++it) {
SL_INFO(log_,
"Ask portion of blocks from peer {} for block {}",
*it,
block_info);
auto &peer_id = *it;
busy_peers_.emplace(peer_id);
peers.erase(peer_id);
auto handler = [wp{weak_from_this()}, peer_id](const auto &res) {
if (auto self = wp.lock()) {
if (self->busy_peers_.erase(peer_id) > 0) {
Expand All @@ -1199,43 +1207,6 @@ namespace kagome::network {
}
}
};
// TODO(ErakhtinB): #2326, review peer manager
// remove active_peers filling mechanism over peer states, when peer
// manager methods are implemented correctly
std::vector<libp2p::peer::PeerId> active_peers;
peer_manager_->enumeratePeerState(
[wp{weak_from_this()}, &active_peers, &peer_id](
const libp2p::peer::PeerId &peer, network::PeerState &state) {
if (auto self = wp.lock()) {
if (self->best_block_number_ <= state.best_block.number
and peer != peer_id) {
active_peers.push_back(peer);
}
}
return true;
});
std::vector<libp2p::peer::PeerId> selected_peers;
selected_peers.push_back(peer_id);
static const auto number_of_peers_to_add =
max_parallel_downloads_ ? max_parallel_downloads_ - 1 : 0;
if (const auto active_peers_size = active_peers.size();
active_peers_size <= number_of_peers_to_add) {
for (const auto &p_id : active_peers) {
selected_peers.push_back(p_id);
}
} else {
std::vector<uint32_t> indices;
indices.reserve(active_peers_size);
for (uint32_t i = 0; i < active_peers_size; ++i) {
indices.push_back(i);
}
std::random_device rd;
std::mt19937 gen(rd());
std::ranges::shuffle(indices, gen);
for (uint32_t i = 0; i < number_of_peers_to_add; ++i) {
selected_peers.push_back(active_peers[indices[i]]);
}
}
if (sync_method_ == application::SyncMethod::Full) {
auto lower = generations_.begin()->number;
auto upper = generations_.rbegin()->number + 1;
Expand All @@ -1252,10 +1223,7 @@ namespace kagome::network {
lower,
upper,
hint,
[wp{weak_from_this()},
peer_id,
handler = std::move(handler),
selected_peers = std::move(selected_peers)](
[wp{weak_from_this()}, peer_id, handler = std::move(handler)](
outcome::result<primitives::BlockInfo> res) {
if (auto self = wp.lock()) {
if (not res.has_value()) {
Expand All @@ -1267,18 +1235,24 @@ namespace kagome::network {
return;
}
auto &common_block_info = res.value();
for (const auto &p_id : selected_peers) {
self->loadBlocks(
p_id, common_block_info, std::move(handler));
}
SL_INFO(self->log_,
"Start to load next portion of blocks from {} "
"beginning from {}",
peer_id,
common_block_info);
self->loadBlocks(
peer_id, common_block_info, std::move(handler));
}
});
} else {
for (const auto &p_id : selected_peers) {
loadBlocks(p_id, block_info, std::move(handler));
}
SL_INFO(
log_,
"Start to load next portion of blocks from {} beginning from {}"
"block {}",
peer_id,
block_info);
loadBlocks(peer_id, block_info, std::move(handler));
}
return;
}

SL_TRACE(log_,
Expand Down
1 change: 0 additions & 1 deletion core/network/impl/synchronizer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ namespace kagome::network {
primitives::events::ChainSubscriptionEnginePtr chain_sub_engine_;
std::shared_ptr<PoolHandlerReady> main_pool_handler_;
std::shared_ptr<blockchain::BlockStorage> block_storage_;
primitives::BlockNumber best_block_number_;
uint32_t max_parallel_downloads_;


Expand Down

0 comments on commit 99d72bb

Please sign in to comment.