diff --git a/src/ophyd_async/core/_signal_backend.py b/src/ophyd_async/core/_signal_backend.py index a0ad517b7c..03a0e87327 100644 --- a/src/ophyd_async/core/_signal_backend.py +++ b/src/ophyd_async/core/_signal_backend.py @@ -1,5 +1,6 @@ from abc import abstractmethod from collections.abc import Sequence +from enum import IntEnum from typing import Generic, TypedDict, TypeVar, get_origin import numpy as np @@ -30,6 +31,7 @@ | Array1D[np.float32] | Array1D[np.float64] | np.ndarray + | IntEnum | StrictEnum | Sequence[str] | Sequence[StrictEnum] diff --git a/src/ophyd_async/testing/_assert.py b/src/ophyd_async/testing/_assert.py index fcb266862e..6fc7cd92e9 100644 --- a/src/ophyd_async/testing/_assert.py +++ b/src/ophyd_async/testing/_assert.py @@ -1,5 +1,6 @@ import asyncio import time +from collections.abc import Callable from contextlib import AbstractContextManager from typing import Any @@ -60,11 +61,19 @@ async def assert_reading( """ actual_reading = await readable.read() + _assert_readings_approx_equal(expected_reading, actual_reading) + + +def _assert_readings_approx_equal(expected, actual): approx_expected_reading = { - k: dict(v, value=approx_value(expected_reading[k]["value"])) - for k, v in expected_reading.items() + k: dict( + v, + value=approx_value(expected[k]["value"]), + timestamp=approx_value(expected[k]["timestamp"]), + ) + for k, v in expected.items() } - assert actual_reading == approx_expected_reading + assert actual == approx_expected_reading async def assert_configuration( @@ -88,11 +97,7 @@ async def assert_configuration( """ actual_configuration = await configurable.read_configuration() - approx_expected_configuration = { - k: dict(v, value=approx_value(configuration[k]["value"])) - for k, v in configuration.items() - } - assert actual_configuration == approx_expected_configuration + _assert_readings_approx_equal(configuration, actual_configuration) async def assert_describe_signal(signal: SignalR, /, **metadata): @@ -144,29 +149,27 @@ def __eq__(self, value): class MonitorQueue(AbstractContextManager): - def __init__(self, signal: SignalR): + def __init__( + self, signal: SignalR, timestamp_provider: Callable[[], float] | None = None + ): self.signal = signal self.updates: asyncio.Queue[dict[str, Reading]] = asyncio.Queue() - self.signal.subscribe(self.updates.put_nowait) + self._timestamp_provider = timestamp_provider or time.time async def assert_updates(self, expected_value): # Get an update, value and reading - expected_type = type(expected_value) - expected_value = approx_value(expected_value) update = await self.updates.get() - value = await self.signal.get_value() - reading = await self.signal.read() - # Check they match what we expected - assert value == expected_value - assert type(value) is expected_type - expected_reading = { + # reading = await self.signal.read() + await assert_value(self.signal, expected_value) + expected_reading: dict[str, Reading] = { self.signal.name: { "value": expected_value, - "timestamp": pytest.approx(time.time(), rel=0.1), + "timestamp": self._timestamp_provider(), # type: ignore "alarm_severity": 0, } } - assert reading == update == expected_reading + await assert_reading(self.signal, expected_reading) + _assert_readings_approx_equal(expected_reading, update) def __enter__(self): self.signal.subscribe(self.updates.put_nowait) diff --git a/src/ophyd_async/testing/_one_of_everything.py b/src/ophyd_async/testing/_one_of_everything.py index 0ae8385717..1f15e212f0 100644 --- a/src/ophyd_async/testing/_one_of_everything.py +++ b/src/ophyd_async/testing/_one_of_everything.py @@ -1,4 +1,5 @@ from collections.abc import Sequence +from dataclasses import dataclass from typing import Any import numpy as np @@ -7,6 +8,7 @@ Array1D, Device, DTypeScalar_co, + SignalDatatype, SignalRW, StandardReadable, StrictEnum, @@ -32,19 +34,14 @@ class ExampleTable(Table): enum: Sequence[ExampleEnum] -def int_array_signal( - dtype: type[DTypeScalar_co], name: str = "" -) -> SignalRW[Array1D[DTypeScalar_co]]: +def int_array_value(dtype: type[DTypeScalar_co]) -> Array1D[DTypeScalar_co]: iinfo = np.iinfo(dtype) # type: ignore - value = np.array([iinfo.min, iinfo.max, 0, 1, 2, 3, 4], dtype=dtype) - return soft_signal_rw(Array1D[dtype], value, name) + return np.array([iinfo.min, iinfo.max, 0, 1, 2, 3, 4], dtype=dtype) -def float_array_signal( - dtype: type[DTypeScalar_co], name: str = "" -) -> SignalRW[Array1D[DTypeScalar_co]]: +def float_array_value(dtype: type[DTypeScalar_co]) -> Array1D[DTypeScalar_co]: finfo = np.finfo(dtype) # type: ignore - value = np.array( + return np.array( [ finfo.min, finfo.max, @@ -57,7 +54,55 @@ def float_array_signal( ], dtype=dtype, ) - return soft_signal_rw(Array1D[dtype], value, name) + + +@dataclass +class EverythingSignal: + name: str + dtype: type[SignalDatatype] + initial_value: Any = None + + +def get_every_signal_data(): + # list containing necessary info to construct a signal of each type for multiple + # transports e.g. soft/epics/tango + return [ + EverythingSignal("int", int, 1), + EverythingSignal("float", float, 1.234), + EverythingSignal("str", str, "test_string"), + EverythingSignal("bool", bool, True), + EverythingSignal("enum", ExampleEnum, ExampleEnum.B), + EverythingSignal("int8a", Array1D[np.int8], int_array_value(np.int8)), + EverythingSignal("uint8a", Array1D[np.uint8], int_array_value(np.uint8)), + EverythingSignal("int16a", Array1D[np.int16], int_array_value(np.int16)), + EverythingSignal("uint16a", Array1D[np.uint16], int_array_value(np.uint16)), + EverythingSignal("int32a", Array1D[np.int32], int_array_value(np.int32)), + EverythingSignal("uint32a", Array1D[np.uint32], int_array_value(np.uint32)), + EverythingSignal("int64a", Array1D[np.int64], int_array_value(np.int64)), + EverythingSignal("uint64a", Array1D[np.uint64], int_array_value(np.uint64)), + EverythingSignal( + "float32a", Array1D[np.float32], float_array_value(np.float32) + ), + EverythingSignal( + "float64a", Array1D[np.float64], float_array_value(np.float64) + ), + EverythingSignal("stra", Sequence[str], ["one", "two", "three"]), + EverythingSignal( + "enuma", Sequence[ExampleEnum], [ExampleEnum.A, ExampleEnum.C] + ), + EverythingSignal( + "table", + ExampleTable, + ExampleTable( + bool=np.array([False, False, True, True], np.bool_), + int=np.array([1, 8, -9, 32], np.int32), + float=np.array([1.8, 8.2, -6, 32.9887], np.float64), + str=["Hello", "World", "Foo", "Bar"], + enum=[ExampleEnum.A, ExampleEnum.B, ExampleEnum.A, ExampleEnum.C], + ), + ), + EverythingSignal("ndarray", np.ndarray, np.array(([1, 2, 3], [4, 5, 6]))), + ] class OneOfEverythingDevice(StandardReadable): @@ -65,40 +110,8 @@ class OneOfEverythingDevice(StandardReadable): def __init__(self, name=""): # add all signals to configuration with self.add_children_as_readables(Format.CONFIG_SIGNAL): - self.int = soft_signal_rw(int, 1) - self.float = soft_signal_rw(float, 1.234) - self.str = soft_signal_rw(str, "test_string") - self.bool = soft_signal_rw(bool, True) - self.enum = soft_signal_rw(ExampleEnum, ExampleEnum.B) - self.int8a = int_array_signal(np.int8) - self.uint8a = int_array_signal(np.uint8) - self.int16a = int_array_signal(np.int16) - self.uint16a = int_array_signal(np.uint16) - self.int32a = int_array_signal(np.int32) - self.uint32a = int_array_signal(np.uint32) - self.int64a = int_array_signal(np.int64) - self.uint64a = int_array_signal(np.uint64) - self.float32a = float_array_signal(np.float32) - self.float64a = float_array_signal(np.float64) - self.stra = soft_signal_rw( - Sequence[str], - ["one", "two", "three"], - ) - self.enuma = soft_signal_rw( - Sequence[ExampleEnum], - [ExampleEnum.A, ExampleEnum.C], - ) - self.table = soft_signal_rw( - ExampleTable, - ExampleTable( - bool=np.array([False, False, True, True], np.bool_), - int=np.array([1, 8, -9, 32], np.int32), - float=np.array([1.8, 8.2, -6, 32.9887], np.float64), - str=["Hello", "World", "Foo", "Bar"], - enum=[ExampleEnum.A, ExampleEnum.B, ExampleEnum.A, ExampleEnum.C], - ), - ) - self.ndarray = soft_signal_rw(np.ndarray, np.array(([1, 2, 3], [4, 5, 6]))) + for data in get_every_signal_data(): + setattr(self, data.name, soft_signal_rw(data.dtype, data.initial_value)) super().__init__(name) diff --git a/tests/core/test_soft_signal_backend.py b/tests/core/test_soft_signal_backend.py index 399631db54..38274e1ee7 100644 --- a/tests/core/test_soft_signal_backend.py +++ b/tests/core/test_soft_signal_backend.py @@ -10,12 +10,12 @@ from ophyd_async.core import ( Array1D, - SignalBackend, SoftSignalBackend, StrictEnum, T, soft_signal_rw, ) +from ophyd_async.testing import MonitorQueue class MyEnum(StrictEnum): @@ -44,30 +44,6 @@ def waveform_d(value): return {"dtype": "array", "shape": [len(value)]} -class MonitorQueue: - def __init__(self, backend: SignalBackend): - self.backend = backend - self.updates: asyncio.Queue[Reading] = asyncio.Queue() - backend.set_callback(self.updates.put_nowait) - - async def assert_updates(self, expected_value): - expected_reading = { - "value": expected_value, - "timestamp": pytest.approx(time.monotonic(), rel=0.1), - "alarm_severity": 0, - } - reading = await self.updates.get() - - backend_value = await self.backend.get_value() - backend_reading = await self.backend.get_reading() - - assert reading["value"] == expected_value == backend_value - assert reading == expected_reading == backend_reading - - def close(self): - self.backend.set_callback(None) - - # Can be removed once numpy >=2 is pinned. default_int_type = ( " None: setattr(proxy, pv, put_value) -# -------------------------------------------------------------------- -class MonitorQueue: - def __init__(self, backend: SignalBackend): - self.updates: asyncio.Queue[Reading] = asyncio.Queue() - self.backend = backend - self.subscription = backend.set_callback(self.updates.put_nowait) - - async def assert_updates(self, expected_value): - expected_reading = { - "timestamp": pytest.approx(time.time(), rel=0.1), - "alarm_severity": 0, - } - update_reading = dict(await asyncio.wait_for(self.updates.get(), timeout=5)) - update_value = update_reading.pop("value") - assert_close(update_value, expected_value) - backend_reading = dict( - await asyncio.wait_for(self.backend.get_reading(), timeout=5) - ) - backend_reading.pop("value") - backend_value = await asyncio.wait_for(self.backend.get_value(), timeout=5) - assert_close(backend_value, expected_value) - assert update_reading == expected_reading == backend_reading - - def close(self): - self.backend.set_callback(None) - - # -------------------------------------------------------------------- async def assert_monitor_then_put( echo_device: str, @@ -315,10 +301,11 @@ async def assert_monitor_then_put( ): await prepare_device(echo_device, pv, initial_value) source = echo_device + "/" + pv - backend = await make_backend(datatype, source, allow_events=True) + signal = tango_signal_rw(datatype, source) + backend = signal._connector.backend + await signal.connect() # Make a monitor queue that will monitor for updates - q = MonitorQueue(backend) - try: + with MonitorQueue(signal) as q: assert dict(source=source, **descriptor) == await backend.get_datakey("") # Check initial value await q.assert_updates(initial_value) @@ -326,8 +313,6 @@ async def assert_monitor_then_put( await backend.put(put_value, wait=True) assert_close(put_value, await backend.get_setpoint()) await q.assert_updates(put_value) - finally: - q.close() # --------------------------------------------------------------------