""" Module of helpful functions to aid PSF subtraction and emission searching modules in pycube
"""
import extinction
import numpy as np
import copy
import matplotlib.pyplot as plt
from astropy.stats import sigma_clipped_stats
import astropy.coordinates as coord
import astropy.units as u
from photutils import CircularAperture
from photutils import CircularAnnulus
from photutils import aperture_photometry
from photutils import EllipticalAperture
from photutils import centroids
from astroquery.irsa_dust import IrsaDust
[docs]def nice_plot():
"""Universal plotting parameters in place for debug outputs of functions
"""
print("nice_plot: Setting rcParams")
plt.rcParams["xtick.top"] = True
plt.rcParams["ytick.right"] = True
plt.rcParams["xtick.minor.visible"] = True
plt.rcParams["ytick.minor.visible"] = True
plt.rcParams["ytick.direction"] = 'in'
plt.rcParams["xtick.direction"] = 'in'
plt.rcParams["xtick.major.size"] = 6
plt.rcParams["ytick.major.size"] = 6
plt.rcParams["xtick.minor.size"] = 3
plt.rcParams["ytick.minor.size"] = 3
plt.rcParams["xtick.labelsize"] = 30
plt.rcParams["ytick.labelsize"] = 30
plt.rcParams["xtick.major.width"] = 1
plt.rcParams["ytick.major.width"] = 1
plt.rcParams["xtick.minor.width"] = 1
plt.rcParams["ytick.minor.width"] = 1
plt.rcParams["axes.linewidth"] = 2
plt.rcParams["axes.labelsize"] = 25
plt.rcParams["lines.linewidth"] = 3
plt.rcParams["lines.markeredgewidth"] = 3
plt.rcParams["patch.linewidth"] = 5
plt.rcParams["hatch.linewidth"] = 5
plt.rcParams["font.size"] = 30
plt.rcParams["legend.frameon"] = True
plt.rcParams["legend.handletextpad"] = 1
[docs]def find_sigma(data):
"""Simple expression to calculate Sigma quickly. Taking square root of the median value of an array.
Parameters
----------
data : np.array
2D array of interest to generate sigma value
Returns
-------
float
Sigma value of given 2D array, ignoring NaNs
"""
return np.sqrt(np.nanmedian(data))
[docs]def channel_array(datacube,
channel):
"""Given a 3D data cube, and a channel (x,y,z), creates a numpy array of values of channel range
Parameters
----------
datacube : np.array
3D datacube to create range of channel values
channel : str
Dimension to create array from
Returns
-------
np.array
Range array of length of given dimension
"""
channel_vals = ['x', 'X', 'y', 'Y', 'z', 'Z']
channel = str(channel)
z_max, y_max, x_max = np.shape(datacube)
if channel == 'x' or channel == 'X':
channel_range = np.arange(0, x_max, 1, int)
elif channel == 'y' or channel == 'Y':
channel_range = np.arange(0, y_max, 1, int)
elif channel == 'z' or channel == 'Z':
channel_range = np.arange(0, z_max, 1, int)
else:
raise ValueError('correct input for function is one of the following:', channel_vals)
del channel_vals
del z_max, y_max, x_max
return channel_range
[docs]def convert_to_wave(datacontainer,
datacube,
channels='z'):
"""Converts channel values in 3D MUSE data into wavelength values along z axis (wavelength).
Specifically works with .FITS formatted MUSE data.
Utilizes header vals ('CRVAL3' and 'CD3_3')
Parameters
----------
datacontainer : IFUcube object
data initialized in cubeClass.py
datacube : np.array
3D array to be converted to wavelengths
channels : str, optional
Channel dimension ('x', 'y', 'z' or 'X', 'Y', 'Z') to create wavelength range array (default: 'z')
Returns
-------
np.array
An array of wavelength values for given channel
"""
data_headers = datacontainer.get_data_header()
data_channel_range = channel_array(datacube, channels)
data_wave = data_headers['CRVAL3'] + (np.array(data_channel_range) * data_headers['CD3_3'])
return np.array(data_wave, float)
[docs]def convert_to_channel(datacontainer,
datacube,
channels='z'):
"""Converts wavelength values in 3D MUSE data into channel value along z axis
Specifically works with .FITS formatted MUSE data.
Parameters
----------
datacontainer : IFUcube
data initialized in cubeClass.py
datacube : np.array
3D data array to be converted to channel
channels : str, optional
Channel dimension ('x', 'y', 'z' or 'X', 'Y', 'Z') to create channel range array (default: 'z')
Returns
-------
np.array
An array of channel values for given wavelength
"""
data_headers = datacontainer.get_data_header()
channel_range = channel_array(datacube, channels)
channel = np.array(channel_range) - data_headers['CRVAL3'] / data_headers['CD3_3']
return np.array(channel, float)
[docs]def collapse_cube(datacube,
min_lambda=None,
max_lambda=None,
mask_z=None):
""" Given a 3D data/stat cube .FITS file, this function collapses along the z-axis given a range of values.
If mask_z is specified, the function will remove all channels masked as 1.
Parameters
----------
datacube : np.array
3D data cube
min_lambda : int, optional
Minimum wavelength to collapse file
max_lambda : int, optional
Maximum wavelength to collapse file
mask_z : np.array, optional
range of z-axis values to mask when collapsing
Returns
-------
np.array
Condensed 2D array of 3D file.
"""
datacopy = np.copy(datacube)
z_max, y_max, x_max = np.shape(datacopy)
# Checks and resets if outside boundaries of z
if max_lambda is None or max_lambda > z_max:
max_lambda = z_max
print("collapse_cube : Exceeded / unspecified wavelength in data cube. "
"Max value is set to {}".format(int(z_max)))
if min_lambda is None or min_lambda < 0:
min_lambda = 0
print("collapse_cube : Invalid / unspecified minimum wavelength. Min value is set to 0")
if mask_z is not None:
datacopy[mask_z, :, :] = np.nan
# Sums values between specifications, ignoring NaNs
col_cube = np.nansum(datacopy[min_lambda:max_lambda, :, :], axis=0)
del datacopy
del z_max, y_max, x_max
return col_cube
[docs]def collapse_mean_cube(datacube,
statcube,
min_lambda=None,
max_lambda=None,
mask_z=None):
"""Given 3D arrays of data and variance, this function collapses it along the
z-axis between min_lambda and max_lambda. If mask_z is given, channels masked
as 1 (or True) are removed. The macro uses the stat information to perform a
weighted mean along the velocity axis. In other words, each spaxel of the resulting
image will be the weighted mean spectrum of that spaxels along the wavelengths.
Parameters
----------
datacube : np.array
3D data file
statcube : np.array
3D variance file
min_lambda : int
minimum wavelength to collapse cube
max_lambda : int
maximum wavelength to collapse cube
mask_z : np.array
if specified, channels masked as 1 will be removed when collapsed
Returns
-------
np.arrays
2D collapsed and averaged data and variance cubes
"""
temp_collapsed_data = collapse_cube(datacube, min_lambda,
max_lambda, mask_z=mask_z)
temp_collapsed_stat = collapse_cube(statcube, min_lambda,
max_lambda, mask_z=mask_z)
temp_collapsed_stat = 1. / temp_collapsed_stat
bad_pix = np.isnan(temp_collapsed_data) | np.isnan(temp_collapsed_stat)
# to deal with np.nan a weight of (almost) zero is given to bad_pix
temp_collapsed_data[bad_pix] = 0.
temp_collapsed_stat[bad_pix] = np.nanmin(temp_collapsed_stat) / 1000.
collapsed_data, collapsed_weights = np.average(temp_collapsed_data,
weights=temp_collapsed_stat,
axis=0,
returned=True)
collapsed_stat = 1. / collapsed_weights
plt.imshow(temp_collapsed_stat)
plt.show()
print("collapse_mean_cube: Images produced")
# preserve memory
del temp_collapsed_data
del temp_collapsed_stat
del collapsed_weights
return collapsed_data, collapsed_stat
[docs]def collapse_container(datacontainer,
min_lambda=None,
max_lambda=None,
mask_z=None,
var_thresh=5.):
""" Given an IFUcube, this function collapses along the z-axis given a range of values.
If mask_z is specified, it will remove values masked as 1. If no variance (stat) data exists,
it will be generated from the data.
Parameters
----------
datacontainer : IFUcube object
data passed through cubeClass.py
min_lambda : int, optional
Minimum wavelength
max_lambda : int, optional
Maximum wavelength
mask_z : np.array, optional
range of z-axis values that the user can mask in the datacube
var_thresh : int, float, optional
if no variance in container, this value determines standard deviation threshold
to create variance cube from data (default is 5.)
Returns
-------
np.array
Condensed 2D array of 3D file.
"""
# pulls information from IFU object
datacopy, statcopy = datacontainer.get_data_stat()
z_max, y_max, x_max = np.shape(datacopy)
# Checks and sets if outside boundaries of z
if max_lambda is None or max_lambda > z_max:
max_lambda = z_max
print("collapse_container: Exceeded / unspecified wavelength in data cube. "
"Max value is set to {}".format(int(z_max)))
if min_lambda is None or min_lambda < 0:
min_lambda = 0
print("collapse_container: Invalid / unspecified minimum wavelength. Min value is set to 0")
if mask_z is not None:
datacopy[mask_z, :, :] = np.nan
col_datacube = np.nansum(datacopy[min_lambda:max_lambda, :, :], axis=0)
if statcopy is not None:
if mask_z is not None:
statcopy[mask_z, :, :] = np.nan
col_statcube = np.nansum(statcopy[min_lambda:max_lambda, :, :], axis=0)
else:
# generation of variance stemming from dataset
med_cube = np.nanmedian(col_datacube)
std_cube = np.nanstd(col_datacube - med_cube)
tmp_col_statcube =\
np.zeros_like(col_datacube) + np.nanvar(col_datacube[(col_datacube - med_cube) < (var_thresh * std_cube)])
if mask_z is not None:
tmp_col_statcube[mask_z, :, :] = np.nan
col_statcube = np.nansum(tmp_col_statcube[min_lambda:max_lambda, :, :], axis=0)
del tmp_col_statcube
del med_cube
del std_cube
del datacopy
del statcopy
del z_max, y_max, x_max
return col_datacube, col_statcube
[docs]def collapse_mean_container(datacontainer,
min_lambda=None,
max_lambda=None,
mask_z=None,
threshold=5.):
"""Given an IFUcube, collapse it along the z-axis between min_lambda and
max_lambda. If mask_z is given, channels masked as 1 (or True) are removed. The
macro uses the stat information to perform a weighted mean along the velocity
axis. In other words, each spaxel of the resulting image will be the weighted
mean spectrum of that spaxels along the wavelengths.
Parameters
----------
datacontainer : IFUcube object
data passed through cubeClass.py
min_lambda : int, optional
minimum wavelength to collapse cube
max_lambda : int, optional
maximum wavelength to collapse cube
mask_z : np.array, optional
if specified, will remove channels masked with 1
threshold : int, float, optional
if no variance file in data, variance is created from the data. Parameter changes threshold of standard
deviation for the creation of this variance (default is 5.)
Returns
-------
np.array
collapsed data and variance
"""
temp_collapsed_data, temp_collapsed_stat = collapse_container(datacontainer, min_lambda, max_lambda,
mask_z=mask_z, var_thresh=threshold)
temp_collapsed_stat = 1. / temp_collapsed_stat
bad_pix = np.isnan(temp_collapsed_data) | np.isnan(temp_collapsed_stat)
# to deal with np.nan a weight of (almost) zero is given to badPix
temp_collapsed_data[bad_pix] = 0.
temp_collapsed_stat[bad_pix] = np.nanmin(temp_collapsed_stat) / 1000.
collapsed_data, collapsed_weights = np.average(temp_collapsed_data,
weights=temp_collapsed_stat,
axis=0,
returned=True)
collapsed_stat = 1. / collapsed_weights
# preserve memory
del temp_collapsed_data
del temp_collapsed_stat
del collapsed_weights
return collapsed_data, collapsed_stat
[docs]def location(data_flat, x_position=None, y_position=None,
semi_maj=None, semi_min=None,
theta=0.):
"""User input function to create elliptical mask of given coordinates for a source in an image.
Works also for multiple position inputs to create a mask of all sources present.
Parameters
----------
data_flat : np.array
2D collapsed image
x_position : int, float, optional
User given x coord of stellar object
y_position : int, float, optional
User given y coord of stellar object
semi_maj : int, float, optional
Semi-major axis (default is 10)
semi_min : int, float, optional
Semi-minor axis default (default is 6)
theta : int, float, optional
Angle for ellipse rotation around object (default is 0)
Returns
-------
np.array
Mask of 2D array of with user defined stellar objects
denoted as 1 with all other elements 0
"""
mask_array = np.zeros_like(data_flat)
mask_shape = np.shape(data_flat)
x_mask, y_mask = mask_shape
object_position = (x_position, y_position)
if semi_maj is None:
semi_maj = 10
print("location: Missing semi-major axis <- setting pixel value to {}".format(semi_maj))
if semi_min is None:
semi_min = 6
print("location: Missing semi-minor axis <- setting pixel value to {}".format(semi_min))
if x_position is None:
print("location: Please specify location of the object")
return mask_array # returned mask of zeros
elif type(x_position) is int or type(x_position) is float:
print("location: single source identified")
theta_rad = (theta * np.pi) / 180. # converts angle degrees to radians
object_ellipse = EllipticalAperture(object_position, semi_maj, semi_min, theta_rad)
ellipse_mask = object_ellipse.to_mask(method="center").to_image(shape=(x_mask, y_mask))
mask_array += ellipse_mask
else:
# accounts for single input of angle and semi-diameter values
# creates full arrays of the specified value to apply to all objects
if type(theta) is int or type(theta) is float:
theta = np.zeros_like(x_position)
if type(semi_min) is int or type(semi_min) is float:
semi_min = np.full_like(x_position, semi_min)
if type(semi_maj) is int or type(semi_maj) is float:
semi_maj = np.full_like(x_position, semi_maj)
# masking for multiple source input (array of (x,y))
for index in range(0, len(x_position), 1):
object_position = [x_position[index], y_position[index]]
theta_rad = (theta[index] * np.pi) / 180. # converts angle degrees to radians
object_ellipse = EllipticalAperture(object_position, semi_maj[index], semi_min[index], theta_rad)
ellipse_mask = object_ellipse.to_mask(method="center").to_image(shape=(x_mask, y_mask))
mask_array += ellipse_mask
# preserve memory
del object_ellipse
del ellipse_mask
del object_position
del x_mask
del y_mask
return np.array((mask_array > 0.), int)
[docs]def annular_mask(data_flat,
x_pos,
y_pos,
semi_maj=5.,
semi_min=5.,
delta_maj=2.,
delta_min=2.,
theta=0.):
"""Returning a mask where annuli centered in x_pos and y_pos with
inner axis (semi_maj, semi_min) and with
outer axis (semi_maj + delta_maj, semi_min + delta_min) are marked as 1.
Parameters
----------
data_flat : np.array
data in a 2D array. The mask will have the same size
of this.
x_pos : np.array
x-location of the sources in pixels
y_pos : np.array
y-location of the sources in pixels
semi_maj : float, optional
inner semi-major axis in pixel (default is 5.)
semi_min : float, optional
inner semi-minor axis in pixel (default is 5.)
delta_maj : float, optional
width of the semi-major axis in pixel (default is 2.)
delta_min : float, optional
width of the semi-minor axis in pixel (default is 2.)
theta : float, optional
angle wrt the x-axis in degrees (default is 0.)
Returns
-------
np.array
mask where sources are marked with 1 and background with 0.
It has the same dimensions of the input datacube
"""
# Creating mask
img_msk = location(data_flat, x_pos, y_pos,
semi_maj=semi_maj + delta_maj, semi_min=semi_min + delta_min,
theta=theta)
inner_img_mask = location(data_flat, x_pos, y_pos,
semi_maj=semi_maj, semi_min=semi_min,
theta=theta)
img_msk[(inner_img_mask > 0)] = 0
# Cleaning up memory
del inner_img_mask
return img_msk.astype(int)
[docs]def small_cube(datacontainer,
min_lambda=None,
max_lambda=None):
"""Given an IFUcube, this function collapses along min_lambda
and max_lambda. It also updates the wavelength information to
be conformed to the new size. Note that min_lambda and max_lambda
can be given both as wavelength and as channels. If the input value is
<3000. it will be assumed to be channel number, otherwise
wavelength in Angstrom will be considered.
Parameters
----------
datacontainer : IFUcube object
data intialized in cubeClass.py
min_lambda : int, optional
min channel to create collapsed image
max_lambda : int, optional
max channel to create collapsed image
Returns
-------
s_primary_headers : hdu header
primary header
s_data_headers : hdu header
fits header for DATA with corrected CRVAL3
s_datacopy : np.array
data in a 3D array trim from min_lambda to max_lambda
s_stat_headers : hdu header
fits header for STAT with corrected CRVAL3
s_statcopy : np.array
variance in a 3D array trim from min_lambda to max_lambda
"""
# assignment of variables to all information from IFU cube
primary_headers = datacontainer.get_primary()
datacopy, statcopy = datacontainer.get_data_stat()
data_headers, stat_headers = datacontainer.get_headers()
# Check for the size of the cube
z_max, y_max, x_max = np.shape(datacopy)
# Setting min and max channels for collapsing
if (min_lambda is not None) & (max_lambda is not None):
min_channel_sort = np.min([min_lambda, max_lambda])
max_channel_sort = np.max([min_lambda, max_lambda])
min_lambda, max_lambda = min_channel_sort, max_channel_sort
del min_channel_sort
del max_channel_sort
# set values in case of Nones
if min_lambda is None:
print("small_cube: min_lambda set to 0")
min_lambda = 0
if max_lambda is None:
print("small_cube: max_lambda set to {}".format(int(z_max)))
max_lambda = int(z_max)
# If input values are larger than 3000., the macro converts from
# wavelength in ang. to channel number.
if min_lambda > 3000.:
print("small_cube: Converting min wavelength in Ang. to channel number")
min_lambda = int((np.array(min_lambda - data_headers['CRVAL3']) / data_headers['CD3_3']))
else:
min_lambda = int(min_lambda)
if max_lambda > 3000.:
print("small_cube: Converting Max wavelength in Ang. to channel number")
max_lambda = int((np.array(max_lambda - data_headers['CRVAL3']) / data_headers['CD3_3']))
else:
max_lambda = int(max_lambda)
# Check for upper and lower limits
if min_lambda < 0:
print("small_cube: Negative value for min_lambda set to 0")
min_lambda = int(0)
if max_lambda > (z_max + 1):
print("small_cube: max_lambda is outside the cube size. Set to {}".format(int(z_max)))
max_lambda = int(z_max)
# updates CRVAL3 of new data cube
small_cube_CRVAL3 = float(np.array(data_headers['CRVAL3'] + (np.array(min_lambda) * data_headers['CD3_3'])))
print("small_cube: Creating smaller cube")
print(" The old pivot wavelength was {}".format(data_headers['CRVAL3']))
print(" The new pivot wavelength is {}".format(small_cube_CRVAL3))
# Header
s_primary_headers = copy.deepcopy(primary_headers)
# DATA
s_data_headers = copy.deepcopy(data_headers)
s_data_headers['CRVAL3'] = small_cube_CRVAL3
s_datacopy = np.copy(datacopy[min_lambda:max_lambda, :, :])
# STAT
s_stat_headers = copy.deepcopy(stat_headers)
s_stat_headers['CRVAL3'] = small_cube_CRVAL3
s_statcopy = np.copy(statcopy[min_lambda:max_lambda, :, :])
return s_primary_headers, s_data_headers, s_datacopy, s_stat_headers, s_statcopy
[docs]def dust_correction(datacontainer):
"""Function queries the IRSA dust map for E(B-V) value and
returns a reddening array. Works along z-axis of datacube
http://irsa.ipac.caltech.edu/applications/DUST/
The query return E(B_V) from SFD (1998). This will be converted
to the S&F (2011) one using:
E(B-V)S&F = 0.86 * E(B-V)SFD
Parameters
----------
datacontainer : np.array
3D .fits file array / IFU cube object
Returns
-------
reddata, redstat : np.array
galactic dust corrected 3D arrays for data and variance
"""
reddata, redstat = datacontainer.get_data_stat()
channel_range = channel_array(reddata, 'z') # creates channel along z-axis
headers = datacontainer.get_primary()
ra = headers['RA']
dec = headers['DEC']
wavecube = convert_to_wave(datacontainer, reddata) # creates wavelength value cube along z-axis
coordinates = coord.SkyCoord(ra * u.deg, dec * u.deg, frame='fk5')
try:
dust_image = IrsaDust.get_images(coordinates, radius=5. * u.deg, image_type='ebv', timeout=60)[0]
except TimeoutError:
print("Increasing dust image radius to 10deg")
dust_image = IrsaDust.get_images(coordinates, radius=10. * u.deg, image_type='ebv', timeout=60)[0]
y_size, x_size = dust_image[0].data.shape
ebv = 0.86 * np.mean(dust_image[0].data[y_size - 2:y_size + 2, x_size - 2:x_size + 2])
r_v = 3.1
av = r_v * ebv
reddened_wave = extinction.fm07(wavecube, av)
for channel in channel_range:
reddata[channel, :, :] *= reddened_wave[channel]
redstat[channel, :, :] *= (reddened_wave[channel] ** 2)
return reddata, redstat
[docs]def quick_ap_photometry(datacopy,
statcopy,
x_pos,
y_pos,
radius_pos=2.,
inner_rad=10.,
outer_rad=15.):
"""Performing quick circular aperture photometry on an image.
An annular region with inner radius [inner_rad] and outer radius [outer_rad]
will be used to estimate the background.
Parameters
----------
datacopy : np.array
image where the aperture photometry will be performed
statcopy : np.array
variance image that will be used to calculate the errors
on the aperture photometry
x_pos : int, float, np.array
x-location of the source in pixel (converts to array)
y_pos : int, float, np.array
y-location of the source in pixel (converts to array)
radius_pos : np.array, optional
radius where to perform the aperture photometry (default is 2.)
inner_rad : np.array, optional
inner radius of the background region in pixel (default is 10.)
outer_rad : np.array, optional
outer radius of the background region in pixel (default is 15.)
Returns
-------
obj_flux, obj_err_flux, ave_flux : np.array
flux of object, sigma of flux, and average flux values
"""
print("quick_ap_photometry: Performing aperture photometry")
# converts any input into a np.array
if np.size(x_pos) == 1:
x_object = np.array([x_pos])
y_object = np.array([y_pos])
else:
x_object = np.array(x_pos)
y_object = np.array(y_pos)
if np.size(radius_pos) == 1:
radius_aperture = np.full_like(x_object, radius_pos, float)
else:
radius_aperture = radius_pos
if np.size(inner_rad) == 1:
inner_aperture = np.full_like(x_object, inner_rad, float)
else:
inner_aperture = inner_rad
if np.size(outer_rad) == 1:
outer_aperture = np.full_like(x_object, outer_rad, float)
else:
outer_aperture = outer_rad
# setting up arrays
obj_flux = np.zeros_like(x_object, float)
obj_err_flux = np.zeros_like(x_object, float)
ave_flux = np.zeros_like(x_object, float)
for index in np.arange(0, np.size(x_object)):
print("Source", index + 1,
": Aperture photometry on source at: {}, {}".format(x_object[index], y_object[index]))
obj_pos = [x_object[index], y_object[index]]
circle_obj = CircularAperture(obj_pos, r=radius_aperture[index])
annulus_obj = CircularAnnulus(obj_pos, r_in=inner_aperture[index], r_out=outer_aperture[index])
# Flux
ap_phot = aperture_photometry(datacopy, circle_obj)
# Background
image_datacopy = np.copy(datacopy)
bad_bg = np.min([np.nanmin(image_datacopy), -99999.])
image_datacopy[~np.isfinite(image_datacopy)] = bad_bg
mask_bg = annulus_obj.to_mask(method='center')
data_bg = mask_bg.multiply(image_datacopy)
data_bg[(np.array(mask_bg) == 0)] = bad_bg
flat_data_bg = data_bg[data_bg > bad_bg].flatten()
mean_bg, median_bg, sigma_bg = sigma_clipped_stats(flat_data_bg)
# Variance
var_ap_phot = aperture_photometry(statcopy, circle_obj)
# Filling arrays
obj_flux[index] = ap_phot['aperture_sum'][0] - (median_bg * circle_obj.area)
obj_err_flux[index] = np.power(np.array(var_ap_phot['aperture_sum'][0]), 0.5)
ave_flux[index] = median_bg
# Deleting temporary arrays to clear up memory
del radius_aperture
del inner_aperture
del outer_aperture
del obj_pos
del x_object
del y_object
del image_datacopy
return obj_flux, obj_err_flux, ave_flux
[docs]def quick_ap_photometry_no_bg(datacopy,
statcopy,
x_pos,
y_pos,
obj_rad=2.):
"""Performing quick circular aperture photometry on an image
without background subtraction
Parameters
----------
datacopy : np.array
image where the aperture photometry will be performed
statcopy : np.array
variance image that will be used to calculate the errors
on the aperture photometry
x_pos : np.array
x-location of the source in pixel
y_pos : np.array
y-location of the source in pixel
obj_rad : np.array, optional
radius where to perform the aperture photometry (default is 2.)
Returns
-------
flux_obj, err_flux_obj : np.arrays
fluxes and errors of the sources derived from datacopy
and statcopy
"""
print("quick_ap_photometry_no_bg: Performing aperture photometry")
# creates arrays of position values regardless of quantity
if np.size(x_pos) == 1:
x_object = np.array([x_pos])
y_object = np.array([y_pos])
else:
x_object = np.array(x_pos)
y_object = np.array(y_pos)
if np.size(obj_rad) == 1:
r_aperture = np.full_like(x_object, obj_rad, float)
else:
r_aperture = np.array(obj_rad)
# setting up empty arrays
flux_obj = np.zeros_like(x_object, float)
err_flux_obj = np.zeros_like(x_object, float)
# filling arrrays
print("qap_photometry_no_bg: Aperture photometry on source:")
for index in range(0, np.size(x_object)):
print(" {:03d} -> ({:03.2f},{:03.2f})".format(index, float(x_object[index]),
float(y_object[index])))
object_pos = [x_object[index], y_object[index]]
circ_obj = CircularAperture(object_pos, r=r_aperture[index])
ap_phot = aperture_photometry(datacopy, circ_obj)
var_ap_phot = aperture_photometry(statcopy, circ_obj)
flux_obj[index] = ap_phot['aperture_sum'][0]
err_flux_obj[index] = np.power(np.array(var_ap_phot['aperture_sum'][0]), 0.5)
# Deleting temporary arrays to clear up memory
del r_aperture
del object_pos
del circ_obj
del ap_phot
del var_ap_phot
return flux_obj, err_flux_obj
[docs]def q_spectrum(datacontainer,
x_pos,
y_pos,
statcube=None,
radius_pos=2.,
inner_rad=10.,
outer_rad=15.,
void_mask=None):
"""Performing quick spectrum extraction from a circular aperture on a cube.
Background is calculated from the median of an annular aperture performing
sigma clipping.
Parameters
----------
datacontainer : IFUcube object
original data read into cubeClass.py
x_pos : np.array
x-location of the source in pixel
y_pos: np.array
y-location of the source in pixel
statcube : np.array, optional
variance in 3D array if specifying a 3D array for datacube.
Otherwise, pulls this array from IFUcube object (default is None)
radius_pos: np.array, optional
radius where to perform the aperture photometry (default is 2.)
inner_rad: np.array, optional
inner radius of the background region in pixel (default is 10.)
outer_rad: np.array, optional
outer radius of the background region in pixel (default is 15.)
void_mask : np.array, optional
mask of possible contaminants (1->the pixel will be
removed from the photometry) (default is None)
Returns
-------
spec_ap_phot : np.array
spectra of selected object
spec_var_ap_phot : np.array
sigma of spectra
spec_flux_bg : np.array
background spectral flux
"""
# allows assignment of an IFUcube object or two 3D arrays
if statcube is None:
datacopy, statcopy = datacontainer.get_data_stat()
else:
datacopy = datacontainer
statcopy = np.copy(statcube)
if void_mask is None:
void_mask = np.zeros_like(datacopy[0, :, :])
else:
print("q_spectrum: Using void_mask")
# creating empty lists
spec_ap_phot = []
spec_var_ap_phot = []
spec_flux_bg = []
object_pos = [x_pos, y_pos]
circ_obj = CircularAperture(object_pos, r=radius_pos)
annu_obj = CircularAnnulus(object_pos, r_in=inner_rad, r_out=outer_rad)
z_max, y_max, x_max = datacopy.shape
for channel in np.arange(0, z_max, 1):
# Total flux
temp_data = np.copy(datacopy[channel, :, :])
ap_phot = aperture_photometry(temp_data, circ_obj)
# Background masking bad values
temp_data_bg = np.copy(datacopy[channel, :, :])
bad_bg = np.min([np.nanmin(temp_data_bg), -99999.])
temp_data_bg[(void_mask == 1)] = bad_bg
temp_data_bg[~np.isfinite(temp_data_bg)] = bad_bg
mask_bg = annu_obj.to_mask(method='center')
data_bg = mask_bg.multiply(temp_data_bg)
data_bg[(np.array(mask_bg) == 0)] = bad_bg
data_bg1d = data_bg[data_bg > bad_bg].flatten()
mean_bg, median_bg, sigma_bg = sigma_clipped_stats(data_bg1d)
# Error
temp_stat = np.copy(statcopy[channel, :, :])
var_ap_phot = aperture_photometry(temp_stat, circ_obj)
# Loading lists
spec_ap_phot.append(ap_phot['aperture_sum'][0] - (median_bg * circ_obj.area))
spec_var_ap_phot.append(var_ap_phot['aperture_sum'][0])
spec_flux_bg.append(median_bg)
# Deleting temporary arrays to clear up memory
del temp_data
del temp_stat
del temp_data_bg
spec_var_ap_phot = np.array(spec_var_ap_phot)
spec_var_ap_phot[~np.isfinite(spec_var_ap_phot)] = np.nanmax(spec_var_ap_phot)
return np.array(spec_ap_phot), np.power(spec_var_ap_phot, 0.5), np.array(spec_flux_bg)
[docs]def q_spectrum_no_bg(datacontainer,
x_pos,
y_pos,
statcube=None,
radius_pos=2.):
"""Performing quick spectrum extraction from a circular aperture on a cube
without calculating background spectral flux
Parameters
----------
datacontainer : np.array, IFUcube Object
data in a 3D array or object initiated in cubeClass.py
x_pos : int, float, np.array
x-location of the source in pixel
y_pos : int, float, np.array
y-location of the source in pixel
statcube : np.array, optional
variance in 3D array if specifying a 3D array for datacube.
Otherwise, pulls this array from IFUcube object (default is None)
radius_pos : float
radius where to perform the aperture photometry
Returns
-------
spec_ap_phot : np.array
spectra of selected object
spec_var_ap_phot : np.array
sigma of spectra
"""
print("q_spectrum_no_bg: Extracting spectrum from the cube")
# allows IFUcube object assignment or two 3D arrays
if statcube is None:
datacopy, statcopy = datacontainer.get_data_stat()
else:
datacopy = np.copy(datacontainer)
statcopy = np.copy(statcube)
# creating empty lists
spec_ap_phot = []
spec_var_ap_phot = []
object_pos = [x_pos, y_pos]
circ_obj = CircularAperture(object_pos, r=radius_pos)
z_max, y_max, x_max = np.shape(datacopy)
for channel in np.arange(0, z_max, 1):
# Total flux
temp_data = np.copy(datacopy[channel, :, :])
ap_phot = aperture_photometry(temp_data, circ_obj)
# Error
temp_stat = np.copy(statcopy[channel, :, :])
var_ap_phot = aperture_photometry(temp_stat, circ_obj)
# Loading lists
spec_ap_phot.append(ap_phot['aperture_sum'][0])
spec_var_ap_phot.append(var_ap_phot['aperture_sum'][0])
# Deleting temporary arrays to clear up memory
del temp_data
del temp_stat
return np.array(spec_ap_phot), np.power(np.array(spec_var_ap_phot), 0.5)
[docs]def q_spectrum_no_bg_mask(datacontainer,
mask_xy, statcube=None):
"""Performing quick spectrum extraction from an aperture given by a 2D mask
from a cube.
Parameters
----------
datacontainer : IFUcube object, np.array
data initialized in cubeClass.py or 3D array of data
mask_xy : np.array
2D aperture where to perform the spectral extraction.
Only spaxels marked as 1 are considered
statcube : np.array
3D variance array
Returns
-------
flux_obj :
sum of object flux ignoring Nans
err_flux_obj :
sigma of variance
"""
# allows IFUcube object assignment or two 3D arrays
if statcube is None:
datacopy, statcopy = datacontainer.get_data_stat()
else:
datacopy = np.copy(datacontainer)
statcopy = np.copy(statcube)
print("q_spectrum_no_bg_mask: Extracting spectrum from the cube")
z_max, y_max, x_max = np.shape(datacopy)
for channel in np.arange(0, z_max, 1):
# masking arrays
datacopy[channel, :, :][(mask_xy < 1)] = 0
statcopy[channel, :, :][(mask_xy < 1)] = 0
flux_obj = np.nansum(datacopy, axis=(1, 2))
err_flux_obj = np.sqrt(np.nansum(statcopy, axis=(1, 2)))
# Deleting temporary arrays to clear up memory
del datacopy
del statcopy
return flux_obj, err_flux_obj
[docs]def gaussian(x, norm, x0, sigma):
""" Returns gaussian given normalization, center, and sigma.
Parameters
----------
x : np.array
x-vector
norm, x0, sigma : float
Normalization, center, and sigma of the gaussian
Returns
-------
gauss : np.array
The gaussian curve evaluated in x
"""
gauss = norm * np.exp(-(x - x0) ** 2 / (2. * sigma ** 2))
return np.array(gauss, float)
[docs]def stat_fullcube(datacube,
n_sigma_extreme=None):
"""Given a cube the macro calculate average, median, and
sigma of all its voxels. NaNs are considered as bad pixels
and removed.
Parameters
----------
datacube : np.array
3D cube containing the voxels you want to get the
statistic for.
n_sigma_extreme : float, optional
if not None, voxels with values larger than
sigmaExtreme times the standard deviation of
the cube will be masked (default is None)
Returns
-------
cube_average, cube_median, cube_std : float
average, median, and standard deviation of
the cube.
"""
print("stat_fullcube: statistic on the cube")
datacopy = np.copy(datacube)
cube_average = np.nanmean(datacopy)
cube_median = np.nanmedian(datacopy)
cube_std = np.nanstd(datacopy)
if n_sigma_extreme is not None:
# Workaround to avoid problems generated by using np.abs and NaNs
extreme_mask = np.zeros_like(datacopy, int)
datacopy[~np.isfinite(datacopy)] = cube_average + (3. * n_sigma_extreme * cube_std)
extreme_mask[np.abs((datacopy - cube_average) / cube_std) > n_sigma_extreme] = 1
cube_average = np.nanmean(datacopy[(extreme_mask == 0)])
cube_median = np.nanmedian(datacopy[(extreme_mask == 0)])
cube_std = np.nanstd(datacopy[(extreme_mask == 0)])
del extreme_mask
print("stat_fullcube: average = {:+0.3f}".format(cube_average))
print(" median = {:+0.3f}".format(cube_median))
print(" sigma = {:+0.3f}".format(cube_std))
# preserve memory
del datacopy
return cube_average, cube_median, cube_std
[docs]def stat_fullcube_z(datacube,
n_sigma_extreme=None):
"""Given a cube the macro calculate average, median, and
sigma of all its voxels along the spectral (z) axis.
NaNs are considered as bad pixels and removed.
Parameters
----------
datacube : np.array
3D cube containing the voxels you want to get the
statistic for.
n_sigma_extreme : np.array, optional
if not None, voxels with values larger than
sigmaExtreme times the standard deviation of
the cube will be masked (default is None)
Returns
-------
cube_avg_z, cube_med_z, cube_std_z : np.arrays
average, median, and standard deviation of
the cube along the spectral axis.
"""
print("stat_fullcube_z: statistic on the cube")
datacopy = np.copy(datacube)
cube_avg_z = np.nanmean(datacopy, axis=(1, 2))
cube_med_z = np.nanmedian(datacopy, axis=(1, 2))
cube_std_z = np.nanstd(datacopy, axis=(1, 2))
if n_sigma_extreme is not None:
# Workaround to avoid problems generated by using np.abs and NaNs
extreme_mask = np.zeros_like(datacopy, int)
z_max, y_max, x_max = np.shape(datacopy)
for channel in np.arange(0, z_max):
datacopy[channel, :, :][~np.isfinite(datacopy[channel, :, :])] = cube_avg_z[channel] + (
3. * n_sigma_extreme * cube_std_z[channel])
extreme_mask[channel, :, :][np.abs(
(datacopy[channel, :, :] - cube_avg_z[channel]) / cube_std_z[channel]) > n_sigma_extreme] = 1
datacopy[(extreme_mask == 1)] = np.nan
cube_avg_z = np.nanmean(datacopy, axis=(1, 2))
cube_med_z = np.nanmedian(datacopy, axis=(1, 2))
cube_std_z = np.nanstd(datacopy, axis=(1, 2))
del extreme_mask
# Cleaning up memory
del datacopy
return cube_avg_z, cube_med_z, cube_std_z
[docs]def pixel_dist(z_pos1, y_pos1, x_pos1,
z_pos2, y_pos2, x_pos2):
""" Given a pixel (1) and a set of locations
(2), the macro returns the euclidean distance
from (2) to (1)
Parameters
----------
z_pos1, y_pos1, x_pos1 : float
location of the pixel from which calculate the
distances
z_pos2, y_pos2, x_pos2 : np.array
location of the pixels for which the distances
from zPix1, y_pos1, xPix1 will be calculated
Returns
-------
dist : np.array
distance from (1) to (2)
"""
z_dist = np.power(z_pos1 - z_pos2, 2.)
y_dist = np.power(y_pos1 - y_pos2, 2.)
x_dist = np.power(x_pos1 - x_pos2, 2.)
dist = np.sqrt(z_dist + y_dist + x_dist)
return dist
[docs]def object_coord(datacontainer, ra, dec, radius):
"""Reads in IFUcube and user input of an object in RA and DEC as well as a search radius.
RA and DEC specify where to extend radius to search for pixel value of center of object.
Utilizes Photutils centroid_sources and header data to convert RA and DEC into pixel values.
Parameters
----------
datacontainer : IFUcube Object
data file initialized in cubeClass.py
ra : float
Right ascension of object in degrees
dec : float
Declination of object in degrees
radius : int, float
pixel radius value to search for center of specified object
Returns
-------
float
x_pos X pixel coordinate of object in datacube
y_pos Y pixel coordinate of object in datacube
"""
headers = datacontainer.get_data_header()
datacopy = datacontainer.get_data()
image_data = np.nansum(datacopy[:, :, :], axis=0) # creates 2D image from dataset and deletes copy to save memory
del datacopy
# utilizes header information for conversions
ra_ref = headers['CRVAL1']
ra_conversion = headers['CD1_1']
x_ref = headers['CRPIX1']
dec_ref = headers['CRVAL2']
dec_conversion = headers['CD2_2']
y_ref = headers['CRPIX2']
ra_dif = ra - ra_ref
dec_dif = dec - dec_ref
x_pix = np.array((ra_dif / ra_conversion) + x_ref)
y_pix = np.array((dec_dif / dec_conversion) + y_ref)
# isolates position fof the center of the object
x_pos, y_pos = centroids.centroid_sources(image_data, x_pix, y_pix,
box_size=radius)
# converts back to float for returned coordinates
x_pos = float(x_pos)
y_pos = float(y_pos)
return x_pos, y_pos