Source code for astrodata.nddata

"""
This module implements a derivative class based on NDData with some Mixins,
implementing windowing and on-the-fly data scaling.
"""


import warnings
from copy import deepcopy
from functools import reduce

from astropy.nddata import (NDData, NDSlicingMixin, NDArithmeticMixin,
                            VarianceUncertainty)
from astropy.io.fits import ImageHDU
from astropy.modeling import models, Model

import numpy as np

from astropy.wcs import WCS
from gwcs.wcs import WCS as gWCS

INTEGER_TYPES = (int, np.integer)

__all__ = ['NDAstroData']


class ADVarianceUncertainty(VarianceUncertainty):
    """
    Subclass VarianceUncertainty to check for negative values.
    """
    @VarianceUncertainty.array.setter
    def array(self, value):
        if value is not None and np.any(value < 0):
            warnings.warn("Negative variance values found. Setting to zero.",
                          RuntimeWarning)
            value = np.where(value >= 0., value, 0.)
        VarianceUncertainty.array.fset(self, value)


class FakeArray:

    def __init__(self, very_faked):
        self.data = very_faked
        self.shape = (100, 100)  # Won't matter. This is just to fool NDData
        self.dtype = np.float32  # Same here

    def __getitem__(self, index):
        # FAKE NEWS!
        return None

    def __array__(self):
        return self.data


class NDWindowing:

    def __init__(self, target):
        self._target = target

    def __getitem__(self, slice):
        return NDWindowingAstroData(self._target, window=slice)


class NDWindowingAstroData(NDArithmeticMixin, NDSlicingMixin, NDData):
    """
    Allows "windowed" access to some properties of an ``NDAstroData`` instance.
    In particular, ``data``, ``uncertainty``, ``variance``, and ``mask`` return
    clipped data.
    """
    def __init__(self, target, window):
        self._target = target
        self._window = window

    @property
    def unit(self):
        return self._target.unit

    @property
    def wcs(self):
        return self._target._slice_wcs(self._window)

    @property
    def data(self):
        return self._target._get_simple('_data', section=self._window)

    @property
    def uncertainty(self):
        return self._target._get_uncertainty(section=self._window)

    @property
    def variance(self):
        if self.uncertainty is not None:
            return self.uncertainty.array

    @property
    def mask(self):
        return self._target._get_simple('_mask', section=self._window)


def is_lazy(item):
    return isinstance(item, ImageHDU) or (hasattr(item, 'lazy') and item.lazy)


[docs]class NDAstroData(NDArithmeticMixin, NDSlicingMixin, NDData): """ Implements ``NDData`` with all Mixins, plus some ``AstroData`` specifics. This class implements an ``NDData``-like container that supports reading and writing as implemented in the ``astropy.io.registry`` and also slicing (indexing) and simple arithmetics (add, subtract, divide and multiply). A very important difference between ``NDAstroData`` and ``NDData`` is that the former attempts to load all its data lazily. There are also some important differences in the interface (eg. ``.data`` lets you reset its contents after initialization). Documentation is provided where our class differs. See also -------- NDData NDArithmeticMixin NDSlicingMixin Examples -------- The mixins allow operation that are not possible with ``NDData`` or ``NDDataBase``, i.e. simple arithmetics:: >>> from astropy.nddata import StdDevUncertainty >>> import numpy as np >>> data = np.ones((3,3), dtype=np.float) >>> ndd1 = NDAstroData(data, uncertainty=StdDevUncertainty(data)) >>> ndd2 = NDAstroData(data, uncertainty=StdDevUncertainty(data)) >>> ndd3 = ndd1.add(ndd2) >>> ndd3.data array([[2., 2., 2.], [2., 2., 2.], [2., 2., 2.]]) >>> ndd3.uncertainty.array array([[1.41421356, 1.41421356, 1.41421356], [1.41421356, 1.41421356, 1.41421356], [1.41421356, 1.41421356, 1.41421356]]) see ``NDArithmeticMixin`` for a complete list of all supported arithmetic operations. But also slicing (indexing) is possible:: >>> ndd4 = ndd3[1,:] >>> ndd4.data array([2., 2., 2.]) >>> ndd4.uncertainty.array array([1.41421356, 1.41421356, 1.41421356]) See ``NDSlicingMixin`` for a description how slicing works (which attributes) are sliced. """ def __init__(self, data, uncertainty=None, mask=None, wcs=None, meta=None, unit=None, copy=False, window=None): super().__init__(FakeArray(data) if is_lazy(data) else data, None if is_lazy(uncertainty) else uncertainty, mask, wcs, meta, unit, copy) if is_lazy(data): self.data = data if is_lazy(uncertainty): self.uncertainty = uncertainty def __deepcopy__(self, memo): new = self.__class__(self._data if is_lazy(self._data) else deepcopy(self.data, memo), self._uncertainty if is_lazy(self._uncertainty) else None, self._mask if is_lazy(self._mask) else deepcopy(self.mask, memo), deepcopy(self.wcs, memo), None, self.unit) new.meta = deepcopy(self.meta, memo) # Needed to avoid recursion because of uncertainty's weakref to self if not is_lazy(self._uncertainty): new.variance = deepcopy(self.variance) return new def _arithmetic(self, operation, operand, propagate_uncertainties=True, handle_mask=np.bitwise_or, handle_meta=None, uncertainty_correlation=0, compare_wcs='first_found', **kwds): """ Override the NDData method so that "bitwise_or" becomes the default operation to combine masks, rather than "logical_or" """ return super()._arithmetic( operation, operand, propagate_uncertainties=propagate_uncertainties, handle_mask=handle_mask, handle_meta=handle_meta, uncertainty_correlation=uncertainty_correlation, compare_wcs=compare_wcs, **kwds) def _slice_wcs(self, slices): """ gWCS doesn't appear to conform to the APE 14 interface for WCS implementations, and doesn't react to slicing properly. We override NDSlicing's method to do what we want. """ if not isinstance(self.wcs, gWCS): return self.wcs # Sanitize the slices, catching some errors early if not isinstance(slices, (tuple, list)): slices = (slices,) slices = list(slices) ndim = len(self.shape) if len(slices) > ndim: raise ValueError(f"Too many dimensions specified in slice {slices}") if Ellipsis in slices: if slices.count(Ellipsis) > 1: raise IndexError("Only one ellipsis can be specified in a slice") ell_index = slices.index(Ellipsis) slices[ell_index:ell_index+1] = [slice(None)] * (ndim - len(slices) + 1) slices.extend([slice(None)] * (ndim-len(slices))) mods = [] mapped_axes = [] for i, (slice_, length) in enumerate(zip(slices[::-1], self.shape)): model = [] if isinstance(slice_, slice): if slice_.step and slice_.step > 1: raise IndexError("Cannot slice with a step") if slice_.start: start = length + slice_.start if slice_.start < 1 else slice_.start if start > 0: model.append(models.Shift(start)) mapped_axes.append(max(mapped_axes)+1 if mapped_axes else 0) elif isinstance(slice_, INTEGER_TYPES): model.append(models.Const1D(slice_)) mapped_axes.append(-1) else: raise IndexError("Slice not an integer or range") if model: mods.append(reduce(Model.__or__, model)) else: # If the previous model was an Identity, we can hang this # one onto that without needing to append a new Identity if i > 0 and isinstance(mods[-1], models.Identity): mods[-1] = models.Identity(mods[-1].n_inputs + 1) else: mods.append(models.Identity(1)) slicing_model = reduce(Model.__and__, mods) if mapped_axes != list(np.arange(ndim)): mapped_axes = [max(x, 0) for x in mapped_axes] slicing_model = models.Mapping(mapped_axes) | slicing_model if isinstance(slicing_model, models.Identity) and slicing_model.n_inputs == ndim: return self.wcs # Unchanged! new_wcs = deepcopy(self.wcs) new_wcs.insert_transform(new_wcs.input_frame, slicing_model, after=True) return new_wcs @property def window(self): """ Interface to access a section of the data, using lazy access whenever possible. Returns -------- An instance of ``NDWindowing``, which provides ``__getitem__``, to allow the use of square brackets when specifying the window. Ultimately, an ``NDWindowingAstrodata`` instance is returned Examples --------- >>> ad[0].nddata.window[100:200, 100:200] # doctest: +SKIP <NDWindowingAstrodata .....> """ return NDWindowing(self) @property def shape(self): return self._data.shape def _get_uncertainty(self, section=None): """Return the ADVarianceUncertainty object, or a slice of it.""" if self._uncertainty is not None: if is_lazy(self._uncertainty): if section is None: self.uncertainty = ADVarianceUncertainty(self._uncertainty.data) return self.uncertainty else: return ADVarianceUncertainty(self._uncertainty[section]) elif section is not None: return self._uncertainty[section] else: return self._uncertainty def _get_simple(self, target, section=None): source = getattr(self, target) if source is not None: if is_lazy(source): if section is None: ret = np.empty(source.shape, dtype=source.dtype) ret[:] = source.data setattr(self, target, ret) else: ret = source[section] return ret elif section is not None: return np.array(source, copy=False)[section] else: return np.array(source, copy=False) @property def data(self): """ An array representing the raw data stored in this instance. It implements a setter. """ return self._get_simple('_data') @data.setter def data(self, value): if value is None: raise ValueError("Cannot have None as the data value for an NDData object") if is_lazy(value): self.meta['header'] = value.header self._data = value @property def uncertainty(self): return self._get_uncertainty() @uncertainty.setter def uncertainty(self, value): if value is not None and not is_lazy(value): if value._parent_nddata is not None: value = value.__class__(value, copy=False) value.parent_nddata = self self._uncertainty = value @property def mask(self): return self._get_simple('_mask') @mask.setter def mask(self, value): self._mask = value @property def variance(self): """ A convenience property to access the contents of ``uncertainty``, squared (as the uncertainty data is stored as standard deviation). """ arr = self._get_uncertainty() if arr is not None: return arr.array @variance.setter def variance(self, value): self.uncertainty = (ADVarianceUncertainty(value) if value is not None else None)
[docs] def set_section(self, section, input): """ Sets only a section of the data. This method is meant to prevent fragmentation in the Python heap, by reusing the internal structures instead of replacing them with new ones. Args ----- section : ``slice`` The area that will be replaced input : ``NDData``-like instance This object needs to implement at least ``data``, ``uncertainty``, and ``mask``. Their entire contents will replace the data in the area defined by ``section``. Examples --------- >>> sec = NDData(np.zeros((100,100))) # doctest: +SKIP >>> ad[0].nddata.set_section((slice(None,100),slice(None,100)), sec) # doctest: +SKIP """ self.data[section] = input.data if self.uncertainty is not None: self.uncertainty.array[section] = input.uncertainty.array if self.mask is not None: self.mask[section] = input.mask
def __repr__(self): if is_lazy(self._data): return self.__class__.__name__ + '(Memmapped)' else: return super().__repr__() @property def T(self): return self.transpose()
[docs] def transpose(self): unc = self.uncertainty return self.__class__( self.data.T, uncertainty=None if unc is None else unc.__class__(unc.array.T), mask=None if self.mask is None else self.mask.T, copy=False )
@property def wcs(self): return super().wcs @wcs.setter def wcs(self, value): if value is not None and not isinstance(value, (WCS, gWCS)): raise TypeError("wcs value must be None or a WCS object") self._wcs = value