Source code for msaexp.pipeline_extended

"""
Extend CRDS reference files for fixed-slit observations
"""

import os
import time

import numpy as np
import astropy.io.fits as pyfits
import astropy.table
import matplotlib.pyplot as plt

import jwst.datamodels
from jwst.assign_wcs import AssignWcsStep
from jwst.msaflagopen import MSAFlagOpenStep
from jwst.extract_2d import Extract2dStep
from jwst.flatfield import FlatFieldStep
from jwst.pathloss import PathLossStep
from jwst.barshadow import BarShadowStep
from jwst.photom import PhotomStep
from jwst.assign_wcs.util import NoDataOnDetectorError

from jwst.assign_wcs.assign_wcs import load_wcs
from jwst.flatfield import flat_field
import jwst.photom.photom

from grizli import utils
import msaexp.utils as msautils
from .msa import slit_best_source_alias
from . import pipeline
from .fork.assign_wcs.nirspec import zeroth_order_mask

# NEW_WAVERANGE_REF = "jwst_nirspec_wavelengthrange_0008_ext.asdf"

EXTENDED_RANGES = {
    "F070LP_G140M": [0.6, 3.3],
    "F100LP_G140M": [0.9, 3.3],
    "F170LP_G235M": [1.5, 5.3],
    "F290LP_G395M": [2.6, 5.6],
    "F070LP_G140H": [0.6, 3.3],
    "F100LP_G140H": [0.9, 3.3],
    "F170LP_G235H": [1.5, 5.3],
    "F290LP_G395H": [2.6, 5.6],
    "CLEAR_PRISM": [0.5, 5.6],
}

__all__ = [
    "step_reference_files",
    "extend_wavelengthrange",
    "extend_fs_fflat",
    "extend_quad_fflat",
    "extend_sflat",
    "extend_dflat",
    "extend_photom",
    "run_pipeline",
]


[docs]def step_reference_files(step, input_model): """ Get reference filenames for a `jwst` pipeline step Parameters ---------- step : `jwst` pipeline step instance Step object, e.g., `jwst.assign_wcs.AssignWcsStep()` input_model : `jwsts.datamodels.Model` Data model instance Returns ------- reference_file_names : dict List of reference filenames by type """ reference_file_names = {} for reftype in step.reference_file_types: reffile = step.get_reference_file(input_model, reftype) reference_file_names[reftype] = reffile if reffile else "" return reference_file_names
VERBOSITY = True
[docs]def extend_wavelengthrange( ref_file="jwst_nirspec_wavelengthrange_0008.asdf", ranges=EXTENDED_RANGES ): """ Extend limits of ``wavelengthrange`` reference file Parameters ---------- ref_file : str Filename of ``wavelengthrange`` asdf reference file. If not provided as an absolute path starting with ``/``, will look for the file in ``./`` and ``$CRDS_PATH/references/jwst/nirspec``. ranges : dict Extended ranges by grating / filter Returns ------- status : bool True if completed without exception Writes a file ``ref_file.replace(".asdf", "_ext.asdf")``. """ if os.path.exists(ref_file): NIRSPEC_CRDS = "./" elif ref_file.startswith("/"): NIRSPEC_CRDS = os.path.dirname(ref_file) else: NIRSPEC_CRDS = os.path.join( os.getenv("CRDS_PATH"), "references/jwst/nirspec" ) waverange = jwst.datamodels.open(os.path.join(NIRSPEC_CRDS, ref_file)) for k in ranges: i = waverange.waverange_selector.index(k) waverange.wavelengthrange[i] = [v * 1e-6 for v in ranges[k]] new_waverange = ref_file.replace(".asdf", "_ext.asdf") waverange.meta.author = "G. Brammer" waverange.meta.date = utils.nowtime().iso.replace(" ", "T") waverange.meta.description = "Extended M grating wavelength ranges" so = np.argsort(waverange.waverange_selector) msg = f"New wavelength range file {new_waverange}" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) for i in so: key = waverange.waverange_selector[i] spl = key.split("_") if ( spl[0] in [ "CLEAR", "FLAT1", "FLAT2", "FLAT3", "LINE1", "LINE2", "LINE3", "REF", "TEST", ] ) & (key != "CLEAR_PRISM"): continue if (spl[1] == "MIRROR") | (key == "F100LP_PRISM"): continue if spl[0] in ["OPAQUE", "F110W", "F140X"]: continue if (spl[1] == "PRISM") & (spl[0] != "CLEAR"): continue msg = f"{i:>2} {waverange.waverange_selector[i]:>16} {waverange.order[i]:>3} {waverange.wavelengthrange[i][0]*1.e6:.2f} - {waverange.wavelengthrange[i][1]*1.e6:.2f}" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) waverange.write(new_waverange) waverange.close() return True
[docs]def extend_fs_fflat( fflat_filename, full_range=[0.55, 5.6], FILL_VALUE=2.0e20, log_prefix="extend_fs_fflat", ): """ Extend fflat reference for ``EXP_TYPE = NRS_FIXEDSLIT`` Parameters ---------- fflat_filename : str Filename full_range : list Wavelength limits FILL_VALUE : float Value to use for the monochromatic flat table. The default is set such that the products processed with the updated references files have roughly the same pixel values as with the default calibrations. log_prefix : str String for log messages Returns ------- ff : ``jwst.datamodels.NirspecFlat`` F-Flat object """ fbase = os.path.basename(fflat_filename) ff = jwst.datamodels.open(fflat_filename) fftab = astropy.table.Table(ff.flat_table) nelem = len(fftab[0]["wavelength"]) non_zero = fftab["wavelength"][0] > 0 newtab = astropy.table.Table() newtab["slit_name"] = fftab["slit_name"] med_FILL_VALUE = np.nanmedian(fftab["data"][0][non_zero]) msg = f"{log_prefix} {fbase} fill fflat with {FILL_VALUE} (med = {med_FILL_VALUE})" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) wave = np.logspace(*(np.log10(full_range)), nelem) newtab["nelem"] = nelem newtab["wavelength"] = [wave.astype(np.float32).tolist()] * len(fftab) newtab["data"] = [ np.full(nelem, FILL_VALUE, dtype=np.float32).tolist() ] * len(fftab) newtab["error"] = [ np.full(nelem, FILL_VALUE * 1.0e-5, dtype=np.float32).tolist() ] * len(fftab) for c in ["wavelength", "data", "error"]: newtab[c] = newtab[c].astype(np.float32) ff.flat_table = pyfits.BinTableHDU(newtab).data for c in ["wavelength", "data", "error"]: newtab[c] = newtab[c].astype(np.float32) return ff
[docs]def extend_quad_fflat( fflat_filename, full_range=[0.55, 5.6], FILL_VALUE=2.0e20, log_prefix="extend_quad_fflat", ): """ Extend fflat reference for ``EXP_TYPE = NRS_MSASPEC`` Parameters ---------- fflat_filename : str Filename full_range : list Wavelength limits FILL_VALUE : float Value to use for the monochromatic flat table. The default is set such that the products processed with the updated references files have roughly the same pixel values as with the default calibrations. log_prefix : str String for log messages Returns ------- ff : ``jwst.datamodels.NirspecFlat`` F-Flat object """ fbase = os.path.basename(fflat_filename) ff = jwst.datamodels.open(fflat_filename) for i in range(4): fftab = astropy.table.Table(ff.quadrants[i].flat_table) non_zero = fftab["wavelength"][0] > 0 newtab = astropy.table.Table() newtab["slit_name"] = fftab["slit_name"] med_FILL_VALUE = np.nanmedian(fftab["data"][0][non_zero]) msg = f"{log_prefix} {fbase} Q={i} fill fflat with {FILL_VALUE} (med = {med_FILL_VALUE})" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) nelem = len(fftab[0]["wavelength"]) wave = np.logspace(*(np.log10(full_range)), nelem) newtab["nelem"] = nelem newtab["wavelength"] = [wave.astype(np.float32).tolist()] * len(fftab) newtab["data"] = [ np.full(nelem, FILL_VALUE, dtype=np.float32).tolist() ] * len(fftab) newtab["error"] = [ np.full(nelem, FILL_VALUE * 1.0e-5, dtype=np.float32).tolist() ] * len(fftab) ff.quadrants[i].flat_table = pyfits.BinTableHDU(newtab).data for c in ["wavelength", "data", "error"]: newtab[c] = newtab[c].astype(np.float32) return ff
[docs]def extend_sflat( sflat_filename, full_range=[0.55, 5.6], FILL_VALUE=5.0e-10, log_prefix="extend_sflat", ): """ Extend NIRSpec sflat reference file Parameters ---------- fflat_filename : str Filename full_range : list Wavelength limits FILL_VALUE : float Value to use for the monochromatic flat table. The default is set such that the products processed with the updated references files have roughly the same pixel values as with the default calibrations. log_prefix : str String for log messages Returns ------- sf : ``jwst.datamodels.NirspecFlat`` S-Flat object """ fbase = os.path.basename(sflat_filename) sf = jwst.datamodels.open(sflat_filename) msg = f"{log_prefix} {fbase} Unset NO_FLAT_FIELD from SFlat DQ" utils.log_comment(utils.LOGFILE, msg, verbose=True) sf.dq -= sf.dq & 2**18 if sf.meta.exposure.type == "NRS_IFU": msg = f"{log_prefix} {fbase} Unset 1 from IFU SFlat DQ" utils.log_comment(utils.LOGFILE, msg, verbose=True) sf.dq -= sf.dq & 1 # Reset all msg = f"{log_prefix} {fbase} Set unity SFlat SCI" utils.log_comment(utils.LOGFILE, msg, verbose=True) sf.data = np.ones_like(sf.data) sf.err = np.ones_like(sf.data) * 1.0e-3 # Just reset wavelength metadata, don't add extensions head_ = sf.extra_fits.SCI.header nflat = sf.data.shape[0] inds = [] for i, row in enumerate(head_): if row[0] == "FLAT_01": head_[i][1] = full_range[0] inds.append(i) elif row[0] == f"FLAT_{nflat:02d}": head_[i][1] = full_range[-1] inds.append(i) for i in inds: utils.log_comment( utils.LOGFILE, f"{log_prefix} {fbase} header {sf.extra_fits.SCI.header[i]}", verbose=True, ) if len(sf.wavelength) > 0: sf.wavelength[0] = (full_range[0],) sf.wavelength[-1] = (full_range[-1],) # Make flat table sft = astropy.table.Table(sf.flat_table) non_zero = sft["wavelength"][0] > 0 newtab = astropy.table.Table() newtab["slit_name"] = sft["slit_name"] nelem = len(sft[0]["wavelength"]) if nelem > 50000: nelem = 1024 head_ = sf.extra_fits.FAST_VARIATION.header for i in range(len(head_)): head_[i][1] = f"({nelem})" # wierd format # print(head_[i]) wave = np.logspace(*(np.log10(full_range)), nelem) med_FILL_VALUE = np.nanmax(sft["data"][0][non_zero]) msg = f"{log_prefix} {fbase} fill sflat with {FILL_VALUE} (med = {med_FILL_VALUE})" utils.log_comment(utils.LOGFILE, msg, verbose=True) newtab["nelem"] = nelem newtab["wavelength"] = [wave.astype(np.float32).tolist()] * len(sft) newtab["data"] = [ np.full(nelem, FILL_VALUE, dtype=np.float32).tolist() ] * len(sft) newtab["error"] = [ np.full(nelem, FILL_VALUE * 1.0e-5, dtype=np.float32).tolist() ] * len(sft) for c in ["wavelength", "data", "error"]: newtab[c] = newtab[c].astype(np.float32) sf.flat_table = pyfits.BinTableHDU(newtab).data return sf
[docs]def extend_dflat( dflat_filename, full_range=[0.55, 5.6], nelem=1024, log_prefix="extend_dflat", ): """ Extend NIRSpec sflat reference file Parameters ---------- dflat_filename : str Filename full_range : list Wavelength limits nelem : int Number of elements in the flattened "fast_variation" table log_prefix : str String for log messages Returns ------- df : ``jwst.datamodels.NirspecFlat`` D-Flat object """ fbase = os.path.basename(dflat_filename) df = jwst.datamodels.open(dflat_filename) dflat_waves = df.extra_fits.WAVELENGTH.data.wavelength.tolist() prism_min = full_range[0] prism_max = full_range[1] #################### # Maximum wavelength if dflat_waves[-1] < prism_max: msg = f"{log_prefix} {fbase} max wave = {dflat_waves[-1]:.2f} -> append {prism_max}" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) dflat_waves.append(prism_max) astropy.table.Table(df.extra_fits.WAVELENGTH.data) newtab = astropy.table.Table() newtab["wavelength"] = dflat_waves df.extra_fits.WAVELENGTH.data = pyfits.BinTableHDU(newtab).data df.extra_fits.SCI.header[0][1] += 1 df.extra_fits.SCI.header.append( [ f"PFLAT_{df.extra_fits.SCI.header[0][1]}", prism_max, "central wavelength monochromatic pflat (micron)", ] ) df.data = np.vstack([df.data, df.data[-1:, :, :] * 1]) #################### # Minimum wavelength dflat_waves = df.extra_fits.WAVELENGTH.data.wavelength.tolist() if dflat_waves[0] > prism_min: msg = f"{log_prefix} {fbase} min wave = {dflat_waves[0]:.2f} -> prepend {prism_min}" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) dflat_waves.insert(0, prism_min) astropy.table.Table(df.extra_fits.WAVELENGTH.data) newtab = astropy.table.Table() newtab["wavelength"] = dflat_waves df.extra_fits.WAVELENGTH.data = pyfits.BinTableHDU(newtab).data df.extra_fits.SCI.header[0][1] += 1 df.extra_fits.SCI.header.insert( 1, [ f"PFLAT_1", prism_min, "central wavelength monochromatic pflat (micron)", ], ) N = df.extra_fits.SCI.header[0][1] for i in range(N): df.extra_fits.SCI.header[i + 1][0] = f"PFLAT_{i+1}" df.data = np.vstack([df.data[:1, :, :] * 1, df.data]) ############################## # Flatten FAST_VARIATION table in_fast = astropy.table.Table(df.extra_fits.FAST_VARIATION.data) wgrid = np.logspace(*np.log10(full_range), nelem) resp = (wgrid > 0) * 1 fast_1d = astropy.table.Table() fast_1d["slit_name"] = in_fast["slit_name"] fast_1d["nelem"] = nelem fast_1d["wavelength"] = [wgrid.astype(in_fast["wavelength"].dtype)] * len( in_fast ) fast_1d["data"] = [resp.astype(in_fast["data"].dtype)] * len(in_fast) df.extra_fits.FAST_VARIATION.data = pyfits.BinTableHDU(fast_1d).data return df
[docs]def extend_photom(phot_filename, ranges=EXTENDED_RANGES): """ Extend wavelength ranges in photom reference file Parameters ---------- phot_filename : str Filename of a ``photom`` reference file ranges : dict: Wavelength ranges by grating, filter Returns ------- pref : `stdatamodels.jwst.datamodels.photom.NrsMosPhotomModel` Updated reference file object """ pref = jwst.datamodels.open(phot_filename) ptab = astropy.table.Table(pref.phot_table) for k in ranges: filt, grat = k.split("_") nfull = len(ptab[0]["wavelength"]) nelem = nfull - 4 wgrid = np.zeros(nfull, dtype=np.float32) wgrid[:nelem] = np.logspace(*np.log10(ranges[k]), nelem) resp = (wgrid > 0) * 1 rows = (ptab["grating"] == grat) & (ptab["filter"] == filt) ptab["nelem"][rows] = nelem ptab["wavelength"][rows] = wgrid ptab["relresponse"][rows] = resp pref.phot_table = pyfits.BinTableHDU(ptab).data return pref
def extend_pathloss( path_filename, full_range=[0.55, 5.6], ): """ Extend wavelength ranges in pathloss reference file. This function just resets the first pixel of the reference file to the minimum of the desired wavelength range. Parameters ---------- path_filename : str Filename of a ``pathloss`` reference file full_range : list Minimum and maximum wavelengths, microns. Assumes long wavelength side doesn't need to be extended. Returns ------- hdu : `astropy.io.fits.HDUList` Updated HDUlist of """ hdu = pyfits.open(path_filename) for ext in hdu: h = hdu[ext].header if "CRVAL1" not in h: continue if h["NAXIS"] == 3: crval = "CRVAL3" cdelt = "CDELT3" naxis = "NAXIS3" else: crval = "CRVAL1" cdelt = "CDELT1" naxis = "NAXIS1" to_meters = 1.0e-6 if h[crval] < 1.0e-5 else 1.0 h[crval] = full_range[0] * to_meters max_wavelength = h[crval] + h[naxis] * h[cdelt] if max_wavelength / to_meters < full_range[1]: h[cdelt] = (full_range[1] * to_meters - h[crval]) / h[naxis] return hdu def extend_barshadow( bar_filename, full_range=[0.55, 5.6], ): """ Extend wavelength ranges in barshadow reference file. This function just resets the first pixel of the reference file to the minimum of the desired wavelength range. It also updates the wavelength step if necessary to cover the full desired range. Parameters ---------- path_filename : str Filename of a ``barshadow`` reference file full_range : list Minimum and maximum wavelengths, microns. Assumes long wavelength side doesn't need to be extended. Returns ------- hdu : `astropy.io.fits.HDUList` Updated HDUlist of """ hdu = pyfits.open(bar_filename) msg = f"extend_barshadow: full_range = {full_range[0]:.2f} {full_range[1]:.2f}" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) for ext in hdu: h = hdu[ext].header if "CRVAL1" not in h: continue key = "CRVAL1" to_meters = 1.0e-6 if h[key] < 1.0e-5 else 1.0 msg = f"extend_barshadow: {os.path.basename(bar_filename)}[{h['EXTNAME']}] " msg += ( f"set minimum wave {h[key]/to_meters:.2f} -> {full_range[0]:.2f}" ) utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) h[key] = full_range[0] * to_meters wmax = h[key] + h["CDELT1"] * h["NAXIS1"] if wmax / to_meters < full_range[1]: old_cd1 = h["CDELT1"] new_cd1 = (full_range[1] * to_meters - h[key]) / h["NAXIS1"] msg = f"extend_barshadow: {os.path.basename(bar_filename)}[{h['EXTNAME']}] " msg += f"set cdelt1 {old_cd1/to_meters:.4f} -> {new_cd1/to_meters:.4f}" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) h["CDELT1"] = new_cd1 return hdu def assign_wcs_with_extended(dm, file=None, ranges=EXTENDED_RANGES, **kwargs): """ """ import jwst.assign_wcs.nirspec import jwst.assign_wcs.assign_wcs_step # Fork of load_wcs to ignore limits on IFU detectors from .fork.assign_wcs.assign_wcs import load_wcs as load_wcs_fork jwst.assign_wcs.load_wcs = load_wcs_fork jwst.assign_wcs.assign_wcs_step.load_wcs = load_wcs_fork from jwst.assign_wcs import AssignWcsStep ORIG_LOGFILE = utils.LOGFILE wstep = AssignWcsStep() ############## # AssignWCS with extended wavelength range wcs_reference_files = step_reference_files(wstep, dm) new_waverange = os.path.basename(wcs_reference_files["wavelengthrange"]) new_waverange = new_waverange.replace(".asdf", "_ext.asdf") msg = f"msaexp.pipeline_extended.assign_wcs_with_extended {new_waverange}" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) if not os.path.exists(new_waverange): extend_wavelengthrange( ref_file=os.path.basename(wcs_reference_files["wavelengthrange"]), ranges=ranges, ) # Use new reference wcs_reference_files["wavelengthrange"] = new_waverange # slit_y_range = [wstep.slit_y_low, wstep.slit_y_high] try: # with_wcs = load_wcs(dm, wcs_reference_files, slit_y_range) # with_wcs = wstep.call(dm, override_wavelengthrange=new_waverange) wstep.override_wavelengthrange = new_waverange with_wcs = wstep.run(dm) except NoDataOnDetectorError: msg = f"{file} No open slits found to work on" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) utils.LOGFILE = ORIG_LOGFILE return None return with_wcs
[docs]def run_pipeline( file, slit_index=0, all_slits=True, write_output=True, set_log=True, skip_existing_log=False, undo_flat=True, preprocess_kwargs={}, ranges=EXTENDED_RANGES, make_trace_figures=False, run_pathloss=True, run_barshadow=True, mask_zeroth_kwargs={}, **kwargs, ): """ Pipeline for extending reference files Parameters ---------- file : str Exposure (rate.fits) filename slit_index : int Index of a single slit to extract all_slits : bool Extract all slits of the final `jwst.datamodles.MultiSlit` model write_output : bool Write output extracted `jwst.datamodels.SlitModel` objects to separate files set_log : bool Set log file based on the input ``file`` skip_existing_log : bool Skip subsequent processing if the log file from ``set_log`` is found in the working directory. undo_flat : bool Main switch for using dummy monochromatic flat reference files. This needs to be True for the extracted spectra to not be masked with NaN beyond where the current reference files are defined preprocess_kwargs : dict Keyword arguments for `msaexp.pipeline.exposure_detector_effects` preprocessing. Skip if ``None`` ranges : dict Full extended wavelength ranges by FILTER / GRATING make_trace_figures : bool Make some diagnostic figures run_pathloss : bool Run PathLoss step for MSA exposures run_barshadow : bool Run BarShadow step for MSA exposures Returns ------- result : None, `jwst.datamodels.MultiSlitModel` None if an existing log was found, otherwise the final calibrated product. Also returns ``None`` if `jwst.assign_wcs.AssignWcsStep` raises a ``NoDataOnDetectorError`` exception. """ ORIG_LOGFILE = utils.LOGFILE if set_log: utils.LOGFILE = file.replace("_rate.fits", "_rate.wave_log.txt") if os.path.exists(utils.LOGFILE) & skip_existing_log: utils.LOGFILE = ORIG_LOGFILE print(f"log file {utils.LOGFILE} found, skip") return None msg = f""" ######## # extend_reference_files {time.ctime()} # {file} # slit_index={slit_index} all_slits={all_slits} write_output={write_output} # log to {utils.LOGFILE} """ utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) # Preprocessing if preprocess_kwargs is not None: # 1/f, bias & rnoise status = pipeline.exposure_detector_effects(file, **preprocess_kwargs) with jwst.datamodels.open(file) as dm: EXPOSURE_TYPE = dm.meta.exposure.type if EXPOSURE_TYPE == "NRS_IFU": make_trace_figures = False run_pathloss = False run_barshadow = False mask_zeroth_kwargs = None # write_output = False # Mask zeroth orders if mask_zeroth_kwargs is not None: with pyfits.open(file, mode="update") as hdul: with jwst.datamodels.open(hdul) as input_model: dq, slits_, bounding_boxes = zeroth_order_mask( input_model, **mask_zeroth_kwargs ) if dq.max() > 0: if dq.size != hdul["DQ"].data.size: hdul["DQ"].data |= dq else: subarray_ = input_model.meta.subarray slx_ = slice( subarray_.xstart - 1, subarray_.xstart - 1 + subarray_.xsize, ) sly_ = slice( subarray_.ystart - 1, subarray_.ystart - 1 + subarray_.ysize, ) hdul["DQ"].data |= dq[sly_, slx_] hdul.flush() wstep = AssignWcsStep() msg = f"{file} AssignWCS" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) ############## # AssignWCS with extended wavelength range with jwst.datamodels.open(file) as dm: wcs_reference_files = step_reference_files(wstep, dm) new_waverange = os.path.basename( wcs_reference_files["wavelengthrange"] ) new_waverange = new_waverange.replace(".asdf", "_ext.asdf") print(f"extend_wavelengthrange: {new_waverange}") if not os.path.exists(new_waverange): extend_wavelengthrange( ref_file=os.path.basename( wcs_reference_files["wavelengthrange"] ), ranges=ranges, ) # Use new reference wcs_reference_files["wavelengthrange"] = new_waverange # slit_y_range = [wstep.slit_y_low, wstep.slit_y_high] try: # with_wcs = load_wcs(dm, wcs_reference_files, slit_y_range) wstep.override_wavelengthrange = new_waverange with_wcs = wstep.run(dm) except NoDataOnDetectorError: msg = f"{file} No open slits found to work on" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) utils.LOGFILE = ORIG_LOGFILE return None if EXPOSURE_TYPE in ["NRS_MSASPEC", "NRS_IFU"]: ############ # MSAFlagOpen msg = f"{file} MSAFlagOpen" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) flag_open = MSAFlagOpenStep().run(with_wcs) else: flag_open = with_wcs ############ # Extract2D if EXPOSURE_TYPE == "NRS_IFU": _slit = ext2d = flag_open _slit.name = "ifu" all_slits = False else: msg = f"{file} Extract2D" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) step2d = Extract2dStep() ext2d = step2d.run(flag_open) _slit_index = None if EXPOSURE_TYPE == "NRS_FIXEDSLIT": for ind, slit in enumerate(ext2d.slits): if f"NRS_{slit.name}_SLIT".upper() == ext2d.meta.aperture.name: _slit_index = ind break if _slit_index is not None: msg = f"{file} use slit_index = {_slit_index} for {slit.name}" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) slit_index = _slit_index all_slits = False _slit = ext2d[slit_index] if _slit.meta.target.catalog_name is not None: targ_ = _slit.meta.target.catalog_name.replace(" ", "-").replace( "_", "-" ) targ_ = targ_.replace("(", "").replace(")", "").replace("/", "") else: targ_ = "cat" inst_key = ( f"{_slit.meta.instrument.filter}_{_slit.meta.instrument.grating}" ) if EXPOSURE_TYPE in ["NRS_FIXEDSLIT", "NRS_IFU"]: slit_prefix_ = ( f"{file.split('_rate')[0]}_{targ_}_{inst_key}_{_slit.name}".lower() ) else: if undo_flat: plabel = "raw" else: plabel = "phot" slit_prefix_ = f"{file.split('_rate')[0]}_{inst_key}_{plabel}.{_slit.name}.{_slit.source_name}".lower() det = _slit.meta.instrument.detector msg = f"{file} {inst_key} {det}" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) ############### # Trace figures if make_trace_figures: xtr, ytr, wtr, rs, ds = msautils.slit_trace_center( _slit, with_source_xpos=False, with_source_ypos=False ) fig, ax = plt.subplots(1, 1, figsize=(12, 5)) ax.imshow(_slit.data, aspect="auto", vmin=-0.1, vmax=2, cmap="magma_r") ax.plot(xtr, ytr, color="magenta") ax.set_title(f"{inst_key} {det}") fig.tight_layout(pad=1) fig.savefig(f"{slit_prefix_}_trace.png".lower()) ############ # Flat-field msg = f"{file} FlatFieldStep" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) flat_step = FlatFieldStep() flat_reference_files = step_reference_files(flat_step, ext2d) if inst_key in ranges: full_range = ranges[inst_key] else: full_range = None undo_flat = False if undo_flat: ######## # F-Flat new_fflat_filename = os.path.basename( flat_reference_files["fflat"] ).replace(".fits", "_ext.fits") msg = f"{file} fflat = '{new_fflat_filename}'" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) if not os.path.exists(new_fflat_filename): if EXPOSURE_TYPE == "NRS_MSASPEC": ff = extend_quad_fflat( flat_reference_files["fflat"], full_range=full_range, log_prefix=file, ) else: ff = extend_fs_fflat( flat_reference_files["fflat"], full_range=full_range, log_prefix=file, ) ff.write(new_fflat_filename, overwrite=True) flat_reference_files["fflat"] = new_fflat_filename ######## # S-Flat new_sflat_filename = os.path.basename( flat_reference_files["sflat"] ).replace(".fits", "_ext.fits") msg = f"{file} sflat = '{new_sflat_filename}'" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) if not os.path.exists(new_sflat_filename): sf = extend_sflat( flat_reference_files["sflat"], full_range=full_range, log_prefix=file, ) sf.write(new_sflat_filename, overwrite=True) flat_reference_files["sflat"] = new_sflat_filename ######## # D-FLAT new_dflat_filename = os.path.basename( flat_reference_files["dflat"] ).replace(".fits", "_ext.fits") msg = f"{file} dflat = '{new_dflat_filename}'" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) if not os.path.exists(new_dflat_filename): wmin, wmax = 10, 0 for k in ranges: wmin = np.minimum(ranges[k][0], wmin) wmax = np.maximum(ranges[k][1], wmax) df = extend_dflat( flat_reference_files["dflat"], full_range=[wmin, wmax], log_prefix=file, ) df.write(new_dflat_filename, overwrite=True) flat_reference_files["dflat"] = new_dflat_filename ######################### # Metadata for fixed slit if EXPOSURE_TYPE == "NRS_FIXEDSLIT": for _slit in ext2d.slits: msautils.update_slit_metadata(_slit) # Run the pipeline step with the updated references (or the originals) # flat_corr = flat_step.call( # ext2d, # override_fflat=flat_reference_files["fflat"], # override_sflat=flat_reference_files["sflat"], # override_dflat=flat_reference_files["dflat"], # ) flat_step.override_fflat = flat_reference_files["fflat"] flat_step.override_sflat = flat_reference_files["sflat"] flat_step.override_dflat = flat_reference_files["dflat"] flat_corr = flat_step.run(ext2d) if run_pathloss: ########## # PathLoss msg = f"{file} NRS_MSASPEC run PathLossStep" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) path_step = PathLossStep() path_filename = path_step.get_reference_file(flat_corr, "pathloss") if full_range is not None: new_path_filename = os.path.basename(path_filename).replace( ".fits", "_ext.fits" ) if not os.path.exists(new_path_filename): path_hdul = extend_pathloss( path_filename, full_range=full_range ) path_hdul.writeto(new_path_filename, overwrite=True) path_hdul.close() # path_result = path_step.call( # flat_corr, override_pathloss=new_path_filename # ) path_step.override_pathloss = new_path_filename path_result = path_step.run(flat_corr) else: path_result = path_step.run(flat_corr) else: path_result = flat_corr last_result = path_result if EXPOSURE_TYPE == "NRS_MSASPEC": if run_barshadow: ########### # BarShadow msg = f"{file} NRS_MSASPEC run BarShadowStep" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) bar_step = BarShadowStep() bar_filename = bar_step.get_reference_file( path_result, "barshadow" ) if full_range is not None: new_bar_filename = os.path.basename(bar_filename).replace( ".fits", "_ext.fits" ) if not os.path.exists(new_bar_filename): bar_hdul = extend_barshadow( bar_filename, full_range=full_range ) bar_hdul.writeto(new_bar_filename, overwrite=True) bar_hdul.close() # bar_result = bar_step.call( # path_result, override_barshadow=new_bar_filename # ) bar_step.override_barshadow = new_bar_filename bar_result = bar_step.run(path_result) else: bar_result = bar_step.run(path_result) last_result = bar_result ######## # Photom phot_step = PhotomStep() phot_filename = phot_step.get_reference_file(last_result, "photom") area_filename = phot_step.get_reference_file(last_result, "area") phot_step = PhotomStep() if full_range is not None: new_phot_filename = os.path.basename(phot_filename).replace( ".fits", "_ext.fits" ) msg = f"{file} photom = '{new_phot_filename}'" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) if not os.path.exists(new_phot_filename): pref = extend_photom(phot_filename, ranges=ranges) pref.write(new_phot_filename, overwrite=True) # Apply photom phot_step.override_photom = new_phot_filename result = phot_step.run(last_result) else: result = phot_step.run(last_result) ######## # Figure if make_trace_figures: fig, ax = plt.subplots(1, 1, figsize=(12, 5)) ax.imshow( result.slits[slit_index].data, aspect="auto", vmin=-0.1 * 20, vmax=2 * 20, cmap="magma_r", ) ax.plot(xtr, ytr, color="magenta") ax.set_title(f"{inst_key} {det}") fig.tight_layout(pad=1) fig.savefig(f"{slit_prefix_}_final.png".lower()) # result.write(os.path.basename(file).replace('_rate.fits', '_photom.fits')) ######## # Write calibrated slitlet files if write_output & (EXPOSURE_TYPE == "NRS_IFU"): cal_file = file.replace("_rate.fits", "_cal.fits") msg = f"{file} write calibrated IFU exposure cal_file" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) result.write(cal_file, overwrite=True) elif write_output: if all_slits: slit_list = result.slits else: slit_list = result.slits[slit_index : slit_index + 1] for _slit in slit_list: if _slit.meta.exposure.type == "NRS_FIXEDSLIT": slit_prefix_ = f"{file.split('_rate')[0]}_{targ_}_{inst_key}_{_slit.name}".lower() else: if undo_flat: plabel = "raw" else: plabel = "phot" try: source_alias = slit_best_source_alias( _slit, require_primary=False, which="min", verbose=False, ) except: source_alias = _slit.source_name slit_prefix_ = f"{file.split('_rate')[0]}_{inst_key}_{plabel}.{_slit.name}.{source_alias}".lower() slit_file = slit_prefix_ + ".fits" msg = f"{file} write slitlet {slit_file}" utils.log_comment(utils.LOGFILE, msg, verbose=VERBOSITY) slit_model = jwst.datamodels.SlitModel(_slit.instance) slit_model.write(slit_file, overwrite=True) with pyfits.open(slit_file, mode="update") as im: im[0].header["NOFLAT"] = ( True, "Dummy flat field with extended wavelength references", ) im[0].header["R_FFLAT"] = flat_reference_files["fflat"] im[0].header["R_SFLAT"] = flat_reference_files["sflat"] im[0].header["R_DFLAT"] = flat_reference_files["dflat"] im[0].header["R_PHOTOM"] = new_phot_filename im.flush() utils.LOGFILE = ORIG_LOGFILE return result