From 3f1356c1559b35e5471aaa267b6d1f04da17ad47 Mon Sep 17 00:00:00 2001 From: Chris Meyer Date: Mon, 30 Oct 2023 15:04:59 -0700 Subject: [PATCH] Change display values pipeline to use a more general architecture. --- nion/swift/model/DisplayItem.py | 608 +++++++++++++++++++++++--------- 1 file changed, 435 insertions(+), 173 deletions(-) diff --git a/nion/swift/model/DisplayItem.py b/nion/swift/model/DisplayItem.py index c4866af8d..9b86c754c 100755 --- a/nion/swift/model/DisplayItem.py +++ b/nion/swift/model/DisplayItem.py @@ -5,6 +5,7 @@ import concurrent.futures import contextlib import copy +import dataclasses import datetime import functools import gettext @@ -177,7 +178,8 @@ def remove_index(self, remove_index: int) -> None: def calculate_display_range(display_limits: DisplayLimitsType, data_range: typing.Optional[typing.Tuple[float, float]], - data_sample: typing.Optional[_ImageDataType], xdata: typing.Optional[DataAndMetadata.DataAndMetadata], + data_sample: typing.Optional[_ImageDataType], + xdata: typing.Optional[DataAndMetadata.DataAndMetadata], complex_display_type: typing.Optional[str]) -> typing.Optional[typing.Tuple[float, float]]: if display_limits is not None: assert data_range is not None @@ -284,6 +286,357 @@ def transform(self, data: _ImageDataType, display_limits: typing.Tuple[float, fl return None +@typing.runtime_checkable +class ProcessorLike(typing.Protocol): + """A processor like object that can be used to process data and metadata. + + Subclasses should implement _get_result but callers should call get_result, which provides caching. + """ + + def execute(self) -> None: ... + + def _execute(self) -> None: ... + + def set_parameter(self, key: str, value: typing.Any) -> None: ... + + def _get_parameter(self, key: str) -> typing.Any: ... + + def get_result(self, key: str) -> typing.Any: ... + + def set_result(self, key: str, value: typing.Any) -> None: ... + + +@dataclasses.dataclass +class ProcessorConnection: + source: ProcessorLike + source_key: str + target_key: typing.Optional[str] = None + + +class ProcessorBase(ProcessorLike): + """A processor-like object that can be used to process data and metadata. + + The result timestamps must be set to the first input timestamp. + """ + + def __init__(self, **kwargs: typing.Any) -> None: + self.__dirty = True + self.__parameters = dict[str, typing.Any]() + self.__results = dict[str, typing.Any]() + self.__lock = threading.RLock() + self.__connections = list[ProcessorConnection]() + for key, value in kwargs.items(): + if isinstance(value, ProcessorConnection): + self.add_connection(value.source, value.source_key, value.target_key) + self.set_parameter(key, value) + + def add_connection(self, source: ProcessorLike, source_key: str, target_key: typing.Optional[str] = None) -> None: + target_key = target_key if target_key else source_key + self.__connections.append(ProcessorConnection(source, source_key, target_key)) + + def execute(self) -> None: + with self.__lock: + if self.__dirty: + for connection in self.__connections: + self.set_parameter(connection.target_key or connection.source_key, connection.source.get_result(connection.source_key)) + self.__dirty = False + self._execute() + + def set_parameter(self, key: str, value: typing.Any) -> None: + with self.__lock: + self.__parameters[key] = value + self.__dirty = True + + def _get_parameter(self, key: str) -> typing.Any: + with self.__lock: + return self.__parameters.get(key, None) + + def _get_string(self, key: str) -> str: + with self.__lock: + return typing.cast(str, self.__parameters[key]) + + def _get_optional_string(self, key: str) -> typing.Optional[str]: + with self.__lock: + return typing.cast(typing.Optional[str], self.__parameters[key]) + + def _get_int(self, key: str) -> int: + with self.__lock: + return typing.cast(int, self.__parameters[key]) + + def _get_optional_int(self, key: str) -> typing.Optional[int]: + with self.__lock: + return typing.cast(typing.Optional[int], self.__parameters[key]) + + def _get_float(self, key: str) -> float: + with self.__lock: + return typing.cast(float, self.__parameters[key]) + + def _get_optional_float(self, key: str) -> typing.Optional[float]: + with self.__lock: + return typing.cast(typing.Optional[float], self.__parameters[key]) + + def get_result(self, key: str) -> typing.Any: + with self.__lock: + self.execute() + return self.__results.get(key, None) + + def set_result(self, key: str, value: typing.Any) -> None: + with self.__lock: + self.__results[key] = value + + def _get_data_and_metadata_like(self, key: str) -> typing.Optional[DataAndMetadata.DataAndMetadata]: + input_data_and_metadata = self._get_parameter(key) + return DataAndMetadata.promote_ndarray(input_data_and_metadata) if input_data_and_metadata is not None else None + + +class ElementDataProcessor(ProcessorBase): + def __init__(self, *, + data: typing.Union[typing.Optional[DataAndMetadata._DataAndMetadataLike], ProcessorConnection] = None, + sequence_index: typing.Union[typing.Optional[int], ProcessorConnection] = None, + collection_index: typing.Union[typing.Optional[DataAndMetadata.PositionType], ProcessorConnection] = None, + slice_center: typing.Union[typing.Optional[int], ProcessorConnection] = None, + slice_width: typing.Union[typing.Optional[int], ProcessorConnection] = None) -> None: + super().__init__(data=data, sequence_index=sequence_index, collection_index=collection_index, + slice_center=slice_center, slice_width=slice_width) + + def _execute(self) -> None: + input_data_and_metadata = self._get_data_and_metadata_like("data") + sequence_index = self._get_int("sequence_index") + collection_index = typing.cast(typing.Optional[DataAndMetadata.PositionType], self._get_parameter("collection_index")) + slice_center = self._get_int("slice_center") + slice_width = self._get_int("slice_width") + data_and_metadata: typing.Optional[DataAndMetadata.DataAndMetadata] = None + if input_data_and_metadata: + data_and_metadata, modified = Core.function_element_data_no_copy(input_data_and_metadata, + sequence_index, + collection_index, + slice_center, + slice_width, + flag16=False) + self.set_result("data", data_and_metadata) + + +class DisplayDataProcessor(ProcessorBase): + def __init__(self, *, + element_data: typing.Union[typing.Optional[DataAndMetadata._DataAndMetadataLike], ProcessorConnection] = None, + complex_display_type: typing.Union[typing.Optional[str], ProcessorConnection] = None) -> None: + super().__init__(element_data=element_data, complex_display_type=complex_display_type) + + def _execute(self) -> None: + element_data_and_metadata = self._get_data_and_metadata_like("element_data") + complex_display_type = self._get_string("complex_display_type") + data_and_metadata: typing.Optional[DataAndMetadata.DataAndMetadata] = None + if element_data_and_metadata: + data_and_metadata, modified = Core.function_scalar_data_no_copy(element_data_and_metadata, complex_display_type) + self.set_result("data", data_and_metadata) + + +class DataRangeProcessor(ProcessorBase): + def __init__(self, *, + data: typing.Union[typing.Optional[DataAndMetadata._DataAndMetadataLike], ProcessorConnection] = None, + display_data: typing.Union[typing.Optional[DataAndMetadata._DataAndMetadataLike], ProcessorConnection] = None) -> None: + super().__init__(data=data, display_data=display_data) + + def _execute(self) -> None: + data_and_metadata = self._get_data_and_metadata_like("data") + display_data_and_metadata = self._get_data_and_metadata_like("display_data") + display_data = display_data_and_metadata.data if display_data_and_metadata else None + data_range: typing.Optional[typing.Tuple[float, float]] + if display_data is not None and display_data.shape and data_and_metadata: + data_shape = data_and_metadata.data_shape + data_dtype = data_and_metadata.data_dtype + if Image.is_shape_and_dtype_rgb_type(data_shape, data_dtype): + data_range = (0, 255) + elif Image.is_shape_and_dtype_complex_type(data_shape, data_dtype): + data_range = (numpy.amin(display_data), numpy.amax(display_data)) + else: + data_range = (numpy.amin(display_data), numpy.amax(display_data)) + else: + data_range = None + if data_range is not None: + if math.isnan(data_range[0]) or math.isnan(data_range[1]) or math.isinf( + data_range[0]) or math.isinf(data_range[1]): + data_range = (0.0, 0.0) + if numpy.issubdtype(type(data_range[0]), numpy.bool_): + data_range = (int(data_range[0]), data_range[1]) + if numpy.issubdtype(type(data_range[1]), numpy.bool_): + data_range = (data_range[0], int(data_range[1])) + self.set_result("data_range", data_range) + + +class DisplayRangeProcessor(ProcessorBase): + def __init__(self, *, + element_data: typing.Union[typing.Optional[DataAndMetadata._DataAndMetadataLike], ProcessorConnection] = None, + display_limits: typing.Union[typing.Optional[DisplayLimitsType], ProcessorConnection] = None, + data_range: typing.Union[typing.Optional[typing.Tuple[float, float]], ProcessorConnection] = None, + data_sample: typing.Union[typing.Optional[_ImageDataType], ProcessorConnection] = None, + complex_display_type: typing.Union[typing.Optional[str], ProcessorConnection] = None) -> None: + super().__init__(element_data=element_data, display_limits=display_limits, data_range=data_range, + data_sample=data_sample, complex_display_type=complex_display_type) + + def _execute(self) -> None: + element_data_and_metadata = self._get_data_and_metadata_like("element_data") + display_limits = typing.cast(typing.Optional[DisplayLimitsType], self._get_parameter("display_limits")) + data_range = typing.cast(typing.Optional[typing.Tuple[float, float]], self._get_parameter("data_range")) + data_sample = typing.cast(typing.Optional[_ImageDataType], self._get_parameter("data_sample")) + complex_display_type = self._get_optional_string("complex_display_type") + display_range = calculate_display_range(display_limits, data_range, data_sample, element_data_and_metadata, complex_display_type) + self.set_result("display_range", display_range) + + +class DataSampleProcessor(ProcessorBase): + def __init__(self, *, + data: typing.Union[typing.Optional[DataAndMetadata._DataAndMetadataLike], ProcessorConnection] = None, + display_data: typing.Union[typing.Optional[DataAndMetadata._DataAndMetadataLike], ProcessorConnection] = None) -> None: + super().__init__(data=data, display_data=display_data) + + def _execute(self) -> None: + data_and_metadata = self._get_data_and_metadata_like("data") + display_data_and_metadata = self._get_data_and_metadata_like("display_data") + display_data = display_data_and_metadata.data if display_data_and_metadata else None + data_sample: typing.Optional[_ImageDataType] = None + if display_data is not None and display_data.shape and data_and_metadata: + data_shape = data_and_metadata.data_shape + data_dtype = data_and_metadata.data_dtype + if Image.is_shape_and_dtype_rgb_type(data_shape, data_dtype): + data_sample = None + elif Image.is_shape_and_dtype_complex_type(data_shape, data_dtype): + data_sample = numpy.sort(numpy.random.choice(display_data.reshape(numpy.prod(display_data.shape, dtype=numpy.uint64)), 200)) + else: + data_sample = None + self.set_result("data_sample", data_sample) + + +class DisplayRGBProcessor(ProcessorBase): + def __init__(self, *, + adjusted_data: typing.Union[typing.Optional[DataAndMetadata._DataAndMetadataLike], ProcessorConnection] = None, + data_range: typing.Union[typing.Optional[typing.Tuple[float, float]], ProcessorConnection] = None, + display_range: typing.Union[typing.Optional[typing.Tuple[float, float]], ProcessorConnection] = None, + color_map_data: typing.Union[typing.Optional[_ImageDataType], ProcessorConnection] = None) -> None: + super().__init__(adjusted_data=adjusted_data, data_range=data_range, display_range=display_range, color_map_data=color_map_data) + + def _execute(self) -> None: + adjusted_data_and_metadata = self._get_data_and_metadata_like("adjusted_data") + data_range = typing.cast(typing.Optional[typing.Tuple[float, float]], self._get_parameter("data_range")) + display_range = typing.cast(typing.Optional[typing.Tuple[float, float]], self._get_parameter("display_range")) + color_map_data = typing.cast(typing.Optional[_ImageDataType], self._get_parameter("color_map_data")) + display_rgba_data: typing.Optional[_ImageDataType] = None + if adjusted_data_and_metadata: + if data_range is not None: # workaround until validating and retrieving data stats is an atomic operation + # display_range is just display_limits but calculated if display_limits is None + display_rgba = Core.function_display_rgba(adjusted_data_and_metadata, display_range, color_map_data) + display_rgba_data = display_rgba.data if display_rgba else None + self.set_result("display_rgba", display_rgba_data) + + +class NormalizedDataProcessor(ProcessorBase): + def __init__(self, *, + display_data: typing.Union[typing.Optional[DataAndMetadata._DataAndMetadataLike], ProcessorConnection] = None, + display_range: typing.Union[typing.Optional[typing.Tuple[float, float]], ProcessorConnection] = None) -> None: + super().__init__(display_data=display_data, display_range=display_range) + + def _execute(self) -> None: + display_data_and_metadata = self._get_data_and_metadata_like("display_data") + display_range = typing.cast(typing.Optional[typing.Tuple[float, float]], self._get_parameter("display_range")) + data_and_metadata: typing.Optional[DataAndMetadata.DataAndMetadata] = None + if display_range is not None and display_data_and_metadata: + display_limit_low, display_limit_high = display_range + # normalize the data to [0, 1]. + m = 1 / (display_limit_high - display_limit_low) if display_limit_high != display_limit_low else 0.0 + b = -display_limit_low + data_and_metadata = DataAndMetadata.new_data_and_metadata(data=float(m) * (display_data_and_metadata.data + float(b)), + timestamp=display_data_and_metadata.timestamp, + timezone=display_data_and_metadata.timezone, + timezone_offset=display_data_and_metadata.timezone_offset) + self.set_result("data", data_and_metadata) + + +class AdjustedDataProcessor(ProcessorBase): + def __init__(self, *, + normalized_data: typing.Union[typing.Optional[DataAndMetadata._DataAndMetadataLike], ProcessorConnection] = None, + display_data: typing.Union[typing.Optional[DataAndMetadata._DataAndMetadataLike], ProcessorConnection] = None, + display_range: typing.Union[typing.Optional[typing.Tuple[float, float]], ProcessorConnection] = None, + adjustments: typing.Union[typing.Optional[typing.Sequence[Persistence.PersistentDictType]], ProcessorConnection] = None) -> None: + super().__init__(normalized_data=normalized_data, display_data=display_data, display_range=display_range, adjustments=adjustments) + + def _execute(self) -> None: + normalized_data_and_metadata = self._get_data_and_metadata_like("normalized_data") + display_data_and_metadata = self._get_data_and_metadata_like("display_data") + display_range = typing.cast(typing.Optional[typing.Tuple[float, float]], self._get_parameter("display_range")) + adjustments = typing.cast(typing.Optional[typing.Sequence[Persistence.PersistentDictType]], self._get_parameter("adjustments")) + adjusted_data_and_metadata: typing.Optional[DataAndMetadata.DataAndMetadata] = display_data_and_metadata + if adjustments: + adjusted_data_and_metadata = normalized_data_and_metadata + for adjustment_d in adjustments: + adjustment = adjustment_factory(adjustment_d) + if adjustment: + display_range = display_range + if adjusted_data_and_metadata and display_range is not None: + display_data = adjusted_data_and_metadata.data + if display_data is not None: + adjusted_data_and_metadata = DataAndMetadata.new_data_and_metadata( + adjustment.transform(display_data, display_range), + timestamp=adjusted_data_and_metadata.timestamp, + timezone=adjusted_data_and_metadata.timezone, + timezone_offset=adjusted_data_and_metadata.timezone_offset) + self.set_result("data", adjusted_data_and_metadata) + + +class AdjustedDisplayRangeProcessor(ProcessorBase): + def __init__(self, *, + display_range: typing.Union[typing.Optional[typing.Tuple[float, float]], ProcessorConnection] = None, + adjustments: typing.Union[typing.Optional[typing.Sequence[Persistence.PersistentDictType]], ProcessorConnection] = None) -> None: + super().__init__(display_range=display_range, adjustments=adjustments) + + def _execute(self) -> None: + display_range = typing.cast(typing.Optional[typing.Tuple[float, float]], self._get_parameter("display_range")) + adjustments = typing.cast(typing.Optional[typing.Sequence[Persistence.PersistentDictType]], self._get_parameter("adjustments")) + adjusted_display_range: typing.Optional[typing.Tuple[float, float]] + if adjustments: + # transforms have already been applied and data is now in the range of 0.0, 1.0. + # brightness and contrast will be applied on top of this transform. + adjusted_display_range = 0.0, 1.0 + else: + adjusted_display_range = display_range + self.set_result("display_range", adjusted_display_range) + + +class TransformedDisplayRangeProcessor(ProcessorBase): + def __init__(self, *, + adjusted_display_range: typing.Union[typing.Optional[typing.Tuple[float, float]], ProcessorConnection] = None, + brightness: typing.Union[typing.Optional[float], ProcessorConnection] = None, + contrast: typing.Union[typing.Optional[float], ProcessorConnection] = None) -> None: + super().__init__(adjusted_display_range=adjusted_display_range, brightness=brightness, contrast=contrast) + + def _execute(self) -> None: + adjusted_display_range = typing.cast(typing.Optional[typing.Tuple[float, float]], self._get_parameter("adjusted_display_range")) + brightness = self._get_float("brightness") + contrast = self._get_float("contrast") + assert adjusted_display_range is not None + display_limit_low, display_limit_high = adjusted_display_range + m = (contrast / (display_limit_high - display_limit_low)) if contrast > 0 and display_limit_high != display_limit_low else 1 + b = 1 / (2 * m) - (1 - brightness) * (display_limit_high - display_limit_low) / 2 - display_limit_low + # back calculate the display limits as they would be with brightness/contrast adjustments + transformed_display_range = (0 - m * b) / m, (1 - m * b) / m + self.set_result("display_range", transformed_display_range) + + +class TransformedDataProcessor(ProcessorBase): + def __init__(self, *, + adjusted_data: typing.Union[typing.Optional[DataAndMetadata._DataAndMetadataLike], ProcessorConnection] = None, + transformed_display_range: typing.Union[typing.Optional[typing.Tuple[float, float]], ProcessorConnection] = None) -> None: + super().__init__(adjusted_data=adjusted_data, transformed_display_range=transformed_display_range) + + def _execute(self) -> None: + adjusted_data_and_metadata = self._get_data_and_metadata_like("adjusted_data") + transformed_display_range = typing.cast(typing.Optional[typing.Tuple[float, float]], self._get_parameter("transformed_display_range")) + transformed_data_and_metadata: typing.Optional[DataAndMetadata.DataAndMetadata] = None + if adjusted_data_and_metadata: + transformed_data_and_metadata = Core.function_rescale(adjusted_data_and_metadata, data_range=(0.0, 1.0), in_range=transformed_display_range) + self.set_result("data", transformed_data_and_metadata) + + + class DisplayValues: """Calculate display data used to render the display. @@ -308,37 +661,72 @@ def __init__(self, data_and_metadata: typing.Optional[DataAndMetadata.DataAndMet color_map_data: typing.Optional[_RGBA32Type], brightness: float, contrast: float, adjustments: typing.Sequence[Persistence.PersistentDictType]) -> None: DisplayValues._count += 1 - self.__lock = threading.RLock() + self.__data_and_metadata = data_and_metadata - self.__sequence_index = sequence_index - self.__collection_index = collection_index - self.__slice_center = slice_center - self.__slice_width = slice_width - self.__display_limits = display_limits - self.__complex_display_type = complex_display_type self.__color_map_data = color_map_data - self.__brightness = brightness - self.__contrast = contrast - self.__adjustments = list(adjustments) - self.__element_data_and_metadata_dirty = True - self.__element_data_and_metadata: typing.Optional[DataAndMetadata.DataAndMetadata] = None - self.__display_data_and_metadata_dirty = True - self.__display_data_and_metadata: typing.Optional[DataAndMetadata.DataAndMetadata] = None - self.__normalized_data_and_metadata_dirty = True - self.__normalized_data_and_metadata: typing.Optional[DataAndMetadata.DataAndMetadata] = None - self.__adjusted_data_and_metadata_dirty = True - self.__adjusted_data_and_metadata: typing.Optional[DataAndMetadata.DataAndMetadata] = None - self.__transformed_data_and_metadata_dirty = True - self.__transformed_data_and_metadata: typing.Optional[DataAndMetadata.DataAndMetadata] = None - self.__data_range_dirty = True - self.__data_range: typing.Optional[typing.Tuple[float, float]] = None - self.__data_sample_dirty = True - self.__data_sample: typing.Optional[_ImageDataType] = None - self.__display_range_dirty = True - self.__display_range: typing.Optional[typing.Tuple[float, float]] = None - self.__display_rgba_dirty = True - self.__display_rgba: typing.Optional[_ImageDataType] = None - self.__display_rgba_timestamp: typing.Optional[datetime.datetime] = data_and_metadata.timestamp if data_and_metadata else None + + self.__element_data_processor = ElementDataProcessor(data=data_and_metadata, + sequence_index=sequence_index, + collection_index=collection_index, + slice_center=slice_center, + slice_width=slice_width) + + self.__display_data_processor = DisplayDataProcessor( + element_data=ProcessorConnection(self.__element_data_processor, "data", "element_data"), + complex_display_type=complex_display_type) + + self.__data_range_processor = DataRangeProcessor( + data=data_and_metadata, + display_data=ProcessorConnection(self.__display_data_processor, "data", "display_data"), + ) + + self.__data_sample_processor = DataSampleProcessor( + data=data_and_metadata, + display_data=ProcessorConnection(self.__display_data_processor, "data", "display_data"), + ) + + self.__display_range_processor = DisplayRangeProcessor( + element_data=ProcessorConnection(self.__element_data_processor, "data", "element_data"), + display_limits=display_limits, + complex_display_type=complex_display_type, + data_range=ProcessorConnection(self.__data_range_processor, "data_range"), + data_sample=ProcessorConnection(self.__data_sample_processor, "data_sample"), + ) + + self.__normalized_data_processor = NormalizedDataProcessor( + display_data=ProcessorConnection(self.__display_data_processor, "data", "display_data"), + display_range=ProcessorConnection(self.__display_range_processor, "display_range"), + ) + + self.__adjusted_data_processor = AdjustedDataProcessor( + normalized_data=ProcessorConnection(self.__normalized_data_processor, "data", "normalized_data"), + display_data=ProcessorConnection(self.__display_data_processor, "data", "display_data"), + display_range=ProcessorConnection(self.__display_range_processor, "display_range"), + adjustments=adjustments, + ) + + self.__adjusted_display_range_processor = AdjustedDisplayRangeProcessor( + display_range=ProcessorConnection(self.__display_range_processor, "display_range"), + adjustments=adjustments + ) + + self.__transformed_display_range_processor = TransformedDisplayRangeProcessor( + adjusted_display_range=ProcessorConnection(self.__adjusted_display_range_processor, "display_range", "adjusted_display_range"), + brightness=brightness, + contrast=contrast + ) + + self.__display_rgb_processor = DisplayRGBProcessor( + adjusted_data=ProcessorConnection(self.__adjusted_data_processor, "data", "adjusted_data"), + data_range=ProcessorConnection(self.__data_range_processor, "data_range"), + display_range=ProcessorConnection(self.__transformed_display_range_processor, "display_range"), + color_map_data=color_map_data + ) + + self.__transformed_data_processor = TransformedDataProcessor( + adjusted_data=ProcessorConnection(self.__adjusted_data_processor, "data", "adjusted_data"), + transformed_display_range=ProcessorConnection(self.__transformed_display_range_processor, "display_range", "transformed_display_range"), + ) def finalize() -> None: DisplayValues._count -= 1 @@ -353,179 +741,53 @@ def color_map_data(self) -> typing.Optional[_RGBA32Type]: def data_and_metadata(self) -> typing.Optional[DataAndMetadata.DataAndMetadata]: return self.__data_and_metadata + @property + def display_rgba_timestamp(self) -> typing.Optional[datetime.datetime]: + return self.__data_and_metadata.timestamp if self.__data_and_metadata else None + @property def element_data_and_metadata(self) -> typing.Optional[DataAndMetadata.DataAndMetadata]: - with self.__lock: - if self.__element_data_and_metadata_dirty: - self.__element_data_and_metadata_dirty = False - data_and_metadata = self.__data_and_metadata - if data_and_metadata is not None: - timestamp = data_and_metadata.timestamp - data_and_metadata, modified = Core.function_element_data_no_copy(data_and_metadata, - self.__sequence_index, - self.__collection_index, - self.__slice_center, - self.__slice_width, - flag16=False) - if data_and_metadata: - data_and_metadata.data_metadata._set_timestamp(timestamp) - self.__element_data_and_metadata = data_and_metadata - return self.__element_data_and_metadata + return typing.cast(typing.Optional[DataAndMetadata.DataAndMetadata], self.__element_data_processor.get_result("data")) @property def display_data_and_metadata(self) -> typing.Optional[DataAndMetadata.DataAndMetadata]: - with self.__lock: - if self.__display_data_and_metadata_dirty: - self.__display_data_and_metadata_dirty = False - data_and_metadata = self.element_data_and_metadata - if data_and_metadata is not None: - timestamp = data_and_metadata.timestamp - data_and_metadata, modified = Core.function_scalar_data_no_copy(data_and_metadata, self.__complex_display_type) - if data_and_metadata: - data_and_metadata.data_metadata._set_timestamp(timestamp) - self.__display_data_and_metadata = data_and_metadata - return self.__display_data_and_metadata + return typing.cast(typing.Optional[DataAndMetadata.DataAndMetadata], self.__display_data_processor.get_result("data")) @property def data_range(self) -> typing.Optional[typing.Tuple[float, float]]: - with self.__lock: - if self.__data_range_dirty: - self.__data_range_dirty = False - display_data_and_metadata = self.display_data_and_metadata - display_data = display_data_and_metadata.data if display_data_and_metadata else None - if display_data is not None and display_data.shape and self.__data_and_metadata: - data_shape = self.__data_and_metadata.data_shape - data_dtype = self.__data_and_metadata.data_dtype - if Image.is_shape_and_dtype_rgb_type(data_shape, data_dtype): - self.__data_range = (0, 255) - elif Image.is_shape_and_dtype_complex_type(data_shape, data_dtype): - self.__data_range = (numpy.amin(display_data), numpy.amax(display_data)) - else: - self.__data_range = (numpy.amin(display_data), numpy.amax(display_data)) - else: - self.__data_range = None - if self.__data_range is not None: - if math.isnan(self.__data_range[0]) or math.isnan(self.__data_range[1]) or math.isinf(self.__data_range[0]) or math.isinf(self.__data_range[1]): - self.__data_range = (0.0, 0.0) - if numpy.issubdtype(type(self.__data_range[0]), numpy.bool_): - self.__data_range = (int(self.__data_range[0]), self.__data_range[1]) - if numpy.issubdtype(type(self.__data_range[1]), numpy.bool_): - self.__data_range = (self.__data_range[0], int(self.__data_range[1])) - return self.__data_range - - @property - def display_range(self) -> typing.Optional[typing.Tuple[float, float]]: - with self.__lock: - if self.__display_range_dirty: - self.__display_range_dirty = False - self.__display_range = calculate_display_range(self.__display_limits, self.data_range, self.data_sample, self.__data_and_metadata, self.__complex_display_type) - return self.__display_range + return typing.cast(typing.Optional[typing.Tuple[float, float]], self.__data_range_processor.get_result("data_range")) @property def data_sample(self) -> typing.Optional[_ImageDataType]: - with self.__lock: - if self.__data_sample_dirty: - self.__data_sample_dirty = False - display_data_and_metadata = self.display_data_and_metadata - display_data = display_data_and_metadata.data if display_data_and_metadata else None - if display_data is not None and display_data.shape and self.__data_and_metadata: - data_shape = self.__data_and_metadata.data_shape - data_dtype = self.__data_and_metadata.data_dtype - if Image.is_shape_and_dtype_rgb_type(data_shape, data_dtype): - self.__data_sample = None - elif Image.is_shape_and_dtype_complex_type(data_shape, data_dtype): - self.__data_sample = numpy.sort(numpy.random.choice(display_data.reshape(numpy.prod(display_data.shape, dtype=numpy.uint64)), 200)) - else: - self.__data_sample = None - else: - self.__data_sample = None - return self.__data_sample + return typing.cast(typing.Optional[_ImageDataType], self.__data_sample_processor.get_result("data_sample")) @property - def display_rgba(self) -> typing.Optional[_ImageDataType]: - with self.__lock: - if self.__display_rgba_dirty: - self.__display_rgba_dirty = False - display_data = self.adjusted_data_and_metadata - if display_data is not None and self.__data_and_metadata is not None: - if self.data_range is not None: # workaround until validating and retrieving data stats is an atomic operation - # display_range is just display_limits but calculated if display_limits is None - display_range = self.transformed_display_range - display_rgba = Core.function_display_rgba(display_data, display_range, self.__color_map_data) - self.__display_rgba = display_rgba.data if display_rgba else None - return self.__display_rgba + def display_range(self) -> typing.Optional[typing.Tuple[float, float]]: + return typing.cast(typing.Optional[typing.Tuple[float, float]], self.__display_range_processor.get_result("display_range")) @property - def display_rgba_timestamp(self) -> typing.Optional[datetime.datetime]: - return self.__display_rgba_timestamp + def display_rgba(self) -> typing.Optional[_ImageDataType]: + return typing.cast(typing.Optional[_ImageDataType], self.__display_rgb_processor.get_result("display_rgba")) @property def normalized_data_and_metadata(self) -> typing.Optional[DataAndMetadata.DataAndMetadata]: - with self.__lock: - if self.__normalized_data_and_metadata_dirty: - self.__normalized_data_and_metadata_dirty = False - display_data_and_metadata = self.display_data_and_metadata - display_range = self.display_range - if display_range is not None and display_data_and_metadata: - display_limit_low, display_limit_high = display_range - # normalize the data to [0, 1]. - m = 1 / (display_limit_high - display_limit_low) if display_limit_high != display_limit_low else 0.0 - b = -display_limit_low - self.__normalized_data_and_metadata = float(m) * (display_data_and_metadata + float(b)) - return self.__normalized_data_and_metadata + return typing.cast(typing.Optional[DataAndMetadata.DataAndMetadata], self.__normalized_data_processor.get_result("data")) @property def adjusted_data_and_metadata(self) -> typing.Optional[DataAndMetadata.DataAndMetadata]: - with self.__lock: - if self.__adjusted_data_and_metadata_dirty: - self.__adjusted_data_and_metadata_dirty = False - if self.__adjustments: - display_xdata = self.normalized_data_and_metadata - for adjustment_d in self.__adjustments: - adjustment = adjustment_factory(adjustment_d) - if adjustment: - display_range = self.display_range - if display_xdata and display_range is not None: - display_data = display_xdata.data - if display_data is not None: - display_xdata = DataAndMetadata.new_data_and_metadata(adjustment.transform(display_data, display_range)) - self.__adjusted_data_and_metadata = display_xdata - else: - self.__adjusted_data_and_metadata = self.display_data_and_metadata - return self.__adjusted_data_and_metadata + return typing.cast(typing.Optional[DataAndMetadata.DataAndMetadata], self.__adjusted_data_processor.get_result("data")) @property def adjusted_display_range(self) -> typing.Optional[typing.Tuple[float, float]]: - if self.__adjustments: - # transforms have already been applied and data is now in the range of 0.0, 1.0. - # brightness and contrast will be applied on top of this transform. - return 0.0, 1.0 - else: - return self.display_range + return typing.cast(typing.Optional[typing.Tuple[float, float]], self.__adjusted_display_range_processor.get_result("display_range")) @property def transformed_data_and_metadata(self) -> typing.Optional[DataAndMetadata.DataAndMetadata]: - with self.__lock: - if self.__transformed_data_and_metadata_dirty: - self.__transformed_data_and_metadata_dirty = False - adjusted_xdata = self.adjusted_data_and_metadata - if adjusted_xdata: - self.__transformed_data_and_metadata = Core.function_rescale(adjusted_xdata, data_range=(0.0, 1.0), in_range=self.transformed_display_range) - else: - self.__transformed_data_and_metadata = None - return self.__transformed_data_and_metadata + return typing.cast(typing.Optional[DataAndMetadata.DataAndMetadata], self.__transformed_data_processor.get_result("data")) @property def transformed_display_range(self) -> typing.Tuple[float, float]: - adjusted_display_range = self.adjusted_display_range - assert adjusted_display_range is not None - display_limit_low, display_limit_high = adjusted_display_range - brightness = self.__brightness - contrast = self.__contrast - m = (contrast / (display_limit_high - display_limit_low)) if contrast > 0 and display_limit_high != display_limit_low else 1 - b = 1 / (2 * m) - (1 - brightness) * (display_limit_high - display_limit_low) / 2 - display_limit_low - # back calculate the display limits as they would be would be with brightness/contrast adjustments - return (0 - m * b) / m, (1 - m * b) / m + return typing.cast(typing.Tuple[float, float], self.__transformed_display_range_processor.get_result("display_range")) DisplayValuesSubscription = object