Source code for mirar.processors.mask

"""
Module containing processors which mask pixels
"""
import logging
from pathlib import Path
from typing import Callable

import numpy as np
from astropy.coordinates import SkyCoord
from astropy.io import fits
from astropy.wcs import WCS

from mirar.data import Image, ImageBatch
from mirar.paths import BASE_NAME_KEY, FITS_MASK_KEY, get_output_dir
from mirar.processors.base_processor import BaseImageProcessor

logger = logging.getLogger(__name__)

MASK_VALUE = np.nan


[docs] class BaseMask(BaseImageProcessor): """ Base class for masking processors """ def __init__( self, write_masked_pixels_to_file: bool = False, output_dir: str | Path = "mask", only_write_mask: bool = False, ): super().__init__() self.write_masked_pixels_to_file = write_masked_pixels_to_file self.output_dir = output_dir self.only_write_mask = only_write_mask
[docs] def get_mask(self, image) -> np.ndarray[bool]: """ Function to get the mask for a given image """ raise NotImplementedError
def _apply_to_images( self, batch: ImageBatch, ) -> ImageBatch: for image in batch: data = image.get_data() logger.debug(f"Masking {image[BASE_NAME_KEY]}") pixels_to_mask = self.get_mask(image) if not self.only_write_mask: data[pixels_to_mask] = MASK_VALUE image.set_data(data) logger.debug( f"Masked {np.sum(pixels_to_mask)}/{pixels_to_mask.size} pixels " f"in {image[BASE_NAME_KEY]}" ) if self.write_masked_pixels_to_file: mask_directory = get_output_dir(self.output_dir, self.night_sub_dir) mask_directory.mkdir(parents=True, exist_ok=True) mask_file_path = mask_directory.joinpath( image[BASE_NAME_KEY] ).with_suffix(".mask.fits") mask_data = np.ones_like(data) mask_data[pixels_to_mask] = 0.0 mask_image = Image(data=mask_data, header=image.get_header()) self.save_fits(mask_image, mask_file_path) image[FITS_MASK_KEY] = mask_file_path.as_posix() return batch
[docs] class MaskPixelsFromPath(BaseMask): """ Processor to apply a mask to images using another `mask image'. Following the general mirar convention, every zero pixel in the mask image will be masked in the science image. """ base_key = "maskfrompath" def __init__( self, mask_path: str | Path = None, mask_path_key: str = None, write_masked_pixels_to_file: bool = False, output_dir: str | Path = "mask", only_write_mask: bool = False, ): super().__init__( write_masked_pixels_to_file=write_masked_pixels_to_file, output_dir=output_dir, only_write_mask=only_write_mask, ) self.mask_path = mask_path self.mask_path_key = mask_path_key if mask_path is None and mask_path_key is None: raise ValueError("Must specify either mask_path or mask_path_key") if mask_path is not None and mask_path_key is not None: raise ValueError("Must specify either mask_path or mask_path_key, not both") def __str__(self) -> str: return f"Processor to mask bad pixels using a pre-defined map: {self.mask_path}"
[docs] def get_mask(self, image) -> np.ndarray: """ loads mask if needed, and returns it :return: mask """ # if self.mask is None: # why is this needed? if self.mask_path is not None: mask_img = self.open_fits(self.mask_path) elif self.mask_path_key is not None: logger.debug(f"Loading mask from {image[self.mask_path_key]}") mask_img = self.open_fits(image[self.mask_path_key]) else: raise ValueError("Must specify either mask_path or mask_path_key") pixels_to_keep = mask_img.get_data().astype(bool) return ~pixels_to_keep
[docs] class MaskPixelsFromPathInverted(MaskPixelsFromPath): """ Processor to apply a mask to images using another `mask image'. In contrast to the general mirar convention, every non-zero pixel in the mask image will be masked in the science image. """ base_key = "maskfrompathinverted"
[docs] def get_mask(self, image) -> np.ndarray: """ Mask pixels which are non-zero in the mask file. This is the inverse of MaskPixelsFromPath, which masks pixels which are zero in the mask file. :param image: image to mask :return: Boolean mask """ mask = super().get_mask(image) return ~mask
[docs] class MaskPixelsFromFunction(BaseMask): """ Processor to apply a mask to images using a function """ base_key = "maskfromfunction" def __init__( self, mask_function: Callable[[Image], np.ndarray], write_masked_pixels_to_file: bool = False, output_dir: str | Path = "mask", only_write_mask: bool = False, ): super().__init__( write_masked_pixels_to_file=write_masked_pixels_to_file, output_dir=output_dir, only_write_mask=only_write_mask, ) self.mask_function = mask_function
[docs] def get_mask(self, image) -> np.ndarray: """ Function to get the mask for a given image """ return self.mask_function(image)
def __str__(self) -> str: return ( f"Processor to mask pixels using the function " f"'{self.mask_function.__name__}'" )
[docs] class MaskAboveThreshold(BaseMask): """ Processor to mask pixels above a threshold """ base_key = "maskthresh" def __init__( self, threshold: float = None, threshold_key: str = None, write_masked_pixels_to_file: bool = False, output_dir: str | Path = "mask", only_write_mask: bool = False, ): """ :param threshold: threshold to mask above :param threshold_key: key to use to get threshold from image header """ super().__init__( write_masked_pixels_to_file=write_masked_pixels_to_file, output_dir=output_dir, only_write_mask=only_write_mask, ) self.threshold = threshold self.threshold_key = threshold_key self.write_masked_pixels_to_file = write_masked_pixels_to_file if threshold is None and threshold_key is None: raise ValueError("Must specify either threshold or threshold_key") if threshold is not None and threshold_key is not None: raise ValueError("Must specify either threshold or threshold_key, not both") def __str__(self) -> str: return f"Processor to mask pixels above a threshold '{self.threshold}'"
[docs] def get_mask(self, image) -> np.ndarray: """ Returns a mask for pixels above a threshold :return: mask """ if self.threshold is None: self.threshold = image.get_header()[self.threshold_key] pixels_to_mask = image.get_data() > self.threshold return pixels_to_mask
[docs] class MaskPixelsFromWCS(BaseMask): """ Processor to mask pixels from a file where WCS coordinates of masked pixels are given """ base_key = "maskwcs" def __init__( self, mask_pixels_ra: float | list[float] = None, mask_pixels_dec: float | list[float] = None, mask_file_key: str = FITS_MASK_KEY, write_masked_pixels_to_file: bool = False, output_dir: str | Path = "mask", only_write_mask: bool = False, ): super().__init__( write_masked_pixels_to_file=write_masked_pixels_to_file, output_dir=output_dir, only_write_mask=only_write_mask, ) self.mask_pixels_ra = mask_pixels_ra self.mask_pixels_dec = mask_pixels_dec self.mask_file_key = mask_file_key if self.mask_pixels_ra is not None: self.mask_file_key = None def __str__(self) -> str: return "Processor to mask pixels using a list of RA/Dec coordinates."
[docs] def get_mask(self, image) -> np.ndarray: """ loads mask if needed, and returns it :return: mask """ wcs = WCS(image.get_header()) if self.mask_file_key is not None: mask_file_path = image.get_header()[self.mask_file_key] with fits.open(mask_file_path) as mask_image: pixels_to_keep = mask_image[0].data # pylint: disable=no-member mask_wcs = WCS(mask_image[0].header) # pylint: disable=no-member masked_pixel_x, masked_pixel_y = np.where(pixels_to_keep == 0.0) mask_pixel_coords = mask_wcs.pixel_to_world(masked_pixel_y, masked_pixel_x) mask_pixels_ra = mask_pixel_coords.ra.deg mask_pixels_dec = mask_pixel_coords.dec.deg else: mask_pixels_ra = self.mask_pixels_ra mask_pixels_dec = self.mask_pixels_dec logger.debug(f"Masking {mask_pixels_ra} ras and {mask_pixels_dec} decs") mask_pixel_coords = SkyCoord(mask_pixels_ra, mask_pixels_dec, unit="deg") mask_pixels_x, mask_pixels_y = wcs.world_to_pixel(mask_pixel_coords) mask_pixels_x = mask_pixels_x.astype(int) mask_pixels_y = mask_pixels_y.astype(int) pixels_to_keep = np.ones(image.get_data().shape, dtype=bool) mask_in_image = np.logical_and( mask_pixels_x > 0.0, mask_pixels_x < pixels_to_keep.shape[1] ) & np.logical_and(mask_pixels_y > 0.0, mask_pixels_y < pixels_to_keep.shape[0]) mask_pixels_x = mask_pixels_x[mask_in_image] mask_pixels_y = mask_pixels_y[mask_in_image] pixels_to_keep[mask_pixels_y, mask_pixels_x] = False return ~pixels_to_keep
[docs] class WriteMaskedCoordsToFile(BaseMask): """ Processor to write masked coordinates to a file """ base_key = "writemaskedcoords" def __init__(self, output_dir: str | Path = "mask", only_write_mask: bool = False): super().__init__( write_masked_pixels_to_file=True, output_dir=output_dir, only_write_mask=only_write_mask, ) def __str__(self) -> str: return f"Processor to save image masks to the {self.output_dir} directory"
[docs] def get_mask(self, image) -> np.ndarray: pixels_to_mask = np.zeros(image.get_data().shape, dtype=bool) # For some reason, MASK_VALUE == np.nan returns False. Issue/Feature of numpy? # This is a workaround if np.isnan(MASK_VALUE): pixels_to_mask[np.isnan(image.get_data())] = True else: pixels_to_mask[image.get_data() == MASK_VALUE] = True return pixels_to_mask
[docs] class MaskDatasecPixels(BaseMask): """ Processor to mask the data section of an image """ base_key = "maskdatasec"
[docs] def get_mask(self, image: Image) -> np.ndarray: """ Function to mask the data section of an image """ header = image.header data = image.get_data() datasec = header["DATASEC"].replace("[", "").replace("]", "").split(",") datasec_xmin = int(datasec[0].split(":")[0]) datasec_xmax = int(datasec[0].split(":")[1]) datasec_ymin = int(datasec[1].split(":")[0]) datasec_ymax = int(datasec[1].split(":")[1]) mask = np.zeros(data.shape) mask[:, :datasec_xmin] = 1.0 mask[:, datasec_xmax:] = 1.0 mask[:datasec_ymin, :] = 1.0 mask[datasec_ymax:, :] = 1.0 return mask.astype(bool)
def __str__(self) -> str: return "Processor to mask pixels using a 'datasec' keyword in the header."