Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Unify worker manager #954

Merged
merged 20 commits into from
Jan 11, 2025
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions import_map.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"@std/archive/": "jsr:/@std/archive@^0.225.0/",
"@std/assert": "jsr:@std/assert@^1.0.6",
"@std/assert/": "jsr:/@std/assert@^1.0.6/",
"@std/async/": "jsr:/@std/[email protected]/",
"@std/cli/": "jsr:/@std/cli@^1.0.4/",
"@std/collections/": "jsr:/@std/collections@^1.0.5/",
"@std/encoding/": "jsr:/@std/encoding@^1.0.2/",
Expand Down
96 changes: 58 additions & 38 deletions src/typegate/src/runtimes/deno/deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { PolicyResolverOutput } from "../../engine/planner/policies.ts";
import { getInjectionValues } from "../../engine/planner/injection_utils.ts";
import DynamicInjection from "../../engine/injection/dynamic.ts";
import { getLogger } from "../../log.ts";
import { WorkerManager } from "./deno_worker_manager.ts";

const logger = getLogger(import.meta);

Expand All @@ -37,7 +38,8 @@ const predefinedFuncs: Record<string, Resolver<Record<string, unknown>>> = {
allow: () => "ALLOW" as PolicyResolverOutput,
deny: () => "DENY" as PolicyResolverOutput,
pass: () => "PASS" as PolicyResolverOutput,
internal_policy: ({ _: { context } }) => context.provider === "internal" ? "ALLOW" : "PASS" as PolicyResolverOutput,
internal_policy: ({ _: { context } }) =>
context.provider === "internal" ? "ALLOW" : "PASS" as PolicyResolverOutput,
};

export class DenoRuntime extends Runtime {
Expand All @@ -47,6 +49,7 @@ export class DenoRuntime extends Runtime {
private tg: TypeGraphDS,
private typegate: Typegate,
private w: DenoMessenger,
private workerManager: WorkerManager,
private registry: Map<string, number>,
private secrets: Record<string, string>,
) {
Expand Down Expand Up @@ -149,6 +152,10 @@ export class DenoRuntime extends Runtime {
typegate.config.base,
);

const workerManager = new WorkerManager({
timeout_ms: typegate.config.base.timer_max_timeout_ms,
});

if (Deno.env.get("DENO_TESTING") === "true") {
w.disableLazyness();
}
Expand All @@ -159,6 +166,7 @@ export class DenoRuntime extends Runtime {
tg,
typegate,
w,
workerManager,
registry,
secrets,
);
Expand Down Expand Up @@ -257,7 +265,10 @@ export class DenoRuntime extends Runtime {
const modMat = this.tg.materializers[mat.data.mod as number];
const entryPoint =
this.tg.meta.artifacts[modMat.data.entryPoint as string];
const op = this.registry.get(entryPoint.hash)!;
const depMetas = (modMat.data.deps as string[]).map((dep) =>
createArtifactMeta(this.typegraphName, this.tg.meta.artifacts[dep])
);
const moduleMeta = createArtifactMeta(this.typegraphName, entryPoint);

return async ({
_: {
Expand All @@ -269,33 +280,33 @@ export class DenoRuntime extends Runtime {
}) => {
const token = await InternalAuth.emit(this.typegate.cryptoKeys);

return await this.w.execute(
op,
// TODO cache??
const entryModulePath = await this.typegate.artifactStore.getLocalPath(
moduleMeta,
depMetas,
);

return await this.workerManager.callFunction(
mat.data.name as string,
entryModulePath,
entryPoint.path,
args,
{
type: "import_func",
args,
internals: {
parent,
context,
secrets,
effect: mat.effect.effect ?? null,
meta: {
url: `${url.protocol}//${url.host}/${this.typegraphName}`,
token,
},
headers,
parent,
context,
secrets,
effect: mat.effect.effect ?? null,
meta: {
url: `${url.protocol}//${url.host}/${this.typegraphName}`,
token,
},
name: mat.data.name as string,
verbose,
headers,
},
[],
pulseCount,
);
};
}

if (mat.name === "function") {
const op = this.registry.get(mat.data.script as string)!;
return async ({
_: {
context,
Expand All @@ -306,26 +317,29 @@ export class DenoRuntime extends Runtime {
}) => {
const token = await InternalAuth.emit(this.typegate.cryptoKeys);

return await this.w.execute(
op,
const modulePath = await this.typegate.artifactStore.getInlineArtifact(
this.typegraphName,
mat.data.script as string,
".ts",
exportInlineFunction("inlineFunction"),
);

return await this.workerManager.callFunction(
"inlineFunction",
modulePath,
"inline", // TODO
luckasRanarison marked this conversation as resolved.
Show resolved Hide resolved
args,
{
type: "func",
args,
internals: {
parent,
context,
secrets,
effect: mat.effect.effect ?? null,
meta: {
url: `${url.protocol}//${url.host}/${this.typegraphName}`,
token,
},
headers,
parent,
context,
secrets,
effect: mat.effect.effect ?? null,
meta: {
url: `${url.protocol}//${url.host}/${this.typegraphName}`,
token,
},
verbose,
headers,
},
[],
pulseCount,
);
};
}
Expand Down Expand Up @@ -365,6 +379,12 @@ export class DenoRuntime extends Runtime {
}
}

function exportInlineFunction(name = "fn", symbol = "_my_lambda") {
return (code: string) => {
return `${code}\nexport const ${name} = ${symbol};`;
};
}
Natoandro marked this conversation as resolved.
Show resolved Hide resolved

function getInjectionData(d: InjectionData) {
if ("value" in d) {
return d.value;
Expand Down
2 changes: 1 addition & 1 deletion src/typegate/src/runtimes/deno/deno_messenger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export class DenoMessenger extends LazyAsyncMessenger<Worker, Task, unknown> {
) {
super(
(receive) => {
const worker = new Worker(import.meta.resolve("./worker.ts"), {
const worker = new Worker(import.meta.resolve("./worker_old.ts"), {
type: "module",
deno: {
namespace: false,
Expand Down
75 changes: 75 additions & 0 deletions src/typegate/src/runtimes/deno/deno_worker_manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0.
// SPDX-License-Identifier: MPL-2.0

import { getLogger } from "../../log.ts";
import { DenoWorker } from "../utils/workers/deno.ts";
import { BaseWorkerManager, createTaskId } from "../utils/workers/manager.ts";
import { TaskId } from "../utils/workers/types.ts";
import { TaskContext } from "./shared_types.ts";
import { DenoEvent, DenoMessage, TaskSpec } from "./types.ts";
import { delay } from "@std/async/delay";

const logger = getLogger(import.meta, "WARN");

export type WorkerManagerConfig = {
timeout_ms: number;
};

export class WorkerManager
extends BaseWorkerManager<TaskSpec, DenoMessage, DenoEvent> {
constructor(private config: WorkerManagerConfig) {
super(
(taskId: TaskId) => {
return new DenoWorker(taskId, import.meta.resolve("./worker.ts"));
},
);
}

callFunction(
name: string,
modulePath: string,
relativeModulePath: string,
args: unknown,
internalTCtx: TaskContext,
) {
const taskId = createTaskId(`${name}@${relativeModulePath}`);
this.createWorker(name, taskId, {
modulePath,
functionName: name,
});
this.sendMessage(taskId, {
type: "CALL",
modulePath,
functionName: name,
args,
internals: internalTCtx,
});

return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => {
this.destroyWorker(name, taskId);
reject(new Error(`${this.config.timeout_ms}ms timeout exceeded`));
}, this.config.timeout_ms);

const handler: (event: DenoEvent) => void = (event) => {
clearTimeout(timeoutId);
this.destroyWorker(name, taskId);
switch (event.type) {
case "SUCCESS":
resolve(event.result);
break;
case "FAILURE":
reject(event.exception ?? event.error);
break;
}
};

const { worker } = this.getTask(taskId);
worker.listen(handler);
});
}

override logMessage(taskId: TaskId, msg: DenoMessage) {
logger.info(`Task "${taskId}" received message: ${msg.type}`);
}
}
21 changes: 21 additions & 0 deletions src/typegate/src/runtimes/deno/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0.
// SPDX-License-Identifier: MPL-2.0

import { TaskContext } from "./shared_types.ts";

export type TaskSpec = {
modulePath: string;
functionName: string;
};

export type DenoMessage = {
type: "CALL";
modulePath: string;
functionName: string;
args: unknown;
internals: TaskContext;
};
Natoandro marked this conversation as resolved.
Show resolved Hide resolved

export type DenoEvent =
| { type: "SUCCESS"; result: unknown }
| { type: "FAILURE"; error: string; exception: Error | undefined };
Loading
Loading