# Repository: https://gitlab.com/quantify-os/quantify-scheduler
# Licensed according to the LICENCE file on the main branch
"""Backend for Zurich Instruments."""
from __future__ import annotations
from quantify_scheduler.compatibility_check import check_zhinst_compatibility
check_zhinst_compatibility()
import logging
import re
import warnings
from collections import defaultdict
from copy import deepcopy
from dataclasses import dataclass
from functools import partial
from typing import (
TYPE_CHECKING,
Any,
Callable,
List,
Literal,
get_args,
)
from uuid import uuid4
import numpy as np
from pydantic import Field, TypeAdapter, model_validator
from zhinst.toolkit.helpers import Waveform
from quantify_scheduler import enums
from quantify_scheduler.backends.corrections import (
apply_software_distortion_corrections,
determine_relative_latency_corrections,
)
from quantify_scheduler.backends.graph_compilation import (
CompilationConfig,
SimpleNodeConfig,
)
from quantify_scheduler.backends.types import common, zhinst
from quantify_scheduler.backends.zhinst import helpers as zi_helpers
from quantify_scheduler.backends.zhinst import resolvers, seqc_il_generator
from quantify_scheduler.backends.zhinst import settings as zi_settings
from quantify_scheduler.helpers import schedule as schedule_helpers
from quantify_scheduler.helpers import waveforms as waveform_helpers
from quantify_scheduler.helpers.collections import (
find_all_port_clock_combinations,
find_port_clock_path,
)
from quantify_scheduler.instrument_coordinator.components.generic import (
DEFAULT_NAME as GENERIC_ICC_DEFAULT_NAME,
)
from quantify_scheduler.operations.control_flow_library import ControlFlowOperation
from quantify_scheduler.operations.pulse_library import SetClockFrequency
from quantify_scheduler.schedules.schedule import (
CompiledSchedule,
Schedulable,
Schedule,
ScheduleBase,
)
if TYPE_CHECKING:
import pandas
from quantify_scheduler.operations.operation import Operation
from quantify_scheduler.resources import Resource
[docs]
logger = logging.getLogger(__name__)
[docs]
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
# List of supported zhinst devices
[docs]
SUPPORTED_DEVICE_TYPES: list[str] = ["HDAWG", "UHFQA"]
[docs]
SUPPORTED_ACQ_PROTOCOLS = Literal["Trace", "SSBIntegrationComplex"]
# https://www.zhinst.com/sites/default/files/documents/2020-09/ziHDAWG_UserManual_20.07.1.pdf
# Section: 3.4. Basic Qubit Characterization, page 83
# All waveform lengths need to be multiples of 16 sample-clock cycles to comply
# with the waveform granularity specification.
# https://www.zhinst.com/sites/default/files/documents/2020-09/ziHDAWG_UserManual_20.07.2.pdf
# page: 262
[docs]
HDAWG_DEVICE_TYPE_CHANNEL_GROUPS: dict[str, dict[int, int]] = {
"HDAWG8": {
# Use the outputs in groups of 2. One sequencer program controls 2 outputs.
0: 2,
# Use the outputs in groups of 4. One sequencer program controls 4 outputs.
1: 4,
# Use the outputs in groups of 8. One sequencer program controls 8 outputs.
2: 8,
},
"HDAWG4": {
# Use the outputs in groups of 2. One sequencer program controls 2 outputs.
0: 2,
# Use the outputs in groups of 4. One sequencer program controls 4 outputs.
1: 4,
},
}
[docs]
DEVICE_SAMPLING_RATES: dict[zhinst.DeviceType, dict[int, int]] = {
zhinst.DeviceType.HDAWG: zi_helpers.get_sampling_rates(2.4e9),
zhinst.DeviceType.UHFQA: zi_helpers.get_sampling_rates(1.8e9),
}
# The sequencer clock rate always is 1/8 of the sampling rate
# (Same for UHFQA and HDAWG as of 2 Nov 2021)
[docs]
CLOCK_SAMPLE_FACTOR = 8
[docs]
NUM_UHFQA_READOUT_CHANNELS = 10
[docs]
MAX_QAS_INTEGRATION_LENGTH = 4096
[docs]
def ensure_no_operations_overlap(timing_table: pandas.DataFrame) -> None:
"""
Iterates over all hardware_channels in a schedule to determine if any of the pulses
have overlap.
Parameters
----------
timing_table:
a timing table containing the absolute time and duration as well as the hardware
channels on which these pulses are to be applied.
Raises
------
ValueError
If there is overlap between operations.
"""
for output_ch in timing_table.hardware_channel.unique():
if output_ch is None:
continue
tt_output_ch = timing_table[timing_table["hardware_channel"] == output_ch]
tt_output_ch = tt_output_ch[tt_output_ch.is_acquisition != False] # noqa: E712
end_time = tt_output_ch["abs_time"] + tt_output_ch["duration"]
# if any have overlap
if (end_time.shift(1) > tt_output_ch["abs_time"]).any():
clashing_ops = tt_output_ch[(end_time.shift(1) > tt_output_ch["abs_time"])]
clashing_op = clashing_ops.iloc[0]
preceding_op = tt_output_ch.loc[clashing_op.name - 1]
raise ValueError(
f"Operation {clashing_op.operation} at time"
f" {clashing_op.abs_time*1e9:.1f} ns "
f"overlaps with {preceding_op.operation} at "
f"time {preceding_op.abs_time*1e9:.1f} ns "
f"on output channel {clashing_op.hardware_channel}."
)
[docs]
def _determine_clock_sample_start(
hardware_channel: str,
abs_time: float,
operation_name: str = "",
) -> tuple[int, float]:
"""
Depending on the output channel, select the right clock cycle time and sample rate
from the channel descriptor for ZI channels.
the sample is returned as a float to preserve information of incorrect rounding to
full samples if present.
"""
if "uhfqa" in hardware_channel:
hw_sample_rate = DEVICE_SAMPLING_RATES[zhinst.DeviceType.UHFQA][
0
] # 0 -Assumes the default setting for the clock select
hw_clock_rate = hw_sample_rate / CLOCK_SAMPLE_FACTOR
# UHFQA has a 4.444 ns clock cycle (8 samples of ~0.55 ns)
# 9 clock cycles = 40 ns
elif "hdawg" in hardware_channel:
hw_sample_rate = DEVICE_SAMPLING_RATES[zhinst.DeviceType.HDAWG][
0
] # 0 -Assumes the default setting for the clock select
hw_clock_rate = hw_sample_rate / CLOCK_SAMPLE_FACTOR
# HDAWG has a 3.333 ns clock cycle (8 samples of ~0.4 ns)
# 3 clock cycles is 10 ns
else:
error_message = (
f"Instrument type for channel {hardware_channel} not recognized. "
+ "Could not determine clock and sample start."
)
raise NotImplementedError(f"{error_message}")
# next determine what clock cycle and sample things happen on.
# here we do a combination of floor and round as the samples are added afterwards.
# The round to 3 decimals serves to catch an edge when rounding to x.999999 clocks.
clock_cycle = np.floor(np.round(abs_time * hw_clock_rate, decimals=3))
sample_time = abs_time - clock_cycle / hw_clock_rate
# first done using floating point to be able to detect incorrect rounding on samples
sample_float = sample_time * hw_sample_rate
sample = round(sample_float)
if not np.all(np.isclose(sample_float, sample, atol=0.0001)):
# tip, the common clock cycle of UHF and HDAWG is 40 ns, for HDAWG pulses only
# 10 ns is a nice clock multiple as well.
raise ValueError(
f"Rounding to samples not exact for operation ({operation_name}) at time "
f"({abs_time*1e9:.1f} ns). Attempting to round ({sample_float}) "
f"to ({sample}) \n TIP: Try to ensure waveforms start a multiple of"
" the samlping rate e.g., try multiples of 10 ns for the HDAWG or 40 ns for"
" UFHQA pulses."
)
return (clock_cycle, sample_float)
[docs]
def _determine_clock_start(hardware_channel: str, abs_time: float, operation_name: str) -> float:
if hardware_channel is None:
return np.nan
clock_start, _ = _determine_clock_sample_start(
hardware_channel=hardware_channel,
abs_time=abs_time,
operation_name=operation_name,
)
return clock_start
[docs]
def _determine_sample_start(hardware_channel: str, abs_time: float, operation_name: str) -> float:
if hardware_channel is None:
return np.nan
_, sample_start = _determine_clock_sample_start(
hardware_channel=hardware_channel,
abs_time=abs_time,
operation_name=operation_name,
)
return sample_start
[docs]
def _apply_latency_corrections(
timing_table: pandas.DataFrame, latency_dict: dict
) -> pandas.DataFrame:
"""
Changes the "abs_time" of a timing table depending on the specified latency
corrections for each port-clock combination as specified in the latency
dict. The corrections are added to the abs_time elements fulfilling the
specific port-clock combination.
"""
for port_clock_combination_key, latency_corr in latency_dict.items():
port, clock = port_clock_combination_key.split("-")
port_mask = timing_table["port"] == port
clock_mask = timing_table["clock"] == clock
full_mask_combination = port_mask * clock_mask
timing_table.loc[full_mask_combination, "abs_time"] += latency_corr
return timing_table
[docs]
def _determine_measurement_fixpoint_correction(
measurement_start_sample: int, common_frequency: float = 600e6
) -> tuple[float, int]:
"""
Calculates by how much time to shift all operations to ensure a measurement starts
at sample 0.
Parameters
----------
measurement_start_sample:
the sample at which the measurement starts
common_frequency:
The shift needs to be such that it occurs at a multiple of the common frequency.
A larger common frequency results in a smaller time correction.
This largest common frequency is the common frequency of the HDAWG and UHFQA and
is 600 MHz.
Returns
-------
tuple[float, int]
The time correction to be applied in seconds.
The correction in the number of samples.
"""
uhf_sampling_rate = 1.8e9
samples_per_clock_cycle = 8
uhf_common_sample = uhf_sampling_rate / common_frequency
if not uhf_common_sample.is_integer():
raise ValueError(
f"Invalid common frequency: The UHF sampling rate ({uhf_sampling_rate}) is "
f"not a multiple of the common frequency {common_frequency}."
)
required_sample_correction = (-measurement_start_sample) % samples_per_clock_cycle
success = False
for i in range(10):
sample_correction = int(i * uhf_common_sample)
if sample_correction % samples_per_clock_cycle == required_sample_correction:
success = True
break
if not success:
raise ValueError("Could not identify a measurement fixpoint correction")
time_shift = sample_correction / uhf_sampling_rate
return time_shift, sample_correction
[docs]
def _apply_measurement_fixpoint_correction(
timing_table: pandas.DataFrame, common_frequency: float = 600e6
) -> pandas.DataFrame:
"""
Updates the abs_time of all operations based on the measurement fixpoint correction.
The abs_time is applied to all operations between two acquisitions.
After that the samples and clocks are re-calculated to reflect this change in time.
Parameters
----------
timing_table:
A timing table that has the samples already determined.
common_frequency:
The shift needs to be such that it occurs at a multiple of the common frequency.
A larger common frequency results in a smaller time correction.
This largest common frequency is the common frequency of the HDAWG and UHFQA and
is 600 MHz.
"""
acquisitions = timing_table[timing_table.is_acquisition]
time_corrections = np.zeros(len(timing_table))
prev_idx = 0
cumulative_sample_corrections = 0
# FIXME: there is an edge-case in the slicing of the operations when
# the pulse of the measurement is applied after triggering the acquisition.
# this should be included explicitly in the indices to slice (acquisitions.index)
for idx, sample_start in zip(acquisitions.index, acquisitions.sample_start):
effective_start_sample = round(sample_start + cumulative_sample_corrections)
time_corr, sample_corr = _determine_measurement_fixpoint_correction(
measurement_start_sample=effective_start_sample,
common_frequency=common_frequency,
)
# all subsequent waveforms are shifted in time
time_corrections[prev_idx:] += time_corr
cumulative_sample_corrections += sample_corr
prev_idx = idx + 1
timing_table["abs_time"] += time_corrections
# After shifting operations to align the measurement for the fixpoint correction the
# clock and sample start needs to be updated.
timing_table = _add_clock_sample_starts(timing_table=timing_table)
return timing_table
[docs]
def _add_clock_sample_starts(timing_table: pandas.DataFrame) -> pandas.DataFrame:
"""
Adds the sequence clock cycle start and sampling start of each operation for each
channel.
"""
timing_table["clock_cycle_start"] = timing_table.apply(
lambda row: _determine_clock_start(
hardware_channel=row["hardware_channel"],
abs_time=row["abs_time"],
operation_name=row["operation"],
),
axis=1,
)
timing_table["sample_start"] = timing_table.apply(
lambda row: _determine_sample_start(
hardware_channel=row["hardware_channel"],
abs_time=row["abs_time"],
operation_name=row["operation"],
),
axis=1,
)
return timing_table
[docs]
def _parse_local_oscillators(data: dict[str, Any]) -> dict[str, zhinst.LocalOscillator]:
"""
Returns the LocalOscillator domain models parsed from the data dictionary.
Parameters
----------
data :
The hardware map "local_oscillators" entry.
Returns
-------
:
A dictionary of unique LocalOscillator instances.
Raises
------
RuntimeError
If duplicate LocalOscillators have been found.
"""
local_oscillators: dict[str, zhinst.LocalOscillator] = dict()
lo_list_validator = TypeAdapter(List[zhinst.LocalOscillator])
lo_list = lo_list_validator.validate_python(data)
for local_oscillator in lo_list:
if local_oscillator.unique_name in local_oscillators:
raise RuntimeError(
f"Duplicate entry LocalOscillators '{local_oscillator.unique_name}' in "
"hardware configuration!"
)
local_oscillators[local_oscillator.unique_name] = local_oscillator
return local_oscillators
[docs]
def _parse_devices(data: dict[str, Any]) -> list[zhinst.Device]:
device_list_validator = TypeAdapter(List[zhinst.Device])
device_list = device_list_validator.validate_python(data)
for device in device_list:
if device.device_type.value not in SUPPORTED_DEVICE_TYPES:
raise NotImplementedError(
f"Unable to create zhinst backend for '{device.device_type.value}'!"
)
sample_rates = DEVICE_SAMPLING_RATES[device.device_type]
if device.clock_select not in sample_rates:
raise ValueError(
f"Unknown value clock_select='{device.clock_select}' "
+ f"for device type '{device.device_type.value}'"
)
device.sample_rate = sample_rates[device.clock_select]
return device_list
[docs]
def _validate_schedule(schedule: Schedule) -> None:
"""
Validates the CompiledSchedule required values for creating the backend.
Parameters
----------
schedule :
Raises
------
ValueError
The validation error.
"""
if len(schedule.schedulables) == 0:
raise ValueError(f"Undefined schedulables for schedule '{schedule.name}'!")
for schedulable in schedule.schedulables.values():
if "abs_time" not in schedulable:
raise ValueError(
"Absolute timing has not been determined " + f"for the schedule '{schedule.name}'!"
)
for op in schedule.operations.values():
for pulse_data in op.data["pulse_info"]:
if pulse_data.get("reference_magnitude", None) is not None:
raise NotImplementedError
if isinstance(op, (ControlFlowOperation, SetClockFrequency)):
raise NotImplementedError(f"Operation '{op}' is not supported by the zhinst backend.")
[docs]
def _get_instruction_list(
output_timing_table: pandas.DataFrame,
) -> list[zhinst.Instruction]:
"""
Iterates over a timing table for a specific output for which clock_cycle_start and
waveform_id have been determined to return a list of all instructions to be played
on a Zurich Instruments device.
"""
instruction_list: list[zhinst.Instruction] = []
for _, row in output_timing_table.iterrows():
if row.is_acquisition:
instruction_list.append(
zhinst.Acquisition(
waveform_id=row.waveform_id,
abs_time=row.abs_time,
duration=row.duration,
clock_cycle_start=row.clock_cycle_start,
)
)
else:
instruction_list.append(
zhinst.Wave(
waveform_id=row.waveform_id,
abs_time=row.abs_time,
duration=row.duration,
clock_cycle_start=row.clock_cycle_start,
)
)
return instruction_list
@dataclass(frozen=True)
[docs]
class ZIAcquisitionConfig:
"""
Zurich Instruments acquisition configuration.
Parameters
----------
bin_mode:
"""
"""The number of distinct acquisitions in this experiment."""
[docs]
resolvers: dict[int, Callable]
"""
Resolvers used to retrieve the results from the right UHFQA nodes.
See also :mod:`~quantify_scheduler.backends.zhinst.resolvers`
"""
[docs]
bin_mode: enums.BinMode
"""The bin mode of the acquisitions."""
[docs]
acq_protocols: dict[int, SUPPORTED_ACQ_PROTOCOLS]
"""Acquisition protocols for each acquisition channel."""
@dataclass(frozen=True)
[docs]
class ZIDeviceConfig:
"""Zurich Instruments device configuration."""
"""The name of the schedule the config is for."""
[docs]
settings_builder: zi_settings.ZISettingsBuilder
"""
The builder to configure the ZI settings.
The builder typically includes DAQ and AWG settings.
"""
[docs]
acq_config: ZIAcquisitionConfig | None
"""
Acquisition config for the schedule.
The acquisition config contains the number of acquisitions and a dictionary of
resolvers used to retrieve the results from the right UHFQA nodes.
Note that this part of the config is not needed during prepare, but only during
the retrieve acquisitions step.
"""
[docs]
def _generate_legacy_hardware_config( # noqa: PLR0912, PLR0915
schedule: Schedule,
compilation_config: CompilationConfig,
) -> dict:
"""
Extract the old-style Zhinst hardware config from the CompilationConfig.
Only the port-clok combinations that are used in the schedule are included in the
old-style hardware config.
Parameters
----------
schedule: Schedule
Schedule from which the port-clock combinations are extracted.
config: CompilationConfig
CompilationConfig from which hardware config is extracted.
Returns
-------
hardware_config : dict
Zhinst hardware configuration.
Raises
------
KeyError
If the CompilationConfig.connectivity does not contain a hardware config.
KeyError
If the 'frequency_param' is not specified for in a LO config.
ValueError
If a value is specified in both the hardware options and the hardware config.
RuntimeError
If no external local oscillator is found in the generated zhinst
hardware configuration.
"""
hardware_description = compilation_config.hardware_compilation_config.hardware_description
hardware_options = compilation_config.hardware_compilation_config.hardware_options
connectivity = compilation_config.hardware_compilation_config.connectivity
if isinstance(connectivity, dict):
if "graph" in connectivity:
raise KeyError(
"Connectivity contains a dictionary including a 'graph' key, most likely"
" because the networkx Graph object could not be parsed correctly."
)
return connectivity
port_clocks = schedule.get_used_port_clocks()
hardware_config: dict = {"devices": [], "local_oscillators": []}
hardware_config["backend"] = "quantify_scheduler.backends.zhinst_backend.compile_backend"
# Add connectivity information to the hardware config:
connectivity_graph = compilation_config.hardware_compilation_config.connectivity.graph
for port, clock in port_clocks:
connected_nodes = {}
for node in connectivity_graph:
if port in node:
connected_nodes = connectivity_graph[node]
break
for connected_node in connected_nodes:
io_path = connected_node.split(sep=".")
instrument = io_path[0]
lo_name = None
if hardware_description[instrument].instrument_type == "IQMixer":
# Find which lo is used for this IQ mixer
lo_name = list(connectivity_graph[instrument + ".lo"])[0].split(sep=".")[0]
# Find which instrument is connected to if port
io_path = list(connectivity_graph[instrument + ".if"])[0].split(sep=".")
# Set port-clock combination in io config:
instr_indices = [
i for i, v in enumerate(hardware_config["devices"]) if v["name"] == io_path[0]
]
if len(instr_indices) == 0:
hardware_config["devices"].append({"name": io_path[0]})
instr_indices = [-1]
instr_config = hardware_config["devices"][instr_indices[0]]
instrument_io = io_path[-1]
if instrument_io not in instr_config:
instr_config[instrument_io] = {}
if (old_port := instr_config[instrument_io].get("port")) or (
old_clock := instr_config[instrument_io].get("clock")
):
raise ValueError(
f"Trying to set port-clock combination {port}-{clock} for "
f"instrument channel {io_path} in the hardware config, while "
f"it has previously been set to {old_port}-{old_clock}."
)
instr_config[instrument_io]["port"] = port
instr_config[instrument_io]["clock"] = clock
if lo_name is not None:
instr_config[instrument_io]["local_oscillator"] = lo_name
# Sort the devices to ensure deterministic behaviour:
hardware_config["devices"] = sorted(hardware_config["devices"], key=lambda x: x["name"])
# Add info from hardware description to hardware config
if hardware_config.get("local_oscillators") is None:
hardware_config["local_oscillators"] = []
if hardware_config.get("devices") is None:
hardware_config["devices"] = []
for instr_name, instr_description in hardware_description.items():
if instr_description.instrument_type in ["UHFQA", "HDAWG4", "HDAWG8"]:
instr_indices = [
i for i, v in enumerate(hardware_config["devices"]) if v["name"] == instr_name
]
if len(instr_indices) > 1:
raise ValueError(
f"Device with name {instr_name} can only appear once in "
f"the hardware_config['devices'] list."
)
if len(instr_indices) == 0:
# Instrument is not in the hardware config, because it is not
# used in the schedule.
continue
instr_config = hardware_config["devices"][instr_indices[0]]
instr_config["type"] = instr_description.instrument_type
instr_config["ref"] = instr_description.ref
if instr_description.instrument_type in ["HDAWG4", "HDAWG8"]:
instr_config["channelgrouping"] = instr_description.channelgrouping
instr_config["clock_select"] = instr_description.clock_select
if instr_description.channel_0 is not None and "channel_0" in instr_config:
# Only propagate channel description settings if it was already
# added based on the Connectivity and the Schedule.
instr_config["channel_0"]["mode"] = instr_description.channel_0.mode
instr_config["channel_0"]["markers"] = instr_description.channel_0.markers
instr_config["channel_0"]["trigger"] = instr_description.channel_0.trigger
if (
instr_description.instrument_type in ["HDAWG4", "HDAWG8"]
and instr_description.channel_1 is not None
and "channel_1" in instr_config
):
# Only propagate channel description settings if it was already
# added based on the Connectivity and the Schedule.
instr_config["channel_1"]["mode"] = instr_description.channel_1.mode
instr_config["channel_1"]["markers"] = instr_description.channel_1.markers
instr_config["channel_1"]["trigger"] = instr_description.channel_1.trigger
if (
instr_description.instrument_type == "HDAWG8"
and instr_description.channel_2 is not None
and "channel_2" in instr_config
):
# Only propagate channel description settings if it was already
# added based on the Connectivity and the Schedule.
instr_config["channel_2"]["mode"] = instr_description.channel_2.mode
instr_config["channel_2"]["markers"] = instr_description.channel_2.markers
instr_config["channel_2"]["trigger"] = instr_description.channel_2.trigger
if (
instr_description.instrument_type == "HDAWG8"
and instr_description.channel_3 is not None
and "channel_3" in instr_config
):
# Only propagate channel description settings if it was already
# added based on the Connectivity and the Schedule.
instr_config["channel_3"]["mode"] = instr_description.channel_3.mode
instr_config["channel_3"]["markers"] = instr_description.channel_3.markers
instr_config["channel_3"]["trigger"] = instr_description.channel_3.trigger
elif instr_description.instrument_type == "LocalOscillator":
lo_indices = [
i
for i, v in enumerate(hardware_config["local_oscillators"])
if v["unique_name"] == instr_name
]
if len(lo_indices) > 1:
raise ValueError(
f"LocalOscillator with name {instr_name} appears multiple "
f"times in the hardware_config['local_oscillators'] list."
)
elif len(lo_indices) == 0:
hardware_config["local_oscillators"].append({})
lo_indices = [len(hardware_config["local_oscillators"]) - 1]
lo_config = hardware_config["local_oscillators"][lo_indices[0]]
lo_config["unique_name"] = instr_name
lo_config["instrument_name"] = instr_description.instrument_name
lo_config["frequency_param"] = instr_description.frequency_param
# Set the LO power in the LO config:
if "power" not in lo_config:
lo_config["power"] = {}
elif isinstance(lo_config["power"], int):
lo_config["power"] = {"power": lo_config["power"]}
lo_config["power"][instr_description.power_param] = instr_description.power
# Set hardware options in the hardware config
if hardware_options.latency_corrections is not None:
hardware_config["latency_corrections"] = hardware_options.model_dump()[
"latency_corrections"
]
if hardware_options.distortion_corrections is not None:
hardware_config["distortion_corrections"] = hardware_options.model_dump()[
"distortion_corrections"
]
modulation_frequencies = hardware_options.modulation_frequencies
if modulation_frequencies is not None:
for port, clock in port_clocks:
if (pc_mod_freqs := modulation_frequencies.get(f"{port}-{clock}")) is None:
# No modulation frequencies to set for this port-clock.
continue
# Find path to port-clock combination in the hardware config, e.g.,
# ["devices", 0, "channel_0"]
ch_path = find_port_clock_path(hardware_config=hardware_config, port=port, clock=clock)
# Extract channel config dict:
ch_config = hardware_config
for key in ch_path:
ch_config = ch_config[key]
if "modulation" not in ch_config:
# Initialize modulation config:
ch_config["modulation"] = {"type": "premod"}
# Set the interm_freq in the modulation config:
ch_config["modulation"]["interm_freq"] = pc_mod_freqs.interm_freq
# Find the LO config and add the frequency config:
lo_name: str = ch_config["local_oscillator"]
lo_configs: list = hardware_config.get("local_oscillators", [])
lo_config_found = False
for lo_config in lo_configs:
if lo_config["unique_name"] == lo_name:
lo_config_found = True
if "frequency_param" not in lo_config:
raise KeyError(
f"Frequency parameter for {lo_name} not found in the"
f" hardware config. Please specify it under the "
f" 'frequency_param' key in {lo_config=}."
)
if "frequency" not in lo_config:
# Initialize frequency config dict:
lo_config["frequency"] = {}
lo_freq_key = lo_config.get("frequency_param")
# Set the LO freq in the LO config:
lo_config["frequency"][lo_freq_key] = pc_mod_freqs.lo_freq
if not lo_config_found:
raise RuntimeError(
f"External local oscillator '{lo_name}' set to "
f"be used for {port=} and {clock=} not found! Make "
f"sure it is present in the hardware configuration."
)
mixer_corrections = hardware_options.mixer_corrections
if mixer_corrections is not None:
for port, clock in port_clocks:
if (pc_mix_corr := mixer_corrections.get(f"{port}-{clock}")) is None:
# No mixer corrections to set for this port-clock.
continue
# Find path to port-clock combination in the hardware config, e.g.,
# ["devices", 0, "channel_0"]
ch_path = find_port_clock_path(hardware_config=hardware_config, port=port, clock=clock)
# Extract channel config dict:
ch_config = hardware_config
for key in ch_path:
ch_config = ch_config[key]
# Set mixer corrections from hw options in channel config dict:
ch_config["mixer_corrections"] = pc_mix_corr.model_dump()
output_gain = hardware_options.output_gain
if output_gain is not None:
for port, clock in find_all_port_clock_combinations(hardware_config):
if (pc_output_gain := output_gain.get(f"{port}-{clock}")) is None:
# No modulation frequencies to set for this port-clock.
continue
# Find path to port-clock combination in the hardware config, e.g.,
# ["devices", 0, "channel_0"]
ch_path = find_port_clock_path(hardware_config=hardware_config, port=port, clock=clock)
# Extract instrument config and I/O channel config dicts:
instr_config = hardware_config
for key in ch_path[:-1]:
instr_config = instr_config[key]
ch_name = ch_path[-1]
ch_config = instr_config[ch_name]
# Set the output gain in the channel config:
ch_config["gain1"] = pc_output_gain.gain_I
ch_config["gain2"] = pc_output_gain.gain_Q
return hardware_config
[docs]
def _generate_new_style_hardware_compilation_config( # noqa: PLR0912, PLR0915
old_style_config: dict,
) -> dict:
"""
Generate a new-style ZIHardwareCompilationConfig from an old-style hardware config.
Parameters
----------
old_style_config
Old-style hardware config.
Returns
-------
dict
New-style hardware compilation config dictionary.
"""
# Avoid modifying the original config
old_style_config = deepcopy(old_style_config)
# Initialize input dicts (before validation)
hardware_description = {}
hardware_options = defaultdict(lambda: defaultdict(dict))
connectivity = {"graph": []}
# Loop over old-style config and populate new-style input dicts
for hw_cfg_key, hw_cfg_value in old_style_config.items():
if hw_cfg_key == "backend":
pass
elif hw_cfg_key in ["latency_corrections", "distortion_corrections"]:
hardware_options[hw_cfg_key] = hw_cfg_value
elif hw_cfg_key == "local_oscillators": # noqa: SIM114
for lo_cfg in hw_cfg_value:
power_cfg = lo_cfg.get("power")
if power_cfg:
power_param = list(power_cfg.keys())[0]
power = power_cfg[power_param]
else:
power_param = "power"
power = None
hardware_description[lo_cfg["unique_name"]] = {
"instrument_type": "LocalOscillator",
"instrument_name": lo_cfg.get("instrument_name"),
"generic_icc_name": lo_cfg.get("generic_icc_name"),
"frequency_param": lo_cfg.get("frequency_param", "frequency"),
"power_param": power_param,
"power": power,
}
elif hw_cfg_key == "devices":
for instr_cfg in hw_cfg_value:
if instr_cfg["type"] in ["HDAWG4", "HDAWG8"]:
# Hardware Description
hardware_description[instr_cfg["name"]] = {
"instrument_type": instr_cfg["type"],
"ref": instr_cfg["ref"],
"channelgrouping": instr_cfg["channelgrouping"],
"clock_select": instr_cfg["clock_select"],
}
for ch_idx in [0, 1, 2, 3]:
if ch_cfg := instr_cfg.get(f"channel_{ch_idx}"):
hardware_description[instr_cfg["name"]][f"channel_{ch_idx}"] = {
"mode": ch_cfg["mode"],
"markers": ch_cfg.get("markers", []),
"trigger": ch_cfg.get("trigger"),
}
else:
continue
# Connectivity
port = ch_cfg.get("port")
clock = ch_cfg.get("clock")
port_name = f"{instr_cfg['name']}.channel_{ch_idx}"
if ch_cfg.get("local_oscillator") is not None:
# Add IQ mixer to the hardware_description:
iq_mixer_id = f"iqm_{instr_cfg['name']}_ch{ch_idx}"
hardware_description[iq_mixer_id] = {"instrument_type": "IQMixer"}
# Add LO and IQ mixer to connectivity graph:
connectivity["graph"].extend(
[
(
port_name,
f"{iq_mixer_id}.if",
),
(
f"{ch_cfg['local_oscillator']}.output",
f"{iq_mixer_id}.lo",
),
]
)
# Overwrite port_name to IQ mixer RF output:
port_name = f"{iq_mixer_id}.rf"
# Set LO frequency in hardware options:
lo_cfg = [
lo_cfg
for lo_cfg in old_style_config["local_oscillators"]
if lo_cfg["unique_name"] == ch_cfg["local_oscillator"]
][0]
if "frequency" in lo_cfg:
hardware_options["modulation_frequencies"][f"{port}-{clock}"][
"lo_freq"
] = list(lo_cfg["frequency"].values())[0]
connectivity["graph"].append((port_name, port))
# Hardware Options
if ch_cfg.get("mixer_corrections"):
# Decapitalize DC offset keys:
mix_corr_lower_case = {}
for key in ch_cfg["mixer_corrections"]:
mix_corr_lower_case[key.lower()] = ch_cfg["mixer_corrections"][key]
hardware_options["mixer_corrections"][
f"{port}-{clock}"
] = mix_corr_lower_case
if ch_cfg.get("gain1"):
hardware_options["output_gain"][f"{port}-{clock}"]["gain_I"] = ch_cfg[
"gain1"
]
if ch_cfg.get("gain2"):
hardware_options["output_gain"][f"{port}-{clock}"]["gain_Q"] = ch_cfg[
"gain2"
]
if ch_cfg.get("modulation") and ch_cfg["modulation"]["type"] == "premod":
hardware_options["modulation_frequencies"][f"{port}-{clock}"][
"interm_freq"
] = ch_cfg["modulation"]["interm_freq"]
elif instr_cfg["type"] == "UHFQA":
# Hardware Description
hardware_description[instr_cfg["name"]] = {
"instrument_type": instr_cfg["type"],
"ref": instr_cfg["ref"],
}
if ch_cfg := instr_cfg.get("channel_0"):
hardware_description[instr_cfg["name"]]["channel_0"] = {
"mode": ch_cfg["mode"],
"markers": ch_cfg.get("markers", []),
"trigger": ch_cfg.get("trigger"),
}
# Connectivity
port = ch_cfg.get("port")
clock = ch_cfg.get("clock")
port_name = f"{instr_cfg['name']}.{'channel_0'}"
if ch_cfg.get("local_oscillator") is not None:
# Add IQ mixer to the hardware_description:
iq_mixer_id = f"iqm_{instr_cfg['name']}_ch0"
hardware_description[iq_mixer_id] = {"instrument_type": "IQMixer"}
# Add LO and IQ mixer to connectivity graph:
connectivity["graph"].extend(
[
(
port_name,
f"{iq_mixer_id}.if",
),
(
f"{ch_cfg['local_oscillator']}.output",
f"{iq_mixer_id}.lo",
),
]
)
# Overwrite port_name to IQ mixer RF output:
port_name = f"{iq_mixer_id}.rf"
# Set LO frequency in hardware options:
lo_cfg = [
lo_cfg
for lo_cfg in old_style_config["local_oscillators"]
if lo_cfg["unique_name"] == ch_cfg["local_oscillator"]
][0]
if "frequency" in lo_cfg:
hardware_options["modulation_frequencies"][f"{port}-{clock}"][
"lo_freq"
] = list(lo_cfg["frequency"].values())[0]
connectivity["graph"].append((port_name, port))
# Hardware Options
if ch_cfg.get("mixer_corrections"):
hardware_options["mixer_corrections"][f"{port}-{clock}"] = ch_cfg[
"mixer_corrections"
]
if ch_cfg.get("gain1"):
hardware_options["output_gain"][f"{port}-{clock}"]["gain_I"] = ch_cfg[
"gain1"
]
if ch_cfg.get("gain2"):
hardware_options["output_gain"][f"{port}-{clock}"]["gain_Q"] = ch_cfg[
"gain2"
]
if ch_cfg.get("modulation") and ch_cfg["modulation"]["type"] == "premod":
hardware_options["modulation_frequencies"][f"{port}-{clock}"][
"interm_freq"
] = ch_cfg["modulation"]["interm_freq"]
else:
raise ValueError(
f"Unexpected instrument type {instr_cfg['type']} in old-style "
f"hardware config."
)
elif hw_cfg_key == "mode":
pass
else:
warnings.warn(f"Skipping unknown key {hw_cfg_key} in old-style hardware config.")
return dict(
config_type="quantify_scheduler.backends.zhinst_backend.ZIHardwareCompilationConfig",
hardware_description=hardware_description,
hardware_options=hardware_options,
connectivity=connectivity,
)
[docs]
def flatten_schedule(schedule: Schedule, config: CompilationConfig | None = None) -> Schedule:
"""
Recursively flatten subschedules based on the absolute timing.
Parameters
----------
schedule : Schedule
schedule to be flattened
config : CompilationConfig | None, optional
Compilation config for
:class:`~quantify_scheduler.backends.graph_compilation.QuantifyCompiler`,
which is currently not only used to detect if the function is called directly.
by default None
Returns
-------
Schedule
Equivalent schedule without subschedules
"""
def _insert_op_at_time(schedule: Schedule, operation: Operation, abs_time: float) -> None:
new_key = str(uuid4())
new_schedulable = Schedulable(
name=new_key,
operation_id=operation.hash,
)
# Timing constraints in the new schedulable are meaningless, so remove the list
new_schedulable["timing_constraints"] = None
new_schedulable["abs_time"] = abs_time
schedule["operation_dict"][operation.hash] = operation
schedule["schedulables"][new_key] = new_schedulable
def _move_to_end(ordered_dict: dict, key: Any) -> None: # noqa: ANN401
"""
Moves the element with ``key`` to the end of the dict.
Note: dictionaries from Python 3.7 are ordered.
"""
value = ordered_dict.pop(key)
ordered_dict[key] = value
all_resources = dict(schedule.resources)
for op in schedule.operations.values():
if isinstance(op, ScheduleBase):
flatten_schedule(op, config)
all_resources.update(op.resources)
op_keys_to_pop = set()
schedulable_keys_to_pop = set()
# we cannot use .items() directly since we modify schedule.schedulables in the loop
schedulable_iter = tuple(schedule.schedulables.items())
for schedulable_key, schedulable in schedulable_iter:
op_key = schedulable["operation_id"]
op = schedule.operations[op_key]
if isinstance(op, ScheduleBase):
offset = schedulable["abs_time"]
# insert new schedulables shifted by the correct offset
for inner_schedulable in op.schedulables.values():
inner_op = op.operations[inner_schedulable["operation_id"]]
_insert_op_at_time(schedule, inner_op, inner_schedulable["abs_time"] + offset)
# mark the inner schedule for removal from the parent
op_keys_to_pop.add(op_key)
schedulable_keys_to_pop.add(schedulable_key)
else:
_move_to_end(schedule.schedulables, schedulable_key)
for key in op_keys_to_pop:
schedule["operation_dict"].pop(key)
for key in schedulable_keys_to_pop:
schedule["schedulables"].pop(key)
for resource in all_resources.values():
if resource.name not in schedule.resources:
schedule.add_resource(resource)
return schedule
[docs]
def _get_operations_by_repr(schedule: Schedule) -> dict[str, Operation]:
operations_dict_with_repr_keys = {str(op): op for op in schedule.operations.values()}
if len(schedule.operations) != len(operations_dict_with_repr_keys):
all_reprs = set()
colliding_ops = []
for operation in schedule.operations.values():
repr_key = str(operation)
if repr_key in all_reprs:
colliding_ops.append(operation)
all_reprs.add(repr_key)
raise ValueError(
"Multiple operations with different content have the same repr."
f"Colliding operations are {colliding_ops}."
)
return operations_dict_with_repr_keys
[docs]
def compile_backend( # noqa: PLR0912
schedule: Schedule,
config: CompilationConfig | dict[str, Any] | None = None,
# config can be Dict to support (deprecated) calling with hardware config
# as positional argument.
*, # Support for (deprecated) calling with hardware_cfg as keyword argument:
hardware_cfg: dict[str, Any] | None = None,
) -> CompiledSchedule:
"""
Compiles zhinst hardware instructions for a schedule.
This method generates sequencer programs, waveforms and
configurations required for the instruments defined in
the hardware configuration.
Parameters
----------
schedule :
The schedule to be compiled.
config
Compilation config for
:class:`~quantify_scheduler.backends.graph_compilation.QuantifyCompiler`.
hardware_cfg :
(deprecated) The hardware configuration of the setup. Pass a full compilation
config instead using ``config`` argument.
Returns
-------
:
A collection containing the compiled backend
configuration for each device.
Raises
------
NotImplementedError
Thrown when using unsupported ZI Instruments.
ValueError
When both ``config`` and ``hardware_cfg`` are supplied.
"""
if not ((config is not None) ^ (hardware_cfg is not None)):
raise ValueError(
f"Zhinst `{compile_backend.__name__}` was called with {config=} and "
f"{hardware_cfg=}. Please make sure this function is called with "
f"one of the two (CompilationConfig recommended)."
)
if not isinstance(config, CompilationConfig):
warnings.warn(
f"Zhinst `{compile_backend.__name__}` will require a full "
f"CompilationConfig as input as of quantify-scheduler >= 0.19.0",
FutureWarning,
)
if isinstance(config, CompilationConfig):
# Extract the hardware config from the CompilationConfig
hardware_cfg = _generate_legacy_hardware_config(
schedule=schedule, compilation_config=config
)
elif config is not None:
# Support for (deprecated) calling with hardware_cfg as positional argument.
hardware_cfg = config
_validate_schedule(schedule)
if "latency_corrections" in hardware_cfg:
# Important: currently only used to validate the input, should also be
# used for storing the latency corrections
# (see also https://gitlab.com/groups/quantify-os/-/epics/1)
common.HardwareOptions(latency_corrections=hardware_cfg["latency_corrections"])
if (distortion_corrections := hardware_cfg.get("distortion_corrections")) is not None:
replacing_schedule = apply_software_distortion_corrections(schedule, distortion_corrections)
if replacing_schedule is not None:
schedule = replacing_schedule
################################################
# Timing table manipulation
################################################
# the schedule has a Styled pandas dataframe as the return type.
# here we want to manipulate the data directly so we extract the raw dataframe.
timing_table = schedule.timing_table.data
# information is added on what output channel is used for every pulse and acq.
port_clock_channelmapping = _extract_port_clock_channelmapping(hardware_cfg)
timing_table = _add_channel_information(
timing_table=timing_table, port_clock_channelmapping=port_clock_channelmapping
)
# the timing of all pulses and acquisitions is corrected based on the latency corr.
latency_dict = determine_relative_latency_corrections(hardware_cfg)
timing_table = _apply_latency_corrections(timing_table=timing_table, latency_dict=latency_dict)
# ensure that operations are still sorted by time after applying the latency corr.
timing_table.sort_values("abs_time", inplace=True)
# add the sequencer clock cycle start and sampling start for the operations.
timing_table = _add_clock_sample_starts(timing_table=timing_table)
# After adjusting for the latencies, the fix-point correction can be applied.
# the fix-point correction has the goal to ensure that all measurement operations
# will always start at a multiple of *all* relevant clock domains.
# this is achieved by shifting all instructions between different measurements
# by the same amount of samples.
timing_table = _apply_measurement_fixpoint_correction(
timing_table=timing_table, common_frequency=600e6
)
# because of the shifting in time on a sub-clock delay, up to 8 distinct waveforms
# may be required to realize the identical pulse. Pre-modulation adds another
# variant depending on the starting phase of the operation.
timing_table = _add_waveform_ids(timing_table=timing_table)
ensure_no_operations_overlap(timing_table)
# Parse the hardware configuration file, zhinst.Device is a dataclass containing
# device descriptions (name, type, channels etc. )
devices: list[zhinst.Device] = _parse_devices(hardware_cfg["devices"])
local_oscillators: dict[str, zhinst.LocalOscillator] = _parse_local_oscillators(
hardware_cfg["local_oscillators"]
)
################################################
# Constructing the waveform table
################################################
device_dict = {}
for dev in devices:
device_dict[dev.name] = dev
operations_dict_with_repr_keys = _get_operations_by_repr(schedule)
numerical_wf_dict = construct_waveform_table(
timing_table,
operations_dict=operations_dict_with_repr_keys,
device_dict=device_dict,
)
################################################
# Above here is the layer that translates what should happen at the device to what
# output needs to be generated to realize that.
# COMPILATION SHOULD BE SPLIT HERE
# Below here is the layer that translates the timing table to instructions for the
# hardware.
################################################
################################################
# Assemble waveforms and timeline into seqc
################################################
# keys are instrument names, and the ZIDeviceConfig contain the settings incl seqc
# to configure.
device_configs: dict[str, ZIDeviceConfig | float] = dict()
for device in devices:
if device.device_type == zhinst.DeviceType.HDAWG:
builder = _compile_for_hdawg(
device=device,
timing_table=timing_table,
numerical_wf_dict=numerical_wf_dict,
repetitions=schedule.repetitions,
)
acq_config: ZIAcquisitionConfig | None = None
elif device.device_type == zhinst.DeviceType.UHFQA:
acq_metadata = schedule_helpers.extract_acquisition_metadata_from_schedule(schedule)
bin_mode = acq_metadata.bin_mode
builder, acq_config = _compile_for_uhfqa(
device=device,
timing_table=timing_table,
numerical_wf_dict=numerical_wf_dict,
repetitions=schedule.repetitions,
operations=operations_dict_with_repr_keys,
bin_mode=bin_mode,
)
else:
raise NotImplementedError(f"{device.device_type} not supported.")
device_configs[device.name] = ZIDeviceConfig(device.name, builder, acq_config)
# add the local oscillator config by iterating over all output channels.
# note that not all output channels have an LO associated to them.
for channel in device.channels:
_add_lo_config(
channel=channel,
local_oscillators=local_oscillators,
device_configs=device_configs,
resources=schedule.resources,
)
# Create compiled instructions key if not already present. This can happen if this
# compilation function is called directly instead of through a `QuantifyCompiler`.
if "compiled_instructions" not in schedule:
schedule["compiled_instructions"] = {}
schedule["compiled_instructions"].update(device_configs)
schedule._hardware_timing_table = timing_table
schedule._hardware_waveform_dict = numerical_wf_dict
compiled_schedule = CompiledSchedule(schedule)
return compiled_schedule
[docs]
class ZIHardwareCompilationConfig(common.HardwareCompilationConfig):
"""
Datastructure containing the information needed to compile to the Zurich Instruments backend.
This information is structured in the same way as in the generic
:class:`~quantify_scheduler.backends.types.common.HardwareCompilationConfig`, but
contains fields for hardware-specific settings.
"""
[docs]
config_type: type[ZIHardwareCompilationConfig] = Field(
default="quantify_scheduler.backends.zhinst_backend.ZIHardwareCompilationConfig",
validate_default=True,
)
"""
A reference to the ``HardwareCompilationConfig`` DataStructure for the
Zurich Instruments backend.
"""
[docs]
hardware_description: dict[
str,
zhinst.ZIHardwareDescription | common.HardwareDescription,
]
"""Description of the instruments in the physical setup."""
[docs]
hardware_options: zhinst.ZIHardwareOptions
"""
Options that are used in compiling the instructions for the hardware, such as
:class:`~quantify_scheduler.backends.types.common.LatencyCorrection` or
:class:`~quantify_scheduler.backends.types.zhinst.OutputGain`.
"""
[docs]
compilation_passes: list[SimpleNodeConfig] = [
SimpleNodeConfig(name="flatten_schedule", compilation_func=flatten_schedule),
SimpleNodeConfig(name="zhinst_hardware_compile", compilation_func=compile_backend),
]
"""
The list of compilation nodes that should be called in succession to compile a
schedule to instructions for the Zurich Instruments hardware.
"""
@model_validator(mode="before")
@classmethod
[docs]
def from_old_style_hardware_config(
cls: type[ZIHardwareCompilationConfig], data: Any # noqa: ANN401
) -> Any: # noqa: ANN401
"""Convert old style hardware config dict to new style before validation."""
if (
isinstance(data, dict)
and data.get("backend") == "quantify_scheduler.backends.zhinst_backend.compile_backend"
):
# Input is an old style ZI hardware config dict
data = _generate_new_style_hardware_compilation_config(data)
return data
[docs]
def _add_lo_config( # noqa: PLR0912
channel: zhinst.Output,
local_oscillators: dict[str, zhinst.LocalOscillator],
resources: dict[str, Resource],
device_configs: dict[str, ZIDeviceConfig | float],
) -> None:
"""
Adds configuration for a local oscillator required for a specific output channel to
the device configs.
"""
if channel.local_oscillator is None:
# no local oscillator is associated to this channel.
return
unique_name = channel.local_oscillator
if unique_name not in local_oscillators:
raise KeyError(f'Missing configuration for LocalOscillator "{unique_name}"')
local_oscillator = local_oscillators[unique_name]
# the frequencies from the config file
((lo_freq_key, lo_freq_val),) = local_oscillator.frequency.items()
# Get the power of the local oscillator
if local_oscillator.power:
((power_key, power_val),) = local_oscillator.power.items()
# Get the phase of the local oscillator
if local_oscillator.phase:
((phase_key, phase_val),) = local_oscillator.phase.items()
interm_freq = channel.modulation.interm_freq
if (lo_freq_val is not None) and (interm_freq is not None):
rf_freq = lo_freq_val + interm_freq
else:
channel_clock_resource = resources.get(channel.clock)
if channel_clock_resource is not None:
rf_freq = channel_clock_resource.get("freq")
else:
# no clock is specified for this channel.
# this can happen for e.g., baseband pulses or when the channel is not used
# in the schedule.
return
if lo_freq_val is None and interm_freq is not None:
lo_freq_val = rf_freq - interm_freq
local_oscillator.frequency[lo_freq_key] = lo_freq_val
elif interm_freq is None and lo_freq_val is not None:
interm_freq = rf_freq - lo_freq_val
channel.modulation.interm_freq = interm_freq
elif interm_freq is None and lo_freq_val is None:
raise ValueError(
"Either local oscillator frequency or channel intermediate frequency "
f'must be set for LocalOscillator "{unique_name}"'
)
if (
local_oscillator.unique_name in device_configs
and device_configs[local_oscillator.unique_name].get("frequency") != lo_freq_val
):
# the device_config currently only contains the frequency
raise ValueError(f'Multiple frequencies assigned to LocalOscillator "{unique_name}"')
lo_config = {
f"{local_oscillator.instrument_name}.{lo_freq_key}": lo_freq_val,
}
if local_oscillator.power:
lo_config[f"{local_oscillator.instrument_name}.{power_key}"] = power_val
if local_oscillator.phase:
lo_config[f"{local_oscillator.instrument_name}.{phase_key}"] = phase_val
# This line detects if the generic_icc_name exists in the local_oscillator entry of
# the hardware_config. If it exists, then, it takes the entry value, if not, the
# generic_icc_name takes the default value from the generic icc base module.
if local_oscillator.generic_icc_name:
generic_icc_name = local_oscillator.generic_icc_name
else:
generic_icc_name = GENERIC_ICC_DEFAULT_NAME
if generic_icc_name in device_configs:
device_configs[generic_icc_name].update(lo_config)
else:
device_configs[generic_icc_name] = lo_config
[docs]
def _add_wave_nodes(
device_type: zhinst.DeviceType,
awg_index: int,
wf_id_mapping: dict[str, int],
numerical_wf_dict: dict[str, np.ndarray],
settings_builder: zi_settings.ZISettingsBuilder,
) -> zi_settings.ZISettingsBuilder:
for wf_id, wf_index in wf_id_mapping.items():
if wf_id not in numerical_wf_dict:
# this is to catch an edge-case where certain acquisitions do not set
# integration weights. Ideally, these should be filtered before the wf_id
# is added to the wf_id_mapping, but it is easier to catch here.
continue
numerical_waveform = numerical_wf_dict[wf_id]
waveform = Waveform(numerical_waveform.real, numerical_waveform.imag)
if device_type == zhinst.DeviceType.UHFQA:
settings_builder.with_csv_wave_vector(awg_index, wf_index, waveform.data)
else:
settings_builder.with_wave_vector(awg_index, wf_index, waveform.data)
return settings_builder
[docs]
def _compile_for_hdawg(
device: zhinst.Device,
timing_table: pandas.DataFrame,
numerical_wf_dict: dict[str, np.ndarray],
repetitions: int,
) -> zi_settings.ZISettingsBuilder:
"""
Parameters
----------
device :
timing_table :
numerical_wf_dict :
repetitions :
Raises
------
ValueError
"""
# calculating duration over all operations instead of only the last ensures a
# long operation near the end does not get overlooked.
schedule_duration = (timing_table.abs_time + timing_table.duration).max()
########################################
# Add standard settings to builder
########################################
settings_builder = zi_settings.ZISettingsBuilder()
n_awgs: int = int(device.n_channels / 2)
settings_builder.with_defaults(
[
("sigouts/*/on", 0),
("awgs/*/single", 1),
]
).with_system_channelgrouping(device.channelgrouping)
# Set the clock-rate of an AWG
for awg_index in range(n_awgs):
settings_builder.with_awg_time(awg_index, device.clock_select) # type: ignore
# device.type is either HDAWG8 or HDAWG4
channelgroups_mode = HDAWG_DEVICE_TYPE_CHANNEL_GROUPS[device.type]
# Defaults to mode =0 -> value = 2 -> sequencers control pairs of channels
channelgroups_value = channelgroups_mode[device.channelgrouping]
sequencer_step = int(channelgroups_value / 2) # nr of awg pairs per sequencer
# the index of the last sequencer to configure
# N.B. 8-11-2021 the min(len(device.channels)) might make the wrong choice when
# using only awgs 2 and 3. To be tested.
sequencer_stop = min(len(device.channels), int(n_awgs / sequencer_step))
logger.debug(
f"HDAWG[{device.name}] devtype={device.device_type} " + f" awg_count={n_awgs} {str(device)}"
)
enabled_outputs: dict[int, zhinst.Output] = dict()
for i, awg_index in enumerate(range(0, sequencer_stop, sequencer_step)):
# here Output corresponds to an awg unit or a channel pair
# and is a dataclass containing info on port, clock, gain etc.
output = device.channels[i]
if output is None:
raise ValueError(f"Required output at index '{i}' is undefined!")
logger.debug(f"[{device.name}-awg{awg_index}] enabling outputs...")
mixer_corrections = (
output.mixer_corrections
if output.mixer_corrections is not None
else common.MixerCorrections()
)
settings_builder.with_sigouts(awg_index, (1, 1)).with_gain(
awg_index, (output.gain1, output.gain2)
).with_sigout_offset(int(awg_index * 2), mixer_corrections.dc_offset_i).with_sigout_offset(
int(awg_index * 2) + 1, mixer_corrections.dc_offset_q
)
enabled_outputs[awg_index] = output
############################################
# Add seqc instructions and waveform table
############################################
for awg_index, output in enabled_outputs.items():
# select only the instructions relevant for the output channel.
output_timing_table = timing_table[
timing_table["hardware_channel"] == f"{device.name}.awg{awg_index}"
]
instructions: list[zhinst.Instruction] = _get_instruction_list(output_timing_table)
# enumerate the waveform_ids used in this particular output channel
unique_wf_ids = output_timing_table.drop_duplicates(subset="waveform_id")["waveform_id"]
# this table maps waveform ids to indices in the seqc command table.
wf_id_mapping = {}
for i, wf_id in enumerate(unique_wf_ids):
wf_id_mapping[wf_id] = i
# Step 1: Generate and compile sequencer program AND
# Step 2: Set CommandTable JSON vector
(seqc, commandtable_json) = _assemble_hdawg_sequence(
instructions=instructions,
wf_id_mapping=wf_id_mapping,
numerical_wf_dict=numerical_wf_dict,
repetitions=repetitions,
schedule_duration=schedule_duration,
markers=device.channels[awg_index].markers,
trigger=device.channels[awg_index].trigger,
)
logger.debug(seqc)
logger.debug(commandtable_json)
settings_builder.with_commandtable_data(awg_index, commandtable_json)
settings_builder.with_compiler_sourcestring(awg_index, seqc)
#######################################################
# Set waveforms to wave nodes in the settings builder
#######################################################
# Step 3: Upload waveforms to AWG CommandTable
_add_wave_nodes(
device_type=zhinst.DeviceType.HDAWG,
awg_index=awg_index,
wf_id_mapping=wf_id_mapping,
numerical_wf_dict=numerical_wf_dict,
settings_builder=settings_builder,
)
return settings_builder
[docs]
def _assemble_hdawg_sequence(
instructions: list[zhinst.Instruction],
wf_id_mapping: dict[str, int],
numerical_wf_dict: dict[str, np.ndarray],
repetitions: int,
schedule_duration: float,
markers: str | int | None = None,
trigger: int = None,
) -> tuple[str, str]:
""" """
seqc_instructions = ""
commandtable_json = str({})
seqc_gen = seqc_il_generator.SeqcILGenerator()
# Declare sequence variables
seqc_gen.declare_var("__repetitions__", repetitions)
###############################################################
# Generate the command table and waveforms
###############################################################
command_table_entries: list[zhinst.CommandTableEntry] = list()
for waveform_id, waveform_index in wf_id_mapping.items():
name: str = f"w{waveform_index}"
waveform = numerical_wf_dict[waveform_id]
# Create and add variables to the Sequence program
# as well as assign the variables with operations
seqc_gen.declare_wave(name)
seqc_gen.assign_placeholder(name, len(waveform))
seqc_gen.emit_assign_wave_index(name, name, index=waveform_index)
# Do bookkeeping for the CommandTable
command_table_entry = zhinst.CommandTableEntry(
index=len(command_table_entries),
waveform=zhinst.CommandTableWaveform(index=waveform_index, length=len(waveform)),
)
command_table_entries.append(command_table_entry)
command_table = zhinst.CommandTable(table=command_table_entries)
###############################################################
# Add the loop that executes the program.
###############################################################
# N.B. All HDAWG markers can be used to trigger a UHFQA or other HDAWGs.
# marker output is set to 0 before the loop is started
seqc_il_generator.add_set_trigger(seqc_gen, value=0, device_type=zhinst.DeviceType.HDAWG)
seqc_gen.emit_begin_repeat("__repetitions__")
current_clock: int = 0
# set markers to high at the start of the repeition if this is the primary
# channel or wait for an external trigger if this is a secondary channel
if markers is not None and len(markers) > 0:
current_clock += seqc_il_generator.add_set_trigger(
seqc_gen,
value=markers,
device_type=zhinst.DeviceType.HDAWG,
)
elif trigger is not None:
assert trigger in [1, 2]
seqc_gen.emit_wait_dig_trigger(
index=trigger,
comment=f"\t// clock={current_clock}",
device_type=zhinst.DeviceType.HDAWG,
)
else:
# If the hardware config does not provide any settings assume this is a
# primary HDAWG channel and send triggers on all channels
current_clock += seqc_il_generator.add_set_trigger(
seqc_gen,
value=["AWG_MARKER1", "AWG_MARKER2"],
device_type=zhinst.DeviceType.HDAWG,
)
# this is where a longer wait statement is added to allow for latency corrections.
for instruction in instructions:
assert isinstance(instruction, zhinst.Wave)
clock_cycles_to_wait = instruction.clock_cycle_start - current_clock
if clock_cycles_to_wait < 0:
# a common mistake if there is no overlap if the instruction needs to start
# to soon after the start of a new cycle.
raise ValueError("Negative wait time, please ensure operations do not overlap in time.")
current_clock += seqc_il_generator.add_wait(
seqc_gen=seqc_gen,
delay=int(clock_cycles_to_wait),
device_type=zhinst.DeviceType.HDAWG,
comment=f"clock={current_clock}",
)
current_clock += seqc_il_generator.add_execute_table_entry(
seqc_gen=seqc_gen,
index=wf_id_mapping[instruction.waveform_id],
device_type=zhinst.DeviceType.HDAWG,
comment=f"clock={current_clock}",
)
current_clock += seqc_il_generator.add_set_trigger(
seqc_gen,
value=0,
device_type=zhinst.DeviceType.HDAWG,
comment=f"clock={current_clock}",
)
# clock rate = 2.4e9/8 for HDAWG
clock_rate = DEVICE_SAMPLING_RATES[zhinst.DeviceType.HDAWG][0] / CLOCK_SAMPLE_FACTOR
total_duration_in_clocks = int(schedule_duration * clock_rate)
clock_cycles_to_wait = total_duration_in_clocks - current_clock
if trigger is None:
current_clock += seqc_il_generator.add_wait(
seqc_gen=seqc_gen,
delay=int(clock_cycles_to_wait),
device_type=zhinst.DeviceType.HDAWG,
comment=(f"clock={current_clock}, dead time to ensure total schedule duration"),
)
seqc_gen.emit_end_repeat()
seqc_instructions = seqc_gen.generate()
commandtable_json = command_table.model_dump_json()
return seqc_instructions, commandtable_json
[docs]
def _compile_for_uhfqa( # noqa: PLR0915
device: zhinst.Device,
timing_table: pandas.DataFrame,
numerical_wf_dict: dict[str, np.ndarray],
repetitions: int,
operations: dict[str, Operation],
bin_mode: enums.BinMode,
) -> tuple[zi_settings.ZISettingsBuilder, ZIAcquisitionConfig]:
"""
Initialize programming the UHFQA ZI Instrument.
Creates a sequence program and converts schedule
pulses to waveforms for the UHFQA.
Parameters
----------
device :
timing_table :
numerical_wf_dict :
repetitions :
operations :
Returns
-------
:
"""
########################################
# Add standard settings to builder
########################################
settings_builder = zi_settings.ZISettingsBuilder()
instrument_info = zhinst.InstrumentInfo(
sample_rate=device.sample_rate,
num_samples_per_clock=CLOCK_SAMPLE_FACTOR,
granularity=WAVEFORM_GRANULARITY[device.device_type],
)
channels = device.channels
channels = list(filter(lambda c: c.mode == zhinst.SignalModeType.REAL, channels))
awg_index = 0
channel = channels[awg_index]
logger.debug(f"[{device.name}-awg{awg_index}] {str(device)}")
mixer_corrections = (
channel.mixer_corrections
if channel.mixer_corrections is not None
else common.MixerCorrections()
)
# Set all integration weigths to default
settings_builder.with_defaults(
[
("awgs/0/single", 1),
("qas/0/rotations/*", (1 + 1j)),
("qas/0/integration/sources/*", 0),
]
).with_sigouts(0, (1, 1)).with_awg_time(
0, device.clock_select
).with_qas_integration_weights_real(
channels=list(range(NUM_UHFQA_READOUT_CHANNELS)),
real=np.zeros(MAX_QAS_INTEGRATION_LENGTH),
).with_qas_integration_weights_imag(
channels=list(range(NUM_UHFQA_READOUT_CHANNELS)),
imag=np.zeros(MAX_QAS_INTEGRATION_LENGTH),
).with_sigout_offset(
0, mixer_corrections.dc_offset_i
).with_sigout_offset(
1, mixer_corrections.dc_offset_q
)
logger.debug(f"[{device.name}-awg{awg_index}] channel={str(channel)}")
############################################
# Add seqc instructions and waveform table
############################################
# select only the instructions relevant for the output channel.
output_timing_table = timing_table[
timing_table["hardware_channel"] == f"{device.name}.awg{awg_index}"
]
instructions: list[zhinst.Instruction] = _get_instruction_list(output_timing_table)
# FIXME ensure unique_wf_ids is only for pulses and not integration weights
# enumerate the waveform_ids used in this particular output channel
unique_wf_ids = output_timing_table.drop_duplicates(subset="waveform_id")["waveform_id"]
# this table maps waveform ids to indices in the seqc command table.
wf_id_mapping = {}
for i, wf_id in enumerate(unique_wf_ids):
wf_id_mapping[wf_id] = i
# # Generate and apply sequencer program
seqc = _assemble_uhfqa_sequence(
instructions=instructions,
wf_id_mapping=wf_id_mapping,
repetitions=repetitions,
device_name=device.name,
trigger=device.channels[awg_index].trigger,
)
settings_builder.with_compiler_sourcestring(awg_index, seqc)
logger.debug(seqc)
#######################################################
# Set waveforms to wave nodes in the settings builder
#######################################################
# Apply waveforms to AWG
settings_builder = _add_wave_nodes(
device_type=zhinst.DeviceType.UHFQA,
awg_index=0,
wf_id_mapping=wf_id_mapping,
numerical_wf_dict=numerical_wf_dict,
settings_builder=settings_builder,
)
#######################################################
# Set integration weights and configure acquisitions
#######################################################
# Get a list of all acquisition protocol channels
acq_channel_resolvers_map: dict[int, Callable[..., Any]] = dict()
# select only the acquisition operations relevant for the output channel.
timing_table_acquisitions = output_timing_table[output_timing_table.is_acquisition]
timing_table_unique_acquisitions = timing_table_acquisitions.drop_duplicates(
subset="waveform_id"
)
n_unique_acquisitions = len(timing_table_acquisitions)
if bin_mode == enums.BinMode.AVERAGE:
n_acquisitions = n_unique_acquisitions
elif bin_mode == enums.BinMode.APPEND:
n_acquisitions = n_unique_acquisitions * repetitions
repetitions = 1
else:
raise NotImplementedError(f"BinMode {bin_mode} is not supported.")
# These variables have to be identical for all acquisitions.
# initialized to None here and overwritten while iterating over the acquisitions.
acq_duration: float = np.nan
# a list of used acquisition channels, this is used to raise an exception
# when multiple acquisitions assign to the same channel.
acq_channels_used: list[int] = []
acq_protocols: dict[int, SUPPORTED_ACQ_PROTOCOLS] = {}
for _, acq_row in timing_table_unique_acquisitions.iterrows():
acquisition = operations[acq_row.operation]
wf_id = acq_row.wf_idx
acq_info = acquisition.data["acquisition_info"][acq_row.wf_idx]
# update acq_duration only if it was not set before
acq_duration = acq_info["duration"] if np.isnan(acq_duration) else acq_duration
# verify that the both durations are identical, if not raise an exception
# this exception relates to a limitation of the hardware.
if acq_duration != acq_info["duration"]:
raise ValueError(
f"Different acquisitions have a different duration "
f"{acq_duration*1e9:.1f}ns and {acq_info['duration']*1e9:.1f}ns. "
"The integration lenght needs to be identical for all acquisitions."
)
if (acq_protocol := acq_info["protocol"]) not in get_args(SUPPORTED_ACQ_PROTOCOLS):
raise ValueError(
f"Acquisition protocol {acq_protocol} is not supported by"
" Zurich Instruments LabOne backend."
)
acq_protocol: SUPPORTED_ACQ_PROTOCOLS
acq_channel: int = acq_info["acq_channel"]
acq_protocols[acq_channel] = acq_protocol
if acq_channel not in acq_channels_used:
acq_channels_used.append(acq_channel)
else:
raise ValueError(
f"Acquisition channel {acq_channel} is already used by another "
"acquisition. Different acquisitions should use a unique "
"acquisition channel."
f"Offending acquisition ({acq_row.waveform_id})"
)
integration_length = round(acq_duration * instrument_info.sample_rate)
logger.debug(
f"[{device.name}] acq_info={acq_info} "
+ f" acq_duration={acq_duration} integration_length={integration_length}"
)
if acq_protocol == "Trace":
# Disable Weighted integration because we'd like to see
# the raw signal.
settings_builder.with_qas_monitor_enable(True).with_qas_monitor_averages(
repetitions
).with_qas_monitor_length(integration_length).with_qas_integration_weights_real(
list(range(NUM_UHFQA_READOUT_CHANNELS)),
np.ones(MAX_QAS_INTEGRATION_LENGTH),
).with_qas_integration_weights_imag(
list(range(NUM_UHFQA_READOUT_CHANNELS)),
np.ones(MAX_QAS_INTEGRATION_LENGTH),
)
monitor_nodes = (
"qas/0/monitor/inputs/0/wave",
"qas/0/monitor/inputs/1/wave",
)
acq_channel_resolvers_map[acq_channel] = partial(
resolvers.monitor_acquisition_resolver, monitor_nodes=monitor_nodes
)
else:
# The waveform is slightly larger then the integration_length
# because of the waveform granularity. This is irrelevant
# due to the waveform being appended with zeros. Therefore
# avoiding an extra slice of waveform[0:integration_length]
acquisition_waveform = numerical_wf_dict[acq_row.waveform_id]
weights_i = np.zeros(MAX_QAS_INTEGRATION_LENGTH)
weights_q = np.zeros(MAX_QAS_INTEGRATION_LENGTH)
weights_i[0 : len(acquisition_waveform)] = acquisition_waveform.real
weights_q[0 : len(acquisition_waveform)] = acquisition_waveform.imag
# set the integration weights, note that we need to set 4 weights in order
# to use a complex valued weight function in the right way.
# Z = (w0*sI + w1*sQ) + 1j ( w1*sI - w0 * sQ)
settings_builder.with_qas_integration_weights_real(
2 * acq_channel, list(weights_i)
).with_qas_integration_weights_imag(
2 * acq_channel, list(weights_q)
).with_qas_integration_weights_real(
2 * acq_channel + 1, list(weights_q)
).with_qas_integration_weights_imag(
2 * acq_channel + 1, list(-1 * weights_i)
)
# Create partial function for delayed execution
acq_channel_resolvers_map[acq_channel] = partial(
resolvers.result_acquisition_resolver,
result_nodes=[
f"qas/0/result/data/{2*acq_channel}/wave",
f"qas/0/result/data/{2*acq_channel+1}/wave",
],
)
# only configure these variables if there are actually acquisitions present in
# the schedule.
if len(timing_table_unique_acquisitions) > 0:
integration_length = round(acq_duration * instrument_info.sample_rate)
settings_builder.with_qas_integration_mode(
zhinst.QasIntegrationMode.NORMAL
).with_qas_integration_length(integration_length).with_qas_result_enable(
False
).with_qas_monitor_enable(
False
).with_qas_delay(
0
)
settings_builder.with_qas_result_mode(zhinst.QasResultMode.CYCLIC).with_qas_result_source(
zhinst.QasResultSource.INTEGRATION
).with_qas_result_length(n_acquisitions).with_qas_result_enable(
True
).with_qas_result_averages(
repetitions
)
settings_builder.with_qas_result_reset(0).with_qas_result_reset(1)
settings_builder.with_qas_monitor_reset(0).with_qas_monitor_reset(1)
return (
settings_builder,
ZIAcquisitionConfig(
n_unique_acquisitions,
resolvers=acq_channel_resolvers_map,
bin_mode=bin_mode,
acq_protocols=acq_protocols,
),
)
[docs]
def _assemble_uhfqa_sequence(
instructions: list[zhinst.Instruction],
wf_id_mapping: dict[str, int],
repetitions: int,
device_name: str,
trigger: int = 2,
) -> str:
""" """
seqc_instructions = ""
seqc_gen = seqc_il_generator.SeqcILGenerator()
# Declare sequence variables
seqc_gen.declare_var("__repetitions__", repetitions)
current_clock: int = 0
###############################################################
# Generate the .csv based waveform table
###############################################################
seqc_il_generator.declare_csv_waveform_variables(
seqc_gen=seqc_gen,
device_name=device_name,
waveform_indices=list(wf_id_mapping.values()),
awg_index=0,
)
###############################################################
# Add the loop that executes the program.
###############################################################
seqc_gen.emit_begin_repeat("__repetitions__")
# N.B.! The UHFQA will always need to be triggered by an external device such as
# an HDAWG or a trigger box. It will wait for a trigger.
# Triggers must be a list but we may only wait for one so lets choose the
# first one in the list, I guess.
# This does not account for dio ports. Which are not implemented in the current
# version.
assert trigger < 5 and trigger > 0
assert trigger is not None
seqc_gen.emit_wait_dig_trigger(
index=trigger,
comment=f"\t// clock={current_clock}",
device_type=zhinst.DeviceType.UHFQA,
)
# this is where a longer wait statement is added to allow for latency corrections.
for instruction in instructions:
clock_cycles_to_wait = instruction.clock_cycle_start - current_clock
if clock_cycles_to_wait < 0:
# a common mistake if there is no overlap if the instruction needs to start
# to soon after the start of a new cycle.
raise ValueError("Negative wait time, please ensure operations do not overlap in time.")
current_clock += seqc_il_generator.add_wait(
seqc_gen=seqc_gen,
delay=int(clock_cycles_to_wait),
device_type=zhinst.DeviceType.UHFQA,
comment=f"clock={current_clock}",
)
# Acquisition
if isinstance(instruction, zhinst.Acquisition):
current_clock += seqc_il_generator.add_start_qa(
seqc_gen=seqc_gen,
device_type=zhinst.DeviceType.UHFQA,
comment=f"clock={current_clock}",
)
# Waveform
elif isinstance(instruction, zhinst.Wave):
current_clock += seqc_il_generator.add_play_wave(
seqc_gen,
f"w{wf_id_mapping[instruction.waveform_id]}",
device_type=zhinst.DeviceType.UHFQA,
comment=f"clock={current_clock}",
)
seqc_gen.emit_end_repeat()
seqc_instructions = seqc_gen.generate()
return seqc_instructions