Skip to content

Commit

Permalink
Speedwagon-500 Speedwagon Medusa Pre Curation script error
Browse files Browse the repository at this point in the history
  • Loading branch information
henryborchers committed Dec 7, 2023
1 parent 6520eab commit 319bb37
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 91 deletions.
107 changes: 73 additions & 34 deletions speedwagon/workflows/workflow_medusa_preingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import abc
import os
import typing
from typing import List, Any, Dict, Optional, Iterator, Union, Callable
from pathlib import Path
import speedwagon
Expand Down Expand Up @@ -67,19 +68,37 @@ def discover_task_metadata(
) -> List[dict]:
"""Organize the order the files & directories should be removed."""
new_tasks: List[Dict[str, str]] = []
to_remove: typing.Set[str] = set()

for file_path in additional_data["files"]:
new_tasks.append({
"type": "file",
"path": file_path
})

for directory_path in additional_data['directories']:
new_tasks.append({
"type": "directory",
"path": directory_path
})

for item in additional_data.get('to remove', []):
if str(item) in to_remove:
continue
if os.path.isdir(item):
for child_item in get_contents_of_folder_for_removal(item):
if str(child_item) in to_remove:
continue
if child_item.is_dir():
new_tasks.append({
"type": "directory",
"path": str(child_item)
})
elif child_item.is_file():
new_tasks.append({
"type": "file",
"path": str(child_item),
})
else:
raise RuntimeError(
f'not sure what to do. "{child_item}" is not '
f'considered a file or a directory.'
)
to_remove.add(str(child_item))
elif os.path.isfile(item):
new_tasks.append({
"type": "file",
"path": item,
})
to_remove.add(item)
return new_tasks

def get_additional_info(
Expand All @@ -92,9 +111,13 @@ def get_additional_info(
confirm = \
user_request_factory.confirm_removal()

return self.sort_item_data(
confirm.get_user_response(options, pretask_results)['items']
)
return {
"to remove": [
os.path.join(str(options['Path']), item)
for item in
confirm.get_user_response(options, pretask_results)['items']
]
}

@staticmethod
def sort_item_data(data: List[str]) -> Dict[str, List[str]]:
Expand Down Expand Up @@ -232,6 +255,14 @@ def is_valid(self, path: Path) -> bool:
return not path.name.startswith("._")


class CaptureOneChecker(AbsChecker):
    """Checker that rejects directories named exactly "CaptureOne"."""

    def is_valid(self, path: Path) -> bool:
        """Report whether ``path`` passes this check.

        Args:
            path: File system item to evaluate.

        Returns:
            False only when ``path`` is a directory whose name is exactly
            "CaptureOne"; True for every other item, including a
            non-directory item that happens to carry that name.
        """
        is_capture_one_folder = path.is_dir() and path.name == "CaptureOne"
        return not is_capture_one_folder


class OffendingPathDecider(AbsPathItemDecision):

def __init__(self) -> None:
Expand All @@ -253,10 +284,11 @@ def __init__(self, **user_args) -> None:
self.root: str = user_args['Path']
self._include_subdirectory = user_args['Include Subdirectories']

self._locate_capture_one: bool = \
user_args['Locate and delete Capture One files']
self.file_deciding_strategy = OffendingPathDecider()

if user_args['Locate and delete Capture One files']:
self.file_deciding_strategy.add_checker(CaptureOneChecker())

if user_args['Locate and delete dot underscore files']:
self.file_deciding_strategy.add_checker(DotUnderScoreChecker())

Expand All @@ -277,7 +309,9 @@ def locate_results(self) -> List[str]:
return [
item
for item in self.filesystem_locator_strategy.locate(self.root)
if self.file_deciding_strategy.is_offending(Path(item))
if self.file_deciding_strategy.is_offending(
Path(self.root) / Path(item)
)
]


Expand All @@ -286,24 +320,9 @@ class FilesystemItemLocator:
def locate(self, path: str) -> Iterator[str]:
if not os.path.exists(path):
raise FileNotFoundError(f"Could not find {path}")
for item in self._locate_contents(path):
for item in get_contents_of_folder_for_removal(path):
yield os.path.relpath(item, start=path)

def _locate_contents(self, path: str) -> Iterator[str]:
"""Locate files and folders in the path.
This function guarantees that the content of a folder is listed before
the folder itself. This is to help delete items in the right order.
"""
files = []
for item in os.scandir(path):
if item.is_dir():
yield from self._locate_contents(item.path)
else:
files.append(item.path)
yield from files
yield path


def find_capture_one_data(directory: str) -> Iterator[str]:
potential_capture_one_dir_name = \
Expand All @@ -316,3 +335,23 @@ def find_capture_one_data(directory: str) -> Iterator[str]:
for dir_name in dirs:
yield os.path.join(root, dir_name)
yield potential_capture_one_dir_name


def get_contents_of_folder_for_removal(
    root: Union[Path, str]
) -> Iterator[Path]:
    """Locate files and folders in the path.

    This function guarantees that the content of a folder is listed before
    the folder itself. This is to help delete items in the right order.

    Symbolic links are reported as plain entries and are never descended
    into. Following them would list content that lives outside of ``root``
    for removal, and a link pointing back up the tree would cause the same
    content to be listed over and over.

    Args:
        root: Folder to search.

    Yields:
        Every item found below ``root``. For each folder, the content of
        its sub-folders comes first, then the folder's remaining entries,
        and finally the folder itself, so ``root`` is always the last item.

    Raises:
        OSError: Propagated from ``Path.iterdir`` when ``root`` cannot be
            listed, e.g. it does not exist or is not a directory.
    """
    root = Path(root)
    # Non-folder entries are held back until every sub-folder of this level
    # has been fully reported.
    files: List[Path] = []

    for item in root.iterdir():
        # is_dir() follows symbolic links, so links have to be excluded
        # explicitly to keep the search inside of root.
        if item.is_dir() and not item.is_symlink():
            yield from get_contents_of_folder_for_removal(item)
        else:
            files.append(item)
    yield from files
    yield root
121 changes: 64 additions & 57 deletions tests/workflows/test_medua_preingest_curation.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os.path
import pathlib
from unittest.mock import Mock, MagicMock

import pytest
Expand Down Expand Up @@ -58,13 +59,44 @@ def test_sort_item_data(self, workflow, monkeypatch):
"directories": ["./some/directory/"],
}

def test_discover_task_metadata(self, workflow, default_args):
def test_discover_task_metadata(self, workflow, default_args, monkeypatch):
initial_results = []

removal_files = ["somefile.txt"]
removal_dirs = ["somedir", "nested_path"]
nested_dirs = [os.path.join("nested_path", "a")]

def isfile(path):
return path in removal_files

def isdir(path):
return path in removal_dirs

def iterdir(self):
if self.name == "nested_path":
return [pathlib.Path(a) for a in nested_dirs]
return []

def is_dir(self):
return self.name in removal_dirs or self.name == os.path.join(
"nested_path", "a"
)

def is_file(self):
return self.name in removal_files + ["a"]

monkeypatch.setattr(workflow_medusa_preingest.os.path, "isdir", isdir)
monkeypatch.setattr(
workflow_medusa_preingest.os.path, "isfile", isfile
)
monkeypatch.setattr(workflow_medusa_preingest.Path, "iterdir", iterdir)
monkeypatch.setattr(workflow_medusa_preingest.Path, "is_dir", is_dir)
monkeypatch.setattr(workflow_medusa_preingest.Path, "is_file", is_file)

new_tasks = workflow.discover_task_metadata(
initial_results,
additional_data={
"files": ["somefile.txt"],
"directories": ["somedir"],
"to remove": removal_files + removal_dirs + nested_dirs
},
**default_args,
)
Expand Down Expand Up @@ -252,18 +284,22 @@ def test_locate_results_throws_file_not_found_if_not_exists(self):
def offending_files(self):
def _make_mock_offending_files(search_path):
ds_store_file = Mock(
os.DirEntry,
name=".DS_Store",
path=os.path.join(search_path, ".DS_Store"),
is_file=Mock(return_value=True),
is_dir=Mock(return_value=False),
spec_set=pathlib.Path,
# name='.DS_Store',
# parent=pathlib.Path(os.path.join(search_path, "starting_point", "empty_path")),
parts=(search_path, ".DS_Store"),
is_dir=Mock(return_value=True),
is_file=Mock(return_value=False),
)
ds_store_file.name = ".DS_Store"

dot_under_score_file = Mock(
path=os.path.join(search_path, "._cache"),
spec_set=pathlib.Path,
parts=(search_path, "._cache"),
is_dir=Mock(return_value=False),
is_file=Mock(return_value=True),
)

dot_under_score_file.name = "._cache"

return [ds_store_file, dot_under_score_file]
Expand Down Expand Up @@ -304,9 +340,13 @@ def test_locate_offending_files(
)

task.filesystem_locator_strategy.locate = Mock(
return_value=[i.path for i in offending_files(search_path)]
return_value=[
os.path.join(*i.parts) for i in offending_files(search_path)
]
)
monkeypatch.setattr(
workflow_medusa_preingest.os.path, "exists", lambda: True
)

if expected_file:
assert os.path.join(search_path, expected_file) in list(
task.locate_results()
Expand Down Expand Up @@ -384,64 +424,31 @@ def walk(top, *args, **kwargs):
class TestFilesystemItemLocator:
def test_locate_contents_order(self, monkeypatch):
files = {
os.path.join("."): [
Mock(
spec_set=os.DirEntry,
path=os.path.join(".", "starting_point"),
is_dir=Mock(return_value=True),
)
],
os.path.join(".", "starting_point", "empty_path"): [],
os.path.join(".", "starting_point", "nested_path"): [
Mock(
spec_set=os.DirEntry,
path=os.path.join(
".", "starting_point", "nested_path", "file1.txt"
),
is_dir=Mock(return_value=False),
),
Mock(
spec_set=os.DirEntry,
path=os.path.join(
".", "starting_point", "nested_path", "file2.txt"
),
is_dir=Mock(return_value=False),
),
],
os.path.join(".", "starting_point"): [
Mock(
spec_set=os.DirEntry,
path=os.path.join(".", "starting_point", "empty_path"),
is_dir=Mock(return_value=True),
),
Mock(
spec_set=os.DirEntry,
path=os.path.join(".", "starting_point", "dummy.txt"),
is_dir=Mock(return_value=False),
),
Mock(
spec_set=os.DirEntry,
path=os.path.join(".", "starting_point", "nested_path"),
is_dir=Mock(return_value=True),
),
"starting_point": [
pathlib.Path("starting_point/empty_path"),
pathlib.Path("starting_point/nested_path/file1.txt"),
pathlib.Path("starting_point/nested_path/file2.txt"),
pathlib.Path("starting_point/nested_path"),
pathlib.Path("starting_point/dummy.txt"),
],
}

def scandir(path, **kwargs):
return files[path]
def iterdir(self):
return files[str(self)]

monkeypatch.setattr(
workflow_medusa_preingest.os.path,
"exists",
lambda path: path in files,
)
monkeypatch.setattr(workflow_medusa_preingest.os, "scandir", scandir)

monkeypatch.setattr(workflow_medusa_preingest.Path, "iterdir", iterdir)
locator = workflow_medusa_preingest.FilesystemItemLocator()
assert list(locator.locate(os.path.join(".", "starting_point"))) == [
assert set(locator.locate("starting_point")) == {
"empty_path",
os.path.join("nested_path", "file1.txt"),
os.path.join("nested_path", "file2.txt"),
os.path.join("nested_path", "file1.txt"),
"nested_path",
"dummy.txt",
os.path.join("."),
]
}

0 comments on commit 319bb37

Please sign in to comment.