"""
Module containing the base Kowalski catalog object
"""
import logging
import os
from abc import ABC
from typing import Optional
from penquins import Kowalski
from mirar.catalog.base_catalog import BaseXMatchCatalog
from mirar.errors import ProcessorError
logger = logging.getLogger(__name__)
class KowalskiError(ProcessorError):
    """
    Processor error raised for problems relating to Kowalski
    """
# Connection settings passed to every Kowalski client created in this module
PROTOCOL = "https"
HOST = "kowalski.caltech.edu"
PORT = 443
def get_kowalski() -> Kowalski:
    """
    Get a Kowalski object, using credentials stored in the environment

    If ``KOWALSKI_TOKEN`` is set it is used on its own. Otherwise both
    ``KOWALSKI_USER`` and ``KOWALSKI_PWD`` must be set. The connection is
    checked with a ping before the object is returned.

    :return: Kowalski object
    :raises KowalskiError: if the username or password is missing, or if
        the ping of the new connection fails
    """
    token_kowalski = os.environ.get("KOWALSKI_TOKEN")

    if token_kowalski is not None:
        logger.debug("Using kowalski token")
        kowalski_instance = Kowalski(
            token=token_kowalski, protocol=PROTOCOL, host=HOST, port=PORT
        )
    else:
        # No token available, so fall back to username/password login
        username_kowalski = os.environ.get("KOWALSKI_USER")
        password_kowalski = os.environ.get("KOWALSKI_PWD")

        if username_kowalski is None:
            err = (
                "Kowalski username not provided, "
                "please run export KOWALSKI_USER=<user>"
            )
            logger.error(err)
            raise KowalskiError(err)

        if password_kowalski is None:
            # The hint previously read "<user>", copied from the username
            # branch; the variable holds the password.
            err = (
                "Kowalski password not provided, "
                "please run export KOWALSKI_PWD=<password>"
            )
            logger.error(err)
            raise KowalskiError(err)

        kowalski_instance = Kowalski(
            username=username_kowalski,
            password=password_kowalski,
            protocol=PROTOCOL,
            host=HOST,
            port=PORT,
        )

    # Fail early here rather than on the first real query
    if not kowalski_instance.ping():
        err = "Error connecting to Kowalski. Are your credentials right?"
        logger.error(err)
        raise KowalskiError(err)

    return kowalski_instance
class BaseKowalskiXMatch(BaseXMatchCatalog, ABC):
    """
    Base class for a catalog using Kowalski

    The query built here reads ``catalog_name``, ``projection``,
    ``search_radius_arcsec`` and ``num_sources`` from the instance; these
    are presumably provided by the parent class or by concrete subclasses.
    """

    def __init__(
        self,
        *args,
        kowalski: Optional[Kowalski] = None,
        max_time_ms: float = 10000,
        **kwargs,
    ):
        """
        :param args: positional arguments forwarded to the parent class
        :param kowalski: existing Kowalski object to use; if None, one is
            created from environment credentials when first needed
        :param max_time_ms: value sent as ``max_time_ms`` with each query
        :param kwargs: keyword arguments forwarded to the parent class
        """
        super().__init__(*args, **kwargs)
        self.max_time_ms = max_time_ms
        self.kowalski = kowalski

    def near_query_kowalski(self, coords: dict) -> dict:
        """
        Performs a Kowalski query around coords

        :param coords: ra/dec
        :return: crossmatch dict
        :raises KowalskiError: if the response contains no data
        """
        # Connect lazily so this method also works when called directly,
        # not only through query()
        if self.kowalski is None:
            self.kowalski = get_kowalski()

        query = {
            "query_type": "near",
            "query": {
                "max_distance": self.search_radius_arcsec,
                "distance_units": "arcsec",
                "radec": coords,
                "catalogs": {
                    f"{self.catalog_name}": {
                        "filter": {},
                        "projection": self.projection,
                    }
                },
            },
            "kwargs": {
                "max_time_ms": self.max_time_ms,
                "limit": self.num_sources,
            },
        }
        logger.debug("Kowalski is %s", self.kowalski)
        response = self.kowalski.query(query=query)

        # Results are read from the "default" entry of the response. Raise a
        # KowalskiError when it or its "data" is missing, rather than failing
        # with an AttributeError/TypeError on None.
        result = (response or {}).get("default") or {}
        data = result.get("data")
        if data is None:
            err = (
                f"Kowalski query of '{self.catalog_name}' returned no data: "
                f"{result.get('message', response)}"
            )
            logger.error(err)
            raise KowalskiError(err)

        return data[self.catalog_name]

    def query(self, coords: dict) -> dict:
        """
        Uses a Kowalski object to query for sources around coords

        A Kowalski connection is created on first use if none was supplied.

        :param coords: ra/dec
        :return: crossmatch sources
        """
        if self.kowalski is None:
            self.kowalski = get_kowalski()
        logger.debug("Querying kowalski")
        data = self.near_query_kowalski(coords)
        data = self.update_data(data)
        return data

    @staticmethod
    def update_data(data: dict) -> dict:
        """
        For a given catalog, update the data with any extra information

        The base implementation returns the data unchanged.

        :param data: kowalski data
        :return: updated data
        """
        return data