#
# DRAGONS
#
# calrequestlib.py
# ------------------------------------------------------------------------------
import hashlib
from os import makedirs
from os.path import basename, exists
from os.path import join, split
from urllib.parse import urlparse
from gempy.utils import logutils
from geminidr import set_caches
from recipe_system.cal_service import cal_search_factory, handle_returns_factory
from .file_getter import get_file_iterator, GetterError
# ------------------------------------------------------------------------------
log = logutils.get_logger(__name__)
# ------------------------------------------------------------------------------
# Currently delivers transport_request.calibration_search fn.
calibration_search = cal_search_factory()
# ------------------------------------------------------------------------------
[docs]def get_request(url, filename):
iterator = get_file_iterator(url)
with open(filename, 'wb') as fd:
for chunk in iterator:
fd.write(chunk)
return filename
[docs]def generate_md5_digest(filename):
md5 = hashlib.md5()
fdata = open(filename, 'rb').read()
md5.update(fdata)
return md5.hexdigest()
def _check_cache(cname, ctype):
cachedir = _makecachedir(ctype)
cachename = join(cachedir, cname)
if exists(cachename):
return cachename, cachedir
return None, cachedir
def _makecachedir(caltype):
cache = set_caches()
cachedir = join(cache["calibrations"], caltype)
makedirs(cachedir, exist_ok=True)
return cachedir
[docs]class CalibrationRequest:
"""
Request objects are passed to a calibration_search() function
"""
def __init__(self, ad, caltype=None, procmode=None):
self.ad = ad
self.caltype = caltype
self.procmode = procmode
self.datalabel = ad.data_label()
self.descriptors = None
self.filename = ad.filename
self.tags = ad.tags
[docs] def as_dict(self):
retd = {}
retd.update(
{'filename' : self.filename,
'caltype' : self.caltype,
'procmode' : self.procmode,
'datalabel' : self.datalabel,
"descriptors": self.descriptors,
"tags" : self.tags,
}
)
return retd
def __str__(self):
tempStr = "filename: {}\nDescriptors: {}\nTypes: {}"
tempStr = tempStr.format(self.filename, self.descriptors, self.tags)
return tempStr
[docs]def get_cal_requests(inputs, caltype, procmode=None):
"""
Builds a list of :class:`.CalibrationRequest` objects, one for each `ad`
input.
Parameters
----------
inputs: <list>
A list of input AstroData instances.
caltype: <str>
Calibration type, eg., 'processed_bias', 'flat', etc.
Returns
-------
rq_events: <list>
A list of CalibrationRequest instances, one for each passed
'ad' instance in 'inputs'.
"""
options = {'central_wavelength': {'asMicrometers': True}}
_handle_returns = handle_returns_factory()
rq_events = []
for ad in inputs:
log.stdinfo("Received calibration request for {}".format(ad.filename))
rq = CalibrationRequest(ad, caltype, procmode)
# Check that each descriptor works and returns a sensible value.
desc_dict = {}
for desc_name in ad.descriptors:
try:
descriptor = getattr(ad, desc_name)
except AttributeError:
pass
else:
kwargs = options[desc_name] if desc_name in list(options.keys()) else {}
try:
dv = _handle_returns(descriptor(**kwargs))
except:
dv = None
# Munge list to value if all item(s) are the same
if isinstance(dv, list):
dv = dv[0] if all(v == dv[0] for v in dv) else "+".join(
[str(v) for v in dv])
desc_dict[desc_name] = dv
rq.descriptors = desc_dict
rq_events.append(rq)
return rq_events
[docs]def process_cal_requests(cal_requests, howmany=None):
"""
Conduct a search for calibration files for the passed list of calibration
requests. This passes the requests to the calibration_search() function,
and then examines the search results to see if a matching file, if any,
is cached. If not, then the calibration file is retrieved from the
archive.
If a calibration match is found by the calibration manager, a URL is
returned. This function will perform a cache inspection to see if the
matched calibraiton file is already present. If not, the calibration
will be downloaded and written to the cache. It is this path that is
returned in the dictionary structure. A path of 'None' indicates that no
calibration match was found.
Parameters
----------
cal_requests : <list>
A list of CalibrationRequest objects
howmany : <int>, optional
Maximum number of calibrations to return per request
(not passed to the server-side request but trims the returned list)
Returns
-------
calibration_records : <dict>
A set of science frames and matching calibrations.
Example
-------
The returned dictionary has the form::
{(ad): <filename_of_calibration_including_path>,
...
}
"""
calibration_records = {}
warn = "\tNo {} calibration file found for {}\n"
md5msg = "\tNo {} calibration found for {}\n"
def _add_cal_record(rq, calfile):
calibration_records.update({rq.ad: calfile})
return
cache = set_caches()
for rq in cal_requests:
calname = None
calmd5 = None
calurl = None
calurl, calmd5 = calibration_search(rq, howmany=(howmany if howmany else 1))
if not calurl:
log.warning("START CALIBRATION SERVICE REPORT\n")
if not calmd5:
log.warning(md5msg.format(rq.caltype, rq.filename))
else:
log.warning("\t{}".format(calmd5))
log.warning(warn.format(rq.caltype, rq.filename))
log.warning("END CALIBRATION SERVICE REPORT\n")
continue
calibs = []
for url, md5 in zip(calurl, calmd5):
log.info("Found calibration (url): {}".format(url))
components = urlparse(url)
calname = basename(components.path)
cachename, cachedir = _check_cache(calname, rq.caltype)
if cachename:
cached_md5 = generate_md5_digest(cachename)
if cached_md5 == md5:
log.stdinfo("Cached calibration {} matched.".format(cachename))
calibs.append(cachename)
else:
log.stdinfo("File {} is cached but".format(calname))
log.stdinfo("md5 checksums DO NOT MATCH")
log.stdinfo("Making request on calibration service")
log.stdinfo("Requesting URL {}".format(url))
try:
calname = get_request(url, cachename)
calibs.append(cachename)
except GetterError as err:
for message in err.messages:
log.error(message)
continue
log.status("Making request for {}".format(url))
fname = split(url)[1]
calname = join(cachedir, fname)
try:
calname = get_request(url, calname)
except GetterError as err:
for message in err.messages:
log.error(message)
else:
# hash compare
download_mdf5 = generate_md5_digest(calname)
if download_mdf5 == md5:
log.status("MD5 hash match. Download OK.")
calibs.append(calname)
else:
err = "MD5 hash of downloaded file does not match expected hash {}"
raise OSError(err.format(md5))
# If howmany=None, append the only file as a string, instead of the list
if calibs:
_add_cal_record(rq, calibs if howmany else calibs[0])
return calibration_records