Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compact block cache pt1 #158

Merged
merged 22 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 125 additions & 92 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 8 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ zcash_protocol = { git = "https://github.com/zingolabs/librustzcash.git", tag =


# Zebra
zebra-chain = { git = "https://github.com/idky137/zebra.git", branch = "add_public_func_for_zaino_pt2" }
zebra-state = { git = "https://github.com/idky137/zebra.git", branch = "add_public_func_for_zaino_pt2" }
zebra-rpc = { git = "https://github.com/idky137/zebra.git", branch = "add_public_func_for_zaino_pt2" }
# This should be changed to a specific release tag once Zebra regtest stabalizes, currently main is used to fetch most recent bug fixes from Zebra.
zebra-chain = { git = "https://github.com/ZcashFoundation/zebra.git", branch = "main" }
zebra-state = { git = "https://github.com/ZcashFoundation/zebra.git", branch = "main" }
zebra-rpc = { git = "https://github.com/ZcashFoundation/zebra.git", branch = "main" }


# Zcash-Local-Net
zcash_local_net = { git = "https://github.com/zingolabs/zcash-local-net.git", rev = "d64b76cab748455c5b9e1e4412f1689fb66f7d41", features = [ "test_fixtures" ] }
Expand Down Expand Up @@ -72,5 +74,7 @@ whoami = "1.5"
tower = { version = "0.4", features = ["buffer", "util"] }
async-trait = "0.1"
chrono = "0.4"
jsonrpc-core = "18.0.0"
jsonrpc-core = "18.0"
jsonrpsee-types = "0.24"
dashmap = "6.1"
lmdb = "0.8"
12 changes: 10 additions & 2 deletions docs/testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@
3) [Zcashd, Zcash-Cli](https://github.com/zcash/zcash)

### Unit Tests
- To run Unit tests:
1) Simlink or copy compiled `zebrad`, zcashd` and `zcash-cli` binaries to `$ zaino/test_binaries/bins/*`

- To run Zaino-testutils tests:
2) Run `$ cargo nextest run -p zaino_testutils`

- To run Zaino-State FetchService tests:
2) Run `$ cargo nextest run fetch_service`

- To run Zaino-State StateService tests:
2) Generate the zcashd chain cache `cargo nextest run generate_zcashd_chain_cache --run-ignored ignored-only`
3) Generate the zebrad chain cache `cargo nextest run generate_zebrad_large_chain_cache --run-ignored ignored-only`
4) Run `$ cargo nextest run tests`
4) Run `$ cargo nextest run state_service --no-capture`

*NOTE: As we currently have several bugs using Zebra's regtest mode for our tests, we are having to rely on loading cached chain-data instead of creating chain data dynamically. Due to this, and the fact that Zebra requires a lock on its chain cache, all unit tests in zaino-state (and any others relying on loading cached chain data) must be run sequentially. This can be done by running tests with the `--no-capture` flag. Eg. `cargo nextest run -p zaino-state --no-capture`.

Expand All @@ -35,3 +42,4 @@ See the `get_subtree_roots_sapling` test fixture doc comments in zcash_local_net
2) copy the Zebrad mainnet `state` cache to `zaino/integration-tests/chain_cache/get_subtree_roots_orchard` directory.
See the `get_subtree_roots_orchard` test fixture doc comments in zcash_local_net for more details.

NOTE: These tests are currently not working and should be ignored until fixed in `zcash-local-net`.
2 changes: 1 addition & 1 deletion integration-tests/tests/wallet_to_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ mod wallet_basic {
)
.await
.unwrap();


dbg!(unfinalised_transactions.clone());

Expand Down
5 changes: 1 addition & 4 deletions rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
[toolchain]
channel = "1.81.0"
channel = "stable"
components = ["rustfmt", "clippy"]

[profile]
minimal = true
3 changes: 2 additions & 1 deletion zaino-fetch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ thiserror = { workspace = true }
prost = { workspace = true }
reqwest = { workspace = true }
url = { workspace = true }
serde_json = { workspace = true, features = ["preserve_order"] } # The preserve_order feature in serde_jsonn is a dependency of jsonrpc-core
serde_json = { workspace = true, features = ["preserve_order"] }
serde = { workspace = true, features = ["derive"] }
hex = { workspace = true, features = ["serde"] }
indexmap = { workspace = true, features = ["serde"] }
base64 = { workspace = true }
byteorder = { workspace = true }
sha2 = { workspace = true }
jsonrpsee-types = { workspace = true }

19 changes: 19 additions & 0 deletions zaino-fetch/src/jsonrpc/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,25 @@ impl RpcError {
data: None,
}
}

/// Creates a new `RpcError` from jsonrpsee-types `ErrorObject`.
pub fn new_from_errorobject(
error_obj: jsonrpsee_types::ErrorObject<'_>,
fallback_message: impl Into<String>,
) -> Self {
RpcError {
// We can use the actual JSON-RPC code:
code: error_obj.code() as i64,

// Or combine the fallback with the original message:
message: format!("{}: {}", fallback_message.into(), error_obj.message()),

// If you want to store the data too:
data: error_obj
.data()
.map(|raw| serde_json::from_str(raw.get()).unwrap_or_default()),
}
}
}

impl fmt::Display for RpcError {
Expand Down
6 changes: 5 additions & 1 deletion zaino-state/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@ chrono = { workspace = true }
indexmap = { workspace = true }
url = { workspace = true }
hex = { workspace = true, features = ["serde"] }
jsonrpc-core = { workspace = true }
tokio-stream = { workspace = true }
futures = { workspace = true }
tonic = { workspace = true }
http = { workspace = true }
lazy-regex = { workspace = true }
dashmap = { workspace = true }
lmdb = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true, features = ["preserve_order"] }
jsonrpc-core = { workspace = true }
prost = { workspace = true }

[dev-dependencies]
zaino-testutils = { path = "../zaino-testutils" }
Expand Down
76 changes: 47 additions & 29 deletions zaino-state/src/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ use crate::status::StatusType;

/// A generic, thread-safe broadcaster that manages mutable state and notifies clients of updates.
#[derive(Clone)]
pub struct Broadcast<K, V> {
pub(crate) struct Broadcast<K, V> {
state: Arc<DashMap<K, V>>,
notifier: watch::Sender<StatusType>,
}

impl<K: Eq + Hash + Clone, V: Clone> Broadcast<K, V> {
/// Creates a new Broadcast instance, uses default dashmap spec.
pub fn new_default() -> Self {
pub(crate) fn new_default() -> Self {
let (notifier, _) = watch::channel(StatusType::Spawning);
Self {
state: Arc::new(DashMap::new()),
Expand All @@ -24,7 +24,7 @@ impl<K: Eq + Hash + Clone, V: Clone> Broadcast<K, V> {
}

/// Creates a new Broadcast instance, exposes dashmap spec.
pub fn new_custom(capacity: usize, shard_amount: usize) -> Self {
pub(crate) fn new_custom(capacity: usize, shard_amount: usize) -> Self {
let (notifier, _) = watch::channel(StatusType::Spawning);
Self {
state: Arc::new(DashMap::with_capacity_and_shard_amount(
Expand All @@ -35,22 +35,26 @@ impl<K: Eq + Hash + Clone, V: Clone> Broadcast<K, V> {
}
}

/// Inserts or updates an entry in the state and broadcasts an update.
pub fn insert(&self, key: K, value: V, status: StatusType) {
/// Inserts or updates an entry in the state and optionally broadcasts an update.
#[allow(dead_code)]
pub(crate) fn insert(&self, key: K, value: V, status: Option<StatusType>) {
self.state.insert(key, value);
let _ = self.notifier.send(status);
if let Some(status) = status {
let _ = self.notifier.send(status);
}
}

/// Inserts or updates an entry in the state and broadcasts an update.
pub fn insert_set(&self, set: Vec<(K, V)>, status: StatusType) {
#[allow(dead_code)]
pub(crate) fn insert_set(&self, set: Vec<(K, V)>, status: StatusType) {
for (key, value) in set {
self.state.insert(key, value);
}
let _ = self.notifier.send(status);
}

/// Inserts only new entries from the set into the state and broadcasts an update.
pub fn insert_filtered_set(&self, set: Vec<(K, V)>, status: StatusType) {
pub(crate) fn insert_filtered_set(&self, set: Vec<(K, V)>, status: StatusType) {
for (key, value) in set {
// Check if the key is already in the map
if self.state.get(&key).is_none() {
Expand All @@ -61,20 +65,25 @@ impl<K: Eq + Hash + Clone, V: Clone> Broadcast<K, V> {
}

/// Removes an entry from the state and broadcasts an update.
pub fn remove(&self, key: &K, status: StatusType) {
#[allow(dead_code)]
pub(crate) fn remove(&self, key: &K, status: Option<StatusType>) {
self.state.remove(key);
let _ = self.notifier.send(status);
if let Some(status) = status {
let _ = self.notifier.send(status);
}
}

/// Retrieves a value from the state by key.
pub fn get(&self, key: &K) -> Option<Arc<V>> {
#[allow(dead_code)]
pub(crate) fn get(&self, key: &K) -> Option<Arc<V>> {
self.state
.get(key)
.map(|entry| Arc::new((*entry.value()).clone()))
}

/// Retrieves a set of values from the state by a list of keys.
pub fn get_set(&self, keys: &[K]) -> Vec<(K, Arc<V>)> {
#[allow(dead_code)]
pub(crate) fn get_set(&self, keys: &[K]) -> Vec<(K, Arc<V>)> {
keys.iter()
.filter_map(|key| {
self.state
Expand All @@ -85,30 +94,32 @@ impl<K: Eq + Hash + Clone, V: Clone> Broadcast<K, V> {
}

/// Checks if a key exists in the state.
pub fn contains_key(&self, key: &K) -> bool {
#[allow(dead_code)]
pub(crate) fn contains_key(&self, key: &K) -> bool {
self.state.contains_key(key)
}

/// Returns a receiver to listen for state update notifications.
pub fn subscribe(&self) -> watch::Receiver<StatusType> {
pub(crate) fn subscribe(&self) -> watch::Receiver<StatusType> {
self.notifier.subscribe()
}

/// Returns a [`BroadcastSubscriber`] to the [`Broadcast`].
pub fn subscriber(&self) -> BroadcastSubscriber<K, V> {
pub(crate) fn subscriber(&self) -> BroadcastSubscriber<K, V> {
BroadcastSubscriber {
state: self.get_state(),
notifier: self.subscribe(),
}
}

/// Provides read access to the internal state.
pub fn get_state(&self) -> Arc<DashMap<K, V>> {
pub(crate) fn get_state(&self) -> Arc<DashMap<K, V>> {
Arc::clone(&self.state)
}

/// Returns the whole state excluding keys in the ignore list.
pub fn get_filtered_state(&self, ignore_list: &HashSet<K>) -> Vec<(K, V)> {
#[allow(dead_code)]
pub(crate) fn get_filtered_state(&self, ignore_list: &HashSet<K>) -> Vec<(K, V)> {
self.state
.iter()
.filter(|entry| !ignore_list.contains(entry.key()))
Expand All @@ -117,22 +128,24 @@ impl<K: Eq + Hash + Clone, V: Clone> Broadcast<K, V> {
}

/// Clears all entries from the state.
pub fn clear(&self) {
pub(crate) fn clear(&self) {
self.state.clear();
}

/// Returns the number of entries in the state.
pub fn len(&self) -> usize {
#[allow(dead_code)]
pub(crate) fn len(&self) -> usize {
self.state.len()
}

/// Returns true if the state is empty.
pub fn is_empty(&self) -> bool {
#[allow(dead_code)]
pub(crate) fn is_empty(&self) -> bool {
self.state.is_empty()
}

/// Broadcasts an update.
pub fn notify(&self, status: StatusType) {
pub(crate) fn notify(&self, status: StatusType) {
if self.notifier.send(status).is_err() {
eprintln!("No subscribers are currently listening for updates.");
}
Expand Down Expand Up @@ -163,28 +176,30 @@ impl<K: Eq + Hash + Clone + std::fmt::Debug, V: Clone + std::fmt::Debug> std::fm

/// A generic, thread-safe broadcaster that manages mutable state and notifies clients of updates.
#[derive(Clone)]
pub struct BroadcastSubscriber<K, V> {
pub(crate) struct BroadcastSubscriber<K, V> {
state: Arc<DashMap<K, V>>,
notifier: watch::Receiver<StatusType>,
}

impl<K: Eq + Hash + Clone, V: Clone> BroadcastSubscriber<K, V> {
/// Waits on notifier update and returns StatusType.
pub async fn wait_on_notifier(&mut self) -> Result<StatusType, watch::error::RecvError> {
pub(crate) async fn wait_on_notifier(&mut self) -> Result<StatusType, watch::error::RecvError> {
self.notifier.changed().await?;
let status = self.notifier.borrow().clone();
Ok(status)
}

/// Retrieves a value from the state by key.
pub fn get(&self, key: &K) -> Option<Arc<V>> {
#[allow(dead_code)]
pub(crate) fn get(&self, key: &K) -> Option<Arc<V>> {
self.state
.get(key)
.map(|entry| Arc::new((*entry.value()).clone()))
}

/// Retrieves a set of values from the state by a list of keys.
pub fn get_set(&self, keys: &[K]) -> Vec<(K, Arc<V>)> {
#[allow(dead_code)]
pub(crate) fn get_set(&self, keys: &[K]) -> Vec<(K, Arc<V>)> {
keys.iter()
.filter_map(|key| {
self.state
Expand All @@ -195,12 +210,13 @@ impl<K: Eq + Hash + Clone, V: Clone> BroadcastSubscriber<K, V> {
}

/// Checks if a key exists in the state.
pub fn contains_key(&self, key: &K) -> bool {
#[allow(dead_code)]
pub(crate) fn contains_key(&self, key: &K) -> bool {
self.state.contains_key(key)
}

/// Returns the whole state excluding keys in the ignore list.
pub fn get_filtered_state(&self, ignore_list: &HashSet<K>) -> Vec<(K, V)> {
pub(crate) fn get_filtered_state(&self, ignore_list: &HashSet<K>) -> Vec<(K, V)> {
self.state
.iter()
.filter(|entry| !ignore_list.contains(entry.key()))
Expand All @@ -209,12 +225,14 @@ impl<K: Eq + Hash + Clone, V: Clone> BroadcastSubscriber<K, V> {
}

/// Returns the number of entries in the state.
pub fn len(&self) -> usize {
#[allow(dead_code)]
pub(crate) fn len(&self) -> usize {
self.state.len()
}

/// Returns true if the state is empty.
pub fn is_empty(&self) -> bool {
#[allow(dead_code)]
pub(crate) fn is_empty(&self) -> bool {
self.state.is_empty()
}
}
Expand Down
Loading
Loading