Skip to content

Commit

Permalink
Add massa event cache crate (#4769)
Browse files Browse the repository at this point in the history
* Add massa event cache crate

* Add event cache controller into massa execution

* Cargo fmt

* Add event cache config in masa-node

* Minor fixes

* Cargo clippy fixes

* Cargo clippy fixes

* Add limits & security checks

* Add controller + manager

* Cargo fmt pass

* Fix check/clippy with --all-targets

* Better event cache clear at startup && better filtering

* Rename to config to max_call_stack_length

* Improve event cache filtering

* Avoid lock contention in controller::get_filtered_sc_output_events

* Improve comment

* Add query limit

* Add tick delay in event cache writer thread

* Use per address / operation id / is_error counters

* Cargo fmt

* typos fixes

* Cargo clippy fixes for tests

* Cargo fmt

* Add mock expectations + impl

* Cargo clippy for TU fixes

* Use MAX_EVENT_PER_OPERATION constant

* Unit test the filter optimisations

* Add more doc

* Cargo clippy fixes

* Use ..Default::default in TU

* Cargo clippy fix

* Use scope

* Use scope 2

* Remove tick_delay + directly mem::take struct

* Add tu for counter removal

* Add KeyKind in KeyBuilder

* Wait for condvar in wait_loop_event

* Removed unused lib

* Condvar wait fix

* Truncate event message in case of error
  • Loading branch information
sydhds authored Dec 19, 2024
1 parent b5834b2 commit e6b0288
Show file tree
Hide file tree
Showing 25 changed files with 3,446 additions and 419 deletions.
1,038 changes: 652 additions & 386 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ members = [
"massa-versioning",
"massa-grpc",
"massa-xtask",
"massa-event-cache",
]
resolver = "2"

Expand Down Expand Up @@ -105,6 +106,7 @@ massa_test_framework = { path = "./massa-test-framework" }
massa_time = { path = "./massa-time" }
massa_versioning = { path = "./massa-versioning" }
massa_wallet = { path = "./massa-wallet" }
massa_event_cache = { path = "./massa-event-cache" }

# Massa projects dependencies
# massa-proto-rs = { git = "https://github.com/massalabs/massa-proto-rs", branch = "deferred_calls" }
Expand Down
3 changes: 2 additions & 1 deletion _typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ extend-ignore-re = [
# Secret key (S): 18 - 62 characters
# Public key (P): 18 - 62 characters
# NodeId (N)
"(AU|AS|N|S|P)\\d\\w{18,62}",
# OperationId (O)
"(AU|AS|N|S|P|O)\\d\\w{18,62}",
]

[default.extend-words]
Expand Down
6 changes: 3 additions & 3 deletions massa-db-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
//! * if we want to delete item a: 1000 ^ 0011 == 1011 (== item b)
//! * if we want to delete item b: 1000 ^ 1011 == 0011 (== item a)
//!
//! Note that this does not provides "Proof of present" nor "Proof of Absence"
//! Note that this does not provide "Proof of present" nor "Proof of Absence"
//! (operations avail with Merkle trees)
//!
//! For more details here: https://github.com/massalabs/massa/discussions/3852#discussioncomment-6188158
Expand All @@ -45,10 +45,10 @@
//! # Caches
//!
//! A cache of db changes is kept in memory allowing to easily stream it
//! (by streaming, we means: sending it to another massa node (aka bootstrap))
//! (by streaming, we mean: sending it to another massa node (aka bootstrap))
//! There is 2 separate caches: one for 'state' and one for 'versioning'
//!
//! These caches is stored as a key, value: slot -> insertion_data|deletion_data.
//! These caches are stored as a key, value: slot -> insertion_data|deletion_data.
//!
//! # Streaming steps
//!
Expand Down
33 changes: 33 additions & 0 deletions massa-event-cache/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[package]
name = "massa_event_cache"
version = "0.1.0"
edition = "2021"

[features]
test-exports = [
"massa_models/test-exports",
"mockall",
"mockall_wrap"
]


[dependencies]
nom = {workspace = true}
rocksdb = {workspace = true}
tracing = {workspace = true}
parking_lot = { workspace = true }
num_enum = { workspace = true }
massa_models = {workspace = true}
massa_serialization = {workspace = true}
massa_time = {workspace = true}
mockall = {workspace = true, optional = true}
mockall_wrap = {workspace = true, optional = true}

[dev-dependencies]
tempfile = {workspace = true}
serial_test = {workspace = true}
more-asserts = {workspace = true}
rand = {workspace = true}
mockall = {workspace = true}
mockall_wrap = {workspace = true}
massa_models = { workspace = true, features = ["test-exports"] }
22 changes: 22 additions & 0 deletions massa-event-cache/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use std::path::PathBuf;

pub struct EventCacheConfig {
/// Path to the hard drive cache storage
pub event_cache_path: PathBuf,
/// Maximum number of entries we want to keep in the event cache
pub max_event_cache_length: usize,
/// Amount of entries removed when `event_cache_size` is reached
pub snip_amount: usize,
/// Maximum length of an event data (aka event message)
pub max_event_data_length: u64,
/// Thread count
pub thread_count: u8,
/// Call stack max length
pub max_call_stack_length: u16,
/// Maximum number of events per operation
pub max_events_per_operation: u64,
/// Maximum number of operations per block
pub max_operations_per_block: u64,
/// Maximum events returned in a query
pub max_events_per_query: usize,
}
136 changes: 136 additions & 0 deletions massa-event-cache/src/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// std
use std::collections::{BTreeSet, VecDeque};
use std::sync::Arc;
// third-party
use parking_lot::{Condvar, Mutex, RwLock};
// internal
use crate::event_cache::EventCache;
use massa_models::execution::EventFilter;
use massa_models::output_event::SCOutputEvent;

/// structure used to communicate with controller
#[derive(Debug, Default)]
pub(crate) struct EventCacheWriterInputData {
/// set stop to true to stop the thread
pub stop: bool,
pub(crate) events: VecDeque<SCOutputEvent>,
}

impl EventCacheWriterInputData {
pub fn new() -> Self {
Self {
stop: Default::default(),
events: Default::default(),
}
}

/*
/// Takes the current input data into a clone that is returned,
/// and resets self.
pub fn take(&mut self) -> Self {
Self {
stop: std::mem::take(&mut self.stop),
events: std::mem::take(&mut self.events),
}
}
*/
}

/// interface that communicates with the worker thread
#[cfg_attr(feature = "test-exports", mockall_wrap::wrap, mockall::automock)]
pub trait EventCacheController: Send + Sync {
fn save_events(&self, events: VecDeque<SCOutputEvent>);

fn get_filtered_sc_output_events(&self, filter: &EventFilter) -> Vec<SCOutputEvent>;
}

#[derive(Clone)]
/// implementation of the event cache controller
pub struct EventCacheControllerImpl {
/// input data to process in the VM loop
/// with a wake-up condition variable that needs to be triggered when the data changes
pub(crate) input_data: Arc<(Condvar, Mutex<EventCacheWriterInputData>)>,
/// Event cache
pub(crate) cache: Arc<RwLock<EventCache>>,
}

impl EventCacheController for EventCacheControllerImpl {
fn save_events(&self, events: VecDeque<SCOutputEvent>) {
// lock input data
let mut input_data = self.input_data.1.lock();
input_data.events.extend(events);
// Wake up the condvar in EventCacheWriterThread waiting for events
self.input_data.0.notify_all();
}

fn get_filtered_sc_output_events(&self, filter: &EventFilter) -> Vec<SCOutputEvent> {
let mut res_0 = {
// Read from new events first
let lock_0 = self.input_data.1.lock();
#[allow(clippy::unnecessary_filter_map)]
let it = lock_0.events.iter().filter_map(|event| {
if let Some(start) = filter.start {
if event.context.slot < start {
return None;
}
}
if let Some(end) = filter.end {
if event.context.slot >= end {
return None;
}
}
if let Some(is_final) = filter.is_final {
if event.context.is_final != is_final {
return None;
}
}
if let Some(is_error) = filter.is_error {
if event.context.is_error != is_error {
return None;
}
}
match (
filter.original_caller_address,
event.context.call_stack.front(),
) {
(Some(addr1), Some(addr2)) if addr1 != *addr2 => return None,
(Some(_), None) => return None,
_ => (),
}
match (filter.emitter_address, event.context.call_stack.back()) {
(Some(addr1), Some(addr2)) if addr1 != *addr2 => return None,
(Some(_), None) => return None,
_ => (),
}
match (
filter.original_operation_id,
event.context.origin_operation_id,
) {
(Some(addr1), Some(addr2)) if addr1 != addr2 => return None,
(Some(_), None) => return None,
_ => (),
}
Some(event)
});

let res_0: BTreeSet<SCOutputEvent> = it.cloned().collect();
// Drop the lock on the queue as soon as possible to avoid deadlocks
drop(lock_0);
res_0
};

let res_1 = {
// Read from db (on disk) events
let lock = self.cache.read();
let (_, res_1) = lock.get_filtered_sc_output_events(filter);
// Drop the lock on the event cache db asap
drop(lock);
res_1
};

// Merge results
let res_1: BTreeSet<SCOutputEvent> = BTreeSet::from_iter(res_1);
res_0.extend(res_1);
Vec::from_iter(res_0)
}
}
Loading

0 comments on commit e6b0288

Please sign in to comment.