Source code for quantify_scheduler.helpers.waveforms

# Repository: https://gitlab.com/quantify-os/quantify-scheduler
# Licensed according to the LICENCE file on the main branch
"""Module containing helper functions related to waveforms."""
from __future__ import annotations

import inspect
from functools import partial
from typing import Any, Dict, List, Protocol, Tuple

import numpy as np
import math

from quantify_scheduler import math as math_helpers, waveforms
from quantify_scheduler.schedules.schedule import Schedule
from quantify_scheduler.helpers import schedule as schedule_helpers
from quantify_scheduler.helpers.importers import import_python_object_from_string


# pylint: disable=too-few-public-methods
[docs] class GetWaveformPartial(Protocol): # typing.Protocol """Protocol type definition class for the get_waveform partial function.""" def __call__(self, sampling_rate: int) -> np.ndarray: """ Execute partial get_waveform function. Parameters ---------- sampling_rate The waveform sampling rate. Returns ------- : The waveform array. """
[docs] def get_waveform_size(waveform: np.ndarray, granularity: int) -> int: """ Return the number of samples required to respect the granularity. Parameters ---------- waveform Numerical waveform. granularity The granularity. """ size: int = len(waveform) if size % granularity != 0: size = math_helpers.closest_number_ceil(size, granularity) return max(size, granularity)
[docs] def resize_waveforms(waveforms_dict: Dict[int, np.ndarray], granularity: int) -> None: """ Resizes the waveforms to a multiple of the given granularity. Parameters ---------- waveforms_dict The waveforms dictionary. granularity The granularity. """ # Modify the list while iterating to avoid copies for pulse_id in waveforms_dict: waveforms_dict[pulse_id] = resize_waveform( waveforms_dict[pulse_id], granularity )
[docs] def resize_waveform(waveform: np.ndarray, granularity: int) -> np.ndarray: """ Return the waveform in a size that is a modulo of the given granularity. Parameters ---------- waveform The waveform array. granularity The waveform granularity. Returns ------- : The resized waveform with a length equal to `mod(len(waveform), granularity) == 0`. """ size: int = len(waveform) if size == 0: return np.zeros(granularity) if size % granularity == 0: return waveform remainder = math_helpers.closest_number_ceil(size, granularity) - size # Append the waveform with the remainder zeros return np.concatenate([waveform, np.zeros(remainder)])
[docs] def shift_waveform( waveform: np.ndarray, start_in_seconds: float, sampling_rate: int, resolution: int ) -> Tuple[int, np.ndarray]: """ Return the waveform shifted with a number of samples. This compensates for rounding errors that cause misalignment of the waveform in the clock time domain. .. Note:: when using this method be sure that the pulse starts at a `round(start_in_sequencer_count)`. .. code-block:: waveform = np.ones(32) sampling_rate = int(2.4e9) resolution: int = 8 t0: float = 16e-9 # 4.8 = 16e-9 / (8 / 2.4e9) start_in_sequencer_count = (t0 // (resolution / sampling_rate)) start_waveform_at_sequencer_count(start_in_sequencer_count, waveform) Parameters ---------- waveform The waveform. start_in_seconds The start time (in seconds). sampling_rate The sampling rate resolution The sequencer resolution. """ start_in_samples_count = round(start_in_seconds * sampling_rate) samples_shift = start_in_samples_count % resolution start_in_sequencer_count = start_in_samples_count // resolution if samples_shift == 0: return start_in_sequencer_count, waveform return start_in_sequencer_count, np.concatenate([np.zeros(samples_shift), waveform])
[docs] def get_waveform( pulse_info: Dict[str, Any], sampling_rate: float, ) -> np.ndarray: """ Return the waveform of a pulse_info dictionary. Parameters ---------- pulse_info The pulse_info dictionary. sampling_rate The sample rate of the waveform. Returns ------- : The waveform. """ t: np.ndarray = np.arange(0, 0 + pulse_info["duration"], 1 / sampling_rate) wf_func: str = pulse_info["wf_func"] waveform: np.ndarray = exec_waveform_function(wf_func, t, pulse_info) return waveform
[docs] def get_waveform_by_pulseid( schedule: Schedule, ) -> Dict[int, GetWaveformPartial]: """ Return a lookup dictionary of pulse_id and its partial waveform function. The keys are pulse info ids while the values are partial functions. Executing the waveform will return a :class:`numpy.ndarray`. Parameters ---------- schedule The schedule. """ pulseid_waveformfn_dict: Dict[int, GetWaveformPartial] = {} for schedulable in schedule.schedulables.values(): operation = schedule.operations[schedulable["operation_id"]] for pulse_info in operation["pulse_info"]: pulse_id = schedule_helpers.get_pulse_uuid(pulse_info) if pulse_id in pulseid_waveformfn_dict: # Unique waveform already populated in the dictionary. continue pulseid_waveformfn_dict[pulse_id] = partial( get_waveform, pulse_info=pulse_info ) for acq_info in operation["acquisition_info"]: for pulse_info in acq_info["waveforms"]: pulse_id = schedule_helpers.get_pulse_uuid(pulse_info) pulseid_waveformfn_dict[pulse_id] = partial( get_waveform, pulse_info=pulse_info ) return pulseid_waveformfn_dict
[docs] def exec_waveform_partial( pulse_id: int, pulseid_waveformfn_dict: Dict[int, GetWaveformPartial], sampling_rate: int, ) -> np.ndarray: """ Return the result of the partial waveform function. Parameters ---------- pulse_id The pulse uuid. pulseid_waveformfn_dict The partial waveform lookup dictionary. sampling_rate The sampling rate. Returns ------- : The waveform array. """ # Execute partial function get_waveform that already has # 'pulse_info' assigned. The following method execution # adds the missing required parameters. waveform_fn: GetWaveformPartial = pulseid_waveformfn_dict[pulse_id] waveform: np.ndarray = waveform_fn( sampling_rate=sampling_rate, ) return waveform
[docs] def exec_waveform_function(wf_func: str, t: np.ndarray, pulse_info: dict) -> np.ndarray: """ Return the result of the pulse's waveform function. If the wf_func is defined outside quantify-scheduler then the wf_func is dynamically loaded and executed using :func:`~quantify_scheduler.helpers.waveforms.exec_custom_waveform_function`. Parameters ---------- wf_func The custom waveform function path. t The linear timespace. pulse_info The dictionary containing pulse information. Returns ------- : Returns the computed waveform. """ whitelist: List[str] = ["square", "ramp", "soft_square", "drag"] fn_name: str = wf_func.split(".")[-1] waveform: np.ndarray = [] if wf_func.startswith("quantify_scheduler.waveforms") and fn_name in whitelist: if fn_name == "square": waveform = waveforms.square(t=t, amp=pulse_info["amp"]) elif fn_name == "ramp": if "offset" in pulse_info.keys(): waveform = waveforms.ramp( t=t, amp=pulse_info["amp"], offset=pulse_info["offset"] ) else: waveform = waveforms.ramp(t=t, amp=pulse_info["amp"]) elif fn_name == "soft_square": waveform = waveforms.soft_square(t=t, amp=pulse_info["amp"]) elif fn_name == "drag": waveform = waveforms.drag( t=t, G_amp=pulse_info["G_amp"], D_amp=pulse_info["D_amp"], duration=pulse_info["duration"], nr_sigma=pulse_info["nr_sigma"], phase=pulse_info["phase"], ) else: waveform = exec_custom_waveform_function(wf_func, t, pulse_info) return waveform
[docs] def exec_custom_waveform_function( wf_func: str, t: np.ndarray, pulse_info: dict ) -> np.ndarray: """ Load and import an ambiguous waveform function from a module by string. The parameters of the dynamically loaded wf_func are extracted using :func:`inspect.signature` while the values are extracted from the pulse_info dictionary. Parameters ---------- wf_func The custom waveform function path. t The linear timespace. pulse_info The dictionary containing pulse information. Returns ------- : Returns the computed waveform. """ # Load the waveform function from string function = import_python_object_from_string(wf_func) # select the arguments for the waveform function that are present # in pulse info par_map = inspect.signature(function).parameters wf_kwargs = {} for key in par_map.keys(): if key in pulse_info: wf_kwargs[key] = pulse_info[key] # Calculate the numerical waveform using the wf_func return function(t=t, **wf_kwargs)
[docs] def apply_mixer_skewness_corrections( waveform: np.ndarray, amplitude_ratio: float, phase_shift: float ) -> np.ndarray: r""" Apply a correction for amplitude imbalances and phase errors. Using an IQ mixer from previously calibrated values. Phase correction is done using: .. math:: Re(z_{corrected}) (t) = Re(z (t)) + Im(z (t)) \tan(\phi) Im(z_{corrected}) (t) = Im(z (t)) / \cos(\phi) The amplitude correction is achieved by rescaling the waveforms back to their original amplitudes and multiplying or dividing the I and Q signals respectively by the square root of the amplitude ratio. Parameters ---------- waveform: The complex valued waveform on which the correction will be applied. amplitude_ratio: The ratio between the amplitudes of I and Q that is used to correct for amplitude imbalances between the different paths in the IQ mixer. phase_shift: The phase error (in deg) used to correct the phase between I and Q. Returns ------- : The complex valued waveform with the applied phase and amplitude corrections. """ def skew_real(_waveform: np.ndarray, alpha: float, phi: float): original_amp = np.max(np.abs(_waveform.real)) intermediate_wf = _waveform.real + _waveform.imag * np.tan(phi) new_amp = np.max(np.abs(intermediate_wf)) intermediate_wf = ( intermediate_wf / new_amp if new_amp != 0 else np.zeros(intermediate_wf.shape) ) return intermediate_wf * original_amp * np.sqrt(alpha) def skew_imag(_waveform: np.ndarray, alpha: float, phi: float): original_amp = np.max(np.abs(_waveform.imag)) intermediate_wf = _waveform.imag / np.cos(phi) new_amp = np.max(np.abs(intermediate_wf)) intermediate_wf = ( intermediate_wf / new_amp if new_amp != 0 else np.zeros(intermediate_wf.shape) ) return intermediate_wf * original_amp / np.sqrt(alpha) corrected_re = skew_real(waveform, amplitude_ratio, np.deg2rad(phase_shift)) corrected_im = skew_imag(waveform, amplitude_ratio, np.deg2rad(phase_shift)) return corrected_re + 1.0j * corrected_im
[docs] def modulate_waveform( t: np.ndarray, envelope: np.ndarray, freq: float, t0: float = 0 ) -> np.ndarray: r""" Generate a (single sideband) modulated waveform from a given envelope. This is done by multiplying it with a complex exponential. .. math:: z_{mod} (t) = z (t) \cdot e^{2\pi i f (t+t_0)} The signs are chosen such that the frequencies follow the relation RF = LO + IF for LO, IF > 0. Parameters ---------- t A numpy array with time values envelope The complex-valued envelope of the modulated waveform freq The frequency of the modulation t0 Time offset for the modulation Returns ------- : The modulated waveform """ modulation = np.exp(1.0j * 2 * np.pi * freq * (t + t0)) return envelope * modulation
[docs] def normalize_waveform_data(data: np.ndarray) -> Tuple[np.ndarray, float, float]: """ Normalize waveform data, such that the value is +1.0 where the absolute value is maximal. This means that two waveforms where waveform_1 = c * waveform_2 (c can be any real number) will be normalized to the same normalized waveform data; this holds separately for the real and imaginary parts. Parameters ---------- data The waveform data to rescale. Returns ------- rescaled_data The rescaled data. amp_real The original amplitude of the real part. amp_imag The original amplitude of the imaginary part. """ amp_real_index = np.argmax(np.abs(data.real)) amp_imag_index = np.argmax(np.abs(data.imag)) amp_real = data.real[amp_real_index] amp_imag = data.imag[amp_imag_index] norm_data_re = ( data.real / amp_real if not math.isclose(amp_real, 0.0) else np.zeros(data.real.shape) ) if math.isclose(amp_imag, 0.0): rescaled_data = norm_data_re else: norm_data_im = data.imag / amp_imag rescaled_data = norm_data_re + 1.0j * norm_data_im return rescaled_data, amp_real, amp_imag
[docs] def area_pulses(pulses: List[Dict[str, Any]], sampling_rate: float) -> float: """ Calculate the area of a set of pulses. For details of the calculation see `area_pulse`. Parameters ---------- pulses List of dictionary with information of the pulses sampling_rate Sampling rate for the pulse Returns ------- : The area formed by all the pulses """ area: float = 0.0 for pulse in pulses: area += area_pulse(pulse, sampling_rate) return area
[docs] def area_pulse(pulse: Dict[str, Any], sampling_rate: float) -> float: """ Calculate the area of a single pulse. The sampled area is calculated, which means that the area calculated is based on the sampled waveform. This can differ slightly from the ideal area of the parameterized pulse. The duration used for calculation is the duration of the pulse. This duration is equal to the duration of the sampled waveform for pulse durations that are integer multiples of the 1/`sampling_rate`. Parameters ---------- pulse The dictionary with information of the pulse sampling_rate Sampling rate for the pulse Returns ------- : The area defined by the pulse """ if not sampling_rate > 0: raise ValueError( f"Attempting to calculate the area of a single pulse with " f"{sampling_rate=}. Sampling rate should be a positive number." ) # Nice to have: Give the user the option to choose integration algorithm if pulse["wf_func"] == "quantify_scheduler.waveforms.square": return pulse["amp"] * pulse["duration"] waveform: np.ndarray = get_waveform(pulse, sampling_rate) return waveform.mean() * pulse["duration"]