Source code for mirar.processors.astrometry.autoastrometry.io

"""
Module component of autoastrometry, handling I/O
"""
import logging
import os
from pathlib import Path
from typing import Optional

import ephem
import numpy as np
from astropy.io import fits

from mirar.processors.astrometry.autoastrometry.sources import (
    BaseSource,
    SextractorSource,
)
from mirar.processors.astrometry.autoastrometry.utils import dec_str_2_deg, ra_str_2_deg

logger = logging.getLogger(__name__)


[docs] def parse_header( file_path: str | Path, temp_path: str | Path, pixel_scale: Optional[float] = None, pa: Optional[float] = None, inv: bool = False, user_ra_deg: Optional[float] = None, user_dec_deg: Optional[float] = None, ) -> tuple[int, int, float, float, float, float, float, float, float, float]: """ Extract relevant info for astrometry from header of file :param file_path: File to parse :param temp_path: Temp path to write updated version to :param pixel_scale: Pixel scale :param pa: PA :param inv: :param user_ra_deg: :param user_dec_deg: :return: nxpix, nypix, cd11, cd12, cd21, cd22, crpix1, crpix2, cra, cdec """ sci_ext = 0 # Get some basic info from the header with fits.open(file_path) as hdu: hdu.verify("silentfix") header = hdu[sci_ext].header if np.logical_and(pixel_scale is not None, pa is None): pa = 0 # Check for old-style WCS header if pixel_scale is None: old_wcs_type = False for hkey in header.keys(): if hkey in ["CDELT1", "CDELT2"]: old_wcs_type = True if old_wcs_type: key = "CDELT1" cdelt1 = header[key] key = "CDELT2" cdelt2 = header[key] try: c_rot = 0 key = "CROTA1" c_rot = header[key] key = "CROTA2" c_rot = header[key] except KeyError: pass if ( np.sqrt(cdelt1**2 + cdelt2**2) < 0.1 ): # some images use CDELT to indicate nonstandard things header["CD1_1"] = cdelt1 * np.cos(c_rot * np.pi / 180.0) header["CD1_2"] = -cdelt2 * np.sin(c_rot * np.pi / 180.0) header["CD2_1"] = cdelt1 * np.sin(c_rot * np.pi / 180.0) header["CD2_2"] = cdelt2 * np.cos(c_rot * np.pi / 180.0) if np.logical_and(pixel_scale is not None, pa is not None): # Create WCS header information if pixel scale is specified pa_rad = pa * np.pi / 180.0 px_scale_deg = pixel_scale / 3600.0 if inv > 0: parity = -1 else: parity = 1 if user_ra_deg is not None: ra = user_ra_deg else: ra = ra_str_2_deg(header["CRVAL1"]) if user_dec_deg is not None: dec = user_dec_deg else: dec = dec_str_2_deg(header["CRVAL2"]) try: epoch = float(header.get("EPOCH", 2000)) except KeyError: logger.warning("No EPOCH found in header. Assuming 2000") epoch = 2000.0 try: equinox = float( header.get("EQUINOX", epoch) ) # If RA and DEC are not J2000 then convert except KeyError: logger.warning("No EQUINOX found in header. Assuming 2000") equinox = 2000.0 # could be 'J2000'; try to strip off first character? if abs(equinox - 2000) > 0.5: logger.debug(f"Converting equinox from {equinox} to J2000") j2000 = ephem.Equatorial( ephem.Equatorial(str(ra / 15), str(dec), epoch=str(equinox)), epoch=ephem.J2000, ) [ra, dec] = [ra_str_2_deg(j2000.ra), dec_str_2_deg(j2000.dec)] header["CD1_1"] = px_scale_deg * np.cos(pa_rad) * parity header["CD1_2"] = px_scale_deg * np.sin(pa_rad) header["CD2_1"] = -px_scale_deg * np.sin(pa_rad) * parity header["CD2_2"] = px_scale_deg * np.cos(pa_rad) header["CRPIX1"] = header["NAXIS1"] / 2 header["CRPIX2"] = header["NAXIS2"] / 2 header["CRVAL1"] = ra header["CRVAL2"] = dec header["CTYPE1"] = "RA---TAN" header["CTYPE2"] = "DEC--TAN" header["EQUINOX"] = 2000.0 hdu[sci_ext].header = header hdu.writeto( temp_path, output_verify="silentfix", overwrite=True ) # ,clobber=True # Read the header info from the file. try: # no longer drawing RA and DEC from here. key = "NAXIS1" nxpix = header[key] key = "NAXIS2" nypix = header[key] except KeyError: err = f"Cannot find necessary WCS header keyword {key}" logger.debug(err) raise try: key = "CRVAL1" cra = float(header[key]) key = "CRVAL2" cdec = float(header[key]) key = "CRPIX1" crpix1 = float(header[key]) key = "CRPIX2" crpix2 = float(header[key]) key = "CD1_1" cd11 = float(header[key]) key = "CD2_2" cd22 = float(header[key]) key = "CD1_2" cd12 = float(header[key]) # deg / pix key = "CD2_1" cd21 = float(header[key]) equinox = float(header.get("EQUINOX", 2000.0)) if abs(equinox - 2000.0) > 0.2: logger.debug("Warning: EQUINOX is not 2000.0") except KeyError: if pixel_scale == -1: err = ( f"Cannot find necessary WCS header keyword '{key}' \n " f"Must specify pixel scale (-px VAL) or " f"provide provisional basic WCS info via CD matrix." ) logger.error(err) raise # Some images might use CROT parameters, could try to be compatible with # this too...? # Wipe nonstandard hdu info from the header (otherwise this will confuse # verification) header_keys = list(header.keys()) ctype_change = 0 iraf_keys = [] high_keys = [] old_keys = [] distortion_keys = [] for hkey in header_keys: if ( hkey == "RADECSYS" or hkey == "WCSDIM" or hkey.find("WAT") == 0 or hkey.find("LTV") >= 0 or hkey.find("LTM") == 0 ): del header[hkey] iraf_keys.append(hkey) if ( hkey.find("CO1_") == 0 or hkey.find("CO2_") == 0 or hkey.find("PV1_") == 0 or hkey.find("PV2_") == 0 or hkey.find("PC00") == 0 ): del header[hkey] high_keys.append(hkey) if ( hkey.find("CDELT1") == 0 or hkey.find("CDELT2") == 0 or hkey.find("CROTA1") == 0 or hkey.find("CROTA2") == 0 ): del header[hkey] old_keys.append(hkey) if ( hkey.find("A_") == 0 or hkey.find("B_") == 0 or hkey.find("AP_") == 0 or hkey.find("BP_") == 0 ): del header[hkey] distortion_keys.append(hkey) if header["CTYPE1"] != "RA---TAN": logger.debug(f"Changing CTYPE1 from '{header['CTYPE1']}' to 'RA---TAN'") header["CTYPE1"] = "RA---TAN" ctype_change = 1 if header["CTYPE2"] != "DEC--TAN": if ctype_change: logger.debug(f"Changing CTYPE2 from '{header['CTYPE2']}' to 'DEC--TAN'") header["CTYPE2"] = "DEC--TAN" ctype_change = 1 wcs_key_check = [ "CRVAL1", "CRVAL2", "CRPIX1", "CRPIX2", "CD1_1", "CD1_2", "CD2_2", "CD2_1", "EQUINOX", "EPOCH", ] header_format_change = False for w_key in wcs_key_check: if isinstance(w_key, str): try: header[w_key] = float(header[w_key]) header_format_change = True except KeyError: pass if len(iraf_keys) > 0: logger.warning(f"Removed nonstandard WCS keywords: {iraf_keys}") if len(high_keys) > 0: logger.warning(f"Removed higher-order WCS keywords: {high_keys}") if len(old_keys) > 0: logger.warning(f"Removed old-style WCS keywords: {old_keys}") if len(distortion_keys) > 0: logger.warning(f"Removed distortion WCS keywords: {distortion_keys}") if ( len(high_keys) + len(distortion_keys) + ctype_change + header_format_change > 0 ): # Rewrite and reload the image if the header # was modified in a significant way so # sextractor sees the same thing that we do. hdu[sci_ext].header = header hdu.writeto( temp_path, output_verify="silentfix", overwrite=True ) # ,clobber=True return nxpix, nypix, cd11, cd12, cd21, cd22, crpix1, crpix2, cra, cdec
[docs] def write_text_file(file_path: str, src_list: list[BaseSource]): """ Write a text file with a list of sources :param file_path: Output file :param src_list: List of sources :return: None """ logger.debug(f"Saving text file to {file_path}") with open(file_path, "w", encoding="utf8") as out: for src in src_list: out.write(f"{src.ra_deg:11.7f} {src.dec_deg:11.7f} {src.mag:5.2f}\n")
[docs] def write_region_file( file_path: str, src_list: list[BaseSource], color: str = "green", system: Optional[str] = None, ): """ Write a region file :param file_path: Output path :param src_list: List of sources :param color: Colour to use :param system: system to use (default wcs) :return: """ if system is None: system = "wcs" if system not in ["wcs", "img"]: err = f"Did not recognise system '{system}'. Valid values are 'wcs' and 'img'." logger.error(err) raise ValueError(err) logger.debug(f"Saving region file to {file_path}") with open(file_path, "w", encoding="utf8") as out: out.write( f"# Region file format: DS9 version 4.0\n" f'global color={color} font="helvetica 10 normal" select=1 ' f"highlite=1 edit=1 move=1 delete=1 include=1 fixed=0 source\n" ) if system == "wcs": out.write("fk5\n") for i, src in enumerate(src_list): out.write( f"point({src.ra_deg:.7f},{src.dec_deg:.7f}) " f"# point=boxcircle text={{{i + 1}}}\n" ) elif system == "img": out.write("image\n") for i, src in enumerate(src_list): out.write( f"point({src.x:.3f},{src.y:.3f}) " f"# point=boxcircle text={{{i + 1}}}\n" )
[docs] def export_src_lists( img_src_list: list[SextractorSource], ref_src_list: list[BaseSource], base_output_path: str, ): """ Export both image and reference source lists to txt/region files :param img_src_list: Image sources :param ref_src_list: Reference sources :param base_output_path: base output path :return: None """ write_text_file( file_path=os.path.splitext(base_output_path)[0] + ".det.init.txt", src_list=img_src_list, ) write_region_file( file_path=os.path.splitext(base_output_path)[0] + ".det.im.reg", src_list=img_src_list, color="red", system="img", ) write_text_file( file_path=os.path.splitext(base_output_path)[0] + ".cat.txt", src_list=ref_src_list, ) write_region_file( file_path=os.path.splitext(base_output_path)[0] + ".cat.wcs.reg", src_list=ref_src_list, color="green", system="wcs", )