From 49f38f38311d5df53d18dc9e026482d6bd63906a Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Sun, 21 Jan 2024 10:22:37 +0200 Subject: [PATCH] Fix flush() to retry fetch() on transient errors This changes flush() to retry the fetch() call on transient errors such as EPIPE, ECONNREFUSED, and ECONNRESET. --- src/http/stream.ts | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/src/http/stream.ts b/src/http/stream.ts index 154ae0c..c27d7b7 100644 --- a/src/http/stream.ts +++ b/src/http/stream.ts @@ -329,8 +329,7 @@ export class HttpStream extends Stream implements SqlOwner { let promise; try { const request = createRequest(); - const fetch = this.#fetch; - promise = fetch(request); + promise = this.#fetchWithRetry(request); } catch (error) { promise = Promise.reject(error); } @@ -356,6 +355,20 @@ export class HttpStream extends Stream implements SqlOwner { }); } + #fetchWithRetry(request: Request, retryCount: number = 3, backoff: number = 100): Promise { + const fetch = this.#fetch; + return fetch(request).catch((error) => { + if (isRetryableError(error)) { + if (retryCount > 0) { + return new Promise((resolve) => setTimeout(resolve, backoff)).then(() => { + return this.#fetchWithRetry(request, retryCount - 1, backoff * 2); + }); + } + } + return Promise.reject(error); + }); + } + #createPipelineRequest(pipeline: Array, endpoint: Endpoint): Request { return this.#createRequest( new URL(endpoint.pipelinePath, this.#baseUrl), @@ -417,6 +430,15 @@ export class HttpStream extends Stream implements SqlOwner { } } +function isRetryableError(error: any): boolean { + if (!error.errno) { + return false; + } + return error.errno === "EPIPE" + || error.errno === "ECONNREFUSED" + || error.errno === "ECONNRESET"; +} + function handlePipelineResponse(pipeline: Array, respBody: proto.PipelineRespBody): void { if (respBody.results.length !== pipeline.length) { throw new ProtoError("Server returned unexpected number of pipeline results");