Skip to content

Commit

Permalink
Parallel blocks download
Browse files Browse the repository at this point in the history
  • Loading branch information
ErakhtinB committed Dec 26, 2024
1 parent 57eac0f commit 41a51d1
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 22 deletions.
2 changes: 2 additions & 0 deletions core/application/app_configuration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ namespace kagome::application {
const = 0;

virtual std::optional<PrecompileWasmConfig> precompileWasm() const = 0;

virtual uint32_t maxParallelDownloads() const = 0;
};

} // namespace kagome::application
18 changes: 12 additions & 6 deletions core/application/impl/app_configuration_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ namespace {
#endif
const uint32_t def_db_cache_size = 1024;
const uint32_t def_parachain_runtime_instance_cache_size = 100;
const uint32_t def_max_parallel_downloads = 5;

/**
* Generate once at run random node name if form of UUID
Expand Down Expand Up @@ -176,12 +177,11 @@ namespace {

static constexpr std::array<std::string_view,
1 + KAGOME_WASM_COMPILER_WASM_EDGE>
interpreters {
interpreters{
#if KAGOME_WASM_COMPILER_WASM_EDGE == 1
"WasmEdge",
"WasmEdge",
#endif
"Binaryen"
};
"Binaryen"};

static const std::string interpreters_str =
fmt::format("[{}]", fmt::join(interpreters, ", "));
Expand Down Expand Up @@ -841,6 +841,9 @@ namespace kagome::application {
("rpc-methods", po::value<std::string>(), R"("auto" (default), "unsafe", "safe")")
("no-mdns", po::bool_switch(), "(unused, zombienet stub)")
("prometheus-external", po::bool_switch(), "alias for \"--prometheus-host 0.0.0.0\"")
("max-parallel-downloads", po::value<uint32_t>()->default_value(def_max_parallel_downloads),
"Maximum number of peers from which to ask for the same blocks in parallel."
"This allows downloading announced blocks from multiple peers. Decrease to save traffic and risk increased latency.")
;

po::options_description development_desc("Additional options");
Expand Down Expand Up @@ -912,8 +915,8 @@ namespace kagome::application {
}

if (vm.count("help") > 0) {
std::cout
<< "Available subcommands: storage-explorer db-editor benchmark key\n";
std::cout << "Available subcommands: storage-explorer db-editor "
"benchmark key\n";
std::cout << desc << '\n';
return false;
}
Expand Down Expand Up @@ -1600,6 +1603,9 @@ namespace kagome::application {
runtime_exec_method_ = RuntimeExecutionMethod::Compile;
}

max_parallel_downloads_ =
find_argument<uint32_t>(vm, "max-parallel-downloads")
.value_or(def_max_parallel_downloads);
// if something wrong with config print help message
if (not validate_config()) {
std::cout << desc << '\n';
Expand Down
5 changes: 5 additions & 0 deletions core/application/impl/app_configuration_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ namespace kagome::application {
return precompile_wasm_;
}

uint32_t maxParallelDownloads() const override {
return max_parallel_downloads_;
}

private:
void parse_general_segment(const rapidjson::Value &val);
void parse_blockchain_segment(const rapidjson::Value &val);
Expand Down Expand Up @@ -382,6 +386,7 @@ namespace kagome::application {
std::max<size_t>(std::thread::hardware_concurrency(), 1)};
bool disable_secure_mode_{false};
std::optional<PrecompileWasmConfig> precompile_wasm_;
uint32_t max_parallel_downloads_;
};

} // namespace kagome::application
Expand Down
67 changes: 51 additions & 16 deletions core/network/impl/synchronizer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ namespace {

namespace kagome::network {

static constexpr auto default_max_peers_for_block_request = 5;

SynchronizerImpl::SynchronizerImpl(
const application::AppConfiguration &app_config,
application::AppStateManager &app_state_manager,
Expand Down Expand Up @@ -122,7 +124,8 @@ namespace kagome::network {
chain_sub_engine_(std::move(chain_sub_engine)),
main_pool_handler_{
poolHandlerReadyMake(app_state_manager, main_thread_pool)},
block_storage_{std::move(block_storage)} {
block_storage_{std::move(block_storage)},
best_block_number_{} {
BOOST_ASSERT(block_tree_);
BOOST_ASSERT(block_executor_);
BOOST_ASSERT(trie_node_db_);
Expand All @@ -137,6 +140,7 @@ namespace kagome::network {
BOOST_ASSERT(block_storage_);

sync_method_ = app_config.syncMethod();
max_parallel_downloads_ = app_config.maxParallelDownloads();

// Register metrics
metrics_registry_->registerGaugeFamily(
Expand Down Expand Up @@ -352,6 +356,8 @@ namespace kagome::network {
known_blocks_.find(header.parent_hash) != known_blocks_.end()
or block_tree_->has(header.parent_hash);

best_block_number_ = std::max(best_block_number_, header.number);

if (parent_is_known) {
loadBlocks(peer_id, block_info, [wp{weak_from_this()}](auto res) {
if (auto self = wp.lock()) {
Expand Down Expand Up @@ -1192,7 +1198,39 @@ namespace kagome::network {
}
}
};

// TODO: remove active_peers filling mechanism, when peer manager
// methods are implemented correctly
std::vector<libp2p::peer::PeerId> active_peers;
peer_manager_->enumeratePeerState(
[wp{weak_from_this()}, &active_peers, &peer_id](
const libp2p::peer::PeerId &peer, network::PeerState &state) {
if (auto self = wp.lock()) {
if (self->best_block_number_ <= state.best_block.number
and peer != peer_id) {
active_peers.push_back(peer);
}
}
return true;
});
std::vector<libp2p::peer::PeerId> selected_peers;
selected_peers.push_back(peer_id);
static const auto number_of_peers_to_add =
max_parallel_downloads_ ? max_parallel_downloads_ - 1 : 0;
if (const auto active_peers_size = active_peers.size();
active_peers_size <= number_of_peers_to_add) {
for (const auto &p_id : active_peers) {
selected_peers.push_back(p_id);
}
} else {
std::vector<uint32_t> indices(active_peers_size);
std::iota(indices.begin(), indices.end(), 0);
std::random_device rd;
std::mt19937 gen(rd());
std::ranges::shuffle(indices.begin(), indices.end(), gen);
for (uint32_t i = 0; i < number_of_peers_to_add; ++i) {
selected_peers.push_back(active_peers[indices[i]]);
}
}
if (sync_method_ == application::SyncMethod::Full) {
auto lower = generations_.begin()->number;
auto upper = generations_.rbegin()->number + 1;
Expand All @@ -1209,7 +1247,10 @@ namespace kagome::network {
lower,
upper,
hint,
[wp{weak_from_this()}, peer_id, handler = std::move(handler)](
[wp{weak_from_this()},
peer_id,
handler = std::move(handler),
selected_peers = std::move(selected_peers)](
outcome::result<primitives::BlockInfo> res) {
if (auto self = wp.lock()) {
if (not res.has_value()) {
Expand All @@ -1221,22 +1262,16 @@ namespace kagome::network {
return;
}
auto &common_block_info = res.value();
SL_DEBUG(self->log_,
"Start to load next portion of blocks from {} "
"since block {}",
peer_id,
common_block_info);
self->loadBlocks(
peer_id, common_block_info, std::move(handler));
for (const auto &p_id : selected_peers) {
self->loadBlocks(
p_id, common_block_info, std::move(handler));
}
}
});
} else {
SL_DEBUG(log_,
"Start to load next portion of blocks from {} "
"since block {}",
peer_id,
block_info);
loadBlocks(peer_id, block_info, std::move(handler));
for (const auto &p_id : selected_peers) {
loadBlocks(p_id, block_info, std::move(handler));
}
}
return;
}
Expand Down
3 changes: 3 additions & 0 deletions core/network/impl/synchronizer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ namespace kagome::network {
primitives::events::ChainSubscriptionEnginePtr chain_sub_engine_;
std::shared_ptr<PoolHandlerReady> main_pool_handler_;
std::shared_ptr<blockchain::BlockStorage> block_storage_;
primitives::BlockNumber best_block_number_;
uint32_t max_parallel_downloads_;


application::SyncMethod sync_method_;

Expand Down
2 changes: 2 additions & 0 deletions test/mock/core/application/app_configuration_mock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ namespace kagome::application {
precompileWasm,
(),
(const, override));

MOCK_METHOD(uint32_t, maxParallelDownloads, (), (const, override));
};

} // namespace kagome::application

0 comments on commit 41a51d1

Please sign in to comment.