From 9db6ad0b8d65162297e7f25e90559569b109ac54 Mon Sep 17 00:00:00 2001 From: Nasr Date: Wed, 15 Jan 2025 14:39:02 +0700 Subject: [PATCH 1/7] feat(torii-indexer): natural priority in parallelized tasks for model processing --- crates/torii/indexer/src/engine.rs | 95 +++++++++++++++++++----------- 1 file changed, 61 insertions(+), 34 deletions(-) diff --git a/crates/torii/indexer/src/engine.rs b/crates/torii/indexer/src/engine.rs index 1210c5ec9d..06dba72cf1 100644 --- a/crates/torii/indexer/src/engine.rs +++ b/crates/torii/indexer/src/engine.rs @@ -216,7 +216,7 @@ pub struct Engine { config: EngineConfig, shutdown_tx: Sender<()>, block_tx: Option>, - tasks: HashMap>, + tasks: BTreeMap>, contracts: Arc>, } @@ -250,7 +250,7 @@ impl Engine

{ shutdown_tx, block_tx, contracts, - tasks: HashMap::new(), + tasks: BTreeMap::new(), } } @@ -596,44 +596,65 @@ impl Engine

{ } async fn process_tasks(&mut self) -> Result<()> { - // We use a semaphore to limit the number of concurrent tasks let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks)); - - // Run all tasks concurrently let mut handles = Vec::new(); - for (task_id, events) in self.tasks.drain() { - let db = self.db.clone(); - let world = self.world.clone(); - let semaphore = semaphore.clone(); - let processors = self.processors.clone(); - - let event_processor_config = self.config.event_processor_config.clone(); - handles.push(tokio::spawn(async move { - let _permit = semaphore.acquire().await?; - let mut local_db = db.clone(); - for (contract_type, ParallelizedEvent { event_id, event, block_number, block_timestamp }) in events { + + // Drain our tasks since we're going to process them all + for (task_id, events) in std::mem::take(&mut self.tasks).into_iter() { + for (contract_type, event) in events { + let db = self.db.clone(); + let world = self.world.clone(); + let semaphore = semaphore.clone(); + let processors = self.processors.clone(); + let event_processor_config = self.config.event_processor_config.clone(); + + handles.push(tokio::spawn(async move { + let _permit = semaphore.acquire().await?; + let mut local_db = db.clone(); + let contract_processors = processors.get_event_processor(contract_type); - if let Some(processors) = contract_processors.get(&event.keys[0]) { - - let processor = processors.iter().find(|p| p.validate(&event)).expect("Must find atleast one processor for the event"); - - debug!(target: LOG_TARGET, event_name = processor.event_key(), task_id = %task_id, "Processing parallelized event."); + if let Some(processors) = contract_processors.get(&event.event.keys[0]) { + let processor = processors.iter() + .find(|p| p.validate(&event.event)) + .expect("Must find at least one processor for the event"); + + debug!( + target: LOG_TARGET, + event_name = processor.event_key(), + task_id = %task_id, + "Processing parallelized event." + ); if let Err(e) = processor - .process(&world, &mut local_db, block_number, block_timestamp, &event_id, &event, &event_processor_config) + .process( + &world, + &mut local_db, + event.block_number, + event.block_timestamp, + &event.event_id, + &event.event, + &event_processor_config, + ) .await { - error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, task_id = %task_id, "Processing parallelized event."); + error!( + target: LOG_TARGET, + event_name = processor.event_key(), + error = %e, + task_id = %task_id, + "Processing parallelized event." + ); } } - } - Ok::<_, anyhow::Error>(()) - })); - } + Ok::<_, anyhow::Error>(()) + })); + } - // Join all tasks - try_join_all(handles).await?; + // Wait for all tasks in this priority level to complete before moving to next + try_join_all(handles).await?; + handles = Vec::new(); + } Ok(()) } @@ -857,18 +878,24 @@ impl Engine

{ .expect("Must find atleast one processor for the event"); let task_identifier = match processor.event_key().as_str() { + "ModelRegistered" | "EventRegistered" => { + let mut hasher = DefaultHasher::new(); + event.keys[1].hash(&mut hasher); + // Priority 0 (highest) for model/event registration + let hash = hasher.finish() & 0x00FFFFFFFFFFFFFF; + hash | (0u64 << 56) + } "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => { let mut hasher = DefaultHasher::new(); - // model selector event.keys[1].hash(&mut hasher); - // entity id event.keys[2].hash(&mut hasher); - hasher.finish() + // Priority 2 (lower) for store operations + let hash = hasher.finish() & 0x00FFFFFFFFFFFFFF; + hash | (2u64 << 56) } - _ => 0, + _ => 0 }; - // if we have a task identifier, we queue the event to be parallelized if task_identifier != 0 { self.tasks.entry(task_identifier).or_default().push(( contract_type, From 312befae0c556f85fb5a2a9b7c7b649b15261ca3 Mon Sep 17 00:00:00 2001 From: Nasr Date: Wed, 15 Jan 2025 15:16:54 +0700 Subject: [PATCH 2/7] btreemap of priorities --- crates/torii/indexer/src/engine.rs | 113 ++++++++++++++++------------- 1 file changed, 63 insertions(+), 50 deletions(-) diff --git a/crates/torii/indexer/src/engine.rs b/crates/torii/indexer/src/engine.rs index 06dba72cf1..68cbca4d5f 100644 --- a/crates/torii/indexer/src/engine.rs +++ b/crates/torii/indexer/src/engine.rs @@ -207,6 +207,9 @@ pub struct ParallelizedEvent { pub event: Event, } +type TaskPriority = usize; +type TaskId = u64; + #[allow(missing_debug_implementations)] pub struct Engine { world: Arc>, @@ -216,7 +219,7 @@ pub struct Engine { config: EngineConfig, shutdown_tx: Sender<()>, block_tx: Option>, - tasks: BTreeMap>, + tasks: BTreeMap>>, contracts: Arc>, } @@ -597,11 +600,13 @@ impl Engine

{ async fn process_tasks(&mut self) -> Result<()> { let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks)); - let mut handles = Vec::new(); - // Drain our tasks since we're going to process them all - for (task_id, events) in std::mem::take(&mut self.tasks).into_iter() { - for (contract_type, event) in events { + // Process each priority level sequentially + for (priority, task_group) in std::mem::take(&mut self.tasks) { + let mut handles = Vec::new(); + + // Process all tasks within this priority level concurrently + for (task_id, events) in task_group { let db = self.db.clone(); let world = self.world.clone(); let semaphore = semaphore.clone(); @@ -612,38 +617,43 @@ impl Engine

{ let _permit = semaphore.acquire().await?; let mut local_db = db.clone(); - let contract_processors = processors.get_event_processor(contract_type); - if let Some(processors) = contract_processors.get(&event.event.keys[0]) { - let processor = processors.iter() - .find(|p| p.validate(&event.event)) - .expect("Must find at least one processor for the event"); - - debug!( - target: LOG_TARGET, - event_name = processor.event_key(), - task_id = %task_id, - "Processing parallelized event." - ); - - if let Err(e) = processor - .process( - &world, - &mut local_db, - event.block_number, - event.block_timestamp, - &event.event_id, - &event.event, - &event_processor_config, - ) - .await - { - error!( + // Process all events for this task sequentially + for (contract_type, event) in events { + let contract_processors = processors.get_event_processor(contract_type); + if let Some(processors) = contract_processors.get(&event.event.keys[0]) { + let processor = processors.iter() + .find(|p| p.validate(&event.event)) + .expect("Must find at least one processor for the event"); + + debug!( target: LOG_TARGET, event_name = processor.event_key(), - error = %e, task_id = %task_id, + priority = %priority, "Processing parallelized event." ); + + if let Err(e) = processor + .process( + &world, + &mut local_db, + event.block_number, + event.block_timestamp, + &event.event_id, + &event.event, + &event_processor_config, + ) + .await + { + error!( + target: LOG_TARGET, + event_name = processor.event_key(), + error = %e, + task_id = %task_id, + priority = %priority, + "Processing parallelized event." + ); + } } } @@ -651,9 +661,8 @@ impl Engine

{ })); } - // Wait for all tasks in this priority level to complete before moving to next + // Wait for all tasks in this priority level to complete before moving to next priority try_join_all(handles).await?; - handles = Vec::new(); } Ok(()) @@ -877,36 +886,40 @@ impl Engine

{ .find(|p| p.validate(event)) .expect("Must find atleast one processor for the event"); - let task_identifier = match processor.event_key().as_str() { + let (task_priority, task_identifier) = match processor.event_key().as_str() { "ModelRegistered" | "EventRegistered" => { let mut hasher = DefaultHasher::new(); - event.keys[1].hash(&mut hasher); - // Priority 0 (highest) for model/event registration + event.keys.iter().for_each(|k| k.hash(&mut hasher)); let hash = hasher.finish() & 0x00FFFFFFFFFFFFFF; - hash | (0u64 << 56) + (0usize, hash) // Priority 0 (highest) for model/event registration } "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => { let mut hasher = DefaultHasher::new(); event.keys[1].hash(&mut hasher); event.keys[2].hash(&mut hasher); - // Priority 2 (lower) for store operations let hash = hasher.finish() & 0x00FFFFFFFFFFFFFF; - hash | (2u64 << 56) + (2usize, hash) // Priority 2 (lower) for store operations } - _ => 0 + _ => (0, 0) // No parallelization for other events }; if task_identifier != 0 { - self.tasks.entry(task_identifier).or_default().push(( - contract_type, - ParallelizedEvent { - event_id: event_id.to_string(), - event: event.clone(), - block_number, - block_timestamp, - }, - )); + self.tasks + .entry(task_priority) + .or_default() + .entry(task_identifier) + .or_default() + .push(( + contract_type, + ParallelizedEvent { + event_id: event_id.to_string(), + event: event.clone(), + block_number, + block_timestamp, + }, + )); } else { + // Process non-parallelized events immediately // if we dont have a task identifier, we process the event immediately if processor.validate(event) { if let Err(e) = processor From 2ad277282209017869a68348f98257b3f4b45d33 Mon Sep 17 00:00:00 2001 From: Nasr Date: Wed, 15 Jan 2025 15:34:00 +0700 Subject: [PATCH 3/7] more verbose error handling & get rid of bit masking task id --- crates/torii/indexer/src/engine.rs | 4 ++-- .../src/processors/store_del_record.rs | 12 ++++++++--- .../src/processors/store_set_record.rs | 12 ++++++++--- .../src/processors/store_update_member.rs | 21 +++++++++++++------ .../src/processors/store_update_record.rs | 12 ++++++++--- 5 files changed, 44 insertions(+), 17 deletions(-) diff --git a/crates/torii/indexer/src/engine.rs b/crates/torii/indexer/src/engine.rs index 68cbca4d5f..92da5cf474 100644 --- a/crates/torii/indexer/src/engine.rs +++ b/crates/torii/indexer/src/engine.rs @@ -890,14 +890,14 @@ impl Engine

{ "ModelRegistered" | "EventRegistered" => { let mut hasher = DefaultHasher::new(); event.keys.iter().for_each(|k| k.hash(&mut hasher)); - let hash = hasher.finish() & 0x00FFFFFFFFFFFFFF; + let hash = hasher.finish(); (0usize, hash) // Priority 0 (highest) for model/event registration } "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => { let mut hasher = DefaultHasher::new(); event.keys[1].hash(&mut hasher); event.keys[2].hash(&mut hasher); - let hash = hasher.finish() & 0x00FFFFFFFFFFFFFF; + let hash = hasher.finish(); (2usize, hash) // Priority 2 (lower) for store operations } _ => (0, 0) // No parallelization for other events diff --git a/crates/torii/indexer/src/processors/store_del_record.rs b/crates/torii/indexer/src/processors/store_del_record.rs index e109f069d7..c92b6961f0 100644 --- a/crates/torii/indexer/src/processors/store_del_record.rs +++ b/crates/torii/indexer/src/processors/store_del_record.rs @@ -35,7 +35,7 @@ where block_timestamp: u64, event_id: &str, event: &Event, - _config: &EventProcessorConfig, + config: &EventProcessorConfig, ) -> Result<(), Error> { // Torii version is coupled to the world version, so we can expect the event to be well // formed. @@ -55,7 +55,7 @@ where // This can happen if only specific namespaces are indexed. let model = match db.model(event.selector).await { Ok(m) => m, - Err(e) if e.to_string().contains("no rows") => { + Err(e) if e.to_string().contains("no rows") && !config.namespaces.is_empty() => { debug!( target: LOG_TARGET, selector = %event.selector, @@ -63,7 +63,13 @@ where ); return Ok(()); } - Err(e) => return Err(e), + Err(e) => { + return Err(anyhow::anyhow!( + "Failed to retrieve model with selector {:#x}: {}", + event.selector, + e + )) + } }; info!( diff --git a/crates/torii/indexer/src/processors/store_set_record.rs b/crates/torii/indexer/src/processors/store_set_record.rs index c564cb3968..eb73f17f2b 100644 --- a/crates/torii/indexer/src/processors/store_set_record.rs +++ b/crates/torii/indexer/src/processors/store_set_record.rs @@ -36,7 +36,7 @@ where block_timestamp: u64, event_id: &str, event: &Event, - _config: &EventProcessorConfig, + config: &EventProcessorConfig, ) -> Result<(), Error> { // Torii version is coupled to the world version, so we can expect the event to be well // formed. @@ -56,7 +56,7 @@ where // This can happen if only specific namespaces are indexed. let model = match db.model(event.selector).await { Ok(m) => m, - Err(e) if e.to_string().contains("no rows") => { + Err(e) if e.to_string().contains("no rows") && !config.namespaces.is_empty() => { debug!( target: LOG_TARGET, selector = %event.selector, @@ -64,7 +64,13 @@ where ); return Ok(()); } - Err(e) => return Err(e), + Err(e) => { + return Err(anyhow::anyhow!( + "Failed to retrieve model with selector {:#x}: {}", + event.selector, + e + )) + } }; info!( diff --git a/crates/torii/indexer/src/processors/store_update_member.rs b/crates/torii/indexer/src/processors/store_update_member.rs index 776d61d9cb..b5662a21bd 100644 --- a/crates/torii/indexer/src/processors/store_update_member.rs +++ b/crates/torii/indexer/src/processors/store_update_member.rs @@ -7,7 +7,7 @@ use starknet::core::types::Event; use starknet::core::utils::get_selector_from_name; use starknet::providers::Provider; use torii_sqlite::Sql; -use tracing::info; +use tracing::{debug, info}; use super::{EventProcessor, EventProcessorConfig}; @@ -37,7 +37,7 @@ where block_timestamp: u64, event_id: &str, event: &Event, - _config: &EventProcessorConfig, + config: &EventProcessorConfig, ) -> Result<(), Error> { // Torii version is coupled to the world version, so we can expect the event to be well // formed. @@ -61,11 +61,20 @@ where // This can happen if only specific namespaces are indexed. let model = match db.model(model_selector).await { Ok(m) => m, + Err(e) if e.to_string().contains("no rows") && !config.namespaces.is_empty() => { + debug!( + target: LOG_TARGET, + selector = %model_selector, + "Model does not exist, skipping." + ); + return Ok(()); + } Err(e) => { - if e.to_string().contains("no rows") { - return Ok(()); - } - return Err(e); + return Err(anyhow::anyhow!( + "Failed to retrieve model with selector {:#x}: {}", + event.selector, + e + )) } }; diff --git a/crates/torii/indexer/src/processors/store_update_record.rs b/crates/torii/indexer/src/processors/store_update_record.rs index b92344849d..5cf5a96dea 100644 --- a/crates/torii/indexer/src/processors/store_update_record.rs +++ b/crates/torii/indexer/src/processors/store_update_record.rs @@ -36,7 +36,7 @@ where block_timestamp: u64, event_id: &str, event: &Event, - _config: &EventProcessorConfig, + config: &EventProcessorConfig, ) -> Result<(), Error> { // Torii version is coupled to the world version, so we can expect the event to be well // formed. @@ -59,7 +59,7 @@ where // This can happen if only specific namespaces are indexed. let model = match db.model(event.selector).await { Ok(m) => m, - Err(e) if e.to_string().contains("no rows") => { + Err(e) if e.to_string().contains("no rows") && !config.namespaces.is_empty() => { debug!( target: LOG_TARGET, selector = %event.selector, @@ -67,7 +67,13 @@ where ); return Ok(()); } - Err(e) => return Err(e), + Err(e) => { + return Err(anyhow::anyhow!( + "Failed to retrieve model with selector {:#x}: {}", + event.selector, + e + )) + } }; info!( From a7cf967924f117b381e784db90e512ccdb5d5c3f Mon Sep 17 00:00:00 2001 From: Nasr Date: Wed, 15 Jan 2025 15:36:48 +0700 Subject: [PATCH 4/7] store all raw events regardless of erc or world --- crates/torii/indexer/src/engine.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/crates/torii/indexer/src/engine.rs b/crates/torii/indexer/src/engine.rs index 92da5cf474..494e30772e 100644 --- a/crates/torii/indexer/src/engine.rs +++ b/crates/torii/indexer/src/engine.rs @@ -832,14 +832,7 @@ impl Engine

{ contract_type: ContractType, ) -> Result<()> { if self.config.flags.contains(IndexingFlags::RAW_EVENTS) { - match contract_type { - ContractType::WORLD => { - self.db.store_event(event_id, event, transaction_hash, block_timestamp)?; - } - // ERC events needs to be processed inside there respective processor - // we store transfer events for ERC contracts regardless of this flag - ContractType::ERC20 | ContractType::ERC721 => {} - } + self.db.store_event(event_id, event, transaction_hash, block_timestamp)?; } let event_key = event.keys[0]; From f461cea3ecab0a4e05463bee5e145fe6cb97bf0c Mon Sep 17 00:00:00 2001 From: Nasr Date: Wed, 15 Jan 2025 15:37:04 +0700 Subject: [PATCH 5/7] fmt --- crates/torii/indexer/src/engine.rs | 22 +++++++++---------- .../src/processors/store_del_record.rs | 2 +- .../src/processors/store_set_record.rs | 2 +- .../src/processors/store_update_member.rs | 2 +- .../src/processors/store_update_record.rs | 2 +- 5 files changed, 14 insertions(+), 16 deletions(-) diff --git a/crates/torii/indexer/src/engine.rs b/crates/torii/indexer/src/engine.rs index 494e30772e..a50583f118 100644 --- a/crates/torii/indexer/src/engine.rs +++ b/crates/torii/indexer/src/engine.rs @@ -600,11 +600,11 @@ impl Engine

{ async fn process_tasks(&mut self) -> Result<()> { let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks)); - + // Process each priority level sequentially for (priority, task_group) in std::mem::take(&mut self.tasks) { let mut handles = Vec::new(); - + // Process all tasks within this priority level concurrently for (task_id, events) in task_group { let db = self.db.clone(); @@ -616,12 +616,13 @@ impl Engine

{ handles.push(tokio::spawn(async move { let _permit = semaphore.acquire().await?; let mut local_db = db.clone(); - + // Process all events for this task sequentially for (contract_type, event) in events { let contract_processors = processors.get_event_processor(contract_type); if let Some(processors) = contract_processors.get(&event.event.keys[0]) { - let processor = processors.iter() + let processor = processors + .iter() .find(|p| p.validate(&event.event)) .expect("Must find at least one processor for the event"); @@ -893,16 +894,12 @@ impl Engine

{ let hash = hasher.finish(); (2usize, hash) // Priority 2 (lower) for store operations } - _ => (0, 0) // No parallelization for other events + _ => (0, 0), // No parallelization for other events }; if task_identifier != 0 { - self.tasks - .entry(task_priority) - .or_default() - .entry(task_identifier) - .or_default() - .push(( + self.tasks.entry(task_priority).or_default().entry(task_identifier).or_default().push( + ( contract_type, ParallelizedEvent { event_id: event_id.to_string(), @@ -910,7 +907,8 @@ impl Engine

{ block_number, block_timestamp, }, - )); + ), + ); } else { // Process non-parallelized events immediately // if we dont have a task identifier, we process the event immediately diff --git a/crates/torii/indexer/src/processors/store_del_record.rs b/crates/torii/indexer/src/processors/store_del_record.rs index c92b6961f0..c129e7e7a9 100644 --- a/crates/torii/indexer/src/processors/store_del_record.rs +++ b/crates/torii/indexer/src/processors/store_del_record.rs @@ -68,7 +68,7 @@ where "Failed to retrieve model with selector {:#x}: {}", event.selector, e - )) + )); } }; diff --git a/crates/torii/indexer/src/processors/store_set_record.rs b/crates/torii/indexer/src/processors/store_set_record.rs index eb73f17f2b..1b30b24016 100644 --- a/crates/torii/indexer/src/processors/store_set_record.rs +++ b/crates/torii/indexer/src/processors/store_set_record.rs @@ -69,7 +69,7 @@ where "Failed to retrieve model with selector {:#x}: {}", event.selector, e - )) + )); } }; diff --git a/crates/torii/indexer/src/processors/store_update_member.rs b/crates/torii/indexer/src/processors/store_update_member.rs index b5662a21bd..30fd10882e 100644 --- a/crates/torii/indexer/src/processors/store_update_member.rs +++ b/crates/torii/indexer/src/processors/store_update_member.rs @@ -74,7 +74,7 @@ where "Failed to retrieve model with selector {:#x}: {}", event.selector, e - )) + )); } }; diff --git a/crates/torii/indexer/src/processors/store_update_record.rs b/crates/torii/indexer/src/processors/store_update_record.rs index 5cf5a96dea..a76f40293e 100644 --- a/crates/torii/indexer/src/processors/store_update_record.rs +++ b/crates/torii/indexer/src/processors/store_update_record.rs @@ -72,7 +72,7 @@ where "Failed to retrieve model with selector {:#x}: {}", event.selector, e - )) + )); } }; From bbd14620ac91d3f3b07b6a861c314386c288fc4a Mon Sep 17 00:00:00 2001 From: Nasr Date: Wed, 15 Jan 2025 23:06:51 +0700 Subject: [PATCH 6/7] parallelize event emitted --- crates/torii/indexer/src/engine.rs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/crates/torii/indexer/src/engine.rs b/crates/torii/indexer/src/engine.rs index a50583f118..fb4a9d5e2e 100644 --- a/crates/torii/indexer/src/engine.rs +++ b/crates/torii/indexer/src/engine.rs @@ -6,6 +6,7 @@ use std::time::Duration; use anyhow::Result; use bitflags::bitflags; +use cainome::cairo_serde::CairoSerde; use dojo_utils::provider as provider_utils; use dojo_world::contracts::world::WorldContractReader; use futures_util::future::{join_all, try_join_all}; @@ -17,7 +18,7 @@ use starknet::core::types::{ }; use starknet::core::utils::get_selector_from_name; use starknet::providers::Provider; -use starknet_crypto::Felt; +use starknet_crypto::{poseidon_hash_many, Felt}; use tokio::sync::broadcast::Sender; use tokio::sync::mpsc::Sender as BoundedSender; use tokio::sync::Semaphore; @@ -894,6 +895,22 @@ impl Engine

{ let hash = hasher.finish(); (2usize, hash) // Priority 2 (lower) for store operations } + "EventEmitted" => { + let mut hasher = DefaultHasher::new(); + + let keys = Vec::::cairo_deserialize(&event.keys, 0).unwrap_or_else(|e| { + panic!("Expected EventEmitted keys to be well formed: {:?}", e); + }); + + // selector + event.keys[1].hash(&mut hasher); + // entity id + let entity_id = poseidon_hash_many(&keys); + entity_id.hash(&mut hasher); + + let hash = hasher.finish(); + (2usize, hash) // Priority 2 for event messages + } _ => (0, 0), // No parallelization for other events }; From 0a73aa6260fdb77c6ed2d99d74888b1a590c8a12 Mon Sep 17 00:00:00 2001 From: Nasr Date: Wed, 15 Jan 2025 23:19:20 +0700 Subject: [PATCH 7/7] need to use event data --- crates/torii/indexer/src/engine.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/torii/indexer/src/engine.rs b/crates/torii/indexer/src/engine.rs index fb4a9d5e2e..3e7f1c5332 100644 --- a/crates/torii/indexer/src/engine.rs +++ b/crates/torii/indexer/src/engine.rs @@ -898,7 +898,7 @@ impl Engine

{ "EventEmitted" => { let mut hasher = DefaultHasher::new(); - let keys = Vec::::cairo_deserialize(&event.keys, 0).unwrap_or_else(|e| { + let keys = Vec::::cairo_deserialize(&event.data, 0).unwrap_or_else(|e| { panic!("Expected EventEmitted keys to be well formed: {:?}", e); });