Source code for msaexp.utils.lines

"""
Tools for working with emission lines
"""

import os

import numpy as np

import astropy.constants
import astropy.units as u

from grizli import utils

try:
    from grizli.utils import trapz
except ImportError:
    try:
        from numpy import trapz
    except ImportError:
        from numpy import trapezoid as trapz

from .general import module_data_path

__all__ = ["LineList", "MolecularHydrogen", "line_flam_to_fnu", "trapz"]

def arabic_to_roman(label, count=-1):
    """
    Convert string with an arabic number to Roman numeral, e.g., "Ca2" -> "CaII"
    """
    roman = ['','I','II','III','IV','V','VI','VII','VIII','IX','X','XI','XII','XIII','XIV','XV']
    out = label + ''
    for a in range(1,len(roman)):
        out = out.replace(f"{a}", roman[a], count)

    return out


[docs]def line_flam_to_fnu( wave=2.12 * (1 + 1.0132), flam_unit=1.0e-20 * u.erg / u.second / u.cm**2 / u.Angstrom, ): """ Convert from flambda cgs to microJansky """ to_fnu = ( (1 * flam_unit) .to(u.microJansky, equivalencies=u.spectral_density(wave * u.micron)) .value ) return to_fnu
def axis_renorm_x(ax, value): """Transform value to normalized coordinate""" if ax.xaxis.get_scale() == "log": return np.interp(np.log(value), np.log(ax.get_xlim()), [0,1]) else: return np.interp(value, ax.get_xlim(), [0,1])
[docs]class LineList(object): WHITE_BBOX = { "fc": "w", "ec": "None", "alpha": 1.0, "boxstyle": "square,pad=0.3", } CLEAN_CENTER_KWARGS = { "ha": "center", "bbox": WHITE_BBOX, "x_pad": 0.0, } LEFT_OFFSET_KWARGS = { "ha": "right", "x_pad": -0.005, } RIGHT_OFFSET_KWARGS = { "ha": "left", "x_pad": 0.005, } def __init__(self, data=None): """ Tools for working with a table of lines Examples -------- .. plot:: :include-source: from msaexp.utils import LineList ll = LineList() fig = ll.demo() """ if data is None: self.load_data() else: self.data = data
[docs] def load_data(self, fill_color="0.2"): """ Load the data from ``msaexp/data/linelist.ecsv`` """ linelist_file = os.path.join(module_data_path(), "linelist.ecsv") self.data = utils.read_catalog(linelist_file, format="ascii.ecsv") self.data["color"] = self.data["color"].filled(fill_color)
#self.N = len(self.data)
[docs] @staticmethod def from_table(tab, wave_column="wavelength", to_angstroms=1.0, color="0.5", priority=5, prefix="", ratio_column=None): """ Generate from normal table with a list of line wavelengths """ data = utils.GTable() for c in tab.colnames: data[c] = tab[c] data["wavelength"] = [[w*to_angstroms] for w in tab[wave_column]] if ratio_column is None: data["ratio"] = [[1.0] for w in tab[wave_column]] else: data["ratio"] = [[r] for r in tab[ratio_column]] if "color" not in data.colnames: data["color"] = color if "name" not in data.colnames: data["name"] = [ f"{prefix}{w*to_angstroms:.0f}" for w in tab[wave_column] ] if "label" not in data.colnames: data["label"] = data["name"] if "priority" not in data.colnames: data["priority"] = priority return LineList(data=data)
@property def N(self): return len(self.data)
[docs] def select_lines( self, z=0, wave_range=[0, np.inf], prefixes=[], exclude_prefixes=[], priorities=[4, 5], floor=True, **kwargs, ): """ Get selection of lines within a wavelength range Parameters ---------- z : float Redshift wave_range : [float, float] Observed-frame wavelength range. The script tries to automatically distinguish between wavelength units of Angstroms or microns assuming the latter if ``wave_range[1] < 500`` prefixes : list List of strings compared to the ``name`` column in the linelist using the test ``name.startswith(prefix)`` priorities : list List of linelist priorities to include floor : bool If true, ignore decimal part of the linelist priorities and just compare the integer values Returns ------- selected : boolean array Selection array on the listlist table that satisfy the selection criteria x_scale : float Scale factor needed to put the "wavelength" column of the linelist in the frame of ``wave_range`` """ x_scale = 1 + z if wave_range[1] < 500: # axis is in probably in microns x_scale /= 1.0e4 selected = np.zeros(self.N, dtype=bool) for i, row in enumerate(self.data): selected[i] = (row["wavelength"][0] > wave_range[0] / x_scale) & ( row["wavelength"][0] < wave_range[1] / x_scale ) if round: selected[i] &= np.floor(row["priority"]) in priorities else: selected[i] &= row["priority"] in priorities if len(prefixes) > 0: in_prefixes = False for prefix in prefixes: if row["name"].startswith(prefix): in_prefixes = True break selected[i] &= in_prefixes if len(exclude_prefixes) > 0: in_excluded = False for prefix in exclude_prefixes: if row["name"].startswith(prefix): in_excluded = True break selected[i] &= ~in_excluded return selected, x_scale
[docs] def add_to_axis( self, ax, wave_pixels=None, bottom=0, top=1.0, x_pad=0, wfunc=np.min, y_label=0.98, priorities=[3, 4, 5], label_priorities=None, label_prefixes=[], y_normalized=True, rotation=90, ha="center", va="top", lw=1.5, ls="-", fontsize=7, alpha=0.8, bbox={"fc": "w", "ec": "None", "alpha": 1.0}, zorder=1, label_zorder=2, **kwargs, ): """ Add vlines and labels to axis Parameters ---------- ax : `matplotlib` axis Axis to draw labels in bottom : float, (array-like, array-like) Lower limit of vlines to draw. If two arrays are provided, will interpolate the value ``value = np.interp(wave, *bottom)`` top : float, (array-like, array-like) Upper limit of the vlines, with same interpolation options as ``bottom`` x_pad : float relative padding for text labels wfunc : func Function to apply to the potential multiple entries for line complexes to specify where to draw the labels y_label : float Vertical location of the line text labels priorities : list List of priorities to include from the linelist table label_priorities : list, None List of priorities to include with text labels. If not provided, will be the same as ``priorities`` label_prefixes : list Prefix filtering for lines with text labels y_normalized : bool If True, the ``bottom``, ``top`` and ``y_label`` values are calculated in the normalized frame of the plot extent, i.e., with 0 at the bottom and 1 at the top. Not used if values are interpolated. lw, ls : float, string Linewidth and linestyle properties for the vlines rotation, ha, va, fontsize, alpha, bbox, zorder : Text label properties Returns ------- data : table View of the linelist table with the selected lines """ xlim = ax.get_xlim() ylim = ax.get_ylim() if wave_pixels is not None: wpix = np.arange(len(wave_pixels)) xlim = np.interp(xlim, wpix, wave_pixels) if label_priorities is None: label_priorities = priorities label_priorities_str = ' '.join([f'{p:.1f}' for p in label_priorities]) selected, x_scale = self.select_lines( wave_range=xlim, priorities=priorities, **kwargs ) if selected.sum() == 0: return None is_xlog = ax.xaxis.get_scale() == "log" ax_yscale = ax.yaxis.get_scale() is_ylog = ax_yscale == "log" if y_normalized: tr = ax.transAxes y_normalized = False else: tr = None for row in self.data[selected]: wave_i = np.array(row["wavelength"]) * x_scale p_i = row["priority"] if hasattr(bottom, "upper") & (bottom == "priority"): vlo = p_i / 5. elif hasattr(bottom, "__len__"): vlo = np.interp(wave_i, *bottom) else: vlo = bottom if (y_normalized): if is_ylog: vlo = 10 ** np.interp(vlo, [0, 1], np.log10(ylim)) elif ax_yscale == "asinh": vlo = np.sinh(np.interp(vlo, [0, 1], np.arcsinh(ylim))) else: vlo = np.interp(vlo, [0, 1], ylim) else: vlo = vlo if hasattr(top, "upper") & (top == "priority"): vhi = p_i / 5. elif hasattr(top, "__len__"): vhi = np.interp(wave_i, *top) else: vhi = top if y_normalized: if is_ylog: vhi = 10 ** np.interp(vhi, [0, 1], np.log10(ylim)) elif ax_yscale == "asinh": vhi = np.sinh(np.interp(vhi, [0, 1], np.arcsinh(ylim))) else: vhi = np.interp(vhi, [0, 1], ylim) else: vhi = vhi if wave_pixels is not None: x_line = np.interp(wave_i, wave_pixels, wpix) else: x_line = wave_i if tr is not None: x_line = axis_renorm_x(ax, x_line) ax.vlines( x_line, vlo, vhi, color=row["color"], ls=ls, lw=lw, alpha=alpha, zorder=zorder, transform=tr, ) if (y_label is not None) & (f'{p_i:.1f}' in label_priorities_str): if len(label_prefixes) > 0: in_prefixes = False for prefix in label_prefixes: if row["name"].startswith(prefix): in_prefixes = True break if not in_prefixes: continue wl = wfunc(wave_i) if hasattr(y_label, "upper") & (y_label == "priority"): yl = p_i / 5. * 0.95 elif hasattr(y_label, "__len__"): yl = np.interp(wl, *y_label) else: yl = y_label if y_normalized: if is_ylog: yl = 10 ** np.interp(yl, [0, 1], np.log10(ylim)) elif ax_yscale == "asinh": yl = np.sinh(np.interp(yl, [0, 1], np.arcsinh(ylim))) else: yl = np.interp(yl, [0, 1], ylim) else: yl = yl if tr is not None: dx = 0 elif is_xlog: dx = wl * (np.exp(x_pad * np.log(xlim[1] / xlim[0])) - 1) else: dx = x_pad * (xlim[1] - xlim[0]) if wave_pixels is not None: x_text = np.interp(wl + dx, wave_pixels, wpix) else: x_text = wl + dx if tr is not None: x_text = axis_renorm_x(ax, x_text) + x_pad ax.text( x_text, yl, row["label"], color=row["color"], va=va, ha=ha, alpha=alpha, fontsize=fontsize, bbox=bbox, zorder=label_zorder, rotation=rotation, transform=tr, ) return self.data[selected]
[docs] def demo(self, xlim=[0.35, 0.51]): """ Show a demonstration of various plot options """ import matplotlib.pyplot as plt fig, axes = plt.subplots(7, 1, figsize=(8, 8)) for ax in axes: ax.set_ylim(-0.5, 2.5) kws = dict(fontsize=8, bbox=self.WHITE_BBOX, ha="left", va="bottom") lxy = (0.02, 0.09) axes[0].set_xlim(xlim[0] * 1.0e4, xlim[1] * 1.0e4) self.add_to_axis(axes[0]) axes[0].text(*lxy, "defaults", transform=axes[0].transAxes, **kws) for ax in axes[1:]: ax.set_xlim(xlim) self.add_to_axis( axes[1], prefixes=["O", "Pa"], va="bottom", y_normalized=False ) axes[1].text( *lxy, 'prefixes=["O","Pa"], y_normalized=False', transform=axes[1].transAxes, **kws, ) self.add_to_axis( axes[2], priorities=[3, 4, 5], label_prefixes=["Ne", "H"], **self.LEFT_OFFSET_KWARGS, ) axes[2].text( *lxy, 'priorities=[3,4,5], label_prefixes=["Ne","H"], **LEFT_OFFSET_KWARGS', transform=axes[2].transAxes, **kws, ) axes[3].set_ylim(0.1, 10) axes[3].semilogy() self.add_to_axis( axes[3], priorities=[2, 3, 4, 5], label_priorities=[4, 5] ) axes[3].text( *lxy, "priorities=[2,3,4,5], label_priorities=[4,5], semilogy", transform=axes[3].transAxes, **kws, ) axes[4].set_ylim(0.1, 10) axes[4].loglog() self.add_to_axis( axes[4], priorities=[3, 4, 5], y_label=0.5, va="center" ) axes[4].text( *lxy, 'loglog, y_label=0.5, va="center"', transform=axes[4].transAxes, **kws, ) xi = np.linspace(-1, 1, 128) yi = np.cos(xi * 4 * np.pi) xp = np.interp(xi, [-1, 1], xlim) axes[5].plot(xp, yi - 1, color="0.8") axes[5].plot(xp, yi * 0.2 + 1, color="0.8") self.add_to_axis( axes[5], bottom=(xp, yi - 1), top=(xp, yi * 0.2 + 1), priorities=[3, 4, 5], ) axes[5].text( *lxy, "interpolated bottom, top", transform=axes[5].transAxes, **kws, ) axes[6].plot(xp, yi * 0.2 + 1, color="0.8") self.add_to_axis( axes[6], y_label=(xp, yi * 0.2 + 1), priorities=[3, 4, 5], va="center", bbox=self.WHITE_BBOX, fontsize=5, ) axes[6].text( *lxy, 'interpolated y_label, va="center", white bbox, fontsize=5', transform=axes[6].transAxes, **kws, ) fig.tight_layout(pad=1) return fig
[docs] def spec_model( self, spec, z=0, selected=None, fnu=True, use_ratios=True, **kwargs ): """ Make line models `msaexp.spectrum.SpectrumSampler.fast_emission_line` Parameters ---------- spec : `~msaexp.spectrum.SpectrumSampler` Spectrum object z : float Redshift selected : None, array Line selection array. If not provided, will pull from `LineList.select_lines`. fnu : bool Return in fnu units use_ratios : bool Use line ratio values. If False, normalize line flux to unity. kwargs : dict Keyword arguments passed to `LineList.select_lines` and `msaexp.spectrum.SpectrumSampler.fast_emission_line` Returns ------- result : dict Model results: * ``templates``: ``(Nline, Nwave)`` array. of line templates * ``names``: line names * ``index``: indices of the selected lines from the full linelist """ if selected is None: valid_wave = spec["wave"][spec["valid"]] wave_range = [valid_wave.min(), valid_wave.max()] selected, x_scale = self.select_lines( wave_range=wave_range, z=z, **kwargs ) if selected.sum() == 0: return np.zeros((1, spec.NSPEC)), ["empty"] line_names = [] line_templates = [] for i, row in enumerate(self.data[selected]): line_i = np.zeros_like(spec["flux"]) for lw, lr in zip(row["wavelength"], row["ratio"]): if use_ratios: lri = lr else: lri = 1.0 line_i += spec.fast_emission_line( lw * (1 + z) / 1.0e4, line_flux=lri, **kwargs, ) line_templates.append(line_i) line_names.append(row["name"]) line_templates = np.array(line_templates) if fnu: line_templates *= 1.0e-4 / spec["to_flam"] result = { "templates": line_templates, "names": line_names, "index": np.where(selected)[0], } return result
def get_atom_colors(my_colors={}): """ """ from matplotlib.colors import to_rgb, CSS4_COLORS colors = {} # S c_i = np.array(to_rgb(CSS4_COLORS["gold"])) colors["S2"] = c_i * 0.6 colors["S3"] = c_i * 0.8 colors["S4"] = c_i * 0.9 c_i = np.array(to_rgb(CSS4_COLORS["darkorchid"])) colors["Ar3"] = c_i for i in range(7): colors[f"Ar{i+1}"] = c_i * np.interp(i, [0, 6], [0.6, 0.9]) # C # for i in range(4): # colors[f"C{i+1}"] = np.ones(3) * np.interp(i, [0, 3], [0.3, 0.7]) c_i = np.array(to_rgb(CSS4_COLORS["goldenrod"])) for i in range(4): colors[f"C{i+1}"] = c_i * 0.7 # colors["Fe2"] = np.array([63.14, 61.57, 58.04])/100. colors["Fe2"] = np.array(to_rgb(CSS4_COLORS["saddlebrown"])) for i in range(2, 11): colors[f"Fe{i+1}"] = colors["Fe2"] * np.interp(i, [2, 10], [0.5, 0.8]) # O c_i = np.array(to_rgb(CSS4_COLORS["yellowgreen"])) for i in range(5): colors[f"O{i+1}"] = np.array(c_i) * np.interp(i, [0, 5], [0.6, 0.9]) for i in range(5, 7): colors[f"O{i+1}"] = colors["O3"] # Ne c_i = np.array(to_rgb(CSS4_COLORS["orange"])) for i in range(2, 7): colors[f"Ne{i}"] = np.array(c_i) * np.interp(i, [2, 6], [0.6, 0.9]) # Mg c_i = np.array(to_rgb(CSS4_COLORS["thistle"])) for i in range(1, 9): colors[f"Mg{i}"] = c_i * 0.7 # N c_i = np.array(to_rgb(CSS4_COLORS["steelblue"])) for i in range(1, 5): colors[f"N{i}"] = np.array(c_i) * np.interp(i, [1, 5], [0.6, 0.9]) # Si c_i = np.array(to_rgb(CSS4_COLORS["green"])) for i in range(1, 10): colors[f"Si{i}"] = np.array(c_i) * c_i * (0.8 - 0.2 * (i % 2)) c_i = np.array(to_rgb(CSS4_COLORS["orangered"])) for i in range(1, 10): colors[f"Ca{i}"] = np.array(c_i) * c_i * (0.8 - 0.2 * (i % 2)) c_i = np.array(to_rgb(CSS4_COLORS["yellowgreen"])) for i in range(1, 10): colors[f"Cl{i}"] = np.array(c_i) * c_i * (0.8 - 0.2 * (i % 2)) # Al, Ni c_i = np.array(to_rgb(CSS4_COLORS["slategray"])) for i in range(10): colors[f"Ca{i+1}"] = c_i colors[f"Al{i+1}"] = c_i * 0.8 colors[f"Ni{i+1}"] = c_i * 0.6 colors[f"Na{i+1}"] = c_i * 0.6 colors[f"K{i+1}"] = c_i * 0.5 for i in range(1, 11): colors[f"P{i}"] = np.array(to_rgb(CSS4_COLORS["peru"])) c_i = np.array(to_rgb(CSS4_COLORS["tomato"])) for i, v in enumerate(['H','Pa','Br','Pf','Hu']): colors["H1_" + v] = c_i * (0.8 - 0.3 * (i % 2)) c_i = np.array(to_rgb(CSS4_COLORS["teal"])) colors["He1"] = c_i * 0.8 colors["He2"] = c_i * 0.95 for c in my_colors: if hasattr(my_colors[c], 'upper'): colors[c] = to_rgb(CSS4_COLORS[my_colors[c]]) else: colors[c] = my_colors[c] return colors class FullLineList(LineList): column = None threshold = 1.e-3 with_fe2 = False with_extra = True emis_columns = [] merged_lines = {} def __init__(self, use_pumped_iron=True, **kwargs): """ Use a linelist derived from PyNeb with ionic line ratios as a function of temperature and density """ import astropy.table self.pyneb = utils.read_catalog( os.path.join(module_data_path(), "lines_pyneb_emissivity.rst"), format="ascii.rst" ) self.available_columns = [] for c in self.pyneb.colnames: if c.startswith('t'): if (c[4] == "n"): self.available_columns.append(c) if use_pumped_iron: fe2 = utils.read_catalog( os.path.join(module_data_path(), "lines_pumped_iron.rst"), format="ascii.rst" ) for c in self.available_columns: fe2[c] = fe2["flux"] fe2.remove_column("flux") pop = self.pyneb["atom"] == "Fe2" self.pyneb = astropy.table.vstack([self.pyneb[~pop], fe2]) self.extra = utils.read_catalog( os.path.join(module_data_path(), "lines_supplemental.rst"), format="ascii.rst" ) self.pyneb["pyneb"] = True self.extra["pyneb"] = False self.set_colors(**kwargs) self.set_linelist(**kwargs) def set_colors(self, my_colors={}, default=(0.2, 0.2, 0.2), **kwargs): """ Set colors for line labels """ from matplotlib.colors import to_rgb, CSS4_COLORS if hasattr(default, "upper"): self.pyneb["color"] = [to_rgb(CSS4_COLORS[default])] * len(self.pyneb) self.extra["color"] = [to_rgb(CSS4_COLORS[default])] * len(self.extra) else: self.pyneb["color"] = [default] * len(self.pyneb) self.extra["color"] = [default] * len(self.extra) colors = get_atom_colors() for t in [self.pyneb, self.extra]: ind_h1 = np.where(t["atom"] == "H1")[0] for c in colors: if c.startswith("H1_"): for j in ind_h1: prefix = f"{t['atom'][j]}_{t['name'][j]}" if prefix.startswith(c): t["color"][j] = colors[c] else: test = t["atom"] == c if test.sum() > 0: for j in np.where(test)[0]: t["color"][j] = colors[c] def calculate_merged_lines(self, threshold=0.01, skip_atoms=["H1", "Fe2"], verbose=True): """ Look for lines with ratios insensitive to temperature / density """ line_grid = np.array([self.pyneb[c] for c in self.available_columns]) merged_lines = {} una = utils.Unique(self.pyneb["atom"], verbose=False) for ia, atom in enumerate(una.values): if atom in skip_atoms: continue Na = una.counts[ia] if Na > 0: ind = np.where(una[atom])[0] lr = line_grid[:, ind] lrm = lr.max(axis=0) so = np.argsort(lrm)[::-1] ks = [] for si in range(Na-1): for sj in range(si+1, Na): i = so[si] j = so[sj] if lrm[j] < 8e-3: continue lr_ij = lr[:, j] / lr[:, i] ok_ij = np.isfinite(lr_ij) std = np.std(lr_ij[ok_ij]) med = np.median(lr_ij[ok_ij]) ki = ind[i] kj = ind[j] if std / med < threshold: if ki in merged_lines: merged_lines[ki].append(kj) else: used = False for k in ks: if ki in merged_lines[k]: merged_lines[k].append(kj) used = True if not used: merged_lines[ki] = [kj] ks.append(ki) # Print a summary if (len(ks) > 0) & verbose: print(f"\nAtom: {atom}") for k in ks: print(f" {self.pyneb['name'][k]}") for kj in merged_lines[k]: lr_ij = line_grid[:, kj] / line_grid[:, k] lr_ijs = " ".join([f"{v:6.3f}" for v in lr_ij]) print(f" {self.pyneb['name'][kj]}: {lr_ijs}") return merged_lines def set_linelist(self, column="t1.0n3", threshold=1.e-3, with_fe2=False, with_extra=True, **kwargs): """ """ import astropy.table keep = self.pyneb["wavelength"] > 0 if column is not None: keep &= self.pyneb[column] > threshold if not with_fe2: keep &= ~(self.pyneb["atom"] == "Fe2") self.column = column self.threshold = threshold self.with_fe2 = with_fe2 self.with_extra = with_extra pn_data_ = LineList.from_table( self.pyneb[keep], ratio_column=column, to_angstroms=1.e4 ).data if with_extra: data_ = [ pn_data_, LineList.from_table(self.extra, to_angstroms=1.e4).data ] data_ = astropy.table.vstack(data_) else: data_ = pn_data_ # Merged lines if column is not None: kfull = np.zeros(len(self.pyneb), dtype=int) - 1 kfull[keep] = np.arange(keep.sum(), dtype=int) lw = [w.tolist() for w in data_["wavelength"]] lr = [r.tolist() for r in data_["ratio"]] pop = np.zeros(len(data_), dtype=bool) merged_k = [] for k in self.merged_lines: ks = np.array([k] + self.merged_lines[k]) keep_k = keep[ks] if keep_k.sum() < 2: continue ks = ks[keep_k] lr_k = self.pyneb[column][ks] so = np.argsort(lr_k)[::-1] kf = kfull[ks[so]].tolist() data_["priority"][kf[0]] = self.pyneb["priority"][ks].max() for kf_i in kf[1:]: lw[kf[0]] += lw[kf_i] lr[kf[0]] += lr[kf_i] merged_k.append(kf[0]) pop[kf[1:]] = True data_["wavelength"] = lw data_["ratio"] = lr data_["multiple"] = False data_["multiple"][merged_k] = True data_ = data_[~pop] self.data = data_ def atom_spec_model(self, spec, **kwargs): """ """ kwargs["use_ratios"] = True res = self.spec_model(spec, **kwargs) pyn = self.data["pyneb"][res["index"]] if pyn.sum() == 0: return res ind_pyn = np.where(pyn)[0] una = utils.Unique(self.data["atom"][res["index"]][pyn], verbose=False) templates = [] names = [] index = [] for atom in una.values: for i, j in enumerate(ind_pyn[una[atom]]): if i == 0: model_j = res["templates"][j,:] index.append(j) else: model_j += res["templates"][j,:] templates.append(model_j) names.append(f"{atom}_series") extra = ~pyn if extra.sum() > 0: for j in np.where(extra)[0]: templates.append(res["templates"][j,:]) names.append(res["names"][j]) index.append(res["index"][j]) result = { "templates": np.array(templates), "names": names, "index": index, "color": self.data["color"][index], } return result def init_pumped_fe2(): """ Process the Fe2 table from Sigut & Pradhan 2003 https://iopscience.iop.org/article/10.1086/345498/fulltext/ """ import pyneb as pn fe2 = utils.read_catalog( "https://iopscience.iop.org/article/10.1086/345498/fulltext/datafile3.txt?doi=10.1086/345498", format="cds" ) fe2["Flux"] = fe2["Flux"].astype(float) / fe2["Flux"].max() fe2["atom"] = "Fe2" fe2["wavelength"] = pn.utils.physics.airtovac(fe2["Wave"]) / 1.e4 fe2["name"] = [f"Fe2_{w*1.e4:.0f}" for w in fe2["wavelength"]] fe2["label"] = [r"FeII$_{~xx}$".replace("xx", f"{w*1.e4:.0f}") for w in fe2["wavelength"]] fe2["priority"] = 5 + np.round(np.maximum(np.log10(fe2["Flux"]), -4)).astype(int) fe2.remove_column("Wave") fe2.rename_column("Flux","flux") fe2["wavelength"].format = ".7f" fe2["flux"].format = ".3f" fe2.write("/tmp/lines_pumped_iron.rst", format="ascii.rst", overwrite=True)
[docs]class MolecularHydrogen: # Reference transition reference = ("1-0", "S(1)") # Do Z(T) sum separate by odd / even j separate_ZT = False def __init__(self, **kwargs): """ Tools for working with molecular hydrogen lines using the helpful relations from Aditya Togi and J. D. T. Smith 2016 ApJ, 830, 18 Optically thin flux: .. math:: F_j = h \\nu A N_{j+2} \\Omega / 4\\pi Line flux F as a function of temperature, T (excitation diagram): .. math:: N_{j+2} = \\frac{g_{j+2}}{Z(T)} e^{-E_\mathrm{upper} / kT} Z(T) = \\sum g_j e^{-E_j / kT} F_j \\propto \\frac{N_{j+2} A}{\\lambda} Line data from the Gemini compilation at https://www.gemini.edu/observing/resources/near-ir-resources/spectroscopy/important-h2-lines """ self.load_data() self.initialize_flux_table() self.unv = utils.Unique(self.data["vib"], verbose=False)
[docs] def load_data(self): """ Read the data table and set the units on some columns """ self.data = utils.read_catalog( os.path.join(module_data_path(), "h2_linelist.txt") ) self.data["wave"].unit = u.micron self.data["A"].unit = 1.0e-7 * u.second**-1 self.data["j"] = [int(r[2:-1]) for r in self.data["rot"]] self.data["OSQ"] = [r[0] for r in self.data["rot"]]
[docs] def initialize_flux_table(self): """ Initialize ``flux`` table """ self.flux = self.data["wave",] self.flux["name"] = self.line_names() self.flux["flux"] = 0.0 self.flux["Nj"] = 0.0 self.flux["mask"] = True self.flux.meta["T"] = (0, "Temperature") self.Tflux = None
[docs] def line_names(self, prefix=r"H$_2$"): """ LaTeX line names """ return np.array( [ prefix + f'{row["vib"]:^5} {row["rot"]}' for row in self.data["vib", "rot"] ] )
@property def nu(self): """ Transition frequency """ return (astropy.constants.c / self.data["wave"]).to(u.Hz) @property def h_nu_A(self): """ precompute :math:`h \\cdot \\nu \\cdot A` (erg / second) """ return (astropy.constants.h * self.nu * self.data["A"]).to( u.erg / u.second ) @property def N(self): """ Length of data table """ return len(self.data) @property def ix(self): """ Index of the reference transition (usually "1-0 S(1)") in the data table """ return np.where( (self.data["vib"] == self.reference[0]) & (self.data["rot"] == self.reference[1]) )[0][0]
[docs] @staticmethod def temperature_powerlaw(n=4.5, Tl=50, Tu=2000, nsteps=16): """ Generate a powerlaw temperature distribution :math:`dN = T^{-n} dT` """ tgrid = np.linspace(Tl, Tu, nsteps) Nt = tgrid**-n * (n - 1) / (Tl ** (1 - n) - Tu ** (1 - n)) return tgrid, Nt
[docs] def ZT(self, T=1000.0): """ Compute :math:`Z(T) = \\sum g_j e^{-E_j / kT}` """ if self.separate_ZT: # Seems like it should be this ZT = np.zeros_like(self.data["wave"]) for i in [0, 1]: sub = self.data["j"] % 2 == i ZT[sub] = np.sum( (self.data["gJ"] * np.exp(-self.data["Eupper"] / T))[sub] ) else: # But this needed to make the ratios agree with the gemini table ZT = np.sum((self.data["gJ"] * np.exp(-self.data["Eupper"] / T))) return ZT
[docs] def Nj(self, T=1000.0): """ Compute number density: :math:`N_j = \\frac{g_j}{Z(T)}~e^{-E_j / kT}` """ if hasattr(T, "__len__"): # Temperature distribution norm = trapz(T[1], T[0]) Tx = T[0] Ty = T[1] / norm dT = utils.trapz_dx(Tx) else: Tx = [T] Ty = [1.0] dT = [1.0] Nj = np.zeros(self.N) Nt = len(Tx) for i in range(Nt): Nj += ( dT[i] * Ty[i] * self.data["gJ"] / self.ZT(Tx[i]) * np.exp(-self.data["Eupper"] / Tx[i]) ) return Nj
[docs] def line_flux(self, T=1000.0, **kwargs): """ Line flux :math:`F_j = h \\nu A N_{j+2} \\Omega / 4\\pi` """ Nj = self.Nj(T) * u.cm**-2 Fj = self.h_nu_A * Nj return Nj, Fj.to(u.erg / u.second / u.cm**2)
[docs] def update_flux_table( self, T=1000.0, min_line_fraction=0.05, wave_range=(1.6, 2.6), **kwargs ): """ Get a list of lines and ratios """ Nj, Fj = self.line_flux(T, **kwargs) keep = ( (Fj > min_line_fraction * Fj[self.ix]) & (self.data["wave"] > wave_range[0]) & (self.data["wave"] < wave_range[1]) ) self.flux["flux"] = Fj self.flux["Nj"] = Nj self.flux["mask"] = keep self.flux.meta["ref_flux"] = Fj[self.ix] self.Tflux = T
[docs] def to_linelist(self): """ Reformat into a `LineList` object """ rows = [] for j in np.where(self.flux["mask"])[0]: name = "{vib} {rot}".format(**self.data[j]) rows.append({ "priority": 5 if "(1)" in name else 2.1, "name": name, "label": self.flux["name"][j], "wavelength": [self.flux["wave"][j] * 1.e4], "ratio": [self.flux["flux"][j] / self.flux["flux"][self.ix]], "color": "brown", }) return LineList(data=utils.GTable(rows=rows))
[docs] def spec_model( self, spec, z=0.0, update_flux=True, wave_range=None, fnu=True, single=True, **kwargs, ): """ Make an MSAEXP model """ if update_flux: if wave_range is None: v = spec["valid"] wave_range = ( spec["wave"][v].min() / (1 + z), spec["wave"][v].max() / (1 + z), ) self.update_flux_table(wave_range=wave_range, **kwargs) if single: model = np.zeros_like(spec["flux"]) else: models = [] if fnu: scale = 1.0e-4 / spec["to_flam"] else: scale = 1.0 for row in self.flux[self.flux["mask"]]: if single: flux_i = row["flux"] / self.flux.meta["ref_flux"] else: flux_i = 1.0 model_i = spec.fast_emission_line( row["wave"] * (1 + z), line_flux=flux_i, **kwargs, ) if single: model += model_i else: models.append(model_i) if single: return model * scale else: return np.array(models) * scale
[docs] def h2_mass( self, line_flux=1.0e-20 * u.erg / u.second / u.cm**2, transition=("1-0", "S(1)"), z=1.01, **kwargs, ): """ Still figuring out unit conversions.... """ from astropy.cosmology import WMAP9 dL = WMAP9.luminosity_distance(z).to(u.cm) # tgrid, Nt = self.temperature_powerlaw(n=n, Tl=Tl, Tu=Tu, nsteps=512) # Nj = self.Nj(T=(tgrid, Nt)) Nj = self.flux["Nj"] if transition is None: ix = self.ix else: ix = np.where( (self.data["vib"] == transition[0]) & (self.data["rot"] == transition[1]) )[0][0] transition = ( self.data["vib"][ix], self.data["rot"][ix], ) Ntot = line_flux / self.h_nu_A[ix] / Nj[ix] * dL.to(u.cm) ** 2 Mtot = (Ntot * 2 * astropy.constants.m_p).to(u.Msun) result = { "line_flux": line_flux, "z": z, "dL": dL, "transition": transition, "ix": ix, "wave": self.flux["wave"][ix], "name": self.line_names()[ix], "mass": Mtot, "log_mass": np.log10(Mtot.value), } return result