Skip to content

Commit

Permalink
Fix flush() to retry fetch() on transient errors
Browse files Browse the repository at this point in the history
This changes flush() to retry the fetch() call on transient errors such
as EPIPE, ECONNREFUSED, and ECONNRESET.
  • Loading branch information
penberg committed Jan 21, 2024
1 parent 5db0765 commit 5d5bcdf
Showing 1 changed file with 24 additions and 2 deletions.
26 changes: 24 additions & 2 deletions src/http/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,7 @@ export class HttpStream extends Stream implements SqlOwner {
let promise;
try {
const request = createRequest();
const fetch = this.#fetch;
promise = fetch(request);
promise = this.#fetchWithRetry(request);
} catch (error) {
promise = Promise.reject(error);
}
Expand All @@ -356,6 +355,20 @@ export class HttpStream extends Stream implements SqlOwner {
});
}

#fetchWithRetry(request: Request, retryCount = 3): Promise<Response> {
try {
const fetch = this.#fetch;
return fetch(request);
} catch (error: any) {
if (isRetryableError(error)) {
if (retryCount > 0) {
return this.#fetchWithRetry(request, retryCount - 1);
}
}
throw error;
}
}

#createPipelineRequest(pipeline: Array<PipelineEntry>, endpoint: Endpoint): Request {
return this.#createRequest<proto.PipelineReqBody>(
new URL(endpoint.pipelinePath, this.#baseUrl),
Expand Down Expand Up @@ -417,6 +430,15 @@ export class HttpStream extends Stream implements SqlOwner {
}
}

function isRetryableError(error: any): boolean {
if (!error.errno) {
return false;
}
return error.errno === "EPIPE"
|| error.errno === "ECONNREFUSED"
|| error.errno === "ECONNRESET";
}

function handlePipelineResponse(pipeline: Array<PipelineEntry>, respBody: proto.PipelineRespBody): void {
if (respBody.results.length !== pipeline.length) {
throw new ProtoError("Server returned unexpected number of pipeline results");
Expand Down

0 comments on commit 5d5bcdf

Please sign in to comment.