diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..99ca682 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,6 @@ +[report] +exclude_lines = + pragma: no cover + -no-cov- + raise NotImplementedError + \.\.\. diff --git a/exorcist/__init__.py b/exorcist/__init__.py index e69de29..621d53f 100644 --- a/exorcist/__init__.py +++ b/exorcist/__init__.py @@ -0,0 +1,2 @@ +# from .taskdb import TaskStatusDB, NoStatusChange +from .models import TaskStatus, Task diff --git a/exorcist/example_client.py b/exorcist/example_client.py new file mode 100644 index 0000000..2f1b3be --- /dev/null +++ b/exorcist/example_client.py @@ -0,0 +1,111 @@ +""" +Simple example of implementing Exorcist in client code. + +This is useful both to illustrate how client code for Exocist can be +written, as well as to be used in our test suite. +""" + + +import dataclasses +import pathlib +import json +import pickle + +from functools import partial + +from .resultstore import ResultStore +from .taskstore import TaskDetailsStore + +from typing import Callable, Union + +@dataclasses.dataclass +class ExampleResult: + """Result class. + + This doesn't have to be a user-defined class; for example, it could be a + dict with an expected structure. + """ + label: str + main_result: Union[float, str] + is_failure: bool + + +@dataclasses.dataclass +class ExampleTaskDetails: + label: str + input_result_labels: dict[str, str] + """keys are variable names, values are the label where result is stored + """ + task_func: Callable + + def _extract_main_result(self, resultfile): + with open(resultfile, mode='r') as f: + result = ExampleResult(**json.load(f)) + + return result.main_result + + def run_task(self, directory): + inputs = { + key: self._extract_main_result(directory / f"{inp}_result.json") + for key, inp in self.input_result_labels.items() + } + try: + main_result = self.task_func(**inputs) + is_failure = False + except Exception as e: + main_result = str(e) + is_failure = True + + return ExampleResult( + label=self.label, + main_result=main_result, + is_failure=is_failure + ) + + +class ExampleResultStore(ResultStore): + """Example of a ResultStore + + This stores :class:`.ExampleResult`\ s as JSON files in a given + directory. + """ + def __init__(self, directory): + self.directory = directory + + def is_failure_result(self, result: ExampleResult) -> bool: + return result.is_failure + + def store_result(self, result: ExampleResult, retry: int = 0): + # the idea here is that there is only ever one successful result for + # a given task, but there may be many failures -- we label the + # failures by trial numbers. This way we save both successes and + # failures. + if self.is_failure_result(result): + path = self.directory / f"{result.label}_result_{retry}.json" + else: + path = self.directory / f"{result.label}_result.json" + + with open(path, mode='w') as f: + f.write(json.dumps(dataclasses.asdict(result))) + + +class ExampleTaskDetailsStore(TaskDetailsStore): + """Example of a TaskDetailsStore + + """ + def __init__(self, directory): + self.directory = directory + + def store_task_details(self, taskid: str, + task_details: ExampleTaskDetails): + with open(self.directory / f"{taskid}.p", mode='wb') as f: + pickle.dump(task_details, f) + + def load_task_details(self, taskid: str) -> ExampleTaskDetails: + with open(self.directory / f"{taskid}.p", mode='rb') as f: + task_details = pickle.load(f) + + return task_details + + def run_task(self, task_details: ExampleTaskDetails) -> ExampleResult: + return task_details.run_task(directory=self.directory) diff --git a/exorcist/models.py b/exorcist/models.py index 79f0891..7f1788d 100644 --- a/exorcist/models.py +++ b/exorcist/models.py @@ -1,23 +1,31 @@ from enum import Enum -from dataclasses import dataclass -from datetime import datetime +from typing import TypeVar, Generic +import dataclasses + +# generics: the actual types here depend on the client library +Result = TypeVar("Result") +TaskDetails = TypeVar("TaskDetails") class TaskStatus(Enum): + """ + Status of a given task. + """ BLOCKED = 0 AVAILABLE = 1 IN_PROGRESS = 2 RESULTS_READY = 3 COMPLETED = 99 + TOO_MANY_RETRIES = -2 ERROR = -1 -@dataclass -class Task: - taskid: str - taskfile: str - last_modified: datetime - n_retries: int +# TODO: it isn't entirely clear to me that this is needed, or that this is +# the right way to do it. but I wanted to capture the way to handle typing +# of something like this +@dataclasses.dataclass +class Task(Generic[TaskDetails]): + """Generic to contain taskid and the client-specific TaskDetails. - @classmethod - def from_db_row(row): - ... + """ + taskid: str + task_details: TaskDetails diff --git a/exorcist/resultstore.py b/exorcist/resultstore.py new file mode 100644 index 0000000..f27415e --- /dev/null +++ b/exorcist/resultstore.py @@ -0,0 +1,26 @@ +import abc + +from typing import Generic +from .models import Result + + +class ResultStore(abc.ABC, Generic[Result]): + """Result storage. + + Client code must provide a storage object that conforms to this abstract + API. + """ + @abc.abstractmethod + def is_failure_result(self, result: Result) -> bool: + """Test whether this result represents a failed run. + + This allows failures (e.g., raising exceptions) to be treated as + first-class results, which can then be introspected by the users. + """ + raise NotImplementedError() + + @abc.abstractmethod + def store_result(self, result: Result, retry: int = 0): + """Store a result to permanent storage. + """ + raise NotImplementedError() diff --git a/exorcist/taskstore.py b/exorcist/taskstore.py new file mode 100644 index 0000000..fd563b5 --- /dev/null +++ b/exorcist/taskstore.py @@ -0,0 +1,26 @@ +import abc + +from typing import Generic, Callable +from .models import TaskDetails, Result + +class TaskDetailsStore(abc.ABC, Generic[TaskDetails, Result]): + """Task details storage. + + Client code must provide a storage object that conforms to this abstract + API. + """ + @abc.abstractmethod + def store_task_details(self, taskid: str, task_details: TaskDetails): + """Store the given task details. + """ + raise NotImplementedError() + + @abc.abstractmethod + def load_task_details(self, taskid: str) -> TaskDetails: + """Load the task details from disk.""" + raise NotImplementedError() + + @abc.abstractmethod + def run_task(self, task_details: TaskDetails) -> Result: + """Run the task, based on the given details""" + raise NotImplementedError() diff --git a/exorcist/tests/test_example_client.py b/exorcist/tests/test_example_client.py new file mode 100644 index 0000000..2e57d27 --- /dev/null +++ b/exorcist/tests/test_example_client.py @@ -0,0 +1,126 @@ +import pytest +import json +import pickle + +from exorcist.example_client import ( + ExampleResult, ExampleTaskDetails, ExampleResultStore, + ExampleTaskDetailsStore, +) + +@pytest.fixture +def example_result(): + return ExampleResult( + label="foo", + main_result=3.0, + is_failure=False + ) + +@pytest.fixture +def failure_result(): + return ExampleResult( + label="foo", + main_result="float division by zero", + is_failure=True + ) + + +# simple functions used as our tasks (need to be named functions here +# because pickle doesn't like lambdas) +def incr(x): + return x + 1.0 + +def return_1(): + return 1.0 + +def failure_func(): + return 1.0/0.0 + + +@pytest.fixture +def example_details(): + return ExampleTaskDetails( + label="foo", + input_result_labels={}, + task_func=return_1 + ) + +@pytest.fixture +def result_store(tmp_path): + return ExampleResultStore(tmp_path) + +@pytest.fixture +def task_store(tmp_path): + return ExampleTaskDetailsStore(tmp_path) + + +class TestExampleResultStore: + @pytest.mark.parametrize('fixture', [ + 'example_result', 'failure_result' + ]) + def test_is_failure_result(self, result_store, request, fixture): + result = request.getfixturevalue(fixture) + expected = (fixture == 'failure_result') + assert result_store.is_failure_result(result) == expected + + @pytest.mark.parametrize('fixture', [ + 'example_result', 'failure_result' + ]) + def test_store_result(self, result_store, request, fixture): + result = request.getfixturevalue(fixture) + result_store.store_result(result, retry=5) + filename = { + "example_result": "foo_result.json", + "failure_result": "foo_result_5.json", + }[fixture] + path = result_store.directory / filename + assert path.exists() + with open(path) as f: + dct = json.load(f) + + recreated = ExampleResult(**dct) + assert result == recreated + + +class TestExampleTaskDetailsStore: + def test_store_load_task_details_cycle(self, task_store, + example_details): + task_store.store_task_details(example_details.label, + example_details) + path = task_store.directory / f"{example_details.label}.p" + assert path.exists() + reloaded = task_store.load_task_details(example_details.label) + assert reloaded == example_details + + def test_run_task(self, task_store, example_details): + result = task_store.run_task(example_details) + assert not result.is_failure + assert result.main_result == 1.0 + assert result.label == "foo" + + def test_run_task_failing_result(self, task_store): + details = ExampleTaskDetails( + label="baz", + input_result_labels={}, + task_func=failure_func + ) + result = task_store.run_task(details) + assert result.is_failure + assert result.main_result == "float division by zero" + assert result.label == "baz" + + def test_workflow(self, task_store, result_store, example_details): + # depends on store_task_details working + example_details_2 = ExampleTaskDetails( + label="bar", + input_result_labels={'x': "foo"}, + task_func=incr + ) + + # manually do the work of the worker here + result1 = task_store.run_task(example_details) + assert not result_store.is_failure_result(result1) + result_store.store_result(result1) + result2 = task_store.run_task(example_details_2) + assert not result_store.is_failure_result(result2) + + assert result2.main_result == 2.0 diff --git a/exorcist/tests/test_nothing.py b/exorcist/tests/test_nothing.py deleted file mode 100644 index 99bb93d..0000000 --- a/exorcist/tests/test_nothing.py +++ /dev/null @@ -1,4 +0,0 @@ -import exorcist - -def test_temporary_to_make_pytest_pass(): - ...