Skip to content

Commit

Permalink
Setup basic engine class to manage and transition flow instances
Browse files Browse the repository at this point in the history
  • Loading branch information
tomtitherington committed Mar 1, 2024
1 parent 5bc69d2 commit e8cd0c1
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 0 deletions.
1 change: 1 addition & 0 deletions .dictionary/custom.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pycodestyle
pycryptodome
pyenv
pyflakes
Pylance
pylint
pyproject
pytest
Expand Down
1 change: 1 addition & 0 deletions backend/app/engine/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .engine import Engine
199 changes: 199 additions & 0 deletions backend/app/engine/engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
from typing import Optional

from django.core.exceptions import ValidationError
from django.db import transaction

from backend.app.models import FlowInstance, FlowSchema, Person, StepInstance, TransitionInstance, TransitionSchema

from .utils import has_flow_instance


class Engine:
"""
Manages the overall coordination of a flow. It determines the order in which steps are executed,
handles transitions between steps, and so on.
"""

def __init__(
self,
person: Person,
flow_schema: Optional[FlowSchema] = None,
flow_instance: Optional[FlowInstance] = None,
):
self.person = person
self.flow_schema = flow_schema
self.flow_instance = flow_instance

@classmethod
def resume(cls, flow_instance: FlowInstance):
"""
Factory constructor that initializes and resumes a flow given a FlowInstance.
Parameters:
- flow_instance: The FlowInstance object for the flow to be resumed.
"""
return cls(person=flow_instance.person, flow_instance=flow_instance)

@classmethod
def start(cls, person: Person, flow_schema: FlowSchema):
"""
Factory constructor that initializes and starts a flow given a Person and FlowSchema.
Sets up the flow instance and executes any initial automatic transitions.
Parameters:
- person: The Person object for whom the flow is being started.
- flow_schema: The FlowSchema object defining the flow structure.
"""
engine = cls(person=person, flow_schema=flow_schema)
engine.setup()
engine.execute_automatic_transitions()
return engine

@transaction.atomic
def setup(self):
"""
Sets up a new FlowInstance and its related StepInstances based on the FlowSchema.
"""
if not self.flow_schema:
raise ValueError("FlowSchema must be provided for setup.")

# Check for existing FlowInstance for this Person and FlowSchema
existing_instance = FlowInstance.objects.filter(
person=self.person,
schema_version__schema=self.flow_schema,
).exists()
if existing_instance:
raise ValueError("A FlowInstance already exists for this Person and FlowSchema.")

latest_version = self.flow_schema.latest_version()
if not latest_version:
raise ValueError("No versions available for the provided FlowSchema.")

self.flow_instance = FlowInstance.objects.create(
person=self.person,
schema_version=latest_version,
environment=self.person.environment,
)

# Create StepInstances for each StepSchema in the FlowSchemaVersion
step_instances = {}
for step_schema in latest_version.steps.all():
step_instance = StepInstance.objects.create(
step_schema=step_schema,
flow_instance=self.flow_instance,
state=StepInstance.StepState.INACTIVE,
)
step_instances[step_schema.id] = step_instance

# Create TransitionInstances for each TransitionSchema in the FlowSchemaVersion
for transition_schema in latest_version.transitions.all():
TransitionInstance.objects.create(
transition_schema=transition_schema,
flow_instance=self.flow_instance,
step_instance_from=step_instances[transition_schema.from_step.id],
step_instance_to=step_instances[transition_schema.to_step.id],
)

# TODO: Will need an async version of this to handle long-running actions
# TODO: I don't like the from_automatic flag, let's revisit this later
@has_flow_instance
@transaction.atomic
def execute_transition(self, transition: TransitionInstance | str, from_automatic: bool = False):
"""
Executes the specified transition, moving the flow from one step to another.
Parameters:
- transition: The TransitionInstance to execute, or a TransitionSchema.identifier (str).
"""
transition_instance: TransitionInstance | None = None
active_steps = []

if isinstance(transition, TransitionInstance):
transition_instance = transition
elif isinstance(transition, str): # Assuming the transition is a TransitionSchema identifier
# Retrieve all active StepInstances for the current flow instance
active_steps = StepInstance.objects.filter(
flow_instance=self.flow_instance, state=StepInstance.StepState.ACTIVE
)

# Find the TransitionInstance based on the active steps and the identifier
for active_step in active_steps:
transition_instance = TransitionInstance.objects.filter(
step_instance_from=active_step,
transition_schema__identifier=transition,
flow_instance=self.flow_instance,
).first()

if transition_instance:
break # Break the loop once the correct transition instance is found

if not transition_instance:
raise ValueError("Invalid transition argument or no active step matches the transition identifier.")

from_step = transition_instance.step_instance_from
to_step = transition_instance.step_instance_to

# Validate transition can be made

if from_step.state != StepInstance.StepState.ACTIVE:
raise ValidationError("Invalid transition state. The step you are transitioning from is not active.")

if to_step.state != StepInstance.StepState.INACTIVE:
raise ValidationError("Invalid transition state. The step you are transitioning to is not inactive.")

# Execute the transition: Update states of from_step and to_step
from_step.state = StepInstance.StepState.COMPLETED
from_step.save()
to_step.state = StepInstance.StepState.ACTIVE
to_step.save()

self._check_completed()

if not from_automatic:
self.execute_automatic_transitions()

@has_flow_instance
def execute_automatic_transitions(self):
"""
Executes any available automatic transitions until either the flow is completed
or it arrives at a step with no automatic transitions.
"""
assert self.flow_instance is not None # Needed to silence Pylance / type checking
automatic_transitions_exist = True
while automatic_transitions_exist:
automatic_transitions_exist = False
active_steps = self.flow_instance.active_steps.all()
for step_instance in active_steps:
transitions = TransitionInstance.objects.filter(
step_instance_from=step_instance,
transition_schema__type=TransitionSchema.TransitionType.AUTOMATIC,
)
for transition in transitions:
self.execute_transition(transition, from_automatic=True)
automatic_transitions_exist = True

@has_flow_instance
@transaction.atomic
def _check_completed(self):
"""
Checks if the flow is completed, and if so, updates the state of the flow instance.
"""
assert self.flow_instance is not None # Needed to silence Pylance / type checking

# Assume all steps are completed.
all_steps_completed = True

for step in self.flow_instance.active_steps.all():
# If any step has outgoing transitions, mark as not all steps completed
if step.outgoing_transitions.exists():
all_steps_completed = False
break # No need to check further if any step is not completed
else:
# Mark the individual step as completed
step.state = StepInstance.StepState.COMPLETED
step.save()

# Only if all steps are completed, mark the flow instance as completed
if all_steps_completed:
self.flow_instance.state = FlowInstance.FlowState.COMPLETED
self.flow_instance.save()
10 changes: 10 additions & 0 deletions backend/app/engine/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from functools import wraps


def has_flow_instance(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
if not self.flow_instance:
raise ValueError("Flow instance is not set. Please set up the flow instance before calling this method.")
return func(self, *args, **kwargs)
return wrapper
7 changes: 7 additions & 0 deletions backend/app/models/flow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class FlowState(models.TextChoices):
environment = models.ForeignKey(Environment, on_delete=models.CASCADE)

steps: "RelatedManager[StepInstance]"
transitions: "RelatedManager[TransitionInstance]"

@property
def active_steps(self):
Expand Down Expand Up @@ -58,12 +59,18 @@ class StepState(models.TextChoices):
flow_instance = models.ForeignKey(FlowInstance, on_delete=models.CASCADE, related_name="steps")
state = models.CharField(max_length=20, choices=StepState.choices, default=StepState.INACTIVE)

outgoing_transitions: "RelatedManager[TransitionInstance]"
incoming_transitions: "RelatedManager[TransitionInstance]"


class TransitionInstance(UUIDModel):
"""
Tracks the state of a transition within a flow for a specific person.
"""

# TODO: Should we add a state to TransitionInstance?
# - This would allow us to track the state of a transition (e.g. 'called', 'uncalled')

# use '+' to avoid reverse relation
transition_schema = models.ForeignKey(TransitionSchema, on_delete=models.CASCADE, related_name="+")
flow_instance = models.ForeignKey(FlowInstance, on_delete=models.CASCADE, related_name="transitions")
Expand Down
4 changes: 4 additions & 0 deletions backend/app/models/flow_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,7 @@ class FlowSchemaVersion(UUIDModel):
description = models.CharField(max_length=250, blank=False)
created_at = models.DateTimeField(auto_now_add=True)
version_identifier = models.CharField(max_length=50, unique=True, blank=False)

steps: "RelatedManager[StepSchema]"
transitions: "RelatedManager[TransitionSchema]"

0 comments on commit e8cd0c1

Please sign in to comment.