Add CleanStorage command for P2P
`CleanStorageCommand` deletes all entries from storage by session
IDs, scopes and operators' pubkeys.

To support this, a `delete_raw` method was added to `Repository`, which
deletes multiple entries at once given an array of keys.

The implementation builds Cartesian products from the sets of operators'
pubkeys and session IDs, and of operators' pubkeys and scopes, then
deletes the matching entries in bulk, but sequentially for each type
of entry (nonces, signatures or deposit setups).
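For illustration, a rough sketch of that flow (the actual handler added in this commit lives in `crates/p2p/src/swarm/mod.rs`; the import paths and the `RepositoryExt<()>` bound here are assumptions):

```rust
use itertools::iproduct;
use strata_p2p::commands::CleanStorageCommand; // path as used by the tests below
use strata_p2p_db::{DBResult, RepositoryExt};  // assumed export paths

// Sketch: build the two Cartesian products, then delete each entry kind in bulk.
async fn clean_storage(db: &impl RepositoryExt<()>, cmd: &CleanStorageCommand) -> DBResult<()> {
    // operators × session IDs covers partial signatures and pub nonces.
    let operator_session_id_pairs =
        iproduct!(&cmd.operators, &cmd.session_ids).collect::<Vec<_>>();
    if !operator_session_id_pairs.is_empty() {
        db.delete_partial_signatures(&operator_session_id_pairs).await?;
        db.delete_pub_nonces(&operator_session_id_pairs).await?;
    }

    // operators × scopes covers deposit setups.
    let operator_scope_pairs = iproduct!(&cmd.operators, &cmd.scopes).collect::<Vec<_>>();
    if !operator_scope_pairs.is_empty() {
        db.delete_deposit_setups(&operator_scope_pairs).await?;
    }

    Ok(())
}
```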
Velnbur committed Jan 24, 2025
1 parent 02fa63a commit 4f029b8
Showing 8 changed files with 218 additions and 5 deletions.
14 changes: 12 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

36 changes: 36 additions & 0 deletions crates/db/src/lib.rs
@@ -41,6 +41,9 @@ pub trait Repository: Send + Sync + 'static {
async fn get_raw(&self, key: String) -> DBResult<Option<Vec<u8>>>;
async fn set_raw(&self, key: String, value: Vec<u8>) -> DBResult<()>;

/// Delete all values with provided keys.
async fn delete_raw(&self, keys: Vec<String>) -> DBResult<()>;

/// Set new value if it wasn't there before. Default implementation is
/// just `get`+`set`, but some databases may have more optimized
/// implementation in one go.
@@ -112,6 +115,19 @@ where
self.set_if_not_exists(key, entry).await
}

/// Delete multiple entries of partial signatures from storage by pairs of
/// operator pubkey and session ids.
async fn delete_partial_signatures(
&self,
keys: &[(&OperatorPubKey, &SessionId)],
) -> DBResult<()> {
let keys = keys
.iter()
.map(|(key, id)| format!("sigs-{key}_{id}"))
.collect::<Vec<_>>();
self.delete_raw(keys).await
}

async fn get_pub_nonces(
&self,
operator_pk: &OperatorPubKey,
@@ -130,6 +146,16 @@
self.set_if_not_exists(key, entry).await
}

/// Delete multiple entries of pub nonces from storage by pairs of
/// operator pubkey and session ids.
async fn delete_pub_nonces(&self, keys: &[(&OperatorPubKey, &SessionId)]) -> DBResult<()> {
let keys = keys
.iter()
.map(|(key, id)| format!("nonces-{key}_{id}"))
.collect::<Vec<_>>();
self.delete_raw(keys).await
}

async fn get_deposit_setup(
&self,
operator_pk: &OperatorPubKey,
@@ -148,6 +174,16 @@
self.set_if_not_exists(key, setup).await
}

/// Delete multiple entries of deposit setups from storage by pairs of
/// operator pubkey and scope.
async fn delete_deposit_setups(&self, keys: &[(&OperatorPubKey, &Scope)]) -> DBResult<()> {
let keys = keys
.iter()
.map(|(key, scope)| format!("setup-{key}_{scope}"))
.collect::<Vec<_>>();
self.delete_raw(keys).await
}

async fn get_genesis_info(
&self,
operator_pk: &OperatorPubKey,
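For context, `delete_raw` is the only new required method on `Repository`; the `delete_partial_signatures`, `delete_pub_nonces` and `delete_deposit_setups` helpers above are provided methods that only format keys and forward to it. A hypothetical in-memory backend (not part of this commit; error type simplified) could satisfy the new requirement along these lines:

```rust
use std::{collections::HashMap, sync::Mutex};

// Hypothetical in-memory store, only to illustrate the `delete_raw` contract;
// the commit's real backend is the sled-based `AsyncDB` below.
struct MemoryDB {
    entries: Mutex<HashMap<String, Vec<u8>>>,
}

impl MemoryDB {
    // Mirrors `Repository::delete_raw`: remove every provided key; keys that
    // are already absent are silently skipped.
    async fn delete_raw(&self, keys: Vec<String>) -> Result<(), String> {
        let mut entries = self.entries.lock().map_err(|e| e.to_string())?;
        for key in keys {
            entries.remove(&key);
        }
        Ok(())
    }
}
```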
23 changes: 23 additions & 0 deletions crates/db/src/sled.rs
@@ -52,6 +52,21 @@ impl Repository for AsyncDB {
rx.await?.map_err(Into::into)
}

async fn delete_raw(&self, keys: Vec<String>) -> DBResult<()> {
let (tx, rx) = oneshot::channel();

let db = self.db.clone();
self.pool.execute(move || {
let result = delete_all_by_key(db, keys);

if tx.send(result).is_err() {
warn!("Receiver channel hanged up or dropped");
}
});

rx.await?.map_err(Into::into)
}

async fn set_raw_if_not_exists(&self, key: String, value: Vec<u8>) -> DBResult<bool> {
let (tx, rx) = oneshot::channel();

@@ -84,6 +99,14 @@ fn set_if_not_exist(db: Arc<sled::Db>, key: String, value: Vec<u8>) -> Result<bo
Ok(true)
}

fn delete_all_by_key(db: Arc<sled::Db>, keys: Vec<String>) -> Result<(), sled::Error> {
for key in keys {
db.remove(key)?;
}

Ok(())
}

impl From<sled::Error> for RepositoryError {
fn from(value: sled::Error) -> Self {
RepositoryError::Storage(value.into())
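A design note on the sled side: `delete_all_by_key` issues one `remove` per key, so a failure partway through leaves the earlier removals applied. If atomic cleanup ever matters, sled's `Batch` could stage the removals and apply them in one call; a sketch of that alternative (not what this commit does):

```rust
use std::sync::Arc;

// Alternative to the per-key loop above: stage all removals in a sled `Batch`
// and apply them atomically.
fn delete_all_by_key_batched(db: Arc<sled::Db>, keys: Vec<String>) -> Result<(), sled::Error> {
    let mut batch = sled::Batch::default();
    for key in keys {
        // Staged only; nothing is written until `apply_batch`.
        batch.remove(key.as_bytes());
    }
    db.apply_batch(batch)
}
```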
1 change: 1 addition & 0 deletions crates/p2p/Cargo.toml
@@ -27,6 +27,7 @@ tokio = { workspace = true, features = ["macros", "time"] }
tokio-util.workspace = true
tracing.workspace = true

itertools = "0.14.0"
strata-p2p-db.path = "../db"
strata-p2p-types.path = "../types"
strata-p2p-wire.path = "../wire"
53 changes: 53 additions & 0 deletions crates/p2p/src/commands.rs
@@ -17,6 +17,9 @@ pub enum Command<DepositSetupPayload> {

/// Request some message directly from other operator by peer id.
RequestMessage(GetMessageRequest),

/// Clean entries from the internal DB by session IDs, scopes and operator pubkeys.
CleanStorage(CleanStorageCommand),
}

#[derive(Debug, Clone)]
@@ -113,3 +116,53 @@ impl<DepositSetupPayload> From<PublishMessage<DepositSetupPayload>>
Self::PublishMessage(v)
}
}

/// Command the P2P node to clean entries from its internal key-value storage by
/// session IDs, scopes and operator pubkeys.
#[derive(Debug, Clone)]
pub struct CleanStorageCommand {
pub scopes: Vec<Scope>,
pub session_ids: Vec<SessionId>,
pub operators: Vec<OperatorPubKey>,
}

impl CleanStorageCommand {
pub const fn new(
scopes: Vec<Scope>,
session_ids: Vec<SessionId>,
operators: Vec<OperatorPubKey>,
) -> Self {
Self {
scopes,
session_ids,
operators,
}
}

/// Clean entries only by scope and operators from storage.
pub const fn with_scopes(scopes: Vec<Scope>, operators: Vec<OperatorPubKey>) -> Self {
Self {
scopes,
session_ids: Vec::new(),
operators,
}
}

/// Clean entries only by session IDs and operators from storage.
pub const fn with_session_ids(
session_ids: Vec<SessionId>,
operators: Vec<OperatorPubKey>,
) -> Self {
Self {
scopes: Vec::new(),
session_ids,
operators,
}
}
}

impl<DSP> From<CleanStorageCommand> for Command<DSP> {
fn from(v: CleanStorageCommand) -> Self {
Self::CleanStorage(v)
}
}
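A short usage sketch of the three constructors (the `Scope`/`SessionId` values mirror the gossipsub test below; the operator pubkey and type paths are placeholders/assumptions):

```rust
use strata_p2p::commands::CleanStorageCommand;            // as imported by the tests below
use strata_p2p_types::{OperatorPubKey, Scope, SessionId}; // assumed type paths

let scopes = vec![Scope::hash(b"scope")];
let session_ids = vec![SessionId::hash(b"session_id")];
// Placeholder pubkey bytes, only for illustration.
let operator = OperatorPubKey::from(vec![0u8; 32]);

// Clean everything matching both cross-products...
let all = CleanStorageCommand::new(scopes.clone(), session_ids.clone(), vec![operator.clone()]);
// ...or only deposit setups (by scope), or only nonces and signatures (by session ID).
let setups_only = CleanStorageCommand::with_scopes(scopes, vec![operator.clone()]);
let sessions_only = CleanStorageCommand::with_session_ids(session_ids, vec![operator]);
```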
4 changes: 2 additions & 2 deletions crates/p2p/src/swarm/handle.rs
@@ -31,8 +31,8 @@ where
}

/// Send command to P2P.
pub async fn send_command(&self, command: Command<DSP>) {
let _ = self.commands.send(command).await;
pub async fn send_command(&self, command: impl Into<Command<DSP>>) {
let _ = self.commands.send(command.into()).await;
}

/// Get next event from P2P from events channel.
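Since the hunk above relaxes the parameter from `Command<DSP>` to `impl Into<Command<DSP>>`, and `CleanStorageCommand` converts into the command enum via `From`, callers can pass it to `send_command` without wrapping it themselves; roughly (assuming an operator's `handle`, as set up in the gossipsub test below):

```rust
// `handle` is assumed to be an operator's P2PHandle and `operator_pubkey` a real
// operator key; the command converts into `Command` via `Into` automatically.
handle
    .send_command(CleanStorageCommand::with_session_ids(
        vec![SessionId::hash(b"session_id")],
        vec![operator_pubkey.clone()],
    ))
    .await;
```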
25 changes: 25 additions & 0 deletions crates/p2p/src/swarm/mod.rs
@@ -4,6 +4,7 @@ use behavior::{Behaviour, BehaviourEvent};
use bitcoin::hashes::Hash;
use futures::StreamExt as _;
use handle::P2PHandle;
use itertools::iproduct;
use libp2p::{
core::{muxing::StreamMuxerBox, transport::MemoryTransport, ConnectedPoint},
gossipsub::{Event as GossipsubEvent, Message, MessageAcceptance, MessageId, Sha256Topic},
@@ -456,6 +457,30 @@ where
.send_request(&peer, request.clone());
}

Ok(())
}
Command::CleanStorage(cmd) => {
// Get cartesian product of all provided operators and session ids
let operator_session_id_pairs =
iproduct!(&cmd.operators, &cmd.session_ids).collect::<Vec<_>>();

if !operator_session_id_pairs.is_empty() {
self.db
.delete_partial_signatures(&operator_session_id_pairs)
.await?;
self.db
.delete_pub_nonces(&operator_session_id_pairs)
.await?;
}

// Get cartesian product of all provided operators and scopes
let operator_scope_pairs =
iproduct!(&cmd.operators, &cmd.scopes).collect::<Vec<_>>();

if !operator_scope_pairs.is_empty() {
self.db.delete_deposit_setups(&operator_scope_pairs).await?;
}

Ok(())
}
}
67 changes: 66 additions & 1 deletion crates/p2p/tests/gossipsub.rs
@@ -8,7 +8,7 @@ use libp2p::{
PeerId,
};
use strata_p2p::{
commands::{Command, UnsignedPublishMessage},
commands::{CleanStorageCommand, Command, UnsignedPublishMessage},
events::Event,
swarm::handle::P2PHandle,
};
@@ -267,6 +267,71 @@ async fn test_all_to_all_multiple_scopes() -> anyhow::Result<()> {
Ok(())
}

#[tokio::test]
async fn test_operator_cleans_entries_correctly_at_command() -> anyhow::Result<()> {
const OPERATORS_NUM: usize = 2;

tracing_subscriber::registry()
.with(fmt::layer())
.with(EnvFilter::from_default_env())
.init();

let Setup {
mut operators,
cancel,
tasks,
} = Setup::all_to_all(OPERATORS_NUM).await?;

let scope = Scope::hash(b"scope");
let session_id = SessionId::hash(b"session_id");

exchange_genesis_info(&mut operators, OPERATORS_NUM).await?;
exchange_deposit_setup(&mut operators, OPERATORS_NUM, scope).await?;
exchange_deposit_nonces(&mut operators, OPERATORS_NUM, session_id).await?;
exchange_deposit_sigs(&mut operators, OPERATORS_NUM, session_id).await?;

let other_operator_pubkey = OperatorPubKey::from(operators[0].kp.public().to_bytes().to_vec());
let last_operator = &mut operators[1];
last_operator
.handle
.send_command(CleanStorageCommand::new(
vec![scope],
vec![session_id],
vec![other_operator_pubkey.clone()],
))
.await;

cancel.cancel();
tasks.wait().await;

// Check that storage is empty after that.
let setup_entry = <AsyncDB as RepositoryExt<()>>::get_deposit_setup(
&last_operator.db,
&other_operator_pubkey,
scope,
)
.await?;
assert!(setup_entry.is_none());

let nonces_entry = <AsyncDB as RepositoryExt<()>>::get_pub_nonces(
&last_operator.db,
&other_operator_pubkey,
session_id,
)
.await?;
assert!(nonces_entry.is_none());

let sigs_entry = <AsyncDB as RepositoryExt<()>>::get_partial_signatures(
&last_operator.db,
&other_operator_pubkey,
session_id,
)
.await?;
assert!(sigs_entry.is_none());

Ok(())
}

async fn exchange_genesis_info(
operators: &mut [OperatorHandle],
operators_num: usize,
