Source code for mirar.processors.astrometry.autoastrometry.crossmatch

"""
Component module for autoastrometry, dealing with crossmatching image sources with
reference sources
"""
import logging
import os
from typing import Optional

import numpy as np

from mirar.processors.astrometry.autoastrometry.sources import (
    BaseSource,
    SextractorSource,
    distance,
    pixel_distance,
    position_angle,
    quickdistance,
)
from mirar.processors.astrometry.autoastrometry.utils import median, mode, stdev, unique

logger = logging.getLogger(__name__)

FAST_MATCH = False
SHOW_MATCH = False


[docs] def distance_match( img_src_list: list[SextractorSource], ref_src_list: list[BaseSource], base_output_path: str, max_rad: float = 180.0, min_rad: float = 10.0, tolerance: float = 0.010, req_match: int = 3, pa_tolerance: float = 1.2, unc_pa: Optional[float] = None, write_crosscheck_files: bool = False, ) -> tuple[list[int], list[int], list[float]]: """ Cross-match sources in image with reference sources :param img_src_list: sources in image :param ref_src_list: reference sources :param base_output_path: output path to save crossmatch :param max_rad: max radius (degrees) :param min_rad: min radius (degrees) :param tolerance: :param req_match: minimum number of matches needed :param pa_tolerance: pa tolerance needed :param unc_pa: uncertainty in pa :param write_crosscheck_files: boolean whether to write crosscheck files :return: image_matches, ref_matches, mpa """ if tolerance <= 0: err = f"Tolerance cannot be negative! Using {abs(tolerance)} not {tolerance}." logger.error(err) raise ValueError(err) if pa_tolerance <= 0: err = ( f"PA tolerance cannot be negative! " f"Using {abs(pa_tolerance)} not {pa_tolerance}." ) logger.error(err) raise ValueError(err) if req_match < 2: logger.warning("Warning: reqmatch >=3 suggested") if unc_pa is None: unc_pa = 720.0 dec_list = [] for src in img_src_list: dec_list.append(src.dec_rad) median_dec_rad = median(dec_list) # faster distance computation ra_scale = np.cos(median_dec_rad) # will mess up meridian crossings, however # Calculate all the distances # In image catalog: img_src_dists = [] img_src_match_ids = [] for i, src in enumerate(img_src_list): distances = [] distances_j_index = [] for j, src2 in enumerate(img_src_list): if i == j: continue if abs(src.dec_deg - src2.dec_deg) > max_rad: continue if ra_scale * abs(src.ra_deg - src2.ra_deg) > max_rad: continue dist = quickdistance(src, src2, ra_scale) if min_rad < dist < max_rad: distances.append(dist) distances_j_index.append(j) img_src_dists.append(distances) img_src_match_ids.append(distances_j_index) # In reference catalog: ref_src_dists = [] ref_src_match_ids = [] for i, src in enumerate(ref_src_list): distances = [] distances_j_index = [] for j, src2 in enumerate(ref_src_list): if i == j: continue if abs(src.dec_deg - src2.dec_deg) > max_rad: continue if ra_scale * abs(src.ra_deg - src2.ra_deg) > max_rad: continue dist = quickdistance(src, src2, ra_scale) if min_rad < dist < max_rad: distances.append(dist) distances_j_index.append(j) ref_src_dists.append(distances) ref_src_match_ids.append(distances_j_index) # Now look for matches in the reference catalog to distances in the image catalog. n_great_matches = 0 img_match = [] ref_match = [] mpa = [] match_ns = [] primary_match_img = [] primary_match_ref = [] for img_i, img_dist_array in enumerate(img_src_dists): if len(img_dist_array) < 2: continue for ref_i, ref_dist_array in enumerate(ref_src_dists): if len(ref_dist_array) < 2: continue match = 0 img_match_in = [] ref_match_in = [] for img_j, img_src_dist in enumerate(img_dist_array): new_match = 1 for ref_j, ref_src_dist in enumerate(ref_dist_array): if abs((img_src_dist / ref_src_dist) - 1.0) < tolerance: match += new_match # further matches before the next # img_j loop indicate degeneracies new_match = 0 img_match_in.append(img_src_match_ids[img_i][img_j]) ref_match_in.append(ref_src_match_ids[ref_i][ref_j]) if match >= req_match: dpa = [] # Here, dpa[n] is the mean rotation of the PA from # the primary star of this match to the stars in its match # RELATIVE TO those same angles for those same stars # in the catalog. Therefore it is a robust measurement of the rotation. for i, match_index in enumerate(img_match_in): ddpa = position_angle( img_src_list[img_i], img_src_list[match_index] ) - position_angle( ref_src_list[ref_i], ref_src_list[ref_match_in[i]] ) while ddpa > 200.0: ddpa -= 360.0 while ddpa < -160.0: ddpa += 360.0 dpa.append(ddpa) # If user was confident the initial PA was right, remove bad PA'src # right away for i in range(len(img_match_in) - 1, -1, -1): if abs(dpa[i]) > unc_pa: del img_match_in[i] del ref_match_in[i] del dpa[i] if len(img_match_in) < 2: continue mode_dpa = mode(dpa) # Remove deviant matches by PA for i in range(len(img_match_in) - 1, -1, -1): if abs(dpa[i] - mode_dpa) > pa_tolerance: del img_match_in[i] del ref_match_in[i] del dpa[i] if len(img_match_in) < 2: continue n_degeneracies = ( len(img_match_in) - len(unique(img_match_in)) + len(ref_match_in) - len(unique(ref_match_in)) ) # this isn't quite accurate (overestimates if degeneracies are mixed up) mpa.append(mode_dpa) primary_match_img.append(img_i) primary_match_ref.append(ref_i) img_match.append(img_match_in) ref_match.append(ref_match_in) match_ns.append(len(img_match_in) - n_degeneracies) if len(img_match_in) - n_degeneracies > 6: n_great_matches += 1 if n_great_matches > 16 and FAST_MATCH is True: break # save processing time n_matches = len(img_match) if n_matches == 0: logger.error("Found no potential matches of any sort (including pairs).") logger.error( "The algorithm is probably not finding enough real stars to solve " "the field. Check seeing." ) return [], [], [] # Kill the bad matches rejects = 0 # Get rid of matches that don't pass the reqmatch cut # if n_matches > 10 and max(match_ns) >= reqmatch: for i in range(len(primary_match_img) - 1, -1, -1): if match_ns[i] < req_match: del mpa[i] del primary_match_img[i] del primary_match_ref[i] del img_match[i] del ref_match[i] del match_ns[i] if len(img_match) < 1: logger.error(f"Found no matching clusters of reqmatch = {req_match}") return [], [], [] # If we still have lots of matches, # get rid of those with the minimum number of submatches # (that is, increase reqmatch by 1) min_match = min(match_ns) count_not_min = 0 for match_n in match_ns: if match_n > min_match: count_not_min += 1 if len(match_ns) > 16 and count_not_min > 3: logger.debug(f"Too many matches: increasing reqmatch to {req_match + 1}") for i in range(len(primary_match_img) - 1, -1, -1): if match_ns[i] == min_match: del mpa[i] del primary_match_img[i] del primary_match_ref[i] del img_match[i] del ref_match[i] del match_ns[i] n_matches = len( img_match ) # recalculate with the new reqmatch and with prunes supposedly removed logger.debug(f"Found {n_matches} candidate matches.") # Use only matches with a consistent PA offset_pa = mode(mpa) if len(img_match) > 2: # Coarse iteration for anything away from the mode for i in range(len(primary_match_img) - 1, -1, -1): if abs(mpa[i] - offset_pa) > pa_tolerance: del mpa[i] del primary_match_img[i] del primary_match_ref[i] del img_match[i] del ref_match[i] del match_ns[i] rejects += 1 st_dev_pa = stdev(mpa) refined_tolerance = 2.2 * st_dev_pa # Fine iteration to flag outliers now that we know most are reliable for i in range(len(primary_match_img) - 1, -1, -1): if abs(mpa[i] - offset_pa) > refined_tolerance: del mpa[i] del primary_match_img[i] del primary_match_ref[i] del img_match[i] del ref_match[i] del match_ns[i] rejects += ( 1 # these aren't necessarily bad, just making more manageable. ) # New verification step: calculate distances and PAs between central stars # of matches n_dist_flags = [0] * len(primary_match_img) for _ in range(2): # two iterations # find bad pairs if len(primary_match_img) == 0: break for i, img_i in enumerate(primary_match_img): for j, img_j in enumerate(primary_match_img): if i == j: continue ref_i = primary_match_ref[i] ref_j = primary_match_ref[j] img_dist_ij = distance(img_src_list[img_i], img_src_list[img_j]) ref_dist_ij = distance(ref_src_list[ref_i], ref_src_list[ref_j]) try: if abs((img_dist_ij / ref_dist_ij) - 1.0) > tolerance: n_dist_flags[i] += 1 except ZeroDivisionError: # (occasionally will get divide by zero) pass # delete bad clusters n_test_matches = len(primary_match_img) for i in range(n_test_matches - 1, -1, -1): if ( n_dist_flags[i] == n_test_matches - 1 ): # if every comparison is bad, this is a bad match del mpa[i] del primary_match_img[i] del primary_match_ref[i] del img_match[i] del ref_match[i] del match_ns[i] rejects += 1 logger.debug(f"Rejected {rejects} bad matches.") n_matches = len(primary_match_img) logger.debug(f"Found {n_matches} good matches.") if n_matches == 0: return [], [], [] # check the pixel scale while we're at it pix_scale_list = [] if len(primary_match_img) >= 2: for i in range(len(primary_match_img) - 1): for j in range(i + 1, len(primary_match_img)): img_i = primary_match_img[i] ref_i = primary_match_ref[i] img_j = primary_match_img[j] ref_j = primary_match_ref[j] pix_scale_list.append( distance(ref_src_list[ref_i], ref_src_list[ref_j]) / pixel_distance(img_src_list[img_i], img_src_list[img_j]) ) pix_scale = median(pix_scale_list) pix_scale_std = stdev(pix_scale_list) if len(primary_match_img) >= 3: logger.debug( f'Refined pixel scale measurement: {pix_scale:.4f}"/pix ' f"(+/- {pix_scale_std:.4f})" ) else: logger.debug(f'Refined pixel scale measurement: {pix_scale:.4f}"/pix') for i, img_i in enumerate(primary_match_img): ref_i = primary_match_ref[i] if SHOW_MATCH: logger.debug(f"{img_i} matches {ref_i} (dPA ={mpa[i]:.3f})") if len(img_match[i]) < 16: logger.debug(f" {img_i} --> {img_match[i]}") logger.debug(f" {ref_i} --> {ref_match[i]}") else: logger.debug( f" {img_i} --> {img_match[i][0:10]} {len(img_match[i]) - 10} more" ) logger.debug( f" {ref_i} --> {ref_match[i][0:10]} {len(ref_match[i]) - 10} more" ) if i + 1 >= 10 and len(primary_match_img) - 10 > 0: logger.debug( f"" f"{(len(primary_match_img) - 10)} additional matches not shown." ) break else: logger.debug( f"{img_i} matches {ref_i} (dPA ={mpa[i]:.3f}):" f" {str(len(img_match[i])).strip()} rays" ) if write_crosscheck_files: match_lines_im = os.path.splitext(base_output_path)[0] + ".matchlines.im.reg" logger.info(f"Writing match lines to {match_lines_im}") with open(match_lines_im, "w", encoding="utf8") as out: color = "red" out.write( f"# Region file format: DS9 version 4.0\nglobal color={color} " f'font="helvetica 10 normal" select=1 highlite=1 ' f"edit=1 move=1 delete=1 include=1 fixed=0 source\n" ) out.write("image\n") for i, img_i in enumerate(primary_match_img): for j, img_j in enumerate(img_match[i]): out.write( f"line({img_src_list[img_i].x:.3f}," f"{img_src_list[img_i].y:.3f}," f"{img_src_list[img_j].x:.3f}," f"{img_src_list[img_j].y:.3f}) # line=0 0\n" ) match_lines_wcs = os.path.splitext(base_output_path)[0] + ".matchlines.wcs.reg" logger.info(f"Writing match lines to {match_lines_wcs}") with open(match_lines_wcs, "w", encoding="utf8") as out: color = "green" out.write( f"# Region file format: DS9 version 4.0\nglobal color={color} " f'font="helvetica 10 normal" select=1 highlite=1 ' f"edit=1 move=1 delete=1 include=1 fixed=0 source\n" ) out.write("fk5\n") for i, ref_i in enumerate(primary_match_ref): for j, ref_j in enumerate(ref_match[i]): out.write( f"line({ref_src_list[ref_i].ra_deg:.5f}," f"{ref_src_list[ref_i].dec_deg:.5f}," f"{ref_src_list[ref_j].ra_deg:.5f}," f"{ref_src_list[ref_j].dec_deg:.5f}) # line=0 0\n" ) # future project: if not enough, go to the secondary offsets return primary_match_img, primary_match_ref, mpa
[docs] def crosscheck_source_lists( img_src_list: list[SextractorSource], n_img: int, img_density: float, ref_src_list: list[BaseSource], n_ref: int, ref_density: float, box_size_arcsec: float, area_sq_min: float, ) -> tuple[list[SextractorSource], int, float, list[BaseSource], int, float]: """ Compares detected sources in image and reference, and trims so they are of comparable density :param img_src_list: list of image sources :param n_img: number of image sources :param img_density: img source density :param ref_src_list: list of reference images :param n_ref: number of reference sources :param ref_density: ref source density :param box_size_arcsec: radius of search box :param area_sq_min: area of image :return: trimmed list of sources, number, density, trimmed ref sources, number & density """ # If this image is actually shallower than reference catalog, trim the # reference catalog down if n_ref > 16 and ref_density > 3 * img_density: logger.debug("Image is shallow. Trimming reference catalog...") while ref_density > 3 * img_density: ref_src_list = ref_src_list[0 : int(len(ref_src_list) * 4 / 5)] n_ref = len(ref_src_list) ref_density = n_ref / (2 * box_size_arcsec / 60.0) ** 2 # If the image is way deeper than USNO, trim the image catalog down if n_img > 16 and img_density > 4 * ref_density: logger.debug("Image is deep. Trimming image catalog...") while img_density > 4 * ref_density and n_img > 8: img_src_list = img_src_list[0 : int(len(img_src_list) * 4 / 5)] n_img = len(img_src_list) img_density = n_img / area_sq_min # If too many objects, do some more trimming if n_img * n_ref > 120 * 120 * 4: logger.debug("Image and/or catalog still too deep. Trimming...") while n_img * n_ref > 120 * 120 * 4: if img_density > ref_density: img_src_list = img_src_list[0 : int(len(img_src_list) * 4 / 5)] n_img = len(img_src_list) img_density = n_img / area_sq_min else: ref_src_list = ref_src_list[0 : int(len(ref_src_list) * 4 / 5)] n_ref = len(ref_src_list) ref_density = n_ref / (2 * box_size_arcsec / 60.0) ** 2 # Remove fainter object in close pairs for both lists min_sep = 3 delete_list = [] for i, src in enumerate(img_src_list): for j, src2 in enumerate(img_src_list[i + 1 :]): dist = distance(src, src2) if dist < min_sep: if src.mag > src2.mag: delete_list.append(i) else: delete_list.append(i + j + 1) delete_list = unique(delete_list) delete_list.reverse() for del_index in delete_list: del img_src_list[del_index] for i, src in enumerate(ref_src_list): for j, src2 in enumerate(ref_src_list[i + 1 :]): dist = distance(src, src2) if dist < min_sep: if src.mag > src2.mag: delete_list.append(i) else: delete_list.append(i + j + 1) delete_list = unique(delete_list) delete_list.reverse() for del_index in delete_list: del ref_src_list[del_index] return img_src_list, n_img, img_density, ref_src_list, n_ref, ref_density