Skip to content

Commit

Permalink
fix(cli): try fix connection aborted error
Browse files Browse the repository at this point in the history
  • Loading branch information
juanrgm committed Feb 23, 2024
1 parent 067bfee commit d6c84c3
Show file tree
Hide file tree
Showing 8 changed files with 350 additions and 221 deletions.
5 changes: 5 additions & 0 deletions .changeset/late-sheep-rush.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@datatruck/cli": patch
---

Fix `Connection aborted` error
10 changes: 10 additions & 0 deletions packages/cli/src/utils/crypto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,13 @@ export function calcFileHash(path: string, algorithm: string) {
.on("end", () => resolve(hash.digest("hex")));
});
}

export async function assertFileChecksum(
path: string,
checksum: string,
algorithm: string,
) {
const fileChecksum = await calcFileHash(path, algorithm);
if (fileChecksum !== checksum)
throw new Error(`Invalid checksum file: ${checksum} != ${fileChecksum}`);
}
35 changes: 19 additions & 16 deletions packages/cli/src/utils/datatruck/client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { logExec } from "../cli";
import { DiskStats } from "../fs";
import { downloadFile, fetchJson, post, uploadFile } from "../http";
import { createHref, downloadFile, fetchJson, post, uploadFile } from "../http";
import { BasicProgress } from "../progress";
import { AbstractFs, FsOptions, LocalFs } from "../virtual-fs";
import { headerKey } from "./repository-server";
Expand All @@ -24,19 +24,19 @@ export class RemoteFs extends AbstractFs {
return false;
}
protected async fetchJson(name: string, params: any[]) {
return await fetchJson(`${this.url}/${name}`, {
const url = createHref(`${this.url}/${name}`, {
params: JSON.stringify(params),
});
return await fetchJson(url, {
headers: this.headers,
query: {
params: JSON.stringify(params),
},
});
}
protected async post(name: string, params: any[], data: string) {
return await post(`${this.url}/${name}`, data, {
const url = createHref(`${this.url}/${name}`, {
params: JSON.stringify(params),
});
return await post(url, data, {
headers: this.headers,
query: {
params: JSON.stringify(params),
},
});
}
override async existsDir(path: string): Promise<boolean> {
Expand All @@ -61,7 +61,7 @@ export class RemoteFs extends AbstractFs {
return await this.fetchJson("readdir", [path]);
}
override async writeFile(path: string, contents: string): Promise<void> {
return await this.post("writeFile", [path], contents);
await this.post("writeFile", [path], contents);
}
override async rmAll(path: string): Promise<void> {
return await this.fetchJson("rmAll", [path]);
Expand All @@ -72,11 +72,12 @@ export class RemoteFs extends AbstractFs {
}
override async upload(source: string, target: string): Promise<void> {
if (this.options.verbose) logExec("fs.upload", [source, target]);
return await uploadFile(`${this.url}/upload`, source, {
const url = createHref(`${this.url}/upload`, {
params: JSON.stringify([target]),
});
return await uploadFile(url, source, {
headers: this.headers,
query: {
params: JSON.stringify([target]),
},
checksum: true,
});
}
override async download(
Expand All @@ -88,10 +89,12 @@ export class RemoteFs extends AbstractFs {
} = {},
): Promise<{ bytes: number }> {
if (this.options.verbose) logExec("fs.download", [source, target]);
return await downloadFile(`${this.url}/download`, target, {
const url = createHref(`${this.url}/download`, {
params: JSON.stringify([source]),
});
return await downloadFile(url, target, {
...options,
headers: this.headers,
query: { params: JSON.stringify([source]) },
});
}
}
Expand Down
22 changes: 6 additions & 16 deletions packages/cli/src/utils/datatruck/repository-server.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import { ConfigAction } from "../../actions/ConfigAction";
import { logJson } from "../cli";
import { readRequestData } from "../http";
import { readRequestData, recvFile, sendFile } from "../http";
import { Counter } from "../math";
import { LocalFs } from "../virtual-fs";
import { createReadStream, createWriteStream } from "fs";
import { stat } from "fs/promises";
import { IncomingMessage, createServer } from "http";
import { pipeline } from "stream/promises";

type User = {
enabled?: boolean;
Expand Down Expand Up @@ -144,15 +141,11 @@ export function createDatatruckRepositoryServer(
} else if (action === "upload") {
const [target] = params;
const path = fs.resolvePath(target);
const file = createWriteStream(path);
await pipeline(req, file);
await recvFile(req, res, path);
} else if (action === "download") {
const [target] = params;
const path = fs.resolvePath(target);
const file = createReadStream(path);
const fileStat = await stat(path);
res.setHeader("Content-Length", fileStat.size);
await pipeline(file, res, { end: false });
await sendFile(req, res, path);
} else if (action === "writeFile") {
const data = await readRequestData(req);
const [target] = params;
Expand All @@ -169,7 +162,8 @@ export function createDatatruckRepositoryServer(
logJson("repository-server", "request failed", { id });
console.error(error);
}
if (!res.headersSent) res.writeHead(500, (error as Error).message);
if (!res.writableEnded && !res.headersSent)
res.writeHead(500, (error as Error).message);
} finally {
if (requestError) {
logJson("repository-server", "request error", { id });
Expand All @@ -180,11 +174,7 @@ export function createDatatruckRepositoryServer(
console.error(responseError);
}

if (requestError || responseError) {
res.destroy();
} else {
res.end();
}
if (!res.writableEnded) res.end();
}
});
}
Loading

0 comments on commit d6c84c3

Please sign in to comment.