From 4f029b88fad9ed0d79992ed4eb1133c9835c2bf5 Mon Sep 17 00:00:00 2001 From: Velnbur Date: Wed, 22 Jan 2025 12:20:24 +0200 Subject: [PATCH] Add `CleanStorage` command for P2P `CleanStorageCommand` - deletes all entries from storage by session IDs, scopes and operators' pubkeys. For that `delete_raw` method for `Repository` was added, which deletes entries at once by array of keys. Implementation creates cartesian products from sets of operators' pubkeys and session IDs, operators' pubkeys and scopes, then deletes all some entries at once, but sequantially for each type of entry (nonces, signatures or deposit setups). --- Cargo.lock | 14 ++++++- crates/db/src/lib.rs | 36 ++++++++++++++++++ crates/db/src/sled.rs | 23 ++++++++++++ crates/p2p/Cargo.toml | 1 + crates/p2p/src/commands.rs | 53 +++++++++++++++++++++++++++ crates/p2p/src/swarm/handle.rs | 4 +- crates/p2p/src/swarm/mod.rs | 25 +++++++++++++ crates/p2p/tests/gossipsub.rs | 67 +++++++++++++++++++++++++++++++++- 8 files changed, 218 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9ece92b..af4108e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1466,6 +1466,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.14" @@ -2527,7 +2536,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0f3e5beed80eb580c68e2c600937ac2c4eedabdfd5ef1e5b7ea4f3fba84497b" dependencies = [ "heck", - "itertools", + "itertools 0.13.0", "log", "multimap", "once_cell", @@ -2547,7 +2556,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "157c5a9d7ea5c2ed2d9fb8f495b64759f7816c7eaea54ba3978f0d63000162e3" dependencies = [ "anyhow", - "itertools", + "itertools 0.13.0", "proc-macro2", "quote", "syn", @@ -3155,6 +3164,7 @@ dependencies = [ "async-trait", "bitcoin", "futures", + "itertools 0.14.0", "libp2p", "musig2", "prost", diff --git a/crates/db/src/lib.rs b/crates/db/src/lib.rs index 12a43ab..f940f5d 100644 --- a/crates/db/src/lib.rs +++ b/crates/db/src/lib.rs @@ -41,6 +41,9 @@ pub trait Repository: Send + Sync + 'static { async fn get_raw(&self, key: String) -> DBResult>>; async fn set_raw(&self, key: String, value: Vec) -> DBResult<()>; + /// Delete all values with provided keys. + async fn delete_raw(&self, keys: Vec) -> DBResult<()>; + /// Set new value if it wasn't there before. Default implementation is /// just `get`+`set`, but some databases may have more optimized /// implementation in one go. @@ -112,6 +115,19 @@ where self.set_if_not_exists(key, entry).await } + /// Delete multiple entries of partial signatures from storage by pairs of + /// operator pubkey and session ids. + async fn delete_partial_signatures( + &self, + keys: &[(&OperatorPubKey, &SessionId)], + ) -> DBResult<()> { + let keys = keys + .iter() + .map(|(key, id)| format!("sigs-{key}_{id}")) + .collect::>(); + self.delete_raw(keys).await + } + async fn get_pub_nonces( &self, operator_pk: &OperatorPubKey, @@ -130,6 +146,16 @@ where self.set_if_not_exists(key, entry).await } + /// Delete multiple entries of pub nonces from storage by pairs of + /// operator pubkey and session ids. + async fn delete_pub_nonces(&self, keys: &[(&OperatorPubKey, &SessionId)]) -> DBResult<()> { + let keys = keys + .iter() + .map(|(key, id)| format!("nonces-{key}_{id}")) + .collect::>(); + self.delete_raw(keys).await + } + async fn get_deposit_setup( &self, operator_pk: &OperatorPubKey, @@ -148,6 +174,16 @@ where self.set_if_not_exists(key, setup).await } + /// Delete multiple entries of deposit setups from storage by pairs of + /// operator pubkey and session ids. + async fn delete_deposit_setups(&self, keys: &[(&OperatorPubKey, &Scope)]) -> DBResult<()> { + let keys = keys + .iter() + .map(|(key, scope)| format!("setup-{key}_{scope}")) + .collect::>(); + self.delete_raw(keys).await + } + async fn get_genesis_info( &self, operator_pk: &OperatorPubKey, diff --git a/crates/db/src/sled.rs b/crates/db/src/sled.rs index bc6a337..70f7a26 100644 --- a/crates/db/src/sled.rs +++ b/crates/db/src/sled.rs @@ -52,6 +52,21 @@ impl Repository for AsyncDB { rx.await?.map_err(Into::into) } + async fn delete_raw(&self, keys: Vec) -> DBResult<()> { + let (tx, rx) = oneshot::channel(); + + let db = self.db.clone(); + self.pool.execute(move || { + let result = delete_all_by_key(db, keys); + + if tx.send(result).is_err() { + warn!("Receiver channel hanged up or dropped"); + } + }); + + rx.await?.map_err(Into::into) + } + async fn set_raw_if_not_exists(&self, key: String, value: Vec) -> DBResult { let (tx, rx) = oneshot::channel(); @@ -84,6 +99,14 @@ fn set_if_not_exist(db: Arc, key: String, value: Vec) -> Result, keys: Vec) -> Result<(), sled::Error> { + for key in keys { + db.remove(key)?; + } + + Ok(()) +} + impl From for RepositoryError { fn from(value: sled::Error) -> Self { RepositoryError::Storage(value.into()) diff --git a/crates/p2p/Cargo.toml b/crates/p2p/Cargo.toml index a71f251..5da1174 100644 --- a/crates/p2p/Cargo.toml +++ b/crates/p2p/Cargo.toml @@ -27,6 +27,7 @@ tokio = { workspace = true, features = ["macros", "time"] } tokio-util.workspace = true tracing.workspace = true +itertools = "0.14.0" strata-p2p-db.path = "../db" strata-p2p-types.path = "../types" strata-p2p-wire.path = "../wire" diff --git a/crates/p2p/src/commands.rs b/crates/p2p/src/commands.rs index e3b2289..503f752 100644 --- a/crates/p2p/src/commands.rs +++ b/crates/p2p/src/commands.rs @@ -17,6 +17,9 @@ pub enum Command { /// Request some message directly from other operator by peer id. RequestMessage(GetMessageRequest), + + /// Clean session, scopes from internal DB. + CleanStorage(CleanStorageCommand), } #[derive(Debug, Clone)] @@ -113,3 +116,53 @@ impl From> Self::PublishMessage(v) } } + +/// Command P2P to clean entries from internal key-value storage by +/// session IDs, scopes and operator pubkeys. +#[derive(Debug, Clone)] +pub struct CleanStorageCommand { + pub scopes: Vec, + pub session_ids: Vec, + pub operators: Vec, +} + +impl CleanStorageCommand { + pub const fn new( + scopes: Vec, + session_ids: Vec, + operators: Vec, + ) -> Self { + Self { + scopes, + session_ids, + operators, + } + } + + /// Clean entries only by scope and operators from storage. + pub const fn with_scopes(scopes: Vec, operators: Vec) -> Self { + Self { + scopes, + session_ids: Vec::new(), + operators, + } + } + + /// Clean entries only by session IDs and operators from storage. + pub const fn with_session_ids( + session_ids: Vec, + operators: Vec, + ) -> Self { + Self { + scopes: Vec::new(), + session_ids, + operators, + } + } +} + +impl From for Command { + fn from(v: CleanStorageCommand) -> Self { + Self::CleanStorage(v) + } +} diff --git a/crates/p2p/src/swarm/handle.rs b/crates/p2p/src/swarm/handle.rs index 24f9fd6..2c1600b 100644 --- a/crates/p2p/src/swarm/handle.rs +++ b/crates/p2p/src/swarm/handle.rs @@ -31,8 +31,8 @@ where } /// Send command to P2P. - pub async fn send_command(&self, command: Command) { - let _ = self.commands.send(command).await; + pub async fn send_command(&self, command: impl Into>) { + let _ = self.commands.send(command.into()).await; } /// Get next event from P2P from events channel. diff --git a/crates/p2p/src/swarm/mod.rs b/crates/p2p/src/swarm/mod.rs index e9e5ad3..3e9e2c5 100644 --- a/crates/p2p/src/swarm/mod.rs +++ b/crates/p2p/src/swarm/mod.rs @@ -4,6 +4,7 @@ use behavior::{Behaviour, BehaviourEvent}; use bitcoin::hashes::Hash; use futures::StreamExt as _; use handle::P2PHandle; +use itertools::iproduct; use libp2p::{ core::{muxing::StreamMuxerBox, transport::MemoryTransport, ConnectedPoint}, gossipsub::{Event as GossipsubEvent, Message, MessageAcceptance, MessageId, Sha256Topic}, @@ -456,6 +457,30 @@ where .send_request(&peer, request.clone()); } + Ok(()) + } + Command::CleanStorage(cmd) => { + // Get cartesian product of all provided operators and session ids + let operator_session_id_pairs = + iproduct!(&cmd.operators, &cmd.session_ids).collect::>(); + + if !operator_session_id_pairs.is_empty() { + self.db + .delete_partial_signatures(&operator_session_id_pairs) + .await?; + self.db + .delete_pub_nonces(&operator_session_id_pairs) + .await?; + } + + // Get cartesian product of all provided operators and scopes + let operator_scope_pairs = + iproduct!(&cmd.operators, &cmd.scopes).collect::>(); + + if !operator_scope_pairs.is_empty() { + self.db.delete_deposit_setups(&operator_scope_pairs).await?; + } + Ok(()) } } diff --git a/crates/p2p/tests/gossipsub.rs b/crates/p2p/tests/gossipsub.rs index 14f3db0..fa07b98 100644 --- a/crates/p2p/tests/gossipsub.rs +++ b/crates/p2p/tests/gossipsub.rs @@ -8,7 +8,7 @@ use libp2p::{ PeerId, }; use strata_p2p::{ - commands::{Command, UnsignedPublishMessage}, + commands::{CleanStorageCommand, Command, UnsignedPublishMessage}, events::Event, swarm::handle::P2PHandle, }; @@ -267,6 +267,71 @@ async fn test_all_to_all_multiple_scopes() -> anyhow::Result<()> { Ok(()) } +#[tokio::test] +async fn test_operator_cleans_entries_correctly_at_command() -> anyhow::Result<()> { + const OPERATORS_NUM: usize = 2; + + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + let Setup { + mut operators, + cancel, + tasks, + } = Setup::all_to_all(OPERATORS_NUM).await?; + + let scope = Scope::hash(b"scope"); + let session_id = SessionId::hash(b"session_id"); + + exchange_genesis_info(&mut operators, OPERATORS_NUM).await?; + exchange_deposit_setup(&mut operators, OPERATORS_NUM, scope).await?; + exchange_deposit_nonces(&mut operators, OPERATORS_NUM, session_id).await?; + exchange_deposit_sigs(&mut operators, OPERATORS_NUM, session_id).await?; + + let other_operator_pubkey = OperatorPubKey::from(operators[0].kp.public().to_bytes().to_vec()); + let last_operator = &mut operators[1]; + last_operator + .handle + .send_command(CleanStorageCommand::new( + vec![scope], + vec![session_id], + vec![other_operator_pubkey.clone()], + )) + .await; + + cancel.cancel(); + tasks.wait().await; + + // Check that storage is empty after that. + let setup_entry = >::get_deposit_setup( + &last_operator.db, + &other_operator_pubkey, + scope, + ) + .await?; + assert!(setup_entry.is_none()); + + let nonces_entry = >::get_pub_nonces( + &last_operator.db, + &other_operator_pubkey, + session_id, + ) + .await?; + assert!(nonces_entry.is_none()); + + let sigs_entry = >::get_partial_signatures( + &last_operator.db, + &other_operator_pubkey, + session_id, + ) + .await?; + assert!(sigs_entry.is_none()); + + Ok(()) +} + async fn exchange_genesis_info( operators: &mut [OperatorHandle], operators_num: usize,