Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(events): handle events during the load phase #534

Merged
merged 13 commits into from
Mar 15, 2023
8 changes: 7 additions & 1 deletion hathor/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,12 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
event_storage=self.event_storage,
event_ws_factory=self.event_ws_factory,
pubsub=pubsub,
reactor=reactor
reactor=reactor,
emit_load_events=args.x_emit_load_events
)
else:
self.check_or_raise(not args.x_emit_load_events, '--x-emit-load-events cannot be used without '
'--x-enable-event-queue')

if args.wallet_index and tx_storage.indexes is not None:
self.log.debug('enable wallet indexes')
Expand Down Expand Up @@ -202,6 +206,8 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
self.wallet.test_mode = True

if args.x_full_verification:
self.check_or_raise(not args.x_enable_event_queue, '--x-full-verification cannot be used with '
'--x-enable-event-queue')
self.manager._full_verification = True
if args.x_fast_init_beta:
self.log.warn('--x-fast-init-beta is now the default, no need to specify it')
Expand Down
2 changes: 2 additions & 0 deletions hathor/cli/run_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ def create_parser(cls) -> ArgumentParser:
parser.add_argument('--x-localhost-only', action='store_true', help='Only connect to peers on localhost')
parser.add_argument('--x-rocksdb-indexes', action='store_true', help=SUPPRESS)
parser.add_argument('--x-enable-event-queue', action='store_true', help='Enable event queue mechanism')
parser.add_argument('--x-emit-load-events', action='store_true', help='Enable emission of events during the '
'LOAD phase')
parser.add_argument('--peer-id-blacklist', action='extend', default=[], nargs='+', type=str,
help='Peer IDs to forbid connection')
return parser
Expand Down
26 changes: 22 additions & 4 deletions hathor/event/event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

_SUBSCRIBE_EVENTS = [
HathorEvents.NETWORK_NEW_TX_ACCEPTED,
HathorEvents.LOAD_STARTED,
msbrogli marked this conversation as resolved.
Show resolved Hide resolved
HathorEvents.LOAD_FINISHED,
HathorEvents.REORG_STARTED,
HathorEvents.REORG_FINISHED,
Expand All @@ -43,7 +42,7 @@
HathorEvents.CONSENSUS_TX_REMOVED,
]

_EVENT_CONVERTER = {
_EVENT_CONVERTERS = {
HathorEvents.CONSENSUS_TX_UPDATE: HathorEvents.VERTEX_METADATA_CHANGED
}

Expand All @@ -55,6 +54,7 @@ class EventManager:
"""

_peer_id: str
_load_finished: bool = False

@property
def event_storage(self) -> EventStorage:
Expand All @@ -65,14 +65,16 @@ def __init__(
event_storage: EventStorage,
event_ws_factory: EventWebsocketFactory,
pubsub: PubSubManager,
reactor: Reactor
reactor: Reactor,
emit_load_events: bool = False
):
self.log = logger.new()

self._clock = reactor
self._event_storage = event_storage
self._event_ws_factory = event_ws_factory
self._pubsub = pubsub
self.emit_load_events = emit_load_events

self._last_event = self._event_storage.get_last_event()
self._last_existing_group_id = self._event_storage.get_last_group_id()
Expand Down Expand Up @@ -108,8 +110,21 @@ def _subscribe_events(self):
self._pubsub.subscribe(event, self._handle_event)

def _handle_event(self, event_type: HathorEvents, event_args: EventArguments) -> None:
event_type = _EVENT_CONVERTERS.get(event_type, event_type)
event_specific_handlers = {
HathorEvents.LOAD_FINISHED: self._handle_load_finished
}

if event_specific_handler := event_specific_handlers.get(event_type):
event_specific_handler()

if not self._load_finished and not self.emit_load_events:
return

self._handle_event_creation(event_type, event_args)

def _handle_event_creation(self, event_type: HathorEvents, event_args: EventArguments) -> None:
create_event_fn: Callable[[HathorEvents, EventArguments], BaseEvent]
event_type = _EVENT_CONVERTER.get(event_type, event_type)

if event_type in _GROUP_START_EVENTS:
create_event_fn = self._create_group_start_event
Expand Down Expand Up @@ -161,6 +176,9 @@ def _create_non_group_edge_event(self, event_type: HathorEvents, event_args: Eve
group_id=group_id,
)

def _handle_load_finished(self):
self._load_finished = True

def _create_event(
self,
event_type: HathorEvents,
Expand Down
1 change: 0 additions & 1 deletion hathor/event/model/base_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from hathor.utils.pydantic import BaseModel

_EVENT_DATA_MAP: Dict[HathorEvents, Type[BaseEventData]] = {
HathorEvents.LOAD_STARTED: EmptyData,
HathorEvents.LOAD_FINISHED: EmptyData,
HathorEvents.NETWORK_NEW_TX_ACCEPTED: TxData,
HathorEvents.REORG_STARTED: ReorgData,
Expand Down
4 changes: 4 additions & 0 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,8 @@ def _initialize_components(self) -> None:

This method runs through all transactions, verifying them and updating our wallet.
"""
assert not self._event_manager, 'this method cannot be used if the events feature is enabled.'

self.log.info('initialize')
if self.wallet:
self.wallet._manually_initialize()
Expand Down Expand Up @@ -554,6 +556,7 @@ def _initialize_components(self) -> None:

# self.stop_profiler(save_to='profiles/initializing.prof')
self.state = self.NodeState.READY

total_load_time = LogDuration(t2 - t0)
tx_rate = '?' if total_load_time == 0 else cnt / total_load_time

Expand Down Expand Up @@ -631,6 +634,7 @@ def _initialize_components_new(self) -> None:
# XXX: last step before actually starting is updating the last started at timestamps
self.tx_storage.update_last_started_at(started_at)
self.state = self.NodeState.READY
self.pubsub.publish(HathorEvents.LOAD_FINISHED)

t1 = time.time()
total_load_time = LogDuration(t1 - t0)
Expand Down
5 changes: 0 additions & 5 deletions hathor/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ class HathorEvents(Enum):
WALLET_ELEMENT_VOIDED:
Triggered when a wallet element is marked as voided

LOAD_STARTED
Triggered when manager has started reading data from the local database

LOAD_FINISHED
Triggered when manager finishes reading local data and it is ready to sync

Expand Down Expand Up @@ -132,8 +129,6 @@ class HathorEvents(Enum):

WALLET_ELEMENT_VOIDED = 'wallet:element_voided'

LOAD_STARTED = 'manager:load_started'

LOAD_FINISHED = 'manager:load_finished'

REORG_STARTED = 'reorg:started'
Expand Down
6 changes: 4 additions & 2 deletions tests/event/test_event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ def setUp(self):
self.manager = self.create_peer(
self.network,
event_manager=self.event_manager,
pubsub=pubsub
pubsub=pubsub,
full_verification=False
)

def test_if_event_is_persisted(self):
Expand All @@ -49,11 +50,12 @@ def test_event_group(self):
self._fake_reorg_started()
self._fake_reorg_finished()
self.run_to_completion()
# XXX: 0 is a tx update
event0 = self.event_storage.get_event(0)
event1 = self.event_storage.get_event(1)
event2 = self.event_storage.get_event(2)
event3 = self.event_storage.get_event(3)
event4 = self.event_storage.get_event(4)
self.assertEqual(HathorEvents(event0.type), HathorEvents.LOAD_FINISHED)
self.assertEqual(HathorEvents(event1.type), HathorEvents.REORG_STARTED)
self.assertIsNotNone(event1.group_id)
self.assertEqual(HathorEvents(event2.type), HathorEvents.REORG_FINISHED)
Expand Down
7 changes: 5 additions & 2 deletions tests/event/test_event_reorg.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ def setUp(self):
self.manager = self.create_peer(
self.network,
event_manager=self.event_manager,
pubsub=pubsub
pubsub=pubsub,
full_verification=False
)

# read genesis keys
Expand Down Expand Up @@ -70,9 +71,11 @@ def test_reorg_events(self):
class unsorted(list):
pass
expected_events_grouped = [
[
(HathorEvents.LOAD_FINISHED, {})
],
# XXX: the order of the following events can vary depending on which genesis is spent/confirmed first
unsorted([
(HathorEvents.VERTEX_METADATA_CHANGED, {'hash': settings.GENESIS_BLOCK_HASH.hex()}),
(HathorEvents.VERTEX_METADATA_CHANGED, {'hash': settings.GENESIS_TX1_HASH.hex()}),
(HathorEvents.VERTEX_METADATA_CHANGED, {'hash': settings.GENESIS_TX2_HASH.hex()}),
(HathorEvents.VERTEX_METADATA_CHANGED, {'hash': blocks[0].hash_hex}),
Expand Down
14 changes: 14 additions & 0 deletions tests/others/test_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,24 @@ def test_event_queue_with_rocksdb_storage(self):
self.assertIsInstance(manager._event_manager, EventManager)
self.assertIsInstance(manager._event_manager._event_storage, EventRocksDBStorage)
self.assertIsInstance(manager._event_manager._event_ws_factory, EventWebsocketFactory)
self.assertFalse(manager._event_manager.emit_load_events)

def test_event_queue_with_memory_storage(self):
manager = self._build(['--x-enable-event-queue', '--memory-storage'])

self.assertIsInstance(manager._event_manager, EventManager)
self.assertIsInstance(manager._event_manager._event_storage, EventMemoryStorage)
self.assertIsInstance(manager._event_manager._event_ws_factory, EventWebsocketFactory)
self.assertFalse(manager._event_manager.emit_load_events)

def test_event_queue_with_full_verification(self):
args = ['--x-enable-event-queue', '--memory-storage', '--x-full-verification']
self._build_with_error(args, '--x-full-verification cannot be used with --x-enable-event-queue')

def test_event_queue_with_emit_load_events(self):
manager = self._build(['--x-enable-event-queue', '--memory-storage', '--x-emit-load-events'])

self.assertIsInstance(manager._event_manager, EventManager)
self.assertIsInstance(manager._event_manager._event_storage, EventMemoryStorage)
self.assertIsInstance(manager._event_manager._event_ws_factory, EventWebsocketFactory)
self.assertTrue(manager._event_manager.emit_load_events)