Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get event loop deprecate #308

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions src/plumpy/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@
from .processes import Process

get_event_loop = asyncio.get_event_loop
new_event_loop = asyncio.new_event_loop


def create_running_loop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

return loop


def set_event_loop(*args: Any, **kwargs: Any) -> None:
Expand All @@ -34,16 +42,18 @@ class PlumpyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):

_loop: Optional[asyncio.AbstractEventLoop] = None

def get_event_loop(self) -> asyncio.AbstractEventLoop:
"""Return the patched event loop."""
def new_event_loop(self) -> asyncio.AbstractEventLoop:
import nest_asyncio

if self._loop is None:
self._loop = super().get_event_loop()
nest_asyncio.apply(self._loop)
self._loop = super().new_event_loop()
nest_asyncio.apply(self._loop)

return self._loop

def get_event_loop(self) -> asyncio.AbstractEventLoop:
"""Return the patched event loop."""
return self._loop or self.new_event_loop()


def set_event_loop_policy() -> None:
"""Enable plumpy's event loop policy that will make event loop's reentrant."""
Expand All @@ -55,7 +65,8 @@ def set_event_loop_policy() -> None:

def reset_event_loop_policy() -> None:
"""Reset the event loop policy to the default."""
loop = get_event_loop()

loop = asyncio.get_event_loop()

cls = loop.__class__

Expand Down
2 changes: 1 addition & 1 deletion src/plumpy/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ def __init__(
# Don't allow the spec to be changed anymore
self.spec().seal()

self._loop = loop if loop is not None else asyncio.get_event_loop()
self._loop = loop or asyncio.get_event_loop()

self._setup_event_hooks()

Expand Down
8 changes: 5 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# -*- coding: utf-8 -*-
import pytest

from plumpy.events import set_event_loop_policy, reset_event_loop_policy

@pytest.fixture(scope='session')
def set_event_loop_policy():
from plumpy import set_event_loop_policy

@pytest.fixture(scope='function')
def custom_event_loop_policy():
set_event_loop_policy()
yield
reset_event_loop_policy()
12 changes: 12 additions & 0 deletions tests/rmq/test_process_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def sync_controller(thread_communicator: rmq.RmqThreadCommunicator):

class TestRemoteProcessController:
@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_pause(self, thread_communicator, async_controller):
proc = utils.WaitForSignalProcess(communicator=thread_communicator)
# Run the process in the background
Expand All @@ -56,6 +57,7 @@ async def test_pause(self, thread_communicator, async_controller):
assert proc.paused

@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_play(self, thread_communicator, async_controller):
proc = utils.WaitForSignalProcess(communicator=thread_communicator)
# Run the process in the background
Expand All @@ -74,6 +76,7 @@ async def test_play(self, thread_communicator, async_controller):
await async_controller.kill_process(proc.pid)

@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_kill(self, thread_communicator, async_controller):
proc = utils.WaitForSignalProcess(communicator=thread_communicator)
# Run the process in the event loop
Expand All @@ -87,6 +90,7 @@ async def test_kill(self, thread_communicator, async_controller):
assert proc.state == plumpy.ProcessState.KILLED

@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_status(self, thread_communicator, async_controller):
proc = utils.WaitForSignalProcess(communicator=thread_communicator)
# Run the process in the background
Expand All @@ -100,6 +104,7 @@ async def test_status(self, thread_communicator, async_controller):
# make sure proc reach the final state
await async_controller.kill_process(proc.pid)

@pytest.mark.usefixtures('custom_event_loop_policy')
def test_broadcast(self, thread_communicator):
messages = []

Expand All @@ -122,6 +127,7 @@ def on_broadcast_receive(**msg):

class TestRemoteProcessThreadController:
@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_pause(self, thread_communicator, sync_controller):
proc = utils.WaitForSignalProcess(communicator=thread_communicator)

Expand All @@ -136,6 +142,7 @@ async def test_pause(self, thread_communicator, sync_controller):
assert proc.paused

@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_pause_all(self, thread_communicator, sync_controller):
"""Test pausing all processes on a communicator"""
procs = []
Expand All @@ -147,6 +154,7 @@ async def test_pause_all(self, thread_communicator, sync_controller):
await utils.wait_util(lambda: all([proc.paused for proc in procs]))

@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_play_all(self, thread_communicator, sync_controller):
"""Test pausing all processes on a communicator"""
procs = []
Expand All @@ -161,6 +169,7 @@ async def test_play_all(self, thread_communicator, sync_controller):
await utils.wait_util(lambda: all([not proc.paused for proc in procs]))

@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_play(self, thread_communicator, sync_controller):
proc = utils.WaitForSignalProcess(communicator=thread_communicator)
assert proc.pause()
Expand All @@ -175,6 +184,7 @@ async def test_play(self, thread_communicator, sync_controller):
assert proc.state == plumpy.ProcessState.CREATED

@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_kill(self, thread_communicator, sync_controller):
proc = utils.WaitForSignalProcess(communicator=thread_communicator)

Expand All @@ -189,6 +199,7 @@ async def test_kill(self, thread_communicator, sync_controller):
assert proc.state == plumpy.ProcessState.KILLED

@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_kill_all(self, thread_communicator, sync_controller):
"""Test pausing all processes on a communicator"""
procs = []
Expand All @@ -200,6 +211,7 @@ async def test_kill_all(self, thread_communicator, sync_controller):
assert all([proc.state == plumpy.ProcessState.KILLED for proc in procs])

@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_status(self, thread_communicator, sync_controller):
proc = utils.WaitForSignalProcess(communicator=thread_communicator)
# Run the process in the background
Expand Down
7 changes: 6 additions & 1 deletion tests/test_communications.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def subscriber():
"""Return an instance of `Subscriber`."""
return Subscriber()


@pytest.mark.usefixtures('custom_event_loop_policy')
def test_add_rpc_subscriber(loop_communicator, subscriber):
"""Test the `LoopCommunicator.add_rpc_subscriber` method."""
assert loop_communicator.add_rpc_subscriber(subscriber) is not None
Expand All @@ -45,12 +45,14 @@ def test_add_rpc_subscriber(loop_communicator, subscriber):
assert loop_communicator.add_rpc_subscriber(subscriber, identifier) == identifier


@pytest.mark.usefixtures('custom_event_loop_policy')
def test_remove_rpc_subscriber(loop_communicator, subscriber):
"""Test the `LoopCommunicator.remove_rpc_subscriber` method."""
identifier = loop_communicator.add_rpc_subscriber(subscriber)
loop_communicator.remove_rpc_subscriber(identifier)


@pytest.mark.usefixtures('custom_event_loop_policy')
def test_add_broadcast_subscriber(loop_communicator, subscriber):
"""Test the `LoopCommunicator.add_broadcast_subscriber` method."""
assert loop_communicator.add_broadcast_subscriber(subscriber) is not None
Expand All @@ -59,17 +61,20 @@ def test_add_broadcast_subscriber(loop_communicator, subscriber):
assert loop_communicator.add_broadcast_subscriber(subscriber, identifier) == identifier


@pytest.mark.usefixtures('custom_event_loop_policy')
def test_remove_broadcast_subscriber(loop_communicator, subscriber):
"""Test the `LoopCommunicator.remove_broadcast_subscriber` method."""
identifier = loop_communicator.add_broadcast_subscriber(subscriber)
loop_communicator.remove_broadcast_subscriber(identifier)


@pytest.mark.usefixtures('custom_event_loop_policy')
def test_add_task_subscriber(loop_communicator, subscriber):
"""Test the `LoopCommunicator.add_task_subscriber` method."""
assert loop_communicator.add_task_subscriber(subscriber) is not None


@pytest.mark.usefixtures('custom_event_loop_policy')
def test_remove_task_subscriber(loop_communicator, subscriber):
"""Test the `LoopCommunicator.remove_task_subscriber` method."""
identifier = loop_communicator.add_task_subscriber(subscriber)
Expand Down
5 changes: 5 additions & 0 deletions tests/test_processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def on_kill(self, msg):


@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_process_scope():
class ProcessTaskInterleave(plumpy.Process):
async def task(self, steps: list):
Expand All @@ -63,6 +64,7 @@ async def task(self, steps: list):


class TestProcess(unittest.TestCase):
@pytest.mark.usefixtures('custom_event_loop_policy')
def test_spec(self):
"""
Check that the references to specs are doing the right thing...
Expand Down Expand Up @@ -385,6 +387,7 @@ async def async_test():
loop.create_task(proc.step_until_terminated())
loop.run_until_complete(async_test())

@pytest.mark.usefixtures('custom_event_loop_policy')
def test_pause_play_status_messaging(self):
"""
Test the setting of a processes' status through pause and play works correctly.
Expand Down Expand Up @@ -616,6 +619,7 @@ def run(self):

self.assertEqual(len(expect_true), n_run * 3)

@pytest.mark.usefixtures('custom_event_loop_policy')
def test_process_nested(self):
"""
Run multiple and nested processes to make sure the process stack is always correct
Expand All @@ -631,6 +635,7 @@ def run(self):

ParentProcess().execute()

@pytest.mark.usefixtures('custom_event_loop_policy')
def test_call_soon(self):
class CallSoon(plumpy.Process):
def run(self):
Expand Down
Loading