Skip to content

Commit

Permalink
Ledger notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Jan 14, 2025
1 parent 8a2ab30 commit 5ed3aab
Show file tree
Hide file tree
Showing 30 changed files with 360 additions and 179 deletions.
24 changes: 11 additions & 13 deletions nano/core_test/confirming_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <nano/node/block_processor.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/node/election.hpp>
#include <nano/node/ledger_notifications.hpp>
#include <nano/node/make_store.hpp>
#include <nano/node/unchecked_map.hpp>
#include <nano/secure/ledger.hpp>
Expand All @@ -26,17 +27,15 @@ struct confirming_set_context
nano::stats & stats;
nano::ledger & ledger;

nano::unchecked_map unchecked;
nano::block_processor block_processor;
nano::ledger_notifications ledger_notifications;
nano::confirming_set confirming_set;

explicit confirming_set_context (nano::test::ledger_context & ledger_context, nano::node_config node_config = {}) :
logger{ ledger_context.logger () },
stats{ ledger_context.stats () },
ledger{ ledger_context.ledger () },
unchecked{ 0, stats, false },
block_processor{ node_config, ledger, unchecked, stats, logger },
confirming_set{ node_config.confirming_set, ledger, block_processor, stats, logger }
ledger_notifications{ node_config, stats, logger },
confirming_set{ node_config.confirming_set, ledger, ledger_notifications, stats, logger }
{
}
};
Expand Down Expand Up @@ -78,21 +77,20 @@ TEST (confirming_set, process_one)
TEST (confirming_set, process_multiple)
{
nano::test::system system;
auto & node = *system.add_node ();
auto ctx = nano::test::ledger_send_receive ();
nano::confirming_set_config config{};
nano::confirming_set confirming_set{ config, ctx.ledger (), node.block_processor, ctx.stats (), ctx.logger () };
auto ledger_ctx = nano::test::ledger_send_receive ();
confirming_set_context ctx{ ledger_ctx };
nano::confirming_set & confirming_set = ctx.confirming_set;
std::atomic<int> count = 0;
std::mutex mutex;
std::condition_variable condition;
confirming_set.cemented_observers.add ([&] (auto const &) { ++count; condition.notify_all (); });
confirming_set.add (ctx.blocks ()[0]->hash ());
confirming_set.add (ctx.blocks ()[1]->hash ());
confirming_set.add (ledger_ctx.blocks ()[0]->hash ());
confirming_set.add (ledger_ctx.blocks ()[1]->hash ());
nano::test::start_stop_guard guard{ confirming_set };
std::unique_lock lock{ mutex };
ASSERT_TRUE (condition.wait_for (lock, 5s, [&] () { return count == 2; }));
ASSERT_EQ (2, ctx.stats ().count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (3, ctx.ledger ().cemented_count ());
ASSERT_EQ (2, ctx.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (3, ctx.ledger.cemented_count ());
}

TEST (confirmation_callback, observer_callbacks)
Expand Down
2 changes: 0 additions & 2 deletions nano/core_test/toml.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,6 @@ TEST (toml_config, daemon_config_deserialize_defaults)

ASSERT_EQ (conf.node.bounded_backlog.enable, defaults.node.bounded_backlog.enable);
ASSERT_EQ (conf.node.bounded_backlog.batch_size, defaults.node.bounded_backlog.batch_size);
ASSERT_EQ (conf.node.bounded_backlog.max_queued_notifications, defaults.node.bounded_backlog.max_queued_notifications);
ASSERT_EQ (conf.node.bounded_backlog.scan_rate, defaults.node.bounded_backlog.scan_rate);

ASSERT_EQ (conf.node.websocket_config.enabled, defaults.node.websocket_config.enabled);
Expand Down Expand Up @@ -743,7 +742,6 @@ TEST (toml_config, daemon_config_deserialize_no_defaults)

ASSERT_NE (conf.node.bounded_backlog.enable, defaults.node.bounded_backlog.enable);
ASSERT_NE (conf.node.bounded_backlog.batch_size, defaults.node.bounded_backlog.batch_size);
ASSERT_NE (conf.node.bounded_backlog.max_queued_notifications, defaults.node.bounded_backlog.max_queued_notifications);
ASSERT_NE (conf.node.bounded_backlog.scan_rate, defaults.node.bounded_backlog.scan_rate);

ASSERT_NE (conf.node.websocket_config.enabled, defaults.node.websocket_config.enabled);
Expand Down
5 changes: 5 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ enum class type
message,
block,
ledger,
ledger_notifications,
rollback,
network,
vote,
Expand Down Expand Up @@ -577,6 +578,10 @@ enum class detail
tier_2,
tier_3,

// ledger_notifications
notify_processed,
notify_rolled_back,

// confirming_set
notify_cemented,
notify_already_cemented,
Expand Down
7 changes: 2 additions & 5 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::block_processing:
thread_role_name_string = "Blck processing";
break;
case nano::thread_role::name::block_processing_notifications:
thread_role_name_string = "Blck proc notif";
case nano::thread_role::name::ledger_notifications:
thread_role_name_string = "Ledger notif";
break;
case nano::thread_role::name::request_loop:
thread_role_name_string = "Request loop";
Expand Down Expand Up @@ -106,9 +106,6 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::bounded_backlog_scan:
thread_role_name_string = "Bounded b scan";
break;
case nano::thread_role::name::bounded_backlog_notifications:
thread_role_name_string = "Bounded b notif";
break;
case nano::thread_role::name::vote_generator_queue:
thread_role_name_string = "Voting que";
break;
Expand Down
3 changes: 1 addition & 2 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ enum class name
vote_processing,
vote_cache_processing,
block_processing,
block_processing_notifications,
ledger_notifications,
request_loop,
wallet_actions,
bootstrap_initiator,
Expand All @@ -40,7 +40,6 @@ enum class name
backlog_scan,
bounded_backlog,
bounded_backlog_scan,
bounded_backlog_notifications,
vote_generator_queue,
telemetry,
bootstrap,
Expand Down
2 changes: 2 additions & 0 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ add_library(
ipc/ipc_server.cpp
json_handler.hpp
json_handler.cpp
ledger_notifications.hpp
ledger_notifications.cpp
local_block_broadcaster.cpp
local_block_broadcaster.hpp
local_vote_history.cpp
Expand Down
9 changes: 5 additions & 4 deletions nano/node/active_elections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <nano/node/confirmation_solicitor.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/node/election.hpp>
#include <nano/node/ledger_notifications.hpp>
#include <nano/node/node.hpp>
#include <nano/node/online_reps.hpp>
#include <nano/node/repcrawler.hpp>
Expand All @@ -21,11 +22,11 @@

using namespace std::chrono;

nano::active_elections::active_elections (nano::node & node_a, nano::confirming_set & confirming_set_a, nano::block_processor & block_processor_a) :
nano::active_elections::active_elections (nano::node & node_a, nano::ledger_notifications & ledger_notifications_a, nano::confirming_set & confirming_set_a) :
config{ node_a.config.active_elections },
node{ node_a },
ledger_notifications{ ledger_notifications_a },
confirming_set{ confirming_set_a },
block_processor{ block_processor_a },
recently_confirmed{ config.confirmation_cache },
recently_cemented{ config.confirmation_history_size }
{
Expand Down Expand Up @@ -55,7 +56,7 @@ nano::active_elections::active_elections (nano::node & node_a, nano::confirming_
});

// Notify elections about alternative (forked) blocks
block_processor.batch_processed.add ([this] (auto const & batch) {
ledger_notifications.blocks_processed.add ([this] (auto const & batch) {
for (auto const & [result, context] : batch)
{
if (result == nano::block_status::fork)
Expand All @@ -66,7 +67,7 @@ nano::active_elections::active_elections (nano::node & node_a, nano::confirming_
});

// Stop all rolled back active transactions except initial
block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
ledger_notifications.blocks_rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
for (auto const & block : blocks)
{
if (block->qualified_root () != rollback_root)
Expand Down
4 changes: 2 additions & 2 deletions nano/node/active_elections.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class active_elections final
ordered_roots roots;

public:
active_elections (nano::node &, nano::confirming_set &, nano::block_processor &);
active_elections (nano::node &, nano::ledger_notifications &, nano::confirming_set &);
~active_elections ();

void start ();
Expand Down Expand Up @@ -144,8 +144,8 @@ class active_elections final
private: // Dependencies
active_elections_config const & config;
nano::node & node;
nano::ledger_notifications & ledger_notifications;
nano::confirming_set & confirming_set;
nano::block_processor & block_processor;

public:
nano::recently_confirmed_cache recently_confirmed;
Expand Down
87 changes: 38 additions & 49 deletions nano/node/block_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <nano/lib/timer.hpp>
#include <nano/node/active_elections.hpp>
#include <nano/node/block_processor.hpp>
#include <nano/node/ledger_notifications.hpp>
#include <nano/node/local_vote_history.hpp>
#include <nano/node/node.hpp>
#include <nano/node/unchecked_map.hpp>
Expand All @@ -18,14 +19,14 @@
* block_processor
*/

nano::block_processor::block_processor (nano::node_config const & node_config, nano::ledger & ledger_a, nano::unchecked_map & unchecked_a, nano::stats & stats_a, nano::logger & logger_a) :
nano::block_processor::block_processor (nano::node_config const & node_config, nano::ledger & ledger_a, nano::ledger_notifications & ledger_notifications_a, nano::unchecked_map & unchecked_a, nano::stats & stats_a, nano::logger & logger_a) :
config{ node_config.block_processor },
network_params{ node_config.network_params },
ledger{ ledger_a },
ledger_notifications{ ledger_notifications_a },
unchecked{ unchecked_a },
stats{ stats_a },
logger{ logger_a },
workers{ 1, nano::thread_role::name::block_processing_notifications }
logger{ logger_a }
{
queue.max_size_query = [this] (auto const & origin) {
switch (origin.source)
Expand Down Expand Up @@ -65,15 +66,12 @@ nano::block_processor::~block_processor ()
{
// Thread must be stopped before destruction
debug_assert (!thread.joinable ());
debug_assert (!workers.alive ());
}

void nano::block_processor::start ()
{
debug_assert (!thread.joinable ());

workers.start ();

thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::block_processing);
run ();
Expand All @@ -91,7 +89,6 @@ void nano::block_processor::stop ()
{
thread.join ();
}
workers.stop ();
}

// TODO: Remove and replace all checks with calls to size (block_source)
Expand Down Expand Up @@ -175,7 +172,7 @@ bool nano::block_processor::add_impl (nano::block_context ctx, std::shared_ptr<n
return added;
}

void nano::block_processor::rollback_competitor (secure::write_transaction const & transaction, nano::block const & fork_block)
void nano::block_processor::rollback_competitor (secure::write_transaction & transaction, nano::block const & fork_block)
{
auto const hash = fork_block.hash ();
auto const successor_hash = ledger.any.block_successor (transaction, fork_block.qualified_root ());
Expand All @@ -197,10 +194,13 @@ void nano::block_processor::rollback_competitor (secure::write_transaction const
logger.debug (nano::log::type::block_processor, "Blocks rolled back: {}", rollback_list.size ());
}

// Notify observers of the rolled back blocks on a background thread while not holding the ledger write lock
workers.post ([this, rollback_list = std::move (rollback_list), root = fork_block.qualified_root ()] () {
rolled_back.notify (rollback_list, root);
});
if (!rollback_list.empty ())
{
// Notify observers of the rolled back blocks on a background thread while not holding the ledger write lock
ledger_notifications.notify_rolled_back (transaction, std::move (rollback_list), fork_block.qualified_root (), [this] {
stats.inc (nano::stat::type::block_processor, nano::stat::detail::notify_rolled_back);
});
}
}
}

Expand All @@ -210,50 +210,36 @@ void nano::block_processor::run ()
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
if (!queue.empty ())
condition.wait (lock, [this] {
return stopped || !queue.empty ();
});

if (stopped)
{
// It's possible that ledger processing happens faster than the notifications can be processed by other components, cooldown here
while (workers.queued_tasks () >= config.max_queued_notifications)
{
stats.inc (nano::stat::type::block_processor, nano::stat::detail::cooldown);
condition.wait_for (lock, 100ms, [this] { return stopped; });
if (stopped)
{
return;
}
}
return;
}

lock.unlock ();

// It's possible that ledger processing happens faster than the notifications can be processed by other components, cooldown here
ledger_notifications.wait ([this] {
stats.inc (nano::stat::type::block_processor, nano::stat::detail::cooldown);
});

lock.lock ();

if (!queue.empty ())
{
if (log_interval.elapse (15s))
{
logger.info (nano::log::type::block_processor, "{} blocks (+ {} forced) in processing queue",
queue.size (),
queue.size ({ nano::block_source::forced }));
}

auto processed = process_batch (lock);
process_batch (lock);
debug_assert (!lock.owns_lock ());
lock.lock ();

// Queue notifications to be dispatched in the background
workers.post ([this, processed = std::move (processed)] () mutable {
stats.inc (nano::stat::type::block_processor, nano::stat::detail::notify);
// Set results for futures when not holding the lock
for (auto & [result, context] : processed)
{
if (context.callback)
{
context.callback (result);
}
context.set_result (result);
}
batch_processed.notify (processed);
});
}
else
{
condition.wait (lock, [this] {
return stopped || !queue.empty ();
});
}
}
}
Expand Down Expand Up @@ -288,7 +274,7 @@ auto nano::block_processor::next_batch (size_t max_count) -> std::deque<nano::bl
return results;
}

auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock) -> processed_batch_t
void nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock)
{
debug_assert (lock.owns_lock ());
debug_assert (!mutex.try_lock ());
Expand All @@ -307,7 +293,8 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
size_t number_of_blocks_processed = 0;
size_t number_of_forced_processed = 0;

processed_batch_t processed;
std::deque<std::pair<nano::block_status, nano::block_context>> processed;

for (auto & ctx : batch)
{
auto const hash = ctx.block->hash ();
Expand All @@ -332,7 +319,10 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
logger.debug (nano::log::type::block_processor, "Processed {} blocks ({} forced) in {} {}", number_of_blocks_processed, number_of_forced_processed, timer.value ().count (), timer.unit ());
}

return processed;
// Queue notifications to be dispatched in the background
ledger_notifications.notify_processed (transaction, std::move (processed), [this] {
stats.inc (nano::stat::type::block_processor, nano::stat::detail::notify_processed);
});
}

nano::block_status nano::block_processor::process_one (secure::write_transaction const & transaction_a, nano::block_context const & context, bool const forced_a)
Expand Down Expand Up @@ -451,7 +441,6 @@ nano::container_info nano::block_processor::container_info () const
info.put ("blocks", queue.size ());
info.put ("forced", queue.size ({ nano::block_source::forced }));
info.add ("queue", queue.container_info ());
info.add ("workers", workers.container_info ());
return info;
}

Expand Down
Loading

0 comments on commit 5ed3aab

Please sign in to comment.