Skip to content

Commit

Permalink
feat(cli): add compress option
Browse files Browse the repository at this point in the history
  • Loading branch information
juanrgm committed Oct 16, 2023
1 parent 7878312 commit d59d435
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 43 deletions.
5 changes: 5 additions & 0 deletions .changeset/sixty-lions-carry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@datatruck/cli": minor
---

Add `compress` option to `mysql-dump` task
117 changes: 89 additions & 28 deletions packages/cli/src/Task/MysqlDumpTask.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { AppError } from "../Error/AppError";
import { DefinitionEnum, makeRef } from "../JsonSchema/DefinitionEnum";
import { runParallel } from "../utils/async";
import { logExec } from "../utils/cli";
import {
Expand All @@ -7,14 +8,17 @@ import {
} from "../utils/datatruck/config";
import {
ensureEmptyDir,
ensureSingleFile,
groupFiles,
mkdirIfNotExists,
readDir,
safeRename,
} from "../utils/fs";
import { progressPercent } from "../utils/math";
import { createMysqlCli } from "../utils/mysql";
import { endsWith } from "../utils/string";
import { mkTmpDir } from "../utils/temp";
import { CompressOptions, createTar, extractTar } from "../utils/tar";
import { mkTmpDir, useTempDir, useTempFile } from "../utils/temp";
import {
SqlDumpTaskConfigType,
TargetDatabaseType,
Expand All @@ -26,8 +30,8 @@ import {
TaskRestoreData,
TaskAbstract,
} from "./TaskAbstract";
import { chmod, mkdir, readdir, rm } from "fs/promises";
import { join } from "path";
import { chmod, mkdir, readdir, rm, writeFile } from "fs/promises";
import { basename, dirname, join, relative } from "path";

export const mysqlDumpTaskName = "mysql-dump";

Expand All @@ -41,12 +45,16 @@ export type MysqlDumpTaskConfigType = {
* @default 1
*/
concurrency?: number;
compress?: boolean | CompressOptions;
} & SqlDumpTaskConfigType;

export const mysqlDumpTaskDefinition = sqlDumpTaskDefinition({
dataFormat: { enum: ["csv", "sql"] },
concurrency: { type: "integer", minimum: 1 },
csvSharedPath: { type: "string" },
compress: {
anyOf: [{ type: "boolean" }, makeRef(DefinitionEnum.compressUtil)],
},
});

const suffix = {
Expand All @@ -59,6 +67,24 @@ const suffix = {

export class MysqlDumpTask extends TaskAbstract<MysqlDumpTaskConfigType> {
override async backup(data: TaskBackupData) {
const compressAndClean = this.config.compress
? async (path: string) => {
data.onProgress({
relative: {
description: "Compressing",
payload: basename(path),
},
});
await createTar({
include: [relative(snapshotPath, path)],
output: `${path}.tar.gz`,
path: dirname(path),
compress: this.config.compress,
verbose: data.options.verbose,
});
await rm(path);
}
: undefined;
const snapshotPath =
data.package.path ??
(await mkTmpDir(mysqlDumpTaskName, "task", "backup", "snapshot"));
Expand Down Expand Up @@ -132,14 +158,18 @@ export class MysqlDumpTask extends TaskAbstract<MysqlDumpTaskConfigType> {
throw new AppError(
`Invalid csv dump files: ${files.join(", ")}`,
);
await safeRename(
join(tableSharedPath, schemaFile),
join(snapshotPath, `${tableName}${suffix.tableSchema}`),
const schemaPath = join(
snapshotPath,
`${tableName}${suffix.tableSchema}`,
);
await safeRename(
join(tableSharedPath, dataFile),
join(snapshotPath, `${tableName}${suffix.tableData}`),
await safeRename(join(tableSharedPath, schemaFile), schemaPath);
await compressAndClean?.(schemaPath);
const tablePath = join(
snapshotPath,
`${tableName}${suffix.tableData}`,
);
await safeRename(join(tableSharedPath, dataFile), tablePath);
await compressAndClean?.(tablePath);
} finally {
await rm(tableSharedPath, { recursive: true });
}
Expand Down Expand Up @@ -169,6 +199,7 @@ export class MysqlDumpTask extends TaskAbstract<MysqlDumpTaskConfigType> {
}),
});
await sql.assertDumpFile(outPath);
await compressAndClean?.(outPath);
}
},
});
Expand All @@ -194,6 +225,7 @@ export class MysqlDumpTask extends TaskAbstract<MysqlDumpTaskConfigType> {
}),
});
await sql.assertDumpFile(outPath);
await compressAndClean?.(outPath);
}

if (this.config.storedPrograms ?? true) {
Expand All @@ -210,6 +242,7 @@ export class MysqlDumpTask extends TaskAbstract<MysqlDumpTaskConfigType> {
onlyStoredPrograms: true,
});
await sql.assertDumpFile(outPath);
await compressAndClean?.(outPath);
}
return {
snapshotPath,
Expand Down Expand Up @@ -249,9 +282,9 @@ export class MysqlDumpTask extends TaskAbstract<MysqlDumpTaskConfigType> {
database: database.name,
});

const suffixes = Object.values(suffix);
const files = (await readDir(snapshotPath)).filter((f) =>
endsWith(f, suffixes),
const [files, compressed] = groupFiles(
await readDir(snapshotPath),
Object.values(suffix),
);

// Database check
Expand Down Expand Up @@ -316,11 +349,28 @@ export class MysqlDumpTask extends TaskAbstract<MysqlDumpTaskConfigType> {
},
}),
onItem: async ({ item: file, controller }) => {
await sql.importFile({
path: join(snapshotPath, file),
database: database.name,
onSpawn: (p) => (controller.stop = () => p.kill()),
});
let path = join(snapshotPath, file);
const tempDir = compressed[file]
? await useTempDir(mysqlDumpTaskName, "task", "restore", "decompress")
: undefined;
try {
if (tempDir) {
await extractTar({
input: join(snapshotPath, compressed[file]),
output: tempDir.path,
decompress: true,
verbose: data.options.verbose,
});
path = await ensureSingleFile(tempDir.path);
}
await sql.importFile({
path,
database: database.name,
onSpawn: (p) => (controller.stop = () => p.kill()),
});
} finally {
await tempDir?.[Symbol.asyncDispose]();
}
},
});

Expand All @@ -344,25 +394,36 @@ export class MysqlDumpTask extends TaskAbstract<MysqlDumpTaskConfigType> {
},
}),
onItem: async ({ item: file, controller }) => {
const filePath = join(snapshotPath, file);
const id = data.snapshot.id.slice(0, 8);
const tableName = file.slice(0, suffix.tableData.length * -1);
const sharedFilePath = join(
sharedDir!,
`tmp-dtt-restore-${data.snapshot.id.slice(
0,
8,
)}-${tableName}.data.csv`,
);
const sharedName = `tmp-dtt-restore-${id}-${tableName}.data.csv`;
const temp = useTempFile(join(sharedDir!, sharedName));

try {
await safeRename(filePath, sharedFilePath);
let csvFile = temp.path;

if (compressed[file]) {
await mkdirIfNotExists(temp.path);
await extractTar({
input: join(snapshotPath, compressed[file]),
output: temp.path,
decompress: true,
verbose: data.options.verbose,
});
csvFile = await ensureSingleFile(temp.path);
} else {
const sourceFile = join(snapshotPath, file);
await safeRename(sourceFile, temp.path);
}

await sql.importCsvFile({
path: sharedFilePath,
path: csvFile,
database: database.name,
table: tableName,
onSpawn: (p) => (controller.stop = () => p.kill()),
});
} finally {
await rm(sharedFilePath);
await temp[Symbol.asyncDispose]();
}
},
});
Expand Down
43 changes: 43 additions & 0 deletions packages/cli/src/utils/fs.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { progressPercent } from "./math";
import { rootPath } from "./path";
import { Progress } from "./progress";
import { endsWith } from "./string";
import { eachLimit } from "async";
import bytes from "bytes";
import fastFolderSize from "fast-folder-size";
Expand Down Expand Up @@ -68,6 +69,14 @@ export async function ensureEmptyDir(path: string) {
if (!(await isEmptyDir(path))) throw new Error(`Dir is not empty: ${path}`);
}

/**
 * Asserts that the directory at `path` contains exactly one entry and
 * returns the full path of that entry.
 *
 * @param path Directory to inspect.
 * @returns Joined path (`path`/entry) of the single file inside.
 * @throws Error when the directory does not hold exactly one entry.
 */
export async function ensureSingleFile(path: string) {
  const files = await readDir(path);
  if (files.length !== 1)
    // Fixed grammar of the message; include the count to aid debugging
    // (0 = empty dir, >1 = ambiguous contents).
    throw new Error(`Dir does not contain exactly one file: ${files.length}`);
  const [file] = files;
  return join(path, file);
}

/**
 * Asserts that the directory at `path` exists.
 *
 * @param path Directory to check.
 * @throws Error when the directory is missing.
 */
export async function ensureExistsDir(path: string) {
  // Fix typo in the error message: "crated" -> "created".
  if (!(await existsDir(path))) throw new Error(`Dir is not created: ${path}`);
}
Expand Down Expand Up @@ -691,3 +700,37 @@ export async function ensureFreeDiskSpace(
await checkFreeDiskSpace(input, inSize);
}
}

/**
 * Groups a directory listing into logical (uncompressed) file names plus a
 * lookup of which of those names are backed by a `.tar.gz` archive.
 *
 * A file `x.sql.tar.gz` is reported under the key `x.sql`, and
 * `compressed["x.sql"]` points at the archive name. When `suffixes` is
 * given, only files ending in one of the suffixes (or a suffix followed by
 * `gzSuffix`) are considered.
 *
 * @param inFiles  Raw file names (e.g. from `readDir`).
 * @param suffixes Optional whitelist of logical suffixes to keep.
 * @param gzSuffix Archive suffix appended to compressed entries.
 * @returns Tuple of [logical file names, logical name -> archive name].
 */
export function groupFiles(
  inFiles: string[],
  suffixes?: string[],
  gzSuffix = ".tar.gz",
): [string[], Record<string, string>] {
  let candidates = inFiles;
  if (suffixes) {
    const gzSuffixes = suffixes.map((s) => `${s}${gzSuffix}`);
    candidates = candidates.filter(
      (name) => endsWith(name, suffixes) || endsWith(name, gzSuffixes),
    );
  }

  // Single pass: record each logical key in first-seen order and remember
  // the first archive observed for it.
  const seen: Record<string, true> = {};
  const compressed: Record<string, string> = {};
  for (const name of candidates) {
    const isArchive = name.endsWith(gzSuffix);
    const key = isArchive ? name.slice(0, -gzSuffix.length) : name;
    seen[key] = true;
    if (isArchive && !(key in compressed)) compressed[key] = name;
  }

  return [Object.keys(seen), compressed];
}
34 changes: 23 additions & 11 deletions packages/cli/src/utils/tar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,20 @@ export type DecompressOptions = {
cores?: CoresOptions;
};

export interface CreateTarOptions {
export type CreateTarOptions = {
path: string;
verbose?: boolean;
output: string;
includeList: string;
compress?: boolean | CompressOptions;
onEntry?: (entry: TarEntry) => void;
}
} & (
| {
includeList: string;
}
| {
include: string[];
}
);

export interface ExtractOptions {
input: string;
Expand Down Expand Up @@ -168,7 +174,10 @@ async function ifX<T, R>(

export async function createTar(options: CreateTarOptions) {
const vendor = await getTarVendor(true, options.verbose);
const total = await countFileLines(options.includeList);
const total =
"include" in options
? options.include.length
: await countFileLines(options.includeList);
const compress = await ifX(options.compress, async (compress) => ({
...compress,
cores: await resolveCores(compress.cores),
Expand Down Expand Up @@ -204,8 +213,6 @@ export async function createTar(options: CreateTarOptions) {
toLocalPath(options.path),
compress?.cores === 1 ? "-czvf" : "-cvf",
toLocalPath(options.output),
"-T",
toLocalPath(options.includeList),
// https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=172293
...(vendor === "bsdtar" ? [] : ["--ignore-failed-read"]),
...(vendor === "bsdtar" ? [] : ["--force-local"]),
Expand All @@ -222,6 +229,10 @@ export async function createTar(options: CreateTarOptions) {
.join(" ")}"`,
]
: []),

...("includeList" in options
? ["-T", toLocalPath(options.includeList)]
: ["--", ...options.include]),
],
{
...(compress &&
Expand Down Expand Up @@ -264,9 +275,10 @@ export function normalizeTarPath(path: string) {
}

export async function extractTar(options: ExtractOptions) {
let total =
options.total ??
(await listTar({ input: options.input, verbose: options.verbose }));
let total = options.onEntry
? options.total ??
(await listTar({ input: options.input, verbose: options.verbose }))
: undefined;

if (!(await existsDir(options.output))) {
if (options.verbose) logExec("mkdir", ["-p", options.output]);
Expand All @@ -286,9 +298,9 @@ export async function extractTar(options: ExtractOptions) {
options.onEntry?.({
path: normalizeTarPath(path),
progress: {
total,
total: total!,
current,
percent: progressPercent(total, current),
percent: progressPercent(total!, current),
},
});
};
Expand Down
Loading

0 comments on commit d59d435

Please sign in to comment.