From 8ab8eae94b05805b3d360e571de594d96a833e58 Mon Sep 17 00:00:00 2001 From: Rob N Date: Mon, 13 Jan 2025 18:50:28 -1000 Subject: [PATCH] refactor(client)!: rename `EventSender` to `Requester` --- example/managed.rs | 6 ++-- example/rescan.rs | 8 ++--- example/signet.rs | 6 ++-- example/testnet4.rs | 4 +-- src/core/client.rs | 16 +++++----- src/lib.rs | 6 ++-- tests/core.rs | 74 ++++++++++++++++++++++----------------------- 7 files changed, 59 insertions(+), 61 deletions(-) diff --git a/example/managed.rs b/example/managed.rs index 11613c6..76b0f29 100644 --- a/example/managed.rs +++ b/example/managed.rs @@ -52,7 +52,7 @@ async fn main() { tokio::task::spawn(async move { node.run().await }); let Client { - sender, + requester, mut log_rx, mut event_rx, } = client; @@ -78,7 +78,7 @@ async fn main() { if filter.contains_any(&addresses) { let hash = *filter.block_hash(); tracing::info!("Found script at {}!", hash); - let indexed_block = sender.get_block(hash).await.unwrap(); + let indexed_block = requester.get_block(hash).await.unwrap(); let coinbase = indexed_block.block.txdata.first().unwrap().compute_txid(); tracing::info!("Coinbase transaction ID: {}", coinbase); break; @@ -91,6 +91,6 @@ async fn main() { } } } - let _ = sender.shutdown().await; + let _ = requester.shutdown().await; tracing::info!("Shutting down"); } diff --git a/example/rescan.rs b/example/rescan.rs index edae3f5..8e396d5 100644 --- a/example/rescan.rs +++ b/example/rescan.rs @@ -43,7 +43,7 @@ async fn main() { tracing::info!("Running the node and waiting for a sync message. Please wait a minute!"); // Split the client into components that send messages and listen to messages let Client { - sender, + requester, mut log_rx, mut event_rx, } = client; @@ -74,9 +74,9 @@ async fn main() { .unwrap() .require_network(NETWORK) .unwrap(); - sender.add_script(new_script).await.unwrap(); + requester.add_script(new_script).await.unwrap(); // // Tell the node to look for these new scripts - sender.rescan().await.unwrap(); + requester.rescan().await.unwrap(); tracing::info!("Starting rescan"); loop { tokio::select! { @@ -98,6 +98,6 @@ async fn main() { } } } - let _ = sender.shutdown().await; + let _ = requester.shutdown().await; tracing::info!("Shutting down"); } diff --git a/example/signet.rs b/example/signet.rs index b6f2442..8e33d70 100644 --- a/example/signet.rs +++ b/example/signet.rs @@ -55,7 +55,7 @@ async fn main() { // With this construction, different parts of the program can take ownership of // specific tasks. let Client { - sender, + requester, mut log_rx, mut event_rx, } = client; @@ -69,7 +69,7 @@ async fn main() { tracing::info!("Synced chain up to block {}",update.tip().height); tracing::info!("Chain tip: {}",update.tip().hash); // Request information from the node - let fee = sender.broadcast_min_feerate().await.unwrap(); + let fee = requester.broadcast_min_feerate().await.unwrap(); tracing::info!("Minimum transaction broadcast fee rate: {}", fee); break; }, @@ -97,6 +97,6 @@ async fn main() { } } } - let _ = sender.shutdown().await; + let _ = requester.shutdown().await; tracing::info!("Shutting down"); } diff --git a/example/testnet4.rs b/example/testnet4.rs index 2965362..1fa5ce8 100644 --- a/example/testnet4.rs +++ b/example/testnet4.rs @@ -50,7 +50,7 @@ async fn main() { // With this construction, different parts of the program can take ownership of // specific tasks. let Client { - sender, + requester, mut log_rx, mut event_rx, } = client; @@ -89,6 +89,6 @@ async fn main() { } } } - let _ = sender.shutdown().await; + let _ = requester.shutdown().await; tracing::info!("Shutting down"); } diff --git a/src/core/client.rs b/src/core/client.rs index f9894b2..729e9c7 100644 --- a/src/core/client.rs +++ b/src/core/client.rs @@ -21,7 +21,7 @@ use super::{ #[derive(Debug)] pub struct Client { /// Send events to a node, such as broadcasting a transaction. - pub sender: EventSender, + pub requester: Requester, /// Receive log messages from a node. pub log_rx: mpsc::Receiver, /// Receive [`Event`] from a node to act on. @@ -35,7 +35,7 @@ impl Client { ntx: Sender, ) -> Self { Self { - sender: EventSender::new(ntx), + requester: Requester::new(ntx), log_rx, event_rx, } @@ -44,17 +44,15 @@ impl Client { /// Send messages to a node that is running so the node may complete a task. #[derive(Debug, Clone)] -pub struct EventSender { +pub struct Requester { ntx: Sender, } -impl EventSender { +impl Requester { fn new(ntx: Sender) -> Self { Self { ntx } } -} -impl EventSender { /// Tell the node to shut down. /// /// # Errors @@ -356,7 +354,7 @@ mod tests { let (_, event_rx) = tokio::sync::mpsc::unbounded_channel::(); let (ctx, crx) = mpsc::channel::(5); let Client { - sender, + requester, mut log_rx, event_rx: _, } = Client::new(log_rx, event_rx, ctx); @@ -375,7 +373,7 @@ mod tests { let message = log_rx.recv().await; assert!(message.is_some()); drop(log_rx); - let broadcast = sender + let broadcast = requester .broadcast_tx(TxBroadcast::new( transaction.clone(), crate::TxBroadcastPolicy::AllPeers, @@ -383,7 +381,7 @@ mod tests { .await; assert!(broadcast.is_ok()); drop(crx); - let broadcast = sender.shutdown().await; + let broadcast = requester.shutdown().await; assert!(broadcast.is_err()); } } diff --git a/src/lib.rs b/src/lib.rs index 04934dd..5050d63 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,7 +42,7 @@ //! // Run the node and wait for the sync message; //! tokio::task::spawn(async move { node.run().await }); //! // Split the client into components that send messages and listen to messages -//! let Client { sender, mut log_rx, mut event_rx } = client; +//! let Client { requester, mut log_rx, mut event_rx } = client; //! // Sync with the single script added //! loop { //! tokio::select! { @@ -67,7 +67,7 @@ //! } //! } //! } -//! sender.shutdown().await; +//! requester.shutdown().await; //! } //! ``` //! @@ -126,7 +126,7 @@ pub use tokio::sync::mpsc::UnboundedReceiver; #[doc(inline)] pub use { crate::core::builder::NodeBuilder, - crate::core::client::{Client, EventSender}, + crate::core::client::{Client, Requester}, crate::core::error::{ClientError, NodeError}, crate::core::messages::{Event, FailurePayload, Log, Progress, SyncUpdate, Warning}, crate::core::node::{Node, NodeState}, diff --git a/tests/core.rs b/tests/core.rs index 5ac6da2..7b2d185 100644 --- a/tests/core.rs +++ b/tests/core.rs @@ -159,7 +159,7 @@ async fn test_reorg() { let (node, client) = new_node(scripts.clone(), socket_addr).await; tokio::task::spawn(async move { node.run().await }); let Client { - sender, + requester, log_rx: mut log, event_rx: mut channel, } = client; @@ -180,13 +180,13 @@ async fn test_reorg() { } kyoto::core::messages::Event::Synced(update) => { assert_eq!(update.tip().hash, best); - sender.shutdown().await.unwrap(); + requester.shutdown().await.unwrap(); break; } _ => {} } } - sender.shutdown().await.unwrap(); + requester.shutdown().await.unwrap(); rpc.stop().unwrap(); } @@ -211,7 +211,7 @@ async fn test_mine_after_reorg() { let (node, client) = new_node(scripts.clone(), socket_addr).await; tokio::task::spawn(async move { node.run().await }); let Client { - sender, + requester, log_rx: mut log, event_rx: mut channel, } = client; @@ -219,7 +219,7 @@ async fn test_mine_after_reorg() { // Reorganize the blocks let old_best = best; let old_height = num_blocks(rpc); - let fetched_header = sender.get_header(10).await.unwrap(); + let fetched_header = requester.get_header(10).await.unwrap(); assert_eq!(old_best, fetched_header.block_hash()); invalidate_block(rpc, &best).await; mine_blocks(rpc, &miner, 2, 1).await; @@ -242,7 +242,7 @@ async fn test_mine_after_reorg() { mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); sync_assert(&best, &mut channel, &mut log).await; - sender.shutdown().await.unwrap(); + requester.shutdown().await.unwrap(); rpc.stop().unwrap(); } @@ -266,19 +266,19 @@ async fn test_various_client_methods() { let (node, client) = new_node(scripts.clone(), socket_addr).await; tokio::task::spawn(async move { node.run().await }); let Client { - sender, + requester, log_rx: mut log, event_rx: mut channel, } = client; sync_assert(&best, &mut channel, &mut log).await; - let batch = sender.get_header_range(10_000..10_002).await.unwrap(); + let batch = requester.get_header_range(10_000..10_002).await.unwrap(); assert!(batch.is_empty()); - let _ = sender.broadcast_min_feerate().await.unwrap(); - let _ = sender.get_header(3).await.unwrap(); + let _ = requester.broadcast_min_feerate().await.unwrap(); + let _ = requester.get_header(3).await.unwrap(); let script = rpc.new_address().unwrap(); - sender.add_script(script).await.unwrap(); - assert!(sender.is_running().await); - sender.shutdown().await.unwrap(); + requester.add_script(script).await.unwrap(); + assert!(requester.is_running().await); + requester.shutdown().await.unwrap(); rpc.stop().unwrap(); } @@ -303,14 +303,14 @@ async fn test_sql_reorg() { let (node, client) = new_node_sql(scripts.clone(), socket_addr, tempdir.clone()).await; tokio::task::spawn(async move { node.run().await }); let Client { - sender, + requester, log_rx: mut log, event_rx: mut channel, } = client; sync_assert(&best, &mut channel, &mut log).await; - let batch = sender.get_header_range(0..10).await.unwrap(); + let batch = requester.get_header_range(0..10).await.unwrap(); assert!(!batch.is_empty()); - sender.shutdown().await.unwrap(); + requester.shutdown().await.unwrap(); // Reorganize the blocks let old_best = best; let old_height = num_blocks(rpc); @@ -321,7 +321,7 @@ async fn test_sql_reorg() { let (node, client) = new_node_sql(scripts.clone(), socket_addr, tempdir.clone()).await; tokio::task::spawn(async move { node.run().await }); let Client { - sender, + requester, log_rx: _, event_rx: mut channel, } = client; @@ -341,7 +341,7 @@ async fn test_sql_reorg() { _ => {} } } - sender.shutdown().await.unwrap(); + requester.shutdown().await.unwrap(); // Mine more blocks mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); @@ -349,13 +349,13 @@ async fn test_sql_reorg() { let (node, client) = new_node_sql(scripts.clone(), socket_addr, tempdir).await; tokio::task::spawn(async move { node.run().await }); let Client { - sender, + requester, log_rx: mut log, event_rx: mut channel, } = client; // The node properly syncs after persisting a reorg sync_assert(&best, &mut channel, &mut log).await; - sender.shutdown().await.unwrap(); + requester.shutdown().await.unwrap(); rpc.stop().unwrap(); } @@ -380,12 +380,12 @@ async fn test_two_deep_reorg() { let (node, client) = new_node_sql(scripts.clone(), socket_addr, tempdir.clone()).await; tokio::task::spawn(async move { node.run().await }); let Client { - sender, + requester, log_rx: mut log, event_rx: mut channel, } = client; sync_assert(&best, &mut channel, &mut log).await; - sender.shutdown().await.unwrap(); + requester.shutdown().await.unwrap(); // Reorganize the blocks let old_height = num_blocks(rpc); let old_best = best; @@ -398,7 +398,7 @@ async fn test_two_deep_reorg() { let (node, client) = new_node_sql(scripts.clone(), socket_addr, tempdir.clone()).await; tokio::task::spawn(async move { node.run().await }); let Client { - sender, + requester, log_rx: _, event_rx: mut channel, } = client; @@ -417,7 +417,7 @@ async fn test_two_deep_reorg() { _ => {} } } - sender.shutdown().await.unwrap(); + requester.shutdown().await.unwrap(); // Mine more blocks mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); @@ -425,13 +425,13 @@ async fn test_two_deep_reorg() { let (node, client) = new_node_sql(scripts.clone(), socket_addr, tempdir).await; tokio::task::spawn(async move { node.run().await }); let Client { - sender, + requester, log_rx: mut log, event_rx: mut channel, } = client; // The node properly syncs after persisting a reorg sync_assert(&best, &mut channel, &mut log).await; - sender.shutdown().await.unwrap(); + requester.shutdown().await.unwrap(); rpc.stop().unwrap(); } @@ -455,12 +455,12 @@ async fn test_sql_stale_anchor() { let (node, client) = new_node_sql(scripts.clone(), socket_addr, tempdir.clone()).await; tokio::task::spawn(async move { node.run().await }); let Client { - sender, + requester, log_rx: mut log, event_rx: mut channel, } = client; sync_assert(&best, &mut channel, &mut log).await; - sender.shutdown().await.unwrap(); + requester.shutdown().await.unwrap(); // Reorganize the blocks let old_best = best; let old_height = num_blocks(rpc); @@ -477,7 +477,7 @@ async fn test_sql_stale_anchor() { .await; tokio::task::spawn(async move { node.run().await }); let Client { - sender, + requester, log_rx: _, event_rx: mut channel, } = client; @@ -497,7 +497,7 @@ async fn test_sql_stale_anchor() { _ => {} } } - sender.shutdown().await.unwrap(); + requester.shutdown().await.unwrap(); // Don't do anything, but reload the node from the checkpoint let cp = best_hash(rpc); let old_height = num_blocks(rpc); @@ -512,13 +512,13 @@ async fn test_sql_stale_anchor() { .await; tokio::task::spawn(async move { node.run().await }); let Client { - sender, + requester, log_rx: mut log, event_rx: mut channel, } = client; // The node properly syncs after persisting a reorg sync_assert(&best, &mut channel, &mut log).await; - sender.shutdown().await.unwrap(); + requester.shutdown().await.unwrap(); // Mine more blocks and reload from the checkpoint let cp = best_hash(rpc); let old_height = num_blocks(rpc); @@ -534,13 +534,13 @@ async fn test_sql_stale_anchor() { .await; tokio::task::spawn(async move { node.run().await }); let Client { - sender, + requester, log_rx: mut log, event_rx: mut channel, } = client; // The node properly syncs after persisting a reorg sync_assert(&best, &mut channel, &mut log).await; - sender.shutdown().await.unwrap(); + requester.shutdown().await.unwrap(); rpc.stop().unwrap(); } @@ -573,7 +573,7 @@ async fn test_halting_works() { tokio::task::spawn(async move { node.run().await }); let Client { - sender, + requester, log_rx: mut log, event_rx: mut channel, } = client; @@ -586,7 +586,7 @@ async fn test_halting_works() { if let NodeState::FilterHeadersSynced = node_state { println!("Sleeping for one second..."); tokio::time::sleep(Duration::from_secs(1)).await; - sender.continue_download().await.unwrap(); + requester.continue_download().await.unwrap(); break; } } @@ -600,7 +600,7 @@ async fn test_halting_works() { break; } } - sender.shutdown().await.unwrap(); + requester.shutdown().await.unwrap(); rpc.stop().unwrap(); }