Skip to content

Commit

Permalink
Merge pull request #4824 from pwojcikdev/ledger-notifications-3
Browse files Browse the repository at this point in the history
Fix out of order ledger notifications
  • Loading branch information
pwojcikdev authored Jan 17, 2025
2 parents 7115a27 + 5ed3aab commit 0db11e8
Show file tree
Hide file tree
Showing 35 changed files with 497 additions and 271 deletions.
24 changes: 11 additions & 13 deletions nano/core_test/confirming_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <nano/node/block_processor.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/node/election.hpp>
#include <nano/node/ledger_notifications.hpp>
#include <nano/node/make_store.hpp>
#include <nano/node/unchecked_map.hpp>
#include <nano/secure/ledger.hpp>
Expand All @@ -26,17 +27,15 @@ struct confirming_set_context
nano::stats & stats;
nano::ledger & ledger;

nano::unchecked_map unchecked;
nano::block_processor block_processor;
nano::ledger_notifications ledger_notifications;
nano::confirming_set confirming_set;

explicit confirming_set_context (nano::test::ledger_context & ledger_context, nano::node_config node_config = {}) :
logger{ ledger_context.logger () },
stats{ ledger_context.stats () },
ledger{ ledger_context.ledger () },
unchecked{ 0, stats, false },
block_processor{ node_config, ledger, unchecked, stats, logger },
confirming_set{ node_config.confirming_set, ledger, block_processor, stats, logger }
ledger_notifications{ node_config, stats, logger },
confirming_set{ node_config.confirming_set, ledger, ledger_notifications, stats, logger }
{
}
};
Expand Down Expand Up @@ -78,21 +77,20 @@ TEST (confirming_set, process_one)
TEST (confirming_set, process_multiple)
{
nano::test::system system;
auto & node = *system.add_node ();
auto ctx = nano::test::ledger_send_receive ();
nano::confirming_set_config config{};
nano::confirming_set confirming_set{ config, ctx.ledger (), node.block_processor, ctx.stats (), ctx.logger () };
auto ledger_ctx = nano::test::ledger_send_receive ();
confirming_set_context ctx{ ledger_ctx };
nano::confirming_set & confirming_set = ctx.confirming_set;
std::atomic<int> count = 0;
std::mutex mutex;
std::condition_variable condition;
confirming_set.cemented_observers.add ([&] (auto const &) { ++count; condition.notify_all (); });
confirming_set.add (ctx.blocks ()[0]->hash ());
confirming_set.add (ctx.blocks ()[1]->hash ());
confirming_set.add (ledger_ctx.blocks ()[0]->hash ());
confirming_set.add (ledger_ctx.blocks ()[1]->hash ());
nano::test::start_stop_guard guard{ confirming_set };
std::unique_lock lock{ mutex };
ASSERT_TRUE (condition.wait_for (lock, 5s, [&] () { return count == 2; }));
ASSERT_EQ (2, ctx.stats ().count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (3, ctx.ledger ().cemented_count ());
ASSERT_EQ (2, ctx.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (3, ctx.ledger.cemented_count ());
}

TEST (confirmation_callback, observer_callbacks)
Expand Down
2 changes: 0 additions & 2 deletions nano/core_test/toml.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,6 @@ TEST (toml_config, daemon_config_deserialize_defaults)

ASSERT_EQ (conf.node.bounded_backlog.enable, defaults.node.bounded_backlog.enable);
ASSERT_EQ (conf.node.bounded_backlog.batch_size, defaults.node.bounded_backlog.batch_size);
ASSERT_EQ (conf.node.bounded_backlog.max_queued_notifications, defaults.node.bounded_backlog.max_queued_notifications);
ASSERT_EQ (conf.node.bounded_backlog.scan_rate, defaults.node.bounded_backlog.scan_rate);

ASSERT_EQ (conf.node.websocket_config.enabled, defaults.node.websocket_config.enabled);
Expand Down Expand Up @@ -743,7 +742,6 @@ TEST (toml_config, daemon_config_deserialize_no_defaults)

ASSERT_NE (conf.node.bounded_backlog.enable, defaults.node.bounded_backlog.enable);
ASSERT_NE (conf.node.bounded_backlog.batch_size, defaults.node.bounded_backlog.batch_size);
ASSERT_NE (conf.node.bounded_backlog.max_queued_notifications, defaults.node.bounded_backlog.max_queued_notifications);
ASSERT_NE (conf.node.bounded_backlog.scan_rate, defaults.node.bounded_backlog.scan_rate);

ASSERT_NE (conf.node.websocket_config.enabled, defaults.node.websocket_config.enabled);
Expand Down
5 changes: 5 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ enum class type
message,
block,
ledger,
ledger_notifications,
rollback,
network,
vote,
Expand Down Expand Up @@ -577,6 +578,10 @@ enum class detail
tier_2,
tier_3,

// ledger_notifications
notify_processed,
notify_rolled_back,

// confirming_set
notify_cemented,
notify_already_cemented,
Expand Down
7 changes: 2 additions & 5 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::block_processing:
thread_role_name_string = "Blck processing";
break;
case nano::thread_role::name::block_processing_notifications:
thread_role_name_string = "Blck proc notif";
case nano::thread_role::name::ledger_notifications:
thread_role_name_string = "Ledger notif";
break;
case nano::thread_role::name::request_loop:
thread_role_name_string = "Request loop";
Expand Down Expand Up @@ -106,9 +106,6 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::bounded_backlog_scan:
thread_role_name_string = "Bounded b scan";
break;
case nano::thread_role::name::bounded_backlog_notifications:
thread_role_name_string = "Bounded b notif";
break;
case nano::thread_role::name::vote_generator_queue:
thread_role_name_string = "Voting que";
break;
Expand Down
3 changes: 1 addition & 2 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ enum class name
vote_processing,
vote_cache_processing,
block_processing,
block_processing_notifications,
ledger_notifications,
request_loop,
wallet_actions,
bootstrap_initiator,
Expand All @@ -40,7 +40,6 @@ enum class name
backlog_scan,
bounded_backlog,
bounded_backlog_scan,
bounded_backlog_notifications,
vote_generator_queue,
telemetry,
bootstrap,
Expand Down
5 changes: 5 additions & 0 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ add_library(
backlog_scan.cpp
bandwidth_limiter.hpp
bandwidth_limiter.cpp
block_context.hpp
block_processor.hpp
block_processor.cpp
block_source.hpp
block_source.cpp
bucketing.hpp
bucketing.cpp
bounded_backlog.hpp
Expand Down Expand Up @@ -85,6 +88,8 @@ add_library(
ipc/ipc_server.cpp
json_handler.hpp
json_handler.cpp
ledger_notifications.hpp
ledger_notifications.cpp
local_block_broadcaster.cpp
local_block_broadcaster.hpp
local_vote_history.cpp
Expand Down
9 changes: 5 additions & 4 deletions nano/node/active_elections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <nano/node/confirmation_solicitor.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/node/election.hpp>
#include <nano/node/ledger_notifications.hpp>
#include <nano/node/node.hpp>
#include <nano/node/online_reps.hpp>
#include <nano/node/repcrawler.hpp>
Expand All @@ -21,11 +22,11 @@

using namespace std::chrono;

nano::active_elections::active_elections (nano::node & node_a, nano::confirming_set & confirming_set_a, nano::block_processor & block_processor_a) :
nano::active_elections::active_elections (nano::node & node_a, nano::ledger_notifications & ledger_notifications_a, nano::confirming_set & confirming_set_a) :
config{ node_a.config.active_elections },
node{ node_a },
ledger_notifications{ ledger_notifications_a },
confirming_set{ confirming_set_a },
block_processor{ block_processor_a },
recently_confirmed{ config.confirmation_cache },
recently_cemented{ config.confirmation_history_size }
{
Expand Down Expand Up @@ -55,7 +56,7 @@ nano::active_elections::active_elections (nano::node & node_a, nano::confirming_
});

// Notify elections about alternative (forked) blocks
block_processor.batch_processed.add ([this] (auto const & batch) {
ledger_notifications.blocks_processed.add ([this] (auto const & batch) {
for (auto const & [result, context] : batch)
{
if (result == nano::block_status::fork)
Expand All @@ -66,7 +67,7 @@ nano::active_elections::active_elections (nano::node & node_a, nano::confirming_
});

// Stop all rolled back active transactions except initial
block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
ledger_notifications.blocks_rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
for (auto const & block : blocks)
{
if (block->qualified_root () != rollback_root)
Expand Down
4 changes: 2 additions & 2 deletions nano/node/active_elections.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class active_elections final
ordered_roots roots;

public:
active_elections (nano::node &, nano::confirming_set &, nano::block_processor &);
active_elections (nano::node &, nano::ledger_notifications &, nano::confirming_set &);
~active_elections ();

void start ();
Expand Down Expand Up @@ -144,8 +144,8 @@ class active_elections final
private: // Dependencies
active_elections_config const & config;
nano::node & node;
nano::ledger_notifications & ledger_notifications;
nano::confirming_set & confirming_set;
nano::block_processor & block_processor;

public:
nano::recently_confirmed_cache recently_confirmed;
Expand Down
44 changes: 44 additions & 0 deletions nano/node/block_context.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#pragma once

#include <nano/node/block_source.hpp>
#include <nano/secure/common.hpp>

#include <future>

namespace nano
{
class block_context
{
public:
using result_t = nano::block_status;
using callback_t = std::function<void (result_t)>;

public: // Keep fields public for simplicity
std::shared_ptr<nano::block> block;
nano::block_source source;
callback_t callback;
std::chrono::steady_clock::time_point arrival{ std::chrono::steady_clock::now () };

public:
block_context (std::shared_ptr<nano::block> block, nano::block_source source, callback_t callback = nullptr) :
block{ std::move (block) },
source{ source },
callback{ std::move (callback) }
{
debug_assert (source != nano::block_source::unknown);
}

std::future<result_t> get_future ()
{
return promise.get_future ();
}

void set_result (result_t result)
{
promise.set_value (result);
}

private:
std::promise<result_t> promise;
};
}
Loading

0 comments on commit 0db11e8

Please sign in to comment.