Skip to content

Commit

Permalink
Rewrite EPICS signal tests
Browse files Browse the repository at this point in the history
  • Loading branch information
coretl committed Dec 20, 2024
1 parent c781e27 commit 5e4c310
Show file tree
Hide file tree
Showing 18 changed files with 953 additions and 1,126 deletions.
3 changes: 0 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,6 @@ dev = [
"types-pyyaml",
]

[project.scripts]
ophyd-async = "ophyd_async.__main__:main"

[project.urls]
GitHub = "https://github.com/bluesky/ophyd-async"

Expand Down
5 changes: 5 additions & 0 deletions src/ophyd_async/core/_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ class Table(BaseModel):
# so it is strictly checked against the BaseModel we are supplied.
model_config = ConfigDict(extra="allow")

# Add an init method to match the above model config, otherwise the type
# checker will not think we can pass arbitrary kwargs into the base class init
def __init__(self, **kwargs):
super().__init__(**kwargs)

@classmethod
def __init_subclass__(cls):
# But forbit extra in subclasses so it gets validated
Expand Down
2 changes: 1 addition & 1 deletion src/ophyd_async/core/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def get_enum_cls(datatype: type | None) -> type[StrictEnum] | None:
if datatype and issubclass(datatype, Enum):
if not issubclass(datatype, StrictEnum):
raise TypeError(
f"{datatype} should inherit from .SubsetEnum "
f"{datatype} should inherit from ophyd_async.core.SubsetEnum "
"or ophyd_async.core.StrictEnum"
)
return datatype
Expand Down
14 changes: 9 additions & 5 deletions src/ophyd_async/epics/core/_aioca.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def _limits_from_augmented_value(value: AugmentedValue) -> Limits:
def get_limits(limit: str) -> LimitsRange | None:
low = getattr(value, f"lower_{limit}_limit", nan)
high = getattr(value, f"upper_{limit}_limit", nan)
if not (isnan(low) and isnan(high)):
if not (isnan(low) and isnan(high)) and not high == low == 0:
return LimitsRange(
low=None if isnan(low) else low,
high=None if isnan(high) else high,
Expand All @@ -59,14 +59,16 @@ def get_limits(limit: str) -> LimitsRange | None:


def _metadata_from_augmented_value(
value: AugmentedValue, metadata: SignalMetadata
datatype: type[SignalDatatypeT] | None,
value: AugmentedValue,
metadata: SignalMetadata,
) -> SignalMetadata:
metadata = metadata.copy()
if hasattr(value, "units"):
if hasattr(value, "units") and datatype not in (str, bool):
metadata["units"] = value.units
if hasattr(value, "precision") and not isnan(value.precision):
metadata["precision"] = value.precision
if limits := _limits_from_augmented_value(value):
if (limits := _limits_from_augmented_value(value)) and datatype is not bool:
metadata["limits"] = limits
return metadata

Expand Down Expand Up @@ -290,7 +292,9 @@ async def put(self, value: SignalDatatypeT | None, wait: bool):

async def get_datakey(self, source: str) -> DataKey:
value = await self._caget(self.read_pv, FORMAT_CTRL)
metadata = _metadata_from_augmented_value(value, self.converter.metadata)
metadata = _metadata_from_augmented_value(
self.datatype, value, self.converter.metadata
)
return make_datakey(
self.converter.datatype, self.converter.value(value), source, metadata
)
Expand Down
22 changes: 17 additions & 5 deletions src/ophyd_async/epics/core/_p4p.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def get_limits(
substructure = getattr(value, substucture_name, None)
low = getattr(substructure, low_name, nan)
high = getattr(substructure, high_name, nan)
if not (isnan(low) and isnan(high)):
if not (isnan(low) and isnan(high)) and not low == high == 0:
return LimitsRange(
low=None if isnan(low) else low,
high=None if isnan(high) else high,
Expand All @@ -60,12 +60,21 @@ def get_limits(
def _metadata_from_value(datatype: type[SignalDatatype], value: Any) -> SignalMetadata:
metadata = SignalMetadata()
value_data: Any = getattr(value, "value", None)
specifier = _get_specifier(value)
display_data: Any = getattr(value, "display", None)
if hasattr(display_data, "units"):
if (
hasattr(display_data, "units")
and specifier[-1] in _number_specifiers
and datatype is not str
):
metadata["units"] = display_data.units
if hasattr(display_data, "precision") and not isnan(display_data.precision):
if (
hasattr(display_data, "precision")
and not isnan(display_data.precision)
and specifier[-1] in _float_specifiers
):
metadata["precision"] = display_data.precision
if limits := _limits_from_value(value):
if (limits := _limits_from_value(value)) and specifier[-1] in _number_specifiers:
metadata["limits"] = limits
# Get choices from display or value
if datatype is str or issubclass(datatype, StrictEnum):
Expand Down Expand Up @@ -174,6 +183,9 @@ def write_value(self, value: BaseModel | dict[str, Any]) -> Any:


# https://mdavidsaver.github.io/p4p/values.html
_float_specifiers = {"f", "d"}
_int_specifiers = {"b", "B", "h", "H", "i", "I", "l", "L"}
_number_specifiers = _float_specifiers.union(_int_specifiers)
_datatype_converter_from_typeid: dict[
tuple[str, str], tuple[type[SignalDatatype], type[PvaConverter]]
] = {
Expand Down Expand Up @@ -208,7 +220,7 @@ def write_value(self, value: BaseModel | dict[str, Any]) -> Any:
}


def _get_specifier(value: Value):
def _get_specifier(value: Value) -> str:
typ = value.type("value").aspy()
if isinstance(typ, tuple):
return typ[0]
Expand Down
30 changes: 15 additions & 15 deletions src/ophyd_async/epics/testing/__init__.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
from ._example_ioc import (
from ._example import (
CA_PVA_RECORDS,
PVA_RECORDS,
ExampleCaDevice,
ExampleEnum,
ExamplePvaDevice,
ExampleTable,
connect_example_device,
get_example_ioc,
EpicsTestCaDevice,
EpicsTestEnum,
EpicsTestIocAndDevices,
EpicsTestPvaDevice,
EpicsTestSubsetEnum,
EpicsTestTable,
)
from ._utils import TestingIOC, generate_random_PV_prefix
from ._utils import TestingIOC, generate_random_pv_prefix

__all__ = [
"CA_PVA_RECORDS",
"PVA_RECORDS",
"ExampleCaDevice",
"ExampleEnum",
"ExamplePvaDevice",
"ExampleTable",
"connect_example_device",
"get_example_ioc",
"EpicsTestCaDevice",
"EpicsTestEnum",
"EpicsTestSubsetEnum",
"EpicsTestPvaDevice",
"EpicsTestTable",
"EpicsTestIocAndDevices",
"TestingIOC",
"generate_random_PV_prefix",
"generate_random_pv_prefix",
]
112 changes: 47 additions & 65 deletions src/ophyd_async/epics/testing/_example_ioc.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,48 @@
from collections.abc import Sequence
from pathlib import Path
from typing import Annotated as A
from typing import Literal

import numpy as np

from ophyd_async.core import (
Array1D,
SignalRW,
StrictEnum,
Table,
)
from ophyd_async.epics.core import (
EpicsDevice,
PvSuffix,
)
from ophyd_async.core import Array1D, SignalR, SignalRW, StrictEnum, Table
from ophyd_async.core._utils import SubsetEnum
from ophyd_async.epics.core import EpicsDevice, PvSuffix

from ._utils import TestingIOC
from ._utils import TestingIOC, generate_random_pv_prefix

CA_PVA_RECORDS = str(Path(__file__).parent / "test_records.db")
PVA_RECORDS = str(Path(__file__).parent / "test_records_pva.db")
CA_PVA_RECORDS = Path(__file__).parent / "test_records.db"
PVA_RECORDS = Path(__file__).parent / "test_records_pva.db"


class ExampleEnum(StrictEnum):
class EpicsTestEnum(StrictEnum):
A = "Aaa"
B = "Bbb"
C = "Ccc"


class ExampleTable(Table):
class EpicsTestSubsetEnum(SubsetEnum):
A = "Aaa"
B = "Bbb"


class EpicsTestTable(Table):
bool: Array1D[np.bool_]
int: Array1D[np.int32]
float: Array1D[np.float64]
str: Sequence[str]
enum: Sequence[ExampleEnum]
enum: Sequence[EpicsTestEnum]


class ExampleCaDevice(EpicsDevice):
class EpicsTestCaDevice(EpicsDevice):
my_int: A[SignalRW[int], PvSuffix("int")]
my_float: A[SignalRW[float], PvSuffix("float")]
my_str: A[SignalRW[str], PvSuffix("str")]
longstr: A[SignalRW[str], PvSuffix("longstr")]
longstr2: A[SignalRW[str], PvSuffix("longstr2")]
longstr2: A[SignalRW[str], PvSuffix("longstr2.VAL$")]
my_bool: A[SignalRW[bool], PvSuffix("bool")]
enum: A[SignalRW[ExampleEnum], PvSuffix("enum")]
enum2: A[SignalRW[ExampleEnum], PvSuffix("enum2")]
enum: A[SignalRW[EpicsTestEnum], PvSuffix("enum")]
enum2: A[SignalRW[EpicsTestEnum], PvSuffix("enum2")]
subset_enum: A[SignalRW[EpicsTestSubsetEnum], PvSuffix("subset_enum")]
bool_unnamed: A[SignalRW[bool], PvSuffix("bool_unnamed")]
partialint: A[SignalRW[int], PvSuffix("partialint")]
lessint: A[SignalRW[int], PvSuffix("lessint")]
Expand All @@ -56,52 +54,36 @@ class ExampleCaDevice(EpicsDevice):
stra: A[SignalRW[Sequence[str]], PvSuffix("stra")]


class ExamplePvaDevice(ExampleCaDevice): # pva can support all signal types that ca can
class EpicsTestPvaDevice(EpicsTestCaDevice):
# pva can support all signal types that ca can
int8a: A[SignalRW[Array1D[np.int8]], PvSuffix("int8a")]
uint16a: A[SignalRW[Array1D[np.uint16]], PvSuffix("uint16a")]
uint32a: A[SignalRW[Array1D[np.uint32]], PvSuffix("uint32a")]
int64a: A[SignalRW[Array1D[np.int64]], PvSuffix("int64a")]
uint64a: A[SignalRW[Array1D[np.uint64]], PvSuffix("uint64a")]
table: A[SignalRW[ExampleTable], PvSuffix("table")]
ntndarray_data: A[SignalRW[Array1D[np.int64]], PvSuffix("ntndarray:data")]


async def connect_example_device(
ioc: TestingIOC, protocol: Literal["ca", "pva"]
) -> ExamplePvaDevice | ExampleCaDevice:
"""Helper function to return a connected example device.
Parameters
----------
ioc: TestingIOC
TestingIOC configured to provide the records needed for the device
protocol: Literal["ca", "pva"]
The transport protocol of the device
Returns
-------
ExamplePvaDevice | ExampleCaDevice
a connected EpicsDevice with signals of many EPICS record types
"""
device_cls = ExamplePvaDevice if protocol == "pva" else ExampleCaDevice
device = device_cls(f"{protocol}://{ioc.prefix_for(device_cls)}")
await device.connect()
return device


def get_example_ioc() -> TestingIOC:
"""Get TestingIOC instance with the example databases loaded.
Returns
-------
TestingIOC
instance with test_records.db loaded for ExampleCaDevice and
test_records.db and test_records_pva.db loaded for ExamplePvaDevice.
"""
ioc = TestingIOC()
ioc.database_for(PVA_RECORDS, ExamplePvaDevice)
ioc.database_for(CA_PVA_RECORDS, ExamplePvaDevice)
ioc.database_for(CA_PVA_RECORDS, ExampleCaDevice)
return ioc
table: A[SignalRW[EpicsTestTable], PvSuffix("table")]
ntndarray: A[SignalR[np.ndarray], PvSuffix("ntndarray")]


class EpicsTestIocAndDevices:
def __init__(self):
self.prefix = generate_random_pv_prefix()
self.ioc = TestingIOC()
# Create supporting records and ExampleCaDevice
ca_prefix = f"{self.prefix}ca:"
self.ioc.add_database(CA_PVA_RECORDS, device=ca_prefix)
self.ca_device = EpicsTestCaDevice(f"ca://{ca_prefix}")
# Create supporting records and ExamplePvaDevice
pva_prefix = f"{self.prefix}pva:"
self.ioc.add_database(CA_PVA_RECORDS, device=pva_prefix)
self.ioc.add_database(PVA_RECORDS, device=pva_prefix)
self.pva_device = EpicsTestPvaDevice(f"pva://{pva_prefix}")

def get_device(self, protocol: str) -> EpicsTestCaDevice | EpicsTestPvaDevice:
return getattr(self, f"{protocol}_device")

def get_signal(self, protocol: str, name: str) -> SignalRW:
return getattr(self.get_device(protocol), name)

def get_pv(self, protocol: str, name: str) -> str:
return f"{protocol}://{self.prefix}{protocol}:{name}"
Loading

0 comments on commit 5e4c310

Please sign in to comment.