From d59d435fc1a244da944ad0c36bf1dcf7735ba289 Mon Sep 17 00:00:00 2001 From: Juanra GM Date: Mon, 16 Oct 2023 13:41:48 +0200 Subject: [PATCH] feat(cli): add `compress` option --- .changeset/sixty-lions-carry.md | 5 + packages/cli/src/Task/MysqlDumpTask.ts | 117 ++++++++++++++++------ packages/cli/src/utils/fs.ts | 43 ++++++++ packages/cli/src/utils/tar.ts | 34 +++++-- packages/cli/src/utils/temp.ts | 28 ++++++ packages/cli/test/mysql-dump-task.test.ts | 12 ++- packages/cli/test/utils/fs.test.ts | 49 ++++++++- 7 files changed, 245 insertions(+), 43 deletions(-) create mode 100644 .changeset/sixty-lions-carry.md diff --git a/.changeset/sixty-lions-carry.md b/.changeset/sixty-lions-carry.md new file mode 100644 index 0000000..ffcdb01 --- /dev/null +++ b/.changeset/sixty-lions-carry.md @@ -0,0 +1,5 @@ +--- +"@datatruck/cli": minor +--- + +Add `compress` option to `mysql-dump` task diff --git a/packages/cli/src/Task/MysqlDumpTask.ts b/packages/cli/src/Task/MysqlDumpTask.ts index 047a7d2..81f04d8 100644 --- a/packages/cli/src/Task/MysqlDumpTask.ts +++ b/packages/cli/src/Task/MysqlDumpTask.ts @@ -1,4 +1,5 @@ import { AppError } from "../Error/AppError"; +import { DefinitionEnum, makeRef } from "../JsonSchema/DefinitionEnum"; import { runParallel } from "../utils/async"; import { logExec } from "../utils/cli"; import { @@ -7,6 +8,8 @@ import { } from "../utils/datatruck/config"; import { ensureEmptyDir, + ensureSingleFile, + groupFiles, mkdirIfNotExists, readDir, safeRename, @@ -14,7 +17,8 @@ import { import { progressPercent } from "../utils/math"; import { createMysqlCli } from "../utils/mysql"; import { endsWith } from "../utils/string"; -import { mkTmpDir } from "../utils/temp"; +import { CompressOptions, createTar, extractTar } from "../utils/tar"; +import { mkTmpDir, useTempDir, useTempFile } from "../utils/temp"; import { SqlDumpTaskConfigType, TargetDatabaseType, @@ -26,8 +30,8 @@ import { TaskRestoreData, TaskAbstract, } from "./TaskAbstract"; -import { chmod, mkdir, readdir, rm } from "fs/promises"; -import { join } from "path"; +import { chmod, mkdir, readdir, rm, writeFile } from "fs/promises"; +import { basename, dirname, join, relative } from "path"; export const mysqlDumpTaskName = "mysql-dump"; @@ -41,12 +45,16 @@ export type MysqlDumpTaskConfigType = { * @default 1 */ concurrency?: number; + compress?: boolean | CompressOptions; } & SqlDumpTaskConfigType; export const mysqlDumpTaskDefinition = sqlDumpTaskDefinition({ dataFormat: { enum: ["csv", "sql"] }, concurrency: { type: "integer", minimum: 1 }, csvSharedPath: { type: "string" }, + compress: { + anyOf: [{ type: "boolean" }, makeRef(DefinitionEnum.compressUtil)], + }, }); const suffix = { @@ -59,6 +67,24 @@ const suffix = { export class MysqlDumpTask extends TaskAbstract { override async backup(data: TaskBackupData) { + const compressAndClean = this.config.compress + ? async (path: string) => { + data.onProgress({ + relative: { + description: "Compressing", + payload: basename(path), + }, + }); + await createTar({ + include: [relative(snapshotPath, path)], + output: `${path}.tar.gz`, + path: dirname(path), + compress: this.config.compress, + verbose: data.options.verbose, + }); + await rm(path); + } + : undefined; const snapshotPath = data.package.path ?? (await mkTmpDir(mysqlDumpTaskName, "task", "backup", "snapshot")); @@ -132,14 +158,18 @@ export class MysqlDumpTask extends TaskAbstract { throw new AppError( `Invalid csv dump files: ${files.join(", ")}`, ); - await safeRename( - join(tableSharedPath, schemaFile), - join(snapshotPath, `${tableName}${suffix.tableSchema}`), + const schemaPath = join( + snapshotPath, + `${tableName}${suffix.tableSchema}`, ); - await safeRename( - join(tableSharedPath, dataFile), - join(snapshotPath, `${tableName}${suffix.tableData}`), + await safeRename(join(tableSharedPath, schemaFile), schemaPath); + await compressAndClean?.(schemaPath); + const tablePath = join( + snapshotPath, + `${tableName}${suffix.tableData}`, ); + await safeRename(join(tableSharedPath, dataFile), tablePath); + await compressAndClean?.(tablePath); } finally { await rm(tableSharedPath, { recursive: true }); } @@ -169,6 +199,7 @@ export class MysqlDumpTask extends TaskAbstract { }), }); await sql.assertDumpFile(outPath); + await compressAndClean?.(outPath); } }, }); @@ -194,6 +225,7 @@ export class MysqlDumpTask extends TaskAbstract { }), }); await sql.assertDumpFile(outPath); + await compressAndClean?.(outPath); } if (this.config.storedPrograms ?? true) { @@ -210,6 +242,7 @@ export class MysqlDumpTask extends TaskAbstract { onlyStoredPrograms: true, }); await sql.assertDumpFile(outPath); + await compressAndClean?.(outPath); } return { snapshotPath, @@ -249,9 +282,9 @@ export class MysqlDumpTask extends TaskAbstract { database: database.name, }); - const suffixes = Object.values(suffix); - const files = (await readDir(snapshotPath)).filter((f) => - endsWith(f, suffixes), + const [files, compressed] = groupFiles( + await readDir(snapshotPath), + Object.values(suffix), ); // Database check @@ -316,11 +349,28 @@ export class MysqlDumpTask extends TaskAbstract { }, }), onItem: async ({ item: file, controller }) => { - await sql.importFile({ - path: join(snapshotPath, file), - database: database.name, - onSpawn: (p) => (controller.stop = () => p.kill()), - }); + let path = join(snapshotPath, file); + const tempDir = compressed[file] + ? await useTempDir(mysqlDumpTaskName, "task", "restore", "decompress") + : undefined; + try { + if (tempDir) { + await extractTar({ + input: join(snapshotPath, compressed[file]), + output: tempDir.path, + decompress: true, + verbose: data.options.verbose, + }); + path = await ensureSingleFile(tempDir.path); + } + await sql.importFile({ + path, + database: database.name, + onSpawn: (p) => (controller.stop = () => p.kill()), + }); + } finally { + await tempDir?.[Symbol.asyncDispose](); + } }, }); @@ -344,25 +394,36 @@ export class MysqlDumpTask extends TaskAbstract { }, }), onItem: async ({ item: file, controller }) => { - const filePath = join(snapshotPath, file); + const id = data.snapshot.id.slice(0, 8); const tableName = file.slice(0, suffix.tableData.length * -1); - const sharedFilePath = join( - sharedDir!, - `tmp-dtt-restore-${data.snapshot.id.slice( - 0, - 8, - )}-${tableName}.data.csv`, - ); + const sharedName = `tmp-dtt-restore-${id}-${tableName}.data.csv`; + const temp = useTempFile(join(sharedDir!, sharedName)); + try { - await safeRename(filePath, sharedFilePath); + let csvFile = temp.path; + + if (compressed[file]) { + await mkdirIfNotExists(temp.path); + await extractTar({ + input: join(snapshotPath, compressed[file]), + output: temp.path, + decompress: true, + verbose: data.options.verbose, + }); + csvFile = await ensureSingleFile(temp.path); + } else { + const sourceFile = join(snapshotPath, file); + await safeRename(sourceFile, temp.path); + } + await sql.importCsvFile({ - path: sharedFilePath, + path: csvFile, database: database.name, table: tableName, onSpawn: (p) => (controller.stop = () => p.kill()), }); } finally { - await rm(sharedFilePath); + await temp[Symbol.asyncDispose](); } }, }); diff --git a/packages/cli/src/utils/fs.ts b/packages/cli/src/utils/fs.ts index 64478d6..e498c6f 100644 --- a/packages/cli/src/utils/fs.ts +++ b/packages/cli/src/utils/fs.ts @@ -1,6 +1,7 @@ import { progressPercent } from "./math"; import { rootPath } from "./path"; import { Progress } from "./progress"; +import { endsWith } from "./string"; import { eachLimit } from "async"; import bytes from "bytes"; import fastFolderSize from "fast-folder-size"; @@ -68,6 +69,14 @@ export async function ensureEmptyDir(path: string) { if (!(await isEmptyDir(path))) throw new Error(`Dir is not empty: ${path}`); } +export async function ensureSingleFile(path: string) { + const files = await readDir(path); + if (files.length !== 1) + throw new Error(`Dir has not one file: ${files.length}`); + const [file] = files; + return join(path, file); +} + export async function ensureExistsDir(path: string) { if (!(await existsDir(path))) throw new Error(`Dir is not crated: ${path}`); } @@ -691,3 +700,37 @@ export async function ensureFreeDiskSpace( await checkFreeDiskSpace(input, inSize); } } + +export function groupFiles( + inFiles: string[], + suffixes?: string[], + gzSuffix = ".tar.gz", +): [string[], Record] { + const compressed: Record = {}; + if (suffixes) { + const validGzSuffixes = suffixes.map((f) => `${f}${gzSuffix}`); + inFiles = inFiles.filter( + (f) => endsWith(f, suffixes) || endsWith(f, validGzSuffixes), + ); + } + const grouped = inFiles.reduce( + (items, name) => { + const key = name.endsWith(gzSuffix) + ? name.slice(0, -gzSuffix.length) + : name; + if (!items[key]) items[key] = []; + items[key].push(name); + return items; + }, + {} as Record, + ); + + for (const key in grouped) { + const suffixFile = grouped[key].find((v) => v.endsWith(gzSuffix)); + if (suffixFile) { + compressed[key] = suffixFile; + } + } + + return [Object.keys(grouped), compressed]; +} diff --git a/packages/cli/src/utils/tar.ts b/packages/cli/src/utils/tar.ts index 8bbe640..23747f5 100644 --- a/packages/cli/src/utils/tar.ts +++ b/packages/cli/src/utils/tar.ts @@ -34,14 +34,20 @@ export type DecompressOptions = { cores?: CoresOptions; }; -export interface CreateTarOptions { +export type CreateTarOptions = { path: string; verbose?: boolean; output: string; - includeList: string; compress?: boolean | CompressOptions; onEntry?: (entry: TarEntry) => void; -} +} & ( + | { + includeList: string; + } + | { + include: string[]; + } +); export interface ExtractOptions { input: string; @@ -168,7 +174,10 @@ async function ifX( export async function createTar(options: CreateTarOptions) { const vendor = await getTarVendor(true, options.verbose); - const total = await countFileLines(options.includeList); + const total = + "include" in options + ? options.include.length + : await countFileLines(options.includeList); const compress = await ifX(options.compress, async (compress) => ({ ...compress, cores: await resolveCores(compress.cores), @@ -204,8 +213,6 @@ export async function createTar(options: CreateTarOptions) { toLocalPath(options.path), compress?.cores === 1 ? "-czvf" : "-cvf", toLocalPath(options.output), - "-T", - toLocalPath(options.includeList), // https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=172293 ...(vendor === "bsdtar" ? [] : ["--ignore-failed-read"]), ...(vendor === "bsdtar" ? [] : ["--force-local"]), @@ -222,6 +229,10 @@ export async function createTar(options: CreateTarOptions) { .join(" ")}"`, ] : []), + + ...("includeList" in options + ? ["-T", toLocalPath(options.includeList)] + : ["--", ...options.include]), ], { ...(compress && @@ -264,9 +275,10 @@ export function normalizeTarPath(path: string) { } export async function extractTar(options: ExtractOptions) { - let total = - options.total ?? - (await listTar({ input: options.input, verbose: options.verbose })); + let total = options.onEntry + ? options.total ?? + (await listTar({ input: options.input, verbose: options.verbose })) + : undefined; if (!(await existsDir(options.output))) { if (options.verbose) logExec("mkdir", ["-p", options.output]); @@ -286,9 +298,9 @@ export async function extractTar(options: ExtractOptions) { options.onEntry?.({ path: normalizeTarPath(path), progress: { - total, + total: total!, current, - percent: progressPercent(total, current), + percent: progressPercent(total!, current), }, }); }; diff --git a/packages/cli/src/utils/temp.ts b/packages/cli/src/utils/temp.ts index 2f73f1b..74344bd 100644 --- a/packages/cli/src/utils/temp.ts +++ b/packages/cli/src/utils/temp.ts @@ -4,6 +4,9 @@ import { randomUUID } from "crypto"; import { mkdir, rm } from "fs/promises"; import { join } from "path"; +(Symbol as any).dispose ??= Symbol("Symbol.dispose"); +(Symbol as any).asyncDispose ??= Symbol("Symbol.asyncDispose"); + export function parentTmpDir() { return join(globalData.tempDir, "datatruck-temp"); } @@ -49,6 +52,31 @@ export async function mkTmpDir(...keys: [string, ...string[]]) { return path; } +export async function useTempDir( + ...keys: [string, ...string[]] +): Promise { + const path = await mkTmpDir(...keys); + return { + path, + async [Symbol.asyncDispose]() { + try { + await rmTmpDir(path); + } catch (_) {} + }, + }; +} + +export function useTempFile(path: string): AsyncDisposable & { path: string } { + return { + path, + async [Symbol.asyncDispose]() { + try { + await rm(path, { recursive: true }); + } catch (_) {} + }, + }; +} + export class CleanupListener { readonly paths: string[] = []; stop() { diff --git a/packages/cli/test/mysql-dump-task.test.ts b/packages/cli/test/mysql-dump-task.test.ts index ad51ea6..b68bf98 100644 --- a/packages/cli/test/mysql-dump-task.test.ts +++ b/packages/cli/test/mysql-dump-task.test.ts @@ -12,8 +12,13 @@ const repositoryTypes = parseStringList( const dataFormats = parseStringList( process.env.DTT_DATA_FORMAT, - ["sql" as const, "csv" as const], - process.env.CI ? ["sql"] : true, + [ + "sql" as const, + "csv" as const, + "sql-compress" as const, + "csv-compress" as const, + ], + process.env.CI ? ["sql", "sql-compress"] : true, ); describe( @@ -84,12 +89,13 @@ describe( task: { name: "mysql-dump", config: { - dataFormat, + dataFormat: dataFormat.startsWith("sql") ? "sql" : "csv", database: dbName, hostname: sql.options.hostname, username: sql.options.username, password: sql.options.password, port: sql.options.port, + compress: dataFormat.endsWith("-compress"), targetDatabase: { name: `${dbName}_{snapshotId}`, }, diff --git a/packages/cli/test/utils/fs.test.ts b/packages/cli/test/utils/fs.test.ts index c0d3714..1705955 100644 --- a/packages/cli/test/utils/fs.test.ts +++ b/packages/cli/test/utils/fs.test.ts @@ -1,7 +1,9 @@ import { createWriteStreamPool, ensureFreeDiskSpace, + ensureSingleFile, fetchDiskStats, + groupFiles, } from "../../src/utils/fs"; import { isTmpDir, @@ -11,7 +13,7 @@ import { tmpDir, } from "../../src/utils/temp"; import { randomBytes } from "crypto"; -import { mkdir, readFile, rm, rmdir } from "fs/promises"; +import { mkdir, readFile, rm, rmdir, writeFile } from "fs/promises"; import { tmpdir } from "os"; import { join, normalize } from "path"; import { it } from "vitest"; @@ -100,3 +102,48 @@ describe("ensureFreeDiskSpace", async () => { ).rejects.toThrowError(); }); }); + +describe("ensureSingleFile", async () => { + it("passes", async () => { + const path = await mkTmpDir("test", "ensureSingleFile"); + await writeFile(path + "/f1", ""); + await expect(ensureSingleFile(path)).resolves.toBe(join(path, "f1")); + }); + it("fails with empty dir", async () => { + const path = await mkTmpDir("test", "ensureSingleFile"); + await expect(ensureSingleFile(path)).rejects.toThrowError(); + }); + it("fails with dir", async () => { + const path = await mkTmpDir("test", "ensureSingleFile"); + await writeFile(path + "/f1", ""); + await writeFile(path + "/f2", ""); + await expect(ensureSingleFile(path)).rejects.toThrowError(); + }); + it("fails uncreated dir", async () => { + const path = await mkTmpDir("test", "ensureSingleFile"); + await rmdir(path); + await expect(ensureSingleFile(path)).rejects.toThrowError(); + }); +}); + +describe("groupFiles", async () => { + it("groups files", async () => { + const [files, compressed] = groupFiles( + [ + "a.sql", + "b.sql", + "c.sql.tar.gz", + "d.sql", + "d.sql.tar.gz", + "otherfile.tar.gz", + ], + [".sql"], + ); + + expect(files).toEqual(["a.sql", "b.sql", "c.sql", "d.sql"]); + expect(compressed).toEqual({ + "c.sql": "c.sql.tar.gz", + "d.sql": "d.sql.tar.gz", + }); + }); +});