feat(torii-indexer): parallelize models & event messages #2912

Open · wants to merge 7 commits into base: main
164 changes: 106 additions & 58 deletions crates/torii/indexer/src/engine.rs
@@ -6,6 +6,7 @@

use anyhow::Result;
use bitflags::bitflags;
use cainome::cairo_serde::CairoSerde;
use dojo_utils::provider as provider_utils;
use dojo_world::contracts::world::WorldContractReader;
use futures_util::future::{join_all, try_join_all};
@@ -17,7 +18,7 @@
};
use starknet::core::utils::get_selector_from_name;
use starknet::providers::Provider;
use starknet_crypto::Felt;
use starknet_crypto::{poseidon_hash_many, Felt};
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc::Sender as BoundedSender;
use tokio::sync::Semaphore;
@@ -207,6 +208,9 @@
pub event: Event,
}

type TaskPriority = usize;
Collaborator comment: Wdyt about an enum, using the Ord trait to keep keys ordered in the BTreeMap? Feeling like an enum would be more idiomatic, and it would remove the magic values we have in the engine.

Collaborator comment: Something like:

enum TaskPriority {
    Low,
    Medium,
    High,
}
type TaskId = u64;

#[allow(missing_debug_implementations)]
pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
world: Arc<WorldContractReader<P>>,
@@ -216,7 +220,7 @@
config: EngineConfig,
shutdown_tx: Sender<()>,
block_tx: Option<BoundedSender<u64>>,
tasks: HashMap<u64, Vec<(ContractType, ParallelizedEvent)>>,
tasks: BTreeMap<TaskPriority, HashMap<TaskId, Vec<(ContractType, ParallelizedEvent)>>>,
contracts: Arc<HashMap<Felt, ContractType>>,
}

@@ -250,7 +254,7 @@
shutdown_tx,
block_tx,
contracts,
tasks: HashMap::new(),
tasks: BTreeMap::new(),
}
}

@@ -596,44 +600,72 @@
}

async fn process_tasks(&mut self) -> Result<()> {
// We use a semaphore to limit the number of concurrent tasks
let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks));

// Run all tasks concurrently
let mut handles = Vec::new();
for (task_id, events) in self.tasks.drain() {
let db = self.db.clone();
let world = self.world.clone();
let semaphore = semaphore.clone();
let processors = self.processors.clone();

let event_processor_config = self.config.event_processor_config.clone();
handles.push(tokio::spawn(async move {
let _permit = semaphore.acquire().await?;
let mut local_db = db.clone();
for (contract_type, ParallelizedEvent { event_id, event, block_number, block_timestamp }) in events {
let contract_processors = processors.get_event_processor(contract_type);
if let Some(processors) = contract_processors.get(&event.keys[0]) {

let processor = processors.iter().find(|p| p.validate(&event)).expect("Must find atleast one processor for the event");

debug!(target: LOG_TARGET, event_name = processor.event_key(), task_id = %task_id, "Processing parallelized event.");

if let Err(e) = processor
.process(&world, &mut local_db, block_number, block_timestamp, &event_id, &event, &event_processor_config)
.await
{
error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, task_id = %task_id, "Processing parallelized event.");
// Process each priority level sequentially
for (priority, task_group) in std::mem::take(&mut self.tasks) {
let mut handles = Vec::new();

// Process all tasks within this priority level concurrently
for (task_id, events) in task_group {
let db = self.db.clone();
let world = self.world.clone();
let semaphore = semaphore.clone();
let processors = self.processors.clone();
let event_processor_config = self.config.event_processor_config.clone();

handles.push(tokio::spawn(async move {
let _permit = semaphore.acquire().await?;
let mut local_db = db.clone();

// Process all events for this task sequentially
for (contract_type, event) in events {
let contract_processors = processors.get_event_processor(contract_type);
if let Some(processors) = contract_processors.get(&event.event.keys[0]) {
let processor = processors
.iter()
.find(|p| p.validate(&event.event))
.expect("Must find at least one processor for the event");

debug!(
target: LOG_TARGET,
event_name = processor.event_key(),
task_id = %task_id,
priority = %priority,
"Processing parallelized event."

);

if let Err(e) = processor
.process(
&world,
&mut local_db,
event.block_number,
event.block_timestamp,
&event.event_id,
&event.event,
&event_processor_config,
)
.await
{
error!(

target: LOG_TARGET,
event_name = processor.event_key(),
error = %e,
task_id = %task_id,
priority = %priority,
"Processing parallelized event."

);
}
}
}
}

Ok::<_, anyhow::Error>(())
}));
}
Ok::<_, anyhow::Error>(())
}));
}

// Join all tasks
try_join_all(handles).await?;
// Wait for all tasks in this priority level to complete before moving to next priority
try_join_all(handles).await?;
}

Ok(())
}
@@ -802,14 +834,7 @@
contract_type: ContractType,
) -> Result<()> {
if self.config.flags.contains(IndexingFlags::RAW_EVENTS) {
match contract_type {
ContractType::WORLD => {
self.db.store_event(event_id, event, transaction_hash, block_timestamp)?;
}
// ERC events needs to be processed inside there respective processor
// we store transfer events for ERC contracts regardless of this flag
ContractType::ERC20 | ContractType::ERC721 => {}
}
self.db.store_event(event_id, event, transaction_hash, block_timestamp)?;

}

let event_key = event.keys[0];
@@ -856,30 +881,53 @@
.find(|p| p.validate(event))
.expect("Must find at least one processor for the event");

let task_identifier = match processor.event_key().as_str() {
let (task_priority, task_identifier) = match processor.event_key().as_str() {
"ModelRegistered" | "EventRegistered" => {
let mut hasher = DefaultHasher::new();
event.keys.iter().for_each(|k| k.hash(&mut hasher));
let hash = hasher.finish();
(0usize, hash) // Priority 0 (highest) for model/event registration
Collaborator suggested change:
-                (0usize, hash) // Priority 0 (highest) for model/event registration
+                (TaskPriority::High, hash) // Priority 0 (highest) for model/event registration
}
"StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => {
let mut hasher = DefaultHasher::new();
// model selector
event.keys[1].hash(&mut hasher);
// entity id
event.keys[2].hash(&mut hasher);
hasher.finish()
let hash = hasher.finish();
(2usize, hash) // Priority 2 (lower) for store operations
Collaborator suggested change:
-                (2usize, hash) // Priority 2 (lower) for store operations
+                (TaskPriority::Low, hash) // Priority 2 (lower) for store operations
}
"EventEmitted" => {
let mut hasher = DefaultHasher::new();

let keys = Vec::<Felt>::cairo_deserialize(&event.data, 0).unwrap_or_else(|e| {
panic!("Expected EventEmitted keys to be well formed: {:?}", e);

});

// selector
event.keys[1].hash(&mut hasher);
// entity id
let entity_id = poseidon_hash_many(&keys);
entity_id.hash(&mut hasher);

let hash = hasher.finish();
(2usize, hash) // Priority 2 for event messages
}
Comment on lines +898 to 913

⚠️ Potential issue

Ohayo, sensei! Replace panic with proper error handling in EventEmitted processing.

The current implementation panics on deserialization failure, which is not production-safe.

Apply this diff to handle errors gracefully:

 "EventEmitted" => {
     let mut hasher = DefaultHasher::new();

-    let keys = Vec::<Felt>::cairo_deserialize(&event.data, 0).unwrap_or_else(|e| {
-        panic!("Expected EventEmitted keys to be well formed: {:?}", e);
-    });
+    let keys = match Vec::<Felt>::cairo_deserialize(&event.data, 0) {
+        Ok(keys) => keys,
+        Err(e) => {
+            error!(
+                target: LOG_TARGET,
+                error = ?e,
+                "Failed to deserialize EventEmitted keys. Processing immediately."
+            );
+            return (0, 0); // Process immediately as fallback
+        }
+    };

     // selector
     event.keys[1].hash(&mut hasher);

_ => 0,
_ => (0, 0), // No parallelization for other events

};

// if we have a task identifier, we queue the event to be parallelized
if task_identifier != 0 {
self.tasks.entry(task_identifier).or_default().push((
contract_type,
ParallelizedEvent {
event_id: event_id.to_string(),
event: event.clone(),
block_number,
block_timestamp,
},
));
self.tasks.entry(task_priority).or_default().entry(task_identifier).or_default().push(
(
contract_type,
ParallelizedEvent {
event_id: event_id.to_string(),
event: event.clone(),
block_number,
block_timestamp,
},
),
);
} else {
// Process non-parallelized events immediately
// if we don't have a task identifier, we process the event immediately
if processor.validate(event) {
if let Err(e) = processor
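The task-identifier scheme above hashes the model selector together with the entity id, so every event touching the same (model, entity) pair lands in the same task and replays in order, while unrelated entities can be processed in parallel. A distilled sketch over plain `u64` values (the real code hashes `Felt`s from `event.keys`; the helper name is hypothetical):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Simplified stand-in for the task-id computation in the engine.
fn task_id(model_selector: u64, entity_id: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    model_selector.hash(&mut hasher); // event.keys[1] in the PR
    entity_id.hash(&mut hasher);      // event.keys[2], or poseidon_hash_many(keys)
    hasher.finish()
}

fn main() {
    // Events touching the same (model, entity) pair share a task id, so they
    // are replayed sequentially; different entities get different tasks and
    // can run concurrently.
    assert_eq!(task_id(7, 42), task_id(7, 42));
    assert_ne!(task_id(7, 42), task_id(7, 43));
}
```

`DefaultHasher::new()` is deterministic within a process, which is all the grouping needs — the id only has to be stable for the lifetime of the `tasks` map.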
12 changes: 9 additions & 3 deletions crates/torii/indexer/src/processors/store_del_record.rs
@@ -35,7 +35,7 @@
block_timestamp: u64,
event_id: &str,
event: &Event,
_config: &EventProcessorConfig,
config: &EventProcessorConfig,
) -> Result<(), Error> {
// Torii version is coupled to the world version, so we can expect the event to be well
// formed.
@@ -55,15 +55,21 @@
// This can happen if only specific namespaces are indexed.
let model = match db.model(event.selector).await {
Ok(m) => m,
Err(e) if e.to_string().contains("no rows") => {
Err(e) if e.to_string().contains("no rows") && !config.namespaces.is_empty() => {
debug!(
target: LOG_TARGET,
selector = %event.selector,
"Model does not exist, skipping."
);
return Ok(());
}
Err(e) => return Err(e),
Err(e) => {
return Err(anyhow::anyhow!(
"Failed to retrieve model with selector {:#x}: {}",
event.selector,
e
));

}
};

info!(
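The new `!config.namespaces.is_empty()` guard encodes a small decision rule: a "no rows" miss when looking up a model is only expected while namespace filtering is active (the model was deliberately not indexed); otherwise the miss is a genuine error. A hypothetical distillation of that branch (not code from the PR):

```rust
// Hypothetical distillation of the new error-handling branch shared by the
// store_* processors in this PR.
#[derive(Debug, PartialEq)]
enum ModelLookup {
    Skip,  // model intentionally not indexed (namespace filter active)
    Error, // unexpected miss, or some other DB failure
}

fn classify(err_msg: &str, namespace_filter_active: bool) -> ModelLookup {
    if err_msg.contains("no rows") && namespace_filter_active {
        ModelLookup::Skip
    } else {
        ModelLookup::Error
    }
}

fn main() {
    assert_eq!(classify("no rows returned", true), ModelLookup::Skip);
    // Without a namespace filter, a missing model is a real error.
    assert_eq!(classify("no rows returned", false), ModelLookup::Error);
    assert_eq!(classify("connection reset", true), ModelLookup::Error);
}
```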
12 changes: 9 additions & 3 deletions crates/torii/indexer/src/processors/store_set_record.rs
@@ -36,7 +36,7 @@
block_timestamp: u64,
event_id: &str,
event: &Event,
_config: &EventProcessorConfig,
config: &EventProcessorConfig,
) -> Result<(), Error> {
// Torii version is coupled to the world version, so we can expect the event to be well
// formed.
@@ -56,15 +56,21 @@
// This can happen if only specific namespaces are indexed.
let model = match db.model(event.selector).await {
Ok(m) => m,
Err(e) if e.to_string().contains("no rows") => {
Err(e) if e.to_string().contains("no rows") && !config.namespaces.is_empty() => {
debug!(
target: LOG_TARGET,
selector = %event.selector,
"Model does not exist, skipping."
);
return Ok(());
}
Err(e) => return Err(e),
Err(e) => {
return Err(anyhow::anyhow!(
"Failed to retrieve model with selector {:#x}: {}",
event.selector,
e
));

}
};

info!(
21 changes: 15 additions & 6 deletions crates/torii/indexer/src/processors/store_update_member.rs
@@ -7,7 +7,7 @@
use starknet::core::utils::get_selector_from_name;
use starknet::providers::Provider;
use torii_sqlite::Sql;
use tracing::info;
use tracing::{debug, info};

use super::{EventProcessor, EventProcessorConfig};

@@ -37,7 +37,7 @@
block_timestamp: u64,
event_id: &str,
event: &Event,
_config: &EventProcessorConfig,
config: &EventProcessorConfig,
) -> Result<(), Error> {
// Torii version is coupled to the world version, so we can expect the event to be well
// formed.
@@ -61,11 +61,20 @@
// This can happen if only specific namespaces are indexed.
let model = match db.model(model_selector).await {
Ok(m) => m,
Err(e) if e.to_string().contains("no rows") && !config.namespaces.is_empty() => {
debug!(
target: LOG_TARGET,
selector = %model_selector,
"Model does not exist, skipping."
);
return Ok(());
}

Err(e) => {
if e.to_string().contains("no rows") {
return Ok(());
}
return Err(e);
return Err(anyhow::anyhow!(
"Failed to retrieve model with selector {:#x}: {}",
event.selector,
e
));

}
};

12 changes: 9 additions & 3 deletions crates/torii/indexer/src/processors/store_update_record.rs
@@ -36,7 +36,7 @@
block_timestamp: u64,
event_id: &str,
event: &Event,
_config: &EventProcessorConfig,
config: &EventProcessorConfig,
) -> Result<(), Error> {
// Torii version is coupled to the world version, so we can expect the event to be well
// formed.
@@ -59,15 +59,21 @@
// This can happen if only specific namespaces are indexed.
let model = match db.model(event.selector).await {
Ok(m) => m,
Err(e) if e.to_string().contains("no rows") => {
Err(e) if e.to_string().contains("no rows") && !config.namespaces.is_empty() => {
debug!(
target: LOG_TARGET,
selector = %event.selector,
"Model does not exist, skipping."
);
return Ok(());
}
Err(e) => return Err(e),
Err(e) => {
return Err(anyhow::anyhow!(
"Failed to retrieve model with selector {:#x}: {}",
event.selector,
e
));

}
};

info!(
Expand Down
Loading