Source code for mirar.pipelines.summer.load_summer_image

"""
Module with functions to load raw and processed summer images
"""
import logging
import warnings
from pathlib import Path

import astropy
import numpy as np
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.time import Time
from astropy.utils.exceptions import AstropyWarning

from mirar.data import Image
from mirar.io import open_fits, open_raw_image
from mirar.paths import (
    BASE_NAME_KEY,
    GAIN_KEY,
    LATEST_SAVE_KEY,
    OBSCLASS_KEY,
    PROC_FAIL_KEY,
    PROC_HISTORY_KEY,
    RAW_IMG_KEY,
    TARGET_KEY,
    __version__,
)
from mirar.pipelines.summer.models import DEFAULT_FIELD, SUMMER_NIGHT_FORMAT

logger = logging.getLogger(__name__)


# Integer image-type ID (ITID) for each upper-cased observation class.
_SUMMER_ITID_BY_OBSCLASS = {
    "SCIENCE": 1,
    "BIAS": 2,
    "FLAT": 2,
    "DARK": 2,
    "FOCUS": 3,
    "POINTING": 4,
    "OTHER": 5,
}

# ITID used for any observation class that is missing from the table above.
_SUMMER_DEFAULT_ITID = 5

# Sun/moon keywords that are filled with 0 when absent or blank.
_SUMMER_SUN_MOON_KEYS = (
    "MOONRA",
    "MOONDEC",
    "MOONILLF",
    "MOONPHAS",
    "MOONALT",
    "SUNAZ",
    "SUNALT",
)


def load_raw_summer_fits(
    path: str | Path,
) -> tuple[np.ndarray, astropy.io.fits.Header]:
    """
    Function to load a raw summer image and add/modify the required headers

    The file is read with ``open_fits``. The header is then updated in place:
    observation class and target name, time keywords, sky coordinates in
    degrees, exposure/processing IDs, program information, sun/moon keywords,
    image-type ID and a default gain. The pixel data is multiplied by 1.0.

    Args:
        path: Path to the raw image

    Returns: [image data, image header]

    Raises:
        KeyError: if a keyword that is read unconditionally (e.g. ``OBSTYPE``,
            ``OBSMJD``, ``UTCISO``, ``RA``, ``DEC``) is missing from the header
            (assuming the usual astropy ``Header`` lookup behaviour).
    """
    path = Path(path)

    data, header = open_fits(path)

    with warnings.catch_warnings():
        # Astropy warnings raised while editing the header are silenced
        warnings.simplefilter("ignore", AstropyWarning)

        header[OBSCLASS_KEY] = header["OBSTYPE"].lower()
        obsclass = header[OBSCLASS_KEY]

        header["UTCTIME"] = str(header["UTCSHUT"]).replace(" ", "T")
        header["MJD-OBS"] = header["OBSMJD"]
        header["DATE-OBS"] = Time(header["OBSMJD"], format="mjd").isot

        # If the image is a calibration image, set the target name to the OBSTYPE.
        # If it is a science image, it is either a field observation, or a ToO with
        # a target name. If it is a field observation, set the target name to the
        # field ID. If it is a ToO, set the target name to the TARGNAME.
        if obsclass == "science":
            target_name = header["TARGNAME"] if "TARGNAME" in header else obsclass
            if target_name == "":
                target_name = f"field_{header['FIELDID']}"
        else:
            target_name = obsclass

        try:
            header[TARGET_KEY] = target_name.lower()
        except (ValueError, AttributeError):
            # Fall back to the raw value if it cannot be lower-cased/stored
            header[TARGET_KEY] = target_name

        # Store the pointing in decimal degrees and mirror it into CRVAL1/2
        crd = SkyCoord(ra=header["RA"], dec=header["DEC"], unit=(u.deg, u.deg))
        header["RA"] = crd.ra.deg
        header["DEC"] = crd.dec.deg
        header["CRVAL1"] = header["RA"]
        header["CRVAL2"] = header["DEC"]

        tel_crd = SkyCoord(
            ra=header["TELRA"],
            dec=header["TELDEC"],
            unit=(u.deg, u.deg),
        )
        header["TELRA"] = tel_crd.ra.deg
        header["TELDEC"] = tel_crd.dec.deg
        header["BZERO"] = 0

        header[LATEST_SAVE_KEY] = path.as_posix()
        header[RAW_IMG_KEY] = path.as_posix()

        data = data * 1.0  # pylint: disable=no-member

        if "other" in header["FILTERID"]:
            header["FILTERID"] = "r"

        header[PROC_HISTORY_KEY] = ""
        header[BASE_NAME_KEY] = path.name

        # Each dot-separated version component is zero-padded to two characters
        pipeline_version_padded_str = "".join(
            part.rjust(2, "0") for part in __version__.split(".")
        )

        obstime = Time(header["UTCISO"], format="iso")
        # Whole seconds elapsed since MJD 59000
        header["EXPID"] = int((obstime.mjd - 59000.0) * 86400.0)
        # PROCID is the EXPID digits followed by the padded pipeline version
        header["PROCID"] = int(str(header["EXPID"]) + pipeline_version_padded_str)

        header["OBSDATE"] = int(header["UTC"].split("_")[0])
        header["TIMEUTC"] = header["UTCISO"]
        header["NIGHTDATE"] = obstime.to_datetime().strftime(SUMMER_NIGHT_FORMAT)
        header["EXPMJD"] = header["OBSMJD"]
        header["OBS-DATE"] = Time(header["OBSMJD"], format="mjd").isot

        # Missing or non-integer program/observation IDs fall back to 0
        default_id = 0
        for key in ("PROGID", "OBSID"):
            try:
                header[key] = int(header[key])
            except (KeyError, ValueError):
                header[key] = default_id

        if "SUBPROG" not in header:
            header["SUBPROG"] = "none"

        header["FILTER"] = header["FILTERID"]
        header["FID"] = header["FILPOS"]

        # Convert shutter timestamps to JD; unparseable values are left untouched
        for key in ("SHUTOPEN", "SHUTCLSD"):
            try:
                header[key] = Time(header[key], format="iso").jd
            except ValueError:
                logger.debug(
                    "Error parsing '%s' of %s: (%s)", key, path, header[key]
                )

        header["PROCFLAG"] = 0
        header[PROC_FAIL_KEY] = ""

        # Absent or blank sun/moon keywords are set to 0
        for key in _SUMMER_SUN_MOON_KEYS:
            value = header.get(key, "")
            header[key] = 0 if value == "" else value

        # The observation class was lower-cased above, while the ITID table is
        # keyed by upper-case names, so the lookup must be done in upper case.
        header["ITID"] = _SUMMER_ITID_BY_OBSCLASS.get(
            obsclass.upper(), _SUMMER_DEFAULT_ITID
        )

        # Only science images (ITID 1) with a real field ID keep their FIELDID
        if header["FIELDID"] == "radec" or header["ITID"] != 1:
            header["FIELDID"] = DEFAULT_FIELD

        if "COADDS" not in header:
            header["COADDS"] = 1

        if "PROGPI" not in header:
            header["PROGPI"] = "?"

        # NOTE(review): PROGID has already been coerced to an int by the loop
        # above, so neither the membership test nor the ValueError fallback
        # below can currently trigger. Kept unchanged pending a decision on the
        # intended PROGID/PROGPI handling.
        if header["PROGID"] in ["", "WINTER"]:
            header["PROGID"] = 2

        try:
            header["PROGID"] = int(header["PROGID"])
        except ValueError:
            try:
                progpi = header["PROGID"]
                header["PROGID"] = int(header["PROGPI"])
                header["PROGPI"] = progpi
            except KeyError:
                header["PROGID"] = 1

        # TODO Figure out how if database query is required for this.
        header["PUID"] = header["PROGID"]

        # TODO Write a processor to calculate the split QID
        header["QID"] = 1

        header["PROCSTATUS"] = 0

        # TODO Figure out what to do about primary keys
        header["RAWID"] = header["EXPID"]  # + subdetid

        if GAIN_KEY not in header:
            header[GAIN_KEY] = 1.0

    return data, header  # pylint: disable=no-member
def load_raw_summer_image(path: str | Path) -> Image:
    """
    Load a raw summer image as an Image object.

    The work is delegated to ``open_raw_image``, which is given
    ``load_raw_summer_fits`` as the function used to read the file.

    :param path: Path to the raw image
    :return: Image object
    """
    summer_image = open_raw_image(path, load_raw_summer_fits)
    return summer_image
def load_proc_summer_image(path: str) -> Image:
    """
    Load a processed summer image and add/modify the required headers

    The header gains ZP/ZP_std (copied from the ZP_AUTO values when ZP is
    absent), CENTRA/CENTDEC (copied from CRVAL1/CRVAL2), the target key (when
    a TARGET keyword exists), DIFFID, PROCID (only when absent) and TIMEUTC.
    Pixels equal to zero are replaced with NaN.

    Args:
        path: Path to the processed image

    Returns: Image object built from the modified data and header

    """
    # Each dot-separated version component is zero-padded to two characters
    version_digits = "".join(
        [component.rjust(2, "0") for component in __version__.split(".")]
    )

    pixels, hdr = open_fits(path)

    if "ZP" not in hdr.keys():
        hdr["ZP"] = hdr["ZP_AUTO"]
        hdr["ZP_std"] = hdr["ZP_AUTO_std"]

    hdr["CENTRA"] = hdr["CRVAL1"]
    hdr["CENTDEC"] = hdr["CRVAL2"]

    if "TARGET" in hdr.keys():
        hdr[TARGET_KEY] = hdr["TARGET"]

    # DIFFID is the EXPID digits followed by the padded pipeline version
    hdr["DIFFID"] = int(str(hdr["EXPID"]) + str(version_digits))

    if "PROCID" not in hdr.keys():
        hdr["PROCID"] = hdr["DIFFID"]

    hdr["TIMEUTC"] = hdr["UTCISO"]

    # Zero-valued pixels are replaced with NaN
    zero_mask = pixels == 0
    pixels[zero_mask] = np.nan

    return Image(pixels, hdr)