Source code for quantify_scheduler.gettables

# Repository: https://gitlab.com/quantify-os/quantify-scheduler
# Licensed according to the LICENCE file on the main branch
r"""
Module containing :class:`quantify_core.measurement.types.Gettable`\s for use with
quantify-scheduler.

.. warning::

    The gettable module is expected to change significantly as the
    acquisition protocols (#36 and #80) get fully supported by the scheduler.
    Currently different Gettables are required for different acquisition modes.
    The intent is to have one generic ``ScheduleGettable``.
    Expect breaking changes.
"""
from __future__ import annotations

import json
import logging
import os
import time
import zipfile
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np
from qcodes.parameters import Parameter
from qcodes.utils.json_utils import NumpyJSONEncoder
from quantify_core.data.handling import gen_tuid, get_datadir, snapshot

from quantify_scheduler import Schedule
from quantify_scheduler.device_under_test.quantum_device import QuantumDevice
from quantify_scheduler.enums import BinMode
from quantify_scheduler.helpers.schedule import (
    AcquisitionMetadata,
    extract_acquisition_metadata_from_schedule,
)

if TYPE_CHECKING:
    from quantify_scheduler.schedules.schedule import CompiledSchedule

[docs]logger = logging.getLogger(__name__)
# pylint: disable=too-many-instance-attributes # pylint: disable=too-few-public-methods
[docs]class ScheduleGettable: """ Generic gettable for a quantify schedule using vector (I,Q) acquisition. Can be set to return either static (demodulated) I and Q values or magnitude and phase. The gettable evaluates the parameters passed as `schedule_kwargs`, then generates the :class:`quantify_scheduler.schedules.schedule.Schedule` using the `schedule_function`, this is then compiled and finally executed by the :class:`~.InstrumentCoordinator`. """ # pylint: disable=line-too-long # pylint: disable=too-many-arguments # pylint: disable=line-too-long def __init__( self, quantum_device: QuantumDevice, schedule_function: Callable[..., Schedule], schedule_kwargs: Dict[str, Any], num_channels: int = 1, data_labels: Optional[List[str]] = None, real_imag: bool = True, batched: bool = False, max_batch_size: int = 1024, always_initialize=True, ): """ Create a new instance of ScheduleGettable which is used to do I and Q acquisition or alternatively magnitude and phase. Parameters ---------- quantum_device The qcodes instrument representing the quantum device under test (DUT) containing quantum device properties and setup configuration information. schedule_function A function which returns a :class:`quantify_scheduler.schedules.schedule.Schedule`. The function is required to have the `repetitions` keyword argument. schedule_kwargs The schedule function keyword arguments, when a value in this dictionary is a :class:`~qcodes.instrument.parameter.Parameter`, this parameter will be evaluated every time :code:`.get()` is called before being passed to the :code:`schedule_function`. num_channels The number of channels to expect in the acquisition data. data_labels Allows to specify custom labels. Needs to be precisely 2*num_channels if specified. The order is [Voltage I 0, Voltage Q 0, Voltage I 1, Voltage Q 1, ...], in case real_imag==True, otherwise [Magnitude 0, Phase 0, Magnitude 1, Phase 1, ...]. real_imag If true, the gettable returns I, Q values. Otherwise, magnitude and phase (degrees) are returned. batched Used to indicate if the experiment is performed in batches or in an iterative fashion. max_batch_size: Determines the maximum number of points to acquire when acquiring in batched mode. Can be used to split up a program in parts if required due to hardware constraints. always_initialize: If True, then reinitialize the schedule on each invocation of `get`. If False, then only initialize the first invocation of `get`. """ self._data_labels_specified = data_labels is not None self.always_initialize = always_initialize self.is_initialized = False self._compiled_schedule: CompiledSchedule | None = None self.real_imag = real_imag if self.real_imag: self.name = ["I", "Q"] * num_channels if data_labels: self.label = data_labels else: self.label = [ f"Voltage {iq}{ch}" for ch in range(num_channels) for iq in ["I", "Q"] ] logger.info(f"Auto-generating labels. {self.label}") self.unit = ["V", "V"] * num_channels else: self.name = ["magn", "phase"] * num_channels if data_labels: self.label = data_labels else: self.label = [ f"{val_label}{ch}" for ch in range(num_channels) for val_label in ["Magnitude", "Phase"] ] logger.info(f"Auto-generating labels. {self.label}") self.unit = ["V", "deg"] * num_channels self.batched = batched self.batch_size = max_batch_size # schedule arguments self.schedule_function = schedule_function self.schedule_kwargs = schedule_kwargs self._evaluated_sched_kwargs = {} # the quantum device object containing setup configuration information self.quantum_device = quantum_device # The backend used for compilation. Available as a private variable # to facilitate debugging. Will be assigned upon compilation in self.initialize self._backend = None def __call__(self) -> Union[Tuple[float, ...], Tuple[np.ndarray, ...]]: """Acquire and return data""" return self.get()
[docs] def _compile(self, sched): """Compile schedule, separated to allow for profiling compilation duration.""" compilation_config = self.quantum_device.generate_compilation_config() # made into a private variable for debugging and future caching functionality self._backend = compilation_config.backend(name=compilation_config.name) self._compiled_schedule = self._backend.compile( schedule=sched, config=compilation_config )
[docs] def initialize(self): """ This generates the schedule and uploads the compiled instructions to the hardware using the instrument coordinator. """ logger.debug("Initializing schedule gettable.") self._evaluated_sched_kwargs = _evaluate_parameter_dict(self.schedule_kwargs) # generate a schedule using the evaluated keyword arguments dict sched = self.schedule_function( **self._evaluated_sched_kwargs, repetitions=self.quantum_device.cfg_sched_repetitions(), ) self._compile(sched) instr_coordinator = self.quantum_device.instr_instrument_coordinator.get_instr() instr_coordinator.prepare(self._compiled_schedule) self.is_initialized = True
@property
[docs] def compiled_schedule(self) -> CompiledSchedule | None: """Return the schedule used in this class""" return self._compiled_schedule
[docs] def get(self) -> Union[Tuple[float, ...], Tuple[np.ndarray, ...]]: """ Start the experimental sequence and retrieve acquisition data. Returns ------- : The acquired I/Q voltage signal as a complex number, split into a tuple of floats: either real/imaginary parts or magnitude/phase, depending on whether :code:`real_imag` is :code:`True`. """ instr_coordinator = self.quantum_device.instr_instrument_coordinator.get_instr() # ensure the instruments are not running and we are starting from a clean state # We switch on allow_failure so that some instruments which require the prepare # before the stop can fail. instr_coordinator.stop(allow_failure=True) if not self.is_initialized or self.always_initialize: self.initialize() if self.compiled_schedule is None: raise RuntimeError( "No compiled schedule was found. Either the schedule was not " "compiled, or the compiled schedule was not assigned to the " "correct attribute." ) instr_coordinator.start() acquired_data = instr_coordinator.retrieve_acquisition() instr_coordinator.stop() acq_metadata = extract_acquisition_metadata_from_schedule( self.compiled_schedule ) if len(acquired_data) == 0 and len(acq_metadata.acq_indices) != 0: raise RuntimeError( f"InstrumentCoordinator.retrieve_acquisition() " f"('{instr_coordinator.name}') " f"did not return any data, but was expected to return data based on " f"the acquisition metadata in the compiled schedule: " f"{acq_metadata.acq_indices=}" ) result = self.process_acquired_data( acquired_data, acq_metadata, self.compiled_schedule.repetitions ) return result
[docs] def _reshape_data(self, acq_metadata, vals): if acq_metadata.acq_protocol == "TriggerCount": return [vals.real.astype(np.uint64)] if ( acq_metadata.acq_protocol == "Trace" or acq_metadata.acq_protocol == "SSBIntegrationComplex" or acq_metadata.acq_protocol == "WeightedIntegratedComplex" ): ret_val = [] if self.real_imag: ret_val.append(vals.real) ret_val.append(vals.imag) return ret_val else: ret_val.append(np.abs(vals)) ret_val.append(np.angle(vals, deg=True)) return ret_val raise NotImplementedError( f"Acquisition protocol {acq_metadata.acq_protocol} with bin" f" mode {acq_metadata.bin_mode} is not supported." )
[docs] def _process_acquired_data_trigger_count( self, acquired_data, acq_metadata: AcquisitionMetadata, repetitions: int ) -> Dict[int, np.ndarray]: """Reformat acquired data in a dictionary. Used by process_acquired_data. Parameters ---------- acquired_data Acquired data as returned by instrument coordinator acq_metadata Acquisition metadata from schedule repetitions Number of repetitions of the schedule Returns ------- : Dictionary with reformatted data. Keys correspond to the acquisition channel. Values are 1d numpy arrays with trigger counts. Raises ------ NotImplementedError If acquisition protocol other than BinMode.APPEND is used. """ dataset = {} if acq_metadata.bin_mode == BinMode.APPEND: for acq_channel, acq_indices in acq_metadata.acq_indices.items(): dataset[acq_channel] = ( acquired_data[acq_channel] .sel(repetition=0) .coords["acq_index"] .values ) return dataset raise NotImplementedError( f"Acquisition protocol {acq_metadata.acq_protocol} with bin" f" mode {acq_metadata.bin_mode} is not supported." )
[docs] def process_acquired_data( self, acquired_data, acq_metadata: AcquisitionMetadata, repetitions: int ) -> Union[Tuple[float, ...], Tuple[np.ndarray, ...]]: """ Reshapes the data as returned from the instrument coordinator into the form accepted by the measurement control. """ # pylint: disable=fixme # FIXME: this reshaping should happen inside the instrument coordinator # blocked by quantify-core#187, and quantify-core#233 # retrieve the acquisition results # FIXME: acq_metadata should be an attribute of the schedule, see also #192 if acq_metadata.acq_protocol == "TriggerCount": dataset = self._process_acquired_data_trigger_count( acquired_data, acq_metadata, repetitions ) elif acq_metadata.acq_protocol == "Trace": dataset = {} for acq_channel, acq_indices in acq_metadata.acq_indices.items(): # Trace only supports AVERAGE binmode, therefore only has values at repetition=0. dataset[acq_channel] = ( acquired_data[acq_channel].sel(repetition=0).values ) elif acq_metadata.bin_mode == BinMode.AVERAGE: dataset = {} for acq_channel, acq_indices in acq_metadata.acq_indices.items(): dataset[acq_channel] = np.zeros(len(acq_indices), dtype=complex) for acq_idx in acq_indices: dataset[acq_channel][acq_idx] = ( acquired_data[acq_channel].sel(acq_index=acq_idx).values ) elif acq_metadata.bin_mode == BinMode.APPEND: dataset = {} for acq_channel, acq_indices in acq_metadata.acq_indices.items(): dataset[acq_channel] = np.zeros( len(acq_indices) * repetitions, dtype=complex ) acq_stride = len(acq_indices) for acq_idx in acq_indices: dataset[acq_channel][acq_idx::acq_stride] = ( acquired_data[acq_channel].sel(acq_index=acq_idx).values ) else: raise NotImplementedError( f"Acquisition protocol {acq_metadata.acq_protocol} with bin" f" mode {acq_metadata.bin_mode} is not supported." ) # Reshaping of the data before returning return_data = [] for idx, (acq_channel, vals) in enumerate(dataset.items()): if not self._data_labels_specified and idx != acq_channel: logger.warning( f"Default data_labels may not match the acquisition channel. " f"Element {idx} with label {self.label[idx]} corresponds to " f"acq_channel {acq_channel}, while they were expected to match. To " f"fix this behavior either specify custom data_labels, or ensure " f"your acquisition channels are sequential starting from 0." ) if self.batched is False: if len(vals) != 1: raise ValueError( f"For iterative mode, only one value is expected for each acquisition channel." f"Got {len(vals)} values for acquisition channel '{acq_channel}' instead." ) return_data.extend(self._reshape_data(acq_metadata, vals)) logger.debug(f"Returning {len(return_data)} values.") return tuple(return_data)
[docs] def generate_diagnostics_report( self, execute_get: bool = False, update: bool = False ) -> str: """ Create a report that saves all information contained in this `ScheduleGettable` and save it in the quantify datadir with its own `tuid`. The information in the report includes the generated schedule, device config, hardware config and snapshot of the instruments. Parameters ---------- execute_get When ``True``, executes ``self.get()`` before generating the report. update When ``True``, updates all parameters before saving the snapshot. Returns ------- : The `tuid` of the generated report. """ tuid = gen_tuid() if execute_get: self.get() if not self.is_initialized: raise RuntimeError( "`generate_diagnostics_report` can only run for an initialized `ScheduleGettable`. " "Please initialize manually or run with `execute_get=True`" ) device_cfg = self.quantum_device.generate_device_config().dict() hardware_cfg = self.quantum_device.generate_hardware_config() gettable_config = { "repetitions": self.quantum_device.cfg_sched_repetitions(), "evaluated_schedule_kwargs": self._evaluated_sched_kwargs, } sched = self.schedule_function( **self._evaluated_sched_kwargs, repetitions=self.quantum_device.cfg_sched_repetitions(), ) filename = os.path.join(get_datadir(), f"{tuid}.zip") with zipfile.ZipFile( filename, mode="w", compression=zipfile.ZIP_DEFLATED, compresslevel=9 ) as zip_file: zip_file.writestr("time.txt", str(time.time())) zip_file.writestr( "device_cfg.json", json.dumps(device_cfg, cls=NumpyJSONEncoder, indent=4), ) zip_file.writestr( "hardware_cfg.json", json.dumps(hardware_cfg, cls=NumpyJSONEncoder, indent=4), ) zip_file.writestr( "gettable.json", json.dumps(gettable_config, cls=NumpyJSONEncoder, indent=4), ) zip_file.writestr("schedule.json", sched.to_json()) zip_file.writestr( "snapshot.json", json.dumps(snapshot(update=update), cls=NumpyJSONEncoder, indent=4), ) return filename
[docs]def _evaluate_parameter_dict(parameters: Dict[str, Any]) -> Dict[str, Any]: r""" Loop over the keys and values in a dict and replaces parameters with their current value. Parameters ---------- parameters A dictionary containing a mix of :class:`~qcodes.instrument.parameter.Parameter`\s and normal values. Returns ------- : The `parameters` dictionary, but with the parameters replaced by their current value. Raises ------ TypeError If a parameter returns None """ evaluated_parameters = dict() for key, val in parameters.items(): if isinstance(val, Parameter): # evaluate the parameter evaluated_parameters[key] = val.get() # verify that the parameter has a value, a missing value typically indicates # that it was not initialized. if evaluated_parameters[key] is None: raise TypeError( f"{key}: parameter {val} returns None. " "It is possible this parameter was not configured correctly." ) else: evaluated_parameters[key] = val return evaluated_parameters