Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove TimeoutManager and add command for requesting messages for P2PHandle #18

Merged
merged 5 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ serde_json = "1.0.135"
threadpool.workspace = true
tokio = { workspace = true, features = ["sync"] }

libp2p-identity = { workspace = true, features = ["serde"] }

[dev-dependencies]
tokio = { workspace = true, features = ["rt"] }
rand = "0.8.5"
Expand Down
103 changes: 64 additions & 39 deletions crates/db/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
use async_trait::async_trait;
use bitcoin::{hashes::sha256, OutPoint, XOnlyPublicKey};
use libp2p_identity::PeerId;
use musig2::{PartialSignature, PubNonce};
use serde::{de::DeserializeOwned, Serialize};
use strata_p2p_types::OperatorPubKey;
use thiserror::Error;

use crate::states::PeerDepositState;

pub mod sled;

pub mod states;

mod prost_serde;
pub mod sled;

pub type DBResult<T> = Result<T, RepositoryError>;

Expand Down Expand Up @@ -45,6 +41,30 @@ pub trait Repository: Send + Sync + 'static {
async fn get_raw(&self, key: String) -> DBResult<Option<Vec<u8>>>;
async fn set_raw(&self, key: String, value: Vec<u8>) -> DBResult<()>;

/// Set new value if it wasn't there before. Default implementation is
/// just `get`+`set`, but some databases may have more optimized
/// implementation in one go.
///
/// Returns `true` if `value` wasn't there before.
async fn set_raw_if_not_exists(&self, key: String, value: Vec<u8>) -> DBResult<bool> {
if self.get_raw(key.clone()).await?.is_some() {
return Ok(false);
}
self.set_raw(key, value).await?;

Ok(true)
}

async fn set_if_not_exists<T>(&self, key: String, value: T) -> DBResult<bool>
where
T: Serialize + Send + Sync + 'static,
{
let mut buf = Vec::new();
serde_json::to_writer(&mut buf, &value)?;

self.set_raw_if_not_exists(key, buf).await
}

async fn get<T: DeserializeOwned>(&self, key: String) -> DBResult<Option<T>> {
let bytes = self.get_raw(key).await?;
let Some(bytes) = bytes else {
Expand Down Expand Up @@ -83,13 +103,13 @@ where
self.get(key).await
}

async fn set_partial_signatures(
async fn set_partial_signatures_if_not_exists(
&self,
scope: sha256::Hash,
entry: PartialSignaturesEntry,
) -> DBResult<()> {
) -> DBResult<bool> {
let key = format!("sigs-{}_{scope}", entry.key);
self.set(key, entry).await
self.set_if_not_exists(key, entry).await
}

async fn get_pub_nonces(
Expand All @@ -101,9 +121,13 @@ where
self.get(key).await
}

async fn set_pub_nonces(&self, scope: sha256::Hash, entry: NoncesEntry) -> DBResult<()> {
async fn set_pub_nonces_if_not_exist(
&self,
scope: sha256::Hash,
entry: NoncesEntry,
) -> DBResult<bool> {
let key = format!("nonces-{}_{scope}", entry.key);
self.set(key, entry).await
self.set_if_not_exists(key, entry).await
}

async fn get_deposit_setup(
Expand All @@ -115,50 +139,51 @@ where
self.get(key).await
}

async fn set_deposit_setup(
async fn set_deposit_setup_if_not_exists(
&self,
scope: sha256::Hash,
setup: DepositSetupEntry<DepositSetupPayload>,
) -> DBResult<()> {
) -> DBResult<bool> {
let key = format!("setup-{}_{scope}", setup.key);
self.set(key, setup).await
self.set_if_not_exists(key, setup).await
}

async fn get_peer_deposit_status(
async fn get_genesis_info(
&self,
operator_pk: &OperatorPubKey,
scope: sha256::Hash,
) -> DBResult<PeerDepositState> {
if self
.get_partial_signatures(operator_pk, scope)
.await?
.is_some()
{
return Ok(PeerDepositState::Sigs);
}

if self.get_pub_nonces(operator_pk, scope).await?.is_some() {
return Ok(PeerDepositState::Nonces);
}

if self.get_deposit_setup(operator_pk, scope).await?.is_some() {
return Ok(PeerDepositState::Setup);
}
) -> DBResult<Option<GenesisInfoEntry>> {
let key = format!("genesis-{operator_pk}");
self.get(key).await
}

Ok(PeerDepositState::PreSetup)
async fn set_genesis_info_if_not_exists(&self, info: GenesisInfoEntry) -> DBResult<bool> {
let key = format!("genesis-{}", info.key);
self.set_if_not_exists(key, info).await
}

async fn get_genesis_info(
/* P2P stores mapping of Musig2 exchange signers (operators) to node peer
id that publishes messages. These methods store and retrieve this
mapping: */

/// Get peer id of node, that distributed message signed by operator
/// pubkey.
async fn get_peer_by_signer_pubkey(
&self,
operator_pk: &OperatorPubKey,
) -> DBResult<Option<GenesisInfoEntry>> {
let key = format!("genesis-{operator_pk}");
) -> DBResult<Option<PeerId>> {
let key = format!("signers=>peerid-{}", operator_pk);
self.get(key).await
}

async fn set_genesis_info(&self, info: GenesisInfoEntry) -> DBResult<()> {
let key = format!("genesis-{}", info.key);
self.set(key, info).await
/// Store peer id of node, that distributed message signed by this
/// operator.
async fn set_peer_for_signer_pubkey(
&self,
operator_pk: &OperatorPubKey,
peer_id: PeerId,
) -> DBResult<()> {
let key = format!("signers=>peerid-{}", operator_pk);
self.set(key, peer_id).await
}
}

Expand Down
33 changes: 30 additions & 3 deletions crates/db/src/sled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,21 @@ impl Repository for AsyncDB {

rx.await?.map_err(Into::into)
}

async fn set_raw_if_not_exists(&self, key: String, value: Vec<u8>) -> DBResult<bool> {
let (tx, rx) = oneshot::channel();

let db = self.db.clone();
self.pool.execute(move || {
let result = set_if_not_exist(db, key, value);

if tx.send(result).is_err() {
warn!("Receiver channel hanged up or dropped");
}
});

rx.await?.map_err(Into::into)
}
}

impl From<RecvError> for RepositoryError {
Expand All @@ -58,6 +73,16 @@ impl From<RecvError> for RepositoryError {
}
}

fn set_if_not_exist(db: Arc<sled::Db>, key: String, value: Vec<u8>) -> Result<bool, sled::Error> {
if db.get(key.clone())?.is_some() {
return Ok(false);
}

db.insert(key, value)?;

Ok(true)
}

impl From<sled::Error> for RepositoryError {
fn from(value: sled::Error) -> Self {
RepositoryError::Storage(value.into())
Expand Down Expand Up @@ -110,7 +135,9 @@ mod tests {
key: operator_pk.clone(),
};

db.set_pub_nonces(scope, nonces_entry).await.unwrap();
db.set_pub_nonces_if_not_exist(scope, nonces_entry)
.await
.unwrap();

let agg_nonce = AggNonce::sum([pub_nonce.clone()]);
let ctx = KeyAggContext::new([keypair.public_key()]).unwrap();
Expand All @@ -124,7 +151,7 @@ mod tests {
key: operator_pk.clone(),
};

db.set_partial_signatures(scope, sigs_entry)
db.set_partial_signatures_if_not_exists(scope, sigs_entry)
.await
.expect("Failed to set signature");

Expand All @@ -144,7 +171,7 @@ mod tests {
key: operator_pk.clone(),
};

db.set_genesis_info(entry).await.unwrap();
db.set_genesis_info_if_not_exists(entry).await.unwrap();

let GenesisInfoEntry {
entry: (got_op, got_keys),
Expand Down
10 changes: 0 additions & 10 deletions crates/db/src/states.rs

This file was deleted.

1 change: 1 addition & 0 deletions crates/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ futures.workspace = true
musig2 = { workspace = true, features = ["serde"] }
prost.workspace = true
thiserror.workspace = true
threadpool.workspace = true
tokio = { workspace = true, features = ["macros", "time"] }
tokio-util.workspace = true
tracing.workspace = true
Expand Down
Loading
Loading