Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Base classes and example for client-side code #10

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
6 changes: 6 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[report]
exclude_lines =
pragma: no cover
-no-cov-
raise NotImplementedError
\.\.\.
2 changes: 2 additions & 0 deletions exorcist/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# from .taskdb import TaskStatusDB, NoStatusChange
from .models import TaskStatus, Task
111 changes: 111 additions & 0 deletions exorcist/example_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""
Simple example of implementing Exorcist in client code.

This is useful both as an illustration of how client code for Exorcist can
be written and as a working implementation for our test suite.
"""


import dataclasses
import pathlib
import json
import pickle

from functools import partial

from .resultstore import ResultStore
from .taskstore import TaskDetailsStore

from typing import Callable, Union

@dataclasses.dataclass
class ExampleResult:
    """Outcome of running one example task.

    A dedicated class is a convenience rather than a requirement: any
    object with an agreed-upon structure (a plain dict, for instance)
    could play the same role.
    """
    # identifier of the task that produced this result
    label: str
    # value returned by the task function, or the error message (as a
    # string) when the task function raised
    main_result: Union[float, str]
    # True when this result records a failed run
    is_failure: bool


@dataclasses.dataclass
class ExampleTaskDetails:
    """Description of one example task and how to run it.

    The task is a plain callable (``task_func``). Its keyword arguments
    are filled from the ``main_result`` of earlier results, which are read
    from ``<label>_result.json`` files in the directory given to
    :meth:`run_task`.
    """
    label: str
    # keys are variable names (keyword arguments of ``task_func``), values
    # are the label where the corresponding result is stored
    input_result_labels: dict[str, str]
    task_func: Callable

    def _extract_main_result(self, resultfile):
        """Return the ``main_result`` stored in the JSON file ``resultfile``.

        The file contents are passed through :class:`ExampleResult`, so a
        file with missing or unexpected keys raises ``TypeError``.
        """
        with open(resultfile, mode='r', encoding='utf-8') as f:
            result = ExampleResult(**json.load(f))

        return result.main_result

    def run_task(self, directory):
        """Run ``task_func`` using inputs loaded from ``directory``.

        Parameters
        ----------
        directory : str or pathlib.Path
            Directory containing the ``<label>_result.json`` files for
            every label in ``input_result_labels``.

        Returns
        -------
        ExampleResult
            A successful result carrying the return value of
            ``task_func``, or a failure result carrying ``str(exc)`` if
            ``task_func`` raised.

        Raises
        ------
        OSError
            If an input result file cannot be opened. Input loading is
            deliberately outside the ``try`` block, so this is not turned
            into a failure result.
        """
        # accept plain strings as well as Path objects
        directory = pathlib.Path(directory)
        inputs = {
            key: self._extract_main_result(directory / f"{inp}_result.json")
            for key, inp in self.input_result_labels.items()
        }
        try:
            main_result = self.task_func(**inputs)
        except Exception as e:
            # deliberate catch-all: a failing task is reported as a
            # first-class result rather than crashing the caller
            main_result = str(e)
            is_failure = True
        else:
            is_failure = False

        return ExampleResult(
            label=self.label,
            main_result=main_result,
            is_failure=is_failure
        )


class ExampleResultStore(ResultStore):
    r"""Example of a ResultStore

    This stores :class:`.ExampleResult`\ s as JSON files in a given
    directory.

    Parameters
    ----------
    directory : str or pathlib.Path
        Directory in which result files are written.
    """
    # NOTE: the docstring above is a raw string because it contains the
    # reST escape ``\ ``, which is an invalid escape in a normal string.

    def __init__(self, directory):
        # accept plain strings as well as Path objects; store_result
        # joins paths with the ``/`` operator
        self.directory = pathlib.Path(directory)

    def is_failure_result(self, result: ExampleResult) -> bool:
        """Return True if ``result`` records a failed run."""
        return result.is_failure

    def store_result(self, result: ExampleResult, retry: int = 0):
        """Write ``result`` as JSON into the store's directory.

        Successful results go to ``<label>_result.json``; failures go to
        ``<label>_result_<retry>.json``.
        """
        # the idea here is that there is only ever one successful result for
        # a given task, but there may be many failures -- we label the
        # failures by trial numbers. This way we save both successes and
        # failures.
        if self.is_failure_result(result):
            path = self.directory / f"{result.label}_result_{retry}.json"
        else:
            path = self.directory / f"{result.label}_result.json"

        with open(path, mode='w', encoding='utf-8') as f:
            json.dump(dataclasses.asdict(result), f)


class ExampleTaskDetailsStore(TaskDetailsStore):
    """Example of a TaskDetailsStore

    Task details are pickled to ``<taskid>.p`` files in a given directory.
    The same directory is passed to the task details when running a task.

    Parameters
    ----------
    directory : str or pathlib.Path
        Directory holding the pickled task details.
    """
    def __init__(self, directory):
        # accept plain strings as well as Path objects; the methods below
        # join paths with the ``/`` operator
        self.directory = pathlib.Path(directory)

    def _details_path(self, taskid: str):
        """Return the path of the pickle file for ``taskid``."""
        return self.directory / f"{taskid}.p"

    def store_task_details(self, taskid: str,
                           task_details: ExampleTaskDetails):
        """Pickle ``task_details`` to ``<taskid>.p``."""
        with open(self._details_path(taskid), mode='wb') as f:
            pickle.dump(task_details, f)

    def load_task_details(self, taskid: str) -> ExampleTaskDetails:
        """Unpickle and return the task details stored for ``taskid``.

        Raises
        ------
        OSError
            If no pickle file for ``taskid`` can be opened.
        """
        # SECURITY: unpickling can execute arbitrary code. Only point this
        # store at directories whose contents are trusted.
        with open(self._details_path(taskid), mode='rb') as f:
            task_details = pickle.load(f)

        return task_details

    def run_task(self, task_details: ExampleTaskDetails) -> ExampleResult:
        """Run the task, reading its inputs from this store's directory."""
        return task_details.run_task(directory=self.directory)
30 changes: 19 additions & 11 deletions exorcist/models.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,31 @@
from enum import Enum
from dataclasses import dataclass
from datetime import datetime
from typing import TypeVar, Generic
import dataclasses

# generics: the actual types here depend on the client library
Result = TypeVar("Result")
TaskDetails = TypeVar("TaskDetails")

class TaskStatus(Enum):
    """Enumeration of the states a task can be in.

    The two error-like states (``ERROR`` and ``TOO_MANY_RETRIES``) carry
    negative values; all other states are non-negative.
    """
    BLOCKED = 0
    AVAILABLE = 1
    IN_PROGRESS = 2
    RESULTS_READY = 3
    COMPLETED = 99
    TOO_MANY_RETRIES = -2
    ERROR = -1


@dataclass
class Task:
    """Flat task record: id, task file, modification time, retry count.

    NOTE(review): a second class named ``Task`` is defined further down in
    this module and rebinds the name; this looks like the pre-change
    version of the class -- confirm whether it should still exist.
    """
    taskid: str
    taskfile: str
    last_modified: datetime
    n_retries: int
# TODO: it isn't entirely clear to me that this is needed, or that this is
# the right way to do it. but I wanted to capture the way to handle typing
# of something like this
@dataclasses.dataclass
class Task(Generic[TaskDetails]):
    """Pairing of a task id with the client-specific task details.

    The class is generic in ``TaskDetails`` so that client libraries can
    type the ``task_details`` attribute with their own details class.

    NOTE(review): the previous docstring also contained a stray
    ``@classmethod def from_db_row(row): ...`` stub, presumably left over
    from an earlier version of this class -- confirm whether such an
    alternate constructor is still planned.
    """
    taskid: str
    task_details: TaskDetails
26 changes: 26 additions & 0 deletions exorcist/resultstore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import abc

from typing import Generic
from .models import Result


class ResultStore(abc.ABC, Generic[Result]):
    """Abstract interface for result storage.

    Client code supplies a concrete subclass implementing every abstract
    method below; the type of the stored results is left to the client
    through the ``Result`` type variable.
    """

    @abc.abstractmethod
    def is_failure_result(self, result: Result) -> bool:
        """Return whether ``result`` represents a failed run.

        Treating failures (e.g., raised exceptions) as first-class results
        lets users store and inspect them like any other result.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def store_result(self, result: Result, retry: int = 0):
        """Write ``result`` to permanent storage.

        ``retry`` is the retry number associated with this result.
        """
        raise NotImplementedError
26 changes: 26 additions & 0 deletions exorcist/taskstore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import abc

from typing import Generic, Callable
from .models import TaskDetails, Result

class TaskDetailsStore(abc.ABC, Generic[TaskDetails, Result]):
    """Abstract interface for task details storage.

    Client code supplies a concrete subclass implementing every abstract
    method below. The concrete task details and result types are left to
    the client through the ``TaskDetails`` and ``Result`` type variables.
    """

    @abc.abstractmethod
    def store_task_details(self, taskid: str, task_details: TaskDetails):
        """Persist ``task_details`` under the identifier ``taskid``."""
        raise NotImplementedError

    @abc.abstractmethod
    def load_task_details(self, taskid: str) -> TaskDetails:
        """Return the task details previously stored for ``taskid``."""
        raise NotImplementedError

    @abc.abstractmethod
    def run_task(self, task_details: TaskDetails) -> Result:
        """Execute the task described by ``task_details``; return its result."""
        raise NotImplementedError
126 changes: 126 additions & 0 deletions exorcist/tests/test_example_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import pytest
import json
import pickle

from exorcist.example_client import (
ExampleResult, ExampleTaskDetails, ExampleResultStore,
ExampleTaskDetailsStore,
)

@pytest.fixture
def example_result():
    """Successful result labelled "foo" with a main result of 3.0."""
    succeeded = ExampleResult(label="foo", main_result=3.0, is_failure=False)
    return succeeded

@pytest.fixture
def failure_result():
    """Failed result labelled "foo" whose main result is an error message."""
    failed = ExampleResult(
        label="foo",
        main_result="float division by zero",
        is_failure=True,
    )
    return failed


# simple functions used as our tasks (need to be named functions here
# because pickle doesn't like lambdas)
def incr(x):
    """Return ``x`` increased by 1.0 (task function with one input)."""
    incremented = x + 1.0
    return incremented

def return_1():
    """Return the constant 1.0 (task function that takes no inputs)."""
    one = 1.0
    return one

def failure_func():
    """Always raise ``ZeroDivisionError`` (task function that fails)."""
    numerator, denominator = 1.0, 0.0
    # the division itself must raise; its error message is what the
    # tests in this module compare against
    return numerator / denominator


@pytest.fixture
def example_details():
    """Task details labelled "foo": no inputs, task function ``return_1``."""
    details = ExampleTaskDetails(
        label="foo",
        input_result_labels={},
        task_func=return_1,
    )
    return details

@pytest.fixture
def result_store(tmp_path):
    """Result store writing into pytest's ``tmp_path`` directory."""
    store = ExampleResultStore(tmp_path)
    return store

@pytest.fixture
def task_store(tmp_path):
    """Task details store working in pytest's ``tmp_path`` directory."""
    store = ExampleTaskDetailsStore(tmp_path)
    return store


class TestExampleResultStore:
    """Tests for ``ExampleResultStore``."""

    @pytest.mark.parametrize('fixture', [
        'example_result', 'failure_result'
    ])
    def test_is_failure_result(self, result_store, request, fixture):
        """Only the failure fixture is reported as a failure."""
        should_be_failure = fixture == 'failure_result'
        result = request.getfixturevalue(fixture)
        assert result_store.is_failure_result(result) == should_be_failure

    @pytest.mark.parametrize('fixture', [
        'example_result', 'failure_result'
    ])
    def test_store_result(self, result_store, request, fixture):
        """Stored results land in the expected file and round-trip."""
        # successes ignore the retry number; failures embed it in the name
        expected_filenames = {
            "example_result": "foo_result.json",
            "failure_result": "foo_result_5.json",
        }
        result = request.getfixturevalue(fixture)
        result_store.store_result(result, retry=5)

        path = result_store.directory / expected_filenames[fixture]
        assert path.exists()
        with open(path) as f:
            recreated = ExampleResult(**json.load(f))

        assert result == recreated


class TestExampleTaskDetailsStore:
    """Tests for ``ExampleTaskDetailsStore``."""

    def test_store_load_task_details_cycle(self, task_store,
                                           example_details):
        """Task details survive a store/load round trip."""
        taskid = example_details.label
        task_store.store_task_details(taskid, example_details)
        assert (task_store.directory / f"{taskid}.p").exists()
        assert task_store.load_task_details(taskid) == example_details

    def test_run_task(self, task_store, example_details):
        """A task without inputs runs and yields a successful result."""
        result = task_store.run_task(example_details)
        assert not result.is_failure
        assert result.main_result == 1.0
        assert result.label == "foo"

    def test_run_task_failing_result(self, task_store):
        """A raising task function yields a failure result, not an error."""
        failing_details = ExampleTaskDetails(
            label="baz",
            input_result_labels={},
            task_func=failure_func
        )
        result = task_store.run_task(failing_details)
        assert result.is_failure
        assert result.main_result == "float division by zero"
        assert result.label == "baz"

    def test_workflow(self, task_store, result_store, example_details):
        """Two chained tasks: the second consumes the first's result."""
        # depends on store_result working: the second task reads the
        # stored result of "foo" as its input ``x``
        dependent_details = ExampleTaskDetails(
            label="bar",
            input_result_labels={'x': "foo"},
            task_func=incr
        )

        # manually do the work of the worker here
        first = task_store.run_task(example_details)
        assert not result_store.is_failure_result(first)
        result_store.store_result(first)

        second = task_store.run_task(dependent_details)
        assert not result_store.is_failure_result(second)

        assert second.main_result == 2.0
4 changes: 0 additions & 4 deletions exorcist/tests/test_nothing.py

This file was deleted.