Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Unify worker manager #954

Merged
merged 20 commits into from
Jan 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

122 changes: 64 additions & 58 deletions src/typegate/src/runtimes/deno/deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
} from "../../typegraph/types.ts";
import * as ast from "graphql/ast";
import { InternalAuth } from "../../services/auth/protocols/internal.ts";
import { DenoMessenger } from "./deno_messenger.ts";
import type { Task } from "./shared_types.ts";
import { path } from "compress/deps.ts";
import { globalConfig as config } from "../../config.ts";
Expand All @@ -27,6 +26,7 @@
import { getInjectionValues } from "../../engine/planner/injection_utils.ts";
import DynamicInjection from "../../engine/injection/dynamic.ts";
import { getLogger } from "../../log.ts";
import { WorkerManager } from "./worker_manager.ts";

const logger = getLogger(import.meta);

Expand All @@ -37,7 +37,8 @@
allow: () => "ALLOW" as PolicyResolverOutput,
deny: () => "DENY" as PolicyResolverOutput,
pass: () => "PASS" as PolicyResolverOutput,
internal_policy: ({ _: { context } }) => context.provider === "internal" ? "ALLOW" : "PASS" as PolicyResolverOutput,
internal_policy: ({ _: { context } }) =>

Check warning on line 40 in src/typegate/src/runtimes/deno/deno.ts

View check run for this annotation

Codecov / codecov/patch

src/typegate/src/runtimes/deno/deno.ts#L40

Added line #L40 was not covered by tests
context.provider === "internal" ? "ALLOW" : "PASS" as PolicyResolverOutput,
};

export class DenoRuntime extends Runtime {
Expand All @@ -46,8 +47,7 @@
uuid: string,
private tg: TypeGraphDS,
private typegate: Typegate,
private w: DenoMessenger,
private registry: Map<string, number>,
private workerManager: WorkerManager,
private secrets: Record<string, string>,
) {
super(typegraphName, uuid);
Expand Down Expand Up @@ -138,36 +138,24 @@
}
}

const w = new DenoMessenger(
name,
{
...(args.permissions ?? {}),
read: [basePath],
} as Deno.PermissionOptionsObject,
false,
ops,
typegate.config.base,
);

if (Deno.env.get("DENO_TESTING") === "true") {
w.disableLazyness();
}
const workerManager = new WorkerManager({
timeout_ms: typegate.config.base.timer_max_timeout_ms,
});

const rt = new DenoRuntime(
typegraphName,
uuid,
tg,
typegate,
w,
registry,
workerManager,
secrets,
);

return rt;
}

async deinit(): Promise<void> {
await this.w.terminate();
// await this.workerManager.deinit();
Natoandro marked this conversation as resolved.
Show resolved Hide resolved
}

materialize(
Expand Down Expand Up @@ -257,7 +245,10 @@
const modMat = this.tg.materializers[mat.data.mod as number];
const entryPoint =
this.tg.meta.artifacts[modMat.data.entryPoint as string];
const op = this.registry.get(entryPoint.hash)!;
const depMetas = (modMat.data.deps as string[]).map((dep) =>
createArtifactMeta(this.typegraphName, this.tg.meta.artifacts[dep])
);
const moduleMeta = createArtifactMeta(this.typegraphName, entryPoint);

return async ({
_: {
Expand All @@ -269,33 +260,33 @@
}) => {
const token = await InternalAuth.emit(this.typegate.cryptoKeys);

return await this.w.execute(
op,
// TODO cache??
const entryModulePath = await this.typegate.artifactStore.getLocalPath(
moduleMeta,
depMetas,
);

return await this.workerManager.callFunction(
mat.data.name as string,
entryModulePath,
entryPoint.path,
args,
{
type: "import_func",
args,
internals: {
parent,
context,
secrets,
effect: mat.effect.effect ?? null,
meta: {
url: `${url.protocol}//${url.host}/${this.typegraphName}`,
token,
},
headers,
parent,
context,
secrets,
effect: mat.effect.effect ?? null,
meta: {
url: `${url.protocol}//${url.host}/${this.typegraphName}`,
token,
},
name: mat.data.name as string,
verbose,
headers,
},
[],
pulseCount,
);
};
}

if (mat.name === "function") {
const op = this.registry.get(mat.data.script as string)!;
return async ({
_: {
context,
Expand All @@ -306,26 +297,29 @@
}) => {
const token = await InternalAuth.emit(this.typegate.cryptoKeys);

return await this.w.execute(
op,
const modulePath = await this.typegate.artifactStore.getInlineArtifact(
this.typegraphName,
mat.data.script as string,
".ts",
exportInlineFunction("inlineFunction"),
);

return await this.workerManager.callFunction(
"inlineFunction",
modulePath,
"tg",
args,
{
type: "func",
args,
internals: {
parent,
context,
secrets,
effect: mat.effect.effect ?? null,
meta: {
url: `${url.protocol}//${url.host}/${this.typegraphName}`,
token,
},
headers,
parent,
context,
secrets,
effect: mat.effect.effect ?? null,
meta: {
url: `${url.protocol}//${url.host}/${this.typegraphName}`,
token,
},
verbose,
headers,
},
[],
pulseCount,
);
};
}
Expand Down Expand Up @@ -365,6 +359,18 @@
}
}

function exportInlineFunction(name = "fn", symbol = "_my_lambda") {
if (!name.match(/^[a-zA-Z_][a-zA-Z0-9_]*$/)) {
throw new Error(`Invalid identifier: ${name}`);
}

Check warning on line 365 in src/typegate/src/runtimes/deno/deno.ts

View check run for this annotation

Codecov / codecov/patch

src/typegate/src/runtimes/deno/deno.ts#L364-L365

Added lines #L364 - L365 were not covered by tests
if (!symbol.match(/^[a-zA-Z_][a-zA-Z0-9_]*$/)) {
throw new Error(`Invalid identifier: ${symbol}`);
}

Check warning on line 368 in src/typegate/src/runtimes/deno/deno.ts

View check run for this annotation

Codecov / codecov/patch

src/typegate/src/runtimes/deno/deno.ts#L367-L368

Added lines #L367 - L368 were not covered by tests
return (code: string) => {
return `${code}\nexport const ${name} = ${symbol};`;
};
}

function getInjectionData(d: InjectionData) {
if ("value" in d) {
return d.value;
Expand Down
74 changes: 0 additions & 74 deletions src/typegate/src/runtimes/deno/deno_messenger.ts

This file was deleted.

21 changes: 21 additions & 0 deletions src/typegate/src/runtimes/deno/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0.
// SPDX-License-Identifier: MPL-2.0

import { TaskContext } from "./shared_types.ts";

export type TaskSpec = {
modulePath: string;
functionName: string;
};

export type DenoMessage = {
type: "CALL";
modulePath: string;
functionName: string;
args: unknown;
internals: TaskContext;
};
Natoandro marked this conversation as resolved.
Show resolved Hide resolved

export type DenoEvent =
| { type: "SUCCESS"; result: unknown }
| { type: "FAILURE"; error: string; exception: Error | undefined };
Loading
Loading