import gc
import inspect
import logging
import os
import re
import traceback
import warnings
from collections import OrderedDict
from copy import deepcopy
from functools import partial, wraps
from io import BytesIO
from itertools import zip_longest, product as cart_product
from .core import AstroData, DataProvider, astro_data_descriptor
from .nddata import NDAstroData as NDDataObject, ADVarianceUncertainty
from . import wcs as adwcs
import astropy
from astropy.io import fits
from astropy.io.fits import HDUList, DELAYED
from astropy.io.fits import PrimaryHDU, ImageHDU, BinTableHDU, TableHDU
from astropy.io.fits import Column, FITS_rec
from astropy.io.fits.hdu.table import _TableBaseHDU
from astropy import units as u
# NDDataRef is still not in the stable astropy, but this should be the one
# we use in the future...
# from astropy.nddata import NDData, NDDataRef as NDDataObject
from astropy.nddata import NDData
from astropy.table import Table
import numpy as np
from gwcs.wcs import WCS as gWCS
from gwcs import coordinate_frames as cf
INTEGER_TYPES = (int, np.integer)
NO_DEFAULT = object()
LOGGER = logging.getLogger(__name__)
[docs]class AstroDataFitsDeprecationWarning(DeprecationWarning):
pass
warnings.simplefilter("always", AstroDataFitsDeprecationWarning)
[docs]def deprecated(reason):
def decorator_wrapper(fn):
@wraps(fn)
def wrapper(*args, **kw):
current_source = '|'.join(traceback.format_stack(inspect.currentframe()))
if current_source not in wrapper.seen:
wrapper.seen.add(current_source)
warnings.warn(reason, AstroDataFitsDeprecationWarning)
return fn(*args, **kw)
wrapper.seen = set()
return wrapper
return decorator_wrapper
[docs]class KeywordCallableWrapper:
def __init__(self, keyword, default=NO_DEFAULT, on_ext=False, coerce_with=None):
self.kw = keyword
self.on_ext = on_ext
self.default = default
self.coercion_fn = coerce_with if coerce_with is not None else (lambda x: x)
def __call__(self, adobj):
def wrapper():
manip = adobj.phu if not self.on_ext else adobj.hdr
if self.default is NO_DEFAULT:
ret = getattr(manip, self.kw)
else:
ret = manip.get(self.kw, self.default)
return self.coercion_fn(ret)
return wrapper
[docs]def new_imagehdu(data, header, name=None):
# Assigning data in a delayed way, won't reset BZERO/BSCALE in the header,
# for some reason. Need to investigated. Maybe astropy.io.fits bug. Figure
# out WHY were we delaying in the first place.
# i = ImageHDU(data=DELAYED, header=header.copy(), name=name)
# i.data = data
return ImageHDU(data=data, header=header.copy(), name=name)
[docs]def table_to_bintablehdu(table, extname=None):
"""
Convert an astropy Table object to a BinTableHDU before writing to disk.
Parameters
----------
table: astropy.table.Table instance
the table to be converted to a BinTableHDU
extname: str
name to go in the EXTNAME field of the FITS header
Returns
-------
BinTableHDU
"""
add_header_to_table(table)
array = table.as_array()
header = table.meta['header'].copy()
if extname:
header['EXTNAME'] = (extname, 'added by AstroData')
coldefs = []
for n, name in enumerate(array.dtype.names, 1):
coldefs.append(Column(
name = header.get('TTYPE{}'.format(n)),
format = header.get('TFORM{}'.format(n)),
unit = header.get('TUNIT{}'.format(n)),
null = header.get('TNULL{}'.format(n)),
bscale = header.get('TSCAL{}'.format(n)),
bzero = header.get('TZERO{}'.format(n)),
disp = header.get('TDISP{}'.format(n)),
start = header.get('TBCOL{}'.format(n)),
dim = header.get('TDIM{}'.format(n)),
array = array[name]
))
return BinTableHDU(data=FITS_rec.from_columns(coldefs), header=header)
header_type_map = {
'bool': 'L',
'int8': 'B',
'int16': 'I',
'int32': 'J',
'int64': 'K',
'uint8': 'B',
'uint16': 'I',
'uint32': 'J',
'uint64': 'K',
'float32': 'E',
'float64': 'D',
'complex64': 'C',
'complex128': 'M'
}
[docs]def card_filter(cards, include=None, exclude=None):
for card in cards:
if include is not None and card[0] not in include:
continue
elif exclude is not None and card[0] in exclude:
continue
yield card
[docs]def normalize_indices(slc, nitems):
multiple = True
if isinstance(slc, slice):
start, stop, step = slc.indices(nitems)
indices = list(range(start, stop, step))
elif isinstance(slc, INTEGER_TYPES) or (isinstance(slc, tuple) and all(isinstance(i, INTEGER_TYPES) for i in slc)):
if isinstance(slc, INTEGER_TYPES):
slc = (int(slc),) # slc's type m
multiple = False
else:
multiple = True
# Normalize negative indices...
indices = [(x if x >= 0 else nitems + x) for x in slc]
else:
raise ValueError("Invalid index: {}".format(slc))
if any(i >= nitems for i in indices):
raise IndexError("Index out of range")
return indices, multiple
[docs]class FitsProviderProxy(DataProvider):
# TODO: CAVEAT. Not all methods are intercepted. Some, like "info", may not
# make sense for slices. If a method of interest is identified, we need to
# implement it properly, or make it raise an exception if not valid.
def __init__(self, provider, mapping, single):
# We're overloading __setattr__. This is safer than setting the
# attributes the normal way.
self.__dict__.update({
'_provider': provider,
'_mapping': tuple(mapping),
'_sliced': True,
'_single': single
})
@property
def is_sliced(self):
return True
@property
def is_single(self):
return self._single
def __deepcopy__(self, memo):
return self._provider._clone(mapping=self._mapping)
[docs] def is_settable(self, attr):
if attr in {'path', 'filename'}:
return False
return self._provider.is_settable(attr)
def __len__(self):
return len(self._mapping)
def _mapped_nddata(self, idx=None):
if idx is None:
return [self._provider._nddata[idx] for idx in self._mapping]
else:
return self._provider._nddata[self._mapping[idx]]
def __getattr__(self, attribute):
if not attribute.startswith('_'):
try:
# Check first if this is something we can get from the main object
# But only if it's not an internal attribute
try:
return self._provider._getattr_impl(attribute, self._mapped_nddata())
except AttributeError:
# Not a special attribute. Check the regular interface
return getattr(self._provider, attribute)
except AttributeError:
pass
# Not found in the real Provider. Ok, if we're working with single
# slices, let's look some things up in the ND object
if self.is_single:
if attribute.isupper():
try:
return self._mapped_nddata(0).meta['other'][attribute]
except KeyError:
# Not found. Will raise an exception...
pass
raise AttributeError("{} not found in this object".format(attribute))
def __setattr__(self, attribute, value):
def _my_attribute(attr):
return attr in self.__dict__ or attr in self.__class__.__dict__
# This method is meant to let the user set certain attributes of the NDData
# objects. First we check if the attribute belongs to this object's dictionary.
# Otherwise, see if we can pass it down.
if not _my_attribute(attribute) and self._provider.is_settable(attribute):
if attribute.isupper():
if not self.is_single:
raise TypeError("This attribute can only be assigned to a single-slice object")
target = self._mapped_nddata(0)
self._provider.append(value, name=attribute, add_to=target)
return
elif attribute in {'path', 'filename'}:
# FIXME: never reached because path/filename are not settable
raise AttributeError("Can't set path or filename on a sliced object")
else:
setattr(self._provider, attribute, value)
super().__setattr__(attribute, value)
def __delattr__(self, attribute):
if not attribute.isupper():
raise ValueError("Can't delete non-capitalized attributes from slices")
if not self.is_single:
raise TypeError("Can't delete attributes on non-single slices")
other, otherh = self.nddata.meta['other'], self.nddata.meta['other_header']
if attribute in other:
del other[attribute]
if attribute in otherh:
del otherh[attribute]
else:
raise AttributeError("'{}' does not exist in this extension".format(attribute))
@property
def exposed(self):
return self._provider._exposed.copy() | set(self._mapped_nddata(0).meta['other'])
def __iter__(self):
if self._single:
yield self
else:
for n in self._mapping:
yield self._provider._slice((n,), multi=False)
def __getitem__(self, slc):
if self.is_single:
raise TypeError("Can't slice a single slice!")
indices, multiple = normalize_indices(slc, nitems=len(self))
mapped_indices = tuple(self._mapping[idx] for idx in indices)
return self._provider._slice(mapped_indices, multi=multiple)
def __delitem__(self, idx):
raise TypeError("Can't remove items from a sliced object")
def __iadd__(self, operand):
self._provider._standard_nddata_op(NDDataObject.add, operand, self._mapping)
return self
def __isub__(self, operand):
self._provider._standard_nddata_op(NDDataObject.subtract, operand, self._mapping)
return self
def __imul__(self, operand):
self._provider._standard_nddata_op(NDDataObject.multiply, operand, self._mapping)
return self
def __itruediv__(self, operand):
self._provider._standard_nddata_op(NDDataObject.divide, operand, self._mapping)
return self
def __rtruediv__(self, operand):
self._provider._oper(self._provider._rdiv, operand, self._mapping)
return self
@property
@deprecated("Access to headers through this property is deprecated and will be removed in the future. "
"Use '.hdr' instead.")
def header(self):
return self._provider._get_raw_headers(with_phu=True, indices=self._mapping)
@property
def data(self):
if self.is_single:
return self._mapped_nddata(0).data
else:
return [nd.data for nd in self._mapped_nddata()]
@data.setter
def data(self, value):
if not self.is_single:
raise ValueError("Trying to assign to an AstroData object that is not a single slice")
ext = self._mapped_nddata(0)
# Setting the ._data in the NDData is a bit kludgy, but we're all grown adults
# and know what we're doing, isn't it?
if hasattr(value, 'shape'):
ext._data = value
else:
raise AttributeError("Trying to assign data to be something with no shape")
@property
def uncertainty(self):
if self.is_single:
return self._mapped_nddata(0).uncertainty
else:
return [nd.uncertainty for nd in self._mapped_nddata()]
@uncertainty.setter
def uncertainty(self, value):
if not self.is_single:
raise ValueError("Trying to assign to an AstroData object that is not a single slice")
self._mapped_nddata(0).uncertainty = value
@property
def mask(self):
if self.is_single:
return self._mapped_nddata(0).mask
else:
return [nd.mask for nd in self._mapped_nddata()]
@mask.setter
def mask(self, value):
if not self.is_single:
raise ValueError("Trying to assign to an AstroData object that is not a single slice")
self._mapped_nddata(0).mask = value
@property
def variance(self):
if self.is_single:
return self._mapped_nddata(0).variance
else:
return [nd.variance for nd in self._mapped_nddata()]
@variance.setter
def variance(self, value):
if not self.is_single:
raise ValueError("Trying to assign to an AstroData object that is not a single slice")
nd = self._mapped_nddata(0)
if value is None:
nd.uncertainty = None
else:
nd.uncertainty = ADVarianceUncertainty(value)
@property
def nddata(self):
if not self.is_single:
return self._mapped_nddata()
else:
return self._mapped_nddata(0)
@property
def shape(self):
if not self.is_single:
return [nd.shape for nd in self._mapped_nddata()]
else:
return self._mapped_nddata(0).shape
@property
def wcs(self):
if self.is_single:
return self._mapped_nddata(0).wcs
raise ValueError("Cannot return WCS for an AstroData object that is not a single slice")
@wcs.setter
def wcs(self, value):
if not self.is_single:
raise ValueError("Trying to assign to an AstroData object that is not a single slice")
self._mapped_nddata(0).wcs = value
[docs] def hdr(self):
headers = self._provider._get_raw_headers(indices=self._mapping)
return headers[0] if self.is_single else FitsHeaderCollection(headers)
def _crop_nd(self, nd, x1, y1, x2, y2):
# needed because __getattr__ breaks finding private methods in the
# parent class...
self._provider._crop_nd(nd, x1, y1, x2, y2)
def _crop_impl(self, x1, y1, x2, y2, nds=None):
# needed because __getattr__ breaks finding private methods in the
# parent class...
self._provider._crop_impl(x1, y1, x2, y2, nds=nds)
[docs] def crop(self, x1, y1, x2, y2):
self._crop_impl(x1, y1, x2, y2, self._mapped_nddata())
[docs] def append(self, ext, name):
if not self.is_single:
# TODO: We could rethink this one, but leave it like that at the moment
raise TypeError("Can't append objects to non-single slices")
elif name is None:
raise TypeError("Can't append objects to a slice without an extension name")
target = self._mapped_nddata(0)
return self._provider.append(ext, name=name, add_to=target)
[docs] def extver_map(self):
"""
Provide a mapping between the FITS EXTVER of an extension and the index
that will be used to access it within this object.
Returns
-------
A dictionary `{EXTVER:index, ...}`
Raises
------
ValueError
If used against a single slice. It is of no use in that situation.
"""
if self.is_single:
raise ValueError("Trying to get a mapping out of a single slice")
return self._provider._extver_impl(self._mapped_nddata())
[docs] def info(self, tags):
self._provider.info(tags, indices=self._mapping)
[docs]class FitsProvider(DataProvider):
default_extension = 'SCI'
def __init__(self):
# We're overloading __setattr__. This is safer than setting the
# attributes the normal way.
self.__dict__.update({
'_sliced': False,
'_single': False,
'_phu': None,
'_nddata': [],
'_path': None,
'_orig_filename': None,
'_tables': {},
'_exposed': set(),
'_resetting': False,
'_fixed_settable': {
'data',
'uncertainty',
'mask',
'variance',
'wcs',
'path',
'filename'
}
})
def __deepcopy__(self, memo):
nfp = FitsProvider()
to_copy = ('_sliced', '_phu', '_single', '_nddata',
'_path', '_orig_filename', '_tables', '_exposed',
'_resetting')
for attr in to_copy:
nfp.__dict__[attr] = deepcopy(self.__dict__[attr])
# Top-level tables
for key in set(self.__dict__) - set(nfp.__dict__):
nfp.__dict__[key] = nfp.__dict__['_tables'][key]
return nfp
def _clone(self, mapping=None):
if mapping is None:
mapping = range(len(self))
dp = FitsProvider()
dp._phu = deepcopy(self._phu)
for n in mapping:
dp.append(deepcopy(self._nddata[n]))
for t in self._tables.values():
dp.append(deepcopy(t))
return dp
[docs] def is_settable(self, attr):
return attr in self._fixed_settable or attr.isupper()
def _getattr_impl(self, attribute, nds):
# Exposed objects are part of the normal object interface. We may have
# just lazy-loaded them, and that's why we get here...
if attribute in self._exposed:
return getattr(self, attribute)
# Check if it's an aliased object
for nd in nds:
if nd.meta.get('name') == attribute:
return nd
raise AttributeError("Not found")
def __getattr__(self, attribute):
try:
return self._getattr_impl(attribute, self._nddata)
except AttributeError:
raise AttributeError("{} not found in this object, or available only for sliced data".format(attribute))
def __setattr__(self, attribute, value):
def _my_attribute(attr):
return attr in self.__dict__ or attr in self.__class__.__dict__
# This method is meant to let the user set certain attributes of the NDData
# objects.
#
# self._resetting shortcircuits the method when populating the object. In that
# situation, we don't want to interfere. Of course, we need to check first
# if self._resetting is there, because otherwise we enter a loop..
# CJS 20200131: if the attribute is "exposed" then we should set it via the
# append method I think (it's a Table or something)
if ('_resetting' in self.__dict__ and not self._resetting and
(not _my_attribute(attribute) or attribute in self._exposed)):
if attribute.isupper():
self.append(value, name=attribute, add_to=None)
return
# Fallback
super().__setattr__(attribute, value)
def __delattr__(self, attribute):
# TODO: So far we're only deleting tables by name.
# Figure out what to do with aliases
if not attribute.isupper():
raise ValueError("Can't delete non-capitalized attributes")
try:
del self._tables[attribute]
del self.__dict__[attribute]
except KeyError:
raise AttributeError("'{}' is not a global table for this instance".format(attribute))
def _oper(self, operator, operand, indices=None):
if indices is None:
indices = tuple(range(len(self._nddata)))
if isinstance(operand, AstroData):
if len(operand) != len(indices):
raise ValueError("Operands are not the same size")
for n in indices:
try:
self._set_nddata(n, operator(self._nddata[n],
(operand.nddata if operand.is_single else operand.nddata[n])))
except TypeError:
# This may happen if operand is a sliced, single AstroData object
self._set_nddata(n, operator(self._nddata[n], operand.nddata))
op_table = operand.table()
ltab, rtab = set(self._tables), set(op_table)
for tab in (rtab - ltab):
# Fix for release/3.0.x - revert to _tables in master with the Simon refactor
self.append(op_table[tab], name=tab, add_to=None)
# self._tables[tab] = op_table[tab]
else:
for n in indices:
self._set_nddata(n, operator(self._nddata[n], operand))
def _standard_nddata_op(self, fn, operand, indices=None):
return self._oper(partial(fn, handle_mask=np.bitwise_or, handle_meta='first_found'),
operand, indices)
def __iadd__(self, operand):
self._standard_nddata_op(NDDataObject.add, operand)
return self
def __isub__(self, operand):
self._standard_nddata_op(NDDataObject.subtract, operand)
return self
def __imul__(self, operand):
self._standard_nddata_op(NDDataObject.multiply, operand)
return self
def __itruediv__(self, operand):
self._standard_nddata_op(NDDataObject.divide, operand)
return self
def __rtruediv__(self, operand):
self._oper(self._rdiv, operand)
return self
def _rdiv(self, ndd, operand):
# Divide method works with the operand first
return NDDataObject.divide(operand, ndd)
[docs] def set_phu(self, phu):
self._phu = phu
[docs] def info(self, tags, indices=None):
print("Filename: {}".format(self.path if self.path else "Unknown"))
# This is fixed. We don't support opening for update
# print("Mode: readonly")
tags = sorted(tags, reverse=True)
tag_line = "Tags: "
while tags:
new_tag = tags.pop() + ' '
if len(tag_line + new_tag) > 80:
print(tag_line)
tag_line = " " + new_tag
else:
tag_line = tag_line + new_tag
print(tag_line)
# Let's try to be generic. Could it be that some file contains only tables?
if indices is None:
indices = tuple(range(len(self._nddata)))
if indices:
main_fmt = "{:6} {:24} {:17} {:14} {}"
other_fmt = " .{:20} {:17} {:14} {}"
print("\nPixels Extensions")
print(main_fmt.format("Index", "Content", "Type", "Dimensions", "Format"))
for pi in self._pixel_info(indices):
main_obj = pi['main']
print(main_fmt.format(pi['idx'], main_obj['content'][:24], main_obj['type'][:17],
main_obj['dim'], main_obj['data_type']))
for other in pi['other']:
print(other_fmt.format(other['attr'][:20], other['type'][:17], other['dim'],
other['data_type']))
additional_ext = list(self._other_info())
if additional_ext:
print("\nOther Extensions")
print(" Type Dimensions")
for (attr, type_, dim) in additional_ext:
print(".{:13} {:11} {}".format(attr[:13], type_[:11], dim))
def _pixel_info(self, indices):
for idx, obj in ((n, self._nddata[k]) for (n, k) in enumerate(indices)):
other_objects = []
uncer = obj.uncertainty
fixed = (('variance', None if uncer is None else uncer), ('mask', obj.mask))
for name, other in fixed + tuple(sorted(obj.meta['other'].items())):
if other is not None:
if isinstance(other, Table):
other_objects.append(dict(
attr=name, type='Table',
dim=str((len(other), len(other.columns))),
data_type='n/a'
))
else:
dim = ''
if hasattr(other, 'dtype'):
dt = other.dtype.name
dim = str(other.shape)
elif hasattr(other, 'data'):
dt = other.data.dtype.name
dim = str(other.data.shape)
elif hasattr(other, 'array'):
dt = other.array.dtype.name
dim = str(other.array.shape)
else:
dt = 'unknown'
other_objects.append(dict(
attr=name, type=type(other).__name__,
dim=dim, data_type = dt
))
yield dict(
idx = '[{:2}]'.format(idx),
main = dict(
content = 'science',
type = type(obj).__name__,
dim = '({})'.format(', '.join(str(s) for s in obj.data.shape)),
data_type = obj.data.dtype.name
),
other = other_objects
)
def _other_info(self):
# NOTE: This covers tables, only. Study other cases before implementing a more general solution
if self._tables:
for name, table in sorted(self._tables.items()):
if type(table) is list:
# This is not a free floating table
continue
yield (name, 'Table', (len(table), len(table.columns)))
@property
def exposed(self):
return self._exposed.copy()
def _slice(self, indices, multi=True):
return FitsProviderProxy(self, indices, single=not multi)
def __iter__(self):
for n in range(len(self)):
yield self._slice((n,), multi=False)
def __getitem__(self, slc):
nitems = len(self._nddata)
indices, multiple = normalize_indices(slc, nitems=nitems)
return self._slice(indices, multi=multiple)
def __delitem__(self, idx):
del self._nddata[idx]
def __len__(self):
return len(self._nddata)
# NOTE: This one does not make reference to self at all. May as well
# move it out
def _process_table(self, table, name=None, header=None):
if isinstance(table, BinTableHDU):
obj = Table(table.data, meta={'header': header or table.header})
for i, col in enumerate(obj.columns, start=1):
try:
obj[col].unit = u.Unit(obj.meta['header']['TUNIT{}'.format(i)])
except (KeyError, TypeError, ValueError):
pass
elif isinstance(table, Table):
obj = Table(table)
if header is not None:
obj.meta['header'] = deepcopy(header)
elif 'header' not in obj.meta:
obj.meta['header'] = header_for_table(obj)
else:
raise ValueError("{} is not a recognized table type".format(table.__class__))
if name is not None:
obj.meta['header']['EXTNAME'] = name
return obj
def _get_max_ver(self):
try:
return max(_nd.meta['ver'] for _nd in self._nddata) + 1
except ValueError:
# This seems to be the first extension!
return 1
def _reset_ver(self, nd):
ver = self._get_max_ver()
nd.meta['header']['EXTVER'] = ver
nd.meta['ver'] = ver
try:
oheaders = nd.meta['other_header']
for extname, ext in nd.meta['other'].items():
try:
oheaders[extname]['EXTVER'] = ver
except KeyError:
pass
try:
# The object may keep the header on its own structure
ext.meta['header']['EXTVER'] = ver
except AttributeError:
pass
except KeyError:
pass
return ver
def _process_pixel_plane(self, pixim, name=None, top_level=False,
reset_ver=True, custom_header=None):
if not isinstance(pixim, NDDataObject):
# Assume that we get an ImageHDU or something that can be
# turned into one
if isinstance(pixim, ImageHDU):
nd = NDDataObject(pixim.data, meta={'header': pixim.header})
elif custom_header is not None:
nd = NDDataObject(pixim, meta={'header': custom_header})
else:
nd = NDDataObject(pixim, meta={'header': {}})
else:
nd = pixim
if custom_header is not None:
nd.meta['header'] = custom_header
header = nd.meta['header']
currname = header.get('EXTNAME')
ver = header.get('EXTVER', -1)
# TODO: Review the logic. This one seems bogus
if name and (currname is None):
header['EXTNAME'] = (name if name is not None
else FitsProvider.default_extension)
if top_level:
if 'other' not in nd.meta:
nd.meta['other'] = OrderedDict()
nd.meta['other_header'] = {}
if reset_ver or ver == -1:
self._reset_ver(nd)
else:
nd.meta['ver'] = ver
return nd
@property
def path(self):
return self._path
@path.setter
def path(self, value):
if self._path is None and value is not None:
self._orig_filename = os.path.basename(value)
self._path = value
@property
def filename(self):
if self.path is not None:
return os.path.basename(self.path)
@filename.setter
def filename(self, value):
if os.path.isabs(value):
raise ValueError("Cannot set the filename to an absolute path!")
elif self.path is None:
self.path = os.path.abspath(value)
else:
dirname = os.path.dirname(self.path)
self.path = os.path.join(dirname, value)
@property
def orig_filename(self):
return self._orig_filename
def _ext_header(self, obj):
if isinstance(obj, int):
# Assume that 'obj' is an index
obj = self.nddata[obj]
return obj.meta['header']
def _get_raw_headers(self, with_phu=False, indices=None):
if indices is None:
indices = range(len(self.nddata))
extensions = [self._ext_header(self.nddata[n]) for n in indices]
if with_phu:
return [self._phu] + extensions
return extensions
@property
@deprecated("Access to headers through this property is deprecated and will be removed in the future")
def header(self):
return self._get_raw_headers(with_phu=True)
@property
def nddata(self):
return self._nddata
[docs] def phu(self):
return self._phu
[docs] def hdr(self):
if not self.nddata:
return None
return FitsHeaderCollection(self._get_raw_headers())
[docs] def to_hdulist(self):
hlst = HDUList()
hlst.append(PrimaryHDU(header=self.phu(), data=DELAYED))
for ext in self._nddata:
meta = ext.meta
header, ver = meta['header'], meta['ver']
wcs = ext.wcs
if isinstance(wcs, gWCS):
# We don't have access to the AD tags so see if it's an image
# Catch ValueError as any sort of failure
try:
wcs_dict = adwcs.gwcs_to_fits(ext, self._phu)
except (ValueError, NotImplementedError) as e:
LOGGER.warning(e)
else:
# Must delete keywords if image WCS has been downscaled
# from a higher number of dimensions
for i in range(1, 5):
for kw in (f'CDELT{i}', f'CRVAL{i}', f'CUNIT{i}', f'CTYPE{i}'):
if kw in header:
del header[kw]
for j in range(1, 5):
for kw in (f'CD{i}_{j}', f'PC{i}_{j}', f'CRPIX{j}'):
if kw in header:
del header[kw]
header.update(wcs_dict)
# Use "in" here as the dict entry may be (value, comment)
if 'APPROXIMATE' not in wcs_dict.get('FITS-WCS', ''):
wcs = None # There's no need to create a WCS extension
hlst.append(new_imagehdu(ext.data, header, 'SCI'))
if ext.uncertainty is not None:
hlst.append(new_imagehdu(ext.uncertainty.array, header, 'VAR'))
if ext.mask is not None:
hlst.append(new_imagehdu(ext.mask, header, 'DQ'))
if isinstance(wcs, gWCS):
hlst.append(wcs_to_asdftablehdu(ext.wcs, extver=ver))
for name, other in meta.get('other', {}).items():
if isinstance(other, Table):
hlst.append(table_to_bintablehdu(other))
elif isinstance(other, np.ndarray):
hlst.append(new_imagehdu(other, meta['other_header'].get(name, meta['header']), name=name))
elif isinstance(other, NDDataObject):
hlst.append(new_imagehdu(other.data, meta['header']))
else:
raise ValueError("I don't know how to write back an object of type {}".format(type(other)))
if self._tables is not None:
for name, table in sorted(self._tables.items()):
hlst.append(table_to_bintablehdu(table, extname=name))
return hlst
[docs] def table(self):
return self._tables.copy()
@property
def tables(self):
return set(self._tables.keys())
@property
def shape(self):
return [nd.shape for nd in self._nddata]
@property
def data(self):
return [nd.data for nd in self._nddata]
@data.setter
def data(self, value):
raise ValueError("Trying to assign to a non-sliced AstroData object")
@property
def uncertainty(self):
return [nd.uncertainty for nd in self._nddata]
@uncertainty.setter
def uncertainty(self, value):
raise ValueError("Trying to assign to a non-sliced AstroData object")
@property
def mask(self):
return [nd.mask for nd in self._nddata]
@mask.setter
def mask(self, value):
raise ValueError("Trying to assign to a non-sliced AstroData object")
@property
def variance(self):
def variance_for(nd):
if nd.uncertainty is not None:
return nd.uncertainty.array
return [variance_for(nd) for nd in self._nddata]
@variance.setter
def variance(self, value):
raise ValueError("Trying to assign to a non-sliced AstroData object")
def _crop_nd(self, nd, x1, y1, x2, y2):
nd.data = nd.data[y1:y2+1, x1:x2+1]
if nd.uncertainty is not None:
nd.uncertainty = nd.uncertainty[y1:y2+1, x1:x2+1]
if nd.mask is not None:
nd.mask = nd.mask[y1:y2+1, x1:x2+1]
def _crop_impl(self, x1, y1, x2, y2, nds=None):
if nds is None:
nds = self.nddata
# TODO: Consider cropping of objects in the meta section
for nd in nds:
orig_shape = nd.data.shape
self._crop_nd(nd, x1, y1, x2, y2)
for o in nd.meta['other'].values():
try:
if o.shape == orig_shape:
self._crop_nd(o, x1, y1, x2, y2)
except AttributeError:
# No 'shape' attribute in the object. It's probably
# not array-like
pass
[docs] def crop(self, x1, y1, x2, y2):
self._crop_impl(x1, y1, x2, y2)
def _add_to_other(self, add_to, name, data, header=None):
meta = add_to.meta
meta['other'][name] = data
if header:
header['EXTVER'] = meta.get('ver', -1)
meta['other_header'][name] = header
def _append_array(self, data, name=None, header=None, add_to=None):
def_ext = FitsProvider.default_extension
if add_to is None:
# Top level extension
# Special cases for Gemini
if name is None:
name = def_ext
if name in {'DQ', 'VAR'}:
raise ValueError("'{}' need to be associated to a '{}' one"
.format(name, def_ext))
else:
# FIXME: the logic here is broken since name is
# always set to somehing above with def_ext
if name is not None:
hname = name
elif header is not None:
hname = header.get('EXTNAME', def_ext)
else:
hname = def_ext
hdu = ImageHDU(data, header=header)
hdu.header['EXTNAME'] = hname
ret = self._append_imagehdu(hdu, name=hname, header=None, add_to=None)
else:
# Attaching to another extension
if header is not None and name in {'DQ', 'VAR'}:
LOGGER.warning("The header is ignored for '{}' extensions".format(name))
if name is None:
raise ValueError("Can't append pixel planes to other objects without a name")
elif name is def_ext:
raise ValueError("Can't attach '{}' arrays to other objects".format(def_ext))
elif name == 'DQ':
add_to.mask = data
ret = data
elif name == 'VAR':
std_un = ADVarianceUncertainty(data)
std_un.parent_nddata = add_to
add_to.uncertainty = std_un
ret = std_un
else:
self._add_to_other(add_to, name, data, header=header)
ret = data
return ret
def _append_imagehdu(self, hdu, name, header, add_to, reset_ver=True):
if name in {'DQ', 'VAR'} or add_to is not None:
return self._append_array(hdu.data, name=name, add_to=add_to)
else:
nd = self._process_pixel_plane(hdu, name=name, top_level=True,
reset_ver=reset_ver,
custom_header=header)
return self._append_nddata(nd, name, add_to=None)
def _append_raw_nddata(self, raw_nddata, name, header, add_to, reset_ver=True):
# We want to make sure that the instance we add is whatever we specify as
# `NDDataObject`, instead of the random one that the user may pass
top_level = add_to is None
if not isinstance(raw_nddata, NDDataObject):
raw_nddata = NDDataObject(raw_nddata)
processed_nddata = self._process_pixel_plane(raw_nddata, top_level=top_level,
custom_header=header, reset_ver=reset_ver)
return self._append_nddata(processed_nddata, name=name, add_to=add_to)
def _append_nddata(self, new_nddata, name, add_to, reset_ver=True):
# NOTE: This method is only used by others that have constructed NDData
# according to our internal format. We don't accept new headers at this
# point, and that's why it's missing from the signature. 'name' is
# ignored. It's there just to comply with the _append_XXX signature.
def_ext = FitsProvider.default_extension
if add_to is not None:
raise TypeError("You can only append NDData derived instances at the top level")
hd = new_nddata.meta['header']
hname = hd.get('EXTNAME', def_ext)
if hname == def_ext:
if reset_ver:
self._reset_ver(new_nddata)
self._nddata.append(new_nddata)
else:
raise ValueError("Arbitrary image extensions can only be added "
"in association to a '{}'".format(def_ext))
return new_nddata
def _set_nddata(self, n, new_nddata):
self._nddata[n] = new_nddata
def _append_table(self, new_table, name, header, add_to, reset_ver=True):
tb = self._process_table(new_table, name, header)
hname = tb.meta['header'].get('EXTNAME') if name is None else name
#if hname is None:
# raise ValueError("Can't attach a table without a name!")
if add_to is None:
if hname is None:
table_num = 1
while 'TABLE{}'.format(table_num) in self._tables:
table_num += 1
hname = 'TABLE{}'.format(table_num)
# Don't use setattr, which is overloaded and may case problems
self.__dict__[hname] = tb
self._tables[hname] = tb
self._exposed.add(hname)
else:
if hname is None:
table_num = 1
while getattr(add_to, 'TABLE{}'.format(table_num), None):
table_num += 1
hname = 'TABLE{}'.format(table_num)
setattr(add_to, hname, tb)
self._add_to_other(add_to, hname, tb, tb.meta['header'])
add_to.meta['other'][hname] = tb
return tb
def _append_astrodata(self, ad, name, header, add_to, reset_ver=True):
if not ad.is_single:
raise ValueError("Cannot append AstroData instances that are not single slices")
elif add_to is not None:
raise ValueError("Cannot append an AstroData slice to another slice")
new_nddata = deepcopy(ad.nddata)
if header is not None:
new_nddata.meta['header'] = deepcopy(header)
return self._append_nddata(new_nddata, name=None, add_to=None, reset_ver=True)
[docs] def append(self, ext, name=None, header=None, reset_ver=True, add_to=None):
# NOTE: Most probably, if we want to copy the input argument, we
# should do it here...
if isinstance(ext, PrimaryHDU):
raise ValueError("Only one Primary HDU allowed. "
"Use set_phu if you really need to set one")
dispatcher = (
(NDData, self._append_raw_nddata),
((Table, _TableBaseHDU), self._append_table),
(ImageHDU, self._append_imagehdu),
(AstroData, self._append_astrodata),
)
for bases, method in dispatcher:
if isinstance(ext, bases):
return method(ext, name=name, header=header, add_to=add_to, reset_ver=reset_ver)
else:
# Assume that this is an array for a pixel plane
return self._append_array(ext, name=name, header=header, add_to=add_to)
def _extver_impl(self, nds=None):
if nds is None:
nds = self.nddata
return {nd._meta['ver']: n for (n, nd) in enumerate(nds)}
[docs] def extver_map(self):
"""
Provide a mapping between the FITS EXTVER of an extension and the index
that will be used to access it within this object.
Returns
-------
A dictionary `{EXTVER:index, ...}`
Raises
------
ValueError
If used against a single slice. It is of no use in that situation.
"""
return self._extver_impl()
[docs]def fits_ext_comp_key(ext):
"""
Returns a pair (integer, string) that will be used to sort extensions
"""
if isinstance(ext, PrimaryHDU):
# This will guarantee that the primary HDU goes first
ret = (-1, "")
else:
header = ext.header
ver = header.get('EXTVER')
# When two extensions share version number, we'll use their names
# to sort them out. Choose a suitable key so that:
#
# - SCI extensions come first
# - unnamed extensions come last
#
# We'll resort to add 'z' in front of the usual name to force
# SCI to be the "smallest"
name = header.get('EXTNAME') # Make sure that the name is a string
if name is None:
name = "zzzz"
elif name != FitsProvider.default_extension:
name = "z" + name
if ver in (-1, None):
# In practice, this number should be larger than any
# EXTVER found in real life HDUs, pushing unnumbered
# HDUs to the end
ret = (2**32-1, name)
else:
# For the general case, just return version and name, to let them
# be sorted naturally
ret = (ver, name)
return ret
[docs]class FitsLazyLoadable:
def __init__(self, obj):
self._obj = obj
self.lazy = True
def _create_result(self, shape):
return np.empty(shape, dtype=self.dtype)
def _scale(self, data):
bscale = self._obj._orig_bscale
bzero = self._obj._orig_bzero
if bscale == 1 and bzero == 0:
return data
return (bscale * data + bzero).astype(self.dtype)
def __getitem__(self, sl):
# TODO: We may want (read: should) create an empty result array before scaling
return self._scale(self._obj.section[sl])
@property
def header(self):
return self._obj.header
@property
def data(self):
res = self._create_result(self.shape)
res[:] = self._scale(self._obj.data)
return res
@property
def shape(self):
return self._obj.shape
@property
def dtype(self):
"""
Need to to some overriding of astropy.io.fits since it doesn't
know about BITPIX=8
"""
bitpix = self._obj._orig_bitpix
if self._obj._orig_bscale == 1 and self._obj._orig_bzero == 0:
dtype = fits.BITPIX2DTYPE[bitpix]
else:
# this method from astropy will return the dtype if the data
# needs to be converted to unsigned int or scaled to float
dtype = self._obj._dtype_for_bitpix()
if dtype is None:
if bitpix < 0:
dtype = np.dtype('float{}'.format(abs(bitpix)))
if (self._obj.header['EXTNAME'] == 'DQ' or self._obj._uint and
self._obj._orig_bscale == 1 and bitpix == 8):
dtype = np.uint16
return dtype
[docs]class FitsLoader:
def __init__(self, cls=FitsProvider):
self._cls = cls
@staticmethod
def _prepare_hdulist(hdulist, default_extension='SCI', extname_parser=None):
new_list = []
highest_ver = 0
recognized = set()
if len(hdulist) > 1 or (len(hdulist) == 1 and hdulist[0].data is None):
# MEF file
for n, unit in enumerate(hdulist):
if extname_parser:
extname_parser(unit)
ev = unit.header.get('EXTVER')
eh = unit.header.get('EXTNAME')
if ev not in (-1, None) and eh is not None:
highest_ver = max(highest_ver, unit.header['EXTVER'])
elif not isinstance(unit, PrimaryHDU):
continue
new_list.append(unit)
recognized.add(unit)
for unit in hdulist:
if unit in recognized:
continue
elif isinstance(unit, ImageHDU):
highest_ver += 1
if 'EXTNAME' not in unit.header:
unit.header['EXTNAME'] = (default_extension, 'Added by AstroData')
if unit.header.get('EXTVER') in (-1, None):
unit.header['EXTVER'] = (highest_ver, 'Added by AstroData')
new_list.append(unit)
recognized.add(unit)
else:
# Uh-oh, a single image FITS file
new_list.append(PrimaryHDU(header=hdulist[0].header))
image = ImageHDU(header=hdulist[0].header, data=hdulist[0].data)
# Fudge due to apparent issues with assigning ImageHDU from data
image._orig_bscale = hdulist[0]._orig_bscale
image._orig_bzero = hdulist[0]._orig_bzero
for keyw in ('SIMPLE', 'EXTEND'):
if keyw in image.header:
del image.header[keyw]
image.header['EXTNAME'] = (default_extension, 'Added by AstroData')
image.header['EXTVER'] = (1, 'Added by AstroData')
new_list.append(image)
return HDUList(sorted(new_list, key=fits_ext_comp_key))
[docs] def load(self, source, extname_parser=None):
"""
Takes either a string (with the path to a file) or an HDUList as input, and
tries to return a populated FitsProvider (or descendant) instance.
It will raise exceptions if the file is not found, or if there is no match
for the HDUList, among the registered AstroData classes.
"""
provider = self._cls()
if isinstance(source, str):
hdulist = fits.open(source, memmap=True,
do_not_scale_image_data=True, mode='readonly')
provider.path = source
else:
hdulist = source
try:
provider.path = source[0].header.get('ORIGNAME')
except AttributeError:
provider.path = None
def_ext = self._cls.default_extension
_file = hdulist._file
hdulist = self._prepare_hdulist(hdulist, default_extension=def_ext,
extname_parser=extname_parser)
if _file is not None:
hdulist._file = _file
# Initialize the object containers to a bare minimum
if 'ORIGNAME' not in hdulist[0].header and provider.orig_filename is not None:
hdulist[0].header.set('ORIGNAME', provider.orig_filename,
'Original filename prior to processing')
provider.set_phu(hdulist[0].header)
seen = {hdulist[0]}
skip_names = {def_ext, 'REFCAT', 'MDF'}
def associated_extensions(ver):
for unit in hdulist:
header = unit.header
if header.get('EXTVER') == ver and header['EXTNAME'] not in skip_names:
yield unit
sci_units = [x for x in hdulist[1:] if x.header.get('EXTNAME') == def_ext]
for idx, unit in enumerate(sci_units):
seen.add(unit)
ver = unit.header.get('EXTVER', -1)
parts = {'data': unit, 'uncertainty': None, 'mask': None, 'wcs': None, 'other': []}
for extra_unit in associated_extensions(ver):
seen.add(extra_unit)
name = extra_unit.header.get('EXTNAME')
if name == 'DQ':
parts['mask'] = extra_unit
elif name == 'VAR':
parts['uncertainty'] = extra_unit
elif name == 'WCS':
parts['wcs'] = extra_unit
else:
parts['other'].append(extra_unit)
if hdulist._file is not None and hdulist._file.memmap:
nd = NDDataObject(
data = FitsLazyLoadable(parts['data']),
uncertainty = None if parts['uncertainty'] is None else FitsLazyLoadable(parts['uncertainty']),
mask = None if parts['mask'] is None else FitsLazyLoadable(parts['mask']),
wcs = None if parts['wcs'] is None else asdftablehdu_to_wcs(parts['wcs'])
)
if nd.wcs is None:
try:
nd.wcs = adwcs.fitswcs_to_gwcs(nd.meta['header'])
# In case WCS info is in the PHU
if nd.wcs is None:
nd.wcs = adwcs.fitswcs_to_gwcs(hdulist[0].header)
except TypeError as e:
raise e
provider.append(nd, name=def_ext, reset_ver=False)
else:
nd = provider.append(parts['data'], name=def_ext, reset_ver=False)
for item_name in ('mask', 'uncertainty'):
item = parts[item_name]
if item is not None:
provider.append(item, name=item.header['EXTNAME'], add_to=nd)
if isinstance(nd, NDData):
if parts['wcs'] is not None:
nd.wcs = asdftablehdu_to_wcs(parts['wcs'])
else:
try:
nd.wcs = adwcs.fitswcs_to_gwcs(nd.meta['header'])
except TypeError:
pass
for other in parts['other']:
provider.append(other, name=other.header['EXTNAME'], add_to=nd)
for other in hdulist:
if other in seen:
continue
name = other.header.get('EXTNAME')
try:
added = provider.append(other, name=name, reset_ver=False)
except ValueError as e:
print(str(e)+". Discarding "+name)
return provider
[docs]def windowedOp(func, sequence, kernel, shape=None, dtype=None,
with_uncertainty=False, with_mask=False, **kwargs):
"""Apply function on a NDData obbjects, splitting the data in chunks to
limit memory usage.
Parameters
----------
func : callable
The function to apply.
sequence : list of NDData
List of NDData objects.
kernel : tuple of int
Shape of the blocks.
shape : tuple of int
Shape of inputs. Defaults to ``sequence[0].shape``.
dtype : str or dtype
Type of the output array. Defaults to ``sequence[0].dtype``.
with_uncertainty : bool
Compute uncertainty?
with_mask : bool
Compute mask?
**kwargs
Additional args are passed to ``func``.
"""
def generate_boxes(shape, kernel):
if len(shape) != len(kernel):
raise AssertionError("Incompatible shape ({}) and kernel ({})"
.format(shape, kernel))
ticks = [[(x, x+step) for x in range(0, axis, step)]
for axis, step in zip(shape, kernel)]
return list(cart_product(*ticks))
if shape is None:
if len({x.shape for x in sequence}) > 1:
raise ValueError("Can't calculate final shape: sequence elements "
"disagree on shape, and none was provided")
shape = sequence[0].shape
if dtype is None:
dtype = sequence[0].window[:1, :1].data.dtype
result = NDDataObject(
np.empty(shape, dtype=dtype),
uncertainty=(ADVarianceUncertainty(np.zeros(shape, dtype=dtype))
if with_uncertainty else None),
mask=(np.empty(shape, dtype=np.uint16) if with_mask else None),
meta=sequence[0].meta, wcs=sequence[0].wcs
)
# Delete other extensions because we don't know what to do with them
result.meta['other'] = OrderedDict()
result.meta['other_header'] = {}
# The Astropy logger's "INFO" messages aren't warnings, so have to fudge
log_level = astropy.logger.conf.log_level
astropy.log.setLevel(astropy.logger.WARNING)
boxes = generate_boxes(shape, kernel)
try:
for coords in boxes:
section = tuple([slice(start, end) for (start, end) in coords])
out = func([element.window[section] for element in sequence],
**kwargs)
result.set_section(section, out)
# propagate additional attributes
if out.meta.get('other'):
for k, v in out.meta['other'].items():
if len(boxes) > 1:
result.meta['other'][k, coords] = v
else:
result.meta['other'][k] = v
gc.collect()
finally:
astropy.log.setLevel(log_level) # and reset
# Now if the input arrays where splitted in chunks, we need to gather
# the data arrays for the additional attributes.
other = result.meta['other']
if other:
if len(boxes) > 1:
for (name, coords), obj in list(other.items()):
if not isinstance(obj, NDData):
raise ValueError('only NDData objects are handled here')
if name not in other:
other[name] = NDDataObject(np.empty(shape,
dtype=obj.data.dtype))
section = tuple([slice(start, end) for (start, end) in coords])
other[name].set_section(section, obj)
del other[name, coords]
for name in other:
# To set the name of our object we need to save it as an ndarray,
# otherwise for a NDData one AstroData would use the name of the
# AstroData object.
other[name] = other[name].data
result.meta['other_header'][name] = fits.Header({'EXTNAME': name})
return result
[docs]class AstroDataFits(AstroData):
# Derived classes may provide their own __keyword_dict. Being a private
# variable, each class will preserve its own, and there's no risk of
# overriding the whole thing
__keyword_dict = {
'instrument': 'INSTRUME',
'object': 'OBJECT',
'telescope': 'TELESCOP',
'ut_date': 'DATE-OBS'
}
[docs] @classmethod
def load(cls, source):
"""
Implementation of the abstract method `load`.
It takes an `HDUList` and returns a fully instantiated `AstroData` instance.
"""
return cls(FitsLoader(FitsProvider).load(source))
def _keyword_for(self, name):
"""
Returns the FITS keyword name associated to ``name``.
Parameters
----------
name : str
The common "key" name for which we want to know the associated
FITS keyword
Returns
-------
str
The desired keyword name
Raises
------
AttributeError
If there is no keyword for the specified ``name``
"""
for cls in self.__class__.mro():
mangled_dict_name = '_{}__keyword_dict'.format(cls.__name__)
try:
return getattr(self, mangled_dict_name)[name]
except (AttributeError, KeyError) as e:
pass
else:
raise AttributeError("No match for '{}'".format(name))
@staticmethod
def _matches_data(dataprov):
# This one is trivial. As long as we get a FITS file...
return True
@property
def phu(self):
return self._dataprov.phu()
@property
def hdr(self):
return self._dataprov.hdr()
[docs] def info(self):
self._dataprov.info(self.tags)
[docs] def write(self, filename=None, overwrite=False):
if filename is None:
if self.path is None:
raise ValueError("A filename needs to be specified")
filename = self.path
hdulist = self._dataprov.to_hdulist()
hdulist.writeto(filename, overwrite=overwrite)
[docs] def update_filename(self, prefix=None, suffix=None, strip=False):
"""
This method updates the "filename" attribute of the AstroData object.
A prefix and/or suffix can be specified. If strip=True, these will
replace the existing prefix/suffix; if strip=False, they will simply
be prepended/appended.
The current filename is broken down into its existing prefix, root,
and suffix using the ORIGNAME phu keyword, if it exists and is
contained within the current filename. Otherwise, the filename is
split at the last underscore and the part before is assigned as the
root and the underscore and part after the suffix. No prefix is
assigned.
Note that, if strip=True, a prefix or suffix will only be stripped
if '' is specified.
Parameters
----------
prefix: str/None
new prefix (None => leave alone)
suffix: str/None
new suffix (None => leave alone)
strip: bool
Strip existing prefixes and suffixes if new ones are given?
"""
if self.filename is None:
if 'ORIGNAME' in self.phu:
self.filename = self.phu['ORIGNAME']
else:
raise ValueError("A filename needs to be set before it "
"can be updated")
# Set the ORIGNAME keyword if it's not there
if 'ORIGNAME' not in self.phu:
self.phu.set('ORIGNAME', self.orig_filename,
'Original filename prior to processing')
if strip:
root, filetype = os.path.splitext(self.phu['ORIGNAME'])
filename, filetype = os.path.splitext(self.filename)
m = re.match('(.*){}(.*)'.format(re.escape(root)), filename)
# Do not strip a prefix/suffix unless a new one is provided
if m:
if prefix is None:
prefix = m.groups()[0]
existing_suffix = m.groups()[1]
else:
try:
root, existing_suffix = filename.rsplit("_", 1)
existing_suffix = "_" + existing_suffix
except ValueError:
root, existing_suffix = filename, ''
if suffix is None:
suffix = existing_suffix
else:
root, filetype = os.path.splitext(self.filename)
# Cope with prefix or suffix as None
self.filename = (prefix or '') + root + (suffix or '') + filetype
[docs] @astro_data_descriptor
def instrument(self):
"""
Returns the name of the instrument making the observation
Returns
-------
str
instrument name
"""
return self.phu.get(self._keyword_for('instrument'))
[docs] @astro_data_descriptor
def object(self):
"""
Returns the name of the object being observed
Returns
-------
str
object name
"""
return self.phu.get(self._keyword_for('object'))
[docs] @astro_data_descriptor
def telescope(self):
"""
Returns the name of the telescope
Returns
-------
str
name of the telescope
"""
return self.phu.get(self._keyword_for('telescope'))
[docs] def extver(self, ver):
"""
Get an extension using its EXTVER instead of the positional index
in this object.
Parameters
----------
ver : int
The EXTVER for the desired extension
Returns
-------
A sliced object containing the desired extension
Raises
------
IndexError
If the provided EXTVER doesn't exist
"""
try:
if isinstance(ver, int):
return self[self._dataprov.extver_map()[ver]]
else:
raise ValueError("{} is not an integer EXTVER".format(ver))
except KeyError as e:
raise IndexError("EXTVER {} not found".format(e.args[0]))
# ---------------------------------------------------------------------------
# gWCS <-> FITS WCS helper functions go here
# ---------------------------------------------------------------------------
# Could parametrize some naming conventions in the following two functions if
# done elsewhere for hard-coded names like 'SCI' in future, but they only have
# to be self-consistent with one another anyway.
[docs]def wcs_to_asdftablehdu(wcs, extver=None):
"""
Serialize a gWCS object as a FITS TableHDU (ASCII) extension.
The ASCII table is actually a mini ASDF file. The constituent AstroPy
models must have associated ASDF "tags" that specify how to serialize them.
In the event that serialization as pure ASCII fails (this should not
happen), a binary table representation will be used as a fallback.
"""
import asdf
import jsonschema
# Create a small ASDF file in memory containing the WCS object
# representation because there's no public API for generating only the
# relevant YAML subsection and an ASDF file handles the "tags" properly.
try:
af = asdf.AsdfFile({"wcs" : wcs})
except jsonschema.exceptions.ValidationError:
# (The original traceback also gets printed here)
raise TypeError("Cannot serialize model(s) for 'WCS' extension {}"\
.format(extver or ''))
# ASDF can only dump YAML to a binary file object, so do that and read
# the contents back from it for storage in a FITS extension:
with BytesIO() as fd:
with af:
# Generate the YAML, dumping any binary arrays as text:
af.write_to(fd, all_array_storage='inline')
fd.seek(0)
wcsbuf = fd.read()
# Convert the bytes to readable lines of text for storage (falling back to
# saving as binary in the unexpected event that this is not possible):
try:
wcsbuf = wcsbuf.decode('ascii').splitlines()
except UnicodeDecodeError:
# This should not happen, but if the ASDF contains binary data in
# spite of the 'inline' option above, we have to dump the bytes to
# a non-human-readable binary table rather than an ASCII one:
LOGGER.warning("Could not convert WCS {} ASDF to ASCII; saving table "
"as binary".format(extver or ''))
hduclass = BinTableHDU
fmt = 'B'
wcsbuf = np.frombuffer(wcsbuf, dtype=np.uint8)
else:
hduclass = TableHDU
fmt = 'A{0}'.format(max(len(line) for line in wcsbuf))
# Construct the FITS table extension:
col = Column(name='gWCS', format=fmt, array=wcsbuf,
ascii= hduclass is TableHDU)
hdu = hduclass.from_columns([col], name='WCS', ver=extver)
return hdu
[docs]def asdftablehdu_to_wcs(hdu):
"""
Recreate a gWCS object from its serialization in a FITS table extension.
Returns None (issuing a warning) if the extension cannot be parsed, so
the rest of the file can still be read.
"""
import asdf
ver = hdu.header.get('EXTVER', -1)
if isinstance(hdu, (TableHDU, BinTableHDU)):
try:
colarr = hdu.data['gWCS']
except KeyError:
LOGGER.warning("Ignoring 'WCS' extension {} with no 'gWCS' table "
"column".format(ver))
return
# If this table column contains text strings as expected, join the rows
# as separate lines of a string buffer and encode the resulting YAML as
# bytes that ASDF can parse. If AstroData has produced another format,
# it will be a binary dump due to the unexpected presence of non-ASCII
# data, in which case we just extract unmodified bytes from the table.
if colarr.dtype.kind in ('U', 'S'):
sep = os.linesep
# Just in case io.fits ever produces 'S' on Py 3 (not the default):
# join lines as str & avoid a TypeError with unicode linesep; could
# also use astype('U') but it assumes an encoding implicitly.
if colarr.dtype.kind == 'S' and not isinstance(sep, bytes):
colarr = np.char.decode(np.char.rstrip(colarr),
encoding='ascii')
wcsbuf = sep.join(colarr).encode('ascii')
else:
wcsbuf = colarr.tobytes()
# Convert the stored text to a Bytes file object that ASDF can open:
with BytesIO(wcsbuf) as fd:
# Try to extract a 'wcs' entry from the YAML:
try:
af = asdf.open(fd)
except:
LOGGER.warning("Ignoring 'WCS' extension {}: failed to parse "
"ASDF.\nError was as follows:\n{}"\
.format(ver, traceback.format_exc()))
return
else:
with af:
try:
wcs = af.tree['wcs']
except KeyError:
LOGGER.warning("Ignoring 'WCS' extension {}: missing "
"'wcs' dict entry.".format(ver))
return
else:
LOGGER.warning("Ignoring non-FITS-table 'WCS' extension {}"\
.format(ver))
return
return wcs