Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-38258: Disable implicit threading #29

Open
wants to merge 6 commits into
base: lsst-dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ deprecated
pyyaml
nose
getCalspec>=2.0.0
threadpoolctl
16 changes: 9 additions & 7 deletions spectractor/extractor/spectrum.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import random
import string
import astropy
from threadpoolctl import threadpool_limits

from spectractor import parameters
from spectractor.config import set_logger, load_config, update_derived_parameters, apply_rebinning_to_parameters
Expand Down Expand Up @@ -1842,13 +1843,14 @@ def shift_minimizer(params):
# bounds=((D - 5 * parameters.DISTANCE2CCD_ERR, D + 5 * parameters.DISTANCE2CCD_ERR), (-2, 2)))
error = [parameters.DISTANCE2CCD_ERR, pixel_shift_step]
fix = [False, False]
m = Minuit(shift_minimizer, start)
m.errors = error
m.errordef = 1
m.fixed = fix
m.print_level = 0
m.limits = ((D - 5 * parameters.DISTANCE2CCD_ERR, D + 5 * parameters.DISTANCE2CCD_ERR), (-2, 2))
m.migrad()
with threadpool_limits(limits=1):
m = Minuit(shift_minimizer, start)
m.errors = error
m.errordef = 1
m.fixed = fix
m.print_level = 0
m.limits = ((D - 5 * parameters.DISTANCE2CCD_ERR, D + 5 * parameters.DISTANCE2CCD_ERR), (-2, 2))
m.migrad()
# if parameters.DEBUG:
# print(m.prin)
# if not res.success:
Expand Down
120 changes: 64 additions & 56 deletions spectractor/fit/fitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from spectractor.config import set_logger
from spectractor.tools import formatting_numbers, compute_correlation_matrix, plot_correlation_matrix_simple
from spectractor.fit.statistics import Likelihood
from threadpoolctl import threadpool_limits


class FitWorkspace:
Expand Down Expand Up @@ -543,46 +544,46 @@ def save_parameters_summary(self, ipar, header=""):
A header to add to the file (default: "").
"""
output_filename = os.path.splitext(self.filename)[0] + "_bestfit.txt"

#print(">>>>> \t fitter.py :: save_parameters_summary :: output_filename = ", output_filename)

f = open(output_filename, 'w')
txt = self.filename + "\n"
if header != "":
txt += header + "\n"

#print(">>>>> \t save_parameters_summary :: cov = ",self.cov, " type = ",type(self.cov), " shape = ",self.cov.shape)

mycov = np.copy(self.cov)
maxk = np.min(mycov.shape)

for k, ip in enumerate(ipar):
#print(">>>> \t \t k = ", k)

if k < maxk:

covariance_matrix_element = mycov[k, k]
#print(">>>>> \t save_parameters_summary :: k = ", k ,
# " , ip = ", ip ,
#print(">>>>> \t save_parameters_summary :: k = ", k ,
# " , ip = ", ip ,
# " p[ip] = " , self.p[ip],
# " , label = ", self.input_labels[ip] ,
# " , label = ", self.input_labels[ip] ,
# " , cov = ",covariance_matrix_element)

if covariance_matrix_element >= 0:
covariance_matrix_element_sigma = np.sqrt(covariance_matrix_element)
else:
#print(">>>>> \t save_parameters_summary :: k = ", k , " , ip = ", ip , " , label = ", self.input_labels[ip] , " , Negative cov = ",covariance_matrix_element)
covariance_matrix_element_sigma = np.sqrt(-covariance_matrix_element)

txt += "%s: %s +%s - %s\n" % formatting_numbers(self.p[ip], covariance_matrix_element_sigma,
covariance_matrix_element_sigma,
label=self.input_labels[ip])
else:
#print(">>>>> \t save_parameters_summary :: SKIP k = ", k , ' >= kmax = ',maxk)
pass



#txt += "%s: %s +%s - %s\n" % formatting_numbers(self.p[ip], np.sqrt(self.cov[k, k]),
# np.sqrt(self.cov[k, k]),
# label=self.input_labels[ip])
Expand Down Expand Up @@ -726,34 +727,35 @@ def chisq(self, p, model_output=False):

"""
# check data format
if (self.data.dtype != object and self.data.ndim > 1) or (self.err.dtype != object and self.err.ndim > 1):
raise ValueError("Fitworkspace.data and Fitworkspace.err must be a flat 1D array,"
" or an array of flat arrays of unequal lengths.")
# prepare weight matrices in case they have not been built before
self.prepare_weight_matrices()
x, model, model_err = self.simulate(*p)
W = self.compute_W_with_model_error(model_err)
if W.ndim == 1 and W.dtype != object:
res = (model - self.data)
chisq = res @ (W * res)
elif W.dtype == object:
K = len(W)
res = [model[k] - self.data[k] for k in range(K)]
if W[0].ndim == 1:
chisq = np.sum([res[k] @ (W[k] * res[k]) for k in range(K)])
elif W[0].ndim == 2:
chisq = np.sum([res[k] @ W[k] @ res[k] for k in range(K)])
with threadpool_limits(limits=1): # methods in here thread-thrash if not limited
if (self.data.dtype != object and self.data.ndim > 1) or (self.err.dtype != object and self.err.ndim > 1):
raise ValueError("Fitworkspace.data and Fitworkspace.err must be a flat 1D array,"
" or an array of flat arrays of unequal lengths.")
# prepare weight matrices in case they have not been built before
self.prepare_weight_matrices()
x, model, model_err = self.simulate(*p)
W = self.compute_W_with_model_error(model_err)
if W.ndim == 1 and W.dtype != object:
res = (model - self.data)
chisq = res @ (W * res)
elif W.dtype == object:
K = len(W)
res = [model[k] - self.data[k] for k in range(K)]
if W[0].ndim == 1:
chisq = np.sum([res[k] @ (W[k] * res[k]) for k in range(K)])
elif W[0].ndim == 2:
chisq = np.sum([res[k] @ W[k] @ res[k] for k in range(K)])
else:
raise ValueError(f"First element of fitworkspace.W has no ndim attribute or has a dimension above 2. "
f"I get W[0]={W[0]}")
elif W.ndim == 2 and W.dtype != object:
res = (model - self.data)
chisq = res @ W @ res
else:
raise ValueError(f"First element of fitworkspace.W has no ndim attribute or has a dimension above 2. "
f"I get W[0]={W[0]}")
elif W.ndim == 2 and W.dtype != object:
res = (model - self.data)
chisq = res @ W @ res
else:
raise ValueError(
f"Data inverse covariance matrix must be a np.ndarray of dimension 1 or 2,"
f"either made of 1D or 2D arrays of equal lengths or not for block diagonal matrices."
f"\nHere W type is {type(W)}, shape is {W.shape} and W is {W}.")
raise ValueError(
f"Data inverse covariance matrix must be a np.ndarray of dimension 1 or 2,"
f"either made of 1D or 2D arrays of equal lengths or not for block diagonal matrices."
f"\nHere W type is {type(W)}, shape is {W.shape} and W is {W}.")
if model_output:
return chisq, x, model, model_err
else:
Expand Down Expand Up @@ -1045,7 +1047,9 @@ def line_search(alpha):
return fit_workspace.chisq(tmp_params_2)

# tol parameter acts on alpha (not func)
alpha_min, fval, iter, funcalls = optimize.brent(line_search, full_output=True, tol=5e-1, brack=(0, 1))
with threadpool_limits(limits=1):
alpha_min, fval, iter, funcalls = optimize.brent(line_search, full_output=True, tol=5e-1,
brack=(0, 1))
else:
alpha_min = 1
fval = np.copy(cost)
Expand Down Expand Up @@ -1331,8 +1335,9 @@ def run_minimisation(fit_workspace, method="newton", epsilon=None, fix=None, xto

if method == "minimize":
start = time.time()
result = optimize.minimize(nll, fit_workspace.p, method=minimizer_method,
options={'ftol': ftol, 'maxiter': 100000}, bounds=bounds)
with threadpool_limits(limits=1):
result = optimize.minimize(nll, fit_workspace.p, method=minimizer_method,
options={'ftol': ftol, 'maxiter': 100000}, bounds=bounds)
fit_workspace.p = result['x']
if verbose:
my_logger.debug(f"\n\t{result}")
Expand All @@ -1342,7 +1347,8 @@ def run_minimisation(fit_workspace, method="newton", epsilon=None, fix=None, xto
elif method == 'basinhopping':
start = time.time()
minimizer_kwargs = dict(method=minimizer_method, bounds=bounds)
result = optimize.basinhopping(nll, guess, minimizer_kwargs=minimizer_kwargs)
with threadpool_limits(limits=1):
result = optimize.basinhopping(nll, guess, minimizer_kwargs=minimizer_kwargs)
fit_workspace.p = result['x']
if verbose:
my_logger.debug(f"\n\t{result}")
Expand All @@ -1355,8 +1361,9 @@ def run_minimisation(fit_workspace, method="newton", epsilon=None, fix=None, xto
start = time.time()
x_scale = np.abs(guess)
x_scale[x_scale == 0] = 0.1
p = optimize.least_squares(fit_workspace.weighted_residuals, guess, verbose=2, ftol=1e-6, x_scale=x_scale,
diff_step=0.001, bounds=bounds.T)
with threadpool_limits(limits=1):
p = optimize.least_squares(fit_workspace.weighted_residuals, guess, verbose=2, ftol=1e-6,
x_scale=x_scale, diff_step=0.001, bounds=bounds.T)
fit_workspace.p = p.x # m.np_values()
if verbose:
my_logger.debug(f"\n\t{p}")
Expand All @@ -1374,14 +1381,15 @@ def run_minimisation(fit_workspace, method="newton", epsilon=None, fix=None, xto
fix = [False] * guess.size
# noinspection PyArgumentList
# m = Minuit(fcn=nll, values=guess, error=error, errordef=1, fix=fix, print_level=verbose, limit=bounds)
m = Minuit(nll, np.copy(guess))
m.errors = error
m.errordef = 1
m.fixed = fix
m.print_level = verbose
m.limits = bounds
m.tol = 10
m.migrad()
with threadpool_limits(limits=1):
m = Minuit(nll, np.copy(guess))
m.errors = error
m.errordef = 1
m.fixed = fix
m.print_level = verbose
m.limits = bounds
m.tol = 10
m.migrad()
fit_workspace.p = np.array(m.values[:])
if verbose:
my_logger.debug(f"\n\t{m}")
Expand Down
26 changes: 16 additions & 10 deletions spectractor/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from spectractor.config import set_logger
from spectractor import parameters
from math import floor
from threadpoolctl import threadpool_limits

from numba import njit

Expand Down Expand Up @@ -151,8 +152,10 @@ def fit_gauss(x, y, guess=[10, 1000, 1], bounds=(-np.inf, np.inf), sigma=None):
"""
def gauss_jacobian_wrapper(*params):
return np.array(gauss_jacobian(*params)).T
popt, pcov = curve_fit(gauss, x, y, p0=guess, bounds=bounds, tr_solver='exact', jac=gauss_jacobian_wrapper,
sigma=sigma, method='dogbox', verbose=0, xtol=1e-15, ftol=1e-15)
with threadpool_limits(limits=1):
popt, pcov = curve_fit(gauss, x, y, p0=guess, bounds=bounds, tr_solver='exact',
jac=gauss_jacobian_wrapper, sigma=sigma, method='dogbox',
verbose=0, xtol=1e-15, ftol=1e-15)
return popt, pcov


Expand Down Expand Up @@ -226,7 +229,9 @@ def fit_multigauss_and_line(x, y, guess=[0, 1, 10, 1000, 1, 0], bounds=(-np.inf,
[ 1. 10. 20. 650. 3. 40. 750. 10.]
"""
maxfev = 1000
popt, pcov = curve_fit(multigauss_and_line, x, y, p0=guess, bounds=bounds, maxfev=maxfev, absolute_sigma=True)
with threadpool_limits(limits=1):
popt, pcov = curve_fit(multigauss_and_line, x, y, p0=guess, bounds=bounds, maxfev=maxfev,
absolute_sigma=True)
return popt, pcov


Expand Down Expand Up @@ -408,9 +413,10 @@ def fit_multigauss_and_bgd(x, y, guess=[0, 1, 10, 1000, 1, 0], bounds=(-np.inf,
plt.show()
"""
maxfev = 10000
popt, pcov = curve_fit(multigauss_and_bgd, x, y, p0=guess, bounds=bounds, maxfev=maxfev, sigma=sigma,
absolute_sigma=True, method='trf', xtol=1e-4, ftol=1e-4, verbose=0,
jac=multigauss_and_bgd_jacobian, x_scale='jac')
with threadpool_limits(limits=1):
popt, pcov = curve_fit(multigauss_and_bgd, x, y, p0=guess, bounds=bounds, maxfev=maxfev, sigma=sigma,
absolute_sigma=True, method='trf', xtol=1e-4, ftol=1e-4, verbose=0,
jac=multigauss_and_bgd_jacobian, x_scale='jac')
# error = 0.1 * np.abs(guess) * np.ones_like(guess)
# z = np.where(np.isclose(error,0.0,1e-6))
# error[z] = 0.01
Expand Down Expand Up @@ -1388,15 +1394,15 @@ def weighted_avg_and_std(values, weights):
Return the weighted average and standard deviation.

values, weights -- Numpy ndarrays with the same shape.

For example for the PSF

x=pixel number
y=Intensity in pixel

values-x
weights=y=f(x)

"""
average = np.average(values, weights=weights)
variance = np.average((values - average) ** 2, weights=weights) # Fast and numerically precise
Expand Down
4 changes: 3 additions & 1 deletion tests/run_full_chain.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from numpy.testing import run_module_suite
from threadpoolctl import threadpool_limits
import sys

from test_simulator import *
Expand All @@ -14,4 +15,5 @@
"Use '--all' to run all tests.")
args.append("-a!slow")

run_module_suite(argv=args)
with threadpool_limits(limits=1):
run_module_suite(argv=args)
4 changes: 3 additions & 1 deletion tests/run_tests.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from numpy.testing import run_module_suite
from threadpoolctl import threadpool_limits
import sys

# from test_astrometry import *
Expand All @@ -15,4 +16,5 @@
print("Running tests that are not tagged as 'slow'. "
"Use '--all' to run all tests.")
args.append("-a!slow")
run_module_suite(argv=args)
with threadpool_limits(limits=1):
run_module_suite(argv=args)
4 changes: 3 additions & 1 deletion tests/test_astrometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import subprocess # noqa: E402
import numpy as np # noqa: E402
import unittest # noqa: E402
from threadpoolctl import threadpool_limits # noqa: E402


# TODO: DM-33441 Fix broken spectractor tests
Expand Down Expand Up @@ -70,4 +71,5 @@ def test_astrometry():


if __name__ == "__main__":
run_module_suite()
with threadpool_limits(limits=1):
run_module_suite()
5 changes: 3 additions & 2 deletions tests/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import sys # noqa: E402
import numpy as np # noqa: E402
import unittest # noqa: E402
from threadpoolctl import threadpool_limits # noqa: E402


def test_logbook():
Expand Down Expand Up @@ -151,5 +152,5 @@ def extractor_auxtel():


if __name__ == "__main__":

run_module_suite()
with threadpool_limits(limits=1):
run_module_suite()
4 changes: 3 additions & 1 deletion tests/test_fitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import os # noqa: E402
import sys # noqa: E402
import unittest # noqa: E402
from threadpoolctl import threadpool_limits # noqa: E402


class LineFitWorkspace(FitWorkspace):
Expand Down Expand Up @@ -135,4 +136,5 @@ def test_minimisation_sigma_clipping(seed=42):


if __name__ == "__main__":
run_module_suite()
with threadpool_limits(limits=1):
run_module_suite()
4 changes: 3 additions & 1 deletion tests/test_fullchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import numpy as np # noqa: E402
import matplotlib.pyplot as plt # noqa: E402
import unittest # noqa: E402
from threadpoolctl import threadpool_limits # noqa: E402


# original parameters
Expand Down Expand Up @@ -205,4 +206,5 @@ def test_ctio_fullchain():


if __name__ == "__main__":
run_module_suite()
with threadpool_limits(limits=1):
run_module_suite()
Loading