Skip to content

Commit

Permalink
configuration variables
Browse files Browse the repository at this point in the history
  • Loading branch information
Natoandro committed Jan 21, 2025
1 parent 348c359 commit 16a91dc
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 33 deletions.
12 changes: 11 additions & 1 deletion src/typegate/src/config/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ export const globalConfigSchema = z.object({
sentry_sample_rate: z.coerce.number().positive().min(0).max(1),
sentry_traces_sample_rate: z.coerce.number().positive().min(0).max(1),
deno_v8_flags: z.string().optional(),
min_deno_workers: z.coerce.number().positive().default(2),
max_deno_workers: z.coerce.number().positive().default(8),
deno_worker_wait_timeout_ms: z.coerce.number().positive().default(5000),
// deno_idle_worker_timeout_ms: z.coerce.number().positive().optional(), // auto remove idle workers
min_substantial_workers: z.coerce.number().positive().default(2),
max_substantial_workers: z.coerce.number().positive().default(8),
substantial_worker_wait_timeout_ms: z.coerce.number().positive().default(
15000,
),
// substantial_idle_worker_timeout_ms: z.coerce.number().positive().optional(), // auto remove idle workers
});
export type GlobalConfig = z.infer<typeof globalConfigSchema>;

Expand Down Expand Up @@ -122,7 +132,7 @@ export type SyncConfigX = {
redis: RedisConnectOptions;
s3: S3ClientConfig;
s3Bucket: string;
forceRemove?: boolean
forceRemove?: boolean;
};

export type TypegateConfig = {
Expand Down
8 changes: 6 additions & 2 deletions src/typegate/src/runtimes/deno/worker_manager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0.
// SPDX-License-Identifier: MPL-2.0

import { globalConfig } from "../../config.ts";
import { getLogger } from "../../log.ts";
import { DenoWorker } from "../patterns/worker_manager/deno.ts";
import {
Expand All @@ -25,8 +26,11 @@ export class WorkerManager
if (!WorkerManager.#pool) {
WorkerManager.#pool = new WorkerPool(
"deno runtime",
// TODO load from config
{},
{
minWorkers: globalConfig.min_deno_workers,
maxWorkers: globalConfig.max_deno_workers,
waitTimeoutMs: globalConfig.deno_worker_wait_timeout_ms,
},
(id: string) => new DenoWorker(id, import.meta.resolve("./worker.ts")),
);
}
Expand Down
12 changes: 6 additions & 6 deletions src/typegate/src/runtimes/patterns/worker_manager/deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import { envSharedWithWorkers } from "../../../config/shared.ts";
import { BaseWorker } from "./mod.ts";
import { BaseMessage, EventHandler, TaskId } from "./types.ts";
import { BaseMessage, EventHandler } from "./types.ts";

export interface DenoWorkerError extends BaseMessage {
type: "WORKER_ERROR";
Expand All @@ -15,11 +15,11 @@ export type BaseDenoWorkerMessage = BaseMessage | DenoWorkerError;
export class DenoWorker<M extends BaseMessage, E extends BaseDenoWorkerMessage>
extends BaseWorker<M, E> {
#worker: Worker;
#taskId: TaskId;
constructor(taskId: TaskId, workerPath: string) {
#workerId: string;
constructor(workerId: string, workerPath: string) {
super();
this.#worker = new Worker(workerPath, {
name: taskId,
name: workerId,
type: "module",
deno: {
permissions: {
Expand All @@ -35,7 +35,7 @@ export class DenoWorker<M extends BaseMessage, E extends BaseDenoWorkerMessage>
},
},
});
this.#taskId = taskId;
this.#workerId = workerId;
}

listen(handlerFn: EventHandler<E>) {
Expand All @@ -62,6 +62,6 @@ export class DenoWorker<M extends BaseMessage, E extends BaseDenoWorkerMessage>
}

get id() {
return this.#taskId;
return this.#workerId;
}
}
51 changes: 28 additions & 23 deletions src/typegate/src/runtimes/patterns/worker_manager/pooling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ import { getLogger } from "../../../log.ts";
const logger = getLogger(import.meta, "WARN");

export type PoolConfig = {
maxWorkers?: number | null;
minWorkers?: number | null;
waitTimeoutMs?: number | null;
// non-negative integer; 0 means no limit
maxWorkers: number;
// non-negative integer; must be less than or equal to `maxWorkers` if maxWorkers is not 0
minWorkers: number;
// non-negative integer; 0 means no timeout
waitTimeoutMs: number;
};

export type Consumer<T> = (x: T) => void;
Expand Down Expand Up @@ -87,10 +90,7 @@ export class WaitQueueWithTimeout<W> implements WaitQueue<W> {
this.#updateTimer();
return;
}
this.#timerId = setTimeout(
this.#timeoutHandler.bind(this),
timeoutMs,
);
this.#timerId = setTimeout(this.#timeoutHandler.bind(this), timeoutMs);
} else {
this.#timerId = null;
}
Expand Down Expand Up @@ -124,19 +124,22 @@ export class WorkerPool<
#workerFactory: () => W;
#nextWorkerId = 1;

constructor(
name: string,
config: PoolConfig,
factory: (id: string) => W,
) {
constructor(name: string, config: PoolConfig, factory: (id: string) => W) {
if (config.maxWorkers != 0 && config.minWorkers > config.maxWorkers) {
throw new Error(
"Worker pool configuration error: maxWorkers must be greater than or equal to minWorkers or be 0",
);
}

this.#config = config;
this.#workerFactory = () =>
factory(`${name} worker #${this.#nextWorkerId++}`);

if (config.waitTimeoutMs == null) { // no timeout
if (config.waitTimeoutMs === 0) {
// no timeout
this.#waitQueue = createSimpleWaitQueue();
} else {
this.#waitQueue = new WaitQueueWithTimeout(config.waitTimeoutMs ?? 30000);
this.#waitQueue = new WaitQueueWithTimeout(config.waitTimeoutMs);
}
}

Expand All @@ -151,7 +154,7 @@ export class WorkerPool<
return Promise.resolve(this.#lendWorkerTo(idleWorker, manager));
}
if (
this.#config.maxWorkers == null ||
this.#config.maxWorkers === 0 ||
this.#busyWorkers.size < this.#config.maxWorkers
) {
return Promise.resolve(
Expand All @@ -172,17 +175,16 @@ export class WorkerPool<
}

// ensureMinWorkers will be false when we are shutting down.
unborrowWorker(
worker: W,
) {
unborrowWorker(worker: W) {
this.#busyWorkers.delete(worker.id);
const taskAdded = this.#waitQueue.shift(() => worker);
if (!taskAdded) { // worker has not been reassigned
if (!taskAdded) {
// worker has not been reassigned
const { maxWorkers } = this.#config;
// how?? xD
// We might add "urgent" tasks in the future;
// in this case the worker count might exceed `maxWorkers`.
if (maxWorkers != null && this.#workerCount >= maxWorkers) {
if (maxWorkers !== 0 && this.#workerCount >= maxWorkers) {
worker.destroy();
} else {
this.#idleWorkers.push(worker);
Expand All @@ -196,9 +198,10 @@ export class WorkerPool<
worker.destroy();
if (!shutdown) {
const taskAdded = this.#waitQueue.shift(() => this.#workerFactory());
if (!taskAdded) { // queue was empty: worker not reassigned
if (!taskAdded) {
// queue was empty: worker not reassigned
const { minWorkers } = this.#config;
if (minWorkers != null && this.#workerCount < minWorkers) {
if (this.#workerCount < minWorkers) {
this.#idleWorkers.push(this.#workerFactory());
}
}
Expand All @@ -212,7 +215,9 @@ export class WorkerPool<
clear() {
logger.warn(
`destroying idle workers: ${
this.#idleWorkers.map((w) => `"${w.id}"`).join(", ")
this.#idleWorkers
.map((w) => `"${w.id}"`)
.join(", ")
}`,
);
for (const worker of this.#idleWorkers) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0.
// SPDX-License-Identifier: MPL-2.0

import { globalConfig } from "../../config.ts";
import { getLogger } from "../../log.ts";
import { TaskContext } from "../deno/shared_types.ts";
import { DenoWorker } from "../patterns/worker_manager/deno.ts";
Expand Down Expand Up @@ -32,7 +33,11 @@ export class WorkerManager
WorkerManager.#pool = new WorkerPool(
"substantial workflows",
// TODO load from config
{},
{
minWorkers: globalConfig.min_substantial_workers,
maxWorkers: globalConfig.max_substantial_workers,
waitTimeoutMs: globalConfig.substantial_worker_wait_timeout_ms,
},
(id: string) => new DenoWorker(id, import.meta.resolve("./worker.ts")),
);
}
Expand Down

0 comments on commit 16a91dc

Please sign in to comment.