-
Notifications
You must be signed in to change notification settings - Fork 188
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(torii-indexer): parallelize models & event messages #2912
base: main
Are you sure you want to change the base?
Changes from 2 commits
9db6ad0
312befa
2ad2772
a7cf967
f461cea
bbd1462
0a73aa6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -207,6 +207,9 @@ pub struct ParallelizedEvent { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pub event: Event, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
type TaskPriority = usize; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
type TaskId = u64; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
#[allow(missing_debug_implementations)] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
world: Arc<WorldContractReader<P>>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -216,7 +219,7 @@ pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
config: EngineConfig, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
shutdown_tx: Sender<()>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
block_tx: Option<BoundedSender<u64>>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tasks: HashMap<u64, Vec<(ContractType, ParallelizedEvent)>>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tasks: BTreeMap<TaskPriority, HashMap<TaskId, Vec<(ContractType, ParallelizedEvent)>>>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
contracts: Arc<HashMap<Felt, ContractType>>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -250,7 +253,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
shutdown_tx, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
block_tx, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
contracts, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tasks: HashMap::new(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tasks: BTreeMap::new(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -596,44 +599,71 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async fn process_tasks(&mut self) -> Result<()> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// We use a semaphore to limit the number of concurrent tasks | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Run all tasks concurrently | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let mut handles = Vec::new(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for (task_id, events) in self.tasks.drain() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let db = self.db.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let world = self.world.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let semaphore = semaphore.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let processors = self.processors.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let event_processor_config = self.config.event_processor_config.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
handles.push(tokio::spawn(async move { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let _permit = semaphore.acquire().await?; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let mut local_db = db.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for (contract_type, ParallelizedEvent { event_id, event, block_number, block_timestamp }) in events { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let contract_processors = processors.get_event_processor(contract_type); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if let Some(processors) = contract_processors.get(&event.keys[0]) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let processor = processors.iter().find(|p| p.validate(&event)).expect("Must find atleast one processor for the event"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
debug!(target: LOG_TARGET, event_name = processor.event_key(), task_id = %task_id, "Processing parallelized event."); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if let Err(e) = processor | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.process(&world, &mut local_db, block_number, block_timestamp, &event_id, &event, &event_processor_config) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.await | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, task_id = %task_id, "Processing parallelized event."); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Process each priority level sequentially | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for (priority, task_group) in std::mem::take(&mut self.tasks) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let mut handles = Vec::new(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Process all tasks within this priority level concurrently | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for (task_id, events) in task_group { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let db = self.db.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let world = self.world.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let semaphore = semaphore.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let processors = self.processors.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let event_processor_config = self.config.event_processor_config.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
handles.push(tokio::spawn(async move { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let _permit = semaphore.acquire().await?; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let mut local_db = db.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Process all events for this task sequentially | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for (contract_type, event) in events { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let contract_processors = processors.get_event_processor(contract_type); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if let Some(processors) = contract_processors.get(&event.event.keys[0]) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let processor = processors.iter() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.find(|p| p.validate(&event.event)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.expect("Must find at least one processor for the event"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
debug!( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
target: LOG_TARGET, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
event_name = processor.event_key(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
task_id = %task_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
priority = %priority, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"Processing parallelized event." | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if let Err(e) = processor | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.process( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
&world, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
&mut local_db, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
event.block_number, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
event.block_timestamp, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
&event.event_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
&event.event, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
&event_processor_config, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.await | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
error!( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
target: LOG_TARGET, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
event_name = processor.event_key(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
error = %e, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
task_id = %task_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
priority = %priority, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"Processing parallelized event." | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Ok::<_, anyhow::Error>(()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
})); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Ok::<_, anyhow::Error>(()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
})); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Join all tasks | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try_join_all(handles).await?; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Wait for all tasks in this priority level to complete before moving to next priority | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try_join_all(handles).await?; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Ok(()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -856,30 +886,40 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.find(|p| p.validate(event)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.expect("Must find atleast one processor for the event"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let task_identifier = match processor.event_key().as_str() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let (task_priority, task_identifier) = match processor.event_key().as_str() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"ModelRegistered" | "EventRegistered" => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let mut hasher = DefaultHasher::new(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
event.keys.iter().for_each(|k| k.hash(&mut hasher)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let hash = hasher.finish() & 0x00FFFFFFFFFFFFFF; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(0usize, hash) // Priority 0 (highest) for model/event registration | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let mut hasher = DefaultHasher::new(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// model selector | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
event.keys[1].hash(&mut hasher); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// entity id | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
event.keys[2].hash(&mut hasher); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
hasher.finish() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let hash = hasher.finish() & 0x00FFFFFFFFFFFFFF; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(2usize, hash) // Priority 2 (lower) for store operations | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
_ => 0, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
_ => (0, 0) // No parallelization for other events | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ohayo, sensei! Guard against index out-of-bounds when accessing Accessing Apply this diff to add necessary checks: + if event.keys.len() >= 3 {
let mut hasher = DefaultHasher::new();
event.keys[1].hash(&mut hasher);
event.keys[2].hash(&mut hasher);
let hash = hasher.finish() & 0x00FFFFFFFFFFFFFF;
(2usize, hash) // Priority 2 (lower) for store operations
+ } else {
+ warn!(target: LOG_TARGET, "Insufficient event keys for hashing. Using default task identifier.");
+ (2usize, 0) // Handle with default or error as appropriate
+ } 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// if we have a task identifier, we queue the event to be parallelized | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if task_identifier != 0 { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.tasks.entry(task_identifier).or_default().push(( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
contract_type, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ParallelizedEvent { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
event_id: event_id.to_string(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
event: event.clone(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
block_number, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
block_timestamp, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.tasks | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.entry(task_priority) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.or_default() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.entry(task_identifier) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.or_default() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.push(( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
contract_type, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ParallelizedEvent { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
event_id: event_id.to_string(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
event: event.clone(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
block_number, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
block_timestamp, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Process non-parallelized events immediately | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ohayo, sensei! Review handling of When |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// if we dont have a task identifier, we process the event immediately | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if processor.validate(event) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if let Err(e) = processor | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wdyt about an enum, using the
Ord
trait to keep keys ordered in theBTreeMap
?Feeling like enum would be more idiomatic, and remove the magic values like we have in the engine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something like: