# routines to deal with stellar images
import copy
import glob
import numpy as np
import pdb
import astropy
import os
import subprocess
import tempfile
import multiprocessing as mp
from astropy.io import fits
from astropy.table import Table, Column, vstack
from astropy.nddata import support_nddata
from astropy.time import Time
from pyvista import mmm, tv, spectra
from astropy.stats import sigma_clipped_stats
#from photutils import CircularAperture, CircularAnnulus,aperture_photometry
from photutils.aperture import ApertureStats, CircularAperture, CircularAnnulus,aperture_photometry
from photutils.detection import DAOStarFinder
from holtztools import plots,html
from pyvista import bitmask, centroid
import matplotlib.pyplot as plt
from scipy.optimize import curve_fit
from astroquery.sdss import SDSS
from astropy import coordinates as coord
import astropy.units as u
from collections import namedtuple
Center = namedtuple('Center', ['x', 'y', 'tot', 'meanprof','varprof'])
[docs]@support_nddata
def find(data,fwhm=4,thresh=4000,sharp=[0.,1.],round=[-2.,2.],brightest=None) :
""" Star finding using DAOStarfinder
Parameters
----------
data : array-like
2D data array
fwhm : optional, float
FWHM (pixels) for matching, default=4
thresh : optional, float
Threshold above background, default=4000
sharp : optional, [float,float]
Low and high bounds for sharpness, default=[0.,1.]
round : optional, [float,float]
Low and high bounds for roundness, default=[-2.,2]
"""
daofind = DAOStarFinder(fwhm=fwhm,threshold=thresh,brightest=brightest,
sharplo=sharp[0],sharphi=sharp[1],
roundlo=round[0],roundhi=round[1],
exclude_border=True)
try :
sources=daofind(data)
sources.rename_column('xcentroid','x')
sources.rename_column('ycentroid','y')
except :
return None
return sources
[docs]@support_nddata
def automark(data,stars,header=None,rad=3,func='centroid',plot=None,dx=0,dy=0,verbose=False,background=True) :
""" Recentroid existing star list on input data array
"""
if func == 'centroid' :
center = centroid.centroid
elif func == 'marginal_gfit' :
center = centroid.marginal_gfit
elif func == 'gfit2' :
center = centroid.gfit2
new=copy.deepcopy(stars)
if header is not None :
try:
dateobs=Time(header['DATE-OBS'],format='fits')
mjds=np.zeros(len(new))+dateobs.mjd
new.replace_column('MJD',Column(mjds,name='MJD',dtype=('f8')))
new['MJD'].info.format = '.6f'
except: dateobs=None
else : dateobs=None
for i,star in enumerate(new) :
try :
cent = center(data,star['x']+dx,star['y']+dy,rad,
plot=plot,verbose=verbose,background=background)
new[i]['x'] = cent.x
new[i]['y'] = cent.y
except :
new[i]['x'] = np.nan
new[i]['y'] = np.nan
return new
[docs]def mark(tv,stars=None,rad=3,auto=False,color='m',new=False,exit=False,id=False,func='centroid'):
""" Interactive mark stars on TV, or recenter current list
Args :
tv : TV instance from which user will mark stars
stars = : existing star table
auto= (bool) : if True, recentroid from existing position
radius= (int): radius to use for centroiding and for size of circles (default=3)
color= (char) : color for circles (default='m')
"""
if func == 'centroid' :
center = centroid.centroid
elif func == 'marginal_gfit' :
center = centroid.marginal_gfit
elif func == 'gfit2' :
center = centroid.gfit2
# clear display and mark current star list( if not new)
if new: tv.tvclear()
try: dateobs=Time(tv.hdr['DATE-OBS'],format='fits')
except: dateobs=None
cards=['EXPTIME','FILTER','AIRMASS']
types=['f4','S','f4']
if stars is None :
stars = Table(names=('id','x', 'y'), dtype=('i4','f4', 'f4'))
stars['x'].info.format = '.2f'
stars['y'].info.format = '.2f'
if dateobs is not None :
stars.add_column(Column([],name='MJD',dtype=('f8')))
stars['MJD'].info.format = '.6f'
for icard,card in enumerate(cards) :
try: stars.add_column(Column([],name=card,dtype=(types[icard])))
except: pass
stars['AIRMASS'].info.format = '.3f'
else :
if auto :
# with auto option, recentroid and update from current header
try:
for icard,card in enumerate(cards) :
try: stars[card] = tv.hdr[card]
except KeyError: stars.add_column(0.,name=card)
stars['AIRMASS'].info.format = '.3f'
try: stars['MJD'] = tv.hdr['MJD']
except KeyError:
stars.add_column(0.,name='MJD')
stars['MJD'].info.format = '.6f'
except: pass
for star in stars :
cent = center(tv.img,star['x'],star['y'],rad)
star['x'] = cent.x
star['y'] = cent.y
if dateobs is not None : star['MJD'] = dateobs.mjd
for icard,card in enumerate(cards) :
try: star[card] = tv.hdr[card]
except: pass
# display stars
for star in stars :
tv.tvcirc(star['x'],star['y'],rad,color=color)
if id : tv.tvtext(star['x'],star['y'],star['id'],color=color)
if exit : return stars
istar=len(stars)+1
print('Hit c near desired star(s) to get centroid position\n'+
' i to use integer position of cursor\n'+
' n to get ID of nearest star\n'+
' q or e to quit')
while True :
key,x,y = tv.tvmark()
if key == 'q' or key == 'e' : break
if key == 'i' :
# add at nearest integer pixel
x = round(x)
y = round(y)
elif key == 'c' :
# centroid around marked position
cent = centroid.centroid(tv.img,x,y,rad)
x = cent.x
y = cent.y
elif key == 'g' :
# gaussian fit to marginal distribution around marked position
cent = centroid.gfit2(tv.img,x,y,rad,plot=tv)
x = cent.x
y = cent.y
elif key == 'n' :
j=np.argmin((x-stars['x'])**2+(y-stars['y'])**2)
print(j)
print('Star: {:d} at ({:f},{:f})'.format(j,stars['x'][j],stars['y'][j]))
continue
else :
print('unimplemented key: ', key,' Try again')
continue
# add blank row, recognizing that we may have added other columns
stars.add_row()
stars[len(stars)-1]['id'] = istar
stars[len(stars)-1]['x'] = x
stars[len(stars)-1]['y'] = y
tv.tvcirc(x,y,rad,color=color)
if dateobs is not None :
stars[len(stars)-1]['MJD'] = dateobs.mjd
for icard,card in enumerate(cards) :
try: stars[len(stars)-1][card] = tv.hdr[card]
except: pass
istar+=1
return stars
[docs]def sdss_label(t,im,label='psfmag_g',maxmag=19, rad=0.25,xoff=0,yoff=-10) :
""" inital stab for getting SDSS coords and labelling image
"""
pos = coord.SkyCoord('{:s} {:s}'.format(
im.header['RA'].replace(' ',':'),im.header['DEC'].replace(' ',':')),
unit=(u.hour,u.degree))
sdss = SDSS.query_region(pos,radius=rad*u.degree,
photoobj_fields=['ra','dec','psfmag_g','psfmag_r','psfmag_i','type'])
x,y = im.wcs.wcs_world2pix(sdss['ra'],sdss['dec'],0)
sdss['x'] = x+xoff
sdss['y'] = y+yoff
sdss['id'] = sdss[label].astype('|S4')
t.tv(im)
gd = np.where((sdss[label] < maxmag) & (sdss[label]>0) & (sdss['type'] == 6) )[0]
mark(t,sdss[gd],rad=0,id=True,color='b')
@support_nddata
def add_coord(data,stars,wcs=None) :
if wcs is not None :
ra,dec=wcs.wcs_pix2world(stars['x'],stars['y'],0)
stars['RA'] = ra
stars['DEC'] = dec
[docs]@support_nddata
def photom(data,stars,uncertainty=None,mask=None,rad=[3],skyrad=None,display=None,
gain=1,rn=0,mag=True,utils=True,cum=False) :
""" Aperture photometry of input image with current star list
"""
# input radius(ii) in a list
if type(rad) is int or type(rad) is float: rad = [rad]
# uncertainty either specified in array, or use gain/rn, but not both
if uncertainty is not None :
if type(uncertainty) is not astropy.nddata.nduncertainty.StdDevUncertainty :
raise Exception('uncertainty must be StdDevUncertainty ')
uncertainty_data = uncertainty.array
else :
uncertainty_data = np.sqrt(data/gain + rn**2/gain**2)
# bad pixels from bitmask
if mask is not None :
pixmask = bitmask.PixMask()
bd = np.where(mask & pixmask.badpix())
data[bd[0],bd[1]] = np.nan
# Add new output columns to table, removing them first if they exist already
emptycol = Column( np.empty(len(stars))*np.nan )
for r in rad :
if type(r) is int : fmt='{:d}'
else : fmt='{:.1f}'
for suffix in ['','err'] :
name=('aper'+fmt+suffix).format(r)
try : stars.remove_column(name)
except: pass
stars.add_column(emptycol,name=name)
if mag : stars[name].info.format = '.3f'
else : stars[name].info.format = '.1f'
try : stars.remove_column('sky')
except: pass
stars.add_column(emptycol,name='sky')
stars['sky'].info.format = '.2f'
try : stars.remove_column('skysig')
except: pass
stars.add_column(emptycol,name='skysig')
stars['skysig'].info.format = '.2f'
try : stars.remove_column('peak')
except: pass
stars.add_column(emptycol,name='peak')
stars['peak'].info.format = '.1f'
cnts=[]
cntserr=[]
# Create pixel index arrays
pix = np.mgrid[0:data.shape[0],0:data.shape[1]]
ypix = pix[0]
xpix = pix[1]
# loop over each stars
for istar in range(len(stars)) :
star=stars[istar]
dist2 = (xpix-star['x'])**2 + (ypix-star['y'])**2
# get sky if requested
if skyrad is not None :
if utils :
try :
sky_aperture = CircularAnnulus((star['x'],star['y']),
r_in=skyrad[0], r_out=skyrad[1])
sky_mask = sky_aperture.to_mask(method='center')
mask=sky_mask.data
skymean, skymedian, skysig = sigma_clipped_stats(
sky_mask.multiply(data)[mask>0])
sky=skymean
sigsq=skysig**2
except :
sky = 0.
sigsq = 0.
else :
gd = np.where((dist2 > skyrad[0]**2) &
(dist2 < skyrad[1]**2) )
sky,skysig,skyskew,nsky = mmm.mmm(data[gd[0],gd[1]].flatten())
sigsq=skysig**2/nsky
if display is not None :
display.tvcirc(star['x'],star['y'],skyrad[0],color='g')
display.tvcirc(star['x'],star['y'],skyrad[1],color='g')
else :
sky =0.
skysig= 0.
sigsq =0.
# photutils aperture photometry handles pixels on the edges
apertures = [ CircularAperture((star['x'],star['y']),r) for r in rad ]
aptab = aperture_photometry(data,apertures,error=uncertainty_data)
# run ApertureStats on first aperture to get peak
apstats = ApertureStats(data,apertures[0],error=uncertainty_data)
# loop over apertures
for irad,r in enumerate(rad) :
#column names for sum and uncertainty
if type(r) is int : fmt='{:d}'
else : fmt='{:.1f}'
name=('aper'+fmt).format(r)
ename=('aper'+fmt+'err').format(r)
# pixels within aperture
area = np.pi*r**2
if utils :
tot = aptab['aperture_sum_{:d}'.format(irad)]
unc = aptab['aperture_sum_err_{:d}'.format(irad)]
else :
# here include pixel only if center is within aperture (not so good)
gd = np.where(dist2 < r**2)
# sum counts, subtract sky
tot =data[gd[0],gd[1]].sum()
# uncertainty
unc = np.sqrt(
(uncertainty_data[gd[0],gd[1]]**2).sum()+
sigsq*area)
# subtract sky, load columns
stars[istar][name] = tot - sky*area
stars[istar][ename] = unc
# instrumental magnitudes if requested
if mag :
stars[istar][ename] = (
1.086*(stars[istar][ename]/stars[istar][name]) )
try : stars[istar][name] = -2.5 * np.log10(stars[istar][name])
except : stars[istar][name] = 99.999
if display is not None :
display.tvcirc(star['x'],star['y'],r,color='b')
stars[istar]['sky'] = sky
stars[istar]['skysig'] = skysig
stars[istar]['peak'] = apstats.max
if cum :
cumulative=[]
for irad,r in enumerate(rad) :
if type(r) is int : fmt='{:d}'
else : fmt='{:.1f}'
name=('aper'+fmt).format(r)
cumulative.append(stars[name])
return stars, np.array(cumulative)
else :
return stars
[docs]def get(file) :
""" Read FITS table into internal photometry list """
stars=Table.read(file)
return stars
[docs]def save(file,stars) :
""" Save internal photometry list to FITS table"""
stars.write(file,overwrite=True)
[docs]def process_all(files,red,tab,bias=None,dark=None,flat=None,threads=8, display=None, solve=True,
seeing=15,rad=[3,5,7],skyrad=[10,15],cards=['EXPTIME','FILTER','AIRMASS']):
""" multi-threaded processing of files
"""
pars=[]
for file in files :
pars.append((file,red,tab,bias,dark,flat,solve,seeing,rad,skyrad,cards))
if threads == 0 :
output=[]
for par in pars :
output.append(process_thread(par))
else :
pool = mp.Pool(threads)
output = pool.map_async(process_thread, pars).get()
pool.close()
pool.join()
all=[]
for out in output :
all=vstack([all,out])
return all
def process_thread(pars) :
file = pars[0]
red = pars[1]
tab = pars[2]
bias = pars[3]
dark = pars[4]
flat = pars[5]
solve = pars[6]
seeing= pars[7]
rad= pars[8]
skyrad= pars[9]
cards= pars[10]
return process(file,red,tab,bias=bias,dark=dark,flat=flat,
rad=rad,skyrad=skyrad,seeing=seeing,cards=cards)
[docs]def process(file,red,tab,bias=None,dark=None,flat=None,display=None, solve=True,
seeing=15,rad=[3,5,7],skyrad=[10,15],cards=['EXPTIME','FILTER','AIRMASS']):
""" Process and do photometry on input file
"""
# work in temporary directory
cwd = os.getcwd()
try:
with tempfile.TemporaryDirectory(dir='./') as tempdir :
os.chdir(tempdir)
# process file
a=red.reduce(file,dark=dark,bias=bias,flat=flat,solve=solve,
seeing=seeing,display=display)
dateobs=Time(a.header['DATE-OBS'],format='fits')
# get x,y positions from RA/DEC and load into photometry table
x,y=a.wcs.wcs_world2pix(tab['RA'],tab['DEC'],0)
phot=copy.copy(tab)
phot['x']=x
phot['y']=y
# re-centroid stars
if display is not None :
display.tv(a)
mark(display,phot,exit=True,auto=False,color='r',new=True,
rad=seeing/red.scale)
mark(display,phot,exit=True,auto=True,color='g',rad=seeing/red.scale)
else :
for star in phot :
x,y = centroid(a.data,star['x'],star['y'],seeing/red.scale)
star['x'] = x
star['y'] = y
# do photometry
try : phot=photom(a,phot,rad=rad,skyrad=skyrad,display=display)
except :
print('Error with photom')
phot.add_column(Column([file]*len(tab),name='FILE',dtype=str))
for card in cards :
phot[card] = [a.header[card]]*len(tab)
phot['MJD'] = [Time(a.header['DATE-OBS'],format='fits').mjd]*len(tab)
#except :
# print('Error in process')
# pdb.set_trace()
# phot=copy.copy(tab)
os.chdir(cwd)
except OSError :
print('OSError')
return phot
def dostar(red,obj,date,filts=['SR+D25'],seeing=12,dark=None,flats=[None],solve=True,
rad=np.arange(5,45,5), skyrad=[50,60],clobber=False,threads=32,display=None) :
if dark is None : print('No dark frame')
try : tab = Table.read(obj+'.fits')
except : tab=None
grid=[]
for filt,flat in zip(filts,flats) :
if flat is None : print('No flat frame')
files= glob.glob(red.dir+'/*'+obj+'*'+filt+'*')
files.sort()
if tab == None :
print('no existing star table ....')
out = red.reduce(files[0],dark=dark,flat=flat,solve=solve,seeing=seeing)
t=tv.TV()
t.tv(out)
print('mark desired stars...')
tab=mark(t,rad=seeing/red.scale)
add_coord(out,tab)
tab.write(obj+'.fits')
sav='{:s}.{:s}.{:s}'.format(obj,date,filt)
if not os.path.exists(sav+'.fits') or clobber :
out = process_all(files,red,tab,flat=flat,dark=dark,seeing=seeing,solve=solve,
rad=rad,skyrad=skyrad,threads=threads,display=display)
out.write(sav+'.fits',overwrite=True)
else :
out=Table.read(sav+'.fits')
diffphot(out,title=sav, hard=sav)
grid.append([sav+'_mjd.png',sav+'_air.png'])
html.htmltab(grid,file=obj+'.'+date+'.html')
[docs]def diffphot(tab,aper='aper35.0',yr=0.1,title=None,hard=None) :
""" Make differential photometry plots
including airmass detrending
"""
nstars = len(set(tab['id']))
nmjd = len(set(tab['MJD']))
dat = np.zeros([nmjd,nstars])
daterr = np.zeros([nmjd,nstars])
x = np.zeros([nmjd])
air = np.zeros([nmjd])
# two plots, one vs MJD and one vs airmass
fig,ax=plots.multi(nstars,nstars,figsize=(14,8),hspace=0.001,wspace=0.5)
airfig,airax=plots.multi(nstars,nstars,figsize=(14,8),hspace=0.001,wspace=0.5)
# loop over all MJDs
for i,mjd in enumerate(sorted(set(tab['MJD']))) :
# load up x, air, dat, and daterr arrays
x[i]=mjd
for j in range(nstars):
ii=np.where((tab['MJD'] == mjd) & (tab['id'] == j+1) ) [0]
dat[i,j] = tab[aper][ii]
daterr[i,j] = tab[aper+'err'][ii]
air[i] = tab['AIRMASS'][ii]
# total the comp stars
#afig,aax = plots.multi(1,1)
#tot = -2.5*np.log10((10**(-0.4*dat[:,1:])).sum(axis=1))
#aax.scatter(x,dat[:,0]-tot)
#aax.set_ylim(aax.get_ylim()[1],aax.get_ylim()[0])
# make plots of all pairs of stars
for j in range(nstars) :
for k in range(j,nstars) :
diff=dat[:,j]-dat[:,k]
err = np.sqrt(daterr[:,j]**2+daterr[:,k]**2)
med=np.median(diff)
std=np.std(diff)
mad=np.median(np.abs(diff-med))
# airmass detrending fit
fit=np.polyfit(air,diff,1)
diff_fit=diff-fit[0]*(air-np.median(air))
med_fit=np.median(diff_fit)
std_fit=np.std(diff_fit)
mad_fit=np.median(np.abs(diff_fit-med_fit))
# plots
ax[j,k].scatter(x,diff,edgecolors='g',facecolors='none')
ax[j,k].errorbar(x,diff,yerr=err,fmt='none',color='g')
ax[j,k].scatter(x,diff_fit,color='b')
ax[j,k].errorbar(x,diff_fit,yerr=err,fmt='none',color='b')
ax[j,k].set_xlabel('MJD')
ax[j,k].scatter(x,dat[:,j]-np.median(dat[:,j])+np.median(diff)+0.01,edgecolors='r',facecolors='none')
ax[j,k].scatter(x,dat[:,k]-np.median(dat[:,k])+np.median(diff)-0.01,edgecolors='m',facecolors='none')
ax[j,k].set_ylim(med+yr,med-yr)
bd =np.where(diff>(med+yr))[0]
ax[j,k].scatter(x[bd],len(bd)*[med+yr-.01*yr],marker=6,color='r')
bd =np.where(diff<(med-yr))[0]
ax[j,k].scatter(x[bd],len(bd)*[med-yr+.01*yr],marker=7,color='r')
ax[j,k].text(0.,0.9,'std: {:.3f} MAD: {:.3f}'.format(std,mad),
transform=ax[j,k].transAxes,color='g')
ax[j,k].text(0.,0.1,'std: {:.3f} MAD: {:.3f}'.format(
std_fit,mad_fit),transform=ax[j,k].transAxes,color='b')
airax[j,k].scatter(air,diff,color='g')
airax[j,k].errorbar(air,diff,yerr=err,fmt='none',color='g')
airax[j,k].scatter(air,diff_fit,color='b')
airax[j,k].errorbar(air,diff_fit,yerr=err,fmt='none',color='b')
airax[j,k].scatter(air,dat[:,j]-np.median(dat[:,j])+np.median(diff)+0.01,edgecolors='r',facecolors='none')
airax[j,k].scatter(air,dat[:,k]-np.median(dat[:,k])+np.median(diff)-0.01,edgecolors='m',facecolors='none')
airax[j,k].set_ylim(med+yr,med-yr)
bd =np.where(diff>(med+yr))[0]
airax[j,k].scatter(air[bd],len(bd)*[med+yr-.01*yr],
marker=6,color='r')
bd =np.where(diff<(med-yr))[0]
airax[j,k].scatter(air[bd],len(bd)*[med-yr+.01*yr],
marker=7,color='r')
airax[j,k].set_xlabel('Airmass')
airax[j,k].text(0.,0.9,'std: {:.3f} MAD: {:.3f}'.format(std,mad),
transform=airax[j,k].transAxes,color='g')
airax[j,k].text(0.,0.1,'std: {:.3f} MAD: {:.3f}'.format(
std_fit,mad_fit),transform=airax[j,k].transAxes,color='b')
# remove unfilled plots
for j in range(nstars) :
for k in range(j) :
ax[j,k].set_visible(False)
airax[j,k].set_visible(False)
if title != None :
fig.suptitle(title)
airfig.suptitle(title)
if hard != None :
fig.savefig(hard+'_mjd.png')
airfig.savefig(hard+'_air.png')
#fig.tight_layout()
#plt.draw()
#airfig.tight_layout()
#plt.draw()
#pdb.set_trace()
#plt.close()
#plt.close()
#plt.close()
return x,dat