diff --git a/core/application/app_configuration.hpp b/core/application/app_configuration.hpp index 6a5807553c..ec81119f58 100644 --- a/core/application/app_configuration.hpp +++ b/core/application/app_configuration.hpp @@ -325,6 +325,8 @@ namespace kagome::application { const = 0; virtual std::optional precompileWasm() const = 0; + + virtual uint32_t maxParallelDownloads() const = 0; }; } // namespace kagome::application diff --git a/core/application/impl/app_configuration_impl.cpp b/core/application/impl/app_configuration_impl.cpp index 7e34615377..6fd5eed680 100644 --- a/core/application/impl/app_configuration_impl.cpp +++ b/core/application/impl/app_configuration_impl.cpp @@ -115,6 +115,7 @@ namespace { #endif const uint32_t def_db_cache_size = 1024; const uint32_t def_parachain_runtime_instance_cache_size = 100; + const uint32_t def_max_parallel_downloads = 5; /** * Generate once at run random node name if form of UUID @@ -176,12 +177,11 @@ namespace { static constexpr std::array - interpreters { + interpreters{ #if KAGOME_WASM_COMPILER_WASM_EDGE == 1 - "WasmEdge", + "WasmEdge", #endif - "Binaryen" - }; + "Binaryen"}; static const std::string interpreters_str = fmt::format("[{}]", fmt::join(interpreters, ", ")); @@ -841,6 +841,9 @@ namespace kagome::application { ("rpc-methods", po::value(), R"("auto" (default), "unsafe", "safe")") ("no-mdns", po::bool_switch(), "(unused, zombienet stub)") ("prometheus-external", po::bool_switch(), "alias for \"--prometheus-host 0.0.0.0\"") + ("max-parallel-downloads", po::value()->default_value(def_max_parallel_downloads), + "Maximum number of peers from which to ask for the same blocks in parallel." + "This allows downloading announced blocks from multiple peers. Decrease to save traffic and risk increased latency.") ; po::options_description development_desc("Additional options"); @@ -912,8 +915,8 @@ namespace kagome::application { } if (vm.count("help") > 0) { - std::cout - << "Available subcommands: storage-explorer db-editor benchmark key\n"; + std::cout << "Available subcommands: storage-explorer db-editor " + "benchmark key\n"; std::cout << desc << '\n'; return false; } @@ -1600,6 +1603,9 @@ namespace kagome::application { runtime_exec_method_ = RuntimeExecutionMethod::Compile; } + max_parallel_downloads_ = + find_argument(vm, "max-parallel-downloads") + .value_or(def_max_parallel_downloads); // if something wrong with config print help message if (not validate_config()) { std::cout << desc << '\n'; diff --git a/core/application/impl/app_configuration_impl.hpp b/core/application/impl/app_configuration_impl.hpp index 2421b9f874..f676485dd9 100644 --- a/core/application/impl/app_configuration_impl.hpp +++ b/core/application/impl/app_configuration_impl.hpp @@ -239,6 +239,10 @@ namespace kagome::application { return precompile_wasm_; } + uint32_t maxParallelDownloads() const override { + return max_parallel_downloads_; + } + private: void parse_general_segment(const rapidjson::Value &val); void parse_blockchain_segment(const rapidjson::Value &val); @@ -382,6 +386,7 @@ namespace kagome::application { std::max(std::thread::hardware_concurrency(), 1)}; bool disable_secure_mode_{false}; std::optional precompile_wasm_; + uint32_t max_parallel_downloads_; }; } // namespace kagome::application diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index a2f9292b37..1857af6fd6 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -86,6 +86,8 @@ namespace { namespace kagome::network { + static constexpr auto default_max_peers_for_block_request = 5; + SynchronizerImpl::SynchronizerImpl( const application::AppConfiguration &app_config, application::AppStateManager &app_state_manager, @@ -122,7 +124,8 @@ namespace kagome::network { chain_sub_engine_(std::move(chain_sub_engine)), main_pool_handler_{ poolHandlerReadyMake(app_state_manager, main_thread_pool)}, - block_storage_{std::move(block_storage)} { + block_storage_{std::move(block_storage)}, + best_block_number_{} { BOOST_ASSERT(block_tree_); BOOST_ASSERT(block_executor_); BOOST_ASSERT(trie_node_db_); @@ -137,6 +140,7 @@ namespace kagome::network { BOOST_ASSERT(block_storage_); sync_method_ = app_config.syncMethod(); + max_parallel_downloads_ = app_config.maxParallelDownloads(); // Register metrics metrics_registry_->registerGaugeFamily( @@ -352,6 +356,8 @@ namespace kagome::network { known_blocks_.find(header.parent_hash) != known_blocks_.end() or block_tree_->has(header.parent_hash); + best_block_number_ = std::max(best_block_number_, header.number); + if (parent_is_known) { loadBlocks(peer_id, block_info, [wp{weak_from_this()}](auto res) { if (auto self = wp.lock()) { @@ -1192,7 +1198,39 @@ namespace kagome::network { } } }; - + // TODO: remove active_peers filling mechanism, when peer manager + // methods are implemented correctly + std::vector active_peers; + peer_manager_->enumeratePeerState( + [wp{weak_from_this()}, &active_peers, &peer_id]( + const libp2p::peer::PeerId &peer, network::PeerState &state) { + if (auto self = wp.lock()) { + if (self->best_block_number_ <= state.best_block.number + and peer != peer_id) { + active_peers.push_back(peer); + } + } + return true; + }); + std::vector selected_peers; + selected_peers.push_back(peer_id); + static const auto number_of_peers_to_add = + max_parallel_downloads_ ? max_parallel_downloads_ - 1 : 0; + if (const auto active_peers_size = active_peers.size(); + active_peers_size <= number_of_peers_to_add) { + for (const auto &p_id : active_peers) { + selected_peers.push_back(p_id); + } + } else { + std::vector indices(active_peers_size); + std::iota(indices.begin(), indices.end(), 0); + std::random_device rd; + std::mt19937 gen(rd()); + std::ranges::shuffle(indices.begin(), indices.end(), gen); + for (uint32_t i = 0; i < number_of_peers_to_add; ++i) { + selected_peers.push_back(active_peers[indices[i]]); + } + } if (sync_method_ == application::SyncMethod::Full) { auto lower = generations_.begin()->number; auto upper = generations_.rbegin()->number + 1; @@ -1209,7 +1247,10 @@ namespace kagome::network { lower, upper, hint, - [wp{weak_from_this()}, peer_id, handler = std::move(handler)]( + [wp{weak_from_this()}, + peer_id, + handler = std::move(handler), + selected_peers = std::move(selected_peers)]( outcome::result res) { if (auto self = wp.lock()) { if (not res.has_value()) { @@ -1221,22 +1262,16 @@ namespace kagome::network { return; } auto &common_block_info = res.value(); - SL_DEBUG(self->log_, - "Start to load next portion of blocks from {} " - "since block {}", - peer_id, - common_block_info); - self->loadBlocks( - peer_id, common_block_info, std::move(handler)); + for (const auto &p_id : selected_peers) { + self->loadBlocks( + p_id, common_block_info, std::move(handler)); + } } }); } else { - SL_DEBUG(log_, - "Start to load next portion of blocks from {} " - "since block {}", - peer_id, - block_info); - loadBlocks(peer_id, block_info, std::move(handler)); + for (const auto &p_id : selected_peers) { + loadBlocks(p_id, block_info, std::move(handler)); + } } return; } diff --git a/core/network/impl/synchronizer_impl.hpp b/core/network/impl/synchronizer_impl.hpp index 6f932976c4..65664160f0 100644 --- a/core/network/impl/synchronizer_impl.hpp +++ b/core/network/impl/synchronizer_impl.hpp @@ -261,6 +261,9 @@ namespace kagome::network { primitives::events::ChainSubscriptionEnginePtr chain_sub_engine_; std::shared_ptr main_pool_handler_; std::shared_ptr block_storage_; + primitives::BlockNumber best_block_number_; + uint32_t max_parallel_downloads_; + application::SyncMethod sync_method_; diff --git a/test/mock/core/application/app_configuration_mock.hpp b/test/mock/core/application/app_configuration_mock.hpp index 3fa0d2761e..8e67103f16 100644 --- a/test/mock/core/application/app_configuration_mock.hpp +++ b/test/mock/core/application/app_configuration_mock.hpp @@ -197,6 +197,8 @@ namespace kagome::application { precompileWasm, (), (const, override)); + + MOCK_METHOD(uint32_t, maxParallelDownloads, (), (const, override)); }; } // namespace kagome::application