Skip to content

Commit

Permalink
Maximum downloads are handled correct
Browse files Browse the repository at this point in the history
  • Loading branch information
ErakhtinB committed Dec 31, 2024
1 parent ab41109 commit b5630f4
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 25 deletions.
1 change: 0 additions & 1 deletion core/blockchain/impl/block_tree_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ namespace kagome::blockchain {
std::move(justification_storage_policy),
state_pruner,
main_thread_pool));

// Add non-finalized block to the block tree
for (auto &e : collected) {
const auto &block = e.first;
Expand Down
77 changes: 54 additions & 23 deletions core/network/impl/synchronizer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,31 +330,43 @@ namespace kagome::network {
return false;
}

// Block is already enqueued
if (auto it = known_blocks_.find(block_info.hash);
it != known_blocks_.end()) {
auto &block_in_queue = it->second;
block_in_queue.peers.emplace(peer_id);
return false;
}

// Already enough parallel downloads
{
std::shared_lock lock{load_blocks_mutex_};
if (auto it = load_blocks_.find(block_info); it != load_blocks_.end()) {
if (it->second >= max_parallel_downloads_) {
return false;
}
}
}

std::vector<libp2p::peer::PeerId> selected_peers = {peer_id};
std::vector<libp2p::peer::PeerId> active_peers;
peer_manager_->enumeratePeerState(
[&active_peers, &block_info, &peer_id](const PeerId &p_id,
PeerState &peer_state) {
if (peer_state.best_block >= block_info and p_id != peer_id) {
[&active_peers, &peer_id](const PeerId &p_id, PeerState &) {
if (p_id != peer_id) {
active_peers.push_back(p_id);
}
return true;
});
std::ranges::shuffle(active_peers, random_gen_);
for (auto it = active_peers.begin(); it != active_peers.end(); ++it) {
if (selected_peers.size() >= max_parallel_downloads_) {
break;
}
selected_peers.push_back(*it);
}
// Block is already enqueued
if (auto it = known_blocks_.find(block_info.hash);
it != known_blocks_.end()) {
auto &block_in_queue = it->second;
for (const auto &p_id : selected_peers) {
block_in_queue.peers.emplace(p_id);
}
return false;
static const auto peers_to_add_number =
max_parallel_downloads_ ? max_parallel_downloads_ - 1 : 0;
if (active_peers.size() <= peers_to_add_number) {
selected_peers.insert(
selected_peers.end(), active_peers.begin(), active_peers.end());
} else {
std::ranges::shuffle(active_peers, random_gen_);
selected_peers.insert(selected_peers.end(),
active_peers.begin(),
active_peers.begin() + peers_to_add_number);
}

// Number of provided block header greater currently watched.
Expand Down Expand Up @@ -580,12 +592,20 @@ namespace kagome::network {
return;
}

if (not load_blocks_.emplace(from).second) {
if (handler) {
handler(Error::ALREADY_IN_QUEUE);
{
std::unique_lock lock{load_blocks_mutex_};
if (auto [it, ok] = load_blocks_.emplace(from, 1); not ok) {
auto &requests_number = it->second;
if (requests_number >= max_parallel_downloads_) {
if (handler) {
handler(Error::ALREADY_IN_QUEUE);
}
return;
}
++requests_number;
}
return;
}

load_blocks_max_ = {from.number, now};

auto response_handler =
Expand All @@ -600,7 +620,16 @@ namespace kagome::network {
if (not self) {
return;
}
self->load_blocks_.erase(from);
{
std::unique_lock lock{self->load_blocks_mutex_};
if (auto it = self->load_blocks_.find(from);
it != self->load_blocks_.end()) {
auto &requests_number = it->second;
if (requests_number) {
--requests_number;
}
}
}

// Any error interrupts loading of blocks
if (response_res.has_error()) {
Expand Down Expand Up @@ -782,6 +811,8 @@ namespace kagome::network {
});
self->metric_import_queue_length_->set(
self->known_blocks_.size());
std::unique_lock lock{self->load_blocks_mutex_};
self->load_blocks_.erase(from);
} else {
it->second.peers.emplace(peer_id);
SL_TRACE(self->log_,
Expand Down
3 changes: 2 additions & 1 deletion core/network/impl/synchronizer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,8 @@ namespace kagome::network {

std::atomic_bool asking_blocks_portion_in_progress_ = false;
std::set<libp2p::peer::PeerId> busy_peers_;
std::unordered_set<primitives::BlockInfo> load_blocks_;
std::unordered_map<primitives::BlockInfo, uint32_t> load_blocks_;
std::shared_mutex load_blocks_mutex_;
std::pair<primitives::BlockNumber, std::chrono::milliseconds>
load_blocks_max_{};

Expand Down

0 comments on commit b5630f4

Please sign in to comment.