Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cementing rate limiter #4789

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions nano/core_test/rate_limiting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ TEST (rate, basic)
// Allow time for the bucket to completely refill and do a full burst
std::this_thread::sleep_for (1s);
ASSERT_TRUE (bucket.try_consume (10));
ASSERT_EQ (bucket.largest_burst (), 10);
}

TEST (rate, network)
Expand All @@ -35,9 +34,7 @@ TEST (rate, network)

// Initial burst of 10 mb/s over two calls
ASSERT_TRUE (bucket.try_consume (5));
ASSERT_EQ (bucket.largest_burst (), 5);
ASSERT_TRUE (bucket.try_consume (5));
ASSERT_EQ (bucket.largest_burst (), 10);
ASSERT_FALSE (bucket.try_consume (5));

// After 200 ms, the 5 mb/s fillrate means we have 1 mb available
Expand Down Expand Up @@ -84,13 +81,10 @@ TEST (rate, unlimited)
{
nano::rate::token_bucket bucket (0, 0);
ASSERT_TRUE (bucket.try_consume (5));
ASSERT_EQ (bucket.largest_burst (), 5);
ASSERT_TRUE (bucket.try_consume (static_cast<size_t> (1e9)));
ASSERT_EQ (bucket.largest_burst (), static_cast<size_t> (1e9));

// With unlimited tokens, consuming always succeed
ASSERT_TRUE (bucket.try_consume (static_cast<size_t> (1e9)));
ASSERT_EQ (bucket.largest_burst (), static_cast<size_t> (1e9));
}

TEST (rate, busy_spin)
Expand Down
45 changes: 21 additions & 24 deletions nano/lib/rate_limiting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,56 +13,53 @@ nano::rate::token_bucket::token_bucket (std::size_t max_token_count_a, std::size
reset (max_token_count_a, refill_rate_a);
}

bool nano::rate::token_bucket::try_consume (unsigned tokens_required_a)
bool nano::rate::token_bucket::try_consume (std::size_t tokens_required)
{
debug_assert (tokens_required_a <= 1e9);
debug_assert (tokens_required <= unlimited_rate_sentinel);

refill ();
bool possible = current_size >= tokens_required_a;

bool possible = current_size >= tokens_required;
if (possible)
{
current_size -= tokens_required_a;
}
else if (tokens_required_a == 1e9)
{
current_size = 0;
current_size -= tokens_required;
}

// Keep track of smallest observed bucket size so burst size can be computed (for tests and stats)
smallest_size = std::min (smallest_size, current_size);

return possible || refill_rate == unlimited_rate_sentinel;
}

void nano::rate::token_bucket::refill ()
{
auto now (std::chrono::steady_clock::now ());
auto now = std::chrono::steady_clock::now ();
std::size_t tokens_to_add = static_cast<std::size_t> (std::chrono::duration_cast<std::chrono::nanoseconds> (now - last_refill).count () / 1e9 * refill_rate);
// Only update if there are any tokens to add
// Only update if there are tokens to add
if (tokens_to_add > 0)
{
current_size = std::min (current_size + tokens_to_add, max_token_count);
last_refill = std::chrono::steady_clock::now ();
last_refill = now;
}
}

void nano::rate::token_bucket::reset (std::size_t max_token_count_a, std::size_t refill_rate_a)
{
// A token count of 0 indicates unlimited capacity. We use 1e9 as
// a sentinel, allowing largest burst to still be computed.
if (max_token_count_a == 0 || refill_rate_a == 0)
// A token count of 0 indicates unlimited capacity. We use 1e9 as a sentinel, allowing largest burst to still be computed.
if (max_token_count_a == 0)
{
refill_rate_a = max_token_count_a = unlimited_rate_sentinel;
// Unlimited capacity
max_token_count_a = unlimited_rate_sentinel;
}
max_token_count = smallest_size = current_size = max_token_count_a;
if (refill_rate_a == 0)
{
// Unlimited rate
refill_rate_a = unlimited_rate_sentinel;
}

max_token_count = max_token_count_a;
refill_rate = refill_rate_a;
current_size = max_token_count < unlimited_rate_sentinel ? max_token_count : 0;
last_refill = std::chrono::steady_clock::now ();
}

std::size_t nano::rate::token_bucket::largest_burst () const
{
return max_token_count - smallest_size;
}

std::size_t nano::rate::token_bucket::size () const
{
return current_size;
Expand Down
20 changes: 7 additions & 13 deletions nano/lib/rate_limiting.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ class token_bucket
public:
/**
* Set up a token bucket.
* @param max_token_count Maximum number of tokens in this bucket, which limits bursts.
* @param refill_rate Token refill rate, which limits the long term rate (tokens per seconds)
* @param max_token_count Maximum number of tokens in this bucket, which limits bursts. 0 is unlimited.
* @param refill_rate Token refill rate, which limits the long term rate (tokens per seconds). 0 is unlimited (everything passes).
*/
token_bucket (std::size_t max_token_count, std::size_t refill_rate);

Expand All @@ -36,25 +36,20 @@ class token_bucket
* The default cost is 1 token, but resource intensive operations may request
* more tokens to be available.
*/
bool try_consume (unsigned tokens_required = 1);
bool try_consume (std::size_t tokens_required = 1);

/** Update the max_token_count and/or refill_rate_a parameters */
void reset (std::size_t max_token_count, std::size_t refill_rate);

/** Returns the largest burst observed */
std::size_t largest_burst () const;
/** Returns the current number of tokens in the bucket */
std::size_t size () const;

private:
void refill ();

private:
std::size_t max_token_count;
std::size_t refill_rate;

std::size_t max_token_count{ 0 };
std::size_t refill_rate{ 0 };
std::size_t current_size{ 0 };
/** The minimum observed bucket size, from which the largest burst can be derived */
std::size_t smallest_size{ 0 };
std::chrono::steady_clock::time_point last_refill;

static std::size_t constexpr unlimited_rate_sentinel{ static_cast<std::size_t> (1e9) };
Expand All @@ -70,8 +65,7 @@ class rate_limiter final
rate_limiter (std::size_t limit, double burst_ratio = 1.0);

bool should_pass (std::size_t buffer_size);
void reset (std::size_t limit, double burst_ratio = 1.0);

void reset (std::size_t limit, double burst_ratio);
std::size_t size () const;

private:
Expand Down
1 change: 1 addition & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ enum class detail
sync,
requeued,
evicted,
rate_limited,

// processing queue
queue,
Expand Down
21 changes: 19 additions & 2 deletions nano/node/confirming_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ nano::confirming_set::confirming_set (confirming_set_config const & config_a, na
block_processor{ block_processor_a },
stats{ stats_a },
logger{ logger_a },
limiter{ config.rate_limit, /* unlimited token bucket capacity */ 0 },
workers{ 1, nano::thread_role::name::confirmation_height_notifications }
{
batch_cemented.add ([this] (auto const & cemented) {
Expand Down Expand Up @@ -134,7 +135,9 @@ void nano::confirming_set::run ()
}
else
{
condition.wait (lock, [&] () { return !set.empty () || stopped; });
condition.wait (lock, [&] () {
return !set.empty () || stopped;
});
}
}
}
Expand Down Expand Up @@ -243,11 +246,24 @@ void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)
{
// Confirming this block may implicitly confirm more
stats.add (nano::stat::type::confirming_set, nano::stat::detail::cemented, added.size ());
for (auto & block : added)
for (auto const & block : added)
{
cemented.push_back ({ block, hash, election });
}
cemented_count += added.size ();

// Rate limit cementing
while (!limiter.should_pass (added.size ()))
{
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::rate_limited);
transaction.commit ();
std::this_thread::sleep_for (100ms);
transaction.renew ();
if (stopped)
{
return;
}
}
}
else
{
Expand Down Expand Up @@ -336,6 +352,7 @@ nano::container_info nano::confirming_set::container_info () const
nano::container_info info;
info.put ("set", set);
info.put ("deferred", deferred);
info.put ("limiter", limiter.size ());
info.add ("workers", workers.container_info ());
return info;
}
6 changes: 6 additions & 0 deletions nano/node/confirming_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <nano/lib/numbers.hpp>
#include <nano/lib/numbers_templ.hpp>
#include <nano/lib/observer_set.hpp>
#include <nano/lib/rate_limiting.hpp>
#include <nano/lib/thread_pool.hpp>
#include <nano/node/fwd.hpp>
#include <nano/secure/common.hpp>
Expand Down Expand Up @@ -41,6 +42,9 @@ class confirming_set_config final
size_t max_deferred{ 16 * 1024 };
/** Max age of deferred blocks before they are dropped */
std::chrono::seconds deferred_age_cutoff{ 15min };

/** For bounded backlog testing */
size_t rate_limit{ 0 };
};

/**
Expand Down Expand Up @@ -120,6 +124,8 @@ class confirming_set final
// Blocks that are being cemented in the current batch
std::unordered_set<nano::block_hash> current;

nano::rate_limiter limiter;

std::atomic<bool> stopped{ false };
mutable std::mutex mutex;
std::condition_variable condition;
Expand Down
Loading