Skip to content

Commit

Permalink
libp2p dial order (#2319)
Browse files Browse the repository at this point in the history
Signed-off-by: turuslan <[email protected]>
Co-authored-by: Igor Egorov <[email protected]>
  • Loading branch information
turuslan and igor-egorov authored Dec 24, 2024
1 parent 39f55a2 commit 78779b9
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 51 deletions.
1 change: 1 addition & 0 deletions .thread-sanitizer-ignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
race:soralog::Logger::push
deadlock:boost::di::v1_1_0::wrappers::shared
4 changes: 2 additions & 2 deletions cmake/Hunter/config.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ hunter_config(

hunter_config(
libp2p
URL https://github.com/libp2p/cpp-libp2p/archive/b183f881917e69c84f43099efcbbde894625376e.zip
SHA1 15ad9926f7b9dad339826ad30616e9ec415e5cb8
URL https://github.com/libp2p/cpp-libp2p/archive/c3e6cce18335c989c9bbf3485885630a6ba463e4.zip
SHA1 32698ef4c3d373a39f87e7acb60eb7dc39399653
)

hunter_config(
Expand Down
3 changes: 3 additions & 0 deletions cmake/functions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ function(addtest test_name)
LIBRARY_OUTPUT_PATH ${CMAKE_BINARY_DIR}/test_lib
)
disable_clang_tidy(${test_name})
if(KAGOME_CTEST_ENV)
set_tests_properties(${test_name} PROPERTIES ENVIRONMENT "${KAGOME_CTEST_ENV}")
endif()
endfunction()

function(addtest_part test_name)
Expand Down
2 changes: 1 addition & 1 deletion cmake/toolchain/flags/sanitize_thread.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ include(${CMAKE_CURRENT_LIST_DIR}/../../add_cache_flag.cmake)

set(TSAN_IGNORELIST "${CMAKE_CURRENT_LIST_DIR}/../../../.thread-sanitizer-ignore")

set(ENV{TSAN_OPTIONS} "suppressions=${TSAN_IGNORELIST}")
list(APPEND KAGOME_CTEST_ENV "TSAN_OPTIONS=suppressions=${TSAN_IGNORELIST}")

set(FLAGS
-fsanitize=thread
Expand Down
9 changes: 7 additions & 2 deletions core/application/app_configuration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ namespace kagome::application {
*/
virtual uint32_t luckyPeers() const = 0;

/**
* @return maximum number of peer connections
*/
virtual uint32_t maxPeers() const = 0;

/**
* @return multiaddresses of bootstrat nodes
*/
Expand Down Expand Up @@ -213,8 +218,8 @@ namespace kagome::application {
* List of telemetry endpoints specified via CLI argument or config file
* @return a vector of parsed telemetry endpoints
*/
virtual const std::vector<telemetry::TelemetryEndpoint> &
telemetryEndpoints() const = 0;
virtual const std::vector<telemetry::TelemetryEndpoint>
&telemetryEndpoints() const = 0;

/**
* @return enum constant of the chosen sync method
Expand Down
14 changes: 10 additions & 4 deletions core/application/impl/app_configuration_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ namespace {
const uint32_t def_in_peers = 75;
const uint32_t def_in_peers_light = 100;
const auto def_lucky_peers = 4;
const auto def_max_peers = 1000;
const uint32_t def_random_walk_interval = 15;
const auto def_full_sync = "Full";
const auto def_wasm_execution = "Interpreted";
Expand Down Expand Up @@ -180,7 +181,7 @@ namespace {
#if KAGOME_WASM_COMPILER_WASM_EDGE == 1
"WasmEdge",
#endif
"Binaryen"
"Binaryen"
};

static const std::string interpreters_str =
Expand Down Expand Up @@ -282,6 +283,7 @@ namespace kagome::application {
in_peers_(def_in_peers),
in_peers_light_(def_in_peers_light),
lucky_peers_(def_lucky_peers),
max_peers_(def_max_peers),
dev_mode_(def_dev_mode),
node_name_(randomNodeName()),
node_version_(buildVersion()),
Expand Down Expand Up @@ -495,6 +497,7 @@ namespace kagome::application {
load_u32(val, "in-peers", in_peers_);
load_u32(val, "in-peers-light", in_peers_light_);
load_u32(val, "lucky-peers", lucky_peers_);
load_u32(val, "max-peers", max_peers_);
load_telemetry_uris(val, "telemetry-endpoints", telemetry_endpoints_);
load_u32(val, "random-walk-interval", random_walk_interval_);
}
Expand Down Expand Up @@ -827,7 +830,8 @@ namespace kagome::application {
("out-peers", po::value<uint32_t>()->default_value(def_out_peers), "number of outgoing connections we're trying to maintain")
("in-peers", po::value<uint32_t>()->default_value(def_in_peers), "maximum number of inbound full nodes peers")
("in-peers-light", po::value<uint32_t>()->default_value(def_in_peers_light), "maximum number of inbound light nodes peers")
("lucky-peers", po::value<int32_t>()->default_value(def_lucky_peers), "number of \"lucky\" peers (peers that are being gossiped to). -1 for broadcast." )
("lucky-peers", po::value<uint32_t>()->default_value(def_lucky_peers), "number of \"lucky\" peers (peers that are being gossiped to). -1 for full broadcast." )
("max-peers", po::value<uint32_t>()->default_value(def_max_peers), "maximum number of peer connections" )
("max-blocks-in-response", po::value<uint32_t>(), "max block per response while syncing")
("name", po::value<std::string>(), "the human-readable name for this node")
("no-telemetry", po::bool_switch(), "Disables telemetry broadcasting")
Expand Down Expand Up @@ -912,8 +916,8 @@ namespace kagome::application {
}

if (vm.count("help") > 0) {
std::cout
<< "Available subcommands: storage-explorer db-editor benchmark key\n";
std::cout << "Available subcommands: storage-explorer db-editor "
"benchmark key\n";
std::cout << desc << '\n';
return false;
}
Expand Down Expand Up @@ -1329,6 +1333,8 @@ namespace kagome::application {
find_argument<int32_t>(
vm, "lucky-peers", [&](int32_t val) { lucky_peers_ = val; });

max_peers_ = vm.at("max-peers").as<uint32_t>();

find_argument<uint32_t>(vm, "ws-max-connections", [&](uint32_t val) {
max_ws_connections_ = val;
});
Expand Down
4 changes: 4 additions & 0 deletions core/application/impl/app_configuration_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ namespace kagome::application {
uint32_t luckyPeers() const override {
return lucky_peers_;
}
uint32_t maxPeers() const override {
return max_peers_;
}
const boost::asio::ip::tcp::endpoint &rpcEndpoint() const override {
return rpc_endpoint_;
}
Expand Down Expand Up @@ -348,6 +351,7 @@ namespace kagome::application {
uint32_t in_peers_;
uint32_t in_peers_light_;
uint32_t lucky_peers_;
uint32_t max_peers_;
network::PeeringConfig peering_config_;
bool dev_mode_;
std::string node_name_;
Expand Down
79 changes: 44 additions & 35 deletions core/network/impl/peer_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ OUTCOME_CPP_DEFINE_CATEGORY(kagome::network, PeerManagerImpl::Error, e) {
}

namespace kagome::network {
constexpr auto kLibp2pCollectGarbage = std::chrono::seconds{30};

PeerManagerImpl::PeerManagerImpl(
std::shared_ptr<application::AppStateManager> app_state_manager,
libp2p::Host &host,
Expand Down Expand Up @@ -276,8 +278,6 @@ namespace kagome::network {
void PeerManagerImpl::align() {
SL_TRACE(log_, "Try to align peers number");

const auto hard_limit = app_config_.inPeers() + app_config_.inPeersLight()
+ app_config_.outPeers();
const auto peer_ttl = app_config_.peeringConfig().peerTtl;

align_timer_.reset();
Expand Down Expand Up @@ -332,7 +332,7 @@ namespace kagome::network {
});

for (; !peers_list.empty()
&& (peers_list.size() > hard_limit
&& (peers_list.size() > app_config_.maxPeers()
|| peers_list.back().first
== std::numeric_limits<PriorityType>::min());
peers_list.pop_back()) {
Expand Down Expand Up @@ -426,42 +426,38 @@ namespace kagome::network {
SL_DEBUG(log_, " address: {}", addr.getStringAddress());
}

host_.connect(
peer_info,
[wp{weak_from_this()}, peer_id](auto res) mutable {
auto self = wp.lock();
if (not self) {
return;
}
host_.connect(peer_info, [wp{weak_from_this()}, peer_id](auto res) mutable {
auto self = wp.lock();
if (not self) {
return;
}

if (not res.has_value()) {
SL_DEBUG(self->log_,
"Connecting to peer {} is failed: {}",
peer_id,
res.error());
self->connecting_peers_.erase(peer_id);
return;
}
if (not res.has_value()) {
SL_DEBUG(self->log_,
"Connecting to peer {} is failed: {}",
peer_id,
res.error());
self->connecting_peers_.erase(peer_id);
return;
}

auto &connection = res.value();
auto remote_peer_id_res = connection->remotePeer();
if (not remote_peer_id_res.has_value()) {
SL_DEBUG(
self->log_,
"Connected, but not identified yet (expecting peer_id={:l})",
peer_id);
self->connecting_peers_.erase(peer_id);
return;
}
auto &connection = res.value();
auto remote_peer_id_res = connection->remotePeer();
if (not remote_peer_id_res.has_value()) {
SL_DEBUG(self->log_,
"Connected, but not identified yet (expecting peer_id={:l})",
peer_id);
self->connecting_peers_.erase(peer_id);
return;
}

auto &remote_peer_id = remote_peer_id_res.value();
if (remote_peer_id == peer_id) {
SL_DEBUG(self->log_, "Connected to peer {}", peer_id);
auto &remote_peer_id = remote_peer_id_res.value();
if (remote_peer_id == peer_id) {
SL_DEBUG(self->log_, "Connected to peer {}", peer_id);

self->processFullyConnectedPeer(peer_id);
}
},
kTimeoutForConnecting);
self->processFullyConnectedPeer(peer_id);
}
});
}

void PeerManagerImpl::disconnectFromPeer(const PeerId &peer_id) {
Expand Down Expand Up @@ -759,4 +755,17 @@ namespace kagome::network {
}
return std::nullopt;
}

void PeerManagerImpl::collectGarbage() {
host_.getNetwork().getConnectionManager().collectGarbage();
host_.getPeerRepository().getAddressRepository().collectGarbage();
host_.getPeerRepository().getProtocolRepository().collectGarbage();

scheduler_->schedule(
[WEAK_SELF] {
WEAK_LOCK(self);
self->collectGarbage();
},
kLibp2pCollectGarbage);
}
} // namespace kagome::network
4 changes: 2 additions & 2 deletions core/network/impl/peer_manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ namespace kagome::network {
class PeerManagerImpl : public PeerManager,
public std::enable_shared_from_this<PeerManagerImpl> {
public:
static constexpr std::chrono::seconds kTimeoutForConnecting{15};

enum class Error { UNDECLARED_COLLATOR = 1 };

PeerManagerImpl(
Expand Down Expand Up @@ -168,6 +166,8 @@ namespace kagome::network {
using IsLight = Tagged<bool, struct IsLightTag>;
size_t countPeers(PeerType in_out, IsLight in_light = false) const;

void collectGarbage();

log::Logger log_;

libp2p::Host &host_;
Expand Down
3 changes: 1 addition & 2 deletions core/network/impl/protocols/request_response_protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,7 @@ namespace kagome::network {
self->protocolName(),
peer_id);
cb(std::move(stream));
},
timeout_);
});
}

template <typename M>
Expand Down
6 changes: 3 additions & 3 deletions test/core/network/rpc_libp2p_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ TEST_F(RpcLibp2pTest, ReadWithoutResponse) {
*/
TEST_F(RpcLibp2pTest, WriteWithResponse) {
EXPECT_CALL(host_,
newStream(peer_info_.id, libp2p::StreamProtocols{protocol_}, _))
newStream(peer_info_, libp2p::StreamProtocols{protocol_}, _))
.WillOnce(
testing::InvokeArgument<2>(StreamAndProtocol{stream_, protocol_}));

Expand Down Expand Up @@ -165,7 +165,7 @@ TEST_F(RpcLibp2pTest, WriteWithResponse) {
*/
TEST_F(RpcLibp2pTest, WriteWithResponseErroredResponse) {
EXPECT_CALL(host_,
newStream(peer_info_.id, libp2p::StreamProtocols{protocol_}, _))
newStream(peer_info_, libp2p::StreamProtocols{protocol_}, _))
.WillOnce(
testing::InvokeArgument<2>(StreamAndProtocol{stream_, protocol_}));

Expand Down Expand Up @@ -195,7 +195,7 @@ TEST_F(RpcLibp2pTest, WriteWithResponseErroredResponse) {
*/
TEST_F(RpcLibp2pTest, WriteWithoutResponse) {
EXPECT_CALL(host_,
newStream(peer_info_.id, libp2p::StreamProtocols{protocol_}, _))
newStream(peer_info_, libp2p::StreamProtocols{protocol_}, _))
.WillOnce(
testing::InvokeArgument<2>(StreamAndProtocol{stream_, protocol_}));

Expand Down
2 changes: 2 additions & 0 deletions test/mock/core/application/app_configuration_mock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ namespace kagome::application {

MOCK_METHOD(uint32_t, luckyPeers, (), (const, override));

MOCK_METHOD(uint32_t, maxPeers, (), (const, override));

MOCK_METHOD(bool, isTelemetryEnabled, (), (const, override));

MOCK_METHOD(std::optional<size_t>,
Expand Down

0 comments on commit 78779b9

Please sign in to comment.