feat(torii-indexer): parallelize models & event messages #2912

Open
wants to merge 7 commits into base: main
Changes from 3 commits
140 changes: 90 additions & 50 deletions crates/torii/indexer/src/engine.rs
@@ -207,6 +207,9 @@ pub struct ParallelizedEvent {
pub event: Event,
}

type TaskPriority = usize;
Collaborator:
Wdyt about an enum, using the Ord trait to keep the keys ordered in the BTreeMap?
Feeling like an enum would be more idiomatic, and it would remove the magic values we currently have in the engine.

Collaborator:
Something like:

enum TaskPriority {
    Low,
    Medium,
    High,
}
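
For illustration, a minimal self-contained sketch of that idea (variant names and the main() driver are placeholders, not code from this PR). One detail worth noting: a derived Ord follows declaration order, so the variant that should drain first has to be declared first (or given a custom ordering) for the BTreeMap to visit it first.

use std::collections::{BTreeMap, HashMap};

// Hypothetical sketch: derived Ord compares variants in declaration order,
// so High < Medium < Low here, and a BTreeMap keyed by TaskPriority yields
// the High group first when iterating or draining.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum TaskPriority {
    High,   // e.g. ModelRegistered / EventRegistered
    Medium,
    Low,    // e.g. Store* operations
}

type TaskId = u64;

fn main() {
    let mut tasks: BTreeMap<TaskPriority, HashMap<TaskId, Vec<&str>>> = BTreeMap::new();
    tasks.entry(TaskPriority::Low).or_default().entry(42).or_default().push("StoreSetRecord");
    tasks.entry(TaskPriority::High).or_default().entry(7).or_default().push("ModelRegistered");

    // Prints the High group before the Low group.
    for (priority, group) in tasks {
        println!("{priority:?}: {} task group(s)", group.len());
    }
}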

type TaskId = u64;

#[allow(missing_debug_implementations)]
pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
world: Arc<WorldContractReader<P>>,
@@ -216,7 +219,7 @@ pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
config: EngineConfig,
shutdown_tx: Sender<()>,
block_tx: Option<BoundedSender<u64>>,
tasks: HashMap<u64, Vec<(ContractType, ParallelizedEvent)>>,
tasks: BTreeMap<TaskPriority, HashMap<TaskId, Vec<(ContractType, ParallelizedEvent)>>>,
contracts: Arc<HashMap<Felt, ContractType>>,
}

@@ -250,7 +253,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
shutdown_tx,
block_tx,
contracts,
tasks: HashMap::new(),
tasks: BTreeMap::new(),
}
}

@@ -596,44 +599,71 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
}

async fn process_tasks(&mut self) -> Result<()> {
// We use a semaphore to limit the number of concurrent tasks
let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks));

// Run all tasks concurrently
let mut handles = Vec::new();
for (task_id, events) in self.tasks.drain() {
let db = self.db.clone();
let world = self.world.clone();
let semaphore = semaphore.clone();
let processors = self.processors.clone();

let event_processor_config = self.config.event_processor_config.clone();
handles.push(tokio::spawn(async move {
let _permit = semaphore.acquire().await?;
let mut local_db = db.clone();
for (contract_type, ParallelizedEvent { event_id, event, block_number, block_timestamp }) in events {
let contract_processors = processors.get_event_processor(contract_type);
if let Some(processors) = contract_processors.get(&event.keys[0]) {

let processor = processors.iter().find(|p| p.validate(&event)).expect("Must find atleast one processor for the event");

debug!(target: LOG_TARGET, event_name = processor.event_key(), task_id = %task_id, "Processing parallelized event.");

if let Err(e) = processor
.process(&world, &mut local_db, block_number, block_timestamp, &event_id, &event, &event_processor_config)
.await
{
error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, task_id = %task_id, "Processing parallelized event.");

// Process each priority level sequentially
for (priority, task_group) in std::mem::take(&mut self.tasks) {
let mut handles = Vec::new();

// Process all tasks within this priority level concurrently
for (task_id, events) in task_group {
let db = self.db.clone();
let world = self.world.clone();
let semaphore = semaphore.clone();
let processors = self.processors.clone();
let event_processor_config = self.config.event_processor_config.clone();

handles.push(tokio::spawn(async move {
let _permit = semaphore.acquire().await?;
let mut local_db = db.clone();

// Process all events for this task sequentially
for (contract_type, event) in events {
let contract_processors = processors.get_event_processor(contract_type);
if let Some(processors) = contract_processors.get(&event.event.keys[0]) {
let processor = processors.iter()
.find(|p| p.validate(&event.event))
.expect("Must find at least one processor for the event");

debug!(
target: LOG_TARGET,
event_name = processor.event_key(),
task_id = %task_id,
priority = %priority,
"Processing parallelized event."
);

if let Err(e) = processor
.process(
&world,
&mut local_db,
event.block_number,
event.block_timestamp,
&event.event_id,
&event.event,
&event_processor_config,
)
.await
{
error!(
target: LOG_TARGET,
event_name = processor.event_key(),
error = %e,
task_id = %task_id,
priority = %priority,
"Processing parallelized event."
);
}
}
}
}

Ok::<_, anyhow::Error>(())
}));
}
Ok::<_, anyhow::Error>(())
}));
}

// Join all tasks
try_join_all(handles).await?;
// Wait for all tasks in this priority level to complete before moving to next priority
try_join_all(handles).await?;
}

Ok(())
}
@@ -856,30 +886,40 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
.find(|p| p.validate(event))
.expect("Must find atleast one processor for the event");

let task_identifier = match processor.event_key().as_str() {
let (task_priority, task_identifier) = match processor.event_key().as_str() {
"ModelRegistered" | "EventRegistered" => {
let mut hasher = DefaultHasher::new();
event.keys.iter().for_each(|k| k.hash(&mut hasher));
let hash = hasher.finish();
(0usize, hash) // Priority 0 (highest) for model/event registration
Collaborator:
Suggested change
- (0usize, hash) // Priority 0 (highest) for model/event registration
+ (TaskPriority::High, hash) // Priority 0 (highest) for model/event registration

}
"StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => {
let mut hasher = DefaultHasher::new();
// model selector
event.keys[1].hash(&mut hasher);
// entity id
event.keys[2].hash(&mut hasher);
hasher.finish()
let hash = hasher.finish();
(2usize, hash) // Priority 2 (lower) for store operations
Collaborator:
Suggested change
- (2usize, hash) // Priority 2 (lower) for store operations
+ (TaskPriority::Low, hash) // Priority 2 (lower) for store operations

}
_ => 0,
_ => (0, 0) // No parallelization for other events
⚠️ Potential issue

Fix match expression syntax.

Add missing comma in the match expression.

             "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => {
                 let mut hasher = DefaultHasher::new();
                 event.keys[1].hash(&mut hasher);
                 event.keys[2].hash(&mut hasher);
                 let hash = hasher.finish();
-                (2usize, hash) // Priority 2 (lower) for store operations
+                (2usize, hash), // Priority 2 (lower) for store operations
             }
             _ => (0, 0) // No parallelization for other events
🧰 Tools: 🪛 GitHub Actions: ci
[warning] 900-900: Missing comma in match expression
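
For reference, a small standalone sketch (placeholder names, not the PR's code) of where Rust expects commas in a match of this shape: expression-bodied arms conventionally take a trailing comma, while block-bodied arms end at their closing brace.

fn classify(event_key: &str, hash: u64) -> (usize, u64) {
    match event_key {
        "ModelRegistered" | "EventRegistered" => {
            // Block-bodied arm: the tuple is the block's value; no comma goes
            // inside the block, and none is required after the closing brace.
            (0usize, hash)
        }
        // Expression-bodied arm: a trailing comma is the conventional style.
        _ => (0usize, 0u64),
    }
}

fn main() {
    assert_eq!(classify("ModelRegistered", 7), (0, 7));
}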

};

// if we have a task identifier, we queue the event to be parallelized
if task_identifier != 0 {
self.tasks.entry(task_identifier).or_default().push((
contract_type,
ParallelizedEvent {
event_id: event_id.to_string(),
event: event.clone(),
block_number,
block_timestamp,
},
));
self.tasks
.entry(task_priority)
.or_default()
.entry(task_identifier)
.or_default()
.push((
contract_type,
ParallelizedEvent {
event_id: event_id.to_string(),
event: event.clone(),
block_number,
block_timestamp,
},
));
} else {
// Process non-parallelized events immediately
⚠️ Potential issue

Ohayo, sensei! Review handling of task_identifier equal to zero.

When task_identifier is zero, events are processed immediately outside of the priority queue. Ensure that zero cannot be a valid task_identifier that should be queued, or adjust the logic to handle zero appropriately to prevent unintended bypassing of the task queue.
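
One possible shape for that, sketched below with placeholder names (this is not the PR's code): returning Option instead of using 0 as a sentinel makes the inline-processing case explicit, so a legitimate hash of zero could never bypass the queue.

// Illustrative only: Option marks "no parallelization" explicitly rather than
// overloading task_identifier == 0 as a sentinel.
type TaskPriority = usize;
type TaskId = u64;

fn parallelization_key(event_key: &str, hash: u64) -> Option<(TaskPriority, TaskId)> {
    match event_key {
        "ModelRegistered" | "EventRegistered" => Some((0, hash)),
        "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => {
            Some((2, hash))
        }
        // None: process the event immediately, outside the task queue.
        _ => None,
    }
}

fn main() {
    // A zero hash still queues; only unknown event keys fall through to None.
    assert_eq!(parallelization_key("ModelRegistered", 0), Some((0, 0)));
    assert_eq!(parallelization_key("SomeOtherEvent", 123), None);
}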

// if we dont have a task identifier, we process the event immediately
if processor.validate(event) {
if let Err(e) = processor
12 changes: 9 additions & 3 deletions crates/torii/indexer/src/processors/store_del_record.rs
@@ -35,7 +35,7 @@ where
block_timestamp: u64,
event_id: &str,
event: &Event,
_config: &EventProcessorConfig,
config: &EventProcessorConfig,
) -> Result<(), Error> {
// Torii version is coupled to the world version, so we can expect the event to be well
// formed.
@@ -55,15 +55,21 @@
// This can happen if only specific namespaces are indexed.
let model = match db.model(event.selector).await {
Ok(m) => m,
Err(e) if e.to_string().contains("no rows") => {
Err(e) if e.to_string().contains("no rows") && !config.namespaces.is_empty() => {
debug!(
target: LOG_TARGET,
selector = %event.selector,
"Model does not exist, skipping."
);
return Ok(());
}
Err(e) => return Err(e),
Err(e) => {
return Err(anyhow::anyhow!(
"Failed to retrieve model with selector {:#x}: {}",
event.selector,
e
))
}
};

info!(
12 changes: 9 additions & 3 deletions crates/torii/indexer/src/processors/store_set_record.rs
@@ -36,7 +36,7 @@ where
block_timestamp: u64,
event_id: &str,
event: &Event,
_config: &EventProcessorConfig,
config: &EventProcessorConfig,
) -> Result<(), Error> {
// Torii version is coupled to the world version, so we can expect the event to be well
// formed.
@@ -56,15 +56,21 @@
// This can happen if only specific namespaces are indexed.
let model = match db.model(event.selector).await {
Ok(m) => m,
Err(e) if e.to_string().contains("no rows") => {
Err(e) if e.to_string().contains("no rows") && !config.namespaces.is_empty() => {
debug!(
target: LOG_TARGET,
selector = %event.selector,
"Model does not exist, skipping."
);
return Ok(());
}
Err(e) => return Err(e),
Err(e) => {
return Err(anyhow::anyhow!(
"Failed to retrieve model with selector {:#x}: {}",
event.selector,
e
))
}
};

info!(
21 changes: 15 additions & 6 deletions crates/torii/indexer/src/processors/store_update_member.rs
@@ -7,7 +7,7 @@ use starknet::core::types::Event;
use starknet::core::utils::get_selector_from_name;
use starknet::providers::Provider;
use torii_sqlite::Sql;
use tracing::info;
use tracing::{debug, info};

use super::{EventProcessor, EventProcessorConfig};

@@ -37,7 +37,7 @@
block_timestamp: u64,
event_id: &str,
event: &Event,
_config: &EventProcessorConfig,
config: &EventProcessorConfig,
) -> Result<(), Error> {
// Torii version is coupled to the world version, so we can expect the event to be well
// formed.
@@ -61,11 +61,20 @@
// This can happen if only specific namespaces are indexed.
let model = match db.model(model_selector).await {
Ok(m) => m,
Err(e) if e.to_string().contains("no rows") && !config.namespaces.is_empty() => {
debug!(
target: LOG_TARGET,
selector = %model_selector,
"Model does not exist, skipping."
);
return Ok(());
}
Err(e) => {
if e.to_string().contains("no rows") {
return Ok(());
}
return Err(e);
return Err(anyhow::anyhow!(
"Failed to retrieve model with selector {:#x}: {}",
event.selector,
e
))
}
};

12 changes: 9 additions & 3 deletions crates/torii/indexer/src/processors/store_update_record.rs
@@ -36,7 +36,7 @@ where
block_timestamp: u64,
event_id: &str,
event: &Event,
_config: &EventProcessorConfig,
config: &EventProcessorConfig,
) -> Result<(), Error> {
// Torii version is coupled to the world version, so we can expect the event to be well
// formed.
@@ -59,15 +59,21 @@
// This can happen if only specific namespaces are indexed.
let model = match db.model(event.selector).await {
Ok(m) => m,
Err(e) if e.to_string().contains("no rows") => {
Err(e) if e.to_string().contains("no rows") && !config.namespaces.is_empty() => {
debug!(
target: LOG_TARGET,
selector = %event.selector,
"Model does not exist, skipping."
);
return Ok(());
}
Err(e) => return Err(e),
Err(e) => {
return Err(anyhow::anyhow!(
"Failed to retrieve model with selector {:#x}: {}",
event.selector,
e
))
}
};

info!(