From 0f2c8fae17f4164b8c061a342aa2d05afe584567 Mon Sep 17 00:00:00 2001 From: Natoandro Date: Sat, 11 Jan 2025 06:11:21 +0300 Subject: [PATCH] feat: Unify worker manager (#954) - Solves [MET-667](https://linear.app/metatypedev/issue/MET-667/gate-unify-the-worker-manager-between-workflows-and-runtime) - [x] `BaseWorkerManager` - [x] Use in Deno runtime - [ ] ~Use in Python runtime~ _(followup PR)_ - [ ] ~Use in Rust runtime~ _(followup PR)_ - [ ] ~Worker pooling~ _(followup PR)_ #### Migration notes --- - [ ] The change comes with new or modified tests - [ ] Hard-to-understand functions have explanatory comments - [ ] End-user documentation is updated to reflect the change ## Summary by CodeRabbit ## Summary by CodeRabbit Based on the comprehensive summary, here are the updated release notes: - **New Features** - Enhanced worker management system with improved task tracking and execution. - Introduced new `WorkerManager` for more robust Deno runtime operations. - Added support for inline artifact generation and management. - New asynchronous method `getInlineArtifact` in the `ArtifactStore` class. - **Improvements** - Streamlined messaging and event handling across different runtime components. - Improved error reporting and task lifecycle management. - Refined type definitions for better type safety. - **Breaking Changes** - Removed `DenoMessenger` and `LazyAsyncMessenger` classes. - Restructured workflow event and message handling. - Updated task ID generation mechanism. - **Performance** - Optimized worker initialization and task execution. - Introduced more efficient task tracking and resource management. - **Bug Fixes** - Improved error handling in worker and runtime environments. - Enhanced message communication between workers and main thread. - Removed outdated test cases to focus on relevant functionality. --- deno.lock | 1 + src/typegate/src/runtimes/deno/deno.ts | 122 ++++++----- .../src/runtimes/deno/deno_messenger.ts | 74 ------- src/typegate/src/runtimes/deno/types.ts | 21 ++ src/typegate/src/runtimes/deno/worker.ts | 146 ++++--------- .../src/runtimes/deno/worker_manager.ts | 77 +++++++ .../patterns/messenger/async_messenger.ts | 148 ------------- .../messenger/lazy_async_messenger.ts | 114 ---------- .../src/runtimes/patterns/messenger/types.ts | 20 -- .../runtimes/patterns/worker_manager/deno.ts | 66 ++++++ .../runtimes/patterns/worker_manager/mod.ts | 159 ++++++++++++++ .../runtimes/patterns/worker_manager/types.ts | 12 + src/typegate/src/runtimes/substantial.ts | 9 +- .../src/runtimes/substantial/agent.ts | 178 +++++++-------- .../src/runtimes/substantial/types.ts | 67 +++--- .../src/runtimes/substantial/worker.ts | 75 ++++--- .../substantial/workflow_worker_manager.ts | 205 +++--------------- src/typegate/src/typegate/artifacts/mod.ts | 21 ++ tests/runtimes/deno/deno_sync_test.ts | 43 ++-- tests/runtimes/deno/deno_test.ts | 43 ++-- tests/runtimes/substantial/common.ts | 113 +++++----- 21 files changed, 770 insertions(+), 944 deletions(-) delete mode 100644 src/typegate/src/runtimes/deno/deno_messenger.ts create mode 100644 src/typegate/src/runtimes/deno/types.ts create mode 100644 src/typegate/src/runtimes/deno/worker_manager.ts delete mode 100644 src/typegate/src/runtimes/patterns/messenger/async_messenger.ts delete mode 100644 src/typegate/src/runtimes/patterns/messenger/lazy_async_messenger.ts delete mode 100644 src/typegate/src/runtimes/patterns/messenger/types.ts create mode 100644 src/typegate/src/runtimes/patterns/worker_manager/deno.ts create mode 100644 src/typegate/src/runtimes/patterns/worker_manager/mod.ts create mode 100644 src/typegate/src/runtimes/patterns/worker_manager/types.ts diff --git a/deno.lock b/deno.lock index a597a8792..f325caf56 100644 --- a/deno.lock +++ b/deno.lock @@ -8,6 +8,7 @@ "jsr:@std/assert@^0.221.0": "jsr:@std/assert@0.221.0", "jsr:@std/assert@^1.0.6": "jsr:@std/assert@1.0.9", "jsr:@std/assert@^1.0.9": "jsr:@std/assert@1.0.9", + "jsr:@std/async@1.0.9": "jsr:@std/async@1.0.9", "jsr:@std/async@^1.0.3": "jsr:@std/async@1.0.9", "jsr:@std/bytes@^0.221.0": "jsr:@std/bytes@0.221.0", "jsr:@std/bytes@^1.0.2": "jsr:@std/bytes@1.0.4", diff --git a/src/typegate/src/runtimes/deno/deno.ts b/src/typegate/src/runtimes/deno/deno.ts index c98ad0f50..6fde12200 100644 --- a/src/typegate/src/runtimes/deno/deno.ts +++ b/src/typegate/src/runtimes/deno/deno.ts @@ -18,7 +18,6 @@ import type { } from "../../typegraph/types.ts"; import * as ast from "graphql/ast"; import { InternalAuth } from "../../services/auth/protocols/internal.ts"; -import { DenoMessenger } from "./deno_messenger.ts"; import type { Task } from "./shared_types.ts"; import { path } from "compress/deps.ts"; import { globalConfig as config } from "../../config.ts"; @@ -27,6 +26,7 @@ import { PolicyResolverOutput } from "../../engine/planner/policies.ts"; import { getInjectionValues } from "../../engine/planner/injection_utils.ts"; import DynamicInjection from "../../engine/injection/dynamic.ts"; import { getLogger } from "../../log.ts"; +import { WorkerManager } from "./worker_manager.ts"; const logger = getLogger(import.meta); @@ -37,7 +37,8 @@ const predefinedFuncs: Record>> = { allow: () => "ALLOW" as PolicyResolverOutput, deny: () => "DENY" as PolicyResolverOutput, pass: () => "PASS" as PolicyResolverOutput, - internal_policy: ({ _: { context } }) => context.provider === "internal" ? "ALLOW" : "PASS" as PolicyResolverOutput, + internal_policy: ({ _: { context } }) => + context.provider === "internal" ? "ALLOW" : "PASS" as PolicyResolverOutput, }; export class DenoRuntime extends Runtime { @@ -46,8 +47,7 @@ export class DenoRuntime extends Runtime { uuid: string, private tg: TypeGraphDS, private typegate: Typegate, - private w: DenoMessenger, - private registry: Map, + private workerManager: WorkerManager, private secrets: Record, ) { super(typegraphName, uuid); @@ -138,28 +138,16 @@ export class DenoRuntime extends Runtime { } } - const w = new DenoMessenger( - name, - { - ...(args.permissions ?? {}), - read: [basePath], - } as Deno.PermissionOptionsObject, - false, - ops, - typegate.config.base, - ); - - if (Deno.env.get("DENO_TESTING") === "true") { - w.disableLazyness(); - } + const workerManager = new WorkerManager({ + timeout_ms: typegate.config.base.timer_max_timeout_ms, + }); const rt = new DenoRuntime( typegraphName, uuid, tg, typegate, - w, - registry, + workerManager, secrets, ); @@ -167,7 +155,7 @@ export class DenoRuntime extends Runtime { } async deinit(): Promise { - await this.w.terminate(); + // await this.workerManager.deinit(); } materialize( @@ -257,7 +245,10 @@ export class DenoRuntime extends Runtime { const modMat = this.tg.materializers[mat.data.mod as number]; const entryPoint = this.tg.meta.artifacts[modMat.data.entryPoint as string]; - const op = this.registry.get(entryPoint.hash)!; + const depMetas = (modMat.data.deps as string[]).map((dep) => + createArtifactMeta(this.typegraphName, this.tg.meta.artifacts[dep]) + ); + const moduleMeta = createArtifactMeta(this.typegraphName, entryPoint); return async ({ _: { @@ -269,33 +260,33 @@ export class DenoRuntime extends Runtime { }) => { const token = await InternalAuth.emit(this.typegate.cryptoKeys); - return await this.w.execute( - op, + // TODO cache?? + const entryModulePath = await this.typegate.artifactStore.getLocalPath( + moduleMeta, + depMetas, + ); + + return await this.workerManager.callFunction( + mat.data.name as string, + entryModulePath, + entryPoint.path, + args, { - type: "import_func", - args, - internals: { - parent, - context, - secrets, - effect: mat.effect.effect ?? null, - meta: { - url: `${url.protocol}//${url.host}/${this.typegraphName}`, - token, - }, - headers, + parent, + context, + secrets, + effect: mat.effect.effect ?? null, + meta: { + url: `${url.protocol}//${url.host}/${this.typegraphName}`, + token, }, - name: mat.data.name as string, - verbose, + headers, }, - [], - pulseCount, ); }; } if (mat.name === "function") { - const op = this.registry.get(mat.data.script as string)!; return async ({ _: { context, @@ -306,26 +297,29 @@ export class DenoRuntime extends Runtime { }) => { const token = await InternalAuth.emit(this.typegate.cryptoKeys); - return await this.w.execute( - op, + const modulePath = await this.typegate.artifactStore.getInlineArtifact( + this.typegraphName, + mat.data.script as string, + ".ts", + exportInlineFunction("inlineFunction"), + ); + + return await this.workerManager.callFunction( + "inlineFunction", + modulePath, + "tg", + args, { - type: "func", - args, - internals: { - parent, - context, - secrets, - effect: mat.effect.effect ?? null, - meta: { - url: `${url.protocol}//${url.host}/${this.typegraphName}`, - token, - }, - headers, + parent, + context, + secrets, + effect: mat.effect.effect ?? null, + meta: { + url: `${url.protocol}//${url.host}/${this.typegraphName}`, + token, }, - verbose, + headers, }, - [], - pulseCount, ); }; } @@ -365,6 +359,18 @@ export class DenoRuntime extends Runtime { } } +function exportInlineFunction(name = "fn", symbol = "_my_lambda") { + if (!name.match(/^[a-zA-Z_][a-zA-Z0-9_]*$/)) { + throw new Error(`Invalid identifier: ${name}`); + } + if (!symbol.match(/^[a-zA-Z_][a-zA-Z0-9_]*$/)) { + throw new Error(`Invalid identifier: ${symbol}`); + } + return (code: string) => { + return `${code}\nexport const ${name} = ${symbol};`; + }; +} + function getInjectionData(d: InjectionData) { if ("value" in d) { return d.value; diff --git a/src/typegate/src/runtimes/deno/deno_messenger.ts b/src/typegate/src/runtimes/deno/deno_messenger.ts deleted file mode 100644 index 27050f26e..000000000 --- a/src/typegate/src/runtimes/deno/deno_messenger.ts +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. -// SPDX-License-Identifier: MPL-2.0 - -import * as Sentry from "sentry"; -import { envSharedWithWorkers } from "../../config/shared.ts"; -import type { Task } from "./shared_types.ts"; -import { - type AsyncMessengerConfig, - LazyAsyncMessenger, -} from "../patterns/messenger/lazy_async_messenger.ts"; - -export class DenoMessenger extends LazyAsyncMessenger { - constructor( - name: string, - permissions: Deno.PermissionOptionsObject, - lazy: boolean, - ops: Map, - config: AsyncMessengerConfig, - ) { - super( - (receive) => { - const worker = new Worker(import.meta.resolve("./worker.ts"), { - type: "module", - deno: { - namespace: false, - permissions: { - // overrideable default permissions - hrtime: false, - net: true, - // on request permissions - read: false, // default read permission - ...permissions, - // non-overridable permissions (security between typegraphs) - run: false, - write: false, - ffi: false, - env: envSharedWithWorkers, // use secrets on the materializer instead - }, - }, - } as WorkerOptions); - worker.onmessage = async (event) => { - await receive(event.data); - }; - worker.onerror = (error) => { - console.error(error.message); - Sentry.captureException(error.message); - throw error; - }; - worker.onmessageerror = (error) => { - console.error(error); - Sentry.captureException(error); - throw error; - }; - - worker.postMessage({ - name, - }); - return worker; - }, - ops, - (broker, message) => { - broker.postMessage(message); - }, - (broker) => { - broker.terminate(); - }, - config, - ); - - if (lazy) { - this.enableLazyness(); - } - } -} diff --git a/src/typegate/src/runtimes/deno/types.ts b/src/typegate/src/runtimes/deno/types.ts new file mode 100644 index 000000000..ede8ec480 --- /dev/null +++ b/src/typegate/src/runtimes/deno/types.ts @@ -0,0 +1,21 @@ +// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. +// SPDX-License-Identifier: MPL-2.0 + +import { TaskContext } from "./shared_types.ts"; + +export type TaskSpec = { + modulePath: string; + functionName: string; +}; + +export type DenoMessage = { + type: "CALL"; + modulePath: string; + functionName: string; + args: unknown; + internals: TaskContext; +}; + +export type DenoEvent = + | { type: "SUCCESS"; result: unknown } + | { type: "FAILURE"; error: string; exception: Error | undefined }; diff --git a/src/typegate/src/runtimes/deno/worker.ts b/src/typegate/src/runtimes/deno/worker.ts index 424e29537..5ac3498d9 100644 --- a/src/typegate/src/runtimes/deno/worker.ts +++ b/src/typegate/src/runtimes/deno/worker.ts @@ -1,116 +1,54 @@ // Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. // SPDX-License-Identifier: MPL-2.0 -// bring the unstable WorkerOptions api into scope -/// - -import { getLogger } from "../../log.ts"; -import { make_internal } from "../../worker_utils.ts"; -import type { Answer, Message } from "../patterns/messenger/types.ts"; import { toFileUrl } from "@std/path/to-file-url"; - -import type { - FuncTask, - ImportFuncTask, - RegisterFuncTask, - RegisterImportFuncTask, - Task, - TaskExec, -} from "./shared_types.ts"; - -let logger = getLogger("worker"); - -let initData = null as unknown as { name: string }; - -type TaskModule = Record; -const registry: Map = new Map(); +import { DenoMessage } from "./types.ts"; +import { errorToString, make_internal } from "../../worker_utils.ts"; const isTest = Deno.env.get("DENO_TESTING") === "true"; + const additionalHeaders = isTest ? { connection: "close" } : { connection: "keep-alive" }; -async function import_func(op: number, task: ImportFuncTask) { - const { name, args, internals, verbose } = task; - - if (!registry.has(op)) { - throw new Error(`no module registered with id ${op}`); - } - - verbose && logger.info(`exec func "${name}" from module ${op}`); - const mod = registry.get(op)! as TaskModule; - if (name in mod && typeof mod[name] === "function") { - return await mod[name]( - args, - internals, - make_internal(internals, additionalHeaders), - ); - } - throw new Error(`"${name}" is not a valid method`); -} - -async function func(op: number, task: FuncTask) { - const { args, internals, verbose } = task; - - if (!registry.has(op)) { - throw new Error(`no function registered with id ${op}`); - } - - verbose && logger.info(`exec func "${op}"`); - const fn = registry.get(op)! as TaskExec; - return await fn(args, internals, make_internal(internals, additionalHeaders)); -} - -async function register_import_func(_: null, task: RegisterImportFuncTask) { - const { modulePath, verbose, op } = task; - verbose && - logger.info(`register import func "${op}" from "${modulePath.toString()}`); - - registry.set(op, await import(toFileUrl(modulePath).toString())); -} - -function register_func(_: null, task: RegisterFuncTask) { - const { fnCode, verbose, op } = task; - verbose && logger.info(`register func "${op}"`); - - registry.set( - op, - new Function(`"use strict"; ${fnCode}; return _my_lambda;`)(), - ); -} - -const taskList: any = { - register_func, - register_import_func, - import_func, - func, -}; - -function answer(res: Answer) { - self.postMessage(res); -} - -self.onmessage = async (event: MessageEvent>) => { - if (initData == null) { - initData = event.data as typeof initData; - logger = getLogger(`worker (${initData.name})`); - return; - } - - const { id, op, data: task } = event.data; - const exec = taskList[task.type]; - - if (exec == null) { - const error = `unsupported operation found "${op}"`; - logger.error(error); - answer({ id, error }); - } - - try { - const data = await exec(op, task); - answer({ id, data }); - } catch (err) { - logger.error(err); - answer({ id, error: String(err) }); +self.onmessage = async function (event: MessageEvent) { + const { type, modulePath, functionName, args, internals } = event.data; + switch (type) { + case "CALL": { + const module = await import(toFileUrl(modulePath).toString()); + const fn = module[functionName]; + + if (typeof fn !== "function") { + postMessage({ + type: "FAILURE", + error: `Function "${functionName}" not found`, + }); + return; + } + + try { + const result = await fn( + args, + internals, + make_internal(internals, additionalHeaders), + ); + self.postMessage({ + type: "SUCCESS", + result, + }); + } catch (e) { + self.postMessage({ + type: "FAILURE", + error: errorToString(e), + exception: e, + }); + } + + break; + } + + default: + // unreachable + throw new Error(`Unknown message type: ${type}`); } }; diff --git a/src/typegate/src/runtimes/deno/worker_manager.ts b/src/typegate/src/runtimes/deno/worker_manager.ts new file mode 100644 index 000000000..7b16fca1c --- /dev/null +++ b/src/typegate/src/runtimes/deno/worker_manager.ts @@ -0,0 +1,77 @@ +// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. +// SPDX-License-Identifier: MPL-2.0 + +import { getLogger } from "../../log.ts"; +import { DenoWorker } from "../patterns/worker_manager/deno.ts"; +import { + BaseWorkerManager, + createTaskId, +} from "../patterns/worker_manager/mod.ts"; +import { TaskId } from "../patterns/worker_manager/types.ts"; +import { TaskContext } from "./shared_types.ts"; +import { DenoEvent, DenoMessage, TaskSpec } from "./types.ts"; + +const logger = getLogger(import.meta, "WARN"); + +export type WorkerManagerConfig = { + timeout_ms: number; +}; + +export class WorkerManager + extends BaseWorkerManager { + constructor(private config: WorkerManagerConfig) { + super( + (taskId: TaskId) => { + return new DenoWorker(taskId, import.meta.resolve("./worker.ts")); + }, + ); + } + + callFunction( + name: string, + modulePath: string, + relativeModulePath: string, + args: unknown, + internalTCtx: TaskContext, + ) { + const taskId = createTaskId(`${name}@${relativeModulePath}`); + this.createWorker(name, taskId, { + modulePath, + functionName: name, + }); + this.sendMessage(taskId, { + type: "CALL", + modulePath, + functionName: name, + args, + internals: internalTCtx, + }); + + return new Promise((resolve, reject) => { + const timeoutId = setTimeout(() => { + this.destroyWorker(name, taskId); + reject(new Error(`${this.config.timeout_ms}ms timeout exceeded`)); + }, this.config.timeout_ms); + + const handler: (event: DenoEvent) => void = (event) => { + clearTimeout(timeoutId); + this.destroyWorker(name, taskId); + switch (event.type) { + case "SUCCESS": + resolve(event.result); + break; + case "FAILURE": + reject(event.exception ?? event.error); + break; + } + }; + + const { worker } = this.getTask(taskId); + worker.listen(handler); + }); + } + + override logMessage(taskId: TaskId, msg: DenoMessage) { + logger.info(`Task "${taskId}" received message: ${msg.type}`); + } +} diff --git a/src/typegate/src/runtimes/patterns/messenger/async_messenger.ts b/src/typegate/src/runtimes/patterns/messenger/async_messenger.ts deleted file mode 100644 index d77a4c360..000000000 --- a/src/typegate/src/runtimes/patterns/messenger/async_messenger.ts +++ /dev/null @@ -1,148 +0,0 @@ -// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. -// SPDX-License-Identifier: MPL-2.0 - -import { getLogger } from "../../../log.ts"; -import type { Answer, Message, TaskData } from "./types.ts"; -import { maxi32 } from "../../../utils.ts"; -import type { TypegateConfigBase } from "../../../config/types.ts"; - -const logger = getLogger(import.meta); - -export type MessengerStart = ( - receive: (answer: Answer) => Promise, -) => Broker; - -export type MessengerSend = ( - broker: Broker, - data: Message, -) => Promise | void; - -export type MessengerStop = (broker: Broker) => Promise | void; - -export type AsyncMessengerConfig = Readonly< - Pick< - TypegateConfigBase, - | "timer_max_timeout_ms" - | "timer_destroy_resources" - > ->; - -export class AsyncMessenger { - protected broker: Broker; - #counter = 0; - #tasks: Map = new Map(); - #start: MessengerStart; - #send: MessengerSend; - #stop: MessengerStop; - - #timer?: ReturnType; - - #operationQueues: Array>> = [ - [], - [], - ]; - #queueIndex = 0; - - #timeoutSecs: number; - - protected constructor( - start: MessengerStart, - send: MessengerSend, - stop: MessengerStop, - private config: AsyncMessengerConfig, - ) { - this.#start = start; - this.#send = send; - this.#stop = stop; - this.#timeoutSecs = config.timer_max_timeout_ms / 1000; - // init broker - this.broker = start(this.receive.bind(this)); - this.initTimer(); - } - - async terminate(): Promise { - await Promise.all([...this.#tasks.values()].map((t) => t.promise)); - logger.info(`close worker ${this.constructor.name}`); - await this.#stop(this.broker); - clearInterval(this.#timer); - } - - initTimer() { - if (this.#timer === undefined) { - this.#timer = setInterval(() => { - const currentQueue = this.#operationQueues[this.#queueIndex]; - this.#queueIndex = 1 - this.#queueIndex; - - let shouldStop = false; - for (const item of currentQueue) { - if (this.#tasks.has(item.id)) { - if ( - item.remainingPulseCount !== undefined && - item.remainingPulseCount > 0 - ) { - // check again next time if unterminated - item.remainingPulseCount -= 1; - continue; - } - // default behavior or 0 pulse left - const data = JSON.stringify(item, null, 2); - this.receive({ - id: item.id, - error: `${this.#timeoutSecs}s timeout exceeded: ${data}`, - }); - shouldStop = true; - } - } - - if (shouldStop && this.config.timer_destroy_resources) { - this.#stop(this.broker); - logger.info("reset broker after timeout"); - this.broker = this.#start(this.receive.bind(this)); - } - }, this.config.timer_max_timeout_ms); - } - } - - execute( - op: string | number | null, - data: M, - hooks: Array<() => Promise> = [], - pulseCount = 0, - ): Promise { - const id = this.nextId(); - const promise = Promise.withResolvers(); - this.#tasks.set(id, { promise, hooks }); - - const message = { id, op, data, remainingPulseCount: pulseCount }; - this.#operationQueues[this.#queueIndex].push(message); - void this.#send(this.broker, message); - return promise.promise; - } - - async receive(answer: Answer): Promise { - const { id } = answer; - const { promise, hooks } = this.#tasks.get(id)!; - if (answer.error) { - promise.reject(new Error(answer.error)); - } else { - promise.resolve(answer.data); - } - await Promise.all(hooks.map((h) => h())); - this.#tasks.delete(id); - } - - private nextId(): number { - const n = this.#counter; - this.#counter += 1; - this.#counter %= maxi32; - return n; - } - - get isEmpty(): boolean { - return this.#tasks.size === 0; - } - - get counter(): number { - return this.#counter; - } -} diff --git a/src/typegate/src/runtimes/patterns/messenger/lazy_async_messenger.ts b/src/typegate/src/runtimes/patterns/messenger/lazy_async_messenger.ts deleted file mode 100644 index e645734da..000000000 --- a/src/typegate/src/runtimes/patterns/messenger/lazy_async_messenger.ts +++ /dev/null @@ -1,114 +0,0 @@ -// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. -// SPDX-License-Identifier: MPL-2.0 - -import { getLogger } from "../../../log.ts"; -import { maxi32 } from "../../../utils.ts"; -import { - AsyncMessenger, - type AsyncMessengerConfig, - type MessengerSend, - type MessengerStart, - type MessengerStop, -} from "./async_messenger.ts"; - -export type { AsyncMessengerConfig }; - -const logger = getLogger(import.meta); - -const inactivityThreshold = 1; -const inactivityIntervalMs = 15_000; - -export class LazyAsyncMessenger - extends AsyncMessenger { - #gcState = 0; - #gcInterval?: number; - #start: MessengerStart; - - #ops: Map; - #loadedOps: Set = new Set(); - - constructor( - start: MessengerStart, - ops: Map, - send: MessengerSend, - stop: MessengerStop, - config: AsyncMessengerConfig, - ) { - const lazySend: MessengerSend = async ( - _, - message, - ) => { - if (!this.broker) { - this.broker = start( - this.receive.bind(this), - ); - } - const { op, remainingPulseCount } = message; - if (op !== null && !this.#loadedOps.has(op)) { - const initOp = this.#ops.get(op); - if (!initOp) { - throw new Error(`unknown op ${op}`); - } - await this.execute( - null, - initOp, - [], - remainingPulseCount, - ); - this.#loadedOps.add(op); - } - await send(this.broker, message); - }; - - const lazyStop: MessengerStop = async (_) => { - clearInterval(this.#gcInterval); - const broker = this.broker; - if (broker) { - this.broker = null; - this.#loadedOps.clear(); - await stop(broker); - } - }; - - super(() => null, lazySend, lazyStop, config); - this.#start = start; - this.#ops = ops; - } - - enableLazyness(): void { - logger.info(`enable laziness`); - clearInterval(this.#gcInterval); - this.#gcInterval = setInterval( - () => this.checkLazyness(), - inactivityIntervalMs, - ); - } - - async checkLazyness(): Promise { - if (!this.broker) { - return; - } - - const activity = (this.counter - this.#gcState + maxi32) % - maxi32; - this.#gcState = this.counter; - - if (activity <= inactivityThreshold && this.isEmpty) { - logger.info( - `lazy close worker ${this.constructor.name}`, - ); - this.broker = null; - await this.terminate(); - } - } - - disableLazyness(): void { - logger.info(`disable laziness`); - clearInterval(this.#gcInterval); - if (!this.broker) { - this.broker = this.#start( - this.receive.bind(this), - ); - } - } -} diff --git a/src/typegate/src/runtimes/patterns/messenger/types.ts b/src/typegate/src/runtimes/patterns/messenger/types.ts deleted file mode 100644 index 202d67a7c..000000000 --- a/src/typegate/src/runtimes/patterns/messenger/types.ts +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. -// SPDX-License-Identifier: MPL-2.0 - -export interface Message { - id: number; - op: string | number | null; - data: T; - remainingPulseCount?: number; -} - -type PromiseWithResolvers = ReturnType>; - -export type Answer = - | ({ id: number; data: T; error?: never }) - | ({ id: number; data?: never; error: string }); - -export interface TaskData { - promise: PromiseWithResolvers; - hooks: Array<() => void | Promise>; -} diff --git a/src/typegate/src/runtimes/patterns/worker_manager/deno.ts b/src/typegate/src/runtimes/patterns/worker_manager/deno.ts new file mode 100644 index 000000000..b2ec55179 --- /dev/null +++ b/src/typegate/src/runtimes/patterns/worker_manager/deno.ts @@ -0,0 +1,66 @@ +// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. +// SPDX-License-Identifier: MPL-2.0 + +import { envSharedWithWorkers } from "../../../config/shared.ts"; +import { BaseWorker } from "./mod.ts"; +import { BaseMessage, EventHandler, TaskId } from "./types.ts"; + +export interface DenoWorkerError extends BaseMessage { + type: "WORKER_ERROR"; + event: ErrorEvent; +} + +export type BaseDenoWorkerMessage = BaseMessage | DenoWorkerError; + +export class DenoWorker + extends BaseWorker { + #worker: Worker; + #taskId: TaskId; + constructor(taskId: TaskId, workerPath: string) { + super(); + this.#worker = new Worker(workerPath, { + name: taskId, + type: "module", + deno: { + permissions: { + net: true, + // on request permissions + read: "inherit", // default read permission + sys: "inherit", + // non-overridable permissions (security between typegraphs) + run: false, + write: false, + ffi: false, + env: envSharedWithWorkers, + }, + }, + }); + this.#taskId = taskId; + } + + listen(handlerFn: EventHandler) { + this.#worker.onmessage = async (message) => { + await handlerFn(message.data as E); + }; + + this.#worker.onerror = /*async*/ (event) => + handlerFn( + { + type: "WORKER_ERROR", + event, + } as E, + ); + } + + send(msg: M) { + this.#worker.postMessage(msg); + } + + destroy() { + this.#worker.terminate(); + } + + get id() { + return this.#taskId; + } +} diff --git a/src/typegate/src/runtimes/patterns/worker_manager/mod.ts b/src/typegate/src/runtimes/patterns/worker_manager/mod.ts new file mode 100644 index 000000000..fe06470cb --- /dev/null +++ b/src/typegate/src/runtimes/patterns/worker_manager/mod.ts @@ -0,0 +1,159 @@ +// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. +// SPDX-License-Identifier: MPL-2.0 + +import { getLogger } from "../../../log.ts"; +import { BaseMessage, EventHandler, TaskId } from "./types.ts"; + +const logger = getLogger(import.meta, "WARN"); + +/** + * `M` is the message type that the worker will receive; + * `E` is the message type that the worker will send back (event). + */ +export abstract class BaseWorker { + abstract listen(handlerFn: EventHandler): void; + abstract send(msg: M): void; + abstract destroy(): void; + abstract get id(): TaskId; +} + +export class BaseWorkerManager< + T, + M extends BaseMessage, + E extends BaseMessage, +> { + #activeTasks: Map; + taskSpec: T; + }> = new Map(); + #tasksByName: Map> = new Map(); + #startedAt: Map = new Map(); + + #workerFactory: (taskId: TaskId) => BaseWorker; + protected constructor(workerFactory: (taskId: TaskId) => BaseWorker) { + this.#workerFactory = workerFactory; + } + + get workerFactory() { + return this.#workerFactory; + } + + protected getActiveTaskNames() { + return Array.from(this.#tasksByName.keys()); + } + + protected hasTask(taskId: TaskId) { + return this.#activeTasks.has(taskId); + } + + protected getTask(taskId: TaskId) { + const task = this.#activeTasks.get(taskId); + if (!task) { + throw new Error(`Task "${taskId}" does not exist or has been completed`); + } + return task; + } + + protected getTasksByName(name: string) { + return this.#tasksByName.get(name) ?? new Set(); + } + + getInitialTimeStartedAt(taskId: TaskId) { + const startedAt = this.#startedAt.get(taskId); + if (!startedAt) { + throw new Error( + `Invalid state: cannot find initial time for task "${taskId}"`, + ); + } + return startedAt; + } + + // allocate worker? + protected createWorker(name: string, taskId: TaskId, taskSpec: T) { + const worker = this.#workerFactory(taskId); + // TODO inline + this.addWorker(name, taskId, worker, taskSpec, new Date()); + } + + protected addWorker( + name: string, + taskId: TaskId, + worker: BaseWorker, + taskSpec: T, + startedAt: Date, + ) { + if (!this.#tasksByName.has(name)) { + this.#tasksByName.set(name, new Set()); + } + + this.#tasksByName.get(name)!.add(taskId); + this.#activeTasks.set(taskId, { worker, taskSpec }); + if (!this.#startedAt.has(taskId)) { + this.#startedAt.set(taskId, startedAt); + } + } + + protected destroyAllWorkers() { + for (const name of this.getActiveTaskNames()) { + this.destroyWorkersByName(name); + } + } + + protected destroyWorkersByName(name: string) { + const taskIds = this.#tasksByName.get(name); + if (taskIds) { + for (const taskId of taskIds) { + this.destroyWorker(name, taskId); + } + return true; + } + return false; + } + + protected destroyWorker(name: string, taskId: TaskId) { + const task = this.#activeTasks.get(taskId); + if (this.#tasksByName.has(name)) { + if (!task) { + logger.warn( + `Task "${taskId}" associated with "${name}" does not exist or has been already destroyed`, + ); + return false; + } + + task.worker.destroy(); + this.#activeTasks.delete(taskId); + this.#tasksByName.get(name)!.delete(taskId); + // startedAt records are not deleted + + return true; + } + + return false; + } + + logMessage(_taskId: TaskId, _msg: M) { + // default implementation is empty + } + + sendMessage(taskId: TaskId, msg: M) { + const { worker } = this.getTask(taskId); + worker.send(msg); + this.logMessage(taskId, msg); + } +} + +export function createTaskId(name: string) { + const uuid = crypto.randomUUID(); + const sanitizedName = name.replace(/_::_/g, "__"); + return `${sanitizedName}_::_${uuid}`; +} + +export function getTaskNameFromId(taskId: TaskId) { + const [name, uuid] = taskId.split("_::_"); + if (!name || !uuid) { + // unreachable + throw new Error(`Fatal: task ID ${taskId} does not respect the convention`); + } + + return name; +} diff --git a/src/typegate/src/runtimes/patterns/worker_manager/types.ts b/src/typegate/src/runtimes/patterns/worker_manager/types.ts new file mode 100644 index 000000000..de417a36c --- /dev/null +++ b/src/typegate/src/runtimes/patterns/worker_manager/types.ts @@ -0,0 +1,12 @@ +// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. +// SPDX-License-Identifier: MPL-2.0 + +export type TaskId = string; + +export interface BaseMessage { + type: string; +} + +export type EventHandler = ( + message: E, +) => void | Promise; diff --git a/src/typegate/src/runtimes/substantial.ts b/src/typegate/src/runtimes/substantial.ts index 8fada6ea2..336e2372d 100644 --- a/src/typegate/src/runtimes/substantial.ts +++ b/src/typegate/src/runtimes/substantial.ts @@ -20,7 +20,12 @@ import { } from "./substantial/agent.ts"; import { closestWord } from "../utils.ts"; import { InternalAuth } from "../services/auth/protocols/internal.ts"; -import { applyFilter, type Expr, type ExecutionStatus } from "./substantial/filter_utils.ts"; +import { + applyFilter, + type ExecutionStatus, + type Expr, +} from "./substantial/filter_utils.ts"; +import { createTaskId } from "./patterns/worker_manager/mod.ts"; const logger = getLogger(import.meta); @@ -247,7 +252,7 @@ export class SubstantialRuntime extends Runtime { }) => { this.#checkWorkflowExistOrThrow(workflowName); - const runId = Agent.nextId(workflowName); + const runId = createTaskId(workflowName); const schedule = new Date().toJSON(); const token = await InternalAuth.emit(this.typegate.cryptoKeys); diff --git a/src/typegate/src/runtimes/substantial/agent.ts b/src/typegate/src/runtimes/substantial/agent.ts index 3e2691ca7..fe54fccc5 100644 --- a/src/typegate/src/runtimes/substantial/agent.ts +++ b/src/typegate/src/runtimes/substantial/agent.ts @@ -10,14 +10,15 @@ import { } from "../../../engine/runtime.js"; import { getLoggerByAddress, Logger } from "../../log.ts"; import { TaskContext } from "../deno/shared_types.ts"; +import { getTaskNameFromId } from "../patterns/worker_manager/mod.ts"; +import { EventHandler } from "../patterns/worker_manager/types.ts"; import { appendIfOngoing, - Interrupt, - Result, - WorkerData, - WorkflowResult, + InterruptEvent, + WorkflowCompletionEvent, + WorkflowEvent, } from "./types.ts"; -import { RunId, WorkerManager } from "./workflow_worker_manager.ts"; +import { WorkerManager } from "./workflow_worker_manager.ts"; export interface StdKwargs { taskContext: TaskContext; @@ -45,7 +46,7 @@ export class Agent { constructor( private backend: Backend, private queue: string, - private config: AgentConfig + private config: AgentConfig, ) { this.logger = getLoggerByAddress(import.meta, "substantial"); } @@ -64,7 +65,7 @@ export class Agent { }); } catch (err) { this.logger.warn( - `Failed writing log metadata for schedule "${schedule}" (${runId}), skipping it: ${err}` + `Failed writing log metadata for schedule "${schedule}" (${runId}), skipping it: ${err}`, ); } } @@ -97,9 +98,11 @@ export class Agent { this.workflows = workflows; this.logger.warn( - `Initializing agent to handle ${workflows - .map(({ name }) => name) - .join(", ")}` + `Initializing agent to handle ${ + workflows + .map(({ name }) => name) + .join(", ") + }`, ); this.pollIntervalHandle = setInterval(async () => { @@ -138,7 +141,7 @@ export class Agent { for (const workflow of this.workflows) { const requests = replayRequests.filter( - ({ run_id }) => Agent.parseWorkflowName(run_id) == workflow.name + ({ run_id }) => getTaskNameFromId(run_id) == workflow.name, ); while (requests.length > 0) { @@ -149,7 +152,7 @@ export class Agent { await this.#replay(next, workflow); } catch (err) { this.logger.error( - `Replay failed for ${workflow.name} => ${JSON.stringify(next)}` + `Replay failed for ${workflow.name} => ${JSON.stringify(next)}`, ); this.logger.error(err); } finally { @@ -195,7 +198,7 @@ export class Agent { // necessarily represent the state of what is actually running on the current typegate node if (this.workerManager.isOngoing(next.run_id)) { this.logger.warn( - `skip triggering ${next.run_id} for the current tick as it is still ongoing` + `skip triggering ${next.run_id} for the current tick as it is still ongoing`, ); return; @@ -222,7 +225,7 @@ export class Agent { // This may occur if an event is sent but the underlying run already completed // Or does not exist. this.logger.warn( - `Run ${next.run_id} has already stopped, closing schedule` + `Run ${next.run_id} has already stopped, closing schedule`, ); await Meta.substantial.storeCloseSchedule(schedDef); return; @@ -242,9 +245,11 @@ export class Agent { // A consequence of the above, a workflow is always triggered by gql { start(..) } // This can also occur if an event is sent from gql under a runId that is not valid (e.g. due to typo) this.logger.warn( - `First item in the operation list is not a Start, got "${JSON.stringify( - first - )}" instead. Closing the underlying schedule.` + `First item in the operation list is not a Start, got "${ + JSON.stringify( + first, + ) + }" instead. Closing the underlying schedule.`, ); await Meta.substantial.storeCloseSchedule(schedDef); @@ -259,12 +264,12 @@ export class Agent { workflow.path, run, next.schedule_date, - taskContext + taskContext, ); this.workerManager.listen( next.run_id, - this.#eventResultHandlerFor(workflow.name, next.run_id) + this.#eventResultHandlerFor(workflow.name, next.run_id), ); } catch (err) { throw err; @@ -281,73 +286,52 @@ export class Agent { } } - #eventResultHandlerFor(workflowName: string, runId: string) { - return async (result: Result) => { - if (result.error) { - // All Worker/Runner non-user issue should fall here - // Note: Should never throw (typegate will panic), this will run in a worker - this.logger.error( - `result error for "${runId}": ${JSON.stringify(result.payload)}` - ); - return; - } - - const answer = result.payload as WorkerData; - this.logger.info( - `"${runId}" answered: type ${JSON.stringify(answer.type)}` - ); - + #eventResultHandlerFor( + workflowName: string, + runId: string, + ): EventHandler { + return async (e) => { const startedAt = this.workerManager.getInitialTimeStartedAt(runId); - switch (answer.type) { - case "START": { - const ret = answer.data as WorkflowResult; - const interrupt = Interrupt.getTypeOf(ret.exception); - switch (interrupt) { - case "SAVE_RETRY": - case "SLEEP": - case "WAIT_ENSURE_VALUE": - case "WAIT_HANDLE_EVENT": - case "WAIT_RECEIVE_EVENT": { - await this.#workflowHandleInterrupts(workflowName, runId, ret); - break; - } - case null: { - await this.#workflowHandleGracefullCompletion( - startedAt, - workflowName, - runId, - ret - ); - break; - } - default: - throw new Error(`Unknown interrupt "${interrupt}"`); - } + switch (e.type) { + case "SUCCESS": + case "FAIL": + await this.#workflowHandleGracefullCompletion( + startedAt, + workflowName, + runId, + e, + ); break; - } - default: + case "ERROR": this.logger.error( - `Fatal: invalid type ${ - answer.type - } sent by "${runId}": ${JSON.stringify(answer.data)}` + `Result error for "${runId}": ${JSON.stringify(e.error)}`, ); + return; + case "INTERRUPT": + // TODO unknown interrupt + await this.#workflowHandleInterrupts(workflowName, runId, e); + break; } + + // this.logger.info( + // `"${runId}" answered: type ${JSON.stringify(answer.type)}`, + // ); }; } async #workflowHandleInterrupts( workflowName: string, runId: string, - { result, schedule, run }: WorkflowResult + { interrupt, schedule, run }: InterruptEvent, ) { this.workerManager.destroyWorker(workflowName, runId); // ! - this.logger.debug(`Interrupt "${workflowName}": ${result}"`); + this.logger.debug(`Interrupt "${workflowName}": ${interrupt}"`); // TODO: make all of these transactional - this.logger.info(`Persist records for "${workflowName}": ${result}"`); + this.logger.info(`Persist records for "${workflowName}": ${interrupt}"`); const _run = await Meta.substantial.storePersistRun({ backend: this.backend, run, @@ -388,38 +372,41 @@ export class Agent { startedAt: Date, workflowName: string, runId: string, - { result, kind, schedule, run }: WorkflowResult + event: WorkflowCompletionEvent, ) { this.workerManager.destroyWorker(workflowName, runId); + console.log({ event }); + + const result = event.type == "SUCCESS" ? event.result : event.error; this.logger.info( - `gracefull completion of "${runId}" (${kind}): ${JSON.stringify( - result - )} started at "${startedAt}"` + `gracefull completion of "${runId}" (${event.type}): ${ + JSON.stringify(result) + } started at "${startedAt}"`, ); this.logger.info(`Append Stop ${runId}`); - const rustResult = kind == "FAIL" ? "Err" : "Ok"; + const rustResult = event.type == "FAIL" ? "Err" : "Ok"; // Note: run is a one-time value, thus can be mutated - appendIfOngoing(run, { + appendIfOngoing(event.run, { at: new Date().toJSON(), event: { type: "Stop", result: { - [rustResult]: result ?? null, + [rustResult]: result, } as unknown, }, }); this.logger.info( - `Persist finalized records for "${workflowName}": ${result}" and closing everything..` + `Persist finalized records for "${workflowName}": ${result}" and closing everything..`, ); const _run = await Meta.substantial.storePersistRun({ backend: this.backend, - run, + run: event.run, }); // console.log("Persisted", run); @@ -428,7 +415,7 @@ export class Agent { backend: this.backend, queue: this.queue, run_id: runId, - schedule, + schedule: event.schedule, }); await Meta.substantial.agentRemoveLease({ @@ -437,21 +424,6 @@ export class Agent { lease_seconds: this.config.leaseLifespanSec, }); } - - static nextId(name: string): RunId { - const uuid = crypto.randomUUID(); - return `${name}_::_${uuid}`; - } - - static parseWorkflowName(runId: string) { - const [name, uuid] = runId.split("_::_"); - if (!name && !uuid) { - // Impossible since it must be produced from nextId upon a Start event - throw new Error(`Fatal: ${runId} does not respect the convention`); - } - - return name; - } } function checkIfRunHasStopped(run: Run) { @@ -464,13 +436,15 @@ function checkIfRunHasStopped(run: Run) { if (op.event.type == "Start") { if (life >= 1) { logger.error( - `bad logs: ${JSON.stringify( - run.operations.map(({ event }) => event.type) - )}` + `bad logs: ${ + JSON.stringify( + run.operations.map(({ event }) => event.type), + ) + }`, ); throw new Error( - `"${run.run_id}" has potentially corrupted logs, another run occured yet previous has not stopped` + `"${run.run_id}" has potentially corrupted logs, another run occured yet previous has not stopped`, ); } @@ -479,13 +453,15 @@ function checkIfRunHasStopped(run: Run) { } else if (op.event.type == "Stop") { if (life <= 0) { logger.error( - `bad logs: ${JSON.stringify( - run.operations.map(({ event }) => event.type) - )}` + `bad logs: ${ + JSON.stringify( + run.operations.map(({ event }) => event.type), + ) + }`, ); throw new Error( - `"${run.run_id}" has potentitally corrupted logs, attempted stopping already closed run, or run with a missing Start` + `"${run.run_id}" has potentitally corrupted logs, attempted stopping already closed run, or run with a missing Start`, ); } diff --git a/src/typegate/src/runtimes/substantial/types.ts b/src/typegate/src/runtimes/substantial/types.ts index c750130e6..5d38d05ef 100644 --- a/src/typegate/src/runtimes/substantial/types.ts +++ b/src/typegate/src/runtimes/substantial/types.ts @@ -2,6 +2,8 @@ // SPDX-License-Identifier: MPL-2.0 import { Operation, Run } from "../../../engine/runtime.js"; +import { TaskContext } from "../deno/shared_types.ts"; +import { DenoWorkerError } from "../patterns/worker_manager/deno.ts"; export type { Backend, Operation, @@ -9,44 +11,55 @@ export type { Run, } from "../../../engine/runtime.js"; -export type AnyString = string & Record; - -export type WorkerEvent = "START" | AnyString; +export type WorkflowMessage = { + type: "START"; + data: { + modulePath: string; + functionName: string; + run: Run; + schedule: string; + internal: TaskContext; + }; +}; -export type WorkerData = { - type: WorkerEvent; - data: any; +export type WorkflowCompletionEvent = + | { + type: "SUCCESS"; + result: unknown; + run: Run; + schedule: string; + } + | { + type: "FAIL"; + error: string; + exception: Error | undefined; + run: Run; + schedule: string; + }; + +export type InterruptEvent = { + type: "INTERRUPT"; + interrupt: InterruptType; + schedule: string; + run: Run; }; -export type WorkerEventHandler = (message: Result) => Promise; +export type WorkflowEvent = + | WorkflowCompletionEvent + | InterruptEvent + | { + type: "ERROR"; + error: string; + } + | DenoWorkerError; export type Result = { error: boolean; payload: T; }; -export function Ok(payload: R): Result { - return { error: false, payload }; -} - -export function Err(payload: E): Result { - return { error: true, payload }; -} - -export function Msg(type: WorkerEvent, data: unknown): WorkerData { - return { type, data }; -} - export type ExecutionResultKind = "SUCCESS" | "FAIL"; -export type WorkflowResult = { - kind: ExecutionResultKind; - result: unknown; - exception?: Error; - schedule: string; - run: Run; -}; - // TODO: convert python exceptions into these // by using prefixes on the exception message for example diff --git a/src/typegate/src/runtimes/substantial/worker.ts b/src/typegate/src/runtimes/substantial/worker.ts index ebc0505dc..4de7df8b0 100644 --- a/src/typegate/src/runtimes/substantial/worker.ts +++ b/src/typegate/src/runtimes/substantial/worker.ts @@ -4,12 +4,12 @@ import { errorToString } from "../../worker_utils.ts"; import { Context } from "./deno_context.ts"; import { toFileUrl } from "@std/path/to-file-url"; -import { Err, Msg, Ok, WorkerData, WorkflowResult } from "./types.ts"; +import { Interrupt, WorkflowEvent, WorkflowMessage } from "./types.ts"; let runCtx: Context | undefined; self.onmessage = async function (event) { - const { type, data } = event.data as WorkerData; + const { type, data } = event.data as WorkflowMessage; switch (type) { case "START": { const { modulePath, functionName, run, schedule, internal } = data; @@ -21,7 +21,12 @@ self.onmessage = async function (event) { const workflowFn = module[functionName]; if (typeof workflowFn !== "function") { - self.postMessage(Err(`Function "${functionName}" not found`)); + self.postMessage( + { + type: "ERROR", + error: `Function "${functionName}" not found`, + } satisfies WorkflowEvent, + ); self.close(); return; } @@ -31,40 +36,48 @@ self.onmessage = async function (event) { workflowFn(runCtx, internal) .then((wfResult: unknown) => { self.postMessage( - Ok( - Msg( - type, - { - kind: "SUCCESS", - result: wfResult, - run: runCtx!.getRun(), - schedule, - } satisfies WorkflowResult, - ), - ), + { + type: "SUCCESS", + result: wfResult, + run: runCtx!.getRun(), + schedule, + } satisfies WorkflowEvent, ); }) .catch((wfException: unknown) => { - self.postMessage( - Ok( - Msg( - type, - { - kind: "FAIL", - result: errorToString(wfException), - exception: wfException instanceof Error - ? wfException - : undefined, - run: runCtx!.getRun(), - schedule, - } satisfies WorkflowResult, - ), - ), - ); + const interrupt = Interrupt.getTypeOf(wfException); + if (interrupt) { + self.postMessage( + { + type: "INTERRUPT", + interrupt, + run: runCtx!.getRun(), + schedule, + } satisfies WorkflowEvent, + ); + } else { + self.postMessage( + { + type: "FAIL", + error: errorToString(wfException), + // How?? + exception: wfException instanceof Error + ? wfException + : undefined, + run: runCtx!.getRun(), + schedule, + } satisfies WorkflowEvent, + ); + } }); break; } default: - self.postMessage(Err(Msg(type, `Unknown command ${type}`))); + self.postMessage( + { + type: "ERROR", + error: `Unknown command ${type}`, + } satisfies WorkflowEvent, + ); } }; diff --git a/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts b/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts index c7223d11f..d6176dc30 100644 --- a/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts +++ b/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts @@ -1,169 +1,54 @@ // Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. // SPDX-License-Identifier: MPL-2.0 -import { envSharedWithWorkers } from "../../config/shared.ts"; import { getLogger } from "../../log.ts"; import { TaskContext } from "../deno/shared_types.ts"; -import { - Err, - Msg, - Result, - Run, - WorkerEvent, - WorkerEventHandler, -} from "./types.ts"; +import { DenoWorker } from "../patterns/worker_manager/deno.ts"; +import { BaseWorkerManager } from "../patterns/worker_manager/mod.ts"; +import { EventHandler, TaskId } from "../patterns/worker_manager/types.ts"; +import { Run, WorkflowEvent, WorkflowMessage } from "./types.ts"; const logger = getLogger(import.meta, "WARN"); -export type WorkerRecord = { - worker: Worker; +export type WorkflowSpec = { modulePath: string; }; -export type RunId = string; -export type WorkflowName = string; - -export class WorkflowRecorder { - workflowRuns: Map> = new Map(); - workers: Map = new Map(); - startedAtRecords: Map = new Map(); - - getRegisteredWorkflowNames() { - return Array.from(this.workflowRuns.keys()); - } - - hasRun(runId: RunId) { - return this.workers.has(runId); - } - - getWorkerRecord(runId: RunId) { - const record = this.workers.get(runId); - if (!record) { - throw new Error(`Run "${runId}" does not exist or has been completed`); - } - - return record!; - } - - addWorker( - name: WorkflowName, - runId: RunId, - worker: WorkerRecord, - startedAt: Date, - ) { - if (!this.workflowRuns.has(name)) { - this.workflowRuns.set(name, new Set()); - } - - this.workflowRuns.get(name)!.add(runId); - this.workers.set(runId, worker); - if (!this.startedAtRecords.has(runId)) { - this.startedAtRecords.set(runId, startedAt); - } - } - - destroyAllWorkers() { - for (const name of this.getRegisteredWorkflowNames()) { - this.destroyRelatedWorkers(name); - } - } - - destroyRelatedWorkers(name: WorkflowName) { - if (this.workflowRuns.has(name)) { - const runIds = this.workflowRuns.get(name)!.values(); - for (const runId of runIds) { - this.destroyWorker(name, runId); - } - return true; - } - return false; - } - - destroyWorker(name: WorkflowName, runId: RunId) { - const record = this.workers.get(runId); - if (this.workflowRuns.has(name)) { - if (!record) { - logger.warn( - `"${runId}" associated with "${name}" does not exist or has been already destroyed`, - ); - return false; - } - - record!.worker.terminate(); // ! - - this.workflowRuns.get(name)!.delete(runId); - this.workers.delete(runId); - - // Let it alive throughout typegate lifetime - // x this.startedAtRecords.delete(runId); - return true; - } - - return false; - } -} /** * - A workflow file can contain multiple workflows (functions) * - A workflow can be run as many times as a START event is triggered (with a run_id) * - The completion of a workflow is run async, it is entirely up to the event listeners to act upon the results */ -export class WorkerManager { - private recorder: WorkflowRecorder = new WorkflowRecorder(); - - constructor() {} - - #createWorker(name: string, modulePath: string, runId: RunId) { - const worker = new Worker(import.meta.resolve("./worker.ts"), { - name: runId, - type: "module", - deno: { - permissions: { - net: true, - // on request permissions - read: "inherit", // default read permission - sys: "inherit", - // non-overridable permissions (security between typegraphs) - run: false, - write: false, - ffi: false, - env: envSharedWithWorkers, - }, - }, +export class WorkerManager + extends BaseWorkerManager { + constructor() { + super((taskId: TaskId) => { + return new DenoWorker(taskId, import.meta.resolve("./worker.ts")); }); - - this.recorder.addWorker( - name, - runId, - { - modulePath, - worker, - }, - new Date(), - ); } destroyWorker(name: string, runId: string) { - return this.recorder.destroyWorker(name, runId); + return super.destroyWorker(name, runId); } destroyAllWorkers() { - this.recorder.destroyAllWorkers(); logger.warn( - `Destroyed workers for ${ - this.recorder - .getRegisteredWorkflowNames() + `Destroying workers for ${ + this + .getActiveTaskNames() .map((w) => `"${w}"`) .join(", ") }`, ); + super.destroyAllWorkers(); } - isOngoing(runId: RunId) { - return this.recorder.hasRun(runId); + isOngoing(runId: TaskId) { + return this.hasTask(runId); } - getAllocatedResources(name: WorkflowName) { - const runIds = this.recorder.workflowRuns.get(name) ?? new Set(); + getAllocatedResources(name: string) { + const runIds = super.getTasksByName(name) ?? new Set(); return { count: runIds.size, workflow: name, @@ -176,43 +61,20 @@ export class WorkerManager { }; } - /** Returns a Date object representing the *initial* time the runId has been registered/run */ - getInitialTimeStartedAt(runId: RunId): Date { - const rec = this.recorder.startedAtRecords.get(runId); - if (!rec) { - throw new Error( - `Invalid state: cannot find initial time for run "${runId}"`, - ); - } - return rec; - } - - listen(runId: RunId, handlerFn: WorkerEventHandler) { - if (!this.recorder.hasRun(runId)) { + listen(runId: TaskId, handlerFn: EventHandler) { + if (!this.hasTask(runId)) { // Note: never throw on worker events, this will make typegate panic! logger.warn(`Attempt listening on missing ${runId}`); return; } - const { worker } = this.recorder.getWorkerRecord(runId); - - worker.onmessage = async (message) => { - if (message.data.error) { - // worker level failure - await handlerFn(Err(message.data.error)); - } else { - // logic level Result (Ok | Err) - await handlerFn(message.data as Result); - } - }; + const { worker } = this.getTask(runId); - worker.onerror = /*async*/ (event) => handlerFn(Err(event)); + worker.listen(handlerFn); } - trigger(type: WorkerEvent, runId: RunId, data: unknown) { - const { worker } = this.recorder.getWorkerRecord(runId); - worker.postMessage(Msg(type, data)); - logger.info(`trigger ${type} for ${runId}`); + override logMessage(runId: TaskId, msg: WorkflowMessage) { + logger.info(`trigger ${msg.type} for ${runId}`); } triggerStart( @@ -223,13 +85,18 @@ export class WorkerManager { schedule: string, internalTCtx: TaskContext, ) { - this.#createWorker(name, workflowModPath, runId); - this.trigger("START", runId, { + this.createWorker(name, runId, { modulePath: workflowModPath, - functionName: name, - run: storedRun, - schedule, - internal: internalTCtx, + }); + this.sendMessage(runId, { + type: "START", + data: { + modulePath: workflowModPath, + functionName: name, + run: storedRun, + schedule, + internal: internalTCtx, + }, }); } } diff --git a/src/typegate/src/typegate/artifacts/mod.ts b/src/typegate/src/typegate/artifacts/mod.ts index 34fb655da..9138a7952 100644 --- a/src/typegate/src/typegate/artifacts/mod.ts +++ b/src/typegate/src/typegate/artifacts/mod.ts @@ -195,6 +195,27 @@ export class ArtifactStore implements AsyncDisposable { return this.#resolveLocalPath(meta, parentDirName); } + async getInlineArtifact( + tgName: string, + code: string, + ext: string, + transform = (code: string) => code, + ) { + const hash = await sha256(code); + const path = resolve( + this.persistence.dirs.cache, + "inline", + tgName, + hash + ext, + ); + if (await exists(path)) { + return path; + } + await Deno.mkdir(dirname(path), { recursive: true }); + await Deno.writeTextFile(path, transform(code)); + return path; + } + prepareUpload(meta: ArtifactMeta) { return this.uploadEndpoints.prepareUpload(meta, this.persistence); } diff --git a/tests/runtimes/deno/deno_sync_test.ts b/tests/runtimes/deno/deno_sync_test.ts index 79688aef9..e4e285961 100644 --- a/tests/runtimes/deno/deno_sync_test.ts +++ b/tests/runtimes/deno/deno_sync_test.ts @@ -67,27 +67,28 @@ Meta.test( .on(e); }); - await t.should("work with global variables in a module", async () => { - await gql` - mutation { - count - } - ` - .expectData({ - count: 1, - }) - .on(e); - - await gql` - mutation { - count - } - ` - .expectData({ - count: 2, - }) - .on(e); - }); + // -- no worker reuse... + // await t.should("work with global variables in a module", async () => { + // await gql` + // mutation { + // count + // } + // ` + // .expectData({ + // count: 1, + // }) + // .on(e); + // + // await gql` + // mutation { + // count + // } + // ` + // .expectData({ + // count: 2, + // }) + // .on(e); + // }); await t.should("work with async function", async () => { await gql` diff --git a/tests/runtimes/deno/deno_test.ts b/tests/runtimes/deno/deno_test.ts index b0d14dce4..70a0a4de7 100644 --- a/tests/runtimes/deno/deno_test.ts +++ b/tests/runtimes/deno/deno_test.ts @@ -39,27 +39,28 @@ Meta.test( .on(e); }); - await t.should("work with global variables in a module", async () => { - await gql` - mutation { - count - } - ` - .expectData({ - count: 1, - }) - .on(e); - - await gql` - mutation { - count - } - ` - .expectData({ - count: 2, - }) - .on(e); - }); + // -- no worker reuse... + // await t.should("work with global variables in a module", async () => { + // await gql` + // mutation { + // count + // } + // ` + // .expectData({ + // count: 1, + // }) + // .on(e); + // + // await gql` + // mutation { + // count + // } + // ` + // .expectData({ + // count: 2, + // }) + // .on(e); + // }); await t.should("work with async function", async () => { await gql` diff --git a/tests/runtimes/substantial/common.ts b/tests/runtimes/substantial/common.ts index ffd012a41..07ef56bd7 100644 --- a/tests/runtimes/substantial/common.ts +++ b/tests/runtimes/substantial/common.ts @@ -26,22 +26,30 @@ export function redisCleanup(url: string) { }; } +type TestTemplateOptions = { + secrets?: Record; + only?: boolean; +}; + +type TestTemplateOptionsX = TestTemplateOptions & { + delays: { + [K in TDelayKeys]: number; + }; +}; + export function basicTestTemplate( backendName: BackendName, { delays, secrets, - }: { - delays: { - awaitSleepCompleteSec: number; - }; - secrets?: Record; - }, + only = false, + }: TestTemplateOptionsX<"awaitSleepCompleteSec">, cleanup?: MetaTestCleanupFn, ) { Meta.test( { name: `Basic workflow execution lifecycle + interrupts (${backendName})`, + only, }, async (t) => { Deno.env.set("SUB_BACKEND", backendName); @@ -163,17 +171,14 @@ export function concurrentWorkflowTestTemplate( { delays, secrets, - }: { - delays: { - awaitEmailCompleteSec: number; - }; - secrets?: Record; - }, + only = false, + }: TestTemplateOptionsX<"awaitEmailCompleteSec">, cleanup?: MetaTestCleanupFn, ) { Meta.test( { name: `Events and concurrent runs (${backendName})`, + only, }, async (t) => { Deno.env.set("SUB_BACKEND", backendName); @@ -342,17 +347,14 @@ export function retrySaveTestTemplate( { delays, secrets, - }: { - delays: { - awaitCompleteAll: number; - }; - secrets?: Record; - }, + only = false, + }: TestTemplateOptionsX<"awaitCompleteAll">, cleanup?: MetaTestCleanupFn, ) { Meta.test( { name: `Retry logic (${backendName})`, + only, }, async (t) => { Deno.env.set("SUB_BACKEND", backendName); @@ -512,17 +514,14 @@ export function childWorkflowTestTemplate( { delays, secrets, - }: { - delays: { - awaitCompleteSec: number; - }; - secrets?: Record; - }, + only = false, + }: TestTemplateOptionsX<"awaitCompleteSec">, cleanup?: MetaTestCleanupFn, ) { Meta.test( { name: `Child workflows (${backendName})`, + only, }, async (t) => { Deno.env.set("SUB_BACKEND", backendName); @@ -624,9 +623,10 @@ export function childWorkflowTestTemplate( .on(e); }); - - await t.should(`filter the runs given a nested expr (${backendName})`, async () => { - await gql` + await t.should( + `filter the runs given a nested expr (${backendName})`, + async () => { + await gql` query { search(name: "bumpPackage", filter: $filter) { # started_at @@ -636,30 +636,35 @@ export function childWorkflowTestTemplate( } } ` - .withVars({ - filter: { - or: [ - { - and: [ - { status: { contains: JSON.stringify("COMPL") }}, - { contains: JSON.stringify("substantial") } - ] - }, - { not: { not: { eq: JSON.stringify("Bump typegraph v3 => v4") } } } - ] - } satisfies Expr - }) - .expectBody((body) => { - const sorted = body.data.search.sort((a: any, b: any) => a.value.localeCompare(b.value)); - assertEquals(sorted, - [ - { status: "COMPLETED", value: '"Bump substantial v2 => v3"' }, - { status: "COMPLETED", value: '"Bump typegraph v3 => v4"' } - ] - ); - }) - .on(e); - }); + .withVars({ + filter: { + or: [ + { + and: [ + { status: { contains: JSON.stringify("COMPL") } }, + { contains: JSON.stringify("substantial") }, + ], + }, + { + not: { + not: { eq: JSON.stringify("Bump typegraph v3 => v4") }, + }, + }, + ], + } satisfies Expr, + }) + .expectBody((body) => { + const sorted = body.data.search.sort((a: any, b: any) => + a.value.localeCompare(b.value) + ); + assertEquals(sorted, [ + { status: "COMPLETED", value: '"Bump substantial v2 => v3"' }, + { status: "COMPLETED", value: '"Bump typegraph v3 => v4"' }, + ]); + }) + .on(e); + }, + ); }, ); } @@ -668,14 +673,14 @@ export function inputMutationTemplate( backendName: BackendName, { secrets, - }: { - secrets?: Record; - }, + only = false, + }: TestTemplateOptions, cleanup?: MetaTestCleanupFn, ) { Meta.test( { name: `kwargs input mutation after interrupts (${backendName})`, + only, }, async (t) => { Deno.env.set("SUB_BACKEND", backendName);