Source code for quantify_scheduler.compilation

# Repository: https://gitlab.com/quantify-os/quantify-scheduler
# Licensed according to the LICENCE file on the main branch
"""Compiler for the quantify_scheduler."""
from __future__ import annotations

import logging
import warnings
from copy import deepcopy
from typing import TYPE_CHECKING, Any, Literal
from uuid import uuid4

from quantify_scheduler.helpers.schedule import _extract_port_clocks_used
from quantify_scheduler.json_utils import load_json_schema, validate_json
from quantify_scheduler.operations.operation import Operation
from quantify_scheduler.schedules.schedule import Schedulable, Schedule

if TYPE_CHECKING:
    from quantify_scheduler.backends.graph_compilation import (
        CompilationConfig,
    )

[docs] logger = logging.getLogger(__name__)
[docs] class _ControlFlowReturn(Operation): """ An operation that signals the end of the current control flow statement. Cannot be added to Schedule manually. Parameters ---------- t0 : float, optional time offset, by default 0 """ def __init__(self, t0: float = 0) -> None: super().__init__(name="ControlFlowReturn") self.data.update( { "name": "ControlFlowReturn ", "control_flow_info": { "t0": t0, "duration": 0.0, "return_stack": True, }, } ) self._update() def __str__(self) -> str: return self._get_signature(self.data["control_flow_info"])
[docs] def _determine_absolute_timing( # noqa: PLR0912 schedule: Schedule, time_unit: Literal[ "physical", "ideal", None ] = "physical", # should be included in CompilationConfig config: CompilationConfig | None = None, ) -> Schedule: """ Determine the absolute timing of a schedule based on the timing constraints. This function determines absolute timings for every operation in the :attr:`~.ScheduleBase.schedulables`. It does this by: 1. iterating over all and elements in the :attr:`~.ScheduleBase.schedulables`. 2. determining the absolute time of the reference operation - reference point :code:`"ref_pt"` of the reference operation defaults to :code:`"end"` in case it is not set (i.e., is :code:`None`). 3. determining the start of the operation based on the :code:`rel_time` and :code:`duration` of operations - reference point :code:`"ref_pt_new"` of the added operation defaults to :code:`"start"` in case it is not set. Parameters ---------- schedule The schedule for which to determine timings. config Compilation config for :class:`~quantify_scheduler.backends.graph_compilation.QuantifyCompiler`. time_unit Whether to use physical units to determine the absolute time or ideal time. When :code:`time_unit == "physical"` the duration attribute is used. When :code:`time_unit == "ideal"` the duration attribute is ignored and treated as if it is :code:`1`. When :code:`time_unit == None` it will revert to :code:`"physical"`. Returns ------- : The modified ``schedule`` where the absolute time for each operation has been determined. Raises ------ NotImplementedError If the scheduling strategy is not "asap" """ time_unit = time_unit or "physical" if time_unit not in (valid_time_units := ("physical", "ideal")): raise ValueError( f"Undefined time_unit '{time_unit}'! Must be one of {valid_time_units}" ) for op in schedule.operations.values(): if isinstance(op, Schedule): if op.get("duration", None) is None: _determine_absolute_timing( schedule=op, time_unit=time_unit, config=config, ) elif ( time_unit == "physical" and not op.valid_pulse and not op.valid_acquisition ): # Gates do not have a defined duration, so only ideal timing is defined raise RuntimeError( f"Operation {op.name} is not a valid pulse or acquisition." f" Please check whether the device compilation has been performed." f" Operation data: {repr(op)}" ) # If called directly and not by the compiler, ensure control flow is resolved if config is None and time_unit == "physical": resolve_control_flow(schedule) scheduling_strategy = "asap" if config is not None: scheduling_strategy = config.device_compilation_config.scheduling_strategy if scheduling_strategy != "asap": raise NotImplementedError( f"{_determine_absolute_timing.__name__} does not currently support " f"{scheduling_strategy=}. Please change to 'asap' scheduling strategy " "in the `DeviceCompilationConfig`." ) if not schedule.schedulables: raise ValueError(f"schedule '{schedule.name}' contains no schedulables.") schedulable_iterator = iter(schedule.schedulables.values()) # The first schedulable by starts at time 0, and cannot have relative timings last_schedulable = next(schedulable_iterator) last_schedulable["abs_time"] = 0 for schedulable in schedulable_iterator: curr_op = schedule.operations[schedulable["operation_id"]] for t_constr in schedulable["timing_constraints"]: t_constr["ref_schedulable"] = t_constr["ref_schedulable"] or str( last_schedulable ) abs_time = _get_start_time(schedule, t_constr, curr_op, time_unit) if "abs_time" not in schedulable or abs_time > schedulable["abs_time"]: schedulable["abs_time"] = abs_time last_schedulable = schedulable schedule["duration"] = schedule.get_schedule_duration() if time_unit == "ideal": schedule["depth"] = schedule["duration"] + 1 return schedule
[docs] def determine_absolute_timing( # noqa: PLR0912 schedule: Schedule, time_unit: Literal[ "physical", "ideal", None ] = "physical", # should be included in CompilationConfig config: CompilationConfig | None = None, ) -> Schedule: """Determine the absolute timing of a schedule based on the timing constraints.""" warnings.warn( f"Calling {determine_absolute_timing.__name__} directly is deprecated " f"and will be removed from the public interface in quantify-scheduler " f">= 0.21.0. Please use `QuantifyCompiler` instead, which calls " f"{_determine_absolute_timing.__name__} as a compilation node.", FutureWarning, ) return _determine_absolute_timing( schedule=deepcopy(schedule), time_unit=time_unit, config=config )
[docs] def _get_start_time( schedule: Schedule, t_constr: dict[str, str | float], curr_op: Operation | Schedule, time_unit: str, ) -> float: # this assumes the reference op exists. This is ensured in schedule.add ref_schedulable = schedule.schedulables[str(t_constr["ref_schedulable"])] ref_op = schedule.operations[ref_schedulable["operation_id"]] # duration = 1 is useful when e.g., drawing a circuit diagram. duration_ref_op = ( ref_op.duration if time_unit == "physical" else ref_op.get("depth", 1) ) ref_pt = t_constr["ref_pt"] or "end" if ref_pt == "start": t0 = ref_schedulable["abs_time"] elif ref_pt == "center": t0 = ref_schedulable["abs_time"] + duration_ref_op / 2 elif ref_pt == "end": t0 = ref_schedulable["abs_time"] + duration_ref_op else: raise NotImplementedError(f'Timing "{ref_pt=}" not supported by backend.') duration_new_op = ( curr_op.duration if time_unit == "physical" else curr_op.get("depth", 1) ) ref_pt_new = t_constr["ref_pt_new"] or "start" if ref_pt_new == "start": abs_time = t0 + t_constr["rel_time"] elif ref_pt_new == "center": abs_time = t0 + t_constr["rel_time"] - duration_new_op / 2 elif ref_pt_new == "end": abs_time = t0 + t_constr["rel_time"] - duration_new_op else: raise NotImplementedError(f'Timing "{ref_pt_new=}" not supported by backend.') return abs_time
[docs] def resolve_control_flow( schedule: Schedule, config: CompilationConfig | None = None, port_clocks: set | None = None, ) -> Schedule: """ If control flow is used, insert virtual operations before and after the schedulable. Parameters ---------- schedule The schedule for which to fill relative timings. config Compilation config for :class:`~quantify_scheduler.backends.graph_compilation.QuantifyCompiler`, which is currently not used in this compilation step. port_clocks Port-clock combinations to be used for control flow. Determined automatically for the outermost schedule. Returns ------- : a new schedule object where the timing constraints for each operation have been determined. """ if not port_clocks: port_clocks = _extract_port_clocks_used(schedule) for op in schedule.operations.values(): if isinstance(op, Schedule): resolve_control_flow(op, config, port_clocks) if not schedule.schedulables: raise ValueError(f"schedule '{schedule.name}' contains no schedulables.") # Iterating through the shallow copy of items, because # we modify the schedulables dict. for schedulable_key, schedulable in schedule.schedulables.copy().items(): cf = schedulable.get("control_flow", None) if cf is not None: cf["pulse_info"] = [ { "wf_func": None, "clock": clock, "port": port, "duration": 0, **cf["control_flow_info"], } for port, clock in port_clocks ] rst_op = _ControlFlowReturn() rst_op["pulse_info"] = [ { "wf_func": None, "clock": clock, "port": port, "duration": 0, **rst_op["control_flow_info"], } for port, clock in port_clocks ] _move_to_end(schedule.schedulables, schedulable_key) schedule.add( cf, rel_time=-1e-12, ref_op=str(schedulable), ref_pt="start", ref_pt_new="start", validate=False, ) # insert return stack op after the current operation schedule.add( rst_op, rel_time=0, ref_op=str(schedulable), ref_pt="end", ref_pt_new="start", validate=False, ) else: _move_to_end(schedule.schedulables, schedulable_key) return schedule
[docs] def flatten_schedule( schedule: Schedule, config: CompilationConfig | None = None ) -> Schedule: """ Recursively flatten subschedules based on the absolute timing. Parameters ---------- schedule : Schedule schedule to be flattened config : CompilationConfig | None, optional Compilation config for :class:`~quantify_scheduler.backends.graph_compilation.QuantifyCompiler`, which is currently not only used to detect if the function is called directly. by default None Returns ------- Schedule Equivalent schedule without subschedules """ # If called directly and not by the compiler, ensure timings are filled if config is None and schedule.get("duration", None) is None: _determine_absolute_timing(schedule) all_resources = dict(schedule.resources) for op in schedule.operations.values(): if isinstance(op, Schedule): flatten_schedule(op, config) all_resources.update(op.resources) op_keys_to_pop = set() schedulable_keys_to_pop = set() # we cannot use .items() directly since we modify schedule.schedulables in the loop schedulable_iter = tuple(schedule.schedulables.items()) for schedulable_key, schedulable in schedulable_iter: op_key = schedulable["operation_id"] op = schedule.operations[op_key] if isinstance(op, Schedule): offset = schedulable["abs_time"] # insert new schedulables shifted by the correct offset for inner_schedulable in op.schedulables.values(): inner_op = op.operations[inner_schedulable["operation_id"]] _insert_op_at_time( schedule, inner_op, inner_schedulable["abs_time"] + offset ) # mark the inner schedule for removal from the parent op_keys_to_pop.add(op_key) schedulable_keys_to_pop.add(schedulable_key) else: _move_to_end(schedule.schedulables, schedulable_key) for key in op_keys_to_pop: schedule["operation_dict"].pop(key) for key in schedulable_keys_to_pop: schedule["schedulables"].pop(key) for resource in all_resources.values(): if resource.name not in schedule.resources: schedule.add_resource(resource) return schedule
[docs] def _insert_op_at_time( schedule: Schedule, operation: Operation, abs_time: float ) -> None: new_key = str(uuid4()) new_schedulable = Schedulable( name=new_key, operation_id=operation.hash, ) # Timing constraints in the new schedulable are meaningless, so remove the list new_schedulable["timing_constraints"] = None new_schedulable["abs_time"] = abs_time schedule["operation_dict"][operation.hash] = operation schedule["schedulables"][new_key] = new_schedulable
[docs] def validate_config(config: dict, scheme_fn: str) -> bool: """ Validate a configuration using a schema. Parameters ---------- config The configuration to validate scheme_fn The name of a json schema in the quantify_scheduler.schemas folder. Returns ------- : True if valid """ scheme = load_json_schema(__file__, scheme_fn) validate_json(config, scheme) return True
[docs] def _move_to_end(ordered_dict: dict, key: Any) -> None: # noqa: ANN401 """ Moves the element with ``key`` to the end of the dict. Note: dictionaries from Python 3.7 are ordered. """ value = ordered_dict.pop(key) ordered_dict[key] = value