#The following methods were extracted from https://github.com/aimalz/qp/blob/master/qp/pdf.py and modified only so that they would run within this notebook. The math has not changed.
+from scipy.interpolate import interp1d, InterpolatedUnivariateSpline
+import sys
+
+epsilon = sys.float_info.epsilon
+infty = sys.float_info.max * epsilon
+
+def sandwich(in_arr, ends):
+ """
+ Adds given values to the ends of a 1D array
+
+ Parameters
+ ----------
+ in_arr: numpy.ndarray, float
+ original array
+ ends: numpy.ndarray or tuple or list, float or numpy.ndarray, float
+ values to be added to the beginning and end
+
+ Returns
+ -------
+ out_arr: numpy.ndarray, float
+ array with front and back concatenations
+ """
+ if type(ends[0]) == np.ndarray:
+ prepend = len(ends[0])
+ else:
+ prepend = 1
+ if type(ends[-1]) == np.ndarray:
+ append = -1 * len(ends[-1])
+ else:
+ append = -1
+ out_arr = np.zeros(prepend + len(in_arr) - append)
+ out_arr[:prepend] = ends[0]
+ out_arr[prepend:append] = in_arr
+ out_arr[append:] = ends[-1]
+ return out_arr
+
+def evaluate_histogram(in_data, threshold=epsilon):
+ """
+ Produces PDF values given samples
+
+ Parameters
+ ----------
+ in_data: None or tuple, numpy.ndarray, float
+ tuple of (n+1) bin endpoints x and (n) CDF y between endpoints
+ threshold: float, optional
+
+ vb: boolean, optional
+ be careful and print progress to stdout?
+
+ Returns
+ -------
+ out_data: tuple, float
+ sorted samples x and corresponding PDF values y
+ """
+ # x = locs (or in our case redshift values)
+ # y = first derivatives of delta_quant/delta_loc or in our case p(z)
+ (x, y) = in_data
+ dx = threshold
+ xs = np.zeros(2 * len(y))
+ ys = xs
+ # ! xs defines the bin edges.
+ # ! This is creating the bin edges. xs[0] and xs[1] are the edges of the first bin.
+ # ! xs[2] = xs[1]+epsilon becomes the beginning of the second bin, etc.
+ # ! Then we "repeat" the y values so that you end up with histogram steps.
+ xs[::2] = x[:-1] + dx
+ xs[1::2] = x[1:] - dx
+ ys = np.repeat(y, 2)
+ xs = sandwich(xs, (x[0] - dx, x[-1] + dx))
+ ys = sandwich(ys, (threshold, threshold))
+ out_data = (xs, ys)
+ return out_data
+
+def evaluate_quantiles(in_data, threshold=epsilon):
+ """
+ Estimates PDF values given quantile information
+
+ Parameters
+ ----------
+ in_data: tuple, numpy.ndarray, float
+ tuple of CDF values iy and values x at which those CDFs are achieved
+ threshold: float, optional
+ optional minimum threshold for CDF difference
+ vb: boolean, optional
+ be careful and print progress to stdout?
+
+ Returns
+ -------
+ out_data: tuple, numpy.ndarray, float
+ values xs and corresponding PDF values ys
+ """
+
+ # iy = quants
+ # x = locs
+ (iy, x) = in_data
+
+ # This is the same as np.diff(x)
+ dx = x[1:] - x[:-1]
+
+ # This is the same as np.diff(iy)
+ diy = iy[1:] - iy[:-1]
+
+ # this is the numerical first derivative i.e. p(z)
+ y = diy / dx
+
+ # evaluate_histogram(locs, first_derivs)
+ (xs, ys) = evaluate_histogram((x, y), threshold=threshold)
+ out_data = (xs[1:-1], ys[1:-1])
+ return out_data
+
+def normalize_quantiles(in_data, threshold=epsilon):
+ """
+ Evaluates PDF from quantiles including endpoints from linear extrapolation
+
+ Parameters
+ ----------
+ in_data: tuple, numpy.ndarray, float
+ tuple of CDF values iy corresponding to quantiles and the points x at
+ which those CDF values are achieved
+ threshold: float, optional
+ optional minimum threshold for PDF
+ vb: boolean, optional
+ be careful and print progress to stdout?
+
+ Returns
+ -------
+ out_data: tuple, ndarray, float
+ tuple of values x at which CDF is achieved, including extrema, and
+ normalized PDF values y at x
+ """
+
+ # iy = quants
+ # x = locs
+ (iy, x) = in_data
+ (xs, ys) = evaluate_quantiles((iy, x))
+ # xs = xs[1:-1]
+ # ys = ys[1:-1]
+
+ # ! I believe that this is just using the slope to add end points to the list of x values.
+ x_min = xs[0] - 2 * iy[0] / ys[0]
+ x_max = xs[-1] + 2 * (1. - iy[-1]) / ys[-1]
+ xs = sandwich(xs, (x_min, x_max))
+ ys = sandwich(ys, (threshold, threshold))
+ out_data = (xs, ys)
+ return out_data
+
+def normalize_gridded(in_data, thresholds=(epsilon, infty)):
+ """
+ Removes extreme values from gridded parametrizations
+
+ Parameters
+ ----------
+ in_data: None or tuple, numpy.ndarray, float
+ tuple of points x at which function is evaluated and the PDF y at those
+ points
+ thresholds: tuple, float, optional
+ optional min/max thresholds for normalization
+
+ Returns
+ -------
+ out_data: tuple, numpy.ndarray, float
+ tuple of input x and normalized y
+ """
+ if in_data is None:
+ return in_data
+ (x, y) = in_data
+ y[y < thresholds[0]] = thresholds[0]
+ y[y > thresholds[-1]] = thresholds[-1]
+ out_data = (x, y)
+ return out_data
+
+def original_qp_interpolation(quants, locs):
+
+ # ! These are the variable name substitutions I've made
+ # quants_and_locs = self.quantiles # A 2d array of quantiles and locations
+ # scheme = scheme # using 'linear', used to determine the type of interpolation with interp1d
+
+ scheme = 'linear'
+ quants_and_locs = np.array([quants, locs])
+
+ if type(scheme) != int:
+ order = min(5, len(quants_and_locs[0]))
+ else:
+ order = scheme
+
+ # ! Looks like only min(x) and max(x) are used after this in this function, but x,y are used in `quantile_interpolator` later.
+ (x, y) = normalize_quantiles(quants_and_locs)
+ z = np.insert(quants_and_locs[1], 0, min(x))
+ z = np.append(z, max(x))
+ q = np.insert(quants_and_locs[0], 0, 0.)
+ q = np.append(q, 1.)
+
+ [x_crit_lo, x_crit_hi] = [quants_and_locs[1][0], quants_and_locs[1][-1]]
+ [y_crit_lo, y_crit_hi] = [-1., -1.]
+
+ try:
+ while (order>0) and ((y_crit_lo <= 0.) or (y_crit_hi <= 0.)):
+ inside = InterpolatedUnivariateSpline(z, q, k=order, ext=1).derivative()
+ [y_crit_lo, y_crit_hi] = inside([x_crit_lo, x_crit_hi])
+ order -= 1
+ assert((y_crit_lo > 0.) and (y_crit_hi > 0.))
+ except AssertionError:
+ print('ERROR: spline tangents '+str((y_crit_lo, y_crit_hi))+'<0')
+ if type(scheme) == str:
+ this_scheme = scheme
+ else:
+ this_scheme = 'linear'
+ inside_int = interp1d(z, q, kind=this_scheme, bounds_error=False, fill_value=epsilon)
+ derivative = (q[1:] - q[:-1]) / (z[1:] - z[:-1])
+ derivative = np.insert(derivative, 0, epsilon)
+ derivative = np.append(derivative, epsilon)
+ def inside(xf):
+ nx = len(xf)
+ yf = np.ones(nx) * epsilon
+ for n in range(nx):
+ i = bisect.bisect_left(z, xf[n])
+ yf[n] = derivative[i]
+ return(yf)
+ [y_crit_lo, y_crit_hi] = inside([x_crit_lo, x_crit_hi])
+ assert((y_crit_lo > 0.) and (y_crit_hi > 0.))
+ return quants_and_locs, x_crit_lo, x_crit_hi, y_crit_lo, y_crit_hi, x, y, z, inside
+
+def quantile_interpolator(quants_and_locs, x_crit_lo, x_crit_hi, y_crit_lo, y_crit_hi, x, y, z, inside, xf):
+ yf = np.ones(np.shape(xf)) * epsilon
+ in_inds = ((xf >= quants_and_locs[1][0]) & (xf <= quants_and_locs[1][-1])).nonzero()[0]
+ lo_inds = ((xf < quants_and_locs[1][0]) & (xf >= z[0])).nonzero()[0]
+ hi_inds = ((xf > quants_and_locs[1][-1]) & (xf <= z[-1])).nonzero()[0]
+
+ try:
+ yf[in_inds] = inside(xf[in_inds])
+ assert(np.all(yf >= epsilon))
+
+ except AssertionError:
+ print('ERROR: spline interpolation failed with '+str((xf[in_inds], yf[in_inds])))
+ try:
+ alternate = interp1d(x, y, kind='linear', bounds_error=False, fill_value=epsilon)
+ yf[in_inds] = alternate(xf[in_inds])
+ assert(np.all(yf >= epsilon))
+
+ except AssertionError:
+ print('ERROR: linear interpolation failed for the '+using+' parametrization with '+str((xf[in_inds], yf[in_inds])))
+ backup = qp.utils.make_kludge_interpolator((x, y), threshold=epsilon)
+ yf[in_inds] = backup(xf[in_inds])
+
+ assert(np.all(yf >= epsilon))
+
+ try:
+ tan_lo = y_crit_lo / (x_crit_lo - z[0])
+ yf[lo_inds] = tan_lo * (xf[lo_inds] - z[0])# yf[in_inds[0]] / (xf[in_inds[0]] - z[0])
+ assert(np.all(yf >= epsilon))
+ except AssertionError:
+ print('ERROR: linear extrapolation below failed with '+str((xf[lo_inds], yf[lo_inds]))+' via '+str((tan_lo, x_crit_lo, z[0])))
+
+ try:
+ tan_hi = y_crit_hi / (z[-1] - x_crit_hi)
+ yf[hi_inds] = tan_hi * (z[-1] - xf[hi_inds])# yf[in_inds[-1]] * (xf[hi_inds] - z[-1]) / (xf[in_inds[-1]] - z[-1])
+ assert(np.all(yf >= epsilon))
+ except AssertionError:
+ print('ERROR: linear extrapolation above failed with '+str((xf[hi_inds], yf[hi_inds]))+' via '+str((tan_hi, z[-1], x_crit_hi)))
+
+ return(yf)
+
+def approximate(quants, locs, grid=None):
+ """
+ Interpolates the parametrization to get an approximation to the density.
+
+ Parameters
+ ----------
+ points: ndarray
+ the value(s) at which to evaluate the interpolated function
+
+
+ Returns
+ -------
+ points: ndarray, float
+ the input grid upon which to interpolate
+ interpolated: ndarray, float
+ the interpolated points.
+
+ Notes
+ -----
+ Extrapolation is via the `scheme` while values are positive;
+ otherwise, extrapolation returns 0.
+
+ Example:
+ x, y = p.approximate(np.linspace(-1., 1., 100))
+ """
+ if grid is None:
+ grid = locs
+
+ quants_and_locs, x_crit_lo, x_crit_hi, y_crit_lo, y_crit_hi, x, y, z, inside = original_qp_interpolation(quants, locs)
+
+ interpolated = quantile_interpolator(quants_and_locs, x_crit_lo, x_crit_hi, y_crit_lo, y_crit_hi, x, y, z, inside, grid)
+
+ interpolated = normalize_gridded((grid, interpolated))
+
+ return interpolated
+