diff --git a/docs/source/tutorial_notebooks/getting_started.ipynb b/docs/source/tutorial_notebooks/getting_started.ipynb index fe15f085..786364bf 100644 --- a/docs/source/tutorial_notebooks/getting_started.ipynb +++ b/docs/source/tutorial_notebooks/getting_started.ipynb @@ -169,7 +169,9 @@ "\n", "### The relative path\n", "\n", - "Datasets are registered at the data registry shared space under a relative path. For those interested, the eventual full path for the dataset will be `///`. The relative path is one of the two required parameters you must specify when registering a dataset (in the example here our relative path is `nersc_tutorial/my_desc_dataset`).\n", + "Datasets are registered at the data registry shared space under a path relative to the root directory. For those interested, the eventual full path for the dataset will be `///`. This means that the combination of `relative_path`, `owner` and `owner_type` must be unique within the registry, and therefore cannot already be taken when you register a new dataset (an exception to this is if you allow your datasets to be overwritable, see below). \n", + "\n", + "The relative path is one of the two required parameters you must specify when registering a dataset (in the example here our relative path is `nersc_tutorial/my_desc_dataset`).\n", "\n", "### The version string\n", "\n", @@ -186,7 +188,9 @@ "\n", "### Overwriting datasets\n", "\n", - "By default datasets are not overwritable. In those scenarios you will need to choose a combination of `relative_path`, `owner` and `owner_type` that is not already taken in the database. For our example we have set it so that the dataset can be overwritten so that it does not raise an error through multiple tests. Note that when a dataset has `is_overwritable=true`, the data in the shared space will be overwritten with each registration, but the entry in the data registry database is never lost (a new unique entry is created each time, and the 'old' entries will obtain `true` for their `is_overwritten` row).\n", + "By default, datasets in the data registry, once registered, are not overwritable. You can change this behavior by setting `is_overwritable=true` when registering your datasets. If `is_overwritable=true` on one of your previous datasets, you can register a new dataset with the same combination of `relative_path`, `owner` and `owner_type` as before (be warned that any previous data stored under this path will be deleted first). \n", + "\n", + "Note that whilst the data in the shared space will be overwritten with each registration when `is_overwritable=true`, the original entries in the data registry database are never lost (a new unique entry is created each time, and the 'old' entries will obtain `true` for their `is_overwritten` row).\n", "\n", "### Copying the data\n", "\n", @@ -362,7 +366,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.13" + "version": "3.10.12" } }, "nbformat": 4, diff --git a/scripts/create_registry_db.py b/scripts/create_registry_db.py index 958cd38f..b21cf36c 100644 --- a/scripts/create_registry_db.py +++ b/scripts/create_registry_db.py @@ -60,9 +60,9 @@ cols.append(Column("owner", String, nullable=False)) # To store metadata about the dataset. 
-cols.append(Column("data_org", String, nullable=False)) -cols.append(Column("nfiles", Integer, nullable=False)) -cols.append(Column("total_disk_space", Float, nullable=False)) +cols.append(Column("data_org", String)) +cols.append(Column("nfiles", Integer)) +cols.append(Column("total_disk_space", Float)) tab_creator.define_table("dataset", cols, [Index("relative_path", "owner", "owner_type"), UniqueConstraint("name", "version_string", diff --git a/src/dataregistry/DataRegistry.py b/src/dataregistry/DataRegistry.py index e72160b4..1ab0b5eb 100644 --- a/src/dataregistry/DataRegistry.py +++ b/src/dataregistry/DataRegistry.py @@ -113,7 +113,7 @@ def _get_root_dir(self, root_dir, site): "`root_dir` must not equal a pre-defined site with Sqlite" ) return root_dir - + # Non Sqlite case else: if root_dir is None: diff --git a/src/dataregistry/registrar.py b/src/dataregistry/registrar.py index a8c4fb77..78071b43 100644 --- a/src/dataregistry/registrar.py +++ b/src/dataregistry/registrar.py @@ -1,7 +1,6 @@ import time import os from datetime import datetime -from shutil import copyfile, copytree # from sqlalchemy import MetaData, Table, Column, insert, text, from sqlalchemy import update, select @@ -10,7 +9,7 @@ from dataregistry.db_basic import add_table_row from dataregistry.registrar_util import _form_dataset_path, get_directory_info from dataregistry.registrar_util import _parse_version_string, _bump_version -from dataregistry.registrar_util import _name_from_relpath +from dataregistry.registrar_util import _name_from_relpath, _copy_data from dataregistry.db_basic import TableMetadata # from dataregistry.exceptions import * @@ -161,6 +160,8 @@ def _handle_data(self, relative_path, old_location, owner, owner_type, verbose): Total disk space of dataset in bytes ds_creation_date : datetime When file or directory was created + success : bool + True if data copy was successful, else False """ # Get destination directory in data registry. @@ -204,14 +205,11 @@ def _handle_data(self, relative_path, old_location, owner, owner_type, verbose): f"Copying {num_files} files ({total_size/1024/1024:.2f} Mb)...", end="", ) - if dataset_organization == "file": - # Create any intervening directories - os.makedirs(os.path.dirname(dest), exist_ok=True) - copyfile(old_location, dest) - elif dataset_organization == "directory": - copytree(old_location, dest, copy_function=copyfile) + _copy_data(dataset_organization, old_location, dest) if verbose: print(f"took {time.time()-tic:.2f}") + else: + success = True return dataset_organization, num_files, total_size, ds_creation_date @@ -314,6 +312,10 @@ def register_dataset( """ Register a new dataset in the DESC data registry. + First, the dataset entry is created in the database. If success, the + data is then copied (if `old_location` was provided). Only if both + steps are successful will there be a permanent entry in the registry. 
+ Parameters ---------- relative_path : str @@ -369,7 +371,7 @@ def register_dataset( execution_name : str, optional Typically pipeline name or program name execution_description : str, optional - Human readible description of execution + Human readable description of execution execution_start : datetime, optional Date the execution started execution_locale : str, optional @@ -447,22 +449,6 @@ def register_dataset( f"{v_fields['major']}.{v_fields['minor']}.{v_fields['patch']}" ) - # Get dataset characteristics; copy if requested - if not is_dummy: - ( - dataset_organization, - num_files, - total_size, - ds_creation_date, - ) = self._handle_data( - relative_path, old_location, owner, owner_type, verbose - ) - else: - dataset_organization = "dummy" - num_files = 0 - total_size = 0 - ds_creation_date = None - # If no execution_id is supplied, create a minimal entry if execution_id is None: if execution_name is None: @@ -481,8 +467,7 @@ def register_dataset( ) # Pull the dataset properties together - values = {"name": name} - values["relative_path"] = relative_path + values = {"name": name, "relative_path": relative_path} values["version_major"] = v_fields["major"] values["version_minor"] = v_fields["minor"] values["version_patch"] = v_fields["patch"] @@ -491,9 +476,6 @@ def register_dataset( values["version_suffix"] = version_suffix if creation_date: values["dataset_creation_date"] = creation_date - else: - if ds_creation_date: - values["dataset_creation_date"] = ds_creation_date if description: values["description"] = description if execution_id: @@ -506,9 +488,10 @@ def register_dataset( values["owner_type"] = owner_type values["owner"] = owner values["creator_uid"] = self._uid - values["data_org"] = dataset_organization - values["nfiles"] = num_files - values["total_disk_space"] = total_size / 1024 / 1024 # Mb + + # We tentatively start with an "invalid" dataset in the database. This + # will be upgraded to True if the data copying (if any) was successful. + values["is_valid"] = False # Create a new row in the data registry database. 
with self._engine.connect() as conn: @@ -524,6 +507,38 @@ def register_dataset( conn.execute(update_stmt) conn.commit() + # Get dataset characteristics; copy to `root_dir` if requested + if not is_dummy: + ( + dataset_organization, + num_files, + total_size, + ds_creation_date, + ) = self._handle_data( + relative_path, old_location, owner, owner_type, verbose + ) + else: + dataset_organization = "dummy" + num_files = 0 + total_size = 0 + ds_creation_date = None + + # Copy was successful, update the entry with dataset metadata + with self._engine.connect() as conn: + update_stmt = ( + update(dataset_table) + .where(dataset_table.c.dataset_id == prim_key) + .values( + data_org=dataset_organization, + nfiles=num_files, + total_disk_space=total_size / 1024 / 1024, + dataset_creation_date=ds_creation_date, + is_valid=True, + ) + ) + conn.execute(update_stmt) + conn.commit() + return prim_key, execution_id def register_dataset_alias(self, aliasname, dataset_id): diff --git a/src/dataregistry/registrar_util.py b/src/dataregistry/registrar_util.py index 4fa1db67..9b57dc30 100644 --- a/src/dataregistry/registrar_util.py +++ b/src/dataregistry/registrar_util.py @@ -1,6 +1,8 @@ +import hashlib import os import re from sqlalchemy import MetaData, Table, Column, text, select +from shutil import copyfile, copytree, rmtree __all__ = [ "_parse_version_string", @@ -8,6 +10,7 @@ "_form_dataset_path", "get_directory_info", "_name_from_relpath", + "_copy_data", ] VERSION_SEPARATOR = "." _nonneg_int_re = "0|[1-9][0-9]*" @@ -207,3 +210,85 @@ def _name_from_relpath(relative_path): name = base return name + + +def _copy_data(dataset_organization, source, dest, do_checksum=True): + """ + Copy data from one location to another (for ingesting directories and files + into the `root_dir` shared space. + + Note prior to this, in `_handle_data`, it has already been check that + `source` exists, so we do not have to check again. + + To ensure robustness, if overwriting data, the original file/folder is + moved to a temporary location, then deleted if the copy was successful. If + the copy was not successful the backup is renamed back. + + For individual files a checksum validation can be performed if + `do_checksum=True`, there is no such check for directories. 
+ + Parameters + ---------- + dataset_organization : str + The dataset organization, either "file" or "directory" + source : str + Path of source file or directory + dest : str + Destination we are copying to + do_checksum : bool + When overwriting files, do a checksum with the old and new file + """ + + def _compute_checksum(file_path): + hasher = hashlib.sha256() + with open(file_path, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + hasher.update(chunk) + return hasher.hexdigest() + + temp_dest = dest + "_DATAREG_backup" + + try: + # Backup original before copy + if os.path.exists(dest): + os.rename(dest, temp_dest) + + # Create any intervening directories + os.makedirs(os.path.dirname(dest), exist_ok=True) + + # Copy a single file + if dataset_organization == "file": + copyfile(source, dest) + + # Checksums on the files + if do_checksum and os.path.exists(temp_dest): + cs_dest = _compute_checksum(dest) + cs_dest_backup = _compute_checksum(temp_dest) + + if cs_dest != cs_dest_backup: + raise Exception("Checksum with backup failed") + + # Copy a single directory (and subdirectories) + elif dataset_organization == "directory": + copytree(source, dest, copy_function=copyfile) + + # If successful, delete the backup + if os.path.exists(temp_dest): + if dataset_organization == "file": + os.remove(temp_dest) + else: + rmtree(temp_dest) + + except Exception as e: + if os.path.exists(temp_dest): + if os.path.exists(dest): + rmtree(dest) + os.rename(temp_dest, dest) + + print( + f"Something went wrong during data copying, aborting." + "Note an entry in the registry database will still have" + "been created" + ) + + raise Exception(e) diff --git a/tests/unit_tests/test_rutil_copy_data.py b/tests/unit_tests/test_rutil_copy_data.py new file mode 100644 index 00000000..91a20c38 --- /dev/null +++ b/tests/unit_tests/test_rutil_copy_data.py @@ -0,0 +1,72 @@ +import pytest +import os +from dataregistry.registrar_util import _copy_data + + +@pytest.fixture +def dummy_file(tmp_path): + """Create some dummy (temporary) files and directories""" + + # Root temp source dir for files + tmp_src_dir = tmp_path / "source" + tmp_src_dir.mkdir() + + # Make a dummy file + p = tmp_src_dir / "dummy_standalone_file.txt" + p.write_text("dummy stand alone file") + + # Make a dummy directory with a file in it + p = tmp_src_dir / "tmpdir" / "tmpdir2" + p.mkdir(parents=True) + q = p / "dummy_file_within_folder.txt" + q.write_text("dummy file within folder") + + # Root temp dest dir to copy into + tmp_dest_dir = tmp_path / "dest" + tmp_dest_dir.mkdir() + + return tmp_src_dir, tmp_dest_dir + + +def test_copy_file(dummy_file): + """ + Test copying files and directories + + Each test is looped twice, the 2nd emulating overwriting a dataset. + """ + + tmp_src_dir, tmp_dest_dir = dummy_file + + # Copy a single file from source to destination + for i in range(2): + _copy_data( + "file", + str(tmp_src_dir / "dummy_standalone_file.txt"), + str(tmp_dest_dir / "dummy_standalone_file.txt"), + ) + + p = tmp_dest_dir / "dummy_standalone_file.txt" + assert os.path.isfile(p) + assert p.read_text() == "dummy stand alone file" + + +def test_copy_directory(dummy_file): + """ + Test copying files and directories + + Each test is looped twice, the 2nd emulating overwriting a dataset. 
+ """ + + tmp_src_dir, tmp_dest_dir = dummy_file + + # Copy a single directory from source to destination + for i in range(2): + _copy_data( + "directory", str(tmp_src_dir / "tmpdir"), str(tmp_dest_dir / "tmpdir") + ) + + assert os.path.isdir(tmp_dest_dir / "tmpdir") + assert os.path.isdir(tmp_dest_dir / "tmpdir" / "tmpdir2") + p = tmp_dest_dir / "tmpdir" / "tmpdir2" / "dummy_file_within_folder.txt" + assert os.path.isfile(p) + assert p.read_text() == "dummy file within folder"