diff --git a/nano/core_test/confirming_set.cpp b/nano/core_test/confirming_set.cpp index c49b4f44b3..5de8028270 100644 --- a/nano/core_test/confirming_set.cpp +++ b/nano/core_test/confirming_set.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -26,17 +27,15 @@ struct confirming_set_context nano::stats & stats; nano::ledger & ledger; - nano::unchecked_map unchecked; - nano::block_processor block_processor; + nano::ledger_notifications ledger_notifications; nano::confirming_set confirming_set; explicit confirming_set_context (nano::test::ledger_context & ledger_context, nano::node_config node_config = {}) : logger{ ledger_context.logger () }, stats{ ledger_context.stats () }, ledger{ ledger_context.ledger () }, - unchecked{ 0, stats, false }, - block_processor{ node_config, ledger, unchecked, stats, logger }, - confirming_set{ node_config.confirming_set, ledger, block_processor, stats, logger } + ledger_notifications{ node_config, stats, logger }, + confirming_set{ node_config.confirming_set, ledger, ledger_notifications, stats, logger } { } }; @@ -78,21 +77,20 @@ TEST (confirming_set, process_one) TEST (confirming_set, process_multiple) { nano::test::system system; - auto & node = *system.add_node (); - auto ctx = nano::test::ledger_send_receive (); - nano::confirming_set_config config{}; - nano::confirming_set confirming_set{ config, ctx.ledger (), node.block_processor, ctx.stats (), ctx.logger () }; + auto ledger_ctx = nano::test::ledger_send_receive (); + confirming_set_context ctx{ ledger_ctx }; + nano::confirming_set & confirming_set = ctx.confirming_set; std::atomic count = 0; std::mutex mutex; std::condition_variable condition; confirming_set.cemented_observers.add ([&] (auto const &) { ++count; condition.notify_all (); }); - confirming_set.add (ctx.blocks ()[0]->hash ()); - confirming_set.add (ctx.blocks ()[1]->hash ()); + confirming_set.add (ledger_ctx.blocks ()[0]->hash ()); + confirming_set.add (ledger_ctx.blocks ()[1]->hash ()); nano::test::start_stop_guard guard{ confirming_set }; std::unique_lock lock{ mutex }; ASSERT_TRUE (condition.wait_for (lock, 5s, [&] () { return count == 2; })); - ASSERT_EQ (2, ctx.stats ().count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in)); - ASSERT_EQ (3, ctx.ledger ().cemented_count ()); + ASSERT_EQ (2, ctx.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in)); + ASSERT_EQ (3, ctx.ledger.cemented_count ()); } TEST (confirmation_callback, observer_callbacks) diff --git a/nano/core_test/toml.cpp b/nano/core_test/toml.cpp index 29a9916675..148170d314 100644 --- a/nano/core_test/toml.cpp +++ b/nano/core_test/toml.cpp @@ -325,7 +325,6 @@ TEST (toml_config, daemon_config_deserialize_defaults) ASSERT_EQ (conf.node.bounded_backlog.enable, defaults.node.bounded_backlog.enable); ASSERT_EQ (conf.node.bounded_backlog.batch_size, defaults.node.bounded_backlog.batch_size); - ASSERT_EQ (conf.node.bounded_backlog.max_queued_notifications, defaults.node.bounded_backlog.max_queued_notifications); ASSERT_EQ (conf.node.bounded_backlog.scan_rate, defaults.node.bounded_backlog.scan_rate); ASSERT_EQ (conf.node.websocket_config.enabled, defaults.node.websocket_config.enabled); @@ -743,7 +742,6 @@ TEST (toml_config, daemon_config_deserialize_no_defaults) ASSERT_NE (conf.node.bounded_backlog.enable, defaults.node.bounded_backlog.enable); ASSERT_NE (conf.node.bounded_backlog.batch_size, defaults.node.bounded_backlog.batch_size); - ASSERT_NE (conf.node.bounded_backlog.max_queued_notifications, defaults.node.bounded_backlog.max_queued_notifications); ASSERT_NE (conf.node.bounded_backlog.scan_rate, defaults.node.bounded_backlog.scan_rate); ASSERT_NE (conf.node.websocket_config.enabled, defaults.node.websocket_config.enabled); diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 4f7f0fbb35..47a5d7f18f 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -17,6 +17,7 @@ enum class type message, block, ledger, + ledger_notifications, rollback, network, vote, @@ -577,6 +578,10 @@ enum class detail tier_2, tier_3, + // ledger_notifications + notify_processed, + notify_rolled_back, + // confirming_set notify_cemented, notify_already_cemented, diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 8b4c0314a9..0731b4ac9a 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -40,8 +40,8 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::block_processing: thread_role_name_string = "Blck processing"; break; - case nano::thread_role::name::block_processing_notifications: - thread_role_name_string = "Blck proc notif"; + case nano::thread_role::name::ledger_notifications: + thread_role_name_string = "Ledger notif"; break; case nano::thread_role::name::request_loop: thread_role_name_string = "Request loop"; @@ -106,9 +106,6 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::bounded_backlog_scan: thread_role_name_string = "Bounded b scan"; break; - case nano::thread_role::name::bounded_backlog_notifications: - thread_role_name_string = "Bounded b notif"; - break; case nano::thread_role::name::vote_generator_queue: thread_role_name_string = "Voting que"; break; diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index 349c02f1bd..99269536d3 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -18,7 +18,7 @@ enum class name vote_processing, vote_cache_processing, block_processing, - block_processing_notifications, + ledger_notifications, request_loop, wallet_actions, bootstrap_initiator, @@ -40,7 +40,6 @@ enum class name backlog_scan, bounded_backlog, bounded_backlog_scan, - bounded_backlog_notifications, vote_generator_queue, telemetry, bootstrap, diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 01915848ac..67b8aba41f 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -88,6 +88,8 @@ add_library( ipc/ipc_server.cpp json_handler.hpp json_handler.cpp + ledger_notifications.hpp + ledger_notifications.cpp local_block_broadcaster.cpp local_block_broadcaster.hpp local_vote_history.cpp diff --git a/nano/node/active_elections.cpp b/nano/node/active_elections.cpp index 56d3fd1148..5bd27fdc58 100644 --- a/nano/node/active_elections.cpp +++ b/nano/node/active_elections.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -21,11 +22,11 @@ using namespace std::chrono; -nano::active_elections::active_elections (nano::node & node_a, nano::confirming_set & confirming_set_a, nano::block_processor & block_processor_a) : +nano::active_elections::active_elections (nano::node & node_a, nano::ledger_notifications & ledger_notifications_a, nano::confirming_set & confirming_set_a) : config{ node_a.config.active_elections }, node{ node_a }, + ledger_notifications{ ledger_notifications_a }, confirming_set{ confirming_set_a }, - block_processor{ block_processor_a }, recently_confirmed{ config.confirmation_cache }, recently_cemented{ config.confirmation_history_size } { @@ -55,7 +56,7 @@ nano::active_elections::active_elections (nano::node & node_a, nano::confirming_ }); // Notify elections about alternative (forked) blocks - block_processor.batch_processed.add ([this] (auto const & batch) { + ledger_notifications.blocks_processed.add ([this] (auto const & batch) { for (auto const & [result, context] : batch) { if (result == nano::block_status::fork) @@ -66,7 +67,7 @@ nano::active_elections::active_elections (nano::node & node_a, nano::confirming_ }); // Stop all rolled back active transactions except initial - block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) { + ledger_notifications.blocks_rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) { for (auto const & block : blocks) { if (block->qualified_root () != rollback_root) diff --git a/nano/node/active_elections.hpp b/nano/node/active_elections.hpp index 8b831112b9..d7c863f484 100644 --- a/nano/node/active_elections.hpp +++ b/nano/node/active_elections.hpp @@ -90,7 +90,7 @@ class active_elections final ordered_roots roots; public: - active_elections (nano::node &, nano::confirming_set &, nano::block_processor &); + active_elections (nano::node &, nano::ledger_notifications &, nano::confirming_set &); ~active_elections (); void start (); @@ -144,8 +144,8 @@ class active_elections final private: // Dependencies active_elections_config const & config; nano::node & node; + nano::ledger_notifications & ledger_notifications; nano::confirming_set & confirming_set; - nano::block_processor & block_processor; public: nano::recently_confirmed_cache recently_confirmed; diff --git a/nano/node/block_processor.cpp b/nano/node/block_processor.cpp index a9cc5a03d7..67da0d1ecc 100644 --- a/nano/node/block_processor.cpp +++ b/nano/node/block_processor.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -18,14 +19,14 @@ * block_processor */ -nano::block_processor::block_processor (nano::node_config const & node_config, nano::ledger & ledger_a, nano::unchecked_map & unchecked_a, nano::stats & stats_a, nano::logger & logger_a) : +nano::block_processor::block_processor (nano::node_config const & node_config, nano::ledger & ledger_a, nano::ledger_notifications & ledger_notifications_a, nano::unchecked_map & unchecked_a, nano::stats & stats_a, nano::logger & logger_a) : config{ node_config.block_processor }, network_params{ node_config.network_params }, ledger{ ledger_a }, + ledger_notifications{ ledger_notifications_a }, unchecked{ unchecked_a }, stats{ stats_a }, - logger{ logger_a }, - workers{ 1, nano::thread_role::name::block_processing_notifications } + logger{ logger_a } { queue.max_size_query = [this] (auto const & origin) { switch (origin.source) @@ -65,15 +66,12 @@ nano::block_processor::~block_processor () { // Thread must be stopped before destruction debug_assert (!thread.joinable ()); - debug_assert (!workers.alive ()); } void nano::block_processor::start () { debug_assert (!thread.joinable ()); - workers.start (); - thread = std::thread ([this] () { nano::thread_role::set (nano::thread_role::name::block_processing); run (); @@ -91,7 +89,6 @@ void nano::block_processor::stop () { thread.join (); } - workers.stop (); } // TODO: Remove and replace all checks with calls to size (block_source) @@ -175,7 +172,7 @@ bool nano::block_processor::add_impl (nano::block_context ctx, std::shared_ptr lock{ mutex }; while (!stopped) { - if (!queue.empty ()) + condition.wait (lock, [this] { + return stopped || !queue.empty (); + }); + + if (stopped) { - // It's possible that ledger processing happens faster than the notifications can be processed by other components, cooldown here - while (workers.queued_tasks () >= config.max_queued_notifications) - { - stats.inc (nano::stat::type::block_processor, nano::stat::detail::cooldown); - condition.wait_for (lock, 100ms, [this] { return stopped; }); - if (stopped) - { - return; - } - } + return; + } + + lock.unlock (); + // It's possible that ledger processing happens faster than the notifications can be processed by other components, cooldown here + ledger_notifications.wait ([this] { + stats.inc (nano::stat::type::block_processor, nano::stat::detail::cooldown); + }); + + lock.lock (); + + if (!queue.empty ()) + { if (log_interval.elapse (15s)) { logger.info (nano::log::type::block_processor, "{} blocks (+ {} forced) in processing queue", @@ -230,30 +237,9 @@ void nano::block_processor::run () queue.size ({ nano::block_source::forced })); } - auto processed = process_batch (lock); + process_batch (lock); debug_assert (!lock.owns_lock ()); lock.lock (); - - // Queue notifications to be dispatched in the background - workers.post ([this, processed = std::move (processed)] () mutable { - stats.inc (nano::stat::type::block_processor, nano::stat::detail::notify); - // Set results for futures when not holding the lock - for (auto & [result, context] : processed) - { - if (context.callback) - { - context.callback (result); - } - context.set_result (result); - } - batch_processed.notify (processed); - }); - } - else - { - condition.wait (lock, [this] { - return stopped || !queue.empty (); - }); } } } @@ -288,7 +274,7 @@ auto nano::block_processor::next_batch (size_t max_count) -> std::deque & lock) -> processed_batch_t +void nano::block_processor::process_batch (nano::unique_lock & lock) { debug_assert (lock.owns_lock ()); debug_assert (!mutex.try_lock ()); @@ -307,7 +293,8 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock size_t number_of_blocks_processed = 0; size_t number_of_forced_processed = 0; - processed_batch_t processed; + std::deque> processed; + for (auto & ctx : batch) { auto const hash = ctx.block->hash (); @@ -332,7 +319,10 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock logger.debug (nano::log::type::block_processor, "Processed {} blocks ({} forced) in {} {}", number_of_blocks_processed, number_of_forced_processed, timer.value ().count (), timer.unit ()); } - return processed; + // Queue notifications to be dispatched in the background + ledger_notifications.notify_processed (transaction, std::move (processed), [this] { + stats.inc (nano::stat::type::block_processor, nano::stat::detail::notify_processed); + }); } nano::block_status nano::block_processor::process_one (secure::write_transaction const & transaction_a, nano::block_context const & context, bool const forced_a) @@ -451,7 +441,6 @@ nano::container_info nano::block_processor::container_info () const info.put ("blocks", queue.size ()); info.put ("forced", queue.size ({ nano::block_source::forced })); info.add ("queue", queue.container_info ()); - info.add ("workers", workers.container_info ()); return info; } diff --git a/nano/node/block_processor.hpp b/nano/node/block_processor.hpp index 48b448ee85..173830bed5 100644 --- a/nano/node/block_processor.hpp +++ b/nano/node/block_processor.hpp @@ -25,6 +25,8 @@ class block_processor_config final nano::error serialize (nano::tomlconfig & toml) const; public: + size_t batch_size{ 256 }; + // Maximum number of blocks to queue from network peers size_t max_peer_queue{ 128 }; // Maximum number of blocks to queue from system components (local RPC, bootstrap) @@ -35,9 +37,6 @@ class block_processor_config final size_t priority_bootstrap{ 8 }; size_t priority_local{ 16 }; size_t priority_system{ 32 }; - - size_t batch_size{ 256 }; - size_t max_queued_notifications{ 8 }; }; /** @@ -47,7 +46,7 @@ class block_processor_config final class block_processor final { public: - block_processor (nano::node_config const &, nano::ledger &, nano::unchecked_map &, nano::stats &, nano::logger &); + block_processor (nano::node_config const &, nano::ledger &, nano::ledger_notifications &, nano::unchecked_map &, nano::stats &, nano::logger &); ~block_processor (); void start (); @@ -63,20 +62,11 @@ class block_processor final std::atomic flushing{ false }; -public: // Events - // All processed blocks including forks, rejected etc - using processed_batch_t = std::deque>; - using processed_batch_event_t = nano::observer_set; - processed_batch_event_t batch_processed; - - // Rolled back blocks - using rolled_back_event_t = nano::observer_set>, nano::qualified_root>; - rolled_back_event_t rolled_back; - private: // Dependencies block_processor_config const & config; nano::network_params const & network_params; nano::ledger & ledger; + nano::ledger_notifications & ledger_notifications; nano::unchecked_map & unchecked; nano::stats & stats; nano::logger & logger; @@ -84,9 +74,9 @@ class block_processor final private: void run (); // Roll back block in the ledger that conflicts with 'block' - void rollback_competitor (secure::write_transaction const &, nano::block const & block); + void rollback_competitor (secure::write_transaction &, nano::block const & block); nano::block_status process_one (secure::write_transaction const &, nano::block_context const &, bool forced = false); - processed_batch_t process_batch (nano::unique_lock &); + void process_batch (nano::unique_lock &); std::deque next_batch (size_t max_count); nano::block_context next (); bool add_impl (nano::block_context, std::shared_ptr const & channel = nullptr); @@ -98,7 +88,5 @@ class block_processor final nano::condition_variable condition; mutable nano::mutex mutex{ mutex_identifier (mutexes::block_processor) }; std::thread thread; - - nano::thread_pool workers; }; } diff --git a/nano/node/bootstrap/bootstrap_service.cpp b/nano/node/bootstrap/bootstrap_service.cpp index dd11b36e2e..0048849ed5 100644 --- a/nano/node/bootstrap/bootstrap_service.cpp +++ b/nano/node/bootstrap/bootstrap_service.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -18,11 +19,13 @@ using namespace std::chrono_literals; -nano::bootstrap_service::bootstrap_service (nano::node_config const & node_config_a, nano::block_processor & block_processor_a, nano::ledger & ledger_a, nano::network & network_a, nano::stats & stat_a, nano::logger & logger_a) : +nano::bootstrap_service::bootstrap_service (nano::node_config const & node_config_a, nano::ledger & ledger_a, nano::ledger_notifications & ledger_notifications_a, +nano::block_processor & block_processor_a, nano::network & network_a, nano::stats & stat_a, nano::logger & logger_a) : config{ node_config_a.bootstrap }, network_constants{ node_config_a.network_params.network }, - block_processor{ block_processor_a }, ledger{ ledger_a }, + ledger_notifications{ ledger_notifications_a }, + block_processor{ block_processor_a }, network{ network_a }, stats{ stat_a }, logger{ logger_a }, @@ -36,7 +39,8 @@ nano::bootstrap_service::bootstrap_service (nano::node_config const & node_confi frontiers_limiter{ config.frontier_rate_limit }, workers{ 1, nano::thread_role::name::bootstrap_worker } { - block_processor.batch_processed.add ([this] (auto const & batch) { + // Inspect all processed blocks + ledger_notifications.blocks_processed.add ([this] (auto const & batch) { { nano::lock_guard lock{ mutex }; @@ -51,7 +55,7 @@ nano::bootstrap_service::bootstrap_service (nano::node_config const & node_confi }); // Unblock rolled back accounts as the dependency is no longer valid - block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) { + ledger_notifications.blocks_rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) { nano::lock_guard lock{ mutex }; for (auto const & block : blocks) { diff --git a/nano/node/bootstrap/bootstrap_service.hpp b/nano/node/bootstrap/bootstrap_service.hpp index f28d3fbe14..891f3c14bf 100644 --- a/nano/node/bootstrap/bootstrap_service.hpp +++ b/nano/node/bootstrap/bootstrap_service.hpp @@ -30,7 +30,7 @@ namespace nano class bootstrap_service { public: - bootstrap_service (nano::node_config const &, nano::block_processor &, nano::ledger &, nano::network &, nano::stats &, nano::logger &); + bootstrap_service (nano::node_config const &, nano::ledger &, nano::ledger_notifications &, nano::block_processor &, nano::network &, nano::stats &, nano::logger &); ~bootstrap_service (); void start (); @@ -55,8 +55,9 @@ class bootstrap_service private: // Dependencies bootstrap_config const & config; nano::network_constants const & network_constants; - nano::block_processor & block_processor; nano::ledger & ledger; + nano::ledger_notifications & ledger_notifications; + nano::block_processor & block_processor; nano::network & network; nano::stats & stats; nano::logger & logger; diff --git a/nano/node/bounded_backlog.cpp b/nano/node/bounded_backlog.cpp index b019d8c674..8f9e9f8337 100644 --- a/nano/node/bounded_backlog.cpp +++ b/nano/node/bounded_backlog.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -12,18 +13,17 @@ #include #include -nano::bounded_backlog::bounded_backlog (nano::node_config const & config_a, nano::node & node_a, nano::ledger & ledger_a, nano::bucketing & bucketing_a, nano::backlog_scan & backlog_scan_a, nano::block_processor & block_processor_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a) : +nano::bounded_backlog::bounded_backlog (nano::node_config const & config_a, nano::node & node_a, nano::ledger & ledger_a, nano::ledger_notifications & ledger_notifications_a, nano::bucketing & bucketing_a, nano::backlog_scan & backlog_scan_a, nano::block_processor & block_processor_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a) : config{ config_a }, node{ node_a }, ledger{ ledger_a }, + ledger_notifications{ ledger_notifications_a }, bucketing{ bucketing_a }, backlog_scan{ backlog_scan_a }, - block_processor{ block_processor_a }, confirming_set{ confirming_set_a }, stats{ stats_a }, logger{ logger_a }, - scan_limiter{ config.bounded_backlog.scan_rate }, - workers{ 1, nano::thread_role::name::bounded_backlog_notifications } + scan_limiter{ config.bounded_backlog.scan_rate } { // Activate accounts with unconfirmed blocks backlog_scan.batch_activated.add ([this] (auto const & batch) { @@ -47,7 +47,7 @@ nano::bounded_backlog::bounded_backlog (nano::node_config const & config_a, nano }); // Track unconfirmed blocks - block_processor.batch_processed.add ([this] (auto const & batch) { + ledger_notifications.blocks_processed.add ([this] (auto const & batch) { auto transaction = ledger.tx_begin_read (); for (auto const & [result, context] : batch) { @@ -60,7 +60,7 @@ nano::bounded_backlog::bounded_backlog (nano::node_config const & config_a, nano }); // Remove rolled back blocks from the backlog - block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) { + ledger_notifications.blocks_rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) { nano::lock_guard guard{ mutex }; for (auto const & block : blocks) { @@ -83,7 +83,6 @@ nano::bounded_backlog::~bounded_backlog () // Thread must be stopped before destruction debug_assert (!thread.joinable ()); debug_assert (!scan_thread.joinable ()); - debug_assert (!workers.alive ()); } void nano::bounded_backlog::start () @@ -95,8 +94,6 @@ void nano::bounded_backlog::start () return; } - workers.start (); - thread = std::thread{ [this] () { nano::thread_role::set (nano::thread_role::name::bounded_backlog); run (); @@ -123,7 +120,6 @@ void nano::bounded_backlog::stop () { scan_thread.join (); } - workers.stop (); } size_t nano::bounded_backlog::index_size () const @@ -206,16 +202,14 @@ void nano::bounded_backlog::run () return; } + lock.unlock (); + // Wait until all notification about the previous rollbacks are processed - while (workers.queued_tasks () >= config.bounded_backlog.max_queued_notifications) - { + ledger_notifications.wait ([this] { stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::cooldown); - condition.wait_for (lock, 100ms, [this] { return stopped.load (); }); - if (stopped) - { - return; - } - } + }); + + lock.lock (); stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::loop); @@ -310,9 +304,8 @@ std::deque nano::bounded_backlog::perform_rollbacks (std::dequ } // Notify observers of the rolled back blocks on a background thread, avoid dispatching notifications when holding ledger write transaction - workers.post ([this, rollback_list = std::move (rollback_list), root = block->qualified_root ()] { - // TODO: Calling block_processor's event here is not ideal, but duplicating these events is even worse - block_processor.rolled_back.notify (rollback_list, root); + ledger_notifications.notify_rolled_back (transaction, std::move (rollback_list), block->qualified_root (), [this] { + stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::notify_rolled_back); }); // Return early if we reached the maximum number of rollbacks @@ -420,7 +413,6 @@ nano::container_info nano::bounded_backlog::container_info () const nano::lock_guard guard{ mutex }; nano::container_info info; info.put ("backlog", index.size ()); - info.put ("notifications", workers.queued_tasks ()); info.add ("index", index.container_info ()); return info; } @@ -547,7 +539,6 @@ nano::error nano::bounded_backlog_config::serialize (nano::tomlconfig & toml) co { toml.put ("enable", enable, "Enable the bounded backlog. \ntype:bool"); toml.put ("batch_size", batch_size, "Maximum number of blocks to rollback per iteration. \ntype:uint64"); - toml.put ("max_queued_notifications", max_queued_notifications, "Maximum number of queued background tasks before cooldown. \ntype:uint64"); toml.put ("scan_rate", scan_rate, "Rate limit for refreshing the backlog index. \ntype:uint64"); return toml.get_error (); @@ -557,7 +548,6 @@ nano::error nano::bounded_backlog_config::deserialize (nano::tomlconfig & toml) { toml.get ("enable", enable); toml.get ("batch_size", batch_size); - toml.get ("max_queued_notifications", max_queued_notifications); toml.get ("scan_rate", scan_rate); return toml.get_error (); diff --git a/nano/node/bounded_backlog.hpp b/nano/node/bounded_backlog.hpp index 807bfe69d6..bbd0ed1111 100644 --- a/nano/node/bounded_backlog.hpp +++ b/nano/node/bounded_backlog.hpp @@ -101,14 +101,13 @@ class bounded_backlog_config public: bool enable{ true }; size_t batch_size{ 32 }; - size_t max_queued_notifications{ 128 }; size_t scan_rate{ 64 }; }; class bounded_backlog { public: - bounded_backlog (nano::node_config const &, nano::node &, nano::ledger &, nano::bucketing &, nano::backlog_scan &, nano::block_processor &, nano::confirming_set &, nano::stats &, nano::logger &); + bounded_backlog (nano::node_config const &, nano::node &, nano::ledger &, nano::ledger_notifications &, nano::bucketing &, nano::backlog_scan &, nano::block_processor &, nano::confirming_set &, nano::stats &, nano::logger &); ~bounded_backlog (); void start (); @@ -124,9 +123,9 @@ class bounded_backlog nano::node_config const & config; nano::node & node; nano::ledger & ledger; + nano::ledger_notifications & ledger_notifications; nano::bucketing & bucketing; nano::backlog_scan & backlog_scan; - nano::block_processor & block_processor; nano::confirming_set & confirming_set; nano::stats & stats; nano::logger & logger; @@ -155,7 +154,5 @@ class bounded_backlog mutable nano::mutex mutex; std::thread thread; std::thread scan_thread; - - nano::thread_pool workers; }; } \ No newline at end of file diff --git a/nano/node/confirming_set.cpp b/nano/node/confirming_set.cpp index db7ca3e6a4..e08082e5d8 100644 --- a/nano/node/confirming_set.cpp +++ b/nano/node/confirming_set.cpp @@ -4,16 +4,17 @@ #include #include #include +#include #include #include #include #include #include -nano::confirming_set::confirming_set (confirming_set_config const & config_a, nano::ledger & ledger_a, nano::block_processor & block_processor_a, nano::stats & stats_a, nano::logger & logger_a) : +nano::confirming_set::confirming_set (confirming_set_config const & config_a, nano::ledger & ledger_a, nano::ledger_notifications & ledger_notifications_a, nano::stats & stats_a, nano::logger & logger_a) : config{ config_a }, ledger{ ledger_a }, - block_processor{ block_processor_a }, + ledger_notifications{ ledger_notifications_a }, stats{ stats_a }, logger{ logger_a }, workers{ 1, nano::thread_role::name::confirmation_height_notifications } @@ -26,7 +27,7 @@ nano::confirming_set::confirming_set (confirming_set_config const & config_a, na }); // Requeue blocks that failed to cement immediately due to missing ledger blocks - block_processor.batch_processed.add ([this] (auto const & batch) { + ledger_notifications.blocks_processed.add ([this] (auto const & batch) { bool should_notify = false; { std::lock_guard lock{ mutex }; diff --git a/nano/node/confirming_set.hpp b/nano/node/confirming_set.hpp index 908b08c95e..7043068b5b 100644 --- a/nano/node/confirming_set.hpp +++ b/nano/node/confirming_set.hpp @@ -52,7 +52,7 @@ class confirming_set final friend class confirmation_height_pruned_source_Test; public: - confirming_set (confirming_set_config const &, nano::ledger &, nano::block_processor &, nano::stats &, nano::logger &); + confirming_set (confirming_set_config const &, nano::ledger &, nano::ledger_notifications &, nano::stats &, nano::logger &); ~confirming_set (); void start (); @@ -83,7 +83,7 @@ class confirming_set final private: // Dependencies confirming_set_config const & config; nano::ledger & ledger; - nano::block_processor & block_processor; + nano::ledger_notifications & ledger_notifications; nano::stats & stats; nano::logger & logger; diff --git a/nano/node/fwd.hpp b/nano/node/fwd.hpp index 8aa4674085..5bca88abf8 100644 --- a/nano/node/fwd.hpp +++ b/nano/node/fwd.hpp @@ -19,6 +19,7 @@ class bootstrap_service; class confirming_set; class election; class election_status; +class ledger_notifications; class local_block_broadcaster; class local_vote_history; class logger; diff --git a/nano/node/ledger_notifications.cpp b/nano/node/ledger_notifications.cpp new file mode 100644 index 0000000000..aeb1bf9063 --- /dev/null +++ b/nano/node/ledger_notifications.cpp @@ -0,0 +1,138 @@ +#include +#include +#include +#include + +nano::ledger_notifications::ledger_notifications (nano::node_config const & config, nano::stats & stats, nano::logger & logger) : + config{ config }, + stats{ stats }, + logger{ logger } +{ +} + +nano::ledger_notifications::~ledger_notifications () +{ + debug_assert (!thread.joinable ()); +} + +void nano::ledger_notifications::start () +{ + debug_assert (!thread.joinable ()); + + thread = std::thread{ [this] () { + nano::thread_role::set (nano::thread_role::name::ledger_notifications); + run (); + } }; +} + +void nano::ledger_notifications::stop () +{ + { + nano::lock_guard guard{ mutex }; + stopped = true; + } + condition.notify_all (); + if (thread.joinable ()) + { + thread.join (); + } +} + +void nano::ledger_notifications::wait (std::function cooldown_action) +{ + nano::unique_lock lock{ mutex }; + condition.wait (lock, [this, &cooldown_action] { + bool predicate = stopped || notifications.size () < config.max_ledger_notifications; + if (!predicate && cooldown_action) + { + cooldown_action (); + } + return predicate; + }); +} + +void nano::ledger_notifications::notify_processed (nano::secure::write_transaction & transaction, processed_batch_t processed, std::function callback) +{ + { + nano::lock_guard guard{ mutex }; + notifications.emplace_back (transaction.get_future (), nano::wrap_move_only ([this, processed = std::move (processed), callback = std::move (callback)] () mutable { + stats.inc (nano::stat::type::ledger_notifications, nano::stat::detail::notify_processed); + + // Set results for futures when not holding the lock + for (auto & [result, context] : processed) + { + if (context.callback) + { + context.callback (result); + } + context.set_result (result); + } + + blocks_processed.notify (processed); + + if (callback) + { + callback (); + } + })); + } + condition.notify_all (); +} + +void nano::ledger_notifications::notify_rolled_back (nano::secure::write_transaction & transaction, rolled_back_batch_t batch, nano::qualified_root rollback_root, std::function callback) +{ + { + nano::lock_guard guard{ mutex }; + notifications.emplace_back (transaction.get_future (), nano::wrap_move_only ([this, batch = std::move (batch), rollback_root, callback = std::move (callback)] () { + stats.inc (nano::stat::type::ledger_notifications, nano::stat::detail::notify_rolled_back); + + blocks_rolled_back.notify (batch, rollback_root); + + if (callback) + { + callback (); + } + })); + } + condition.notify_all (); +} + +void nano::ledger_notifications::run () +{ + nano::unique_lock lock{ mutex }; + while (!stopped) + { + condition.wait (lock, [this] { + return stopped || !notifications.empty (); + }); + + if (stopped) + { + return; + } + + while (!notifications.empty ()) + { + auto notification = std::move (notifications.front ()); + notifications.pop_front (); + lock.unlock (); + + auto & [future, callback] = notification; + future.wait (); // Wait for the associated transaction to be committed + callback (); // Notify observers + + condition.notify_all (); // Notify waiting threads about possible vacancy + + lock.lock (); + } + } +} + +nano::container_info nano::ledger_notifications::container_info () const +{ + nano::lock_guard guard{ mutex }; + + nano::container_info info; + info.put ("notifications", notifications.size ()); + return info; +} \ No newline at end of file diff --git a/nano/node/ledger_notifications.hpp b/nano/node/ledger_notifications.hpp new file mode 100644 index 0000000000..52c23dca5d --- /dev/null +++ b/nano/node/ledger_notifications.hpp @@ -0,0 +1,66 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace nano +{ +class ledger_notifications +{ +public: // Events + // All processed blocks including forks, rejected etc + using processed_batch_t = std::deque>; + using processed_batch_event_t = nano::observer_set; + processed_batch_event_t blocks_processed; + + // Rolled back blocks + using rolled_back_batch_t = std::deque>; + using rolled_back_event_t = nano::observer_set>, nano::qualified_root>; + rolled_back_event_t blocks_rolled_back; + +public: + ledger_notifications (nano::node_config const &, nano::stats &, nano::logger &); + ~ledger_notifications (); + + void start (); + void stop (); + + /* Components should cooperate to ensure that the notification queue does not grow indefinitely */ + void wait (std::function cooldown_action = nullptr); + + /* + * Write transactions are passed to ensure that notifications are queued in the correct order, which is the same as the order of write transactions + * However, we cannot dispatch notifications before the write transaction is committed otherwise the notified components may not see the changes + * It's an important subtlety and the reason for additional complexity in this and transaction classes + */ + void notify_processed (nano::secure::write_transaction &, processed_batch_t batch, std::function callback = nullptr); + void notify_rolled_back (nano::secure::write_transaction &, rolled_back_batch_t batch, nano::qualified_root rollback_root, std::function callback = nullptr); + + nano::container_info container_info () const; + +private: // Dependencies + nano::node_config const & config; + nano::stats & stats; + nano::logger & logger; + +private: + void run (); + +private: + using entry = std::pair, std::function>; // + std::deque notifications; + + std::thread thread; + nano::condition_variable condition; + mutable nano::mutex mutex; + bool stopped{ false }; +}; +} \ No newline at end of file diff --git a/nano/node/local_block_broadcaster.cpp b/nano/node/local_block_broadcaster.cpp index 607fb9d257..10acde6261 100644 --- a/nano/node/local_block_broadcaster.cpp +++ b/nano/node/local_block_broadcaster.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -10,10 +11,10 @@ #include -nano::local_block_broadcaster::local_block_broadcaster (local_block_broadcaster_config const & config_a, nano::node & node_a, nano::block_processor & block_processor_a, nano::network & network_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a, bool enabled_a) : +nano::local_block_broadcaster::local_block_broadcaster (local_block_broadcaster_config const & config_a, nano::node & node_a, nano::ledger_notifications & ledger_notifications_a, nano::network & network_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a, bool enabled_a) : config{ config_a }, node{ node_a }, - block_processor{ block_processor_a }, + ledger_notifications{ ledger_notifications_a }, network{ network_a }, confirming_set{ confirming_set_a }, stats{ stats_a }, @@ -26,7 +27,7 @@ nano::local_block_broadcaster::local_block_broadcaster (local_block_broadcaster_ return; } - block_processor.batch_processed.add ([this] (auto const & batch) { + ledger_notifications.blocks_processed.add ([this] (auto const & batch) { bool should_notify = false; for (auto const & [result, context] : batch) { @@ -56,7 +57,7 @@ nano::local_block_broadcaster::local_block_broadcaster (local_block_broadcaster_ } }); - block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) { + ledger_notifications.blocks_rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) { nano::lock_guard guard{ mutex }; for (auto const & block : blocks) { diff --git a/nano/node/local_block_broadcaster.hpp b/nano/node/local_block_broadcaster.hpp index 7353f39fe5..3f18aa63be 100644 --- a/nano/node/local_block_broadcaster.hpp +++ b/nano/node/local_block_broadcaster.hpp @@ -53,7 +53,7 @@ class local_block_broadcaster_config final class local_block_broadcaster final { public: - local_block_broadcaster (local_block_broadcaster_config const &, nano::node &, nano::block_processor &, nano::network &, nano::confirming_set &, nano::stats &, nano::logger &, bool enabled = false); + local_block_broadcaster (local_block_broadcaster_config const &, nano::node &, nano::ledger_notifications &, nano::network &, nano::confirming_set &, nano::stats &, nano::logger &, bool enabled = false); ~local_block_broadcaster (); void start (); @@ -73,7 +73,7 @@ class local_block_broadcaster final private: // Dependencies local_block_broadcaster_config const & config; nano::node & node; - nano::block_processor & block_processor; + nano::ledger_notifications & ledger_notifications; nano::network & network; nano::confirming_set & confirming_set; nano::stats & stats; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index f3f7e679b8..259ca63c67 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -116,6 +117,8 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy wallets{ *wallets_impl }, ledger_impl{ std::make_unique (store, stats, network_params.ledger, flags_a.generate_cache, config_a.representative_vote_weight_minimum.number ()) }, ledger{ *ledger_impl }, + ledger_notifications_impl{ std::make_unique (config, stats, logger) }, + ledger_notifications{ *ledger_notifications_impl }, outbound_limiter_impl{ std::make_unique (config) }, outbound_limiter{ *outbound_limiter_impl }, message_processor_impl{ std::make_unique (config.message_processor, *this) }, @@ -138,13 +141,13 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy tcp_listener{ *tcp_listener_impl }, port_mapping_impl{ std::make_unique (*this) }, port_mapping{ *port_mapping_impl }, - block_processor_impl{ std::make_unique (config, ledger, unchecked, stats, logger) }, + block_processor_impl{ std::make_unique (config, ledger, ledger_notifications, unchecked, stats, logger) }, block_processor{ *block_processor_impl }, - confirming_set_impl{ std::make_unique (config.confirming_set, ledger, block_processor, stats, logger) }, + confirming_set_impl{ std::make_unique (config.confirming_set, ledger, ledger_notifications, stats, logger) }, confirming_set{ *confirming_set_impl }, bucketing_impl{ std::make_unique () }, bucketing{ *bucketing_impl }, - active_impl{ std::make_unique (*this, confirming_set, block_processor) }, + active_impl{ std::make_unique (*this, ledger_notifications, confirming_set) }, active{ *active_impl }, online_reps_impl{ std::make_unique (config, ledger, stats, logger) }, online_reps{ *online_reps_impl }, @@ -170,23 +173,23 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy generator{ *generator_impl }, final_generator_impl{ std::make_unique (config, *this, ledger, wallets, vote_processor, history, network, stats, logger, /* final */ true) }, final_generator{ *final_generator_impl }, - scheduler_impl{ std::make_unique (config, *this, ledger, bucketing, block_processor, active, online_reps, vote_cache, confirming_set, stats, logger) }, + scheduler_impl{ std::make_unique (config, *this, ledger, ledger_notifications, bucketing, active, online_reps, vote_cache, confirming_set, stats, logger) }, scheduler{ *scheduler_impl }, aggregator_impl{ std::make_unique (config.request_aggregator, *this, generator, final_generator, history, ledger, wallets, vote_router) }, aggregator{ *aggregator_impl }, backlog_scan_impl{ std::make_unique (config.backlog_scan, ledger, stats) }, backlog_scan{ *backlog_scan_impl }, - backlog_impl{ std::make_unique (config, *this, ledger, bucketing, backlog_scan, block_processor, confirming_set, stats, logger) }, + backlog_impl{ std::make_unique (config, *this, ledger, ledger_notifications, bucketing, backlog_scan, block_processor, confirming_set, stats, logger) }, backlog{ *backlog_impl }, bootstrap_server_impl{ std::make_unique (config.bootstrap_server, store, ledger, network_params.network, stats) }, bootstrap_server{ *bootstrap_server_impl }, - bootstrap_impl{ std::make_unique (config, block_processor, ledger, network, stats, logger) }, + bootstrap_impl{ std::make_unique (config, ledger, ledger_notifications, block_processor, network, stats, logger) }, bootstrap{ *bootstrap_impl }, websocket_impl{ std::make_unique (config.websocket_config, observers, wallets, ledger, io_ctx, logger) }, websocket{ *websocket_impl }, epoch_upgrader_impl{ std::make_unique (*this, ledger, store, network_params, logger) }, epoch_upgrader{ *epoch_upgrader_impl }, - local_block_broadcaster_impl{ std::make_unique (config.local_block_broadcaster, *this, block_processor, network, confirming_set, stats, logger, !flags.disable_block_processor_republishing) }, + local_block_broadcaster_impl{ std::make_unique (config.local_block_broadcaster, *this, ledger_notifications, network, confirming_set, stats, logger, !flags.disable_block_processor_republishing) }, local_block_broadcaster{ *local_block_broadcaster_impl }, process_live_dispatcher_impl{ std::make_unique (ledger, scheduler.priority, vote_cache, websocket) }, process_live_dispatcher{ *process_live_dispatcher_impl }, @@ -237,8 +240,23 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy active.recently_confirmed.erase (hash); }); + // Announce new blocks via websocket + ledger_notifications.blocks_processed.add ([this] (auto const & batch) { + auto const transaction = ledger.tx_begin_read (); + for (auto const & [result, context] : batch) + { + if (result == nano::block_status::progress) + { + if (websocket.server && websocket.server->any_subscriber (nano::websocket::topic::new_unconfirmed_block)) + { + websocket.server->broadcast (nano::websocket::message_builder ().new_block_arrived (*context.block)); + } + } + } + }); + // Do some cleanup of rolled back blocks - block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) { + ledger_notifications.blocks_rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) { for (auto const & block : blocks) { history.erase (block->root ()); @@ -539,6 +557,7 @@ void nano::node::start () rep_tiers.start (); vote_processor.start (); vote_cache_processor.start (); + ledger_notifications.start (); block_processor.start (); active.start (); generator.start (); @@ -595,6 +614,7 @@ void nano::node::stop () generator.stop (); final_generator.stop (); confirming_set.stop (); + ledger_notifications.stop (); telemetry.stop (); websocket.stop (); bootstrap_server.stop (); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index f25098be8e..904155b3a0 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -132,6 +132,8 @@ class node final : public std::enable_shared_from_this nano::wallets & wallets; std::unique_ptr ledger_impl; nano::ledger & ledger; + std::unique_ptr ledger_notifications_impl; + nano::ledger_notifications & ledger_notifications; std::unique_ptr outbound_limiter_impl; nano::bandwidth_limiter & outbound_limiter; std::unique_ptr message_processor_impl; diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index a981b00ca5..54e7b02201 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -134,6 +134,7 @@ class node_config nano::rocksdb_config rocksdb_config; nano::lmdb_config lmdb_config; bool enable_upnp{ true }; + std::size_t max_ledger_notifications{ 8 }; public: nano::vote_cache_config vote_cache; diff --git a/nano/node/process_live_dispatcher.cpp b/nano/node/process_live_dispatcher.cpp index 9e4775bc82..2a429be8ad 100644 --- a/nano/node/process_live_dispatcher.cpp +++ b/nano/node/process_live_dispatcher.cpp @@ -19,32 +19,12 @@ nano::process_live_dispatcher::process_live_dispatcher (nano::ledger & ledger, n void nano::process_live_dispatcher::connect (nano::block_processor & block_processor) { - block_processor.batch_processed.add ([this] (auto const & batch) { - auto const transaction = ledger.tx_begin_read (); - for (auto const & [result, context] : batch) - { - debug_assert (context.block != nullptr); - inspect (result, *context.block, transaction); - } - }); } void nano::process_live_dispatcher::inspect (nano::block_status const & result, nano::block const & block, secure::transaction const & transaction) { - switch (result) - { - case nano::block_status::progress: - process_live (block, transaction); - break; - default: - break; - } } void nano::process_live_dispatcher::process_live (nano::block const & block, secure::transaction const & transaction) { - if (websocket.server && websocket.server->any_subscriber (nano::websocket::topic::new_unconfirmed_block)) - { - websocket.server->broadcast (nano::websocket::message_builder ().new_block_arrived (block)); - } } diff --git a/nano/node/scheduler/component.cpp b/nano/node/scheduler/component.cpp index 82b034bd1b..175b7186e4 100644 --- a/nano/node/scheduler/component.cpp +++ b/nano/node/scheduler/component.cpp @@ -5,11 +5,11 @@ #include #include -nano::scheduler::component::component (nano::node_config & node_config, nano::node & node, nano::ledger & ledger, nano::bucketing & bucketing, nano::block_processor & block_processor, nano::active_elections & active, nano::online_reps & online_reps, nano::vote_cache & vote_cache, nano::confirming_set & confirming_set, nano::stats & stats, nano::logger & logger) : +nano::scheduler::component::component (nano::node_config & node_config, nano::node & node, nano::ledger & ledger, nano::ledger_notifications & ledger_notifications, nano::bucketing & bucketing, nano::active_elections & active, nano::online_reps & online_reps, nano::vote_cache & vote_cache, nano::confirming_set & confirming_set, nano::stats & stats, nano::logger & logger) : hinted_impl{ std::make_unique (node_config.hinted_scheduler, node, vote_cache, active, online_reps, stats) }, manual_impl{ std::make_unique (node) }, optimistic_impl{ std::make_unique (node_config.optimistic_scheduler, node, ledger, active, node_config.network_params.network, stats) }, - priority_impl{ std::make_unique (node_config, node, ledger, bucketing, block_processor, active, confirming_set, stats, logger) }, + priority_impl{ std::make_unique (node_config, node, ledger, ledger_notifications, bucketing, active, confirming_set, stats, logger) }, hinted{ *hinted_impl }, manual{ *manual_impl }, optimistic{ *optimistic_impl }, diff --git a/nano/node/scheduler/component.hpp b/nano/node/scheduler/component.hpp index 97373577d2..e4ddf3d908 100644 --- a/nano/node/scheduler/component.hpp +++ b/nano/node/scheduler/component.hpp @@ -10,7 +10,7 @@ namespace nano::scheduler class component final { public: - component (nano::node_config &, nano::node &, nano::ledger &, nano::bucketing &, nano::block_processor &, nano::active_elections &, nano::online_reps &, nano::vote_cache &, nano::confirming_set &, nano::stats &, nano::logger &); + component (nano::node_config &, nano::node &, nano::ledger &, nano::ledger_notifications &, nano::bucketing &, nano::active_elections &, nano::online_reps &, nano::vote_cache &, nano::confirming_set &, nano::stats &, nano::logger &); ~component (); void start (); diff --git a/nano/node/scheduler/priority.cpp b/nano/node/scheduler/priority.cpp index 566c9511f3..14d83af51d 100644 --- a/nano/node/scheduler/priority.cpp +++ b/nano/node/scheduler/priority.cpp @@ -2,18 +2,19 @@ #include #include #include +#include #include #include #include #include #include -nano::scheduler::priority::priority (nano::node_config & node_config, nano::node & node_a, nano::ledger & ledger_a, nano::bucketing & bucketing_a, nano::block_processor & block_processor_a, nano::active_elections & active_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a) : +nano::scheduler::priority::priority (nano::node_config & node_config, nano::node & node_a, nano::ledger & ledger_a, nano::ledger_notifications & ledger_notifications_a, nano::bucketing & bucketing_a, nano::active_elections & active_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a) : config{ node_config.priority_scheduler }, node{ node_a }, ledger{ ledger_a }, + ledger_notifications{ ledger_notifications_a }, bucketing{ bucketing_a }, - block_processor{ block_processor_a }, active{ active_a }, confirming_set{ confirming_set_a }, stats{ stats_a }, @@ -25,7 +26,7 @@ nano::scheduler::priority::priority (nano::node_config & node_config, nano::node } // Activate accounts with fresh blocks - block_processor.batch_processed.add ([this] (auto const & batch) { + ledger_notifications.blocks_processed.add ([this] (auto const & batch) { auto transaction = ledger.tx_begin_read (); for (auto const & [result, context] : batch) { diff --git a/nano/node/scheduler/priority.hpp b/nano/node/scheduler/priority.hpp index 6f0e3badd2..b8ca4eba83 100644 --- a/nano/node/scheduler/priority.hpp +++ b/nano/node/scheduler/priority.hpp @@ -27,7 +27,7 @@ class priority_config class priority final { public: - priority (nano::node_config &, nano::node &, nano::ledger &, nano::bucketing &, nano::block_processor &, nano::active_elections &, nano::confirming_set &, nano::stats &, nano::logger &); + priority (nano::node_config &, nano::node &, nano::ledger &, nano::ledger_notifications &, nano::bucketing &, nano::active_elections &, nano::confirming_set &, nano::stats &, nano::logger &); ~priority (); void start (); @@ -52,8 +52,8 @@ class priority final priority_config const & config; nano::node & node; nano::ledger & ledger; + nano::ledger_notifications & ledger_notifications; nano::bucketing & bucketing; - nano::block_processor & block_processor; nano::active_elections & active; nano::confirming_set & confirming_set; nano::stats & stats; diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index f25ecff973..b5c4ef613f 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -1144,9 +1145,9 @@ TEST (confirmation_height, many_accounts_send_receive_self_no_elections) nano::node_config node_config; nano::unchecked_map unchecked{ 0, stats, false }; - nano::block_processor block_processor{ node_config, ledger, unchecked, stats, logger }; + nano::ledger_notifications ledger_notifications{ node_config, stats, logger }; nano::confirming_set_config confirming_set_config{}; - nano::confirming_set confirming_set{ confirming_set_config, ledger, block_processor, stats, logger }; + nano::confirming_set confirming_set{ confirming_set_config, ledger, ledger_notifications, stats, logger }; auto const num_accounts = 100000;