diff --git a/pyproject.toml b/pyproject.toml index 4ea06147cc..e25494a4ba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -75,9 +75,6 @@ dev = [ "types-pyyaml", ] -[project.scripts] -ophyd-async = "ophyd_async.__main__:main" - [project.urls] GitHub = "https://github.com/bluesky/ophyd-async" diff --git a/src/ophyd_async/core/_table.py b/src/ophyd_async/core/_table.py index 62b7b1233a..69ab8e0ea8 100644 --- a/src/ophyd_async/core/_table.py +++ b/src/ophyd_async/core/_table.py @@ -39,6 +39,11 @@ class Table(BaseModel): # so it is strictly checked against the BaseModel we are supplied. model_config = ConfigDict(extra="allow") + # Add an init method to match the above model config, otherwise the type + # checker will not think we can pass arbitrary kwargs into the base class init + def __init__(self, **kwargs): + super().__init__(**kwargs) + @classmethod def __init_subclass__(cls): # But forbit extra in subclasses so it gets validated diff --git a/src/ophyd_async/core/_utils.py b/src/ophyd_async/core/_utils.py index 43ce473b8b..e89131ea75 100644 --- a/src/ophyd_async/core/_utils.py +++ b/src/ophyd_async/core/_utils.py @@ -189,7 +189,7 @@ def get_enum_cls(datatype: type | None) -> type[StrictEnum] | None: if datatype and issubclass(datatype, Enum): if not issubclass(datatype, StrictEnum): raise TypeError( - f"{datatype} should inherit from .SubsetEnum " + f"{datatype} should inherit from ophyd_async.core.SubsetEnum " "or ophyd_async.core.StrictEnum" ) return datatype diff --git a/src/ophyd_async/epics/core/_aioca.py b/src/ophyd_async/epics/core/_aioca.py index d8e7537136..bca8892371 100644 --- a/src/ophyd_async/epics/core/_aioca.py +++ b/src/ophyd_async/epics/core/_aioca.py @@ -40,7 +40,7 @@ def _limits_from_augmented_value(value: AugmentedValue) -> Limits: def get_limits(limit: str) -> LimitsRange | None: low = getattr(value, f"lower_{limit}_limit", nan) high = getattr(value, f"upper_{limit}_limit", nan) - if not (isnan(low) and isnan(high)): + if not (isnan(low) and isnan(high)) and not high == low == 0: return LimitsRange( low=None if isnan(low) else low, high=None if isnan(high) else high, @@ -59,14 +59,16 @@ def get_limits(limit: str) -> LimitsRange | None: def _metadata_from_augmented_value( - value: AugmentedValue, metadata: SignalMetadata + datatype: type[SignalDatatypeT] | None, + value: AugmentedValue, + metadata: SignalMetadata, ) -> SignalMetadata: metadata = metadata.copy() - if hasattr(value, "units"): + if hasattr(value, "units") and datatype not in (str, bool): metadata["units"] = value.units if hasattr(value, "precision") and not isnan(value.precision): metadata["precision"] = value.precision - if limits := _limits_from_augmented_value(value): + if (limits := _limits_from_augmented_value(value)) and datatype is not bool: metadata["limits"] = limits return metadata @@ -290,7 +292,9 @@ async def put(self, value: SignalDatatypeT | None, wait: bool): async def get_datakey(self, source: str) -> DataKey: value = await self._caget(self.read_pv, FORMAT_CTRL) - metadata = _metadata_from_augmented_value(value, self.converter.metadata) + metadata = _metadata_from_augmented_value( + self.datatype, value, self.converter.metadata + ) return make_datakey( self.converter.datatype, self.converter.value(value), source, metadata ) diff --git a/src/ophyd_async/epics/core/_p4p.py b/src/ophyd_async/epics/core/_p4p.py index 5a5fa290dd..e0e2713b0f 100644 --- a/src/ophyd_async/epics/core/_p4p.py +++ b/src/ophyd_async/epics/core/_p4p.py @@ -39,7 +39,7 @@ def get_limits( substructure = getattr(value, substucture_name, None) low = getattr(substructure, low_name, nan) high = getattr(substructure, high_name, nan) - if not (isnan(low) and isnan(high)): + if not (isnan(low) and isnan(high)) and not low == high == 0: return LimitsRange( low=None if isnan(low) else low, high=None if isnan(high) else high, @@ -60,12 +60,21 @@ def get_limits( def _metadata_from_value(datatype: type[SignalDatatype], value: Any) -> SignalMetadata: metadata = SignalMetadata() value_data: Any = getattr(value, "value", None) + specifier = _get_specifier(value) display_data: Any = getattr(value, "display", None) - if hasattr(display_data, "units"): + if ( + hasattr(display_data, "units") + and specifier[-1] in _number_specifiers + and datatype is not str + ): metadata["units"] = display_data.units - if hasattr(display_data, "precision") and not isnan(display_data.precision): + if ( + hasattr(display_data, "precision") + and not isnan(display_data.precision) + and specifier[-1] in _float_specifiers + ): metadata["precision"] = display_data.precision - if limits := _limits_from_value(value): + if (limits := _limits_from_value(value)) and specifier[-1] in _number_specifiers: metadata["limits"] = limits # Get choices from display or value if datatype is str or issubclass(datatype, StrictEnum): @@ -174,6 +183,9 @@ def write_value(self, value: BaseModel | dict[str, Any]) -> Any: # https://mdavidsaver.github.io/p4p/values.html +_float_specifiers = {"f", "d"} +_int_specifiers = {"b", "B", "h", "H", "i", "I", "l", "L"} +_number_specifiers = _float_specifiers.union(_int_specifiers) _datatype_converter_from_typeid: dict[ tuple[str, str], tuple[type[SignalDatatype], type[PvaConverter]] ] = { @@ -208,7 +220,7 @@ def write_value(self, value: BaseModel | dict[str, Any]) -> Any: } -def _get_specifier(value: Value): +def _get_specifier(value: Value) -> str: typ = value.type("value").aspy() if isinstance(typ, tuple): return typ[0] diff --git a/src/ophyd_async/epics/testing/__init__.py b/src/ophyd_async/epics/testing/__init__.py index d8f39ef841..c0903af12e 100644 --- a/src/ophyd_async/epics/testing/__init__.py +++ b/src/ophyd_async/epics/testing/__init__.py @@ -1,24 +1,24 @@ -from ._example_ioc import ( +from ._example import ( CA_PVA_RECORDS, PVA_RECORDS, - ExampleCaDevice, - ExampleEnum, - ExamplePvaDevice, - ExampleTable, - connect_example_device, - get_example_ioc, + EpicsTestCaDevice, + EpicsTestEnum, + EpicsTestIocAndDevices, + EpicsTestPvaDevice, + EpicsTestSubsetEnum, + EpicsTestTable, ) -from ._utils import TestingIOC, generate_random_PV_prefix +from ._utils import TestingIOC, generate_random_pv_prefix __all__ = [ "CA_PVA_RECORDS", "PVA_RECORDS", - "ExampleCaDevice", - "ExampleEnum", - "ExamplePvaDevice", - "ExampleTable", - "connect_example_device", - "get_example_ioc", + "EpicsTestCaDevice", + "EpicsTestEnum", + "EpicsTestSubsetEnum", + "EpicsTestPvaDevice", + "EpicsTestTable", + "EpicsTestIocAndDevices", "TestingIOC", - "generate_random_PV_prefix", + "generate_random_pv_prefix", ] diff --git a/src/ophyd_async/epics/testing/_example_ioc.py b/src/ophyd_async/epics/testing/_example_ioc.py index 670ae10edf..6ae7a313fa 100644 --- a/src/ophyd_async/epics/testing/_example_ioc.py +++ b/src/ophyd_async/epics/testing/_example_ioc.py @@ -1,50 +1,48 @@ from collections.abc import Sequence from pathlib import Path from typing import Annotated as A -from typing import Literal import numpy as np -from ophyd_async.core import ( - Array1D, - SignalRW, - StrictEnum, - Table, -) -from ophyd_async.epics.core import ( - EpicsDevice, - PvSuffix, -) +from ophyd_async.core import Array1D, SignalR, SignalRW, StrictEnum, Table +from ophyd_async.core._utils import SubsetEnum +from ophyd_async.epics.core import EpicsDevice, PvSuffix -from ._utils import TestingIOC +from ._utils import TestingIOC, generate_random_pv_prefix -CA_PVA_RECORDS = str(Path(__file__).parent / "test_records.db") -PVA_RECORDS = str(Path(__file__).parent / "test_records_pva.db") +CA_PVA_RECORDS = Path(__file__).parent / "test_records.db" +PVA_RECORDS = Path(__file__).parent / "test_records_pva.db" -class ExampleEnum(StrictEnum): +class EpicsTestEnum(StrictEnum): A = "Aaa" B = "Bbb" C = "Ccc" -class ExampleTable(Table): +class EpicsTestSubsetEnum(SubsetEnum): + A = "Aaa" + B = "Bbb" + + +class EpicsTestTable(Table): bool: Array1D[np.bool_] int: Array1D[np.int32] float: Array1D[np.float64] str: Sequence[str] - enum: Sequence[ExampleEnum] + enum: Sequence[EpicsTestEnum] -class ExampleCaDevice(EpicsDevice): +class EpicsTestCaDevice(EpicsDevice): my_int: A[SignalRW[int], PvSuffix("int")] my_float: A[SignalRW[float], PvSuffix("float")] my_str: A[SignalRW[str], PvSuffix("str")] longstr: A[SignalRW[str], PvSuffix("longstr")] - longstr2: A[SignalRW[str], PvSuffix("longstr2")] + longstr2: A[SignalRW[str], PvSuffix("longstr2.VAL$")] my_bool: A[SignalRW[bool], PvSuffix("bool")] - enum: A[SignalRW[ExampleEnum], PvSuffix("enum")] - enum2: A[SignalRW[ExampleEnum], PvSuffix("enum2")] + enum: A[SignalRW[EpicsTestEnum], PvSuffix("enum")] + enum2: A[SignalRW[EpicsTestEnum], PvSuffix("enum2")] + subset_enum: A[SignalRW[EpicsTestSubsetEnum], PvSuffix("subset_enum")] bool_unnamed: A[SignalRW[bool], PvSuffix("bool_unnamed")] partialint: A[SignalRW[int], PvSuffix("partialint")] lessint: A[SignalRW[int], PvSuffix("lessint")] @@ -56,52 +54,36 @@ class ExampleCaDevice(EpicsDevice): stra: A[SignalRW[Sequence[str]], PvSuffix("stra")] -class ExamplePvaDevice(ExampleCaDevice): # pva can support all signal types that ca can +class EpicsTestPvaDevice(EpicsTestCaDevice): + # pva can support all signal types that ca can int8a: A[SignalRW[Array1D[np.int8]], PvSuffix("int8a")] uint16a: A[SignalRW[Array1D[np.uint16]], PvSuffix("uint16a")] uint32a: A[SignalRW[Array1D[np.uint32]], PvSuffix("uint32a")] int64a: A[SignalRW[Array1D[np.int64]], PvSuffix("int64a")] uint64a: A[SignalRW[Array1D[np.uint64]], PvSuffix("uint64a")] - table: A[SignalRW[ExampleTable], PvSuffix("table")] - ntndarray_data: A[SignalRW[Array1D[np.int64]], PvSuffix("ntndarray:data")] - - -async def connect_example_device( - ioc: TestingIOC, protocol: Literal["ca", "pva"] -) -> ExamplePvaDevice | ExampleCaDevice: - """Helper function to return a connected example device. - - Parameters - ---------- - - ioc: TestingIOC - TestingIOC configured to provide the records needed for the device - - protocol: Literal["ca", "pva"] - The transport protocol of the device - - Returns - ------- - ExamplePvaDevice | ExampleCaDevice - a connected EpicsDevice with signals of many EPICS record types - """ - device_cls = ExamplePvaDevice if protocol == "pva" else ExampleCaDevice - device = device_cls(f"{protocol}://{ioc.prefix_for(device_cls)}") - await device.connect() - return device - - -def get_example_ioc() -> TestingIOC: - """Get TestingIOC instance with the example databases loaded. - - Returns - ------- - TestingIOC - instance with test_records.db loaded for ExampleCaDevice and - test_records.db and test_records_pva.db loaded for ExamplePvaDevice. - """ - ioc = TestingIOC() - ioc.database_for(PVA_RECORDS, ExamplePvaDevice) - ioc.database_for(CA_PVA_RECORDS, ExamplePvaDevice) - ioc.database_for(CA_PVA_RECORDS, ExampleCaDevice) - return ioc + table: A[SignalRW[EpicsTestTable], PvSuffix("table")] + ntndarray: A[SignalR[np.ndarray], PvSuffix("ntndarray")] + + +class EpicsTestIocAndDevices: + def __init__(self): + self.prefix = generate_random_pv_prefix() + self.ioc = TestingIOC() + # Create supporting records and ExampleCaDevice + ca_prefix = f"{self.prefix}ca:" + self.ioc.add_database(CA_PVA_RECORDS, device=ca_prefix) + self.ca_device = EpicsTestCaDevice(f"ca://{ca_prefix}") + # Create supporting records and ExamplePvaDevice + pva_prefix = f"{self.prefix}pva:" + self.ioc.add_database(CA_PVA_RECORDS, device=pva_prefix) + self.ioc.add_database(PVA_RECORDS, device=pva_prefix) + self.pva_device = EpicsTestPvaDevice(f"pva://{pva_prefix}") + + def get_device(self, protocol: str) -> EpicsTestCaDevice | EpicsTestPvaDevice: + return getattr(self, f"{protocol}_device") + + def get_signal(self, protocol: str, name: str) -> SignalRW: + return getattr(self.get_device(protocol), name) + + def get_pv(self, protocol: str, name: str) -> str: + return f"{protocol}://{self.prefix}{protocol}:{name}" diff --git a/src/ophyd_async/epics/testing/_utils.py b/src/ophyd_async/epics/testing/_utils.py index 8e156f5c0b..efe8cc6966 100644 --- a/src/ophyd_async/epics/testing/_utils.py +++ b/src/ophyd_async/epics/testing/_utils.py @@ -5,39 +5,18 @@ import time from pathlib import Path -from aioca import purge_channel_caches -from ophyd_async.core import Device - - -def generate_random_PV_prefix() -> str: +def generate_random_pv_prefix() -> str: return "".join(random.choice(string.ascii_lowercase) for _ in range(12)) + ":" class TestingIOC: - _dbs: dict[type[Device], list[Path]] = {} - _prefixes: dict[type[Device], str] = {} - - @classmethod - def with_database(cls, db: Path | str): # use as a decorator - def inner(device_cls: type[Device]): - cls.database_for(db, device_cls) - return device_cls - - return inner - - @classmethod - def database_for(cls, db, device_cls): - path = Path(db) - if not path.is_file(): - raise OSError(f"{path} is not a file.") - if device_cls not in cls._dbs: - cls._dbs[device_cls] = [] - cls._dbs[device_cls].append(path) + def __init__(self): + self._db_macros: list[tuple[Path, dict[str, str]]] = [] + self.output = "" - def prefix_for(self, device_cls): - # generate random prefix, return existing if already generated - return self._prefixes.setdefault(device_cls, generate_random_PV_prefix()) + def add_database(self, db: Path | str, /, **macros: str): + self._db_macros.append((Path(db), macros)) def start_ioc(self): ioc_args = [ @@ -45,10 +24,10 @@ def start_ioc(self): "-m", "epicscorelibs.ioc", ] - for device_cls, dbs in self._dbs.items(): - prefix = self.prefix_for(device_cls) - for db in dbs: - ioc_args += ["-m", f"device={prefix}", "-d", str(db)] + for db, macros in self._db_macros: + macro_str = ",".join(f"{k}={v}" for k, v in macros.items()) + ioc_args += ["-m", macro_str, "-d", str(db)] + print(ioc_args) self._process = subprocess.Popen( ioc_args, stdin=subprocess.PIPE, @@ -56,23 +35,18 @@ def start_ioc(self): stderr=subprocess.STDOUT, universal_newlines=True, ) + assert self._process.stdout start_time = time.monotonic() - while "iocRun: All initialization complete" not in ( - self._process.stdout.readline().strip() # type: ignore - ): + while "iocRun: All initialization complete" not in self.output: if time.monotonic() - start_time > 10: - try: - print(self._process.communicate("exit()")[0]) - except ValueError: - # Someone else already called communicate - pass - raise TimeoutError("IOC did not start in time") + self.stop_ioc() + raise TimeoutError(f"IOC did not start in time:\n{self.output}") + self.output += self._process.stdout.readline() def stop_ioc(self): - # close backend caches before the event loop - purge_channel_caches() try: - print(self._process.communicate("exit()")[0]) + self.output += self._process.communicate("exit()")[0] except ValueError: # Someone else already called communicate pass + print(self.output) diff --git a/src/ophyd_async/epics/testing/test_records.db b/src/ophyd_async/epics/testing/test_records.db index ff45a6350c..082ba6d421 100644 --- a/src/ophyd_async/epics/testing/test_records.db +++ b/src/ophyd_async/epics/testing/test_records.db @@ -96,6 +96,14 @@ record(mbbo, "$(device)enum2") { field(PINI, "YES") } +record(mbbo, "$(device)subset_enum") { + field(ZRST, "Aaa") + field(ONST, "Bbb") + field(TWST, "Ccc") + field(VAL, "1") + field(PINI, "YES") +} + record(waveform, "$(device)uint8a") { field(NELM, "3") field(FTVL, "UCHAR") diff --git a/src/ophyd_async/testing/__init__.py b/src/ophyd_async/testing/__init__.py index ac8e1bcf9e..2e65cba983 100644 --- a/src/ophyd_async/testing/__init__.py +++ b/src/ophyd_async/testing/__init__.py @@ -1,5 +1,9 @@ +from . import __pytest_assert_rewrite # noqa: F401 from ._assert import ( + ApproxTable, + MonitorQueue, assert_configuration, + assert_describe_signal, assert_emitted, assert_reading, assert_value, @@ -24,6 +28,7 @@ __all__ = [ "assert_configuration", + "assert_describe_signal", "assert_emitted", "assert_reading", "assert_value", @@ -40,4 +45,6 @@ "ExampleTable", "OneOfEverythingDevice", "ParentOfEverythingDevice", + "MonitorQueue", + "ApproxTable", ] diff --git a/src/ophyd_async/testing/__pytest_assert_rewrite.py b/src/ophyd_async/testing/__pytest_assert_rewrite.py new file mode 100644 index 0000000000..0aae6e126f --- /dev/null +++ b/src/ophyd_async/testing/__pytest_assert_rewrite.py @@ -0,0 +1,5 @@ +import pytest + +# So that bare asserts give a nice pytest traceback +pytest.register_assert_rewrite("ophyd_async.testing._assert") +pytest.register_assert_rewrite("ophyd_async.testing._monitor_queue") diff --git a/src/ophyd_async/testing/_assert.py b/src/ophyd_async/testing/_assert.py index 849c1a74ec..da2281c1d8 100644 --- a/src/ophyd_async/testing/_assert.py +++ b/src/ophyd_async/testing/_assert.py @@ -1,10 +1,20 @@ +import asyncio +import time from collections.abc import Mapping +from contextlib import AbstractContextManager from typing import Any import pytest from bluesky.protocols import Reading +from event_model import DataKey -from ophyd_async.core import AsyncConfigurable, AsyncReadable, SignalDatatypeT, SignalR +from ophyd_async.core import ( + AsyncConfigurable, + AsyncReadable, + SignalDatatypeT, + SignalR, + Table, +) def _generate_assert_error_msg(name: str, expected_result, actual_result) -> str: @@ -108,6 +118,14 @@ async def assert_configuration( ) +async def assert_describe_signal(signal: SignalR, /, **metadata): + actual_describe = await signal.describe() + assert list(actual_describe) == [signal.name] + (actual_datakey,) = actual_describe.values() + expected_datakey = DataKey(source=signal.source, **metadata) + assert actual_datakey == expected_datakey + + def assert_emitted(docs: Mapping[str, list[dict]], **numbers: int): """Assert emitted document generated by running a Bluesky plan @@ -138,3 +156,53 @@ def assert_emitted(docs: Mapping[str, list[dict]], **numbers: int): expected_result=numbers, actual_result=actual_numbers, ) + + +class ApproxTable: + def __init__(self, expected: Table, rel=None, abs=None, nan_ok: bool = False): + self.expected = expected + self.rel = rel + self.abs = abs + self.nan_ok = nan_ok + + def __eq__(self, value): + approx_fields = { + k: pytest.approx(v, self.rel, self.abs, self.nan_ok) + for k, v in self.expected + } + expected = type(self.expected).model_construct(**approx_fields) # type: ignore + return expected == value + + +class MonitorQueue(AbstractContextManager): + def __init__(self, signal: SignalR): + self.signal = signal + self.updates: asyncio.Queue[dict[str, Reading]] = asyncio.Queue() + self.signal.subscribe(self.updates.put_nowait) + + async def assert_updates(self, expected_value): + # Get an update, value and reading + if isinstance(expected_value, Table): + expected_value = ApproxTable(expected_value) + else: + expected_value = pytest.approx(expected_value) + update = await self.updates.get() + value = await self.signal.get_value() + reading = await self.signal.read() + # Check they match what we expected + assert value == expected_value + expected_reading = { + self.signal.name: { + "value": expected_value, + "timestamp": pytest.approx(time.time(), rel=0.1), + "alarm_severity": 0, + } + } + assert reading == update == expected_reading + + def __enter__(self): + self.signal.subscribe(self.updates.put_nowait) + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.signal.clear_sub(self.updates.put_nowait) diff --git a/src/ophyd_async/testing/conftest.py b/src/ophyd_async/testing/conftest.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/epics/signal/test_epics_signals.py b/tests/epics/signal/test_epics_signals.py new file mode 100644 index 0000000000..cf023a4981 --- /dev/null +++ b/tests/epics/signal/test_epics_signals.py @@ -0,0 +1,652 @@ +import asyncio +import os +import time +from collections.abc import Awaitable, Callable +from enum import Enum +from pathlib import Path +from typing import Generic, Literal, get_args + +import bluesky.plan_stubs as bps +import numpy as np +import pytest +import yaml +from aioca import purge_channel_caches +from bluesky.protocols import Location +from event_model import Dtype, Limits, LimitsRange +from ophyd.signal import EpicsSignal + +from ophyd_async.core import ( + Array1D, + NotConnected, + Signal, + SignalDatatypeT, + SignalR, + SignalRW, + StrictEnum, + SubsetEnum, + T, + Table, + YamlSettingsProvider, + observe_value, +) +from ophyd_async.epics.core import ( + CaSignalBackend, + PvaSignalBackend, + epics_signal_r, + epics_signal_rw, + epics_signal_rw_rbv, + epics_signal_w, + epics_signal_x, +) +from ophyd_async.epics.testing import ( + EpicsTestEnum, + EpicsTestIocAndDevices, + EpicsTestSubsetEnum, + EpicsTestTable, +) +from ophyd_async.plan_stubs import ( + apply_settings, + ensure_connected, + retrieve_settings, + store_settings, +) +from ophyd_async.testing import MonitorQueue, assert_describe_signal + +Protocol = Literal["ca", "pva"] + + +@pytest.fixture(scope="module") +def ioc_devices(): + ioc_devices = EpicsTestIocAndDevices() + ioc_devices.ioc.start_ioc() + yield ioc_devices + # Purge the channel caches before we stop the IOC to stop + # RuntimeError: Event loop is closed errors on teardown + purge_channel_caches() + ioc_devices.ioc.stop_ioc() + + +class ExpectedData(Generic[T]): + def __init__( + self, initial: T, put: T, dtype: Dtype, dtype_numpy: str | list, **metadata + ): + self.initial = initial + self.put = put + self.metadata = dict(dtype=dtype, dtype_numpy=dtype_numpy, **metadata) + + +async def assert_monitor_then_put( + signal: SignalR[SignalDatatypeT], + initial_value: SignalDatatypeT, + put_value: SignalDatatypeT, + metadata: dict, + signal_set: Callable[[SignalDatatypeT], Awaitable[None]] | None = None, +): + if signal_set is None: + assert isinstance(signal, SignalRW) + signal_set = signal.set + await signal.connect(timeout=1) + with MonitorQueue(signal) as q: + # Check initial value + await q.assert_updates(initial_value) + # Check descriptor + if isinstance(initial_value, np.ndarray): + shape = list(initial_value.shape) + elif isinstance(initial_value, list | Table): + shape = [len(initial_value)] + else: + shape = [] + await assert_describe_signal(signal, shape=shape, **metadata) + # Put to new value and check it + await signal_set(put_value) + await q.assert_updates(put_value) + + +# Can be removed once numpy >=2 is pinned. +default_int_type = ( + " list: + return [ + # natively bool fields are uint8, so if we don't provide a Table + # subclass to specify bool, that is what we get + ("bool", "|u1" if guess else "|b1"), + ("int", "", + "requested ['Aaa', 'B', 'Ccc'] to be strictly equal", + ), + ), + ( + SubsetEnumWrongChoices, + "enum", + ( + "has choices ('Aaa', 'Bbb', 'Ccc')", + "but ", + "requested ['Aaa', 'B', 'Ccc'] to be a subset", + ), + ), + ( + int, + "str", + ("with inferred datatype str", "cannot be coerced to int"), + ), + ( + str, + "float", + ("with inferred datatype float", "cannot be coerced to str"), + ), + ( + str, + "stra", + ("with inferred datatype Sequence[str]", "cannot be coerced to str"), + ), + ( + int, + "uint8a", + ("with inferred datatype Array1D[np.uint8]", "cannot be coerced to int"), + ), + ( + float, + "enum", + ("with inferred datatype str", "cannot be coerced to float"), + ), + ( + Array1D[np.int32], + "float64a", + ( + "with inferred datatype Array1D[np.float64]", + "cannot be coerced to Array1D[np.int32]", + ), + ), + ( + EnumNoString, + "enum2", + ( + " should inherit from ", + "ophyd_async.core.SubsetEnum or ophyd_async.core.StrictEnum", + ), + ), + ], +) +async def test_backend_wrong_type_errors( + ioc_devices: EpicsTestIocAndDevices, typ, suff, errors, protocol: Protocol +): + signal = epics_signal_rw(typ, ioc_devices.get_pv(protocol, suff)) + with pytest.raises(TypeError) as cm: + await signal.connect() + for error in errors: + assert error in str(cm.value) + + +@pytest.mark.parametrize("protocol", get_args(Protocol)) +async def test_backend_put_enum_string( + ioc_devices: EpicsTestIocAndDevices, protocol: Protocol +): + signal = ioc_devices.get_signal(protocol, "enum2") + await signal.connect() + await signal.set("Ccc") + assert ( + Location(setpoint=EpicsTestEnum.C, readback=EpicsTestEnum.C) + == await signal.locate() + ) + val = await signal.get_value() + assert val == "Ccc" + assert val is EpicsTestEnum.C + assert repr(val) == "" + + +@pytest.mark.parametrize("protocol", get_args(Protocol)) +async def test_non_existent_errors( + ioc_devices: EpicsTestIocAndDevices, protocol: Protocol +): + signal = epics_signal_rw(str, "non-existent") + with pytest.raises(NotConnected): + await signal.connect(timeout=0.1) + + +def test_make_backend_fails_for_different_transports(): + read_pv = "test" + write_pv = "pva://test" + + with pytest.raises( + TypeError, + match=f"Differing protocols: {read_pv} has EpicsProtocol.CA," + + f" {write_pv} has EpicsProtocol.PVA", + ): + epics_signal_rw(str, read_pv, write_pv) + + +def _get_epics_backend(signal: Signal) -> CaSignalBackend | PvaSignalBackend: + backend = signal._connector.backend + assert isinstance(backend, CaSignalBackend | PvaSignalBackend) + return backend + + +def test_signal_helpers(): + read_write = epics_signal_rw(int, "ReadWrite") + assert _get_epics_backend(read_write).read_pv == "ReadWrite" + assert _get_epics_backend(read_write).write_pv == "ReadWrite" + + read_write_rbv_manual = epics_signal_rw(int, "ReadWrite_RBV", "ReadWrite") + assert _get_epics_backend(read_write_rbv_manual).read_pv == "ReadWrite_RBV" + assert _get_epics_backend(read_write_rbv_manual).write_pv == "ReadWrite" + + read_write_rbv = epics_signal_rw_rbv(int, "ReadWrite") + assert _get_epics_backend(read_write_rbv).read_pv == "ReadWrite_RBV" + assert _get_epics_backend(read_write_rbv).write_pv == "ReadWrite" + + read_write_rbv_suffix = epics_signal_rw_rbv(int, "ReadWrite", read_suffix=":RBV") + assert _get_epics_backend(read_write_rbv_suffix).read_pv == "ReadWrite:RBV" + assert _get_epics_backend(read_write_rbv_suffix).write_pv == "ReadWrite" + + read_write_rbv_w_field = epics_signal_rw_rbv(int, "ReadWrite.VAL") + assert _get_epics_backend(read_write_rbv_w_field).read_pv == "ReadWrite_RBV.VAL" + assert _get_epics_backend(read_write_rbv_w_field).write_pv == "ReadWrite.VAL" + + read = epics_signal_r(int, "Read") + assert _get_epics_backend(read).read_pv == "Read" + + write = epics_signal_w(int, "Write") + assert _get_epics_backend(write).write_pv == "Write" + + execute = epics_signal_x("Execute") + assert _get_epics_backend(execute).write_pv == "Execute" + + +@pytest.mark.parametrize("protocol", get_args(Protocol)) +async def test_signals_created_for_prec_0_float_can_use_int( + ioc_devices: EpicsTestIocAndDevices, protocol: Protocol +): + sig = epics_signal_rw(int, ioc_devices.get_pv(protocol, "float_prec_0")) + await sig.connect() + + +@pytest.mark.parametrize("protocol", get_args(Protocol)) +async def test_signals_created_for_not_prec_0_float_cannot_use_int( + ioc_devices: EpicsTestIocAndDevices, protocol: Protocol +): + sig = epics_signal_rw(int, ioc_devices.get_pv(protocol, "float_prec_1")) + with pytest.raises( + TypeError, + match="float_prec_1 with inferred datatype float" ".* cannot be coerced to int", + ): + await sig.connect() + + +@pytest.mark.parametrize("protocol", get_args(Protocol)) +async def test_bool_works_for_mismatching_enums( + ioc_devices: EpicsTestIocAndDevices, protocol: Protocol +): + pv_name = ioc_devices.get_pv(protocol, "bool") + sig = epics_signal_rw(bool, pv_name, pv_name + "_unnamed") + await sig.connect() + + +async def test_can_read_using_ophyd_async_then_ophyd( + RE, ioc_devices: EpicsTestIocAndDevices +): + ophyd_async_sig = epics_signal_rw(float, ioc_devices.get_pv("ca", "float_prec_1")) + await ophyd_async_sig.connect() + ophyd_signal = EpicsSignal(ioc_devices.get_pv("ca", "float_prec_0").split("://")[1]) + ophyd_signal.wait_for_connection(timeout=5) + + def my_plan(): + yield from bps.rd(ophyd_async_sig) + yield from bps.rd(ophyd_signal) + + RE(my_plan()) + + +def test_signal_module_emits_deprecation_warning(): + with pytest.deprecated_call(): + import ophyd_async.epics.signal # noqa: F401 + + +@pytest.mark.parametrize("protocol", get_args(Protocol)) +async def test_observe_ticking_signal_with_busy_loop( + ioc_devices: EpicsTestIocAndDevices, protocol: Protocol +): + sig = epics_signal_rw(int, ioc_devices.get_pv("ca", "ticking")) + await sig.connect() + + recv = [] + + async def watch(): + async for val in observe_value(sig, done_timeout=0.4): + time.sleep(0.3) + recv.append(val) + + start = time.time() + + with pytest.raises(asyncio.TimeoutError): + await watch() + assert time.time() - start == pytest.approx(0.6, abs=0.1) + assert len(recv) == 2 + # Don't check values as CA and PVA have different algorithms for + # dropping updates for slow callbacks + + +HERE = Path(__file__).absolute().parent + + +@pytest.mark.parametrize("protocol", get_args(Protocol)) +async def test_retrieve_apply_store_settings( + RE, ioc_devices: EpicsTestIocAndDevices, protocol: Protocol, tmp_path +): + tmp_provider = YamlSettingsProvider(tmp_path) + expected_provider = YamlSettingsProvider(HERE) + device = ioc_devices.get_device(protocol) + + def my_plan(): + yield from ensure_connected(device) + settings = yield from retrieve_settings( + expected_provider, f"test_yaml_save_{protocol}", device + ) + yield from apply_settings(settings) + yield from store_settings(tmp_provider, "test_file", device) + with open(tmp_path / "test_file.yaml") as actual_file: + with open(HERE / f"test_yaml_save_{protocol}.yaml") as expected_file: + assert yaml.safe_load(actual_file) == yaml.safe_load(expected_file) + + RE(my_plan()) diff --git a/tests/epics/signal/test_file.yaml b/tests/epics/signal/test_file.yaml new file mode 100644 index 0000000000..051b650eb0 --- /dev/null +++ b/tests/epics/signal/test_file.yaml @@ -0,0 +1,40 @@ +bool_unnamed: true +enum: Bbb +enum2: Bbb +float32a: [1.9999999949504854e-06, -123.12300109863281] +float64a: [0.1, -12345678.123] +int16a: [-32768, 32767] +int32a: [-2147483648, 2147483647] +int64a: [-2147483649, 2147483648] +int8a: [-128, 127] +lessint: 42 +longstr: a string that is just longer than forty characters +longstr2: a string that is just longer than forty characters +my_bool: true +my_float: 3.141 +my_int: 42 +my_str: hello +partialint: 42 +stra: +- five +- six +- seven +subset_enum: Bbb +table: + bool: [false, false, true, true] + enum: + - Aaa + - Bbb + - Aaa + - Ccc + float: [1.8, 8.2, -6.0, 32.9887] + int: [1, 8, -9, 32] + str: + - Hello + - World + - Foo + - Bar +uint16a: [0, 65535] +uint32a: [0, 4294967295] +uint64a: [0, 4294967297] +uint8a: [0, 255] diff --git a/tests/epics/signal/test_signals.py b/tests/epics/signal/test_signals.py deleted file mode 100644 index eb3b0cca8f..0000000000 --- a/tests/epics/signal/test_signals.py +++ /dev/null @@ -1,988 +0,0 @@ -import asyncio -import os -import time -from collections.abc import Sequence -from contextlib import closing -from enum import Enum -from pathlib import Path -from types import GenericAlias -from typing import Any, Literal, get_args -from unittest.mock import ANY - -import bluesky.plan_stubs as bps -import numpy as np -import pytest -from bluesky.protocols import Reading -from bluesky.run_engine import RunEngine -from event_model import DataKey, Limits, LimitsRange -from ophyd.signal import EpicsSignal - -from ophyd_async.core import ( - Array1D, - NotConnected, - SignalBackend, - StrictEnum, - SubsetEnum, - T, - Table, - YamlSettingsProvider, - observe_value, -) -from ophyd_async.epics.core import ( - EpicsDevice, - epics_signal_r, - epics_signal_rw, - epics_signal_rw_rbv, - epics_signal_w, - epics_signal_x, -) -from ophyd_async.epics.core._signal import ( - _epics_signal_backend, # noqa: PLC2701 -) -from ophyd_async.epics.testing import ( - ExampleCaDevice, - ExampleEnum, - ExamplePvaDevice, - ExampleTable, - TestingIOC, - connect_example_device, - get_example_ioc, -) - - -class MySubsetEnum(SubsetEnum): - A = "Aaa" - B = "Bbb" - C = "Ccc" - - -Protocol = Literal["ca", "pva"] -PARAMETERISE_PROTOCOLS = pytest.mark.parametrize("protocol", get_args(Protocol)) - - -@pytest.fixture(scope="module") -def ioc(): - ioc = get_example_ioc() - ioc.start_ioc() - yield ioc - ioc.stop_ioc() - - -def get_prefix(ioc: TestingIOC, protocol: Protocol): - device_cls = ExamplePvaDevice if protocol == "pva" else ExampleCaDevice - return ioc.prefix_for(device_cls) - - -async def _make_backend(ioc, typ: type | None, protocol: str, suff: str, timeout=10.0): - device_cls = ExampleCaDevice if protocol == "ca" else ExamplePvaDevice - prefix = ioc.prefix_for(device_cls) - pv = f"{protocol}://{prefix}{suff}" - # Make and connect the backend - backend = _epics_signal_backend(typ, pv, pv) - await backend.connect(timeout=timeout) - return backend - - -def assert_types_are_equal(t_actual, t_expected, actual_value): - expected_plain_type = getattr(t_expected, "__origin__", t_expected) - if issubclass(expected_plain_type, np.ndarray): - actual_plain_type = getattr(t_actual, "__origin__", t_actual) - assert actual_plain_type == expected_plain_type - actual_dtype_type = actual_value.dtype.type - expected_dtype_type = t_expected.__args__[1].__args__[0] - assert actual_dtype_type == expected_dtype_type - elif ( - expected_plain_type is not str - and not issubclass(expected_plain_type, Enum) - and issubclass(expected_plain_type, Sequence) - ): - actual_plain_type = getattr(t_actual, "__origin__", t_actual) - assert issubclass(actual_plain_type, expected_plain_type) - assert len(actual_value) == 0 or isinstance( - actual_value[0], t_expected.__args__[0] - ) - else: - assert t_actual == t_expected - - -class MonitorQueue: - def __init__(self, backend: SignalBackend): - self.backend = backend - self.updates: asyncio.Queue[Reading] = asyncio.Queue() - self.subscription = backend.set_callback(self.updates.put_nowait) - - async def assert_updates(self, expected_value, expected_type=None): - expected_reading = { - "value": expected_value, - "timestamp": pytest.approx(time.time(), rel=0.1), - "alarm_severity": 0, - } - backend_reading = await asyncio.wait_for(self.backend.get_reading(), timeout=5) - backend_value = await asyncio.wait_for(self.backend.get_value(), timeout=5) - update_reading = await asyncio.wait_for(self.updates.get(), timeout=5) - update_value = update_reading["value"] - - # We can't compare arrays of bool easily so we do it as numpy rows - if issubclass(type(update_value), Table): - assert all( - row1 == row2 - for row1, row2 in zip( - expected_value.numpy_table(), - update_value.numpy_table(), - strict=True, - ) - ) - assert all( - row1 == row2 - for row1, row2 in zip( - expected_value.numpy_table(), - backend_value.numpy_table(), - strict=True, - ) - ) - else: - assert update_value == expected_value == backend_value - - if expected_type: - assert_types_are_equal(type(update_value), expected_type, update_value) - assert_types_are_equal(type(backend_value), expected_type, backend_value) - - for key in expected_reading: - if key == "value": - continue - assert update_reading[key] == expected_reading[key] - assert backend_reading[key] == expected_reading[key] - - def close(self): - self.backend.set_callback(None) - - -def _is_numpy_subclass(t): - if t is None: - return False - plain_type = t.__origin__ if isinstance(t, GenericAlias) else t - return issubclass(plain_type, np.ndarray) - - -async def assert_monitor_then_put( - ioc: TestingIOC, - device: EpicsDevice, - suffix: str, - protocol: Protocol, - datakey: dict, - initial_value: T, - put_value: T, - datatype: type[T] | None = None, - check_type: bool | None = True, -): - signal = getattr(device, suffix) - backend = signal._connector.backend - # Make a monitor queue that will monitor for updates - q = MonitorQueue(backend) - try: - # Check datakey - source = f"{protocol}://{get_prefix(ioc, protocol)}{suffix}" - assert dict(source=source, **datakey) == await backend.get_datakey(source) - # Check initial value - await q.assert_updates( - pytest.approx(initial_value), - datatype if check_type else None, - ) - # Put to new value and check that - await backend.put(put_value, wait=True) - await q.assert_updates( - pytest.approx(put_value), datatype if check_type else None - ) - finally: - q.close() - - -_metadata: dict[str, dict[str, dict[str, Any]]] = { - "ca": { - "boolean": {"units": ANY, "limits": ANY}, - "integer": {"units": ANY, "limits": ANY}, - "number": {"units": ANY, "limits": ANY, "precision": ANY}, - "enum": {}, - "string": {}, - }, - "pva": { - "boolean": {}, - "integer": {"units": ANY, "precision": ANY, "limits": ANY}, - "number": {"units": ANY, "precision": ANY, "limits": ANY}, - "enum": {}, - "string": {"units": ANY, "precision": ANY, "limits": ANY}, - }, -} - - -def datakey(protocol: str, suffix: str, value=None) -> DataKey: - def get_internal_dtype(suffix: str) -> str: - # uint32, [u]int64 backed by DBR_DOUBLE, have precision - if "float" in suffix or "uint32" in suffix or "int64" in suffix: - return "number" - if "int" in suffix: - return "integer" - if "bool" in suffix: - return "boolean" - if "enum" in suffix: - return "enum" - return "string" - - def get_dtype(suffix: str) -> str: - if suffix.endswith("a"): - return "array" - if "enum" in suffix: - return "string" - return get_internal_dtype(suffix) - - def get_dtype_numpy(suffix: str) -> str: # type: ignore - if "float32" in suffix: - return " None: - """Booleans are converted to Short Enumerations with values 0,1 as database does - not support boolean natively. - The flow of test_backend_get_put_monitor Gets a value with a dtype of None: we - cannot tell the difference between an enum with 2 members and a boolean, so - cannot get a DataKey that does not mutate form. - This test otherwise performs the same. - """ - # With the given datatype, check we have the correct initial value and putting - # works - device = await connect_example_device(ioc, protocol) - await assert_monitor_then_put( - ioc, - device, - suffix, - protocol, - datakey(protocol, suffix), - True, - False, - bool, - ) - # With datatype guessed from CA/PVA, check we can set it back to the initial value - await assert_monitor_then_put( - ioc, - device, - suffix, - protocol, - datakey(protocol, suffix, True), - False, - True, - bool, - ) - - -@PARAMETERISE_PROTOCOLS -async def test_error_raised_on_disconnected_PV(ioc, protocol) -> None: - if protocol == "pva": - expected = "pva://Disconnect" - elif protocol == "ca": - expected = "ca://Disconnect" - else: - raise TypeError() - device = await connect_example_device(ioc, protocol) - signal = device.my_bool - backend = signal._connector.backend - # The below will work without error - await signal.set(False) - # Change the name of write_pv to mock disconnection - backend.__setattr__("write_pv", "Disconnect") - with pytest.raises(asyncio.TimeoutError, match=expected): - await signal.set(True, timeout=0.1) - - -class BadEnum(StrictEnum): - A = "Aaa" - B = "B" - C = "Ccc" - - -def test_enum_equality(): - """Check that we are allowed to replace the passed datatype enum from a signal with - a version generated from the signal with at least all of the same values, but - possibly more. - """ - - class GeneratedChoices(StrictEnum): - A = "Aaa" - B = "B" - C = "Ccc" - - class ExtendedGeneratedChoices(StrictEnum): - A = "Aaa" - B = "B" - C = "Ccc" - D = "Ddd" - - for enum_class in (GeneratedChoices, ExtendedGeneratedChoices): - assert BadEnum.A == enum_class.A - assert BadEnum.A.value == enum_class.A - assert BadEnum.A.value == enum_class.A.value - assert BadEnum(enum_class.A) is BadEnum.A - assert BadEnum(enum_class.A.value) is BadEnum.A - assert not BadEnum == enum_class - - # We will always PUT BadEnum by String, and GET GeneratedChoices by index, - # so shouldn't ever run across this from conversion code, but may occur if - # casting returned values or passing as enum rather than value. - with pytest.raises(ValueError): - BadEnum(ExtendedGeneratedChoices.D) - - -class EnumNoString(Enum): - A = "Aaa" - - -class SubsetEnumWrongChoices(SubsetEnum): - A = "Aaa" - B = "B" - C = "Ccc" - - -@PARAMETERISE_PROTOCOLS -@pytest.mark.parametrize( - "typ, suff, errors", - [ - ( - BadEnum, - "enum", - ( - "has choices ('Aaa', 'Bbb', 'Ccc')", - "but ", - "requested ['Aaa', 'B', 'Ccc'] to be strictly equal", - ), - ), - ( - SubsetEnumWrongChoices, - "enum", - ( - "has choices ('Aaa', 'Bbb', 'Ccc')", - "but ", - "requested ['Aaa', 'B', 'Ccc'] to be a subset", - ), - ), - ( - int, - "str", - ("with inferred datatype str", "cannot be coerced to int"), - ), - ( - str, - "float", - ("with inferred datatype float", "cannot be coerced to str"), - ), - ( - str, - "stra", - ("with inferred datatype Sequence[str]", "cannot be coerced to str"), - ), - ( - int, - "uint8a", - ("with inferred datatype Array1D[np.uint8]", "cannot be coerced to int"), - ), - ( - float, - "enum", - ("with inferred datatype str", "cannot be coerced to float"), - ), - ( - Array1D[np.int32], - "float64a", - ( - "with inferred datatype Array1D[np.float64]", - "cannot be coerced to Array1D[np.int32]", - ), - ), - ], -) -async def test_backend_wrong_type_errors(ioc, typ, suff, errors, protocol): - with pytest.raises(TypeError) as cm: - await _make_backend(ioc, typ, protocol, suff) - for error in errors: - assert error in str(cm.value) - - -@PARAMETERISE_PROTOCOLS -async def test_backend_put_enum_string(ioc, protocol) -> None: - device = await connect_example_device(ioc, protocol) - backend = device.enum2._connector.backend - # Don't do this in production code, but allow on CLI - await backend.put("Ccc", wait=True) # type: ignore - assert ExampleEnum.C == await backend.get_value() - - -@PARAMETERISE_PROTOCOLS -async def test_backend_enum_which_doesnt_inherit_string(ioc, protocol) -> None: - with pytest.raises(TypeError): - await _make_backend(ioc, EnumNoString, protocol, "enum2") - - -@PARAMETERISE_PROTOCOLS -async def test_backend_get_setpoint(ioc, protocol) -> None: - device = await connect_example_device(ioc, protocol) - backend = device.enum2._connector.backend - await backend.put("Ccc", wait=True) - assert await backend.get_setpoint() == ExampleEnum.C - - -def approx_table(datatype: type[Table], table: Table): - new_table = datatype(**table.model_dump()) - for k, v in new_table: - if datatype is Table: - setattr(new_table, k, v) - else: - object.__setattr__(new_table, k, v) - return new_table - - -async def test_pva_table(ioc) -> None: - protocol: Protocol = "pva" - # CA can't do tables - initial = ExampleTable( - bool=np.array([False, False, True, True], np.bool_), - int=np.array([1, 8, -9, 32], np.int32), - float=np.array([1.8, 8.2, -6, 32.9887], np.float64), - str=["Hello", "World", "Foo", "Bar"], - enum=[ExampleEnum.A, ExampleEnum.B, ExampleEnum.A, ExampleEnum.C], - ) - put = ExampleTable( - bool=np.array([True, False], np.bool_), - int=np.array([-5, 32], np.int32), - float=np.array([8.5, -6.97], np.float64), - str=["Hello", "Bat"], - enum=[ExampleEnum.C, ExampleEnum.B], - ) - # Make and connect the backend - for t, i, p in [(ExampleTable, initial, put), (None, put, initial)]: - backend = await _make_backend(ioc, t, protocol, "table") - # Make a monitor queue that will monitor for updates - q = MonitorQueue(backend) - try: - # Check datakey - dk = await backend.get_datakey("test-source") - expected_dk = { - "dtype": "array", - "shape": [len(i)], - "source": "test-source", - "dtype_numpy": [ - # natively bool fields are uint8, so if we don't provide a Table - # subclass to specify bool, that is what we get - ("bool", "|b1" if t else "|u1"), - ("int", "" - assert val is ExampleEnum.B - assert val == "Bbb" - - -@PARAMETERISE_PROTOCOLS -async def test_str_datatype_in_mbbo(ioc, protocol): - device = await connect_example_device(ioc, protocol) - sig = device.enum - backend = sig._connector.backend - datakey = await backend.get_datakey(sig.source) - assert datakey["choices"] == ["Aaa", "Bbb", "Ccc"] - await sig.connect() - description = await sig.describe() - assert description[""]["choices"] == ["Aaa", "Bbb", "Ccc"] - val = await sig.get_value() - assert val == "Bbb" - - -@PARAMETERISE_PROTOCOLS -async def test_runtime_enum_returns_str(ioc, protocol): - pv_name = f"{protocol}://{get_prefix(ioc, protocol)}enum" - sig = epics_signal_rw(MySubsetEnum, pv_name) - - await sig.connect() - val = await sig.get_value() - assert val == "Bbb" - - -@PARAMETERISE_PROTOCOLS -async def test_signal_returns_units_and_precision(ioc, protocol): - device = await connect_example_device(ioc, protocol) - sig = device.my_float - datakey = (await sig.describe())[""] - assert datakey["units"] == "mm" - assert datakey["precision"] == 1 - - -@PARAMETERISE_PROTOCOLS -async def test_signal_not_return_none_units_and_precision(ioc, protocol): - device = await connect_example_device(ioc, protocol) - datakey = (await device.my_str.describe())[""] - assert not hasattr(datakey, "units") - assert not hasattr(datakey, "precision") - - -@PARAMETERISE_PROTOCOLS -async def test_signal_returns_limits(ioc, protocol): - expected_limits = Limits( - # LOW, HIGH - warning=LimitsRange(low=5.0, high=96.0), - # DRVL, DRVH - control=LimitsRange(low=10.0, high=90.0), - # LOPR, HOPR - display=LimitsRange(low=0.0, high=100.0), - # LOLO, HIHI - alarm=LimitsRange(low=2.0, high=98.0), - ) - - device = await connect_example_device(ioc, protocol) - limits = (await device.my_int.describe())[""]["limits"] - assert limits == expected_limits - - -@PARAMETERISE_PROTOCOLS -async def test_signal_returns_partial_limits(ioc, protocol): - expected_limits = Limits( - # LOLO, HIHI - alarm=LimitsRange(low=2.0, high=98.0), - # DRVL, DRVH - control=LimitsRange(low=10.0, high=90.0), - # LOPR, HOPR - display=LimitsRange(low=0.0, high=100.0), - ) - if protocol == "ca": - # HSV, LSV not set, but still present for CA - expected_limits["warning"] = LimitsRange(low=0, high=0) - device = await connect_example_device(ioc, protocol) - limits = (await device.partialint.describe())[""]["limits"] - assert limits == expected_limits - - -@PARAMETERISE_PROTOCOLS -async def test_signal_returns_warning_and_partial_limits(ioc, protocol): - expected_limits = Limits( - # control = display if DRVL, DRVH not set - control=LimitsRange(low=0.0, high=100.0), - # LOPR, HOPR - display=LimitsRange(low=0.0, high=100.0), - # LOW, HIGH - warning=LimitsRange(low=2.0, high=98.0), - ) - if protocol == "ca": - # HSV, LSV not set, but still present for CA - expected_limits["alarm"] = LimitsRange(low=0, high=0) - device = await connect_example_device(ioc, protocol) - sig = device.lessint - await sig.connect() - limits = (await sig.describe())[""]["limits"] - assert limits == expected_limits - - -@PARAMETERISE_PROTOCOLS -async def test_signal_not_return_no_limits(ioc, protocol): - device = await connect_example_device(ioc, protocol) - datakey = (await device.enum.describe())[""] - assert not hasattr(datakey, "limits") - - -@PARAMETERISE_PROTOCOLS -async def test_signals_created_for_prec_0_float_can_use_int(ioc, protocol): - pv_name = f"{protocol}://{get_prefix(ioc, protocol)}float_prec_0" - sig = epics_signal_rw(int, pv_name) - await sig.connect() - - -@PARAMETERISE_PROTOCOLS -async def test_signals_created_for_not_prec_0_float_cannot_use_int(ioc, protocol): - pv_name = f"{protocol}://{get_prefix(ioc, protocol)}float_prec_1" - sig = epics_signal_rw(int, pv_name) - with pytest.raises( - TypeError, - match="float_prec_1 with inferred datatype float" ".* cannot be coerced to int", - ): - await sig.connect() - - -@PARAMETERISE_PROTOCOLS -async def test_bool_works_for_mismatching_enums(ioc, protocol): - pv_name = f"{protocol}://{get_prefix(ioc, protocol)}bool" - sig = epics_signal_rw(bool, pv_name, pv_name + "_unnamed") - await sig.connect() - - -@pytest.mark.skipif(os.name == "nt", reason="Hangs on windows for unknown reasons") -@PARAMETERISE_PROTOCOLS -async def test_can_read_using_ophyd_async_then_ophyd(ioc, protocol): - prefix = get_prefix(ioc, protocol) - oa_read = f"{protocol}://{prefix}float_prec_1" - ophyd_read = f"{prefix}float_prec_0" - - ophyd_async_sig = epics_signal_rw(float, oa_read) - await ophyd_async_sig.connect() - ophyd_signal = EpicsSignal(ophyd_read) - ophyd_signal.wait_for_connection(timeout=5) - - RE = RunEngine() - - def my_plan(): - yield from bps.rd(ophyd_async_sig) - yield from bps.rd(ophyd_signal) - - RE(my_plan()) - - -def test_signal_module_emits_deprecation_warning(): - with pytest.deprecated_call(): - import ophyd_async.epics.signal # noqa: F401 - - -@PARAMETERISE_PROTOCOLS -async def test_observe_ticking_signal_with_busy_loop(ioc, protocol): - sig = epics_signal_rw(int, f"{protocol}://{get_prefix(ioc, protocol)}ticking") - await sig.connect() - - recv = [] - - async def watch(): - async for val in observe_value(sig, done_timeout=0.4): - time.sleep(0.3) - recv.append(val) - - start = time.time() - - with pytest.raises(asyncio.TimeoutError): - await watch() - assert time.time() - start == pytest.approx(0.6, abs=0.1) - assert len(recv) == 2 - # Don't check values as CA and PVA have different algorithms for - # dropping updates for slow callbacks diff --git a/tests/epics/signal/test_yaml_save_ca.yaml b/tests/epics/signal/test_yaml_save_ca.yaml new file mode 100644 index 0000000000..73eeda49f3 --- /dev/null +++ b/tests/epics/signal/test_yaml_save_ca.yaml @@ -0,0 +1,21 @@ +bool_unnamed: true +enum: Bbb +enum2: Bbb +float32a: [1.9999999949504854e-06, -123.12300109863281] +float64a: [0.1, -12345678.123] +int16a: [-32768, 32767] +int32a: [-2147483648, 2147483647] +lessint: 42 +longstr: a string that is just longer than forty characters +longstr2: a string that is just longer than forty characters +my_bool: true +my_float: 3.141 +my_int: 42 +my_str: hello +partialint: 42 +stra: +- five +- six +- seven +subset_enum: Bbb +uint8a: [0, 255] diff --git a/tests/epics/signal/test_yaml_save_pva.yaml b/tests/epics/signal/test_yaml_save_pva.yaml new file mode 100644 index 0000000000..051b650eb0 --- /dev/null +++ b/tests/epics/signal/test_yaml_save_pva.yaml @@ -0,0 +1,40 @@ +bool_unnamed: true +enum: Bbb +enum2: Bbb +float32a: [1.9999999949504854e-06, -123.12300109863281] +float64a: [0.1, -12345678.123] +int16a: [-32768, 32767] +int32a: [-2147483648, 2147483647] +int64a: [-2147483649, 2147483648] +int8a: [-128, 127] +lessint: 42 +longstr: a string that is just longer than forty characters +longstr2: a string that is just longer than forty characters +my_bool: true +my_float: 3.141 +my_int: 42 +my_str: hello +partialint: 42 +stra: +- five +- six +- seven +subset_enum: Bbb +table: + bool: [false, false, true, true] + enum: + - Aaa + - Bbb + - Aaa + - Ccc + float: [1.8, 8.2, -6.0, 32.9887] + int: [1, 8, -9, 32] + str: + - Hello + - World + - Foo + - Bar +uint16a: [0, 65535] +uint32a: [0, 4294967295] +uint64a: [0, 4294967297] +uint8a: [0, 255]