# Source code for recipe_system.cal_service.remotedb

# Defines the RemoteDB class for calibration returns. This is a high-level
# interface to FITSstore. It may be subclassed in the future.

from os import path, makedirs
from io import BytesIO
from pprint  import pformat
from xml.dom import minidom

import urllib.request
import urllib.parse
import urllib.error

from .caldb import CalDB, CalReturn
from .calrequestlib import get_cal_requests, generate_md5_digest, get_request
from .calrequestlib import GetterError

# Default value for the upload cookie; RemoteDB uses it whenever no
# ``upload_cookie`` argument is supplied to its constructor.
UPLOADCOOKIE = "qap_upload_processed_cal_ok"

# %-style template for the diagnostic report that retrieve_calibration()
# returns when it cannot extract calibrations from the server reply.
# It is filled from a mapping with the keys "sequence" (the request data),
# "response" (the server reply) and "nones" (names of the descriptors whose
# value was None).
RESPONSESTR = """########## Request Data BEGIN ##########
%(sequence)s
########## Request Data END ##########

########## Calibration Server Response BEGIN ##########
%(response)s
########## Calibration Server Response END ##########

########## Nones Report (descriptors that returned None):
%(nones)s
########## Note: all descriptors shown above, scroll up.
        """


class RemoteDB(CalDB):
    """
    The class for remote calibration databases. It inherits from CalDB, but
    also has the following attributes:

    Attributes
    ----------
    server : str
        URL of the server
    store_science : bool
        whether processed science images should be uploaded
    _upload_cookie : str
        the cookie to send when uploading files
    _calmgr : str
        the URL for making requests to the remote calibration manager
    _proccal_url, _science_url : str
        the URLs for uploading processed calibrations and processed
        science images, respectively.
    """
    def __init__(self, server, name=None, valid_caltypes=None, get_cal=True,
                 store_cal=False, store_science=False, procmode=None,
                 log=None, upload_cookie=None):
        """
        Parameters
        ----------
        server : str
            address of the remote server; "http://" is prepended if it does
            not already start with "http://" or "https://"
        name : str/None
            name of this database (defaults to the value of `server`)
        valid_caltypes, get_cal, store_cal, procmode, log
            passed unchanged to ``CalDB.__init__``
        store_science : bool
            whether processed science images should be uploaded
        upload_cookie : str/None
            cookie to send when uploading (defaults to UPLOADCOOKIE)
        """
        if name is None:
            name = server
        super().__init__(name=name, get_cal=get_cal, store_cal=store_cal,
                         log=log, valid_caltypes=valid_caltypes,
                         procmode=procmode)
        self.store_science = store_science
        # Test for the full scheme prefix: a bare startswith("http") would
        # mistake a hostname such as "httpbin.org" for a complete URL.
        if not server.startswith(("http://", "https://")):
            server = f"http://{server}"
        self.server = server
        self._calmgr = f"{self.server}/calmgr"
        self._proccal_url = f"{self.server}/upload_processed_cal"
        self._science_url = f"{self.server}/upload_file"
        self._upload_cookie = upload_cookie or UPLOADCOOKIE

    def _get_calibrations(self, adinputs, caltype=None, procmode=None,
                          howmany=1):
        """
        Query the remote calibration manager for each input and make sure
        every calibration it reports is available in the local cache
        directory, downloading it when it is absent or its MD5 differs.

        Parameters
        ----------
        adinputs
            inputs for which calibrations are wanted (handed to
            ``get_cal_requests``)
        caltype : str/None
            type of calibration requested
        procmode : str/None
            processing mode; if not None it is added to the query URL
        howmany : int
            maximum number of calibrations to return per input

        Returns
        -------
        CalReturn
            built from a list with exactly one entry per calibration
            request: None if nothing usable was obtained, otherwise a tuple
            ``(cal, self.name)`` where ``cal`` is a cached filename
            (``howmany == 1``) or a list of cached filenames

        Raises
        ------
        OSError
            if a freshly-downloaded file does not have the expected MD5
        """
        log = self.log
        cal_requests = get_cal_requests(adinputs, caltype, procmode=procmode,
                                        is_local=False)
        procstr = "" if procmode is None else f"/{procmode}"

        cals = []
        for rq in cal_requests:
            rqurl = f"{self._calmgr}/{rq.caltype}{procstr}/{rq.filename}"
            log.stdinfo(f"Querying remote database: {rqurl}")
            remote_cals = retrieve_calibration(rqurl, rq, howmany=howmany)
            if not remote_cals[0]:
                # On failure the second element may hold an error report
                log.warning("START CALIBRATION SERVICE REPORT\n")
                if remote_cals[1]:
                    log.warning(f"\t{remote_cals[1]}")
                log.warning(f"No {rq.caltype} found for {rq.filename}")
                log.warning("END CALIBRATION SERVICE REPORT\n")
                cals.append(None)
                continue

            good_cals = []
            caldir = path.join(self.caldir, rq.caltype)
            for calurl, calmd5 in zip(*remote_cals):
                log.stdinfo(f"Found calibration (url): {calurl}")
                calname = path.basename(urllib.parse.urlparse(calurl).path)
                cachefile = path.join(caldir, calname)
                if path.exists(cachefile):
                    cached_md5 = generate_md5_digest(cachefile)
                    if cached_md5 == calmd5:
                        log.stdinfo(f"Cached calibration {cachefile} matched.")
                        good_cals.append(cachefile)
                        continue
                    log.stdinfo(f"File {calname} is cached but")
                    log.stdinfo("md5 checksums DO NOT MATCH")

                log.stdinfo(f"Making request for {calurl}")
                makedirs(caldir, exist_ok=True)
                try:
                    get_request(calurl, cachefile)
                except GetterError as err:
                    for message in err.messages:
                        log.error(message)
                    # Do NOT append to `cals` here: exactly one entry per
                    # request is appended after this loop, and an extra
                    # None would shift the results of all later requests.
                    continue
                download_md5 = generate_md5_digest(cachefile)
                if download_md5 != calmd5:
                    raise OSError("MD5 hash of downloaded file does not match "
                                  f"expected hash {calmd5}")
                log.status("MD5 hash match. Download OK.")
                good_cals.append(cachefile)

            # Append list if >1 requested, else just the filename string
            if good_cals:
                cals.append(good_cals if howmany != 1 else good_cals[0])
            else:
                cals.append(None)

        return CalReturn([None if cal is None else (cal, self.name)
                          for cal in cals])

    def _store_calibration(self, cal, caltype=None):
        """
        Store calibration. If this is a processed_science, cal should be an
        AstroData object, otherwise it should be a filename.

        Nothing is uploaded unless the matching flag (``store_science`` for
        science products, ``store_cal`` for everything else) is set.

        Parameters
        ----------
        cal : str/AstroData
            filename of the calibration, or the science object to upload
        caltype : str/None
            type of product; treated as science if it contains "science"

        Raises
        ------
        urllib.error.HTTPError
            re-raised (after being logged) if the server rejects the upload
        """
        is_science = caltype is not None and "science" in caltype
        if not ((is_science and self.store_science) or
                (not is_science and self.store_cal)):
            self.log.stdinfo(f"{self.name}: NOT storing {cal} as {caltype}")
            return

        # Science products must be objects, anything else must be a filename
        assert isinstance(cal, str) ^ is_science
        self.log.stdinfo(f"{self.name}: Storing {cal} as {caltype}")
        # Use the flag computed above: re-testing `"science" in caltype`
        # raises TypeError when caltype is None.
        if is_science:
            # Write to a stream in memory, not to disk
            f = BytesIO()
            cal.write(f)
            postdata = f.getvalue()
            url = f"{self._science_url}/{cal.filename}"
        else:
            with open(cal, "rb") as f:
                postdata = f.read()
            url = f"{self._proccal_url}/{path.basename(cal)}"

        try:
            rq = urllib.request.Request(url)
            rq.add_header('Content-Length', '%d' % len(postdata))
            rq.add_header('Content-Type', 'application/octet-stream')
            rq.add_header('Cookie', "gemini_fits_upload_auth="
                                    f"{self._upload_cookie}")
            with urllib.request.urlopen(rq, postdata) as u:
                u.read()  # drain the reply; its content is not used
            self.log.stdinfo(f"{url} uploaded OK.")
        except urllib.error.HTTPError as error:
            self.log.error(str(error))
            raise
def retrieve_calibration(rqurl, rq, howmany=1):
    """
    POST a calibration request to `rqurl` and extract the calibrations
    listed in the XML reply.

    Parameters
    ----------
    rqurl : str
        URL to which the request is sent
    rq : object
        calibration request; its ``descriptors`` mapping and its ``tags``
        are sent url-encoded as the "descriptors" and "types" fields
    howmany : int
        maximum number of calibrations to return

    Returns
    -------
    tuple
        ``(urls, md5s)``: two lists holding the text of the first `howmany`
        ``<url>`` and ``<md5>`` elements of the reply. If the request fails
        the result is ``(None, message)`` with the error text, and if the
        reply cannot be parsed (malformed XML or an empty element) it is
        ``(None, report)`` with ``report`` built from RESPONSESTR.
    """
    # Local import so this function does not rely on a module-level import
    from xml.parsers.expat import ExpatError

    sequence = [("descriptors", rq.descriptors), ("types", rq.tags)]
    postdata = urllib.parse.urlencode(sequence).encode('utf-8')
    try:
        calrq = urllib.request.Request(rqurl)
        with urllib.request.urlopen(calrq, postdata) as u:
            response = u.read()
    except urllib.error.URLError as err:  # HTTPError is a subclass
        return None, str(err)

    def _texts(dom, tag):
        # Text of the first `howmany` elements named `tag`; an element
        # with no child nodes raises IndexError
        return [node.childNodes[0].data
                for node in dom.getElementsByTagName(tag)[:howmany]]

    try:
        dom = minidom.parseString(response)
        calurlel = _texts(dom, 'url')
        calurlmd5 = _texts(dom, 'md5')
    except (IndexError, ExpatError):
        # ExpatError: the reply is not well-formed XML (e.g. an HTML error
        # page). The report is only built here, when it is actually needed.
        desc_nones = [k for k, v in rq.descriptors.items() if v is None]
        # Decode so the report shows the reply text, not a bytes repr
        reply = response.decode('utf-8', errors='replace').strip()
        return None, RESPONSESTR % {
            "sequence": pformat(sequence),
            "response": reply,
            "nones": ", ".join(desc_nones) if desc_nones
                     else "No Nones Sent"}

    return calurlel, calurlmd5