import numpy as np
import shutil
import astropy
from photutils.detection import DAOStarFinder
import code
import copy
from astropy import units as u
from astropy.table import Table
from astropy.nddata import StdDevUncertainty
from pyvista.dataclass import Data
from astropy.io import fits, ascii
from astropy.wcs import WCS
from astropy.modeling import models, fitting
from astropy.convolution import convolve, Box1DKernel, Box2DKernel, Box2DKernel, Gaussian1DKernel
from holtztools import html, plots
try : import astroscrappy
except : print('no astroscrappy')
import scipy.signal
from scipy.optimize import curve_fit
import yaml
import subprocess
import sys
import tempfile
from pyvista import stars, image, tv, bitmask, dataclass, spectra
try: from pyvista import apogee
except : pass
import pyvista.data as DATA
import warnings
from astropy.utils.exceptions import AstropyWarning
warnings.simplefilter('ignore', category=AstropyWarning)
import matplotlib
import matplotlib.pyplot as plt
import glob
import bz2
import os
import pdb
import importlib_resources
try:
import pyds9
except:
#print('pyds9 is not available, proceeding')
pass
[docs]class Reducer() :
""" Class for reducing images of a given instrument
Parameters
----------
Attributes
----------
dir : str
root : str
verbose : bool
inst : str
badpix : str
scat : int
mask : str
transpose : bool
scale : float
biastype : int
gain : float
rn : float
namp : int
crbox : list
biasavg : int
biasregion :
trimbox :
outbox :
"""
def __init__(self,inst=None,conf='',dir='./',root='*',formstr='{:04d}.f*',
gain=1,rn=0.,saturation=2**32,verbose=True,nfowler=1,
trim=False) :
""" Initialize reducer with information about how to reduce
Parameters
----------
inst : str, default=None
configuration file name to read for instrument
conf : str, default=''
configuration suffix to add to configuration file name
dir : str,default='./'
default directory to get images from
trim : bool, default=False
if True, trim calibration products
formstr : str, default='{04d}.f*'
sets format string to find images given integer ID,
if not reading from configuration file
gain : float, default=1
gain if not reading from configuration file
rn : float, default=0
rn if not reading from configuration file
saturation : int, default=2**32
saturation value if not reading from configuration file
verbose : bool, default=False
turn on verbose output
nfowler : integer, default=1
nfowler value if not reading from configuration file
"""
self.dir=dir
self.root=root
self.verbose=verbose
self.badpix=None
self.scat=None
self.scat_smooth=None
self.bitmask=None
self.transpose=None
self.trim=trim
self.scale=1
self.biastype=-1
self.biasbox=[]
self.trimbox=[]
self.normbox=[]
self.outbox=[]
self.headerbox=False
self.ext='fits'
self.inst='generic'
# we will allow for instruments to have multiple channels, so everything goes in lists
self.channels=['']
if type(gain) is list : self.gain=gain
else : self.gain = [gain]
if type(rn) is list : self.rn=rn
else : self.rn = [rn]
if type(saturation) is list : self.saturation=saturation
else : self.saturation = [saturation]
if type(formstr) is list : self.formstr=formstr
else : self.formstr=[formstr]
# Read instrument configuation from YAML configuration file
if inst is not None :
self.inst=inst
if inst.find('/') < 0 :
config = yaml.load(open(importlib_resources.files(DATA).joinpath(inst+'/'+inst+
conf+'.yml'),'r'), Loader=yaml.FullLoader)
else :
config = yaml.load(open(inst+'.yml','r'),
Loader=yaml.FullLoader)
self.channels=config['channels']
self.formstr=config['formstr']
self.gain=config['gain']
self.rn=config['rn']/np.sqrt(nfowler)
for card in ['cols','ext','scale','crbox','headerbox'] :
try : setattr(self,card,config[card])
except : setattr(self,card,None)
try : self.saturation=config['saturation']
except KeyError:
self.saturation = []
for chan in self.channels: self.saturation.append(2**32)
try : self.namp=config['namp']
except KeyError: self.namp = 1
try :self.transpose=config['transpose']
except KeyError: self.transpose = False
self.biastype=config['biastype']
try : self.biasavg=config['biasavg']
except KeyError: self.biasavg=11
if self.biasavg %2 == 0 : self.biasavg += 1
self.biasbox=[]
for box in config['biasbox'] :
if self.namp == 1 :
self.biasbox.append(image.BOX(xr=box[0],yr=box[1]) )
else :
ampbox=[]
for amp in box :
ampbox.append(image.BOX(xr=amp[0],yr=amp[1]) )
self.biasbox.append(ampbox)
self.biasregion=[]
try :
for box in config['biasregion'] :
if self.namp == 1 :
self.biasregion.append(image.BOX(xr=box[0],yr=box[1]) )
else :
ampbox=[]
for amp in box :
ampbox.append(image.BOX(xr=amp[0],yr=amp[1]) )
self.biasregion.append(ampbox)
except KeyError:
self.biasregion=[None]
self.trimbox=[]
for box in config['trimbox'] :
if self.namp == 1 :
self.trimbox.append(image.BOX(xr=box[0],yr=box[1]) )
else :
ampbox=[]
for amp in box :
ampbox.append(image.BOX(xr=amp[0],yr=amp[1]) )
self.trimbox.append(ampbox)
self.outbox=[]
try :
for box in config['outbox'] :
if self.namp == 1 :
self.outbox.append(image.BOX(xr=box[0],yr=box[1]) )
else :
ampbox=[]
for amp in box :
ampbox.append(image.BOX(xr=amp[0],yr=amp[1]) )
self.outbox.append(ampbox)
except: self.outbox=self.trimbox
self.normbox=[]
for box in config['normbox'] :
self.normbox.append(image.BOX(xr=box[0],yr=box[1]) )
try: self.scat=config['scat']
except : pass
try: self.scat_smooth=config['scat_smooth']
except : pass
# Add bad pixel mask if it exists
try: self.bitmask=fits.open(importlib_resources.files(DATA).joinpath(inst+'/'+
inst+'_mask.fits'))[0].data
except: pass
# save number of chips for convenience
self.nchip = len(self.formstr)
# output setup if verbose
if self.verbose :
if inst is not None :
print('INSTRUMENT: {:s} config: {:s}'.format(inst,conf))
for form in self.formstr :
print(' will use format: {:s}/{:s}{:s}.fits*'.format(
self.dir,self.root,form))
print(' gain: {} rn: {}'.format(self.gain,self.rn))
print(' scale: {} '.format(self.scale))
print(' Biastype : {:d}'.format(self.biastype))
print(' Bias box: ')
for box in self.biasbox :
if self.namp == 1 : box.show()
else :
for i,amp in enumerate(box) :
if i==0 : header = True
else: header=False
amp.show(header=header)
print(' Trim box: ')
for box in self.trimbox :
if self.namp == 1 : box.show()
else :
for i,amp in enumerate(box) :
if i==0 : header = True
else: header=False
amp.show(header=header)
print(' Norm box: ')
for box in self.normbox :
box.show()
[docs] def log(self,files=None,htmlfile=None,ext=None,hdu=0,channel='',
cols=None, display=None,hexdump=False) :
""" Create chronological image log from file headers in default
directory.
If any .csv file exists in the directory, its contents are
added as a table to htmlfile
Parameters
----------
htmlfile : str, default=None
if specified, write HTML log to htmlfile
ext : override default extension to search
cols : array-like, str, specifies which FITS header cards to output,
default=None, which will use cards
as defined in the Reducer object, if they have been set
by configuration file, or, otherwise will use
['DATE-OBS','OBJNAME','RA','DEC','EXPTIME']
display : if not None, specifies tv tool in which to
display each image and make png thumbnail, to include
in htmlfile
Returns
-------
astropy table from FITS headers
"""
if cols is None :
if self.cols is not None :
cols = self.cols
else :
cols=['DATE-OBS','OBJNAME','RA','DEC','EXPTIME']
if ext is None :
if self.ext is not None :
ext = self.ext
else :
ext='fit*'
# get list of files from default formstr or as requested by keyword
if files == None :
files=glob.glob(self.dir+'/*{:s}*.'.format(channel)+ext)
elif num.find('/') >= 0 :
files=glob.glob('{:s}'.format(files))
else :
files=glob.glob(self.dir+'/{:s}'.format(files))
if len(files) == 0 :
print('no files found matching: ',
self.dir+'/*{:s}*.'.format(channel)+ext)
date=[]
objs=[]
for file in files :
try :
if hexdump : header=_get_meta(file,keys=cols)
else : header=fits.open(file)[hdu].header
except FileNotFound :
print('error opening file: {:s}, hdu: {:d}'.format(file,hdu))
return
try :
date.append(header['DATE-OBS'])
except KeyError :
print('file {:s} does not have DATE-OBS'.format(file))
date.append('')
for col in cols :
try:
if 'OBJ' in col : objs.append(header[col])
except:
objs.append('?')
date=np.array(date)
sort=np.argsort(date)
if htmlfile is not None :
fp=html.head(htmlfile+'.html')
fp.write('Objects observed: ')
for obj in set(objs) : fp.write('{:s} '.format(obj))
fp.write('\n<p>\n')
fp.write('new objects in light blue, new filter in light green')
fp.write('<TABLE BORDER=2>\n')
if display is not None :
fp.write('<br> <a href={:s}_thumb.html> Thumbnails page</a>'.format(
os.path.basename(htmlfile)))
fd=html.head(htmlfile+'_thumb.html')
fd.write('<TABLE BORDER=2>\n')
# get names and dtypes for table, based on which requested cards are in last header
names=['FILE']
dtypes=['S24']
if htmlfile is not None : fp.write('<TR style="background-color:lightred"><TD>FILE\n')
for col in cols :
names.append(col)
dtypes.append('S16')
if htmlfile is not None :
fp.write('<TD>{:s}\n'.format(col))
tab=Table(names=names,dtype=dtypes)
# set up style for rows with new object
newobj= ''
newfilt= ''
for col in cols :
if 'OBJ' in col :
newobj='style="background-color:lightgreen"'
if 'FILT' in col :
newfilt='style="background-color:lightblue"'
oldobj = ''
oldfilt = ''
style = ''
for i in sort :
if hexdump : header=_get_meta(file,keys=cols)
else : header=fits.open(files[i])[hdu].header
# fix for RA/DEC for MaximDL headers
if 'RA' not in header :
try: header['RA'] = header['OBJCTRA'].replace(' ',':')
except : pass
if 'DEC' not in header :
try: header['DEC'] = header['OBJCTDEC'].replace(' ',':')
except : pass
# if we have OBJECT card, we can color rows for new object
for col in cols :
if 'OBJ' in col :
try :
if header[col] != oldobj :
style=newobj
oldobj=header[col]
else : style=''
except : pass
# if we have FILTER card, we can color rows for new filter (if not new object)
for col in cols :
if 'FILT' in col :
try :
if header[col] != oldfilt :
oldfilt=header[col]
if style == '' : style=newfilt
except : pass
if htmlfile is not None :
fp.write('<TR {:s}><TD>{:s}\n'.format(style,os.path.basename(files[i])))
row=[os.path.basename(files[i])]
for col in cols :
try:
row.append(str(header[col]))
if htmlfile is not None:
fp.write('<TD>{:s}\n'.format(str(header[col])))
except:
row.append('')
if htmlfile is not None: fp.write('<TD>\n')
tab.add_row(row)
if display is not None :
if not os.path.exists(files[i]+'.png') :
a=fits.open(files[i])[hdu].data
display.tv(a)
display.savefig(files[i]+'.png')
fp.write('<TD><A HREF="{:s}">png image</A>\n'.
format(
os.path.basename(files[i]+'.png')))
fd.write('<TR><TD>{:s}<TD><A HREF="{:s}"><IMG SRC="{:s}" WIDTH=400></A>\n'.
format(os.path.basename(files[i]),
os.path.basename(files[i]+'.png'),
os.path.basename(files[i]+'.png')))
if htmlfile is not None :
fp.write('</TABLE>\n')
if display is not None : fd.write('</TABLE>')
files=glob.glob(self.dir+'/*csv')
for file in files :
log=ascii.read(file)
if htmlfile is not None :
fp.write('<h3>{:s}'.format(file))
fp.write('<TABLE BORDER=2>\n')
for row in log :
fp.write('<TR>')
for col in row :
fp.write('<TD>')
fp.write(str(col))
fp.write('</TABLE>\n')
if htmlfile is not None : html.tail(fp)
return tab
[docs] def movie(self,ims,display=None,out='movie.mp4',channel=0,
min=None, max=None,box=None,text=True,fps=None) :
"""
Create animated gif of images
Parameters
----------
ims : list of image numbers.
display : pyvista TV object
out : output file name
"""
if display == None :
raise ValueError('you must specify a pyvista TV object with display=')
files = []
for i,im in enumerate(ims) :
display.clear()
a=self.rd(im,channel=channel)
if box is not None :
a=image.window(a,box,header=a.header)
display.tv(a,min=min,max=max,draw=False)
y,x=a.data.shape
try:
if text: display.tvtext(x//2,y*3//4,'{:d} {:s} {:f}'.format(
im,a.header['DATE-OBS'],a.header['EXPTIME']),color='r')
display.savefig('tmpimage{:d}.png'.format(i))
files.append('tmpimage{:d}.png'.format(i))
except:
print('error at file: ', im,' stopping there')
continue
import imageio
with imageio.get_writer(out,mode='I',format='FFMPEG',fps=fps) as writer :
for filename in files :
tmpimage = imageio.imread(filename)
os.remove(filename)
writer.append_data(tmpimage)
[docs] def reduce(self,num,channel=None,ext=0,
crbox=None,crsig=5,objlim=5,sigfrac=0.3,
bias=None,dark=None,flat=None,
scat=None,scat_smooth=None,badpix=None,solve=False,return_list=False,display=None,
trim=True,seeing=2,utr=False) :
""" Reads data from disk, and performs reduction steps
as determined from command line parameters
Parameters
----------
id : int or str
Number or string specifying file to read. If a number,
the filename will be constructed based on dir and formstr
attributed of Reducer object. Without any additional
command-line arguments, data will be read, overscan
subtracted, and uncertainty array populated based
on gain and readout noise in Reducer attributes
display : TV object, default=None
if specified, pyvista TV object to display data in as
various reduction steps are taken
channel : int, default= None
if specified, channel to reduce if instrument is
multi-channel (multi-file), otherwise all channels
will be read/reduced
bias : Data object, default= None
if specified, superbias frame to subtract
dark : Data object, default= None
if specified, superdark frame to subtract
flat : Data object, default= None
if specified, superflat frame to divide by
crbox : list or str, default=None
if specified, parameter to pass to CR rejection routine,
either 2-element list giving shape of box for median
filter, or 'lacosmic'
scat : integer, default=None
if specified, do scattered light correction, getting
estimate every scat pixels
badpix : int, default=None
if specified, set masked pixels to specified value
trim : bool, default=True
trim image after calibration, irrelevant if red.trimg=True
solve : bool, default=False
attempt to plate-solve image after reduction, requires
local astrometry.net
seeing : float, default=2
seeing used to find stars if solve=True
"""
im=self.rd(num,dark=dark,channel=channel,utr=utr,ext=ext)
self.overscan(im,display=display,channel=channel)
if self.trim : im=self.trimimage(im,trimimage=self.trim)
im=self.bias(im,superbias=bias)
im=self.dark(im,superdark=dark)
im=self.crrej(im,crbox=crbox,crsig=crsig,objlim=objlim,sigfrac=sigfrac,
display=display)
self.scatter(im,scat=scat,display=display,smooth=scat_smooth)
im=self.flat(im,superflat=flat,display=display)
self.badpix_fix(im,val=badpix)
if trim and display is not None: display.tvclear()
if trim and not self.trim : im=self.trimimage(im,trimimage=trim)
if solve :
im=self.platesolve(im,display=display,scale=self.scale,seeing=seeing)
if return_list and type(im) is not list : im=[im]
return im
[docs] def rd(self,num, ext=0, dark=None, channel=None, utr=False) :
""" Read an image
Parameters
----------
num (str or int) : name or number of image to read
Returns
-------
image (Data ) : Data object, but noise will be incorrect without overscan subtraction
"""
out=[]
# loop over different channels (if any)
idet=0
if channel is not None : channels=[channel]
else : channels = range(len(self.formstr))
for chan in channels :
form = self.formstr[chan]
gain = self.gain[chan]
rn = self.rn[chan]
# find the files that match the directory/format
if isinstance(num,(int,np.int64)) :
search=self.dir+'/'+self.root+form.format(num)
elif type(num) is str or type(num) is np.str_ :
if num.find('/') >= 0 :
search=num
else :
search=self.dir+'/'+num
else :
print('stopping in rd... num:',num)
pdb.set_trace()
file=glob.glob(search)
if len(file) == 0 :
file=glob.glob(search+'.gz')
if len(file) == 0 :
raise ValueError('cannot find file matching: '+search,num)
return
elif len(file) > 1 :
if self.verbose : print('more than one match found, using first!',file)
file=file[0]
# read the file into a Data object
if self.verbose : print(' Reading file: {:s}'.format(file))
if 'APOGEE' in self.inst :
if utr : im=apogee.utr(file,dark=dark)
else : im=apogee.cds(file,dark=dark)
else :
try : im=Data.read(file,hdu=ext,unit=u.dimensionless_unscaled)
except : raise RuntimeError('Error reading file: {:s}'.format(file))
im.data = im.data.astype(np.float32)
im.header['FILE'] = os.path.basename(file)
if 'OBJECT' not in im.header or im.header['OBJECT'] == '':
if 'OBJNAME' in im.header and im.header['OBJNAME'] != '' :
im.header['OBJECT'] = im.header['OBJNAME']
else :
try : im.header['OBJECT'] = im.header['FILE']
except KeyError : print('No OBJECT, OBJNAME, or FILE in header')
# fix for RA/DEC for MaximDL headers
if 'RA' not in im.header :
try: im.header['RA'] = im.header['OBJCTRA']
except : print('no RA or OBJCTRA found in {:s}'.format(file))
if 'DEC' not in im.header :
try: im.header['DEC'] = im.header['OBJCTDEC']
except : print('no DEC or OBJCTDEC found in {:s}'.format(file))
# Add uncertainty (will be in error if there is an overscan, but redo with overscan subraction later)
data=copy.copy(im.data)
data[data<0] = 0.
if isinstance(gain,list) :
im.uncertainty = StdDevUncertainty(np.sqrt( data/gain[0] + (rn/gain[0])**2 ))
else :
im.uncertainty = StdDevUncertainty(np.sqrt( data/gain + (rn/gain)**2 ))
# Add mask
pixmask = bitmask.PixelBitMask()
if self.bitmask is not None : im.bitmask = self.bitmask
else : im.bitmask = np.zeros(im.data.shape,dtype=np.short)
if self.badpix is not None :
for badpix in self.badpix[idet] :
badpix.setval(im.bitmask,pixmask.getval('BADPIX'))
if self.saturation is not None :
bd = np.where(im.data >= self.saturation[chan])
im.bitmask[bd] |= pixmask.getval('SATPIX')
out.append(im)
idet+=1
# return the data
if len(out) == 1 : return out[0]
else : return out
[docs] def overscan(self,im,display=None,channel=None) :
""" Overscan subtraction
"""
if self.biastype < 0 : return im
if type(im) is not list : ims=[im]
else : ims = im
if channel is not None :
gains = [self.gain[channel]]
rns = [self.rn[channel]]
biasboxes = [self.biasbox[channel]]
biasregions = [self.biasregion[channel]]
else :
gains = self.gain
rns = self.rn
biasboxes = self.biasbox
biasregions = self.biasregion
if self.headerbox : self.boxfromheader(im,2,2)
for ichan,(im,gain,rn,biasbox,biasregion) in \
enumerate(zip(ims,gains,rns,biasboxes,biasregions)) :
if display is not None :
display.tv(im)
ax=display.plotax2
ax.cla()
if self.namp == 1 :
ampboxes = [biasbox]
databoxes = [biasregion]
else :
ampboxes = biasbox
databoxes = biasregion
im.data = im.data.astype(np.float32)
color=['m','g','b','r','c','y']
for ibias,(databox,ampbox) in enumerate(zip(databoxes,ampboxes)) :
if display is not None :
display.tvbox(0,0,box=ampbox,color=color[ibias])
if type(databox) == image.BOX :
display.tvbox(0,0,box=databox,color=color[ibias],ls='--',lw=2)
ax.plot(np.arange(databox.ymin,databox.ymax),
np.mean(im.data[databox.ymin:databox.ymax,
ampbox.xmin:ampbox.xmax], axis=1),color=color[ibias])
if self.biastype == 0 :
b=ampbox.mean(im.data)
if self.verbose: print(' subtracting overscan: ', b)
if display is not None :
ax.plot([databox.ymin,databox.ymax],[b,b],color='k')
ax.text(0.05,0.95,'Overscan mean',transform=ax.transAxes)
ax.set_xlabel('Row')
display.fig.canvas.draw_idle()
plt.draw()
if type(databox) == image.BOX :
im.data[databox.ymin:databox.ymax+1,
databox.xmin:databox.xmax+1] = \
im.data[databox.ymin:databox.ymax+1,
databox.xmin:databox.xmax+1].astype(np.float32)-b
else :
im.data = im.data.astype(np.float32)-b
im.header.add_comment('subtracted overscan: {:f}'.format(b))
elif self.biastype == 1 :
over=np.median(im.data[databox.ymin:databox.ymax+1,
ampbox.xmin:ampbox.xmax],axis=1)
#boxcar = Box1DKernel(self.biasavg)
#over=convolve(over,boxcar,boundary='extend')
over=scipy.signal.medfilt(over,kernel_size=self.biasavg)
if display is not None :
ax.plot(np.arange(databox.ymin,databox.ymax+1),
over,color='k')
ax.set_xlabel('Row')
over2d=image.stretch(over,ncol=databox.ncol())
if self.verbose: print(' subtracting overscan vector ')
im.data[databox.ymin:databox.ymax+1,
databox.xmin:databox.xmax+1] = \
im.data[databox.ymin:databox.ymax+1,
databox.xmin:databox.xmax+1].astype(np.float32) - over2d
# if we have separate gains, multiply by them here
if isinstance(gain,list) :
print(' multiplying by gain: ', gain[ibias])
if type(databox) == image.BOX :
im.data[databox.ymin:databox.ymax+1,
databox.xmin:databox.xmax+1] = \
im.data[databox.ymin:databox.ymax+1,
databox.xmin:databox.xmax+1].astype(np.float32)*gain[ibias]
else :
im.data = im.data.astype(np.float32)*gain[ibias]
if display is not None :
display.tv(im)
getinput(" See bias box (solid outlines applied to dashed regions of the same color), and cross section. ",display)
# Add uncertainty (redo from scratch after overscan)
data=copy.copy(im.data)
data[data<0] = 0.
if isinstance(gain,list) :
im.uncertainty = StdDevUncertainty(np.sqrt( data + rn**2 ))
else :
im.uncertainty = StdDevUncertainty(np.sqrt( data/gain + (rn/gain)**2 ))
[docs] def trimimage(self,inim,trimimage=False) :
""" Trim image by masking non-trimmed pixels
May need to preserve image size to match reference/calibration frames, etc.
"""
if type(inim) is not list : ims=[inim]
else : ims = inim
if self.headerbox : self.boxfromheader(inim,2,2)
outim = []
for im,trimbox,outbox in zip(ims,self.trimbox,self.outbox) :
if self.namp == 1 :
boxes = [trimbox]
outboxes = [outbox]
else :
boxes = trimbox
outboxes = outbox
if im.bitmask is None :
print('adding a bitmask...')
im.add_bitmask(np.zeros(im.data.shape,dtype=np.uintc))
pixmask=bitmask.PixelBitMask()
for box in boxes :
box.setbit(im.bitmask,pixmask.getval('INACTIVE_PIXEL'))
if trimimage :
xmax=0
ymax=0
for box,outbox in zip(boxes,outboxes) :
xmax = np.max([xmax,outbox.xmax])
ymax = np.max([ymax,outbox.ymax])
z=np.zeros([ymax+1,xmax+1])
out = Data(data=z,uncertainty=z,bitmask=z.astype(np.uintc),
unit=u.dimensionless_unscaled,header=im.header)
for box,outbox in zip(boxes,outboxes) :
out.data[outbox.ymin:outbox.ymax+1,outbox.xmin:outbox.xmax+1] = \
im.data[box.ymin:box.ymax+1,box.xmin:box.xmax+1]
out.uncertainty.array[outbox.ymin:outbox.ymax+1,outbox.xmin:outbox.xmax+1] = \
im.uncertainty.array[box.ymin:box.ymax+1,box.xmin:box.xmax+1]
out.bitmask[outbox.ymin:outbox.ymax+1,outbox.xmin:outbox.xmax+1] = \
im.bitmask[box.ymin:box.ymax+1,box.xmin:box.xmax+1]
outim.append(out)
if trimimage:
if len(outim) == 1 : return outim[0]
else : return outim
else : return inim
[docs] def bias(self,im,superbias=None) :
""" Superbias subtraction
"""
# only subtract if we are given a superbias!
if superbias is None : return im
# work with lists so that we can handle multi-channel instruments
if type(im) is not list : ims=[im]
else : ims = im
if type(superbias) is not list : superbiases=[superbias]
else : superbiases = superbias
out=[]
for im,bias in zip(ims,superbiases) :
if self.verbose : print(' subtracting bias...')
corr = copy.deepcopy(im)
corr.data -= bias.data
corr.uncertainty.array = np.sqrt(corr.uncertainty.array**2+
bias.uncertainty.array**2)
out.append(corr)
if len(out) == 1 : return out[0]
else : return out
[docs] def dark(self,im,superdark=None) :
""" Superdark subtraction
"""
# only subtract if we are given a superdark!
if superdark is None : return im
if self.inst == 'APOGEE' : return im
# work with lists so that we can handle multi-channel instruments
if type(im) is not list : ims=[im]
else : ims = im
if type(superdark) is not list : superdarks=[superdark]
else : superdarks = superdark
out=[]
for im,dark in zip(ims,superdarks) :
if self.verbose : print(' subtracting dark...')
corr = copy.deepcopy(im)
exptime = corr.header['EXPTIME']
dark_exptime = dark.header['EXPTIME']
corr.data -= dark.data/dark_exptime*exptime
corr.uncertainty.array = np.sqrt(corr.uncertainty.array**2+
exptime**2*(dark.uncertainty.array/dark_exptime)**2)
out.append(corr)
if len(out) == 1 : return out[0]
else : return out
[docs] def flat(self,im,superflat=None,display=None) :
""" Flat fielding
"""
# only flatfield if we are given a superflat!
if superflat is None : return im
if type(im) is not list : ims=[im]
else : ims = im
if type(superflat) is not list : superflats=[superflat]
else : superflats = superflat
out=[]
for im,flat in zip(ims,superflats) :
if self.verbose : print(' flat fielding...')
if display is not None :
display.tv(im)
corr = copy.deepcopy(im)
corr.data /= flat.data
corr.uncertainty.array /= flat.data
out.append(corr)
if display is not None :
display.tv(corr,same=True)
#plot central crossections
display.plotax2.cla()
dim=corr.data.shape
col = int(dim[1]/2)
row = corr.data[:,col]
display.plotax2.plot(row)
min,max=image.minmax(row,low=5,high=5)
display.plotax2.set_ylim(min,max)
display.plotax2.set_xlabel('row')
display.plotax2.text(0.05,0.95,'Column {:d}'.format(col),
transform=display.plotax2.transAxes)
#display.plotax2.cla()
row = int(dim[0]/2)
col = corr.data[row,:]
min,max=image.minmax(col,low=10,high=10)
display.plotax2.plot(col)
display.plotax2.set_xlabel('col')
display.plotax2.text(0.05,0.95,'Row {:d}'.format(row),
transform=display.plotax2.transAxes)
display.plotax2.set_ylim(min,max)
getinput(" See flat-fielded image and original with - (minus) key.",display)
if len(out) == 1 : return out[0]
else : return out
[docs] def scatter(self,inim,scat=None,display=None,smooth=None,smooth2d=31,transpose=False) :
""" Removal of scattered light (for multi-order/object spectrograph)
Remove scattered light by looking for valleys in cross-sections across traces
and fitting a 2D surface to the low points. Cross-sections are smoothed before
finding the valleys, and interpolated surface is also smoothed. Some attempt
is made to reject outliers before fitting the final surface, which is subtracted
from the image.
Parameters
----------
im : Data object
input image to correct
transpose : bool, default=False
set to true if spectra run along columns
scat : integer, default=None
get scattered light measurements every scat pixels. If None, no correction
smooth : integer, default=3
boxcar width for smoothing profile perpendicular to traces before looking
for valleys
display : TV object, default=None
if set, show the scattered light measurements
"""
if scat is None : return
if smooth is None :
if self.scat_smooth is None : smooth=1
else : smooth=self.scat_smooth
if transpose :
im = Data(data=inim.data.T)
else :
im = inim
print(' estimating scattered light ...',smooth)
kernel = Box1DKernel(smooth)
ipoints=[]
points=[]
values=[]
nrows = im.data.shape[0]
ncols = im.data.shape[-1]
# find minima in each group of columns, and save location and value
for icol,col in enumerate(range(scat//2,ncols,scat)) :
print(' column: {:d}'.format(col),end='\r')
# take mean of surrounding columns, and smooth
tmp=convolve(np.median(im.data[:,col-scat//2:col+scat//2],axis=1),kernel)
yscat = scipy.signal.find_peaks(-tmp)[0]
for y in yscat :
if im.mask is None or not im.mask[y,col] :
ipoints.append([y,icol])
points.append([y,col])
values.append(tmp[y])
# fit [nrow:ncols/scat] surface to these values
grid_x, grid_y = np.mgrid[0:nrows,0:icol+1]
grid_z=scipy.interpolate.griddata(ipoints,values,(grid_x,grid_y), method='nearest',
fill_value=0.)
# smooth along columns and reject outliers
kernel= Box1DKernel(scat)
for icol in range(grid_z.shape[1]) :
grid_z[:,icol] = convolve(grid_z[:,icol],kernel)
points_gd=[]
values_gd=[]
for ipoint,point,value in zip(ipoints,points,values) :
if (value < 1.05*grid_z[ipoint[0],ipoint[1]] and
value > 0.95*grid_z[ipoint[0],ipoint[1]]) :
points_gd.append(point)
values_gd.append(value)
# fit surface to the minimum values
#print(' fitting surface ...')
#grid_x, grid_y = np.mgrid[0:nrows,0:ncols]
#grid_z=scipy.interpolate.griddata(points,values,(grid_x,grid_y), method='cubic',
# fill_value=0.)
## smooth and reject outlying points
#boxcar = Box2DKernel(smooth2d)
#grid_z=convolve(scipy.interpolate.griddata(points,values,(grid_x,grid_y),
# method='cubic',fill_value=0.),boxcar)
## go back and try to reject outliers
#print(' rejecting points ...')
#points_gd=[]
#values_gd=[]
#for point,value in zip(points,values) :
# if value < 1.1*grid_z[point[0],point[1]] :
# points_gd.append(point)
# values_gd.append(value)
#
# refit surface, this time to full-sized image. Do it once with linear, and then
# again with nearest to use to fill in edges outside of the hull of points
print(' refitting surface ...')
grid_x, grid_y = np.mgrid[0:nrows,0:ncols]
#grid_z=convolve(scipy.interpolate.griddata(points_gd,values_gd,(grid_x,grid_y),
# method='cubic',fill_value=0.),boxcar)
grid_z=scipy.interpolate.griddata(points_gd,values_gd,(grid_x,grid_y),
method='linear')
nearest_grid_z=scipy.interpolate.griddata(points_gd,values_gd,(grid_x,grid_y),
method='nearest')
bd = np.where(np.isnan(grid_z))
grid_z[bd] = nearest_grid_z[bd]
if display is not None :
display.clear()
display.tv(im)
points=np.array(points)
display.ax.scatter(points[:,1],points[:,0],color='r',s=3)
points_gd=np.array(points_gd)
display.ax.scatter(points_gd[:,1],points_gd[:,0],color='g',s=3)
getinput(" See image with scattered light points",display)
display.clear()
display.tv(im)
display.tv(grid_z)
col=int(im.shape[-1]/2)
display.plotax2.cla()
display.plotax2.plot(im.data[:,col])
display.plotax2.plot(grid_z[:,col])
plt.draw()
getinput(" See scattered light image",display)
if transpose :
im.data -= grid_z.T
else :
im.data -= grid_z
[docs] def crrej(self,im,crbox=None,crsig=5,display=None,
objlim=5.,sigfrac=0.3,fsmode='median',inbkg=None) :
""" Cosmic ray rejection using spatial median filter or lacosmic.
If crbox is given as a 2-element list, then a box of this shape is
run over the image. At each location, the median in the box is determined.
For each pixel in the box, if the value is larger than crsig*uncertainty
(where uncertainty is taken from the input.uncertainty.array), the pixel
is replaced by the median. If crsig is a list, then multiple passes
are done with successive values of crsig (which should then be decreasing),
but only neighbors of previously flagged CRs are tested.
Affected pixels are flagged in input.bitmask
If crbox='lacosmic', the LA Cosmic routine, as implemented in
astroscrappy is run on the image, with default options,
but objlim, fsmode, and inbkg can be specified.
Parameters
----------
crbox : list, int shape of box to use for median filters, or 'lacosmic'
crsig : list/float, default 5, threshold for CR rejection if using spatial
median filter; if list, do multiple passes, with all passes after
first only on neighbors of previously flagged CRs
objlim : for LAcosmic, default=5
fsmod : for LAcosmic, default='median'
inbkg : for LAcosmic, default=None
display : None for no display, pyvista TV object to display
"""
if crbox is None: return im
if type(im) is not list : ims=[im]
else : ims = im
if type(crsig) is not list : nsigs=[crsig]
else : nsigs = crsig
out=[]
for i,(im,gain,rn,sat) in enumerate(zip(ims,self.gain,self.rn,self.saturation)) :
print(' starting CR rejection, may take some time ....')
for iter,nsig in enumerate(nsigs) :
if display is not None :
display.clear()
min,max=image.minmax(im,low=5,high=30)
display.tv(im.uncertainty.array,min=min,max=max)
display.tv(im,min=min,max=max)
if crbox == 'lacosmic':
if self.verbose :
print(' zapping CRs with astroscrappy detect_cosmics')
if isinstance(gain,list) : g=1.
else : g=gain
outim =copy.deepcopy(im)
crmask,outim.data =astroscrappy.detect_cosmics(im.data,
sigclip=crsig, sigfrac=sigfrac, objlim=objlim,
gain=g, readnoise=rn, satlevel=sat,
fsmode=fsmode)
else :
if self.verbose :
print(' Iteration {:d}, zapping CRs with filter [{:d},{:d}]...'.format(iter, *crbox))
if crbox[0]%2 == 0 or crbox[1]%2 == 0 :
raise ValueError('cosmic ray rejection box dimensions must be odd numbers...')
if crbox[0]*crbox[1] > 49 :
print('WARNING: large rejection box may take a long time to complete!')
tmp=input(" Hit c to continue anyway, else quit")
if tmp != 'c' : return
if iter == 0 : outim=copy.deepcopy(im)
image.zap(outim,crbox,nsig=nsig)
if iter > 0 :
# if not first iteration, only allow changes to neighbors of CRs
mask = np.where( image.smooth
(outim.bitmask&pixmask.getval('CRPIX'),[3,3]) == 0)
outim.data[mask[0],mask[1]] = im.data[mask[0],mask[1]]
if display is not None :
display.tv(outim,min=min,max=max)
display.tv(im.subtract(outim),min=min,max=max)
getinput(" See CRs and CR-zapped image and original using - key",display)
crpix = np.where(~np.isclose(im.data-outim.data,0.))
pixmask = bitmask.PixelBitMask()
outim.bitmask[crpix] |= pixmask.getval('CRPIX')
out.append(outim)
if len(out) == 1 : return out[0]
else : return out
[docs] def badpix_fix(self,im,val=0.) :
""" Replace bad pixels
"""
if val is None : return
if type(im) is not list : ims=[im]
else : ims = im
for i, im in enumerate(ims) :
if im.mask is not None :
bd=np.where(im.mask)
ims[i].data[bd[0],bd[1]] = val
ims[i].uncertainty.array[bd[0],bd[1]] = np.inf
[docs] def platesolve(self,im,scale=0.46,seeing=2,display=None,thresh=10) :
""" try to get plate solution with astrometry.net
"""
if self.verbose : print(' plate solving with local astrometry.net....')
# find stars
mad=np.nanmedian(np.abs(im-np.nanmedian(im)))
daofind=DAOStarFinder(fwhm=seeing/scale,threshold=thresh*mad)
objs=daofind(im.data-np.nanmedian(im.data))
if len(objs) == 0 :
raise RuntimeError('no stars detected. Maybe try setting seeing?')
else : print('found ',len(objs),' objects ')
try: objs.sort(['mag'])
except: pdb.set_trace()
gd=np.where((objs['xcentroid']>50)&(objs['ycentroid']>50)&
(objs['xcentroid']<im.data.shape[1]-50)&
(objs['ycentroid']<im.data.shape[0]-50))[0]
if display is not None :
display.tv(im)
objs['x'] = objs['xcentroid']
objs['y'] = objs['ycentroid']
stars.mark(display,objs[gd],exit=True)
tmpfile=tempfile.mkstemp(dir='./')
objs.write(os.path.basename(tmpfile[1])+'xy.fits')
# solve with astrometry.net routines
ra=im.header['RA'].replace(' ',':')
dec=im.header['DEC'].replace(' ',':')
rad=15*(float(ra.split(':')[0])+float(ra.split(':')[1])/60.+float(ra.split(':')[2])/3600.)
decd=(float(dec.split(':')[0])+float(dec.split(':')[1])/60.+float(dec.split(':')[2])/3600.)
cmdname=shutil.which('solve-field')
if cmdname is None :
cmdname=shutil.which('/usr/local/astrometry/bin/solve-field')
if cmdname is None :
print('cannot find local astrometry.net solve-field routine')
pdb.set_trace()
cmd=(cmdname+
' --scale-units arcsecperpix --scale-low {:f} --scale-high {:f}'+
' -X xcentroid -Y ycentroid -w 4800 -e 3000 --overwrite'+
' --ra {:f} --dec {:f} --radius 3 {:s}xy.fits').format(
.9*scale,1.1*scale,rad,decd,os.path.basename(tmpfile[1]))
print(cmd)
ret = subprocess.call(cmd.split())
if display is not None :
getinput(" See plate solve stars",display)
display.tvclear()
# get WCS
try:
header=fits.open(os.path.basename(tmpfile[1])+'xy.wcs')[0].header
w=WCS(header)
im.wcs=w
except :
for f in glob.glob(os.path.basename(tmpfile[1])+'*') : os.remove(f)
raise RuntimeError('plate solve FAILED')
for f in glob.glob(os.path.basename(tmpfile[1])+'*') : os.remove(f)
return im
[docs] def noise(self,pairs,rows=None,cols=None,nbox=200,display=None,channel=None,levels=None,skip=1) :
""" Noise characterization from image pairs
"""
title=''
fig,ax=plots.multi(1,3,hspace=0.001,sharex=True)
colors=['r','g','b','c','m','y','k']
for icolor,pair in enumerate(pairs) :
mean=[]
std=[]
iqr=[]
n=[]
title+='[{:s},{:s}]'.format(str(pair[0]),str(pair[1]))
a=self.reduce(pair[0],channel=channel)
b=self.reduce(pair[1],channel=channel)
diff=a.data[::skip,::skip]-b.data[::skip,::skip]
avg=(a.data[::skip,::skip]+b.data[::skip,::skip])/2
if display != None :
display.tv(avg)
display.tv(diff)
if levels is not None :
for i,level in enumerate(levels[0:-1]) :
j=np.where((avg.flatten() > level) & (avg.flatten() <= levels[i+1]))[0]
if len(j) > 100 :
mean.append((level+levels[i+1])/2.)
std0 = diff.flatten()[j].std()
gd = np.where(np.abs(diff.flatten()[j]) < 5*std0)[0]
std.append(diff.flatten()[j[gd]].std())
n.append(len(j))
q1 = np.percentile(diff.flatten()[j], 25)
q3 = np.percentile(diff.flatten()[j], 75)
iqr.append((q3 - q1)/1.35)
print((level+levels[i+1])/2.,diff.flatten()[j].std(),(q3-q1)/1.35,len(j))
else :
if rows is None : rows=np.arange(0,a.shape[0],nbox)
if cols is None : cols=np.arange(0,a.shape[1],nbox)
for irow,r0 in enumerate(rows[0:-1]) :
for icol,c0 in enumerate(cols[0:-1]) :
box = image.BOX(xr=[cols[icol],cols[icol+1]],
yr=[rows[irow],rows[irow+1]])
if display != None :
display.tvbox(0,0,box=box)
print(r0,c0,box.median(avg),box.stdev(diff))
mean.append(box.median(avg))
std.append(box.stdev(diff))
iqr.append(box.iqr(diff))
n.append((cols[icol+1]-cols[icol])*(rows[irow+1]-rows[irow]))
mean=np.array(mean)
std=np.array(std)
iqr=np.array(iqr)
n=np.array(n)
plots.plotp(ax[0],mean,std**2,yt=r'$\sigma^2$',size=30,color=colors[icolor%7])
plots.plotp(ax[0],mean,iqr**2,yt=r'$\sigma^2$',size=30,color=colors[icolor%7])
plots.plotp(ax[1],mean,2*mean/std**2,yt=r'G = 2 C / $\sigma^2$',size=20,color=colors[icolor%7])
plots.plotp(ax[1],mean,2*mean/iqr**2,yt=r'G = 2 C / $\sigma^2$',size=20,color=colors[icolor%7])
plots.plotp(ax[2],mean,np.log10(n),xt='counts (C)',yt='log(Npix)',size=20,color=colors[icolor%7])
fig.suptitle(title+' channel: {:d}'.format(channel))
pdb.set_trace()
[docs] def display(self,display,id) :
""" Reduce and display image
"""
im = self.reduce(id)
if type(im) is not list : ims=[im]
else : ims = im
for i, im in enumerate(ims) :
display.tv(im)
[docs] def write(self,im,name,overwrite=True,png=False,wave=None) :
""" write out image, deal with multiple channels
"""
if type(im) is not list : ims=[im]
else : ims = im
for i,frame in enumerate(ims) :
if self.nchip > 1 : outname = name.replace('.fits','')+'_'+self.channels[i]+'.fits'
else : outname = name
frame.write(outname,overwrite=overwrite)
if png :
#backend=matplotlib.get_backend()
#matplotlib.use('Agg')
if wave is not None :
fig=plt.figure(figsize=(18,6))
for row in frame.data.size[0] :
plt.plot(wave[row],frame.data[row])
plt.xlabel('Wavelength')
plt.ylabel('Flux')
else :
fig=plt.figure(figsize=(12,9))
vmin,vmax=image.minmax(frame.data)
plt.imshow(frame.data,vmin=vmin,vmax=vmax,
cmap='Greys_r',interpolation='nearest',origin='lower')
plt.colorbar(shrink=0.8)
plt.axis('off')
fig.tight_layout()
fig.savefig(name.replace('.fits','.png'))
plt.close()
#matplotlib.use(backend)
[docs] def getcube(self,ims,**kwargs) :
""" Read images into data cube
"""
# create list of images, reading and overscan subtracting
allcube = []
for im in ims :
if isinstance(im,dataclass.Data) :
data = im
elif isinstance(im,list) :
#multi-channel instrument
data = im
else :
data = self.reduce(im, **kwargs)
allcube.append(data)
# if just one frame, put in 2D list anyway so we can use same code, allcube[nframe][nchip]
if self.nchip == 1 :
allcube=[list(i) for i in zip(*[allcube])]
return allcube
[docs] def sum(self,ims, return_list=False, **kwargs) :
""" Coadd input images
"""
return self.combine(ims,type='sum',return_list=return_list,**kwargs)
[docs] def combine(self,ims,normalize=False,display=None,div=True,
return_list=False, type='median',sigreject=5,**kwargs) :
""" Combine images from list of images
"""
# create list of images, reading and overscan subtracting
allcube = self.getcube(ims,**kwargs)
nframe = len(allcube)
# do the combination
out=[]
for chip in range(self.nchip) :
datacube = []
varcube = []
maskcube = []
allnorm = []
for im in range(nframe) :
if normalize :
try:
norm=self.normbox[chip].mean(allcube[im][chip].data)
except:
norm=np.nanmean(allcube[im][chip].data)
allnorm.append(norm)
allcube[im][chip].data /= norm
allcube[im][chip].uncertainty.array /= norm
datacube.append(allcube[im][chip].data)
varcube.append(allcube[im][chip].uncertainty.array**2)
maskcube.append(allcube[im][chip].bitmask)
if type == 'median' :
if self.verbose: print(' combining data with median....')
med = np.median(np.array(datacube),axis=0)
sig = 1.253 * np.sqrt(np.mean(np.array(varcube),axis=0)/nframe)
elif type == 'mean' :
if self.verbose: print(' combining data with mean....')
med = np.mean(np.array(datacube),axis=0)
sig = np.sqrt(np.sum(np.array(varcube),axis=0)/nframe)
elif type == 'sum' :
if self.verbose: print(' combining data with sum....')
med = np.sum(np.array(datacube),axis=0)
sig = np.sqrt(np.mean(np.array(varcube),axis=0)/nframe)
elif type == 'reject' :
datacube = np.array(datacube)
if self.verbose: print(' combining data with rejection....')
med = np.median(datacube,axis=0)
bd=np.where(datacube>med+sigreject*np.sqrt(np.array(varcube)))
datacube[bd]=np.nan
med = np.nanmean(datacube,axis=0)
sig = np.sqrt(np.nanmean(np.array(varcube),axis=0)/nframe)
else :
raise ValueError('no combination type: {:s}'.format(type))
if self.verbose: print(' calculating uncertainty....')
#mask = np.any(maskcube,axis=0)
mask = np.bitwise_or.reduce(maskcube,axis=0)
comb=Data(med.astype(np.float32),header=allcube[im][chip].header,
uncertainty=StdDevUncertainty(sig.astype(np.float32)),
bitmask=mask,unit=u.dimensionless_unscaled)
if normalize: comb.meta['MEANNORM'] = np.array(allnorm).mean()
out.append(comb)
# display final combined frame and individual frames relative to combined
if display :
for i,f in enumerate(comb) :
comb.header['OBJECT'] = 'Combined frame'
display.clear()
display.tv(comb,sn=True)
display.tv(comb)
if comb.mask is not None :
gd=np.where(comb.mask == False)
elif comb.bitmask is not None :
pixmask=bitmask.PixelBitMask()
gd=np.where((comb.bitmask&pixmask.badval())==0)
else :
gd=np.where(med>0)
min,max=image.minmax(med[gd[0],gd[1]],low=10,high=10)
display.plotax2.hist(med[gd[0],gd[1]],
bins=np.linspace(min,max,100),histtype='step')
display.fig.canvas.draw_idle()
getinput(" See final image, use - key for S/N image.",display)
for i,im in enumerate(ims) :
min,max=image.minmax(med[gd[0],gd[1]],low=5,high=5)
display.fig.canvas.draw_idle()
if div :
display.plotax2.hist((allcube[i][chip].data/med)[gd[0],gd[1]],
bins=np.linspace(0.8,1.2,100),histtype='step')
display.tv(allcube[i][chip].data/med,min=0.8,max=1.2,
object='{} / master'.format(im))
getinput(" see image: {} divided by master".format(allcube[i][chip].header['FILE']),display)
else :
delta=5*self.rn[chip]
display.plotax2.hist((allcube[i][chip].data-med)[gd[0],gd[1]],
bins=np.linspace(-delta,delta,100),histtype='step')
display.tv(allcube[i][chip].data-med,min=-delta,max=delta,
object='{} - master'.format(im))
getinput(" see image: {} minus master".format(allcube[i][chip].header['FILE']),display)
# return the frame
if len(out) == 1 :
if return_list : return [out[0]]
else : return out[0]
else : return out
[docs] def mkbias(self,ims,display=None,scat=None,type='median',sigreject=5,
trim=False) :
""" Driver for superbias combination (no superbias subtraction no normalization)
ims : list of frames to combine
display : TV object, default= None
if specified, displays bias and individual frames-bias for inspection
type : str, default='median'
combine method
sigreject : float
rejection threshold for combine type='reject', otherwise ignored
"""
bias= self.combine(ims,display=display,div=False,scat=scat,trim=trim,
type=type,sigreject=sigreject)
for i,f in enumerate(bias) :
bias[i].header['OBJECT'] = 'Combined bias'
return bias
[docs] def mkdark(self,ims,ext=0,bias=None,display=None,scat=None,trim=False,
type='median',sigreject=5,clip=None) :
""" Driver for superdark combination (no normalization)
Parameters
----------
ims : list of frames to combine
display : TV object, default= None
if specified, displays dark and individual frames-dark for inspection
bias : Data object, default=None
if specified, superbias to subtract before combining darks
type : str, default='median'
combine method
sigreject : float
rejection threshold for combine type='reject', otherwise ignored
clip : float, default=None
if specified, set all values in output dark < clip*uncertainty to zero in master dark
"""
dark= self.combine(ims,ext=ext,bias=bias,display=display,trim=trim,
div=False,scat=scat,type=type,sigreject=sigreject)
for i,f in enumerate(dark) :
dark[i].header['OBJECT'] = 'Combined dark'
if clip != None:
low = np.where(dark.data < clip*dark.uncertainty.array)
dark.data[low] = 0.
return dark
[docs] def mkflat(self,ims,bias=None,dark=None,scat=None,display=None,trim=False,ext=0,
type='median',sigreject=5,spec=False,width=101,littrow=False,normalize=True,
snmin=50,clip=None) :
""" Driver for superflat combination, with superbias if specified, normalize to normbox
Parameters
----------
ims : list of frames to combine
display : TV object, default= None
if specified, displays flat and individual frames/flat for inspection
bias : Data object, default=None
if specified, superbias to subtract before combining flats
dark : Data object, default=None
if specified, superdark to subtract before combining flats
scat :
type : str, default='median'
combine method
sigreject : float
rejection threshold for combine type='reject', otherwise ignored
spec : bool, default=False
if True, creates "spectral" flat by taking out wavelength
shape
littrow : bool, default=False
if True, attempts to fit and remove Littrow ghost from flat,
LITTROW_GHOST bit must be set in bitmask first to identify
ghost location. Ignored if spec==False
width : int, default=101
window width for removing spectral shape for spec=True
Returns
-------
Data object with combined flat
"""
flat= self.combine(ims,ext=ext,bias=bias,dark=dark,normalize=normalize,trim=trim,
scat=scat,display=display,type=type,sigreject=sigreject)
for i,f in enumerate(flat) :
flat[i].header['OBJECT'] = 'Combined flat'
if spec :
flat = self.mkspecflat(flat,width=width,display=display,
littrow=littrow,snmin=snmin)
if clip is not None :
low = np.where(flat.data < clip*flat.uncertainty.array)
flat.data[low] = 1.
flat.uncertainty.array[low] = 1.
return flat
[docs] def mkspecflat(self,flats,width=101,display=None,littrow=False,snmin=50) :
""" Spectral flat takes out variation along wavelength direction
"""
if type(flats) is not list : flats=[flats]
boxcar = Box1DKernel(width)
sflat=[]
for flat in flats :
if self.transpose :
tmp = dataclass.transpose(flat)
else :
tmp = copy.deepcopy(flat)
nrows=tmp.data.shape[0]
# subtract Littrow ghost
if littrow :
print(' fitting/subtracting Littrow ghost')
pixmask=bitmask.PixelBitMask()
nmed2=10
fixed = copy.deepcopy(tmp.data)
for i in np.arange(nmed2,tmp.shape[0]-nmed2) :
try :
pix=np.where((tmp.bitmask[i]&pixmask.getval('LITTROW_GHOST')) > 0)[0]
yy=np.median(tmp.data[i-nmed2:i+nmed2,pix.min()-10:pix.max()+10],axis=0)
xx=np.arange(yy.size)
p0 = [.05,(pix.max()-pix.min())/2+10,(pix.max()-pix.min())/4.,1.,0.]
coeffs,var = curve_fit(spectra.gauss,xx,yy,p0)
xx=np.arange(tmp.data.shape[1])
if coeffs[3] > 0.5 :
coeffs[3] = 0.
coeffs[4] = 0.
coeffs[1] += pix.min()-10
fixed[i] -= spectra.gauss(xx,*coeffs)
except: pass
tmp.data = fixed
# limit region for spectral shape to high S/N area (slit width)
snmed = np.nanpercentile(tmp.data/tmp.uncertainty.array,[90],axis=1)
gdrows = np.where(snmed[0]>snmin)[0]
med = convolve(np.nanmedian(tmp[gdrows,:],axis=0),
boxcar,boundary='extend')
if display is not None :
display.tv(tmp)
display.plotax2.cla()
display.plotax2.plot(med)
for row in range(tmp.data.shape[0]) :
tmp.data[row,:] /= med
tmp.uncertainty.array[row,:] /= med
if display is not None :
display.tv(tmp,min=0.7,max=1.3)
if self.transpose :
sflat.append(dataclass.transpose(tmp))
else :
sflat.append(tmp)
if len(sflat) == 1 :return sflat[0]
else : return sflat
[docs]class DET() :
"""
Defines detector class
"""
def __init__(self,inst=None,gain=0.,rn=0.,biastype=0,biasbox=None,
normbox=None,trimbox=None,formstr='{:04d}') :
self.gain = gain
self.rn = rn
self.biastype = biastype
if biasbox is None : self.biasbox = image.BOX()
else : self.biasbox = biasbox
if normbox is None : self.normbox = image.BOX()
else : self.normbox = normbox
if trimbox is None : self.trimbox = image.BOX()
else : self.trimbox = trimbox
self.formstr = formstr
if inst == 'ARCES' :
# APO ARCES
self.gain=3.8
self.rn=7
self.biasbox.set(2052,2057,20,2028)
self.trimbox.set(200,1850,0,2047)
self.biasbox = [ image.BOX(xr=[1030,1050],yr=[0,2047]) ,
image.BOX(xr=[1030,1050],yr=[0,2047]) ]
self.trimbox = [ image.BOX(xr=[0,2047],yr=[0,1023]) ,
image.BOX(xr=[1030,1050],yr=[0,2047]) ]
elif inst == 'DIS' :
# DIS blue
self.gain=[1.71,1.71]
self.rn=[3.9,3.9]
self.biasbox = [ image.BOX(xr=[1030,1050],yr=[0,2047]) ,
image.BOX(xr=[1030,1050],yr=[0,2047]) ]
self.trimbox = [ image.BOX(xr=[0,2047],yr=[0,1023]) ,
image.BOX(xr=[1030,1050],yr=[0,2047]) ]
self.formstr=['{:04d}b','{:04d}r']
[docs]def look(tv,pause=True,files=None,list=None,min=None, max=None) :
"""
Displays series of files
"""
if files is None and list is None :
files=glob.glob(indir+'/*.fits*')
if list is not None :
f=open(indir+list,'r')
files=[]
for line in f :
files.extend(np.arange(int(line.split()[0]),int(line.split()[1])))
f.close()
pdb.set_trace()
for file in files :
hd=read(file,verbose=True,bias=False)
disp(tv,hd,min=min,max=max)
if pause :
pdb.set_trace()
[docs]def getfiles(type,listfile=None,filter=None,verbose=False) :
"""
Get all files of desired type from specified directory. If file is specified, read numbers from that file, else use IMAGETYP card
"""
if listfile is None :
list=[]
if verbose: print('directory: ', indir)
for file in glob.glob(indir+'/'+root+'*.fits*') :
head=fits.open(file)[0].header
#if verbose :
# print('file: ', file)
# print('IMAGETYPE: ', head['IMAGETYP'])
try :
if head['IMAGETYP'] == type :
if filter is None or head['FILTER'] == filter :
list.append(file)
except :
pass
else :
list=ascii.read(indir+listfile,Reader=ascii.NoHeader)['col1']
return list
[docs]def disp(tv,hd,min=None,max=None,sky=False) :
"""
Displays HDU or data array on specified ds9/tv device
"""
if type(tv) is pyds9.DS9 :
if isinstance(hd, (np.ndarray)) :
tv.set_np2arr(hd)
data=hd
elif isinstance(hd, (astropy.io.fits.hdu.hdulist.HDUList)) :
tv.set_pyfits(hd)
data=hd[0].data
elif isinstance(hd, (astropy.io.fits.hdu.image.PrimaryHDU)) :
tv.set_np2arr(hd.data)
data=hd.data
else :
print('Unrecognized data type for display: ',type(hd))
if sky :
skyval = mmm.mmm(data)
min = skyval[0]-5*skyval[1]
max = skyval[0]+20*skyval[1]
if min is not None and max is not None :
tv.set("scale limits {:5d} {:5d}".format(min,max))
else :
tv.set("scale histequ")
else :
if isinstance(hd, (astropy.io.fits.hdu.hdulist.HDUList)) :
tv.tv(hd[0],min=min,max=max)
else :
tv.tv(hd,min=min,max=max)
def mkmask(inst=None) :
if inst == 'ARCES' :
nrow=2068
ncol=2128
badpix = [ image.BOX(yr=[0,2067],xr=[0,130]), # left side
image.BOX(yr=[0,2067],xr=[2000,2127]), # right side
image.BOX(yr=[802,2000],xr=[787,787]),
image.BOX(yr=[663,2000],xr=[1682,1682]),
image.BOX(yr=[219,2067],xr=[101,101]),
image.BOX(yr=[1792,1835],xr=[1284,1284]),
image.BOX(yr=[1474,2067],xr=[1355,1355]),
image.BOX(yr=[1418,1782],xr=[1602,1602]),
image.BOX(yr=[1905,1943],xr=[1382,1382]),
image.BOX(yr=[1926,1974],xr=[1416,1416]),
image.BOX(yr=[1610,1890],xr=[981,981]),
image.BOX(yr=[1575,2067],xr=[490,490]),
image.BOX(yr=[1710,1722],xr=[568,568]),
image.BOX(yr=[1905,1981],xr=[653,654]),
image.BOX(yr=[1870,1925],xr=[853,853]) ]
elif inst == 'DIS' :
nrow=1078
ncol=2098
badpix = [ image.BOX(yr=[474,1077],xr=[803,803]),
image.BOX(yr=[0,1077],xr=[1196,1196]),
image.BOX(yr=[0,1077],xr=[0,0]) ]
mask = np.zeros([nrow,ncol],dtype=np.int16)
for box in badpix :
box.setval(mask,True)
hdulist=fits.HDUList()
hdulist.append(fits.PrimaryHDU(mask))
hdulist.writeto(inst+'_mask.fits',overwrite=True)
return mask
def _parse_hexdump_headers(output, keys, default=""):
""" Parse character string from file into keyword,value pairs
"""
meta = [default] * len(keys)
for line in output:
try:
key, value = line.split("=", 2)
except ValueError: # grep'd something in the data
continue
key = key.strip()
if key in keys:
index = keys.index(key)
if "/" in value:
# could be comment
*parts, comment = value.split("/")
value = "/".join(parts)
value = value.strip("' ")
meta[index] = value.strip()
return meta
def _get_meta(path, keys=['DATE-OBS'], head=20_000):
""" Read fits headers faster!!
"""
keys_str = "|".join(keys)
if '.gz' in path :
commands = " | ".join([
'zcat {path}','hexdump -n {head} -e \'80/1 "%_p" "\\n"\'' ,
'egrep "{keys_str}"'
]).format(head=head, path=path, keys_str=keys_str)
else :
commands = " | ".join([
'hexdump -n {head} -e \'80/1 "%_p" "\\n"\' {path}' ,
'egrep "{keys_str}"'
]).format(head=head, path=path, keys_str=keys_str)
outputs = subprocess.check_output(commands, shell=True, text=True)
outputs = outputs.strip().split("\n")
values = _parse_hexdump_headers(outputs, keys)
headers = dict()
headers.update(dict(zip(keys, values)))
return headers