diff --git a/.github/workflows/test-build-push.yml b/.github/workflows/test-build-push.yml index 31e30b16..cb7cb038 100644 --- a/.github/workflows/test-build-push.yml +++ b/.github/workflows/test-build-push.yml @@ -48,6 +48,7 @@ jobs: - name: Install test dependencies run: | micromamba install -f tests/requirements.txt -c conda-forge + pip install --no-deps git+https://github.com/isce-framework/tophu@main - name: Enable numba boundscheck for better error catching run: | echo "NUMBA_BOUNDSCHECK=1" >> $GITHUB_ENV diff --git a/CHANGELOG.md b/CHANGELOG.md index ae26f5b8..f414abe0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,34 @@ -# [Unreleased](https://github.com/isce-framework/dolphin/compare/v0.15.0...main) +# [Unreleased](https://github.com/isce-framework/dolphin/compare/v0.15.3...main) + +**Added** +- Added `dolphin.timeseries` module with basic functionality: + - Invert a stack of unwrapped interferograms to a timeseries (using correlation weighting optionally) + - Estimate a (weighted) linear velocity from a timeseries +- Create `DatasetStackWriter` protocol, with `BackgroundStackWriter` implementation + +**Changed** +- Rename `GdalWriter` to `BackgroundBlockWriter` + +**Fixed** +- `BackgroundRasterWriter` was not creating the files necessary before writing + +# [0.15.3](https://github.com/isce-framework/dolphin/compare/v0.15.2...0.15.3) - 2024-02-27 + +**Changed** +- Return the output paths created by the ionosphere/troposphere modules to make it easier to use afterward + +# [0.15.2](https://github.com/isce-framework/dolphin/compare/v0.15.1...0.15.2) - 2024-02-27 + +**Fixed** + +- Fixes to ionosphere/troposphere correction in for `DisplacementWorkflow` and `PsWorkflow` +- Correct the nodata value passed through to snaphu-py + +# [0.15.1](https://github.com/isce-framework/dolphin/compare/v0.15.0...0.15.1) - 2024-02-26 + +**Fixed** + +- PHASS now uses the Tophu wrapper to avoid isce3 inconsistencies between argument order # [0.15.0](https://github.com/isce-framework/dolphin/compare/v0.14.1...0.15.0) - 2024-02-16 @@ -10,7 +40,7 @@ **Fixed** -- Intersection of nodata regions for SLC stack are now all set to `nan` during phase linking +- Intersection of nodata regions for SLC stack are now all set to `nan` during phase linking, avoiding 0 gaps between bursts # [0.14.1](https://github.com/isce-framework/dolphin/compare/v0.14.0...0.14.1) - 2024-02-15 diff --git a/conda-env.yml b/conda-env.yml index b5d64620..7fa9b45a 100644 --- a/conda-env.yml +++ b/conda-env.yml @@ -17,7 +17,6 @@ dependencies: - rasterio>=1.3 - ruamel.yaml>=0.15 - scipy>=1.9 # "scipy 0.16+ is required for linear algebra", numba. 
1.9 is the oldest version working with jax=0.4.19 - - shapely>=1.8 - threadpoolctl>=3.0 - tqdm>=4.60 - typing_extensions>=3.10 diff --git a/requirements.txt b/requirements.txt index e2365817..f6f01d3d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,6 +9,5 @@ pyproj>=3.3 rasterio>=1.3 ruamel_yaml>=0.15 scipy>=1.9 -shapely>=1.8 threadpoolctl>=3.0 tqdm>=4.60 diff --git a/src/dolphin/io/_readers.py b/src/dolphin/io/_readers.py index 96bb7540..ff9b7d6a 100644 --- a/src/dolphin/io/_readers.py +++ b/src/dolphin/io/_readers.py @@ -28,6 +28,7 @@ from dolphin.io._blocks import iter_blocks from ._background import _DEFAULT_TIMEOUT, BackgroundReader +from ._utils import _ensure_slices, _unpack_3d_slices logger = logging.getLogger(__name__) @@ -44,10 +45,9 @@ "EagerLoader", ] -if TYPE_CHECKING: - from builtins import ellipsis - Index = ellipsis | slice | int +if TYPE_CHECKING: + from dolphin._types import Index @runtime_checkable @@ -276,18 +276,6 @@ def __getitem__(self, key: tuple[Index, ...], /) -> np.ndarray: return _mask_array(data, self.nodata) if self.nodata is not None else data -def _ensure_slices(rows: Index, cols: Index) -> tuple[slice, slice]: - def _parse(key: Index): - if isinstance(key, int): - return slice(key, key + 1) - elif key is ...: - return slice(None) - else: - return key - - return _parse(rows), _parse(cols) - - @dataclass class RasterReader(DatasetReader): """A single raster band of a GDAL-compatible dataset. @@ -414,20 +402,7 @@ def __getitem__(self, key: tuple[Index, ...], /) -> np.ndarray: def _read_3d( key: tuple[Index, ...], readers: Sequence[DatasetReader], num_threads: int = 1 ): - # Check that it's a tuple of slices - if not isinstance(key, tuple): - msg = "Index must be a tuple of slices." - raise TypeError(msg) - if len(key) not in (1, 3): - msg = "Index must be a tuple of 1 or 3 slices." - raise TypeError(msg) - # If only the band is passed (e.g. stack[0]), convert to (0, :, :) - if len(key) == 1: - key = (key[0], slice(None), slice(None)) - # unpack the slices - bands, rows, cols = key - # convert the rows/cols to slices - r_slice, c_slice = _ensure_slices(rows, cols) + bands, r_slice, c_slice = _unpack_3d_slices(key) if isinstance(bands, slice): # convert the bands to -1-indexed list diff --git a/src/dolphin/io/_utils.py b/src/dolphin/io/_utils.py new file mode 100644 index 00000000..c79908b4 --- /dev/null +++ b/src/dolphin/io/_utils.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from dolphin._types import Index + + +def _ensure_slices(rows: Index, cols: Index) -> tuple[slice, slice]: + def _parse(key: Index): + if isinstance(key, int): + return slice(key, key + 1) + elif key is ...: + return slice(None) + else: + return key + + return _parse(rows), _parse(cols) + + +def _unpack_3d_slices(key: tuple[Index, ...]) -> tuple[Index, slice, slice]: + # Check that it's a tuple of slices + if not isinstance(key, tuple): + msg = "Index must be a tuple of slices." + raise TypeError(msg) + if len(key) not in (1, 3): + msg = "Index must be a tuple of 1 or 3 slices." + raise TypeError(msg) + # If only the band is passed (e.g. 
stack[0]), convert to (0, :, :) + if len(key) == 1: + key = (key[0], slice(None), slice(None)) + # unpack the slices + bands, rows, cols = key + # convert the rows/cols to slices + r_slice, c_slice = _ensure_slices(rows, cols) + return bands, r_slice, c_slice diff --git a/src/dolphin/io/_writers.py b/src/dolphin/io/_writers.py index 70559c58..78bd4f52 100644 --- a/src/dolphin/io/_writers.py +++ b/src/dolphin/io/_writers.py @@ -2,11 +2,13 @@ from contextlib import AbstractContextManager from dataclasses import dataclass +from pathlib import Path from typing import ( TYPE_CHECKING, Any, Mapping, Protocol, + Sequence, TypeVar, runtime_checkable, ) @@ -19,17 +21,58 @@ from dolphin._types import Filename from ._background import BackgroundWriter +from ._utils import _unpack_3d_slices __all__ = [ + "BackgroundBlockWriter", "DatasetWriter", + "DatasetStackWriter", "RasterWriter", - "GdalWriter", + "BackgroundRasterWriter", + "BackgroundStackWriter", ] if TYPE_CHECKING: from dolphin._types import Index +class BackgroundBlockWriter(BackgroundWriter): + """Class to write data to multiple files in the background using `gdal` bindings.""" + + def __init__(self, *, max_queue: int = 0, debug: bool = False, **kwargs): + if debug is False: + super().__init__(nq=max_queue, name="Writer", **kwargs) + else: + # Don't start a background thread. Just synchronously write data + self.queue_write = self.write # type: ignore[assignment] + + def write( + self, data: ArrayLike, filename: Filename, row_start: int, col_start: int + ): + """Write out an ndarray to a subset of the pre-made `filename`. + + Parameters + ---------- + data : ArrayLike + 2D or 3D data array to save. + filename : Filename + list of output files to save to, or (if cur_block is 2D) a single file. + row_start : int + Row index to start writing at. + col_start : int + Column index to start writing at. + + Raises + ------ + ValueError + If length of `output_files` does not match length of `cur_block`. + + """ + from dolphin.io import write_block + + write_block(data, filename, row_start, col_start) + + @runtime_checkable class DatasetWriter(Protocol): """An array-like interface for writing output datasets. @@ -54,6 +97,30 @@ def __setitem__(self, key: tuple[Index, ...], value: np.ndarray, /) -> None: ... +@runtime_checkable +class DatasetStackWriter(Protocol): + """An array-like interface for writing output datasets. + + `DatasetWriter` defines the abstract interface that types must conform to in order + to be used by functions which write outputs in blocks. + Such objects must export NumPy-like `dtype`, `shape`, and `ndim` attributes, + and must support NumPy-style slice-based indexing for setting data.. + """ + + dtype: np.dtype + """numpy.dtype : Data-type of the array's elements.""" + + shape: tuple[int, ...] + """tuple of int : Tuple of array dimensions.""" + + ndim: int = 3 + """int : Number of array dimensions.""" + + def __setitem__(self, key: tuple[Index, ...], value: np.ndarray, /) -> None: + """Write a block of data.""" + ... 
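As a usage note for the new protocol above (illustration only, not part of the patch): any object exposing `dtype`, `shape`, `ndim`, and NumPy-style slice `__setitem__` satisfies `DatasetStackWriter`, and because the protocol is `runtime_checkable`, `isinstance` only verifies that those members exist. A minimal sketch with a hypothetical in-memory writer:

```python
import numpy as np

from dolphin.io._writers import DatasetStackWriter


class InMemoryStackWriter:
    """Toy 3-D writer used only to illustrate the protocol."""

    def __init__(self, shape: tuple[int, int, int], dtype=np.float32):
        self.shape = shape
        self.dtype = np.dtype(dtype)
        self.ndim = 3
        self._data = np.zeros(shape, dtype=dtype)

    def __setitem__(self, key, value) -> None:
        # Accept (band, row, col) slicing, as BackgroundStackWriter does
        self._data[key] = value


writer = InMemoryStackWriter(shape=(4, 256, 256))
assert isinstance(writer, DatasetStackWriter)  # structural check only
writer[:, 0:128, 0:128] = np.ones((4, 128, 128), dtype=np.float32)
```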
+ + RasterT = TypeVar("RasterT", bound="RasterWriter") @@ -232,27 +299,38 @@ def __setitem__(self, key: tuple[Index, ...], value: np.ndarray, /) -> None: self.filename, "r+", ) as dataset: + if len(key) == 2: + rows, cols = key + elif len(key) == 3: + _, rows, cols = _unpack_3d_slices(key) + else: + raise ValueError(f"Invalid key: {key!r}") window = Window.from_slices( - *key, + rows, + cols, height=dataset.height, width=dataset.width, ) return dataset.write(value, self.band, window=window) -class BackgroundRasterWriter(BackgroundWriter): +class BackgroundRasterWriter(BackgroundWriter, DatasetWriter): """Class to write data to files in a background thread.""" def __init__( - self, filename: Filename, max_queue: int = 0, debug: bool = False, **kwargs + self, filename: Filename, *, max_queue: int = 0, debug: bool = False, **kwargs ): if debug is False: - super().__init__(nq=max_queue, name="Writer", **kwargs) + super().__init__(nq=max_queue, name="Writer") else: # Don't start a background thread. Just synchronously write data self.queue_write = self.write # type: ignore[assignment] - self._raster = RasterWriter(filename, **kwargs) + if Path(filename).exists(): + self._raster = RasterWriter(filename) + else: + self._raster = RasterWriter.create(filename, **kwargs) self.filename = filename + self.ndim = 2 def write(self, key: tuple[Index, ...], value: np.ndarray): """Write out an ndarray to a subset of the pre-made `filename`. @@ -289,10 +367,6 @@ def shape(self): def dtype(self) -> np.dtype: return self._raster.dtype - @property - def ndim(self) -> int: - return self._raster.ndim - def __enter__(self): return self @@ -300,25 +374,50 @@ def __exit__(self, exc_type, exc_value, traceback): # type: ignore[no-untyped-d self.close() -class GdalWriter(BackgroundWriter): - """Class to write data to files in a background thread using `gdal` bindings.""" +class BackgroundStackWriter(BackgroundWriter, DatasetStackWriter): + """Class to write 3D data to multiple files in a background thread. + + Will create/overwrite the files in `file_list` if they exist. + """ + + def __init__( + self, + file_list: Sequence[Filename], + *, + like_filename: Filename | None = None, + max_queue: int = 0, + debug: bool = False, + **file_creation_kwargs, + ): + from dolphin.io import write_arr - def __init__(self, max_queue: int = 0, debug: bool = False, **kwargs): if debug is False: - super().__init__(nq=max_queue, name="Writer", **kwargs) + super().__init__(nq=max_queue, name="GdalStackWriter") else: # Don't start a background thread. Just synchronously write data self.queue_write = self.write # type: ignore[assignment] - def write( - self, data: ArrayLike, filename: Filename, row_start: int, col_start: int - ): + for fn in file_list: + write_arr( + arr=None, + output_name=fn, + like_filename=like_filename, + **file_creation_kwargs, + ) + + self.file_list = file_list + + with rasterio.open(self.file_list[0]) as src: + self.shape = (len(self.file_list), *src.shape) + self.dtype = src.dtypes[0] + + def write(self, data: ArrayLike, row_start: int, col_start: int): """Write out an ndarray to a subset of the pre-made `filename`. Parameters ---------- data : ArrayLike - 2D or 3D data array to save. + 3D data array to save. filename : Filename list of output files to save to, or (if cur_block is 2D) a single file. row_start : int @@ -334,7 +433,27 @@ def write( """ from dolphin.io import write_block - write_block(data, filename, row_start, col_start) + if data.ndim == 2: + data = data[None, ...] 
+ if data.shape[0] != len(self.file_list): + raise ValueError(f"{data.shape = }, but {len(self.file_list) = }") + for fn, layer in zip(self.file_list, data): + write_block(layer, fn, row_start, col_start) def __setitem__(self, key, value): - self.queue_write(value, key) + # Unpack the slices + band, rows, cols = _unpack_3d_slices(key) + # Check we asked to write to all the files + if band not in (slice(None), slice(None, None, None), ...): + self.notify_finished() + raise NotImplementedError("Can only write to all files at once.") + self.queue_write(value, rows.start, cols.start) + + @property + def closed(self) -> bool: + """bool : True if the dataset is closed.""" # noqa: D403 + return self._thread.is_alive() is False + + def close(self) -> None: + """Close the underlying dataset and stop the background thread.""" + self.notify_finished() diff --git a/src/dolphin/ps.py b/src/dolphin/ps.py index bdbb1b30..cbc2a794 100644 --- a/src/dolphin/ps.py +++ b/src/dolphin/ps.py @@ -107,7 +107,7 @@ def create_ps( skip_empty = nodata_mask is None - writer = io.GdalWriter() + writer = io.BackgroundBlockWriter() # Make the generator for the blocks block_gen = EagerLoader( reader, diff --git a/src/dolphin/timeseries.py b/src/dolphin/timeseries.py new file mode 100644 index 00000000..e821b0fc --- /dev/null +++ b/src/dolphin/timeseries.py @@ -0,0 +1,524 @@ +import logging +from concurrent.futures import Future, ThreadPoolExecutor +from pathlib import Path +from tempfile import NamedTemporaryFile +from typing import NamedTuple, Protocol, Sequence, TypeVar + +import jax.numpy as jnp +import numpy as np +from jax import Array, jit, vmap +from numpy.typing import ArrayLike +from opera_utils import get_dates +from tqdm.auto import tqdm + +from dolphin import DateOrDatetime, io +from dolphin._types import PathOrStr +from dolphin.utils import DummyProcessPoolExecutor, flatten, format_dates + +__all__ = [ + "weighted_lstsq_single", + "invert_stack", + "get_incidence_matrix", + "estimate_velocity", + "create_velocity", + "create_temporal_average", + "invert_unw_network", +] + +T = TypeVar("T") + +logger = logging.getLogger(__name__) + + +@jit +def weighted_lstsq_single( + A: ArrayLike, + b: ArrayLike, + weights: ArrayLike, +) -> Array: + r"""Perform weighted least for one data vector. + + Minimizes the weighted 2-norm of the residual vector: + + \[ + || b - A x ||^2_W + \] + + where \(W\) is a diagonal matrix of weights. + + Parameters + ---------- + A : Arraylike + Incidence matrix of shape (n_ifgs, n_sar_dates - 1) + b : ArrayLike, 1D + The phase differences between the ifg pairs + weights : ArrayLike, 1D, optional + The weights for each element of `b`. + + Returns + ------- + x : np.array 1D + The estimated phase for each SAR acquisition + residuals : np.array 1D + Sums of squared residuals: Squared Euclidean 2-norm for `b - A @ x` + For a 1D `b`, this is a scalar. + + """ + # scale both A and b by sqrt so we are minimizing + sqrt_weights = jnp.sqrt(weights) + # Multiply each data point by sqrt(weight) + b_scaled = b * sqrt_weights + # Multiply each row by sqrt(weight) + A_scaled = A * sqrt_weights[:, None] + + # Run the weighted least squares + x, residuals, rank, sing_vals = jnp.linalg.lstsq(A_scaled, b_scaled) + # TODO: do we need special handling? 
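+    # Explanatory note (not in the original patch): if the weights zero out
+    # every interferogram touching a given date, the corresponding column of
+    # A_scaled becomes all zeros and the system is rank deficient. The
+    # SVD-based lstsq solver does not raise in that case; it returns the
+    # minimum-norm least-squares solution, so the affected dates are not
+    # meaningful and could be flagged, e.g.: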
+ # if rank < A.shape[1]: + # # logger.warning("Rank deficient solution") + + return x, residuals.ravel() + + +@jit +def invert_stack( + A: ArrayLike, dphi: ArrayLike, weights: ArrayLike | None = None +) -> Array: + """Solve the SBAS problem for a stack of unwrapped phase differences. + + Parameters + ---------- + A : ArrayLike + Incidence matrix of shape (n_ifgs, n_sar_dates - 1) + dphi : ArrayLike + The phase differences between the ifg pairs, shape=(n_ifgs, n_rows, n_cols) + weights : ArrayLike, optional + The weights for each element of `dphi`. + Same shape as `dphi`. + If not provided, all weights are set to 1 (ordinary least squares). + + Returns + ------- + phi : np.array 3D + The estimated phase for each SAR acquisition + Shape is (n_sar_dates, n_rows, n_cols) + residuals : np.array 2D + Sums of squared residuals: Squared Euclidean 2-norm for `dphi - A @ x` + Shape is (n_rows, n_cols) + + Notes + ----- + To mask out data points of a pixel, the weight can be set to 0. + When `A` remains full rank, setting the weight to zero is the same as removing + the entry from the data vector and the corresponding row from `A`. + + """ + n_ifgs, n_rows, n_cols = dphi.shape + + # vectorize the solve function to work on 2D and 3D arrays + # We are not vectorizing over the A matrix, only the dphi vector + # Solve 2d shapes: (nrows, n_ifgs) -> (nrows, n_sar_dates) + # invert_2d = vmap(invert_single, in_axes=(None, 1, 1), out_axes=1) + invert_2d = vmap(weighted_lstsq_single, in_axes=(None, 1, 1), out_axes=(1, 1)) + # Solve 3d shapes: (nrows, ncols, n_ifgs) -> (nrows, ncols, n_sar_dates) + invert_3d = vmap(invert_2d, in_axes=(None, 2, 2), out_axes=(2, 2)) + + if weights is None: + weights = jnp.ones_like(dphi) + phase, residuals = invert_3d(A, dphi, weights) + # Add 0 for the reference date to the front + phase = jnp.concatenate([jnp.zeros((1, n_rows, n_cols)), phase], axis=0) + # Reshape the residuals to be 2D + return phase, residuals[0] + + +def get_incidence_matrix( + ifg_pairs: Sequence[tuple[T, T]], sar_idxs: Sequence[T] | None = None +) -> np.ndarray: + """Build the indicator matrix from a list of ifg pairs (index 1, index 2). + + Parameters + ---------- + ifg_pairs : Sequence[tuple[T, T]] + List of ifg pairs represented as tuples of (day 1, day 2) + Can be ints, datetimes, etc. + sar_idxs : Sequence[T], optional + If provided, used as the total set of indexes which `ifg_pairs` + were formed from. + Otherwise, created from the unique entries in `ifg_pairs`. + Only provide if there are some dates which are not present in `ifg_pairs`. + + Returns + ------- + A : np.array 2D + The incident-like matrix for the system: A*phi = dphi + Each row corresponds to an ifg, each column to a SAR date. 
+ The value will be -1 on the early (reference) ifgs, +1 on later (secondary) + since the ifg phase = (later - earlier) + Shape: (n_ifgs, n_sar_dates - 1) + + """ + if sar_idxs is None: + sar_idxs = sorted(set(flatten(ifg_pairs))) + + M = len(ifg_pairs) + N = len(sar_idxs) - 1 + A = np.zeros((M, N)) + + # Create a dictionary mapping sar dates to matrix columns + # We take the first SAR acquisition to be time 0, leave out of matrix + date_to_col = {date: i for i, date in enumerate(sar_idxs[1:])} + # Populate the matrix + for i, (early, later) in enumerate(ifg_pairs): + if early in date_to_col: + A[i, date_to_col[early]] = -1 + if later in date_to_col: + A[i, date_to_col[later]] = +1 + + return A + + +@jit +def estimate_velocity_pixel(x: ArrayLike, y: ArrayLike, w: ArrayLike) -> Array: + """Estimate the velocity from a single pixel's time series. + + Parameters + ---------- + x : np.array 1D + The time values + y : np.array 1D + The unwrapped phase values + w : np.array 1D + The weights for each time value + + Returns + ------- + velocity : np.array, 0D + The estimated velocity in (unw unit) / day. + + """ + # Jax polyfit will grab the first *2* dimensions of y to solve in a batch + return jnp.polyfit(x, y, deg=1, w=w.reshape(y.shape), rcond=None)[0] + + +@jit +def estimate_velocity( + x_arr: ArrayLike, unw_stack: ArrayLike, weight_stack: ArrayLike +) -> Array: + """Estimate the velocity from a stack of unwrapped interferograms. + + Parameters + ---------- + x_arr : ArrayLike + Array of time values corresponding to each unwrapped phase image. + Length must match `unw_stack.shape[0]`. + unw_stack : ArrayLike + Array of unwrapped phase values at each pixel, shape=`(n_time, n_rows, n_cols)`. + weight_stack : ArrayLike + Array of weights for each pixel, shape=`(n_time, n_rows, n_cols)`. + If not provided, all weights are set to 1. + + Returns + ------- + velocity : np.array 2D + The estimated velocity in (unw unit) / day. + E.g. if the unwrapped phase is in radians, the velocity is in rad/day. + + """ + # TODO: weighted least squares using correlation? + n_time, n_rows, n_cols = unw_stack.shape + + # We use the same x inputs for all output pixels + assert unw_stack.shape == weight_stack.shape + unw_pixels = unw_stack.reshape(n_time, -1) + weights_pixels = weight_stack.reshape(n_time, 1, -1) + + # coeffs = jnp.polyfit(x_arr, unw_pixels, deg=1, rcond=None) + velos = vmap(estimate_velocity_pixel, in_axes=(None, -1, -1))( + x_arr, unw_pixels, weights_pixels + ) + return velos.reshape(n_rows, n_cols) + + +def datetime_to_float(dates: Sequence[DateOrDatetime]) -> np.ndarray: + """Convert a sequence of datetime objects to a float representation. + + Output units are in days since the first item in `dates`. + + Parameters + ---------- + dates : Sequence[DateOrDatetime] + List of datetime objects to convert to floats + + Returns + ------- + date_arr : np.array 1D + The float representation of the datetime objects + + """ + sec_per_day = 60 * 60 * 24 + date_arr = np.asarray(dates).astype("datetime64[s]") + # Reference the 0 to the first date + date_arr = date_arr - date_arr[0] + return date_arr.astype(float) / sec_per_day + + +class BlockProcessor(Protocol): + """Protocol for a block-wise processing function. + + Reads a block of data from each reader, processes it, and returns the result + as an array-like object. + """ + + def __call__( + self, readers: Sequence[io.StackReader], rows: slice, cols: slice + ) -> ArrayLike: + ... 
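To make the protocol concrete (illustration only, not part of the patch): a `BlockProcessor` is any callable taking the stack readers plus a row/column slice pair. The functions defined later in this module (`read_and_fit`, `read_and_average`, `read_and_solve`) follow this pattern and also hand back the slices alongside the computed block, so the write callback in `process_blocks` knows where to place the result. A hypothetical processor and its (commented) wiring might look like:

```python
from typing import Sequence

import numpy as np

from dolphin import io


def read_and_scale(
    readers: Sequence[io.StackReader], rows: slice, cols: slice
) -> tuple[slice, slice, np.ndarray]:
    """Hypothetical processor: read one block and rescale it."""
    block = readers[0][:, rows, cols]  # (n_images, n_block_rows, n_block_cols)
    return rows, cols, 2.0 * block


# Wiring is elided here; any StackReader/DatasetWriter pair works, e.g.:
# process_blocks(readers=[reader], writer=writer, func=read_and_scale, num_threads=4)
```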
+ + +def process_blocks( + readers: Sequence[io.StackReader], + writer: io.DatasetWriter, + func: BlockProcessor, + # output_files: Sequence[PathOrStr], + # like_filename: PathOrStr | None = None, + block_shape: tuple[int, int] = (512, 512), + num_threads: int = 5, +): + """Perform block-wise processing over blocks in `readers`, writing to `writer`.""" + # reader = RasterStackReader.from_file_list(file_list=file_list) + # writer = io.GdalStackWriter( + # file_list=output_files, like_filename=like_filename or file_list[0] + # ) + # shape = io.get_raster_xysize(file_list[0])[::-1] + shape = readers[0].shape[-2:] + slices = list(io.iter_blocks(shape, block_shape=block_shape)) + + pbar = tqdm(total=len(slices)) + + # Define the callback to write the result to an output DatasetWrite + def write_callback(fut: Future): + rows, cols, data = fut.result() + writer[..., rows, cols] = data + pbar.update() + + Executor = ThreadPoolExecutor if num_threads > 1 else DummyProcessPoolExecutor + with Executor(num_threads) as exc: + for rows, cols in slices: + future = exc.submit(func, readers=readers, rows=rows, cols=cols) + future.add_done_callback(write_callback) + + +def create_velocity( + unw_file_list: Sequence[PathOrStr], + output_file: PathOrStr, + date_list: Sequence[DateOrDatetime] | None = None, + cor_file_list: Sequence[PathOrStr] | None = None, + cor_threshold: float = 0.2, + block_shape: tuple[int, int] = (512, 512), + num_threads: int = 5, +) -> None: + """Perform pixel-wise (weighted) linear regression to estimate velocity.""" + if date_list is None: + date_list = [get_dates(f)[1] for f in unw_file_list] + # TODO: need the relative one? + x_arr = datetime_to_float(date_list) + + def read_and_fit( + readers: Sequence[io.StackReader], rows: slice, cols: slice + ) -> tuple[slice, slice, np.ndarray]: + if len(readers) == 2: + unw_reader, cor_reader = readers + unw_stack = unw_reader[:, rows, cols] + weights = cor_reader[:, rows, cols] + weights[weights < cor_threshold] = 0 + else: + unw_stack = readers[0][:, rows, cols] + weights = np.ones_like(unw_stack) + # Fit a line to each pixel with weighted least squares + return ( + rows, + cols, + estimate_velocity(x_arr=x_arr, stack=unw_stack, weight_stack=weights), + ) + + # Note: For some reason, the `RasterStackReader` is much slower than the VRT: + # ~300 files takes >2 min to open, >2 min to read each block + # VRTStack seems to take ~30 secs to open, 1 min to read + # Very possible there's a tuning param/rasterio config to fix, but not sure. 
+ # unw_reader = io.RasterStackReader.from_file_list(file_list=unw_file_list) + # cor_reader = io.RasterStackReader.from_file_list(file_list=cor_file_list) + with NamedTemporaryFile(mode="w", suffix=".vrt") as f1, NamedTemporaryFile( + mode="w", suffix=".vrt" + ) as f2: + unw_reader = io.VRTStack( + file_list=unw_file_list, outfile=f1.name, skip_size_check=True + ) + if cor_file_list is not None: + cor_reader = io.VRTStack( + file_list=cor_file_list, outfile=f2.name, skip_size_check=True + ) + readers = [unw_reader, cor_reader] + else: + readers = [unw_reader] + + writer = io.BackgroundRasterWriter(output_file, like_filename=unw_file_list[0]) + process_blocks( + # file_list=unw_file_list, + # output_files=[output_file], + readers=readers, + writer=writer, + func=read_and_fit, + block_shape=block_shape, + num_threads=num_threads, + ) + + writer.notify_finished() + + +def create_temporal_average( + file_list: Sequence[PathOrStr], + output_file: PathOrStr, + block_shape: tuple[int, int] = (512, 512), + num_threads: int = 5, +) -> None: + """Average all images in `reader` to create a 2D image in `output_file`.""" + + def read_and_average( + readers: Sequence[io.StackReader], rows: slice, cols: slice + ) -> tuple[slice, slice, np.ndarray]: + return rows, cols, np.nanmean(readers[0][:, rows, cols], axis=0) + + writer = io.BackgroundRasterWriter(output_file, like_filename=file_list[0]) + with NamedTemporaryFile(mode="w", suffix=".vrt") as f: + reader = io.VRTStack(file_list=file_list, outfile=f.name, skip_size_check=True) + + process_blocks( + # file_list=file_list, + # output_files=[output_file], + readers=[reader], + writer=writer, + func=read_and_average, + block_shape=block_shape, + num_threads=num_threads, + ) + + writer.notify_finished() + + +class ReferencePoint(NamedTuple): + row: int + col: int + + +def invert_unw_network( + unw_file_list: Sequence[PathOrStr], + reference: ReferencePoint, + output_dir: PathOrStr, + cor_file_list: Sequence[PathOrStr] | None = None, + cor_threshold: float = 0.2, + ifg_date_pairs: Sequence[Sequence[DateOrDatetime]] | None = None, + block_shape: tuple[int, int] = (512, 512), + num_threads: int = 5, +) -> list[Path]: + """Perform pixel-wise inversion of unwrapped network to get phase per date. + + Parameters + ---------- + unw_file_list : Sequence[PathOrStr] + List of unwrapped phase files. + reference : ReferencePoint + The reference point to use for the inversion. + The data vector from `unw_file_list` at this point will be subtracted + from all other points when solving. + output_dir : PathOrStr + The directory to save the output files + ifg_date_pairs : Sequence[Sequence[DateOrDatetime]], optional + List of date pairs to use for the inversion. If not provided, will be + parsed from filenames in `unw_file_list`. + block_shape : tuple[int, int], optional + The shape of the blocks to process in parallel + cor_file_list : Sequence[PathOrStr], optional + List of correlation files to use for weighting the inversion + cor_threshold : float, optional + The correlation threshold to use for weighting the inversion + num_threads : int, optional + The parallel blocks to process at once. + + Returns + ------- + out_paths : list[Path] + List of the output files created by the inversion. 
+ + """ + if ifg_date_pairs is None: + ifg_date_pairs = [get_dates(f) for f in unw_file_list] + + try: + # Ensure it's a list of pairs + ifg_tuples = [(ref, sec) for (ref, sec) in ifg_date_pairs] # noqa: C416 + except ValueError as e: + raise ValueError( + "Each item in `ifg_date_pairs` must be a sequence of length 2" + ) from e + + sar_dates = sorted(set(flatten(ifg_tuples))) + A = get_incidence_matrix(ifg_pairs=ifg_tuples, sar_idxs=sar_dates) + # Make the names of the output files from the SAR dates to solve for + ref_date = sar_dates[0] + suffix = ".tif" + out_paths = [ + Path(output_dir) / (format_dates(ref_date, d) + suffix) for d in sar_dates + ] + + out_vrt_name = Path(output_dir) / "unw_network.vrt" + unw_reader = io.VRTStack( + file_list=unw_file_list, outfile=out_vrt_name, skip_size_check=True + ) + cor_vrt_name = Path(output_dir) / "unw_network.vrt" + + # Get the reference point data + ref_row, ref_col = reference + ref_data = unw_reader[:, ref_row, ref_col].reshape(-1, 1, 1) + + def read_and_solve( + readers: Sequence[io.StackReader], rows: slice, cols: slice + ) -> tuple[slice, slice, np.ndarray]: + if len(readers) == 2: + unw_reader, cor_reader = readers + stack = unw_reader[:, rows, cols] + weights = cor_reader[:, rows, cols] + weights[weights < cor_threshold] = 0 + else: + stack = readers[0][:, rows, cols] + weights = np.ones_like(stack) + + # subtract the reference + stack = stack - ref_data + + # TODO: possible second input for weights? from conncomps + # TODO: do i want to write residuals too? Do i need + # to have multiple writers then? + phases = invert_stack(A, stack, weights)[0] + return rows, cols, np.asarray(phases) + + if cor_file_list is not None: + cor_reader = io.VRTStack( + file_list=cor_file_list, outfile=cor_vrt_name, skip_size_check=True + ) + readers = [unw_reader, cor_reader] + else: + readers = [unw_reader] + + writer = io.BackgroundStackWriter(out_paths, like_filename=unw_file_list[0]) + + process_blocks( + readers=readers, + writer=writer, + func=read_and_solve, + block_shape=block_shape, + num_threads=num_threads, + ) + writer.notify_finished() + # Return the output files + return out_paths diff --git a/src/dolphin/workflows/single.py b/src/dolphin/workflows/single.py index 1cd6f2fb..21fcd374 100644 --- a/src/dolphin/workflows/single.py +++ b/src/dolphin/workflows/single.py @@ -103,7 +103,7 @@ def run_wrapped_phase_single( logger.info(msg) # Create the background writer for this ministack - writer = io.GdalWriter() + writer = io.BackgroundBlockWriter() logger.info(f"Total stack size (in pixels): {vrt.shape}") # Set up the output folder with empty files to write into diff --git a/tests/requirements.txt b/tests/requirements.txt index c61594e6..6874a625 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -7,3 +7,4 @@ pytest pytest-cov pytest-randomly # control random seed pytest-xdist # parallel tests: https://pytest-xdist.readthedocs.io/en/latest/ +shapely diff --git a/tests/test_io_writers.py b/tests/test_io_writers.py index 27369300..8ca704e1 100644 --- a/tests/test_io_writers.py +++ b/tests/test_io_writers.py @@ -1,4 +1,5 @@ import warnings +from pathlib import Path import numpy as np import pytest @@ -6,7 +7,12 @@ from rasterio.errors import NotGeoreferencedWarning from dolphin.io._core import load_gdal -from dolphin.io._writers import BackgroundRasterWriter, RasterWriter +from dolphin.io._writers import ( + BackgroundBlockWriter, + BackgroundRasterWriter, + BackgroundStackWriter, + RasterWriter, +) # Note: uses the fixtures from 
conftest.py @@ -28,6 +34,27 @@ def suppress_not_georeferenced_warning(): yield +@pytest.fixture +def output_file_list(slc_file_list): + suffix = Path(slc_file_list[0]).suffix + return [f"{f}_out{suffix}" for f in slc_file_list] + + +def test_background_block_writer(output_file_list, slc_file_list): + from dolphin.io import write_arr + + write_arr(arr=None, output_name=output_file_list[0], like_filename=slc_file_list[0]) + data = np.random.randn(5, 10) + w = BackgroundBlockWriter() + rows, cols = slice(0, 5), slice(0, 10) + w.queue_write(data, output_file_list[0], 0, 0) + # Make sure we dont write too late + w.notify_finished() + assert w._thread.is_alive() is False + + assert np.allclose(load_gdal(output_file_list[0], rows=rows, cols=cols), data) + + # #### RasterReader Tests #### class TestRasterWriter: def raster_init(self, slc_file_list, slc_stack): @@ -74,7 +101,7 @@ def test_init(self, slc_file_list): brw.close() assert brw.closed is True - def test_write(self, slc_file_list): + def test_setitem(self, slc_file_list): data = np.random.randn(5, 10) w = BackgroundRasterWriter(slc_file_list[0]) rows, cols = slice(0, 5), slice(0, 10) @@ -95,3 +122,31 @@ def test_context_manager(self, slc_file_list): assert w.closed is True assert w._thread.is_alive() is False assert np.allclose(load_gdal(slc_file_list[0], rows=rows, cols=cols), data) + + +class TestBackgroundStackWriter: + def test_stack(self, slc_file_list, output_file_list): + w = BackgroundStackWriter(output_file_list, like_filename=slc_file_list[0]) + with rio.open(output_file_list[0]) as src: + shape_3d = (len(slc_file_list), *src.shape) + assert w.shape == shape_3d + assert w.dtype == src.dtypes[0] + + assert w.ndim == 3 + assert w.closed is False + w.close() + assert w.closed is True + + def test_setitem(self, output_file_list, slc_file_list): + data = np.random.randn(len(output_file_list), 5, 10) + w = BackgroundStackWriter(output_file_list, like_filename=slc_file_list[0]) + rows, cols = slice(0, 5), slice(0, 10) + w[:, rows, cols] = data + # Make sure we dont write too late + w.close() + assert w._thread.is_alive() is False + + for i in range(len(output_file_list)): + assert np.allclose( + load_gdal(output_file_list[i], rows=rows, cols=cols), data[i] + ) diff --git a/tests/test_timeseries.py b/tests/test_timeseries.py new file mode 100644 index 00000000..c631b1e8 --- /dev/null +++ b/tests/test_timeseries.py @@ -0,0 +1,224 @@ +import itertools +from datetime import datetime, timedelta + +import numpy as np +import numpy.testing as npt +import pytest +from numpy.linalg import lstsq as lstsq_numpy + +from dolphin import io, timeseries +from dolphin.utils import format_dates + +NUM_DATES = 10 +DT = 12 +START_DATE = datetime(2020, 1, 1) +SHAPE = 50, 50 +VELO_RAD_PER_DAY = 0.2 # rad / day + + +def make_sar_dates(): + sar_date_list = [] + for idx in range(NUM_DATES): + sar_date_list.append(START_DATE + timedelta(days=idx * DT)) + return sar_date_list + + +def make_sar_phases(sar_dates): + out = np.empty((NUM_DATES, *SHAPE), dtype="float32") + # Make a linear ramp which increases each time interval + ramp0 = np.ones((SHAPE[0], 1)) * np.arange(SHAPE[1]).reshape(1, -1) + ramp0 /= SHAPE[1] + for idx in range(len(sar_dates)): + cur_slope = VELO_RAD_PER_DAY * DT * idx + out[idx] = cur_slope * ramp0 + return out + + +def make_ifg_date_pairs(sar_dates): + """Form all possible unwrapped interferogram pairs from the date list""" + return [tuple(pair) for pair in itertools.combinations(sar_dates, 2)] + + +def make_ifgs(sar_phases): + """Form all 
possible unwrapped interferogram pairs from the sar images""" + return np.stack( + [(sec - ref) for (ref, sec) in itertools.combinations(sar_phases, 2)] + ) + + +@pytest.fixture(scope="module") +def data(): + sar_dates = make_sar_dates() + sar_phases = make_sar_phases(sar_dates) + ifg_date_pairs = make_ifg_date_pairs(sar_dates) + ifgs = make_ifgs(sar_phases) + return sar_dates, sar_phases, ifg_date_pairs, ifgs + + +def solve_with_removal(A, b): + good_idxs = (~np.isnan(b)).flatten() + b_clean = b[good_idxs] + A_clean = A[good_idxs, :] + return lstsq_numpy(A_clean, b_clean)[0] + + +def solve_by_zeroing(A, b): + weight = (~np.isnan(b)).astype(float) + A0 = A.copy() + A0 = A * weight[:, None] + b2 = np.nan_to_num(b) + return lstsq_numpy(A0, b2)[0] + + +class TestUtils: + def test_datetime_to_float(self): + sar_dates = make_sar_dates() + date_arr = timeseries.datetime_to_float(sar_dates) + expected = np.array( + [0.0, 12.0, 24.0, 36.0, 48.0, 60.0, 72.0, 84.0, 96.0, 108.0] + ) + assert np.allclose(date_arr, expected) + + def test_incidence_matrix(self): + A = timeseries.get_incidence_matrix([(0, 1), (1, 2), (2, 3), (3, 4), (4, 5)]) + assert A.shape == (5, 5) + expected = np.array( + [ + [1, 0, 0, 0, 0], + [-1, 1, 0, 0, 0], + [0, -1, 1, 0, 0], + [0, 0, -1, 1, 0], + [0, 0, 0, -1, 1], + ] + ) + np.testing.assert_array_equal(A, expected) + + +class TestInvert: + @pytest.fixture + def A(self, data): + _, _, ifg_date_pairs, _ = data + return timeseries.get_incidence_matrix(ifg_date_pairs) + + def test_basic(self, data, A): + sar_dates, sar_phases, ifg_date_pairs, ifgs = data + + # Get one pixel + b = ifgs[:, -1, -1] + weights = np.ones_like(b) + x, residual = timeseries.weighted_lstsq_single(A, b, weights) + assert x.shape[0] == len(sar_dates) - 1 + npt.assert_allclose(x, sar_phases[1:, -1, -1], atol=1e-5) + assert np.array(residual).item() < 1e-5 + + def test_stack(self, data, A): + sar_dates, sar_phases, ifg_date_pairs, ifgs = data + + phi_stack, residuals = timeseries.invert_stack(A, ifgs) + assert phi_stack.shape == sar_phases.shape + assert residuals.shape == sar_phases.shape[1:] + npt.assert_allclose(phi_stack, sar_phases, atol=1e-5) + npt.assert_array_less(residuals, 1e-5) + + def test_weighted_stack(self, data, A): + sar_dates, sar_phases, ifg_date_pairs, ifgs = data + + weights = np.ones_like(ifgs) + phi, residuals = timeseries.invert_stack(A, ifgs, weights) + assert phi.shape[0] == len(sar_dates) + npt.assert_allclose(phi, sar_phases, atol=1e-5) + # Here there is no noise, so it's over determined + weights[1, :, :] = 0 + phi2, residuals2 = timeseries.invert_stack(A, ifgs, weights) + npt.assert_allclose(phi2, sar_phases, atol=1e-5) + npt.assert_allclose(residuals2, residuals, atol=1e-5) + + def test_remove_row_vs_weighted(self, data, A): + """Check that removing a row/data point is equivalent to zero-weighting it.""" + sar_dates, sar_phases, ifg_date_pairs, ifgs = data + dphi = ifgs[:, -1, -1] + # add noise to the ifgs, or we can't tell a difference + dphi_noisy = dphi + np.random.normal(0, 0.3, dphi.shape) + + # Once with a zeroed weight + weights = np.ones_like(dphi_noisy) + weights[1] = 0 + phi_0, residual_0 = timeseries.weighted_lstsq_single(A, dphi_noisy, weights) + + A2 = A.copy() + # delete a row + A2 = np.delete(A2, 1, axis=0) + # delete the data point + dphi2 = np.delete(dphi_noisy, 1) + weights2 = np.delete(weights, 1) + phi2, residual2 = timeseries.weighted_lstsq_single(A2, dphi2, weights2) + npt.assert_allclose(phi_0, phi2, atol=1e-5) + + npt.assert_allclose(residual_0, 
residual2, atol=1e-5) + + @pytest.fixture + def unw_files(self, tmp_path, data, raster_100_by_200): + """Write the data to disk and return the file names.""" + sar_dates, sar_phases, ifg_date_pairs, ifgs = data + + out = [] + # use the raster as a template + for pair, ifg in zip(ifg_date_pairs, ifgs): + fname = tmp_path / (format_dates(*pair) + ".tif") + io.write_arr(arr=ifg, output_name=fname, like_filename=raster_100_by_200) + out.append(fname) + + return out + + def test_invert_unw_network(self, data, unw_files, tmp_path): + """""" + + output_dir = tmp_path / "output" + output_dir.mkdir() + ref_point = (0, 0) + out_files = timeseries.invert_unw_network( + unw_file_list=unw_files, + reference=ref_point, + output_dir=output_dir, + # ifg_date_pairs: Sequence[Sequence[DateOrDatetime]] | None = None, + # block_shape: tuple[int, int] = (512, 512), + # cor_file_list: Sequence[PathOrStr] | None = None, + # cor_threshold: float = 0.2, + num_threads=1, + ) + # Check results + solved_stack = io.RasterStackReader.from_file_list(out_files)[:, :, :] + sar_phases = data[1] + npt.assert_allclose(solved_stack, sar_phases, atol=1e-5) + + +class TestVelocity: + @pytest.fixture + def x_arr(self, data): + sar_dates, sar_phases, ifg_date_pairs, ifgs = data + return timeseries.datetime_to_float(sar_dates) + + @pytest.fixture + def expected_velo(self, data, x_arr): + sar_dates, sar_phases, ifg_date_pairs, ifgs = data + out = np.zeros(SHAPE) + for i in range(SHAPE[0]): + for j in range(SHAPE[1]): + out[i, j] = np.polyfit(x_arr, sar_phases[:, i, j], 1)[0] + return out + + def test_stack(self, data, x_arr, expected_velo): + sar_dates, sar_phases, ifg_date_pairs, ifgs = data + + weights = np.ones_like(sar_phases) + velocities = timeseries.estimate_velocity(x_arr, sar_phases, weights) + assert velocities.shape == (SHAPE[0], SHAPE[1]) + npt.assert_allclose(velocities, expected_velo, atol=1e-5) + + +if __name__ == "__main__": + # import the fixtures + sar_dates = make_sar_dates() + sar_phases = make_sar_phases(sar_dates) + ifg_date_pairs = make_ifg_date_pairs(sar_dates) + ifgs = make_ifgs(sar_phases)
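Putting the new pieces together (a hedged end-to-end sketch, not part of the patch; paths, reference pixel, and thread counts are placeholders): the `dolphin.timeseries` module added here first inverts the unwrapped-interferogram network into one phase raster per SAR date, then fits an (optionally correlation-weighted) linear velocity per pixel:

```python
from pathlib import Path

from dolphin import timeseries

# Hypothetical inputs: unwrapped interferograms and matching correlation rasters
unw_files = sorted(Path("unwrapped").glob("*.unw.tif"))
cor_files = sorted(Path("unwrapped").glob("*.cor.tif"))
out_dir = Path("timeseries")
out_dir.mkdir(exist_ok=True)

# 1. Invert the network: one output raster per SAR date, referenced to (row, col)
ts_paths = timeseries.invert_unw_network(
    unw_file_list=unw_files,
    reference=timeseries.ReferencePoint(row=100, col=200),
    output_dir=out_dir,
    cor_file_list=cor_files,  # optional correlation weighting
    cor_threshold=0.2,
    num_threads=4,
)

# 2. Fit a per-pixel linear velocity (units of the unwrapped phase per day)
timeseries.create_velocity(
    unw_file_list=ts_paths,
    output_file=out_dir / "velocity.tif",
    num_threads=4,
)
```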