# Repository: https://gitlab.com/quantify-os/quantify-scheduler
# Licensed according to the LICENCE file on the main branch
"""Helper functions to generate acq_indices."""
from __future__ import annotations
import warnings
from functools import reduce
from operator import mul
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple, Union
from quantify_scheduler.enums import BinMode
from quantify_scheduler.helpers.schedule import (
_is_binned_type_protocol,
)
from quantify_scheduler.operations.control_flow_library import (
ConditionalOperation,
LoopOperation,
)
from quantify_scheduler.schedules.schedule import (
AcquisitionChannelData,
AcquisitionChannelsData,
ScheduleBase,
)
if TYPE_CHECKING:
from quantify_scheduler.operations.operation import Operation
[docs]
# Label of a single schedulable; ``None`` denotes the ``body`` of a control flow operation.
SchedulableLabel = Union[str, None]
[docs]
# Labels of every nesting level leading to a schedulable, outermost schedule first.
FullSchedulableLabel = Tuple[SchedulableLabel, ...]
[docs]
# The value is a single acquisition index for average bin mode,
# and a list of acquisition indices for append bin mode.
SchedulableLabelToAcquisitionIndex = Dict[Tuple[FullSchedulableLabel, int], Union[int, List[int]]]
"""
A mapping from schedulables to an acquisition index.
This mapping helps the backend to figure out which
binned acquisition corresponds to which acquisition index.
Note, it maps the (full schedulable label, acq_num_in_operation) to the bin,
where acq_num_in_operation is the i-th acquisition in the operation;
in other words: the i-th element in the acquisition_info.
Only defined for binned acquisitions, and backend independent.
For control flows, the `None` in the schedulable label refers to the `body`
of the control flow. This is for future-proofing, in case control flows are ever
extended to contain multiple sub-operations.
"""
[docs]
def _prod(iterable: Iterable[int]) -> int:
    """
    Return the product of all integers in ``iterable``.

    An empty iterable yields ``1``, the multiplicative identity.
    """
    product = 1
    for factor in iterable:
        product = product * factor
    return product
[docs]
def _warn_unsupported_bin_mode(protocol: str, bin_mode: BinMode, acq_channel: Hashable) -> None:
    """
    Emit a ``RuntimeWarning`` about an unsupported protocol / bin mode combination.

    Parameters
    ----------
    protocol
        Name of the acquisition protocol.
    bin_mode
        The bin mode that is not supported for ``protocol``.
    acq_channel
        The acquisition channel on which the combination was found.
    """
    message = (
        f"Unsupported bin mode '{bin_mode}' for "
        f"acquisition protocol '{protocol}' "
        f"on acq_channel '{acq_channel}'."
    )
    warnings.warn(message, RuntimeWarning)
[docs]
def _generate_acq_channels_data_binned(
    acq_channel_data: AcquisitionChannelData,
    schedulable_label_to_acq_index: SchedulableLabelToAcquisitionIndex,
    full_schedulable_label: FullSchedulableLabel,
    nested_loop_repetitions: list[int],
    coords: dict,
    acq_num_in_operation: int,
    acq_channel: Hashable,
    acq_index: int | None,
) -> None:
    """
    Register one binned acquisition on its acquisition channel.

    New acquisition indices are always taken from the current length of
    ``acq_channel_data.coords``; both ``acq_channel_data.coords`` and
    ``schedulable_label_to_acq_index`` are updated in place.

    * Average bin mode: one new index is created. A ``RuntimeWarning`` is
      issued if an explicit ``acq_index`` differs from that index; the
      generated index is stored regardless.
    * Append bin mode outside of loops: one new index is created and stored
      as a one-element list.
    * Append bin mode inside loops: one index per overall loop repetition
      (the product of ``nested_loop_repetitions``) is created, and each stored
      coords dict additionally gets a ``"loop_repetition"`` entry.
      An explicit ``acq_index`` is ignored with a ``FutureWarning``.
    * Any other bin mode: only a warning is issued, nothing is updated.

    Parameters
    ----------
    acq_channel_data
        Data of the acquisition channel; its ``coords`` must be a list.
    schedulable_label_to_acq_index
        Mapping that receives the new index (or list of indices) under the key
        ``(full_schedulable_label, acq_num_in_operation)``.
    full_schedulable_label
        Full label of the schedulable the acquisition belongs to.
    nested_loop_repetitions
        Repetition counts of the loops enclosing the acquisition;
        empty if the acquisition is not inside any loop.
    coords
        Coordinates stored for the acquisition.
    acq_num_in_operation
        Position of the acquisition within the operation's acquisition info.
    acq_channel
        The acquisition channel; only used in warning messages.
    acq_index
        Explicitly defined acquisition index, or ``None`` if not defined.
    """
    channel_coords = acq_channel_data.coords
    assert isinstance(channel_coords, list)

    mapping_key = (full_schedulable_label, acq_num_in_operation)
    # The next free acquisition index is implied by the number of stored coords.
    next_free_index = len(channel_coords)
    channel_bin_mode = acq_channel_data.bin_mode

    if channel_bin_mode == BinMode.AVERAGE:
        if acq_index is not None and acq_index != next_free_index:
            warnings.warn(
                f"Found invalid {acq_index=} for {acq_channel=}. "
                f"Make sure that each explicitly defined acq_index "
                f"starts at 0, and increments by 1 for each new acquisition "
                f"within the same acquisition channel, ordered by time.",
                RuntimeWarning,
            )
        schedulable_label_to_acq_index[mapping_key] = next_free_index
        channel_coords.append(coords)
        return

    if channel_bin_mode == BinMode.APPEND:
        if nested_loop_repetitions:
            if acq_index is not None:
                warnings.warn(
                    (
                        f"Explicitly defined acquisition index for an append mode acquisition "
                        f"within a loop will not be supported in the future. "
                        f"Ignoring {acq_index=} for {acq_channel=}."
                    ),
                    FutureWarning,
                )
            # Every repetition of the (possibly nested) loops gets its own index.
            total_repetitions = _prod(nested_loop_repetitions)
            coords_per_index = [
                {"loop_repetition": lr, **coords} for lr in range(total_repetitions)
            ]
        else:
            coords_per_index = [coords]

        index_after_last = next_free_index + len(coords_per_index)
        schedulable_label_to_acq_index[mapping_key] = list(
            range(next_free_index, index_after_last)
        )
        channel_coords.extend(coords_per_index)
        return

    _warn_unsupported_bin_mode(
        protocol=acq_channel_data.protocol,
        bin_mode=acq_channel_data.bin_mode,
        acq_channel=acq_channel,
    )
[docs]
def _validate_trace_protocol(
    acq_channel: Hashable,
    acq_channels_data: AcquisitionChannelsData,
    bin_mode: BinMode,
    protocol: str,
    nested_loop_repetitions: list[int],  # noqa: ARG001
) -> None:
    """
    Warn about invalid usage of a trace acquisition.

    A ``RuntimeWarning`` is issued if ``acq_channel`` is already present in
    ``acq_channels_data``, and another one if ``bin_mode`` is not one of the
    average, first or append bin modes. Nothing is modified.

    Parameters
    ----------
    acq_channel
        The acquisition channel of the trace acquisition.
    acq_channels_data
        The acquisition channels registered so far.
    bin_mode
        The bin mode of the trace acquisition.
    protocol
        The protocol name used in the bin mode warning message.
    nested_loop_repetitions
        Unused.
    """
    channel_already_registered = acq_channel in acq_channels_data
    if channel_already_registered:
        warnings.warn(
            f"Multiple acquisitions found for acq_channel '{acq_channel}' "
            f"which has a trace acquisition. "
            f"Only one trace acquisition is allowed for each acq_channel.",
            RuntimeWarning,
        )

    supported_bin_modes = (BinMode.AVERAGE, BinMode.FIRST, BinMode.APPEND)
    if bin_mode in supported_bin_modes:
        return
    _warn_unsupported_bin_mode(
        protocol=protocol,
        bin_mode=bin_mode,
        acq_channel=acq_channel,
    )
[docs]
def _generate_acq_channels_data_for_protocol(
    acquisitions_info: list[dict],
    acq_channels_data: AcquisitionChannelsData,
    schedulable_label_to_acq_index: SchedulableLabelToAcquisitionIndex,
    full_schedulable_label: FullSchedulableLabel,
    nested_loop_repetitions: list[int],
    is_explicit_acq_index: bool,
) -> None:
    """
    Process the acquisitions of one operation, dispatching on their protocol.

    Updates ``acq_channels_data`` and ``schedulable_label_to_acq_index`` in place.
    Invalid combinations are reported as warnings, never as exceptions.

    Parameters
    ----------
    acquisitions_info
        The acquisition info dicts of the operation; each must provide the keys
        ``"acq_channel"``, ``"protocol"``, ``"bin_mode"`` and ``"acq_index"``.
    acq_channels_data
        Mapping from acquisition channel to its data; extended or overwritten here.
    schedulable_label_to_acq_index
        Mapping that receives the acquisition indices of binned acquisitions.
    full_schedulable_label
        Full label of the schedulable the operation belongs to.
    nested_loop_repetitions
        Repetition counts of the loops enclosing the operation.
    is_explicit_acq_index
        If ``True``, only acquisitions with an explicitly defined ``acq_index``
        are processed; if ``False``, only those where it is ``None``.
    """
    # Bin modes accepted by each of the non-binned, non-trace protocols.
    bin_modes_per_protocol = (
        ("TriggerCount", (BinMode.DISTRIBUTION, BinMode.APPEND, BinMode.SUM)),
        ("Timetag", (BinMode.AVERAGE, BinMode.APPEND)),
        ("TimetagTrace", (BinMode.APPEND,)),
        ("ThresholdedTriggerCount", (BinMode.APPEND,)),
    )

    for acq_num_in_operation, acq_info in enumerate(acquisitions_info):
        acq_channel: Hashable = acq_info["acq_channel"]
        protocol: str = acq_info["protocol"]
        bin_mode: BinMode = acq_info["bin_mode"]
        # Operations do not carry coords yet (intended to be introduced later),
        # so every acquisition starts out with empty coords.
        coords: dict = {}
        acq_index: int | None = acq_info["acq_index"]

        # Each pass handles either the explicitly indexed acquisitions
        # or the implicitly indexed ones, never both.
        index_is_implicit = acq_index is None
        if is_explicit_acq_index is index_is_implicit:
            continue

        # An already registered channel must agree on protocol and bin mode.
        acq_channel_data = acq_channels_data.get(acq_channel)
        if acq_channel_data is not None:
            if acq_channel_data.protocol != protocol:
                warnings.warn(
                    f"Found different acquisition protocols "
                    f"('{acq_channel_data.protocol}' and '{protocol}') "
                    f"for acq_channel '{acq_channel}'. "
                    f"Make sure there is only one protocol for each acq_channel.",
                    RuntimeWarning,
                )
            if acq_channel_data.bin_mode != bin_mode:
                warnings.warn(
                    f"Found different bin modes "
                    f"('{acq_channel_data.bin_mode}' and '{bin_mode}') "
                    f"for acq_channel '{acq_channel}'. "
                    f"Make sure there is only one bin mode for each acq_channel.",
                    RuntimeWarning,
                )

        if _is_binned_type_protocol(protocol):
            # NOTE(review): an existing entry is reused as-is; if the channel was
            # registered earlier by a non-binned protocol, its coords is a dict and
            # the binned helper's list assertion fails.
            if acq_channel not in acq_channels_data:
                acq_channels_data[acq_channel] = AcquisitionChannelData(
                    acq_index_dim_name=("acq_index_" + str(acq_channel)),
                    protocol=protocol,
                    bin_mode=bin_mode,
                    coords=[],
                )
            _generate_acq_channels_data_binned(
                acq_channel_data=acq_channels_data[acq_channel],
                schedulable_label_to_acq_index=schedulable_label_to_acq_index,
                full_schedulable_label=full_schedulable_label,
                nested_loop_repetitions=nested_loop_repetitions,
                coords=coords,
                acq_num_in_operation=acq_num_in_operation,
                acq_channel=acq_channel,
                acq_index=acq_index,
            )
            continue

        if protocol == "Trace":
            _validate_trace_protocol(
                acq_channel=acq_channel,
                acq_channels_data=acq_channels_data,
                bin_mode=bin_mode,
                protocol=protocol,
                nested_loop_repetitions=nested_loop_repetitions,
            )
            acq_channels_data[acq_channel] = AcquisitionChannelData(
                acq_index_dim_name=("acq_index_" + str(acq_channel)),
                protocol=protocol,
                bin_mode=bin_mode,
                coords=coords,
            )
            continue

        allowed_bin_modes = next(
            (modes for name, modes in bin_modes_per_protocol if protocol == name),
            None,
        )
        if allowed_bin_modes is None:
            warnings.warn(
                f"Unsupported acquisition protocol '{protocol}' "
                f"on acq_channel '{acq_channel}'.",
                RuntimeWarning,
            )
            continue

        if bin_mode not in allowed_bin_modes:
            _warn_unsupported_bin_mode(
                protocol=protocol,
                bin_mode=bin_mode,
                acq_channel=acq_channel,
            )
        # The channel is registered even if the bin mode was reported as unsupported.
        acq_channels_data[acq_channel] = AcquisitionChannelData(
            acq_index_dim_name=("acq_index_" + str(acq_channel)),
            protocol=protocol,
            bin_mode=bin_mode,
            coords=coords,
        )
[docs]
def _generate_acq_channels_data(
    operation: ScheduleBase | Operation,
    acq_channels_data: AcquisitionChannelsData,
    schedulable_label_to_acq_index: SchedulableLabelToAcquisitionIndex,
    is_explicit_acq_index: bool,
    full_schedulable_label: FullSchedulableLabel,
    nested_loop_repetitions: list[int],
) -> None:
    """
    Recursively collect the acquisition data of ``operation``.

    ``acq_channels_data`` and ``schedulable_label_to_acq_index`` are the output
    arguments and are updated in place; all other arguments are inputs.

    If ``is_explicit_acq_index`` is ``True``, only acquisitions where ``acq_index``
    is not ``None`` are added; otherwise only acquisitions where ``acq_index``
    is ``None`` are added, and for these the ``acq_index`` is generated.

    Parameters
    ----------
    operation
        The schedule, control flow operation or plain operation to process.
    acq_channels_data
        Mapping from acquisition channel to its data.
    schedulable_label_to_acq_index
        Mapping from schedulable labels to acquisition indices.
    is_explicit_acq_index
        Selects which acquisitions are processed, see above.
    full_schedulable_label
        Labels of the schedulables leading to ``operation``.
    nested_loop_repetitions
        Repetition counts of the loops enclosing ``operation``.
    """
    # Arguments that every recursive call receives unchanged.
    shared_arguments = {
        "acq_channels_data": acq_channels_data,
        "schedulable_label_to_acq_index": schedulable_label_to_acq_index,
        "is_explicit_acq_index": is_explicit_acq_index,
    }

    if isinstance(operation, ScheduleBase):
        # Visit the schedulables in order of their absolute time.
        schedulables_by_time = list(operation.schedulables.values())
        schedulables_by_time.sort(key=lambda item: item["abs_time"])
        for schedulable in schedulables_by_time:
            label = schedulable["name"]
            scheduled_operation = operation.operations[schedulable["operation_id"]]
            _generate_acq_channels_data(
                operation=scheduled_operation,
                full_schedulable_label=full_schedulable_label + (label,),
                nested_loop_repetitions=nested_loop_repetitions,
                **shared_arguments,
            )
        return

    if isinstance(operation, LoopOperation):
        repetitions: int = operation.data["control_flow_info"]["repetitions"]
        _generate_acq_channels_data(
            operation=operation.body,
            # For control flows, `None` signifies we refer to the `body` of the control flow.
            full_schedulable_label=full_schedulable_label + (None,),
            # A fresh list, so the caller's list is left untouched.
            nested_loop_repetitions=nested_loop_repetitions + [repetitions],
            **shared_arguments,
        )
        return

    if isinstance(operation, ConditionalOperation):
        _generate_acq_channels_data(
            operation=operation.body,
            # For control flows, `None` signifies we refer to the `body` of the control flow.
            full_schedulable_label=full_schedulable_label + (None,),
            nested_loop_repetitions=nested_loop_repetitions,
            **shared_arguments,
        )
        return

    if operation.valid_acquisition:
        _generate_acq_channels_data_for_protocol(
            acquisitions_info=operation.data["acquisition_info"],
            full_schedulable_label=full_schedulable_label,
            nested_loop_repetitions=nested_loop_repetitions,
            **shared_arguments,
        )
[docs]
def generate_acq_channels_data(
    schedule: ScheduleBase,
) -> tuple[AcquisitionChannelsData, SchedulableLabelToAcquisitionIndex]:
    """
    Generate acq_index for every schedulable,
    and validate schedule regarding the acquisitions.

    This function generates the ``AcquisitionChannelData`` for every ``acq_channel``,
    and the ``SchedulableLabelToAcquisitionIndex``. It assumes the schedule is device-level.

    Parameters
    ----------
    schedule
        The schedule whose acquisitions are processed.

    Returns
    -------
    :
        The acquisition channels data and the mapping from schedulable labels
        to acquisition indices.
    """
    acq_channels_data: AcquisitionChannelsData = {}
    schedulable_label_to_acq_index: SchedulableLabelToAcquisitionIndex = {}

    # The schedule is traversed twice: the first pass generates the mappings for
    # acquisitions where acq_index is explicitly given, the second pass generates
    # new acq_indices and mappings for acquisitions where acq_index is None.
    #
    # The reason for this is that
    # * for compatibility reasons, temporarily we'd like to allow users to explicitly
    #   specify the acquisition index on the operation (the long-term intention is not
    #   to allow this, and only allow the compiler to generate the acquisition index), and
    # * the acquisition mapping data stores each acquisition index in a list, and the list
    #   index is not stored explicitly (to save memory), only implicitly in the `coords` list.
    #
    # Imagine the schedule: `Acq(acq_index=0); Acq(acq_index=None); Acq(acq_index=1);`.
    # We choose the following convention: the acquisition indices start from 0 and increment
    # by 1; this is a restriction **only** where the acquisition index is explicitly set by
    # the user. (We could have chosen a different convention, but probably for the user this
    # is easier than the other convention that the acquisition indices are incremented by one
    # for all acquisitions, even when the acquisition index is not explicitly specified.)
    # Then, the only way to generate the acquisition mapping is by first iterating through
    # the acquisition operations where the acquisition index has been explicitly defined.
    for explicit_pass in (True, False):
        _generate_acq_channels_data(
            schedule,
            acq_channels_data,
            schedulable_label_to_acq_index,
            is_explicit_acq_index=explicit_pass,
            full_schedulable_label=(),
            nested_loop_repetitions=[],
        )

    return acq_channels_data, schedulable_label_to_acq_index