Skip to content

Commit

Permalink
fix(cli): cleanup temp files
Browse files Browse the repository at this point in the history
  • Loading branch information
juanrgm committed Jan 16, 2024
1 parent f8f5fe8 commit bd8ee7b
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 91 deletions.
5 changes: 5 additions & 0 deletions .changeset/polite-hounds-kneel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@datatruck/cli": patch
---

Fix garbage collector
27 changes: 15 additions & 12 deletions packages/cli/src/actions/BackupAction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ export class BackupAction<TRequired extends boolean = true> {
}
async exec() {
const { options } = this;
const gc = new GargabeCollector();
const pm = new ProgressManager({
verbose: options.verbose,
tty: options.tty,
Expand All @@ -259,6 +260,7 @@ export class BackupAction<TRequired extends boolean = true> {
const l = new Listr3<Context>({
streams: this.options.streams,
progressManager: pm,
gargabeCollector: gc,
});
return l
.add([
Expand All @@ -283,7 +285,6 @@ export class BackupAction<TRequired extends boolean = true> {
return [
...packages.flatMap((pkg) => {
let taskResult: { snapshotPath?: string } | undefined = {};
const gc = new GargabeCollector();
const repositories = this.getRepositoryNames(
pkg.repositoryNames ?? [],
);
Expand All @@ -293,6 +294,7 @@ export class BackupAction<TRequired extends boolean = true> {
mirrors.map((mirror) => ({ name, mirror })),
);

const taskGc = gc.create();
return l.$tasks(
!!pkg.task &&
l.$task({
Expand All @@ -306,14 +308,15 @@ export class BackupAction<TRequired extends boolean = true> {
completed: `Task executed: ${pkg.name} (${pkg.task.name})`,
},
exitOnError: false,
runWrapper: gc.cleanupIfFail.bind(gc),
run: async (task) => {
using progress = pm.create(task);
taskResult = await createTask(pkg.task!).backup({
options,
package: pkg,
snapshot,
onProgress: progress.update,
await taskGc.disposeIfFail(async () => {
using progress = pm.create(task);
taskResult = await createTask(pkg.task!).backup({
options,
package: pkg,
snapshot,
onProgress: progress.update,
});
});
},
}),
Expand All @@ -333,8 +336,8 @@ export class BackupAction<TRequired extends boolean = true> {
failed: `Backup create failed: ${pkg.name} (${repositoryName})`,
},
exitOnError: false,
runWrapper: gc.cleanupOnFinish.bind(gc),
run: async (task, data) => {
await using _ = gc.create().disposeOnFinish();
const taskSummary = pkg.task
? l.result("task", pkg.name)
: undefined;
Expand Down Expand Up @@ -363,8 +366,8 @@ export class BackupAction<TRequired extends boolean = true> {
failed: "Task files clean failed",
},
exitOnError: false,
enabled: gc.pending,
run: () => gc.cleanup(),
enabled: taskGc.pending(),
run: () => taskGc.dispose(),
}),
...mirrorRepositories.map(({ name, mirror }) =>
l.$task({
Expand All @@ -383,8 +386,8 @@ export class BackupAction<TRequired extends boolean = true> {
failed: `Snapshot copy failed: ${pkg.name} (${mirror})`,
},
exitOnError: false,
runWrapper: gc.cleanup.bind(gc),
run: async (task, data) => {
await using _ = gc.create().disposeOnFinish();
const backupSummary = l.result("backup", [
pkg.name,
name,
Expand Down
67 changes: 32 additions & 35 deletions packages/cli/src/actions/RestoreAction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ export class RestoreAction<TRequired extends boolean = true> {
pkg: PackageConfig;
task: TaskAbstract | undefined;
snapshot: RestoreSnapshot;
gc: GargabeCollector;
onProgress: (progress: Progress) => void;
}) {
let { snapshot, pkg, task } = data;
Expand All @@ -125,33 +124,28 @@ export class RestoreAction<TRequired extends boolean = true> {

let snapshotPath = pkg.restorePath ?? pkg.path;

await data.gc.cleanupIfFail(async () => {
if (task) {
const taskResult = await task.prepareRestore({
options: this.options,
package: pkg,
snapshot,
});
snapshotPath = taskResult?.snapshotPath;
}
await initEmptyDir(snapshotPath);
if (this.config.minFreeDiskSpace)
await ensureFreeDiskSpace(
[snapshotPath!],
this.config.minFreeDiskSpace,
);
await repo.restore({
if (task) {
const taskResult = await task.prepareRestore({
options: this.options,
snapshot: data.snapshot,
package: pkg,
snapshotPath: snapshotPath!,
packageConfig: pkg.repositoryConfigs?.find(
(config) =>
config.type === repoConfig.type &&
(!config.names || config.names.includes(repoConfig.name)),
)?.config,
onProgress: data.onProgress,
snapshot,
});
snapshotPath = taskResult?.snapshotPath;
}
await initEmptyDir(snapshotPath);
if (this.config.minFreeDiskSpace)
await ensureFreeDiskSpace([snapshotPath!], this.config.minFreeDiskSpace);
await repo.restore({
options: this.options,
snapshot: data.snapshot,
package: pkg,
snapshotPath: snapshotPath!,
packageConfig: pkg.repositoryConfigs?.find(
(config) =>
config.type === repoConfig.type &&
(!config.names || config.names.includes(repoConfig.name)),
)?.config,
onProgress: data.onProgress,
});
return { snapshotPath };
}
Expand Down Expand Up @@ -213,6 +207,7 @@ export class RestoreAction<TRequired extends boolean = true> {
}
async exec() {
const { options } = this;
const gc = new GargabeCollector();
const pm = new ProgressManager({
verbose: options.verbose,
tty: options.tty,
Expand All @@ -222,6 +217,7 @@ export class RestoreAction<TRequired extends boolean = true> {
const l = new Listr3<Context>({
streams: options.streams,
progressManager: pm,
gargabeCollector: gc,
});

return l
Expand Down Expand Up @@ -275,17 +271,18 @@ export class RestoreAction<TRequired extends boolean = true> {
if (this.options.initial)
pkg = { ...pkg, restorePath: pkg.path };

const gc = new GargabeCollector();
const task = pkg.task ? createTask(pkg.task) : undefined;
using progress = pm.create(listTask);
const restore = await this.restore({
gc,
pkg,
task,
snapshot: snapshot,
onProgress: progress.update,
});
if (!task) return await gc.cleanup();
const restoreGc = gc.create();
const restore = await restoreGc.disposeIfFail(() =>
this.restore({
pkg,
task,
snapshot: snapshot,
onProgress: progress.update,
}),
);
if (!task) return await restoreGc.dispose();
return l.$tasks({
key: "task",
keyIndex: pkg.name,
Expand All @@ -303,8 +300,8 @@ export class RestoreAction<TRequired extends boolean = true> {
})`,
},
exitOnError: false,
runWrapper: gc.cleanup.bind(gc),
run: async (listTask) => {
await using _ = restoreGc.disposeOnFinish();
const { snapshotPath } = restore;
ok(snapshotPath);
using progress = pm.create(listTask);
Expand Down
10 changes: 4 additions & 6 deletions packages/cli/src/utils/list.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Timer, createTimer } from "./date";
import { onExit } from "./exit";
import { ProgressManager } from "./progress";
import { StdStreams, createStdStreams } from "./stream";
import { GargabeCollector } from "./temp";
import {
Listr,
ListrGetRendererClassFromValue,
Expand Down Expand Up @@ -48,7 +49,6 @@ type Listr3Task<T extends Listr3Context, K extends keyof T> = {
failed?: string;
completed?: string;
};
runWrapper?: (cb: () => any) => any;
run: (
task: ListrTaskWrapper<any, any, any>,
data: T[K],
Expand Down Expand Up @@ -96,6 +96,7 @@ export class Listr3<T extends Listr3Context> extends Listr<
readonly $options: {
streams?: StdStreams;
progressManager?: ProgressManager;
gargabeCollector?: GargabeCollector;
},
) {
const logger = new List3Logger();
Expand Down Expand Up @@ -164,11 +165,7 @@ export class Listr3<T extends Listr3Context> extends Listr<
const timer = createTimer();
if (title)
try {
const runResult = item.runWrapper
? await item.runWrapper(
async () => await item.run(task, result.data as any),
)
: await item.run(task, result.data as any);
const runResult = await item.run(task, result.data as any);
if (title.completed) task.title = title.completed;
return Array.isArray(runResult)
? task.newListr(runResult)
Expand Down Expand Up @@ -229,6 +226,7 @@ export class Listr3<T extends Listr3Context> extends Listr<
} catch (error) {
throw error;
} finally {
await this.$options.gargabeCollector?.dispose();
dispose();
}
}
Expand Down
77 changes: 39 additions & 38 deletions packages/cli/src/utils/temp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ export function tmpDir(...keys: [string, ...string[]]) {
sessionTmpDir(),
[...keys, id].map(encodeURIComponent).join("-"),
);
for (const listener of listeners) listener.paths.push(path);
if (collectors.size) {
const lastListener = [...collectors.values()].at(collectors.size - 1);
if (lastListener) lastListener.paths.add(path);
}
return path;
}

type Listener = { paths: string[] };
const listeners = new Set<Listener>();
export const collectors = new Set<GargabeCollector>();

export async function mkTmpDir(...keys: [string, ...string[]]) {
const path = tmpDir(...keys);
Expand Down Expand Up @@ -77,49 +79,48 @@ export function useTempFile(path: string): AsyncDisposable & { path: string } {
};
}

export class CleanupListener {
readonly paths: string[] = [];
stop() {
listeners.delete(this);
}
async dispose() {
this.stop();
await rmTmpDir(this.paths);
}
}

export class GargabeCollector {
protected listeners: Set<CleanupListener> = new Set();
get pending() {
return this.listeners.size > 0;
readonly paths: Set<string> = new Set();
readonly children: Set<GargabeCollector> = new Set();
constructor(protected parent?: GargabeCollector) {
collectors.add(this);
}
async cleanup(cb?: () => any) {
try {
await cb?.();
} finally {
for (const listener of this.listeners) {
this.listeners.delete(listener);
await listener.dispose();
}
}
pending() {
if (this.paths.size) return true;
for (const child of this.children) if (child.pending()) return true;
return false;
}
async cleanupOnFinish(cb: () => any) {
const cleanup = new CleanupListener();
try {
await cb();
} finally {
cleanup.dispose();
async cleanup() {
for (const path of this.paths) {
try {
await rmTmpDir(path);
this.paths.delete(path);
} catch (_) {}
}
for (const child of this.children) await child.cleanup();
}
async cleanupIfFail(cb: () => any) {
const cleanup = new CleanupListener();
async dispose() {
await this.cleanup();
collectors.delete(this);
}
async disposeIfFail<T>(cb: () => Promise<T>): Promise<T> {
try {
await cb();
return await cb();
} catch (error) {
await cleanup.dispose();
await this.dispose();
throw error;
}
this.listeners.add(cleanup);
return cleanup;
}
disposeOnFinish() {
return {
[Symbol.asyncDispose]: async () => {
return this.dispose();
},
};
}
create() {
const gc = new GargabeCollector();
this.children.add(gc);
return gc;
}
}
Loading

0 comments on commit bd8ee7b

Please sign in to comment.