#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2021-05-30
# @Filename: delegate.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
from __future__ import annotations
import asyncio
import os
import pathlib
import shutil
import subprocess
from contextlib import suppress
from dataclasses import dataclass, field
from functools import partial
from tempfile import NamedTemporaryFile
from time import time
from unittest.mock import MagicMock
from typing import (
TYPE_CHECKING,
Any,
Coroutine,
Dict,
Generic,
List,
Sequence,
TypedDict,
TypeVar,
)
import astropy.time
import numpy
from astropy.io import fits
from sdsstools.configuration import Configuration
from sdsstools.time import get_sjd
from sdsstools.utils import cancel_task
from archon import __version__
from archon.controller.controller import ArchonController
from archon.controller.maskbits import ControllerStatus
from archon.exceptions import ArchonError
from archon.tools import gzip_async, subprocess_run_async
if TYPE_CHECKING:
    import numpy.typing as nptyping
    from clu import Command

    from .actor import ArchonBaseActor

    # Type of the image arrays (uint16 NDArray). Defined only for type checkers:
    # nptyping is imported in this block, and annotations are never evaluated at
    # runtime because of ``from __future__ import annotations``.
    DataArray = nptyping.NDArray[numpy.uint16]

# Type variable for the actor that owns an ExposureDelegate. Bound to
# ArchonBaseActor and covariant, so a delegate parametrised with a subclass of
# the actor can be used where one parametrised with the base actor is expected.
Actor_co = TypeVar("Actor_co", bound="ArchonBaseActor", covariant=True)
[docs]
class FetchDataDict(TypedDict):
    """Dictionary of fetched data.

    ``ExposureDelegate.fetch_data`` creates one of these per CCD.
    """

    # Name of the controller the data were read from.
    controller: str
    # Number of the controller buffer that was fetched.
    buffer: int
    # Name of the CCD (detector) as listed in the controller configuration.
    ccd: str
    # Image data for this CCD only, extracted from the full buffer frame.
    data: DataArray
    # FITS header. Each keyword maps to a list whose first element is the value
    # and, when present, the second element is the comment.
    header: dict[str, list]
    # Exposure number of the image.
    exposure_no: int
    # Path to which the image will be written.
    filename: str
[docs]
@dataclass
class ExposeData:
    """Data about the ongoing exposure."""

    # Requested exposure time, in seconds (forced to 0 for biases).
    exposure_time: float
    # Exposure flavour, e.g. "object" (the default in expose()) or "bias".
    flavour: str
    # Controllers taking part in the exposure.
    controllers: list[ArchonController]
    # Time at which this object was created, i.e. when the exposure was set up.
    start_time: astropy.time.Time = field(default_factory=astropy.time.Time.now)
    # Set by readout() when the readout starts.
    end_time: astropy.time.Time | None = None
    # MJD (or SJD if ``files.use_sjd`` is set) used for the data subdirectory.
    mjd: int = 0
    # Exposure number, set when the exposure is started.
    exposure_no: int = 0
    # Extra header entries; readout() sets them and fetch_data() adds BUFFER.
    header: Dict[str, Any] = field(default_factory=dict)
    # Delay passed to each controller's readout().
    delay_readout: int = 0
    # Name of the window mode ("default" or a configured mode), if any.
    window_mode: str | None = None
    # Window parameters passed to each controller's set_window().
    window_params: dict = field(default_factory=dict)
[docs]
class ExposureDelegate(Generic[Actor_co]):
"""Handles the exposure workflow."""
def __init__(self, actor: Actor_co):
self.actor = actor
self.config = Configuration(actor.config.copy())
self.expose_data: ExposeData | None = None
self.last_exposure_no: int = -1
self.use_shutter: bool = True
self.is_writing: bool = False
self.lock = asyncio.Lock()
self._command: Command[Actor_co] | None = None
self._expose_cotasks: asyncio.Task | None = None
self._current_task: asyncio.Task | None = None
self._check_fitsio()
@property
def command(self):
"""Returns the current command."""
if not self._command:
raise AttributeError("Command not set")
return self._command
@command.setter
def command(self, value: Command[Actor_co] | None):
"""Sets the current command."""
self._command = value
[docs]
async def reset(self):
"""Resets the exposure delegate."""
self.expose_data = None
self.command = None
self.is_writing = False
self._expose_cotasks = await cancel_task(self._expose_cotasks)
if self.lock.locked():
self.lock.release()
[docs]
async def fail(self, message: str | None = None):
"""Fails a command."""
self._current_task = await cancel_task(self._current_task)
if message:
self.command.fail(error=message)
else:
self.command.fail()
await self.reset()
return False
[docs]
def set_task(self, coro: Coroutine):
"""Schedules a coroutine as a task. The current task is cancelled by `.fail`."""
self._current_task = asyncio.create_task(coro)
return self._current_task
[docs]
async def pre_expose(self, controllers: List[ArchonController]):
"""A routine that runs before integration begins."""
return
[docs]
async def expose(
self,
command: Command[Actor_co],
controllers: List[ArchonController],
flavour: str = "object",
exposure_time: float | None = 1.0,
readout: bool = True,
window_mode: str | None = None,
window_params: dict = {},
seqno: int | None = None,
**readout_params,
) -> bool:
self.command = command
if self.lock.locked():
await self.fail("The expose delegate is locked.")
return False
if flavour == "bias":
exposure_time = 0.0
else:
if exposure_time is None:
await self.fail(f"Exposure time required for flavour {flavour!r}.")
return False
if window_mode:
if window_mode == "default":
window_params = controllers[0].default_window.copy()
elif window_mode in self.config.get("window_modes", []):
extra_window_params = window_params.copy()
window_params = self.config["window_modes"][window_mode]
window_params.update(extra_window_params)
else:
await self.fail(f"Invalid window mode {window_mode!r}.")
return False
self.expose_data = ExposeData(
exposure_time=exposure_time,
flavour=flavour,
controllers=controllers,
window_params=window_params,
window_mode=window_mode,
)
if not (await self.check_expose()):
return False
# Lock until the exposure is done.
await self.lock.acquire()
will_write = readout_params.get("write", True)
if not await self._set_exposure_no(
controllers,
increase=will_write,
seqno=seqno,
):
return False
self.command.debug(next_exposure_no=self.expose_data.exposure_no)
# If the exposure is a bias or dark we don't open the shutter, but
# otherwise we add an extra timeout to allow for the code that handles
# the shutter to open and close it and control the exposure time that way.
if exposure_time == 0.0 or flavour == "bias":
exposure_time = 0.0
await self.pre_expose(controllers)
try:
self.command.debug("Setting exposure window.")
await asyncio.gather(*[c.set_window(**window_params) for c in controllers])
except BaseException as err:
self.command.error("One controller failed setting the exposure window.")
self.command.error(error=str(err))
return await self.fail()
self._expose_cotasks = asyncio.create_task(self.expose_cotasks())
expose_jobs = [
asyncio.create_task(controller.expose(exposure_time, readout=False))
for controller in controllers
]
try:
c_list = ", ".join([controller.name for controller in controllers])
self.command.info(text=f"Starting exposure in controllers: {c_list}.")
await asyncio.gather(*expose_jobs)
except BaseException as err:
self.command.error(error=str(err))
self.command.error("One controller failed. Cancelling remaining tasks.")
for job in expose_jobs:
if not job.done(): # pragma: no cover
with suppress(asyncio.CancelledError):
job.cancel()
await job
return await self.fail()
# Operate the shutter
if self.use_shutter:
if not (await self.shutter(True)):
return await self.fail("Shutter failed to open.")
await asyncio.sleep(exposure_time)
# Close shutter.
if self.use_shutter:
if not (await self.shutter(False)):
return await self.fail("Shutter failed to close.")
if readout:
return await self.readout(self.command, **readout_params)
return True
[docs]
async def check_expose(self) -> bool:
"""Performs a series of checks to confirm we can expose."""
assert self.expose_data
for controller in self.expose_data.controllers:
cname = controller.name
if controller.status & ControllerStatus.EXPOSING:
return await self.fail(f"Controller {cname} is exposing.")
elif controller.status & ControllerStatus.READOUT_PENDING:
return await self.fail(f"Controller {cname} has a read out pending.")
elif controller.status & ControllerStatus.ERROR:
return await self.fail(f"Controller {cname} has status ERROR.")
return True
[docs]
async def shutter(self, open=False) -> bool:
"""Operate the shutter."""
return True
[docs]
async def readout(
self,
command: Command[Actor_co],
extra_header={},
delay_readout: int = 0,
write: bool = True,
) -> bool:
"""Reads the exposure, fetches the buffer, and writes to disk."""
self.command = command
# The command could be done at this point if we are doing an async readout.
# In that case we would see an annoying warning. Instead let's replace the
# command with an empty namespace.
if self.command.done():
self.command.set_status = MagicMock()
if not self.lock.locked():
return await self.fail("Expose delegator is not locked.")
if self.expose_data is None:
return await self.fail("No exposure data found.")
controllers = self.expose_data.controllers
self.expose_data.end_time = astropy.time.Time.now()
self.expose_data.header = extra_header
self.expose_data.delay_readout = delay_readout
t0 = time()
if any([c.status & ControllerStatus.EXPOSING for c in controllers]):
return await self.fail(
"Found controllers exposing. Wait before reading or "
"manually abort them."
)
try:
command.info(text="Reading out CCDs.")
readout_tasks = [
controller.readout(
delay=self.expose_data.delay_readout,
notifier=self.command.debug,
idle_after=False,
)
for controller in controllers
]
await asyncio.gather(*readout_tasks, self.readout_cotasks())
command.debug(text="Fetching buffers.")
c_fdata = await asyncio.gather(*[self.fetch_data(c) for c in controllers])
except Exception as err:
return await self.fail(f"Failed reading out: {err}")
if len(c_fdata) == 0:
self.command.error("No data was fetched.")
return False
self.command.debug(f"Readout completed in {time() - t0:.1f} seconds.")
if write is False:
self.command.warning("Not saving images to disk.")
await self.reset()
return True
# c_fdata is a list of lists. The top level list is one per controller,
# the inner lists one per CCD. Since the inner list containes the name
# of the controller we can flatten it now.
fdata: list[FetchDataDict] = []
for cf in c_fdata:
fdata += cf
self.command.debug(text="Calling post-process routine.")
post_process_jobs = []
for fdata_ccd in fdata:
post_process_jobs.append(self.post_process(fdata_ccd))
await asyncio.gather(*post_process_jobs)
# Update save-point file after post-processing.
self.actor.exposure_recovery.update(fdata)
excluded_cameras: list[str] = self.config.get("excluded_cameras", [])
write_engine: str = self.config.get("files.write_engine", "astropy")
write_async: bool = self.config.get("files.write_async", True)
self.command.debug(text="Writing data to file.")
write_results: list = []
write_coros = [
self.write_to_disk(
fd,
excluded_cameras=excluded_cameras,
write_async=write_async,
write_engine=write_engine,
)
for fd in fdata
]
self.last_exposure_no = fdata[0]["exposure_no"]
# Prepare checksum information.
write_checksum: bool = self.config["checksum.write"]
checksum_mode: str = self.config.get("checksum.mode", "md5")
checksum_file = self.config.get("checksum.file", f"{{SJD}}.{checksum_mode}sum")
checksum_file: str = checksum_file.format(SJD=get_sjd())
if self.config.get("files.write_async", True):
coro_iter = asyncio.as_completed(write_coros)
else:
coro_iter = write_coros
for coro in coro_iter:
try:
result = await coro
write_results.append(result)
# Delete save-point. We do it here because in case one of the
# write coroutines crashes the actor.
if isinstance(result, str):
fn = result
self.actor.exposure_recovery.unlink(fn)
# Update checksum file.
if write_checksum:
try:
await self._generate_checksum(
checksum_file,
[fn],
mode=checksum_mode,
)
except Exception as err:
self.command.warning(str(err))
continue
except Exception as err:
write_results.append(err)
filenames: list[str] = []
failed_to_write: bool = False
for ii, result in enumerate(write_results):
fn = fdata[ii]["filename"]
ccd = fdata[ii]["ccd"]
if isinstance(result, str):
filenames.append(result)
elif isinstance(result, Exception):
self.command.error(f"Failed to writting {fn!s} to disk: {result!s}")
failed_to_write = True
elif result is None:
self.command.warning(f"Not saving image for camera {ccd!r}.")
self.command.info(filenames=filenames)
await self.reset()
return not failed_to_write
[docs]
async def expose_cotasks(self):
"""Tasks that will be executed concurrently with readout.
There is no guarantee that this coroutine will be waited or that
it will complete before the shutter closes and the readout begins.
To ensure that the expose tasks have completed, await the task
in ``self._expose_cotasks``.
"""
return
[docs]
async def readout_cotasks(self):
"""Tasks that will be executed concurrently with readout.
This routine can be overridden to run processes that do not need to
wait until `.post_process`. For example, reading out sensors and
telescope data can happen here to save time.
"""
return
[docs]
async def fetch_data(self, controller: ArchonController):
"""Fetches the buffer and compiles the header."""
# Fetch buffer
self.command.debug(text=f"Fetching {controller.name} buffer.")
data, buffer_no = await controller.fetch(
return_buffer=True,
notifier=self.command.debug,
)
self.is_writing = True
assert self.expose_data
self.expose_data.header["BUFFER"] = [buffer_no, "The buffer number read"]
controller_info = self.config["controllers"][controller.name]
ccd_dict: list[FetchDataDict] = []
for ccd_name in controller_info["detectors"]:
ccd_header = await self.build_base_header(controller, ccd_name)
ccd_data = self._get_ccd_data(data, controller, ccd_name, controller_info)
ccd_dict.append(
{
"controller": controller.name,
"buffer": buffer_no,
"ccd": ccd_name,
"data": ccd_data,
"header": ccd_header,
"exposure_no": self.expose_data.exposure_no,
"filename": self._get_ccd_filepath(controller, ccd_name),
}
)
# Create a save-point file.
self.actor.exposure_recovery.update(ccd_dict)
return ccd_dict
[docs]
@staticmethod
async def write_to_disk(
ccd_data: FetchDataDict,
excluded_cameras: list[str] = [],
write_async: bool = True,
write_engine: str = "astropy",
) -> str | None:
"""Writes ccd data to disk."""
# Check if the CCD is in the list of excluded cameras. If so, raise.
ccd = ccd_data["ccd"]
if ccd in excluded_cameras:
return None
# Check file path and update header with exposure number and file name.
file_path = ccd_data["filename"]
if os.path.exists(file_path):
raise ArchonError(f"Cannot overwrite file {file_path}.")
header = ccd_data["header"]
header["FILENAME"][0] = os.path.basename(file_path)
header["EXPOSURE"][0] = ccd_data["exposure_no"]
# Determine which engine to use to save the data.
if write_engine == "astropy":
writeto = partial(ExposureDelegate._write_file_astropy, ccd_data)
elif write_engine == "fitsio":
writeto = partial(ExposureDelegate._write_file_fitsio, ccd_data)
else:
raise ArchonError(f"Invalid write engine {write_engine!r}.")
# Name of the temporary file where the data will be written to first.
temp_file = NamedTemporaryFile(suffix=".fits", delete=True).name
if write_async:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, writeto, temp_file)
if file_path.endswith(".gz"):
# astropy and fitsio are slow compressing ith gzip. Instead we
# save the image uncompressed and then compress it manually.
await gzip_async(temp_file, complevel=1, suffix=".gz")
temp_file = temp_file + ".gz"
else:
writeto(temp_file)
if file_path.endswith(".gz"):
subprocess.run(f"gzip -1 {temp_file}", shell=True)
temp_file = temp_file + ".gz"
if not os.path.exists(temp_file):
raise ArchonError(f"Failed writing image {file_path!s} to disk.")
# Rename to final file.
try:
shutil.copyfile(temp_file, file_path)
except Exception:
raise ArchonError(
f"Failed renaming temporary file {temp_file}. "
"The original file is still available."
)
else:
os.unlink(temp_file)
return file_path
@staticmethod
def _write_file_astropy(data: FetchDataDict, file_path: str):
"""Writes the HDU to file using astropy."""
header = fits.Header()
for key, value in data["header"].items():
header[key] = tuple(value) if isinstance(value, (list, tuple)) else value
hdu = fits.PrimaryHDU(data["data"], header=header)
hdu.writeto(file_path, checksum=True, overwrite=True)
return
@staticmethod
def _write_file_fitsio(data: FetchDataDict, file_path: str):
"""Writes the HDU to file using astropy."""
import fitsio
header = []
for key, value in data["header"].items():
if isinstance(value, Sequence):
header.append({"name": key, "value": value[0], "comment": value[1]})
else:
header.append({"name": key, "value": value, "comment": ""})
with fitsio.FITS(file_path, "rw") as fits_:
fits_.write(data["data"], header=header)
fits_[-1].write_checksum()
return
@staticmethod
async def _generate_checksum(
checksum_file: str, filenames: list[str], mode: str = "md5"
):
"""Generates a checksum file for the images written to disk."""
if mode.startswith("sha1"):
sum_command = "sha1sum"
elif mode.startswith("md5"):
sum_command = "md5sum"
else:
raise ArchonError(f"Invalid checksum mode {mode!r}.")
for filename in filenames:
filename = str(filename)
dirname = os.path.dirname(os.path.realpath(filename))
basename = os.path.basename(filename)
try:
await subprocess_run_async(
f"{sum_command} {basename} >> {checksum_file}",
shell=True,
cwd=dirname,
)
except Exception as err:
raise ArchonError(f"Failed to generate checksum: {err}")
[docs]
async def post_process(self, fdata: FetchDataDict):
"""Custom post-processing.
This routine can be overridden to perform custom post-processing of the
fetched data. It is called with the data, header, and other metadata for
each CCD. It must modify the data in place and return ``None``.
Parameters
----------
fdata
A dictionary of fetched data with the ``data`` array, a ``header``
dictionary, the ``controller`` and ``ccd`` name, and the ``filename``
to which the data will be saved.
"""
return
async def _set_exposure_no(
self,
controllers: list[ArchonController],
increase: bool = True,
seqno: int | None = None,
):
"""Gets the exposure number for this exposure."""
assert self.expose_data
now = astropy.time.Time.now()
mjd = get_sjd() if self.config["files.use_sjd"] else int(now.mjd)
self.expose_data.mjd = mjd
# Get data directory or create it if it doesn't exist.
data_dir = pathlib.Path(self.config["files"]["data_dir"])
if not data_dir.exists():
data_dir.mkdir(parents=True)
# We store the next exposure number in a file at the root of the data directory.
next_exp_file = data_dir / "nextExposureNumber"
if not next_exp_file.exists():
self.command.warning(f"{next_exp_file} not found. Creating it.")
next_exp_file.touch()
if seqno is None:
with open(next_exp_file, "r") as fd:
data = fd.read().strip()
self.expose_data.exposure_no = int(data) if data != "" else 1
else:
self.expose_data.exposure_no = seqno
# Check that files don't exist.
for controller in controllers:
ccds = list(self.config["controllers"][controller.name]["detectors"].keys())
for ccd in ccds:
try:
self._get_ccd_filepath(controller, ccd)
except FileExistsError as err:
await self.fail(f"{err} Check the nextExposureNumber file.")
return False
if increase:
with open(next_exp_file, "w") as fd:
fd.write(str(self.expose_data.exposure_no + 1))
return True
def _get_ccd_filepath(self, controller: ArchonController, ccd: str):
"""Returns the path for an exposure."""
assert self.command.actor and self.expose_data
config = self.actor.config
data_dir = pathlib.Path(config["files"]["data_dir"])
mjd_dir = data_dir / str(self.expose_data.mjd)
mjd_dir.mkdir(parents=True, exist_ok=True)
path: pathlib.Path = mjd_dir / config["files"]["template"]
observatory = self.command.actor.observatory.lower()
hemisphere = "n" if observatory == "apo" else "s"
file_path = str(path.absolute()).format(
exposure_no=self.expose_data.exposure_no,
controller=controller.name,
observatory=observatory,
hemisphere=hemisphere,
ccd=ccd,
)
if os.path.exists(file_path):
raise FileExistsError(f"File {file_path} already exists.")
return file_path
@staticmethod
def _get_ccd_data(
data: numpy.ndarray,
controller: ArchonController,
ccd_name: str,
controller_info: Dict[str, Any],
) -> numpy.ndarray:
"""Retrieves the CCD data from the buffer frame."""
assert controller.acf_config
pixels = int(controller.acf_config["CONFIG"]["PIXELCOUNT"])
lines = int(controller.acf_config["CONFIG"]["LINECOUNT"])
framemode_int = int(controller.acf_config["CONFIG"]["FRAMEMODE"])
if framemode_int == 0:
framemode = "top"
elif framemode_int == 1:
framemode = "bottom"
else:
framemode = "split"
taps = controller_info["detectors"][ccd_name]["taps"]
ccd_index = list(controller_info["detectors"].keys()).index(ccd_name)
if framemode == "top":
x0_base = ccd_index * pixels * taps
x0 = x0_base
ccd_taps = []
for _ in range(taps):
y0 = 0
y1 = lines
x1 = x0 + pixels
ccd_taps.append(data[y0:y1, x0:x1])
x0 = x1
if len(ccd_taps) == 1:
return ccd_taps[0]
bottom = numpy.hstack(ccd_taps[0 : len(ccd_taps) // 2])
top = numpy.hstack(ccd_taps[len(ccd_taps) // 2 :])
ccd_data = numpy.vstack([top[:, ::-1], bottom[::-1, :]])
elif framemode == "split":
x0 = ccd_index * pixels * (taps // 2)
x1 = x0 + pixels * (taps // 2)
y0 = 0
y1 = lines * (taps // 2)
ccd_data = data[y0:y1, x0:x1]
else:
raise ValueError(f"Framemode {framemode} is not supported at this time.")
return ccd_data
def _check_fitsio(self):
"""Checks if fitsio is installed and needed."""
write_engine: str = self.actor.config["files"].get("write_engine", "astropy")
if write_engine == "fitsio":
try:
import fitsio # noqa: F401
except ImportError:
raise ImportError(
"fitsio is required to use fitsio. You can install "
"it with 'pip install fitsio' or 'pip install sdss-archon[fitsio]'."
)