Source code for mirar.processors.astromatic.swarp.swarp

"""
Module relating to `swarp <https://www.astromatic.net/software/swarp`_
"""
import logging
import shutil
import warnings
from pathlib import Path
from typing import Optional

import numpy as np
from astropy.utils.exceptions import AstropyWarning
from astropy.wcs import WCS

from mirar.data import ImageBatch
from mirar.errors import ProcessorError
from mirar.io import MissingCoreFieldError, check_image_has_core_fields
from mirar.paths import (
    BASE_NAME_KEY,
    EXPTIME_KEY,
    LATEST_SAVE_KEY,
    LATEST_WEIGHT_SAVE_KEY,
    RAW_IMG_KEY,
    STACKED_COMPONENT_IMAGES_KEY,
    SWARP_FLUX_SCALING_KEY,
    TIME_KEY,
    all_astrometric_keywords,
    copy_temp_file,
    get_output_dir,
    get_temp_path,
)
from mirar.processors.astromatic.scamp.scamp import SCAMP_HEADER_KEY
from mirar.processors.astromatic.swarp.swarp_wrapper import run_swarp
from mirar.processors.base_processor import BaseImageProcessor

logger = logging.getLogger(__name__)


[docs] class SwarpError(ProcessorError): """Error relating to swarp"""
[docs] class SwarpWarning(UserWarning): """Warning relating to swarp"""
[docs] class Swarp(BaseImageProcessor): """ Processor to apply Swarp """ base_key = "swarp" def __init__( self, swarp_config_path: str, temp_output_sub_dir: str = "swarp", pixscale: Optional[float] = None, x_imgpixsize: Optional[float] = None, y_imgpixsize: Optional[float] = None, propogate_headerlist: Optional[list] = None, center_type: Optional[str] = None, center_ra: Optional[float] = None, center_dec: Optional[float] = None, gain: Optional[float] = None, include_scamp: bool = True, combine: bool = True, cache: bool = False, subtract_bkg: bool = False, flux_scaling_factor: float = None, calculate_dims_in_swarp: bool = False, header_keys_to_combine: Optional[str | list[str]] = None, ): """ Args: swarp_config_path: str path to config path temp_output_sub_dir: str output sub-directory pixscale: float Pixel scale in degrees. If None, set as median of the pixel scales of input images. x_imgpixsize: float X-dimension in pixels. If None, set as max x-size of input images. If you want a stacked image covering all input images, set calculate_dims_in_swarp to True instead. y_imgpixsize: float Y-dimension in pixels. If None, set as max y-size of input images. propogate_headerlist: list List of header keywords to propagate. Recommended to leave None, the processor will take care of it. center_ra: float Desired central RA of output image. If None, set as the median of the input images. center_dec: Desired central Dec of output image. If None, set as the median of the input images. gain: float Gain include_scamp: bool Whether to include scamp results or not? combine: bool Combine and coadd all images? For reasons internal to Swarp, it is strongly advised to always set this to True (even if you are running Swarp on only one image). The processor will raise an error if you try running it on a Batch with multiple images by setting combine to False. If you want to resample multiple images, just DeBatch them and run Swarp separately. cache: bool Save temporary files? subtract_bkg: Subtract background? flux_scaling_factor: Do you want to scale the images by some factor? There are two ways to provide this value. First, you can specify the scaling factor for the image in the header using the FLXSCALE keyword. This is the preferred way for a batch with multiple images. This calculation should happen somewhere upstream. The second option, which could be used for cases for single images is by providing a float value to this parameter. The processor will then create a FLXSCALE card in the header with this value. By design, Swarp is forced to look for FLXSCALE in the header for scaling, so this keyword should not be used for other purposes. The FLXSCALE keyword is not propagated to the resampled/stacked image to avoid unwanted re-scalings for future resamplings of the image. If you want this logged, you should add it manually to the headers where you calculate it using some keyword other than FLXSCALE. If no value is provided, it defaults to 1 (no scaling). If multiple images are provided without FLXSCALE in their headers, the same scaling will be applied to all of them. header_keys_to_combine: list If specified, will combine the values of the corresponding header keys to a comma-separated value in the stacked images """ super().__init__() self.swarp_config = swarp_config_path self.temp_output_sub_dir = temp_output_sub_dir self.pixscale = pixscale self.propogate_headerlist = propogate_headerlist self.center_ra = center_ra self.center_dec = center_dec self.x_imgpixsize = x_imgpixsize self.y_imgpixsize = y_imgpixsize self.include_scamp = include_scamp self.combine = combine self.cache = cache self.gain = gain self.subtract_bkg = subtract_bkg self.flux_scaling_factor = flux_scaling_factor self.calculate_dims_in_swarp = calculate_dims_in_swarp self.center_type = center_type if self.center_type is not None: assert self.center_type in ["MOST", "ALL", "MANUAL"] self.header_keys_to_combine = header_keys_to_combine if isinstance(self.header_keys_to_combine, str): self.header_keys_to_combine = [self.header_keys_to_combine] def __str__(self) -> str: return "Processor to apply swarp to images, stacking them together."
[docs] def get_swarp_output_dir(self) -> Path: """ Get custom output directory for swarp :return: Swarp directory """ return get_output_dir(self.temp_output_sub_dir, self.night_sub_dir)
def _apply_to_images( self, batch: ImageBatch, ) -> ImageBatch: basenames = [x[BASE_NAME_KEY] for x in batch] sort_inds = np.argsort(basenames) batch = ImageBatch([batch[i] for i in sort_inds]) if len(basenames) != len(set(basenames)): raise SwarpError( "Duplicate basenames in batch, " f"following names were found: {basenames}" ) swarp_output_dir = self.get_swarp_output_dir() swarp_output_dir.mkdir(parents=True, exist_ok=True) swarp_image_list_path = swarp_output_dir.joinpath( Path(batch[0][BASE_NAME_KEY]).name + "_swarp_img_list.txt", ) swarp_weight_list_path = swarp_output_dir.joinpath( Path(batch[0][BASE_NAME_KEY]).name + "_swarp_weight_list.txt", ) logger.debug(f"Writing file list to {swarp_image_list_path}") temp_files = [swarp_image_list_path, swarp_weight_list_path] # If swarp is run with combine -N option, # it outputs an intermediate file called inpname+.resamp.fits. This name is not # changeable. if self.combine: output_image_path = swarp_output_dir.joinpath( Path(batch[0][BASE_NAME_KEY]).name.replace(".fits", "_stack.fits"), ) else: if len(batch) > 1: err = ( f"Attempting to run Swarp with batch-size of length " f"{len(batch)} but with combine=False. Either set combine=True" f", or DeBatch the images" ) logger.error(err) raise SwarpError(err) warnings.warn( "You are choosing to run Swarp without combining the image. " "This causes swarp to output an intermediate image, " "with possibly incorrect FLXSCALE values. You only want to run this" "if you want to cut your original image to match with the dimensions " "of a second image. If this is not your motive, please consider " "running with combine=True, it almost always gives the same result " "as running it without, but will have FLXSCALE in headers that make " "sense.", SwarpWarning, ) output_image_path = swarp_output_dir.joinpath( Path(batch[0][BASE_NAME_KEY]).with_suffix(".resamp.fits").name, ) logger.debug(f"Saving to {output_image_path}") try: component_images_list = [x[LATEST_SAVE_KEY] for x in batch] except KeyError: component_images_list = None all_pixscales = [] all_imgpixsizes = [] all_ras = [] all_decs = [] with open(swarp_image_list_path, "w", encoding="utf8") as img_list, open( swarp_weight_list_path, "w", encoding="utf8" ) as weight_list: for image in batch: pixscale_to_use = self.pixscale x_imgpixsize_to_use = self.x_imgpixsize y_imgpixsize_to_use = self.y_imgpixsize center_ra_to_use = self.center_ra center_dec_to_use = self.center_dec if ( (self.pixscale is None) | (self.x_imgpixsize is None) | (self.y_imgpixsize is None) | (self.center_ra is None) | (self.center_dec is None) ): with warnings.catch_warnings(): warnings.simplefilter("ignore", AstropyWarning) wcs = WCS(image.get_header()) cd11 = image["CD1_1"] cd21 = image["CD2_1"] nxpix = image["NAXIS1"] nypix = image["NAXIS2"] image_x_cen = nxpix / 2 image_y_cen = nypix / 2 [ra, dec] = wcs.all_pix2world(image_x_cen, image_y_cen, 1) xscale = np.sqrt(cd11**2 + cd21**2) pixscale = xscale * 3600 imgpixsize = max(nxpix, nypix) all_pixscales.append(pixscale) all_imgpixsizes.append(imgpixsize) all_ras.append(ra) all_decs.append(dec) logger.debug(f"{all_ras}, {all_decs}") if self.include_scamp: temp_head_path = copy_temp_file( output_dir=swarp_output_dir, file_path=Path(image[SCAMP_HEADER_KEY]), ) if np.logical_and( SWARP_FLUX_SCALING_KEY in image.header.keys(), self.flux_scaling_factor is not None, ): err = ( f"{SWARP_FLUX_SCALING_KEY} is present in header, and" f"a value for flux_scaling_factor has also been provided. " f"Please use only one." ) raise SwarpError(err) if SWARP_FLUX_SCALING_KEY not in image.header.keys(): if self.flux_scaling_factor is None: image[SWARP_FLUX_SCALING_KEY] = 1 else: image[SWARP_FLUX_SCALING_KEY] = self.flux_scaling_factor temp_img_path = get_temp_path(swarp_output_dir, image[BASE_NAME_KEY]) self.save_fits(image, temp_img_path) logger.debug(f"Saving mask image for {temp_img_path}") temp_mask_path = self.save_mask_image(image, temp_img_path) img_list.write(f"{temp_img_path}\n") weight_list.write(f"{temp_mask_path}\n") temp_files += [temp_img_path, temp_mask_path] if self.include_scamp: temp_files += [temp_head_path] if pixscale_to_use is None: pixscale_to_use = np.max(all_pixscales) if x_imgpixsize_to_use is None: x_imgpixsize_to_use = np.max(all_imgpixsizes) if y_imgpixsize_to_use is None: y_imgpixsize_to_use = np.max(all_imgpixsizes) if center_ra_to_use is None: center_ra_to_use = np.median(all_ras) if center_dec_to_use is None: center_dec_to_use = np.median(all_decs) if self.calculate_dims_in_swarp: x_imgpixsize_to_use = None y_imgpixsize_to_use = None if self.center_type is None: self.center_type = "MANUAL" if self.center_type != "MANUAL": center_dec_to_use = None center_ra_to_use = None output_image_weight_path = output_image_path.with_suffix(".weight.fits") run_swarp( stack_list_path=swarp_image_list_path, swarp_config_path=self.swarp_config, out_path=output_image_path, weight_list_path=swarp_weight_list_path, weight_out_path=output_image_weight_path, pixscale=pixscale_to_use, x_imgpixsize=x_imgpixsize_to_use, y_imgpixsize=y_imgpixsize_to_use, propogate_headerlist=self.propogate_headerlist, center_type=self.center_type, center_ra=center_ra_to_use, center_dec=center_dec_to_use, combine=self.combine, gain=self.gain, subtract_bkg=self.subtract_bkg, flux_scaling_keyword=SWARP_FLUX_SCALING_KEY, cache=self.cache, ) # Check if output image exists if combine is no. # This is the intermediate image that swarp makes # Hopefully this is obsolete now and noone uses this if not self.combine: temp_output_image_path = get_temp_path( swarp_output_dir, output_image_path.name, ) temp_output_image_weight_path = temp_output_image_path.with_suffix( ".weight.fits" ) if temp_output_image_path.exists(): shutil.copy(temp_output_image_path, output_image_path) shutil.copy(temp_output_image_weight_path, output_image_weight_path) # temp_output_image_path.rename(output_image_path) # temp_output_image_weight_path.rename(output_image_weight_path) temp_files.append(temp_output_image_path) temp_files.append(temp_output_image_weight_path) else: err = ( f"Swarp seems to have misbehaved, " f"and not made the correct output file {temp_output_image_path}" ) logger.error(err) raise SwarpError(err) new_image = self.open_fits(output_image_path) # Swarp sets pixels with no data to 0, which is not ideal weight_data = self.open_fits(output_image_weight_path).get_data() mask = weight_data == 0 img_data = new_image.get_data() img_data[mask] = np.nan new_image.set_data(img_data) # Add missing keywords that are common in all input images to the # header of resampled image, and save again # Omit any astrometric keywords with warnings.catch_warnings(): warnings.simplefilter("ignore", AstropyWarning) for key in batch[0].keys(): if np.any([key not in x.keys() for x in batch]): continue if np.logical_and( np.sum([x[key] == batch[0][key] for x in batch]) == len(batch), key.strip() not in all_astrometric_keywords, ): if key not in new_image.keys(): try: new_image[key] = batch[0][key] except ValueError: continue new_image["COADDS"] = sum(x["COADDS"] for x in batch) new_image[EXPTIME_KEY] = sum(x[EXPTIME_KEY] for x in batch) new_image[TIME_KEY] = min(x[TIME_KEY] for x in batch) new_image[RAW_IMG_KEY] = ",".join([x[RAW_IMG_KEY] for x in batch]) if self.header_keys_to_combine is not None: for key in self.header_keys_to_combine: try: new_image[key] = ",".join([str(x[key]) for x in batch]) except KeyError as exc: raise SwarpError(f"Key {key} not found in input images.") from exc if component_images_list is not None: new_image[STACKED_COMPONENT_IMAGES_KEY] = ",".join(component_images_list) new_image[BASE_NAME_KEY] = output_image_path.name new_image[LATEST_WEIGHT_SAVE_KEY] = output_image_weight_path.as_posix() # Reference images should have all the core fields try: check_image_has_core_fields(new_image) except MissingCoreFieldError as err: raise SwarpError(err) from err if not self.cache: for temp_file in temp_files: temp_file.unlink() logger.debug(f"Deleted temporary file {temp_file}") # Remove the output file made by Swarp, as we are passing along the image # we made. Also, the Swarp output image does not have any of the header # keywords we added above. output_image_path.unlink() return ImageBatch([new_image])