feat: Unify worker manager #954

Merged
merged 20 commits into from
Jan 11, 2025

Conversation

Natoandro
Contributor

@Natoandro Natoandro commented Jan 7, 2025

  • Solves MET-667
    • BaseWorkerManager
    • Use in Deno runtime
    • Use in Python runtime (followup PR)
    • Use in Rust runtime (followup PR)
    • Worker pooling (followup PR)

Migration notes


  • The change comes with new or modified tests
  • Hard-to-understand functions have explanatory comments
  • End-user documentation is updated to reflect the change

Summary by CodeRabbit

Based on the comprehensive summary, here are the updated release notes:

  • New Features

    • Enhanced worker management system with improved task tracking and execution.
    • Introduced new WorkerManager for more robust Deno runtime operations.
    • Added support for inline artifact generation and management.
    • New asynchronous method getInlineArtifact in the ArtifactStore class.
  • Improvements

    • Streamlined messaging and event handling across different runtime components.
    • Improved error reporting and task lifecycle management.
    • Refined type definitions for better type safety.
  • Breaking Changes

    • Removed DenoMessenger and LazyAsyncMessenger classes.
    • Restructured workflow event and message handling.
    • Updated task ID generation mechanism.
  • Performance

    • Optimized worker initialization and task execution.
    • Introduced more efficient task tracking and resource management.
  • Bug Fixes

    • Improved error handling in worker and runtime environments.
    • Enhanced message communication between workers and main thread.
    • Removed outdated test cases to focus on relevant functionality.


linear bot commented Jan 7, 2025

Contributor

coderabbitai bot commented Jan 7, 2025

📝 Walkthrough

This PR refactors worker management in the Deno and Substantial runtimes. It replaces the existing DenoMessenger and LazyAsyncMessenger classes with a new, more modular WorkerManager architecture. The refactoring introduces new type definitions, restructures message handling, and provides a worker management system with better task tracking, error handling, and lifecycle management.

Changes

| File | Change Summary |
| --- | --- |
| src/typegate/src/runtimes/deno/deno.ts | Replaced DenoMessenger with WorkerManager, updated constructor and method signatures, added exportInlineFunction method. |
| src/typegate/src/runtimes/deno/deno_messenger.ts | Deleted file containing DenoMessenger class. |
| src/typegate/src/runtimes/deno/types.ts | Added new type definitions: TaskSpec, DenoMessage, DenoEvent. |
| src/typegate/src/runtimes/deno/worker.ts | Restructured message handling, removed task registration functions, updated message handler to use DenoMessage. |
| src/typegate/src/runtimes/deno/worker_manager.ts | New WorkerManager class with enhanced worker management capabilities and a callFunction method. |
| src/typegate/src/runtimes/patterns/messenger/async_messenger.ts | Deleted file with AsyncMessenger implementation. |
| src/typegate/src/runtimes/patterns/messenger/lazy_async_messenger.ts | Deleted file with LazyAsyncMessenger implementation. |
| src/typegate/src/runtimes/patterns/messenger/types.ts | Deleted file with messaging-related type definitions. |
| src/typegate/src/runtimes/patterns/worker_manager/* | New modules for worker management with base classes and type definitions, including BaseWorker and BaseWorkerManager. |
| src/typegate/src/runtimes/substantial/* | Updated type definitions, event handling, and worker management, including changes in Agent and Workflow handling. |
| src/typegate/src/typegate/artifacts/mod.ts | Added getInlineArtifact method to ArtifactStore. |
| tests/runtimes/deno/deno_test.ts | Removed test case for global variable behavior, retained other functionality tests. |
| tests/runtimes/substantial/common.ts | Enhanced test template types and function signatures, added TestTemplateOptions type. |

Sequence Diagram

sequenceDiagram
    participant Client
    participant WorkerManager
    participant DenoWorker
    participant WorkerRuntime

    Client->>WorkerManager: callFunction(modulePath, functionName, args)
    WorkerManager->>DenoWorker: create worker
    DenoWorker->>WorkerRuntime: send message with function details
    WorkerRuntime->>DenoWorker: execute function
    alt Function Successful
        DenoWorker->>WorkerManager: return result (SUCCESS)
    else Function Failed
        DenoWorker->>WorkerManager: return error (FAIL/ERROR)
    end
    WorkerManager->>Client: resolve/reject promise

This sequence diagram illustrates the new workflow for executing functions in the Deno runtime using the WorkerManager. The process involves creating a worker, sending function details, executing the function, and handling the result or error appropriately.
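The flow in the diagram can be sketched in a few lines. This is a minimal illustration and not the code from this PR: `FakeWorker`, `SketchWorkerManager`, `SketchMessage`, and `SketchEvent` are hypothetical names, and the worker is an in-memory stand-in that runs synchronously instead of a real Deno worker.

```typescript
// Hypothetical shapes, loosely modeled on the DenoMessage / DenoEvent names in this PR.
type SketchMessage = { type: "CALL"; functionName: string; args: unknown[] };
type SketchEvent =
  | { type: "SUCCESS"; result: unknown }
  | { type: "FAILURE"; error: string };
type SketchFn = (...args: unknown[]) => unknown;

// In-memory stand-in for a Deno worker: it runs the function synchronously
// and reports the outcome to the registered listener.
class FakeWorker {
  private listener: (event: SketchEvent) => void = () => {};
  private fns: Record<string, SketchFn>;

  constructor(fns: Record<string, SketchFn>) {
    this.fns = fns;
  }

  listen(listener: (event: SketchEvent) => void): void {
    this.listener = listener;
  }

  send(msg: SketchMessage): void {
    const fn = this.fns[msg.functionName];
    if (fn === undefined) {
      this.listener({ type: "FAILURE", error: `Unknown function: ${msg.functionName}` });
      return;
    }
    let event: SketchEvent;
    try {
      event = { type: "SUCCESS", result: fn(...msg.args) };
    } catch (err) {
      event = { type: "FAILURE", error: String(err) };
    }
    this.listener(event);
  }
}

// One worker per call: created on demand, dropped once it reports an event.
class SketchWorkerManager {
  private active = new Map<string, FakeWorker>();
  private counter = 0;
  private fns: Record<string, SketchFn>;

  constructor(fns: Record<string, SketchFn>) {
    this.fns = fns;
  }

  callFunction(functionName: string, args: unknown[]): Promise<unknown> {
    const taskId = `${functionName}-${this.counter++}`;
    const worker = new FakeWorker(this.fns);
    this.active.set(taskId, worker);
    return new Promise((resolve, reject) => {
      worker.listen((event) => {
        this.active.delete(taskId); // no worker reuse in this sketch
        if (event.type === "SUCCESS") resolve(event.result);
        else reject(new Error(event.error));
      });
      worker.send({ type: "CALL", functionName, args });
    });
  }

  activeCount(): number {
    return this.active.size;
  }
}
```

A caller would await `callFunction("add", [1, 2])` and receive either the result or a rejected promise, which mirrors the SUCCESS and FAIL/ERROR branches of the diagram.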


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6c00c14 and 8f2fc68.

📒 Files selected for processing (1)
  • tests/runtimes/deno/deno_sync_test.ts (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: lint-compat (macos-14, aarch64-apple-darwin, false)
  • GitHub Check: lint-compat (macos-13, x86_64-apple-darwin, false)
  • GitHub Check: test-full
  • GitHub Check: bulid-docker (linux/amd64, custom-ubuntu-large)
  • GitHub Check: pre-commit
🔇 Additional comments (1)
tests/runtimes/deno/deno_sync_test.ts (1)

70-91: Document the rationale for removing the global variables test

The removal of this test case appears to be related to changes in worker management behavior ("no worker reuse" comment). With the transition to a unified WorkerManager architecture mentioned in the PR objectives, the behavior of global variables might have changed fundamentally.

Please clarify:

  1. How does the new WorkerManager handle global state persistence?
  2. Should we add new tests that align with the expected behavior of global variables in the unified architecture?

Let's verify the changes to global variable handling:

✅ Verification successful

Test removal is correct due to architectural changes

The test was removed because it no longer reflects the current architecture. The codebase has transitioned to using WorkerManager which handles workers differently, and global state persistence between calls is not guaranteed in this new design.

  • The commented test was testing behavior that's no longer applicable with the unified WorkerManager architecture
  • Worker management is now more systematic with proper lifecycle management
  • Content hashing and worker pooling suggest intentional isolation between calls
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for other tests or implementations related to global variable handling
rg -l "global|state persistence" --type ts

# Look for any documentation about global variable behavior
fd -e md -e txt -x grep -l "global variable"

Length of output: 1116


Script:

#!/bin/bash
# Check Deno runtime implementation
ast-grep --pattern 'class $_ {
  $$$
  worker$_($$$) {
    $$$
  }
  $$$
}' src/typegate/src/runtimes/deno/deno.ts

# Look for similar test patterns
ast-grep --pattern 'should("$_$_ global $_", async () => {
  $$$
})' tests/runtimes/deno/deno_test.ts

# Check for worker management related code
rg "WorkerManager|worker" src/typegate/src/runtimes/deno/deno.ts -A 2

Length of output: 1522



codecov bot commented Jan 7, 2025

Codecov Report

Attention: Patch coverage is 82.05689% with 82 lines in your changes missing coverage. Please review.

Project coverage is 77.76%. Comparing base (3d8dac2) to head (8f2fc68).
Report is 1 commits behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| ...pegate/src/runtimes/patterns/worker_manager/mod.ts | 71.42% | 34 Missing ⚠️ |
| src/typegate/src/runtimes/substantial/agent.ts | 71.21% | 19 Missing ⚠️ |
| src/typegate/src/runtimes/deno/deno.ts | 82.75% | 10 Missing ⚠️ |
| ...egate/src/runtimes/patterns/worker_manager/deno.ts | 82.00% | 9 Missing ⚠️ |
| src/typegate/src/typegate/artifacts/mod.ts | 68.42% | 6 Missing ⚠️ |
| ...rc/runtimes/substantial/workflow_worker_manager.ts | 90.90% | 3 Missing ⚠️ |
| src/typegate/src/runtimes/deno/worker_manager.ts | 98.36% | 1 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #954      +/-   ##
==========================================
+ Coverage   77.72%   77.76%   +0.04%     
==========================================
  Files         154      154              
  Lines       19154    19019     -135     
  Branches     1930     1918      -12     
==========================================
- Hits        14888    14791      -97     
+ Misses       4243     4205      -38     
  Partials       23       23              


Contributor

@luckasRanarison luckasRanarison left a comment

So I can close my PR?

@Natoandro
Contributor Author

So I can close my PR?

Sure 👉

@Natoandro Natoandro marked this pull request as ready for review January 9, 2025 18:30
@Natoandro
Contributor Author

@coderabbitai review

Contributor

coderabbitai bot commented Jan 9, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 8

🧹 Nitpick comments (21)
src/typegate/src/runtimes/deno/worker.ts (1)

47-49: Handle unknown message types by sending a FAILURE message

Instead of throwing an error for unknown message types, consider sending a FAILURE message back to the main thread to gracefully handle unexpected input.

Apply this diff to handle unknown message types:

       default:
-        // unreachable
-        throw new Error(`Unknown message type: ${type}`);
+        self.postMessage({
+          type: "FAILURE",
+          error: `Unknown message type: ${type}`,
+        });
+        return;
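As a rough sketch of the suggestion above, the handler below reports unknown message types through the same channel as ordinary results instead of throwing. `IncomingMessage`, `OutgoingEvent`, `handleMessage`, and the `post` callback are hypothetical names; in a real worker, `post` would be `self.postMessage`.

```typescript
// Hypothetical message and event shapes for illustration only.
type IncomingMessage = { type: string; payload?: unknown };
type OutgoingEvent =
  | { type: "SUCCESS"; result: unknown }
  | { type: "FAILURE"; error: string };

// `post` stands in for self.postMessage so the handler can run outside a worker.
function handleMessage(
  message: IncomingMessage,
  post: (event: OutgoingEvent) => void,
): void {
  switch (message.type) {
    case "CALL":
      // A real worker would import the module and run the requested function here.
      post({ type: "SUCCESS", result: message.payload });
      return;
    default:
      // Report the problem to the main thread instead of throwing inside the worker.
      post({ type: "FAILURE", error: `Unknown message type: ${message.type}` });
      return;
  }
}
```

With this shape the main thread sees one kind of failure event for both function errors and protocol errors, at the cost of no longer surfacing protocol bugs through the worker's `onerror` handler.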
src/typegate/src/runtimes/patterns/worker_manager/deno.ts (1)

46-47: Make the 'onerror' handler asynchronous

The onerror handler should be marked as async to maintain consistency with the onmessage handler and ensure that any asynchronous operations within handlerFn are properly awaited.

Apply this diff:

-    this.#worker.onerror = /*async*/ (event) =>
+    this.#worker.onerror = async (event) =>
src/typegate/src/runtimes/deno/worker_manager.ts (1)

64-64: Wrap error messages in Error objects when rejecting Promises

To ensure consistent error handling, wrap error messages in Error objects when rejecting Promises. This allows downstream code to handle errors effectively using standard error handling mechanisms.

Apply this diff:

             case "FAILURE":
-              reject(event.exception ?? event.error);
+              if (event.exception) {
+                reject(event.exception);
+              } else {
+                reject(new Error(event.error));
+              }
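The same normalization can be pulled into a small helper. This is a sketch under the assumption that a failure event carries an `error` string and an optional `exception`, as in the DenoEvent type discussed in this review; `FailureEvent` and `toRejectionReason` are hypothetical names.

```typescript
// Assumed shape of a failure event; the real DenoEvent type may differ.
type FailureEvent = { type: "FAILURE"; error: string; exception?: Error };

// Always produce an Error so callers can rely on `.message` and `.stack`.
function toRejectionReason(event: FailureEvent): Error {
  return event.exception ?? new Error(event.error);
}
```

The manager would then call `reject(toRejectionReason(event))`, so downstream `catch` blocks never receive a bare string.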
src/typegate/src/runtimes/substantial/workflow_worker_manager.ts (1)

88-99: Avoid duplication of modulePath in worker creation and message data

In triggerStart(), you're passing modulePath both when creating the worker and in the message data sent to the worker. If the worker can access modulePath from its task specification, consider removing the duplication to simplify the code and reduce potential inconsistencies.

Apply this diff to eliminate the duplication:

  this.createWorker(name, runId, {
    modulePath: workflowModPath,
  });
  this.sendMessage(runId, {
    type: "START",
    data: {
-     modulePath: workflowModPath,
      functionName: name,
      run: storedRun,
      schedule,
      internal: internalTCtx,
    },
  });
src/typegate/src/runtimes/patterns/worker_manager/mod.ts (1)

74-74: Address the TODO comment in createWorker method

There's a TODO comment // TODO inline in the createWorker() method. To ensure code clarity and maintainability, please address this TODO by either implementing the intended functionality or removing the comment if it's no longer relevant.

Do you want me to help implement the required functionality or open a new GitHub issue to track this task?

src/typegate/src/runtimes/substantial/agent.ts (11)

49-49: Trailing Comma in Constructor

There's an unnecessary trailing comma in the constructor parameters after config.

Apply this diff to remove the trailing comma:

 constructor(
   private backend: Backend,
   private queue: string,
-  private config: AgentConfig,
+  private config: AgentConfig
 ) {

101-105: Concatenate Workflow Names Properly

The logging statement in the start method uses unnecessary concatenation. The template literal can be simplified.

Apply this diff to simplify the logging statement:

     this.logger.warn(
-        `Initializing agent to handle ${
-          workflows
-            .map(({ name }) => name)
-            .join(", ")
-        }`,
+        `Initializing agent to handle ${workflows.map(({ name }) => name).join(", ")}`
     );

144-144: Comparison Using Strict Equality

In the filter function, use strict equality (===) for string comparison to ensure type safety.

Apply this diff to update the comparison:

       const requests = replayRequests.filter(
-            ({ run_id }) => getTaskNameFromId(run_id) == workflow.name,
+            ({ run_id }) => getTaskNameFromId(run_id) === workflow.name,
       );

201-201: Typographical Error in Log Message

There's a minor grammatical error in the log message: "skip triggering" should be "Skipping triggering" for clarity.

Apply this diff to correct the log message:

         this.logger.warn(
-            `skip triggering ${next.run_id} for the current tick as it is still ongoing`,
+            `Skipping triggering ${next.run_id} for the current tick as it is still ongoing`,
         );

248-252: Improve Clarity in Log Message Formatting

The log message uses nested template literals and JSON.stringify, which can be simplified for better readability.

Apply this diff to simplify the log message:

           this.logger.warn(
-              `First item in the operation list is not a Start, got "${
-                JSON.stringify(
-                  first,
-                )
-              }" instead. Closing the underlying schedule.`,
+              `First item in the operation list is not a Start, got "${JSON.stringify(first)}" instead. Closing the underlying schedule.`
           );

289-292: Return Type Annotation for Event Handler

The method #eventResultHandlerFor should explicitly state its return type to ensure type safety and readability.

Apply this diff to add the return type annotation:

     #eventResultHandlerFor(
       workflowName: string,
       runId: string,
+    ): EventHandler<WorkflowEvent> {

330-330: Typographical Error in Log Message

The log message includes an extra quote at the end. Remove the unnecessary quote for correctness.

Apply this diff to correct the log message:

-    this.logger.debug(`Interrupt "${workflowName}": ${interrupt}"`);
+    this.logger.debug(`Interrupt "${workflowName}": ${interrupt}`);

334-334: Redundant Log Message

The log message repeats the interrupt information already logged previously. Consider removing or combining it to reduce verbosity.

Apply this diff to update the log message:

-    this.logger.info(`Persist records for "${workflowName}": ${interrupt}"`);
+    this.logger.info(`Persisting records for "${workflowName}": ${interrupt}`);

378-379: Remove Debugging Statements

The console.log({ event }); statement appears to be left over from debugging. Consider removing it or replacing it with a proper logging statement.

Apply this diff to remove the debugging statement:

     this.workerManager.destroyWorker(workflowName, runId);
-    console.log({ event });

Or, if logging is necessary, use the logger:

+    this.logger.debug(`Event: ${JSON.stringify(event)}`);

439-443: Use of Deprecated Logger Method

In the checkIfRunHasStopped function, logger.error is used to log a JSON string. Ensure that the logger method supports JSON objects or stringify the message properly.

Apply this diff to ensure proper logging:

           logger.error(
               `bad logs: ${
-                  JSON.stringify(
-                      run.operations.map(({ event }) => event.type),
-                  )
+                  run.operations.map(({ event }) => event.type).join(", ")
               }`,
           );

456-460: Duplicate Code Block

The block of code logging bad logs appears twice in the function. Consider refactoring to avoid duplication.

src/typegate/src/runtimes/patterns/worker_manager/types.ts (1)

6-8: BaseMessage Interface Should Be Extensible

The BaseMessage interface currently only includes a type property. Consider adding an index signature to allow for additional properties in extended interfaces.

Apply this diff to make BaseMessage extensible:

 export interface BaseMessage {
     type: string;
+    [key: string]: unknown;
 }
src/typegate/src/runtimes/deno/types.ts (1)

19-21: Optional exception Property in DenoEvent

In the DenoEvent type, the exception property is marked as Error | undefined. Consider using Error only, and when there's no exception, omit the property, to simplify the type definition.

Apply this diff to adjust the type:

 export type DenoEvent =
     | { type: "SUCCESS"; result: unknown }
-    | { type: "FAILURE"; error: string; exception: Error | undefined };
+    | { type: "FAILURE"; error: string; exception?: Error };
src/typegate/src/typegate/artifacts/mod.ts (1)

198-217: Consider adding input validation and file permissions.

While the implementation is solid, consider these improvements:

  1. Validate input parameters (empty strings, null values)
  2. Set appropriate file permissions for security
   async getInlineArtifact(
     tgName: string,
     code: string,
     ext: string,
     transform = (code: string) => code,
   ) {
+    if (!tgName?.trim() || !code?.trim() || !ext?.trim()) {
+      throw new Error('Invalid input: tgName, code, and ext must not be empty');
+    }
     const hash = await sha256(code);
     const path = resolve(
       this.persistence.dirs.cache,
       "inline",
       tgName,
       hash + ext,
     );
     if (await exists(path)) {
       return path;
     }
     await Deno.mkdir(dirname(path), { recursive: true });
-    await Deno.writeTextFile(path, transform(code));
+    await Deno.writeTextFile(path, transform(code), { mode: 0o644 });
     return path;
   }
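The caching behavior of `getInlineArtifact` (hash the code, derive a path, write only if missing) can be illustrated without touching the file system. This is a dependency-free sketch and not the method from the PR: a `Map` stands in for the cache directory, a small FNV-1a hash stands in for `sha256`, and `fnv1a`, `inlineArtifactPath`, and `getInlineArtifactSketch` are hypothetical names.

```typescript
// Non-cryptographic FNV-1a over UTF-16 code units; a stand-in for sha256 in this sketch.
function fnv1a(text: string): string {
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  return ("00000000" + (hash >>> 0).toString(16)).slice(-8);
}

// Content-addressed location: identical code always maps to the same path.
function inlineArtifactPath(
  cacheDir: string,
  tgName: string,
  code: string,
  ext: string,
): string {
  return [cacheDir, "inline", tgName, fnv1a(code) + ext].join("/");
}

// `files` plays the role of the cache directory (path -> file content).
function getInlineArtifactSketch(
  files: Map<string, string>,
  cacheDir: string,
  tgName: string,
  code: string,
  ext: string,
  transform: (code: string) => string = (c) => c,
): string {
  const path = inlineArtifactPath(cacheDir, tgName, code, ext);
  if (!files.has(path)) {
    // Only the first request for a given piece of code pays for the write.
    files.set(path, transform(code));
  }
  return path;
}
```

Because the path depends only on the untransformed code, repeated calls with the same source return the cached artifact, and any change to the source yields a new file rather than overwriting the old one.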
tests/runtimes/deno/deno_test.ts (1)

42-63: Improve documentation for removed test.

The comment "no worker reuse" could be more descriptive about why this functionality was removed and its implications.

-    // -- no worker reuse...
+    // Test removed: Worker reuse is no longer supported in the new unified worker management system.
+    // Each workflow execution now uses a fresh worker instance to ensure isolation.
tests/runtimes/substantial/common.ts (1)

Line range hint 626-667: Add comments explaining the filter expression logic.

While the filter implementation is correct and type-safe, the complex nested OR/AND conditions would benefit from documentation explaining:

  • The purpose of each condition
  • The expected matching behavior
  • Why these specific test cases were chosen
 filter: {
   or: [
     {
+      // Match completed runs containing "substantial"
       and: [
         { status: { contains: JSON.stringify("COMPL") } },
         { contains: JSON.stringify("substantial") },
       ],
     },
     {
+      // Match exact value "Bump typegraph v3 => v4" using double negation
       not: {
         not: { eq: JSON.stringify("Bump typegraph v3 => v4") },
       },
     },
   ],
 } satisfies Expr,
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3d8dac2 and 65e127e.

⛔ Files ignored due to path filters (1)
  • deno.lock is excluded by !**/*.lock
📒 Files selected for processing (19)
  • src/typegate/src/runtimes/deno/deno.ts (8 hunks)
  • src/typegate/src/runtimes/deno/deno_messenger.ts (0 hunks)
  • src/typegate/src/runtimes/deno/types.ts (1 hunks)
  • src/typegate/src/runtimes/deno/worker.ts (1 hunks)
  • src/typegate/src/runtimes/deno/worker_manager.ts (1 hunks)
  • src/typegate/src/runtimes/patterns/messenger/async_messenger.ts (0 hunks)
  • src/typegate/src/runtimes/patterns/messenger/lazy_async_messenger.ts (0 hunks)
  • src/typegate/src/runtimes/patterns/messenger/types.ts (0 hunks)
  • src/typegate/src/runtimes/patterns/worker_manager/deno.ts (1 hunks)
  • src/typegate/src/runtimes/patterns/worker_manager/mod.ts (1 hunks)
  • src/typegate/src/runtimes/patterns/worker_manager/types.ts (1 hunks)
  • src/typegate/src/runtimes/substantial.ts (2 hunks)
  • src/typegate/src/runtimes/substantial/agent.ts (15 hunks)
  • src/typegate/src/runtimes/substantial/types.ts (1 hunks)
  • src/typegate/src/runtimes/substantial/worker.ts (3 hunks)
  • src/typegate/src/runtimes/substantial/workflow_worker_manager.ts (3 hunks)
  • src/typegate/src/typegate/artifacts/mod.ts (1 hunks)
  • tests/runtimes/deno/deno_test.ts (1 hunks)
  • tests/runtimes/substantial/common.ts (7 hunks)
💤 Files with no reviewable changes (4)
  • src/typegate/src/runtimes/patterns/messenger/lazy_async_messenger.ts
  • src/typegate/src/runtimes/deno/deno_messenger.ts
  • src/typegate/src/runtimes/patterns/messenger/types.ts
  • src/typegate/src/runtimes/patterns/messenger/async_messenger.ts
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: lint-compat (macos-14, aarch64-apple-darwin, false)
  • GitHub Check: lint-compat (macos-13, x86_64-apple-darwin, false)
  • GitHub Check: test-full
  • GitHub Check: pre-commit
🔇 Additional comments (20)
src/typegate/src/runtimes/substantial/agent.ts (9)

267-267: Ensure TaskContext is Passed Correctly

When triggering the start in workerManager, ensure that taskContext is correctly initialized and passed to avoid runtime errors.


375-375: Consistent Naming: 'Graceful' Spelling Correction

The word "Gracefull" is misspelled. It should be "Graceful" for consistency and correctness.

[typo]

Apply this diff to correct the spelling:

-    async #workflowHandleGracefullCompletion(
+    async #workflowHandleGracefulCompletion(

Also, update any references to this method accordingly.


383-385: Sensitive Data in Logs

Logging the entire result object might expose sensitive data. Consider sanitizing the output or reducing the verbosity.

[security]

Apply this diff to modify the logging:

     this.logger.info(
-        `gracefull completion of "${runId}" (${event.type}): ${JSON.stringify(result)} started at "${startedAt}"`,
+        `Graceful completion of "${runId}" (${event.type}) started at "${startedAt}"`
     );

404-404: Typographical Error in Log Message

There's an extra quotation mark at the end of the log message. Remove it for correctness.

[typo]

Apply this diff to correct the log message:

     this.logger.info(
-        `Persist finalized records for "${workflowName}": ${result}" and closing everything..`,
+        `Persist finalized records for "${workflowName}": ${result} and closing everything..`,
     );

447-447: Typographical Error in Error Message

There's a typo in the error message: "occured" should be "occurred".

[typo]

Apply this diff to correct the typo:

           throw new Error(
-              `"${run.run_id}" has potentially corrupted logs, another run occured yet previous has not stopped`,
+              `"${run.run_id}" has potentially corrupted logs, another run occurred yet the previous has not stopped`,
           );

272-274: ⚠️ Potential issue

Event Handler Return Type Mismatch

In this.#eventResultHandlerFor, ensure that the returned function matches the expected EventHandler<WorkflowEvent> signature.

Apply this diff to ensure the function matches the expected signature:

       this.workerManager.listen(
           next.run_id,
-          this.#eventResultHandlerFor(workflowName, next.run_id),
+          this.#eventResultHandlerFor(workflow.name, next.run_id)
       );

[Note] Replace workflowName with workflow.name to use the correct variable.

Likely invalid or redundant comment.


68-68: ⚠️ Potential issue

String Template Syntax

The template string in the catch block might not interpolate variables correctly due to the usage of double quotes and backticks.

Apply this diff to fix the string interpolation:

       this.logger.warn(
-          `Failed writing log metadata for schedule "${schedule}" (${runId}), skipping it: ${err}`,
+          `Failed writing log metadata for schedule "${schedule}" (${runId}), skipping it: ${err}`
       );

Likely invalid or redundant comment.


13-14: Import Statement: Ensure Correct Import Path

The import path for getTaskNameFromId and EventHandler may need verification. Ensure that the module ../patterns/worker_manager/mod.ts exports these entities correctly, and the path is accurate.

Run the following script to verify the existence of the imported entities:

✅ Verification successful

Import Paths Verified: No Issues Found

The import statements are correct. Both getTaskNameFromId and EventHandler are properly exported from their respective modules, and the relative paths accurately resolve to these locations.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify that getTaskNameFromId and EventHandler are correctly exported.

# Search for the export statements in the codebase
rg 'export.*(getTaskNameFromId|EventHandler)' --type ts

Length of output: 277


17-19: Update Import References after Type Changes

The imports InterruptEvent, WorkflowCompletionEvent, and WorkflowEvent have been added, replacing previous types. Ensure that all references to the old types (Interrupt, Result, WorkerData, WorkflowResult) have been updated accordingly throughout the file.

Run the following script to check for any remaining references to the old types:

src/typegate/src/runtimes/patterns/worker_manager/types.ts (2)

4-4: Type Alias for Consistency

The TaskId type is defined as a string. Ensure that this type alias is used consistently throughout the codebase to represent task identifiers.


10-12: Generic EventHandler Type

The EventHandler type is properly defined as a generic function. Ensure that all event handlers comply with this signature for consistency.

src/typegate/src/runtimes/deno/types.ts (1)

6-9: Clarify TaskSpec Type Usage

The TaskSpec type includes modulePath and functionName. Ensure that paths and names used comply with the expected formats, and consider validating them to prevent errors during runtime.

src/typegate/src/runtimes/substantial/types.ts (3)

14-23: LGTM! Well-structured message type.

The WorkflowMessage type provides a clear contract for workflow execution with proper type discrimination and all necessary fields.


25-38: LGTM! Comprehensive completion event type.

The WorkflowCompletionEvent type effectively handles both success and failure cases with proper error context.


40-44: LGTM! Well-designed event hierarchy.

The event types (InterruptEvent and WorkflowEvent) form a clear and comprehensive event handling system.

Also applies to: 47-54

src/typegate/src/runtimes/substantial/worker.ts (2)

24-29: LGTM! Clear error handling for missing functions.

The error message structure is clear and type-safe with the satisfies keyword.


48-71: Address the "How??" comment and consider improving exception handling.

The exception handling logic is correct, but the "How??" comment suggests uncertainty about the implementation. Consider documenting why this approach was chosen, or whether there is a better way to handle non-Error exceptions.

-                // How??
+                // Note: We only pass Error instances to maintain structured error handling.
+                // Other types of exceptions are intentionally undefined to avoid serialization issues.
✅ Verification successful

The "How??" comment should be replaced with clear documentation about serialization safety.

The current exception handling approach is correct and follows established patterns in the codebase. The suggested comment explains the rationale:

-                // How??
+                // Note: We only pass Error instances to maintain structured error handling.
+                // Other types of exceptions are intentionally undefined to avoid serialization issues.
🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found


Script:

#!/bin/bash
# Find errorToString implementation
ast-grep --pattern 'function errorToString'

# Look for similar exception handling patterns
rg "instanceof Error" -A 2 -B 2

# Search for any comments or patterns related to error serialization
rg "serialization|serialize" --type ts

Length of output: 16754

src/typegate/src/runtimes/substantial.ts (1)

23-28: LGTM! Centralized task ID generation.

The change from Agent.nextId to createTaskId aligns well with the PR's objective of unifying worker management, providing a consistent approach to task ID generation across different runtimes.

Also applies to: 255-255

tests/runtimes/substantial/common.ts (2)

29-38: LGTM! Well-structured type definitions.

The new type definitions enhance type safety and reusability:

  • TestTemplateOptions provides a clean base interface
  • TestTemplateOptionsX adds flexible delay configurations using generics

45-46: LGTM! Consistent test template updates.

The addition of the only parameter with a default value of false across all test templates provides:

  • Selective test execution capability
  • Backward compatibility
  • Consistent interface across templates

Also applies to: 174-175, 350-351, 517-518, 676-677

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
src/typegate/src/runtimes/patterns/worker_manager/mod.ts (1)

123-127: ⚠️ Potential issue

Memory leak in destroyWorker persists

The startedAt record for the task is still not deleted after the worker is destroyed, which could lead to a memory leak.

Apply this diff to fix the memory leak:

  task.worker.destroy();
  this.#activeTasks.delete(taskId);
  this.#tasksByName.get(name)!.delete(taskId);
- // startedAt records are not deleted
+ this.#startedAt.delete(taskId);
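
To make the leak concrete, here is a minimal sketch of a manager that keeps the three maps in sync. The field names `#activeTasks`, `#tasksByName` and `#startedAt` mirror the diff above; the class name `DemoWorkerManager`, the `DemoWorker` interface, the `addWorker` signature and `trackedCount` are hypothetical and do not reflect the actual `BaseWorkerManager` API.

```typescript
type TaskId = string;

// Hypothetical minimal worker: only destroy() is needed for this sketch.
interface DemoWorker {
  destroy(): void;
}

class DemoWorkerManager {
  #activeTasks = new Map<TaskId, { worker: DemoWorker }>();
  #tasksByName = new Map<string, Set<TaskId>>();
  #startedAt = new Map<TaskId, Date>();

  addWorker(name: string, taskId: TaskId, worker: DemoWorker): void {
    if (!this.#tasksByName.has(name)) {
      this.#tasksByName.set(name, new Set());
    }
    this.#tasksByName.get(name)!.add(taskId);
    this.#activeTasks.set(taskId, { worker });
    this.#startedAt.set(taskId, new Date());
  }

  // Returns false when the task is unknown, true after a full cleanup.
  destroyWorker(name: string, taskId: TaskId): boolean {
    const task = this.#activeTasks.get(taskId);
    if (!task) return false;
    task.worker.destroy();
    this.#activeTasks.delete(taskId);
    this.#tasksByName.get(name)?.delete(taskId);
    // Without this line the map grows by one entry per task, forever.
    this.#startedAt.delete(taskId);
    return true;
  }

  // Number of entries still held for tasks (0 once everything is destroyed).
  trackedCount(): number {
    return this.#activeTasks.size + this.#startedAt.size;
  }
}
```

Every map written in `addWorker` has a matching delete in `destroyWorker`, which is the invariant the review comment asks for.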
🧹 Nitpick comments (3)
src/typegate/src/runtimes/deno/deno.ts (1)

263-267: Consider caching entryModulePath for performance optimization

The TODO comment at line 263 suggests caching entryModulePath. Caching would avoid repeated file system lookups each time the module path is resolved.

src/typegate/src/runtimes/patterns/worker_manager/mod.ts (2)

9-18: Well-designed abstract worker interface!

The BaseWorker abstract class provides a clean interface for worker implementations with proper type safety through generics. The abstract methods cover all essential operations: listening, sending messages, destroying workers, and ID access.

Consider adding:

  1. A status property to track worker lifecycle states
  2. Error handling methods for worker failures
  3. Health check mechanism for long-running workers
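
The first suggestion (a status property) could look roughly like the sketch below. The four abstract members follow the operations listed above (listen, send, destroy, id); the class names `SketchWorker` and `EchoWorker`, the `WorkerStatus` union and the `setStatus` helper are assumptions for illustration and are not part of the PR.

```typescript
type WorkerStatus = "idle" | "busy" | "destroyed";

// Hypothetical variant of the abstract worker with lifecycle tracking.
abstract class SketchWorker<M, E> {
  #status: WorkerStatus = "idle";

  abstract listen(handler: (event: E) => void): void;
  abstract send(message: M): void;
  abstract destroy(): void;
  abstract get id(): string;

  get status(): WorkerStatus {
    return this.#status;
  }

  // "destroyed" is terminal: later transitions are ignored.
  protected setStatus(next: WorkerStatus): void {
    if (this.#status === "destroyed") return;
    this.#status = next;
  }
}

// Toy in-process implementation that echoes each message back as an event.
class EchoWorker extends SketchWorker<string, string> {
  #id: string;
  #handler: ((event: string) => void) | undefined;

  constructor(id: string) {
    super();
    this.#id = id;
  }

  listen(handler: (event: string) => void): void {
    this.#handler = handler;
  }

  send(message: string): void {
    this.setStatus("busy");
    this.#handler?.(`echo:${message}`);
    this.setStatus("idle");
  }

  destroy(): void {
    this.setStatus("destroyed");
  }

  get id(): string {
    return this.#id;
  }
}
```

Keeping the state transition in the base class means a manager can inspect `status` without knowing which runtime backs the worker.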

61-69: Improve error handling in getInitialTimeStartedAt

The error message could be more helpful by including the available task IDs for debugging.

Apply this diff to enhance the error message:

  if (!startedAt) {
+   const availableTaskIds = Array.from(this.#startedAt.keys()).join(', ');
    throw new Error(
-     `Invalid state: cannot find initial time for task "${taskId}"`,
+     `Invalid state: cannot find initial time for task "${taskId}". Available tasks: [${availableTaskIds}]`,
    );
  }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 65e127e and 6c00c14.

📒 Files selected for processing (4)
  • src/typegate/src/runtimes/deno/deno.ts (8 hunks)
  • src/typegate/src/runtimes/deno/worker.ts (1 hunks)
  • src/typegate/src/runtimes/patterns/worker_manager/mod.ts (1 hunks)
  • src/typegate/src/runtimes/substantial/workflow_worker_manager.ts (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: lint-compat (macos-14, aarch64-apple-darwin, false)
  • GitHub Check: lint-compat (macos-13, x86_64-apple-darwin, false)
  • GitHub Check: bulid-docker (linux/amd64, custom-ubuntu-large)
  • GitHub Check: test-full
  • GitHub Check: pre-commit
🔇 Additional comments (8)
src/typegate/src/runtimes/deno/deno.ts (2)

158-158: Ensure proper resource cleanup by calling workerManager.deinit()

The deinit() method currently has await this.workerManager.deinit(); commented out. To prevent potential resource leaks and ensure proper cleanup of workers, consider uncommenting this line or implementing the necessary cleanup logic within the WorkerManager.


369-373: Review security implications when generating code in exportInlineFunction()

The exportInlineFunction() function appends code without sanitizing the input code. If code contains malicious content, it could introduce security vulnerabilities. Ensure that the code parameter is properly sanitized or originates from a trusted source to prevent code injection attacks.

src/typegate/src/runtimes/deno/worker.ts (1)

40-44: Avoid including raw exception object in failure message

Including the raw exception object e in the failure message may lead to serialization errors or unintended exposure of sensitive information. It's safer to include only the error message or a sanitized version of the error.
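
One possible way to do this is sketched below: reduce whatever was thrown to a plain object before posting it. The `FailurePayload` shape and the `toFailure` helper are hypothetical and are not the message format used by the Deno worker in this PR.

```typescript
// Hypothetical failure message containing only plain, serializable fields.
type FailurePayload = {
  type: "FAILURE";
  error: string;
  exception?: { name: string; message: string };
};

// Converts any thrown value into a payload that survives JSON or
// structured-clone serialization and omits the stack trace.
function toFailure(e: unknown): FailurePayload {
  if (e instanceof Error) {
    return {
      type: "FAILURE",
      error: e.message,
      exception: { name: e.name, message: e.message },
    };
  }
  // Non-Error throws (strings, numbers, objects) are reduced to a string.
  return { type: "FAILURE", error: String(e) };
}

console.log(toFailure(new TypeError("boom")));
console.log(toFailure("plain string"));
```

Dropping the stack is a deliberate trade-off in this sketch: it limits what leaks to the caller at the cost of less debugging detail, so the full error would still need to be logged on the worker side.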

src/typegate/src/runtimes/substantial/workflow_worker_manager.ts (1)

Line range hint 22-99: Refactoring enhances modularity and maintainability

Extending BaseWorkerManager and refactoring methods like destroyWorker, isOngoing, and listen improves code reuse and readability. The changes streamline worker management and align with best practices.

src/typegate/src/runtimes/patterns/worker_manager/mod.ts (4)

1-8: LGTM! Proper license header and imports.

The file includes the appropriate MPL-2.0 license header and necessary imports for logging and type definitions.


74-74: Remove or implement the TODO comment

The TODO comment "TODO inline" needs to be addressed. If inlining is necessary, it should be implemented; if not, the comment should be removed.

Would you like me to help implement the inlining of the addWorker call?


145-149: LGTM! Task ID creation with proper sanitization

The implementation now correctly sanitizes the name by replacing the delimiter pattern, addressing the previous concern about task ID parsing errors.


151-159: Verify "unreachable" comment and improve error handling

The comment states this is "unreachable", but we should verify this claim and improve error handling.

Let's verify this by checking all task ID creation points:

Consider adding validation in createTaskId to ensure this truly is unreachable:

export function createTaskId(name: string) {
+ if (!name) {
+   throw new Error("Task name cannot be empty");
+ }
  const uuid = crypto.randomUUID();
  const sanitizedName = name.replace(/_::_/g, "__");
  return `${sanitizedName}_::_${uuid}`;
}
✅ Verification successful

"unreachable" comment is correct, but error message could be improved

The comment is accurate as all TaskIds are created through createTaskId, which ensures proper format by sanitizing the name and always providing a UUID. However, consider updating the error message to:

throw new Error(`Internal error: malformed task ID "${taskId}". Please report this as a bug.`);
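
The round trip between the two helpers can be sketched as follows. `createTaskId` follows the snippet above; the body of `getTaskNameFromId` is an assumption, since its implementation is not shown in this review, and splitting on the last delimiter is a choice made for this sketch.

```typescript
const DELIMITER = "_::_";

// As in the review snippet: sanitize the name, then append a UUID.
function createTaskId(name: string): string {
  const uuid = crypto.randomUUID();
  const sanitizedName = name.replace(/_::_/g, "__");
  return `${sanitizedName}${DELIMITER}${uuid}`;
}

// Hypothetical parser: splits on the LAST delimiter, because a UUID never
// contains "_::_" while a sanitized name may still end in "_::".
function getTaskNameFromId(taskId: string): string {
  const index = taskId.lastIndexOf(DELIMITER);
  const name = index > 0 ? taskId.slice(0, index) : "";
  const uuid = index > 0 ? taskId.slice(index + DELIMITER.length) : "";
  if (!name || !uuid) {
    throw new Error(
      `Internal error: malformed task ID "${taskId}". Please report this as a bug.`,
    );
  }
  return name;
}

const id = createTaskId("my_::_task");
console.log(id, getTaskNameFromId(id));
```

Because every ID produced by `createTaskId` contains a non-empty name followed by the delimiter and a UUID, the error branch should only trigger for IDs built by hand.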
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for all createTaskId calls to verify task ID format
echo "Searching for createTaskId calls:"
rg "createTaskId\(" -A 2

# Search for direct TaskId assignments to catch any manual creation
echo "Searching for TaskId type usage:"
ast-grep --pattern 'const $var: TaskId = $value'

Length of output: 1025


Script:

#!/bin/bash
# Find TaskId type definition and any other potential creation points
echo "Searching for TaskId type definition:"
rg "type.*TaskId"

echo -e "\nSearching for string literals with _::_:"
rg '"[^"]*_::_[^"]*"'

echo -e "\nChecking if TaskId is exported/imported:"
rg "export.*TaskId|import.*TaskId"

Length of output: 1449

@michael-0acf4 michael-0acf4 left a comment

Nice work!

@Natoandro Natoandro merged commit 0f2c8fa into main Jan 11, 2025
12 of 13 checks passed
@Natoandro Natoandro deleted the met-667-worker-manager-2 branch January 11, 2025 03:11
Yohe-Am pushed a commit that referenced this pull request Jan 15, 2025

- Solves [MET-667](https://linear.app/metatypedev/issue/MET-667/gate-unify-the-worker-manager-between-workflows-and-runtime)
  - [x] `BaseWorkerManager`
  - [x] Use in Deno runtime
  - [ ] ~Use in Python runtime~ _(followup PR)_
  - [ ] ~Use in Rust runtime~ _(followup PR)_
  - [ ] ~Worker pooling~ _(followup PR)_

---

- [ ] The change comes with new or modified tests
- [ ] Hard-to-understand functions have explanatory comments
- [ ] End-user documentation is updated to reflect the change

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

Based on the comprehensive summary, here are the updated release notes:

- **New Features**
  - Enhanced worker management system with improved task tracking and execution.
  - Introduced new `WorkerManager` for more robust Deno runtime operations.
  - Added support for inline artifact generation and management.
  - New asynchronous method `getInlineArtifact` in the `ArtifactStore` class.

- **Improvements**
  - Streamlined messaging and event handling across different runtime components.
  - Improved error reporting and task lifecycle management.
  - Refined type definitions for better type safety.

- **Breaking Changes**
  - Removed `DenoMessenger` and `LazyAsyncMessenger` classes.
  - Restructured workflow event and message handling.
  - Updated task ID generation mechanism.

- **Performance**
  - Optimized worker initialization and task execution.
  - Introduced more efficient task tracking and resource management.

- **Bug Fixes**
  - Improved error handling in worker and runtime environments.
  - Enhanced message communication between workers and main thread.
  - Removed outdated test cases to focus on relevant functionality.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

4 participants