Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Unify worker manager #954

Merged
merged 20 commits into from
Jan 11, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 50 additions & 44 deletions src/typegate/src/runtimes/substantial/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,9 @@
} from "../../../engine/runtime.js";
import { getLoggerByAddress, Logger } from "../../log.ts";
import { TaskContext } from "../deno/shared_types.ts";
import {
appendIfOngoing,
Interrupt,
Result,
WorkerData,
WorkflowResult,
} from "./types.ts";
import { RunId, WorkerManager } from "./workflow_worker_manager.ts";
import { TaskId } from "../utils/worker_manager.ts";
import { appendIfOngoing, Interrupt, Result, WorkflowResult } from "./types.ts";
import { WorkerManager } from "./workflow_worker_manager.ts";

export interface StdKwargs {
taskContext: TaskContext;
Expand Down Expand Up @@ -45,7 +40,7 @@
constructor(
private backend: Backend,
private queue: string,
private config: AgentConfig
private config: AgentConfig,
) {
this.logger = getLoggerByAddress(import.meta, "substantial");
}
Expand All @@ -64,7 +59,7 @@
});
} catch (err) {
this.logger.warn(
`Failed writing log metadata for schedule "${schedule}" (${runId}), skipping it: ${err}`
`Failed writing log metadata for schedule "${schedule}" (${runId}), skipping it: ${err}`,

Check warning on line 62 in src/typegate/src/runtimes/substantial/agent.ts

View check run for this annotation

Codecov / codecov/patch

src/typegate/src/runtimes/substantial/agent.ts#L62

Added line #L62 was not covered by tests
);
}
}
Expand Down Expand Up @@ -97,9 +92,11 @@
this.workflows = workflows;

this.logger.warn(
`Initializing agent to handle ${workflows
.map(({ name }) => name)
.join(", ")}`
`Initializing agent to handle ${
workflows
.map(({ name }) => name)
.join(", ")
}`,
);

this.pollIntervalHandle = setInterval(async () => {
Expand Down Expand Up @@ -138,7 +135,7 @@

for (const workflow of this.workflows) {
const requests = replayRequests.filter(
({ run_id }) => Agent.parseWorkflowName(run_id) == workflow.name
({ run_id }) => Agent.parseWorkflowName(run_id) == workflow.name,
);

while (requests.length > 0) {
Expand All @@ -149,7 +146,7 @@
await this.#replay(next, workflow);
} catch (err) {
this.logger.error(
`Replay failed for ${workflow.name} => ${JSON.stringify(next)}`
`Replay failed for ${workflow.name} => ${JSON.stringify(next)}`,

Check warning on line 149 in src/typegate/src/runtimes/substantial/agent.ts

View check run for this annotation

Codecov / codecov/patch

src/typegate/src/runtimes/substantial/agent.ts#L149

Added line #L149 was not covered by tests
);
this.logger.error(err);
} finally {
Expand Down Expand Up @@ -195,7 +192,7 @@
// necessarily represent the state of what is actually running on the current typegate node
if (this.workerManager.isOngoing(next.run_id)) {
this.logger.warn(
`skip triggering ${next.run_id} for the current tick as it is still ongoing`
`skip triggering ${next.run_id} for the current tick as it is still ongoing`,
);

return;
Expand All @@ -222,7 +219,7 @@
// This may occur if an event is sent but the underlying run has already completed
// or does not exist.
this.logger.warn(
`Run ${next.run_id} has already stopped, closing schedule`
`Run ${next.run_id} has already stopped, closing schedule`,
);
await Meta.substantial.storeCloseSchedule(schedDef);
return;
Expand All @@ -242,9 +239,11 @@
// As a consequence of the above, a workflow is always triggered by gql { start(..) }.
// This can also occur if an event is sent from gql under a runId that is not valid (e.g. due to a typo).
this.logger.warn(
`First item in the operation list is not a Start, got "${JSON.stringify(
first
)}" instead. Closing the underlying schedule.`
`First item in the operation list is not a Start, got "${
JSON.stringify(
first,

Check warning on line 244 in src/typegate/src/runtimes/substantial/agent.ts

View check run for this annotation

Codecov / codecov/patch

src/typegate/src/runtimes/substantial/agent.ts#L242-L244

Added lines #L242 - L244 were not covered by tests
)
}" instead. Closing the underlying schedule.`,

Check warning on line 246 in src/typegate/src/runtimes/substantial/agent.ts

View check run for this annotation

Codecov / codecov/patch

src/typegate/src/runtimes/substantial/agent.ts#L246

Added line #L246 was not covered by tests
);

await Meta.substantial.storeCloseSchedule(schedDef);
Expand All @@ -259,12 +258,12 @@
workflow.path,
run,
next.schedule_date,
taskContext
taskContext,
);

this.workerManager.listen(
next.run_id,
this.#eventResultHandlerFor(workflow.name, next.run_id)
this.#eventResultHandlerFor(workflow.name, next.run_id),
);
} catch (err) {
throw err;
Expand All @@ -287,14 +286,15 @@
// All non-user Worker/Runner issues should fall here.
// Note: this should never throw (typegate will panic); this will run in a worker.
this.logger.error(
`result error for "${runId}": ${JSON.stringify(result.payload)}`
`result error for "${runId}": ${JSON.stringify(result.payload)}`,

Check warning on line 289 in src/typegate/src/runtimes/substantial/agent.ts

View check run for this annotation

Codecov / codecov/patch

src/typegate/src/runtimes/substantial/agent.ts#L289

Added line #L289 was not covered by tests
);
return;
}

const answer = result.payload as WorkerData;
// TODO generic event type on BaseWorker
const answer = result.payload as { type: string; data: unknown };
this.logger.info(
`"${runId}" answered: type ${JSON.stringify(answer.type)}`
`"${runId}" answered: type ${JSON.stringify(answer.type)}`,
);

const startedAt = this.workerManager.getInitialTimeStartedAt(runId);
Expand All @@ -317,7 +317,7 @@
startedAt,
workflowName,
runId,
ret
ret,
);
break;
}
Expand All @@ -328,9 +328,9 @@
}
default:
this.logger.error(
`Fatal: invalid type ${
answer.type
} sent by "${runId}": ${JSON.stringify(answer.data)}`
`Fatal: invalid type ${answer.type} sent by "${runId}": ${
JSON.stringify(answer.data)
}`,

Check warning on line 333 in src/typegate/src/runtimes/substantial/agent.ts

View check run for this annotation

Codecov / codecov/patch

src/typegate/src/runtimes/substantial/agent.ts#L331-L333

Added lines #L331 - L333 were not covered by tests
);
}
};
Expand All @@ -339,7 +339,7 @@
async #workflowHandleInterrupts(
workflowName: string,
runId: string,
{ result, schedule, run }: WorkflowResult
{ result, schedule, run }: WorkflowResult,
) {
this.workerManager.destroyWorker(workflowName, runId); // !

Expand Down Expand Up @@ -388,14 +388,16 @@
startedAt: Date,
workflowName: string,
runId: string,
{ result, kind, schedule, run }: WorkflowResult
{ result, kind, schedule, run }: WorkflowResult,
) {
this.workerManager.destroyWorker(workflowName, runId);

this.logger.info(
`gracefull completion of "${runId}" (${kind}): ${JSON.stringify(
result
)} started at "${startedAt}"`
`gracefull completion of "${runId}" (${kind}): ${
JSON.stringify(
result,
)
} started at "${startedAt}"`,
);

this.logger.info(`Append Stop ${runId}`);
Expand All @@ -414,7 +416,7 @@
});

this.logger.info(
`Persist finalized records for "${workflowName}": ${result}" and closing everything..`
`Persist finalized records for "${workflowName}": ${result}" and closing everything..`,
);

const _run = await Meta.substantial.storePersistRun({
Expand All @@ -438,7 +440,7 @@
});
}

static nextId(name: string): RunId {
static nextId(name: string): TaskId {
const uuid = crypto.randomUUID();
return `${name}_::_${uuid}`;
}
Expand All @@ -464,13 +466,15 @@
if (op.event.type == "Start") {
if (life >= 1) {
logger.error(
`bad logs: ${JSON.stringify(
run.operations.map(({ event }) => event.type)
)}`
`bad logs: ${
JSON.stringify(
run.operations.map(({ event }) => event.type),

Check warning on line 471 in src/typegate/src/runtimes/substantial/agent.ts

View check run for this annotation

Codecov / codecov/patch

src/typegate/src/runtimes/substantial/agent.ts#L469-L471

Added lines #L469 - L471 were not covered by tests
)
}`,

Check warning on line 473 in src/typegate/src/runtimes/substantial/agent.ts

View check run for this annotation

Codecov / codecov/patch

src/typegate/src/runtimes/substantial/agent.ts#L473

Added line #L473 was not covered by tests
);

throw new Error(
`"${run.run_id}" has potentially corrupted logs, another run occured yet previous has not stopped`
`"${run.run_id}" has potentially corrupted logs, another run occured yet previous has not stopped`,

Check warning on line 477 in src/typegate/src/runtimes/substantial/agent.ts

View check run for this annotation

Codecov / codecov/patch

src/typegate/src/runtimes/substantial/agent.ts#L477

Added line #L477 was not covered by tests
);
}

Expand All @@ -479,13 +483,15 @@
} else if (op.event.type == "Stop") {
if (life <= 0) {
logger.error(
`bad logs: ${JSON.stringify(
run.operations.map(({ event }) => event.type)
)}`
`bad logs: ${
JSON.stringify(
run.operations.map(({ event }) => event.type),

Check warning on line 488 in src/typegate/src/runtimes/substantial/agent.ts

View check run for this annotation

Codecov / codecov/patch

src/typegate/src/runtimes/substantial/agent.ts#L486-L488

Added lines #L486 - L488 were not covered by tests
)
}`,

Check warning on line 490 in src/typegate/src/runtimes/substantial/agent.ts

View check run for this annotation

Codecov / codecov/patch

src/typegate/src/runtimes/substantial/agent.ts#L490

Added line #L490 was not covered by tests
);

throw new Error(
`"${run.run_id}" has potentitally corrupted logs, attempted stopping already closed run, or run with a missing Start`
`"${run.run_id}" has potentitally corrupted logs, attempted stopping already closed run, or run with a missing Start`,

Check warning on line 494 in src/typegate/src/runtimes/substantial/agent.ts

View check run for this annotation

Codecov / codecov/patch

src/typegate/src/runtimes/substantial/agent.ts#L494

Added line #L494 was not covered by tests
);
}

Expand Down
21 changes: 10 additions & 11 deletions src/typegate/src/runtimes/substantial/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,23 @@
// SPDX-License-Identifier: MPL-2.0

import { Operation, Run } from "../../../engine/runtime.js";
import { TaskContext } from "../deno/shared_types.ts";
export type {
Backend,
Operation,
OperationEvent,
Run,
} from "../../../engine/runtime.js";

export type AnyString = string & Record<string | number | symbol, never>;

export type WorkerEvent = "START" | AnyString;

export type WorkerData = {
type: WorkerEvent;
data: any;
export type WorkflowMessage = {
type: "START";
data: {
modulePath: string;
Natoandro marked this conversation as resolved.
Show resolved Hide resolved
functionName: string;
run: Run;
schedule: string;
internal: TaskContext;
};
};

export type WorkerEventHandler = (message: Result<unknown>) => Promise<void>;
Expand All @@ -33,10 +36,6 @@ export function Err<E>(payload: E): Result<E> {
return { error: true, payload };
}

export function Msg(type: WorkerEvent, data: unknown): WorkerData {
return { type, data };
}

export type ExecutionResultKind = "SUCCESS" | "FAIL";

export type WorkflowResult = {
Expand Down
18 changes: 9 additions & 9 deletions src/typegate/src/runtimes/substantial/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
import { errorToString } from "../../worker_utils.ts";
import { Context } from "./deno_context.ts";
import { toFileUrl } from "@std/path/to-file-url";
import { Err, Msg, Ok, WorkerData, WorkflowResult } from "./types.ts";
import { Err, Ok, WorkflowMessage, WorkflowResult } from "./types.ts";

let runCtx: Context | undefined;

self.onmessage = async function (event) {
const { type, data } = event.data as WorkerData;
const { type, data } = event.data as WorkflowMessage;
switch (type) {
case "START": {
const { modulePath, functionName, run, schedule, internal } = data;
Expand All @@ -32,24 +32,24 @@ self.onmessage = async function (event) {
.then((wfResult: unknown) => {
self.postMessage(
Ok(
Msg(
{
type,
{
data: {
kind: "SUCCESS",
result: wfResult,
run: runCtx!.getRun(),
schedule,
} satisfies WorkflowResult,
),
},
),
);
})
.catch((wfException: unknown) => {
self.postMessage(
Ok(
Msg(
{
type,
{
data: {
kind: "FAIL",
result: errorToString(wfException),
exception: wfException instanceof Error
Expand All @@ -58,13 +58,13 @@ self.onmessage = async function (event) {
run: runCtx!.getRun(),
schedule,
} satisfies WorkflowResult,
),
},
),
);
});
break;
}
default:
self.postMessage(Err(Msg(type, `Unknown command ${type}`)));
self.postMessage(Err({ type, data: `Unknown command ${type}` }));
}
};
Loading
Loading