Source code for msaexp.msa

"""
Helper scripts for dealing with MSA metadata files (``MSAMETFL``)
"""

import os
import numpy as np
import astropy.io.fits as pyfits

from . import utils as msautils

__all__ = [
    "regions_from_metafile",
    "regions_from_fits",
    "MSAMetafile",
    "slit_best_source_alias",
    "plot_msa_shutters",
    "set_msa_axis_ticks",
    "read_apt_shutter_csv",
    "get_shutter_wavelength_limits",
]


def pad_msa_metafile(
    metafile,
    pad=0,
    source_ids=None,
    slitlet_ids=None,
    positive_ids=False,
    prefix="src_",
    verbose=True,
    primary_sources=True,
    merge_first=True,
    **kwargs,
):
    """
    Pad a MSAMETFL with dummy slits and trim to a subset of source_ids

    Parameters
    ----------
    metafile : str
        Filename of the MSA metadata file (``MSAMETFL``)

    pad : int
        Padding of dummy slits

    source_ids : list, None
        List of source_id values to keep

    slitlet_ids : list, None
        List of slitlet_id values to keep

    positive_ids : bool
        If no ``source_ids`` provided, generate sources with `source_id > 0`

    prefix : str
        Prefix of new file to create (``prefix + metafile``)

    verbose : bool
        Print verbose output

    primary_sources : bool
        Include only primary sources

    Returns
    -------

    output_file : str
        Filename of new table

    """
    from astropy.table import Table
    import grizli.utils

    msa = MSAMetafile(metafile)

    if merge_first:
        msa.merge_source_ids(verbose=verbose, **kwargs)

    all_ids = np.unique(msa.shutter_table["source_id"])

    if source_ids is None:
        if slitlet_ids is not None:
            six = np.isin(msa.shutter_table["slitlet_id"], slitlet_ids)

            if six.sum() == 0:
                msg = f"msaexp.msa.pad_msa_metafile: {slitlet_ids} not found"
                msg += " in {metafile} slitlet_id"
                raise ValueError(msg)

            source_ids = np.unique(msa.shutter_table["source_id"][six])

            msg = "msaexp.msa.pad_msa_metafile: Trim slitlet_ids in "
            msg += f"{metafile} to "
            msg += f"{list(slitlet_ids)} (N={len(source_ids)} source_ids)\n"
            grizli.utils.log_comment(
                grizli.utils.LOGFILE, msg, verbose=True, show_date=True
            )

        else:
            if positive_ids:
                source_ids = all_ids[all_ids > 0]
            else:
                source_ids = all_ids[all_ids != 0]

            six = np.isin(msa.shutter_table["source_id"], source_ids)
            if primary_sources:
                six &= msa.shutter_table["primary_source"] == "Y"

            if six.sum() == 0:
                msg = f"msaexp.msa.pad_msa_metafile: {source_ids} not found"
                msg += f" in {metafile}.  Available ids are {list(all_ids)}"
                raise ValueError(msg)
    else:
        six = np.isin(msa.shutter_table["source_id"], source_ids)
        if six.sum() == 0:
            msg = f"msaexp.msa.pad_msa_metafile: {source_ids} not found"
            msg += f" in {metafile}.  Available ids are {list(all_ids)}"
            raise ValueError(msg)

    this_source_ids = np.unique(msa.shutter_table["source_id"][six])

    slitlets = np.unique(msa.shutter_table["slitlet_id"][six])
    im = pyfits.open(metafile)

    shut = Table(im["SHUTTER_INFO"].data)
    shut = shut[np.isin(shut["slitlet_id"], slitlets)]

    # Add a shutter on either side
    row = {}
    for k in shut.colnames:
        row[k] = shut[k][0]

    row["shutter_state"] = "CLOSED"
    row["background"] = "Y"
    row["estimated_source_in_shutter_x"] = np.nan
    row["estimated_source_in_shutter_y"] = np.nan
    row["primary_source"] = "N"

    new_rows = []

    # Add padding
    for src_id in this_source_ids:
        six = shut["source_id"] == src_id

        for mid in np.unique(shut["msa_metadata_id"][six]):
            mix = (shut["msa_metadata_id"] == mid) & (six)
            quad = shut["shutter_quadrant"][mix][0]
            slit_id = shut["slitlet_id"][mix][0]
            slix = shut["msa_metadata_id"] == mid
            slix &= shut["slitlet_id"] == slit_id

            shutters = np.unique(shut["shutter_column"][slix])
            for eid in np.unique(shut["dither_point_index"][slix]):
                for p in range(pad):
                    for s in [
                        shutters.min() - (p + 1),
                        shutters.max() + (p + 1),
                    ]:

                        row["msa_metadata_id"] = mid
                        row["dither_point_index"] = eid
                        row["shutter_column"] = s
                        row["shutter_quadrant"] = quad
                        row["slitlet_id"] = slit_id
                        row["source_id"] = src_id

                        new_row = {}
                        for k in row:
                            new_row[k] = row[k]

                        new_rows.append(new_row)

    for row in new_rows:
        shut.add_row(row)

    src = Table(im["SOURCE_INFO"].data)
    src = src[np.isin(src["source_id"], source_ids)]

    hdus = {
        "SHUTTER_INFO": pyfits.BinTableHDU(shut),
        "SOURCE_INFO": pyfits.BinTableHDU(src),
    }

    for e in hdus:
        for k in im[e].header:
            if k not in hdus[e].header:
                # print(e, k, im[e].header[k])
                hdus[e].header[k] = im[e].header[k]
        im[e] = hdus[e]

    output_file = prefix + os.path.basename(metafile)

    im.writeto(output_file, overwrite=True)
    im.close()

    if verbose:
        msg = (
            f"msaexp.msa.pad_msa_metafile: Trim source_id in {metafile} to "
        )
        msg += f"{list(this_source_ids)}\n"
        msg += f"msaexp.msa.pad_msa_metafile: pad = {pad}"
        grizli.utils.log_comment(
            grizli.utils.LOGFILE, msg, verbose=True, show_date=True
        )

    return output_file


def get_msa_source_ids(rate_file, merge_first=True):
    """
    Get source_ids from the MSAMETFL associated with an exposure

    Parameters
    ----------
    rate_file : str
        Exposure filename

    merge_first : bool
        Run `~msaexp.msa.MSAMetafile.merge_source_ids` before computing unique source
        ids.

    Returns
    -------
    source_ids : list
        List of source ids in the mask configuration

    """

    with pyfits.open(rate_file) as im:
        if im[0].header["EXP_TYPE"] != "NRS_MSASPEC":
            return []

        metfl = im[0].header["MSAMETFL"]
        msa = MSAMetafile(metfl)
        if merge_first:
            msa.merge_source_ids()

        conf = im[0].header["MSACONID"]
        msaid = im[0].header["MSAMETID"]
        expid = im[0].header["PATT_NUM"]

    shu = msa.shutter_table

    rows = shu["msa_metadata_id"] == msaid
    rows &= shu["dither_point_index"] == expid

    source_ids = np.unique(shu[rows]["source_id"]).tolist()

    return source_ids


[docs]def regions_from_metafile(metafile, **kwargs): """ Wrapper around `msaexp.msa.MSAMetafile.regions_from_metafile` Parameters ---------- metafile : str Name of a MSAMETFL metadata file kwargs : dict Keyword arguments are passed through to `~msaexp.msa.MSAMetafile.regions_from_metafile`. Returns ------- regions : str, list Output from `~msaexp.msa.MSAMetafile.regions_from_metafile` """ metf = MSAMetafile(metafile) regions = metf.regions_from_metafile(**kwargs) return regions
[docs]def regions_from_fits(file, **kwargs): """ Wrapper around `msaexp.msa.MSAMetafile.regions_from_metafile` Parameters ---------- file : str Exposure filename, e.g., `..._rate.fits`. The `dither_point_index` and `msa_metadata_id` will be determined from the file header kwargs : dict Keyword arguments are passed through to `~msaexp.msa.MSAMetafile.regions_from_metafile`. Returns ------- regions : str, list Output from `~msaexp.msa.MSAMetafile.regions_from_metafile` """ with pyfits.open(file) as im: metafile = im[0].header["MSAMETFL"] metaid = im[0].header["MSAMETID"] dither_point = im[0].header["PATT_NUM"] metf = MSAMetafile(metafile) regions = metf.regions_from_metafile( msa_metadata_id=metaid, dither_point_index=dither_point, **kwargs ) return regions
[docs]class MSAMetafile: query_product_level = ["2b"] def __init__(self, filename): """ Helper for parsing MSAMETFL metadata files Parameters ---------- filename : str Filename of an `_msa.fits` metadata file or a FITS file with a keyword `MSAMETFL` in the primary header, e.g., a `_rate.fits` file. Attributes ---------- filename : str Input filename metafile : str Filename of the MSAMETFL, either ``filename`` itself or derived from it shutter_table : `~astropy.table.Table` Table of shutter metadata src_table : `~astropy.table.Table` Table of source information mast : `~astropy.table.Table`, None Result of `~msaexp.msa.MSAMetafile.query_mast_exposures` Examples -------- .. plot:: :include-source: ### Make a plot with slitlets import numpy as np import matplotlib.pyplot as plt from msaexp import msa uri = 'https://mast.stsci.edu/api/v0.1/Download/file?uri=mast:JWST/product/' meta = msa.MSAMetafile(uri+'jw02756001001_01_msa.fits') fig, axes = plt.subplots(1, 3, figsize=(9, 2.6), sharex=True, sharey=True) cosd = np.cos(np.median(meta.src_table['dec'])/180*np.pi) # Show offset slitlets from three dithered exposures for i in [0,1,2]: ax = axes[i] ax.scatter(meta.src_table['ra'], meta.src_table['dec'], marker='.', color='k', alpha=0.5) slits = meta.regions_from_metafile(dither_point_index=i+1, as_string=False, with_bars=True) for s in slits: if s.meta['is_source']: if s.meta['source_id'] in [110003, 410044, 410045]: ax.text(s.meta['ra'] - 0.8/3600, s.meta['dec'], s.meta['source_id'], fontsize=7, ha='left', va='center') fc = '0.5' else: fc = 'pink' for patch in s.get_patch(fc=fc, ec='None', alpha=0.8, zorder=100): ax.add_patch(patch) ax.set_aspect(1./cosd) ax.set_xlim(3.5936537138517317, 3.588363444812261) ax.set_ylim(-30.39750646306242, -30.394291511397544) ax.grid() ax.set_title(f'Dither point #{i+1}') x0 = np.mean(ax.get_xlim()) ax.set_xticks(np.array([-5, 0, 5])/3600./cosd + x0) ax.set_xticklabels(['+5"', 'R.A.', '-5"']) y0 = np.mean(ax.get_ylim()) ax.set_yticks(np.array([-5, 0, 5])/3600. + y0) axes[0].set_yticklabels(['-5"', 'Dec.', '+5"']) axes[1].scatter(x0, y0, marker='x', c='b') axes[1].text(0.5, 0.45, f'({x0:.6f}, {y0:.6f})', ha='left', va='top', transform=axes[1].transAxes, fontsize=6, color='b') fig.tight_layout(pad=0.5) """ from astropy.table import Table self.filename = filename if filename.endswith("_msa.fits"): self.metafile = filename else: with pyfits.open(filename) as _im: if "MSAMETFL" not in _im[0].header: raise ValueError( f"{filename}[0].header does not have MSAMETFL keyword" ) else: self.metafile = _im[0].header["MSAMETFL"] with pyfits.open(self.metafile) as im: src = Table(im["SOURCE_INFO"].data) shut = Table(im["SHUTTER_INFO"].data) # Merge src and shutter tables shut_ix, src_ix = [], [] for i, sid in enumerate(shut["source_id"]): if sid == 0: continue elif sid in src["source_id"]: shut_ix.append(i) src_ix.append(np.where(src["source_id"] == sid)[0][0]) for c in ["ra", "dec", "program"]: shut[c] = src[c][0] * 0 shut[c][shut_ix] = src[c][src_ix] self.shutter_table = shut self.src_table = src self.mast = None @property def metadata_id_list(self): """ Returns ------- ids : list list of metadata_id from ``shutter_table`` """ ids = list(np.unique(self.shutter_table["msa_metadata_id"])) return ids @property def metadata_id_unique(self): """ Returns ------- un : `grizli.utils.Unique` Unique of metadata_id from ``shutter_table`` """ import grizli.utils return grizli.utils.Unique(self.shutter_table["msa_metadata_id"]) @property def key_pairs(self): """ List of unique ``msa_metadata_id, dither_point_index`` pairs from the ``shutter_table`` Returns ------- keys : list List of key pairs """ keys = [] for row in self.shutter_table: key = row["msa_metadata_id"], row["dither_point_index"] if key not in keys: keys.append(key) return keys @property def mast_key_pairs(self): """ List of unique ``msametid, exposure`` pairs from the ``mast`` metadata table Returns ------- keys : list List of key pairs """ mast = self.query_mast_exposures(force=False) keys = [] for row in mast: key = row["msametid"], row["patt_num"] if key not in keys: keys.append(key) return keys
[docs] def merge_source_ids(self, sort_count=-1, verbose=True, **kwargs): """ Merge cases where a particular slitlet_id has multiple source_ids Parameters ---------- sort_count : int Direction in which to prefer among the multiple ``source_id`` values sorted by their count. Returns ------- ``shutter_table`` attribute updated in place """ from grizli import utils shu = self.shutter_table conf_ids = np.unique(shu["msa_metadata_id"]) for msaid in conf_ids: rows = shu["msa_metadata_id"] == msaid row_ix = np.where(rows)[0] slits = utils.Unique(shu[rows]["slitlet_id"], verbose=False) rename_slits = {} for sl in slits.values: nexp = len( np.unique(shu[rows][slits[sl]]["dither_point_index"]) ) nrows = slits[sl].sum() source_ids = utils.Unique( shu[rows][slits[sl]]["source_id"], verbose=False ) test = np.array(source_ids.values) > 0 # All individual exposures or all rows test &= (source_ids.counts != nexp) & ( source_ids.counts != nrows ) if test.sum() > 0: if sort_count is not None: so = np.argsort(source_ids.counts)[::sort_count] else: so = range(source_ids.N) id_sorted = [] for j in so: if source_ids.values[j] > 0: id_sorted.append(source_ids.values[j]) if len(id_sorted) > 0: rename_slits[sl] = id_sorted for sl in rename_slits: msg = f"{self.filename} {msaid:>3} : " msg += ( f"merge source_ids for slitlet {sl:>3} {rename_slits[sl]}" ) utils.log_comment(utils.LOGFILE, msg, verbose=verbose) shu["source_id"][row_ix[slits[sl]]] = rename_slits[sl][0] return None
[docs] def query_mast_exposures(self, force=False, **kwargs): """ Query MAST database for exposures for this MSA file Parameters ---------- force : bool Running the query with this method stores the result in the `mast` attribute. If the attribute is `None` or if ``force==True`` then run/redo the query. Returns ------- mast : `~astropy.table.Table` Query results from `mastquery.jwst.query_jwst`. """ from mastquery.jwst import make_query_filter, query_jwst if (self.mast is not None) & (not force): return self.mast filters = [] filters += make_query_filter( "grating", values=[ "PRISM", "G140M", "G235M", "G395M", "G140H", "G235H", "G395H", ], ) filters += make_query_filter("effexptm", range=[30, 5.0e5]) filters += make_query_filter( "productLevel", values=self.query_product_level ) filters += make_query_filter( "apername", values=[ "NRS_FULL_MSA", "NRS_FIELD1_MSA4", "NRS_FIELD2_MSA4", "NRS_FULL_MSA1", ], ) filters += make_query_filter( "category", values=["COM", "DD", "ERS", "GTO", "GO", "CAL"] ) filters += make_query_filter("detector", values=["NRS1"]) filters += make_query_filter( "msametfl", values=[os.path.basename(self.filename)] ) mast = query_jwst( instrument="NRS", filters=filters, columns="*", rates_and_cals=True, extensions=["rate", "cal"], ) self.mast = mast return mast
[docs] def get_transforms( self, dither_point_index=None, msa_metadata_id=None, fit_degree=2, verbose=False, **kwargs, ): """ Fit for `~astropy.modeling.models.Polynomial2D` transforms between slit ``(row, col)`` and ``(ra, dec)``. Parameters ---------- dither_point_index : int, None Dither index in ``shutter_table`` msa_metadata_id : int, None Metadata id in ``shutter_table`` fit_degree : int Polynomial degree verbose : bool Print status messages Returns ------- dither_match : bool array Boolean mask of ``shutter_table`` matching ``dither_point_index`` meta_match : bool array Boolean mask of ``shutter_table`` matching ``msa_metadata_id`` coeffs : dict `~astropy.modeling.models.Polynomial2D` transformations to sky coordinates in each of 4 MSA quadrants >>> quadrant = 1 >>> pra, pdec = coeffs[quadrant] >>> ra = pra(shutter_row, shutter_column) >>> dec = pdec(shutter_row, shutter_column) inv_coeffs : dict Inverse `~astropy.modeling.models.Polynomial2D` transformations from sky to shutters: >>> quadrant = 1 >>> prow, pcol = inv_coeffs[quadrant] >>> shutter_row = prow(ra, dec) >>> shutter_column = pcol(ra, dec) """ from astropy.modeling.models import Polynomial2D from astropy.modeling.fitting import LinearLSQFitter p2 = Polynomial2D(degree=fit_degree) fitter = LinearLSQFitter() _shut = self.shutter_table if dither_point_index is None: dither_point_index = _shut["dither_point_index"].min() if msa_metadata_id is None: msa_metadata_id = _shut["msa_metadata_id"].min() dither_match = _shut["dither_point_index"] == dither_point_index meta_match = _shut["msa_metadata_id"] == msa_metadata_id exp = dither_match & meta_match has_offset = np.isfinite(_shut["estimated_source_in_shutter_x"]) has_offset &= np.isfinite(_shut["estimated_source_in_shutter_y"]) is_src = (_shut["source_id"] > 0) & (has_offset) si = _shut[exp & is_src] if len(si) == 0: is_src = has_offset si = _shut[exp & is_src] # Fit for transformations coeffs = {} inv_coeffs = {} if verbose: output = f"# msametfl = {self.metafile}\n" output += f"# dither_point_index = {dither_point_index}\n" output += f"# msa_metadata_id = {msa_metadata_id}" print(output) for qi in np.unique(si["shutter_quadrant"]): q = si["shutter_quadrant"] == qi # print(qi, q.sum()) row = si["shutter_row"] + ( si["estimated_source_in_shutter_x"] - 0.5 ) col = si["shutter_column"] + ( si["estimated_source_in_shutter_y"] - 0.5 ) pra = fitter(p2, row[q], col[q], si["ra"][q]) pdec = fitter(p2, row[q], col[q], si["dec"][q]) # RMS of the fit xra = pra(row[q], col[q]) xdec = pdec(row[q], col[q]) dra = ( (si["ra"][q] - xra) * np.cos(si["dec"][q] / 180 * np.pi) * 3600 * 1000 ) dde = (si["dec"][q] - xdec) * 3600 * 1000 pra.rms = np.std(dra) pdec.rms = np.std(dde) pra.N = q.sum() if verbose: print( f"# Q{qi} N={q.sum()} " + f"rms= {pra.rms:.1f}, {pdec.rms:.1f} mas" ) coeffs[qi] = pra, pdec prow = fitter(p2, si["ra"][q], si["dec"][q], row[q]) pcol = fitter(p2, si["ra"][q], si["dec"][q], col[q]) inv_coeffs[qi] = prow, pcol return dither_match, meta_match, coeffs, inv_coeffs
[docs] def regions_from_metafile(self, as_string=False, with_bars=True, **kwargs): """ Get slit footprints in sky coords Parameters ---------- as_string : bool Return regions as DS9 region strings with_bars : bool Account for bar vignetting kwargs : dict Keyword arguments passed to `msaexp.msa.MSAMetafile.get_transforms` Returns ------- String or a list of `grizli.utils.SRegion` objects, depending on ``as_string`` Examples -------- .. code-block:: python :dedent: >>> from msaexp import msa >>> uri = 'https://mast.stsci.edu/api/v0.1/Download/file?uri=mast:JWST/product/' >>> meta = msa.MSAMetafile(uri+'jw02756001001_01_msa.fits') >>> regs = meta.regions_from_metafile(as_string=True, with_bars=True) >>> print(regs) # msametfl = https://mast.stsci.edu/api/v0.1/Download/file?uri=mast:JWST/product/jw02756001001_01_msa.fits # dither_point_index = 1 # msa_metadata_id = 1 # Q1 N=13 rms= 0.7, 0.4 mas # Q2 N=29 rms= 1.3, 1.3 mas # Q3 N=11 rms= 1.3, 0.3 mas # Q4 N=27 rms= 1.2, 0.7 mas icrs polygon(3.623046,-30.427251,3.622983,-30.427262,3.622951,-30.427136,3.623014,-30.427125) # color=lightblue circle(3.6229814, -30.4270337, 0.2") # color=cyan text={160159} polygon(3.623009,-30.427106,3.622946,-30.427117,3.622915,-30.426991,3.622978,-30.426980) # color=cyan polygon(3.622973,-30.426960,3.622910,-30.426971,3.622878,-30.426845,3.622941,-30.426834) # color=lightblue polygon(3.613902,-30.392060,3.613840,-30.392071,3.613809,-30.391948,3.613871,-30.391936) # color=lightblue circle(3.6137989, -30.3918859, 0.2") # color=cyan text={160321} polygon(3.613867,-30.391918,3.613804,-30.391929,3.613774,-30.391805,3.613836,-30.391794) # color=cyan polygon(3.613831,-30.391775,3.613769,-30.391786,3.613738,-30.391663,3.613800,-30.391652) # color=lightblue polygon(3.610960,-30.384123,3.610897,-30.384134,3.610867,-30.384011,3.610929,-30.384000) # color=lightblue ... """ import grizli.utils dith, metid, coeffs, inv_coeffs = self.get_transforms(**kwargs) exp = dith & metid _shut = self.shutter_table has_offset = np.isfinite(_shut["estimated_source_in_shutter_x"]) has_offset &= np.isfinite(_shut["estimated_source_in_shutter_y"]) # Regions for a particular exposure se = _shut[exp] sx = (np.array([-0.5, 0.5, 0.5, -0.5])) * ( 1 - 0.07 / 0.27 * with_bars / 2 ) sy = (np.array([-0.5, -0.5, 0.5, 0.5])) * ( 1 - 0.07 / 0.53 * with_bars / 2 ) row = se["shutter_row"] col = se["shutter_column"] regions = [] for i in range(len(se)): if se["shutter_quadrant"][i] not in coeffs: continue pra, pdec = coeffs[se["shutter_quadrant"][i]] sra = pra(row[i] + sx, col[i] + sy) sdec = pdec(row[i] + sx, col[i] + sy) sr = grizli.utils.SRegion(np.array([sra, sdec]), wrap=False) sr.meta = {} for k in [ "program", "source_id", "ra", "dec", "slitlet_id", "shutter_quadrant", "shutter_row", "shutter_column", "estimated_source_in_shutter_x", "estimated_source_in_shutter_y", ]: sr.meta[k] = se[k][i] sr.meta["is_source"] = np.isfinite( se["estimated_source_in_shutter_x"][i] ) if sr.meta["is_source"]: sr.ds9_properties = "color=cyan" else: sr.ds9_properties = "color=lightblue" regions.append(sr) if as_string: output = f"# msametfl = {self.metafile}\n" di = _shut["dither_point_index"][dith][0] output += f"# dither_point_index = {di}\n" mi = _shut["msa_metadata_id"][metid][0] output += f"# msa_metadata_id = {mi}\n" for qi in coeffs: pra, pdec = coeffs[qi] output += f"# Q{qi} N={pra.N} " output += f"rms= {pra.rms:.1f}, {pdec.rms:.1f} mas\n" output += "icrs\n" for sr in regions: m = sr.meta if m["is_source"]: output += f"circle({m['ra']:.7f}, {m['dec']:.7f}, 0.2\")" output += f" # color=cyan text=xx{m['source_id']}yy\n" for r in sr.region: output += r + "\n" output = output.replace("xx", "{").replace("yy", "}") else: output = regions return output
[docs] def plot_slitlet( self, source_id=110003, dither_point_index=1, msa_metadata_id=None, cutout_size=1.5, step=None, rgb_filters=None, rgb_scale=5, rgb_invert=False, figsize=(4, 4), ax=None, add_labels=True, set_axis_labels=True, ): """ Make a plot showing a slitlet Parameters ---------- source_id : int Source id, must be in ``src_table`` dither_point_index : int Dither to show msa_metadata_id : int Optional specified ``msa_metadata_id`` in ``shutter_table`` cutout_size : float Cutout half-width, arcsec step : int Place to mark axis labels, defaults to ``floor(cutout_size)`` rgb_filters : list, None List of filters to use for an RGB cutout. Will be grayscale if just one item specified. rgb_scale : float Scaling of the image thumbnail if ``rgb_filters`` specified rgb_invert : bool Invert color map if ``rgb_filters`` specified figsize : tuple Size if generating a new figure ax : `~matplotlib.axes._subplots.AxesSubplot`, None Plot axis add_labels : bool Add plot labels set_axis_labels : bool Set axis labels Returns ------- fig : `matplotlib.figure.Figure` Figure object if generating a new figure, None otherwise ax : `~matplotlib.axes._subplots.AxesSubplot` Plot axes Examples -------- .. plot:: :include-source: # Simple figure import matplotlib.pyplot as plt from msaexp import msa uri = 'https://mast.stsci.edu/api/v0.1/Download/file?uri=mast:JWST/product/' meta = msa.MSAMetafile(uri+'jw02756001001_01_msa.fits') fig, ax = plt.subplots(1,1,figsize=(8,8)) _ = meta.plot_slitlet(source_id=110003, cutout_size=12.5, rgb_filters=None, ax=ax) fig.tight_layout(pad=1.0) fig.show() .. plot:: :include-source: # With RGB cutout import matplotlib.pyplot as plt from msaexp import msa uri = 'https://mast.stsci.edu/api/v0.1/Download/file?uri=mast:JWST/product/' meta = msa.MSAMetafile(uri+'jw02756001001_01_msa.fits') fig, ax = plt.subplots(1,1,figsize=(4,4)) filters = ['f200w-clear','f150w-clear','f115w-clear'] _ = meta.plot_slitlet(source_id=110003, cutout_size=1.5, rgb_filters=filters, ax=ax) fig.tight_layout(pad=1.0) fig.show() .. plot:: :include-source: # With grayscale cutout import matplotlib.pyplot as plt from msaexp import msa uri = 'https://mast.stsci.edu/api/v0.1/Download/file?uri=mast:JWST/product/' meta = msa.MSAMetafile(uri+'jw02756001001_01_msa.fits') fig, ax = plt.subplots(1,1,figsize=(4,4)) filters = ['f160w'] _ = meta.plot_slitlet(source_id=110003, cutout_size=1.5, rgb_filters=filters, ax=ax, rgb_invert=True) fig.tight_layout(pad=1.0) fig.show() """ import numpy as np import matplotlib.pyplot as plt import PIL from urllib.request import urlopen if (rgb_filters is not None) & (cutout_size > 20): raise ValueError('Maximum size of 20" with the thumbnail') if source_id not in self.src_table["source_id"]: print(f"{source_id} not in src_table for {self.metafile}") return None ix = np.where(self.src_table["source_id"] == source_id)[0][0] ra = self.src_table["ra"][ix] dec = self.src_table["dec"][ix] cosd = np.cos(dec / 180 * np.pi) if ax is None: fig, ax = plt.subplots(1, 1, figsize=figsize) else: fig = None # cutout_size = 2.5 # arcsec if rgb_filters is not None: url = ( f"https://grizli-cutout.herokuapp.com/thumb?coords={ra},{dec}" ) url += "&filters=" + ",".join(rgb_filters) url += f"&size={cutout_size}&scl={rgb_scale}&invert={rgb_invert}" # url = cutout.format(ra=ra, dec=dec, cutout_size=cutout_size) if rgb_invert: src_color = "k" else: src_color = "w" try: rgb = np.array(PIL.Image.open(urlopen(url))) # rgb = np.roll(np.roll(rgb, 2, axis=0), -2, axis=1) pscale = np.round(2 * cutout_size / rgb.shape[0] / 0.05) * 0.05 thumb_size = rgb.shape[0] / 2.0 * pscale extent = ( ra + thumb_size / 3600 / cosd, ra - thumb_size / 3600.0 / cosd, dec - thumb_size / 3600.0, dec + thumb_size / 3600.0, ) ax.imshow( np.flip(rgb, axis=0), origin="lower", extent=extent, interpolation="Nearest", ) except: src_color = "k" else: extent = ( ra + cutout_size / 3600 / cosd, ra - cutout_size / 3600.0 / cosd, dec - cutout_size / 3600.0, dec + cutout_size / 3600.0, ) ax.set_xlim(extent[:2]) ax.set_ylim(extent[2:]) src_color = "k" # ax.set_aspect(cosd) ax.scatter( self.src_table["ra"], self.src_table["dec"], marker="o", fc="None", ec=src_color, alpha=0.5, ) slits = self.regions_from_metafile( dither_point_index=dither_point_index, as_string=False, with_bars=True, msa_metadata_id=msa_metadata_id, ) for s in slits: if s.meta["is_source"]: kws = dict(color=src_color, alpha=0.8, zorder=100) else: kws = dict(color="0.7", alpha=0.8, zorder=100) ax.plot(*np.vstack([s.xy[0], s.xy[0][:1, :]]).T, **kws) if step is None: step = int(np.floor(cutout_size)) xt = np.array([-step, 0, step]) / 3600.0 / cosd + ra yt = np.array([-step, 0, step]) / 3600.0 + dec ax.set_xticks(xt) ax.set_yticks(yt) if set_axis_labels: ax.set_yticklabels([f'-{step}"', "Dec.", f'+{step}"']) ax.set_xticklabels([f'+{step}"', "R.A.", f'-{step}"']) ax.set_xlim(ra + np.array([cutout_size, -cutout_size]) / 3600.0 / cosd) ax.set_ylim(dec + np.array([-cutout_size, cutout_size]) / 3600.0) ax.set_aspect(1.0 / cosd) ax.grid() if add_labels: ax.text( 0.03, 0.07, f"Dither #{dither_point_index}", ha="left", va="bottom", transform=ax.transAxes, color=src_color, fontsize=8, ) ax.text( 0.03, 0.03, f"{os.path.basename(self.metafile)}", ha="left", va="bottom", transform=ax.transAxes, color=src_color, fontsize=8, ) ax.text( 0.97, 0.07, f"{source_id}", ha="right", va="bottom", transform=ax.transAxes, color=src_color, fontsize=8, ) ax.text( 0.97, 0.03, f"({ra:.6f}, {dec:.6f})", ha="right", va="bottom", transform=ax.transAxes, color=src_color, fontsize=8, ) if fig is not None: fig.tight_layout(pad=1) return fig, ax
[docs] def make_summary_table( self, msa_metadata_id=None, image_path="slit_images", write_tables=True, **kwargs, ): """ Make a summary table for all sources in the mask Parameters ---------- msa_metadata_id : int, None Metadata id in ``shutter_table`` image_path : str Path for slitlet thumbnail images with filename derived from `self.metafile`. write_tables : bool Write FITS and HTML versions of the summary table kwargs : dict Arguments passed to `~msaexp.msa.MSAMetafile.plot_slitlet` if ``image_path`` specified Returns ------- tab : `~astropy.table.Table` Summary table with slit information. Examples -------- .. code-block:: python :dedent: >>> from msaexp import msa >>> uri = 'https://mast.stsci.edu/api/v0.1/Download/file?uri=mast:JWST/product/' >>> meta = msa.MSAMetafile(uri+'jw02756001001_01_msa.fits') >>> res = meta.make_summary_table(msa_metadata_id=None, image_path=None, write_tables=False) >>> print(res[-10:]) source_id ra dec nexp Exp1 Exp2 Exp3 --------- -------- ---------- ---- ---- ---- ---- 320023 3.610363 -30.414991 3 -o- o-- --o 320029 3.557964 -30.426137 3 -o- o-- --o 320035 3.616975 -30.419344 3 -o- o-- --o 340975 3.576616 -30.401801 3 --- --- --- 410005 3.604646 -30.392461 3 --- --- --- 410044 3.592863 -30.396336 3 -o- o-- --o 410045 3.592619 -30.397096 3 -o- o-- --o 410067 3.571049 -30.388132 3 -o- o-- --o 500002 3.589697 -30.398156 2 --o -o- 500003 3.591399 -30.401982 3 -o- o-- --o """ from tqdm import tqdm import matplotlib.pyplot as plt import grizli.utils if msa_metadata_id is None: msa_metadata_id = self.shutter_table["msa_metadata_id"].min() _mset = self.shutter_table["msa_metadata_id"] == msa_metadata_id shut = self.shutter_table[_mset] sources = grizli.utils.Unique(shut["source_id"], verbose=False) root = os.path.basename(self.metafile).split("_msa.fits")[0] tab = grizli.utils.GTable() tab["source_id"] = sources.values tab["ra"] = -1.0 tab["dec"] = -1.0 tab["ra"][sources.indices] = shut["ra"] tab["dec"][sources.indices] = shut["dec"] tab["ra"].format = ".6f" tab["dec"].format = ".6f" tab["ra"].description = "Target R.A. (degrees)" tab["dec"].description = "Target Dec. (degrees)" tab.meta["root"] = root tab.meta["msa_metadata_id"] = msa_metadata_id exps = np.unique(shut["dither_point_index"]) tab["nexp"] = 0 for exp in exps: exp_ids = shut["source_id"][shut["dither_point_index"] == exp] tab["nexp"] += np.isin(tab["source_id"], exp_ids) slitlets = [] for s in tab["source_id"]: if s in exp_ids: ix = sources[s] & (shut["dither_point_index"] == exp) so = np.argsort( shut["shutter_column", "primary_source"][ix] ) ss = "" # ss = f'{ix.sum()} ' for p in shut["primary_source"][ix][so]: if p == "Y": ss += "o" else: ss += "-" slitlets.append(ss) else: slitlets.append("") tab[f"Exp{exp}"] = slitlets mroot = f"{root}_{msa_metadata_id}" if image_path is not None: slit_images = [] if not os.path.exists(image_path): os.makedirs(image_path) print(f"Make {len(tab)} slit thumbnail images:") for src, ra in tqdm(zip(tab["source_id"], tab["ra"])): slit_image = os.path.join( image_path, f"{mroot}_{src}_slit.png" ) slit_image = slit_image.replace("_-", "_m") slit_images.append(slit_image) if os.path.exists(slit_image): continue elif ra <= 0.00001: continue # Make slit cutout dith = shut["dither_point_index"][sources[src]].min() fig, ax = self.plot_slitlet( source_id=src, dither_point_index=dith, msa_metadata_id=msa_metadata_id, **kwargs, ) fig.savefig(slit_image) plt.close(fig) if 0: kwargs = dict( cutout_size=1.5, step=None, rgb_filters=[ "f200w-clear", "f150w-clear", "f115w-clear", ], rgb_scale=5, rgb_invert=False, figsize=(4, 4), ax=None, add_labels=True, set_axis_labels=True, ) tab["thumb"] = [ f'<img src="{im}" height=200px />' for im in slit_images ] if write_tables: tab.write_sortable_html( mroot + "_slits.html", max_lines=5000, filter_columns=["ra", "dec", "source_id"], localhost=False, use_json=False, ) tab.write(mroot + "_slits.fits", overwrite=True) return tab
[docs] def get_siaf_transforms( self, prefix=( "https://github.com/spacetelescope/pysiaf/raw/" + "master/pysiaf/source_data/NIRSpec/delivery/" + "test_data/apertures_testData/" ), check_rms=True, ): """ Read shutter (i,j) > (v2,v3) transformations from the files at https://github.com/spacetelescope/pysiaf/tree/master/pysiaf/source_data/NIRSpec/delivery/test_data/apertures_testData Parameters ---------- prefix : str, optional URL prefix for the files containing the transformations. check_rms : bool, optional If True, calculate and print the root mean square (RMS) of the transformations. Returns ------- transforms : dict A dictionary containing the (i,j) > (v2,v3) transformations for each quadrant of the MSA. """ from astropy.modeling.models import Polynomial2D from astropy.modeling.fitting import LinearLSQFitter import grizli.utils poly = Polynomial2D(degree=3) transforms = {} for quadrant in [1, 2, 3, 4]: ref_file = os.path.join( prefix, f"sky_fpa_projectionMSA_Q{quadrant}.fits" ) fpa = grizli.utils.read_catalog(ref_file) # i, j transformations ij_to_v2 = LinearLSQFitter()( poly, fpa["I"] * 1.0, fpa["J"] * 1.0, fpa["XPOSSKY"] * 3600 ) ij_to_v3 = LinearLSQFitter()( poly, fpa["I"] * 1.0, fpa["J"] * 1.0, fpa["YPOSSKY"] * 3600 ) transforms[quadrant] = (ij_to_v2, ij_to_v3) if check_rms: ijx = ij_to_v2(fpa["I"] * 1.0, fpa["J"] * 1.0) ijy = ij_to_v3(fpa["I"] * 1.0, fpa["J"] * 1.0) stdx = np.std(ijx - fpa["XPOSSKY"] * 3600) stdy = np.std(ijy - fpa["YPOSSKY"] * 3600) print(f"Quadrant {quadrant} rms = {stdx:.2e} {stdy:.2e}") return transforms
[docs] def get_exposure_info( self, msa_metadata_id=1, dither_point_index=1, **kwargs ): """ Get MAST keywords for a particular exposure Parameters ---------- msa_metadata_id, dither_point_index : int Exposure definition Returns ------- row : `~astropy.table.row.Row` Row of the ``mast`` info table for a particular exposure """ mast = self.query_mast_exposures(**kwargs) ix = mast["msametfl"] == os.path.basename(self.filename) ix &= mast["msametid"] == msa_metadata_id ix &= mast["patt_num"] == dither_point_index if ix.sum() == 0: msg = f"msametid = {msa_metadata_id}, " msg += f"exposure = {dither_point_index}" msg += " not found in MAST table" raise ValueError(msg) row = mast[ix][0] return row
[docs] def get_siaf_aperture( self, msa_metadata_id=1, dither_point_index=1, pa_offset=-0.1124, ra_ref=None, dec_ref=None, roll_ref=None, use_ref_columns=True, apername="NRS_FULL_MSA", **kwargs, ): """ Generate a `pysiaf` aperture object based on pointing information Parameters ---------- msa_metadata_id, dither_point_index : int Exposure definition pa_offset : float Empirical offset added to ``gs_v3_pa`` from the MAST query to match ``ROLL_REF`` in the science headers ra_ref, dec_ref, roll_ref : None, float Specify a reference parameters aperture attitude, e.g., taken from the ``ROLL_REF`` science header keywords use_ref_columns : bool Use "ref" columns in `mast` metadata table if found, e.g., generated from `~msaexp.msa.MSAMetafile.fit_mast_pointing_offset` Returns ------- ra, dec, roll : float The V2/V3 reference ra, dec, roll used for the aperture attitude ap : `pysiaf.aperture.NirspecAperture` Aperture object with attitude set based on database pointing keywords and with various coordinate transformation methods """ import pysiaf from pysiaf.utils import rotations # Define SIAF aperture # --------------------- instrument = "NIRSPEC" siaf = pysiaf.siaf.Siaf(instrument) ap = siaf[apername] # Input is fully specified # ------------------------- if ( (ra_ref is not None) & (dec_ref is not None) & (roll_ref is not None) ): att = rotations.attitude( ap.V2Ref, ap.V3Ref, ra_ref, dec_ref, roll_ref ) ap.set_attitude_matrix(att) return ra_ref, dec_ref, roll_ref, ap # Get pointing information from MAST query # ----------------------------------------- row = self.get_exposure_info( msa_metadata_id=msa_metadata_id, dither_point_index=dither_point_index, **kwargs, ) if ("roll_ref" in row.colnames) & use_ref_columns: ra_ref = row["ra_ref"] dec_ref = row["dec_ref"] roll_ref = row["roll_ref"] att = rotations.attitude( ap.V2Ref, ap.V3Ref, ra_ref, dec_ref, roll_ref ) ap.set_attitude_matrix(att) return ra_ref, dec_ref, roll_ref, ap if roll_ref is None: roll = row["gs_v3_pa"] + pa_offset else: roll = roll_ref att = rotations.attitude( ap.V2Ref, ap.V3Ref, row["targ_ra"], row["targ_dec"], roll ) ap.set_attitude_matrix(att) # Apply xoffset, yoffset defined in Ideal (idl) coordinates # ---------------------------------------------------------- tel = ap.idl_to_tel(-row["xoffset"], -row["yoffset"]) offset_rd = ap.tel_to_sky(*tel) # Recompute aperture with offset # ------------------------------- siaf = pysiaf.siaf.Siaf(instrument) ap = siaf["NRS_FULL_MSA"] att = rotations.attitude(ap.V2Ref, ap.V3Ref, *offset_rd, roll) ap.set_attitude_matrix(att) return *offset_rd, roll, ap
[docs] def fit_mast_pointing_offset( self, iterations=3, with_ransac=True, verbose=True, apply=True, init_ref_columns=False, **kwargs, ): """ Fit for offsets to the pointing attitude derived from the MAST metadata Parameters ---------- iterations : int Number of fitting iterations with_ransac : bool Refine transformation with `skimage.measure` inliers verbose : bool Print messages apply : bool Add updated pointing parameters to `ref` columns in `mast` metadata init_ref_columns : bool Initialize using "ref" columns in the MAST table Returns ------- res : `~astropy.table.Table` Table summarizing fit results """ import skimage.transform from skimage.measure import ransac import grizli.utils transform = skimage.transform.EuclideanTransform coeffs = load_siaf_shutter_transforms() _shut = self.shutter_table has_offset = np.isfinite(_shut["estimated_source_in_shutter_x"]) has_offset &= np.isfinite(_shut["estimated_source_in_shutter_y"]) is_src = (_shut["source_id"] > 0) & (has_offset) rows = [] msg = "{0} {1:>.0f} {2:>.0f} {2:.6f} {3:.6f} {4:.3f} {5:>5.0f} " msg += "{6:.6f} {7:.6f} {8:.3f}" for key in self.mast_key_pairs: msa_metadata_id, dither_point_index = key _ = self.get_siaf_aperture( msa_metadata_id=msa_metadata_id, dither_point_index=dither_point_index, pa_offset=0.0, use_ref_columns=init_ref_columns, **kwargs, ) _ra, _dec, _roll, ap = _ xrow = [ msa_metadata_id, dither_point_index, _ra * 1.0, _dec * 1, _roll * 1, ] exp = self.shutter_table["msa_metadata_id"] == msa_metadata_id exp &= ( self.shutter_table["dither_point_index"] == dither_point_index ) # _shut[is_src]['msa_metadata_id','dither_point_index'] Nsrc = (exp & is_src).sum() if Nsrc < 4: xrow += [Nsrc, np.nan, np.nan, np.nan] rows.append(xrow) if (verbose & 4) > 0: print(msg.format(*xrow)) continue se = _shut[exp & is_src] row = se["shutter_row"] + ( se["estimated_source_in_shutter_x"] - 0.5 ) col = se["shutter_column"] + ( se["estimated_source_in_shutter_y"] - 0.5 ) ra, dec = se["ra"], se["dec"] # Shutter to V2,V3 v2 = [] v3 = [] ind = [] for i in range(len(se)): if se["shutter_quadrant"][i] not in coeffs: v2.append(0) v3.append(0) continue ij_to_v2, ij_to_v3 = coeffs[se["shutter_quadrant"][i]] v2.append(ij_to_v2(row[i], col[i])) v3.append(ij_to_v3(row[i], col[i])) ind.append(i) ######### # Fit for the transform # APT target coordinates input = np.array([[ra[i], dec[i]] for i in ind]) x0 = np.mean(input, axis=0) cosd = np.array( [[np.cos(inp[1] / 180 * np.pi), 1] for inp in input] ) init_output = np.array([ap.tel_to_sky(v2[i], v3[i]) for i in ind]) for _iter in range(iterations): output = np.array([ap.tel_to_sky(v2[i], v3[i]) for i in ind]) tf = transform() tf.estimate((output - x0) * cosd, (input - x0) * cosd) if with_ransac: # Fit with inliers / outliers tf, inliers = ransac( [(output - x0) * cosd, (input - x0) * cosd], transform, min_samples=3, residual_threshold=3, max_trials=100, ) _ = self.get_siaf_aperture( msa_metadata_id=msa_metadata_id, dither_point_index=dither_point_index, ra_ref=_ra + tf.translation[0] / cosd[0][0], dec_ref=_dec + tf.translation[1], roll_ref=_roll - tf.rotation / np.pi * 180, **kwargs, ) _ra, _dec, _roll, ap = _ if (verbose & 2) > 0: resid_in = (init_output - x0) * cosd - (input - x0) * cosd resid_out = (output - x0) * cosd - (input - x0) * cosd print( "rms: in ({0:.2f} {1:.2f}) out ({2:.2f}, {3:.2f}) mas".format( *np.std(resid_in, axis=0) * 3600.0 * 1000, *np.std(resid_out, axis=0) * 3600 * 1000.0, ) ) xrow += [Nsrc, _ra * 1, _dec * 1, _roll * 1] if (verbose & 4) > 0: print(msg.format(*xrow)) rows.append(xrow) res = grizli.utils.GTable( rows=rows, names=[ "id", "dith", "ra0", "dec0", "roll0", "Nsrc", "ra", "dec", "roll", ], ) cosd = np.cos(res["dec0"] / 180 * np.pi) res["dra"] = (res["ra"] - res["ra0"]) * cosd * 3600 res["ddec"] = (res["dec"] - res["dec0"]) * 3600 res["droll"] = res["roll"] - res["roll0"] if apply & (res["Nsrc"].max() > 4): dra = np.nanmedian(res["dra"] / 3600 / cosd) dde = np.nanmedian(res["ddec"] / 3600) dro = np.nanmedian(res["droll"]) res["ra_ref"] = res["ra0"] + dra res["dec_ref"] = res["dec0"] + dde res["roll_ref"] = res["roll0"] + dro if not init_ref_columns: self.mast["ra_ref"] = 0.0 self.mast["dec_ref"] = 0.0 self.mast["roll_ref"] = 0.0 for i, (id, dith) in enumerate( zip(self.mast["msametid"], self.mast["patt_num"]) ): _ = self.get_siaf_aperture( msa_metadata_id=id, dither_point_index=dith, pa_offset=0.0, use_ref_columns=init_ref_columns, **kwargs, ) _ra, _dec, _roll, ap = _ # print('xxx', _ra, _dec, _roll) ix = np.where( (self.mast["msametid"] == id) & (self.mast["patt_num"] == dith) )[0][0] self.mast["ra_ref"][ix] = _ra + dra self.mast["dec_ref"][ix] = _dec + dde self.mast["roll_ref"][ix] = _roll + dro # for c in ['ra_ref','dec_ref','roll_ref']: # self.mast[c][ix] = res[c][i] if verbose: print( f"Apply offset to metadata: translation= {dra*3600:6.3f}" + f" {dde*3600:6.3f} rotation= {dro:>.4f}" ) return res
[docs] def regions_from_metafile_siaf( self, as_string=True, with_bars=True, msa_metadata_id=1, dither_point_index=1, shutter_table=None, meta_keys=[ "program", "pi_name", "proptarg", "filename", "filter", "grating", "expstart", "effexptm", "nod_type", "final_0x0_EOSA", ], source_color="cyan", other_color="lightblue", verbose=False, **kwargs, ): """ MSA shutter regions using pointing info and SIAF shutter transformations Parameters ---------- as_string : bool Return regions as DS9 region strings with_bars : bool Account for bar vignetting msa_metadata_id, dither_point_index : int Exposure definition meta_keys : list of str, optional List of metadata keys to include in the output verbose : bool, optional Print verbose messages Returns ------- String or a list of `grizli.utils.SRegion` objects, depending on ``as_string`` """ import grizli.utils # Exposure metadata meta = self.get_exposure_info( msa_metadata_id=msa_metadata_id, dither_point_index=dither_point_index, **kwargs, ) if verbose: msg = "Generate regions from {msametfl} id = {msametid:>3}" msg += " exposure = {exposure}" msg += " (filename = {filename})" print(msg.format(**meta)) # Slitlet transforms (i,j) > (v2,v3) # ---------------------------------- coeffs = load_siaf_shutter_transforms() # Aperture with pointing information # ----------------------------------- _ra, _dec, _roll, ap = self.get_siaf_aperture( msa_metadata_id=msa_metadata_id, dither_point_index=dither_point_index, **kwargs, ) # Which exposure? # ---------------- exp = self.shutter_table["msa_metadata_id"] == msa_metadata_id exp &= self.shutter_table["dither_point_index"] == dither_point_index _shut = self.shutter_table has_offset = np.isfinite(_shut["estimated_source_in_shutter_x"]) has_offset &= np.isfinite(_shut["estimated_source_in_shutter_y"]) # Regions for a particular exposure if shutter_table is None: se = _shut[exp] else: se = shutter_table sx = (np.array([-0.5, 0.5, 0.5, -0.5])) * ( 1 - 0.07 / 0.27 * with_bars / 2 ) sy = (np.array([-0.5, -0.5, 0.5, 0.5])) * ( 1 - 0.07 / 0.53 * with_bars / 2 ) row = se["shutter_row"] col = se["shutter_column"] regions = [] for i in range(len(se)): if se["shutter_quadrant"][i] not in coeffs: continue ij_to_v2, ij_to_v3 = coeffs[se["shutter_quadrant"][i]] v2 = ij_to_v2(row[i] + sx, col[i] + sy) v3 = ij_to_v3(row[i] + sx, col[i] + sy) sra, sdec = ap.tel_to_sky(v2, v3) if 0: ij_to_v2, ij_to_v3 = coeffs[se["shutter_quadrant"][i]] v2_ = ij_to_v2(row[i] + sx, col[i] + sy - 1) v3_ = ij_to_v3(row[i] + sx, col[i] + sy - 1) sra_, sdec_ = ap.tel_to_sky(v2_, v3_) dra = (sra_ - sra) * np.cos(sdec / 180 * np.pi) dde = sdec_ - sdec slit_pa = np.arctan2(dra, dde) # sra = pra(row[i] + sx, col[i]+sy) # sdec = pdec(row[i] + sx, col[i]+sy) sr = grizli.utils.SRegion(np.array([sra, sdec]), wrap=False) sr.meta = {} for k in [ "program", "source_id", "ra", "dec", "slitlet_id", "shutter_quadrant", "shutter_row", "shutter_column", "estimated_source_in_shutter_x", "estimated_source_in_shutter_y", ]: if k in se.colnames: sr.meta[k] = se[k][i] if "estimated_source_in_shutter_x" in sr.meta: sr.meta["is_source"] = np.isfinite( sr.meta["estimated_source_in_shutter_x"] ) else: sr.meta["is_source"] = False if sr.meta["is_source"]: sr.ds9_properties = "color=" + source_color else: sr.ds9_properties = "color=" + other_color regions.append(sr) if as_string: output = f"# msametfl = {self.metafile}\n" di = dither_point_index output += f"# dither_point_index = {di}\n" mi = msa_metadata_id output += f"# msa_metadata_id = {mi}\n" for k in meta_keys: if k in meta.colnames: output += f"# {k} = {meta[k]}\n" output += "icrs\n" for sr in regions: m = sr.meta if m["is_source"]: output += f"circle({m['ra']:.7f}, {m['dec']:.7f}, 0.2\")" output += f" # color=cyan text=xx{m['source_id']}yy\n" for r in sr.region: output += r + "\n" output = output.replace("xx", "{").replace("yy", "}") else: output = regions return output
[docs] def all_regions_from_metafile_siaf(self, **kwargs): """ Run `~msaexp.msa.MSAMetafile.regions_from_metafile_siaf` for all exposures Parameters ---------- kwargs : dict Passed to `~msaexp.msa.MSAMetafile.regions_from_metafile_siaf` Returns ------- output : list, str Depending on ``as_string`` input keyword """ output = None for key in self.mast_key_pairs: msa_metadata_id, dither_point_index = key _out = self.regions_from_metafile_siaf( msa_metadata_id=msa_metadata_id, dither_point_index=dither_point_index, **kwargs, ) if output is None: output = _out else: output += _out return output
[docs] def write(self, prefix="edit_", overwrite=True, verbose=True): """ Write a new metadata file """ import grizli.utils im = pyfits.open(self.filename) shut = self.shutter_table src = self.src_table hdus = { "SHUTTER_INFO": pyfits.BinTableHDU(shut), "SOURCE_INFO": pyfits.BinTableHDU(src), } for e in hdus: for k in im[e].header: if k not in hdus[e].header: # print(e, k, im[e].header[k]) hdus[e].header[k] = im[e].header[k] im[e] = hdus[e] output_file = prefix + self.filename msg = f"msaexp.msa: write from {self.filename} to {output_file}" grizli.utils.log_comment( grizli.utils.LOGFILE, msg, verbose=verbose, show_date=True ) im.writeto(output_file, overwrite=overwrite) im.close()
PYSIAF_GITHUB = "https://github.com/spacetelescope/pysiaf/raw/main/pysiaf/source_data/NIRSpec/delivery/test_data/apertures_testData/" def fit_siaf_shutter_transforms( prefix=PYSIAF_GITHUB, degree=3, inverse_degree=2, check_rms=True ): """ Fit shutter (i,j) > (v2,v3) transformations from the files at https://github.com/spacetelescope/pysiaf/tree/main/pysiaf/source_data/NIRSpec/delivery/test_data/apertures_testData Parameters ---------- prefix : str, optional The URL prefix for the files containing the transformations. degree : int, optional The degree of the polynomial fit for the transformations. Default is 3. inverse_degree : int, optional The degree of the polynomial fit for the inverse transformations. Default is 2. check_rms : bool, optional Whether to calculate and print the RMS of the transformations. Default is True. Returns ------- transforms : dict A dictionary containing the fitted transformations. The keys are: - "degree": the degree of the polynomial fit - "coeffs": a nested dictionary containing the coefficients of the transformations for each quadrant - "irange": a nested dictionary containing the range of valid "i" values for each quadrant - "jrange": a nested dictionary containing the range of valid "j" values for each quadrant - "inverse_degree": the degree of the polynomial fit for the inverse transformations - "inverse": a nested dictionary containing the coefficients of the inverse transformations for each quadrant - "rms" (optional): a nested dictionary containing the RMS of the transformations for each quadrant """ from astropy.modeling.models import Polynomial2D from astropy.modeling.fitting import LinearLSQFitter import grizli.utils def _model_to_dict(model): """ """ params = {} for k, v in zip(model.param_names, model.parameters): params[k] = float(v) return params poly = Polynomial2D(degree=degree) pinv = Polynomial2D(degree=inverse_degree) transforms = { "degree": degree, "coeffs": {}, "irange": {}, "jrange": {}, "inverse_degree": inverse_degree, "inverse": {}, } if check_rms: transforms["rms"] = {} for quadrant in [1, 2, 3, 4]: ref_file = os.path.join( prefix, f"sky_fpa_projectionMSA_Q{quadrant}.fits" ) fpa = grizli.utils.read_catalog(ref_file) transforms["irange"][quadrant] = [ int(fpa["I"].min()), int(fpa["I"].max()), ] transforms["jrange"][quadrant] = [ int(fpa["J"].min()), int(fpa["J"].max()), ] # Transformation (i,j) to (v2,v3) ij_to_v2 = LinearLSQFitter()( poly, fpa["I"] * 1.0, fpa["J"] * 1.0, fpa["XPOSSKY"] * 3600 ) ij_to_v3 = LinearLSQFitter()( poly, fpa["I"] * 1.0, fpa["J"] * 1.0, fpa["YPOSSKY"] * 3600 ) v2d = _model_to_dict(ij_to_v2) v3d = _model_to_dict(ij_to_v3) transforms["coeffs"][quadrant] = {"ij_to_v2": v2d, "ij_to_v3": v3d} # Inverse transformation v2,v3 to i,j v23_to_i = LinearLSQFitter()( pinv, fpa["XPOSSKY"] * 3600, fpa["YPOSSKY"] * 3600, fpa["I"] * 1.0 ) v23_to_j = LinearLSQFitter()( pinv, fpa["XPOSSKY"] * 3600, fpa["YPOSSKY"] * 3600, fpa["J"] * 1.0 ) icoeffs = _model_to_dict(v23_to_i) jcoeffs = _model_to_dict(v23_to_j) transforms["inverse"][quadrant] = { "v23_to_i": icoeffs, "v23_to_j": jcoeffs, } if check_rms: ijx = ij_to_v2(fpa["I"] * 1.0, fpa["J"] * 1.0) ijy = ij_to_v3(fpa["I"] * 1.0, fpa["J"] * 1.0) stdx = np.std(ijx - fpa["XPOSSKY"] * 3600) stdy = np.std(ijy - fpa["YPOSSKY"] * 3600) transforms["rms"][quadrant] = [float(stdx), float(stdy)] msg = f"Q{quadrant} rms = {stdx:.2e} {stdy:.2e}" msg += " arcsec (i,j > v2,v3)" ix = v23_to_i(fpa["XPOSSKY"] * 3600.0, fpa["YPOSSKY"] * 3600.0) jx = v23_to_j(fpa["XPOSSKY"] * 3600.0, fpa["YPOSSKY"] * 3600.0) stdi = np.std(ix - fpa["I"] * 1.0) stdj = np.std(jx - fpa["J"] * 1.0) msg += ( "\n" + f" {stdi:.2e} {stdj:.2e} shutter (v2,v3 > i,j)" ) print(msg) if False: import yaml header = """# 2D polynomal fits to the tables at https://github.com/spacetelescope/pysiaf/tree/main/pysiaf/source_data/NIRSpec/delivery/test_data/apertures_testData # # The keys of the "coeffs" dict are the MSA quadrant numbers: (1,2,3,4) # and the values are astropy.modeling.models.Polynomial2D(degree) coefficients # for the transformation shutter (row=i,col=j) > v2 and (row=i,col=j) > v3 # # The coefficients of the "inverse" dict are the transformations # (v2,v3) > (row=i) and (v2,v3) > (col=j) # # RMS of the transformations: # Q1 rms = 1.76e-04 1.80e-04 arcsec (i,j > v2,v3) # 6.93e-03 1.70e-03 shutter (v2,v3 > i,j) # Q2 rms = 1.55e-04 1.88e-04 arcsec (i,j > v2,v3) # 8.96e-03 2.99e-03 shutter (v2,v3 > i,j) # Q3 rms = 1.44e-04 2.56e-04 arcsec (i,j > v2,v3) # 6.66e-03 2.10e-03 shutter (v2,v3 > i,j) # Q4 rms = 6.72e-05 2.51e-04 arcsec (i,j > v2,v3) # 9.91e-03 2.62e-03 shutter (v2,v3 > i,j) """ with open("/tmp/nirspec_msa_transforms.yaml", "w") as fp: fp.write(header) yaml.dump(transforms, fp) return transforms def load_siaf_shutter_transforms(): """ Read MSA shutter transforms (i,j) > (v2,v3) from file created with `msaexp.msa.fit_siaf_shutter_transforms` Returns ------- tr : dict Transform coefficients by quadrant """ import yaml from astropy.modeling.models import Polynomial2D tfile = os.path.join( os.path.dirname(__file__), "data/nirspec_msa_transforms.yaml" ) with open(tfile) as fp: traw = yaml.load(fp, Loader=yaml.Loader) degree = traw["degree"] tr = {} for k in traw["coeffs"]: tr[k] = ( Polynomial2D(degree, **traw["coeffs"][k]["ij_to_v2"]), Polynomial2D(degree, **traw["coeffs"][k]["ij_to_v3"]), ) return tr def load_siaf_inverse_shutter_transforms(): """ Read MSA shutter transforms (v2,v3) > (i,j) from file created with `msaexp.msa.fit_siaf_shutter_transforms` Returns ------- tr : dict Nested dictionary with keys - ``inverse``: inverse transform by quadrant - ``irange``: Range of valid ``i`` - ``jrange``: Range of valid ``j`` """ import yaml from astropy.modeling.models import Polynomial2D tfile = os.path.join( os.path.dirname(__file__), "data/nirspec_msa_transforms.yaml" ) with open(tfile) as fp: traw = yaml.load(fp, Loader=yaml.Loader) degree = traw["inverse_degree"] tr = { "inverse": {}, "irange": traw["irange"], "jrange": traw["jrange"], } for k in traw["inverse"]: tr["inverse"][k] = ( Polynomial2D(degree, **traw["inverse"][k]["v23_to_i"]), Polynomial2D(degree, **traw["inverse"][k]["v23_to_j"]), ) return tr
[docs]def slit_best_source_alias( slit, require_primary=False, which="min", verbose=False ): """ Get the "best" source id from the MSA metadata file associated with a particular slit extraction. Parameters ---------- slit : str, `stdatamodels.jwst.datamodels.slit.SlitModel` Slit object or filename of one require_primary : bool Include a test on "primary_source = Y" in the shutter table. which : 'most', 'min', 'max' Criterion for picking the "best" source_id among multiple source ids assigned for a particular ``slitlet_id``. Returns ------- alias : str Source alias, either ``background_{id}``if no non-zero source ids found or ``{program}_{id}`` if a primary source found with a particular slitlet_id. """ import jwst.datamodels import grizli.utils if which not in ["most", "min", "max"]: msg = f"slit_best_source_alias: 'which' must be 'most_common', 'min', 'max'" raise ValueError(msg) if isinstance(slit, str): slit = jwst.datamodels.open(slit) opened = True else: opened = False msa = MSAMetafile(slit.meta.instrument.msa_metadata_file) shu = msa.shutter_table rows = shu["msa_metadata_id"] == slit.meta.instrument.msa_metadata_id rows &= shu["slitlet_id"] == slit.slitlet_id rows &= shu["source_id"] > 0 test = rows if require_primary: primary = rows & (shu["primary_source"] == "Y") if primary.sum() > 0: test = primary if test.sum() == 0: alias = f"background_{slit.slitlet_id}" return alias source_ids = grizli.utils.Unique( shu["source_id"][test].tolist(), verbose=False ) if which == "most": best_id = np.array(source_ids.values)[np.argmax(source_ids.counts)] elif which == "min": best_id = np.min(source_ids.values) elif which == "max": best_id = np.max(source_ids.values) prog = int(slit.meta.observation.program_number) alias = f"{prog}_{best_id}" msg = f"slit_best_source_alias: {alias} chosen among {source_ids.values}" grizli.utils.log_comment(grizli.utils.LOGFILE, msg, verbose=verbose) if opened: slit.close() return alias
def msa_shutter_catalog(ra, dec, pointing=None, ap=None, inv=None): """ Compute shutter centering for a list of input coordinates Parameters ---------- ra, dec : array-like Arrays of RA, Dec. in decimal degrees pointing : None, (RA_REF, DEC_REF, ROLL_REF) Pointing definition ap : `pysiaf.aperture.NirspecAperture` Aperture object with attitude set based on database pointing keywords and with various coordinate transformation methods inv : dict Inverse shutter transformations from `msaexp.msa.load_siaf_inverse_shutter_transforms` Returns ------- tab : `astropy.table.table` Table with shutter information """ import pysiaf from pysiaf.utils import rotations from grizli import utils if pointing is not None: siaf = pysiaf.siaf.Siaf("NIRSPEC") ap = siaf["NRS_FULL_MSA"] att = rotations.attitude(ap.V2Ref, ap.V3Ref, *pointing) ap.set_attitude_matrix(att) if ap is None: raise ValueError("Either pointing or ap must be provided") if inv is None: inv = load_siaf_inverse_shutter_transforms() tab = utils.GTable() tab["ra"] = ra tab["dec"] = dec tab["quadrant"] = -1 tab["row_i"] = np.nan tab["col_j"] = np.nan v2, v3 = ap.sky_to_tel(ra, dec) for q in [1, 2, 3, 4]: tr = inv["inverse"][q] iq = tr[0](v2, v3) + 0.5 jq = tr[1](v2, v3) + 0.5 inti = np.floor(iq).astype(int) intj = np.floor(jq).astype(int) clip = (inti >= inv["irange"][q][0]) & (inti <= inv["irange"][q][1]) clip &= (intj >= inv["jrange"][q][0]) & (intj <= inv["jrange"][q][1]) if clip.sum() > 0: tab["quadrant"][clip] = q tab["row_i"][clip] = iq[clip] tab["col_j"][clip] = jq[clip] tab["di"] = tab["row_i"] - np.floor(tab["row_i"]) - 0.5 tab["dj"] = tab["col_j"] - np.floor(tab["col_j"]) - 0.5 return tab # Offsets for MSA shutter quadrants DCOL = 450 DROW = 231.7
[docs]def set_msa_axis_ticks(ax=None, which="xy"): """ Draw axis ticks for a plot of the MSA shutters """ import matplotlib.pyplot as plt global DCOL, DROW if ax is None: ax = plt.gca() xlim = (377 + DCOL + 20, -20) ylim = (171 + DROW + 10, -10) if "x" in which: xt = [1, 90, 180, 270, 377] xtv = xt + [xi + DCOL for xi in xt] ax.set_xticks(xtv) ax.set_xticklabels(xt * 2) ax.set_xlabel("MSA Column") ax.set_xlim(*xlim) if "y" in which: yt = [1, 60, 120, 171] ytv = yt + [yi + DROW for yi in yt] ax.set_yticks(ytv) ax.set_yticklabels(yt * 2) ax.set_ylabel("MSA Row") ax.set_ylim(*ylim) return ax
[docs]def plot_msa_shutters(col, row, quadrant, c=None, ax=None, **kwargs): """ Make a plot of a list of MSA shutters Parameters ---------- col, row, quadrant : array-like List of shutter columns ("x", dispersion), rows ("y", spatial), quadrants c : array-like Optional array to color the points. If not provided, will color by ``quadrant``. ax : axis Plot axis to draw the shutter positions. If not specified, generate a new figure + axis kwargs : dict Keyword arguments passed to `matplotlib.pyplot.scatter` Returns ------- fig : figure, None Plot figure. If ``ax`` was provided as input, will be empty. ax : axis Plot axis """ import matplotlib.pyplot as plt global DCOL, DROW if ax is None: fig, ax = plt.subplots(1, 1, figsize=(8, 8)) newplot = True else: newplot = False if c is None: c = quadrant ax.scatter( col + np.isin(quadrant, [3, 4]) * DCOL, row + np.isin(quadrant, [2, 4]) * DROW, c=c, **kwargs, ) if newplot: set_msa_axis_ticks(ax, "xy") return fig, ax else: return None, ax
def diffraction_spikes_on_msa( ra, dec, attitude_matrix, aper=None, buffer_arcsec=None ): """ Generate estimated regions of diffraction spikes for a bright star Parameters ---------- ra, dec: float Coordinates of the source creating the spikes in decimal degrees aper : `pysiaf.Aperture`, None SIAF aperture in which to compute the transformations. If None, then use NIRSpec NRS_FULL_MSA. attitude_matrix : array Spacecraft attitude matrix from `pysiaf.rotations.attitude` buffer_arcsec : float, None Optional buffer to add to the spike regions Returns ------- spike_regions : list List of `~grizli.utils.SRegion` objects for the spike polygons """ import pysiaf import grizli.utils # Rough polygons of the diffraction spikes in relative "tel" coordinates regions_v23 = """ ((0.69,-1.27),(-0.69,-1.25),(-0.33,-38.09),(0.72,-38.68)) ((0.75,1.24),(1.43,0.04),(40.86,22.74),(39.10,23.29),(3.88,3.07)) ((4.30,3.31),(4.30,3.31),(4.30,3.31),(4.30,3.31)) ((-0.58,-1.15),(-1.26,0.05),(-34.79,-19.34),(-33.77,-20.18)) ((-0.76,1.08),(0.61,1.07),(0.10,36.74),(-0.58,36.97)) ((1.30,0.00),(0.61,-1.20),(41.75,-24.18),(42.99,-23.48)) ((-1.45,-0.07),(-0.74,1.11),(-34.94,21.31),(-35.74,20.20)) ((-1.64,-0.23),(-1.64,0.29),(-11.00,0.14),(-11.02,-0.18),(-4.41,-0.14)) ((1.36,0.31),(1.35,-0.21),(10.71,-0.11),(10.73,0.21)) """.strip().split() if aper is None: nrs = pysiaf.Siaf("NIRSPEC") aper = nrs["NRS_FULL_MSA"] aper.set_attitude_matrix(attitude_matrix) # ra_src, dec_src = 150.0670825, 2.2730196 v23_src = aper.sky_to_tel(ra, dec) spike_regions = [] for region in regions_v23: sr0 = grizli.utils.SRegion(region, wrap=False) reg_v23 = np.array(sr0.xy[0]) + np.array(v23_src) reg_sky = np.array(aper.tel_to_sky(*(reg_v23.T))) sr = grizli.utils.SRegion(reg_sky) if buffer_arcsec is not None: sr = grizli.utils.SRegion( sr.shapely[0].buffer(buffer_arcsec / 3600.0) ) spike_regions.append(sr) return spike_regions
[docs]def read_apt_shutter_csv(file): """ Read an APT MSA shutter CSV file into a table Parameters ---------- file : str Filename of the shutter CSV file exported from APT Returns ------- tab, otab : `~grizli.utils.GTable` Tables for closed and open shutters """ import grizli.utils with open(file) as fp: lines = fp.readlines()[1:] data = np.array([np.cast[int](line.strip().split(",")) for line in lines]) q = data * 0 q[:365, :171] = 1 q[365:, :171] = 3 q[:365, 171:] = 2 q[365:, 171:] = 4 row, col = np.indices(data.shape) row = (row % 365) + 1 col = (col % 171) + 1 closed = data == 1 tab = grizli.utils.GTable() tab["shutter_quadrant"] = q[closed] tab["shutter_row"] = row[closed] tab["shutter_column"] = col[closed] otab = grizli.utils.GTable() otab["shutter_quadrant"] = q[~closed] otab["shutter_row"] = row[~closed] otab["shutter_column"] = col[~closed] return tab, otab
[docs]def get_shutter_wavelength_limits(col, row, quadrant, grating='prism', filter='clear'): """ Compute wavelength limits for specific slitlets by MSA row, col, quadrant Parameters ---------- row, col, quadrant : scalar, array-like Shutter definition by MSA column (1-370), row (1-170) and quadrant (1,2,3,4) grating, filter : str Grating and filter names. Currently the PRISM and M grating dispersers are implemented. Returns ------- tab : table Table with wavelength limits by detector ``[nrs1/nrs2]_wave_[min/max]``. The chip gap is the range between ``nrs1_wave_max`` and ``nrs2_wave_min``. The shutter dependence is calculated from the spectra extracted with msaexp itself. This function is tested to be consistent with the "MSA Target Info" table that can be exported with APT for the PRISM and G395M gratings. However, for the other M gratings this function includes the full wavelength range that can be extracted extending into where the spectral orders can overlap. If a specified shutter is outside of the convex hull of the "training" data used to derive the mapping, the value in the output table is set to NaN. Examples -------- .. plot:: :include-source: ### Make a plot showing wavelength limits import numpy as np import matplotlib.pyplot as plt from grizli import utils from msaexp import msa import msaexp.utils # APT: Export -> MSA Target Info apt = utils.read_catalog("https://github.com/gbrammer/msaexp/raw/refs/heads/main/msaexp/tests/data/4233-obs2-exp2-c1_uds_obs2az_g395me2n1-G395M-F290LP.csv") # Compute wavelength limits by shutter limits = msa.get_shutter_wavelength_limits( apt['Column (Disp)'], apt['Row (Spat)'], apt['Quadrant'], grating='g395m', filter='f290lp', ) fig, axes = plt.subplots(1, 2, figsize=(8,4), sharex=True, sharey=True) msa.set_msa_axis_ticks(axes[0], "xy") msa.set_msa_axis_ticks(axes[1], "x") fig.tight_layout(pad=1) kws = dict( cmap=plt.cm.RdYlBu_r, vmin=2.7, vmax=5.6, ) # Plot the MSA layout colored by "wave_max" # offset columns, rows for plotting px = apt['Column (Disp)'] + np.isin(apt['Quadrant'], [3, 4]) * msa.DCOL py = apt['Row (Spat)'] + np.isin(apt['Quadrant'], [2, 4]) * msa.DROW for i in [1,2]: ax = axes[i-1] no_gap = limits[f'nrs{i}_wave_min'] < 2.7 no_gap &= limits[f'nrs{i}_wave_max'] > 5.6 ax.scatter( px[no_gap], py[no_gap], ec='magenta', fc='None', marker='s', s=50, label=f'Fully on NRS{i}', ) _ = msa.plot_msa_shutters( apt['Column (Disp)'], apt['Row (Spat)'], apt['Quadrant'], c=limits[f'nrs{i}_wave_max'], ec='None', ax=ax, marker='s', s=20, **kws, ) sc = ax.scatter(-99, -99, c=[5], **kws) _ = msaexp.utils.tight_colorbar( sc, fig, ax, loc='cc', sy=0.02, labelsize=6, label=f"nrs{i}_wave_max", ) leg = ax.legend( loc='center ' + ['left', 'right'][i-1], fontsize=7, ) for q in [1,2,3,4]: ax.text( 180 + (q in [3,4]) * msa.DCOL, 85 + (q in [2,4]) * msa.DROW, f'Q{q}', ha='center', va='center', bbox={'fc':'w', 'ec': 'None', 'alpha': 0.8}, ) """ import yaml import grizli.utils tab = grizli.utils.GTable() tab["col"] = np.atleast_1d(col) tab["row"] = row tab["quadrant"] = quadrant coo = np.array([tab["col"], tab["row"]]).T coeffs_file = os.path.join( msautils.module_data_path(), "detector_gap", f"wavelength_range_coeffs_{grating}_{filter}.yaml".lower() ) tab.meta["grating"] = grating tab.meta["filter"] = filter tab.meta["coeffs"] = os.path.basename(coeffs_file) with open(coeffs_file) as fp: coeffs = yaml.load(fp, Loader=yaml.Loader) # print(coeffs.keys()) for det_i in ['NRS1','NRS2']: for val in ['wave_min','wave_max']: col = f'{det_i}_{val}'.lower() tab[col] = -1.0 for q in [1,2,3,4]: qsub = tab["quadrant"] == q if qsub.sum() == 0: continue for det_i in ['NRS1','NRS2']: for val in ['wave_min','wave_max']: col = f'{det_i}_{val}'.lower() key = f'{q}-{det_i}-{val}' if key not in coeffs: continue ck = coeffs[key] sr = grizli.utils.SRegion(np.array(ck['hull']), wrap=False) ####### px = grizli.utils.bspline_templates( tab["col"][qsub], minmax=(0, 380), df=ck['df_x'], get_matrix=True ) py = grizli.utils.bspline_templates( tab["row"][qsub], minmax=(0, 180), df=ck['df_y'], get_matrix=True ) pp = [] for py_i in py.T: pp.append((px.T * py_i).T) tab[col][qsub] = np.hstack(pp).dot(ck['coeffs']) in_hull = sr.path[0].contains_points(coo) tab[col][qsub & ~in_hull] = np.nan return tab