diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml new file mode 100644 index 000000000..b539abde3 --- /dev/null +++ b/.github/workflows/pylint.yml @@ -0,0 +1,26 @@ +name: Pylint Loads + +on: [push, pull_request] + +jobs: + formatting-and-linting: + runs-on: ubuntu-latest + + steps: + - name: Check out code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: '3.11' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip wheel + pip install pylint + pip install . + + - name: Run Pylint on mhkit/loads/ + run: | + pylint mhkit/loads/ diff --git a/mhkit/loads/__init__.py b/mhkit/loads/__init__.py index d6c0551cc..4c21c7391 100644 --- a/mhkit/loads/__init__.py +++ b/mhkit/loads/__init__.py @@ -1,3 +1,12 @@ +""" +The `loads` package of the MHKiT (Marine and Hydrokinetic Toolkit) library +provides tools and functionalities for analyzing and visualizing loads data +from marine and hydrokinetic (MHK) devices. This package is designed to +assist engineers, researchers, and analysts in understanding the forces and +stresses applied to MHK devices under various operational and environmental +conditions. +""" + from mhkit.loads import general from mhkit.loads import graphics from mhkit.loads import extreme diff --git a/mhkit/loads/extreme.py b/mhkit/loads/extreme.py deleted file mode 100644 index 8fb549625..000000000 --- a/mhkit/loads/extreme.py +++ /dev/null @@ -1,1010 +0,0 @@ -import numpy as np -import pandas as pd -import xarray as xr -from scipy import stats, optimize, signal -from mhkit.wave.resource import frequency_moment -from mhkit.utils import upcrossing, custom - - -def _peaks_over_threshold(peaks, threshold, sampling_rate): - threshold_unit = np.percentile(peaks, 100 * threshold, method="hazen") - idx_peaks = np.arange(len(peaks)) - idx_storm_peaks, storm_peaks = global_peaks(idx_peaks, peaks - threshold_unit) - idx_storm_peaks = idx_storm_peaks.astype(int) - - # Two storms that are close enough (within specified window) are - # considered the same storm, to ensure independence. - independent_storm_peaks = [ - storm_peaks[0], - ] - idx_independent_storm_peaks = [ - idx_storm_peaks[0], - ] - # check first 14 days to determine window size - nlags = int(14 * 24 / sampling_rate) - x = peaks - np.mean(peaks) - acf = signal.correlate(x, x, mode="full") - lag = signal.correlation_lags(len(x), len(x), mode="full") - idx_zero = np.argmax(lag == 0) - positive_lag = lag[(idx_zero) : (idx_zero + nlags + 1)] - acf_positive = acf[(idx_zero) : (idx_zero + nlags + 1)] / acf[idx_zero] - - window_size = sampling_rate * positive_lag[acf_positive < 0.5][0] - # window size in "observations" instead of "hours" between peaks. - window = window_size / sampling_rate - # keep only independent storm peaks - for idx in idx_storm_peaks[1:]: - if (idx - idx_independent_storm_peaks[-1]) > window: - idx_independent_storm_peaks.append(idx) - independent_storm_peaks.append(peaks[idx] - threshold_unit) - elif peaks[idx] > independent_storm_peaks[-1]: - idx_independent_storm_peaks[-1] = idx - independent_storm_peaks[-1] = peaks[idx] - threshold_unit - - return independent_storm_peaks - - -def global_peaks(t, data): - """ - Find the global peaks of a zero-centered response time-series. - - The global peaks are the maxima between consecutive zero - up-crossings. - - Parameters - ---------- - t: np.array - Time array. - data: np.array - Response time-series. 
- - Returns - ------- - t_peaks: np.array - Time array for peaks - peaks: np.array - Peak values of the response time-series - """ - if not isinstance(t, np.ndarray): - raise TypeError(f"t must be of type np.ndarray. Got: {type(t)}") - if not isinstance(data, np.ndarray): - raise TypeError(f"data must be of type np.ndarray. Got: {type(data)}") - - # Find zero up-crossings - inds = upcrossing(t, data) - - # We also include the final point in the dataset - inds = np.append(inds, len(data) - 1) - - # As we want to return both the time and peak - # values, look for the index at the peak. - # The call to argmax gives us the index within the - # upcrossing period. Therefore to get the index in the - # original array we need to add on the index that - # starts the zero crossing period, ind1. - func = lambda ind1, ind2: np.argmax(data[ind1:ind2]) + ind1 - - peak_inds = np.array(custom(t, data, func, inds), dtype=int) - - return t[peak_inds], data[peak_inds] - - -def number_of_short_term_peaks(n, t, t_st): - """ - Estimate the number of peaks in a specified period. - - Parameters - ---------- - n : int - Number of peaks in analyzed timeseries. - t : float - Length of time of analyzed timeseries. - t_st: float - Short-term period for which to estimate the number of peaks. - - Returns - ------- - n_st : float - Number of peaks in short term period. - """ - if not isinstance(n, int): - raise TypeError(f"n must be of type int. Got: {type(n)}") - if not isinstance(t, float): - raise TypeError(f"t must be of type float. Got: {type(t)}") - if not isinstance(t_st, float): - raise TypeError(f"t_st must be of type float. Got: {type(t_st)}") - - return n * t_st / t - - -def peaks_distribution_weibull(x): - """ - Estimate the peaks distribution by fitting a Weibull - distribution to the peaks of the response. - - The fitted parameters can be accessed through the `params` field of - the returned distribution. - - Parameters - ---------- - x : np.array - Global peaks. - - Returns - ------- - peaks: scipy.stats.rv_frozen - Probability distribution of the peaks. - """ - if not isinstance(x, np.ndarray): - raise TypeError(f"x must be of type np.ndarray. Got: {type(x)}") - - # peaks distribution - peaks_params = stats.exponweib.fit(x, f0=1, floc=0) - param_names = ["a", "c", "loc", "scale"] - peaks_params = {k: v for k, v in zip(param_names, peaks_params)} - peaks = stats.exponweib(**peaks_params) - # save the parameter info - peaks.params = peaks_params - return peaks - - -def peaks_distribution_weibull_tail_fit(x): - """ - Estimate the peaks distribution using the Weibull tail fit - method. - - The fitted parameters can be accessed through the `params` field of - the returned distribution. - - Parameters - ---------- - x : np.array - Global peaks. - - Returns - ------- - peaks: scipy.stats.rv_frozen - Probability distribution of the peaks. - """ - if not isinstance(x, np.ndarray): - raise TypeError(f"x must be of type np.ndarray. 
Got: {type(x)}") - - # Initial guess for Weibull parameters - p0 = stats.exponweib.fit(x, f0=1, floc=0) - p0 = np.array([p0[1], p0[3]]) - # Approximate CDF - x = np.sort(x) - npeaks = len(x) - F = np.zeros(npeaks) - for i in range(npeaks): - F[i] = i / (npeaks + 1.0) - # Divide into seven sets & fit Weibull - subset_shape_params = np.zeros(7) - subset_scale_params = np.zeros(7) - setLim = np.arange(0.60, 0.90, 0.05) - func = lambda x, c, s: stats.exponweib(a=1, c=c, loc=0, scale=s).cdf(x) - for set in range(7): - xset = x[(F > setLim[set])] - Fset = F[(F > setLim[set])] - popt, _ = optimize.curve_fit(func, xset, Fset, p0=p0) - subset_shape_params[set] = popt[0] - subset_scale_params[set] = popt[1] - # peaks distribution - peaks_params = [1, np.mean(subset_shape_params), 0, np.mean(subset_scale_params)] - param_names = ["a", "c", "loc", "scale"] - peaks_params = {k: v for k, v in zip(param_names, peaks_params)} - peaks = stats.exponweib(**peaks_params) - # save the parameter info - peaks.params = peaks_params - peaks.subset_shape_params = subset_shape_params - peaks.subset_scale_params = subset_scale_params - return peaks - - -def automatic_hs_threshold( - peaks, - sampling_rate, - initial_threshold_range=(0.990, 0.995, 0.001), - max_refinement=5, -): - """ - Find the best significant wave height threshold for the - peaks-over-threshold method. - - This method was developed by: - - > Neary, V. S., S. Ahn, B. E. Seng, M. N. Allahdadi, T. Wang, Z. Yang and R. He (2020). - > "Characterization of Extreme Wave Conditions for Wave Energy Converter Design and Project Risk Assessment.” - > J. Mar. Sci. Eng. 2020, 8(4), 289; https://doi.org/10.3390/jmse8040289. - - please cite this paper if using this method. - - After all thresholds in the initial range are evaluated, the search - range is refined around the optimal point until either (i) there - is minimal change from the previous refinement results, (ii) the - number of data points become smaller than about 1 per year, or (iii) - the maximum number of iterations is reached. - - Parameters - ---------- - peaks: np.array - Peak values of the response time-series - sampling_rate: float - Sampling rate in hours. - initial_threshold_range: tuple - Initial range of thresholds to search. Described as - (min, max, step). - max_refinement: int - Maximum number of times to refine the search range. - - Returns - ------- - best_threshold: float - Threshold that results in the best correlation. - """ - if not isinstance(sampling_rate, (float, int)): - raise TypeError( - f"sampling_rate must be of type float or int. Got: {type(sampling_rate)}" - ) - if not isinstance(peaks, np.ndarray): - raise TypeError(f"peaks must be of type np.ndarray. Got: {type(peaks)}") - if not len(initial_threshold_range) == 3: - raise ValueError( - f"initial_threshold_range must be length 3. Got: {len(initial_threshold_range)}" - ) - if not isinstance(max_refinement, int): - raise TypeError( - f"max_refinement must be of type int. 
Got: {type(max_refinement)}" - ) - - range_min, range_max, range_step = initial_threshold_range - best_threshold = -1 - years = len(peaks) / (365.25 * 24 / sampling_rate) - - for i in range(max_refinement): - thresholds = np.arange(range_min, range_max, range_step) - correlations = [] - - for threshold in thresholds: - distribution = stats.genpareto - over_threshold = _peaks_over_threshold(peaks, threshold, sampling_rate) - rate_per_year = len(over_threshold) / years - if rate_per_year < 2: - break - distributions_parameters = distribution.fit(over_threshold, floc=0.0) - _, (_, _, correlation) = stats.probplot( - peaks, distributions_parameters, distribution, fit=True - ) - correlations.append(correlation) - - max_i = np.argmax(correlations) - minimal_change = np.abs(best_threshold - thresholds[max_i]) < 0.0005 - best_threshold = thresholds[max_i] - if minimal_change and i < max_refinement - 1: - break - range_step /= 10 - if max_i == len(thresholds) - 1: - range_min = thresholds[max_i - 1] - range_max = thresholds[max_i] + 5 * range_step - elif max_i == 0: - range_min = thresholds[max_i] - 9 * range_step - range_max = thresholds[max_i + 1] - else: - range_min = thresholds[max_i - 1] - range_max = thresholds[max_i + 1] - - best_threshold_unit = np.percentile(peaks, 100 * best_threshold, method="hazen") - return best_threshold, best_threshold_unit - - -def peaks_distribution_peaks_over_threshold(x, threshold=None): - """ - Estimate the peaks distribution using the peaks over threshold - method. - - This fits a generalized Pareto distribution to all the peaks above - the specified threshold. The distribution is only defined for values - above the threshold and therefore cannot be used to obtain integral - metrics such as the expected value. A typical choice of threshold is - 1.4 standard deviations above the mean. The peaks over threshold - distribution can be accessed through the `pot` field of the returned - peaks distribution. - - Parameters - ---------- - x : np.array - Global peaks. - threshold : float - Threshold value. Only peaks above this value will be used. - Default value calculated as: `np.mean(x) + 1.4 * np.std(x)` - - Returns - ------- - peaks: scipy.stats.rv_frozen - Probability distribution of the peaks. - """ - if not isinstance(x, np.ndarray): - raise TypeError(f"x must be of type np.ndarray. Got: {type(x)}") - if threshold is None: - threshold = np.mean(x) + 1.4 * np.std(x) - if not isinstance(threshold, float): - raise TypeError( - f"If specified, threshold must be of type float. 
Got: {type(threshold)}" - ) - - # peaks over threshold - x = np.sort(x) - pot = x[(x > threshold)] - threshold - npeaks = len(x) - npot = len(pot) - # Fit a generalized Pareto - pot_params = stats.genpareto.fit(pot, floc=0.0) - param_names = ["c", "loc", "scale"] - pot_params = {k: v for k, v in zip(param_names, pot_params)} - pot = stats.genpareto(**pot_params) - # save the parameter info - pot.params = pot_params - - # peaks - class _Peaks(stats.rv_continuous): - def __init__(self, *args, **kwargs): - self.pot = kwargs.pop("pot_distribution") - self.threshold = kwargs.pop("threshold") - super().__init__(*args, **kwargs) - - def _cdf(self, x): - x = np.atleast_1d(np.array(x)) - out = np.zeros(x.shape) - out[x < self.threshold] = np.NaN - xt = x[x >= self.threshold] - if xt.size != 0: - pot_ccdf = 1.0 - self.pot.cdf(xt - self.threshold) - prop_pot = npot / npeaks - out[x >= self.threshold] = 1.0 - (prop_pot * pot_ccdf) - return out - - peaks = _Peaks(name="peaks", pot_distribution=pot, threshold=threshold) - # save the peaks over threshold distribution - peaks.pot = pot - return peaks - - -def ste_peaks(peaks_distribution, npeaks): - """ - Estimate the short-term extreme distribution from the peaks - distribution. - - Parameters - ---------- - peaks_distribution: scipy.stats.rv_frozen - Probability distribution of the peaks. - npeaks : float - Number of peaks in short term period. - - Returns - ------- - ste: scipy.stats.rv_frozen - Short-term extreme distribution. - """ - if not callable(peaks_distribution.cdf): - raise TypeError("peaks_distribution must be a scipy.stat distribution.") - if not isinstance(npeaks, float): - raise TypeError(f"npeaks must be of type float. Got: {type(npeaks)}") - - class _ShortTermExtreme(stats.rv_continuous): - def __init__(self, *args, **kwargs): - self.peaks = kwargs.pop("peaks_distribution") - self.npeaks = kwargs.pop("npeaks") - super().__init__(*args, **kwargs) - - def _cdf(self, x): - peaks_cdf = np.array(self.peaks.cdf(x)) - peaks_cdf[np.isnan(peaks_cdf)] = 0.0 - if len(peaks_cdf) == 1: - peaks_cdf = peaks_cdf[0] - return peaks_cdf**self.npeaks - - ste = _ShortTermExtreme( - name="short_term_extreme", peaks_distribution=peaks_distribution, npeaks=npeaks - ) - return ste - - -def block_maxima(t, x, t_st): - """ - Find the block maxima of a time-series. - - The timeseries (t,x) is divided into blocks of length t_st, and the - maxima of each bloock is returned. - - Parameters - ---------- - t : np.array - Time array. - x : np.array - global peaks timeseries. - t_st : float - Short-term period. - - Returns - ------- - block_maxima: np.array - Block maxima (i.e. largest peak in each block). - """ - if not isinstance(t, np.ndarray): - raise TypeError(f"t must be of type np.ndarray. Got: {type(t)}") - if not isinstance(x, np.ndarray): - raise TypeError(f"x must be of type np.ndarray. Got: {type(x)}") - if not isinstance(t_st, float): - raise TypeError(f"t_st must be of type float. Got: {type(t_st)}") - - nblock = int(t[-1] / t_st) - block_maxima = np.zeros(int(nblock)) - for iblock in range(nblock): - ix = x[(t >= iblock * t_st) & (t < (iblock + 1) * t_st)] - block_maxima[iblock] = np.max(ix) - return block_maxima - - -def ste_block_maxima_gev(block_maxima): - """ - Approximate the short-term extreme distribution using the block - maxima method and the Generalized Extreme Value distribution. - - Parameters - ---------- - block_maxima: np.array - Block maxima (i.e. largest peak in each block). 
- - Returns - ------- - ste: scipy.stats.rv_frozen - Short-term extreme distribution. - """ - if not isinstance(block_maxima, np.ndarray): - raise TypeError( - f"block_maxima must be of type np.ndarray. Got: {type(block_maxima)}" - ) - - ste_params = stats.genextreme.fit(block_maxima) - param_names = ["c", "loc", "scale"] - ste_params = {k: v for k, v in zip(param_names, ste_params)} - ste = stats.genextreme(**ste_params) - ste.params = ste_params - return ste - - -def ste_block_maxima_gumbel(block_maxima): - """ - Approximate the short-term extreme distribution using the block - maxima method and the Gumbel (right) distribution. - - Parameters - ---------- - block_maxima: np.array - Block maxima (i.e. largest peak in each block). - - Returns - ------- - ste: scipy.stats.rv_frozen - Short-term extreme distribution. - """ - if not isinstance(block_maxima, np.ndarray): - raise TypeError( - f"block_maxima must be of type np.ndarray. Got: {type(block_maxima)}" - ) - - ste_params = stats.gumbel_r.fit(block_maxima) - param_names = ["loc", "scale"] - ste_params = {k: v for k, v in zip(param_names, ste_params)} - ste = stats.gumbel_r(**ste_params) - ste.params = ste_params - return ste - - -def ste(t, data, t_st, method): - """ - Alias for `short_term_extreme`. - """ - ste = short_term_extreme(t, data, t_st, method) - return ste - - -def short_term_extreme(t, data, t_st, method): - """ - Approximate the short-term extreme distribution from a - timeseries of the response using chosen method. - - The availabe methods are: 'peaks_weibull', 'peaks_weibull_tail_fit', - 'peaks_over_threshold', 'block_maxima_gev', and 'block_maxima_gumbel'. - For the block maxima methods the timeseries needs to be many times - longer than the short-term period. For the peak-fitting methods the - timeseries can be of arbitrary length. - - Parameters - ---------- - t: np.array - Time array. - data: np.array - Response timeseries. - t_st: float - Short-term period. - method : string - Method for estimating the short-term extreme distribution. - - Returns - ------- - ste: scipy.stats.rv_frozen - Short-term extreme distribution. - """ - if not isinstance(t, np.ndarray): - raise TypeError(f"t must be of type np.ndarray. Got: {type(t)}") - if not isinstance(data, np.ndarray): - raise TypeError(f"data must be of type np.ndarray. Got: {type(data)}") - if not isinstance(t_st, float): - raise TypeError(f"t_st must be of type float. Got: {type(t_st)}") - if not isinstance(method, str): - raise TypeError(f"method must be of type string. Got: {type(method)}") - - peaks_methods = { - "peaks_weibull": peaks_distribution_weibull, - "peaks_weibull_tail_fit": peaks_distribution_weibull_tail_fit, - "peaks_over_threshold": peaks_distribution_peaks_over_threshold, - } - blockmaxima_methods = { - "block_maxima_gev": ste_block_maxima_gev, - "block_maxima_gumbel": ste_block_maxima_gumbel, - } - - if method in peaks_methods.keys(): - fit_peaks = peaks_methods[method] - _, peaks = global_peaks(t, data) - npeaks = len(peaks) - time = t[-1] - t[0] - nst = number_of_short_term_peaks(npeaks, time, t_st) - peaks_dist = fit_peaks(peaks) - ste = ste_peaks(peaks_dist, nst) - elif method in blockmaxima_methods.keys(): - fit_maxima = blockmaxima_methods[method] - maxima = block_maxima(t, data, t_st) - ste = fit_maxima(maxima) - else: - print("Passed `method` not found.") - return ste - - -def full_seastate_long_term_extreme(ste, weights): - """ - Return the long-term extreme distribution of a response of - interest using the full sea state approach. 
- - Parameters - ---------- - ste: list[scipy.stats.rv_frozen] - Short-term extreme distribution of the quantity of interest for - each sample sea state. - weights: list, np.ndarray - The weights from the full sea state sampling - - Returns - ------- - ste: scipy.stats.rv_frozen - Short-term extreme distribution. - """ - if not isinstance(ste, list): - raise TypeError( - f"ste must be of type list[scipy.stats.rv_frozen]. Got: {type(ste)}" - ) - if not isinstance(weights, (list, np.ndarray)): - raise TypeError( - f"weights must be of type list or np.ndarray. Got: {type(weights)}" - ) - - class _LongTermExtreme(stats.rv_continuous): - def __init__(self, *args, **kwargs): - weights = kwargs.pop("weights") - # make sure weights add to 1.0 - self.weights = weights / np.sum(weights) - self.ste = kwargs.pop("ste") - self.n = len(self.weights) - super().__init__(*args, **kwargs) - - def _cdf(self, x): - f = 0.0 - for w_i, ste_i in zip(self.weights, self.ste): - f += w_i * ste_i.cdf(x) - return f - - return _LongTermExtreme(name="long_term_extreme", weights=weights, ste=ste) - - -def mler_coefficients( - rao, wave_spectrum, response_desired, frequency_dimension="", to_pandas=True -): - """ - Calculate MLER (most likely extreme response) coefficients from a - sea state spectrum and a response RAO. - - Parameters - ---------- - rao: numpy ndarray - Response amplitude operator. - wave_spectrum: pandas Series, pandas DataFrame, xarray DataArray, or xarray Dataset - Wave spectral density [m^2/Hz] indexed by frequency [Hz]. - DataFrame and Dataset inputs should only have one data variable - response_desired: int or float - Desired response, units should correspond to a motion RAO or - units of force for a force RAO. - frequency_dimension: string (optional) - Name of the xarray dimension corresponding to frequency. If not supplied, - defaults to the first dimension. Does not affect pandas input. - to_pandas: bool (optional) - Flag to output pandas instead of xarray. Default = True. - - Returns - ------- - mler: pandas DataFrame or xarray Dataset - DataFrame containing conditioned wave spectral amplitude - coefficient [m^2-s], and Phase [rad] indexed by freq [Hz]. - """ - try: - rao = np.array(rao) - except: - pass - - if not isinstance(rao, np.ndarray): - raise TypeError(f"rao must be of type np.ndarray. Got: {type(rao)}") - if not isinstance( - wave_spectrum, (pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset) - ): - raise TypeError( - f"wave_spectrum must be of type pd.Series, pd.DataFrame, xr.DataArray, or xr.Dataset. Got: {type(wave_spectrum)}" - ) - if not isinstance(response_desired, (int, float)): - raise TypeError( - f"response_desired must be of type int or float. Got: {type(response_desired)}" - ) - if not isinstance(to_pandas, bool): - raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") - - # Convert input to xarray DataArray - if isinstance(wave_spectrum, (pd.Series, pd.DataFrame)): - wave_spectrum = wave_spectrum.squeeze().to_xarray() - - if isinstance(wave_spectrum, xr.Dataset): - if len(wave_spectrum.data_vars) > 1: - raise ValueError( - f"wave_spectrum can only contain one variable. Got {list(wave_spectrum.data_vars)}." 
- ) - wave_spectrum = wave_spectrum.to_array() - - if frequency_dimension == "": - frequency_dimension = list(wave_spectrum.coords)[0] - - # convert from Hz to rad/s - freq_hz = wave_spectrum.coords[frequency_dimension].values - freq = freq_hz * (2 * np.pi) - wave_spectrum = wave_spectrum.to_numpy() / (2 * np.pi) - - # get frequency step - dw = 2.0 * np.pi / (len(freq) - 1) - - # Note: waves.A is "S" in Quon2016; 'waves' naming convention - # matches WEC-Sim conventions (EWQ) - # Response spectrum [(response units)^2-s/rad] -- Quon2016 Eqn. 3 - spectrum_r = np.abs(rao) ** 2 * (2 * wave_spectrum) - - # calculate spectral moments and other important spectral values. - m0 = (frequency_moment(pd.Series(spectrum_r, index=freq), 0)).iloc[0, 0] - m1 = (frequency_moment(pd.Series(spectrum_r, index=freq), 1)).iloc[0, 0] - m2 = (frequency_moment(pd.Series(spectrum_r, index=freq), 2)).iloc[0, 0] - wBar = m1 / m0 - - # calculate coefficient A_{R,n} [(response units)^-1] -- Quon2016 Eqn. 8 - # Drummen version. Dietz has negative of this. - _coeff_a_rn = ( - np.abs(rao) - * np.sqrt(2 * wave_spectrum * dw) - * ((m2 - freq * m1) + wBar * (freq * m0 - m1)) - / (m0 * m2 - m1**2) - ) - - # save the new spectral info to pass out - # Phase delay should be a positive number in this convention (AP) - _phase = -np.unwrap(np.angle(rao)) - - # for negative values of Amp, shift phase by pi and flip sign - # for negative amplitudes, add a pi phase shift, then flip sign on - # negative Amplitudes - _phase[_coeff_a_rn < 0] -= np.pi - _coeff_a_rn[_coeff_a_rn < 0] *= -1 - - # calculate the conditioned spectrum [m^2-s/rad] - _s = wave_spectrum * _coeff_a_rn**2 * response_desired**2 - _a = 2 * wave_spectrum * _coeff_a_rn**2 * response_desired**2 - - # if the response amplitude we ask for is negative, we will add - # a pi phase shift to the phase information. This is because - # the sign of self.desiredRespAmp is lost in the squaring above. - # Ordinarily this would be put into the final equation, but we - # are shaping the wave information so that it is buried in the - # new spectral information, S. (AP) - if response_desired < 0: - _phase += np.pi - - mler = xr.Dataset( - data_vars={ - "WaveSpectrum": (["frequency"], _s), - "Phase": (["frequency"], _phase), - }, - coords={"frequency": freq_hz}, - ) - mler.fillna(0) - - if to_pandas: - mler = mler.to_pandas() - - return mler - - -def mler_simulation(parameters=None): - """ - Define the simulation parameters that are used in various MLER - functionalities. - - See `extreme_response_contour_example.ipynb` example for how this is - useful. If no input is given, then default values are returned. - - Parameters - ---------- - parameters: dict (optional) - Simulation parameters. - Keys: - ----- - 'startTime': starting time [s] - 'endTime': ending time [s] - 'dT': time-step size [s] - 'T0': time of maximum event [s] - 'startx': start of simulation space [m] - 'endX': end of simulation space [m] - 'dX': horizontal spacing [m] - 'X': position of maximum event [m] - - Returns - ------- - sim: dict - Simulation parameters including spatial and time calculated - arrays. - """ - if not isinstance(parameters, (type(None), dict)): - raise TypeError( - f"If specified, parameters must be of type dict. 
Got: {type(parameters)}" - ) - - sim = {} - - if parameters == None: - sim["startTime"] = -150.0 # [s] Starting time - sim["endTime"] = 150.0 # [s] Ending time - sim["dT"] = 1.0 # [s] Time-step size - sim["T0"] = 0.0 # [s] Time of maximum event - - sim["startX"] = -300.0 # [m] Start of simulation space - sim["endX"] = 300.0 # [m] End of simulation space - sim["dX"] = 1.0 # [m] Horiontal spacing - sim["X0"] = 0.0 # [m] Position of maximum event - else: - sim = parameters - - # maximum timestep index - sim["maxIT"] = int(np.ceil((sim["endTime"] - sim["startTime"]) / sim["dT"] + 1)) - sim["T"] = np.linspace(sim["startTime"], sim["endTime"], sim["maxIT"]) - - sim["maxIX"] = int(np.ceil((sim["endX"] - sim["startX"]) / sim["dX"] + 1)) - sim["X"] = np.linspace(sim["startX"], sim["endX"], sim["maxIX"]) - - return sim - - -def mler_wave_amp_normalize( - wave_amp, mler, sim, k, frequency_dimension="", to_pandas=True -): - """ - Function that renormalizes the incoming amplitude of the MLER wave - to the desired peak height (peak to MSL). - - Parameters - ---------- - wave_amp: float - Desired wave amplitude (peak to MSL). - mler: pandas DataFrame or xarray Dataset - MLER coefficients generated by 'mler_coefficients' function. - sim: dict - Simulation parameters formatted by output from - 'mler_simulation'. - k: numpy ndarray - Wave number - frequency_dimension: string (optional) - Name of the xarray dimension corresponding to frequency. If not supplied, - defaults to the first dimension. Does not affect pandas input. - to_pandas: bool (optional) - Flag to output pandas instead of xarray. Default = True. - - Returns - ------- - mler_norm : pandas DataFrame or xarray Dataset - MLER coefficients - """ - try: - k = np.array(k) - except: - pass - if not isinstance(mler, (pd.DataFrame, xr.Dataset)): - raise TypeError( - f"mler must be of type pd.DataFrame or xr.Dataset. Got: {type(mler)}" - ) - if not isinstance(wave_amp, (int, float)): - raise TypeError(f"wave_amp must be of type int or float. Got: {type(wave_amp)}") - if not isinstance(sim, dict): - raise TypeError(f"sim must be of type dict. Got: {type(sim)}") - if not isinstance(k, np.ndarray): - raise TypeError(f"k must be of type ndarray. Got: {type(k)}") - if not isinstance(to_pandas, bool): - raise TypeError(f"to_pandas must be of type bool. 
Got: {type(to_pandas)}") - - # If input is pandas, convert to xarray - if isinstance(mler, pd.DataFrame): - mler = mler.to_xarray() - - if frequency_dimension == "": - frequency_dimension = list(mler.coords)[0] - freq = mler.coords[frequency_dimension].values * 2 * np.pi - dw = (max(freq) - min(freq)) / (len(freq) - 1) # get delta - - wave_amp_time = np.zeros((sim["maxIX"], sim["maxIT"])) - for ix, x in enumerate(sim["X"]): - for it, t in enumerate(sim["T"]): - # conditioned wave - wave_amp_time[ix, it] = np.sum( - np.sqrt(2 * mler["WaveSpectrum"] * dw) - * np.cos(freq * (t - sim["T0"]) - k * (x - sim["X0"]) + mler["Phase"]) - ) - - tmp_max_amp = np.max(np.abs(wave_amp_time)) - - # renormalization of wave amplitudes - rescale_fact = np.abs(wave_amp) / np.abs(tmp_max_amp) - - # rescale the wave spectral amplitude coefficients - mler_norm = mler["WaveSpectrum"] * rescale_fact**2 - mler_norm = mler_norm.to_dataset() - mler_norm = mler_norm.assign({"Phase": (frequency_dimension, mler["Phase"].data)}) - - if to_pandas: - mler_norm = mler_norm.to_pandas() - - return mler_norm - - -def mler_export_time_series(rao, mler, sim, k, frequency_dimension="", to_pandas=True): - """ - Generate the wave amplitude time series at X0 from the calculated - MLER coefficients - - Parameters - ---------- - rao: numpy ndarray - Response amplitude operator. - mler: pandas DataFrame or xarray Dataset - MLER coefficients dataframe generated from an MLER function. - sim: dict - Simulation parameters formatted by output from - 'mler_simulation'. - k: numpy ndarray - Wave number. - frequency_dimension: string (optional) - Name of the xarray dimension corresponding to frequency. If not supplied, - defaults to the first dimension. Does not affect pandas input. - to_pandas: bool (optional) - Flag to output pandas instead of xarray. Default = True. - - Returns - ------- - mler_ts: pandas DataFrame or xarray Dataset - Time series of wave height [m] and linear response [*] indexed - by time [s]. - - """ - try: - rao = np.array(rao) - except: - pass - try: - k = np.array(k) - except: - pass - if not isinstance(rao, np.ndarray): - raise TypeError(f"rao must be of type ndarray. Got: {type(rao)}") - if not isinstance(mler, (pd.DataFrame, xr.Dataset)): - raise TypeError( - f"mler must be of type pd.DataFrame or xr.Dataset. Got: {type(mler)}" - ) - if not isinstance(sim, dict): - raise TypeError(f"sim must be of type dict. Got: {type(sim)}") - if not isinstance(k, np.ndarray): - raise TypeError(f"k must be of type ndarray. Got: {type(k)}") - if not isinstance(to_pandas, bool): - raise TypeError(f"to_pandas must be of type bool. 
Got: {type(to_pandas)}") - - # If input is pandas, convert to xarray - if isinstance(mler, pd.DataFrame): - mler = mler.to_xarray() - - if frequency_dimension == "": - frequency_dimension = list(mler.coords)[0] - freq = mler.coords[frequency_dimension].values * 2 * np.pi - dw = (max(freq) - min(freq)) / (len(freq) - 1) # get delta - - # calculate the series - wave_amp_time = np.zeros((sim["maxIT"], 2)) - xi = sim["X0"] - for i, ti in enumerate(sim["T"]): - # conditioned wave - wave_amp_time[i, 0] = np.sum( - np.sqrt(2 * mler["WaveSpectrum"] * dw) - * np.cos(freq * (ti - sim["T0"]) + mler["Phase"] - k * (xi - sim["X0"])) - ) - # Response calculation - wave_amp_time[i, 1] = np.sum( - np.sqrt(2 * mler["WaveSpectrum"] * dw) - * np.abs(rao) - * np.cos(freq * (ti - sim["T0"]) - k * (xi - sim["X0"])) - ) - - mler_ts = xr.Dataset( - data_vars={ - "WaveHeight": (["time"], wave_amp_time[:, 0]), - "LinearResponse": (["time"], wave_amp_time[:, 1]), - }, - coords={"time": sim["T"]}, - ) - - if to_pandas: - mler_ts = mler_ts.to_pandas() - - return mler_ts - - -def return_year_value(ppf, return_year, short_term_period_hr): - """ - Calculate the value from a given distribution corresponding to a particular - return year. - - Parameters - ---------- - ppf: callable function of 1 argument - Percentage Point Function (inverse CDF) of short term distribution. - return_year: int, float - Return period in years. - short_term_period_hr: int, float - Short term period the distribution is created from in hours. - - Returns - ------- - value: float - The value corresponding to the return period from the distribution. - """ - if not callable(ppf): - raise TypeError("ppf must be a callable Percentage Point Function") - if not isinstance(return_year, (float, int)): - raise TypeError( - f"return_year must be of type float or int. Got: {type(return_year)}" - ) - if not isinstance(short_term_period_hr, (float, int)): - raise TypeError( - f"short_term_period_hr must be of type float or int. Got: {type(short_term_period_hr)}" - ) - - p = 1 / (return_year * 365.25 * 24 / short_term_period_hr) - - return ppf(1 - p) diff --git a/mhkit/loads/extreme/__init__.py b/mhkit/loads/extreme/__init__.py new file mode 100644 index 000000000..318a2cdc8 --- /dev/null +++ b/mhkit/loads/extreme/__init__.py @@ -0,0 +1,39 @@ +""" +This package provides tools and functions for extreme value analysis +and wave data statistics. + +It includes methods for calculating peaks over threshold, estimating +short-term extreme distributions,and performing wave amplitude +normalization for most likely extreme response analysis. 
+""" + +from mhkit.loads.extreme.extremes import ( + ste_peaks, + block_maxima, + ste_block_maxima_gev, + ste_block_maxima_gumbel, + ste, + short_term_extreme, + full_seastate_long_term_extreme, +) + +from mhkit.loads.extreme.mler import ( + mler_coefficients, + mler_simulation, + mler_wave_amp_normalize, + mler_export_time_series, +) + +from mhkit.loads.extreme.peaks import ( + _peaks_over_threshold, + global_peaks, + number_of_short_term_peaks, + peaks_distribution_weibull, + peaks_distribution_weibull_tail_fit, + automatic_hs_threshold, + peaks_distribution_peaks_over_threshold, +) + +from mhkit.loads.extreme.sample import ( + return_year_value, +) diff --git a/mhkit/loads/extreme/extremes.py b/mhkit/loads/extreme/extremes.py new file mode 100644 index 000000000..d89545c9d --- /dev/null +++ b/mhkit/loads/extreme/extremes.py @@ -0,0 +1,293 @@ +""" +This module provides functionality for estimating the short-term and +long-term extreme distributions of responses in a time series. It +includes methods for analyzing peaks, block maxima, and applying +statistical distributions to model extreme events. The module supports +various methods for short-term extreme estimation, including peaks +fitting with Weibull, tail fitting, peaks over threshold, and block +maxima methods with GEV (Generalized Extreme Value) and Gumbel +distributions. Additionally, it offers functionality to approximate +the long-term extreme distribution by weighting short-term extremes +across different sea states. + +Functions: +- ste_peaks: Estimates the short-term extreme distribution from peaks + distribution using specified statistical methods. +- block_maxima: Finds the block maxima in a time-series data to be used + in block maxima methods. +- ste_block_maxima_gev: Approximates the short-term extreme distribution + using the block maxima method with the GEV distribution. +- ste_block_maxima_gumbel: Approximates the short-term extreme + distribution using the block maxima method with the Gumbel distribution. +- ste: Alias for `short_term_extreme`, facilitating easier access to the + primary functionality of estimating short-term extremes. +- short_term_extreme: Core function to approximate the short-term extreme + distribution from a time series using chosen methods. +- full_seastate_long_term_extreme: Combines short-term extreme + distributions using weights to estimate the long-term extreme distribution. +""" + +from typing import Union + +import numpy as np +from scipy import stats +from scipy.stats import rv_continuous + +import mhkit.loads.extreme.peaks as peaks_distributions + + +def ste_peaks(peaks_distribution: rv_continuous, npeaks: float) -> rv_continuous: + """ + Estimate the short-term extreme distribution from the peaks + distribution. + + Parameters + ---------- + peaks_distribution: scipy.stats.rv_frozen + Probability distribution of the peaks. + npeaks : float + Number of peaks in short term period. + + Returns + ------- + short_term_extreme: scipy.stats.rv_frozen + Short-term extreme distribution. + """ + if not callable(peaks_distribution.cdf): + raise TypeError("peaks_distribution must be a scipy.stat distribution.") + if not isinstance(npeaks, float): + raise TypeError(f"npeaks must be of type float. 
Got: {type(npeaks)}") + + class _ShortTermExtreme(stats.rv_continuous): + def __init__(self, *args, **kwargs): + self.peaks = kwargs.pop("peaks_distribution") + self.npeaks = kwargs.pop("npeaks") + super().__init__(*args, **kwargs) + + def _cdf(self, x, *args, **kwargs): + peaks_cdf = np.array(self.peaks.cdf(x, *args, **kwargs)) + peaks_cdf[np.isnan(peaks_cdf)] = 0.0 + if len(peaks_cdf) == 1: + peaks_cdf = peaks_cdf[0] + return peaks_cdf**self.npeaks + + short_term_extreme_peaks = _ShortTermExtreme( + name="short_term_extreme", peaks_distribution=peaks_distribution, npeaks=npeaks + ) + return short_term_extreme_peaks + + +def block_maxima( + time: np.ndarray, global_peaks_data: np.ndarray, time_st: float +) -> np.ndarray: + """ + Find the block maxima of a time-series. + + The timeseries (time, global_peaks) is divided into blocks of length t_st, and the + maxima of each bloock is returned. + + Parameters + ---------- + time : np.array + Time array. + global_peaks_data : np.array + global peaks timeseries. + time_st : float + Short-term period. + + Returns + ------- + block_max: np.array + Block maxima (i.e. largest peak in each block). + """ + if not isinstance(time, np.ndarray): + raise TypeError(f"time must be of type np.ndarray. Got: {type(time)}") + if not isinstance(global_peaks_data, np.ndarray): + raise TypeError( + f"global_peaks_data must be of type np.ndarray. Got: {type(global_peaks_data)}" + ) + if not isinstance(time_st, float): + raise TypeError(f"time_st must be of type float. Got: {type(time_st)}") + + nblock = int(time[-1] / time_st) + block_max = np.zeros(int(nblock)) + for iblock in range(nblock): + i_x = global_peaks_data[ + (time >= iblock * time_st) & (time < (iblock + 1) * time_st) + ] + block_max[iblock] = np.max(i_x) + return block_max + + +def ste_block_maxima_gev(block_max): + """ + Approximate the short-term extreme distribution using the block + maxima method and the Generalized Extreme Value distribution. + + Parameters + ---------- + block_max: np.array + Block maxima (i.e. largest peak in each block). + + Returns + ------- + short_term_extreme_rv: scipy.stats.rv_frozen + Short-term extreme distribution. + """ + if not isinstance(block_max, np.ndarray): + raise TypeError(f"block_max must be of type np.ndarray. Got: {type(block_max)}") + + ste_params = stats.genextreme.fit(block_max) + param_names = ["c", "loc", "scale"] + ste_params = dict(zip(param_names, ste_params)) + short_term_extreme_rv = stats.genextreme(**ste_params) + short_term_extreme_rv.params = ste_params + return short_term_extreme_rv + + +def ste_block_maxima_gumbel(block_max): + """ + Approximate the short-term extreme distribution using the block + maxima method and the Gumbel (right) distribution. + + Parameters + ---------- + block_max: np.array + Block maxima (i.e. largest peak in each block). + + Returns + ------- + ste: scipy.stats.rv_frozen + Short-term extreme distribution. + """ + if not isinstance(block_max, np.ndarray): + raise TypeError(f"block_max must be of type np.ndarray. Got: {type(block_max)}") + + ste_params = stats.gumbel_r.fit(block_max) + param_names = ["loc", "scale"] + ste_params = dict(zip(param_names, ste_params)) + short_term_extreme_rv = stats.gumbel_r(**ste_params) + short_term_extreme_rv.params = ste_params + return short_term_extreme_rv + + +def ste(time: np.ndarray, data: np.ndarray, t_st: float, method: str) -> rv_continuous: + """ + Alias for `short_term_extreme`. 
+ """ + ste_dist = short_term_extreme(time, data, t_st, method) + return ste_dist + + +def short_term_extreme( + time: np.ndarray, data: np.ndarray, t_st: float, method: str +) -> Union[rv_continuous, None]: + """ + Approximate the short-term extreme distribution from a + timeseries of the response using chosen method. + + The availabe methods are: 'peaks_weibull', 'peaks_weibull_tail_fit', + 'peaks_over_threshold', 'block_maxima_gev', and 'block_maxima_gumbel'. + For the block maxima methods the timeseries needs to be many times + longer than the short-term period. For the peak-fitting methods the + timeseries can be of arbitrary length. + + Parameters + ---------- + time: np.array + Time array. + data: np.array + Response timeseries. + t_st: float + Short-term period. + method : string + Method for estimating the short-term extreme distribution. + + Returns + ------- + short_term_extreme_dist: scipy.stats.rv_frozen + Short-term extreme distribution. + """ + if not isinstance(time, np.ndarray): + raise TypeError(f"time must be of type np.ndarray. Got: {type(time)}") + if not isinstance(data, np.ndarray): + raise TypeError(f"data must be of type np.ndarray. Got: {type(data)}") + if not isinstance(t_st, float): + raise TypeError(f"t_st must be of type float. Got: {type(t_st)}") + if not isinstance(method, str): + raise TypeError(f"method must be of type string. Got: {type(method)}") + + peaks_methods = { + "peaks_weibull": peaks_distributions.peaks_distribution_weibull, + "peaks_weibull_tail_fit": peaks_distributions.peaks_distribution_weibull_tail_fit, + "peaks_over_threshold": peaks_distributions.peaks_distribution_peaks_over_threshold, + } + blockmaxima_methods = { + "block_maxima_gev": ste_block_maxima_gev, + "block_maxima_gumbel": ste_block_maxima_gumbel, + } + + if method in peaks_methods: + fit_peaks = peaks_methods[method] + _, peaks = peaks_distributions.global_peaks(time, data) + npeaks = len(peaks) + time = time[-1] - time[0] + nst = peaks_distributions.number_of_short_term_peaks(npeaks, time, t_st) + peaks_dist = fit_peaks(peaks) + short_term_extreme_dist = ste_peaks(peaks_dist, nst) + elif method in blockmaxima_methods: + fit_maxima = blockmaxima_methods[method] + maxima = block_maxima(time, data, t_st) + short_term_extreme_dist = fit_maxima(maxima) + else: + print("Passed `method` not found.") + return short_term_extreme_dist + + +def full_seastate_long_term_extreme(short_term_extreme_dist, weights): + """ + Return the long-term extreme distribution of a response of + interest using the full sea state approach. + + Parameters + ---------- + ste: list[scipy.stats.rv_frozen] + Short-term extreme distribution of the quantity of interest for + each sample sea state. + weights: list, np.ndarray + The weights from the full sea state sampling + + Returns + ------- + ste: scipy.stats.rv_frozen + Short-term extreme distribution. + """ + if not isinstance(short_term_extreme_dist, list): + raise TypeError( + "short_term_extreme_dist must be of type list[scipy.stats.rv_frozen]." + + f"Got: {type(short_term_extreme_dist)}" + ) + if not isinstance(weights, (list, np.ndarray)): + raise TypeError( + f"weights must be of type list or np.ndarray. 
+        )
+
+    class _LongTermExtreme(stats.rv_continuous):
+        def __init__(self, *args, **kwargs):
+            weights = kwargs.pop("weights")
+            # make sure weights add to 1.0
+            self.weights = weights / np.sum(weights)
+            self.ste = kwargs.pop("ste")
+            # n is not applied directly here; kept for reference
+            self.n = len(self.weights)  # pylint: disable=invalid-name
+            super().__init__(*args, **kwargs)
+
+        def _cdf(self, x, *args, **kwargs):
+            weighted_cdf = 0.0
+            for w_i, ste_i in zip(self.weights, self.ste):
+                weighted_cdf += w_i * ste_i.cdf(x, *args, **kwargs)
+            return weighted_cdf
+
+    return _LongTermExtreme(
+        name="long_term_extreme", weights=weights, ste=short_term_extreme_dist
+    )
diff --git a/mhkit/loads/extreme/mler.py b/mhkit/loads/extreme/mler.py
new file mode 100644
index 000000000..2922fc3b9
--- /dev/null
+++ b/mhkit/loads/extreme/mler.py
@@ -0,0 +1,458 @@
+"""
+This module provides functionalities to calculate and analyze Most
+Likely Extreme Response (MLER) coefficients for wave energy converter
+design and risk assessment. It includes functions to:
+
+ - Calculate MLER coefficients (`mler_coefficients`) from a sea state
+   spectrum and a response amplitude operator (RAO).
+ - Define and manipulate simulation parameters (`mler_simulation`) used
+   across various MLER analyses.
+ - Renormalize the incoming amplitude of the MLER wave
+   (`mler_wave_amp_normalize`) to match the desired peak height for more
+   accurate modeling and analysis.
+ - Export the wave amplitude time series (`mler_export_time_series`)
+   based on the calculated MLER coefficients for further analysis or
+   visualization.
+"""
+
+from typing import Union, List, Optional, Dict, Any
+
+import pandas as pd
+import xarray as xr
+import numpy as np
+from numpy.typing import NDArray
+
+from mhkit.wave.resource import frequency_moment
+
+SimulationParameters = Dict[str, Union[float, int, np.ndarray]]
+
+
+def _calculate_spectral_values(
+    freq_rad: Union[np.ndarray, pd.Series],
+    rao_array: np.ndarray,
+    wave_spectrum: Union[pd.Series, pd.DataFrame, np.ndarray],
+    d_w: float,
+) -> Dict[str, Union[float, np.ndarray]]:
+    """
+    Calculates spectral moments and the coefficient A_{R,n} from a given sea state
+    spectrum and a response RAO.
+
+    Parameters
+    ----------
+    freq_rad : Union[np.ndarray, pd.Series]
+        Angular frequencies [rad/s] corresponding to wave_spectrum.
+    rao_array : numpy ndarray
+        Response Amplitude Operator (RAO) of the system.
+    wave_spectrum : Union[pd.Series, pd.DataFrame, np.ndarray]
+        Wave spectrum values [m^2-s/rad] corresponding to freq_rad.
+    d_w : float
+        Delta omega, the angular frequency interval.
+
+    Returns
+    -------
+    Dict[str, Union[float, np.ndarray]]
+        A dictionary containing spectral moments (m_0, m_1, m_2) and the
+        coefficient A_{R,n}.
+    """
+    # Note: waves.A is "S" in Quon2016; 'waves' naming convention
+    # matches WEC-Sim conventions (EWQ)
+    # Response spectrum [(response units)^2-s/rad] -- Quon2016 Eqn. 3
+    spectrum_r = np.abs(rao_array) ** 2 * (2 * wave_spectrum)
+
+    # Calculate spectral moments
+    m_0 = frequency_moment(pd.Series(spectrum_r, index=freq_rad), 0).iloc[0, 0]
+    m_1 = frequency_moment(pd.Series(spectrum_r, index=freq_rad), 1).iloc[0, 0]
+    m_2 = frequency_moment(pd.Series(spectrum_r, index=freq_rad), 2).iloc[0, 0]
+
+    # Calculate coefficient A_{R,n}
+    coeff_a_rn = (
+        np.abs(rao_array)
+        * np.sqrt(2 * wave_spectrum * d_w)
+        * ((m_2 - freq_rad * m_1) + (m_1 / m_0) * (freq_rad * m_0 - m_1))
+        / (m_0 * m_2 - m_1**2)
+    )
+
+    return {
+        "m_0": m_0,
+        "m_1": m_1,
+        "m_2": m_2,
+        "coeff_a_rn": coeff_a_rn,
+    }
+
+
+def mler_coefficients(
+    rao: Union[NDArray[np.float_], pd.Series, List[float], List[int], xr.DataArray],
+    wave_spectrum: Union[pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset],
+    response_desired: Union[int, float],
+    frequency_dimension: str = "",
+    to_pandas: bool = True,
+) -> Union[pd.DataFrame, xr.Dataset]:
+    """
+    Calculate MLER (most likely extreme response) coefficients from a
+    sea state spectrum and a response RAO.
+
+    Parameters
+    ----------
+    rao: numpy ndarray, pandas Series, list, or xarray DataArray
+        Response amplitude operator.
+    wave_spectrum: pandas Series, pandas DataFrame, xarray DataArray, or xarray Dataset
+        Wave spectral density [m^2/Hz] indexed by frequency [Hz].
+        DataFrame and Dataset inputs should only have one data variable
+    response_desired: int or float
+        Desired response, units should correspond to a motion RAO or
+        units of force for a force RAO.
+    frequency_dimension: string (optional)
+        Name of the xarray dimension corresponding to frequency. If not supplied,
+        defaults to the first dimension. Does not affect pandas input.
+    to_pandas: bool (optional)
+        Flag to output pandas instead of xarray. Default = True.
+
+    Returns
+    -------
+    mler: pandas DataFrame or xarray Dataset
+        DataFrame containing conditioned wave spectral amplitude
+        coefficient [m^2-s], and Phase [rad] indexed by freq [Hz].
+    """
+    if isinstance(rao, (list, pd.Series, xr.DataArray)):
+        rao_array = np.array(rao)
+    elif isinstance(rao, np.ndarray):
+        rao_array = rao
+    else:
+        raise TypeError(
+            "Unsupported type for 'rao'. Must be one of: list, pd.Series, "
+            "np.ndarray, xr.DataArray."
+        )
+
+    if not isinstance(
+        wave_spectrum, (pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset)
+    ):
+        raise TypeError(
+            f"wave_spectrum must be of type pd.Series, pd.DataFrame, "
+            f"xr.DataArray, or xr.Dataset. Got: {type(wave_spectrum)}"
+        )
+    if not isinstance(response_desired, (int, float)):
+        raise TypeError(
+            f"response_desired must be of type int or float. Got: {type(response_desired)}"
+        )
+    if not isinstance(to_pandas, bool):
+        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
+
+    # Convert input to xarray DataArray
+    if isinstance(wave_spectrum, (pd.Series, pd.DataFrame)):
+        wave_spectrum = wave_spectrum.squeeze().to_xarray()
+
+    if isinstance(wave_spectrum, xr.Dataset):
+        if len(wave_spectrum.data_vars) > 1:
+            raise ValueError(
+                f"wave_spectrum can only contain one variable. Got {list(wave_spectrum.data_vars)}."
+            )
+        wave_spectrum = wave_spectrum.to_array()
+
+    if frequency_dimension == "":
+        frequency_dimension = list(wave_spectrum.coords)[0]
+
+    # convert from Hz to rad/s
+    freq_hz = wave_spectrum.coords[frequency_dimension].values
+    freq_rad = freq_hz * (2 * np.pi)
+    wave_spectrum = wave_spectrum.to_numpy() / (2 * np.pi)
+
+    # get frequency step
+    d_w = 2.0 * np.pi / (len(freq_rad) - 1)
+
+    spectral_values = _calculate_spectral_values(freq_rad, rao_array, wave_spectrum, d_w)
+
+    # save the new spectral info to pass out
+    # Phase delay should be a positive number in this convention (AP)
+    _phase = -np.unwrap(np.angle(rao_array))
+
+    # for negative amplitudes, add a pi phase shift, then flip the sign of
+    # the amplitude
+    _phase[spectral_values["coeff_a_rn"] < 0] -= np.pi
+    spectral_values["coeff_a_rn"][spectral_values["coeff_a_rn"] < 0] *= -1
+
+    # calculate the conditioned spectrum [m^2-s/rad]
+    conditioned_spectrum = (
+        wave_spectrum * spectral_values["coeff_a_rn"] ** 2 * response_desired**2
+    )
+
+    # if the response amplitude we ask for is negative, we will add
+    # a pi phase shift to the phase information. This is because
+    # the sign of response_desired is lost in the squaring above.
+    # Ordinarily this would be put into the final equation, but we
+    # are shaping the wave information so that it is buried in the
+    # new spectral information, S. (AP)
+    if response_desired < 0:
+        _phase += np.pi
+
+    mler = xr.Dataset(
+        {
+            "WaveSpectrum": (["frequency"], np.array(conditioned_spectrum)),
+            "Phase": (["frequency"], _phase),
+        },
+        coords={"frequency": freq_hz},
+    )
+    mler = mler.fillna(0)
+
+    return mler.to_pandas() if to_pandas else mler
+
+
+def mler_simulation(
+    parameters: Optional[SimulationParameters] = None,
+) -> SimulationParameters:
+    """
+    Define the simulation parameters that are used in various MLER
+    functionalities.
+
+    See `extreme_response_contour_example.ipynb` example for how this is
+    useful. If no input is given, then default values are returned.
+
+    Parameters
+    ----------
+    parameters: dict (optional)
+        Simulation parameters.
+        Keys:
+        -----
+        - 'startTime': starting time [s]
+        - 'endTime': ending time [s]
+        - 'dT': time-step size [s]
+        - 'T0': time of maximum event [s]
+        - 'startX': start of simulation space [m]
+        - 'endX': end of simulation space [m]
+        - 'dX': horizontal spacing [m]
+        - 'X0': position of maximum event [m]
+        The following keys are calculated from the above parameters:
+        - 'maxIT': int, maximum timestep index
+        - 'T': np.ndarray, time array
+        - 'maxIX': int, maximum index for space
+        - 'X': np.ndarray, space array
+
+    Returns
+    -------
+    sim: dict
+        Simulation parameters including spatial and time calculated
+        arrays.
+    """
+    if not isinstance(parameters, (type(None), dict)):
+        raise TypeError(
+            f"If specified, parameters must be of type dict. Got: {type(parameters)}"
+        )
+
+    sim = {}
+
+    if parameters is None:
+        sim["startTime"] = -150.0  # [s] Starting time
+        sim["endTime"] = 150.0  # [s] Ending time
+        sim["dT"] = 1.0  # [s] Time-step size
+        sim["T0"] = 0.0  # [s] Time of maximum event
+        sim["startX"] = -300.0  # [m] Start of simulation space
+        sim["endX"] = 300.0  # [m] End of simulation space
+        sim["dX"] = 1.0  # [m] Horizontal spacing
+        sim["X0"] = 0.0  # [m] Position of maximum event
+    else:
+        sim = parameters
+
+    # maximum timestep index
+    sim["maxIT"] = int(np.ceil((sim["endTime"] - sim["startTime"]) / sim["dT"] + 1))
+    sim["T"] = np.linspace(sim["startTime"], sim["endTime"], sim["maxIT"])
+
+    sim["maxIX"] = int(np.ceil((sim["endX"] - sim["startX"]) / sim["dX"] + 1))
+    sim["X"] = np.linspace(sim["startX"], sim["endX"], sim["maxIX"])
+
+    return sim
+
+
+def mler_wave_amp_normalize(
+    wave_amp: float,
+    mler: Union[pd.DataFrame, xr.Dataset],
+    sim: SimulationParameters,
+    k: Union[NDArray[np.float_], List[float], pd.Series],
+    **kwargs: Any,
+) -> Union[pd.DataFrame, xr.Dataset]:
+    """
+    Function that renormalizes the incoming amplitude of the MLER wave
+    to the desired peak height (peak to MSL).
+
+    Parameters
+    ----------
+    wave_amp: float
+        Desired wave amplitude (peak to MSL).
+    mler: pandas DataFrame or xarray Dataset
+        MLER coefficients generated by 'mler_coefficients' function.
+    sim: dict
+        Simulation parameters formatted by output from
+        'mler_simulation'.
+    k: numpy ndarray
+        Wave number
+    frequency_dimension: string (optional)
+        Name of the xarray dimension corresponding to frequency. If not supplied,
+        defaults to the first dimension. Does not affect pandas input.
+    to_pandas: bool (optional)
+        Flag to output pandas instead of xarray. Default = True.
+
+    Returns
+    -------
+    mler_norm : pandas DataFrame or xarray Dataset
+        MLER coefficients
+    """
+    frequency_dimension = kwargs.get("frequency_dimension", "")
+    to_pandas = kwargs.get("to_pandas", True)
+
+    k_array = np.array(k, dtype=float) if not isinstance(k, np.ndarray) else k
+
+    if not isinstance(mler, (pd.DataFrame, xr.Dataset)):
+        raise TypeError(
+            f"mler must be of type pd.DataFrame or xr.Dataset. Got: {type(mler)}"
+        )
+    if not isinstance(wave_amp, (int, float)):
+        raise TypeError(f"wave_amp must be of type int or float. Got: {type(wave_amp)}")
+    if not isinstance(sim, dict):
+        raise TypeError(f"sim must be of type dict. Got: {type(sim)}")
+    if not isinstance(frequency_dimension, str):
+        raise TypeError(
+            f"frequency_dimension must be of type str. Got: {type(frequency_dimension)}"
+        )
+    if not isinstance(to_pandas, bool):
+        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
+
+    # If input is pandas, convert to xarray
+    mler_xr = mler.to_xarray() if isinstance(mler, pd.DataFrame) else mler
+
+    # Determine frequency dimension and step
+    freq_dim = frequency_dimension or list(mler_xr.coords)[0]
+    freq = mler_xr.coords[freq_dim].values * 2 * np.pi
+    d_w = np.diff(freq).mean()
+
+    wave_amp_time = np.array(
+        [
+            np.sum(
+                np.sqrt(2 * mler_xr["WaveSpectrum"].values * d_w)
+                * np.cos(
+                    freq * (t - sim["T0"])
+                    - k_array * (x - sim["X0"])
+                    + mler_xr["Phase"].values
+                )
+            )
+            for x in sim["X"]
+            for t in sim["T"]
+        ]
+    ).reshape(sim["maxIX"], sim["maxIT"])
+
+    rescale_fact = np.abs(wave_amp) / np.max(np.abs(wave_amp_time))
+
+    # Rescale the wave spectral amplitude coefficients and assign phase
+    mler_norm = xr.Dataset(
+        {
+            "WaveSpectrum": (
+                ["frequency"],
+                mler_xr["WaveSpectrum"].data * rescale_fact**2,
+            ),
+            "Phase": (["frequency"], mler_xr["Phase"].data),
+        },
+        coords={"frequency": (["frequency"], mler_xr.coords[freq_dim].data)},
+    )
+    return mler_norm.to_pandas() if to_pandas else mler_norm
+
+
+def mler_export_time_series(
+    rao: Union[NDArray[np.float_], List[float], pd.Series],
+    mler: Union[pd.DataFrame, xr.Dataset],
+    sim: SimulationParameters,
+    k: Union[NDArray[np.float_], List[float], pd.Series],
+    **kwargs: Any,
+) -> Union[pd.DataFrame, xr.Dataset]:
+    """
+    Generate the wave amplitude time series at X0 from the calculated
+    MLER coefficients
+
+    Parameters
+    ----------
+    rao: numpy ndarray
+        Response amplitude operator.
+    mler: pandas DataFrame or xarray Dataset
+        MLER coefficients dataframe generated from an MLER function.
+    sim: dict
+        Simulation parameters formatted by output from
+        'mler_simulation'.
+    k: numpy ndarray
+        Wave number.
+    frequency_dimension: string (optional)
+        Name of the xarray dimension corresponding to frequency. If not supplied,
+        defaults to the first dimension. Does not affect pandas input.
+    to_pandas: bool (optional)
+        Flag to output pandas instead of xarray. Default = True.
+
+    Returns
+    -------
+    mler_ts: pandas DataFrame or xarray Dataset
+        Time series of wave height [m] and linear response [*] indexed
+        by time [s].
+    """
+    frequency_dimension = kwargs.get("frequency_dimension", "")
+    to_pandas = kwargs.get("to_pandas", True)
+
+    if not isinstance(rao, (np.ndarray, list, pd.Series)):
+        raise TypeError(
+            f"rao must be of type ndarray, list, or pd.Series. Got: {type(rao)}"
+        )
+    if not isinstance(mler, (pd.DataFrame, xr.Dataset)):
+        raise TypeError(
+            f"mler must be of type pd.DataFrame or xr.Dataset. Got: {type(mler)}"
+        )
+    if not isinstance(sim, dict):
+        raise TypeError(f"sim must be of type dict. Got: {type(sim)}")
+    if not isinstance(k, (np.ndarray, list, pd.Series)):
+        raise TypeError(
+            f"k must be of type ndarray, list, or pd.Series. Got: {type(k)}"
        )
+    if not isinstance(to_pandas, bool):
+        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
+    if not isinstance(frequency_dimension, str):
+        raise TypeError(
+            f"frequency_dimension must be of type str. 
Got: {type(frequency_dimension)}" + ) + + rao = np.array(rao, dtype=float) if not isinstance(rao, np.ndarray) else rao + k = np.array(k, dtype=float) if not isinstance(k, np.ndarray) else k + # If input is pandas, convert to xarray + mler = mler if isinstance(mler, xr.Dataset) else mler.to_xarray() + + # Handle optional frequency dimension + frequency_dimension = ( + frequency_dimension if frequency_dimension else list(mler.coords)[0] + ) + freq = mler.coords[frequency_dimension].values * 2 * np.pi + d_w = np.diff(freq).mean() + + wave_height = np.zeros(len(sim["T"])) + linear_response = np.zeros(len(sim["T"])) + for i, t_i in enumerate(sim["T"]): + cos_terms = np.cos( + freq * (t_i - sim["T0"]) + - k * (sim["X0"] - sim["X0"]) + + mler["Phase"].values + ) + wave_height[i] = np.sum(np.sqrt(2 * mler["WaveSpectrum"] * d_w) * cos_terms) + + linear_response[i] = np.sum( + np.sqrt(2 * mler["WaveSpectrum"] * d_w) + * np.abs(rao) + * np.cos(freq * (t_i - sim["T0"]) - k * (sim["X0"] - sim["X0"])) + ) + + # Construct the output dataset + mler_ts = xr.Dataset( + { + "WaveHeight": (["time"], wave_height), + "LinearResponse": (["time"], linear_response), + }, + coords={"time": sim["T"]}, + ) + + # Convert to pandas DataFrame if requested + return mler_ts.to_dataframe() if to_pandas else mler_ts diff --git a/mhkit/loads/extreme/peaks.py b/mhkit/loads/extreme/peaks.py new file mode 100644 index 000000000..3f588237a --- /dev/null +++ b/mhkit/loads/extreme/peaks.py @@ -0,0 +1,481 @@ +""" +This module provides utilities for analyzing wave data, specifically +for identifying significant wave heights and estimating wave peak +distributions using statistical methods. + +Functions: +- _calculate_window_size: Calculates the window size for peak + independence using the auto-correlation function of wave peaks. +- _peaks_over_threshold: Identifies peaks over a specified + threshold and returns independent storm peak values adjusted by + the threshold. +- global_peaks: Identifies global peaks in a zero-centered + response time-series based on consecutive zero up-crossings. +- number_of_short_term_peaks: Estimates the number of peaks within a + specified short-term period. +- peaks_distribution_weibull: Estimates the peaks distribution by + fitting a Weibull distribution to the peaks of the response. +- peaks_distribution_weibull_tail_fit: Estimates the peaks distribution + using the Weibull tail fit method. +- automatic_hs_threshold: Determines the best significant wave height + threshold for the peaks-over-threshold method. +- peaks_distribution_peaks_over_threshold: Estimates the peaks + distribution using the peaks over threshold method by fitting a + generalized Pareto distribution. + +References: +- Neary, V. S., S. Ahn, B. E. Seng, M. N. Allahdadi, T. Wang, Z. Yang, + and R. He (2020). "Characterization of Extreme Wave Conditions for + Wave Energy Converter Design and Project Risk Assessment.” J. Mar. + Sci. Eng. 2020, 8(4), 289; https://doi.org/10.3390/jmse8040289. + +""" + +from typing import List, Tuple, Optional + +import numpy as np +from numpy.typing import NDArray +from scipy import stats, optimize, signal +from scipy.stats import rv_continuous + +from mhkit.utils import upcrossing + + +def _calculate_window_size(peaks: NDArray[np.float64], sampling_rate: float) -> float: + """ + Calculate the window size for independence based on the auto-correlation function. + + Parameters + ---------- + peaks : np.ndarray + A NumPy array of peak values from a time series. 
+    sampling_rate : float
+        The sampling rate of the time series, in hours between samples.
+
+    Returns
+    -------
+    float
+        The window size determined by the auto-correlation function.
+    """
+    # Check the first 14 days of lags to determine the window size
+    n_lags = int(14 * 24 / sampling_rate)
+    deviations_from_mean = peaks - np.mean(peaks)
+    acf = signal.correlate(deviations_from_mean, deviations_from_mean, mode="full")
+    lag = signal.correlation_lags(len(peaks), len(peaks), mode="full")
+    idx_zero = np.argmax(lag == 0)
+    positive_lag = lag[idx_zero : idx_zero + n_lags + 1]
+    acf_positive = acf[idx_zero : idx_zero + n_lags + 1] / acf[idx_zero]
+
+    window_size = sampling_rate * positive_lag[acf_positive < 0.5][0]
+    return window_size / sampling_rate
+
+
+def _peaks_over_threshold(
+    peaks: NDArray[np.float64], threshold: float, sampling_rate: float
+) -> List[float]:
+    """
+    Identifies peaks in a time series that are over a specified threshold and
+    returns a list of independent storm peak values adjusted by the threshold.
+    Independence is determined by a window size calculated from the auto-correlation
+    function to ensure that peaks are separated by at least the duration
+    corresponding to the first significant drop in auto-correlation.
+
+    Parameters
+    ----------
+    peaks : np.ndarray
+        A NumPy array of peak values from a time series.
+    threshold : float
+        The percentile threshold (0-1) to identify significant peaks.
+        For example, 0.95 for the 95th percentile.
+    sampling_rate : float
+        The sampling rate of the time series, in hours between samples.
+
+    Returns
+    -------
+    List[float]
+        A list of peak values exceeding the specified threshold, adjusted
+        for independence based on the calculated window size.
+
+    Notes
+    -----
+    This function requires the global_peaks function to identify the
+    maxima between consecutive zero up-crossings and uses the signal processing
+    capabilities from scipy.signal for calculating the auto-correlation function.
+    """
+    threshold_unit = np.percentile(peaks, 100 * threshold, method="hazen")
+    idx_peaks = np.arange(len(peaks))
+    idx_storm_peaks, storm_peaks = global_peaks(idx_peaks, peaks - threshold_unit)
+    idx_storm_peaks = idx_storm_peaks.astype(int)
+
+    independent_storm_peaks = [storm_peaks[0]]
+    idx_independent_storm_peaks = [idx_storm_peaks[0]]
+
+    window = _calculate_window_size(peaks, sampling_rate)
+
+    for idx in idx_storm_peaks[1:]:
+        if (idx - idx_independent_storm_peaks[-1]) > window:
+            idx_independent_storm_peaks.append(idx)
+            independent_storm_peaks.append(peaks[idx] - threshold_unit)
+        elif peaks[idx] > independent_storm_peaks[-1]:
+            idx_independent_storm_peaks[-1] = idx
+            independent_storm_peaks[-1] = peaks[idx] - threshold_unit
+
+    return independent_storm_peaks
+
+
+def global_peaks(time: np.ndarray, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
+    """
+    Find the global peaks of a zero-centered response time-series.
+
+    The global peaks are the maxima between consecutive zero
+    up-crossings.
+
+    Parameters
+    ----------
+    time: np.array
+        Time array.
+    data: np.array
+        Response time-series.
+
+    Returns
+    -------
+    time_peaks: np.array
+        Time array for peaks
+    peaks: np.array
+        Peak values of the response time-series
+    """
+    if not isinstance(time, np.ndarray):
+        raise TypeError(f"time must be of type np.ndarray. Got: {type(time)}")
+    if not isinstance(data, np.ndarray):
+        raise TypeError(f"data must be of type np.ndarray. 
Got: {type(data)}") + + # Find zero up-crossings + inds = upcrossing(time, data) + + # We also include the final point in the dataset + inds = np.append(inds, len(data) - 1) + + # As we want to return both the time and peak + # values, look for the index at the peak. + # The call to argmax gives us the index within the + # upcrossing period. Therefore to get the index in the + # original array we need to add on the index that + # starts the zero crossing period, ind1. + def find_peak_index(ind1, ind2): + return np.argmax(data[ind1:ind2]) + ind1 + + peak_inds = np.array( + [find_peak_index(ind1, inds[i + 1]) for i, ind1 in enumerate(inds[:-1])], + dtype=int, + ) + + return time[peak_inds], data[peak_inds] + + +def number_of_short_term_peaks(n_peaks: int, time: float, time_st: float) -> float: + """ + Estimate the number of peaks in a specified period. + + Parameters + ---------- + n_peaks : int + Number of peaks in analyzed timeseries. + time : float + Length of time of analyzed timeseries. + time_st: float + Short-term period for which to estimate the number of peaks. + + Returns + ------- + n_st : float + Number of peaks in short term period. + """ + if not isinstance(n_peaks, int): + raise TypeError(f"n_peaks must be of type int. Got: {type(n_peaks)}") + if not isinstance(time, float): + raise TypeError(f"time must be of type float. Got: {type(time)}") + if not isinstance(time_st, float): + raise TypeError(f"time_st must be of type float. Got: {type(time_st)}") + + return n_peaks * time_st / time + + +def peaks_distribution_weibull(peaks_data: NDArray[np.float_]) -> rv_continuous: + """ + Estimate the peaks distribution by fitting a Weibull + distribution to the peaks of the response. + + The fitted parameters can be accessed through the `params` field of + the returned distribution. + + Parameters + ---------- + peaks_data : NDArray[np.float_] + Global peaks. + + Returns + ------- + peaks: scipy.stats.rv_frozen + Probability distribution of the peaks. + """ + if not isinstance(peaks_data, np.ndarray): + raise TypeError( + f"peaks_data must be of type np.ndarray. Got: {type(peaks_data)}" + ) + + # peaks distribution + peaks_params = stats.exponweib.fit(peaks_data, f0=1, floc=0) + param_names = ["a", "c", "loc", "scale"] + peaks_params = dict(zip(param_names, peaks_params)) + peaks = stats.exponweib(**peaks_params) + # save the parameter info + peaks.params = peaks_params + return peaks + + +# pylint: disable=R0914 +def peaks_distribution_weibull_tail_fit( + peaks_data: NDArray[np.float_], +) -> rv_continuous: + """ + Estimate the peaks distribution using the Weibull tail fit + method. + + The fitted parameters can be accessed through the `params` field of + the returned distribution. + + Parameters + ---------- + peaks_data : np.array + Global peaks. + + Returns + ------- + peaks: scipy.stats.rv_frozen + Probability distribution of the peaks. + """ + if not isinstance(peaks_data, np.ndarray): + raise TypeError( + f"peaks_data must be of type np.ndarray. 
Got: {type(peaks_data)}" + ) + + # Initial guess for Weibull parameters + p_0 = stats.exponweib.fit(peaks_data, f0=1, floc=0) + p_0 = np.array([p_0[1], p_0[3]]) + # Approximate CDF + peaks_data = np.sort(peaks_data) + n_peaks = len(peaks_data) + cdf_positions = np.zeros(n_peaks) + for i in range(n_peaks): + cdf_positions[i] = i / (n_peaks + 1.0) + # Divide into seven sets & fit Weibull + subset_shape_params = np.zeros(7) + subset_scale_params = np.zeros(7) + set_lim = np.arange(0.60, 0.90, 0.05) + + def weibull_cdf(data_points, shape, scale): + return stats.exponweib(a=1, c=shape, loc=0, scale=scale).cdf(data_points) + + for local_set in range(7): + global_peaks_set = peaks_data[(cdf_positions > set_lim[local_set])] + cdf_positions_set = cdf_positions[(cdf_positions > set_lim[local_set])] + # pylint: disable=W0632 + p_opt, _ = optimize.curve_fit( + weibull_cdf, global_peaks_set, cdf_positions_set, p0=p_0 + ) + subset_shape_params[local_set] = p_opt[0] + subset_scale_params[local_set] = p_opt[1] + # peaks distribution + peaks_params = [1, np.mean(subset_shape_params), 0, np.mean(subset_scale_params)] + param_names = ["a", "c", "loc", "scale"] + peaks_params = dict(zip(param_names, peaks_params)) + peaks = stats.exponweib(**peaks_params) + # save the parameter info + peaks.params = peaks_params + peaks.subset_shape_params = subset_shape_params + peaks.subset_scale_params = subset_scale_params + return peaks + + +# pylint: disable=R0914 +def automatic_hs_threshold( + peaks: NDArray[np.float_], + sampling_rate: float, + initial_threshold_range: Tuple[float, float, float] = (0.990, 0.995, 0.001), + max_refinement: int = 5, +) -> Tuple[float, float]: + """ + Find the best significant wave height threshold for the + peaks-over-threshold method. + + This method was developed by: + + > Neary, V. S., S. Ahn, B. E. Seng, M. N. Allahdadi, T. Wang, Z. Yang and R. He (2020). + > "Characterization of Extreme Wave Conditions for Wave Energy Converter Design and + > Project Risk Assessment.” + > J. Mar. Sci. Eng. 2020, 8(4), 289; https://doi.org/10.3390/jmse8040289. + + Please cite this paper if using this method. + + After all thresholds in the initial range are evaluated, the search + range is refined around the optimal point until either (i) there + is minimal change from the previous refinement results, (ii) the + number of data points become smaller than about 1 per year, or (iii) + the maximum number of iterations is reached. + + Parameters + ---------- + peaks: NDArray[np.float_] + Peak values of the response time-series. + sampling_rate: float + Sampling rate in hours. + initial_threshold_range: Tuple[float, float, float] + Initial range of thresholds to search. Described as + (min, max, step). + max_refinement: int + Maximum number of times to refine the search range. + + Returns + ------- + Tuple[float, float] + The best threshold and its corresponding unit. + + """ + if not isinstance(sampling_rate, (float, int)): + raise TypeError( + f"sampling_rate must be of type float or int. Got: {type(sampling_rate)}" + ) + if not isinstance(peaks, np.ndarray): + raise TypeError(f"peaks must be of type np.ndarray. Got: {type(peaks)}") + if not len(initial_threshold_range) == 3: + raise ValueError( + f"initial_threshold_range must be length 3. Got: {len(initial_threshold_range)}" + ) + if not isinstance(max_refinement, int): + raise TypeError( + f"max_refinement must be of type int. 
Got: {type(max_refinement)}" + ) + + range_min, range_max, range_step = initial_threshold_range + best_threshold = -1 + years = len(peaks) / (365.25 * 24 / sampling_rate) + + for i in range(max_refinement): + thresholds = np.arange(range_min, range_max, range_step) + correlations = [] + + for threshold in thresholds: + distribution = stats.genpareto + over_threshold = _peaks_over_threshold(peaks, threshold, sampling_rate) + rate_per_year = len(over_threshold) / years + if rate_per_year < 2: + break + distributions_parameters = distribution.fit(over_threshold, floc=0.0) + _, (_, _, correlation) = stats.probplot( + peaks, distributions_parameters, distribution, fit=True + ) + correlations.append(correlation) + + max_i = np.argmax(correlations) + minimal_change = np.abs(best_threshold - thresholds[max_i]) < 0.0005 + best_threshold = thresholds[max_i] + if minimal_change and i < max_refinement - 1: + break + range_step /= 10 + if max_i == len(thresholds) - 1: + range_min = thresholds[max_i - 1] + range_max = thresholds[max_i] + 5 * range_step + elif max_i == 0: + range_min = thresholds[max_i] - 9 * range_step + range_max = thresholds[max_i + 1] + else: + range_min = thresholds[max_i - 1] + range_max = thresholds[max_i + 1] + + best_threshold_unit = np.percentile(peaks, 100 * best_threshold, method="hazen") + return best_threshold, best_threshold_unit + + +def peaks_distribution_peaks_over_threshold( + peaks_data: NDArray[np.float_], threshold: Optional[float] = None +) -> rv_continuous: + """ + Estimate the peaks distribution using the peaks over threshold + method. + + This fits a generalized Pareto distribution to all the peaks above + the specified threshold. The distribution is only defined for values + above the threshold and therefore cannot be used to obtain integral + metrics such as the expected value. A typical choice of threshold is + 1.4 standard deviations above the mean. The peaks over threshold + distribution can be accessed through the `pot` field of the returned + peaks distribution. + + Parameters + ---------- + peaks_data : NDArray[np.float_] + Global peaks. + threshold : Optional[float] + Threshold value. Only peaks above this value will be used. + Default value calculated as: `np.mean(x) + 1.4 * np.std(x)` + + Returns + ------- + peaks: rv_continuous + Probability distribution of the peaks. + """ + if not isinstance(peaks_data, np.ndarray): + raise TypeError( + f"peaks_data must be of type np.ndarray. Got: {type(peaks_data)}" + ) + if threshold is None: + threshold = np.mean(peaks_data) + 1.4 * np.std(peaks_data) + if threshold is not None and not isinstance(threshold, float): + raise TypeError( + f"If specified, threshold must be of type float. 
Got: {type(threshold)}" + ) + + # peaks over threshold + peaks_data = np.sort(peaks_data) + pot = peaks_data[peaks_data > threshold] - threshold + npeaks = len(peaks_data) + npot = len(pot) + # Fit a generalized Pareto + pot_params = stats.genpareto.fit(pot, floc=0.0) + param_names = ["c", "loc", "scale"] + pot_params = dict(zip(param_names, pot_params)) + pot = stats.genpareto(**pot_params) + # save the parameter info + pot.params = pot_params + + # peaks + class _Peaks(rv_continuous): + def __init__( + self, pot_distribution: rv_continuous, threshold: float, *args, **kwargs + ): + self.pot = pot_distribution + self.threshold = threshold + super().__init__(*args, **kwargs) + + # pylint: disable=arguments-differ + def _cdf(self, data_points, *args, **kwds) -> NDArray[np.float_]: + # Convert data_points to a NumPy array if it's not already + data_points = np.atleast_1d(data_points) + out = np.zeros_like(data_points) + + # Use the instance's threshold attribute instead of passing as a parameter + below_threshold = data_points < self.threshold + out[below_threshold] = np.NaN + + above_threshold_indices = ~below_threshold + if np.any(above_threshold_indices): + points_above_threshold = data_points[above_threshold_indices] + pot_ccdf = 1.0 - self.pot.cdf( + points_above_threshold - self.threshold, *args, **kwds + ) + prop_pot = npot / npeaks + out[above_threshold_indices] = 1.0 - (prop_pot * pot_ccdf) + return out + + peaks = _Peaks(name="peaks", pot_distribution=pot, threshold=threshold) + peaks.pot = pot + return peaks diff --git a/mhkit/loads/extreme/sample.py b/mhkit/loads/extreme/sample.py new file mode 100644 index 000000000..3da0377de --- /dev/null +++ b/mhkit/loads/extreme/sample.py @@ -0,0 +1,52 @@ +""" +This module provides statistical analysis tools for extreme value +analysis in environmental and engineering applications. It focuses on +estimating values corresponding to specific return periods based on +the statistical distribution of observed or simulated data. + +Functionality: +- return_year_value: Calculates the value from a given distribution + corresponding to a specified return year. This function is particularly + useful for determining design values for engineering structures or for + risk assessment in environmental studies. + +""" + +from typing import Callable + + +def return_year_value( + ppf: Callable[[float], float], return_year: float, short_term_period_hr: float +) -> float: + """ + Calculate the value from a given distribution corresponding to a particular + return year. + + Parameters + ---------- + ppf: callable function of 1 argument + Percentage Point Function (inverse CDF) of short term distribution. + return_year: int, float + Return period in years. + short_term_period_hr: int, float + Short term period the distribution is created from in hours. + + Returns + ------- + value: float + The value corresponding to the return period from the distribution. + """ + if not callable(ppf): + raise TypeError("ppf must be a callable Percentage Point Function") + if not isinstance(return_year, (float, int)): + raise TypeError( + f"return_year must be of type float or int. Got: {type(return_year)}" + ) + if not isinstance(short_term_period_hr, (float, int)): + raise TypeError( + f"short_term_period_hr must be of type float or int. 
Got: {type(short_term_period_hr)}" + ) + + probability_of_exceedance = 1 / (return_year * 365.25 * 24 / short_term_period_hr) + + return ppf(1 - probability_of_exceedance) diff --git a/mhkit/loads/general.py b/mhkit/loads/general.py index e9a959426..119731443 100644 --- a/mhkit/loads/general.py +++ b/mhkit/loads/general.py @@ -1,11 +1,48 @@ +""" +This module provides tools for analyzing and processing data signals +related to turbine blade performance and fatigue analysis. It implements +methodologies based on standards such as IEC TS 62600-3:2020 ED1, +incorporating statistical binning, moment calculations, and fatigue +damage estimation using the rainflow counting algorithm. Key +functionalities include: + + - `bin_statistics`: Bins time-series data against a specified signal, + such as wind speed, to calculate mean and standard deviation statistics + for each bin, following IEC TS 62600-3:2020 ED1 guidelines. It supports + output in both pandas DataFrame and xarray Dataset formats. + + - `blade_moments`: Calculates the flapwise and edgewise moments of turbine + blades using derived calibration coefficients and raw strain signals. + This function is crucial for understanding the loading and performance + characteristics of turbine blades. + + - `damage_equivalent_load`: Estimates the damage equivalent load (DEL) + of a single data signal using a 4-point rainflow counting algorithm. + This method is vital for assessing fatigue life and durability of + materials under variable amplitude loading. + +References: +- C. Amzallag et. al., International Journal of Fatigue, 16 (1994) 287-293. +- ISO 12110-2, Metallic materials - Fatigue testing - Variable amplitude fatigue testing. +- G. Marsh et. al., International Journal of Fatigue, 82 (2016) 757-765. +""" + +from typing import Union, List, Tuple, Optional from scipy.stats import binned_statistic import pandas as pd import xarray as xr import numpy as np import fatpack +from mhkit.utils.type_handling import to_numeric_array -def bin_statistics(data, bin_against, bin_edges, data_signal=[], to_pandas=True): +def bin_statistics( + data: Union[pd.DataFrame, xr.Dataset], + bin_against: np.ndarray, + bin_edges: np.ndarray, + data_signal: Optional[List[str]] = None, + to_pandas: bool = True, +) -> Tuple[Union[pd.DataFrame, xr.Dataset], Union[pd.DataFrame, xr.Dataset]]: """ Bins calculated statistics against data signal (or channel) according to IEC TS 62600-3:2020 ED1. @@ -36,38 +73,9 @@ def bin_statistics(data, bin_against, bin_edges, data_signal=[], to_pandas=True) f"data must be of type pd.DataFrame or xr.Dataset. Got: {type(data)}" ) - if isinstance(bin_against, str): - raise TypeError( - f"bin_against must be numeric, not a string. Got: {bin_against}" - ) - - if not isinstance(bin_against, (list, xr.DataArray, pd.Series, np.ndarray)): - raise TypeError( - f"bin_against must be of type list, xr.DataArray, pd.Series, or np.ndarray. Got: {type(bin_against)}" - ) - - if not isinstance(bin_against, np.ndarray): - try: - bin_against = np.asarray(bin_against) - except: - raise TypeError( - f"bin_against must be of type np.ndarray. Got: {type(bin_against)}" - ) - - # Check if bin_edges is a string and raise an error if it is - if isinstance(bin_edges, str): - raise TypeError(f"bin_edges must not be a string. 
Got: {bin_edges}") - - # Check if bin_edges is one of the expected types, and convert if necessary - if isinstance(bin_edges, (list, xr.DataArray, pd.Series)): - try: - bin_edges = np.asarray(bin_edges) - except: - pass - - # Check if bin_edges is now a NumPy array, and raise an error if it's not - if not isinstance(bin_edges, np.ndarray): - raise TypeError(f"bin_edges must be of type np.ndarray. Got: {type(bin_edges)}") + # Use _to_numeric_array to process bin_against and bin_edges + bin_against = to_numeric_array(bin_against, "bin_against") + bin_edges = to_numeric_array(bin_edges, "bin_edges") if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") @@ -76,6 +84,8 @@ def bin_statistics(data, bin_against, bin_edges, data_signal=[], to_pandas=True) if isinstance(data, pd.DataFrame): data = data.to_xarray() + if data_signal is None: + data_signal = [] # Determine variables to analyze if len(data_signal) == 0: # if not specified, bin all variables data_signal = list(data.keys()) @@ -125,7 +135,13 @@ def bin_statistics(data, bin_against, bin_edges, data_signal=[], to_pandas=True) return bin_mean, bin_std -def blade_moments(blade_coefficients, flap_offset, flap_raw, edge_offset, edge_raw): +def blade_moments( + blade_coefficients: np.ndarray, + flap_offset: float, + flap_raw: np.ndarray, + edge_offset: float, + edge_raw: np.ndarray, +) -> Tuple[np.ndarray, np.ndarray]: """ Transfer function for deriving blade flap and edge moments using blade matrix. @@ -150,20 +166,10 @@ def blade_moments(blade_coefficients, flap_offset, flap_raw, edge_offset, edge_r Blade edgewise moment in SI units """ - try: - blade_coefficients = np.asarray(blade_coefficients) - except: - raise TypeError( - f"blade_coefficients must be of type np.ndarray. Got: {type(blade_coefficients)}" - ) - try: - flap_raw = np.asarray(flap_raw) - except: - raise TypeError(f"flap_raw must be of type np.ndarray. Got: {type(flap_raw)}") - try: - edge_raw = np.asarray(edge_raw) - except: - raise TypeError(f"edge_raw must be of type np.ndarray. Got: {type(edge_raw)}") + # Convert and validate blade_coefficients, flap_raw, and edge_raw + blade_coefficients = to_numeric_array(blade_coefficients, "blade_coefficients") + flap_raw = to_numeric_array(flap_raw, "flap_raw") + edge_raw = to_numeric_array(edge_raw, "edge_raw") if not isinstance(flap_offset, (float, int)): raise TypeError( @@ -179,13 +185,18 @@ def blade_moments(blade_coefficients, flap_offset, flap_raw, edge_offset, edge_r edge_signal = edge_raw - edge_offset # apply matrix to get load signals - M_flap = blade_coefficients[0] * flap_signal + blade_coefficients[1] * edge_signal - M_edge = blade_coefficients[2] * flap_signal + blade_coefficients[3] * edge_signal + m_flap = blade_coefficients[0] * flap_signal + blade_coefficients[1] * edge_signal + m_edge = blade_coefficients[2] * flap_signal + blade_coefficients[3] * edge_signal - return M_flap, M_edge + return m_flap, m_edge -def damage_equivalent_load(data_signal, m, bin_num=100, data_length=600): +def damage_equivalent_load( + data_signal: np.ndarray, + m: Union[float, int], + bin_num: int = 100, + data_length: Union[float, int] = 600, +) -> float: """ Calculates the damage equivalent load of a single data signal (or channel) based on IEC TS 62600-3:2020 ED1. 
4-point rainflow counting algorithm from
@@ -217,12 +228,7 @@
         Damage equivalent load (DEL) of single data signal
     """
-    try:
-        data_signal = np.array(data_signal)
-    except:
-        raise TypeError(
-            f"data_signal must be of type np.ndarray. Got: {type(data_signal)}"
-        )
+    data_signal = to_numeric_array(data_signal, "data_signal")
     if not isinstance(m, (float, int)):
         raise TypeError(f"m must be of type float or int. Got: {type(m)}")
     if not isinstance(bin_num, (float, int)):
@@ -235,9 +241,9 @@
     rainflow_ranges = fatpack.find_rainflow_ranges(data_signal, k=256)
 
     # Range count and bin
-    Nrf, Srf = fatpack.find_range_count(rainflow_ranges, bin_num)
+    n_rf, s_rf = fatpack.find_range_count(rainflow_ranges, bin_num)
 
-    DELs = Srf**m * Nrf / data_length
-    DEL = DELs.sum() ** (1 / m)
+    del_s = s_rf**m * n_rf / data_length
+    del_value = del_s.sum() ** (1 / m)
 
-    return DEL
+    return del_value
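For orientation, a minimal sketch of how the refactored damage_equivalent_load could be exercised; the synthetic signal and the S-N slope m = 3 (a common choice for welded steel) are illustrative, not taken from the MHKiT examples:

    import numpy as np
    from mhkit.loads.general import damage_equivalent_load

    # Synthetic 10-minute load signal sampled at 1 Hz (illustrative only)
    rng = np.random.default_rng(0)
    time = np.arange(600)
    load = 50 * np.sin(2 * np.pi * time / 60) + rng.normal(0, 5, time.size)

    del_value = damage_equivalent_load(load, m=3, bin_num=100, data_length=600)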
Got: {type(input_variables[i])}" - ) - - try: - input_variables[i] = np.array(input_variables[i]) - except: - pass + variable_names = ["x", "y_mean", "y_max", "y_min", "y_stdev"] + # Convert each input variable to a numeric array, ensuring all are numeric + for i, variable in enumerate(input_variables): + input_variables[i] = to_numeric_array(variable, variable_names[i]) x, y_mean, y_max, y_min, y_stdev = input_variables @@ -74,16 +95,16 @@ def plot_statistics(x, y_mean, y_max, y_min, y_stdev=[], **kwargs): ax.grid(alpha=0.4) ax.legend(loc="best") - if x_label != None: + if x_label: ax.set_xlabel(x_label) - if y_label != None: + if y_label: ax.set_ylabel(y_label) - if title != None: + if title: ax.set_title(title) fig.tight_layout() - if save_path == None: + if save_path is None: plt.show() else: fig.savefig(save_path) @@ -91,16 +112,17 @@ def plot_statistics(x, y_mean, y_max, y_min, y_stdev=[], **kwargs): return ax +# pylint: disable=R0913 def plot_bin_statistics( - bin_centers, - bin_mean, - bin_max, - bin_min, - bin_mean_std, - bin_max_std, - bin_min_std, - **kwargs, -): + bin_centers: np.ndarray, + bin_mean: np.ndarray, + bin_max: np.ndarray, + bin_min: np.ndarray, + bin_mean_std: np.ndarray, + bin_max_std: np.ndarray, + bin_min_std: np.ndarray, + **kwargs: Dict[str, Any], +) -> plt.Axes: """ Plot showing standard binned statistics of single variable @@ -144,36 +166,23 @@ def plot_bin_statistics( bin_max_std, bin_min_std, ] + variable_names = [ + "bin_centers", + "bin_mean", + "bin_max", + "bin_min", + "bin_mean_std", + "bin_max_std", + "bin_min_std", + ] - for i in range(len(input_variables)): - var_name = [ - "bin_centers", - "bin_mean", - "bin_max", - "bin_min", - "bin_mean_std", - "bin_max_std", - "bin_min_std", - ][i] - if not isinstance(input_variables[i], (np.ndarray, pd.Series, int, float)): - raise TypeError( - f"{var_name} must be of type np.ndarray, int, or float. 
Got: {type(input_variables[i])}" - ) - - try: - input_variables[i] = np.array(input_variables[i]) - except: - pass - - ( - bin_centers, - bin_mean, - bin_max, - bin_min, - bin_mean_std, - bin_max_std, - bin_min_std, - ) = input_variables + # Convert each input variable to a numeric array, ensuring all are numeric + for i, variable in enumerate(input_variables): + input_variables[i] = to_numeric_array(variable, variable_names[i]) + + bin_centers, bin_mean, bin_max, bin_min, bin_mean_std, bin_max_std, bin_min_std = ( + input_variables + ) x_label = kwargs.get("x_label", None) y_label = kwargs.get("y_label", None) @@ -221,16 +230,16 @@ def plot_bin_statistics( ax.grid(alpha=0.5) ax.legend(loc="best") - if x_label != None: + if x_label: ax.set_xlabel(x_label) - if y_label != None: + if y_label: ax.set_ylabel(y_label) - if title != None: + if title: ax.set_title(title) fig.tight_layout() - if save_path == None: + if save_path is None: plt.show() else: fig.savefig(save_path) diff --git a/mhkit/tests/loads/test_loads.py b/mhkit/tests/loads/test_loads.py index a4e07e5d3..8c119a38e 100644 --- a/mhkit/tests/loads/test_loads.py +++ b/mhkit/tests/loads/test_loads.py @@ -272,7 +272,7 @@ def test_plot_bin_statistics_type_errors(self): # Test invalid data types one at a time with self.assertRaises(TypeError): loads.graphics.plot_bin_statistics( - [1, 2, 3], # Invalid bin_centers (list instead of np.ndarray) + ["a", 2, 3], # Invalid bin_centers bin_mean, bin_max, bin_min, @@ -284,7 +284,7 @@ def test_plot_bin_statistics_type_errors(self): with self.assertRaises(TypeError): loads.graphics.plot_bin_statistics( bin_centers, - [10, 20, 30], # Invalid bin_mean (list instead of np.ndarray) + ["a", 20, 30], # Invalid bin_mean bin_max, bin_min, bin_mean_std, @@ -296,7 +296,7 @@ def test_plot_bin_statistics_type_errors(self): loads.graphics.plot_bin_statistics( bin_centers, bin_mean, - [15, 25, 35], # Invalid bin_max (list instead of np.ndarray) + ["a", 25, 35], # Invalid bin_max bin_min, bin_mean_std, bin_max_std, @@ -308,7 +308,7 @@ def test_plot_bin_statistics_type_errors(self): bin_centers, bin_mean, bin_max, - [5, 15, 25], # Invalid bin_min (list instead of np.ndarray) + ["a", 15, 25], # Invalid bin_min bin_mean_std, bin_max_std, bin_min_std, @@ -320,7 +320,7 @@ def test_plot_bin_statistics_type_errors(self): bin_mean, bin_max, bin_min, - [1, 2, 3], # Invalid bin_mean_std (list instead of np.ndarray) + ["a", 2, 3], # Invalid bin_mean_std bin_max_std, bin_min_std, ) @@ -332,7 +332,7 @@ def test_plot_bin_statistics_type_errors(self): bin_max, bin_min, bin_mean_std, - [0.5, 1.5, 2.5], # Invalid bin_max_std (list instead of np.ndarray) + ["a", 1.5, 2.5], # Invalid bin_max_std bin_min_std, ) @@ -344,7 +344,7 @@ def test_plot_bin_statistics_type_errors(self): bin_min, bin_mean_std, bin_max_std, - [0.8, 1.8, 2.8], # Invalid bin_min_std (list instead of np.ndarray) + ["a", 1.8, 2.8], # Invalid bin_min_std ) diff --git a/mhkit/utils/type_handling.py b/mhkit/utils/type_handling.py new file mode 100644 index 000000000..844850b2d --- /dev/null +++ b/mhkit/utils/type_handling.py @@ -0,0 +1,23 @@ +import numpy as np +import pandas as pd +import xarray as xr + + +def to_numeric_array(data, name): + """ + Convert input data to a numeric array, ensuring all elements are numeric. + """ + if isinstance(data, (list, np.ndarray, pd.Series, xr.DataArray)): + data = np.asarray(data) + if not np.issubdtype(data.dtype, np.number): + raise TypeError( + (f"{name} must contain numeric data." 
+ f" Got data type: {data.dtype}") + ) + else: + raise TypeError( + ( + f"{name} must be a list, np.ndarray, pd.Series," + + f" or xr.DataArray. Got: {type(data)}" + ) + ) + return data