# Source code for recipe_system.cal_service.caldb

import abc
import os
from functools import partial, wraps

import numpy as np

from gempy.utils import logutils

# Subdirectory in which to store processed calibrations
# (CalDB.__init__ copies this into each instance's ``caldir`` attribute).
CALDIR = "calibrations"

# Maps each calibration type name to the tags a file must carry to be
# accepted as that type: CalDB.store_calibration() only writes an AstroData
# object to disk if its ``tags`` are a superset of the list given here.
REQUIRED_TAG_DICT = {'processed_arc': ['PROCESSED', 'ARC'],
                     'processed_bias': ['PROCESSED', 'BIAS'],
                     'processed_dark': ['PROCESSED', 'DARK'],
                     'processed_flat': ['PROCESSED', 'FLAT'],
                     'processed_fringe': ['PROCESSED', 'FRINGE'],
                     'processed_standard': ['PROCESSED', 'STANDARD'],
                     'processed_slitillum': ['PROCESSED', 'SLITILLUM'],
                     'processed_bpm': ['PROCESSED', 'BPM'],
                     'processed_slitflat': ['PROCESSED', 'FLAT', 'SLITV'],
                     'processed_slit': ['PROCESSED', 'SLITV'],
                     'bpm': ['BPM'],
                     'mask': ['MASK'],
                     }

# Default set of calibration types a CalDB will accept. Note that this is a
# dict *view*, not a list: it supports ``in`` tests and iteration, and it
# reflects any later changes made to REQUIRED_TAG_DICT.
VALID_CALTYPES = REQUIRED_TAG_DICT.keys()

def cascade(fn):
    """
    Decorates methods such that, after execution, the method with the same
    name is called on the next CalDB instance in the cascade, with the same
    arguments.

    Parameters
    ----------
    fn : callable
        the method to decorate; its first argument must be an object with
        a ``nextdb`` attribute (either None or the next object in the chain)

    Returns
    -------
    callable : the wrapped method. Return values of ``fn`` (on this and on
        subsequent instances) are discarded, so the wrapper returns None.
    """
    # wraps() keeps the original method's __name__ and docstring, so that
    # introspection and generated documentation see the real method rather
    # than a generic "wrapper".
    @wraps(fn)
    def wrapper(instance, *args, **kwargs):
        fn(instance, *args, **kwargs)
        if instance.nextdb is not None:
            # Look the method up by name on the next instance so that its
            # own (possibly overridden and decorated) version is the one
            # that runs, continuing the cascade.
            getattr(instance.nextdb, fn.__name__)(*args, **kwargs)
    return wrapper
class CalDB(metaclass=abc.ABCMeta):
    """
    The parent class for all types of calibration database handlers.

    Instances can be chained via the ``nextdb`` attribute to form a cascade:
    retrieval requests that one database cannot fulfil are passed to the
    next, and storage/assignment operations are repeated on every database
    in the chain.

    Attributes
    ----------
    name : str
        a name for this database (gets reported with the calibration, so
        there is some provenance)
    log : logger object
        the logger object to report to
    get_cal : bool
        allow retrievals from this database?
    store_cal : bool
        allow new calibrations to be stored to this database?
    caldir : str
        directory under which calibrations will be stored (either when
        created, or when retrieved from a RemoteDB)
    procmode : str
        the default processing mode that retrieved calibrations must be
        suitable for
    nextdb : CalDB object/None
        a place to send unfulfilled calibration requests
    _valid_caltypes : collection of str
        valid calibration types for retrieval/storage
    """

    def __init__(self, name=None, get_cal=True, store_cal=False,
                 valid_caltypes=None, procmode=None, log=None):
        self._valid_caltypes = valid_caltypes or VALID_CALTYPES
        self.caldir = CALDIR
        self.name = name
        self.get_cal = get_cal
        self.store_cal = store_cal
        self.procmode = procmode
        self.nextdb = None
        self.log = log or logutils.get_logger(__name__)

    def __getattr__(self, attr):
        """
        A shortcut to avoid having to define every single
        get_processed_cheese method to simply call
        get_calibrations(caltype="processed_cheese")

        Python only calls this when normal attribute lookup has failed, so
        real attributes and methods are unaffected.
        """
        if (attr.startswith("get_processed_") and
                attr[4:] in self._valid_caltypes):
            # attr[4:] strips the leading "get_", leaving the caltype name
            return partial(self.get_calibrations, caltype=attr[4:])
        raise AttributeError("Unknown attribute {!r}".format(attr))

    def __len__(self):
        """Return the number of separate databases"""
        return 1 if self.nextdb is None else 1 + len(self.nextdb)

    def __getitem__(self, item):
        """
        Return the CalDB instance at that position in the list

        Negative indices count from the end of the cascade, as for a list,
        so ``db[-1]`` is the last database and ``db[-len(db)]`` is ``db``.

        Raises
        ------
        TypeError : if the index is not an integer
        IndexError : if the index lies outside the cascade
        """
        if not isinstance(item, (int, np.integer)):
            raise TypeError("Index must me an integer")
        if item < 0:
            # Convert to the equivalent non-negative index. The lower bound
            # is inclusive: -len(self) refers to the first database.
            ndb = len(self)
            if item < -ndb:
                raise IndexError("index out of range")
            item += ndb
        if item == 0:
            return self
        # Test against None explicitly: truth-testing a CalDB would call
        # __len__ and walk the remainder of the chain for no benefit.
        if self.nextdb is not None:
            return self.nextdb[item - 1]
        raise IndexError("index out of range")

    def add_database(self, db):
        """
        Adds a database to the end of the cascade. It does this recursively,
        calling this method on the next database in the cascade, until it
        gets to the end.

        Parameters
        ----------
        db : CalDB instance
            the database to add to the end of this cascade
        """
        if self.nextdb is None:
            self.nextdb = db
        else:
            self.nextdb.add_database(db)

    def get_calibrations(self, adinputs, caltype=None, procmode=None,
                         howmany=1):
        """
        Returns the requested calibration type for the list of files
        provided.

        This database is queried first (if ``get_cal`` is set); any inputs
        still lacking a calibration are then passed to the next database
        in the cascade, and the results are merged back in input order.

        Parameters
        ----------
        adinputs : list of AstroData objects
            the files requiring calibrations
        caltype : str
            the type of calibration required
        procmode : str/None
            minimum quality of processing needed for the calibration
            (None means use this database's default ``procmode``)
        howmany : int
            maximum number of calibrations to return (for backwards compat).
            Use howmany>1 at your own risk!

        Returns
        -------
        CalReturn : the calibration filenames and the databases which
            provided them

        Raises
        ------
        ValueError : if ``caltype`` is not one of the valid calibration types
        """
        if caltype not in self._valid_caltypes:
            raise ValueError("Calibration type {!r} not recognized".
                             format(caltype))
        if procmode is None:
            procmode = self.procmode

        # TODO: Ideally this would build the cal_requests only once at the
        # start and then pass on the unfulfilled requests, instead of the
        # AD objects. But currently the cal_requests are slightly different
        # for local and remote DBs (because of the "Section" namedtuple) so
        # this isn't possible just yet.
        cal_ret = None
        if self.get_cal:
            # Deliberately best-effort: a failure in one database is logged
            # and must not prevent the rest of the cascade being queried.
            try:
                cal_ret = self._get_calibrations(adinputs, caltype=caltype,
                                                 procmode=procmode,
                                                 howmany=howmany)
            except Exception as calex:
                self.log.warning(f"Error getting calibrations, continuing: {calex}")

        if cal_ret is None:
            cal_ret = CalReturn([None] * len(adinputs))

        if (self.nextdb is not None and
                any(cal is None for cal in cal_ret.files)):
            # Pass adinputs without calibrations to next database (with
            # possible recursion), forwarding howmany so that every
            # database in the cascade honours the caller's request.
            new_ret = self.nextdb.get_calibrations(
                [ad for ad, cal in zip(adinputs, cal_ret.files)
                 if cal is None],
                caltype=caltype, procmode=procmode, howmany=howmany)

            # Insert any new calibrations into the original list. Both the
            # file and its origin are replaced together, keyed on the file
            # being missing, so the two lists cannot get out of step.
            new_pairs = zip(new_ret.files, new_ret.origins)
            merged = [next(new_pairs) if cal is None else (cal, origin)
                      for cal, origin in zip(cal_ret.files, cal_ret.origins)]
            return CalReturn([cal for cal, _ in merged],
                             [origin for _, origin in merged])
        return cal_ret

    @cascade
    def set_calibrations(self, adinputs, caltype=None, calfile=None):
        """
        Manually assign files as calibrations of a specified type to a list
        of one or more AstroData objects.

        Parameters
        ----------
        adinputs: list of AstroData objects
            the files to be assigned calibrations
        caltype : str
            the type of calibration being assigned
        calfile : str
            the filename of the calibration
        """
        self._set_calibrations(adinputs, caltype=caltype, calfile=calfile)

    @cascade
    def unset_calibrations(self, adinputs, caltype=None):
        """
        Manually remove calibrations of a specified type from a list of
        one or more AstroData objects.

        Parameters
        ----------
        adinputs: list of AstroData objects
            the files to have calibrations removed
        caltype : str
            the type of calibration to be removed
        """
        self._unset_calibrations(adinputs, caltype=caltype)

    @cascade
    def clear_calibrations(self):
        """
        Clear all manually-assigned calibrations.
        """
        self._clear_calibrations()

    def store_calibration(self, cal, caltype=None):
        """
        Handle storage of calibrations. For all types of calibration (NOT
        processed_science) this saves the file to disk in the appropriate
        calibrations/ subdirectory and then passes the filename down the
        chain. For processed_science files, it passes the AstroData object.

        NOTE(review): this method does not itself consult ``store_cal``;
        presumably honouring that flag is left to the ``_store_calibration``
        implementations -- confirm against the subclasses.

        Parameters
        ----------
        cal : str/AstroData
            the calibration or its filename
        caltype : str
            type of calibration
        """
        # Create storage directory if it doesn't exist and we've got an AD
        # object and this is not a processed science image. This will only
        # happen with the first CalDB to store, since the calibration will
        # be forwarded as a filename. Processed science files are simply
        # passed through as AD objects since they don't get stored on disk
        # in a calibrations/ subdirectory.
        if not (isinstance(cal, str) or "science" in caltype):
            required_tags = REQUIRED_TAG_DICT[caltype]
            if not cal.tags.issuperset(required_tags):
                # Not storing here also means nothing is forwarded to the
                # databases further down the cascade.
                self.log.warning(
                    f"File {cal.filename} is not recognized as a"
                    f" {caltype}. Not storing as a calibration.")
                return
            caltype_dir = os.path.join(self.caldir, caltype)
            # exist_ok avoids the check-then-create race if another process
            # makes the directory at the same time.
            os.makedirs(caltype_dir, exist_ok=True)
            fname = os.path.join(caltype_dir,
                                 os.path.basename(cal.filename))
            cal.write(fname, overwrite=True)
            cal = fname

        # This cannot use the @cascade decorator because the argument that
        # is forwarded (the filename) may differ from the one received.
        self._store_calibration(cal, caltype=caltype)
        if self.nextdb is not None:
            self.nextdb.store_calibration(cal, caltype=caltype)

    # The two abstract methods must be implemented by every concrete
    # database class; the remaining hooks default to doing nothing.
    @abc.abstractmethod
    def _get_calibrations(self, adinputs, caltype=None, procmode=None,
                          howmany=1):
        # howmany is part of the signature because get_calibrations()
        # always passes it as a keyword argument.
        pass

    @abc.abstractmethod
    def _store_calibration(self, cals, caltype=None):
        pass

    def _set_calibrations(self, adinputs, caltype=None, calfile=None):
        pass

    def _unset_calibrations(self, adinputs, caltype=None):
        pass

    def _clear_calibrations(self):
        pass
class CalReturn:
    """
    A simple class to store files and origins (i.e., which CalDB instance
    provided the calibration) for calibration requests. The class can be
    instantiated with either a list of files and a list of origins, or a
    single iterable of 2-tuples (file, origin), where a single None can be
    used in place of (None, None).

    Public attributes are:
        files : list of files (or list of lists of files)
        origins : list of origins

    There is also an items() method that returns these lists as zipped pairs,
    like the dict.items() method. Iteration is set up to return first the
    complete list of files, then the complete list of origins, which helps
    with coding the cheeseCorrect() primitives.
    """

    def __init__(self, *args):
        """
        Parameters
        ----------
        *args : either one iterable of (file, origin) pairs (any falsy
            entry, e.g. None, stands for a missing calibration), or two
            sequences: the files and the origins, which are stored as given

        Raises
        ------
        ValueError : if called with anything other than 1 or 2 arguments
        """
        if len(args) == 1:
            # Materialize the pairs once, so that a one-shot iterable (e.g.
            # a generator) populates both lists rather than being exhausted
            # while building the first.
            pairs = [item if item else (None, None) for item in args[0]]
            self.files = [pair[0] for pair in pairs]
            self.origins = [pair[1] for pair in pairs]
        elif len(args) == 2:
            self.files, self.origins = args
        else:
            raise ValueError("CalReturn must be initialized with 1 or 2 "
                             "arguments")

    def __repr__(self):
        return f"{type(self).__name__}({self.files!r}, {self.origins!r})"

    def __len__(self):
        """Return the number of calibration entries (one per request)"""
        return len(self.files)

    def __iter__(self):
        """Yield the list of files, then the list of origins"""
        yield from (self.files, self.origins)

    def items(self):
        """Return a list of (file, origin) pairs"""
        return list(zip(self.files, self.origins))