feat(torii-indexer): parallelize models & event messages #2912

Open
wants to merge 7 commits into
base: main

Larkooo
Collaborator

@Larkooo Larkooo commented Jan 15, 2025

Summary by CodeRabbit

  • Refactor
    • Updated internal data structure for task management to improve performance and processing efficiency.
    • Refined event processing logic with a more nuanced approach to task identification and parallel processing, allowing for prioritized execution of tasks.
    • Enhanced error handling and logging in various processors to provide clearer insights into processing flow and potential issues.
    • Made configuration parameters actively usable within processing methods, improving overall functionality.


coderabbitai bot commented Jan 15, 2025

Walkthrough

Ohayo, sensei! The pull request introduces substantial modifications to the task management system in the Engine struct. The primary change involves switching from a HashMap to a BTreeMap for storing tasks, impacting the initialization and processing of events. This modification enhances the event processing logic by implementing a structured task identifier generation mechanism based on priority levels, allowing for prioritized processing of tasks while maintaining concurrency within the same priority level.

Changes

File: change summary

crates/torii/indexer/src/engine.rs
  • Replaced HashMap with BTreeMap for tasks field
  • Updated new method to initialize tasks with BTreeMap::new()
  • Restructured task processing to handle tasks by priority sequentially while allowing concurrent processing of events within the same task
  • Added type aliases for TaskPriority and TaskId

crates/torii/indexer/src/processors/store_del_record.rs
  • Updated process method to use config parameter
  • Enhanced error handling to check config.namespaces before skipping model retrieval errors
  • Wrapped errors in anyhow::anyhow! for better context

crates/torii/indexer/src/processors/store_set_record.rs
  • Renamed _config to config in process method
  • Improved error handling logic to depend on config.namespaces state

crates/torii/indexer/src/processors/store_update_member.rs
  • Updated process method to include debug logging for model retrieval errors
  • Enhanced error reporting with anyhow::anyhow!

crates/torii/indexer/src/processors/store_update_record.rs
  • Changed _config to config in process method
  • Improved error handling to check config.namespaces before skipping errors

Possibly related PRs

  • fix(torii-core): correct parallelized task hash #2860: The changes in this PR involve modifications to the task identifier generation logic in the Engine struct, which is directly related to the task management logic being restructured in the main PR. Both PRs focus on enhancing how tasks are processed and identified within the Engine.
  • refactor(torii): max conns to tasks & dont use sqlite acquire #2911: This PR modifies connection options for the SQLite database in the Runner struct, which may relate to the overall task management and processing capabilities introduced in the main PR, particularly in how tasks interact with the database.

Suggested Reviewers

  • glihm

Sensei, the changes look quite intriguing! The transition to a BTreeMap and the more nuanced event processing approach should bring some interesting improvements to the system. Ohayo and happy coding! 🍵🖥️




@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (2)
crates/torii/indexer/src/engine.rs (2)

881-896: Document priority levels and consider using constants, sensei!

The task identifier generation logic cleverly uses bit manipulation for priorities, but could benefit from better documentation and constant definitions:

+ // Priority levels for different event types
+ const PRIORITY_MODEL_EVENT: u64 = 0;
+ const PRIORITY_STORE_OPS: u64 = 2;
+ // Mask to clear priority bits
+ const PRIORITY_MASK: u64 = 0x00FFFFFFFFFFFFFF;
+ 
  match processor.event_key().as_str() {
      "ModelRegistered" | "EventRegistered" => {
          let mut hasher = DefaultHasher::new();
          event.keys[1].hash(&mut hasher);
-         // Priority 0 (highest) for model/event registration
-         let hash = hasher.finish() & 0x00FFFFFFFFFFFFFF;
-         hash | (0u64 << 56)
+         // Use highest 8 bits for priority, lower 56 bits for hash-based grouping
+         let hash = hasher.finish() & PRIORITY_MASK;
+         hash | (PRIORITY_MODEL_EVENT << 56)
      }
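
As a rough sketch of the layout this comment describes (priority in the top 8 bits, hash-based grouping key in the lower 56), the helpers below pack and unpack such an identifier. The names `HASH_BITS`, `pack_task_id`, and `unpack_task_id` are hypothetical and do not come from the PR; only the mask value matches the suggested `PRIORITY_MASK`.

```rust
/// Number of low bits reserved for the hash-based grouping key.
const HASH_BITS: u32 = 56;
/// Keeps the low 56 bits and clears the top 8 priority bits.
const PRIORITY_MASK: u64 = (1u64 << HASH_BITS) - 1;

/// Packs an 8-bit priority into the top byte and a truncated hash below it.
/// (Hypothetical helper, for illustration only.)
fn pack_task_id(priority: u8, hash: u64) -> u64 {
    (u64::from(priority) << HASH_BITS) | (hash & PRIORITY_MASK)
}

/// Splits a packed identifier back into (priority, truncated hash).
fn unpack_task_id(task_id: u64) -> (u8, u64) {
    ((task_id >> HASH_BITS) as u8, task_id & PRIORITY_MASK)
}
```

Because the priority sits in the most significant byte, comparing packed identifiers numerically orders them by priority first and by hash second.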

601-657: Enhance error handling with more context, sensei!

The task processing implementation looks solid, but could benefit from improved error handling. Consider adding more context to errors and enhancing debug logging:

  if let Err(e) = processor
      .process(
          &world,
          &mut local_db,
          event.block_number,
          event.block_timestamp,
          &event.event_id,
          &event.event,
          &event_processor_config,
      )
      .await
  {
      error!(
          target: LOG_TARGET,
          event_name = processor.event_key(),
+         block_number = %event.block_number,
+         timestamp = %event.block_timestamp,
          error = %e,
          task_id = %task_id,
+         "Failed to process event. This might impact dependent operations.",
-         "Processing parallelized event."
      );
+     // Propagate error with additional context
+     return Err(anyhow::anyhow!("Failed to process event {}: {}", event.event_id, e));
  }
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a8af3d6 and 9db6ad0.

📒 Files selected for processing (1)
  • crates/torii/indexer/src/engine.rs (4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: fmt
🔇 Additional comments (1)
crates/torii/indexer/src/engine.rs (1)

219-219: Ohayo! Nice choice using BTreeMap for ordered task processing, sensei!

The switch from HashMap to BTreeMap enables natural priority ordering of tasks, which is crucial for maintaining the correct processing sequence of model registrations and store operations.

Also applies to: 253-253
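
To illustrate the ordering property mentioned here, the following minimal sketch groups events in a nested map and reads the priorities back. The inner `HashMap<TaskId, Vec<String>>` shape and the function name are assumptions made for the example; the actual field type in `engine.rs` may differ.

```rust
use std::collections::{BTreeMap, HashMap};

type TaskPriority = usize;
type TaskId = u64;

/// Groups (priority, task id, event label) triples into a nested
/// priority -> task -> events map, then returns the priorities in the
/// order a BTreeMap yields them. (Hypothetical helper.)
fn priorities_in_processing_order(
    events: &[(TaskPriority, TaskId, &str)],
) -> Vec<TaskPriority> {
    let mut tasks: BTreeMap<TaskPriority, HashMap<TaskId, Vec<String>>> = BTreeMap::new();
    for (priority, task_id, label) in events {
        tasks
            .entry(*priority)
            .or_default()
            .entry(*task_id)
            .or_default()
            .push(label.to_string());
    }
    // BTreeMap iterates keys in ascending order, so priority 0 comes first.
    tasks.keys().copied().collect()
}
```

A `HashMap` keyed by priority would give no such guarantee, since its iteration order is unspecified.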


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (2)
crates/torii/indexer/src/engine.rs (2)

256-256: Ohayo, sensei! Confirm consistent initialization of tasks.

Initializing tasks with BTreeMap::new() is appropriate. However, if there are default priorities or initial tasks, consider initializing them here for clarity.


603-666: Ohayo, sensei! Potential task starvation due to sequential priority processing.

Processing tasks strictly sequentially by priority level may cause lower-priority tasks to be delayed indefinitely if higher-priority tasks are continuously added. Consider implementing a more dynamic scheduling strategy to balance task priorities and prevent starvation.

Do you want assistance in proposing an enhanced task scheduling mechanism?

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9db6ad0 and 312befa.

📒 Files selected for processing (1)
  • crates/torii/indexer/src/engine.rs (5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: fmt
🔇 Additional comments (3)
crates/torii/indexer/src/engine.rs (3)

210-212: Ohayo, sensei! Good use of type aliases for clarity.

Introducing TaskPriority and TaskId improves code readability and ensures type safety.


222-222: Ohayo, sensei! Verify the implications of changing tasks to a BTreeMap.

Switching from HashMap to BTreeMap for tasks introduces ordered task management based on priority. Ensure this change aligns with the desired task processing logic and does not introduce unintended side effects.


889-903: ⚠️ Potential issue

Ohayo, sensei! Potential hash collisions in task_identifier generation.

Using truncated hash values for task_identifier may lead to collisions, causing unrelated tasks to share the same identifier. Consider using a more collision-resistant method or including additional unique data to ensure task identifiers are unique.

Please run the following script to check for hash collisions:

Comment on lines 907 to 922
    self.tasks
        .entry(task_priority)
        .or_default()
        .entry(task_identifier)
        .or_default()
        .push((
            contract_type,
            ParallelizedEvent {
                event_id: event_id.to_string(),
                event: event.clone(),
                block_number,
                block_timestamp,
            },
        ));
} else {
    // Process non-parallelized events immediately

⚠️ Potential issue

Ohayo, sensei! Review handling of task_identifier equal to zero.

When task_identifier is zero, events are processed immediately outside of the priority queue. Ensure that zero cannot be a valid task_identifier that should be queued, or adjust the logic to handle zero appropriately to prevent unintended bypassing of the task queue.
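
One way to picture the concern, as a sketch that assumes the engine treats a zero identifier as "not parallelized": a sentinel cannot distinguish "no task" from a hash that happens to be zero, while an `Option` can. The `Dispatch` enum and both routing functions are hypothetical names introduced for this example.

```rust
type TaskId = u64;

/// Where an event should go: into the parallel task queue or straight through.
#[derive(Debug, PartialEq)]
enum Dispatch {
    Queue(TaskId),
    Immediate,
}

/// Sentinel-based routing: 0 means "do not parallelize", so a real
/// identifier that happens to be 0 would also bypass the queue.
fn route_with_sentinel(task_identifier: TaskId) -> Dispatch {
    if task_identifier != 0 {
        Dispatch::Queue(task_identifier)
    } else {
        Dispatch::Immediate
    }
}

/// Option-based routing: None means "do not parallelize", and Some(0)
/// is queued like any other identifier.
fn route_with_option(task_identifier: Option<TaskId>) -> Dispatch {
    match task_identifier {
        Some(id) => Dispatch::Queue(id),
        None => Dispatch::Immediate,
    }
}
```

With the `Option` form, the "no parallelization" case is explicit in the type rather than encoded in a reserved value.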

Comment on lines 889 to 903
 let (task_priority, task_identifier) = match processor.event_key().as_str() {
     "ModelRegistered" | "EventRegistered" => {
         let mut hasher = DefaultHasher::new();
         event.keys.iter().for_each(|k| k.hash(&mut hasher));
         let hash = hasher.finish() & 0x00FFFFFFFFFFFFFF;
         (0usize, hash) // Priority 0 (highest) for model/event registration
     }
     "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => {
         let mut hasher = DefaultHasher::new();
         // model selector
         event.keys[1].hash(&mut hasher);
         // entity id
         event.keys[2].hash(&mut hasher);
-        hasher.finish()
+        let hash = hasher.finish() & 0x00FFFFFFFFFFFFFF;
+        (2usize, hash) // Priority 2 (lower) for store operations
     }
-    _ => 0,
+    _ => (0, 0) // No parallelization for other events

⚠️ Potential issue

Ohayo, sensei! Guard against index out-of-bounds when accessing event.keys.

Accessing event.keys[1] and event.keys[2] without checking their existence may lead to panics if the keys vector is shorter than expected. Ensure that the event.keys vector has the required length before accessing these indices.

Apply this diff to add necessary checks:

+ if event.keys.len() >= 3 {
    let mut hasher = DefaultHasher::new();
    event.keys[1].hash(&mut hasher);
    event.keys[2].hash(&mut hasher);
    let hash = hasher.finish() & 0x00FFFFFFFFFFFFFF;
    (2usize, hash) // Priority 2 (lower) for store operations
+ } else {
+     warn!(target: LOG_TARGET, "Insufficient event keys for hashing. Using default task identifier.");
+     (2usize, 0) // Handle with default or error as appropriate
+ }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let (task_priority, task_identifier) = match processor.event_key().as_str() {
    "ModelRegistered" | "EventRegistered" => {
        let mut hasher = DefaultHasher::new();
        event.keys.iter().for_each(|k| k.hash(&mut hasher));
        let hash = hasher.finish() & 0x00FFFFFFFFFFFFFF;
        (0usize, hash) // Priority 0 (highest) for model/event registration
    }
    "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => {
        if event.keys.len() >= 3 {
            let mut hasher = DefaultHasher::new();
            event.keys[1].hash(&mut hasher);
            event.keys[2].hash(&mut hasher);
            let hash = hasher.finish() & 0x00FFFFFFFFFFFFFF;
            (2usize, hash) // Priority 2 (lower) for store operations
        } else {
            warn!(target: LOG_TARGET, "Insufficient event keys for hashing. Using default task identifier.");
            (2usize, 0) // Handle with default or error as appropriate
        }
    }
    _ => (0, 0) // No parallelization for other events
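
An alternative to the explicit length check, shown here only as a sketch: a slice pattern binds the second and third keys when they exist and falls through otherwise, so no indexing can panic. The function name `store_task_hash` and the generic key type are assumptions for illustration; the real event keys are whatever type the engine uses.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hashes the model selector (keys[1]) and entity id (keys[2]) if both exist.
/// Returns None instead of panicking when there are fewer than three keys.
/// (Hypothetical helper.)
fn store_task_hash<K: Hash>(keys: &[K]) -> Option<u64> {
    match keys {
        // Ignore keys[0], bind keys[1] and keys[2], allow any trailing keys.
        [_, model_selector, entity_id, ..] => {
            let mut hasher = DefaultHasher::new();
            model_selector.hash(&mut hasher);
            entity_id.hash(&mut hasher);
            Some(hasher.finish())
        }
        _ => None,
    }
}
```

The caller can then decide what a `None` means (log and skip, or fall back to a default identifier) in one place.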


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
crates/torii/indexer/src/engine.rs (1)

895-901: ⚠️ Potential issue

Add safety check for event keys access.

Guard against potential index out-of-bounds when accessing event.keys.

             "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => {
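-                let mut hasher = DefaultHasher::new();
-                event.keys[1].hash(&mut hasher);
-                event.keys[2].hash(&mut hasher);
-                let hash = hasher.finish();
-                (2usize, hash)
+                if event.keys.len() < 3 {
+                    warn!(
+                        target: LOG_TARGET,
+                        "Insufficient event keys for hashing. Using default task identifier."
+                    );
+                    // Evaluate to the fallback; `return` here would exit the enclosing function.
+                    (2usize, 0)
+                } else {
+                    let mut hasher = DefaultHasher::new();
+                    event.keys[1].hash(&mut hasher);
+                    event.keys[2].hash(&mut hasher);
+                    let hash = hasher.finish();
+                    (2usize, hash)
+                }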
             }
🧰 Tools
🪛 GitHub Actions: ci

[warning] 900-900: Missing comma in match expression

🧹 Nitpick comments (6)
crates/torii/indexer/src/processors/store_del_record.rs (1)

66-72: Excellent error message improvement, but there's a small formatting issue, sensei!

The enhanced error message with hex-formatted selector will make debugging much easier. However, there's a formatting issue to fix.

Fix the extra closing parenthesis on line 68 as flagged by the CI:

-                return Err(anyhow::anyhow!(
-                    "Failed to retrieve model with selector {:#x}: {}",
-                    event.selector,
-                    e
-                ))
+                return Err(anyhow::anyhow!(
+                    "Failed to retrieve model with selector {:#x}: {}",
+                    event.selector,
+                    e
+                ));
🧰 Tools
🪛 GitHub Actions: ci

[warning] 68-68: Extra closing parenthesis, formatting issue

crates/torii/indexer/src/processors/store_update_record.rs (1)

71-75: Ohayo! Small formatting fix needed, sensei!

The CI pipeline indicates an extra closing parenthesis. Please fix the formatting to pass the pipeline checks.

-                return Err(anyhow::anyhow!(
-                    "Failed to retrieve model with selector {:#x}: {}",
-                    event.selector,
-                    e
-                ))
+                return Err(anyhow::anyhow!("Failed to retrieve model with selector {:#x}: {}", event.selector, e))
🧰 Tools
🪛 GitHub Actions: ci

[warning] 72-72: Extra closing parenthesis, formatting issue

crates/torii/indexer/src/processors/store_set_record.rs (1)

67-73: Fix formatting and enhance error context, sensei!

The error handling provides good context, but there are two improvements to consider:

  1. Fix the formatting issue with the extra closing parenthesis.
  2. Consider including the model namespace/name in the error message for better debugging.

Apply this diff:

-            Err(e) => {
-                return Err(anyhow::anyhow!(
-                    "Failed to retrieve model with selector {:#x}: {}",
-                    event.selector,
-                    e
-                ))
-            }
+            Err(e) => return Err(anyhow::anyhow!(
+                "Failed to retrieve model with selector {:#x} (namespace: {}, name: {}): {}",
+                event.selector,
+                model.namespace,
+                model.name,
+                e
+            ))
🧰 Tools
🪛 GitHub Actions: ci

[warning] 69-69: Extra closing parenthesis, formatting issue

crates/torii/indexer/src/processors/store_update_member.rs (1)

64-71: Enhanced error handling with debug logging, sensei!

The addition of debug logging provides better visibility into model absence cases.

Consider using string interpolation for a more idiomatic debug message:

-                debug!(
-                    target: LOG_TARGET,
-                    selector = %model_selector,
-                    "Model does not exist, skipping."
-                );
+                debug!(
+                    target: LOG_TARGET,
+                    "Model with selector {model_selector:#x} does not exist, skipping."
+                );
crates/torii/indexer/src/engine.rs (2)

600-662: Clean up formatting issues.

Please address the following formatting issues:

  • Remove extra whitespace lines at 600, 607, and 616
  • Split the chain method calls at lines 623-625 for better readability
- let processor = processors.iter()
-     .find(|p| p.validate(&event.event))
-     .expect("Must find at least one processor for the event");
+ let processor = processors
+     .iter()
+     .find(|p| p.validate(&event.event))
+     .expect("Must find at least one processor for the event");
🧰 Tools
🪛 GitHub Actions: ci

[warning] 600-600: Extra whitespace line found, formatting issue


[warning] 607-607: Extra whitespace line found, formatting issue


[warning] 616-616: Extra whitespace line found, formatting issue


[warning] 623-625: Chain method calls should be split into multiple lines for better readability


907-920: Format method chain for better readability.

Split the long chain of method calls across multiple lines.

-            self.tasks
-                .entry(task_priority)
-                .or_default()
-                .entry(task_identifier)
-                .or_default()
-                .push((
-                    contract_type,
-                    ParallelizedEvent {
-                        event_id: event_id.to_string(),
-                        event: event.clone(),
-                        block_number,
-                        block_timestamp,
-                    },
-                ));
+            self.tasks
+                .entry(task_priority)
+                .or_default()
+                .entry(task_identifier)
+                .or_default()
+                .push((
+                    contract_type,
+                    ParallelizedEvent {
+                        event_id: event_id.to_string(),
+                        event: event.clone(),
+                        block_number,
+                        block_timestamp,
+                    },
+                ));
🧰 Tools
🪛 GitHub Actions: ci

[warning] 907-917: Long chain of method calls should be formatted across multiple lines

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 312befa and 2ad2772.

📒 Files selected for processing (5)
  • crates/torii/indexer/src/engine.rs (5 hunks)
  • crates/torii/indexer/src/processors/store_del_record.rs (2 hunks)
  • crates/torii/indexer/src/processors/store_set_record.rs (2 hunks)
  • crates/torii/indexer/src/processors/store_update_member.rs (3 hunks)
  • crates/torii/indexer/src/processors/store_update_record.rs (2 hunks)
🧰 Additional context used
🪛 GitHub Actions: ci
crates/torii/indexer/src/processors/store_set_record.rs

[warning] 69-69: Extra closing parenthesis, formatting issue

crates/torii/indexer/src/processors/store_update_member.rs

[warning] 74-74: Extra closing parenthesis, formatting issue

crates/torii/indexer/src/processors/store_del_record.rs

[warning] 68-68: Extra closing parenthesis, formatting issue

crates/torii/indexer/src/processors/store_update_record.rs

[warning] 72-72: Extra closing parenthesis, formatting issue

crates/torii/indexer/src/engine.rs

[warning] 600-600: Extra whitespace line found, formatting issue


[warning] 607-607: Extra whitespace line found, formatting issue


[warning] 616-616: Extra whitespace line found, formatting issue


[warning] 623-625: Chain method calls should be split into multiple lines for better readability


[warning] 900-900: Missing comma in match expression


[warning] 907-917: Long chain of method calls should be formatted across multiple lines

🔇 Additional comments (11)
crates/torii/indexer/src/processors/store_del_record.rs (2)

38-38: Ohayo! Parameter renaming looks good, sensei!

Removing the underscore prefix is appropriate since we're now using the config parameter.


58-65: Nice error handling improvement, sensei!

The additional check for non-empty namespaces prevents silently ignoring errors when no namespaces are configured, making the behavior more explicit and safer.

crates/torii/indexer/src/processors/store_update_record.rs (2)

39-39: Ohayo! Parameter renaming looks good, sensei!

The change from _config to config properly reflects that this parameter is now being utilized in the implementation.


62-76: Excellent error handling improvements, sensei!

The changes enhance error handling in two valuable ways:

  1. The condition !config.namespaces.is_empty() ensures we only skip missing models when explicitly filtering by namespaces
  2. The error message now includes the selector in hex format for better debugging
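
The two points above can be summarized in a small standalone sketch. It uses plain `String` errors in place of `anyhow` so that it stays self-contained, and `resolve_model` with its parameters is a hypothetical helper, not the processor's actual code.

```rust
/// Decides what to do with the result of a model lookup.
/// `Ok(None)` means "skip this event"; `Err` means "surface the failure".
fn resolve_model(
    lookup: Result<String, String>,
    namespaces: &[String],
    selector: u64,
) -> Result<Option<String>, String> {
    match lookup {
        Ok(model) => Ok(Some(model)),
        // A namespace filter is active, so an unknown model is expected: skip it.
        Err(_) if !namespaces.is_empty() => Ok(None),
        // No filter: every model should be indexed, so a miss is a real error.
        Err(e) => Err(format!(
            "Failed to retrieve model with selector {selector:#x}: {e}"
        )),
    }
}
```

With an empty namespace list, a failed lookup for selector 42 yields the message "Failed to retrieve model with selector 0x2a: ..." rather than being silently dropped.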
🧰 Tools
🪛 GitHub Actions: ci

[warning] 72-72: Extra closing parenthesis, formatting issue

crates/torii/indexer/src/processors/store_set_record.rs (2)

39-39: Ohayo! Parameter rename looks good, sensei!

The removal of the underscore prefix is appropriate since the config parameter is now actively used in the error handling logic.


59-66: Ohayo! Nice enhancement to the namespace-specific indexing logic, sensei!

The condition !config.namespaces.is_empty() effectively handles the case where only specific namespaces are being indexed.

crates/torii/indexer/src/processors/store_update_member.rs (3)

10-10: Ohayo! Import changes look good, sensei!

The addition of debug to the tracing import aligns well with the new debug logging capability.


40-40: Parameter rename reflects its usage, sensei!

The rename from _config to config accurately reflects that the parameter is now actively used in the function.


73-77: ⚠️ Potential issue

Fix the formatting issue, sensei!

There's an extra closing parenthesis as indicated by the pipeline failure.

Apply this fix:

-                return Err(anyhow::anyhow!(
-                    "Failed to retrieve model with selector {:#x}: {}",
-                    event.selector,
-                    e
-                ))
+                return Err(anyhow::anyhow!(
+                    "Failed to retrieve model with selector {:#x}: {}",
+                    event.selector,
+                    e
+                ))

Likely invalid or redundant comment.

🧰 Tools
🪛 GitHub Actions: ci

[warning] 74-74: Extra closing parenthesis, formatting issue

crates/torii/indexer/src/engine.rs (2)

Line range hint 210-256: Ohayo, sensei! LGTM - Clean type definitions and struct changes.

The introduction of type aliases and the switch to BTreeMap for tasks improves code organization and enables priority-based processing.


603-666: Ohayo, sensei! Excellent implementation of priority-based task processing.

The sequential processing of priority levels while maintaining concurrency within each level is well-designed.
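
A simplified picture of that scheduling shape, under loud assumptions: standard-library scoped threads stand in for whatever async runtime the engine actually uses, events are plain strings, and `run_by_priority` is a hypothetical name. Priority levels run one after another, and tasks within a level run concurrently.

```rust
use std::collections::{BTreeMap, HashMap};
use std::sync::Mutex;
use std::thread;

type TaskPriority = usize;
type TaskId = u64;

/// Runs priority levels sequentially; within a level, each task gets its own
/// thread and handles its events in order. Returns the processed event labels
/// grouped by priority level.
fn run_by_priority(
    tasks: BTreeMap<TaskPriority, HashMap<TaskId, Vec<String>>>,
) -> Vec<(TaskPriority, Vec<String>)> {
    let mut log = Vec::new();
    for (priority, level) in tasks {
        let processed = Mutex::new(Vec::new());
        let sink = &processed;
        // The scope joins every spawned thread before the next level starts.
        thread::scope(|s| {
            for events in level.values() {
                s.spawn(move || {
                    for event in events {
                        sink.lock().unwrap().push(event.clone());
                    }
                });
            }
        });
        let mut done = processed.into_inner().unwrap();
        done.sort(); // thread completion order is not deterministic
        log.push((priority, done));
    }
    log
}
```

The key property is the barrier between levels: nothing at priority 2 starts until every priority 0 task has finished.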

🧰 Tools
🪛 GitHub Actions: ci

[warning] 607-607: Extra whitespace line found, formatting issue


[warning] 616-616: Extra whitespace line found, formatting issue


[warning] 623-625: Chain method calls should be split into multiple lines for better readability

Comment on lines 889 to 903
 let (task_priority, task_identifier) = match processor.event_key().as_str() {
     "ModelRegistered" | "EventRegistered" => {
         let mut hasher = DefaultHasher::new();
         event.keys.iter().for_each(|k| k.hash(&mut hasher));
         let hash = hasher.finish();
         (0usize, hash) // Priority 0 (highest) for model/event registration
     }
     "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => {
         let mut hasher = DefaultHasher::new();
         // model selector
         event.keys[1].hash(&mut hasher);
         // entity id
         event.keys[2].hash(&mut hasher);
-        hasher.finish()
+        let hash = hasher.finish();
+        (2usize, hash) // Priority 2 (lower) for store operations
     }
-    _ => 0,
+    _ => (0, 0) // No parallelization for other events

⚠️ Potential issue

Fix match expression syntax.

Add missing comma in the match expression.

             "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => {
                 let mut hasher = DefaultHasher::new();
                 event.keys[1].hash(&mut hasher);
                 event.keys[2].hash(&mut hasher);
                 let hash = hasher.finish();
                 (2usize, hash) // Priority 2 (lower) for store operations
             }
-            _ => (0, 0) // No parallelization for other events
+            _ => (0, 0), // No parallelization for other events
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let (task_priority, task_identifier) = match processor.event_key().as_str() {
    "ModelRegistered" | "EventRegistered" => {
        let mut hasher = DefaultHasher::new();
        event.keys.iter().for_each(|k| k.hash(&mut hasher));
        let hash = hasher.finish();
        (0usize, hash) // Priority 0 (highest) for model/event registration
    }
    "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => {
        let mut hasher = DefaultHasher::new();
        event.keys[1].hash(&mut hasher);
        event.keys[2].hash(&mut hasher);
        let hash = hasher.finish();
        (2usize, hash) // Priority 2 (lower) for store operations
    }
    _ => (0, 0), // No parallelization for other events
🧰 Tools
🪛 GitHub Actions: ci

[warning] 900-900: Missing comma in match expression

@Larkooo changed the title from "feat(torii-indexer): natural priority in parallelized tasks for model…" to "feat(torii-indexer): parallelize models & event messages" on Jan 15, 2025

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (2)
crates/torii/indexer/src/engine.rs (2)

605-668: Ohayo, sensei! Consider enhancing error handling in task processing.

While the priority-based processing logic is solid, the error handling could be more robust. Currently, errors in individual event processing are logged but don't affect other events or tasks.

Consider implementing a more sophisticated error handling strategy:

 for (contract_type, event) in events {
     let contract_processors = processors.get_event_processor(contract_type);
     if let Some(processors) = contract_processors.get(&event.event.keys[0]) {
-        let processor = processors
-            .iter()
-            .find(|p| p.validate(&event.event))
-            .expect("Must find at least one processor for the event");
+        let processor = processors
+            .iter()
+            .find(|p| p.validate(&event.event))
+            .ok_or_else(|| anyhow::anyhow!("No valid processor found for event"))?;

         debug!(
             target: LOG_TARGET,
             event_name = processor.event_key(),
             task_id = %task_id,
             priority = %priority,
             "Processing parallelized event."
         );

-        if let Err(e) = processor
-            .process(
+        processor
+            .process(
                 &world,
                 &mut local_db,
                 event.block_number,
                 event.block_timestamp,
                 &event.event_id,
                 &event.event,
                 &event_processor_config,
             )
-            .await
-        {
-            error!(
-                target: LOG_TARGET,
-                event_name = processor.event_key(),
-                error = %e,
-                task_id = %task_id,
-                priority = %priority,
-                "Processing parallelized event."
-            );
-        }
+            .await
+            .map_err(|e| {
+                error!(
+                    target: LOG_TARGET,
+                    event_name = processor.event_key(),
+                    error = %e,
+                    task_id = %task_id,
+                    priority = %priority,
+                    "Processing parallelized event."
+                );
+                e
+            })?;
     }
 }
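
The `ok_or_else` plus `?` pattern in the diff above can be sketched in isolation. This is a minimal, std-only illustration: `Processor`, `is_store_event`, `find_processor` and `process_all` are hypothetical stand-ins, and a plain `String` error is used here in place of `anyhow::Error`.

```rust
/// Hypothetical stand-in for an event processor (not the real torii type).
struct Processor {
    name: &'static str,
    accepts: fn(&str) -> bool,
}

/// Hypothetical validator: accepts every event whose name starts with "Store".
fn is_store_event(event: &str) -> bool {
    event.starts_with("Store")
}

/// Returns the first processor that accepts the event, or an error instead of
/// panicking the way `.expect(...)` would.
fn find_processor<'a>(processors: &'a [Processor], event: &str) -> Result<&'a Processor, String> {
    processors
        .iter()
        .find(|p| (p.accepts)(event))
        .ok_or_else(|| format!("No valid processor found for event {event}"))
}

/// Processes events in order; the first failure is propagated to the caller
/// with `?`, so the remaining events of the task are not processed.
fn process_all(processors: &[Processor], events: &[&str]) -> Result<Vec<&'static str>, String> {
    let mut handled = Vec::new();
    for event in events {
        let processor = find_processor(processors, event)?;
        handled.push(processor.name);
    }
    Ok(handled)
}
```

Note the trade-off: propagating with `?` stops the task at the first failing event, whereas the current log-and-continue code keeps processing the rest.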

837-837: Ohayo, sensei! Enhance error context in raw event storage.

Consider adding more context to the error when storing raw events fails.

-self.db.store_event(event_id, event, transaction_hash, block_timestamp)?;
+self.db.store_event(event_id, event, transaction_hash, block_timestamp)
+    .with_context(|| format!("Failed to store raw event {}", event_id))?;
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f461cea and bbd1462.

📒 Files selected for processing (1)
  • crates/torii/indexer/src/engine.rs (8 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: fmt
🔇 Additional comments (2)
crates/torii/indexer/src/engine.rs (2)

Line range hint 211-223: Ohayo, sensei! Excellent choice of data structures for task prioritization.

The use of BTreeMap for the tasks field ensures natural ordering by priority, which is essential for sequential processing of priority levels. The type aliases improve code readability.
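
A small sketch of why the ordering comes for free, assuming a nested layout of priority, then task id, then events. The `TaskId` alias and the use of `String` as the event type are assumptions made for illustration only.

```rust
use std::collections::{BTreeMap, HashMap};

type TaskPriority = usize;
type TaskId = u64; // assumed: the u64 produced by `hasher.finish()`

/// Groups (priority, task id, event) triples into a priority-ordered map.
/// `String` stands in for the real event type.
fn group_tasks(
    items: Vec<(TaskPriority, TaskId, String)>,
) -> BTreeMap<TaskPriority, HashMap<TaskId, Vec<String>>> {
    let mut tasks: BTreeMap<TaskPriority, HashMap<TaskId, Vec<String>>> = BTreeMap::new();
    for (priority, task_id, event) in items {
        tasks.entry(priority).or_default().entry(task_id).or_default().push(event);
    }
    tasks
}

/// Returns the priorities in the order a `for` loop over the map visits them.
/// `BTreeMap` always iterates in ascending key order, regardless of insertion order.
fn visit_order(tasks: &BTreeMap<TaskPriority, HashMap<TaskId, Vec<String>>>) -> Vec<TaskPriority> {
    tasks.keys().copied().collect()
}
```

With a `HashMap` as the outer map, the visiting order would be unspecified, so priority 2 could run before priority 0.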


884-928: ⚠️ Potential issue

Ohayo, sensei! Guard against array access in event key handling.

When processing store operations, the code assumes event.keys[1] and event.keys[2] exist. This could lead to panics if the event keys array is shorter than expected.

Add bounds checking:

 "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => {
+    if event.keys.len() < 3 {
+        error!(
+            target: LOG_TARGET,
+            "Insufficient event keys for store operation. Expected at least 3 keys."
+        );
+        return Ok(());
+    }
     let mut hasher = DefaultHasher::new();
     event.keys[1].hash(&mut hasher);
     event.keys[2].hash(&mut hasher);
     let hash = hasher.finish();
     (2usize, hash) // Priority 2 (lower) for store operations
 }
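
An alternative to the explicit length check is `slice::get`, which returns `None` instead of panicking. A minimal sketch, assuming `u64` keys in place of the real `Felt` type; `store_task_id` is a hypothetical helper name.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hashes `keys[1]` and `keys[2]` into a task id, or returns `None` when the
/// slice is too short. `u64` stands in for the real key type.
fn store_task_id(keys: &[u64]) -> Option<u64> {
    let first = keys.get(1)?;
    let second = keys.get(2)?;
    let mut hasher = DefaultHasher::new();
    first.hash(&mut hasher);
    second.hash(&mut hasher);
    Some(hasher.finish())
}
```

The caller can then decide what a `None` means, for example falling back to the non-parallelized `(0, 0)` bucket.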

Likely invalid or redundant comment.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
crates/torii/indexer/src/engine.rs (1)

605-668: Ohayo, sensei! Consider enhancing task processing robustness.

While the implementation effectively manages task prioritization and concurrency, consider these improvements:

  1. Add timeout handling for task groups to prevent priority level deadlocks
  2. Implement more granular error handling for individual task failures
 for (priority, task_group) in std::mem::take(&mut self.tasks) {
+    let timeout = Duration::from_secs(30); // Configurable timeout
     let mut handles = Vec::new();
     
     // Process all tasks within this priority level concurrently
     for (task_id, events) in task_group {
         let db = self.db.clone();
         let world = self.world.clone();
         let semaphore = semaphore.clone();
         let processors = self.processors.clone();
         let event_processor_config = self.config.event_processor_config.clone();
 
         handles.push(tokio::spawn(async move {
             let _permit = semaphore.acquire().await?;
             let mut local_db = db.clone();
+            let mut task_errors = Vec::new();
 
             // Process all events for this task sequentially
             for (contract_type, event) in events {
                 let contract_processors = processors.get_event_processor(contract_type);
                 if let Some(processors) = contract_processors.get(&event.event.keys[0]) {
                     let processor = processors
                         .iter()
                         .find(|p| p.validate(&event.event))
                         .expect("Must find at least one processor for the event");
 
                     if let Err(e) = processor
                         .process(
                             &world,
                             &mut local_db,
                             event.block_number,
                             event.block_timestamp,
                             &event.event_id,
                             &event.event,
                             &event_processor_config,
                         )
                         .await
                     {
-                        error!(
-                            target: LOG_TARGET,
-                            event_name = processor.event_key(),
-                            error = %e,
-                            task_id = %task_id,
-                            priority = %priority,
-                            "Processing parallelized event."
-                        );
+                        task_errors.push((
+                            processor.event_key(),
+                            e,
+                            task_id,
+                            priority,
+                        ));
                     }
                 }
             }
+            if !task_errors.is_empty() {
+                error!(target: LOG_TARGET, "Task {task_id} completed with {} errors", task_errors.len());
+                for (event_name, error, task_id, priority) in task_errors {
+                    error!(target: LOG_TARGET, %event_name, %error, %task_id, %priority, "Event processing error");
+                }
+            }
             Ok::<_, anyhow::Error>(())
         }));
     }
 
     // Wait for all tasks in this priority level to complete before moving to next priority
-    try_join_all(handles).await?;
+    match tokio::time::timeout(timeout, try_join_all(handles)).await {
+        Ok(result) => result?,
+        Err(_) => {
+            error!(target: LOG_TARGET, "Priority level {priority} tasks timed out");
+            return Err(anyhow::anyhow!("Task processing timeout"));
+        }
+    }
 }
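
The control flow in this loop (priority levels in order, tasks within a level concurrently, a join before the next level) can be sketched without the async runtime. This version swaps in `std::thread::scope` for `tokio::spawn` and `try_join_all`, and omits the semaphore and the timeout; `run_by_priority`, the `TaskId` alias and the `String` event type are assumptions for illustration.

```rust
use std::collections::{BTreeMap, HashMap};
use std::sync::Mutex;
use std::thread;

type TaskPriority = usize;
type TaskId = u64; // assumed alias

/// Runs every priority level in ascending order. Tasks inside one level run on
/// their own threads; events inside one task run in order. Returns the
/// (priority, event) pairs in completion order.
fn run_by_priority(
    tasks: BTreeMap<TaskPriority, HashMap<TaskId, Vec<String>>>,
) -> Vec<(TaskPriority, String)> {
    let log = Mutex::new(Vec::new());
    for (priority, group) in tasks {
        // The scope joins all spawned threads before the next level starts.
        thread::scope(|s| {
            for (_task_id, events) in group {
                let log = &log;
                s.spawn(move || {
                    for event in events {
                        log.lock().unwrap().push((priority, event));
                    }
                });
            }
        });
    }
    log.into_inner().unwrap()
}
```

Because each level is fully joined before the next begins, a registration event at priority 0 is always applied before any store operation at priority 2.
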
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bbd1462 and 0a73aa6.

📒 Files selected for processing (1)
  • crates/torii/indexer/src/engine.rs (8 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: fmt
🔇 Additional comments (2)
crates/torii/indexer/src/engine.rs (2)

211-212: Ohayo, sensei! LGTM - Well-structured task prioritization system.

The type aliases and BTreeMap implementation provide a clear and ordered approach to task prioritization.

Also applies to: 223-223, 257-257


884-914: ⚠️ Potential issue

Ohayo, sensei! Guard against array access and hash collisions.

The task identifier generation has potential issues:

  1. Missing bounds check when accessing event.keys[1] and event.keys[2]
  2. Hash collisions could lead to task merging

Apply this diff to add necessary checks:

 "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => {
+    if event.keys.len() < 3 {
+        error!(target: LOG_TARGET, "Insufficient keys for store operation event");
+        return (0, 0); // Process immediately as fallback
+    }
     let mut hasher = DefaultHasher::new();
     event.keys[1].hash(&mut hasher);
     event.keys[2].hash(&mut hasher);
-    let hash = hasher.finish();
+    // Use more bits to reduce collision probability
+    let hash = hasher.finish() & 0x00FFFFFFFFFFFFFF;
     (2usize, hash) // Priority 2 (lower) for store operations
 }
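
One thing worth checking before applying the mask: a bitwise AND can only clear bits, so it narrows the hash rather than widening it. A quick sketch, assuming a 56-bit mask of the shape shown in the diff and `u64` inputs in place of the real key type; `full_hash` and `masked_hash` are hypothetical names.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Full 64-bit hash of two values, as in the unmasked code.
fn full_hash(a: u64, b: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    a.hash(&mut hasher);
    b.hash(&mut hasher);
    hasher.finish()
}

/// The same hash with the top 8 bits cleared, leaving 56 usable bits.
fn masked_hash(a: u64, b: u64) -> u64 {
    full_hash(a, b) & 0x00FF_FFFF_FFFF_FFFF
}
```

Under these assumptions the masked id space is smaller than the unmasked one, so collisions become slightly more likely, not less.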

Likely invalid or redundant comment.

Comment on lines +898 to 913
"EventEmitted" => {
    let mut hasher = DefaultHasher::new();

    let keys = Vec::<Felt>::cairo_deserialize(&event.data, 0).unwrap_or_else(|e| {
        panic!("Expected EventEmitted keys to be well formed: {:?}", e);
    });

    // selector
    event.keys[1].hash(&mut hasher);
    // entity id
    let entity_id = poseidon_hash_many(&keys);
    entity_id.hash(&mut hasher);

    let hash = hasher.finish();
    (2usize, hash) // Priority 2 for event messages
}

⚠️ Potential issue

Ohayo, sensei! Replace panic with proper error handling in EventEmitted processing.

The current implementation panics on deserialization failure, which is not production-safe.

Apply this diff to handle errors gracefully:

 "EventEmitted" => {
     let mut hasher = DefaultHasher::new();
 
-    let keys = Vec::<Felt>::cairo_deserialize(&event.data, 0).unwrap_or_else(|e| {
-        panic!("Expected EventEmitted keys to be well formed: {:?}", e);
-    });
+    let keys = match Vec::<Felt>::cairo_deserialize(&event.data, 0) {
+        Ok(keys) => keys,
+        Err(e) => {
+            error!(
+                target: LOG_TARGET,
+                error = ?e,
+                "Failed to deserialize EventEmitted keys. Processing immediately."
+            );
+            return (0, 0); // Process immediately as fallback
+        }
+    };
 
     // selector
     event.keys[1].hash(&mut hasher);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-"EventEmitted" => {
-    let mut hasher = DefaultHasher::new();
-    let keys = Vec::<Felt>::cairo_deserialize(&event.data, 0).unwrap_or_else(|e| {
-        panic!("Expected EventEmitted keys to be well formed: {:?}", e);
-    });
-    // selector
-    event.keys[1].hash(&mut hasher);
-    // entity id
-    let entity_id = poseidon_hash_many(&keys);
-    entity_id.hash(&mut hasher);
-    let hash = hasher.finish();
-    (2usize, hash) // Priority 2 for event messages
-}
+"EventEmitted" => {
+    let mut hasher = DefaultHasher::new();
+    let keys = match Vec::<Felt>::cairo_deserialize(&event.data, 0) {
+        Ok(keys) => keys,
+        Err(e) => {
+            error!(
+                target: LOG_TARGET,
+                error = ?e,
+                "Failed to deserialize EventEmitted keys. Processing immediately."
+            );
+            return (0, 0); // Process immediately as fallback
+        }
+    };
+    // selector
+    event.keys[1].hash(&mut hasher);
+    // entity id
+    let entity_id = poseidon_hash_many(&keys);
+    entity_id.hash(&mut hasher);
+    let hash = hasher.finish();
+    (2usize, hash) // Priority 2 for event messages
+}
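
The fallback idea can be tried out in a self-contained form. In this sketch `decode_keys` is a hypothetical length-prefixed decoder standing in for `cairo_deserialize`, `u64` stands in for `Felt`, and `event_task` is an assumed helper name; a malformed payload lands in the non-parallelized `(0, 0)` bucket instead of panicking.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical decoder: the first element is a length prefix, followed by
/// that many keys. Returns an error when the payload is empty or truncated.
fn decode_keys(data: &[u64]) -> Result<Vec<u64>, String> {
    let (&len, rest) = data.split_first().ok_or("empty event data")?;
    let len = len as usize;
    if rest.len() < len {
        return Err(format!("expected {len} keys, found {}", rest.len()));
    }
    Ok(rest[..len].to_vec())
}

/// Computes (priority, task id) for an event message.
fn event_task(selector: u64, data: &[u64]) -> (usize, u64) {
    let keys = match decode_keys(data) {
        Ok(keys) => keys,
        // Fall back to the sequential (0, 0) bucket instead of panicking.
        Err(_) => return (0, 0),
    };
    let mut hasher = DefaultHasher::new();
    selector.hash(&mut hasher);
    keys.hash(&mut hasher);
    (2, hasher.finish())
}
```

Whether a malformed event should be processed sequentially or skipped entirely is a design decision for the indexer; the sketch only shows that the panic is avoidable.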

codecov bot commented Jan 15, 2025

Codecov Report

Attention: Patch coverage is 66.11570% with 41 lines in your changes missing coverage. Please review.

Project coverage is 55.20%. Comparing base (a8af3d6) to head (0a73aa6).
Report is 1 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| crates/torii/indexer/src/engine.rs | 84.52% | 13 Missing ⚠️ |
| ...orii/indexer/src/processors/store_update_member.rs | 0.00% | 13 Missing ⚠️ |
| ...s/torii/indexer/src/processors/store_del_record.rs | 37.50% | 5 Missing ⚠️ |
| ...s/torii/indexer/src/processors/store_set_record.rs | 37.50% | 5 Missing ⚠️ |
| ...orii/indexer/src/processors/store_update_record.rs | 37.50% | 5 Missing ⚠️ |

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2912      +/-   ##
==========================================
- Coverage   55.22%   55.20%   -0.02%     
==========================================
  Files         449      449              
  Lines       58335    58395      +60     
==========================================
+ Hits        32215    32237      +22     
- Misses      26120    26158      +38     


@Larkooo Larkooo enabled auto-merge (squash) January 15, 2025 16:57
Collaborator

@glihm glihm left a comment

Great improvement.
A few suggestions below; if you'd rather not address them now, no problem, we can merge as is. 👍

@@ -207,6 +208,9 @@ pub struct ParallelizedEvent {
pub event: Event,
}

type TaskPriority = usize;
Collaborator

Wdyt about an enum that derives Ord, so the keys stay ordered in the BTreeMap?
An enum feels more idiomatic and would remove the magic values we currently have in the engine.

Collaborator

Something like:

enum TaskPriority {
    Low,
    Medium,
    High,
}
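
One detail to watch with this approach: derived `Ord` follows declaration order, and `BTreeMap` iterates in ascending key order, so with `Low` declared first the low-priority tasks would be visited first. A sketch that declares the highest priority first instead; the variant order and the `processing_order` helper are assumptions, not code from the PR.

```rust
use std::collections::BTreeMap;

/// Variants are declared highest priority first, because derived `Ord`
/// compares variants by declaration order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
enum TaskPriority {
    High,
    Medium,
    Low,
}

/// Returns the distinct priorities in the order a `BTreeMap` keyed by
/// `TaskPriority` would visit them.
fn processing_order(priorities: &[TaskPriority]) -> Vec<TaskPriority> {
    let mut tasks: BTreeMap<TaskPriority, Vec<usize>> = BTreeMap::new();
    for (index, priority) in priorities.iter().enumerate() {
        tasks.entry(*priority).or_default().push(index);
    }
    tasks.into_keys().collect()
}
```

An alternative that keeps the `Low, Medium, High` declaration order would be to iterate the map in reverse or wrap the key in `std::cmp::Reverse`.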

let mut hasher = DefaultHasher::new();
event.keys.iter().for_each(|k| k.hash(&mut hasher));
let hash = hasher.finish();
(0usize, hash) // Priority 0 (highest) for model/event registration
Collaborator

Suggested change
-(0usize, hash) // Priority 0 (highest) for model/event registration
+(TaskPriority::High, hash) // Priority 0 (highest) for model/event registration

 event.keys[2].hash(&mut hasher);
-hasher.finish()
+let hash = hasher.finish();
+(2usize, hash) // Priority 2 (lower) for store operations
Collaborator

Suggested change
-(2usize, hash) // Priority 2 (lower) for store operations
+(TaskPriority::Low, hash) // Priority 2 (lower) for store operations
