"""
Fits, etc. to extracted spectra
"""
import os
import time
import warnings
import inspect
import yaml
import numpy as np
import scipy.ndimage as nd
from scipy.optimize import nnls
import matplotlib.pyplot as plt
from matplotlib.ticker import MultipleLocator
from matplotlib.gridspec import GridSpec
import astropy.io.fits as pyfits
from grizli import utils
from .utils import trapz
utils.set_warnings()
def float_representer(dumper, value):
"""
Representer function for converting a float value to a YAML scalar
with six decimal places (``{value:.6f}``).
Parameters
----------
dumper : yaml.Dumper
The YAML dumper object.
value : float
The float value to be represented.
Returns
-------
yaml.ScalarNode:
The YAML scalar node representing the float value.
"""
text = "{0:.6f}".format(value)
return dumper.represent_scalar("tag:yaml.org,2002:float", text)
yaml.add_representer(float, float_representer)
try:
import eazy.templates
wave = np.exp(np.arange(np.log(2.4), np.log(4.5), 1.0 / 4000)) * 1.0e4
_temp = utils.pah33(wave)
PAH_TEMPLATES = {}
for t in _temp:
if "3.47" in t:
continue
_tp = _temp[t]
PAH_TEMPLATES[t] = eazy.templates.Template(
name=t, arrays=(_tp.wave, _tp.flux)
)
except:
print("Failed to initialize PAH_TEMPLATES")
PAH_TEMPLATES = {}
import astropy.units as u
import eazy.igm
from . import utils as msautils
try:
from .resample_numba import compute_igm
IGM_FUNC = compute_igm
except ImportError:
igm = eazy.igm.Inoue14()
IGM_FUNC = igm.full_IGM
try:
from .resample_numba import (
resample_template_numba as RESAMPLE_FUNC,
)
from .resample_numba import (
sample_gaussian_line_numba as SAMPLE_LINE_FUNC,
)
except ImportError:
from .resample import resample_template as RESAMPLE_FUNC
from .resample import sample_gaussian_line as SAMPLE_LINE_FUNC
SCALE_UNCERTAINTY = 1.0
FFTSMOOTH = False
__all__ = [
"fit_redshift",
"fit_redshift_grid",
"plot_spectrum",
"read_spectrum",
"calc_uncertainty_scale",
"resample_bagpipes_model",
"SpectrumSampler",
"multiplot_spectra",
"integrate_spectrum_filter",
"do_integrate_filters",
]
[docs]def resample_bagpipes_model(
model_galaxy,
model_comp=None,
spec_wavs=None,
R_curve=None,
nsig=5,
scale_disp=1.3,
wave_scale=1,
):
"""
Parameters
----------
model_galaxy : `bagpipes.models.model_galaxy.model_galaxy`
model_comp : dict
Model components dictionary. If specified, run
``model_galaxy.update(model_comp)``, otherwise get from
``model_galaxy.model_comp``
spec_wavs : array-like, None
Spectrum wavelengths, Angstroms. If not specified, try to use
``model_galaxy.spec_wavs``
R_curve : array-like, None
Spectral resolution FWHM curve. If not specified, try to use
``model_comp["R_curve"][:,1]``.
nsig : int
Number of sigmas for resample.
wave_scale : float
Scalar multipled to model wavelengths, i.e., for additional spectral orders
Returns
-------
spectrum : array-like
Resampled. If ``model_galaxy.spec_wavs`` is found and has same length
as ``spec_wavs``, also set ``model_galaxy.spectrum = spectrum`` along with the
wavelength array.
The units of ``spectrum`` returned by the function are always microJansky, but
the units of ``model_galaxy.spectrum`` are set as appropriate given
``model_galaxy.spec_units``.
"""
from .resample_numba import resample_template_numba
if model_comp is not None:
model_galaxy.update(model_comp)
model_comp = model_galaxy.model_comp
zplusone = model_comp["redshift"] + 1.0
if "veldisp" in model_comp:
velocity_sigma = model_comp["veldisp"]
else:
velocity_sigma = 0.0
redshifted_wavs = zplusone * model_galaxy.wavelengths
if spec_wavs is None:
spec_wavs = model_galaxy.spec_wavs
if R_curve is None:
R_curve = model_comp["R_curve"][:, 1] * scale_disp
fluxes = resample_template_numba(
spec_wavs,
R_curve,
redshifted_wavs * wave_scale,
model_galaxy.spectrum_full,
velocity_sigma=velocity_sigma,
nsig=nsig,
fill_value=0.0,
)
to_mujy = 10**-29 * 2.9979 * 10**18 / spec_wavs**2
if model_galaxy.spec_wavs is not None:
if len(model_galaxy.spec_wavs) == len(spec_wavs):
if model_galaxy.spec_units == "mujy":
spectrum = np.c_[spec_wavs, fluxes / to_mujy]
else:
spectrum = np.c_[spec_wavs, fluxes]
model_galaxy.spectrum = spectrum
return fluxes / to_mujy
[docs]class SpectrumSampler(object):
spec = {}
spec_wobs = None
spec_R_fwhm = None
valid = None
wave_step = None
def __init__(
self,
spec_input,
oversample_kwargs=dict(factor=5, pad=12),
with_sensitivity=True,
**kwargs,
):
"""
Helper functions for sampling templates onto the wavelength grid
of an observed spectrum
Parameters
----------
spec_input : str, `~astropy.io.fits.HDUList`
- `str` : spectrum filename, usually `[root].spec.fits`
- `~astropy.io.fits.HDUList` : FITS data
with_sensitivity : bool
Initialize sensitivity curves for including multiple spectral orders
in the model generation.
Attributes
----------
spec : `~astropy.table.Table`
1D spectrum table from the `SPEC1D HDU of ``file``
meta : dict
Metadata from ``spec.meta``
spec_wobs : array-like
Observed wavelengths, microns
spec_R_fwhm : array-like
Tabulated spectral resolution `R = lambda / dlambda`, assumed to be
defined as FWHM
valid : array-like
Boolean array of valid 1D data
"""
self.initialize_spec(spec_input, **kwargs)
self.NSPEC = len(self.spec)
self.oversamp_wobs = self.oversampled_wavelengths(**oversample_kwargs)
self.initialize_emission_line()
self.sensitivity_file = None
self.sensitivity = {1: 1.0}
self.inv_sensitivity = 1.0
for i in range(1, 4):
self.sensitivity[i + 1] = None
if with_sensitivity:
self.load_sensitivity_curve(**kwargs)
def __getitem__(self, key):
"""
Return column of the `spec` table
"""
return self.spec[key]
@property
def meta(self):
"""
Metadata of `spec` table
"""
return self.spec.meta
@property
def label(self):
"""
"""
if "label" in self.spec.meta:
return self.spec.meta["label"]
elif self.file is not None:
return os.path.basename(self.file)
elif "SRCNAME" in self.spec.meta:
return self.spec.meta["SRCNAME"]
elif "GRATING" in self.spec.meta:
return "spec {GRATING}-{FILTER}".format(**self.spec.meta).lower()
else:
return ""
[docs] def initialize_emission_line(self, nsamp=64):
"""
Initialize emission line
Parameters
----------
nsamp : int
Number of samples for the emission line.
Default = 64
"""
self.xline = (
np.linspace(-nsamp, nsamp, 2 * nsamp + 1) / nsamp * 0.1 + 1
)
self.yline = self.xline * 0.0
self.yline[nsamp] = 1
self.yline /= trapz(self.yline, self.xline)
[docs] def initialize_spec(self, spec_input, **kwargs):
"""
Read spectrum data from file and initialize attributes
Parameters
----------
spec_input : str
Filename, usually `[root].spec.fits`
kwargs : dict
Keyword arguments passed to `~msaexp.spectrum.read_spectrum`
"""
self.spec_input = spec_input
if isinstance(spec_input, str):
self.file = spec_input
else:
self.file = None
self.spec = read_spectrum(spec_input, **kwargs)
self.spec_wobs = self.spec["wave"].astype(np.float32)
self.spec_R_fwhm = self.spec["R"].astype(np.float32)
if "wave_step" in self.spec.colnames:
self.wave_step = self.spec["wave_step"].astype(np.float32)
self.valid = np.isfinite(self.spec["flux"] / self.spec["full_err"])
if "valid" in self.spec.colnames:
self.valid &= self.spec["valid"]
[docs] def oversampled_wavelengths(self, factor=5, pad=12):
"""
Generate a wavelength grid that oversamples the spectrum wavelengths
Parameters
----------
factor : int
Oversampling factor
Returns
-------
waves : array-like
Oversampled wavelengths
"""
dx = np.gradient(self.spec["wave"])
xin = np.linspace(0, 1, self.NSPEC)
xout = np.linspace(0, 1, (self.NSPEC) * factor)
dxout = np.interp(xout, xin, dx) / factor
waves = (
self.spec["wave"][0] - (dx[0] + dxout[0]) / 2 + np.cumsum(dxout)
)
if pad > 0:
waves = np.pad(
waves,
pad * factor,
mode="linear_ramp",
end_values=(
waves[0] - pad * factor * dxout[0],
waves[-1] + pad * factor * dxout[-1],
),
)
return waves
[docs] def resample_eazy_template(
self,
template,
z=0,
scale_disp=1.0,
velocity_sigma=100.0,
fnu=True,
nsig=4,
with_igm=False,
orders=[1, 2, 3, 4],
verbose=False,
**kwargs,
):
"""
Smooth and resample an `eazy.templates.Template` object onto the
observed wavelength grid of a spectrum
Parameters
----------
template : `eazy.templates.Template`
Template object
z : float
Redshift
scale_disp : float
Factor multiplied to the tabulated spectral resolution before
sampling
velocity_sigma : float
Gaussian velocity broadening factor, km/s
fnu : bool
Return resampled template in f-nu flux densities
nsig : int
Number of standard deviations to sample for the convolution
orders : list
List of spectral orders to include if the sensitivity curves have been
read along with the spectrum.
Returns
-------
res : array-like
Template flux density smoothed and resampled at the spectrum
wavelengths
"""
templ_wobs = template.wave.astype(np.float32) * (1 + z) / 1.0e4
templ_flux = template.flux_fnu(z=z).astype(np.float32)
if hasattr(template, 'unc'):
templ_unc = template.unc
res_unc = np.zeros_like(self.spec_wobs)
else:
templ_unc = None
if with_igm:
igmz = IGM_FUNC(z, templ_wobs * 1.0e4)
else:
# Turn off
igmz = 1.0
res = np.zeros_like(self.spec_wobs)
for order in orders:
if order not in self.sensitivity:
msg = f"resample_eazy_template: order {order} not found in sensitivity"
utils.log_comment(utils.LOGFILE, msg, verbose=True)
continue
elif self.sensitivity[order] is None:
continue
res_ = RESAMPLE_FUNC(
self.spec_wobs,
self.spec_R_fwhm * scale_disp * order,
templ_wobs * order,
templ_flux * igmz,
templ_unc=templ_unc,
velocity_sigma=velocity_sigma,
nsig=nsig,
)
if templ_unc is None:
res_i = res_
else:
res_i, res_unc = res_.reshape(2, self.NSPEC)
if order != 1:
res += res_i * self.sensitivity[order] * self.inv_sensitivity
else:
res += res_i
if templ_unc is None:
return res * self.spec["to_flam"]**(fnu is False)
else:
return (
res * self.spec["to_flam"]**(fnu is False),
res_unc * self.spec["to_flam"]**(fnu is False)
)
[docs] def smooth_velocity(self, velocity_sigma=500., unc="err", full_object=False, full_sliced=False, mask="valid", orders=[1], **kwargs):
"""
Smooth spectrum in velocity space
"""
import copy
import eazy.templates
if isinstance(mask, str):
if mask == "valid":
mask = self.spec["valid"] > 0
elif mask in self.spec.colnames:
mask = self.spec[mask] > 0
elif mask is None:
mask = np.isfinite(self.spec["flux"])
tsp = eazy.templates.Template(
arrays=(
self.spec["wave"][mask] * 1.e4,
(self.spec["flux"] * self.spec["to_flam"])[mask] * 1.e9
)
)
if isinstance(unc, str):
if unc == "valid":
tsp.unc = self.spec["valid"][mask] * 1
elif unc in self.spec.colnames:
tsp.unc = self.spec[unc][mask] * 1.0
elif unc is not None:
tsp.unc = unc[mask] * 1.0
else:
tsp.unc = None
result = self.resample_eazy_template(
tsp,
velocity_sigma=velocity_sigma,
orders=orders,
**kwargs
)
if tsp.unc is None:
resamp, resamp_unc = result, result * 0.
else:
resamp, resamp_unc = result
bad = resamp_unc > self.spec["err"]
resamp_unc[bad] = 0.0
if full_object:
wstep = np.exp(
np.arange(
np.log(self.spec["wave"][mask].min()),
np.log(self.spec["wave"][mask].max()),
velocity_sigma / 2.5 / 3.e5)
)
if full_sliced:
wi = np.round(
np.interp(
wstep[3:-3], self.spec["wave"], np.arange(self.NSPEC))
).astype(int)
else:
wi = np.arange(self.NSPEC, dtype=int)
spc = copy.deepcopy(self)
spc.NSPEC = len(wi)
spc.spec = self.spec[wi]
spc.spec["wave_index"] = wi
spc.spec_wobs = (spc.spec["wave"] * 1.0).astype(np.float32)
Rdisp = 1.0 / spc.spec["R"]**2 + (2.35 * velocity_sigma / 3e5)**2
spc.spec_R_fwhm = (1.0 / np.sqrt(Rdisp)).astype(np.float32)
spc.spec["flux"] = resamp[wi]
spc.spec["err"] = resamp_unc[wi]
spc.spec["valid"] = (spc.spec["flux"] != 0) & (resamp_unc[wi] > 0)
spc.meta["smooth_velocity"] = velocity_sigma
for order in spc.sensitivity:
if spc.sensitivity[order] is not None:
if hasattr(spc.sensitivity[order], "__len__"):
spc.sensitivity[order] = spc.sensitivity[order][wi]
if isinstance(self.spec_input, pyfits.HDUList):
orig_cols = [
c.name for c in spc.spec_input["SPEC1D"].columns
]
spc.spec_input["SPEC1D"] = pyfits.BinTableHDU(
spc.spec[orig_cols]
)
if "WHT" in self.spec_input:
err2d = 1.0 / np.sqrt(self.spec_input["WHT"].data)
NY = err2d.shape[0]
spc.spec_input["WHT"].data = (
self.spec_input["WHT"].data[:, wi]
)
for ext in ["SCI", "PROFILE", "BACKGROUND"]:
if ext not in self.spec_input:
continue
spc.spec_input[ext].data = (
self.spec_input[ext].data[:, wi]
)
for i, row in enumerate(self.spec_input[ext].data):
row_i = (row * self.spec["to_flam"])[mask] * 1.e9
tsp.flux[0,:] = row_i
tsp.unc = err2d[i,:] * 1.0
result = self.resample_eazy_template(
tsp,
velocity_sigma=velocity_sigma,
**kwargs
)
resamp, resamp_unc = result
spc.spec_input[ext].data[i,:] = resamp[wi]
if ext == "SCI":
spc.spec_input["WHT"].data[i,:] = (
1.0 / resamp_unc[wi]**2
)
return spc
else:
return resamp, resamp_unc
[docs] def regrid(self, wstep=None, velocity_sigma=250, mask="valid"):
"""
"""
from .resample_numba import trapz_unc
if isinstance(mask, str):
if mask == "valid":
mask = self.spec["valid"] > 0
elif mask in self.spec.colnames:
mask = self.spec[mask] > 0
elif mask is None:
mask = np.isfinite(self.spec["flux"])
if wstep is None:
wstep = np.exp(
np.arange(
np.log(self.spec["wave"].min()),
np.log(self.spec["wave"].max()),
velocity_sigma / 3.e5)
)
wgrid = msautils.array_to_bin_edges(wstep)
new = utils.GTable()
new["wave"] = wstep
new["flux"] = 0.0
new["err"] = 0.0
new["R"] = np.interp(
wstep, self.spec["wave"], self.spec["R"],
left=self.spec["R"][0], right=self.spec["R"][-1]
)
new["nbin"] = 0
N = len(wstep)
nu = 1.0 / self.spec["wave"]
for i in range(N):
wbin = (
(self.spec["wave"] >= wgrid[i])
& (self.spec["wave"] < wgrid[i+1])
& self.spec["valid"] & mask
)
nbin = wbin.sum()
if nbin > 1:
nu_bin = nu[wbin][::-1]
dnu = nu_bin[-1] - nu_bin[0]
resamp, resamp_unc = trapz_unc(
self.spec["flux"][wbin][::-1],
nu_bin,
self.spec["err"][wbin][::-1],
)
new["flux"][i] = resamp / dnu
new["err"][i] = resamp_unc / dnu
new["nbin"][i] = wbin.sum()
for k in self.spec.meta:
new.meta[k] = self.spec.meta[k]
return SpectrumSampler(new)
[docs] def emission_line(
self,
line_um,
line_flux=1,
scale_disp=1.0,
velocity_sigma=100.0,
nsig=4,
**kwargs,
):
"""
Make an emission line template - *deprecated in favor of*
`~msaexp.spectrum.SpectrumSampler.fast_emission_line`
Parameters
----------
line_um : float
Line center, microns
line_flux : float
Line normalization
scale_disp : float
Factor by which to scale the tabulated resolution FWHM curve
velocity_sigma : float
Velocity sigma width in km/s
nsig : int
Number of sigmas of the convolution kernel to sample
Returns
-------
res : array-like
Gaussian emission line sampled at the spectrum wavelengths
"""
res = RESAMPLE_FUNC(
self.spec_wobs,
self.spec_R_fwhm * scale_disp,
self.xline * line_um,
self.yline,
velocity_sigma=velocity_sigma,
nsig=nsig,
)
return res * line_flux / line_um
[docs] def fast_emission_line(
self,
line_um,
line_flux=1,
scale_disp=1.0,
velocity_sigma=100.0,
orders=[1, 2, 3, 4],
lorentz=False,
verbose=False,
**kwargs,
):
"""
Make an emission line template with numerically correct pixel
integration function
Parameters
----------
line_um : float
Line center, microns
line_flux : float
Line normalization
scale_disp : float
Factor by which to scale the tabulated resolution FWHM curve
velocity_sigma : float
Velocity sigma width in km/s
orders : list
List of spectral orders to include if the sensitivity curves have been
read along with the spectrum.
lorentz : bool
Generate a Lorentzian function instead of a Gaussian
Returns
-------
res : array-like
Gaussian emission line sampled at the spectrum wavelengths
"""
res = np.zeros_like(self.spec_wobs)
for order in orders:
if order not in self.sensitivity:
msg = f"resample_eazy_template: order {order} not found in sensitivity"
utils.log_comment(utils.LOGFILE, msg, verbose=True)
continue
elif self.sensitivity[order] is None:
continue
res_i = SAMPLE_LINE_FUNC(
self.spec_wobs,
self.spec_R_fwhm * scale_disp * order,
line_um * order,
dx=self.wave_step,
line_flux=line_flux,
velocity_sigma=velocity_sigma,
lorentz=lorentz,
)
if order != 1:
res += res_i * self.sensitivity[order] * self.inv_sensitivity
else:
res += res_i
return res
[docs] def bspline_array(
self,
nspline=13,
step_size=None,
remap_arrays=None,
minmax=None,
log=False,
by_wavelength=False,
get_matrix=True,
orders=[1, 2, 3, 4],
):
"""
Initialize bspline templates for continuum fits
Parameters
----------
nspline : int
Number of spline functions to sample across the wavelength range
log : bool
Sample in log(wavelength)
by_wavelength : bool
If True, sample bspline functions across the wavelength range.
If False, sample bspline functions across the index range.
get_matrix : bool
If True, return array data. Otherwise, return template objects.
Returns
-------
bspl : array-like
bspline data, depending on ``get_matrix``
"""
if by_wavelength:
if step_size is not None:
nspline = np.log(
self.spec_wobs.max() / self.spec_wobs.min()
) * 3.e5 / step_size
nspline = int(np.maximum(np.round(nspline), 5))
bspl = utils.bspline_templates(
wave=self.spec_wobs * 1.0e4,
degree=3,
df=nspline,
log=log,
get_matrix=get_matrix,
minmax=minmax,
)
else:
if remap_arrays is None:
xvalue = np.arange(len(self.spec_wobs))
else:
xvalue = np.interp(
self.spec_wobs, *remap_arrays, left=0.0, right=0.0
)
if step_size is not None:
nspline = (xvalue.max() - xvalue.min()) / step_size
nspline = int(np.maximum(np.round(nspline), 5))
bspl = utils.bspline_templates(
wave=xvalue,
degree=3,
df=nspline,
log=log,
get_matrix=get_matrix,
minmax=minmax,
)
if 1 not in orders:
if get_matrix:
bspl1 = bspl * 1
else:
bspl1 = {}
for t in bspl:
bspl1[t] = bspl[t].flux * 1
else:
bspl1 = None
for order in orders:
if order == 1:
# Computed above
continue
elif order not in self.sensitivity:
msg = f"resample_eazy_template: order {order} not found in sensitivity"
utils.log_comment(utils.LOGFILE, msg, verbose=True)
continue
elif self.sensitivity[order] is None:
continue
if get_matrix:
for i in range(nspline):
extra = np.interp(
self.spec_wobs,
self.spec_wobs * order,
bspl[:, i],
left=0,
right=0,
)
extra *= self.sensitivity[order] * self.inv_sensitivity
extra[~np.isfinite(extra)] = 0
bspl[:, i] += extra
else:
for t in bspl:
extra = np.interp(
self.spec_wobs,
self.spec_wobs * order,
bspl[t].flux,
left=0,
right=0,
)
extra *= self.sensitivity[order] * self.inv_sensitivity
extra[~np.isfinite(extra)] = 0
bspl[t].flux += extra
if 1 not in orders:
if get_matrix:
bspl = bspl - bspl1
else:
for t in bspl:
bspl[t].flux -= bspl1[t].flux
del bspl1
if get_matrix:
bspl = bspl.T
else:
for t in bspl:
bspl[t].wave = self.spec_wobs
return bspl
[docs] def fit_single_template(
self,
template,
z=0.0,
spl=None,
spline_type='multiply',
lsq=np.linalg.lstsq,
lsq_kwargs=dict(rcond=None),
loss=None,
**kwargs,
):
"""
Resample and fit a template to the spectrum
Parameters
----------
template : `eazy.templates.Template`
High-resolution, rest-frame template
z : float
Redshift
spl : (N, M) array
Optional spline array
lsq, lsq_kwargs : func, dict
Least-squares optimization function, keyword args
loss : func
Loss function, e.g, `scip.stats.norm(scale=spec['full_err'][spec.valid])`
kwargs : dict
keyword arguments passed to
`~msaexp.spectrum.SpectrumSampler.resample_eazy_template`, e.g.,
"scale_disp", "velocity_sigma"
Returns
-------
result : dict
Fit results
- ``model``: best-fit model, (M,) array
- ``A``: design matrix, (N, M) array
- ``coeffs``: least-squares coefficients, (N,) array
- ``lnp`` : ``loss.logpdf(residual)`` (M,) array
"""
from scipy.stats import norm
res = self.resample_eazy_template(template, z=z, **kwargs)
if spl is not None:
if spline_type == 'multiply':
res = res * spl
else:
res = np.vstack([res, spl])
else:
res = res[None, :]
A = res / self.spec["full_err"]
b = self.spec["flux"] / self.spec["full_err"]
coeffs, _r, rank, s = lsq(
A[:, self.valid].T, b[self.valid], **lsq_kwargs
)
model = res.T.dot(coeffs)
residual = self.spec["flux"] - model
if loss is None:
loss = norm(loc=0, scale=self.spec["full_err"][self.valid])
result = {
"model": model,
"A": res,
"coeffs": coeffs,
"lnp": loss.logpdf(residual[self.valid]),
}
return result
[docs] def resample_bagpipes_model(
self,
model_galaxy,
model_comp=None,
nsig=5,
scale_disp=1.3,
orders=[1, 2, 3, 4],
**kwargs,
):
"""
Resample a `bagpipes` model to the wavelength grid of the spectrum.
See ``msaexp.spectrum.resample_bagpipes_model``.
"""
if "scale_disp" in model_comp:
scale_disp = model_comp["scale_disp"]
spectrum = np.zeros_like(self.spec_wobs)
for order in orders:
if order not in self.sensitivity:
msg = f"resample_eazy_template: order {order} not found in sensitivity"
utils.log_comment(utils.LOGFILE, msg, verbose=True)
continue
elif self.sensitivity[order] is None:
continue
spectrum_i = resample_bagpipes_model(
model_galaxy,
model_comp=model_comp,
spec_wavs=self.spec_wobs * 1.0e4,
R_curve=self.spec["R"] * scale_disp * order,
nsig=nsig,
wave_scale=order,
)
spectrum += (
spectrum_i * self.sensitivity[order] * self.inv_sensitivity
)
return spectrum
[docs] def igm_absorption(self, z, scale_tau=1.0, scale_disp=1.3):
r"""
`Inoue+ (2014) <https://ui.adsabs.harvard.edu/abs/2014MNRAS.442.1805I>`_ IGM
absorption
Parameters
----------
z : float
Redshift
scale_tau : float
Factor to scale $\tau_\mathrm{IGM}$
scale_disp : float
Dispersion R rescaling
Returns
-------
igm : array-like
IGM model transmission at observed-frame wavelengths
"""
from .resample_numba import compute_igm, resample_template_numba
igm_raw = IGM_FUNC(z, self.oversamp_wobs * 1.0e4)
igm = np.ones(self.NSPEC)
igm = RESAMPLE_FUNC(
self.spec_wobs,
self.spec["R"] * scale_disp,
self.oversamp_wobs,
igm_raw,
velocity_sigma=0.0,
wave_min=0.0800 * (1 + z),
wave_max=0.1350 * (1 + z),
left=0.0,
right=1.0,
)
return igm
[docs] def load_sensitivity_curve(
self,
sens_file=None,
prefix="msaexp_sensitivity",
version="001",
file_template="{prefix}_{grating}_{filter}_{version}.fits",
verbose=False,
**kwargs,
):
""" """
if sens_file is None:
sens_file = file_template.format(
prefix=prefix,
filter=self.meta["FILTER"],
grating=self.meta["GRATING"],
version=version,
).lower()
# paths to search
paths = [
"",
os.path.join(
os.path.dirname(__file__), "data/extended_sensitivity"
),
]
file_path = None
for path in paths:
if os.path.exists(os.path.join(path, sens_file)):
file_path = path
break
if file_path is None:
return None
msg = f"load_sensitivity_curve: {sens_file}"
utils.log_comment(utils.LOGFILE, msg, verbose=verbose)
sens_data = utils.read_catalog(os.path.join(file_path, sens_file))
self.sensitivity_file = sens_file
self.sensitivity_correction = 1.0
self.sensitivity_correction_type = None
if "nrs1_s200a1" in sens_data.colnames:
if "APERNAME" in self.meta:
if "_SLIT" in self.meta["APERNAME"]:
self.sensitivity_correction = (
1.0 / sens_data["nrs1_s200a1"]
)
self.sensitivity_correction_type = "nrs1_s200a1"
if 'S400A1_SLIT' in self.meta['APERNAME']:
s400_file = os.path.join(file_path, "sensitivity_ratio_s400a1_s200a1_001.yaml")
# print("xxx", s400_file)
if os.path.exists(s400_file):
with open(s400_file) as fp:
s400 = yaml.load(fp, Loader=yaml.Loader)
df = len(s400['s400a1_s200a1_coefs'])
wpr = msautils.get_standard_wavelength_grid(grating='PRISM', sample=1.0)
spx = np.interp(
# self.spec['wave'],
sens_data["wavelength"],
wpr,
np.arange(len(wpr))/len(wpr)
)
bspl = utils.bspline_templates(
spx, df=df, minmax=(0, 1), get_matrix=True
)
self.sensitivity_correction *= bspl.dot(s400['s400a1_s200a1_coefs'])
self.sensitivity_correction_type = "nrs1_s200a1_s400a1"
self.sensitivity[1] = np.interp(
self.spec["wave"],
sens_data["wavelength"],
sens_data["sensitivity"] * self.sensitivity_correction,
left=0.0,
right=0.0,
)
# Precompute inv_sensitivity = 1 / sensitivity[1] to avoid divide by zero
self.inv_sensitivity = 1.0 / self.sensitivity[1]
self.inv_sensitivity[~np.isfinite(self.inv_sensitivity)] = 0.0
for order in range(2, 5):
if f"sensitivity_{order}" in sens_data.colnames:
sens_i = np.interp(
self.spec["wave"],
sens_data["wavelength"] * order,
sens_data[f"sensitivity_{order}"]
* self.sensitivity_correction,
left=0.0,
right=0.0,
)
if np.nanmax(sens_i) == 0:
self.sensitivity[order] = None
else:
self.sensitivity[order] = sens_i
else:
self.sensitivity[order] = None
[docs] def multiplot(self, sx=10, sy=2.5, z=0, ny=5, xpad=0.05, wave_limits=None, xgrid=None, fig=None, ymax=None, sharey=False, flam=0, okws=None, bkg=0., data=None, show_bkg=True, verbose=True, show_sn=False, **kwargs):
"""
Make a plot of the spectrum split into wavelength intervals
Parameters
----------
sx, sy : float
x and y size of the separate figure axes
z : float
If provided, plot in rest-frame intervals
ny : int
Number of wavelength intervals
xpad : float
Fractional overlap of wavelength intervals
wave_limits : [float, float], None
Override limits from min / max wavelength of the spectrum
xgrid : list
Explicit list of wavelength steps. If not provided, will split
the spectrum wavelength array into ``ny`` intervals
fig : figure
If not provided, initialize a new figure
Returns
-------
fig : figure
Figure object
axes : list
List of axis objects
Examples
--------
.. plot::
:include-source:
from msaexp import spectrum
import msaexp.utils
sp = spectrum.SpectrumSampler("https://s3.amazonaws.com/msaexp-nirspec/extractions/smacs0723-ero-v4/smacs0723-ero-v4_g395m-f290lp_2736_6355.spec.fits")
z = 7.665
fig, axes = sp.multiplot(
ny=4,
sx=8, sy=2,
z=z,
color='k', alpha=0.5
)
# Overplot line list
li = msaexp.utils.lines.LineList()
for ax in axes:
li.add_to_axis(ax, alpha=0.5)
axes[-1].set_xlabel(f'rest wavelength, z={z:.3f}')
fig.tight_layout(pad=1)
"""
w = self.spec["wave"] / (1 + z)
if xgrid is None:
if wave_limits is None:
wave_limits = (
w[self.spec["valid"]].min(), w[self.spec["valid"]].max()
)
xgrid = np.linspace(*wave_limits, ny+1)
xgrid_ = np.array([xgrid[i:i+2] for i in range(ny)])
else:
# ny = len(xgrid) - 1
if hasattr(xgrid, 'ndim'):
if xgrid.ndim == 1:
xgrid_ = np.array(
[xgrid[i:i+2] for i in range(len(xgrid)-1)]
)
else:
xgrid_ = xgrid
else:
xgrid_ = xgrid
ny = len(xgrid_)
if fig is None:
fig, axes = plt.subplots(ny,1,figsize=(sx, sy * ny), sharey=sharey)
else:
axes = fig.axes
fig = None
valid = (
self.spec["valid"]
& np.isfinite((self.spec["flux"] - bkg) / self.spec["full_err"])
)
low_sensitivity = (
self.sensitivity[1] < 0.1 * np.nanmax(self.sensitivity[1])
)
low_sensitivity &= (
self.spec["wave"] < np.nanmedian(self.spec["wave"][valid])
)
valid &= ~low_sensitivity
msg = f"{self.label} z={z:.3f} "
msg += f"{w[valid].min():.2f} – {w[valid].max():.2f} µm"
utils.log_comment(utils.LOGFILE, msg, verbose=verbose)
msk = np.nan**(1 - valid)
fsub = ((self.spec["flux"] - bkg) * self.spec["to_flam"]**flam * msk)
esub = (self.spec["full_err"] * self.spec["to_flam"]**flam * msk)
fbkg = ((self.spec["flux"]*0. - bkg) * self.spec["to_flam"]**flam * msk)
if data is not None:
fsub, esub = data[0] * 1, data[1] * 1.
show_sn = False
emed = np.nanmedian(esub)
if show_sn:
fsub = fsub / esub
fbkg = fbkg / esub
esub = esub**0 * msk
for i, ax in enumerate(axes):
if fig is None:
xl = ax.get_xlim()
else:
dx = xgrid_[i][1] - xgrid_[i][0]
xl = (xgrid_[i][0] - xpad * dx, xgrid_[i][1] + xpad * dx)
ax.set_xlim(*xl)
sub = (w > xl[0]) & (w < xl[1])
subm = sub & np.isfinite(msk)
if subm.sum() == 0:
continue
pl = ax.plot(w[sub], fsub[sub], **kwargs)
if hasattr(bkg, 'size'):
if show_bkg:
ax.fill_between(
w[sub], fbkg[sub], fsub[sub],
color=pl[0].get_color(), alpha=0.05
)
if (ymax is not None) & (sub.sum() > 3):
yl = (
np.nanmedian(fsub[sub] - ymax[0] * emed),
np.nanmedian(fsub[sub] + ymax[1] * emed)
)
if ymax[0] < 0:
ym = np.nanmedian(fsub[sub] + ymax[1] * emed)
yl = (ymax[0] * ym, ym)
ax.set_ylim(*yl)
else:
yl = ax.get_ylim()
if okws is not None:
for k in self.sensitivity:
if k == 1:
continue
try:
s_i = np.interp(
self["wave"] * k,
self["wave"],
self.sensitivity[k],
left=1, right=1
)
except ValueError:
continue
subk = (w * k > xl[0]) & (w * k < xl[1])
subk &= (w * k < w.max())
oflux = fsub * s_i
ax.plot(w[subk] * k, oflux[subk], **okws)
ax.set_xlim(*xl)
ax.set_ylim(*yl)
return fig, axes
GRATING_COLORS = {
"PRISM": "purple",
"G395M": "tomato",
"G235M": "goldenrod",
"G140M": "steelblue",
"G395H": "coral",
"G235H": "gold",
"G140H": "lightblue",
}
[docs]def multiplot_spectra(file="smacs0723-ero-v4_g395m-f290lp_2736_6355.spec.fits", ny=8, z=None, log_steps=True, wave_limits=None, renorm=True, extra="", spline_step=64, gratings=[], trim_empty_axes=True, grating_colors=GRATING_COLORS, **kwargs):
"""
Query and plot overlapping spectra
Parameters
----------
file : str
Parent DJA spectrum filename
spline_step : int, None
Number of spline components to use for continuum fit
Returns
-------
result : dict
Output figure and spectrum objects from the query
Examples
--------
.. plot::
:include-source:
from msaexp import spectrum
import msaexp.utils
result = spectrum.multiplot_spectra(
file="smacs0723-ero-v4_g395m-f290lp_2736_6355.spec.fits",
sx=8, sy=2,
ny=4,
)
# Overplot line list
li = msaexp.utils.lines.LineList()
for ax in result['axes']:
li.add_to_axis(ax, alpha=0.5)
result['axes'][-1].set_xlabel(
f"rest wavelength, z={result['z']:.3f}"
)
result['fig'].tight_layout(pad=1)
"""
import eazy.filters
query_url = (
"https://grizli-cutout.herokuapp.com/nirspec_extractions?file="
+ file + "&output=csv" + extra
)
print(query_url),
spec_list = utils.read_catalog(query_url, format="csv")
if len(gratings) > 0:
gr = [g.split("_")[0] for g in spec_list["grating"]]
keep = np.isin(gr, gratings)
else:
keep = np.ones(len(spec_list), dtype=bool)
if z is None:
z = spec_list["z"][(spec_list["grade"] > 2.5) & (keep)].mean()
for i, row in enumerate(spec_list):
print(
"{file} ({keep}) {z:.4f} grade={grade}".format(
keep=keep[i], **row
)
)
FILE_URL = "https://s3.amazonaws.com/msaexp-nirspec/extractions/{root}/{file}"
specs = [
SpectrumSampler(
FILE_URL.format(**row)
)
for row in spec_list[keep]
]
if spline_step is not None:
for spe in specs:
bspl = spe.bspline_array(by_wavelength=False, step_size=spline_step)
fmask = spe['flux'] * np.nan**(1 - spe['valid'])
fmed = np.squeeze(
utils.safe_nanmedian_filter(
fmask[None,:],
filter_footprint=np.ones(
spline_step // 2), axis=1, cval=np.nan
)[0]
)
fmed[spe['flux'] - fmed > 20 * spe["full_err"]] = np.nan
ok = spe["valid"] & (np.isfinite(fmed / spe["full_err"]))
c_ = np.linalg.lstsq(
(bspl / spe["full_err"])[:, ok].T,
(fmed / spe["full_err"])[ok],
rcond=None
)
spe.bkg = bspl.T.dot(c_[0])
else:
for spe in specs:
spe.bkg = 0.
if renorm:
RES = eazy.filters.FilterFile()
filter_numbers = [364, 365, 366, 375, 376, 377]
cols = ["npix", "frac", "flux", "err", "full_err"]
res = {}
for c in cols:
res[c] = []
for spe in specs:
rows = []
for fn in filter_numbers:
fdata = integrate_spectrum_filter(
spe.spec, RES[fn]
)
rows.append(fdata)
rows = np.array(rows)
for i, c in enumerate(cols):
res[c].append(rows[:, i])
for c in cols:
res[c] = np.array(res[c])
ok = (res["full_err"] > 0) & (res["frac"] > 0.3)
res["full_err"][~ok] = np.nan
res["flux"][~ok] = np.nan
scales = np.ones_like(res["flux"])
fw = np.array([RES[fn].pivot for fn in filter_numbers]) / 1.e4
cpoly = [np.array([0., 1.])] * len(specs)
nok = ok.sum(axis=1)
iref = np.argmax(nok)
for iter_ in range(3):
fscl = res["flux"] / scales
avg = np.nansum(fscl / res["full_err"]**2, axis=0)
avg /= np.nansum(1. / res["full_err"]**2, axis=0)
for i in range(len(specs)):
ok_i = ok[i,:]
w_i = 1./res["full_err"][i,ok_i]
c_i = np.polyfit(fw[ok_i], fscl[i, ok_i] / avg[ok_i], 1, w=w_i)
scales[i,:] *= np.polyval(c_i, fw)
for i, spe in enumerate(specs):
c_i = np.polyfit(fw, scales[i,:], 1)
scl = np.polyval(c_i, spe["wave"])
spe.meta["rescale0"] = c_i[0]
spe.meta["rescale1"] = c_i[1]
print("renorm: ", os.path.basename(spe.file), c_i)
spe.bkg /= scl
for c in ["flux", "err", "full_err"]:
spe.spec[c] /= scl
w = np.hstack([s["wave"][s["valid"]] / (1+z) for s in specs])
if wave_limits is None:
wave_limits = (w.min(), w.max())
if log_steps:
xgrid = np.logspace(*np.log10(wave_limits), ny+1)
else:
xgrid = np.linspace(*wave_limits, ny+1)
if trim_empty_axes:
first = int(
np.floor(np.interp(w.min(), xgrid, np.arange(ny+1), left=0))
)
last = int(
np.floor(np.interp(w.max(), xgrid, np.arange(ny+1), right=ny+1))
) + 2
xgrid = xgrid[first:last]
ny = len(xgrid) - 1
fig = None
for i, spe in enumerate(specs):
if grating_colors is not None:
kwargs["color"] = grating_colors[spe.meta['GRATING']]
fig_, axes = spe.multiplot(
fig=fig,
z=z,
ny=ny,
wave_limits=wave_limits,
xgrid=xgrid,
bkg=spe.bkg,
**kwargs
)
if fig_ is not None:
fig = fig_
for ax in axes:
yl = ax.get_ylim()
xl = ax.get_xlim()
ax.hlines([0], *xl, color='0.5', linestyle='--', alpha=0.3, zorder=-1)
ax.set_xlim(*xl)
ax.set_ylim(*yl)
result = {
"query_url": query_url,
"fig": fig,
"axes": axes,
"specs": specs,
"z": z,
}
return result
def smooth_template_disp_eazy(
templ, wobs_um, disp, z, velocity_fwhm=80, scale_disp=1.3, flambda=True
):
"""
Smooth a template with a wavelength-dependent dispersion function.
*NB:* Not identical to the preferred
`~msaexp.spectrum.SpectrumSampler.resample_eazy_template`
Parameters
----------
templ : `eazy.template.Template`
Template object
wobs_um : array-like
Target observed-frame wavelengths, microns
disp : table
NIRSpec dispersion table with columns ``WAVELENGTH``, ``R``
z : float
Target redshift
velocity_fwhm : float
Velocity dispersion FWHM, km/s
scale_disp : float
Scale factor applied to ``disp['R']``
flambda : bool
Return smoothed template in units of f_lambda or f_nu.
Returns
-------
tsmooth : array-like
Template convolved with spectral resolution + velocity dispersion.
Same length as `wobs_um`
"""
dv = np.sqrt(velocity_fwhm**2 + (3.0e5 / disp["R"] / scale_disp) ** 2)
disp_ang = disp["WAVELENGTH"] * 1.0e4
dlam_ang = disp_ang * dv / 3.0e5 / 2.35
def _lsf(wave):
return np.interp(
wave,
disp_ang,
dlam_ang,
left=dlam_ang[0],
right=dlam_ang[-1],
)
if hasattr(wobs_um, "value"):
wobs_ang = wobs_um.value * 1.0e4
else:
wobs_ang = wobs_um * 1.0e4
flux_model = templ.to_observed_frame(
z=z,
lsf_func=_lsf,
clip_wavelengths=None,
wavelengths=wobs_ang,
smoothspec_kwargs={"fftsmooth": FFTSMOOTH},
)
if flambda:
flux_model = np.squeeze(flux_model.flux_flam())
else:
flux_model = np.squeeze(flux_model.flux_fnu())
return flux_model
def smooth_template_disp_sedpy(
templ,
wobs_um,
disp,
z,
velocity_fwhm=80,
scale_disp=1.3,
flambda=True,
with_igm=True,
):
"""
Smooth a template with a wavelength-dependent dispersion function using
the `sedpy`/`prospector` LSF smoothing function
Parameters
----------
templ : `eazy.template.Template`
Template object
wobs_um : array-like
Target observed-frame wavelengths, microns
disp : table
NIRSpec dispersion table with columns ``WAVELENGTH``, ``R``
z : float
Target redshift
velocity_fwhm : float
Velocity dispersion FWHM, km/s
scale_disp : float
Scale factor applied to ``disp['R']``
flambda : bool
Return smoothed template in units of f_lambda or f_nu.
with_igm : bool
Apply the intergalactic medium (IGM) absorption to smoothed template
Returns
-------
tsmooth : array-like
Template convolved with spectral resolution + velocity dispersion.
Same length as `wobs_um`
"""
from sedpy.smoothing import smoothspec
wobs = templ.wave * (1 + z)
trim = wobs > wobs_um[0] * 1.0e4 * 0.95
trim &= wobs < wobs_um[-1] * 1.0e4 * 1.05
if flambda:
fobs = templ.flux_flam(z=z) # [wclip]
else:
fobs = templ.flux_fnu(z=z) # [wclip]
if with_igm:
fobs *= templ.igm_absorption(z)
wobs = wobs[trim]
fobs = fobs[trim]
R = (
np.interp(
wobs,
disp["WAVELENGTH"] * 1.0e4,
disp["R"],
left=disp["R"][0],
right=disp["R"][-1],
)
* scale_disp
)
dv = np.sqrt(velocity_fwhm**2 + (3.0e5 / R) ** 2)
dlam_ang = wobs * dv / 3.0e5 / 2.35
def _lsf(wave):
return np.interp(wave, wobs, dlam_ang)
tsmooth = smoothspec(
wobs,
fobs,
smoothtype="lsf",
lsf=_lsf,
outwave=wobs_um * 1.0e4,
fftsmooth=FFTSMOOTH,
)
return tsmooth
def smooth_template_disp(
templ,
wobs_um,
disp,
z,
velocity_fwhm=80,
scale_disp=1.3,
flambda=True,
with_igm=True,
):
"""
Smooth a template with a wavelength-dependent dispersion function
Parameters
----------
templ : `eazy.template.Template`
Template object
wobs_um : array-like
Target observed-frame wavelengths, microns
disp : table
NIRSpec dispersion table with columns ``WAVELENGTH``, ``R``
z : float
Target redshift
velocity_fwhm : float
Velocity dispersion FWHM, km/s
scale_disp : float
Scale factor applied to ``disp['R']``
flambda : bool
Return smoothed template in units of f_lambda or f_nu.
with_igm : bool
Apply the intergalactic medium (IGM) absorption to the template
Returns
-------
tsmooth : array-like
Template convolved with spectral resolution + velocity dispersion.
Same length as `wobs_um`
"""
wobs = templ.wave * (1 + z) / 1.0e4
if flambda:
fobs = templ.flux_flam(z=z) # [wclip]
else:
fobs = templ.flux_fnu(z=z) # [wclip]
if with_igm:
fobs *= templ.igm_absorption(z)
disp_r = np.interp(wobs, disp["WAVELENGTH"], disp["R"]) * scale_disp
fwhm_um = np.sqrt(
(wobs / disp_r) ** 2 + (velocity_fwhm / 3.0e5 * wobs) ** 2
)
sig_um = np.maximum(fwhm_um / 2.35, 0.5 * np.gradient(wobs))
x = wobs_um[:, np.newaxis] - wobs[np.newaxis, :]
gaussian_kernel = (
1.0 / np.sqrt(2 * np.pi * sig_um**2) * np.exp(-(x**2) / 2 / sig_um**2)
)
tsmooth = trapz(gaussian_kernel * fobs, x=wobs, axis=1)
return tsmooth
[docs]def fit_redshift(
file="jw02767005001-02-clear-prism-nrs2-2767_11027.spec.fits",
z0=[0.2, 10],
zstep=None,
eazy_templates=None,
nspline=None,
scale_disp=1.3,
vel_width=100,
Rline=None,
is_prism=False,
use_full_dispersion=False,
ranges=None,
sys_err=0.02,
**kwargs,
):
"""
Fit spectrum for the redshift
Parameters
----------
file : str
Spectrum filename
z0 : (float, float)
Redshift range
zstep : (float, float)
Step sizes in `dz/(1+z)`
eazy_templates : list, None
List of `eazy.templates.Template` objects. If not provided, just use
dummy spline continuum and emission line templates
nspline : int
Number of splines to use for dummy continuum
scale_disp : float
Scale factor of nominal dispersion files, i.e., `scale_disp > 1`
*increases* the spectral resolution
vel_width : float
Velocity width the emission line templates
Rline : float
Original spectral resolution used to sample the line templates
is_prism : bool
Is the spectrum from the prism?
use_full_dispersion : bool
Convolve `eazy_templates` with the full wavelength-dependent
dispersion function
ranges : list of tuples
Wavelength ranges for the subplots
sys_err : float
Systematic uncertainty added in quadrature with nominal uncertainties
Returns
-------
fig : Figure
Diagnostic figure
sp : `~astropy.table.Table`
A copy of the 1D spectrum as fit with additional columns describing the
best-fit templates
data : dict
Fit metadata
"""
frame = inspect.currentframe()
if "spec.fits" in file:
froot = file.split(".spec.fits")[0]
else:
froot = file.split(".fits")[0]
# Log function arguments
utils.LOGFILE = f"{froot}.zfit.log"
if os.path.exists(utils.LOGFILE):
os.remove(utils.LOGFILE)
# Log arguments
args = utils.log_function_arguments(
utils.LOGFILE,
frame,
"spectrum.fit_redshift",
ignore=["eazy_templates"],
)
if isinstance(args, dict):
with open(f"{froot}.zfit.call.yml", "w") as fp:
fp.write(f"# {time.ctime()}\n# {os.getcwd()}\n")
if eazy_templates is not None:
for i, t in enumerate(eazy_templates):
msg = f'# eazy_templates[{i}] = "{t.__str__()}"'
utils.log_comment(utils.LOGFILE, msg, verbose=False)
fp.write(msg + "\n")
yaml.dump(args, stream=fp, Dumper=yaml.Dumper)
# is_prism |= ('clear' in file)
spec = read_spectrum(file, sys_err=sys_err, **kwargs)
is_prism |= spec.grating in ["prism"]
if zstep is None:
if is_prism:
step0 = 0.002
step1 = 0.0001
else:
step0 = 0.001
step1 = 0.00002
else:
step0, step1 = zstep
if Rline is None:
if is_prism:
Rline = 1000
else:
Rline = 5000
# First pass
zgrid = utils.log_zgrid(z0, step0)
zg0, chi0 = fit_redshift_grid(
file,
zgrid=zgrid,
line_complexes=False,
vel_width=vel_width,
scale_disp=scale_disp,
eazy_templates=eazy_templates,
Rline=Rline,
use_full_dispersion=use_full_dispersion,
sys_err=sys_err,
**kwargs,
)
zbest0 = zg0[np.argmin(chi0)]
# Second pass
zgrid = utils.log_zgrid(
zbest0 + np.array([-0.005, 0.005]) * (1 + zbest0), step1
)
zg1, chi1 = fit_redshift_grid(
file,
zgrid=zgrid,
line_complexes=False,
vel_width=vel_width,
scale_disp=scale_disp,
eazy_templates=eazy_templates,
Rline=Rline,
use_full_dispersion=use_full_dispersion,
sys_err=sys_err,
**kwargs,
)
zbest = zg1[np.argmin(chi1)]
fz, az = plt.subplots(1, 1, figsize=(6, 4))
az.plot(zg0, chi0)
az.plot(zg1, chi1)
az.set_ylim(chi1.min() - 50, chi1.min() + 10**2)
az.grid()
az.set_xlabel("redshift")
az.set_ylabel(r"$\chi^2$")
az.set_title(os.path.basename(file))
fz.tight_layout(pad=1)
fz.savefig(froot + ".chi2.png")
if is_prism:
if ranges is None:
ranges = [(3427, 5308), (6250, 9700)]
if nspline is None:
nspline = 41
else:
if ranges is None:
ranges = [(3680, 4400), (4861 - 50, 5008 + 50), (6490, 6760)]
if nspline is None:
nspline = 23
fig, sp, data = plot_spectrum(
file,
z=zbest,
show_cont=True,
draws=100,
nspline=nspline,
figsize=(16, 8),
vel_width=vel_width,
ranges=ranges,
Rline=Rline,
scale_disp=scale_disp,
eazy_templates=eazy_templates,
use_full_dispersion=use_full_dispersion,
sys_err=sys_err,
**kwargs,
)
if eazy_templates is not None:
spl_fig, sp2, spl_data = plot_spectrum(
file,
z=zbest,
show_cont=True,
draws=100,
nspline=nspline,
figsize=(16, 8),
vel_width=vel_width,
ranges=ranges,
Rline=Rline,
scale_disp=scale_disp,
eazy_templates=None,
use_full_dispersion=use_full_dispersion,
sys_err=sys_err,
**kwargs,
)
for k in [
"coeffs",
"covar",
"model",
"mline",
"fullchi2",
"contchi2",
"eqwidth",
]:
if k in spl_data:
data[f"spl_{k}"] = spl_data[k]
spl_fig.savefig(froot + ".spl.png")
sp["spl_model"] = sp2["model"]
sp["wave"].unit = u.micron
sp["flux"].unit = u.microJansky
sp.write(froot + ".spec.zfit.fits", overwrite=True)
zdata = {}
zdata["zg0"] = zg0.tolist()
zdata["chi0"] = chi0.tolist()
zdata["zg1"] = zg1.tolist()
zdata["chi1"] = chi1.tolist()
data["dchi2"] = float(np.nanmedian(chi0) - np.nanmin(chi0))
for k in ["templates", "spl_covar", "covar"]:
if k in data:
_ = data.pop(k)
with open(froot + ".zfit.yaml", "w") as fp:
yaml.dump(zdata, stream=fp)
with open(froot + ".yaml", "w") as fp:
yaml.dump(data, stream=fp)
fig.savefig(froot + ".zfit.png")
return fig, sp, data
H_RECOMBINATION_LINES = [
"Ha+NII",
"Ha",
"Hb",
"Hg",
"Hd",
"PaA",
"PaB",
"PaG",
"PaD",
"Pa8",
"BrA",
"BrB",
"BrG",
"BrD",
]
EXTRA_NIR_LINES = [
"FeII-11128",
"PII-11886",
"FeII-12570",
"FeII-16440",
"FeII-16877",
"FeII-17418",
"FeII-18362",
]
def make_templates(
sampler,
z,
bspl={},
eazy_templates=None,
vel_width=100,
broad_width=4000,
broad_lines=[],
scale_disp=1.3,
grating="prism",
halpha_prism=["Ha+NII"],
oiii=["OIII"],
o4363=[],
sii=["SII"],
with_pah=True,
extra_lines=EXTRA_NIR_LINES,
apply_igm=True,
**kwargs,
):
"""
Generate fitting templates
sampler : `~msaexp.spectrum.SpectrumSampler`
Spectrum object with wavelength arrays and metadata
z : float
Redshift
bspl : dict
Spline templates for dummy continuum
eazy_templates : list
Optional list of `eazy.templates.Template` template objects to use in
place of the spline + line templates
vel_width : float
Velocity width of the individual emission line templates, km/s
broad_width : float
Velocity width of the broad emission line templates, km/s
broad_lines : list
List of line template names to be modeled as broad lines
scale_disp : float
Scaling factor for the tabulated dispersion
grating : str
Grating used for the observation
halpha_prism : ['Ha+NII'], ['Ha','NII']
Line template names to use for Halpha and [NII], i.e., ``['Ha+NII']``
fits with a fixed line ratio and `['Ha','NII']` fits them separately
but with a fixed line ratio 6548:6584 = 1:3
oiii : ['OIII'], ['OIII-4959','OIII-5007']
Similar for [OIII]4959+5007, ``['OIII']`` fits as a doublet with fixed
ratio 4959:5007 = 1:2.98 and ``['OIII-4949', 'OIII-5007']`` fits them
independently.
o4363 : [] or ['OIII-4363']
Include [OIII]4363 as a separate template
sii : ['SII'], ['SII-6717','SII-6731']
How to fit the [SII] doublet
with_pah : bool
Whether to include PAH templates in the fitting
extra_lines : list
List of extra emission lines to fit
Returns
-------
templates : list
List of the computed template objects
tline : array
Boolean list of which templates are line components
_A : (NT, NWAVE) array
Design matrix of templates interpolated at ``wobs``
"""
from grizli import utils
wobs = sampler.spec_wobs
wmask = sampler.valid
wmin = wobs[wmask].min()
wmax = wobs[wmask].max()
templates = []
tline = []
if eazy_templates is None:
lw, lr = utils.get_line_wavelengths()
_A = [bspl * 1]
for i in range(bspl.shape[0]):
templates.append(f"spl {i}")
tline.append(False)
# templates = {}
# for k in bspl:
# templates[k] = bspl[k]
# templates = {}
if grating in ["prism"]:
hlines = ["Hb", "Hg", "Hd"]
if z > 4:
oiii = ["OIII-4959", "OIII-5007"]
hene = ["HeII-4687", "NeIII-3867", "HeI-3889"]
o4363 = ["OIII-4363"]
else:
# oiii = ['OIII']
hene = ["HeI-3889"]
# o4363 = []
# sii = ['SII']
# sii = ['SII-6717', 'SII-6731']
hlines += halpha_prism + ["NeIII-3968"]
fuv = ["OIII-1663"]
oii_7320 = ["OII-7325"]
extra = []
else:
hlines = ["Hb", "Hg", "Hd", "H8", "H9", "H10", "H11", "H12"]
hene = ["HeII-4687", "NeIII-3867"]
o4363 = ["OIII-4363"]
oiii = ["OIII-4959", "OIII-5007"]
sii = ["SII-6717", "SII-6731"]
hlines += ["Ha", "NII-6549", "NII-6584"]
hlines += ["H7", "NeIII-3968"]
fuv = ["OIII-1663", "HeII-1640", "CIV-1549"]
oii_7320 = ["OII-7323", "OII-7332"]
extra = ["HeI-6680", "SIII-6314"]
line_names = []
line_waves = []
for li in [
*hlines,
*oiii,
*o4363,
"OII",
*hene,
*sii,
*oii_7320,
"ArIII-7138",
"ArIII-7753",
"SIII-9068",
"SIII-9531",
"OI-6302",
"PaD",
"PaG",
"PaB",
"PaA",
"HeI-1083",
"CI-9850",
# "SiVI-19634",
"BrA",
"BrB",
"BrG",
"BrD",
"BrE",
"BrF",
"PfB",
"PfG",
"PfD",
"PfE",
"Pa8",
"Pa9",
"Pa10",
"HeI-5877",
*fuv,
"CIII-1906",
"NIII-1750",
"Lya",
"MgII",
"NeV-3346",
"NeVI-3426",
"HeI-7065",
"HeI-8446",
*extra,
*extra_lines,
]:
if li not in lw:
continue
lwi = lw[li][0] * (1 + z)
if lwi < wmin * 1.0e4:
continue
if lwi > wmax * 1.0e4:
continue
line_names.append(li)
line_waves.append(lwi)
so = np.argsort(line_waves)
line_waves = np.array(line_waves)[so]
for iline in so:
li = line_names[iline]
lwi = lw[li][0] * (1 + z)
if lwi < wmin * 1.0e4:
continue
if lwi > wmax * 1.0e4:
continue
# print(l, lwi, disp_r)
name = f"line {li}"
for i, (lwi0, lri) in enumerate(zip(lw[li], lr[li])):
lwi = lwi0 * (1 + z) / 1.0e4
if li in broad_lines:
vel_i = broad_width
else:
vel_i = vel_width
line_i = sampler.fast_emission_line(
lwi,
line_flux=lri / np.sum(lr[li]),
scale_disp=scale_disp,
velocity_sigma=vel_i,
**kwargs,
)
if i == 0:
line_0 = line_i
else:
line_0 += line_i
_A.append(line_0 / 1.0e4)
templates.append(name)
tline.append(True)
if with_pah:
xpah = 3.3 * (1 + z)
if ((xpah > wmin) & (xpah < wmax)) | (0):
for t in PAH_TEMPLATES:
tp = PAH_TEMPLATES[t]
tflam = sampler.resample_eazy_template(
tp,
z=z,
velocity_sigma=vel_width,
scale_disp=scale_disp,
fnu=False,
**kwargs,
)
_A.append(tflam)
templates.append(t)
tline.append(True)
_A = np.vstack(_A)
if apply_igm:
igmz = IGM_FUNC(z, wobs.value * 1.0e4)
_A *= np.maximum(igmz, 0.001)
else:
if isinstance(eazy_templates[0], dict) & (len(eazy_templates) == 2):
# lw, lr dicts
lw, lr = eazy_templates
_A = [bspl * 1]
for i in range(bspl.shape[0]):
templates.append(f"spl {i}")
tline.append(False)
for li in lw:
name = f"line {li}"
line_0 = None
for i, (lwi0, lri) in enumerate(zip(lw[li], lr[li])):
lwi = lwi0 * (1 + z) / 1.0e4
if lwi < wmin:
continue
elif lwi > wmax:
continue
if li in broad_lines:
vel_i = broad_width
else:
vel_i = vel_width
line_i = sampler.fast_emission_line(
lwi,
line_flux=lri / np.sum(lr[li]),
scale_disp=scale_disp,
velocity_sigma=vel_i,
**kwargs,
)
if line_0 is None:
line_0 = line_i
else:
line_0 += line_i
if line_0 is not None:
_A.append(line_0 / 1.0e4)
templates.append(name)
tline.append(True)
_A = np.vstack(_A)
if apply_igm:
igmz = IGM_FUNC(z, wobs.value * 1.0e4)
_A *= np.maximum(igmz, 0.001)
elif len(eazy_templates) == 1:
# Scale single template by spline
t = eazy_templates[0]
for i in range(bspl.shape[0]):
templates.append(f"{t.name} spl {i}")
tline.append(False)
tflam = sampler.resample_eazy_template(
t,
z=z,
velocity_sigma=vel_width,
scale_disp=scale_disp,
fnu=False,
**kwargs
)
_A = np.vstack([bspl * tflam])
if apply_igm:
igmz = IGM_FUNC(z, wobs.value * 1.0e4)
_A *= np.maximum(igmz, 0.001)
else:
templates = []
tline = []
_A = []
for i, t in enumerate(eazy_templates):
tflam = sampler.resample_eazy_template(
t,
z=z,
velocity_sigma=vel_width,
scale_disp=scale_disp,
fnu=False,
**kwargs,
)
_A.append(tflam)
templates.append(t.name)
tline.append(False)
_A = np.vstack(_A)
if apply_igm:
igmz = IGM_FUNC(z, wobs.value * 1.0e4)
_A *= np.maximum(igmz, 0.001)
return templates, np.array(tline), _A
[docs]def fit_redshift_grid(
file="jw02767005001-02-clear-prism-nrs2-2767_11027.spec.fits",
zgrid=None,
vel_width=100,
scale_disp=1.3,
nspline=27,
eazy_templates=None,
use_full_dispersion=True,
use_aper_columns=False,
**kwargs,
):
"""
Fit redshifts on a grid
Parameters
----------
zgrid : array
Redshifts to fit
others : see `~msaexp.spectrum.fit_redshift`
Returns
-------
zgrid : array
Copy of `zgrid`
chi2 : array
Chi-squared of the template fits at redshifts from `zgrid`
"""
from tqdm import tqdm
sampler = SpectrumSampler(file, **kwargs)
spec = sampler.spec
if (use_aper_columns > 0) & ("aper_flux" in spec.colnames):
if ("aper_corr" in spec.colnames) & (use_aper_columns > 1):
ap_corr = spec["aper_corr"] * 1
else:
ap_corr = 1
flam = spec["aper_flux"] * spec["to_flam"] * ap_corr
eflam = spec["aper_full_err"] * spec["to_flam"] * ap_corr
else:
flam = spec["flux"] * spec["to_flam"]
eflam = spec["full_err"] * spec["to_flam"]
mask = spec["valid"]
flam[~mask] = np.nan
eflam[~mask] = np.nan
bspl = sampler.bspline_array(nspline=nspline, get_matrix=True)
chi2 = zgrid * 0.0
for iz, z in tqdm(enumerate(zgrid)):
templates, tline, _A = make_templates(
sampler,
z,
bspl=bspl,
eazy_templates=eazy_templates,
vel_width=vel_width,
scale_disp=scale_disp,
use_full_dispersion=use_full_dispersion,
grating=spec.grating,
**kwargs,
)
okt = _A[:, mask].sum(axis=1) != 0
_Ax = _A[okt, :] / eflam
_yx = flam / eflam
try:
if eazy_templates is None:
_x = np.linalg.lstsq(_Ax[:, mask].T, _yx[mask], rcond=None)
else:
_x = nnls(_Ax[:, mask].T, _yx[mask])
except RuntimeError:
x = [np.zeros(okt.sum())]
coeffs = np.zeros(_A.shape[0])
coeffs[okt] = _x[0]
_model = _A.T.dot(coeffs)
chi = (flam - _model) / eflam
chi2_i = (chi[mask] ** 2).sum()
# print(z, chi2_i)
chi2[iz] = chi2_i
return zgrid, chi2
[docs]def calc_uncertainty_scale(
file=None,
data=None,
order=0,
initial_mask=(0.2, 5),
sys_err=0.02,
fit_sys_err=False,
method="bfgs",
student_df=10,
update=True,
verbose=True,
**kwargs,
):
"""
Compute a polynomial scaling of the spectrum uncertainties. The procedure
is to fit for coefficients of a polynomial multiplied to the `err` array
of the spectrum such that `(flux - model)/(err*scl)` residuals are `N(0,1)`
Parameters
----------
file : str
Spectrum filename
data : tuple
Precomputed outputs from `~msaexp.spectrum.plot_spectrum`
order : int
Degree of the correction polynomial
initial_mask : (float, float)
Masking for the fit initialization. First parameter is zeroth-order
uncertainty scaling and the second parameter is the mask threshold of
the residuals
sys_err : float
Systematic component of the uncertainties
fit_sys_err : bool
Fit for adjusted ``sys_err`` parameter
method : str
Optimization method for `scipy.optimize.minimize`
student_df : int, None
If specified, calculate log likelihood of a `scipy.stats.t Student-t
distribution with ``df=student_df``. Otherwise, calculate
log-likelihood of the `scipy.stats.normal` distribution.
update : bool
Update the global `msaexp.spectrum.SCALE_UNCERTAINTY` array with the
fit result
verbose : bool
Print status messages. If ``verbose > 1`` will also print status at
each step of the optimization.
kwargs : dict
Keyword arguments for `~msaexp.spectrum.plot_spectrum` if `data` not
specified
Returns
-------
spec : `~astropy.table.Table`
The spectrum as fit
escale : array
The wavelength-dependent scaling of the uncertainties
sys_err : float
The systematic uncertainty used, fixed or adjusted depending on
``fit_sys_err``
res : object
Output from `scipy.optimize.minimize`
"""
from scipy.stats import norm
from scipy.stats import t as student
from scipy.optimize import minimize
global SCALE_UNCERTAINTY
SCALE_UNCERTAINTY = 1.0
if data is None:
spec, spl = plot_spectrum(
inp=file,
eazy_templates=None,
get_spl_templates=True,
sys_err=sys_err,
get_init_data=True,
**kwargs,
)
else:
spec, spl = data
_err = spec["err"]
ok = (_err > 0) & (spec["flux"] != 0)
ok &= np.isfinite(_err + spec["flux"])
def objfun_scale_uncertainties(c, ok, ret):
"""
Objective function for scaling uncertainties
(Nested method in `calc_uncertainty_scale`).
Parameters
----------
c : list
List of coefficients for scaling uncertainties.
ok : array
Boolean array indicating which elements to include.
ret : int
Return type indicator. If 1, return sys_err, full_err, and model.
Returns
----------
chi2 : float
The calculated chi-squared value.
"""
if fit_sys_err:
_sys_err = c[0] / 100
_coeffs = c[1:]
else:
_coeffs = c[:]
_sys_err = sys_err
err = 10 ** np.polyval(_coeffs, spec["wave"] - 3) * _err
if "escale" in spec.colnames:
err *= spec["escale"]
_full_err = np.sqrt(
err**2 + np.maximum(_sys_err * spec["flux"], 0) ** 2
)
_Ax = spl / _full_err
_yx = spec["flux"] / _full_err
_x = np.linalg.lstsq(_Ax[:, ok].T, _yx[ok], rcond=None)
_model = spl.T.dot(_x[0])
if ret == 1:
return _sys_err, _full_err, _model
if student_df is None:
_lnp = norm.logpdf(
(spec["flux"] - _model)[ok],
loc=_model[ok] * 0.0,
scale=_full_err[ok],
).sum()
chi2 = -1 * _lnp / 2
else:
_lnp = student.logpdf(
(spec["flux"] - _model)[ok],
student_df,
loc=_model[ok] * 0.0,
scale=_full_err[ok],
).sum()
chi2 = -1 * _lnp / 2
if 0:
_resid = (spec["flux"] - _model)[ok] / _full_err[ok]
chi2 = np.log(utils.nmad(_resid)) ** 2
cstr = " ".join([f"{ci:6.2f}" for ci in c])
msg = f"{cstr}: {chi2:.6e}"
utils.log_comment(utils.LOGFILE, msg, verbose=(verbose > 1))
return chi2
if fit_sys_err:
c0 = np.zeros(order + 2)
c0[0] = sys_err * 100
else:
c0 = np.zeros(order + 1)
if initial_mask is not None:
c0[-1] = initial_mask[0]
_sys, _efit, _model = objfun_scale_uncertainties(c0, ok, 1)
bad = np.abs((spec["flux"] - _model) / _efit) > initial_mask[1]
msg = "calc_uncertainty_scale: "
msg += f"Mask additional {(bad & ok).sum()} pixels"
utils.log_comment(utils.LOGFILE, msg, verbose=verbose)
ok &= ~bad
res = minimize(objfun_scale_uncertainties, c0, method=method, args=(ok, 0))
_sys, _efit, _model = objfun_scale_uncertainties(res.x, ok, 1)
spec.meta["calc_syse"] = _sys
spec["calc_err"] = _efit
spec["calc_model"] = _model
spec["calc_valid"] = ok
if fit_sys_err:
sys_err, _coeffs = res.x[0] / 100.0, res.x[1:]
else:
_coeffs = res.x[:]
_resid = (spec["flux"] - _model) / _efit
msg = f"calc_uncertainty_scale: sys_err = {sys_err:.4f}"
msg += f"\ncalc_uncertainty_scale: coeffs = {_coeffs}"
msg += f"\ncalc_uncertainty_scale: NMAD = {utils.nmad(_resid[ok]):.3f}"
utils.log_comment(utils.LOGFILE, msg, verbose=verbose)
if update:
msg = f"calc_uncertainty_scale: Set SCALE_UNCERTAINTY: {_coeffs}"
utils.log_comment(utils.LOGFILE, msg, verbose=verbose)
SCALE_UNCERTAINTY = _coeffs
return spec, 10 ** np.polyval(_coeffs, spec["wave"] - 3), sys_err, res
def setup_spectrum(file, **kwargs):
"""
Deprecated, use `~msaexp.spectrum.read_spectrum`
"""
return read_spectrum(file, **kwargs)
def set_spec_sys_err(spec, sys_err=0.02):
"""
Set `full_err` columns including a systematic component
>>> full_err**2 = (err * escale)**2 + (sys_err * maximum(flux,0))**2
"""
spec["full_err"] = np.sqrt(
(spec["err"] * spec["escale"]) ** 2
+ np.maximum(sys_err * spec["flux"], 0) ** 2
)
if "aper_err" in spec.colnames:
spec["aper_full_err"] = np.sqrt(
(spec["aper_err"] * spec["escale"]) ** 2
+ np.maximum(sys_err * spec["aper_flux"], 0) ** 2
)
spec.meta["sys_err"] = sys_err
[docs]def read_spectrum(
inp,
spectrum_extension="SPEC1D",
sys_err=0.02,
err_mask=(5, 0.5),
err_median_filter=[11, 0.2],
**kwargs,
):
"""
Read a spectrum and apply flux and/or uncertainty scaling
Flux scaling `corr` is applied if there are `POLY[i]` keywords in the
spectrum metadata, with
.. code-block:: python
:dedent:
>>> coeffs = [header[f'POLY{i}'] for i in range(order+1)]
>>> corr = np.polyval(coeffs, np.log(spec['wave']*1.e4))
Parameters
----------
inp : str or `~astropy.io.fits.HDUList`
Fits filename of a file that includes a `~astropy.io.fits.BinTableHDU`
table of an extracted spectrum. Alternatively, can be an
`~astropy.io.fits.HDUList` itself
spectrum_extension : str
Extension name of 1D spectrum in file or HDUList input
sys_err : float
Systematic uncertainty added in quadrature with `err` array
err_mask : float, float or None
Mask pixels where ``err < np.percentile(err[err > 0],
err_mask[0])*err_mask[1]``
err_median_filter : int, float or None
Mask pixels where ``err < nd.median_filter(err,
err_median_filter[0])*err_median_filter[1]``
Returns
-------
spec : `~astropy.table.Table`
Spectrum table. Existing columns in `file` should be
- ``wave`` : observed-frame wavelength, microns
- ``flux`` : flux density, `~astropy.units.microJansky`
- ``err`` : Uncertainty on ```flux```
Columns calculated here are
- ``corr`` : flux scaling
- ``escale`` : extra scaling of uncertainties
- ``full_err`` : Full uncertainty including `sys_err`
- ``R`` : spectral resolution
- ``valid`` : Data are valid
"""
global SCALE_UNCERTAINTY
import scipy.ndimage as nd
if isinstance(inp, str):
if "fits" in inp:
with pyfits.open(inp) as hdul:
if spectrum_extension in hdul:
spec = utils.read_catalog(hdul[spectrum_extension])
else:
spec = utils.read_catalog(inp)
else:
spec = utils.read_catalog(inp)
elif isinstance(inp, pyfits.HDUList):
if spectrum_extension in inp:
spec = utils.read_catalog(inp[spectrum_extension])
else:
msg = f"{spectrum_extension} extension not found in HDUList input"
raise ValueError(msg)
elif hasattr(inp, "colnames"):
spec = utils.GTable()
for c in inp.colnames:
spec[c] = inp[c]
for k in inp.meta:
spec.meta[k] = inp.meta[k]
else:
spec = utils.read_catalog(inp)
if "POLY0" in spec.meta:
pc = []
for pi in range(10):
if f"POLY{pi}" in spec.meta:
pc.append(spec.meta[f"POLY{pi}"])
corr = np.polyval(pc, np.log(spec["wave"] * 1.0e4))
spec["flux"] *= corr
spec["err"] *= corr
spec["corr"] = corr
else:
spec["corr"] = 1.0
if "escale" not in spec.colnames:
if hasattr(SCALE_UNCERTAINTY, "__len__"):
if len(SCALE_UNCERTAINTY) < 6:
spec["escale"] = 10 ** np.polyval(
SCALE_UNCERTAINTY, spec["wave"]
)
elif len(SCALE_UNCERTAINTY) == len(spec):
spec["escale"] = SCALE_UNCERTAINTY
else:
spec["escale"] = SCALE_UNCERTAINTY
# print('xx scale scalar', SCALE_UNCERTAINTY)
for c in ["flux", "err"]:
if hasattr(spec[c], "filled"):
spec[c] = spec[c].filled(0)
valid = np.isfinite(spec["flux"] + spec["err"])
valid &= spec["err"] > 0
valid &= spec["flux"] != 0
if (err_mask is not None) & (valid.sum() > 0):
_min_err = (
np.nanpercentile(spec["err"][valid], err_mask[0]) * err_mask[1]
)
valid &= spec["err"] > _min_err
if (err_median_filter is not None) & (valid.sum() > 0):
med = nd.median_filter(
spec["err"][valid].astype(float), err_median_filter[0]
)
medi = np.interp(
spec["wave"], spec["wave"][valid], med, left=0, right=0
)
valid &= spec["err"] > err_median_filter[1] * medi
set_spec_sys_err(spec, sys_err=sys_err)
spec["full_err"][~valid] = 0
spec["flux"][~valid] = 0.0
spec["err"][~valid] = 0.0
spec["valid"] = valid
grating = spec.meta["GRATING"].lower()
_filter = spec.meta["FILTER"].lower()
if "R" not in spec.colnames:
R_fwhm = msautils.get_default_resolution_curve(
grating=grating,
wave=spec['wave'],
# grating_degree=2 default poly fit for gratings
**kwargs
)
spec["R"] = R_fwhm
spec["R"].description = "Spectral resolution from tabulated curves"
if "smooth_velocity" in spec.meta:
vsm = spec.meta["smooth_velocity"]
Rdisp = 1.0 / spec["R"]**2 + (2.35 * vsm / 3.e5)**2
spec["R"] = 1.0 / np.sqrt(Rdisp)
spec["R"].description += f" + smooth_velocity={vsm:.1f} km/s"
spec.grating = grating
spec.filter = _filter
flam_unit = 1.0e-20 * u.erg / u.second / u.cm**2 / u.Angstrom
wave_unit = spec["wave"].unit
if spec["wave"].unit is None:
wave_unit = u.micron
flux_unit = spec["flux"].unit
if flux_unit is None:
flux_unit = u.microJansky
spec.equiv = u.spectral_density(spec["wave"].data * wave_unit)
spec["to_flam"] = (
(1 * flux_unit).to(flam_unit, equivalencies=spec.equiv).value
)
spec.meta["flamunit"] = flam_unit.unit
spec.meta["fluxunit"] = flux_unit
spec.meta["waveunit"] = wave_unit
spec["wave"] = spec["wave"].value
spec["flux"] = spec["flux"].value
spec["err"] = spec["err"].value
return spec
[docs]def plot_spectrum(
inp="jw02767005001-02-clear-prism-nrs2-2767_11027.spec.fits",
z=9.505,
vel_width=100,
scale_disp=1.3,
nspline=27,
bspl=None,
show_cont=True,
draws=100,
figsize=(16, 8),
ranges=[(3650, 4980)],
full_log=False,
eazy_templates=None,
use_full_dispersion=True,
get_init_data=False,
scale_uncertainty_kwargs=None,
plot_unit=None,
trim_negative=True,
sys_err=0.02,
return_fit_results=False,
use_aper_columns=False,
label=None,
**kwargs,
):
"""
Fit templates to a spectrum and make a diagnostic figure
Parameters
----------
inp : str or `~astropy.io.fits.HDUList` or `~msaexp.spectrum.SpectrumSampler`
Input spectrum file path or HDUList object or SpectrumSampler object.
Default is ``jw02767005001-02-clear-prism-nrs2-2767_11027.spec.fits``.
z : float, optional
Redshift value. Default is 9.505.
vel_width : int, optional
Velocity width, km/s. Default is 100.
scale_disp : float, optional
Dispersion scale factor. Default is 1.3.
nspline : int, optional
Number of spline continuum components. Default is 27.
show_cont : bool, optional
Whether to show continuum. Default is True.
draws : int, optional
Number of draws from the fit covariance to plot. Default is 100.
figsize : tuple, optional
Figure size. Default is (16, 8).
ranges : list of tuples, optional
List of wavelength ranges to plot in zoom panels. Default is [(3650, 4980)].
full_log : bool, optional
Whether to use full log. Default is False.
eazy_templates : None or str, optional
Eazy templates to use for the fit. Default is None.
use_full_dispersion : bool, optional
Whether to use full dispersion. Default is True.
get_init_data : bool, optional
If specified, just return the `~msaexp.spectrum.SpectrumSampler` object and
the design matrix array ``A``
scale_uncertainty_kwargs : None or dict, optional
Scale uncertainty keyword arguments. Default is None.
plot_unit : None, str, `~astropy.units.Unit`, optional
Plot unit. Default is None.
sys_err : float, optional
Systematic error added in quadrature with the spectrum
uncertainties. Default is 0.02.
return_fit_results : bool
Just return the fit results without making a plot -
``templates, coeffs, flam, eflam, _model, mask, full_chi2``
use_aper_columns : bool, optional
Whether to use aperture columns in the spectrum table.
Default is False.
label : None or str, optional
Label to add to the figure. Default is None.
kwargs : dict, optional
Additional keyword arguments passed to `~msaexp.spectrum.SpectrumSampler`
and `~msaexp.spectrum.make_templates`
Returns
-------
If ``return_fit_results = True``, returns a tuple containing the fit results:
``templates, coeffs, flam, eflam, _model, mask, full_chi2``.
Otherwise, returns ``None``.
"""
global SCALE_UNCERTAINTY
lw, lr = utils.get_line_wavelengths()
if isinstance(inp, str):
sampler = SpectrumSampler(inp, sys_err=sys_err, **kwargs)
file = inp
elif isinstance(inp, pyfits.HDUList):
sampler = SpectrumSampler(inp, sys_err=sys_err, **kwargs)
file = None
else:
file = None
sampler = inp
if (label is None) & (file is not None):
label = os.path.basename(file)
spec = sampler.spec
if (use_aper_columns > 0) & ("aper_flux" in spec.colnames):
if ("aper_corr" in spec.colnames) & (use_aper_columns > 1):
ap_corr = spec["aper_corr"] * 1
else:
ap_corr = 1
flux_column = "aper_flux"
err_column = "aper_full_err"
# flam = spec['aper_flux']*spec['to_flam']*ap_corr
# eflam = spec['aper_full_err']*spec['to_flam']*ap_corr
else:
flux_column = "flux"
err_column = "full_err"
# flam = spec['flux']*spec['to_flam']
# eflam = spec['full_err']*spec['to_flam']
ap_corr = 1.0
flam = spec[flux_column] * spec["to_flam"] * ap_corr
eflam = spec[err_column] * spec["to_flam"] * ap_corr
wrest = spec["wave"] / (1 + z) * 1.0e4
wobs = spec["wave"]
mask = spec["valid"]
flam[~mask] = np.nan
eflam[~mask] = np.nan
if bspl is None:
if "continuum_model" in spec.colnames:
bspl = spec["continuum_model"][None, :] * spec["to_flam"] * ap_corr
apply_igm = False
else:
bspl = sampler.bspline_array(nspline=nspline, get_matrix=True)
apply_igm = True
templates, tline, _A = make_templates(
sampler,
z,
bspl=bspl,
eazy_templates=eazy_templates,
vel_width=vel_width,
scale_disp=scale_disp,
use_full_dispersion=use_full_dispersion,
grating=spec.grating,
apply_igm=apply_igm,
**kwargs,
)
if get_init_data:
return spec, _A
if scale_uncertainty_kwargs is not None:
_, escl, _sys_err, _ = calc_uncertainty_scale(
file=None,
data=(spec, _A),
sys_err=sys_err,
**scale_uncertainty_kwargs,
)
# eflam *= escl
spec["escale"] *= escl
set_spec_sys_err(spec, sys_err=_sys_err)
eflam = spec[err_column] * spec["to_flam"] * ap_corr
okt = _A[:, mask].sum(axis=1) != 0
_Ax = _A[okt, :] / eflam
_yx = flam / eflam
if eazy_templates is None:
_x = np.linalg.lstsq(_Ax[:, mask].T, _yx[mask], rcond=None)
else:
_x = nnls(_Ax[:, mask].T, _yx[mask])
coeffs = np.zeros(_A.shape[0])
coeffs[okt] = _x[0]
_model = _A.T.dot(coeffs)
_mline = _A.T.dot(coeffs * tline)
_mcont = _model - _mline
full_chi2 = ((flam - _model) ** 2 / eflam**2)[mask].sum()
cont_chi2 = ((flam - _mcont) ** 2 / eflam**2)[mask].sum()
if return_fit_results:
return templates, coeffs, flam, eflam, _model, mask, full_chi2
try:
oktemp = okt & (coeffs != 0)
AxT = (_A[oktemp, :] / eflam)[:, mask].T
covar_i = utils.safe_invert(np.dot(AxT.T, AxT))
covar = utils.fill_masked_covar(covar_i, oktemp)
covard = np.sqrt(covar.diagonal())
has_covar = True
except:
has_covar = False
covard = coeffs * 0.0
N = len(templates)
covar = np.eye(N, N)
msg = "\n# line flux err\n# flux x 10^-20 erg/s/cm2\n"
if label is not None:
msg += f"# {label}\n"
msg += f"# z = {z:.5f}\n# {time.ctime()}\n"
cdict = {}
eqwidth = {}
for i, t in enumerate(templates):
cdict[t] = [float(coeffs[i]), float(covard[i])]
if t.startswith("line "):
lk = t.split()[-1]
# Equivalent width:
# coeffs, line fluxes are in units of 1e-20 erg/s/cm2
# _mcont, continuum model is in units of 1-e20 erg/s/cm2/A
# so observed-frame equivalent width is roughly
# eqwi = coeffs[i] / _mcont[ wave_obs[i] ]
if lk in lw:
lwi = lw[lk][0] * (1 + z) / 1.0e4
continuum_i = np.interp(lwi, spec["wave"], _mcont)
eqwi = coeffs[i] / continuum_i
else:
eqwi = np.nan
eqwidth[t] = [float(eqwi)]
msg += f"{t:>20} {coeffs[i]:8.1f} ± {covard[i]:8.1f}"
msg += f" (EW={eqwi:9.1f})\n"
utils.log_comment(utils.LOGFILE, msg, verbose=True)
if "srcra" not in spec.meta:
spec.meta["srcra"] = 0.0
spec.meta["srcdec"] = 0.0
spec.meta["srcname"] = "unknown"
spec["model"] = _model / spec["to_flam"]
spec["mline"] = _mline / spec["to_flam"]
data = {
"z": float(z),
"file": file,
"label": label,
"ra": float(spec.meta["srcra"]),
"dec": float(spec.meta["srcdec"]),
"name": str(spec.meta["srcname"]),
"wmin": float(spec["wave"][mask].min()),
"wmax": float(spec["wave"][mask].max()),
"coeffs": cdict,
"covar": covar.tolist(),
"wave": [float(m) for m in spec["wave"]],
"flux": [float(m) for m in spec["flux"]],
"err": [float(m) for m in spec["err"]],
"escale": [float(m) for m in spec["escale"]],
"model": [float(m) for m in _model / spec["to_flam"]],
"mline": [float(m) for m in _mline / spec["to_flam"]],
"templates": templates,
"dof": int(mask.sum()),
"fullchi2": float(full_chi2),
"contchi2": float(cont_chi2),
"eqwidth": eqwidth,
}
for k in ["z", "wmin", "wmax", "dof", "fullchi2", "contchi2"]:
spec.meta[k] = data[k]
# fig, axes = plt.subplots(len(ranges)+1,1,figsize=figsize)
if len(ranges) > 0:
fig = plt.figure(figsize=figsize, constrained_layout=True)
gs = GridSpec(2, len(ranges), figure=fig)
axes = []
for i, _ra in enumerate(ranges):
axes.append(fig.add_subplot(gs[0, i]))
axes.append(fig.add_subplot(gs[1, :]))
else:
fig, ax = plt.subplots(1, 1, figsize=figsize)
axes = [ax]
_Acont = (_A.T * coeffs)[mask, :][:, :nspline]
if trim_negative:
_Acont[_Acont < 0.001 * _Acont.max()] = np.nan
if (draws is not None) & has_covar:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
mu = np.random.multivariate_normal(
coeffs[oktemp], covar_i, size=draws
)
# print('draws', draws, mu.shape, _A.shape)
mdraws = _A[oktemp, :].T.dot(mu.T)
else:
mdraws = None
if plot_unit is not None:
unit_conv = (
(1 * spec.meta["flamunit"])
.to(plot_unit, equivalencies=spec.equiv)
.value
)
else:
unit_conv = np.ones(len(wobs))
for ax in axes:
if 1:
ax.errorbar(
wobs,
flam * unit_conv,
eflam * unit_conv,
marker="None",
linestyle="None",
alpha=0.5,
color="k",
ecolor="k",
zorder=100,
)
ax.step(
wobs, flam * unit_conv, color="k", where="mid", lw=1, alpha=0.8
)
# ax.set_xlim(3500, 5100)
# ax.plot(_[1]['templz']/(1+z), _[1]['templf'])
ax.step(
wobs[mask],
(_mcont * unit_conv)[mask],
color="pink",
alpha=0.8,
where="mid",
)
ax.step(
wobs[mask],
(_model * unit_conv)[mask],
color="r",
alpha=0.8,
where="mid",
)
cc = utils.MPL_COLORS
for w, c in zip(
[3727, 4980, 6565, 9070, 9530, 1.094e4, 1.282e4, 1.875e4],
[
cc["purple"],
cc["b"],
cc["g"],
"darkred",
"darkred",
cc["pink"],
cc["pink"],
cc["pink"],
],
):
wz = w * (1 + z) / 1.0e4
dw = 70 * (1 + z) / 1.0e4
ax.fill_between(
[wz - dw, wz + dw],
[0, 0],
[100, 100],
color=c,
alpha=0.07,
zorder=-100,
)
if mdraws is not None:
ax.step(
wobs[mask],
(mdraws.T * unit_conv).T[mask, :],
color="r",
alpha=np.maximum(1.0 / draws, 0.02),
zorder=-100,
where="mid",
)
if show_cont:
ax.plot(
wobs[mask],
(_Acont.T * unit_conv[mask]).T,
color="olive",
alpha=0.3,
)
ax.fill_between(
ax.get_xlim(),
[-100, -100],
[0, 0],
color="0.8",
alpha=0.5,
zorder=-1,
)
ax.fill_betweenx(
[0, 100],
[0, 0],
[1215.67 * (1 + z) / 1.0e4] * 2,
color=utils.MPL_COLORS["orange"],
alpha=0.2,
zorder=-1,
)
ax.grid()
# axes[0].set_xlim(1000, 2500)
# ym = 0.15; axes[0].set_ylim(-0.1*ym, ym)
for i, r in enumerate(ranges):
axes[i].set_xlim(*[ri * (1 + z) / 1.0e4 for ri in r])
# print('xxx', r)
if spec.filter == "clear":
axes[-1].set_xlim(0.6, 5.54)
axes[-1].xaxis.set_minor_locator(MultipleLocator(0.1))
axes[-1].xaxis.set_major_locator(MultipleLocator(0.5))
elif spec.filter == "f070lp":
axes[-1].set_xlim(0.65, 1.31)
axes[-1].xaxis.set_minor_locator(MultipleLocator(0.02))
elif spec.filter == "f100lp":
axes[-1].set_xlim(0.92, 1.91)
axes[-1].xaxis.set_minor_locator(MultipleLocator(0.02))
axes[-1].xaxis.set_major_locator(MultipleLocator(0.1))
elif spec.filter == "f170lp":
axes[-1].set_xlim(1.65, 4.21)
elif spec.filter == "f290lp":
axes[-1].set_xlim(2.89, 5.31)
else:
axes[-1].set_xlim(wrest[mask].min(), wrest[mask].max())
axes[-1].set_xlabel(f"obs wavelenth, z = {z:.5f}")
# axes[0].set_title(os.path.basename(file))
for ax in axes:
xl = ax.get_xlim()
ok = wobs > xl[0]
ok &= wobs < xl[1]
ok &= np.abs(wrest - 5008) > 100
ok &= np.abs(wrest - 6564) > 100
ok &= mask
if ok.sum() == 0:
ax.set_visible(False)
continue
ymax = np.maximum(
(_model * unit_conv)[ok].max(),
10 * np.median((eflam * unit_conv)[ok]),
)
ymin = np.minimum(-0.1 * ymax, -3 * np.median((eflam * unit_conv)[ok]))
ax.set_ylim(ymin, ymax * 1.3)
# print(xl, ymax)
if ok.sum() > 0:
if (np.nanmax((flam / eflam)[ok]) > 20) & (full_log):
ax.set_ylim(0.005 * ymax, ymax * 5)
ax.semilogy()
if len(axes) > 0:
gs.tight_layout(fig, pad=0.8)
else:
fig.tight_layout(pad=0.8)
if label is not None:
fig.text(
0.015 * 12.0 / 12,
0.005,
f"{label}",
ha="left",
va="bottom",
transform=fig.transFigure,
fontsize=8,
)
fig.text(
1 - 0.015 * 12.0 / 12,
0.005,
time.ctime(),
ha="right",
va="bottom",
transform=fig.transFigure,
fontsize=6,
)
return fig, spec, data
DEFAULT_FNUMBERS = [
239,
205, # F814W, F160W
362,
363,
364,
365,
366,
370,
371,
375,
376,
377, # NRC BB
379,
380,
381,
382,
383,
384,
385,
386, # NRC MB
]
DEFAULT_REST_FNUMBERS = [
120,
121, # GALEX
218,
219,
270,
271,
272,
274, # FUV
153,
154,
155, # UBV
156,
157,
158,
159,
160, # SDSS ugriz
161,
162,
163, # 2MASS JHK
414,
415,
416, # ugi Antwi-Danso 2022
]
DEFAULT_SCALE_KWARGS = dict(
order=0, sys_err=0.02, nspline=31, scale_disp=1.3, vel_width=100
)
BETA_KWS = dict(
wrange=((0.14, 0.1860), (0.1955, 0.2580)),
ref_wave=0.1550,
dla_wrange=(0.1180, 0.1350),
fit_restart=False,
make_plot=False,
)
[docs]def do_integrate_filters(
file,
z=0,
RES=None,
fnumbers=DEFAULT_FNUMBERS,
rest_fnumbers=DEFAULT_REST_FNUMBERS,
scale_kwargs=DEFAULT_SCALE_KWARGS,
beta_kwargs=BETA_KWS,
):
"""
Integrate a spectrum through a list of filter bandpasses
Parameters
----------
file : str
Spectrum filename
z : float
Redshift
RES : `eazy.filters.FilterFile`
Container of filter bandpasses
fnumbers : list
List of observed-frame ``f_numbers``
rest_fnumbers : list
List of rest-frame ``f_numbers`` to evaluate at ``z``
scale_kwargs : dict, None
If provided, initialize the spectrum by first passing through
`~msaexp.spectrum.calc_uncertainty_scale`
beta_kwargs : dict
Compute rest-frame UV slope and DLA equivalent width with
`~msaexp.spectrum.measure_uv_slope`
Returns
-------
fdict : dict
"horizontal" dictionary with keys for each separate filter
sed : `~astropy.table.Table`
"vertical" table of the integrated flux densities
"""
import eazy.filters
global SCALE_UNCERTAINTY
SCALE_UNCERTAINTY = 1.0
if RES is None:
RES = eazy.filters.FilterFile(path=None)
if scale_kwargs is not None:
# Initialize spectrum rescaling uncertainties
spec, escl, _sys_err, _ = calc_uncertainty_scale(
file, z=z, **scale_kwargs
)
spec["escale"] *= escl
set_spec_sys_err(spec, sys_err=_sys_err)
else:
# Just read the catalog
spec = utils.read_catalog(file)
if "escale" not in spec.colnames:
spec["escale"] = 1.0
rows = []
fdict = {"file": file, "z": z, "escale": np.nanmedian(spec["escale"])}
# Observed-frame filters
for fn in fnumbers:
obs_flux = integrate_spectrum_filter(spec, RES[fn], z=0)
rows.append(
[RES[fn].name, RES[fn].pivot / 1.0e4, fn, 0] + list(obs_flux)
)
for c, v in zip(
["valid", "frac", "flux", "err", "full_err"], obs_flux
):
fdict[f"obs_{fn}_{c}"] = v
for fn in rest_fnumbers:
rest_flux = integrate_spectrum_filter(spec, RES[fn], z=z)
rows.append(
[RES[fn].name, RES[fn].pivot * (1 + z) / 1.0e4, fn, z]
+ list(rest_flux)
)
for c, v in zip(
["valid", "frac", "flux", "err", "full_err"], rest_flux
):
fdict[f"rest_{fn}_{c}"] = v
sed = utils.GTable(
names=[
"name",
"pivot",
"f_number",
"z",
"valid",
"frac",
"flux",
"err",
"full_err",
],
rows=rows,
)
if beta_kwargs is not None:
bet = measure_uv_slope(spec, z, **beta_kwargs)
# beta, beta_unc, ndla, dla_value, dla_unc, _fig = _
cov = bet.pop("beta_cov")
for k in bet:
fdict[k] = bet[k]
for i in range(2):
for j in range(2):
fdict[f"beta_cov_{i}{j}"] = cov[i][j]
# fdict['dla_beta'] = beta
# fdict['dla_beta_unc'] = beta_unc
# fdict['ndla'] = ndla
# fdict['dla_value'] = dla_value
# fdict['dla_unc'] = dla_unc
return fdict, sed
[docs]def integrate_spectrum_filter(spec, filt, z=0, filter_fraction_threshold=0.1):
"""Integrate spectrum data through a filter bandpass
Parameters
----------
spec : `~astropy.table.Table`
Spectrum data with columns ``wave`` (microns), ``flux`` and ``err``
[``full_err``] (fnu) and ``valid``
filt : `~eazy.filters.FilterDefinition`
Filter bandpass object
z : float
Redshift
filter_fraction_threshold : float
Minimum allowed ``filter_fraction``, i.e. for filters that overlap
with the observed spectrum
Returns
-------
npix : int
Number of "valid" pixels
filter_fraction : float
Fraction of the integrated bandpass that falls on valid spectral
wavelength bins
filter_flux : float
Integrated flux density, in units of ``spec['flux']``
filter_err : float
Propagated uncertainty from ``spec['err']``
filter_full_err : float
Propagated uncertianty from ``spec['full_err']``, if available.
Returns -1 if not available.
"""
# Interpolate bandpass to wavelength grid
filter_int = np.interp(
spec["wave"],
filt.wave / 1.0e4 * (1 + z),
filt.throughput,
left=0.0,
right=0.0,
)
if "valid" in spec.colnames:
valid = spec["valid"]
else:
valid = (spec["err"] > 0) & np.isfinite(spec["err"] + spec["flux"])
if valid.sum() < 2:
return (valid.sum(), 0.0, 0.0, -1.0, -1.0)
# Full filter normalization
filt_norm_full = trapz(
filt.throughput / (filt.wave / 1.0e4), filt.wave / 1.0e4
)
# Normalization of filter sampled by the spectrum
filt_norm = trapz(
(filter_int / spec["wave"])[valid], spec["wave"][valid]
)
filter_fraction = filt_norm / filt_norm_full
if filter_fraction < filter_fraction_threshold:
return (valid.sum(), filter_fraction, 0.0, -1.0, -1.0)
#######
# Integrals
# Trapezoid rule steps
trapz_dx = utils.trapz_dx(spec["wave"])
# Integrated flux
fnu_flux = (
(filter_int / filt_norm * trapz_dx * spec["flux"] / spec["wave"])[
valid
]
).sum()
# Propagated uncertainty
fnu_err = np.sqrt(
(
(filter_int / filt_norm * trapz_dx * spec["err"] / spec["wave"])[
valid
]
** 2
).sum()
)
if "full_err" in spec.colnames:
fnu_full_err = np.sqrt(
(
(
filter_int
/ filt_norm
* trapz_dx
* spec["full_err"]
/ spec["wave"]
)[valid]
** 2
).sum()
)
else:
fnu_full_err = -1.0
return valid.sum(), filter_fraction, fnu_flux, fnu_err, fnu_full_err
def measure_uv_slope(
spec,
z,
wrange=((0.14, 0.1860), (0.1955, 0.2580)),
ref_wave=0.1550,
dla_wrange=(0.1180, 0.1350),
fit_restart=False,
make_plot=False,
):
"""
Measure UV slope beta and absolute magnitude
Parameters
----------
spec : `~astropy.table.Table`
Spectrum data with columns ``wave`` (microns), ``flux`` and ``err``
[``full_err``] (fnu) and ``valid``
z : float
Redshift
wrange : list of (float, float)
Wavelength ranges for the UV slope fit, microns. The default excludes
potential contribution from CIII]1909.
ref_wave : float
Reference wavelength, microns
dla_wrange : (float, float)
Wavelength range for the DLA equivalent width parameter, which is the
integral of ``(1 - Fobs/Fcont)`` near Ly-alpha where ``Fobs`` is the
observed spectrum and ``Fcont`` is the continuum spectrum extrapolated
with the derived UV slope beta
fit_restart : bool
Redo fit after perturbing initial parameters. Seems to help make the
fit covariance more reasonable in some cases.
make_plot : bool
Make a diagnostic plot
Returns
-------
res : dict
- ``beta`` = Derived UV slope ``flam = lam**beta``
- ``beta_ref_flux`` = Flux density at redshifted ``ref_wave``
- ``beta_cov`` = 2x2 covariance matrix for the fit for ``beta``,
``flux_ref``
- ``beta_npix`` = Number of wavelength pixels satisfying ``wrange``
- ``beta_wlo, beta_whi`` = Minimum and maximum of rest-frame
wavelengths within ``wrange``
- ``beta_nmad`` = NMAD of the power-law beta fit
- ``dla_npix`` = Number of spectral bins for DLA parameter estimate
- ``dla_value`` = DLA equivalent width, Angstroms
- ``dla_unc`` = Uncertainty on the DLA EQW
- ``fig`` = `matplotlib.Figure` object if requested with make_plot
"""
from scipy.optimize import minimize
# Force fit beta slope to linear flux densities
fit_linear = True
wrest = spec["wave"] / (1 + z)
blim = np.zeros(len(spec), dtype=bool)
for wr in wrange:
blim |= (wrest >= wr[0]) & (wrest <= wr[1])
pos = spec["flux"].data > 0
blim &= spec["flux"] > -spec["full_err"]
npix = blim.sum()
if npix < 3:
cov = np.ones((2, 2)) * np.nan
res = {
"beta": np.nan,
"beta_ref_flux": np.nan,
"beta_cov": np.ones((2, 2)) * np.nan,
"beta_npix": npix,
"beta_wlo": np.nan,
"beta_whi": np.nan,
}
return res
wlo = wrest[blim].min()
whi = wrest[blim].max()
scale_err = 1.0
# Objective function for beta fit
def _objfun_beta(theta, x, flux, err, ret):
m = theta[1] * x ** (theta[0] + 2)
if ret == 1:
return m
chi2 = ((flux - m) ** 2 / err**2).sum()
# Penalize very blue
if theta[0] < -2:
chi2 += (theta[0] - -2) ** 2 / 2 / 1.0**2
return chi2
# Guess from log-linear fit mag = beta * ln(wave) + mag(ref_wave)
lnf = 23.9 - 2.5 * np.log10((spec["flux"].data))
lnfe = (
2.5 / np.log(10) * spec["full_err"] / scale_err / spec["flux"]
).data
x = np.log(wrest / ref_wave)
A = np.array([-x, x**0])
AxT = (A / lnfe).T
c = np.linalg.lstsq(
AxT[blim & pos, :], (lnf / lnfe)[blim & pos], rcond=None
)
beta = c[0][0] - 2
ref_flux = 10 ** (-0.4 * (c[0][1] - 23.9))
theta = c[0]
np.random.seed(fit_restart * 1)
nmad = -1
if fit_linear:
# Now fit in linear flux densities
x0 = np.array([c[0][0] - 2, 10 ** (-0.4 * (c[0][1] - 23.9))])
xargs = (
wrest[blim] / ref_wave,
spec["flux"].data[blim],
spec["full_err"].data[blim],
)
_x = minimize(_objfun_beta, x0=x0, method="bfgs", args=(*xargs, 0))
best_model = _objfun_beta(_x.x, *xargs, 1)
resid = (xargs[1] - best_model) / xargs[2]
# Rescale Uncertainties
nmad = utils.nmad(resid)
if fit_restart > 0:
# Restart
print("nmad: ", nmad, fit_restart)
xargs = (
wrest[blim] / ref_wave,
spec["flux"].data[blim],
spec["full_err"].data[blim] * nmad,
)
if fit_restart > 1:
_x = minimize(
_objfun_beta,
x0=_x.x + np.random.normal(size=2) * np.array([0.1, 0.03]),
method="bfgs",
args=(*xargs, 0),
) # , tol=1.e-6)
else:
_x = minimize(
_objfun_beta, x0=x0, method="bfgs", args=(*xargs, 0)
) # , tol=1.e-6)
theta = _x.x
beta, ref_flux = theta
# Covariance is hess_inv from the BFGS optimization
cov = _x.hess_inv
else:
cov = utils.safe_invert(
np.dot(AxT[blim & pos, :].T, AxT[blim & pos, :])
)
nmad = -1
res = {
"beta": beta,
"beta_ref_flux": ref_flux,
"beta_cov": cov,
"beta_npix": npix,
"beta_wlo": wlo,
"beta_whi": whi,
"beta_nmad": nmad,
}
# DLA equivalent width parameter
if fit_linear:
xdla = (
(wrest >= dla_wrange[0])
& (wrest <= dla_wrange[1])
& (spec["valid"])
)
res["dla_npix"] = xdla.sum()
if xdla.sum() < 3:
dla_value = -1
dla_unc = -1
res["dla_value"] = -1
res["dla_unc"] = -1
else:
draws = np.random.multivariate_normal(theta, cov, 1000)
xargs = (
wrest[xdla] / ref_wave,
spec["flux"].data[xdla],
spec["full_err"].data[xdla],
)
dla_continuum = _objfun_beta(_x.x, *xargs, 1)
dla_draws = np.array(
[_objfun_beta(xi, *xargs, 1) for xi in draws]
).T
dla_continuum_unc = np.std(dla_draws, axis=1)
dx = utils.trapz_dx(wrest[xdla]) * 1.0e4
sx = spec[xdla]
ydata = 1 - sx["flux"] / dla_continuum
vdata = (sx["flux"] / dla_continuum) ** 2 * (
(sx["full_err"] / sx["flux"]) ** 2
+ (dla_continuum_unc / dla_continuum) ** 2
)
# Do trapezoid rule integration and propagation of uncertainty
dla_value = (ydata * dx).sum()
dla_unc = np.sqrt((vdata * dx**2).sum())
res["dla_value"] = dla_value
res["dla_unc"] = dla_unc
else:
res["dla_npix"] = 0
res["dla_value"] = dla_value = -1
res["dla_unc"] = dla_unc = -1
# Make a diagnostic plot
if make_plot:
fig, ax = plt.subplots(1, 1, figsize=(7, 4))
draws = np.random.multivariate_normal(theta, cov, 100)
if fit_linear:
xargs = (
wrest / ref_wave,
spec["flux"].data,
spec["full_err"].data,
)
best_model = _objfun_beta(_x.x, *xargs, 1)
flux_draws = np.array(
[_objfun_beta(xi, *xargs, 1) for xi in draws]
).T
mag = 23.9 - 2.5 * np.log10(ref_flux)
emag = 2.5 / np.log(10) * np.sqrt(cov[1][1]) / ref_flux
else:
best_model = 10 ** (-0.4 * (A.T.dot(c[0]) - 23.9))
flux_draws = 10 ** (-0.4 * (A.T.dot(draws.T) - 23.9))
mag = c[0][1]
emag = np.sqrt(cov[1][1])
ok = ~blim
ax.errorbar(
wrest[ok],
spec["flux"][ok],
spec["full_err"][ok] / scale_err,
color="k",
marker=".",
linestyle="None",
alpha=0.5,
)
ax.errorbar(
wrest[blim],
spec["flux"][blim],
spec["full_err"][blim] / scale_err,
color="magenta",
marker="o",
linestyle="None",
alpha=0.5,
)
# plot labels
lab = r"$\beta = bbv \pm bbe$"
lab += "\n" + r"$m_{rr} = mmv \pm mme$"
lab = lab.replace("bbv", f"{beta:5.2f}")
lab = lab.replace("bbe", f"{np.sqrt(cov[0][0]):5.2f}")
lab = lab.replace("rr", f"{ref_wave*1e4:.0f}")
lab = lab.replace("mmv", f"{mag:5.2f}")
lab = lab.replace("mme", f"{emag:4.2f}")
ax.plot(wrest, best_model, color="tomato", label=lab)
if dla_value < 0:
y0 = 2 * np.interp(0.1216, wrest[xdla], dla_continuum)
else:
y0 = 0.0
ax.vlines(
0.1216 + np.array([-1, 1]) * dla_value / 2.0e4,
y0,
np.interp(0.1216, wrest[xdla], dla_continuum),
color="orange",
label=f"EW_DLA = {dla_value:.1f} ({dla_unc:.1f})",
)
_ = ax.plot(wrest, flux_draws, color="pink", alpha=0.1)
ax.set_xlim(0.1, 0.39)
ymax = flux_draws[blim, :].max() + spec["full_err"][blim].max()
for wr in wrange:
ax.vlines(
wr, -0.5 * ymax, 10 * ymax, color="purple", lw=3, alpha=0.4
)
leg = ax.legend(loc="upper right")
leg.set_title(f"{spec.meta['SRCNAME']}\nz = {z:.4f}")
ax.set_ylim(-0.5 * ymax, 2 * ymax)
ax.errorbar(
wrest[xdla], dla_continuum, dla_continuum_unc, color="orange"
)
ax.set_xlabel(r"$\lambda_\mathrm{rest}$")
ax.set_ylabel(r"$f_\nu$")
ax.grid()
fig.tight_layout(pad=1)
res["fig"] = fig
return res