Skip to content

Commit

Permalink
Pushed attribute 'previous_max_notification_id' down to Leader class.…
Browse files Browse the repository at this point in the history
… Moved RecordingEvent class to system module.
  • Loading branch information
johnbywater committed Nov 8, 2024
1 parent 77b63ff commit 6e7905d
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 20 deletions.
15 changes: 0 additions & 15 deletions eventsourcing/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -599,18 +599,6 @@ def save(
self.collect_events(*aggregates, **kwargs)


class RecordingEvent:
def __init__(
self,
application_name: str,
recordings: List[Recording],
previous_max_notification_id: int | None,
):
self.application_name = application_name
self.recordings = recordings
self.previous_max_notification_id = previous_max_notification_id


class Application:
"""
Base class for event-sourced applications.
Expand Down Expand Up @@ -659,9 +647,6 @@ def __init__(self, env: EnvType | None = None) -> None:
self._repository = self.construct_repository()
self._notification_log = self.construct_notification_log()
self.closing = Event()
self.previous_max_notification_id: int | None = (
self.recorder.max_notification_id()
)

@property
def repository(self) -> Repository:
Expand Down
27 changes: 25 additions & 2 deletions eventsourcing/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@
Application,
NotificationLog,
ProcessingEvent,
RecordingEvent,
Section,
TApplication,
)
from eventsourcing.domain import DomainEventProtocol
from eventsourcing.domain import DomainEventProtocol, MutableOrImmutableAggregate
from eventsourcing.persistence import (
IntegrityError,
Mapper,
Expand All @@ -46,6 +45,20 @@
from eventsourcing.utils import EnvType, get_topic, resolve_topic

ProcessingJob = Tuple[DomainEventProtocol, Tracking]


class RecordingEvent:
def __init__(
self,
application_name: str,
recordings: List[Recording],
previous_max_notification_id: int | None,
):
self.application_name = application_name
self.recordings = recordings
self.previous_max_notification_id = previous_max_notification_id


ConvertingJob = Optional[Union[RecordingEvent, List[Notification]]]


Expand Down Expand Up @@ -230,6 +243,7 @@ class by also being responsible for keeping track of

def __init__(self, env: EnvType | None = None) -> None:
super().__init__(env)
self.previous_max_notification_id: int | None = None
self.followers: List[RecordingEventReceiver] = []

def lead(self, follower: RecordingEventReceiver) -> None:
Expand All @@ -238,6 +252,15 @@ def lead(self, follower: RecordingEventReceiver) -> None:
"""
self.followers.append(follower)

def save(
self,
*objs: MutableOrImmutableAggregate | DomainEventProtocol | None,
**kwargs: Any,
) -> List[Recording]:
if self.previous_max_notification_id is None:
self.previous_max_notification_id = self.recorder.max_notification_id()
return super().save(*objs, **kwargs)

def _notify(self, recordings: List[Recording]) -> None:
"""
Calls :func:`receive_recording_event` on each follower
Expand Down
2 changes: 1 addition & 1 deletion tests/application_tests/test_processapplication.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from unittest.case import TestCase

from eventsourcing.application import RecordingEvent
from eventsourcing.dispatch import singledispatchmethod
from eventsourcing.domain import AggregateEvent
from eventsourcing.persistence import Transcoder
Expand All @@ -9,6 +8,7 @@
Leader,
ProcessApplication,
ProcessingEvent,
RecordingEvent,
RecordingEventReceiver,
)
from eventsourcing.tests.application import BankAccounts, EmailAddressAsStr
Expand Down
4 changes: 3 additions & 1 deletion tests/system_tests/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from unittest.case import TestCase
from unittest.mock import MagicMock

from eventsourcing.application import ProcessingEvent, RecordingEvent
from eventsourcing.domain import Aggregate, AggregateEvent, event
from eventsourcing.persistence import Notification, ProgrammingError
from eventsourcing.postgres import PostgresDatastore
Expand All @@ -36,6 +35,7 @@
ProcessApplication,
ProcessingJob,
PullingThread,
RecordingEvent,
Runner,
RunnerAlreadyStartedError,
SingleThreadedRunner,
Expand All @@ -50,6 +50,8 @@
if TYPE_CHECKING: # pragma: nocover
from uuid import UUID

from eventsourcing.application import ProcessingEvent


class EmailProcess2(EmailProcess):
pass
Expand Down
3 changes: 2 additions & 1 deletion tests/system_tests/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
from unittest.case import TestCase
from uuid import NAMESPACE_URL, uuid4, uuid5

from eventsourcing.application import Application, RecordingEvent
from eventsourcing.application import Application
from eventsourcing.domain import Aggregate
from eventsourcing.persistence import IntegrityError, Notification, Tracking
from eventsourcing.system import (
Follower,
Leader,
ProcessApplication,
RecordingEvent,
RecordingEventReceiver,
System,
)
Expand Down

0 comments on commit 6e7905d

Please sign in to comment.