From 16d993a7b68fd6cf402dba5d05d7182b8711c94c Mon Sep 17 00:00:00 2001 From: Mark Yen Date: Thu, 12 Sep 2024 16:55:29 -0700 Subject: [PATCH] postinstall: download: retry if download is stuck This reimplements fetchWithRetry to manually watch the progress of the download, and if there has been no progress in five seconds, abort the download and retry. This will hopefully resolve issues in CI where the whole download times out after ten minutes because the downloads get stuck. Signed-off-by: Mark Yen --- .github/actions/spelling/expect.txt | 1 + scripts/lib/download.ts | 145 +++++++++++++++++++++++----- 2 files changed, 124 insertions(+), 22 deletions(-) diff --git a/.github/actions/spelling/expect.txt b/.github/actions/spelling/expect.txt index 3a0d8db1705..2c50191823d 100644 --- a/.github/actions/spelling/expect.txt +++ b/.github/actions/spelling/expect.txt @@ -820,6 +820,7 @@ udhcpc UEFI Unauthed UNCONFIGURED +Undici unexpose unexposing unfetch diff --git a/scripts/lib/download.ts b/scripts/lib/download.ts index e717447b009..3a0eb1c4dd9 100644 --- a/scripts/lib/download.ts +++ b/scripts/lib/download.ts @@ -8,6 +8,7 @@ import fs from 'fs'; import os from 'os'; import path from 'path'; import stream from 'stream'; +import timers from 'timers/promises'; import { simpleSpawn } from 'scripts/simple_process'; @@ -29,17 +30,123 @@ export type ArchiveDownloadOptions = DownloadOptions & { entryName?: string; }; -async function fetchWithRetry(url: string) { +/** + * GrowingWritable is an implementation of stream.Writable that just buffers + * everything in memory. + */ +class GrowingWritable extends stream.Writable { + protected name: string; + protected buffer = Buffer.alloc(0); + constructor(name: string) { + super(); + this.name = name; + } + + _writev(chunks: Array<{ chunk: Buffer; encoding: BufferEncoding | 'buffer'; }>, callback: (error?: Error | null) => void): void { + // Check that all chunks have 'buffer' encoding. + const unexpectedEncoding = chunks.map(({ encoding }) => encoding).find(e => e !== 'buffer'); + + if (unexpectedEncoding) { + console.log(`${ this.name }: failed to buffer to memory: ${ unexpectedEncoding }`); + callback(new Error(`Only buffer chunks are accepted, not string with encoding ${ unexpectedEncoding }`)); + + return; + } + // Copy the buffer to avoid it being lost. + try { + this.buffer = Buffer.concat([this.buffer, ...chunks.map(({ chunk }) => chunk)]); + } catch (ex: any) { + callback(ex); + + return; + } + callback(null); + } + + get text() { + return this.buffer.toString('utf-8'); + } +} + +async function fetchWithRetry(url: string): Promise; +async function fetchWithRetry(url: string, writable: fs.WriteStream): Promise; +async function fetchWithRetry(url: string, writable?: fs.WriteStream): Promise { while (true) { try { - return await fetch(url, { redirect: 'follow' }); + const response = await fetch(url, { redirect: 'follow' }); + + if (!response.ok) { + if ([429, 500, 502, 503, 504].includes(response.status)) { + // For these responses, retry the download. + await timers.setTimeout(1_000); + continue; + } + throw new Error(`Error downloading ${ url }: ${ response.statusText }`); + } + if (!response.body) { + throw new Error(`Error downloading ${ url }: did not receive response body`); + } + const outStream = writable || new GrowingWritable(url); + const streamFinished = stream.promises.finished(outStream); + const progressTimeout = 5_000; // body timeout, in milliseconds. + let abortSignal = AbortSignal.timeout(progressTimeout); + const abortedError = new Error(`Timed out reading body`, { cause: { code: 'EAI_AGAIN' } }); + const reader = response.body.getReader(); + + while (!abortSignal.aborted) { + const abortPromise = new Promise>>((resolve, reject) => { + abortSignal.onabort = () => reject(abortedError); + }); + const { value, done } = await Promise.race([reader.read(), abortPromise]); + + // Reset the abort signal on progress; we set up `onabort` on next iteration. + abortSignal.onabort = null; + abortSignal = AbortSignal.timeout(progressTimeout); + if (done) { + await new Promise(resolve => outStream.end(resolve)); + break; + } + await new Promise((resolve) => { + if (outStream.write(value)) { + resolve(); + } else { + outStream.once('drain', resolve); + } + }); + } + if (abortSignal.aborted) { + // This can happen if we timed out waiting on `outStream.write()` etc. + throw abortedError; + } + await streamFinished; + if (!writable) { + return (outStream as GrowingWritable).text; + } + + return; } catch (ex: any) { - if (ex && ex.errno === 'EAI_AGAIN') { - console.log(`Recoverable error downloading ${ url }, retrying...`); + const hasError = (ex: any, ...codes: string[]) => { + while (ex) { + if (codes.includes(ex.errno) || codes.includes(ex.code)) { + return true; + } + ex = ex.cause; + } + + return false; + }; + const errorCodes = [ + 'EAI_AGAIN', 'ECONNRESET', 'ECONNREFUSED', 'EHOSTDOWN', + 'ENETDOWN', 'ENETUNREACH', 'ENOTFOUND']; + const UndiciPrefix = 'UND_ERR_'; // spellcheck-ignore-line + + errorCodes.push(...['BODY_TIMEOUT', 'CONNECT_TIMEOUT', 'REQ_RETRY', 'SOCKET'].map(e => UndiciPrefix + e )); + if (hasError(ex, ...errorCodes)) { + console.log(`Recoverable error ${ ex } downloading ${ url }, retrying...`); continue; } console.dir(ex); - throw ex; + throw new Error(`Error downloading ${ url }`, { cause: ex }); } } } @@ -68,22 +175,22 @@ export async function download(url: string, destPath: string, options: DownloadO } } } - console.log(`Downloading ${ url } to ${ destPath }...`); + const destPathDisplay = [ + path.dirname(destPath), + path.sep, + '\x1B[0;1;33;40m', + path.basename(destPath), + '\x1B[0m', + ].join(''); + + console.log(`Downloading ${ url } to ${ destPathDisplay }`); await fs.promises.mkdir(path.dirname(destPath), { recursive: true }); - const response = await fetchWithRetry(url); - - if (!response.ok) { - throw new Error(`Error downloading ${ url }: ${ response.statusText }`); - } - if (!response.body) { - throw new Error(`Error downloading ${ url }: did not receive response body`); - } const tempPath = `${ destPath }.download`; try { const file = fs.createWriteStream(tempPath); - await response.body.pipeTo(stream.Writable.toWeb(file)); + await fetchWithRetry(url, file); if (expectedChecksum) { const actualChecksum = await getChecksumForFile(tempPath, checksumAlgorithm); @@ -139,13 +246,7 @@ async function getChecksumForFile(inputPath: string, checksumAlgorithm: Checksum * @returns The file contents. */ export async function getResource(url: string): Promise { - const response = await fetchWithRetry(url); - - if (!response.ok) { - throw new Error(`Error downloading ${ url }: ${ response.statusText }`); - } - - return await response.text(); + return await fetchWithRetry(url); } /**