From 16a91dc3ac6a61eb762865f9e713051844e34ba7 Mon Sep 17 00:00:00 2001 From: Natoandro Date: Tue, 21 Jan 2025 12:30:30 +0300 Subject: [PATCH] configuration variables --- src/typegate/src/config/types.ts | 12 ++++- .../src/runtimes/deno/worker_manager.ts | 8 ++- .../runtimes/patterns/worker_manager/deno.ts | 12 ++--- .../patterns/worker_manager/pooling.ts | 51 ++++++++++--------- .../substantial/workflow_worker_manager.ts | 7 ++- 5 files changed, 57 insertions(+), 33 deletions(-) diff --git a/src/typegate/src/config/types.ts b/src/typegate/src/config/types.ts index a8102255c..b00143735 100644 --- a/src/typegate/src/config/types.ts +++ b/src/typegate/src/config/types.ts @@ -60,6 +60,16 @@ export const globalConfigSchema = z.object({ sentry_sample_rate: z.coerce.number().positive().min(0).max(1), sentry_traces_sample_rate: z.coerce.number().positive().min(0).max(1), deno_v8_flags: z.string().optional(), + min_deno_workers: z.coerce.number().positive().default(2), + max_deno_workers: z.coerce.number().positive().default(8), + deno_worker_wait_timeout_ms: z.coerce.number().positive().default(5000), + // deno_idle_worker_timeout_ms: z.coerce.number().positive().optional(), // auto remove idle workers + min_substantial_workers: z.coerce.number().positive().default(2), + max_substantial_workers: z.coerce.number().positive().default(8), + substantial_worker_wait_timeout_ms: z.coerce.number().positive().default( + 15000, + ), + // substantial_idle_worker_timeout_ms: z.coerce.number().positive().optional(), // auto remove idle workers }); export type GlobalConfig = z.infer; @@ -122,7 +132,7 @@ export type SyncConfigX = { redis: RedisConnectOptions; s3: S3ClientConfig; s3Bucket: string; - forceRemove?: boolean + forceRemove?: boolean; }; export type TypegateConfig = { diff --git a/src/typegate/src/runtimes/deno/worker_manager.ts b/src/typegate/src/runtimes/deno/worker_manager.ts index b1687a084..0b295292b 100644 --- a/src/typegate/src/runtimes/deno/worker_manager.ts +++ b/src/typegate/src/runtimes/deno/worker_manager.ts @@ -1,6 +1,7 @@ // Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. // SPDX-License-Identifier: MPL-2.0 +import { globalConfig } from "../../config.ts"; import { getLogger } from "../../log.ts"; import { DenoWorker } from "../patterns/worker_manager/deno.ts"; import { @@ -25,8 +26,11 @@ export class WorkerManager if (!WorkerManager.#pool) { WorkerManager.#pool = new WorkerPool( "deno runtime", - // TODO load from config - {}, + { + minWorkers: globalConfig.min_deno_workers, + maxWorkers: globalConfig.max_deno_workers, + waitTimeoutMs: globalConfig.deno_worker_wait_timeout_ms, + }, (id: string) => new DenoWorker(id, import.meta.resolve("./worker.ts")), ); } diff --git a/src/typegate/src/runtimes/patterns/worker_manager/deno.ts b/src/typegate/src/runtimes/patterns/worker_manager/deno.ts index 5a3efdc7d..3137e51bf 100644 --- a/src/typegate/src/runtimes/patterns/worker_manager/deno.ts +++ b/src/typegate/src/runtimes/patterns/worker_manager/deno.ts @@ -3,7 +3,7 @@ import { envSharedWithWorkers } from "../../../config/shared.ts"; import { BaseWorker } from "./mod.ts"; -import { BaseMessage, EventHandler, TaskId } from "./types.ts"; +import { BaseMessage, EventHandler } from "./types.ts"; export interface DenoWorkerError extends BaseMessage { type: "WORKER_ERROR"; @@ -15,11 +15,11 @@ export type BaseDenoWorkerMessage = BaseMessage | DenoWorkerError; export class DenoWorker extends BaseWorker { #worker: Worker; - #taskId: TaskId; - constructor(taskId: TaskId, workerPath: string) { + #workerId: string; + constructor(workerId: string, workerPath: string) { super(); this.#worker = new Worker(workerPath, { - name: taskId, + name: workerId, type: "module", deno: { permissions: { @@ -35,7 +35,7 @@ export class DenoWorker }, }, }); - this.#taskId = taskId; + this.#workerId = workerId; } listen(handlerFn: EventHandler) { @@ -62,6 +62,6 @@ export class DenoWorker } get id() { - return this.#taskId; + return this.#workerId; } } diff --git a/src/typegate/src/runtimes/patterns/worker_manager/pooling.ts b/src/typegate/src/runtimes/patterns/worker_manager/pooling.ts index 60376b052..11d04b8af 100644 --- a/src/typegate/src/runtimes/patterns/worker_manager/pooling.ts +++ b/src/typegate/src/runtimes/patterns/worker_manager/pooling.ts @@ -8,9 +8,12 @@ import { getLogger } from "../../../log.ts"; const logger = getLogger(import.meta, "WARN"); export type PoolConfig = { - maxWorkers?: number | null; - minWorkers?: number | null; - waitTimeoutMs?: number | null; + // non-negative integer; 0 means no limit + maxWorkers: number; + // non-negative integer; must be less than or equal to `maxWorkers` if maxWorkers is not 0 + minWorkers: number; + // non-negative integer; 0 means no timeout + waitTimeoutMs: number; }; export type Consumer = (x: T) => void; @@ -87,10 +90,7 @@ export class WaitQueueWithTimeout implements WaitQueue { this.#updateTimer(); return; } - this.#timerId = setTimeout( - this.#timeoutHandler.bind(this), - timeoutMs, - ); + this.#timerId = setTimeout(this.#timeoutHandler.bind(this), timeoutMs); } else { this.#timerId = null; } @@ -124,19 +124,22 @@ export class WorkerPool< #workerFactory: () => W; #nextWorkerId = 1; - constructor( - name: string, - config: PoolConfig, - factory: (id: string) => W, - ) { + constructor(name: string, config: PoolConfig, factory: (id: string) => W) { + if (config.maxWorkers != 0 && config.minWorkers > config.maxWorkers) { + throw new Error( + "Worker pool configuration error: maxWorkers must be greater than or equal to minWorkers or be 0", + ); + } + this.#config = config; this.#workerFactory = () => factory(`${name} worker #${this.#nextWorkerId++}`); - if (config.waitTimeoutMs == null) { // no timeout + if (config.waitTimeoutMs === 0) { + // no timeout this.#waitQueue = createSimpleWaitQueue(); } else { - this.#waitQueue = new WaitQueueWithTimeout(config.waitTimeoutMs ?? 30000); + this.#waitQueue = new WaitQueueWithTimeout(config.waitTimeoutMs); } } @@ -151,7 +154,7 @@ export class WorkerPool< return Promise.resolve(this.#lendWorkerTo(idleWorker, manager)); } if ( - this.#config.maxWorkers == null || + this.#config.maxWorkers === 0 || this.#busyWorkers.size < this.#config.maxWorkers ) { return Promise.resolve( @@ -172,17 +175,16 @@ export class WorkerPool< } // ensureMinWorkers will be false when we are shutting down. - unborrowWorker( - worker: W, - ) { + unborrowWorker(worker: W) { this.#busyWorkers.delete(worker.id); const taskAdded = this.#waitQueue.shift(() => worker); - if (!taskAdded) { // worker has not been reassigned + if (!taskAdded) { + // worker has not been reassigned const { maxWorkers } = this.#config; // how?? xD // We might add "urgent" tasks in the future; // in this case the worker count might exceed `maxWorkers`. - if (maxWorkers != null && this.#workerCount >= maxWorkers) { + if (maxWorkers !== 0 && this.#workerCount >= maxWorkers) { worker.destroy(); } else { this.#idleWorkers.push(worker); @@ -196,9 +198,10 @@ export class WorkerPool< worker.destroy(); if (!shutdown) { const taskAdded = this.#waitQueue.shift(() => this.#workerFactory()); - if (!taskAdded) { // queue was empty: worker not reassigned + if (!taskAdded) { + // queue was empty: worker not reassigned const { minWorkers } = this.#config; - if (minWorkers != null && this.#workerCount < minWorkers) { + if (this.#workerCount < minWorkers) { this.#idleWorkers.push(this.#workerFactory()); } } @@ -212,7 +215,9 @@ export class WorkerPool< clear() { logger.warn( `destroying idle workers: ${ - this.#idleWorkers.map((w) => `"${w.id}"`).join(", ") + this.#idleWorkers + .map((w) => `"${w.id}"`) + .join(", ") }`, ); for (const worker of this.#idleWorkers) { diff --git a/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts b/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts index c4256f2a0..e68b28d65 100644 --- a/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts +++ b/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts @@ -1,6 +1,7 @@ // Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. // SPDX-License-Identifier: MPL-2.0 +import { globalConfig } from "../../config.ts"; import { getLogger } from "../../log.ts"; import { TaskContext } from "../deno/shared_types.ts"; import { DenoWorker } from "../patterns/worker_manager/deno.ts"; @@ -32,7 +33,11 @@ export class WorkerManager WorkerManager.#pool = new WorkerPool( "substantial workflows", // TODO load from config - {}, + { + minWorkers: globalConfig.min_substantial_workers, + maxWorkers: globalConfig.max_substantial_workers, + waitTimeoutMs: globalConfig.substantial_worker_wait_timeout_ms, + }, (id: string) => new DenoWorker(id, import.meta.resolve("./worker.ts")), ); }