class ArchonCommandStatus(enum.Enum):
    """Lifecycle states that an Archon command can be in.

    The numeric values are assigned automatically and carry no meaning of
    their own; compare against the members, not against the integers.

    """

    # The command completed.
    DONE = enum.auto()
    # The command failed.
    FAILED = enum.auto()
    # The command has not finished yet.
    RUNNING = enum.auto()
    # The command was marked as timed out.
    TIMEDOUT = enum.auto()
class ArchonCommand(asyncio.Future):
    """Tracks the status and replies to a command sent to the Archon.

    ``ArchonCommand`` is a `~asyncio.Future` and can be awaited, at which
    point the command will have completed or failed. The result of the
    future is the command instance itself; inspect ``status`` or call
    `.succeeded` to find out how it finished.

    Parameters
    ----------
    command_string
        The command to send to the Archon. Will be converted to uppercase.
    command_id
        The command id to associate with this message. Must be between 0
        and ``MAX_COMMAND_ID`` (inclusive).
    controller
        The controller that is running this command. If set, the command id
        is added back to ``controller._id_pool`` when the command is marked
        done.
    expected_replies
        How many replies to expect from the controller before the command
        is done. A falsy value (`None` or 0) means the command is never
        marked done just by counting replies.
    timeout
        Time without receiving a reply after which the command will be
        timed out. `None` (or any other falsy value, such as 0) disables
        the timeout.

    Raises
    ------
    ValueError
        If ``command_id`` is outside the valid range.

    """

    def __init__(
        self,
        command_string: str,
        command_id: int,
        controller=None,
        expected_replies: Optional[int] = 1,
        timeout: Optional[float] = None,
    ):
        super().__init__()

        self.command_string = command_string.upper()
        self.command_id = command_id
        self.controller = controller
        self._expected_replies = expected_replies

        #: list of .ArchonCommandReply: Replies received for this command.
        self.replies: list[ArchonCommandReply] = []

        #: .ArchonCommandStatus: The status of the command.
        self.status = ArchonCommandStatus.RUNNING

        if self.command_id < 0 or self.command_id > MAX_COMMAND_ID:
            raise ValueError(
                f"command_id must be between 0x00 and 0x{MAX_COMMAND_ID:X}"
            )

        # The timer is only created for a truthy timeout.
        self.timer: Optional[Timer] = (
            Timer(timeout, self._timeout) if timeout else None
        )

        # Set every time a reply is added and when the command is marked done;
        # get_replies() waits on it.
        self.__event = asyncio.Event()

    @property
    def raw(self):
        """Returns the raw command sent to the Archon (without the newline)."""

        return f">{self.command_id:02X}{self.command_string}"

    def process_reply(self, reply: bytes) -> ArchonCommandReply | None:
        """Processes a new reply to this command.

        The Archon can reply to a command of the form ``>xxCOMMAND`` (where
        ``xx`` is a 2-digit hexadecimal) with ``?xx`` to indicate failure or
        ``<xxRESPONSE``. In the latter case the ``RESPONSE`` ends with a
        newline. The Archon can also reply with ``<xx:bbbbb...bbbb`` with the
        ``:`` indicating that what follows is a binary string with 1024
        characters. In this case the reply does not end with a newline.

        Parameters
        ----------
        reply
            The received reply, as bytes.

        Returns
        -------
        reply
            The parsed `.ArchonCommandReply`, or `None` if the reply could not
            be parsed or its command id does not match this command. In both
            of those cases a warning is issued and the command is marked as
            failed.

        """

        try:
            archon_reply = ArchonCommandReply(reply, self)
        except ArchonError as err:
            warnings.warn(str(err), ArchonUserWarning)
            self._mark_done(ArchonCommandStatus.FAILED)
            return None

        if archon_reply.command_id != self.command_id:
            # The reply may be binary, so decode leniently: a strict decode
            # would raise UnicodeDecodeError instead of issuing the warning.
            warnings.warn(
                f"Received reply to command {self.raw} that does not match "
                f"the command id: {reply.decode(errors='replace')}",
                ArchonUserWarning,
            )
            self._mark_done(ArchonCommandStatus.FAILED)
            return None

        self.replies.append(archon_reply)

        # Release the event to indicate a new reply has been added.
        self.__event.set()

        # A valid reply arrived, so reset the timer.
        if self.timer:
            self.timer.reset()

        if archon_reply.type == "?":
            self._mark_done(ArchonCommandStatus.FAILED)
            return archon_reply

        if self._expected_replies and len(self.replies) == self._expected_replies:
            self._mark_done()

        return archon_reply

    async def get_replies(self) -> AsyncGenerator[ArchonCommandReply, None]:
        """Yields an asynchronous generator of replies as they are produced.

        Every reply is yielded exactly once and in the order received, even
        if several replies were added between two wake-ups of the generator.
        The generator finishes once the command is done and all the replies
        have been yielded.

        """

        n_output = 0

        while True:
            await self.__event.wait()

            # Drain everything not yet output. The length is re-evaluated on
            # each iteration because new replies can be appended while the
            # generator is suspended at the yield.
            while n_output < len(self.replies):
                yield self.replies[n_output]
                n_output += 1

            if self.done():
                break

            self.__event.clear()

    def succeeded(self):
        """Reports the command success status.

        Returns `True` if the command succeeded, or `False` if it failed,
        timed out, or if the command is not yet done.

        """

        return self.status == ArchonCommandStatus.DONE

    def _mark_done(self, status: ArchonCommandStatus = ArchonCommandStatus.DONE):
        """Marks the command done with ``status``.

        Sets the status, resolves the future with the command itself (if it
        is not already done), wakes up `.get_replies`, cancels the timer, and
        adds the command id back to the controller's id pool.

        """

        # NOTE(review): if this is called again after the command is done the
        # status is overwritten and the id is re-added to the pool. Confirm
        # whether late replies/timeouts should be ignored instead.
        self.status = status

        if not self.done():
            self.set_result(self)

        # Release the event one last time to let the loop to finish and cancel timer.
        self.__event.set()
        if self.timer:
            self.timer.cancel()

        # Return ID to the pool
        if self.controller:
            self.controller._id_pool.add(self.command_id)

    def _timeout(self):
        """Marks the command timed out."""

        self._mark_done(ArchonCommandStatus.TIMEDOUT)

    def __repr__(self):
        return f"<ArchonCommand ({self.raw}, status={self.status})>"
class ArchonCommandReply:
    """A single reply sent by the Archon in response to a command.

    Calling ``str()`` on an instance returns the body of the reply, that is,
    the reply without the reply code or the command id. For binary replies
    there is no string form and ``str()`` raises an error instead.

    Parameters
    ----------
    raw_reply
        The bytes received from the Archon, unmodified.
    command
        The command this reply belongs to.

    Raises
    ------
    .ArchonError
        Raised if the reply cannot be parsed.

    """

    def __init__(self, raw_reply: bytes, command: ArchonCommand):
        match = REPLY_RE.match(raw_reply)
        if not match:
            raise ArchonError(
                f"Received unparseable reply to command {command.raw}: {raw_reply}"
            )

        self.command = command
        self.raw_reply = raw_reply

        # The pattern yields: reply code, hexadecimal command id, binary
        # marker, and the remaining message.
        code, hex_id, marker, body = match.groups()

        self.type: str = code.decode()
        self.command_id: int = int(hex_id, 16)

        marker_text = marker.decode("latin-1")
        self.is_binary: bool = marker_text == ":"

        self.reply: str | bytes
        if not self.is_binary:
            # Text reply: drop a single trailing newline, if present.
            stripped = body[:-1] if body.endswith(b"\n") else body
            self.reply = stripped.decode("latin-1")
        else:
            # If the reply is binary we have already removed all the headers
            # except the one for the first block, which takes the first four
            # bytes of the raw reply.
            self.reply = raw_reply[4:]

    def __str__(self) -> str:
        if not isinstance(self.reply, bytes):
            return self.reply

        raise ArchonError("The reply is binary and cannot be converted to string.")

    def __repr__(self):
        return f"<ArchonCommandReply ({self.raw_reply})>"