Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(gate): task timeout management #957

Closed
wants to merge 4 commits into from
Closed

Conversation

luckasRanarison
Copy link
Contributor

@luckasRanarison luckasRanarison commented Jan 9, 2025

  • Fix task timeout management logic which previously spawned an interval timer for each messenger that continuously run.

Migration notes


  • The change comes with new or modified tests
  • Hard-to-understand functions have explanatory comments
  • End-user documentation is updated to reflect the change

Summary by CodeRabbit

  • New Features

    • Enhanced task timeout management in messenger functionality.
    • Added support for tracking timeout identifiers during asynchronous operations.
  • Improvements

    • Streamlined timeout handling process.
    • Centralized task lifecycle management.
    • Improved clarity in task execution flow.
    • Reformatted type declarations for better clarity.

Copy link
Contributor

coderabbitai bot commented Jan 9, 2025

📝 Walkthrough

Walkthrough

The changes involve significant modifications to the AsyncMessenger class within the messaging pattern implementation. The initTimer method has been removed and replaced with a new private method, startTaskTimer, which centralizes task timeout management. The terminate method has been updated to clear task timeouts, and the execute method now manages a timeoutId for each task. Additionally, the TaskData interface has been enhanced with a new optional property, timeoutId, to support the updated timeout handling mechanism.

Changes

File Change Summary
src/typegate/src/runtimes/patterns/messenger/async_messenger.ts - Removed initTimer method
- Added startTaskTimer private method
- Modified terminate method to clear task timeouts
- Updated execute method to create and manage timeoutId
src/typegate/src/runtimes/patterns/messenger/types.ts - Reformatted Answer<T> type
- Added optional timeoutId: number to TaskData interface

Sequence Diagram

sequenceDiagram
    participant AsyncMessenger
    participant Task
    participant Broker

    AsyncMessenger->>Task: execute()
    AsyncMessenger->>Task: startTaskTimer()
    activate Task
    Task-->>AsyncMessenger: timeoutId
    AsyncMessenger-->>Task: monitor pulse
    alt Timeout occurs
        AsyncMessenger->>Task: receive()
    else Task completes
        Task->>AsyncMessenger: resolve promise
    end
    deactivate Task
Loading

The sequence diagram illustrates the new task execution and timeout management flow, showing how the startTaskTimer method monitors task lifecycle and handles potential timeouts.

Finishing Touches

  • 📝 Generate Docstrings

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (2)
src/typegate/src/runtimes/patterns/messenger/async_messenger.ts (2)

Line range hint 33-33: Remove unused timer property.

The #timer property appears to be unused after the refactoring. It can be safely removed.

- #timer?: ReturnType<typeof setInterval>;

110-114: Document resource destruction behavior.

The resource destruction behavior on timeout is significant and should be documented. Consider adding a JSDoc comment explaining when and why resources are destroyed.

+  /**
+   * Destroys and recreates the broker when a task times out if timer_destroy_resources is enabled.
+   * This helps prevent resource leaks and reset the broker to a clean state.
+   */
   if (this.config.timer_destroy_resources) {
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3d8dac2 and e63c1b1.

📒 Files selected for processing (2)
  • src/typegate/src/runtimes/patterns/messenger/async_messenger.ts (4 hunks)
  • src/typegate/src/runtimes/patterns/messenger/types.ts (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: lint-compat (macos-14, aarch64-apple-darwin, false)
  • GitHub Check: lint-compat (macos-13, x86_64-apple-darwin, false)
  • GitHub Check: bulid-docker (linux/amd64, custom-ubuntu-large)
  • GitHub Check: test-full
  • GitHub Check: pre-commit
🔇 Additional comments (3)
src/typegate/src/runtimes/patterns/messenger/types.ts (1)

14-15: LGTM! Clean formatting of union types.

The separation of union types into distinct lines improves code readability.

src/typegate/src/runtimes/patterns/messenger/async_messenger.ts (2)

23-23: LGTM! Clear type definition.

The config type clearly specifies the required timer-related configuration fields.


72-74: LGTM! Proper timeout initialization.

The timeout is now properly initialized and stored with the task data.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
src/typegate/src/runtimes/patterns/messenger/async_messenger.ts (2)

104-108: Consider sanitizing the error message.

The current implementation includes the entire task object in the error message, which might expose sensitive information. Consider creating a sanitized version with only necessary fields.

-const data = JSON.stringify(task, null, 2);
+const data = JSON.stringify({
+  id: task.id,
+  op: task.op,
+  remainingPulseCount: task.remainingPulseCount
+}, null, 2);

92-117: Add documentation for the recursive timeout strategy.

The recursive timeout implementation is non-trivial. Consider adding JSDoc comments explaining:

  • The purpose of remainingPulseCount
  • The recursive timeout mechanism
  • Resource cleanup behavior
+/**
+ * Manages task timeouts using a recursive strategy.
+ * @param task The message task to monitor
+ * @returns The timeout ID for the current check
+ *
+ * The method implements a pulse-based monitoring system where:
+ * - Each pulse decrements remainingPulseCount
+ * - Tasks with remaining pulses are rescheduled
+ * - Tasks with no remaining pulses trigger timeout handling
+ */
 private startTaskTimer(task: Message<M>) {
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e63c1b1 and 9c0c5ab.

📒 Files selected for processing (1)
  • src/typegate/src/runtimes/patterns/messenger/async_messenger.ts (4 hunks)
🧰 Additional context used
📓 Learnings (1)
src/typegate/src/runtimes/patterns/messenger/async_messenger.ts (1)
Learnt from: luckasRanarison
PR: metatypedev/metatype#957
File: src/typegate/src/runtimes/patterns/messenger/async_messenger.ts:92-117
Timestamp: 2025-01-09T07:12:37.705Z
Learning: In AsyncMessenger class, task IDs are incremented sequentially and tasks are removed upon completion, ensuring no timeout collisions. Previous timeouts don't need to be cleared as tasks with same IDs won't exist simultaneously due to the increment and cleanup mechanism.
⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: lint-compat (macos-14, aarch64-apple-darwin, false)
  • GitHub Check: lint-compat (macos-13, x86_64-apple-darwin, false)
  • GitHub Check: test-full
  • GitHub Check: bulid-docker (linux/amd64, custom-ubuntu-large)
  • GitHub Check: pre-commit
🔇 Additional comments (4)
src/typegate/src/runtimes/patterns/messenger/async_messenger.ts (4)

23-23: LGTM: Clear and well-defined configuration type.

The type definition correctly specifies the required configuration properties for timeout management.


52-57: LGTM: Proper cleanup of tasks and timeouts.

The implementation correctly handles task cleanup by clearing timeouts and resolving promises before termination.


72-74: LGTM: Clean timeout initialization and storage.

The implementation properly initializes and stores the timeout ID for each task, maintaining the task lifecycle.


110-114: Verify the resource cleanup strategy.

The current implementation destroys and recreates the broker on timeout. This might be too aggressive if multiple tasks are running concurrently.

Consider:

  1. Adding a grace period before broker reset
  2. Logging additional context about concurrent tasks
✅ Verification successful

The broker reset implementation is appropriate and safe

The current implementation is well-designed with proper task management:

  • Tasks are tracked independently of the broker state
  • The reset only occurs for timed-out tasks and is configurable
  • Concurrent tasks are properly managed through the task tracking system
  • The terminate() method ensures clean shutdown by waiting for all tasks

The aggressive reset strategy is intentional and safe as it only affects tasks that have already timed out, while maintaining the integrity of other running tasks.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for broker usage patterns to understand impact
rg -A 3 "this\.broker = this\.#start" .

Length of output: 795


Script:

#!/bin/bash
# Check broker implementation and task management
ast-grep --pattern 'class $_Messenger {
  $$$
  #start($_) {
    $$$
  }
  $$$
}'

echo "---"
# Look for task tracking or queue management
rg -B 2 -A 5 "task|queue|concurrent" src/typegate/src/runtimes/patterns/messenger/

Length of output: 6097

Natoandro
Natoandro previously approved these changes Jan 9, 2025
@luckasRanarison
Copy link
Contributor Author

Ouch, I missed something

michael-0acf4
michael-0acf4 previously approved these changes Jan 9, 2025
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/typegate/src/runtimes/patterns/messenger/async_messenger.ts (1)

96-125: Consider enhancing error handling and documentation

The implementation of task timeout management is solid, but could benefit from these improvements:

  1. Structure the error message as JSON for better parsing
  2. Add JSDoc comments explaining the recursive timeout strategy and pulse count mechanism
 private startTaskTimer(message: Message<M>) {
+  /**
+   * Manages task timeouts with support for pulse counting.
+   * When pulse count > 0, the timeout is reset and decremented,
+   * allowing long-running tasks to stay alive by "pulsing".
+   * @param message The task message containing ID and pulse count
+   * @returns Timeout ID for cleanup
+   */
   return setTimeout(() => {
     const task = this.#tasks.get(message.id);

     if (!task) return;

     if (
       message.remainingPulseCount !== undefined &&
       message.remainingPulseCount > 0
     ) {
       // check again next time if unterminated
       message.remainingPulseCount -= 1;
       task.timeoutId = this.startTaskTimer(message);
       return;
     }

     // default behavior or 0 pulse left
-    const data = JSON.stringify(message, null, 2);
     this.receive({
       id: message.id,
-      error: `${this.#timeoutSecs}s timeout exceeded: ${data}`,
+      error: JSON.stringify({
+        code: 'TASK_TIMEOUT',
+        message: `Task timeout after ${this.#timeoutSecs}s`,
+        details: message
+      })
     });

     if (this.config.timer_destroy_resources) {
       this.#stop(this.broker);
       logger.info("reset broker after timeout");
       this.broker = this.#start(this.receive.bind(this));
     }
   }, this.config.timer_max_timeout_ms);
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9c0c5ab and e54b954.

📒 Files selected for processing (1)
  • src/typegate/src/runtimes/patterns/messenger/async_messenger.ts (3 hunks)
🧰 Additional context used
📓 Learnings (2)
📓 Common learnings
Learnt from: luckasRanarison
PR: metatypedev/metatype#957
File: src/typegate/src/runtimes/patterns/messenger/async_messenger.ts:92-117
Timestamp: 2025-01-09T07:12:37.705Z
Learning: In AsyncMessenger class, task IDs are incremented sequentially and tasks are removed upon completion, ensuring no timeout collisions. Previous timeouts don't need to be cleared as tasks with same IDs won't exist simultaneously due to the increment and cleanup mechanism.
src/typegate/src/runtimes/patterns/messenger/async_messenger.ts (1)
Learnt from: luckasRanarison
PR: metatypedev/metatype#957
File: src/typegate/src/runtimes/patterns/messenger/async_messenger.ts:92-117
Timestamp: 2025-01-09T07:12:37.705Z
Learning: In AsyncMessenger class, task IDs are incremented sequentially and tasks are removed upon completion, ensuring no timeout collisions. Previous timeouts don't need to be cleared as tasks with same IDs won't exist simultaneously due to the increment and cleanup mechanism.
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: lint-compat (macos-14, aarch64-apple-darwin, false)
  • GitHub Check: lint-compat (macos-13, x86_64-apple-darwin, false)
  • GitHub Check: pre-commit
🔇 Additional comments (4)
src/typegate/src/runtimes/patterns/messenger/async_messenger.ts (4)

23-23: LGTM: Config type update

The addition of timer_destroy_resources to AsyncMessengerConfig type is well-defined and properly marked as readonly.


52-57: LGTM: Proper cleanup of task timeouts

The terminate method now correctly cleans up task-specific timeouts before resolving promises, ensuring no memory leaks.


82-84: LGTM: Proper cleanup in receive

The receive method correctly cleans up the task's timeout before processing the response, maintaining clean task lifecycle management.


72-74: LGTM: Task execution with timeout management

The execute method properly initializes and stores task-specific timeouts. The implementation aligns with the sequential task ID mechanism mentioned in the learnings, preventing timeout collisions.

Let's verify the task ID increment mechanism:

✅ Verification successful

✓ Verified: Task ID generation is correctly implemented

The implementation uses a private counter that generates sequential IDs and properly wraps around using modulo operation, ensuring unique task IDs for timeout management.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Verify that task IDs are properly incremented and wrapped
ast-grep --pattern 'nextId(): number {
  $$$
  this.#counter += 1;
  this.#counter %= $_
  $$$
}'

Length of output: 99


Script:

#!/bin/bash
# Search for ID generation methods with more flexible pattern
ast-grep --pattern 'private #counter'

# Find any method that might handle ID generation
ast-grep --pattern 'getId(): number {
  $$$
}'

# Backup search with ripgrep for counter-related code
rg "#counter" -A 3 -B 3

Length of output: 1932

clearTimeout(t.timeoutId);
return t.promise;
}),
);
logger.info(`close worker ${this.constructor.name}`);
await this.#stop(this.broker);
clearInterval(this.#timer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rm?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, you're right

Copy link

codecov bot commented Jan 9, 2025

Codecov Report

Attention: Patch coverage is 62.85714% with 13 lines in your changes missing coverage. Please review.

Project coverage is 77.69%. Comparing base (3d8dac2) to head (f030c3a).

Files with missing lines Patch % Lines
...src/runtimes/patterns/messenger/async_messenger.ts 62.85% 13 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #957      +/-   ##
==========================================
- Coverage   77.72%   77.69%   -0.04%     
==========================================
  Files         154      154              
  Lines       19154    19144      -10     
  Branches     1930     1928       -2     
==========================================
- Hits        14888    14873      -15     
- Misses       4243     4248       +5     
  Partials       23       23              

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
src/typegate/src/runtimes/patterns/messenger/async_messenger.ts (1)

94-108: Consider implementing backoff strategy for pulse retries

The current implementation recursively creates new timeouts without any backoff strategy, which could lead to memory pressure under certain conditions.

Consider implementing an exponential backoff strategy for pulse retries:

   if (
     message.remainingPulseCount !== undefined &&
     message.remainingPulseCount > 0
   ) {
     // check again next time if unterminated
     message.remainingPulseCount -= 1;
-    task.timeoutId = this.startTaskTimer(message);
+    const backoffFactor = Math.pow(2, message.initialPulseCount - message.remainingPulseCount);
+    const nextTimeout = Math.min(this.config.timer_max_timeout_ms * backoffFactor, MAX_TIMEOUT);
+    task.timeoutId = setTimeout(() => this.checkTaskTimeout(message), nextTimeout);
     return;
   }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e54b954 and f030c3a.

📒 Files selected for processing (1)
  • src/typegate/src/runtimes/patterns/messenger/async_messenger.ts (3 hunks)
🧰 Additional context used
📓 Learnings (2)
📓 Common learnings
Learnt from: luckasRanarison
PR: metatypedev/metatype#957
File: src/typegate/src/runtimes/patterns/messenger/async_messenger.ts:92-117
Timestamp: 2025-01-09T07:12:37.705Z
Learning: In AsyncMessenger class, task IDs are incremented sequentially and tasks are removed upon completion, ensuring no timeout collisions. Previous timeouts don't need to be cleared as tasks with same IDs won't exist simultaneously due to the increment and cleanup mechanism.
src/typegate/src/runtimes/patterns/messenger/async_messenger.ts (1)
Learnt from: luckasRanarison
PR: metatypedev/metatype#957
File: src/typegate/src/runtimes/patterns/messenger/async_messenger.ts:92-117
Timestamp: 2025-01-09T07:12:37.705Z
Learning: In AsyncMessenger class, task IDs are incremented sequentially and tasks are removed upon completion, ensuring no timeout collisions. Previous timeouts don't need to be cleared as tasks with same IDs won't exist simultaneously due to the increment and cleanup mechanism.
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: lint-compat (macos-14, aarch64-apple-darwin, false)
  • GitHub Check: bulid-docker (linux/amd64, custom-ubuntu-large)
  • GitHub Check: lint-compat (macos-13, x86_64-apple-darwin, false)
  • GitHub Check: test-full
🔇 Additional comments (4)
src/typegate/src/runtimes/patterns/messenger/async_messenger.ts (4)

51-56: LGTM: Improved resource cleanup

The changes properly clean up timeouts before resolving promises, preventing potential memory leaks.


80-82: LGTM: Proper timeout cleanup on task completion

The code correctly cleans up the timeout when a task completes, preventing unnecessary timer executions.


23-23: Breaking change: Update configuration type

The addition of timer_destroy_resources to TypegateConfigBase requires users to update their configuration. This should be documented in migration notes.


70-72: Verify task timeout race condition handling

While the timeout initialization looks good, there's a potential edge case where a task could complete right after startTaskTimer but before this.#tasks.set. Consider moving the timeout initialization after setting the task.

-    const timeoutId = this.startTaskTimer(message);
-    this.#tasks.set(id, { promise, hooks, timeoutId });
+    this.#tasks.set(id, { promise, hooks, timeoutId: null });
+    const timeoutId = this.startTaskTimer(message);
+    this.#tasks.get(id)!.timeoutId = timeoutId;

@luckasRanarison
Copy link
Contributor Author

Closing because of #954

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants