From 7e7b949bfa48116540c467d1ae8b1db45c9caf6a Mon Sep 17 00:00:00 2001 From: Tom Cobb Date: Fri, 4 Aug 2023 15:56:25 +0100 Subject: [PATCH] Make StandardReadable into generic PluggableDevice --- ophyd/v2/core.py | 161 +++++++++++++++++++++------------ ophyd/v2/epicsdemo/__init__.py | 22 +++-- 2 files changed, 115 insertions(+), 68 deletions(-) diff --git a/ophyd/v2/core.py b/ophyd/v2/core.py index 559c41d75..22652d35b 100644 --- a/ophyd/v2/core.py +++ b/ophyd/v2/core.py @@ -15,7 +15,7 @@ from enum import Enum from typing import ( Any, - AsyncGenerator, + AsyncIterator, Awaitable, Callable, Coroutine, @@ -37,6 +37,7 @@ import numpy as np from bluesky.protocols import ( + Asset, Configurable, Descriptor, Dtype, @@ -47,6 +48,8 @@ Stageable, Status, Subscribable, + Triggerable, + WritesExternalAssets, ) from bluesky.run_engine import call_in_bluesky_event_loop @@ -782,7 +785,7 @@ async def execute(self, wait=True, timeout=None): await self._backend.put(None, wait=wait, timeout=timeout or self._timeout) -async def observe_value(signal: SignalR[T]) -> AsyncGenerator[T, None]: +async def observe_value(signal: SignalR[T]) -> AsyncIterator[T]: """Subscribe to the value of a signal so it can be iterated from. Parameters @@ -898,83 +901,125 @@ async def set_and_wait_for_value( return status -async def merge_gathered_dicts( - coros: Iterable[Awaitable[Dict[str, T]]] -) -> Dict[str, T]: - """Merge dictionaries produced by a sequence of coroutines. +Plug = Callable[[], T] +DescribePlug = Plug[Awaitable[Dict[str, Descriptor]]] +ReadPlug = Plug[Awaitable[Dict[str, Reading]]] +StatusPlug = Plug[AsyncStatus] +AssetPlug = Plug[AsyncIterator[Asset]] - Can be used for merging ``read()`` or ``describe``. For instance:: - combined_read = await merge_gathered_dicts(s.read() for s in signals) - """ +async def _status_in_parallel(plugs: Iterable[StatusPlug]) -> None: + await asyncio.wait([plug().task for plug in plugs]) + + +async def _merge_gathered_dicts( + callables: Iterable[Plug[Awaitable[Dict[str, T]]]] +) -> Dict[str, T]: + """Merge dictionaries produced by a sequence of coroutines""" ret: Dict[str, T] = {} + coros = [f() for f in callables] for result in await asyncio.gather(*coros): ret.update(result) return ret -class StandardReadable(Device, Readable, Configurable, Stageable): - """Device that owns its children and provides useful default behavior. - - - When its name is set it renames child Devices - - Signals can be registered for read() and read_configuration() - - These signals will be subscribed for read() between stage() and unstage() - """ - - _read_signals: Tuple[SignalR, ...] = () - _configuration_signals: Tuple[SignalR, ...] = () - _read_uncached_signals: Tuple[SignalR, ...] = () - - def set_readable_signals( +class PluggableDevice( + Device, Stageable, Configurable, Triggerable, Readable, WritesExternalAssets +): + _stage_plugs: Tuple[StatusPlug, ...] = () + _describe_config_plugs: Tuple[DescribePlug, ...] = () + _read_config_plugs: Tuple[ReadPlug, ...] = () + _describe_plugs: Tuple[DescribePlug, ...] = () + _trigger_plugs: Tuple[StatusPlug, ...] = () + _read_plugs: Tuple[ReadPlug, ...] = () + _collect_asset_docs_plugs: Tuple[AssetPlug, ...] = () + _unstage_plugs: Tuple[StatusPlug, ...] = () + + def add_plug( self, - read: Sequence[SignalR] = (), - config: Sequence[SignalR] = (), - read_uncached: Sequence[SignalR] = (), + *, + stage: Optional[StatusPlug] = None, + describe_config: Optional[DescribePlug] = None, + read_config: Optional[ReadPlug] = None, + describe: Optional[DescribePlug] = None, + trigger: Optional[StatusPlug] = None, + read: Optional[ReadPlug] = None, + collect_asset_docs: Optional[AssetPlug] = None, + unstage: Optional[StatusPlug] = None, ): - """ - Parameters - ---------- - read: - Signals to make up `read()` - conf: - Signals to make up `read_configuration()` - read_uncached: - Signals to make up `read()` that won't be cached - """ - self._read_signals = tuple(read) - self._configuration_signals = tuple(config) - self._read_uncached_signals = tuple(read_uncached) + if stage: + self._stage_plugs += (stage,) + if describe_config: + self._describe_config_plugs += (describe_config,) + if read_config: + self._read_config_plugs += (read_config,) + if describe: + self._describe_plugs += (describe,) + if trigger: + self._trigger_plugs += (trigger,) + if read: + self._read_plugs += (read,) + if collect_asset_docs: + self._collect_asset_docs_plugs += (collect_asset_docs,) + if unstage: + self._unstage_plugs += (unstage,) @AsyncStatus.wrap async def stage(self) -> None: - for sig in self._read_signals + self._configuration_signals: - await sig.stage().task - - @AsyncStatus.wrap - async def unstage(self) -> None: - for sig in self._read_signals + self._configuration_signals: - await sig.unstage().task + await _status_in_parallel(self._stage_plugs) async def describe_configuration(self) -> Dict[str, Descriptor]: - return await merge_gathered_dicts( - [sig.describe() for sig in self._configuration_signals] - ) + return await _merge_gathered_dicts(self._describe_config_plugs) async def read_configuration(self) -> Dict[str, Reading]: - return await merge_gathered_dicts( - [sig.read() for sig in self._configuration_signals] - ) + return await _merge_gathered_dicts(self._read_config_plugs) async def describe(self) -> Dict[str, Descriptor]: - return await merge_gathered_dicts( - [sig.describe() for sig in self._read_signals + self._read_uncached_signals] - ) + return await _merge_gathered_dicts(self._describe_plugs) + + @AsyncStatus.wrap + async def trigger(self) -> None: + await _status_in_parallel(self._trigger_plugs) async def read(self) -> Dict[str, Reading]: - return await merge_gathered_dicts( - [sig.read() for sig in self._read_signals] - + [sig.read(cached=False) for sig in self._read_uncached_signals] - ) + return await _merge_gathered_dicts(self._read_plugs) + + async def collect_asset_docs(self) -> AsyncIterator[Asset]: + for plug in self._collect_asset_docs_plugs: + async for asset in plug(): + yield asset + + @AsyncStatus.wrap + async def unstage(self) -> None: + await _status_in_parallel(self._unstage_plugs) + + +def add_readable_signals( + device: PluggableDevice, + read: Sequence[SignalR] = (), + config: Sequence[SignalR] = (), + read_uncached: Sequence[SignalR] = (), +): + """ + Parameters + ---------- + read: + Signals to make up `read()` + conf: + Signals to make up `read_configuration()` + read_uncached: + Signals to make up `read()` that won't be cached + """ + read, config, read_uncached = tuple(read), tuple(config), tuple(read_uncached) + for sig in read + config: + # Start monitoring them so read is fast + device.add_plug(stage=sig.stage, unstage=sig.unstage) + for sig in config: + device.add_plug(describe_config=sig.describe, read_config=sig.read) + for sig in read: + device.add_plug(describe=sig.describe, read=sig.read) + for sig in read_uncached: + device.add_plug(describe=sig.describe, read=lambda: sig.read(cached=False)) VT = TypeVar("VT", bound=Device) diff --git a/ophyd/v2/epicsdemo/__init__.py b/ophyd/v2/epicsdemo/__init__.py index e97a963d1..e9b8c4ce0 100644 --- a/ophyd/v2/epicsdemo/__init__.py +++ b/ophyd/v2/epicsdemo/__init__.py @@ -8,7 +8,13 @@ import numpy as np from bluesky.protocols import Movable, Stoppable -from ophyd.v2.core import AsyncStatus, Device, StandardReadable, observe_value +from ophyd.v2.core import ( + AsyncStatus, + Device, + PluggableDevice, + add_readable_signals, + observe_value, +) from ophyd.v2.epics import epics_signal_r, epics_signal_rw, epics_signal_x @@ -21,7 +27,7 @@ class EnergyMode(Enum): high = "High Energy" -class Sensor(StandardReadable): +class Sensor(PluggableDevice): """A demo sensor that produces a scalar value based on X and Y Movers""" def __init__(self, prefix: str, name="") -> None: @@ -29,14 +35,11 @@ def __init__(self, prefix: str, name="") -> None: self.value = epics_signal_r(float, prefix + "Value") self.mode = epics_signal_rw(EnergyMode, prefix + "Mode") # Set name and signals for read() and read_configuration() - self.set_readable_signals( - read=[self.value], - config=[self.mode], - ) + add_readable_signals(self, read=[self.value], config=[self.mode]) super().__init__(name=name) -class Mover(StandardReadable, Movable, Stoppable): +class Mover(PluggableDevice, Movable, Stoppable): """A demo movable that moves based on velocity""" def __init__(self, prefix: str, name="") -> None: @@ -51,9 +54,8 @@ def __init__(self, prefix: str, name="") -> None: # Whether set() should complete successfully or not self._set_success = True # Set name and signals for read() and read_configuration() - self.set_readable_signals( - read=[self.readback], - config=[self.velocity, self.units], + add_readable_signals( + self, read=[self.readback], config=[self.velocity, self.units] ) super().__init__(name=name)