Source code for quantify_scheduler.helpers.schedule
# Repository: https://gitlab.com/quantify-os/quantify-scheduler
# Licensed according to the LICENCE file on the main branch
"""Schedule helper functions."""
from __future__ import annotations
from itertools import count
from typing import TYPE_CHECKING, Any, Hashable
from quantify_scheduler.helpers.collections import make_hash, without
from quantify_scheduler.operations.control_flow_library import ControlFlowOperation
from quantify_scheduler.schedules.schedule import (
AcquisitionChannelMetadata,
AcquisitionMetadata,
Schedule,
ScheduleBase,
)
if TYPE_CHECKING:
from quantify_scheduler.operations.operation import Operation
[docs]
def get_pulse_uuid(pulse_info: dict[str, Any], excludes: list[str] | None = None) -> int:
    """
    Return a unique identifier for a pulse.

    The identifier is the hash of the pulse information with the excluded
    keys removed.

    Parameters
    ----------
    pulse_info
        The pulse information dictionary.
    excludes
        A list of keys to leave out of the hash. When ``None`` (the default),
        only the ``"t0"`` key is left out.

    Returns
    -------
    :
        The uuid hash.
    """
    keys_to_skip = ["t0"] if excludes is None else excludes
    hashable_part = without(pulse_info, keys_to_skip)
    return make_hash(hashable_part)
[docs]
def get_acq_uuid(acq_info: dict[str, Any]) -> int:
    """
    Return a unique identifier for an acquisition protocol.

    The ``"t0"`` and ``"waveforms"`` entries are left out before hashing.

    Parameters
    ----------
    acq_info
        The acquisition information dictionary.

    Returns
    -------
    :
        The uuid hash.
    """
    ignored_keys = ["t0", "waveforms"]
    hashable_part = without(acq_info, ignored_keys)
    return make_hash(hashable_part)
[docs]
def _generate_acq_info_by_uuid(
    operation: Operation | ScheduleBase, acqid_acqinfo_dict: dict
) -> None:
    """
    Recursively collect the unique acquisition infos of an operation.

    Schedules are traversed through their schedulables, control flow
    operations through their body. For any other operation, every entry of
    its ``"acquisition_info"`` is stored under its acquisition uuid.

    Parameters
    ----------
    operation
        The operation or schedule to traverse.
    acqid_acqinfo_dict
        Lookup dictionary that is filled in place. An acquisition uuid that
        is already present keeps the acquisition info stored first.
    """
    if isinstance(operation, ScheduleBase):
        for schedulable in operation.schedulables.values():
            nested_operation = operation.operations[schedulable["operation_id"]]
            _generate_acq_info_by_uuid(nested_operation, acqid_acqinfo_dict)
        return

    if isinstance(operation, ControlFlowOperation):
        _generate_acq_info_by_uuid(operation.body, acqid_acqinfo_dict)
        return

    for acq_info in operation["acquisition_info"]:
        # ``setdefault`` leaves an already populated uuid untouched.
        acqid_acqinfo_dict.setdefault(get_acq_uuid(acq_info), acq_info)
[docs]
def get_acq_info_by_uuid(schedule: Schedule) -> dict[int, dict[str, Any]]:
    """
    Return a lookup dictionary of unique identifiers of acquisition information.

    Parameters
    ----------
    schedule
        The schedule.

    Returns
    -------
    :
        A dictionary mapping each acquisition uuid found in the schedule to
        its acquisition information dictionary.
    """
    lookup: dict[int, dict[str, Any]] = {}
    # The helper fills ``lookup`` in place while walking the schedule.
    _generate_acq_info_by_uuid(operation=schedule, acqid_acqinfo_dict=lookup)
    return lookup
[docs]
def extract_acquisition_metadata_from_schedule(
    schedule: Schedule,
) -> AcquisitionMetadata:
    """
    Extract acquisition metadata from a schedule.

    This function operates under certain assumptions with respect to the schedule.

    - The acquisition_metadata should be sufficient to initialize the xarray dataset
      (described in quantify-core !212) that executing the schedule will result in.
    - All measurements in the schedule use the same acquisition protocol.
    - The used acquisition index channel combinations for each measurement are unique.
    - The used acquisition indices for each channel are the same.
    - When :class:`~quantify_scheduler.enums.BinMode` is :code:`APPEND`, the number of
      data points per acquisition index is assumed to be given by the
      schedule's repetition property. This implies no support for feedback
      (conditional measurements).

    Parameters
    ----------
    schedule
        Schedule containing measurements from which acquisition metadata can be
        extracted.

    Returns
    -------
    :
        The acquisition metadata provides a summary of the
        acquisition protocol, bin-mode, return-type and acquisition indices
        of the acquisitions in the schedule.

    Raises
    ------
    RuntimeError
        If not all acquisition protocols in a schedule are the same.
        If not all acquisitions use the same bin_mode.
    IndexError
        If the schedule contains no acquisitions, since the metadata is
        derived from the first acquisition found.
    """  # FIXME update when quantify-core!212 spec is ready
    # Only the unique acquisition infos matter; the uuid keys are dropped.
    unique_acq_infos = get_acq_info_by_uuid(schedule).values()
    return extract_acquisition_metadata_from_acquisition_protocols(
        acquisition_protocols=list(unique_acq_infos),
        repetitions=schedule.repetitions,
    )
[docs]
def extract_acquisition_metadata_from_acquisition_protocols(
    acquisition_protocols: list[dict[str, Any]], repetitions: int
) -> AcquisitionMetadata:
    """
    Build the acquisition metadata from a list of acquisition protocols.

    Holds the logic of extract_acquisition_metadata_from_schedule; it is
    factored out as to work around limitations of the different interfaces
    required.

    The protocol, bin mode and return type are taken from the first entry.
    Every entry must have the same protocol and bin mode as the first one.

    Parameters
    ----------
    acquisition_protocols
        A list of acquisition protocols.
    repetitions
        How many times the acquisition was repeated.

    Returns
    -------
    :
        The metadata, with the acquisition indices grouped per acquisition
        channel.

    Raises
    ------
    RuntimeError
        If an entry uses a different protocol or bin mode than the first one.
    IndexError
        If ``acquisition_protocols`` is empty.
    """
    # The first protocol is the reference that all others are compared to.
    reference = acquisition_protocols[0]
    protocol = reference["protocol"]
    bin_mode = reference["bin_mode"]
    acq_return_type = reference["acq_return_type"]

    # Generating hardware indices this way is intended as a temporary solution.
    # Proper solution: SE-298.
    numeric_key_of_channel: dict[Hashable, int] = {}
    next_numeric_key = count()
    channels_metadata: dict[int, AcquisitionChannelMetadata] = {}

    for acq_protocol in acquisition_protocols:
        # test limitation: all acquisition protocols in a schedule must be of
        # the same kind
        mismatches = []
        current_protocol = acq_protocol["protocol"]
        if current_protocol != protocol:
            mismatches.append(
                f"acquisition protocol: found {protocol} and {current_protocol}"
            )
        current_bin_mode = acq_protocol["bin_mode"]
        if current_bin_mode != bin_mode:
            mismatches.append(
                f"bin mode: found {bin_mode.__class__.__name__}.{bin_mode.name} and "
                f"{current_bin_mode.__class__.__name__}.{current_bin_mode.name}"
            )
        if mismatches:
            raise RuntimeError(
                "All acquisitions in a Schedule must be of the same kind:\n"
                + "\n".join(mismatches)
            )

        # Hand out a new numeric key only for a channel not seen before, so
        # that the counter advances once per distinct channel.
        acq_channel = acq_protocol["acq_channel"]
        if acq_channel not in numeric_key_of_channel:
            numeric_key_of_channel[acq_channel] = next(next_numeric_key)
        numeric_key = numeric_key_of_channel[acq_channel]

        # add the individual channel
        if numeric_key not in channels_metadata:
            channels_metadata[numeric_key] = AcquisitionChannelMetadata(
                acq_channel=acq_channel, acq_indices=[]
            )
        channels_metadata[numeric_key].acq_indices.append(acq_protocol["acq_index"])

    # combine the information in the acq metadata dataclass.
    return AcquisitionMetadata(
        acq_protocol=protocol,
        bin_mode=bin_mode,
        acq_channels_metadata=channels_metadata,
        acq_return_type=acq_return_type,
        repetitions=repetitions,
    )
[docs]
def _extract_port_clocks_used(operation: Operation | Schedule) -> set[tuple]:
    """
    Extract which port-clock combinations are used in an operation or schedule.

    Schedules are traversed through all of their operations, control flow
    operations through their body.

    Parameters
    ----------
    operation
        The operation or schedule to inspect.

    Returns
    -------
    :
        A set of ``(port, clock)`` tuples. Pulse or acquisition infos whose
        port or clock is ``None`` are not included.

    Raises
    ------
    RuntimeError
        If an operation is neither a valid pulse nor a valid acquisition.
    """
    if isinstance(operation, ScheduleBase):
        collected = set()
        for inner_operation in operation.operations.values():
            collected.update(_extract_port_clocks_used(inner_operation))
        return collected

    if isinstance(operation, ControlFlowOperation):
        return _extract_port_clocks_used(operation.body)

    if not (operation.valid_pulse or operation.valid_acquisition):
        raise RuntimeError(
            f"Operation {operation.name} is not a valid pulse or acquisition."
            f" Please check whether the device compilation has been performed successfully."
            f" Operation data: {repr(operation)}"
        )

    collected = set()
    for info in operation["pulse_info"] + operation["acquisition_info"]:
        # The clock is only looked up once the port is known to be set.
        port = info["port"]
        if port is None:
            continue
        clock = info["clock"]
        if clock is None:
            continue
        collected.add((port, clock))
    return collected