"""
This module provides decorator functions for implementing the (somewhat TBD)
parameter override policy of the prototype primitive classes. This implementation
is subject to any change in policy.
Currently, the policy defines an order of parameter precedence:
1. *user parameters*-- as passed by -p on the reduce command line. These take
full precedence over any other parameter setting. User parameters may be
primitive-specific, as in `-p makeFringeFrame:reject_method=jilt` in which
case, only that primitive's parameter will be overridden. Other primitives
with the same parameter (e.g. `reject_method` is also a parameter for
`stackFlats`) will remain unaffected. If a user passes an un-specified
parameter, as in `reject_method=jilt`, then any primitive with that parameter
will receive that parameter value.
2. *recipe parameters* -- as passed on a recipe function call, like
`recipe_name(par1=val1)`. These will be overridden by same named user
parameters.
3. *default parameters* -- The default parameter sets as defined in package
parameter files. These will be overridden by recipe parameters and then
user parameters when applicable.
This policy is implemented in the decorator function,
parameter_override
This is the primitive decorator and should be the only one decorating a
primitive class.
This decorator is fully enhanced by the make_class_wrapper decorator, which
maps the parameter_override decorator function to all public methods on
the decorated class.
E.g.,::
from pkg_utilities.decorators import parameter_override
@parameter_override
class PrimitivesIMAGE(PrimitivesGMOS):
def __init__(self, adinputs, uparms={}):
[ . . . ]
"""
import gc
import inspect
import json
import traceback
from datetime import datetime
from functools import wraps
from copy import copy, deepcopy
import geminidr
from astrodata import AstroData
from astrodata.provenance import add_provenance_history, clone_provenance, clone_provenance_history
from gempy.utils import logutils
from recipe_system.utils.md5 import md5sum
[docs]def memusage():
import psutil
proc = psutil.Process()
return '{:9.3f}'.format(float(proc.memory_info().rss) / 1e6)
# ------------------------------------------------------------------------------
LOGINDENT = 0
log = logutils.get_logger(__name__)
# ------------------------------------------------------------------------------
[docs]def userpar_override(pname, args, upars):
"""
Implement user parameter overrides. In this implementation, user
parameters *always* take precedence. Any user parameters passed to the
Primitive class constructor, usually via -p on the 'reduce' command
line, *must* override any specified recipe parameters.
Note: user parameters may be primitive-specific, i.e. passed as
-p makeFringeFrame:reject_method='jilt'
in which case, the parameter will only be overridden for that primitive.
Other primitives with the same parameter (e.g. stackFlats reject_method)
will not be affected.
This returns a dict of the overridden parameters and their values.
"""
parset = {}
for key, val in list(upars.items()):
if ':' in key:
prim, par = key.split(':')
if prim == pname and par in args:
parset.update({par: val})
elif key in args:
parset.update({key: val})
return parset
[docs]def set_logging(pname):
global LOGINDENT
LOGINDENT += 1
logutils.update_indent(LOGINDENT)
#stat_msg = "{} PRIMITIVE: {}".format(memusage(), pname)
stat_msg = "PRIMITIVE: {}".format(pname)
log.status(stat_msg)
log.status("-" * len(stat_msg))
return
[docs]def unset_logging():
global LOGINDENT
log.status(".")
LOGINDENT -= 1
logutils.update_indent(LOGINDENT)
return
[docs]def zeroset():
global LOGINDENT
LOGINDENT = 0
logutils.update_indent(LOGINDENT)
return
# -------------------------------- decorators ----------------------------------
[docs]def make_class_wrapper(wrapped):
@wraps(wrapped)
def class_wrapper(cls):
for attr_name, attr_fn in list(cls.__dict__.items()):
if attr_name.startswith("_") or not callable(attr_fn): # no prive,magic
continue
setattr(cls, attr_name, wrapped(attr_fn))
return cls
return class_wrapper
def _get_provenance_inputs(adinputs):
"""
gets the input information for a future call to store provenance history.
The AstroData inputs can change during the call to a primitive. We use this
helper function to extract the 'before' state of things so that we can accurately
record provenance history. After the primitive returns, we have the AstroData
objects into which we'll want to record this information.
Args
-----
adinputs : list of incoming `AstroData` objects
We expect to be called before the primitive executes, since we want to capture the
state of the adinputs before they may be modified.
Returns
--------
`dict` by datalabel of dictionaries with the filename, md5, provenance and
provenance_history data from the inputs
"""
retval = dict()
for ad in adinputs:
if ad.path:
md5 = md5sum(ad.path) or ""
else:
md5 = ""
if hasattr(ad, 'PROVENANCE'):
provenance = ad.PROVENANCE.copy()
else:
provenance = []
if hasattr(ad, 'PROVHISTORY'):
provenance_history = ad.PROVHISTORY.copy()
else:
provenance_history = []
retval[ad.data_label()] = \
{
"filename": ad.filename,
"md5": md5,
"provenance": provenance,
"provenance_history": provenance_history
}
return retval
def _clone_provenance_deprecated(provenance_input, ad):
"""
For a single input's provenance, copy it into the output
`AstroData` object as appropriate.
This takes a dictionary with a source filename, md5 and both it's
original provenance and provenance_history information. It duplicates
the provenance data into the outgoing `AstroData` ad object.
Args
-----
provenance_input : dictionary with provenance data from a single input.
We only care about the `provenance` element, which holds a list of
provenance data
ad : outgoing `AstroData` object to add provenance data to
Returns
--------
none
"""
provenance = provenance_input["provenance"]
for prov in provenance:
ad.add_provenance(prov)
[docs]def __top_level_primitive__():
""" Check if we are at the top-level, not being called from another primitive.
We only want to capture provenance history when we are passing through the
uppermost primitive calls. These are the calls that get made from the recipe.
"""
for trace in inspect.stack():
if "self" in trace[0].f_locals:
inst = trace[0].f_locals["self"]
if isinstance(inst, geminidr.PrimitivesBASE):
return False
# if we encounter no primitives above this decorator, then this is a top level primitive call
return True
def _capture_provenance(provenance_inputs, ret_value, timestamp_start, fn, args):
"""
Add the given provenance data to the outgoing `AstroData` objects in ret_value,
with an additional provenance entry for the current operation.
This is a fairly specific function that does a couple of things. First, it will
iterate over collected provenance and history data in provenance_inputs and add
them as appropriate to the outgoing `AstroData` in ret_value. Second, it takes
the current operation, expressed in timestamp_start, fn and args, and adds that
to the outgoing ret_value objects' provenance as well.
Args
-----
provenance_inputs : provenance and provenance history information to add
This is an dictionary keyed by datalabel of dictionaries with the relevant
provenance for that particular input. Each dictionary contains the filename,
md5 and the provenance and provenance_history of that `AstroData` prior to execution of
the primitive.
ret_value : outgoing list of `AstroData` data
fn : name of the function (primitive) being executed
args : arguments that are being passed to the primitive
Returns
--------
none
"""
if not __top_level_primitive__():
# We're inside a primitive being called from another, we want to omit this
# from the provenance. Only the recipe calls are relevant.
return
try:
timestamp = datetime.now()
for ad in ret_value:
if ad.data_label() in provenance_inputs:
# output corresponds to an input, we only need to copy from there
clone_provenance(provenance_inputs[ad.data_label()]['provenance'], ad)
clone_provenance_history(provenance_inputs[ad.data_label()]['provenance_history'], ad)
else:
if hasattr(ad, 'PROVHISTORY'):
clone_hist = False
else:
clone_hist = True
for provenance_input in provenance_inputs.values():
clone_provenance(provenance_input['provenance'], ad)
if clone_hist:
clone_provenance_history(provenance_input['provenance_history'], ad)
for ad in ret_value:
add_provenance_history(ad, timestamp_start, timestamp, fn.__name__, args)
except Exception as e:
# we don't want provenance failures to prevent data reduction
log.warn("Unable to save provenance information, continuing on: %s" % e)
traceback.print_exc()
[docs]@make_class_wrapper
def parameter_override(fn):
@wraps(fn)
def gn(pobj, *args, **kwargs):
pname = fn.__name__
# for provenance information
stringified_args = "%s" % kwargs
timestamp_start = datetime.now()
# Start with the config file to get list of parameters
# Copy to avoid permanent changes; shallow copy is OK
if pname not in pobj.params:
err_msg = ('Could not find "{}Config" configuration in "{}" module'
' or any parent module.')
raise KeyError(err_msg.format(
pname, pobj.__module__.replace("primitives", "parameters")))
config = copy(pobj.params[pname])
# Find user parameter overrides
params = userpar_override(pname, list(config), pobj.user_params)
set_logging(pname)
# Override with values in the function call
for k, v in kwargs.items():
if k in params:
log.warning('Parameter {}={} was set but will be ignored '
'because the recipe enforces a different value '
'(={})'.format(k, params[k], v))
params.update(kwargs)
# config doesn't know about streams or adinputs
instream = params.get('instream', params.get('stream', 'main'))
outstream = params.get('outstream', params.get('stream', 'main'))
adinputs = params.get('adinputs')
for k in ('adinputs', 'stream', 'instream', 'outstream'):
try:
del params[k]
except KeyError:
pass
# Can update config now it only has parameters it knows about
config.update(**params)
config.validate()
if len(args) == 0 and adinputs is None:
# Use appropriate stream input/output
# Many primitives operate on AD instances in situ, so need to
# copy inputs if they're going to a new output stream
if instream != outstream:
adinputs = [deepcopy(ad) for ad in pobj.streams[instream]]
else:
# Allow a non-existent stream to be passed
adinputs = pobj.streams.get(instream, [])
try:
provenance_inputs = _get_provenance_inputs(adinputs)
fnargs = dict(config.items())
stringified_args = json.dumps({k: v for k, v in fnargs.items()
if not k.startswith('debug_')},
default=lambda v: v.filename if hasattr(v, 'filename')
else '<not serializable>')
ret_value = fn(pobj, adinputs=adinputs, **fnargs)
_capture_provenance(provenance_inputs, ret_value, timestamp_start, fn, stringified_args)
except Exception:
zeroset()
raise
# And place the outputs in the appropriate stream
pobj.streams[outstream] = ret_value
else:
if args: # if not, adinputs has already been assigned from params
adinputs = args[0]
try:
if isinstance(adinputs, AstroData):
raise TypeError("Single AstroData instance passed to primitive, should be a list")
provenance_inputs = _get_provenance_inputs(adinputs)
ret_value = fn(pobj, adinputs=adinputs, **dict(config.items()))
_capture_provenance(provenance_inputs, ret_value, timestamp_start, fn, stringified_args)
except Exception:
zeroset()
raise
unset_logging()
gc.collect()
return ret_value
return gn