From 4b32c4cad22138c865935db118e0dee894be7105 Mon Sep 17 00:00:00 2001 From: Snazzah Date: Sat, 10 Feb 2024 22:22:31 -0600 Subject: [PATCH] feat(bot): shard concurrency --- apps/bot/package.json | 2 ++ apps/bot/src/sharding/index.ts | 2 +- apps/bot/src/sharding/manager.ts | 31 +++++++++++++++++++++++++++++++ yarn.lock | 10 ++++++++++ 4 files changed, 44 insertions(+), 1 deletion(-) diff --git a/apps/bot/package.json b/apps/bot/package.json index 2090e115..3536d817 100644 --- a/apps/bot/package.json +++ b/apps/bot/package.json @@ -42,6 +42,8 @@ "i18next": "^21.10.0", "i18next-fs-backend": "^1.1.4", "ioredis": "^5.0.6", + "just-range": "^4.2.0", + "just-split": "^3.2.0", "nanoid": "^3.3.4", "node-fetch": "2.6.7", "slash-create": "^5.10.0", diff --git a/apps/bot/src/sharding/index.ts b/apps/bot/src/sharding/index.ts index 432c28c5..f9b2b80e 100644 --- a/apps/bot/src/sharding/index.ts +++ b/apps/bot/src/sharding/index.ts @@ -17,7 +17,7 @@ process.on('unhandledRejection', (r) => logger.error('Unhandled exception:', r)) (async () => { logger.info('Starting to spawn...'); - await manager.spawnAll(); + await manager.spawnAllWithConcurrency(); logger.info( `Spawned ${manager.shards.size} shards in ${Array.from(manager.shards.values()) .map((shard) => shard.guildCount) diff --git a/apps/bot/src/sharding/manager.ts b/apps/bot/src/sharding/manager.ts index ba3dcba4..f8bbf245 100644 --- a/apps/bot/src/sharding/manager.ts +++ b/apps/bot/src/sharding/manager.ts @@ -1,4 +1,6 @@ import EventEmitter from 'eventemitter3'; +import range from 'just-range'; +import split from 'just-split'; import { wait } from '../util'; import * as logger from './logger'; @@ -9,6 +11,7 @@ import { ManagerRequestMessage } from './types'; export interface ManagerOptions { file: string; shardCount: number; + concurrency?: number; readyTimeout?: number; respawn?: boolean; args?: string[]; @@ -29,6 +32,7 @@ export default class ShardManager extends EventEmitter { { readyTimeout: options.readyTimeout ?? 30000, respawn: options.respawn ?? true, + concurrency: options.concurrency ?? 1, args: options.args ?? [], execArgv: options.execArgv ?? [] }, @@ -74,6 +78,33 @@ export default class ShardManager extends EventEmitter { } } + async spawnAllWithConcurrency(concurrency = this.options.concurrency, delay = 500) { + const spawnShard = async (id: number) => { + let retries = 0; + while (retries < 5) { + logger.info(`Spawning shard ${id}... (attempt ${retries + 1})`); + try { + retries++; + if (this.shards.has(id)) { + const shard = this.shards.get(id)!; + await shard.respawn(0); + } else await this.spawn(id); + break; + } catch (e) { + logger.error(`Failed to spawn shard ${id}`, e); + } + await wait(delay); + } + }; + + const batches = split(range(this.options.shardCount), concurrency); + for (const batchNum in batches) { + const batch = batches[batchNum]; + logger.info(`Spawning shards ${batch[0]}-${[...batch].reverse()[0]} in a batch (#${batchNum})`); + await Promise.all(batch.map(spawnShard)); + } + } + async spawnAll(delay = 500) { while (this.shards.size < this.options.shardCount) { const currentId = this.shards.size; diff --git a/yarn.lock b/yarn.lock index d2924c22..0045b0a6 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4305,6 +4305,16 @@ jsonwebtoken@^8.5.1: array-includes "^3.1.3" object.assign "^4.1.2" +just-range@^4.2.0: + version "4.2.0" + resolved "https://registry.yarnpkg.com/just-range/-/just-range-4.2.0.tgz#34d901f76b2d0ce3a0d2c98cb691b06303945122" + integrity sha512-z2Ip8H2j7a9Vr8rKFi0IZf4IXn2Yuq2lGZFS41GjfPwksoHWTaTx1xIjU6C0HyQ1jBXEoz/FzjS5eyVXeW19tg== + +just-split@^3.2.0: + version "3.2.0" + resolved "https://registry.yarnpkg.com/just-split/-/just-split-3.2.0.tgz#14552b426e4f4c48bfbd7a716d97844ff73ea774" + integrity sha512-hh57dN5koTBkmg3T6gBFISVVaW5bgZ6Ct1W5KODD5M7hQJKqGzTKkfMwOil8MBxyztLQEjh/v6UGXE8cP5tnqQ== + jwa@^1.4.1: version "1.4.1" resolved "https://registry.yarnpkg.com/jwa/-/jwa-1.4.1.tgz#743c32985cb9e98655530d53641b66c8645b039a"