Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compact block cache pt1 #158

Merged
merged 22 commits into from
Jan 20, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
changes from code review
AloeareV committed Jan 20, 2025

Verified

This commit was signed with the committer’s verified signature.
nicosomb Nicolas Lœuillet
commit a7753d8d714240eda50f30ee89e7f40bfd5b1403
2 changes: 1 addition & 1 deletion integration-tests/tests/wallet_to_validator.rs
Original file line number Diff line number Diff line change
@@ -217,7 +217,7 @@ mod wallet_basic {
)
.await
.unwrap();


dbg!(unfinalised_transactions.clone());

6 changes: 1 addition & 5 deletions zaino-state/src/fetch.rs
Original file line number Diff line number Diff line change
@@ -393,11 +393,7 @@ impl ZcashIndexer for FetchServiceSubscriber {
) -> Result<GetSubtrees, Self::Error> {
Ok(self
.fetcher
.get_subtrees_by_index(
pool,
start_index.0,
limit.map(|limit_index| limit_index.0),
)
.get_subtrees_by_index(pool, start_index.0, limit.map(|limit_index| limit_index.0))
.await?
.into())
}
130 changes: 68 additions & 62 deletions zaino-state/src/local_cache/finalised_state.rs
Original file line number Diff line number Diff line change
@@ -149,22 +149,29 @@ impl FinalisedState {
};

finalised_state.sync_db_from_reorg().await?;

finalised_state.write_task_handle =
Some(finalised_state.spawn_writer(block_receiver).await?);

finalised_state.read_task_handle = Some(finalised_state.spawn_reader(request_rx).await?);
finalised_state.spawn_writer(block_receiver).await?;
finalised_state.spawn_reader(request_rx).await?;

finalised_state.status.store(StatusType::Ready.into());

Ok(finalised_state)
}

async fn spawn_writer(
&self,
&mut self,
mut block_receiver: tokio::sync::mpsc::Receiver<(Height, Hash, CompactBlock)>,
) -> Result<tokio::task::JoinHandle<()>, FinalisedStateError> {
let finalised_state = self.clone();
) -> Result<(), FinalisedStateError> {
let finalised_state = Self {
fetcher: self.fetcher.clone(),
database: Arc::clone(&self.database),
heights_to_hashes: self.heights_to_hashes,
hashes_to_blocks: self.hashes_to_blocks,
request_sender: self.request_sender.clone(),
read_task_handle: None,
write_task_handle: None,
status: self.status.clone(),
config: self.config.clone(),
};

let writer_handle = tokio::spawn(async move {
while let Some((height, mut hash, mut compact_block)) = block_receiver.recv().await {
@@ -262,14 +269,25 @@ impl FinalisedState {
}
});

Ok(writer_handle)
self.write_task_handle = Some(writer_handle);
Ok(())
}

async fn spawn_reader(
&self,
&mut self,
mut request_receiver: tokio::sync::mpsc::Receiver<DbRequest>,
) -> Result<tokio::task::JoinHandle<()>, FinalisedStateError> {
let finalised_state = self.clone();
) -> Result<(), FinalisedStateError> {
let finalised_state = Self {
fetcher: self.fetcher.clone(),
database: Arc::clone(&self.database),
heights_to_hashes: self.heights_to_hashes,
hashes_to_blocks: self.hashes_to_blocks,
request_sender: self.request_sender.clone(),
read_task_handle: None,
write_task_handle: None,
status: self.status.clone(),
config: self.config.clone(),
};

let reader_handle = tokio::spawn(async move {
while let Some(DbRequest {
@@ -291,7 +309,8 @@ impl FinalisedState {
}
});

Ok(reader_handle)
self.read_task_handle = Some(reader_handle);
Ok(())
}

/// Syncs database with the server,
@@ -396,45 +415,48 @@ impl FinalisedState {
// Wait for server to sync to with p2p network and sync new blocks.
if !self.config.network.is_regtest() && !self.config.no_sync {
self.status.store(StatusType::Syncing.into());
loop {
let blockchain_info = self.fetcher.get_blockchain_info().await?;
let server_height = blockchain_info.blocks.0;
for block_height in (sync_height + 1)..(server_height - 99) {
if self.get_hash(block_height).is_ok() {
self.delete_block(Height(block_height))?;
}
loop {
match fetch_block_from_node(
&self.fetcher,
HashOrHeight::Height(Height(block_height)),
)
.await
{
Ok((hash, block)) => {
self.insert_block((Height(block_height), hash, block))?;
break;
}
Err(e) => {
self.status.store(StatusType::RecoverableError.into());
eprintln!("{e}");
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
loop {
let blockchain_info = self.fetcher.get_blockchain_info().await?;
let server_height = blockchain_info.blocks.0;
for block_height in (sync_height + 1)..(server_height - 99) {
if self.get_hash(block_height).is_ok() {
self.delete_block(Height(block_height))?;
}
loop {
match fetch_block_from_node(
&self.fetcher,
HashOrHeight::Height(Height(block_height)),
)
.await
{
Ok((hash, block)) => {
self.insert_block((Height(block_height), hash, block))?;
break;
}
Err(e) => {
self.status.store(StatusType::RecoverableError.into());
eprintln!("{e}");
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
}
}
sync_height = server_height - 99;
if (blockchain_info.blocks.0 as i64 - blockchain_info.estimated_height.0 as i64).abs() <= 10 {
break;
} else {
println!(" - Validator syncing with network. ZainoDB chain height: {}, Validator chain height: {}, Estimated Network chain height: {}",
}
sync_height = server_height - 99;
if (blockchain_info.blocks.0 as i64 - blockchain_info.estimated_height.0 as i64)
.abs()
<= 10
{
break;
} else {
println!(" - Validator syncing with network. ZainoDB chain height: {}, Validator chain height: {}, Estimated Network chain height: {}",
&sync_height,
&blockchain_info.blocks.0,
&blockchain_info.estimated_height.0
&blockchain_info.blocks.0,
&blockchain_info.estimated_height.0
);
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
continue;
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
continue;
}
}
}

Ok(())
@@ -588,22 +610,6 @@ impl Drop for FinalisedState {
}
}

impl Clone for FinalisedState {
fn clone(&self) -> Self {
Self {
fetcher: self.fetcher.clone(),
database: Arc::clone(&self.database),
heights_to_hashes: self.heights_to_hashes,
hashes_to_blocks: self.hashes_to_blocks,
request_sender: self.request_sender.clone(),
read_task_handle: None,
write_task_handle: None,
status: self.status.clone(),
config: self.config.clone(),
}
}
}

/// A subscriber to a [`NonFinalisedState`].
#[derive(Debug, Clone)]
pub struct FinalisedStateSubscriber {
9 changes: 6 additions & 3 deletions zaino-state/src/local_cache/non_finalised_state.rs
Original file line number Diff line number Diff line change
@@ -98,12 +98,15 @@ impl NonFinalisedState {
non_finalised_state.status.store(StatusType::Syncing.into());
loop {
let blockchain_info = fetcher.get_blockchain_info().await?;
if (blockchain_info.blocks.0 as i64 - blockchain_info.estimated_height.0 as i64).abs() <= 10 {
if (blockchain_info.blocks.0 as i64 - blockchain_info.estimated_height.0 as i64)
.abs()
<= 10
{
break;
} else {
println!(" - Validator syncing with network. Validator chain height: {}, Estimated Network chain height: {}",
&blockchain_info.blocks.0,
&blockchain_info.estimated_height.0
&blockchain_info.blocks.0,
&blockchain_info.estimated_height.0
);
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
continue;