Source code for quantify_scheduler.instrument_coordinator.instrument_coordinator

# Repository: https://gitlab.com/quantify-os/quantify-scheduler
# Licensed according to the LICENCE file on the main branch
"""Module containing the main InstrumentCoordinator Component."""
from __future__ import annotations

import warnings
from typing import Dict

import numpy as np
from qcodes.instrument import base as qcodes_base
from qcodes.instrument import parameter
from qcodes.utils import validators
from xarray import Dataset

from quantify_scheduler import CompiledSchedule
from quantify_scheduler.instrument_coordinator.components import base, generic
from quantify_scheduler.instrument_coordinator.utility import (
    check_already_existing_acquisition,
)


[docs] class InstrumentCoordinator(qcodes_base.Instrument): """ The :class:`~.InstrumentCoordinator` serves as the central interface of the hardware abstraction layer. It provides a standardized interface to execute Schedules on control hardware. The :class:`~.InstrumentCoordinator` has two main functionalities exposed to the user, the ability to configure its :mod:`~.instrument_coordinator.components` representing physical instruments, and the ability to execute experiments. .. admonition:: Executing a schedule using the instrument coordinator :class: dropdown To execute a :class:`~.Schedule` , one needs to first compile a schedule and then configure all the instrument coordinator components using :meth:`~.InstrumentCoordinator.prepare`. After starting the experiment, the results can be retrieved using :meth:`~.InstrumentCoordinator.retrieve_acquisition`. .. code-block:: from quantify_scheduler.backends.graph_compilation import SerialCompiler my_sched: Schedule = ... # a Schedule describing the experiment to perform quantum_device: QuantumDevice = ... # the device under test hardware_config: dict = ... # a config file describing the connection to the hardware quantum_device.hardware_config(hardware_config) compiler = SerialCompiler(name="compiler") compiled_sched = compiler.compile( schedule=sched, config=quantum_device.generate_compilation_config() ) instrument_coordinator.prepare(compiled_sched) instrument_coordinator.start() dataset = instrument_coordinator.retrieve_acquisition() .. admonition:: Adding components to the instrument coordinator :class: dropdown In order to distribute compiled instructions and execute an experiment, the instrument coordinator needs to have references to the individual instrument coordinator components. They can be added using :meth:`~.InstrumentCoordinator.add_component`. .. code-block:: instrument_coordinator.add_component(qcm_component) Parameters ---------- name The name for the instrument coordinator instance. add_default_generic_icc If True, automatically adds a GenericInstrumentCoordinatorComponent to this instrument coordinator with the default name. """ def __init__(self, name: str, add_default_generic_icc: bool = True) -> None: super().__init__(name)
[docs] self.components = parameter.ManualParameter( "components", initial_value=[], vals=validators.Lists(validators.Strings()), docstring="A list containing the names of all components that" " are part of this InstrumentCoordinator.", instrument=self, )
[docs] self.timeout = parameter.ManualParameter( "timeout", unit="s", initial_value=60, vals=validators.Numbers(min_value=0), docstring="The timeout used for waiting for the experiment to complete " "when retrieving acquisitions.", instrument=self, )
[docs] self._last_schedule = None
if add_default_generic_icc: self.add_component( generic.GenericInstrumentCoordinatorComponent(generic.DEFAULT_NAME) )
[docs] self._compiled_schedule = None
@property
[docs] def last_schedule(self) -> CompiledSchedule: """ Returns the last schedule used to prepare the instrument coordinator. This feature is intended to aid users in debugging. """ if self._last_schedule is None: raise ValueError( f"No {CompiledSchedule.__name__} was handled by the instrument " "coordinator. Try calling the .prepare() method with a Schedule." ) return self._last_schedule
@property
[docs] def is_running(self) -> bool: """ Returns if any of the :class:`.InstrumentCoordinator` components is running. Returns ------- : The :class:`.InstrumentCoordinator`'s running state. """ return any( self.find_instrument(c_name).is_running is True for c_name in self.components() )
[docs] def get_component(self, name: str) -> base.InstrumentCoordinatorComponentBase: """ Returns the InstrumentCoordinator component by name. Parameters ---------- name The component name. Returns ------- : The component. Raises ------ KeyError If key ``name`` is not present in ``self.components``. """ if name in self.components(): return self.find_instrument(name) raise KeyError( f"'{name.split('ic_')[1]}' appears in the hardware config," f" but was not added as a component to InstrumentCoordinator '{self.name}'." )
[docs] def add_component( self, component: base.InstrumentCoordinatorComponentBase, ) -> None: """ Adds a component to the components collection. Parameters ---------- component The component to add. Raises ------ ValueError If a component with a duplicated name is added to the collection. TypeError If :code:`component` is not an instance of the base component. """ if component.name in self.components(): raise ValueError(f"'{component.name}' has already been added!") if not isinstance(component, base.InstrumentCoordinatorComponentBase): raise TypeError( f"{repr(component)} is not " f"{base.__name__}.{base.InstrumentCoordinatorComponentBase.__name__}." ) self.components().append(component.name) # list gets updated in place
[docs] def remove_component(self, name: str) -> None: """ Removes a component by name. Parameters ---------- name The component name. """ self.components().remove(name) # list gets updated in place
[docs] def prepare( self, compiled_schedule: CompiledSchedule, ) -> None: """ Prepares each component for execution of a schedule. It attempts to configure all instrument coordinator components for which compiled instructions, typically consisting of a combination of sequence programs, waveforms and other instrument settings, are available in the compiled schedule. Parameters ---------- compiled_schedule A schedule containing the information required to execute the program. Raises ------ KeyError If the compiled schedule contains instructions for a component absent in the instrument coordinator. TypeError If the schedule provided is not a valid :class:`.CompiledSchedule`. """ self._compiled_schedule = compiled_schedule if not CompiledSchedule.is_valid(self._compiled_schedule): raise TypeError( f"{self._compiled_schedule} is not a valid {CompiledSchedule.__name__}" ) # Adds a reference to the last prepared schedule this can be accessed through # the self.last_schedule property. self._last_schedule = self._compiled_schedule compiled_instructions = self._compiled_schedule["compiled_instructions"] # Compiled instructions are expected to follow the structure of a dict # with keys corresponding to instrument names (InstrumentCoordinatorComponent's) # and values containing instructions in the format specific to that type # of hardware. See also the specification in the CompiledSchedule class. for instrument_name, args in compiled_instructions.items(): self.get_component( base.instrument_to_component_name(instrument_name) ).prepare(args)
[docs] def start(self) -> None: """ Start all of the components that appear in the compiled instructions. The instruments will be started in the order in which they were added to the instrument coordinator. """ if self._compiled_schedule is None: raise ValueError( "Attempting to start `InstrumentCoordinator` without a compiled " "schedule. Please pass a compiled schedule to `.prepare` before " "starting the `InstrumentCoordinator`. e.g. \n" " > ic.prepare(compiled_schedule)\n" " > ic.start()\n" ) compiled_instructions = self._compiled_schedule.get("compiled_instructions", {}) used_components = [ base.instrument_to_component_name(name) for name in compiled_instructions.keys() ] for component_name in self.components(): if component_name in used_components: component = self.get_component(component_name) component.start()
[docs] def stop(self, allow_failure=False) -> None: """ Stops all components. The components are stopped in the order in which they were added. Parameters ---------- allow_failure By default it is set to `False`. When set to `True`, the AttributeErrors raised by a component are demoted to warnings to allow other components to stop. """ for instr_name in self.components(): if allow_failure: try: instrument = self.find_instrument(instr_name) instrument.stop() except AttributeError as e: warnings.warn( f"When stopping instrument {instr_name}: Error \n {e}." ) else: instrument = self.find_instrument(instr_name) instrument.stop()
[docs] def retrieve_acquisition(self) -> Dataset: """ Retrieves the latest acquisition results of the components with acquisition capabilities. Returns ------- : The acquisition data in an :code:`xarray.Dataset`. For each acquisition channel it contains an :code:`xarray.DataArray`. """ self.wait_done(timeout_sec=self.timeout()) acquisitions: Dataset = Dataset() compiled_instructions = self._compiled_schedule.get("compiled_instructions", {}) for instrument_name in compiled_instructions: component_acquisitions = self.get_component( base.instrument_to_component_name(instrument_name) ).retrieve_acquisition() if component_acquisitions is not None: check_already_existing_acquisition( new_dataset=component_acquisitions, current_dataset=acquisitions ) acquisitions = acquisitions.merge(component_acquisitions) return acquisitions
[docs] def wait_done(self, timeout_sec: int = 10) -> None: """ Awaits each component until it is done. The timeout in seconds specifies the allowed amount of time to run before it times out. Parameters ---------- timeout_sec The maximum amount of time in seconds before a timeout. """ for instr_name in self.components(): instrument = self.find_instrument(instr_name) self.get_component(instrument.name).wait_done(timeout_sec)
[docs] def retrieve_hardware_logs(self) -> Dict[str, dict]: """ Return the hardware logs of the instruments of each component. The instruments must be referenced in the :class:`.CompiledSchedule`. Returns ------- : A nested dict containing the components hardware logs """ if not self._compiled_schedule: raise RuntimeError( "Compiled schedule not found. Please prepare the `InstrumentCoordinator`." ) hardware_logs = {} for instr_name in self.components(): component = self.get_component(instr_name) if ( hardware_log := component.get_hardware_log(self._compiled_schedule) ) is not None: hardware_logs[component.instrument.name] = hardware_log return hardware_logs
[docs] def _convert_acquisition_data_format(raw_results): acquisition_dict = {} for channel in raw_results: if channel not in acquisition_dict.keys(): acquisition_dict[channel] = [] acquisition_dict[channel] = raw_results[channel].values acquisitions_list = [] for channel in acquisition_dict: acquisitions_list.append(np.array(acquisition_dict.get(channel))) return acquisitions_list
[docs] class ZIInstrumentCoordinator(InstrumentCoordinator): """ Support ZI backend. This class is a hack and extension to the :class:`.InstrumentCoordinator`, which is introduced to support the quirks when using the ZI backend during the acquisition of results. """ def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs)
[docs] self.timeout_reacquire = parameter.ManualParameter( "timeout_reacquire", unit="", initial_value=True, vals=validators.Bool(), docstring="Turns on reacquisition in case " "of timeouts.", instrument=self, )
[docs] self.max_num_reacquisitions = parameter.ManualParameter( "max_num_reacquisitions", unit="", initial_value=5, vals=validators.Numbers(min_value=0, max_value=50), docstring="The number of retries to retrieve acquisitions in case " "of timeouts.", instrument=self, )
[docs] self._last_acquisition = None
[docs] self._num_reacquisitions = 0
[docs] def _compare_reacquire(self, raw_results): reacquire = True results_list = _convert_acquisition_data_format(raw_results) if self._last_acquisition is not None: last_acquisition_list = _convert_acquisition_data_format( self._last_acquisition ) difference_np_array = np.linalg.norm( np.array(results_list[0]) - np.array(last_acquisition_list[0]) ) if difference_np_array > 0.0: reacquire = False self._num_reacquisitions = 0 return reacquire
[docs] def retrieve_acquisition(self) -> Dataset: """ Retrieves the latest acquisition results of the components. Returns ------- : The acquisition data in an :code:`xarray.Dataset`. For each acquisition channel it contains an :code:`xarray.DataArray`. """ raw_acq_results = super().retrieve_acquisition() if self.timeout_reacquire(): reacquire = self._compare_reacquire(raw_acq_results) self._last_acquisition = raw_acq_results self._num_reacquisitions = 0 while reacquire: self._num_reacquisitions += 1 print( "Re-running acquisition " + f"{self._num_reacquisitions}/{self.max_num_reacquisitions()}" ) if self._num_reacquisitions >= self.max_num_reacquisitions(): raise RuntimeError( "Exceeded number of reacquisitions:" + f"{self._num_reacquisitions}/" + f"{self.max_num_reacquisitions()}" ) self.start() self.wait_done() # Acquire results raw_acq_results = super().retrieve_acquisition() reacquire = self._compare_reacquire(raw_acq_results) self._last_acquisition = raw_acq_results return raw_acq_results