From 43a575e2895599df2081a5ee12443bfddb2421aa Mon Sep 17 00:00:00 2001 From: Colin LeMahieu Date: Thu, 9 May 2024 12:56:01 +0100 Subject: [PATCH 1/3] Add ability for tcp_server to pause if its request queue is full. When a message is received from the network and placed into the fair_queue, the queue is checked to see if the message queue is full for this channel. If full, the channel is placed in a paused state where new messages are not received. When the message_processor processes items for a channel, it signals the associated channel to resume if it was in a paused state. --- nano/core_test/network.cpp | 2 +- nano/core_test/request_aggregator.cpp | 14 ++++++------ nano/core_test/socket.cpp | 2 +- nano/lib/logging_enums.hpp | 6 ++++- nano/node/bootstrap/bootstrap_connections.cpp | 2 +- nano/node/fair_queue.hpp | 11 ++++++++++ nano/node/message_processor.cpp | 17 +++++++++++--- nano/node/message_processor.hpp | 4 ++-- nano/node/transport/channel.hpp | 13 ++++++++++- nano/node/transport/tcp.cpp | 22 +++++++++++++++---- nano/node/transport/tcp.hpp | 5 ++++- nano/node/transport/tcp_server.cpp | 3 +-- nano/node/transport/tcp_server.hpp | 1 + 13 files changed, 78 insertions(+), 24 deletions(-) diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index d84ac7da05..d784f93693 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -607,7 +607,7 @@ TEST (tcp_listener, tcp_listener_timeout_node_id_handshake) ASSERT_TRUE (cookie); nano::node_id_handshake::query_payload query{ *cookie }; nano::node_id_handshake node_id_handshake{ nano::dev::network_params.network, query }; - auto channel = std::make_shared (*node0, socket); + auto channel = std::make_shared (*node0, socket, std::weak_ptr{}); socket->async_connect (node0->tcp_listener.endpoint (), [&node_id_handshake, channel] (boost::system::error_code const & ec) { ASSERT_FALSE (ec); channel->send (node_id_handshake, [] (boost::system::error_code const & ec, size_t size_a) { diff --git 
a/nano/core_test/request_aggregator.cpp b/nano/core_test/request_aggregator.cpp index 316ea8f079..1948618a5c 100644 --- a/nano/core_test/request_aggregator.cpp +++ b/nano/core_test/request_aggregator.cpp @@ -37,7 +37,7 @@ TEST (request_aggregator, one) std::vector> request; request.emplace_back (send1->hash (), send1->root ()); auto client = std::make_shared (node); - std::shared_ptr dummy_channel = std::make_shared (node, client); + std::shared_ptr dummy_channel = std::make_shared (node, client, std::weak_ptr{}); node.aggregator.request (request, dummy_channel); ASSERT_TIMELY (3s, node.aggregator.empty ()); // Not yet in the ledger @@ -103,7 +103,7 @@ TEST (request_aggregator, one_update) std::vector> request; request.emplace_back (send2->hash (), send2->root ()); auto client = std::make_shared (node); - std::shared_ptr dummy_channel = std::make_shared (node, client); + std::shared_ptr dummy_channel = std::make_shared (node, client, std::weak_ptr{}); node.aggregator.request (request, dummy_channel); request.clear (); request.emplace_back (receive1->hash (), receive1->root ()); @@ -169,7 +169,7 @@ TEST (request_aggregator, two) request.emplace_back (send2->hash (), send2->root ()); request.emplace_back (receive1->hash (), receive1->root ()); auto client = std::make_shared (node); - std::shared_ptr dummy_channel = std::make_shared (node, client); + std::shared_ptr dummy_channel = std::make_shared (node, client, std::weak_ptr{}); // Process both blocks node.aggregator.request (request, dummy_channel); // One vote should be generated for both blocks @@ -288,7 +288,7 @@ TEST (request_aggregator, split) ASSERT_TIMELY_EQ (5s, max_vbh + 2, node.ledger.cemented_count ()); ASSERT_EQ (max_vbh + 1, request.size ()); auto client = std::make_shared (node); - std::shared_ptr dummy_channel = std::make_shared (node, client); + std::shared_ptr dummy_channel = std::make_shared (node, client, std::weak_ptr{}); node.aggregator.request (request, dummy_channel); // In the ledger but no 
vote generated yet ASSERT_TIMELY_EQ (3s, 2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)); @@ -327,7 +327,7 @@ TEST (request_aggregator, channel_max_queue) std::vector> request; request.emplace_back (send1->hash (), send1->root ()); auto client = std::make_shared (node); - std::shared_ptr dummy_channel = std::make_shared (node, client); + std::shared_ptr dummy_channel = std::make_shared (node, client, std::weak_ptr{}); node.aggregator.request (request, dummy_channel); node.aggregator.request (request, dummy_channel); ASSERT_LT (0, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped)); @@ -356,7 +356,7 @@ TEST (request_aggregator, DISABLED_unique) std::vector> request; request.emplace_back (send1->hash (), send1->root ()); auto client = std::make_shared (node); - std::shared_ptr dummy_channel = std::make_shared (node, client); + std::shared_ptr dummy_channel = std::make_shared (node, client, std::weak_ptr{}); node.aggregator.request (request, dummy_channel); node.aggregator.request (request, dummy_channel); node.aggregator.request (request, dummy_channel); @@ -401,7 +401,7 @@ TEST (request_aggregator, cannot_vote) // Incorrect hash, correct root request.emplace_back (1, send2->root ()); auto client = std::make_shared (node); - std::shared_ptr dummy_channel = std::make_shared (node, client); + std::shared_ptr dummy_channel = std::make_shared (node, client, std::weak_ptr{}); node.aggregator.request (request, dummy_channel); ASSERT_TIMELY (3s, node.aggregator.empty ()); ASSERT_EQ (1, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted)); diff --git a/nano/core_test/socket.cpp b/nano/core_test/socket.cpp index 971887e530..d942999b87 100644 --- a/nano/core_test/socket.cpp +++ b/nano/core_test/socket.cpp @@ -422,7 +422,7 @@ TEST (socket, drop_policy) }); auto client = std::make_shared (*node); - auto channel = std::make_shared (*node, client); + auto 
channel = std::make_shared (*node, client, std::weak_ptr{}); std::atomic completed_writes{ 0 }; diff --git a/nano/lib/logging_enums.hpp b/nano/lib/logging_enums.hpp index f78371b799..44ed767d43 100644 --- a/nano/lib/logging_enums.hpp +++ b/nano/lib/logging_enums.hpp @@ -158,6 +158,10 @@ enum class detail frontier_req, bulk_pull_account, + // message_processor + channel_pause, + channel_resume, + _last // Must be the last enum }; @@ -216,4 +220,4 @@ struct magic_enum::customize::enum_range { static constexpr int min = 0; static constexpr int max = 512; -}; \ No newline at end of file +}; diff --git a/nano/node/bootstrap/bootstrap_connections.cpp b/nano/node/bootstrap/bootstrap_connections.cpp index 36344df9e1..6b6fb3cca1 100644 --- a/nano/node/bootstrap/bootstrap_connections.cpp +++ b/nano/node/bootstrap/bootstrap_connections.cpp @@ -160,7 +160,7 @@ void nano::bootstrap_connections::connect_client (nano::tcp_endpoint const & end { this_l->node.logger.debug (nano::log::type::bootstrap, "Connection established to: {}", nano::util::to_str (endpoint_a)); - auto client (std::make_shared (this_l->node.shared (), std::make_shared (*this_l->node.shared (), socket), socket)); + auto client (std::make_shared (this_l->node.shared (), std::make_shared (*this_l->node.shared (), socket, std::weak_ptr{}), socket)); this_l->pool_connection (client, true, push_front); } else diff --git a/nano/node/fair_queue.hpp b/nano/node/fair_queue.hpp index 1a8561d6f9..de4f5f136b 100644 --- a/nano/node/fair_queue.hpp +++ b/nano/node/fair_queue.hpp @@ -161,6 +161,11 @@ class fair_queue final { return requests.size (); } + + bool full () const + { + return requests.size () == max_size; + } }; public: @@ -192,6 +197,12 @@ class fair_queue final return total_size; }; + bool full (origin_type source) const + { + auto it = queues.find (source); + return it == queues.end () ? 
false : it->second.full (); + } + bool empty () const { return size () == 0; diff --git a/nano/node/message_processor.cpp b/nano/node/message_processor.cpp index 16606f594c..87d79f2b90 100644 --- a/nano/node/message_processor.cpp +++ b/nano/node/message_processor.cpp @@ -76,7 +76,7 @@ void nano::message_processor::stop () threads.clear (); } -bool nano::message_processor::put (std::unique_ptr message, std::shared_ptr const & channel) +void nano::message_processor::put (std::unique_ptr message, std::shared_ptr const & channel) { release_assert (message != nullptr); release_assert (channel != nullptr); @@ -87,6 +87,12 @@ bool nano::message_processor::put (std::unique_ptr message, std:: { nano::lock_guard guard{ mutex }; added = queue.push ({ std::move (message), channel }, { nano::no_value{}, channel }); + if (queue.full ({ nano::no_value{}, channel })) + { + node.logger.trace (nano::log::type::message_processor, nano::log::detail::channel_pause, + nano::log::arg{ "channel", channel }); + channel->pause (); + } } if (added) { @@ -100,7 +106,6 @@ bool nano::message_processor::put (std::unique_ptr message, std:: stats.inc (nano::stat::type::message_processor, nano::stat::detail::overfill); stats.inc (nano::stat::type::message_processor_overfill, to_stat_detail (type)); } - return added; } void nano::message_processor::run () @@ -143,6 +148,12 @@ void nano::message_processor::run_batch (nano::unique_lock & lock) { auto const & [message, channel] = entry; release_assert (message != nullptr); + auto resumed = channel->resume_maybe (); + if (resumed) + { + node.logger.trace (nano::log::type::message_processor, nano::log::detail::channel_resume, + nano::log::arg{ "channel", channel }); + } process (*message, channel); } @@ -313,4 +324,4 @@ nano::error nano::message_processor_config::deserialize (nano::tomlconfig & toml toml.get ("max_queue", max_queue); return toml.get_error (); -} \ No newline at end of file +} diff --git a/nano/node/message_processor.hpp 
b/nano/node/message_processor.hpp index e160b3396a..e575b1d816 100644 --- a/nano/node/message_processor.hpp +++ b/nano/node/message_processor.hpp @@ -33,7 +33,7 @@ class message_processor final void start (); void stop (); - bool put (std::unique_ptr, std::shared_ptr const &); + void put (std::unique_ptr, std::shared_ptr const &); void process (nano::message const &, std::shared_ptr const &); std::unique_ptr collect_container_info (std::string const & name); @@ -57,4 +57,4 @@ class message_processor final nano::condition_variable condition; std::vector threads; }; -} \ No newline at end of file +} diff --git a/nano/node/transport/channel.hpp b/nano/node/transport/channel.hpp index 996884a031..ca3db36c37 100644 --- a/nano/node/transport/channel.hpp +++ b/nano/node/transport/channel.hpp @@ -127,6 +127,16 @@ class channel network_version = network_version_a; } + void pause () + { + paused = true; + } + + virtual bool resume_maybe () + { + return paused.exchange (false); + } + nano::endpoint get_peering_endpoint () const; void set_peering_endpoint (nano::endpoint endpoint); @@ -139,6 +149,7 @@ class channel boost::optional node_id{ boost::none }; std::atomic network_version{ 0 }; std::optional peering_endpoint{}; + std::atomic paused{ false }; protected: nano::node & node; @@ -146,4 +157,4 @@ class channel public: // Logging virtual void operator() (nano::object_stream &) const; }; -} \ No newline at end of file +} diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 5c60ed4b32..116c2329e4 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -11,9 +11,10 @@ * channel_tcp */ -nano::transport::channel_tcp::channel_tcp (nano::node & node_a, std::weak_ptr socket_a) : - channel (node_a), - socket (std::move (socket_a)) +nano::transport::channel_tcp::channel_tcp (nano::node & node_a, std::weak_ptr socket_a, std::weak_ptr server) : + channel{ node_a }, + socket{ std::move (socket_a) }, + server{ server } { } @@ -103,6 +104,19 
@@ void nano::transport::channel_tcp::operator() (nano::object_stream & obs) const obs.write ("socket", socket); } +bool nano::transport::channel_tcp::resume_maybe () +{ + bool result; + if ((result = channel::resume_maybe ())) + { + if (auto server = this->server.lock ()) + { + server->receive_message (); + } + } + return result; +} + /* * tcp_channels */ @@ -219,7 +233,7 @@ std::shared_ptr nano::transport::tcp_channels::cre fmt::streamed (socket->remote_endpoint ()), node_id.to_node_id ()); - auto channel = std::make_shared (node, socket); + auto channel = std::make_shared (node, socket, server); channel->update_endpoints (); channel->set_node_id (node_id); diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index f3965d6746..b3c87fc1b2 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -31,7 +31,7 @@ namespace transport friend class nano::transport::tcp_channels; public: - channel_tcp (nano::node &, std::weak_ptr); + channel_tcp (nano::node &, std::weak_ptr, std::weak_ptr server); ~channel_tcp () override; void update_endpoints (); @@ -90,8 +90,11 @@ namespace transport } } + bool resume_maybe () override; + public: std::weak_ptr socket; + std::weak_ptr server; private: nano::endpoint endpoint; diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index 2287e53a43..25a8f976ac 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -254,8 +254,7 @@ void nano::transport::tcp_server::queue_realtime (std::unique_ptr channel->set_last_packet_received (std::chrono::steady_clock::now ()); - bool added = node->message_processor.put (std::move (message), channel); - // TODO: Throttle if not added + node->message_processor.put (std::move (message), channel); } auto nano::transport::tcp_server::process_handshake (nano::node_id_handshake const & message) -> handshake_status diff --git a/nano/node/transport/tcp_server.hpp b/nano/node/transport/tcp_server.hpp index 
0ce95cdefe..f7c42daf5d 100644 --- a/nano/node/transport/tcp_server.hpp +++ b/nano/node/transport/tcp_server.hpp @@ -134,6 +134,7 @@ class tcp_server final : public std::enable_shared_from_this std::shared_ptr server; }; + friend class channel_tcp; friend class handshake_message_visitor; }; } From b6b5d53d38af61dbc4bbade4b190eced423e210c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 11 May 2024 16:24:43 +0200 Subject: [PATCH 2/3] Simplify system.add_node checks --- nano/test_common/system.cpp | 32 ++++++-------------------------- 1 file changed, 6 insertions(+), 26 deletions(-) diff --git a/nano/test_common/system.cpp b/nano/test_common/system.cpp index ce6950e896..bf0e54e1b9 100644 --- a/nano/test_common/system.cpp +++ b/nano/test_common/system.cpp @@ -135,12 +135,6 @@ std::shared_ptr nano::test::system::add_node (nano::node_config cons auto starting_size_1 = node1->network.size (); auto starting_size_2 = node2->network.size (); - auto starting_realtime_1 = node1->tcp_listener.realtime_count (); - auto starting_realtime_2 = node2->tcp_listener.realtime_count (); - - auto starting_keepalives_1 = node1->stats.count (stat::type::message, stat::detail::keepalive, stat::dir::in); - auto starting_keepalives_2 = node2->stats.count (stat::type::message, stat::detail::keepalive, stat::dir::in); - logger.debug (nano::log::type::system, "Connecting nodes: {} and {}", node1->identifier (), node2->identifier ()); // TCP is the only transport layer available. 
@@ -155,27 +149,13 @@ std::shared_ptr nano::test::system::add_node (nano::node_config cons }); debug_assert (!ec); } - - if (type_a == nano::transport::transport_type::tcp && node_config_a.tcp_incoming_connections_max != 0 && !node_flags_a.disable_tcp_realtime) { - { - // Wait for initial connection finish - auto ec = poll_until_true (5s, [&node1, &node2, starting_realtime_1, starting_realtime_2] () { - auto realtime_1 = node1->tcp_listener.realtime_count (); - auto realtime_2 = node2->tcp_listener.realtime_count (); - return realtime_1 > starting_realtime_1 && realtime_2 > starting_realtime_2; - }); - debug_assert (!ec); - } - { - // Wait for keepalive message exchange - auto ec = poll_until_true (5s, [&node1, &node2, starting_keepalives_1, starting_keepalives_2] () { - auto keepalives_1 = node1->stats.count (stat::type::message, stat::detail::keepalive, stat::dir::in); - auto keepalives_2 = node2->stats.count (stat::type::message, stat::detail::keepalive, stat::dir::in); - return keepalives_1 > starting_keepalives_1 && keepalives_2 > starting_keepalives_2; - }); - debug_assert (!ec); - } + auto ec = poll_until_true (5s, [&node1, &node2] () { + bool result_1 = node1->network.find_node_id (node2->node_id.pub) != nullptr; + bool result_2 = node2->network.find_node_id (node1->node_id.pub) != nullptr; + return result_1 && result_2; + }); + debug_assert (!ec); } } From 864177f08ad806df3278810055be92c0976fa223 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 11 May 2024 16:36:13 +0200 Subject: [PATCH 3/3] Test throttling behavior --- nano/core_test/network.cpp | 28 ++++++++++++++++++++++++++++ nano/lib/stats_enums.hpp | 1 + nano/node/transport/tcp_server.cpp | 2 +- 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index d784f93693..ad33fe98ba 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -1125,3 
+1125,31 @@ TEST (network, purge_dead_channel_remote) }; ASSERT_TIMELY (5s, !channel_exists (node2, channel)); } + +TEST (network, tcp_server_throttle) +{ + nano::test::system system; + + auto config = system.default_config (); + config.message_processor.threads = 0; // Disable message processing + auto & node1 = *system.add_node (config); + // We need a second node to hijack one of the channels + auto & node2 = *system.add_node (config); + + auto channel = node2.network.find_node_id (node1.node_id.pub); + + // Up to (queue size + 1) messages should be received and deserialized, with the last one causing the channel to be throttled + nano::keepalive msg{ nano::dev::network_params.network }; + for (int n = 0; n < config.message_processor.max_queue + 1; ++n) + { + channel->send (msg); + } + ASSERT_TIMELY_EQ (5s, node1.stats.count (nano::stat::type::tcp_server_message, nano::stat::detail::keepalive), config.message_processor.max_queue + 1); + + // No messages should be processed + ASSERT_ALWAYS_EQ (1s, node1.stats.count (nano::stat::type::message, nano::stat::detail::keepalive), 0); + + // Any additional messages should not be eagerly deserialized from the socket + channel->send (msg); + ASSERT_ALWAYS_EQ (1s, node1.stats.count (nano::stat::type::tcp_server_message, nano::stat::detail::keepalive), config.message_processor.max_queue + 1); +} \ No newline at end of file diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 18f34e7dfc..e220db80e0 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -22,6 +22,7 @@ enum class type bootstrap, network, tcp_server, + tcp_server_message, vote, vote_processor, vote_processor_tier, diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index 25a8f976ac..0623eec373 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -161,7 +161,7 @@ auto nano::transport::tcp_server::process_message (std::unique_ptrstats.inc 
(nano::stat::type::tcp_server, to_stat_detail (message->type ()), nano::stat::dir::in); + node->stats.inc (nano::stat::type::tcp_server_message, to_stat_detail (message->type ()), nano::stat::dir::in); debug_assert (is_undefined_connection () || is_realtime_connection () || is_bootstrap_connection ());