-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathultraflatstask.py
121 lines (97 loc) · 4.48 KB
/
ultraflatstask.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""
Ultraflats task for EOTest code.
"""
from os.path import join
import glob
import numpy as np
import scipy.optimize
import astropy.io.fits as fits
from lsst.eotest.fitsTools import fitsTableFactory, fitsWriteto
import lsst.eotest.image_utils as imutils
from lsst.eotest.sensor.MaskedCCD import MaskedCCD
from lsst.eotest.sensor.EOTestResults import EOTestResults
from lsst.eotest.sensor.AmplifierGeometry import makeAmplifierGeometry
from scipy import ndimage, stats
import sys
import lsst.afw.image as afwImage
import lsst.afw.math as afwMath
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
def glob_flats(full_path, outfile = 'ultraflats.txt'):
flats = glob.glob(join(full_path, '20*.fits'))
output = open(outfile, 'w')
for item in flats:
output.write('%s\n' % item)
output.close()
class UltraflatStackStats(object):
def __init__(self, fmean, fvar):
self.flats_mean = fmean
self.flats_var = fvar
self.flats_med = fmed
class PtcConfig(pexConfig.Config):
"""Configuration for ptc task"""
output_dir = pexConfig.Field("Output directory", str, default='.')
eotest_results_file = pexConfig.Field("EO test results filename", str, default=
None)
max_frac_offset = pexConfig.Field("maximum fraction offset from median gain curve to omit points from PTC fit.", float, default=0.2)
verbose = pexConfig.Field("Turn verbosity on", bool, default=True)
class UltraFlatsTask(pipeBase.Task):
"""Task to compute a variety of statistics on ultraflats dataset."""
ConfigClass = PtcConfig
_DefaultName = "UltraflatsTask"
@pipeBase.timeMethod
def stack(self, sensor_id, infiles, mask_files, gains, binsize=1, bias_frame = None,exps=['0.27','0.54','1.60','2.40']):
for exp in exps:
all_amps = imutils.allAmps(infiles[0])
#ptc_stats = dict([amp, ([],[])) for amp in all_amps])
exposure = []
bitpix = None
overscan = makeAmplifierGeometry(infiles[0]).serial_overscan
mean_stack = fits.open(infiles[0])
var_stack = fits.open(infiles[0])
sum_stack = fits.open(infiles[0])
median_stack = fits.open(infiles[0])
for amp in imutils.allAmps(infiles[0]):
print 'on amp number', amp
images = afwImage.vectorImageF()
for idx, infile in enumerate(infiles):
print infile
ccd = MaskedCCD(infile, mask_files = (), bias_frame = bias_frame)
image = ccd.unbiased_and_trimmed_image(amp)
if idx == 0:
fratio_im = afwImage.MaskedImageF(image)
fratio_im = fratio_im.getImage().getArray()
image_array = image.getImage().getArray()
fratio = np.mean(fratio_im[50:fratio_im.shape[0]-50,50:fratio_im.shape[1]-50])/np.mean(image_array[50:image_array.shape[0]-50,50:image_array.shape[1]-50])
image = afwImage.MaskedImageF(image).getImage()
image *= fratio
images.push_back(image)
meanstack_image = afwMath.statisticsStack(images, afwMath.MEAN)
varstack_image = afwMath.statisticsStack(images, afwMath.VARIANCE)
sumstack_image = afwMath.statisticsStack(images, afwMath.SUM)
medianstack_image = afwMath.statisticsStack(images, afwMath.MEDIAN)
mean_stack[amp].data = meanstack_image.getArray()
var_stack[amp].data = varstack_image.getArray()
sum_stack[amp].data = sumstack_image.getArray()
median_stack[amp].data = medianstack_image.getArray()
if bitpix is not None:
imutils.set_bitpix(output[amp], bitpix)
fitsWriteto(mean_stack, 'mean_image_'+exp+'.fits',clobber = True)
fitsWriteto(var_stack, 'var_image_'+exp+'.fits', clobber = True)
fitsWriteto(sum_stack, 'sum_image_'+exp+'.fits', clobber = True)
fitsWriteto(median_stack, 'median_image_'+exp+'.fits', clobber = True)
def single_pixel_ptc(self,meanimages,varimages,infiles):
gain_map_image = fits.open(infiles[0])
for amp in imutils.allAmps(infiles[0]):
meanmaps = [afwImage.ImageF(meanimage).getArray() for meanimage in meanimages]
varmaps = [afwImage.ImageF(varimage).getArray() for varimage in varimages]
gain_map = np.zeros((meanmaps[0].shape[0],meanmaps[0].shape[1]))
for x in range(meanmaps[0].shape[0]):
for y in range(meanmaps[0].shape[1]):
means = [meanmap[x][y] for meanmap in meanmaps]
varrs = [varmap[x][y] for varmap in varmaps]
slope, intercept, r_value, p_value, std_err = stats.linregress(means,varrs)
gain_map[x][y] = 1/slope
gain_map_image[amp].data = gain_map
print np.median(gain_map)
fitsWriteto(gain_map_image, 'gain_map.fits',clobber=True)