# Repository: https://gitlab.com/quantify-os/quantify-scheduler
# Licensed according to the LICENCE file on the main branch
"""Backend for Zurich Instruments."""
# pylint: disable=too-many-lines
from __future__ import annotations
import logging
import re
from dataclasses import dataclass
from functools import partial
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
import numpy as np
import pandas as pd
from zhinst.toolkit.helpers import Waveform
from quantify_scheduler import enums
from quantify_scheduler.backends.corrections import (
apply_distortion_corrections,
determine_relative_latencies,
LatencyCorrections,
)
from quantify_scheduler.backends.types import common, zhinst
from quantify_scheduler.backends.zhinst import helpers as zi_helpers
from quantify_scheduler.backends.zhinst import resolvers, seqc_il_generator
from quantify_scheduler.backends.zhinst import settings as zi_settings
from quantify_scheduler.helpers import waveforms as waveform_helpers
from quantify_scheduler.helpers import schedule as schedule_helpers
from quantify_scheduler.instrument_coordinator.components.generic import (
DEFAULT_NAME as generic_icc_default_name,
)
from quantify_scheduler.operations.operation import Operation
from quantify_scheduler.resources import Resource
from quantify_scheduler.schedules.schedule import CompiledSchedule, Schedule
[docs]logger = logging.getLogger(__name__)
[docs]handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
# List of supported zhinst devices
[docs]SUPPORTED_DEVICE_TYPES: List[str] = ["HDAWG", "UHFQA"]
# https://www.zhinst.com/sites/default/files/documents/2020-09/ziHDAWG_UserManual_20.07.1.pdf
# Section: 3.4. Basic Qubit Characterization, page 83
# All waveform lengths need to be multiples of 16 sample-clock cycles to comply
# with the waveform granularity specification.
# https://www.zhinst.com/sites/default/files/documents/2020-09/ziHDAWG_UserManual_20.07.2.pdf
# page: 262
[docs]HDAWG_DEVICE_TYPE_CHANNEL_GROUPS: Dict[str, Dict[int, int]] = {
"HDAWG8": {
# Use the outputs in groups of 2. One sequencer program controls 2 outputs.
0: 2,
# Use the outputs in groups of 4. One sequencer program controls 4 outputs.
1: 4,
# Use the outputs in groups of 8. One sequencer program controls 8 outputs.
2: 8,
},
"HDAWG4": {
# Use the outputs in groups of 2. One sequencer program controls 2 outputs.
0: 2,
# Use the outputs in groups of 4. One sequencer program controls 4 outputs.
1: 4,
},
}
[docs]DEVICE_SAMPLING_RATES: Dict[zhinst.DeviceType, Dict[int, int]] = {
zhinst.DeviceType.HDAWG: zi_helpers.get_sampling_rates(2.4e9),
zhinst.DeviceType.UHFQA: zi_helpers.get_sampling_rates(1.8e9),
}
# The sequencer clock rate always is 1/8 of the sampling rate
# (Same for UHFQA and HDAWG as of 2 Nov 2021)
[docs]NUM_UHFQA_READOUT_CHANNELS = 10
[docs]MAX_QAS_INTEGRATION_LENGTH = 4096
[docs]def ensure_no_operations_overlap(timing_table: pd.DataFrame):
"""
Iterates over all hardware_channels in a schedule to determine if any of the pulses
have overlap.
Parameters
----------
timing_table:
a timing table containing the absolute time and duration as well as the hardware
channels on which these pulses are to be applied.
Raises
------
ValueError
If there is overlap between operations.
"""
for output_ch in timing_table.hardware_channel.unique():
if output_ch is None:
continue
tt_output_ch = timing_table[timing_table["hardware_channel"] == output_ch]
tt_output_ch = tt_output_ch[tt_output_ch.is_acquisition != False]
end_time = tt_output_ch["abs_time"] + tt_output_ch["duration"]
# if any have overlap
if (end_time.shift(1) > tt_output_ch["abs_time"]).any():
clashing_ops = tt_output_ch[(end_time.shift(1) > tt_output_ch["abs_time"])]
clashing_op = clashing_ops.iloc[0]
preceding_op = tt_output_ch.loc[clashing_op.name - 1]
raise ValueError(
f"Operation {clashing_op.operation} at time"
f" {clashing_op.abs_time*1e9:.1f} ns "
f"overlaps with {preceding_op.operation} at "
f"time {preceding_op.abs_time*1e9:.1f} ns "
f"on output channel {clashing_op.hardware_channel}."
)
[docs]def _determine_clock_sample_start(
hardware_channel: str,
abs_time: float,
operation_name: str = "",
) -> Tuple[int, float]:
"""
depending on the output channel, select the right clock cycle time and sample rate
from the channel descriptor for ZI channels.
the sample is returned as a float to preserve information of incorrect rounding to
full samples if present.
"""
if "uhfqa" in hardware_channel:
hw_sample_rate = DEVICE_SAMPLING_RATES[zhinst.DeviceType.UHFQA][
0
] # 0 -Assumes the default setting for the clock select
hw_clock_rate = hw_sample_rate / CLOCK_SAMPLE_FACTOR
# UHFQA has a 4.444 ns clock cycle (8 samples of ~0.55 ns)
# 9 clock cycles = 40 ns
elif "hdawg" in hardware_channel:
hw_sample_rate = DEVICE_SAMPLING_RATES[zhinst.DeviceType.HDAWG][
0
] # 0 -Assumes the default setting for the clock select
hw_clock_rate = hw_sample_rate / CLOCK_SAMPLE_FACTOR
# HDAWG has a 3.333 ns clock cycle (8 samples of ~0.4 ns)
# 3 clock cycles is 10 ns
else:
error_message = (
f"Instrument type for channel {hardware_channel} not recognized. "
+ "Could not determine clock and sample start."
)
raise NotImplementedError(f"{error_message}")
# next determine what clock cycle and sample things happen on.
# here we do a combination of floor and round as the samples are added afterwards.
# The round to 3 decimals serves to catch an edge when rounding to x.999999 clocks.
clock_cycle = np.floor(np.round(abs_time * hw_clock_rate, decimals=3))
sample_time = abs_time - clock_cycle / hw_clock_rate
# first done using floating point to be able to detect incorrect rounding on samples
sample_float = sample_time * hw_sample_rate
sample = round(sample_float)
if not np.all(np.isclose(sample_float, sample, atol=0.0001)):
# tip, the common clock cycle of UHF and HDAWG is 40 ns, for HDAWG pulses only
# 10 ns is a nice clock multiple as well.
raise ValueError(
f"Rounding to samples not exact for operation ({operation_name}) at time "
f"({abs_time*1e9:.1f} ns). Attempting to round ({sample_float}) "
f"to ({sample}) \n TIP: Try to ensure waveforms start a multiple of"
" the samlping rate e.g., try multiples of 10 ns for the HDAWG or 40 ns for"
" UFHQA pulses."
)
return (clock_cycle, sample_float)
[docs]def _determine_clock_start(hardware_channel: str, abs_time: float, operation_name: str):
if hardware_channel is None:
return float("nan")
clock_start, _ = _determine_clock_sample_start(
hardware_channel=hardware_channel,
abs_time=abs_time,
operation_name=operation_name,
)
return clock_start
[docs]def _determine_sample_start(
hardware_channel: str, abs_time: float, operation_name: str
):
if hardware_channel is None:
return float("nan")
_, sample_start = _determine_clock_sample_start(
hardware_channel=hardware_channel,
abs_time=abs_time,
operation_name=operation_name,
)
return sample_start
[docs]def _apply_latency_corrections(
timing_table: pd.DataFrame, latency_dict: dict
) -> pd.DataFrame:
"""
Changes the "abs_time" of a timing table depending on the specified latencies
for each port-clock combination as specified in the latency dict. The latencies are
added to the abs_time elements fulfilling the specific port-clock combination.
"""
for port_clock_combination_key in latency_dict:
port, clock = port_clock_combination_key.split("-")
port_mask = timing_table["port"] == port
clock_mask = timing_table["clock"] == clock
full_mask_combination = port_mask * clock_mask
latency_corr = latency_dict[port_clock_combination_key]
timing_table.loc[full_mask_combination, "abs_time"] += latency_corr
return timing_table
[docs]def _determine_measurement_fixpoint_correction(
measurement_start_sample: int, common_frequency: float = 600e6
) -> Tuple[float, int]:
"""
Calculates by how much time to shift all operations to ensure a measurement starts
at sample 0.
Parameters
----------
measurement_start_sample:
the sample at which the measurement starts
common_frequency:
The shift needs to be such that it occurs at a multiple of the common frequency.
A larger common frequency results in a smaller time correction.
This largest common frequency is the common frequency of the HDAWG and UHFQA and
is 600 MHz.
Returns
--------
:
The time correction to be applied in seconds
:
The correction in number of samples.
"""
uhf_sampling_rate = 1.8e9
samples_per_clock_cycle = 8
uhf_common_sample = uhf_sampling_rate / common_frequency
if not uhf_common_sample.is_integer():
raise ValueError(
f"Invalid common frequency: The UHF sampling rate ({uhf_sampling_rate}) is "
f"not a multiple of the common frequency {common_frequency}."
)
required_sample_correction = (-measurement_start_sample) % samples_per_clock_cycle
success = False
for i in range(10):
sample_correction = int(i * uhf_common_sample)
if sample_correction % samples_per_clock_cycle == required_sample_correction:
success = True
break
if not success:
raise ValueError("Could not identify a measurement fixpoint correction")
time_shift = sample_correction / uhf_sampling_rate
return time_shift, sample_correction
[docs]def _apply_measurement_fixpoint_correction(
timing_table: pd.DataFrame, common_frequency: float = 600e6
) -> pd.DataFrame:
"""
Updates the abs_time of all operations based on the measurement fixpoint correction.
The abs_time is applied to all operations between two acquisitions.
After that the samples and clocks are re-calculated to reflect this change in time.
Parameters
----------
timing_table:
A timing table that has the samples already determined.
common_frequency:
The shift needs to be such that it occurs at a multiple of the common frequency.
A larger common frequency results in a smaller time correction.
This largest common frequency is the common frequency of the HDAWG and UHFQA and
is 600 MHz.
"""
acquisitions = timing_table[timing_table.is_acquisition]
time_corrections = np.zeros(len(timing_table))
prev_idx = 0
cumulative_sample_corrections = 0
# FIXME: there is an edge-case in the slicing of the operations when
# the pulse of the measurement is applied after triggering the acquisition.
# this should be included explicitly in the indices to slice (acquisitions.index)
for idx, sample_start in zip(acquisitions.index, acquisitions.sample_start):
effective_start_sample = round(sample_start + cumulative_sample_corrections)
time_corr, sample_corr = _determine_measurement_fixpoint_correction(
measurement_start_sample=effective_start_sample,
common_frequency=common_frequency,
)
# all subsequent waveforms are shifted in time
time_corrections[prev_idx:] += time_corr
cumulative_sample_corrections += sample_corr
prev_idx = idx + 1
timing_table["abs_time"] += time_corrections
# After shifting operations to align the measurement for the fixpoint correction the
# clock and sample start needs to be updated.
timing_table = _add_clock_sample_starts(timing_table=timing_table)
return timing_table
[docs]def _add_clock_sample_starts(timing_table: pd.DataFrame) -> pd.DataFrame:
"""
Adds the sequence clock cycle start and sampling start of each operation for each
channel
"""
timing_table["clock_cycle_start"] = timing_table.apply(
lambda row: _determine_clock_start(
hardware_channel=row["hardware_channel"],
abs_time=row["abs_time"],
operation_name=row["operation"],
),
axis=1,
)
timing_table["sample_start"] = timing_table.apply(
lambda row: _determine_sample_start(
hardware_channel=row["hardware_channel"],
abs_time=row["abs_time"],
operation_name=row["operation"],
),
axis=1,
)
return timing_table
[docs]def _parse_local_oscillators(data: Dict[str, Any]) -> Dict[str, common.LocalOscillator]:
"""
Returns the LocalOscillator domain models parsed from the data dictionary.
Parameters
----------
data :
The hardware map "local_oscillators" entry.
Returns
-------
:
A dictionary of unique LocalOscillator instances.
Raises
------
RuntimeError
If duplicate LocalOscillators have been found.
"""
local_oscillators: Dict[str, common.LocalOscillator] = dict()
lo_list: List[common.LocalOscillator] = common.LocalOscillator.schema().load(
data, many=True
)
for local_oscillator in lo_list:
if local_oscillator.unique_name in local_oscillators:
raise RuntimeError(
f"Duplicate entry LocalOscillators '{local_oscillator.unique_name}' in "
"hardware configuration!"
)
local_oscillators[local_oscillator.unique_name] = local_oscillator
return local_oscillators
[docs]def _parse_devices(data: Dict[str, Any]) -> List[zhinst.Device]:
device_list: List[zhinst.Device] = zhinst.Device.schema().load(data, many=True)
for device in device_list:
if device.device_type.value not in SUPPORTED_DEVICE_TYPES:
raise NotImplementedError(
f"Unable to create zhinst backend for '{device.device_type.value}'!"
)
sample_rates = DEVICE_SAMPLING_RATES[device.device_type]
if not device.clock_select in sample_rates:
raise ValueError(
f"Unknown value clock_select='{device.clock_select}' "
+ f"for device type '{device.device_type.value}'"
)
device.sample_rate = sample_rates[device.clock_select]
return device_list
[docs]def _validate_schedule(schedule: Schedule) -> None:
"""
Validates the CompiledSchedule required values for creating the backend.
Parameters
----------
schedule :
Raises
------
ValueError
The validation error.
"""
if len(schedule.schedulables) == 0:
raise ValueError(f"Undefined schedulables for schedule '{schedule.name}'!")
for schedulable in schedule.schedulables.values():
if "abs_time" not in schedulable.keys():
raise ValueError(
"Absolute timing has not been determined "
+ f"for the schedule '{schedule.name}'!"
)
[docs]def _flatten_dict(collection: Dict[Any, Any]) -> Iterable[Tuple[Any, Any]]:
"""
Flattens a collection to an iterable set of tuples.
Parameters
----------
collection :
Returns
-------
:
"""
def expand(key, obj):
if isinstance(obj, dict):
for i, value in obj.items():
yield from expand(i, value)
elif isinstance(obj, list):
for value in obj:
yield (key, value)
else:
yield (key, obj)
return expand(None, collection)
[docs]def _get_instruction_list(
output_timing_table: pd.DataFrame,
) -> List[zhinst.Instruction]:
"""
Iterates over a timing table for a specific output for which clock_cycle_start and
waveform_id have been determined to return a list of all instructions to be played
on a Zurich Instruments device.
"""
instruction_list: List[zhinst.Instruction] = []
for _, row in output_timing_table.iterrows():
if row.is_acquisition:
instruction_list.append(
zhinst.Acquisition(
waveform_id=row.waveform_id,
abs_time=row.abs_time,
duration=row.duration,
clock_cycle_start=row.clock_cycle_start,
)
)
else:
instruction_list.append(
zhinst.Wave(
waveform_id=row.waveform_id,
abs_time=row.abs_time,
duration=row.duration,
clock_cycle_start=row.clock_cycle_start,
)
)
return instruction_list
@dataclass(frozen=True)
[docs]class ZIAcquisitionConfig:
"""Zurich Instruments acquisition configuration.
Parameters
----------
n_acquisitions :
the number of distinct acquisitions in this experiment.
resolvers:
resolvers used to retrieve the results from the right UHFQA nodes.
See also :mod:`~quantify_scheduler.backends.zhinst.resolvers`
bin_mode:
the bin mode (average or append)
"""
[docs] resolvers: Dict[int, Callable]
[docs] bin_mode: enums.BinMode
@dataclass(frozen=True)
[docs]class ZIDeviceConfig:
"""
Zurich Instruments device configuration.
Parameters
----------
name :
the name of the schedule the config is for.
settings_builder:
the builder to configure the ZI settings. This typically includes AWG and
AWG settings.
acq_config:
the acquisition config contains the number of acquisitions and a dictionary of
resolvers used to retrieve the results from the right UHFQA nodes.
Note that this part of the config is not needed during prepare, but only during
the retrieve acquisitions step.
"""
[docs] settings_builder: zi_settings.ZISettingsBuilder
[docs] acq_config: Optional[ZIAcquisitionConfig]
[docs]def compile_backend(
schedule: Schedule, hardware_cfg: Dict[str, Any]
) -> CompiledSchedule:
"""
Compiles backend for Zurich Instruments hardware according
to the CompiledSchedule and hardware configuration.
This method generates sequencer programs, waveforms and
configurations required for the instruments defined in
the hardware configuration.
Parameters
----------
schedule :
The schedule to be compiled.
hardware_cfg :
Hardware configuration, defines the compilation step from
the pulse-level to a hardware backend.
Returns
-------
:
A collection containing the compiled backend
configuration for each device.
Raises
------
NotImplementedError
Thrown when using unsupported ZI Instruments.
"""
_validate_schedule(schedule)
if "latency_corrections" in hardware_cfg.keys():
# Important: currently only used to validate the input, should also be
# used for storing the latency corrections
# (see also https://gitlab.com/groups/quantify-os/-/epics/1)
LatencyCorrections(latencies=hardware_cfg["latency_corrections"])
schedule = apply_distortion_corrections(schedule, hardware_cfg)
################################################
# Timing table manipulation
################################################
# the schedule has a Styled pandas dataframe as the return type.
# here we want to manipulate the data directly so we extract the raw dataframe.
timing_table = schedule.timing_table.data
# information is added on what output channel is used for every pulse and acq.
port_clock_channelmapping = _extract_port_clock_channelmapping(hardware_cfg)
timing_table = _add_channel_information(
timing_table=timing_table, port_clock_channelmapping=port_clock_channelmapping
)
# the timing of all pulses and acquisitions is corrected based on the latency corr.
latency_dict = determine_relative_latencies(hardware_cfg)
timing_table = _apply_latency_corrections(
timing_table=timing_table, latency_dict=latency_dict
)
# ensure that operations are still sorted by time after applying the latency corr.
timing_table.sort_values("abs_time", inplace=True)
# add the sequencer clock cycle start and sampling start for the operations.
timing_table = _add_clock_sample_starts(timing_table=timing_table)
# After adjusting for the latencies, the fix-point correction can be applied.
# the fix-point correction has the goal to ensure that all measurement operations
# will always start at a multiple of *all* relevant clock domains.
# this is achieved by shifting all instructions between different measurements
# by the same amount of samples.
timing_table = _apply_measurement_fixpoint_correction(
timing_table=timing_table, common_frequency=600e6
)
# because of the shifting in time on a sub-clock delay, up to 8 distinct waveforms
# may be required to realize the identical pulse. Pre-modulation adds another
# variant depending on the starting phase of the operation.
timing_table = _add_waveform_ids(timing_table=timing_table)
ensure_no_operations_overlap(timing_table)
# Parse the hardware configuration file, zhinst.Device is a dataclass containing
# device descriptions (name, type, channels etc. )
devices: List[zhinst.Device] = _parse_devices(hardware_cfg["devices"])
local_oscillators: Dict[str, common.LocalOscillator] = _parse_local_oscillators(
hardware_cfg["local_oscillators"]
)
################################################
# Constructing the waveform table
################################################
device_dict = {}
for dev in devices:
device_dict[dev.name] = dev
numerical_wf_dict = construct_waveform_table(
timing_table, operations_dict=schedule.operations, device_dict=device_dict
)
################################################
# Above here is the layer that translates what should happen at the device to what
# output needs to be generated to realize that.
# COMPILATION SHOULD BE SPLIT HERE
# Below here is the layer that translates the timing table to instructions for the
# hardware.
################################################
################################################
# Assemble waveforms and timeline into seqc
################################################
# keys are instrument names, and the ZIDeviceConfig contain the settings incl seqc
# to configure.
device_configs: Dict[str, Union[ZIDeviceConfig, float]] = dict()
for device in devices:
if device.device_type == zhinst.DeviceType.HDAWG:
builder = _compile_for_hdawg(
device=device,
timing_table=timing_table,
numerical_wf_dict=numerical_wf_dict,
repetitions=schedule.repetitions,
)
acq_config: Optional[ZIAcquisitionConfig] = None
elif device.device_type == zhinst.DeviceType.UHFQA:
acq_metadata = schedule_helpers.extract_acquisition_metadata_from_schedule(
schedule
)
bin_mode = acq_metadata.bin_mode
builder, acq_config = _compile_for_uhfqa(
device=device,
timing_table=timing_table,
numerical_wf_dict=numerical_wf_dict,
repetitions=schedule.repetitions,
operations=schedule.operations,
bin_mode=bin_mode,
)
else:
raise NotImplementedError(f"{device.device_type} not supported.")
device_configs[device.name] = ZIDeviceConfig(device.name, builder, acq_config)
# add the local oscillator config by iterating over all output channels.
# note that not all output channels have an LO associated to them.
for channel in device.channels:
_add_lo_config(
channel=channel,
local_oscillators=local_oscillators,
device_configs=device_configs,
resources=schedule.resources,
)
schedule["compiled_instructions"] = device_configs
schedule._hardware_timing_table = timing_table
schedule._hardware_waveform_dict = numerical_wf_dict
compiled_schedule = CompiledSchedule(schedule)
return compiled_schedule
[docs]def _add_lo_config(
channel: zhinst.Output,
local_oscillators: Dict[str, common.LocalOscillator],
resources: Dict[str, Resource],
device_configs: Dict[str, Union[ZIDeviceConfig, float]],
) -> None:
"""
Adds configuration for a local oscillator required for a specific output channel to
the device configs.
"""
# N.B. when using baseband pulses no LO will be associated to the channel.
# this case is caught in the case where the channel.clock is not specified.
unique_name = channel.local_oscillator
if unique_name not in local_oscillators:
raise KeyError(f'Missing configuration for LocalOscillator "{unique_name}"')
local_oscillator = local_oscillators[unique_name]
# the frequencies from the config file
((lo_freq_key, lo_freq_val),) = local_oscillator.frequency.items()
# Get the power of the local oscillator
if local_oscillator.power:
((power_key, power_val),) = local_oscillator.power.items()
# Get the phase of the local oscillator
if local_oscillator.phase:
((phase_key, phase_val),) = local_oscillator.phase.items()
interm_freq = channel.modulation.interm_freq
if (lo_freq_val is not None) and (interm_freq is not None):
rf_freq = lo_freq_val + interm_freq
else:
channel_clock_resource = resources.get(channel.clock)
if channel_clock_resource is not None:
rf_freq = channel_clock_resource.get("freq")
else:
# no clock is specified for this channel.
# this can happen for e.g., baseband pulses or when the channel is not used
# in the schedule.
return
if lo_freq_val is None and interm_freq is not None:
lo_freq_val = rf_freq - interm_freq
local_oscillator.frequency[lo_freq_key] = lo_freq_val
elif interm_freq is None and lo_freq_val is not None:
interm_freq = rf_freq - lo_freq_val
channel.modulation.interm_freq = interm_freq
elif interm_freq is None and lo_freq_val is None:
raise ValueError(
"Either local oscillator frequency or channel intermediate frequency "
f'must be set for LocalOscillator "{unique_name}"'
)
if local_oscillator.unique_name in device_configs:
# the device_config currently only contains the frequency
if device_configs[local_oscillator.unique_name].get("frequency") != lo_freq_val:
raise ValueError(
f'Multiple frequencies assigned to LocalOscillator "{unique_name}"'
)
lo_config = {
f"{local_oscillator.instrument_name}.{lo_freq_key}": lo_freq_val,
}
if local_oscillator.power:
lo_config[f"{local_oscillator.instrument_name}.{power_key}"] = power_val
if local_oscillator.phase:
lo_config[f"{local_oscillator.instrument_name}.{phase_key}"] = phase_val
# This line detects if the generic_icc_name exists in the local_oscillator entry of
# the hardware_config. If it exists, then, it takes the entry value, if not, the
# generic_icc_name takes the default value from the generic icc base module.
if local_oscillator.generic_icc_name:
generic_icc_name = local_oscillator.generic_icc_name
else:
generic_icc_name = generic_icc_default_name
if generic_icc_name in device_configs:
device_configs[generic_icc_name].update(lo_config)
else:
device_configs[generic_icc_name] = lo_config
[docs]def _add_wave_nodes(
device_type: zhinst.DeviceType,
awg_index: int,
wf_id_mapping: Dict[str, int],
numerical_wf_dict: Dict[str, np.ndarray],
settings_builder: zi_settings.ZISettingsBuilder,
) -> zi_settings.ZISettingsBuilder:
for wf_id, wf_index in wf_id_mapping.items():
if wf_id not in numerical_wf_dict:
# this is to catch an edge-case where certain acquisitions do not set
# integration weights. Ideally, these should be filtered before the wf_id
# is added to the wf_id_mapping, but it is easier to catch here.
continue
numerical_waveform = numerical_wf_dict[wf_id]
waveform = Waveform(numerical_waveform.real, numerical_waveform.imag)
if device_type == zhinst.DeviceType.UHFQA:
settings_builder.with_csv_wave_vector(awg_index, wf_index, waveform.data)
else:
settings_builder.with_wave_vector(awg_index, wf_index, waveform.data)
return settings_builder
[docs]def _compile_for_hdawg(
device: zhinst.Device,
timing_table: pd.DataFrame,
numerical_wf_dict: Dict[str, np.ndarray],
repetitions: int,
) -> zi_settings.ZISettingsBuilder:
"""
Parameters
----------
device :
timing_table :
numerical_wf_dict :
repetitions :
Raises
------
ValueError
"""
# calculating duration over all operations instead of only the last ensures a
# long operation near the end does not get overlooked.
schedule_duration = (timing_table.abs_time + timing_table.duration).max()
########################################
# Add standard settings to builder
########################################
settings_builder = zi_settings.ZISettingsBuilder()
n_awgs: int = int(device.n_channels / 2)
settings_builder.with_defaults(
[
("sigouts/*/on", 0),
("awgs/*/single", 1),
]
).with_system_channelgrouping(device.channelgrouping)
# Set the clock-rate of an AWG
for awg_index in range(n_awgs):
settings_builder.with_awg_time(awg_index, device.clock_select) # type: ignore
# device.type is either HDAWG8 or HDAWG4
channelgroups_mode = HDAWG_DEVICE_TYPE_CHANNEL_GROUPS[device.type]
# Defaults to mode =0 -> value = 2 -> sequencers control pairs of channels
channelgroups_value = channelgroups_mode[device.channelgrouping]
sequencer_step = int(channelgroups_value / 2) # nr of awg pairs per sequencer
# the index of the last sequencer to configure
# N.B. 8-11-2021 the min(len(device.channels)) might make the wrong choice when
# using only awgs 2 and 3. To be tested.
sequencer_stop = min(len(device.channels), int(n_awgs / sequencer_step))
logger.debug(
f"HDAWG[{device.name}] devtype={device.device_type} "
+ f" awg_count={n_awgs} {str(device)}"
)
enabled_outputs: Dict[int, zhinst.Output] = dict()
for i, awg_index in enumerate(range(0, sequencer_stop, sequencer_step)):
# here Output corresponds to an awg unit or a channel pair
# and is a dataclass containing info on port, clock, gain etc.
output = device.channels[i]
if output is None:
raise ValueError(f"Required output at index '{i}' is undefined!")
logger.debug(f"[{device.name}-awg{awg_index}] enabling outputs...")
mixer_corrections = (
output.mixer_corrections
if not output.mixer_corrections is None
else common.MixerCorrections()
)
settings_builder.with_sigouts(awg_index, (1, 1)).with_gain(
awg_index, (output.gain1, output.gain2)
).with_sigout_offset(
int(awg_index * 2), mixer_corrections.dc_offset_I
).with_sigout_offset(
int(awg_index * 2) + 1, mixer_corrections.dc_offset_Q
)
enabled_outputs[awg_index] = output
############################################
# Add seqc instructions and waveform table
############################################
for awg_index, output in enabled_outputs.items():
# select only the instructions relevant for the output channel.
output_timing_table = timing_table[
timing_table["hardware_channel"] == f"{device.name}.awg{awg_index}"
]
instructions: List[zhinst.Instruction] = _get_instruction_list(
output_timing_table
)
# enumerate the waveform_ids used in this particular output channel
unique_wf_ids = output_timing_table.drop_duplicates(subset="waveform_id")[
"waveform_id"
]
# this table maps waveform ids to indices in the seqc command table.
wf_id_mapping = {}
for i, wf_id in enumerate(unique_wf_ids):
wf_id_mapping[wf_id] = i
# Step 1: Generate and compile sequencer program AND
# Step 2: Set CommandTable JSON vector
(seqc, commandtable_json) = _assemble_hdawg_sequence(
instructions=instructions,
wf_id_mapping=wf_id_mapping,
numerical_wf_dict=numerical_wf_dict,
repetitions=repetitions,
schedule_duration=schedule_duration,
markers=device.channels[awg_index].markers,
trigger=device.channels[awg_index].trigger,
)
logger.debug(seqc)
logger.debug(commandtable_json)
settings_builder.with_commandtable_data(awg_index, commandtable_json)
settings_builder.with_compiler_sourcestring(awg_index, seqc)
#######################################################
# Set waveforms to wave nodes in the settings builder
#######################################################
# Step 3: Upload waveforms to AWG CommandTable
_add_wave_nodes(
device_type=zhinst.DeviceType.HDAWG,
awg_index=awg_index,
wf_id_mapping=wf_id_mapping,
numerical_wf_dict=numerical_wf_dict,
settings_builder=settings_builder,
)
return settings_builder
[docs]def _assemble_hdawg_sequence(
instructions: List[zhinst.Instruction],
wf_id_mapping: Dict[str, int],
numerical_wf_dict: Dict[str, np.ndarray],
repetitions: int,
schedule_duration: float,
markers: Union[str, int, None] = None,
trigger: int = None,
) -> Tuple[str, str]:
""" """
seqc_instructions = ""
commandtable_json = str({})
seqc_gen = seqc_il_generator.SeqcILGenerator()
# Declare sequence variables
seqc_gen.declare_var("__repetitions__", repetitions)
###############################################################
# Generate the command table and waveforms
###############################################################
command_table_entries: List[zhinst.CommandTableEntry] = list()
for waveform_id, waveform_index in wf_id_mapping.items():
name: str = f"w{waveform_index}"
waveform = numerical_wf_dict[waveform_id]
# Create and add variables to the Sequence program
# as well as assign the variables with operations
seqc_gen.declare_wave(name)
seqc_gen.assign_placeholder(name, len(waveform))
seqc_gen.emit_assign_wave_index(name, name, index=waveform_index)
# Do bookkeeping for the CommandTable
command_table_entry = zhinst.CommandTableEntry(
index=len(command_table_entries),
waveform=zhinst.CommandTableWaveform(
index=waveform_index, length=len(waveform)
),
)
command_table_entries.append(command_table_entry)
command_table = zhinst.CommandTable(table=command_table_entries)
###############################################################
# Add the loop that executes the program.
###############################################################
# N.B. All HDAWG markers can be used to trigger a UHFQA or other HDAWGs.
# marker output is set to 0 before the loop is started
seqc_il_generator.add_set_trigger(
seqc_gen, value=0, device_type=zhinst.DeviceType.HDAWG
)
seqc_gen.emit_begin_repeat("__repetitions__")
current_clock: int = 0
# set markers to high at the start of the repeition if this is the primary
# channel or wait for an external trigger if this is a secondary channel
if markers is not None and len(markers) > 0:
current_clock += seqc_il_generator.add_set_trigger(
seqc_gen,
value=markers,
device_type=zhinst.DeviceType.HDAWG,
)
elif trigger is not None:
assert trigger in [1, 2]
seqc_gen.emit_wait_dig_trigger(
index=trigger,
comment=f"\t// clock={current_clock}",
device_type=zhinst.DeviceType.HDAWG,
)
else:
# If the hardware config does not provide any settings assume this is a
# primary HDAWG channel and send triggers on all channels
current_clock += seqc_il_generator.add_set_trigger(
seqc_gen,
value=["AWG_MARKER1", "AWG_MARKER2"],
device_type=zhinst.DeviceType.HDAWG,
)
# this is where a longer wait statement is added to allow for latency corrections.
for instruction in instructions:
assert isinstance(instruction, zhinst.Wave)
clock_cycles_to_wait = instruction.clock_cycle_start - current_clock
if clock_cycles_to_wait < 0:
# a common mistake if there is no overlap if the instruction needs to start
# to soon after the start of a new cycle.
raise ValueError(
"Negative wait time, please ensure operations do not overlap in time."
)
current_clock += seqc_il_generator.add_wait(
seqc_gen=seqc_gen,
delay=int(clock_cycles_to_wait),
device_type=zhinst.DeviceType.HDAWG,
comment=f"clock={current_clock}",
)
current_clock += seqc_il_generator.add_execute_table_entry(
seqc_gen=seqc_gen,
index=wf_id_mapping[instruction.waveform_id],
device_type=zhinst.DeviceType.HDAWG,
comment=f"clock={current_clock}",
)
current_clock += seqc_il_generator.add_set_trigger(
seqc_gen,
value=0,
device_type=zhinst.DeviceType.HDAWG,
comment=f"clock={current_clock}",
)
# clock rate = 2.4e9/8 for HDAWG
clock_rate = DEVICE_SAMPLING_RATES[zhinst.DeviceType.HDAWG][0] / CLOCK_SAMPLE_FACTOR
total_duration_in_clocks = int(schedule_duration * clock_rate)
clock_cycles_to_wait = total_duration_in_clocks - current_clock
if trigger is None:
current_clock += seqc_il_generator.add_wait(
seqc_gen=seqc_gen,
delay=int(clock_cycles_to_wait),
device_type=zhinst.DeviceType.HDAWG,
comment=f"clock={current_clock}, dead time to ensure total schedule duration",
)
seqc_gen.emit_end_repeat()
seqc_instructions = seqc_gen.generate()
commandtable_json = command_table.to_json()
return seqc_instructions, commandtable_json
# pylint: disable=too-many-locals
[docs]def _compile_for_uhfqa(
device: zhinst.Device,
timing_table: pd.DataFrame,
numerical_wf_dict: Dict[str, np.ndarray],
repetitions: int,
operations: Dict[str, Operation],
bin_mode: enums.BinMode,
) -> Tuple[zi_settings.ZISettingsBuilder, ZIAcquisitionConfig]:
"""
Initialize programming the UHFQA ZI Instrument.
Creates a sequence program and converts schedule
pulses to waveforms for the UHFQA.
Parameters
----------
device :
timing_table :
numerical_wf_dict :
repetitions :
operations :
Returns
-------
:
"""
########################################
# Add standard settings to builder
########################################
settings_builder = zi_settings.ZISettingsBuilder()
instrument_info = zhinst.InstrumentInfo(
sample_rate=device.sample_rate,
num_samples_per_clock=CLOCK_SAMPLE_FACTOR,
granularity=WAVEFORM_GRANULARITY[device.device_type],
)
channels = device.channels
channels = list(filter(lambda c: c.mode == enums.SignalModeType.REAL, channels))
awg_index = 0
channel = channels[awg_index]
logger.debug(f"[{device.name}-awg{awg_index}] {str(device)}")
mixer_corrections = (
channel.mixer_corrections
if not channel.mixer_corrections is None
else common.MixerCorrections()
)
# Set all integration weigths to default
settings_builder.with_defaults(
[
("awgs/0/single", 1),
("qas/0/rotations/*", (1 + 1j)),
("qas/0/integration/sources/*", 0),
]
).with_sigouts(0, (1, 1)).with_awg_time(
0, device.clock_select
).with_qas_integration_weights_real(
channels=list(range(NUM_UHFQA_READOUT_CHANNELS)),
real=np.zeros(MAX_QAS_INTEGRATION_LENGTH),
).with_qas_integration_weights_imag(
channels=list(range(NUM_UHFQA_READOUT_CHANNELS)),
imag=np.zeros(MAX_QAS_INTEGRATION_LENGTH),
).with_sigout_offset(
0, mixer_corrections.dc_offset_I
).with_sigout_offset(
1, mixer_corrections.dc_offset_Q
)
logger.debug(f"[{device.name}-awg{awg_index}] channel={str(channel)}")
############################################
# Add seqc instructions and waveform table
############################################
# select only the instructions relevant for the output channel.
output_timing_table = timing_table[
timing_table["hardware_channel"] == f"{device.name}.awg{awg_index}"
]
instructions: List[zhinst.Instruction] = _get_instruction_list(output_timing_table)
# FIXME ensure unique_wf_ids is only for pulses and not integration weights
# enumerate the waveform_ids used in this particular output channel
unique_wf_ids = output_timing_table.drop_duplicates(subset="waveform_id")[
"waveform_id"
]
# this table maps waveform ids to indices in the seqc command table.
wf_id_mapping = {}
for i, wf_id in enumerate(unique_wf_ids):
wf_id_mapping[wf_id] = i
# # Generate and apply sequencer program
seqc = _assemble_uhfqa_sequence(
instructions=instructions,
wf_id_mapping=wf_id_mapping,
repetitions=repetitions,
device_name=device.name,
trigger=device.channels[awg_index].trigger,
)
settings_builder.with_compiler_sourcestring(awg_index, seqc)
logger.debug(seqc)
#######################################################
# Set waveforms to wave nodes in the settings builder
#######################################################
# Apply waveforms to AWG
settings_builder = _add_wave_nodes(
device_type=zhinst.DeviceType.UHFQA,
awg_index=0,
wf_id_mapping=wf_id_mapping,
numerical_wf_dict=numerical_wf_dict,
settings_builder=settings_builder,
)
#######################################################
# Set integration weights and configure acquisitions
#######################################################
# Get a list of all acquisition protocol channels
acq_channel_resolvers_map: Dict[int, Callable[..., Any]] = dict()
# select only the acquisition operations relevant for the output channel.
timing_table_acquisitions = output_timing_table[output_timing_table.is_acquisition]
timing_table_unique_acquisitions = timing_table_acquisitions.drop_duplicates(
subset="waveform_id"
)
n_unique_acquisitions = len(timing_table_acquisitions)
if bin_mode == enums.BinMode.AVERAGE:
n_acquisitions = n_unique_acquisitions
elif bin_mode == enums.BinMode.APPEND:
n_acquisitions = n_unique_acquisitions * repetitions
repetitions = 1
else:
raise NotImplementedError(f"BinMode {bin_mode} is not supported.")
# These variables have to be identical for all acquisitions.
# initialized to None here and overwritten while iterating over the acquisitions.
acq_duration: float = float("nan")
# a list of used acquisition channels, this is used to raise an exception
# when multiple acquisitions assign to the same channel.
acq_channels_used: List[int] = []
for _, acq_row in timing_table_unique_acquisitions.iterrows():
acquisition = operations[acq_row.operation]
wf_id = acq_row.wf_idx
acq_info = acquisition.data["acquisition_info"][acq_row.wf_idx]
# update acq_duration only if it was not set before
acq_duration = acq_info["duration"] if np.isnan(acq_duration) else acq_duration
# verify that the both durations are identical, if not raise an exception
# this exception relates to a limitation of the hardware.
if acq_duration != acq_info["duration"]:
raise ValueError(
f"Different acquisitions have a different duration "
f"{acq_duration*1e9:.1f}ns and {acq_info['duration']*1e9:.1f}ns. "
"The integration lenght needs to be identical for all acquisitions."
)
acq_protocol: str = acq_info["protocol"]
acq_channel: int = acq_info["acq_channel"]
if acq_channel not in acq_channels_used:
acq_channels_used.append(acq_channel)
else:
raise ValueError(
f"Acquisition channel {acq_channel} is already used by another "
"acquisition. Different acquisitions should use a unique "
"acquisition channel."
f"Offending acquisition ({acq_row.waveform_id})"
)
integration_length = round(acq_duration * instrument_info.sample_rate)
logger.debug(
f"[{device.name}] acq_info={acq_info} "
+ f" acq_duration={acq_duration} integration_length={integration_length}"
)
if acq_protocol == "Trace":
# Disable Weighted integration because we'd like to see
# the raw signal.
settings_builder.with_qas_monitor_enable(True).with_qas_monitor_averages(
repetitions
).with_qas_monitor_length(
integration_length
).with_qas_integration_weights_real(
list(range(NUM_UHFQA_READOUT_CHANNELS)),
np.ones(MAX_QAS_INTEGRATION_LENGTH),
).with_qas_integration_weights_imag(
list(range(NUM_UHFQA_READOUT_CHANNELS)),
np.ones(MAX_QAS_INTEGRATION_LENGTH),
)
monitor_nodes = (
"qas/0/monitor/inputs/0/wave",
"qas/0/monitor/inputs/1/wave",
)
acq_channel_resolvers_map[acq_channel] = partial(
resolvers.monitor_acquisition_resolver, monitor_nodes=monitor_nodes
)
else:
# The waveform is slightly larger then the integration_length
# because of the waveform granularity. This is irrelevant
# due to the waveform being appended with zeros. Therefore
# avoiding an extra slice of waveform[0:integration_length]
acquisition_waveform = numerical_wf_dict[acq_row.waveform_id]
weights_i = np.zeros(MAX_QAS_INTEGRATION_LENGTH)
weights_q = np.zeros(MAX_QAS_INTEGRATION_LENGTH)
weights_i[0 : len(acquisition_waveform)] = acquisition_waveform.real
weights_q[0 : len(acquisition_waveform)] = acquisition_waveform.imag
# set the integration weights, note that we need to set 4 weights in order
# to use a complex valued weight function in the right way.
# Z = (w0*sI + w1*sQ) + 1j ( w1*sI - w0 * sQ)
settings_builder.with_qas_integration_weights_real(
2 * acq_channel, list(weights_i)
).with_qas_integration_weights_imag(
2 * acq_channel, list(weights_q)
).with_qas_integration_weights_real(
2 * acq_channel + 1, list(weights_q)
).with_qas_integration_weights_imag(
2 * acq_channel + 1, list(-1 * weights_i)
)
# Create partial function for delayed execution
acq_channel_resolvers_map[acq_channel] = partial(
resolvers.result_acquisition_resolver,
result_nodes=[
f"qas/0/result/data/{2*acq_channel}/wave",
f"qas/0/result/data/{2*acq_channel+1}/wave",
],
)
# only configure these variables if there are actually acquisitions present in
# the schedule.
if len(timing_table_unique_acquisitions) > 0:
integration_length = round(acq_duration * instrument_info.sample_rate)
settings_builder.with_qas_integration_mode(
zhinst.QasIntegrationMode.NORMAL
).with_qas_integration_length(integration_length).with_qas_result_enable(
False
).with_qas_monitor_enable(
False
).with_qas_delay(
0
)
settings_builder.with_qas_result_mode(
zhinst.QasResultMode.CYCLIC
).with_qas_result_source(
zhinst.QasResultSource.INTEGRATION
).with_qas_result_length(
n_acquisitions
).with_qas_result_enable(
True
).with_qas_result_averages(
repetitions
)
settings_builder.with_qas_result_reset(0).with_qas_result_reset(1)
settings_builder.with_qas_monitor_reset(0).with_qas_monitor_reset(1)
return (
settings_builder,
ZIAcquisitionConfig(
n_unique_acquisitions,
resolvers=acq_channel_resolvers_map,
bin_mode=bin_mode,
),
)
[docs]def _assemble_uhfqa_sequence(
instructions: List[zhinst.Instruction],
wf_id_mapping: Dict[str, int],
repetitions: int,
device_name: str,
trigger: int = 2,
) -> str:
""" """
seqc_instructions = ""
seqc_gen = seqc_il_generator.SeqcILGenerator()
# Declare sequence variables
seqc_gen.declare_var("__repetitions__", repetitions)
current_clock: int = 0
###############################################################
# Generate the .csv based waveform table
###############################################################
seqc_il_generator.declare_csv_waveform_variables(
seqc_gen=seqc_gen,
device_name=device_name,
waveform_indices=list(wf_id_mapping.values()),
awg_index=0,
)
###############################################################
# Add the loop that executes the program.
###############################################################
seqc_gen.emit_begin_repeat("__repetitions__")
# N.B.! The UHFQA will always need to be triggered by an external device such as
# an HDAWG or a trigger box. It will wait for a trigger.
# Triggers must be a list but we may only wait for one so lets choose the
# first one in the list, I guess.
# This does not account for dio ports. Which are not implemented in the current
# version.
assert trigger < 5 and trigger > 0
assert trigger is not None
seqc_gen.emit_wait_dig_trigger(
index=trigger,
comment=f"\t// clock={current_clock}",
device_type=zhinst.DeviceType.UHFQA,
)
# this is where a longer wait statement is added to allow for latency corrections.
for instruction in instructions:
clock_cycles_to_wait = instruction.clock_cycle_start - current_clock
if clock_cycles_to_wait < 0:
# a common mistake if there is no overlap if the instruction needs to start
# to soon after the start of a new cycle.
raise ValueError(
"Negative wait time, please ensure operations do not overlap in time."
)
current_clock += seqc_il_generator.add_wait(
seqc_gen=seqc_gen,
delay=int(clock_cycles_to_wait),
device_type=zhinst.DeviceType.UHFQA,
comment=f"clock={current_clock}",
)
# Acquisition
if isinstance(instruction, zhinst.Acquisition):
current_clock += seqc_il_generator.add_start_qa(
seqc_gen=seqc_gen,
device_type=zhinst.DeviceType.UHFQA,
comment=f"clock={current_clock}",
)
# Waveform
elif isinstance(instruction, zhinst.Wave):
current_clock += seqc_il_generator.add_play_wave(
seqc_gen,
f"w{wf_id_mapping[instruction.waveform_id]}",
device_type=zhinst.DeviceType.UHFQA,
comment=f"clock={current_clock}",
)
seqc_gen.emit_end_repeat()
seqc_instructions = seqc_gen.generate()
return seqc_instructions