Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make StandardReadable into generic PluggableDevice #1144

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 103 additions & 58 deletions ophyd/v2/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from enum import Enum
from typing import (
Any,
AsyncGenerator,
AsyncIterator,
Awaitable,
Callable,
Coroutine,
Expand All @@ -37,6 +37,7 @@

import numpy as np
from bluesky.protocols import (
Asset,
Configurable,
Descriptor,
Dtype,
Expand All @@ -47,6 +48,8 @@
Stageable,
Status,
Subscribable,
Triggerable,
WritesExternalAssets,
)
from bluesky.run_engine import call_in_bluesky_event_loop

Expand Down Expand Up @@ -782,7 +785,7 @@ async def execute(self, wait=True, timeout=None):
await self._backend.put(None, wait=wait, timeout=timeout or self._timeout)


async def observe_value(signal: SignalR[T]) -> AsyncGenerator[T, None]:
async def observe_value(signal: SignalR[T]) -> AsyncIterator[T]:
"""Subscribe to the value of a signal so it can be iterated from.

Parameters
Expand Down Expand Up @@ -898,83 +901,125 @@ async def set_and_wait_for_value(
return status


async def merge_gathered_dicts(
coros: Iterable[Awaitable[Dict[str, T]]]
) -> Dict[str, T]:
"""Merge dictionaries produced by a sequence of coroutines.
Plug = Callable[[], T]
DescribePlug = Plug[Awaitable[Dict[str, Descriptor]]]
ReadPlug = Plug[Awaitable[Dict[str, Reading]]]
StatusPlug = Plug[AsyncStatus]
AssetPlug = Plug[AsyncIterator[Asset]]

Can be used for merging ``read()`` or ``describe``. For instance::

combined_read = await merge_gathered_dicts(s.read() for s in signals)
"""
async def _status_in_parallel(plugs: Iterable[StatusPlug]) -> None:
await asyncio.wait([plug().task for plug in plugs])


async def _merge_gathered_dicts(
callables: Iterable[Plug[Awaitable[Dict[str, T]]]]
) -> Dict[str, T]:
"""Merge dictionaries produced by a sequence of coroutines"""
ret: Dict[str, T] = {}
coros = [f() for f in callables]
for result in await asyncio.gather(*coros):
ret.update(result)
return ret


class StandardReadable(Device, Readable, Configurable, Stageable):
"""Device that owns its children and provides useful default behavior.

- When its name is set it renames child Devices
- Signals can be registered for read() and read_configuration()
- These signals will be subscribed for read() between stage() and unstage()
"""

_read_signals: Tuple[SignalR, ...] = ()
_configuration_signals: Tuple[SignalR, ...] = ()
_read_uncached_signals: Tuple[SignalR, ...] = ()

def set_readable_signals(
class PluggableDevice(
Device, Stageable, Configurable, Triggerable, Readable, WritesExternalAssets
):
_stage_plugs: Tuple[StatusPlug, ...] = ()
_describe_config_plugs: Tuple[DescribePlug, ...] = ()
_read_config_plugs: Tuple[ReadPlug, ...] = ()
_describe_plugs: Tuple[DescribePlug, ...] = ()
_trigger_plugs: Tuple[StatusPlug, ...] = ()
_read_plugs: Tuple[ReadPlug, ...] = ()
_collect_asset_docs_plugs: Tuple[AssetPlug, ...] = ()
_unstage_plugs: Tuple[StatusPlug, ...] = ()

def add_plug(
self,
read: Sequence[SignalR] = (),
config: Sequence[SignalR] = (),
read_uncached: Sequence[SignalR] = (),
*,
stage: Optional[StatusPlug] = None,
describe_config: Optional[DescribePlug] = None,
read_config: Optional[ReadPlug] = None,
describe: Optional[DescribePlug] = None,
trigger: Optional[StatusPlug] = None,
read: Optional[ReadPlug] = None,
collect_asset_docs: Optional[AssetPlug] = None,
unstage: Optional[StatusPlug] = None,
):
"""
Parameters
----------
read:
Signals to make up `read()`
conf:
Signals to make up `read_configuration()`
read_uncached:
Signals to make up `read()` that won't be cached
"""
self._read_signals = tuple(read)
self._configuration_signals = tuple(config)
self._read_uncached_signals = tuple(read_uncached)
if stage:
self._stage_plugs += (stage,)
if describe_config:
self._describe_config_plugs += (describe_config,)
if read_config:
self._read_config_plugs += (read_config,)
if describe:
self._describe_plugs += (describe,)
if trigger:
self._trigger_plugs += (trigger,)
if read:
self._read_plugs += (read,)
if collect_asset_docs:
self._collect_asset_docs_plugs += (collect_asset_docs,)
if unstage:
self._unstage_plugs += (unstage,)

@AsyncStatus.wrap
async def stage(self) -> None:
for sig in self._read_signals + self._configuration_signals:
await sig.stage().task

@AsyncStatus.wrap
async def unstage(self) -> None:
for sig in self._read_signals + self._configuration_signals:
await sig.unstage().task
await _status_in_parallel(self._stage_plugs)

async def describe_configuration(self) -> Dict[str, Descriptor]:
return await merge_gathered_dicts(
[sig.describe() for sig in self._configuration_signals]
)
return await _merge_gathered_dicts(self._describe_config_plugs)

async def read_configuration(self) -> Dict[str, Reading]:
return await merge_gathered_dicts(
[sig.read() for sig in self._configuration_signals]
)
return await _merge_gathered_dicts(self._read_config_plugs)

async def describe(self) -> Dict[str, Descriptor]:
return await merge_gathered_dicts(
[sig.describe() for sig in self._read_signals + self._read_uncached_signals]
)
return await _merge_gathered_dicts(self._describe_plugs)

@AsyncStatus.wrap
async def trigger(self) -> None:
await _status_in_parallel(self._trigger_plugs)

async def read(self) -> Dict[str, Reading]:
return await merge_gathered_dicts(
[sig.read() for sig in self._read_signals]
+ [sig.read(cached=False) for sig in self._read_uncached_signals]
)
return await _merge_gathered_dicts(self._read_plugs)

async def collect_asset_docs(self) -> AsyncIterator[Asset]:
for plug in self._collect_asset_docs_plugs:
async for asset in plug():
yield asset

@AsyncStatus.wrap
async def unstage(self) -> None:
await _status_in_parallel(self._unstage_plugs)


def add_readable_signals(
device: PluggableDevice,
read: Sequence[SignalR] = (),
config: Sequence[SignalR] = (),
read_uncached: Sequence[SignalR] = (),
):
"""
Parameters
----------
read:
Signals to make up `read()`
conf:
Signals to make up `read_configuration()`
read_uncached:
Signals to make up `read()` that won't be cached
"""
read, config, read_uncached = tuple(read), tuple(config), tuple(read_uncached)
for sig in read + config:
# Start monitoring them so read is fast
device.add_plug(stage=sig.stage, unstage=sig.unstage)
for sig in config:
device.add_plug(describe_config=sig.describe, read_config=sig.read)
for sig in read:
device.add_plug(describe=sig.describe, read=sig.read)
for sig in read_uncached:
device.add_plug(describe=sig.describe, read=lambda: sig.read(cached=False))


VT = TypeVar("VT", bound=Device)
Expand Down
22 changes: 12 additions & 10 deletions ophyd/v2/epicsdemo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@
import numpy as np
from bluesky.protocols import Movable, Stoppable

from ophyd.v2.core import AsyncStatus, Device, StandardReadable, observe_value
from ophyd.v2.core import (
AsyncStatus,
Device,
PluggableDevice,
add_readable_signals,
observe_value,
)
from ophyd.v2.epics import epics_signal_r, epics_signal_rw, epics_signal_x


Expand All @@ -21,22 +27,19 @@ class EnergyMode(Enum):
high = "High Energy"


class Sensor(StandardReadable):
class Sensor(PluggableDevice):
"""A demo sensor that produces a scalar value based on X and Y Movers"""

def __init__(self, prefix: str, name="") -> None:
# Define some signals
self.value = epics_signal_r(float, prefix + "Value")
self.mode = epics_signal_rw(EnergyMode, prefix + "Mode")
# Set name and signals for read() and read_configuration()
self.set_readable_signals(
read=[self.value],
config=[self.mode],
)
add_readable_signals(self, read=[self.value], config=[self.mode])
super().__init__(name=name)


class Mover(StandardReadable, Movable, Stoppable):
class Mover(PluggableDevice, Movable, Stoppable):
"""A demo movable that moves based on velocity"""

def __init__(self, prefix: str, name="") -> None:
Expand All @@ -51,9 +54,8 @@ def __init__(self, prefix: str, name="") -> None:
# Whether set() should complete successfully or not
self._set_success = True
# Set name and signals for read() and read_configuration()
self.set_readable_signals(
read=[self.readback],
config=[self.velocity, self.units],
add_readable_signals(
self, read=[self.readback], config=[self.velocity, self.units]
)
super().__init__(name=name)

Expand Down