Source code for mirar.pipelines.sedmv2.load_sedmv2_image

"""
Module for loading raw SEDMv2 images and ensuring they have the correct format
"""
import logging
import os
from pathlib import Path

import numpy as np
from astropy.io import fits
from astropy.time import Time

from mirar.data import Image
from mirar.io import open_mef_fits, open_mef_image
from mirar.paths import (
    BASE_NAME_KEY,
    EXPTIME_KEY,
    GAIN_KEY,
    OBSCLASS_KEY,
    PROC_FAIL_KEY,
    PROC_HISTORY_KEY,
    RAW_IMG_KEY,
    TARGET_KEY,
    TIME_KEY,
    __version__,
)
from mirar.processors.skyportal.skyportal_source import SNCOSMO_KEY
from mirar.processors.utils.image_loader import InvalidImage

logger = logging.getLogger(__name__)


[docs] def clean_science_header( # pylint: disable=too-many-branches header: fits.Header, split_headers: list[fits.Header], is_mode0: bool ) -> tuple[fits.Header, list[fits.Header]]: """ function to modify the primary header of an SEDMv2 science file :param header: original primary header of science file :param split_headers: the remaining headers, one for each extension of MEF :param is_mode0: True if observed in SEDMv2 observation mode 0 :return: modified primary header """ if is_mode0: # main information is not stored in primary header for mode0 informative_hdr = split_headers[0] else: informative_hdr = header header[OBSCLASS_KEY] = "science" header[TARGET_KEY] = "science" # positions header["RA"] = informative_hdr["RAD"] header["DEC"] = informative_hdr["DECD"] header["TELRA"] = informative_hdr["TELRAD"] header["TELDEC"] = informative_hdr["TELDECD"] # gain if GAIN_KEY in informative_hdr: if header[GAIN_KEY] == 0.0: header[GAIN_KEY] = 1.0 else: header[GAIN_KEY] = 1.0 for ext in split_headers: if GAIN_KEY in ext: if ext[GAIN_KEY] == 0.0: ext[GAIN_KEY] = 1.0 else: ext[GAIN_KEY] = 1.0 # time header["MJD-OBS"] = date_obs_to_mjd(informative_hdr["DATE-OBS"]) header["MJD"] = header["MJD-OBS"] header[TIME_KEY] = Time(header["MJD-OBS"], format="mjd").isot # filters header["FILTERID"] = informative_hdr["FILTER"].split(" ")[1][0] header["FILTER"] = header["FILTERID"] header[SNCOSMO_KEY] = "sdss" + header["FILTER"] if is_mode0: informative_hdr[TIME_KEY] = Time(header["MJD-OBS"], format="mjd").isot informative_hdr["FILTER"] = header["FILTER"] informative_hdr["FILTERID"] = header["FILTERID"] # keys, IDs ("core fields") header[PROC_HISTORY_KEY] = "" header["PROCFLAG"] = 0 header[PROC_FAIL_KEY] = "" header["PROGID"] = int(3) # sedmv2's ID header["COADDS"] = 1 if not isinstance(informative_hdr["OBJECTID"], str): if isinstance(informative_hdr["QCOMMENT"], str): header["OBJECTID"] = informative_hdr["QCOMMENT"].split("_")[0] if not EXPTIME_KEY in informative_hdr: header[EXPTIME_KEY] = informative_hdr[ "EXPOSURE" ] # be weary - exposure is not always reliable return header, split_headers
[docs] def clean_cal_header( hdr0: fits.Header, hdr1: fits.Header, filepath ) -> tuple[fits.Header, list[fits.Header]]: """ function to modify the primary header of an SEDMv2 calibration file (flat or bias) :param hdr0: original primary header of calibration file :param hdr1: original secondary header of calibration file :return: modified headers """ hdr0[OBSCLASS_KEY] = hdr1["IMGTYPE"].lower() hdr0["IMGTYPE"] = hdr1["IMGTYPE"] hdr0[TARGET_KEY] = hdr0[OBSCLASS_KEY] # flat/bias-specific keys if hdr0["IMGTYPE"] == "flat": filt = filepath.split("flat_s")[1][0] # sedm-specific file name structure hdr0["FILTERID"] = filt hdr0["FILTER"] = filt # f"SDSS {filt}'" #f"SDSS {filt}' (Chroma)" if hdr0["IMGTYPE"] == "bias": hdr0["FILTER"] = "g" # arbitrary filter for bias hdr0["FILTERID"] = "g" # arbitrary filter for bias hdr0["SOURCE"] = "None" hdr0["COADDS"] = 1 hdr0["CALSTEPS"] = "" hdr0["PROCFAIL"] = 1 hdr0[GAIN_KEY] = 1.0 req_headers = [ "RA", "DEC", "TELRA", "TELDEC", "RAD", "DECD", "TELRAD", "TELDECD", "UTC", "DATE", ] default_vals = [ "+00:00:00", "+00:00:00", "+00:00:00", "+00:00:00", 0.0, 0.0, 0.0, 0.0, "20221223_023123.072011", "2022-12-23T02:31:23.073", ] # can these be changed? shortened? if EXPTIME_KEY not in hdr1: hdr1[EXPTIME_KEY] = hdr1["EXPOSURE"] for count, key in enumerate(req_headers): hdr0[key] = default_vals[count] return hdr0, [hdr1]
[docs] def load_raw_sedmv2_mef( path: str | Path, ) -> tuple[fits.Header, list[np.array], list[fits.Header]]: """ Load mef image """ sedmv2_ignore_files = [ "dark", "sedm2", "speccal", ] if any(ext in Path(path).name for ext in sedmv2_ignore_files): logger.debug(f"Skipping unneeded SEDMv2 file {path}.") raise InvalidImage header, split_data, split_headers = open_mef_fits(path) if "IMGTYPE" in header.keys(): # all modes except mode0 check_header = header skip_first = True is_mode0 = False else: # mode0 check_header = split_headers[0] skip_first = False # there is only 1 extension in mode0 is_mode0 = True if check_header["IMGTYPE"] == "object": if skip_first: split_data = split_data[1:] split_headers = split_headers[1:] header, split_headers = clean_science_header(header, split_headers, is_mode0) elif check_header["IMGTYPE"] in ["flat", "bias"]: header, split_headers = clean_cal_header(header, split_headers[0], path) else: logger.debug("Unexpected IMGTYPE. Is this a dark?") header[BASE_NAME_KEY] = os.path.basename(path) header[RAW_IMG_KEY] = path header["EXPID"] = int("".join(header[BASE_NAME_KEY].split("_")[1:3])[2:]) pipeline_version = __version__ pipeline_version_padded_str = "".join( [x.rjust(2, "0") for x in pipeline_version.split(".")] ) header["PROCID"] = int(str(header["EXPID"]) + str(pipeline_version_padded_str)) return header, split_data, split_headers
[docs] def load_sedmv2_mef_image( path: str | Path, ) -> list[Image]: """ Function to load sedmv2 mef images :param path: Path to image :return: list of images """ return open_mef_image(path, load_raw_sedmv2_mef)
[docs] def date_obs_to_mjd(t_raw: str) -> str: """ function to convert DATE-OBS from raw SEDMv2 headers into MJD :param t_raw: date from SEDMv2 header :return: time in MJD example: 20230609_102119.377549 -> 60104.43147427719 """ ymd, hms = t_raw.split("_") hour, mins, sec = hms[:2], hms[2:4], hms[4:] date = ymd[:4] + "-" + ymd[4:6] + "-" + ymd[6:] time = hour + ":" + mins + ":" + sec t_mjd = Time(date + " " + time, format="iso").mjd return t_mjd