Source code for archon.actor.actor

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2021-01-21
# @Filename: actor.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import os
import pathlib
import warnings
from contextlib import suppress

from typing import ClassVar, Dict, Type

import astropy.time
import click

from clu import Command
from clu.actor import AMQPActor, BaseActor
from sdsstools import get_sjd
from sdsstools.configuration import Configuration

from archon import __version__
from archon.actor.recovery import ExposureRecovery
from archon.controller.command import ArchonCommand
from archon.controller.controller import ArchonController
from archon.exceptions import ArchonUserWarning

from .commands import parser as archon_command_parser
from .delegate import ExposureDelegate


__all__ = ["ArchonBaseActor", "ArchonActor"]


[docs] class ArchonBaseActor(BaseActor): """Archon controller base actor. This class is intended to be subclassed with a specific actor class (normally ``AMQPActor`` or ``LegacyActor``). Parameters ---------- controllers The list of `.ArchonController` instances to manage. """ parser: ClassVar[click.Group] = archon_command_parser is_legacy: bool = False BASE_CONFIG: ClassVar[str | Dict | None] = None DELEGATE_CLASS: ClassVar[Type[ExposureDelegate]] = ExposureDelegate CONTROLLER_CLASS: ClassVar[Type[ArchonController]] = ArchonController def __init__( self, *args, controllers: tuple[ArchonController, ...] = (), run_recovery_on_start: bool = True, **kwargs, ): #: dict[str, ArchonController]: A mapping of controller name to controller. self.controllers = {c.name: c for c in controllers} self.parser_args = [self.controllers] if "schema" not in kwargs: kwargs["schema"] = os.path.join( os.path.dirname(__file__), "../etc/schema.json", ) super().__init__(*args, **kwargs) self.observatory = os.environ.get("OBSERVATORY", "LCO") actor_version = kwargs.get("version", None) self.version = actor_version if actor_version else f"archon-{__version__}" # Issue status and system on a loop. # self.timed_commands.add_command("status", delay=60) # type: ignore # self.timed_commands.add_command("system", delay=60) # type: ignore self.exposure_delegate = self.DELEGATE_CLASS(self) self.run_recovery_on_start = run_recovery_on_start self.exposure_recovery = ExposureRecovery(self.controllers) self._fetch_log_jobs = [] self._status_jobs = [] self.config_file_path: str | None = None
[docs] async def start(self, **_): """Start the actor and connect the controllers.""" connect_timeout = self.config["timeouts"]["controller_connect"] connect_timeout = 10 for controller in self.controllers.values(): try: await asyncio.wait_for(controller.start(), timeout=connect_timeout) except asyncio.TimeoutError: warnings.warn( f"Timeout out connecting to {controller.name!r}.", ArchonUserWarning, ) except Exception as err: warnings.warn( f"Failed connecting to controller {controller.name} at " f"{controller.host}: {err}", ArchonUserWarning, ) await super().start() self._fetch_log_jobs = [ asyncio.create_task(self._fetch_log(controller)) for controller in self.controllers.values() ] self._status_jobs = [ asyncio.create_task(self._report_status(controller)) for controller in self.controllers.values() ] # Depending on how the actor is initialised exposure_recovery may not have # been set with the actual controllers. self.exposure_recovery.controllers = self.controllers if self.run_recovery_on_start: await self._recover_exposures() return self
[docs] async def stop(self): with suppress(asyncio.CancelledError): for task in self._fetch_log_jobs: task.cancel() await task for controller in self.controllers.values(): await controller.stop() return await super().stop()
[docs] @classmethod def from_config(cls, config, *args, **kwargs): """Creates an actor from a configuration file.""" if config is None: if cls.BASE_CONFIG is None: raise RuntimeError("The class does not have a base configuration.") config = cls.BASE_CONFIG instance = super(ArchonBaseActor, cls).from_config(config, *args, **kwargs) if isinstance(config, (str, pathlib.Path)): instance.config_file_path = str(config) elif isinstance(config, Configuration): instance.config_file_path = str( config._BASE_CONFIG_FILE if config._BASE_CONFIG_FILE else config._CONFIG_FILE ) assert isinstance(instance, ArchonBaseActor) assert isinstance(instance.config, dict) enabled_controllers = instance.config.get("enabled_controllers", None) if "controllers" in instance.config: controllers = ( cls.CONTROLLER_CLASS( ctrname, ctr["host"], ctr["port"], config=instance.config, ) for (ctrname, ctr) in instance.config["controllers"].items() if enabled_controllers is not None and ctrname in enabled_controllers ) instance.controllers = {c.name: c for c in controllers} instance.parser_args = [instance.controllers] # Need to refresh this return instance
async def _fetch_log(self, controller: ArchonController): # pragma: no cover """Fetches the log and outputs new messages. This is not implemented as a timed command because we don't want a new command popping up and running every second. We write to all users only when there's a new log. """ while True: if not controller.is_connected(): await asyncio.sleep(1) continue cmd: ArchonCommand = await controller.send_command("FETCHLOG") if cmd.succeeded() and len(cmd.replies) == 1: if str(cmd.replies[0].reply) not in ["(null)", ""]: self.write( log=dict( controller=controller.name, log=str(cmd.replies[0].reply), ) ) continue # There may be more messages, so don't wait. await asyncio.sleep(1) async def _report_status(self, controller: ArchonController): """Reports the status of the controller.""" async for status in controller.yield_status(): self.write( message_code="d", status=dict( controller=controller.name, status=status.value, status_names=[flag.name for flag in status.get_flags()], ), ) async def _recover_exposures(self): """Recovers any failed exposures for the current MJD.""" now = astropy.time.Time.now() mjd = get_sjd() if self.config.get("files.use_sjd", False) else int(now.mjd) data_dir = pathlib.Path(self.config.get("files.data_dir", "/data")) mjd_dir = data_dir / str(mjd) if not mjd_dir.exists(): self.write("w", f"Cannot find directory for MJD {mjd}. Skipping recovery.") return with self.exposure_recovery.set_command(self): recovered = await self.exposure_recovery.recover( self.config["controllers"], path=mjd_dir, write_checksum=self.config["checksum.write"], checksum_mode=self.config["checksum.mode"], checksum_file=self.config["checksum.file"], ) return recovered
[docs] class ArchonActor(ArchonBaseActor, AMQPActor): """Archon actor based on the AMQP protocol.""" pass
ArchonCommandType = Command[ArchonActor]