diff --git a/speedwagon/workflows/workflow_medusa_preingest.py b/speedwagon/workflows/workflow_medusa_preingest.py
index b83cd481c..b86935a50 100644
--- a/speedwagon/workflows/workflow_medusa_preingest.py
+++ b/speedwagon/workflows/workflow_medusa_preingest.py
@@ -7,6 +7,7 @@
 
 import abc
 import os
+import typing
 from typing import List, Any, Dict, Optional, Iterator, Union, Callable
 from pathlib import Path
 import speedwagon
@@ -67,19 +68,37 @@ def discover_task_metadata(
     ) -> List[dict]:
         """Organize the order the files & directories should be removed."""
         new_tasks: List[Dict[str, str]] = []
+        to_remove: typing.Set[str] = set()
 
-        for file_path in additional_data["files"]:
-            new_tasks.append({
-                "type": "file",
-                "path": file_path
-            })
-
-        for directory_path in additional_data['directories']:
-            new_tasks.append({
-                "type": "directory",
-                "path": directory_path
-            })
-
+        for item in additional_data.get('to remove', []):
+            if str(item) in to_remove:
+                continue
+            if os.path.isdir(item):
+                for child_item in get_contents_of_folder_for_removal(item):
+                    if str(child_item) in to_remove:
+                        continue
+                    if child_item.is_dir():
+                        new_tasks.append({
+                            "type": "directory",
+                            "path": str(child_item)
+                        })
+                    elif child_item.is_file():
+                        new_tasks.append({
+                            "type": "file",
+                            "path": str(child_item),
+                        })
+                    else:
+                        raise RuntimeError(
+                            f'not sure what to do. "{child_item}" is not '
+                            f'considered a file or a directory.'
+                        )
+                    to_remove.add(str(child_item))
+            elif os.path.isfile(item):
+                new_tasks.append({
+                    "type": "file",
+                    "path": str(item),
+                })
+                to_remove.add(str(item))
         return new_tasks
 
     def get_additional_info(
@@ -92,9 +111,13 @@ def get_additional_info(
         confirm = \
             user_request_factory.confirm_removal()
 
-        return self.sort_item_data(
-            confirm.get_user_response(options, pretask_results)['items']
-        )
+        return {
+            "to remove": [
+                os.path.join(str(options['Path']), item)
+                for item in
+                confirm.get_user_response(options, pretask_results)['items']
+            ]
+        }
 
     @staticmethod
     def sort_item_data(data: List[str]) -> Dict[str, List[str]]:
@@ -232,6 +255,14 @@ def is_valid(self, path: Path) -> bool:
         return not path.name.startswith("._")
 
 
+class CaptureOneChecker(AbsChecker):
+
+    def is_valid(self, path: Path) -> bool:
+        if not path.is_dir():
+            return True
+        return path.name != "CaptureOne"
+
+
 class OffendingPathDecider(AbsPathItemDecision):
 
     def __init__(self) -> None:
@@ -253,10 +284,11 @@ def __init__(self, **user_args) -> None:
 
         self.root: str = user_args['Path']
         self._include_subdirectory = user_args['Include Subdirectories']
-        self._locate_capture_one: bool = \
-            user_args['Locate and delete Capture One files']
         self.file_deciding_strategy = OffendingPathDecider()
 
+        if user_args['Locate and delete Capture One files']:
+            self.file_deciding_strategy.add_checker(CaptureOneChecker())
+
         if user_args['Locate and delete dot underscore files']:
             self.file_deciding_strategy.add_checker(DotUnderScoreChecker())
 
@@ -277,7 +309,9 @@ def locate_results(self) -> List[str]:
         return [
             item for item in
             self.filesystem_locator_strategy.locate(self.root)
-            if self.file_deciding_strategy.is_offending(Path(item))
+            if self.file_deciding_strategy.is_offending(
+                Path(self.root) / Path(item)
+            )
         ]
 
 
@@ -286,24 +320,9 @@ class FilesystemItemLocator:
     def locate(self, path: str) -> Iterator[str]:
         if not os.path.exists(path):
             raise FileNotFoundError(f"Could not find {path}")
-        for item in self._locate_contents(path):
+        for item in get_contents_of_folder_for_removal(path):
             yield os.path.relpath(item, start=path)
 
-    def _locate_contents(self, path: str) -> Iterator[str]:
-        """Locate files and folders in the path.
-
-        This function guarantees that the content of a folder is listed before
-        the folder itself. This is to help delete items in the right order.
-        """
-        files = []
-        for item in os.scandir(path):
-            if item.is_dir():
-                yield from self._locate_contents(item.path)
-            else:
-                files.append(item.path)
-        yield from files
-        yield path
-
 
 def find_capture_one_data(directory: str) -> Iterator[str]:
     potential_capture_one_dir_name = \
@@ -316,3 +335,23 @@ def find_capture_one_data(directory: str) -> Iterator[str]:
             for dir_name in dirs:
                 yield os.path.join(root, dir_name)
         yield potential_capture_one_dir_name
+
+
+def get_contents_of_folder_for_removal(
+        root: Union[Path, str]
+) -> Iterator[Path]:
+    """Locate files and folders in the path.
+
+    This function guarantees that the content of a folder is listed before
+    the folder itself. This is to help delete items in the right order.
+    """
+    root = Path(root)
+    files = []
+
+    for item in root.iterdir():
+        if item.is_dir():
+            yield from get_contents_of_folder_for_removal(item)
+        else:
+            files.append(item)
+    yield from files
+    yield root
diff --git a/tests/workflows/test_medua_preingest_curation.py b/tests/workflows/test_medua_preingest_curation.py
index f9fdb067f..ca6c8b27e 100644
--- a/tests/workflows/test_medua_preingest_curation.py
+++ b/tests/workflows/test_medua_preingest_curation.py
@@ -1,4 +1,5 @@
 import os.path
+import pathlib
 from unittest.mock import Mock, MagicMock
 
 import pytest
@@ -58,13 +59,44 @@ def test_sort_item_data(self, workflow, monkeypatch):
             "directories": ["./some/directory/"],
         }
 
-    def test_discover_task_metadata(self, workflow, default_args):
+    def test_discover_task_metadata(self, workflow, default_args, monkeypatch):
         initial_results = []
+
+        removal_files = ["somefile.txt"]
+        removal_dirs = ["somedir", "nested_path"]
+        nested_dirs = [os.path.join("nested_path", "a")]
+
+        def isfile(path):
+            return path in removal_files
+
+        def isdir(path):
+            return path in removal_dirs
+
+        def iterdir(self):
+            if self.name == "nested_path":
+                return [pathlib.Path(a) for a in nested_dirs]
+            return []
+
+        def is_dir(self):
+            return self.name in removal_dirs or self.name == os.path.join(
+                "nested_path", "a"
+            )
+
+        def is_file(self):
+            return self.name in removal_files + ["a"]
+
+        monkeypatch.setattr(workflow_medusa_preingest.os.path, "isdir", isdir)
+        monkeypatch.setattr(
+            workflow_medusa_preingest.os.path, "isfile", isfile
+        )
+        monkeypatch.setattr(workflow_medusa_preingest.Path, "iterdir", iterdir)
+        monkeypatch.setattr(workflow_medusa_preingest.Path, "is_dir", is_dir)
+        monkeypatch.setattr(workflow_medusa_preingest.Path, "is_file", is_file)
+
         new_tasks = workflow.discover_task_metadata(
             initial_results,
             additional_data={
-                "files": ["somefile.txt"],
-                "directories": ["somedir"],
+                "to remove": removal_files + removal_dirs + nested_dirs
             },
             **default_args,
         )
@@ -252,18 +284,22 @@ def test_locate_results_throws_file_not_found_if_not_exists(self):
     def offending_files(self):
         def _make_mock_offending_files(search_path):
             ds_store_file = Mock(
-                os.DirEntry,
-                name=".DS_Store",
-                path=os.path.join(search_path, ".DS_Store"),
-                is_file=Mock(return_value=True),
-                is_dir=Mock(return_value=False),
+                spec_set=pathlib.Path,
+                # name='.DS_Store',
+                # parent=pathlib.Path(os.path.join(search_path, "starting_point", "empty_path")),
+                parts=(search_path, ".DS_Store"),
+                is_dir=Mock(return_value=True),
+                is_file=Mock(return_value=False),
             )
             ds_store_file.name = ".DS_Store"
 
             dot_under_score_file = Mock(
-                path=os.path.join(search_path, "._cache"),
+                spec_set=pathlib.Path,
+                parts=(search_path, "._cache"),
+                is_dir=Mock(return_value=False),
                 is_file=Mock(return_value=True),
             )
+            dot_under_score_file.name = "._cache"
 
             return [ds_store_file, dot_under_score_file]
 
@@ -304,9 +340,13 @@ def test_locate_offending_files(
         )
 
         task.filesystem_locator_strategy.locate = Mock(
-            return_value=[i.path for i in offending_files(search_path)]
+            return_value=[
+                os.path.join(*i.parts) for i in offending_files(search_path)
+            ]
+        )
+        monkeypatch.setattr(
+            workflow_medusa_preingest.os.path, "exists", lambda path: True
         )
-
         if expected_file:
             assert os.path.join(search_path, expected_file) in list(
                 task.locate_results()
@@ -384,64 +424,31 @@ def walk(top, *args, **kwargs):
 class TestFilesystemItemLocator:
     def test_locate_contents_order(self, monkeypatch):
         files = {
-            os.path.join("."): [
-                Mock(
-                    spec_set=os.DirEntry,
-                    path=os.path.join(".", "starting_point"),
-                    is_dir=Mock(return_value=True),
-                )
-            ],
-            os.path.join(".", "starting_point", "empty_path"): [],
-            os.path.join(".", "starting_point", "nested_path"): [
-                Mock(
-                    spec_set=os.DirEntry,
-                    path=os.path.join(
-                        ".", "starting_point", "nested_path", "file1.txt"
-                    ),
-                    is_dir=Mock(return_value=False),
-                ),
-                Mock(
-                    spec_set=os.DirEntry,
-                    path=os.path.join(
-                        ".", "starting_point", "nested_path", "file2.txt"
-                    ),
-                    is_dir=Mock(return_value=False),
-                ),
-            ],
-            os.path.join(".", "starting_point"): [
-                Mock(
-                    spec_set=os.DirEntry,
-                    path=os.path.join(".", "starting_point", "empty_path"),
-                    is_dir=Mock(return_value=True),
-                ),
-                Mock(
-                    spec_set=os.DirEntry,
-                    path=os.path.join(".", "starting_point", "dummy.txt"),
-                    is_dir=Mock(return_value=False),
-                ),
-                Mock(
-                    spec_set=os.DirEntry,
-                    path=os.path.join(".", "starting_point", "nested_path"),
-                    is_dir=Mock(return_value=True),
-                ),
+            "starting_point": [
+                pathlib.Path("starting_point/empty_path"),
+                pathlib.Path("starting_point/nested_path/file1.txt"),
+                pathlib.Path("starting_point/nested_path/file2.txt"),
+                pathlib.Path("starting_point/nested_path"),
+                pathlib.Path("starting_point/dummy.txt"),
             ],
         }
 
-        def scandir(path, **kwargs):
-            return files[path]
+        def iterdir(self):
+            return files[str(self)]
 
         monkeypatch.setattr(
             workflow_medusa_preingest.os.path,
             "exists",
             lambda path: path in files,
         )
-        monkeypatch.setattr(workflow_medusa_preingest.os, "scandir", scandir)
+
+        monkeypatch.setattr(workflow_medusa_preingest.Path, "iterdir", iterdir)
         locator = workflow_medusa_preingest.FilesystemItemLocator()
-        assert list(locator.locate(os.path.join(".", "starting_point"))) == [
+        assert set(locator.locate("starting_point")) == {
             "empty_path",
-            os.path.join("nested_path", "file1.txt"),
             os.path.join("nested_path", "file2.txt"),
+            os.path.join("nested_path", "file1.txt"),
             "nested_path",
             "dummy.txt",
             os.path.join("."),
-        ]
+        }