Skip to content

Commit

Permalink
Merge pull request #4816 from pwojcikdev/move-rpc-callbacks
Browse files Browse the repository at this point in the history
Move http callbacks logic out of node class
  • Loading branch information
pwojcikdev authored Jan 13, 2025
2 parents 51c8caa + c833b02 commit 6dcf92b
Show file tree
Hide file tree
Showing 18 changed files with 323 additions and 134 deletions.
23 changes: 22 additions & 1 deletion nano/lib/interval.hpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
#pragma once

#include <chrono>
#include <mutex>

namespace nano
{
class interval
{
public:
bool elapsed (auto target)
bool elapse (auto target)
{
auto const now = std::chrono::steady_clock::now ();
if (now - last >= target)
Expand All @@ -21,4 +22,24 @@ class interval
private:
std::chrono::steady_clock::time_point last{ std::chrono::steady_clock::now () };
};

class interval_mt
{
public:
bool elapse (auto target)
{
std::lock_guard guard{ mutex };
auto const now = std::chrono::steady_clock::now ();
if (now - last >= target)
{
last = now;
return true;
}
return false;
}

private:
std::mutex mutex;
std::chrono::steady_clock::time_point last{ std::chrono::steady_clock::now () };
};
}
2 changes: 1 addition & 1 deletion nano/lib/logging_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ enum class type
qt,
rpc,
rpc_connection,
rpc_callbacks,
http_callbacks,
rpc_request,
ipc,
ipc_server,
Expand Down
17 changes: 16 additions & 1 deletion nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ enum class type
election,
election_cleanup,
election_vote,
http_callback,
http_callbacks,
http_callbacks_notified,
http_callbacks_ec,
ipc,
tcp,
tcp_server,
Expand Down Expand Up @@ -166,6 +168,8 @@ enum class detail
other,
drop,
queued,
error,
failed,

// processing queue
queue,
Expand Down Expand Up @@ -626,6 +630,17 @@ enum class detail
host_unreachable,
not_supported,

// http
error_resolving,
error_connecting,
error_sending,
error_completing,
bad_status,

// http_callbacks
block_confirmed,
large_backlog,

_last // Must be the last enum
};

Expand Down
3 changes: 3 additions & 0 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::monitor:
thread_role_name_string = "Monitor";
break;
case nano::thread_role::name::http_callbacks:
thread_role_name_string = "HTTP callbacks";
break;
default:
debug_assert (false && "nano::thread_role::get_string unhandled thread role");
}
Expand Down
1 change: 1 addition & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ enum class name
vote_router,
online_reps,
monitor,
http_callbacks,
};

std::string_view to_string (name);
Expand Down
2 changes: 1 addition & 1 deletion nano/lib/uniquer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class uniquer final

nano::lock_guard<nano::mutex> guard{ mutex };

if (cleanup_interval.elapsed (cleanup_cutoff))
if (cleanup_interval.elapse (cleanup_cutoff))
{
cleanup ();
}
Expand Down
2 changes: 2 additions & 0 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ add_library(
rep_tiers.cpp
request_aggregator.hpp
request_aggregator.cpp
rpc_callbacks.hpp
rpc_callbacks.cpp
scheduler/bucket.cpp
scheduler/bucket.hpp
scheduler/component.hpp
Expand Down
2 changes: 1 addition & 1 deletion nano/node/block_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ void nano::block_processor::run ()
}
}

if (log_interval.elapsed (15s))
if (log_interval.elapse (15s))
{
logger.info (nano::log::type::block_processor, "{} blocks (+ {} forced) in processing queue",
queue.size (),
Expand Down
2 changes: 1 addition & 1 deletion nano/node/bootstrap/bootstrap_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ void nano::bootstrap_service::cleanup_and_sync ()
tags_by_order.pop_front ();
}

if (sync_dependencies_interval.elapsed (60s))
if (sync_dependencies_interval.elapse (60s))
{
stats.inc (nano::stat::type::bootstrap, nano::stat::detail::sync_dependencies);
accounts.sync_dependencies ();
Expand Down
1 change: 1 addition & 0 deletions nano/node/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class recently_cemented_cache;
class recently_confirmed_cache;
class rep_crawler;
class rep_tiers;
class http_callbacks;
class telemetry;
class unchecked_map;
class stats;
Expand Down
2 changes: 1 addition & 1 deletion nano/node/local_block_broadcaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ void nano::local_block_broadcaster::run ()
{
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::loop);

if (cleanup_interval.elapsed (config.cleanup_interval))
if (cleanup_interval.elapse (config.cleanup_interval))
{
cleanup (lock);
debug_assert (lock.owns_lock ());
Expand Down
128 changes: 6 additions & 122 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <nano/node/peer_history.hpp>
#include <nano/node/portmapping.hpp>
#include <nano/node/request_aggregator.hpp>
#include <nano/node/rpc_callbacks.hpp>
#include <nano/node/scheduler/component.hpp>
#include <nano/node/scheduler/hinted.hpp>
#include <nano/node/scheduler/manual.hpp>
Expand Down Expand Up @@ -193,6 +194,8 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
peer_history{ *peer_history_impl },
monitor_impl{ std::make_unique<nano::monitor> (config.monitor, *this) },
monitor{ *monitor_impl },
http_callbacks_impl{ std::make_unique<nano::http_callbacks> (*this) },
http_callbacks{ *http_callbacks_impl },
startup_time{ std::chrono::steady_clock::now () },
node_seq{ seq }
{
Expand Down Expand Up @@ -250,66 +253,6 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
network.disconnect_observer = [this] () {
observers.disconnect.notify ();
};
if (!config.callback_address.empty ())
{
observers.blocks.add ([this] (nano::election_status const & status_a, std::vector<nano::vote_with_weight_info> const & votes_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a, bool is_state_epoch_a) {
auto block_a (status_a.winner);
if ((status_a.type == nano::election_status_type::active_confirmed_quorum || status_a.type == nano::election_status_type::active_confirmation_height))
{
auto node_l (shared_from_this ());
io_ctx.post ([node_l, block_a, account_a, amount_a, is_state_send_a, is_state_epoch_a] () {
boost::property_tree::ptree event;
event.add ("account", account_a.to_account ());
event.add ("hash", block_a->hash ().to_string ());
std::string block_text;
block_a->serialize_json (block_text);
event.add ("block", block_text);
event.add ("amount", amount_a.to_string_dec ());
if (is_state_send_a)
{
event.add ("is_send", is_state_send_a);
event.add ("subtype", "send");
}
// Subtype field
else if (block_a->type () == nano::block_type::state)
{
if (block_a->is_change ())
{
event.add ("subtype", "change");
}
else if (is_state_epoch_a)
{
debug_assert (amount_a == 0 && node_l->ledger.is_epoch_link (block_a->link_field ().value ()));
event.add ("subtype", "epoch");
}
else
{
event.add ("subtype", "receive");
}
}
std::stringstream ostream;
boost::property_tree::write_json (ostream, event);
ostream.flush ();
auto body (std::make_shared<std::string> (ostream.str ()));
auto address (node_l->config.callback_address);
auto port (node_l->config.callback_port);
auto target (std::make_shared<std::string> (node_l->config.callback_target));
auto resolver (std::make_shared<boost::asio::ip::tcp::resolver> (node_l->io_ctx));
resolver->async_resolve (boost::asio::ip::tcp::resolver::query (address, std::to_string (port)), [node_l, address, port, target, body, resolver] (boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator i_a) {
if (!ec)
{
node_l->do_rpc_callback (i_a, address, port, target, body, resolver);
}
else
{
node_l->logger.error (nano::log::type::rpc_callbacks, "Error resolving callback: {}:{} ({})", address, port, ec.message ());
node_l->stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out);
}
});
});
}
});
}

observers.channel_connected.add ([this] (std::shared_ptr<nano::transport::channel> const & channel) {
network.send_keepalive_self (channel);
Expand Down Expand Up @@ -472,68 +415,6 @@ nano::node::~node ()
stop ();
}

// TODO: Move to a separate class
void nano::node::do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const & address, uint16_t port, std::shared_ptr<std::string> const & target, std::shared_ptr<std::string> const & body, std::shared_ptr<boost::asio::ip::tcp::resolver> const & resolver)
{
if (i_a != boost::asio::ip::tcp::resolver::iterator{})
{
auto node_l (shared_from_this ());
auto sock (std::make_shared<boost::asio::ip::tcp::socket> (node_l->io_ctx));
sock->async_connect (i_a->endpoint (), [node_l, target, body, sock, address, port, i_a, resolver] (boost::system::error_code const & ec) mutable {
if (!ec)
{
auto req (std::make_shared<boost::beast::http::request<boost::beast::http::string_body>> ());
req->method (boost::beast::http::verb::post);
req->target (*target);
req->version (11);
req->insert (boost::beast::http::field::host, address);
req->insert (boost::beast::http::field::content_type, "application/json");
req->body () = *body;
req->prepare_payload ();
boost::beast::http::async_write (*sock, *req, [node_l, sock, address, port, req, i_a, target, body, resolver] (boost::system::error_code const & ec, std::size_t bytes_transferred) mutable {
if (!ec)
{
auto sb (std::make_shared<boost::beast::flat_buffer> ());
auto resp (std::make_shared<boost::beast::http::response<boost::beast::http::string_body>> ());
boost::beast::http::async_read (*sock, *sb, *resp, [node_l, sb, resp, sock, address, port, i_a, target, body, resolver] (boost::system::error_code const & ec, std::size_t bytes_transferred) mutable {
if (!ec)
{
if (boost::beast::http::to_status_class (resp->result ()) == boost::beast::http::status_class::successful)
{
node_l->stats.inc (nano::stat::type::http_callback, nano::stat::detail::initiate, nano::stat::dir::out);
}
else
{
node_l->logger.error (nano::log::type::rpc_callbacks, "Callback to {}:{} failed [status: {}]", address, port, nano::util::to_str (resp->result ()));
node_l->stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out);
}
}
else
{
node_l->logger.error (nano::log::type::rpc_callbacks, "Unable to complete callback: {}:{} ({})", address, port, ec.message ());
node_l->stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out);
};
});
}
else
{
node_l->logger.error (nano::log::type::rpc_callbacks, "Unable to send callback: {}:{} ({})", address, port, ec.message ());
node_l->stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out);
}
});
}
else
{
node_l->logger.error (nano::log::type::rpc_callbacks, "Unable to connect to callback address({}): {}:{} ({})", address, i_a->endpoint ().address ().to_string (), port, ec.message ());
node_l->stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out);
++i_a;

node_l->do_rpc_callback (i_a, address, port, target, body, resolver);
}
});
}
}

bool nano::node::copy_with_compaction (std::filesystem::path const & destination)
{
return store.copy_db (destination);
Expand Down Expand Up @@ -677,6 +558,7 @@ void nano::node::start ()
vote_router.start ();
online_reps.start ();
monitor.start ();
http_callbacks.start ();

add_initial_peers ();
}
Expand Down Expand Up @@ -724,6 +606,7 @@ void nano::node::stop ()
message_processor.stop ();
network.stop ();
monitor.stop ();
http_callbacks.stop ();

bootstrap_workers.stop ();
wallet_workers.stop ();
Expand Down Expand Up @@ -1194,6 +1077,7 @@ nano::container_info nano::node::container_info () const
info.add ("bandwidth", outbound_limiter.container_info ());
info.add ("backlog_scan", backlog_scan.container_info ());
info.add ("bounded_backlog", backlog.container_info ());
info.add ("http_callbacks", http_callbacks.container_info ());
return info;
}

Expand Down
3 changes: 2 additions & 1 deletion nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ class node final : public std::enable_shared_from_this<node>
bool block_confirmed_or_being_confirmed (nano::secure::transaction const &, nano::block_hash const &);
bool block_confirmed_or_being_confirmed (nano::block_hash const &);

void do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const &, uint16_t, std::shared_ptr<std::string> const &, std::shared_ptr<std::string> const &, std::shared_ptr<boost::asio::ip::tcp::resolver> const &);
bool online () const;
bool init_error () const;
std::pair<uint64_t, std::unordered_map<nano::account, nano::uint128_t>> get_bootstrap_weights () const;
Expand Down Expand Up @@ -201,6 +200,8 @@ class node final : public std::enable_shared_from_this<node>
nano::peer_history & peer_history;
std::unique_ptr<nano::monitor> monitor_impl;
nano::monitor & monitor;
std::unique_ptr<nano::http_callbacks> http_callbacks_impl;
nano::http_callbacks & http_callbacks;

public:
std::chrono::steady_clock::time_point const startup_time;
Expand Down
Loading

0 comments on commit 6dcf92b

Please sign in to comment.